3
3
from __future__ import unicode_literals
4
4
5
5
import copy
6
+ import json
6
7
import math
8
+ import mmap
9
+ import os
7
10
import re
11
+ import struct
8
12
import types
9
13
from timeit import default_timer
10
14
23
27
_RESERVED_METRIC_LABEL_NAME_RE = re .compile (r'^__.*$' )
24
28
_INF = float ("inf" )
25
29
_MINUS_INF = float ("-inf" )
30
+ _INITIAL_MMAP_SIZE = 1024 * 1024
26
31
27
32
28
33
class CollectorRegistry (object ):
@@ -220,7 +225,9 @@ def add_metric(self, labels, buckets, sum_value):
220
225
class _MutexValue (object ):
221
226
'''A float protected by a mutex.'''
222
227
223
- def __init__ (self , name , labelnames , labelvalues ):
228
+ _multiprocess = False
229
+
230
+ def __init__ (self , typ , metric_name , name , labelnames , labelvalues , ** kwargs ):
224
231
self ._value = 0.0
225
232
self ._lock = Lock ()
226
233
@@ -236,7 +243,139 @@ def get(self):
236
243
with self ._lock :
237
244
return self ._value
238
245
239
- _ValueClass = _MutexValue
246
+ class _MmapedDict (object ):
247
+ """A dict of doubles, backed by an mmapped file.
248
+
249
+ The file starts with a 4 byte int, indicating how much of it is used.
250
+ Then 4 bytes of padding.
251
+ There's then a number of entries, consisting of a 4 byte int which is the
252
+ side of the next field, a utf-8 encoded string key, padding to a 8 byte
253
+ alignment, and then a 8 byte float which is the value.
254
+ """
255
+ def __init__ (self , filename ):
256
+ self ._lock = Lock ()
257
+ self ._f = open (filename , 'a+b' )
258
+ if os .fstat (self ._f .fileno ()).st_size == 0 :
259
+ self ._f .truncate (_INITIAL_MMAP_SIZE )
260
+ self ._capacity = os .fstat (self ._f .fileno ()).st_size
261
+ self ._m = mmap .mmap (self ._f .fileno (), self ._capacity )
262
+
263
+ self ._positions = {}
264
+ self ._used = struct .unpack_from (b'i' , self ._m , 0 )[0 ]
265
+ if self ._used == 0 :
266
+ self ._used = 8
267
+ struct .pack_into (b'i' , self ._m , 0 , self ._used )
268
+ else :
269
+ for key , _ , pos in self ._read_all_values ():
270
+ self ._positions [key ] = pos
271
+
272
+ def _init_value (self , key ):
273
+ """Initilize a value. Lock must be held by caller."""
274
+ encoded = key .encode ('utf-8' )
275
+ # Pad to be 8-byte aligned.
276
+ padded = encoded + (b' ' * (8 - (len (encoded ) + 4 ) % 8 ))
277
+ value = struct .pack ('i{0}sd' .format (len (padded )).encode (), len (encoded ), padded , 0.0 )
278
+ while self ._used + len (value ) > self ._capacity :
279
+ self ._capacity *= 2
280
+ self ._f .truncate (self ._capacity * 2 )
281
+ self ._m = mmap .mmap (self ._f .fileno (), self ._capacity )
282
+ self ._m [self ._used :self ._used + len (value )] = value
283
+
284
+ # Update how much space we've used.
285
+ self ._used += len (value )
286
+ struct .pack_into (b'i' , self ._m , 0 , self ._used )
287
+ self ._positions [key ] = self ._used - 8
288
+
289
+ def _read_all_values (self ):
290
+ """Yield (key, value, pos). No locking is performed."""
291
+ pos = 8
292
+ while pos < self ._used :
293
+ encoded_len = struct .unpack_from (b'i' , self ._m , pos )[0 ]
294
+ pos += 4
295
+ encoded = struct .unpack_from ('{0}s' .format (encoded_len ).encode (), self ._m , pos )[0 ]
296
+ padded_len = encoded_len + (8 - (encoded_len + 4 ) % 8 )
297
+ pos += padded_len
298
+ value = struct .unpack_from (b'd' , self ._m , pos )[0 ]
299
+ yield encoded .decode ('utf-8' ), value , pos
300
+ pos += 8
301
+
302
+ def read_all_values (self ):
303
+ """Yield (key, value, pos). No locking is performed."""
304
+ for k , v , _ in self ._read_all_values ():
305
+ yield k , v
306
+
307
+ def read_value (self , key ):
308
+ with self ._lock :
309
+ if key not in self ._positions :
310
+ self ._init_value (key )
311
+ pos = self ._positions [key ]
312
+ # We assume that reading from an 8 byte aligned value is atomic
313
+ return struct .unpack_from (b'd' , self ._m , pos )[0 ]
314
+
315
+ def write_value (self , key , value ):
316
+ with self ._lock :
317
+ if key not in self ._positions :
318
+ self ._init_value (key )
319
+ pos = self ._positions [key ]
320
+ # We assume that writing to an 8 byte aligned value is atomic
321
+ struct .pack_into (b'd' , self ._m , pos , value )
322
+
323
+ def close (self ):
324
+ if self ._f :
325
+ self ._f .close ()
326
+ self ._f = None
327
+
328
+
329
+ def _MultiProcessValue (__pid = os .getpid ()):
330
+ pid = __pid
331
+ files = {}
332
+ files_lock = Lock ()
333
+
334
+ class _MmapedValue (object ):
335
+ '''A float protected by a mutex backed by a per-process mmaped file.'''
336
+
337
+ _multiprocess = True
338
+
339
+ def __init__ (self , typ , metric_name , name , labelnames , labelvalues , multiprocess_mode = '' , ** kwargs ):
340
+ if typ == 'gauge' :
341
+ file_prefix = typ + '_' + multiprocess_mode
342
+ else :
343
+ file_prefix = typ
344
+ with files_lock :
345
+ if file_prefix not in files :
346
+ filename = os .path .join (
347
+ os .environ ['prometheus_multiproc_dir' ], '{0}_{1}.db' .format (file_prefix , pid ))
348
+ files [file_prefix ] = _MmapedDict (filename )
349
+ self ._file = files [file_prefix ]
350
+ self ._key = json .dumps ((metric_name , name , labelnames , labelvalues ))
351
+ self ._value = self ._file .read_value (self ._key )
352
+ self ._lock = Lock ()
353
+
354
+ def inc (self , amount ):
355
+ with self ._lock :
356
+ self ._value += amount
357
+ self ._file .write_value (self ._key , self ._value )
358
+
359
+ def set (self , value ):
360
+ with self ._lock :
361
+ self ._value = value
D96B
td>362
+ self ._file .write_value (self ._key , self ._value )
363
+
364
+ def get (self ):
365
+ with self ._lock :
366
+ return self ._value
367
+
368
+ return _MmapedValue
369
+
370
+
371
+ # Should we enable multi-process mode?
372
+ # This needs to be chosen before the first metric is constructed,
373
+ # and as that may be in some arbitrary library the user/admin has
374
+ # no control over we use an enviroment variable.
375
+ if 'prometheus_multiproc_dir' in os .environ :
376
+ _ValueClass = _MultiProcessValue ()
377
+ else :
378
+ _ValueClass = _MutexValue
240
379
241
380
242
381
class _LabelWrapper (object ):
@@ -387,7 +526,7 @@ def f():
387
526
_reserved_labelnames = []
388
527
389
528
def __init__ (self , name , labelnames , labelvalues ):
390
- self ._value = _ValueClass (name , labelnames , labelvalues )
529
+ self ._value = _ValueClass (self . _type , name , name , labelnames , labelvalues )
391
530
392
531
def inc (self , amount = 1 ):
393
532
'''Increment counter by the given amount.'''
@@ -449,8 +588,12 @@ def f():
449
588
_type = 'gauge'
450
589
_reserved_labelnames = []
451
590
452
- def __init__ (self , name , labelnames , labelvalues ):
453
- self ._value = _ValueClass (name , labelnames , labelvalues )
591
+ def __init__ (self , name , labelnames , labelvalues , multiprocess_mode = 'all' ):
592
+ if (_ValueClass ._multiprocess
593
+ and multiprocess_mode not in ['min' , 'max' , 'livesum' , 'liveall' , 'all' ]):
594
+ raise ValueError ('Invalid multiprocess mode: ' + multiprocess_mode )
595
+ self ._value = _ValueClass (self ._type , name , name , labelnames ,
596
+ labelvalues , multiprocess_mode = multiprocess_mode )
454
597
455
598
def inc (self , amount = 1 ):
456
599
'''Increment gauge by the given amount.'''
@@ -533,8 +676,8 @@ def create_response(request):
533
676
_reserved_labelnames = ['quantile' ]
534
677
535
678
def __init__ (self , name , labelnames , labelvalues ):
536
- self ._count = _ValueClass (name + '_count' , labelnames , labelvalues )
537
- self ._sum = _ValueClass (name + '_sum' , labelnames , labelvalues )
679
+ self ._count = _ValueClass (self . _type , name , name + '_count' , labelnames , labelvalues )
680
+ self ._sum = _ValueClass (self . _type , name , name + '_sum' , labelnames , labelvalues )
538
681
539
682
def observe (self , amount ):
540
683
'''Observe the given amount.'''
@@ -607,7 +750,7 @@ def create_response(request):
607
750
_reserved_labelnames = ['histogram' ]
608
751
609
752
def __init__ (self , name , labelnames , labelvalues , buckets = (.005 , .01 , .025 , .05 , .075 , .1 , .25 , .5 , .75 , 1.0 , 2.5 , 5.0 , 7.5 , 10.0 , _INF )):
610
- self ._sum = _ValueClass (name + '_sum' , labelnames , labelvalues )
753
+ self ._sum = _ValueClass (self . _type , name , name + '_sum' , labelnames , labelvalues )
611
754
buckets = [float (b ) for b in buckets ]
612
755
if buckets != sorted (buckets ):
613
756
# This is probably an error on the part of the user,
@@ -621,7 +764,7 @@ def __init__(self, name, labelnames, labelvalues, buckets=(.005, .01, .025, .05,
621
764
self ._buckets = []
622
765
bucket_labelnames = labelnames + ('le' ,)
623
766
for b in buckets :
624
- self ._buckets .append (_ValueClass (name + '_bucket' , bucket_labelnames , labelvalues + (_floatToGoString (b ),)))
767
+ self ._buckets .append (_ValueClass (self . _type , name , name + '_bucket' , bucket_labelnames , labelvalues + (_floatToGoString (b ),)))
625
768
626
769
def observe (self , amount ):
627
770
'''Observe the given amount.'''
0 commit comments