12
12
from .samples import Sample
13
13
from .utils import floatToGoString
14
14
15
+ MP_METRIC_HELP = 'Multiprocess metric'
16
+
15
17
16
18
class MultiProcessCollector (object ):
17
19
"""Collector for files for multi-process mode."""
@@ -33,18 +35,31 @@ def merge(files, accumulate=True):
33
35
But if writing the merged data back to mmap files, use
34
36
accumulate=False to avoid compound accumulation.
35
37
"""
38
+ metrics = MultiProcessCollector ._read_metrics (files )
39
+ return MultiProcessCollector ._accumulate_metrics (metrics , accumulate )
40
+
41
+ @staticmethod
42
+ def _read_metrics (files ):
36
43
metrics = {}
44
+ key_cache = {}
45
+
46
def _parse_key(key):
    """Decode a raw sample key, memoising the result in ``key_cache``.

    ``key`` is the JSON-encoded string read alongside each value; it
    decodes to the triple ``[metric_name, name, labels]`` where
    ``labels`` is a mapping.

    Returns the tuple ``(metric_name, name, labels, labels_key)``, where
    ``labels_key`` is the label items sorted into a hashable tuple.
    """
    val = key_cache.get(key)
    # Test for a miss explicitly: ``None`` means "not cached yet",
    # which is different from "cached but falsy".
    if val is None:
        metric_name, name, labels = json.loads(key)
        labels_key = tuple(sorted(labels.items()))
        val = key_cache[key] = (metric_name, name, labels, labels_key)
    return val
53
+
37
54
for f in files :
38
55
parts = os .path .basename (f ).split ('_' )
39
56
typ = parts [0 ]
40
- d = MmapedDict (f , read_mode = True )
41
- for key , value in d .read_all_values ():
42
- metric_name , name , labels = json .loads (key )
43
- labels_key = tuple (sorted (labels .items ()))
57
+ for key , value , pos in MmapedDict .read_all_values_from_file (f ):
58
+ metric_name , name , labels , labels_key = _parse_key (key )
44
59
45
60
metric = metrics .get (metric_name )
46
61
if metric is None :
47
- metric = Metric (metric_name , 'Multiprocess metric' , typ )
62
+ metric = Metric (metric_name , MP_METRIC_HELP , typ )
48
63
metrics [metric_name ] = metric
49
64
50
65
if typ == 'gauge' :
@@ -54,43 +69,47 @@ def merge(files, accumulate=True):
54
69
else :
55
70
# The duplicates and labels are fixed in the next for.
56
71
metric .add_sample (name , labels_key , value )
57
- d . close ()
72
+ return metrics
58
73
74
+ @staticmethod
75
+ def _accumulate_metrics (metrics , accumulate ):
59
76
for metric in metrics .values ():
60
77
samples = defaultdict (float )
61
- buckets = {}
78
+ buckets = defaultdict (lambda : defaultdict (float ))
79
+ samples_setdefault = samples .setdefault
62
80
for s in metric .samples :
63
- name , labels , value = s . name , s . labels , s . value
81
+ name , labels , value , timestamp , exemplar = s
64
82
if metric .type == 'gauge' :
65
- without_pid = tuple (l for l in labels if l [0 ] != 'pid' )
83
+ without_pid_key = ( name , tuple ([ l for l in labels if l [0 ] != 'pid' ]) )
66
84
if metric ._multiprocess_mode == 'min' :
67
- current = samples . setdefault (( name , without_pid ) , value )
85
+ current = samples_setdefault ( without_pid_key , value )
68
86
if value < current :
69
- samples [( s . name , without_pid ) ] = value
87
+ samples [without_pid_key ] = value
70
88
elif metric ._multiprocess_mode == 'max' :
71
- current = samples . setdefault (( name , without_pid ) , value )
89
+ current = samples_setdefault ( without_pid_key , value )
72
90
if value > current :
73
- samples [( s . name , without_pid ) ] = value
91
+ samples [without_pid_key ] = value
74
92
elif metric ._multiprocess_mode == 'livesum' :
75
- samples [( name , without_pid ) ] += value
93
+ samples [without_pid_key ] += value
76
94
else : # all/liveall
77
95
samples [(name , labels )] = value
78
96
79
97
elif metric .type == 'histogram' :
80
- bucket = tuple (float (l [1 ]) for l in labels if l [0 ] == 'le' )
81
- if bucket :
82
- # _bucket
83
- without_le = tuple (l for l in labels if l [0 ] != 'le' )
84
- buckets .setdefault (without_le , {})
85
- buckets [without_le ].setdefault (bucket [0 ], 0.0 )
86
- buckets [without_le ][bucket [0 ]] += value
87
- else :
98
+ # A for loop with early exit is faster than a genexpr
99
+ # or a listcomp that ends up building unnecessary things
100
+ for l in labels :
101
+ if l [0 ] == 'le' :
102
+ bucket_value = float (l [1 ])
103
+ # _bucket
104
+ without_le = tuple (l for l in labels if l [0 ] != 'le' )
105
+ buckets [without_le ][bucket_value ] += value
106
+ break
107
+ else : # did not find the `le` key
88
108
# _sum/_count
89
- samples [(s .name , labels )] += value
90
-
109
+ samples [(name , labels )] += value
91
110
else :
92
111
# Counter and Summary.
93
- samples [(s . name , labels )] += value
112
+ samples [(name , labels )] += value
94
113
95
114
# Accumulate bucket values.
96
115
if metric .type == 'histogram' :
0 commit comments