From a795145ab0c83d9786e6135d82f873a773ebca48 Mon Sep 17 00:00:00 2001
From: Greg Furman
Date: Wed, 19 Mar 2025 15:59:57 +0200
Subject: [PATCH 1/3] [utils] Add a batch policy utility

Add Batcher, a generic helper that collects items into a batch and
reports when a batch policy is triggered: either max_count items have
been collected, or max_window seconds have elapsed since the batcher
was created or last flushed. With no limit configured the batcher is
always in the triggered state.

---
 .../localstack/utils/batch_policy.py          | 114 +++++++++++
 tests/unit/utils/test_batch_policy.py         | 180 ++++++++++++++++++
 2 files changed, 294 insertions(+)
 create mode 100644 localstack-core/localstack/utils/batch_policy.py
 create mode 100644 tests/unit/utils/test_batch_policy.py

diff --git a/localstack-core/localstack/utils/batch_policy.py b/localstack-core/localstack/utils/batch_policy.py
new file mode 100644
index 0000000000000..8cdad6ecf5e8f
--- /dev/null
+++ b/localstack-core/localstack/utils/batch_policy.py
@@ -0,0 +1,114 @@
+import copy
+import time
+from typing import Generic, List, Optional, TypeVar, overload
+
+from pydantic import Field
+from pydantic.dataclasses import dataclass
+
+T = TypeVar("T")
+
+
+# TODO: Add batching on bytes as well.
+@dataclass
+class Batcher(Generic[T]):
+    """
+    A utility for collecting items into batches and flushing them when one or more batch policy conditions are met.
+
+    The batch policy can be created to trigger on:
+    - max_count: Maximum number of items added
+    - max_window: Maximum time window (in seconds)
+
+    If no limits are specified, the batcher is always in triggered state.
+
+    Example usage:
+
+        import time
+
+        # Triggers when 2 (or more) items are added
+        batcher = Batcher(max_count=2)
+        assert batcher.add(["foo", "bar", "baz"])
+        assert batcher.flush() == ["foo", "bar", "baz"]
+
+        # Triggers partially when 2 (or more) items are added
+        batcher = Batcher(max_count=2)
+        assert batcher.add(["foo", "bar", "baz"])
+        assert batcher.flush(partial=True) == ["foo", "bar"]
+        assert batcher.add("foobar")
+        assert batcher.flush(partial=True) == ["baz", "foobar"]
+
+        # Triggers once 2 seconds have elapsed since creation (or the last flush)
+        batcher = Batcher(max_window=2.0)
+        assert not batcher.add(["foo", "bar", "baz"])
+        time.sleep(2.1)
+        assert batcher.add(["foobar"])
+        assert batcher.flush() == ["foo", "bar", "baz", "foobar"]
+    """
+
+    max_count: Optional[int] = Field(default=None, description="Maximum number of items", ge=0)
+    max_window: Optional[float] = Field(
+        default=None, description="Maximum time window in seconds", ge=0
+    )
+
+    _triggered: bool = Field(default=False, init=False)
+    _last_batch_time: float = Field(default_factory=time.monotonic, init=False)
+    _batch: list[T] = Field(default_factory=list, init=False)
+
+    @property
+    def period(self) -> float:
+        return time.monotonic() - self._last_batch_time
+
+    def _check_batch_policy(self) -> bool:
+        """Check if any batch policy conditions are met"""
+        if self.max_count is not None and len(self._batch) >= self.max_count:
+            self._triggered = True
+        elif self.max_window is not None and self.period >= self.max_window:
+            self._triggered = True
+        elif not self.max_count and not self.max_window:
+            # always return true
+            self._triggered = True
+
+        return self._triggered
+
+    @overload
+    def add(self, item: T, *, cache_deep_copy: bool = False) -> bool: ...
+
+    @overload
+    def add(self, items: List[T], *, cache_deep_copy: bool = False) -> bool: ...
+
+    def add(self, item_or_items: T | list[T], *, cache_deep_copy: bool = False) -> bool:
+        if cache_deep_copy:
+            item_or_items = copy.deepcopy(item_or_items)
+
+        if isinstance(item_or_items, list):
+            self._batch.extend(item_or_items)
+        else:
+            self._batch.append(item_or_items)
+
+        # Check if the last addition triggered the batch policy
+        return self.is_triggered()
+
+    def flush(self, *, partial=False) -> list[T]:
+        result = []
+        if not partial or not self.max_count:
+            result = self._batch.copy()
+            self._batch.clear()
+        else:
+            batch_size = min(self.max_count, len(self._batch))
+            result = self._batch[:batch_size].copy()
+            self._batch = self._batch[batch_size:]
+
+        self._last_batch_time = time.monotonic()
+        self._triggered = False
+
+        return result
+
+    def duration_until_next_batch(self) -> float:
+        if not self.max_window:
+            return -1
+        return max(self.max_window - self.period, -1)
+
+    def get_current_size(self) -> int:
+        return len(self._batch)
+
+    def is_triggered(self) -> bool:
+        return self._triggered or self._check_batch_policy()
diff --git a/tests/unit/utils/test_batch_policy.py b/tests/unit/utils/test_batch_policy.py
new file mode 100644
index 0000000000000..c21d2513041e2
--- /dev/null
+++ b/tests/unit/utils/test_batch_policy.py
@@ -0,0 +1,180 @@
+import time
+
+from localstack.utils.batch_policy import Batcher
+
+
+class SimpleItem:
+    def __init__(self, number=10):
+        self.number = number
+
+
+class TestBatcher:
+    def test_add_single_item(self):
+        batcher = Batcher(max_count=2)
+
+        assert not batcher.add("item1")
+        assert batcher.get_current_size() == 1
+        assert not batcher.is_triggered()
+
+        assert batcher.add("item2")
+        assert batcher.is_triggered()
+
+        result = batcher.flush()
+        assert result == ["item1", "item2"]
+        assert batcher.get_current_size() == 0
+
+    def test_add_multiple_items(self):
+        batcher = Batcher(max_count=3)
+
+        assert not batcher.add(["item1", "item2"])
+        assert batcher.get_current_size() == 2
+        assert not batcher.is_triggered()
+
+        assert batcher.add(["item3", "item4"])  # exceeds max_count
+        assert batcher.is_triggered()
+        assert batcher.get_current_size() == 4
+
+        result = batcher.flush()
+        assert result == ["item1", "item2", "item3", "item4"]
+        assert batcher.get_current_size() == 0
+
+        assert batcher.add(["item1", "item2", "item3", "item4"])
+        assert batcher.flush() == ["item1", "item2", "item3", "item4"]
+        assert not batcher.is_triggered()
+
+    def test_max_count_limit(self):
+        batcher = Batcher(max_count=3)
+
+        assert not batcher.add("item1")
+        assert not batcher.add("item2")
+        assert batcher.add("item3")
+
+        assert batcher.is_triggered()
+        assert batcher.get_current_size() == 3
+
+        result = batcher.flush()
+        assert result == ["item1", "item2", "item3"]
+        assert batcher.get_current_size() == 0
+
+        assert not batcher.add("item4")
+        assert not batcher.add("item5")
+        assert batcher.get_current_size() == 2
+
+    def test_max_window_limit(self):
+        max_window = 0.5
+        batcher = Batcher(max_window=max_window)
+
+        assert not batcher.add("item1")
+        assert batcher.get_current_size() == 1
+        assert not batcher.is_triggered()
+
+        assert not batcher.add("item2")
+        assert batcher.get_current_size() == 2
+        assert not batcher.is_triggered()
+
+        time.sleep(max_window + 0.1)
+
+        assert batcher.add("item3")
+        assert batcher.is_triggered()
+        assert batcher.get_current_size() == 3
+
+        result = batcher.flush()
+        assert result == ["item1", "item2", "item3"]
+        assert batcher.get_current_size() == 0
+
+    def test_multiple_policies(self):
+        batcher = Batcher(max_count=5, max_window=2.0)
+
+        item1 = SimpleItem(1)
+        for _ in range(5):
+            batcher.add(item1)
+        assert batcher.is_triggered()
+
+        result = batcher.flush()
+        assert result == [item1, item1, item1, item1, item1]
+        assert batcher.get_current_size() == 0
+
+        batcher.add(item1)
+        assert not batcher.is_triggered()
+
+        item2 = SimpleItem(10)
+
+        time.sleep(2.1)
+        batcher.add(item2)
+        assert batcher.is_triggered()
+
+        result = batcher.flush()
+        assert result == [item1, item2]
+
+    def test_flush(self):
+        batcher = Batcher(max_count=10)
+
+        batcher.add("item1")
+        batcher.add("item2")
+        batcher.add("item3")
+
+        result = batcher.flush()
+        assert result == ["item1", "item2", "item3"]
+        assert batcher.get_current_size() == 0
+
+        batcher.add("item4")
+        result = batcher.flush()
+        assert result == ["item4"]
+        assert batcher.get_current_size() == 0
+
+    def test_no_limits(self):
+        batcher = Batcher()
+        assert batcher.is_triggered()  # no limit always returns true
+
+        assert batcher.add("item1")
+        assert batcher.get_current_size() == 1
+        assert batcher.is_triggered()
+
+        assert batcher.add(["item2", "item3"])
+        assert batcher.get_current_size() == 3
+        assert batcher.is_triggered()
+
+        result = batcher.flush()
+        assert result == ["item1", "item2", "item3"]
+        assert batcher.get_current_size() == 0
+
+    def test_triggered_state(self):
+        batcher = Batcher(max_count=2)
+
+        assert not batcher.add("item1")
+        assert not batcher.is_triggered()
+
+        assert batcher.add("item2")
+        assert batcher.is_triggered()
+
+        assert batcher.add("item3")
+        assert batcher.flush() == ["item1", "item2", "item3"]
+        assert batcher.get_current_size() == 0
+        assert not batcher.is_triggered()
+
+    def test_max_count_partial_flush(self):
+        batcher = Batcher(max_count=2)
+
+        assert batcher.add(["item1", "item2", "item3", "item4"])
+        assert batcher.is_triggered()
+
+        assert batcher.flush(partial=True) == ["item1", "item2"]
+        assert batcher.get_current_size() == 2
+
+        assert batcher.flush(partial=True) == ["item3", "item4"]
+        assert not batcher.is_triggered()  # early flush
+
+        assert batcher.flush() == []
+        assert batcher.get_current_size() == 0
+        assert not batcher.is_triggered()
+
+    def test_deep_copy(self):
+        original = {"key": "value"}
+        batcher = Batcher(max_count=2)
+
+        batcher.add(original, cache_deep_copy=True)
+
+        original["key"] = "modified"
+
+        batch = batcher.flush()
+        assert batch[0]["key"] == "value"

From 6ed957beade9ce80edd53d0b42fd4a11fa128a29 Mon Sep 17 00:00:00 2001
From: Greg Furman
Date: Tue, 25 Mar 2025 17:56:36 +0200
Subject: [PATCH 2/3] Add type alias and better naming

Introduce the BatchPolicyTriggered alias (a plain bool) as the return
type of Batcher.add, rename its cache_deep_copy keyword to deep_copy,
and document the method.

---
 .../localstack/utils/batch_policy.py          | 17 +++++++++++++----
 1 file changed, 13 insertions(+), 4 deletions(-)

diff --git a/localstack-core/localstack/utils/batch_policy.py b/localstack-core/localstack/utils/batch_policy.py
index 8cdad6ecf5e8f..e69320d0a3a54 100644
--- a/localstack-core/localstack/utils/batch_policy.py
+++ b/localstack-core/localstack/utils/batch_policy.py
@@ -7,6 +7,9 @@
 
 T = TypeVar("T")
 
+# alias to signify whether a batch policy has been triggered
+BatchPolicyTriggered = bool
+
 
 # TODO: Add batching on bytes as well.
 @dataclass
@@ -70,13 +73,19 @@ def _check_batch_policy(self) -> bool:
         return self._triggered
 
     @overload
-    def add(self, item: T, *, cache_deep_copy: bool = False) -> bool: ...
+    def add(self, item: T, *, deep_copy: bool = False) -> BatchPolicyTriggered: ...
 
     @overload
-    def add(self, items: List[T], *, cache_deep_copy: bool = False) -> bool: ...
+    def add(self, items: List[T], *, deep_copy: bool = False) -> BatchPolicyTriggered: ...
+
+    def add(self, item_or_items: T | list[T], *, deep_copy: bool = False) -> BatchPolicyTriggered:
+        """
+        Add an item or list of items to the collected batch.
 
-    def add(self, item_or_items: T | list[T], *, cache_deep_copy: bool = False) -> bool:
-        if cache_deep_copy:
+        Returns:
+            BatchPolicyTriggered: True if the batch policy was triggered during addition, False otherwise.
+        """
+        if deep_copy:
             item_or_items = copy.deepcopy(item_or_items)
 
         if isinstance(item_or_items, list):

From 399b60df4cf554be18b2128d98d4304965df2bde Mon Sep 17 00:00:00 2001
From: Greg Furman
Date: Tue, 25 Mar 2025 19:34:54 +0200
Subject: [PATCH 3/3] Add more tests

Re-evaluate the batch policy at the end of flush() so that a partial
flush which leaves max_count or more items behind keeps the batcher in
the triggered state. Parametrize test_no_limits over zero-valued
limits, switch test_deep_copy to the renamed deep_copy keyword, and use
item1..item4 in the docstring examples.

---
 .../localstack/utils/batch_policy.py          | 19 ++++++++++---------
 tests/unit/utils/test_batch_policy.py         | 16 +++++++++++++---
 2 files changed, 23 insertions(+), 12 deletions(-)

diff --git a/localstack-core/localstack/utils/batch_policy.py b/localstack-core/localstack/utils/batch_policy.py
index e69320d0a3a54..9ac5e575f3a49 100644
--- a/localstack-core/localstack/utils/batch_policy.py
+++ b/localstack-core/localstack/utils/batch_policy.py
@@ -29,22 +29,22 @@ class Batcher(Generic[T]):
 
         # Triggers when 2 (or more) items are added
         batcher = Batcher(max_count=2)
-        assert batcher.add(["foo", "bar", "baz"])
-        assert batcher.flush() == ["foo", "bar", "baz"]
+        assert batcher.add(["item1", "item2", "item3"])
+        assert batcher.flush() == ["item1", "item2", "item3"]
 
         # Triggers partially when 2 (or more) items are added
         batcher = Batcher(max_count=2)
-        assert batcher.add(["foo", "bar", "baz"])
-        assert batcher.flush(partial=True) == ["foo", "bar"]
-        assert batcher.add("foobar")
-        assert batcher.flush(partial=True) == ["baz", "foobar"]
+        assert batcher.add(["item1", "item2", "item3"])
+        assert batcher.flush(partial=True) == ["item1", "item2"]
+        assert batcher.add("item4")
+        assert batcher.flush(partial=True) == ["item3", "item4"]
 
         # Triggers once 2 seconds have elapsed since creation (or the last flush)
         batcher = Batcher(max_window=2.0)
-        assert not batcher.add(["foo", "bar", "baz"])
+        assert not batcher.add(["item1", "item2", "item3"])
         time.sleep(2.1)
-        assert batcher.add(["foobar"])
-        assert batcher.flush() == ["foo", "bar", "baz", "foobar"]
+        assert batcher.add(["item4"])
+        assert batcher.flush() == ["item1", "item2", "item3", "item4"]
     """
 
     max_count: Optional[int] = Field(default=None, description="Maximum number of items", ge=0)
@@ -108,6 +108,7 @@ def flush(self, *, partial=False) -> list[T]:
 
         self._last_batch_time = time.monotonic()
         self._triggered = False
+        self._check_batch_policy()
 
         return result
 
diff --git a/tests/unit/utils/test_batch_policy.py b/tests/unit/utils/test_batch_policy.py
index c21d2513041e2..e93fd594ded1d 100644
--- a/tests/unit/utils/test_batch_policy.py
+++ b/tests/unit/utils/test_batch_policy.py
@@ -1,5 +1,7 @@
 import time
 
+import pytest
+
 from localstack.utils.batch_policy import Batcher
 
 
@@ -122,8 +124,16 @@ def test_flush(self):
         assert result == ["item4"]
         assert batcher.get_current_size() == 0
 
-    def test_no_limits(self):
-        batcher = Batcher()
+    @pytest.mark.parametrize(
+        "max_count,max_window",
+        [(0, 10), (10, 0), (None, None)],
+    )
+    def test_no_limits(self, max_count, max_window):
+        if max_count or max_window:
+            batcher = Batcher(max_count=max_count, max_window=max_window)
+        else:
+            batcher = Batcher()
+
         assert batcher.is_triggered()  # no limit always returns true
 
         assert batcher.add("item1")
@@ -172,7 +182,7 @@ def test_deep_copy(self):
         original = {"key": "value"}
         batcher = Batcher(max_count=2)
 
-        batcher.add(original, cache_deep_copy=True)
+        batcher.add(original, deep_copy=True)
 
         original["key"] = "modified"
 