8000 bpo-29679: Implement @contextlib.asynccontextmanager by JelleZijlstra · Pull Request #360 · python/cpython · GitHub
[go: up one dir, main page]

Skip to content

bpo-29679: Implement @contextlib.asynccontextmanager #360

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 18 commits into from
May 1, 2017
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
add tests (duplicating the @contextmanager ones)
  • Loading branch information
JelleZijlstra committed Feb 28, 2017
commit a1d5b3f4e11f4842beaa1cd24084bf3e7e2df688
155 changes: 155 additions & 0 deletions Lib/test/test_contextlib.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Unit tests for contextlib.py, and other context managers."""

import asyncio
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd prefer for other implementations to be able to readily test the rest of contextlib without requiring a working asyncio implementation, so it would be good to split these new tests out to a separate Lib/test_contextlib_async.py file.

import io
import sys
import tempfile
Expand Down Expand Up @@ -189,6 +190,160 @@ def woohoo(self, func, args, kwds):
self.assertEqual(target, (11, 22, 33, 44))


def _async_test(func):
"""Decorator to turn an async function into a test case."""
def wrapper(*args, **kwargs):
loop = asyncio.get_event_loop()
coro = func(*args, **kwargs)
return loop.run_until_complete(coro)
return wrapper


class AsyncContextManagerTestCase(unittest.TestCase):

@_async_test
async def test_contextmanager_plain(self):
state = []
@asynccontextmanager
async def woohoo():
state.append(1)
yield 42
state.append(999)
async with woohoo() as x:
self.assertEqual(state, [1])
self.assertEqual(x, 42)
state.append(x)
self.assertEqual(state, [1, 42, 999])

@_async_test
async def test_contextmanager_finally(self):
state = []
@asynccontextmanager
async def woohoo():
state.append(1)
try:
yield 42
finally:
state.append(999)
with self.assertRaises(ZeroDivisionError):
async with woohoo() as x:
self.assertEqual(state, [1])
self.assertEqual(x, 42)
state.append(x)
raise ZeroDivisionError()
self.assertEqual(state, [1, 42, 999])

@_async_test
async def test_contextmanager_no_reraise(self):
@asynccontextmanager
async def whee():
yield
ctx = whee()
await ctx.__aenter__()
# Calling __exit__ should not result in an exception
self.assertFalse(await ctx.__aexit__(TypeError, TypeError("foo"), None))

@_async_test
async def test_contextmanager_trap_yield_after_throw(self):
@asynccontextmanager
async def whoo():
try:
yield
except:
yield
ctx = whoo()
await ctx.__aenter__()
8000 with self.assertRaises(RuntimeError):
await ctx.__aexit__(TypeError, TypeError('foo'), None)

@_async_test
async def test_contextmanager_except(self):
state = []
@asynccontextmanager
async def woohoo():
state.append(1)
try:
yield 42
except ZeroDivisionError as e:
state.append(e.args[0])
self.assertEqual(state, [1, 42, 999])
async with woohoo() as x:
self.assertEqual(state, [1])
self.assertEqual(x, 42)
state.append(x)
raise ZeroDivisionError(999)
self.assertEqual(state, [1, 42, 999])

@_async_test
async def test_contextmanager_except_stopiter(self):
stop_exc = StopIteration('spam')
@asynccontextmanager
async def woohoo():
yield
try:
async with woohoo():
raise stop_exc
except Exception as ex:
self.assertIs(ex, stop_exc)
else:
self.fail('StopIteration was suppressed')

@_async_test
async def test_contextmanager_except_stopasynciter(self):
stop_exc = StopAsyncIteration('spam')
@asynccontextmanager
async def woohoo():
yield
try:
async with woohoo():
raise stop_exc
except Exception as ex:
self.assertIs(ex, stop_exc)
else:
self.fail('StopAsyncIteration was suppressed')

def _create_contextmanager_attribs(self):
def attribs(**kw):
def decorate(func):
for k,v in kw.items():
setattr(func,k,v)
return func
return decorate
@asynccontextmanager
@attribs(foo='bar')
async def baz(spam):
"""Whee!"" 8C76 "
yield
return baz

def test_contextmanager_attribs(self):
baz = self._create_contextmanager_attribs()
self.assertEqual(baz.__name__,'baz')
self.assertEqual(baz.foo, 'bar')

@support.requires_docstrings
def test_contextmanager_doc_attrib(self):
baz = self._create_contextmanager_attribs()
self.assertEqual(baz.__doc__, "Whee!")

@support.requires_docstrings
@_async_test
async def test_instance_docstring_given_cm_docstring(self):
baz = self._create_contextmanager_attribs()(None)
self.assertEqual(baz.__doc__, "Whee!")
async with baz:
pass # suppress warning

@_async_test
async def test_keywords(self):
# Ensure no keyword arguments are inhibited
@asynccontextmanager
async def woohoo(self, func, args, kwds):
yield (self, func, args, kwds)
async with woohoo(self=11, func=22, args=33, kwds=44) as target:
self.assertEqual(target, (11, 22, 33, 44))


class ClosingTestCase(unittest.TestCase):

@support.requires_docstrings
Expand Down
0