Commit e58c41c0 authored by lixiaofang's avatar lixiaofang

修改时间

parent 7b04de82
......@@ -39,10 +39,12 @@ def kafka_consum(topic_name=None):
for msg_value in msg_dict.values():
for msg in msg_value:
card_info = json.loads(str(msg.value, encoding="utf8"))
if card_info['card_type'] == "auto_vest":
###在这里去判断一下当前的partition和offset是否已经消费过了 如果已经消费了需要直接去掉数据
bol_consum = judge_offset_partition_have_consum(card_info=card_info, offset=msg.offset,
partition=msg.partition)
# bol_consum = judge_offset_partition_have_consum(card_info=card_info, offset=msg.offset,
# partition=msg.partition)
bol_consum = True
if bol_consum:
logging.info("消费到新数据了[%s,%s,%s,%s],get card_info:%s" % (
str(msg.topic), str(msg.partition), str(msg.offset), str(msg.key), card_info))
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment