diff --git a/adafruit_ble/advertising.py b/adafruit_ble/advertising.py index e507061..0e2d831 100644 --- a/adafruit_ble/advertising.py +++ b/adafruit_ble/advertising.py @@ -51,6 +51,10 @@ class AdvertisingPacket: """Incomplete list of 128 bit service UUIDs.""" ALL_128_BIT_SERVICE_UUIDS = 0x07 """Complete list of 128 bit service UUIDs.""" + SOLICITED_16_BIT_SERVICE_UUIDS = 0x14 + """List of 16 bit service UUIDs solicited by a peripheral.""" + SOLICITED_128_BIT_SERVICE_UUIDS = 0x15 + """List of 128 bit service UUIDs solicited by a peripheral.""" SHORT_LOCAL_NAME = 0x08 """Short local device name (shortened to fit).""" COMPLETE_LOCAL_NAME = 0x09 @@ -131,84 +135,76 @@ def get(self, element_type, default=None): except KeyError: return default + @property + def length(self): + """Current number of bytes in packet.""" + return len(self._packet_bytes) + @property def bytes_remaining(self): """Number of bytes still available for use in the packet.""" - return self._max_length - len(self._packet_bytes) + return self._max_length - self.length def _check_length(self): - if len(self._packet_bytes) > self._max_length: + if self.length > self._max_length: raise IndexError("Advertising data too long") + def add_flags(self, flags=(FLAG_GENERAL_DISCOVERY | FLAG_LE_ONLY)): + """Add advertising flags.""" + self.add_field(self.FLAGS, struct.pack("= len(name_bytes): - packet.add_field(AdvertisingPacket.COMPLETE_LOCAL_NAME, name_bytes) + self._packet.add_field(AdvertisingPacket.COMPLETE_LOCAL_NAME, name_bytes) else: - packet.add_field(AdvertisingPacket.SHORT_LOCAL_NAME, name_bytes[:bytes_available]) + self._packet.add_field(AdvertisingPacket.SHORT_LOCAL_NAME, name_bytes[:bytes_available]) self._scan_response_packet = AdvertisingPacket() try: self._scan_response_packet.add_field(AdvertisingPacket.COMPLETE_LOCAL_NAME, @@ -216,12 +212,23 @@ def __init__(self, peripheral, *, tx_power=0): except IndexError: raise IndexError("Name too long") - self._advertising_data_packet = packet + def add_uuids(self, uuids, field_type_16_bit_uuids, field_type_128_bit_uuids): + """Add 16-bit and 128-bit uuids to the packet, using the given field types.""" + concatenated_16_bit_uuids = b''.join( + struct.pack(" 1: + raise ValueError("Only one 128 bit UUID will fit") + if uuids_128_bits: + self._packet.add_field(field_type_128_bit_uuids, uuids_128_bits[0].uuid128) @property def advertising_data_bytes(self): """The raw bytes for the initial advertising data packet.""" - return self._advertising_data_packet.packet_bytes + return self._packet.packet_bytes @property def scan_response_bytes(self): @@ -229,3 +236,41 @@ def scan_response_bytes(self): if self._scan_response_packet: return self._scan_response_packet.packet_bytes return None + + +class ServerAdvertisement(Advertisement): + """Build an advertisement for a peripheral's services. + + There is room in the packet for only one 128-bit UUID. Giving UUIDs in the scan response + is not yet implemented. + + :param Peripheral peripheral: the Peripheral to advertise. Use its services and name. + :param int tx_power: transmit power in dBm at 0 meters (8 bit signed value). Default 0 dBm + """ + + def __init__(self, peripheral, *, tx_power=0): + super().__init__() + uuids = [service.uuid for service in peripheral.services if not service.secondary] + self.add_uuids(uuids, + AdvertisingPacket.ALL_16_BIT_SERVICE_UUIDS, + AdvertisingPacket.ALL_128_BIT_SERVICE_UUIDS) + self.add_name(peripheral.name) + + +class SolicitationAdvertisement(Advertisement): + """Build an advertisement for a peripheral to solicit one or more services from a central. + + There is room in the packet for only one 128-bit UUID. Giving UUIDs in the scan response + is not yet implemented. + + :param string name: Name to use in advertisement. + :param iterable service_uuids: One or more services requested from a central + :param int tx_power: transmit power in dBm at 0 meters (8 bit signed value). Default 0 dBm. + """ + + def __init__(self, name, service_uuids, *, tx_power=0): + super().__init__() + self.add_uuids(service_uuids, + AdvertisingPacket.SOLICITED_16_BIT_SERVICE_UUIDS, + AdvertisingPacket.SOLICITED_128_BIT_SERVICE_UUIDS) + self.add_name(name) diff --git a/adafruit_ble/current_time_client.py b/adafruit_ble/current_time_client.py new file mode 100644 index 0000000..8ea5ffe --- /dev/null +++ b/adafruit_ble/current_time_client.py @@ -0,0 +1,148 @@ + # The MIT License (MIT) +# +# Copyright (c) 2019 Dan Halbert for Adafruit Industries +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. +""" +`adafruit_ble.current_time_client` +==================================================== + +Connect to a Current Time Service, as a peripheral. + +* Author(s): Dan Halbert for Adafruit Industries + +""" +import struct +import time + +from bleio import Peripheral, UUID +from .advertising import SolicitationAdvertisement + +class CurrentTimeClient: + """ + Set up a peripheral that solicits centrals for Current Time Service. + + :param str name: Name to advertise for server. If None, use default Advertisement name. + + Example:: + + from adafruit_ble.current_time_client import CurrentTimeClient + import time + + cts_client = CurrentTimeClient() + cts_client.start_advertising() + while not cts_client.connected: + pass + # The first time a property is read, the client + # will do discovery and pairing. + while True: + print(cts_client.current_time) + time.sleep(5) + + To try the example above, open Settings->Bluetooth on your iOS device. + After the program starts advertising, ``CIRCUITPYxxxx` will show up as a Bluetooth + device for possible connection. Tap it, and then accept the pairing request. + Then the time should print. + """ + + CTS_UUID = UUID(0x1805) + CURRENT_TIME_UUID = UUID(0x2A2B) + LOCAL_TIME_INFORMATION_UUID = UUID(0x2A0F) + + def __init__(self, name=None, tx_power=0): + self._periph = Peripheral(name=name) + self._advertisement = SolicitationAdvertisement(self._periph.name, + (self.CTS_UUID,), tx_power=tx_power) + self._current_time_char = self._local_time_char = None + + + def start_advertising(self): + """Start advertising to solicit a central that supports Current Time Service.""" + self._periph.start_advertising(self._advertisement.advertising_data_bytes, + scan_response=self._advertisement.scan_response_bytes) + + def stop_advertising(self): + """Stop advertising the service.""" + self._periph.stop_advertising() + + @property + def connected(self): + """True if a central connected to this peripheral.""" + return self._periph.connected + + def disconnect(self): + """Disconnect from central.""" + self._periph.disconnect() + + def _check_connected(self): + if not self.connected: + raise OSError("Not connected") + # Do discovery and pairing if not already done. + if not self._current_time_char: + self._discover() + self._periph.pair() + + def _discover(self): + """Discover service information.""" + services = self._periph.discover_remote_services((self.CTS_UUID,)) + if not services: + raise OSError("Unable to discover service") + for characteristic in services[0].characteristics: + if characteristic.uuid == self.CURRENT_TIME_UUID: + self._current_time_char = characteristic + elif characteristic.uuid == self.LOCAL_TIME_INFORMATION_UUID: + self._local_time_char = characteristic + if not self._current_time_char or not self._local_time_char: + raise OSError("Remote service missing needed characteristic") + + @property + def current_time(self): + """Get a tuple describing the current time from the server: + (year, month, day, hour, minute, second, weekday, subsecond, adjust_reason) + """ + self._check_connected() + if self._current_time_char: + # year, month, day, hour, minute, second, weekday, subsecond, adjust_reason + values = struct.unpack('