Commit ba299554 authored by 李小芳's avatar 李小芳

Merge branch 'change_time_outdate' into 'master'

Change time outdate

See merge request !32
parents debbbf0c 59040dc8
......@@ -29,15 +29,7 @@ def kafka_consum(topic_name=None):
consumser_obj = KafkaConsumer(topic_name, bootstrap_servers=settings.KAFKA_BROKER_LIST, enable_auto_commit=False,
group_id="vest", auto_offset_reset='earliest')
consumser_obj.subscribe([topic_name, ])
# partition = TopicPartition(topic_name, int(clientid))
# consumser_obj.assign([partition])
try:
# redis_topic_partition_name = "vest:topic_name:" + str(topic_name)
# topic_partition_info = redis_client.hgetall(redis_topic_partition_name)
# for partition_id in topic_partition_info:
# print (int(partition_id.decode()),topic_name)
# consumser_obj.seek(partition=TopicPartition(topic_name,int(partition_id.decode())),offset=int(topic_partition_info[partition_id]))
while True:
begin = time.time()
msg_dict = consumser_obj.poll(timeout_ms=10000, max_records=500)
......@@ -45,13 +37,17 @@ def kafka_consum(topic_name=None):
for msg_value in msg_dict.values():
for msg in msg_value:
card_info = json.loads(str(msg.value, encoding="utf8"))
if card_info['card_id'] in [719961, 718880, 717122, 720913]:
logging.info("删除重复数据:%s" % card_info['card_id'])
if card_info['card_type'] == "auto_vest":
###在这里去判断一下当前的partition和offset是否已经消费过了 如果已经消费了需要直接去掉数据
bol_consum = judge_offset_partition_have_consum(card_info=card_info, offset=msg.offset,
partition=msg.partition)
##在这里先判断当前的数据是否已经存在,存在的话直接PaaS,不存在的话再做下边的处理
# bol_consum = judge_data_have_in_redis(card_info)
# bol_consum = True
# change_consum = judge_data_have_in_redis(card_info)
if bol_consum:
logging.info("消费到新数据了[%s,%s,%s,%s],get card_id:%s,create_time:%s,current_push_time:%s" % (
str(msg.topic), str(msg.partition), str(msg.offset), str(msg.key),
......
......@@ -386,6 +386,7 @@ def judge_data_have_in_redis(card_info=None):
today = datetime.datetime.now()
str_today = str(today.year) + str(today.month) + str(today.day)
push_time = card_info['current_push_time']
push_datetime = datetime.datetime.strptime(push_time, '%Y-%m-%d %H:%M:%S')
str_push_datetime = str(push_datetime.year) + str(push_datetime.month) + str(push_datetime.day)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment