Commit 0dd1ec89 authored by 王志伟
parents 0a0e3068 064892be
@@ -130,17 +130,21 @@ from pyspark.sql import SQLContext
 if __name__ == "__main__":
     from pyspark.sql import SQLContext
     from pyspark.context import SparkContext
     from pyspark.conf import SparkConf
-    sc = SparkContext(
-        conf=SparkConf().setAppName("mnist_streaming").set("spark.streaming.kafka.maxRatePerPartition", 100))
+    sc = SparkContext.getOrCreate(SparkConf().setAppName("mnist_streaming"))
     ctx = SQLContext(sc)
-    jdbcDf = ctx.read.format("jdbc").options(url="jdbc:mysql://rm-m5e842126ng59jrv6.mysql.rds.aliyuncs.com:3306/doris_prod",
+    jdbcDf = ctx.read.format("jdbc").options(url="jdbc:mysql://192.168.15.12:4000/jerry_test",
                                              driver="com.mysql.jdbc.Driver",
                                              # dbtable="((select device_id,cid_id,time,device_type,city_id,1 as clicked from jerry_test.data_feed_click where cid_id in (select id from eagle.src_mimas_prod_api_diary where doctor_id is not null and content_level >3.5) and cid_type = 'diary' and stat_date = '2018-08-12') union (select device_id,cid_id,time,device_type,city_id,0 as clicked from jerry_test.data_feed_exposure where cid_id in (select id from eagle.src_mimas_prod_api_diary where doctor_id is not null and content_level >3.5) and cid_type = 'diary' and stat_date = '2018-08-12')) tmp",user="root",
-                                             dbtable="(select device_id from device_diary_queue limit 8) tmp",
-                                             user="doris",
-                                             password="o5gbA27hXHHm"
-                                             ).load()
+                                             dbtable="(select * from nd_cid_similarity_matrix) tmp",
+                                             user="root",
+                                             password="").load()
     jdbcDf.printSchema()  # printSchema() prints and returns None, so wrapping it in print() only adds a stray "None"
     print(jdbcDf.collect())
     jdbcDf.show(6)
     # fetch_data("2018-11-11","2018-11-12")
     # from pyspark.context import SparkContext
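The hunk above swaps the Doris RDS endpoint for a TiDB test instance but keeps the same JDBC read pattern throughout. A minimal, self-contained sketch of that pattern follows; the host, database, table, and credentials here are placeholders, not the values from the diff:

```python
from pyspark.sql import SparkSession

# Placeholder connection details -- substitute a real MySQL-compatible endpoint.
JDBC_URL = "jdbc:mysql://db.example.com:3306/mydb"

spark = SparkSession.builder.appName("jdbc_read_sketch").getOrCreate()

# Spark splices the dbtable option into "select * from <dbtable>", so a
# subquery must carry an alias (the "tmp" suffix) to be valid SQL.
df = (spark.read.format("jdbc")
      .option("url", JDBC_URL)
      .option("driver", "com.mysql.jdbc.Driver")
      .option("dbtable", "(select id, name from some_table limit 8) tmp")
      .option("user", "reader")
      .option("password", "secret")
      .load())

df.printSchema()
df.show(6)
```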
from pyspark.sql import SQLContext
from pyspark.context import SparkContext
from pyspark.conf import SparkConf
import datetime
from pyspark.sql import HiveContext


def get_data(day):
    # getOrCreate is a classmethod: calling it on the class avoids constructing
    # a second SparkContext when one is already running
    sc = SparkContext.getOrCreate(SparkConf().setAppName("multi_task"))
    sc.setLogLevel("WARN")
    ctx = SQLContext(sc)
    end_date = (datetime.date.today() - datetime.timedelta(days=1)).strftime("%Y-%m-%d")
    start_date = (datetime.date.today() - datetime.timedelta(days=day)).strftime("%Y-%m-%d")
    dbtable = "(select device_id,cid_id,stat_date from data_feed_click " \
              "where stat_date >= '{}' and stat_date <= '{}') tmp".format(start_date, end_date)
    click = ctx.read.format("jdbc").options(url="jdbc:mysql://10.66.157.22:4000/jerry_prod",
                                            driver="com.mysql.jdbc.Driver",
                                            dbtable=dbtable,
                                            user="root",
                                            password="3SYz54LS9#^9sBvC").load()
    click.show(6)
    click = click.rdd.map(lambda x: (x[0], x[1], x[2]))
    # caution: str() of a one-element tuple ends in a trailing comma,
    # which is not valid SQL inside the IN (...) clause built below
    device_id = tuple(click.map(lambda x: x[0]).collect())
    print(device_id[0:2])
    dbtable = "(select device_id,cid_id,stat_date from data_feed_exposure " \
              "where stat_date >= '{}' and stat_date <= '{}' and device_id in {}) tmp".format(start_date, end_date, device_id)
    exp = ctx.read.format("jdbc").options(url="jdbc:mysql://10.66.157.22:4000/jerry_prod",
                                          driver="com.mysql.jdbc.Driver",
                                          dbtable=dbtable,
                                          user="root",
                                          password="3SYz54LS9#^9sBvC").load()
    exp.show(6)
    # exposures that were never clicked and occurred at least 3 times
    # become negative samples (label 0)
    exp = exp.rdd.map(lambda x: (x[0], x[1], x[2])).subtract(click).map(lambda x: ((x[0], x[1], x[2]), 1)) \
        .reduceByKey(lambda x, y: x + y).filter(lambda x: x[1] >= 3).map(lambda x: (x[0][0], x[0][1], x[0][2], 0))
    # clicks become positive samples (label 1)
    click = click.map(lambda x: (x[0], x[1], x[2], 1))
    date = click.map(lambda x: x[2]).collect()
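The sampling logic in get_data is the core of this script: subtract removes every exposure that also appears as a click, and the reduceByKey/filter pair keeps only items a device saw at least three times without clicking. A sketch of the same RDD pipeline on invented in-memory triples, to make the set logic visible:

```python
from pyspark.context import SparkContext

sc = SparkContext.getOrCreate()

# (device_id, cid_id, stat_date) triples -- invented toy data
click = sc.parallelize([("d1", "c1", "2018-09-09")])
exp = sc.parallelize([("d1", "c1", "2018-09-09"),   # also clicked: dropped by subtract
                      ("d1", "c2", "2018-09-09"),
                      ("d1", "c2", "2018-09-09"),
                      ("d1", "c2", "2018-09-09"),   # seen 3 times: kept as a negative
                      ("d2", "c3", "2018-09-09")])  # seen once: filtered out

negatives = (exp.subtract(click)                    # subtract keeps duplicates of unmatched rows
             .map(lambda x: (x, 1))
             .reduceByKey(lambda a, b: a + b)       # count exposures per triple
             .filter(lambda x: x[1] >= 3)
             .map(lambda x: (x[0][0], x[0][1], x[0][2], 0)))
positives = click.map(lambda x: (x[0], x[1], x[2], 1))

print(negatives.collect())  # [('d1', 'c2', '2018-09-09', 0)]
print(positives.collect())  # [('d1', 'c1', '2018-09-09', 1)]
```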
def test():
    sc = SparkContext.getOrCreate(SparkConf().setAppName("multi_task"))
    sc.setLogLevel("WARN")
    ctx = SQLContext(sc)
    end_date = "2018-09-10"
    start_date = "2018-09-09"
    # the original string had no "{}" placeholder, so the trailing
    # .format(start_date) was a no-op and is dropped here
    dbtable = "(select device_id,cid_id,stat_date from data_feed_click limit 80) tmp"
    click = ctx.read.format("jdbc").options(url="jdbc:mysql://192.168.15.12:4000/jerry_prod",
                                            driver="com.mysql.jdbc.Driver",
                                            dbtable=dbtable,
                                            user="root",
                                            password="").load()
    click.show(6)
    click = click.rdd.map(lambda x: (x[0], x[1], x[2]))
    date = click.map(lambda x: x[2]).collect()
    cid = click.map(lambda x: x[1]).collect()
    # encode each row as a libSVM-style line: "label cid_slot:1 date_slot:1"
    click = click.map(lambda x: str(1) + " " + str(cid.index(x[1])) + ":" + str(1) + " " + str(date.index(x[2])) + ":" + str(1))
    print(click.take(6))
    # device_id = tuple(click.map(lambda x: x[0]).collect())
    # print(device_id[0:2])
    # dbtable = "(select device_id,cid_id,stat_date from data_feed_exposure " \
    #           "where stat_date = '{}' and device_id in {})tmp".format(start_date,device_id)
    # exp = ctx.read.format("jdbc").options(url="jdbc:mysql://192.168.15.12:4000/jerry_prod",
    #                                       driver="com.mysql.jdbc.Driver",
    #                                       dbtable=dbtable,
    #                                       user="root",
    #                                       password="").load()
    # exp.show(6)
    # exp = exp.rdd.map(lambda x: (x[0], x[1], x[2])).subtract(click).map(lambda x: ((x[0], x[1], x[2]), 1)) \
    #     .reduceByKey(lambda x, y: x + y).filter(lambda x: x[1] >= 3).map(lambda x: (x[0][0], x[0][1], x[0][2], 0))
    # click = click.map(lambda x: (x[0], x[1], x[2], 1))
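test() serializes each row in a libSVM-like "label index:value index:value" format, but it calls list.index inside the lambda, an O(n) scan per row. A sketch of the same encoding with precomputed index maps, using invented sample rows in place of the collected columns (note that strict libSVM expects 1-based ascending indices, while the code above uses raw list positions):

```python
# Invented sample rows: (device_id, cid_id, stat_date)
rows = [("d1", "c10", "2018-09-09"), ("d2", "c11", "2018-09-10")]

# Build value -> position maps once instead of calling list.index per row.
cid_index = {c: i for i, c in enumerate(sorted({r[1] for r in rows}))}
date_index = {d: i for i, d in enumerate(sorted({r[2] for r in rows}))}

def to_libsvm(row, label=1):
    # "label cid_slot:1 date_slot:1" -- one-hot features for cid and date,
    # with date slots offset past the cid slots so the two blocks don't collide
    return "{} {}:1 {}:1".format(label, cid_index[row[1]],
                                 len(cid_index) + date_index[row[2]])

print([to_libsvm(r) for r in rows])
# ['1 0:1 2:1', '1 1:1 3:1']
```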
def hive():
    conf = SparkConf().setAppName("My app")
    sc = SparkContext(conf=conf)
    sc.setLogLevel("WARN")
    sqlContext = HiveContext(sc)
    sql = "select partition_date from online.tl_hdfs_maidian_view limit 10"
    my_dataframe = sqlContext.sql(sql)
    my_dataframe.show(6)


if __name__ == '__main__':
    hive()
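hive() uses HiveContext, which has been deprecated since Spark 2.0 in favor of a Hive-enabled SparkSession. An equivalent sketch, assuming the same table and a Hive metastore already configured on the cluster:

```python
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("My app")
         .enableHiveSupport()   # requires an existing Hive metastore configuration
         .getOrCreate())
spark.sparkContext.setLogLevel("WARN")

df = spark.sql("select partition_date from online.tl_hdfs_maidian_view limit 10")
df.show(6)
```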
from pyspark.sql import SQLContext
from pyspark.context import SparkContext
from pyspark.conf import SparkConf


def get_data():
    sc = SparkContext.getOrCreate(SparkConf().setAppName("multi_task"))
    ctx = SQLContext(sc)
    jdbcDf = ctx.read.format("jdbc").options(url="jdbc:mysql://10.66.157.22:4000/jerry_prod",
                                             driver="com.mysql.jdbc.Driver",
                                             dbtable="(select device_id from data_feed_click limit 8) tmp",
                                             user="root",
                                             password="3SYz54LS9#^9sBvC").load()
    jdbcDf.show(6)


if __name__ == '__main__':
    get_data()
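Every JDBC read in these scripts comes back as a single partition, because no partitioning options are set. If data_feed_click has a numeric key, Spark's standard JDBC options can split the scan across executors; a sketch assuming a numeric "id" column (the column name and bounds are assumptions, not taken from the schema above):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("jdbc_partitioned_read").getOrCreate()

# lowerBound/upperBound only shape the partition ranges on the assumed
# "id" column; they do not filter any rows out of the result.
df = (spark.read.format("jdbc")
      .option("url", "jdbc:mysql://db.example.com:3306/jerry_prod")
      .option("driver", "com.mysql.jdbc.Driver")
      .option("dbtable", "data_feed_click")
      .option("user", "reader")
      .option("password", "secret")
      .option("partitionColumn", "id")
      .option("lowerBound", "1")
      .option("upperBound", "1000000")
      .option("numPartitions", "8")
      .load())

print(df.rdd.getNumPartitions())  # up to 8 parallel JDBC connections
```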