@@ -48,15 +48,15 @@ def base36(n):
48
48
49
49
50
50
class CCAPI (RootAPIMixin ):
51
- def __init__ (self , nid , program , cleanup_callback ):
51
+ def __init__ (self , nid , program , sender ):
52
52
self ._id = nid
53
53
self ._task_autoid = 1
54
- self ._cmd = asyncio .Queue (maxsize = 1 )
55
54
self ._result_locks = {}
56
55
self ._result_values = {}
57
56
self ._result_queues = {}
58
57
self ._event_to_tids = {}
59
58
self ._tid_to_event = {}
59
+ self ._sender = sender
60
60
61
61
self .colors = ColorsAPI (self , 'colors' )
62
62
self .commands = CommandsAPI (self , 'commands' )
@@ -81,27 +81,32 @@ def __init__(self, nid, program, cleanup_callback):
81
81
82
82
async def prog_wrap ():
83
83
err = None
84
+ cancel = False
84
85
try :
85
86
await program (self )
86
87
except asyncio .CancelledError :
87
88
print ('program {} cancelled' .format (self ._id ))
88
89
print_exc ()
89
90
err = 'program has been cancelled'
91
+ cancel = True
90
92
except Exception as e :
91
93
print ('program {} crashed: {} {}' .format (self ._id , type (e ), e ))
92
94
print_exc ()
93
95
err = type (e ).__name__ + ': ' + str (e )
94
96
else :
95
97
print ('program {} finished' .format (self ._id ))
96
98
finally :
97
- c = { 'action' : 'close' }
98
- if err is not None :
99
- c [ 'error' ] = err
100
- await self . _cmd . put ( c )
101
- cleanup_callback ( )
99
+ if not cancel :
100
+ c = { 'action' : 'close' }
101
+ if err is not None :
102
+ c [ 'error' ] = err
103
+ await self . _sender ( c )
102
104
103
105
self ._task = asyncio .create_task (prog_wrap ())
104
106
107
+ def cancel (self ):
108
+ self ._task .cancel ()
109
+
105
110
def _new_task_id (self ) -> str :
106
111
task_id = base36 (self ._task_autoid )
107
112
self ._task_autoid += 1
@@ -110,7 +115,7 @@ def _new_task_id(self) -> str:
110
115
async def _eval (self , lua_code , immediate = False ):
111
116
task_id = self ._new_task_id ()
112
117
self ._result_locks [task_id ] = asyncio .Event ()
113
- await self ._cmd . put ({
118
+ await self ._sender ({
114
119
'action' : 'task' ,
115
120
'task_id' : task_id ,
116
121
'code' : lua_code ,
@@ -133,7 +138,7 @@ async def _start_queue(self, event):
133
138
self ._result_queues [task_id ] = asyncio .Queue ()
134
139
es = self ._event_to_tids .setdefault (event , set ())
135
140
if not es :
136
- await self ._cmd . put ({
141
+ await self ._sender ({
137
142
'action' : 'sub' ,
138
143
'event' : event ,
139
144
})
@@ -147,7 +152,7 @@ async def _stop_queue(self, task_id):
147
152
del self ._tid_to_event [task_id ]
148
153
self ._event_to_tids [event ].discard (task_id )
149
154
if not self ._event_to_tids [event ]:
150
- await self ._cmd . put ({
155
+ await self ._sender ({
151
156
'action' : 'unsub' ,
152
157
'event' : event ,
153
158
})
@@ -166,16 +171,6 @@ async def _create_temp_object(self, create_expr: str, finalizer_template: str =
166
171
167
172
168
173
class CCApplication (web .Application ):
169
- @staticmethod
170
- async def _sender (ws , api ):
171
- while not ws .closed :
172
- cmd = await api ._cmd .get ()
173
- # print(f'_sender: {cmd}')
174
- if not ws .closed :
175
- await ws .send_json (cmd )
176
- if cmd ['action' ] == 'close' :
177
- break
178
-
179
174
@staticmethod
180
175
async def _json_messages (ws ):
181
176
async for msg in ws :
@@ -207,29 +202,32 @@ async def _launch_program(self, ws):
207
202
'error' : "program doesn't exist" ,
208
203
})
209
204
return None
210
- return CCAPI (msg ['computer' ], program , lambda : None )
205
+
206
+ return CCAPI (msg ['computer' ], program , ws .send_json )
211
207
212
208
async def ws (self , request ):
213
209
ws = web .WebSocketResponse ()
214
210
await ws .prepare (request )
215
211
216
212
api = await self ._launch_program (ws )
217
213
if api is not None :
218
- asyncio .create_task (self ._sender (ws , api ))
219
- async for msg in self ._json_messages (ws ):
220
- if msg ['action' ] == 'event' :
221
- for task_id in api ._event_to_tids .get (msg ['event' ], ()):
222
- await api ._result_queues [task_id ].put (msg ['params' ])
223
- elif msg ['action' ] == 'task_result' :
224
- api ._result_values [msg ['task_id' ]] = msg ['result' ]
225
- api ._result_locks [msg ['task_id' ]].set ()
226
- # print(msg['task_id'], msg['yields'])
227
- else :
228
- await ws .send_json ({
229
- 'action' : 'close' ,
230
- 'error' : 'protocol error' ,
231
- })
232
- break
214
+ try :
215
+ async for msg in self ._json_messages (ws ):
216
+ if msg ['action' ] == 'event' :
217
+ for task_id in api ._event_to_tids .get (msg ['event' ], ()):
218
+ await api ._result_queues [task_id ].put (msg ['params' ])
219
+ elif msg ['action' ] == 'task_result' :
220
+ api ._result_values [msg ['task_id' ]] = msg ['result' ]
221
+ api ._result_locks [msg ['task_id' ]].set ()
222
+ # print(msg['task_id'], msg['yields'])
223
+ else :
224
+ await ws .send_json ({
225
+ 'action' : 'close' ,
226
+ 'error' : 'protocol error' ,
227
+ })
228
+ break
229
+ finally :
230
+ api .cancel ()
233
231
234
232
return ws
235
233
0 commit comments