From 2b85782a202270e96a438a4f8fcd7368a4e2158e Mon Sep 17 00:00:00 2001 From: elpekenin Date: Tue, 20 Aug 2024 23:24:29 +0200 Subject: [PATCH 1/4] (WIP) (broken) initial changes --- .pre-commit-config.yaml | 7 +- adafruit_ble/__init__.py | 41 +++++--- adafruit_ble/advertising/__init__.py | 57 ++++++----- adafruit_ble/advertising/adafruit.py | 1 + adafruit_ble/advertising/standard.py | 34 +++++-- adafruit_ble/characteristics/__init__.py | 94 ++++++++++++++----- adafruit_ble/characteristics/float.py | 31 ++++-- adafruit_ble/characteristics/int.py | 31 ++++-- adafruit_ble/characteristics/json.py | 11 ++- adafruit_ble/characteristics/stream.py | 13 ++- adafruit_ble/characteristics/string.py | 40 ++++++-- adafruit_ble/services/__init__.py | 10 +- adafruit_ble/services/circuitpython.py | 2 +- adafruit_ble/services/nordic.py | 8 +- adafruit_ble/services/sphero.py | 2 +- adafruit_ble/services/standard/__init__.py | 16 ++-- adafruit_ble/services/standard/device_info.py | 8 +- adafruit_ble/services/standard/hid.py | 37 ++++++-- adafruit_ble/uuid/__init__.py | 3 + docs/conf.py | 2 +- examples/ble_advertising_simpletest.py | 1 - examples/ble_bluefruit_color_picker.py | 3 +- examples/ble_bluefruit_connect_plotter.py | 6 +- examples/ble_color_proximity.py | 2 +- examples/ble_current_time_service.py | 1 + examples/ble_demo_central.py | 3 +- examples/ble_demo_periph.py | 2 +- examples/ble_detailed_scan.py | 1 - examples/ble_device_info_service.py | 1 + examples/ble_hid_periph.py | 3 +- examples/ble_json_central.py | 2 +- examples/ble_json_peripheral.py | 5 +- examples/ble_json_service.py | 4 +- examples/ble_packet_buffer_service.py | 4 +- examples/ble_packet_buffer_test.py | 1 - optional_requirements.txt | 3 + 36 files changed, 341 insertions(+), 149 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 70ade69..ba466bc 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -7,6 +7,11 @@ repos: rev: 23.3.0 hooks: - id: black + - repo: https://github.com/PyCQA/isort + rev: 5.13.2 + hooks: + - id: isort + args: ["--profile", "black", "--filter-files"] - repo: https://github.com/fsfe/reuse-tool rev: v1.1.2 hooks: @@ -18,7 +23,7 @@ repos: - id: end-of-file-fixer - id: trailing-whitespace - repo: https://github.com/pycqa/pylint - rev: v2.17.4 + rev: v3.0.4 hooks: - id: pylint name: pylint (library code) diff --git a/adafruit_ble/__init__.py b/adafruit_ble/__init__.py index 693a47f..16f85ec 100755 --- a/adafruit_ble/__init__.py +++ b/adafruit_ble/__init__.py @@ -12,9 +12,10 @@ from __future__ import annotations +import sys + # pylint: disable=wrong-import-position -import sys if sys.implementation.name == "circuitpython" and sys.implementation.version[0] <= 4: raise ImportError( @@ -24,17 +25,27 @@ import _bleio -from .services import Service from .advertising import Advertisement +from .services import Service try: - from typing import Iterator, NoReturn, Optional, Tuple, Type, TYPE_CHECKING, Union + from typing import ( + TYPE_CHECKING, + Dict, + Iterator, + NoReturn, + Optional, + Tuple, + Type, + Union, + ) + from typing_extensions import Literal if TYPE_CHECKING: from circuitpython_typing import ReadableBuffer + from adafruit_ble.uuid import UUID - from adafruit_ble.characteristics import Characteristic except ImportError: pass @@ -55,9 +66,9 @@ class BLEConnection: def __init__(self, bleio_connection: _bleio.Connection) -> None: self._bleio_connection = bleio_connection # _bleio.Service objects representing services found during discovery. - self._discovered_bleio_services = {} + self._discovered_bleio_services: Dict[UUID, _bleio.Service] = {} # Service objects that wrap remote services. - self._constructed_services = {} + self._constructed_services: Dict[UUID, Service] = {} def _discover_remote(self, uuid: UUID) -> Optional[_bleio.Service]: remote_service = None @@ -98,7 +109,7 @@ def __getitem__(self, key: Union[UUID, Type[Service]]) -> Optional[Service]: uuid = key.uuid maybe_service = True - if uuid in self._constructed_services: + if isinstance(uuid) and uuid in self._constructed_services: return self._constructed_services[uuid] remote_service = self._discover_remote(uuid) @@ -166,7 +177,7 @@ def __init__(self, adapter: Optional[_bleio.Adapter] = None) -> None: raise RuntimeError("No adapter available") self._adapter = adapter or _bleio.adapter self._current_advertisement = None - self._connection_cache = {} + self._connection_cache: Dict[_bleio.Connection, BLEConnection] = {} def start_advertising( self, @@ -223,7 +234,7 @@ def stop_advertising(self) -> None: """Stops advertising.""" self._adapter.stop_advertising() - def start_scan( + def start_scan( # pylint: disable=too-many-arguments self, *advertisement_types: Type[Advertisement], buffer_size: int = 512, @@ -311,9 +322,15 @@ def connect( :return: the connection to the peer :rtype: BLEConnection """ - if not isinstance(peer, _bleio.Address): - peer = peer.address - connection = self._adapter.connect(peer, timeout=timeout) + if isinstance(peer, _bleio.Address): + peer_ = peer + else: + if peer.address is None: + msg = "Unreachable?" + raise RuntimeError(msg) + peer_ = peer.address + + connection = self._adapter.connect(peer_, timeout=timeout) self._clean_connection_cache() self._connection_cache[connection] = BLEConnection(connection) return self._connection_cache[connection] diff --git a/adafruit_ble/advertising/__init__.py b/adafruit_ble/advertising/__init__.py index 02324d3..6b80769 100644 --- a/adafruit_ble/advertising/__init__.py +++ b/adafruit_ble/advertising/__init__.py @@ -11,16 +11,29 @@ import struct try: - from typing import Dict, Any, Union, List, Optional, Type, TypeVar, TYPE_CHECKING + from typing import ( + TYPE_CHECKING, + Any, + Dict, + List, + Optional, + Tuple, + Type, + TypeVar, + Union, + ) + from typing_extensions import Literal if TYPE_CHECKING: - from _bleio import ScanEntry + from _bleio import Address, ScanEntry LazyObjectField_GivenClass = TypeVar( # pylint: disable=invalid-name "LazyObjectField_GivenClass" ) + DataDict = Dict[Any, bytes] + except ImportError: pass @@ -35,9 +48,7 @@ def to_bytes_literal(seq: bytes) -> str: return 'b"' + "".join("\\x{:02x}".format(v) for v in seq) + '"' -def decode_data( - data: bytes, *, key_encoding: str = "B" -) -> Dict[Any, Union[bytes, List[bytes]]]: +def decode_data(data: bytes, *, key_encoding: str = "B") -> DataDict: """Helper which decodes length encoded structures into a dictionary with the given key encoding.""" i = 0 @@ -49,20 +60,17 @@ def decode_data( if item_length == 0: break key = struct.unpack_from(key_encoding, data, i)[0] + if key not in data_dict: + data_dict[key] = b"" + value = data[i + key_size : i + item_length] - if key in data_dict: - if not isinstance(data_dict[key], list): - data_dict[key] = [data_dict[key]] - data_dict[key].append(value) - else: - data_dict[key] = value + data_dict[key] += value + i += item_length return data_dict -def compute_length( - data_dict: Dict[Any, Union[bytes, List[bytes]]], *, key_encoding: str = "B" -) -> int: +def compute_length(data_dict: DataDict, *, key_encoding: str = "B") -> int: """Computes the length of the encoded data dictionary.""" value_size = 0 for value in data_dict.values(): @@ -74,9 +82,7 @@ def compute_length( return len(data_dict) + len(data_dict) * struct.calcsize(key_encoding) + value_size -def encode_data( - data_dict: Dict[Any, Union[bytes, List[bytes]]], *, key_encoding: str = "B" -) -> bytes: +def encode_data(data_dict: DataDict, *, key_encoding: str = "B") -> bytes: """Helper which encodes dictionaries into length encoded structures with the given key encoding.""" length = compute_length(data_dict, key_encoding=key_encoding) @@ -237,7 +243,10 @@ class Advertisement: bytestring prefixes to match against the multiple data structures in the advertisement. """ - match_prefixes = () + address: Optional[Address] + _rssi: Optional[int] + + match_prefixes: Optional[Tuple[bytes, ...]] = () """For Advertisement, :py:attr:`~adafruit_ble.advertising.Advertisement.match_prefixes` will always return ``True``. Subclasses may override this value.""" # cached bytes of merged prefixes. @@ -293,15 +302,16 @@ def rssi(self) -> Optional[int]: return self._rssi @classmethod - def get_prefix_bytes(cls) -> Optional[bytes]: + def get_prefix_bytes(cls) -> bytes: """Return a merged version of match_prefixes as a single bytes object, with length headers. """ # Check for deprecated `prefix` class attribute. - cls._prefix_bytes = getattr(cls, "prefix", None) + prefix_bytes: Optional[bytes] = getattr(cls, "prefix", None) + # Do merge once and memoize it. - if cls._prefix_bytes is None: - cls._prefix_bytes = ( + cls._prefix_bytes = ( + ( b"" if cls.match_prefixes is None else b"".join( @@ -309,6 +319,9 @@ def get_prefix_bytes(cls) -> Optional[bytes]: for prefix in cls.match_prefixes ) ) + if prefix_bytes is None + else prefix_bytes + ) return cls._prefix_bytes diff --git a/adafruit_ble/advertising/adafruit.py b/adafruit_ble/advertising/adafruit.py index ad3285b..4f30c56 100755 --- a/adafruit_ble/advertising/adafruit.py +++ b/adafruit_ble/advertising/adafruit.py @@ -15,6 +15,7 @@ """ import struct + from micropython import const from . import Advertisement, LazyObjectField diff --git a/adafruit_ble/advertising/standard.py b/adafruit_ble/advertising/standard.py index 3bfc1a1..1cdeea2 100644 --- a/adafruit_ble/advertising/standard.py +++ b/adafruit_ble/advertising/standard.py @@ -11,25 +11,39 @@ """ +from __future__ import annotations + import struct from collections import OrderedDict, namedtuple +from ..uuid import StandardUUID, VendorUUID from . import ( Advertisement, AdvertisingDataField, - encode_data, + compute_length, decode_data, + encode_data, to_hex, - compute_length, ) -from ..uuid import StandardUUID, VendorUUID try: - from typing import Optional, List, Tuple, Union, Type, Iterator, Iterable, Any - from adafruit_ble.uuid import UUID - from adafruit_ble.services import Service + from typing import ( + Any, + Collection, + Iterable, + Iterator, + List, + Optional, + Tuple, + Type, + Union, + ) + from _bleio import ScanEntry + from adafruit_ble.services import Service + from adafruit_ble.uuid import UUID + UsesServicesAdvertisement = Union[ "ProvideServicesAdvertisement", "SolicitServicesAdvertisement" ] @@ -50,7 +64,7 @@ def __init__( advertisement: UsesServicesAdvertisement, *, standard_services: List[int], - vendor_services: List[int] + vendor_services: List[int], ) -> None: self._advertisement = advertisement self._standard_service_fields = standard_services @@ -243,7 +257,7 @@ def __init__( *, advertising_data_type: int = 0xFF, company_id: int, - key_encoding: str = "B" + key_encoding: str = "B", ) -> None: self._obj = obj self._company_id = company_id @@ -281,7 +295,7 @@ class ManufacturerDataField: """A single piece of data within the manufacturer specific data. The format can be repeated.""" def __init__( - self, key: int, value_format: str, field_names: Optional[Iterable[str]] = None + self, key: int, value_format: str, field_names: Optional[Collection[str]] = None ) -> None: self._key = key self._format = value_format @@ -302,7 +316,7 @@ def __init__( def __get__( self, obj: Optional[Advertisement], cls: Type[Advertisement] - ) -> Optional[Union["ManufacturerDataField", Tuple, namedtuple]]: + ) -> Optional[Union["ManufacturerDataField", Tuple, tuple]]: if obj is None: return self if self._key not in obj.manufacturer_data.data: diff --git a/adafruit_ble/characteristics/__init__.py b/adafruit_ble/characteristics/__init__.py index 42a5868..ce80042 100644 --- a/adafruit_ble/characteristics/__init__.py +++ b/adafruit_ble/characteristics/__init__.py @@ -11,17 +11,19 @@ from __future__ import annotations import struct + import _bleio from ..attributes import Attribute try: - from typing import Optional, Type, Union, Tuple, Iterable, TYPE_CHECKING + from typing import TYPE_CHECKING, Iterable, Optional, Tuple, Type, Union, overload if TYPE_CHECKING: from circuitpython_typing import ReadableBuffer - from adafruit_ble.uuid import UUID + from adafruit_ble.services import Service + from adafruit_ble.uuid import UUID except ImportError: pass @@ -32,7 +34,7 @@ class Characteristic: """ - Top level Characteristic class that does basic binding. + Base Characteristic class that does basic binding. :param UUID uuid: The uuid of the characteristic :param int properties: The properties of the characteristic, @@ -85,7 +87,10 @@ class Characteristic: WRITE = _bleio.Characteristic.WRITE WRITE_NO_RESPONSE = _bleio.Characteristic.WRITE_NO_RESPONSE - def __init__( + field_name: str + value: ReadableBuffer + + def __init__( # pylint: disable=too-many-arguments self, *, uuid: Optional[UUID] = None, @@ -96,7 +101,7 @@ def __init__( fixed_length: bool = False, initial_value: Optional[ReadableBuffer] = None, ) -> None: - self.field_name = None # Set by Service during basic binding + # field_name is set by Service during basic binding if uuid: self.uuid = uuid @@ -126,18 +131,18 @@ def _ensure_bound( service.bleio_characteristics[self.field_name] = bleio_characteristic def __bind_locally( - self, service: Service, initial_value: Optional[bytes] + self, service: Service, initial_value: Optional[ReadableBuffer] ) -> _bleio.Characteristic: - if initial_value is None: - initial_value = self.initial_value - if initial_value is None and self.max_length: - initial_value = bytes(self.max_length) + value = initial_value if initial_value is not None else self.initial_value + max_length = self.max_length - if max_length is None and initial_value is None: - max_length = 0 - initial_value = b"" - elif max_length is None: - max_length = len(initial_value) + if value is None: + if max_length is None: + max_length = 0 + value = b"" + else: + max_length = len(value) + return _bleio.Characteristic.add_to_service( service.bleio_service, self.uuid.bleio_uuid, @@ -149,9 +154,23 @@ def __bind_locally( write_perm=self.write_perm, ) + if TYPE_CHECKING: + + @overload + def __get__( + self, service: None, cls: Optional[Type[Service]] = None + ) -> Characteristic: + ... + + @overload + def __get__( + self, service: Service, cls: Optional[Type[Service]] = None + ) -> ReadableBuffer: + ... + def __get__( self, service: Optional[Service], cls: Optional[Type[Service]] = None - ) -> ReadableBuffer: + ) -> Union[Characteristic, ReadableBuffer]: # CircuitPython doesn't invoke descriptor protocol on obj's class, # but CPython does. In the CPython case, pretend that it doesn't. if service is None: @@ -175,7 +194,9 @@ class ComplexCharacteristic: has been bound to the corresponding instance attribute. """ - def __init__( + field_name: str + + def __init__( # pylint: disable=too-many-arguments self, *, uuid: Optional[UUID] = None, @@ -186,7 +207,7 @@ def __init__( fixed_length: bool = False, initial_value: Optional[ReadableBuffer] = None, ) -> None: - self.field_name = None # Set by Service during basic binding + # field_name is set by Service during basic binding if uuid: self.uuid = uuid @@ -214,9 +235,23 @@ def bind(self, service: Service) -> _bleio.Characteristic: write_perm=self.write_perm, ) + if TYPE_CHECKING: + + @overload + def __get__( + self, service: None, cls: Optional[Type[Service]] = None + ) -> ComplexCharacteristic: + ... + + @overload + def __get__( + self, service: Service, cls: Optional[Type[Service]] = None + ) -> _bleio.Characteristic: + ... + def __get__( self, service: Optional[Service], cls: Optional[Type[Service]] = None - ) -> _bleio.Characteristic: + ) -> Union[ComplexCharacteristic, _bleio.Characteristic]: if service is None: return self bound_object = self.bind(service) @@ -237,7 +272,7 @@ class StructCharacteristic(Characteristic): :param buf initial_value: see `Characteristic` """ - def __init__( + def __init__( # pylint: disable=too-many-arguments self, struct_format, *, @@ -261,9 +296,26 @@ def __init__( write_perm=write_perm, ) + if TYPE_CHECKING: + + @overload + def __get__( + self, obj: None, cls: Optional[Type[Service]] = None + ) -> Characteristic: + # NOTE(elpekenin): we actually return StructCharacteristic, but we hint like + # this so that we dont change parent's function signature, causing a mypy warn + # regardless, this is not wrong, just incomplete, because this is a subclass + ... + + @overload + def __get__( + self, obj: Service, cls: Optional[Type[Service]] = None + ) -> Optional[Tuple[int, ...]]: + ... + def __get__( self, obj: Optional[Service], cls: Optional[Type[Service]] = None - ) -> Optional[Union[Tuple, "StructCharacteristic"]]: + ) -> Union[Characteristic, Optional[Tuple[int, ...]]]: if obj is None: return self raw_data = super().__get__(obj, cls) diff --git a/adafruit_ble/characteristics/float.py b/adafruit_ble/characteristics/float.py index 69915eb..d1d96b0 100644 --- a/adafruit_ble/characteristics/float.py +++ b/adafruit_ble/characteristics/float.py @@ -12,16 +12,17 @@ from __future__ import annotations -from . import Attribute -from . import StructCharacteristic +from . import Attribute, StructCharacteristic try: - from typing import Optional, Type, Union, TYPE_CHECKING + from typing import TYPE_CHECKING, Optional, Type, Union, overload if TYPE_CHECKING: from circuitpython_typing import ReadableBuffer - from adafruit_ble.uuid import UUID + + from adafruit_ble.characteristics import Characteristic from adafruit_ble.services import Service + from adafruit_ble.uuid import UUID except ImportError: pass @@ -33,7 +34,7 @@ class FloatCharacteristic(StructCharacteristic): """32-bit float""" - def __init__( + def __init__( # pylint: disable=too-many-arguments self, *, uuid: Optional[UUID] = None, @@ -53,12 +54,28 @@ def __init__( initial_value=initial_value, ) + if TYPE_CHECKING: + + @overload + def __get__( + self, obj: None, cls: Optional[Type[Service]] = None + ) -> Characteristic: + ... + + @overload + def __get__(self, obj: Service, cls: Optional[Type[Service]] = None) -> float: + ... + def __get__( self, obj: Optional[Service], cls: Optional[Type[Service]] = None - ) -> Union[float, "FloatCharacteristic"]: + ) -> Union[Characteristic, float]: if obj is None: return self - return super().__get__(obj)[0] + get = super().__get__(obj) + if get is None: + msg = "Unreachable?" + raise RuntimeError(msg) + return get[0] # pylint: disable=unsubscriptable-object def __set__(self, obj: Service, value: float) -> None: super().__set__(obj, (value,)) diff --git a/adafruit_ble/characteristics/int.py b/adafruit_ble/characteristics/int.py index 52dbd80..03b9d13 100755 --- a/adafruit_ble/characteristics/int.py +++ b/adafruit_ble/characteristics/int.py @@ -12,16 +12,17 @@ from __future__ import annotations -from . import Attribute -from . import StructCharacteristic +from . import Attribute, StructCharacteristic try: - from typing import Optional, Type, Union, TYPE_CHECKING + from typing import TYPE_CHECKING, Optional, Type, Union, overload if TYPE_CHECKING: from circuitpython_typing import ReadableBuffer - from adafruit_ble.uuid import UUID + + from adafruit_ble.characteristics import Characteristic from adafruit_ble.services import Service + from adafruit_ble.uuid import UUID except ImportError: pass @@ -33,7 +34,7 @@ class IntCharacteristic(StructCharacteristic): """Superclass for different kinds of integer fields.""" - def __init__( + def __init__( # pylint: disable=too-many-arguments self, format_string: str, min_value: int, @@ -61,12 +62,28 @@ def __init__( initial_value=initial_value, ) + if TYPE_CHECKING: + + @overload + def __get__( + self, obj: None, cls: Optional[Type[Service]] = None + ) -> Characteristic: + ... + + @overload + def __get__(self, obj: Service, cls: Optional[Type[Service]] = None) -> int: + ... + def __get__( self, obj: Optional[Service], cls: Optional[Type[Service]] = None - ) -> Union[int, "IntCharacteristic"]: + ) -> Union[Characteristic, int]: if obj is None: return self - return super().__get__(obj)[0] + get = super().__get__(obj) + if get is None: + msg = "Unreachable?" + raise RuntimeError(msg) + return get[0] # pylint: disable=unsubscriptable-object def __set__(self, obj: Service, value: int) -> None: if not self._min_value <= value <= self._max_value: diff --git a/adafruit_ble/characteristics/json.py b/adafruit_ble/characteristics/json.py index 2622cfd..03cf5b1 100644 --- a/adafruit_ble/characteristics/json.py +++ b/adafruit_ble/characteristics/json.py @@ -13,16 +13,17 @@ from __future__ import annotations import json -from . import Attribute -from . import Characteristic + +from . import Attribute, Characteristic try: - from typing import Optional, Any, Type, TYPE_CHECKING + from typing import TYPE_CHECKING, Any, Optional, Type if TYPE_CHECKING: from circuitpython_typing import ReadableBuffer - from adafruit_ble.uuid import UUID + from adafruit_ble.services import Service + from adafruit_ble.uuid import UUID except ImportError: pass @@ -34,7 +35,7 @@ class JSONCharacteristic(Characteristic): """JSON string characteristic for JSON serializable values of a limited size (max_length).""" - def __init__( + def __init__( # pylint: disable=too-many-arguments self, *, uuid: Optional[UUID] = None, diff --git a/adafruit_ble/characteristics/stream.py b/adafruit_ble/characteristics/stream.py index 5053971..e25d5e9 100755 --- a/adafruit_ble/characteristics/stream.py +++ b/adafruit_ble/characteristics/stream.py @@ -15,18 +15,17 @@ import _bleio -from . import Attribute -from . import Characteristic -from . import ComplexCharacteristic +from . import Attribute, Characteristic, ComplexCharacteristic try: - from typing import Optional, Union, TYPE_CHECKING + from typing import TYPE_CHECKING, Optional, Union if TYPE_CHECKING: from circuitpython_typing import ReadableBuffer + from adafruit_ble.characteristics import Characteristic - from adafruit_ble.uuid import UUID from adafruit_ble.services import Service + from adafruit_ble.uuid import UUID except ImportError: pass @@ -53,7 +52,7 @@ def write(self, buf: ReadableBuffer) -> None: class StreamOut(ComplexCharacteristic): """Output stream from the Service server.""" - def __init__( + def __init__( # pylint: disable=too-many-arguments self, *, uuid: Optional[UUID] = None, @@ -88,7 +87,7 @@ def bind( class StreamIn(ComplexCharacteristic): """Input stream into the Service server.""" - def __init__( + def __init__( # pylint: disable=too-many-arguments self, *, uuid: Optional[UUID] = None, diff --git a/adafruit_ble/characteristics/string.py b/adafruit_ble/characteristics/string.py index 7326bc7..ac224a0 100755 --- a/adafruit_ble/characteristics/string.py +++ b/adafruit_ble/characteristics/string.py @@ -12,16 +12,16 @@ from __future__ import annotations -from . import Attribute -from . import Characteristic +from . import Attribute, Characteristic try: - from typing import Optional, Type, Union, TYPE_CHECKING + from typing import TYPE_CHECKING, Optional, Type, Union, overload if TYPE_CHECKING: from circuitpython_typing import ReadableBuffer - from adafruit_ble.uuid import UUID + from adafruit_ble.services import Service + from adafruit_ble.uuid import UUID except ImportError: pass @@ -33,7 +33,7 @@ class StringCharacteristic(Characteristic): """UTF-8 Encoded string characteristic.""" - def __init__( + def __init__( # pylint: disable=too-many-arguments self, *, uuid: Optional[UUID] = None, @@ -52,9 +52,21 @@ def __init__( initial_value=initial_value, ) + if TYPE_CHECKING: + + @overload + def __get__( + self, obj: None, cls: Optional[Type[Service]] = None + ) -> Characteristic: + ... + + @overload + def __get__(self, obj: Service, cls: Optional[Type[Service]] = None) -> str: + ... + def __get__( self, obj: Optional[Service], cls: Optional[Type[Service]] = None - ) -> Union[str, "StringCharacteristic"]: + ) -> Union[Characteristic, str]: if obj is None: return self return str(super().__get__(obj, cls), "utf-8") @@ -77,9 +89,21 @@ def __init__( fixed_length=True, ) + if TYPE_CHECKING: + + @overload + def __get__( + self, obj: None, cls: Optional[Type[Service]] = None + ) -> Characteristic: + ... + + @overload + def __get__(self, obj: Service, cls: Optional[Type[Service]] = None) -> str: + ... + def __get__( - self, obj: Service, cls: Optional[Type[Service]] = None - ) -> Union[str, "FixedStringCharacteristic"]: + self, obj: Optional[Service], cls: Optional[Type[Service]] = None + ) -> Union[Characteristic, str]: if obj is None: return self return str(super().__get__(obj, cls), "utf-8") diff --git a/adafruit_ble/services/__init__.py b/adafruit_ble/services/__init__.py index 2fd0789..42e1bce 100644 --- a/adafruit_ble/services/__init__.py +++ b/adafruit_ble/services/__init__.py @@ -15,10 +15,13 @@ from ..characteristics import Characteristic, ComplexCharacteristic try: - from typing import Optional + from typing import TYPE_CHECKING, Dict, Optional except ImportError: pass +if TYPE_CHECKING: + from adafruit_ble.uuid import UUID + __version__ = "0.0.0+auto.0" __repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_BLE.git" @@ -36,6 +39,8 @@ class Service: instance for the connection's peer. """ + uuid: UUID + def __init__( self, *, @@ -44,6 +49,7 @@ def __init__( **initial_values, ) -> None: if service is None: + # NOTE(elpekenin): when is self.uuid set? __new__? # pylint: disable=no-member self.bleio_service = _bleio.Service( self.uuid.bleio_uuid, secondary=secondary @@ -56,7 +62,7 @@ def __init__( # This internal dictionary is manipulated by the Characteristic descriptors to store their # per-Service state. It is NOT managed by the Service itself. It is an attribute of the # Service so that the lifetime of the objects is the same as the Service. - self.bleio_characteristics = {} + self.bleio_characteristics: Dict[str, _bleio.Characteristic] = {} # Set the field name on all of the characteristic objects so they can replace themselves if # they choose. diff --git a/adafruit_ble/services/circuitpython.py b/adafruit_ble/services/circuitpython.py index 3d58b88..0eb14e1 100755 --- a/adafruit_ble/services/circuitpython.py +++ b/adafruit_ble/services/circuitpython.py @@ -10,11 +10,11 @@ """ -from . import Service from ..characteristics import Characteristic from ..characteristics.stream import StreamOut from ..characteristics.string import StringCharacteristic from ..uuid import VendorUUID +from . import Service __version__ = "0.0.0+auto.0" __repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_BLE.git" diff --git a/adafruit_ble/services/nordic.py b/adafruit_ble/services/nordic.py index d4db39b..ae1ca6d 100755 --- a/adafruit_ble/services/nordic.py +++ b/adafruit_ble/services/nordic.py @@ -13,16 +13,16 @@ from __future__ import annotations -from . import Service +from ..characteristics.stream import StreamIn, StreamOut from ..uuid import VendorUUID -from ..characteristics.stream import StreamOut, StreamIn +from . import Service try: - from typing import Optional, TYPE_CHECKING + from typing import TYPE_CHECKING, Optional if TYPE_CHECKING: - from circuitpython_typing import WriteableBuffer, ReadableBuffer import _bleio + from circuitpython_typing import ReadableBuffer, WriteableBuffer except ImportError: pass diff --git a/adafruit_ble/services/sphero.py b/adafruit_ble/services/sphero.py index 2c34d6f..0fa3337 100644 --- a/adafruit_ble/services/sphero.py +++ b/adafruit_ble/services/sphero.py @@ -10,8 +10,8 @@ """ -from . import Service from ..uuid import VendorUUID +from . import Service __version__ = "0.0.0+auto.0" __repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_BLE.git" diff --git a/adafruit_ble/services/standard/__init__.py b/adafruit_ble/services/standard/__init__.py index 91fe0bb..8f445fe 100644 --- a/adafruit_ble/services/standard/__init__.py +++ b/adafruit_ble/services/standard/__init__.py @@ -12,12 +12,11 @@ import time -from .. import Service -from ...uuid import StandardUUID -from ...characteristics import Characteristic -from ...characteristics.string import StringCharacteristic -from ...characteristics import StructCharacteristic +from ...characteristics import Characteristic, StructCharacteristic from ...characteristics.int import Uint8Characteristic +from ...characteristics.string import StringCharacteristic +from ...uuid import StandardUUID +from .. import Service __version__ = "0.0.0+auto.0" __repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_BLE.git" @@ -78,7 +77,12 @@ def struct_time(self) -> time.struct_time: """The current time as a `time.struct_time`. Day of year and whether DST is in effect are always -1. """ - year, month, day, hour, minute, second, weekday, _, _ = self.current_time + current_time = self.current_time + if current_time is None: + msg = "Could not get current time." + raise RuntimeError(msg) + + year, month, day, hour, minute, second, weekday, _, _ = current_time # Bluetooth weekdays count from 1. struct_time counts from 0. return time.struct_time( (year, month, day, hour, minute, second, weekday - 1, -1, -1) diff --git a/adafruit_ble/services/standard/device_info.py b/adafruit_ble/services/standard/device_info.py index 0ebd8ac..2f30008 100644 --- a/adafruit_ble/services/standard/device_info.py +++ b/adafruit_ble/services/standard/device_info.py @@ -15,12 +15,12 @@ import os import sys -from .. import Service -from ...uuid import StandardUUID from ...characteristics.string import FixedStringCharacteristic +from ...uuid import StandardUUID +from .. import Service try: - from typing import Optional, TYPE_CHECKING + from typing import TYPE_CHECKING, Optional if TYPE_CHECKING: import _bleio @@ -43,7 +43,7 @@ class DeviceInfoService(Service): software_revision = FixedStringCharacteristic(uuid=StandardUUID(0x2A28)) manufacturer = FixedStringCharacteristic(uuid=StandardUUID(0x2A29)) - def __init__( + def __init__( # pylint: disable=too-many-arguments self, *, manufacturer: Optional[str] = None, diff --git a/adafruit_ble/services/standard/hid.py b/adafruit_ble/services/standard/hid.py index 6409f21..1c6251d 100755 --- a/adafruit_ble/services/standard/hid.py +++ b/adafruit_ble/services/standard/hid.py @@ -16,18 +16,35 @@ import struct -from micropython import const import _bleio +from micropython import const -from adafruit_ble.characteristics import Attribute -from adafruit_ble.characteristics import Characteristic +from adafruit_ble.characteristics import Attribute, Characteristic from adafruit_ble.characteristics.int import Uint8Characteristic from adafruit_ble.uuid import StandardUUID from .. import Service try: - from typing import Dict, Optional + from typing import TYPE_CHECKING, Dict, List, Literal, Optional, Union + + if TYPE_CHECKING: + from typing import TypedDict + + from circuitpython_typing import ReadableBuffer + + class Collection(TypedDict, total=False): + """Stub for type checker.""" + + # NOTE(elpekenin): total=False means not all keys are required + # however i'm pretty sure type,locals,globals are always present + # may do that with inheritance, but doesnt seem worth + tag: Union[Literal["input"], Literal["output"]] + type: ReadableBuffer + locals: List[ReadableBuffer] + globals: List[ReadableBuffer] + mains: List[Collection] + except ImportError: pass @@ -173,7 +190,7 @@ class ReportIn: uuid = StandardUUID(_REPORT_UUID_NUM) - def __init__( + def __init__( # pylint: disable=too-many-arguments self, service: Service, report_id: int, @@ -214,7 +231,7 @@ class ReportOut: # pylint: disable=too-few-public-methods uuid = StandardUUID(_REPORT_UUID_NUM) - def __init__( + def __init__( # pylint: disable=too-many-arguments self, service: Service, report_id: int, @@ -357,12 +374,12 @@ def __init__( def _init_devices(self) -> None: # pylint: disable=too-many-branches,too-many-statements,too-many-locals - self.devices = [] + self.devices: List[Union[ReportIn, ReportOut]] = [] hid_descriptor = self.report_map global_table = [None] * 10 local_table = [None] * 3 - collections = [] + collections: List[Collection] = [] top_level_collections = [] i = 0 @@ -417,7 +434,7 @@ def _init_devices(self) -> None: i += size - def get_report_info(collection: Dict, reports: Dict) -> None: + def get_report_info(collection: Collection, reports: Dict) -> None: """Gets info about hid reports""" for main in collection["mains"]: if "type" in main: @@ -440,7 +457,7 @@ def get_report_info(collection: Dict, reports: Dict) -> None: ) usage_page = collection["globals"][0][0] usage = collection["locals"][0][0] - reports = {} + reports: Dict[int, Dict] = {} get_report_info(collection, reports) if len(reports) > 1: raise NotImplementedError( diff --git a/adafruit_ble/uuid/__init__.py b/adafruit_ble/uuid/__init__.py index 4459c6f..1b5498d 100644 --- a/adafruit_ble/uuid/__init__.py +++ b/adafruit_ble/uuid/__init__.py @@ -19,6 +19,9 @@ class UUID: """Top level UUID""" + bleio_uuid: _bleio.UUID + size: int + # TODO: Make subclassing _bleio.UUID work so we can just use it directly. # pylint: disable=no-member def __hash__(self): diff --git a/docs/conf.py b/docs/conf.py index f19b877..15fb40d 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -4,9 +4,9 @@ # # SPDX-License-Identifier: MIT +import datetime import os import sys -import datetime sys.path.insert(0, os.path.abspath("..")) diff --git a/examples/ble_advertising_simpletest.py b/examples/ble_advertising_simpletest.py index 2c2815e..7a98199 100644 --- a/examples/ble_advertising_simpletest.py +++ b/examples/ble_advertising_simpletest.py @@ -6,7 +6,6 @@ """ from adafruit_ble import BLERadio - from adafruit_ble.advertising import Advertisement ble = BLERadio() diff --git a/examples/ble_bluefruit_color_picker.py b/examples/ble_bluefruit_color_picker.py index 4facbae..4cf7fdc 100755 --- a/examples/ble_bluefruit_color_picker.py +++ b/examples/ble_bluefruit_color_picker.py @@ -5,9 +5,8 @@ import board import neopixel - -from adafruit_bluefruit_connect.packet import Packet from adafruit_bluefruit_connect.color_packet import ColorPacket +from adafruit_bluefruit_connect.packet import Packet from adafruit_ble import BLERadio from adafruit_ble.advertising.standard import ProvideServicesAdvertisement diff --git a/examples/ble_bluefruit_connect_plotter.py b/examples/ble_bluefruit_connect_plotter.py index 542c714..1839d48 100755 --- a/examples/ble_bluefruit_connect_plotter.py +++ b/examples/ble_bluefruit_connect_plotter.py @@ -4,9 +4,11 @@ # CircuitPython Bluefruit LE Connect Plotter Example import time -import board -import analogio + import adafruit_thermistor +import analogio +import board + from adafruit_ble import BLERadio from adafruit_ble.advertising.standard import ProvideServicesAdvertisement from adafruit_ble.services.nordic import UARTService diff --git a/examples/ble_color_proximity.py b/examples/ble_color_proximity.py index 31c56d3..a72bb5d 100644 --- a/examples/ble_color_proximity.py +++ b/examples/ble_color_proximity.py @@ -8,9 +8,9 @@ """ import time + import board import digitalio - import neopixel from adafruit_ble import BLERadio diff --git a/examples/ble_current_time_service.py b/examples/ble_current_time_service.py index a62dfd0..6747c0c 100644 --- a/examples/ble_current_time_service.py +++ b/examples/ble_current_time_service.py @@ -7,6 +7,7 @@ """ import time + import adafruit_ble from adafruit_ble.advertising.standard import SolicitServicesAdvertisement from adafruit_ble.services.standard import CurrentTimeService diff --git a/examples/ble_demo_central.py b/examples/ble_demo_central.py index b7510ac..d2b6ff4 100644 --- a/examples/ble_demo_central.py +++ b/examples/ble_demo_central.py @@ -9,12 +9,11 @@ import time +import adafruit_lis3dh import board import busio import digitalio -import adafruit_lis3dh import neopixel - from adafruit_bluefruit_connect.color_packet import ColorPacket from adafruit_ble import BLERadio diff --git a/examples/ble_demo_periph.py b/examples/ble_demo_periph.py index a9b5f92..bc45e7b 100644 --- a/examples/ble_demo_periph.py +++ b/examples/ble_demo_periph.py @@ -8,10 +8,10 @@ import board import neopixel +from adafruit_bluefruit_connect.color_packet import ColorPacket # Only the packet classes that are imported will be known to Packet. from adafruit_bluefruit_connect.packet import Packet -from adafruit_bluefruit_connect.color_packet import ColorPacket from adafruit_ble import BLERadio from adafruit_ble.advertising.standard import ProvideServicesAdvertisement diff --git a/examples/ble_detailed_scan.py b/examples/ble_detailed_scan.py index 7f5dfeb..cad9a3a 100644 --- a/examples/ble_detailed_scan.py +++ b/examples/ble_detailed_scan.py @@ -6,7 +6,6 @@ # specialty advertising types. from adafruit_ble import BLERadio - from adafruit_ble.advertising import Advertisement from adafruit_ble.advertising.standard import ProvideServicesAdvertisement diff --git a/examples/ble_device_info_service.py b/examples/ble_device_info_service.py index 34ac32b..32086c2 100644 --- a/examples/ble_device_info_service.py +++ b/examples/ble_device_info_service.py @@ -7,6 +7,7 @@ """ import time + import adafruit_ble from adafruit_ble.advertising.standard import Advertisement from adafruit_ble.services.standard.device_info import DeviceInfoService diff --git a/examples/ble_hid_periph.py b/examples/ble_hid_periph.py index 6b8053a..6224a95 100644 --- a/examples/ble_hid_periph.py +++ b/examples/ble_hid_periph.py @@ -15,9 +15,8 @@ import adafruit_ble from adafruit_ble.advertising import Advertisement from adafruit_ble.advertising.standard import ProvideServicesAdvertisement -from adafruit_ble.services.standard.hid import HIDService from adafruit_ble.services.standard.device_info import DeviceInfoService - +from adafruit_ble.services.standard.hid import HIDService # Use default HID descriptor hid = HIDService() diff --git a/examples/ble_json_central.py b/examples/ble_json_central.py index 654f7f4..557b183 100644 --- a/examples/ble_json_central.py +++ b/examples/ble_json_central.py @@ -5,10 +5,10 @@ # Read sensor readings from peripheral BLE device using a JSON characteristic. from ble_json_service import SensorService + from adafruit_ble import BLERadio from adafruit_ble.advertising.standard import ProvideServicesAdvertisement - ble = BLERadio() connection = None diff --git a/examples/ble_json_peripheral.py b/examples/ble_json_peripheral.py index e079a8a..14d13fb 100644 --- a/examples/ble_json_peripheral.py +++ b/examples/ble_json_peripheral.py @@ -4,13 +4,14 @@ # Provide readable sensor values and writable settings to connected devices via JSON characteristic. -import time import random +import time + from ble_json_service import SensorService + from adafruit_ble import BLERadio from adafruit_ble.advertising.standard import ProvideServicesAdvertisement - # Create BLE radio, custom service, and advertisement. ble = BLERadio() service = SensorService() diff --git a/examples/ble_json_service.py b/examples/ble_json_service.py index 6351564..96a8d41 100644 --- a/examples/ble_json_service.py +++ b/examples/ble_json_service.py @@ -4,10 +4,10 @@ # Read sensor readings from peripheral BLE device using a JSON characteristic. -from adafruit_ble.uuid import VendorUUID -from adafruit_ble.services import Service from adafruit_ble.characteristics import Characteristic from adafruit_ble.characteristics.json import JSONCharacteristic +from adafruit_ble.services import Service +from adafruit_ble.uuid import VendorUUID # A custom service with two JSON characteristics for this device. The "sensors" characteristic diff --git a/examples/ble_packet_buffer_service.py b/examples/ble_packet_buffer_service.py index 510e4f0..a3c53aa 100644 --- a/examples/ble_packet_buffer_service.py +++ b/examples/ble_packet_buffer_service.py @@ -8,12 +8,12 @@ import _bleio -from adafruit_ble.services import Service from adafruit_ble.characteristics import ( Attribute, Characteristic, ComplexCharacteristic, ) +from adafruit_ble.services import Service from adafruit_ble.uuid import VendorUUID @@ -30,7 +30,7 @@ def __init__(self, uuid16): class PacketBufferCharacteristic(ComplexCharacteristic): - def __init__( + def __init__( # pylint: disable=too-many-arguments self, *, uuid=None, diff --git a/examples/ble_packet_buffer_test.py b/examples/ble_packet_buffer_test.py index 66c986f..3326ec5 100644 --- a/examples/ble_packet_buffer_test.py +++ b/examples/ble_packet_buffer_test.py @@ -11,7 +11,6 @@ from adafruit_ble import BLERadio from adafruit_ble.advertising.standard import ProvideServicesAdvertisement - ble = BLERadio() pbs = PacketBufferService() advertisement = ProvideServicesAdvertisement(pbs) diff --git a/optional_requirements.txt b/optional_requirements.txt index d4e27c4..7a17dbd 100644 --- a/optional_requirements.txt +++ b/optional_requirements.txt @@ -1,3 +1,6 @@ # SPDX-FileCopyrightText: 2022 Alec Delaney, for Adafruit Industries # # SPDX-License-Identifier: Unlicense + +circuitpython-stubs +mypy From 9e19cf4a6f21cc7cd42bb214d0acf506dc560cec Mon Sep 17 00:00:00 2001 From: elpekenin Date: Wed, 21 Aug 2024 00:18:37 +0200 Subject: [PATCH 2/4] (wip) more changes --- adafruit_ble/__init__.py | 17 ++++--- adafruit_ble/advertising/__init__.py | 13 +---- adafruit_ble/advertising/standard.py | 64 +++++++++++++++--------- adafruit_ble/characteristics/__init__.py | 10 ++-- adafruit_ble/characteristics/float.py | 6 ++- adafruit_ble/characteristics/int.py | 6 ++- adafruit_ble/characteristics/json.py | 8 +-- adafruit_ble/characteristics/stream.py | 8 +-- adafruit_ble/characteristics/string.py | 8 +-- adafruit_ble/services/__init__.py | 8 +-- adafruit_ble/uuid/__init__.py | 3 -- 11 files changed, 87 insertions(+), 64 deletions(-) diff --git a/adafruit_ble/__init__.py b/adafruit_ble/__init__.py index 16f85ec..2bee638 100755 --- a/adafruit_ble/__init__.py +++ b/adafruit_ble/__init__.py @@ -45,7 +45,10 @@ if TYPE_CHECKING: from circuitpython_typing import ReadableBuffer - from adafruit_ble.uuid import UUID + from adafruit_ble.uuid import StandardUUID, VendorUUID + + Uuid = Union[StandardUUID, VendorUUID] + except ImportError: pass @@ -66,11 +69,11 @@ class BLEConnection: def __init__(self, bleio_connection: _bleio.Connection) -> None: self._bleio_connection = bleio_connection # _bleio.Service objects representing services found during discovery. - self._discovered_bleio_services: Dict[UUID, _bleio.Service] = {} + self._discovered_bleio_services: Dict[Uuid, _bleio.Service] = {} # Service objects that wrap remote services. - self._constructed_services: Dict[UUID, Service] = {} + self._constructed_services: Dict[Uuid, Service] = {} - def _discover_remote(self, uuid: UUID) -> Optional[_bleio.Service]: + def _discover_remote(self, uuid: Uuid) -> Optional[_bleio.Service]: remote_service = None if uuid in self._discovered_bleio_services: remote_service = self._discovered_bleio_services[uuid] @@ -83,7 +86,7 @@ def _discover_remote(self, uuid: UUID) -> Optional[_bleio.Service]: self._discovered_bleio_services[uuid] = remote_service return remote_service - def __contains__(self, key: Union[UUID, Type[Service]]) -> bool: + def __contains__(self, key: Union[Uuid, Type[Service]]) -> bool: """ Allows easy testing for a particular Service class or a particular UUID associated with this connection. @@ -101,7 +104,7 @@ def __contains__(self, key: Union[UUID, Type[Service]]) -> bool: uuid = key.uuid return self._discover_remote(uuid) is not None - def __getitem__(self, key: Union[UUID, Type[Service]]) -> Optional[Service]: + def __getitem__(self, key: Union[Uuid, Type[Service]]) -> Optional[Service]: """Return the Service for the given Service class or uuid, if any.""" uuid = key maybe_service = False @@ -109,7 +112,7 @@ def __getitem__(self, key: Union[UUID, Type[Service]]) -> Optional[Service]: uuid = key.uuid maybe_service = True - if isinstance(uuid) and uuid in self._constructed_services: + if uuid in self._constructed_services: return self._constructed_services[uuid] remote_service = self._discover_remote(uuid) diff --git a/adafruit_ble/advertising/__init__.py b/adafruit_ble/advertising/__init__.py index 6b80769..1d7957f 100644 --- a/adafruit_ble/advertising/__init__.py +++ b/adafruit_ble/advertising/__init__.py @@ -11,17 +11,7 @@ import struct try: - from typing import ( - TYPE_CHECKING, - Any, - Dict, - List, - Optional, - Tuple, - Type, - TypeVar, - Union, - ) + from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple, Type, TypeVar, Union from typing_extensions import Literal @@ -245,6 +235,7 @@ class Advertisement: address: Optional[Address] _rssi: Optional[int] + mutable: bool match_prefixes: Optional[Tuple[bytes, ...]] = () """For Advertisement, :py:attr:`~adafruit_ble.advertising.Advertisement.match_prefixes` diff --git a/adafruit_ble/advertising/standard.py b/adafruit_ble/advertising/standard.py index 1cdeea2..8a65191 100644 --- a/adafruit_ble/advertising/standard.py +++ b/adafruit_ble/advertising/standard.py @@ -28,8 +28,10 @@ try: from typing import ( + TYPE_CHECKING, Any, Collection, + Dict, Iterable, Iterator, List, @@ -39,15 +41,21 @@ Union, ) - from _bleio import ScanEntry + if TYPE_CHECKING: + from _bleio import ScanEntry - from adafruit_ble.services import Service - from adafruit_ble.uuid import UUID + from adafruit_ble.services import Service + from adafruit_ble.uuid import StandardUUID, VendorUUID - UsesServicesAdvertisement = Union[ - "ProvideServicesAdvertisement", "SolicitServicesAdvertisement" - ] + from . import DataDict + AdvServiceLists = Dict[int, "BoundServiceList"] + + UsesServicesAdvertisement = Union[ + "ProvideServicesAdvertisement", "SolicitServicesAdvertisement" + ] + + Uuid = Union[StandardUUID, VendorUUID] except ImportError: pass @@ -69,28 +77,30 @@ def __init__( self._advertisement = advertisement self._standard_service_fields = standard_services self._vendor_service_fields = vendor_services - self._standard_services = [] - self._vendor_services = [] + self._standard_services: List[StandardUUID] = [] + self._vendor_services: List[VendorUUID] = [] for adt in standard_services: if adt in self._advertisement.data_dict: data = self._advertisement.data_dict[adt] for i in range(len(data) // 2): - uuid = StandardUUID(data[2 * i : 2 * (i + 1)]) - self._standard_services.append(uuid) + standard_uuid = StandardUUID(data[2 * i : 2 * (i + 1)]) + self._standard_services.append(standard_uuid) for adt in vendor_services: if adt in self._advertisement.data_dict: data = self._advertisement.data_dict[adt] for i in range(len(data) // 16): - uuid = VendorUUID(data[16 * i : 16 * (i + 1)]) - self._vendor_services.append(uuid) + vendor_uuid = VendorUUID(data[16 * i : 16 * (i + 1)]) + self._vendor_services.append(vendor_uuid) - def __contains__(self, key: Union[UUID, Service]) -> bool: + def __contains__(self, key: Union[Uuid, Service]) -> bool: uuid = key if hasattr(key, "uuid"): uuid = key.uuid return uuid in self._vendor_services or uuid in self._standard_services - def _update(self, adt: int, uuids: List[UUID]) -> None: + def _update( + self, adt: int, uuids: Union[List[StandardUUID], List[VendorUUID]] + ) -> None: if not uuids: # uuids is empty del self._advertisement.data_dict[adt] @@ -102,7 +112,7 @@ def _update(self, adt: int, uuids: List[UUID]) -> None: i += uuid_length self._advertisement.data_dict[adt] = b - def __iter__(self) -> Iterator[UUID]: + def __iter__(self) -> Iterator[Uuid]: all_services = list(self._standard_services) all_services.extend(self._vendor_services) return iter(all_services) @@ -149,10 +159,10 @@ def extend(self, services: Iterable[Service]) -> None: def __str__(self) -> str: data = [] - for service_uuid in self._standard_services: - data.append(str(service_uuid)) - for service_uuid in self._vendor_services: - data.append(str(service_uuid)) + for standard_service_uuid in self._standard_services: + data.append(str(standard_service_uuid)) + for vendor_service_uuid in self._vendor_services: + data.append(str(vendor_service_uuid)) return "".format(", ".join(data)) @@ -194,6 +204,8 @@ def __get__( class ProvideServicesAdvertisement(Advertisement): """Advertise what services that the device makes available upon connection.""" + adv_service_lists: AdvServiceLists + # Prefixes that match each ADT that can carry service UUIDs. match_prefixes = (b"\x02", b"\x03", b"\x06", b"\x07") services = ServiceList(standard_services=[0x02, 0x03], vendor_services=[0x06, 0x07]) @@ -223,6 +235,8 @@ def matches(cls, entry: ScanEntry) -> bool: class SolicitServicesAdvertisement(Advertisement): """Advertise what services the device would like to use over a connection.""" + adv_service_lists: AdvServiceLists + # Prefixes that match each ADT that can carry solicited service UUIDs. match_prefixes = (b"\x14", b"\x15") @@ -263,7 +277,9 @@ def __init__( self._company_id = company_id self._adt = advertising_data_type - self.data = OrderedDict() # makes field order match order they are set in + self.data: DataDict = ( + OrderedDict() + ) # makes field order match order they are set in self.company_id = company_id encoded_company = struct.pack(" None: diff --git a/adafruit_ble/characteristics/__init__.py b/adafruit_ble/characteristics/__init__.py index ce80042..d255671 100644 --- a/adafruit_ble/characteristics/__init__.py +++ b/adafruit_ble/characteristics/__init__.py @@ -23,7 +23,9 @@ from circuitpython_typing import ReadableBuffer from adafruit_ble.services import Service - from adafruit_ble.uuid import UUID + from adafruit_ble.uuid import StandardUUID, VendorUUID + + Uuid = Union[StandardUUID, VendorUUID] except ImportError: pass @@ -93,7 +95,7 @@ class Characteristic: def __init__( # pylint: disable=too-many-arguments self, *, - uuid: Optional[UUID] = None, + uuid: Optional[Uuid] = None, properties: int = 0, read_perm: int = Attribute.OPEN, write_perm: int = Attribute.OPEN, @@ -199,7 +201,7 @@ class ComplexCharacteristic: def __init__( # pylint: disable=too-many-arguments self, *, - uuid: Optional[UUID] = None, + uuid: Optional[Uuid] = None, properties: int = 0, read_perm: int = Attribute.OPEN, write_perm: int = Attribute.OPEN, @@ -276,7 +278,7 @@ def __init__( # pylint: disable=too-many-arguments self, struct_format, *, - uuid: Optional[UUID] = None, + uuid: Optional[Uuid] = None, properties: int = 0, read_perm: int = Attribute.OPEN, write_perm: int = Attribute.OPEN, diff --git a/adafruit_ble/characteristics/float.py b/adafruit_ble/characteristics/float.py index d1d96b0..9c1c328 100644 --- a/adafruit_ble/characteristics/float.py +++ b/adafruit_ble/characteristics/float.py @@ -22,7 +22,9 @@ from adafruit_ble.characteristics import Characteristic from adafruit_ble.services import Service - from adafruit_ble.uuid import UUID + from adafruit_ble.uuid import StandardUUID, VendorUUID + + Uuid = Union[StandardUUID, VendorUUID] except ImportError: pass @@ -37,7 +39,7 @@ class FloatCharacteristic(StructCharacteristic): def __init__( # pylint: disable=too-many-arguments self, *, - uuid: Optional[UUID] = None, + uuid: Optional[Uuid] = None, properties: int = 0, read_perm: int = Attribute.OPEN, write_perm: int = Attribute.OPEN, diff --git a/adafruit_ble/characteristics/int.py b/adafruit_ble/characteristics/int.py index 03b9d13..ef33b21 100755 --- a/adafruit_ble/characteristics/int.py +++ b/adafruit_ble/characteristics/int.py @@ -22,7 +22,9 @@ from adafruit_ble.characteristics import Characteristic from adafruit_ble.services import Service - from adafruit_ble.uuid import UUID + from adafruit_ble.uuid import StandardUUID, VendorUUID + + Uuid = Union[StandardUUID, VendorUUID] except ImportError: pass @@ -40,7 +42,7 @@ def __init__( # pylint: disable=too-many-arguments min_value: int, max_value: int, *, - uuid: Optional[UUID] = None, + uuid: Optional[Uuid] = None, properties: int = 0, read_perm: int = Attribute.OPEN, write_perm: int = Attribute.OPEN, diff --git a/adafruit_ble/characteristics/json.py b/adafruit_ble/characteristics/json.py index 03cf5b1..c3ffb14 100644 --- a/adafruit_ble/characteristics/json.py +++ b/adafruit_ble/characteristics/json.py @@ -17,13 +17,15 @@ from . import Attribute, Characteristic try: - from typing import TYPE_CHECKING, Any, Optional, Type + from typing import TYPE_CHECKING, Any, Optional, Type, Union if TYPE_CHECKING: from circuitpython_typing import ReadableBuffer from adafruit_ble.services import Service - from adafruit_ble.uuid import UUID + from adafruit_ble.uuid import StandardUUID, VendorUUID + + Uuid = Union[StandardUUID, VendorUUID] except ImportError: pass @@ -38,7 +40,7 @@ class JSONCharacteristic(Characteristic): def __init__( # pylint: disable=too-many-arguments self, *, - uuid: Optional[UUID] = None, + uuid: Optional[Uuid] = None, properties: int = Characteristic.READ, read_perm: int = Attribute.OPEN, write_perm: int = Attribute.OPEN, diff --git a/adafruit_ble/characteristics/stream.py b/adafruit_ble/characteristics/stream.py index e25d5e9..0cea03c 100755 --- a/adafruit_ble/characteristics/stream.py +++ b/adafruit_ble/characteristics/stream.py @@ -25,7 +25,9 @@ from adafruit_ble.characteristics import Characteristic from adafruit_ble.services import Service - from adafruit_ble.uuid import UUID + from adafruit_ble.uuid import StandardUUID, VendorUUID + + Uuid = Union[StandardUUID, VendorUUID] except ImportError: pass @@ -55,7 +57,7 @@ class StreamOut(ComplexCharacteristic): def __init__( # pylint: disable=too-many-arguments self, *, - uuid: Optional[UUID] = None, + uuid: Optional[Uuid] = None, timeout: float = 1.0, buffer_size: int = 64, properties: int = Characteristic.NOTIFY, @@ -90,7 +92,7 @@ class StreamIn(ComplexCharacteristic): def __init__( # pylint: disable=too-many-arguments self, *, - uuid: Optional[UUID] = None, + uuid: Optional[Uuid] = None, timeout: float = 1.0, buffer_size: int = 64, properties: int = (Characteristic.WRITE | Characteristic.WRITE_NO_RESPONSE), diff --git a/adafruit_ble/characteristics/string.py b/adafruit_ble/characteristics/string.py index ac224a0..aea6e30 100755 --- a/adafruit_ble/characteristics/string.py +++ b/adafruit_ble/characteristics/string.py @@ -21,7 +21,9 @@ from circuitpython_typing import ReadableBuffer from adafruit_ble.services import Service - from adafruit_ble.uuid import UUID + from adafruit_ble.uuid import StandardUUID, VendorUUID + + Uuid = Union[StandardUUID, VendorUUID] except ImportError: pass @@ -36,7 +38,7 @@ class StringCharacteristic(Characteristic): def __init__( # pylint: disable=too-many-arguments self, *, - uuid: Optional[UUID] = None, + uuid: Optional[Uuid] = None, properties: int = Characteristic.READ, read_perm: int = Attribute.OPEN, write_perm: int = Attribute.OPEN, @@ -79,7 +81,7 @@ class FixedStringCharacteristic(Characteristic): """Fixed strings are set once when bound and unchanged after.""" def __init__( - self, *, uuid: Optional[UUID] = None, read_perm: int = Attribute.OPEN + self, *, uuid: Optional[Uuid] = None, read_perm: int = Attribute.OPEN ) -> None: super().__init__( uuid=uuid, diff --git a/adafruit_ble/services/__init__.py b/adafruit_ble/services/__init__.py index 42e1bce..dd5ff00 100644 --- a/adafruit_ble/services/__init__.py +++ b/adafruit_ble/services/__init__.py @@ -15,12 +15,14 @@ from ..characteristics import Characteristic, ComplexCharacteristic try: - from typing import TYPE_CHECKING, Dict, Optional + from typing import TYPE_CHECKING, Dict, Optional, Union except ImportError: pass if TYPE_CHECKING: - from adafruit_ble.uuid import UUID + from adafruit_ble.uuid import StandardUUID, VendorUUID + + Uuid = Union[StandardUUID, VendorUUID] __version__ = "0.0.0+auto.0" __repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_BLE.git" @@ -39,7 +41,7 @@ class Service: instance for the connection's peer. """ - uuid: UUID + uuid: Uuid def __init__( self, diff --git a/adafruit_ble/uuid/__init__.py b/adafruit_ble/uuid/__init__.py index 1b5498d..4459c6f 100644 --- a/adafruit_ble/uuid/__init__.py +++ b/adafruit_ble/uuid/__init__.py @@ -19,9 +19,6 @@ class UUID: """Top level UUID""" - bleio_uuid: _bleio.UUID - size: int - # TODO: Make subclassing _bleio.UUID work so we can just use it directly. # pylint: disable=no-member def __hash__(self): From c86b3b1b52ef5a93bb16922056868c858f83e57c Mon Sep 17 00:00:00 2001 From: elpekenin Date: Thu, 22 Aug 2024 00:25:28 +0200 Subject: [PATCH 3/4] (wip) progress --- .pre-commit-config.yaml | 4 ++ adafruit_ble/__init__.py | 41 ++++++----- adafruit_ble/advertising/__init__.py | 48 +++++++++---- adafruit_ble/advertising/standard.py | 89 ++++++++++++++++++++---- adafruit_ble/characteristics/__init__.py | 1 + adafruit_ble/characteristics/float.py | 11 +-- adafruit_ble/characteristics/int.py | 11 +-- adafruit_ble/characteristics/string.py | 1 + adafruit_ble/services/standard/hid.py | 8 ++- docs/conf.py | 2 +- examples/ble_color_proximity.py | 5 +- examples/ble_current_time_service.py | 9 +++ examples/ble_demo_central.py | 8 ++- examples/ble_device_info_service.py | 7 ++ examples/ble_hid_periph.py | 11 +-- examples/ble_json_central.py | 3 + examples/ble_packet_buffer_client.py | 11 ++- examples/ble_simpletest.py | 3 +- examples/ble_uart_echo_client.py | 8 ++- 19 files changed, 209 insertions(+), 72 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index ba466bc..2a8cf3e 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -45,3 +45,7 @@ repos: files: "^tests/" args: - --disable=missing-docstring,consider-using-f-string,duplicate-code + - repo: https://github.com/pre-commit/mirrors-mypy + rev: v1.11.0 + hooks: + - id: mypy diff --git a/adafruit_ble/__init__.py b/adafruit_ble/__init__.py index 2bee638..05dd59b 100755 --- a/adafruit_ble/__init__.py +++ b/adafruit_ble/__init__.py @@ -27,16 +27,19 @@ from .advertising import Advertisement from .services import Service +from .uuid import UUID try: from typing import ( TYPE_CHECKING, Dict, Iterator, + List, NoReturn, Optional, Tuple, Type, + TypeVar, Union, ) @@ -49,6 +52,8 @@ Uuid = Union[StandardUUID, VendorUUID] + Adv = TypeVar("Adv", bound=Advertisement) + Ser = TypeVar("Ser", bound=Service) except ImportError: pass @@ -99,16 +104,15 @@ def __contains__(self, key: Union[Uuid, Type[Service]]) -> bool: if StandardUUID(0x1234) in connection: # do something """ - uuid = key - if hasattr(key, "uuid"): - uuid = key.uuid + uuid = key if isinstance(key, UUID) else key.uuid return self._discover_remote(uuid) is not None - def __getitem__(self, key: Union[Uuid, Type[Service]]) -> Optional[Service]: + def __getitem__(self, key: Union[Uuid, Type[Ser]]) -> Optional[Ser]: """Return the Service for the given Service class or uuid, if any.""" - uuid = key - maybe_service = False - if hasattr(key, "uuid"): + if isinstance(key, UUID): + uuid = key + maybe_service = False + else: uuid = key.uuid maybe_service = True @@ -118,7 +122,7 @@ def __getitem__(self, key: Union[Uuid, Type[Service]]) -> Optional[Service]: remote_service = self._discover_remote(uuid) if remote_service: constructed_service = None - if maybe_service: + if maybe_service and not isinstance(key, UUID): constructed_service = key(service=remote_service) self._constructed_services[uuid] = constructed_service return constructed_service @@ -239,7 +243,7 @@ def stop_advertising(self) -> None: def start_scan( # pylint: disable=too-many-arguments self, - *advertisement_types: Type[Advertisement], + *advertisement_types: Type[Adv], buffer_size: int = 512, extended: bool = False, timeout: Optional[float] = None, @@ -247,7 +251,7 @@ def start_scan( # pylint: disable=too-many-arguments window: float = 0.1, minimum_rssi: int = -80, active: bool = True, - ) -> Iterator[Advertisement]: + ) -> Iterator[Adv]: """ Starts scanning. Returns an iterator of advertisement objects of the types given in advertisement_types. The iterator will block until an advertisement is heard or the scan @@ -276,10 +280,10 @@ def start_scan( # pylint: disable=too-many-arguments If none are given then `Advertisement` objects will be returned. :rtype: iterable """ - if not advertisement_types: - advertisement_types = (Advertisement,) - all_prefix_bytes = tuple(adv.get_prefix_bytes() for adv in advertisement_types) + adv_types: Tuple[Type[Adv], ...] = advertisement_types or (Advertisement,) + + all_prefix_bytes = tuple(adv.get_prefix_bytes() for adv in adv_types) # If one of the advertisement_types has no prefix restrictions, then # no prefixes should be specified at all, so we match everything. @@ -296,14 +300,14 @@ def start_scan( # pylint: disable=too-many-arguments active=active, ): adv_type = Advertisement - for possible_type in advertisement_types: + for possible_type in adv_types: if possible_type.matches(entry) and issubclass(possible_type, adv_type): adv_type = possible_type # Double check the adv_type is requested. We may return Advertisement accidentally # otherwise. - if adv_type not in advertisement_types: + if adv_type not in adv_types: continue - advertisement = adv_type(entry=entry) + advertisement: Adv = adv_type(entry=entry) if advertisement: yield advertisement @@ -329,8 +333,7 @@ def connect( peer_ = peer else: if peer.address is None: - msg = "Unreachable?" - raise RuntimeError(msg) + raise RuntimeError peer_ = peer.address connection = self._adapter.connect(peer_, timeout=timeout) @@ -348,7 +351,7 @@ def connections(self) -> Tuple[Optional[BLEConnection], ...]: """A tuple of active `BLEConnection` objects.""" self._clean_connection_cache() connections = self._adapter.connections - wrapped_connections = [None] * len(connections) + wrapped_connections: List[Optional[BLEConnection]] = [None] * len(connections) for i, connection in enumerate(connections): if connection not in self._connection_cache: self._connection_cache[connection] = BLEConnection(connection) diff --git a/adafruit_ble/advertising/__init__.py b/adafruit_ble/advertising/__init__.py index 1d7957f..c68f9bb 100644 --- a/adafruit_ble/advertising/__init__.py +++ b/adafruit_ble/advertising/__init__.py @@ -11,7 +11,17 @@ import struct try: - from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple, Type, TypeVar, Union + from typing import ( + TYPE_CHECKING, + Any, + Dict, + List, + Optional, + Tuple, + Type, + TypeVar, + Union, + ) from typing_extensions import Literal @@ -22,7 +32,7 @@ "LazyObjectField_GivenClass" ) - DataDict = Dict[Any, bytes] + DataDict = Dict[int, Union[bytes, List[bytes]]] except ImportError: pass @@ -42,7 +52,7 @@ def decode_data(data: bytes, *, key_encoding: str = "B") -> DataDict: """Helper which decodes length encoded structures into a dictionary with the given key encoding.""" i = 0 - data_dict = {} + data_dict: DataDict = {} key_size = struct.calcsize(key_encoding) while i < len(data): item_length = data[i] @@ -50,12 +60,15 @@ def decode_data(data: bytes, *, key_encoding: str = "B") -> DataDict: if item_length == 0: break key = struct.unpack_from(key_encoding, data, i)[0] - if key not in data_dict: - data_dict[key] = b"" - value = data[i + key_size : i + item_length] - data_dict[key] += value - + if key in data_dict: + cur_value = data_dict[key] + if isinstance(cur_value, list): + cur_value.append(value) + else: + data_dict[key] = [cur_value, value] + else: + data_dict[key] = value i += item_length return data_dict @@ -133,7 +146,10 @@ def __init__( self._adt = advertising_data_type self.flags = 0 if self._adt in self._advertisement.data_dict: - self.flags = self._advertisement.data_dict[self._adt][0] + value = self._advertisement.data_dict[self._adt] + if isinstance(value, list): + raise RuntimeError + self.flags = value[0] def __len__(self) -> Literal[1]: return 1 @@ -166,7 +182,10 @@ def __get__( return self if self._adt not in obj.data_dict: return None - return str(obj.data_dict[self._adt], "utf-8") + value = obj.data_dict[self._adt] + if isinstance(value, list): + raise RuntimeError + return str(value, "utf-8") def __set__(self, obj: "Advertisement", value: str) -> None: obj.data_dict[self._adt] = value.encode("utf-8") @@ -186,7 +205,10 @@ def __get__( return self if self._adt not in obj.data_dict: return None - return struct.unpack(self._format, obj.data_dict[self._adt])[0] + value = obj.data_dict[self._adt] + if isinstance(value, list): + raise RuntimeError + return struct.unpack(self._format, value)[0] def __set__(self, obj: "Advertisement", value: Any) -> None: obj.data_dict[self._adt] = struct.pack(self._format, value) @@ -233,10 +255,6 @@ class Advertisement: bytestring prefixes to match against the multiple data structures in the advertisement. """ - address: Optional[Address] - _rssi: Optional[int] - mutable: bool - match_prefixes: Optional[Tuple[bytes, ...]] = () """For Advertisement, :py:attr:`~adafruit_ble.advertising.Advertisement.match_prefixes` will always return ``True``. Subclasses may override this value.""" diff --git a/adafruit_ble/advertising/standard.py b/adafruit_ble/advertising/standard.py index 8a65191..168a64e 100644 --- a/adafruit_ble/advertising/standard.py +++ b/adafruit_ble/advertising/standard.py @@ -26,6 +26,7 @@ to_hex, ) +TYPE_CHECKING = False try: from typing import ( TYPE_CHECKING, @@ -36,9 +37,12 @@ Iterator, List, Optional, + Protocol, Tuple, Type, + TypeVar, Union, + overload, ) if TYPE_CHECKING: @@ -57,6 +61,18 @@ Uuid = Union[StandardUUID, VendorUUID] + class WithManufacturerData: + """Stub type, anything with a manufacturer_data attribute.""" + + manufacturer_data: ManufacturerData + + AdvertisementWithManufacturerData = TypeVar( + "AdvertisementWithManufacturerData", + Advertisement, + WithManufacturerData, + ) + + except ImportError: pass @@ -113,12 +129,12 @@ def _update( self._advertisement.data_dict[adt] = b def __iter__(self) -> Iterator[Uuid]: - all_services = list(self._standard_services) + all_services: List[Uuid] = list(self._standard_services) all_services.extend(self._vendor_services) return iter(all_services) # TODO: Differentiate between complete and incomplete lists. - def append(self, service: Service) -> None: + def append(self, service: Type[Service]) -> None: """Append a service to the list.""" if ( isinstance(service.uuid, StandardUUID) @@ -184,11 +200,27 @@ def _present(self, obj: UsesServicesAdvertisement) -> bool: return True return False + if TYPE_CHECKING: + + @overload + def __get__( + self, obj: None, cls: Optional[Type[UsesServicesAdvertisement]] = None + ) -> ServiceList: + ... + + @overload + def __get__( + self, + obj: UsesServicesAdvertisement, + cls: Optional[Type[UsesServicesAdvertisement]] = None, + ) -> Union[BoundServiceList, Tuple[()]]: + ... + def __get__( self, obj: Optional[UsesServicesAdvertisement], - cls: Type[UsesServicesAdvertisement], - ) -> Union[UsesServicesAdvertisement, Tuple[()], "ServiceList"]: + cls: Optional[Type[UsesServicesAdvertisement]] = None, + ) -> Union[ServiceList, Union[BoundServiceList, Tuple[()]]]: if obj is None: return self if not self._present(obj) and not obj.mutable: @@ -219,6 +251,8 @@ def __init__(self, *services: Service, entry: Optional[ScanEntry] = None) -> Non # Attributes are supplied by entry. return if services: + if not self.services: + raise RuntimeError self.services.extend(services) self.connectable = True self.flags.general_discovery = True @@ -250,6 +284,8 @@ def __init__(self, *services: Service, entry: Optional[ScanEntry] = None) -> Non raise ValueError("Supply services or entry, not both") # Attributes are supplied by entry. return + if not self.solicited_services: + raise RuntimeError self.solicited_services.extend(services) self.connectable = True self.flags.general_discovery = True @@ -265,6 +301,8 @@ class ManufacturerData(AdvertisingDataField): sub-class. """ + data: DataDict + def __init__( self, obj: UsesServicesAdvertisement, @@ -277,9 +315,7 @@ def __init__( self._company_id = company_id self._adt = advertising_data_type - self.data: DataDict = ( - OrderedDict() - ) # makes field order match order they are set in + self.data = OrderedDict() # makes field order match order they are set in self.company_id = company_id encoded_company = struct.pack(" int: @@ -330,9 +371,29 @@ def __init__( # Mostly, this is to raise a ValueError if field_names has invalid entries self.mdf_tuple = namedtuple("mdf_tuple", self.field_names) + if TYPE_CHECKING: + + @overload + def __get__( + self, + obj: None, + cls: Optional[Type[AdvertisementWithManufacturerData]] = None, + ) -> ManufacturerDataField: + ... + + @overload + def __get__( + self, + obj: AdvertisementWithManufacturerData, + cls: Optional[Type[AdvertisementWithManufacturerData]] = None, + ) -> Optional[Tuple]: + ... + def __get__( - self, obj: Optional[Advertisement], cls: Type[Advertisement] - ) -> Optional[Union["ManufacturerDataField", Tuple, tuple]]: + self, + obj: Optional[AdvertisementWithManufacturerData], + cls: Optional[Type[AdvertisementWithManufacturerData]] = None, + ) -> Union[ManufacturerDataField, Optional[Tuple]]: if obj is None: return self if self._key not in obj.manufacturer_data.data: @@ -360,7 +421,7 @@ def __get__( unpacked_[i] = val[0] return tuple(unpacked) - def __set__(self, obj: "Advertisement", value: Any) -> None: + def __set__(self, obj: AdvertisementWithManufacturerData, value: Any) -> None: if not obj.mutable: raise AttributeError() if isinstance(value, tuple) and ( @@ -392,7 +453,7 @@ def __init__(self, service: Service) -> None: self._prefix = bytes(service.uuid) def __get__( # pylint: disable=too-many-return-statements,too-many-branches - self, obj: Optional[Service], cls: Type[Service] + self, obj: Optional[Advertisement], cls: Type[Advertisement] ) -> Optional[Union["ServiceData", memoryview]]: if obj is None: return self diff --git a/adafruit_ble/characteristics/__init__.py b/adafruit_ble/characteristics/__init__.py index d255671..ddf8489 100644 --- a/adafruit_ble/characteristics/__init__.py +++ b/adafruit_ble/characteristics/__init__.py @@ -16,6 +16,7 @@ from ..attributes import Attribute +TYPE_CHECKING = False try: from typing import TYPE_CHECKING, Iterable, Optional, Tuple, Type, Union, overload diff --git a/adafruit_ble/characteristics/float.py b/adafruit_ble/characteristics/float.py index 9c1c328..838f448 100644 --- a/adafruit_ble/characteristics/float.py +++ b/adafruit_ble/characteristics/float.py @@ -14,6 +14,7 @@ from . import Attribute, StructCharacteristic +TYPE_CHECKING = False try: from typing import TYPE_CHECKING, Optional, Type, Union, overload @@ -57,8 +58,9 @@ def __init__( # pylint: disable=too-many-arguments ) if TYPE_CHECKING: - - @overload + # NOTE(elpekenin): return type doesn't match parent, but that's + # not a problem + @overload # type: ignore[override] def __get__( self, obj: None, cls: Optional[Type[Service]] = None ) -> Characteristic: @@ -75,9 +77,8 @@ def __get__( return self get = super().__get__(obj) if get is None: - msg = "Unreachable?" - raise RuntimeError(msg) + raise RuntimeError return get[0] # pylint: disable=unsubscriptable-object - def __set__(self, obj: Service, value: float) -> None: + def __set__(self, obj: Service, value: float) -> None: # type: ignore[override] super().__set__(obj, (value,)) diff --git a/adafruit_ble/characteristics/int.py b/adafruit_ble/characteristics/int.py index ef33b21..f2109fb 100755 --- a/adafruit_ble/characteristics/int.py +++ b/adafruit_ble/characteristics/int.py @@ -14,6 +14,7 @@ from . import Attribute, StructCharacteristic +TYPE_CHECKING = False try: from typing import TYPE_CHECKING, Optional, Type, Union, overload @@ -65,8 +66,9 @@ def __init__( # pylint: disable=too-many-arguments ) if TYPE_CHECKING: - - @overload + # NOTE(elpekenin): return type doesn't match parent, but that's + # not a problem + @overload # type: ignore[override] def __get__( self, obj: None, cls: Optional[Type[Service]] = None ) -> Characteristic: @@ -83,11 +85,10 @@ def __get__( return self get = super().__get__(obj) if get is None: - msg = "Unreachable?" - raise RuntimeError(msg) + raise RuntimeError return get[0] # pylint: disable=unsubscriptable-object - def __set__(self, obj: Service, value: int) -> None: + def __set__(self, obj: Service, value: int) -> None: # type: ignore[override] if not self._min_value <= value <= self._max_value: raise ValueError("out of range") super().__set__(obj, (value,)) diff --git a/adafruit_ble/characteristics/string.py b/adafruit_ble/characteristics/string.py index aea6e30..bcd3a44 100755 --- a/adafruit_ble/characteristics/string.py +++ b/adafruit_ble/characteristics/string.py @@ -14,6 +14,7 @@ from . import Attribute, Characteristic +TYPE_CHECKING = False try: from typing import TYPE_CHECKING, Optional, Type, Union, overload diff --git a/adafruit_ble/services/standard/hid.py b/adafruit_ble/services/standard/hid.py index 1c6251d..c5f46c0 100755 --- a/adafruit_ble/services/standard/hid.py +++ b/adafruit_ble/services/standard/hid.py @@ -26,7 +26,9 @@ from .. import Service try: - from typing import TYPE_CHECKING, Dict, List, Literal, Optional, Union + from typing import TYPE_CHECKING, Dict, List, Optional, Union + + from typing_extensions import Literal if TYPE_CHECKING: from typing import TypedDict @@ -220,7 +222,7 @@ def __init__( # pylint: disable=too-many-arguments initial_value=struct.pack(" None: + def send_report(self, report: bytearray) -> None: """Send a report to the peers""" self._characteristic.value = report @@ -266,7 +268,7 @@ def __init__( # pylint: disable=too-many-arguments ) @property - def report(self) -> Dict: + def report(self) -> bytearray: """The HID OUT report""" return self._characteristic.value diff --git a/docs/conf.py b/docs/conf.py index 15fb40d..a49c58f 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -123,7 +123,7 @@ # -- Options for LaTeX output --------------------------------------------- -latex_elements = { +latex_elements: dict[str, str] = { # The paper size ('letterpaper' or 'a4paper'). # # 'papersize': 'letterpaper', diff --git a/examples/ble_color_proximity.py b/examples/ble_color_proximity.py index a72bb5d..124e466 100644 --- a/examples/ble_color_proximity.py +++ b/examples/ble_color_proximity.py @@ -73,10 +73,13 @@ else: closest = None closest_rssi = -80 - closest_last_time = 0 + closest_last_time: float = 0 print("Scanning for colors") while not slide_switch.value: for entry in ble.start_scan(AdafruitColor, minimum_rssi=-100, timeout=1): + if entry.rssi is None: + raise RuntimeError + if slide_switch.value: break now = time.monotonic() diff --git a/examples/ble_current_time_service.py b/examples/ble_current_time_service.py index 6747c0c..4f9ef48 100644 --- a/examples/ble_current_time_service.py +++ b/examples/ble_current_time_service.py @@ -15,6 +15,8 @@ radio = adafruit_ble.BLERadio() a = SolicitServicesAdvertisement() a.complete_name = "TimePlease" +if not a.solicited_services: + raise RuntimeError a.solicited_services.append(CurrentTimeService) radio.start_advertising(a) @@ -25,10 +27,17 @@ while radio.connected: for connection in radio.connections: + if connection is None: + raise RuntimeError + if not connection.paired: connection.pair() print("paired") + cts = connection[CurrentTimeService] + if cts is None: + raise RuntimeError + print(cts.current_time) time.sleep(1) diff --git a/examples/ble_demo_central.py b/examples/ble_demo_central.py index d2b6ff4..07bc574 100644 --- a/examples/ble_demo_central.py +++ b/examples/ble_demo_central.py @@ -41,6 +41,9 @@ def scale(value): # See if any existing connections are providing UARTService. if ble.connected: for connection in ble.connections: + if connection is None: + raise RuntimeError + if UARTService in connection: uart_connection = connection break @@ -63,7 +66,10 @@ def scale(value): neopixels.fill(color) color_packet = ColorPacket(color) try: - uart_connection[UARTService].write(color_packet.to_bytes()) + service = uart_connection[UARTService] + if service is None: + raise RuntimeError + service.write(color_packet.to_bytes()) except OSError: try: uart_connection.disconnect() diff --git a/examples/ble_device_info_service.py b/examples/ble_device_info_service.py index 32086c2..0c4ec02 100644 --- a/examples/ble_device_info_service.py +++ b/examples/ble_device_info_service.py @@ -29,10 +29,17 @@ while radio.connected: for connection in radio.connections: + if connection is None: + raise RuntimeError + if not connection.paired: connection.pair() print("paired") + dis = connection[DeviceInfoService] + if dis is None: + raise RuntimeError + print(dis.manufacturer) print(dis.model_number) time.sleep(60) diff --git a/examples/ble_hid_periph.py b/examples/ble_hid_periph.py index 6224a95..51dea4d 100644 --- a/examples/ble_hid_periph.py +++ b/examples/ble_hid_periph.py @@ -29,8 +29,9 @@ ble = adafruit_ble.BLERadio() if ble.connected: - for c in ble.connections: - c.disconnect() + for conn in ble.connections: + if conn is not None: + conn.disconnect() print("advertising") ble.start_advertising(advertisement, scan_response) @@ -42,9 +43,9 @@ pass print("Start typing:") while ble.connected: - c = sys.stdin.read(1) - sys.stdout.write(c) - kl.write(c) + char = sys.stdin.read(1) + sys.stdout.write(char) + kl.write(char) # print("sleeping") time.sleep(0.1) ble.start_advertising(advertisement) diff --git a/examples/ble_json_central.py b/examples/ble_json_central.py index 557b183..5b933e7 100644 --- a/examples/ble_json_central.py +++ b/examples/ble_json_central.py @@ -24,6 +24,9 @@ if connection and connection.connected: service = connection[SensorService] + if service is None: + raise RuntimeError + service.settings = {"unit": "celsius"} # 'fahrenheit' while connection.connected: print("Sensors: ", service.sensors) diff --git a/examples/ble_packet_buffer_client.py b/examples/ble_packet_buffer_client.py index 158ac61..a436ba9 100644 --- a/examples/ble_packet_buffer_client.py +++ b/examples/ble_packet_buffer_client.py @@ -17,13 +17,22 @@ buf = bytearray(512) while True: while ble.connected and any( - PacketBufferService in connection for connection in ble.connections + map( + lambda conn: conn is not None and PacketBufferService in conn, + ble.connections, + ) ): for connection in ble.connections: + if connection is None: + raise RuntimeError + if PacketBufferService not in connection: continue print("echo") + pb = connection[PacketBufferService] + if pb is None: + raise RuntimeError pb.write(b"echo") # Returns 0 if nothing was read. packet_len = pb.readinto(buf) diff --git a/examples/ble_simpletest.py b/examples/ble_simpletest.py index 03a44e2..5894c96 100644 --- a/examples/ble_simpletest.py +++ b/examples/ble_simpletest.py @@ -7,12 +7,13 @@ """ from adafruit_ble import BLERadio +from adafruit_ble.advertising import Advertisement ble = BLERadio() print("scanning") found = set() scan_responses = set() -for advertisement in ble.start_scan(): +for advertisement in ble.start_scan(Advertisement): addr = advertisement.address if advertisement.scan_response and addr not in scan_responses: scan_responses.add(addr) diff --git a/examples/ble_uart_echo_client.py b/examples/ble_uart_echo_client.py index 48a0f23..263369b 100644 --- a/examples/ble_uart_echo_client.py +++ b/examples/ble_uart_echo_client.py @@ -14,13 +14,19 @@ ble = BLERadio() while True: while ble.connected and any( - UARTService in connection for connection in ble.connections + map(lambda conn: conn is not None and UARTService in conn, ble.connections) ): for connection in ble.connections: + if connection is None: + raise RuntimeError + if UARTService not in connection: continue print("echo") + uart = connection[UARTService] + if uart is None: + raise RuntimeError uart.write(b"echo") # Returns b'' if nothing was read. one_byte = uart.read(4) From 8bcb474189774d6efcccd0866fd53d37acd93b9f Mon Sep 17 00:00:00 2001 From: elpekenin Date: Thu, 22 Aug 2024 18:02:57 +0200 Subject: [PATCH 4/4] mypy is happy --- adafruit_ble/__init__.py | 25 ++++++-------- adafruit_ble/advertising/__init__.py | 12 +++---- adafruit_ble/advertising/standard.py | 50 +++++++++++++++------------ adafruit_ble/characteristics/float.py | 3 +- adafruit_ble/characteristics/int.py | 3 +- examples/ble_color_proximity.py | 4 +-- examples/ble_current_time_service.py | 9 ++--- examples/ble_demo_central.py | 8 ++--- examples/ble_device_info_service.py | 6 ++-- examples/ble_json_central.py | 4 +-- examples/ble_packet_buffer_client.py | 8 ++--- examples/ble_uart_echo_client.py | 8 ++--- 12 files changed, 67 insertions(+), 73 deletions(-) diff --git a/adafruit_ble/__init__.py b/adafruit_ble/__init__.py index 05dd59b..c8725f4 100755 --- a/adafruit_ble/__init__.py +++ b/adafruit_ble/__init__.py @@ -39,7 +39,6 @@ Optional, Tuple, Type, - TypeVar, Union, ) @@ -52,9 +51,6 @@ Uuid = Union[StandardUUID, VendorUUID] - Adv = TypeVar("Adv", bound=Advertisement) - Ser = TypeVar("Ser", bound=Service) - except ImportError: pass @@ -107,7 +103,7 @@ def __contains__(self, key: Union[Uuid, Type[Service]]) -> bool: uuid = key if isinstance(key, UUID) else key.uuid return self._discover_remote(uuid) is not None - def __getitem__(self, key: Union[Uuid, Type[Ser]]) -> Optional[Ser]: + def __getitem__(self, key: Union[Uuid, Type[Service]]) -> Optional[Service]: """Return the Service for the given Service class or uuid, if any.""" if isinstance(key, UUID): uuid = key @@ -243,7 +239,7 @@ def stop_advertising(self) -> None: def start_scan( # pylint: disable=too-many-arguments self, - *advertisement_types: Type[Adv], + *advertisement_types: Type[Advertisement], buffer_size: int = 512, extended: bool = False, timeout: Optional[float] = None, @@ -251,7 +247,7 @@ def start_scan( # pylint: disable=too-many-arguments window: float = 0.1, minimum_rssi: int = -80, active: bool = True, - ) -> Iterator[Adv]: + ) -> Iterator[Advertisement]: """ Starts scanning. Returns an iterator of advertisement objects of the types given in advertisement_types. The iterator will block until an advertisement is heard or the scan @@ -280,10 +276,10 @@ def start_scan( # pylint: disable=too-many-arguments If none are given then `Advertisement` objects will be returned. :rtype: iterable """ + if not advertisement_types: + advertisement_types = (Advertisement,) - adv_types: Tuple[Type[Adv], ...] = advertisement_types or (Advertisement,) - - all_prefix_bytes = tuple(adv.get_prefix_bytes() for adv in adv_types) + all_prefix_bytes = tuple(adv.get_prefix_bytes() for adv in advertisement_types) # If one of the advertisement_types has no prefix restrictions, then # no prefixes should be specified at all, so we match everything. @@ -300,14 +296,14 @@ def start_scan( # pylint: disable=too-many-arguments active=active, ): adv_type = Advertisement - for possible_type in adv_types: + for possible_type in advertisement_types: if possible_type.matches(entry) and issubclass(possible_type, adv_type): adv_type = possible_type # Double check the adv_type is requested. We may return Advertisement accidentally # otherwise. - if adv_type not in adv_types: + if adv_type not in advertisement_types: continue - advertisement: Adv = adv_type(entry=entry) + advertisement = adv_type(entry=entry) if advertisement: yield advertisement @@ -332,8 +328,7 @@ def connect( if isinstance(peer, _bleio.Address): peer_ = peer else: - if peer.address is None: - raise RuntimeError + assert peer.address is not None peer_ = peer.address connection = self._adapter.connect(peer_, timeout=timeout) diff --git a/adafruit_ble/advertising/__init__.py b/adafruit_ble/advertising/__init__.py index c68f9bb..ba49376 100644 --- a/adafruit_ble/advertising/__init__.py +++ b/adafruit_ble/advertising/__init__.py @@ -147,8 +147,7 @@ def __init__( self.flags = 0 if self._adt in self._advertisement.data_dict: value = self._advertisement.data_dict[self._adt] - if isinstance(value, list): - raise RuntimeError + assert not isinstance(value, list) self.flags = value[0] def __len__(self) -> Literal[1]: @@ -183,8 +182,7 @@ def __get__( if self._adt not in obj.data_dict: return None value = obj.data_dict[self._adt] - if isinstance(value, list): - raise RuntimeError + assert not isinstance(value, list) return str(value, "utf-8") def __set__(self, obj: "Advertisement", value: str) -> None: @@ -206,8 +204,7 @@ def __get__( if self._adt not in obj.data_dict: return None value = obj.data_dict[self._adt] - if isinstance(value, list): - raise RuntimeError + assert not isinstance(value, list) return struct.unpack(self._format, value)[0] def __set__(self, obj: "Advertisement", value: Any) -> None: @@ -255,6 +252,9 @@ class Advertisement: bytestring prefixes to match against the multiple data structures in the advertisement. """ + address: Optional[Address] + _rssi: Optional[int] + match_prefixes: Optional[Tuple[bytes, ...]] = () """For Advertisement, :py:attr:`~adafruit_ble.advertising.Advertisement.match_prefixes` will always return ``True``. Subclasses may override this value.""" diff --git a/adafruit_ble/advertising/standard.py b/adafruit_ble/advertising/standard.py index 168a64e..8c077fb 100644 --- a/adafruit_ble/advertising/standard.py +++ b/adafruit_ble/advertising/standard.py @@ -37,10 +37,9 @@ Iterator, List, Optional, - Protocol, Tuple, Type, - TypeVar, + TypeGuard, Union, overload, ) @@ -61,18 +60,11 @@ Uuid = Union[StandardUUID, VendorUUID] - class WithManufacturerData: - """Stub type, anything with a manufacturer_data attribute.""" + class WithManufacturerData(Advertisement): + """Stub type, any subclass of Advertisement which has manufacturer data.""" manufacturer_data: ManufacturerData - AdvertisementWithManufacturerData = TypeVar( - "AdvertisementWithManufacturerData", - Advertisement, - WithManufacturerData, - ) - - except ImportError: pass @@ -80,6 +72,13 @@ class WithManufacturerData: __repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_BLE.git" +def has_manufacturer_data(obj: Advertisement) -> TypeGuard[WithManufacturerData]: + """Tiny function for type correctness.""" + return hasattr(obj, "manufacturer_data") and isinstance( + obj.manufacturer_data, ManufacturerData + ) + + class BoundServiceList: """Sequence-like object of Service UUID objects. It stores both standard and vendor UUIDs.""" @@ -251,8 +250,7 @@ def __init__(self, *services: Service, entry: Optional[ScanEntry] = None) -> Non # Attributes are supplied by entry. return if services: - if not self.services: - raise RuntimeError + assert self.services self.services.extend(services) self.connectable = True self.flags.general_discovery = True @@ -284,8 +282,7 @@ def __init__(self, *services: Service, entry: Optional[ScanEntry] = None) -> Non raise ValueError("Supply services or entry, not both") # Attributes are supplied by entry. return - if not self.solicited_services: - raise RuntimeError + assert self.solicited_services self.solicited_services.extend(services) self.connectable = True self.flags.general_discovery = True @@ -368,8 +365,12 @@ def __init__( self._entry_length = struct.calcsize(value_format) self.field_names = field_names if field_names: + assert self.field_names is not None # Mostly, this is to raise a ValueError if field_names has invalid entries - self.mdf_tuple = namedtuple("mdf_tuple", self.field_names) + self.mdf_tuple = namedtuple( # type: ignore[misc, arg-type] + "mdf_tuple", + self.field_names, + ) if TYPE_CHECKING: @@ -377,29 +378,31 @@ def __init__( def __get__( self, obj: None, - cls: Optional[Type[AdvertisementWithManufacturerData]] = None, + cls: Optional[Type[Advertisement]] = None, ) -> ManufacturerDataField: ... @overload def __get__( self, - obj: AdvertisementWithManufacturerData, - cls: Optional[Type[AdvertisementWithManufacturerData]] = None, + obj: Advertisement, + cls: Optional[Type[Advertisement]] = None, ) -> Optional[Tuple]: ... def __get__( self, - obj: Optional[AdvertisementWithManufacturerData], - cls: Optional[Type[AdvertisementWithManufacturerData]] = None, + obj: Optional[Advertisement], + cls: Optional[Type[Advertisement]] = None, ) -> Union[ManufacturerDataField, Optional[Tuple]]: if obj is None: return self + assert has_manufacturer_data(obj) if self._key not in obj.manufacturer_data.data: return None packed = obj.manufacturer_data.data[self._key] if self._entry_length == len(packed): + assert isinstance(packed, bytes) unpacked = struct.unpack_from(self._format, packed) if self.element_count == 1: unpacked = unpacked[0] @@ -414,6 +417,7 @@ def __get__( unpacked_: List[Optional[Tuple[Any, ...]]] = [None] * entry_count for i in range(entry_count): offset = i * self._entry_length + assert isinstance(packed, bytes) unpacked_[i] = struct.unpack_from(self._format, packed, offset=offset) if self.element_count == 1: val = unpacked_[i] @@ -421,7 +425,9 @@ def __get__( unpacked_[i] = val[0] return tuple(unpacked) - def __set__(self, obj: AdvertisementWithManufacturerData, value: Any) -> None: + def __set__(self, obj: Advertisement, value: Any) -> None: + assert has_manufacturer_data(obj) + if not obj.mutable: raise AttributeError() if isinstance(value, tuple) and ( diff --git a/adafruit_ble/characteristics/float.py b/adafruit_ble/characteristics/float.py index 838f448..74dcc02 100644 --- a/adafruit_ble/characteristics/float.py +++ b/adafruit_ble/characteristics/float.py @@ -76,8 +76,7 @@ def __get__( if obj is None: return self get = super().__get__(obj) - if get is None: - raise RuntimeError + assert get is not None return get[0] # pylint: disable=unsubscriptable-object def __set__(self, obj: Service, value: float) -> None: # type: ignore[override] diff --git a/adafruit_ble/characteristics/int.py b/adafruit_ble/characteristics/int.py index f2109fb..5a71ada 100755 --- a/adafruit_ble/characteristics/int.py +++ b/adafruit_ble/characteristics/int.py @@ -84,8 +84,7 @@ def __get__( if obj is None: return self get = super().__get__(obj) - if get is None: - raise RuntimeError + assert get is not None return get[0] # pylint: disable=unsubscriptable-object def __set__(self, obj: Service, value: int) -> None: # type: ignore[override] diff --git a/examples/ble_color_proximity.py b/examples/ble_color_proximity.py index 124e466..463aeb7 100644 --- a/examples/ble_color_proximity.py +++ b/examples/ble_color_proximity.py @@ -77,8 +77,8 @@ print("Scanning for colors") while not slide_switch.value: for entry in ble.start_scan(AdafruitColor, minimum_rssi=-100, timeout=1): - if entry.rssi is None: - raise RuntimeError + assert isinstance(entry, AdafruitColor) + assert entry.rssi is not None if slide_switch.value: break diff --git a/examples/ble_current_time_service.py b/examples/ble_current_time_service.py index 4f9ef48..58213b3 100644 --- a/examples/ble_current_time_service.py +++ b/examples/ble_current_time_service.py @@ -15,8 +15,7 @@ radio = adafruit_ble.BLERadio() a = SolicitServicesAdvertisement() a.complete_name = "TimePlease" -if not a.solicited_services: - raise RuntimeError +assert a.solicited_services a.solicited_services.append(CurrentTimeService) radio.start_advertising(a) @@ -27,16 +26,14 @@ while radio.connected: for connection in radio.connections: - if connection is None: - raise RuntimeError + assert connection is not None if not connection.paired: connection.pair() print("paired") cts = connection[CurrentTimeService] - if cts is None: - raise RuntimeError + assert isinstance(cts, CurrentTimeService) print(cts.current_time) time.sleep(1) diff --git a/examples/ble_demo_central.py b/examples/ble_demo_central.py index 07bc574..571f5e5 100644 --- a/examples/ble_demo_central.py +++ b/examples/ble_demo_central.py @@ -41,8 +41,7 @@ def scale(value): # See if any existing connections are providing UARTService. if ble.connected: for connection in ble.connections: - if connection is None: - raise RuntimeError + assert connection is not None if UARTService in connection: uart_connection = connection @@ -52,6 +51,7 @@ def scale(value): if not uart_connection: print("Scanning...") for adv in ble.start_scan(ProvideServicesAdvertisement, timeout=5): + assert isinstance(adv, ProvideServicesAdvertisement) if UARTService in adv.services: print("found a UARTService advertisement") uart_connection = ble.connect(adv) @@ -67,8 +67,8 @@ def scale(value): color_packet = ColorPacket(color) try: service = uart_connection[UARTService] - if service is None: - raise RuntimeError + assert isinstance(service, UARTService) + service.write(color_packet.to_bytes()) except OSError: try: diff --git a/examples/ble_device_info_service.py b/examples/ble_device_info_service.py index 0c4ec02..466581e 100644 --- a/examples/ble_device_info_service.py +++ b/examples/ble_device_info_service.py @@ -29,16 +29,14 @@ while radio.connected: for connection in radio.connections: - if connection is None: - raise RuntimeError + assert connection is not None if not connection.paired: connection.pair() print("paired") dis = connection[DeviceInfoService] - if dis is None: - raise RuntimeError + assert isinstance(dis, DeviceInfoService) print(dis.manufacturer) print(dis.model_number) diff --git a/examples/ble_json_central.py b/examples/ble_json_central.py index 5b933e7..16ac489 100644 --- a/examples/ble_json_central.py +++ b/examples/ble_json_central.py @@ -16,6 +16,7 @@ if not connection: print("Scanning for BLE device advertising our sensor service...") for adv in ble.start_scan(ProvideServicesAdvertisement): + assert isinstance(adv, ProvideServicesAdvertisement) if SensorService in adv.services: connection = ble.connect(adv) print("Connected") @@ -24,8 +25,7 @@ if connection and connection.connected: service = connection[SensorService] - if service is None: - raise RuntimeError + assert isinstance(service, SensorService) service.settings = {"unit": "celsius"} # 'fahrenheit' while connection.connected: diff --git a/examples/ble_packet_buffer_client.py b/examples/ble_packet_buffer_client.py index a436ba9..fc21398 100644 --- a/examples/ble_packet_buffer_client.py +++ b/examples/ble_packet_buffer_client.py @@ -23,16 +23,15 @@ ) ): for connection in ble.connections: - if connection is None: - raise RuntimeError + assert connection is not None if PacketBufferService not in connection: continue print("echo") pb = connection[PacketBufferService] - if pb is None: - raise RuntimeError + assert isinstance(pb, PacketBufferService) + pb.write(b"echo") # Returns 0 if nothing was read. packet_len = pb.readinto(buf) @@ -43,6 +42,7 @@ print("disconnected, scanning") for advertisement in ble.start_scan(ProvideServicesAdvertisement, timeout=1): + assert isinstance(advertisement, ProvideServicesAdvertisement) if PacketBufferService not in advertisement.services: continue ble.connect(advertisement) diff --git a/examples/ble_uart_echo_client.py b/examples/ble_uart_echo_client.py index 263369b..b5f7757 100644 --- a/examples/ble_uart_echo_client.py +++ b/examples/ble_uart_echo_client.py @@ -17,16 +17,15 @@ map(lambda conn: conn is not None and UARTService in conn, ble.connections) ): for connection in ble.connections: - if connection is None: - raise RuntimeError + assert connection is not None if UARTService not in connection: continue print("echo") uart = connection[UARTService] - if uart is None: - raise RuntimeError + assert isinstance(uart, UARTService) + uart.write(b"echo") # Returns b'' if nothing was read. one_byte = uart.read(4) @@ -37,6 +36,7 @@ print("disconnected, scanning") for advertisement in ble.start_scan(ProvideServicesAdvertisement, timeout=1): + assert isinstance(advertisement, ProvideServicesAdvertisement) if UARTService not in advertisement.services: continue ble.connect(advertisement)