Commit bb60cf1c authored by lixiaofang's avatar lixiaofang

add

parent 8510b7f4
......@@ -41,8 +41,7 @@ def kafka_consum(topic_name=None):
card_info = json.loads(str(msg.value, encoding="utf8"))
if card_info['card_type'] == "auto_vest":
###在这里去判断一下当前的partition和offset是否已经消费过了 如果已经消费了需要直接去掉数据
bol_consum = judge_offset_partition_have_consum(offset=msg.offset, partition=msg.partition)
bol_consum = judge_offset_partition_have_consum(card_info=card_info,offset=msg.offset, partition=msg.partition)
if bol_consum:
logging.info("消费到新数据了[%s,%s,%s,%s],get card_info:%s" % (
str(msg.topic), str(msg.partition), str(msg.offset), str(msg.key), card_info))
......
......@@ -329,7 +329,7 @@ def get_vest_userid_and_comment(need_comment_num=0, tag_names=[], card_id=0):
return []
def judge_offset_partition_have_consum(offset=0, partition=0):
def judge_offset_partition_have_consum(card_info=None, offset=0, partition=0):
"""
根据当前的offset和分区去判断数据是否已经被消费
:param offset:
......@@ -337,6 +337,13 @@ def judge_offset_partition_have_consum(offset=0, partition=0):
:return:
"""
try:
##先判断是不是2019-12-29的
create_time = card_info['create_time']
datetime_create_time = datetime.datetime.strptime(create_time, '%Y-%m-%d %H:%M:%S')
str_data = str(datetime_create_time.year) + str(datetime_create_time.month) + str(datetime_create_time.day)
if str_data in ['20191229', '20191230']:
return False
redis_list_data = 0
key = "irrigation_partition_offset_have_consum:" + str(partition)
redis_data = redis_client.get(key)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment