Commit 61968268 authored by zhanglu's avatar zhanglu

zipkin监控

parents
from zipkin.api import Zipkin
from zipkin.api import ZIPKIN_VERION
from zipkin.parse import parse, list_exc_info
import socket
import http
import requests
from requests.exceptions import ConnectionError
class HTTPMethods(object):
GET = "get"
POST = "post"
class ZIPKIN_VERION(object):
V1 = "v1"
V2 = "v1"
class ReuqestError(Exception):
pass
class Zipkin(object):
"""A client for zipkin's V2 API.
https://zipkin.apache.org/zipkin-api/
"""
def __init__(self, domain, version, *args, **kwargs):
self.domain = domain
self.version = version
def _request(self, method, path, params=None, json=None, **kwargs):
"""send a request."""
url = "".join([self.domain, path])
if method == HTTPMethods.GET:
try:
res = requests.get(url=url, params=params, **kwargs)
except (socket.gaierror, ConnectionError):
raise ReuqestError("网络异常,无法连接")
elif method == HTTPMethods.POST:
try:
res = requests.post(url=url, data=params, json=json, **kwargs)
except (socket.gaierror, ConnectionError):
raise ReuqestError("网络异常,无法连接")
if res.status_code == http.HTTPStatus.OK:
try:
return res.json()
except:
return res.text
raise ReuqestError("request error: {}".format(res.text))
def services(self):
"""List services name."""
path = "/zipkin/api/{version}/services".format(version=self.version)
response = self._request(method=HTTPMethods.GET, path=path)
return response
def spans(self, service_name):
"""List service's spans."""
path = "/zipkin/api/{version}/spans".format(version=self.version)
spans = self._request(method=HTTPMethods.GET, path=path, params={"serviceName": service_name})
return spans
def traces(self, service_name, error=False, limit=100, lookback=30 * 1000):
"""List service's traces.
params: error: boolean. 如果位 True 则仅仅显示错误的trace.
params: lookback: 当前时间多久之前。如果是自定义时间模式,为custom,需要参数:startTs、endTs.
"""
path = "/zipkin/api/{version}/traces".format(version=self.version)
params = {
"serviceName": service_name,
"lookback": lookback,
"sortOrder": "timestamp-desc",
"limit": limit,
}
if error:
params["annotationQuery"] = {"error": True}
traces = self._request(method=HTTPMethods.GET, path=path, params=params)
return traces
File added
import json
from collections import defaultdict
from zipkin.span import Span, Endpoint, Annotation, BinaryAnnotation
from zipkin.trace import Trace
exc_info_key = "exc.info"
def parse(traces_data):
traces = []
for trace_data in traces_data:
spans = []
for span_data in trace_data:
span = Span(**span_data)
annotations_data = span_data.get("annotations", [])
annotations = []
for annotation_data in annotations_data:
annotation = Annotation(**annotation_data)
annotation.endpoint = Endpoint(**annotation_data.get("endpoint"))
annotations.append(annotation)
span.annotations = annotations
binaryAnnotations_data = span_data.get("binaryAnnotations", [])
binaryAnnotations = []
for binaryAnnotation_data in binaryAnnotations_data:
binaryAnnotation = BinaryAnnotation(**binaryAnnotation_data)
binaryAnnotation.endpoint = Endpoint(**annotation_data.get("endpoint"))
binaryAnnotations.append(binaryAnnotation)
span.binaryAnnotations = binaryAnnotations
spans.append(span)
traces.append(Trace(spans=spans))
return traces
def list_exc_info(traces):
exc_infos = defaultdict(list)
for trace in traces:
for span in trace.spans:
service_name = span.name
for item in span.binaryAnnotations:
if item.key != exc_info_key:
continue
exc_infos[service_name].append(item.value)
return exc_infos
class Span(object):
def __init__(self, **kwargs):
self.id = kwargs.get("id")
self.traceId = kwargs.get("traceId")
self.name = kwargs.get("name")
self.parentId = kwargs.get("parentId")
self.kind = kwargs.get("kind")
self.timestamp = kwargs.get("timestamp")
self.duration = kwargs.get("duration")
self.debug = True
self.shared = True
self.localEndpoint = []
self.remoteEndpoint = []
self.annotations = []
self.binaryAnnotations = []
class Endpoint(object):
def __init__(self, **kwargs):
self.serviceName = kwargs.get("serviceName")
self.ipv4 = kwargs.get("ipv4")
self.ipv6 = kwargs.get("ipv6")
self.port = kwargs.get("port")
class Annotation(object):
def __init__(self, **kwargs):
self.timestamp = kwargs.get("timestamp")
self.value = kwargs.get("value")
# Endpoint 对象
self.endpoint = None
class BinaryAnnotation(object):
def __init__(self, **kwargs):
self.key = kwargs.get("key")
self.value = kwargs.get("value")
# Endpoint 对象
self.endpoint = None
class Trace(object):
def __init__(self, spans=[]):
self.spans = spans
from zipkin import Zipkin, ZIPKIN_VERION
from zipkin import parse, list_exc_info
if __name__ == "__main__":
domain = "http://zipkin.gengmei.tx"
client = Zipkin(domain, version=ZIPKIN_VERION.V2)
# 当前时间30s之内产生的错误
traces_data = client.traces(service_name='backend', error=True, lookback=30 * 1000)
traces = parse(traces_data)
infos = list_exc_info(traces)
if infos:
for service_name, exc_infos in infos.items():
print("接口:{}".format(service_name))
for info in exc_infos:
print("异常信息:{}".format(info))
raise Exception("服务异常...")
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment