Commit f30202e8 authored by hukx.michael

init
[run]
branch = True
source = prometheus_client
omit =
prometheus_client/decorator.py
[paths]
source =
prometheus_client
.tox/*/lib/python*/site-packages/prometheus_client
.tox/pypy/site-packages/prometheus_client
[report]
show_missing = True
build
dist
*.egg-info
*.pyc
*.swp
.coverage.*
.coverage
.tox
sudo: false
cache:
directories:
- $HOME/.cache/pip
# Pending https://github.com/travis-ci/travis-ci/issues/5027
before_install:
- |
if [ "$TRAVIS_PYTHON_VERSION" = "pypy" ]; then
export PYENV_ROOT="$HOME/.pyenv"
if [ -f "$PYENV_ROOT/bin/pyenv" ]; then
cd "$PYENV_ROOT" && git pull
else
rm -rf "$PYENV_ROOT" && git clone --depth 1 https://github.com/yyuu/pyenv.git "$PYENV_ROOT"
fi
export PYPY_VERSION="4.0.1"
"$PYENV_ROOT/bin/pyenv" install "pypy-$PYPY_VERSION"
virtualenv --python="$PYENV_ROOT/versions/pypy-$PYPY_VERSION/bin/python" "$HOME/virtualenvs/pypy-$PYPY_VERSION"
source "$HOME/virtualenvs/pypy-$PYPY_VERSION/bin/activate"
fi
language: python
matrix:
include:
- python: "2.6"
env: TOXENV=py26
- python: "2.7"
env: TOXENV=py27
- python: "2.7"
env: TOXENV=py27-nooptionals
- python: "3.4"
env: TOXENV=py34
- python: "3.5"
env: TOXENV=py35
- python: "3.5"
env: TOXENV=py35-nooptionals
- python: "pypy"
env: TOXENV=pypy
install:
- pip install tox
script:
- tox
notifications:
email: false
# Contributing
Prometheus uses GitHub to manage reviews of pull requests.
* If you have a trivial fix or improvement, go ahead and create a pull request,
addressing (with `@...`) the maintainer of this repository (see
[MAINTAINERS.md](MAINTAINERS.md)) in the description of the pull request.
* If you plan to do something more involved, first discuss your ideas
on our [mailing list](https://groups.google.com/forum/?fromgroups#!forum/prometheus-developers).
This will avoid unnecessary work and surely give you and us a good deal
of inspiration.
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
* Brian Brazil <brian.brazil@robustperception.io>
Prometheus instrumentation library for Python applications
Copyright 2015 The Prometheus Authors
This product bundles decorator 4.0.10 which is available under a "2-clause BSD"
license. For details, see prometheus_client/decorator.py.
# Prometheus Python Client
The official Python 2 and 3 client for [Prometheus](http://prometheus.io).
## Three Step Demo
**One**: Install the client:
```
pip install prometheus_client
```
**Two**: Paste the following into a Python interpreter:
```python
from prometheus_client import start_http_server, Summary
import random
import time
# Create a metric to track time spent and requests made.
REQUEST_TIME = Summary('request_processing_seconds', 'Time spent processing request')
# Decorate function with metric.
@REQUEST_TIME.time()
def process_request(t):
"""A dummy function that takes some time."""
time.sleep(t)
if __name__ == '__main__':
# Start up the server to expose the metrics.
start_http_server(8000)
# Generate some requests.
while True:
process_request(random.random())
```
**Three**: Visit [http://localhost:8000/](http://localhost:8000/) to view the metrics.
From one easy-to-use decorator you get:
* `request_processing_seconds_count`: Number of times this function was called.
* `request_processing_seconds_sum`: Total amount of time spent in this function.
Prometheus's `rate` function allows calculation of both requests per second
and latency over time from this data: dividing the rate of the sum by the
rate of the count yields the average request duration.
In addition, if you're on Linux, the `process` metrics expose CPU, memory and
other information about the process for free!
## Installation
```
pip install prometheus_client
```
This package can be found on
[PyPI](https://pypi.python.org/pypi/prometheus_client).
## Instrumenting
Four types of metric are offered: Counter, Gauge, Summary and Histogram.
See the documentation on [metric types](http://prometheus.io/docs/concepts/metric_types/)
and [instrumentation best practices](http://prometheus.io/docs/practices/instrumentation/#counter-vs.-gauge,-summary-vs.-histogram)
on how to use them.
### Counter
Counters go up, and reset when the process restarts.
```python
from prometheus_client import Counter
c = Counter('my_failures_total', 'Description of counter')
c.inc() # Increment by 1
c.inc(1.6) # Increment by given value
```
There are utilities to count exceptions raised:
```python
@c.count_exceptions()
def f():
pass
with c.count_exceptions():
pass
# Count only one type of exception
with c.count_exceptions(ValueError):
pass
```
### Gauge
Gauges can go up and down.
```python
from prometheus_client import Gauge
g = Gauge('my_inprogress_requests', 'Description of gauge')
g.inc() # Increment by 1
g.dec(10) # Decrement by given value
g.set(4.2) # Set to a given value
```
There are utilities for common use cases:
```python
g.set_to_current_time() # Set to current unixtime
# Increment when entered, decrement when exited.
@g.track_inprogress()
def f():
pass
with g.track_inprogress():
pass
```
A Gauge can also take its value from a callback:
```python
d = Gauge('data_objects', 'Number of objects')
my_dict = {}
d.set_function(lambda: len(my_dict))
```
### Summary
Summaries track the size and number of events.
```python
from prometheus_client import Summary
s = Summary('request_latency_seconds', 'Description of summary')
s.observe(4.7) # Observe 4.7 (seconds in this case)
```
There are utilities for timing code:
```python
@s.time()
def f():
pass
with s.time():
pass
```
The Python client doesn't store or expose quantile information at this time.
### Histogram
Histograms track the size and number of events in buckets.
This allows for aggregatable calculation of quantiles.
```python
from prometheus_client import Histogram
h = Histogram('request_latency_seconds', 'Description of histogram')
h.observe(4.7) # Observe 4.7 (seconds in this case)
```
The default buckets are intended to cover a typical web/rpc request from milliseconds to seconds.
They can be overridden by passing the `buckets` keyword argument to `Histogram`.
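For example, a minimal sketch with custom buckets (the particular bounds and metric name are illustrative):
```python
from prometheus_client import Histogram
h = Histogram('backend_latency_seconds', 'Description of histogram',
              buckets=(.1, .5, 1.0, 5.0, float('inf')))
```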
There are utilities for timing code:
```python
@h.time()
def f():
pass
with h.time():
pass
```
### Labels
All metrics can have labels, allowing grouping of related time series.
See the best practices on [naming](http://prometheus.io/docs/practices/naming/)
and [labels](http://prometheus.io/docs/practices/instrumentation/#use-labels).
Taking a counter as an example:
```python
from prometheus_client import Counter
c = Counter('my_requests_total', 'HTTP Failures', ['method', 'endpoint'])
c.labels('get', '/').inc()
c.labels('post', '/submit').inc()
```
Labels can also be passed as keyword-arguments:
```python
from prometheus_client import Counter
c = Counter('my_requests_total', 'HTTP Failures', ['method', 'endpoint'])
c.labels(method='get', endpoint='/').inc()
c.labels(method='post', endpoint='/submit').inc()
```
### Process Collector
The Python client automatically exports metrics about process CPU usage, RAM,
file descriptors and start time. These all have the prefix `process`, and
are only currently available on Linux.
The `namespace` and `pid` constructor arguments allow for exporting metrics about
other processes, for example:
```python
from prometheus_client import ProcessCollector
ProcessCollector(namespace='mydaemon', pid=lambda: open('/var/run/daemon.pid').read())
```
### Platform Collector
The client also automatically exports some metadata about Python. If using Jython,
metadata about the JVM in use is also included. This information is available as
labels on the `python_info` metric. The value of the metric is 1, since it is the
labels that carry information.
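For example, a minimal sketch that reads the `python_info` samples from the default registry by iterating `REGISTRY.collect()` directly (the output will vary by interpreter):
```python
from prometheus_client import REGISTRY
# python_info carries its information in labels; the sample value is 1.
for metric in REGISTRY.collect():
    if metric.name == 'python_info':
        for sample in metric.samples:
            print(sample)
```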
## Exporting
There are several options for exporting metrics.
### HTTP
Metrics are usually exposed over HTTP, to be read by the Prometheus server.
The easiest way to do this is via `start_http_server`, which will start an HTTP
server in a daemon thread on the given port:
```python
from prometheus_client import start_http_server
start_http_server(8000)
```
Visit [http://localhost:8000/](http://localhost:8000/) to view the metrics.
To add Prometheus exposition to an existing HTTP server, see the `MetricsHandler` class,
a `BaseHTTPRequestHandler` that serves the metrics. It also serves as a simple example of how
to write a custom endpoint.
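For instance, a minimal sketch serving metrics with the standard library's HTTP server (the Python 2/3 import fallback is an assumption about your environment):
```python
from prometheus_client import MetricsHandler
try:
    from http.server import HTTPServer  # Python 3
except ImportError:
    from BaseHTTPServer import HTTPServer  # Python 2

# MetricsHandler is a BaseHTTPRequestHandler, so it plugs straight in.
httpd = HTTPServer(('', 8000), MetricsHandler)
httpd.serve_forever()
```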
#### Twisted
To use Prometheus with [Twisted](https://twistedmatrix.com/), there is `MetricsResource`, which exposes metrics as a Twisted resource.
```python
from prometheus_client.twisted import MetricsResource
from twisted.web.server import Site
from twisted.web.resource import Resource
from twisted.internet import reactor
root = Resource()
root.putChild(b'metrics', MetricsResource())
factory = Site(root)
reactor.listenTCP(8000, factory)
reactor.run()
```
#### WSGI
To use Prometheus with [WSGI](http://wsgi.readthedocs.org/en/latest/), there is
`make_wsgi_app`, which creates a WSGI application.
```python
from prometheus_client import make_wsgi_app
from wsgiref.simple_server import make_server
app = make_wsgi_app()
httpd = make_server('', 8000, app)
httpd.serve_forever()
```
Such an application can be useful when integrating Prometheus metrics with WSGI
apps.
The method `start_wsgi_server` can be used to serve the metrics through the
WSGI reference implementation in a new thread.
```python
from prometheus_client import start_wsgi_server
start_wsgi_server(8000)
```
### Node exporter textfile collector
The [textfile collector](https://github.com/prometheus/node_exporter#textfile-collector)
allows machine-level statistics to be exported out via the Node exporter.
This is useful for monitoring cronjobs, or for writing cronjobs that expose metrics
about the machine which the Node exporter does not support, or which would not make
sense to collect at every scrape (for example, anything involving subprocesses).
```python
from prometheus_client import CollectorRegistry, Gauge, write_to_textfile
registry = CollectorRegistry()
g = Gauge('raid_status', '1 if raid array is okay', registry=registry)
g.set(1)
write_to_textfile('/configured/textfile/path/raid.prom', registry)
```
A separate registry is used, as the default registry may contain other metrics
such as those from the Process Collector.
## Exporting to a Pushgateway
The [Pushgateway](https://github.com/prometheus/pushgateway)
allows ephemeral and batch jobs to expose their metrics to Prometheus.
```python
from prometheus_client import CollectorRegistry, Gauge, push_to_gateway
registry = CollectorRegistry()
g = Gauge('job_last_success_unixtime', 'Last time a batch job successfully finished', registry=registry)
g.set_to_current_time()
push_to_gateway('localhost:9091', job='batchA', registry=registry)
```
A separate registry is used, as the default registry may contain other metrics
such as those from the Process Collector.
Pushgateway functions take a grouping key. `push_to_gateway` replaces metrics
with the same grouping key, `pushadd_to_gateway` only replaces metrics with the
same name and grouping key, and `delete_from_gateway` deletes metrics with the
given job and grouping key. See the
[Pushgateway documentation](https://github.com/prometheus/pushgateway/blob/master/README.md)
for more information.
`instance_ip_grouping_key` returns a grouping key with the instance label set
to the host's IP address.
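For example, a sketch that reuses the `registry` from the example above, assuming the push functions accept the grouping key via a `grouping_key` keyword argument:
```python
from prometheus_client import instance_ip_grouping_key, push_to_gateway
push_to_gateway('localhost:9091', job='batchA', registry=registry,
                grouping_key=instance_ip_grouping_key())
```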
### Handlers for authentication
If the push gateway you are connecting to is protected with HTTP Basic Auth,
you can use a special handler to set the Authorization header.
```python
from prometheus_client import CollectorRegistry, Gauge, push_to_gateway
from prometheus_client.exposition import basic_auth_handler
def my_auth_handler(url, method, timeout, headers, data):
username = 'foobar'
password = 'secret123'
return basic_auth_handler(url, method, timeout, headers, data, username, password)
registry = CollectorRegistry()
g = Gauge('job_last_success_unixtime', 'Last time a batch job successfully finished', registry=registry)
g.set_to_current_time()
push_to_gateway('localhost:9091', job='batchA', registry=registry, handler=my_auth_handler)
```
## Bridges
It is also possible to expose metrics to systems other than Prometheus.
This allows you to take advantage of Prometheus instrumentation even
if you are not yet ready to fully transition to Prometheus.
### Graphite
Metrics are pushed over TCP in the Graphite plaintext format.
```python
from prometheus_client.bridge.graphite import GraphiteBridge
gb = GraphiteBridge(('graphite.your.org', 2003))
# Push once.
gb.push()
# Push every 10 seconds in a daemon thread.
gb.start(10.0)
```
## Custom Collectors
Sometimes it is not possible to directly instrument code, as it is not
in your control. In such cases you can proxy metrics from the other system
by writing a custom collector, for example:
```python
from prometheus_client.core import GaugeMetricFamily, CounterMetricFamily, REGISTRY
class CustomCollector(object):
def collect(self):
yield GaugeMetricFamily('my_gauge', 'Help text', value=7)
c = CounterMetricFamily('my_counter_total', 'Help text', labels=['foo'])
c.add_metric(['bar'], 1.7)
c.add_metric(['baz'], 3.8)
yield c
REGISTRY.register(CustomCollector())
```
`SummaryMetricFamily` and `HistogramMetricFamily` work similarly.
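For example, a sketch of a summary in a custom collector, based on the `SummaryMetricFamily` constructor and `add_metric` signature in `core.py` (names are illustrative):
```python
from prometheus_client.core import SummaryMetricFamily
s = SummaryMetricFamily('my_summary', 'Help text', labels=['foo'])
s.add_metric(['bar'], count_value=5, sum_value=1.2)
```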
A collector may implement a `describe` method which returns metrics in the same
format as `collect` (though you don't have to include the samples). This is
used to predetermine the names of time series a `CollectorRegistry` exposes and
thus to detect collisions and duplicate registrations.
Usually custom collectors do not have to implement `describe`. If `describe` is
not implemented and the `CollectorRegistry` was created with `auto_describe=True`
(which is the case for the default registry) then `collect` will be called at
registration time instead of `describe`. If this could cause problems, either
implement a proper `describe`, or if that's not practical have `describe`
return an empty list.
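For example, a minimal sketch of a collector that opts out (the class name is illustrative):
```python
class ExpensiveCollector(object):
    def describe(self):
        # An empty list prevents an auto-describing registry from
        # calling collect() at registration time.
        return []
    def collect(self):
        return []  # expensive work to produce metrics would go here
```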
## Multiprocess Mode (Gunicorn)
Prometheus client libraries presume a threaded model, where metrics are shared
across workers. This doesn't work so well for languages such as Python, where
it's common to have processes rather than threads handle large workloads.
To handle this, the client library can be put in multiprocess mode.
This comes with a number of limitations:
- Registries cannot be used as normal; all instantiated metrics are exported
- Custom collectors do not work (e.g. cpu and memory metrics)
- The Pushgateway cannot be used
- Gauges cannot use the `pid` label
- Gunicorn's `preload_app` feature and equivalents are not supported
There are several steps to getting this working:
**One**: Gunicorn deployment
The `prometheus_multiproc_dir` environment variable must be set to a directory
that the client library can use for metrics. This directory must be wiped
between Gunicorn runs (before startup is recommended).
Put the following in the config file:
```python
from prometheus_client import multiprocess
def child_exit(server, worker):
multiprocess.mark_process_dead(worker.pid)
```
**Two**: Inside the application
```python
from prometheus_client import multiprocess
from prometheus_client import generate_latest, CollectorRegistry, CONTENT_TYPE_LATEST, Gauge
# Example gauge.
IN_PROGRESS = Gauge("inprogress_requests", "help", multiprocess_mode='livesum')
# Expose metrics.
@IN_PROGRESS.track_inprogress()
def app(environ, start_response):
registry = CollectorRegistry()
multiprocess.MultiProcessCollector(registry)
data = generate_latest(registry)
status = '200 OK'
response_headers = [
('Content-type', CONTENT_TYPE_LATEST),
('Content-Length', str(len(data)))
]
start_response(status, response_headers)
return iter([data])
```
**Three**: Instrumentation
Counters, Summaries and Histograms work as normal.
Gauges have several modes they can run in, which can be selected with the
`multiprocess_mode` parameter.
- 'all': Default. Return a timeseries per process alive or dead.
- 'liveall': Return a timeseries per process that is still alive.
- 'livesum': Return a single timeseries that is the sum of the values of alive processes.
- 'max': Return a single timeseries that is the maximum of the values of all processes, alive or dead.
- 'min': Return a single timeseries that is the minimum of the values of all processes, alive or dead.
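For example, a sketch selecting the 'max' mode (the metric name is illustrative):
```python
from prometheus_client import Gauge
g = Gauge('peak_connections', 'Description of gauge', multiprocess_mode='max')
```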
## Parser
The Python client supports parsing the Prometheus text format.
This is intended for advanced use cases where you have servers
exposing Prometheus metrics and need to get them into some other
system.
```python
from prometheus_client.parser import text_string_to_metric_families
for family in text_string_to_metric_families(u"my_gauge 1.0\n"):
for sample in family.samples:
print("Name: {0} Labels: {1} Value: {2}".format(*sample))
```
#!/usr/bin/python
from . import core
from . import exposition
from . import process_collector
from . import platform_collector
__all__ = ['Counter', 'Gauge', 'Summary', 'Histogram']
# http://stackoverflow.com/questions/19913653/no-unicode-in-all-for-a-packages-init
if str is bytes:  # Python 2 only; bytes names in __all__ would break 'import *' on Python 3
    __all__ = [n.encode('ascii') for n in __all__]
CollectorRegistry = core.CollectorRegistry
REGISTRY = core.REGISTRY
Metric = core.Metric
Counter = core.Counter
Gauge = core.Gauge
Summary = core.Summary
Histogram = core.Histogram
CONTENT_TYPE_LATEST = exposition.CONTENT_TYPE_LATEST
generate_latest = exposition.generate_latest
MetricsHandler = exposition.MetricsHandler
make_wsgi_app = exposition.make_wsgi_app
start_http_server = exposition.start_http_server
start_wsgi_server = exposition.start_wsgi_server
write_to_textfile = exposition.write_to_textfile
push_to_gateway = exposition.push_to_gateway
pushadd_to_gateway = exposition.pushadd_to_gateway
delete_from_gateway = exposition.delete_from_gateway
instance_ip_grouping_key = exposition.instance_ip_grouping_key
ProcessCollector = process_collector.ProcessCollector
PROCESS_COLLECTOR = process_collector.PROCESS_COLLECTOR
PlatformCollector = platform_collector.PlatformCollector
PLATFORM_COLLECTOR = platform_collector.PLATFORM_COLLECTOR
if __name__ == '__main__':
c = Counter('cc', 'A counter')
c.inc()
g = Gauge('gg', 'A gauge')
g.set(17)
s = Summary('ss', 'A summary', ['a', 'b'])
s.labels('c', 'd').observe(17)
h = Histogram('hh', 'A histogram')
h.observe(.6)
start_http_server(8000)
import time
while True:
time.sleep(1)
#!/usr/bin/python
from __future__ import unicode_literals
import logging
import re
import socket
import time
import threading
from timeit import default_timer
from .. import core
# Roughly, have to keep to what works as a file name.
# We also remove periods, so labels can be distinguished.
_INVALID_GRAPHITE_CHARS = re.compile(r"[^a-zA-Z0-9_-]")
def _sanitize(s):
return _INVALID_GRAPHITE_CHARS.sub('_', s)
class _RegularPush(threading.Thread):
def __init__(self, pusher, interval, prefix):
super(_RegularPush, self).__init__()
self._pusher = pusher
self._interval = interval
self._prefix = prefix
def run(self):
wait_until = default_timer()
while True:
while True:
now = default_timer()
if now >= wait_until:
# May need to skip some pushes.
while wait_until < now:
wait_until += self._interval
break
# time.sleep can return early.
time.sleep(wait_until - now)
try:
self._pusher.push(prefix=self._prefix)
except IOError:
logging.exception("Push failed")
class GraphiteBridge(object):
def __init__(self, address, registry=core.REGISTRY, timeout_seconds=30, _timer=time.time):
self._address = address
self._registry = registry
self._timeout = timeout_seconds
self._timer = _timer
def push(self, prefix=''):
now = int(self._timer())
output = []
prefixstr = ''
if prefix:
prefixstr = prefix + '.'
for metric in self._registry.collect():
for name, labels, value in metric.samples:
if labels:
labelstr = '.' + '.'.join(
['{0}.{1}'.format(
_sanitize(k), _sanitize(v))
for k, v in sorted(labels.items())])
else:
labelstr = ''
# Sample values are (value, timestamp) tuples in this client;
# send the value part, timestamped with the push time.
output.append('{0}{1}{2} {3} {4}\n'.format(
prefixstr, _sanitize(name), labelstr, float(value[0]), now))
conn = socket.create_connection(self._address, self._timeout)
conn.sendall(''.join(output).encode('ascii'))
conn.close()
def start(self, interval=60.0, prefix=''):
t = _RegularPush(self, interval, prefix)
t.daemon = True
t.start()
#!/usr/bin/python
from __future__ import unicode_literals
import copy
import json
import math
import mmap
import os
import re
import struct
import time
import types
try:
from BaseHTTPServer import BaseHTTPRequestHandler
except ImportError:
# Python 3
unicode = str
from threading import Lock
from timeit import default_timer
from .decorator import decorate
_METRIC_NAME_RE = re.compile(r'^[a-zA-Z_:][a-zA-Z0-9_:]*$')
_METRIC_LABEL_NAME_RE = re.compile(r'^[a-zA-Z_][a-zA-Z0-9_]*$')
_RESERVED_METRIC_LABEL_NAME_RE = re.compile(r'^__.*$')
_INF = float("inf")
_MINUS_INF = float("-inf")
_INITIAL_MMAP_SIZE = 1024*1024
class CollectorRegistry(object):
'''Metric collector registry.
Collectors must have a no-argument method 'collect' that returns a list of
Metric objects. The returned metrics should be consistent with the Prometheus
exposition formats.
'''
def __init__(self, auto_describe=False):
self._collector_to_names = {}
self._names_to_collectors = {}
self._auto_describe = auto_describe
self._lock = Lock()
def register(self, collector):
'''Add a collector to the registry.'''
with self._lock:
names = self._get_names(collector)
for name in names:
if name in self._names_to_collectors:
raise ValueError('Timeseries already present '
'in CollectorRegistry: ' + name)
for name in names:
self._names_to_collectors[name] = collector
self._collector_to_names[collector] = names
def unregister(self, collector):
'''Remove a collector from the registry.'''
with self._lock:
for name in self._collector_to_names[collector]:
del self._names_to_collectors[name]
del self._collector_to_names[collector]
def _get_names(self, collector):
'''Get names of timeseries the collector produces.'''
desc_func = None
# If there's a describe function, use it.
try:
desc_func = collector.describe
except AttributeError:
pass
# Otherwise, if auto describe is enabled use the collect function.
if not desc_func and self._auto_describe:
desc_func = collector.collect
if not desc_func:
return []
result = []
type_suffixes = {
'summary': ['', '_sum', '_count'],
'histogram': ['_bucket', '_sum', '_count']
}
for metric in desc_func():
for suffix in type_suffixes.get(metric.type, ['']):
result.append(metric.name + suffix)
return result
def collect(self):
'''Yields metrics from the collectors in the registry.'''
collectors = None
with self._lock:
collectors = copy.copy(self._collector_to_names)
for collector in collectors:
for metric in collector.collect():
yield metric
def restricted_registry(self, names):
'''Returns object that only collects some metrics.
Returns an object which upon collect() will return
only samples with the given names.
Intended usage is:
generate_latest(REGISTRY.restricted_registry(['a_timeseries']))
Experimental.'''
names = set(names)
collectors = set()
with self._lock:
for name in names:
if name in self._names_to_collectors:
collectors.add(self._names_to_collectors[name])
metrics = []
for collector in collectors:
for metric in collector.collect():
samples = [s for s in metric.samples if s[0] in names]
if samples:
m = Metric(metric.name, metric.documentation, metric.type)
m.samples = samples
metrics.append(m)
class RestrictedRegistry(object):
def collect(self):
return metrics
return RestrictedRegistry()
def get_sample_value(self, name, labels=None):
'''Returns the value: a (value, timestamp) tuple if a
timestamp is present, just the bare value if there is no
timestamp, or None if not found.
This is inefficient, and intended only for use in unittests.
'''
if labels is None:
labels = {}
for metric in self.collect():
for n, l, value in metric.samples:
if n == name and l == labels:
if value[1] is not None:
return value
else:
return value[0]
return None
REGISTRY = CollectorRegistry(auto_describe=True)
'''The default registry.'''
_METRIC_TYPES = ('counter', 'gauge', 'summary', 'histogram', 'untyped')
class Metric(object):
'''A single metric family and its samples.
This is intended only for internal use by the instrumentation client.
Custom collectors should use GaugeMetricFamily, CounterMetricFamily
and SummaryMetricFamily instead.
'''
def __init__(self, name, documentation, typ):
self.name = name
self.documentation = documentation
if typ not in _METRIC_TYPES:
raise ValueError('Invalid metric type: ' + typ)
self.type = typ
self.samples = []
def add_sample(self, name, labels, value):
'''Add a sample to the metric.
Internal-only, do not use.'''
self.samples.append((name, labels, value))
def __eq__(self, other):
return (isinstance(other, Metric)
and self.name == other.name
and self.documentation == other.documentation
and self.type == other.type
and self.samples == other.samples)
class CounterMetricFamily(Metric):
'''A single counter and its samples.
For use by custom collectors.
'''
def __init__(self, name, documentation, value=None, labels=None, timestamp=None):
Metric.__init__(self, name, documentation, 'counter')
if labels is not None and value is not None:
raise ValueError('Can only specify at most one of value and labels.')
if labels is None:
labels = []
self._labelnames = labels
if value is not None:
self.add_metric([], value, timestamp)
def add_metric(self, labels, value, timestamp=None):
'''Add a metric to the metric family.
Args:
labels: A list of label values
value: The value of the metric.
'''
self.samples.append((self.name, dict(zip(self._labelnames, labels)), (value, timestamp)))
class GaugeMetricFamily(Metric):
'''A single gauge and its samples.
For use by custom collectors.
'''
def __init__(self, name, documentation, value=None, labels=None, timestamp=None):
Metric.__init__(self, name, documentation, 'gauge')
if labels is not None and value is not None:
raise ValueError('Can only specify at most one of value and labels.')
if labels is None:
labels = []
self._labelnames = labels
if value is not None:
self.add_metric([], value, timestamp)
def add_metric(self, labels, value, timestamp=None):
'''Add a metric to the metric family.
Args:
labels: A list of label values
value: A float
'''
self.samples.append((self.name, dict(zip(self._labelnames, labels)), (value, timestamp)))
class SummaryMetricFamily(Metric):
'''A single summary and its samples.
For use by custom collectors.
'''
def __init__(self, name, documentation, count_value=None, sum_value=None, labels=None, timestamp=None):
Metric.__init__(self, name, documentation, 'summary')
if (sum_value is None) != (count_value is None):
raise ValueError('count_value and sum_value must be provided together.')
if labels is not None and count_value is not None:
raise ValueError('Can only specify at most one of value and labels.')
if labels is None:
labels = []
self._labelnames = labels
if count_value is not None:
self.add_metric([], count_value, sum_value, timestamp)
def add_metric(self, labels, count_value, sum_value, timestamp=None):
'''Add a metric to the metric family.
Args:
labels: A list of label values
count_value: The count value of the metric.
sum_value: The sum value of the metric.
'''
self.samples.append((self.name + '_count', dict(zip(self._labelnames, labels)), (count_value, timestamp)))
self.samples.append((self.name + '_sum', dict(zip(self._labelnames, labels)), (sum_value, timestamp)))
class HistogramMetricFamily(Metric):
'''A single histogram and its samples.
For use by custom collectors.
'''
def __init__(self, name, documentation, buckets=None, sum_value=None, labels=None, timestamp=None):
Metric.__init__(self, name, documentation, 'histogram')
if (sum_value is None) != (buckets is None):
raise ValueError('buckets and sum_value must be provided together.')
if labels is not None and buckets is not None:
raise ValueError('Can only specify at most one of buckets and labels.')
if labels is None:
labels = []
self._labelnames = labels
if buckets is not None:
self.add_metric([], buckets, sum_value, timestamp)
def add_metric(self, labels, buckets, sum_value, timestamp=None):
'''Add a metric to the metric family.
Args:
labels: A list of label values
buckets: A list of pairs of bucket names and values.
The buckets must be sorted, and +Inf present.
sum_value: The sum value of the metric.
'''
for bucket, value in buckets:
self.samples.append((self.name + '_bucket', dict(list(zip(self._labelnames, labels)) + [('le', bucket)]), (value, timestamp)))
# +Inf is last and provides the count value.
self.samples.append((self.name + '_count', dict(zip(self._labelnames, labels)), (buckets[-1][1], timestamp)))
self.samples.append((self.name + '_sum', dict(zip(self._labelnames, labels)), (sum_value, timestamp)))
class _MutexValue(object):
'''A [float value, longlong timestamp] list protected by a mutex.'''
_multiprocess = False
def __init__(self, typ, metric_name, name, labelnames, labelvalues, **kwargs):
self._value = [0.0, None]
self._lock = Lock()
def inc(self, amount, timestamp=None):
with self._lock:
self._value[0] += amount
self._value[1] = timestamp
def set(self, value, timestamp=None):
with self._lock:
self._value[0] = value
self._value[1] = timestamp
def get(self):
with self._lock:
return tuple(self._value)
class _MmapedDict(object):
"""A dict of doubles, backed by an mmapped file.
The file starts with a 4 byte int, indicating how much of it is used.
Then 4 bytes of padding.
There's then a number of entries, each consisting of a 4 byte int which is the
size of the next field, a utf-8 encoded string key, padding to an 8 byte
alignment, then an 8 byte float which is the value, and an 8 byte
int which is the timestamp (in milliseconds).
Not thread safe.
"""
def __init__(self, filename):
self._f = open(filename, 'a+b')
if os.fstat(self._f.fileno()).st_size == 0:
self._f.truncate(_INITIAL_MMAP_SIZE)
self._capacity = os.fstat(self._f.fileno()).st_size
self._m = mmap.mmap(self._f.fileno(), self._capacity)
self._positions = {}
self._used = struct.unpack_from(b'i', self._m, 0)[0]
if self._used == 0:
self._used = 8
struct.pack_into(b'i', self._m, 0, self._used)
else:
for key, _, pos in self._read_all_values():
self._positions[key] = pos
def _init_value(self, key):
"""Initialize a value. Lock must be held by caller."""
encoded = key.encode('utf-8')
# Pad to be 8-byte aligned.
padded = encoded + (b' ' * (8 - (len(encoded) + 4) % 8))
value = struct.pack('i{0}sdq'.format(len(padded)).encode(), len(encoded), padded, 0.0, 0)
while self._used + len(value) > self._capacity:
self._capacity *= 2
self._f.truncate(self._capacity)
self._m = mmap.mmap(self._f.fileno(), self._capacity)
self._m[self._used:self._used + len(value)] = value
# Update how much space we've used.
self._used += len(value)
struct.pack_into(b'i', self._m, 0, self._used)
self._positions[key] = self._used - 16 # d + q
def _read_all_values(self):
"""Yield (key, (value, timestamp), pos). No locking is performed."""
pos = 8
while pos < self._used:
encoded_len = struct.unpack_from(b'i', self._m, pos)[0]
pos += 4
encoded = struct.unpack_from('{0}s'.format(encoded_len).encode(), self._m, pos)[0]
padded_len = encoded_len + (8 - (encoded_len + 4) % 8)
pos += padded_len
value = struct.unpack_from(b'dq', self._m, pos)
if value[1] == 0:
value = (value[0], None)
yield encoded.decode('utf-8'), value, pos
pos += 16 # d + q
def read_all_values(self):
"""Yield (key, value). No locking is performed."""
for k, v, _ in self._read_all_values():
yield k, v
def read_value(self, key):
if key not in self._positions:
self._init_value(key)
pos = self._positions[key]
# XXX Does this need locking?
value = struct.unpack_from(b'dq', self._m, pos)
if value[1] == 0:
value = (value[0], None)
return value
def write_value(self, key, value):
if key not in self._positions:
self._init_value(key)
pos = self._positions[key]
# XXX Locking?
if value[1] is None:
value = (value[0], 0)
struct.pack_into(b'dq', self._m, pos, *value)
def close(self):
if self._f:
self._f.close()
self._f = None
def _MultiProcessValue(__pid=os.getpid()):
pid = __pid
files = {}
# Use a single global lock when in multi-processing mode
# as we presume this means there is no threading going on.
# This avoids the need to also have mutexes in _MmapedDict.
lock = Lock()
class _MmapedValue(object):
'''A (float, longlong) tuple protected by a mutex backed by a per-process mmaped file.'''
_multiprocess = True
def __init__(self, typ, metric_name, name, labelnames, labelvalues, multiprocess_mode='', **kwargs):
if typ == 'gauge':
file_prefix = typ + '_' + multiprocess_mode
else:
file_prefix = typ
with lock:
if file_prefix not in files:
filename = os.path.join(
os.environ['prometheus_multiproc_dir'], '{0}_{1}.db'.format(file_prefix, pid))
files[file_prefix] = _MmapedDict(filename)
self._file = files[file_prefix]
self._key = json.dumps((metric_name, name, labelnames, labelvalues))
self._value = list(self._file.read_value(self._key))
def inc(self, amount, timestamp=None):
with lock:
self._value[0] += amount
self._value[1] = timestamp
self._file.write_value(self._key, self._value)
def set(self, value, timestamp=None):
with lock:
self._value[0] = value
self._value[1] = timestamp
self._file.write_value(self._key, self._value)
def get(self):
with lock:
return tuple(self._value)
return _MmapedValue
# Should we enable multi-process mode?
# This needs to be chosen before the first metric is constructed,
# and as that may be in some arbitrary library the user/admin has
# no control over, we use an environment variable.
if 'prometheus_multiproc_dir' in os.environ:
_ValueClass = _MultiProcessValue()
else:
_ValueClass = _MutexValue
class _LabelWrapper(object):
'''Handles labels for the wrapped metric.'''
def __init__(self, wrappedClass, name, labelnames, **kwargs):
self._wrappedClass = wrappedClass
self._type = wrappedClass._type
self._name = name
self._labelnames = labelnames
self._kwargs = kwargs
self._lock = Lock()
self._metrics = {}
for l in labelnames:
if l.startswith('__'):
raise ValueError('Invalid label metric name: ' + l)
def labels(self, *labelvalues, **labelkwargs):
'''Return the child for the given labelset.
All metrics can have labels, allowing grouping of related time series.
Taking a counter as an example:
from prometheus_client import Counter
c = Counter('my_requests_total', 'HTTP Failures', ['method', 'endpoint'])
c.labels('get', '/').inc()
c.labels('post', '/submit').inc()
Labels can also be provided as keyword arguments:
from prometheus_client import Counter
c = Counter('my_requests_total', 'HTTP Failures', ['method', 'endpoint'])
c.labels(method='get', endpoint='/').inc()
c.labels(method='post', endpoint='/submit').inc()
See the best practices on [naming](http://prometheus.io/docs/practices/naming/)
and [labels](http://prometheus.io/docs/practices/instrumentation/#use-labels).
'''
if labelvalues and labelkwargs:
raise ValueError("Can't pass both *args and **kwargs")
if labelkwargs:
if sorted(labelkwargs) != sorted(self._labelnames):
raise ValueError('Incorrect label names')
labelvalues = tuple([unicode(labelkwargs[l]) for l in self._labelnames])
else:
if len(labelvalues) != len(self._labelnames):
raise ValueError('Incorrect label count')
labelvalues = tuple([unicode(l) for l in labelvalues])
with self._lock:
if labelvalues not in self._metrics:
self._metrics[labelvalues] = self._wrappedClass(self._name, self._labelnames, labelvalues, **self._kwargs)
return self._metrics[labelvalues]
def remove(self, *labelvalues):
'''Remove the given labelset from the metric.'''
if len(labelvalues) != len(self._labelnames):
raise ValueError('Incorrect label count')
labelvalues = tuple([unicode(l) for l in labelvalues])
with self._lock:
del self._metrics[labelvalues]
def _samples(self):
with self._lock:
metrics = self._metrics.copy()
for labels, metric in metrics.items():
series_labels = list(dict(zip(self._labelnames, labels)).items())
for suffix, sample_labels, value in metric._samples():
yield (suffix, dict(series_labels + list(sample_labels.items())), value)
def _MetricWrapper(cls):
'''Provides common functionality for metrics.'''
def init(name, documentation, labelnames=(), namespace='', subsystem='', registry=REGISTRY, **kwargs):
full_name = ''
if namespace:
full_name += namespace + '_'
if subsystem:
full_name += subsystem + '_'
full_name += name
if labelnames:
labelnames = tuple(labelnames)
for l in labelnames:
if not _METRIC_LABEL_NAME_RE.match(l):
raise ValueError('Invalid label metric name: ' + l)
if _RESERVED_METRIC_LABEL_NAME_RE.match(l):
raise ValueError('Reserved label metric name: ' + l)
if l in cls._reserved_labelnames:
raise ValueError('Reserved label metric name: ' + l)
collector = _LabelWrapper(cls, full_name, labelnames, **kwargs)
else:
collector = cls(full_name, (), (), **kwargs)
if not _METRIC_NAME_RE.match(full_name):
raise ValueError('Invalid metric name: ' + full_name)
def describe():
return [Metric(full_name, documentation, cls._type)]
collector.describe = describe
def collect():
metric = Metric(full_name, documentation, cls._type)
for suffix, labels, value in collector._samples():
metric.add_sample(full_name + suffix, labels, value)
return [metric]
collector.collect = collect
if registry:
registry.register(collector)
return collector
init.__wrapped__ = cls
return init
@_MetricWrapper
class Counter(object):
'''A Counter tracks counts of events or running totals.
Example use cases for Counters:
- Number of requests processed
- Number of items that were inserted into a queue
- Total amount of data that a system has processed
Counters can only go up (and be reset when the process restarts). If the value
you are tracking can go down, you should use a Gauge instead.
An example for a Counter:
from prometheus_client import Counter
c = Counter('my_failures_total', 'Description of counter')
c.inc() # Increment by 1
c.inc(1.6) # Increment by given value
There are utilities to count exceptions raised:
@c.count_exceptions()
def f():
pass
with c.count_exceptions():
pass
# Count only one type of exception
with c.count_exceptions(ValueError):
pass
'''
_type = 'counter'
_reserved_labelnames = []
def __init__(self, name, labelnames, labelvalues):
self._value = _ValueClass(self._type, name, name, labelnames, labelvalues)
def inc(self, amount=1, timestamp=None):
'''Increment counter by the given amount.'''
if amount < 0:
raise ValueError('Counters can only be incremented by non-negative amounts.')
self._value.inc(amount, timestamp)
def count_exceptions(self, exception=Exception):
'''Count exceptions in a block of code or function.
Can be used as a function decorator or context manager.
Increments the counter when an exception of the given
type is raised up out of the code.
'''
return _ExceptionCounter(self, exception)
def _samples(self):
return (('', {}, self._value.get()), )
@_MetricWrapper
class Gauge(object):
'''Gauge metric, to report instantaneous values.
Examples of Gauges include:
- Inprogress requests
- Number of items in a queue
- Free memory
- Total memory
- Temperature
Gauges can go both up and down.
from prometheus_client import Gauge
g = Gauge('my_inprogress_requests', 'Description of gauge')
g.inc() # Increment by 1
g.dec(10) # Decrement by given value
g.set(4.2) # Set to a given value
There are utilities for common use cases:
g.set_to_current_time() # Set to current unixtime
# Increment when entered, decrement when exited.
@g.track_inprogress()
def f():
pass
with g.track_inprogress():
pass
A Gauge can also take its value from a callback:
d = Gauge('data_objects', 'Number of objects')
my_dict = {}
d.set_function(lambda: len(my_dict))
'''
_type = 'gauge'
_reserved_labelnames = []
def __init__(self, name, labelnames, labelvalues, multiprocess_mode='all'):
if (_ValueClass._multiprocess
and multiprocess_mode not in ['min', 'max', 'livesum', 'liveall', 'all']):
raise ValueError('Invalid multiprocess mode: ' + multiprocess_mode)
self._value = _ValueClass(self._type, name, name, labelnames,
labelvalues, multiprocess_mode=multiprocess_mode)
def inc(self, amount=1, timestamp=None):
'''Increment gauge by the given amount.'''
self._value.inc(amount, timestamp)
def dec(self, amount=1, timestamp=None):
'''Decrement gauge by the given amount.'''
self._value.inc(-amount, timestamp)
def set(self, value, timestamp=None):
'''Set gauge to the given value.'''
self._value.set(float(value), timestamp)
def set_to_current_time(self):
'''Set gauge to the current unixtime.'''
self.set(time.time())
def track_inprogress(self):
'''Track inprogress blocks of code or functions.
Can be used as a function decorator or context manager.
Increments the gauge when the code is entered,
and decrements when it is exited.
'''
return _InprogressTracker(self)
def time(self):
'''Time a block of code or function, and set the duration in seconds.
Can be used as a function decorator or context manager.
'''
return _GaugeTimer(self)
def set_function(self, f):
'''Call the provided function to return the Gauge value.
The function must return a (float, longint) tuple, and may be called from
multiple threads. All other methods of the Gauge become NOOPs.
'''
def samples(self):
val = f()
return (('', {}, (float(val[0]), val[1])), )
self._samples = types.MethodType(samples, self)
def _samples(self):
return (('', {}, self._value.get()), )
@_MetricWrapper
class Summary(object):
'''A Summary tracks the size and number of events.
Example use cases for Summaries:
- Response latency
- Request size
Example for a Summary:
from prometheus_client import Summary
s = Summary('request_size_bytes', 'Request size (bytes)')
s.observe(512) # Observe 512 (bytes)
Example for a Summary using time:
from prometheus_client import Summary
REQUEST_TIME = Summary('response_latency_seconds', 'Response latency (seconds)')
@REQUEST_TIME.time()
def create_response(request):
"""A dummy function"""
time.sleep(1)
Example for using the same Summary object as a context manager:
with REQUEST_TIME.time():
pass # Logic to be timed
'''
_type = 'summary'
_reserved_labelnames = ['quantile']
def __init__(self, name, labelnames, labelvalues):
self._count = _ValueClass(self._type, name, name + '_count', labelnames, labelvalues)
self._sum = _ValueClass(self._type, name, name + '_sum', labelnames, labelvalues)
def observe(self, amount, timestamp=None):
'''Observe the given amount.'''
self._count.inc(1, timestamp)
self._sum.inc(amount, timestamp)
def time(self):
'''Time a block of code or function, and observe the duration in seconds.
Can be used as a function decorator or context manager.
'''
return _SummaryTimer(self)
def _samples(self):
return (
('_count', {}, self._count.get()),
('_sum', {}, self._sum.get()))
def _floatToGoString(d):
if d == _INF:
return '+Inf'
elif d == _MINUS_INF:
return '-Inf'
elif math.isnan(d):
return 'NaN'
else:
return repr(float(d))
@_MetricWrapper
class Histogram(object):
'''A Histogram tracks the size and number of events in buckets.
You can use Histograms for aggregatable calculation of quantiles.
Example use cases:
- Response latency
- Request size
Example for a Histogram:
from prometheus_client import Histogram
h = Histogram('request_size_bytes', 'Request size (bytes)')
h.observe(512) # Observe 512 (bytes)
Example for a Histogram using time:
from prometheus_client import Histogram
REQUEST_TIME = Histogram('response_latency_seconds', 'Response latency (seconds)')
@REQUEST_TIME.time()
def create_response(request):
"""A dummy function"""
time.sleep(1)
Example of using the same Histogram object as a context manager:
with REQUEST_TIME.time():
pass # Logic to be timed
The default buckets are intended to cover a typical web/rpc request from milliseconds to seconds.
They can be overridden by passing the `buckets` keyword argument to `Histogram`.
**NB** The Python client doesn't store or expose quantile information at this time.
'''
_type = 'histogram'
_reserved_labelnames = ['le']  # 'le' is attached to bucket samples internally
def __init__(self, name, labelnames, labelvalues, buckets=(.005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0, 7.5, 10.0, _INF)):
self._sum = _ValueClass(self._type, name, name + '_sum', labelnames, labelvalues)
buckets = [float(b) for b in buckets]
if buckets != sorted(buckets):
# This is probably an error on the part of the user,
# so raise rather than sorting for them.
raise ValueError('Buckets not in sorted order')
if buckets and buckets[-1] != _INF:
buckets.append(_INF)
if len(buckets) < 2:
raise ValueError('Must have at least two buckets')
self._upper_bounds = buckets
self._buckets = []
bucket_labelnames = labelnames + ('le',)
for b in buckets:
self._buckets.append(_ValueClass(self._type, name, name + '_bucket', bucket_labelnames, labelvalues + (_floatToGoString(b),)))
self._latest_timestamp = None
def observe(self, amount, timestamp=None):
'''Observe the given amount.'''
self._sum.inc(amount, timestamp)
self._latest_timestamp = timestamp
for i, bound in enumerate(self._upper_bounds):
if amount <= bound:
self._buckets[i].inc(1, timestamp)
break
def time(self):
'''Time a block of code or function, and observe the duration in seconds.
Can be used as a function decorator or context manager.
'''
return _HistogramTimer(self)
def _samples(self):
samples = []
acc = 0
ts = self._latest_timestamp
for i, bound in enumerate(self._upper_bounds):
acc += self._buckets[i].get()[0]
samples.append(('_bucket', {'le': _floatToGoString(bound)}, (acc, ts)))
samples.append(('_count', {}, (acc, ts)))
samples.append(('_sum', {}, self._sum.get()))
return tuple(samples)
class _HistogramTimer(object):
def __init__(self, histogram):
self._histogram = histogram
def __enter__(self):
self._start = default_timer()
def __exit__(self, typ, value, traceback):
# Time can go backwards.
self._histogram.observe(max(default_timer() - self._start, 0))
def __call__(self, f):
def wrapped(func, *args, **kwargs):
with self:
return func(*args, **kwargs)
return decorate(f, wrapped)
class _ExceptionCounter(object):
def __init__(self, counter, exception):
self._counter = counter
self._exception = exception
def __enter__(self):
pass
def __exit__(self, typ, value, traceback):
if isinstance(value, self._exception):
self._counter.inc()
def __call__(self, f):
def wrapped(func, *args, **kwargs):
with self:
return func(*args, **kwargs)
return decorate(f, wrapped)
class _InprogressTracker(object):
def __init__(self, gauge):
self._gauge = gauge
def __enter__(self):
self._gauge.inc()
def __exit__(self, typ, value, traceback):
self._gauge.dec()
def __call__(self, f):
def wrapped(func, *args, **kwargs):
with self:
return func(*args, **kwargs)
return decorate(f, wrapped)
class _SummaryTimer(object):
def __init__(self, summary):
self._summary = summary
def __enter__(self):
self._start = default_timer()
def __exit__(self, typ, value, traceback):
# Time can go backwards.
self._summary.observe(max(default_timer() - self._start, 0))
def __call__(self, f):
def wrapped(func, *args, **kwargs):
with self:
return func(*args, **kwargs)
return decorate(f, wrapped)
class _GaugeTimer(object):
def __init__(self, gauge):
self._gauge = gauge
def __enter__(self):
self._start = default_timer()
def __exit__(self, typ, value, traceback):
# Time can go backwards.
self._gauge.set(max(default_timer() - self._start, 0))
def __call__(self, f):
def wrapped(func, *args, **kwargs):
with self:
return func(*args, **kwargs)
return decorate(f, wrapped)
# ######################### LICENSE ############################ #
# Copyright (c) 2005-2016, Michele Simionato
# All rights reserved.
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
# Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# Redistributions in bytecode form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in
# the documentation and/or other materials provided with the
# distribution.
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS
# OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR
# TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
# USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
# DAMAGE.
"""
Decorator module, see http://pypi.python.org/pypi/decorator
for the documentation.
"""
from __future__ import print_function
import re
import sys
import inspect
import operator
import itertools
import collections
__version__ = '4.0.10'
if sys.version >= '3':
from inspect import getfullargspec
def get_init(cls):
return cls.__init__
else:
class getfullargspec(object):
"A quick and dirty replacement for getfullargspec for Python 2.X"
def __init__(self, f):
self.args, self.varargs, self.varkw, self.defaults = \
inspect.getargspec(f)
self.kwonlyargs = []
self.kwonlydefaults = None
def __iter__(self):
yield self.args
yield self.varargs
yield self.varkw
yield self.defaults
getargspec = inspect.getargspec
def get_init(cls):
return cls.__init__.__func__
# getargspec has been deprecated in Python 3.5
ArgSpec = collections.namedtuple(
'ArgSpec', 'args varargs varkw defaults')
def getargspec(f):
"""A replacement for inspect.getargspec"""
spec = getfullargspec(f)
return ArgSpec(spec.args, spec.varargs, spec.varkw, spec.defaults)
DEF = re.compile(r'\s*def\s*([_\w][_\w\d]*)\s*\(')
# basic functionality
class FunctionMaker(object):
"""
An object with the ability to create functions with a given signature.
It has attributes name, doc, module, signature, defaults, dict and
methods update and make.
"""
# Atomic get-and-increment provided by the GIL
_compile_count = itertools.count()
def __init__(self, func=None, name=None, signature=None,
defaults=None, doc=None, module=None, funcdict=None):
self.shortsignature = signature
if func:
# func can be a class or a callable, but not an instance method
self.name = func.__name__
if self.name == '<lambda>': # small hack for lambda functions
self.name = '_lambda_'
self.doc = func.__doc__
self.module = func.__module__
if inspect.isfunction(func):
argspec = getfullargspec(func)
self.annotations = getattr(func, '__annotations__', {})
for a in ('args', 'varargs', 'varkw', 'defaults', 'kwonlyargs',
'kwonlydefaults'):
setattr(self, a, getattr(argspec, a))
for i, arg in enumerate(self.args):
setattr(self, 'arg%d' % i, arg)
if sys.version < '3': # easy way
self.shortsignature = self.signature = (
inspect.formatargspec(
formatvalue=lambda val: "", *argspec)[1:-1])
else: # Python 3 way
allargs = list(self.args)
allshortargs = list(self.args)
if self.varargs:
allargs.append('*' + self.varargs)
allshortargs.append('*' + self.varargs)
elif self.kwonlyargs:
allargs.append('*') # single star syntax
for a in self.kwonlyargs:
allargs.append('%s=None' % a)
allshortargs.append('%s=%s' % (a, a))
if self.varkw:
allargs.append('**' + self.varkw)
allshortargs.append('**' + self.varkw)
self.signature = ', '.join(allargs)
self.shortsignature = ', '.join(allshortargs)
self.dict = func.__dict__.copy()
# func=None happens when decorating a caller
if name:
self.name = name
if signature is not None:
self.signature = signature
if defaults:
self.defaults = defaults
if doc:
self.doc = doc
if module:
self.module = module
if funcdict:
self.dict = funcdict
        # check the existence of required attributes
assert hasattr(self, 'name')
if not hasattr(self, 'signature'):
raise TypeError('You are decorating a non function: %s' % func)
def update(self, func, **kw):
"Update the signature of func with the data in self"
func.__name__ = self.name
func.__doc__ = getattr(self, 'doc', None)
func.__dict__ = getattr(self, 'dict', {})
func.__defaults__ = getattr(self, 'defaults', ())
func.__kwdefaults__ = getattr(self, 'kwonlydefaults', None)
func.__annotations__ = getattr(self, 'annotations', None)
try:
frame = sys._getframe(3)
except AttributeError: # for IronPython and similar implementations
callermodule = '?'
else:
callermodule = frame.f_globals.get('__name__', '?')
func.__module__ = getattr(self, 'module', callermodule)
func.__dict__.update(kw)
def make(self, src_templ, evaldict=None, addsource=False, **attrs):
"Make a new function from a given template and update the signature"
src = src_templ % vars(self) # expand name and signature
evaldict = evaldict or {}
mo = DEF.match(src)
if mo is None:
raise SyntaxError('not a valid function template\n%s' % src)
name = mo.group(1) # extract the function name
names = set([name] + [arg.strip(' *') for arg in
self.shortsignature.split(',')])
for n in names:
if n in ('_func_', '_call_'):
raise NameError('%s is overridden in\n%s' % (n, src))
if not src.endswith('\n'): # add a newline for old Pythons
src += '\n'
# Ensure each generated function has a unique filename for profilers
# (such as cProfile) that depend on the tuple of (<filename>,
# <definition line>, <function name>) being unique.
filename = '<decorator-gen-%d>' % (next(self._compile_count),)
try:
code = compile(src, filename, 'single')
exec(code, evaldict)
except:
print('Error in generated code:', file=sys.stderr)
print(src, file=sys.stderr)
raise
func = evaldict[name]
if addsource:
attrs['__source__'] = src
self.update(func, **attrs)
return func
@classmethod
def create(cls, obj, body, evaldict, defaults=None,
doc=None, module=None, addsource=True, **attrs):
"""
Create a function from the strings name, signature and body.
evaldict is the evaluation dictionary. If addsource is true an
attribute __source__ is added to the result. The attributes attrs
are added, if any.
"""
if isinstance(obj, str): # "name(signature)"
name, rest = obj.strip().split('(', 1)
            signature = rest[:-1]  # strip the trailing right parenthesis
func = None
else: # a function
name = None
signature = None
func = obj
self = cls(func, name, signature, defaults, doc, module)
ibody = '\n'.join(' ' + line for line in body.splitlines())
return self.make('def %(name)s(%(signature)s):\n' + ibody,
evaldict, addsource, **attrs)
def decorate(func, caller):
"""
decorate(func, caller) decorates a function using a caller.
"""
evaldict = dict(_call_=caller, _func_=func)
fun = FunctionMaker.create(
func, "return _call_(_func_, %(shortsignature)s)",
evaldict, __wrapped__=func)
if hasattr(func, '__qualname__'):
fun.__qualname__ = func.__qualname__
return fun
def decorator(caller, _func=None):
"""decorator(caller) converts a caller function into a decorator"""
if _func is not None: # return a decorated function
# this is obsolete behavior; you should use decorate instead
return decorate(_func, caller)
# else return a decorator function
if inspect.isclass(caller):
name = caller.__name__.lower()
doc = 'decorator(%s) converts functions/generators into ' \
'factories of %s objects' % (caller.__name__, caller.__name__)
elif inspect.isfunction(caller):
if caller.__name__ == '<lambda>':
name = '_lambda_'
else:
name = caller.__name__
doc = caller.__doc__
else: # assume caller is an object with a __call__ method
name = caller.__class__.__name__.lower()
doc = caller.__call__.__doc__
evaldict = dict(_call_=caller, _decorate_=decorate)
return FunctionMaker.create(
'%s(func)' % name, 'return _decorate_(func, _call_)',
evaldict, doc=doc, module=caller.__module__,
__wrapped__=caller)
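# Usage sketch (illustrative, adapted to this module's API): a caller function
# turned into a signature-preserving decorator. The trace/add names below are
# hypothetical.
def _decorator_usage_example():
    def trace(f, *args, **kw):
        print('calling %s with args %s, %s' % (f.__name__, args, kw))
        return f(*args, **kw)
    trace = decorator(trace)

    @trace
    def add(x, y):
        return x + y
    return add(1, 2)  # prints the call details and returns 3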
# ####################### contextmanager ####################### #
try: # Python >= 3.2
from contextlib import _GeneratorContextManager
except ImportError: # Python >= 2.5
from contextlib import GeneratorContextManager as _GeneratorContextManager
class ContextManager(_GeneratorContextManager):
def __call__(self, func):
"""Context manager decorator"""
return FunctionMaker.create(
func, "with _self_: return _func_(%(shortsignature)s)",
dict(_self_=self, _func_=func), __wrapped__=func)
init = getfullargspec(_GeneratorContextManager.__init__)
n_args = len(init.args)
if n_args == 2 and not init.varargs: # (self, genobj) Python 2.7
def __init__(self, g, *a, **k):
return _GeneratorContextManager.__init__(self, g(*a, **k))
ContextManager.__init__ = __init__
elif n_args == 2 and init.varargs: # (self, gen, *a, **k) Python 3.4
pass
elif n_args == 4: # (self, gen, args, kwds) Python 3.5
def __init__(self, g, *a, **k):
return _GeneratorContextManager.__init__(self, g, a, k)
ContextManager.__init__ = __init__
contextmanager = decorator(ContextManager)
# ############################ dispatch_on ############################ #
def append(a, vancestors):
"""
Append ``a`` to the list of the virtual ancestors, unless it is already
included.
"""
add = True
for j, va in enumerate(vancestors):
if issubclass(va, a):
add = False
break
if issubclass(a, va):
vancestors[j] = a
add = False
if add:
vancestors.append(a)
# inspired by simplegeneric by P.J. Eby and functools.singledispatch
def dispatch_on(*dispatch_args):
"""
Factory of decorators turning a function into a generic function
dispatching on the given arguments.
"""
assert dispatch_args, 'No dispatch args passed'
dispatch_str = '(%s,)' % ', '.join(dispatch_args)
def check(arguments, wrong=operator.ne, msg=''):
"""Make sure one passes the expected number of arguments"""
if wrong(len(arguments), len(dispatch_args)):
raise TypeError('Expected %d arguments, got %d%s' %
(len(dispatch_args), len(arguments), msg))
def gen_func_dec(func):
"""Decorator turning a function into a generic function"""
# first check the dispatch arguments
argset = set(getfullargspec(func).args)
if not set(dispatch_args) <= argset:
raise NameError('Unknown dispatch arguments %s' % dispatch_str)
typemap = {}
def vancestors(*types):
"""
Get a list of sets of virtual ancestors for the given types
"""
check(types)
ras = [[] for _ in range(len(dispatch_args))]
for types_ in typemap:
for t, type_, ra in zip(types, types_, ras):
if issubclass(t, type_) and type_ not in t.__mro__:
append(type_, ra)
return [set(ra) for ra in ras]
def ancestors(*types):
"""
Get a list of virtual MROs, one for each type
"""
check(types)
lists = []
for t, vas in zip(types, vancestors(*types)):
n_vas = len(vas)
if n_vas > 1:
raise RuntimeError(
'Ambiguous dispatch for %s: %s' % (t, vas))
elif n_vas == 1:
va, = vas
mro = type('t', (t, va), {}).__mro__[1:]
else:
mro = t.__mro__
lists.append(mro[:-1]) # discard t and object
return lists
def register(*types):
"""
Decorator to register an implementation for the given types
"""
check(types)
def dec(f):
check(getfullargspec(f).args, operator.lt, ' in ' + f.__name__)
typemap[types] = f
return f
return dec
def dispatch_info(*types):
"""
        A utility to introspect the dispatch algorithm
"""
check(types)
lst = []
for anc in itertools.product(*ancestors(*types)):
lst.append(tuple(a.__name__ for a in anc))
return lst
def _dispatch(dispatch_args, *args, **kw):
types = tuple(type(arg) for arg in dispatch_args)
try: # fast path
f = typemap[types]
except KeyError:
pass
else:
return f(*args, **kw)
combinations = itertools.product(*ancestors(*types))
            next(combinations)  # the first one has already been tried
for types_ in combinations:
f = typemap.get(types_)
if f is not None:
return f(*args, **kw)
# else call the default implementation
return func(*args, **kw)
return FunctionMaker.create(
func, 'return _f_(%s, %%(shortsignature)s)' % dispatch_str,
dict(_f_=_dispatch), register=register, default=func,
typemap=typemap, vancestors=vancestors, ancestors=ancestors,
dispatch_info=dispatch_info, __wrapped__=func)
gen_func_dec.__name__ = 'dispatch_on' + dispatch_str
return gen_func_dec
#!/usr/bin/python
from __future__ import unicode_literals
import base64
import os
import socket
import threading
import time
from contextlib import closing
from wsgiref.simple_server import make_server
from . import core
try:
from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
from SocketServer import ThreadingMixIn
from urllib2 import build_opener, Request, HTTPHandler
from urllib import quote_plus
from urlparse import parse_qs, urlparse
except ImportError:
# Python 3
unicode = str
from http.server import BaseHTTPRequestHandler, HTTPServer
from socketserver import ThreadingMixIn
from urllib.request import build_opener, Request, HTTPHandler
from urllib.parse import quote_plus, parse_qs, urlparse
CONTENT_TYPE_LATEST = str('text/plain; version=0.0.4; charset=utf-8')
'''Content type of the latest text format'''
def make_wsgi_app(registry=core.REGISTRY):
'''Create a WSGI app which serves the metrics from a registry.'''
def prometheus_app(environ, start_response):
params = parse_qs(environ['QUERY_STRING'])
r = registry
if 'name[]' in params:
r = r.restricted_registry(params['name[]'])
output = generate_latest(r)
status = str('200 OK')
headers = [(str('Content-type'), CONTENT_TYPE_LATEST)]
start_response(status, headers)
return [output]
return prometheus_app
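# Usage sketch (illustrative): mounting the app on the stdlib reference WSGI
# server; the port is arbitrary. For a ready-made daemon thread, see
# start_wsgi_server below.
def _wsgi_usage_example(port=8000):
    httpd = make_server('', port, make_wsgi_app())
    httpd.handle_request()  # serve a single scrape, then return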
def start_wsgi_server(port, addr='', registry=core.REGISTRY):
"""Starts a WSGI server for prometheus metrics as a daemon thread."""
class PrometheusMetricsServer(threading.Thread):
def run(self):
httpd = make_server(addr, port, make_wsgi_app(registry))
httpd.serve_forever()
t = PrometheusMetricsServer()
t.daemon = True
t.start()
def generate_latest(registry=core.REGISTRY):
'''Returns the metrics from the registry in latest text format as a string.'''
def sampleconv(s):
# s may be a (value, timestamp) tuple or simply a value
if not isinstance(s, tuple):
return core._floatToGoString(s)
if s[1] is None:
return core._floatToGoString(s[0])
        # render the value together with its timestamp; use explicit
        # positional indices so this also works on Python 2.6
        return '{0} {1}'.format(core._floatToGoString(s[0]), int(s[1]))
output = []
for metric in registry.collect():
output.append('# HELP {0} {1}'.format(
metric.name, metric.documentation.replace('\\', r'\\').replace('\n', r'\n')))
output.append('\n# TYPE {0} {1}\n'.format(metric.name, metric.type))
for name, labels, value in metric.samples:
if labels:
labelstr = '{{{0}}}'.format(','.join(
['{0}="{1}"'.format(
k, v.replace('\\', r'\\').replace('\n', r'\n').replace('"', r'\"'))
for k, v in sorted(labels.items())]))
else:
labelstr = ''
output.append('{0}{1} {2}\n'.format(name, labelstr, sampleconv(value)))
return ''.join(output).encode('utf-8')
class MetricsHandler(BaseHTTPRequestHandler):
def do_GET(self):
registry = core.REGISTRY
params = parse_qs(urlparse(self.path).query)
if 'name[]' in params:
registry = registry.restricted_registry(params['name[]'])
try:
output = generate_latest(registry)
except:
self.send_error(500, 'error generating metric output')
raise
self.send_response(200)
self.send_header('Content-Type', CONTENT_TYPE_LATEST)
self.end_headers()
self.wfile.write(output)
def log_message(self, format, *args):
return
def start_http_server(port, addr=''):
"""Starts an HTTP server for prometheus metrics as a daemon thread"""
class ThreadingSimpleServer(ThreadingMixIn, HTTPServer):
pass
class PrometheusMetricsServer(threading.Thread):
def run(self):
httpd = ThreadingSimpleServer((addr, port), MetricsHandler)
httpd.serve_forever()
t = PrometheusMetricsServer()
t.daemon = True
t.start()
def write_to_textfile(path, registry):
'''Write metrics to the given path.
This is intended for use with the Node exporter textfile collector.
The path must end in .prom for the textfile collector to process it.'''
tmppath = '%s.%s.%s' % (path, os.getpid(), threading.current_thread().ident)
with open(tmppath, 'wb') as f:
f.write(generate_latest(registry))
# rename(2) is atomic.
os.rename(tmppath, path)
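# Usage sketch (illustrative): a batch job writing its registry to a .prom
# file for the node exporter's textfile collector; the path is a placeholder.
def _textfile_usage_example(registry):
    write_to_textfile('/var/lib/node_exporter/textfile/demo_job.prom', registry)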
def default_handler(url, method, timeout, headers, data):
'''Default handler that implements HTTP/HTTPS connections.
Used by the push_to_gateway functions. Can be re-used by other handlers.'''
def handle():
request = Request(url, data=data)
request.get_method = lambda: method
for k, v in headers:
request.add_header(k, v)
resp = build_opener(HTTPHandler).open(request, timeout=timeout)
if resp.code >= 400:
raise IOError("error talking to pushgateway: {0} {1}".format(
resp.code, resp.msg))
return handle
def basic_auth_handler(url, method, timeout, headers, data, username=None, password=None):
'''Handler that implements HTTP/HTTPS connections with Basic Auth.
Sets auth headers using supplied 'username' and 'password', if set.
Used by the push_to_gateway functions. Can be re-used by other handlers.'''
def handle():
'''Handler that implements HTTP Basic Auth.
'''
if username is not None and password is not None:
auth_value = '{0}:{1}'.format(username, password).encode('utf-8')
auth_token = base64.b64encode(auth_value)
auth_header = b'Basic ' + auth_token
headers.append(['Authorization', auth_header])
default_handler(url, method, timeout, headers, data)()
return handle
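# Usage sketch (illustrative): wrapping basic_auth_handler to bake in
# credentials; the gateway address, job name and credentials are placeholders.
def _auth_push_usage_example(registry):
    def my_auth_handler(url, method, timeout, headers, data):
        return basic_auth_handler(url, method, timeout, headers, data,
                                  'user', 'pass')
    push_to_gateway('localhost:9091', job='demo_batch', registry=registry,
                    handler=my_auth_handler)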
def push_to_gateway(gateway, job, registry, grouping_key=None, timeout=None, handler=default_handler):
'''Push metrics to the given pushgateway.
    `gateway` is the url for your push gateway, either of the form
              'http://pushgateway.local' or 'pushgateway.local'.
              The scheme defaults to 'http' if none is provided.
`job` is the job label to be attached to all pushed metrics
`registry` is an instance of CollectorRegistry
`grouping_key` please see the pushgateway documentation for details.
Defaults to None
`timeout` is how long push will attempt to connect before giving up.
Defaults to None
`handler` is an optional function which can be provided to perform
requests to the 'gateway'.
              Defaults to `default_handler`, which carries out an
              http or https request.
              If supplied, the handler must be a function which accepts
              the following arguments:
              url, method, timeout, headers, and content
              May be used to implement additional functionality not
              supported by the built-in default handler (such as SSL
              client certificates, and HTTP authentication mechanisms).
'url' is the URL for the request, the 'gateway' argument
described earlier will form the basis of this URL.
'method' is the HTTP method which should be used when
carrying out the request.
'timeout' requests not successfully completed after this
many seconds should be aborted. If timeout is None, then
the handler should not set a timeout.
'headers' is a list of ("header-name","header-value") tuples
which must be passed to the pushgateway in the form of HTTP
request headers.
The function should raise an exception (e.g. IOError) on
failure.
'content' is the data which should be used to form the HTTP
Message Body.
This overwrites all metrics with the same job and grouping_key.
This uses the PUT HTTP method.'''
_use_gateway('PUT', gateway, job, registry, grouping_key, timeout, handler)
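# Usage sketch (illustrative): a batch job pushing a completion timestamp to
# the gateway; the gateway address, job and metric names are placeholders.
def _push_usage_example():
    registry = core.CollectorRegistry()
    g = core.Gauge('demo_job_last_success_unixtime',
                   'Last time the demo batch job succeeded.',
                   registry=registry)
    g.set_to_current_time()
    push_to_gateway('localhost:9091', job='demo_batch', registry=registry)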
def pushadd_to_gateway(gateway, job, registry, grouping_key=None, timeout=None, handler=default_handler):
'''PushAdd metrics to the given pushgateway.
    `gateway` is the url for your push gateway, either of the form
              'http://pushgateway.local' or 'pushgateway.local'.
              The scheme defaults to 'http' if none is provided.
`job` is the job label to be attached to all pushed metrics
`registry` is an instance of CollectorRegistry
`grouping_key` please see the pushgateway documentation for details.
Defaults to None
`timeout` is how long push will attempt to connect before giving up.
Defaults to None
`handler` is an optional function which can be provided to perform
requests to the 'gateway'.
              Defaults to `default_handler`, which carries out an
              http or https request.
See the 'prometheus_client.push_to_gateway' documentation
for implementation requirements.
This replaces metrics with the same name, job and grouping_key.
This uses the POST HTTP method.'''
_use_gateway('POST', gateway, job, registry, grouping_key, timeout, handler)
def delete_from_gateway(gateway, job, grouping_key=None, timeout=None, handler=default_handler):
'''Delete metrics from the given pushgateway.
    `gateway` is the url for your push gateway, either of the form
              'http://pushgateway.local' or 'pushgateway.local'.
              The scheme defaults to 'http' if none is provided.
`job` is the job label to be attached to all pushed metrics
`grouping_key` please see the pushgateway documentation for details.
Defaults to None
`timeout` is how long delete will attempt to connect before giving up.
Defaults to None
`handler` is an optional function which can be provided to perform
requests to the 'gateway'.
              Defaults to `default_handler`, which carries out an
              http or https request.
See the 'prometheus_client.push_to_gateway' documentation
for implementation requirements.
This deletes metrics with the given job and grouping_key.
This uses the DELETE HTTP method.'''
_use_gateway('DELETE', gateway, job, None, grouping_key, timeout, handler)
def _use_gateway(method, gateway, job, registry, grouping_key, timeout, handler):
gateway_url = urlparse(gateway)
if not gateway_url.scheme:
gateway = 'http://{0}'.format(gateway)
url = '{0}/metrics/job/{1}'.format(gateway, quote_plus(job))
data = b''
if method != 'DELETE':
data = generate_latest(registry)
if grouping_key is None:
grouping_key = {}
url = url + ''.join(['/{0}/{1}'.format(quote_plus(str(k)), quote_plus(str(v)))
for k, v in sorted(grouping_key.items())])
    headers = [('Content-Type', CONTENT_TYPE_LATEST)]
handler(url=url, method=method, timeout=timeout,
headers=headers, data=data)()
def instance_ip_grouping_key():
'''Grouping key with instance set to the IP Address of this host.'''
with closing(socket.socket(socket.AF_INET, socket.SOCK_DGRAM)) as s:
s.connect(('localhost', 0))
return {'instance': s.getsockname()[0]}
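# Usage sketch (illustrative): grouping pushed metrics by this host's IP
# address; the gateway address and job name are placeholders.
def _grouping_key_usage_example(registry):
    push_to_gateway('localhost:9091', job='demo_batch', registry=registry,
                    grouping_key=instance_ip_grouping_key())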
#!/usr/bin/python
from __future__ import unicode_literals
import glob
import json
import os
from . import core
class MultiProcessCollector(object):
"""Collector for files for multi-process mode."""
def __init__(self, registry, path=os.environ.get('prometheus_multiproc_dir')):
self._path = path
if registry:
registry.register(self)
def collect(self):
metrics = {}
for f in glob.glob(os.path.join(self._path, '*.db')):
parts = os.path.basename(f).split('_')
typ = parts[0]
d = core._MmapedDict(f)
for key, value in d.read_all_values():
metric_name, name, labelnames, labelvalues = json.loads(key)
metrics.setdefault(metric_name, core.Metric(metric_name, 'Multiprocess metric', typ))
metric = metrics[metric_name]
if typ == 'gauge':
pid = parts[2][:-3]
metric._multiprocess_mode = parts[1]
metric.add_sample(name, tuple(zip(labelnames, labelvalues)) + (('pid', pid), ), value)
else:
                    # Duplicate samples and labels are fixed up in the loop below.
metric.add_sample(name, tuple(zip(labelnames, labelvalues)), value)
d.close()
for metric in metrics.values():
samples = {}
buckets = {}
latest_ts = None
for name, labels, value in metric.samples:
if value[1] is not None:
                    # max(None, ts) raises a TypeError on Python 3, so seed
                    # the first timestamp explicitly.
                    latest_ts = value[1] if latest_ts is None else max(latest_ts, value[1])
if metric.type == 'gauge':
without_pid = tuple([l for l in labels if l[0] != 'pid'])
if metric._multiprocess_mode == 'min':
samples.setdefault((name, without_pid), value)
if samples[(name, without_pid)][0] > value[0]:
samples[(name, without_pid)] = value
elif metric._multiprocess_mode == 'max':
samples.setdefault((name, without_pid), value)
if samples[(name, without_pid)][0] < value[0]:
samples[(name, without_pid)] = value
elif metric._multiprocess_mode == 'livesum':
samples.setdefault((name, without_pid), [0.0, None])
samples[(name, without_pid)][0] += value[0]
samples[(name, without_pid)][1] = latest_ts
else: # all/liveall
samples[(name, labels)] = value
elif metric.type == 'histogram':
bucket = [float(l[1]) for l in labels if l[0] == 'le']
if bucket:
# _bucket
without_le = tuple([l for l in labels if l[0] != 'le'])
buckets.setdefault(without_le, {})
buckets[without_le].setdefault(bucket[0], [0.0, None])
buckets[without_le][bucket[0]][0] += value[0]
buckets[without_le][bucket[0]][1] = latest_ts
else:
# _sum/_count
samples.setdefault((name, labels), [0.0, None])
samples[(name, labels)][0] += value[0]
samples[(name, labels)][1] = latest_ts
else:
# Counter and Summary.
samples.setdefault((name, labels), [0.0, None])
samples[(name, labels)][0] += value[0]
samples[(name, labels)][1] = value[1]
# Accumulate bucket values.
if metric.type == 'histogram':
for labels, values in buckets.items():
latest_ts = None
acc = 0.0
for bucket, value in sorted(values.items()):
acc += value[0]
if value[1] is not None:
                            # As above, avoid max(None, ts) on Python 3.
                            latest_ts = value[1] if latest_ts is None else max(latest_ts, value[1])
samples[(metric.name + '_bucket', labels + (('le', core._floatToGoString(bucket)), ))] = \
(acc, value[1])
samples[(metric.name + '_count', labels)] = (acc, latest_ts)
# Convert to correct sample format.
metric.samples = [(name, dict(labels), tuple(value)) for (name, labels), value in samples.items()]
return metrics.values()
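# Usage sketch (illustrative): exposing metrics aggregated across worker
# processes; assumes the prometheus_multiproc_dir environment variable points
# at a writable directory shared by all workers.
def _multiprocess_usage_example():
    registry = core.CollectorRegistry()
    MultiProcessCollector(registry)
    return registry  # hand this to generate_latest() or make_wsgi_app()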
def mark_process_dead(pid, path=os.environ.get('prometheus_multiproc_dir')):
"""Do bookkeeping for when one process dies in a multi-process setup."""
for f in glob.glob(os.path.join(path, 'gauge_livesum_{0}.db'.format(pid))):
os.remove(f)
for f in glob.glob(os.path.join(path, 'gauge_liveall_{0}.db'.format(pid))):
os.remove(f)
#!/usr/bin/python
from __future__ import unicode_literals
try:
import StringIO
except ImportError:
# Python 3
import io as StringIO
from . import core
def text_string_to_metric_families(text):
"""Parse Prometheus text format from a unicode string.
See text_fd_to_metric_families.
"""
for metric_family in text_fd_to_metric_families(StringIO.StringIO(text)):
yield metric_family
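# Usage sketch (illustrative): parsing a single counter family. Note that in
# this module each sample carries a (value, timestamp) pair as its third
# element; the timestamp is None when absent.
def _parser_usage_example():
    text = '# HELP c A counter\n# TYPE c counter\nc 1.0\n'
    parsed = []
    for family in text_string_to_metric_families(text):
        for name, labels, (value, timestamp) in family.samples:
            parsed.append((family.name, name, labels, value, timestamp))
    return parsed  # [('c', 'c', {}, 1.0, None)]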
def _unescape_help(text):
result = []
slash = False
for char in text:
if slash:
if char == '\\':
result.append('\\')
elif char == 'n':
result.append('\n')
else:
result.append('\\' + char)
slash = False
else:
if char == '\\':
slash = True
else:
result.append(char)
if slash:
result.append('\\')
return ''.join(result)
def _parse_sample(text):
name = []
labelname = []
labelvalue = []
value = []
timestamp = []
labels = {}
state = 'name'
for char in text:
if state == 'name':
if char == '{':
state = 'startoflabelname'
elif char == ' ' or char == '\t':
state = 'endofname'
else:
name.append(char)
elif state == 'endofname':
if char == ' ' or char == '\t':
pass
elif char == '{':
state = 'startoflabelname'
else:
value.append(char)
state = 'value'
elif state == 'startoflabelname':
if char == ' ' or char == '\t' or char == ',':
pass
elif char == '}':
state = 'endoflabels'
else:
state = 'labelname'
labelname.append(char)
elif state == 'labelname':
if char == '=':
state = 'labelvaluequote'
elif char == ' ' or char == '\t':
state = 'labelvalueequals'
else:
labelname.append(char)
elif state == 'labelvalueequals':
if char == '=':
state = 'labelvaluequote'
elif char == ' ' or char == '\t':
pass
else:
raise ValueError("Invalid line: " + text)
elif state == 'labelvaluequote':
if char == '"':
state = 'labelvalue'
elif char == ' ' or char == '\t':
pass
else:
raise ValueError("Invalid line: " + text)
elif state == 'labelvalue':
if char == '\\':
state = 'labelvalueslash'
elif char == '"':
labels[''.join(labelname)] = ''.join(labelvalue)
labelname = []
labelvalue = []
state = 'nextlabel'
else:
labelvalue.append(char)
elif state == 'labelvalueslash':
state = 'labelvalue'
if char == '\\':
labelvalue.append('\\')
elif char == 'n':
labelvalue.append('\n')
elif char == '"':
labelvalue.append('"')
else:
labelvalue.append('\\' + char)
elif state == 'nextlabel':
if char == ',':
state = 'startoflabelname'
elif char == '}':
state = 'endoflabels'
elif char == ' ' or char == '\t':
pass
else:
raise ValueError("Invalid line: " + text)
elif state == 'endoflabels':
if char == ' ' or char == '\t':
pass
else:
value.append(char)
state = 'value'
elif state == 'value':
if char == ' ' or char == '\t':
state = 'endofvalue'
else:
value.append(char)
elif state == 'endofvalue':
if char == ' ' or char == '\t':
pass
else:
state = 'timestamp'
timestamp.append(char)
elif state == 'timestamp':
if char == ' ' or char == '\t':
break
else:
timestamp.append(char)
if len(timestamp) == 0:
timestamp = None
else:
timestamp = int(''.join(timestamp))
return (''.join(name), labels, (float(''.join(value)), timestamp))
def text_fd_to_metric_families(fd):
"""Parse Prometheus text format from a file descriptor.
This is a laxer parser than the main Go parser,
so successful parsing does not imply that the parsed
text meets the specification.
    Yields core.Metric objects.
"""
name = ''
documentation = ''
typ = 'untyped'
samples = []
allowed_names = []
def build_metric(name, documentation, typ, samples):
metric = core.Metric(name, documentation, typ)
metric.samples = samples
return metric
for line in fd:
line = line.strip()
if line.startswith('#'):
parts = line.split(None, 3)
if len(parts) < 2:
continue
if parts[1] == 'HELP':
if parts[2] != name:
if name != '':
yield build_metric(name, documentation, typ, samples)
# New metric
name = parts[2]
typ = 'untyped'
samples = []
allowed_names = [parts[2]]
if len(parts) == 4:
documentation = _unescape_help(parts[3])
else:
documentation = ''
elif parts[1] == 'TYPE':
if parts[2] != name:
if name != '':
yield build_metric(name, documentation, typ, samples)
# New metric
name = parts[2]
documentation = ''
samples = []
typ = parts[3]
allowed_names = {
'counter': [''],
'gauge': [''],
'summary': ['_count', '_sum', ''],
'histogram': ['_count', '_sum', '_bucket'],
}.get(typ, [''])
allowed_names = [name + n for n in allowed_names]
else:
# Ignore other comment tokens
pass
elif line == '':
# Ignore blank lines
pass
else:
sample = _parse_sample(line)
if sample[0] not in allowed_names:
if name != '':
yield build_metric(name, documentation, typ, samples)
# New metric, yield immediately as untyped singleton
name = ''
documentation = ''
typ = 'untyped'
samples = []
allowed_names = []
yield build_metric(sample[0], documentation, typ, [sample])
else:
samples.append(sample)
if name != '':
yield build_metric(name, documentation, typ, samples)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
import platform as pf
from . import core
class PlatformCollector(object):
"""Collector for python platform information"""
def __init__(self, registry=core.REGISTRY, platform=None):
self._platform = pf if platform is None else platform
info = self._info()
system = self._platform.system()
if system == "Java":
info.update(self._java())
self._metrics = [
self._add_metric("python_info", "Python platform information", info)
]
if registry:
registry.register(self)
def collect(self):
return self._metrics
@staticmethod
def _add_metric(name, documentation, data):
labels = data.keys()
values = [data[k] for k in labels]
g = core.GaugeMetricFamily(name, documentation, labels=labels)
g.add_metric(values, 1)
return g
def _info(self):
major, minor, patchlevel = self._platform.python_version_tuple()
return {
"version": self._platform.python_version(),
"implementation": self._platform.python_implementation(),
"major": major,
"minor": minor,
"patchlevel": patchlevel
}
def _java(self):
java_version, _, vminfo, osinfo = self._platform.java_ver()
vm_name, vm_release, vm_vendor = vminfo
return {
"jvm_version": java_version,
"jvm_release": vm_release,
"jvm_vendor": vm_vendor,
"jvm_name": vm_name
}
PLATFORM_COLLECTOR = PlatformCollector()
"""PlatformCollector in default Registry REGISTRY"""
#!/usr/bin/python
from __future__ import unicode_literals
import os
from . import core
try:
import resource
_PAGESIZE = resource.getpagesize()
except ImportError:
# Not Unix
_PAGESIZE = 4096
class ProcessCollector(object):
"""Collector for Standard Exports such as cpu and memory."""
def __init__(self, namespace='', pid=lambda: 'self', proc='/proc', registry=core.REGISTRY):
self._namespace = namespace
self._pid = pid
self._proc = proc
if namespace:
self._prefix = namespace + '_process_'
else:
self._prefix = 'process_'
self._ticks = 100.0
try:
self._ticks = os.sysconf('SC_CLK_TCK')
except (ValueError, TypeError, AttributeError):
pass
# This is used to test if we can access /proc.
self._btime = 0
try:
self._btime = self._boot_time()
except IOError:
pass
if registry:
registry.register(self)
def _boot_time(self):
with open(os.path.join(self._proc, 'stat')) as stat:
for line in stat:
if line.startswith('btime '):
return float(line.split()[1])
def collect(self):
if not self._btime:
return []
pid = os.path.join(self._proc, str(self._pid()).strip())
result = []
try:
with open(os.path.join(pid, 'stat')) as stat:
parts = (stat.read().split(')')[-1].split())
vmem = core.GaugeMetricFamily(self._prefix + 'virtual_memory_bytes',
'Virtual memory size in bytes.', value=float(parts[20]))
rss = core.GaugeMetricFamily(self._prefix + 'resident_memory_bytes', 'Resident memory size in bytes.',
value=float(parts[21]) * _PAGESIZE)
start_time_secs = float(parts[19]) / self._ticks
start_time = core.GaugeMetricFamily(self._prefix + 'start_time_seconds',
'Start time of the process since unix epoch in seconds.',
value=start_time_secs + self._btime)
utime = float(parts[11]) / self._ticks
stime = float(parts[12]) / self._ticks
cpu = core.CounterMetricFamily(self._prefix + 'cpu_seconds_total',
'Total user and system CPU time spent in seconds.',
value=utime + stime)
result.extend([vmem, rss, start_time, cpu])
except IOError:
pass
try:
with open(os.path.join(pid, 'limits')) as limits:
for line in limits:
if line.startswith('Max open file'):
max_fds = core.GaugeMetricFamily(self._prefix + 'max_fds',
'Maximum number of open file descriptors.',
value=float(line.split()[3]))
break
open_fds = core.GaugeMetricFamily(self._prefix + 'open_fds',
'Number of open file descriptors.',
len(os.listdir(os.path.join(pid, 'fd'))))
result.extend([open_fds, max_fds])
except IOError:
pass
return result
PROCESS_COLLECTOR = ProcessCollector()
"""Default ProcessCollector in default Registry REGISTRY."""
from ._exposition import MetricsResource
__all__ = ['MetricsResource']
from __future__ import absolute_import, unicode_literals
from .. import REGISTRY, generate_latest, CONTENT_TYPE_LATEST
from twisted.web.resource import Resource
class MetricsResource(Resource):
"""
Twisted ``Resource`` that serves prometheus metrics.
"""
isLeaf = True
def __init__(self, registry=REGISTRY):
self.registry = registry
def render_GET(self, request):
request.setHeader(b'Content-Type', CONTENT_TYPE_LATEST.encode('ascii'))
return generate_latest(self.registry)
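# Usage sketch (illustrative): serving this resource at /metrics with
# twisted.web; the port is arbitrary and reactor.run() blocks.
def _twisted_usage_example(port=8000):
    from twisted.web.server import Site
    from twisted.internet import reactor
    root = Resource()
    root.putChild(b'metrics', MetricsResource())
    reactor.listenTCP(port, Site(root))
    reactor.run()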
import os
from setuptools import setup
setup(
name = "prometheus_client",
version = "0.0.19",
author = "Brian Brazil",
author_email = "brian.brazil@robustperception.io",
description = ("Python client for the Prometheus monitoring system."),
long_description = ("See https://github.com/prometheus/client_python/blob/master/README.md for documentation."),
license = "Apache Software License 2.0",
keywords = "prometheus monitoring instrumentation client",
url = "https://github.com/prometheus/client_python",
packages=['prometheus_client', 'prometheus_client.bridge', 'prometheus_client.twisted'],
extras_require={
'twisted': ['twisted'],
},
test_suite="tests",
classifiers=[
"Development Status :: 4 - Beta",
"Intended Audience :: Developers",
"Intended Audience :: Information Technology",
"Intended Audience :: System Administrators",
"Programming Language :: Python",
"Programming Language :: Python :: 2",
"Programming Language :: Python :: 2.6",
"Programming Language :: Python :: 2.7",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.4",
"Programming Language :: Python :: 3.5",
"Programming Language :: Python :: Implementation :: CPython",
"Programming Language :: Python :: Implementation :: PyPy",
"Topic :: System :: Monitoring",
"License :: OSI Approved :: Apache Software License",
],
)
Limit Soft Limit Hard Limit Units
Max cpu time unlimited unlimited seconds
Max file size unlimited unlimited bytes
Max data size unlimited unlimited bytes
Max stack size 8388608 unlimited bytes
Max core file size 0 unlimited bytes
Max resident set unlimited unlimited bytes
Max processes 62898 62898 processes
Max open files 2048 4096 files
Max locked memory 65536 65536 bytes
Max address space unlimited unlimited bytes
Max file locks unlimited unlimited locks
Max pending signals 62898 62898 signals
Max msgqueue size 819200 819200 bytes
Max nice priority 0 0
26231 (vim) R 5392 7446 5392 34835 7446 4218880 32533 309516 26 82 1677 44 158 99 20 0 1 0 82375 56274944 1981 18446744073709551615 4194304 6294284 140736914091744 140736914087944 139965136429984 0 0 12288 1870679807 0 0 0 17 0 0 0 31 0 0 8391624 8481048 16420864 140736914093252 140736914093279 140736914093279 140736914096107 0
1020 ((a b ) ( c d) ) R 28378 1020 28378 34842 1020 4218880 286 0 0 0 0 0 0 0 20 0 1 0 10839175 10395648 155 18446744073709551615 4194304 4238788 140736466511168 140736466511168 140609271124624 0 0 0 0 0 0 0 17 5 0 0 0 0 0 6336016 6337300 25579520 140736466515030 140736466515061 140736466515061 140736466518002 0
cpu 301854 612 111922 8979004 3552 2 3944 0 0 0
cpu0 44490 19 21045 1087069 220 1 3410 0 0 0
cpu1 47869 23 16474 1110787 591 0 46 0 0 0
cpu2 46504 36 15916 1112321 441 0 326 0 0 0
cpu3 47054 102 15683 1113230 533 0 60 0 0 0
cpu4 28413 25 10776 1140321 217 0 8 0 0 0
cpu5 29271 101 11586 1136270 672 0 30 0 0 0
cpu6 29152 36 10276 1139721 319 0 29 0 0 0
cpu7 29098 268 10164 1139282 555 0 31 0 0 0
intr 8885917 17 0 0 0 0 0 0 0 1 79281 0 0 0 0 0 0 0 231237 0 0 0 0 250586 103 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 223424 190745 13 906 1283803 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
ctxt 38014093
btime 1418183276
processes 26442
procs_running 2
procs_blocked 0
softirq 5057579 250191 1481983 1647 211099 186066 0 1783454 622196 12499 508444
from __future__ import unicode_literals
import inspect
import os
import threading
import time
import unittest
from prometheus_client.core import *
class TestCounter(unittest.TestCase):
def setUp(self):
self.registry = CollectorRegistry()
self.counter = Counter('c', 'help', registry=self.registry)
def test_increment(self):
self.assertEqual(0, self.registry.get_sample_value('c'))
self.counter.inc()
self.assertEqual(1, self.registry.get_sample_value('c'))
self.counter.inc(7)
self.assertEqual(8, self.registry.get_sample_value('c'))
def test_negative_increment_raises(self):
self.assertRaises(ValueError, self.counter.inc, -1)
def test_function_decorator(self):
@self.counter.count_exceptions(ValueError)
def f(r):
if r:
raise ValueError
else:
raise TypeError
self.assertEqual((["r"], None, None, None), inspect.getargspec(f))
try:
f(False)
except TypeError:
pass
self.assertEqual(0, self.registry.get_sample_value('c'))
        raised = False
        try:
            f(True)
        except ValueError:
            raised = True
        self.assertTrue(raised)
        self.assertEqual(1, self.registry.get_sample_value('c'))
def test_block_decorator(self):
with self.counter.count_exceptions():
pass
self.assertEqual(0, self.registry.get_sample_value('c'))
raised = False
try:
with self.counter.count_exceptions():
raise ValueError
except:
raised = True
self.assertTrue(raised)
self.assertEqual(1, self.registry.get_sample_value('c'))
class TestGauge(unittest.TestCase):
def setUp(self):
self.registry = CollectorRegistry()
self.gauge = Gauge('g', 'help', registry=self.registry)
def test_gauge(self):
self.assertEqual(0, self.registry.get_sample_value('g'))
self.gauge.inc()
self.assertEqual(1, self.registry.get_sample_value('g'))
self.gauge.dec(3)
self.assertEqual(-2, self.registry.get_sample_value('g'))
self.gauge.set(9)
self.assertEqual(9, self.registry.get_sample_value('g'))
def test_function_decorator(self):
self.assertEqual(0, self.registry.get_sample_value('g'))
@self.gauge.track_inprogress()
def f():
self.assertEqual(1, self.registry.get_sample_value('g'))
self.assertEqual(([], None, None, None), inspect.getargspec(f))
f()
self.assertEqual(0, self.registry.get_sample_value('g'))
def test_block_decorator(self):
self.assertEqual(0, self.registry.get_sample_value('g'))
with self.gauge.track_inprogress():
self.assertEqual(1, self.registry.get_sample_value('g'))
self.assertEqual(0, self.registry.get_sample_value('g'))
def test_gauge_function(self):
x = {}
self.gauge.set_function(lambda: (len(x), None))
self.assertEqual(0, self.registry.get_sample_value('g'))
self.gauge.inc()
self.assertEqual(0, self.registry.get_sample_value('g'))
x['a'] = None
self.assertEqual(1, self.registry.get_sample_value('g'))
    def test_time_function_decorator(self):
self.assertEqual(0, self.registry.get_sample_value('g'))
@self.gauge.time()
def f():
time.sleep(.001)
self.assertEqual(([], None, None, None), inspect.getargspec(f))
f()
self.assertNotEqual(0, self.registry.get_sample_value('g'))
    def test_time_block_decorator(self):
self.assertEqual(0, self.registry.get_sample_value('g'))
with self.gauge.time():
time.sleep(.001)
self.assertNotEqual(0, self.registry.get_sample_value('g'))
class TestSummary(unittest.TestCase):
def setUp(self):
self.registry = CollectorRegistry()
self.summary = Summary('s', 'help', registry=self.registry)
def test_summary(self):
self.assertEqual(0, self.registry.get_sample_value('s_count'))
self.assertEqual(0, self.registry.get_sample_value('s_sum'))
self.summary.observe(10)
self.assertEqual(1, self.registry.get_sample_value('s_count'))
self.assertEqual(10, self.registry.get_sample_value('s_sum'))
def test_function_decorator(self):
self.assertEqual(0, self.registry.get_sample_value('s_count'))
@self.summary.time()
def f():
pass
self.assertEqual(([], None, None, None), inspect.getargspec(f))
f()
self.assertEqual(1, self.registry.get_sample_value('s_count'))
def test_block_decorator(self):
self.assertEqual(0, self.registry.get_sample_value('s_count'))
with self.summary.time():
pass
self.assertEqual(1, self.registry.get_sample_value('s_count'))
class TestHistogram(unittest.TestCase):
def setUp(self):
self.registry = CollectorRegistry()
self.histogram = Histogram('h', 'help', registry=self.registry)
self.labels = Histogram('hl', 'help', ['l'], registry=self.registry)
def test_histogram(self):
self.assertEqual(0, self.registry.get_sample_value('h_bucket', {'le': '1.0'}))
self.assertEqual(0, self.registry.get_sample_value('h_bucket', {'le': '2.5'}))
self.assertEqual(0, self.registry.get_sample_value('h_bucket', {'le': '5.0'}))
self.assertEqual(0, self.registry.get_sample_value('h_bucket', {'le': '+Inf'}))
self.assertEqual(0, self.registry.get_sample_value('h_count'))
self.assertEqual(0, self.registry.get_sample_value('h_sum'))
self.histogram.observe(2)
self.assertEqual(0, self.registry.get_sample_value('h_bucket', {'le': '1.0'}))
self.assertEqual(1, self.registry.get_sample_value('h_bucket', {'le': '2.5'}))
self.assertEqual(1, self.registry.get_sample_value('h_bucket', {'le': '5.0'}))
self.assertEqual(1, self.registry.get_sample_value('h_bucket', {'le': '+Inf'}))
self.assertEqual(1, self.registry.get_sample_value('h_count'))
self.assertEqual(2, self.registry.get_sample_value('h_sum'))
self.histogram.observe(2.5)
self.assertEqual(0, self.registry.get_sample_value('h_bucket', {'le': '1.0'}))
self.assertEqual(2, self.registry.get_sample_value('h_bucket', {'le': '2.5'}))
self.assertEqual(2, self.registry.get_sample_value('h_bucket', {'le': '5.0'}))
self.assertEqual(2, self.registry.get_sample_value('h_bucket', {'le': '+Inf'}))
self.assertEqual(2, self.registry.get_sample_value('h_count'))
self.assertEqual(4.5, self.registry.get_sample_value('h_sum'))
self.histogram.observe(float("inf"))
self.assertEqual(0, self.registry.get_sample_value('h_bucket', {'le': '1.0'}))
self.assertEqual(2, self.registry.get_sample_value('h_bucket', {'le': '2.5'}))
self.assertEqual(2, self.registry.get_sample_value('h_bucket', {'le': '5.0'}))
self.assertEqual(3, self.registry.get_sample_value('h_bucket', {'le': '+Inf'}))
self.assertEqual(3, self.registry.get_sample_value('h_count'))
self.assertEqual(float("inf"), self.registry.get_sample_value('h_sum'))
def test_setting_buckets(self):
h = Histogram('h', 'help', registry=None, buckets=[0, 1, 2])
self.assertEqual([0.0, 1.0, 2.0, float("inf")], h._upper_bounds)
h = Histogram('h', 'help', registry=None, buckets=[0, 1, 2, float("inf")])
self.assertEqual([0.0, 1.0, 2.0, float("inf")], h._upper_bounds)
self.assertRaises(ValueError, Histogram, 'h', 'help', registry=None, buckets=[])
self.assertRaises(ValueError, Histogram, 'h', 'help', registry=None, buckets=[float("inf")])
self.assertRaises(ValueError, Histogram, 'h', 'help', registry=None, buckets=[3, 1])
def test_labels(self):
self.labels.labels('a').observe(2)
self.assertEqual(0, self.registry.get_sample_value('hl_bucket', {'le': '1.0', 'l': 'a'}))
self.assertEqual(1, self.registry.get_sample_value('hl_bucket', {'le': '2.5', 'l': 'a'}))
self.assertEqual(1, self.registry.get_sample_value('hl_bucket', {'le': '5.0', 'l': 'a'}))
self.assertEqual(1, self.registry.get_sample_value('hl_bucket', {'le': '+Inf', 'l': 'a'}))
self.assertEqual(1, self.registry.get_sample_value('hl_count', {'l': 'a'}))
self.assertEqual(2, self.registry.get_sample_value('hl_sum', {'l': 'a'}))
def test_function_decorator(self):
self.assertEqual(0, self.registry.get_sample_value('h_count'))
self.assertEqual(0, self.registry.get_sample_value('h_bucket', {'le': '+Inf'}))
@self.histogram.time()
def f():
pass
self.assertEqual(([], None, None, None), inspect.getargspec(f))
f()
self.assertEqual(1, self.registry.get_sample_value('h_count'))
self.assertEqual(1, self.registry.get_sample_value('h_bucket', {'le': '+Inf'}))
def test_block_decorator(self):
self.assertEqual(0, self.registry.get_sample_value('h_count'))
self.assertEqual(0, self.registry.get_sample_value('h_bucket', {'le': '+Inf'}))
with self.histogram.time():
pass
self.assertEqual(1, self.registry.get_sample_value('h_count'))
self.assertEqual(1, self.registry.get_sample_value('h_bucket', {'le': '+Inf'}))
class TestMetricWrapper(unittest.TestCase):
def setUp(self):
self.registry = CollectorRegistry()
self.counter = Counter('c', 'help', labelnames=['l'], registry=self.registry)
self.two_labels = Counter('two', 'help', labelnames=['a', 'b'], registry=self.registry)
def test_child(self):
self.counter.labels('x').inc()
self.assertEqual(1, self.registry.get_sample_value('c', {'l': 'x'}))
self.two_labels.labels('x', 'y').inc(2)
self.assertEqual(2, self.registry.get_sample_value('two', {'a': 'x', 'b': 'y'}))
def test_remove(self):
self.counter.labels('x').inc()
self.counter.labels('y').inc(2)
self.assertEqual(1, self.registry.get_sample_value('c', {'l': 'x'}))
self.assertEqual(2, self.registry.get_sample_value('c', {'l': 'y'}))
self.counter.remove('x')
self.assertEqual(None, self.registry.get_sample_value('c', {'l': 'x'}))
self.assertEqual(2, self.registry.get_sample_value('c', {'l': 'y'}))
def test_incorrect_label_count_raises(self):
self.assertRaises(ValueError, self.counter.labels)
self.assertRaises(ValueError, self.counter.labels, 'a', 'b')
self.assertRaises(ValueError, self.counter.remove)
self.assertRaises(ValueError, self.counter.remove, 'a', 'b')
def test_labels_coerced_to_string(self):
self.counter.labels(None).inc()
self.counter.labels(l=None).inc()
self.assertEqual(2, self.registry.get_sample_value('c', {'l': 'None'}))
self.counter.remove(None)
self.assertEqual(None, self.registry.get_sample_value('c', {'l': 'None'}))
def test_non_string_labels_raises(self):
class Test(object):
__str__ = None
self.assertRaises(TypeError, self.counter.labels, Test())
self.assertRaises(TypeError, self.counter.labels, l=Test())
def test_namespace_subsystem_concatenated(self):
c = Counter('c', 'help', namespace='a', subsystem='b', registry=self.registry)
c.inc()
self.assertEqual(1, self.registry.get_sample_value('a_b_c'))
def test_labels_by_kwarg(self):
self.counter.labels(l='x').inc()
self.assertEqual(1, self.registry.get_sample_value('c', {'l': 'x'}))
self.assertRaises(ValueError, self.counter.labels, l='x', m='y')
self.assertRaises(ValueError, self.counter.labels, m='y')
self.assertRaises(ValueError, self.counter.labels)
self.two_labels.labels(a='x', b='y').inc()
self.assertEqual(1, self.registry.get_sample_value('two', {'a': 'x', 'b': 'y'}))
self.assertRaises(ValueError, self.two_labels.labels, a='x', b='y', c='z')
self.assertRaises(ValueError, self.two_labels.labels, a='x', c='z')
self.assertRaises(ValueError, self.two_labels.labels, b='y', c='z')
self.assertRaises(ValueError, self.two_labels.labels, c='z')
self.assertRaises(ValueError, self.two_labels.labels)
self.assertRaises(ValueError, self.two_labels.labels, {'a': 'x'}, b='y')
def test_invalid_names_raise(self):
self.assertRaises(ValueError, Counter, '', 'help')
self.assertRaises(ValueError, Counter, '^', 'help')
self.assertRaises(ValueError, Counter, '', 'help', namespace='&')
self.assertRaises(ValueError, Counter, '', 'help', subsystem='(')
self.assertRaises(ValueError, Counter, 'c', '', labelnames=['^'])
self.assertRaises(ValueError, Counter, 'c', '', labelnames=['a:b'])
self.assertRaises(ValueError, Counter, 'c', '', labelnames=['__reserved'])
self.assertRaises(ValueError, Summary, 'c', '', labelnames=['quantile'])
def test_empty_labels_list(self):
h = Histogram('h', 'help', [], registry=self.registry)
self.assertEqual(0, self.registry.get_sample_value('h_sum'))
def test_wrapped_original_class(self):
self.assertEqual(Counter.__wrapped__, Counter('foo', 'bar').__class__)
class TestMetricFamilies(unittest.TestCase):
def setUp(self):
self.registry = CollectorRegistry()
def custom_collector(self, metric_family):
class CustomCollector(object):
def collect(self):
return [metric_family]
self.registry.register(CustomCollector())
def test_counter(self):
self.custom_collector(CounterMetricFamily('c', 'help', value=1))
self.assertEqual(1, self.registry.get_sample_value('c', {}))
def test_counter_labels(self):
cmf = CounterMetricFamily('c', 'help', labels=['a', 'c'])
cmf.add_metric(['b', 'd'], 2)
self.custom_collector(cmf)
self.assertEqual(2, self.registry.get_sample_value('c', {'a': 'b', 'c': 'd'}))
def test_gauge(self):
self.custom_collector(GaugeMetricFamily('g', 'help', value=1))
self.assertEqual(1, self.registry.get_sample_value('g', {}))
def test_gauge_labels(self):
cmf = GaugeMetricFamily('g', 'help', labels=['a'])
cmf.add_metric(['b'], 2)
self.custom_collector(cmf)
self.assertEqual(2, self.registry.get_sample_value('g', {'a':'b'}))
def test_summary(self):
self.custom_collector(SummaryMetricFamily('s', 'help', count_value=1, sum_value=2))
self.assertEqual(1, self.registry.get_sample_value('s_count', {}))
self.assertEqual(2, self.registry.get_sample_value('s_sum', {}))
def test_summary_labels(self):
cmf = SummaryMetricFamily('s', 'help', labels=['a'])
cmf.add_metric(['b'], count_value=1, sum_value=2)
self.custom_collector(cmf)
self.assertEqual(1, self.registry.get_sample_value('s_count', {'a': 'b'}))
self.assertEqual(2, self.registry.get_sample_value('s_sum', {'a': 'b'}))
def test_histogram(self):
self.custom_collector(HistogramMetricFamily('h', 'help', buckets=[('0', 1), ('+Inf', 2)], sum_value=3))
self.assertEqual(1, self.registry.get_sample_value('h_bucket', {'le': '0'}))
self.assertEqual(2, self.registry.get_sample_value('h_bucket', {'le': '+Inf'}))
self.assertEqual(2, self.registry.get_sample_value('h_count', {}))
self.assertEqual(3, self.registry.get_sample_value('h_sum', {}))
def test_histogram_labels(self):
cmf = HistogramMetricFamily('h', 'help', labels=['a'])
cmf.add_metric(['b'], buckets=[('0', 1), ('+Inf', 2)], sum_value=3)
self.custom_collector(cmf)
self.assertEqual(1, self.registry.get_sample_value('h_bucket', {'a': 'b', 'le': '0'}))
self.assertEqual(2, self.registry.get_sample_value('h_bucket', {'a': 'b', 'le': '+Inf'}))
self.assertEqual(2, self.registry.get_sample_value('h_count', {'a': 'b'}))
self.assertEqual(3, self.registry.get_sample_value('h_sum', {'a': 'b'}))
def test_bad_constructors(self):
self.assertRaises(ValueError, CounterMetricFamily, 'c', 'help', value=1, labels=[])
self.assertRaises(ValueError, CounterMetricFamily, 'c', 'help', value=1, labels=['a'])
self.assertRaises(ValueError, GaugeMetricFamily, 'g', 'help', value=1, labels=[])
self.assertRaises(ValueError, GaugeMetricFamily, 'g', 'help', value=1, labels=['a'])
self.assertRaises(ValueError, SummaryMetricFamily, 's', 'help', sum_value=1)
self.assertRaises(ValueError, SummaryMetricFamily, 's', 'help', count_value=1)
self.assertRaises(ValueError, SummaryMetricFamily, 's', 'help', count_value=1, labels=['a'])
self.assertRaises(ValueError, SummaryMetricFamily, 's', 'help', sum_value=1, labels=['a'])
self.assertRaises(ValueError, SummaryMetricFamily, 's', 'help', count_value=1, sum_value=1, labels=['a'])
self.assertRaises(ValueError, HistogramMetricFamily, 'h', 'help', sum_value=1)
self.assertRaises(ValueError, HistogramMetricFamily, 'h', 'help', buckets={})
self.assertRaises(ValueError, HistogramMetricFamily, 'h', 'help', sum_value=1, labels=['a'])
self.assertRaises(ValueError, HistogramMetricFamily, 'h', 'help', buckets={}, labels=['a'])
self.assertRaises(ValueError, HistogramMetricFamily, 'h', 'help', buckets={}, sum_value=1, labels=['a'])
self.assertRaises(KeyError, HistogramMetricFamily, 'h', 'help', buckets={}, sum_value=1)
class TestCollectorRegistry(unittest.TestCase):
def test_duplicate_metrics_raises(self):
registry = CollectorRegistry()
Counter('c', 'help', registry=registry)
self.assertRaises(ValueError, Counter, 'c', 'help', registry=registry)
self.assertRaises(ValueError, Gauge, 'c', 'help', registry=registry)
Gauge('g', 'help', registry=registry)
self.assertRaises(ValueError, Gauge, 'g', 'help', registry=registry)
self.assertRaises(ValueError, Counter, 'g', 'help', registry=registry)
Summary('s', 'help', registry=registry)
self.assertRaises(ValueError, Summary, 's', 'help', registry=registry)
# We don't currently expose quantiles, but let's prevent future
# clashes anyway.
self.assertRaises(ValueError, Gauge, 's', 'help', registry=registry)
Histogram('h', 'help', registry=registry)
self.assertRaises(ValueError, Histogram, 'h', 'help', registry=registry)
        # Clashes against various suffixes.
self.assertRaises(ValueError, Summary, 'h', 'help', registry=registry)
self.assertRaises(ValueError, Counter, 'h_count', 'help', registry=registry)
self.assertRaises(ValueError, Counter, 'h_sum', 'help', registry=registry)
self.assertRaises(ValueError, Counter, 'h_bucket', 'help', registry=registry)
# The name of the histogram itself isn't taken.
Counter('h', 'help', registry=registry)
def test_unregister_works(self):
registry = CollectorRegistry()
s = Summary('s', 'help', registry=registry)
self.assertRaises(ValueError, Counter, 's_count', 'help', registry=registry)
registry.unregister(s)
Counter('s_count', 'help', registry=registry)
def custom_collector(self, metric_family, registry):
class CustomCollector(object):
def collect(self):
return [metric_family]
registry.register(CustomCollector())
def test_autodescribe_disabled_by_default(self):
registry = CollectorRegistry()
self.custom_collector(CounterMetricFamily('c', 'help', value=1), registry)
self.custom_collector(CounterMetricFamily('c', 'help', value=1), registry)
registry = CollectorRegistry(auto_describe=True)
self.custom_collector(CounterMetricFamily('c', 'help', value=1), registry)
self.assertRaises(ValueError, self.custom_collector, CounterMetricFamily('c', 'help', value=1), registry)
def test_restricted_registry(self):
registry = CollectorRegistry()
Counter('c', 'help', registry=registry)
Summary('s', 'help', registry=registry).observe(7)
m = Metric('s', 'help', 'summary')
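        # Sample tuples in this codebase are (name, labels, (value, timestamp)).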
m.samples = [('s_sum', {}, (7, None))]
        self.assertEqual([m], registry.restricted_registry(['s_sum']).collect())
if __name__ == '__main__':
unittest.main()
from __future__ import unicode_literals
import sys
import threading
if sys.version_info < (2, 7):
# We need the skip decorators from unittest2 on Python 2.6.
import unittest2 as unittest
else:
import unittest
from prometheus_client import Gauge, Counter, Summary, Histogram, Metric
from prometheus_client import CollectorRegistry, generate_latest
from prometheus_client import push_to_gateway, pushadd_to_gateway, delete_from_gateway
from prometheus_client import CONTENT_TYPE_LATEST, instance_ip_grouping_key
from prometheus_client.exposition import default_handler, basic_auth_handler
try:
from BaseHTTPServer import BaseHTTPRequestHandler
from BaseHTTPServer import HTTPServer
except ImportError:
# Python 3
from http.server import BaseHTTPRequestHandler
from http.server import HTTPServer
class TestGenerateText(unittest.TestCase):
def setUp(self):
self.registry = CollectorRegistry()
def test_counter(self):
c = Counter('cc', 'A counter', registry=self.registry)
c.inc()
self.assertEqual(b'# HELP cc A counter\n# TYPE cc counter\ncc 1.0\n', generate_latest(self.registry))
def test_gauge(self):
g = Gauge('gg', 'A gauge', registry=self.registry)
g.set(17)
self.assertEqual(b'# HELP gg A gauge\n# TYPE gg gauge\ngg 17.0\n', generate_latest(self.registry))
def test_summary(self):
s = Summary('ss', 'A summary', ['a', 'b'], registry=self.registry)
s.labels('c', 'd').observe(17)
self.assertEqual(b'# HELP ss A summary\n# TYPE ss summary\nss_count{a="c",b="d"} 1.0\nss_sum{a="c",b="d"} 17.0\n', generate_latest(self.registry))
@unittest.skipIf(sys.version_info < (2, 7), "Test requires Python 2.7+.")
def test_histogram(self):
s = Histogram('hh', 'A histogram', registry=self.registry)
s.observe(0.05)
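        # Buckets are cumulative: a 0.05 observation is counted in the
        # le="0.05" bucket and in every larger bucket up to +Inf.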
self.assertEqual(b'''# HELP hh A histogram
# TYPE hh histogram
hh_bucket{le="0.005"} 0.0
hh_bucket{le="0.01"} 0.0
hh_bucket{le="0.025"} 0.0
hh_bucket{le="0.05"} 1.0
hh_bucket{le="0.075"} 1.0
hh_bucket{le="0.1"} 1.0
hh_bucket{le="0.25"} 1.0
hh_bucket{le="0.5"} 1.0
hh_bucket{le="0.75"} 1.0
hh_bucket{le="1.0"} 1.0
hh_bucket{le="2.5"} 1.0
hh_bucket{le="5.0"} 1.0
hh_bucket{le="7.5"} 1.0
hh_bucket{le="10.0"} 1.0
hh_bucket{le="+Inf"} 1.0
hh_count 1.0
hh_sum 0.05
''', generate_latest(self.registry))
def test_unicode(self):
c = Counter('cc', '\u4500', ['l'], registry=self.registry)
c.labels('\u4500').inc()
self.assertEqual(b'# HELP cc \xe4\x94\x80\n# TYPE cc counter\ncc{l="\xe4\x94\x80"} 1.0\n', generate_latest(self.registry))
def test_escaping(self):
c = Counter('cc', 'A\ncount\\er', ['a'], registry=self.registry)
c.labels('\\x\n"').inc(1)
self.assertEqual(b'# HELP cc A\\ncount\\\\er\n# TYPE cc counter\ncc{a="\\\\x\\n\\""} 1.0\n', generate_latest(self.registry))
def test_nonnumber(self):
class MyNumber():
def __repr__(self):
return "MyNumber(123)"
def __float__(self):
return 123.0
class MyCollector():
def collect(self):
metric = Metric("nonnumber", "Non number", 'untyped')
metric.add_sample("nonnumber", {}, MyNumber())
yield metric
self.registry.register(MyCollector())
self.assertEqual(b'# HELP nonnumber Non number\n# TYPE nonnumber untyped\nnonnumber 123.0\n', generate_latest(self.registry))
class TestPushGateway(unittest.TestCase):
def setUp(self):
self.registry = CollectorRegistry()
self.counter = Gauge('g', 'help', registry=self.registry)
self.requests = requests = []
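        # Minimal in-process pushgateway stand-in: each request is recorded
        # as (handler, body) so tests can assert on method, path and payload.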
class TestHandler(BaseHTTPRequestHandler):
def do_PUT(self):
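                # 'Zm9vOmJhcg==' is base64 for 'foo:bar'; reject anything
                # else when the job name asks for basic auth.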
if 'with_basic_auth' in self.requestline and self.headers['authorization'] != 'Basic Zm9vOmJhcg==':
self.send_response(401)
else:
self.send_response(201)
length = int(self.headers['content-length'])
requests.append((self, self.rfile.read(length)))
self.end_headers()
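            # push uses PUT, pushadd uses POST and delete uses DELETE;
            # all three share the same recording logic.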
do_POST = do_PUT
do_DELETE = do_PUT
httpd = HTTPServer(('localhost', 0), TestHandler)
self.address = 'http://localhost:{0}'.format(httpd.server_address[1])
class TestServer(threading.Thread):
def run(self):
httpd.handle_request()
self.server = TestServer()
self.server.daemon = True
self.server.start()
def test_push(self):
push_to_gateway(self.address, "my_job", self.registry)
self.assertEqual(self.requests[0][0].command, 'PUT')
self.assertEqual(self.requests[0][0].path, '/metrics/job/my_job')
self.assertEqual(self.requests[0][0].headers.get('content-type'), CONTENT_TYPE_LATEST)
self.assertEqual(self.requests[0][1], b'# HELP g help\n# TYPE g gauge\ng 0.0\n')
def test_push_with_groupingkey(self):
push_to_gateway(self.address, "my_job", self.registry, {'a': 9})
self.assertEqual(self.requests[0][0].command, 'PUT')
self.assertEqual(self.requests[0][0].path, '/metrics/job/my_job/a/9')
self.assertEqual(self.requests[0][0].headers.get('content-type'), CONTENT_TYPE_LATEST)
self.assertEqual(self.requests[0][1], b'# HELP g help\n# TYPE g gauge\ng 0.0\n')
def test_push_with_complex_groupingkey(self):
push_to_gateway(self.address, "my_job", self.registry, {'a': 9, 'b': 'a/ z'})
self.assertEqual(self.requests[0][0].command, 'PUT')
self.assertEqual(self.requests[0][0].path, '/metrics/job/my_job/a/9/b/a%2F+z')
self.assertEqual(self.requests[0][0].headers.get('content-type'), CONTENT_TYPE_LATEST)
self.assertEqual(self.requests[0][1], b'# HELP g help\n# TYPE g gauge\ng 0.0\n')
def test_pushadd(self):
pushadd_to_gateway(self.address, "my_job", self.registry)
self.assertEqual(self.requests[0][0].command, 'POST')
self.assertEqual(self.requests[0][0].path, '/metrics/job/my_job')
self.assertEqual(self.requests[0][0].headers.get('content-type'), CONTENT_TYPE_LATEST)
self.assertEqual(self.requests[0][1], b'# HELP g help\n# TYPE g gauge\ng 0.0\n')
def test_pushadd_with_groupingkey(self):
pushadd_to_gateway(self.address, "my_job", self.registry, {'a': 9})
self.assertEqual(self.requests[0][0].command, 'POST')
self.assertEqual(self.requests[0][0].path, '/metrics/job/my_job/a/9')
self.assertEqual(self.requests[0][0].headers.get('content-type'), CONTENT_TYPE_LATEST)
self.assertEqual(self.requests[0][1], b'# HELP g help\n# TYPE g gauge\ng 0.0\n')
def test_delete(self):
delete_from_gateway(self.address, "my_job")
self.assertEqual(self.requests[0][0].command, 'DELETE')
self.assertEqual(self.requests[0][0].path, '/metrics/job/my_job')
self.assertEqual(self.requests[0][0].headers.get('content-type'), CONTENT_TYPE_LATEST)
self.assertEqual(self.requests[0][1], b'')
def test_delete_with_groupingkey(self):
delete_from_gateway(self.address, "my_job", {'a': 9})
self.assertEqual(self.requests[0][0].command, 'DELETE')
self.assertEqual(self.requests[0][0].path, '/metrics/job/my_job/a/9')
self.assertEqual(self.requests[0][0].headers.get('content-type'), CONTENT_TYPE_LATEST)
self.assertEqual(self.requests[0][1], b'')
def test_push_with_handler(self):
def my_test_handler(url, method, timeout, headers, data):
headers.append(['X-Test-Header', 'foobar'])
return default_handler(url, method, timeout, headers, data)
push_to_gateway(self.address, "my_job", self.registry, handler=my_test_handler)
self.assertEqual(self.requests[0][0].command, 'PUT')
self.assertEqual(self.requests[0][0].path, '/metrics/job/my_job')
self.assertEqual(self.requests[0][0].headers.get('content-type'), CONTENT_TYPE_LATEST)
self.assertEqual(self.requests[0][0].headers.get('x-test-header'), 'foobar')
self.assertEqual(self.requests[0][1], b'# HELP g help\n# TYPE g gauge\ng 0.0\n')
def test_push_with_basic_auth_handler(self):
def my_auth_handler(url, method, timeout, headers, data):
return basic_auth_handler(url, method, timeout, headers, data, "foo", "bar")
push_to_gateway(self.address, "my_job_with_basic_auth", self.registry, handler=my_auth_handler)
self.assertEqual(self.requests[0][0].command, 'PUT')
self.assertEqual(self.requests[0][0].path, '/metrics/job/my_job_with_basic_auth')
self.assertEqual(self.requests[0][0].headers.get('content-type'), CONTENT_TYPE_LATEST)
self.assertEqual(self.requests[0][1], b'# HELP g help\n# TYPE g gauge\ng 0.0\n')
@unittest.skipIf(
sys.platform == "darwin",
"instance_ip_grouping_key() does not work on macOS."
)
def test_instance_ip_grouping_key(self):
        self.assertNotEqual('', instance_ip_grouping_key()['instance'])
if __name__ == '__main__':
unittest.main()
import unittest
import threading
try:
import SocketServer
except ImportError:
import socketserver as SocketServer
from prometheus_client import Counter, CollectorRegistry
from prometheus_client.bridge.graphite import GraphiteBridge
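# Pin the bridge's clock so pushed samples carry a deterministic timestamp.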
def fake_timer():
return 1434898897.5
class TestGraphiteBridge(unittest.TestCase):
def setUp(self):
self.registry = CollectorRegistry()
        self.data = b''
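        # A one-shot TCP server stands in for Graphite; 's' is used instead
        # of 'self' so the enclosing test instance stays reachable.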
class TCPHandler(SocketServer.BaseRequestHandler):
def handle(s):
self.data = s.request.recv(1024)
server = SocketServer.TCPServer(('', 0), TCPHandler)
class ServingThread(threading.Thread):
def run(self):
server.handle_request()
server.socket.close()
self.t = ServingThread()
self.t.start()
# Explicitly use localhost as the target host, since connecting to 0.0.0.0 fails on Windows
address = ('localhost', server.server_address[1])
self.gb = GraphiteBridge(address, self.registry, _timer=fake_timer)
def test_nolabels(self):
counter = Counter('c', 'help', registry=self.registry)
counter.inc()
self.gb.push()
self.t.join()
self.assertEqual(b'c 1.0 1434898897\n', self.data)
def test_labels(self):
labels = Counter('labels', 'help', ['a', 'b'], registry=self.registry)
labels.labels('c', 'd').inc()
self.gb.push()
self.t.join()
self.assertEqual(b'labels.a.c.b.d 1.0 1434898897\n', self.data)
def test_prefix(self):
labels = Counter('labels', 'help', ['a', 'b'], registry=self.registry)
labels.labels('c', 'd').inc()
self.gb.push(prefix = 'pre.fix')
self.t.join()
self.assertEqual(b'pre.fix.labels.a.c.b.d 1.0 1434898897\n', self.data)
def test_sanitizing(self):
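        # Characters that are unsafe in a Graphite path are replaced with
        # underscores: '.' and ':' in the label value become '_'.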
labels = Counter('labels', 'help', ['a'], registry=self.registry)
labels.labels('c.:8').inc()
self.gb.push()
self.t.join()
self.assertEqual(b'labels.a.c__8 1.0 1434898897\n', self.data)
from __future__ import unicode_literals
import os
import shutil
import tempfile
import time
import unittest
import prometheus_client
from prometheus_client.core import *
from prometheus_client.multiprocess import *
class TestMultiProcess(unittest.TestCase):
def setUp(self):
self.tempdir = tempfile.mkdtemp()
os.environ['prometheus_multiproc_dir'] = self.tempdir
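        # Pretend to be process 123; individual tests switch the value class
        # to 456 to simulate a second worker sharing the multiprocess dir.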
prometheus_client.core._ValueClass = prometheus_client.core._MultiProcessValue(123)
self.registry = CollectorRegistry()
MultiProcessCollector(self.registry, self.tempdir)
def tearDown(self):
del os.environ['prometheus_multiproc_dir']
shutil.rmtree(self.tempdir)
prometheus_client.core._ValueClass = prometheus_client.core._MutexValue
def test_counter_adds(self):
c1 = Counter('c', 'help', registry=None)
prometheus_client.core._ValueClass = prometheus_client.core._MultiProcessValue(456)
c2 = Counter('c', 'help', registry=None)
self.assertEqual(0, self.registry.get_sample_value('c'))
c1.inc(1)
c2.inc(2)
self.assertEqual(3, self.registry.get_sample_value('c'))
def test_summary_adds(self):
s1 = Summary('s', 'help', registry=None)
prometheus_client.core._ValueClass = prometheus_client.core._MultiProcessValue(456)
s2 = Summary('s', 'help', registry=None)
self.assertEqual(0, self.registry.get_sample_value('s_count'))
self.assertEqual(0, self.registry.get_sample_value('s_sum'))
s1.observe(1)
s2.observe(2)
self.assertEqual(2, self.registry.get_sample_value('s_count'))
self.assertEqual(3, self.registry.get_sample_value('s_sum'))
def test_histogram_adds(self):
h1 = Histogram('h', 'help', registry=None)
prometheus_client.core._ValueClass = prometheus_client.core._MultiProcessValue(456)
h2 = Histogram('h', 'help', registry=None)
self.assertEqual(0, self.registry.get_sample_value('h_count'))
self.assertEqual(0, self.registry.get_sample_value('h_sum'))
self.assertEqual(0, self.registry.get_sample_value('h_bucket', {'le': '5.0'}))
h1.observe(1)
h2.observe(2)
self.assertEqual(2, self.registry.get_sample_value('h_count'))
self.assertEqual(3, self.registry.get_sample_value('h_sum'))
self.assertEqual(2, self.registry.get_sample_value('h_bucket', {'le': '5.0'}))
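    # Gauge multiprocess modes: 'all' (the default) keeps one sample per pid
    # even after a process dies, 'liveall' drops samples for dead pids, and
    # 'min'/'max'/'livesum' aggregate across pids into a single sample.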
def test_gauge_all(self):
g1 = Gauge('g', 'help', registry=None)
prometheus_client.core._ValueClass = prometheus_client.core._MultiProcessValue(456)
g2 = Gauge('g', 'help', registry=None)
self.assertEqual(0, self.registry.get_sample_value('g', {'pid': '123'}))
self.assertEqual(0, self.registry.get_sample_value('g', {'pid': '456'}))
g1.set(1)
g2.set(2)
mark_process_dead(123, os.environ['prometheus_multiproc_dir'])
self.assertEqual(1, self.registry.get_sample_value('g', {'pid': '123'}))
self.assertEqual(2, self.registry.get_sample_value('g', {'pid': '456'}))
def test_gauge_liveall(self):
g1 = Gauge('g', 'help', registry=None, multiprocess_mode='liveall')
prometheus_client.core._ValueClass = prometheus_client.core._MultiProcessValue(456)
g2 = Gauge('g', 'help', registry=None, multiprocess_mode='liveall')
self.assertEqual(0, self.registry.get_sample_value('g', {'pid': '123'}))
self.assertEqual(0, self.registry.get_sample_value('g', {'pid': '456'}))
g1.set(1)
g2.set(2)
self.assertEqual(1, self.registry.get_sample_value('g', {'pid': '123'}))
self.assertEqual(2, self.registry.get_sample_value('g', {'pid': '456'}))
mark_process_dead(123, os.environ['prometheus_multiproc_dir'])
self.assertEqual(None, self.registry.get_sample_value('g', {'pid': '123'}))
self.assertEqual(2, self.registry.get_sample_value('g', {'pid': '456'}))
def test_gauge_min(self):
g1 = Gauge('g', 'help', registry=None, multiprocess_mode='min')
prometheus_client.core._ValueClass = prometheus_client.core._MultiProcessValue(456)
g2 = Gauge('g', 'help', registry=None, multiprocess_mode='min')
self.assertEqual(0, self.registry.get_sample_value('g'))
g1.set(1)
g2.set(2)
self.assertEqual(1, self.registry.get_sample_value('g'))
def test_gauge_max(self):
g1 = Gauge('g', 'help', registry=None, multiprocess_mode='max')
prometheus_client.core._ValueClass = prometheus_client.core._MultiProcessValue(456)
g2 = Gauge('g', 'help', registry=None, multiprocess_mode='max')
self.assertEqual(0, self.registry.get_sample_value('g'))
g1.set(1)
g2.set(2)
self.assertEqual(2, self.registry.get_sample_value('g'))
def test_gauge_livesum(self):
g1 = Gauge('g', 'help', registry=None, multiprocess_mode='livesum')
prometheus_client.core._ValueClass = prometheus_client.core._MultiProcessValue(456)
g2 = Gauge('g', 'help', registry=None, multiprocess_mode='livesum')
self.assertEqual(0, self.registry.get_sample_value('g'))
g1.set(1)
g2.set(2)
self.assertEqual(3, self.registry.get_sample_value('g'))
mark_process_dead(123, os.environ['prometheus_multiproc_dir'])
self.assertEqual(2, self.registry.get_sample_value('g'))
def test_namespace_subsystem(self):
c1 = Counter('c', 'help', registry=None, namespace='ns', subsystem='ss')
c1.inc(1)
self.assertEqual(1, self.registry.get_sample_value('ns_ss_c'))
class TestMmapedDict(unittest.TestCase):
def setUp(self):
fd, self.tempfile = tempfile.mkstemp()
os.close(fd)
self.d = core._MmapedDict(self.tempfile)
def test_process_restart(self):
self.d.write_value('abc', (123.0, None))
self.d.close()
self.d = core._MmapedDict(self.tempfile)
self.assertEqual((123, None), self.d.read_value('abc'))
self.assertEqual([('abc', (123.0, None))], list(self.d.read_all_values()))
def test_expansion(self):
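        # A key as large as the initial mmap forces the backing file to grow.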
key = 'a' * core._INITIAL_MMAP_SIZE
self.d.write_value(key, (123.0, None))
self.assertEqual([(key, (123.0, None))], list(self.d.read_all_values()))
def test_multi_expansion(self):
key = 'a' * core._INITIAL_MMAP_SIZE * 4
self.d.write_value('abc', (42.0, None))
self.d.write_value(key, (123.0, None))
self.d.write_value('def', (17.0, None))
self.assertEqual([('abc', (42.0, None)), (key, (123.0, None)), ('def', (17.0, None))],
list(self.d.read_all_values()))
def tearDown(self):
os.unlink(self.tempfile)
from __future__ import unicode_literals
import sys
if sys.version_info < (2, 7):
# We need the skip decorators from unittest2 on Python 2.6.
import unittest2 as unittest
else:
import unittest
from prometheus_client.core import *
from prometheus_client.exposition import *
from prometheus_client.parser import *
class TestParse(unittest.TestCase):
def test_simple_counter(self):
families = text_string_to_metric_families("""# TYPE a counter
# HELP a help
a 1
""")
self.assertEqual([CounterMetricFamily("a", "help", value=1)], list(families))
def test_simple_gauge(self):
families = text_string_to_metric_families("""# TYPE a gauge
# HELP a help
a 1
""")
self.assertEqual([GaugeMetricFamily("a", "help", value=1)], list(families))
def test_simple_summary(self):
families = text_string_to_metric_families("""# TYPE a summary
# HELP a help
a_count 1
a_sum 2
""")
def test_summary_quantiles(self):
families = text_string_to_metric_families("""# TYPE a summary
# HELP a help
a_count 1
a_sum 2
a{quantile="0.5"} 0.7
""")
# The Python client doesn't support quantiles, but we
# still need to be able to parse them.
metric_family = SummaryMetricFamily("a", "help", count_value=1, sum_value=2)
metric_family.add_sample("a", {"quantile": "0.5"}, (0.7, None))
self.assertEqual([metric_family], list(families))
def test_simple_histogram(self):
families = text_string_to_metric_families("""# TYPE a histogram
# HELP a help
a_bucket{le="1"} 0
a_bucket{le="+Inf"} 3
a_count 3
a_sum 2
""")
self.assertEqual([HistogramMetricFamily("a", "help", sum_value=2, buckets=[("1", 0.0), ("+Inf", 3.0)])], list(families))
def test_no_metadata(self):
families = text_string_to_metric_families("""a 1
""")
metric_family = Metric("a", "", "untyped")
metric_family.add_sample("a", {}, (1, None))
self.assertEqual([metric_family], list(families))
def test_untyped(self):
# https://github.com/prometheus/client_python/issues/79
families = text_string_to_metric_families("""# HELP redis_connected_clients Redis connected clients
# TYPE redis_connected_clients untyped
redis_connected_clients{instance="rough-snowflake-web",port="6380"} 10.0
redis_connected_clients{instance="rough-snowflake-web",port="6381"} 12.0
""")
m = Metric("redis_connected_clients", "Redis connected clients", "untyped")
m.samples = [
("redis_connected_clients", {"instance": "rough-snowflake-web", "port": "6380"}, (10, None)),
("redis_connected_clients", {"instance": "rough-snowflake-web", "port": "6381"}, (12, None)),
]
self.assertEqual([m], list(families))
def test_type_help_switched(self):
families = text_string_to_metric_families("""# HELP a help
# TYPE a counter
a 1
""")
self.assertEqual([CounterMetricFamily("a", "help", value=1)], list(families))
def test_blank_lines_and_comments(self):
families = text_string_to_metric_families("""
# TYPE a counter
# FOO a
# BAR b
# HELP a help
a 1
""")
self.assertEqual([CounterMetricFamily("a", "help", value=1)], list(families))
def test_tabs(self):
families = text_string_to_metric_families("""#\tTYPE\ta\tcounter
#\tHELP\ta\thelp
a\t1
""")
self.assertEqual([CounterMetricFamily("a", "help", value=1)], list(families))
def test_empty_help(self):
families = text_string_to_metric_families("""# TYPE a counter
# HELP a
a 1
""")
self.assertEqual([CounterMetricFamily("a", "", value=1)], list(families))
def test_labels_and_infinite(self):
families = text_string_to_metric_families("""# TYPE a counter
# HELP a help
a{foo="bar"} +Inf
a{foo="baz"} -Inf
""")
metric_family = CounterMetricFamily("a", "help", labels=["foo"])
metric_family.add_metric(["bar"], core._INF)
metric_family.add_metric(["baz"], core._MINUS_INF)
self.assertEqual([metric_family], list(families))
def test_spaces(self):
families = text_string_to_metric_families("""# TYPE a counter
# HELP a help
a{ foo = "bar" } 1
a\t\t{\t\tfoo\t\t=\t\t"baz"\t\t}\t\t2
""")
metric_family = CounterMetricFamily("a", "help", labels=["foo"])
metric_family.add_metric(["bar"], 1)
metric_family.add_metric(["baz"], 2)
self.assertEqual([metric_family], list(families))
def test_commas(self):
families = text_string_to_metric_families("""# TYPE a counter
# HELP a help
a{foo="bar",} 1
# TYPE b counter
# HELP b help
b{,} 2
""")
a = CounterMetricFamily("a", "help", labels=["foo"])
a.add_metric(["bar"], 1)
b = CounterMetricFamily("b", "help", value=2)
self.assertEqual([a, b], list(families))
def test_empty_brackets(self):
families = text_string_to_metric_families("""# TYPE a counter
# HELP a help
a{} 1
""")
self.assertEqual([CounterMetricFamily("a", "help", value=1)], list(families))
def test_nan(self):
families = text_string_to_metric_families("""a NaN
""")
# Can't use a simple comparison as nan != nan.
self.assertTrue(math.isnan(list(families)[0].samples[0][2][0]))
def test_escaping(self):
families = text_string_to_metric_families("""# TYPE a counter
# HELP a he\\n\\\\l\\tp
a{foo="b\\"a\\nr"} 1
a{foo="b\\\\a\\z"} 2
""")
metric_family = CounterMetricFamily("a", "he\n\\l\\tp", labels=["foo"])
metric_family.add_metric(["b\"a\nr"], 1)
metric_family.add_metric(["b\\a\\z"], 2)
self.assertEqual([metric_family], list(families))
@unittest.skipIf(sys.version_info < (2, 7), "Test requires Python 2.7+.")
def test_roundtrip(self):
text = """# HELP go_gc_duration_seconds A summary of the GC invocation durations.
# TYPE go_gc_duration_seconds summary
go_gc_duration_seconds{quantile="0"} 0.013300656000000001
go_gc_duration_seconds{quantile="0.25"} 0.013638736
go_gc_duration_seconds{quantile="0.5"} 0.013759906
go_gc_duration_seconds{quantile="0.75"} 0.013962066
go_gc_duration_seconds{quantile="1"} 0.021383540000000003
go_gc_duration_seconds_sum 56.12904785
go_gc_duration_seconds_count 7476.0
# HELP go_goroutines Number of goroutines that currently exist.
# TYPE go_goroutines gauge
go_goroutines 166.0
# HELP prometheus_local_storage_indexing_batch_duration_milliseconds Quantiles for batch indexing duration in milliseconds.
# TYPE prometheus_local_storage_indexing_batch_duration_milliseconds summary
prometheus_local_storage_indexing_batch_duration_milliseconds{quantile="0.5"} NaN
prometheus_local_storage_indexing_batch_duration_milliseconds{quantile="0.9"} NaN
prometheus_local_storage_indexing_batch_duration_milliseconds{quantile="0.99"} NaN
prometheus_local_storage_indexing_batch_duration_milliseconds_sum 871.5665949999999
prometheus_local_storage_indexing_batch_duration_milliseconds_count 229.0
# HELP process_cpu_seconds_total Total user and system CPU time spent in seconds.
# TYPE process_cpu_seconds_total counter
process_cpu_seconds_total 29323.4
# HELP process_virtual_memory_bytes Virtual memory size in bytes.
# TYPE process_virtual_memory_bytes gauge
process_virtual_memory_bytes 2478268416.0
# HELP prometheus_build_info A metric with a constant '1' value labeled by version, revision, and branch from which Prometheus was built.
# TYPE prometheus_build_info gauge
prometheus_build_info{branch="HEAD",revision="ef176e5",version="0.16.0rc1"} 1.0
# HELP prometheus_local_storage_chunk_ops_total The total number of chunk operations by their type.
# TYPE prometheus_local_storage_chunk_ops_total counter
prometheus_local_storage_chunk_ops_total{type="clone"} 28.0
prometheus_local_storage_chunk_ops_total{type="create"} 997844.0
prometheus_local_storage_chunk_ops_total{type="drop"} 1345758.0
prometheus_local_storage_chunk_ops_total{type="load"} 1641.0
prometheus_local_storage_chunk_ops_total{type="persist"} 981408.0
prometheus_local_storage_chunk_ops_total{type="pin"} 32662.0
prometheus_local_storage_chunk_ops_total{type="transcode"} 980180.0
prometheus_local_storage_chunk_ops_total{type="unpin"} 32662.0
"""
families = list(text_string_to_metric_families(text))
class TextCollector(object):
def collect(self):
return families
registry = CollectorRegistry()
registry.register(TextCollector())
self.assertEqual(text.encode('utf-8'), generate_latest(registry))
if __name__ == '__main__':
unittest.main()
from __future__ import unicode_literals
import unittest
from prometheus_client import CollectorRegistry, PlatformCollector
class TestPlatformCollector(unittest.TestCase):
def setUp(self):
self.registry = CollectorRegistry()
self.platform = _MockPlatform()
def test_python_info(self):
PlatformCollector(registry=self.registry, platform=self.platform)
self.assertLabels("python_info", {
"version": "python_version",
"implementation": "python_implementation",
"major": "pvt_major",
"minor": "pvt_minor",
"patchlevel": "pvt_patchlevel"
})
def test_system_info_java(self):
self.platform._system = "Java"
PlatformCollector(registry=self.registry, platform=self.platform)
self.assertLabels("python_info", {
"version": "python_version",
"implementation": "python_implementation",
"major": "pvt_major",
"minor": "pvt_minor",
"patchlevel": "pvt_patchlevel",
"jvm_version": "jv_release",
"jvm_release": "vm_release",
"jvm_vendor": "vm_vendor",
"jvm_name": "vm_name"
})
    def assertLabels(self, name, labels):
        for metric in self.registry.collect():
            for n, l, value in metric.samples:
                if n == name:
                    self.assertEqual(labels, l)
                    return
        self.fail('no sample named %r was collected' % name)
class _MockPlatform(object):
def __init__(self):
self._system = "system"
def python_version_tuple(self):
return "pvt_major", "pvt_minor", "pvt_patchlevel"
def python_version(self):
return "python_version"
def python_implementation(self):
return "python_implementation"
def system(self):
return self._system
def java_ver(self):
return (
"jv_release",
"jv_vendor",
("vm_name", "vm_release", "vm_vendor"),
("os_name", "os_version", "os_arch")
)
from __future__ import unicode_literals
import os
import unittest
from prometheus_client import CollectorRegistry, ProcessCollector
class TestProcessCollector(unittest.TestCase):
def setUp(self):
self.registry = CollectorRegistry()
self.test_proc = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'proc')
def test_working(self):
collector = ProcessCollector(proc=self.test_proc, pid=lambda: 26231, registry=self.registry)
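        # Fix the clock-tick rate at 100/s so CPU seconds computed from the
        # proc fixture's utime/stime are deterministic across platforms.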
collector._ticks = 100
self.assertEqual(17.21, self.registry.get_sample_value('process_cpu_seconds_total'))
self.assertEqual(56274944.0, self.registry.get_sample_value('process_virtual_memory_bytes'))
self.assertEqual(8114176, self.registry.get_sample_value('process_resident_memory_bytes'))
self.assertEqual(1418184099.75, self.registry.get_sample_value('process_start_time_seconds'))
self.assertEqual(2048.0, self.registry.get_sample_value('process_max_fds'))
self.assertEqual(5.0, self.registry.get_sample_value('process_open_fds'))
self.assertEqual(None, self.registry.get_sample_value('process_fake_namespace'))
def test_namespace(self):
collector = ProcessCollector(proc=self.test_proc, pid=lambda: 26231, registry=self.registry, namespace='n')
collector._ticks = 100
self.assertEqual(17.21, self.registry.get_sample_value('n_process_cpu_seconds_total'))
self.assertEqual(56274944.0, self.registry.get_sample_value('n_process_virtual_memory_bytes'))
self.assertEqual(8114176, self.registry.get_sample_value('n_process_resident_memory_bytes'))
self.assertEqual(1418184099.75, self.registry.get_sample_value('n_process_start_time_seconds'))
self.assertEqual(2048.0, self.registry.get_sample_value('n_process_max_fds'))
self.assertEqual(5.0, self.registry.get_sample_value('n_process_open_fds'))
self.assertEqual(None, self.registry.get_sample_value('process_cpu_seconds_total'))
def test_working_584(self):
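        # A pid supplied as a string with a trailing newline (as might be
        # read from a pidfile) must still resolve to the right /proc entry.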
collector = ProcessCollector(proc=self.test_proc, pid=lambda: "584\n", registry=self.registry)
collector._ticks = 100
self.assertEqual(0.0, self.registry.get_sample_value('process_cpu_seconds_total'))
self.assertEqual(10395648.0, self.registry.get_sample_value('process_virtual_memory_bytes'))
self.assertEqual(634880, self.registry.get_sample_value('process_resident_memory_bytes'))
self.assertEqual(1418291667.75, self.registry.get_sample_value('process_start_time_seconds'))
self.assertEqual(None, self.registry.get_sample_value('process_max_fds'))
self.assertEqual(None, self.registry.get_sample_value('process_open_fds'))
def test_working_fake_pid(self):
collector = ProcessCollector(proc=self.test_proc, pid=lambda: 123, registry=self.registry)
collector._ticks = 100
self.assertEqual(None, self.registry.get_sample_value('process_cpu_seconds_total'))
self.assertEqual(None, self.registry.get_sample_value('process_virtual_memory_bytes'))
self.assertEqual(None, self.registry.get_sample_value('process_resident_memory_bytes'))
self.assertEqual(None, self.registry.get_sample_value('process_start_time_seconds'))
self.assertEqual(None, self.registry.get_sample_value('process_max_fds'))
self.assertEqual(None, self.registry.get_sample_value('process_open_fds'))
self.assertEqual(None, self.registry.get_sample_value('process_fake_namespace'))
if __name__ == '__main__':
unittest.main()
from __future__ import absolute_import, unicode_literals
import sys
if sys.version_info < (2, 7):
from unittest2 import skipUnless
else:
from unittest import skipUnless
from prometheus_client import Counter
from prometheus_client import CollectorRegistry, generate_latest
try:
from prometheus_client.twisted import MetricsResource
from twisted.trial.unittest import TestCase
from twisted.web.server import Site
from twisted.web.resource import Resource
from twisted.internet import reactor
from twisted.web.client import Agent
from twisted.web.client import readBody
HAVE_TWISTED = True
except ImportError:
from unittest import TestCase
HAVE_TWISTED = False
class MetricsResourceTest(TestCase):
@skipUnless(HAVE_TWISTED, "Don't have twisted installed.")
def setUp(self):
self.registry = CollectorRegistry()
def test_reports_metrics(self):
"""
``MetricsResource`` serves the metrics from the provided registry.
"""
c = Counter('cc', 'A counter', registry=self.registry)
c.inc()
root = Resource()
root.putChild(b'metrics', MetricsResource(registry=self.registry))
server = reactor.listenTCP(0, Site(root))
self.addCleanup(server.stopListening)
agent = Agent(reactor)
port = server.getHost().port
url = "http://localhost:{port}/metrics".format(port=port)
d = agent.request(b"GET", url.encode("ascii"))
d.addCallback(readBody)
d.addCallback(self.assertEqual, generate_latest(self.registry))
return d
[tox]
envlist = coverage-clean,py26,py27,py34,py35,py36,pypy,{py27,py35,py36}-nooptionals,coverage-report
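; Run the whole matrix with `tox`, or one environment with e.g. `tox -e py27`.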
[base]
deps =
coverage
pytest
[testenv]
deps =
{[base]deps}
py26: unittest2
; Twisted does not support Python 2.6.
{py27,py34,py35,pypy}: twisted
commands = coverage run --parallel -m pytest {posargs}
; Ensure test suite passes if no optional dependencies are present.
[testenv:py27-nooptionals]
deps = {[base]deps}
commands = coverage run --parallel -m pytest {posargs}
[testenv:py35-nooptionals]
deps = {[base]deps}
commands = coverage run --parallel -m pytest {posargs}
[testenv:coverage-clean]
deps = coverage
skip_install = true
commands = coverage erase
[testenv:coverage-report]
deps = coverage
skip_install = true
commands =
coverage combine
coverage report