8000 Merge pull request #58 from fluent/improve-pending-handling · dotlambda/fluent-logger-python@9506e5a · GitHub
[go: up one dir, main page]

Skip to content
8000

Commit 9506e5a

Browse files
authored
Merge pull request fluent#58 from fluent/improve-pending-handling
Improve pending handling
2 parents d3a89e1 + 501bb98 commit 9506e5a

File tree

2 files changed

+62
-6
lines changed

2 files changed

+62
-6
lines changed

README.rst

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,33 @@ fluent, with tag 'app.follow' and the attributes 'from' and 'to'.
9393
'to': 'userB'
9494
})
9595
96+
If you want to shutdown the client, call `close()` method.
97+
98+
.. code:: python
99+
100+
sender.close()
101+
102+
Handler for buffer overflow
103+
~~~~~~~~~~~~~~~~~~~~~~~~~~~
104+
105+
You can inject your own custom proc to handle buffer overflow in the event of connection failure. This will mitigate the loss of data instead of simply throwing data away.
106+
107+
.. code:: python
108+
109+
import msgpack
110+
from io import BytesIO
111+
112+
def handler(pendings):
113+
unpacker = msgpack.Unpacker(BytesIO(pendings))
114+
for unpacked in unpacker:
115+
print(unpacked)
116+
117+
sender.setup('app', host='host', port=24224, buffer_overflow_handler=handler)
118+
119+
You should handle any exception in handler. fluent-logger ignores exceptions from ``buffer_overflow_handler``.
120+
121+
This handler is also called when pending events exist during `close()`.
122+
96123
Python logging.Handler interface
97124
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
98125

fluent/sender.py

Lines changed: 35 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ def setup(tag, **kwargs):
2727
def get_global_sender():
2828
return _global_sender
2929

30+
def close():
31+
get_global_sender().close()
3032

3133
class FluentSender(object):
3234
def __init__(self,
@@ -36,6 +38,7 @@ def __init__(self,
3638
bufmax=1 * 1024 * 1024,
3739
timeout=3.0,
3840
verbose=False,
41+
buffer_overflow_handler=None,
3942
**kwargs):
4043

4144
self.tag = tag
@@ -44,6 +47,7 @@ def __init__(self,
4447
self.bufmax = bufmax
4548
self.timeout = timeout
4649
self.verbose = verbose
50+
self.buffer_overflow_handler = buffer_overflow_handler
4751

4852
self.socket = None
4953
self.pendings = None
@@ -69,6 +73,21 @@ def emit_with_time(self, label, timestamp, data):
6973
"traceback": traceback.format_exc()})
7074
self._send(bytes_)
7175

76+
def close(self):
77+
self.lock.acquire()
78+
try:
79+
if self.pendings:
80+
try:
81+
self._send_data(self.pendings)
82+
except Exception:
83+
self._call_buffer_overflow_handler(self.pendings)
84+
85+
self._close()
86+
self.pendings = None
87+
finally:
88+
self.lock.release()
89+
90+
7291
def _make_packet(self, label, timestamp, data):
7392
if label:
7493
tag = '.'.join((self.tag, label))
@@ -93,11 +112,7 @@ def _send_internal(self, bytes_):
93112
bytes_ = self.pendings
94113

95114
try:
96-
# reconnect if possible
97-
self._reconnect()
98-
99-
# send message
100-
self.socket.sendall(bytes_)
115+
self._send_data(bytes_)
101116

102117
# send finished
103118
self.pendings = None
@@ -106,11 +121,17 @@ def _send_internal(self, bytes_):
106121
self._close()
107122
# clear buffer if it exceeds max bufer size
108123
if self.pendings and (len(self.pendings) > self.bufmax):
109-
# TODO: add callback handler here
124+
self._call_buffer_overflow_handler(self.pendings)
110125
self.pendings = None
111126
else:
112127
self.pendings = bytes_
113128

129+
def _send_data(self, bytes_):
130+
# reconnect if possible
131+
self._reconnect()
132+
# send message
133+
self.socket.sendall(bytes_)
134+
114135
def _reconnect(self):
115136
if not self.socket:
116137
if self.host.startswith('unix://'):
@@ -123,6 +144,14 @@ def _reconnect(self):
123144
sock.connect((self.host, self.port))
124145
self.socket = sock
125146

147+
def _call_buffer_overflow_handler(self, pending_events):
148+
try:
149+
if self.buffer_overflow_handler:
150+
self.buffer_overflow_handler(pending_events)
151+
except Exception as e:
152+
# User should care any exception in handler
153+
pass
154+
126155
def _close(self):
127156
if self.socket:
128157
self.socket.close()

0 commit comments

Comments
 (0)
0