# tractate.py (6.98 KB)
import multiprocessing
import os
import sys
import time
from collections import defaultdict

# Put the resolved current working directory on the import path; presumably this
# lets the project-local `utils` imports below resolve when the script is
# launched from the repository root -- TODO confirm.
sys.path.append(os.path.realpath("."))

from gensim.models import Word2Vec, word2vec
from utils.date import get_ndays_before_no_minus, get_ndays_before_with_format
from utils.files import DATA_PATH, MODEL_PATH
from utils.spark import get_spark

# Location of the persisted item2vec model; written by
# save_clicked_tractate_ids_item2vec() and read back here at import time.
tractate_click_ids_model_path = os.path.join(MODEL_PATH, "tractate_click_ids_item2vec_model")

# Fallbacks are bound first so both names always exist, even when no model has
# been trained yet (first run) or loading fails. Consumers can then test
# `TRACTATE_CLICK_IDS_MODEL is None` instead of hitting a NameError.
TRACTATE_CLICK_IDS_MODEL = None
TRACTATE_CLICK_IDS = set()

# Best-effort load: a failure is reported but must not prevent the module from
# being imported, because the training entry point in this file creates the model.
try:
    TRACTATE_CLICK_IDS_MODEL = word2vec.Word2Vec.load(tractate_click_ids_model_path)
    TRACTATE_CLICK_IDS = set(TRACTATE_CLICK_IDS_MODEL.wv.vocab.keys())
except Exception as e:
    print(e)


def get_tracate_click_data(spark, start, end):
    """Run the Spark SQL query that selects distinct post-detail page views.

    The query returns distinct (partition_date, cl_id, card_id, app_session_id)
    rows from ``online.bl_hdfs_maidian_updates`` where the event is a
    ``page_view`` of ``user_post_detail`` with ``page_stay >= 2``, a non-empty
    ``cl_id`` and an all-digit ``business_id`` (cast to int as ``card_id``).
    Rows are inner-joined to the per-day device activity table (restricted by
    ``active_type`` and an exclusion list of ``first_channel_source_type``
    values) and then anti-joined (LEFT JOIN ... IS NULL/empty) against two
    exclusion sets:

    * spam devices and staff devices;
    * the first listed device of users that are doctors, puppet/classify
      users, staff, or that have a device flagged ``is_login_doctor = '1'``.

    Args:
        spark: Object exposing ``sql(query)``; the query result is returned as is.
        start: Lower partition date bound, interpolated into the SQL as a quoted
            string (presumably ``yyyymmdd`` -- confirm against the caller).
        end: Upper partition date bound; also used as the snapshot date of the
            exclusion tables.

    Returns:
        Whatever ``spark.sql`` returns for the generated query.

    Note:
        ``start`` and ``end`` are substituted without escaping, so only pass
        trusted, internally generated values.
    """
    # Raw string on purpose: the doubled backslash must reach the SQL text unchanged.
    digits_only_pattern = r"""^\\d+$"""
    # NOTE(review): the first-device subquery filters with `>= start AND < end`,
    # so it excludes `end`, unlike the BETWEEN filters above it -- confirm intended.
    query_template = """
        SELECT DISTINCT t1.partition_date, t1.cl_id, cast(t1.business_id as int) card_id, t1.app_session_id
          FROM
          (select partition_date,cl_id,business_id,action,page_name,page_stay, app_session_id
          from online.bl_hdfs_maidian_updates
          where action = 'page_view'
            AND partition_date BETWEEN '{}' AND '{}'
            AND page_name='user_post_detail'
            AND page_stay>=2
            AND cl_id is not null
            AND cl_id != ''
            AND business_id is not null
            AND business_id != ''
            AND business_id rlike '{}'
            ) AS t1
          JOIN
          (select partition_date,active_type,first_channel_source_type,device_id
          from online.ml_device_day_active_status
          where partition_date BETWEEN '{}' AND '{}'
            AND active_type IN ('1', '2', '4')
            AND first_channel_source_type not IN ('yqxiu1','yqxiu2','yqxiu3','yqxiu4','yqxiu5','mxyc1','mxyc2','mxyc3'
                  ,'wanpu','jinshan','jx','maimai','zhuoyi','huatian','suopingjingling','mocha','mizhe','meika','lamabang'
                  ,'js-az1','js-az2','js-az3','js-az4','js-az5','jfq-az1','jfq-az2','jfq-az3','jfq-az4','jfq-az5','toufang1'
                  ,'toufang2','toufang3','toufang4','toufang5','toufang6','TF-toufang1','TF-toufang2','TF-toufang3','TF-toufang4'
                  ,'TF-toufang5','tf-toufang1','tf-toufang2','tf-toufang3','tf-toufang4','tf-toufang5','benzhan','promotion_aso100'
                  ,'promotion_qianka','promotion_xiaoyu','promotion_dianru','promotion_malioaso','promotion_malioaso-shequ'
                  ,'promotion_shike','promotion_julang_jl03','promotion_zuimei')
            AND first_channel_source_type not LIKE 'promotion\\_jf\\_%') as t2
          ON t1.cl_id = t2.device_id
          AND t1.partition_date = t2.partition_date
      LEFT JOIN
    (
        SELECT DISTINCT device_id
        FROM ml.ml_d_ct_dv_devicespam_d  --去除机构刷单设备,即作弊设备(浏览和曝光事件去除)
        WHERE partition_day='{}'

        UNION ALL
        SELECT DISTINCT device_id
        FROM dim.dim_device_user_staff   --去除内网用户
    )spam_pv
    on spam_pv.device_id=t1.cl_id
        LEFT JOIN
    (
        SELECT partition_date,device_id
        FROM
        (--找出user_id当天活跃的第一个设备id
            SELECT user_id,partition_date,
                    if(size(device_list) > 0, device_list [ 0 ], '') AS device_id
              FROM online.ml_user_updates
              WHERE partition_date>='{}' AND partition_date<'{}'
        )t1
        JOIN
        (  --医生账号
            SELECT distinct user_id
            FROM online.tl_hdfs_doctor_view
            WHERE partition_date = '{}'

            --马甲账号/模特用户
            UNION ALL
            SELECT user_id
            FROM ml.ml_c_ct_ui_user_dimen_d
            WHERE partition_day = '{}'
            AND (is_puppet = 'true' or is_classifyuser = 'true')

            UNION ALL
            --公司内网覆盖用户
            select distinct user_id
            from dim.dim_device_user_staff

            UNION ALL
            --登陆过医生设备
            SELECT distinct t1.user_id
            FROM
            (
                SELECT user_id, v.device_id as device_id
                FROM online.ml_user_history_detail
                    LATERAL VIEW EXPLODE(device_history_list) v AS device_id
                WHERE partition_date = '{}'
            )t1
            JOIN
            (
                SELECT device_id
                FROM online.ml_device_history_detail
                WHERE partition_date = '{}'
                AND is_login_doctor = '1'
            )t2
                ON t1.device_id = t2.device_id
        )t2
        on t1.user_id=t2.user_id
        group by partition_date,device_id
    )dev
    on t1.partition_date=dev.partition_date and t1.cl_id=dev.device_id
    WHERE (spam_pv.device_id IS NULL or spam_pv.device_id ='')
    and (dev.device_id is null or dev.device_id ='')
     """
    # Positional values, in the order the `{}` slots appear in the template.
    fill_values = (
        start, end, digits_only_pattern,  # click events: date window + numeric business_id
        start, end,  # active-device date window
        end,  # spam-device snapshot day
        start, end,  # user first-device date window
        end,  # doctor accounts snapshot
        end,  # puppet / classify users snapshot
        end,  # user device-history snapshot
        end,  # device history (doctor login) snapshot
    )
    return spark.sql(query_template.format(*fill_values))


def get_device_click_tractate_ids_dict(click_df):
    """Group clicked card ids by app session, de-duplicated, newest day first.

    Despite the function name, the result is keyed by ``app_session_id``, not
    by device id. All rows are collected to the driver after sorting by
    ``partition_date`` descending; the relative order of rows within the same
    ``partition_date`` is not defined by this function.

    Args:
        click_df: DataFrame-like object supporting
            ``orderBy("partition_date", ascending=False).collect()`` whose rows
            can be indexed by ``"card_id"`` and ``"app_session_id"``.

    Returns:
        ``defaultdict(list)`` mapping each session id to the list of distinct
        card ids in first-seen order.
    """
    res = defaultdict(list)
    # Per-session membership sets keep de-duplication O(1) per row; scanning the
    # result list instead becomes quadratic when one session holds many clicks.
    seen_by_session = defaultdict(set)
    rows = click_df.orderBy("partition_date", ascending=False).collect()
    for row in rows:
        card_id = row["card_id"]
        session_id = row["app_session_id"]
        seen = seen_by_session[session_id]
        if card_id not in seen:
            seen.add(card_id)
            res[session_id].append(card_id)
    return res


def save_clicked_tractate_ids_item2vec():
    """Train an item2vec (Word2Vec) model on per-session click lists and save it.

    Reads ``click_tractate_ids.csv`` under ``DATA_PATH``, where each line has the
    form ``<session_id>|<id>,<id>,...``. Every line becomes one training
    sequence of id strings. Blank lines and lines without a ``|`` separator or
    without any ids are skipped instead of raising ``IndexError``.

    The model is trained with ``hs=0``, ``min_count=3``, one worker per CPU and
    ``iter=10``, printed together with the number of sequences, and saved to
    ``tractate_click_ids_model_path``.

    NOTE(review): ``iter`` is the gensim 3.x spelling of the epoch count; it
    matches the ``wv.vocab`` usage elsewhere in this file -- revisit both if
    gensim is upgraded.

    Returns:
        The trained ``Word2Vec`` model.
    """
    click_ids = []
    with open(os.path.join(DATA_PATH, "click_tractate_ids.csv"), "r") as f:
        # Stream the file line by line rather than loading it all with readlines().
        for line in f:
            # The ids are whatever follows the LAST "|": ids are comma-joined
            # numbers and can never contain "|", while a session id might.
            _session_id, sep, id_field = line.rstrip("\n").rpartition("|")
            if not sep or not id_field:
                continue
            click_ids.append(id_field.split(","))
    model = Word2Vec(click_ids, hs=0, min_count=3, workers=multiprocessing.cpu_count(), iter=10)
    print(model)
    print(len(click_ids))

    model.save(tractate_click_ids_model_path)
    return model


# Script entry point: pull 180 days of click data, dump the per-session click
# lists to CSV, train and save the item2vec model, then print a few neighbours
# as a sanity check.
if __name__ == "__main__":
    begin_time = time.time()

    spark = get_spark("tractate_click_ids")
    click_df = get_tracate_click_data(spark, get_ndays_before_no_minus(180), get_ndays_before_no_minus(1))
    click_df.show(5, False)

    res_dict = get_device_click_tractate_ids_dict(click_df)

    # One line per session: "<session_id>|<id>,<id>,..."; sessions with no ids are skipped.
    with open(os.path.join(DATA_PATH, "click_tractate_ids.csv"), "w") as f:
        for (k, v) in res_dict.items():
            if v:
                f.write("{}|{}\n".format(k, ",".join([str(x) for x in v])))
    print("write data done.")

    # Query the model that was just trained (the function's return value) rather
    # than the module-level TRACTATE_CLICK_IDS_MODEL, which was loaded at import
    # time before this retraining and does not exist at all on a first run.
    model = save_clicked_tractate_ids_item2vec()

    for item_id in ["84375", "148764", "368399"]:
        try:
            print(model.wv.most_similar(item_id, topn=5))
        except KeyError:
            # The sample id was not clicked often enough to stay in the
            # vocabulary; keep going so the timing line is still printed.
            print("{} is not in the trained vocabulary".format(item_id))

    print("total cost: {:.2f}mins".format((time.time() - begin_time) / 60))

# spark-submit --master yarn --deploy-mode client --queue root.strategy --driver-memory 16g --executor-memory 1g --executor-cores 1 --num-executors 70 --conf spark.default.parallelism=100 --conf spark.storage.memoryFraction=0.5 --conf spark.shuffle.memoryFraction=0.3 --conf spark.locality.wait=0 --jars /srv/apps/tispark-core-2.1-SNAPSHOT-jar-with-dependencies.jar,/srv/apps/spark-connector_2.11-1.9.0-rc2.jar,/srv/apps/mysql-connector-java-5.1.38.jar /srv/apps/strategy_embedding/word_vector/tractate.py