Commit fe34b457 authored by 段英荣's avatar 段英荣

modify

parent dce90e1b
......@@ -20,8 +20,8 @@ class KafkaManager(object):
if not cls.consumser_obj:
topic_name = cls.topic_name if not topic_name else topic_name
cls.consumser_obj = KafkaConsumer(bootstrap_servers=cls.kafka_broker_list)
cls.consumser_obj.subscribe([topic_name])
cls.consumser_obj = KafkaConsumer(topic_name,bootstrap_servers=cls.kafka_broker_list)
# cls.consumser_obj.subscribe([topic_name])
return cls.consumser_obj
......@@ -82,7 +82,11 @@ class CollectData(object):
user_feature = [1,1]
kafka_consumer_obj = KafkaManager.get_kafka_consumer_ins(topic_name)
for ori_msg in kafka_consumer_obj:
while True:
msg_dict = kafka_consumer_obj.poll(timeout_ms=100)
for msg_key in msg_dict:
consume_msg = msg_dict[msg_key]
for ori_msg in consume_msg:
try:
logging.info(ori_msg)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment