Step 9 - A Job has to be done¶
Let us look at the example from the last step again with some more output.
from loop import Loop
class Future:
"""Return a result in the future"""
_result = None
def set_result(self, result):
self._result = result
def __iter__(self):
print(self.__class__.__name__, 1)
yield
print(self.__class__.__name__, 2)
return self._result
def some_result(value):
future = Future()
future.set_result(value)
print(some_result.__name__, 1)
return (yield from future)
def add(coroutine1, coroutine2):
print(add.__name__, 1)
x = yield from coroutine1
print(add.__name__, 2)
y = yield from coroutine2
print(add.__name__, 3)
return x + y
def main():
print(main.__name__, 1)
result = yield from add(some_result(1), some_result(2))
print(main.__name__, 2)
return result
loop = Loop.get_current_loop()
result = loop.run(main())
print("Loop finished with result", result)
Output:
Loop step 1
main 1
add 1
some_result 1
Future 1
Loop step 2
Future 2
add 2
some_result 1
Future 1
Loop step 3
Future 2
add 3
main 2
Loop finished with result 3
Future 1
is reached and afterwards the control is given back to the loop. A
next step is executed. After Future 2
a result is available.
As you can see there is still no real concurrency. Only a single coroutine is run until it is finished and the result is available before another one is called in the next step of the loop.
Yes we can suspend the current function but we still run all coroutines sequentially.
To change that we need a new thing.
This thing should take a coroutine, because all our business logic uses coroutines.
It should run the coroutine immediately, because we want to get a job done.
It should run until a job is finally done.
And its result should be gathered in the future.
Because a result should be gathered in the future, let’s derive the new thing
from Future
. A job can be worked on in a task. So let’s call this new thing
Task
. But first let’s extend our Future
a bit.
from typing import Any, Generator
class Future:
"""Return a result in the future v2"""
_result = None
def __init__(self, name: str = None):
self._name = name
def set_result(self, result: Any) -> None:
self._result = result
def result(self) -> Any:
return self._result
def done(self) -> bool:
return self.result() is not None
def __repr__(self) -> str:
return (
f"<{self.__class__.__name__} name='{self._name}' "
f"id='{hex(id(self))}'>"
)
def __iter__(self) -> Generator[None, None, Any]:
yield
return self.result()
--- future1.py 2022-11-02 10:49:53.297744123 +0100
+++ future2.py 2022-11-02 10:50:00.133708767 +0100
@@ -2,13 +2,28 @@
class Future:
- """Return a result in the future v1"""
+ """Return a result in the future v2"""
_result = None
- def set_result(self, result: Any):
+ def __init__(self, name: str = None):
+ self._name = name
+
+ def set_result(self, result: Any) -> None:
self._result = result
+ def result(self) -> Any:
+ return self._result
+
+ def done(self) -> bool:
+ return self.result() is not None
+
+ def __repr__(self) -> str:
+ return (
+ f"<{self.__class__.__name__} name='{self._name}' "
+ f"id='{hex(id(self))}'>"
+ )
+
def __iter__(self) -> Generator[None, None, Any]:
yield
- return self._result
+ return self.result()
The Future
class got a done
method to check if the result has been set and
also a result
method to allow accessing the result from the outside of the
class. Now it is possible to check whether the state of the Future
is done.
Additionally it is possible to pass a name to identify the Future
via
__repr__
more easily in later steps for debugging purposes.
from typing import Any, Generator
from future import Future
class Task(Future):
"""Task v1"""
def __init__(self, coroutine: Generator[Any, None, Any], name: str):
super().__init__(name)
self._coroutine = coroutine
self.run_loop()
def step(self) -> None:
try:
next(self._coroutine)
except StopIteration as e:
self.set_result(e.value)
def run_loop(self) -> None:
while not self.done():
self.step()
from future import Future
from task import Task
from loop import Loop
def some_result(result):
future = Future("Some Result")
future.set_result(result)
return (yield from future)
def add(coroutine1, coroutine2):
x = yield from Task(coroutine1, "Add X")
y = yield from Task(coroutine2, "Add Y")
return x + y
def main():
return (yield from add(some_result(1), some_result(2)))
loop = Loop.get_current_loop()
result = loop.run(main())
print("Loop finished with result", result)
Output:
Loop step 1
Loop step 2
Loop step 3
Loop finished with result 3
First of all the functionality and the result hasn’t changed. We just introduced
a new class that wraps our coroutines. We will get to concurrently running
Task
s in the next chapter.
Summary
We have a basic
Task
class that runs a coroutine in a loop.This coroutine is started immediately.
The
Task
returns the result of the coroutine in the future when it’s done.