Commit 12ab4211 authored by 赵威's avatar 赵威

update key

parent 7b103f10
......@@ -70,11 +70,14 @@ def update_tag3_user_portrait(cl_id, redis_client):
"projects": projects_score
}
key = "doris:user_portrait:tag3:device_id:" + str(cl_id)
key2 = "doris:user_portrait:tag3:increment_update:device_id:" + str(cl_id)
redis_client = redis.StrictRedis.from_url("redis://:ReDis!GmTx*0aN9@172.16.40.173:6379")
if (len(first_demands_score.keys()) > 0) or (len(second_demands_score.keys()) > 0) or \
(len(first_solutions_score.keys()) > 0) or (len(second_solutions_score.keys()) > 0) or \
(len(first_positions_score.keys()) > 0) or (len(second_positions_score.keys()) > 0) or \
(len(projects_score.keys()) > 0):
key = "doris:user_portrait:tag3:device_id:" + str(cl_id)
redis_client.set(key, json.dumps(res))
redis_client.expire(key, 60 * 60 * 24 * 30)
......@@ -88,8 +91,6 @@ def update_tag3_user_portrait(cl_id, redis_client):
"second_positions": list(second_positions_score.keys()),
"projects": list(projects_score.keys())
}
key2 = "doris:user_portrait:tag3:increment_update:device_id:" + str(cl_id)
redis_client.delete(key2)
redis_client.set(key2, json.dumps(res2))
redis_client.expire(key2, 60 * 60 * 24 * 30)
......@@ -97,6 +98,9 @@ def update_tag3_user_portrait(cl_id, redis_client):
",".join(first_demands_score.keys()), ",".join(second_demands_score.keys()),
",".join(first_positions_score.keys()), ",".join(second_positions_score.keys()),
",".join(projects_score.keys()))
else:
redis_client.delete(key)
redis_client.delete(key2)
return cl_id
......@@ -124,10 +128,8 @@ def consume_kafka():
spark.sparkContext.setLogLevel("WARN")
spark.sparkContext.addPyFile("/srv/apps/ffm-baseline_git/eda/smart_rank/tool.py")
redis_client = redis.StrictRedis.from_url("redis://:ReDis!GmTx*0aN9@172.16.40.173:6379")
device_ids_lst_rdd = spark.sparkContext.parallelize(device_ids_lst, numSlices=1000)
result = device_ids_lst_rdd.repartition(100).map(lambda x: update_tag3_user_portrait(x, redis_client))
result = device_ids_lst_rdd.repartition(100).map(lambda x: update_tag3_user_portrait(x))
# result.foreach(print)
result.collect()
spark.stop()
......@@ -136,10 +138,6 @@ def consume_kafka():
if __name__ == "__main__":
# cl_id = "866017030837899"
# res = update_tag3_user_portrait(cl_id)
# print(res)
start = datetime.datetime.now()
consume_kafka()
end = datetime.datetime.now()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment