Commit 319894eb authored by lixiaofang's avatar lixiaofang

修改提交方式

parent 5d0b6d55
...@@ -26,7 +26,7 @@ from kafka import TopicPartition ...@@ -26,7 +26,7 @@ from kafka import TopicPartition
def kafka_consum(topic_name=None): def kafka_consum(topic_name=None):
topic_name = settings.KAFKA_TOPIC_NAME topic_name = settings.KAFKA_TOPIC_NAME
consumser_obj = KafkaConsumer(topic_name, bootstrap_servers=settings.KAFKA_BROKER_LIST, enable_auto_commit=True, consumser_obj = KafkaConsumer(topic_name, bootstrap_servers=settings.KAFKA_BROKER_LIST, enable_auto_commit=False,
group_id="vest", auto_offset_reset='earliest') group_id="vest", auto_offset_reset='earliest')
consumser_obj.subscribe([topic_name, ]) consumser_obj.subscribe([topic_name, ])
...@@ -41,8 +41,7 @@ def kafka_consum(topic_name=None): ...@@ -41,8 +41,7 @@ def kafka_consum(topic_name=None):
while True: while True:
begin = time.time() begin = time.time()
msg_dict = consumser_obj.poll(timeout_ms=10000, max_records=500) msg_dict = consumser_obj.poll(timeout_ms=10000, max_records=500)
consumser_obj.commit()
consumser_obj.commit_async()
for msg_value in msg_dict.values(): for msg_value in msg_dict.values():
for msg in msg_value: for msg in msg_value:
card_info = json.loads(str(msg.value, encoding="utf8")) card_info = json.loads(str(msg.value, encoding="utf8"))
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment