import pymysql
from pyspark.conf import SparkConf
import pytispark.pytispark as pti
from pyspark.sql import SparkSession
import datetime
import pandas as pd
import time
import ast
from pyspark import StorageLevel


def position(x, y):
    """Count exposure cards whose absolute_position is <= y.

    Cards missing the 'absolute_position' key are skipped.

    :param x: iterable of card dicts (one exposure event's card list)
    :param y: inclusive position threshold
    :return: number of qualifying cards
    """
    count = 0  # renamed from `sum`, which shadowed the builtin
    for card in x:
        if 'absolute_position' in card and int(card['absolute_position']) <= y:
            count += 1
    return count


def cpc(x, y):
    """Count CPC cards (is_cpc == 1) whose absolute_position is <= y.

    Cards missing either 'is_cpc' or 'absolute_position' are skipped.

    :param x: iterable of card dicts (one exposure event's card list)
    :param y: inclusive position threshold
    :return: number of qualifying CPC cards
    """
    count = 0  # renamed from `sum`, which shadowed the builtin
    for card in x:
        if ('is_cpc' in card and 'absolute_position' in card
                and int(card['absolute_position']) <= y
                and card['is_cpc'] == 1):
            count += 1
    return count


if __name__ == '__main__':
    sparkConf = SparkConf() \
        .set("spark.hive.mapred.supports.subdirectories", "true") \
        .set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive", "true") \
        .set("spark.tispark.plan.allow_index_double_read", "false") \
        .set("spark.tispark.plan.allow_index_read", "true") \
        .set("spark.sql.extensions", "org.apache.spark.sql.TiExtensions") \
        .set("spark.tispark.pd.addresses", "172.16.40.170:2379") \
        .set("spark.io.compression.codec", "lzf") \
        .set("spark.driver.maxResultSize", "8g") \
        .set("spark.sql.avro.compression.codec", "snappy")
    spark = SparkSession.builder.config(conf=sparkConf).enableHiveSupport().getOrCreate()

    sql = "select params['exposure_cards'] from online.ml_community_precise_exposure_detail " \
          "where action = 'page_precise_exposure' and page_name = 'search_result_welfare' " \
          "AND partition_date='20190926'"

    # Parse the serialized card list ONCE and cache the parsed RDD; the
    # original re-ran eval() on every row for each of the seven passes below.
    # ast.literal_eval yields the same result on literal data as eval() but
    # cannot execute arbitrary expressions coming back from the table.
    cards_rdd = spark.sql(sql).rdd.map(lambda row: ast.literal_eval(row[0]))
    cards_rdd.persist()

    # total = [exposed@10, exposed@30, exposed@50,
    #          cpc@10, cpc@30, cpc@50, total card count]
    # RDD.sum() replaces the hand-rolled reduceByKey(("a", v)) + collect()[0]
    # idiom and returns 0 (instead of raising IndexError) on an empty RDD.
    # k=k binds the threshold at lambda creation time.
    total = [cards_rdd.map(lambda cards, k=k: position(cards, k)).sum()
             for k in (10, 30, 50)]
    total += [cards_rdd.map(lambda cards, k=k: cpc(cards, k)).sum()
              for k in (10, 30, 50)]
    total.append(cards_rdd.map(len).sum())

    print("total")
    print(total)

    cards_rdd.unpersist()
    spark.stop()