1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
import datetime
from pyspark.sql import HiveContext
from pyspark.context import SparkContext
from pyspark.conf import SparkConf
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession
# from py4j.java_gateway import java_import
# import pytispark.pytispark as pti
import pandas as pd
import pymysql
def con_sql(db, sql):
    """Run ``sql`` on ``db`` and return all fetched rows as a DataFrame.

    Args:
        db: An open DB-API style connection exposing ``cursor()`` and
            ``close()``. The connection is always closed before this
            function returns, so it cannot be reused by the caller.
        sql: The SQL statement to execute.

    Returns:
        A ``pandas.DataFrame`` built from the fetched rows (no column names
        are assigned here). If executing the statement or fetching the rows
        raises, the error is printed and an empty DataFrame is returned.
    """
    cursor = db.cursor()
    try:
        cursor.execute(sql)
        rows = cursor.fetchall()
        df = pd.DataFrame(list(rows))
    except Exception as exc:
        # Print the caught exception instance (not the Exception class) so
        # the actual failure reason is visible in the output.
        print("发生异常", exc)
        df = pd.DataFrame()
    finally:
        # Close the connection on both the success and the failure path.
        db.close()
    return df
# def test():
# Build the Spark configuration one setting at a time: the application name,
# then the spark.io.compression.codec option.
conf = SparkConf()
conf.setAppName("My App")
conf.set("spark.io.compression.codec", "lzf")

# Start the Spark context and wrap it in a Hive-aware SQL context.
sc = SparkContext(conf=conf)
hive_context = HiveContext(sc)

# Query the device type for matching page_view events in the 20181012
# partition (the query itself is limited to 10 rows) and print up to 6 rows.
hive_context.sql(
    ''' select device["device_type"] from online.tl_hdfs_maidian_view
where partition_date = '20181012' and action = "page_view"
and params["page_name"] = "diary_detail" and params["referrer"] = "home" limit 10 '''
).show(6)
# def esmm_pre():
# yesterday = (datetime.date.today() - datetime.timedelta(days=1)).strftime("%Y-%m-%d")
# print(yesterday)
#
# spark = SparkSession.builder.enableHiveSupport().getOrCreate()
# # gw = SparkContext._gateway
# #
# # # Import TiExtensions
# # java_import(gw.jvm, "org.apache.spark.sql.TiContext")
#
# # Inject TiExtensions, and get a TiContext
# # ti = gw.jvm.TiExtensions.getInstance(spark._jsparkSession).getOrCreateTiContext(spark._jsparkSession)
# ti = pti.TiContext(spark)
#
# ti.tidbMapDatabase("jerry_test")
#
# # sql("use tpch_test")
# spark.sql("select count(*) from esmm_pre_data").show(6)
#
# # conf = SparkConf().setAppName("esmm_pre").set("spark.io.compression.codec", "lzf")
#
# spark.sql("""
# select concat(tmp1.device_id,",",tmp1.city_id) as device_city, tmp1.merge_queue from (select device_id,if(city_id='world','worldwide',city_id) city_id,similarity_cid as merge_queue from nd_device_cid_similarity_matrix
# union select device_id,if(city_id='world','worldwide',city_id) city_id,native_queue as merge_queue from ffm_diary_queue
# union select device_id,city_id,search_queue as merge_queue from search_queue) as tmp1 where tmp1.device_id in (select distinct device_id from data_feed_click where stat_date='{}'
# """.format(yesterday)).show(6)
if __name__ == '__main__':