8000 Merge branch 'master' into issue_77_103 · fluent/fluent-logger-python@813ae7d · GitHub
[go: up one dir, main page]

Skip to content

Commit 813ae7d

Browse files
authored
Merge branch 'master' into issue_77_103
2 parents 161d76e + 3178185 commit 813ae7d

File tree

6 files changed

+363
-38
lines changed

6 files changed

+363
-38
lines changed

README.rst

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -348,6 +348,18 @@ or for the python logging interface:
348348
sure the communication thread terminates and it's joined correctly. Otherwise the program won't exit, waiting for
349349
the thread, unless forcibly killed.
350350

351+
#### Circular queue mode
352+
353+
In some applications it can be especially important to guarantee that the logging process won't block under *any*
354+
circumstance, even when it's logging faster than the sending thread could handle (_backpressure_). In this case it's
355+
possible to enable the `circular queue` mode, by passing `True` in the `queue_circular` parameter of
356+
``asyncsender.FluentSender`` or ``asynchandler.FluentHandler``. By doing so the thread doing the logging won't block
357+
even when the queue is full, the new event will be added to the queue by discarding the oldest one.
358+
359+
**WARNING**: setting `queue_circular` to `True` will cause loss of events if the queue fills up completely! Make sure
360+
that this doesn't happen, or it's acceptable for your application.
361+
362+
351363
Testing
352364
-------
353365

fluent/asyncsender.py

Lines changed: 45 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
_global_sender = None
1919

2020
DEFAULT_QUEUE_TIMEOUT = 0.05
21+
DEFAULT_QUEUE_MAXSIZE = 100
22+
DEFAULT_QUEUE_CIRCULAR = False
2123

2224

2325
def _set_global_sender(sender): # pragma: no cover
@@ -50,19 +52,29 @@ def __init__(self, tag,
5052
buffer_overflow_handler=None,
5153
nanosecond_precision=False,
5254
msgpack_kwargs=None,
53-
queue_timeout=DEFAULT_QUEUE_TIMEOUT, *args, **kwargs):
55+
queue_timeout=DEFAULT_QUEUE_TIMEOUT,
56+
queue_maxsize=DEFAULT_QUEUE_MAXSIZE,
57+
queue_circular=DEFAULT_QUEUE_CIRCULAR, *args, **kwargs):
5458
super(CommunicatorThread, self).__init__(**kwargs)
55-
self._queue = Queue()
59+
self._queue = Queue(maxsize=queue_maxsize)
5660
self._do_run = True
5761
self._queue_timeout = queue_timeout
62+
self._queue_maxsize = queue_maxsize
63+
self._queue_circular = queue_circular
5864
self._conn_close_lock = threading.Lock()
5965
self._sender = sender.FluentSender(tag=tag, host=host, port=port, bufmax=bufmax, timeout=timeout,
6066
verbose=verbose, buffer_overflow_handler=buffer_overflow_handler,
6167
nanosecond_precision=nanosecond_precision, msgpack_kwargs=msgpack_kwargs)
6268

6369
def send(self, bytes_):
70+
if self._queue_circular and self._queue.full():
71+
# discard oldest
72+
try:
73+
self._queue.get(block=False)
74+
except Empty: # pragma: no cover
75+
pass
6476
try:
65-
self._queue.put(bytes_)
77+
self._queue.put(bytes_, block=(not self._queue_circular))
6678
except Full:
6779
return False
6880
return True
@@ -111,11 +123,17 @@ def queue_timeout(self):
111123
def queue_timeout(self, value):
112124
self._queue_timeout = value
113125

114-
def __enter__(self):
115-
return self
126+
@property
127+
def queue_maxsize(self):
128+
return self._queue_maxsize
116129

117-
def __exit__(self, typ, value, traceback):
118-
self.close()
130+
@property
131+
def queue_blocking(self):
132+
return not self._queue_circular
133+
134+
@property
135+
def queue_circular(self):
136+
return self._queue_circular
119137

120138

121139
class FluentSender(sender.FluentSender):
@@ -130,16 +148,18 @@ def __init__(self,
130148
nanosecond_precision=False,
131149
msgpack_kwargs=None,
132150
queue_timeout=DEFAULT_QUEUE_TIMEOUT,
133-
**kwargs): # This kwargs argument is not used in __init__. This will be removed in the next major version.
151+
queue_maxsize=DEFAULT_QUEUE_MAXSIZE,
152+
queue_circular=DEFAULT_QUEUE_CIRCULAR,
153+
**kwargs): # This kwargs argument is not used in __init__. This will be removed in the next major version.
134154
super(FluentSender, self).__init__(tag=tag, host=host, port=port, bufmax=bufmax, timeout=timeout,
135155
verbose=verbose, buffer_overflow_handler=buffer_overflow_handler,
136156
nanosecond_precision=nanosecond_precision, msgpack_kwargs=msgpack_kwargs,
137157
**kwargs)
138158
self._communicator = CommunicatorThread(tag=tag, host=host, port=port, bufmax=bufmax, timeout=timeout,
139159
verbose=verbose, buffer_overflow_handler=buffer_overflow_handler,
140-
nanosecond_precision=nanosecond_precision,
141-
msgpack_kwargs=msgpack_kwargs,
142-
queue_timeout=queue_timeout)
160+
nanosecond_precision=nanosecond_precision, msgpack_kwargs=msgpack_kwargs,
161+
queue_timeout=queue_timeout, queue_maxsize=queue_maxsize,
162+
queue_circular=queue_circular)
143163
self._communicator.start()
144164

145165
def _send(self, bytes_):
@@ -150,10 +170,10 @@ def _close(self):
150170
self._communicator._close()
151171

152172
def _send_internal(self, bytes_):
153-
return
173+
assert False # pragma: no cover
154174

155175
def _send_data(self, bytes_):
156-
return
176+
assert False # pragma: no cover
157177

158178
# override reconnect, so we don't open a socket here (since it
159179
# will be opened by the CommunicatorThread)
@@ -184,6 +204,18 @@ def queue_timeout(self):
184204
def queue_timeout(self, value):
185205
self._communicator.queue_timeout = value
186206

207+
@property
208+
def queue_maxsize(self):
209+
return self._communicator.queue_maxsize
210+
211+
@property
212+
def queue_blocking(self):
213+
return self._communicator.queue_blocking
214+
215+
@property
216+
def queue_circular(self):
217+
return self._communicator.queue_circular
218+
187219
def __enter__(self):
188220
return self
189221

fluent/handler.py

Lines changed: 42 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,14 @@ class FluentRecordFormatter(logging.Formatter, object):
2929
key is not found. Put None if not found.
3030
:param format_json: if True, will attempt to parse message as json. If not,
3131
will use message as-is. Defaults to True
32+
:param exclude_attrs: switches this formatter into a mode where all attributes
33+
except the ones specified by `exclude_attrs` are logged with the record as is.
34+
If `None`, operates as before, otherwise `fmt` is ignored.
35+
Can be a `list`, `tuple` or a `set`.
3236
"""
3337

34-
def __init__(self, fmt=None, datefmt=None, style='%', fill_missing_fmt_key=False, format_json=True):
38+
def __init__(self, fmt=None, datefmt=None, style='%', fill_missing_fmt_key=False, format_json=True,
39+
exclude_attrs=None):
3540
super(FluentRecordFormatter, self).__init__(None, datefmt)
3641

3742
if sys.version_info[0:2] >= (3, 2) and style != '%':
@@ -55,10 +60,15 @@ def __init__(self, fmt=None, datefmt=None, style='%', fill_missing_fmt_key=False
5560
'sys_module': '%(module)s',
5661
}
5762

58-
if not fmt:
59-
self._fmt_dict = basic_fmt_dict
63+
if exclude_attrs is not None:
64+
self._exc_attrs = set(exclude_attrs)
65+
self._fmt_dict = None
6066
else:
61-
self._fmt_dict = fmt
67+
self._exc_attrs = None
68+
if not fmt:
69+
self._fmt_dict = basic_fmt_dict
70+
else:
71+
self._fmt_dict = fmt
6272

6373
if format_json:
6474
self._format_msg = self._format_msg_json
@@ -81,25 +91,33 @@ def format(self, record):
8191

8292
# Apply format
8393
data = {}
84-
for key, value in self._fmt_dict.items():
85-
try:
86-
if self.__style:
87-
value = self.__style(value).format(record)
88-
else:
89-
value = value % record.__dict__
90-
except KeyError as exc:
91-
value = None
92-
if not self.fill_missing_fmt_key:
93-
raise exc
94-
95-
data[key] = value
94+
if self._exc_attrs is not None:
95+
for key, value in record.__dict__.items():
96+
if key not in self._exc_attrs:
97+
data[key] = value
98+
else:
99+
for key, value in self._fmt_dict.items():
100+
try:
101+
if self.__style:
102+
value = self.__style(value).format(record)
103+
else:
104+
value = value % record.__dict__
105+
except KeyError as exc:
106+
value = None
107+
if not self.fill_missing_fmt_key:
108+
raise exc
109+
110+
data[key] = value
96111

97112
self._structuring(data, record)
98113
return data
99114

100115
def usesTime(self):
101-
return any([value.find('%(asctime)') >= 0
102-
for value in self._fmt_dict.values()])
116+
if self._exc_attrs is not None:
117+
return super(FluentRecordFormatter, self).usesTime()
118+
else:
119+
return any([value.find('%(asctime)') >= 0
F438 120+
for value in self._fmt_dict.values()])
103121

104122
def _structuring(self, data, record):
105123
""" Melds `msg` into `data`.
@@ -152,30 +170,32 @@ def __init__(self,
152170
verbose=False,
153171
buffer_overflow_handler=None,
154172
msgpack_kwargs=None,
155-
nanosecond_precision=False):
173+
nanosecond_precision=False,
174+
**kwargs):
156175

157176
self.tag = tag
158177
self.sender = self.getSenderInstance(tag,
159178
host=host, port=port,
160179
timeout=timeout, verbose=verbose,
161180
buffer_overflow_handler=buffer_overflow_handler,
162181
msgpack_kwargs=msgpack_kwargs,
163-
nanosecond_precision=nanosecond_precision)
182+
nanosecond_precision=nanosecond_precision,
183+
**kwargs)
164184
logging.Handler.__init__(self)
165185

166186
def getSenderClass(self):
167187
return sender.FluentSender
168188

169189
def getSenderInstance(self, tag, host, port, timeout, verbose,
170190
buffer_overflow_handler, msgpack_kwargs,
171-
nanosecond_precision):
191+
nanosecond_precision, **kwargs):
172192
sender_class = self.getSenderClass()
173193
return sender_class(tag,
174194
host=host, port=port,
175195
timeout=timeout, verbose=verbose,
176196
buffer_overflow_handler=buffer_overflow_handler,
177197
msgpack_kwargs=msgpack_kwargs,
178-
nanosecond_precision=nanosecond_precision)
198+
nanosecond_precision=nanosecond_precision, **kwargs)
179199

180200
def emit(self, record):
181201
data = self.format(record)

tests/test_asynchandler.py

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -277,3 +277,60 @@ def test_non_string_dict_message(self):
277277
data = self.get_data()
278278
# For some reason, non-string keys are ignored
279279
self.assertFalse(42 in data[0][2])
280+
281+
282+
class TestHandlerWithCircularQueue(unittest.TestCase):
283+
Q_TIMEOUT = 0.04
284+
Q_SIZE = 3
285+
286+
def setUp(self):
287+
super(TestHandlerWithCircularQueue, self).setUp()
288+
self._server = mockserver.MockRecvServer('localhost')
289+
self._port = self._server.port
290+
self.handler = None
291+
292+
def get_handler_class(self):
293+
# return fluent.handler.FluentHandler
294+
return fluent.asynchandler.FluentHandler
295+
296+
def get_data(self):
297+
return self._server.get_recieved()
298+
299+
def test_simple(self):
300+
handler = self.get_handler_class()('app.follow', port=self._port,
301+
queue_timeout=self.Q_TIMEOUT,
302+
queue_maxsize=self.Q_SIZE,
303+
queue_circular=True)
304+
self.handler = handler
305+
306+
self.assertEqual(self.handler.sender.queue_circular, True)
307+
self.assertEqual(self.handler.sender.queue_maxsize, self.Q_SIZE)
308+
309+
logging.basicConfig(level=logging.INFO)
310+
log = logging.getLogger('fluent.test')
311+
handler.setFormatter(fluent.handler.FluentRecordFormatter())
312+
log.addHandler(handler)
313+
log.info({'cnt': 1, 'from': 'userA', 'to': 'userB'})
314+
log.info({'cnt': 2, 'from': 'userA', 'to': 'userB'})
315+
log.info({'cnt': 3, 'from': 'userA', 'to': 'userB'})
316+
log.info({'cnt': 4, 'from': 'userA', 'to': 'userB'})
317+
log.info({'cnt': 5, 'from': 'userA', 'to': 'userB'})
318+
319+
# wait, giving time to the communicator thread to send the messages
320+
time.sleep(0.5)
321+
# close the handler, to join the thread and let the test suite to terminate
322+
handler.close()
323+
324+
data = self.get_data()
325+
eq = self.assertEqual
326+
# with the logging interface, we can't be sure to have filled up the queue, so we can
327+
# test only for a cautelative condition here
328+
self.assertTrue(len(data) >= self.Q_SIZE)
329+
330+
el = data[0]
331+
eq(3, len(el))
332+
eq('app.follow', el[0])
333+
eq('userA', el[2]['from'])
334+
eq('userB', el[2]['to'])
335+
self.assertTrue(el[1])
336+
self.assertTrue(isinstance(el[1], int))

0 commit comments

Comments
 (0)
0