@@ -68,65 +68,53 @@ def sleep(t):
68
68
69
69
class IOQueue :
70
70
def __init__ (self ):
71
- self .poller = select .poll ()
72
- self .map = {} # maps id(stream) to [task_waiting_read, task_waiting_write, stream]
71
+ import uevent
73
72
74
- def _enqueue (self , s , idx ):
75
- if id (s ) not in self .map :
76
- entry = [None , None , s ]
77
- entry [idx ] = cur_task
78
- self .map [id (s )] = entry
79
- self .poller .register (s , select .POLLIN if idx == 0 else select .POLLOUT )
73
+ self .poll = uevent .poll ()
74
+
75
+ # This is not a method, just put here to keep the name "remove" out of global namespace.
76
+ def remove (self , task ):
77
+ if self .data [0 ] is task :
78
+ idx = 0
79
+ elif self .data [1 ] is task :
80
+ idx = 1
81
+ elif self .data [2 ] is task :
82
+ idx = 2
80
83
else :
81
- sm = self .map [id (s )]
82
- assert sm [idx ] is None
83
- assert sm [1 - idx ] is not None
84
- sm [idx ] = cur_task
85
- self .poller .modify (s , select .POLLIN | select .POLLOUT )
86
- # Link task to this IOQueue so it can be removed if needed
87
- cur_task .data = self
88
-
89
- def _dequeue (self , s ):
90
- del self .map [id (s )]
91
- self .poller .unregister (s )
84
+ return
85
+ self .data [idx ] = None
86
+ self .unregister (1 << idx )
87
+
88
+ def _enqueue (self , s , idx ):
89
+ entry = self .poll .register (s , 1 << idx )
90
+ if entry .data is None :
91
+ entry .data = [None , None , None ]
92
+ entry .remove = IOQueue .remove
93
+ if idx > 2 :
94
+ idx = 2
95
+ assert entry .data [idx ] == None
96
+ entry .data [idx ] = cur_task
97
+ cur_task .data = entry # Link task to this poll entry so it can be removed if needed
92
98
93
99
def queue_read (self , s ):
94
100
self ._enqueue (s , 0 )
95
101
96
102
def queue_write (self , s ):
97
103
self ._enqueue (s , 1 )
98
104
99
- def remove (self , task ):
100
- while True :
101
- del_s = None
102
- for k in self .map : # Iterate without allocating on the heap
103
- q0 , q1 , s = self .map [k ]
104
- if q0 is task or q1 is task :
105
- del_s = s
106
- break
107
- if del_s is not None :
108
- self ._dequeue (s )
109
- else :
110
- break
111
-
112
- def wait_io_event (self , dt ):
113
- for s , ev in self .poller .ipoll (dt ):
114
- sm = self .map [id (s )]
115
- # print('poll', s, sm, ev)
116
- if ev & ~ select .POLLOUT and sm [0 ] is not None :
117
- # POLLIN or error
118
- _task_queue .push_head (sm [0 ])
119
- sm [0 ] = None
120
- if ev & ~ select .POLLIN and sm [1 ] is not None :
121
- # POLLOUT or error
122
- _task_queue .push_head (sm [1 ])
123
- sm [1 ] = None
124
- if sm [0 ] is None and sm [1 ] is None :
125
- self ._dequeue (s )
126
- elif sm [0 ] is None :
127
- self .poller .modify (s , select .POLLOUT )
128
- else :
129
- self .poller .modify (s , select .POLLIN )
105
+ def poll_ms (self , dt ):
106
+ for entry in self .poll .poll_ms (dt ):
107
+ flags = entry .flags
108
+ data = entry .data
109
+ if flags & 1 and data [0 ] is not None :
110
+ _task_queue .push_head (data [0 ])
111
+ data [0 ] = None
112
+ if flags & 2 and data [1 ] is not None :
113
+ _task_queue .push_head (data [1 ])
114
+ data [1 ] = None
115
+ if flags >> 2 and data [2 ] is not None :
116
+ _task_queue .push_head (data [2 ])
117
+ data [2 ] = None
130
118
131
119
132
120
################################################################################
@@ -160,11 +148,8 @@ def run_until_complete(main_task=None):
160
148
if t :
161
149
# A task waiting on _task_queue; "ph_key" is time to schedule task at
162
150
dt = max (0 , ticks_diff (t .ph_key , ticks ()))
163
- elif not _io_queue .map :
164
- # No tasks can be woken so finished running
165
- return
166
- # print('(poll {})'.format(dt), len(_io_queue.map))
167
- _io_queue .wait_io_event (dt )
151
+ # print('(poll_ms {})'.format(dt))
152
+ _io_queue .poll_ms (dt )
168
153
169
154
# Get next task to run and continue it
170
155
t = _task_queue .pop_head ()
0 commit comments