Commit 7d9d0af7 authored by lixiaofang's avatar lixiaofang

add

parent d2582d07
......@@ -41,9 +41,7 @@ def kafka_consum(topic_name=None):
card_info = json.loads(str(msg.value, encoding="utf8"))
if card_info['card_type'] == "auto_vest":
###在这里去判断一下当前的partition和offset是否已经消费过了 如果已经消费了需要直接去掉数据
logging.info("11111判断是否消费过的函数消费处理耗时:%f" % (time.time() - begin))
bol_consum = judge_offset_partition_have_consum(offset=msg.offset, partition=msg.partition)
logging.info("22222判断是否消费过的函数消费处理耗时:%f" % (time.time() - begin))
if bol_consum:
logging.info("消费到新数据了[%s,%s,%s,%s],get card_info:%s" % (
......
......@@ -337,15 +337,12 @@ def judge_offset_partition_have_consum(offset=0, partition=0):
:return:
"""
try:
redis_list_data = set()
redis_list_data = 0
key = "irrigation_partition_offset_have_consum:" + str(partition)
redis_data = redis_client.get(key)
if redis_data:
redis_list_data = set(json.loads(str(redis_data, encoding="utf8")))
if offset in redis_list_data:
if offset > int(redis_data):
return False
redis_list_data.add(offset)
redis_client.set(key, json.dumps(list(redis_list_data)))
redis_client.set(key, offset)
return True
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment