diff --git a/src/apify_client/_dynamic_timeout.py b/src/apify_client/_dynamic_timeout.py new file mode 100644 index 00000000..47b1caaa --- /dev/null +++ b/src/apify_client/_dynamic_timeout.py @@ -0,0 +1,13 @@ +from __future__ import annotations + +from collections.abc import AsyncIterable, Iterable +from typing import Protocol, Union + +RequestContent = Union[str, bytes, Iterable[bytes], AsyncIterable[bytes]] + + +class DynamicTimeoutFunction(Protocol): + """A function for dynamically creating suitable timeout for an http request.""" + + def __call__(self, method: str, url: str, content: RequestContent) -> None | int: + """Generate suitable timeout [s] for the request.""" diff --git a/src/apify_client/_http_client.py b/src/apify_client/_http_client.py index 2a23516f..81c14970 100644 --- a/src/apify_client/_http_client.py +++ b/src/apify_client/_http_client.py @@ -20,6 +20,8 @@ if TYPE_CHECKING: from apify_shared.types import JSONSerializable + from apify_client._dynamic_timeout import DynamicTimeoutFunction + DEFAULT_BACKOFF_EXPONENTIAL_FACTOR = 2 DEFAULT_BACKOFF_RANDOM_FACTOR = 1 @@ -36,11 +38,13 @@ def __init__( max_retries: int = 8, min_delay_between_retries_millis: int = 500, timeout_secs: int = 360, + get_dynamic_timeout: DynamicTimeoutFunction | None = None, stats: Statistics | None = None, ) -> None: self.max_retries = max_retries self.min_delay_between_retries_millis = min_delay_between_retries_millis self.timeout_secs = timeout_secs + self._get_dynamic_timeout = get_dynamic_timeout headers = {'Accept': 'application/json, */*'} @@ -170,6 +174,16 @@ def _make_request(stop_retrying: Callable, attempt: int) -> httpx.Response: params=params, content=content, ) + + if self._get_dynamic_timeout: + timeout = self._get_dynamic_timeout(method, url, content) or self.timeout_secs + request.extensions['timeout'] = { + 'connect': timeout, + 'pool': timeout, + 'read': timeout, + 'write': timeout, + } + response = httpx_client.send( request=request, stream=stream or False, @@ -249,6 +263,16 @@ async def _make_request(stop_retrying: Callable, attempt: int) -> httpx.Response params=params, content=content, ) + + if self._get_dynamic_timeout: + timeout = self._get_dynamic_timeout(method, url, content) or self.timeout_secs + request.extensions['timeout'] = { + 'connect': timeout, + 'pool': timeout, + 'read': timeout, + 'write': timeout, + } + response = await httpx_async_client.send( request=request, stream=stream or False, diff --git a/src/apify_client/client.py b/src/apify_client/client.py index 7aae143b..e39da6cc 100644 --- a/src/apify_client/client.py +++ b/src/apify_client/client.py @@ -1,5 +1,7 @@ from __future__ import annotations +from typing import TYPE_CHECKING + from apify_shared.utils import ignore_docs from apify_client._http_client import HTTPClient, HTTPClientAsync @@ -53,6 +55,9 @@ WebhookDispatchCollectionClientAsync, ) +if TYPE_CHECKING: + from apify_client._dynamic_timeout import DynamicTimeoutFunction + DEFAULT_API_URL = 'https://api.apify.com' API_VERSION = 'v2' @@ -108,6 +113,7 @@ def __init__( max_retries: int | None = 8, min_delay_between_retries_millis: int | None = 500, timeout_secs: int | None = 360, + get_dynamic_timeout: DynamicTimeoutFunction | None = None, ) -> None: """Initialize a new instance. @@ -118,6 +124,7 @@ def __init__( min_delay_between_retries_millis: How long will the client wait between retrying requests (increases exponentially from this value). timeout_secs: The socket timeout of the HTTP requests sent to the Apify API. + get_dynamic_timeout: A function that can be called for each request to get suitable custom timeout for it. """ super().__init__( token, @@ -133,6 +140,7 @@ def __init__( max_retries=self.max_retries, min_delay_between_retries_millis=self.min_delay_between_retries_millis, timeout_secs=self.timeout_secs, + get_dynamic_timeout=get_dynamic_timeout, stats=self.stats, ) @@ -291,6 +299,7 @@ def __init__( max_retries: int | None = 8, min_delay_between_retries_millis: int | None = 500, timeout_secs: int | None = 360, + get_dynamic_timeout: DynamicTimeoutFunction | None = None, ) -> None: """Initialize a new instance. @@ -301,6 +310,7 @@ def __init__( min_delay_between_retries_millis: How long will the client wait between retrying requests (increases exponentially from this value). timeout_secs: The socket timeout of the HTTP requests sent to the Apify API. + get_dynamic_timeout: A function that can be called for each request to get suitable custom timeout for it. """ super().__init__( token, @@ -316,6 +326,7 @@ def __init__( max_retries=self.max_retries, min_delay_between_retries_millis=self.min_delay_between_retries_millis, timeout_secs=self.timeout_secs, + get_dynamic_timeout=get_dynamic_timeout, stats=self.stats, ) diff --git a/tests/unit/test_dynamic_timeout.py b/tests/unit/test_dynamic_timeout.py new file mode 100644 index 00000000..765139ac --- /dev/null +++ b/tests/unit/test_dynamic_timeout.py @@ -0,0 +1,117 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +import pytest +import respx +from httpx import Request, Response + +from apify_client import ApifyClient, ApifyClientAsync + +if TYPE_CHECKING: + from apify_client._dynamic_timeout import DynamicTimeoutFunction, RequestContent + + +@pytest.fixture +def get_dynamic_timeout_function() -> DynamicTimeoutFunction: + """Example of a dynamic timeout function.""" + + def get_dynamic_timeout(method: str, url: str, content: RequestContent) -> int | None: + """Return suitable timeout. + + For POST on endpoint v2/datasets/whatever/items timeout is proportional to the size of the content. + For everything else return fixed 30.""" + if isinstance(content, bytes) and method == 'POST' and url.endswith('v2/datasets/whatever/items'): + dynamic_timeout_based_on_size = int(len(content) / 10) + return min(360, max(5, dynamic_timeout_based_on_size)) # Saturate in range 5-360 seconds + return 30 + + return get_dynamic_timeout + + +@respx.mock +@pytest.mark.parametrize( + ('content', 'expected_timeout'), + [ + pytest.param('abcd', 5, id='Small payload'), + pytest.param('abcd' * 10000, 9, id='Payload in the dynamic timeout interval interval'), + pytest.param('abcd' * 1000000, 360, id='Large payload'), + ], +) +async def test_dynamic_timeout_async_client( + get_dynamic_timeout_function: DynamicTimeoutFunction, content: str, expected_timeout: int +) -> None: + def check_timeout(request: Request) -> Response: + assert request.extensions['timeout'] == { + 'connect': expected_timeout, + 'pool': expected_timeout, + 'read': expected_timeout, + 'write': expected_timeout, + } + return Response(201) + + respx.post('https://api.apify.com/v2/datasets/whatever/items').mock(side_effect=check_timeout) + client = ApifyClientAsync(get_dynamic_timeout=get_dynamic_timeout_function) + await client.dataset(dataset_id='whatever').push_items({'some_key': content}) + + +@respx.mock +async def test_dynamic_timeout_async_client_default() -> None: + expected_timeout = 360 + + def check_timeout(request: Request) -> Response: + assert request.extensions['timeout'] == { + 'connect': expected_timeout, + 'pool': expected_timeout, + 'read': expected_timeout, + 'write': expected_timeout, + } + return Response(201) + + respx.post('https://api.apify.com/v2/datasets/whatever/items').mock(side_effect=check_timeout) + client = ApifyClientAsync() + await client.dataset(dataset_id='whatever').push_items({'some_key': 'abcd'}) + + +@respx.mock +@pytest.mark.parametrize( + ('content', 'expected_timeout'), + [ + pytest.param('abcd', 5, id='Small payload'), + pytest.param('abcd' * 10000, 9, id='Payload in the dynamic timeout interval interval'), + pytest.param('abcd' * 1000000, 360, id='Large payload'), + ], +) +def test_dynamic_timeout_sync_client( + get_dynamic_timeout_function: DynamicTimeoutFunction, content: str, expected_timeout: int +) -> None: + def check_timeout(request: Request) -> Response: + assert request.extensions['timeout'] == { + 'connect': expected_timeout, + 'pool': expected_timeout, + 'read': expected_timeout, + 'write': expected_timeout, + } + return Response(201) + + respx.post('https://api.apify.com/v2/datasets/whatever/items').mock(side_effect=check_timeout) + client = ApifyClient(get_dynamic_timeout=get_dynamic_timeout_function) + client.dataset(dataset_id='whatever').push_items({'some_key': content}) + + +@respx.mock +def test_dynamic_timeout_sync_client_default() -> None: + expected_timeout = 360 + + def check_timeout(request: Request) -> Response: + assert request.extensions['timeout'] == { + 'connect': expected_timeout, + 'pool': expected_timeout, + 'read': expected_timeout, + 'write': expected_timeout, + } + return Response(201) + + respx.post('https://api.apify.com/v2/datasets/whatever/items').mock(side_effect=check_timeout) + client = ApifyClient() + client.dataset(dataset_id='whatever').push_items({'some_key': 'abcd'})