# -*- coding: utf-8 -*-
# from pyspark.context import SparkContext
# from pyspark.conf import SparkConf
# import pytispark.pytispark as pti
# from pyspark.sql import SparkSession
import pandas as pd
import pymysql
from sqlalchemy import create_engine



def test():
        # pyspark is imported inside the function so the MySQL-only __main__ path
        # below can run without a Spark installation.
        from pyspark.conf import SparkConf
        from pyspark.sql import SparkSession

        conf = SparkConf().setAppName("My App").set("spark.io.compression.codec", "lzf")
        # A single builder call with the conf creates (or reuses) the Hive-enabled session.
        spark = SparkSession.builder.config(conf=conf).enableHiveSupport().getOrCreate()
        # ti = pti.TiContext(spark)
        # ti.tidbMapDatabase("jerry_test")

        spark.sql("use online")
        spark.sql("ADD JAR hdfs:///user/hive/share/lib/udf/brickhouse-0.7.1-SNAPSHOT.jar")
        spark.sql("ADD JAR hdfs:///user/hive/share/lib/udf/hive-udf-1.0-SNAPSHOT.jar")
        spark.sql("CREATE TEMPORARY FUNCTION json_map AS 'brickhouse.udf.json.JsonMapUDF'")
        spark.sql("CREATE TEMPORARY FUNCTION is_json AS 'com.gmei.hive.common.udf.UDFJsonFormatCheck'")

        # hive_context.sql("SET mapreduce.job.queuename=data")
        # hive_context.sql("SET mapred.input.dir.recursive=true")
        # hive_context.sql("SET hive.mapred.supports.subdirectories=true")
        sql = "select user_id from online.tl_hdfs_maidian_view where partition_date = '20190412' limit 10"
        spark.sql(sql).show(6)
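        # Hypothetical usage sketch (left commented out): with the UDFs registered they
        # can be called straight from Spark SQL; the `params` column name here is an
        # assumption for illustration, not taken from the real table schema.
        # spark.sql("select user_id, json_map(params) from online.tl_hdfs_maidian_view "
        #           "where partition_date = '20190412' limit 10").show(6)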



def con_sql(db, sql):
    """Run a query over an existing pymysql connection and return the rows as a DataFrame."""
    cursor = db.cursor()
    cursor.execute(sql)
    result = cursor.fetchall()
    df = pd.DataFrame(list(result))
    cursor.close()
    db.close()
    return df
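
# A minimal alternative sketch (not used in the flow below): pandas can run a query
# straight through a SQLAlchemy engine, which skips the manual cursor handling.
# The name con_sql_via_engine is ours, added here purely for illustration.
def con_sql_via_engine(engine, sql):
    return pd.read_sql(sql, con=engine)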

if __name__ == '__main__':
        # sparkConf = SparkConf().set("spark.hive.mapred.supports.subdirectories", "true") \
        #         .set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive", "true") \
        #         .set("spark.tispark.plan.allow_index_double_read", "false") \
        #         .set("spark.tispark.plan.allow_index_read", "true") \
        #         .set("spark.sql.extensions", "org.apache.spark.sql.TiExtensions") \
        #         .set("spark.tispark.pd.addresses", "172.16.40.158:2379").set("spark.io.compression.codec", "lzf") \
        #         .set("spark.driver.maxResultSize", "8g")
        #
        # spark = SparkSession.builder.config(conf=sparkConf).enableHiveSupport().getOrCreate()
        # ti = pti.TiContext(spark)
        # ti.tidbMapDatabase("jerry_test")
        # spark.sparkContext.setLogLevel("WARN")
        # sql = "select stat_date,cid_id,y,ccity_name from esmm_train_data limit 60"
        # spark.sql(sql).show(6)
        sql = "select level2_id,concat('t',treatment_method)," \
              "concat('min',price_min),concat('max',price_max)," \
              "concat('tr',treatment_time),concat('m',maintain_time)," \
              "concat('r',recover_time) from jerry_test.train_Knowledge_network_data"
        db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
        df = con_sql(db, sql)
        df = df.rename(columns={0: "level2_id", 1: "treatment_method", 2: "price_min", 3: "price_max",
                                4: "treatment_time", 5: "maintain_time", 6: "recover_time"})
        print(df.head(6))
        host = '172.16.40.158'
        port = 4000
        user = 'root'
        password = '3SYz54LS9#^9sBvC'
        db = 'jerry_test'
        charset = 'utf8'
        # Build the SQLAlchemy URL in one format string and pass the charset defined above.
        engine = create_engine("mysql+pymysql://%s:%s@%s:%s/%s?charset=%s"
                               % (user, password, host, port, db, charset))
        df.to_sql('knowledge', con=engine, if_exists='append', index=False, chunksize=8000)
        print("insert done")