Step 16 - Cancellation¶
from enum import Enum
from typing import Any, Callable, Generator
from loop import Loop
class CancelledError(Exception):
"""
Raised if a Future is cancelled
"""
class FutureState(Enum):
PENDING = "pending"
DONE = "done"
CANCELLED = "cancelled"
class Future:
"""Return a result in the future v4"""
_result = None
def __init__(self, name: str = None):
self._name = name
self._callbacks = []
self._state = FutureState.PENDING
self._loop = Loop.get_current_loop()
def set_result(self, result: Any):
if self._state != FutureState.PENDING:
raise RuntimeError("Invalid Future state")
self._result = result
self._state = FutureState.DONE
self._schedule_callbacks()
def cancel(self) -> bool:
if self._state != FutureState.PENDING:
return False
self._state = FutureState.CANCELLED
self._schedule_callbacks()
return True
def result(self) -> Any:
if self._state == FutureState.CANCELLED:
raise CancelledError()
if self._state != FutureState.DONE:
raise RuntimeError("Invalid Future state")
return self._result
def done(self) -> bool:
return self._state == FutureState.DONE
def cancelled(self) -> bool:
return self._state == FutureState.CANCELLED
def add_done_callback(self, fn: Callable[["Future"], None]) -> None:
if self._state != FutureState.PENDING:
# we already have a result or are cancelled
self._loop.schedule(self._name, fn, self)
else:
self._callbacks.append(fn)
def _schedule_callbacks(self) -> None:
if not self._callbacks:
return
callbacks = self._callbacks.copy()
self._callbacks.clear()
for callback in callbacks:
self._loop.schedule(self._name, callback, self)
def __repr__(self) -> str:
return (
f"<{self.__class__.__name__} name='{self._name}' "
f"state={self._state} id='{hex(id(self))}'>"
)
def __iter__(self) -> Generator["Future", None, Any]:
yield self # some new magic
return self.result()
--- future3.py 2022-11-02 10:37:42.421495887 +0100
+++ future4.py 2022-11-09 12:22:03.254019054 +0100
@@ -1,33 +1,64 @@
+from enum import Enum
from typing import Any, Callable, Generator
from loop import Loop
+class CancelledError(Exception):
+ """
+ Raised if a Future is cancelled
+ """
+
+
+class FutureState(Enum):
+ PENDING = "pending"
+ DONE = "done"
+ CANCELLED = "cancelled"
+
+
class Future:
- """Return a result in the future v3"""
+ """Return a result in the future v4"""
_result = None
- _done = False
def __init__(self, name: str = None):
self._name = name
self._callbacks = []
+ self._state = FutureState.PENDING
self._loop = Loop.get_current_loop()
def set_result(self, result: Any):
+ if self._state != FutureState.PENDING:
+ raise RuntimeError("Invalid Future state")
+
self._result = result
- self._done = True
+ self._state = FutureState.DONE
self._schedule_callbacks()
+ def cancel(self) -> bool:
+ if self._state != FutureState.PENDING:
+ return False
+
+ self._state = FutureState.CANCELLED
+ self._schedule_callbacks()
+ return True
+
def result(self) -> Any:
+ if self._state == FutureState.CANCELLED:
+ raise CancelledError()
+ if self._state != FutureState.DONE:
+ raise RuntimeError("Invalid Future state")
return self._result
def done(self) -> bool:
- return self._done
+ return self._state == FutureState.DONE
+
+ def cancelled(self) -> bool:
+ return self._state == FutureState.CANCELLED
def add_done_callback(self, fn: Callable[["Future"], None]) -> None:
- if self.done():
- # we already have a result
+ if self._state != FutureState.PENDING:
+ # we already have a result or are cancelled
self._loop.schedule(self._name, fn, self)
else:
self._callbacks.append(fn)
@@ -45,7 +76,7 @@
def __repr__(self) -> str:
return (
f"<{self.__class__.__name__} name='{self._name}' "
- f"id='{hex(id(self))}'>"
+ f"state={self._state} id='{hex(id(self))}'>"
)
def __iter__(self) -> Generator["Future", None, Any]:
from typing import Any, Generator
from future import CancelledError, Future
from loop import Loop
class Task(Future):
"""Task v5"""
def __init__(self, coroutine: Generator[Any, None, Any], name: str):
super().__init__(name)
self._coroutine = coroutine
self._loop = Loop.get_current_loop()
self._fut_waiter = None
self._must_cancel = False
self.schedule()
def step(self, exc: Exception = None) -> None:
self._fut_waiter = None
if self._must_cancel:
if not isinstance(exc, CancelledError):
exc = CancelledError()
self._must_cancel = False
try:
if exc is None:
yielded = self._coroutine.send(None)
else:
# This may also be a cancellation.
yielded = self._coroutine.throw(exc)
except StopIteration as e:
self.set_result(e.value)
except CancelledError:
# coroutine is cancelled
# update task status via future
super().cancel()
else:
# no result yet
if isinstance(yielded, Future):
# we are blocked by some external event for example waiting for
# incoming data. let's wait until result is available
yielded.add_done_callback(self._wakeup)
self._fut_waiter = yielded
if self._must_cancel: # may have been set since last suspend
if yielded.cancel():
self._must_cancel = False
else:
# just schedule again
self.schedule()
def schedule(self, exc: Exception = None) -> None:
self._loop.schedule(self._name, self.step, exc)
def cancel(self) -> bool:
if self.done():
return False
if self._fut_waiter is not None:
# we are waiting for a "blocked" result
if self._fut_waiter.cancel():
return True
# task is just scheduled now
self._must_cancel = True
return True
def _wakeup(self, future: Future) -> None:
try:
future.result()
except Exception as exc:
# This may also be a cancellation.
self.schedule(exc)
else:
self.schedule()
--- task4.py 2022-11-02 16:25:54.369385455 +0100
+++ task5.py 2022-11-09 12:17:00.995591335 +0100
@@ -1,36 +1,75 @@
from typing import Any, Generator
-from future import Future
-
+from future import CancelledError, Future
from loop import Loop
class Task(Future):
- """Task v4"""
+ """Task v5"""
def __init__(self, coroutine: Generator[Any, None, Any], name: str):
super().__init__(name)
self._coroutine = coroutine
self._loop = Loop.get_current_loop()
+ self._fut_waiter = None
+ self._must_cancel = False
self.schedule()
- def step(self) -> None:
+ def step(self, exc: Exception = None) -> None:
+ self._fut_waiter = None
+
+ if self._must_cancel:
+ if not isinstance(exc, CancelledError):
+ exc = CancelledError()
+ self._must_cancel = False
+
try:
- yielded = next(self._coroutine)
+ if exc is None:
+ yielded = self._coroutine.send(None)
+ else:
+ # This may also be a cancellation.
+ yielded = self._coroutine.throw(exc)
except StopIteration as e:
self.set_result(e.value)
+ except CancelledError:
+ # coroutine is cancelled
+ # update task status via future
+ super().cancel()
else:
# no result yet
if isinstance(yielded, Future):
# we are blocked by some external event for example waiting for
# incoming data. let's wait until result is available
yielded.add_done_callback(self._wakeup)
+ self._fut_waiter = yielded
+ if self._must_cancel: # may have been set since last suspend
+ if yielded.cancel():
+ self._must_cancel = False
else:
# just schedule again
self.schedule()
- def schedule(self) -> None:
- self._loop.schedule(self._name, self.step)
+ def schedule(self, exc: Exception = None) -> None:
+ self._loop.schedule(self._name, self.step, exc)
- def _wakeup(self, _future: Future) -> None:
- self.schedule()
+ def cancel(self) -> bool:
+ if self.done():
+ return False
+
+ if self._fut_waiter is not None:
+ # we are waiting for a "blocked" result
+ if self._fut_waiter.cancel():
+ return True
+
+ # task is just scheduled now
+ self._must_cancel = True
+ return True
+
+ def _wakeup(self, future: Future) -> None:
+ try:
+ future.result()
+ except Exception as exc:
+ # This may also be a cancellation.
+ self.schedule(exc)
+ else:
+ self.schedule()
from future import CancelledError, Future
from loop import Loop
from wait import wait
def some_result(result):
future = Future("Some Result")
future.set_result(result)
return (yield from future)
def cancelled_result(result):
future = Future("Some Result")
future.cancel()
return (yield from future)
def add(coroutine1, coroutine2):
x, y = yield from wait([coroutine1, coroutine2])
return x + y
def main():
return (yield from add(some_result(1), cancelled_result(2)))
loop = Loop.get_current_loop()
try:
result = loop.run(main())
print("Loop finished with result", result)
except CancelledError:
print("Loop got cancelled")
Output:
Loop step 1 [<Handle name='Initial Task' callback='step'>]
Loop step 2 [<Handle name='Task for some_result' callback='step'>, <Handle name='Task for cancelled_result' callback='step'>]
Loop step 3 [<Handle name='Some Result' callback='_wakeup'>, <Handle name='Some Result' callback='_wakeup'>]
Loop step 4 [<Handle name='Task for some_result' callback='step'>, <Handle name='Task for cancelled_result' callback='step'>]
Loop step 5 [<Handle name='Task for some_result' callback='_on_completion'>, <Handle name='Task for cancelled_result' callback='_on_completion'>]
Loop step 6 [<Handle name='Waiter for <Task name='Task for some_result' state=FutureState.PENDING id='0x7f3cd90b6530'>, <Task name='Task for cancelled_result' state=FutureState.PENDING id='0x7f3cd90b65c0'>' callback='_wakeup'>]
Loop step 7 [<Handle name='Initial Task' callback='step'>]
Loop step 8 [<Handle name='Initial Task' callback='_done'>]
Loop got cancelled