8000 USB integration refactor for easier testing by puddly · Pull Request #141521 · home-assistant/core · GitHub
[go: up one dir, main page]

Skip to content

USB integration refactor for easier testing #141521

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 21 additions & 41 deletions homeassistant/components/usb/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,9 @@
from functools import partial
import logging
import os
import sys
from typing import Any, overload

from aiousbwatcher import AIOUSBWatcher, InotifyNotAvailableError
from serial.tools.list_ports import comports
from serial.tools.list_ports_common import ListPortInfo
import voluptuous as vol

from homeassistant import config_entries
Expand Down Expand Up @@ -43,7 +40,10 @@

from .const import DOMAIN
from .models import USBDevice
from .utils import usb_device_from_port
from .utils import (
scan_serial_ports,
usb_device_from_port, # noqa: F401
)

_LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -241,6 +241,13 @@ def _is_matching(device: USBDevice, matcher: USBMatcher | USBCallbackMatcher) ->
return True


async def async_request_scan(hass: HomeAssistant, *, force: bool = False) -> None:
"""Request a serial scan."""
usb_discovery: USBDiscovery = hass.data[DOMAIN]
if not usb_discovery.observer_active or force:
await usb_discovery.async_request_scan()


class USBDiscovery:
"""Manage USB Discovery."""

Expand Down Expand Up @@ -417,38 +424,14 @@ async def _async_process_discovered_usb_device(self, device: USBDevice) -> None:
service_info,
)

async def _async_process_ports(self, ports: Sequence[ListPortInfo]) -> None:
async def _async_process_ports(self, usb_devices: Sequence[USBDevice]) -> None:
"""Process each discovered port."""
_LOGGER.debug("Processing ports: %r", ports)
usb_devices = {
usb_device_from_port(port)
for port in ports
if port.vid is not None or port.pid is not None
}
_LOGGER.debug("USB devices: %r", usb_devices)

# CP2102N chips create *two* serial ports on macOS: `/dev/cu.usbserial-` and
# `/dev/cu.SLAB_USBtoUART*`. The former does not work and we should ignore them.
if sys.platform == "darwin":
silabs_serials = {
dev.serial_number
for dev in usb_devices
if dev.device.startswith("/dev/cu.SLAB_USBtoUART")
}

usb_devices = {
dev
for dev in usb_devices
if dev.serial_number not in silabs_serials
or (
dev.serial_number in silabs_serials
and dev.device.startswith("/dev/cu.SLAB_USBtoUART")
)
}

added_devices = usb_devices - self._last_processed_devices
removed_devices = self._last_processed_devices - usb_devices
self._last_processed_devices = usb_devices
devices = set(usb_devices)
added_devices = devices - self._last_processed_devices
removed_devices = self._last_processed_devices - devices
self._last_processed_devices = devices

_LOGGER.debug(
"Added devices: %r, removed devices: %r", added_devices, removed_devices
Expand All @@ -461,7 +444,7 @@ async def _async_process_ports(self, ports: Sequence[ListPortInfo]) -> None:
except Exception:
_LOGGER.exception("Error in USB port event callback")

for usb_device in usb_devices:
for usb_device in devices:
await self._async_process_discovered_usb_device(usb_device)

@hass_callback
Expand All @@ -480,11 +463,10 @@ def _async_delayed_add_remove_scan(self) -> None:

async def _async_scan_serial(self) -> None:
"""Scan serial ports."""
_LOGGER.debug("Executing comports scan")
_LOGGER.debug("Executing serial port scan")
async with self._scan_lock:
await self._async_process_ports(
await self.hass.async_add_executor_job(comports)
)
ports = await self.hass.async_add_executor_job(scan_serial_ports)
await self._async_process_ports(ports)
if self.initial_scan_done:
return

Expand Down Expand Up @@ -521,9 +503,7 @@ async def websocket_usb_scan(
msg: dict[str, Any],
) -> None:
"""Scan for new usb devices."""
usb_discovery: USBDiscovery = hass.data[DOMAIN]
if not usb_discovery.observer_active:
await usb_discovery.async_request_scan()
await async_request_scan(hass)
connection.send_result(msg["id"])


Expand Down
60 changes: 60 additions & 0 deletions homeassistant/components/usb/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@

from __future__ import annotations

from collections.abc import Sequence
import dataclasses
import os.path
import sys

from serial.tools.list_ports import comports
from serial.tools.list_ports_common import ListPortInfo

from .models import USBDevice
Expand All @@ -17,3 +23,57 @@
manufacturer=port.manufacturer,
description=port.description,
)


def get_serial_by_id_mapping() -> dict[str, str]:
"""Return a mapping of /dev/serial/by-id to /dev/tty."""
by_id = "/dev/serial/by-id"
if not os.path.isdir(by_id):
return {}

mapping = {}
for entry in os.scandir(by_id):
if not entry.is_symlink():
continue
mapping[entry.path] = os.path.realpath(entry.path)
return mapping

Check warning on line 39 in homeassistant/components/usb/utils.py

View check run for this annotation

Codecov / codecov/patch

homeassistant/components/usb/utils.py#L34-L39

Added lines #L34 - L39 were not covered by tests


def scan_serial_ports() -> Sequence[USBDevice]:
"""Scan for serial ports."""
ports = comports()
serial_by_id_mapping = get_serial_by_id_mapping()

usb_devices = [
usb_device_from_port(port)
for port in ports
if port.vid is not None or port.pid is not None
]

# Update the USB device path to point to the unique serial port, if one exists
for index, device in enumerate(usb_devices):
if device.device in serial_by_id_mapping:
usb_devices[index] = dataclasses.replace(

Check warning on line 56 in homeassistant/components/usb/utils.py

View check run for this annotation

Codecov / codecov/patch

homeassistant/components/usb/utils.py#L56

Added line #L56 was not covered by tests
device, device=serial_by_id_mapping[device.device]
)

# CP2102N chips create *two* serial ports on macOS: `/dev/cu.usbserial-` and
# `/dev/cu.SLAB_USBtoUART*`. The former does not work and we should ignore them.
if sys.platform == "darwin":
silabs_serials = {
dev.serial_number
for dev in usb_devices
if dev.device.startswith("/dev/cu.SLAB_USBtoUART")
}

usb_devices = [
dev
for dev in usb_devices
if dev.serial_number not in silabs_serials
or (
dev.serial_number in silabs_serials
and dev.device.startswith("/dev/cu.SLAB_USBtoUART")
)
]

return usb_devices
Loading
0