Commit 6ba9c40b authored by 张彦钊

Modify test file

parent 353a37ab
# from __future__ import print_function
# import datetime
# import time
# import pymysql
# from pyspark.sql import SparkSession
# from pyspark.sql import SQLContext
# from pyspark import SparkConf,SparkContext
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from pyspark.context import SparkContext
from pyspark.conf import SparkConf
from pyspark.streaming import StreamingContext
from pyspark.sql import SQLContext
from pyspark.streaming.kafka import KafkaUtils
import argparse
import time
from datetime import datetime
import msgpack
from tensorflowonspark import TFCluster
import json
import pymysql
import tensorflow as tf
def fetch_data(start_date, end_date):
    # sc = SparkSession.builder.appName("Python Spark SQL basic example") \
    #     .config('spark.some.config,option0', 'some-value') \
    #     .getOrCreate()
    sc = SparkContext(conf=SparkConf().setAppName("mnist_streaming"))
    ctx = SQLContext(sc)
    # jdbcDf = ctx.read.format("jdbc").options(url="jdbc:mysql://192.168.15.12:4000",
    #                                          driver="com.mysql.jdbc.Driver",
    #                                          # dbtable="((select device_id,cid_id,time,device_type,city_id,1 as clicked from jerry_test.data_feed_click where cid_id in (select id from eagle.src_mimas_prod_api_diary where doctor_id is not null and content_level >3.5) and cid_type = 'diary' and stat_date = '2018-08-12') union (select device_id,cid_id,time,device_type,city_id,0 as clicked from jerry_test.data_feed_exposure where cid_id in (select id from eagle.src_mimas_prod_api_diary where doctor_id is not null and content_level >3.5) and cid_type = 'diary' and stat_date = '2018-08-12')) tmp",user="root",
    #                                          dbtable="(select id as diary_id,doctor_id from eagle.src_mimas_prod_api_diary where doctor_id is not null and content_level >3.5 and datediff(current_date,created_time)<90) tmp",
    #                                          user="root",
    #                                          password="").load()
    # df = ctx.read.format("jdbc").options(url="jdbc:mysql://rdsmaqevmuzj6jy.mysql.rds.aliyuncs.com:3306/doris_test",
    #                                      driver="com.mysql.jdbc.Driver",
    #                                      dbtable="device diary_queue",
    #                                      user="work", password="workwork").load()
    # df = ctx.read.format("jdbc").options(url="jdbc:mysql://rm-m5e842126ng59jrv6.mysql.rds.aliyuncs.com:3306/doris_prod",
    #                                      driver="com.mysql.jdbc.Driver",
    #                                      dbtable="device diary_queue",
    #                                      user="doris", password="o5gbA27hXHHm").load()
    jdbcDf = ctx.read.format("jdbc").options(url="jdbc:mysql://192.168.15.12:4000",
                                             driver="com.mysql.jdbc.Driver",
                                             dbtable="(select device_id from data_feed_click limit 8) tmp",
                                             user="root", password="3SYz54LS9#^9sBvC").load()
    jdbcDf.show(6)
    # url = "jdbc:mysql://10.66.157.22:4000/jerry_prod"
    # table = "data_feed_click"
    # properties = {"user": "root", "password": "3SYz54LS9#^9sBvC"}
    # df = sqlContext.read.jdbc(url, table, properties)
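The commented lines above use DataFrameReader.jdbc, the shorthand for the format("jdbc").options(...) chain used elsewhere in this file. A minimal sketch of that variant, with a placeholder host and credentials rather than the production values above (the MySQL driver jar must be on the Spark classpath):

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

sc = SparkContext(conf=SparkConf().setAppName("jdbc_shorthand_demo"))
sqlContext = SQLContext(sc)

# Placeholder connection details, for illustration only.
url = "jdbc:mysql://db.example.com:3306/testdb"
properties = {"user": "reader",
              "password": "secret",
              "driver": "com.mysql.jdbc.Driver"}

# read.jdbc takes the url, a table name (or inline view), and a dict of
# connection properties; it returns the same DataFrame as the options() form.
df = sqlContext.read.jdbc(url, "data_feed_click", properties=properties)
df.show(6)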
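Both live queries in this file pass a parenthesized subquery with an alias as dbtable; Spark's JDBC source sends it to MySQL as an inline view, so only the subquery's rows cross the wire. A minimal sketch of that pattern factored into a helper, assuming a placeholder URL and credentials:

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

JDBC_URL = "jdbc:mysql://db.example.com:3306/testdb"  # placeholder host

def read_subquery(ctx, sql, alias="tmp"):
    # MySQL requires an alias on a derived table, hence "(select ...) tmp".
    return (ctx.read.format("jdbc")
            .options(url=JDBC_URL,
                     driver="com.mysql.jdbc.Driver",
                     dbtable="({}) {}".format(sql, alias),
                     user="reader", password="secret")
            .load())

sc = SparkContext(conf=SparkConf().setAppName("jdbc_subquery_demo"))
df = read_subquery(SQLContext(sc), "select device_id from data_feed_click limit 8")
df.show(6)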
@@ -117,7 +134,18 @@ def fetch_data(start_date, end_date):
if __name__ == "__main__":
    sc = SparkContext(
        conf=SparkConf().setAppName("mnist_streaming").set("spark.streaming.kafka.maxRatePerPartition", 100))
    ctx = SQLContext(sc)
    jdbcDf = ctx.read.format("jdbc").options(url="jdbc:mysql://rm-m5e842126ng59jrv6.mysql.rds.aliyuncs.com:3306/doris_prod",
                                             driver="com.mysql.jdbc.Driver",
                                             # dbtable="((select device_id,cid_id,time,device_type,city_id,1 as clicked from jerry_test.data_feed_click where cid_id in (select id from eagle.src_mimas_prod_api_diary where doctor_id is not null and content_level >3.5) and cid_type = 'diary' and stat_date = '2018-08-12') union (select device_id,cid_id,time,device_type,city_id,0 as clicked from jerry_test.data_feed_exposure where cid_id in (select id from eagle.src_mimas_prod_api_diary where doctor_id is not null and content_level >3.5) and cid_type = 'diary' and stat_date = '2018-08-12')) tmp",user="root",
                                             dbtable="(select device_id from device_diary_queue limit 8) tmp",
                                             user="doris",
                                             password="o5gbA27hXHHm"
                                             ).load()
    jdbcDf.show(6)
    # fetch_data("2018-11-11","2018-11-12")
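The main block sets spark.streaming.kafka.maxRatePerPartition but never opens a stream; the StreamingContext and KafkaUtils imports suggest a direct Kafka stream was the intent. A sketch of how that config pairs with KafkaUtils.createDirectStream, with a hypothetical broker and topic (assumes the spark-streaming-kafka-0-8 package on the classpath):

from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

# maxRatePerPartition caps records pulled per Kafka partition per second,
# bounding the size of each micro-batch.
conf = (SparkConf().setAppName("mnist_streaming")
        .set("spark.streaming.kafka.maxRatePerPartition", 100))
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 10)  # 10-second batches

# Hypothetical broker list and topic name, for illustration only.
stream = KafkaUtils.createDirectStream(
    ssc, ["test_topic"], {"metadata.broker.list": "kafka.example.com:9092"})
stream.map(lambda kv: kv[1]).pprint()  # messages arrive as (key, value) pairs

ssc.start()
ssc.awaitTermination()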
# from pyspark.context import SparkContext
# from pyspark.conf import SparkConf
# from tensorflowonspark import TFCluster
...