diff --git a/README.md b/README.md index d3c9e62a..33769544 100644 --- a/README.md +++ b/README.md @@ -246,6 +246,33 @@ write_to_textfile('/configured/textfile/path/raid.prom', registry) A separate registry is used, as the default registry may contain other metrics such as those from the Process Collector. +## Exporting to a Pushgateway + +The [Pushgateway](https://github.com/prometheus/pushgateway) +allows ephemeral and batch jobs to expose their metrics to Prometheus. + +```python +from prometheus_client import CollectorRegistry,Gauge,push_to_gateway +registry = CollectorRegistry() +g = Gauge('job_last_success_unixtime', 'Last time a batch job successfully finished', registry=registry) +g.set_to_current_time() +push_to_gateway('localhost:9091', job='batchA', registry=registry) +``` + +A separate registry is used, as the default registry may contain other metrics +such as those from the Process Collector. + +Pushgateway functions take a grouping key. `push_to_gateway` replaces metrics +with the same grouping key, `pushadd_to_gateway` only replaces metrics with the +same name and grouping key and `delete_from_gateway` deletes metrics with the +given job and grouping key. See the +[Pushgateway documentation](https://github.com/prometheus/pushgateway/blob/master/README.md) +for more information. + +`instance_ip_grouping_key` returns a grouping key with the instance label set +to the host's IP address. + + ## Bridges It is also possible to expose metrics to systems other than Prometheus. diff --git a/prometheus_client/__init__.py b/prometheus_client/__init__.py index f40261ce..80424dbf 100644 --- a/prometheus_client/__init__.py +++ b/prometheus_client/__init__.py @@ -21,6 +21,10 @@ MetricsHandler = exposition.MetricsHandler start_http_server = exposition.start_http_server write_to_textfile = exposition.write_to_textfile +push_to_gateway = exposition.push_to_gateway +pushadd_to_gateway = exposition.pushadd_to_gateway +delete_from_gateway = exposition.delete_from_gateway +instance_ip_grouping_key = exposition.instance_ip_grouping_key ProcessCollector = process_collector.ProcessCollector PROCESS_COLLECTOR = process_collector.PROCESS_COLLECTOR diff --git a/prometheus_client/exposition.py b/prometheus_client/exposition.py index 51e57c07..4b71fc44 100644 --- a/prometheus_client/exposition.py +++ b/prometheus_client/exposition.py @@ -3,6 +3,7 @@ from __future__ import unicode_literals import os +import socket import time import threading @@ -10,11 +11,15 @@ try: from BaseHTTPServer import BaseHTTPRequestHandler from BaseHTTPServer import HTTPServer + from urllib2 import build_opener, Request, HTTPHandler + from urllib import quote_plus except ImportError: # Python 3 unicode = str from http.server import BaseHTTPRequestHandler from http.server import HTTPServer + from urllib.request import build_opener, Request, HTTPHandler + from urllib.parse import quote_plus CONTENT_TYPE_LATEST = 'text/plain; version=0.0.4; charset=utf-8' @@ -72,3 +77,54 @@ def write_to_textfile(path, registry): f.write(generate_latest(registry)) # rename(2) is atomic. os.rename(tmppath, path) + + +def push_to_gateway(gateway, job, registry=core.REGISTRY, grouping_key=None, timeout=None): + '''Push metrics to the given pushgateway. + + This overwrites all metrics with the same job and grouping_key. + This uses the PUT HTTP method.''' + _use_gateway('PUT', gateway, job, registry, grouping_key, timeout) + + +def pushadd_to_gateway(gateway, job, registry=core.REGISTRY, grouping_key=None, timeout=None): + '''PushAdd metrics to the given pushgateway. + + This replaces metrics with the same name, job and grouping_key. + This uses the POST HTTP method.''' + _use_gateway('POST', gateway, job, registry, grouping_key, timeout) + + +def delete_from_gateway(gateway, job, grouping_key=None, timeout=None): + '''Delete metrics from the given pushgateway. + + This deletes metrics with the given job and grouping_key. + This uses the DELETE HTTP method.''' + _use_gateway('DELETE', gateway, job, None, grouping_key, timeout) + + +def _use_gateway(method, gateway, job, registry, grouping_key, timeout): + url = 'http://{0}/job/{1}'.format(gateway, quote_plus(job)) + + data = b'' + if method != 'DELETE': + data = generate_latest(registry) + + if grouping_key is None: + grouping_key = {} + url = url + ''.join(['/{0}/{1}'.format(quote_plus(str(k)), quote_plus(str(v))) + for k, v in sorted(grouping_key.items())]) + + request = Request(url, data=data) + request.add_header('Content-Type', CONTENT_TYPE_LATEST) + request.get_method = lambda: method + resp = build_opener(HTTPHandler).open(request, timeout=timeout) + if resp.code >= 400: + raise IOError("error talking to pushgateway: {0} {1}".format( + resp.code, resp.msg)) + +def instance_ip_grouping_key(): + '''Grouping key with instance set to the IP Address of this host.''' + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + s.connect(('', 0)) + return {'instance': s.getsockname()[0]} diff --git a/tests/test_client.py b/tests/test_client.py index 70204dad..2ee2fc08 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -1,10 +1,22 @@ from __future__ import unicode_literals import os +import threading import unittest from prometheus_client import Gauge, Counter, Summary, Histogram, Metric from prometheus_client import CollectorRegistry, generate_latest, ProcessCollector +from prometheus_client import push_to_gateway, pushadd_to_gateway, delete_from_gateway +from prometheus_client import CONTENT_TYPE_LATEST, instance_ip_grouping_key + +try: + from BaseHTTPServer import BaseHTTPRequestHandler + from BaseHTTPServer import HTTPServer +except ImportError: + # Python 3 + from http.server import BaseHTTPRequestHandler + from http.server import HTTPServer + class TestCounter(unittest.TestCase): @@ -372,6 +384,80 @@ def test_working_fake_pid(self): self.assertEqual(None, self.registry.get_sample_value('process_fake_namespace')) +class TestPushGateway(unittest.TestCase): + def setUp(self): + self.registry = CollectorRegistry() + self.counter = Gauge('g', 'help', registry=self.registry) + self.requests = requests = [] + class TestHandler(BaseHTTPRequestHandler): + def do_PUT(self): + self.send_response(201) + length = int(self.headers['content-length']) + requests.append((self, self.rfile.read(length))) + + do_POST = do_PUT + do_DELETE = do_PUT + + httpd = HTTPServer(('', 0), TestHandler) + self.address = ':'.join([str(x) for x in httpd.server_address]) + class TestServer(threading.Thread): + def run(self): + httpd.handle_request() + self.server = TestServer() + self.server.daemon = True + self.server.start() + + def test_push(self): + push_to_gateway(self.address, "my_job", self.registry) + self.assertEqual(self.requests[0][0].command, 'PUT') + self.assertEqual(self.requests[0][0].path, '/job/my_job') + self.assertEqual(self.requests[0][0].headers.get('content-type'), CONTENT_TYPE_LATEST) + self.assertEqual(self.requests[0][1], b'# HELP g help\n# TYPE g gauge\ng 0.0\n') + + def test_push_with_groupingkey(self): + push_to_gateway(self.address, "my_job", self.registry, {'a': 9}) + self.assertEqual(self.requests[0][0].command, 'PUT') + self.assertEqual(self.requests[0][0].path, '/job/my_job/a/9') + self.assertEqual(self.requests[0][0].headers.get('content-type'), CONTENT_TYPE_LATEST) + self.assertEqual(self.requests[0][1], b'# HELP g help\n# TYPE g gauge\ng 0.0\n') + + def test_push_with_complex_groupingkey(self): + push_to_gateway(self.address, "my_job", self.registry, {'a': 9, 'b': 'a/ z'}) + self.assertEqual(self.requests[0][0].command, 'PUT') + self.assertEqual(self.requests[0][0].path, '/job/my_job/a/9/b/a%2F+z') + self.assertEqual(self.requests[0][0].headers.get('content-type'), CONTENT_TYPE_LATEST) + self.assertEqual(self.requests[0][1], b'# HELP g help\n# TYPE g gauge\ng 0.0\n') + + def test_pushadd(self): + pushadd_to_gateway(self.address, "my_job", self.registry) + self.assertEqual(self.requests[0][0].command, 'POST') + self.assertEqual(self.requests[0][0].path, '/job/my_job') + self.assertEqual(self.requests[0][0].headers.get('content-type'), CONTENT_TYPE_LATEST) + self.assertEqual(self.requests[0][1], b'# HELP g help\n# TYPE g gauge\ng 0.0\n') + + def test_pushadd_with_groupingkey(self): + pushadd_to_gateway(self.address, "my_job", self.registry, {'a': 9}) + self.assertEqual(self.requests[0][0].command, 'POST') + self.assertEqual(self.requests[0][0].path, '/job/my_job/a/9') + self.assertEqual(self.requests[0][0].headers.get('content-type'), CONTENT_TYPE_LATEST) + self.assertEqual(self.requests[0][1], b'# HELP g help\n# TYPE g gauge\ng 0.0\n') + + def test_delete(self): + delete_from_gateway(self.address, "my_job") + self.assertEqual(self.requests[0][0].command, 'DELETE') + self.assertEqual(self.requests[0][0].path, '/job/my_job') + self.assertEqual(self.requests[0][0].headers.get('content-type'), CONTENT_TYPE_LATEST) + self.assertEqual(self.requests[0][1], b'') + + def test_pushadd_with_groupingkey(self): + delete_from_gateway(self.address, "my_job", {'a': 9}) + self.assertEqual(self.requests[0][0].command, 'DELETE') + self.assertEqual(self.requests[0][0].path, '/job/my_job/a/9') + self.assertEqual(self.requests[0][0].headers.get('content-type'), CONTENT_TYPE_LATEST) + self.assertEqual(self.requests[0][1], b'') + + def test_instance_ip_grouping_key(self): + self.assertTrue('' != instance_ip_grouping_key()['instance']) if __name__ == '__main__':