10000 Asynchronous threaded support for senders and handlers. by panta · Pull Request #95 · fluent/fluent-logger-python · GitHub
[go: up one dir, main page]

Skip to content

Asynchronous threaded support for senders and handlers. #95

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Nov 10, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 66 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,72 @@ A sample configuration ``logging.yaml`` would be:
level: DEBUG
propagate: False

Asynchronous Communication
~~~~~~~~~~~~~~~~~~~~~~~~~~

Besides the regular interfaces - the event-based one provided by ``sender.FluentSender`` and the Python logging one
provided by ``handler.FluentHandler`` - there are also corresponding asynchronous versions in ``asyncsender`` and
``asynchandler`` respectively. These versions use a separate thread to handle the communication with the remote fluentd
server. This way the caller is not blocked while events are being logged, does not risk timing out if the fluentd
server becomes unreachable, and is not slowed down by network overhead.

The interfaces in ``asyncsender`` and ``asynchandler`` are exactly the same as those in ``sender`` and ``handler``, so it's
just a matter of importing from a different module.

For instance, for the event-based interface:

.. code:: python

from fluent import asyncsender as sender

# for local fluent
sender.setup('app')

# for remote fluent
sender.setup('app', host='host', port=24224)

# do your work
...

# IMPORTANT: before program termination, close the sender
sender.close()

or for the python logging interface:

.. code:: python

import logging
from fluent import asynchandler as handler

custom_format = {
'host': '%(hostname)s',
'where': '%(module)s.%(funcName)s',
'type': '%(levelname)s',
'stack_trace': '%(exc_text)s'
}

logging.basicConfig(level=logging.INFO)
l = logging.getLogger('fluent.test')
h = handler.FluentHandler('app.follow', host='host', port=24224, buffer_overflow_handler=overflow_handler)
formatter = handler.FluentRecordFormatter(custom_format)
h.setFormatter(formatter)
l.addHandler(h)
l.info({
'from': 'userA',
'to': 'userB'
})
l.info('{"from": "userC", "to": "userD"}')
l.info("This log entry will be logged with the additional key: 'message'.")

...

# IMPORTANT: before program termination, close the handler
h.close()

**NOTE**: it is important to close the sender or the handler before the program terminates. Closing makes sure the
communication thread terminates and is joined correctly. Otherwise the program won't exit, because it keeps waiting
for the thread, unless it is forcibly killed.

Testing
-------

Expand Down
18 changes: 18 additions & 0 deletions fluent/asynchandler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# -*- coding: utf-8 -*-

from fluent import asyncsender
from fluent import handler
from fluent.handler import FluentRecordFormatter


class FluentHandler(handler.FluentHandler):
    """Asynchronous logging handler for fluent.

    Identical to ``handler.FluentHandler`` except that it builds its sender
    from ``asyncsender.FluentSender`` instead of the blocking sender.
    """

    def getSenderClass(self):
        """Return the sender class used by this handler (the async sender)."""
        return asyncsender.FluentSender

    def close(self):
        """Close the asynchronous sender, then close the handler itself.

        The parent ``close()`` is invoked from a ``finally`` clause so that
        the handler is always closed, even when closing the sender raises.
        Any exception from the sender still propagates to the caller.
        """
        try:
            self.sender.close()
        finally:
            super(FluentHandler, self).close()
195 changes: 195 additions & 0 deletions fluent/asyncsender.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
# -*- coding: utf-8 -*-

from __future__ import print_function
import threading
import time
try:
from queue import Queue, Full, Empty
except ImportError:
from Queue import Queue, Full, Empty

from fluent import sender
from fluent.sender import EventTime

# Module-wide sender instance; assigned by setup() / _set_global_sender()
# and returned by get_global_sender(). None until one of those is called.
_global_sender = None

# Default for the ``queue_timeout`` argument: the timeout the communicator
# thread passes to its blocking ``Queue.get`` call while waiting for data.
DEFAULT_QUEUE_TIMEOUT = 0.05


def _set_global_sender(sender):
    """[For testing] Replace the module-level sender with ``sender``.

    The given object is stored as-is; no sender is created or closed here.
    """
    global _global_sender
    _global_sender = sender


def setup(tag, **kwargs):
    """Create a ``FluentSender`` for ``tag`` and install it as the global sender.

    All keyword arguments are forwarded unchanged to the ``FluentSender``
    constructor. Any previously installed global sender is simply replaced;
    it is not closed by this function.
    """
    global _global_sender
    new_sender = FluentSender(tag, **kwargs)
    _global_sender = new_sender


def get_global_sender():
    """Return the module-level sender, or ``None`` if none has been set."""
    return _global_sender


def close():
    """Close the module-level sender.

    Raises ``AttributeError`` when no global sender has been installed,
    because the stored value is then ``None``.
    """
    active_sender = get_global_sender()
    active_sender.close()


class CommunicatorThread(threading.Thread):
    """Worker thread that drains a queue into a blocking ``FluentSender``.

    Payloads handed to :meth:`send` are put on an internal queue; the
    thread's :meth:`run` loop takes them off one at a time and passes each
    to the wrapped sender's ``_send``. A lock serialises those sends against
    :meth:`_close`, so the connection is never closed in the middle of a send.
    """

    def __init__(self, tag,
                 host='localhost',
                 port=24224,
                 bufmax=1 * 1024 * 1024,
                 timeout=3.0,
                 verbose=False,
                 buffer_overflow_handler=None,
                 nanosecond_precision=False,
                 msgpack_kwargs=None,
                 queue_timeout=DEFAULT_QUEUE_TIMEOUT, *args, **kwargs):
        """Create the queue, the lock and the wrapped blocking sender.

        ``tag`` through ``msgpack_kwargs`` are forwarded to
        ``sender.FluentSender``. ``queue_timeout`` is the timeout used for
        each blocking ``Queue.get`` in :meth:`run`. Extra positional
        arguments are accepted but ignored; extra keyword arguments go to
        ``threading.Thread.__init__``.
        """
        super(CommunicatorThread, self).__init__(**kwargs)
        self._queue = Queue()
        self._do_run = True
        self._queue_timeout = queue_timeout
        self._conn_close_lock = threading.Lock()
        self._sender = sender.FluentSender(tag=tag, host=host, port=port, bufmax=bufmax, timeout=timeout,
                                           verbose=verbose, buffer_overflow_handler=buffer_overflow_handler,
                                           nanosecond_precision=nanosecond_precision, msgpack_kwargs=msgpack_kwargs)

    def send(self, bytes_):
        """Queue ``bytes_`` for delivery by the thread.

        Returns ``True`` once the payload is queued, ``False`` if the queue
        reports ``Full``.
        """
        try:
            self._queue.put(bytes_)
        except Full:
            return False
        return True

    def run(self):
        """Forward queued payloads to the wrapped sender until stopped.

        The queue is polled with ``queue_timeout`` so that the loop notices
        when :meth:`close` clears the run flag.
        """
        while self._do_run:
            try:
                bytes_ = self._queue.get(block=True, timeout=self._queue_timeout)
            except Empty:
                continue
            # Use the lock as a context manager so it is released even when
            # the send raises; otherwise a later _close() would block forever.
            with self._conn_close_lock:
                self._sender._send(bytes_)

    def close(self, flush=True, discard=True):
        """Stop the run loop and close the wrapped sender.

        If ``discard`` is true, every payload still waiting in the queue is
        dropped first. If ``flush`` is true, this call then polls every 0.1
        seconds until the queue is empty. Finally the run flag is cleared
        and the wrapped sender is closed.

        NOTE(review): with the defaults (``discard=True``) the queue is
        emptied before the flush wait starts, so ``flush`` only has an
        effect when ``discard=False`` is passed. Confirm this is intended.
        """
        if discard:
            while not self._queue.empty():
                try:
                    self._queue.get(block=False)
                except Empty:
                    break
        while flush and (not self._queue.empty()):
            time.sleep(0.1)
        self._do_run = False
        self._sender.close()

    def _close(self):
        """Call the wrapped sender's ``_close`` while holding the send lock."""
        with self._conn_close_lock:
            self._sender._close()

    @property
    def last_error(self):
        """Last error as stored on the wrapped sender."""
        return self._sender.last_error

    @last_error.setter
    def last_error(self, err):
        self._sender.last_error = err

    def clear_last_error(self, _thread_id=None):
        """Delegate to the wrapped sender's ``clear_last_error``."""
        self._sender.clear_last_error(_thread_id=_thread_id)

    @property
    def queue_timeout(self):
        """Timeout used for each blocking queue read in :meth:`run`."""
        return self._queue_timeout

    @queue_timeout.setter
    def queue_timeout(self, value):
        self._queue_timeout = value

    def __enter__(self):
        return self

    def __exit__(self, typ, value, traceback):
        self.close()


class FluentSender(sender.FluentSender):
    """Sender whose network work is delegated to a ``CommunicatorThread``.

    The public interface is inherited from ``sender.FluentSender``; the
    low-level hooks are overridden so that data is queued for the
    communicator thread instead of being written from the calling thread.
    """

    def __init__(self,
                 tag,
                 host='localhost',
                 port=24224,
                 bufmax=1 * 1024 * 1024,
                 timeout=3.0,
                 verbose=False,
                 buffer_overflow_handler=None,
                 nanosecond_precision=False,
                 msgpack_kwargs=None,
                 queue_timeout=DEFAULT_QUEUE_TIMEOUT,
                 **kwargs):
        """Initialise the parent sender and start the communicator thread.

        ``queue_timeout`` is passed to the communicator thread only. Extra
        keyword arguments are forwarded to the parent constructor; the
        original author planned to remove ``**kwargs`` in the next major
        version.
        """
        # Options shared by the parent sender and the communicator thread.
        shared_options = dict(
            tag=tag,
            host=host,
            port=port,
            bufmax=bufmax,
            timeout=timeout,
            verbose=verbose,
            buffer_overflow_handler=buffer_overflow_handler,
            nanosecond_precision=nanosecond_precision,
            msgpack_kwargs=msgpack_kwargs,
        )

        parent_options = dict(shared_options)
        parent_options.update(kwargs)
        super(FluentSender, self).__init__(**parent_options)

        self._communicator = CommunicatorThread(queue_timeout=queue_timeout, **shared_options)
        self._communicator.start()

    def _send(self, bytes_):
        """Queue ``bytes_`` on the communicator thread and return its result."""
        return self._communicator.send(bytes_=bytes_)

    def _close(self):
        """Delegate the low-level close to the communicator thread."""
        self._communicator._close()

    def _send_internal(self, bytes_):
        """No-op: sending is performed by the communicator thread."""
        return None

    def _send_data(self, bytes_):
        """No-op: sending is performed by the communicator thread."""
        return None

    def _reconnect(self):
        """No-op, so that no socket is opened here.

        The socket is opened by the ``CommunicatorThread`` instead.
        """
        return None

    def close(self):
        """Close and join the communicator thread, then close the parent."""
        communicator = self._communicator
        communicator.close(flush=True)
        communicator.join()
        return super(FluentSender, self).close()

    @property
    def last_error(self):
        """Last error as reported by the communicator thread."""
        return self._communicator.last_error

    @last_error.setter
    def last_error(self, err):
        self._communicator.last_error = err

    def clear_last_error(self, _thread_id=None):
        """Delegate to the communicator thread's ``clear_last_error``."""
        self._communicator.clear_last_error(_thread_id=_thread_id)

    @property
    def queue_timeout(self):
        """Queue polling timeout of the communicator thread."""
        return self._communicator.queue_timeout

    @queue_timeout.setter
    def queue_timeout(self, value):
        self._communicator.queue_timeout = value

    def __enter__(self):
        return self

    def __exit__(self, typ, value, traceback):
        # Give the communicator thread time to send its queued messages
        # before closing.
        time.sleep(0.2)
        self.close()
26 changes: 20 additions & 6 deletions fluent/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,14 +141,28 @@ def __init__(self,
nanosecond_precision=False):

self.tag = tag
self.sender = sender.FluentSender(tag,
host=host, port=port,
timeout=timeout, verbose=verbose,
buffer_overflow_handler=buffer_overflow_handler,
msgpack_kwargs=msgpack_kwargs,
nanosecond_precision=nanosecond_precision)
self.sender = self.getSenderInstance(tag,
host=host, port=port,
timeout=timeout, verbose=verbose,
buffer_overflow_handler=buffer_overflow_handler,
msgpack_kwargs=msgpack_kwargs,
nanosecond_precision=nanosecond_precision)
logging.Handler.__init__(self)

def getSenderClass(self):
    """Return the class used to build this handler's sender.

    Subclasses can override this to supply a different sender class.
    """
    return sender.FluentSender

def getSenderInstance(self, tag, host, port, timeout, verbose,
                      buffer_overflow_handler, msgpack_kwargs,
                      nanosecond_precision):
    """Build a sender from the class returned by ``getSenderClass()``.

    ``tag`` is passed positionally; every other argument is forwarded as a
    keyword argument of the same name. Returns the newly created sender.
    """
    sender_options = {
        'host': host,
        'port': port,
        'timeout': timeout,
        'verbose': verbose,
        'buffer_overflow_handler': buffer_overflow_handler,
        'msgpack_kwargs': msgpack_kwargs,
        'nanosecond_precision': nanosecond_precision,
    }
    return self.getSenderClass()(tag, **sender_options)

def emit(self, record):
data = self.format(record)
return self.sender.emit(None, data)
Expand Down
Loading
0