8000 Fixed and refactored the db.Reference.listen() API (#190) · rinlevan/firebase-admin-python@62fb76d · GitHub
[go: up one dir, main page]

Skip to content

Commit 62fb76d

Browse files
authored
Fixed and refactored the db.Reference.listen() API (firebase#190)
* Fixing a credentials issue in listener * Added more tests * Updated changelog * Further clean up and improvement * Improved error handling in listen() method
1 parent 1e61f2e commit 62fb76d

File tree

5 files changed

+309
-183
lines changed

5 files changed

+309
-183
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
# Unreleased
22

3+
- [added] The `db.Reference` type now provides a `listen()` API for
4+
receiving realtime update events from the Firebase Database.
35
- [added] The `db.reference()` method now optionally takes a `url`
46
parameter. This can be used to access multiple Firebase Databases
57
in the same project more easily.

firebase_admin/_sseclient.py

Lines changed: 58 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
# Copyright 2017 Google Inc.
2+
#
13
# Licensed under the Apache License, Version 2.0 (the "License");
24
# you may not use this file except in compliance with the License.
35
# You may obtain a copy of the License at
@@ -10,14 +12,13 @@
1012
# See the License for the specific language governing permissions and
1113
# limitations under the License.
1214

13-
"""SSEClient module to handle streaming of realtime changes on the database
14-
to the firebase-admin-sdk
15-
"""
15+
"""SSEClient module to stream realtime updates in the Firebase Database."""
1616

1717
import re
1818
import time
1919
import warnings
20-
import six
20+
21+
from google.auth import transport
2122
import requests
2223

2324

@@ -26,80 +27,63 @@
2627
end_of_field = re.compile(r'\r\n\r\n|\r\r|\n\n')
2728

2829

29-
class KeepAuthSession(requests.Session):
30-
"""A session that does not drop Authentication on redirects between domains"""
30+
class KeepAuthSession(transport.requests.AuthorizedSession):
31+
"""A session that does not drop authentication on redirects between domains."""
32+
33+
def __init__(self, credential):
34+
super(KeepAuthSession, self).__init__(credential)
35+
3136
def rebuild_auth(self, prepared_request, response):
3237
pass
3338

3439

3540
class SSEClient(object):
36-
"""SSE Client Class"""
41+
"""SSE client implementation."""
42+
43+
def __init__(self, url, session, retry=3000, **kwargs):
44+
"""Initializes the SSEClient.
3745
38-
def __init__(self, url, session, last_id=None, retry=3000, **kwargs):
39-
"""Initialize the SSEClient
4046
Args:
41-
url: the url to connect to
42-
session: the requests.session()
43-
last_id: optional id
44-
retry: the interval in ms
45-
**kwargs: extra kwargs will be sent to requests.get
47+
url: The remote url to connect to.
48+
session: The requests session.
49+
retry: The retry interval in milliseconds (optional).
50+
**kwargs: Extra kwargs that will be sent to ``requests.get()`` (optional).
4651
"""
47-
self.should_connect = True
4852
self.url = url
49-
self.last_id = last_id
50-
self.retry = retry
5153
self.session = session
54+
self.retry = retry
5255
self.requests_kwargs = kwargs
56+
self.should_connect = True
57+
self.last_id = None
58+
self.buf = u'' # Keep data here as it streams in
5359

5460
headers = self.requests_kwargs.get('headers', {})
5561
# The SSE spec requires making requests with Cache-Control: nocache
5662
headers['Cache-Control'] = 'no-cache'
5763
# The 'Accept' header is not required, but explicit > implicit
5864
headers['Accept'] = 'text/event-stream'
59-
6065
self.requests_kwargs['headers'] = headers
61-
62-
# Keep data here as it streams in
63-
self.buf = u''
64-
6566
self._connect()
6667

6768
def close(self):
68-
"""Close the SSE Client instance"""
69-
# TODO: check if AttributeError is needed to catch here
69+
"""Closes the SSEClient instance."""
7070
self.should_connect = False
7171
self.retry = 0
7272
self.resp.close()
73-
# self.resp.raw._fp.fp.raw._sock.shutdown(socket.SHUT_RDWR)
74-
# self.resp.raw._fp.fp.raw._sock.close()
75-
7673

7774
def _connect(self):
78-
"""connects to the server using requests"""
75+
"""Connects to the server using requests."""
7976
if self.should_connect:
80-
success = False
81-
while not success:
82-
if self.last_id:
83-
self.requests_kwargs['headers']['Last-Event-ID'] = self.last_id
84-
# Use session if set. Otherwise fall back to requests module.
85-
self.requester = self.session or requests
86-
self.resp = self.requester.get(self.url, stream=True, **self.requests_kwargs)
87-
88-
self.resp_iterator = self.resp.iter_content(decode_unicode=True)
89-
90-
# TODO: Ensure we're handling redirects. Might also stick the 'origin'
91-
# attribute on Events like the Javascript spec requires.
92-
self.resp.raise_for_status()
93-
success = True
77+
if self.last_id:
78+
self.requests_kwargs['headers']['Last-Event-ID'] = self.last_id
79+
self.resp = self.session.get(self.url, stream=True, **self.requests_kwargs)
80+
self.resp_iterator = self.resp.iter_content(decode_unicode=True)
81+
self.resp.raise_for_status()
9482
else:
9583
raise StopIteration()
9684

9785
def _event_complete(self):
98-
"""Checks if the event is completed by matching regular expression
99-
100-
Returns:
101-
boolean: True if the regex matched meaning end of event, else False
102-
"""
86+
"""Checks if the event is completed by matching regular expression."""
10387
return re.search(end_of_field, self.buf) is not None
10488

10589
def __iter__(self):
@@ -113,8 +97,6 @@ def __next__(self):
11397
except (StopIteration, requests.RequestException):
11498
time.sleep(self.retry / 1000.0)
11599
self._connect()
116-
117-
118100
# The SSE spec only supports resuming from a whole message, so
119101
# if we have half a message we should throw it out.
120102
head, sep, tail = self.buf.rpartition('\n')
@@ -123,56 +105,54 @@ def __next__(self):
123105

124106
split = re.split(end_of_field, self.buf)
125107
head = split[0]
126-
tail = "".join(split[1:])
108+
tail = ''.join(split[1:])
127109

128110
self.buf = tail
129-
msg = Event.parse(head)
111+
event = Event.parse(head)
130112

131-
if msg.data == "credential is no longer valid":
113+
if event.data == 'credential is no longer valid':
132114
self._connect()
133115
return None
134-
135-
if msg.data == 'null':
116+
elif event.data == 'null':
136117
return None
137118

138119
# If the server requests a specific retry delay, we need to honor it.
139-
if msg.retry:
140-
self.retry = msg.retry
120+
if event.retry:
121+
self.retry = event.retry
141122

142123
# last_id should only be set if included in the message. It's not
143124
# forgotten if a message omits it.
144-
if msg.event_id:
145-
self.last_id = msg.event_id
146-
147-
return msg
125+
if event.event_id:
126+
self.last_id = event.event_id
127+
return event
148128

149-
if six.PY2:
150-
next = __next__
129+
def next(self):
130+
return self.__next__()
151131

152132

153133
class Event(object):
154-
"""Event class to handle the events fired by SSE"""
134+
"""Event represents the events fired by SSE."""
155135

156136
sse_line_pattern = re.compile('(?P<name>[^:]*):?( ?(?P<value>.*))?')
157137

158-
def __init__(self, data='', event='message', event_id=None, retry=None):
138+
def __init__(self, data='', event_type='message', event_id=None, retry=None):
159139
self.data = data
160-
self.event = event
140+
self.event_type = event_type
161141
self.event_id = event_id
162142
self.retry = retry
163143

164144
@classmethod
165145
def parse(cls, raw):
166-
"""Given a possibly-multiline string representing an SSE message, parse it
167-
and return a Event object.
146+
"""Given a possibly-multiline string representing an SSE message, parses it
147+
and returns an Event object.
168148
169149
Args:
170-
raw: the raw data to parse
150+
raw: the raw data to parse.
171151
172152
Returns:
173-
Event: newly intialized Event() object with the parameters initialized
153+
Event: newly intialized ``Event`` object with the parameters initialized.
174154
"""
175-
msg = cls()
155+
event = cls()
176156
for line in raw.split('\n'):
177157
match = cls.sse_line_pattern.match(line)
178158
if match is None:
@@ -185,22 +165,17 @@ def parse(cls, raw):
185165
if name == '':
186166
# line began with a ":", so is a comment. Ignore
187167
continue
188-
189-
if name == 'data':
168+
elif name == 'data':
190169
# If we already have some data, then join to it with a newline.
191170
# Else this is it.
192-
if msg.data:
193-
msg.data = '%s\n%s' % (msg.data, value)
171+
if event.data:
172+
event.data = '%s\n%s' % (event.data, value)
194173
else:
195-
msg.data = value
174+
event.data = value
196175
elif name == 'event':
197-
msg.event = value
176+
event.event_type = value
198177
elif name == 'id':
199-
msg.event_id = value
178+
event.event_id = value
200179
elif name == 'retry':
201-
msg.retry = int(value)
202-
203-
return msg
204-
205-
def __str__(self):
20 419C 6-
return self.data
180+
event.retry = int(value)
181+
return event

0 commit comments

Comments
 (0)
0