Commit f0771557 authored by 张彦钊

change

parent 88c5c57f
import pymysql
from pyspark.conf import SparkConf
import pytispark.pytispark as pti
from pyspark.sql import SparkSession
import datetime
import pandas as pd
import time
from pyspark import StorageLevel
def click(x):
    # Count search-result welfare clicks for the day x days ago.
    total = []
    cpc = []
    date = (datetime.date.today() - datetime.timedelta(days=x)).strftime("%Y%m%d")
    tmp = spark.sql("select count(*) from online.bl_hdfs_maidian_updates "
                    "where partition_date='{}' "
                    "and action = 'search_result_welfare_click_item' "
                    "and app['version'] in ('7.15.0','7.14.0')".format(date)).rdd.map(lambda row: row[0]).collect()
    print(tmp)
# spark.sql(
# s
# """
# |select count(*) from online.bl_hdfs_maidian_updates where partition_date='$date'
# |and action = "search_result_welfare_click_item" and app["version"] in ('7.15.0','7.14.0')
# |and params["is_cpc"] = "1"
# """.stripMargin).show(6)
#
# println("首页点全部icon进入的列表-美购卡片点击")
# spark.sql(
# s
# """
# |select count(*) from online.bl_hdfs_maidian_updates where partition_date='$date'
# |and action = "goto_welfare_detail" and app["version"] in ('7.15.0','7.14.0')
# |and params["from"] = "welfare_list" and params["cpc_referer"] = "6"
# """.stripMargin).show()
#
# spark.sql(
# s
# """
# |select count(*) from online.bl_hdfs_maidian_updates where partition_date='$date'
# |and action = "goto_welfare_detail" and app["version"] in ('7.15.0','7.14.0')
# |and params["from"] = "welfare_list" and params["cpc_referer"] = "6"
# |and params["is_cpc"] = "1"
# """.stripMargin).show()
#
# println("首页点击icon进入的列表-美购卡片点击")
# spark.sql(
# s
# """
# |select count(*) from online.bl_hdfs_maidian_updates where partition_date='$date'
# |and action = "goto_welfare_detail" and app["version"] in ('7.15.0','7.14.0')
# |and params["from"] = "category" and params["cpc_referer"] = "19"
# """.stripMargin).show()
# spark.sql(
# s
# """
# |select count(*) from online.bl_hdfs_maidian_updates where partition_date='$date'
# |and action = "goto_welfare_detail" and app["version"] in ('7.15.0','7.14.0')
# |and params["from"] = "category" and params["cpc_referer"] = "19"
# |and params["is_cpc"] = "1"
# """.stripMargin).show()
#
# println("美购首页全部点击")
# spark.sql(
# s
# """
# |select count(*) from online.bl_hdfs_maidian_updates where partition_date='$date'
# |and action = "goto_welfare_detail" and app["version"] in ('7.15.0','7.14.0')
# |and params["from"] = "welfare_list" and params["cpc_referer"] = "21"
# """.stripMargin).show()
# spark.sql(
# s
# """
# |select count(*) from online.bl_hdfs_maidian_updates where partition_date='$date'
# |and action = "goto_welfare_detail" and app["version"] in ('7.15.0','7.14.0')
# |and params["from"] = "welfare_list" and params["cpc_referer"] = "21"
# |and params["is_cpc"] = "1"
# """.stripMargin).show()
#
# println("美购首页icon美购点击")
# spark.sql(
# s
# """
# |select count(*) from online.bl_hdfs_maidian_updates where partition_date='$date'
# |and action = "goto_welfare_detail" and app["version"] in ('7.15.0','7.14.0')
# |and params["from"] = "welfare_list" and params["cpc_referer"] = "18"
# """.stripMargin).show()
# spark.sql(
# s
# """
# |select count(*) from online.bl_hdfs_maidian_updates where partition_date='$date'
# |and action = "goto_welfare_detail" and app["version"] in ('7.15.0','7.14.0')
# |and params["from"] = "welfare_list" and params["cpc_referer"] = "18"
# |and params["is_cpc"] = "1"
# """.stripMargin).show()
#
# println("美购首页相关推荐")
# spark.sql(
# s
# """
# |select count(*) from online.bl_hdfs_maidian_updates where partition_date='$date'
# |and action = "goto_welfare_detail" and app["version"] in ('7.15.0','7.14.0')
# |and params["from"] = "welfare_home_list_item"
# """.stripMargin).show()
# spark.sql(
# s
# """
# |select count(*) from online.bl_hdfs_maidian_updates where partition_date='$date'
# |and action = "goto_welfare_detail" and app["version"] in ('7.15.0','7.14.0')
# |and params["from"] = "welfare_home_list_item"
# |and params["is_cpc"] = "1"
# """.stripMargin).show()
# return sum
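# --- Hedged sketch, not part of the original commit -------------------------
# The commented-out Scala block above repeats each count with an extra
# params["is_cpc"] = "1" filter to isolate CPC traffic. A PySpark version of
# the first of those CPC variants could look like the helper below; the table
# name, action and version filters come from the commented query, while the
# helper name and its structure are assumptions for illustration only.
def cpc_click_count(spark_session, date):
    # Count search-result welfare clicks flagged as CPC for one partition day.
    sql = ("select count(*) from online.bl_hdfs_maidian_updates "
           "where partition_date='{}' "
           "and action = 'search_result_welfare_click_item' "
           "and app['version'] in ('7.15.0','7.14.0') "
           "and params['is_cpc'] = '1'".format(date))
    return spark_session.sql(sql).rdd.map(lambda row: row[0]).collect()[0]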
if __name__ == '__main__':
    sparkConf = SparkConf().set("spark.hive.mapred.supports.subdirectories", "true") \
        .set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive", "true") \
        .set("spark.tispark.plan.allow_index_double_read", "false") \
        .set("spark.tispark.plan.allow_index_read", "true") \
        .set("spark.sql.extensions", "org.apache.spark.sql.TiExtensions") \
        .set("spark.tispark.pd.addresses", "172.16.40.170:2379").set("spark.io.compression.codec", "lzf") \
        .set("spark.driver.maxResultSize", "8g").set("spark.sql.avro.compression.codec", "snappy")
    spark = SparkSession.builder.config(conf=sparkConf).enableHiveSupport().getOrCreate()
    click(1)
    spark.stop()
@@ -36,7 +36,7 @@ import redis
# if sum % 1000 == 0:
# print(sum)
-def diary_key():
+def scan_key():
     r = redis.StrictRedis.from_url('redis://:ReDis!GmTx*0aN6@172.16.40.133:6379')
     cursor = 0
     while True:
......
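# --- Hedged sketch, not part of the diff above ------------------------------
# The truncated hunk starts a redis-py SCAN loop inside scan_key(). Assuming it
# walks the keyspace until the cursor wraps back to 0, a minimal self-contained
# version of that loop could look like this (the function name, `match` pattern
# and `count` hint are illustrative only):
import redis

def scan_keys(url='redis://:ReDis!GmTx*0aN6@172.16.40.133:6379', match='*'):
    r = redis.StrictRedis.from_url(url)
    cursor, keys = 0, []
    while True:
        # SCAN returns the next cursor and a batch of matching keys.
        cursor, batch = r.scan(cursor=cursor, match=match, count=1000)
        keys.extend(batch)
        if cursor == 0:  # a zero cursor means the iteration is complete
            break
    return keys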