Commit 73a1ca87 authored by lixiaofang's avatar lixiaofang

修改消费者偏移量

parent b57b718e
......@@ -28,10 +28,10 @@ def kafka_consum(topic_name=None):
consumser_obj.subscribe([topic_name, ])
try:
redis_topic_partition_name = "vest:topic_name:" + str(topic_name)
# topic_partition_info = redis_client.hgetall(redis_topic_partition_name)
# for partition_id in topic_partition_info:
# print (int(partition_id.decode()),topic_name)
# consumser_obj.seek(TopicPartition(topic=topic_name,partition=int(partition_id.decode())),int(topic_partition_info[partition_id]))
topic_partition_info = redis_client.hgetall(redis_topic_partition_name)
for partition_id in topic_partition_info:
print (int(partition_id.decode()),topic_name)
consumser_obj.seek(partition=TopicPartition(topic_name,int(partition_id.decode())),offset=int(topic_partition_info[partition_id]))
while True:
begin = time.time()
msg_dict = consumser_obj.poll(timeout_ms=100, max_records=30)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment