Step 12 - Back to the Future¶
Currently a Future is done in exactly two steps/ticks of the Loop.
from future import Future
future = Future()
it = iter(future)
# step 1
next(it)
# step 2
# still no result yet
next(it)
A StopIteration exception is raised without a value set at the second step.
Hint
Remember yield from future is the same as
it = iter(future)
while True:
try:
value = next(it)
yield value
except StopIteration as e:
return e.value
What if we need to wait for more then one step/tick for example to wait for some external event providing the result?
from typing import Any, Callable, Generator
from loop import Loop
class Future:
"""Return a result in the future v3"""
_result = None
_done = False
def __init__(self, name: str = None):
self._name = name
self._callbacks = []
self._loop = Loop.get_current_loop()
def set_result(self, result: Any):
self._result = result
self._done = True
self._schedule_callbacks()
def result(self) -> Any:
return self._result
def done(self) -> bool:
return self._done
def add_done_callback(self, fn: Callable[["Future"], None]) -> None:
if self.done():
# we already have a result
self._loop.schedule(self._name, fn, self)
else:
self._callbacks.append(fn)
def _schedule_callbacks(self) -> None:
if not self._callbacks:
return
callbacks = self._callbacks.copy()
self._callbacks.clear()
for callback in callbacks:
self._loop.schedule(self._name, callback, self)
def __repr__(self) -> str:
return (
f"<{self.__class__.__name__} name='{self._name}' "
f"id='{hex(id(self))}'>"
)
def __iter__(self) -> Generator["Future", None, Any]:
yield self # some new magic
return self.result()
--- future2.py 2022-11-02 10:50:00.133708767 +0100
+++ future3.py 2022-11-02 10:37:42.421495887 +0100
@@ -1,22 +1,46 @@
-from typing import Any, Generator
+from typing import Any, Callable, Generator
+
+from loop import Loop
class Future:
- """Return a result in the future v2"""
+ """Return a result in the future v3"""
_result = None
+ _done = False
def __init__(self, name: str = None):
self._name = name
+ self._callbacks = []
+ self._loop = Loop.get_current_loop()
- def set_result(self, result: Any) -> None:
+ def set_result(self, result: Any):
self._result = result
+ self._done = True
+ self._schedule_callbacks()
def result(self) -> Any:
return self._result
def done(self) -> bool:
- return self.result() is not None
+ return self._done
+
+ def add_done_callback(self, fn: Callable[["Future"], None]) -> None:
+ if self.done():
+ # we already have a result
+ self._loop.schedule(self._name, fn, self)
+ else:
+ self._callbacks.append(fn)
+
+ def _schedule_callbacks(self) -> None:
+ if not self._callbacks:
+ return
+
+ callbacks = self._callbacks.copy()
+ self._callbacks.clear()
+
+ for callback in callbacks:
+ self._loop.schedule(self._name, callback, self)
def __repr__(self) -> str:
return (
@@ -24,6 +48,6 @@
f"id='{hex(id(self))}'>"
)
- def __iter__(self) -> Generator[None, None, Any]:
- yield
+ def __iter__(self) -> Generator["Future", None, Any]:
+ yield self # some new magic
return self.result()
The Future is extended to allow adding callbacks that are executed after
a result is set. This allows for getting notified when a Future is done. When
the Future is done the callbacks are scheduled for the next step/tick in our
Loop.
But the actual waiting for some external event providing the result is not
implemented directly in a Future. Instead the Task is extended.
from typing import Any, Generator
from future import Future
from loop import Loop
class Task(Future):
"""Task v4"""
def __init__(self, coroutine: Generator[Any, None, Any], name: str):
super().__init__(name)
self._coroutine = coroutine
self._loop = Loop.get_current_loop()
self.schedule()
def step(self) -> None:
try:
yielded = next(self._coroutine)
except StopIteration as e:
self.set_result(e.value)
else:
# no result yet
if isinstance(yielded, Future):
# we are blocked by some external event for example waiting for
# incoming data. let's wait until result is available
yielded.add_done_callback(self._wakeup)
else:
# just schedule again
self.schedule()
def schedule(self) -> None:
self._loop.schedule(self._name, self.step)
def _wakeup(self, _future: Future) -> None:
self.schedule()
--- task3.py 2022-11-02 10:10:30.317376323 +0100
+++ task4.py 2022-11-02 10:42:23.012079415 +0100
@@ -6,7 +6,7 @@
class Task(Future):
- """Task v3"""
+ """Task v4"""
def __init__(self, coroutine: Generator[Any, None, Any], name: str):
super().__init__(name)
@@ -16,12 +16,21 @@
def step(self) -> None:
try:
- next(self._coroutine)
+ yielded = next(self._coroutine)
except StopIteration as e:
self.set_result(e.value)
else:
# no result yet
- self.schedule()
+ if isinstance(yielded, Future):
+ # we are blocked by some external event for example waiting for
+ # incoming data. let's wait until result is available
+ yielded.add_done_callback(self._wakeup)
+ else:
+ # just schedule again
+ self.schedule()
def schedule(self) -> None:
self._loop.schedule(self._name, self.step)
+
+ def _wakeup(self, _future: Future) -> None:
+ self.schedule()
The Task is extended to run a coroutine that is actually being blocked by
some external event. The coroutine indicates this by yielding a Future. If
a Future is yielded the Tasks registers a wake up callback at this Future.
The Task is suspended now (it isn’t scheduled in the loop anymore). If the
Future is done the Task is notified and scheduled again via the Loop. This
allows the Task to sleep for one or more scheduler ticks/steps until some
event has occurred.
from future import Future
from task import Task
from loop import Loop
def some_result(result):
future = Future("Some Result")
future.set_result(result)
return (yield from future)
def add(coroutine1, coroutine2):
task1 = Task(coroutine1, "Add X")
task2 = Task(coroutine2, "Add Y")
x = yield from task1
y = yield from task2
return x + y
def main():
return (yield from add(some_result(1), some_result(2)))
loop = Loop.get_current_loop()
result = loop.run(main())
print("Loop finished with result", result)
Output:
Loop step 1 []
Loop step 2 [<Handle name='Add X' callback='step'>, <Handle name='Add Y' callback='step'>]
Loop step 3 []
Loop finished with result 3
Summary
A
Taskcan wait for results that are currently blocked.If a
Taskgets aFuturefrom a coroutine it blocks until theFutureis done.Blocked means the
Taskis suspended, it is not scheduled in theLoopanymore and the result is not available yet.If the result is available finally the
Taskgets notified and schedules itself for the next step/tick in theLoop.