|
42 | 42 |
|
43 | 43 | import logging
|
44 | 44 | from abc import ABC, abstractmethod
|
| 45 | +from collections.abc import Callable |
| 46 | +from functools import cache |
45 | 47 | from typing import (
|
46 | 48 | TYPE_CHECKING,
|
47 | 49 | Final,
|
48 | 50 | TypeVar,
|
| 51 | + get_type_hints, |
49 | 52 | )
|
50 | 53 |
|
51 | 54 | from .exceptions import KasaException
|
|
64 | 67 | ModuleT = TypeVar("ModuleT", bound="Module")
|
65 | 68 |
|
66 | 69 |
|
| 70 | +class FeatureAttribute: |
| 71 | + """Class for annotating attributes bound to feature.""" |
| 72 | + |
| 73 | + |
67 | 74 | class Module(ABC):
|
68 | 75 | """Base class implemention for all modules.
|
69 | 76 |
|
@@ -140,6 +147,14 @@ def __init__(self, device: Device, module: str) -> None:
|
140 | 147 | self._module = module
|
141 | 148 | self._module_features: dict[str, Feature] = {}
|
142 | 149 |
|
| 150 | + def has_feature(self, attribute: str | property | Callable) -> bool: |
| 151 | + """Return True if the module attribute feature is supported.""" |
| 152 | + return bool(self.get_feature(attribute)) |
| 153 | + |
| 154 | + def get_feature(self, attribute: str | property | Callable) -> Feature | None: |
| 155 | + """Get Feature for a module attribute or None if not supported.""" |
| 156 | + return _get_bound_feature(self, attribute) |
| 157 | + |
143 | 158 | @abstractmethod
|
144 | 159 | def query(self) -> dict:
|
145 | 160 | """Query to execute during the update cycle.
|
@@ -183,3 +198,60 @@ def __repr__(self) -> str:
|
183 | 198 | f"<Module {self.__class__.__name__} ({self._module})"
|
184 | 199 | f" for {self._device.host}>"
|
185 | 200 | )
|
| 201 | + |
| 202 | + |
| 203 | +def _is_bound_feature(attribute: property | Callable) -> bool: |
| 204 | + """Check if an attribute is bound to a feature with FeatureAttribute.""" |
| 205 | + if isinstance(attribute, property): |
| 206 | + hints = get_type_hints(attribute.fget, include_extras=True) |
| 207 | + else: |
| 208 | + hints = get_type_hints(attribute, include_extras=True) |
| 209 | + |
| 210 | + if (return_hints := hints.get("return")) and hasattr(return_hints, "__metadata__"): |
| 211 | + metadata = hints["return"].__metadata__ |
| 212 | + for meta in metadata: |
| 213 | + if isinstance(meta, FeatureAttribute): |
| 214 | + return True |
| 215 | + |
| 216 | + return False |
| 217 | + |
| 218 | + |
| 219 | +@cache |
| 220 | +def _get_bound_feature( |
| 221 | + module: Module, attribute: str | property | Callable |
| 222 | +) -> Feature | None: |
| 223 | + """Get Feature for a bound property or None if not supported.""" |
| 224 | + if not isinstance(attribute, str): |
| 225 | + if isinstance(attribute, property): |
| 226 | + # Properties have __name__ in 3.13 so this could be simplified |
| 227 | + # when only 3.13 supported |
| 228 | + attribute_name = attribute.fget.__name__ # type: ignore[union-attr] |
| 229 | + else: |
| 230 | + attribute_name = attribute.__name__ |
| 231 | + attribute_callable = attribute |
| 232 | + else: |
| 233 | + if TYPE_CHECKING: |
| 234 | + assert isinstance(attribute, str) |
| 235 | + attribute_name = attribute |
| 236 | + attribute_callable = getattr(module.__class__, attribute, None) # type: ignore[assignment] |
| 237 | + if not attribute_callable: |
| 238 | + raise KasaException( |
| 239 | + f"No attribute named {attribute_name} in " |
| 240 | + f"module {module.__class__.__name__}" |
| 241 | + ) |
| 242 | + |
| 243 | + if not _is_bound_feature(attribute_callable): |
| 244 | + raise KasaException( |
| 245 | + f"Attribute {attribute_name} of module {module.__class__.__name__}" |
| 246 | + " is not bound to a feature" |
| 247 | + ) |
| 248 | + |
| 249 | + check = {attribute_name, attribute_callable} |
| 250 | + for feature in module._module_features.values(): |
| 251 | + if (getter := feature.attribute_getter) and getter in check: |
| 252 | + return feature |
| 253 | + |
| 254 | + if (setter := feature.attribute_setter) and setter in check: |
| 255 | + return feature |
| 256 | + |
| 257 | + return None |
0 commit comments