8000 Report the type of change observed in FileWatcher by daniel-zullo-frequenz · Pull Request #87 · frequenz-floss/frequenz-channels-python · GitHub
[go: up one dir, main page]

Skip to content

Report the type of change observed in FileWatcher #87

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ This release adds support to pass `None` values via channels and revamps the `Ti

* `util.Timer` was replaced by a more generic implementation that allows for customizable policies to handle missed ticks.

If you were using `Timer` to implement timeouts, these two pices of code should be almost equivalent:
If you were using `Timer` to implement timeouts, these two pieces of code should be almost equivalent:

- Old:

Expand All @@ -27,7 +27,7 @@ This release adds support to pass `None` values via channels and revamps the `Ti

They are not **exactly** the same because the `triggered_datetime` in the second case will not be exactly when the timer had triggered, but that shouldn't be relevant, the important part is when your code can actually react to the timer trigger and to know how much drift there was to be able to take corrective actions.

Also the new `Timer` uses the `asyncio` loop monotonic clock and the old one used the wall clock (`datetime.now()`) to track time. This means that when using `async-solipsism` to test, the new `Timer` will always trigger immediately regarless of the state of the wall clock. This also means that we don't need to mock the wall clock with `time-machine` either now.
Also the new `Timer` uses the `asyncio` loop monotonic clock and the old one used the wall clock (`datetime.now()`) to track time. This means that when using `async-solipsism` to test, the new `Timer` will always trigger immediately regardless of the state of the wall clock. This also means that we don't need to mock the wall clock with `time-machine` either now.

With the previous timer one needed to create a separate task to run the timer, because otherwise it would block as it loops until the wall clock was at a specific time. Now the code will run like this:

Expand All @@ -39,7 +39,7 @@ This release adds support to pass `None` values via channels and revamps the `Ti

# Simulates a delay in the timer trigger time
asyncio.sleep(1.5) # Advances the loop monotonic clock by 1.5 seconds immediately
await drift = timer.receive() # The timer should have triggerd 0.5 seconds ago, so it doesn't even sleep
await drift = timer.receive() # The timer should have triggered 0.5 seconds ago, so it doesn't even sleep
assert drift == approx(timedelta(seconds=0.5)) # Now we should observe a drift of 0.5 seconds
```

Expand All @@ -53,12 +53,19 @@ This release adds support to pass `None` values via channels and revamps the `Ti

Therefore, you should now check a file receiving an event really exist before trying to operate on it.

* `FileWatcher` reports the type of event observed in addition to the file path.

Previously, only the file path was reported. With this update, users can now determine if a file change is a creation, modification, or deletion.
Note that this change may require updates to your existing code that relies on `FileWatcher` as the new interface returns a `FileWatcher.Event` instead of just the file path.

## New Features

* `util.Timer` was replaced by a more generic implementation that allows for customizable policies to handle missed ticks.

* Passing `None` values via channels is now supported.

* `FileWatcher.Event` was added to notify events when a file is created, modified, or deleted.

## Bug Fixes

* `util.Select` / `util.Merge` / `util.MergeNamed`: Cancel pending tasks in `__del__` methods only if possible (the loop is not already closed).
Expand Down
< 8000 div class="file-info flex-auto min-width-0 mb-md-0 mb-2">
38 changes: 25 additions & 13 deletions src/frequenz/channels/util/_file_watcher.py
8000
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@
# Copyright © 2022 Frequenz Energy-as-a-Service GmbH

"""A Channel receiver for watching for new (or modified) files."""

from __future__ import annotations

import asyncio
import pathlib
from dataclasses import dataclass
from enum import Enum
from typing import List, Optional, Set, Union

from watchfiles import Change, awatch
from watchfiles.main import FileChange
Expand All @@ -14,7 +17,7 @@
from .._exceptions import ReceiverStoppedError


class FileWatcher(Receiver[pathlib.Path]):
class FileWatcher(Receiver["FileWatcher.Event"]):
"""A channel receiver that watches for file events."""

class EventType(Enum):
Expand All @@ -24,10 +27,19 @@ class EventType(Enum):
MODIFY = Change.modified
DELETE = Change.deleted

@dataclass(frozen=True)
class Event:
"""A file change event."""

type: FileWatcher.EventType
"""The type of change that was observed."""
path: pathlib.Path
"""The path where the change was observed."""

def __init__(
self,
paths: List[Union[pathlib.Path, str]],
event_types: Optional[Set[EventType]] = None,
paths: list[pathlib.Path | str],
event_types: set[EventType] | None = None,
) -> None:
"""Create a `FileWatcher` instance.

Expand All @@ -48,8 +60,8 @@ def __init__(
self._awatch = awatch(
*self._paths, stop_event=self._stop_event, watch_filter=self._filter_events
)
self._awatch_stopped_exc: Optional[Exception] = None
self._changes: Set[FileChange] = set()
self._awatch_stopped_exc: Exception | None = None
self._changes: set[FileChange] = set()

def _filter_events(
self,
Expand Down Expand Up @@ -102,11 +114,11 @@ async def ready(self) -> bool:

return True

def consume(self) -> pathlib.Path:
"""Return the latest change once `ready` is complete.
def consume(self) -> Event:
"""Return the latest event once `ready` is complete.

Returns:
The next change that was received.
The next event that was received.

Raises:
ReceiverStoppedError: if there is some problem with the receiver.
Expand All @@ -115,8 +127,8 @@ def consume(self) -> pathlib.Path:
raise ReceiverStoppedError(self) from self._awatch_stopped_exc

assert self._changes, "`consume()` must be preceeded by a call to `ready()`"
change = self._changes.pop()
# Tuple of (Change, path) returned by watchfiles
_, path_str = change
path = pathlib.Path(path_str)
return path
change, path_str = self._changes.pop()
return FileWatcher.Event(
type=FileWatcher.EventType(change), path=pathlib.Path(path_str)
)
4 changes: 3 additions & 1 deletion tests/utils/test_file_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,9 @@ async def test_file_watcher_receive_updates(

for change in changes:
recv_changes = await file_watcher.receive()
assert recv_changes == pathlib.Path(change[1])
event_type = FileWatcher.EventType(change[0])
path = pathlib.Path(change[1])
assert recv_changes == FileWatcher.Event(type=event_type, path=path)


@hypothesis.given(event_types=st.sets(st.sampled_from(FileWatcher.EventType)))
Expand Down
7 changes: 6 additions & 1 deletion tests/utils/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,12 @@ async def test_file_watcher(tmp_path: pathlib.Path) -> None:
if msg := select.timer:
filename.write_text(f"{msg.inner}")
elif msg := select.file_watcher:
assert msg.inner == filename
event_type = (
FileWatcher.EventType.CREATE
if number_of_writes == 0
else FileWatcher.EventType.MODIFY
)
assert msg.inner == FileWatcher.Event(type=event_type, path=filename)
number_of_writes += 1
# After receiving a write 3 times, unsubscribe from the writes channel
if number_of_writes == expected_number_of_writes:
Expand Down
0