8000 Add support for asynchronous threaded sender and logging handler. · fluent/fluent-logger-python@d9a7ab6 · GitHub
[go: up one dir, main page]

Skip to content

Commit d9a7ab6

Browse files
committed
Add support for asynchronous threaded sender and logging handler.
These are implemented respectively in asyncsender and asynchandler, with the same interfaces as their synchronous counterparts.
1 parent 2fb58a6 commit d9a7ab6

File tree

5 files changed

+640
-6
lines changed

5 files changed

+640
-6
lines changed

fluent/asynchandler.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
# -*- coding: utf-8 -*-
2+
3+
from fluent import asyncsender
4+
from fluent import handler
5+
6+
7+
class FluentHandler(handler.FluentHandler):
8+
'''
9+
Asynchronous Logging Handler for fluent.
10+
'''
11+
12+
def getSenderClass(self):
13+
return asyncsender.FluentSender
14+
15+
def close(self):
16+
self.sender.close()
17+
super(FluentHandler, self).close()

fluent/asyncsender.py

Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
# -*- coding: utf-8 -*-
2+
3+
from __future__ import print_function
4+
import threading
5+
import time
6+
try:
7+
from queue import Queue, Full, Empty
8+
except ImportError:
9+
from Queue import Queue, Full, Empty
10+
11+
from fluent import sender
12+
from fluent.sender import EventTime
13+
14+
_global_sender = None
15+
16+
17+
def _set_global_sender(sender):
18+
""" [For testing] Function to set global sender directly
19+
"""
20+
global _global_sender
21+
_global_sender = sender
22+
23+
24+
def setup(tag, **kwargs):
25+
global _global_sender
26+
_global_sender = FluentSender(tag, **kwargs)
27+
28+
29+
def get_global_sender():
30+
return _global_sender
31+
32+
def close():
33+
get_global_sender().close()
34+
35+
36+
class CommunicatorThread(threading.Thread):
37+
def __init__(self, tag,
38+
host='localhost',
39+
port=24224,
40+
bufmax=1 * 1024 * 1024,
41+
timeout=3.0,
42+
verbose=False,
43+
buffer_overflow_handler=None,
44+
nanosecond_precision=False,
45+
msgpack_kwargs=None, *args, **kwargs):
46+
super(CommunicatorThread, self).__init__(**kwargs)
47+
self._queue = Queue()
48+
self._do_run = True
49+
self._conn_close_lock = threading.Lock()
50+
self._sender = sender.FluentSender(tag=tag, host=host, port=port, bufmax=bufmax, timeout=timeout,
51+
verbose=verbose, buffer_overflow_handler=buffer_overflow_handler,
52+
nanosecond_precision=nanosecond_precision, msgpack_kwargs=msgpack_kwargs)
53+
54+
def send(self, bytes_):
55+
try:
56+
self._queue.put(bytes_)
57+
except Full:
58+
return False
59+
return True
60+
61+
def run(self):
62+
while self._do_run:
63+
try:
64+
bytes_ = self._queue.get(block=False)
65+
except Empty:
66+
continue
67+
self._conn_close_lock.acquire()
68+
self._sender._send(bytes_)
69+
self._conn_close_lock.release()
70+
71+
def close(self, flush=True, discard=True):
72+
if discard:
73+
while not self._queue.empty():
74+
try:
75+
self._queue.get(block=False)
76+
except Empty:
77+
break
78+
while flush and (not self._queue.empty()):
79+
time.sleep(0.1)
80+
self._do_run = False
81+
self._sender.close()
82+
83+
def _close(self):
84+
self._conn_close_lock.acquire()
85+
# self._sender.lock.acquire()
86+
try:
87+
self._sender._close()
88+
finally:
89+
# self._sender.lock.release()
90+
self._conn_close_lock.release()
91+
pass
92+
93+
@property
94+
def last_error(self):
95+
return self._sender.last_error
96+
97+
@last_error.setter
98+
def last_error(self, err):
99+
self._sender.last_error = err
100+
101+
def clear_last_error(self, _thread_id = None):
102+
self._sender.clear_last_error(_thread_id=_thread_id)
103+
104+
def __enter__(self):
105+
return self
106+
107+
def __exit__(self, typ, value, traceback):
108+
self.close()
109+
110+
111+
class FluentSender(sender.FluentSender):
112+
def __init__(self,
113+
tag,
114+
host='localhost',
115+
port=24224,
116+
bufmax=1 * 1024 * 1024,
117+
timeout=3.0,
118+
verbose=False,
119+
buffer_overflow_handler=None,
120+
nanosecond_precision=False,
121+
msgpack_kwargs=None,
122+
**kwargs): # This kwargs argument is not used in __init__. This will be removed in the next major version.
123+
super(FluentSender, self).__init__(tag=tag, host=host, port=port, bufmax=bufmax, timeout=timeout,
124+
verbose=verbose, buffer_overflow_handler=buffer_overflow_handler,
125+
nanosecond_precision=nanosecond_precision, msgpack_kwargs=msgpack_kwargs,
126+
**kwargs)
127+
self._communicator = CommunicatorThread(tag=tag, host=host, port=port, bufmax=bufmax, timeout=timeout,
128+
verbose=verbose, buffer_overflow_handler=buffer_overflow_handler,
129+
nanosecond_precision=nanosecond_precision, msgpack_kwargs=msgpack_kwargs)
130+
self._communicator.start()
131+
132+
def _send(self, bytes_):
133+
return self._communicator.send(bytes_=bytes_)
134+
135+
def _close(self):
136+
# super(FluentSender, self)._close()
137+
self._communicator._close()
138+
139+
def _send_internal(self, bytes_):
140+
return
141+
142+
def _send_data(self, bytes_):
143+
return
144+
145+
# override reconnect, so we don't open a socket here (since it
146+
# will be opened by the CommunicatorThread)
147+
def _reconnect(self):
148+
return
149+
150+
def close(self):
151+
self._communicator.close(flush=True)
152+
self._communicator.join()
153+
return super(FluentSender, self).close()
154+
155+
@property
156+
def last_error(self):
157+
return self._communicator.last_error
158+
159+
@last_error.setter
160+
def last_error(self, err):
161+
self._communicator.last_error = err
162+
163+
def clear_last_error(self, _thread_id = None):
164+
self._communicator.clear_last_error(_thread_id=_thread_id)
165+
166+
def __enter__(self):
167+
return self
168+
169+
def __exit__(self, typ, value, traceback):
170+
# give time to the comm. thread to send its queued messages
171+
time.sleep(0.2)
172+
self.close()

fluent/handler.py

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -141,14 +141,28 @@ def __init__(self,
141141
nanosecond_precision=False):
142142

143143
self.tag = tag
144-
self.sender = sender.FluentSender(tag,
145-
host=host, port=port,
146-
timeout=timeout, verbose=verbose,
147-
buffer_overflow_handler=buffer_overflow_handler,
148-
msgpack_kwargs=msgpack_kwargs,
149-
nanosecond_precision=nanosecond_precision)
144+
self.sender = self.getSenderInstance(tag,
145+
host=host, port=port,
146+
timeout=timeout, verbose=verbose,
147+
buffer_overflow_handler=buffer_overflow_handler,
148+
msgpack_kwargs=msgpack_kwargs,
149+
nanosecond_precision=nanosecond_precision)
150150
logging.Handler.__init__(self)
151151

152+
def getSenderClass(self):
153+
return sender.FluentSender
154+
155+
def getSenderInstance(self, tag, host, port, timeout, verbose,
156+
buffer_overflow_handler, msgpack_kwargs,
157+
nanosecond_precision):
158+
sender_class = self.getSenderClass()
159+
return sender_class(tag,
160+
host=host, port=port,
161+
timeout=timeout, verbose=verbose,
162+
buffer_overflow_handler=buffer_overflow_handler,
163+
msgpack_kwargs=msgpack_kwargs,
164+
nanosecond_precision=nanosecond_precision)
165+
152166
def emit(self, record):
153167
data = self.format(record)
154168
return self.sender.emit(None, data)

0 commit comments

Comments
 (0)
0