8000 Reset multiproc values when the pid changes. by brian-brazil · Pull Request #169 · prometheus/client_python · GitHub
[go: up one dir, main page]

Skip to content

Reset multiproc values when the pid changes. #169

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 19, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,6 @@ This comes with a number of limitations:
- Custom collectors do not work (e.g. cpu and memory metrics)
- The pushgateway cannot be used
- Gauges cannot use the `pid` label
- Gunicorn's `preload_app` feature and equivalents are not supported

There's several steps to getting this working:

Expand Down
34 changes: 27 additions & 7 deletions
Original file line number Diff line number Diff line change
Expand Up @@ -392,9 +392,9 @@ def close(self):
self._f = None


def _MultiProcessValue(__pid=os.getpid()):
pid = __pid
def _MultiProcessValue(_pidFunc=os.getpid):
files = {}
values = []
# Use a single global lock when in multi-processing mode
# as we presume this means there is no threading going on.
# This avoids the need to also have mutexes in __MmapDict.
Expand All @@ -406,31 +406,51 @@ class _MmapedValue(object):
_multiprocess = True

def __init__(self, typ, metric_name, name, labelnames, labelvalues, multiprocess_mode='', **kwargs):
self._params = typ, metric_name, name, labelnames, labelvalues, multiprocess_mode
with lock:
self.__reset()
values.append(self)


def __reset(self):
self._pid = _pidFunc()
typ, metric_name, name, labelnames, labelvalues, multiprocess_mode = self._params
if typ == 'gauge':
file_prefix = typ + '_' + multiprocess_mode
else:
file_prefix = typ
with lock:
if file_prefix not in files:
filename = os.path.join(
os.environ['prometheus_multiproc_dir'], '{0}_{1}.db'.format(file_prefix, pid))
files[file_prefix] = _MmapedDict(filename)
if file_prefix not in files:
filename = os.path.join(
os.environ['prometheus_multiproc_dir'], '{0}_{1}.db'.format(file_prefix, self._pid))
files[file_prefix] = _MmapedDict(filename)
self._file = files[file_prefix]
self._key = json.dumps((metric_name, name, labelnames, labelvalues))
self._value = self._file.read_value(self._key)

def __check_for_pid_change(self):
if self._pid != _pidFunc():
# There has been a fork(), reset all the values.
for f in files.values():
f.close()
files.clear()
for value in values:
value.__reset()

def inc(self, amount):
with lock:
self.__check_for_pid_change()
self._value += amount
self._file.write_value(self._key, self._value)

def set(self, value):
with lock:
self.__check_for_pid_change()
self._value = value
self._file.write_value(self._key, self._value)

def get(self):
with lock:
self.__check_for_pid_change()
return self._value

return _MmapedValue
Expand Down
32 changes: 23 additions & 9 deletions tests/test_multiprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ class TestMultiProcess(unittest.TestCase):
def setUp(self):
self.tempdir = tempfile.mkdtemp()
os.environ['prometheus_multiproc_dir'] = self.tempdir
prometheus_client.core._ValueClass = prometheus_client.core._MultiProcessValue(123)
prometheus_client.core._ValueClass = prometheus_client.core._MultiProcessValue(lambda: 123)
self.registry = CollectorRegistry()
MultiProcessCollector(self.registry, self.tempdir)

Expand All @@ -24,7 +24,7 @@ def tearDown(self):

def test_counter_adds(self):
c1 = Counter('c', 'help', registry=None)
prometheus_client.core._ValueClass = prometheus_client.core._MultiProcessValue(456)
prometheus_client.core._ValueClass = prometheus_client.core._MultiProcessValue(lambda: 456)
c2 = Counter('c', 'help', registry=None)
self.assertEqual(0, self.registry.get_sample_value('c'))
c1.inc(1)
Expand All @@ -33,7 +33,7 @@ def test_counter_adds(self):

def test_summary_adds(self):
s1 = Summary('s', 'help', registry=None)
prometheus_client.core._ValueClass = prometheus_client.core._MultiProcessValue(456)
prometheus_client.core._ValueClass = prometheus_client.core._MultiProcessValue(lambda: 456)
s2 = Summary('s', 'help', registry=None)
self.assertEqual(0, self.registry.get_sample_value('s_count'))
self.assertEqual(0, self.registry.get_sample_value('s_sum'))
Expand All @@ -44,7 +44,7 @@ def test_summary_adds(self):

def test_histogram_adds(self):
h1 = Histogram('h', 'help', registry=None)
prometheus_client.core._ValueClass = prometheus_client.core._MultiProcessValue(456)
prometheus_client.core._ValueClass = prometheus_client.core._MultiProcessValue(lambda: 456)
h2 = Histogram('h', 'help', registry=None)
self.assertEqual(0, self.registry.get_sample_value('h_count'))
self.assertEqual(0, self.registry.get_sample_value('h_sum'))
Expand All @@ -57,7 +57,7 @@ def test_histogram_adds(self):

def test_gauge_all(self):
g1 = Gauge('g', 'help', registry=None)
prometheus_client.core._ValueClass = prometheus_client.core._MultiProcessValue(456)
prometheus_client.core._ValueClass = prometheus_client.core._MultiProcessValue(lambda: 456)
g2 = Gauge('g', 'help', registry=None)
self.assertEqual(0, self.registry.get_sample_value('g', {'pid': '123'}))
self.assertEqual(0, self.registry.get_sample_value('g', {'pid': '456'}))
Expand All @@ -69,7 +69,7 @@ def test_gauge_all(self):

def test_gauge_liveall(self):
g1 = Gauge('g', 'help', registry=None, multiprocess_mode='liveall')
prometheus_client.core._ValueClass = prometheus_client.core._MultiProcessValue(456)
prometheus_client.core._ValueClass = prometheus_client.core._MultiProcessValue(lambda: 456)
g2 = Gauge('g', 'help', registry=None, multiprocess_mode='liveall')
self.assertEqual(0, self.registry.get_sample_value('g', {'pid': '123'}))
self.assertEqual(0, self.registry.get_sample_value('g', {'pid': '456'}))
Expand All @@ -83,7 +83,7 @@ def test_gauge_liveall(self):

def test_gauge_min(self):
g1 = Gauge('g', 'help', registry=None, multiprocess_mode='min')
prometheus_client.core._ValueClass = prometheus_client.core._MultiProcessValue(456)
prometheus_client.core._ValueClass = prometheus_client.core._MultiProcessValue(lambda: 456)
g2 = Gauge('g', 'help', registry=None, multiprocess_mode='min')
self.assertEqual(0, self.registry.get_sample_value('g'))
g1.set(1)
Expand All @@ -92,7 +92,7 @@ def test_gauge_min(self):

def test_gauge_max(self):
g1 = Gauge('g', 'help', registry=None, multiprocess_mode='max')
prometheus_client.core._ValueClass = prometheus_client.core._MultiProcessValue(456)
prometheus_client.core._ValueClass = prometheus_client.core._MultiProcessValue(lambda: 456)
g2 = Gauge('g', 'help', registry=None, multiprocess_mode='max')
self.assertEqual(0, self.registry.get_sample_value('g'))
g1.set(1)
Expand All @@ -101,7 +101,7 @@ def test_gauge_max(self):

def test_gauge_livesum(self):
g1 = Gauge('g', 'help', registry=None, multiprocess_mode='livesum')
prometheus_client.core._ValueClass = prometheus_client.core._MultiProcessValue(456)
prometheus_client.core._ValueClass = prometheus_client.core._MultiProcessValue(lambda: 456)
g2 = Gauge('g', 'help', registry=None, multiprocess_mode='livesum')
self.assertEqual(0, self.registry.get_sample_value('g'))
g1.set(1)
Expand All @@ -115,6 +115,20 @@ def test_namespace_subsystem(self):
c1.inc(1)
self.assertEqual(1, self.registry.get_sample_value('ns_ss_c'))

def test_counter_across_forks(self):
pid = 0
def get_pid():
return pid
prometheus_client.core._ValueClass = prometheus_client.core._MultiProcessValue(get_pid)
c1 = Counter('c', 'help', registry=None)
self.assertEqual(0, self.registry.get_sample_value('c'))
c1.inc(1)
c1.inc(1)
pid = 1
c1.inc(1)
self.assertEqual(3, self.registry.get_sample_value('c'))
self.assertEqual(1, c1._value.get())


class TestMmapedDict(unittest.TestCase):
def setUp(self):
Expand Down
0