8000 fix ASF handler chain streaming by alexrashed · Pull Request #7522 · localstack/localstack · GitHub
[go: up one dir, main page]

Skip to content

fix ASF handler chain streaming #7522

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 5 commits into from
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fix blocking test
  • Loading branch information
alexrashed committed Jan 26, 2023
commit 2c593b8a03991d421beac8ab1c7d17fec8f14bc5
72 changes: 36 additions & 36 deletions tests/integration/test_edge.py
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,42 @@ def register_edge_route(self, route_endpoint: Callable):
finally:
ROUTER.remove_rule(rule)

def test_streaming_request(self):
"""Test if request data is correctly streamed when the HTTP request uses a generator."""
queue = Queue()

# generate test events
elements = [bytes(f"element-{n:02d}", "latin-1") for n in range(100)]

@route("/streaming-endpoint-test")
def streaming_endpoint(request: HttpRequest):
# take the first element from the list and add it to the queue
next_element = elements.pop(0)
queue.put(next_element)
# process each element, and add the next element to the generating queue after the element is processed
while data := request.input_stream.read(10):
assert next_element == data
if len(elements) > 0:
next_element = elements.pop(0)
queue.put(next_element)

def data_generator():
# wait for a new element in the queue, if one is received, send it immediately
while len(elements) > 0:
try:
yield queue.get(timeout=0.1)
except Empty:
# the queue is empty for more than 100 ms, we end here
pass

# register the streaming test endpoint
with self.register_edge_route(streaming_endpoint):
# send a streamed request data to the registered endpoint
requests.post(f"{config.get_edge_url()}/streaming-endpoint-test", data=data_generator())

# ensure that all elements have been processed
assert len(elements) == 0

def test_streaming_response(self):
"""Test if responses are correctly streamed (HTTP 1.1 chunks) when the HTTP response contains a generator."""

Expand Down Expand Up @@ -453,39 +489,3 @@ def chunk_generator():
assert element == next(chunk_iterator).decode("utf-8")
# make sure the queue is empty
assert queue.empty()

def test_streaming_request(self):
"""Test if request data is correctly streamed when the HTTP request uses a generator."""
queue = Queue()

# generate test events
elements = [bytes(f"element-{n:02d}", "latin-1") for n in range(100)]

@route("/streaming-endpoint-test")
def streaming_endpoint(request: HttpRequest):
# take the first element from the list and add it to the queue
next_element = elements.pop(0)
queue.put(next_element)
# process each element, and add the next element to the generating queue after the element is processed
while data := request.input_stream.read(10):
assert next_element == data
if len(elements) > 0:
next_element = elements.pop(0)
queue.put(next_element)

def data_generator():
try:
# wait for a new element in the queue, if one is received, send it immediately
while chunk := queue.get(timeout=0.1):
yield chunk
except Empty:
# the queue is empty for more than 100 ms, we end here
pass

# register the streaming test endpoint
with self.register_edge_route(streaming_endpoint):
# send a streamed request data to the registered endpoint
requests.post(f"{config.get_edge_url()}/streaming-endpoint-test", data=data_generator())

# ensure that all elements have been processed
assert len(elements) == 0
0