4
4
5
5
import copy
6
6
import math
7
+ import json
8
+ import os
7
9
import re
10
+ import shelve
8
11
import time
9
12
import types
10
13
@@ -219,7 +222,9 @@ def add_metric(self, labels, buckets, sum_value):
219
222
class _MutexValue (object ):
220
223
'''A float protected by a mutex.'''
221
224
222
- def __init__ (self , name , labelnames , labelvalues ):
225
+ _multiprocess = False
226
+
227
+ def __init__ (self , typ , metric_name , name , labelnames , labelvalues , ** kwargs ):
223
228
self ._value = 0.0
224
229
self ._lock = Lock ()
225
230
@@ -235,7 +240,60 @@ def get(self):
235
240
with self ._lock :
236
241
return self ._value
237
242
238
- _ValueClass = _MutexValue
243
+
244
+ def _MultiProcessValue (__pid = os .getpid ()):
245
+ pid = __pid
246
+ samples = {}
247
+ samples_lock = Lock ()
248
+
249
+ class _ShelveValue (object ):
250
+ '''A float protected by a mutex backed by a per-process shelve.'''
251
+
252
+ _multiprocess = True
253
+
254
+ def __init__ (self , typ , metric_name , name , labelnames , labelvalues , multiprocess_mode = '' , ** kwargs ):
255
+ with samples_lock :
256
+ if typ == 'gauge' :
257
+ file_prefix = typ + '_' + multiprocess_mode
258
+ else :
259
+ file_prefix = typ
260
+ if file_prefix not in samples :
261
+ filename = os .path .join (os .environ ['prometheus_multiproc_dir' ], '{0}_{1}.db' .format (file_prefix , pid ))
262
+ samples [file_prefix ] = shelve .open (filename )
263
+ self ._samples = samples [file_prefix ]
264
+ self ._key = json .dumps ((metric_name , name , labelnames , labelvalues ))
265
+ self ._value = self ._samples .get (self ._key , 0.0 )
266
+ self ._samples [self ._key ] = self ._value
267
+ self ._samples .sync ()
268
+ self ._lock = Lock ()
269
+
270
+ def inc (self , amount ):
271
+ with self ._lock :
272
+ self ._value += amount
273
+ self ._samples [self ._key ] = self ._value
274
+ self ._samples .sync ()
275
+
276
+ def set (self , value ):
277
+ with self ._lock :
278
+ self ._value = value
279
+ self ._samples [self ._key ] = self ._value
280
+ self ._samples .sync ()
281
+
282
+ def get (self ):
283
+ with self ._lock :
284
+ return self ._value
285
+
286
+ return _ShelveValue
287
+
288
+
289
+ # Should we enable multi-process mode?
290
+ # This needs to be chosen before the first metric is constructed,
291
+ # and as that may be in some arbitrary library the user/admin has
292
+ # no control over we use an enviroment variable.
293
+ if 'prometheus_multiproc_dir' in os .environ :
294
+ _ValueClass = _MultiProcessValue ()
295
+ else :
296
+ _ValueClass = _MutexValue
239
297
240
298
241
299
class _LabelWrapper (object ):
@@ -383,7 +441,7 @@ def f():
383
441
_reserved_labelnames = []
384
442
385
443
def __init__ (self , name , labelnames , labelvalues ):
386
- self ._value = _ValueClass (name , labelnames , labelvalues )
444
+ self ._value = _ValueClass (self . _type , name , name , labelnames , labelvalues )
387
445
388
446
def inc (self , amount = 1 ):
389
447
'''Increment counter by the given amount.'''
@@ -464,8 +522,12 @@ def f():
464
522
_type = 'gauge'
465
523
_reserved_labelnames = []
466
524
467
- def __init__ (self , name , labelnames , labelvalues ):
468
- self ._value = _ValueClass (name , labelnames , labelvalues )
525
+ def __init__ (self , name , labelnames , labelvalues , multiprocess_mode = 'all' ):
526
+ if (_ValueClass ._multiprocess
527
+ and multiprocess_mode not in ['min' , 'max' , 'livesum' , 'liveall' , 'all' ]):
528
+ raise ValueError ('Invalid multiprocess mode: ' + multiprocess_mode )
529
+ self ._value = _ValueClass (self ._type , name , name , labelnames ,
530
+ labelvalues , multiprocess_mode = multiprocess_mode )
469
531
470
532
def inc (self , amount = 1 ):
471
533
'''Increment gauge by the given amount.'''
@@ -585,8 +647,8 @@ def create_response(request):
585
647
_reserved_labelnames = ['quantile' ]
586
648
587
649
def __init__ (self , name , labelnames , labelvalues ):
588
- self ._count = _ValueClass (name + '_count' , labelnames , labelvalues )
589
- self ._sum = _ValueClass (name + '_sum' , labelnames , labelvalues )
650
+ self ._count = _ValueClass (self . _type , name , name + '_count' , labelnames , labelvalues )
651
+ self ._sum = _ValueClass (self . _type , name , name + '_sum' , labelnames , labelvalues )
590
652
591
653
def observe (self , amount ):
592
654
'''Observe the given amount.'''
@@ -678,7 +740,7 @@ def create_response(request):
678
740
_reserved_labelnames = ['histogram' ]
679
741
680
742
def __init__ (self , name , labelnames , labelvalues , buckets = (.005 , .01 , .025 , .05 , .075 , .1 , .25 , .5 , .75 , 1.0 , 2.5 , 5.0 , 7.5 , 10.0 , _INF )):
681
- self ._sum = _ValueClass (name + '_sum' , labelnames , labelvalues )
743
+ self ._sum = _ValueClass (self . _type , name , name + '_sum' , labelnames , labelvalues )
682
744
buckets = [float (b ) for b in buckets ]
683
745
if buckets != sorted (buckets ):
684
746
# This is probably an error on the part of the user,
@@ -692,7 +754,7 @@ def __init__(self, name, labelnames, labelvalues, buckets=(.005, .01, .025, .05,
692
754
self ._buckets = []
693
755
bucket_labelnames = labelnames + ('le' ,)
694
756
for b in buckets :
695
- self ._buckets .append (_ValueClass (name + '_bucket' , bucket_labelnames , labelvalues + (_floatToGoString (b ),)))
757
+ self ._buckets .append (_ValueClass (self . _type , name , name + '_bucket' , bucket_labelnames , labelvalues + (_floatToGoString (b ),)))
696
758
697
759
def observe (self , amount ):
698
760
'''Observe the given amount.'''
0 commit comments