8000 bpo-29679: Implement @contextlib.asynccontextmanager by JelleZijlstra · Pull Request #360 · python/cpython · GitHub
[go: up one dir, main page]

Skip to content

bpo-29679: Implement @contextlib.asynccontextmanager #360

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 18 commits into from
May 1, 2017
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
move asynccontextmanager tests into their own file
  • Loading branch information
JelleZijlstra committed Mar 1, 2017
commit 689f4a54c3e8dcbecdc8148d0fbc6f0f90c77fc7
155 changes: 0 additions & 155 deletions Lib/test/test_contextlib.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
"""Unit tests for contextlib.py, and other context managers."""

import asyncio
import io
import sys
import tempfile
Expand Down Expand Up @@ -190,160 +189,6 @@ def woohoo(self, func, args, kwds):
self.assertEqual(target, (11, 22, 33, 44))


def _async_test(func):
"""Decorator to turn an async function into a test case."""
def wrapper(*args, **kwargs):
loop = asyncio.get_event_loop()
coro = func(*args, **kwargs)
return loop.run_until_complete(coro)
return wrapper


class AsyncContextManagerTestCase(unittest.TestCase):

@_async_test
async def test_contextmanager_plain(self):
state = []
@asynccontextmanager
async def woohoo():
state.append(1)
yield 42
state.append(999)
async with woohoo() as x:
self.assertEqual(state, [1])
self.assertEqual(x, 42)
state.append(x)
self.assertEqual(state, [1, 42, 999])

@_async_test
async def test_contextmanager_finally(self):
state = []
@asynccontextmanager
async def woohoo():
state.append(1)
try:
yield 42
finally:
state.append(999)
with self.assertRaises(ZeroDivisionError):
async with woohoo() as x:
self.assertEqual(state, [1])
self.assertEqual(x, 42)
state.append(x)
raise ZeroDivisionError()
self.assertEqual(state, [1, 42, 999])

@_async_test
async def test_contextmanager_no_reraise(self):
@asynccontextmanager
async def whee():
yield
ctx = whee()
await ctx.__aenter__()
# Calling __exit__ should not result in an exception
self.assertFalse(await ctx.__aexit__(TypeError, TypeError("foo"), None))

@_async_test
async def test_contextmanager_trap_yield_after_throw(self):
@asynccontextmanager< 10000 /span>
async def whoo():
try:
yield
except:
yield
ctx = whoo()
await ctx.__aenter__()
with self.assertRaises(RuntimeError):
await ctx.__aexit__(TypeError, TypeError('foo'), None)

@_async_test
async def test_contextmanager_except(self):
state = []
@asynccontextmanager
async def woohoo():
state.append(1)
try:
yield 42
except ZeroDivisionError as e:
state.append(e.args[0])
self.assertEqual(state, [1, 42, 999])
async with woohoo() as x:
self.assertEqual(state, [1])
self.assertEqual(x, 42)
state.append(x)
raise ZeroDivisionError(999)
self.assertEqual(state, [1, 42, 999])

@_async_test
async def test_contextmanager_except_stopiter(self):
stop_exc = StopIteration('spam')
@asynccontextmanager
async def woohoo():
yield
try:
async with woohoo():
raise stop_exc
except Exception as ex:
self.assertIs(ex, stop_exc)
else:
self.fail('StopIteration was suppressed')

@_async_test
async def test_contextmanager_except_stopasynciter(self):
stop_exc = StopAsyncIteration('spam')
@asynccontextmanager
async def woohoo():
yield
try:
async with woohoo():
raise stop_exc
except Exception as ex:
self.assertIs(ex, stop_exc)
else:
self.fail('StopAsyncIteration was suppressed')

def _create_contextmanager_attribs(self):
def attribs(**kw):
def decorate(func):
for k,v in kw.items():
setattr(func,k,v)
return func
return decorate
@asynccontextmanager
@attribs(foo='bar')
async def baz(spam):
"""Whee!"""
yield
return baz

def test_contextmanager_attribs(self):
baz = self._create_contextmanager_attribs()
self.assertEqual(baz.__name__,'baz')
self.assertEqual(baz.foo, 'bar')

@support.requires_docstrings
def test_contextmanager_doc_attrib(self):
baz = self._create_contextmanager_attribs()
self.assertEqual(baz.__doc__, "Whee!")

@support.requires_docstrings
@_async_test
async def test_instance_docstring_given_cm_docstring(self):
baz = self._create_contextmanager_attribs()(None)
self.assertEqual(baz.__doc__, "Whee!")
async with baz:
pass # suppress warning

@_async_test
async def test_keywords(self):
# Ensure no keyword arguments are inhibited
@asynccontextmanager
async def woohoo(self, func, args, kwds):
yield (self, func, args, kwds)
async with woohoo(self=11, func=22, args=33, kwds=44) as target:
self.assertEqual(target, (11, 22, 33, 44))


class ClosingTestCase(unittest.TestCase):

@support.requires_docstrings
Expand Down
162 changes: 162 additions & 0 deletions Lib/test/test_contextlib_async.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
import asyncio
from contextlib import asynccontextmanager
from test import support
import unittest


def _async_test(func):
"""Decorator to turn an async function into a test case."""
def wrapper(*args, **kwargs):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, I know it's just a test, but please add @functools.wraps(func)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a wraps decorator.

loop = asyncio.get_event_loop()
coro = func(*args, **kwargs)
return loop.run_until_complete(coro)
return wrapper


class AsyncContextManagerTestCase(unittest.TestCase):

@_async_test
async def test_contextmanager_plain(self):
state = []
@asynccontextmanager
async def woohoo():
state.append(1)
yield 42
state.append(999)
async with woohoo() as x:
self.assertEqual(state, [1])
self.assertEqual(x, 42)
state.append(x)
self.assertEqual(state, [1, 42, 999])

@_async_test
async def test_contextmanager_finally(self):
state = []
@asynccontextmanager
async def woohoo():
state.append(1)
try:
yield 42
finally:
state.append(999)
with self.assertRaises(ZeroDivisionError):
async with woohoo() as x:
self.assertEqual(state, [1])
self.assertEqual(x, 42)
state.append(x)
raise ZeroDivisionError()
self.assertEqual(state, [1, 42, 999])

@_async_test
async def test_contextmanager_no_reraise(self):
@asynccontextmanager
async def whee():
yield
ctx = whee()
await ctx.__aenter__()
# Calling __exit__ should not result in an exception
self.assertFalse(await ctx.__aexit__(TypeError, TypeError("foo"), None))

@_async_test
async def test_contextmanager_trap_yield_after_throw(self):
@asynccontextmanager
async def whoo():
try:
yield
except:
yield
ctx = whoo()
await ctx.__aenter__()
with self.assertRaises(RuntimeError):
await ctx.__aexit__(TypeError, TypeError('foo'), None)

@_async_test
async def test_contextmanager_except(self):
state = []
@asynccontextmanager
async def woohoo():
state.append(1)
try:
yield 42
except ZeroDivisionError as e:
state.append(e.args[0])
self.assertEqual(state, [1, 42, 999])
async with woohoo() as x:
self.assertEqual(state, [1])
self.assertEqual(x, 42)
state.append(x)
raise ZeroDivisionError(999)
self.assertEqual(state, [1, 42, 999])

@_async_test
async def test_contextmanager_except_stopiter(self):
stop_exc = StopIteration('spam')
@asynccontextmanager
async def woohoo():
yield
try:
async with woohoo():
raise stop_exc
except Exception as ex:
self.assertIs(ex, stop_exc)
else:
self.fail('StopIteration was suppressed')

@_async_test
async def test_contextmanager_except_stopasynciter(self):
stop_exc = StopAsyncIteration('spam')
@asynccontextmanager
async def woohoo():
yield
try:
async with woohoo():
raise stop_exc
except Exception as ex:
self.assertIs(ex, stop_exc)
else:
self.fail('StopAsyncIteration was suppressed')

def _create_contextmanager_attribs(self):
def attribs(**kw):
def decorate(func):
for k,v in kw.items():
setattr(func,k,v)
return func
return decorate
@asynccontextmanager
@attribs(foo='bar')
async def baz(spam):
"""Whee!"""
yield
return baz

def test_contextmanager_attribs(self):
baz = self._create_contextmanager_attribs()
self.assertEqual(baz.__name__,'baz')
self.assertEqual(baz.foo, 'bar')

@support.requires_docstrings
def test_contextmanager_doc_attrib(self):
baz = self._create_contextmanager_attribs()
self.assertEqual(baz.__doc__, "Whee!")

@support.requires_docstrings
@_async_test
async def test_instance_docstring_given_cm_docstring(self):
baz = self._create_contextmanager_attribs()(None)
self.assertEqual(baz.__doc__, "Whee!")
async with baz:
pass # suppress warning

@_async_test
async def test_keywords(self):
# Ensure no keyword arguments are inhibited
@asynccontextmanager
async def woohoo(self, func, args, kwds):
yield (self, func, args, kwds)
async with woohoo(self=11, func=22, args=33, kwds=44) as target:
self.assertEqual(target, (11, 22, 33, 44))


if __name__ == '__main__':
unittest.main()
0