Commit c636db5f authored by hukx.michael's avatar hukx.michael

init

parents
venv/
*.pyc
.vscode/
.idea/
.DS_Store
# encoding: utf-8
"""
"""
import threading
import time
import redis
from prometheus_client import CollectorRegistry, Gauge, push_to_gateway
import settings
class Monitor(object):
def __init__(self):
self.rs = {redis_url: redis.StrictRedis(*redis_url.split(':')) for redis_url in settings.REDIS_URL}
self.keyprefix_queue = '_kombu.binding.'
self.sep = '\x06\x16'
self.redis_queue_map = {}
def refresh_redis_queue_map(self):
for url, r in self.rs.items():
exchanges = filter(lambda x: not x[len(self.keyprefix_queue):].startswith('celeryev')
and not x.endswith('pidbox'),
r.keys('%s*' % self.keyprefix_queue))
for exchange in exchanges:
self.redis_queue_map[url] = [queue_str.split(self.sep)[-1] for queue_str in r.smembers(exchange)]
def get_jam_tasks_nums(self):
"""
获取阻塞的tasks数
"""
res = {}
for redis_url, queue_list in self.redis_queue_map.items():
for queue in queue_list:
r = self.rs.get(redis_url)
num = r.llen(queue)
key = '%s|%s' % (queue, redis_url)
res[key] = num
return res
class Reporter(object):
def push_to_pushgateway(self, content):
"""
推送数据到pushgateway
"""
registry = CollectorRegistry()
g = Gauge('jam_tasks_number', 'jam task number', ['queue', 'redis_broker'], registry=registry)
for queue_redis, jam_tasks_num in content.items():
queue, redis_url = queue_redis.split('|')
g.labels(queue, redis_url).set(jam_tasks_num)
push_to_gateway(settings.PUSHGATEWAY_URL, job='jam_tasks', registry=registry)
def main():
monitor = Monitor()
reporter = Reporter()
# 每24h刷新 redis 与 queue 的映射
def refresh():
print('refresh')
monitor.refresh_redis_queue_map()
threading.Timer(settings.REFRESH_REDIS_QUEUE_MAP_TIME, refresh).start()
refresh()
while True:
res = monitor.get_jam_tasks_nums()
print(res)
reporter.push_to_pushgateway(res)
time.sleep(settings.REFRESH_TIME)
if __name__ == '__main__':
main()
# encoding: utf-8
""" 配置项
"""
# redis URL
REDIS_URL = [
'localhost',
]
# pushgateway URL
PUSHGATEWAY_URL = 'localhost:9091'
# 数据刷新时间间隔(s)
REFRESH_TIME = 5
# 更新redis与queue映射关系时间间隔 (s)
REFRESH_REDIS_QUEUE_MAP_TIME = 24*60*60
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment