Add multi-process support by brian-brazil · Pull Request #66 · prometheus/client_python · GitHub

Add multi-process support #66

Merged
merged 3 commits · Oct 10, 2016
16 changes: 16 additions & 0 deletions .travis.yml
@@ -3,6 +3,22 @@ cache:
directories:
- $HOME/.cache/pip

# Pending https://github.com/travis-ci/travis-ci/issues/5027
before_install:
- |
if [ "$TRAVIS_PYTHON_VERSION" = "pypy" ]; then
export PYENV_ROOT="$HOME/.pyenv"
if [ -f "$PYENV_ROOT/bin/pyenv" ]; then
cd "$PYENV_ROOT" && git pull
else
rm -rf "$PYENV_ROOT" && git clone --depth 1 https://github.com/yyuu/pyenv.git "$PYENV_ROOT"
fi
export PYPY_VERSION="4.0.1"
"$PYENV_ROOT/bin/pyenv" install "pypy-$PYPY_VERSION"
virtualenv --python="$PYENV_ROOT/versions/pypy-$PYPY_VERSION/bin/python" "$HOME/virtualenvs/pypy-$PYPY_VERSION"
source "$HOME/virtualenvs/pypy-$PYPY_VERSION/bin/activate"
fi

language: python

matrix:
68 changes: 68 additions & 0 deletions README.md
@@ -365,6 +365,74 @@ REGISTRY.register(CustomCollector())

`SummaryMetricFamily` and `HistogramMetricFamily` work similarly.

## Multiprocess Mode (Gunicorn)

**Experimental: This feature is new and has rough edges.**

Prometheus client libraries presume a threaded model, where metrics are shared
across workers. This doesn't work so well for languages such as Python, where
it's common to use processes rather than threads to handle large workloads.

To handle this, the client library can be put in multiprocess mode.
This comes with a number of limitations:

- Registries cannot be used as normal; all instantiated metrics are exported
- Custom collectors do not work (e.g. CPU and memory metrics)
- The pushgateway cannot be used
- Gauges cannot use the `pid` label
- Gunicorn's `preload_app` feature is not supported

There are several steps to getting this working:

**One**: Gunicorn deployment

The `prometheus_multiproc_dir` environment variable must be set to a directory
that the client library can use for metrics. This directory must be wiped
between Gunicorn runs (before startup is recommended).
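
One way to do that wipe at startup (a sketch, not part of this PR; the helper name is illustrative):

```python
import os
import shutil

def clear_multiproc_dir(path):
    """Remove stale metric .db files left over from a previous run."""
    if os.path.isdir(path):
        shutil.rmtree(path)
    os.makedirs(path)

# Call this before Gunicorn forks any workers.
if 'prometheus_multiproc_dir' in os.environ:
    clear_multiproc_dir(os.environ['prometheus_multiproc_dir'])
```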

Put the following in the config file:
```python
def worker_exit(server, worker):
from prometheus_client import multiprocess
multiprocess.mark_process_dead(worker.pid)
```

**Two**: Inside the application
```python
from prometheus_client import multiprocess
from prometheus_client import generate_latest, CollectorRegistry, CONTENT_TYPE_LATEST, Gauge

# Example gauge.
IN_PROGRESS = Gauge("inprogress_requests", "help", multiprocess_mode='livesum')


# Expose metrics.
@IN_PROGRESS.track_inprogress()
def app(environ, start_response):
registry = CollectorRegistry()
multiprocess.MultiProcessCollector(registry)
data = generate_latest(registry)
status = '200 OK'
response_headers = [
('Content-type', CONTENT_TYPE_LATEST),
('Content-Length', str(len(data)))
]
start_response(status, response_headers)
return iter([data])
```

**Three**: Instrumentation

Counters, Summaries and Histograms work as normal.

Gauges have several modes they can run in, which can be selected with the
`multiprocess_mode` parameter.

- 'all': Default. Return a timeseries per process alive or dead.
- 'liveall': Return a timeseries per process that is still alive.
- 'livesum': Return a single timeseries that is the sum of the values of alive processes.
- 'max': Return a single timeseries that is the maximum of the values of all processes, alive or dead.
- 'min': Return a single timeseries that is the minimum of the values of all processes, alive or dead.
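
For example (a sketch; the metric names here are illustrative, not from this PR):

```python
from prometheus_client import Gauge

# Summed across live worker processes: suits in-flight request counts.
IN_FLIGHT = Gauge('inflight_requests', 'Requests currently being served',
                  multiprocess_mode='livesum')

# One time series per live process: suits per-worker state.
WORKER_LAST_SEEN = Gauge('worker_last_seen', 'Per-worker heartbeat timestamp',
                         multiprocess_mode='liveall')
```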

## Parser

161 changes: 152 additions & 9 deletions prometheus_client/core.py
@@ -3,8 +3,12 @@
from __future__ import unicode_literals

import copy
import json
import math
import mmap
import os
import re
import struct
import types
from timeit import default_timer

@@ -23,6 +27,7 @@
_RESERVED_METRIC_LABEL_NAME_RE = re.compile(r'^__.*$')
_INF = float("inf")
_MINUS_INF = float("-inf")
_INITIAL_MMAP_SIZE = 1024*1024


class CollectorRegistry(object):
@@ -220,7 +225,9 @@ def add_metric(self, labels, buckets, sum_value):
class _MutexValue(object):
'''A float protected by a mutex.'''

def __init__(self, name, labelnames, labelvalues):
_multiprocess = False

def __init__(self, typ, metric_name, name, labelnames, labelvalues, **kwargs):
self._value = 0.0
self._lock = Lock()

@@ -236,7 +243,139 @@ def get(self):
with self._lock:
return self._value

_ValueClass = _MutexValue
class _MmapedDict(object):
"""A dict of doubles, backed by an mmapped file.

    The file starts with a 4 byte int, indicating how much of it is used.
    Then 4 bytes of padding.
    There are then a number of entries, each consisting of a 4 byte int which
    is the size of the next field, a utf-8 encoded string key, padding to an
    8 byte alignment, and then an 8 byte float which is the value.
    """
def __init__(self, filename):
self._lock = Lock()
self._f = open(filename, 'a+b')
if os.fstat(self._f.fileno()).st_size == 0:
self._f.truncate(_INITIAL_MMAP_SIZE)
self._capacity = os.fstat(self._f.fileno()).st_size
self._m = mmap.mmap(self._f.fileno(), self._capacity)

self._positions = {}
self._used = struct.unpack_from(b'i', self._m, 0)[0]
if self._used == 0:
self._used = 8
struct.pack_into(b'i', self._m, 0, self._used)
else:
for key, _, pos in self._read_all_values():
self._positions[key] = pos

def _init_value(self, key):
"""Initilize a value. Lock must be held by caller."""
encoded = key.encode('utf-8')
# Pad to be 8-byte aligned.
padded = encoded + (b' ' * (8 - (len(encoded) + 4) % 8))
value = struct.pack('i{0}sd'.format(len(padded)).encode(), len(encoded), padded, 0.0)
while self._used + len(value) > self._capacity:
self._capacity *= 2
            self._f.truncate(self._capacity)
self._m = mmap.mmap(self._f.fileno(), self._capacity)
self._m[self._used:self._used + len(value)] = value

# Update how much space we've used.
self._used += len(value)
struct.pack_into(b'i', self._m, 0, self._used)
self._positions[key] = self._used - 8

def _read_all_values(self):
"""Yield (key, value, pos). No locking is performed."""
pos = 8
while pos < self._used:
encoded_len = struct.unpack_from(b'i', self._m, pos)[0]
pos += 4
encoded = struct.unpack_from('{0}s'.format(encoded_len).encode(), self._m, pos)[0]
padded_len = encoded_len + (8 - (encoded_len + 4) % 8)
pos += padded_len
value = struct.unpack_from(b'd', self._m, pos)[0]
yield encoded.decode('utf-8'), value, pos
pos += 8

def read_all_values(self):
"""Yield (key, value, pos). No locking is performed."""
for k, v, _ in self._read_all_values():
yield k, v

def read_value(self, key):
with self._lock:
if key not in self._positions:
self._init_value(key)
pos = self._positions[key]
# We assume that reading from an 8 byte aligned value is atomic
return struct.unpack_from(b'd', self._m, pos)[0]

def write_value(self, key, value):
with self._lock:
if key not in self._positions:
self._init_value(key)
pos = self._positions[key]
# We assume that writing to an 8 byte aligned value is atomic
struct.pack_into(b'd', self._m, pos, value)

def close(self):
if self._f:
self._f.close()
self._f = None
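
The entry layout described in the class docstring can be illustrated standalone with `struct` (a sketch mirroring the pack/unpack calls above, separate from the diff itself):

```python
import struct

key = 'metric'
encoded = key.encode('utf-8')
# Pad so the 4-byte length prefix plus the key reach an 8-byte boundary,
# leaving the trailing double 8-byte aligned.
padded = encoded + (b' ' * (8 - (len(encoded) + 4) % 8))
entry = struct.pack('i{0}sd'.format(len(padded)).encode(), len(encoded), padded, 42.0)

# Reading it back, as _read_all_values does:
key_len = struct.unpack_from(b'i', entry, 0)[0]
name = struct.unpack_from('{0}s'.format(key_len).encode(), entry, 4)[0].decode('utf-8')
value = struct.unpack_from(b'd', entry, 4 + len(padded))[0]
```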


def _MultiProcessValue(__pid=os.getpid()):
pid = __pid
files = {}
files_lock = Lock()

class _MmapedValue(object):
'''A float protected by a mutex backed by a per-process mmaped file.'''

_multiprocess = True

def __init__(self, typ, metric_name, name, labelnames, labelvalues, multiprocess_mode='', **kwargs):
if typ == 'gauge':
file_prefix = typ + '_' + multiprocess_mode
else:
file_prefix = typ
with files_lock:
if file_prefix not in files:
filename = os.path.join(
os.environ['prometheus_multiproc_dir'], '{0}_{1}.db'.format(file_prefix, pid))
files[file_prefix] = _MmapedDict(filename)
self._file = files[file_prefix]
self._key = json.dumps((metric_name, name, labelnames, labelvalues))
self._value = self._file.read_value(self._key)
self._lock = Lock()

def inc(self, amount):
with self._lock:
self._value += amount
self._file.write_value(self._key, self._value)

def set(self, value):
with self._lock:
self._value = value
self._file.write_value(self._key, self._value)

def get(self):
with self._lock:
return self._value

return _MmapedValue


# Should we enable multi-process mode?
# This needs to be chosen before the first metric is constructed,
# and as that may be in some arbitrary library the user/admin has
# no control over, we use an environment variable.
if 'prometheus_multiproc_dir' in os.environ:
Rather than hard coding the choice of _ValueClass here, could the class be defined by the users and loaded from anywhere on the path? #70 could be implemented in a separate package entirely and be developed/updated on its own schedule. The shelf implementation can certainly ship with client_python, but users of other multi-process python servers could be free to experiment without needing to land code here.

Something like the following, shamelessly stolen from django/utils/module_loading.py with error handling stripped dangerously:

    module_path, class_name = os.environ.get('PROMETHEUS_VALUE_CLASS', 'prometheus_client.core._MutexValue').rsplit('.', 1)
    module = import_module(module_path)
    _ValueClass = getattr(module, class_name)

Then if #70 was in its own package prometheus_uwsgi, you might start the server with `PROMETHEUS_VALUE_CLASS=prometheus_uwsgi.UwsgiCacheValue`, or `PROMETHEUS_VALUE_CLASS=prometheus_client.core.MultiProcessValue` and `prometheus_multiproc_dir=/path/to/multiproc_dir` to use the shelf implementation described in this PR (removing the _ since it would become part of the public API)

Those _ValueClass implementations would be responsible for configuring themselves from the environment or wherever, which feels non-pythonic. Somehow configuring the system explicitly at process startup seems like the best path, and the developers could pass in whatever class they want, but I haven't dug in enough to the rest of the implementation to suggest what I would want that API to look like.
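
A runnable version of that module-loading sketch, using importlib (the environment variable name comes from the suggestion above; the fallback class here is an arbitrary importable stand-in, not a real value class):

```python
import importlib
import os

# 'PROMETHEUS_VALUE_CLASS' is the hypothetical variable from the suggestion;
# the default is just any dotted path that resolves to a class.
dotted = os.environ.get('PROMETHEUS_VALUE_CLASS', 'collections.OrderedDict')
module_path, class_name = dotted.rsplit('.', 1)
value_class = getattr(importlib.import_module(module_path), class_name)
```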

Contributor Author

This seems like a reasonable idea.

> Somehow configuring the system explicitly at process startup seems like the best path

That won't work, as by the time your code is running all this code has likely already been imported and many metrics already exist. That's why I'm using environment variables, as they're set at exec() time before any python code has a chance to run.


> That won't work, as by the time your code is running all this code has likely already been imported and many metrics already exist.

That's true assuming you want to set the _ValueClass once and only once and never have it specified in code. You could circumvent this by using a client pattern or parameterizing metric instantiation with a _ValueClass as well. I think it would make the interface nicer to use the environment variable by default, but also allow:

REQUEST_TIME = Summary('request_processing_seconds', 'Time spent processing request', value_class=prometheus_client.ShelveValue)

or something of the sort.

_ValueClass = _MultiProcessValue()
else:
_ValueClass = _MutexValue
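
Since this choice happens at import time, the variable must be in the environment before `prometheus_client` is first imported; for example (a sketch, with a temporary directory standing in for a real path):

```python
import os
import tempfile

# Must run before the first `import prometheus_client` anywhere in the process.
os.environ.setdefault('prometheus_multiproc_dir', tempfile.mkdtemp())

import prometheus_client  # now selects the mmap-backed value class
```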


class _LabelWrapper(object):
@@ -387,7 +526,7 @@ def f():
_reserved_labelnames = []

def __init__(self, name, labelnames, labelvalues):
self._value = _ValueClass(name, labelnames, labelvalues)
self._value = _ValueClass(self._type, name, name, labelnames, labelvalues)

def inc(self, amount=1):
'''Increment counter by the given amount.'''
@@ -449,8 +588,12 @@ def f():
_type = 'gauge'
_reserved_labelnames = []

def __init__(self, name, labelnames, labelvalues):
self._value = _ValueClass(name, labelnames, labelvalues)
def __init__(self, name, labelnames, labelvalues, multiprocess_mode='all'):
if (_ValueClass._multiprocess
and multiprocess_mode not in ['min', 'max', 'livesum', 'liveall', 'all']):
raise ValueError('Invalid multiprocess mode: ' + multiprocess_mode)
self._value = _ValueClass(self._type, name, name, labelnames,
labelvalues, multiprocess_mode=multiprocess_mode)

def inc(self, amount=1):
'''Increment gauge by the given amount.'''
@@ -533,8 +676,8 @@ def create_response(request):
_reserved_labelnames = ['quantile']

def __init__(self, name, labelnames, labelvalues):
self._count = _ValueClass(name + '_count', labelnames, labelvalues)
self._sum = _ValueClass(name + '_sum', labelnames, labelvalues)
self._count = _ValueClass(self._type, name, name + '_count', labelnames, labelvalues)
self._sum = _ValueClass(self._type, name, name + '_sum', labelnames, labelvalues)

def observe(self, amount):
'''Observe the given amount.'''
@@ -607,7 +750,7 @@ def create_response(request):
_reserved_labelnames = ['histogram']

def __init__(self, name, labelnames, labelvalues, buckets=(.005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0, 7.5, 10.0, _INF)):
self._sum = _ValueClass(name + '_sum', labelnames, labelvalues)
self._sum = _ValueClass(self._type, name, name + '_sum', labelnames, labelvalues)
buckets = [float(b) for b in buckets]
if buckets != sorted(buckets):
# This is probably an error on the part of the user,
@@ -621,7 +764,7 @@ def __init__(self, name, labelnames, labelvalues, buckets=(.005, .01, .025, .05,
self._buckets = []
bucket_labelnames = labelnames + ('le',)
for b in buckets:
self._buckets.append(_ValueClass(name + '_bucket', bucket_labelnames, labelvalues + (_floatToGoString(b),)))
self._buckets.append(_ValueClass(self._type, name, name + '_bucket', bucket_labelnames, labelvalues + (_floatToGoString(b),)))

def observe(self, amount):
'''Observe the given amount.'''