Commit d7f8ed5c authored by lixiaofang's avatar lixiaofang

去掉多余的判断

parent 0c5a4114
...@@ -36,9 +36,8 @@ def kafka_consum(topic_name=None): ...@@ -36,9 +36,8 @@ def kafka_consum(topic_name=None):
while True: while True:
begin = time.time() begin = time.time()
msg_dict = consumser_obj.poll(timeout_ms=100, max_records=30) msg_dict = consumser_obj.poll(timeout_ms=100, max_records=30)
for msg_key in msg_dict: for msg_value in msg_dict.values():
consume_msg = msg_dict[msg_key] for msg in msg_value:
for msg in consume_msg:
card_info = json.loads(str(msg.value, encoding="utf8")) card_info = json.loads(str(msg.value, encoding="utf8"))
if card_info['card_type'] == "auto_vest": if card_info['card_type'] == "auto_vest":
###在这里去判断一下当前的partition和offset是否已经消费过了 如果已经消费了需要直接去掉数据 ###在这里去判断一下当前的partition和offset是否已经消费过了 如果已经消费了需要直接去掉数据
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment