Commit 4cc6dd8b authored by lixiaofang's avatar lixiaofang

commit

parent ff4f5e72
...@@ -33,6 +33,8 @@ def kafka_consum(topic_name=None): ...@@ -33,6 +33,8 @@ def kafka_consum(topic_name=None):
while True: while True:
begin = time.time() begin = time.time()
msg_dict = consumser_obj.poll(timeout_ms=30000, max_records=100) msg_dict = consumser_obj.poll(timeout_ms=30000, max_records=100)
consumser_obj.commit()
for msg_value in msg_dict.values(): for msg_value in msg_dict.values():
for msg in msg_value: for msg in msg_value:
card_info = json.loads(str(msg.value, encoding="utf8")) card_info = json.loads(str(msg.value, encoding="utf8"))
...@@ -133,7 +135,6 @@ def kafka_consum(topic_name=None): ...@@ -133,7 +135,6 @@ def kafka_consum(topic_name=None):
else: else:
logging.info("此条数据已经被paas了:%s" % card_info['card_id']) logging.info("此条数据已经被paas了:%s" % card_info['card_id'])
logging.info("消费处理耗时:%f" % (time.time() - begin)) logging.info("消费处理耗时:%f" % (time.time() - begin))
consumser_obj.commit()
except: except:
consumser_obj.close() consumser_obj.close()
logging_exception() logging_exception()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment