Commit 2b3661c9 authored by lixiaofang's avatar lixiaofang

修改消费者偏移量

parent 1c2cba7a
...@@ -30,6 +30,7 @@ def kafka_consum(topic_name=None): ...@@ -30,6 +30,7 @@ def kafka_consum(topic_name=None):
redis_topic_partition_name = "vest:topic_name:" + str(topic_name) redis_topic_partition_name = "vest:topic_name:" + str(topic_name)
topic_partition_info = redis_client.hgetall(redis_topic_partition_name) topic_partition_info = redis_client.hgetall(redis_topic_partition_name)
for partition_id in topic_partition_info: for partition_id in topic_partition_info:
print (partition_id,topic_name)
consumser_obj.seek(TopicPartition(topic=topic_name,partition=int(partition_id)),int(topic_partition_info[partition_id])) consumser_obj.seek(TopicPartition(topic=topic_name,partition=int(partition_id)),int(topic_partition_info[partition_id]))
while True: while True:
begin = time.time() begin = time.time()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment