Step 11 - Schedule Callbacks

The Loop is currently limited to scheduling Tasks and running coroutines. We could introduce a new Protocol to abstract the scheduling of Tasks, but in the end we just need to schedule some callbacks (which are actually Callables) together with their arguments. Therefore we introduce an internal class Handle.

Handle
from typing import Any, Callable, Iterable


class Handle:
    """A callback handle

    Bundles a callable with the positional arguments it should be called
    with, plus a name that is only used for debugging output.
    """

    def __init__(self, name: str, callback: Callable, args: Iterable[Any]):
        self._name = name
        self._callback = callback
        # Snapshot the arguments: a one-shot iterable (e.g. a generator)
        # would otherwise be exhausted by the first run().
        self._args = tuple(args)

    def run(self) -> None:
        """Invoke the callback with the stored arguments."""
        self._callback(*self._args)

    def __repr__(self) -> str:
        # Not every callable has __name__ (functools.partial objects and
        # instances defining __call__ do not), so fall back to repr()
        # instead of raising while the loop prints its queue.
        callback_name = getattr(self._callback, "__name__", None)
        if callback_name is None:
            callback_name = repr(self._callback)
        return (
            f"<{self.__class__.__name__} name='{self._name}' "
            f"callback='{callback_name}'>"
        )

Let’s arrange our Loop to schedule Handle instances.

Loop v4
from typing import Any, Callable, Generator

from handle import Handle


class Loop:
    """Loop v4

    A minimal event loop: it keeps a FIFO queue of Handle instances
    (callback plus arguments) and drives a single root coroutine.
    """

    # Lazily created loop shared by every get_current_loop() caller.
    _instance: "Loop" = None

    def __init__(self):
        self._running = False
        self._scheduled = []  # pending Handle instances, oldest first

    @classmethod
    def get_current_loop(cls) -> "Loop":
        """Return the shared Loop, creating it on first use."""
        if cls._instance is None:
            cls._instance = Loop()
        return cls._instance

    def run_step(self) -> None:
        """Run a single step/tick of the loop

        Handles are run in FIFO order until the queue is empty; this
        includes handles that callbacks schedule while the step is running.
        Any exception raised by a callback propagates to the caller.
        """
        # Test for emptiness explicitly instead of catching IndexError from
        # pop(): a broad except around handle.run() would also swallow an
        # IndexError raised inside a callback.
        while self._scheduled:
            handle = self._scheduled.pop(0)  # fifo: extract first item
            handle.run()

    def run(self, coroutine: Generator[Any, None, Any]) -> Any:
        """Run a coroutine

        Alternates between draining the scheduled callbacks and advancing
        ``coroutine`` by one step, until the coroutine finishes or stop()
        is called.

        Returns the coroutine's return value, or None if the loop was
        stopped before the coroutine finished.
        """
        self._running = True
        step = 1
        try:
            while self._running:
                print("Loop step", step, self._scheduled)
                self.run_step()
                # Only a StopIteration coming from the coroutine itself
                # means "finished"; one leaking out of a callback must not
                # be mistaken for the coroutine's result.
                try:
                    next(coroutine)
                except StopIteration as e:
                    return e.value
                step += 1
        finally:
            # Leave the loop in a consistent state on every exit path,
            # including exceptions raised by callbacks or the coroutine.
            self._running = False
        return None

    def stop(self) -> None:
        """Stop running the loop"""
        self._running = False

    def schedule(self, name: str, callback: Callable, *args: Any) -> None:
        """Schedule a callback for the next step/tick

        ``name`` is only used for debugging output; ``args`` are passed
        positionally to ``callback`` when its Handle is run.
        """
        self._scheduled.append(Handle(name, callback, args))
Loop v4
--- loop3.py	2022-11-02 10:33:10.882781008 +0100
+++ loop4.py	2022-11-02 10:35:15.054206925 +0100
@@ -1,8 +1,10 @@
-from typing import Any, Generator
+from typing import Any, Callable, Generator
+
+from handle import Handle
 
 
 class Loop:
-    """Loop v3"""
+    """Loop v4"""
 
     _instance: "Loop" = None
 
@@ -19,10 +21,10 @@
     def run_step(self) -> None:
         """Run a single step/tick of the loop"""
         try:
-            task = self._scheduled.pop(0)  # fifo: extract first item
-            while task is not None:
-                task.step()
-                task = self._scheduled.pop(0)  # fifo: extract first item
+            handle = self._scheduled.pop(0)  # fifo: extract first item
+            while handle is not None:
+                handle.run()
+                handle = self._scheduled.pop(0)  # fifo: extract first item
         except IndexError:
             # list is empty
             pass
@@ -45,6 +47,6 @@
         """Stop running the loop"""
         self._running = False
 
-    def schedule(self, task) -> None:
-        """Schedule a Task for the next step/tick"""
-        self._scheduled.append(task)
+    def schedule(self, name: str, callback: Callable, *args: Any) -> None:
+        """Schedule a callback for the next step/tick"""
+        self._scheduled.append(Handle(name, callback, args))

The scheduler method (run_step) now loops over Handle instances instead of Tasks. The schedule method got adjusted to accept a Callable as callback and some arguments. These are put into a Handle which gets scheduled for the next step/tick.

Task v3
from typing import Any, Generator

from future import Future

from loop import Loop


class Task(Future):
    """Task v3

    A Future that drives a coroutine: every step() advances the coroutine
    once, and the Task keeps re-scheduling its own step method on the loop
    until the coroutine returns.
    """

    def __init__(self, coroutine: Generator[Any, None, Any], name: str):
        super().__init__(name)
        self._coroutine = coroutine
        self._loop = Loop.get_current_loop()
        # Queue the first step right away.
        self.schedule()

    def step(self) -> None:
        """Advance the coroutine once; store its result when it finishes."""
        try:
            next(self._coroutine)
        except StopIteration as finished:
            # The coroutine returned: its value becomes the Task's result.
            self.set_result(finished.value)
            return
        # Still running, so ask the loop for another step.
        self.schedule()

    def schedule(self) -> None:
        """Hand the step method (not the Task itself) to the loop."""
        self._loop.schedule(self._name, self.step)
Task v3
--- task2.py	2022-11-02 10:08:05.706252365 +0100
+++ task3.py	2022-11-02 10:10:30.317376323 +0100
@@ -6,7 +6,7 @@
 
 
 class Task(Future):
-    """Task v2"""
+    """Task v3"""
 
     def __init__(self, coroutine: Generator[Any, None, Any], name: str):
         super().__init__(name)
@@ -24,4 +24,4 @@
             self.schedule()
 
     def schedule(self) -> None:
-        self._loop.schedule(self)
+        self._loop.schedule(self._name, self.step)

The Task requires only a small change. Instead of passing itself to the loop, it passes its step method to be scheduled, along with its name for debugging purposes.

from future import Future
from task import Task

from loop import Loop


def some_result(result):
    """Coroutine that waits on a Future already resolved with ``result``."""
    resolved = Future("Some Result")
    resolved.set_result(result)
    value = yield from resolved
    return value


def add(coroutine1, coroutine2):
    """Coroutine that wraps both inputs in Tasks and returns their sum."""
    # Create both Tasks before waiting on either one, so that both get
    # scheduled on the loop up front.
    first = Task(coroutine1, "Add X")
    second = Task(coroutine2, "Add Y")
    lhs = yield from first
    rhs = yield from second
    return lhs + rhs


def main():
    """Root coroutine: add the results of two some_result() coroutines."""
    total = yield from add(some_result(1), some_result(2))
    return total


# Fetch the shared loop instance (it is created on first access).
loop = Loop.get_current_loop()
# Drive main() to completion; run() hands back the coroutine's return value.
result = loop.run(main())
print("Loop finished with result", result)

Output:

Loop step 1 []
Loop step 2 [<Handle name='Add X' callback='step'>, <Handle name='Add Y' callback='step'>]
Loop step 3 []
Loop finished with result 3

Summary

  • Using a callback abstraction makes the scheduling more generic.

  • Our loop can schedule and run all kinds of things using a callback.

  • Internally the Loop schedules Handle instances.