Commit 796d5b46 authored by 张彦钊's avatar 张彦钊

Change test file.

parent 3e814f90
...@@ -104,23 +104,23 @@ def model(rdd): ...@@ -104,23 +104,23 @@ def model(rdd):
print(e) print(e)
if __name__ == '__main__':
    # Script entry point: consume the gm-maidian-data Kafka topic as a Spark
    # DStream, run each micro-batch RDD through model(), and print the result.
    sc = SparkContext(conf=SparkConf().setMaster("spark://nvwa01:7077").setAppName("dislike_filter").set(
        "spark.io.compression.codec", "lzf"))
    # 10-second micro-batch interval for the streaming context.
    ssc = StreamingContext(sc, 10)
    sc.setLogLevel("WARN")
    kafkaParams = {"metadata.broker.list": "172.16.44.25:9092,172.16.44.31:9092,172.16.44.45:9092",
                   "group.id": "dislike",
                   "socket.timeout.ms": "600000",
                   # "largest": start from the newest offsets when no committed
                   # offset exists for this group.
                   "auto.offset.reset": "largest"}
    try:
        # gbk_decoder is defined elsewhere in this file; presumably it decodes
        # GBK-encoded Kafka keys/values -- verify against its definition.
        stream = KafkaUtils.createDirectStream(ssc, ["gm-maidian-data"], kafkaParams, keyDecoder=gbk_decoder,
                                               valueDecoder=gbk_decoder)
        # model() transforms each micro-batch RDD (defined earlier in this file).
        transformstream = stream.transform(lambda x: model(x))
        transformstream.pprint()
        ssc.start()
        ssc.awaitTermination()  # blocks until the streaming job is stopped or fails
    except Exception as e:
        # Top-level boundary: report the failure instead of crashing silently.
        print(e)
        # send_email(sc.appName, sc.applicationId, e)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment