8000 Add multi-process support. · alex499/prometheus-client-python@c1e47a4 · GitHub
[go: up one dir, main page]

Skip to content

Commit c1e47a4

Browse files
committed
Add multi-process support.
This works by having a shelve per process that are continously updated with metrics, and a collector that reads from them. Histogram buckets need accumulation and special handling for _count, and for gauges we need to offer a number of options.
1 parent 31d671b commit c1e47a4

File tree

4 files changed

+342
-9
lines changed

4 files changed

+342
-9
lines changed

README.md

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -321,6 +321,74 @@ REGISTRY.register(CustomCollector())
321321

322322
`SummaryMetricFamily` and `HistogramMetricFamily` work similarly.
323323

324+
## Multiprocess Mode (Gunicorn)
325+
326+
**Experimental: This feature is new and has rough edges.**
327+
328+
Prometheus client libaries presume a threaded model, where metrics are shared
329+
across workers. This doesn't work so well for languages such as Python where
330+
it's common to have processes rather than threads to handle large workloads.
331+
332+
To handle this the client library can be put in multiprocess mode.
333+
This comes with a number of limitations:
334+
335+
- Registries can not be used as normal, all instantiated metrics are exported
336+
- Custom collectors do not work (e.g. cpu and memory metrics)
337+
- The pushgateway cannot be used
338+
- Gauges cannot use the `pid` label
339+
- Gunicron's `preload_app` feature is not supported
340+
341+
There's several steps to getting this working:
342+
343+
**One**: Gunicorn deployment
344+
345+
The `prometheus_multiproc_dir` environment variable must be set to a directory
346+
that the client library can use for metrics. This directory must be wiped
347+
between Gunicorn runs (before startup is recommended).
348+
349+
Put the following in the config file:
350+
```python
351+
def worker_exit(server, worker):
352+
from prometheus_client import multiprocess
353+
multiprocess.mark_process_dead(worker.pid)
354+
```
355+
356+
**Two**: Inside the application
357+
```python
358+
from prometheus_client import multiprocess
359+
from prometheus_client import generate_latest, CollectorRegistry, CONTENT_TYPE_LATEST, Gauge
360+
361+
# Example gauge.
362+
IN_PROGRESS = Gauge("inprogress_requests", "help", multiprocess_mode='livesum')
363+
364+
365+
# Expose metrics.
366+
@IN_PROGRESS.track_inprogress()
367+
def app(environ, start_response):
368+
registry = CollectorRegistry()
369+
multiprocess.MultiProcessCollector(registry)
370+
data = generate_latest(registry)
371+
status = '200 OK'
372+
response_headers = [
373+
('Content-type', CONTENT_TYPE_LATEST),
374+
('Content-Length', str(len(data)))
375+
]
376+
start_response(status, response_headers)
377+
return iter([data])
378+
```
379+
380+
**Three**: Instrumentation
381+
382+
Counters, Summarys and Histograms work as normal.
383+
384+
Gauges have several modes they can run in, which can be selected with the
385+
`multiprocess_mode` parameter.
386+
387+
- 'all': Default. Return a timeseries per process alive or dead.
388+
- 'liveall': Return a timeseries per process that is still alive.
389+
- 'livesum': Return a single timeseries that is the sum of the values of alive processes.
390+
- 'max': Return a single timeseries that is the maximum of the values of all processes, alive or dead.
391+
- 'min': Return a single timeseries that is the minimum of the values of all processes, alive or dead.
324392

325393
## Parser
326394

prometheus_client/core.py

Lines changed: 71 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,10 @@
44

55
import copy
66
import math
7+
import json
8+
import os
79
import re
10+
import shelve
811
import time
912
import types
1013

@@ -219,7 +222,9 @@ def add_metric(self, labels, buckets, sum_value):
219222
class _MutexValue(object):
220223
'''A float protected by a mutex.'''
221224

222-
def __init__(self, name, labelnames, labelvalues):
225+
_multiprocess = False
226+
227+
def __init__(self, typ, metric_name, name, labelnames, labelvalues, **kwargs):
223228
self._value = 0.0
224229
self._lock = Lock()
225230

@@ -235,7 +240,60 @@ def get(self):
235240
with self._lock:
236241
return self._value
237242

238-
_ValueClass = _MutexValue
243+
244+
def _MultiProcessValue(__pid=os.getpid()):
245+
pid = __pid
246+
samples = {}
247+
samples_lock = Lock()
248+
249+
class _ShelveValue(object):
250+
'''A float protected by a mutex backed by a per-process shelve.'''
251+
252+
_multiprocess = True
253+
254+
def __init__(self, typ, metric_name, name, labelnames, labelvalues, multiprocess_mode='', **kwargs):
255+
with samples_lock:
256+
if typ == 'gauge':
257+
file_prefix = typ + '_' + multiprocess_mode
258+
else:
259+
file_prefix = typ
260+
if file_prefix not in samples:
261+
filename = os.path.join(os.environ['prometheus_multiproc_dir'], '{0}_{1}.db'.format(file_prefix, pid))
262+
samples[file_prefix] = shelve.open(filename)
263+
self._samples = samples[file_prefix]
264+
self._key = json.dumps((metric_name, name, labelnames, labelvalues))
265+
self._value = self._samples.get(self._key, 0.0)
266+
self._samples[self._key] = self._value
267+
self._samples.sync()
268+
self._lock = Lock()
269+
270+
def inc(self, amount):
271+
with self._lock:
272+
self._value += amount
273+
self._samples[self._key] = self._value
274+
self._samples.sync()
275+
276+
def set(self, value):
277+
with self._lock:
278+
self._value = value
279+
self._samples[self._key] = self._value
280+
self._samples.sync()
281+
282+
def get(self):
283+
with self._lock:
284+
return self._value
285+
286+
return _ShelveValue
287+
288+
289+
# Should we enable multi-process mode?
290+
# This needs to be chosen before the first metric is constructed,
291+
# and as that may be in some arbitrary library the user/admin has
292+
# no control over we use an enviroment variable.
293+
if 'prometheus_multiproc_dir' in os.environ:
294+
_ValueClass = _MultiProcessValue()
295+
else:
296+
_ValueClass = _MutexValue
239297

240298

241299
class _LabelWrapper(object):
@@ -383,7 +441,7 @@ def f():
383441
_reserved_labelnames = []
384442

385443
def __init__(self, name, labelnames, labelvalues):
386-
self._value = _ValueClass(name, labelnames, labelvalues)
444+
self._value = _ValueClass(self._type, name, name, labelnames, labelvalues)
387445

388446
def inc(self, amount=1):
389447
'''Increment counter by the given amount.'''
@@ -464,8 +522,12 @@ def f():
464522
_type = 'gauge'
465523
_reserved_labelnames = []
466524

467-
def __init__(self, name, labelnames, labelvalues):
468-
self._value = _ValueClass(name, labelnames, labelvalues)
525+
def __init__(self, name, labelnames, labelvalues, multiprocess_mode='all'):
526+
if (_ValueClass._multiprocess
527+
and multiprocess_mode not in ['min', 'max', 'livesum', 'liveall', 'all']):
528+
raise ValueError('Invalid multiprocess mode: ' + multiprocess_mode)
529+
self._value = _ValueClass(self._type, name, name, labelnames,
530+
labelvalues, multiprocess_mode=multiprocess_mode)
469531

470532
def inc(self, amount=1):
471533
'''Increment gauge by the given amount.'''
@@ -585,8 +647,8 @@ def create_response(request):
585647
_reserved_labelnames = ['quantile']
586648

587649
def __init__(self, name, labelnames, labelvalues):
588-
self._count = _ValueClass(name + '_count', labelnames, labelvalues)
589-
self._sum = _ValueClass(name + '_sum', labelnames, labelvalues)
650+
self._count = _ValueClass(self._type, name, name + '_count', labelnames, labelvalues)
651+
self._sum = _ValueClass(self._type, name, name + '_sum', labelnames, labelvalues)
590652

591653
def observe(self, amount):
592654
'''Observe the given amount.'''
@@ -678,7 +740,7 @@ def create_response(request):
678740
_reserved_labelnames = ['histogram']
679741

680742
def __init__(self, name, labelnames, labelvalues, buckets=(.005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0, 7.5, 10.0, _INF)):
681-
self._sum = _ValueClass(name + '_sum', labelnames, labelvalues)
743+
self._sum = _ValueClass(self._type, name, name + '_sum', labelnames, labelvalues)
682744
buckets = [float(b) for b in buckets]
683745
if buckets != sorted(buckets):
684746
# This is probably an error on the part of the user,
@@ -692,7 +754,7 @@ def __init__(self, name, labelnames, labelvalues, buckets=(.005, .01, .025, .05,
692754
self._buckets = []
693755
bucket_labelnames = labelnames + ('le',)
694756
for b in buckets:
695-
self._buckets.append(_ValueClass(name + '_bucket', bucket_labelnames, labelvalues + (_floatToGoString(b),)))
757+
self._buckets.append(_ValueClass(self._type, name, name + '_bucket', bucket_labelnames, labelvalues + (_floatToGoString(b),)))
696758

697759
def observe(self, amount):
698760
'''Observe the given amount.'''

prometheus_client/multiprocess.py

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
#!/usr/bin/python
2+
3+
from __future__ import unicode_literals
4+
5+
import glob
6+
import json
7+
import os
8+
import shelve
9+
10 F43B +
from . import core
11+
12+
class MultiProcessCollector(object):
13+
"""Collector for files for multi-process mode."""
14+
def __init__(self, registry, path=os.environ.get('prometheus_multiproc_dir')):
15+
self._path = path
16+
if registry:
17+
registry.register(self)
18+
19+
def collect(self):
20+
metrics = {}
21+
for f in glob.glob(os.path.join(self._path, '*.db')):
22+
parts = os.path.basename(f).split('_')
23+
typ = parts[0]
24+
for key, value in shelve.open(f).items():
25+
metric_name, name, labelnames, labelvalues = json.loads(key)
26+
metrics.setdefault(metric_name, core.Metric(metric_name, 'Multiprocess metric', typ))
27+
metric = metrics[metric_name]
28+
if typ == 'gauge':
29+
pid = parts[2][:-3]
30+
metric._multiprocess_mode = parts[1]
31+
metric.add_sample(name, tuple(zip(labelnames, labelvalues)) + (('pid', pid), ), value)
32+
else:
33+
# The deplucates and labels are fixed in the next for.
34+
metric.add_sample(name, tuple(zip(labelnames, labelvalues)), value)
35+
36+
for metric in metrics.values():
37+
samples = {}
38+
buckets = {}
39+
for name, labels, value in metric.samples:
40+
if metric.type == 'gauge':
41+
without_pid = tuple([l for l in labels if l[0] != 'pid'])
42+
if metric._multiprocess_mode == 'min':
43+
samples.setdefault((name, without_pid), value)
44+
if samples[(name, without_pid)] > value:
45+
samples[(name, without_pid)] = value
46+
elif metric._multiprocess_mode == 'max':
47+
samples.setdefault((name, without_pid), value)
48+
if samples[(name, without_pid)] < value:
49+
samples[(name, without_pid)] = value
50+
elif metric._multiprocess_mode == 'livesum':
51+
samples.setdefault((name, without_pid), 0.0)
52+
samples[(name, without_pid)] += value
53+
else: # all/liveall
54+
samples[(name, labels)] = value
55+
elif metric.type == 'histogram':
56+
bucket = [float(l[1]) for l in labels if l[0] == 'le']
57+
if bucket:
58+
# _bucket
59+
without_le = tuple([l for l in labels if l[0] != 'le'])
60+
buckets.setdefault(without_le, {})
61+
buckets[without_le].setdefault(bucket[0], 0.0)
62+
buckets[without_le][bucket[0]] += value
63+
else:
64+
# _sum
65+
samples.setdefault((name, labels), 0.0)
66+
samples[(name, labels)] += value
67+
else:
68+
# Counter and Summary.
69+
samples.setdefault((name, labels), 0.0)
70+
samples[(name, labels)] += value
71+
72+
73+
# Accumulate bucket values.
74+
if metric.type == 'histogram':
75+
for labels, values in buckets.items():
76+
acc = 0.0
77+
for bucket, value in sorted(values.items()):
78+
acc += value
79+
samples[(metric.name + '_bucket', labels + (('le', core._floatToGoString(bucket)), ))] = acc
80+
samples[(metric.name + '_count', labels)] = acc
81+
82+
# Convert to correct sample format.
83+
metric.samples = [(name, dict(labels), value) for (name, labels), value in samples.items()]
84+
return metrics.values()
85+
86+
87+
def mark_process_dead(pid, path=os.environ.get('prometheus_multiproc_dir')):
88+
"""Do bookkeeping for when one process dies in a multi-process setup."""
89+
for f in glob.glob(os.path.join(path, 'gauge_livesum_{0}.db'.format(pid))):
90+
os.remove(f)
91+
for f in glob.glob(os.path.join(path, 'gauge_liveall_{0}.db'.format(pid))):
92+
os.remove(f)

0 commit comments

Comments
 (0)
0