Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in
Toggle navigation
F
ffm-baseline
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
ML
ffm-baseline
Commits
22f0063f
Commit
22f0063f
authored
Oct 10, 2019
by
高雅喆
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
update
parent
93844478
Hide whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
209 additions
and
182 deletions
+209
-182
dist_update_user_portrait_service.py
eda/smart_rank/dist_update_user_portrait_service.py
+1
-181
evaluation_metrics.py
eda/smart_rank/evaluation_metrics.py
+1
-1
utils.py
eda/smart_rank/utils.py
+207
-0
No files found.
eda/smart_rank/dist_update_user_portrait_service.py
View file @
22f0063f
...
...
@@ -17,187 +17,7 @@ import numpy as np
import
pandas
as
pd
from
pyspark.sql.functions
import
lit
from
pyspark.sql.functions
import
concat_ws
def send_email(app, id, e):
    """
    Email a failure notification with the error text attached as error.txt.

    :param app: application name; appended to the mail subject.
    :param id: application id; embedded in the plain-text body.
    :param e: the error (any object); its str() form is written to
        'error.txt' in the working directory and attached to the mail.
    :return: None. An smtplib.SMTPException is caught and reported by
        printing 'error'; other exceptions propagate.
    """
    # Third-party SMTP service.
    # SECURITY(review): the account and password below are hard-coded in
    # source; they should be rotated and loaded from configuration instead.
    mail_host = 'smtp.exmail.qq.com'  # SMTP server
    mail_user = "gaoyazhe@igengmei.com"  # login user
    mail_pass = "VCrKTui99a7ALhiK"  # login password
    sender = 'gaoyazhe@igengmei.com'
    receivers = ['gaoyazhe@igengmei.com']  # recipient addresses

    e = str(e)
    msg = MIMEMultipart()
    # str() so that a non-string id / app name does not raise TypeError.
    msg.attach(MIMEText('app_id:' + str(id) + ':fail', 'plain', 'utf-8'))
    msg['From'] = formataddr(["gaoyazhe", sender])  # (display name, address)
    # Address lists in mail headers are comma-separated, not ';'-separated.
    msg['To'] = ", ".join(receivers)
    msg['Subject'] = 'spark streaming:app_name:' + str(app)

    # The `with` block closes the file; no explicit close() is needed.
    with open('error.txt', 'w', encoding='utf-8') as f:
        f.write(e)
    # Read back as bytes: MIMEApplication base64-encodes its payload, and a
    # str payload containing non-ASCII characters would be escaped/mangled.
    with open('error.txt', 'rb') as f:
        part = MIMEApplication(f.read())
    part.add_header('Content-Disposition', 'attachment', filename="error.txt")
    msg.attach(part)

    try:
        # Context manager sends QUIT and closes the socket even on failure.
        with smtplib.SMTP_SSL(mail_host, 465) as smtp_obj:
            smtp_obj.login(mail_user, mail_pass)
            smtp_obj.sendmail(sender, receivers, msg.as_string())
    except smtplib.SMTPException:
        print('error')
def get_data_by_mysql(host, port, user, passwd, db, sql):
    """
    Run one SQL statement against MySQL and return every row.

    :param host: MySQL server host.
    :param port: MySQL server port.
    :param user: login user.
    :param passwd: login password.
    :param db: database (schema) name.
    :param sql: SQL text passed unchanged to cursor.execute().
    :return: the result of cursor.fetchall() (rows come from a DictCursor),
        or None when connecting or querying raises; the exception is
        printed, not re-raised.
    """
    try:
        # Use a separate name so the `db` argument is not shadowed.
        conn = pymysql.connect(host=host, port=port, user=user, passwd=passwd,
                               db=db, cursorclass=pymysql.cursors.DictCursor)
        try:
            with conn.cursor() as cursor:
                cursor.execute(sql)
                return cursor.fetchall()
        finally:
            # Always release the connection, including when the query fails.
            conn.close()
    except Exception as e:
        print(e)
        return None
def get_all_search_word_and_synonym_tags():
    """
    Group tag ids by search keyword.

    Runs a query over api_wordrel joined to api_wordrelsynonym and api_tag
    and collects every returned c.id under its a.keyword.

    :return: dict {"search_word1": [tag_list1], "search_word2": [tag_list2]...};
        None when the query or the grouping raises (the exception is printed).
    """
    query = (
        "select a.keyword , c.id from api_wordrel a "
        "left join api_wordrelsynonym b on a.id = b.wordrel_id "
        "left join api_tag c on b.word=c.name "
        "where a.category in (1,13,10,11,12) and c.tag_type+0<'4'+0 and c.is_online=1"
    )
    try:
        # NOTE(review): connection settings are hard-coded at this call site.
        rows = get_data_by_mysql('172.16.30.141', 3306, 'work', 'BJQaT9VzDcuPBqkd', 'zhengxing', query)
        tags_by_keyword = {}
        for row in rows:
            tags_by_keyword.setdefault(row['keyword'], []).append(row['id'])
        return tags_by_keyword
    except Exception as err:
        print(err)
def get_all_synonym_tags():
    """
    Group tag ids by synonym word.

    Runs a query over api_wordrelsynonym joined to api_tag on word = name
    and collects every returned b.id under its a.word.

    :return: dict {"word1": [tag_list1], "word2": [tag_list2]...};
        None when the query or the grouping raises (the exception is printed).
    """
    query = (
        "select a.word, b.id from api_wordrelsynonym a left join api_tag b "
        "on a.word=b.name where b.tag_type+0<'4'+0 and b.is_online=1"
    )
    try:
        # NOTE(review): connection settings are hard-coded at this call site.
        rows = get_data_by_mysql('172.16.30.141', 3306, 'work', 'BJQaT9VzDcuPBqkd', 'zhengxing', query)
        tags_by_word = {}
        for row in rows:
            tags_by_word.setdefault(row['word'], []).append(row['id'])
        return tags_by_word
    except Exception as err:
        print(err)
def get_all_word_tags():
    """
    Merge the keyword->tags and synonym->tags mappings into one dict.

    For a word present in both mappings the keyword mapping's list wins,
    because it is unpacked last.

    :return: the merged dict; None when either mapping is empty/None or
        when an exception is raised (the exception is printed).
    """
    try:
        by_keyword = get_all_search_word_and_synonym_tags()
        by_synonym = get_all_synonym_tags()
        # Guard clause: both lookups must have produced a non-empty mapping.
        if not (by_keyword and by_synonym):
            return None
        return {**by_synonym, **by_keyword}
    except Exception as err:
        print(err)
def get_all_tag_tag_type():
    """
    Map each tag id returned by the api_tag query to its tag_type.

    :return: dict {tag_id1: tag_type1, tag_id2: tag_type2...};
        None when the query or the mapping raises (the exception is printed).
    """
    query = "select id,tag_type from api_tag where tag_type+0<'4'+0 and is_online=1"
    try:
        # NOTE(review): connection settings are hard-coded at this call site.
        rows = get_data_by_mysql('172.16.30.141', 3306, 'work', 'BJQaT9VzDcuPBqkd', 'zhengxing', query)
        # A later row with the same id overwrites an earlier one.
        return {row['id']: row['tag_type'] for row in rows}
    except Exception as err:
        print(err)
def get_all_3tag_2tag():
    """
    Group parent tag ids by child tag id from api_tagrelation.

    The query keeps relations whose child is an online tag with
    tag_type='3' and whose parent has tag_type='2'.

    :return: dict {child_id: [parent_id, ...], ...};
        None when the query or the grouping raises (the exception is printed).
    """
    query = (
        "select a.child_id,a.parent_id from api_tagrelation a"
        " left join api_tag b on a.parent_id=b.id "
        "where a.child_id in (select id from api_tag where tag_type='3' and is_online=1) "
        "and b.tag_type='2'"
    )
    try:
        # NOTE(review): connection settings are hard-coded at this call site.
        rows = get_data_by_mysql('172.16.30.141', 3306, 'work', 'BJQaT9VzDcuPBqkd', 'zhengxing', query)
        parents_by_child = {}
        for row in rows:
            parents_by_child.setdefault(row['child_id'], []).append(row['parent_id'])
        return parents_by_child
    except Exception as err:
        print(err)
def get_tag2_from_tag3(tag3, all_3tag_2tag, user_log_df_tag2_list):
    """
    Replace tag3 by the first of its mapped tags found in the given list.

    :param tag3: the tag to look up.
    :param all_3tag_2tag: container mapping a tag to a sequence of candidate
        tags; tags not present map to no candidates.
    :param user_log_df_tag2_list: container the candidates are tested against.
    :return: the first candidate contained in user_log_df_tag2_list, else
        tag3 itself; None when an exception is raised (it is printed).
    """
    try:
        candidates = all_3tag_2tag[tag3] if tag3 in all_3tag_2tag else []
        # Fall back to tag3 when no candidate appears in the list.
        return next(
            (candidate for candidate in candidates if candidate in user_log_df_tag2_list),
            tag3,
        )
    except Exception as err:
        print(err)
def compute_henqiang(x):
    """
    Linearly decaying score: 15 at x = 0, never below 0.5.

    :param x: decay input (presumably elapsed days - confirm with caller).
    :return: 15 - x * ((15 - 0.5) / 180) when that exceeds 0.5, else 0.5.
    """
    peak, floor, window = 15, 0.5, 180
    decayed = peak - x * ((peak - floor) / window)
    return decayed if decayed > floor else floor
def compute_jiaoqiang(x):
    """
    Linearly decaying score: 12 at x = 0, never below 0.5.

    :param x: decay input (presumably elapsed days - confirm with caller).
    :return: 12 - x * (12 / 180) when that exceeds 0.5, else 0.5.
    """
    peak, floor, window = 12, 0.5, 180
    # NOTE(review): the slope is peak / window here, whereas the sibling
    # compute_* functions use (peak - 0.5) / window - confirm this is intended.
    decayed = peak - x * (peak / window)
    return decayed if decayed > floor else floor
def compute_ruoyixiang(x):
    """
    Linearly decaying score: 5 at x = 0, never below 0.5.

    :param x: decay input (presumably elapsed days - confirm with caller).
    :return: 5 - x * ((5 - 0.5) / 180) when that exceeds 0.5, else 0.5.
    """
    peak, floor, window = 5, 0.5, 180
    decayed = peak - x * ((peak - floor) / window)
    return decayed if decayed > floor else floor
def compute_validate(x):
    """
    Linearly decaying score: 10 at x = 0, never below 0.5.

    :param x: decay input (presumably elapsed days - confirm with caller).
    :return: 10 - x * ((10 - 0.5) / 180) when that exceeds 0.5, else 0.5.
    """
    peak, floor, window = 10, 0.5, 180
    decayed = peak - x * ((peak - floor) / window)
    return decayed if decayed > floor else floor
def compute_ai_scan(x):
    """
    Linearly decaying score: 2 at x = 0, never below 0.5.

    :param x: decay input (presumably elapsed days - confirm with caller).
    :return: 2 - x * ((2 - 0.5) / 180) when that exceeds 0.5, else 0.5.
    """
    peak, floor, window = 2, 0.5, 180
    decayed = peak - x * ((peak - floor) / window)
    return decayed if decayed > floor else floor
def get_action_tag_count(df, action_time):
    """
    Count the rows of df whose 'time' column equals action_time.

    :param df: object indexable by 'time' and by a boolean mask
        (used like a pandas DataFrame).
    :param action_time: value compared against df['time'].
    :return: the number of matching rows, or 1 when the selection is empty;
        None when an exception is raised (it is printed).
    """
    try:
        matching = df[df['time'] == action_time]
        if matching.empty:
            # No match still counts as one.
            return 1
        return len(matching)
    except Exception as err:
        print(err)
from
eda.smart_rank.utils
import
*
def
get_user_service_portrait
(
cl_id
,
all_word_tags
,
all_tag_tag_type
,
all_3tag_2tag
,
size
=
10
):
...
...
eda/smart_rank/evaluation_metrics.py
View file @
22f0063f
...
...
@@ -5,7 +5,7 @@ import time
import
json
import
numpy
as
np
import
pandas
as
pd
from
eda.smart_rank.
dist_update_user_portrait_service
import
*
from
eda.smart_rank.
utils
import
*
def
get_user_service_portrait_not_alipay
(
cl_id
,
all_word_tags
,
all_tag_tag_type
,
all_3tag_2tag
,
size
=
10
):
...
...
eda/smart_rank/utils.py
0 → 100644
View file @
22f0063f
import
pymysql
import
redis
import
datetime
import
time
import
json
import
numpy
as
np
import
pandas
as
pd
def send_email(app, id, e):
    """
    Email a failure notification with the error text attached as error.txt.

    :param app: application name; appended to the mail subject.
    :param id: application id; embedded in the plain-text body.
    :param e: the error (any object); its str() form is written to
        'error.txt' in the working directory and attached to the mail.
    :return: None. An smtplib.SMTPException is caught and reported by
        printing 'error'; other exceptions propagate.
    """
    # Imported here because the module-level imports visible for this file
    # do not bring these names into scope.
    import smtplib
    from email.mime.application import MIMEApplication
    from email.mime.multipart import MIMEMultipart
    from email.mime.text import MIMEText
    from email.utils import formataddr

    # Third-party SMTP service.
    # SECURITY(review): the account and password below are hard-coded in
    # source; they should be rotated and loaded from configuration instead.
    mail_host = 'smtp.exmail.qq.com'  # SMTP server
    mail_user = "gaoyazhe@igengmei.com"  # login user
    mail_pass = "VCrKTui99a7ALhiK"  # login password
    sender = 'gaoyazhe@igengmei.com'
    receivers = ['gaoyazhe@igengmei.com']  # recipient addresses

    e = str(e)
    msg = MIMEMultipart()
    # str() so that a non-string id / app name does not raise TypeError.
    msg.attach(MIMEText('app_id:' + str(id) + ':fail', 'plain', 'utf-8'))
    msg['From'] = formataddr(["gaoyazhe", sender])  # (display name, address)
    # Address lists in mail headers are comma-separated, not ';'-separated.
    msg['To'] = ", ".join(receivers)
    msg['Subject'] = 'spark streaming:app_name:' + str(app)

    # The `with` block closes the file; no explicit close() is needed.
    with open('error.txt', 'w', encoding='utf-8') as f:
        f.write(e)
    # Read back as bytes: MIMEApplication base64-encodes its payload, and a
    # str payload containing non-ASCII characters would be escaped/mangled.
    with open('error.txt', 'rb') as f:
        part = MIMEApplication(f.read())
    part.add_header('Content-Disposition', 'attachment', filename="error.txt")
    msg.attach(part)

    try:
        # Context manager sends QUIT and closes the socket even on failure.
        with smtplib.SMTP_SSL(mail_host, 465) as smtp_obj:
            smtp_obj.login(mail_user, mail_pass)
            smtp_obj.sendmail(sender, receivers, msg.as_string())
    except smtplib.SMTPException:
        print('error')
def get_data_by_mysql(host, port, user, passwd, db, sql):
    """
    Run one SQL statement against MySQL and return every row.

    :param host: MySQL server host.
    :param port: MySQL server port.
    :param user: login user.
    :param passwd: login password.
    :param db: database (schema) name.
    :param sql: SQL text passed unchanged to cursor.execute().
    :return: the result of cursor.fetchall() (rows come from a DictCursor),
        or None when connecting or querying raises; the exception is
        printed, not re-raised.
    """
    try:
        # Use a separate name so the `db` argument is not shadowed.
        conn = pymysql.connect(host=host, port=port, user=user, passwd=passwd,
                               db=db, cursorclass=pymysql.cursors.DictCursor)
        try:
            with conn.cursor() as cursor:
                cursor.execute(sql)
                return cursor.fetchall()
        finally:
            # Always release the connection, including when the query fails.
            conn.close()
    except Exception as e:
        print(e)
        return None
def get_all_search_word_synonym_tags():
    """
    Group tag ids by search keyword.

    Runs a query over api_wordrel joined to api_wordrelsynonym and api_tag
    and collects every returned c.id under its a.keyword.

    :return: dict {"search_word1": [tag_list1], "search_word2": [tag_list2]...};
        None when the query or the grouping raises (the exception is printed).
    """
    query = (
        "select a.keyword , c.id from api_wordrel a "
        "left join api_wordrelsynonym b on a.id = b.wordrel_id "
        "left join api_tag c on b.word=c.name "
        "where a.category in (1,13,10,11,12) and c.tag_type+0<'4'+0 and c.is_online=1"
    )
    try:
        # NOTE(review): connection settings are hard-coded at this call site.
        rows = get_data_by_mysql('172.16.30.141', 3306, 'work', 'BJQaT9VzDcuPBqkd', 'zhengxing', query)
        tags_by_keyword = {}
        for row in rows:
            tags_by_keyword.setdefault(row['keyword'], []).append(row['id'])
        return tags_by_keyword
    except Exception as err:
        print(err)
def get_all_synonym_tags():
    """
    Group tag ids by synonym word.

    Runs a query over api_wordrelsynonym joined to api_tag on word = name
    and collects every returned b.id under its a.word.

    :return: dict {"word1": [tag_list1], "word2": [tag_list2]...};
        None when the query or the grouping raises (the exception is printed).
    """
    query = (
        "select a.word, b.id from api_wordrelsynonym a left join api_tag b "
        "on a.word=b.name where b.tag_type+0<'4'+0 and b.is_online=1"
    )
    try:
        # NOTE(review): connection settings are hard-coded at this call site.
        rows = get_data_by_mysql('172.16.30.141', 3306, 'work', 'BJQaT9VzDcuPBqkd', 'zhengxing', query)
        tags_by_word = {}
        for row in rows:
            tags_by_word.setdefault(row['word'], []).append(row['id'])
        return tags_by_word
    except Exception as err:
        print(err)
def get_all_api_tags():
    """
    Group tag ids by tag name from api_tag.

    :return: dict {"tag_name1": [tag_list1], "tag_name2": [tag_list2]...};
        None when the query or the grouping raises (the exception is printed).
    """
    query = "select name, id from api_tag where tag_type+0<'4'+0 and is_online=1"
    try:
        # NOTE(review): connection settings are hard-coded at this call site.
        rows = get_data_by_mysql('172.16.30.141', 3306, 'work', 'BJQaT9VzDcuPBqkd', 'zhengxing', query)
        ids_by_name = {}
        for row in rows:
            ids_by_name.setdefault(row['name'], []).append(row['id'])
        return ids_by_name
    except Exception as err:
        print(err)
def get_all_word_tags():
    """
    Merge the keyword, synonym and tag-name mappings into one dict.

    When a word appears in more than one mapping the later source wins:
    tag names override synonyms, which override search keywords.

    :return: the merged dict; None when an exception is raised, e.g. when
        one of the lookups returned None (the exception is printed).
    """
    try:
        by_keyword = get_all_search_word_synonym_tags()
        by_synonym = get_all_synonym_tags()
        by_tag_name = get_all_api_tags()
        merged = {**by_keyword, **by_synonym, **by_tag_name}
        return merged
    except Exception as err:
        print(err)
def get_all_tag_tag_type():
    """
    Map each tag id returned by the api_tag query to its tag_type.

    :return: dict {tag_id1: tag_type1, tag_id2: tag_type2...};
        None when the query or the mapping raises (the exception is printed).
    """
    query = "select id,tag_type from api_tag where tag_type+0<'4'+0 and is_online=1"
    try:
        # NOTE(review): connection settings are hard-coded at this call site.
        rows = get_data_by_mysql('172.16.30.141', 3306, 'work', 'BJQaT9VzDcuPBqkd', 'zhengxing', query)
        # A later row with the same id overwrites an earlier one.
        return {row['id']: row['tag_type'] for row in rows}
    except Exception as err:
        print(err)
def get_all_3tag_2tag():
    """
    Group parent tag ids by child tag id from api_tagrelation.

    The query keeps relations whose child is an online tag with
    tag_type='3' and whose parent has tag_type='2'.

    :return: dict {child_id: [parent_id, ...], ...};
        None when the query or the grouping raises (the exception is printed).
    """
    query = (
        "select a.child_id,a.parent_id from api_tagrelation a"
        " left join api_tag b on a.parent_id=b.id "
        "where a.child_id in (select id from api_tag where tag_type='3' and is_online=1) "
        "and b.tag_type='2'"
    )
    try:
        # NOTE(review): connection settings are hard-coded at this call site.
        rows = get_data_by_mysql('172.16.30.141', 3306, 'work', 'BJQaT9VzDcuPBqkd', 'zhengxing', query)
        parents_by_child = {}
        for row in rows:
            parents_by_child.setdefault(row['child_id'], []).append(row['parent_id'])
        return parents_by_child
    except Exception as err:
        print(err)
def get_tag2_from_tag3(tag3, all_3tag_2tag, user_log_df_tag2_list):
    """
    Replace tag3 by the first of its mapped tags found in the given list.

    :param tag3: the tag to look up.
    :param all_3tag_2tag: container mapping a tag to a sequence of candidate
        tags; tags not present map to no candidates.
    :param user_log_df_tag2_list: container the candidates are tested against.
    :return: the first candidate contained in user_log_df_tag2_list, else
        tag3 itself; None when an exception is raised (it is printed).
    """
    try:
        candidates = all_3tag_2tag[tag3] if tag3 in all_3tag_2tag else []
        # Fall back to tag3 when no candidate appears in the list.
        return next(
            (candidate for candidate in candidates if candidate in user_log_df_tag2_list),
            tag3,
        )
    except Exception as err:
        print(err)
def compute_henqiang(x):
    """
    Linearly decaying score: 15 at x = 0, never below 0.5.

    :param x: decay input (presumably elapsed days - confirm with caller).
    :return: 15 - x * ((15 - 0.5) / 180) when that exceeds 0.5, else 0.5.
    """
    peak, floor, window = 15, 0.5, 180
    decayed = peak - x * ((peak - floor) / window)
    return decayed if decayed > floor else floor
def compute_jiaoqiang(x):
    """
    Linearly decaying score: 12 at x = 0, never below 0.5.

    :param x: decay input (presumably elapsed days - confirm with caller).
    :return: 12 - x * (12 / 180) when that exceeds 0.5, else 0.5.
    """
    peak, floor, window = 12, 0.5, 180
    # NOTE(review): the slope is peak / window here, whereas the sibling
    # compute_* functions use (peak - 0.5) / window - confirm this is intended.
    decayed = peak - x * (peak / window)
    return decayed if decayed > floor else floor
def compute_ruoyixiang(x):
    """
    Linearly decaying score: 5 at x = 0, never below 0.5.

    :param x: decay input (presumably elapsed days - confirm with caller).
    :return: 5 - x * ((5 - 0.5) / 180) when that exceeds 0.5, else 0.5.
    """
    peak, floor, window = 5, 0.5, 180
    decayed = peak - x * ((peak - floor) / window)
    return decayed if decayed > floor else floor
def compute_validate(x):
    """
    Linearly decaying score: 10 at x = 0, never below 0.5.

    :param x: decay input (presumably elapsed days - confirm with caller).
    :return: 10 - x * ((10 - 0.5) / 180) when that exceeds 0.5, else 0.5.
    """
    peak, floor, window = 10, 0.5, 180
    decayed = peak - x * ((peak - floor) / window)
    return decayed if decayed > floor else floor
def compute_ai_scan(x):
    """
    Linearly decaying score: 2 at x = 0, never below 0.5.

    :param x: decay input (presumably elapsed days - confirm with caller).
    :return: 2 - x * ((2 - 0.5) / 180) when that exceeds 0.5, else 0.5.
    """
    peak, floor, window = 2, 0.5, 180
    decayed = peak - x * ((peak - floor) / window)
    return decayed if decayed > floor else floor
def get_action_tag_count(df, action_time):
    """
    Count the rows of df whose 'time' column equals action_time.

    :param df: object indexable by 'time' and by a boolean mask
        (used like a pandas DataFrame).
    :param action_time: value compared against df['time'].
    :return: the number of matching rows, or 1 when the selection is empty;
        None when an exception is raised (it is printed).
    """
    try:
        matching = df[df['time'] == action_time]
        if matching.empty:
            # No match still counts as one.
            return 1
        return len(matching)
    except Exception as err:
        print(err)
\ No newline at end of file
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment