Commit 21502e22 authored by 王志伟
parents 9304c9d5 0012f840
# from __future__ import print_function
# import datetime
# import time
# import pymysql
# from pyspark.sql import SparkSession
# from pyspark.sql import SQLContext
# from pyspark import SparkConf,SparkContext
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import datetime
import time
import pymysql
from pyspark.context import SparkContext
from pyspark.conf import SparkConf
# from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
# from pyspark.streaming.kafka import KafkaUtils
# import argparse
# import time
# from datetime import datetime
def fetch_data(start_date, end_date):
    spark = SparkSession.builder.appName("Python Spark SQL basic example") \
        .config('spark.some.config.option0', 'some-value') \
        .getOrCreate()
    ctx = SQLContext(spark.sparkContext)
    df = ctx.read.format("jdbc").options(url="jdbc:mysql://rm-m5e842126ng59jrv6.mysql.rds.aliyuncs.com:3306/doris_prod",
                                         driver="com.mysql.jdbc.Driver",
                                         dbtable="device_diary_queue",
                                         user="doris", password="o5gbA27hXHHm").load()
    # df = ctx.read.format("jdbc").options(url="jdbc:mysql://192.168.15.12:4000/jerry_prod",
    #                                      driver="com.mysql.jdbc.Driver",
    #                                      dbtable="data_feed_click",
    #                                      user="root", password="3SYz54LS9#^9sBvC").load()
    df.show(6)
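    # Alternative sketch: the same read via the DataFrameReader.jdbc shortcut
    # (assumes the URL and credentials above remain valid):
    # df = ctx.read.jdbc(
    #     url="jdbc:mysql://rm-m5e842126ng59jrv6.mysql.rds.aliyuncs.com:3306/doris_prod",
    #     table="device_diary_queue",
    #     properties={"user": "doris", "password": "o5gbA27hXHHm",
    #                 "driver": "com.mysql.jdbc.Driver"})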
# url = "jdbc:mysql://10.66.157.22:4000/jerry_prod" # def fetch_data(start_date, end_date):
# table = "data_feed_click" # # sc = SparkSession.builder.appName("Python Spark SQL basic example") \
# properties = {"user": "root", "password": "3SYz54LS9#^9sBvC"} # # .config('spark.some.config,option0', 'some-value') \
# df = sqlContext.read.jdbc(url, table, properties) # # .getOrCreate()
# sc = SparkContext(conf=SparkConf().setAppName("mnist_streaming"))
# ctx = SQLContext(sc)
# # jdbcDf = ctx.read.format("jdbc").options(url="jdbc:mysql://192.168.15.12:4000",
# # driver="com.mysql.jdbc.Driver",
# # # dbtable="((select device_id,cid_id,time,device_type,city_id,1 as clicked from jerry_test.data_feed_click where cid_id in (select id from eagle.src_mimas_prod_api_diary where doctor_id is not null and content_level >3.5) and cid_type = 'diary' and stat_date = '2018-08-12') union (select device_id,cid_id,time,device_type,city_id,0 as clicked from jerry_test.data_feed_exposure where cid_id in (select id from eagle.src_mimas_prod_api_diary where doctor_id is not null and content_level >3.5) and cid_type = 'diary' and stat_date = '2018-08-12')) tmp",user="root",
# # dbtable="(select id as diary_id,doctor_id from eagle.src_mimas_prod_api_diary where doctor_id is not null and content_level >3.5 and datediff(current_date,created_time)<90) tmp",
# # user="root",
# # password="").load()
# # df = ctx.read.format("jdbc").options(url="jdbc:mysql://rdsmaqevmuzj6jy.mysql.rds.aliyuncs.com:3306/doris_test",
# # driver="com.mysql.jdbc.Driver",
# # dbtable="device diary_queue",
# # user="work", password="workwork").load()
# # df = ctx.read.format("jdbc").options(url="jdbc:mysql://rm-m5e842126ng59jrv6.mysql.rds.aliyuncs.com:3306/doris_prod",
# # driver="com.mysql.jdbc.Driver",
# # dbtable="device diary_queue",
# # user="doris", password="o5gbA27hXHHm").load()
#
# jdbcDf = ctx.read.format("jdbc").options(url="jdbc:mysql://192.168.15.12:4000",
# driver="com.mysql.jdbc.Driver",
# dbtable = "(select device_id from data_feed_click limit 8) tmp",
# user="root",password = "3SYz54LS9#^9sBvC").load()
# jdbcDf.show(6)
#
# # url = "jdbc:mysql://10.66.157.22:4000/jerry_prod"
# # table = "data_feed_click"
# # properties = {"user": "root", "password": "3SYz54LS9#^9sBvC"}
# # df = sqlContext.read.jdbc(url, table, properties)
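# Note on the dbtable values above and below: Spark's JDBC dbtable option
# accepts either a plain table name or a parenthesized subquery with an alias
# (the trailing "tmp"); in the subquery form MySQL executes the inner query
# and Spark reads only its result set.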
@@ -105,7 +130,19 @@ def fetch_data(start_date, end_date):
if __name__ == "__main__": if __name__ == "__main__":
fetch_data("2018-11-11","2018-11-12")
sc = SparkContext(
conf=SparkConf().setAppName("mnist_streaming").set("spark.streaming.kafka.maxRatePerPartition", 100))
ctx = SQLContext(sc)
jdbcDf = ctx.read.format("jdbc").options(url="jdbc:mysql://rm-m5e842126ng59jrv6.mysql.rds.aliyuncs.com:3306/doris_prod",
driver="com.mysql.jdbc.Driver",
# dbtable="((select device_id,cid_id,time,device_type,city_id,1 as clicked from jerry_test.data_feed_click where cid_id in (select id from eagle.src_mimas_prod_api_diary where doctor_id is not null and content_level >3.5) and cid_type = 'diary' and stat_date = '2018-08-12') union (select device_id,cid_id,time,device_type,city_id,0 as clicked from jerry_test.data_feed_exposure where cid_id in (select id from eagle.src_mimas_prod_api_diary where doctor_id is not null and content_level >3.5) and cid_type = 'diary' and stat_date = '2018-08-12')) tmp",user="root",
dbtable="(select device_id from device_diary_queue limit 8) tmp",
user="doris",
password="o5gbA27hXHHm"
).load()
jdbcDf.show(6)
# fetch_data("2018-11-11","2018-11-12")
# from pyspark.context import SparkContext
# from pyspark.conf import SparkConf
# from tensorflowonspark import TFCluster
...
from pyspark.sql import SQLContext
from pyspark.context import SparkContext
from pyspark.conf import SparkConf
def get_data():
    # getOrCreate is a classmethod; pass the conf to it directly rather than
    # constructing a SparkContext first and then calling getOrCreate on it
    sc = SparkContext.getOrCreate(conf=SparkConf().setAppName("multi_task"))
    ctx = SQLContext(sc)
    jdbcDf = ctx.read.format("jdbc").options(url="jdbc:mysql://10.66.157.22:4000/jerry_prod",
                                             driver="com.mysql.jdbc.Driver",
                                             dbtable="(select device_id from data_feed_click limit 8) tmp",
                                             user="root",
                                             password="3SYz54LS9#^9sBvC").load()
    jdbcDf.show(6)

if __name__ == '__main__':
    get_data()
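# Running either script requires the MySQL JDBC driver on the Spark classpath,
# e.g. (jar and script names are illustrative):
#   spark-submit --jars mysql-connector-java-5.1.47.jar multi_task.py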