1
- import os
2
- import sys , urllib
3
1
import msgpack
4
2
import socket
5
3
import threading
6
- import json
7
4
import time
8
5
9
- global_sender = None
6
+ _global_sender = None
10
7
11
8
def setup (tag , ** kwargs ):
12
- host = ('host' in kwargs ) and kwargs [ 'host' ] or ' localhost'
13
- port = ('port' in kwargs ) and kwargs [ 'port' ] or 24224
9
+ host = kwargs . get ('host' , ' localhost')
10
+ port = kwargs . get ('port' , 24224 )
14
11
15
- global global_sender
16
- global_sender = FluentSender (tag , host = host , port = port )
12
+ global _global_sender
13
+ _global_sender = FluentSender (tag , host = host , port = port )
17
14
18
15
def get_global_sender ():
19
- global global_sender
20
- return global_sender
16
+ return _global_sender
21
17
22
18
class FluentSender (object ):
23
19
def __init__ (self ,
24
- tag ,
25
- host = '127.0.0.1 ' ,
26
- port = 24224 ,
27
- bufmax = 1 * 1024 * 1024 ,
28
- timeout = 3.0 ,
29
- verbose = False ):
20
+ tag ,
21
+ host = 'localhost ' ,
22
+ port = 24224 ,
23
+ bufmax = 1 * 1024 * 1024 ,
24
+ timeout = 3.0 ,
25
+ verbose = False ):
30
26
31
27
self .tag = tag
32
28
self .host = host
@@ -64,9 +60,9 @@ def _close(self):
64
60
self .socket = None
65
61
66
62
def _make_packet (self , label , data ):
67
- tag = "%s.%s" % (self .tag , label )
68
- cur_time = int (time .mktime ( time . localtime () ))
69
- packet = [ tag , cur_time , data ]
63
+ tag = '.' . join (self .tag , label )
64
+ cur_time = int (time .time ( ))
65
+ packet = ( tag , cur_time , data )
70
66
if self .verbose :
71
67
print packet
72
68
return self .packer .pack (packet )
@@ -89,17 +85,11 @@ def _send_internal(self, bytes):
89
85
self ._reconnect ()
90
86
91
87
# send message
92
- total = len (bytes )
93
- nsent = 0
94
- while nsent < total :
95
- n = self .socket .send (bytes [nsent :])
96
- if n <= 0 :
97
- raise RuntimeError ("socket connection broken" )
98
- nsent += n
88
+ self .socket .sendall (bytes )
99
89
100
90
# send finished
101
91
self .pendings = None
102
- except :
92
+ except Exception :
103
93
# close socket
104
94
self ._close ()
105
95
# clear buffer if it exceeds max bufer size
0 commit comments