Switch multi-process support to custom data store. · sonlinux/client_python@f85ccd9 · GitHub

Commit f85ccd9

Switch multi-process support to custom data store.
This uses an mmapped file to store the data. Fsync is not called, and concurrent readers are permitted (which some shelve backends did not permit). Microbenchmarks indicate 1.2-1.5us per instrumentation event, compared to 3-13ms with the shelve solution, so roughly three orders of magnitude better. The 1.2-1.5us is about double the cost of normal (non-multi-process) instrumentation, due to the second mutex required; Python doesn't offer an RWLock, which would let this be reduced. Overall this performance is acceptable: anyone doing something extremely performance-critical with instrumentation wouldn't be using Python anyway.
1 parent d3256c1 commit f85ccd9
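
As a rough illustration of the per-event numbers above, here is a sketch of a microbenchmark (not the one used for the quoted figures; the temp directory, metric name and iteration count are arbitrary) that times Counter.inc() with multi-process mode enabled:

# Rough microbenchmark sketch for the per-event cost quoted above.
# The temp directory, metric name and iteration count are arbitrary choices,
# not the setup used to produce the 1.2-1.5us figure.
import os
import tempfile
import timeit

os.environ['prometheus_multiproc_dir'] = tempfile.mkdtemp()  # must be set before import

from prometheus_client import Counter

c = Counter('bench_events_total', 'Benchmark counter.')

n = 100000
elapsed = timeit.timeit(c.inc, number=n)
print('{0:.2f}us per inc()'.format(elapsed / n * 1e6))

The environment variable has to be set before prometheus_client is imported, since the value class is chosen at import time (see the "Should we enable multi-process mode?" check in core.py below).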

File tree

3 files changed: +137 -25 lines

prometheus_client/core.py

Lines changed: 104 additions & 23 deletions
@@ -3,11 +3,12 @@
 from __future__ import unicode_literals
 
 import copy
-import math
 import json
+import math
+import mmap
 import os
 import re
-import shelve
+import struct
 import types
 from timeit import default_timer
 
@@ -26,6 +27,7 @@
 _RESERVED_METRIC_LABEL_NAME_RE = re.compile(r'^__.*$')
 _INF = float("inf")
 _MINUS_INF = float("-inf")
+_INITIAL_MMAP_SIZE = 1024*1024
 
 
 class CollectorRegistry(object):
@@ -241,50 +243,129 @@ def get(self):
         with self._lock:
             return self._value
 
+class _MmapedDict(object):
+    """A dict of doubles, backed by an mmapped file.
+
+    The file starts with a 4 byte int, indicating how much of it is used.
+    Then 4 bytes of padding.
+    There's then a number of entries, consisting of a 4 byte int which is the
+    size of the next field, a utf-8 encoded string key, padding to an 8 byte
+    alignment, and then an 8 byte float which is the value.
+    """
+    def __init__(self, filename):
+        self._lock = Lock()
+        self._f = open(filename, 'a+b')
+        if os.fstat(self._f.fileno()).st_size == 0:
+            self._f.truncate(_INITIAL_MMAP_SIZE)
+        self._capacity = os.fstat(self._f.fileno()).st_size
+        self._m = mmap.mmap(self._f.fileno(), self._capacity)
+
+        self._positions = {}
+        self._used = struct.unpack_from(b'i', self._m, 0)[0]
+        if self._used == 0:
+            self._used = 8
+            struct.pack_into(b'i', self._m, 0, self._used)
+        else:
+            for key, _, pos in self._read_all_values():
+                self._positions[key] = pos
+
+    def _init_value(self, key):
+        """Initialize a value. Lock must be held by caller."""
+        encoded = key.encode('utf-8')
+        # Pad to be 8-byte aligned.
+        padded = encoded + (b' ' * (8 - (len(encoded) + 4) % 8))
+        value = struct.pack('i{0}sd'.format(len(padded)).encode(), len(encoded), padded, 0.0)
+        while self._used + len(value) > self._capacity:
+            self._capacity *= 2
+            self._f.truncate(self._capacity * 2)
+            self._m = mmap.mmap(self._f.fileno(), self._capacity)
+        self._m[self._used:self._used + len(value)] = value
+
+        # Update how much space we've used.
+        self._used += len(value)
+        struct.pack_into(b'i', self._m, 0, self._used)
+        self._positions[key] = self._used - 8
+
+    def _read_all_values(self):
+        """Yield (key, value, pos). No locking is performed."""
+        pos = 8
+        while pos < self._used:
+            encoded_len = struct.unpack_from(b'i', self._m, pos)[0]
+            pos += 4
+            encoded = struct.unpack_from('{0}s'.format(encoded_len).encode(), self._m, pos)[0]
+            padded_len = encoded_len + (8 - (encoded_len + 4) % 8)
+            pos += padded_len
+            value = struct.unpack_from(b'd', self._m, pos)[0]
+            yield encoded.decode('utf-8'), value, pos
+            pos += 8
+
+    def read_all_values(self):
+        """Yield (key, value). No locking is performed."""
+        for k, v, _ in self._read_all_values():
+            yield k, v
+
+    def read_value(self, key):
+        with self._lock:
+            if key not in self._positions:
+                self._init_value(key)
+        pos = self._positions[key]
+        # We assume that reading from an 8 byte aligned value is atomic
+        return struct.unpack_from(b'd', self._m, pos)[0]
+
+    def write_value(self, key, value):
+        with self._lock:
+            if key not in self._positions:
+                self._init_value(key)
+        pos = self._positions[key]
+        # We assume that writing to an 8 byte aligned value is atomic
+        struct.pack_into(b'd', self._m, pos, value)
+
+    def close(self):
+        if self._f:
+            self._f.close()
+            self._f = None
+
 
 def _MultiProcessValue(__pid=os.getpid()):
     pid = __pid
-    samples = {}
-    samples_lock = Lock()
+    files = {}
+    files_lock = Lock()
 
-    class _ShelveValue(object):
-        '''A float protected by a mutex backed by a per-process shelve.'''
+    class _MmapedValue(object):
+        '''A float protected by a mutex backed by a per-process mmaped file.'''
 
         _multiprocess = True
 
         def __init__(self, typ, metric_name, name, labelnames, labelvalues, multiprocess_mode='', **kwargs):
-            with samples_lock:
-                if typ == 'gauge':
-                    file_prefix = typ + '_' + multiprocess_mode
-                else:
-                    file_prefix = typ
-                if file_prefix not in samples:
-                    filename = os.path.join(os.environ['prometheus_multiproc_dir'], '{0}_{1}.db'.format(file_prefix, pid))
-                    samples[file_prefix] = shelve.open(filename)
-                self._samples = samples[file_prefix]
+            if typ == 'gauge':
+                file_prefix = typ + '_' + multiprocess_mode
+            else:
+                file_prefix = typ
+            with files_lock:
+                if file_prefix not in files:
+                    filename = os.path.join(
+                        os.environ['prometheus_multiproc_dir'], '{0}_{1}.db'.format(file_prefix, pid))
+                    files[file_prefix] = _MmapedDict(filename)
+            self._file = files[file_prefix]
             self._key = json.dumps((metric_name, name, labelnames, labelvalues))
-            self._value = self._samples.get(self._key, 0.0)
-            self._samples[self._key] = self._value
-            self._samples.sync()
+            self._value = self._file.read_value(self._key)
             self._lock = Lock()
 
         def inc(self, amount):
             with self._lock:
                 self._value += amount
-                self._samples[self._key] = self._value
-                self._samples.sync()
+                self._file.write_value(self._key, self._value)
 
         def set(self, value):
             with self._lock:
                 self._value = value
-                self._samples[self._key] = self._value
-                self._samples.sync()
+                self._file.write_value(self._key, self._value)
 
         def get(self):
             with self._lock:
                 return self._value
 
-    return _ShelveValue
+    return _MmapedValue
 
 
 # Should we enable multi-process mode?
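
The on-disk layout described in the _MmapedDict docstring can also be decoded without the class itself. The following standalone sketch (the 'counter_1234.db' path is a placeholder) walks the same format that _read_all_values parses, using only struct:

# Minimal standalone reader for the _MmapedDict file layout described above.
# 'counter_1234.db' is a placeholder path; any file written by _MmapedDict works.
import struct

def dump_mmaped_file(path):
    with open(path, 'rb') as f:
        data = f.read()
    used = struct.unpack_from(b'i', data, 0)[0]  # first 4 bytes: how much of the file is in use
    pos = 8  # skip the 4 byte header plus 4 bytes of padding
    while pos < used:
        encoded_len = struct.unpack_from(b'i', data, pos)[0]
        pos += 4
        key = data[pos:pos + encoded_len].decode('utf-8')
        # The key is space-padded so the following double starts on an 8 byte boundary.
        pos += encoded_len + (8 - (encoded_len + 4) % 8)
        value = struct.unpack_from(b'd', data, pos)[0]
        pos += 8
        yield key, value

for key, value in dump_mmaped_file('counter_1234.db'):
    print(key, value)

Note the ordering in _init_value above: the entry bytes are written before the used-bytes header is bumped, and the doubles are 8 byte aligned, which is presumably what makes lock-free concurrent readers reasonable.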

prometheus_client/multiprocess.py

Lines changed: 4 additions & 2 deletions
@@ -21,7 +21,8 @@ def collect(self):
         for f in glob.glob(os.path.join(self._path, '*.db')):
             parts = os.path.basename(f).split('_')
             typ = parts[0]
-            for key, value in shelve.open(f).items():
+            d = core._MmapedDict(f)
+            for key, value in d.read_all_values():
                 metric_name, name, labelnames, labelvalues = json.loads(key)
                 metrics.setdefault(metric_name, core.Metric(metric_name, 'Multiprocess metric', typ))
                 metric = metrics[metric_name]
@@ -30,8 +31,9 @@ def collect(self):
                     metric._multiprocess_mode = parts[1]
                     metric.add_sample(name, tuple(zip(labelnames, labelvalues)) + (('pid', pid), ), value)
                 else:
-                    # The deplucates and labels are fixed in the next for.
+                    # The duplicates and labels are fixed in the next for.
                     metric.add_sample(name, tuple(zip(labelnames, labelvalues)), value)
+            d.close()
 
         for metric in metrics.values():
             samples = {}
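
For a sense of what the collector above does at the file level, here is a simplified sketch (raw_samples is a hypothetical helper, and summing every value glosses over the gauge multiprocess modes and deduplication handled in the real code): it walks each *.db file, decodes the JSON key back into (metric_name, name, labelnames, labelvalues), and accumulates values across processes.

# Sketch of the per-file aggregation the collector performs above.
# raw_samples is a hypothetical helper; it assumes prometheus_multiproc_dir
# points at a directory of *.db files and simply sums values across processes.
import glob
import json
import os

from prometheus_client import core

def raw_samples(path=None):
    path = path or os.environ['prometheus_multiproc_dir']
    totals = {}
    for f in glob.glob(os.path.join(path, '*.db')):
        typ = os.path.basename(f).split('_')[0]  # e.g. 'counter' or 'gauge'
        d = core._MmapedDict(f)
        for key, value in d.read_all_values():
            metric_name, name, labelnames, labelvalues = json.loads(key)
            labels = tuple(zip(labelnames, labelvalues))
            k = (typ, metric_name, name, labels)
            totals[k] = totals.get(k, 0.0) + value
        d.close()
    return totals

for (typ, metric_name, name, labels), value in raw_samples().items():
    print(typ, metric_name, name, dict(labels), value)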

tests/test_multiprocess.py

Lines changed: 29 additions & 0 deletions
@@ -109,3 +109,32 @@ def test_gauge_livesum(self):
         self.assertEqual(3, self.registry.get_sample_value('g'))
         mark_process_dead(123, os.environ['prometheus_multiproc_dir'])
         self.assertEqual(2, self.registry.get_sample_value('g'))
+
+class TestMmapedDict(unittest.TestCase):
+    def setUp(self):
+        fd, self.tempfile = tempfile.mkstemp()
+        os.close(fd)
+        self.d = core._MmapedDict(self.tempfile)
+
+    def test_process_restart(self):
+        self.d.write_value('abc', 123.0)
+        self.d.close()
+        self.d = core._MmapedDict(self.tempfile)
+        self.assertEqual(123, self.d.read_value('abc'))
+        self.assertEqual([('abc', 123.0)], list(self.d.read_all_values()))
+
+    def test_expansion(self):
+        key = 'a' * core._INITIAL_MMAP_SIZE
+        self.d.write_value(key, 123.0)
+        self.assertEqual([(key, 123.0)], list(self.d.read_all_values()))
+
+    def test_multi_expansion(self):
+        key = 'a' * core._INITIAL_MMAP_SIZE * 4
+        self.d.write_value('abc', 42.0)
+        self.d.write_value(key, 123.0)
+        self.d.write_value('def', 17.0)
+        self.assertEqual([('abc', 42.0), (key, 123.0), ('def', 17.0)],
+                         list(self.d.read_all_values()))
+
+    def tearDown(self):
+        os.unlink(self.tempfile)

0 commit comments
