Commit 1c2cba7a authored by lixiaofang's avatar lixiaofang

修改消费者偏移量

parent a852a9d9
...@@ -17,6 +17,8 @@ from libs.error import logging_exception ...@@ -17,6 +17,8 @@ from libs.error import logging_exception
from click.views.click_fun import click from click.views.click_fun import click
from comment.views.comment_fun import comment from comment.views.comment_fun import comment
from follow.views.follow_fun import follow from follow.views.follow_fun import follow
from libs.cache import redis_client
from kafka.structs import TopicPartition
def kafka_consum(topic_name=None): def kafka_consum(topic_name=None):
...@@ -25,6 +27,10 @@ def kafka_consum(topic_name=None): ...@@ -25,6 +27,10 @@ def kafka_consum(topic_name=None):
auto_commit_interval_ms=1, group_id="vest") auto_commit_interval_ms=1, group_id="vest")
consumser_obj.subscribe([topic_name, ]) consumser_obj.subscribe([topic_name, ])
try: try:
redis_topic_partition_name = "vest:topic_name:" + str(topic_name)
topic_partition_info = redis_client.hgetall(redis_topic_partition_name)
for partition_id in topic_partition_info:
consumser_obj.seek(TopicPartition(topic=topic_name,partition=int(partition_id)),int(topic_partition_info[partition_id]))
while True: while True:
begin = time.time() begin = time.time()
msg_dict = consumser_obj.poll(timeout_ms=100, max_records=30) msg_dict = consumser_obj.poll(timeout_ms=100, max_records=30)
...@@ -34,6 +40,7 @@ def kafka_consum(topic_name=None): ...@@ -34,6 +40,7 @@ def kafka_consum(topic_name=None):
card_info = json.loads(msg.value) card_info = json.loads(msg.value)
if card_info['card_type'] == "auto_vest": if card_info['card_type'] == "auto_vest":
logging.info("消费到新数据了[%s,%s,%s,%s],get card_info:%s" % (str(msg.topic),str(msg.partition),str(msg.offset),str(msg.key),card_info)) logging.info("消费到新数据了[%s,%s,%s,%s],get card_info:%s" % (str(msg.topic),str(msg.partition),str(msg.offset),str(msg.key),card_info))
redis_client.hset(redis_topic_partition_name,msg.partition,msg.offset)
# 代表当天数据 # 代表当天数据
current_push_time = card_info['current_push_time'] current_push_time = card_info['current_push_time']
create_time = card_info['create_time'] create_time = card_info['create_time']
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment