|
3 | 3 | from __future__ import unicode_literals
|
4 | 4 |
|
5 | 5 | import copy
|
6 |
| -import math |
7 | 6 | import json
|
| 7 | +import math |
| 8 | +import mmap |
8 | 9 | import os
|
9 | 10 | import re
|
10 |
| -import shelve |
| 11 | +import struct |
11 | 12 | import types
|
12 | 13 | from timeit import default_timer
|
13 | 14 |
|
|
26 | 27 | _RESERVED_METRIC_LABEL_NAME_RE = re.compile(r'^__.*$')
|
27 | 28 | _INF = float("inf")
|
28 | 29 | _MINUS_INF = float("-inf")
|
| 30 | +_INITIAL_MMAP_SIZE = 1024*1024 |
29 | 31 |
|
30 | 32 |
|
31 | 33 | class CollectorRegistry(object):
|
@@ -241,50 +243,129 @@ def get(self):
|
241 | 243 | with self._lock:
|
242 | 244 | return self._value
|
243 | 245 |
|
| 246 | +class _MmapedDict(object): |
| 247 | + """A dict of doubles, backed by an mmapped file. |
| 248 | +
|
| 249 | + The file starts with a 4 byte int, indicating how much of it is used. |
| 250 | + Then 4 bytes of padding. |
| 251 | + There's then a number of entries, consisting of a 4 byte int which is the |
| 252 | + side of the next field, a utf-8 encoded string key, padding to a 8 byte |
| 253 | + alignment, and then a 8 byte float which is the value. |
| 254 | + """ |
| 255 | + def __init__(self, filename): |
| 256 | + self._lock = Lock() |
| 257 | + self._f = open(filename, 'a+b') |
| 258 | + if os.fstat(self._f.fileno()).st_size == 0: |
| 259 | + self._f.truncate(_INITIAL_MMAP_SIZE) |
| 260 | + self._capacity = os.fstat(self._f.fileno()).st_size |
| 261 | + self._m = mmap.mmap(self._f.fileno(), self._capacity) |
| 262 | + |
| 263 | + self._positions = {} |
| 264 | + self._used = struct.unpack_from(b'i', self._m, 0)[0] |
| 265 | + if self._used == 0: |
| 266 | + self._used = 8 |
| 267 | + struct.pack_into(b'i', self._m, 0, self._used) |
| 268 | + else: |
| 269 | + for key, _, pos in self._read_all_values(): |
| 270 | + self._positions[key] = pos |
| 271 | + |
| 272 | + def _init_value(self, key): |
| 273 | + """Initilize a value. Lock must be held by caller.""" |
| 274 | + encoded = key.encode('utf-8') |
| 275 | + # Pad to be 8-byte aligned. |
| 276 | + padded = encoded + (b' ' * (8 - (len(encoded) + 4) % 8)) |
| 277 | + value = struct.pack('i{0}sd'.format(len(padded)).encode(), len(encoded), padded, 0.0) |
| 278 | + while self._used + len(value) > self._capacity: |
| 279 | + self._capacity *= 2 |
| 280 | + self._f.truncate(self._capacity * 2) |
| 281 | + self._m = mmap.mmap(self._f.fileno(), self._capacity) |
| 282 | + self._m[self._used:self._used + len(value)] = value |
| 283 | + |
| 284 | + # Update how much space we've used. |
| 285 | + self._used += len(value) |
| 286 | + struct.pack_into(b'i', self._m, 0, self._used) |
| 287 | + self._positions[key] = self._used - 8 |
| 288 | + |
| 289 | + def _read_all_values(self): |
| 290 | + """Yield (key, value, pos). No locking is performed.""" |
| 291 | + pos = 8 |
| 292 | + while pos < self._used: |
| 293 | + encoded_len = struct.unpack_from(b'i', self._m, pos)[0] |
| 294 | + pos += 4 |
| 295 | + encoded = struct.unpack_from('{0}s'.format(encoded_len).encode(), self._m, pos)[0] |
| 296 | + padded_len = encoded_len + (8 - (encoded_len + 4) % 8) |
| 297 | + pos += padded_len |
| 298 | + value = struct.unpack_from(b'd', self._m, pos)[0] |
| 299 | + yield encoded.decode('utf-8'), value, pos |
| 300 | + pos += 8 |
| 301 | + |
| 302 | + def read_all_values(self): |
| 303 | + """Yield (key, value, pos). No locking is performed.""" |
| 304 | + for k, v, _ in self._read_all_values(): |
| 305 | + yield k, v |
| 306 | + |
| 307 | + def read_value(self, key): |
| 308 | + with self._lock: |
| 309 | + if key not in self._positions: |
| 310 | + self._init_value(key) |
| 311 | + pos = self._positions[key] |
| 312 | + # We assume that reading from an 8 byte aligned value is atomic |
| 313 | + return struct.unpack_from(b'd', self._m, pos)[0] |
| 314 | + |
| 315 | + def write_value(self, key, value): |
| 316 | + with self._lock: |
| 317 | + if key not in self._positions: |
| 318 | + self._init_value(key) |
| 319 | + pos = self._positions[key] |
| 320 | + # We assume that writing to an 8 byte aligned value is atomic |
| 321 | + struct.pack_into(b'd', self._m, pos, value) |
| 322 | + |
| 323 | + def close(self): |
| 324 | + if self._f: |
| 325 | + self._f.close() |
| 326 | + self._f = None |
| 327 | + |
244 | 328 |
|
245 | 329 | def _MultiProcessValue(__pid=os.getpid()):
|
246 | 330 | pid = __pid
|
247 |
| - samples = {} |
248 |
| - samples_lock = Lock() |
| 331 | + files = {} |
| 332 | + files_lock = Lock() |
249 | 333 |
|
250 |
| - class _ShelveValue(object): |
251 |
| - '''A float protected by a mutex backed by a per-process shelve.''' |
| 334 | + class _MmapedValue(object): |
| 335 | + '''A float protected by a mutex backed by a per-process mmaped file.''' |
252 | 336 |
|
253 | 337 | _multiprocess = True
|
254 | 338 |
|
255 | 339 | def __init__(self, typ, metric_name, name, labelnames, labelvalues, multiprocess_mode='', **kwargs):
|
256 |
| - with samples_lock: |
257 |
| - if typ == 'gauge': |
258 |
| - file_prefix = typ + '_' + multiprocess_mode |
259 |
| - else: |
260 |
| - file_prefix = typ |
261 |
| - if file_prefix not in samples: |
262 |
| - filename = os.path.join(os.environ['prometheus_multiproc_dir'], '{0}_{1}.db'.format(file_prefix, pid)) |
263 |
| - samples[file_prefix] = shelve.open(filename) |
264 |
| - self._samples = samples[file_prefix] |
| 340 | + if typ == 'gauge': |
| 341 | + file_prefix = typ + '_' + multiprocess_mode |
| 342 | + else: |
| 343 | + file_prefix = typ |
| 344 | + with files_lock: |
| 345 | + if file_prefix not in files: |
| 346 | + filename = os.path.join( |
| 347 | + os.environ['prometheus_multiproc_dir'], '{0}_{1}.db'.format(file_prefix, pid)) |
| 348 | + files[file_prefix] = _MmapedDict(filename) |
| 349 | + self._file = files[file_prefix] |
265 | 350 | self._key = json.dumps((metric_name, name, labelnames, labelvalues))
|
266 |
| - self._value = self._samples.get(self._key, 0.0) |
267 |
| - self._samples[self._key] = self._value |
268 |
| - self._samples.sync() |
| 351 | + self._value = self._file.read_value(self._key) |
269 | 352 | self._lock = Lock()
|
270 | 353 |
|
271 | 354 | def inc(self, amount):
|
272 | 355 | with self._lock:
|
273 | 356 | self._value += amount
|
274 |
| - self._samples[self._key] = self._value |
275 |
| - self._samples.sync() |
| 357 | + self._file.write_value(self._key, self._value) |
276 | 358 |
|
277 | 359 | def set(self, value):
|
278 | 360 | with self._lock:
|
279 | 361 | self._value = value
|
280 |
| - self._samples[self._key] = self._value |
281 |
| - self._samples.sync() |
| 362 | + self._file.write_value(self._key, self._value) |
282 | 363 |
|
283 | 364 | def get(self):
|
284 | 365 | with self._lock:
|
285 | 366 | return self._value
|
286 | 367 |
|
287 |
| - return _ShelveValue |
| 368 | + return _MmapedValue |
288 | 369 |
|
289 | 370 |
|
290 | 371 | # Should we enable multi-process mode?
|
|
0 commit comments