import multiprocessing
import os
import sys
import time
from collections import defaultdict

sys.path.append(os.path.realpath("."))

from gensim.models import Word2Vec, word2vec

from utils.date import get_ndays_before_no_minus
from utils.files import DATA_PATH, MODEL_PATH
from utils.spark import get_spark

tractate_click_ids_model_path = os.path.join(MODEL_PATH, "tractate_click_ids_item2vec_model")

# Load a previously trained model if one exists. On the very first run the file
# is missing, so fall back to an empty vocabulary instead of leaving these
# names undefined (which would crash `__main__` later with a NameError).
try:
    TRACTATE_CLICK_IDS_MODEL = word2vec.Word2Vec.load(tractate_click_ids_model_path)
    TRACTATE_CLICK_IDS = set(TRACTATE_CLICK_IDS_MODEL.wv.vocab.keys())  # gensim 3.x API
except Exception as e:
    print(e)
    TRACTATE_CLICK_IDS_MODEL = None
    TRACTATE_CLICK_IDS = set()


def get_tractate_click_data(spark, start, end):
    """Pull distinct tractate (user post) page views between `start` and `end`,
    keeping only genuinely active devices and filtering out spam devices,
    staff, doctor, puppet, and model accounts."""
    # Double backslash so Hive's string literal unescapes to the regex ^\d+$,
    # i.e. business_id must be purely numeric.
    reg = r"""^\\d+$"""
    sql = """
    SELECT DISTINCT t1.partition_date, t1.cl_id, cast(t1.business_id as int) card_id, t1.app_session_id
    FROM
        (select partition_date, cl_id, business_id, action, page_name, page_stay, app_session_id
        from online.bl_hdfs_maidian_updates
        where action = 'page_view'
            AND partition_date BETWEEN '{}' AND '{}'
            AND page_name = 'user_post_detail'
            AND page_stay >= 4
            AND cl_id is not null AND cl_id != ''
            AND business_id is not null AND business_id != ''
            AND business_id rlike '{}'
        ) AS t1
    JOIN
        (select partition_date, active_type, first_channel_source_type, device_id
        from online.ml_device_day_active_status
        where partition_date BETWEEN '{}' AND '{}'
            AND active_type IN ('1', '2', '4')
            AND first_channel_source_type not IN ('yqxiu1','yqxiu2','yqxiu3','yqxiu4','yqxiu5','mxyc1','mxyc2','mxyc3'
                ,'wanpu','jinshan','jx','maimai','zhuoyi','huatian','suopingjingling','mocha','mizhe','meika','lamabang'
                ,'js-az1','js-az2','js-az3','js-az4','js-az5','jfq-az1','jfq-az2','jfq-az3','jfq-az4','jfq-az5','toufang1'
                ,'toufang2','toufang3','toufang4','toufang5','toufang6','TF-toufang1','TF-toufang2','TF-toufang3','TF-toufang4'
                ,'TF-toufang5','tf-toufang1','tf-toufang2','tf-toufang3','tf-toufang4','tf-toufang5','benzhan','promotion_aso100'
                ,'promotion_qianka','promotion_xiaoyu','promotion_dianru','promotion_malioaso','promotion_malioaso-shequ'
                ,'promotion_shike','promotion_julang_jl03','promotion_zuimei')
            AND first_channel_source_type not LIKE 'promotion\\_jf\\_%'
        ) as t2
        ON t1.cl_id = t2.device_id AND t1.partition_date = t2.partition_date
    LEFT JOIN
        (SELECT DISTINCT device_id
        FROM ml.ml_d_ct_dv_devicespam_d  -- exclude click-farm / cheating devices (filtered from view and exposure events)
        WHERE partition_day = '{}'

        UNION ALL
        SELECT DISTINCT device_id
        FROM dim.dim_device_user_staff   -- exclude intranet (staff) users
        ) spam_pv
        on spam_pv.device_id = t1.cl_id
    LEFT JOIN
        (SELECT partition_date, device_id
        FROM
            (-- first device id each user_id was active on that day
            SELECT user_id, partition_date,
                if(size(device_list) > 0, device_list[0], '') AS device_id
            FROM online.ml_user_updates
            WHERE partition_date >= '{}' AND partition_date < '{}'
            ) t1
        JOIN
            (-- doctor accounts
            SELECT distinct user_id
            FROM online.tl_hdfs_doctor_view
            WHERE partition_date = '{}'

            -- puppet accounts / model users
            UNION ALL
            SELECT user_id
            FROM ml.ml_c_ct_ui_user_dimen_d
            WHERE partition_day = '{}'
                AND (is_puppet = 'true' or is_classifyuser = 'true')

            UNION ALL
            -- company intranet users
            select distinct user_id
            from dim.dim_device_user_staff

            UNION ALL
            -- devices that have logged in as a doctor
            SELECT distinct t1.user_id
            FROM
                (SELECT user_id, v.device_id as device_id
                FROM online.ml_user_history_detail
                    LATERAL VIEW EXPLODE(device_history_list) v AS device_id
                WHERE partition_date = '{}'
                ) t1
            JOIN
                (SELECT device_id
                FROM online.ml_device_history_detail
                WHERE partition_date = '{}' AND is_login_doctor = '1'
                ) t2
                ON t1.device_id = t2.device_id
            ) t2
            on t1.user_id = t2.user_id
        group by partition_date, device_id
        ) dev
        on t1.partition_date = dev.partition_date and t1.cl_id = dev.device_id
    WHERE (spam_pv.device_id IS NULL or spam_pv.device_id = '')
        and (dev.device_id is null or dev.device_id = '')
    """.format(start, end, reg, start, end, end, start, end, end, end, end, end)
    df = spark.sql(sql)
    return df


def get_device_click_tractate_ids_dict(click_df):
    """Group clicked card ids by app session, newest partition first,
    de-duplicated while preserving click order within each session."""
    res = defaultdict(list)
    rows = click_df.orderBy("partition_date", ascending=False).collect()
    for row in rows:
        card_id = row["card_id"]
        session_id = row["app_session_id"]
        if card_id not in res[session_id]:
            res[session_id].append(card_id)
    return res


def save_clicked_tractate_ids_item2vec():
    """Train an item2vec model: each session's click sequence is a "sentence"
    and each card id is a "word"."""
    click_ids = []
    with open(os.path.join(DATA_PATH, "click_tractate_ids.csv"), "r") as f:
        for line in f:
            # Each line is "app_session_id|id1,id2,..."; only the ids are needed.
            tmp = line.split("|")
            ids = tmp[1].rstrip("\n").split(",")
            click_ids.append(ids)
    # hs=0 selects negative sampling; `iter` is the gensim 3.x name for epochs
    # (renamed to `epochs` in gensim 4.0).
    model = Word2Vec(click_ids, hs=0, min_count=3, workers=multiprocessing.cpu_count(), iter=10)
    print(model)
    print(len(click_ids))
    model.save(tractate_click_ids_model_path)
    return model


if __name__ == "__main__":
    begin_time = time.time()
    spark = get_spark("tractate_click_ids")
    click_df = get_tractate_click_data(spark, get_ndays_before_no_minus(180), get_ndays_before_no_minus(1))
    click_df.show(5, False)
    res_dict = get_device_click_tractate_ids_dict(click_df)
    with open(os.path.join(DATA_PATH, "click_tractate_ids.csv"), "w") as f:
        for (k, v) in res_dict.items():
            if v:
                f.write("{}|{}\n".format(k, ",".join([str(x) for x in v])))
    print("write data done.")
    # Use the freshly trained model rather than the one loaded at import time,
    # which is stale (or None on the first run).
    model = save_clicked_tractate_ids_item2vec()
    for tid in ["84375", "148764", "368399"]:
        if tid in model.wv.vocab:  # skip ids pruned by min_count
            print(model.wv.most_similar(tid, topn=5))
    print("total cost: {:.2f}mins".format((time.time() - begin_time) / 60))

# spark-submit --master yarn --deploy-mode client --queue root.strategy --driver-memory 16g --executor-memory 1g --executor-cores 1 --num-executors 70 --conf spark.default.parallelism=100 --conf spark.storage.memoryFraction=0.5 --conf spark.shuffle.memoryFraction=0.3 --conf spark.locality.wait=0 --jars /srv/apps/tispark-core-2.1-SNAPSHOT-jar-with-dependencies.jar,/srv/apps/spark-connector_2.11-1.9.0-rc2.jar,/srv/apps/mysql-connector-java-5.1.38.jar /srv/apps/strategy_embedding/word_vector/tractate.py
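

# The helper below is a hedged sketch, not part of the original job: it shows
# how a downstream recommender might query the saved item2vec model at serving
# time, guarding against card ids that were pruned by min_count or never seen.
# The function name `recommend_similar_tractates` and the default topn are
# assumptions; only `most_similar` and the module-level globals above are real.
def recommend_similar_tractates(card_id, topn=5):
    """Return up to `topn` (card_id, cosine_similarity) pairs, or [] when no
    model is loaded or the id is out of vocabulary."""
    key = str(card_id)  # Word2Vec vocabulary keys are strings
    if TRACTATE_CLICK_IDS_MODEL is None or key not in TRACTATE_CLICK_IDS:
        return []
    return TRACTATE_CLICK_IDS_MODEL.wv.most_similar(key, topn=topn)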