Commit e703252e authored by 高雅喆's avatar 高雅喆

test

parent 7445df8a
...@@ -41,93 +41,90 @@ def get_hot_search_words_tag(): ...@@ -41,93 +41,90 @@ def get_hot_search_words_tag():
def get_user_service_portrait(cl_id, all_word_tags, all_tag_tag_type, all_3tag_2tag, all_tags_name, size=None):
    """Build a service-tag portrait for one device and persist it.

    Scores the user's event log per level-2 tag (level-3 tags are rolled up
    to their level-2 parent), writes the top ``size`` tags to Redis twice
    (keyed by tag id and by tag name), and mirrors the list into the TiDB
    table ``user_service_portrait_tags``.

    Parameters:
        cl_id: device (client) id whose log is scored.
        all_word_tags: mapping consumed by ``get_user_log`` to resolve search
            words to tags -- TODO confirm exact shape against ``get_user_log``.
        all_tag_tag_type: dict mapping tag id -> tag type ('2' or '3').
        all_3tag_2tag: mapping from level-3 tag ids to level-2 parents,
            consumed by ``get_tag2_from_tag3``.
        all_tags_name: dict mapping tag id -> display name.
        size: optional cap on how many top-scored tags are persisted
            (``None`` keeps them all).

    Returns:
        The processed ``cl_id`` (handy when mapped over an RDD of ids).
    """
    db_jerry_test = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC',
                                    db='jerry_test', charset='utf8')
    cur_jerry_test = db_jerry_test.cursor()
    # Close the connection on every path -- the original only closed it when
    # the user log was non-empty, leaking connections for inactive users and
    # on any exception.
    try:
        user_df_service = get_user_log(cl_id, all_word_tags)
        # Enrich the log with derived columns (days_diff_now, tag_type, tag2).
        if not user_df_service.empty:
            user_df_service["days_diff_now"] = round((int(time.time()) - user_df_service["time"].astype(float)) / (24 * 60 * 60))
            user_df_service["tag_type"] = user_df_service.apply(lambda x: all_tag_tag_type.get(x["tag_id"]), axis=1)
            # Only level-2 and level-3 tags participate in the portrait.
            user_df_service = user_df_service[user_df_service['tag_type'].isin(['2', '3'])]
            user_log_df_tag2_list = user_df_service[user_df_service['tag_type'] == '2']['tag_id'].unique().tolist()
            # Roll level-3 tags up to their level-2 parent; level-2 tags pass through.
            user_df_service["tag2"] = user_df_service.apply(
                lambda x: get_tag2_from_tag3(x.tag_id, all_3tag_2tag, user_log_df_tag2_list)
                if x.tag_type == '3' else x.tag_id, axis=1)
            user_df_service["tag2_type"] = user_df_service.apply(lambda x: all_tag_tag_type.get(x["tag2"]), axis=1)
            # Score each event: strength bucket (score_type) picks the decay
            # function; the score is diluted by how many tags the same action
            # touched at the same timestamp.
            user_df_service["tag_score"] = user_df_service.apply(
                lambda x: compute_henqiang(x.days_diff_now, exponential=0) / get_action_tag_count(user_df_service, x.time) if x.score_type == "henqiang" else (
                    compute_jiaoqiang(x.days_diff_now, exponential=0) / get_action_tag_count(user_df_service, x.time) if x.score_type == "jiaoqiang" else (
                        compute_ai_scan(x.days_diff_now, exponential=0) / get_action_tag_count(user_df_service, x.time) if x.score_type == "ai_scan" else (
                            compute_ruoyixiang(x.days_diff_now, exponential=0) / get_action_tag_count(user_df_service, x.time) if x.score_type == "ruoyixiang" else
                            compute_validate(x.days_diff_now, exponential=0) / get_action_tag_count(user_df_service, x.time)))), axis=1)
            # Sum scores per level-2 tag and normalise into percentage weights.
            tag_score_sum = user_df_service.groupby(by=["tag2", "tag2_type"]).agg(
                {'tag_score': 'sum', 'cl_id': 'first', 'action': 'first'}).reset_index().sort_values(by=["tag_score"],
                                                                                                     ascending=False)
            tag_score_sum['weight'] = 100 * tag_score_sum['tag_score'] / tag_score_sum['tag_score'].sum()
            # pay_type: 3 = validated order, 2 = paid (alipay callback), 1 = other.
            tag_score_sum["pay_type"] = tag_score_sum.apply(
                lambda x: 3 if x.action == "api/order/validate" else (
                    2 if x.action == "api/settlement/alipay_callback" else 1
                ), axis=1
            )
            gmkv_tag_score_sum = tag_score_sum[["tag2", "tag_score", "weight"]][:size].to_dict('record')
            gmkv_tag_score2_sum = tag_score_sum[["tag2", "tag_score"]][:size].to_dict('record')
            gmkv_tag_score2_sum_dict = {i["tag2"]: i["tag_score"] for i in gmkv_tag_score2_sum}

            # Write the portrait (tag id -> score) to Redis.
            redis_client = redis.StrictRedis.from_url('redis://:ReDis!GmTx*0aN9@172.16.40.173:6379')
            cl_id_portrait_key2 = "user:service_portrait_tags2:cl_id:" + str(cl_id)
            # Fewer than 5 portrait tags: pad with cold-start hot-search tags.
            if len(gmkv_tag_score2_sum_dict) < 5:
                hot_search_wordskey2 = "user:service_coldstart_tags2"
                hot_search_words_score = redis_client.hgetall(hot_search_wordskey2)
                for tag_id in hot_search_words_score:
                    # NOTE(review): portrait keys come from the DataFrame while
                    # hgetall keys are raw Redis strings/bytes -- the int() cast
                    # assumes numeric tag ids; confirm key types match.
                    if int(tag_id) in gmkv_tag_score2_sum_dict:
                        continue
                    else:
                        # BUG FIX: the original referenced the undefined name
                        # `hot_search_words` (NameError on this path); the
                        # scores come from the hash fetched just above.
                        gmkv_tag_score2_sum_dict.update({tag_id: hot_search_words_score[tag_id]})
                        if len(gmkv_tag_score2_sum_dict) > 4:
                            break
            redis_client.delete(cl_id_portrait_key2)
            redis_client.hmset(cl_id_portrait_key2, gmkv_tag_score2_sum_dict)
            redis_client.expire(cl_id_portrait_key2, time=30 * 24 * 60 * 60)

            # Write the same portrait keyed by tag display name.
            cl_id_portrait_key3 = "user:service_portrait_tags3:cl_id:" + str(cl_id)
            gmkv_tag_score3_sum_dict = {all_tags_name[i]: gmkv_tag_score2_sum_dict[i] for i in gmkv_tag_score2_sum_dict}
            redis_client.delete(cl_id_portrait_key3)
            redis_client.hmset(cl_id_portrait_key3, gmkv_tag_score3_sum_dict)
            redis_client.expire(cl_id_portrait_key3, time=30 * 24 * 60 * 60)

            # Mirror the portrait into TiDB so it stays in sync with Redis.
            # NOTE(review): SQL built via str.format is injection-prone; safe
            # only while cl_id is an internal device id -- parameterize with
            # cursor.execute(sql, params) if input can become untrusted.
            stat_date = datetime.datetime.today().strftime('%Y-%m-%d')
            replace_sql = """replace into user_service_portrait_tags (stat_date, cl_id, tag_list) values("{stat_date}","{cl_id}","{tag_list}")"""\
                .format(stat_date=stat_date, cl_id=cl_id, tag_list=gmkv_tag_score_sum)
            cur_jerry_test.execute(replace_sql)
            db_jerry_test.commit()
    finally:
        cur_jerry_test.close()
        db_jerry_test.close()
    return cl_id
if __name__ == '__main__': if __name__ == '__main__':
...@@ -210,4 +207,4 @@ if __name__ == '__main__': ...@@ -210,4 +207,4 @@ if __name__ == '__main__':
spark.sparkContext.addPyFile("/srv/apps/ffm-baseline_git/eda/smart_rank/tool.py") spark.sparkContext.addPyFile("/srv/apps/ffm-baseline_git/eda/smart_rank/tool.py")
device_ids_lst_rdd = spark.sparkContext.parallelize(device_ids_lst, numSlices=1000) device_ids_lst_rdd = spark.sparkContext.parallelize(device_ids_lst, numSlices=1000)
result = device_ids_lst_rdd.repartition(100).map(lambda x: get_user_service_portrait(x, all_word_tags, all_tag_tag_type, all_3tag_2tag, all_tags_name)) result = device_ids_lst_rdd.repartition(100).map(lambda x: get_user_service_portrait(x, all_word_tags, all_tag_tag_type, all_3tag_2tag, all_tags_name))
result.collect() result.foreach(print)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment