Step 16 - Cancellation

Future v4
from enum import Enum
from typing import Any, Callable, Generator

from loop import Loop


class CancelledError(Exception):
    """
    Raised when the result of a cancelled Future is requested.

    It is also the exception a Task throws into its coroutine to deliver a
    cancellation request.
    """


class FutureState(Enum):
    """Lifecycle states of a Future."""

    # Initial state: no result has been set and the future is not cancelled.
    PENDING = "pending"
    # A result has been stored via set_result().
    DONE = "done"
    # cancel() was called while the future was still pending.
    CANCELLED = "cancelled"


class Future:
    """Placeholder for a value that is produced later (v4).

    A future starts out pending and ends up either done (it holds a result)
    or cancelled. Callbacks registered with ``add_done_callback`` are handed
    to the loop as soon as the future leaves the pending state.
    """

    _result = None

    def __init__(self, name: str = None):
        self._state = FutureState.PENDING
        self._callbacks = []
        self._name = name
        self._loop = Loop.get_current_loop()

    def set_result(self, result: Any):
        """Store *result*, mark the future done and schedule the callbacks.

        Raises:
            RuntimeError: if the future is not pending any more.
        """
        if self._state is not FutureState.PENDING:
            raise RuntimeError("Invalid Future state")

        self._result = result
        self._state = FutureState.DONE
        self._schedule_callbacks()

    def cancel(self) -> bool:
        """Cancel a pending future and schedule the callbacks.

        Returns:
            True if the future was pending and is now cancelled, False if it
            had already been completed or cancelled.
        """
        if self._state is FutureState.PENDING:
            self._state = FutureState.CANCELLED
            self._schedule_callbacks()
            return True
        return False

    def result(self) -> Any:
        """Return the stored result.

        Raises:
            CancelledError: if the future was cancelled.
            RuntimeError: if the future is still pending.
        """
        current = self._state
        if current is FutureState.CANCELLED:
            raise CancelledError()
        if current is FutureState.DONE:
            return self._result
        raise RuntimeError("Invalid Future state")

    def done(self) -> bool:
        """Return True only if a result has been set (not when cancelled)."""
        return self._state is FutureState.DONE

    def cancelled(self) -> bool:
        """Return True if the future was cancelled."""
        return self._state is FutureState.CANCELLED

    def add_done_callback(self, fn: Callable[["Future"], None]) -> None:
        """Register *fn* to be called with this future once it is finished.

        While the future is pending the callback is only remembered; if the
        future already has a result or is cancelled, the callback is handed
        to the loop right away.
        """
        if self._state is FutureState.PENDING:
            self._callbacks.append(fn)
            return

        # we already have a result or are cancelled
        self._loop.schedule(self._name, fn, self)

    def _schedule_callbacks(self) -> None:
        """Hand every registered callback to the loop and forget them."""
        # Snapshot first and empty the list in place, so each callback is
        # scheduled exactly once.
        queued = self._callbacks[:]
        del self._callbacks[:]

        for fn in queued:
            self._loop.schedule(self._name, fn, self)

    def __repr__(self) -> str:
        cls_name = type(self).__name__
        return (
            f"<{cls_name} name='{self._name}' "
            f"state={self._state} id='{hex(id(self))}'>"
        )

    def __iter__(self) -> Generator["Future", None, Any]:
        # Yield ourselves so that whoever drives the generator can wait for
        # this future; once resumed, hand back the result (or raise).
        yield self
        return self.result()
Future v4 (diff against v3)
--- future3.py	2022-11-02 10:37:42.421495887 +0100
+++ future4.py	2022-11-09 12:22:03.254019054 +0100
@@ -1,33 +1,64 @@
+from enum import Enum
 from typing import Any, Callable, Generator
 
 from loop import Loop
 
 
+class CancelledError(Exception):
+    """
+    Raised if a Future is cancelled
+    """
+
+
+class FutureState(Enum):
+    PENDING = "pending"
+    DONE = "done"
+    CANCELLED = "cancelled"
+
+
 class Future:
-    """Return a result in the future v3"""
+    """Return a result in the future v4"""
 
     _result = None
-    _done = False
 
     def __init__(self, name: str = None):
         self._name = name
         self._callbacks = []
+        self._state = FutureState.PENDING
         self._loop = Loop.get_current_loop()
 
     def set_result(self, result: Any):
+        if self._state != FutureState.PENDING:
+            raise RuntimeError("Invalid Future state")
+
         self._result = result
-        self._done = True
+        self._state = FutureState.DONE
         self._schedule_callbacks()
 
+    def cancel(self) -> bool:
+        if self._state != FutureState.PENDING:
+            return False
+
+        self._state = FutureState.CANCELLED
+        self._schedule_callbacks()
+        return True
+
     def result(self) -> Any:
+        if self._state == FutureState.CANCELLED:
+            raise CancelledError()
+        if self._state != FutureState.DONE:
+            raise RuntimeError("Invalid Future state")
         return self._result
 
     def done(self) -> bool:
-        return self._done
+        return self._state == FutureState.DONE
+
+    def cancelled(self) -> bool:
+        return self._state == FutureState.CANCELLED
 
     def add_done_callback(self, fn: Callable[["Future"], None]) -> None:
-        if self.done():
-            # we already have a result
+        if self._state != FutureState.PENDING:
+            # we already have a result or are cancelled
             self._loop.schedule(self._name, fn, self)
         else:
             self._callbacks.append(fn)
@@ -45,7 +76,7 @@
     def __repr__(self) -> str:
         return (
             f"<{self.__class__.__name__} name='{self._name}' "
-            f"id='{hex(id(self))}'>"
+            f"state={self._state} id='{hex(id(self))}'>"
         )
 
     def __iter__(self) -> Generator["Future", None, Any]:
Task v5
from typing import Any, Generator

from future import CancelledError, Future
from loop import Loop


class Task(Future):
    """Drive a generator-based coroutine on the loop, with cancellation (v5).

    A Task is itself a Future: it finishes with the coroutine's return value,
    or becomes cancelled when a CancelledError escapes from the coroutine.
    """

    def __init__(self, coroutine: Generator[Any, None, Any], name: str):
        super().__init__(name)
        self._coroutine = coroutine
        self._loop = Loop.get_current_loop()
        # Future the coroutine is currently suspended on, if any.
        self._fut_waiter = None
        # Set by cancel() when the request must be delivered on the next step.
        self._must_cancel = False
        self.schedule()

    def step(self, exc: Exception = None) -> None:
        """Advance the coroutine by one step.

        Args:
            exc: exception to throw into the coroutine instead of resuming
                it normally; replaced by a CancelledError when a
                cancellation request is pending.
        """
        self._fut_waiter = None

        if self._must_cancel:
            if not isinstance(exc, CancelledError):
                exc = CancelledError()
            self._must_cancel = False

        try:
            if exc is None:
                yielded = self._coroutine.send(None)
            else:
                # This may also be a cancellation.
                yielded = self._coroutine.throw(exc)
        except StopIteration as e:
            self.set_result(e.value)
        except CancelledError:
            # coroutine is cancelled
            # update task status via future
            super().cancel()
        else:
            # no result yet
            if isinstance(yielded, Future):
                # we are blocked by some external event for example waiting for
                # incoming data. let's wait until result is available
                yielded.add_done_callback(self._wakeup)
                self._fut_waiter = yielded
                if self._must_cancel:  # may have been set since last suspend
                    if yielded.cancel():
                        self._must_cancel = False
            else:
                # just schedule again
                self.schedule()

    def schedule(self, exc: Exception = None) -> None:
        """Ask the loop to run ``step``, optionally passing *exc* to it."""
        self._loop.schedule(self._name, self.step, exc)

    def cancel(self) -> bool:
        """Request cancellation of the task.

        The task's state is not changed here. The request is delivered to
        the coroutine either by cancelling the future it is waiting on or by
        throwing a CancelledError on the next step.

        Returns:
            False if the task already finished (with a result or by being
            cancelled), True if the cancellation request was accepted.
        """
        # done() is False for a cancelled future, so check both: a task that
        # is already cancelled must not accept another request.
        if self.done() or self.cancelled():
            return False

        if self._fut_waiter is not None:
            # we are waiting for a "blocked" result
            if self._fut_waiter.cancel():
                return True

        # task is just scheduled now
        self._must_cancel = True
        return True

    def _wakeup(self, future: Future) -> None:
        """Callback for the awaited future: schedule the next step.

        If the future's result raises, the exception is forwarded so that
        ``step`` throws it into the coroutine.
        """
        try:
            future.result()
        except Exception as exc:
            # This may also be a cancellation.
            self.schedule(exc)
        else:
            self.schedule()
Task v5 (diff against v4)
--- task4.py	2022-11-02 16:25:54.369385455 +0100
+++ task5.py	2022-11-09 12:17:00.995591335 +0100
@@ -1,36 +1,75 @@
 from typing import Any, Generator
 
-from future import Future
-
+from future import CancelledError, Future
 from loop import Loop
 
 
 class Task(Future):
-    """Task v4"""
+    """Task v5"""
 
     def __init__(self, coroutine: Generator[Any, None, Any], name: str):
         super().__init__(name)
         self._coroutine = coroutine
         self._loop = Loop.get_current_loop()
+        self._fut_waiter = None
+        self._must_cancel = False
         self.schedule()
 
-    def step(self) -> None:
+    def step(self, exc: Exception = None) -> None:
+        self._fut_waiter = None
+
+        if self._must_cancel:
+            if not isinstance(exc, CancelledError):
+                exc = CancelledError()
+            self._must_cancel = False
+
         try:
-            yielded = next(self._coroutine)
+            if exc is None:
+                yielded = self._coroutine.send(None)
+            else:
+                # This may also be a cancellation.
+                yielded = self._coroutine.throw(exc)
         except StopIteration as e:
             self.set_result(e.value)
+        except CancelledError:
+            # coroutine is cancelled
+            # update task status via future
+            super().cancel()
         else:
             # no result yet
             if isinstance(yielded, Future):
                 # we are blocked by some external event for example waiting for
                 # incoming data. let's wait until result is available
                 yielded.add_done_callback(self._wakeup)
+                self._fut_waiter = yielded
+                if self._must_cancel:  # may have been set since last suspend
+                    if yielded.cancel():
+                        self._must_cancel = False
             else:
                 # just schedule again
                 self.schedule()
 
-    def schedule(self) -> None:
-        self._loop.schedule(self._name, self.step)
+    def schedule(self, exc: Exception = None) -> None:
+        self._loop.schedule(self._name, self.step, exc)
 
-    def _wakeup(self, _future: Future) -> None:
-        self.schedule()
+    def cancel(self) -> bool:
+        if self.done():
+            return False
+
+        if self._fut_waiter is not None:
+            # we are waiting for a "blocked" result
+            if self._fut_waiter.cancel():
+                return True
+
+        # task is just scheduled now
+        self._must_cancel = True
+        return True
+
+    def _wakeup(self, future: Future) -> None:
+        try:
+            future.result()
+        except Exception as exc:
+            # This may also be a cancellation.
+            self.schedule(exc)
+        else:
+            self.schedule()
from future import CancelledError, Future
from loop import Loop
from wait import wait


def some_result(result):
    """Coroutine returning *result* via a future that is completed up front."""
    ready = Future("Some Result")
    ready.set_result(result)
    value = yield from ready
    return value


def cancelled_result(result):
    """Coroutine that waits on a future which is cancelled up front.

    *result* is accepted but never used.
    """
    doomed = Future("Some Result")
    doomed.cancel()
    value = yield from doomed
    return value


def add(coroutine1, coroutine2):
    """Wait for both coroutines and return the sum of their results."""
    results = yield from wait([coroutine1, coroutine2])
    first, second = results
    return first + second


def main():
    """Entry coroutine: add one successful and one cancelled result."""
    succeeding = some_result(1)
    cancelling = cancelled_result(2)
    total = yield from add(succeeding, cancelling)
    return total


# Run main() on the current loop and report how the run ended.
loop = Loop.get_current_loop()
try:
    result = loop.run(main())
except CancelledError:
    print("Loop got cancelled")
else:
    print("Loop finished with result", result)

Output:

Loop step 1 [<Handle name='Initial Task' callback='step'>]
Loop step 2 [<Handle name='Task for some_result' callback='step'>, <Handle name='Task for cancelled_result' callback='step'>]
Loop step 3 [<Handle name='Some Result' callback='_wakeup'>, <Handle name='Some Result' callback='_wakeup'>]
Loop step 4 [<Handle name='Task for some_result' callback='step'>, <Handle name='Task for cancelled_result' callback='step'>]
Loop step 5 [<Handle name='Task for some_result' callback='_on_completion'>, <Handle name='Task for cancelled_result' callback='_on_completion'>]
Loop step 6 [<Handle name='Waiter for <Task name='Task for some_result' state=FutureState.PENDING id='0x7f3cd90b6530'>, <Task name='Task for cancelled_result' state=FutureState.PENDING id='0x7f3cd90b65c0'>' callback='_wakeup'>]
Loop step 7 [<Handle name='Initial Task' callback='step'>]
Loop step 8 [<Handle name='Initial Task' callback='_done'>]
Loop got cancelled