12
12
# See the License for the specific language governing permissions and
13
13
# limitations under the License.
14
14
15
- """SSEClient module to stream realtime updates in the Firebase Database."""
15
+ """SSEClient module to stream realtime updates from the Firebase Database.
16
+
17
+ Based on a similar implementation from Pyrebase.
18
+ """
16
19
17
20
import re
18
21
import time
@@ -37,6 +40,34 @@ def rebuild_auth(self, prepared_request, response):
37
40
pass
38
41
39
42
43
+ class _EventBuffer (object ):
44
+ """A helper class for buffering and parsing raw SSE data."""
45
+
46
+ def __init__ (self ):
47
+ self ._buffer = []
48
+ self ._tail = ''
49
+
50
+ def append (self , char ):
51
+ self ._buffer .append (char )
52
+ self ._tail += char
53
+ self ._tail = self ._tail [- 4 :]
54
+
55
+ def truncate (self ):
56
+ head , sep , _ = self .buffer_string .rpartition ('\n ' )
57
+ rem = head + sep
58
+ self ._buffer = list (rem )
59
+ self ._tail = rem [- 4 :]
60
+
61
+ @property
62
+ def is_end_of_field (self ):
63
+ last_two_chars = self ._tail [- 2 :]
64
+ return last_two_chars == '\n \n ' or last_two_chars == '\r \r ' or self ._tail == '\r \n \r \n '
65
+
66
+ @property
67
+ def buffer_string (self ):
68
+ return '' .join (self ._buffer )
69
+
70
+
40
71
class SSEClient (object ):
41
72
"""SSE client implementation."""
42
73
@@ -58,7 +89,7 @@ def __init__(self, url, session, retry=3000, **kwargs):
58
89
self .buf = u'' # Keep data here as it streams in
59
90
60
91
headers = self .requests_kwargs .get ('headers' , {})
61
- # The SSE spec requires making requests with Cache-Control: nocache
92
+ # The SSE spec requires making requests with Cache-Control: no-cache
62
93
headers ['Cache-Control' ] = 'no-cache'
63
94
# The 'Accept' header is not required, but explicit > implicit
64
95
headers ['Accept' ] = 'text/event-stream'
@@ -82,32 +113,28 @@ def _connect(self):
82
113
else :
83
114
raise StopIteration ()
84
115
85
- def _event_complete (self ):
86
- """Checks if the event is completed by matching regular expression."""
87
- return re .search (end_of_field , self .buf ) is not None
88
-
89
116
def __iter__ (self ):
90
117
return self
91
118
92
119
def __next__ (self ):
93
- while not self ._event_complete ():
94
- try :
95
- nextchar = next (self .resp_iterator )
96
- self .buf += nextchar
97
- except (StopIteration , requests .RequestException ):
98
- time .sleep (self .retry / 1000.0 )
99
- self ._connect ()
100
- # The SSE spec only supports resuming from a whole message, so
101
- # if we have half a message we should throw it out.
102
- head , sep , tail = self .buf .rpartition ('\n ' )
103
- self .buf = head + sep
104
- continue
120
+ if not re .search (end_of_field , self .buf ):
121
+ temp_buffer = _EventBuffer ()
122
+ while not temp_buffer .is_end_of_field :
123
+ try :
124
+ nextchar = next (self .resp_iterator )
125
+ temp_buffer .append (nextchar )
126
+ except (StopIteration , requests .RequestException ):
127
+ time .sleep (self .retry / 1000.0 )
128
+ self ._connect ()
129
+ # The SSE spec only supports resuming from a whole message, so
130
+ # if we have half a message we should throw it out.
131
+ temp_buffer .truncate ()
132
+ continue
133
+ self .buf = temp_buffer .buffer_string
105
134
106
135
split = re .split (end_of_field , self .buf )
107
136
head = split [0 ]
108
- tail = '' .join (split [1 :])
109
-
110
- self .buf = tail
137
+ self .buf = '\n \n ' .join (split [1 :])
111
138
event = Event .parse (head )
112
139
113
140
if event .data == 'credential is no longer valid' :
@@ -150,7 +177,7 @@ def parse(cls, raw):
150
177
raw: the raw data to parse.
151
178
152
179
Returns:
153
- Event: newly intialized ``Event`` object with the parameters initialized.
180
+ Event: A new ``Event`` with the parameters initialized.
154
181
"""
155
182
event = cls ()
156
183
for line in raw .split ('\n ' ):
0 commit comments