Commit fe78edef authored by 张彦钊

修改测试文件

parent aba390e4
...@@ -5,6 +5,7 @@ from pyspark.context import SparkContext ...@@ -5,6 +5,7 @@ from pyspark.context import SparkContext
from pyspark.conf import SparkConf from pyspark.conf import SparkConf
from pyspark.sql import SQLContext from pyspark.sql import SQLContext
from pyspark.sql import SparkSession from pyspark.sql import SparkSession
from py4j.java_gateway import java_import
def test(): def test():
...@@ -19,13 +20,23 @@ def test(): ...@@ -19,13 +20,23 @@ def test():
def esmm_pre(): def esmm_pre():
yesterday = (datetime.date.today() - datetime.timedelta(days=1)).strftime("%Y-%m-%d") yesterday = (datetime.date.today() - datetime.timedelta(days=1)).strftime("%Y-%m-%d")
print(yesterday) print(yesterday)
gw = SparkContext._gateway
java_import(gw.jvm, "org.apache.spark.sql.TiExtensions")
conf = SparkConf().setAppName("esmm_pre").set("spark.io.compression.codec", "lzf") conf = SparkConf().setAppName("esmm_pre").set("spark.io.compression.codec", "lzf")
spark = SparkSession.builder.getOrCreate() spark = SparkSession.builder.enableHiveSupport().config(conf=conf).getOrCreate()
spark.sql(""" ti = gw.jvm.TiExtensions.getInstance(spark._jsparkSession).getOrCreateTiContext(spark._jsparkSession)
select concat(tmp1.device_id,",",tmp1.city_id) as device_city, tmp1.merge_queue from (select device_id,if(city_id='world','worldwide',city_id) city_id,similarity_cid as merge_queue from nd_device_cid_similarity_matrix
union select device_id,if(city_id='world','worldwide',city_id) city_id,native_queue as merge_queue from ffm_diary_queue # Map database as old api does
union select device_id,city_id,search_queue as merge_queue from search_queue) as tmp1 where tmp1.device_id in (select distinct device_id from data_feed_click where stat_date='{}' ti.tidbMapDatabase("jerry_test")
""".format(yesterday)).show(6)
# sql("use tpch_test")
spark.sql("select count(*) from esmm_pre_data").show(6)
#
# spark.sql("""
# select concat(tmp1.device_id,",",tmp1.city_id) as device_city, tmp1.merge_queue from (select device_id,if(city_id='world','worldwide',city_id) city_id,similarity_cid as merge_queue from nd_device_cid_similarity_matrix
# union select device_id,if(city_id='world','worldwide',city_id) city_id,native_queue as merge_queue from ffm_diary_queue
# union select device_id,city_id,search_queue as merge_queue from search_queue) as tmp1 where tmp1.device_id in (select distinct device_id from data_feed_click where stat_date='{}'
# """.format(yesterday)).show(6)
if __name__ == '__main__': if __name__ == '__main__':
......
Markdown is supported
0% Try again or attach a new file
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment