8000 examples/bluetooth: Add basic BLE peripheral examples. · micropython/micropython@25a228a · GitHub
[go: up one dir, main page]

Skip to content

Commit 25a228a

Browse files
jimmodpgeorge
authored andcommitted
examples/bluetooth: Add basic BLE peripheral examples.
Consisting of: - ble_advertising.py -- helper to generate advertising payload. - ble_temperature.py -- simple temperature device. - ble_uart_periperhal.py -- BLE UART wrapper. - ble_uart_repl.py -- dupterm-compatible uart.
1 parent ebf8332 commit 25a228a

File tree

4 files changed

+271
-0
lines changed

4 files changed

+271
-0
lines changed

examples/bluetooth/ble_advertising.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
# Helpers for generating BLE advertising payloads.
2+
3+
from micropython import const
4+
import struct
5+
6+
# Advertising payloads are repeated packets of the following form:
7+
# 1 byte data length (N + 1)
8+
# 1 byte type (see constants below)
9+
# N bytes type-specific data
10+
11+
_ADV_TYPE_FLAGS = const(0x01)
12+
_ADV_TYPE_NAME = const(0x09)
13+
_ADV_TYPE_UUID16_COMPLETE = const(0x3)
14+
_ADV_TYPE_APPEARANCE = const(0x19)
15+
16+
# Generate a payload to be passed to gap_advertise(adv_data=...).
17+
def advertising_payload(limited_disc=False, br_edr=False, name=None, services=None, appearance=0):
18+
payload = bytearray()
19+
20+
def _append(adv_type, value):
21+
nonlocal payload
22+
payload += struct.pack('BB', len(value) + 1, adv_type) + value
23+
24+
_append(_ADV_TYPE_FLAGS, struct.pack('B', (0x01 if limited_disc else 0x02) + (0x00 if br_edr else 0x04)))
25+
26+
if name:
27+
_append(_ADV_TYPE_NAME, name)
28+
29+
if services:
30+
for uuid in services:
31+
# TODO: Support bluetooth.UUID class.
32+
_append(_ADV_TYPE_UUID16_COMPLETE, struct.pack('<h', uuid))
33+
34+
# See org.bluetooth.characteristic.gap.appearance.xml
35+
_append(_ADV_TYPE_APPEARANCE, struct.pack('<h', appearance))
36+
37+
return payload

examples/bluetooth/ble_temperature.py

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
# This example demonstrates a simple temperature sensor peripheral.
2+
#
3+
# The sensor's local value updates every second, and it will notify
4+
# any connected central every 10 seconds.
5+
6+
import bluetooth
7+
import random
8+
import struct
9+
import time
10+
from ble_advertising import advertising_payload
11+
12+
from micropython import const
13+
_IRQ_CENTRAL_CONNECT = const(1 << 0)
14+
_IRQ_CENTRAL_DISCONNECT = const(1 << 1)
15+
16+
# org.bluetooth.service.environmental_sensing
17+
_ENV_SENSE_UUID = bluetooth.UUID(0x181A)
18+
# org.bluetooth.characteristic.temperature
19+
_TEMP_CHAR = (bluetooth.UUID(0x2A6E), bluetooth.FLAG_READ|bluetooth.FLAG_NOTIFY,)
20+
_ENV_SENSE_SERVICE = (_ENV_SENSE_UUID, (_TEMP_CHAR,),)
21+
22+
# org.bluetooth.characteristic.gap.appearance.xml
23+
_ADV_APPEARANCE_GENERIC_THERMOMETER = const(768)
24+
25+
class BLETemperature:
26+
def __init__(self, ble, name='mpy-temp'):
27+
self._ble = ble
28+
self._ble.active(True)
29+
self._ble.irq(handler=self._irq)
30+
((self._handle,),) = self._ble.gatts_register_services((_ENV_SENSE_SERVICE,))
31+
self._connections = set()
32+
self._payload = advertising_payload(name=name, services=[0x181A], appearance=_ADV_APPEARANCE_GENERIC_THERMOMETER)
33+
self._advertise()
34+
35+
def _irq(self, event, data):
36+
# Track connections so we can send notifications.
37+
if event == _IRQ_CENTRAL_CONNECT:
38+
conn_handle, _, _, = data
39+
self._connections.add(conn_handle)
40+
elif event == _IRQ_CENTRAL_DISCONNECT:
41+
conn_handle, _, _, = data
42+
self._connections.remove(conn_handle)
43+
# Start advertising again to allow a new connection.
44+
self._advertise()
45+
46+
def set_temperature(self, temp_deg_c, notify=False):
47+
# Data is sint16 in degrees Celsius with a resolution of 0.01 degrees Celsius.
48+
# Write the local value, ready for a central to read.
49+
self._ble.gatts_write(self._handle, struct.pack('<h', int(temp_deg_c * 100)))
50+
if notify:
51+
for conn_handle in self._connections:
52+
# Notify connected centrals to issue a read.
53+
self._ble.gatts_notify(conn_handle, self._handle)
54+
55+
def _advertise(self, interval_us=500000):
56+
self._ble.gap_advertise(interval_us, adv_data=self._payload)
57+
58+
59+
def demo():
60+
ble = bluetooth.BLE()
61+
temp = BLETemperature(ble)
62+
63+
t = 25
64+
i = 0
65+
66+
while True:
67+
# Write every second, notify every 10 seconds.
68+
i = (i + 1) % 10
69+
temp.set_temperature(t, notify=i == 0)
70+
# Random walk the temperature.
71+
t += random.uniform(-0.5, 0.5)
72+
time.sleep_ms(1000)
73+
74+
75+
if __name__ == '__main__':
76+
demo()
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
# This example demonstrates a peripheral implementing the Nordic UART Service (NUS).
2+
3+
import bluetooth
4+
from ble_advertising import advertising_payload
5+
6+
from micropython import const
7+
_IRQ_CENTRAL_CONNECT = const(1 << 0)
8+
_IRQ_CENTRAL_DISCONNECT = const(1 << 1)
9+
_IRQ_GATTS_WRITE = const(1 << 2)
10+
11+
_UART_UUID = bluetooth.UUID('6E400001-B5A3-F393-E0A9-E50E24DCCA9E')
12+
_UART_TX = (bluetooth.UUID('6E400003-B5A3-F393-E0A9-E50E24DCCA9E'), bluetooth.FLAG_NOTIFY,)
13+
_UART_RX = (bluetooth.UUID('6E400002-B5A3-F393-E0A9-E50E24DCCA9E'), bluetooth.FLAG_WRITE,)
14+
_UART_SERVICE = (_UART_UUID, (_UART_TX, _UART_RX,),)
15+
16+
# org.bluetooth.characteristic.gap.appearance.xml
17+
_ADV_APPEARANCE_GENERIC_COMPUTER = const(128)
18+
19+
class BLEUART:
20+
def __init__(self, ble, name='mpy-uart', rxbuf=100):
21+
self._ble = ble
22+
self._ble.active(True)
23+
self._ble.irq(handler=self._irq)
24+
((self._tx_handle, self._rx_handle,),) = self._ble.gatts_register_services((_UART_SERVICE,))
25+
# Increase the size of the rx buffer.
26+
self._ble.gatts_write(self._rx_handle, bytes(rxbuf))
27+
self._connections = set()
28+
self._rx_buffer = bytearray()
29+
self._handler = None
30+
self._payload = advertising_payload(name=name, appearance=_ADV_APPEARANCE_GENERIC_COMPUTER)
31+
self._advertise()
32+
33+
def irq(self, handler):
34+
self._handler = handler
35+
36+
def _irq(self, event, data):
37+
# Track connections so we can send notifications.
38+
if event == _IRQ_CENTRAL_CONNECT:
39+
conn_handle, _, _, = data
40+
self._connections.add(conn_handle)
41+
elif event == _IRQ_CENTRAL_DISCONNECT:
42+
conn_handle, _, _, = data
43+
if conn_handle in self._connections:
44+
self._connections.remove(conn_handle)
45+
# Start advertising again to allow a new connection.
46+
self._advertise()
47+
elif event == _IRQ_GATTS_WRITE:
48+
conn_handle, value_handle, = data
49+
if conn_handle in self._connections and value_handle == self._rx_handle:
50+
self._rx_buffer += self._ble.gatts_read(self._rx_handle)
51+
if self._handler:
52+
self._handler()
53+
54+
def any(self):
55+
return len(self._rx_buffer)
56+
57+
def read(self, sz=None):
58+
if not sz:
59+
sz = len(self._rx_buffer)
60+
result = self._rx_buffer[0:sz]
61+
self._rx_buffer = self._rx_buffer[sz:]
62+
return result
63+
64+
def write(self, data):
65+
for conn_handle in self._connections:
66+
self._ble.gatts_notify(conn_handle, self._tx_handle, data)
67+
68+
def close(self):
69+
for conn_handle in self._connections:
70+
self._ble.gap_disconnect(conn_handle)
71+
self._connections.clear()
72+
73+
def _advertise(self, interval_us=500000):
74+
self._ble.gap_advertise(interval_us, adv_data=self._payload)
75+
76+
77+
# ble = bluetooth.BLE()
78+
# uart = BLEUART(ble)

examples/bluetooth/ble_uart_repl.py

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
# Proof-of-concept of a REPL over BLE UART.
2+
#
3+
# Tested with the Adafruit Bluefruit app on Android.
4+
# Set the EoL characters to \r\n.
5+
6+
import bluetooth
7+
import io
8+
import os
9+
import micropython
10+
import machine
11+
12+
from ble_uart_peripheral import BLEUART
13+
14+
_MP_STREAM_POLL = const(3)
15+
_MP_STREAM_POLL_RD = const(0x0001)
16+
17+
# TODO: Remove this when STM32 gets machine.Timer.
18+
if hasattr(machine, 'Timer'):
19+
_timer = machine.Timer(-1)
20+
else:
21+
_timer = None
22+
23+
# Batch writes into 50ms intervals.
24+
def schedule_in(handler, delay_ms):
25+
def _wrap(_arg):
26+
handler()
27+
if _timer:
28+
_timer.init(mode=machine.Timer.ONE_SHOT, period=delay_ms, callback=_wrap)
29+
else:
30+
micropython.schedule(_wrap, None)
31+
32+
# Simple buffering stream to support the dupterm requirements.
33+
class BLEUARTStream(io.IOBase):
34+
def __init__(self, uart):
35+
self._uart = uart
36+
self._tx_buf = bytearray()
37+
self._uart.irq(self._on_rx)
38+
39+
def _on_rx(self):
40+
# Needed for ESP32.
41+
if hasattr(os, 'dupterm_notify'):
42+
os.dupterm_notify(None)
43+
44+
def read(self, sz=None):
45+
return self._uart.read(sz)
46+
47+
def readinto(self, buf):
48+
avail = self._uart.read(len(buf))
49+
if not avail:
50+
return None
51+
for i in range(len(avail)):
52+
buf[i] = avail[i]
53+
return len(avail)
54+
55+
def ioctl(self, op, arg):
56+
if op == _MP_STREAM_POLL:
57+
if self._uart.any():
58+
return _MP_STREAM_POLL_RD
59+
return 0
60+
61+
def _flush(self):
62+
data = self._tx_buf[0:100]
63+
self._tx_buf = self._tx_buf[100:]
64+
self._uart.write(data)
65+
if self._tx_buf:
66+
schedule_in(self._flush, 50)
67+
68+
def write(self, buf):
69+
empty = not self._tx_buf
70+
self._tx_buf += buf
71+
if empty:
72+
schedule_in(self._flush, 50)
73+
74+
75+
def start():
76+
ble = bluetooth.BLE()
77+
uart = BLEUART(ble, name='mpy-repl')
78+
stream = BLEUARTStream(uart)
79+
80+
os.dupterm(stream)

0 commit comments

Comments
 (0)
0