Step 10 - Scheduling all the time¶
The last step introduced a new Task
class that runs a loop over a provided
coroutine. But wait isn’t there an existing loop already? Yes indeed. Our
Loop
class. Why not extend the Loop
class to run this loop too.
from typing import Any, Generator
class Loop:
"""Loop v3"""
_instance: "Loop" = None
def __init__(self):
self._running = False
self._scheduled = []
@classmethod
def get_current_loop(cls) -> "Loop":
if not cls._instance:
cls._instance = Loop()
return cls._instance
def run_step(self) -> None:
"""Run a single step/tick of the loop"""
try:
task = self._scheduled.pop(0) # fifo: extract first item
while task is not None:
task.step()
task = self._scheduled.pop(0) # fifo: extract first item
except IndexError:
# list is empty
pass
def run(self, coroutine: Generator[Any, None, Any]) -> Any:
"""Run a coroutine"""
self._running = True
step = 1
while self._running:
print("Loop step", step, self._scheduled)
try:
self.run_step()
next(coroutine)
step += 1
except StopIteration as e:
self._running = False
return e.value
def stop(self) -> None:
"""Stop running the loop"""
self._running = False
def schedule(self, task) -> None:
"""Schedule a Task for the next step/tick"""
self._scheduled.append(task)
--- loop2.py 2022-11-02 10:06:59.866639812 +0100
+++ loop3.py 2022-11-02 10:33:10.882781008 +0100
@@ -2,12 +2,13 @@
class Loop:
- """Loop v2"""
+ """Loop v3"""
_instance: "Loop" = None
def __init__(self):
self._running = False
+ self._scheduled = []
@classmethod
def get_current_loop(cls) -> "Loop":
@@ -15,13 +16,25 @@
cls._instance = Loop()
return cls._instance
+ def run_step(self) -> None:
+ """Run a single step/tick of the loop"""
+ try:
+ task = self._scheduled.pop(0) # fifo: extract first item
+ while task is not None:
+ task.step()
+ task = self._scheduled.pop(0) # fifo: extract first item
+ except IndexError:
+ # list is empty
+ pass
+
def run(self, coroutine: Generator[Any, None, Any]) -> Any:
"""Run a coroutine"""
self._running = True
step = 1
while self._running:
- print("Loop step", step)
+ print("Loop step", step, self._scheduled)
try:
+ self.run_step()
next(coroutine)
step += 1
except StopIteration as e:
@@ -31,3 +44,7 @@
def stop(self) -> None:
"""Stop running the loop"""
self._running = False
+
+ def schedule(self, task) -> None:
+ """Schedule a Task for the next step/tick"""
+ self._scheduled.append(task)
The Loop
class is extended to allow scheduling Task
s for the next step/tick.
All scheduled Task
s are run in the next step/tick.
from typing import Any, Generator
from future import Future
from loop import Loop
class Task(Future):
"""Task v2"""
def __init__(self, coroutine: Generator[Any, None, Any], name: str):
super().__init__(name)
self._coroutine = coroutine
self._loop = Loop.get_current_loop()
self.schedule()
def step(self) -> None:
try:
next(self._coroutine)
except StopIteration as e:
self.set_result(e.value)
else:
# no result yet
self.schedule()
def schedule(self) -> None:
self._loop.schedule(self)
--- task1.py 2022-11-02 10:06:06.566947275 +0100
+++ task2.py 2022-11-02 10:08:05.706252365 +0100
@@ -2,21 +2,26 @@
from future import Future
+from loop import Loop
+
class Task(Future):
- """Task v1"""
+ """Task v2"""
def __init__(self, coroutine: Generator[Any, None, Any], name: str):
super().__init__(name)
self._coroutine = coroutine
- self.run_loop()
+ self._loop = Loop.get_current_loop()
+ self.schedule()
def step(self) -> None:
try:
next(self._coroutine)
except StopIteration as e:
self.set_result(e.value)
+ else:
+ # no result yet
+ self.schedule()
- def run_loop(self) -> None:
- while not self.done():
- self.step()
+ def schedule(self) -> None:
+ self._loop.schedule(self)
The Task
is refactored to not run a loop by itself. Instead it schedules its
execution via the Loop
.
Let’s take a look at our simple example:
from future import Future
from task import Task
from loop import Loop
def some_result(result):
future = Future("Some Result")
future.set_result(result)
return (yield from future)
def add(coroutine1, coroutine2):
x = yield from Task(coroutine1, "Add X")
y = yield from Task(coroutine2, "Add Y")
return x + y
def main():
return (yield from add(some_result(1), some_result(2)))
loop = Loop.get_current_loop()
result = loop.run(main())
print("Loop finished with result", result)
Output:
Loop step 1 []
Loop step 2 [<Task name='Add X' id='0x7f44a8c33b80'>]
Loop step 3 [<Task name='Add Y' id='0x7f44a8c33b80'>]
Loop finished with result 3
Both tasks are still run sequentially. Let’s change that.
from future import Future
from task import Task
from loop import Loop
def some_result(result):
future = Future("Some Result")
future.set_result(result)
return (yield from future)
def add(coroutine1, coroutine2):
task1 = Task(coroutine1, "Add X")
task2 = Task(coroutine2, "Add Y")
x = yield from task1
y = yield from task2
return x + y
def main():
return (yield from add(some_result(1), some_result(2)))
loop = Loop.get_current_loop()
result = loop.run(main())
print("Loop finished with result", result)
--- step10_1.py 2022-10-26 09:41:52.478641884 +0200
+++ step10_2.py 2022-10-26 08:52:23.880895959 +0200
@@ -11,8 +11,10 @@
def add(coroutine1, coroutine2):
- x = yield from Task(coroutine1, "Add X")
- y = yield from Task(coroutine2, "Add Y")
+ task1 = Task(coroutine1, "Add X")
+ task2 = Task(coroutine2, "Add Y")
+ x = yield from task1
+ y = yield from task2
return x + y
Output:
Loop step 1 []
Loop step 2 [<Task name='Add X' id='0x7fe9c7e6d240'>, <Task name='Add Y' id='0x7fe9c7e6e290'>]
Loop step 3 []
Loop finished with result 3
Summary
The
Loop
schedules tasks now.The
Task
itself has no loop anymore to run the coroutine.Task
s can be run immediately and concurrently.