import datetime

import pymysql
import pandas as pd
import time

import pytispark.pytispark as pti
from pyspark import StorageLevel
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession

# Click-count report over online.bl_hdfs_maidian_updates.
#
# Each report row is built from four click "segments" (some of which sum two
# sub-queries).  All queries share the same base filters (partition_date,
# app version >= 7.14.0); the cpc/os variants only append extra predicates,
# so the query definitions live in one table instead of four copies.
#
# Each group below contributes ONE number to the output row; a group with two
# entries is the sum of both sub-queries.  (label, extra WHERE condition).
_SEGMENT_GROUPS = [
    # search result clicks
    [("美购搜索点击",
      "action = 'search_result_welfare_click_item'")],
    # related-recommendation clicks on the welfare home page
    [("美购首页相关推荐",
      "action = 'goto_welfare_detail' and params['from'] = 'welfare_home_list_item'")],
    # home-page list clicks ("全部" icon list + category icon list)
    [("首页点击'全部'icon按钮进入的列表-美购卡片点击",
      "action = 'goto_welfare_detail' and params['from'] = 'welfare_list' "
      "and params['cpc_referer'] = '6'"),
     ("首页点击icon进入的列表-美购卡片点击",
      "action = 'goto_welfare_detail' and params['from'] = 'category' "
      "and params['cpc_referer'] = '19'")],
    # welfare home-page list clicks ("全部" + icon)
    [("美购首页'全部'点击",
      "action = 'goto_welfare_detail' and params['from'] = 'welfare_list' "
      "and params['cpc_referer'] = '21'"),
     ("美购首页icon美购点击",
      "action = 'goto_welfare_detail' and params['from'] = 'welfare_list' "
      "and params['cpc_referer'] = '18'")],
]


def _click_totals(days_ago, extra_filters=()):
    """Run every segment query for one day and return the totals.

    Args:
        days_ago: how many days before today the target partition is.
        extra_filters: iterable of additional SQL predicates ANDed onto
            every query (e.g. cpc flag, device type).

    Returns:
        list of ints: one total per segment group, followed by the grand
        total — i.e. [search, xiangguan, home, service_home, all].

    Requires the module-level ``spark`` SparkSession created in __main__.
    """
    date = (datetime.date.today()
            - datetime.timedelta(days=days_ago)).strftime("%Y%m%d")
    totals = []
    grand_total = 0
    for group in _SEGMENT_GROUPS:
        group_sum = 0
        for label, condition in group:
            print(label)
            where = ("partition_date='{}' and app['version'] >='7.14.0' and {}"
                     .format(date, condition))
            for extra in extra_filters:
                where += " and " + extra
            count = spark.sql(
                "select count(*) from online.bl_hdfs_maidian_updates where "
                + where
            ).rdd.map(lambda row: row[0]).collect()[0]
            group_sum += count
        totals.append(group_sum)
        grand_total += group_sum
    totals.append(grand_total)
    return totals


def all_click(x):
    """All clicks, x days ago: [search, xiangguan, home, service_home, all]."""
    return _click_totals(x)


def cpc_click(x):
    """CPC-only clicks (params['is_cpc'] = '1'), x days ago."""
    return _click_totals(x, ("params['is_cpc'] = '1'",))


def os_all_click(x, os):
    """All clicks on one platform (device_type == os), x days ago."""
    return _click_totals(x, ("device['device_type'] = '{}'".format(os),))


def os_cpc_click(x, os):
    """CPC-only clicks on one platform (device_type == os), x days ago."""
    return _click_totals(
        x,
        ("params['is_cpc'] = '1'", "device['device_type'] = '{}'".format(os)),
    )


if __name__ == '__main__':
    sparkConf = SparkConf() \
        .set("spark.hive.mapred.supports.subdirectories", "true") \
        .set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive", "true") \
        .set("spark.tispark.plan.allow_index_double_read", "false") \
        .set("spark.tispark.plan.allow_index_read", "true") \
        .set("spark.sql.extensions", "org.apache.spark.sql.TiExtensions") \
        .set("spark.tispark.pd.addresses", "172.16.40.170:2379") \
        .set("spark.io.compression.codec", "lzf") \
        .set("spark.driver.maxResultSize", "8g") \
        .set("spark.sql.avro.compression.codec", "snappy")

    spark = SparkSession.builder.config(conf=sparkConf) \
        .enableHiveSupport().getOrCreate()

    # Positional column -> report header.  Column 5 was previously the typo
    # "all_clcik"; fixed to "all_click".
    column_names = {0: "date",
                    1: "search", 2: "xiangguan", 3: "home", 4: "service_home",
                    5: "all_click",
                    6: "cpc_search", 7: "cpc_xiangguan", 8: "cpc_home",
                    9: "cpc_service_home", 10: "cpc_all"}

    # One CSV per platform, covering the last 20 days.
    for device_type in ["ios", "android"]:
        rows = []
        for days_ago in range(1, 21):
            date_str = (datetime.date.today()
                        - datetime.timedelta(days=days_ago)).strftime("%Y%m%d")
            row = [date_str]
            row.extend(os_all_click(days_ago, device_type))
            row.extend(os_cpc_click(days_ago, device_type))
            rows.append(row)

        df = pd.DataFrame(rows).rename(columns=column_names)
        df.to_csv('/home/gmuser/cpc_{}.csv'.format(device_type), index=False)

    spark.stop()