Step 14 - Let’s wait
Often it is desirable to start several coroutines concurrently and wait for
their results. This can be accomplished with a wait function that wraps each
of them in a Task.
# Coroutines here are plain generators. asyncio.iscoroutine() rejects those
# on Python 3.12+, so inspect.isgenerator() is used instead.
from inspect import isgenerator
from typing import Any, Generator, Iterable, List, Union

from future import Future
from task import Task

Coroutine = Generator[Any, None, Any]


def ensure_future(coro_or_future: Union[Coroutine, Future]) -> Future:
    """
    Ensure that passed object is a Future.

    If it is already a Future it is returned. If it is a coroutine a Task
    wrapping the coroutine is returned.
    """
    if isinstance(coro_or_future, Future):
        return coro_or_future
    elif isgenerator(coro_or_future):
        return Task(coro_or_future, f"Task for {coro_or_future.__name__}")
    else:
        raise TypeError("A Future or a coroutine is required")


def wait(
    coros_or_futures: Iterable[Union[Coroutine, Future]]
) -> Generator[Any, None, List[Any]]:
    """
    Wait for coroutines or futures to finish and gather their results as a list
    """
    futures = [ensure_future(f) for f in coros_or_futures]
    if not futures:
        # nothing to wait for; the waiter would never be woken up
        return []
    # count the materialized list: a generic Iterable may not support len()
    counter = len(futures)
    waiter = Future(f"Waiter for {', '.join([str(f) for f in futures])}")

    def _on_completion(_future):
        nonlocal counter
        counter -= 1
        if counter <= 0:
            # all results are available, so wake up the waiter
            waiter.set_result(None)

    for future in futures:
        future.add_done_callback(_on_completion)

    yield from waiter
    return [f.result() for f in futures]
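The countdown inside wait can be studied in isolation. The following is a minimal sketch, not the tutorial's actual classes: MiniFuture and make_waiter are hypothetical names introduced only for illustration, and MiniFuture runs its callbacks immediately rather than scheduling them on a loop. It shows that the waiter completes only after the last future does, and that results come back in input order regardless of completion order.

```python
from typing import Any, Callable, List


class MiniFuture:
    """Hypothetical stand-in for the tutorial's Future (no event loop)."""

    def __init__(self) -> None:
        self._result: Any = None
        self._done = False
        self._callbacks: List[Callable[["MiniFuture"], None]] = []

    def add_done_callback(self, callback: Callable[["MiniFuture"], None]) -> None:
        self._callbacks.append(callback)

    def set_result(self, result: Any) -> None:
        self._result = result
        self._done = True
        # Simplification: run callbacks immediately instead of scheduling them.
        for callback in self._callbacks:
            callback(self)

    def done(self) -> bool:
        return self._done

    def result(self) -> Any:
        return self._result


def make_waiter(futures: List[MiniFuture]) -> MiniFuture:
    """Return a future that completes once every future in `futures` has."""
    waiter = MiniFuture()
    counter = len(futures)

    def _on_completion(_future: MiniFuture) -> None:
        nonlocal counter
        counter -= 1
        if counter <= 0:
            waiter.set_result(None)

    for future in futures:
        future.add_done_callback(_on_completion)
    return waiter


first, second = MiniFuture(), MiniFuture()
waiter = make_waiter([first, second])

second.set_result("b")             # completion order does not matter
still_waiting = not waiter.done()  # one future is still pending
first.set_result("a")
results = [f.result() for f in (first, second)]  # input order is preserved
```

Because the results are read from the futures list rather than collected inside the callback, the order of the returned list depends only on the order of the inputs.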
Using the wait function, our previous example from Step 13 can be rewritten as:
from future import Future
from loop import Loop
from wait import wait


def some_result(result):
    future = Future("Some Result")
    future.set_result(result)
    return (yield from future)


def add(coroutine1, coroutine2):
    x, y = yield from wait([coroutine1, coroutine2])
    return x + y


def main():
    return (yield from add(some_result(1), some_result(2)))


loop = Loop.get_current_loop()
result = loop.run(main())
print("Loop finished with result", result)
Output:
Loop step 1 [<Handle name='Initial Task' callback='step'>]
Loop step 2 [<Handle name='Task for some_result' callback='step'>, <Handle name='Task for some_result' callback='step'>]
Loop step 3 [<Handle name='Some Result' callback='_wakeup'>, <Handle name='Some Result' callback='_wakeup'>]
Loop step 4 [<Handle name='Task for some_result' callback='step'>, <Handle name='Task for some_result' callback='step'>]
Loop step 5 [<Handle name='Task for some_result' callback='_on_completion'>, <Handle name='Task for some_result' callback='_on_completion'>]
Loop step 6 [<Handle name='Waiter for <Task name='Task for some_result' id='0x7fe9759a5e40'>, <Task name='Task for some_result' id='0x7fe9759a5f00'>' callback='_wakeup'>]
Loop step 7 [<Handle name='Initial Task' callback='step'>]
Loop step 8 [<Handle name='Initial Task' callback='_done'>]
Loop finished with result 3
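For comparison, the standard library's asyncio.gather plays a similar role for native coroutines: it wraps the awaitables in tasks, runs them concurrently, and returns their results as a list in input order. The sketch below mirrors the example above under that assumption. It uses async/await and asyncio.run instead of this tutorial's Loop, and asyncio.sleep(0) is only an approximation of awaiting an already resolved Future.

```python
import asyncio


async def some_result(result):
    # Yield control once, roughly like awaiting an already resolved future.
    await asyncio.sleep(0)
    return result


async def add(coroutine1, coroutine2):
    # gather wraps both coroutines in tasks and returns results in input order
    x, y = await asyncio.gather(coroutine1, coroutine2)
    return x + y


async def main():
    return await add(some_result(1), some_result(2))


result = asyncio.run(main())
print("Loop finished with result", result)
```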
Summary
Using a dedicated wait function makes it easier to gather the results of
concurrent Tasks.