|
15 | 15 | from localstack import config
|
16 | 16 | from localstack.aws.accounts import get_aws_account_id
|
17 | 17 | from localstack.constants import APPLICATION_JSON, HEADER_LOCALSTACK_EDGE_URL
|
| 18 | +from localstack.http import Request as HttpRequest |
18 | 19 | from localstack.http import Response as HttpResponse
|
19 | 20 | from localstack.http import route
|
20 | 21 | from localstack.http.request import get_full_raw_path
|
@@ -452,3 +453,39 @@ def chunk_generator():
|
452 | 453 | assert element == next(chunk_iterator).decode("utf-8")
|
453 | 454 | # make sure the queue is empty
|
454 | 455 | assert queue.empty()
|
| 456 | + |
| 457 | + def test_streaming_request(self): |
| 458 | + """Test if request data is correctly streamed when the HTTP request uses a generator.""" |
| 459 | + queue = Queue() |
| 460 | + |
| 461 | + # generate test events |
| 462 | + elements = [bytes(f"element-{n:02d}", "latin-1") for n in range(100)] |
| 463 | + |
| 464 | + @route("/streaming-endpoint-test") |
| 465 | + def streaming_endpoint(request: HttpRequest): |
| 466 | + # take the first element from the list and add it to the queue |
| 467 | + next_element = elements.pop(0) |
| 468 | + queue.put(next_element) |
| 469 | + # process each element, and add the next element to the generating queue after the element is processed |
| 470 | + while data := request.input_stream.read(10): |
| 471 | + assert next_element == data |
| 472 | + if len(elements) > 0: |
| 473 | + next_element = elements.pop(0) |
| 474 | + queue.put(next_element) |
| 475 | + |
| 476 | + def data_generator(): |
| 477 | + try: |
| 478 | + # wait for a new element in the queue, if one is received, send it immediately |
| 479 | + while chunk := queue.get(timeout=0.1): |
| 480 | + yield chunk |
| 481 | + except Empty: |
| 482 | + # the queue is empty for more than 100 ms, we end here |
| 483 | + pass |
| 484 | + |
| 485 | + # register the streaming test endpoint |
| 486 | + with self.register_edge_route(streaming_endpoint): |
| 487 | + # send a streamed request data to the registered endpoint |
| 488 | + requests.post(f"{config.get_edge_url()}/streaming-endpoint-test", data=data_generator()) |
| 489 | + |
| 490 | + # ensure that all elements have been processed |
| 491 | + assert len(elements) == 0 |
0 commit comments