Commit 27a72180 authored by 张彦钊's avatar 张彦钊

change test file

parent 43c66694
# -*- coding: utf-8 -*-
from pyspark.sql import HiveContext
from pyspark.context import SparkContext
from pyspark.conf import SparkConf
# import pytispark.pytispark as pti
# from pyspark.sql import SQLContext
import pytispark.pytispark as pti
from pyspark.sql import SparkSession
# from py4j.java_gateway import java_import
# import pytispark.pytispark as pti
# import pandas as pd
#
# def con_sql(db,sql):
# cursor = db.cursor()
# try:
# cursor.execute(sql)
# result = cursor.fetchall()
# df = pd.DataFrame(list(result))
# except Exception:
# print("发生异常", Exception)
# df = pd.DataFrame()
# finally:
# db.close()
# return df
def test():
conf = SparkConf().setAppName("My App").set("spark.io.compression.codec", "lzf")
sc = SparkContext(conf = conf)
hive_context = HiveContext(sc)
spark = SparkSession.builder.enableHiveSupport().getOrCreate()
# ti = pti.TiContext(spark)
# ti.tidbMapDatabase("jerry_test")
......@@ -46,34 +27,26 @@ def test():
spark.sql(sql).show(6)
# def esmm_pre():
# yesterday = (datetime.date.today() - datetime.timedelta(days=1)).strftime("%Y-%m-%d")
# print(yesterday)
#
# spark = SparkSession.builder.enableHiveSupport().getOrCreate()
# # gw = SparkContext._gateway
# #
# # # Import TiExtensions
# # java_import(gw.jvm, "org.apache.spark.sql.TiContext")
#
# # Inject TiExtensions, and get a TiContext
# # ti = gw.jvm.TiExtensions.getInstance(spark._jsparkSession).getOrCreateTiContext(spark._jsparkSession)
# ti = pti.TiContext(spark)
#
# ti.tidbMapDatabase("jerry_test")
#
# # sql("use tpch_test")
# spark.sql("select count(*) from esmm_pre_data").show(6)
#
# # conf = SparkConf().setAppName("esmm_pre").set("spark.io.compression.codec", "lzf")
#
# spark.sql("""
# select concat(tmp1.device_id,",",tmp1.city_id) as device_city, tmp1.merge_queue from (select device_id,if(city_id='world','worldwide',city_id) city_id,similarity_cid as merge_queue from nd_device_cid_similarity_matrix
# union select device_id,if(city_id='world','worldwide',city_id) city_id,native_queue as merge_queue from ffm_diary_queue
# union select device_id,city_id,search_queue as merge_queue from search_queue) as tmp1 where tmp1.device_id in (select distinct device_id from data_feed_click where stat_date='{}'
# """.format(yesterday)).show(6)
if __name__ == '__main__':
test()
sparkConf = SparkConf().set("spark.hive.mapred.supports.subdirectories", "true") \
.set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive", "true") \
.set("spark.tispark.plan.allow_index_double_read", "false") \
.set("spark.tispark.plan.allow_index_read", "true") \
.set("spark.sql.extensions", "org.apache.spark.sql.TiExtensions") \
.set("spark.tispark.pd.addresses", "172.16.40.158:2379").set("spark.io.compression.codec", "lzf") \
.set("spark.driver.maxResultSize", "8g")
spark = SparkSession.builder.config(conf=sparkConf).enableHiveSupport().getOrCreate()
ti = pti.TiContext(spark)
ti.tidbMapDatabase("jerry_test")
spark.sparkContext.setLogLevel("WARN")
sql = "select stat_date,cid_id,y,ccity_name from esmm_train_data limit 60"
spark.sql(sql).show(6)
spark.sql("use online")
spark.sql("ADD JAR hdfs:///user/hive/share/lib/udf/brickhouse-0.7.1-SNAPSHOT.jar")
spark.sql("ADD JAR hdfs:///user/hive/share/lib/udf/hive-udf-1.0-SNAPSHOT.jar")
spark.sql("CREATE TEMPORARY FUNCTION json_map AS 'brickhouse.udf.json.JsonMapUDF'")
spark.sql("CREATE TEMPORARY FUNCTION is_json AS 'com.gmei.hive.common.udf.UDFJsonFormatCheck'")
sql = "select user_id from online.tl_hdfs_maidian_view where partition_date = '20190412' limit 10"
spark.sql(sql).show(6)
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment