8000 Reset multiproc values when the pid changes. (#169) · pythonAI/client_python@50d6e0b · GitHub
[go: up one dir, main page]

Skip to content

Commit 50d6e0b

Browse files
authored
Reset multiproc values when the pid changes. (prometheus#169)
This puts us at ~2000ns for an operation, which is ~300ns than previously but still slightly better than we were before we switched to a single lock. This is twice as slow as without multiproc mode.
1 parent 0634b77 commit 50d6e0b

File tree

3 files changed

+50
-17
lines changed

3 files changed

+50
-17
lines changed

README.md

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -416,7 +416,6 @@ This comes with a number of limitations:
416416
- Custom collectors do not work (e.g. cpu and memory metrics)
417417
- The pushgateway cannot be used
418418
- Gauges cannot use the `pid` label
419-
- Gunicorn's `preload_app` feature and equivalents are not supported
420419

421420
There's several steps to getting this working:
422421

prometheus_client/core.py

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -392,9 +392,9 @@ def close(self):
392392
self._f = None
393393

394394

395-
def _MultiProcessValue(__pid=os.getpid()):
396-
pid = __pid
395+
def _MultiProcessValue(_pidFunc=os.getpid):
397396
files = {}
397+
values = []
398398
# Use a single global lock when in multi-processing mode
399399
# as we presume this means there is no threading going on.
400400
# This avoids the need to also have mutexes in __MmapDict.
@@ -406,31 +406,51 @@ class _MmapedValue(object):
406406
_multiprocess = True
407407

408408
def __init__(self, typ, metric_name, name, labelnames, labelvalues, multiprocess_mode='', **kwargs):
409+
self._params = typ, metric_name, name, labelnames, labelvalues, multiprocess_mode
410+
with lock:
411+
self.__reset()
412+
values.append(self)
413+
414+
415+
def __reset(self):
416+
self._pid = _pidFunc()
417+
typ, metric_name, name, labelnames, labelvalues, multiprocess_mode = self._params
409418
if typ == 'gauge':
410419
file_prefix = typ + '_' + multiprocess_mode
411420
else:
412421
file_prefix = typ
413-
with lock:
414-
if file_prefix not in files:
415-
filename = os.path.join(
416-
os.environ['prometheus_multiproc_dir'], '{0}_{1}.db'.format(file_prefix, pid))
417-
files[file_prefix] = _MmapedDict(filename)
422+
if file_prefix not in files:
423+
filename = os.path.join(
424+
os.environ['prometheus_multiproc_dir'], '{0}_{1}.db'.format(file_prefix, self._pid))
425+
files[file_prefix] = _MmapedDict(filename)
418426
self._file = files[file_prefix]
419427
self._key = json.dumps((metric_name, name, labelnames, labelvalues))
420428
self._value = self._file.read_value(self._key)
421429

430+
def __check_for_pid_change(self):
431+
if self._pid != _pidFunc():
432+
# There has been a fork(), reset all the values.
433+
for f in files.values():
434+
f.close()
435+
files.clear()
436+
for value in values:
437+
value.__reset()
438+
422439
def inc(self, amount):
423440
with lock:
441+
self.__check_for_pid_change()
424442
self._value += amount
425443
self._file.write_value(self._key, self._value)
426444

427445
def set(self, value):
428446
with lock:
447+
self.__check_for_pid_change()
429448
self._value = value
430449
self._file.write_value(self._key, self._value)
431450

432451
def get(self):
433452
with lock:
453+
self.__check_for_pid_change()
434454
return self._value
435455

436456
return _MmapedValue

tests/test_multiprocess.py

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ class TestMultiProcess(unittest.TestCase):
1313
def setUp(self):
1414
self.tempdir = tempfile.mkdtemp()
1515
os.environ['prometheus_multiproc_dir'] = self.tempdir
16-
prometheus_client.core._ValueClass = prometheus_client.core._MultiProcessValue(123)
16+
prometheus_client.core._ValueClass = prometheus_client.core._MultiProcessValue(lambda: 123)
1717
self.registry = CollectorRegistry()
1818
MultiProcessCollector(self.registry, self.tempdir)
1919

@@ -24,7 +24,7 @@ def tearDown(self):
2424

2525
def test_counter_adds(self):
2626
c1 = Counter('c', 'help', registry=None)
27-
prometheus_client.core._ValueClass = prometheus_client.core._MultiProcessValue(456)
27+
prometheus_client.core._ValueClass = prometheus_client.core._MultiProcessValue(lambda: 456)
2828
c2 = Counter('c', 'help', registry=None)
2929
self.assertEqual(0, self.registry.get_sample_value('c'))
3030
c1.inc(1)
@@ -33,7 +33,7 @@ def test_counter_adds(self):
3333

3434
def test_summary_adds(self):
3535
s1 = Summary('s', 'help', registry=None)
36-
prometheus_client.core._ValueClass = prometheus_client.core._MultiProcessValue(456)
36+
prometheus_client.core._ValueClass = prometheus_client.core._MultiProcessValue(lambda: 456)
3737
s2 = Summary('s', 'help', registry=None)
3838
self.assertEqual(0, self.registry.get_sample_value('s_count'))
3939
self.assertEqual(0, self.registry.get_sample_value('s_sum'))
@@ -44,7 +44,7 @@ def test_summary_adds(self):
4444

4545
def test_histogram_adds(self):
4646
h1 = Histogram('h', 'help', registry=None)
47-
prometheus_client.core._ValueClass = prometheus_client.core._MultiProcessValue(456)
47+
prometheus_client.core._ValueClass = prometheus_client.core._MultiProcessValue(lambda: 456)
4848
h2 = Histogram('h', 'help', registry=None)
4949
self.assertEqual(0, self.registry.get_sample_value('h_count'))
5050
self.assertEqual(0, self.registry.get_sample_value('h_sum'))
@@ -57,7 +57,7 @@ def test_histogram_adds(self):
5757

5858
def test_gauge_all(self):
5959
g1 = Gauge('g', 'help', registry=None)
60-
prometheus_client.core._ValueClass = prometheus_client.core._MultiProcessValue(456)
60+
prometheus_client.core._ValueClass = prometheus_client.core._MultiProcessValue(lambda: 456)
6161
g2 = Gauge('g', 'help', registry=None)
6262
self.assertEqual(0, self.registry.get_sample_value('g', {'pid': '123'}))
6363
self.assertEqual(0, self.registry.get_sample_value('g', {'pid': '456'}))
@@ -69,7 +69,7 @@ def test_gauge_all(self):
6969

7070
def test_gauge_liveall(self):
7171
g1 = Gauge('g', 'help', registry=None, multiprocess_mode='liveall')
72-
prometheus_client.core._ValueClass = prometheus_client.core._MultiProcessValue(456)
72+
prometheus_client.core._ValueClass = prometheus_client.core._MultiProcessValue(lambda: 456)
7373
g2 = Gauge('g', 'help', registry=None, multiprocess_mode='liveall')
7474
self.assertEqual(0, self.registry.get_sample_value('g', {'pid': '123'}))
7575
self.assertEqual(0, self.registry.get_sample_value('g', {'pid': '456'}))
@@ -83,7 +83,7 @@ def test_gauge_liveall(self):
8383

8484
def test_gauge_min(self):
8585
g1 = Gauge('g', 'help', registry=None, multiprocess_mode='min')
86-
prometheus_client.core._ValueClass = prometheus_client.core._MultiProcessValue(456)
86+
prometheus_client.core._ValueClass = prometheus_client.core._MultiProcessValue(lambda: 456)
8787
g2 = Gauge('g', 'help', registry=None, multiprocess_mode='min')
8888
self.assertEqual(0, self.registry.get_sample_value('g'))
8989
g1.set(1)
@@ -92,7 +92,7 @@ def test_gauge_min(self):
9292

9393
def test_gauge_max(self):
9494
g1 = Gauge('g', 'help', registry=None, multiprocess_mode='max')
95-
prometheus_client.core._ValueClass = prometheus_client.core._MultiProcessValue(456)
95+
prometheus_client.core._ValueClass = prometheus_client.core._MultiProcessValue(lambda: 456)
9696
g2 = Gauge('g', 'help', registry=None, multiprocess_mode='max')
9797
self.assertEqual(0, self.registry.get_sample_value('g'))
9898
g1.set(1)
@@ -101,7 +101,7 @@ def test_gauge_max(self):
101101

102102
def test_gauge_livesum(self):
103103
g1 = Gauge('g', 'help', registry=None, multiprocess_mode='livesum')
104-
prometheus_client.core._ValueClass = prometheus_client.core._MultiProcessValue(456)
104+
prometheus_client.core._ValueClass = prometheus_client.core._MultiProcessValue(lambda: 456)
105105
g2 = Gauge('g', 'help', registry=None, multiprocess_mode='livesum')
106106
self.assertEqual(0, self.registry.get_sample_value('g'))
107107
g1.set(1)
@@ -115,6 +115,20 @@ def test_namespace_subsystem(self):
115115
c1.inc(1)
116116
self.assertEqual(1, self.registry.get_sample_value('ns_ss_c'))
117117

118+
def test_counter_across_forks(self):
119+
pid = 0
120+
def get_pid():
121+
return pid
122+
prometheus_client.core._ValueClass = prometheus_client.core._MultiProcessValue(get_pid)
123+
c1 = Counter('c', 'help', registry=None)
124+
self.assertEqual(0, self.registry.get_sample_value('c'))
125+
c1.inc(1)
126+
c1.inc(1)
127+
pid = 1
128+
c1.inc(1)
129+
self.assertEqual(3, self.registry.get_sample_value('c'))
130+
self.assertEqual(1, c1._value.get())
131+
118132

119133
class TestMmapedDict(unittest.TestCase):
120134
def setUp(self):

0 commit comments

Comments
 (0)
0