8000 Merge pull request #33 from brian-brazil/mm · rmohr/client_python@8d40dfd · GitHub
[go: up one dir, main page]

Skip to content

Commit 8d40dfd

Browse files
committed
Merge pull request prometheus#33 from brian-brazil/mm
Add pushgateway support
2 parents 93654f3 + 3632050 commit 8d40dfd

File tree

4 files changed

+173
-0
lines changed

4 files changed

+173
-0
lines changed

README.md

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,33 @@ write_to_textfile('/configured/textfile/path/raid.prom', registry)
246246
A separate registry is used, as the default registry may contain other metrics
247247
such as those from the Process Collector.
248248

249+
## Exporting to a Pushgateway
250+
251+
The [Pushgateway](https://github.com/prometheus/pushgateway)
252+
allows ephemeral and batch jobs to expose their metrics to Prometheus.
253+
254+
```python
255+
from prometheus_client import CollectorRegistry,Gauge,push_to_gateway
256+
registry = CollectorRegistry()
257+
g = Gauge('job_last_success_unixtime', 'Last time a batch job successfully finished', registry=registry)
258+
g.set_to_current_time()
259+
push_to_gateway('localhost:9091', job='batchA', registry=registry)
260+
```
261+
262+
A separate registry is used, as the default registry may contain other metrics
263+
such as those from the Process Collector.
264+
265+
Pushgateway functions take a grouping key. `push_to_gateway` replaces metrics
266+
with the same grouping key, `pushadd_to_gateway` only replaces metrics with the
267+
same name and grouping key and `delete_from_gateway` deletes metrics with the
268+
given job and grouping key. See the
269+
[Pushgateway documentation](https://github.com/prometheus/pushgateway/blob/master/README.md)
270+
for more information.
271+
272+
`instance_ip_grouping_key` returns a grouping key with the instance label set
273+
to the host's IP address.
274+
275+
249276
## Bridges
250277

251278
It is also possible to expose metrics to systems other than Prometheus.

prometheus_client/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@
2121
MetricsHandler = exposition.MetricsHandler
2222
start_http_server = exposition.start_http_server
2323
write_to_textfile = exposition.write_to_textfile
24+
push_to_gateway = exposition.push_to_gateway
25+
pushadd_to_gateway = exposition.pushadd_to_gateway
26+
delete_from_gateway = exposition.delete_from_gateway
27+
instance_ip_grouping_key = exposition.instance_ip_grouping_key
2428

2529
ProcessCollector = process_collector.ProcessCollector
2630
PROCESS_COLLECTOR = process_collector.PROCESS_COLLECTOR

prometheus_client/exposition.py

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,18 +3,23 @@
33
from __future__ import unicode_literals
44

55
import os
6+
import socket
67
import time
78
import threading
89

910
from . import core
1011
try:
1112
from BaseHTTPServer import BaseHTTPRequestHandler
1213
from BaseHTTPServer import HTTPServer
14+
from urllib2 import build_opener, Request, HTTPHandler
15+
from urllib import quote_plus
1316
except ImportError:
1417
# Python 3
1518
unicode = str
1619
from http.server import BaseHTTPRequestHandler
1720
from http.server import HTTPServer
21+
from urllib.request import build_opener, Request, HTTPHandler
22+
from urllib.parse import quote_plus
1823

1924

2025
CONTENT_TYPE_LATEST = 'text/plain; version=0.0.4; charset=utf-8'
@@ -72,3 +77,54 @@ def write_to_textfile(path, registry):
7277
f.write(generate_latest(registry))
7378
# rename(2) is atomic.
7479
os.rename(tmppath, path)
80+
81+
82+
def push_to_gateway(gateway, job, registry=core.REGISTRY, grouping_key=None, timeout=None):
83+
'''Push metrics to the given pushgateway.
84+
85+
This overwrites all metrics with the same job and grouping_key.
86+
This uses the PUT HTTP method.'''
87+
_use_gateway('PUT', gateway, job, registry, grouping_key, timeout)
88+
89+
90+
def pushadd_to_gateway(gateway, job, registry=core.REGISTRY, grouping_key=None, timeout=None):
91+
'''PushAdd metrics to the given pushgateway.
92+
93+
This replaces metrics with the same name, job and grouping_key.
94+
This uses the POST HTTP method.'''
95+
_use_gateway('POST', gateway, job, registry, grouping_key, timeout)
96+
97+
98+
def delete_from_gateway(gateway, job, grouping_key=None, timeout=None):
99+
'''Delete metrics from the given pushgateway.
100+
101+
This deletes metrics with the given job and grouping_key.
102+
This uses the DELETE HTTP method.'''
103+
_use_gateway('DELETE', gateway, job, None, grouping_key, timeout)
104+
105+
106+
def _use_gateway(method, gateway, job, registry, grouping_key, timeout):
107+
url = 'http://{0}/job/{1}'.format(gateway, quote_plus(job))
108+
109+
data = b''
110+
if method != 'DELETE':
111+
data = generate_latest(registry)
112+
113+
if grouping_key is None:
114+
grouping_key = {}
115+
url = url + ''.join(['/{0}/{1}'.format(quote_plus(str(k)), quote_plus(str(v)))
116+
for k, v in sorted(grouping_key.items())])
117+
118+
request = Request(url, data=data)
119+
request.add_header('Content-Type', CONTENT_TYPE_LATEST)
120+
request.get_method = lambda: method
121+
resp = build_opener(HTTPHandler).open(request, timeout=timeout)
122+
if resp.code >= 400:
123+
raise IOError("error talking to pushgateway: {0} {1}".format(
124+
resp.code, resp.msg))
125+
126+
def instance_ip_grouping_key():
127+
'''Grouping key with instance set to the IP Address of this host.'''
128+
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
129+
s.connect(('', 0))
130+
return {'instance': s.getsockname()[0]}

tests/test_client.py

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,22 @@
11
from __future__ import unicode_literals
22
import os
3+
import threading
34
import unittest
45

56

67
from prometheus_client import Gauge, Counter, Summary, Histogram, Metric
78
from prometheus_client import CollectorRegistry, generate_latest, ProcessCollector
9+
from prometheus_client import push_to_gateway, pushadd_to_gateway, delete_from_gateway
10+
from prometheus_client import CONTENT_TYPE_LATEST, instance_ip_grouping_key
11+
12+
try:
13+
from BaseHTTPServer import BaseHTTPRequestHandler
14+
from BaseHTTPServer import HTTPServer
15+
except ImportError:
16+
# Python 3
17+
from http.server import BaseHTTPRequestHandler
18+
from http.server import HTTPServer
19+
820

921

1022
class TestCounter(unittest.TestCase):
@@ -372,6 +384,80 @@ def test_working_fake_pid(self):
372384
self.assertEqual(None, self.registry.get_sample_value('process_fake_namespace'))
373385

374386

387+
class TestPushGateway(unittest.TestCase):
388+
def setUp(self):
389+
self.registry = CollectorRegistry()
390+
self.counter = Gauge('g', 'help', registry=self.registry)
391+
self.requests = requests = []
392+
class TestHandler(BaseHTTPRequestHandler):
393+
def do_PUT(self):
394+
self.send_response(201)
395+
length = int(self.headers['content-length'])
396+
requests.append((self, self.rfile.read(length)))
397+
398+
do_POST = do_PUT
399+
do_DELETE = do_PUT
400+
401+
httpd = HTTPServer(('', 0), TestHandler)
402+
self.address = ':'.join([str(x) for x in httpd.server_address])
403+
class TestServer(threading.Thread):
404+
def run(self):
405+
httpd.handle_request()
406+
self.server = TestServer()
407+
self.server.daemon = True
408+
self.server.start()
409+
410+
def test_push(self):
411+
push_to_gateway(self.address, "my_job", self.registry)
412+
self.assertEqual(self.requests[0][0].command, 'PUT')
413+
self.assertEqual(self.requests[0][0].path, '/job/my_job')
414+
self.assertEqual(self.requests[0][0].headers.get('content-type'), CONTENT_TYPE_LATEST)
415+
self.assertEqual(self.requests[0][1], b'# HELP g help\n# TYPE g gauge\ng 0.0\n')
416+
417+
def test_push_with_groupingkey(self):
418+
push_to_gateway(self.address, "my_job", self.registry, {'a': 9})
419+
self.assertEqual(self.requests[0][0].command, 'PUT')
420+
self.assertEqual(self.requests[0][0].path, '/job/my_job/a/9')
421+
self.assertEqual(self.requests[0][0].headers.get('content-type'), CONTENT_TYPE_LATEST)
422+
self.assertEqual(self.requests[0][1], b'# HELP g help\n# TYPE g gauge\ng 0.0\n')
423+
424+
def test_push_with_complex_groupingkey(self):
425+
push_to_gateway(self.address, "my_job", self.registry, {'a': 9, 'b': 'a/ z'})
426+
self.assertEqual(self.requests[0][0].command, 'PUT')
427+
self.assertEqual(self.requests[0][0].path, '/job/my_job/a/9/b/a%2F+z')
428+
self.assertEqual(self.requests[0][0].headers.get('content-type'), CONTENT_TYPE_LATEST)
429+
self.assertEqual(self.requests[0][1], b'# HELP g help\n# TYPE g gauge\ng 0.0\n')
430+
431+
def test_pushadd(self):
432+
pushadd_to_gateway(self.address, "my_job", self.registry)
433+
self.assertEqual(self.requests[0][0].command, 'POST')
434+
self.assertEqual(self.requests[0][0].path, '/job/my_job')
435+
self.assertEqual(self.requests[0][0].headers.get('content-type'), CONTENT_TYPE_LATEST)
436+
self.assertEqual(self.requests[0][1], b'# HELP g help\n# TYPE g gauge\ng 0.0\n')
437+
438+
def test_pushadd_with_groupingkey(self):
439+
pushadd_to_gateway(self.address, "my_job", self.registry, {'a': 9})
440+
self.assertEqual(self.requests[0][0].command, 'POST')
441+
self.assertEqual(self.requests[0][0].path, '/job/my_job/a/9')
442+
self.assertEqual(self.requests[0][0].headers.get('content-type'), CONTENT_TYPE_LATEST)
443+
self.assertEqual(self.requests[0][1], b'# HELP g help\n# TYPE g gauge\ng 0.0\n')
444+
445+
def test_delete(self):
446+
delete_from_gateway(self.address, "my_job")
447+
self.assertEqual(self.requests[0][0].command, 'DELETE')
448+
self.assertEqual(self.requests[0][0].path, '/job/my_job')
449+
self.assertEqual(self.requests[0][0].headers.get('content-type'), CONTENT_TYPE_LATEST)
450+
self.assertEqual(self.requests[0][1], b'')
451+
452+
def test_pushadd_with_groupingkey(self):
453+
delete_from_gateway(self.address, "my_job", {'a': 9})
454+
self.assertEqual(self.requests[0][0].command, 'DELETE')
455+
self.assertEqual(self.requests[0][0].path, '/job/my_job/a/9')
456+
self.assertEqual(self.requests[0][0].headers.get('content-type'), CONTENT_TYPE_LATEST)
457+
self.assertEqual(self.requests[0][1], b'')
458+
459+
def test_instance_ip_grouping_key(self):
460+
self.assertTrue('' != instance_ip_grouping_key()['instance'])
375461

376462

377463
if __name__ == '__main__':

0 commit comments

Comments
 (0)
0