8000 Merge pull request #33 from brian-brazil/mm · QuantumGhost/client_python@8d40dfd · GitHub
[go: up one dir, main page]

Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 8d40dfd

Browse files
committed
Merge pull request prometheus#33 from brian-brazil/mm
Add pushgateway support
2 parents 93654f3 + 3632050 commit 8d40dfd

File tree

4 files changed

+173
-0
lines changed

4 files changed

+173
-0
lines changed

README.md

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,33 @@ write_to_textfile('/configured/textfile/path/raid.prom', registry)
246246
A separate registry is used, as the default registry may contain other metrics
247247
such as those from the Process Collector.
248248

249+
## Exporting to a Pushgateway
250+
251+
The [Pushgateway](https://github.com/prometheus/pushgateway)
252+
allows ephemeral and batch jobs to expose their metrics to Prometheus.
253+
254+
```python
255+
from prometheus_client import CollectorRegistry,Gauge,push_to_gateway
256+
registry = CollectorRegistry()
257+
g = Gauge('job_last_success_unixtime', 'Last time a batch job successfully finished', registry=registry)
258+
g.set_to_current_time()
259+
push_to_gateway('localhost:9091', job='batchA', registry=registry)
260+
```
261+
262+
A separate registry is used, as the default registry may contain other metrics
263+
such as those from the Process Collector.
264+
265+
Pushgateway functions take a grouping key. `push_to_gateway` replaces metrics
266+
with the same grouping key, `pushadd_to_gateway` only replaces metrics with the
267+
same name and grouping key and `delete_from_gateway` deletes metrics with the
268+
given job and grouping key. See the
269+
[Pushgateway documentation](https://github.com/prometheus/pushgateway/blob/master/README.md)
270+
for more information.
271+
272+
`instance_ip_grouping_key` returns a grouping key with the instance label set
273+
to the host's IP address.
274+
275+
249276
## Bridges
250277

251278
It is also possible to expose metrics to systems other than Prometheus.

prometheus_client/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@
2121
MetricsHandler = exposition.MetricsHandler
2222
start_http_server = exposition.start_http_server
2323
write_to_textfile = exposition.write_to_textfile
24+
push_to_gateway = exposition.push_to_gateway
25+
pushadd_to_gateway = exposition.pushadd_to_gateway
26+
delete_from_gateway = exposition.delete_from_gateway
27+
instance_ip_grouping_key = exposition.instance_ip_grouping_key
2428

2529
ProcessCollector = process_collector.ProcessCollector
2630
PROCESS_COLLECTOR = process_collector.PROCESS_COLLECTOR

prometheus_client/exposition.py

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,18 +3,23 @@
33
from __future__ import unicode_literals
44

55
import os
6+
import socket
67
import time
78
import threading
89

910
from . import core
1011
try:
1112
from BaseHTTPServer import BaseHTTPRequestHandler
1213
from BaseHTTPServer import HTTPServer
14+
from urllib2 import build_opener, Request, HTTPHandler
15+
from urllib import quote_plus
1316
except ImportError:
1417
# Python 3
1518
unicode = str
1619
from http.server import BaseHTTPRequestHandler
1720
from http.server import HTTPServer
21+
from urllib.request import build_opener, Request, HTTPHandler
22+
from urllib.parse import quote_plus
1823

1924

2025
CONTENT_TYPE_LATEST = 'text/plain; version=0.0.4; charset=utf-8'
@@ -72,3 +77,54 @@ def write_to_textfile(path, registry):
7277
f.write(generate_latest(registry))
7378
# rename(2) is atomic.
7479
os.rename(tmppath, path)
80+
81+
82+
def push_to_gateway(gateway, job, registry=core.REGISTRY, grouping_key=None, timeout=None):
83+
'''Push metrics to the given pushgateway.
84+
85+
This overwrites all metrics with the same job and grouping_key.
86+
This uses the PUT HTTP method.'''
87+
_use_gateway('PUT', gateway, job, registry, grouping_key, timeout)
88+
89+
90+
def pushadd_to_gateway(gateway, job, registry=core.REGISTRY, grouping_key=None, timeout=None):
91+
'''PushAdd metrics to the given pushgateway.
92+
93+
This replaces metrics with the same name, job and grouping_key.
94+
This uses the POST HTTP method.'''
95+
_use_gateway('POST', gateway, job, registry, grouping_key, timeout)
96+
97+
98+
def delete_from_gateway(gateway, job, grouping_key=None, timeout=None):
99+
'''Delete metrics from the given pushgateway.
100+
101+
This deletes metrics with the given job and grouping_key.
102+
This uses the DELETE HTTP method.'''
103+
_use_gateway('DELETE', gateway, job, None, grouping_key, timeout)
104+
105+
106+
def _use_gateway(method, gateway, job, registry, grouping_key, timeout):
107+
url = 'http://{0}/job/{1}'.format(gateway, quote_plus(job))
108+
109+
data = b''
110+
if method != 'DELETE':
111+
data = generate_latest(registry)
112+
113+
if grouping_key is None:
114+
grouping_key = {}
115+
url = url + ''.join(['/{0}/{1}'.format(quote_plus(str(k)), quote_plus(str(v)))
116+
for k, v in sorted(grouping_key.items())])
117+
118+
request = Request(url, data=data)
119+
request.add_header('Content-Type', CONTENT_TYPE_LATEST)
120+
request.get_method = lambda: method
121+
resp = build_opener(HTTPHandler).open(request, timeout=timeout)
122+
if resp.code >= 400:
123+
raise IOError("error talking to pushgateway: {0} {1}".format(
124+
resp.code, resp.msg))
125+
126+
def instance_ip_grouping_key():
127+
'''Grouping key with instance set to the IP Address of this host.'''
128+
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
129+
s.connect(('', 0))
130+
return {'instance': s.getsockname()[0]}

tests/test_client.py

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,22 @@
11
from __future__ import unicode_literals
22
import os
3+
import threading
34
import unittest
45

56

67
from prometheus_client import Gauge, Counter, Summary, Histogram, Metric
78
from prometheus_client import CollectorRegistry, generate_latest, ProcessCollector
9+
from prometheus_client import push_to_gateway, pushadd_to_gateway, delete_from_gateway
10+
from prometheus_client import CONTENT_TYPE_LATEST, instance_ip_grouping_key
11+
12+
try:
13+
from BaseHTTPServer import BaseHTTPRequestHandler
14+
from BaseHTTPServer import HTTPServer
15+
except ImportError:
16+
# Python 3
17+
from http.server import BaseHTTPRequestHandler
18+
from http.server import HTTPServer
19+
820

921

1022
class TestCounter(unittest.TestCase):
@@ -372,6 +384,80 @@ def test_working_fake_pid(self):
372384
self.assertEqual(None, self.registry.get_sample_value('process_fake_namespace'))
373385

374386

387+
class TestPushGateway(unittest.TestCase):
388+
def setUp(self):
389+
self.registry = CollectorRegistry()
390+
self.counter = Gauge('g', 'help', registry=self.registry)
391+
self.requests = requests = []
392+
class TestHandler(BaseHTTPRequestHandler):
393+
def do_PUT(self):
394+
self.send_response(201)
395+
length = int(self.headers['content-length'])
396+
requests.append((self, self.rfile.read(length)))
397+
398+
do_POST = do_PUT
399+
do_DELETE = do_PUT
400+
401+
httpd = HTTPServer(('', 0), TestHandler)
402+
self.address = ':'.join([str(x) for x in httpd.server_address])
403+
class TestServer(threading.Thread):
404+
def run(self):
405+
httpd.handle_request()
406+
self.server = TestServer()
407+
self.server.daemon = True
408+
self.server.start()
409+
410+
def test_push(self):
411+
push_to_gateway(self.address, "my_job", self.registry)
412+
self.assertEqual(self.requests[0][0].command, 'PUT')
413+
self.assertEqual(self.requests[0][0].path, '/job/my_job')
414+
self.assertEqual(self.requests[0][0].headers.get('content-type'), CONTENT_TYPE_LATEST)
415+
self.assertEqual(self.requests[0][1], b'# HELP g help\n# TYPE g gauge\ng 0.0\n')
416+
417+
def test_push_with_groupingkey(self):
418+
push_to_gateway(self.address, "my_job", self.registry, {'a': 9})
419+
self.assertEqual(self.requests[0][0].command, 'PUT')
420+
self.assertEqual(self.requests[0][0].path, '/job/my_job/a/9')
421+
self.assertEqual(self.requests[0][0].headers.get('content-type'), CONTENT_TYPE_LATEST)
422+
self.assertEqual(self.requests[0][1], b'# HELP g help\n# TYPE g gauge\ng 0.0\n')
423+
424+
def test_push_with_complex_groupingkey(self):
425+
push_to_gateway(self.address, "my_job", self.registry, {'a': 9, 'b': 'a/ z'})
426+
self.assertEqual(self.requests[0][0].command, 'PUT')
427+
self.assertEqual(self.requests[0][0].path, '/job/my_job/a/9/b/a%2F+z')
428+
self.assertEqual(self.requests[0][0].headers.get('content-type'), CONTENT_TYPE_LATEST)
429+
self.assertEqual(self.requests[0][1], b'# HELP g help\n# TYPE g gauge\ng 0.0\n')
430+
431+
def test_pushadd(self):
432+
pushadd_to_gateway(self.address, "my_job", self.registry)
433+
self.assertEqual(self.requests[0][0].command, 'POST')
434+
self.assertEqual(self.requests[0][0].path, '/job/my_job')
435+
self.assertEqual(self.requests[0][0].headers.get('content-type'), CONTENT_TYPE_LATEST)
436+
self.assertEqual(self.requests[0][1], b'# HELP g help\n# TYPE g gauge\ng 0.0\n')
437+
438+
def test_pushadd_with_groupingkey(self):
439+
pushadd_to_gateway(self.address, "my_job", self.registry, {'a': 9})
440+
self.assertEqual(self.requests[0][0].command, 'POST')
441+
self.assertEqual(self.requests[0][0].path, '/job/my_job/a/9')
442+
self.assertEqual(self.requests[0][0].headers.get('content-type'), CONTENT_TYPE_LATEST)
443+
self.assertEqual(self.requests[0][1], b'# HELP g help\n# TYPE g gauge\ng 0.0\n')
444+
445+
def test_delete(self):
446+
delete_from_gateway(self.address, "my_job")
447+
self.assertEqual(self.requests[0][0].command, 'DELETE')
448+
self.assertEqual(self.requests[0][0].path, '/job/my_job')
449+
self.assertEqual(self.requests[0][0].headers.get('content-type'), CONTENT_TYPE_LATEST)
450+
self.assertEqual(self.requests[0][1], b'')
451+
452+
def test_pushadd_with_groupingkey(self):
453+
delete_from_gateway(self.address, "my_job", {'a': 9})
454+
self.assertEqual(self.requests[0][0].command, 'DELETE')
455+
self.assertEqual(self.requests[0][0].path, '/job/my_job/a/9')
456+
self.assertEqual(self.requests[0][0].headers.get('content-type'), CONTENT_TYPE_LATEST)
457+
self.assertEqual(self.requests[0][1], b'')
458+
459+
def test_instance_ip_grouping_key(self):
460+
self.assertTrue('' != instance_ip_grouping_key()['instance'])
375461

376462

377463
if __name__ == '__main__':

0 commit comments

Comments
 (0)
0