8000 Added ASGI application (#512) · rayandas/client_python@56a8a53 · GitHub
[go: up one dir, main page]

Skip to content

Commit 56a8a53

Browse files
authored
Added ASGI application (prometheus#512)
* Added ASGI application * Factor out common-functionality for asgi/wsgi * Convert twisted to use WSGIResource * Change default HTTP Server to WSGI Server Signed-off-by: Emil Madsen <sovende@gmail.com>
1 parent ce7063f commit 56a8a53

File tree

8 files changed

+293
-55
lines changed

8 files changed

+293
-55
lines changed

README.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -306,6 +306,19 @@ from prometheus_client import start_wsgi_server
306306
start_wsgi_server(8000)
307307
```
308308

309+
#### ASGI
310+
311+
To use Prometheus with [ASGI](http://asgi.readthedocs.org/en/latest/), there is
312+
`make_asgi_app` which creates an ASGI application.
313+
314+
```python
315+
from prometheus_client import make_asgi_app
316+
317+
app = make_asgi_app()
318+
```
319+
Such an application can be useful when integrating Prometheus metrics with ASGI
320+
apps.
321+
309322
#### Flask
310323

311324
To use Prometheus with [Flask](http://flask.pocoo.org/) we need to serve metrics through a Prometheus WSGI application. This can be achieved using [Flask's application dispatching](http://flask.pocoo.org/docs/latest/patterns/appdispatch/). Below is a working example.

prometheus_client/__init__.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,11 @@
2424
generate_latest = exposition.generate_latest
2525
MetricsHandler = exposition.MetricsHandler
2626
make_wsgi_app = exposition.make_wsgi_app
27+
try:
28+
# Python >3.5 only
29+
make_asgi_app = exposition.make_asgi_app
30+
except:
31+
pass
2732
start_http_server = exposition.start_http_server
2833
start_wsgi_server = exposition.start_wsgi_server
2934
write_to_textfile = exposition.write_to_textfile

prometheus_client/asgi.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
from urllib.parse import parse_qs
2+
3+
from .exposition import _bake_output
4+
from .registry import REGISTRY
5+
6+
7+
def make_asgi_app(registry=REGISTRY):
8+
"""Create a ASGI app which serves the metrics from a registry."""
9+
10+
async def prometheus_app(scope, receive, send):
11+
assert scope.get("type") == "http"
12+
# Prepare parameters
13+
params = parse_qs(scope.get('query_string', b''))
14+
accept_header = "Accept: " + ",".join([
15+
value.decode("utf8") for (name, value) in scope.get('headers')
16+
if name.decode("utf8") == 'accept'
17+
])
18+
# Bake output
19+
status, header, output = _bake_output(registry, accept_header, params)
20+
# Return output
21+
payload = await receive()
22+
if payload.get("type") == "http.request":
23+
await send(
24+
{
25+
"type": "http.response.start",
26+
"status": int(status.split(' ')[0]),
27+
"headers": [
28+
tuple(x.encode('utf8') for x in header)
29+
]
30+
}
31+
)
32+
await send({"type": "http.response.body", "body": output})
33+
34+
return prometheus_app

prometheus_client/exposition.py

Lines changed: 43 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
import socket
77
import sys
88
import threading
9-
from wsgiref.simple_server import make_server, WSGIRequestHandler
9+
from wsgiref.simple_server import make_server, WSGIServer, WSGIRequestHandler
1010

1111
from .openmetrics import exposition as openmetrics
1212
from .registry import REGISTRY
@@ -31,20 +31,27 @@
3131
PYTHON26_OR_OLDER = sys.version_info < (2, 7)
3232
PYTHON376_OR_NEWER = sys.version_info > (3, 7, 5)
3333

34+
35+
def _bake_output(registry, accept_header, params):
36+
"""Bake output for metrics output."""
37+
encoder, content_type = choose_encoder(accept_header)
38+
if 'name[]' in params:
39+
registry = registry.restricted_registry(params['name[]'])
40+
output = encoder(registry)
41+
return str('200 OK'), (str('Content-Type'), content_type), output
42+
43+
3444
def make_wsgi_app(registry=REGISTRY):
3545
"""Create a WSGI app which serves the metrics from a registry."""
3646

3747
def prometheus_app(environ, start_response):
48+
# Prepare parameters
49+
accept_header = environ.get('HTTP_ACCEPT')
3850
params = parse_qs(environ.get('QUERY_STRING', ''))
39-
r = registry
40-
encoder, content_type = choose_encoder(environ.get('HTTP_ACCEPT'))
41-
if 'name[]' in params:
42-
r = r.restricted_registry(params['name[]'])
43-
output = encoder(r)
44-
45-
status = str('200 OK')
46-
headers = [(str('Content-type'), content_type)]
47-
start_response(status, headers)
51+
# Bake output
52+
status, header, output = _bake_output(registry, accept_header, params)
53+
# Return output
54+
start_response(status, [header])
4855
return [output]
4956

5057
return prometheus_app
@@ -57,15 +64,26 @@ def log_message(self, format, *args):
5764
"""Log nothing."""
5865

5966

67+
class ThreadingWSGIServer(ThreadingMixIn, WSGIServer):
68+
"""Thread per request HTTP server."""
69+
# Make worker threads "fire and forget". Beginning with Python 3.7 this
70+
# prevents a memory leak because ``ThreadingMixIn`` starts to gather all
71+
# non-daemon threads in a list in order to join on them at server close.
72+
daemon_threads = True
73+
74+
6075
def start_wsgi_server(port, addr='', registry=REGISTRY):
6176
"""Starts a WSGI server for prometheus metrics as a daemon thread."""
6277
app = make_wsgi_app(registry)
63-
httpd = make_server(addr, port, app, handler_class=_SilentHandler)
78+
httpd = make_server(addr, port, app, ThreadingWSGIServer, handler_class=_SilentHandler)
6479
t = threading.Thread(target=httpd.serve_forever)
6580
t.daemon = True
6681
t.start()
6782

6883

84+
start_http_server = start_wsgi_server
85+
86+
6987
def generate_latest(registry=REGISTRY):
7088
"""Returns the metrics from the registry in latest text format as a string."""
7189

@@ -143,18 +161,15 @@ class MetricsHandler(BaseHTTPRequestHandler):
143161
registry = REGISTRY
144162

145163
def do_GET(self):
164+
# Prepare parameters
146165
registry = self.registry
166+
accept_header = self.headers.get('Accept')
147167
params = parse_qs(urlparse(self.path).query)
148-
encoder, content_type = choose_encoder(self.headers.get('Accept'))
149-
if 'name[]' in params:
150-
registry = registry.restricted_registry(params['name[]'])
151-
try:
152-
output = encoder(registry)
153-
except:
154-
self.send_error(500, 'error generating metric output')
155-
raise
156-
self.send_response(200)
157-
self.send_header('Content-Type', content_type)
168+
# Bake output
169+
status, header, output = _bake_output( F987 registry, accept_header, params)
170+
# Return output
171+
self.send_response(int(status.split(' ')[0]))
172+
self.send_header(*header)
158173
self.end_headers()
159174
self.wfile.write(output)
160175

@@ -177,25 +192,6 @@ def factory(cls, registry):
177192
return MyMetricsHandler
178193

179194

180-
class _ThreadingSimpleServer(ThreadingMixIn, HTTPServer):
181-
"""Thread per request HTTP server."""
182-
# Make worker threads "fire and forget". Beginning with Python 3.7 this
183-
# prevents a memory leak because ``ThreadingMixIn`` starts to gather all
184-
# non-daemon threads in a list in order to join on them at server close.
185-
# Enabling daemon threads virtually makes ``_ThreadingSimpleServer`` the
186-
# same as Python 3.7's ``ThreadingHTTPServer``.
187-
daemon_threads = True
188-
189-
190-
def start_http_server(port, addr='', registry=REGISTRY):
191-
"""Starts an HTTP server for prometheus metrics as a daemon thread"""
192-
CustomMetricsHandler = MetricsHandler.factory(registry)
193-
httpd = _ThreadingSimpleServer((addr, port), CustomMetricsHandler)
194-
t = threading.Thread(target=httpd.serve_forever)
195-
t.daemon = True
196-
t.start()
197-
198-
199195
def write_to_textfile(path, registry):
200196
"""Write metrics to the given path.
201197
@@ -378,3 +374,10 @@ def instance_ip_grouping_key():
378374
with closing(socket.socket(socket.AF_INET, socket.SOCK_DGRAM)) as s:
379375
s.connect(('localhost', 0))
380376
return {'instance': s.getsockname()[0]}
377+
378+
379+
try:
380+
# Python >3.5 only
381+
from .asgi import make_asgi_app
382+
except:
383+
pass
Lines changed: 5 additions & 15 deletions
F438
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,10 @@
11
from __future__ import absolute_import, unicode_literals
22

3-
from twisted.web.resource import Resource
3+
from twisted.web.wsgi import WSGIResource
4+
from twisted.internet import reactor
45

56
from .. import exposition, REGISTRY
67

7-
8-
class MetricsResource(Resource):
9-
"""
10-
Twisted ``Resource`` that serves prometheus metrics.
11-
"""
12-
isLeaf = True
13-
14-
def __init__(self, registry=REGISTRY):
15-
self.registry = registry
16-
17-
def render_GET(self, request):
18-
encoder, content_type = exposition.choose_encoder(request.getHeader('Accept'))
19-
request.setHeader(b'Content-Type', content_type.encode('ascii'))
20-
return encoder(self.registry)
8+
MetricsResource = lambda registry=REGISTRY: WSGIResource(
9+
reactor, reactor.getThreadPool(), exposition.make_wsgi_app(registry)
10+
)

tests/test_asgi.py

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
from __future__ import absolute_import, unicode_literals
2+
3+
import sys
4+
from unittest import TestCase
5+
6+
from prometheus_client import CollectorRegistry, Counter, generate_latest
7+
from prometheus_client.exposition import CONTENT_TYPE_LATEST
8+
9+
if sys.version_info < (2, 7):
10+
from unittest2 import skipUnless
11+
else:
12+
from unittest import skipUnless
13+
14+
try:
15+
# Python >3.5 only
16+
from prometheus_client import make_asgi_app
17+
import asyncio
18+
from asgiref.testing import ApplicationCommunicator
19+
HAVE_ASYNCIO_AND_ASGI = True
20+
except ImportError:
21+
HAVE_ASYNCIO_AND_ASGI = False
22+
23+
24+
def setup_testing_defaults(scope):
25+
scope.update(
26+
{
27+
"client": ("127.0.0.1", 32767),
28+
"headers": [],
29+
"http_version": "1.0",
30+
"method": "GET",
31+
"path": "/",
32+
"query_string": b"",
33+
"scheme": "http",
34+
"server": ("127.0.0.1", 80),
35+
"type": "http",
36+
}
37+
)
38+
39+
40+
class ASGITest(TestCase):
41+
@skipUnless(HAVE_ASYNCIO_AND_ASGI, "Don't have asyncio/asgi installed.")
42+
def setUp(self):
43+
self.registry = CollectorRegistry()
44+
self.captured_status = None
45+
self.captured_headers = None
46+
# Setup ASGI scope
47+
self.scope = {}
48+
setup_testing_defaults(self.scope)
49+
self.communicator = None
50+
51+
def tearDown(self):
52+
if self.communicator:
53+
asyncio.get_event_loop().run_until_complete(
54+
self.communicator.wait()
55+
)
56+
57+
def seed_app(self, app):
58+
self.communicator = ApplicationCommunicator(app, self.scope)
59+
60+
def send_input(self, payload):
61+
asyncio.get_event_loop().run_until_complete(
62+
self.communicator.send_input(payload)
63+
)
64+
65+
def send_default_request(self):
66+
self.send_input({"type": "http.request", "body": b""})
67+
68+
def get_output(self):
69+
output = asyncio.get_event_loop().run_until_complete(
70+
self.communicator.receive_output(0)
71+
)
72+
return output
73+
74+
def get_all_output(self):
75+
outputs = []
76+
while True:
77+
try:
78+
outputs.append(self.get_output())
79+
except asyncio.TimeoutError:
80+
break
81+
return outputs
82+
83+
def validate_metrics(self, metric_name, help_text, increments):
84+
"""
85+
ASGI app serves the metrics from the provided registry.
86+
"""
87+
c = Counter(metric_name, help_text, registry=self.registry)
88+
for _ in range(increments):
89+
c.inc()
90+
# Create and run ASGI app
91+
app = make_asgi_app(self.registry)
92+
self.seed_app(app)
93+
self.send_default_request()
94+
# Assert outputs
95+
outputs = self.get_all_output()
96+
# Assert outputs
97+
self.assertEqual(len(outputs), 2)
98+
response_start = outputs[0]
99+
self.assertEqual(response_start['type'], 'http.response.start')
100+
response_body = outputs[1]
101+
self.assertEqual(response_body['type'], 'http.response.body')
102+
# Status code
103+
self.assertEqual(response_start['status'], 200)
104+
# Headers
105+
self.assertEqual(len(response_start['headers']), 1)
106+
self.assertEqual(response_start['headers'][0], (b"Content-Type", CONTENT_TYPE_LATEST.encode('utf8')))
107+
# Body
108+
output = response_body['body'].decode('utf8')
109+
self.assertIn("# HELP " + metric_name + "_total " + help_text + "\n", output)
110+
self.assertIn("# TYPE " + metric_name + "_total counter\n", output)
111+
self.assertIn(metric_name + "_total " + str(increments) + ".0\n", output)
112+
113+
def test_report_metrics_1(self):
114+
self.validate_metrics("counter", "A counter", 2)
115+
116+
def test_report_metrics_2(self):
117+
self.validate_metrics("counter", "Another counter", 3)
118+
119+
def test_report_metrics_3(self):
120+
self.validate_metrics("requests", "Number of requests", 5)
121+
122+
def test_report_metrics_4(self):
123+
self.validate_metrics("failed_requests", "Number of failed requests", 7)

0 commit comments

Comments
 (0)
0