Commit c9c1e685 authored by lixiaofang's avatar lixiaofang

修改消费者偏移量

parent d13ae435
...@@ -30,8 +30,8 @@ def kafka_consum(topic_name=None): ...@@ -30,8 +30,8 @@ def kafka_consum(topic_name=None):
redis_topic_partition_name = "vest:topic_name:" + str(topic_name) redis_topic_partition_name = "vest:topic_name:" + str(topic_name)
topic_partition_info = redis_client.hgetall(redis_topic_partition_name) topic_partition_info = redis_client.hgetall(redis_topic_partition_name)
for partition_id in topic_partition_info: for partition_id in topic_partition_info:
print (int(str(partition_id)),topic_name) print (int(partition_id.decode()),topic_name)
consumser_obj.seek(TopicPartition(topic=topic_name,partition=int(str(partition_id))),int(topic_partition_info[partition_id])) consumser_obj.seek(TopicPartition(topic=topic_name,partition=int(partition_id.decode())),int(topic_partition_info[partition_id]))
while True: while True:
begin = time.time() begin = time.time()
msg_dict = consumser_obj.poll(timeout_ms=100, max_records=30) msg_dict = consumser_obj.poll(timeout_ms=100, max_records=30)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment