<
8000
/svg>@@ -60,6 +60,12 @@ def _server_irq(event, data):
60
60
def _server_shutdown ():
61
61
global _registered_characteristics
62
62
_registered_characteristics = {}
63
+ if hasattr (BaseCharacteristic , "_capture_task" ):
64
+ BaseCharacteristic ._capture_task .cancel ()
65
+ del BaseCharacteristic ._capture_queue
66
+ del BaseCharacteristic ._capture_write_event
67
+ del BaseCharacteristic ._capture_consumed_event
68
+ del BaseCharacteristic ._capture_task
63
69
64
70
65
71
register_irq_handler (_server_irq , _server_shutdown )
@@ -97,6 +103,42 @@ def write(self, data, send_update=False):
97
103
else :
98
104
ble .gatts_write (self ._value_handle , data , send_update )
99
105
106
+ # When the a capture-enabled characteristic is created, create the
107
+ # necessary events (if not already created).
108
+ @staticmethod
109
+ def _init_capture ():
110
+ if hasattr (BaseCharacteristic , "_capture_queue" ):
111
+ return
112
+
113
+ BaseCharacteristic ._capture_queue = deque ((), _WRITE_CAPTURE_QUEUE_LIMIT )
114
+ BaseCharacteristic ._capture_write_event = asyncio .ThreadSafeFlag ()
115
+ BaseCharacteristic ._capture_consumed_event = asyncio .ThreadSafeFlag ()
116
+ BaseCharacteristic ._capture_task = asyncio .create_task (
117
+ BaseCharacteristic ._run_capture_task ()
118
+ )
119
+
120
+ # Monitor the shared queue for incoming characteristic writes and forward
121
+ # them sequentially to the individual characteristic events.
122
+ @staticmethod
123
+ async def _run_capture_task ():
124
+ write = BaseCharacteristic ._capture_write_event
125
+ consumed = BaseCharacteristic ._capture_consumed_event
126
+ q = BaseCharacteristic ._capture_queue
127
+
128
+ while True :
129
+ if len (q ):
130
+ conn , data , characteristic = q .popleft ()
131
+ # Let the characteristic waiting in `written()` know that it
132
+ # can proceed.
133
+ characteristic ._write_data = (conn , data )
134
+ characteristic ._write_event .set ()
135
+ # Wait for the characteristic to complete `written()` before
136
+ # continuing.
137
+ await consumed .wait ()
138
+
139
+ if not len (q ):
140
+ await write .wait ()
141
+
100
142
# Wait for a write on this characteristic. Returns the connection that did
101
143
# the write, or a tuple of (connection, value) if capture is enabled for
102
144
# this characteristics.
@@ -105,17 +147,27 @@ async def written(self, timeout_ms=None):
105
147
# Not a writable characteristic.
106
148
return
107
149
108
- # If the queue is empty, then we need to wait. However, if the queue
109
- # has a single item, we also need to do a no-op wait in order to
110
- # clear the event flag (because the queue will become empty and
111
- # therefore the event should be cleared).
112
- if len (self ._write_queue ) <= 1 :
113
- with DeviceTimeout (None , timeout_ms ):
114
- await self ._write_event .wait ()
150
+ # If no write has been seen then we need to wait. If the event has
151
+ # already been set this will clear the event and continue
152
+ # immediately. In regular mode, this is set by the write IRQ
153
+ # directly (in _remote_write). In capture mode, this is set when it's
154
+ # our turn by _capture_task.
155
+ with DeviceTimeout (None , timeout_ms ):
156
+ await self ._write_event .wait ()
157
+
158
+ # Return the write data and clear the stored copy.
159
+ # In default usage this will be just the connection handle.
160
+ # In capture mode this will be a tuple of (connection_handle, received_data)
161
+ data = self ._write_data
162
+ self ._write_data = None
115
163
116
- # Either we started > 1 item, or the wait completed successfully, return
117
- # the front of the queue.
118
- return self ._write_queue .popleft ()
164
+ if self .flags & _FLAG_WRITE_CAPTURE :
165
+ # Notify the shared queue monitor that the event has been consumed
166
+ # by the caller to `written()` and another characteristic can now
167
+ # proceed.
168
+ BaseCharacteristic ._capture_consumed_event .set ()
169
+
170
+ return data
119
171
120
172
def on_read (self , connection ):
121
173
return 0
@@ -124,27 +176,20 @@ def _remote_write(conn_handle, value_handle):
124
176
if characteristic := _registered_characteristics .get (value_handle , None ):
125
177
# If we've gone from empty to one item, then wake something
126
178
# blocking on `await char.written()`.
127
- wake = len (characteristic ._write_queue ) == 0
128
179
129
180
conn = DeviceConnection ._connected .get (conn_handle , None )
130
- q = characteristic ._write_queue
131
181
132
182
if characteristic .flags & _FLAG_WRITE_CAPTURE :
133
- # For capture, we append both the connection and the written
134
- # value to the queue. The deque will enforce the max queue len.
183
+ # For capture, we append the connection and the written value
184
+ # value to the shared queue along with the matching characteristic object.
185
+ # The deque will enforce the max queue len.
135
186
data = characteristic .read ()
136
- q .append ((conn , data ))
187
+ BaseCharacteristic ._capture_queue .append ((conn , data , characteristic ))
188
+ BaseCharacteristic ._capture_write_event .set ()
137
189
else :
138
- # Use the queue as a single slot -- it has max length of 1,
139
- # so if there's an existing item it will be replaced.
140
- q .append (conn )
141
-
142
- if wake :
143
- # Queue is now non-empty. If something is waiting, it will be
144
- # worken. If something isn't waiting right now, then a future
145
- # caller to `await char.written()` will see the queue is
146
- # non-empty, and wait on the event if it's going to empty the
147
- # queue.
190
+ # Store the write connection handle to be later used to retrieve the data
191
+ # then set event to handle in written() task.
192
+ characteristic ._write_data = conn
148
193
characteristic ._write_event .set ()
149
194
150
195
def _remote_read (conn_handle , value_handle ):
@@ -178,10 +223,15 @@ def __init__(
178
223
if capture :
179
224
# Capture means that we keep track of all writes, and capture
180
225
# their values (and connection) in a queue. Otherwise we just
181
- # track the most recent connection .
226
+ # track the connection of the most recent write .
182
227
flags |= _FLAG_WRITE_CAPTURE
228
+ BaseCharacteristic ._init_capture ()
229
+
230
+ # Set when this characteristic has a value waiting in self._write_data.
183
231
self ._write_event = asyncio .ThreadSafeFlag ()
184
- self ._write_queue = deque ((), _WRITE_CAPTURE_QUEUE_LIMIT if capture else 1 )
232
+ # The connection of the most recent write, or a tuple of
233
+ # (connection, data) if capture is enabled.
234
+ self ._write_data = None
185
235
if notify :
186
236
flags |= _FLAG_NOTIFY
187
237
if indicate :
@@ -263,7 +313,7 @@ def __init__(self, characteristic, uuid, read=False, write=False, initial=None):
263
313
flags |= _FLAG_DESC_READ
264
314
if write :
265
315
self ._write_event = asyncio .ThreadSafeFlag ()
266
- self ._write_queue = deque ((), 1 )
316
+ self ._write_data = None
267
317
flags |= _FLAG_DESC_WRITE
268
318
269
319
self .uuid = uuid
0 commit comments