# -*- coding:UTF-8 -*-
# @Time : 2020/9/21 14:59
# @File : tractate_analysis_in_7000.py
# @email : litao@igengmei.com
# @author : litao
"""
For the ~7000 tail-number-restricted posts: which ones were exposed.

Use device ids to look up user portraits; handle second-level demands
("second_demands") and projects separately, matching the portrait tags
against the 7000-post candidate pool.

Report columns: tag (first column) | device count | exposure count |
precise-exposure ratio | content-pool size — i.e. how many posts each
portrait tag covers and how many posts were exposed in total.
"""
import hashlib
import json
import pymysql
import xlwt, datetime
import redis
# from pyhive import hive
from maintenance.func_send_email_with_file import send_file_email
from typing import Dict, List
from elasticsearch_7 import Elasticsearch
from elasticsearch_7.helpers import scan
import sys
import time
from pyspark import SparkConf
from pyspark.sql import SparkSession, DataFrame
# from pyspark.sql.functions import lit
# import pytispark.pytispark as pti
# NOTE(review): `get_user_post_from_mysql` is imported twice; the
# meta_base_code import below shadows this one. Kept both in case other
# names from `utils.func_from_es_get_article` are needed — confirm and
# drop one of the two.
from utils.func_from_es_get_article import get_user_post_from_mysql
from meta_base_code.utils.func_from_redis_get_portrait import user_portrait_scan_info, get_user_portrait_tag3_from_redis
from meta_base_code.utils.func_from_es_get_article import get_es_article_num, get_user_post_from_mysql


def con_sql(sql):
    """Run *sql* against the jerry_prod TiDB instance and return all rows.

    :param sql: SQL text to execute.
    :returns: tuple of row tuples from ``cursor.fetchall()``.

    The connection is closed even when execution raises (try/finally),
    so a failing query no longer leaks the connection.
    """
    db = pymysql.connect(host='172.16.40.158', port=4000, user='st_user',
                         passwd='aqpuBLYzEV7tML5RPsN1pntUzFy',
                         db='jerry_prod')
    try:
        cursor = db.cursor()
        cursor.execute(sql)
        result = cursor.fetchall()
    finally:
        db.close()
    return result


startTime = time.time()

# Spark / TiSpark configuration.
sparkConf = SparkConf()
sparkConf.set("spark.sql.crossJoin.enabled", True)
sparkConf.set("spark.debug.maxToStringFields", "100")
sparkConf.set("spark.tispark.plan.allow_index_double_read", False)
sparkConf.set("spark.tispark.plan.allow_index_read", True)
sparkConf.set("spark.hive.mapred.supports.subdirectories", True)
sparkConf.set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive", True)
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
sparkConf.set("mapreduce.output.fileoutputformat.compress", False)
sparkConf.set("mapreduce.map.output.compress", False)
sparkConf.set("prod.gold.jdbcuri",
              "jdbc:mysql://172.16.30.136/doris_prod?user=doris&password=o5gbA27hXHHm&rewriteBatchedStatements=true")
sparkConf.set("prod.mimas.jdbcuri",
              "jdbc:mysql://172.16.30.138/mimas_prod?user=mimas&password=GJL3UJe1Ck9ggL6aKnZCq4cRvM&rewriteBatchedStatements=true")
sparkConf.set("prod.gaia.jdbcuri",
              "jdbc:mysql://172.16.30.143/zhengxing?user=work&password=BJQaT9VzDcuPBqkd&rewriteBatchedStatements=true")
sparkConf.set("prod.tidb.jdbcuri",
              "jdbc:mysql://172.16.40.158:4000/eagle?user=st_user&password=aqpuBLYzEV7tML5RPsN1pntUzFy&rewriteBatchedStatements=true")
sparkConf.set("prod.jerry.jdbcuri",
              "jdbc:mysql://172.16.40.158:4000/jerry_prod?user=st_user&password=aqpuBLYzEV7tML5RPsN1pntUzFy&rewriteBatchedStatements=true")
# NOTE(review): this key is set twice; the second value wins, and
# 172.16.40.170:4000 looks like a TiDB SQL port rather than a PD port
# (PD is conventionally :2379, as used for the SparkSession below).
# Confirm which address is intended and delete the other.
sparkConf.set("prod.tispark.pd.addresses", "172.16.40.158:2379")
sparkConf.set("prod.tispark.pd.addresses", "172.16.40.170:4000")
sparkConf.set("prod.tidb.database", "jerry_prod")
sparkConf.setAppName("tractate_analysis_in_7000")

spark = (SparkSession.builder.config(conf=sparkConf)
         .config("spark.sql.extensions", "org.apache.spark.sql.TiExtensions")
         .config("spark.tispark.pd.addresses", "172.16.40.170:2379")
         .enableHiveSupport().getOrCreate())

# Register the shared Hive UDF jars/functions used by downstream SQL.
spark.sql("ADD JAR hdfs:///user/hive/share/lib/udf/brickhouse-0.7.1-SNAPSHOT.jar")
spark.sql("ADD JAR hdfs:///user/hive/share/lib/udf/hive-udf-1.0-SNAPSHOT.jar")
spark.sql("CREATE TEMPORARY FUNCTION json_map AS 'brickhouse.udf.json.JsonMapUDF'")
spark.sql("CREATE TEMPORARY FUNCTION is_json AS 'com.gmei.hive.common.udf.UDFJsonFormatCheck'")
spark.sql("CREATE TEMPORARY FUNCTION arrayMerge AS 'com.gmei.hive.common.udf.UDFArryMerge'")

# Candidate pool: well-performing user posts (>=50 precise exposures,
# ctr >= 5%, avg page stay >= 20s) as of 2020-09-17.
sql = """
select card_id from strategy_content_exposure_index 
where card_content_type="user_post" 
and preciseexposure_num>=50 and ctr>=0.05 and avg_page_stay>=20 and create_day="2020-09-17"
"""
second_demands_count_dict, tags_v3_count_dict, second_demands_card_id_list, tags_v3_card_id_list, \
    second_demands_tractate_dict, tags_v3_tractate_dict = get_user_post_from_mysql(sql)
print(second_demands_count_dict, tags_v3_count_dict, second_demands_card_id_list, tags_v3_card_id_list)
time.sleep(20)

# Date window: "today" shifted back by t days; the 30-day window bounds
# last-active devices.
t = 1
day_num = 0 - t
now = (datetime.datetime.now() + datetime.timedelta(days=day_num))
last_30_day_str = (now + datetime.timedelta(days=-31)).strftime("%Y%m%d")
today_str = now.strftime("%Y%m%d")
today_str_format = now.strftime("%Y-%m-%d")
yesterday_str = (now + datetime.timedelta(days=-1)).strftime("%Y%m%d")
yesterday_str_format = (now + datetime.timedelta(days=-1)).strftime("%Y-%m-%d")
one_week_age_str = (now + datetime.timedelta(days=-7)).strftime("%Y%m%d")

device_id_dict = {}
# Gray-group devices: first_device md5 tail in 8/9/a/b, active within the
# last 30 days, excluding devices flagged abnormal.
huidu_device_id_sql = r"""
    select t2.device_id from
    (select distinct(first_device) as device_id from online.ml_user_history_detail 
    where partition_date = {today_str} and substr(md5(first_device),-1) in ('8', '9', 'a', 'b')
    and last_active_date >= {last_30_day_str}) t2
    LEFT JOIN
    ( select distinct device_id from ML.ML_D_CT_DV_DEVICECLEAN_DIMEN_D
     where PARTITION_DAY = '{start_date}' AND is_abnormal_device = 'true') dev
    on t2.device_id=dev.device_id
    WHERE dev.device_id is null
""".format(today_str=today_str, last_30_day_str=last_30_day_str,
           # FIX: the template references {start_date} but it was never
           # supplied, so str.format raised KeyError before the query ran.
           # Presumably the abnormal-device partition is the same day as
           # the main subquery — TODO confirm.
           start_date=today_str)
print(huidu_device_id_sql)
huidu_device_id_df = spark.sql(huidu_device_id_sql)
huidu_device_id_df.createOrReplaceTempView("dev_view")
huidu_device_id_df.show(1)
sql_res = huidu_device_id_df.collect()
# Device ids
for device_id in sql_res:
    device_id_dict[device_id.device_id] = 1

# Portrait-tag -> device count
second_demands_tag_count = {}
projects_demands_tag_count = {}
# Device id -> portrait tag list
second_demands_tag_dict = {}
projects_demands_tag_dict = {}
total_tag_count = {}
total_tag_count_pro = {}
temp_null_count = 0

# For every gray device, pull its portrait from Redis and tally how many
# devices carry each second_demands / projects tag.
for redis_count, device_id in enumerate(device_id_dict):
    # if redis_count >= 50: break
    second_demands = []
    projects = []
    total_answer_content_num = 0
    total_tractate_content_num = 0
    total_diary_content_num = 0
    # print(sql_res)
    try:
        res = get_user_portrait_tag3_from_redis(device_id)
    except Exception:
        # Best-effort: a device without a portrait (or a Redis hiccup)
        # is simply skipped.  Narrowed from a bare `except:` so that
        # KeyboardInterrupt/SystemExit still propagate.
        continue
    if res.get("second_demands"):
        second_demands = res.get("second_demands")
        # print(second_demands)
        for tag in second_demands:
            second_demands_tag_count[tag] = second_demands_tag_count.get(tag, 0) + 1
            if tag in second_demands_count_dict:
                total_tractate_content_num += second_demands_count_dict[tag]
        second_demands_tag_dict[device_id] = second_demands
    if res.get("projects"):
        projects = res.get("projects")
        # print(projects)
        for tag in projects:
            projects_demands_tag_count[tag] = projects_demands_tag_count.get(tag, 0) + 1
            if tag in tags_v3_count_dict:
                total_tractate_content_num += tags_v3_count_dict[tag]
        projects_demands_tag_dict[device_id] = projects
    # print(total_answer_content_num, total_tractate_content_num, total_diary_content_num)

tmp_count_num = 0
# Exposure of the 7000-post pool on home/精选.
# NOTE(review): the partition date is hard-coded to 20200918 instead of
# using today_str/yesterday_str — presumably a one-off backfill; confirm.
exposure_sql = """
SELECT cl_id, card_id, count(distinct app_session_id) as session_pv
FROM
    (
        SELECT cl_id, card_id, app_session_id
        from online.ml_community_precise_exposure_detail
        WHERE partition_date>={today_str}
        AND action in ('page_precise_exposure','home_choiceness_card_exposure') --7745版本action改为page_precise_exposure
        AND is_exposure = '1' ----精准曝光
        AND page_name ='home'
        AND tab_name = '精选'
        AND (transaction_type in
             ('-1','smr','hotspot','pgc','newdata','hotspot_feed','aistragegy','excestragegy','FIXEDSTRATEGY','FIXEDSTRATEGY_VIDEO','high_quality_fmctr')
             or transaction_type like '%ctr' or transaction_type like '%cvr'
             or transaction_type like 'deeplink%')
        AND card_content_type in ('user_post')
        AND transaction_type in ('high_quality_fmctr')
        group by cl_id, transaction_type, card_id, app_session_id
    ) a
group by cl_id,card_id
""".format(today_str="20200918")
print(exposure_sql)
exposure_df = spark.sql(exposure_sql)
# exposure_df.createOrReplaceTempView("exposure_df")
exposure_df.show(1)
sql_res = exposure_df.collect()

session_pv_all = 0
card_id_set = set()
# tag -> {card_id: 1}: the set of distinct exposed cards per tag.
second_demands_id_count = {}
projects_demands_id_count = {}
baoguang_dict = {}

# For each exposed card, check whether the viewing device is in the gray
# group, then attribute the card to every tag the card carries, split by
# second_demands vs projects.
for res in sql_res:
    # partition_date = res.partition_date
    # print(res)
    cl_id = res.cl_id
    card_id = res.card_id
    if cl_id in device_id_dict:
        # print("has device")
        # print(type(card_id), card_id)
        # print("has card_id")
        session_pv = res.session_pv
        # card_id_set.update(card_id)
        if cl_id in second_demands_tag_dict:
            if int(card_id) in second_demands_tractate_dict:
                # print(cl_id, second_demands_tag_dict[card_id])
                for tag_id in second_demands_tractate_dict[int(card_id)]:
                    second_demands_id_count.setdefault(tag_id, {})[int(card_id)] = 1
                    # if tag_id in baoguang_dict:
                    #     baoguang_dict[tag_id] += session_pv
                    # else:
                    #     baoguang_dict[tag_id] = session_pv
        if cl_id in projects_demands_tag_dict:
            if int(card_id) in tags_v3_tractate_dict:
                # print(cl_id, projects_demands_tag_dict[cl_id])
                for tag_id in tags_v3_tractate_dict[int(card_id)]:
                    projects_demands_id_count.setdefault(tag_id, {})[int(card_id)] = 1

final_projects_list = []
second_demands_list = []
print(projects_demands_id_count)
time.sleep(10)
print(second_demands_id_count)
time.sleep(10)

# Report: second_demands tags — device count, pool size, exposed-card count.
for tag_id in second_demands_tag_count:
    temp_dict = {
        "tag_name": tag_id,
        "device_count": second_demands_tag_count[tag_id],
        "tractate_count": second_demands_count_dict.get(tag_id),
        "exporsure_count": len(second_demands_id_count[tag_id]) if second_demands_id_count.get(tag_id) else 0,
    }
    print(temp_dict['tag_name'], temp_dict['device_count'], temp_dict['tractate_count'],
          temp_dict['exporsure_count'])
    # print(temp_dict)
    # if temp_dict['tractate_count'] < temp_dict['exporsure_count']:
    #     print("error", second_demands_id_count[tag_id])
# print(1)
# for tag_id in second_demands_count_dict:
#     if tag_id not in second_demands_tag_count:
#         temp_dict = {
#             "tag_name": tag_id,
#             "device_count": second_demands_tag_count.get(tag_id),
#             "tractate_count": second_demands_count_dict.get(tag_id),
#             "exporsure_count": 0,
#         }
#         print(temp_dict)

print("----------------------------------------------")

# Report: projects tags — same columns as above.
for tag_id in projects_demands_tag_count:
    temp_dict = {
        "tag_name": tag_id,
        "device_count": projects_demands_tag_count[tag_id],
        "tractate_count": tags_v3_count_dict.get(tag_id),
        "exporsure_count": len(projects_demands_id_count[tag_id]) if projects_demands_id_count.get(tag_id) else 0,
    }
    # if temp_dict['tractate_count'] < temp_dict['exporsure_count']:
    #     print("error", projects_demands_id_count[tag_id])
    print(temp_dict['tag_name'], temp_dict['device_count'], temp_dict['tractate_count'],
          temp_dict['exporsure_count'])
# print(2)
# for tag_id in tags_v3_count_dict:
#     if tag_id not in projects_demands_tag_count:
#         temp_dict = {
#             "tag_name": tag_id,
#             "device_count": projects_demands_tag_count.get(tag_id),
#             "tractate_count": tags_v3_count_dict.get(tag_id),
#             "exporsure_count": 0,
#         }
#         print(temp_dict['tag_name'], temp_dict['device_count'], temp_dict['tractate_count'], temp_dict['exporsure_count'])