10000 Fixing a performance issue in SSE client (#221) · firebase/firebase-admin-python@3e31716 · GitHub
[go: up one dir, main page]

Skip to content

Commit 3e31716

Browse files
authored
Fixing a performance issue in SSE client (#221)
* Fixing a performance issue in SSE client * Merged with master * Using a more efficient mechanism to buffer and parse incoming SSE data * Minor improvement to how tail string is read * Using string slicing to reduce the window * Updated comment
1 parent 77737c0 commit 3e31716

File tree

3 files changed

+84
-24
lines changed

3 files changed

+84
-24
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
type.googleapis.com/google.firebase.fcm.v1.FcmError to set error code.
55
- [fixed] Ensuring that `UserRecord.tokens_valid_after_time` always
66
returns an integer, and never returns `None`.
7+
- [fixed] Fixing a performance issue in the `db.listen()` API
8+
where it was taking a long time to process large RTDB nodes.
79

810
# v2.13.0
911

firebase_admin/_sseclient.py

Lines changed: 49 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,10 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
"""SSEClient module to stream realtime updates in the Firebase Database."""
15+
"""SSEClient module to stream realtime updates from the Firebase Database.
16+
17+
Based on a similar implementation from Pyrebase.
18+
"""
1619

1720
import re
1821
import time
@@ -37,6 +40,34 @@ def rebuild_auth(self, prepared_request, response):
3740
pass
3841

3942

43+
class _EventBuffer(object):
44+
"""A helper class for buffering and parsing raw SSE data."""
45+
46+
def __init__(self):
47+
self._buffer = []
48+
self._tail = ''
49+
50+
def append(self, char):
51+
self._buffer.append(char)
52+
self._tail += char
53+
self._tail = self._tail[-4:]
54+
55+
def truncate(self):
56+
head, sep, _ = self.buffer_string.rpartition('\n')
57+
rem = head + sep
58+
self._buffer = list(rem)
59+
self._tail = rem[-4:]
60+
61+
@property
62+
def is_end_of_field(self):
63+
last_two_chars = self._tail[-2:]
64+
return last_two_chars == '\n\n' or last_two_chars == '\r\r' or self._tail == '\r\n\r\n'
65+
66+
@property
67+
def buffer_string(self):
68+
return ''.join(self._buffer)
69+
70+
4071
class SSEClient(object):
4172
"""SSE client implementation."""
4273

@@ -58,7 +89,7 @@ def __init__(self, url, session, retry=3000, **kwargs):
5889
self.buf = u'' # Keep data here as it streams in
5990

6091
headers = self.requests_kwargs.get('headers', {})
61-
# The SSE spec requires making requests with Cache-Control: nocache
92+
# The SSE spec requires making requests with Cache-Control: no-cache
6293
headers['Cache-Control'] = 'no-cache'
6394
# The 'Accept' header is not required, but explicit > implicit
6495
headers['Accept'] = 'text/event-stream'
@@ -82,32 +113,28 @@ def _connect(self):
82113
else:
83114
raise StopIteration()
84115

85-
def _event_complete(self):
86-
"""Checks if the event is completed by matching regular expression."""
87-
return re.search(end_of_field, self.buf) is not None
88-
89116
def __iter__(self):
90117
return self
91118

92119
def __next__(self):
93-
while not self._event_complete():
94-
try:
95-
nextchar = next(self.resp_iterator)
96-
self.buf += nextchar
97-
except (StopIteration, requests.RequestException):
98-
time.sleep(self.retry / 1000.0)
99-
self._connect()
100-
# The SSE spec only supports resuming from a whole message, so
101-
# if we have half a message we should throw it out.
102-
head, sep, tail = self.buf.rpartition('\n')
103-
self.buf = head + sep
104-
continue
120+
if not re.search(end_of_field, self.buf):
121+
temp_buffer = _EventBuffer()
122+
while not temp_buffer.is_end_of_field:
123+
try:
124+
nextchar = next(self.resp_iterator)
125+
temp_buffer.append(nextchar)
126+
except (StopIteration, requests.RequestException):
127+
time.sleep(self.retry / 1000.0)
128+
self._connect()
129+
# The SSE spec only supports resuming from a whole message, so
130+
# if we have half a message we should throw it out.
131+
temp_buffer.truncate()
132+
continue
133+
self.buf = temp_buffer.buffer_string
105134

106135
split = re.split(end_of_field, self.buf)
107136
head = split[0]
108-
tail = ''.join(split[1:])
109-
110-
self.buf = tail
137+
self.buf = '\n\n'.join(split[1:])
111138
event = Event.parse(head)
F438
112139

113140
if event.data == 'credential is no longer valid':
@@ -150,7 +177,7 @@ def parse(cls, raw):
150177
raw: the raw data to parse.
151178
152179
Returns:
153-
Event: newly intialized ``Event`` object with the parameters initialized.
180+
Event: A new ``Event`` with the parameters initialized.
154181
"""
155182
event = cls()
156183
for line in raw.split('\n'):

tests/test_sseclient.py

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,10 @@
1919
import six
2020

2121
from firebase_admin import _sseclient
22-
from tests.testutils import MockAdapter
22+
from tests import testutils
2323

2424

25-
class MockSSEClientAdapter(MockAdapter):
25+
class MockSSEClientAdapter(testutils.MockAdapter):
2626

2727
def __init__(self, payload, recorder):
2828
super(MockSSEClientAdapter, self).__init__(payload, 200, recorder)
@@ -73,6 +73,17 @@ def test_single_event(self):
7373
assert event_payload["path"] == "/"
7474
assert len(recorder) == 2
7575

76+
def test_large_event(self):
77+
data = 'a' * int(0.1 * 1024 * 1024)
78+
payload = 'event: put\ndata: {"path":"/","data":"' + data + '"}\n\n'
79+
recorder = []
80+
sseclient = self.init_sse(payload, recorder)
81+
event = next(sseclient)
82+
event_payload = json.loads(event.data)
83+
assert event_payload["data"] == data
84+
assert event_payload["path"] == "/"
85+
assert len(recorder) == 1
86+
7687
def test_multiple_events(self):
7788
payload = 'event: put\ndata: {"path":"/foo","data":"testevent1"}\n\n'
7889
payload += 'event: put\ndata: {"path":"/bar","data":"testevent2"}\n\n'
@@ -88,6 +99,26 @@ def test_multiple_events(self):
8899
assert event_payload["path"] == "/bar"
89100
assert len(recorder) == 1
90101

102+
def test_event_separators(self):
103+
payload = 'event: put\ndata: {"path":"/foo","data":"testevent1"}\n\n'
104+
payload += 'event: put\ndata: {"path":"/bar","data":"testevent2"}\r\r'
105+
payload += 'event: put\ndata: {"path":"/baz","data":"testevent3"}\r\n\r\n'
106+
recorder = []
107+
sseclient = self.init_sse(payload, recorder)
108+
event = next(sseclient)
109+
event_payload = json.loads(event.data)
110+
assert event_payload["data"] == "testevent1"
111+
assert event_payload["path"] == "/foo"
112+
event = next(sseclient)
113+
event_payload = json.loads(event.data)
114+
assert event_payload["data"] == "testevent2"
115+
assert event_payload["path"] == "/bar"
116+
event = next(sseclient)
117+
event_payload = json.loads(event.data)
118+
assert event_payload["data"] == "testevent3"
119+
assert event_payload["path"] == "/baz"
120+
assert len(recorder) == 1
121+
91122

92123
class TestEvent(object):
93124
"""Test cases for server-side events"""

0 commit comments

Comments
 (0)
0