Commit e58fc94d authored by 赵威

update expire time

parent 4aa782a8
@@ -5,7 +5,6 @@ import operator
 from collections import Counter
 import redis
 from pyspark import SparkConf
 from pyspark.sql import SparkSession
 from tool import (get_jerry_test, get_tag3_user_log, send_email, write_user_portrait)
@@ -29,18 +28,12 @@ def make_dict_from_pair(x):
 def update_tag3_user_portrait(cl_id):
     user_df = get_tag3_user_log(cl_id)
     if not user_df.empty:
-        user_df["first_solutions"] = list(
-            zip(user_df["first_solutions"].apply(lambda x: x.split(",")), user_df["tag_score"]))
-        user_df["second_solutions"] = list(
-            zip(user_df["second_solutions"].apply(lambda x: x.split(",")), user_df["tag_score"]))
-        user_df["first_demands"] = list(
-            zip(user_df["first_demands"].apply(lambda x: x.split(",")), user_df["tag_score"]))
-        user_df["second_demands"] = list(
-            zip(user_df["second_demands"].apply(lambda x: x.split(",")), user_df["tag_score"]))
-        user_df["first_positions"] = list(
-            zip(user_df["first_positions"].apply(lambda x: x.split(",")), user_df["tag_score"]))
-        user_df["second_positions"] = list(
-            zip(user_df["second_positions"].apply(lambda x: x.split(",")), user_df["tag_score"]))
+        user_df["first_solutions"] = list(zip(user_df["first_solutions"].apply(lambda x: x.split(",")), user_df["tag_score"]))
+        user_df["second_solutions"] = list(zip(user_df["second_solutions"].apply(lambda x: x.split(",")), user_df["tag_score"]))
+        user_df["first_demands"] = list(zip(user_df["first_demands"].apply(lambda x: x.split(",")), user_df["tag_score"]))
+        user_df["second_demands"] = list(zip(user_df["second_demands"].apply(lambda x: x.split(",")), user_df["tag_score"]))
+        user_df["first_positions"] = list(zip(user_df["first_positions"].apply(lambda x: x.split(",")), user_df["tag_score"]))
+        user_df["second_positions"] = list(zip(user_df["second_positions"].apply(lambda x: x.split(",")), user_df["tag_score"]))
         user_df["projects"] = list(zip(user_df["projects"].apply(lambda x: x.split(",")), user_df["tag_score"]))
         user_df["first_solutions_dict"] = user_df["first_solutions"].apply(lambda x: make_dict_from_pair(x))
@@ -78,26 +71,21 @@ def update_tag3_user_portrait(cl_id):
             "need_update_data": 0
         }
-        # TODO cold start
-        # TODO doris:user_portrait:tag3:device_id:
-        # TODO expire time
-        key = "doris:test:device_id:" + str(cl_id)
+        key = "doris:user_portrait:tag3:device_id:" + str(cl_id)
         redis_client = redis.StrictRedis.from_url("redis://:ReDis!GmTx*0aN9@172.16.40.173:6379")
         redis_client.set(key, json.dumps(res))
-        redis_client.expire(key, 60 * 60 * 24)
+        redis_client.expire(key, 60 * 60 * 24 * 30)
-        write_user_portrait(cl_id, ",".join(first_solutions_score.keys()),
-                            ",".join(second_solutions_score.keys()), ",".join(first_demands_score.keys()),
-                            ",".join(second_demands_score.keys()), ",".join(first_positions_score.keys()),
-                            ",".join(second_positions_score.keys()), ",".join(projects_score.keys()))
+        write_user_portrait(cl_id, ",".join(first_solutions_score.keys()), ",".join(second_solutions_score.keys()),
+                            ",".join(first_demands_score.keys()), ",".join(second_demands_score.keys()),
+                            ",".join(first_positions_score.keys()), ",".join(second_positions_score.keys()),
+                            ",".join(projects_score.keys()))
         return cl_id
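Note: this hunk carries the substance of the commit: the key moves from the `doris:test:` prefix to the `doris:user_portrait:tag3:device_id:` namespace (resolving the TODOs), and the TTL grows from one day to 30 days. A minimal sketch of the set-then-expire pattern, with a placeholder Redis URL and toy values standing in for `cl_id` and `res`:

```python
import json
import redis

THIRTY_DAYS = 60 * 60 * 24 * 30  # the new expire time: 30 days in seconds

cl_id = "866017030837884"                            # hypothetical device id
res = {"first_demands": {}, "need_update_data": 0}   # toy portrait payload

redis_client = redis.StrictRedis.from_url("redis://localhost:6379")  # placeholder URL
key = "doris:user_portrait:tag3:device_id:" + str(cl_id)
redis_client.set(key, json.dumps(res))
redis_client.expire(key, THIRTY_DAYS)

# Equivalent single call that sets the value and TTL atomically:
# redis_client.set(key, json.dumps(res), ex=THIRTY_DAYS)
```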
 def consume_kafka():
-    # TODO 30
-    sql = "select distinct cl_id from kafka_tag3_log where log_time > UNIX_TIMESTAMP(DATE_SUB(NOW(), INTERVAL 15 day))"
+    sql = "select distinct cl_id from kafka_tag3_log where log_time > UNIX_TIMESTAMP(DATE_SUB(NOW(), INTERVAL 30 day))"
     db, cursor = get_jerry_test()
     cursor.execute(sql)
     device_ids_lst = [i[0] for i in cursor.fetchall()]
...
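Note: the query window widens from 15 to 30 days, matching the new 30-day Redis TTL, and the `# TODO 30` reminder is resolved. A sketch of what the SQL cutoff evaluates to, computed client-side, under the assumption that `log_time` stores Unix epoch seconds:

```python
import time

THIRTY_DAYS = 60 * 60 * 24 * 30
# Same instant as UNIX_TIMESTAMP(DATE_SUB(NOW(), INTERVAL 30 day)) in MySQL,
# modulo clock skew between the client and the database server.
cutoff = int(time.time()) - THIRTY_DAYS
sql = ("select distinct cl_id from kafka_tag3_log "
       "where log_time > {}".format(cutoff))
```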