@@ -1713,6 +1713,21 @@ def test_process_requests_no_errors():
17131713 assert not requests_to_retry
17141714
17151715
1716+ def test_process_requests_no_errors_no_future ():
1717+ # no errors, request should be completed, even when future is None.
1718+ ack_reqs_dict = {
1719+ "ackid1" : requests .AckRequest (
1720+ ack_id = "ackid1" , byte_size = 0 , time_to_ack = 20 , ordering_key = "" , future = None
1721+ )
1722+ }
1723+ errors_dict = {}
1724+ requests_completed , requests_to_retry = streaming_pull_manager ._process_requests (
1725+ None , ack_reqs_dict , errors_dict
1726+ )
1727+ assert requests_completed [0 ].ack_id == "ackid1"
1728+ assert not requests_to_retry
1729+
1730+
17161731def test_process_requests_permanent_error_raises_exception ():
17171732 # a permanent error raises an exception
17181733 future = futures .Future ()
@@ -1735,6 +1750,40 @@ def test_process_requests_permanent_error_raises_exception():
17351750 assert not requests_to_retry
17361751
17371752
1753+ def test_process_requests_permanent_error_other_raises_exception ():
1754+ # a permanent error of other raises an exception
1755+ future = futures .Future ()
1756+ ack_reqs_dict = {
1757+ "ackid1" : requests .AckRequest (
1758+ ack_id = "ackid1" , byte_size = 0 , time_to_ack = 20 , ordering_key = "" , future = future
1759+ )
1760+ }
1761+ errors_dict = {"ackid1" : "PERMANENT_FAILURE_OTHER" }
1762+ requests_completed , requests_to_retry = streaming_pull_manager ._process_requests (
1763+ None , ack_reqs_dict , errors_dict
1764+ )
1765+ assert requests_completed [0 ].ack_id == "ackid1"
1766+ with pytest .raises (subscriber_exceptions .AcknowledgeError ) as exc_info :
1767+ future .result ()
1768+ assert exc_info .value .error_code == subscriber_exceptions .AcknowledgeStatus .OTHER
1769+ assert not requests_to_retry
1770+
1771+
1772+ def test_process_requests_permanent_error_other_raises_exception_no_future ():
1773+ # with a permanent error, request is completed even when future is None.
1774+ ack_reqs_dict = {
1775+ "ackid1" : requests .AckRequest (
1776+ ack_id = "ackid1" , byte_size = 0 , time_to_ack = 20 , ordering_key = "" , future = None
1777+ )
1778+ }
1779+ errors_dict = {"ackid1" : "PERMANENT_FAILURE_OTHER" }
1780+ requests_completed , requests_to_retry = streaming_pull_manager ._process_requests (
1781+ None , ack_reqs_dict , errors_dict
1782+ )
1783+ assert requests_completed [0 ].ack_id == "ackid1"
1784+ assert not requests_to_retry
1785+
1786+
17381787def test_process_requests_transient_error_returns_request_for_retrying ():
17391788 # a transient error returns the request in `requests_to_retry`
17401789 future = futures .Future ()
@@ -1872,6 +1921,23 @@ def test_process_requests_other_error_status_raises_exception():
18721921 assert not requests_to_retry
18731922
18741923
1924+ def test_process_requests_other_error_status_raises_exception_no_future ():
1925+ # with an unrecognized error status, requests are completed, even when
1926+ # future is None.
1927+ ack_reqs_dict = {
1928+ "ackid1" : requests .AckRequest (
1929+ ack_id = "ackid1" , byte_size = 0 , time_to_ack = 20 , ordering_key = "" , future = None
1930+ )
1931+ }
1932+ st = status_pb2 .Status ()
1933+ st .code = code_pb2 .Code .OUT_OF_RANGE
1934+ requests_completed , requests_to_retry = streaming_pull_manager ._process_requests (
1935+ st , ack_reqs_dict , None
1936+ )
1937+ assert requests_completed [0 ].ack_id == "ackid1"
1938+ assert not requests_to_retry
1939+
1940+
18751941def test_process_requests_mixed_success_and_failure_acks ():
18761942 # mixed success and failure (acks)
18771943 future1 = futures .Future ()
0 commit comments