diff --git a/.github/ISSUE_TEMPLATE/bug.yml b/.github/ISSUE_TEMPLATE/bug.yml index 17b2148c..d9b39289 100644 --- a/.github/ISSUE_TEMPLATE/bug.yml +++ b/.github/ISSUE_TEMPLATE/bug.yml @@ -50,7 +50,7 @@ body: - Build script, CI, dependencies, etc. (part:tooling) - Channels, `Broadcast`, `Bidirectional`, etc. (part:channels) - Select (part:select) - - Utility receivers, `Merge`, `Timer`, etc. (part:receivers) + - Utility receivers, `Merge`, etc. (part:receivers) validations: required: true - type: textarea diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 9634e08b..bcbb2a5a 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -231,6 +231,10 @@ jobs: publish-to-pypi: needs: ["create-github-release"] runs-on: ubuntu-20.04 + permissions: + # For trusted publishing. See: + # https://blog.pypi.org/posts/2023-04-20-introducing-trusted-publishers/ + id-token: write steps: - name: Download dist files uses: actions/download-artifact@v3 @@ -240,6 +244,3 @@ jobs: - name: Publish the Python distribution to PyPI uses: pypa/gh-action-pypi-publish@release/v1 - with: - password: ${{ secrets.PYPI_API_TOKEN }} - skip_existing: true diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 0324ee6e..4a777899 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -2,36 +2,74 @@ ## Summary -The main change in this release is the revamp of exception handling in general. New exceptions were created and `send()` now raises an exception too when it fails. - -Hopefully they are now used much more uniformly across the whole library. +This release adds support to pass `None` values via channels and revamps the `Timer` class to support custom policies for handling missed ticks and use the loop monotonic clock. There is also a fix for the `FileWatcher` which includes a change in behavior when reporting changes for deleted files. ## Upgrading -* The `Sender.send()` method now `raise`s a `SenderError` instead of returning `False`. The `SenderError` will typically have a `ChannelClosedError` and the underlying reason as a chained exception. +* `util.Timer` was replaced by a more generic implementation that allows for customizable policies to handle missed ticks. + + If you were using `Timer` to implement timeouts, these two pieces of code should be almost equivalent: + + - Old: + + ```python + old_timer = Timer(1.0) + triggered_datetime = old_timer.receive() + ``` + + - New: + + ```python + new_timer = Timer.timeout(timedelta(seconds=1.0)) + drift = new_timer.receive() + triggered_datetime = datetime.now(timezone.utc) - drift + ``` + + They are not **exactly** the same because the `triggered_datetime` in the second case will not be exactly when the timer had triggered, but that shouldn't be relevant, the important part is when your code can actually react to the timer trigger and to know how much drift there was to be able to take corrective actions. -* The `Receiver.ready()` method (and related `receive()` and `__anext__` when used as an async iterator) now `raise`s a `ReceiverError` and in particular a `ReceiverStoppedError` when the receiver has no more messages to receive. + Also the new `Timer` uses the `asyncio` loop monotonic clock and the old one used the wall clock (`datetime.now()`) to track time. This means that when using `async-solipsism` to test, the new `Timer` will always trigger immediately regardless of the state of the wall clock. This also means that we don't need to mock the wall clock with `time-machine` either now. - `Receiver.consume()` doesn't raise any exceptions. + With the previous timer one needed to create a separate task to run the timer, because otherwise it would block as it loops until the wall clock was at a specific time. Now the code will run like this: - Receivers raising `EOFError` now raise `ReceiverInvalidatedError` instead. + ```python + timer = Timer.timeout(timedelta(seconds=1.0)) + asyncio.sleep(0.5) # Advances the loop monotonic clock by 0.5 seconds immediately + await drift = timer.receive() # Advances the clock by 0.5 immediately too + assert drift == approx(timedelta(0)) # Because it could trigger exactly at the tick time -* For channels which senders raise an error when the channel is closed or which receivers stop receiving when the channel is closed, the `SenderError` and `ReceiverStoppedError` are chained with a `__cause__` that is a `ChannelClosedError` with the channel that was closed. + # Simulates a delay in the timer trigger time + asyncio.sleep(1.5) # Advances the loop monotonic clock by 1.5 seconds immediately + await drift = timer.receive() # The timer should have triggered 0.5 seconds ago, so it doesn't even sleep + assert drift == approx(timedelta(seconds=0.5)) # Now we should observe a drift of 0.5 seconds + ``` -* `ChannelClosedError` now requires the argument `channel` (before it was optional). + **Note:** Before replacing this code blindly in all uses of `Timer.timeout()`, please consider using the periodic timer constructor `Timer.periodic()` if you need a timer that triggers reliable on a periodic fashion, as the old `Timer` (and `Timer.timeout()`) accumulates drift, which might not be what you want. -* Now exceptions are not raised in Receiver.ready() but in Receiver.consume() (receive() or the async iterator `anext`). +* `FileWatcher` now will emit events even if the file doesn't exist anymore. + + Because the underlying library has a considerable delay in triggering filesystem events, it can happen that, for example, a `CREATE` event is received but at the time of receiving the file doesn't exist anymore (because if was removed just after creation and before the event was triggered). + + Before the `FileWatcher` will only emit events when the file exists, but this doesn't work for `DELETE` events (clearly). Given the nature of this mechanism can lead to races easily, it is better to leave it to the user to decide when these situations happen and just report all events. + + Therefore, you should now check a file receiving an event really exist before trying to operate on it. + +* `FileWatcher` reports the type of event observed in addition to the file path. + + Previously, only the file path was reported. With this update, users can now determine if a file change is a creation, modification, or deletion. + Note that this change may require updates to your existing code that relies on `FileWatcher` as the new interface returns a `FileWatcher.Event` instead of just the file path. ## New Features -* New exceptions were added: +* `util.Timer` was replaced by a more generic implementation that allows for customizable policies to handle missed ticks. + +* Passing `None` values via channels is now supported. - * `Error`: A base exception from which all exceptions from this library inherit. +* `FileWatcher.Event` was added to notify events when a file is created, modified, or deleted. - * `SendError`: Raised for errors when sending messages. +## Bug Fixes - * `ReceiverError`: Raised for errors when receiving messages. +* `util.Select` / `util.Merge` / `util.MergeNamed`: Cancel pending tasks in `__del__` methods only if possible (the loop is not already closed). - * `ReceiverClosedError`: Raised when a receiver don't have more messages to receive. +* `FileWatcher` will now report `DELETE` events correctly. - * `ReceiverInvalidatedError`: Raised when a receiver was invalidated (for example it was converted into a `Peekable`). + Due to a bug, before this release `DELETE` events were only reported if the file was re-created before the event was triggered. diff --git a/noxfile.py b/noxfile.py index b4a0abe0..ef8b3f8f 100644 --- a/noxfile.py +++ b/noxfile.py @@ -28,14 +28,30 @@ def formatting(session: nox.Session) -> None: @nox.session def pylint(session: nox.Session) -> None: """Run pylint to do lint checks.""" - session.install("-e", ".[docs]", "pylint", "pytest", "nox") + session.install( + "-e", + ".[docs]", + "pylint", + "pytest", + "nox", + "async-solipsism", + "hypothesis", + ) session.run("pylint", *check_dirs, *check_files) @nox.session def mypy(session: nox.Session) -> None: """Run mypy to check type hints.""" - session.install("-e", ".[docs]", "pytest", "nox", "mypy") + session.install( + "-e", + ".[docs]", + "pytest", + "nox", + "mypy", + "async-solipsism", + "hypothesis", + ) common_args = [ "--namespace-packages", @@ -59,7 +75,7 @@ def mypy(session: nox.Session) -> None: @nox.session def docstrings(session: nox.Session) -> None: """Check docstring tone with pydocstyle and param descriptions with darglint.""" - session.install("pydocstyle", "darglint", "toml") + session.install("pydocstyle", "darglint", "tomli") session.run("pydocstyle", *check_dirs, *check_files) @@ -72,7 +88,14 @@ def docstrings(session: nox.Session) -> None: @nox.session def pytest(session: nox.Session) -> None: """Run all tests using pytest.""" - session.install("pytest", "pytest-cov", "pytest-mock", "pytest-asyncio") + session.install( + "pytest", + "pytest-cov", + "pytest-mock", + "pytest-asyncio", + "async-solipsism", + "hypothesis", + ) session.install("-e", ".") session.run( "pytest", diff --git a/pyproject.toml b/pyproject.toml index 6275fbbc..3ece81ef 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,8 +1,8 @@ [build-system] requires = [ - "setuptools >= 65.3.0, < 66", - "setuptools_scm[toml] >= 7.0.5, < 8", - "wheel" + "setuptools >= 65.3.0, < 66", + "setuptools_scm[toml] >= 7.0.5, < 8", + "wheel", ] build-backend = "setuptools.build_meta" @@ -12,36 +12,34 @@ name = "frequenz-channels" description = "Channel implementations for Python" readme = "README.md" license = { text = "MIT" } -keywords = [ "frequenz", "channel" ] +keywords = ["frequenz", "channel"] classifiers = [ - "Development Status :: 3 - Alpha", - "Intended Audience :: Developers", - "License :: OSI Approved :: MIT License", - "Programming Language :: Python :: 3", - "Programming Language :: Python :: 3 :: Only", - "Programming Language :: Python :: 3.8", - "Programming Language :: Python :: 3.9", - "Programming Language :: Python :: 3.10", - "Topic :: Software Development :: Libraries", + "Development Status :: 3 - Alpha", + "Intended Audience :: Developers", + "License :: OSI Approved :: MIT License", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3 :: Only", + "Programming Language :: Python :: 3.8", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", + "Topic :: Software Development :: Libraries", ] requires-python = ">= 3.8, < 4" -dependencies = [ - "watchfiles >= 0.15.0", -] -dynamic = [ "version" ] +dependencies = ["watchfiles >= 0.15.0"] +dynamic = ["version"] [[project.authors]] -name ="Frequenz Energy-as-a-Service GmbH" +name = "Frequenz Energy-as-a-Service GmbH" email = "floss@frequenz.com" [project.optional-dependencies] docs = [ - "mike >= 1.1.2, < 2", - "mkdocs-gen-files >= 0.4.0, < 0.5.0", - "mkdocs-literate-nav >= 0.4.0, < 0.5.0", - "mkdocs-material >= 8.5.7, < 9", - "mkdocs-section-index >= 0.3.4, < 0.4.0", - "mkdocstrings[python] >= 0.19.0, < 0.20.0", + "mike >= 1.1.2, < 2", + "mkdocs-gen-files >= 0.4.0, < 0.5.0", + "mkdocs-literate-nav >= 0.4.0, < 0.5.0", + "mkdocs-material >= 8.5.7, < 9", + "mkdocs-section-index >= 0.3.4, < 0.4.0", + "mkdocstrings[python] >= 0.19.0, < 0.20.0", ] [project.urls] @@ -57,16 +55,16 @@ include-package-data = true version_scheme = "post-release" [tool.pylint.similarities] -ignore-comments=['yes'] -ignore-docstrings=['yes'] -ignore-imports=['no'] -min-similarity-lines=40 +ignore-comments = ['yes'] +ignore-docstrings = ['yes'] +ignore-imports = ['no'] +min-similarity-lines = 40 [tool.pylint.messages_control] # disable wrong-import-order, ungrouped-imports because it conflicts with isort disable = ["too-few-public-methods", "wrong-import-order", "ungrouped-imports"] [tool.pylint.'DESIGN'] -max-attributes=12 +max-attributes = 12 [tool.isort] profile = "black" @@ -75,4 +73,11 @@ src_paths = ["src", "examples", "tests"] [tool.pytest.ini_options] asyncio_mode = "auto" -required_plugins = [ "pytest-asyncio", "pytest-mock" ] +required_plugins = ["pytest-asyncio", "pytest-mock"] +markers = [ + "integration: integration tests (deselect with '-m \"not integration\"')", +] + +[[tool.mypy.overrides]] +module = ["async_solipsism", "async_solipsism.*"] +ignore_missing_imports = true diff --git a/src/frequenz/channels/_anycast.py b/src/frequenz/channels/_anycast.py index 8fa054c8..1d314087 100644 --- a/src/frequenz/channels/_anycast.py +++ b/src/frequenz/channels/_anycast.py @@ -7,7 +7,7 @@ from asyncio import Condition from collections import deque -from typing import Deque, Generic, Optional +from typing import Deque, Generic, Type from ._base_classes import Receiver as BaseReceiver from ._base_classes import Sender as BaseSender @@ -151,6 +151,10 @@ async def send(self, msg: T) -> None: self._chan.recv_cv.notify(1) +class _Empty: + """A sentinel value to indicate that a value has not been set.""" + + class Receiver(BaseReceiver[T]): """A receiver to receive messages from an Anycast channel. @@ -165,7 +169,7 @@ def __init__(self, chan: Anycast[T]) -> None: chan: A reference to the channel that this receiver belongs to. """ self._chan = chan - self._next: Optional[T] = None + self._next: T | Type[_Empty] = _Empty async def ready(self) -> bool: """Wait until the receiver is ready with a value or an error. @@ -179,7 +183,7 @@ async def ready(self) -> bool: Whether the receiver is still active. """ # if a message is already ready, then return immediately. - if self._next is not None: + if self._next is not _Empty: return True while len(self._chan.deque) == 0: @@ -202,12 +206,15 @@ def consume(self) -> T: ReceiverStoppedError: if the receiver stopped producing messages. ReceiverError: if there is some problem with the receiver. """ - if self._next is None and self._chan.closed: + if self._next is _Empty and self._chan.closed: raise ReceiverStoppedError(self) from ChannelClosedError(self._chan) assert ( - self._next is not None + self._next is not _Empty ), "`consume()` must be preceeded by a call to `ready()`" - next_val = self._next - self._next = None + # mypy doesn't understand that the assert above ensures that self._next is not + # _Sentinel. So we have to use a type ignore here. + next_val: T = self._next # type: ignore[assignment] + self._next = _Empty + return next_val diff --git a/src/frequenz/channels/util/__init__.py b/src/frequenz/channels/util/__init__.py index 1b0f2c67..72369a7b 100644 --- a/src/frequenz/channels/util/__init__.py +++ b/src/frequenz/channels/util/__init__.py @@ -17,25 +17,34 @@ multiple receivers into a single named stream, allowing to identify the origin of each message. +* [Timer][frequenz.channels.util.Timer]: + A [receiver][frequenz.channels.Receiver] that ticks at certain intervals. + * [Select][frequenz.channels.util.Select]: A helper to select the next available message for each [receiver][frequenz.channels.Receiver] in a group of receivers. - -* [Timer][frequenz.channels.util.Timer]: - A [receiver][frequenz.channels.Receiver] that emits a *now* `timestamp` - every `interval` seconds. """ from ._file_watcher import FileWatcher from ._merge import Merge from ._merge_named import MergeNamed from ._select import Select -from ._timer import Timer +from ._timer import ( + MissedTickPolicy, + SkipMissedAndDrift, + SkipMissedAndResync, + Timer, + TriggerAllMissed, +) __all__ = [ "FileWatcher", "Merge", "MergeNamed", - "Select", + "MissedTickPolicy", "Timer", + "Select", + "SkipMissedAndDrift", + "SkipMissedAndResync", + "TriggerAllMissed", ] diff --git a/src/frequenz/channels/util/_file_watcher.py b/src/frequenz/channels/util/_file_watcher.py index 684c708e..54a0f6a6 100644 --- a/src/frequenz/channels/util/_file_watcher.py +++ b/src/frequenz/channels/util/_file_watcher.py @@ -2,10 +2,13 @@ # Copyright © 2022 Frequenz Energy-as-a-Service GmbH """A Channel receiver for watching for new (or modified) files.""" + +from __future__ import annotations + import asyncio import pathlib +from dataclasses import dataclass from enum import Enum -from typing import List, Optional, Set, Union from watchfiles import Change, awatch from watchfiles.main import FileChange @@ -14,7 +17,7 @@ from .._exceptions import ReceiverStoppedError -class FileWatcher(Receiver[pathlib.Path]): +class FileWatcher(Receiver["FileWatcher.Event"]): """A channel receiver that watches for file events.""" class EventType(Enum): @@ -24,10 +27,19 @@ class EventType(Enum): MODIFY = Change.modified DELETE = Change.deleted + @dataclass(frozen=True) + class Event: + """A file change event.""" + + type: FileWatcher.EventType + """The type of change that was observed.""" + path: pathlib.Path + """The path where the change was observed.""" + def __init__( self, - paths: List[Union[pathlib.Path, str]], - event_types: Optional[Set[EventType]] = None, + paths: list[pathlib.Path | str], + event_types: set[EventType] | None = None, ) -> None: """Create a `FileWatcher` instance. @@ -46,15 +58,26 @@ def __init__( for path in paths ] self._awatch = awatch( - *self._paths, - stop_event=self._stop_event, - watch_filter=lambda change, path_str: ( - change in [event_type.value for event_type in event_types] # type: ignore - and pathlib.Path(path_str).is_file() - ), + *self._paths, stop_event=self._stop_event, watch_filter=self._filter_events ) - self._awatch_stopped_exc: Optional[Exception] = None - self._changes: Set[FileChange] = set() + self._awatch_stopped_exc: Exception | None = None + self._changes: set[FileChange] = set() + + def _filter_events( + self, + change: Change, + path: str, # pylint: disable=unused-argument + ) -> bool: + """Filter events based on the event type and path. + + Args: + change: The type of change to be notified. + path: The path of the file that changed. + + Returns: + Whether the event should be notified. + """ + return change in [event_type.value for event_type in self.event_types] def __del__(self) -> None: """Cleanup registered watches. @@ -91,11 +114,11 @@ async def ready(self) -> bool: return True - def consume(self) -> pathlib.Path: - """Return the latest change once `ready` is complete. + def consume(self) -> Event: + """Return the latest event once `ready` is complete. Returns: - The next change that was received. + The next event that was received. Raises: ReceiverStoppedError: if there is some problem with the receiver. @@ -104,8 +127,8 @@ def consume(self) -> pathlib.Path: raise ReceiverStoppedError(self) from self._awatch_stopped_exc assert self._changes, "`consume()` must be preceeded by a call to `ready()`" - change = self._changes.pop() # Tuple of (Change, path) returned by watchfiles - _, path_str = change - path = pathlib.Path(path_str) - return path + change, path_str = self._changes.pop() + return FileWatcher.Event( + type=FileWatcher.EventType(change), path=pathlib.Path(path_str) + ) diff --git a/src/frequenz/channels/util/_merge.py b/src/frequenz/channels/util/_merge.py index 56882959..37570b47 100644 --- a/src/frequenz/channels/util/_merge.py +++ b/src/frequenz/channels/util/_merge.py @@ -46,7 +46,8 @@ def __init__(self, *args: Receiver[T]) -> None: def __del__(self) -> None: """Cleanup any pending tasks.""" for task in self._pending: - task.cancel() + if not task.done() and task.get_loop().is_running(): + task.cancel() async def stop(self) -> None: """Stop the `Merge` instance and cleanup any pending tasks.""" diff --git a/src/frequenz/channels/util/_merge_named.py b/src/frequenz/channels/util/_merge_named.py index f8c85be6..2267ef6a 100644 --- a/src/frequenz/channels/util/_merge_named.py +++ b/src/frequenz/channels/util/_merge_named.py @@ -34,7 +34,8 @@ def __init__(self, **kwargs: Receiver[T]) -> None: def __del__(self) -> None: """Cleanup any pending tasks.""" for task in self._pending: - task.cancel() + if not task.done() and task.get_loop().is_running(): + task.cancel() async def stop(self) -> None: """Stop the `MergeNamed` instance and cleanup any pending tasks.""" diff --git a/src/frequenz/channels/util/_select.py b/src/frequenz/channels/util/_select.py index 9c99c5dd..7aee5d8d 100644 --- a/src/frequenz/channels/util/_select.py +++ b/src/frequenz/channels/util/_select.py @@ -108,7 +108,8 @@ def __init__(self, **kwargs: Receiver[Any]) -> None: def __del__(self) -> None: """Cleanup any pending tasks.""" for task in self._pending: - task.cancel() + if not task.done() and task.get_loop().is_running(): + task.cancel() async def stop(self) -> None: """Stop the `Select` instance and cleanup any pending tasks.""" diff --git a/src/frequenz/channels/util/_timer.py b/src/frequenz/channels/util/_timer.py index 3b0725a3..846b4308 100644 --- a/src/frequenz/channels/util/_timer.py +++ b/src/frequenz/channels/util/_timer.py @@ -1,135 +1,664 @@ # License: MIT -# Copyright © 2022 Frequenz Energy-as-a-Service GmbH +# Copyright © 2023 Frequenz Energy-as-a-Service GmbH -"""A timer receiver that returns the timestamp every `interval`.""" +"""A timer receiver that ticks every `interval`. +Note: + This module always use `int`s to represent time. The time is always in + microseconds, so the timer resolution is 1 microsecond. + + This is to avoid floating point errors when performing calculations with + time, which can lead to very hard to reproduce, and debug, issues. +""" + +from __future__ import annotations + +import abc import asyncio -from datetime import datetime, timedelta, timezone -from typing import Optional +from datetime import timedelta from .._base_classes import Receiver from .._exceptions import ReceiverStoppedError -class Timer(Receiver[datetime]): - """A timer receiver that returns the timestamp every `interval` seconds. +def _to_microseconds(time: float | timedelta) -> int: + """Convert a time or a timedelta to microseconds. + + Args: + time: The time to convert. If it is a `float`, it is assumed to be in + seconds. + + Returns: + The time in microseconds. + """ + if isinstance(time, timedelta): + time = time.total_seconds() + return round(time * 1_000_000) + + +class MissedTickPolicy(abc.ABC): + """A policy to handle timer missed ticks. + + This is only relevant if the timer is not ready to trigger when it should + (an interval passed) which can happen if the event loop is busy processing + other tasks. + """ + + @abc.abstractmethod + def calculate_next_tick_time( + self, *, interval: int, scheduled_tick_time: int, now: int + ) -> int: + """Calculate the next tick time according to `missed_tick_policy`. + + This method is called by `ready()` after it has determined that the + timer has triggered. It will check if the timer has missed any ticks + and handle them according to `missed_tick_policy`. + + Args: + interval: The interval between ticks (in microseconds). + scheduled_tick_time: The time the current tick was scheduled to + trigger (in microseconds). + now: The current loop time (in microseconds). + + Returns: + The next tick time (in microseconds) according to + `missed_tick_policy`. + """ + return 0 # dummy value to avoid darglint warnings + + +class TriggerAllMissed(MissedTickPolicy): + """A policy that triggers all the missed ticks immediately until it catches up. + + Example: + Assume a timer with interval 1 second, the tick `T0` happens exactly + at time 0, the second tick, `T1`, happens at time 1.2 (0.2 seconds + late), so it trigges immediately. The third tick, `T2`, happens at + time 2.3 (0.3 seconds late), so it also triggers immediately. The + fourth tick, `T3`, happens at time 4.3 (1.3 seconds late), so it also + triggers immediately as well as the fifth tick, `T4`, which was also + already delayed (by 0.3 seconds), so it catches up. The sixth tick, + `T5`, happens at 5.1 (0.1 seconds late), so it triggers immediately + again. The seventh tick, `T6`, happens at 6.0, right on time. + + ``` + 0 1 2 3 4 o 5 6 + o---------|-o-------|--o------|---------|--o------|o--------o-----> time + T0 T1 T2 T3 T5 T6 + T4 + ``` + """ + + def calculate_next_tick_time( + self, *, now: int, scheduled_tick_time: int, interval: int + ) -> int: + """Calculate the next tick time. + + This method always returns `scheduled_tick_time + interval`, as all + ticks need to produce a trigger event. + + Args: + now: The current loop time (in microseconds). + scheduled_tick_time: The time the current tick was scheduled to + trigger (in microseconds). + interval: The interval between ticks (in microseconds). + + Returns: + The next tick time (in microseconds). + """ + return scheduled_tick_time + interval + + +class SkipMissedAndResync(MissedTickPolicy): + """A policy that drops all the missed ticks, triggers immediately and resyncs. + + If ticks are missed, the timer will trigger immediately returing the drift + and it will schedule to trigger again on the next multiple of `interval`, + effectively skipping any missed ticks, but resyncing with the original start + time. + + Example: + Assume a timer with interval 1 second, the tick `T0` happens exactly + at time 0, the second tick, `T1`, happens at time 1.2 (0.2 seconds + late), so it trigges immediately. The third tick, `T2`, happens at + time 2.3 (0.3 seconds late), so it also triggers immediately. The + fourth tick, `T3`, happens at time 4.3 (1.3 seconds late), so it also + triggers immediately but the fifth tick, `T4`, which was also + already delayed (by 0.3 seconds) is skipped. The sixth tick, + `T5`, happens at 5.1 (0.1 seconds late), so it triggers immediately + again. The seventh tick, `T6`, happens at 6.0, right on time. + + ``` + 0 1 2 3 4 o 5 6 + o---------|-o-------|--o------|---------|--o------|o--------o-----> time + T0 T1 T2 T3 T5 T6 + ``` + """ + + def calculate_next_tick_time( + self, *, now: int, scheduled_tick_time: int, interval: int + ) -> int: + """Calculate the next tick time. + + Calculate the next multiple of `interval` after `scheduled_tick_time`. - Primarily for use with [Select][frequenz.channels.util.Select]. + Args: + now: The current loop time (in microseconds). + scheduled_tick_time: The time the current tick was scheduled to + trigger (in microseconds). + interval: The interval between ticks (in microseconds). + + Returns: + The next tick time (in microseconds). + """ + # We need to resync (align) the next tick time to the current time + drift = now - scheduled_tick_time + delta_to_next_tick = interval - (drift % interval) + return now + delta_to_next_tick + + +class SkipMissedAndDrift(MissedTickPolicy): + """A policy that drops all the missed ticks, triggers immediately and resets. - The timestamp generated is a timezone-aware datetime using UTC as timezone. + This will behave effectively as if the timer was `reset()` at the time it + had triggered last, so the start time will change (and the drift will be + accumulated each time a tick is delayed, but only the relative drift will + be returned on each tick). + + The reset happens only if the delay is larger than `delay_tolerance`, so + it is possible to ignore small delays and not drift in those cases. Example: - When you want something to happen with a fixed period: + Assume a timer with interval 1 second and `delay_tolerance=0.1`, the + first tick, `T0`, happens exactly at time 0, the second tick, `T1`, + happens at time 1.2 (0.2 seconds late), so the timer triggers + immmediately but drifts a bit. The next tick, `T2.2`, happens at 2.3 seconds + (0.1 seconds late), so it also triggers immediately but it doesn't + drift because the delay is under the `delay_tolerance`. The next tick, + `T3.2`, triggers at 4.3 seconds (1.1 seconds late), so it also triggers + immediately but the timer drifts by 1.1 seconds and the tick `T4.2` is + skipped (not triggered). The next tick, `T5.3`, triggers at 5.3 seconds + so is right on time (no drift) and the same happens for tick `T6.3`, + which triggers at 6.3 seconds. + + ``` + 0 1 2 3 4 5 6 + o---------|-o-------|--o------|---------|--o------|--o------|--o--> time + T0 T1 T2.2 T3.2 T5.3 T6.3 + ``` + """ + + def __init__(self, *, delay_tolerance: timedelta = timedelta(0)): + """ + Create an instance. + + See the class documenation for more details. + + Args: + delay_tolerance: The maximum delay that is tolerated before + starting to drift. If a tick is delayed less than this, then + it is not considered a missed tick and the timer doesn't + accumulate this drift. + + Raises: + ValueError: If `delay_tolerance` is negative. + """ + self._tolerance: int = _to_microseconds(delay_tolerance) + """The maximum allowed delay before starting to drift.""" + + if self._tolerance < 0: + raise ValueError("delay_tolerance must be positive") + + @property + def delay_tolerance(self) -> timedelta: + """Return the maximum delay that is tolerated before starting to drift. + + Returns: + The maximum delay that is tolerated before starting to drift. + """ + return timedelta(microseconds=self._tolerance) + + def calculate_next_tick_time( + self, *, now: int, scheduled_tick_time: int, interval: int + ) -> int: + """Calculate the next tick time. + + If the drift is larger than `delay_tolerance`, then it returns `now + + interval` (so the timer drifts), otherwise it returns + `scheduled_tick_time + interval` (we consider the delay too small and + avoid small drifts). + + Args: + now: The current loop time (in microseconds). + scheduled_tick_time: The time the current tick was scheduled to + trigger (in microseconds). + interval: The interval between ticks (in microseconds). + + Returns: + The next tick time (in microseconds). + """ + drift = now - scheduled_tick_time + if drift > self._tolerance: + return now + interval + return scheduled_tick_time + interval + + +class Timer(Receiver[timedelta]): + """A timer receiver that triggers every `interval` time. + + The timer as microseconds resolution, so the `interval` must be at least + 1 microsecond. + + The message it produces is a `timedelta` containing the drift of the timer, + i.e. the difference between when the timer should have triggered and the time + when it actually triggered. + + This drift will likely never be `0`, because if there is a task that is + running when it should trigger, the timer will be delayed. In this case the + drift will be positive. A negative drift should be technically impossible, + as the timer uses `asyncio`s loop monotonic clock. + + If the timer is delayed too much, then the timer will behave according to + the `missed_tick_policy`. Missing ticks might or might not trigger + a message and the drift could be accumulated or not depending on the + chosen policy. + + The timer accepts an optional `loop`, which will be used to track the time. + If `loop` is `None`, then the running loop will be used (if there is no + running loop most calls will raise a `RuntimeError`). + + Starting the timer can be delayed if necessary by using `auto_start=False` + (for example until we have a running loop). A call to `reset()`, `ready()`, + `receive()` or the async iterator interface to await for a new message will + start the timer. + + For the most common cases, a specialized constructor is provided: + + * [`periodic()`][frequenz.channels.util.Timer.periodic] + * [`timeout()`][frequenz.channels.util.Timer.timeout] + + Example: Periodic timer example + ```python + async for drift in Timer.periodic(timedelta(seconds=1.0)): + print(f"The timer has triggered {drift=}") + ``` + + But you can also use [`Select`][frequenz.channels.util.Select] to combine it + with other receivers, and even start it (semi) manually: ```python - timer = Timer(30.0) - select = Select(bat_1 = receiver1, timer = timer) + timer = Timer.timeout(timedelta(seconds=1.0), auto_start=False) + # Do some other initialization, the timer will start automatically if + # a message is awaited (or manually via `reset()`). + select = Select(bat_1=receiver1, timer=timer) while await select.ready(): if msg := select.bat_1: if val := msg.inner: process_data(val) else: logging.warn("battery channel closed") - if ts := select.timer: - # something to do once every 30 seconds - pass + elif drift := select.timer: + # Print some regular battery data + print(f"Battery is charged at {battery.soc}%") + if stop_logging: + timer.stop() + elif start_logging: + timer.reset() ``` - When you want something to happen when nothing else has happened in a - certain interval: - + Example: Timeout example ```python - timer = Timer(30.0) - select = Select(bat_1 = receiver1, timer = timer) + timer = Timer.timeout(timedelta(seconds=1.0), auto_start=False) + select = Select(bat_1=receiver1, heavy_process=receiver2, timeout=timer) while await select.ready(): - timer.reset() if msg := select.bat_1: if val := msg.inner: process_data(val) + timer.reset() else: logging.warn("battery channel closed") - if ts := select.timer: - # something to do if there's no battery data for 30 seconds - pass + if msg := select.heavy_process: + if val := msg.inner: + do_heavy_processing(val) + else: + logging.warn("processing channel closed") + elif drift := select.timeout: + logging.warn("No data received in time") ``` + + In this case `do_heavy_processing` might take 2 seconds, and we don't + want our timeout timer to trigger for the missed ticks, and want the + next tick to be relative to the time timer was last triggered. """ - def __init__(self, interval: float) -> None: - """Create a `Timer` instance. + def __init__( + self, + interval: timedelta, + missed_tick_policy: MissedTickPolicy, + /, + *, + auto_start: bool = True, + loop: asyncio.AbstractEventLoop | None = None, + ) -> None: + """Create an instance. + + See the class documentation for details. Args: - interval: number of seconds between messages. + interval: The time between timer ticks. Must be at least + 1 microsecond. + missed_tick_policy: The policy of the timer when it misses + a tick. See the documentation of `MissedTickPolicy` for + details. + auto_start: Whether the timer should be started when the + instance is created. This can only be `True` if there is + already a running loop or an explicit `loop` that is running + was passed. + loop: The event loop to use to track time. If `None`, + `asyncio.get_running_loop()` will be used. + + Raises: + RuntimeError: if it was called without a loop and there is no + running loop. + ValueError: if `interval` is not positive or is smaller than 1 + microsecond. """ - self._stopped = False - self._interval = timedelta(seconds=interval) - self._next_msg_time = datetime.now(timezone.utc) + self._interval - self._now: Optional[datetime] = None + self._interval: int = _to_microseconds(interval) + """The time to between timer ticks.""" + + self._missed_tick_policy: MissedTickPolicy = missed_tick_policy + """The policy of the timer when it misses a tick. + + See the documentation of `MissedTickPolicy` for details. + """ + + self._loop: asyncio.AbstractEventLoop = ( + loop if loop is not None else asyncio.get_running_loop() + ) + """The event loop to use to track time.""" + + self._stopped: bool = True + """Whether the timer was requested to stop. + + If this is `False`, then the timer is running. + + If this is `True`, then it is stopped or there is a request to stop it + or it was not started yet: + + * If `_next_msg_time` is `None`, it means it wasn't started yet (it was + created with `auto_start=False`). Any receiving method will start + it by calling `reset()` in this case. + + * If `_next_msg_time` is not `None`, it means there was a request to + stop it. In this case receiving methods will raise + a `ReceiverClosedError`. + """ + + self._next_tick_time: int | None = None + """The absolute (monotonic) time when the timer should trigger. + + If this is `None`, it means the timer didn't start yet, but it should + be started as soon as it is used. + """ + + self._current_drift: timedelta | None = None + """The difference between `_next_msg_time` and the triggered time. + + This is calculated by `ready()` but is returned by `consume()`. If + `None` it means `ready()` wasn't called and `consume()` will assert. + `consume()` will set it back to `None` to tell `ready()` that it needs + to wait again. + """ + + if self._interval <= 0: + raise ValueError( + "The `interval` must be positive and at least 1 microsecond, " + f"not {interval} ({self._interval} microseconds)" + ) + + if auto_start: + self.reset() + + @classmethod + def timeout( + cls, + delay: timedelta, + /, + *, + auto_start: bool = True, + loop: asyncio.AbstractEventLoop | None = None, + ) -> Timer: + """Create a timer useful for tracking timeouts. + + This is basically a shortcut to create a timer with + `SkipMissedAndDrift(delay_tolerance=timedelta(0))` as the missed tick policy. + + See the class documentation for details. + + Args: + delay: The time until the timer ticks. Must be at least + 1 microsecond. + auto_start: Whether the timer should be started when the + instance is created. This can only be `True` if there is + already a running loop or an explicit `loop` that is running + was passed. + loop: The event loop to use to track time. If `None`, + `asyncio.get_running_loop()` will be used. + + Returns: + The timer instance. + + Raises: + RuntimeError: if it was called without a loop and there is no + running loop. + ValueError: if `interval` is not positive or is smaller than 1 + microsecond. + """ + return Timer( + delay, + SkipMissedAndDrift(delay_tolerance=timedelta(0)), + auto_start=auto_start, + loop=loop, + ) + + @classmethod + def periodic( + cls, + period: timedelta, + /, + *, + skip_missed_ticks: bool = False, + auto_start: bool = True, + loop: asyncio.AbstractEventLoop | None = None, + ) -> Timer: + """Create a periodic timer. + + This is basically a shortcut to create a timer with either + `TriggerAllMissed()` or `SkipMissedAndResync()` as the missed tick policy + (depending on `skip_missed_ticks`). + + See the class documentation for details. + + Args: + period: The time between timer ticks. Must be at least + 1 microsecond. + skip_missed_ticks: Whether to skip missed ticks or trigger them + all until it catches up. + auto_start: Whether the timer should be started when the + instance is created. This can only be `True` if there is + already a running loop or an explicit `loop` that is running + was passed. + loop: The event loop to use to track time. If `None`, + `asyncio.get_running_loop()` will be used. + + Returns: + The timer instance. + + Raises: + RuntimeError: if it was called without a loop and there is no + running loop. + ValueError: if `interval` is not positive or is smaller than 1 + microsecond. + """ + missed_tick_policy = ( + SkipMissedAndResync() if skip_missed_ticks else TriggerAllMissed() + ) + return Timer( + period, + missed_tick_policy, + auto_start=auto_start, + loop=loop, + ) + + @property + def interval(self) -> timedelta: + """The interval between timer ticks. + + Returns: + The interval between timer ticks. + """ + return timedelta(microseconds=self._interval) + + @property + def missed_tick_policy(self) -> MissedTickPolicy: + """The policy of the timer when it misses a tick. + + Returns: + The policy of the timer when it misses a tick. + """ + return self._missed_tick_policy + + @property + def loop(self) -> asyncio.AbstractEventLoop: + """The event loop used by the timer to track time. + + Returns: + The event loop used by the timer to track time. + """ + return self._loop + + @property + def is_running(self) -> bool: + """Whether the timer is running. + + This will be `False` if the timer was stopped, or not started yet. + + Returns: + Whether the timer is running. + """ + return not self._stopped def reset(self) -> None: - """Reset the timer to start timing from `now`.""" - self._next_msg_time = datetime.now(timezone.utc) + self._interval + """Reset the timer to start timing from now. + + If the timer was stopped, or not started yet, it will be started. + + This can only be called with a running loop, see the class + documentation for more details. + + Raises: + RuntimeError: if it was called without a running loop. + """ + self._stopped = False + self._next_tick_time = self._now() + self._interval + self._current_drift = None def stop(self) -> None: """Stop the timer. - Once `stop` has been called, all subsequent calls to - [receive()][frequenz.channels.Receiver.receive] will immediately - return `None`. + Once `stop` has been called, all subsequent calls to `ready()` will + immediately return False and calls to `consume()` / `receive()` or any + use of the async iterator interface will raise + a `ReceiverStoppedError`. + + You can restart the timer with `reset()`. """ self._stopped = True + # We need to make sure it's not None, otherwise `ready()` will start it + self._next_tick_time = self._now() async def ready(self) -> bool: - """Wait until the receiver is ready with a value or an error. + """Wait until the timer `interval` passed. + + Once a call to `ready()` has finished, the resulting tick information + must be read with a call to `consume()` (`receive()` or iterated over) + to tell the timer it should wait for the next interval. - Once a call to `ready()` has finished, the value should be read with - a call to `consume()` (`receive()` or iterated over). The receiver will - remain ready (this method will return immediately) until it is - consumed. + The timer will remain ready (this method will return immediately) + until it is consumed. Returns: - Whether the receiver is still active. + Whether the timer was started and it is still running. + + Raises: + RuntimeError: if it was called without a running loop. """ - # if there are messages waiting to be consumed, return immediately. - if self._now is not None: + # If there are messages waiting to be consumed, return immediately. + if self._current_drift is not None: return True + # If `_next_tick_time` is `None`, it means it was created with + # `auto_start=False` and should be started. + if self._next_tick_time is None: + self.reset() + assert ( + self._next_tick_time is not None + ), "This should be assigned by reset()" + + # If a stop was explicitly requested, we bail out. if self._stopped: return False - now = datetime.now(timezone.utc) - diff = self._next_msg_time - now - while diff.total_seconds() > 0: - await asyncio.sleep(diff.total_seconds()) - now = datetime.now(timezone.utc) - diff = self._next_msg_time - now - self._now = now + now = self._now() + time_to_next_tick = self._next_tick_time - now + # If we didn't reach the tick yet, sleep until we do. + if time_to_next_tick > 0: + await asyncio.sleep(time_to_next_tick / 1_000_000) + now = self._now() - self._next_msg_time = self._now + self._interval + # If a stop was explicitly requested during the sleep, we bail out. + if self._stopped: + return False + + self._current_drift = timedelta(microseconds=now - self._next_tick_time) + self._next_tick_time = self._missed_tick_policy.calculate_next_tick_time( + now=now, + scheduled_tick_time=self._next_tick_time, + interval=self._interval, + ) return True - def consume(self) -> datetime: - """Return the latest value once `ready` is complete. + def consume(self) -> timedelta: + """Return the latest drift once `ready()` is complete. + + Once the timer has triggered (`ready()` is done), this method returns the + difference between when the timer should have triggered and the time when + it actually triggered. See the class documentation for more details. Returns: - The time of the next tick in UTC or `None` if - [stop()][frequenz.channels.util.Timer.stop] has been called on - the timer. + The difference between when the timer should have triggered and the + time when it actually did. Raises: - ReceiverStoppedError: if the receiver stopped producing messages. - ReceiverError: if there is some problem with the receiver. - - Changelog: - * **v0.11.0:** Returns a timezone-aware datetime with UTC timezone - instead of a native datetime object. + ReceiverStoppedError: if the timer was stopped via `stop()`. """ - if self._stopped: + # If it was stopped and there it no pending result, we raise + # (if there is a pending result, then we still want to return it first) + if self._stopped and self._current_drift is None: raise ReceiverStoppedError(self) assert ( - self._now is not None - ), "`consume()` must be preceeded by a call to `ready()`" - now = self._now - self._now = None - return now + self._current_drift is not None + ), "calls to `consume()` must be follow a call to `ready()`" + drift = self._current_drift + self._current_drift = None + return drift + + def _now(self) -> int: + """Return the current monotonic clock time in microseconds. + + Returns: + The current monotonic clock time in microseconds. + """ + return _to_microseconds(self._loop.time()) diff --git a/tests/test_anycast.py b/tests/test_anycast.py index 9b38f4e6..5a5e8c1e 100644 --- a/tests/test_anycast.py +++ b/tests/test_anycast.py @@ -3,6 +3,8 @@ """Tests for the Channel implementation.""" +from __future__ import annotations + import asyncio import pytest @@ -147,6 +149,23 @@ async def test_anycast_full() -> None: assert False +async def test_anycast_none_values() -> None: + """Ensure None values can be sent and received.""" + acast: Anycast[int | None] = Anycast() + + sender = acast.new_sender() + receiver = acast.new_receiver() + + await sender.send(5) + assert await receiver.receive() == 5 + + await sender.send(None) + assert await receiver.receive() is None + + await sender.send(10) + assert await receiver.receive() == 10 + + async def test_anycast_async_iterator() -> None: """Check that the anycast receiver works as an async iterator.""" acast: Anycast[str] = Anycast() diff --git a/tests/test_broadcast.py b/tests/test_broadcast.py index f3e52ecc..566252c0 100644 --- a/tests/test_broadcast.py +++ b/tests/test_broadcast.py @@ -3,6 +3,8 @@ """Tests for the Broadcast implementation.""" +from __future__ import annotations + import asyncio from typing import Tuple @@ -69,6 +71,23 @@ async def update_tracker_on_receive(receiver_id: int, recv: Receiver[int]) -> No assert actual_sum == expected_sum +async def test_broadcast_none_values() -> None: + """Ensure None values can be sent and received.""" + bcast: Broadcast[int | None] = Broadcast("any_channel") + + sender = bcast.new_sender() + receiver = bcast.new_receiver() + + await sender.send(5) + assert await receiver.receive() == 5 + + await sender.send(None) + assert await receiver.receive() is None + + await sender.send(10) + assert await receiver.receive() == 10 + + async def test_broadcast_after_close() -> None: """Ensure closed channels can't get new messages.""" bcast: Broadcast[int] = Broadcast("meter_5") diff --git a/tests/utils/test_file_watcher.py b/tests/utils/test_file_watcher.py index 2a640230..c7f4de5e 100644 --- a/tests/utils/test_file_watcher.py +++ b/tests/utils/test_file_watcher.py @@ -3,61 +3,101 @@ """Tests for `channel.FileWatcher`.""" -import os +from __future__ import annotations + import pathlib +from collections.abc import AsyncGenerator, Iterator, Sequence +from typing import Any +from unittest import mock -from frequenz.channels.util import FileWatcher, Select, Timer +import hypothesis +import hypothesis.strategies as st +import pytest +from watchfiles import Change +from watchfiles.main import FileChange +from frequenz.channels.util import FileWatcher -async def test_file_watcher(tmp_path: pathlib.Path) -> None: - """Ensure file watcher is returning paths on file events. - Args: - tmp_path (pathlib.Path): A tmp directory to run the file watcher on. - Created by pytest. - """ - filename = tmp_path / "test-file" - file_watcher = FileWatcher(paths=[str(tmp_path)]) +class _FakeAwatch: + """Fake awatch class to mock the awatch function.""" - number_of_writes = 0 - expected_number_of_writes = 3 + def __init__(self, changes: Sequence[FileChange] = ()) -> None: + """Create a `_FakeAwatch` instance. - select = Select(timer=Timer(0.1), file_watcher=file_watcher) - while await select.ready(): - if msg := select.timer: - filename.write_text(f"{msg.inner}") - elif msg := select.file_watcher: - assert msg.inner == filename - number_of_writes += 1 - # After receiving a write 3 times, unsubscribe from the writes channel - if number_of_writes == expected_number_of_writes: - break + Args: + changes: A sequence of file changes to be returned by the fake awatch + function. + """ + self.changes: Sequence[FileChange] = changes - assert number_of_writes == expected_number_of_writes + async def fake_awatch( + self, *paths: str, **kwargs: Any # pylint: disable=unused-argument + ) -> AsyncGenerator[set[FileChange], None]: + """Fake awatch function. + Args: + paths: Paths to watch. + kwargs: Keyword arguments to pass to the awatch function. + """ + for change in self.changes: + yield {change} -async def test_file_watcher_change_types(tmp_path: pathlib.Path) -> None: - """Ensure file watcher is returning paths only on the DELETE change. - Args: - tmp_path (pathlib.Path): A tmp directory to run the file watcher on. - Created by pytest. - """ - filename = tmp_path / "test-file" - file_watcher = FileWatcher( - paths=[str(tmp_path)], event_types={FileWatcher.EventType.DELETE} - ) +@pytest.fixture +def fake_awatch() -> Iterator[_FakeAwatch]: + """Fixture to mock the awatch function.""" + fake = _FakeAwatch() + with mock.patch( + "frequenz.channels.util._file_watcher.awatch", + autospec=True, + side_effect=fake.fake_awatch, + ): + yield fake + - select = Select( - write_timer=Timer(0.1), deletion_timer=Timer(0.5), watcher=file_watcher +async def test_file_watcher_receive_updates( + fake_awatch: _FakeAwatch, # pylint: disable=redefined-outer-name +) -> None: + """Test the file watcher receive the expected events.""" + filename = "test-file" + changes = ( + (Change.added, filename), + (Change.deleted, filename), + (Change.modified, filename), ) - number_of_receives = 0 - while await select.ready(): - if msg := select.write_timer: - filename.write_text(f"{msg.inner}") - elif _ := select.deletion_timer: - os.remove(filename) - elif _ := select.watcher: - number_of_receives += 1 - break - assert number_of_receives == 1 + fake_awatch.changes = changes + file_watcher = FileWatcher(paths=[filename]) + + for change in changes: + recv_changes = await file_watcher.receive() + event_type = FileWatcher.EventType(change[0]) + path = pathlib.Path(change[1]) + assert recv_changes == FileWatcher.Event(type=event_type, path=path) + + +@hypothesis.given(event_types=st.sets(st.sampled_from(FileWatcher.EventType))) +async def test_file_watcher_filter_events( + event_types: set[FileWatcher.EventType], +) -> None: + """Test the file watcher events filtering.""" + good_path = "good-file" + + # We need to reset the mock explicitly because hypothesis runs all the produced + # inputs in the same context. + with mock.patch( + "frequenz.channels.util._file_watcher.awatch", autospec=True + ) as awatch_mock: + file_watcher = FileWatcher(paths=[good_path], event_types=event_types) + + filter_events = file_watcher._filter_events # pylint: disable=protected-access + + assert awatch_mock.mock_calls == [ + mock.call( + pathlib.Path(good_path), stop_event=mock.ANY, watch_filter=filter_events + ) + ] + for event_type in FileWatcher.EventType: + assert filter_events(event_type.value, good_path) == ( + event_type in event_types + ) diff --git a/tests/utils/test_integration.py b/tests/utils/test_integration.py new file mode 100644 index 00000000..25700784 --- /dev/null +++ b/tests/utils/test_integration.py @@ -0,0 +1,114 @@ +# License: MIT +# Copyright © 2023 Frequenz Energy-as-a-Service GmbH + +"""Integration tests for the `util` module.""" + +from __future__ import annotations + +import os +import pathlib +from datetime import timedelta + +import pytest + +from frequenz.channels.util import FileWatcher, Select, Timer + + +@pytest.mark.integration +async def test_file_watcher(tmp_path: pathlib.Path) -> None: + """Ensure file watcher is returning paths on file events. + + Args: + tmp_path (pathlib.Path): A tmp directory to run the file watcher on. + Created by pytest. + """ + filename = tmp_path / "test-file" + file_watcher = FileWatcher(paths=[str(tmp_path)]) + + number_of_writes = 0 + expected_number_of_writes = 3 + + select = Select( + timer=Timer.timeout(timedelta(seconds=0.1)), + file_watcher=file_watcher, + ) + while await select.ready(): + if msg := select.timer: + filename.write_text(f"{msg.inner}") + elif msg := select.file_watcher: + event_type = ( + FileWatcher.EventType.CREATE + if number_of_writes == 0 + else FileWatcher.EventType.MODIFY + ) + assert msg.inner == FileWatcher.Event(type=event_type, path=filename) + number_of_writes += 1 + # After receiving a write 3 times, unsubscribe from the writes channel + if number_of_writes == expected_number_of_writes: + break + + assert number_of_writes == expected_number_of_writes + + +@pytest.mark.integration +async def test_file_watcher_deletes(tmp_path: pathlib.Path) -> None: + """Ensure file watcher is returning paths only on the DELETE change. + + Also ensures that DELETE events are sent even if the file was recreated and even if + the file doesn't exist. + + Args: + tmp_path (pathlib.Path): A tmp directory to run the file watcher on. + Created by pytest. + """ + filename = tmp_path / "test-file" + file_watcher = FileWatcher( + paths=[str(tmp_path)], event_types={FileWatcher.EventType.DELETE} + ) + + select = Select( + write_timer=Timer.timeout(timedelta(seconds=0.1)), + deletion_timer=Timer.timeout(timedelta(seconds=0.25)), + watcher=file_watcher, + ) + number_of_write = 0 + number_of_deletes = 0 + number_of_events = 0 + # We want to write to a file and then removed, and then write again (create it + # again) and remove it again and then stop. + # Because awatch takes some time to get notified by the OS, we need to stop writing + # while a delete was done, to make sure the file is not recreated before the + # deletion event arrives. + # For the second round of writes and then delete, we allow writing after the delete + # was done as an extra test. + # + # This is an example timeline for this test: + # + # |-----|--.--|-----|---o-|-----|--.--|-----|--o--|-----|-----|-----|-----|-----| + # W W D E W W D W W E + # + # Where: + # W: Write + # D: Delete + # E: FileWatcher Event + while await select.ready(): + if msg := select.write_timer: + if number_of_write >= 2 and number_of_events == 0: + continue + filename.write_text(f"{msg.inner}") + number_of_write += 1 + elif _ := select.deletion_timer: + # Avoid removing the file twice + if not pathlib.Path(filename).is_file(): + continue + os.remove(filename) + number_of_deletes += 1 + elif _ := select.watcher: + number_of_events += 1 + if number_of_events >= 2: + break + + assert number_of_deletes == 2 + # Can be more because the watcher could take some time to trigger + assert number_of_write >= 3 + assert number_of_events == 2 diff --git a/tests/utils/test_timer.py b/tests/utils/test_timer.py index bdcdb704..a264806f 100644 --- a/tests/utils/test_timer.py +++ b/tests/utils/test_timer.py @@ -1,92 +1,538 @@ # License: MIT # Copyright © 2022 Frequenz Energy-as-a-Service GmbH -"""Tests for `channel.Timer`.""" +"""Tests for the timer.""" + +from __future__ import annotations import asyncio -import logging -from dataclasses import dataclass -from datetime import datetime, timezone -from typing import Optional - -from frequenz.channels import Anycast, Sender -from frequenz.channels.util import Select, Timer - -logger = logging.Logger(__name__) - - -async def test_timer() -> None: - """Ensure Timer messages arrive on time.""" - - @dataclass - class _TestCase: - count: int - delta: float - - # if the fast test doesn't pass, we'll switch to a slow test. - test_cases = [ - _TestCase(count=10, delta=0.05), - _TestCase(count=6, delta=0.2), - _TestCase(count=6, delta=0.5), - ] - fail_count = 0 - for test_case in test_cases: - start = datetime.now(timezone.utc) - count = 0 - async for _ in Timer(test_case.delta): - count += 1 - if count >= test_case.count: - break - actual_duration = (datetime.now(timezone.utc) - start).total_seconds() - expected_duration = test_case.delta * test_case.count - tolerance = expected_duration * 0.1 - - assert actual_duration >= expected_duration - if actual_duration <= expected_duration + tolerance: - break - logger.error( - "%s failed :: actual_duration=%s > expected_duration=%s + tolerance=%s", - test_case, - actual_duration, - expected_duration, - tolerance, +import enum +from collections.abc import Iterator +from datetime import timedelta + +import async_solipsism +import hypothesis +import pytest +from hypothesis import strategies as st + +from frequenz.channels.util import ( + SkipMissedAndDrift, + SkipMissedAndResync, + Timer, + TriggerAllMissed, +) + + +# Setting 'autouse' has no effect as this method replaces the event loop for all tests in the file. +@pytest.fixture() +def event_loop() -> Iterator[async_solipsism.EventLoop]: + """Replace the loop with one that doesn't interact with the outside world.""" + loop = async_solipsism.EventLoop() + yield loop + loop.close() + + +_max_timedelta_microseconds = ( + int( + timedelta.max.total_seconds() * 1_000_000, + ) + - 1 +) + +_min_timedelta_microseconds = ( + int( + timedelta.min.total_seconds() * 1_000_000, + ) + + 1 +) + +_calculate_next_tick_time_args = { + "now": st.integers(), + "scheduled_tick_time": st.integers(), + "interval": st.integers(min_value=1, max_value=_max_timedelta_microseconds), +} + + +def _assert_tick_is_aligned( + next_tick_time: int, now: int, scheduled_tick_time: int, interval: int +) -> None: + # Can be equals if scheduled_tick_time == now + assert next_tick_time <= now + interval + assert (next_tick_time - scheduled_tick_time) % interval == pytest.approx(0.0) + + +@hypothesis.given(**_calculate_next_tick_time_args) +def test_policy_trigger_all_missed( + now: int, scheduled_tick_time: int, interval: int +) -> None: + """Test the TriggerAllMissed policy.""" + hypothesis.assume(now >= scheduled_tick_time) + assert ( + TriggerAllMissed().calculate_next_tick_time( + now=now, interval=interval, scheduled_tick_time=scheduled_tick_time + ) + == scheduled_tick_time + interval + ) + + +def test_policy_trigger_all_missed_examples() -> None: + """Test the TriggerAllMissed policy with minimal examples. + + This is just a sanity check to make sure we are not missing to test any important + properties with the hypothesis tests. + """ + policy = TriggerAllMissed() + assert ( + policy.calculate_next_tick_time( + now=10_200_000, scheduled_tick_time=9_000_000, interval=1_000_000 + ) + == 10_000_000 + ) + assert ( + policy.calculate_next_tick_time( + now=10_000_000, scheduled_tick_time=9_000_000, interval=1_000_000 + ) + == 10_000_000 + ) + assert ( + policy.calculate_next_tick_time( + now=10_500_000, scheduled_tick_time=1_000_000, interval=1_000_000 + ) + == 2_000_000 + ) + + +@hypothesis.given(**_calculate_next_tick_time_args) +def test_policy_skip_missed_and_resync( + now: int, scheduled_tick_time: int, interval: int +) -> None: + """Test the SkipMissedAndResync policy.""" + hypothesis.assume(now >= scheduled_tick_time) + + next_tick_time = SkipMissedAndResync().calculate_next_tick_time( + now=now, interval=interval, scheduled_tick_time=scheduled_tick_time + ) + assert next_tick_time > now + _assert_tick_is_aligned(next_tick_time, now, scheduled_tick_time, interval) + + +def test_policy_skip_missed_and_resync_examples() -> None: + """Test the SkipMissedAndResync policy with minimal examples. + + This is just a sanity check to make sure we are not missing to test any important + properties with the hypothesis tests. + """ + policy = SkipMissedAndResync() + assert ( + policy.calculate_next_tick_time( + now=10_200_000, scheduled_tick_time=9_000_000, interval=1_000_000 + ) + == 11_000_000 + ) + assert ( + policy.calculate_next_tick_time( + now=10_000_000, scheduled_tick_time=9_000_000, interval=1_000_000 + ) + == 11_000_000 + ) + assert ( + policy.calculate_next_tick_time( + now=10_500_000, scheduled_tick_time=1_000_000, interval=1_000_000 + ) + == 11_000_000 + ) + + +@hypothesis.given( + tolerance=st.integers(min_value=_min_timedelta_microseconds, max_value=-1) +) +def test_policy_skip_missed_and_drift_invalid_tolerance(tolerance: int) -> None: + """Test the SkipMissedAndDrift policy raises an error for invalid tolerances.""" + with pytest.raises(ValueError, match="delay_tolerance must be positive"): + SkipMissedAndDrift(delay_tolerance=timedelta(microseconds=tolerance)) + + +@hypothesis.given( + tolerance=st.integers(min_value=0, max_value=_max_timedelta_microseconds), + **_calculate_next_tick_time_args, +) +def test_policy_skip_missed_and_drift( + tolerance: int, now: int, scheduled_tick_time: int, interval: int +) -> None: + """Test the SkipMissedAndDrift policy.""" + hypothesis.assume(now >= scheduled_tick_time) + + next_tick_time = SkipMissedAndDrift( + delay_tolerance=timedelta(microseconds=tolerance) + ).calculate_next_tick_time( + now=now, interval=interval, scheduled_tick_time=scheduled_tick_time + ) + if tolerance < interval: + assert next_tick_time > now + drift = now - scheduled_tick_time + if drift > tolerance: + assert next_tick_time == now + interval + else: + _assert_tick_is_aligned(next_tick_time, now, scheduled_tick_time, interval) + + +def test_policy_skip_missed_and_drift_examples() -> None: + """Test the SkipMissedAndDrift policy with minimal examples. + + This is just a sanity check to make sure we are not missing to test any important + properties with the hypothesis tests. + """ + tolerance = 100_000 + policy = SkipMissedAndDrift(delay_tolerance=timedelta(microseconds=tolerance)) + assert ( + policy.calculate_next_tick_time( + now=10_200_000, scheduled_tick_time=9_000_000, interval=1_000_000 ) - fail_count += 1 - assert fail_count < len(test_cases) + == 11_200_000 + ) + assert ( + policy.calculate_next_tick_time( + now=10_000_000, scheduled_tick_time=9_000_000, interval=1_000_000 + ) + == 11_000_000 + ) + assert ( + policy.calculate_next_tick_time( + now=10_500_000, scheduled_tick_time=1_000_000, interval=1_000_000 + ) + == 11_500_000 + ) + assert ( + policy.calculate_next_tick_time( + now=10_000_000 + tolerance, + scheduled_tick_time=10_000_000, + interval=1_000_000, + ) + == 11_000_000 + ) + + +async def test_timer_contruction_defaults() -> None: + """Test the construction of a periodic timer with default values.""" + timer = Timer(timedelta(seconds=1.0), TriggerAllMissed()) + assert timer.interval == timedelta(seconds=1.0) + assert isinstance(timer.missed_tick_policy, TriggerAllMissed) + assert timer.loop is asyncio.get_running_loop() + assert timer.is_running is True + + +def test_timer_contruction_no_async() -> None: + """Test the construction outside of async (using a custom loop).""" + loop = async_solipsism.EventLoop() + timer = Timer(timedelta(seconds=1.0), TriggerAllMissed(), loop=loop) + assert timer.interval == timedelta(seconds=1.0) + assert isinstance(timer.missed_tick_policy, TriggerAllMissed) + assert timer.loop is loop + assert timer.is_running is True + + +def test_timer_contruction_no_event_loop() -> None: + """Test the construction outside of async (without a custom loop) fails.""" + with pytest.raises(RuntimeError, match="no running event loop"): + Timer(timedelta(seconds=1.0), TriggerAllMissed()) + + +async def test_timer_contruction_auto_start() -> None: + """Test the construction of a periodic timer with auto_start=False.""" + policy = TriggerAllMissed() + timer = Timer( + timedelta(seconds=5.0), + policy, + auto_start=False, + loop=None, + ) + assert timer.interval == timedelta(seconds=5.0) + assert timer.missed_tick_policy is policy + assert timer.loop is asyncio.get_running_loop() + assert timer.is_running is False + + +async def test_timer_contruction_custom_args() -> None: + """Test the construction of a periodic timer with custom arguments.""" + policy = TriggerAllMissed() + timer = Timer( + timedelta(seconds=5.0), + policy, + auto_start=True, + loop=None, + ) + assert timer.interval == timedelta(seconds=5.0) + assert timer.missed_tick_policy is policy + assert timer.loop is asyncio.get_running_loop() + assert timer.is_running is True + + +async def test_timer_contruction_timeout_custom_args() -> None: + """Test the construction of a timeout timer with custom arguments.""" + timer = Timer.timeout( + timedelta(seconds=5.0), + auto_start=True, + loop=None, + ) + assert timer.interval == timedelta(seconds=5.0) + assert isinstance(timer.missed_tick_policy, SkipMissedAndDrift) + assert timer.missed_tick_policy.delay_tolerance == timedelta(0) + assert timer.loop is asyncio.get_running_loop() + assert timer.is_running is True + + +async def test_timer_contruction_periodic_defaults() -> None: + """Test the construction of a periodic timer.""" + timer = Timer.periodic(timedelta(seconds=5.0)) + assert timer.interval == timedelta(seconds=5.0) + assert isinstance(timer.missed_tick_policy, TriggerAllMissed) + assert timer.loop is asyncio.get_running_loop() + assert timer.is_running is True + + +async def test_timer_contruction_periodic_custom_args() -> None: + """Test the construction of a timeout timer with custom arguments.""" + timer = Timer.periodic( + timedelta(seconds=5.0), + skip_missed_ticks=True, + auto_start=True, + loop=None, + ) + assert timer.interval == timedelta(seconds=5.0) + assert isinstance(timer.missed_tick_policy, SkipMissedAndResync) + assert timer.loop is asyncio.get_running_loop() + assert timer.is_running is True -async def test_timer_reset() -> None: - """Ensure timer resets function as expected.""" - chan1 = Anycast[int]() +async def test_timer_autostart( + event_loop: async_solipsism.EventLoop, # pylint: disable=redefined-outer-name +) -> None: + """Test the autostart of a periodic timer.""" + timer = Timer(timedelta(seconds=1.0), TriggerAllMissed()) - count = 4 - reset_delta = 0.1 - timer_delta = 0.3 + # We sleep some time, less than the interval, and then receive from the + # timer, since it was automatically started at time 0, it should trigger at + # time 1.0 without any drift + await asyncio.sleep(0.5) + drift = await timer.receive() + assert drift == pytest.approx(timedelta(seconds=0.0)) + assert event_loop.time() == pytest.approx(1.0) - async def send(ch1: Sender[int]) -> None: - for ctr in range(count): - await asyncio.sleep(reset_delta) - await ch1.send(ctr) - timer = Timer(timer_delta) +class _StartMethod(enum.Enum): + RESET = enum.auto() + RECEIVE = enum.auto() + READY = enum.auto() + ASYNC_ITERATOR = enum.auto() - senders = asyncio.create_task(send(chan1.new_sender())) - select = Select(msg=chan1.new_receiver(), timer=timer) - start_ts = datetime.now(timezone.utc) - stop_ts: Optional[datetime] = None - while await select.ready(): - if select.msg: - timer.reset() - elif event_ts := select.timer: - stop_ts = event_ts.inner +@pytest.mark.parametrize("start_method", list(_StartMethod)) +async def test_timer_no_autostart( + start_method: _StartMethod, + event_loop: async_solipsism.EventLoop, # pylint: disable=redefined-outer-name +) -> None: + """Test a periodic timer when it is not automatically started.""" + timer = Timer( + timedelta(seconds=1.0), + TriggerAllMissed(), + auto_start=False, + ) + + # We sleep some time, less than the interval, and then start the timer and + # receive from it, since it wasn't automatically started, it should trigger + # shifted by the sleep time (without any drift) + await asyncio.sleep(0.5) + drift: timedelta | None = None + if start_method == _StartMethod.RESET: + timer.reset() + drift = await timer.receive() + elif start_method == _StartMethod.RECEIVE: + drift = await timer.receive() + elif start_method == _StartMethod.READY: + assert await timer.ready() is True + drift = timer.consume() + elif start_method == _StartMethod.ASYNC_ITERATOR: + async for _drift in timer: + drift = _drift break + else: + assert False, f"Unknown start method {start_method}" + + assert drift == pytest.approx(timedelta(seconds=0.0)) + assert event_loop.time() == pytest.approx(1.5) + + +async def test_timer_trigger_all_missed( + event_loop: async_solipsism.EventLoop, # pylint: disable=redefined-outer-name +) -> None: + """Test a timer using the TriggerAllMissed policy.""" + interval = 1.0 + timer = Timer(timedelta(seconds=interval), TriggerAllMissed()) + + # We let the first tick be triggered on time + drift = await timer.receive() + assert event_loop.time() == pytest.approx(interval) + assert drift == pytest.approx(timedelta(seconds=0.0)) + + # Now we let the time pass interval plus a bit more, so we should get + # a drift, but the next tick should be triggered still at a multiple of the + # interval because we are using TRIGGER_ALL. + await asyncio.sleep(interval + 0.1) + drift = await timer.receive() + assert event_loop.time() == pytest.approx(interval * 2 + 0.1) + assert drift == pytest.approx(timedelta(seconds=0.1)) + drift = await timer.receive() + assert event_loop.time() == pytest.approx(interval * 3) + assert drift == pytest.approx(timedelta(seconds=0.0)) + + # Now we let the time pass by two times the interval, so we should get + # a drift of a whole interval and then next tick should be triggered + # immediately with no drift, because we are still at an interval boundary. + await asyncio.sleep(2 * interval) + drift = await timer.receive() + assert event_loop.time() == pytest.approx(interval * 5) + assert drift == pytest.approx(timedelta(seconds=interval)) + drift = await timer.receive() + assert event_loop.time() == pytest.approx(interval * 5) + assert drift == pytest.approx(timedelta(seconds=0.0)) + + # Finally we let the time pass by 5 times the interval plus some extra + # delay (even when the tolerance should be irrelevant for this mode), + # so we should catch up for the 4 missed ticks (the timer should trigger + # immediately), with the drift of each trigger lowering. The last trigger + # should have no drift once it catches up. + extra_delay = 0.1 + await asyncio.sleep(5 * interval + extra_delay) + drift = await timer.receive() + assert event_loop.time() == pytest.approx(interval * 10 + extra_delay) + assert drift == pytest.approx(timedelta(seconds=interval * 4 + extra_delay)) + drift = await timer.receive() + assert event_loop.time() == pytest.approx(interval * 10 + extra_delay) + assert drift == pytest.approx(timedelta(seconds=interval * 3 + extra_delay)) + drift = await timer.receive() + assert event_loop.time() == pytest.approx(interval * 10 + extra_delay) + assert drift == pytest.approx(timedelta(seconds=interval * 2 + extra_delay)) + drift = await timer.receive() + assert event_loop.time() == pytest.approx(interval * 10 + extra_delay) + assert drift == pytest.approx(timedelta(seconds=interval * 1 + extra_delay)) + drift = await timer.receive() + assert event_loop.time() == pytest.approx(interval * 10 + extra_delay) + assert drift == pytest.approx(timedelta(seconds=interval * 0 + extra_delay)) + drift = await timer.receive() + assert event_loop.time() == pytest.approx(interval * 11) + assert drift == pytest.approx(timedelta(seconds=0.0)) + + +async def test_timer_skip_missed_and_resync( + event_loop: async_solipsism.EventLoop, # pylint: disable=redefined-outer-name +) -> None: + """Test a timer using the SkipMissedAndResync policy.""" + interval = 1.0 + timer = Timer(timedelta(seconds=interval), SkipMissedAndResync()) + + # We let the first tick be triggered on time + drift = await timer.receive() + assert event_loop.time() == pytest.approx(interval) + assert drift == pytest.approx(timedelta(seconds=0.0)) + + # Now we let the time pass interval plus a bit more, so we should get + # a drift, but the next tick should be triggered still at a multiple of the + # interval because we are using TRIGGER_ALL. + await asyncio.sleep(interval + 0.1) + drift = await timer.receive() + assert event_loop.time() == pytest.approx(interval * 2 + 0.1) + assert drift == pytest.approx(timedelta(seconds=0.1)) + drift = await timer.receive() + assert event_loop.time() == pytest.approx(interval * 3) + assert drift == pytest.approx(timedelta(seconds=0.0)) + + # Now we let the time pass by two times the interval, so we should get + # a drift of a whole interval and then next tick should an interval later, + # as the delayed tick will be skipped and the timer will resync. + await asyncio.sleep(2 * interval) + drift = await timer.receive() + assert event_loop.time() == pytest.approx(interval * 5) + assert drift == pytest.approx(timedelta(seconds=interval)) + drift = await timer.receive() + assert event_loop.time() == pytest.approx(interval * 6) + assert drift == pytest.approx(timedelta(seconds=0.0)) + + # Finally we let the time pass by 5 times the interval plus some extra + # delay. The timer should fire immediately once with a drift of 4 intervals + # plus the extra delay, and then it should resync and fire again with no + # drift, skipping the missed ticks. + extra_delay = 0.8 + await asyncio.sleep(5 * interval + extra_delay) + drift = await timer.receive() + assert event_loop.time() == pytest.approx(interval * 11 + extra_delay) + assert drift == pytest.approx(timedelta(seconds=interval * 4 + extra_delay)) + drift = await timer.receive() + assert event_loop.time() == pytest.approx(interval * 12) + assert drift == pytest.approx(timedelta(seconds=0.0)) + drift = await timer.receive() + assert event_loop.time() == pytest.approx(interval * 13) + assert drift == pytest.approx(timedelta(seconds=0.0)) + + +async def test_timer_skip_missed_and_drift( + event_loop: async_solipsism.EventLoop, # pylint: disable=redefined-outer-name +) -> None: + """Test a timer using the SkipMissedAndDrift policy.""" + interval = 1.0 + tolerance = 0.1 + timer = Timer( + timedelta(seconds=interval), + SkipMissedAndDrift(delay_tolerance=timedelta(seconds=tolerance)), + ) + + # We let the first tick be triggered on time + drift = await timer.receive() + assert event_loop.time() == pytest.approx(interval) + assert drift == timedelta(seconds=0.0) + + # Now we let the time pass by the interval plus the tolerance so the drift + # should be the tolerance and the next tick should be triggered still at + # a multiple of the interval. + await asyncio.sleep(interval + tolerance) + drift = await timer.receive() + assert event_loop.time() == pytest.approx(interval * 2 + tolerance) + assert drift == pytest.approx(timedelta(seconds=tolerance)) + drift = await timer.receive() + assert event_loop.time() == pytest.approx(interval * 3) + assert drift == pytest.approx(timedelta(seconds=0.0)) + + # Now we let the time pass the interval plus two times the tolerance. Now + # the timer should start to drift, and the next tick should be triggered at + # a multiple of the interval plus the shift of two times the tolerance. + await asyncio.sleep(interval + tolerance * 2) + drift = await timer.receive() + assert event_loop.time() == pytest.approx(interval * 4 + tolerance * 2) + assert drift == pytest.approx(timedelta(seconds=tolerance * 2)) + drift = await timer.receive() + assert event_loop.time() == pytest.approx(interval * 5 + tolerance * 2) + assert drift == pytest.approx(timedelta(seconds=0.0)) - assert stop_ts is not None - actual_duration = (stop_ts - start_ts).total_seconds() - expected_duration = reset_delta * count + timer_delta - tolerance = expected_duration * 0.1 - assert actual_duration >= expected_duration - assert actual_duration <= expected_duration + tolerance + # Now we let the time pass by two times the interval, so we should missed + # one tick (the tick at time = 6 + tolerance * 2) and the next tick should + # still be triggered at a multiple of the interval plus the shift of two + # times the tolerance because the current trigger time was still aligned + # to the shifted interval. + await asyncio.sleep(2 * interval) + drift = await timer.receive() + assert event_loop.time() == pytest.approx(interval * 7 + tolerance * 2) + assert drift == pytest.approx(timedelta(seconds=interval)) + drift = await timer.receive() + assert event_loop.time() == pytest.approx(interval * 8 + tolerance * 2) + assert drift == pytest.approx(timedelta(seconds=0.0)) - await senders + # Finally we let the time pass by 5 times the interval plus a tiny bit more + # than the tolerance, so we should missed 4 ticks (the ticks at times 9+, + # 10+, 11+ and 12+). The current trigger is also delayed more than the + # tolerance, so the next tick should accumulate the drift again. + await asyncio.sleep(5 * interval + tolerance + 0.001) + drift = await timer.receive() + assert event_loop.time() == pytest.approx(interval * 13 + tolerance * 3 + 0.001) + assert drift == pytest.approx(timedelta(seconds=interval * 4 + tolerance + 0.001)) + drift = await timer.receive() + assert event_loop.time() == pytest.approx(interval * 14 + tolerance * 3 + 0.001) + assert drift == pytest.approx(timedelta(seconds=0.0))