Commit bee817f5 authored by litaolemo

update

parent 120af132
# -*- coding:UTF-8 -*-
# @Time : 2020/9/25 16:30
# @File : out_put_user_post_each_strategy.py
# @email : litao@igengmei.com
# @author : litao
import hashlib
import json
import pymysql
import xlwt, datetime
import redis
from meta_base_code.utils.func_from_redis_get_portrait import user_portrait_scan_info,get_user_portrait_tag3_from_redis
from meta_base_code.utils.func_from_es_get_article import get_es_article_num,get_user_post_from_mysql
# from pyhive import hive
from maintenance.func_send_email_with_file import send_file_email
from typing import Dict, List
from elasticsearch_7 import Elasticsearch
from elasticsearch_7.helpers import scan
import sys
import time
from pyspark import SparkConf
from pyspark.sql import SparkSession, DataFrame
"""
限定尾号的7000篇内容 曝光了哪些
用设备找到用户画像 二级诉求和项目分开做
用画像的标签 在7000篇内容中相关 候选池阅读完
标签在第一列
设备数
曝光数
曝光数
精准曝光占比
内容池数量
每个画像多少帖子
一共曝光了多少个帖子
"""
def con_sql(sql):
    # fetch rows from a database table
db = pymysql.connect(host='172.16.40.158', port=4000, user='st_user', passwd='aqpuBLYzEV7tML5RPsN1pntUzFy',
db='jerry_prod')
cursor = db.cursor()
cursor.execute(sql)
result = cursor.fetchall()
db.close()
return result
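# Example usage (illustrative only): rows = con_sql("select card_id from strategy_content_exposure_index limit 10")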
startTime = time.time()
sparkConf = SparkConf()
sparkConf.set("spark.sql.crossJoin.enabled", True)
sparkConf.set("spark.debug.maxToStringFields", "100")
sparkConf.set("spark.tispark.plan.allow_index_double_read", False)
sparkConf.set("spark.tispark.plan.allow_index_read", True)
sparkConf.set("spark.hive.mapred.supports.subdirectories", True)
sparkConf.set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive", True)
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
sparkConf.set("mapreduce.output.fileoutputformat.compress", False)
sparkConf.set("mapreduce.map.output.compress", False)
sparkConf.set("prod.gold.jdbcuri",
"jdbc:mysql://172.16.30.136/doris_prod?user=doris&password=o5gbA27hXHHm&rewriteBatchedStatements=true")
sparkConf.set("prod.mimas.jdbcuri",
"jdbc:mysql://172.16.30.138/mimas_prod?user=mimas&password=GJL3UJe1Ck9ggL6aKnZCq4cRvM&rewriteBatchedStatements=true")
sparkConf.set("prod.gaia.jdbcuri",
"jdbc:mysql://172.16.30.143/zhengxing?user=work&password=BJQaT9VzDcuPBqkd&rewriteBatchedStatements=true")
sparkConf.set("prod.tidb.jdbcuri",
"jdbc:mysql://172.16.40.158:4000/eagle?user=st_user&password=aqpuBLYzEV7tML5RPsN1pntUzFy&rewriteBatchedStatements=true")
sparkConf.set("prod.jerry.jdbcuri",
"jdbc:mysql://172.16.40.158:4000/jerry_prod?user=st_user&password=aqpuBLYzEV7tML5RPsN1pntUzFy&rewriteBatchedStatements=true")
sparkConf.set("prod.tispark.pd.addresses", "172.16.40.158:2379")
sparkConf.set("prod.tispark.pd.addresses", "172.16.40.170:4000")
sparkConf.set("prod.tidb.database", "jerry_prod")
sparkConf.setAppName("test")
spark = (SparkSession.builder.config(conf=sparkConf).config("spark.sql.extensions", "org.apache.spark.sql.TiExtensions")
.config("spark.tispark.pd.addresses", "172.16.40.170:2379").enableHiveSupport().getOrCreate())
spark.sql("ADD JAR hdfs:///user/hive/share/lib/udf/brickhouse-0.7.1-SNAPSHOT.jar")
spark.sql("ADD JAR hdfs:///user/hive/share/lib/udf/hive-udf-1.0-SNAPSHOT.jar")
spark.sql("CREATE TEMPORARY FUNCTION json_map AS 'brickhouse.udf.json.JsonMapUDF'")
spark.sql("CREATE TEMPORARY FUNCTION is_json AS 'com.gmei.hive.common.udf.UDFJsonFormatCheck'")
spark.sql("CREATE TEMPORARY FUNCTION arrayMerge AS 'com.gmei.hive.common.udf.UDFArryMerge'")
sql = """
select card_id from strategy_content_exposure_index where card_content_type="user_post" and preciseexposure_num>=50 and ctr>=0.05 and avg_page_stay>=20 and create_day="2020-09-17"
"""
second_demands_count_dict, tags_v3_count_dict, second_demands_card_id_list, tags_v3_card_id_list, second_demands_tractate_dict, tags_v3_tractate_dict = get_user_post_from_mysql(
sql)
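# As used below: second_demands_count_dict / tags_v3_count_dict map tag -> number of candidate posts,
# and second_demands_tractate_dict / tags_v3_tractate_dict map card_id -> tags of that post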
print(second_demands_count_dict, tags_v3_count_dict, second_demands_card_id_list, tags_v3_card_id_list)
time.sleep(20)
t = 1
day_num = 0 - t
now = (datetime.datetime.now() + datetime.timedelta(days=day_num))
last_30_day_str = (now + datetime.timedelta(days=-31)).strftime("%Y%m%d")
today_str = now.strftime("%Y%m%d")
today_str_format = now.strftime("%Y-%m-%d")
yesterday_str = (now + datetime.timedelta(days=-1)).strftime("%Y%m%d")
yesterday_str_format = (now + datetime.timedelta(days=-1)).strftime("%Y-%m-%d")
one_week_age_str = (now + datetime.timedelta(days=-7)).strftime("%Y%m%d")
device_id_dict = {}
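# Grey-release devices: md5(first_device) ends in 8/9/a/b and the device was active in the last 30 days,
# excluding spam devices, internal staff, and doctor / puppet / model accounts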
huidu_device_id_sql = r"""
select t2.device_id from
(select distinct(first_device) as device_id from online.ml_user_history_detail where partition_date = '{today_str}' and substr(md5(first_device),-1) in ('8', '9', 'a', 'b') and last_active_date >= '{last_30_day_str}') t2
LEFT JOIN
(
select distinct device_id
from ml.ml_d_ct_dv_devicespam_d --exclude agency spam devices, i.e. cheating devices (removed from view and exposure events)
WHERE partition_day='{today_str}'
union all
select distinct device_id
from dim.dim_device_user_staff --exclude internal (staff) users
)spam_pv
on spam_pv.device_id=t2.device_id
LEFT JOIN
(
SELECT partition_date,device_id
FROM
(--find the first device id each user_id was active on that day
SELECT user_id,partition_date,
if(size(device_list) > 0, device_list [ 0 ], '') AS device_id
FROM online.ml_user_updates
WHERE partition_date='{today_str}'
)t1
JOIN
( --doctor accounts
SELECT distinct user_id
FROM online.tl_hdfs_doctor_view
WHERE partition_date = '{today_str}'
--puppet accounts / model users
UNION ALL
SELECT user_id
FROM ml.ml_c_ct_ui_user_dimen_d
WHERE partition_day = '{today_str}'
AND (is_puppet = 'true' or is_classifyuser = 'true')
UNION ALL
--company internal users
select distinct user_id
from dim.dim_device_user_staff
UNION ALL
--devices that have logged in with a doctor account
SELECT distinct t1.user_id
FROM
(
SELECT user_id, v.device_id as device_id
FROM online.ml_user_history_detail
LATERAL VIEW EXPLODE(device_history_list) v AS device_id
WHERE partition_date = '{today_str}'
) t1
JOIN
(
SELECT device_id
FROM online.ml_device_history_detail
WHERE partition_date = '{today_str}'
AND is_login_doctor = '1'
) t2
ON t1.device_id = t2.device_id
)t2
on t1.user_id=t2.user_id
group by partition_date,device_id
)dev
on t2.device_id=dev.device_id
WHERE spam_pv.device_id IS NULL
and dev.device_id is null
""".format(today_str=today_str, last_30_day_str=last_30_day_str)
print(huidu_device_id_sql)
huidu_device_id_df = spark.sql(huidu_device_id_sql)
huidu_device_id_df.createOrReplaceTempView("dev_view")
huidu_device_id_df.show(1)
sql_res = huidu_device_id_df.collect()
# collect the grey-release device ids
for device_id in sql_res:
device_id_dict[device_id.device_id] = 1
# device count per portrait tag
second_demands_tag_count = {}
projects_demands_tag_count = {}
# portrait tags for each device id
second_demands_tag_dict = {}
projects_demands_tag_dict = {}
total_tag_count = {}
total_tag_count_pro = {}
temp_null_count = 0
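# For every grey-release device, read its portrait from redis and count devices per second_demands / projects tag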
for redis_count, device_id in enumerate(device_id_dict):
# if redis_count >= 50:break
second_demands = []
projects = []
total_answer_content_num = 0
total_tractate_content_num = 0
total_diary_content_num = 0
# print(sql_res)
try:
res = get_user_portrait_tag3_from_redis(device_id)
except:
continue
if res.get("second_demands"):
second_demands = res.get("second_demands")
# print(second_demands)
for tag in second_demands:
if tag in second_demands_tag_count:
second_demands_tag_count[tag] += 1
else:
second_demands_tag_count[tag] = 1
if tag in second_demands_count_dict:
total_tractate_content_num += second_demands_count_dict[tag]
second_demands_tag_dict[device_id] = second_demands
if res.get("projects"):
projects = res.get("projects")
# print(projects)
for tag in projects:
if tag in projects_demands_tag_count:
projects_demands_tag_count[tag] += 1
else:
projects_demands_tag_count[tag] = 1
if tag in tags_v3_count_dict:
total_tractate_content_num += tags_v3_count_dict[tag]
projects_demands_tag_dict[device_id] = projects
# print(total_answer_content_num, total_tractate_content_num, total_diary_content_num)
tmp_count_num = 0
# exposures of the ~7,000 candidate posts
exposure_sql = """
SELECT
cl_id,
card_id,
transaction_type,
count(distinct app_session_id) as session_pv
FROM
(
SELECT
cl_id,
card_id,
app_session_id,
transaction_type
from online.ml_community_precise_exposure_detail
WHERE partition_date>= '{today_str}'
AND action in ('page_precise_exposure','home_choiceness_card_exposure') --since app version 7745 the action is page_precise_exposure
AND is_exposure = '1' ----precise exposure
AND page_name ='home'
AND tab_name = '精选'
AND (transaction_type in ('-1','smr','hotspot','pgc','newdata','hotspot_feed','aistragegy','excestragegy','FIXEDSTRATEGY','FIXEDSTRATEGY_VIDEO','high_quality_fmctr')
or transaction_type like '%ctr' or transaction_type like '%cvr' or transaction_type like 'deeplink%')
AND card_content_type in ('user_post')
AND transaction_type in ('high_quality_fmctr')
group by
cl_id,
transaction_type,
card_id,
app_session_id
) a group by cl_id,card_id,transaction_type
""".format(today_str="20200918")
print(exposure_sql)
exposure_df = spark.sql(exposure_sql)
# exposure_df.createOrReplaceTempView("exposure_df")
exposure_df.show(1)
sql_res = exposure_df.collect()
session_pv_all = 0
card_id_set = set()
second_demands_id_count = {}
projects_demands_id_count = {}
baoguang_dict = {}
# Iterate over the exposure rows: keep only devices in the grey-release set,
# then look up each exposed card_id's tags and aggregate them into tag -> {card_id: 1} maps
for res in sql_res:
# partition_date = res.partition_date
# print(res)
cl_id = res.cl_id
card_id = res.card_id
if cl_id in device_id_dict:
# print("has device")
# print(type(card_id),card_id)
# print("has card_id")
session_pv = res.session_pv
# card_id_set.update(card_id)
if cl_id in second_demands_tag_dict:
if int(card_id) in second_demands_tractate_dict:
# print(cl_id, second_demands_tag_dict[card_id])
for tag_id in second_demands_tractate_dict[int(card_id)]:
if tag_id in second_demands_id_count:
second_demands_id_count[tag_id][int(card_id)] = 1
else:
second_demands_id_count[tag_id] = {}
second_demands_id_count[tag_id][int(card_id)] = 1
# if tag_id in baoguang_dict:
# baoguang_dict[tag_id] += session_pv
# else:
# baoguang_dict[tag_id] = session_pv
if cl_id in projects_demands_tag_dict:
if int(card_id) in tags_v3_tractate_dict:
# print(cl_id,projects_demands_tag_dict[cl_id])
for tag_id in tags_v3_tractate_dict[int(card_id)]:
if tag_id in projects_demands_id_count:
projects_demands_id_count[tag_id][int(card_id)] = 1
else:
projects_demands_id_count[tag_id] = {}
projects_demands_id_count[tag_id][int(card_id)] = 1
final_projects_list = []
second_demands_list = []
print(projects_demands_id_count)
time.sleep(10)
print(second_demands_id_count)
time.sleep(10)
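# Per second_demands tag: devices carrying the tag, posts in the candidate pool, and distinct candidate posts actually exposed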
for tag_id in second_demands_tag_count:
temp_dict = {
"tag_name": tag_id,
"device_count": second_demands_tag_count[tag_id],
"tractate_count": second_demands_count_dict.get(tag_id),
"exporsure_count": len(second_demands_id_count[tag_id]) if second_demands_id_count.get(tag_id) else 0,
}
print(temp_dict['tag_name'], temp_dict['device_count'], temp_dict['tractate_count'], temp_dict['exporsure_count'])
# print(temp_dict)
# if temp_dict['tractate_count'] < temp_dict['exporsure_count']:
# print("error" , second_demands_id_count[tag_id])
# print(1)
# for tag_id in second_demands_count_dict:
# if tag_id not in second_demands_tag_count:
# temp_dict = {
# "tag_name": tag_id,
# "device_count": second_demands_tag_count.get(tag_id),
# "tractate_count": second_demands_count_dict.get(tag_id),
# "exporsure_count": 0,
#
# }
# print(temp_dict)
print("----------------------------------------------")
for tag_id in projects_demands_tag_count:
temp_dict = {
"tag_name": tag_id,
"device_count": projects_demands_tag_count[tag_id],
"tractate_count": tags_v3_count_dict.get(tag_id),
"exporsure_count": len(projects_demands_id_count[tag_id]) if projects_demands_id_count.get(tag_id) else 0,
}
# if temp_dict['tractate_count'] < temp_dict['exporsure_count']:
# print("error" , projects_demands_id_count[tag_id])
print(temp_dict['tag_name'], temp_dict['device_count'], temp_dict['tractate_count'], temp_dict['exporsure_count'])
# print(2)
# for tag_id in tags_v3_count_dict:
# if tag_id not in projects_demands_tag_count:
# temp_dict = {
# "tag_name": tag_id,
# "device_count": projects_demands_tag_count.get(tag_id),
# "tractate_count": tags_v3_count_dict.get(tag_id),
# "exporsure_count": 0,
#
# }
# print(temp_dict['tag_name'],temp_dict['device_count'],temp_dict['tractate_count'],temp_dict['exporsure_count'])
@@ -80,16 +80,20 @@ spark.sql("CREATE TEMPORARY FUNCTION arrayMerge AS 'com.gmei.hive.common.udf.UDFArryMerge'")
 sql_search_ctr = r"""
 SELECT
-    count(1)
-from
+    cl_id,
+    card_id,
+    transaction_type,
+    count(distinct app_session_id) as session_pv
+FROM
 (
     SELECT
         cl_id,
         card_id,
-        app_session_id
+        app_session_id,
+        transaction_type
     from online.ml_community_precise_exposure_detail
-    WHERE partition_date>='{today_str}'
+    WHERE partition_date>= '${today_str}'
     AND action in ('page_precise_exposure','home_choiceness_card_exposure') --since app version 7745 the action is page_precise_exposure
     AND is_exposure = '1' ----precise exposure
     AND page_name ='home'
@@ -103,7 +107,7 @@ SELECT
     transaction_type,
     card_id,
     app_session_id
-)
+) a group by cl_id,card_id,transaction_type
 """.format(today_str='20200922')
 print(sql_search_ctr)