Step 11 - Schedule Callbacks¶
The Loop
is currently limited to scheduling Task
s and running coroutines.
We could introduce a new Protocol
to abstract the scheduling of Tasks
but
at the end we just need to schedule some callbacks (which are actually
Callable
s) and their arguments. Therefore we introduce an internal class
Handle
.
from typing import Any, Callable, Iterable
class Handle:
    """A callback handle.

    Bundles a callable with the positional arguments it should be invoked
    with, plus a name that is only used for debugging output.
    """

    def __init__(self, name: str, callback: Callable, args: Iterable[Any]):
        """Store the callback and its arguments for a later ``run()``.

        Args:
            name: Label shown in ``repr()``.
            callback: The callable to invoke.
            args: Positional arguments unpacked into ``callback``.
        """
        self._name = name
        self._callback = callback
        self._args = args

    def run(self) -> None:
        """Invoke the callback with the stored arguments.

        The callback's return value is discarded.
        """
        self._callback(*self._args)

    def __repr__(self) -> str:
        # Not every callable has ``__name__`` (e.g. ``functools.partial``
        # objects or instances defining ``__call__``). Fall back to the
        # callable's own repr so that printing a handle never raises.
        callback_name = getattr(self._callback, "__name__", None)
        if callback_name is None:
            callback_name = repr(self._callback)
        return (
            f"<{self.__class__.__name__} name='{self._name}' "
            f"callback='{callback_name}'>"
        )
Let’s arrange our Loop
to schedule Handle
instances.
from typing import Any, Callable, Generator
from handle import Handle
class Loop:
    """Loop v4

    A minimal event loop: it runs scheduled callbacks (wrapped in ``Handle``
    instances) in FIFO order and drives a single top-level coroutine.
    """

    # Shared instance, created lazily by get_current_loop().
    _instance: "Loop | None" = None

    def __init__(self):
        self._running = False
        self._scheduled = []

    @classmethod
    def get_current_loop(cls) -> "Loop":
        """Return the shared loop instance, creating it on first use."""
        if cls._instance is None:
            cls._instance = Loop()
        return cls._instance

    def run_step(self) -> None:
        """Run a single step/tick of the loop.

        Handles are executed in FIFO order until the schedule is empty;
        handles appended while the step is running are executed in the
        same step.
        """
        # Test for emptiness explicitly instead of wrapping the loop in
        # ``except IndexError``: that would also swallow an IndexError
        # raised by a callback and silently abort the step.
        while self._scheduled:
            handle = self._scheduled.pop(0)  # fifo: extract first item
            handle.run()

    def run(self, coroutine: Generator[Any, None, Any]) -> Any:
        """Run a coroutine until it finishes or the loop is stopped.

        Each iteration prints the pending schedule, runs one loop step and
        then advances ``coroutine`` once.

        Returns:
            The coroutine's return value, or ``None`` if ``stop()`` ended
            the loop before the coroutine finished.
        """
        self._running = True
        step = 1
        while self._running:
            print("Loop step", step, self._scheduled)
            self.run_step()
            # Only the coroutine itself may signal completion; keeping
            # run_step() outside the try prevents a StopIteration leaking
            # from a callback from being mistaken for the final result.
            try:
                next(coroutine)
            except StopIteration as e:
                self._running = False
                return e.value
            step += 1
        return None

    def stop(self) -> None:
        """Stop running the loop"""
        self._running = False

    def schedule(self, name: str, callback: Callable, *args: Any) -> None:
        """Schedule a callback for the next step/tick.

        Args:
            name: Label for the created handle (debugging only).
            callback: The callable to run.
            *args: Positional arguments passed to ``callback``.
        """
        self._scheduled.append(Handle(name, callback, args))
--- loop3.py 2022-11-02 10:33:10.882781008 +0100
+++ loop4.py 2022-11-02 10:35:15.054206925 +0100
@@ -1,8 +1,10 @@
-from typing import Any, Generator
+from typing import Any, Callable, Generator
+
+from handle import Handle
class Loop:
- """Loop v3"""
+ """Loop v4"""
_instance: "Loop" = None
@@ -19,10 +21,10 @@
def run_step(self) -> None:
"""Run a single step/tick of the loop"""
try:
- task = self._scheduled.pop(0) # fifo: extract first item
- while task is not None:
- task.step()
- task = self._scheduled.pop(0) # fifo: extract first item
+ handle = self._scheduled.pop(0) # fifo: extract first item
+ while handle is not None:
+ handle.run()
+ handle = self._scheduled.pop(0) # fifo: extract first item
except IndexError:
# list is empty
pass
@@ -45,6 +47,6 @@
"""Stop running the loop"""
self._running = False
- def schedule(self, task) -> None:
- """Schedule a Task for the next step/tick"""
- self._scheduled.append(task)
+ def schedule(self, name: str, callback: Callable, *args: Any) -> None:
+ """Schedule a callback for the next step/tick"""
+ self._scheduled.append(Handle(name, callback, args))
The scheduler method (run_step
) now loops over Handle
instances instead of
Task
s. The schedule
method got adjusted to accept a Callable
as callback
and some arguments. These are put into a Handle
which gets scheduled for the
next step/tick.
from typing import Any, Generator
from future import Future
from loop import Loop
class Task(Future):
    """Task v3

    A future that drives a coroutine one step at a time by scheduling its
    own ``step`` method on the current loop.
    """

    def __init__(self, coroutine: Generator[Any, None, Any], name: str):
        super().__init__(name)
        self._coroutine = coroutine
        self._loop = Loop.get_current_loop()
        # Queue the first step right away.
        self.schedule()

    def step(self) -> None:
        """Advance the coroutine once; finish or re-schedule accordingly."""
        try:
            next(self._coroutine)
        except StopIteration as finished:
            # The coroutine returned: its return value is the task result.
            self.set_result(finished.value)
            return
        # No result yet, so ask the loop for another step.
        self.schedule()

    def schedule(self) -> None:
        """Register ``step`` as a callback on the loop under the task name."""
        self._loop.schedule(self._name, self.step)
--- task2.py 2022-11-02 10:08:05.706252365 +0100
+++ task3.py 2022-11-02 10:10:30.317376323 +0100
@@ -6,7 +6,7 @@
class Task(Future):
- """Task v2"""
+ """Task v3"""
def __init__(self, coroutine: Generator[Any, None, Any], name: str):
super().__init__(name)
@@ -24,4 +24,4 @@
self.schedule()
def schedule(self) -> None:
- self._loop.schedule(self)
+ self._loop.schedule(self._name, self.step)
The Task
requires only a small change. Instead of passing itself to the loop,
it passes its step
method for being scheduled and its name for debugging
purposes.
from future import Future
from task import Task
from loop import Loop
def some_result(result):
    """Coroutine that sets ``result`` on a new future and awaits it."""
    resolved = Future("Some Result")
    resolved.set_result(result)
    value = yield from resolved
    return value
def add(coroutine1, coroutine2):
    """Coroutine that runs both coroutines as tasks and adds their results."""
    # Create both tasks before awaiting either one.
    first_task = Task(coroutine1, "Add X")
    second_task = Task(coroutine2, "Add Y")
    left = yield from first_task
    right = yield from second_task
    return left + right
def main():
    """Entry-point coroutine: add the results of two ``some_result`` calls."""
    first = some_result(1)
    second = some_result(2)
    total = yield from add(first, second)
    return total
# Fetch the shared loop, run main() on it and print the value it returns.
loop = Loop.get_current_loop()
result = loop.run(main())
print("Loop finished with result", result)
Output:
Loop step 1 []
Loop step 2 [<Handle name='Add X' callback='step'>, <Handle name='Add Y' callback='step'>]
Loop step 3 []
Loop finished with result 3
Summary
Using a callback abstraction makes the scheduling more generic.
Our loop can schedule and run all kinds of things using a callback.
Internally the
Loop
schedules Handle
instances.