Commit e8df6b73 authored by 高雅喆's avatar 高雅喆

update

parent 2c43c3d4
...@@ -52,79 +52,80 @@ def get_user_service_portrait(cl_id, all_word_tags, all_tag_tag_type, all_3tag_2 ...@@ -52,79 +52,80 @@ def get_user_service_portrait(cl_id, all_word_tags, all_tag_tag_type, all_3tag_2
user_df_service["days_diff_now"] = round((int(time.time()) - user_df_service["time"].astype(float)) / (24 * 60 * 60)) user_df_service["days_diff_now"] = round((int(time.time()) - user_df_service["time"].astype(float)) / (24 * 60 * 60))
user_df_service["tag_type"] = user_df_service.apply(lambda x: all_tag_tag_type.get(x["tag_id"]), axis=1) user_df_service["tag_type"] = user_df_service.apply(lambda x: all_tag_tag_type.get(x["tag_id"]), axis=1)
user_df_service = user_df_service[user_df_service['tag_type'].isin(['2','3'])] user_df_service = user_df_service[user_df_service['tag_type'].isin(['2','3'])]
user_log_df_tag2_list = user_df_service[user_df_service['tag_type'] == '2']['tag_id'].unique().tolist() if not user_df_service.empty:
user_df_service["tag2"] = user_df_service.apply(lambda x: user_log_df_tag2_list = user_df_service[user_df_service['tag_type'] == '2']['tag_id'].unique().tolist()
get_tag2_from_tag3(x.tag_id, all_3tag_2tag, user_log_df_tag2_list) user_df_service["tag2"] = user_df_service.apply(lambda x:
if x.tag_type == '3' else x.tag_id, axis=1) get_tag2_from_tag3(x.tag_id, all_3tag_2tag, user_log_df_tag2_list)
user_df_service["tag2_type"] = user_df_service.apply(lambda x: all_tag_tag_type.get(x["tag2"]), axis=1) if x.tag_type == '3' else x.tag_id, axis=1)
# 算分及比例 user_df_service["tag2_type"] = user_df_service.apply(lambda x: all_tag_tag_type.get(x["tag2"]), axis=1)
user_df_service["tag_score"] = user_df_service.apply( # 算分及比例
lambda x: compute_henqiang(x.days_diff_now, exponential=0)/get_action_tag_count(user_df_service, x.time) if x.score_type == "henqiang" else ( user_df_service["tag_score"] = user_df_service.apply(
compute_jiaoqiang(x.days_diff_now, exponential=0)/get_action_tag_count(user_df_service, x.time) if x.score_type == "jiaoqiang" else ( lambda x: compute_henqiang(x.days_diff_now, exponential=0)/get_action_tag_count(user_df_service, x.time) if x.score_type == "henqiang" else (
compute_ai_scan(x.days_diff_now, exponential=0)/get_action_tag_count(user_df_service, x.time) if x.score_type == "ai_scan" else ( compute_jiaoqiang(x.days_diff_now, exponential=0)/get_action_tag_count(user_df_service, x.time) if x.score_type == "jiaoqiang" else (
compute_ruoyixiang(x.days_diff_now, exponential=0)/get_action_tag_count(user_df_service, x.time) if x.score_type == "ruoyixiang" else compute_ai_scan(x.days_diff_now, exponential=0)/get_action_tag_count(user_df_service, x.time) if x.score_type == "ai_scan" else (
compute_validate(x.days_diff_now, exponential=0)/get_action_tag_count(user_df_service, x.time)))), axis=1) compute_ruoyixiang(x.days_diff_now, exponential=0)/get_action_tag_count(user_df_service, x.time) if x.score_type == "ruoyixiang" else
tag_score_sum = user_df_service.groupby(by=["tag2", "tag2_type"]).agg( compute_validate(x.days_diff_now, exponential=0)/get_action_tag_count(user_df_service, x.time)))), axis=1)
{'tag_score': 'sum', 'cl_id': 'first', 'action': 'first'}).reset_index().sort_values(by=["tag_score"], tag_score_sum = user_df_service.groupby(by=["tag2", "tag2_type"]).agg(
ascending=False) {'tag_score': 'sum', 'cl_id': 'first', 'action': 'first'}).reset_index().sort_values(by=["tag_score"],
tag_score_sum['weight'] = 100 * tag_score_sum['tag_score'] / tag_score_sum['tag_score'].sum() ascending=False)
tag_score_sum["pay_type"] = tag_score_sum.apply( tag_score_sum['weight'] = 100 * tag_score_sum['tag_score'] / tag_score_sum['tag_score'].sum()
lambda x: 3 if x.action == "api/order/validate" else ( tag_score_sum["pay_type"] = tag_score_sum.apply(
2 if x.action == "api/settlement/alipay_callback" else 1 lambda x: 3 if x.action == "api/order/validate" else (
), axis=1 2 if x.action == "api/settlement/alipay_callback" else 1
) ), axis=1
gmkv_tag_score_sum = tag_score_sum[["tag2", "tag_score", "weight"]][:size].to_dict('record') )
gmkv_tag_score2_sum = tag_score_sum[["tag2", "tag_score"]][:size].to_dict('record') gmkv_tag_score_sum = tag_score_sum[["tag2", "tag_score", "weight"]][:size].to_dict('record')
gmkv_tag_score2_sum_dict = {i["tag2"]: i["tag_score"] for i in gmkv_tag_score2_sum} gmkv_tag_score2_sum = tag_score_sum[["tag2", "tag_score"]][:size].to_dict('record')
gmkv_tag_score2_sum_dict = {i["tag2"]: i["tag_score"] for i in gmkv_tag_score2_sum}
# 写redis
redis_client = redis.StrictRedis.from_url('redis://:ReDis!GmTx*0aN9@172.16.40.173:6379') # 写redis
cl_id_portrait_key2 = "user:service_portrait_tags2:cl_id:" + str(cl_id) redis_client = redis.StrictRedis.from_url('redis://:ReDis!GmTx*0aN9@172.16.40.173:6379')
# 如果画像的tag个数小于5,则补充热搜词 cl_id_portrait_key2 = "user:service_portrait_tags2:cl_id:" + str(cl_id)
if len(gmkv_tag_score2_sum_dict) < 5: # 如果画像的tag个数小于5,则补充热搜词
hot_search_wordskey2 = "user:service_coldstart_tags2" if len(gmkv_tag_score2_sum_dict) < 5:
hot_search_words_score = redis_client.hgetall(hot_search_wordskey2) hot_search_wordskey2 = "user:service_coldstart_tags2"
for tag_id in hot_search_words_score: hot_search_words_score = redis_client.hgetall(hot_search_wordskey2)
if int(tag_id) in gmkv_tag_score2_sum_dict: for tag_id in hot_search_words_score:
continue if int(tag_id) in gmkv_tag_score2_sum_dict:
else: continue
gmkv_tag_score2_sum_dict.update({int(tag_id): float(hot_search_words_score[tag_id])}) else:
if len(gmkv_tag_score2_sum_dict) > 4: gmkv_tag_score2_sum_dict.update({int(tag_id): float(hot_search_words_score[tag_id])})
break if len(gmkv_tag_score2_sum_dict) > 4:
redis_client.delete(cl_id_portrait_key2) break
redis_client.hmset(cl_id_portrait_key2, gmkv_tag_score2_sum_dict) redis_client.delete(cl_id_portrait_key2)
redis_client.expire(cl_id_portrait_key2, time=30 * 24 * 60 * 60) redis_client.hmset(cl_id_portrait_key2, gmkv_tag_score2_sum_dict)
redis_client.expire(cl_id_portrait_key2, time=30 * 24 * 60 * 60)
# 标签name写redis
cl_id_portrait_key3 = "user:service_portrait_tags3:cl_id:" + str(cl_id) # 标签name写redis
gmkv_tag_score3_sum_dict = {all_tags_name[i]: gmkv_tag_score2_sum_dict[i] for i in gmkv_tag_score2_sum_dict} cl_id_portrait_key3 = "user:service_portrait_tags3:cl_id:" + str(cl_id)
redis_client.delete(cl_id_portrait_key3) gmkv_tag_score3_sum_dict = {all_tags_name[i]: gmkv_tag_score2_sum_dict[i] for i in gmkv_tag_score2_sum_dict}
redis_client.hmset(cl_id_portrait_key3, gmkv_tag_score3_sum_dict) redis_client.delete(cl_id_portrait_key3)
redis_client.expire(cl_id_portrait_key3, time=30 * 24 * 60 * 60) redis_client.hmset(cl_id_portrait_key3, gmkv_tag_score3_sum_dict)
redis_client.expire(cl_id_portrait_key3, time=30 * 24 * 60 * 60)
# 写tidb,redis同步
stat_date = datetime.datetime.today().strftime('%Y-%m-%d') # 写tidb,redis同步
replace_sql = """replace into user_service_portrait_tags (stat_date, cl_id, tag_list) values("{stat_date}","{cl_id}","{tag_list}")"""\ stat_date = datetime.datetime.today().strftime('%Y-%m-%d')
.format(stat_date=stat_date, cl_id=cl_id, tag_list=gmkv_tag_score_sum) replace_sql = """replace into user_service_portrait_tags (stat_date, cl_id, tag_list) values("{stat_date}","{cl_id}","{tag_list}")"""\
cur_jerry_test.execute(replace_sql) .format(stat_date=stat_date, cl_id=cl_id, tag_list=gmkv_tag_score_sum)
db_jerry_test.commit() cur_jerry_test.execute(replace_sql)
cur_jerry_test.close() db_jerry_test.commit()
db_jerry_test.close() cur_jerry_test.close()
# # 写tidb 用户分层营销 db_jerry_test.close()
# # todo 不准确,因为聚合后,一个标签会有多个来源,即多个pay_type # # 写tidb 用户分层营销
# score_result = tag_score_sum[["tag2", "cl_id", "tag_score", "weight", "pay_type"]] # # todo 不准确,因为聚合后,一个标签会有多个来源,即多个pay_type
# score_result.rename(columns={"tag2": "tag_id", "cl_id": "device_id", "tag_score": "score"}, inplace=True) # score_result = tag_score_sum[["tag2", "cl_id", "tag_score", "weight", "pay_type"]]
# delete_sql = "delete from api_market_personas where device_id='{}'".format(cl_id) # score_result.rename(columns={"tag2": "tag_id", "cl_id": "device_id", "tag_score": "score"}, inplace=True)
# cur_jerry_test.execute(delete_sql) # delete_sql = "delete from api_market_personas where device_id='{}'".format(cl_id)
# db_jerry_test.commit() # cur_jerry_test.execute(delete_sql)
# # db_jerry_test.commit()
# for index, row in score_result.iterrows(): #
# insert_sql = "insert into api_market_personas values (null, {}, '{}', {}, {}, {})".format( # for index, row in score_result.iterrows():
# row['tag_id'], row['device_id'], row['score'], row['weight'], row['pay_type']) # insert_sql = "insert into api_market_personas values (null, {}, '{}', {}, {}, {})".format(
# cur_jerry_test.execute(insert_sql) # row['tag_id'], row['device_id'], row['score'], row['weight'], row['pay_type'])
# db_jerry_test.commit() # cur_jerry_test.execute(insert_sql)
# db_jerry_test.close() # db_jerry_test.commit()
return cl_id # db_jerry_test.close()
return cl_id
if __name__ == '__main__': if __name__ == '__main__':
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment