Step 12 - Back to the Future

Currently a Future is done in exactly two steps/ticks of the Loop.

from future import Future

future = Future()
it = iter(future)
# step 1
next(it)
# step 2
# still no result yet
next(it)

A StopIteration exception is raised without a value set at the second step.

Hint

Remember yield from future is the same as

it = iter(future)
while True:
  try:
    value = next(it)
    yield value
  except StopIteration as e:
    return e.value

What if we need to wait for more then one step/tick for example to wait for some external event providing the result?

Future v3
from typing import Any, Callable, Generator

from loop import Loop


class Future:
    """Return a result in the future v3"""

    _result = None
    _done = False

    def __init__(self, name: str = None):
        self._name = name
        self._callbacks = []
        self._loop = Loop.get_current_loop()

    def set_result(self, result: Any):
        self._result = result
        self._done = True
        self._schedule_callbacks()

    def result(self) -> Any:
        return self._result

    def done(self) -> bool:
        return self._done

    def add_done_callback(self, fn: Callable[["Future"], None]) -> None:
        if self.done():
            # we already have a result
            self._loop.schedule(self._name, fn, self)
        else:
            self._callbacks.append(fn)

    def _schedule_callbacks(self) -> None:
        if not self._callbacks:
            return

        callbacks = self._callbacks.copy()
        self._callbacks.clear()

        for callback in callbacks:
            self._loop.schedule(self._name, callback, self)

    def __repr__(self) -> str:
        return (
            f"<{self.__class__.__name__} name='{self._name}' "
            f"id='{hex(id(self))}'>"
        )

    def __iter__(self) -> Generator["Future", None, Any]:
        yield self  # some new magic
        return self.result()
Future v3
--- future2.py	2022-11-02 10:50:00.133708767 +0100
+++ future3.py	2022-11-02 10:37:42.421495887 +0100
@@ -1,22 +1,46 @@
-from typing import Any, Generator
+from typing import Any, Callable, Generator
+
+from loop import Loop
 
 
 class Future:
-    """Return a result in the future v2"""
+    """Return a result in the future v3"""
 
     _result = None
+    _done = False
 
     def __init__(self, name: str = None):
         self._name = name
+        self._callbacks = []
+        self._loop = Loop.get_current_loop()
 
-    def set_result(self, result: Any) -> None:
+    def set_result(self, result: Any):
         self._result = result
+        self._done = True
+        self._schedule_callbacks()
 
     def result(self) -> Any:
         return self._result
 
     def done(self) -> bool:
-        return self.result() is not None
+        return self._done
+
+    def add_done_callback(self, fn: Callable[["Future"], None]) -> None:
+        if self.done():
+            # we already have a result
+            self._loop.schedule(self._name, fn, self)
+        else:
+            self._callbacks.append(fn)
+
+    def _schedule_callbacks(self) -> None:
+        if not self._callbacks:
+            return
+
+        callbacks = self._callbacks.copy()
+        self._callbacks.clear()
+
+        for callback in callbacks:
+            self._loop.schedule(self._name, callback, self)
 
     def __repr__(self) -> str:
         return (
@@ -24,6 +48,6 @@
             f"id='{hex(id(self))}'>"
         )
 
-    def __iter__(self) -> Generator[None, None, Any]:
-        yield
+    def __iter__(self) -> Generator["Future", None, Any]:
+        yield self  # some new magic
         return self.result()

The Future is extended to allow adding callbacks that are executed after a result is set. This allows for getting notified when a Future is done. When the Future is done the callbacks are scheduled for the next step/tick in our Loop.

But the actual waiting for some external event providing the result is not implemented directly in a Future. Instead the Task is extended.

Task v4
from typing import Any, Generator

from future import Future

from loop import Loop


class Task(Future):
    """Task v4"""

    def __init__(self, coroutine: Generator[Any, None, Any], name: str):
        super().__init__(name)
        self._coroutine = coroutine
        self._loop = Loop.get_current_loop()
        self.schedule()

    def step(self) -> None:
        try:
            yielded = next(self._coroutine)
        except StopIteration as e:
            self.set_result(e.value)
        else:
            # no result yet
            if isinstance(yielded, Future):
                # we are blocked by some external event for example waiting for
                # incoming data. let's wait until result is available
                yielded.add_done_callback(self._wakeup)
            else:
                # just schedule again
                self.schedule()

    def schedule(self) -> None:
        self._loop.schedule(self._name, self.step)

    def _wakeup(self, _future: Future) -> None:
        self.schedule()
Task v4
--- task3.py	2022-11-02 10:10:30.317376323 +0100
+++ task4.py	2022-11-02 10:42:23.012079415 +0100
@@ -6,7 +6,7 @@
 
 
 class Task(Future):
-    """Task v3"""
+    """Task v4"""
 
     def __init__(self, coroutine: Generator[Any, None, Any], name: str):
         super().__init__(name)
@@ -16,12 +16,21 @@
 
     def step(self) -> None:
         try:
-            next(self._coroutine)
+            yielded = next(self._coroutine)
         except StopIteration as e:
             self.set_result(e.value)
         else:
             # no result yet
-            self.schedule()
+            if isinstance(yielded, Future):
+                # we are blocked by some external event for example waiting for
+                # incoming data. let's wait until result is available
+                yielded.add_done_callback(self._wakeup)
+            else:
+                # just schedule again
+                self.schedule()
 
     def schedule(self) -> None:
         self._loop.schedule(self._name, self.step)
+
+    def _wakeup(self, _future: Future) -> None:
+        self.schedule()

The Task is extended to run a coroutine that is actually being blocked by some external event. The coroutine indicates this by yielding a Future. If a Future is yielded the Tasks registers a wake up callback at this Future. The Task is suspended now (it isn’t scheduled in the loop anymore). If the Future is done the Task is notified and scheduled again via the Loop. This allows the Task to sleep for one or more scheduler ticks/steps until some event has occurred.

from future import Future
from task import Task

from loop import Loop


def some_result(result):
    future = Future("Some Result")
    future.set_result(result)
    return (yield from future)


def add(coroutine1, coroutine2):
    task1 = Task(coroutine1, "Add X")
    task2 = Task(coroutine2, "Add Y")
    x = yield from task1
    y = yield from task2
    return x + y


def main():
    return (yield from add(some_result(1), some_result(2)))


loop = Loop.get_current_loop()
result = loop.run(main())
print("Loop finished with result", result)

Output:

Loop step 1 []
Loop step 2 [<Handle name='Add X' callback='step'>, <Handle name='Add Y' callback='step'>]
Loop step 3 []
Loop finished with result 3

Summary

  • A Task can wait for results that are currently blocked.

  • If a Task gets a Future from a coroutine it blocks until the Future is done.

  • Blocked means the Task is suspended, it is not scheduled in the Loop anymore and the result is not available yet.

  • If the result is available finally the Task gets notified and schedules itself for the next step/tick in the Loop.