Commit 8ba6b792 authored by lixiaofang's avatar lixiaofang

add

parent e3560c64
......@@ -40,11 +40,11 @@ def kafka_consum(topic_name=None):
card_info = json.loads(str(msg.value, encoding="utf8"))
if card_info['card_type'] == "auto_vest":
###在这里去判断一下当前的partition和offset是否已经消费过了 如果已经消费了需要直接去掉数据
bol_consum = judge_offset_partition_have_consum(card_info=card_info, offset=msg.offset,
partition=msg.partition)
# bol_consum = judge_offset_partition_have_consum(card_info=card_info, offset=msg.offset,
# partition=msg.partition)
##在这里先判断当前的数据是否已经存在,存在的话直接PaaS,不存在的话再做下边的处理
# change_consum = judge_data_have_in_redis(card_info)
# bol_consum = True
bol_consum = True
if bol_consum:
logging.info("消费到新数据了[%s,%s,%s,%s],get card_id:%s,create_time:%s,current_push_time:%s" % (
str(msg.topic), str(msg.partition), str(msg.offset), str(msg.key),
......
......@@ -393,8 +393,6 @@ def judge_offset_partition_have_consum(card_info=None, offset=0, partition=0):
create_time = card_info['create_time']
datetime_create_time = datetime.datetime.strptime(create_time, '%Y-%m-%d %H:%M:%S')
str_data = str(datetime_create_time.year) + str(datetime_create_time.month) + str(datetime_create_time.day)
if str_data in ['20191229', '20191230', '20191228', '20191227']:
return False
redis_list_data = 0
key = "irrigation_partition_offset_have_consum:" + str(partition)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment