Commit 9b54ff3d authored by 赵威's avatar 赵威

Merge branch 'offic' into 'master'

Offic

See merge request !74
parents a568be7f e353a4ff
......@@ -8,8 +8,8 @@ import pymysql
from pyspark import SparkConf
from pyspark.sql import SparkSession
from tool import (get_doris_prod, get_redis_client, get_tag3_user_log, get_user_portrait_tag3_with_score, send_email,
write_user_portrait)
from tool import (get_doris_prod, get_redis_client, get_tag3_user_log, get_tag3_user_order_log, get_user_portrait_tag3_with_score,
send_email, write_user_portrait)
# [{'激光': 1.949194898204873}, {'手术': 1.949194898204873}, {'手术': 1.949194898204873}, {'手术': 1.949194898204873, '植发际线': 7.1}]
......@@ -187,6 +187,7 @@ def _get_all_by_projects(name_lst):
def update_tag3_user_portrait(cl_id):
user_df = get_tag3_user_log(cl_id)
order_df = get_tag3_user_order_log(cl_id)
if not user_df.empty:
user_df["first_solutions"] = list(zip(user_df["first_solutions"].apply(lambda x: x.split(",")), user_df["tag_score"]))
user_df["second_solutions"] = list(zip(user_df["second_solutions"].apply(lambda x: x.split(",")), user_df["tag_score"]))
......@@ -240,6 +241,24 @@ def update_tag3_user_portrait(cl_id):
if k not in forbidden_set:
projects_score[k] = v
paid_business_tags_score = {}
validate_business_tags_score = {}
if not order_df.empty:
paid_order_df = order_df[order_df["event_cn"] == "支付订单"]
if not paid_order_df.empty:
paid_order_df["business_tags"] = list(zip(paid_order_df["business_tags"].apply(lambda x: x.split(",")), paid_order_df["tag_score"]))
paid_order_df["business_tags_dict"] = paid_order_df["business_tags"].apply(lambda x: make_dict_from_pair(x))
paid_business_tags_list = paid_order_df["business_tags_dict"].tolist()
paid_business_tags_score = merge_values(paid_business_tags_list)
validate_order_df = order_df[order_df["event_cn"] == "验证订单"]
if not validate_order_df.empty:
validate_order_df["business_tags"] = list(zip(validate_order_df["business_tags"].apply(lambda x: x.split(",")), validate_order_df["tag_score"]))
validate_order_df["business_tags_dict"] = validate_order_df["business_tags"].apply(lambda x: make_dict_from_pair(x))
validate_business_tags_list = validate_order_df["business_tags_dict"].tolist()
validate_business_tags_score = merge_values(validate_business_tags_list)
tmp_res = {}
if (len(first_demands_score) == 0) and (len(first_solutions_score) == 0) and (len(first_positions_score) == 0) and (
len(second_demands_score) == 0) and (len(second_solutions_score) == 0) and (len(second_positions_score) == 0):
......@@ -253,7 +272,9 @@ def update_tag3_user_portrait(cl_id):
"first_positions": first_positions_score,
"second_positions": second_positions_score,
"projects": projects_score,
"business_tags": business_tags_score
"business_tags": business_tags_score,
"paid_business_tags": paid_business_tags_score,
"validate_business_tags": validate_business_tags_score
}
if tmp_res:
res.update(tmp_res)
......@@ -350,6 +371,9 @@ if __name__ == "__main__":
# df[["projects", "business_tags"]]
# update_tag3_user_portrait(cl_id)
# cl_id = "867617044159377"
# print(update_tag3_user_portrait(cl_id))
consume_kafka()
end = datetime.datetime.now()
print(end - start)
......
......@@ -442,6 +442,36 @@ def get_tag3_user_log(cl_id):
return pd.DataFrame(columns=columns)
# select log_time, score_type, business_tags, event_cn from kafka_tag3_log where cl_id = '867617044159377' and event_cn in ('支付订单', '验证订单');
def get_tag3_user_order_log(cl_id):
    """Load and score the order events logged for one ``cl_id``.

    Reads the rows of ``kafka_tag3_log`` whose ``event_cn`` is either
    '支付订单' or '验证订单' for the given ``cl_id``, then adds two derived
    columns: ``days_diff_now`` (age of the event in whole days, relative to
    the current time) and ``tag_score`` (the result of ``compute_tag3_score``
    applied to each row). Missing ``business_tags`` values are replaced by
    the empty string.

    This is a best-effort loader: any failure (connection, query, scoring)
    is printed and an empty frame is returned instead of raising.

    Args:
        cl_id: Identifier matched against the ``cl_id`` column.

    Returns:
        A ``pandas.DataFrame``. On success with at least one row it has the
        columns ``log_time``, ``score_type``, ``business_tags``, ``event_cn``,
        ``days_diff_now`` and ``tag_score``. When there are no rows, or on any
        error, it is an empty frame with only the four queried columns.
    """
    columns = ["log_time", "score_type", "business_tags", "event_cn"]
    # cl_id is passed as a bound parameter instead of being formatted into
    # the statement, so quotes in the value cannot alter the query.
    # NOTE(review): assumes get_doris_prod() returns a DB-API connection/cursor
    # using the "format" paramstyle (e.g. pymysql) - confirm.
    sql = """select log_time, score_type, business_tags, event_cn
             from kafka_tag3_log
             where cl_id = %s and event_cn in ('支付订单', '验证订单')"""
    try:
        db, cursor = get_doris_prod()
        try:
            cursor.execute(sql, (cl_id,))
            data = list(cursor.fetchall())
        finally:
            # Release the cursor and the connection even when the query
            # fails; the cursor goes first, then the connection that owns it.
            try:
                cursor.close()
            finally:
                db.close()

        if not data:
            return pd.DataFrame(columns=columns)

        user_df = pd.DataFrame(data)
        user_df.columns = columns
        seconds_per_day = 24 * 60 * 60
        # log_time is treated as a Unix timestamp in seconds.
        user_df["days_diff_now"] = round(
            (int(time.time()) - user_df["log_time"].astype(float)) / seconds_per_day
        )
        user_df["tag_score"] = user_df.apply(lambda x: compute_tag3_score(x), axis=1)
        user_df["business_tags"] = user_df["business_tags"].fillna("")
        return user_df
    except Exception as e:
        # Deliberate best-effort: report the problem and hand back an empty
        # frame so the caller's `.empty` check skips order processing.
        print(e)
        return pd.DataFrame(columns=columns)
# CREATE TABLE `user_tag3_portrait` (
# `id` int(11) NOT NULL AUTO_INCREMENT,
# `date` text NOT NULL,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment