Commit c9991d10 authored by lixiaofang's avatar lixiaofang

删掉一些重复数据

parent debbbf0c
...@@ -29,15 +29,7 @@ def kafka_consum(topic_name=None): ...@@ -29,15 +29,7 @@ def kafka_consum(topic_name=None):
consumser_obj = KafkaConsumer(topic_name, bootstrap_servers=settings.KAFKA_BROKER_LIST, enable_auto_commit=False, consumser_obj = KafkaConsumer(topic_name, bootstrap_servers=settings.KAFKA_BROKER_LIST, enable_auto_commit=False,
group_id="vest", auto_offset_reset='earliest') group_id="vest", auto_offset_reset='earliest')
consumser_obj.subscribe([topic_name, ]) consumser_obj.subscribe([topic_name, ])
# partition = TopicPartition(topic_name, int(clientid))
# consumser_obj.assign([partition])
try: try:
# redis_topic_partition_name = "vest:topic_name:" + str(topic_name)
# topic_partition_info = redis_client.hgetall(redis_topic_partition_name)
# for partition_id in topic_partition_info:
# print (int(partition_id.decode()),topic_name)
# consumser_obj.seek(partition=TopicPartition(topic_name,int(partition_id.decode())),offset=int(topic_partition_info[partition_id]))
while True: while True:
begin = time.time() begin = time.time()
msg_dict = consumser_obj.poll(timeout_ms=10000, max_records=500) msg_dict = consumser_obj.poll(timeout_ms=10000, max_records=500)
...@@ -45,13 +37,17 @@ def kafka_consum(topic_name=None): ...@@ -45,13 +37,17 @@ def kafka_consum(topic_name=None):
for msg_value in msg_dict.values(): for msg_value in msg_dict.values():
for msg in msg_value: for msg in msg_value:
card_info = json.loads(str(msg.value, encoding="utf8")) card_info = json.loads(str(msg.value, encoding="utf8"))
if card_info['card_id'] in [719961, 718880, 717122, 720913]:
pass
if card_info['card_type'] == "auto_vest": if card_info['card_type'] == "auto_vest":
###在这里去判断一下当前的partition和offset是否已经消费过了 如果已经消费了需要直接去掉数据 ###在这里去判断一下当前的partition和offset是否已经消费过了 如果已经消费了需要直接去掉数据
bol_consum = judge_offset_partition_have_consum(card_info=card_info, offset=msg.offset, bol_consum = judge_offset_partition_have_consum(card_info=card_info, offset=msg.offset,
partition=msg.partition) partition=msg.partition)
##在这里先判断当前的数据是否已经存在,存在的话直接PaaS,不存在的话再做下边的处理 ##在这里先判断当前的数据是否已经存在,存在的话直接PaaS,不存在的话再做下边的处理
# bol_consum = judge_data_have_in_redis(card_info) # change_consum = judge_data_have_in_redis(card_info)
# bol_consum = True
if bol_consum: if bol_consum:
logging.info("消费到新数据了[%s,%s,%s,%s],get card_id:%s,create_time:%s,current_push_time:%s" % ( logging.info("消费到新数据了[%s,%s,%s,%s],get card_id:%s,create_time:%s,current_push_time:%s" % (
str(msg.topic), str(msg.partition), str(msg.offset), str(msg.key), str(msg.topic), str(msg.partition), str(msg.offset), str(msg.key),
......
...@@ -386,11 +386,15 @@ def judge_data_have_in_redis(card_info=None): ...@@ -386,11 +386,15 @@ def judge_data_have_in_redis(card_info=None):
today = datetime.datetime.now() today = datetime.datetime.now()
str_today = str(today.year) + str(today.month) + str(today.day) str_today = str(today.year) + str(today.month) + str(today.day)
push_time = card_info['current_push_time'] push_time = card_info['current_push_time']
push_datetime = datetime.datetime.strptime(push_time, '%Y-%m-%d %H:%M:%S') push_datetime = datetime.datetime.strptime(push_time, '%Y-%m-%d %H:%M:%S')
str_push_datetime = str(push_datetime.year) + str(push_datetime.month) + str(push_datetime.day) str_push_datetime = str(push_datetime.year) + str(push_datetime.month) + str(push_datetime.day)
if str_today != str_push_datetime: if str_today != str_push_datetime:
if push_datetime.month in (2, 3, 4):
pass
return True return True
if card_info['action_type'] == 'comment': if card_info['action_type'] == 'comment':
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment