Commit b32c80e2 authored by 张彦钊's avatar 张彦钊

修改test文件

parent 1f799e57
...@@ -2,17 +2,19 @@ ...@@ -2,17 +2,19 @@
# NOTE(review): reconstructed from a side-by-side diff paste — each original line
# contained both the pre- and post-commit text. This is the post-commit import set.
from pyspark.sql import HiveContext
from pyspark.context import SparkContext
from pyspark.conf import SparkConf
import pytispark.pytispark as pti  # TiSpark binding, used to map TiDB databases
# from pyspark.sql import SQLContext
from pyspark.sql import SparkSession
import datetime
def test():
    """Smoke-test query against TiDB via TiSpark.

    Builds a Hive-enabled SparkSession with TiSpark extensions, maps the
    ``jerry_test`` TiDB database, queries the latest ``stat_date`` from
    ``esmm_train_data`` and prints the number of collected rows.

    Side effects only (Spark job submission, stdout); returns None.
    """
    # Recursive-subdirectory settings let Hive tables backed by nested
    # directories be read; the tispark.plan flags disable double index reads.
    sparkConf = SparkConf().set("spark.hive.mapred.supports.subdirectories", "true")\
        .set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive", "true")\
        .set("spark.tispark.plan.allow_index_double_read", "false") \
        .set("spark.tispark.plan.allow_index_read", "true")
    spark = SparkSession.builder.config("spark.sql.extensions", "org.apache.spark.sql.TiExtensions")\
        .config("spark.tispark.pd.addresses", "172.16.40.158:2379")\
        .config(conf=sparkConf).enableHiveSupport().getOrCreate()
    spark.sparkContext.setLogLevel("WARN")
    # Expose TiDB tables of jerry_test as Spark SQL tables (no db prefix needed).
    ti = pti.TiContext(spark)
    ti.tidbMapDatabase("jerry_test")
    df = spark.sql("select max(stat_date) from esmm_train_data")
    df.show()
    t = df.rdd.map(lambda x: str(x[0])).collect()
    # BUG FIX: after collect() `t` is a Python list; list.count() requires an
    # argument and would raise TypeError. len(t) gives the intended row count.
    print(len(t))
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment