8000 examples/bluetooth: Add basic BLE peripheral examples. by jimmo · Pull Request #5223 · micropython/micropython · GitHub
[go: up one dir, main page]

Skip to content

examples/bluetooth: Add basic BLE peripheral examples. #5223

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions examples/bluetooth/ble_advertising.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# Helpers for generating BLE advertising payloads.

from micropython import const
import struct

# Advertising payloads are repeated:
# 1 byte type (see constants below)
# 1 byte data length (N + 1)
# N bytes type-specific data

_ADV_TYPE_FLAGS = const(0x01)
_ADV_TYPE_NAME = const(0x09)
_ADV_TYPE_UUID16_COMPLETE = const(0x3)
_ADV_TYPE_APPEARANCE = const(0x19)

# Generate a payload to be passed to gap_advertise(adv_data=...).
def advertising_payload(limited_disc=False, br_edr=False, name=None, services=None, appearance=0):
payload = bytearray()

def _append(adv_type, value):
nonlocal payload
payload += struct.pack('BB', len(value) + 1, adv_type) + value

_append(_ADV_TYPE_FLAGS, struct.pack('B', (0x01 if limited_disc else 0x02) + (0x00 if br_edr else 0x04)))

if name:
_append(_ADV_TYPE_NAME, name)

if services:
for uuid in services:
# TODO: Support bluetooth.UUID class.
_append(_ADV_TYPE_UUID16_COMPLETE, struct.pack('<h', uuid))

# See org.bluetooth.characteristic.gap.appearance.xml
_append(_ADV_TYPE_APPEARANCE, struct.pack('<h', appearance))

return payload
76 changes: 76 additions & 0 deletions examples/bluetooth/ble_temperature.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
# This example demonstrates a simple temperature sensor peripheral.
#
# The sensor's local value updates every second, and it will notify
# any connected central every 10 seconds.

import bluetooth
import random
import struct
import time
from ble_advertising import advertising_payload

from micropython import const
_IRQ_CENTRAL_CONNECT = const(1 << 0)
_IRQ_CENTRAL_DISCONNECT = const(1 << 1)

# org.bluetooth.service.environmental_sensing
_ENV_SENSE_UUID = bluetooth.UUID(0x181A)
# org.bluetooth.characteristic.temperature
_TEMP_CHAR = (bluetooth.UUID(0x2A6E), bluetooth.FLAG_READ|bluetooth.FLAG_NOTIFY,)
_ENV_SENSE_SERVICE = (_ENV_SENSE_UUID, (_TEMP_CHAR,),)

# org.bluetooth.characteristic.gap.appearance.xml
_ADV_APPEARANCE_GENERIC_THERMOMETER = const(768)

class BLETemperature:
def __init__(self, ble, name='mpy-temp'):
self._ble = ble
self._ble.active(True)
self._ble.irq(handler=self._irq)
((self._handle,),) = self._ble.gatts_register_services((_ENV_SENSE_SERVICE,))
self._connections = set()
self._payload = advertising_payload(name=name, services=[0x181A], appearance=_ADV_APPEARANCE_GENERIC_THERMOMETER)
self._advertise()

def _irq(self, event, data):
# Track connections so we can send notifications.
if event == _IRQ_CENTRAL_CONNECT:
conn_handle, _, _, = data
self._connections.add(conn_handle)
elif event == _IRQ_CENTRAL_DISCONNECT:
conn_handle, _, _, = data
self._connections.remove(conn_handle)
# Start advertising again to allow a new connection.
self._advertise()

def set_temperature(self, temp_deg_c, notify=False):
# Data is sint16 in degrees Celsius with a resolution of 0.01 degrees Celsius.
# Write the local value, ready for a central to read.
self._ble.gatts_write(self._handle, struct.pack('<h', int(temp_deg_c * 100)))
if notify:
for conn_handle in self._connections:
# Notify connected centrals to issue a read.
self._ble.gatts_notify(conn_handle, self._handle)

def _advertise(self, interval_us=500000):
self._ble.gap_advertise(interval_us, adv_data=self._payload)


def demo():
ble = bluetooth.BLE()
temp = BLETemperature(ble)

t = 25
i = 0

while True:
# Write every second, notify every 10 seconds.
i = (i + 1) % 10
temp.set_temperature(t, notify=i == 0)
# Random walk the temperature.
t += random.uniform(-0.5, 0.5)
time.sleep_ms(1000)


if __name__ == '__main__':
demo()
78 changes: 78 additions & 0 deletions examples/bluetooth/ble_uart_peripheral.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
# This example demonstrates a peripheral implementing the Nordic UART Service (NUS).

import bluetooth
from ble_advertising import advertising_payload

from micropython import const
_IRQ_CENTRAL_CONNECT = const(1 << 0)
_IRQ_CENTRAL_DISCONNECT = const(1 << 1)
_IRQ_GATTS_WRITE = const(1 << 2)

_UART_UUID = bluetooth.UUID('6E400001-B5A3-F393-E0A9-E50E24DCCA9E')
_UART_TX = (bluetooth.UUID('6E400003-B5A3-F393-E0A9-E50E24DCCA9E'), bluetooth.FLAG_NOTIFY,)
_UART_RX = (bluetooth.UUID('6E400002-B5A3-F393-E0A9-E50E24DCCA9E'), bluetooth.FLAG_WRITE,)
_UART_SERVICE = (_UART_UUID, (_UART_TX, _UART_RX,),)

# org.bluetooth.characteristic.gap.appearance.xml
_ADV_APPEARANCE_GENERIC_COMPUTER = const(128)

class BLEUART:
def __init__(self, ble, name='mpy-uart', rxbuf=100):
self._ble = ble
self._ble.active(True)
self._ble.irq(handler=self._irq)
((self._tx_handle, self._rx_handle,),) = self._ble.gatts_register_services((_UART_SERVICE,))
# Increase the size of the rx buffer.
self._ble.gatts_write(self._rx_handle, bytes(rxbuf))
self._connections = set()
self._rx_buffer = bytearray()
self._handler = None
self._payload = advertising_payload(name=name, appearance=_ADV_APPEARANCE_GENERIC_COMPUTER)
self._advertise()

def irq(self, handler):
self._handler = handler

def _irq(self, event, data):
# Track connections so we can send notifications.
if event == _IRQ_CENTRAL_CONNECT:
conn_handle, _, _, = data
self._connections.add(conn_handle)
elif event == _IRQ_CENTRAL_DISCONNECT:
conn_handle, _, _, = data
if conn_handle in self._connections:
self._connections.remove(conn_handle)
# Start advertising again to allow a new connection.
self._advertise()
elif event == _IRQ_GATTS_WRITE:
conn_handle, value_handle, = data
if conn_handle in self._connections and value_handle == self._rx_handle:
self._rx_buffer += self._ble.gatts_read(self._rx_handle)
if self._handler:
self._handler()

def any(self):
return len(self._rx_buffer)

def read(self, sz=None):
if not sz:
sz = len(self._rx_buffer)
result = self._rx_buffer[0:sz]
self._rx_buffer = self._rx_buffer[sz:]
return result

def write(self, data):
for conn_handle in self._connections:
self._ble.gatts_notify(conn_handle, self._tx_handle, data)

def close(self):
for conn_handle in self._connections:
self._ble.gap_disconnect(conn_handle)
self._connections.clear()

def _advertise(self, interval_us=500000):
self._ble.gap_advertise(interval_us, adv_data=self._payload)


# ble = bluetooth.BLE()
# uart = BLEUART(ble)
80 changes: 80 additions & 0 deletions examples/bluetooth/ble_uart_repl.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
# Proof-of-concept of a REPL over BLE UART.
#
# Tested with the Adafruit Bluefruit app on Android.
# Set the EoL characters to \r\n.

import bluetooth
import io
import os
import micropython
import machine

from ble_uart_peripheral import BLEUART

_MP_STREAM_POLL = const(3)
_MP_STREAM_POLL_R 8000 D = const(0x0001)

# TODO: Remove this when STM32 gets machine.Timer.
if hasattr(machine, 'Timer'):
_timer = machine.Timer(-1)
else:
_timer = None

# Batch writes into 50ms intervals.
def schedule_in(handler, delay_ms):
def _wrap(_arg):
handler()
if _timer:
_timer.init(mode=machine.Timer.ONE_SHOT, period=delay_ms, callback=_wrap)
else:
micropython.schedule(_wrap, None)

# Simple buffering stream to support the dupterm requirements.
class BLEUARTStream(io.IOBase):
def __init__(self, uart):
self._uart = uart
self._tx_buf = bytearray()
self._uart.irq(self._on_rx)

def _on_rx(self):
# Needed for ESP32.
if hasattr(os, 'dupterm_notify'):
os.dupterm_notify(None)

def read(self, sz=None):
return self._uart.read(sz)

def readinto(self, buf):
avail = self._uart.read(len(buf))
if not avail:
return None
for i in range(len(avail)):
buf[i] = avail[i]
return len(avail)

def ioctl(self, op, arg):
if op == _MP_STREAM_POLL:
if self._uart.any():
return _MP_STREAM_POLL_RD
return 0

def _flush(self):
data = self._tx_buf[0:100]
self._tx_buf = self._tx_buf[100:]
self._uart.write(data)
if self._tx_buf:
schedule_in(self._flush, 50)

def write(self, buf):
empty = not self._tx_buf
self._tx_buf += buf
if empty:
schedule_in(self._flush, 50)


def start():
ble = bluetooth.BLE()
uart = BLEUART(ble, name='mpy-repl')
stream = BLEUARTStream(uart)

os.dupterm(stream)
0