Step 9 - A Job has to be done

Let us look at the example from the last step again, this time with print calls added to trace the control flow.

from loop import Loop


class Future:
    """Return a result in the future"""

    _result = None

    def set_result(self, result):
        self._result = result

    def __iter__(self):
        print(self.__class__.__name__, 1)
        yield
        print(self.__class__.__name__, 2)
        return self._result


def some_result(value):
    future = Future()
    future.set_result(value)
    print(some_result.__name__, 1)
    return (yield from future)


def add(coroutine1, coroutine2):
    print(add.__name__, 1)
    x = yield from coroutine1
    print(add.__name__, 2)
    y = yield from coroutine2
    print(add.__name__, 3)
    return x + y


def main():
    print(main.__name__, 1)
    result = yield from add(some_result(1), some_result(2))
    print(main.__name__, 2)
    return result


loop = Loop.get_current_loop()
result = loop.run(main())
print("Loop finished with result", result)

Output:

Loop step 1
main 1
add 1
some_result 1
Future 1
Loop step 2
Future 2
add 2
some_result 1
Future 1
Loop step 3
Future 2
add 3
main 2
Loop finished with result 3

When Future 1 is reached, the Future yields and control is given back to the loop. The loop then executes its next step, which resumes the Future. After Future 2 the result is available.

As you can see, there is still no real concurrency. Each coroutine runs until it has finished and its result is available; only then is the next coroutine started.

Yes, we can suspend the current function, but we still run all coroutines sequentially.
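
The sequential behaviour can be reproduced without the Loop class. The following is a minimal sketch, not the tutorial's implementation: run is a hypothetical stand-in for Loop.run (whose code is not shown in this step), and the events list is an illustrative replacement for the print calls.

```python
events = []  # illustrative trace, replaces the print calls


class Future:
    """Future v1 as used in the example above."""

    _result = None

    def set_result(self, result):
        self._result = result

    def __iter__(self):
        yield  # hand control back to whoever drives the generator
        return self._result


def some_result(value):
    events.append(f"start {value}")
    future = Future()
    future.set_result(value)
    result = yield from future
    events.append(f"finish {value}")
    return result


def add(coroutine1, coroutine2):
    x = yield from coroutine1
    y = yield from coroutine2
    return x + y


def run(coroutine):
    """Hypothetical stand-in for Loop.run: drive the coroutine step by step."""
    steps = 0
    while True:
        steps += 1
        try:
            next(coroutine)
        except StopIteration as stop:
            # The return value of a generator travels in StopIteration.value
            return stop.value, steps


result, steps = run(add(some_result(1), some_result(2)))
print(result, steps)
print(events)
```

The trace lists "start 1" and "finish 1" before "start 2": the second coroutine does not begin until the first one has returned, even though each of them suspends once.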

To change that, we need something new.

  1. This thing should take a coroutine, because all our business logic uses coroutines.

  2. It should run the coroutine immediately, because we want to get a job done.

  3. It should keep running until the job is finally done.

  4. And its result should be gathered in the future.

Because the result should be gathered in the future, let’s derive the new thing from Future. A job is worked on in a task, so let’s call this new thing Task. But first, let’s extend our Future a bit.

Future v2
from typing import Any, Generator


class Future:
    """Return a result in the future v2"""

    _result = None

    def __init__(self, name: str = None):
        self._name = name

    def set_result(self, result: Any) -> None:
        self._result = result

    def result(self) -> Any:
        return self._result

    def done(self) -> bool:
        return self.result() is not None

    def __repr__(self) -> str:
        return (
            f"<{self.__class__.__name__} name='{self._name}' "
            f"id='{hex(id(self))}'>"
        )

    def __iter__(self) -> Generator[None, None, Any]:
        yield
        return self.result()

Future v2 (diff against v1)
--- future1.py	2022-11-02 10:49:53.297744123 +0100
+++ future2.py	2022-11-02 10:50:00.133708767 +0100
@@ -2,13 +2,28 @@
 
 
 class Future:
-    """Return a result in the future v1"""
+    """Return a result in the future v2"""
 
     _result = None
 
-    def set_result(self, result: Any):
+    def __init__(self, name: str = None):
+        self._name = name
+
+    def set_result(self, result: Any) -> None:
         self._result = result
 
+    def result(self) -> Any:
+        return self._result
+
+    def done(self) -> bool:
+        return self.result() is not None
+
+    def __repr__(self) -> str:
+        return (
+            f"<{self.__class__.__name__} name='{self._name}' "
+            f"id='{hex(id(self))}'>"
+        )
+
     def __iter__(self) -> Generator[None, None, Any]:
         yield
-        return self._result
+        return self.result()

The Future class gained a done method to check whether the result has been set, and a result method to access the result from outside the class. This makes it possible to ask a Future whether it is done. In addition, a name can be passed to the constructor so that the Future is easier to identify via __repr__ when debugging in later steps. Note that done treats None as "no result yet", so in this version a Future whose result is None never reports being done.
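
As a small usage sketch, the new methods can be exercised directly. The Future class below is copied from the listing above; the variable names and the values 42 and None are only illustrative.

```python
from typing import Any, Generator


class Future:
    """Future v2, copied from the listing above."""

    _result = None

    def __init__(self, name: str = None):
        self._name = name

    def set_result(self, result: Any) -> None:
        self._result = result

    def result(self) -> Any:
        return self._result

    def done(self) -> bool:
        return self.result() is not None

    def __repr__(self) -> str:
        return (
            f"<{self.__class__.__name__} name='{self._name}' "
            f"id='{hex(id(self))}'>"
        )

    def __iter__(self) -> Generator[None, None, Any]:
        yield
        return self.result()


pending = Future("Pending")
state_before = pending.done()   # no result set yet
pending.set_result(42)
state_after = pending.done()    # a non-None result has been set

none_future = Future("None result")
none_future.set_result(None)
none_done = none_future.done()  # None cannot be told apart from "unset"

print(pending, state_before, state_after, pending.result())
print(none_future, none_done)
```

The second Future shows the limitation of comparing against None: a result has been set, yet done still returns False.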

Task v1
from typing import Any, Generator

from future import Future


class Task(Future):
    """Task v1"""

    def __init__(self, coroutine: Generator[Any, None, Any], name: str):
        super().__init__(name)
        self._coroutine = coroutine
        self.run_loop()

    def step(self) -> None:
        try:
            next(self._coroutine)
        except StopIteration as e:
            self.set_result(e.value)

    def run_loop(self) -> None:
        while not self.done():
            self.step()

The example from above, now with each coroutine wrapped in a Task:

from future import Future
from task import Task

from loop import Loop


def some_result(result):
    future = Future("Some Result")
    future.set_result(result)
    return (yield from future)


def add(coroutine1, coroutine2):
    x = yield from Task(coroutine1, "Add X")
    y = yield from Task(coroutine2, "Add Y")
    return x + y


def main():
    return (yield from add(some_result(1), some_result(2)))


loop = Loop.get_current_loop()
result = loop.run(main())
print("Loop finished with result", result)

Output:

Loop step 1
Loop step 2
Loop step 3
Loop finished with result 3

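First of all, the functionality and the result haven’t changed. We just introduced a new class that wraps our coroutines. Because a Task runs its coroutine to completion inside its constructor, the coroutines are still executed one after another. We will get to concurrently running Tasks in the next step.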
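
To see why nothing runs concurrently yet, it helps to trace when a Task actually executes its coroutine. The sketch below copies Future v2 (without __repr__) and Task v1 from the listings above; job, parent, and the events list are hypothetical helpers added for illustration. The job returns a non-None value on purpose: with the current done check, a coroutine that returns None would keep run_loop spinning forever.

```python
from typing import Any, Generator

events = []  # illustrative trace of what runs when


class Future:
    _result = None

    def __init__(self, name: str = None):
        self._name = name

    def set_result(self, result: Any) -> None:
        self._result = result

    def result(self) -> Any:
        return self._result

    def done(self) -> bool:
        return self.result() is not None

    def __iter__(self) -> Generator[None, None, Any]:
        yield
        return self.result()


class Task(Future):
    def __init__(self, coroutine: Generator[Any, None, Any], name: str):
        super().__init__(name)
        self._coroutine = coroutine
        self.run_loop()

    def step(self) -> None:
        try:
            next(self._coroutine)
        except StopIteration as e:
            self.set_result(e.value)

    def run_loop(self) -> None:
        while not self.done():
            self.step()


def job(label):
    """Hypothetical coroutine that suspends once before returning."""
    events.append(f"{label} started")
    yield
    events.append(f"{label} resumed")
    return label.upper()


task = Task(job("x"), "Demo")
events.append("constructor returned")


def parent():
    # Awaiting a finished Task still yields once via Future.__iter__
    return (yield from task)


gen = parent()
next(gen)  # runs up to the single yield in Future.__iter__
try:
    next(gen)
except StopIteration as stop:
    final = stop.value

print(events, task.done(), final)
```

The job has both started and resumed before the constructor returns, so the Task is already done by the time anything can yield from it. The yield from then only hands control back once and picks up the stored result.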

Summary

  • We have a basic Task class that runs a coroutine in a loop.

  • This coroutine is started immediately.

  • The Task returns the result of the coroutine in the future when it’s done.