Commit 0c8ac4ac authored by litaolemo

update

parent 13ccecd1
......@@ -89,7 +89,7 @@ for t in range(1, task_days):
# Date strings in YYYYMMDD form, computed relative to `now`:
# one day back and seven days back respectively.
yesterday_str = (now + datetime.timedelta(days=-1)).strftime("%Y%m%d")
one_week_age_str = (now + datetime.timedelta(days=-7)).strftime("%Y%m%d")
#quanzhong_dau
# quanzhong_dau
quanzhong_dau_sql = """
--quanzhong_dau
SELECT mas.partition_date
......@@ -128,14 +128,15 @@ for t in range(1, task_days):
AND active_type in ('1','2','4')
) mas
GROUP BY mas.partition_date
""".format(start_date=yesterday_str,end_date=today_str)
""".format(start_date=yesterday_str, end_date=today_str)
# Log the rendered query text before running it.
print(quanzhong_dau_sql)
# Run the query through Spark and register the result as the temp view
# "quanzhong_dau_view"; show(1) prints one row for inspection.
quanzhong_dau_df = spark.sql(quanzhong_dau_sql)
quanzhong_dau_df.createOrReplaceTempView("quanzhong_dau_view")
quanzhong_dau_df.show(1)
# Fetch the result rows with collect() and copy the needed columns into
# plain variables.
# NOTE(review): indentation is not visible in this view; presumably the
# lines after the `for` are its body, so the last row's values are the ones
# that remain after the loop - confirm.
sql_res = quanzhong_dau_df.collect()
for res in sql_res:
print(res)
quanzhong_dau = res.quanzhong_dau
partition_date = res.partition_date
# DAU
DAU_sql = """
......@@ -162,14 +163,14 @@ for t in range(1, task_days):
LATERAL VIEW explode(mas.device_os_type) t2 AS device_os_type
LATERAL VIEW explode(mas.active_type) t2 AS active_type
GROUP BY mas.partition_date
""".format(start_date=yesterday_str,end_date=today_str)
""".format(start_date=yesterday_str, end_date=today_str)
# Log the rendered query text before running it.
print(DAU_sql)
# Run the query through Spark and register the result as the temp view
# "dau_view"; show(1) prints one row for inspection.
dau_df = spark.sql(DAU_sql)
dau_df.createOrReplaceTempView("dau_view")
dau_df.show(1)
# Fetch the result rows with collect() and keep the `dau` column.
# NOTE(review): presumably the lines after the `for` are its body
# (indentation is not visible here) - confirm.
sql_res = dau_df.collect()
for res in sql_res:
print(res)
dau = res.dau
# CPT average daily clicks
cpc_daily_click_sql = r"""
......@@ -183,7 +184,7 @@ OR (ACTION = 'goto_welfare_detail' AND PARAMS['from'] = 'welfare_home_list_item'
OR (ACTION = 'goto_welfare_detail' AND PARAMS['from'] = 'welfare_list' AND PARAMS['transaction_type'] = 'advertise')
OR (ACTION = 'on_click_card' AND PARAMS['card_content_type'] = 'service' AND PARAMS['page_name'] IN ('new_sign','search_result_welfare','category','welfare_home_list_item','welfare_list') AND PARAMS['transaction_type'] = 'advertise'))
group BY partition_date
""".format(partition_day=yesterday_str, end_date=today_str,start_date=yesterday_str)
""".format(partition_day=yesterday_str, end_date=today_str, start_date=yesterday_str)
# Log the rendered query text, then run it through Spark.
print(cpc_daily_click_sql)
cpc_daily_click_df = spark.sql(cpc_daily_click_sql)
......@@ -191,8 +192,7 @@ group BY partition_date
# Print one row for inspection, then fetch the result rows with collect()
# and keep the `pv` column.
# NOTE(review): presumably the lines after the `for` are its body
# (indentation is not visible here) - confirm.
cpc_daily_click_df.show(1)
sql_res = cpc_daily_click_df.collect()
for res in sql_res:
print(res)
pv = res.pv
# Product detail page PV
bus_detail_sql = r"""
......@@ -258,7 +258,7 @@ group BY partition_date
on dev_channel.device_id = page.cl_id
AND dev_channel.partition_date = page.partition_date
GROUP BY page.partition_date
""".format(partition_day=yesterday_str, end_date=today_str,start_date=yesterday_str)
""".format(partition_day=yesterday_str, end_date=today_str, start_date=yesterday_str)
# Log the rendered query text, then run it through Spark.
print(bus_detail_sql)
bus_detail_df = spark.sql(bus_detail_sql)
......@@ -266,8 +266,14 @@ group BY partition_date
# Print one row for inspection, then fetch the result rows with collect()
# and copy each metric column into a plain variable of the same name.
# NOTE(review): presumably the lines after the `for` are its body
# (indentation is not visible here) - confirm.
bus_detail_df.show(1)
sql_res = bus_detail_df.collect()
for res in sql_res:
print(res)
search_home_pv = res.search_home_pv
search_home_uv = res.search_home_uv
referrer_search_hexin_pv = res.referrer_search_hexin_pv
referrer_search_welfare_pv = res.referrer_search_welfare_pv
referrer_search_neirong_pv = res.referrer_search_neirong_pv
referrer_search_neirong_uv_1000 = res.referrer_search_neirong_uv_1000
referrer_search_neirong_pagestay = res.referrer_search_neirong_pagestay
# print(res)
# --CPC budget for the day (effective basis)
cpc_budget_sql = r"""
......@@ -320,7 +326,7 @@ FROM
AND T1.merchant_doctor_id=T2.merchant_doctor_id
)T
GROUP BY day_id
""".format(partition_date=yesterday_str, end_date=today_str,start_date=yesterday_str)
""".format(partition_date=yesterday_str, end_date=today_str, start_date=yesterday_str)
# Log the rendered query text, then run it through Spark.
print(cpc_budget_sql)
cpc_budget_df = spark.sql(cpc_budget_sql)
......@@ -328,6 +334,7 @@ GROUP BY day_id
# Print one row for inspection, then fetch the result rows with collect()
# and keep the `budget` column.
# NOTE(review): presumably the lines after the `for` are its body
# (indentation is not visible here) - confirm.
cpc_budget_df.show(1)
sql_res = cpc_budget_df.collect()
for res in sql_res:
budget = res.budget
print(res)
# CPC income and total ad spend
......@@ -353,7 +360,7 @@ SUM(CASE
from ml.ml_c_ct_mc_merchantadclassify_indic_d
where partition_day>='{start_date}' AND partition_day <'{end_date}'
group by partition_day
""".format(partition_day=yesterday_str, end_date=today_str,start_date=yesterday_str)
""".format(partition_day=yesterday_str, end_date=today_str, start_date=yesterday_str)
# Log the rendered query text, then run it through Spark.
print(cpc_income_sql)
cpc_income_df = spark.sql(cpc_income_sql)
......@@ -361,8 +368,15 @@ group by partition_day
# Print one row for inspection, then fetch the result rows with collect()
# and copy the click counts and spend amounts into plain variables.
# NOTE(review): presumably the assignments and print(res) after the `for`
# are its body (indentation is not visible here) - confirm.
cpc_income_df.show(1)
sql_res = cpc_income_df.collect()
for res in sql_res:
cpc_click_num = res.cpc_click_num
cpt_click_num = res.cpt_click_num
cpc_proportion_expend_amount = res.cpc_proportion_expend_amount
cpc_proportion_expend_recharge_amount = res.cpc_proportion_expend_recharge_amount
tol_proportion_expend_amount = res.tol_proportion_expend_amount
print(res)
# Debug output: a separator line followed by the CPT click count.
print("--------------------------------------------")
print(cpt_click_num)
#
# out_put_sql = """
# select bus_detail.referrer_search_welfare_pv / dau_view.dau as pv_div_dau,
......@@ -372,29 +386,29 @@ group by partition_day
# cpc_income.cpc_proportion_expend_recharge_amount/cpc_income.cpc_click_num as cpc_item_pricing,
# cpc_income.tol_proportion_expend_amount as tol_proportion_expend_amount
# """
# out_df = spark.sql(out_put_sql)
# # out_df.createOrReplaceTempView("out_df")
# out_df.show(1)
# sql_res = out_df.collect()
# for res in sql_res:
# print(res)
# for active_type in res_dict:
# db = pymysql.connect(host='172.16.40.158', port=4000, user='st_user', passwd='aqpuBLYzEV7tML5RPsN1pntUzFy',
# db='jerry_prod')
# cursor = db.cursor()
# partition_date = yesterday_str
# pid = hashlib.md5((partition_date + device_os_type + active_type).encode("utf8")).hexdigest()
# cpc_daily_click_sql = """replace into search_diary_ctr(
# partition_date,device_os_type,active_type,pid,click_num,exposure,search_ctr) VALUES('{partition_date}','{device_os_type}','{active_type}','{pid}',{click_num},{exposure},{search_ctr});""".format(
# partition_date=partition_date, device_os_type=device_os_type, active_type=active_type, pid=pid, click_num=click_num,
# exposure=exposure, search_ctr=search_ctr
# )
# print(instert_sql)
# # cursor.execute("set names 'UTF8'")
# res = cursor.execute(instert_sql)
# db.commit()
# print(res)
# # cursor.executemany()
# db.close()
# Derive the report ratios from the values extracted from the Spark queries.
# NOTE(review): a zero denominator (dau, quanzhong_dau,
# referrer_search_welfare_pv, budget or cpc_click_num) raises
# ZeroDivisionError and aborts the run - confirm whether such days need
# special handling.
pv_div_dau = referrer_search_welfare_pv / dau
pv_div_quanzhong_dau = referrer_search_welfare_pv / quanzhong_dau
ad_flow_rat = (cpt_click_num + cpc_click_num) / referrer_search_welfare_pv
budget_consumption_rate = cpc_proportion_expend_amount / budget
cpc_item_pricing = cpc_proportion_expend_recharge_amount / cpc_click_num
# tol_proportion_expend_amount is written through unchanged.

# The row id is the MD5 hex digest of yesterday's date string.
# NOTE(review): pid is derived from yesterday_str while partition_day and
# day_id are written as today_str - confirm this offset is intended.
partition_date = yesterday_str
pid = hashlib.md5(partition_date.encode("utf8")).hexdigest()

# Build the statement before opening the connection so that a formatting
# error cannot leave a connection open.
# NOTE(review): the name cpc_daily_click_sql is reused here for the
# `replace into` statement; kept as-is in case later code reads it.
cpc_daily_click_sql = """replace into ecommerce_income_report(
pv_div_dau,pv_div_quanzhong_dau,ad_flow_rat,budget_consumption_rate,cpc_item_pricing,tol_proportion_expend_amount,partition_day,day_id,pid) VALUES(
{pv_div_dau},{pv_div_quanzhong_dau},{ad_flow_rat},{budget_consumption_rate},{cpc_item_pricing},{tol_proportion_expend_amount},'{partition_day}','{day_id}','{pid}');""".format(
    pv_div_dau=pv_div_dau,
    pv_div_quanzhong_dau=pv_div_quanzhong_dau,
    ad_flow_rat=ad_flow_rat,
    budget_consumption_rate=budget_consumption_rate,
    cpc_item_pricing=cpc_item_pricing,
    tol_proportion_expend_amount=tol_proportion_expend_amount,
    partition_day=today_str,
    day_id=today_str,
    pid=pid,
)
print(cpc_daily_click_sql)

# NOTE(review): database credentials are hard-coded in source; they should
# be moved to configuration or a secret store.
db = pymysql.connect(host='172.16.40.158', port=4000, user='st_user', passwd='aqpuBLYzEV7tML5RPsN1pntUzFy',
                     db='jerry_prod')
try:
    cursor = db.cursor()
    res = cursor.execute(cpc_daily_click_sql)
    db.commit()
    # execute() returns the affected-row count; log it.
    print(res)
finally:
    # Always release the connection, even when execute/commit raises;
    # otherwise every failing run leaves a connection open.
    db.close()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment