Commit aefa6cd0 authored by litaolemo's avatar litaolemo

update

parent 1b0e1f88
...@@ -24,9 +24,10 @@ from pyspark.sql.functions import lit ...@@ -24,9 +24,10 @@ from pyspark.sql.functions import lit
import pytispark.pytispark as pti import pytispark.pytispark as pti
db = pymysql.connect(host='172.16.40.158', port=4000, user='st_user', passwd='aqpuBLYzEV7tML5RPsN1pntUzFy', db = pymysql.connect(host='172.16.40.158', port=4000, user='st_user', passwd='aqpuBLYzEV7tML5RPsN1pntUzFy',
db='jerry_prod') db='jerry_prod')
cursor = db.cursor() cursor = db.cursor()
def con_sql(sql): def con_sql(sql):
# 从数据库的表里获取数据 # 从数据库的表里获取数据
...@@ -74,7 +75,6 @@ spark.sql("CREATE TEMPORARY FUNCTION json_map AS 'brickhouse.udf.json.JsonMapUDF ...@@ -74,7 +75,6 @@ spark.sql("CREATE TEMPORARY FUNCTION json_map AS 'brickhouse.udf.json.JsonMapUDF
spark.sql("CREATE TEMPORARY FUNCTION is_json AS 'com.gmei.hive.common.udf.UDFJsonFormatCheck'") spark.sql("CREATE TEMPORARY FUNCTION is_json AS 'com.gmei.hive.common.udf.UDFJsonFormatCheck'")
spark.sql("CREATE TEMPORARY FUNCTION arrayMerge AS 'com.gmei.hive.common.udf.UDFArryMerge'") spark.sql("CREATE TEMPORARY FUNCTION arrayMerge AS 'com.gmei.hive.common.udf.UDFArryMerge'")
task_list = [] task_list = []
task_days = 1 task_days = 1
now = datetime.datetime.now() now = datetime.datetime.now()
...@@ -90,135 +90,176 @@ for t in range(0, task_days): ...@@ -90,135 +90,176 @@ for t in range(0, task_days):
# CPT日均点击 # CPT日均点击
CPT_daily_click_sql = """SELECT partition_date,count(1) as pv CPT_daily_click_sql = """SELECT partition_date,count(1) as pv
FROM online.bl_hdfs_maidian_updates FROM online.bl_hdfs_maidian_updates
WHERE partition_date >= '${start_date}' WHERE partition_date >= '{start_date}'
and partition_date < '${end_date}' and partition_date < '{end_date}'
AND ((ACTION = 'search_result_welfare_click_item' AND PAGE_NAME = 'search_result_welfare' AND PARAMS['transaction_type'] = 'advertise') AND ((ACTION = 'search_result_welfare_click_item' AND PAGE_NAME = 'search_result_welfare' AND PARAMS['transaction_type'] = 'advertise')
OR (ACTION = 'goto_welfare_detail' AND PARAMS['from'] = 'category' AND PARAMS['transaction_type'] = 'operating' AND PARAMS['tab_name'] = 'service') OR (ACTION = 'goto_welfare_detail' AND PARAMS['from'] = 'category' AND PARAMS['transaction_type'] = 'operating' AND PARAMS['tab_name'] = 'service')
OR (ACTION = 'goto_welfare_detail' AND PARAMS['from'] = 'welfare_home_list_item' and PARAMS['transaction_type'] = 'advertise') OR (ACTION = 'goto_welfare_detail' AND PARAMS['from'] = 'welfare_home_list_item' and PARAMS['transaction_type'] = 'advertise')
OR (ACTION = 'goto_welfare_detail' AND PARAMS['from'] = 'welfare_list' AND PARAMS['transaction_type'] = 'advertise') OR (ACTION = 'goto_welfare_detail' AND PARAMS['from'] = 'welfare_list' AND PARAMS['transaction_type'] = 'advertise')
OR (ACTION = 'on_click_card' AND PARAMS['card_content_type'] = 'service' AND PARAMS['page_name'] IN ('new_sign','search_result_welfare','category','welfare_home_list_item','welfare_list') AND PARAMS['transaction_type'] = 'advertise')) OR (ACTION = 'on_click_card' AND PARAMS['card_content_type'] = 'service' AND PARAMS['page_name'] IN ('new_sign','search_result_welfare','category','welfare_home_list_item','welfare_list') AND PARAMS['transaction_type'] = 'advertise'))
group BY partition_date""".format(start_date=yesterday_str,end_date=today_str) group BY partition_date""".format(start_date=yesterday_str, end_date=today_str)
CPT_daily_click_df = spark.sql(CPT_daily_click_sql) CPT_daily_click_df = spark.sql(CPT_daily_click_sql)
CPT_daily_click_df.createOrReplaceTempView("cpt_daily_click_df") # CPT_daily_click_df.createOrReplaceTempView("cpt_daily_click_df")
sql_res = CPT_daily_click_df.collect()
for res in sql_res:
print(res)
# 商详页PV # 商详页PV
bus_detail_pv_sql = """SELECT bus_detail_pv_sql = """SELECT
partition_date,count(1) welfare_pv partition_date,count(1) welfare_pv
FROM FROM
( (
SELECT cl_id,partition_date SELECT cl_id,partition_date
FROM bl_hdfs_maidian_updates FROM bl_hdfs_maidian_updates
WHERE partition_date >='{start_date}'and partition_date < '{end_date}' WHERE partition_date >='{start_date}'and partition_date < '{end_date}'
AND action='page_view' AND action='page_view'
AND params['page_name'] = 'welfare_detail' AND params['page_name'] = 'welfare_detail'
)a1 )a1
JOIN JOIN
( (
SELECT device_id,partition_date SELECT device_id,partition_date
from online.ml_device_day_active_status from online.ml_device_day_active_status
WHERE partition_date >='{start_date}'and partition_date < '{end_date}' WHERE partition_date >='{start_date}'and partition_date < '{end_date}'
AND active_type in ('1','2','4') AND active_type in ('1','2','4')
)a2 )a2
on a2.device_id = a1.cl_id on a2.device_id = a1.cl_id
AND a2.partition_date=a1.partition_date AND a2.partition_date=a1.partition_date
group by partition_date""".format(start_date=yesterday_str,end_date=today_str,) group by partition_date""".format(start_date=yesterday_str, end_date=today_str, )
bus_detail_pv_df = spark.sql(bus_detail_pv_sql) bus_detail_pv_df = spark.sql(bus_detail_pv_sql)
bus_detail_pv_df.createOrReplaceTempView("bus_detail_pv_df") # bus_detail_pv_df.createOrReplaceTempView("bus_detail_pv_df")
sql_res = bus_detail_pv_df.collect()
for res in sql_res:
print(res)
# cpc当日预算(有效口径) # cpc当日预算(有效口径)
sql = """SELECT day_id,sum(budget) as budget cpc_budget_sql = """SELECT day_id,sum(budget) as budget
FROM
(
SELECT T1.day_id,T1.merchant_doctor_id,case when merchant_budget>=tot_service_budget then tot_service_budget else merchant_budget end as budget
FROM
(
SELECT
substr(clicklog.create_time,1,10) AS day_id
,clicklog.merchant_doctor_id
,max(merchant_budget) as merchant_budget --商户预算
FROM
(
SELECT id,promote_id,price,service_budget,merchant_budget,merchant_doctor_id,create_time,recharge
FROM online.tl_hdfs_cpc_clicklog_view
WHERE partition_date='${partition_date}'
AND regexp_replace(substr(create_time,1,10),'-','')>= '${start_date}'
AND regexp_replace(substr(create_time,1,10),'-','')<'${end_date}'
)clicklog
group by substr(clicklog.create_time,1,10),clicklog.merchant_doctor_id
)T1
LEFT JOIN
(
SELECT
day_id
,merchant_doctor_id
,sum(service_budget) as tot_service_budget
FROM FROM
( (
SELECT SELECT T1.day_id,T1.merchant_doctor_id,case when merchant_budget>=tot_service_budget then tot_service_budget else merchant_budget end as budget
substr(clicklog.create_time,1,10) AS day_id FROM
,clicklog.merchant_doctor_id,clicklog.service_id
,max(service_budget) as service_budget
FROM
( (
SELECT id,promote_id,price,service_budget,merchant_budget,merchant_doctor_id,service_id,create_time SELECT
FROM online.tl_hdfs_cpc_clicklog_view substr(clicklog.create_time,1,10) AS day_id
WHERE partition_date='${partition_date}' ,clicklog.merchant_doctor_id
AND regexp_replace(substr(create_time,1,10),'-','')>= '${start_date}' ,max(merchant_budget) as merchant_budget --商户预算
AND regexp_replace(substr(create_time,1,10),'-','')<'${end_date}' FROM
)clicklog (
GROUP BY substr(clicklog.create_time,1,10),clicklog.merchant_doctor_id,clicklog.service_id SELECT id,promote_id,price,service_budget,merchant_budget,merchant_doctor_id,create_time,recharge
)service_budget FROM online.tl_hdfs_cpc_clicklog_view
GROUP BY day_id,merchant_doctor_id WHERE partition_date='{partition_date}'
)T2 AND regexp_replace(substr(create_time,1,10),'-','')>= '{start_date}'
ON T1.day_id=T2.day_id AND regexp_replace(substr(create_time,1,10),'-','')<'{end_date}'
AND T1.merchant_doctor_id=T2.merchant_doctor_id )clicklog
)T group by substr(clicklog.create_time,1,10),clicklog.merchant_doctor_id
GROUP BY day_id )T1
""" LEFT JOIN
device_df = spark.sql(sql) (
SELECT
device_df.show(1, False) day_id
sql_res = device_df.collect() ,merchant_doctor_id
print("-----------------------------------------------------------------------------") ,sum(service_budget) as tot_service_budget
FROM
(
SELECT
substr(clicklog.create_time,1,10) AS day_id
,clicklog.merchant_doctor_id,clicklog.service_id
,max(service_budget) as service_budget
FROM
(
SELECT id,promote_id,price,service_budget,merchant_budget,merchant_doctor_id,service_id,create_time
FROM online.tl_hdfs_cpc_clicklog_view
WHERE partition_date='{partition_date}'
AND regexp_replace(substr(create_time,1,10),'-','')>= '{start_date}'
AND regexp_replace(substr(create_time,1,10),'-','')<'{end_date}'
)clicklog
GROUP BY substr(clicklog.create_time,1,10),clicklog.merchant_doctor_id,clicklog.service_id
)service_budget
GROUP BY day_id,merchant_doctor_id
)T2
ON T1.day_id=T2.day_id
AND T1.merchant_doctor_id=T2.merchant_doctor_id
)T
GROUP BY day_id
""".format(start_date=yesterday_str, end_date=today_str, partition_date=partition_date_str)
cpc_budget_df = spark.sql(cpc_budget_sql)
cpc_budget_df.show(1, False)
sql_res = cpc_budget_df.collect()
for res in sql_res: for res in sql_res:
# print(res)
day_id = res.day_id
device_os_type = res.device_os_type
active_type = res.active_type
grey_type = res.grey_type
page_name = res.page_name
content_pv = res.content_pv
content_uv = res.content_uv
wel_exp_pv = res.wel_exp_pv
content_exp_pv = res.content_exp_pv
meigou_ctr=res.meigou_ctr
if not meigou_ctr: meigou_ctr = 0
grey_meigou_ctr=res.grey_meigou_ctr
neirong_ctr=res.neirong_ctr
if not neirong_ctr: neirong_ctr = 0
grey_neirong_ctr=res.grey_neirong_ctr
wel_click_pv = res.wel_click_pv
content_click_pv = res.content_click_pv
slide_wel_click_pv = res.slide_wel_click_pv
self_wel_click_pv = res.self_wel_click_pv
partition_day = res.PARTITION_DAY
pid = hashlib.md5((day_id + device_os_type + active_type + grey_type + page_name).encode("utf8")).hexdigest()
instert_sql = """replace into conent_detail_page_grayscale_ctr(
day_id,device_os_type,active_type,grey_type,page_name,content_pv,content_uv,wel_exp_pv,
content_exp_pv,wel_click_pv,content_click_pv,slide_wel_click_pv,self_wel_click_pv,partition_day,pid,meigou_ctr,neirong_ctr,
grey_meigou_ctr,grey_neirong_ctr) VALUES('{day_id}','{device_os_type}','{active_type}','{grey_type}','{page_name}',{content_pv},{content_uv},
{wel_exp_pv},{content_exp_pv},{wel_click_pv},{content_click_pv},{slide_wel_click_pv},{self_wel_click_pv},'{partition_day}','{pid}',{meigou_ctr},{neirong_ctr},{grey_meigou_ctr},{grey_neirong_ctr});""".format(
day_id=day_id,device_os_type=device_os_type,active_type=active_type,grey_type=grey_type,page_name=page_name,
content_pv=content_pv,content_uv=content_uv,wel_exp_pv=wel_exp_pv,content_exp_pv=content_exp_pv,wel_click_pv=wel_click_pv,
content_click_pv=content_click_pv,slide_wel_click_pv=slide_wel_click_pv,self_wel_click_pv=self_wel_click_pv,meigou_ctr=meigou_ctr,neirong_ctr=neirong_ctr,
partition_day=partition_day, pid=pid,grey_neirong_ctr=grey_neirong_ctr,grey_meigou_ctr=grey_meigou_ctr
)
print(instert_sql)
# cursor.execute("set names 'UTF8'")
res = cursor.execute(instert_sql)
db.commit()
print(res) print(res)
# cursor.executemany()
db.close() cpc_income_total_consume_sql = """
\ No newline at end of file select partition_day,
sum(case when advertise_type = 'cpc' AND advertise_business_type in('service') and advertise_calculate_type='cpc_log' then cpc_click_num end) cpc_click_num,
sum(case when advertise_type = 'cpc' AND advertise_business_type in('service') and advertise_calculate_type='cpc_flownext' then proportion_expend_amount end) cpc_proportion_expend_amount,
sum(case when advertise_type = 'cpc' AND advertise_business_type in('service') and advertise_calculate_type='cpc_flownext' then proportion_expend_recharge_amount end) cpc_proportion_expend_recharge_amount,
SUM(CASE
WHEN advertise_type = 'cpc' AND advertise_calculate_type = 'cpc_flownext' THEN
proportion_expend_amount
WHEN advertise_type = 'cpt' AND advertise_calculate_type = 'cpt_schedule' THEN
proportion_expend_amount
WHEN advertise_type IN ('browse', 'message', 'valueadded','rechargededuction') THEN
proportion_expend_amount
WHEN advertise_type = 'adjustment' AND advertise_calculate_type ='adjustment_flow' THEN
proportion_expend_amount
ELSE
0
END) tol_proportion_expend_amount
from ml.ml_c_ct_mc_merchantadclassify_indic_d
where partition_day>='{start_date}' AND partition_day <'{end_date}'
group by partition_day
""".format(start_date=yesterday_str, end_date=today_str)
cpc_income_total_consume_df = spark.sql(cpc_income_total_consume_sql)
cpc_income_total_consume_df.show(1, False)
cpc_income_total_consume_df_res = cpc_income_total_consume_df.collect()
for res in cpc_income_total_consume_df_res:
print(res)
print("-----------------------------------------------------------------------------")
# for res in sql_res:
# # print(res)
# day_id = res.day_id
# device_os_type = res.device_os_type
# active_type = res.active_type
# grey_type = res.grey_type
# page_name = res.page_name
# content_pv = res.content_pv
# content_uv = res.content_uv
# wel_exp_pv = res.wel_exp_pv
# content_exp_pv = res.content_exp_pv
# meigou_ctr = res.meigou_ctr
# if not meigou_ctr: meigou_ctr = 0
# grey_meigou_ctr = res.grey_meigou_ctr
# neirong_ctr = res.neirong_ctr
# if not neirong_ctr: neirong_ctr = 0
# grey_neirong_ctr = res.grey_neirong_ctr
#
# wel_click_pv = res.wel_click_pv
# content_click_pv = res.content_click_pv
# slide_wel_click_pv = res.slide_wel_click_pv
# self_wel_click_pv = res.self_wel_click_pv
# partition_day = res.PARTITION_DAY
# pid = hashlib.md5((day_id + device_os_type + active_type + grey_type + page_name).encode("utf8")).hexdigest()
# instert_sql = """replace into conent_detail_page_grayscale_ctr(
# day_id,device_os_type,active_type,grey_type,page_name,content_pv,content_uv,wel_exp_pv,
# content_exp_pv,wel_click_pv,content_click_pv,slide_wel_click_pv,self_wel_click_pv,partition_day,pid,meigou_ctr,neirong_ctr,
# grey_meigou_ctr,grey_neirong_ctr) VALUES('{day_id}','{device_os_type}','{active_type}','{grey_type}','{page_name}',{content_pv},{content_uv},
# {wel_exp_pv},{content_exp_pv},{wel_click_pv},{content_click_pv},{slide_wel_click_pv},{self_wel_click_pv},'{partition_day}','{pid}',{meigou_ctr},{neirong_ctr},{grey_meigou_ctr},{grey_neirong_ctr});""".format(
# day_id=day_id, device_os_type=device_os_type, active_type=active_type, grey_type=grey_type,
# page_name=page_name,
# content_pv=content_pv, content_uv=content_uv, wel_exp_pv=wel_exp_pv, content_exp_pv=content_exp_pv,
# wel_click_pv=wel_click_pv,
# content_click_pv=content_click_pv, slide_wel_click_pv=slide_wel_click_pv,
# self_wel_click_pv=self_wel_click_pv, meigou_ctr=meigou_ctr, neirong_ctr=neirong_ctr,
# partition_day=partition_day, pid=pid, grey_neirong_ctr=grey_neirong_ctr, grey_meigou_ctr=grey_meigou_ctr
# )
# print(instert_sql)
# # cursor.execute("set names 'UTF8'")
# res = cursor.execute(instert_sql)
# db.commit()
# print(res)
# # cursor.executemany()
# db.close()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment