Commit c30fc64e authored by 王志伟
parents 5bae6a85 9bdab612
import datetime import datetime
from pyspark.sql import HiveContext # from pyspark.sql import HiveContext
from pyspark.context import SparkContext # from pyspark.context import SparkContext
from pyspark.conf import SparkConf # from pyspark.conf import SparkConf
from pyspark.sql import SQLContext # from pyspark.sql import SQLContext
from pyspark.sql import SparkSession # from pyspark.sql import SparkSession
from py4j.java_gateway import java_import # from py4j.java_gateway import java_import
import pytispark.pytispark as pti # import pytispark.pytispark as pti
import pandas as pd import pandas as pd
import pymysql import pymysql
...@@ -25,35 +25,35 @@ def con_sql(db,sql): ...@@ -25,35 +25,35 @@ def con_sql(db,sql):
return df return df
def test(): # def test():
conf = SparkConf().setAppName("My App").set("spark.io.compression.codec", "lzf") # conf = SparkConf().setAppName("My App").set("spark.io.compression.codec", "lzf")
sc = SparkContext(conf = conf) # sc = SparkContext(conf = conf)
hive_context = HiveContext(sc) # hive_context = HiveContext(sc)
hive_context.sql(''' select device["device_type"] from online.tl_hdfs_maidian_view # hive_context.sql(''' select device["device_type"] from online.tl_hdfs_maidian_view
where partition_date = '20181012' and action = "page_view" # where partition_date = '20181012' and action = "page_view"
and params["page_name"] = "diary_detail" and params["referrer"] = "home" limit 10 ''').show(6) # and params["page_name"] = "diary_detail" and params["referrer"] = "home" limit 10 ''').show(6)
def esmm_pre(): # def esmm_pre():
yesterday = (datetime.date.today() - datetime.timedelta(days=1)).strftime("%Y-%m-%d") # yesterday = (datetime.date.today() - datetime.timedelta(days=1)).strftime("%Y-%m-%d")
print(yesterday) # print(yesterday)
#
spark = SparkSession.builder.enableHiveSupport().getOrCreate() # spark = SparkSession.builder.enableHiveSupport().getOrCreate()
# gw = SparkContext._gateway # # gw = SparkContext._gateway
# # #
# # Import TiExtensions # # # Import TiExtensions
# java_import(gw.jvm, "org.apache.spark.sql.TiContext") # # java_import(gw.jvm, "org.apache.spark.sql.TiContext")
#
# Inject TiExtensions, and get a TiContext # # Inject TiExtensions, and get a TiContext
# ti = gw.jvm.TiExtensions.getInstance(spark._jsparkSession).getOrCreateTiContext(spark._jsparkSession) # # ti = gw.jvm.TiExtensions.getInstance(spark._jsparkSession).getOrCreateTiContext(spark._jsparkSession)
ti = pti.TiContext(spark) # ti = pti.TiContext(spark)
#
ti.tidbMapDatabase("jerry_test") # ti.tidbMapDatabase("jerry_test")
#
# sql("use tpch_test") # # sql("use tpch_test")
spark.sql("select count(*) from esmm_pre_data").show(6) # spark.sql("select count(*) from esmm_pre_data").show(6)
#
# conf = SparkConf().setAppName("esmm_pre").set("spark.io.compression.codec", "lzf") # # conf = SparkConf().setAppName("esmm_pre").set("spark.io.compression.codec", "lzf")
# #
# spark.sql(""" # spark.sql("""
# select concat(tmp1.device_id,",",tmp1.city_id) as device_city, tmp1.merge_queue from (select device_id,if(city_id='world','worldwide',city_id) city_id,similarity_cid as merge_queue from nd_device_cid_similarity_matrix # select concat(tmp1.device_id,",",tmp1.city_id) as device_city, tmp1.merge_queue from (select device_id,if(city_id='world','worldwide',city_id) city_id,similarity_cid as merge_queue from nd_device_cid_similarity_matrix
...@@ -63,10 +63,9 @@ def esmm_pre(): ...@@ -63,10 +63,9 @@ def esmm_pre():
if __name__ == '__main__': if __name__ == '__main__':
try:
db = pymysql.connect(host='10.66.157.11', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test') db = pymysql.connect(host='10.66.157.11', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
except Exception:
db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select max(stat_date) from esmm_train_data" sql = "select max(stat_date) from esmm_train_data"
validate_date = con_sql(db, sql)[0].values.tolist()[0] validate_date = con_sql(db, sql)[0].values.tolist()[0]
print("validate_date:" + validate_date) print("validate_date:" + validate_date)
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment