8000 Merge pull request #102 from panta/master · fluent/fluent-logger-python@3178185 · GitHub
[go: up one dir, main page]

Skip to content 8000

Commit 3178185

Browse files
authored
Merge pull request #102 from panta/master
Fix the asyncsender and make the asyncsender queue a bounded-size circular buffer by default.
2 parents d99bca7 + 6ac8458 commit 3178185

File tree

5 files changed

+287
-14
lines changed

5 files changed

+287
-14
lines changed

README.rst

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -348,6 +348,18 @@ or for the python logging interface:
348348
sure the communication thread terminates and it's joined correctly. Otherwise the program won't exit, waiting for
349349
the thread, unless forcibly killed.
350350

351+
#### Circular queue mode
352+
353+
In some applications it can be especially important to guarantee that the logging process won't block under *any*
354+
circumstance, even when it's logging faster than the sending thread could handle (_backpressure_). In this case it's
355+
possible to enable the `circular queue` mode, by passing `True` in the `queue_circular` parameter of
356+
``asyncsender.FluentSender`` or ``asynchandler.FluentHandler``. By doing so the thread doing the logging won't block
357+
even when the queue is full, the new event will be added to the queue by discarding the oldest one.
358+
359+
**WARNING**: setting `queue_circular` to `True` will cause loss of events if the queue fills up completely! Make sure
360+
that this doesn't happen, or it's acceptable for your application.
361+
362+
351363
Testing
352364
-------
353365

fluent/asyncsender.py

Lines changed: 43 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
_global_sender = None
1515

1616
DEFAULT_QUEUE_TIMEOUT = 0.05
17+
DEFAULT_QUEUE_MAXSIZE = 100
18+
DEFAULT_QUEUE_CIRCULAR = False
1719

1820

1921
def _set_global_sender(sender):
@@ -46,19 +48,29 @@ def __init__(self, tag,
4648
buffer_overflow_handler=None,
4749
nanosecond_precision=False,
4850
msgpack_kwargs=None,
49-
queue_timeout=DEFAULT_QUEUE_TIMEOUT, *args, **kwargs):
51+
queue_timeout=DEFAULT_QUEUE_TIMEOUT,
52+
queue_maxsize=DEFAULT_QUEUE_MAXSIZE,
53+
queue_circular=DEFAULT_QUEUE_CIRCULAR, *args, **kwargs):
5054
super(CommunicatorThread, self).__init__(**kwargs)
51-
self._queue = Queue()
55+
self._queue = Queue(maxsize=queue_maxsize)
5256
self._do_run = True
5357
self._queue_timeout = queue_timeout
58+
self._queue_maxsize = queue_maxsize
59+
self._queue_circular = queue_circular
5460
self._conn_close_lock = threading.Lock()
5561
self._sender = sender.FluentSender(tag=tag, host=host, port=port, bufmax=bufmax, timeout=timeout,
5662
verbose=verbose, buffer_overflow_handler=buffer_overflow_handler,
5763
nanosecond_precision=nanosecond_precision, msgpack_kwargs=msgpack_kwargs)
5864

5965
def send(self, bytes_):
66+
if self._queue_circular and self._queue.full():
67+
# discard oldest
68+
try:
69+
self._queue.get(block=False)
70+
except Empty: # pragma: no cover
71+
pass
6072
try:
61-
self._queue.put(bytes_)
73+
self._queue.put(bytes_, block=(not self._queue_circular))
6274
except Full:
6375
return False
6476
return True
@@ -114,11 +126,17 @@ def queue_timeout(self):
114126
def queue_timeout(self, value):
115127
self._queue_timeout = value
116128

117-
def __enter__(self):
118-
return self
129+
@property
130+
def queue_maxsize(self):
131+
return self._queue_maxsize
119132

120-
def __exit__(self, typ, value, traceback):
121-
self.close()
133+
@property
134+
def queue_blocking(self):
135+
return not self._queue_circular
136+
137+
@property
138+
def queue_circular(self):
139+
return self._queue_circular
122140

123141

124142
class FluentSender(sender.FluentSender):
@@ -133,6 +151,8 @@ def __init__(self,
133151
nanosecond_precision=False,
134152
ms 1E0A gpack_kwargs=None,
135153
queue_timeout=DEFAULT_QUEUE_TIMEOUT,
154+
queue_maxsize=DEFAULT_QUEUE_MAXSIZE,
155+
queue_circular=DEFAULT_QUEUE_CIRCULAR,
136156
**kwargs): # This kwargs argument is not used in __init__. This will be removed in the next major version.
137157
super(FluentSender, self).__init__(tag=tag, host=host, port=port, bufmax=bufmax, timeout=timeout,
138158
verbose=verbose, buffer_overflow_handler=buffer_overflow_handler,
@@ -141,7 +161,8 @@ def __init__(self,
141161
self._communicator = CommunicatorThread(tag=tag, host=host, port=port, bufmax=bufmax, timeout=timeout,
142162
verbose=verbose, buffer_overflow_handler=buffer_overflow_handler,
143163
nanosecond_precision=nanosecond_precision, msgpack_kwargs=msgpack_kwargs,
144-
queue_timeout=queue_timeout)
164+
queue_timeout=queue_timeout, queue_maxsize=queue_maxsize,
165+
queue_circular=queue_circular)
145166
self._communicator.start()
146167

147168
def _send(self, bytes_):
@@ -152,10 +173,10 @@ def _close(self):
152173
self._communicator._close()
153174

154175
def _send_internal(self, bytes_):
155-
return
176+
assert False # pragma: no cover
156177

157178
def _send_data(self, bytes_):
158-
return
179+
assert False # pragma: no cover
159180

160181
# override reconnect, so we don't open a socket here (since it
161182
# will be opened by the CommunicatorThread)
@@ -186,6 +207,18 @@ def queue_timeout(self):
186207
def queue_timeout(self, value):
187208
self._communicator.queue_timeout = value
188209

210+
@property
211+
def queue_maxsize(self):
212+
return self._communicator.queue_maxsize
213+
214+
@property
215+
def queue_blocking(self):
216+
return self._communicator.queue_blocking
217+
218+
@property
219+
def queue_circular(self):
220+
return self._communicator.queue_circular
221+
189222
def __enter__(self):
190223
return self
191224

fluent/handler.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -170,30 +170,32 @@ def __init__(self,
170170
verbose=False,
171171
buffer_overflow_handler=None,
172172
msgpack_kwargs=None,
173-
nanosecond_precision=False):
173+
nanosecond_precision=False,
174+
**kwargs):
174175

175176
self.tag = tag
176177
self.sender = self.getSenderInstance(tag,
177178
host=host, port=port,
178179
timeout=timeout, verbose=verbose,
179180
buffer_overflow_handler=buffer_overflow_handler,
180181
msgpack_kwargs=msgpack_kwargs,
181-
nanosecond_precision=nanosecond_precision)
182+
nanosecond_precision=nanosecond_precision,
183+
**kwargs)
182184
logging.Handler.__init__(self)
183185

184186
def getSenderClass(self):
185187
return sender.FluentSender
186188

187189
def getSenderInstance(self, tag, host, port, timeout, verbose,
188190
buffer_overflow_handler, msgpack_kwargs,
189-
nanosecond_precision):
191+
nanosecond_precision, **kwargs):
190192
sender_class = self.getSenderClass()
191193
return sender_class(tag,
192194
host=host, port=port,
193195
timeout=timeout, verbose=verbose,
194196
buffer_overflow_handler=buffer_overflow_handler,
195197
msgpack_kwargs=msgpack_kwargs,
196-
nanosecond_precision=nanosecond_precision)
198+
nanosecond_precision=nanosecond_precision, **kwargs)
197199

198200
def emit(self, record):
199201
data = self.format(record)

tests/test_asynchandler.py

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -277,3 +277,60 @@ def test_non_string_dict_message(self):
277277
data = self.get_data()
278278
# For some reason, non-string keys are ignored
279279
self.assertFalse(42 in data[0][2])
280+
281+
282+
class TestHandlerWithCircularQueue(unittest.TestCase):
283+
Q_TIMEOUT = 0.04
284+
Q_SIZE = 3
285+
286+
def setUp(self):
287+
super(TestHandlerWithCircularQueue, self).setUp()
288+
self._server = mockserver.MockRecvServer('localhost')
289+
self._port = self._server.port
290+
self.handler = None
291+
292+
def get_handler_class(self):
293+
# return fluent.handler.FluentHandler
294+
return fluent.asynchandler.FluentHandler
295+
296+
def get_data(self):
297+
return self._server.get_recieved()
298+
299+
def test_simple(self):
300+
handler = self.get_handler_class()('app.follow', port=self._port,
301+
queue_timeout=self.Q_TIMEOUT,
302+
queue_maxsize=self.Q_SIZE,
303+
queue_circular=True)
304+
self.handler = handler
305+
306+
self.assertEqual(self.handler.sender.queue_circular, True)
307+
self.assertEqual(self.handler.sender.queue_maxsize, self.Q_SIZE)
308+
309+
logging.basicConfig(level=logging.INFO)
310+
log = logging.getLogger('fluent.test')
311+
handler.setFormatter(fluent.handler.FluentRecordFormatter())
312+
log.addHandler(handler)
313+
log.info({'cnt': 1, 'from': 'userA', 'to': 'userB'})
314+
log.info({'cnt': 2, 'from': 'userA', 'to': 'userB'})
315+
log.info({'cnt': 3, 'from': 'userA', 'to': 'userB'})
316+
log.info({'cnt': 4, 'from': 'userA', 'to': 'userB'})
317+
log.info({'cnt': 5, 'from': 'userA', 'to': 'userB'})
318+
319+
# wait, giving time to the communicator thread to send the messages
320+
time.sleep(0.5)
321+
# close the handler, to join the thread and let the test suite to terminate
322+
handler.close()
323+
324+
data = self.get_data()
325+
eq = self.assertEqual
326+
# with the logging interface, we can't be sure to have filled up the queue, so we can
327+
# test only for a cautelative condition here
328+
self.assertTrue(len(data) >= self.Q_SIZE)
329+
330+
el = data[0]
331+
eq(3, len(el))
332+
eq('app.follow', el[0])
333+
eq('userA', el[2]['from'])
334+
eq('userB', el[2]['to'])
335+
self.assertTrue(el[1])
336+
self.assertTrue(isinstance(el[1], int))

0 commit comments

Comments
 (0)
0