Merge pull request #66 from prometheus/multiproc · sonlinux/client_python@ac77d9b · GitHub

Commit ac77d9b

Merge pull request prometheus#66 from prometheus/multiproc
Add multi-process support
2 parents cf91d2c + 9e1f993 commit ac77d9b

File tree

5 files changed: +470 −9 lines changed


.travis.yml

Lines changed: 16 additions & 0 deletions
@@ -3,6 +3,22 @@ cache:
   directories:
   - $HOME/.cache/pip
 
+# Pending https://github.com/travis-ci/travis-ci/issues/5027
+before_install:
+- |
+  if [ "$TRAVIS_PYTHON_VERSION" = "pypy" ]; then
+    export PYENV_ROOT="$HOME/.pyenv"
+    if [ -f "$PYENV_ROOT/bin/pyenv" ]; then
+      cd "$PYENV_ROOT" && git pull
+    else
+      rm -rf "$PYENV_ROOT" && git clone --depth 1 https://github.com/yyuu/pyenv.git "$PYENV_ROOT"
+    fi
+    export PYPY_VERSION="4.0.1"
+    "$PYENV_ROOT/bin/pyenv" install "pypy-$PYPY_VERSION"
+    virtualenv --python="$PYENV_ROOT/versions/pypy-$PYPY_VERSION/bin/python" "$HOME/virtualenvs/pypy-$PYPY_VERSION"
+    source "$HOME/virtualenvs/pypy-$PYPY_VERSION/bin/activate"
+  fi
+
 language: python
 
 matrix:

README.md

Lines changed: 68 additions & 0 deletions
@@ -365,6 +365,74 @@ REGISTRY.register(CustomCollector())
 
 `SummaryMetricFamily` and `HistogramMetricFamily` work similarly.
 
+## Multiprocess Mode (Gunicorn)
+
+**Experimental: This feature is new and has rough edges.**
+
+Prometheus client libraries presume a threaded model, where metrics are shared
+across workers. This doesn't work so well for languages such as Python, where
+it's common to use processes rather than threads to handle large workloads.
+
+To handle this, the client library can be put in multiprocess mode.
+This comes with a number of limitations:
+
+- Registries cannot be used as normal; all instantiated metrics are exported
+- Custom collectors do not work (e.g. CPU and memory metrics)
+- The pushgateway cannot be used
+- Gauges cannot use the `pid` label
+- Gunicorn's `preload_app` feature is not supported
+
+There are several steps to getting this working:
+
+**One**: Gunicorn deployment
+
+The `prometheus_multiproc_dir` environment variable must be set to a directory
+that the client library can use for metrics. This directory must be wiped
+between Gunicorn runs (before startup is recommended).
+
+Put the following in the Gunicorn config file:
+```python
+def worker_exit(server, worker):
+    from prometheus_client import multiprocess
+    multiprocess.mark_process_dead(worker.pid)
+```
+
+**Two**: Inside the application
+```python
+from prometheus_client import multiprocess
+from prometheus_client import generate_latest, CollectorRegistry, CONTENT_TYPE_LATEST, Gauge
+
+# Example gauge.
+IN_PROGRESS = Gauge("inprogress_requests", "help", multiprocess_mode='livesum')
+
+
+# Expose metrics.
+@IN_PROGRESS.track_inprogress()
+def app(environ, start_response):
+    registry = CollectorRegistry()
+    multiprocess.MultiProcessCollector(registry)
+    data = generate_latest(registry)
+    status = '200 OK'
+    response_headers = [
+        ('Content-type', CONTENT_TYPE_LATEST),
+        ('Content-Length', str(len(data)))
+    ]
+    start_response(status, response_headers)
+    return iter([data])
+```
+
+**Three**: Instrumentation
+
+Counters, Summaries and Histograms work as normal.
+
+Gauges have several modes they can run in, which can be selected with the
+`multiprocess_mode` parameter:
+
+- 'all': Default. Return a time series per process, alive or dead.
+- 'liveall': Return a time series per process that is still alive.
+- 'livesum': Return a single time series that is the sum of the values of alive processes.
+- 'max': Return a single time series that is the maximum of the values of all processes, alive or dead.
+- 'min': Return a single time series that is the minimum of the values of all processes, alive or dead.
 
 ## Parser
 
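The README hunk above requires `prometheus_multiproc_dir` to be set and wiped before Gunicorn starts, but only shows the `worker_exit` hook. Below is a minimal sketch of a complete Gunicorn config file; only `worker_exit`/`mark_process_dead` come from this commit, while the startup wipe is an illustrative assumption (the `.db` suffix matches the filenames written by `core.py` below).

```python
# gunicorn_config.py -- illustrative sketch, not part of this commit.
import glob
import os

# Gunicorn config files are plain Python, so this runs once when the config is
# loaded, before any worker has written a metric file. It assumes
# prometheus_multiproc_dir was exported before starting Gunicorn.
for path in glob.glob(os.path.join(os.environ['prometheus_multiproc_dir'], '*.db')):
    os.remove(path)


def worker_exit(server, worker):
    # From the README above: lets the library clean up after a dead worker.
    from prometheus_client import multiprocess
    multiprocess.mark_process_dead(worker.pid)
```

Launched with something like `prometheus_multiproc_dir=/path/to/dir gunicorn -c gunicorn_config.py app:app`.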

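As a short illustration of the `multiprocess_mode` options listed in the README hunk above (the metric names here are made up; the behaviour comments simply restate the README's list):

```python
from prometheus_client import Gauge

# Default mode 'all': one time series exported per process, alive or dead.
QUEUE_DEPTH = Gauge('queue_depth', 'Per-process queue depth')

# 'livesum': a single time series, the sum across currently alive processes.
IN_FLIGHT = Gauge('in_flight_requests', 'Requests currently being served',
                  multiprocess_mode='livesum')

# 'max': a single time series, the maximum across all processes, alive or dead.
LAST_RUN = Gauge('last_batch_run_unixtime', 'When the batch job last completed',
                 multiprocess_mode='max')

# In multiprocess mode, an unknown value is rejected at construction time
# (see the Gauge.__init__ change in core.py below):
# Gauge('bad', 'help', multiprocess_mode='median')  # raises ValueError
```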
prometheus_client/core.py

Lines changed: 152 additions & 9 deletions
@@ -3,8 +3,12 @@
 from __future__ import unicode_literals
 
 import copy
+import json
 import math
+import mmap
+import os
 import re
+import struct
 import types
 from timeit import default_timer
 
@@ -23,6 +27,7 @@
 _RESERVED_METRIC_LABEL_NAME_RE = re.compile(r'^__.*$')
 _INF = float("inf")
 _MINUS_INF = float("-inf")
+_INITIAL_MMAP_SIZE = 1024*1024
 
 
 class CollectorRegistry(object):
@@ -220,7 +225,9 @@ def add_metric(self, labels, buckets, sum_value):
 class _MutexValue(object):
     '''A float protected by a mutex.'''
 
-    def __init__(self, name, labelnames, labelvalues):
+    _multiprocess = False
+
+    def __init__(self, typ, metric_name, name, labelnames, labelvalues, **kwargs):
         self._value = 0.0
         self._lock = Lock()
 
@@ -236,7 +243,139 @@ def get(self):
         with self._lock:
             return self._value
 
-_ValueClass = _MutexValue
+class _MmapedDict(object):
+    """A dict of doubles, backed by an mmapped file.
+
+    The file starts with a 4 byte int, indicating how much of it is used.
+    Then 4 bytes of padding.
+    There's then a number of entries, each consisting of a 4 byte int which is
+    the size of the next field, a utf-8 encoded string key, padding to an
+    8 byte alignment, and then an 8 byte float which is the value.
+    """
+    def __init__(self, filename):
+        self._lock = Lock()
+        self._f = open(filename, 'a+b')
+        if os.fstat(self._f.fileno()).st_size == 0:
+            self._f.truncate(_INITIAL_MMAP_SIZE)
+        self._capacity = os.fstat(self._f.fileno()).st_size
+        self._m = mmap.mmap(self._f.fileno(), self._capacity)
+
+        self._positions = {}
+        self._used = struct.unpack_from(b'i', self._m, 0)[0]
+        if self._used == 0:
+            self._used = 8
+            struct.pack_into(b'i', self._m, 0, self._used)
+        else:
+            for key, _, pos in self._read_all_values():
+                self._positions[key] = pos
+
+    def _init_value(self, key):
+        """Initialize a value. Lock must be held by caller."""
+        encoded = key.encode('utf-8')
+        # Pad to be 8-byte aligned.
+        padded = encoded + (b' ' * (8 - (len(encoded) + 4) % 8))
+        value = struct.pack('i{0}sd'.format(len(padded)).encode(), len(encoded), padded, 0.0)
+        while self._used + len(value) > self._capacity:
+            self._capacity *= 2
+            self._f.truncate(self._capacity * 2)
+            self._m = mmap.mmap(self._f.fileno(), self._capacity)
+        self._m[self._used:self._used + len(value)] = value
+
+        # Update how much space we've used.
+        self._used += len(value)
+        struct.pack_into(b'i', self._m, 0, self._used)
+        self._positions[key] = self._used - 8
+
+    def _read_all_values(self):
+        """Yield (key, value, pos). No locking is performed."""
+        pos = 8
+        while pos < self._used:
+            encoded_len = struct.unpack_from(b'i', self._m, pos)[0]
+            pos += 4
+            encoded = struct.unpack_from('{0}s'.format(encoded_len).encode(), self._m, pos)[0]
+            padded_len = encoded_len + (8 - (encoded_len + 4) % 8)
+            pos += padded_len
+            value = struct.unpack_from(b'd', self._m, pos)[0]
+            yield encoded.decode('utf-8'), value, pos
+            pos += 8
+
+    def read_all_values(self):
+        """Yield (key, value). No locking is performed."""
+        for k, v, _ in self._read_all_values():
+            yield k, v
+
+    def read_value(self, key):
+        with self._lock:
+            if key not in self._positions:
+                self._init_value(key)
+        pos = self._positions[key]
+        # We assume that reading from an 8 byte aligned value is atomic.
+        return struct.unpack_from(b'd', self._m, pos)[0]
+
+    def write_value(self, key, value):
+        with self._lock:
+            if key not in self._positions:
+                self._init_value(key)
+        pos = self._positions[key]
+        # We assume that writing to an 8 byte aligned value is atomic.
+        struct.pack_into(b'd', self._m, pos, value)
+
+    def close(self):
+        if self._f:
+            self._f.close()
+            self._f = None
+
+
+def _MultiProcessValue(__pid=os.getpid()):
+    pid = __pid
+    files = {}
+    files_lock = Lock()
+
+    class _MmapedValue(object):
+        '''A float protected by a mutex backed by a per-process mmaped file.'''
+
+        _multiprocess = True
+
+        def __init__(self, typ, metric_name, name, labelnames, labelvalues, multiprocess_mode='', **kwargs):
+            if typ == 'gauge':
+                file_prefix = typ + '_' + multiprocess_mode
+            else:
+                file_prefix = typ
+            with files_lock:
+                if file_prefix not in files:
+                    filename = os.path.join(
+                        os.environ['prometheus_multiproc_dir'], '{0}_{1}.db'.format(file_prefix, pid))
+                    files[file_prefix] = _MmapedDict(filename)
+            self._file = files[file_prefix]
+            self._key = json.dumps((metric_name, name, labelnames, labelvalues))
+            self._value = self._file.read_value(self._key)
+            self._lock = Lock()
+
+        def inc(self, amount):
+            with self._lock:
+                self._value += amount
+                self._file.write_value(self._key, self._value)
+
+        def set(self, value):
+            with self._lock:
+                self._value = value
+                self._file.write_value(self._key, self._value)
+
+        def get(self):
+            with self._lock:
+                return self._value
+
+    return _MmapedValue
+
+
+# Should we enable multi-process mode?
+# This needs to be chosen before the first metric is constructed,
+# and as that may be in some arbitrary library the user/admin has
+# no control over, we use an environment variable.
+if 'prometheus_multiproc_dir' in os.environ:
+    _ValueClass = _MultiProcessValue()
+else:
+    _ValueClass = _MutexValue
 
 
 class _LabelWrapper(object):
@@ -387,7 +526,7 @@ def f():
     _reserved_labelnames = []
 
     def __init__(self, name, labelnames, labelvalues):
-        self._value = _ValueClass(name, labelnames, labelvalues)
+        self._value = _ValueClass(self._type, name, name, labelnames, labelvalues)
 
     def inc(self, amount=1):
         '''Increment counter by the given amount.'''
@@ -449,8 +588,12 @@ def f():
     _type = 'gauge'
     _reserved_labelnames = []
 
-    def __init__(self, name, labelnames, labelvalues):
-        self._value = _ValueClass(name, labelnames, labelvalues)
+    def __init__(self, name, labelnames, labelvalues, multiprocess_mode='all'):
+        if (_ValueClass._multiprocess
+                and multiprocess_mode not in ['min', 'max', 'livesum', 'liveall', 'all']):
+            raise ValueError('Invalid multiprocess mode: ' + multiprocess_mode)
+        self._value = _ValueClass(self._type, name, name, labelnames,
+                                  labelvalues, multiprocess_mode=multiprocess_mode)
 
     def inc(self, amount=1):
         '''Increment gauge by the given amount.'''
@@ -533,8 +676,8 @@ def create_response(request):
     _reserved_labelnames = ['quantile']
 
     def __init__(self, name, labelnames, labelvalues):
-        self._count = _ValueClass(name + '_count', labelnames, labelvalues)
-        self._sum = _ValueClass(name + '_sum', labelnames, labelvalues)
+        self._count = _ValueClass(self._type, name, name + '_count', labelnames, labelvalues)
+        self._sum = _ValueClass(self._type, name, name + '_sum', labelnames, labelvalues)
 
     def observe(self, amount):
         '''Observe the given amount.'''
@@ -607,7 +750,7 @@ def create_response(request):
     _reserved_labelnames = ['histogram']
 
     def __init__(self, name, labelnames, labelvalues, buckets=(.005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0, 7.5, 10.0, _INF)):
-        self._sum = _ValueClass(name + '_sum', labelnames, labelvalues)
+        self._sum = _ValueClass(self._type, name, name + '_sum', labelnames, labelvalues)
         buckets = [float(b) for b in buckets]
         if buckets != sorted(buckets):
             # This is probably an error on the part of the user,
@@ -621,7 +764,7 @@ def __init__(self, name, labelnames, labelvalues, buckets=(.005, .01, .025, .05,
         self._buckets = []
         bucket_labelnames = labelnames + ('le',)
         for b in buckets:
-            self._buckets.append(_ValueClass(name + '_bucket', bucket_labelnames, labelvalues + (_floatToGoString(b),)))
+            self._buckets.append(_ValueClass(self._type, name, name + '_bucket', bucket_labelnames, labelvalues + (_floatToGoString(b),)))
 
     def observe(self, amount):
         '''Observe the given amount.'''
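
To make the `_MmapedDict` entry layout described in the docstring above concrete, here is a small standalone sketch that repeats the same packing arithmetic outside the library; the key shown is an illustrative example of what `_MmapedValue` would store for an unlabelled metric, not a library API call.

```python
import json
import struct

# _MmapedValue keys are JSON-encoded (metric_name, name, labelnames, labelvalues)
# tuples; this is what an unlabelled gauge called "inprogress_requests" would use.
key = json.dumps(("inprogress_requests", "inprogress_requests", [], []))
encoded = key.encode('utf-8')

# Same padding rule as _MmapedDict._init_value: pad the key so that the 4-byte
# length prefix plus the key ends on an 8-byte boundary, keeping the double aligned.
padded = encoded + b' ' * (8 - (len(encoded) + 4) % 8)
entry = struct.pack('i{0}sd'.format(len(padded)), len(encoded), padded, 0.0)

# 4-byte length + padded key + 8-byte double; the whole entry is a multiple of 8.
assert len(entry) == 4 + len(padded) + 8
assert len(entry) % 8 == 0
```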

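The module-level block at the end of the core.py changes picks the value implementation once, at import time, which is why the environment variable has to be in place before anything imports `prometheus_client`. A tiny sketch of that ordering (the directory path is a placeholder):

```python
import os

# Must be set before prometheus_client is imported anywhere in this process;
# core.py checks the environment once at import time to choose _ValueClass.
os.environ.setdefault('prometheus_multiproc_dir', '/tmp/prometheus_multiproc')  # placeholder path

from prometheus_client import Counter

REQUESTS = Counter('requests_total', 'Total requests')
# With the variable set, this increment lands in a per-process file such as
# counter_<pid>.db under that directory, rather than in an in-process float.
REQUESTS.inc()
```

In the Gunicorn setup described in the README, the variable is normally exported by whatever starts Gunicorn rather than set in application code.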
0 commit comments
