Step 13 - Deduplicate Running Coroutines
Currently the code for running a coroutine
try:
next(coroutine)
except StopIteration as e:
result = e.value
can be found twice in our code: once in the Task class and once in the Loop class. Because Task derives from Future, the new add_done_callback is available on the Task class as well. We can therefore create an initial root Task that runs the main coroutine in the Loop. When that root Task is done, our Loop is done too.
from typing import Any, Callable, Generator
from handle import Handle
class Loop:
    """Loop v5

    A minimal event loop that runs scheduled ``Handle`` callbacks in FIFO
    order, one batch ("step") at a time. It does not resume coroutines
    itself: ``run()`` wraps the coroutine in a root ``Task`` and stops the
    loop from that task's done callback.
    """

    # lazily created shared instance, see get_current_loop()
    _instance: "Loop" = None

    def __init__(self):
        self._running = False
        self._scheduled = []

    @classmethod
    def get_current_loop(cls) -> "Loop":
        """Return the shared loop instance, creating it on first use."""
        if cls._instance is None:
            cls._instance = Loop()
        return cls._instance

    def run_step(self) -> None:
        """Run a single step/tick of the loop

        Only the handles that are scheduled when the step starts are run.
        Handles added by the callbacks themselves stay queued for the next
        step. An exception raised by a callback propagates to the caller.
        """
        # Take a snapshot and empty the queue first, so that callbacks can
        # schedule new handles without extending the current step.
        pending = self._scheduled.copy()
        self._scheduled.clear()

        # Iterate the snapshot instead of popping inside a try/except
        # IndexError: an IndexError raised by a callback must not be
        # mistaken for "the queue is empty" and silently swallowed.
        for handle in pending:
            handle.run()

    def run_loop(self) -> None:
        """Run the loop

        Executes steps until ``stop()`` is called. The running flag is reset
        even if a callback raises.
        """
        self._running = True
        step = 1
        try:
            while self._running:
                print("Loop step", step, self._scheduled)
                self.run_step()
                step += 1
        finally:
            # never leave the loop marked as running after an error
            self._running = False

    def run(self, coroutine: Generator[Any, None, Any]) -> Any:
        """Run a coroutine until it is done/completed

        Returns whatever ``result()`` of the root task returns.
        """
        from task import Task  # avoid cyclic dependency

        # create a root task for our coroutine
        # the task gets scheduled immediately in its constructor
        task = Task(coroutine, "Initial Task")
        # stop the loop as soon as the root task is done
        task.add_done_callback(self._done)
        self.run_loop()
        return task.result()

    def stop(self) -> None:
        """Stop running the loop"""
        self._running = False

    def schedule(self, name: str, callback: Callable, *args: Any) -> None:
        """Schedule a callback for the next step/tick"""
        self._scheduled.append(Handle(name, callback, args))

    def _done(self, _future) -> None:
        """Done callback of the root task: stop the loop."""
        self.stop()
--- loop4.py 2022-11-02 10:35:15.054206925 +0100
+++ loop5.py 2022-11-02 15:31:25.547688722 +0100
@@ -4,7 +4,7 @@
class Loop:
- """Loop v4"""
+ """Loop v5"""
_instance: "Loop" = None
@@ -20,28 +20,44 @@
def run_step(self) -> None:
"""Run a single step/tick of the loop"""
+ # execute all current known handles only.
+ # not the ones added while running the handle callbacks
+ scheduled = self._scheduled.copy()
+ self._scheduled.clear()
+
try:
- handle = self._scheduled.pop(0) # fifo: extract first item
+ handle = scheduled.pop(0) # fifo: extract first item
while handle is not None:
handle.run()
- handle = self._scheduled.pop(0) # fifo: extract first item
+ handle = scheduled.pop(0) # fifo: extract first item
except IndexError:
# list is empty
pass
+ # a shorter and even more smarter version for the above code could be
+ #
+ # for _ in range(len(self._scheduled)):
+ # handle = self._scheduled.pop(0) # fifo: extract first item
+ # handle.run()
- def run(self, coroutine: Generator[Any, None, Any]) -> Any:
- """Run a coroutine"""
+ def run_loop(self) -> None:
+ """Run the loop"""
self._running = True
step = 1
while self._running:
print("Loop step", step, self._scheduled)
- try:
- self.run_step()
- next(coroutine)
- step += 1
- except StopIteration as e:
- self._running = False
- return e.value
+ self.run_step()
+ step += 1
+
+ def run(self, coroutine: Generator[Any, None, Any]) -> Any:
+ """Run a coroutine until it is done/completed"""
+ from task import Task # avoid cyclic dependency
+
+ # create a root task for our coroutine
+ # the tasks gets scheduled immediately in its constructor
+ task = Task(coroutine, "Initial Task")
+ task.add_done_callback(self._done)
+ self.run_loop()
+ return task.result()
def stop(self) -> None:
"""Stop running the loop"""
@@ -50,3 +66,6 @@
def schedule(self, name: str, callback: Callable, *args: Any) -> None:
"""Schedule a callback for the next step/tick"""
self._scheduled.append(Handle(name, callback, args))
+
+ def _done(self, _future) -> None:
+ self.stop()
The Loop doesn’t know anything about the Generator/Iterator protocol anymore. It just starts a coroutine in a Task and schedules Handles.
Instead, the Tasks resume the coroutines via the Generator/Iterator protocol.
from future import Future
from loop import Loop
from task import Task
def some_result(result):
    """Coroutine that waits on a Future named "Some Result".

    The future gets ``result`` set right after creation; the coroutine
    returns whatever ``yield from`` on that future evaluates to
    (presumably the stored result -- see the Future class).
    """
    resolved = Future("Some Result")
    resolved.set_result(result)
    value = yield from resolved
    return value
def add(coroutine1, coroutine2):
    """Coroutine that runs both coroutines as Tasks and returns their sum."""
    # Create both tasks before waiting on either one, so that both exist
    # before this coroutine suspends for the first time.
    first = Task(coroutine1, "Add X")
    second = Task(coroutine2, "Add Y")
    # operands are evaluated left to right: wait for "Add X", then "Add Y"
    return (yield from first) + (yield from second)
def main():
    """Root coroutine: delegate to add() with two some_result() coroutines."""
    total = yield from add(some_result(1), some_result(2))
    return total
# Fetch the shared loop, run main() as the root coroutine until it is done
# and print the value returned by run().
loop = Loop.get_current_loop()
result = loop.run(main())
print("Loop finished with result", result)
Output:
Loop step 1 [<Handle name='Initial Task' callback='step'>]
Loop step 2 [<Handle name='Add X' callback='step'>, <Handle name='Add Y' callback='step'>]
Loop step 3 [<Handle name='Some Result' callback='_wakeup'>, <Handle name='Some Result' callback='_wakeup'>]
Loop step 4 [<Handle name='Add X' callback='step'>, <Handle name='Add Y' callback='step'>]
Loop step 5 [<Handle name='Add X' callback='_wakeup'>]
Loop step 6 [<Handle name='Initial Task' callback='step'>]
Loop step 7 [<Handle name='Add Y' callback='_wakeup'>]
Loop step 8 [<Handle name='Initial Task' callback='step'>]
Loop step 9 [<Handle name='Initial Task' callback='_done'>]
Loop finished with result 3
Summary
It’s possible to have the code that handles coroutines in a single place.
Tasks are dedicated to running coroutines. Starting an initial root Task allows the Loop to be completely independent of coroutines.