import pymysql
from pyspark.conf import SparkConf
import pytispark.pytispark as pti
from pyspark.sql import SparkSession
import datetime
import pandas as pd
import time
from pyspark import StorageLevel


def position(x, y):
    """Count exposure cards in x ranked at or above a position cutoff.

    Args:
        x: iterable of dicts (exposure cards); cards lacking an
           'absolute_position' key are ignored.
        y: position threshold (inclusive).

    Returns:
        int: number of cards with int(card['absolute_position']) <= y.
    """
    # int() because absolute_position may arrive as a string in the payload.
    return sum(
        1
        for card in x
        if 'absolute_position' in card and int(card['absolute_position']) <= y
    )

def cpc(x, y):
    """Count CPC (paid) exposure cards in x at or above a position cutoff.

    Args:
        x: iterable of dicts (exposure cards); cards missing either the
           'is_cpc' or 'absolute_position' key are ignored.
        y: position threshold (inclusive).

    Returns:
        int: number of cards with is_cpc == 1 and
             int(card['absolute_position']) <= y.
    """
    # int() because absolute_position may arrive as a string in the payload.
    return sum(
        1
        for card in x
        if 'is_cpc' in card
        and 'absolute_position' in card
        and card['is_cpc'] == 1
        and int(card['absolute_position']) <= y
    )



if __name__ == '__main__':
    # Spark/TiSpark session configuration for reading the Hive exposure table.
    sparkConf = SparkConf().set("spark.hive.mapred.supports.subdirectories", "true") \
        .set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive", "true") \
        .set("spark.tispark.plan.allow_index_double_read", "false") \
        .set("spark.tispark.plan.allow_index_read", "true") \
        .set("spark.sql.extensions", "org.apache.spark.sql.TiExtensions") \
        .set("spark.tispark.pd.addresses", "172.16.40.170:2379").set("spark.io.compression.codec", "lzf") \
        .set("spark.driver.maxResultSize", "8g").set("spark.sql.avro.compression.codec", "snappy")

    spark = SparkSession.builder.config(conf=sparkConf).enableHiveSupport().getOrCreate()
    sql = "select params['exposure_cards'] from online.ml_community_precise_exposure_detail " \
          "where action = 'page_precise_exposure' and page_name = 'search_result_welfare' " \
          "AND partition_date='20190926'"
    df = spark.sql(sql).rdd

    # Cache the raw rows: the RDD is scanned once per metric below.
    df.persist()

    def _metric_total(metric, threshold):
        """Sum metric(cards, threshold) over every exposure row.

        SECURITY NOTE: eval() executes the stored 'exposure_cards' string as
        Python — acceptable only because the table is an internal log; prefer
        ast.literal_eval if the payload is plain literals. TODO confirm.
        """
        # rdd.sum() replaces the previous reduceByKey-on-a-dummy-key +
        # collect()[0] pattern; it also returns 0 (instead of raising
        # IndexError) when the query matches no rows.
        return df.map(lambda row, t=threshold: metric(eval(row[0]), t)).sum()

    # Totals in the original output order:
    # position@10/30/50, cpc@10/30/50, then total card count.
    total = [_metric_total(position, k) for k in (10, 30, 50)]
    total += [_metric_total(cpc, k) for k in (10, 30, 50)]
    total.append(df.map(lambda row: len(eval(row[0]))).sum())

    print("total")
    print(total)
    df.unpersist()
    spark.stop()