Commit ad8ef78a authored by 赵威's avatar 赵威

try offline

parent a073d013
...@@ -9,7 +9,7 @@ from pyspark import SparkConf ...@@ -9,7 +9,7 @@ from pyspark import SparkConf
from pyspark.sql import SparkSession from pyspark.sql import SparkSession
from tool import (get_doris_prod, get_redis_client, get_tag3_user_log, get_user_portrait_tag3_with_score, send_email, from tool import (get_doris_prod, get_redis_client, get_tag3_user_log, get_user_portrait_tag3_with_score, send_email,
write_user_portrait, write_user_portrait_by_event, write_user_portrait_doris) write_user_portrait)
# [{'激光': 1.949194898204873}, {'手术': 1.949194898204873}, {'手术': 1.949194898204873}, {'手术': 1.949194898204873, '植发际线': 7.1}] # [{'激光': 1.949194898204873}, {'手术': 1.949194898204873}, {'手术': 1.949194898204873}, {'手术': 1.949194898204873, '植发际线': 7.1}]
...@@ -130,59 +130,59 @@ def _get_all_by_projects(name_lst): ...@@ -130,59 +130,59 @@ def _get_all_by_projects(name_lst):
return None return None
def update_tag3_user_portrait_by_event(cl_id): # def update_tag3_user_portrait_by_event(cl_id):
user_df = get_tag3_user_log(cl_id) # user_df = get_tag3_user_log(cl_id)
if not user_df.empty: # if not user_df.empty:
user_df["first_solutions"] = list(zip(user_df["first_solutions"].apply(lambda x: x.split(",")), user_df["tag_score"])) # user_df["first_solutions"] = list(zip(user_df["first_solutions"].apply(lambda x: x.split(",")), user_df["tag_score"]))
user_df["second_solutions"] = list(zip(user_df["second_solutions"].apply(lambda x: x.split(",")), user_df["tag_score"])) # user_df["second_solutions"] = list(zip(user_df["second_solutions"].apply(lambda x: x.split(",")), user_df["tag_score"]))
user_df["first_demands"] = list(zip(user_df["first_demands"].apply(lambda x: x.split(",")), user_df["tag_score"])) # user_df["first_demands"] = list(zip(user_df["first_demands"].apply(lambda x: x.split(",")), user_df["tag_score"]))
user_df["second_demands"] = list(zip(user_df["second_demands"].apply(lambda x: x.split(",")), user_df["tag_score"])) # user_df["second_demands"] = list(zip(user_df["second_demands"].apply(lambda x: x.split(",")), user_df["tag_score"]))
user_df["first_positions"] = list(zip(user_df["first_positions"].apply(lambda x: x.split(",")), user_df["tag_score"])) # user_df["first_positions"] = list(zip(user_df["first_positions"].apply(lambda x: x.split(",")), user_df["tag_score"]))
user_df["second_positions"] = list(zip(user_df["second_positions"].apply(lambda x: x.split(",")), user_df["tag_score"])) # user_df["second_positions"] = list(zip(user_df["second_positions"].apply(lambda x: x.split(",")), user_df["tag_score"]))
user_df["projects"] = list(zip(user_df["projects"].apply(lambda x: x.split(",")), user_df["tag_score"])) # user_df["projects"] = list(zip(user_df["projects"].apply(lambda x: x.split(",")), user_df["tag_score"]))
user_df["first_solutions_dict"] = user_df["first_solutions"].apply(lambda x: make_dict_from_pair(x)) # user_df["first_solutions_dict"] = user_df["first_solutions"].apply(lambda x: make_dict_from_pair(x))
user_df["second_solutions_dict"] = user_df["second_solutions"].apply(lambda x: make_dict_from_pair(x)) # user_df["second_solutions_dict"] = user_df["second_solutions"].apply(lambda x: make_dict_from_pair(x))
user_df["first_demands_dict"] = user_df["first_demands"].apply(lambda x: make_dict_from_pair(x)) # user_df["first_demands_dict"] = user_df["first_demands"].apply(lambda x: make_dict_from_pair(x))
user_df["second_demands_dict"] = user_df["second_demands"].apply(lambda x: make_dict_from_pair(x)) # user_df["second_demands_dict"] = user_df["second_demands"].apply(lambda x: make_dict_from_pair(x))
user_df["first_positions_dict"] = user_df["first_positions"].apply(lambda x: make_dict_from_pair(x)) # user_df["first_positions_dict"] = user_df["first_positions"].apply(lambda x: make_dict_from_pair(x))
user_df["second_positions_dict"] = user_df["second_positions"].apply(lambda x: make_dict_from_pair(x)) # user_df["second_positions_dict"] = user_df["second_positions"].apply(lambda x: make_dict_from_pair(x))
user_df["projects_dict"] = user_df["projects"].apply(lambda x: make_dict_from_pair(x)) # user_df["projects_dict"] = user_df["projects"].apply(lambda x: make_dict_from_pair(x))
user_df["first_solutions"] = list(zip(user_df["event_cn"], user_df["first_solutions_dict"])) # user_df["first_solutions"] = list(zip(user_df["event_cn"], user_df["first_solutions_dict"]))
user_df["second_solutions"] = list(zip(user_df["event_cn"], user_df["second_solutions_dict"])) # user_df["second_solutions"] = list(zip(user_df["event_cn"], user_df["second_solutions_dict"]))
user_df["first_demands"] = list(zip(user_df["event_cn"], user_df["first_demands_dict"])) # user_df["first_demands"] = list(zip(user_df["event_cn"], user_df["first_demands_dict"]))
user_df["second_demands"] = list(zip(user_df["event_cn"], user_df["second_demands_dict"])) # user_df["second_demands"] = list(zip(user_df["event_cn"], user_df["second_demands_dict"]))
user_df["first_positions"] = list(zip(user_df["event_cn"], user_df["first_positions_dict"])) # user_df["first_positions"] = list(zip(user_df["event_cn"], user_df["first_positions_dict"]))
user_df["second_positions"] = list(zip(user_df["event_cn"], user_df["second_positions_dict"])) # user_df["second_positions"] = list(zip(user_df["event_cn"], user_df["second_positions_dict"]))
user_df["projects"] = list(zip(user_df["event_cn"], user_df["projects_dict"])) # user_df["projects"] = list(zip(user_df["event_cn"], user_df["projects_dict"]))
first_solutions_dict = merge_results_by_event(user_df["first_solutions"].tolist()) # first_solutions_dict = merge_results_by_event(user_df["first_solutions"].tolist())
second_solutions_dict = merge_results_by_event(user_df["second_solutions"].tolist()) # second_solutions_dict = merge_results_by_event(user_df["second_solutions"].tolist())
first_demands_dict = merge_results_by_event(user_df["first_demands"].tolist()) # first_demands_dict = merge_results_by_event(user_df["first_demands"].tolist())
second_demands_dict = merge_results_by_event(user_df["second_demands"].tolist()) # second_demands_dict = merge_results_by_event(user_df["second_demands"].tolist())
first_positions_dict = merge_results_by_event(user_df["first_positions"].tolist()) # first_positions_dict = merge_results_by_event(user_df["first_positions"].tolist())
second_positions_dict = merge_results_by_event(user_df["second_positions"].tolist()) # second_positions_dict = merge_results_by_event(user_df["second_positions"].tolist())
projects_dict = merge_results_by_event(user_df["projects"].tolist()) # projects_dict = merge_results_by_event(user_df["projects"].tolist())
events = set( # events = set(
list(first_solutions_dict.keys()) + list(second_solutions_dict.keys()) + list(first_demands_dict.keys()) + # list(first_solutions_dict.keys()) + list(second_solutions_dict.keys()) + list(first_demands_dict.keys()) +
list(second_demands_dict.keys()) + list(first_positions_dict.keys()) + list(second_positions_dict.keys()) + # list(second_demands_dict.keys()) + list(first_positions_dict.keys()) + list(second_positions_dict.keys()) +
list(projects_dict.keys())) # list(projects_dict.keys()))
for e in events: # for e in events:
first_solutions = ",".join(first_solutions_dict.get(e, [])) # first_solutions = ",".join(first_solutions_dict.get(e, []))
second_solutions = ",".join(second_solutions_dict.get(e, [])) # second_solutions = ",".join(second_solutions_dict.get(e, []))
first_demands = ",".join(first_demands_dict.get(e, [])) # first_demands = ",".join(first_demands_dict.get(e, []))
second_demands = ",".join(second_demands_dict.get(e, [])) # second_demands = ",".join(second_demands_dict.get(e, []))
first_positions = ",".join(first_positions_dict.get(e, [])) # first_positions = ",".join(first_positions_dict.get(e, []))
second_positions = ",".join(second_positions_dict.get(e, [])) # second_positions = ",".join(second_positions_dict.get(e, []))
projects = ",".join(projects_dict.get(e, [])) # projects = ",".join(projects_dict.get(e, []))
write_user_portrait_by_event(cl_id, first_solutions, second_solutions, first_demands, second_demands, first_positions, # write_user_portrait_by_event(cl_id, first_solutions, second_solutions, first_demands, second_demands, first_positions,
second_positions, projects, e) # second_positions, projects, e)
return cl_id # return cl_id
def update_tag3_user_portrait(cl_id): def update_tag3_user_portrait(cl_id):
...@@ -195,6 +195,7 @@ def update_tag3_user_portrait(cl_id): ...@@ -195,6 +195,7 @@ def update_tag3_user_portrait(cl_id):
user_df["first_positions"] = list(zip(user_df["first_positions"].apply(lambda x: x.split(",")), user_df["tag_score"])) user_df["first_positions"] = list(zip(user_df["first_positions"].apply(lambda x: x.split(",")), user_df["tag_score"]))
user_df["second_positions"] = list(zip(user_df["second_positions"].apply(lambda x: x.split(",")), user_df["tag_score"])) user_df["second_positions"] = list(zip(user_df["second_positions"].apply(lambda x: x.split(",")), user_df["tag_score"]))
user_df["projects"] = list(zip(user_df["projects"].apply(lambda x: x.split(",")), user_df["tag_score"])) user_df["projects"] = list(zip(user_df["projects"].apply(lambda x: x.split(",")), user_df["tag_score"]))
user_df["business_tags"] = list(zip(user_df["business_tags"].apply(lambda x: x.split(",")), user_df["tag_score"]))
user_df["first_solutions_dict"] = user_df["first_solutions"].apply(lambda x: make_dict_from_pair(x)) user_df["first_solutions_dict"] = user_df["first_solutions"].apply(lambda x: make_dict_from_pair(x))
user_df["second_solutions_dict"] = user_df["second_solutions"].apply(lambda x: make_dict_from_pair(x)) user_df["second_solutions_dict"] = user_df["second_solutions"].apply(lambda x: make_dict_from_pair(x))
...@@ -203,6 +204,7 @@ def update_tag3_user_portrait(cl_id): ...@@ -203,6 +204,7 @@ def update_tag3_user_portrait(cl_id):
user_df["first_positions_dict"] = user_df["first_positions"].apply(lambda x: make_dict_from_pair(x)) user_df["first_positions_dict"] = user_df["first_positions"].apply(lambda x: make_dict_from_pair(x))
user_df["second_positions_dict"] = user_df["second_positions"].apply(lambda x: make_dict_from_pair(x)) user_df["second_positions_dict"] = user_df["second_positions"].apply(lambda x: make_dict_from_pair(x))
user_df["projects_dict"] = user_df["projects"].apply(lambda x: make_dict_from_pair(x)) user_df["projects_dict"] = user_df["projects"].apply(lambda x: make_dict_from_pair(x))
user_df["business_tags_dict"] = user_df["business_tags"].apply(lambda x: make_dict_from_pair(x))
first_solutions_list = user_df["first_solutions_dict"].tolist() first_solutions_list = user_df["first_solutions_dict"].tolist()
second_solutions_list = user_df["second_solutions_dict"].tolist() second_solutions_list = user_df["second_solutions_dict"].tolist()
...@@ -211,6 +213,7 @@ def update_tag3_user_portrait(cl_id): ...@@ -211,6 +213,7 @@ def update_tag3_user_portrait(cl_id):
first_positions_list = user_df["first_positions_dict"].tolist() first_positions_list = user_df["first_positions_dict"].tolist()
second_positions_list = user_df["second_positions_dict"].tolist() second_positions_list = user_df["second_positions_dict"].tolist()
projects_list = user_df["projects_dict"].tolist() projects_list = user_df["projects_dict"].tolist()
business_tags_list = user_df["business_tags_dict"].tolist()
first_demands_score = merge_values(first_demands_list) first_demands_score = merge_values(first_demands_list)
second_demands_score = merge_values(second_demands_list) second_demands_score = merge_values(second_demands_list)
...@@ -220,6 +223,7 @@ def update_tag3_user_portrait(cl_id): ...@@ -220,6 +223,7 @@ def update_tag3_user_portrait(cl_id):
second_positions_score = merge_values(second_positions_list) second_positions_score = merge_values(second_positions_list)
# projects_score = merge_values(projects_list) # projects_score = merge_values(projects_list)
projects_score_tmp = merge_values(projects_list) projects_score_tmp = merge_values(projects_list)
business_tags_score = merge_values(business_tags_list)
tmp = [ tmp = [
"不感兴趣", "没有想法", "八卦来了", "颜值速报", "医美审美干货", "其他项目", "网红颜值", "少年之名", "郑人予", "热点课代表", "私密", "陈瑞泽", "符仁杰", "祖力亚尔", "刘泽旭", "不感兴趣", "没有想法", "八卦来了", "颜值速报", "医美审美干货", "其他项目", "网红颜值", "少年之名", "郑人予", "热点课代表", "私密", "陈瑞泽", "符仁杰", "祖力亚尔", "刘泽旭",
...@@ -248,7 +252,8 @@ def update_tag3_user_portrait(cl_id): ...@@ -248,7 +252,8 @@ def update_tag3_user_portrait(cl_id):
"second_solutions": second_solutions_score, "second_solutions": second_solutions_score,
"first_positions": first_positions_score, "first_positions": first_positions_score,
"second_positions": second_positions_score, "second_positions": second_positions_score,
"projects": projects_score "projects": projects_score,
"business_tags": business_tags_score
} }
if tmp_res: if tmp_res:
res.update(tmp_res) res.update(tmp_res)
...@@ -262,14 +267,21 @@ def update_tag3_user_portrait(cl_id): ...@@ -262,14 +267,21 @@ def update_tag3_user_portrait(cl_id):
if (len(first_demands_score.keys()) > 0) or (len(second_demands_score.keys()) > 0) or \ if (len(first_demands_score.keys()) > 0) or (len(second_demands_score.keys()) > 0) or \
(len(first_solutions_score.keys()) > 0) or (len(second_solutions_score.keys()) > 0) or \ (len(first_solutions_score.keys()) > 0) or (len(second_solutions_score.keys()) > 0) or \
(len(first_positions_score.keys()) > 0) or (len(second_positions_score.keys()) > 0) or \ (len(first_positions_score.keys()) > 0) or (len(second_positions_score.keys()) > 0) or \
(len(projects_score.keys()) > 0): (len(projects_score.keys()) > 0) or (len(business_tags_score.keys()) > 0):
redis_client.set(key, json.dumps(res)) redis_client.set(key, json.dumps(res))
redis_client.expire(key, 60 * 60 * 24 * 180) redis_client.expire(key, 60 * 60 * 24 * 180)
write_user_portrait(cl_id, ",".join(first_solutions_score.keys()), ",".join(second_solutions_score.keys()), write_user_portrait(
",".join(first_demands_score.keys()), ",".join(second_demands_score.keys()), cl_id,
",".join(first_positions_score.keys()), ",".join(second_positions_score.keys()), ",".join(first_solutions_score.keys()),
",".join(projects_score.keys())) ",".join(second_solutions_score.keys()),
",".join(first_demands_score.keys()),
",".join(second_demands_score.keys()),
",".join(first_positions_score.keys()),
",".join(second_positions_score.keys()),
",".join(projects_score.keys()),
",".join(business_tags_score.keys()),
)
# body = {} # body = {}
# for (k, v) in res.items(): # for (k, v) in res.items():
...@@ -333,6 +345,11 @@ if __name__ == "__main__": ...@@ -333,6 +345,11 @@ if __name__ == "__main__":
# update_tag3_user_portrait("862460044588666") # update_tag3_user_portrait("862460044588666")
# update_tag3_user_portrait("androidid_a25a1129c0b38f7b") # update_tag3_user_portrait("androidid_a25a1129c0b38f7b")
# cl_id = "864350041167473"
# df = get_tag3_user_log(cl_id)
# df[["projects", "business_tags"]]
# update_tag3_user_portrait(cl_id)
consume_kafka() consume_kafka()
end = datetime.datetime.now() end = datetime.datetime.now()
print(end - start) print(end - start)
......
# coding: utf-8 # coding: utf-8
from __future__ import absolute_import from __future__ import absolute_import, division, print_function
from __future__ import division
from __future__ import print_function import datetime
import pymysql import json
import smtplib import smtplib
import time
import traceback
from email.mime.application import MIMEApplication
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText from email.mime.text import MIMEText
from email.utils import formataddr from email.utils import formataddr
from email.mime.multipart import MIMEMultipart
from email.mime.application import MIMEApplication
import redis
import datetime
import time
import json
import numpy as np import numpy as np
import pandas as pd import pandas as pd
import traceback import pymysql
import redis
def send_email(app,id,e): def send_email(app, id, e):
# 第三方 SMTP 服务 # 第三方 SMTP 服务
mail_host = 'smtp.exmail.qq.com' # 设置服务器 mail_host = "smtp.exmail.qq.com" # 设置服务器
mail_user = "zhaowei@igengmei.com" # 用户名 mail_user = "zhaowei@igengmei.com" # 用户名
mail_pass = "Gengmei1234" # 口令 mail_pass = "Gengmei1234" # 口令
sender = 'zhaowei@igengmei.com' sender = "zhaowei@igengmei.com"
receivers = ['zhaowei@igengmei.com'] # 接收邮件,可设置为你的QQ邮箱或者其他邮箱 receivers = ["zhaowei@igengmei.com"] # 接收邮件,可设置为你的QQ邮箱或者其他邮箱
e = str(e) e = str(e)
msg = MIMEMultipart() msg = MIMEMultipart()
part = MIMEText('app_id:'+id+':fail' + "\n" + e, 'plain', 'utf-8') part = MIMEText("app_id:" + id + ":fail" + "\n" + e, "plain", "utf-8")
msg.attach(part) msg.attach(part)
msg['From'] = formataddr(["gaoyazhe", sender]) msg["From"] = formataddr(["gaoyazhe", sender])
# 括号里的对应收件人邮箱昵称、收件人邮箱账号 # 括号里的对应收件人邮箱昵称、收件人邮箱账号
msg['To'] = ";".join(receivers) msg["To"] = ";".join(receivers)
# message['Cc'] = ";".join(cc_reciver) # message['Cc'] = ";".join(cc_reciver)
msg['Subject'] = 'spark streaming:app_name:'+app msg["Subject"] = "spark streaming:app_name:" + app
try: try:
with open('error.txt','w') as f: with open("error.txt", "w") as f:
f.write(e) f.write(e)
f.close() f.close()
part = MIMEApplication(open('error.txt', 'r').read()) part = MIMEApplication(open("error.txt", "r").read())
part.add_header('Content-Disposition', 'attachment', filename="error.txt") part.add_header("Content-Disposition", "attachment", filename="error.txt")
msg.attach(part) msg.attach(part)
except Exception as e: except Exception as e:
print(e) print(e)
...@@ -51,7 +51,7 @@ def send_email(app,id,e): ...@@ -51,7 +51,7 @@ def send_email(app,id,e):
smtpObj.login(mail_user, mail_pass) smtpObj.login(mail_user, mail_pass)
smtpObj.sendmail(sender, receivers, msg.as_string()) smtpObj.sendmail(sender, receivers, msg.as_string())
except smtplib.SMTPException: except smtplib.SMTPException:
print('error') print("error")
def get_data_by_mysql(host, port, user, passwd, db, sql): def get_data_by_mysql(host, port, user, passwd, db, sql):
...@@ -90,13 +90,13 @@ def get_all_search_word_synonym_tags(): ...@@ -90,13 +90,13 @@ def get_all_search_word_synonym_tags():
"left join api_wordrelsynonym b on a.id = b.wordrel_id " \ "left join api_wordrelsynonym b on a.id = b.wordrel_id " \
"left join api_tag c on b.word=c.name " \ "left join api_tag c on b.word=c.name " \
"where c.tag_type+0<'4'+0 and c.is_online=1" "where c.tag_type+0<'4'+0 and c.is_online=1"
mysql_results = get_data_by_mysql('172.16.30.141', 3306, 'zx_str', 'ZXueX58pStrage', 'zhengxing', sql) mysql_results = get_data_by_mysql("172.16.30.141", 3306, "zx_str", "ZXueX58pStrage", "zhengxing", sql)
result_dict = dict() result_dict = dict()
for data in mysql_results: for data in mysql_results:
if data['keyword'] not in result_dict: if data["keyword"] not in result_dict:
result_dict[data['keyword']] = [data['id']] result_dict[data["keyword"]] = [data["id"]]
else: else:
result_dict[data['keyword']].append(data['id']) result_dict[data["keyword"]].append(data["id"])
return result_dict return result_dict
except Exception as e: except Exception as e:
print(e) print(e)
...@@ -106,13 +106,13 @@ def get_all_word_synonym_words(): ...@@ -106,13 +106,13 @@ def get_all_word_synonym_words():
try: try:
sql = "select a.keyword, b.word from api_wordrel a " \ sql = "select a.keyword, b.word from api_wordrel a " \
"left join api_wordrelsynonym b on a.id = b.wordrel_id " "left join api_wordrelsynonym b on a.id = b.wordrel_id "
mysql_results = get_data_by_mysql('172.16.30.141', 3306, 'zx_str', 'ZXueX58pStrage', 'zhengxing', sql) mysql_results = get_data_by_mysql("172.16.30.141", 3306, "zx_str", "ZXueX58pStrage", "zhengxing", sql)
result_dict = dict() result_dict = dict()
for data in mysql_results: for data in mysql_results:
if data['keyword'] not in result_dict: if data["keyword"] not in result_dict:
result_dict[data['keyword']] = [data['word']] result_dict[data["keyword"]] = [data["word"]]
else: else:
result_dict[data['keyword']].append(data['word']) result_dict[data["keyword"]].append(data["word"])
return result_dict return result_dict
except Exception as e: except Exception as e:
print(e) print(e)
...@@ -125,13 +125,13 @@ def get_all_synonym_tags(): ...@@ -125,13 +125,13 @@ def get_all_synonym_tags():
try: try:
sql = "select a.word, b.id from api_wordrelsynonym a left join api_tag b " \ sql = "select a.word, b.id from api_wordrelsynonym a left join api_tag b " \
"on a.word=b.name where b.tag_type+0<'4'+0 and b.is_online=1" "on a.word=b.name where b.tag_type+0<'4'+0 and b.is_online=1"
mysql_results = get_data_by_mysql('172.16.30.141', 3306, 'zx_str', 'ZXueX58pStrage', 'zhengxing', sql) mysql_results = get_data_by_mysql("172.16.30.141", 3306, "zx_str", "ZXueX58pStrage", "zhengxing", sql)
result_dict = dict() result_dict = dict()
for data in mysql_results: for data in mysql_results:
if data['word'] not in result_dict: if data["word"] not in result_dict:
result_dict[data['word']] = [data['id']] result_dict[data["word"]] = [data["id"]]
else: else:
result_dict[data['word']].append(data['id']) result_dict[data["word"]].append(data["id"])
return result_dict return result_dict
except Exception as e: except Exception as e:
print(e) print(e)
...@@ -143,13 +143,13 @@ def get_all_api_tags(): ...@@ -143,13 +143,13 @@ def get_all_api_tags():
""" """
try: try:
sql = "select name, id from api_tag where tag_type in ('1', '2', '3', '5') and is_online=1" sql = "select name, id from api_tag where tag_type in ('1', '2', '3', '5') and is_online=1"
mysql_results = get_data_by_mysql('172.16.30.141', 3306, 'zx_str', 'ZXueX58pStrage', 'zhengxing', sql) mysql_results = get_data_by_mysql("172.16.30.141", 3306, "zx_str", "ZXueX58pStrage", "zhengxing", sql)
result_dict = dict() result_dict = dict()
for data in mysql_results: for data in mysql_results:
if data['name'] not in result_dict: if data["name"] not in result_dict:
result_dict[data['name']] = [data['id']] result_dict[data["name"]] = [data["id"]]
else: else:
result_dict[data['name']].append(data['id']) result_dict[data["name"]].append(data["id"])
return result_dict return result_dict
except Exception as e: except Exception as e:
print(e) print(e)
...@@ -171,10 +171,10 @@ def get_all_tag_tag_type(): ...@@ -171,10 +171,10 @@ def get_all_tag_tag_type():
""" """
try: try:
sql = "select id,tag_type from api_tag where tag_type+0<'4'+0 and is_online=1" sql = "select id,tag_type from api_tag where tag_type+0<'4'+0 and is_online=1"
mysql_results = get_data_by_mysql('172.16.30.141', 3306, 'zx_str', 'ZXueX58pStrage', 'zhengxing', sql) mysql_results = get_data_by_mysql("172.16.30.141", 3306, "zx_str", "ZXueX58pStrage", "zhengxing", sql)
result_dict = dict() result_dict = dict()
for data in mysql_results: for data in mysql_results:
result_dict[data['id']] = data['tag_type'] result_dict[data["id"]] = data["tag_type"]
return result_dict return result_dict
except Exception as e: except Exception as e:
print(e) print(e)
...@@ -186,13 +186,13 @@ def get_all_3tag_2tag(): ...@@ -186,13 +186,13 @@ def get_all_3tag_2tag():
" left join api_tag b on a.parent_id=b.id " \ " left join api_tag b on a.parent_id=b.id " \
"where a.child_id in (select id from api_tag where tag_type='3' and is_online=1) " \ "where a.child_id in (select id from api_tag where tag_type='3' and is_online=1) " \
"and b.tag_type='2' and b.is_online=1" "and b.tag_type='2' and b.is_online=1"
mysql_results = get_data_by_mysql('172.16.30.141', 3306, 'zx_str', 'ZXueX58pStrage', 'zhengxing', sql) mysql_results = get_data_by_mysql("172.16.30.141", 3306, "zx_str", "ZXueX58pStrage", "zhengxing", sql)
result_dict = dict() result_dict = dict()
for data in mysql_results: for data in mysql_results:
if data['child_id'] not in result_dict: if data["child_id"] not in result_dict:
result_dict[data['child_id']] = [data['parent_id']] result_dict[data["child_id"]] = [data["parent_id"]]
else: else:
result_dict[data['child_id']].append(data['parent_id']) result_dict[data["child_id"]].append(data["parent_id"])
return result_dict return result_dict
except Exception as e: except Exception as e:
print(e) print(e)
...@@ -204,13 +204,13 @@ def get_all_tag_parent_tag(): ...@@ -204,13 +204,13 @@ def get_all_tag_parent_tag():
" left join api_tag b on a.parent_id=b.id " \ " left join api_tag b on a.parent_id=b.id " \
"where a.child_id in (select id from api_tag where tag_type+0<'4'+0 and is_online=1) " \ "where a.child_id in (select id from api_tag where tag_type+0<'4'+0 and is_online=1) " \
"and b.tag_type+0<'4'+0 and b.is_online=1" "and b.tag_type+0<'4'+0 and b.is_online=1"
mysql_results = get_data_by_mysql('172.16.30.141', 3306, 'zx_str', 'ZXueX58pStrage', 'zhengxing', sql) mysql_results = get_data_by_mysql("172.16.30.141", 3306, "zx_str", "ZXueX58pStrage", "zhengxing", sql)
result_dict = dict() result_dict = dict()
for data in mysql_results: for data in mysql_results:
if data['child_id'] not in result_dict: if data["child_id"] not in result_dict:
result_dict[data['child_id']] = [data['parent_id']] result_dict[data["child_id"]] = [data["parent_id"]]
else: else:
result_dict[data['child_id']].append(data['parent_id']) result_dict[data["child_id"]].append(data["parent_id"])
return result_dict return result_dict
except Exception as e: except Exception as e:
print(e) print(e)
...@@ -219,10 +219,10 @@ def get_all_tag_parent_tag(): ...@@ -219,10 +219,10 @@ def get_all_tag_parent_tag():
def get_all_tags_name(): def get_all_tags_name():
try: try:
sql = "select id, name from api_tag where tag_type+0<'4'+0 and is_online=1" sql = "select id, name from api_tag where tag_type+0<'4'+0 and is_online=1"
mysql_results = get_data_by_mysql('172.16.30.141', 3306, 'zx_str', 'ZXueX58pStrage', 'zhengxing', sql) mysql_results = get_data_by_mysql("172.16.30.141", 3306, "zx_str", "ZXueX58pStrage", "zhengxing", sql)
result_dict = dict() result_dict = dict()
for data in mysql_results: for data in mysql_results:
result_dict[data['id']] = data['name'] result_dict[data["id"]] = data["name"]
return result_dict return result_dict
except Exception as e: except Exception as e:
print(e) print(e)
...@@ -244,57 +244,67 @@ def get_tag2_from_tag3(tag3, all_3tag_2tag, user_log_df_tag2_list): ...@@ -244,57 +244,67 @@ def get_tag2_from_tag3(tag3, all_3tag_2tag, user_log_df_tag2_list):
def compute_henqiang(x, decay_days=30, exponential=0, action_tag_count=1): def compute_henqiang(x, decay_days=30, exponential=0, action_tag_count=1):
if exponential: if exponential:
alpha = exponential_decay(x, decay_days) alpha = exponential_decay(x, decay_days)
score = 15/action_tag_count - 1.1**alpha * ((15-0.5)/decay_days) score = 15 / action_tag_count - 1.1**alpha * ((15 - 0.5) / decay_days)
else: else:
score = 15/action_tag_count - x * ((15-0.5)/decay_days) score = 15 / action_tag_count - x * ((15 - 0.5) / decay_days)
if score > 0.5: if score > 0.5:
return score return score
else: else:
return 0.5 return 0.5
def compute_jiaoqiang(x, decay_days=30, exponential=0, action_tag_count=1): def compute_jiaoqiang(x, decay_days=30, exponential=0, action_tag_count=1):
if exponential: if exponential:
alpha = exponential_decay(x, decay_days) alpha = exponential_decay(x, decay_days)
score = 12/action_tag_count - 1.1**alpha * ((12-0.5)/decay_days) score = 12 / action_tag_count - 1.1**alpha * ((12 - 0.5) / decay_days)
else: else:
score = 12/action_tag_count - x * ((12-0.5)/decay_days) score = 12 / action_tag_count - x * ((12 - 0.5) / decay_days)
if score > 0.5: if score > 0.5:
return score return score
else: else:
return 0.5 return 0.5
def compute_ruoyixiang(x, decay_days=30, exponential=0, action_tag_count=1): def compute_ruoyixiang(x, decay_days=30, exponential=0, action_tag_count=1):
if exponential: if exponential:
alpha = exponential_decay(x, decay_days) alpha = exponential_decay(x, decay_days)
score = 5/action_tag_count - 1.1**alpha * ((5-0.5)/decay_days) score = 5 / action_tag_count - 1.1**alpha * ((5 - 0.5) / decay_days)
else: else:
score = 5/action_tag_count - x * ((5-0.5)/decay_days) score = 5 / action_tag_count - x * ((5 - 0.5) / decay_days)
if score > 0.5: if score > 0.5:
return score return score
else: else:
return 0.5 return 0.5
def compute_validate(x, decay_days=30, exponential=0, action_tag_count=1): def compute_validate(x, decay_days=30, exponential=0, action_tag_count=1):
if exponential: if exponential:
alpha = exponential_decay(x, decay_days) alpha = exponential_decay(x, decay_days)
score = 10/action_tag_count - 1.1**alpha * ((10-0.5)/decay_days) score = 10 / action_tag_count - 1.1**alpha * ((10 - 0.5) / decay_days)
else: else:
score = 10/action_tag_count - x * ((10-0.5)/decay_days) score = 10 / action_tag_count - x * ((10 - 0.5) / decay_days)
if score > 0.5: if score > 0.5:
return score return score
else: else:
return 0.5 return 0.5
def compute_ai_scan(x, decay_days=30, exponential=0, action_tag_count=1): def compute_ai_scan(x, decay_days=30, exponential=0, action_tag_count=1):
if exponential: if exponential:
alpha = exponential_decay(x, decay_days) alpha = exponential_decay(x, decay_days)
score = 2/action_tag_count - 1.1**alpha * ((2-0.5)/decay_days) score = 2 / action_tag_count - 1.1**alpha * ((2 - 0.5) / decay_days)
else: else:
score = 2/action_tag_count - x * ((2-0.5)/decay_days) score = 2 / action_tag_count - x * ((2 - 0.5) / decay_days)
if score > 0.5: if score > 0.5:
return score return score
else: else:
return 0.5 return 0.5
def get_action_tag_count(df, action_time): def get_action_tag_count(df, action_time):
try: try:
if not df[df['time'] == action_time].empty: if not df[df["time"] == action_time].empty:
return len(df[df['time'] == action_time]) return len(df[df["time"] == action_time])
else: else:
return 1 return 1
except Exception as e: except Exception as e:
...@@ -303,7 +313,7 @@ def get_action_tag_count(df, action_time): ...@@ -303,7 +313,7 @@ def get_action_tag_count(df, action_time):
def exponential_decay(days_diff, decay_days=30): def exponential_decay(days_diff, decay_days=30):
# 天数差归一化到[0, decay_days] # 天数差归一化到[0, decay_days]
x = np.arange(1, 180+1, 1) x = np.arange(1, 180 + 1, 1)
a = (decay_days - 0) * (days_diff - min(x)) / (max(x) - min(x)) a = (decay_days - 0) * (days_diff - min(x)) / (max(x) - min(x))
return a return a
...@@ -315,8 +325,12 @@ def args_test(x): ...@@ -315,8 +325,12 @@ def args_test(x):
def get_user_log(cl_id, all_word_tags, pay_time=0, debug=0): def get_user_log(cl_id, all_word_tags, pay_time=0, debug=0):
user_df_service = pd.DataFrame(columns=["time", "cl_id", "score_type", "tag_id", "tag_referrer", "action"]) user_df_service = pd.DataFrame(columns=["time", "cl_id", "score_type", "tag_id", "tag_referrer", "action"])
try: try:
db_jerry_test = pymysql.connect(host='172.16.40.158', port=4000, user='st_user', passwd='aqpuBLYzEV7tML5RPsN1pntUzFy', db_jerry_test = pymysql.connect(host="172.16.40.158",
db='jerry_test', charset='utf8') port=4000,
user="st_user",
passwd="aqpuBLYzEV7tML5RPsN1pntUzFy",
db="jerry_test",
charset="utf8")
cur_jerry_test = db_jerry_test.cursor() cur_jerry_test = db_jerry_test.cursor()
if pay_time == 0: if pay_time == 0:
user_df_service_sql = "select time,cl_id,score_type,tag_id,tag_referrer,action from user_new_tag_log " \ user_df_service_sql = "select time,cl_id,score_type,tag_id,tag_referrer,action from user_new_tag_log " \
...@@ -335,19 +349,19 @@ def get_user_log(cl_id, all_word_tags, pay_time=0, debug=0): ...@@ -335,19 +349,19 @@ def get_user_log(cl_id, all_word_tags, pay_time=0, debug=0):
user_df_search = user_df_service[user_df_service["action"] == "do_search"] user_df_search = user_df_service[user_df_service["action"] == "do_search"]
if debug: if debug:
# 用户的非搜索、支付行为 # 用户的非搜索、支付行为
user_df_service = user_df_service.loc[ user_df_service = user_df_service.loc[~user_df_service["action"].isin(["do_search", "api/settlement/alipay_callback"]
~user_df_service["action"].isin(["do_search", "api/settlement/alipay_callback"])] )]
else: else:
# 用户的非搜索行为 # 用户的非搜索行为
user_df_service = user_df_service.loc[~user_df_service["action"].isin(["do_search"])] user_df_service = user_df_service.loc[~user_df_service["action"].isin(["do_search"])]
# 搜索词转成tag,合并用户日志 # 搜索词转成tag,合并用户日志
user_df_search_dict = dict() user_df_search_dict = dict()
for index, row in user_df_search.iterrows(): for index, row in user_df_search.iterrows():
if row['tag_referrer'] in all_word_tags: if row["tag_referrer"] in all_word_tags:
word_tag_list = all_word_tags[row['tag_referrer']] word_tag_list = all_word_tags[row["tag_referrer"]]
row['tag_id'] = int(word_tag_list[0]) if word_tag_list else -1 row["tag_id"] = int(word_tag_list[0]) if word_tag_list else -1
else: else:
row['tag_id'] = -1 row["tag_id"] = -1
user_df_service = user_df_service.append(user_df_search) user_df_service = user_df_service.append(user_df_search)
return user_df_service[user_df_service["tag_id"] != -1] return user_df_service[user_df_service["tag_id"] != -1]
except: except:
...@@ -366,12 +380,7 @@ def get_jerry_test(): ...@@ -366,12 +380,7 @@ def get_jerry_test():
def get_doris_prod(): def get_doris_prod():
db = pymysql.connect(host="172.16.30.136", db = pymysql.connect(host="172.16.30.136", port=3306, user="doris", passwd="o5gbA27hXHHm", db="doris_prod", charset="utf8")
port=3306,
user="doris",
passwd="o5gbA27hXHHm",
db="doris_prod",
charset="utf8")
return db, db.cursor() return db, db.cursor()
...@@ -391,11 +400,11 @@ def compute_tag3_score(x): ...@@ -391,11 +400,11 @@ def compute_tag3_score(x):
def get_tag3_user_log(cl_id): def get_tag3_user_log(cl_id):
columns = [ columns = [
"log_time", "score_type", "event_cn", "first_solutions", "second_solutions", "first_demands", "second_demands", "log_time", "score_type", "event_cn", "first_solutions", "second_solutions", "first_demands", "second_demands",
"first_positions", "second_positions", "projects" "first_positions", "second_positions", "projects", "business_tags"
] ]
try: try:
sql = """select log_time, score_type, event_cn, first_solutions, second_solutions, first_demands, sql = """select log_time, score_type, event_cn, first_solutions, second_solutions, first_demands,
second_demands, first_positions, second_positions, projects second_demands, first_positions, second_positions, projects, business_tags
from kafka_tag3_log where cl_id = '{}'""".format(cl_id) from kafka_tag3_log where cl_id = '{}'""".format(cl_id)
db, cursor = get_doris_prod() db, cursor = get_doris_prod()
...@@ -412,6 +421,7 @@ def get_tag3_user_log(cl_id): ...@@ -412,6 +421,7 @@ def get_tag3_user_log(cl_id):
user_df["days_diff_now"] = round((int(time.time()) - user_df["log_time"].astype(float)) / (24 * 60 * 60)) user_df["days_diff_now"] = round((int(time.time()) - user_df["log_time"].astype(float)) / (24 * 60 * 60))
user_df["tag_score"] = user_df.apply(lambda x: compute_tag3_score(x), axis=1) user_df["tag_score"] = user_df.apply(lambda x: compute_tag3_score(x), axis=1)
user_df["business_tags"] = user_df["business_tags"].fillna("")
return user_df return user_df
except Exception as e: except Exception as e:
print(e) print(e)
...@@ -431,15 +441,16 @@ def get_tag3_user_log(cl_id): ...@@ -431,15 +441,16 @@ def get_tag3_user_log(cl_id):
# `projects` text NOT NULL, # `projects` text NOT NULL,
# PRIMARY KEY(`id`) # PRIMARY KEY(`id`)
# ) # )
def write_user_portrait(cl_id, first_solutions, second_solutions, first_demands, second_demands, # ALTER TABLE `user_tag3_portrait` ADD COLUMN business_tags text COMMENT '商业标签';
first_positions, second_positions, projects): def write_user_portrait(cl_id, first_solutions, second_solutions, first_demands, second_demands, first_positions,
second_positions, projects, business_tags):
try: try:
today = datetime.date.today() today = datetime.date.today()
oneday = datetime.timedelta(days=1) oneday = datetime.timedelta(days=1)
yesterday = today - oneday yesterday = today - oneday
sql = """insert into user_tag3_portrait values(null, '{}', '{}', '{}', '{}', '{}', '{}', '{}', '{}', '{}')""".format( sql = """insert into user_tag3_portrait values(null, '{}', '{}', '{}', '{}', '{}', '{}', '{}', '{}', '{}', '{}')""".format(
yesterday, cl_id, first_solutions, second_solutions, first_demands, second_demands, first_positions, yesterday, cl_id, first_solutions, second_solutions, first_demands, second_demands, first_positions, second_positions,
second_positions, projects) projects, business_tags)
db, cursor = get_jerry_test() db, cursor = get_jerry_test()
cursor.execute(sql) cursor.execute(sql)
...@@ -451,24 +462,24 @@ def write_user_portrait(cl_id, first_solutions, second_solutions, first_demands, ...@@ -451,24 +462,24 @@ def write_user_portrait(cl_id, first_solutions, second_solutions, first_demands,
print(e) print(e)
def write_user_portrait_doris(cl_id, first_solutions, second_solutions, first_demands, second_demands, first_positions, # def write_user_portrait_doris(cl_id, first_solutions, second_solutions, first_demands, second_demands, first_positions,
second_positions, projects): # second_positions, projects):
try: # try:
today = datetime.date.today() # today = datetime.date.today()
oneday = datetime.timedelta(days=1) # oneday = datetime.timedelta(days=1)
yesterday = today - oneday # yesterday = today - oneday
sql = """insert into user_tag3_portrait values(null, '{}', '{}', '{}', '{}', '{}', '{}', '{}', '{}', '{}')""".format( # sql = """insert into user_tag3_portrait values(null, '{}', '{}', '{}', '{}', '{}', '{}', '{}', '{}', '{}')""".format(
yesterday, cl_id, first_solutions, second_solutions, first_demands, second_demands, first_positions, second_positions, # yesterday, cl_id, first_solutions, second_solutions, first_demands, second_demands, first_positions, second_positions,
projects) # projects)
db, cursor = get_doris_prod() # db, cursor = get_doris_prod()
cursor.execute(sql) # cursor.execute(sql)
db.commit() # db.commit()
db.close() # db.close()
cursor.close() # cursor.close()
except Exception as e: # except Exception as e:
print("write db error") # print("write db error")
print(e) # print(e)
# CREATE TABLE `user_tag3_event_portrait` ( # CREATE TABLE `user_tag3_event_portrait` (
...@@ -506,7 +517,7 @@ def write_user_portrait_by_event(cl_id, first_solutions, second_solutions, first ...@@ -506,7 +517,7 @@ def write_user_portrait_by_event(cl_id, first_solutions, second_solutions, first
def get_redis_client(): def get_redis_client():
return redis.StrictRedis.from_url('redis://:ReDis!GmTx*0aN9@172.16.40.173:6379') return redis.StrictRedis.from_url("redis://:ReDis!GmTx*0aN9@172.16.40.173:6379")
def get_user_portrait_tag3_from_redis(device_id, limit_score=0): def get_user_portrait_tag3_from_redis(device_id, limit_score=0):
...@@ -514,6 +525,7 @@ def get_user_portrait_tag3_from_redis(device_id, limit_score=0): ...@@ -514,6 +525,7 @@ def get_user_portrait_tag3_from_redis(device_id, limit_score=0):
new_d = dict(sorted(d.items(), key=lambda x: x[1], reverse=True)) new_d = dict(sorted(d.items(), key=lambda x: x[1], reverse=True))
res = {tag: float(score) for tag, score in new_d.items() if float(score) >= limit_score} res = {tag: float(score) for tag, score in new_d.items() if float(score) >= limit_score}
return list(res.keys()) return list(res.keys())
portrait_key = "doris:user_portrait:tag3:device_id:" + str(device_id) portrait_key = "doris:user_portrait:tag3:device_id:" + str(device_id)
redis_client = get_redis_client() redis_client = get_redis_client()
if redis_client.exists(portrait_key): if redis_client.exists(portrait_key):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment