Commit 7b103f10 authored by 赵威

update when not empty

parent 2f5972f0
@@ -25,7 +25,7 @@ def make_dict_from_pair(x):
     return dict(zip(x[0], [x[1]] * len(x[0])))
 
 
-def update_tag3_user_portrait(cl_id):
+def update_tag3_user_portrait(cl_id, redis_client):
     user_df = get_tag3_user_log(cl_id)
     if not user_df.empty:
         user_df["first_solutions"] = list(zip(user_df["first_solutions"].apply(lambda x: x.split(",")), user_df["tag_score"]))
@@ -70,8 +70,11 @@ def update_tag3_user_portrait(cl_id):
             "projects": projects_score
         }
 
-        key = "doris:user_portrait:tag3:device_id:" + str(cl_id)
-        redis_client = redis.StrictRedis.from_url("redis://:ReDis!GmTx*0aN9@172.16.40.173:6379")
-        redis_client.set(key, json.dumps(res))
-        redis_client.expire(key, 60 * 60 * 24 * 30)
+        if (len(first_demands_score.keys()) > 0) or (len(second_demands_score.keys()) > 0) or \
+                (len(first_solutions_score.keys()) > 0) or (len(second_solutions_score.keys()) > 0) or \
+                (len(first_positions_score.keys()) > 0) or (len(second_positions_score.keys()) > 0) or \
+                (len(projects_score.keys()) > 0):
+            key = "doris:user_portrait:tag3:device_id:" + str(cl_id)
+            redis_client.set(key, json.dumps(res))
+            redis_client.expire(key, 60 * 60 * 24 * 30)
 
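Since empty dicts are falsy in Python, the chained len(...) > 0 checks added above are equivalent to any() over the seven score dicts. A hypothetical, more concise form of the same guard (not part of this commit); it also uses redis-py's ex= argument to set(), which folds the separate expire() call into a single command:

# Truthy if at least one score dict is non-empty (empty dicts are falsy).
if any((first_demands_score, second_demands_score,
        first_solutions_score, second_solutions_score,
        first_positions_score, second_positions_score,
        projects_score)):
    key = "doris:user_portrait:tag3:device_id:" + str(cl_id)
    # set(..., ex=...) writes the value and the 30-day TTL in one round trip.
    redis_client.set(key, json.dumps(res), ex=60 * 60 * 24 * 30)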
@@ -121,8 +124,10 @@ def consume_kafka():
     spark.sparkContext.setLogLevel("WARN")
     spark.sparkContext.addPyFile("/srv/apps/ffm-baseline_git/eda/smart_rank/tool.py")
 
+    redis_client = redis.StrictRedis.from_url("redis://:ReDis!GmTx*0aN9@172.16.40.173:6379")
+
     device_ids_lst_rdd = spark.sparkContext.parallelize(device_ids_lst, numSlices=1000)
-    result = device_ids_lst_rdd.repartition(100).map(lambda x: update_tag3_user_portrait(x))
+    result = device_ids_lst_rdd.repartition(100).map(lambda x: update_tag3_user_portrait(x, redis_client))
     # result.foreach(print)
     result.collect()
     spark.stop()
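One caveat with the consume_kafka change: the lambda now captures redis_client, so Spark must pickle the client into the task closure, and redis-py clients carry connection pools that do not reliably serialize. If that bites, a common workaround is to create one client per partition via mapPartitions. A minimal sketch under that assumption; update_partition and REDIS_URL are hypothetical names, with REDIS_URL standing in for the connection string above:

def update_partition(device_ids):
    # Built on the executor, once per partition, rather than shipped
    # from the driver inside the closure.
    client = redis.StrictRedis.from_url(REDIS_URL)
    return [update_tag3_user_portrait(cl_id, client) for cl_id in device_ids]

result = device_ids_lst_rdd.repartition(100).mapPartitions(update_partition)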
...