From adde276993c5489a723b04e4e63189284ec5c814 Mon Sep 17 00:00:00 2001
From: zhaowei <zhaowei@igengmei.com>
Date: Thu, 11 Jun 2020 17:33:56 +0800
Subject: [PATCH] add event

---
 .../tag3_update_user_portrait_offline.py           | 46 +++++++++----------
 eda/smart_rank/tool.py                             | 34 ++++++++++++++
 2 files changed, 57 insertions(+), 23 deletions(-)

diff --git a/eda/smart_rank/tag3_update_user_portrait_offline.py b/eda/smart_rank/tag3_update_user_portrait_offline.py
index 02ba01af..29998458 100644
--- a/eda/smart_rank/tag3_update_user_portrait_offline.py
+++ b/eda/smart_rank/tag3_update_user_portrait_offline.py
@@ -8,7 +8,7 @@ import redis
 from pyspark import SparkConf
 from pyspark.sql import SparkSession
 
-from tool import (get_jerry_test, get_tag3_user_log, send_email, write_user_portrait)
+from tool import (get_jerry_test, get_tag3_user_log, send_email, write_user_portrait, write_user_portrait_by_event)
 
 
 # [{'激光': 1.949194898204873}, {'手术': 1.949194898204873}, {'手术': 1.949194898204873}, {'手术': 1.949194898204873}]
@@ -27,6 +27,15 @@ def make_dict_from_pair(x):
     return dict(zip(x[0], [x[1]] * len(x[0])))
 
 
+def update_tag3_user_portrait_by_event(cl_id):
+    user_df = get_tag3_user_log(cl_id)
+    if not user_df.empty:
+        user_df["first_solutions_1"] = list(zip(user_df["event_cn"], user_df["first_solutions"].apply(lambda x: x.split(",")), user_df["tag_score"]))
+        print(user_df)
+
+    return cl_id
+
+
 def update_tag3_user_portrait(cl_id):
     user_df = get_tag3_user_log(cl_id)
     if not user_df.empty:
@@ -82,20 +91,6 @@ def update_tag3_user_portrait(cl_id):
         redis_client.set(key, json.dumps(res))
         redis_client.expire(key, 60 * 60 * 24 * 30)
 
-        # only need the first time
-        # key2 = "doris:user_portrait:tag3:increment_update:device_id:" + str(cl_id)
-        # res2 = {
-        #     "first_demands": list(first_demands_score.keys()),
-        #     "second_demands": list(second_demands_score.keys()),
-        #     "first_solutions": list(first_solutions_score.keys()),
-        #     "second_solutions": list(second_solutions_score.keys()),
-        #     "first_positions": list(first_positions_score.keys()),
-        #     "second_positions": list(second_positions_score.keys()),
-        #     "projects": list(projects_score.keys())
-        # }
-        # redis_client.set(key2, json.dumps(res2))
-        # redis_client.expire(key2, 60 * 60 * 24 * 30)
-
         write_user_portrait(cl_id, ",".join(first_solutions_score.keys()), ",".join(second_solutions_score.keys()),
                             ",".join(first_demands_score.keys()), ",".join(second_demands_score.keys()),
                             ",".join(first_positions_score.keys()), ",".join(second_positions_score.keys()),
@@ -105,14 +100,15 @@
 
 
 def consume_kafka():
-    sql = "select distinct cl_id from kafka_tag3_log where log_time > UNIX_TIMESTAMP(DATE_SUB(NOW(), INTERVAL 30 day))"
-    db, cursor = get_jerry_test()
-    cursor.execute(sql)
-    device_ids_lst = [i[0] for i in cursor.fetchall()]
-    db.close()
-    cursor.close()
+    # TODO: restore the 30-day device query below after event-level testing
+    # sql = "select distinct cl_id from kafka_tag3_log where log_time > UNIX_TIMESTAMP(DATE_SUB(NOW(), INTERVAL 30 day))"
+    # db, cursor = get_jerry_test()
+    # cursor.execute(sql)
+    # device_ids_lst = [i[0] for i in cursor.fetchall()]
+    # db.close()
+    # cursor.close()
 
-    # device_ids_lst = ["androidid_a25a1129c0b38f7b"]
+    device_ids_lst = ["androidid_a25a1129c0b38f7b"]
 
     sparkConf = SparkConf().set("spark.hive.mapred.supports.subdirectories", "true") \
         .set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive", "true") \
@@ -140,7 +136,11 @@
 
 if __name__ == "__main__":
     start = datetime.datetime.now()
-    consume_kafka()
+    # TODO: re-enable consume_kafka() after testing
+    # consume_kafka()
+
+    update_tag3_user_portrait_by_event("androidid_a25a1129c0b38f7b")
+
     end = datetime.datetime.now()
     print(end - start)
     print("done")
diff --git a/eda/smart_rank/tool.py b/eda/smart_rank/tool.py
index e08b978c..af3578db 100644
--- a/eda/smart_rank/tool.py
+++ b/eda/smart_rank/tool.py
@@ -439,3 +439,37 @@ def write_user_portrait(cl_id, first_solutions, second_solutions, first_demands,
     except Exception as e:
         print("write db error")
         print(e)
+
+
+# CREATE TABLE `user_tag3_event_portrait` (
+#   `id` int(11) NOT NULL AUTO_INCREMENT,
+#   `date` text NOT NULL,
+#   `cl_id` text NOT NULL,
+#   `first_solutions` text NOT NULL,
+#   `second_solutions` text NOT NULL,
+#   `first_demands` text NOT NULL,
+#   `second_demands` text NOT NULL,
+#   `first_positions` text NOT NULL,
+#   `second_positions` text NOT NULL,
+#   `projects` text NOT NULL,
+#   `event` text NOT NULL,
+#   PRIMARY KEY(`id`)
+# )
+def write_user_portrait_by_event(cl_id, first_solutions, second_solutions, first_demands, second_demands, first_positions,
+                                 second_positions, projects, event):
+    try:
+        today = datetime.date.today()
+        oneday = datetime.timedelta(days=1)
+        yesterday = today - oneday
+        sql = """insert into user_tag3_event_portrait values(null, '{}', '{}', '{}', '{}', '{}', '{}', '{}', '{}', '{}', '{}')""".format(
+            yesterday, cl_id, first_solutions, second_solutions, first_demands, second_demands, first_positions, second_positions,
+            projects, event)
+
+        db, cursor = get_jerry_test()
+        cursor.execute(sql)
+        db.commit()
+        db.close()
+        cursor.close()
+    except Exception as e:
+        print("write db error")
+        print(e)
-- 
2.18.0
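
Reviewer note, not part of the patch: update_tag3_user_portrait_by_event()
currently stops after building the (event_cn, first_solutions, tag_score)
triples and printing the frame. Below is a minimal sketch of one possible
completion, assuming get_tag3_user_log() keeps returning one row per log
entry with those three columns; the per-event aggregation and the call into
write_user_portrait_by_event are illustrative, not taken from this patch:

    from collections import defaultdict

    def update_tag3_user_portrait_by_event(cl_id):
        user_df = get_tag3_user_log(cl_id)
        if user_df.empty:
            return cl_id
        # accumulate tag_score per event and per first-level solution
        scores = defaultdict(lambda: defaultdict(float))
        rows = zip(user_df["event_cn"],
                   user_df["first_solutions"].apply(lambda x: x.split(",")),
                   user_df["tag_score"])
        for event, solutions, score in rows:
            for solution in solutions:
                if solution:
                    scores[event][solution] += score
        # one row per event, solutions ranked by descending score; the other
        # tag families are left empty because this sketch does not aggregate
        # them (illustrative only)
        for event, solution_scores in scores.items():
            ranked = sorted(solution_scores, key=solution_scores.get, reverse=True)
            write_user_portrait_by_event(cl_id, ",".join(ranked), "", "", "",
                                         "", "", "", event)
        return cl_id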
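
A second reviewer note, on tool.py: write_user_portrait_by_event() formats
user-derived strings straight into the INSERT statement, so any tag value
containing a single quote breaks the query. Below is a parameterized sketch
that assumes get_jerry_test() returns a DB-API connection and cursor whose
driver uses the %s paramstyle (true of MySQL drivers such as pymysql; an
assumption, since get_jerry_test is not shown in this hunk). It also commits
once and closes the cursor before the connection:

    import datetime

    def write_user_portrait_by_event(cl_id, first_solutions, second_solutions,
                                     first_demands, second_demands, first_positions,
                                     second_positions, projects, event):
        yesterday = datetime.date.today() - datetime.timedelta(days=1)
        # %s placeholders let the driver quote and escape every value
        sql = ("insert into user_tag3_event_portrait "
               "values(null, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)")
        db, cursor = get_jerry_test()
        try:
            cursor.execute(sql, (str(yesterday), cl_id, first_solutions,
                                 second_solutions, first_demands, second_demands,
                                 first_positions, second_positions, projects,
                                 event))
            db.commit()
        except Exception as e:
            print("write db error")
            print(e)
        finally:
            cursor.close()
            db.close()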