Commit 0c5a4114 authored by lixiaofang's avatar lixiaofang

判断offset是否已经消费了

parent 0cb7322a
...@@ -19,6 +19,7 @@ from comment.views.comment_fun import comment ...@@ -19,6 +19,7 @@ from comment.views.comment_fun import comment
from follow.views.follow_fun import follow from follow.views.follow_fun import follow
from libs.cache import redis_client from libs.cache import redis_client
from kafka.structs import TopicPartition from kafka.structs import TopicPartition
from moment.views.process_time import judge_offset_partition_have_consum
def kafka_consum(topic_name=None): def kafka_consum(topic_name=None):
...@@ -40,6 +41,10 @@ def kafka_consum(topic_name=None): ...@@ -40,6 +41,10 @@ def kafka_consum(topic_name=None):
for msg in consume_msg: for msg in consume_msg:
card_info = json.loads(str(msg.value, encoding="utf8")) card_info = json.loads(str(msg.value, encoding="utf8"))
if card_info['card_type'] == "auto_vest": if card_info['card_type'] == "auto_vest":
###在这里去判断一下当前的partition和offset是否已经消费过了 如果已经消费了需要直接去掉数据
bol_consum = judge_offset_partition_have_consum(offset=msg.offset, partition=msg.partition)
if bol_consum:
logging.info("消费到新数据了[%s,%s,%s,%s],get card_info:%s" % ( logging.info("消费到新数据了[%s,%s,%s,%s],get card_info:%s" % (
str(msg.topic), str(msg.partition), str(msg.offset), str(msg.key), card_info)) str(msg.topic), str(msg.partition), str(msg.offset), str(msg.key), card_info))
redis_client.hset(redis_topic_partition_name, msg.partition, msg.offset) redis_client.hset(redis_topic_partition_name, msg.partition, msg.offset)
...@@ -104,7 +109,8 @@ def kafka_consum(topic_name=None): ...@@ -104,7 +109,8 @@ def kafka_consum(topic_name=None):
card_info['card_id'], time.time() - begin)) card_info['card_id'], time.time() - begin))
is_success = comment(card_info) is_success = comment(card_info)
logging.info("comment [%s,%s,%s,%s],当前ID:%s,下发状状态:%s" % ( logging.info("comment [%s,%s,%s,%s],当前ID:%s,下发状状态:%s" % (
str(msg.topic), str(msg.partition), str(msg.offset), str(msg.key), str(msg.topic), str(msg.partition), str(msg.offset),
str(msg.key),
card_info["card_id"], is_success)) card_info["card_id"], is_success))
logging.info("当前卡片ID:%s,comment2子函数消费处理耗时:%f" % ( logging.info("当前卡片ID:%s,comment2子函数消费处理耗时:%f" % (
card_info['card_id'], time.time() - begin)) card_info['card_id'], time.time() - begin))
...@@ -112,13 +118,15 @@ def kafka_consum(topic_name=None): ...@@ -112,13 +118,15 @@ def kafka_consum(topic_name=None):
auto_comment_user(card_info) auto_comment_user(card_info)
elif action_type == "click": # 在这里去调点赞的接口 elif action_type == "click": # 在这里去调点赞的接口
if 'have_click_number' in card_info and card_info['have_click_number'] < 20: if 'have_click_number' in card_info and card_info[
'have_click_number'] < 20:
card_info["have_click_number"] += 1 card_info["have_click_number"] += 1
logging.info("当前卡片ID:%s,click1子函数消费处理耗时:%f" % ( logging.info("当前卡片ID:%s,click1子函数消费处理耗时:%f" % (
card_info["card_id"], time.time() - begin)) card_info["card_id"], time.time() - begin))
is_success = click(card_info) is_success = click(card_info)
logging.info("click [%s,%s,%s,%s],当前ID:%s,下发状状态:%s" % ( logging.info("click [%s,%s,%s,%s],当前ID:%s,下发状状态:%s" % (
str(msg.topic), str(msg.partition), str(msg.offset), str(msg.key), str(msg.topic), str(msg.partition), str(msg.offset),
str(msg.key),
card_info["card_id"], is_success)) card_info["card_id"], is_success))
logging.info("当前卡片ID:%s,click2子函数消费处理耗时:%f" % ( logging.info("当前卡片ID:%s,click2子函数消费处理耗时:%f" % (
card_info["card_id"], time.time() - begin)) card_info["card_id"], time.time() - begin))
...@@ -136,7 +144,8 @@ def kafka_consum(topic_name=None): ...@@ -136,7 +144,8 @@ def kafka_consum(topic_name=None):
card_info["card_id"], time.time() - begin)) card_info["card_id"], time.time() - begin))
logging.info("follow [%s,%s,%s,%s],当前ID:%s,下发状状态:%s" % ( logging.info("follow [%s,%s,%s,%s],当前ID:%s,下发状状态:%s" % (
str(msg.topic), str(msg.partition), str(msg.offset), str(msg.key), str(msg.topic), str(msg.partition), str(msg.offset),
str(msg.key),
card_info["card_id"], is_success)) card_info["card_id"], is_success))
auto_follow_user(card_info) auto_follow_user(card_info)
......
...@@ -310,3 +310,26 @@ def get_vest_userid_and_comment(need_comment_num=0, tag_names=[], card_id=0): ...@@ -310,3 +310,26 @@ def get_vest_userid_and_comment(need_comment_num=0, tag_names=[], card_id=0):
except: except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc()) logging.error("catch exception,err_msg:%s" % traceback.format_exc())
return [] return []
def judge_offset_partition_have_consum(offset=0, partition=0):
"""
根据当前的offset和分区去判断数据是否已经被消费
:param offset:
:param partition:
:return:
"""
try:
redis_list_data = set()
key = "irrigation_partition_offset_have_consum:" + str(partition)
redis_data = redis_client.get(key)
if redis_data:
redis_list_data = set(json.loads(redis_data))
if offset in redis_list_data:
return False
redis_list_data.add(offset)
redis_client.set(key, json.dumps(list(redis_list_data)))
return True
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
return False
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment