Commit b57b718e authored by lixiaofang's avatar lixiaofang

修改消费者偏移量

parent c9c1e685
...@@ -28,10 +28,10 @@ def kafka_consum(topic_name=None): ...@@ -28,10 +28,10 @@ def kafka_consum(topic_name=None):
consumser_obj.subscribe([topic_name, ]) consumser_obj.subscribe([topic_name, ])
try: try:
redis_topic_partition_name = "vest:topic_name:" + str(topic_name) redis_topic_partition_name = "vest:topic_name:" + str(topic_name)
topic_partition_info = redis_client.hgetall(redis_topic_partition_name) # topic_partition_info = redis_client.hgetall(redis_topic_partition_name)
for partition_id in topic_partition_info: # for partition_id in topic_partition_info:
print (int(partition_id.decode()),topic_name) # print (int(partition_id.decode()),topic_name)
consumser_obj.seek(TopicPartition(topic=topic_name,partition=int(partition_id.decode())),int(topic_partition_info[partition_id])) # consumser_obj.seek(TopicPartition(topic=topic_name,partition=int(partition_id.decode())),int(topic_partition_info[partition_id]))
while True: while True:
begin = time.time() begin = time.time()
msg_dict = consumser_obj.poll(timeout_ms=100, max_records=30) msg_dict = consumser_obj.poll(timeout_ms=100, max_records=30)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment