10000 Merge pull request #421 from valohai/multiproc-expose-speed · benxiaolang-hacker/client_python@df024e0 · GitHub
[go: up one dir, main page]

Skip to content

Commit df024e0

Browse files
authored
Merge pull request prometheus#421 from valohai/multiproc-expose-speed
Multiprocess exposition speed boost
2 parents 5132fd2 + 0f544eb commit df024e0

File tree

2 files changed

+84
-50
lines changed

2 files changed

+84
-50
lines changed

prometheus_client/mmap_dict.py

Lines changed: 40 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,29 @@ def _pack_integer(data, pos, value):
2222
data[pos:pos + 4] = _pack_integer_func(value)
2323

2424

25+
def _read_all_values(data, used=0):
    """Yield (key, value, pos) triples decoded from the raw buffer `data`.

    No locking is performed; callers are responsible for synchronization.
    """
    if used <= 0:
        # No valid `used` size was supplied, so read it from the 4-byte
        # header at the start of the buffer.
        used = _unpack_integer(data, 0)[0]

    pos = 8  # entries begin right after the 8-byte header
    while pos < used:
        key_len = _unpack_integer(data, pos)[0]
        # Refuse to read past the recorded end of the data.
        if key_len + pos > used:
            raise RuntimeError('Read beyond file size detected, file is corrupted.')
        pos += 4
        raw_key = data[pos:pos + key_len]
        # Keys are padded so that the trailing value double is 8-byte aligned.
        pos += key_len + (8 - (key_len + 4) % 8)
        value = _unpack_double(data, pos)[0]
        yield raw_key.decode('utf-8'), value, pos
        pos += 8
47+
2548
class MmapedDict(object):
2649
"""A dict of doubles, backed by an mmapped file.
2750
@@ -37,9 +60,11 @@ class MmapedDict(object):
3760
def __init__(self, filename, read_mode=False):
3861
self._f = open(filename, 'rb' if read_mode else 'a+b')
3962
self._fname = filename
40-
if os.fstat(self._f.fileno()).st_size == 0:
63+
capacity = os.fstat(self._f.fileno()).st_size
64+
if capacity == 0:
4165
self._f.truncate(_INITIAL_MMAP_SIZE)
42-
self._capacity = os.fstat(self._f.fileno()).st_size
66+
capacity = _INITIAL_MMAP_SIZE
67+
self._capacity = capacity
4368
self._m = mmap.mmap(self._f.fileno(), self._capacity,
4469
access=mmap.ACCESS_READ if read_mode else mmap.ACCESS_WRITE)
4570

@@ -53,6 +78,17 @@ def __init__(self, filename, read_mode=False):
5378
for key, _, pos in self._read_all_values():
5479
self._positions[key] = pos
5580

81+
@staticmethod
def read_all_values_from_file(filename):
    """Parse `filename` directly (without mmapping it) and return an
    iterator of (key, value, pos) triples."""
    with open(filename, 'rb') as infp:
        # Pull in an initial chunk; the first 4 bytes tell us how much of
        # the (preallocated to _INITIAL_MMAP_SIZE bytes) file is occupied.
        data = infp.read(65535)
        used = _unpack_integer(data, 0)[0]
        # Fetch whatever remains beyond the initial chunk, if anything.
        if used > len(data):
            data += infp.read(used - len(data))
    return _read_all_values(data, used)
5692
def _init_value(self, key):
5793
"""Initialize a value. Lock must be held by caller."""
5894
encoded = key.encode('utf-8')
@@ -72,31 +108,10 @@ def _init_value(self, key):
72108

73109
def _read_all_values(self):
    """Yield (key, value, pos). No locking is performed."""
    # Delegate to the module-level parser, handing it the mmap buffer and
    # the cached count of occupied bytes.
    return _read_all_values(self._m, used=self._used)
97112

98113
def read_all_values(self):
    """Yield (key, value). No locking is performed."""
    # Same stream as _read_all_values, with the position component dropped.
    for key, value, _ in self._read_all_values():
        yield key, value
102117

prometheus_client/multiprocess.py

Lines changed: 44 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
from .samples import Sample
1313
from .utils import floatToGoString
1414

15+
MP_METRIC_HELP = 'Multiprocess metric'
16+
1517

1618
class MultiProcessCollector(object):
1719
"""Collector for files for multi-process mode."""
@@ -33,18 +35,31 @@ def merge(files, accumulate=True):
3335
But if writing the merged data back to mmap files, use
3436
accumulate=False to avoid compound accumulation.
3537
"""
38+
metrics = MultiProcessCollector._read_metrics(files)
39+
return MultiProcessCollector._accumulate_metrics(metrics, accumulate)
40+
41+
@staticmethod
42+
def _read_metrics(files):
3643
metrics = {}
44+
key_cache = {}
45+
46+
def _parse_key(key):
47+
val = key_cache.get(key)
48+
if not val:
49+
metric_name, name, labels = json.loads(key)
50+
labels_key = tuple(sorted(labels.items()))
51+
val = key_cache[key] = (metric_name, name, labels, labels_key)
52+
return val
53+
3754
for f in files:
3855
parts = os.path.basename(f).split('_')
3956
typ = parts[0]
40-
d = MmapedDict(f, read_mode=True)
41-
for key, value in d.read_all_values():
42-
metric_name, name, labels = json.loads(key)
43-
labels_key = tuple(sorted(labels.items()))
57+
for key, value, pos in MmapedDict.read_all_values_from_file(f):
58+
metric_name, name, labels, labels_key = _parse_key(key)
4459

4560
metric = metrics.get(metric_name)
4661
if metric is None:
47-
metric = Metric(metric_name, 'Multiprocess metric', typ)
62+
metric = Metric(metric_name, MP_METRIC_HELP, typ)
4863
metrics[metric_name] = metric
4964

5065
if typ == 'gauge':
@@ -54,43 +69,47 @@ def merge(files, accumulate=True):
5469
else:
5570
# The duplicates and labels are fixed in the next for.
5671
metric.add_sample(name, labels_key, value)
57-
d.close()
72+
return metrics
5873

74+
@staticmethod
75+
def _accumulate_metrics(metrics, accumulate):
5976
for metric in metrics.values():
6077
samples = defaultdict(float)
61-
buckets = {}
78+
buckets = defaultdict(lambda: defaultdict(float))
79+
samples_setdefault = samples.setdefault
6280
for s in metric.samples:
63-
name, labels, value = s.name, s.labels, s.value
81+
name, labels, value, timestamp, exemplar = s
6482
if metric.type == 'gauge':
65-
without_pid = tuple(l for l in labels if l[0] != 'pid')
83+
without_pid_key = (name, tuple([l for l in labels if l[0] != 'pid']))
6684
if metric._multiprocess_mode == 'min':
67-
current = samples.setdefault((name, without_pid), value)
85+
current = samples_setdefault(without_pid_key, value)
6886
if value < current:
69-
samples[(s.name, without_pid)] = value
87+
samples[without_pid_key] = value
7088
elif metric._multiprocess_mode == 'max':
71-
current = samples.setdefault((name, without_pid), value)
89+
current = samples_setdefault(without_pid_key, value)
7290
if value > current:
73-
samples[(s.name, without_pid)] = value
91+
samples[without_pid_key] = value
7492
elif metric._multiprocess_mode == 'livesum':
75-
samples[(name, without_pid)] += value
93+
samples[without_pid_key] += value
7694
else: # all/liveall
7795
samples[(name, labels)] = value
7896

7997
elif metric.type == 'histogram':
80-
bucket = tuple(float(l[1]) for l in labels if l[0] == 'le')
81-
if bucket:
82-
# _bucket
83-
without_le = tuple(l for l in labels if l[0] != 'le')
84-
buckets.setdefault(without_le, {})
85-
buckets[without_le].setdefault(bucket[0], 0.0)
86-
buckets[without_le][bucket[0]] += value
87-
else:
98+
# A for loop with early exit is faster than a genexpr
99+
# or a listcomp that ends up building unnecessary things
100+
for l in labels:
101+
if l[0] == 'le':
102+
bucket_value = float(l[1])
103+
# _bucket
104+
without_le = tuple(l for l in labels if l[0] != 'le')
105+
buckets[without_le][bucket_value] += value
106+
break
107+
else: # did not find the `le` key
88108
# _sum/_count
89-
samples[(s.name, labels)] += value
90-
109+
samples[(name, labels)] += value
91110
else:
92111
# Counter and Summary.
93-
samples[(s.name, labels)] += value
112+
samples[(name, labels)] += value
94113

95114
# Accumulate bucket values.
96115
if metric.type == 'histogram':

0 commit comments

Comments
 (0)
0