8000 Merge pull request #10 from EvaSDK/master · socrateslee/fluent-logger-python@fc10f9f · GitHub
[go: up one dir, main page]

Skip to content

Commit fc10f9f

Browse files
committed
Merge pull request fluent#10 from EvaSDK/master
Misc fixes and cosmetics
2 parents 9f5d664 + 66ea812 commit fc10f9f

File tree

8 files changed

+116
-88
lines changed

8 files changed

+116
-88
lines changed

fluent/event.py

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
1-
from fluent import sender
1+
# -*- coding: utf-8 -*-
2+
23
import time
34

5+
from fluent import sender
6+
7+
48
class Event(object):
59
def __init__(self, label, data, **kwargs):
6-
if not isinstance(data, dict) :
7-
raise Exception("data must be dict")
8-
s = kwargs['sender'] if ('sender' in kwargs) else sender.get_global_sender()
9-
timestamp = kwargs['time'] if ('time' in kwargs) else int(time.time())
10-
s.emit_with_time(label, timestamp, data)
10+
assert isinstance(data, dict), 'data must be a dict'
11+
sender_ = kwargs.get('sender', sender.get_global_sender())
12+
timestamp = kwargs.get('time', int(time.time()))
13+
sender_.emit_with_time(label, timestamp, data)

fluent/handler.py

Lines changed: 35 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,33 +1,31 @@
1+
# -*- coding: utf-8 -*-
2+
13
import logging
2-
import os
3-
import sys
4-
import msgpack
54
import socket
6-
import threading
75

86
try:
9-
import json
10-
except ImportError:
117
import simplejson as json
8+
except ImportError:
9+
import json
1210

1311
from fluent import sender
1412

13+
1514
class FluentRecordFormatter(object):
1615
def __init__(self):
1716
self.hostname = socket.gethostname()
1817

1918
def format(self, record):
20-
data = {
21-
'sys_host' : self.hostname,
22-
'sys_name' : record.name,
23-
'sys_module' : record.module,
24-
# 'sys_lineno' : record.lineno,
25-
# 'sys_levelno' : record.levelno,
26-
# 'sys_levelname' : record.levelname,
27-
# 'sys_filename' : record.filename,
28-
# 'sys_funcname' : record.funcName,
29-
# 'sys_exc_info' : record.exc_info,
30-
}
19+
data = {'sys_host': self.hostname,
20+
'sys_name': record.name,
21+
'sys_module': record.module,
22+
# 'sys_lineno': record.lineno,
23+
# 'sys_levelno': record.levelno,
24+
# 'sys_levelname': record.levelname,
25+
# 'sys_filename': record.filename,
26+
# 'sys_funcname': record.funcName,
27+
# 'sys_exc_info': record.exc_info,
28+
}
3129
# if 'sys_exc_info' in data and data['sys_exc_info']:
3230
# data['sys_exc_info'] = self.formatException(data['sys_exc_info'])
3331

@@ -40,36 +38,41 @@ def _structuring(self, data, msg):
4038
elif isinstance(msg, str):
4139
try:
4240
self._add_dic(data, json.loads(str(msg)))
43-
except:
41+
except (ValueError, json.JSONDecodeError):
4442
pass
4543

46-
def _add_dic(self, data, dic):
47-
for k, v in dic.items():
48-
if isinstance(k, str) or isinstance(k, unicode):
49-
data[str(k)] = v
44+
@staticmethod
45+
def _add_dic(data, dic):
46+
for key, value in dic.items():
47+
if isinstance(key, basestring):
48+
data[str(key)] = value
49+
5050

5151
class FluentHandler(logging.Handler):
5252
'''
5353
Logging Handler for fluent.
5454
'''
5555
def __init__(self,
56-
tag,
57-
host='localhost',
58-
port=24224,
59-
timeout=3.0,
60-
verbose=False):
56+
tag,
57+
host='localhost',
58+
port=24224,
59+
timeout=3.0,
60+
verbose=False):
6161

6262
self.tag = tag
6363
self.sender = sender.FluentSender(tag,
6464
host=host, port=port,
6565
timeout=timeout, verbose=verbose)
66-
self.fmt = FluentRecordFormatter()
6766
logging.Handler.__init__(self)
6867

6968
def emit(self, record):
70-
if record.levelno < self.level: return
71-
data = self.fmt.format(record)
69+
data = self.format(record)
7270
self.sender.emit(None, data)
7371

74-
def _close(self):
75-
self.sender._close()
72+
def close(self):
73+
self.acquire()
74+
try:
75+
self.sender._close()
76+
logging.Handler.close(self)
77+
finally:
78+
self.release()

fluent/sender.py

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,28 @@
1+
# -*- coding: utf-8 -*-
2+
13
from __future__ import print_function
2-
import msgpack
34
import socket
45
import threading
56
import time
67

8+
import msgpack
9+
10+
711
_global_sender = None
812

13+
914
def setup(tag, **kwargs):
1015
host = kwargs.get('host', 'localhost')
1116
port = kwargs.get('port', 24224)
1217

1318
global _global_sender
1419
_global_sender = FluentSender(tag, host=host, port=port)
1520

21+
1622
def get_global_sender():
1723
return _global_sender
1824

25+
1926
class FluentSender(object):
2027
def __init__(self,
2128
tag,
@@ -36,10 +43,10 @@ def __init__(self,
3643
self.pendings = None
3744
self.packer = msgpack.Packer()
3845
self.lock = threading.Lock()
39-
46+
4047
try:
4148
self._reconnect()
42-
except:
49+
except Exception:
4350
# will be retried in emit()
4451
self._close()
4552

@@ -48,8 +55,8 @@ def emit(self, label, data):
4855
self.emit_with_time(label, cur_time, data)
4956

5057
def emit_with_time(self, label, timestamp, data):
51-
bytes = self._make_packet(label, timestamp, data)
52-
self._send(bytes)
58+
bytes_ = self._make_packet(label, timestamp, data)
59+
self._send(bytes_)
5360

5461
def _make_packet(self, label, timestamp, data):
5562
if label:
@@ -61,25 +68,25 @@ def _make_packet(self, label, timestamp, data):
6168
print(packet)
6269
return self.packer.pack(packet)
6370

64-
def _send(self, bytes):
71+
def _send(self, bytes_):
6572
self.lock.acquire()
6673
try:
67-
self._send_internal(bytes)
74+
self._send_internal(bytes_)
6875
finally:
6976
self.lock.release()
7077

71-
def _send_internal(self, bytes):
78+
def _send_internal(self, bytes_):
7279
# buffering
7380
if self.pendings:
74-
self.pendings += bytes
75-
bytes = self.pendings
81+
self.pendings += bytes_
82+
bytes_ = self.pendings
7683

7784
try:
7885
# reconnect if possible
7986
self._reconnect()
8087

8188
# send message
82-
self.socket.sendall(bytes)
89+
self.socket.sendall(bytes_)
8390

8491
# send finished
8592
self.pendings = None
@@ -91,7 +98,7 @@ def _send_internal(self, bytes):
9198
# TODO: add callback handler here
9299
self.pendings = None
93100
else:
94-
self.pendings = bytes
101+
self.pendings = bytes_
95102

96103
def _reconnect(self):
97104
if not self.socket:

tests/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
# -*- coding: utf-8 -*-
2+
13
from tests.test_event import *
24
from tests.test_sender import *
35
from tests.test_handler import *

tests/mockserver.py

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,17 @@
1-
import socket
2-
import threading
3-
import time
4-
from msgpack import Unpacker
1+
# -*- coding: utf-8 -*-
52

63
try:
74
from cStringIO import StringIO as BytesIO
85
except ImportError:
96
from io import BytesIO
107

8+
import socket
9+
import threading
10+
import time
11+
12+
from msgpack import Unpacker
13+
14+
1115
class MockRecvServer(threading.Thread):
1216
"""
1317
Single threaded server accepts one connection and recv until EOF.
@@ -21,16 +25,16 @@ def __init__(self, port):
2125
self.start()
2226

2327
def run(self):
24-
s = self._sock
25-
s.listen(1)
26-
con, _ = s.accept()
28+
sock = self._sock
29+
sock.listen(1)
30+
con, _ = sock.accept()
2731
while True:
28-
d = con.recv(4096)
29-
if not d:
32+
data = con.recv(4096)
33+
if not data:
3034
break
31-
self._buf.write(d)
35+
self._buf.write(data)
3236
con.close()
33-
s.close()
37+
sock.close()
3438
self._sock = None
3539

3640
def wait(self):
@@ -40,5 +44,6 @@ def wait(self):
4044
def get_recieved(self):
4145
self.wait()
4246
self._buf.seek(0)
43-
# TODO: have to process string encoding properly. currently we assume that all encoding is utf-8.
47+
# TODO: have to process string encoding properly. currently we assume
48+
# that all encoding is utf-8.
4449
return list(Unpacker(self._buf, encoding='utf-8'))

tests/test_event.py

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,23 @@
1+
# -*- coding: utf-8 -*-
2+
13
import unittest
2-
import time
34

4-
import fluent
55
from fluent import event, sender
66

7+
78
sender.setup(server='localhost', tag='app')
89

10+
911
class TestHandler(unittest.TestCase):
1012
def testLogging(self):
1113
# send event with tag app.follow
1214
event.Event('follow', {
13-
'from': 'userA',
14-
'to': 'userB'
15+
'from': 'userA',
16+
'to': 'userB'
1517
})
1618

1719
# send event with tag app.follow, with timestamp
1820
event.Event('follow', {
19-
'from': 'userA',
20-
'to': 'userB'
21+
'from': 'userA',
22+
'to': 'userB'
2123
}, time=int(0))
22-

10000 tests/test_handler.py

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
1-
import unittest
2-
from tests import mockserver
1+
# -*- coding: utf-8 -*-
2+
33
import logging
4+
import unittest
5+
46
import fluent.handler
5-
import msgpack
7+
8+
from tests import mockserver
9+
610

711
class TestLogger(unittest.TestCase):
812
def setUp(self):
@@ -12,23 +16,24 @@ def setUp(self):
1216
self._server = mockserver.MockRecvServer(port)
1317
self._port = port
1418
break
15-
except IOError as e:
19+
except IOError:
1620
pass
1721

1822
def get_data(self):
1923
return self._server.get_recieved()
2024

2125
def test_simple(self):
22-
h = fluent.handler.FluentHandler('app.follow', port=self._port)
26+
handler = fluent.handler.FluentHandler('app.follow', port=self._port)
2327

2428
logging.basicConfig(level=logging.INFO)
25-
l = logging.getLogger('fluent.test')
26-
l.addHandler(h)
27-
l.info({
29+
log = logging.getLogger('fluent.test')
30+
handler.setFormatter(fluent.handler.FluentRecordFormatter())
31+
log.addHandler(handler)
32+
log.info({
2833
'from': 'userA',
2934
'to': 'userB'
3035
})
31-
h._close()
36+
handler.close()
3237

3338
data = self.get_data()
3439
eq = self.assertEqual

0 commit comments

Comments
 (0)
0