4
4
5
5
import copy
6
6
import math
7
+ import json
8
+ import os
7
9
import re
10
+ import shelve
8
11
import types
9
12
from timeit import default_timer
10
13
@@ -220,7 +223,9 @@ def add_metric(self, labels, buckets, sum_value):
220
223
class _MutexValue (object ):
221
224
'''A float protected by a mutex.'''
222
225
223
- def __init__ (self , name , labelnames , labelvalues ):
226
+ _multiprocess = False
227
+
228
+ def __init__ (self , typ , metric_name , name , labelnames , labelvalues , ** kwargs ):
224
229
self ._value = 0.0
225
230
self ._lock = Lock ()
226
231
@@ -236,7 +241,60 @@ def get(self):
236
241
with self ._lock :
237
242
return self ._value
238
243
239
- _ValueClass = _MutexValue
244
+
245
+ def _MultiProcessValue (__pid = os .getpid ()):
246
+ pid = __pid
247
+ samples = {}
248
+ samples_lock = Lock ()
249
+
250
+ class _ShelveValue (object ):
251
+ '''A float protected by a mutex backed by a per-process shelve.'''
252
+
253
+ _multiprocess = True
254
+
255
+ def __init__ (self , typ , metric_name , name , labelnames , labelvalues , multiprocess_mode = '' , ** kwargs ):
256
+ with samples_lock :
257
+ if typ == 'gauge' :
258
+ file_prefix = typ + '_' + multiprocess_mode
259
+ else :
260
+ file_prefix = typ
261
+ if file_prefix not in samples :
262
+ filename = os .path .join (os .environ ['prometheus_multiproc_dir' ], '{0}_{1}.db' .format (file_prefix , pid ))
263
+ samples [file_prefix ] = shelve .open (filename )
264
+ self ._samples = samples [file_prefix ]
265
+ self ._key = json .dumps ((metric_name , name , labelnames , labelvalues ))
266
+ self ._value = self ._samples .get (self ._key , 0.0 )
267
+ self ._samples [self ._key ] = self ._value
268
+ self ._samples .sync ()
269
+ self ._lock = Lock ()
270
+
271
+ def inc (self , amount ):
272
+ with self ._lock :
273
+ self ._value += amount
274
+ self ._samples [self ._key ] = self ._value
275
+ self ._samples .sync ()
276
+
277
+ def set (self , value ):
278
+ with self ._lock :
279
+ self ._value = value
280
+ self ._samples [self ._key ] = self ._value
281
+ self ._samples .sync ()
282
+
283
+ def get (self ):
284
+ with self ._lock :
285
+ return self ._value
286
+
287
+ return _ShelveValue
288
+
289
+
290
+ # Should we enable multi-process mode?
291
+ # This needs to be chosen before the first metric is constructed,
292
+ # and as that may be in some arbitrary library the user/admin has
293
+ # no control over we use an enviroment variable.
294
+ if 'prometheus_multiproc_dir' in os .environ :
295
+ _ValueClass = _MultiProcessValue ()
296
+ else :
297
+ _ValueClass = _MutexValue
240
298
241
299
242
300
class _LabelWrapper (object ):
@@ -387,7 +445,7 @@ def f():
387
445
_reserved_labelnames = []
388
446
389
447
def __init__ (self , name , labelnames , labelvalues ):
390
- self ._value = _ValueClass (name , labelnames , labelvalues )
448
+ self ._value = _ValueClass (self . _type , name , name , labelnames , labelvalues )
391
449
392
450
def inc (self , amount = 1 ):
393
451
'''Increment counter by the given amount.'''
@@ -449,8 +507,12 @@ def f():
449
507
_type = 'gauge'
450
508
_reserved_labelnames = []
451
509
452
- def __init__ (self , name , labelnames , labelvalues ):
453
- self ._value = _ValueClass (name , labelnames , labelvalues )
510
+ def __init__ (self , name , labelnames , labelvalues , multiprocess_mode = 'all' ):
511
+ if (_ValueClass ._multiprocess
512
+ and multiprocess_mode not in ['min' , 'max' , 'livesum' , 'liveall' , 'all' ]):
513
+ raise ValueError ('Invalid multiprocess mode: ' + multiprocess_mode )
514
+ self ._value = _ValueClass (self ._type , name , name , labelnames ,
515
+ labelvalues , multiprocess_mode = multiprocess_mode )
454
516
455
517
def inc (self , amount = 1 ):
456
518
'''Increment gauge by the given amount.'''
@@ -533,8 +595,8 @@ def create_response(request):
533
595
_reserved_labelnames = ['quantile' ]
534
596
535
597
def __init__ (self , name , labelnames , labelvalues ):
536
- self ._count = _ValueClass (name + '_count' , labelnames , labelvalues )
537
- self ._sum = _ValueClass (name + '_sum' , labelnames , labelvalues )
598
+ self ._count = _ValueClass (self . _type , name , name + '_count' , labelnames , labelvalues )
599
+ self ._sum = _ValueClass (self . _type , name , name + '_sum' , labelnames , labelvalues )
538
600
539
601
def observe (self , amount ):
540
602
'''Observe the given amount.'''
@@ -607,7 +669,7 @@ def create_response(request):
607
669
_reserved_labelnames = ['histogram' ]
608
670
609
671
def __init__ (self , name , labelnames , labelvalues , buckets = (.005 , .01 , .025 , .05 , .075 , .1 , .25 , .5 , .75 , 1.0 , 2.5 , 5.0 , 7.5 , 10.0 , _INF )):
610
- self ._sum = _ValueClass (name + '_sum' , labelnames , labelvalues )
672
+ self ._sum = _ValueClass (self . _type , name , name + '_sum' , labelnames , labelvalues )
611
673
buckets = [float (b ) for b in buckets ]
612
674
if buckets != sorted (buckets ):
613
675
# This is probably an error on the part of the user,
@@ -621,7 +683,7 @@ def __init__(self, name, labelnames, labelvalues, buckets=(.005, .01, .025, .05,
621
683
self ._buckets = []
622
684
bucket_labelnames = labelnames + ('le' ,)
623
685
for b in buckets :
624
- self ._buckets .append (_ValueClass (name + '_bucket' , bucket_labelnames , labelvalues + (_floatToGoString (b ),)))
686
+ self ._buckets .append (_ValueClass (self . _type , name , name + '_bucket' , bucket_labelnames , labelvalues + (_floatToGoString (b ),)))
625
687
626
688
def observe (self , amount ):
627
689
'''Observe the given amount.'''
0 commit comments