Step 14 - Let’s wait

Often it is desirable to start several coroutines concurrently and wait for all of their results. This can be accomplished with a wait function that wraps each coroutine in a Task and resumes its caller once every Task has finished.

wait.py
from inspect import isgenerator
from typing import Any, Generator, Iterable, List, Union

from future import Future
from task import Task

Coroutine = Generator[Any, None, Any]


def ensure_future(coro_or_future: Union[Coroutine, Future]) -> Future:
    """
    Ensure that the passed object is a Future.

    If it is already a Future it is returned. If it is a coroutine a Task
    wrapping the coroutine is returned.
    """
    # Coroutines in this tutorial are plain generators. asyncio.iscoroutine()
    # returns False for generators since Python 3.12, so isgenerator() is used.
    if isinstance(coro_or_future, Future):
        return coro_or_future
    elif isgenerator(coro_or_future):
        return Task(coro_or_future, f"Task for {coro_or_future.__name__}")
    else:
        raise TypeError("A Future or a coroutine is required")


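def wait(
    coros_or_futures: Iterable[Union[Coroutine, Future]]
) -> Generator[Any, None, List[Any]]:
    """
    Wait for coroutines or futures to finish and gather their results as a
    list, in the same order as the input.
    """
    # Build the list first: a general Iterable need not support len().
    futures = [ensure_future(f) for f in coros_or_futures]
    counter = len(futures)
    if not futures:
        # Nothing to wait for; without this the waiter would never complete.
        return []

    waiter = Future(f"Waiter for {', '.join([str(f) for f in futures])}")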

    def _on_completion(_future):
        nonlocal counter
        counter -= 1
        if counter <= 0:
            # All results are available, so wake up the waiting coroutine.
            waiter.set_result(None)

    for future in futures:
        future.add_done_callback(_on_completion)

    yield from waiter

    return [f.result() for f in futures]
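
The countdown at the heart of wait can be tried in isolation. The following is a minimal sketch, not the tutorial's implementation: MiniFuture and make_waiter are hypothetical stand-ins, and MiniFuture runs its callbacks synchronously instead of scheduling them on the Loop, so the snippet runs without future.py, task.py or loop.py.

```python
from typing import Any, Callable, List


class MiniFuture:
    """Hypothetical stand-in for Future; callbacks run synchronously."""

    def __init__(self, name: str) -> None:
        self.name = name
        self._done = False
        self._result: Any = None
        self._callbacks: List[Callable[["MiniFuture"], None]] = []

    def add_done_callback(self, callback: Callable[["MiniFuture"], None]) -> None:
        if self._done:
            callback(self)
        else:
            self._callbacks.append(callback)

    def set_result(self, result: Any) -> None:
        self._result = result
        self._done = True
        for callback in self._callbacks:
            callback(self)

    def done(self) -> bool:
        return self._done

    def result(self) -> Any:
        return self._result


def make_waiter(futures: List[MiniFuture]) -> MiniFuture:
    """Return a future that completes once every given future has completed."""
    waiter = MiniFuture("waiter")
    remaining = len(futures)

    def _on_completion(_future: MiniFuture) -> None:
        nonlocal remaining
        remaining -= 1
        if remaining == 0:
            waiter.set_result(None)

    for future in futures:
        future.add_done_callback(_on_completion)
    return waiter


first, second = MiniFuture("first"), MiniFuture("second")
waiter = make_waiter([first, second])

second.set_result(2)          # completes out of order
after_one = waiter.done()     # still waiting for `first`
first.set_result(1)
after_both = waiter.done()    # counter reached zero
results = [f.result() for f in (first, second)]

print(after_one, after_both, results)  # prints: False True [1, 2]
```

Because the results are read from the futures list only after the waiter has completed, they come back in input order, not in completion order.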

Using the wait function, the example from Step 13 can be rewritten as:

from future import Future
from loop import Loop
from wait import wait


def some_result(result):
    future = Future("Some Result")
    future.set_result(result)
    return (yield from future)


def add(coroutine1, coroutine2):
    x, y = yield from wait([coroutine1, coroutine2])
    return x + y


def main():
    return (yield from add(some_result(1), some_result(2)))


loop = Loop.get_current_loop()
result = loop.run(main())
print("Loop finished with result", result)

Output:

Loop step 1 [<Handle name='Initial Task' callback='step'>]
Loop step 2 [<Handle name='Task for some_result' callback='step'>, <Handle name='Task for some_result' callback='step'>]
Loop step 3 [<Handle name='Some Result' callback='_wakeup'>, <Handle name='Some Result' callback='_wakeup'>]
Loop step 4 [<Handle name='Task for some_result' callback='step'>, <Handle name='Task for some_result' callback='step'>]
Loop step 5 [<Handle name='Task for some_result' callback='_on_completion'>, <Handle name='Task for some_result' callback='_on_completion'>]
Loop step 6 [<Handle name='Waiter for <Task name='Task for some_result' id='0x7fe9759a5e40'>, <Task name='Task for some_result' id='0x7fe9759a5f00'>' callback='_wakeup'>]
Loop step 7 [<Handle name='Initial Task' callback='step'>]
Loop step 8 [<Handle name='Initial Task' callback='_done'>]
Loop finished with result 3
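
For comparison, the standard library's closest counterpart to this wait function is asyncio.gather, which also returns the results as a list in argument order (asyncio.wait, despite the name, returns sets of done and pending tasks instead). The sketch below is a rough equivalent of the example using async/await and the real asyncio event loop; the asyncio.sleep(0) call is only an assumed placeholder for actual asynchronous work.

```python
import asyncio


async def some_result(result):
    # Placeholder for real asynchronous work: yield to the event loop once.
    await asyncio.sleep(0)
    return result


async def add(coroutine1, coroutine2):
    # gather() wraps each coroutine in a Task and runs them concurrently.
    x, y = await asyncio.gather(coroutine1, coroutine2)
    return x + y


async def main():
    return await add(some_result(1), some_result(2))


result = asyncio.run(main())
print("Loop finished with result", result)  # prints: Loop finished with result 3
```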

Summary

  • Dedicated wait functions make it easier to gather the results of concurrent Tasks.