8000 Merge branch 'master' into master · yyc/fluent-logger-python@90ba39b · GitHub
[go: up one dir, main page]

Skip to content

Commit 90ba39b

Browse files
authored
Merge branch 'master' into master
2 parents 3058514 + fffa39a commit 90ba39b

File tree

3 files changed

+69
-1
lines changed

3 files changed

+69
-1
lines changed

README.rst

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,16 @@ fluentd, with tag 'app.follow' and the attributes 'from' and 'to'.
9090
cur_time = int(time.time())
9191
logger.emit_with_time('follow', cur_time, {'from': 'userA', 'to':'userB'})
9292
93+
To send events with nanosecond-precision timestamps (Fluent 0.14 and up),
94+
specify `nanosecond_precision` on `FluentSender`.
95+
96+
.. code:: python
97+
98+
# Use nanosecond
99+
logger = sender.FluentSender('app', nanosecond_precision=True)
100+
logger.emit('follow', {'from': 'userA', 'to': 'userB'})
101+
logger.emit_with_time('follow', time.time(), {'from': 'userA', 'to': 'userB'})
102+
93103
You can detect an error via return value of `emit`. If an error happens in `emit`, `emit` returns `False` and get an error object using `last_error` method.
94104

95105
.. code:: python

fluent/sender.py

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# -*- coding: utf-8 -*-
22

33
from __future__ import print_function
4+
import struct
45
import socket
56
import threading
67
import time
@@ -30,6 +31,18 @@ def get_global_sender():
3031
def close():
3132
get_global_sender().close()
3233

34+
35+
class EventTime(msgpack.ExtType):
36+
def __new__(cls, timestamp):
37+
seconds = int(timestamp)
38+
nanoseconds = int(timestamp % 1 * 10 ** 9)
39+
return super(EventTime, cls).__new__(
40+
cls,
41+
code=0,
42+
data=struct.pack(">II", seconds, nanoseconds),
43+
)
44+
45+
3346
class FluentSender(object):
3447
def __init__(self,
3548
tag,
@@ -39,6 +52,7 @@ def __init__(self,
3952
timeout=3.0,
4053
verbose=False,
4154
buffer_overflow_handler=None,
55+
nanosecond_precision=False,
4256
**kwargs):
4357

4458
self.tag = tag
@@ -48,6 +62,7 @@ def __init__(self,
4862
self.timeout = timeout
4963
self.verbose = verbose
5064
self.buffer_overflow_handler = buffer_overflow_handler
65+
self.nanosecond_precision = nanosecond_precision
5166

5267
self.socket = None
5368
self.pendings = None
@@ -61,10 +76,15 @@ def __init__(self,
6176
self._close()
6277

6378
def emit(self, label, data):
64-
cur_time = int(time.time())
79+
if self.nanosecond_precision:
80+
cur_time = EventTime(time.time())
81+
else:
82+
cur_time = int(time.time())
6583
return self.emit_with_time(label, cur_time, data)
6684

6785
def emit_with_time(self, label, timestamp, data):
86+
if self.nanosecond_precision and isinstance(timestamp, float):
87+
timestamp = EventTime(timestamp)
6888
try:
6989
bytes_ = self._make_packet(label, timestamp, data)
7090
except Exception as e:

tests/test_sender.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from __future__ import print_function
44
import unittest
55
import socket
6+
import msgpack
67

78
import fluent.sender
89
from tests import mockserver
@@ -76,6 +77,36 @@ def test_decorator_simple(self):
7677
self.assertTrue(data[0][1])
7778
self.assertTrue(isinstance(data[0][1], int))
7879

80+
def test_nanosecond(self):
81+
sender = self._sender
82+
sender.nanosecond_precision = True
83+
sender.emit('foo', {'bar': 'baz'})
84+
sender._close()
85+
data = self.get_data()
86+
eq = self.assertEqual
87+
eq(1, len(data))
88+
eq(3, len(data[0]))
89+
eq('test.foo', data[0][0])
90+
eq({'bar': 'baz'}, data[0][2])
91+
self.assertTrue(isinstance(data[0][1], msgpack.ExtType))
92+
eq(data[0][1].code, 0)
93+
94+
def test_nanosecond_coerce_float(self):
95+
time = 1490061367.8616468906402588
96+
sender = self._sender
97+
sender.nanosecond_precision = True
98+
sender.emit_with_time('foo', time, {'bar': 'baz'})
99+
sender._close()
100+
data = self.get_data()
101+
eq = self.assertEqual
102+
eq(1, len(data))
103+
eq(3, len(data[0]))
104+
eq('test.foo', data[0][0])
105+
eq({'bar': 'baz'}, data[0][2])
106+
self.assertTrue(isinstance(data[0][1], msgpack.ExtType))
107+
eq(data[0][1].code, 0)
108+
eq(data[0][1].data, b'X\xd0\x8873[\xb0*')
109+
79110
def test_no_last_error_on_successful_emit(self):
80111
sender = self._sender
81112
sender.emit('foo', {'bar': 'baz'})
@@ -105,3 +136,10 @@ def test_connect_exception_during_sender_init(self, mock_socket):
105136
mock_connect.side_effect = socket.error(EXCEPTION_MSG)
106137

107138
self.assertEqual(self._sender.last_error.args[0], EXCEPTION_MSG)
139+
140+
141+
class TestEventTime(unittest.TestCase):
142+
def test_event_time(self):
143+
time = fluent.sender.EventTime(1490061367.8616468906402588)
144+
self.assertEqual(time.code, 0)
145+
self.assertEqual(time.data, b'X\xd0\x8873[\xb0*')

0 commit comments

Comments
 (0)
0