From 87a984a4e3606d712be17358a2d58276ce460f0f Mon Sep 17 00:00:00 2001 From: Zaar Hai Date: Wed, 11 Jul 2018 16:07:35 +1000 Subject: [PATCH 1/8] Multithreading support for time() decorators Fixes #287 Signed-off-by: Zaar Hai --- prometheus_client/core.py | 56 +++++++--------------------------- tests/test_core.py | 64 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 75 insertions(+), 45 deletions(-) diff --git a/prometheus_client/core.py b/prometheus_client/core.py index 0f1d0144..7adaf9c2 100644 --- a/prometheus_client/core.py +++ b/prometheus_client/core.py @@ -13,7 +13,7 @@ import time import types -from threading import Lock +from threading import local, Lock from timeit import default_timer from .decorator import decorate @@ -765,7 +765,7 @@ def time(self): Can be used as a function decorator or context manager. ''' - return _GaugeTimer(self) + return _Timer(self.set) def set_function(self, f): '''Call the provided function to return the Gauge value. @@ -829,7 +829,7 @@ def time(self): Can be used as a function decorator or context manager. ''' - return _SummaryTimer(self) + return _Timer(self.observe) def _samples(self): return ( @@ -919,7 +919,7 @@ def time(self): Can be used as a function decorator or context manager. ''' - return _HistogramTimer(self) + return _Timer(self.observe) def _samples(self): samples = [] @@ -932,24 +932,6 @@ def _samples(self): return tuple(samples) -class _HistogramTimer(object): - def __init__(self, histogram): - self._histogram = histogram - - def __enter__(self): - self._start = default_timer() - - def __exit__(self, typ, value, traceback): - # Time can go backwards. - self._histogram.observe(max(default_timer() - self._start, 0)) - - def __call__(self, f): - def wrapped(func, *args, **kwargs): - with self: - return func(*args, **kwargs) - return decorate(f, wrapped) - - class _ExceptionCounter(object): def __init__(self, counter, exception): self._counter = counter @@ -986,34 +968,18 @@ def wrapped(func, *args, **kwargs): return decorate(f, wrapped) -class _SummaryTimer(object): - def __init__(self, summary): - self._summary = summary - - def __enter__(self): - self._start = default_timer() - - def __exit__(self, typ, value, traceback): - # Time can go backwards. - self._summary.observe(max(default_timer() - self._start, 0)) - - def __call__(self, f): - def wrapped(func, *args, **kwargs): - with self: - return func(*args, **kwargs) - return decorate(f, wrapped) - - -class _GaugeTimer(object): - def __init__(self, gauge): - self._gauge = gauge +class _Timer(object): + def __init__(self, callback): + self._callback = callback + self._storage = local() def __enter__(self): - self._start = default_timer() + self._storage.start = default_timer() def __exit__(self, typ, value, traceback): # Time can go backwards. - self._gauge.set(max(default_timer() - self._start, 0)) + duration = max(default_timer() - self._storage.start, 0) + self._callback(duration) def __call__(self, f): def wrapped(func, *args, **kwargs): diff --git a/tests/test_core.py b/tests/test_core.py index c949b623..fea563fe 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -3,6 +3,8 @@ import inspect import time import unittest +from concurrent.futures import ThreadPoolExecutor + from prometheus_client.core import ( CollectorRegistry, @@ -124,6 +126,26 @@ def f(): f() self.assertNotEqual(0, self.registry.get_sample_value('g')) + def test_function_decorator_multithread(self): + self.assertEqual(0, self.registry.get_sample_value('g')) + workers = 2 + pool = ThreadPoolExecutor(max_workers=workers) + + @self.gauge.time() + def f(duration): + time.sleep(duration) + + expected_duration = 1 + pool.submit(f, expected_duration) + time.sleep(0.7 * expected_duration) + pool.submit(f, expected_duration * 2) + time.sleep(expected_duration) + + rounding_coefficient = 0.9 + adjusted_expected_duration = expected_duration * rounding_coefficient + self.assertLess(adjusted_expected_duration, self.registry.get_sample_value('g')) + pool.shutdown(wait=True) + def test_time_block_decorator(self): self.assertEqual(0, self.registry.get_sample_value('g')) with self.gauge.time(): @@ -155,6 +177,27 @@ def f(): f() self.assertEqual(1, self.registry.get_sample_value('s_count')) + def test_function_decorator_multithread(self): + self.assertEqual(0, self.registry.get_sample_value('s_count')) + workers = 3 + duration = 0.1 + pool = ThreadPoolExecutor(max_workers=workers) + + @self.summary.time() + def f(): + time.sleep(duration) + + jobs = workers * 3 + for i in range(jobs): + pool.submit(f) + pool.shutdown(wait=True) + + self.assertEqual(jobs, self.registry.get_sample_value('s_count')) + + rounding_coefficient = 0.9 + total_expected_duration = jobs * duration * rounding_coefficient + self.assertLess(total_expected_duration, self.registry.get_sample_value('s_sum')) + def test_block_decorator(self): self.assertEqual(0, self.registry.get_sample_value('s_count')) with self.summary.time(): @@ -234,6 +277,27 @@ def f(): self.assertEqual(1, self.registry.get_sample_value('h_count')) self.assertEqual(1, self.registry.get_sample_value('h_bucket', {'le': '+Inf'})) + def test_function_decorator_multithread(self): + self.assertEqual(0, self.registry.get_sample_value('h_count')) + workers = 3 + duration = 0.1 + pool = ThreadPoolExecutor(max_workers=workers) + + @self.histogram.time() + def f(): + time.sleep(duration) + + jobs = workers * 3 + for i in range(jobs): + pool.submit(f) + pool.shutdown(wait=True) + + self.assertEqual(jobs, self.registry.get_sample_value('h_count')) + + rounding_coefficient = 0.9 + total_expected_duration = jobs * duration * rounding_coefficient + self.assertLess(total_expected_duration, self.registry.get_sample_value('h_sum')) + def test_block_decorator(self): self.assertEqual(0, self.registry.get_sample_value('h_count')) self.assertEqual(0, self.registry.get_sample_value('h_bucket', {'le': '+Inf'})) From 8db16f909fb84eb9031fdfd5cbbb689bd921b26e Mon Sep 17 00:00:00 2001 From: Zaar Hai Date: Wed, 11 Jul 2018 16:08:16 +1000 Subject: [PATCH 2/8] Installing futures package for Python 2.x Required for multithreading tests. Signed-off-by: Zaar Hai --- tox.ini | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/tox.ini b/tox.ini index c9c1138f..04a84389 100644 --- a/tox.ini +++ b/tox.ini @@ -9,11 +9,17 @@ deps = [testenv:py26] ; Last pytest and py version supported on py26 . -deps = +deps = unittest2 py==1.4.31 pytest==2.9.2 coverage + futures + +[testenv:py27] +deps = + {[base]deps} + futures [testenv] deps = From 6fac35c3796b7e6c78cfa214f7c62ff53fa8e928 Mon Sep 17 00:00:00 2001 From: Zaar Hai Date: Wed, 11 Jul 2018 16:32:49 +1000 Subject: [PATCH 3/8] Py2.x unittest compatability Signed-off-by: Zaar Hai --- tests/test_core.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/tests/test_core.py b/tests/test_core.py index fea563fe..81c1461f 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -2,9 +2,13 @@ import inspect import time -import unittest from concurrent.futures import ThreadPoolExecutor +try: + import unittest2 as unittest +except ImportError: + import unittest + from prometheus_client.core import ( CollectorRegistry, From 8db71178c72ec7d893d4d5064483359a774f2c44 Mon Sep 17 00:00:00 2001 From: Zaar Hai Date: Wed, 11 Jul 2018 16:29:56 +1000 Subject: [PATCH 4/8] Fixing text environment for Py27 multithread Signed-off-by: Zaar Hai --- tox.ini | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tox.ini b/tox.ini index 04a84389..e42fc8eb 100644 --- a/tox.ini +++ b/tox.ini @@ -30,7 +30,9 @@ commands = coverage run --parallel -m pytest {posargs} ; Ensure test suite passes if no optional dependencies are present. [testenv:py27-nooptionals] -deps = {[base]deps} +deps = + {[base]deps} + futures commands = coverage run --parallel -m pytest {posargs} [testenv:py36-nooptionals] From 0aaa3b19f5ed2640d9f400e9c4829c679f79247a Mon Sep 17 00:00:00 2001 From: Zaar Hai Date: Wed, 11 Jul 2018 16:34:47 +1000 Subject: [PATCH 5/8] pypy needs futures for testing as well It's a 2.7 version of Python lang. Signed-off-by: Zaar Hai --- tox.ini | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/tox.ini b/tox.ini index e42fc8eb..2b0d0bbb 100644 --- a/tox.ini +++ b/tox.ini @@ -21,6 +21,11 @@ deps = {[base]deps} futures +[testenv:pypy] +deps = + {[base]deps} + futures + [testenv] deps = {[base]deps} From dc37fa11a880c631b7615150b7075e9877d2d695 Mon Sep 17 00:00:00 2001 From: Zaar Hai Date: Thu, 12 Jul 2018 18:33:36 +1000 Subject: [PATCH 6/8] Ensuring that different instances of timer do not interfere Signed-off-by: Zaar Hai --- prometheus_client/core.py | 6 ++++-- tests/test_core.py | 7 ++++++- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/prometheus_client/core.py b/prometheus_client/core.py index 7adaf9c2..9ca6cd63 100644 --- a/prometheus_client/core.py +++ b/prometheus_client/core.py @@ -972,13 +972,15 @@ class _Timer(object): def __init__(self, callback): self._callback = callback self._storage = local() + self.key = "k_{}".format(id(self)) def __enter__(self): - self._storage.start = default_timer() + setattr(self._storage, self.key, default_timer()) def __exit__(self, typ, value, traceback): + start = getattr(self._storage, self.key) # Time can go backwards. - duration = max(default_timer() - self._storage.start, 0) + duration = max(default_timer() - start, 0) self._callback(duration) def __call__(self, f): diff --git a/tests/test_core.py b/tests/test_core.py index 81c1461f..a732dc21 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -183,13 +183,17 @@ def f(): def test_function_decorator_multithread(self): self.assertEqual(0, self.registry.get_sample_value('s_count')) + summary2 = Summary('s2', 'help', registry=self.registry) + workers = 3 duration = 0.1 pool = ThreadPoolExecutor(max_workers=workers) @self.summary.time() def f(): - time.sleep(duration) + time.sleep(duration / 2) + # Testing that different instances of timer do not interfere + summary2.time()(lambda : time.sleep(duration / 2))() jobs = workers * 3 for i in range(jobs): @@ -201,6 +205,7 @@ def f(): rounding_coefficient = 0.9 total_expected_duration = jobs * duration * rounding_coefficient self.assertLess(total_expected_duration, self.registry.get_sample_value('s_sum')) + self.assertLess(total_expected_duration / 2 , self.registry.get_sample_value('s2_sum')) def test_block_decorator(self): self.assertEqual(0, self.registry.get_sample_value('s_count')) From 589be96c9d74d5da9393ae639add7f8778a6674f Mon Sep 17 00:00:00 2001 From: Zaar Hai Date: Thu, 12 Jul 2018 18:37:23 +1000 Subject: [PATCH 7/8] Python2.6 compliance Signed-off-by: Zaar Hai Python2.6 compliance take 2 Signed-off-by: Zaar Hai --- prometheus_client/core.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/prometheus_client/core.py b/prometheus_client/core.py index 9ca6cd63..706e0234 100644 --- a/prometheus_client/core.py +++ b/prometheus_client/core.py @@ -972,7 +972,7 @@ class _Timer(object): def __init__(self, callback): self._callback = callback self._storage = local() - self.key = "k_{}".format(id(self)) + self.key = "k_{0}".format(id(self)) def __enter__(self): setattr(self._storage, self.key, default_timer()) From 34721a9d2a1a17959bca159227dfe42aaa1b0a3d Mon Sep 17 00:00:00 2001 From: Zaar Hai Date: Fri, 20 Jul 2018 22:50:20 +1000 Subject: [PATCH 8/8] Using new object instead of thread local storage It makes time() decorated functions reentrant. It's a bit slower than TLS version, but for only 0.12us (micro) per iteration - it worth the reentrancy and it's also IMHO a more clear and pythonic. Signed-off-by: Zaar Hai --- prometheus_client/core.py | 16 +++++++++------- tests/test_core.py | 23 +++++++++++++++++++++++ 2 files changed, 32 insertions(+), 7 deletions(-) diff --git a/prometheus_client/core.py b/prometheus_client/core.py index 706e0234..a1c72333 100644 --- a/prometheus_client/core.py +++ b/prometheus_client/core.py @@ -13,7 +13,7 @@ import time import types -from threading import local, Lock +from threading import Lock from timeit import default_timer from .decorator import decorate @@ -971,20 +971,22 @@ def wrapped(func, *args, **kwargs): class _Timer(object): def __init__(self, callback): self._callback = callback - self._storage = local() - self.key = "k_{0}".format(id(self)) + + def _new_timer(self): + return self.__class__(self._callback) def __enter__(self): - setattr(self._storage, self.key, default_timer()) + self._start = default_timer() def __exit__(self, typ, value, traceback): - start = getattr(self._storage, self.key) # Time can go backwards. - duration = max(default_timer() - start, 0) + duration = max(default_timer() - self._start, 0) self._callback(duration) def __call__(self, f): def wrapped(func, *args, **kwargs): - with self: + # Obtaining new instance of timer every time + # ensures thread safety and reentrancy. + with self._new_timer(): return func(*args, **kwargs) return decorate(f, wrapped) diff --git a/tests/test_core.py b/tests/test_core.py index a732dc21..f85a3e1b 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -207,6 +207,29 @@ def f(): self.assertLess(total_expected_duration, self.registry.get_sample_value('s_sum')) self.assertLess(total_expected_duration / 2 , self.registry.get_sample_value('s2_sum')) + def test_function_decorator_reentrancy(self): + self.assertEqual(0, self.registry.get_sample_value('s_count')) + + iterations = 2 + sleep = 0.1 + + @self.summary.time() + def f(i=1): + time.sleep(sleep) + if i == iterations: + return + f(i+1) + + f() + + self.assertEqual(iterations, self.registry.get_sample_value('s_count')) + + # Arithmetic series with d == a_1 + total_expected_duration = sleep * (iterations**2 + iterations) / 2 + rounding_coefficient = 0.9 + total_expected_duration *= rounding_coefficient + self.assertLess(total_expected_duration, self.registry.get_sample_value('s_sum')) + def test_block_decorator(self): self.assertEqual(0, self.registry.get_sample_value('s_count')) with self.summary.time():