Commit 824c467f authored by 高雅喆

update

parent c1c3c4ef
......@@ -58,7 +58,11 @@ def tag_list2dict(lst, size):
return result[:size]
def get_user_tag_score(cl_id, all_log_df, size=10):
def get_user_tag_score(cl_id, all_log_df, stat_date, size=10):
db_jerry_test = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC',
db='jerry_test', charset='utf8')
cur_jerry_test = db_jerry_test.cursor()
user_log_df = all_log_df.loc[all_log_df['cl_id'] == cl_id]
if not user_log_df.empty:
user_log_df["tag_id"] = np.where(user_log_df["action"] == "do_search",user_log_df["tag_referrer"],user_log_df["tag_id"])
......@@ -71,7 +75,14 @@ def get_user_tag_score(cl_id, all_log_df, size=10):
finally_score.drop_duplicates(subset="tag_id", inplace=True)
finally_score_lst = finally_score[["tag_id","tag_score"]].to_dict('record')
tag_id_list = tag_list2dict(finally_score_lst, size)
return cl_id, tag_id_list
replace_sql = """replace into user_portrait_tags (stat_date, cl_id, tag_list) values("{stat_date}","{cl_id}","{tag_list}")"""\
.format(stat_date=stat_date, cl_id=cl_id, tag_list=tag_id_list)
cur_jerry_test.execute(replace_sql)
db_jerry_test.commit()
db_jerry_test.close()
return "sucess"
def send_email(app,id,e):
......@@ -128,6 +139,7 @@ if __name__ == '__main__':
db_jerry_test.close()
all_log_df = pd.DataFrame(list(all_log))
all_log_df.columns = ["time", "cl_id", "score_type","tag_id","tag_referrer","action"]
stat_date = datetime.datetime.today().strftime('%Y-%m-%d')
# rdd
sparkConf = SparkConf().set("spark.hive.mapred.supports.subdirectories", "true") \
.set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive", "true") \
......@@ -142,18 +154,9 @@ if __name__ == '__main__':
device_ids_lst_rdd = spark.sparkContext.parallelize(device_ids_lst)
gm_kv_cli = redis.Redis(host="172.16.40.135", port=5379, db=6, socket_timeout=2000)
result = device_ids_lst_rdd.repartition(100).map(lambda x: get_user_tag_score(x, all_log_df))
a = result.collect()
print(a)
result = device_ids_lst_rdd.repartition(100).map(lambda x: get_user_tag_score(x, all_log_df, stat_date))
# print(result.take(1))
stat_date = datetime.datetime.today().strftime('%Y-%m-%d')
for i in a:
insert_sql = """insert into user_portrait_tags values("{stat_date}","{cl_id}","{tag_list}")"""\
.format(stat_date=stat_date, cl_id=i[0], tag_list=i[1])
print(insert_sql)
cur_jerry_test.execute(insert_sql)
db_jerry_test.commit()
db_jerry_test.close()
# result_last = result_rename.withColumn("stat_date", lit(stat_date))
# result_last.show()
# df = result_last.select("stat_date", "cl_id", concat_ws(',', 'tag_list').alias("tag_list"))
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment