Commit d2582d07 authored by lixiaofang's avatar lixiaofang

add

parent 958c2a88
...@@ -41,9 +41,9 @@ def kafka_consum(topic_name=None): ...@@ -41,9 +41,9 @@ def kafka_consum(topic_name=None):
card_info = json.loads(str(msg.value, encoding="utf8")) card_info = json.loads(str(msg.value, encoding="utf8"))
if card_info['card_type'] == "auto_vest": if card_info['card_type'] == "auto_vest":
###在这里去判断一下当前的partition和offset是否已经消费过了 如果已经消费了需要直接去掉数据 ###在这里去判断一下当前的partition和offset是否已经消费过了 如果已经消费了需要直接去掉数据
logging.info("11111判断是否消费过的函数消费处理耗时:%f" % (card_info["card_id"], time.time() - begin)) logging.info("11111判断是否消费过的函数消费处理耗时:%f" % (time.time() - begin))
bol_consum = judge_offset_partition_have_consum(offset=msg.offset, partition=msg.partition) bol_consum = judge_offset_partition_have_consum(offset=msg.offset, partition=msg.partition)
logging.info("22222判断是否消费过的函数消费处理耗时:%f" % (card_info["card_id"], time.time() - begin)) logging.info("22222判断是否消费过的函数消费处理耗时:%f" % (time.time() - begin))
if bol_consum: if bol_consum:
logging.info("消费到新数据了[%s,%s,%s,%s],get card_info:%s" % ( logging.info("消费到新数据了[%s,%s,%s,%s],get card_info:%s" % (
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment