Refactor MultiProcessCollector.collect() to allow for arbitrary merging · flixgithub/client_python@18017c6 · GitHub

Commit 18017c6

bloodearnest authored and brian-brazil committed
Refactor MultiProcessCollector.collect() to allow for arbitrary merging. (prometheus#302)
* Refactor MultiprocessCollector.collect to allow for arbitrary merging.

Factors out a merge() method from the previous collect() method. merge() is parameterized, and thus can be used for arbitrary merging of samples.

For motivation, see the discussion in issue prometheus#275 around merging dead workers' data into a single mmapped file. This basically allows us to parameterize the files to be merged, and also whether to accumulate histograms or not.

Accumulation is on by default, as that is what the Prometheus exposition format expects. But it can now be disabled, which allows merged values to be correctly written back to an mmapped file.

Signed-off-by: Simon Davy <simon.davy@canonical.com>
1 parent 10c8eb4 commit 18017c6
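As a rough illustration of the use case described in the commit message (not part of this commit), a caller that wants to fold dead workers' files into one archive could call the new merge() with accumulate=False so histogram bucket values stay raw and can be merged again later. The dead-file selection pattern and the write-back step below are assumptions for the sketch; only MultiProcessCollector.merge(files, accumulate=...) comes from this change.

# Sketch only: compacting dead workers' *.db files via the new merge() API.
# The dead-file glob pattern and the write-back step are hypothetical.
import glob
import os

from prometheus_client import CollectorRegistry
from prometheus_client.multiprocess import MultiProcessCollector

path = os.environ['prometheus_multiproc_dir']
collector = MultiProcessCollector(CollectorRegistry(), path)

dead_pid = 1234  # hypothetical worker that has exited
dead_files = glob.glob(os.path.join(path, '*_{0}.db'.format(dead_pid)))

# accumulate=False keeps per-bucket histogram values raw, so the result can be
# written back to an mmapped file and merged again later without double
# counting; accumulate=True would produce cumulative buckets instead.
for metric in collector.merge(dead_files, accumulate=False):
    for sample in metric.samples:
        pass  # write the merged sample back to a combined file (not shown)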

File tree

3 files changed: 158 additions & 16 deletions

prometheus_client/core.py

Lines changed: 14 additions & 7 deletions
@@ -185,7 +185,7 @@ def get_sample_value(self, name, labels=None):
 REGISTRY = CollectorRegistry(auto_describe=True)
 '''The default registry.'''
 
-_METRIC_TYPES = ('counter', 'gauge', 'summary', 'histogram',
+_METRIC_TYPES = ('counter', 'gauge', 'summary', 'histogram',
                  'gaugehistogram', 'unknown', 'info', 'stateset')
 
 
@@ -378,8 +378,8 @@ def add_metric(self, labels, buckets, sum_value, timestamp=None):
             exemplar = None
             if len(b) == 3:
                 exemplar = b[2]
-            self.samples.append(Sample(self.name + '_bucket',
-                dict(list(zip(self._labelnames, labels)) + [('le', bucket)]),
+            self.samples.append(Sample(self.name + '_bucket',
+                dict(list(zip(self._labelnames, labels)) + [('le', bucket)]),
                 value, timestamp, exemplar))
         # +Inf is last and provides the count value.
         self.samples.append(Sample(self.name + '_count', dict(zip(self._labelnames, labels)), buckets[-1][1], timestamp))
@@ -411,7 +411,7 @@ def add_metric(self, labels, buckets, timestamp=None):
         '''
         for bucket, value in buckets:
             self.samples.append(Sample(
-                self.name + '_bucket',
+                self.name + '_bucket',
                 dict(list(zip(self._labelnames, labels)) + [('le', bucket)]),
                 value, timestamp))
 
@@ -438,7 +438,7 @@ def add_metric(self, labels, value, timestamp=None):
           labels: A list of label values
           value: A dict of labels
         '''
-        self.samples.append(Sample(self.name + '_info',
+        self.samples.append(Sample(self.name + '_info',
             dict(dict(zip(self._labelnames, labels)), **value), 1, timestamp))
 
 
@@ -586,6 +586,13 @@ def close(self):
             self._f = None
 
 
+def _mmap_key(metric_name, name, labelnames, labelvalues):
+    """Format a key for use in the mmap file."""
+    # ensure labels are in consistent order for identity
+    labels = dict(zip(labelnames, labelvalues))
+    return json.dumps([metric_name, name, labels], sort_keys=True)
+
+
 def _MultiProcessValue(_pidFunc=os.getpid):
     files = {}
     values = []
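For clarity, here is what the new key format looks like next to the old one (an illustration, not part of the diff). sort_keys=True is what makes keys for the same label set identical regardless of the order the labels were declared in.

# Illustration of the key change above; _mmap_key is copied verbatim
# from the diff so this snippet is self-contained.
import json

def _mmap_key(metric_name, name, labelnames, labelvalues):
    labels = dict(zip(labelnames, labelvalues))
    return json.dumps([metric_name, name, labels], sort_keys=True)

_mmap_key('h', 'h_sum', ('b', 'a'), ('2', '1'))
# -> '["h", "h_sum", {"a": "1", "b": "2"}]'   (labels stored as a sorted dict)

json.dumps(('h', 'h_sum', ('b', 'a'), ('2', '1')))  # the old key format
# -> '["h", "h_sum", ["b", "a"], ["2", "1"]]'  (order-sensitive arrays)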
@@ -618,7 +625,7 @@ def __reset(self):
                     '{0}_{1}.db'.format(file_prefix, pid['value']))
                 files[file_prefix] = _MmapedDict(filename)
             self._file = files[file_prefix]
-            self._key = json.dumps((metric_name, name, labelnames, labelvalues))
+            self._key = _mmap_key(metric_name, name, labelnames, labelvalues)
             self._value = self._file.read_value(self._key)
 
         def __check_for_pid_change(self):
@@ -1143,7 +1150,7 @@ class Enum(object):
     Example usage:
             from prometheus_client import Enum
 
-            e = Enum('task_state', 'Description of enum',
+            e = Enum('task_state', 'Description of enum',
                      states=['starting', 'running', 'stopped'])
             e.state('running')

prometheus_client/multiprocess.py

Lines changed: 26 additions & 7 deletions
@@ -23,13 +23,24 @@ def __init__(self, registry, path=None):
             registry.register(self)
 
     def collect(self):
+        files = glob.glob(os.path.join(self._path, '*.db'))
+        return self.merge(files, accumulate=True)
+
+    def merge(self, files, accumulate=True):
+        """Merge metrics from given mmap files.
+
+        By default, histograms are accumulated, as per prometheus wire format.
+        But if writing the merged data back to mmap files, use
+        accumulate=False to avoid compound accumulation.
+        """
         metrics = {}
-        for f in glob.glob(os.path.join(self._path, '*.db')):
+        for f in files:
             parts = os.path.basename(f).split('_')
             typ = parts[0]
             d = core._MmapedDict(f, read_mode=True)
             for key, value in d.read_all_values():
-                metric_name, name, labelnames, labelvalues = json.loads(key)
+                metric_name, name, labels = json.loads(key)
+                labels_key = tuple(sorted(labels.items()))
 
                 metric = metrics.get(metric_name)
                 if metric is None:
@@ -39,10 +50,10 @@ def collect(self):
                 if typ == 'gauge':
                     pid = parts[2][:-3]
                     metric._multiprocess_mode = parts[1]
-                    metric.add_sample(name, tuple(zip(labelnames, labelvalues)) + (('pid', pid), ), value)
+                    metric.add_sample(name, labels_key + (('pid', pid), ), value)
                 else:
                     # The duplicates and labels are fixed in the next for.
-                    metric.add_sample(name, tuple(zip(labelnames, labelvalues)), value)
+                    metric.add_sample(name, labels_key, value)
             d.close()
 
         for metric in metrics.values():
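A note on the parts indexing in the hunk above: the .db file names encode the metric type and the writer's pid, and for gauges also the multiprocess mode, per the '{0}_{1}.db'.format(file_prefix, pid['value']) line in core.py. A small sketch of that parsing follows; the file names and the 'max' mode value are only examples.

# Sketch of the filename convention the parsing above relies on (illustrative names).
import os

for fname in ('counter_1234.db', 'gauge_max_1234.db'):
    parts = os.path.basename(fname).split('_')
    typ = parts[0]              # 'counter' or 'gauge'
    if typ == 'gauge':
        mode = parts[1]         # multiprocess mode, e.g. 'max'
        pid = parts[2][:-3]     # '1234' after stripping '.db'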
@@ -86,9 +97,17 @@ def collect(self):
                 for labels, values in buckets.items():
                     acc = 0.0
                     for bucket, value in sorted(values.items()):
-                        acc += value
-                        samples[(metric.name + '_bucket', labels + (('le', core._floatToGoString(bucket)), ))] = acc
-                    samples[(metric.name + '_count', labels)] = acc
+                        sample_key = (
+                            metric.name + '_bucket',
+                            labels + (('le', core._floatToGoString(bucket)), ),
+                        )
+                        if accumulate:
+                            acc += value
+                            samples[sample_key] = acc
+                        else:
+                            samples[sample_key] = value
+                    if accumulate:
+                        samples[(metric.name + '_count', labels)] = acc
 
             # Convert to correct sample format.
             metric.samples = [core.Sample(name, dict(labels), value) for (name, labels), value in samples.items()]
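To make the accumulate flag concrete, here is a standalone sketch (not library code) of the bucket post-processing above, using the same two observations (1 and 5) as the tests below; the helper name and raw bucket dict are illustrative.

# Standalone sketch of the accumulate branch; names are illustrative.
def merge_buckets(raw, accumulate=True):
    samples = {}
    acc = 0.0
    for bound, value in sorted(raw.items()):
        if accumulate:
            acc += value
            samples[bound] = acc    # cumulative, as the exposition format expects
        else:
            samples[bound] = value  # raw, safe to merge again later
    count = acc if accumulate else None
    return samples, count

raw = {1.0: 1.0, 5.0: 1.0, float('inf'): 0.0}  # observations of 1 and 5
merge_buckets(raw)                    # ({1.0: 1.0, 5.0: 2.0, inf: 2.0}, 2.0)
merge_buckets(raw, accumulate=False)  # ({1.0: 1.0, 5.0: 1.0, inf: 0.0}, None)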

tests/test_multiprocess.py

Lines changed: 118 additions & 2 deletions
@@ -1,16 +1,25 @@
 from __future__ import unicode_literals
 
+import glob
 import os
 import shutil
+import sys
 import tempfile
-import unittest
+
+if sys.version_info < (2, 7):
+    # We need the skip decorators from unittest2 on Python 2.6.
+    import unittest2 as unittest
+else:
+    import unittest
+
 
 from prometheus_client import core
 from prometheus_client.core import (
     CollectorRegistry,
     Counter,
     Gauge,
     Histogram,
+    Sample,
     Summary,
 )
 from prometheus_client.multiprocess import (
@@ -25,7 +34,7 @@ def setUp(self):
         os.environ['prometheus_multiproc_dir'] = self.tempdir
         core._ValueClass = core._MultiProcessValue(lambda: 123)
         self.registry = CollectorRegistry()
-        MultiProcessCollector(self.registry, self.tempdir)
+        self.collector = MultiProcessCollector(self.registry, self.tempdir)
 
     def tearDown(self):
         del os.environ['prometheus_multiproc_dir']
@@ -137,6 +146,113 @@ def test_counter_across_forks(self):
         self.assertEqual(3, self.registry.get_sample_value('c_total'))
         self.assertEqual(1, c1._value.get())
 
+    @unittest.skipIf(sys.version_info < (2, 7), "Test requires Python 2.7+.")
+    def test_collect(self):
+        pid = 0
+        core._ValueClass = core._MultiProcessValue(lambda: pid)
+        labels = dict((i, i) for i in 'abcd')
+
+        def add_label(key, value):
+            l = labels.copy()
+            l[key] = value
+            return l
+
+        c = Counter('c', 'help', labelnames=labels.keys(), registry=None)
+        g = Gauge('g', 'help', labelnames=labels.keys(), registry=None)
+        h = Histogram('h', 'help', labelnames=labels.keys(), registry=None)
+
+        c.labels(**labels).inc(1)
+        g.labels(**labels).set(1)
+        h.labels(**labels).observe(1)
+
+        pid = 1
+
+        c.labels(**labels).inc(1)
+        g.labels(**labels).set(1)
+        h.labels(**labels).observe(5)
+
+        metrics = dict((m.name, m) for m in self.collector.collect())
+
+        self.assertEqual(
+            metrics['c'].samples, [Sample('c_total', labels, 2.0)]
+        )
+        metrics['g'].samples.sort(key=lambda x: x[1]['pid'])
+        self.assertEqual(metrics['g'].samples, [
+            Sample('g', add_label('pid', '0'), 1.0),
+            Sample('g', add_label('pid', '1'), 1.0),
+        ])
+
+        metrics['h'].samples.sort(
+            key=lambda x: (x[0], float(x[1].get('le', 0)))
+        )
+        expected_histogram = [
+            Sample('h_bucket', add_label('le', '0.005'), 0.0),
+            Sample('h_bucket', add_label('le', '0.01'), 0.0),
+            Sample('h_bucket', add_label('le', '0.025'), 0.0),
+            Sample('h_bucket', add_label('le', '0.05'), 0.0),
+            Sample('h_bucket', add_label('le', '0.075'), 0.0),
+            Sample('h_bucket', add_label('le', '0.1'), 0.0),
+            Sample('h_bucket', add_label('le', '0.25'), 0.0),
+            Sample('h_bucket', add_label('le', '0.5'), 0.0),
+            Sample('h_bucket', add_label('le', '0.75'), 0.0),
+            Sample('h_bucket', add_label('le', '1.0'), 1.0),
+            Sample('h_bucket', add_label('le', '2.5'), 1.0),
+            Sample('h_bucket', add_label('le', '5.0'), 2.0),
+            Sample('h_bucket', add_label('le', '7.5'), 2.0),
+            Sample('h_bucket', add_label('le', '10.0'), 2.0),
+            Sample('h_bucket', add_label('le', '+Inf'), 2.0),
+            Sample('h_count', labels, 2.0),
+            Sample('h_sum', labels, 6.0),
+        ]
+
+        self.assertEqual(metrics['h'].samples, expected_histogram)
+
+    @unittest.skipIf(sys.version_info < (2, 7), "Test requires Python 2.7+.")
+    def test_merge_no_accumulate(self):
+        pid = 0
+        core._ValueClass = core._MultiProcessValue(lambda: pid)
+        labels = dict((i, i) for i in 'abcd')
+
+        def add_label(key, value):
+            l = labels.copy()
+            l[key] = value
+            return l
+
+        h = Histogram('h', 'help', labelnames=labels.keys(), registry=None)
+        h.labels(**labels).observe(1)
+        pid = 1
+        h.labels(**labels).observe(5)
+
+        path = os.path.join(os.environ['prometheus_multiproc_dir'], '*.db')
+        files = glob.glob(path)
+        metrics = dict(
+            (m.name, m) for m in self.collector.merge(files, accumulate=False)
+        )
+
+        metrics['h'].samples.sort(
+            key=lambda x: (x[0], float(x[1].get('le', 0)))
+        )
+        expected_histogram = [
+            Sample('h_bucket', add_label('le', '0.005'), 0.0),
+            Sample('h_bucket', add_label('le', '0.01'), 0.0),
+            Sample('h_bucket', add_label('le', '0.025'), 0.0),
+            Sample('h_bucket', add_label('le', '0.05'), 0.0),
+            Sample('h_bucket', add_label('le', '0.075'), 0.0),
+            Sample('h_bucket', add_label('le', '0.1'), 0.0),
+            Sample('h_bucket', add_label('le', '0.25'), 0.0),
+            Sample('h_bucket', add_label('le', '0.5'), 0.0),
+            Sample('h_bucket', add_label('le', '0.75'), 0.0),
+            Sample('h_bucket', add_label('le', '1.0'), 1.0),
+            Sample('h_bucket', add_label('le', '2.5'), 0.0),
+            Sample('h_bucket', add_label('le', '5.0'), 1.0),
+            Sample('h_bucket', add_label('le', '7.5'), 0.0),
+            Sample('h_bucket', add_label('le', '10.0'), 0.0),
+            Sample('h_bucket', add_label('le', '+Inf'), 0.0),
+            Sample('h_sum', labels, 6.0),
+        ]
+
+        self.assertEqual(metrics['h'].samples, expected_histogram)
+
 
 class TestMmapedDict(unittest.TestCase):
     def setUp(self):
