10000 fix termination of generators when client connection is closed (#8582) · codeperl/localstack@0088cd7 · GitHub
[go: up one dir, main page]

Skip to content

Commit 0088cd7

Browse files
authored
fix termination of generators when client connection is closed (localstack#8582)
1 parent 59cab78 commit 0088cd7

File tree

3 files changed

+52
-2
lines changed

3 files changed

+52
-2
lines changed

localstack/aws/handlers/logging.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,9 @@ def _prepare_logger(self, logger: logging.Logger, formatter: Type):
8282
def _log(self, context: RequestContext, response: Response):
8383
aws_logger = self.aws_logger
8484
http_logger = self.http_logger
85-
is_internal_call = is_internal_call_context(context.request.headers)
85+
is_internal_call = (
86+
is_internal_call_context(context.request.headers) or context.is_internal_call
87+
)
8688
if is_internal_call:
8789
aws_logger = self.internal_aws_logger
8890
http_logger = self.internal_http_logger
@@ -143,7 +145,7 @@ def _log(self, context: RequestContext, response: Response):
143145
"request_headers": dict(context.request.headers),
144146
# response
145147
"output_type": "Response",
146-
"output": response.data,
148+
"output": "StreamingBody(unknown)" if response.is_streamed else response.data,
147149
"response_headers": dict(response.headers),
148150
},
149151
)

localstack/http/asgi.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,8 @@ def write_sync(self, data: bytes) -> None:
253253
async def write(self, data: bytes) -> None:
254254
if not self.started:
255255
raise ValueError("not started the response yet")
256+
if getattr(self.send.__self__, "closed", None):
257+
raise BrokenPipeError("Connection closed")
256258
await self.send({"type": "http.response.body", "body": data, "more_body": True})
257259
self.sent += len(data)
258260
if self.sent >= self.content_length:
@@ -374,7 +376,15 @@ async def handle_http(
374376
else:
375377
for packet in iterable:
376378
await response.write(packet)
379+
except ConnectionError as e:
380+
client_info = "unknown"
381+
if client := scope.get("client"):
382+
address, port = client
383+
client_info = f"{address}:{port}"
384+
LOG.debug("Error while writing responses: %s (client_info: %s)", e, client_info)
377385
finally:
386+
if iterable and hasattr(iterable, "aclose"):
387+
await iterable.aclose()
378388
await response.close()
379389

380390
async def handle_lifespan(

tests/unit/http_/test_asgi.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import json
22
import logging
3+
import threading
34
import time
45
from concurrent.futures import ThreadPoolExecutor
56
from queue import Queue
@@ -96,6 +97,43 @@ def _gen():
9697
assert next(it) == b"baz"
9798

9899

100+
def test_chunked_transfer_encoding_client_timeout(serve_asgi_adapter):
101+
# this test makes sure that creating a response with a generator automatically creates a
102+
# transfer-encoding=chunked response
103+
104+
generator_exited = threading.Event()
105+
continue_request = threading.Event()
106+
107+
@Request.application
108+
def app(_request: Request) -> Response:
109+
def _gen():
110+
try:
111+
yield "foo"
112+
yield "bar\n"
113+
continue_request.wait()
114+
# only three are needed, let's send some more to make sure
115+
for _ in range(10):
116+
yield "baz\n"
117+
except GeneratorExit:
118+
generator_exited.set()
119+
120+
return Response(_gen(), 200)
121+
122+
server = serve_asgi_adapter(app)
123+
124+
with requests.get(server.url, stream=True) as response:
125+
assert response.headers["Transfer-Encoding"] == "chunked"
126+
127+
it = response.iter_lines()
128+
129+
assert next(it) == b"foobar"
130+
131+
# request is now closed, continue the response generator
132+
continue_request.set()
133+
# this flag is only set when generator is exited
134+
assert generator_exited.wait(timeout=10)
135+
136+
99137
def test_chunked_transfer_encoding_request(serve_asgi_adapter):
100138
request_list: List[Request] = []
101139

0 commit comments

Comments
 (0)
0