Commit 3e814f90 authored by 张彦钊's avatar 张彦钊

change test file

parent 94338b4a
......@@ -96,7 +96,7 @@ def write_redis(device_id,cid_list):
def model(rdd):
try:
rdd.repartition(10).filter(lambda x: maidian(x))
rdd.filter(lambda x: maidian(x))
#.map(lambda x:get_data(x)).na.drop().\
#groupByKey().map(lambda x,y:write_redis(x,y))
except Exception as e:
......@@ -104,24 +104,24 @@ def model(rdd):
print(e)
if __name__ == '__main__':
sc = SparkContext(conf=SparkConf().setMaster("spark://nvwa01:7077").setAppName("dislike_filter").set(
"spark.io.compression.codec", "lzf"))
ssc = StreamingContext(sc, 10)
sc.setLogLevel("WARN")
kafkaParams = {"metadata.broker.list": "172.16.44.25:9092,172.16.44.31:9092,172.16.44.45:9092",
"group.id": "dislike",
"socket.timeout.ms": "600000",
"auto.offset.reset": "largest"}
try:
stream = KafkaUtils.createDirectStream(ssc, ["gm-maidian-data"], kafkaParams, keyDecoder=gbk_decoder,
valueDecoder=gbk_decoder)
transformstream = stream.transform(lambda x: model(x))
# transformstream.pprint()
ssc.start()
ssc.awaitTermination()
except Exception as e:
print(e)
# if __name__ == '__main__':
sc = SparkContext(conf=SparkConf().setMaster("spark://nvwa01:7077").setAppName("dislike_filter").set(
"spark.io.compression.codec", "lzf"))
ssc = StreamingContext(sc, 10)
sc.setLogLevel("WARN")
kafkaParams = {"metadata.broker.list": "172.16.44.25:9092,172.16.44.31:9092,172.16.44.45:9092",
"group.id": "dislike",
"socket.timeout.ms": "600000",
"auto.offset.reset": "largest"}
try:
stream = KafkaUtils.createDirectStream(ssc, ["gm-maidian-data"], kafkaParams, keyDecoder=gbk_decoder,
valueDecoder=gbk_decoder)
transformstream = stream.transform(lambda x: model(x))
transformstream.pprint()
ssc.start()
ssc.awaitTermination()
except Exception as e:
print(e)
# send_email(sc.appName, sc.applicationId, e)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment