Step 10 - Scheduling all the time

The last step introduced a new Task class that runs a loop over a provided coroutine. But wait, isn’t there an existing loop already? Yes, indeed: our Loop class. Why not extend the Loop class to run this loop too?

Loop v3
from typing import Any, Generator


class Loop:
    """Loop v3

    A minimal event loop that drives one main coroutine and, before each
    advance of that coroutine, steps every task waiting in its queue.
    """

    # Shared loop instance, created lazily by get_current_loop()
    _instance: "Loop" = None

    def __init__(self):
        self._running = False
        # FIFO queue of tasks waiting for their next step()
        self._scheduled = []

    @classmethod
    def get_current_loop(cls) -> "Loop":
        """Return the shared Loop instance, creating it on first use"""
        if not cls._instance:
            cls._instance = Loop()
        return cls._instance

    def run_step(self) -> None:
        """Run a single step/tick of the loop

        Tasks are stepped in FIFO order until the queue is empty. A task that
        schedules itself again while being stepped is appended to the same
        queue and is therefore stepped again within this call.

        Any exception raised by a task's step() propagates to the caller; in
        particular an IndexError coming from task code is no longer mistaken
        for "the queue is empty".
        """
        while self._scheduled:
            task = self._scheduled.pop(0)  # fifo: extract first item
            if task is None:
                # a None entry ends the tick early; later entries stay queued
                break
            task.step()

    def run(self, coroutine: Generator[Any, None, Any]) -> Any:
        """Run a coroutine

        Each iteration prints the step number together with the queued tasks,
        runs the queued tasks and then advances ``coroutine`` once.

        Returns the coroutine's return value, or None if stop() ended the
        loop before the coroutine finished.
        """
        self._running = True
        step = 1
        while self._running:
            print("Loop step", step, self._scheduled)
            self.run_step()
            # Only the main coroutine's own StopIteration means "finished"
            try:
                next(coroutine)
            except StopIteration as e:
                self._running = False
                return e.value
            step += 1

    def stop(self) -> None:
        """Stop running the loop"""
        self._running = False

    def schedule(self, task) -> None:
        """Schedule a Task for the next step/tick"""
        self._scheduled.append(task)
Loop v3
--- loop2.py	2022-11-02 10:06:59.866639812 +0100
+++ loop3.py	2022-11-02 10:33:10.882781008 +0100
@@ -2,12 +2,13 @@
 
 
 class Loop:
-    """Loop v2"""
+    """Loop v3"""
 
     _instance: "Loop" = None
 
     def __init__(self):
         self._running = False
+        self._scheduled = []
 
     @classmethod
     def get_current_loop(cls) -> "Loop":
@@ -15,13 +16,25 @@
             cls._instance = Loop()
         return cls._instance
 
+    def run_step(self) -> None:
+        """Run a single step/tick of the loop"""
+        try:
+            task = self._scheduled.pop(0)  # fifo: extract first item
+            while task is not None:
+                task.step()
+                task = self._scheduled.pop(0)  # fifo: extract first item
+        except IndexError:
+            # list is empty
+            pass
+
     def run(self, coroutine: Generator[Any, None, Any]) -> Any:
         """Run a coroutine"""
         self._running = True
         step = 1
         while self._running:
-            print("Loop step", step)
+            print("Loop step", step, self._scheduled)
             try:
+                self.run_step()
                 next(coroutine)
                 step += 1
             except StopIteration as e:
@@ -31,3 +44,7 @@
     def stop(self) -> None:
         """Stop running the loop"""
         self._running = False
+
+    def schedule(self, task) -> None:
+        """Schedule a Task for the next step/tick"""
+        self._scheduled.append(task)

The Loop class is extended to allow scheduling Tasks for the next step/tick. All scheduled Tasks are run in the next step/tick.

Task v2
from typing import Any, Generator

from future import Future

from loop import Loop


class Task(Future):
    """Task v2

    A Future that advances a coroutine one step at a time, asking the
    current Loop to call step() again until the coroutine has finished.
    """

    def __init__(self, coroutine: Generator[Any, None, Any], name: str):
        super().__init__(name)
        self._loop = Loop.get_current_loop()
        self._coroutine = coroutine
        # queue the first step right away
        self.schedule()

    def step(self) -> None:
        """Advance the coroutine once; store its return value when it ends"""
        try:
            next(self._coroutine)
        except StopIteration as finished:
            self.set_result(finished.value)
            return
        # no result yet, so ask the loop for another step
        self.schedule()

    def schedule(self) -> None:
        """Hand this task to the loop so that step() gets called"""
        self._loop.schedule(self)
Task v2
--- task1.py	2022-11-02 10:06:06.566947275 +0100
+++ task2.py	2022-11-02 10:08:05.706252365 +0100
@@ -2,21 +2,26 @@
 
 from future import Future
 
+from loop import Loop
+
 
 class Task(Future):
-    """Task v1"""
+    """Task v2"""
 
     def __init__(self, coroutine: Generator[Any, None, Any], name: str):
         super().__init__(name)
         self._coroutine = coroutine
-        self.run_loop()
+        self._loop = Loop.get_current_loop()
+        self.schedule()
 
     def step(self) -> None:
         try:
             next(self._coroutine)
         except StopIteration as e:
             self.set_result(e.value)
+        else:
+            # no result yet
+            self.schedule()
 
-    def run_loop(self) -> None:
-        while not self.done():
-            self.step()
+    def schedule(self) -> None:
+        self._loop.schedule(self)

The Task is refactored to not run a loop by itself. Instead, it schedules its execution via the Loop.

Let’s take a look at our simple example:

from future import Future
from task import Task

from loop import Loop


def some_result(result):
    """Coroutine returning ``result`` through a Future whose result is set up front"""
    resolved = Future("Some Result")
    resolved.set_result(result)
    value = yield from resolved
    return value


def add(coroutine1, coroutine2):
    """Return the sum of the results of two coroutines, one after the other.

    Each coroutine is wrapped in a Task at the moment its result is awaited,
    so the second Task is not created until the first has delivered its result.
    """
    x = yield from Task(coroutine1, "Add X")
    y = yield from Task(coroutine2, "Add Y")
    return x + y


def main():
    """Entry coroutine: add the results of some_result(1) and some_result(2)"""
    total = yield from add(some_result(1), some_result(2))
    return total


# Fetch the shared loop, run main() on it and print the value it returns
loop = Loop.get_current_loop()
result = loop.run(main())
print("Loop finished with result", result)

Output:

Loop step 1 []
Loop step 2 [<Task name='Add X' id='0x7f44a8c33b80'>]
Loop step 3 [<Task name='Add Y' id='0x7f44a8c33b80'>]
Loop finished with result 3

Both tasks are still run sequentially. Let’s change that.

from future import Future
from task import Task

from loop import Loop


def some_result(result):
    """Coroutine returning ``result`` through a Future whose result is set up front"""
    resolved = Future("Some Result")
    resolved.set_result(result)
    value = yield from resolved
    return value


def add(coroutine1, coroutine2):
    """Return the sum of the results of two coroutines.

    Both Tasks are created before either result is awaited, so both exist
    by the time this coroutine first waits on task1.
    """
    task1 = Task(coroutine1, "Add X")
    task2 = Task(coroutine2, "Add Y")
    x = yield from task1
    y = yield from task2
    return x + y


def main():
    """Entry coroutine: add the results of some_result(1) and some_result(2)"""
    total = yield from add(some_result(1), some_result(2))
    return total


# Fetch the shared loop, run main() on it and print the value it returns
loop = Loop.get_current_loop()
result = loop.run(main())
print("Loop finished with result", result)
--- step10_1.py	2022-10-26 09:41:52.478641884 +0200
+++ step10_2.py	2022-10-26 08:52:23.880895959 +0200
@@ -11,8 +11,10 @@
 
 
 def add(coroutine1, coroutine2):
-    x = yield from Task(coroutine1, "Add X")
-    y = yield from Task(coroutine2, "Add Y")
+    task1 = Task(coroutine1, "Add X")
+    task2 = Task(coroutine2, "Add Y")
+    x = yield from task1
+    y = yield from task2
     return x + y
 
 

Output:

Loop step 1 []
Loop step 2 [<Task name='Add X' id='0x7fe9c7e6d240'>, <Task name='Add Y' id='0x7fe9c7e6e290'>]
Loop step 3 []
Loop finished with result 3

Summary

  • The Loop schedules tasks now.

  • The Task itself has no loop anymore to run the coroutine.

  • Tasks are scheduled as soon as they are created and can run concurrently.