Commit 26546f59 authored by lixiaofang's avatar lixiaofang

auto_vest

parent 87b1e5b7
...@@ -26,8 +26,8 @@ def kafka_consum(topic_name=None): ...@@ -26,8 +26,8 @@ def kafka_consum(topic_name=None):
consumser_obj.subscribe([topic_name, ]) consumser_obj.subscribe([topic_name, ])
try: try:
while True: while True:
begin = datetime.datetime.now()
msg_dict = consumser_obj.poll(timeout_ms=100, max_records=30) msg_dict = consumser_obj.poll(timeout_ms=100, max_records=30)
consumser_obj.commit_async()
for msg_key in msg_dict: for msg_key in msg_dict:
consume_msg = msg_dict[msg_key] consume_msg = msg_dict[msg_key]
for msg in consume_msg: for msg in consume_msg:
...@@ -111,6 +111,7 @@ def kafka_consum(topic_name=None): ...@@ -111,6 +111,7 @@ def kafka_consum(topic_name=None):
logging.info("push_time时间未到========================需要等待") logging.info("push_time时间未到========================需要等待")
save_data_to_kafka(card_info) save_data_to_kafka(card_info)
pass pass
print(datetime.datetime.now() - begin)
except: except:
consumser_obj.close() consumser_obj.close()
......
...@@ -244,8 +244,6 @@ def save_data_to_kafka(card_info): ...@@ -244,8 +244,6 @@ def save_data_to_kafka(card_info):
try: try:
topic = settings.KAFKA_TOPIC_NAME topic = settings.KAFKA_TOPIC_NAME
producer.send(topic, json.dumps(card_info).encode()) producer.send(topic, json.dumps(card_info).encode())
producer.close()
except: except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc()) logging.error("catch exception,err_msg:%s" % traceback.format_exc())
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment