Commit 17af2fcb authored by lixiaofang's avatar lixiaofang

commit

parent 3dfc03ee
...@@ -33,17 +33,16 @@ def kafka_consum(topic_name=None): ...@@ -33,17 +33,16 @@ def kafka_consum(topic_name=None):
while True: while True:
begin = time.time() begin = time.time()
msg_dict = consumser_obj.poll(timeout_ms=10000, max_records=500) msg_dict = consumser_obj.poll(timeout_ms=10000, max_records=500)
consumser_obj.commit()
for msg_value in msg_dict.values(): for msg_value in msg_dict.values():
for msg in msg_value: for msg in msg_value:
card_info = json.loads(str(msg.value, encoding="utf8")) card_info = json.loads(str(msg.value, encoding="utf8"))
if card_info['card_type'] == "auto_vest": if card_info['card_type'] == "auto_vest":
###在这里去判断一下当前的partition和offset是否已经消费过了 如果已经消费了需要直接去掉数据 ###在这里去判断一下当前的partition和offset是否已经消费过了 如果已经消费了需要直接去掉数据
bol_consum = judge_offset_partition_have_consum(card_info=card_info, offset=msg.offset, # bol_consum = judge_offset_partition_have_consum(card_info=card_info, offset=msg.offset,
partition=msg.partition) # partition=msg.partition)
##在这里先判断当前的数据是否已经存在,存在的话直接PaaS,不存在的话再做下边的处理 ##在这里先判断当前的数据是否已经存在,存在的话直接PaaS,不存在的话再做下边的处理
# change_consum = judge_data_have_in_redis(card_info) # change_consum = judge_data_have_in_redis(card_info)
bol_consum = True
if bol_consum: if bol_consum:
logging.info("消费到新数据了[%s,%s,%s,%s],get card_id:%s,create_time:%s,current_push_time:%s" % ( logging.info("消费到新数据了[%s,%s,%s,%s],get card_id:%s,create_time:%s,current_push_time:%s" % (
str(msg.topic), str(msg.partition), str(msg.offset), str(msg.key), str(msg.topic), str(msg.partition), str(msg.offset), str(msg.key),
...@@ -134,6 +133,7 @@ def kafka_consum(topic_name=None): ...@@ -134,6 +133,7 @@ def kafka_consum(topic_name=None):
else: else:
logging.info("此条数据已经被paas了:%s" % card_info['card_id']) logging.info("此条数据已经被paas了:%s" % card_info['card_id'])
logging.info("消费处理耗时:%f" % (time.time() - begin)) logging.info("消费处理耗时:%f" % (time.time() - begin))
consumser_obj.commit()
except: except:
consumser_obj.close() consumser_obj.close()
logging_exception() logging_exception()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment