# -*- coding:UTF-8 -*-
# @Time : 2020/8/31 13:41
# @File : advertisement_strategy_d.py
# @email : litao@igengmei.com
# @author : litao
import hashlib
import json
import pymysql
import xlwt
import datetime
import redis
# from pyhive import hive
from maintenance.func_send_email_with_file import send_file_email
from typing import Dict, List
from elasticsearch_7 import Elasticsearch
from elasticsearch_7.helpers import scan
import sys
import time
from pyspark import SparkConf
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import lit
import pytispark.pytispark as pti
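# Module-level TiDB connection and cursor; only used by the (currently commented-out) insert block at the bottom of this script.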
db = pymysql.connect(host='172.16.40.158', port=4000, user='st_user', passwd='aqpuBLYzEV7tML5RPsN1pntUzFy',
db='jerry_prod')
cursor = db.cursor()
def con_sql(sql):
    # Fetch all rows for the given SQL from the TiDB database via a short-lived connection.
    db = pymysql.connect(host='172.16.40.158', port=4000, user='st_user', passwd='aqpuBLYzEV7tML5RPsN1pntUzFy',
                         db='jerry_prod')
    cursor = db.cursor()
    cursor.execute(sql)
    result = cursor.fetchall()
    db.close()
    return result
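# Example usage of con_sql (hypothetical ad-hoc check, kept commented out so it does not run as part of the job):
# rows = con_sql("SELECT count(1) FROM conent_detail_page_grayscale_ctr")
# print(rows)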
startTime = time.time()
sparkConf = SparkConf()
sparkConf.set("spark.sql.crossJoin.enabled", True)
sparkConf.set("spark.debug.maxToStringFields", "100")
sparkConf.set("spark.tispark.plan.allow_index_double_read", False)
sparkConf.set("spark.tispark.plan.allow_index_read", True)
sparkConf.set("spark.hive.mapred.supports.subdirectories", True)
sparkConf.set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive", True)
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
sparkConf.set("mapreduce.output.fileoutputformat.compress", False)
sparkConf.set("mapreduce.map.output.compress", False)
sparkConf.set("prod.gold.jdbcuri",
"jdbc:mysql://172.16.30.136/doris_prod?user=doris&password=o5gbA27hXHHm&rewriteBatchedStatements=true")
sparkConf.set("prod.mimas.jdbcuri",
"jdbc:mysql://172.16.30.138/mimas_prod?user=mimas&password=GJL3UJe1Ck9ggL6aKnZCq4cRvM&rewriteBatchedStatements=true")
sparkConf.set("prod.gaia.jdbcuri",
"jdbc:mysql://172.16.30.143/zhengxing?user=work&password=BJQaT9VzDcuPBqkd&rewriteBatchedStatements=true")
sparkConf.set("prod.tidb.jdbcuri",
"jdbc:mysql://172.16.40.158:4000/eagle?user=st_user&password=aqpuBLYzEV7tML5RPsN1pntUzFy&rewriteBatchedStatements=true")
sparkConf.set("prod.jerry.jdbcuri",
"jdbc:mysql://172.16.40.158:4000/jerry_prod?user=st_user&password=aqpuBLYzEV7tML5RPsN1pntUzFy&rewriteBatchedStatements=true")
sparkConf.set("prod.tispark.pd.addresses", "172.16.40.158:2379")
sparkConf.set("prod.tispark.pd.addresses", "172.16.40.170:4000")
sparkConf.set("prod.tidb.database", "jerry_prod")
spark = (SparkSession.builder.config(conf=sparkConf)
         .config("spark.sql.extensions", "org.apache.spark.sql.TiExtensions")
         .config("spark.tispark.pd.addresses", "172.16.40.170:2379")
         .appName("LR PYSPARK TEST")
         .enableHiveSupport()
         .getOrCreate())
spark.sql("ADD JAR hdfs:///user/hive/share/lib/udf/brickhouse-0.7.1-SNAPSHOT.jar")
spark.sql("ADD JAR hdfs:///user/hive/share/lib/udf/hive-udf-1.0-SNAPSHOT.jar")
spark.sql("CREATE TEMPORARY FUNCTION json_map AS 'brickhouse.udf.json.JsonMapUDF'")
spark.sql("CREATE TEMPORARY FUNCTION is_json AS 'com.gmei.hive.common.udf.UDFJsonFormatCheck'")
spark.sql("CREATE TEMPORARY FUNCTION arrayMerge AS 'com.gmei.hive.common.udf.UDFArryMerge'")
task_list = []
task_days = 1
now = datetime.datetime.now()
partition_date_str = (now + datetime.timedelta(days=-1)).strftime("%Y%m%d")
for t in range(0, task_days):
    day_num = 0 - t
    now = (datetime.datetime.now() + datetime.timedelta(days=day_num))
    last_30_day_str = (now + datetime.timedelta(days=-30)).strftime("%Y%m%d")
    today_str = now.strftime("%Y%m%d")
    yesterday_str = (now + datetime.timedelta(days=-1)).strftime("%Y%m%d")
    one_week_ago_str = (now + datetime.timedelta(days=-7)).strftime("%Y%m%d")
    # CPT average daily clicks
# CPT_daily_click_sql = """SELECT partition_date,count(1) as pv
# FROM online.bl_hdfs_maidian_updates
# WHERE partition_date >= '{start_date}'
# and partition_date < '{end_date}'
# AND ((ACTION = 'search_result_welfare_click_item' AND PAGE_NAME = 'search_result_welfare' AND PARAMS['transaction_type'] = 'advertise')
# OR (ACTION = 'goto_welfare_detail' AND PARAMS['from'] = 'category' AND PARAMS['transaction_type'] = 'operating' AND PARAMS['tab_name'] = 'service')
# OR (ACTION = 'goto_welfare_detail' AND PARAMS['from'] = 'welfare_home_list_item' and PARAMS['transaction_type'] = 'advertise')
# OR (ACTION = 'goto_welfare_detail' AND PARAMS['from'] = 'welfare_list' AND PARAMS['transaction_type'] = 'advertise')
# OR (ACTION = 'on_click_card' AND PARAMS['card_content_type'] = 'service' AND PARAMS['page_name'] IN ('new_sign','search_result_welfare','category','welfare_home_list_item','welfare_list') AND PARAMS['transaction_type'] = 'advertise'))
# group BY partition_date""".format(start_date=yesterday_str, end_date=today_str)
# CPT_daily_click_df = spark.sql(CPT_daily_click_sql)
# # CPT_daily_click_df.createOrReplaceTempView("cpt_daily_click_df")
# sql_res = CPT_daily_click_df.collect()
# for res in sql_res:
# print(res)
#
# print("0-----------------------------------------------------------------------------")
# # Service detail page PV
# bus_detail_pv_sql = """SELECT
# a2.partition_date,count(1) welfare_pv
# FROM
# (
# SELECT cl_id,partition_date
# FROM online.bl_hdfs_maidian_updates
# WHERE partition_date >='{start_date}'and partition_date < '{end_date}'
# AND action='page_view'
# AND params['page_name'] = 'welfare_detail'
# )a1
# JOIN
# (
# SELECT device_id,partition_date
# from online.ml_device_day_active_status
# WHERE partition_date >='{start_date}'and partition_date < '{end_date}'
# AND active_type in ('1','2','4')
# )a2
# on a2.device_id = a1.cl_id
# AND a2.partition_date=a1.partition_date
# group by a2.partition_date""".format(start_date=yesterday_str, end_date=today_str, )
# bus_detail_pv_df = spark.sql(bus_detail_pv_sql)
# # bus_detail_pv_df.createOrReplaceTempView("bus_detail_pv_df")
# sql_res = bus_detail_pv_df.collect()
# for res in sql_res:
# print(res)
# print("1-----------------------------------------------------------------------------")
    # CPC same-day budget (effective/valid definition)
cpc_budget_sql = """SELECT day_id,sum(budget) as budget
FROM
(
SELECT T1.day_id,T1.merchant_doctor_id,case when merchant_budget>=tot_service_budget then tot_service_budget else merchant_budget end as budget
FROM
(
SELECT
substr(clicklog.create_time,1,10) AS day_id
,clicklog.merchant_doctor_id
,max(merchant_budget) as merchant_budget
FROM
(
SELECT id,promote_id,price,service_budget,merchant_budget,merchant_doctor_id,create_time,recharge
FROM online.tl_hdfs_cpc_clicklog_view
WHERE partition_date='{partition_date}'
AND regexp_replace(substr(create_time,1,10),'-','')>= '{start_date}'
AND regexp_replace(substr(create_time,1,10),'-','')<'{end_date}'
)clicklog
group by substr(clicklog.create_time,1,10),clicklog.merchant_doctor_id
)T1
LEFT JOIN
(
SELECT
day_id
,merchant_doctor_id
,sum(service_budget) as tot_service_budget
FROM
(
SELECT
substr(clicklog.create_time,1,10) AS day_id
,clicklog.merchant_doctor_id,clicklog.service_id
,max(service_budget) as service_budget
FROM
(
SELECT id,promote_id,price,service_budget,merchant_budget,merchant_doctor_id,service_id,create_time
FROM online.tl_hdfs_cpc_clicklog_view
WHERE partition_date='{partition_date}'
AND regexp_replace(substr(create_time,1,10),'-','')>= '{start_date}'
AND regexp_replace(substr(create_time,1,10),'-','')<'{end_date}'
)clicklog
GROUP BY substr(clicklog.create_time,1,10),clicklog.merchant_doctor_id,clicklog.service_id
)service_budget
GROUP BY day_id,merchant_doctor_id
)T2
ON T1.day_id=T2.day_id
AND T1.merchant_doctor_id=T2.merchant_doctor_id
)T
GROUP BY day_id
""".format(start_date=yesterday_str, end_date=today_str, partition_date=partition_date_str)
cpc_budget_df = spark.sql(cpc_budget_sql)
cpc_budget_df.show(1, False)
sql_res = cpc_budget_df.collect()
for res in sql_res:
print(res)
print("2-----------------------------------------------------------------------------")
cpc_income_total_consume_sql = """
select partition_day,
sum(case when advertise_type = 'cpc' AND advertise_business_type in('service') and advertise_calculate_type='cpc_log' then cpc_click_num end) cpc_click_num,
sum(case when advertise_type = 'cpc' AND advertise_business_type in('service') and advertise_calculate_type='cpc_flownext' then proportion_expend_amount end) cpc_proportion_expend_amount,
sum(case when advertise_type = 'cpc' AND advertise_business_type in('service') and advertise_calculate_type='cpc_flownext' then proportion_expend_recharge_amount end) cpc_proportion_expend_recharge_amount,
SUM(CASE
WHEN advertise_type = 'cpc' AND advertise_calculate_type = 'cpc_flownext' THEN
proportion_expend_amount
WHEN advertise_type = 'cpt' AND advertise_calculate_type = 'cpt_schedule' THEN
proportion_expend_amount
WHEN advertise_type IN ('browse', 'message', 'valueadded','rechargededuction') THEN
proportion_expend_amount
WHEN advertise_type = 'adjustment' AND advertise_calculate_type ='adjustment_flow' THEN
proportion_expend_amount
ELSE
0
END) tol_proportion_expend_amount
from ml.ml_c_ct_mc_merchantadclassify_indic_d
where partition_day>='{start_date}' AND partition_day <'{end_date}'
group by partition_day
""".format(start_date=yesterday_str, end_date=today_str)
cpc_income_total_consume_df = spark.sql(cpc_income_total_consume_sql)
cpc_income_total_consume_df.show(1, False)
cpc_income_total_consume_df_res = cpc_income_total_consume_df.collect()
for res in cpc_income_total_consume_df_res:
print(res)
print("3-----------------------------------------------------------------------------")
# for res in sql_res:
# # print(res)
# day_id = res.day_id
# device_os_type = res.device_os_type
# active_type = res.active_type
# grey_type = res.grey_type
# page_name = res.page_name
# content_pv = res.content_pv
# content_uv = res.content_uv
# wel_exp_pv = res.wel_exp_pv
# content_exp_pv = res.content_exp_pv
# meigou_ctr = res.meigou_ctr
# if not meigou_ctr: meigou_ctr = 0
# grey_meigou_ctr = res.grey_meigou_ctr
# neirong_ctr = res.neirong_ctr
# if not neirong_ctr: neirong_ctr = 0
# grey_neirong_ctr = res.grey_neirong_ctr
#
# wel_click_pv = res.wel_click_pv
# content_click_pv = res.content_click_pv
# slide_wel_click_pv = res.slide_wel_click_pv
# self_wel_click_pv = res.self_wel_click_pv
# partition_day = res.PARTITION_DAY
# pid = hashlib.md5((day_id + device_os_type + active_type + grey_type + page_name).encode("utf8")).hexdigest()
    # insert_sql = """replace into conent_detail_page_grayscale_ctr(
# day_id,device_os_type,active_type,grey_type,page_name,content_pv,content_uv,wel_exp_pv,
# content_exp_pv,wel_click_pv,content_click_pv,slide_wel_click_pv,self_wel_click_pv,partition_day,pid,meigou_ctr,neirong_ctr,
# grey_meigou_ctr,grey_neirong_ctr) VALUES('{day_id}','{device_os_type}','{active_type}','{grey_type}','{page_name}',{content_pv},{content_uv},
# {wel_exp_pv},{content_exp_pv},{wel_click_pv},{content_click_pv},{slide_wel_click_pv},{self_wel_click_pv},'{partition_day}','{pid}',{meigou_ctr},{neirong_ctr},{grey_meigou_ctr},{grey_neirong_ctr});""".format(
# day_id=day_id, device_os_type=device_os_type, active_type=active_type, grey_type=grey_type,
# page_name=page_name,
# content_pv=content_pv, content_uv=content_uv, wel_exp_pv=wel_exp_pv, content_exp_pv=content_exp_pv,
# wel_click_pv=wel_click_pv,
# content_click_pv=content_click_pv, slide_wel_click_pv=slide_wel_click_pv,
# self_wel_click_pv=self_wel_click_pv, meigou_ctr=meigou_ctr, neirong_ctr=neirong_ctr,
# partition_day=partition_day, pid=pid, grey_neirong_ctr=grey_neirong_ctr, grey_meigou_ctr=grey_meigou_ctr
# )
    # print(insert_sql)
# # cursor.execute("set names 'UTF8'")
    # res = cursor.execute(insert_sql)
# db.commit()
# print(res)
# # cursor.executemany()
# db.close()
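
# Cleanup: report total runtime and release resources. Assumes no further work
# uses the Spark session or the module-level MySQL connection after this point.
print("total cost time: %.1fs" % (time.time() - startTime))
cursor.close()
db.close()
spark.stop()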