Commit e418fb7a authored by 高雅喆's avatar 高雅喆

update

parent 3e0282f2
......@@ -91,7 +91,7 @@ def tag_list2dict(lst, size):
return result[:size]
def get_user_tag_score(cl_id, all_log_df, stat_date, size=10):
def get_user_tag_score(cl_id, all_log_df, gm_kv_cli, size=10):
try:
db_jerry_test = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC',
db='jerry_test', charset='utf8')
......@@ -110,17 +110,16 @@ def get_user_tag_score(cl_id, all_log_df, stat_date, size=10):
finally_score_lst = finally_score[["tag_id","tag_score"]].to_dict('record')
tag_id_list = tag_list2dict(finally_score_lst, size)
# 写gmkv
gm_kv_cli = redis.Redis(host="172.16.40.135", port=5379, db=2, socket_timeout=2000)
cl_id_portrait_key = "user:portrait_tags:cl_id:" + str(cl_id)
tag_id_list_json = json.dumps(tag_id_list)
gm_kv_cli.set(cl_id_portrait_key, tag_id_list_json)
# 写tidb
replace_sql = """replace into user_portrait_tags (stat_date, cl_id, tag_list) values("{stat_date}","{cl_id}","{tag_list}")"""\
.format(stat_date=stat_date, cl_id=cl_id, tag_list=tag_id_list)
cur_jerry_test.execute(replace_sql)
db_jerry_test.commit()
db_jerry_test.close()
return "sucess"
# replace_sql = """replace into user_portrait_tags (stat_date, cl_id, tag_list) values("{stat_date}","{cl_id}","{tag_list}")"""\
# .format(stat_date=stat_date, cl_id=cl_id, tag_list=tag_id_list)
# cur_jerry_test.execute(replace_sql)
# db_jerry_test.commit()
# db_jerry_test.close()
# return "sucess"
except Exception as e:
return 'pass'
......@@ -155,9 +154,9 @@ if __name__ == '__main__':
spark = SparkSession.builder.config(conf=sparkConf).enableHiveSupport().getOrCreate()
spark.sparkContext.setLogLevel("WARN")
gm_kv_cli = redis.Redis(host="172.16.40.135", port=5379, db=2, socket_timeout=2000)
device_ids_lst_rdd = spark.sparkContext.parallelize(device_ids_lst)
result = device_ids_lst_rdd.repartition(100).map(lambda x: get_user_tag_score(x, all_log_df, stat_date))
result = device_ids_lst_rdd.repartition(100).map(lambda x: get_user_tag_score(x, all_log_df, gm_kv_cli))
result.collect()
# result_last = result_rename.withColumn("stat_date", lit(stat_date))
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment