Step 13 - Deduplicate Running Coroutines¶
Currently the code for running a coroutine
try:
next(coroutine)
except StopIteration as e:
result = e.value
can be found twice in our code. Once in the Task class and another time in the
Loop class. With the new add_done_callback also being available at the
Task class (because it derives from the Future) we could create an initial
root Task that runs the main coroutine in the Loop. If that root Task is
done our Loop is done too.
from typing import Any, Callable, Generator
from handle import Handle
class Loop:
"""Loop v5"""
_instance: "Loop" = None
def __init__(self):
self._running = False
self._scheduled = []
@classmethod
def get_current_loop(cls) -> "Loop":
if not cls._instance:
cls._instance = Loop()
return cls._instance
def run_step(self) -> None:
"""Run a single step/tick of the loop"""
# execute all current known handles only.
# not the ones added while running the handle callbacks
scheduled = self._scheduled.copy()
self._scheduled.clear()
try:
handle = scheduled.pop(0) # fifo: extract first item
while handle is not None:
handle.run()
handle = scheduled.pop(0) # fifo: extract first item
except IndexError:
# list is empty
pass
# a shorter and even more smarter version for the above code could be
#
# for _ in range(len(self._scheduled)):
# handle = self._scheduled.pop(0) # fifo: extract first item
# handle.run()
def run_loop(self) -> None:
"""Run the loop"""
self._running = True
step = 1
while self._running:
print("Loop step", step, self._scheduled)
self.run_step()
step += 1
def run(self, coroutine: Generator[Any, None, Any]) -> Any:
"""Run a coroutine until it is done/completed"""
from task import Task # avoid cyclic dependency
# create a root task for our coroutine
# the tasks gets scheduled immediately in its constructor
task = Task(coroutine, "Initial Task")
task.add_done_callback(self._done)
self.run_loop()
return task.result()
def stop(self) -> None:
"""Stop running the loop"""
self._running = False
def schedule(self, name: str, callback: Callable, *args: Any) -> None:
"""Schedule a callback for the next step/tick"""
self._scheduled.append(Handle(name, callback, args))
def _done(self, _future) -> None:
self.stop()
--- loop4.py 2022-11-02 10:35:15.054206925 +0100
+++ loop5.py 2022-11-02 15:31:25.547688722 +0100
@@ -4,7 +4,7 @@
class Loop:
- """Loop v4"""
+ """Loop v5"""
_instance: "Loop" = None
@@ -20,28 +20,44 @@
def run_step(self) -> None:
"""Run a single step/tick of the loop"""
+ # execute all current known handles only.
+ # not the ones added while running the handle callbacks
+ scheduled = self._scheduled.copy()
+ self._scheduled.clear()
+
try:
- handle = self._scheduled.pop(0) # fifo: extract first item
+ handle = scheduled.pop(0) # fifo: extract first item
while handle is not None:
handle.run()
- handle = self._scheduled.pop(0) # fifo: extract first item
+ handle = scheduled.pop(0) # fifo: extract first item
except IndexError:
# list is empty
pass
+ # a shorter and even more smarter version for the above code could be
+ #
+ # for _ in range(len(self._scheduled)):
+ # handle = self._scheduled.pop(0) # fifo: extract first item
+ # handle.run()
- def run(self, coroutine: Generator[Any, None, Any]) -> Any:
- """Run a coroutine"""
+ def run_loop(self) -> None:
+ """Run the loop"""
self._running = True
step = 1
while self._running:
print("Loop step", step, self._scheduled)
- try:
- self.run_step()
- next(coroutine)
- step += 1
- except StopIteration as e:
- self._running = False
- return e.value
+ self.run_step()
+ step += 1
+
+ def run(self, coroutine: Generator[Any, None, Any]) -> Any:
+ """Run a coroutine until it is done/completed"""
+ from task import Task # avoid cyclic dependency
+
+ # create a root task for our coroutine
+ # the tasks gets scheduled immediately in its constructor
+ task = Task(coroutine, "Initial Task")
+ task.add_done_callback(self._done)
+ self.run_loop()
+ return task.result()
def stop(self) -> None:
"""Stop running the loop"""
@@ -50,3 +66,6 @@
def schedule(self, name: str, callback: Callable, *args: Any) -> None:
"""Schedule a callback for the next step/tick"""
self._scheduled.append(Handle(name, callback, args))
+
+ def _done(self, _future) -> None:
+ self.stop()
The Loop doesn’t know anything about the Generator/Iterator protocol
anymore. It just starts a coroutine in a Tasks and schedules Handles.
Instead the Tasks are resuming the coroutines via the Generator/Iterator
protocol.
from future import Future
from loop import Loop
from task import Task
def some_result(result):
future = Future("Some Result")
future.set_result(result)
return (yield from future)
def add(coroutine1, coroutine2):
task1 = Task(coroutine1, "Add X")
task2 = Task(coroutine2, "Add Y")
x = yield from task1
y = yield from task2
return x + y
def main():
return (yield from add(some_result(1), some_result(2)))
loop = Loop.get_current_loop()
result = loop.run(main())
print("Loop finished with result", result)
Output:
Loop step 1 [<Handle name='Initial Task' callback='step'>]
Loop step 2 [<Handle name='Add X' callback='step'>, <Handle name='Add Y' callback='step'>]
Loop step 3 [<Handle name='Some Result' callback='_wakeup'>, <Handle name='Some Result' callback='_wakeup'>]
Loop step 4 [<Handle name='Add X' callback='step'>, <Handle name='Add Y' callback='step'>]
Loop step 5 [<Handle name='Add X' callback='_wakeup'>]
Loop step 6 [<Handle name='Initial Task' callback='step'>]
Loop step 7 [<Handle name='Add Y' callback='_wakeup'>]
Loop step 8 [<Handle name='Initial Task' callback='step'>]
Loop step 9 [<Handle name='Initial Task' callback='_done'>]
Loop finished with result 3
Summary
It’s possible have code to handle coroutines at a single place.
Tasks are dedicated to run coroutines.Starting an initial root
Taskallows theLoopto be completely independent of coroutines.