8000 Almost complete rewrite of async sender and async handler · arcivanov/fluent-logger-python@fad318b · GitHub
[go: up one dir, main page]

Skip to content 8000

Commit fad318b

Browse files
committed
Almost complete rewrite of async sender and async handler
Queue timeout removed as it served no purpose other than hide multiple threading issues asctime format was non-functional Many tests would silently fail and appear successful Tests now are near-instantaneous due to removal of sleep fixes fluent#105, fixes fluent#106
1 parent f357a2d commit fad318b

File tree

10 files changed

+748
-643
lines changed

10 files changed

+748
-643
lines changed

fluent/asynchandler.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,17 @@ def getSenderClass(self):
1313
return asyncsender.FluentSender
1414

1515
def close(self):
16+
self.acquire()
1617
try:
17-
self.sender.close()
18+
try:
19+
self.sender.close()
20+
finally:
21+
super(FluentHandler, self).close()
1822
finally:
19-
super(FluentHandler, self).close()
23+
self.release()
24+
25+
def __enter__(self):
26+
return self
27+
28+
def __exit__(self, exc_type, exc_val, exc_tb):
29+
self.close()

fluent/asyncsender.py

Lines changed: 63 additions & 152 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
from __future__ import print_function
44

55
import threading
6-
import time
76

87
try:
98
from queue import Queue, Full, Empty
@@ -15,12 +14,13 @@
1514

1615
__all__ = ["EventTime", "FluentSender"]
1716

18-
_global_sender = None
19-
20-
DEFAULT_QUEUE_TIMEOUT = 0.05
2117
DEFAULT_QUEUE_MAXSIZE = 100
2218
DEFAULT_QUEUE_CIRCULAR = False
2319

20+
_TOMBSTONE = object()
21+
22+
_global_sender = None
23+
2424

2525
def _set_global_sender(sender): # pragma: no cover
2626
""" [For testing] Function to set global sender directly
@@ -42,8 +42,9 @@ def close(): # pragma: no cover
4242
get_global_sender().close()
4343

4444

45-
class CommunicatorThread(threading.Thread):
46-
def __init__(self, tag,
45+
class FluentSender(sender.FluentSender):
46+
def __init__(self,
47+
tag,
4748
host='localhost',
4849
port=24224,
4950
bufmax=1 * 1024 * 1024,
@@ -52,76 +53,42 @@ def __init__(self, tag,
5253
buffer_overflow_handler=None,
5354
nanosecond_precision=False,
5455
msgpack_kwargs=None,
55-
queue_timeout=DEFAULT_QUEUE_TIMEOUT,
5656
queue_maxsize=DEFAULT_QUEUE_MAXSIZE,
57-
queue_circular=DEFAULT_QUEUE_CIRCULAR, *args, **kwargs):
58-
super(CommunicatorThread, self).__init__(**kwargs)
59-
self._queue = Queue(maxsize=queue_maxsize)
60-
self._do_run = True
61-
self._queue_timeout = queue_timeout
57+
queue_circular=DEFAULT_QUEUE_CIRCULAR,
58+
**kwargs):
59+
"""
60+
:param kwargs: This kwargs argument is not used in __init__. This will be removed in the next major version.
61+
"""
62+
super(FluentSender, self).__init__(tag=tag, host=host, port=port, bufmax=bufmax, timeout=timeout,
63+
verbose=verbose, buffer_overflow_handler=buffer_overflow_handler,
64+
nanosecond_precision=nanosecond_precision,
65+
msgpack_kwargs=msgpack_kwargs,
66+
**kwargs)
6267
self._queue_maxsize = queue_maxsize
6368
self._queue_circular = queue_circular
64-
self._conn_close_lock = threading.Lock()
65-
self._sender = sender.FluentSender(tag=tag, host=host, port=port, bufmax=bufmax, timeout=timeout,
66-
verbose=verbose, buffer_overflow_handler=buffer_overflow_handler,
67-
nanosecond_precision=nanosecond_precision, msgpack_kwargs=msgpack_kwargs)
6869

69-
def send(self, bytes_):
70-
if self._queue_circular and self._queue.full():
71-
# discard oldest
72-
try:
73-
self._queue.get(block=False)
74-
except Empty: # pragma: no cover
75-
pass
76-
try:
77-
self._queue.put(bytes_, block=(not self._queue_circular))
78-
except Full:
79-
return False
80-
return True
70+
self._thread_guard = threading.Event() # This ensures visibility across all variables
71+
self._closed = False
8172

82-
def run(self):
83-
while self._do_run:
84-
try:
85-
bytes_ = self._queue.get(block=True, timeout=self._queue_timeout)
86-
except Empty:
87-
continue
88-
with self._conn_close_lock:
89-
self._sender._send(bytes_)
90-
91-
def close(self, flush=True, discard=True):
92-
if discard:
93-
while not self._queue.empty():
94-
try:
95-
self._queue.get(block=False)
96-
except Empty:
97-
break
98-
while flush and (not self._queue.empty()):
99-
time.sleep(0.1)
100-
self._do_run = False
101-
self._sender.close()
102-
103-
def _close(self):
104-
with self._conn_close_lock:
105-
self._sender._close()
106-
107-
@property
108-
def last_error(self):
109-
return self._sender.last_error
110-
111-
@last_error.setter
112-
def last_error(self, err):
113-
self._sender.last_error = err
114-
115-
def clear_last_error(self, _thread_id=None):
116-
self._sender.clear_last_error(_thread_id=_thread_id)
117-
118-
@property
119-
def queue_timeout(self):
120-
return self._queue_timeout
121-
122-
@queue_timeout.setter
123-
def queue_timeout(self, value):
124-
self._queue_timeout = value
73+
self._queue = Queue(maxsize=queue_maxsize)
74+
self._send_thread = threading.Thread(target=self._send_loop,
75+
name="AsyncFluentSender %d" % id(self))
76+
self._send_thread.daemon = True
77+
self._send_thread.start()
78+
79+
def close(self, flush=True):
80+
with self.lock:
81+
if self._closed:
82+
return
83+
self._closed = True
84+
if not flush:
85+
while True:
86+
try:
87+
self._queue.get(block=False)
88+
except Empty:
89+
break
90+
self._queue.put(_TOMBSTONE)
91+
self._send_thread.join()
12592

12693
@property
12794
def queue_maxsize(self):
@@ -135,91 +102,35 @@ def queue_blocking(self):
135102
def queue_circular(self):
136103
return self._queue_circular
137104

138-
139-
class FluentSender(sender.FluentSender):
140-
def __init__(self,
141-
tag,
142-
host='localhost',
143-
port=24224,
144-
bufmax=1 * 1024 * 1024,
145-
timeout=3.0,
146-
verbose=False,
147-
buffer_overflow_handler=None,
148-
nanosecond_precision=False,
149-
msgpack_kwargs=None,
150-
queue_timeout=DEFAULT_QUEUE_TIMEOUT,
151-
queue_maxsize=DEFAULT_QUEUE_MAXSIZE,
152-
queue_circular=DEFAULT_QUEUE_CIRCULAR,
153-
**kwargs): # This kwargs argument is not used in __init__. This will be removed in the next major version.
154-
super(FluentSender, self).__init__(tag=tag, host=host, port=port, bufmax=bufmax, timeout=timeout,
155-
verbose=verbose, buffer_overflow_handler=buffer_overflow_handler,
156-
nanosecond_precision=nanosecond_precision, msgpack_kwargs=msgpack_kwargs,
157-
**kwargs)
158-
self._communicator = CommunicatorThread(tag=tag, host=host, port=port, bufmax=bufmax, timeout=timeout,
159-
verbose=verbose, buffer_overflow_handler=buffer_overflow_handler,
160-
nanosecond_precision=nanosecond_precision, msgpack_kwargs=msgpack_kwargs,
161-
queue_timeout=queue_timeout, queue_maxsize=queue_maxsize,
162-
queue_circular=queue_circular)
163-
self._communicator.start()
164-
165105
def _send(self, bytes_):
166-
return self._communicator.send(bytes_=bytes_)
167-
168-
def _close(self):
169-
# super(FluentSender, self)._close()
170-
self._communicator._close()
171-
172-
def _send_internal(self, bytes_):
173-
assert False # pragma: no cover
174-
175-
def _send_data(self, bytes_):
176-
assert False # pragma: no cover
177-
178-
# override reconnect, so we don't open a socket here (since it
179-
# will be opened by the CommunicatorThread)
180-
def _reconnect(self):
181-
return
182-
183-
def close(self):
184-
self._communicator.close(flush=True)
185-
self._communicator.join()
186-
return super(FluentSender, self).close()
187-
188-
@property
189-
def last_error(self):
190-
return self._communicator.last_error
191-
192-
@last_error.setter
193-
def last_error(self, err):
194-
self._communicator.last_error = err
195-
196-
def clear_last_error(self, _thread_id=None):
197-
self._communicator.clear_last_error(_thread_id=_thread_id)
106+
with self.lock:
107+
if self._closed:
108+
return False
109+
if self._queue_circular and self._queue.full():
110+
# discard oldest
111+
try:
112+
self._queue.get(block=False)
113+
except Empty: # pragma: no cover
114+
pass
115+
try:
116+
self._queue.put(bytes_, block=(not self._queue_circular))
117+
except Full: # pragma: no cover
118+
return False # this actually can't happen
198119

199-
@property
200-
def queue_timeout(self):
201-
return self._communicator.queue_timeout
120+
return True
202121

203-
@queue_timeout.setter
204-
def queue_timeout(self, value):
205-
self._communicator.queue_timeout = value
122+
def _send_loop(self):
123+
send_internal = super(FluentSender, self)._send_internal
206124

207-
@property
208-
def queue_maxsize(self):
209-
return self._communicator.queue_maxsize
210-
211-
@property
212-
def queue_blocking(self):
213-
return self._communicator.queue_blocking
214-
215-
@property
216-
def queue_circular(self):
217-
return self._communicator.queue_circular
125+
try:
126+
while True:
127+
bytes_ = self._queue.get(block=True)
128+
if bytes_ is _TOMBSTONE:
129+
break
218130

219-
def __enter__(self):
220-
return self
131+
send_internal(bytes_)
132+
finally:
133+
self._close()
221134

222-
def __exit__(self, typ, value, traceback):
223-
# give time to the comm. thread to send its queued messages
224-
time.sleep(0.2)
135+
def __exit__(self, exc_type, exc_val, exc_tb):
225136
self.close()

fluent/handler.py

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ def __init__(self, fmt=None, datefmt=None, style='%', fill_missing_fmt_key=False
8181

8282
def format(self, record):
8383
# Only needed for python2.6
84-
if sys.version_info[0:2] <= (2, 6) and self.usesTime():
84+
if sys.version_info[0:2] <= (2, 6) and self.usesTime(): # pragma: no cover
8585
record.asctime = self.formatTime(record, self.datefmt)
8686

8787
# Compute attributes handled by parent class.
@@ -116,8 +116,11 @@ def usesTime(self):
116116
if self._exc_attrs is not None:
117117
return super(FluentRecordFormatter, self).usesTime()
118118
else:
119-
return any([value.find('%(asctime)') >= 0
120-
for value in self._fmt_dict.values()])
119+
if self.__style:
120+
search = self.__style.asctime_search
121+
else:
122+
search = "%(asctime)"
123+
return any([value.find(search) >= 0 for value in self._fmt_dict.values()])
121124

122125
def _structuring(self, data, record):
123126
""" Melds `msg` into `data`.
@@ -209,7 +212,15 @@ def emit(self, record):
209212
def close(self):
210213
self.acquire()
211214
try:
212-
self.sender._close()
213-
logging.Handler.close(self)
215+
try:
216+
self.sender.close()
217+
finally:
218+
super(FluentHandler, self).close()
214219
finally:
215220
self.release()
221+
222+
def __enter__(self):
223+
return self
224+
225+
def __exit__(self, exc_type, exc_val, exc_tb):
226+
self.close()

0 commit comments

Comments
 (0)
0