8000 Add buffer_overflow_handler · dotlambda/fluent-logger-python@3909c43 · GitHub
[go: up one dir, main page]

Skip to content

Commit 3909c43

Browse files
committed
Add buffer_overflow_handler
1 parent d3a89e1 commit 3909c43

File tree

2 files changed

+30
-1
lines changed

2 files changed

+30
-1
lines changed

README.rst

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,25 @@ fluent, with tag 'app.follow' and the attributes 'from' and 'to'.
9393
'to': 'userB'
9494
})
9595
96+
Handler for buffer overflow
97+
~~~~~~~~~~~~~~~~~~~~~~~~~~~
98+
99+
You can inject your own custom proc to handle buffer overflow in the event of connection failure. This will mitigate the loss of data instead of simply throwing data away.
100+
101+
.. code:: python
102+
103+
import msgpack
104+
from io import BytesIO
105+
106+
def handler(pendings):
107+
unpacker = msgpack.Unpacker(BytesIO(pendings))
108+
for unpacked in unpacker:
109+
print(unpacked)
110+
111+
sender.setup('app', host='host', port=24224, buffer_overflow_handler=handler)
112+
113+
You should handle any exception in handler. fluent-logger ignores exceptions from ``buffer_overflow_handler``.
114+
96115
Python logging.Handler interface
97116
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
98117

fluent/sender.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ def __init__(self,
3636
bufmax=1 * 1024 * 1024,
3737
timeout=3.0,
3838
verbose=False,
39+
buffer_overflow_handler=None,
3940
**kwargs):
4041

4142
self.tag = tag
@@ -44,6 +45,7 @@ def __init__(self,
4445
self.bufmax = bufmax
4546
self.timeout = timeout
4647
self.verbose = verbose
48+
self.buffer_overflow_handler = buffer_overflow_handler
4749

4850
self.socket = None
4951
self.pendings = None
@@ -106,7 +108,7 @@ def _send_internal(self, bytes_):
106108
self._close()
107109
# clear buffer if it exceeds max bufer size
108110
if self.pendings and (len(self.pendings) > self.bufmax):
109-
# TODO: add callback handler here
111+
self._call_buffer_overflow_handler(self.pendings)
110112
self.pendings = None
111113
else:
112114
self.pendings = bytes_
@@ -123,6 +125,14 @@ def _reconnect(self):
123125
sock.connect((self.host, self.port))
124126
self.socket = sock
125127

128+
def _call_buffer_overflow_handler(self, pending_events):
129+
try:
130+
if self.buffer_overflow_handler:
131+
self.buffer_overflow_handler(pending_events)
132+
except Exception as e:
133+
# User should care any exception in handler
134+
pass
135+
126136
def _close(self):
127137
if self.socket:
128138
self.socket.close()

0 commit comments

Comments
 (0)
0