1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# -*- coding: utf-8 -*-
from pyspark.context import SparkContext
from pyspark.conf import SparkConf
import pytispark.pytispark as pti
from pyspark.sql import SparkSession
import numpy as np
def test():
conf = SparkConf().setAppName("My App").set("spark.io.compression.codec", "lzf")
sc = SparkContext(conf = conf)
spark = SparkSession.builder.enableHiveSupport().getOrCreate()
# ti = pti.TiContext(spark)
# ti.tidbMapDatabase("jerry_test")
spark = SparkSession.builder.appName("hello test").enableHiveSupport().getOrCreate()
spark.sql("use online")
spark.sql("ADD JAR hdfs:///user/hive/share/lib/udf/brickhouse-0.7.1-SNAPSHOT.jar")
spark.sql("ADD JAR hdfs:///user/hive/share/lib/udf/hive-udf-1.0-SNAPSHOT.jar")
spark.sql("CREATE TEMPORARY FUNCTION json_map AS 'brickhouse.udf.json.JsonMapUDF'")
spark.sql("CREATE TEMPORARY FUNCTION is_json AS 'com.gmei.hive.common.udf.UDFJsonFormatCheck'")
# hive_context.sql("SET mapreduce.job.queuename=data")
# hive_context.sql("SET mapred.input.dir.recursive=true")
# hive_context.sql("SET hive.mapred.supports.subdirectories=true")
sql = "select user_id from online.tl_hdfs_maidian_view where partition_date = '20190412' limit 10"
spark.sql(sql).show(6)
def some_function(x):
# Use the libraries to do work
return np.sin(x)**2 + 2
if __name__ == '__main__':
sparkConf = SparkConf().set("spark.hive.mapred.supports.subdirectories", "true") \
.set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive", "true") \
.set("spark.tispark.plan.allow_index_double_read", "false") \
.set("spark.tispark.plan.allow_index_read", "true") \
.set("spark.sql.extensions", "org.apache.spark.sql.TiExtensions") \
.set("spark.tispark.pd.addresses", "172.16.40.158:2379").set("spark.io.compression.codec", "lzf") \
.set("spark.driver.maxResultSize", "8g")
spark = SparkSession.builder.config(conf=sparkConf).enableHiveSupport().getOrCreate()
# ti = pti.TiContext(spark)
# ti.tidbMapDatabase("jerry_test")
# spark.sparkContext.setLogLevel("WARN")
# sql = "select stat_date,cid_id,y,ccity_name from esmm_train_data limit 60"
# spark.sql(sql).show(6)