8000 bpo-43352: Add a Barrier object to asyncio synchronized primitives (G… · python/cpython@cca2918 · GitHub
[go: up one dir, main page]

Skip to content

Commit cca2918

Browse files
committed
bpo-43352: Add a Barrier object to asyncio synchronized primitives (GH-24903)
Add a new test `test_draining_check_cancelled_action` Update `test_repr`
1 parent 28eba9b commit cca2918

File tree

1 file changed

+86
-47
lines changed

1 file changed

+86
-47
lines changed

Lib/test/test_asyncio/test_locks.py

Lines changed: 86 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -936,35 +936,35 @@ async def gather_tasks(self, n, coro):
936936

937937
async def test_barrier(self):
938938
barrier = asyncio.Barrier(self.N)
939-
self.assertTrue(barrier.filling)
939+
self.assertTrue(barrier._is_filling())
940940
with self.assertRaisesRegex(
941941
TypeError,
942942
"object Barrier can't be used in 'await' expression",
943943
):
944944
await barrier
945945

946-
self.assertTrue(barrier.filling)
946+
self.assertTrue(barrier._is_filling())
947947

948948
async def test_repr(self):
949949
barrier = asyncio.Barrier(self.N)
950950

951951
self.assertTrue(RGX_REPR.match(repr(barrier)))
952952

953953
incr = 2
954-
barrier._count_wait += incr
954+
barrier._count += incr
955955
self.assertTrue(RGX_REPR.match(repr(barrier)))
956956
self.assertTrue(f"wait:{incr}/{self.N}" in repr(barrier))
957-
self.assertTrue(f"block:0/{self.N}" in repr(barrier))
957+
self.assertTrue(f"block" not in repr(barrier))
958958

959959
barrier._set_filling()
960960
self.assertTrue(RGX_REPR.match(repr(barrier)))
961961
self.assertTrue(repr(barrier).endswith('state:0]>'))
962962

963963
barrier._count_block += incr
964-
barrier._count_wait -= incr
964+
barrier._count -= incr
965965
self.assertTrue(RGX_REPR.match(repr(barrier)))
966-
self.assertTrue(f"wait:0/{self.N}" in repr(barrier))
967966
self.assertTrue(f"block:{incr}/{self.N}" in repr(barrier))
967+
self.assertTrue(f"wait" not in repr(barrier))
968968

969969
barrier._set_draining()
970970
self.assertTrue(RGX_REPR.match(repr(barrier)))
@@ -992,7 +992,7 @@ async def test_barrier_parties(self):
992992
self.assertRaises(ValueError, lambda: asyncio.Barrier(-4))
993993

994994
async def test_context_manager(self):
995-
self.N = 2
995+
self.N = 3
996996
barrier = asyncio.Barrier(self.N)
997997
results = []
998998

@@ -1004,50 +1004,58 @@ async def coro():
10041004

10051005
self.assertListEqual(sorted(results), list(range(self.N)))
10061006
self.assertEqual(barrier.n_waiting, 0)
1007-
self.assertFalse(barrier.broken)
1007+
self.assertFalse(barrier._is_broken())
10081008

10091009
async def test_filling_one_task(self):
10101010
barrier = asyncio.Barrier(1)
1011-
results = []
10121011

10131012
async def f():
10141013
async with barrier as i:
1015-
results.append(i)
1014+
return True
10161015

1017-
await f()
1016+
ret = await f()
10181017

1019-
self.assertEqual(len(results), 1)
1020-
self.assertEqual(results, [0])
1018+
self.assertTrue(ret)
10211019
self.assertEqual(barrier.n_waiting, 0)
1022-
self.assertFalse(barrier.broken)
1020+
self.assertFalse(barrier._is_broken())
10231021

10241022
async def test_filling_one_task_twice(self):
10251023
barrier = asyncio.Barrier(1)
10261024

1027-
r1 = asyncio.create_task(barrier.wait())
1025+
t1 = asyncio.create_task(barrier.wait())
10281026
await asyncio.sleep(0)
10291027
self.assertEqual(barrier.n_waiting, 0)
10301028

1031-
r2 = asyncio.create_task(barrier.wait())
1029+
t2 = asyncio.create_task(barrier.wait())
10321030
await asyncio.sleep(0)
10331031

1034-
self.assertEqual(r1.result(), r2.result())
1035-
self.assertEqual(r1.done(), r2.done())
1032+
self.assertEqual(t1.result(), t2.result())
1033+
self.assertEqual(t1.done(), t2.done())
10361034

10371035
self.assertEqual(barrier.n_waiting, 0)
1038-
self.assertFalse(barrier.broken)
1036+
self.assertFalse(barrier._is_broken())
10391037

10401038
async def test_filling_task_by_task(self):
10411039
self.N = 3
10421040
barrier = asyncio.Barrier(self.N)
10431041

1044-
async def coro():
1045-
await barrier.wait()
1042+
t1 = asyncio.create_task(barrier.wait())
1043+
await asyncio.sleep(0)
1044+
self.assertEqual(barrier.n_waiting, 1)
1045+
self.assertTrue(barrier._is_filling())
10461046

1047-
await self.gather_tasks(self.N, coro)
1047+
t2 = asyncio.create_task(barrier.wait())
1048+
await asyncio.sleep(0)
1049+
self.assertEqual(barrier.n_waiting, 2)
1050+
self.assertTrue(barrier._is_filling())
1051+
1052+
t3 = asyncio.create_task(barrier.wait())
1053+
await asyncio.sleep(0)
1054+
1055+
await asyncio.wait([t1, t2, t3])
10481056

10491057
self.assertEqual(barrier.n_waiting, 0)
1050-
self.assertFalse(barrier.broken)
1058+
self.assertFalse(barrier._is_broken())
10511059

10521060
async def test_filling_tasks_wait_twice(self):
10531061
barrier = asyncio.Barrier(self.N)
@@ -1067,7 +1075,7 @@ async def coro():
10671075
self.assertEqual(results.count(False), self.N)
10681076

10691077
self.assertEqual(barrier.n_waiting, 0)
1070-
self.assertFalse(barrier.broken)
1078+
self.assertFalse(barrier._is_broken())
10711079

10721080
async def test_filling_tasks_check_return_value(self):
10731081
barrier = asyncio.Barrier(self.N)
@@ -1091,7 +1099,7 @@ async def coro():
10911099
self.assertListEqual(sorted(res), list(range(self.N)))
10921100

10931101
self.assertEqual(barrier.n_waiting, 0)
1094-
self.assertFalse(barrier.broken)
1102+
self.assertFalse(barrier._is_broken())
10951103

10961104
async def test_draining_state(self):
10971105
barrier = asyncio.Barrier(self.N)
@@ -1100,7 +1108,7 @@ async def test_draining_state(self):
11001108
async def coro():
11011109
async with barrier:
11021110
# barrier state change to filling for the last task release
1103-
results.append(barrier.draining)
1111+
results.append(barrier._is_draining())
11041112

11051113
await self.gather_tasks(self.N, coro)
11061114

@@ -1109,7 +1117,7 @@ async def coro():
11091117
self.assertTrue(all(results[:self.N-1]))
11101118

11111119
self.assertEqual(barrier.n_waiting, 0)
1112-
self.assertFalse(barrier.broken)
1120+
self.assertFalse(barrier._is_broken())
11131121

11141122
async def test_blocking_tasks_while_draining(self):
11151123
rewait = 2
@@ -1166,6 +1174,8 @@ async def coro():
11661174
t1.cancel()
11671175
await asyncio.sleep(0)
11681176
self.assertEqual(barrier.n_waiting, 1)
1177+
with self.assertRaises(asyncio.CancelledError):
1178+
await t1
11691179
self.assertTrue(t1.cancelled())
11701180

11711181
t3 = asyncio.create_task(coro())
@@ -1179,7 +1189,7 @@ async def coro():
11791189
self.assertTrue(all(results))
11801190

11811191
self.assertEqual(barrier.n_waiting, 0)
1182-
self.assertFalse(barrier.broken)
1192+
self.assertFalse(barrier._is_broken())
11831193

11841194
async def test_draining_check_action(self):
11851195
async def action_task():
@@ -1200,7 +1210,37 @@ async def coro():
12001210
self.assertTrue(all(results))
12011211

12021212
self.assertEqual(barrier.n_waiting, 0)
1203-
self.assertFalse(barrier.broken)
1213+
self.assertFalse(barrier._is_broken())
1214+
1215+
async def test_draining_check_cancelled_action(self):
1216+
async def action_coro():
1217+
asyncio.current_task().cancel()
1218+
await asyncio.sleep(0)
1219+
1220+
barrier = asyncio.Barrier(self.N, action=action_coro)
1221+
results = []
1222+
results1 = []
1223+
results2 = []
1224+
1225+
async def coro():
1226+
try:
1227+
await barrier.wait()
1228+
results.append(True)
1229+
except asyncio.CancelledError:
1230+
results1.append(True)
1231+
except asyncio.BrokenBarrierError:
1232+
results2.append(True)
1233+
1234+
#with self.assertRaises(asyncio.CancelledError):
1235+
res, t = await self.gather_tasks(self.N, coro)
1236+
1237+
self.assertEqual(len(results), 0)
1238+
self.assertEqual(results1, [True])
1239+
self.assertEqual(len(results2), self.N-1)
1240+
self.assertTrue(all(results2))
1241+
1242+
self.assertEqual(barrier.n_waiting, 0)
1243+
self.assertTrue(barrier._is_broken())
12041244

12051245
async def test_draining_check_error_on_action(self):
12061246
ERROR = ZeroDivisionError
@@ -1214,21 +1254,20 @@ async def raise_except():
12141254

12151255
async def coro():
12161256
try:
1217-
ret = await barrier.wait()
1257+
await barrier.wait()
12181258
except ERROR:
12191259
results1.append(False)
12201260
except asyncio.BrokenBarrierError:
12211261
results2.append(True)
12221262

12231263
await self.gather_tasks(self.N, coro)
12241264

1225-
self.assertEqual(len(results1), 1)
1226-
self.assertFalse(results1[0])
1265+
self.assertEqual(results1, [False])
12271266
self.assertEqual(len(results2), self.N-1)
12281267
self.assertTrue(all(results2))
12291268

12301269
self.assertEqual(barrier.n_waiting, 0)
1231-
self.assertTrue(barrier.broken)
1270+
self.assertTrue(barrier._is_broken())
12321271

12331272
async def test_reset_barrier(self):
12341273
barrier = asyncio.Barrier(1)
@@ -1237,7 +1276,7 @@ async def test_reset_barrier(self):
12371276
await asyncio.sleep(0)
12381277

12391278
self.assertEqual(barrier.n_waiting, 0)
1240-
self.assertFalse(barrier.broken)
1279+
self.assertFalse(barrier._is_broken())
12411280

12421281
async def test_reset_barrier_while_tasks_waiting(self):
12431282
barrier = asyncio.Barrier(self.N)
@@ -1263,8 +1302,8 @@ async def coro_reset():
12631302
self.assertEqual(len(results), self.N-1)
12641303
self.assertTrue(all(results))
12651304
self.assertEqual(barrier.n_waiting, 0)
1266-
self.assertFalse(barrier.resetting)
1267-
self.assertFalse(barrier.broken)
1305+
self.assertFalse(barrier._is_resetting())
1306+
self.assertFalse(barrier._is_broken())
12681307

12691308
async def test_reset_barrier_when_tasks_half_draining(self):
12701309
barrier = asyncio.Barrier(self.N)
@@ -1287,8 +1326,8 @@ async def coro():
12871326

12881327
self.assertEqual(results1, [True]*rest_of_tasks)
12891328
self.assertEqual(barrier.n_waiting, 0)
1290-
self.assertFalse(barrier.resetting)
1291-
self.assertFalse(barrier.broken)
1329+
self.assertFalse(barrier._is_resetting())
1330+
self.assertFalse(barrier._is_broken())
12921331

12931332
async def test_reset_barrier_when_tasks_half_draining_half_blocking(self):
12941333
barrier = asyncio.Barrier(self.N)
@@ -1324,8 +1363,8 @@ async def coro():
13241363
self.assertEqual(results1, [True]*blocking_tasks)
13251364
self.assertEqual(results2, [])
13261365
self.assertEqual(barrier.n_waiting, 0)
1327-
self.assertFalse(barrier.resetting)
1328-
self.assertFalse(barrier.broken)
1366+
self.assertFalse(barrier._is_resetting())
1367+
self.assertFalse(barrier._is_broken())
13291368

13301369
async def test_reset_barrier_while_tasks_waiting_and_waiting_again(self):
13311370
barrier = asyncio.Barrier(self.N)
@@ -1356,7 +1395,7 @@ async def coro2():
13561395

13571396
await asyncio.gather(*tasks)
13581397

1359-
self.assertFalse(barrier.broken)
1398+
self.assertFalse(barrier._is_broken())
13601399
self.assertEqual(len(results1), self.N-1)
13611400
self.assertTrue(all(results1))
13621401
self.assertEqual(len(results2), self.N)
@@ -1401,7 +1440,7 @@ async def coro():
14011440

14021441
await self.gather_tasks(self.N, coro)
14031442

1404-
self.assertFalse(barrier.broken)
1443+
self.assertFalse(barrier._is_broken())
14051444
self.assertTrue(all(results1))
14061445
self.assertEqual(len(results1), self.N-1)
14071446
self.assertEqual(len(results2), 0)
@@ -1417,7 +1456,7 @@ async def test_abort_barrier(self):
14171456
await asyncio.sleep(0)
14181457

14191458
self.assertEqual(barrier.n_waiting, 0)
1420-
self.assertTrue(barrier.broken)
1459+
self.assertTrue(barrier._is_broken())
14211460

14221461
async def test_abort_barrier_when_tasks_half_draining_half_blocking(self):
14231462
barrier = asyncio.Barrier(self.N)
@@ -1444,11 +1483,11 @@ async def coro():
14441483

14451484
await self.gather_tasks(self.N, coro)
14461485

1447-
self.assertTrue(barrier.broken)
1486+
self.assertTrue(barrier._is_broken())
14481487
self.assertEqual(results1, [True]*blocking_tasks)
14491488
self.assertEqual(results2, [True]*(self.N-blocking_tasks-1))
14501489
self.assertEqual(barrier.n_waiting, 0)
1451-
self.assertFalse(barrier.resetting)
1490+
self.assertFalse(barrier._is_resetting())
14521491

14531492
async def test_abort_barrier_when_exception(self):
14541493
# test from threading.Barrier: see `lock_tests.test_reset`
@@ -1470,7 +1509,7 @@ async def coro():
14701509

14711510
await self.gather_tasks(self.N, coro)
14721511

1473-
self.assertTrue(barrier.broken)
1512+
self.assertTrue(barrier._is_broken())
14741513
self.assertEqual(len(results1), 0)
14751514
self.assertEqual(len(results2), self.N-1)
14761515
self.assertTrue(all(results2))
@@ -1508,7 +1547,7 @@ async def coro():
15081547

15091548
await self.gather_tasks(self.N, coro)
15101549

1511-
self.assertFalse(barrier1.broken)
1550+
self.assertFalse(barrier1._is_broken())
15121551
self.assertEqual(len(results1), 0)
15131552
self.assertEqual(len(results2), self.N-1)
15141553
self.assertTrue(all(results2))

0 commit comments

Comments
 (0)
0