Step 12 - Back to the Future
Currently a `Future` is done in exactly two steps/ticks of the `Loop`.
from future import Future
# Drive a Future by hand, the way the loop would, to show how many
# steps it takes before the Future is finished.
future = Future()
it = iter(future)
# step 1: advance the generator returned by Future.__iter__ once
next(it)
# step 2: the generator finishes here, so this call raises StopIteration
# still no result yet, because set_result() was never called
next(it)
At the second step a `StopIteration` exception is raised, even though no result value has been set.
Hint
Remember that `yield from future` is essentially the same as:
it = iter(future)
while True:
try:
value = next(it)
yield value
except StopIteration as e:
return e.value
What if we need to wait for more than one step/tick, for example to wait for some external event that provides the result?
from typing import Any, Callable, Generator
from loop import Loop
class Future:
    """Placeholder for a result that is set at some later point (v3).

    Interested parties register callbacks with add_done_callback(); once
    set_result() has been called, every registered callback is handed to
    the loop together with this future as its argument.
    """

    # Class-level defaults, shadowed per instance by set_result().
    _result = None
    _done = False

    def __init__(self, name: str = None):
        """Remember *name* and bind the future to the current loop."""
        self._name = name
        # callbacks waiting for set_result()
        self._callbacks = []
        self._loop = Loop.get_current_loop()

    def set_result(self, result: Any):
        """Store *result*, mark the future as done and notify callbacks."""
        self._result, self._done = result, True
        self._schedule_callbacks()

    def result(self) -> Any:
        """Return the stored result (None as long as none was set)."""
        return self._result

    def done(self) -> bool:
        """Return True once set_result() has been called."""
        return self._done

    def add_done_callback(self, fn: Callable[["Future"], None]) -> None:
        """Arrange for ``fn(self)`` to be scheduled when the future is done.

        If the future is still pending, *fn* is queued. If a result is
        already available, *fn* is handed to the loop right away.
        """
        if not self.done():
            self._callbacks.append(fn)
            return
        # we already have a result
        self._loop.schedule(self._name, fn, self)

    def _schedule_callbacks(self) -> None:
        """Hand all queued callbacks to the loop and empty the queue."""
        pending = list(self._callbacks)
        if not pending:
            return

        # Empty the queue before scheduling so each callback is handed
        # over only once.
        self._callbacks.clear()
        for fn in pending:
            self._loop.schedule(self._name, fn, self)

    def __repr__(self) -> str:
        cls_name = self.__class__.__name__
        identity = hex(id(self))
        return f"<{cls_name} name='{self._name}' id='{identity}'>"

    def __iter__(self) -> Generator["Future", None, Any]:
        """Yield this future once, then return its result.

        Yielding the future itself (instead of nothing) tells whoever
        drives the generator which future is being waited on.
        """
        yield self  # some new magic
        return self.result()
--- future2.py 2022-11-02 10:50:00.133708767 +0100
+++ future3.py 2022-11-02 10:37:42.421495887 +0100
@@ -1,22 +1,46 @@
-from typing import Any, Generator
+from typing import Any, Callable, Generator
+
+from loop import Loop
class Future:
- """Return a result in the future v2"""
+ """Return a result in the future v3"""
_result = None
+ _done = False
def __init__(self, name: str = None):
self._name = name
+ self._callbacks = []
+ self._loop = Loop.get_current_loop()
- def set_result(self, result: Any) -> None:
+ def set_result(self, result: Any):
self._result = result
+ self._done = True
+ self._schedule_callbacks()
def result(self) -> Any:
return self._result
def done(self) -> bool:
- return self.result() is not None
+ return self._done
+
+ def add_done_callback(self, fn: Callable[["Future"], None]) -> None:
+ if self.done():
+ # we already have a result
+ self._loop.schedule(self._name, fn, self)
+ else:
+ self._callbacks.append(fn)
+
+ def _schedule_callbacks(self) -> None:
+ if not self._callbacks:
+ return
+
+ callbacks = self._callbacks.copy()
+ self._callbacks.clear()
+
+ for callback in callbacks:
+ self._loop.schedule(self._name, callback, self)
def __repr__(self) -> str:
return (
@@ -24,6 +48,6 @@
f"id='{hex(id(self))}'>"
)
- def __iter__(self) -> Generator[None, None, Any]:
- yield
+ def __iter__(self) -> Generator["Future", None, Any]:
+ yield self # some new magic
return self.result()
The `Future` is extended to allow adding callbacks that are executed after a result is set. This makes it possible to get notified when a `Future` is done. When the `Future` is done, the callbacks are scheduled for the next step/tick in our `Loop`.
But the actual waiting for some external event providing the result is not implemented directly in the `Future`. Instead, the `Task` is extended.
from typing import Any, Generator
from future import Future
from loop import Loop
class Task(Future):
    """Run a generator-based coroutine step by step on the loop (v4).

    A Task is itself a Future: when the coroutine finishes, its return
    value becomes the Task's result. While the coroutine is waiting on
    another Future, the Task is not scheduled; it is woken up again
    through a done-callback on that Future.
    """

    def __init__(self, coroutine: Generator[Any, None, Any], name: str):
        """Wrap *coroutine* and schedule its first step.

        Future.__init__ already stores the name and binds ``self._loop``
        to the current loop, so the loop is not looked up a second time.
        """
        super().__init__(name)
        self._coroutine = coroutine
        self.schedule()

    def step(self) -> None:
        """Advance the coroutine by one step and decide what happens next."""
        try:
            yielded = next(self._coroutine)
        except StopIteration as e:
            # the coroutine returned; its return value is our result
            self.set_result(e.value)
        else:
            # no result yet
            if isinstance(yielded, Future):
                # we are blocked by some external event for example waiting
                # for incoming data. let's wait until result is available
                yielded.add_done_callback(self._wakeup)
            else:
                # just schedule again
                self.schedule()

    def schedule(self) -> None:
        """Ask the loop to call step()."""
        self._loop.schedule(self._name, self.step)

    def _wakeup(self, _future: Future) -> None:
        """Done-callback: the awaited future finished, so resume stepping."""
        self.schedule()
--- task3.py 2022-11-02 10:10:30.317376323 +0100
+++ task4.py 2022-11-02 10:42:23.012079415 +0100
@@ -6,7 +6,7 @@
class Task(Future):
- """Task v3"""
+ """Task v4"""
def __init__(self, coroutine: Generator[Any, None, Any], name: str):
super().__init__(name)
@@ -16,12 +16,21 @@
def step(self) -> None:
try:
- next(self._coroutine)
+ yielded = next(self._coroutine)
except StopIteration as e:
self.set_result(e.value)
else:
# no result yet
- self.schedule()
+ if isinstance(yielded, Future):
+ # we are blocked by some external event for example waiting for
+ # incoming data. let's wait until result is available
+ yielded.add_done_callback(self._wakeup)
+ else:
+ # just schedule again
+ self.schedule()
def schedule(self) -> None:
self._loop.schedule(self._name, self.step)
+
+ def _wakeup(self, _future: Future) -> None:
+ self.schedule()
The `Task` is extended to run a coroutine that is actually blocked by some external event. The coroutine indicates this by yielding a `Future`. If a `Future` is yielded, the `Task` registers a wake-up callback on this `Future`.
The `Task` is now suspended (it isn't scheduled in the loop anymore). Once the `Future` is done, the `Task` is notified and scheduled again via the `Loop`. This allows the `Task` to sleep for one or more scheduler ticks/steps until some event has occurred.
from future import Future
from task import Task
from loop import Loop
def some_result(result):
    """Coroutine that delivers *result* through a Future.

    The Future gets its result before it is waited on.
    """
    finished = Future("Some Result")
    finished.set_result(result)
    value = yield from finished
    return value
def add(coroutine1, coroutine2):
    """Coroutine that runs both coroutines as Tasks and adds their results."""
    # Create both tasks before waiting on either one, so that both are
    # scheduled up front.
    first = Task(coroutine1, "Add X")
    second = Task(coroutine2, "Add Y")
    left = yield from first
    right = yield from second
    return left + right
def main():
    """Entry-point coroutine: add the results of two some_result() coroutines."""
    total = yield from add(some_result(1), some_result(2))
    return total
# Run the main() coroutine on the current loop and report what it returned.
loop = Loop.get_current_loop()
# NOTE(review): Loop.run is defined in loop.py (not shown here); presumably it
# drives main() to completion and returns the coroutine's result.
result = loop.run(main())
print("Loop finished with result", result)
Output:
Loop step 1 []
Loop step 2 [<Handle name='Add X' callback='step'>, <Handle name='Add Y' callback='step'>]
Loop step 3 []
Loop finished with result 3
Summary
- A `Task` can wait for results that are currently blocked.
- If a `Task` gets a `Future` from a coroutine, it blocks until the `Future` is done.
- Blocked means the `Task` is suspended: it is not scheduled in the `Loop` anymore and the result is not available yet.
- When the result finally becomes available, the `Task` gets notified and schedules itself for the next step/tick in the `Loop`.