E5EB Thread safety done right by haizaar · Pull Request #290 · prometheus/client_python · GitHub
[go: up one dir, main page]

Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 13 additions & 43 deletions prometheus_client/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -765,7 +765,7 @@ def time(self):

Can be used as a function decorator or context manager.
'''
return _GaugeTimer(self)
return _Timer(self.set)

def set_function(self, f):
'''Call the provided function to return the Gauge value.
Expand Down Expand Up @@ -829,7 +829,7 @@ def time(self):

Can be used as a function decorator or context manager.
'''
return _SummaryTimer(self)
return _Timer(self.observe)

def _samples(self):
return (
Expand Down Expand Up @@ -919,7 +919,7 @@ def time(self):

Can be used as a function decorator or context manager.
'''
return _HistogramTimer(self)
return _Timer(self.observe)

def _samples(self):
samples = []
Expand All @@ -932,24 +932,6 @@ def _samples(self):
return tuple(samples)


class _HistogramTimer(object):
def __init__(self, histogram):
self._histogram = histogram

def __enter__(self):
self._start = default_timer()

def __exit__(self, typ, value, traceback):
# Time can go backwards.
self._histogram.observe(max(default_timer() - self._start, 0))

def __call__(self, f):
def wrapped(func, *args, **kwargs):
with self:
return func(*args, **kwargs)
return decorate(f, wrapped)


class _ExceptionCounter(object):
def __init__(self, counter, exception):
self._counter = counter
Expand Down Expand Up @@ -986,37 +968,25 @@ def wrapped(func, *args, **kwargs):
return decorate(f, wrapped)


class _SummaryTimer(object):
def __init__(self, summary):
self._summary = summary

def __enter__(self):
self._start = default_timer()
class _Timer(object):
def __init__(self, callback):
self._callback = callback

def __exit__(self, typ, value, traceback):
# Time can go backwards.
self._summary.observe(max(default_timer() - self._start, 0))

def __call__(self, f):
def wrapped(func, *args, **kwargs):
with self:
return func(*args, **kwargs)
return decorate(f, wrapped)


class _GaugeTimer(object):
def __init__(self, gauge):
self._gauge = gauge
def _new_timer(self):
return self.__class__(self._callback)

def __enter__(self):
self._start = default_timer()

def __exit__(self, typ, value, traceback):
# Time can go backwards.
self._gauge.set(max(default_timer() - self._start, 0))
duration = max(default_timer() - self._start, 0)
self._callback(duration)

def __call__(self, f):
def wrapped(func, *args, **kwargs):
with self:
# Obtaining new instance of timer every time
# ensures thread safety and reentrancy.
with self._new_timer():
return func(*args, **kwargs)
return decorate(f, wrapped)
98 changes: 97 additions & 1 deletion tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,13 @@

import inspect
import time
import unittest
from concurrent.futures import ThreadPoolExecutor

try:
import unittest2 as unittest
except ImportError:
import unittest


from prometheus_client.core import (
CollectorRegistry,
Expand Down Expand Up @@ -124,6 +130,26 @@ def f():
f()
self.assertNotEqual(0, self.registry.get_sample_value('g'))

def test_function_decorator_multithread(self):
self.assertEqual(0, self.registry.get_sample_value('g'))
workers = 2
pool = ThreadPoolExecutor(max_workers=workers)

@self.gauge.time()
def f(duration):
time.sleep(duration)

expected_duration = 1
pool.submit(f, expected_duration)
time.sleep(0.7 * expected_duration)
pool.submit(f, expected_duration * 2)
time.sleep(expected_duration)

rounding_coefficient = 0.9
adjusted_expected_duration = expected_duration * rounding_coefficient
self.assertLess(adjusted_expected_duration, self.registry.get_sample_value('g'))
pool.shutdown(wait=True)

def test_time_block_decorator(self):
self.assertEqual(0, self.registry.get_sample_value('g'))
with self.gauge.time():
Expand Down Expand Up @@ -155,6 +181,55 @@ def f():
f()
self.assertEqual(1, self.registry.get_sample_value('s_count'))

def test_function_decorator_multithread(self):
self.assertEqual(0, self.registry.get_sample_value('s_count'))
summary2 = Summary('s2', 'help', registry=self.registry)

workers = 3
duration = 0.1
pool = ThreadPoolExecutor(max_workers=workers)

@self.summary.time()
def f():
time.sleep(duration / 2)
# Testing that different instances of timer do not interfere
summary2.time()(lambda : time.sleep(duration / 2))()

jobs = workers * 3
for i in range(jobs):
pool.submit(f)
pool.shutdown(wait=True)

self.assertEqual(jobs, self.registry.get_sample_value('s_count'))

rounding_coefficient = 0.9
total_expected_duration = jobs * duration * rounding_coefficient
self.assertLess(total_expected_duration, self.registry.get_sample_value('s_sum'))
self.assertLess(total_expected_duration / 2 , self.registry.get_sample_value('s2_sum'))

def test_function_decorator_reentrancy(self):
self.assertEqual(0, self.registry.get_sample_value('s_count'))

iterations = 2
sleep = 0.1

@self.summary.time()
def f(i=1):
time.sleep(sleep)
if i == iterations:
return
f(i+1)

f()

self.assertEqual(iterations, self.registry.get_sample_value('s_count'))

# Arithmetic series with d == a_1
total_expected_duration = sleep * (iterations**2 + iterations) / 2
rounding_coefficient = 0.9
total_expected_duration *= rounding_coefficient
self.assertLess(total_expected_duration, self.registry.get_sample_value('s_sum'))

def test_block_decorator(self):
self.assertEqual(0, self.registry.get_sample_value('s_count'))
with self.summary.time():
Expand Down Expand Up @@ -234,6 +309,27 @@ def f():
self.assertEqual(1, self.registry.get_sample_value('h_count'))
self.assertEqual(1, self.registry.get_sample_value('h_bucket', {'le': '+Inf'}))

def test_function_decorator_multithread(self):
self.assertEqual(0, self.registry.get_sample_value('h_count'))
workers = 3
duration = 0.1
pool = ThreadPoolExecutor(max_workers=workers)

@self.histogram.time()
def f():
time.sleep(duration)

jobs = workers * 3
for i in range(jobs):
pool.submit(f)
pool.shutdown(wait=True)

self.assertEqual(jobs, self.registry.get_sample_value('h_count'))

rounding_coefficient = 0.9
total_expected_duration = jobs * duration * rounding_coefficient
self.assertLess(total_expected_duration, self.registry.get_sample_value('h_sum'))

def test_block_decorator(self):
self.assertEqual(0, self.registry.get_sample_value('h_count'))
self.assertEqual(0, self.registry.get_sample_value('h_bucket', {'le': '+Inf'}))
Expand Down
17 changes: 15 additions & 2 deletions tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,22 @@ deps =

[testenv:py26]
; Last pytest and py version supported on py26 .
deps =
deps =
unittest2
py==1.4.31
pytest==2.9.2
coverage
futures

[testenv:py27]
deps =
{[base]deps}
futures

[testenv:pypy]
deps =
{[base]deps}
futures

[testenv]
deps =
Expand All @@ -24,7 +35,9 @@ commands = coverage run --parallel -m pytest {posargs}

; Ensure test suite passes if no optional dependencies are present.
[testenv:py27-nooptionals]
deps = {[base]deps}
deps =
{[base]deps}
futures
commands = coverage run --parallel -m pytest {posargs}

[testenv:py36-nooptionals]
Expand Down
0