Commit 578d8211 authored by 高雅喆

Optimization: look up each user's behavior log inside get_user_tag_score instead of loading all logs into one DataFrame up front and shipping it to every Spark task.

parent 73ebb97e
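In effect, the commit swaps a broadcast pandas DataFrame for a per-task TiDB lookup: each Spark worker opens its own connection and selects only the rows belonging to the cl_id it is scoring. A minimal sketch of that access pattern (fetch_user_log and conn_params are illustrative names, not part of this commit):

import pymysql
import pandas as pd

def fetch_user_log(cl_id, conn_params):
    # Hypothetical helper mirroring the new per-user query in get_user_tag_score:
    # one short-lived connection per task, reading only this device's rows.
    conn = pymysql.connect(**conn_params)
    try:
        sql = ("select time,cl_id,score_type,tag_id,tag_referrer,action "
               "from user_new_tag_log where cl_id = %s")
        return pd.read_sql(sql, conn, params=[cl_id])
    finally:
        conn.close()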
@@ -97,13 +97,20 @@ def tag_list2dict(lst, size):
     return result[:size]
-def get_user_tag_score(cl_id, all_log_df, size=10):
+def get_user_tag_score(cl_id, size=10):
     try:
         db_jerry_test = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC',
                                         db='jerry_test', charset='utf8')
         cur_jerry_test = db_jerry_test.cursor()
-        user_log_df = all_log_df.loc[all_log_df['cl_id'] == cl_id]
+        # Fetch all of this user's behavior records
+        sql_user_log = "select time,cl_id,score_type,tag_id,tag_referrer,action from user_new_tag_log where cl_id ='{}'".format(cl_id)
+        cur_jerry_test.execute(sql_user_log)
+        user_log = cur_jerry_test.fetchall()
+        user_log_df = pd.DataFrame(list(user_log))
+        user_log_df.columns = ["time", "cl_id", "score_type", "tag_id", "tag_referrer", "action"]
         if not user_log_df.empty:
             user_log_df["tag_id"] = np.where(user_log_df["action"] == "do_search", user_log_df["tag_referrer"], user_log_df["tag_id"])
             user_log_df["days_diff_now"] = round((int(time.time()) - user_log_df["time"]) / (24*60*60))
@@ -122,12 +129,14 @@ def get_user_tag_score(cl_id, all_log_df, size=10):
             tag_id_list_json = json.dumps(tag_id_list)
             gm_kv_cli.set(cl_id_portrait_key, tag_id_list_json)
             # write to TiDB
+            stat_date = datetime.datetime.today().strftime('%Y-%m-%d')
             replace_sql = """replace into user_portrait_tags (stat_date, cl_id, tag_list) values("{stat_date}","{cl_id}","{tag_list}")""" \
                 .format(stat_date=stat_date, cl_id=cl_id, tag_list=tag_id_list)
             cur_jerry_test.execute(replace_sql)
             db_jerry_test.commit()
             db_jerry_test.close()
             return "success"
+        db_jerry_test.close()
     except Exception as e:
         return 'pass'
@@ -148,16 +157,8 @@ if __name__ == '__main__':
         # Fetch all users' behavior logs
         # sql_all_log = "select time,cl_id,score_type,tag_id,tag_referrer,action from user_new_tag_log"
-        # Fetch all behaviors of users active within the last 30 days
-        sql_all_log = "select time,cl_id,score_type,tag_id,tag_referrer,action from user_new_tag_log where cl_id in " \
-                      "(select distinct cl_id from user_new_tag_log " \
-                      "where time > UNIX_TIMESTAMP(DATE_SUB(NOW(), INTERVAL 30 day)))"
-        cur_jerry_test.execute(sql_all_log)
-        all_log = cur_jerry_test.fetchall()
-        db_jerry_test.close()
-        all_log_df = pd.DataFrame(list(all_log))
-        all_log_df.columns = ["time", "cl_id", "score_type", "tag_id", "tag_referrer", "action"]
-        stat_date = datetime.datetime.today().strftime('%Y-%m-%d')
         # rdd
         sparkConf = SparkConf().set("spark.hive.mapred.supports.subdirectories", "true") \
             .set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive", "true") \
@@ -170,17 +171,8 @@ if __name__ == '__main__':
         spark = SparkSession.builder.config(conf=sparkConf).enableHiveSupport().getOrCreate()
         spark.sparkContext.setLogLevel("WARN")
         device_ids_lst_rdd = spark.sparkContext.parallelize(device_ids_lst)
-        result = device_ids_lst_rdd.repartition(100).map(lambda x: get_user_tag_score(x, all_log_df))
+        result = device_ids_lst_rdd.repartition(100).map(lambda x: get_user_tag_score(x))
         result.collect()
-        # result_last = result_rename.withColumn("stat_date", lit(stat_date))
-        # result_last.show()
-        # df = result_last.select("stat_date", "cl_id", concat_ws(',', 'tag_list').alias("tag_list"))
-        # df.show()
-        # df.write.jdbc(
-        #     mode="overwrite",
-        #     url="jdbc:mysql://172.16.40.158:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&useSSL=true",
-        #     table="user_portrait_tags",
-        #     properties={"driver": 'com.mysql.jdbc.Driver'})
     except Exception as e:
         send_email("dist_update_user_portrait", "dist_update_user_portrait", "dist_update_user_portrait")
\ No newline at end of file
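One trade-off of the new shape: get_user_tag_score now opens and closes a MySQL connection for every cl_id in the RDD. If connection churn ever becomes the bottleneck, a common Spark alternative is one connection per partition via mapPartitions. A hedged sketch under that assumption (process_partition and get_user_tag_score_conn are hypothetical, not in this commit):

def process_partition(cl_ids):
    # Hypothetical refactor: share one connection across a whole partition
    # instead of reconnecting once per device id.
    db = pymysql.connect(host='172.16.40.158', port=4000, user='root',
                         passwd='3SYz54LS9#^9sBvC', db='jerry_test', charset='utf8')
    try:
        # get_user_tag_score_conn is an assumed variant of get_user_tag_score
        # that takes an open connection instead of creating its own.
        return [get_user_tag_score_conn(cl_id, db) for cl_id in cl_ids]
    finally:
        db.close()

result = device_ids_lst_rdd.repartition(100).mapPartitions(process_partition)
result.collect()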