Commit adde2769 authored by 赵威

add event

parent 08cd43e0
@@ -8,7 +8,7 @@ import redis
 from pyspark import SparkConf
 from pyspark.sql import SparkSession
-from tool import (get_jerry_test, get_tag3_user_log, send_email, write_user_portrait)
+from tool import (get_jerry_test, get_tag3_user_log, send_email, write_user_portrait, write_user_portrait_by_event)
 # [{'激光': 1.949194898204873}, {'手术': 1.949194898204873}, {'手术': 1.949194898204873}, {'手术': 1.949194898204873}]
@@ -27,6 +27,15 @@ def make_dict_from_pair(x):
     return dict(zip(x[0], [x[1]] * len(x[0])))
+
+
+def update_tag3_user_portrait_by_event(cl_id):
+    user_df = get_tag3_user_log(cl_id)
+    if not user_df.empty:
+        user_df["first_solutions_1"] = list(zip(user_df["event_cn"], user_df["first_solutions"].apply(lambda x: x.split(",")), user_df["tag_score"]))
+        print(user_df)
+    return cl_id
+
 def update_tag3_user_portrait(cl_id):
     user_df = get_tag3_user_log(cl_id)
     if not user_df.empty:
@@ -82,20 +91,6 @@ def update_tag3_user_portrait(cl_id):
         redis_client.set(key, json.dumps(res))
         redis_client.expire(key, 60 * 60 * 24 * 30)
-        # only need the first time
-        # key2 = "doris:user_portrait:tag3:increment_update:device_id:" + str(cl_id)
-        # res2 = {
-        #     "first_demands": list(first_demands_score.keys()),
-        #     "second_demands": list(second_demands_score.keys()),
-        #     "first_solutions": list(first_solutions_score.keys()),
-        #     "second_solutions": list(second_solutions_score.keys()),
-        #     "first_positions": list(first_positions_score.keys()),
-        #     "second_positions": list(second_positions_score.keys()),
-        #     "projects": list(projects_score.keys())
-        # }
-        # redis_client.set(key2, json.dumps(res2))
-        # redis_client.expire(key2, 60 * 60 * 24 * 30)
         write_user_portrait(cl_id, ",".join(first_solutions_score.keys()), ",".join(second_solutions_score.keys()),
                             ",".join(first_demands_score.keys()), ",".join(second_demands_score.keys()),
                             ",".join(first_positions_score.keys()), ",".join(second_positions_score.keys()),
@@ -105,14 +100,15 @@ def update_tag3_user_portrait(cl_id):
 def consume_kafka():
-    sql = "select distinct cl_id from kafka_tag3_log where log_time > UNIX_TIMESTAMP(DATE_SUB(NOW(), INTERVAL 30 day))"
-    db, cursor = get_jerry_test()
-    cursor.execute(sql)
-    device_ids_lst = [i[0] for i in cursor.fetchall()]
-    db.close()
-    cursor.close()
-
-    # device_ids_lst = ["androidid_a25a1129c0b38f7b"]
+    # TODO comment
+    # sql = "select distinct cl_id from kafka_tag3_log where log_time > UNIX_TIMESTAMP(DATE_SUB(NOW(), INTERVAL 30 day))"
+    # db, cursor = get_jerry_test()
+    # cursor.execute(sql)
+    # device_ids_lst = [i[0] for i in cursor.fetchall()]
+    # db.close()
+    # cursor.close()
+
+    device_ids_lst = ["androidid_a25a1129c0b38f7b"]
 
     sparkConf = SparkConf().set("spark.hive.mapred.supports.subdirectories", "true") \
         .set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive", "true") \
@@ -140,7 +136,11 @@ def consume_kafka():
 if __name__ == "__main__":
     start = datetime.datetime.now()
-    consume_kafka()
+    # TODO
+    # consume_kafka()
+    update_tag3_user_portrait_by_event("androidid_a25a1129c0b38f7b")
+
     end = datetime.datetime.now()
     print(end - start)
     print("done")
@@ -439,3 +439,37 @@ def write_user_portrait(cl_id, first_solutions, second_solutions, first_demands,
     except Exception as e:
         print("write db error")
         print(e)
+
+
+# CREATE TABLE `user_tag3_event_portrait` (
+#   `id` int(11) NOT NULL AUTO_INCREMENT,
+#   `date` text NOT NULL,
+#   `cl_id` text NOT NULL,
+#   `first_solutions` text NOT NULL,
+#   `second_solutions` text NOT NULL,
+#   `first_demands` text NOT NULL,
+#   `second_demands` text NOT NULL,
+#   `first_positions` text NOT NULL,
+#   `second_positions` text NOT NULL,
+#   `projects` text NOT NULL,
+#   `event` text NOT NULL,
+#   PRIMARY KEY(`id`)
+# )
+def write_user_portrait_by_event(cl_id, first_solutions, second_solutions, first_demands, second_demands, first_positions,
+                                 second_positions, projects, event):
+    try:
+        today = datetime.date.today()
+        oneday = datetime.timedelta(days=1)
+        yesterday = today - oneday
+        sql = """insert into user_tag3_event_portrait values(null, '{}', '{}', '{}', '{}', '{}', '{}', '{}', '{}', '{}', '{}')""".format(
+            yesterday, cl_id, first_solutions, second_solutions, first_demands, second_demands, first_positions, second_positions,
+            projects, event)
+        db, cursor = get_jerry_test()
+        cursor.execute(sql)
+        db.commit()
+        db.close()
+        cursor.close()
+    except Exception as e:
+        print("write db error")
+        print(e)
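
Note: write_user_portrait_by_event builds its INSERT with str.format, so a quote inside any tag string would break the statement. Below is a hedged alternative sketch, not part of this commit, using DB-API parameter binding; it assumes get_jerry_test() returns a pymysql/MySQLdb-style (db, cursor) pair whose paramstyle is "%s" and that the module already imports datetime, as the function above does.

# Sketch only (assumption): same insert, but with %s placeholders so the
# driver escapes the values instead of str.format splicing them into the SQL.
def write_user_portrait_by_event_safe(cl_id, first_solutions, second_solutions, first_demands, second_demands,
                                      first_positions, second_positions, projects, event):
    yesterday = datetime.date.today() - datetime.timedelta(days=1)
    sql = ("insert into user_tag3_event_portrait "
           "values(null, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)")
    db, cursor = get_jerry_test()
    try:
        cursor.execute(sql, (str(yesterday), cl_id, first_solutions, second_solutions, first_demands,
                             second_demands, first_positions, second_positions, projects, event))
        db.commit()
    except Exception as e:
        print("write db error")
        print(e)
    finally:
        cursor.close()
        db.close()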