Add mostrecent aggregation to Gauge · prometheus/client_python@20ade3c · GitHub
[go: up one dir, main page]

Skip to content

Commit 20ade3c

Browse files
committed
Add mostrecent aggregation to Gauge
In multiprocess mode, the process that exposes the metrics needs to aggregate the samples from the other processes. The Gauge metric allows users to choose the aggregation mode. This implements the 'mostrecent' (and 'livemostrecent') mode, where the last observed value is exposed. In order to support this, the file format is expanded to store timestamps in addition to the values. The stored timestamps are read by the reader process and are used to find the latest value. Closes #847 Consideration on the atomicity: Previously, mmap_dict.py had a comment saying "We assume that reading from an 8 byte aligned value is atomic". With this change, the value write becomes a 16-byte, 8-byte-aligned write. The code author tried to find a basis for the original assumption, but couldn't find any. According to write(2), **if a file descriptor is shared**, the write becomes atomic. However, we do not share the file descriptors in the current architecture. Considering that the Ruby implementation does the same and hasn't seen an issue with it, this write atomicity problem might not be an issue in practice. See also: * prometheus/client_ruby#172 The approach and naming are taken from client_ruby. * https://github.com/prometheus/client_golang/blob/v1.17.0/prometheus/metric.go#L149-L161 client_golang already has an API for setting timestamps. It explains the use case for the timestamp beyond the client-local aggregation. In order to support the same use case in Python, further changes are needed. Signed-off-by: Masaya Suzuki <draftcode@gmail.com>
1 parent 249490e commit 20ade3c

File tree

6 files changed

+80
-41
lines changed

6 files changed

+80
-41
lines changed

README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -711,9 +711,10 @@ Gauges have several modes they can run in, which can be selected with the `multi
711711
- 'min': Return a single timeseries that is the minimum of the values of all processes (alive or dead).
712712
- 'max': Return a single timeseries that is the maximum of the values of all processes (alive or dead).
713713
- 'sum': Return a single timeseries that is the sum of the values of all processes (alive or dead).
714+
- 'mostrecent': Return a single timeseries that is the most recent value among all processes (alive or dead).
714715

715716
Prepend 'live' to the beginning of the mode to return the same result but only considering living processes
716-
(e.g., 'liveall, 'livesum', 'livemax', 'livemin').
717+
(e.g., 'liveall', 'livesum', 'livemax', 'livemin', 'livemostrecent').
717718

718719
```python
719720
from prometheus_client import Gauge

prometheus_client/metrics.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -346,7 +346,8 @@ def f():
346346
d.set_function(lambda: len(my_dict))
347347
"""
348348
_type = 'gauge'
349-
_MULTIPROC_MODES = frozenset(('all', 'liveall', 'min', 'livemin', 'max', 'livemax', 'sum', 'livesum'))
349+
_MULTIPROC_MODES = frozenset(('all', 'liveall', 'min', 'livemin', 'max', 'livemax', 'sum', 'livesum', 'mostrecent', 'livemostrecent'))
350+
_MOST_RECENT_MODES = frozenset(('mostrecent', 'livemostrecent'))
350351

351352
def __init__(self,
352353
name: str,
@@ -357,7 +358,7 @@ def __init__(self,
357358
unit: str = '',
358359
registry: Optional[CollectorRegistry] = REGISTRY,
359360
_labelvalues: Optional[Sequence[str]] = None,
360-
multiprocess_mode: Literal['all', 'liveall', 'min', 'livemin', 'max', 'livemax', 'sum', 'livesum'] = 'all',
361+
multiprocess_mode: Literal['all', 'liveall', 'min', 'livemin', 'max', 'livemax', 'sum', 'livesum', 'mostrecent', 'livemostrecent'] = 'all',
361362
):
362363
self._multiprocess_mode = multiprocess_mode
363364
if multiprocess_mode not in self._MULTIPROC_MODES:
@@ -373,6 +374,7 @@ def __init__(self,
373374
_labelvalues=_labelvalues,
374375
)
375376
self._kwargs['multiprocess_mode'] = self._multiprocess_mode
377+
self._is_most_recent = self._multiprocess_mode in self._MOST_RECENT_MODES
376378

377379
def _metric_init(self) -> None:
378380
self._value = values.ValueClass(
@@ -382,18 +384,25 @@ def _metric_init(self) -> None:
382384

383385
def inc(self, amount: float = 1) -> None:
384386
"""Increment gauge by the given amount."""
387+
if self._is_most_recent:
388+
raise RuntimeError("inc must not be used with the mostrecent mode")
385389
self._raise_if_not_observable()
386390
self._value.inc(amount)
387391

388392
def dec(self, amount: float = 1) -> None:
389393
"""Decrement gauge by the given amount."""
394+
if self._is_most_recent:
395+
raise RuntimeError("dec must not be used with the mostrecent mode")
390396
self._raise_if_not_observable()
391397
self._value.inc(-amount)
392398

393399
def set(self, value: float) -> None:
394400
"""Set gauge to the given value."""
395401
self._raise_if_not_observable()
396-
self._value.set(float(value))
402+
if self._is_most_recent:
403+
self._value.set(float(value), timestamp=time.time())
404+
else:
405+
self._value.set(float(value))
397406

398407
def set_to_current_time(self) -> None:
399408
"""Set gauge to the current unixtime."""

prometheus_client/mmap_dict.py

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -6,25 +6,26 @@
66

77
_INITIAL_MMAP_SIZE = 1 << 16
88
_pack_integer_func = struct.Struct(b'i').pack
9-
_pack_double_func = struct.Struct(b'd').pack
9+
_pack_two_doubles_func = struct.Struct(b'dd').pack
1010
_unpack_integer = struct.Struct(b'i').unpack_from
11-
_unpack_double = struct.Struct(b'd').unpack_from
11+
_unpack_two_doubles = struct.Struct(b'dd').unpack_from
1212

1313

1414
# struct.pack_into has atomicity issues because it will temporarily write 0 into
1515
# the mmap, resulting in false reads to 0 when experiencing a lot of writes.
1616
# Using direct assignment solves this issue.
1717

18-
def _pack_double(data, pos, value):
19-
data[pos:pos + 8] = _pack_double_func(value)
18+
19+
def _pack_two_doubles(data, pos, value, timestamp):
20+
data[pos:pos + 16] = _pack_two_doubles_func(value, timestamp)
2021

2122

2223
def _pack_integer(data, pos, value):
2324
data[pos:pos + 4] = _pack_integer_func(value)
2425

2526

2627
def _read_all_values(data, used=0):
27-
"""Yield (key, value, pos). No locking is performed."""
28+
"""Yield (key, value, timestamp, pos). No locking is performed."""
2829

2930
if used <= 0:
3031
# If not valid `used` value is passed in, read it from the file.
@@ -41,9 +42,9 @@ def _read_all_values(data, used=0):
4142
encoded_key = data[pos:pos + encoded_len]
4243
padded_len = encoded_len + (8 - (encoded_len + 4) % 8)
4344
pos += padded_len
44-
value = _unpack_double(data, pos)[0]
45-
yield encoded_key.decode('utf-8'), value, pos
46-
pos += 8
45+
value, timestamp = _unpack_two_doubles(data, pos)
46+
yield encoded_key.decode('utf-8'), value, timestamp, pos
47+
pos += 16
4748

4849

4950
class MmapedDict:
@@ -53,7 +54,8 @@ class MmapedDict:
5354
Then 4 bytes of padding.
5455
There's then a number of entries, consisting of a 4 byte int which is the
5556
size of the next field, a utf-8 encoded string key, padding to a 8 byte
56-
alignment, and then a 8 byte float which is the value.
57+
alignment, and then a 8 byte float which is the value and a 8 byte float
58+
which is a UNIX timestamp in seconds.
5759
5860
Not thread safe.
5961
"""
@@ -76,7 +78,7 @@ def __init__(self, filename, read_mode=False):
7678
_pack_integer(self._m, 0, self._used)
7779
else:
7880
if not read_mode:
79-
for key, _, pos in self._read_all_values():
81+
for key, _, _, pos in self._read_all_values():
8082
self._positions[key] = pos
8183

8284
@staticmethod
@@ -95,7 +97,7 @@ def _init_value(self, key):
9597
encoded = key.encode('utf-8')
9698
# Pad to be 8-byte aligned.
9799
padded = encoded + (b' ' * (8 - (len(encoded) + 4) % 8))
98-
value = struct.pack(f'i{len(padded)}sd'.encode(), len(encoded), padded, 0.0)
100+
value = struct.pack(f'i{len(padded)}sdd'.encode(), len(encoded), padded, 0.0, 0.0)
99101
while self._used + len(value) > self._capacity:
100102
self._capacity *= 2
101103
self._f.truncate(self._capacity)
@@ -105,30 +107,28 @@ def _init_value(self, key):
105107
# Update how much space we've used.
106108
self._used += len(value)
107109
_pack_integer(self._m, 0, self._used)
108-
self._positions[key] = self._used - 8
110+
self._positions[key] = self._used - 16
109111

110112
def _read_all_values(self):
111113
"""Yield (key, value, timestamp, pos). No locking is performed."""
112114
return _read_all_values(data=self._m, used=self._used)
113115

114116
def read_all_values(self):
115-
"""Yield (key, value). No locking is performed."""
116-
for k, v, _ in self._read_all_values():
117-
yield k, v
117+
"""Yield (key, value, timestamp). No locking is performed."""
118+
for k, v, ts, _ in self._read_all_values():
119+
yield k, v, ts
118120

119121
def read_value(self, key):
120122
if key not in self._positions:
121123
self._init_value(key)
122124
pos = self._positions[key]
123-
# We assume that reading from an 8 byte aligned value is atomic
124-
return _unpack_double(self._m, pos)[0]
125+
return _unpack_two_doubles(self._m, pos)
125126

126-
def write_value(self, key, value):
127+
def write_value(self, key, value, timestamp):
127128
if key not in self._positions:
128129
self._init_value(key)
129130
pos = self._positions[key]
130-
# We assume that writing to an 8 byte aligned value is atomic
131-
_pack_double(self._m, pos, value)
131+
_pack_two_doubles(self._m, pos, value, timestamp)
132132

133133
def close(self):
134134
if self._f:

prometheus_client/multiprocess.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ def _parse_key(key):
6868
# the file is missing
6969
continue
7070
raise
71-
for key, value, _ in file_values:
71+
for key, value, timestamp, _ in file_values:
7272
metric_name, name, labels, labels_key, help_text = _parse_key(key)
7373

7474
metric = metrics.get(metric_name)
@@ -79,7 +79,7 @@ def _parse_key(key):
7979
if typ == 'gauge':
8080
pid = parts[2][:-3]
8181
metric._multiprocess_mode = parts[1]
82-
metric.add_sample(name, labels_key + (('pid', pid),), value)
82+
metric.add_sample(name, labels_key + (('pid', pid),), value, timestamp)
8383
else:
8484
# The duplicates and labels are fixed in the next for.
8585
metric.add_sample(name, labels_key, value)
@@ -89,6 +89,7 @@ def _parse_key(key):
8989
def _accumulate_metrics(metrics, accumulate):
9090
for metric in metrics.values():
9191
samples = defaultdict(float)
92+
sample_timestamps = defaultdict(float)
9293
buckets = defaultdict(lambda: defaultdict(float))
9394
samples_setdefault = samples.setdefault
9495
for s in metric.samples:
@@ -105,6 +106,12 @@ def _accumulate_metrics(metrics, accumulate):
105106
samples[without_pid_key] = value
106107
elif metric._multiprocess_mode in ('sum', 'livesum'):
107108
samples[without_pid_key] += value
109+
elif metric._multiprocess_mode in ('mostrecent', 'livemostrecent'):
110+
current_timestamp = sample_timestamps[without_pid_key]
111+
timestamp = float(timestamp or 0)
112+
if current_timestamp < timestamp:
113+
samples[without_pid_key] = value
114+
sample_timestamps[without_pid_key] = timestamp
108115
else: # all/liveall
109116
samples[(name, labels)] = value
110117

prometheus_client/values.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ def inc(self, amount):
1919
with self._lock:
2020
self._value += amount
2121

22-
def set(self, value):
22+
def set(self, value, timestamp=None):
2323
with self._lock:
2424
self._value = value
2525

@@ -82,7 +82,7 @@ def __reset(self):
8282
files[file_prefix] = MmapedDict(filename)
8383
self._file = files[file_prefix]
8484
self._key = mmap_key(metric_name, name, labelnames, labelvalues, help_text)
85-
self._value = self._file.read_value(self._key)
85+
self._value, self._timestamp = self._file.read_value(self._key)
8686

8787
def __check_for_pid_change(self):
8888
actual_pid = process_identifier()
@@ -99,13 +99,15 @@ def inc(self, amount):
9999
with lock:
100100
self.__check_for_pid_change()
101101
self._value += amount
102-
self._file.write_value(self._key, self._value)
102+
self._timestamp = 0.0
103+
self._file.write_value(self._key, self._value, self._timestamp)
103104

104-
def set(self, value):
105+
def set(self, value, timestamp=None):
105106
with lock:
106107
self.__check_for_pid_change()
107108
self._value = value
108-
self._file.write_value(self._key, self._value)
109+
self._timestamp = timestamp or 0.0
110+
self._file.write_value(self._key, self._value, self._timestamp)
109111

110112
def set_exemplar(self, exemplar):
111113
# TODO: Implement exemplars for multiprocess mode.

tests/test_multiprocess.py

Lines changed: 30 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,26 @@ def test_gauge_livesum(self):
185185
mark_process_dead(123, os.environ['PROMETHEUS_MULTIPROC_DIR'])
186186
self.assertEqual(2, self.registry.get_sample_value('g'))
187187

188+
def test_gauge_mostrecent(self):
189+
g1 = Gauge('g', 'help', registry=None, multiprocess_mode='mostrecent')
190+
values.ValueClass = MultiProcessValue(lambda: 456)
191+
g2 = Gauge('g', 'help', registry=None, multiprocess_mode='mostrecent')
192+
g2.set(2)
193+
g1.set(1)
194+
self.assertEqual(1, self.registry.get_sample_value('g'))
195+
mark_process_dead(123, os.environ['PROMETHEUS_MULTIPROC_DIR'])
196+
self.assertEqual(1, self.registry.get_sample_value('g'))
197+
198+
def test_gauge_livemostrecent(self):
199+
g1 = Gauge('g', 'help', registry=None, multiprocess_mode='livemostrecent')
200+
values.ValueClass = MultiProcessValue(lambda: 456)
201+
g2 = Gauge('g', 'help', registry=None, multiprocess_mode='livemostrecent')
202+
g2.set(2)
203+
g1.set(1)
204+
self.assertEqual(1, self.registry.get_sample_value('g'))
205+
mark_process_dead(123, os.environ['PROMETHEUS_MULTIPROC_DIR'])
206+
self.assertEqual(2, self.registry.get_sample_value('g'))
207+
188208
def test_namespace_subsystem(self):
189209
c1 = Counter('c', 'help', registry=None, namespace='ns', subsystem='ss')
190210
c1.inc(1)
@@ -369,28 +389,28 @@ def setUp(self):
369389
self.d = mmap_dict.MmapedDict(self.tempfile)
370390

371391
def test_process_restart(self):
372-
self.d.write_value('abc', 123.0)
392+
self.d.write_value('abc', 123.0, 987.0)
373393
self.d.close()
374394
self.d = mmap_dict.MmapedDict(self.tempfile)
375-
self.assertEqual(123, self.d.read_value('abc'))
376-
self.assertEqual([('abc', 123.0)], list(self.d.read_all_values()))
395+
self.assertEqual((123, 987.0), self.d.read_value('abc'))
396+
self.assertEqual([('abc', 123.0, 987.0)], list(self.d.read_all_values()))
377397

378398
def test_expansion(self):
379399
key = 'a' * mmap_dict._INITIAL_MMAP_SIZE
380-
self.d.write_value(key, 123.0)
381-
self.assertEqual([(key, 123.0)], list(self.d.read_all_values()))
400+
self.d.write_value(key, 123.0, 987.0)
401+
self.assertEqual([(key, 123.0, 987.0)], list(self.d.read_all_values()))
382402

383403
def test_multi_expansion(self):
384404
key = 'a' * mmap_dict._INITIAL_MMAP_SIZE * 4
385-
self.d.write_value('abc', 42.0)
386-
self.d.write_value(key, 123.0)
387-
self.d.write_value('def', 17.0)
405+
self.d.write_value('abc', 42.0, 987.0)
406+
self.d.write_value(key, 123.0, 876.0)
407+
self.d.write_value('def', 17.0, 765.0)
388408
self.assertEqual(
389-
[('abc', 42.0), (key, 123.0), ('def', 17.0)],
409+
[('abc', 42.0, 987.0), (key, 123.0, 876.0), ('def', 17.0, 765.0)],
390410
list(self.d.read_all_values()))
391411

392412
def test_corruption_detected(self):
393-
self.d.write_value('abc', 42.0)
413+
self.d.write_value('abc', 42.0, 987.0)
394414
# corrupt the written data
395415
self.d._m[8:16] = b'somejunk'
396416
with self.assertRaises(RuntimeError):

0 commit comments

Comments
 (0)
0