# -*- coding:UTF-8 -*- # @Time : 2020/9/23 14:26 # @File : high_quality_diary_analysis.py # @email : litao@igengmei.com # @author : litao """ 限定尾号的7000篇内容 曝光了哪些 用设备找到用户画像 二级诉求和项目分开做 用画像的标签 在7000篇内容中相关 候选池阅读完 标签在第一列 设备数 曝光数 曝光数 精准曝光占比 内容池数量 每个画像多少帖子 一共曝光了多少个帖子 """ import hashlib import json import pymysql import xlwt, datetime import redis # from pyhive import hive from maintenance.func_send_email_with_file import send_file_email from typing import Dict, List from elasticsearch_7 import Elasticsearch from elasticsearch_7.helpers import scan import sys import time from pyspark import SparkConf from pyspark.sql import SparkSession, DataFrame # from pyspark.sql.functions import lit # import pytispark.pytispark as pti from utils.func_from_es_get_article import get_user_post_from_mysql, get_diary_from_mysql from meta_base_code.utils.func_from_redis_get_portrait import user_portrait_scan_info, get_user_portrait_tag3_from_redis from meta_base_code.utils.func_from_es_get_article import get_es_article_num, get_user_post_from_mysql def con_sql(sql): # 从数据库的表里获取数据 db = pymysql.connect(host='172.16.50.175', port=3306, user='doris', passwd='o5gbA27hXHHm', db='doris_olap') cursor = db.cursor() cursor.execute(sql) result = cursor.fetchall() db.close() return result startTime = time.time() sparkConf = SparkConf() sparkConf.set("spark.sql.crossJoin.enabled", True) sparkConf.set("spark.debug.maxToStringFields", "100") sparkConf.set("spark.tispark.plan.allow_index_double_read", False) sparkConf.set("spark.tispark.plan.allow_index_read", True) sparkConf.set("spark.hive.mapred.supports.subdirectories", True) sparkConf.set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive", True) sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") sparkConf.set("mapreduce.output.fileoutputformat.compress", False) sparkConf.set("mapreduce.map.output.compress", False) sparkConf.set("prod.gold.jdbcuri", "jdbc:mysql://172.16.30.136/doris_prod?user=doris&password=o5gbA27hXHHm&rewriteBatchedStatements=true") sparkConf.set("prod.mimas.jdbcuri", "jdbc:mysql://172.16.30.138/mimas_prod?user=mimas&password=GJL3UJe1Ck9ggL6aKnZCq4cRvM&rewriteBatchedStatements=true") sparkConf.set("prod.gaia.jdbcuri", "jdbc:mysql://172.16.30.143/zhengxing?user=work&password=BJQaT9VzDcuPBqkd&rewriteBatchedStatements=true") sparkConf.set("prod.tidb.jdbcuri", "jdbc:mysql://172.16.40.158:4000/eagle?user=st_user&password=aqpuBLYzEV7tML5RPsN1pntUzFy&rewriteBatchedStatements=true") # sparkConf.set("prod.jerry.jdbcuri", # "jdbc:mysql://172.16.40.158:4000/jerry_prod?user=st_user&password=aqpuBLYzEV7tML5RPsN1pntUzFy&rewriteBatchedStatements=true") sparkConf.set("prod.tispark.pd.addresses", "172.16.40.158:2379") sparkConf.set("prod.tispark.pd.addresses", "172.16.40.170:4000") # sparkConf.set("prod.tidb.database", "jerry_prod") sparkConf.setAppName("high_quality_diary_analysis") spark = (SparkSession.builder.config(conf=sparkConf).config("spark.sql.extensions", "org.apache.spark.sql.TiExtensions") .config("spark.tispark.pd.addresses", "172.16.40.170:2379").enableHiveSupport().getOrCreate()) spark.sql("ADD JAR hdfs:///user/hive/share/lib/udf/brickhouse-0.7.1-SNAPSHOT.jar") spark.sql("ADD JAR hdfs:///user/hive/share/lib/udf/hive-udf-1.0-SNAPSHOT.jar") spark.sql("CREATE TEMPORARY FUNCTION json_map AS 'brickhouse.udf.json.JsonMapUDF'") spark.sql("CREATE TEMPORARY FUNCTION is_json AS 'com.gmei.hive.common.udf.UDFJsonFormatCheck'") spark.sql("CREATE TEMPORARY FUNCTION arrayMerge AS 'com.gmei.hive.common.udf.UDFArryMerge'") sql = """ select card_id from strategy_content_exposure_index where card_content_type="diary" and preciseexposure_num>=50 and ctr>=0.05 and avg_page_stay>=20 and create_day="2020-09-17" """ second_demands_count_dict, tags_v3_count_dict, second_demands_card_id_list, tags_v3_card_id_list, second_demands_tractate_dict, tags_v3_tractate_dict = get_diary_from_mysql( sql,id_type='diary') print(second_demands_count_dict, tags_v3_count_dict, second_demands_card_id_list, tags_v3_card_id_list) time.sleep(20) t = 1 day_num = 0 - t now = (datetime.datetime.now() + datetime.timedelta(days=day_num)) last_30_day_str = (now + datetime.timedelta(days=-31)).strftime("%Y%m%d") today_str = now.strftime("%Y%m%d") today_str_format = now.strftime("%Y-%m-%d") yesterday_str = (now + datetime.timedelta(days=-1)).strftime("%Y%m%d") yesterday_str_format = (now + datetime.timedelta(days=-1)).strftime("%Y-%m-%d") one_week_age_str = (now + datetime.timedelta(days=-7)).strftime("%Y%m%d") device_id_dict = {} huidu_device_id_sql = r""" select t2.device_id from (select distinct(first_device) as device_id from online.ml_user_history_detail where partition_date = {today_str} and substr(md5(first_device),-1) in ('8', '9', 'a', 'b') and last_active_date >= {last_30_day_str}) t2 LEFT JOIN ( select distinct device_id from ML.ML_D_CT_DV_DEVICECLEAN_DIMEN_D where PARTITION_DAY = '{today_str}' AND is_abnormal_device = 'true' )dev on t2.device_id=dev.device_id WHERE dev.device_id IS NULL """.format(today_str=today_str, last_30_day_str=last_30_day_str) print(huidu_device_id_sql) huidu_device_id_df = spark.sql(huidu_device_id_sql) huidu_device_id_df.createOrReplaceTempView("dev_view") huidu_device_id_df.show(1) sql_res = huidu_device_id_df.collect() # 设备id for device_id in sql_res: device_id_dict[device_id.device_id] = 1 # 设备对应画像数量 second_demands_tag_count = {} projects_demands_tag_count = {} # 设备id对应的画像 second_demands_tag_dict = {} projects_demands_tag_dict = {} total_tag_count = {} total_tag_count_pro = {} temp_null_count = 0 for redis_count, device_id in enumerate(device_id_dict): # if redis_count >= 50:break second_demands = [] projects = [] total_answer_content_num = 0 total_tractate_content_num = 0 total_diary_content_num = 0 # print(sql_res) try: res = get_user_portrait_tag3_from_redis(device_id) except: continue if res.get("second_demands"): second_demands = res.get("second_demands") # print(second_demands) for tag in second_demands: if tag in second_demands_tag_count: second_demands_tag_count[tag] += 1 else: second_demands_tag_count[tag] = 1 if tag in second_demands_count_dict: total_tractate_content_num += second_demands_count_dict[tag] second_demands_tag_dict[device_id] = second_demands if res.get("projects"): projects = res.get("projects") # print(projects) for tag in projects: if tag in projects_demands_tag_count: projects_demands_tag_count[tag] += 1 else: projects_demands_tag_count[tag] = 1 if tag in tags_v3_count_dict: total_tractate_content_num += tags_v3_count_dict[tag] projects_demands_tag_dict[device_id] = projects # print(total_answer_content_num, total_tractate_content_num, total_diary_content_num) tmp_count_num = 0 # 7000片内容的曝光 exposure_sql = """ select C.DEVICE_ID as devcie_id,C.CARD_ID as card_id from (SELECT T.DEVICE_ID as DEVICE_ID, --设备ID T.CARD_ID as CARD_ID --卡片ID --COUNT(T.CARD_ID) AS EXPOSURE --点击次数 FROM ML.MID_ML_C_ET_PE_PRECISEEXPOSURE_DIMEN_D T WHERE T.PARTITION_DAY >= '20200918' AND T.PAGE_CODE = 'search_result_diary' GROUP BY T.DEVICE_ID, T.CARD_ID) C LEFT JOIN ( SELECT DISTINCT device_id FROM ml.ml_d_ct_dv_devicespam_d --去除机构刷单设备,即作弊设备(浏览和曝光事件去除) WHERE partition_day='{partition_day}' UNION ALL SELECT DISTINCT device_id FROM dim.dim_device_user_staff --去除内网用户 )spam_pv on spam_pv.device_id=C.DEVICE_ID LEFT JOIN ( SELECT partition_date,device_id FROM (--找出user_id当天活跃的第一个设备id SELECT user_id,partition_date, if(size(device_list) > 0, device_list [ 0 ], '') AS device_id FROM online.ml_user_updates WHERE partition_date>='{partition_day}' AND partition_date<'{end_date}' )t1 JOIN ( --医生账号 SELECT distinct user_id FROM online.tl_hdfs_doctor_view WHERE partition_date = '{partition_day}' --马甲账号/模特用户 UNION ALL SELECT user_id FROM ml.ml_c_ct_ui_user_dimen_d WHERE partition_day = '{partition_day}' AND (is_puppet = 'true' or is_classifyuser = 'true') UNION ALL --公司内网覆盖用户 select distinct user_id from dim.dim_device_user_staff UNION ALL --登陆过医生设备 SELECT distinct t1.user_id FROM ( SELECT user_id, v.device_id as device_id FROM online.ml_user_history_detail LATERAL VIEW EXPLODE(device_history_list) v AS device_id WHERE partition_date = '{partition_day}' )t1 JOIN ( SELECT device_id FROM online.ml_device_history_detail WHERE partition_date = '{partition_day}' AND is_login_doctor = '1' )t2 ON t1.device_id = t2.device_id )t2 on t1.user_id=t2.user_id group by partition_date,device_id )dev on C.DEVICE_ID=dev.device_id WHERE (spam_pv.device_id IS NULL or spam_pv.device_id = '') and (dev.device_id is null or dev.device_id='') """.format(partition_day=yesterday_str, end_date=today_str) print(exposure_sql) exposure_df = spark.sql(exposure_sql) # exposure_df.createOrReplaceTempView("exposure_df") exposure_df.show(1) sql_res = exposure_df.collect() session_pv_all = 0 card_id_set = set() second_demands_id_count = {} projects_demands_id_count = {} baoguang_dict = {} # 遍历card_id 找出对应的device_id是否在灰度里 # 找出card_id 对应帖子的标签 并分类汇总 得到 标签-计数字段 for res in sql_res: # partition_date = res.partition_date # print(res) cl_id = res.devcie_id card_id = res.card_id if cl_id in device_id_dict: # print("has device") # print(type(card_id),card_id) # print("has card_id") # session_pv = res.session_pv # card_id_set.update(card_id) if cl_id in second_demands_tag_dict: if int(card_id) in second_demands_tractate_dict: # print(cl_id, second_demands_tag_dict[card_id]) for tag_id in second_demands_tractate_dict[int(card_id)]: if tag_id in second_demands_id_count: second_demands_id_count[tag_id][int(card_id)] = 1 else: second_demands_id_count[tag_id] = {} second_demands_id_count[tag_id][int(card_id)] = 1 # if tag_id in baoguang_dict: # baoguang_dict[tag_id] += session_pv # else: # baoguang_dict[tag_id] = session_pv if cl_id in projects_demands_tag_dict: if int(card_id) in tags_v3_tractate_dict: # print(cl_id,projects_demands_tag_dict[cl_id]) for tag_id in tags_v3_tractate_dict[int(card_id)]: if tag_id in projects_demands_id_count: projects_demands_id_count[tag_id][int(card_id)] = 1 else: projects_demands_id_count[tag_id] = {} projects_demands_id_count[tag_id][int(card_id)] = 1 final_projects_list = [] second_demands_list = [] print(projects_demands_id_count) time.sleep(10) print(second_demands_id_count) time.sleep(10) for tag_id in second_demands_tag_count: temp_dict = { "tag_name": tag_id, "device_count": second_demands_tag_count[tag_id], "tractate_count": second_demands_count_dict.get(tag_id), "exporsure_count": len(second_demands_id_count[tag_id]) if second_demands_id_count.get(tag_id) else 0, } print(temp_dict['tag_name'], temp_dict['device_count'], temp_dict['tractate_count'], temp_dict['exporsure_count']) # print(temp_dict) # if temp_dict['tractate_count'] < temp_dict['exporsure_count']: # print("error" , second_demands_id_count[tag_id]) # print(1) # for tag_id in second_demands_count_dict: # if tag_id not in second_demands_tag_count: # temp_dict = { # "tag_name": tag_id, # "device_count": second_demands_tag_count.get(tag_id), # "tractate_count": second_demands_count_dict.get(tag_id), # "exporsure_count": 0, # # } # print(temp_dict) print("----------------------------------------------") for tag_id in projects_demands_tag_count: temp_dict = { "tag_name": tag_id, "device_count": projects_demands_tag_count[tag_id], "tractate_count": tags_v3_count_dict.get(tag_id), "exporsure_count": len(projects_demands_id_count[tag_id]) if projects_demands_id_count.get(tag_id) else 0, } # if temp_dict['tractate_count'] < temp_dict['exporsure_count']: # print("error" , projects_demands_id_count[tag_id]) print(temp_dict['tag_name'], temp_dict['device_count'], temp_dict['tractate_count'], temp_dict['exporsure_count']) # print(2) # for tag_id in tags_v3_count_dict: # if tag_id not in projects_demands_tag_count: # temp_dict = { # "tag_name": tag_id, # "device_count": projects_demands_tag_count.get(tag_id), # "tractate_count": tags_v3_count_dict.get(tag_id), # "exporsure_count": 0, # # } # print(temp_dict['tag_name'],temp_dict['device_count'],temp_dict['tractate_count'],temp_dict['exporsure_count'])