Step 13 - Deduplicate Running Coroutines

Currently the code for running a coroutine

try:
    next(coroutine)
except StopIteration as e:
    result = e.value

can be found twice in our code. Once in the Task class and another time in the Loop class. With the new add_done_callback also being available at the Task class (because it derives from the Future) we could create an initial root Task that runs the main coroutine in the Loop. If that root Task is done our Loop is done too.

Loop v5
from typing import Any, Callable, Generator

from handle import Handle


class Loop:
    """Loop v5"""

    _instance: "Loop" = None

    def __init__(self):
        self._running = False
        self._scheduled = []

    @classmethod
    def get_current_loop(cls) -> "Loop":
        if not cls._instance:
            cls._instance = Loop()
        return cls._instance

    def run_step(self) -> None:
        """Run a single step/tick of the loop"""
        # execute all current known handles only.
        # not the ones added while running the handle callbacks
        scheduled = self._scheduled.copy()
        self._scheduled.clear()

        try:
            handle = scheduled.pop(0)  # fifo: extract first item
            while handle is not None:
                handle.run()
                handle = scheduled.pop(0)  # fifo: extract first item
        except IndexError:
            # list is empty
            pass
        # a shorter and even more smarter version for the above code could be
        #
        # for _ in range(len(self._scheduled)):
        #     handle = self._scheduled.pop(0)  # fifo: extract first item
        #     handle.run()

    def run_loop(self) -> None:
        """Run the loop"""
        self._running = True
        step = 1
        while self._running:
            print("Loop step", step, self._scheduled)
            self.run_step()
            step += 1

    def run(self, coroutine: Generator[Any, None, Any]) -> Any:
        """Run a coroutine until it is done/completed"""
        from task import Task  # avoid cyclic dependency

        # create a root task for our coroutine
        # the tasks gets scheduled immediately in its constructor
        task = Task(coroutine, "Initial Task")
        task.add_done_callback(self._done)
        self.run_loop()
        return task.result()

    def stop(self) -> None:
        """Stop running the loop"""
        self._running = False

    def schedule(self, name: str, callback: Callable, *args: Any) -> None:
        """Schedule a callback for the next step/tick"""
        self._scheduled.append(Handle(name, callback, args))

    def _done(self, _future) -> None:
        self.stop()
Loop v5
--- loop4.py	2022-11-02 10:35:15.054206925 +0100
+++ loop5.py	2022-11-02 15:31:25.547688722 +0100
@@ -4,7 +4,7 @@
 
 
 class Loop:
-    """Loop v4"""
+    """Loop v5"""
 
     _instance: "Loop" = None
 
@@ -20,28 +20,44 @@
 
     def run_step(self) -> None:
         """Run a single step/tick of the loop"""
+        # execute all current known handles only.
+        # not the ones added while running the handle callbacks
+        scheduled = self._scheduled.copy()
+        self._scheduled.clear()
+
         try:
-            handle = self._scheduled.pop(0)  # fifo: extract first item
+            handle = scheduled.pop(0)  # fifo: extract first item
             while handle is not None:
                 handle.run()
-                handle = self._scheduled.pop(0)  # fifo: extract first item
+                handle = scheduled.pop(0)  # fifo: extract first item
         except IndexError:
             # list is empty
             pass
+        # a shorter and even more smarter version for the above code could be
+        #
+        # for _ in range(len(self._scheduled)):
+        #     handle = self._scheduled.pop(0)  # fifo: extract first item
+        #     handle.run()
 
-    def run(self, coroutine: Generator[Any, None, Any]) -> Any:
-        """Run a coroutine"""
+    def run_loop(self) -> None:
+        """Run the loop"""
         self._running = True
         step = 1
         while self._running:
             print("Loop step", step, self._scheduled)
-            try:
-                self.run_step()
-                next(coroutine)
-                step += 1
-            except StopIteration as e:
-                self._running = False
-                return e.value
+            self.run_step()
+            step += 1
+
+    def run(self, coroutine: Generator[Any, None, Any]) -> Any:
+        """Run a coroutine until it is done/completed"""
+        from task import Task  # avoid cyclic dependency
+
+        # create a root task for our coroutine
+        # the tasks gets scheduled immediately in its constructor
+        task = Task(coroutine, "Initial Task")
+        task.add_done_callback(self._done)
+        self.run_loop()
+        return task.result()
 
     def stop(self) -> None:
         """Stop running the loop"""
@@ -50,3 +66,6 @@
     def schedule(self, name: str, callback: Callable, *args: Any) -> None:
         """Schedule a callback for the next step/tick"""
         self._scheduled.append(Handle(name, callback, args))
+
+    def _done(self, _future) -> None:
+        self.stop()

The Loop doesn’t know anything about the Generator/Iterator protocol anymore. It just starts a coroutine in a Tasks and schedules Handles. Instead the Tasks are resuming the coroutines via the Generator/Iterator protocol.

from future import Future
from loop import Loop
from task import Task


def some_result(result):
    future = Future("Some Result")
    future.set_result(result)
    return (yield from future)


def add(coroutine1, coroutine2):
    task1 = Task(coroutine1, "Add X")
    task2 = Task(coroutine2, "Add Y")
    x = yield from task1
    y = yield from task2
    return x + y


def main():
    return (yield from add(some_result(1), some_result(2)))


loop = Loop.get_current_loop()
result = loop.run(main())
print("Loop finished with result", result)

Output:

Loop step 1 [<Handle name='Initial Task' callback='step'>]
Loop step 2 [<Handle name='Add X' callback='step'>, <Handle name='Add Y' callback='step'>]
Loop step 3 [<Handle name='Some Result' callback='_wakeup'>, <Handle name='Some Result' callback='_wakeup'>]
Loop step 4 [<Handle name='Add X' callback='step'>, <Handle name='Add Y' callback='step'>]
Loop step 5 [<Handle name='Add X' callback='_wakeup'>]
Loop step 6 [<Handle name='Initial Task' callback='step'>]
Loop step 7 [<Handle name='Add Y' callback='_wakeup'>]
Loop step 8 [<Handle name='Initial Task' callback='step'>]
Loop step 9 [<Handle name='Initial Task' callback='_done'>]
Loop finished with result 3

Summary

  • It’s possible have code to handle coroutines at a single place.

  • Tasks are dedicated to run coroutines.

  • Starting an initial root Task allows the Loop to be completely independent of coroutines.