Commit d7c5f38e authored by 王志伟
parents 864101b7 822315f0
 # -*- coding: utf-8 -*-
 import datetime
 from pyspark.sql import HiveContext
 from pyspark.context import SparkContext
 from pyspark.conf import SparkConf
@@ -7,30 +6,43 @@
 from pyspark.sql import SQLContext
 from pyspark.sql import SparkSession
 # from py4j.java_gateway import java_import
 # import pytispark.pytispark as pti
-import pandas as pd
-import pymysql
-
-def con_sql(db,sql):
-    cursor = db.cursor()
-    try:
-        cursor.execute(sql)
-        result = cursor.fetchall()
-        df = pd.DataFrame(list(result))
-    except Exception:
-        print("exception occurred", Exception)
-        df = pd.DataFrame()
-    finally:
-        db.close()
-    return df
+# import pandas as pd
+#
+# def con_sql(db,sql):
+#     cursor = db.cursor()
+#     try:
+#         cursor.execute(sql)
+#         result = cursor.fetchall()
+#         df = pd.DataFrame(list(result))
+#     except Exception:
+#         print("exception occurred", Exception)
+#         df = pd.DataFrame()
+#     finally:
+#         db.close()
+#     return df
 
 def test():
     conf = SparkConf().setAppName("My App").set("spark.io.compression.codec", "lzf")
     sc = SparkContext(conf = conf)
     hive_context = HiveContext(sc)
-    hive_context.sql(''' select device["device_type"] from online.tl_hdfs_maidian_view
-    where partition_date = '20181012' and action = "page_view"
-    and params["page_name"] = "diary_detail" and params["referrer"] = "home" limit 10 ''').show(6)
+    spark = SparkSession.builder.appName("BinarizerExample").enableHiveSupport().getOrCreate()
+    spark.sql("use online")
+    spark.sql("ADD JAR hdfs:///user/hive/share/lib/udf/brickhouse-0.7.1-SNAPSHOT.jar")
+    spark.sql("ADD JAR hdfs:///user/hive/share/lib/udf/hive-udf-1.0-SNAPSHOT.jar")
+    spark.sql("CREATE TEMPORARY FUNCTION json_map AS 'brickhouse.udf.json.JsonMapUDF'")
+    spark.sql("CREATE TEMPORARY FUNCTION is_json AS 'com.gmei.hive.common.udf.UDFJsonFormatCheck'")
+    # hive_context.sql("SET mapreduce.job.queuename=data")
+    # hive_context.sql("SET mapred.input.dir.recursive=true")
+    # hive_context.sql("SET hive.mapred.supports.subdirectories=true")
+    sql = "select user_id from online.tl_hdfs_maidian_view where partition_date = '20190412' limit 10"
+    spark.sql(sql).show(6)
 
 # def esmm_pre():
...
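For context: the con_sql helper commented out above wraps a blocking pymysql query and returns the rows as a pandas DataFrame. Below is a minimal sketch of that helper with a hypothetical call site — the host, credentials, database, and table are placeholders, not values from this commit, and the except clause here binds the raised exception instead of printing the Exception class as the original did:

import pandas as pd
import pymysql

def con_sql(db, sql):
    # Execute `sql` on an open pymysql connection; return the rows as a DataFrame.
    cursor = db.cursor()
    try:
        cursor.execute(sql)
        result = cursor.fetchall()
        df = pd.DataFrame(list(result))
    except Exception as e:
        # Bind the actual exception so the real error message is printed.
        print("exception occurred", e)
        df = pd.DataFrame()
    finally:
        db.close()
    return df

# Hypothetical call site; connection parameters and table are placeholders.
db = pymysql.connect(host="127.0.0.1", port=3306, user="root",
                     password="***", database="test_db")
print(con_sql(db, "select user_id from click_log limit 10"))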
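The ADD JAR / CREATE TEMPORARY FUNCTION sequence in test() registers Brickhouse's JsonMapUDF (it parses a JSON string into a map) under the name json_map, plus an in-house JSON format check under is_json, for the lifetime of the Spark session. A sketch of how the registered functions might then be used from Spark SQL — the table name and the event_json column are hypothetical, and it assumes is_json returns a boolean:

# Assumes a SparkSession `spark` prepared as in test() above (jars added,
# temporary functions registered). Table and column names are hypothetical.
sql = """
    select json_map(event_json)['page_name'] as page_name
    from online.some_log_table
    where partition_date = '20190412'
      and is_json(event_json)
    limit 10
"""
spark.sql(sql).show(6)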