diff --git a/Lib/test/test_asyncgen.py b/Lib/test/test_asyncgen.py index 3a8d26f2253..c8024d659f1 100644 --- a/Lib/test/test_asyncgen.py +++ b/Lib/test/test_asyncgen.py @@ -1,11 +1,16 @@ import inspect import types import unittest +import contextlib from test.support.import_helper import import_module +from test.support import gc_collect asyncio = import_module("asyncio") +_no_default = object() + + class AwaitException(Exception): pass @@ -44,6 +49,37 @@ async def iterate(): return run_until_complete(iterate()) +def py_anext(iterator, default=_no_default): + """Pure-Python implementation of anext() for testing purposes. + + Closely matches the builtin anext() C implementation. + Can be used to compare the built-in implementation of the inner + coroutines machinery to C-implementation of __anext__() and send() + or throw() on the returned generator. + """ + + try: + __anext__ = type(iterator).__anext__ + except AttributeError: + raise TypeError(f'{iterator!r} is not an async iterator') + + if default is _no_default: + return __anext__(iterator) + + async def anext_impl(): + try: + # The C code is way more low-level than this, as it implements + # all methods of the iterator protocol. In this implementation + # we're relying on higher-level coroutine concepts, but that's + # exactly what we want -- crosstest pure-Python high-level + # implementation and low-level C anext() iterators. + return await __anext__(iterator) + except StopAsyncIteration: + return default + + return anext_impl() + + class AsyncGenSyntaxTest(unittest.TestCase): def test_async_gen_syntax_01(self): @@ -374,6 +410,380 @@ def tearDown(self): self.loop = None asyncio.set_event_loop_policy(None) + def check_async_iterator_anext(self, ait_class): + with self.subTest(anext="pure-Python"): + self._check_async_iterator_anext(ait_class, py_anext) + with self.subTest(anext="builtin"): + self._check_async_iterator_anext(ait_class, anext) + + def _check_async_iterator_anext(self, ait_class, anext): + g = ait_class() + async def consume(): + results = [] + results.append(await anext(g)) + results.append(await anext(g)) + results.append(await anext(g, 'buckle my shoe')) + return results + res = self.loop.run_until_complete(consume()) + self.assertEqual(res, [1, 2, 'buckle my shoe']) + with self.assertRaises(StopAsyncIteration): + self.loop.run_until_complete(consume()) + + async def test_2(): + g1 = ait_class() + self.assertEqual(await anext(g1), 1) + self.assertEqual(await anext(g1), 2) + with self.assertRaises(StopAsyncIteration): + await anext(g1) + with self.assertRaises(StopAsyncIteration): + await anext(g1) + + g2 = ait_class() + self.assertEqual(await anext(g2, "default"), 1) + self.assertEqual(await anext(g2, "default"), 2) + self.assertEqual(await anext(g2, "default"), "default") + self.assertEqual(await anext(g2, "default"), "default") + + return "completed" + + result = self.loop.run_until_complete(test_2()) + self.assertEqual(result, "completed") + + def test_send(): + p = ait_class() + obj = anext(p, "completed") + with self.assertRaises(StopIteration): + with contextlib.closing(obj.__await__()) as g: + g.send(None) + + test_send() + + async def test_throw(): + p = ait_class() + obj = anext(p, "completed") + self.assertRaises(SyntaxError, obj.throw, SyntaxError) + return "completed" + + result = self.loop.run_until_complete(test_throw()) + self.assertEqual(result, "completed") + + # TODO: RUSTPYTHON, NameError: name 'anext' is not defined + @unittest.expectedFailure + def test_async_generator_anext(self): + async def agen(): + yield 1 + yield 2 + self.check_async_iterator_anext(agen) + + # TODO: RUSTPYTHON, NameError: name 'anext' is not defined + @unittest.expectedFailure + def test_python_async_iterator_anext(self): + class MyAsyncIter: + """Asynchronously yield 1, then 2.""" + def __init__(self): + self.yielded = 0 + def __aiter__(self): + return self + async def __anext__(self): + if self.yielded >= 2: + raise StopAsyncIteration() + else: + self.yielded += 1 + return self.yielded + self.check_async_iterator_anext(MyAsyncIter) + + # TODO: RUSTPYTHON, NameError: name 'anext' is not defined + @unittest.expectedFailure + def test_python_async_iterator_types_coroutine_anext(self): + import types + class MyAsyncIterWithTypesCoro: + """Asynchronously yield 1, then 2.""" + def __init__(self): + self.yielded = 0 + def __aiter__(self): + return self + @types.coroutine + def __anext__(self): + if False: + yield "this is a generator-based coroutine" + if self.yielded >= 2: + raise StopAsyncIteration() + else: + self.yielded += 1 + return self.yielded + self.check_async_iterator_anext(MyAsyncIterWithTypesCoro) + + # TODO: RUSTPYTHON: async for gen expression compilation + # def test_async_gen_aiter(self): + # async def gen(): + # yield 1 + # yield 2 + # g = gen() + # async def consume(): + # return [i async for i in aiter(g)] + # res = self.loop.run_until_complete(consume()) + # self.assertEqual(res, [1, 2]) + + # TODO: RUSTPYTHON, NameError: name 'aiter' is not defined + @unittest.expectedFailure + def test_async_gen_aiter_class(self): + results = [] + class Gen: + async def __aiter__(self): + yield 1 + yield 2 + g = Gen() + async def consume(): + ait = aiter(g) + while True: + try: + results.append(await anext(ait)) + except StopAsyncIteration: + break + self.loop.run_until_complete(consume()) + self.assertEqual(results, [1, 2]) + + # TODO: RUSTPYTHON, NameError: name 'aiter' is not defined + @unittest.expectedFailure + def test_aiter_idempotent(self): + async def gen(): + yield 1 + applied_once = aiter(gen()) + applied_twice = aiter(applied_once) + self.assertIs(applied_once, applied_twice) + + # TODO: RUSTPYTHON, NameError: name 'anext' is not defined + @unittest.expectedFailure + def test_anext_bad_args(self): + async def gen(): + yield 1 + async def call_with_too_few_args(): + await anext() + async def call_with_too_many_args(): + await anext(gen(), 1, 3) + async def call_with_wrong_type_args(): + await anext(1, gen()) + async def call_with_kwarg(): + await anext(aiterator=gen()) + with self.assertRaises(TypeError): + self.loop.run_until_complete(call_with_too_few_args()) + with self.assertRaises(TypeError): + self.loop.run_until_complete(call_with_too_many_args()) + with self.assertRaises(TypeError): + self.loop.run_until_complete(call_with_wrong_type_args()) + with self.assertRaises(TypeError): + self.loop.run_until_complete(call_with_kwarg()) + + # TODO: RUSTPYTHON, NameError: name 'anext' is not defined + @unittest.expectedFailure + def test_anext_bad_await(self): + async def bad_awaitable(): + class BadAwaitable: + def __await__(self): + return 42 + class MyAsyncIter: + def __aiter__(self): + return self + def __anext__(self): + return BadAwaitable() + regex = r"__await__.*iterator" + awaitable = anext(MyAsyncIter(), "default") + with self.assertRaisesRegex(TypeError, regex): + await awaitable + awaitable = anext(MyAsyncIter()) + with self.assertRaisesRegex(TypeError, regex): + await awaitable + return "completed" + result = self.loop.run_until_complete(bad_awaitable()) + self.assertEqual(result, "completed") + + async def check_anext_returning_iterator(self, aiter_class): + awaitable = anext(aiter_class(), "default") + with self.assertRaises(TypeError): + await awaitable + awaitable = anext(aiter_class()) + with self.assertRaises(TypeError): + await awaitable + return "completed" + + # TODO: RUSTPYTHON, NameError: name 'anext' is not defined + @unittest.expectedFailure + def test_anext_return_iterator(self): + class WithIterAnext: + def __aiter__(self): + return self + def __anext__(self): + return iter("abc") + result = self.loop.run_until_complete(self.check_anext_returning_iterator(WithIterAnext)) + self.assertEqual(result, "completed") + + # TODO: RUSTPYTHON, NameError: name 'anext' is not defined + @unittest.expectedFailure + def test_anext_return_generator(self): + class WithGenAnext: + def __aiter__(self): + return self + def __anext__(self): + yield + result = self.loop.run_until_complete(self.check_anext_returning_iterator(WithGenAnext)) + self.assertEqual(result, "completed") + + # TODO: RUSTPYTHON, NameError: name 'anext' is not defined + @unittest.expectedFailure + def test_anext_await_raises(self): + class RaisingAwaitable: + def __await__(self): + raise ZeroDivisionError() + yield + class WithRaisingAwaitableAnext: + def __aiter__(self): + return self + def __anext__(self): + return RaisingAwaitable() + async def do_test(): + awaitable = anext(WithRaisingAwaitableAnext()) + with self.assertRaises(ZeroDivisionError): + await awaitable + awaitable = anext(WithRaisingAwaitableAnext(), "default") + with self.assertRaises(ZeroDivisionError): + await awaitable + return "completed" + result = self.loop.run_until_complete(do_test()) + self.assertEqual(result, "completed") + + # TODO: RUSTPYTHON, NameError: name 'anext' is not defined + @unittest.expectedFailure + def test_anext_iter(self): + @types.coroutine + def _async_yield(v): + return (yield v) + + class MyError(Exception): + pass + + async def agenfn(): + try: + await _async_yield(1) + except MyError: + await _async_yield(2) + return + yield + + def test1(anext): + agen = agenfn() + with contextlib.closing(anext(agen, "default").__await__()) as g: + self.assertEqual(g.send(None), 1) + self.assertEqual(g.throw(MyError, MyError(), None), 2) + try: + g.send(None) + except StopIteration as e: + err = e + else: + self.fail('StopIteration was not raised') + self.assertEqual(err.value, "default") + + def test2(anext): + agen = agenfn() + with contextlib.closing(anext(agen, "default").__await__()) as g: + self.assertEqual(g.send(None), 1) + self.assertEqual(g.throw(MyError, MyError(), None), 2) + with self.assertRaises(MyError): + g.throw(MyError, MyError(), None) + + def test3(anext): + agen = agenfn() + with contextlib.closing(anext(agen, "default").__await__()) as g: + self.assertEqual(g.send(None), 1) + g.close() + with self.assertRaisesRegex(RuntimeError, 'cannot reuse'): + self.assertEqual(g.send(None), 1) + + def test4(anext): + @types.coroutine + def _async_yield(v): + yield v * 10 + return (yield (v * 10 + 1)) + + async def agenfn(): + try: + await _async_yield(1) + except MyError: + await _async_yield(2) + return + yield + + agen = agenfn() + with contextlib.closing(anext(agen, "default").__await__()) as g: + self.assertEqual(g.send(None), 10) + self.assertEqual(g.throw(MyError, MyError(), None), 20) + with self.assertRaisesRegex(MyError, 'val'): + g.throw(MyError, MyError('val'), None) + + def test5(anext): + @types.coroutine + def _async_yield(v): + yield v * 10 + return (yield (v * 10 + 1)) + + async def agenfn(): + try: + await _async_yield(1) + except MyError: + return + yield 'aaa' + + agen = agenfn() + with contextlib.closing(anext(agen, "default").__await__()) as g: + self.assertEqual(g.send(None), 10) + with self.assertRaisesRegex(StopIteration, 'default'): + g.throw(MyError, MyError(), None) + + def test6(anext): + @types.coroutine + def _async_yield(v): + yield v * 10 + return (yield (v * 10 + 1)) + + async def agenfn(): + await _async_yield(1) + yield 'aaa' + + agen = agenfn() + with contextlib.closing(anext(agen, "default").__await__()) as g: + with self.assertRaises(MyError): + g.throw(MyError, MyError(), None) + + def run_test(test): + with self.subTest('pure-Python anext()'): + test(py_anext) + with self.subTest('builtin anext()'): + test(anext) + + run_test(test1) + run_test(test2) + run_test(test3) + run_test(test4) + run_test(test5) + run_test(test6) + + # TODO: RUSTPYTHON, NameError: name 'aiter' is not defined + @unittest.expectedFailure + def test_aiter_bad_args(self): + async def gen(): + yield 1 + async def call_with_too_few_args(): + await aiter() + async def call_with_too_many_args(): + await aiter(gen(), 1) + async def call_with_wrong_type_arg(): + await aiter(1) + with self.assertRaises(TypeError): + self.loop.run_until_complete(call_with_too_few_args()) + with self.assertRaises(TypeError): + self.loop.run_until_complete(call_with_too_many_args()) + with self.assertRaises(TypeError): + self.loop.run_until_complete(call_with_wrong_type_arg()) + async def to_list(self, gen): res = [] async for i in gen: @@ -663,6 +1073,7 @@ async def run(): await g.__anext__() await g.__anext__() del g + gc_collect() # For PyPy or other GCs. await asyncio.sleep(0.1) @@ -1079,6 +1490,89 @@ async def wait(): self.assertEqual(finalized, 2) + def test_async_gen_asyncio_shutdown_02(self): + messages = [] + + def exception_handler(loop, context): + messages.append(context) + + async def async_iterate(): + yield 1 + yield 2 + + it = async_iterate() + async def main(): + loop = asyncio.get_running_loop() + loop.set_exception_handler(exception_handler) + + async for i in it: + break + + asyncio.run(main()) + + self.assertEqual(messages, []) + + # TODO: RUSTPYTHON, ValueError: not enough values to unpack (expected 1, got 0) + @unittest.expectedFailure + def test_async_gen_asyncio_shutdown_exception_01(self): + messages = [] + + def exception_handler(loop, context): + messages.append(context) + + async def async_iterate(): + try: + yield 1 + yield 2 + finally: + 1/0 + + it = async_iterate() + async def main(): + loop = asyncio.get_running_loop() + loop.set_exception_handler(exception_handler) + + async for i in it: + break + + asyncio.run(main()) + + message, = messages + self.assertEqual(message['asyncgen'], it) + self.assertIsInstance(message['exception'], ZeroDivisionError) + self.assertIn('an error occurred during closing of asynchronous generator', + message['message']) + + # TODO: RUSTPYTHON, ValueError: not enough values to unpack (expected 1, got 0) + @unittest.expectedFailure + def test_async_gen_asyncio_shutdown_exception_02(self): + messages = [] + + def exception_handler(loop, context): + messages.append(context) + + async def async_iterate(): + try: + yield 1 + yield 2 + finally: + 1/0 + + async def main(): + loop = asyncio.get_running_loop() + loop.set_exception_handler(exception_handler) + + async for i in async_iterate(): + break + gc_collect() + + asyncio.run(main()) + + message, = messages + self.assertIsInstance(message['exception'], ZeroDivisionError) + self.assertIn('unhandled exception during asyncio.run() shutdown', + message['message']) + # TODO: RUSTPYTHON: async for gen expression compilation # def test_async_gen_expression_01(self): # async def arange(n): @@ -1096,6 +1590,7 @@ async def wait(): # res = self.loop.run_until_complete(run()) # self.assertEqual(res, [i * 2 for i in range(10)]) + # TODO: RUSTPYTHON: async for gen expression compilation # def test_async_gen_expression_02(self): # async def wrap(n): # await asyncio.sleep(0.01)