Commit d13ae435 authored by lixiaofang's avatar lixiaofang

修改消费者偏移量

parent 2b3661c9
...@@ -30,8 +30,8 @@ def kafka_consum(topic_name=None): ...@@ -30,8 +30,8 @@ def kafka_consum(topic_name=None):
redis_topic_partition_name = "vest:topic_name:" + str(topic_name) redis_topic_partition_name = "vest:topic_name:" + str(topic_name)
topic_partition_info = redis_client.hgetall(redis_topic_partition_name) topic_partition_info = redis_client.hgetall(redis_topic_partition_name)
for partition_id in topic_partition_info: for partition_id in topic_partition_info:
print (partition_id,topic_name) print (int(str(partition_id)),topic_name)
consumser_obj.seek(TopicPartition(topic=topic_name,partition=int(partition_id)),int(topic_partition_info[partition_id])) consumser_obj.seek(TopicPartition(topic=topic_name,partition=int(str(partition_id))),int(topic_partition_info[partition_id]))
while True: while True:
begin = time.time() begin = time.time()
msg_dict = consumser_obj.poll(timeout_ms=100, max_records=30) msg_dict = consumser_obj.poll(timeout_ms=100, max_records=30)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment