Commit 70ae92a4 authored by lixiaofang's avatar lixiaofang

add

parent 2a1bcd75
......@@ -25,7 +25,8 @@ from moment.views.process_time import judge_offset_partition_have_consum, judge_
def kafka_consum(topic_name=None):
topic_name = settings.KAFKA_TOPIC_NAME if not topic_name else topic_name
consumser_obj = KafkaConsumer(topic_name, bootstrap_servers=settings.KAFKA_BROKER_LIST, enable_auto_commit=True,
auto_commit_interval_ms=1, group_id="vest", auto_offset_reset='earliest')
auto_commit_interval_ms=1000, group_id="vest", auto_offset_reset='earliest',
session_timeout_ms=30000)
consumser_obj.subscribe([topic_name, ])
try:
# redis_topic_partition_name = "vest:topic_name:" + str(topic_name)
......@@ -35,7 +36,8 @@ def kafka_consum(topic_name=None):
# consumser_obj.seek(partition=TopicPartition(topic_name,int(partition_id.decode())),offset=int(topic_partition_info[partition_id]))
while True:
begin = time.time()
msg_dict = consumser_obj.poll(timeout_ms=1000, max_records=100)
msg_dict = consumser_obj.poll(timeout_ms=100, max_records=50)
consumser_obj.commit_async()
for msg_value in msg_dict.values():
for msg in msg_value:
card_info = json.loads(str(msg.value, encoding="utf8"))
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment