1
+ # Copyright 2017 Google Inc.
2
+ #
1
3
# Licensed under the Apache License, Version 2.0 (the "License");
2
4
# you may not use this file except in compliance with the License.
3
5
# You may obtain a copy of the License at
10
12
# See the License for the specific language governing permissions and
11
13
# limitations under the License.
12
14
13
- """SSEClient module to handle streaming of realtime changes on the database
14
- to the firebase-admin-sdk
15
- """
15
+ """SSEClient module to stream realtime updates in the Firebase Database."""
16
16
17
17
import re
18
18
import time
19
19
import warnings
20
- import six
20
+
21
+ from google .auth import transport
21
22
import requests
22
23
23
24
26
27
end_of_field = re .compile (r'\r\n\r\n|\r\r|\n\n' )
27
28
28
29
29
- class KeepAuthSession (requests .Session ):
30
- """A session that does not drop Authentication on redirects between domains"""
30
+ class KeepAuthSession (transport .requests .AuthorizedSession ):
31
+ """A session that does not drop authentication on redirects between domains."""
32
+
33
+ def __init__ (self , credential ):
34
+ super (KeepAuthSession , self ).__init__ (credential )
35
+
31
36
def rebuild_auth (self , prepared_request , response ):
32
37
pass
33
38
34
39
35
40
class SSEClient (object ):
36
- """SSE Client Class"""
41
+ """SSE client implementation."""
42
+
43
+ def __init__ (self , url , session , retry = 3000 , ** kwargs ):
44
+ """Initializes the SSEClient.
37
45
38
- def __init__ (self , url , session , last_id = None , retry = 3000 , ** kwargs ):
39
- """Initialize the SSEClient
40
46
Args:
41
- url: the url to connect to
42
- session: the requests.session()
43
- last_id: optional id
44
- retry: the interval in ms
45
- **kwargs: extra kwargs will be sent to requests.get
47
+ url: The remote url to connect to.
48
+ session: The requests session.
49
+ retry: The retry interval in milliseconds (optional).
50
+ **kwargs: Extra kwargs that will be sent to ``requests.get()`` (optional).
46
51
"""
47
- self .should_connect = True
48
52
self .url = url
49
- self .last_id = last_id
50
- self .retry = retry
51
53
self .session = session
54
+ self .retry = retry
52
55
self .requests_kwargs = kwargs
56
+ self .should_connect = True
57
+ self .last_id = None
58
+ self .buf = u'' # Keep data here as it streams in
53
59
54
60
headers = self .requests_kwargs .get ('headers' , {})
55
61
# The SSE spec requires making requests with Cache-Control: nocache
56
62
headers ['Cache-Control' ] = 'no-cache'
57
63
# The 'Accept' header is not required, but explicit > implicit
58
64
headers ['Accept' ] = 'text/event-stream'
59
-
60
65
self .requests_kwargs ['headers' ] = headers
61
-
62
- # Keep data here as it streams in
63
- self .buf = u''
64
-
65
66
self ._connect ()
66
67
67
68
def close (self ):
68
- """Close the SSE Client instance"""
57AE
69
- # TODO: check if AttributeError is needed to catch here
69
+ """Closes the SSEClient instance."""
70
70
self .should_connect = False
71
71
self .retry = 0
72
72
self .resp .close ()
73
- # self.resp.raw._fp.fp.raw._sock.shutdown(socket.SHUT_RDWR)
74
- # self.resp.raw._fp.fp.raw._sock.close()
75
-
76
73
77
74
def _connect (self ):
78
- """connects to the server using requests"""
75
+ """Connects to the server using requests. """
79
76
if self .should_connect :
80
- success = False
81
- while not success :
82
- if self .last_id :
83
- self .requests_kwargs ['headers' ]['Last-Event-ID' ] = self .last_id
84
- # Use session if set. Otherwise fall back to requests module.
85
- self .requester = self .session or requests
86
- self .resp = self .requester .get (self .url , stream = True , ** self .requests_kwargs )
87
-
88
- self .resp_iterator = self .resp .iter_content (decode_unicode = True )
89
-
90
- # TODO: Ensure we're handling redirects. Might also stick the 'origin'
91
- # attribute on Events like the Javascript spec requires.
92
- self .resp .raise_for_status ()
93
- success = True
77
+ if self .last_id :
78
+ self .requests_kwargs ['headers' ]['Last-Event-ID' ] = self .last_id
79
+ self .resp = self .session .get (self .url , stream = True , ** self .requests_kwargs )
80
+ self .resp_iterator = self .resp .iter_content (decode_unicode = True )
81
+ self .resp .raise_for_status ()
94
82
else :
95
83
raise StopIteration ()
96
84
97
85
def _event_complete (self ):
98
- """Checks if the event is completed by matching regular expression
99
-
100
- Returns:
101
- boolean: True if the regex matched meaning end of event, else False
102
- """
86
+ """Checks if the event is completed by matching regular expression."""
103
87
return re .search (end_of_field , self .buf ) is not None
104
88
105
89
def __iter__ (self ):
@@ -113,8 +97,6 @@ def __next__(self):
113
97
F438
except (StopIteration , requests .RequestException ):
114
98
time .sleep (self .retry / 1000.0 )
115
99
self ._connect ()
116
-
117
-
118
100
# The SSE spec only supports resuming from a whole message, so
119
101
# if we have half a message we should throw it out.
120
102
head , sep , tail = self .buf .rpartition ('\n ' )
@@ -123,56 +105,54 @@ def __next__(self):
123
105
124
106
split = re .split (end_of_field , self .buf )
125
107
head = split [0 ]
126
- tail = "" .join (split [1 :])
108
+ tail = '' .join (split [1 :])
127
109
128
110
self .buf = tail
129
- msg = Event .parse (head )
111
+ event = Event .parse (head )
130
112
131
- if msg .data == " credential is no longer valid" :
113
+ if event .data == ' credential is no longer valid' :
132
114
self ._connect ()
133
115
return None
134
-
135
- if msg .data == 'null' :
116
+ elif event .data == 'null' :
136
117
return None
137
118
138
119
# If the server requests a specific retry delay, we need to honor it.
139
- if msg .retry :
140
- self .retry = msg .retry
120
+ if event .retry :
121
+ self .retry = event .retry
141
122
142
123
# last_id should only be set if included in the message. It's not
143
124
# forgotten if a message omits it.
144
- if msg .event_id :
145
- self .last_id = msg .event_id
146
-
147
- return msg
125
+ if event .event_id :
126
+ self .last_id = event .event_id
127
+ return event
148
128
149
- if six . PY2 :
150
- next = __next__
129
+ def next ( self ) :
130
+ return self . __next__ ()
151
131
152
132
153
133
class Event (object ):
154
- """Event class to handle the events fired by SSE"""
134
+ """Event represents the events fired by SSE. """
155
135
156
136
sse_line_pattern = re .compile ('(?P<name>[^:]*):?( ?(?P<value>.*))?' )
157
137
158
- def __init__ (self , data = '' , event = 'message' , event_id = None , retry = None ):
138
+ def __init__ (self , data = '' , event_type = 'message' , event_id = None , retry = None ):
159
139
self .data = data
160
- self .event = event
140
+ self .event_type = event_type
161
141
self .event_id = event_id
162
142
self .retry = retry
163
143
164
144
@classmethod
165
145
def parse (cls , raw ):
166
- """Given a possibly-multiline string representing an SSE message, parse it
167
- and return a Event object.
146
+ """Given a possibly-multiline string representing an SSE message, parses it
147
+ and returns an Event object.
168
148
169
149
Args:
170
- raw: the raw data to parse
150
+ raw: the raw data to parse.
171
151
172
152
Returns:
173
- Event: newly intialized Event() object with the parameters initialized
153
+ Event: newly intialized `` Event`` object with the parameters initialized.
174
154
"""
175
- msg = cls ()
155
+ event = cls ()
176
156
for line in raw .split ('\n ' ):
177
157
match = cls .sse_line_pattern .match (line )
178
158
if match is None :
@@ -185,22 +165,17 @@ def parse(cls, raw):
185
165
if name == '' :
186
166
# line began with a ":", so is a comment. Ignore
187
167
continue
188
-
189
- if name == 'data' :
168
+ elif name == 'data' :
190
169
# If we already have some data, then join to it with a newline.
191
170
# Else this is it.
192
- if msg .data :
193
- msg .data = '%s\n %s' % (msg .data , value )
171
+ if event .data :
172
+ event .data = '%s\n %s' % (event .data , value )
194
173
else :
195
- msg .data = value
174
+ event .data = value
196
175
elif name == 'event' :
197
- msg . event = value
176
+ event . event_type = value
198
177
elif name == 'id' :
199
- msg .event_id = value
178
+ event .event_id = value
200
179
elif name == 'retry' :
201
- msg .retry = int (value )
202
-
203
- return msg
204
-
205
- def __str__ (self ):
206
- return self .data
180
+ event .retry = int (value )
181
+ return event
0 commit comments