# -*- coding:UTF-8 -*-
# @Time  : 2020/9/11 17:37
# @File  : ecommerce_income_report.py
# @email : litao@igengmei.com
# @author : litao


import hashlib
import json

import pymysql
import xlwt, datetime
import redis
# from pyhive import hive
from maintenance.func_send_email_with_file import send_file_email
from typing import Dict, List
from elasticsearch_7 import Elasticsearch
from elasticsearch_7.helpers import scan
import sys
import time
from pyspark import SparkConf
from pyspark.sql import SparkSession, DataFrame


# from pyspark.sql.functions import lit
# import pytispark.pytispark as pti


def con_sql(sql):
    # Fetch rows from a table in the database

    db = pymysql.connect(host='172.16.40.158', port=4000, user='st_user', passwd='aqpuBLYzEV7tML5RPsN1pntUzFy',
                         db='jerry_prod')
    cursor = db.cursor()
    cursor.execute(sql)
    result = cursor.fetchall()
    db.close()
    return result
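
# NOTE: the con_sql helper above queries TiDB directly; nothing in the report
# flow below actually calls it.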


startTime = time.time()
sparkConf = SparkConf()
sparkConf.set("spark.sql.crossJoin.enabled", True)
sparkConf.set("spark.debug.maxToStringFields", "100")
sparkConf.set("spark.tispark.plan.allow_index_double_read", False)
sparkConf.set("spark.tispark.plan.allow_index_read", True)
sparkConf.set("spark.hive.mapred.supports.subdirectories", True)
sparkConf.set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive", True)
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
sparkConf.set("mapreduce.output.fileoutputformat.compress", False)
sparkConf.set("mapreduce.map.output.compress", False)
sparkConf.set("prod.gold.jdbcuri",
              "jdbc:mysql://172.16.30.136/doris_prod?user=doris&password=o5gbA27hXHHm&rewriteBatchedStatements=true")
sparkConf.set("prod.mimas.jdbcuri",
              "jdbc:mysql://172.16.30.138/mimas_prod?user=mimas&password=GJL3UJe1Ck9ggL6aKnZCq4cRvM&rewriteBatchedStatements=true")
sparkConf.set("prod.gaia.jdbcuri",
              "jdbc:mysql://172.16.30.143/zhengxing?user=work&password=BJQaT9VzDcuPBqkd&rewriteBatchedStatements=true")
sparkConf.set("prod.tidb.jdbcuri",
              "jdbc:mysql://172.16.40.158:4000/eagle?user=st_user&password=aqpuBLYzEV7tML5RPsN1pntUzFy&rewriteBatchedStatements=true")
sparkConf.set("prod.jerry.jdbcuri",
              "jdbc:mysql://172.16.40.158:4000/jerry_prod?user=st_user&password=aqpuBLYzEV7tML5RPsN1pntUzFy&rewriteBatchedStatements=true")
sparkConf.set("prod.tispark.pd.addresses", "172.16.40.158:2379")
sparkConf.set("prod.tispark.pd.addresses", "172.16.40.170:4000")
sparkConf.set("prod.tidb.database", "jerry_prod")
sparkConf.setAppName("search_diary_ctr")
spark = (SparkSession.builder
         .config(conf=sparkConf)
         .config("spark.sql.extensions", "org.apache.spark.sql.TiExtensions")
         .config("spark.tispark.pd.addresses", "172.16.40.170:2379")
         .appName("search_diary_ctr")
         .enableHiveSupport()
         .getOrCreate())

spark.sql("ADD JAR hdfs:///user/hive/share/lib/udf/brickhouse-0.7.1-SNAPSHOT.jar")
spark.sql("ADD JAR hdfs:///user/hive/share/lib/udf/hive-udf-1.0-SNAPSHOT.jar")
spark.sql("CREATE TEMPORARY FUNCTION json_map AS 'brickhouse.udf.json.JsonMapUDF'")
spark.sql("CREATE TEMPORARY FUNCTION is_json AS 'com.gmei.hive.common.udf.UDFJsonFormatCheck'")
spark.sql("CREATE TEMPORARY FUNCTION arrayMerge AS 'com.gmei.hive.common.udf.UDFArryMerge'")

task_list = []
task_days = 60
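# Backfill loop: for each of the previous task_days - 1 days, run the per-day
# queries below and upsert that day's derived metrics into ecommerce_income_report.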
for t in range(1, task_days):
    day_num = 0 - t
    now = (datetime.datetime.now() + datetime.timedelta(days=day_num))
    last_30_day_str = (now + datetime.timedelta(days=-30)).strftime("%Y%m%d")
    today_str = now.strftime("%Y%m%d")
    yesterday_str = (now + datetime.timedelta(days=-1)).strftime("%Y%m%d")
    one_week_ago_str = (now + datetime.timedelta(days=-7)).strftime("%Y%m%d")

    # quanzhong_dau
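    # Weighted DAU: each distinct device is weighted by its cohort -- OS
    # (android/ios) x activity (returning '老活' vs. new '新增') x channel
    # (AI / offer wall '积分墙' / medical aesthetics '医美'). The per-cohort
    # coefficients (0.03 .. 1.00) appear to be externally supplied monetization weights.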
    quanzhong_dau_sql = """
    --quanzhong_dau 
        SELECT mas.partition_date
        ,round(count(DISTINCT CASE WHEN device_type = '老活' AND device_os_type = 'android' AND channel_type = 'AI' THEN device_id END)*0.14
              +count(DISTINCT CASE WHEN device_type = '老活' AND device_os_type = 'android' AND channel_type = '医美' THEN device_id END)*0.64
              +count(DISTINCT CASE WHEN device_type = '新增' AND device_os_type = 'android' AND channel_type = 'AI' THEN device_id END)*0.08
              +count(DISTINCT CASE WHEN device_type = '新增' AND device_os_type = 'android' AND channel_type = '医美' THEN device_id END)*0.19
              +count(DISTINCT CASE WHEN device_type = '老活' AND device_os_type = 'ios' AND channel_type = 'AI' THEN device_id END)*0.32
              +count(DISTINCT CASE WHEN device_type = '老活' AND device_os_type = 'ios' AND channel_type = '积分墙' THEN device_id END)*0.28
              +count(DISTINCT CASE WHEN device_type = '老活' AND device_os_type = 'ios' AND channel_type = '医美' THEN device_id END)*1.00
              +count(DISTINCT CASE WHEN device_type = '新增' AND device_os_type = 'ios' AND channel_type = 'AI' THEN device_id END)*0.19
              +count(DISTINCT CASE WHEN device_type = '新增' AND device_os_type = 'ios' AND channel_type = '积分墙' THEN device_id END)*0.03
              +count(DISTINCT CASE WHEN device_type = '新增' AND device_os_type = 'ios' AND channel_type = '医美' THEN device_id END)*0.57,0) as quanzhong_dau
        FROM
        (
            SELECT
            partition_date,m.device_id,device_os_type 
            ,case WHEN active_type = '4'  THEN '老活' 
                  WHEN active_type  in ('1','2')  then '新增' END as device_type 
            ,CASE WHEN is_ai_channel = 'true' THEN 'AI'  
                   WHEN first_channel_source_type in ('yqxiu1','yqxiu2','yqxiu3','yqxiu4','yqxiu5','mxyc1','mxyc2','mxyc3'
                     ,'wanpu','jinshan','jx','maimai','zhuoyi','huatian','suopingjingling','mocha','mizhe','meika','lamabang'
                     ,'js-az1','js-az2','js-az3','js-az4','js-az5','jfq-az1','jfq-az2','jfq-az3','jfq-az4','jfq-az5','toufang1'
                     ,'toufang2','toufang3','toufang4','toufang5','toufang6','TF-toufang1','TF-toufang2','TF-toufang3','TF-toufang4'
                     ,'TF-toufang5','tf-toufang1','tf-toufang2','tf-toufang3','tf-toufang4','tf-toufang5','benzhan','promotion_aso100'
                     ,'promotion_qianka','promotion_xiaoyu','promotion_dianru','promotion_malioaso','promotion_malioaso-shequ'
                     ,'promotion_shike','promotion_julang_jl03','promotion_zuimei','','unknown')  then '积分墙' ELSE '医美' END as channel_type
            FROM online.ml_device_day_active_status m
                LEFT JOIN 
                (SELECT code,is_ai_channel,partition_day
                 FROM DIM.DIM_AI_CHANNEL_ZP_NEW
                 WHERE partition_day>= '{start_date}' AND partition_day < '{end_date}'  ) tmp
                ON  m.partition_date=tmp.partition_day AND first_channel_source_type=code
            where partition_date >= '{start_date}'  
            AND partition_date < '{end_date}' 
            AND active_type in ('1','2','4')
        ) mas
        GROUP BY mas.partition_date
    """.format(start_date=yesterday_str, end_date=today_str)
    print(quanzhong_dau_sql)
    quanzhong_dau_df = spark.sql(quanzhong_dau_sql)
    quanzhong_dau_df.createOrReplaceTempView("quanzhong_dau_view")
    quanzhong_dau_df.show(1)
    sql_res = quanzhong_dau_df.collect()
    for res in sql_res:
        quanzhong_dau = res.quanzhong_dau
        partition_date = res.partition_date

    # DAU
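    # Plain DAU: distinct active devices per day (active_type 1/2/4). The exploded
    # '合计' (total) dimension arrays look like a carry-over from a shared query
    # template; they do not change the DISTINCT device count since the final
    # GROUP BY is on partition_date only.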
    DAU_sql = """
        SELECT mas.partition_date,count(DISTINCT mas.device_id) as dau
        FROM
        (
            SELECT
            partition_date,m.device_id
            ,array(device_os_type ,'合计') as device_os_type
            ,array(case WHEN active_type = '4'  THEN '老活' 
                  WHEN active_type  in ('1','2')  then '新增' END ,'合计') as active_type 
            ,array(CASE WHEN is_ai_channel = 'true' THEN 'AI'  ELSE '其他' END , '合计') as channel
            FROM online.ml_device_day_active_status m
                LEFT JOIN 
                (SELECT code,is_ai_channel,partition_day
                 FROM DIM.DIM_AI_CHANNEL_ZP_NEW
                 WHERE partition_day>= '{start_date}' AND partition_day < '{end_date}'  ) tmp
                ON  m.partition_date=tmp.partition_day AND first_channel_source_type=code
            where partition_date >= '{start_date}'  
            AND partition_date < '{end_date}' 
            AND active_type in ('1','2','4')
        ) mas
        LATERAL VIEW explode(mas.channel) t2 AS channel
        LATERAL VIEW explode(mas.device_os_type) t2 AS device_os_type
        LATERAL VIEW explode(mas.active_type) t2 AS active_type
        GROUP BY mas.partition_date
    """.format(start_date=yesterday_str, end_date=today_str)
    print(DAU_sql)
    dau_df = spark.sql(DAU_sql)
    dau_df.createOrReplaceTempView("dau_view")
    dau_df.show(1)
    sql_res = dau_df.collect()
    for res in sql_res:
        dau = res.dau

    # Daily CPT ad clicks
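    # Counts clicks on advertised service items in the event log
    # (bl_hdfs_maidian_updates): search result, category tab, welfare home/list
    # entries and service cards where transaction_type marks a paid placement.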
    cpc_daily_click_sql = r"""
SELECT partition_date,count(1) as pv
FROM online.bl_hdfs_maidian_updates
WHERE partition_date >=  '{start_date}'
and partition_date < '{end_date}' 
AND ((ACTION = 'search_result_welfare_click_item' AND PAGE_NAME = 'search_result_welfare' AND PARAMS['transaction_type'] = 'advertise')
OR (ACTION = 'goto_welfare_detail' AND PARAMS['from'] = 'category' AND PARAMS['transaction_type'] = 'operating' AND PARAMS['tab_name'] = 'service')
OR (ACTION = 'goto_welfare_detail' AND PARAMS['from'] = 'welfare_home_list_item' and PARAMS['transaction_type'] = 'advertise')
OR (ACTION = 'goto_welfare_detail' AND PARAMS['from'] = 'welfare_list' AND PARAMS['transaction_type'] = 'advertise') 
OR (ACTION = 'on_click_card' AND PARAMS['card_content_type'] = 'service' AND PARAMS['page_name'] IN ('new_sign','search_result_welfare','category','welfare_home_list_item','welfare_list') AND PARAMS['transaction_type'] = 'advertise')) 
group BY partition_date
    """.format(partition_day=yesterday_str, end_date=today_str, start_date=yesterday_str)

    print(cpc_daily_click_sql)
    cpc_daily_click_df = spark.sql(cpc_daily_click_sql)
    cpc_daily_click_df.createOrReplaceTempView("cpc_daily_click")
    cpc_daily_click_df.show(1)
    sql_res = cpc_daily_click_df.collect()
    for res in sql_res:
        pv = res.pv

    # Service detail page (welfare_detail) PV
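    # Joins welfare_detail page_view events with that day's active devices to get
    # total service detail page views (welfare_pv).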
    pv_sql = """
    SELECT 
    a1.partition_date,count(1) welfare_pv
FROM 
(
    SELECT cl_id,partition_date
    FROM online.bl_hdfs_maidian_updates
    WHERE partition_date >='{start_date}'and partition_date < '{end_date}'
    AND action='page_view'
    AND params['page_name'] = 'welfare_detail'
)a1
JOIN
(
    SELECT device_id,partition_date
    from online.ml_device_day_active_status
    WHERE partition_date >='{start_date}'and partition_date < '{end_date}'
    AND active_type in ('1','2','4')  
)a2
on a2.device_id = a1.cl_id
AND a2.partition_date=a1.partition_date
group by a1.partition_date
    """.format(start_date=yesterday_str,end_date=today_str)
    all_pv_df = spark.sql(pv_sql)
    all_pv_df.show(1)
    sql_res = all_pv_df.collect()
    for res in sql_res:
        welfare_pv = res.welfare_pv


    # PV of detail pages reached from search
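    # Search funnel breakdown: search home PV/UV plus PVs of core detail pages
    # (service / organization / expert and content pages) whose referrer is a search
    # result page. Only referrer_search_welfare_pv is used in the ratios further down.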
    bus_detail_sql = r"""
        --page view PV/UV
        SELECT
            page.partition_date as partition_date
            ,count(case when page_name in ('search_home','search_home_more','search_home_welfare','search_home_diary','search_home_wiki','search_home_post','search_home_hospital','search_home_doctor') then page.cl_id else NULL end) as search_home_pv
            ,count(distinct case when page_name in ('search_home','search_home_more','search_home_welfare','search_home_diary','search_home_wiki','search_home_post','search_home_hospital','search_home_doctor') then page.cl_id else NULL end) as search_home_uv
            ,count(CASE when referrer in  ('search_result_diary','search_result_doctor','search_result_hospital','search_result_more'
                        ,'search_result_more_infomation','search_result_more_user','search_result_post','search_result_welfare'
                        ,'search_result_wiki','search_result_question_answer') and page_name in ('welfare_detail','organization_detail','expert_detail') THEN page.cl_id else NULL END) as referrer_search_hexin_pv
            ,count(CASE when referrer in  ('search_result_diary','search_result_doctor','search_result_hospital','search_result_more'
                        ,'search_result_more_infomation','search_result_more_user','search_result_post','search_result_welfare'
                        ,'search_result_wiki','search_result_question_answer') and page_name in ('welfare_detail') THEN page.cl_id else NULL END) as referrer_search_welfare_pv
            ,count(CASE when referrer in  ('search_result_diary','search_result_doctor','search_result_hospital','search_result_more'
                        ,'search_result_more_infomation','search_result_more_user','search_result_post','search_result_welfare'
                        ,'search_result_wiki','search_result_question_answer') and page_name in ('diary_detail','topic_detail','post_detail','user_post_detail','doctor_post_detail','question_detail','answer_detail'
                        ,'question_answer_detail','article_detail') THEN page.cl_id else NULL END) as referrer_search_neirong_pv
            ,count(DISTINCT CASE WHEN referrer in  ('search_result_diary','search_result_doctor','search_result_hospital','search_result_more'
                        ,'search_result_more_infomation','search_result_more_user','search_result_post','search_result_welfare'
                        ,'search_result_wiki','search_result_question_answer') and page_name in ('diary_detail','topic_detail','post_detail','user_post_detail','doctor_post_detail','question_detail','answer_detail'
                        ,'question_answer_detail','article_detail') and page_stay >= '0' and page_stay < '1000' THEN page.cl_id else NULL END) as referrer_search_neirong_uv_1000
            ,sum(CASE WHEN referrer in  ('search_result_diary','search_result_doctor','search_result_hospital','search_result_more'
                        ,'search_result_more_infomation','search_result_more_user','search_result_post','search_result_welfare'
                        ,'search_result_wiki','search_result_question_answer') and page_name in ('diary_detail','topic_detail','post_detail','user_post_detail','doctor_post_detail','question_detail','answer_detail'
                        ,'question_answer_detail','article_detail') and page_stay >= '0' and page_stay < '1000' THEN page.page_stay else NULL END) as  referrer_search_neirong_pagestay
        FROM
        (
            SELECT cl_id,partition_date,page_name,params['referrer'] as referrer,page_stay
            FROM online.bl_hdfs_maidian_updates
            WHERE partition_date >= '{start_date}'  
              AND partition_date < '{end_date}'  
              AND action='page_view'
              AND page_name in ('search_home','search_home_more','search_home_welfare','search_home_diary','search_home_wiki','search_home_post','search_home_hospital','search_home_doctor'
                        ,'diary_detail','topic_detail','post_detail','user_post_detail','doctor_post_detail','question_detail','answer_detail'
                        ,'question_answer_detail','article_detail','welfare_detail','organization_detail','expert_detail','level_one_plan_detail')
        )page  
        JOIN
        (
            SELECT partition_date,device_id,t2.active_type,t2.channel,t2.device_os_type
            FROM
            (
                SELECT
                partition_date,m.device_id
                ,array(device_os_type ,'合计') as device_os_type
                ,array(case WHEN active_type = '4'  THEN '老活' 
                      WHEN active_type  in ('1','2')  then '新增' END ,'合计') as active_type 
                ,array(CASE WHEN is_ai_channel = 'true' THEN 'AI'  ELSE '其他' END , '合计') as channel
                FROM online.ml_device_day_active_status m
                    LEFT JOIN 
                    (SELECT code,is_ai_channel,partition_day
                     FROM DIM.DIM_AI_CHANNEL_ZP_NEW
                     WHERE partition_day>= '{start_date}' AND partition_day < '{end_date}'  ) tmp
                    ON  m.partition_date=tmp.partition_day AND first_channel_source_type=code
                where partition_date >= '{start_date}'  
                AND partition_date < '{end_date}' 
                AND active_type in ('1','2','4')
            ) mas
            LATERAL VIEW explode(mas.channel) t2 AS channel
            LATERAL VIEW explode(mas.device_os_type) t2 AS device_os_type
            LATERAL VIEW explode(mas.active_type) t2 AS active_type
        )dev_channel
          on dev_channel.device_id = page.cl_id
            AND dev_channel.partition_date = page.partition_date
        GROUP BY page.partition_date
        """.format(partition_day=yesterday_str, end_date=today_str, start_date=yesterday_str)

    print(bus_detail_sql)
    bus_detail_df = spark.sql(bus_detail_sql)
    bus_detail_df.createOrReplaceTempView("bus_detail")
    bus_detail_df.show(1)
    sql_res = bus_detail_df.collect()
    for res in sql_res:
        search_home_pv = res.search_home_pv
        search_home_uv = res.search_home_uv
        referrer_search_hexin_pv = res.referrer_search_hexin_pv
        referrer_search_welfare_pv = res.referrer_search_welfare_pv
        referrer_search_neirong_pv = res.referrer_search_neirong_pv
        referrer_search_neirong_uv_1000 = res.referrer_search_neirong_uv_1000
        referrer_search_neirong_pagestay = res.referrer_search_neirong_pagestay
        # print(res)

    # CPC same-day budget (effective basis)
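    # Effective budget per merchant-day: the smaller of the merchant-level budget and
    # the sum of its per-service budgets (both taken as the day's max from the CPC
    # click log), summed over all merchants.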
    cpc_budget_sql = r"""
SELECT day_id,sum(budget) as budget
FROM
(
    SELECT T1.day_id,T1.merchant_doctor_id,case when merchant_budget>=tot_service_budget then tot_service_budget else merchant_budget end as budget
    FROM
    (
        SELECT 
          substr(clicklog.create_time,1,10) AS day_id
          ,clicklog.merchant_doctor_id 
          ,max(merchant_budget) as merchant_budget --merchant budget
        FROM 
        (
            SELECT id,promote_id,price,service_budget,merchant_budget,merchant_doctor_id,create_time,recharge
            FROM online.tl_hdfs_cpc_clicklog_view
            WHERE partition_date='{partition_date}'
            AND regexp_replace(substr(create_time,1,10),'-','')>= '{start_date}'
            AND regexp_replace(substr(create_time,1,10),'-','')<'{end_date}'
        )clicklog
        group by substr(clicklog.create_time,1,10),clicklog.merchant_doctor_id 
    )T1 
    LEFT JOIN
    (
        SELECT 
             day_id
             ,merchant_doctor_id
            ,sum(service_budget) as tot_service_budget
        FROM
        (
            SELECT 
              substr(clicklog.create_time,1,10) AS day_id
              ,clicklog.merchant_doctor_id,clicklog.service_id
              ,max(service_budget) as service_budget
            
            FROM 
            (
                SELECT id,promote_id,price,service_budget,merchant_budget,merchant_doctor_id,service_id,create_time
                FROM  online.tl_hdfs_cpc_clicklog_view
                WHERE partition_date='{partition_date}'
                AND regexp_replace(substr(create_time,1,10),'-','')>= '{start_date}'
                AND regexp_replace(substr(create_time,1,10),'-','')<'{end_date}'
            )clicklog
            GROUP BY substr(clicklog.create_time,1,10),clicklog.merchant_doctor_id,clicklog.service_id
        )service_budget
        GROUP BY day_id,merchant_doctor_id
    )T2 
    ON T1.day_id=T2.day_id
    AND T1.merchant_doctor_id=T2.merchant_doctor_id
)T
GROUP BY day_id
            """.format(partition_date=yesterday_str, end_date=today_str, start_date=yesterday_str)

    print(cpc_budget_sql)
    cpc_budget_df = spark.sql(cpc_budget_sql)
    cpc_budget_df.createOrReplaceTempView("cpc_budget")
    cpc_budget_df.show(1)
    sql_res = cpc_budget_df.collect()
    for res in sql_res:
        budget = res.budget
        print(res)

    # CPC revenue and total ad spend
    cpc_income_sql = r"""
    select partition_day,
sum(case when advertise_type = 'cpc' AND advertise_business_type in('service') and advertise_calculate_type='cpc_log' then cpc_click_num end)   cpc_click_num,---    CPC service-item clicks for the day
sum(case when advertise_type = 'cpc' AND advertise_business_type in('service') and advertise_calculate_type='cpc_flownext' then proportion_expend_amount end)  cpc_proportion_expend_amount,---    total CPC revenue for the day (incl. rebates)
sum(case when advertise_type = 'cpc' AND advertise_business_type in('service') and advertise_calculate_type='cpc_flownext' then proportion_expend_recharge_amount end)   cpc_proportion_expend_recharge_amount,---    CPC revenue for the day (excl. rebates)
SUM(CASE
      WHEN advertise_type = 'cpc' AND advertise_calculate_type = 'cpc_flownext' THEN
       proportion_expend_amount
      WHEN advertise_type = 'cpt' AND advertise_calculate_type = 'cpt_schedule' THEN
       proportion_expend_amount
      WHEN advertise_type IN ('browse', 'message', 'valueadded','rechargededuction') THEN
       proportion_expend_amount
      WHEN advertise_type = 'adjustment' AND advertise_calculate_type ='adjustment_flow' THEN
       proportion_expend_amount
      ELSE
       0
    END) tol_proportion_expend_amount --total spend across ad types, with proportional rebate applied

from ml.ml_c_ct_mc_merchantadclassify_indic_d
where partition_day>='{start_date}' AND partition_day <'{end_date}'
group by partition_day
 """.format(partition_day=yesterday_str, end_date=today_str, start_date=yesterday_str)

    print(cpc_income_sql)
    cpc_income_df = spark.sql(cpc_income_sql)
    cpc_income_df.createOrReplaceTempView("cpc_income")
    cpc_income_df.show(1)
    sql_res = cpc_income_df.collect()
    for res in sql_res:
        cpc_click_num = res.cpc_click_num
        cpc_proportion_expend_amount = res.cpc_proportion_expend_amount
        cpc_proportion_expend_recharge_amount = res.cpc_proportion_expend_recharge_amount
        tol_proportion_expend_amount = res.tol_proportion_expend_amount
        print(res)


    #
    # out_put_sql = """
    # select bus_detail.referrer_search_welfare_pv / dau_view.dau as pv_div_dau,
    #  bus_detail.referrer_search_welfare_pv / quanzhong_dau_view.quanzhong_dau as pv_div_quanzhong_dau,
    #  (cpc_income.cpt_click_num + cpc_income.cpc_click_num) / bus_detail.referrer_search_welfare_pv as ad_flow_rat,
    #  cpc_income.cpc_proportion_expend_amount/cpc_budget.budget as budget_consumption_rate,
    #  cpc_income.cpc_proportion_expend_recharge_amount/cpc_income.cpc_click_num as cpc_item_pricing,
    #  cpc_income.tol_proportion_expend_amount as tol_proportion_expend_amount
    # """
    print(referrer_search_welfare_pv,welfare_pv)
    pv_div_dau = welfare_pv/dau
    pv_div_quanzhong_dau = welfare_pv/quanzhong_dau
    search_pv_div_all_pv = referrer_search_welfare_pv / welfare_pv
    ad_flow_rat = (pv + cpc_click_num) / welfare_pv
    budget_consumption_rate = cpc_proportion_expend_amount/budget
    cpc_item_pricing = cpc_proportion_expend_recharge_amount/cpc_click_num
    cpc_flow_rat = cpc_click_num / welfare_pv
    # tol_proportion_expend_amount
    db = pymysql.connect(host='172.16.40.158', port=4000, user='st_user', passwd='aqpuBLYzEV7tML5RPsN1pntUzFy',
                         db='jerry_prod')
    cursor = db.cursor()
    partition_date = yesterday_str
    pid = hashlib.md5(partition_date.encode("utf8")).hexdigest()
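    # pid is the md5 of the date string and appears to act as the row key, so the
    # REPLACE INTO below upserts one row per day. Note it is derived from
    # yesterday_str while partition_day/day_id are formatted with today_str.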
    cpc_daily_click_sql = """replace into ecommerce_income_report(
    pv_div_dau,pv_div_quanzhong_dau,ad_flow_rat,budget_consumption_rate,cpc_item_pricing,tol_proportion_expend_amount,partition_day,day_id,pid,search_pv_div_all_pv,cpc_flow_rat) VALUES(
    {pv_div_dau},{pv_div_quanzhong_dau},{ad_flow_rat},{budget_consumption_rate},{cpc_item_pricing},{tol_proportion_expend_amount},'{partition_day}','{day_id}','{pid}',{search_pv_div_all_pv},{cpc_flow_rat});""".format(
        pv_div_dau=pv_div_dau,pv_div_quanzhong_dau=pv_div_quanzhong_dau,ad_flow_rat=ad_flow_rat,budget_consumption_rate=budget_consumption_rate,
        cpc_item_pricing=cpc_item_pricing,tol_proportion_expend_amount=tol_proportion_expend_amount,partition_day=today_str,search_pv_div_all_pv=search_pv_div_all_pv,
        day_id=today_str,pid=pid,cpc_flow_rat=cpc_flow_rat
    )
    print(cpc_daily_click_sql)
    # cursor.execute("set names 'UTF8'")
    res = cursor.execute(cpc_daily_click_sql)
    db.commit()
    print(res)
# cursor.executemany()
db.close()