Commit aee31959 authored by 张彦钊's avatar 张彦钊

change test

parent 6a80bc1f
...@@ -30,8 +30,8 @@ def Json(x): ...@@ -30,8 +30,8 @@ def Json(x):
def model(rdd): def model(rdd):
try: try:
rdd = rdd.filter(lambda x:Json(x)).repartition(10).map(lambda x:get_data(x)) rdd = rdd.filter(lambda x:Json(x)).repartition(10).map(lambda x:get_data(x))\
# .map(lambda x:write_redis(x[0],x[1],x[2])) .map(lambda x:write_redis(x[0],x[1],x[2]))
return rdd return rdd
except: except:
print("fail") print("fail")
...@@ -77,7 +77,7 @@ def tractate_write(device_id, cid): ...@@ -77,7 +77,7 @@ def tractate_write(device_id, cid):
db.close() db.close()
if len(result) > 0: if len(result) > 0:
cids = [i[0] for i in result] cids = [str(i[0]) for i in result]
r = redis.Redis(host="172.16.40.135", port=5379, password="",db = 2) r = redis.Redis(host="172.16.40.135", port=5379, password="",db = 2)
key = str(device_id) + "_dislike_tractate" key = str(device_id) + "_dislike_tractate"
...@@ -117,7 +117,7 @@ def question_write(device_id,cid): ...@@ -117,7 +117,7 @@ def question_write(device_id,cid):
result = cursor.fetchall() result = cursor.fetchall()
db.close() db.close()
if len(result) > 0: if len(result) > 0:
cids = [i[0] for i in result] cids = [str(i[0]) for i in result]
r = redis.Redis(host="172.16.40.135", port=5379, password="", db=2) r = redis.Redis(host="172.16.40.135", port=5379, password="", db=2)
key = str(device_id) + "_dislike_qa" key = str(device_id) + "_dislike_qa"
...@@ -158,7 +158,7 @@ def diary_write(device_id,cid): ...@@ -158,7 +158,7 @@ def diary_write(device_id,cid):
result = cursor.fetchall() result = cursor.fetchall()
db.close() db.close()
if len(result) > 0: if len(result) > 0:
cids = [i[0] for i in result] cids = [str(i[0]) for i in result]
r = redis.Redis(host="172.16.40.135", port=5379, password="", db=2) r = redis.Redis(host="172.16.40.135", port=5379, password="", db=2)
key = str(device_id) + "_dislike_diary" key = str(device_id) + "_dislike_diary"
if r.exists(key): if r.exists(key):
...@@ -176,23 +176,21 @@ def diary_write(device_id,cid): ...@@ -176,23 +176,21 @@ def diary_write(device_id,cid):
print(e) print(e)
# sc = SparkContext(conf=SparkConf().setMaster("spark://nvwa01:7077").setAppName("dislike").set("spark.io.compression.codec", "lzf")) sc = SparkContext(conf=SparkConf().setMaster("spark://nvwa01:7077").setAppName("dislike").set("spark.io.compression.codec", "lzf"))
# ssc = StreamingContext(sc,4) ssc = StreamingContext(sc,4)
# sc.setLogLevel("WARN") sc.setLogLevel("WARN")
# kafkaParams = {"metadata.broker.list": "172.16.44.25:9092,172.16.44.31:9092,172.16.44.45:9092", kafkaParams = {"metadata.broker.list": "172.16.44.25:9092,172.16.44.31:9092,172.16.44.45:9092",
# "group.id": "dislike", "group.id": "dislike",
# "socket.timeout.ms": "600000", "socket.timeout.ms": "600000",
# "auto.offset.reset": "largest"} "auto.offset.reset": "largest"}
#
#
# stream = KafkaUtils.createDirectStream(ssc, ["gm-maidian-data"], kafkaParams) stream = KafkaUtils.createDirectStream(ssc, ["gm-maidian-data"], kafkaParams)
# transformstream = stream.transform(lambda x:model(x)) transformstream = stream.transform(lambda x:model(x))
# transformstream.pprint() transformstream.pprint()
#
# ssc.start() ssc.start()
# ssc.awaitTermination() ssc.awaitTermination()
question_write("hello",5)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment