sonlinux/client_python · Commit d3256c1

Add multi-process support.
This works by having a shelve per process that is continuously updated with metrics, and a collector that reads from them. Histogram buckets need accumulation and special handling for _count, and for gauges we need to offer a number of options.
1 parent cf91d2c commit d3256c1

File tree

4 files changed: +342 -9 lines changed

README.md

Lines changed: 68 additions & 0 deletions
@@ -365,6 +365,74 @@ REGISTRY.register(CustomCollector())
`SummaryMetricFamily` and `HistogramMetricFamily` work similarly.

## Multiprocess Mode (Gunicorn)

**Experimental: This feature is new and has rough edges.**

Prometheus client libraries presume a threaded model, where metrics are shared
across workers. This doesn't work so well for languages such as Python, where
it's common to have processes rather than threads to handle large workloads.

To handle this the client library can be put in multiprocess mode.
This comes with a number of limitations:

- Registries cannot be used as normal; all instantiated metrics are exported
- Custom collectors do not work (e.g. cpu and memory metrics)
- The pushgateway cannot be used
- Gauges cannot use the `pid` label
- Gunicorn's `preload_app` feature is not supported

There are several steps to getting this working:

**One**: Gunicorn deployment

The `prometheus_multiproc_dir` environment variable must be set to a directory
that the client library can use for metrics. This directory must be wiped
between Gunicorn runs (before startup is recommended).

Put the following in the config file:
```python
def worker_exit(server, worker):
    from prometheus_client import multiprocess
    multiprocess.mark_process_dead(worker.pid)
```
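The snippet above handles worker exit; the wipe-before-startup step is left to the operator. As a minimal sketch (not part of this commit), the same config file could clear stale metric files before the Gunicorn master starts, assuming `prometheus_multiproc_dir` is already set in the environment:

```python
import glob
import os

def on_starting(server):
    # Remove leftover metric shelves from a previous run before any worker starts.
    for f in glob.glob(os.path.join(os.environ['prometheus_multiproc_dir'], '*.db')):
        os.remove(f)
```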
**Two**: Inside the application
```python
from prometheus_client import multiprocess
from prometheus_client import generate_latest, CollectorRegistry, CONTENT_TYPE_LATEST, Gauge

# Example gauge.
IN_PROGRESS = Gauge("inprogress_requests", "help", multiprocess_mode='livesum')


# Expose metrics.
@IN_PROGRESS.track_inprogress()
def app(environ, start_response):
    registry = CollectorRegistry()
    multiprocess.MultiProcessCollector(registry)
    data = generate_latest(registry)
    status = '200 OK'
    response_headers = [
        ('Content-type', CONTENT_TYPE_LATEST),
        ('Content-Length', str(len(data)))
    ]
    start_response(status, response_headers)
    return iter([data])
```
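If the Gunicorn config above lives in, say, `config.py` and this WSGI app in `app.py` (both file names are only for illustration), the deployment would typically be started with something like `gunicorn -c config.py app:app`, with `prometheus_multiproc_dir` exported beforehand.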
**Three**: Instrumentation

Counters, Summaries and Histograms work as normal.

Gauges have several modes they can run in, which can be selected with the
`multiprocess_mode` parameter (a short usage sketch follows the list):

- 'all': Default. Return a timeseries per process, alive or dead.
- 'liveall': Return a timeseries per process that is still alive.
- 'livesum': Return a single timeseries that is the sum of the values of alive processes.
- 'max': Return a single timeseries that is the maximum of the values of all processes, alive or dead.
- 'min': Return a single timeseries that is the minimum of the values of all processes, alive or dead.
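As a hedged sketch of choosing a mode (the metric names here are invented for illustration, not taken from this commit):

```python
from prometheus_client import Gauge

# Collapsed to one series: the largest value reported by any process, alive or dead.
QUEUE_DEPTH = Gauge('queue_depth', 'Items waiting to be processed',
                    multiprocess_mode='max')

# One series per live process; the collector adds a `pid` label to each.
SCRATCH_BYTES = Gauge('worker_scratch_bytes', 'Scratch space used per worker',
                      multiprocess_mode='liveall')
```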

## Parser

prometheus_client/core.py

Lines changed: 71 additions & 9 deletions
@@ -4,7 +4,10 @@
 
 import copy
 import math
+import json
+import os
 import re
+import shelve
 import types
 from timeit import default_timer
 
@@ -220,7 +223,9 @@ def add_metric(self, labels, buckets, sum_value):
 class _MutexValue(object):
     '''A float protected by a mutex.'''
 
-    def __init__(self, name, labelnames, labelvalues):
+    _multiprocess = False
+
+    def __init__(self, typ, metric_name, name, labelnames, labelvalues, **kwargs):
         self._value = 0.0
         self._lock = Lock()
 
@@ -236,7 +241,60 @@ def get(self):
         with self._lock:
             return self._value
 
-_ValueClass = _MutexValue
+
+def _MultiProcessValue(__pid=os.getpid()):
+    pid = __pid
+    samples = {}
+    samples_lock = Lock()
+
+    class _ShelveValue(object):
+        '''A float protected by a mutex backed by a per-process shelve.'''
+
+        _multiprocess = True
+
+        def __init__(self, typ, metric_name, name, labelnames, labelvalues, multiprocess_mode='', **kwargs):
+            with samples_lock:
+                if typ == 'gauge':
+                    file_prefix = typ + '_' + multiprocess_mode
+                else:
+                    file_prefix = typ
+                if file_prefix not in samples:
+                    filename = os.path.join(os.environ['prometheus_multiproc_dir'], '{0}_{1}.db'.format(file_prefix, pid))
+                    samples[file_prefix] = shelve.open(filename)
+            self._samples = samples[file_prefix]
+            self._key = json.dumps((metric_name, name, labelnames, labelvalues))
+            self._value = self._samples.get(self._key, 0.0)
+            self._samples[self._key] = self._value
+            self._samples.sync()
+            self._lock = Lock()
+
+        def inc(self, amount):
+            with self._lock:
+                self._value += amount
+                self._samples[self._key] = self._value
+                self._samples.sync()
+
+        def set(self, value):
+            with self._lock:
+                self._value = value
+                self._samples[self._key] = self._value
+                self._samples.sync()
+
+        def get(self):
+            with self._lock:
+                return self._value
+
+    return _ShelveValue
+
+
+# Should we enable multi-process mode?
+# This needs to be chosen before the first metric is constructed,
+# and as that may be in some arbitrary library the user/admin has
+# no control over we use an environment variable.
+if 'prometheus_multiproc_dir' in os.environ:
+    _ValueClass = _MultiProcessValue()
+else:
+    _ValueClass = _MutexValue
 
 
 class _LabelWrapper(object):
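To make the on-disk layout concrete, here is a hedged inspection sketch (not part of the commit): it opens one of the per-process shelve files written by `_ShelveValue` and decodes the JSON keys. The directory path and pid are placeholders.

```python
import json
import shelve

# e.g. $prometheus_multiproc_dir/counter_1234.db for a counter written by pid 1234
db = shelve.open('/tmp/prom_metrics/counter_1234')
for key, value in db.items():
    metric_name, sample_name, labelnames, labelvalues = json.loads(key)
    print(metric_name, sample_name, dict(zip(labelnames, labelvalues)), value)
```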
@@ -387,7 +445,7 @@ def f():
     _reserved_labelnames = []
 
     def __init__(self, name, labelnames, labelvalues):
-        self._value = _ValueClass(name, labelnames, labelvalues)
+        self._value = _ValueClass(self._type, name, name, labelnames, labelvalues)
 
     def inc(self, amount=1):
         '''Increment counter by the given amount.'''
@@ -449,8 +507,12 @@ def f():
     _type = 'gauge'
     _reserved_labelnames = []
 
-    def __init__(self, name, labelnames, labelvalues):
-        self._value = _ValueClass(name, labelnames, labelvalues)
+    def __init__(self, name, labelnames, labelvalues, multiprocess_mode='all'):
+        if (_ValueClass._multiprocess
+                and multiprocess_mode not in ['min', 'max', 'livesum', 'liveall', 'all']):
+            raise ValueError('Invalid multiprocess mode: ' + multiprocess_mode)
+        self._value = _ValueClass(self._type, name, name, labelnames,
+                                  labelvalues, multiprocess_mode=multiprocess_mode)
 
     def inc(self, amount=1):
         '''Increment gauge by the given amount.'''
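A hedged illustration of the new validation path (the metric name is invented): with multiprocess mode active, an unrecognized `multiprocess_mode` is rejected when the gauge is constructed.

```python
from prometheus_client import Gauge

try:
    # Only raises when the multiprocess value class is in use (env var set).
    Gauge('bad_mode_example', 'help', multiprocess_mode='median')
except ValueError as err:
    print(err)  # Invalid multiprocess mode: median
```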
@@ -533,8 +595,8 @@ def create_response(request):
     _reserved_labelnames = ['quantile']
 
     def __init__(self, name, labelnames, labelvalues):
-        self._count = _ValueClass(name + '_count', labelnames, labelvalues)
-        self._sum = _ValueClass(name + '_sum', labelnames, labelvalues)
+        self._count = _ValueClass(self._type, name, name + '_count', labelnames, labelvalues)
+        self._sum = _ValueClass(self._type, name, name + '_sum', labelnames, labelvalues)
 
     def observe(self, amount):
         '''Observe the given amount.'''
@@ -607,7 +669,7 @@ def create_response(request):
     _reserved_labelnames = ['histogram']
 
     def __init__(self, name, labelnames, labelvalues, buckets=(.005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0, 7.5, 10.0, _INF)):
-        self._sum = _ValueClass(name + '_sum', labelnames, labelvalues)
+        self._sum = _ValueClass(self._type, name, name + '_sum', labelnames, labelvalues)
         buckets = [float(b) for b in buckets]
         if buckets != sorted(buckets):
             # This is probably an error on the part of the user,
@@ -621,7 +683,7 @@ def __init__(self, name, labelnames, labelvalues, buckets=(.005, .01, .025, .05,
         self._buckets = []
         bucket_labelnames = labelnames + ('le',)
         for b in buckets:
-            self._buckets.append(_ValueClass(name + '_bucket', bucket_labelnames, labelvalues + (_floatToGoString(b),)))
+            self._buckets.append(_ValueClass(self._type, name, name + '_bucket', bucket_labelnames, labelvalues + (_floatToGoString(b),)))
 
     def observe(self, amount):
         '''Observe the given amount.'''

prometheus_client/multiprocess.py

Lines changed: 92 additions & 0 deletions
@@ -0,0 +1,92 @@
#!/usr/bin/python

from __future__ import unicode_literals

import glob
import json
import os
import shelve

from . import core

class MultiProcessCollector(object):
    """Collector for files for multi-process mode."""
    def __init__(self, registry, path=os.environ.get('prometheus_multiproc_dir')):
        self._path = path
        if registry:
            registry.register(self)

    def collect(self):
        metrics = {}
        for f in glob.glob(os.path.join(self._path, '*.db')):
            parts = os.path.basename(f).split('_')
            typ = parts[0]
            for key, value in shelve.open(f).items():
                metric_name, name, labelnames, labelvalues = json.loads(key)
                metrics.setdefault(metric_name, core.Metric(metric_name, 'Multiprocess metric', typ))
                metric = metrics[metric_name]
                if typ == 'gauge':
                    pid = parts[2][:-3]
                    metric._multiprocess_mode = parts[1]
                    metric.add_sample(name, tuple(zip(labelnames, labelvalues)) + (('pid', pid), ), value)
                else:
                    # The duplicates and labels are fixed in the next for loop.
                    metric.add_sample(name, tuple(zip(labelnames, labelvalues)), value)

        for metric in metrics.values():
            samples = {}
            buckets = {}
            for name, labels, value in metric.samples:
                if metric.type == 'gauge':
                    without_pid = tuple([l for l in labels if l[0] != 'pid'])
                    if metric._multiprocess_mode == 'min':
                        samples.setdefault((name, without_pid), value)
                        if samples[(name, without_pid)] > value:
                            samples[(name, without_pid)] = value
                    elif metric._multiprocess_mode == 'max':
                        samples.setdefault((name, without_pid), value)
                        if samples[(name, without_pid)] < value:
                            samples[(name, without_pid)] = value
                    elif metric._multiprocess_mode == 'livesum':
                        samples.setdefault((name, without_pid), 0.0)
                        samples[(name, without_pid)] += value
                    else:  # all/liveall
                        samples[(name, labels)] = value
                elif metric.type == 'histogram':
                    bucket = [float(l[1]) for l in labels if l[0] == 'le']
                    if bucket:
                        # _bucket
                        without_le = tuple([l for l in labels if l[0] != 'le'])
                        buckets.setdefault(without_le, {})
                        buckets[without_le].setdefault(bucket[0], 0.0)
                        buckets[without_le][bucket[0]] += value
                    else:
                        # _sum/_count
                        samples.setdefault((name, labels), 0.0)
                        samples[(name, labels)] += value
                else:
                    # Counter and Summary.
                    samples.setdefault((name, labels), 0.0)
                    samples[(name, labels)] += value

            # Accumulate bucket values.
            if metric.type == 'histogram':
                for labels, values in buckets.items():
                    acc = 0.0
                    for bucket, value in sorted(values.items()):
                        acc += value
                        samples[(metric.name + '_bucket', labels + (('le', core._floatToGoString(bucket)), ))] = acc
                    samples[(metric.name + '_count', labels)] = acc

            # Convert to correct sample format.
            metric.samples = [(name, dict(labels), value) for (name, labels), value in samples.items()]
        return metrics.values()

def mark_process_dead(pid, path=os.environ.get('prometheus_multiproc_dir')):
    """Do bookkeeping for when one process dies in a multi-process setup."""
    for f in glob.glob(os.path.join(path, 'gauge_livesum_{0}.db'.format(pid))):
        os.remove(f)
    for f in glob.glob(os.path.join(path, 'gauge_liveall_{0}.db'.format(pid))):
        os.remove(f)
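As a worked sketch of the bucket accumulation performed in `collect()` above (the per-process numbers are invented): each process stores raw per-bucket counts, the collector sums them per `le` bound across processes, then accumulates the sums so the exported series are cumulative, with `_count` equal to the total.

```python
from collections import defaultdict

# Raw (non-cumulative) bucket counts as two worker processes might have recorded them.
per_process = [{0.1: 2.0, 0.5: 1.0}, {0.1: 1.0, 0.5: 3.0}]

merged = defaultdict(float)
for counts in per_process:
    for le, n in counts.items():
        merged[le] += n

acc = 0.0
for le in sorted(merged):
    acc += merged[le]
    print('le=%s -> %s' % (le, acc))  # le=0.1 -> 3.0, le=0.5 -> 7.0
print('_count -> %s' % acc)          # _count -> 7.0
```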
