8000 add test for ASF request streaming · localstack/localstack@f4389cf · GitHub
[go: up one dir, main page]

Skip to content

Commit f4389cf

Browse files
committed
add test for ASF request streaming
1 parent e5dcb80 commit f4389cf

File tree

2 files changed

+44
-5
lines changed

2 files changed

+44
-5
lines changed

localstack/aws/handlers/logging.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@
33
import logging
44
from functools import cached_property
55
from typing import Type
6-
from wsgiref.headers import Headers
6+
7+
from werkzeug.datastructures import Headers
78

89
from localstack.aws.api import RequestContext, ServiceException
910
from localstack.aws.chain import ExceptionHandler, HandlerChain
@@ -81,7 +82,7 @@ def _prepare_logger(self, logger: logging.Logger, formatter: Type):
8182
logger.addHandler(handler)
8283
return logger
8384

84-
def log_http_response(
85+
def _log_http_response(
8586
self,
8687
logger: logging.Logger,
8788
request: Request,
@@ -94,13 +95,14 @@ def log_http_response(
9495
The given response data is returned by this function, which allows the usage as a log interceptor for streamed
9596
response data.
9697
98+
:param logger: HTTP logger to log the request onto
9799
:param request: HTTP request data (containing useful metadata like the HTTP method and path)
98100
:param response_status: HTTP status of the response to log
99101
:param response_headers: HTTP headers of the response to log
100102
:param response_data: HTTP body of the response to log
101103
:return: response data
102104
"""
103-
self.http_logger.info(
105+
logger.info(
104106
"%s %s => %d",
105107
request.method,
106108
request.path,
@@ -173,7 +175,7 @@ def _log(self, context: RequestContext, response: Response):
173175
if hasattr(response.response, "__iter__"):
174176
# If the response is streamed, wrap the response data's iterator which logs all values when they are consumed
175177
log_partial = functools.partial(
176-
self.log_http_response,
178+
self._log_http_response,
177179
http_logger,
178180
context.request,
179181
response.status_code,
@@ -183,7 +185,7 @@ def _log(self, context: RequestContext, response: Response):
183185
response.set_response(wrapped_response_iterator)
184186
else:
185187
# If the response is synchronous, we log the data directly
186-
self.log_http_response(
188+
self._log_http_response(
187189
http_logger,
188190
context.request,
189191
response.status_code,

tests/integration/test_edge.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
from localstack import config
1616
from localstack.aws.accounts import get_aws_account_id
1717
from localstack.constants import APPLICATION_JSON, HEADER_LOCALSTACK_EDGE_URL
18+
from localstack.http import Request as HttpRequest
1819
from localstack.http import Response as HttpResponse
1920
from localstack.http import route
2021
from localstack.http.request import get_full_raw_path
@@ -452,3 +453,39 @@ def chunk_generator():
452453
assert element == next(chunk_iterator).decode("utf-8")
453454
# make sure the queue is empty
454455
assert queue.empty()
456+
457+
def test_streaming_request(self):
458+
"""Test if request data is correctly streamed when the HTTP request uses a generator."""
459+
queue = Queue()
460+
461+
# generate test events
462+
elements = [bytes(f"element-{n:02d}", "latin-1") for n in range(100)]
463+
464+
@route("/streaming-endpoint-test")
465+
def streaming_endpoint(request: HttpRequest):
466+
# take the first element from the list and add it to the queue
467+
next_element = elements.pop(0)
468+
queue.put(next_element)
469+
# process each element, and add the next element to the generating queue after the element is processed
470+
while data := request.input_stream.read(10):
471+
assert next_element == data
472+
if len(elements) > 0:
473+
next_element = elements.pop(0)
474+
queue.put(next_element)
475+
476+
def data_generator():
477+
try:
478+
# wait for a new element in the queue, if one is received, send it immediately
479+
while chunk := queue.get(timeout=0.1):
480+
yield chunk
481+
except Empty:
482+
# the queue is empty for more than 100 ms, we end here
483+
pass
484+
485+
# register the streaming test endpoint
486+
with self.register_edge_route(streaming_endpoint):
487+
# send a streamed request data to the registered endpoint
488+
requests.post(f"{config.get_edge_url()}/streaming-endpoint-test", data=data_generator())
489+
490+
# ensure that all elements have been processed
491+
assert len(elements) == 0

0 commit comments

Comments
 (0)
0