8000 Thread safety done right (#290) · mpitt/client_python@889a6fb · GitHub
[go: up one dir, main page]

Skip to content

Commit 889a6fb

Browse files
haizaarbrian-brazil
authored andcommitted
Thread safety done right (prometheus#290)
* Multithreading support for time() decorators Fixes prometheus#287 Signed-off-by: Zaar Hai <haizaar@haizaar.com> * Installing futures package for Python 2.x Required for multithreading tests. Signed-off-by: Zaar Hai <haizaar@haizaar.com> * Py2.x unittest compatability Signed-off-by: Zaar Hai <haizaar@haizaar.com> * Fixing text environment for Py27 multithread Signed-off-by: Zaar Hai <haizaar@haizaar.com> * pypy needs futures for testing as well It's a 2.7 version of Python lang. Signed-off-by: Zaar Hai <haizaar@haizaar.com> * Ensuring that different instances of timer do not interfere Signed-off-by: Zaar Hai <haizaar@haizaar.com> * Python2.6 compliance Signed-off-by: Zaar Hai <haizaar@haizaar.com> Python2.6 compliance take 2 Signed-off-by: Zaar Hai <haizaar@haizaar.com> * Using new object instead of thread local storage It makes time() decorated functions reentrant. It's a bit slower than TLS version, but for only 0.12us (micro) per iteration - it worth the reentrancy and it's also IMHO a more clear and pythonic. Signed-off-by: Zaar Hai <haizaar@haizaar.com>
1 parent f2436ca commit 889a6fb

File tree

3 files changed

+125
-46
lines changed

3 files changed

+125
-46
lines changed

prometheus_client/core.py

Lines changed: 13 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -765,7 +765,7 @@ def time(self):
765765
766766
Can be used as a function decorator or context manager.
767767
'''
768-
return _GaugeTimer(self)
768+
return _Timer(self.set)
769769

770770
def set_function(self, f):
771771
'''Call the provided function to return the Gauge value.
@@ -829,7 +829,7 @@ def time(self):
829829
830830
Can be used as a function decorator or context manager.
831831
'''
832-
return _SummaryTimer(self)
832+
return _Timer(self.observe)
833833

834834
def _samples(self):
835835
return (
@@ -919,7 +919,7 @@ def time(self):
919919
920920
Can be used as a function decorator or context manager.
921921
'''
922-
return _HistogramTimer(self)
922+
return _Timer(self.observe)
923923

924924
def _samples(self):
925925
samples = []
@@ -932,24 +932,6 @@ def _samples(self):
932932
8000 return tuple(samples)
933933

934934

935-
class _HistogramTimer(object):
936-
def __init__(self, histogram):
937-
self._histogram = histogram
938-
939-
def __enter__(self):
940-
self._start = default_timer()
941-
942-
def __exit__(self, typ, value, traceback):
943-
# Time can go backwards.
944-
self._histogram.observe(max(default_timer() - self._start, 0))
945-
946-
def __call__(self, f):
947-
def wrapped(func, *args, **kwargs):
948-
with self:
949-
return func(*args, **kwargs)
950-
return decorate(f, wrapped)
951-
952-
953935
class _ExceptionCounter(object):
954936
def __init__(self, counter, exception):
955937
self._counter = counter
@@ -986,37 +968,25 @@ def wrapped(func, *args, **kwargs):
986968
return decorate(f, wrapped)
987969

988970

989-
class _SummaryTimer(object):
990-
def __init__(self, summary):
991-
self._summary = summary
992-
993-
def __enter__(self):
994-
self._start = default_timer()
971+
class _Timer(object):
972+
def __init__(self, callback):
973+
self._callback = callback
995974

996-
def __exit__(self, typ, value, traceback):
997-
# Time can go backwards.
998-
self._summary.observe(max(default_timer() - self._start, 0))
999-
1000-
def __call__(self, f):
1001-
def wrapped(func, *args, **kwargs):
1002-
with self:
1003-
return func(*args, **kwargs)
1004-
return decorate(f, wrapped)
1005-
1006-
1007-
class _GaugeTimer(object):
1008-
def __init__(self, gauge):
1009-
self._gauge = gauge
975+
def _new_timer(self):
976+
return self.__class__(self._callback)
1010977

1011978
def __enter__(self):
1012979
self._start = default_timer()
1013980

1014981
def __exit__(self, typ, value, traceback):
1015982
# Time can go backwards.
1016-
self._gauge.set(max(default_timer() - self._start, 0))
983+
duration = max(default_timer() - self._start, 0)
984+
self._callback(duration)
1017985

1018986
def __call__(self, f):
1019987
def wrapped(func, *args, **kwargs):
1020-
with self:
988+
# Obtaining new instance of timer every time
989+
# ensures thread safety and reentrancy.
990+
with self._new_timer():
1021991
return func(*args, **kwargs)
1022992
return decorate(f, wrapped)

tests/test_core.py

Lines changed: 97 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,13 @@
22

33
import inspect
44
import time
5-
import unittest
5+
from concurrent.futures import ThreadPoolExecutor
6+
7+
try:
8+
import unittest2 as unittest
9+
except ImportError:
10+
import unittest
11+
612

713
from prometheus_client.core import (
814
CollectorRegistry,
@@ -124,6 +130,26 @@ def f():
124130
f()
125131
self.assertNotEqual(0, self.registry.get_sample_value('g'))
126132

133+
def test_function_decorator_multithread(self):
134+
self.assertEqual(0, self.registry.get_sample_value('g'))
135+
workers = 2
136+
pool = ThreadPoolExecutor(max_workers=workers)
137+
138+
@self.gauge.time()
139+
def f(duration):
140+
time.sleep(duration)
141+
142+
expected_duration = 1
143+
pool.submit(f, expected_duration)
144+
time.sleep(0.7 * expected_duration)
145+
pool.submit(f, expected_duration * 2)
146+
time.sleep(expected_duration)
147+
148+
rounding_coefficient = 0.9
149+
adjusted_expected_duration = expected_duration * rounding_coefficient
150+
self.assertLess(adjusted_expected_duration, self.registry.get_sample_value('g'))
151+
pool.shutdown(wait=True)
152+
127153
def test_time_block_decorator(self):
128154
self.assertEqual(0, self.registry.get_sample_value('g'))
129155
with self.gauge.time():
@@ -155,6 +181,55 @@ def f():
155181
f()
156182
self.assertEqual(1, self.registry.get_sample_value('s_count'))
157183

184+
def test_function_decorator_multithread(self):
185+
self.assertEqual(0, self.registry.get_sample_value('s_count'))
186+
summary2 = Summary('s2', 'help', registry=self.registry)
187+
188+
workers = 3
189+
duration = 0.1
190+
pool = ThreadPoolExecutor(max_workers=workers)
191+
192+
@self.summary.time()
193+
def f():
194+
time.sleep(duration / 2)
195+
# Testing that different instances of timer do not interfere
196+
summary2.time()(lambda : time.sleep(duration / 2))()
197+
198+
jobs = workers * 3
199+
for i in range(jobs):
200+
pool.submit(f)
201+
pool.shutdown(wait=True)
202+
203+
self.assertEqual(jobs, self.registry.get_sample_value('s_count'))
204+
205+
rounding_coefficient = 0.9
206+
total_expected_duration = jobs * duration * rounding_coefficient
207+
self.assertLess(total_expected_duration, self.registry.get_sample_value('s_sum'))
208+
self.assertLess(total_expected_duration / 2 , self.registry.get_sample_value('s2_sum'))
209+
210+
def test_function_decorator_reentrancy(self):
211+
self.assertEqual(0, self.registry.get_sample_value('s_count'))
212+
213+
iterations = 2
214+
sleep = 0.1
215+
216+
@self.summary.time()
217+
def f(i=1):
218+
time.sleep(sleep)
219+
if i == iterations:
220+
return
221+
f(i+1)
222+
223+
f()
224+
225+
self.assertEqual(iterations, self.registry.get_sample_value('s_count'))
226+
227+
# Arithmetic series with d == a_1
228+
total_expected_duration = sleep * (iterations**2 + iterations) / 2
229+
rounding_coefficient = 0.9
230+
total_expected_duration *= rounding_coefficient
231+
self.assertLess(total_expected_duration, self.registry.get_sample_value('s_sum'))
232+
158233
def test_block_decorator(self):
159234
self.assertEqual(0, self.registry.get_sample_value('s_count'))
160235
with self.summary.time():
@@ -234,6 +309,27 @@ def f():
234309
self.assertEqual(1, self.registry.get_sample_value('h_count'))
235310
self.assertEqual(1, self.registry.get_sample_value('h_bucket', {'le': '+Inf'}))
236311

312+
def test_function_decorator_multithread(self):
313+
self.assertEqual(0, self.registry.get_sample_value('h_count'))
314+
workers = 3
315+
duration = 0.1
316+
pool = ThreadPoolExecutor(max_workers=workers)
317+
318+
@self.histogram.time()
319+
def f():
320+
time.sleep(duration)
321+
322+
jobs = workers * 3
323+
for i in range(jobs):
324+
pool.submit(f)
325+
pool.shutdown(wait=True)
326+
327+
self.assertEqual(jobs, self.registry.get_sample_value('h_count'))
328+
329+
rounding_coefficient = 0.9
330+
total_expected_duration = jobs * duration * rounding_coefficient
331+
self.assertLess(total_expected_duration, self.registry.get_sample_value('h_sum'))
332+
237333
def test_block_decorator(self):
238334
self.assertEqual(0, self.registry.get_sample_value('h_count'))
239335
self.assertEqual(0, self.registry.get_sample_value('h_bucket', {'le': '+Inf'}))

tox.ini

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,22 @@ deps =
99

1010
[testenv:py26]
1111
; Last pytest and py version supported on py26 .
12-
deps =
12+
deps =
1313
unittest2
1414
py==1.4.31
1515
pytest==2.9.2
1616
coverage
17+
futures
18+
19+
[testenv:py27]
20+
deps =
21+
{[base]deps}
22+
futures
23+
24+
[testenv:pypy]
25+
deps =
26+
{[base]deps}
27+
futures
1728

1829
[testenv]
1930
deps =
@@ -24,7 +35,9 @@ commands = coverage run --parallel -m pytest {posargs}
2435

2536
; Ensure test suite passes if no optional dependencies are present.
2637
[testenv:py27-nooptionals]
27-
deps = {[base]deps}
38+
deps =
39+
{[base]deps}
40+
futures
2841
commands = coverage run --parallel -m pytest {posargs}
2942

3043
[testenv:py36-nooptionals]

0 commit comments

Comments
 (0)
0