Commit 10300bfc authored by 宋柯's avatar 宋柯

Merge remote-tracking branch 'origin/master'

parents 71b4b9dc 1552b35d
This diff is collapsed.
import hashlib
import json
import pymysql
import xlwt, datetime
import redis
# from pyhive import hive
from maintenance.func_send_email_with_file import send_file_email
from typing import Dict, List
from elasticsearch_7 import Elasticsearch
from elasticsearch_7.helpers import scan
import sys
import time
from pyspark import SparkConf
from pyspark.sql import SparkSession, DataFrame
# from pyspark.sql.functions import lit
# import pytispark.pytispark as pti
def con_sql(sql):
    """Execute *sql* on the doris_olap MySQL instance and return all rows.

    Parameters
    ----------
    sql : str
        A complete SQL statement to execute.

    Returns
    -------
    tuple
        All rows, as returned by ``cursor.fetchall()``.
    """
    # NOTE(review): credentials are hard-coded in source; consider moving
    # them to configuration or a secrets store.
    db = pymysql.connect(host='172.16.50.175', port=3306, user='doris', passwd='o5gbA27hXHHm',
                         db='doris_olap')
    try:
        # BUG FIX: the original leaked the connection if execute() raised;
        # try/finally guarantees close(), and the cursor context manager
        # closes the cursor as well.
        with db.cursor() as cursor:
            cursor.execute(sql)
            result = cursor.fetchall()
    finally:
        db.close()
    return result
# Wall-clock start of the job; kept for optional duration logging.
startTime = time.time()
def getSpark():
    """Create and return a Hive-enabled SparkSession wired for TiSpark.

    Sets TiSpark/compression/serializer options, records the production
    JDBC URIs as Spark conf entries, then registers the brickhouse and
    gmei UDF jars plus the temporary functions (json_map / is_json /
    arrayMerge) that downstream SQL may call.
    """
    sparkConf = SparkConf()
    sparkConf.set("spark.sql.crossJoin.enabled", True)
    sparkConf.set("spark.debug.maxToStringFields", "100")
    sparkConf.set("spark.tispark.plan.allow_index_double_read", False)
    sparkConf.set("spark.tispark.plan.allow_index_read", True)
    sparkConf.set("spark.hive.mapred.supports.subdirectories", True)
    sparkConf.set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive", True)
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    sparkConf.set("mapreduce.output.fileoutputformat.compress", False)
    sparkConf.set("mapreduce.map.output.compress", False)
    # JDBC URIs for the production databases, stashed in Spark conf so
    # jobs can look them up by key.
    sparkConf.set("prod.gold.jdbcuri",
                  "jdbc:mysql://172.16.30.136/doris_prod?user=doris&password=o5gbA27hXHHm&rewriteBatchedStatements=true")
    sparkConf.set("prod.mimas.jdbcuri",
                  "jdbc:mysql://172.16.30.138/mimas_prod?user=mimas&password=GJL3UJe1Ck9ggL6aKnZCq4cRvM&rewriteBatchedStatements=true")
    sparkConf.set("prod.gaia.jdbcuri",
                  "jdbc:mysql://172.16.30.143/zhengxing?user=work&password=BJQaT9VzDcuPBqkd&rewriteBatchedStatements=true")
    sparkConf.set("prod.tidb.jdbcuri",
                  "jdbc:mysql://172.16.40.158:4000/eagle?user=st_user&password=aqpuBLYzEV7tML5RPsN1pntUzFy&rewriteBatchedStatements=true")
    # sparkConf.set("prod.jerry.jdbcuri",
    #               "jdbc:mysql://172.16.40.158:4000/jerry_prod?user=st_user&password=aqpuBLYzEV7tML5RPsN1pntUzFy&rewriteBatchedStatements=true")
    # NOTE(review): the second set() below overwrites the first (same key),
    # and port 4000 looks like a TiDB SQL port rather than a PD port
    # (2379) -- confirm which address is intended. The builder config
    # further down sets spark.tispark.pd.addresses to 172.16.40.170:2379.
    sparkConf.set("prod.tispark.pd.addresses", "172.16.40.158:2379")
    sparkConf.set("prod.tispark.pd.addresses", "172.16.40.170:4000")
    # sparkConf.set("prod.tidb.database", "jerry_prod")
    spark = (
        SparkSession.builder.config(conf=sparkConf).config("spark.sql.extensions", "org.apache.spark.sql.TiExtensions")
        .config("spark.tispark.pd.addresses", "172.16.40.170:2379").appName(
            "search_meigou_ctr").enableHiveSupport().getOrCreate())
    # Register UDF jars and the temporary functions used by job SQL.
    spark.sql("ADD JAR hdfs:///user/hive/share/lib/udf/brickhouse-0.7.1-SNAPSHOT.jar")
    spark.sql("ADD JAR hdfs:///user/hive/share/lib/udf/hive-udf-1.0-SNAPSHOT.jar")
    spark.sql("CREATE TEMPORARY FUNCTION json_map AS 'brickhouse.udf.json.JsonMapUDF'")
    spark.sql("CREATE TEMPORARY FUNCTION is_json AS 'com.gmei.hive.common.udf.UDFJsonFormatCheck'")
    spark.sql("CREATE TEMPORARY FUNCTION arrayMerge AS 'com.gmei.hive.common.udf.UDFArryMerge'")
    return spark
def getSql(startDay, endDay):
    """Build the daily push receive-count SQL for the given partition range.

    BUG FIX: the original accepted both arguments but ignored them and
    always queried yesterday's partition
    (``regexp_replace(DATE_SUB(current_date,1),'-','')``); the query now
    honors the [startDay, endDay] range the caller computes.

    Parameters
    ----------
    startDay, endDay : str
        Inclusive partition bounds in ``yyyymmdd`` form.

    Returns
    -------
    str
        A Spark/Hive SQL statement counting push receives per day and
        push_type ('101'/'102').
    """
    sql = """
    --push接收数据
    SELECT partition_day
          ,push_type
          ,count(a.device_id)
          ,count(a.msg_id)
          ,count(distinct a.device_id)
          ,count(distinct a.msg_id)
    FROM
    (
        select partition_day
              ,device_id,msg_id
        from bl.bl_et_bg_trackingpushlog_inc_d
        where partition_day between '{startDay}' and '{endDay}'
        group by partition_day,device_id,msg_id
    )a
    JOIN
    (
        select msg_id,push_type
        from online.tl_hdfs_push2_new_view --增量表
        where partition_date between '{startDay}' and '{endDay}'
        group by msg_id,push_type
        union all
        SELECT msg_id,regexp_replace(labels['event'], '\\s+', '') AS push_type
        FROM online.tl_hdfs_push2_task_view
        WHERE partition_date between '{startDay}' and '{endDay}'
        AND labels['event_type'] = 'push'
        group by msg_id,regexp_replace(labels['event'], '\\s+', '')
    )c
    on a.msg_id=c.msg_id
    where push_type in ('101','102')
    group by partition_day
            ,push_type
    """.format(startDay=startDay, endDay=endDay)
    return sql
if __name__ == '__main__':
    # CLI: <script> <day> -- the report window is the last <day> days.
    if len(sys.argv) < 2:
        print('请输入day', flush=True)
        sys.exit(1)
    day = int(sys.argv[1])

    # Window bounds in 'yyyymmdd': endDay is today, startDay is <day>
    # days earlier. (The original added timedelta(days=0) to `now`,
    # a no-op that has been removed.)
    now = datetime.datetime.now()
    endDay = now.strftime("%Y%m%d")
    startDay = (now - datetime.timedelta(days=day)).strftime("%Y%m%d")
    print(startDay, endDay)

    sql = getSql(startDay, endDay)
    spark = getSpark()
    df = spark.sql(sql)
    df.show(1)
    # Dump every result row to stdout. The large commented-out block that
    # computed click/cover/receive rates and wrote them back to MySQL
    # (ctr_push_strategy) was dead code and has been removed; recover it
    # from version control if that pipeline is revived.
    for res in df.collect():
        print(res)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment