@@ -408,6 +408,42 @@ def register_edge_route(self, route_endpoint: Callable):
408
408
finally :
409
409
ROUTER .remove_rule (rule )
410
410
411
+ def test_streaming_request (self ):
412
+ """Test if request data is correctly streamed when the HTTP request uses a generator."""
413
+ queue = Queue ()
414
+
415
+ # generate test events
416
+ elements = [bytes (f"element-{ n :02d} " , "latin-1" ) for n in range (100 )]
417
+
418
+ @route ("/streaming-endpoint-test" )
419
+ def streaming_endpoint (request : HttpRequest ):
420
+ # take the first element from the list and add it to the queue
421
+ next_element = elements .pop (0 )
422
+ queue .put (next_element )
423
+ # process each element, and add the next element to the generating queue after the element is processed
424
+ while data := request .input_stream .read (10 ):
425
+ assert next_element == data
426
+ if len (elements ) > 0 :
427
+ next_element = elements .pop (0 )
428
+ queue .put (next_element )
429
+
430
+ def data_generator ():
431
+ # wait for a new element in the queue, if one is received, send it immediately
432
+ while len (elements ) > 0 :
433
+ try :
434
+ yield queue .get (timeout = 0.1 )
435
+ except Empty :
436
+ # the queue is empty for more than 100 ms, we end here
437
+ pass
438
+
439
+ # register the streaming test endpoint
440
+ with self .register_edge_route (streaming_endpoint ):
441
+ # send a streamed request data to the registered endpoint
442
+ requests .post (f"{ config .get_edge_url ()} /streaming-endpoint-test" , data = data_generator ())
443
+
444
+ # ensure that all elements have been processed
445
+ assert len (elements ) == 0
446
+
411
447
def test_streaming_response (self ):
412
448
"""Test if responses are correctly streamed (HTTP 1.1 chunks) when the HTTP response contains a generator."""
413
449
@@ -453,39 +489,3 @@ def chunk_generator():
453
489
assert element == next (chunk_iterator ).decode ("utf-8" )
454
490
# make sure the queue is empty
455
491
assert queue .empty ()
456
-
457
- def test_streaming_request (self ):
458
- """Test if request data is correctly streamed when the HTTP request uses a generator."""
459
- queue = Queue ()
460
-
461
- # generate test events
462
- elements = [bytes (f"element-{ n :02d} " , "latin-1" ) for n in range (100 )]
463
-
464
- @route ("/streaming-endpoint-test" )
465
- def streaming_endpoint (request : HttpRequest ):
466
- # take the first element from the list and add it to the queue
467
- next_element = elements .pop (0 )
468
- queue .put (next_element )
469
- # process each element, and add the next element to the generating queue after the element is processed
470
- while data := request .input_stream .read (10 ):
471
- assert next_element == data
472
- if len (elements ) > 0 :
473
- next_element = elements .pop (0 )
474
- queue .put (next_element )
475
-
476
- def data_generator ():
477
- try :
478
- # wait for a new element in the queue, if one is received, send it immediately
479
- while chunk := queue .get (timeout = 0.1 ):
480
- yield chunk
481
- except Empty :
482
- # the queue is empty for more than 100 ms, we end here
483
- pass
484
-
485
- # register the streaming test endpoint
486
- with self .register_edge_route (streaming_endpoint ):
487
- # send a streamed request data to the registered endpoint
488
- requests .post (f"{ config .get_edge_url ()} /streaming-endpoint-test" , data = data_generator ())
489
-
490
- # ensure that all elements have been processed
491
- assert len (elements ) == 0
0 commit comments