Step 15 - And what about real async I/O?

Until now we haven't handled real asynchronous I/O, such as reading from and writing to network sockets. Because this topic depends heavily on the operating system and the loop implementation, it is only sketched here, for a Linux-based system.

Loop v6
from typing import Any, Callable, Generator

from future import Future
from handle import Handle

# this is just a non-runnable example of how it could possibly work
#
# the real implementation is more complex and depends on the operating system,
# chosen loop implementation, socket type, selector implementation, ...


def create_socket(host, port: int = 80) -> "socket.socket":
    """Create a non-blocking TCP socket and start connecting it to *host*.

    *host* may be an ``(address, port)`` tuple, which is passed to
    ``connect`` unchanged, or a bare address, which is combined with
    *port* (80 by default, an arbitrary choice for this example).

    Because the socket is non-blocking, the connection may still be in
    progress when this function returns; callers must wait for it (e.g. via
    a selector) before using the socket.

    Raises:
        OSError: if the connection attempt fails immediately (the socket is
            closed before the error is re-raised).
    """
    import socket

    # AF_INET sockets need an (address, port) tuple, not a bare string
    address = host if isinstance(host, tuple) else (host, port)
    sock = socket.socket()
    sock.setblocking(False)
    try:
        sock.connect(address)
    except BlockingIOError:
        # expected for non-blocking sockets: the connect is in progress
        pass
    except OSError:
        sock.close()  # don't leak the socket on a real connection error
        raise
    return sock


class Loop:
    """Loop v6

    Event loop that runs scheduled ``Handle`` callbacks step by step. Before
    each step it asks a selector for the handles whose sockets have data and
    schedules them too. This is a schematic example: ``Selector`` stands for
    some selector implementation that is not defined here.
    """

    # shared instance, created lazily by get_current_loop()
    _instance: "Loop | None" = None

    def __init__(self):
        self._running = False
        # FIFO list of Handles to run in the next step/tick
        self._scheduled = []
        self._selector = Selector()  # Some Selector class

    @classmethod
    def get_current_loop(cls) -> "Loop":
        """Return the shared loop instance, creating it on first use."""
        if cls._instance is None:
            cls._instance = Loop()
        return cls._instance

    def create_connection(self, host: str) -> "Future":
        """Open a non-blocking connection to *host* and return a Future.

        The socket is registered with the selector together with a Handle
        that reads from it; once the selector reports the socket as having
        data, that Handle runs and the returned Future gets the data.
        """
        waiter = Future()
        sock = create_socket(host)
        # Handle takes (name, callback, args) -- same order as in schedule()
        self._selector.add(
            sock, Handle("receive data", self._receive_data, (waiter, sock))
        )
        return waiter

    def _receive_data(
        self, future: "Future", sock: "Socket", bufsize: int = 4096
    ) -> None:
        """Read up to *bufsize* bytes from *sock* and resolve *future* with them."""
        # socket.recv() requires a buffer size argument
        data = sock.recv(bufsize)
        future.set_result(data)
        # NOTE(review): the socket is still registered with the selector; the
        # (hypothetical) Selector API needs a way to unregister it here,
        # otherwise this handle may run again and set the future twice.

    def run_step(self) -> None:
        """Run a single step/tick of the loop"""
        if not self._scheduled:
            timeout = None  # nothing to do: wait until a socket has data
        else:
            timeout = 0  # work is pending: only poll for ready sockets

        # wait and block for data depending on the timeout;
        # returns an iterable of handles whose sockets have data
        handles = self._selector.select(timeout)
        if handles:
            self._scheduled.extend(handles)

        # run only the handles known now; handles scheduled by these
        # callbacks are appended behind them and run in the next step
        for _ in range(len(self._scheduled)):
            handle = self._scheduled.pop(0)  # fifo: extract first item
            handle.run()

    def run_loop(self) -> None:
        """Run steps until stop() is called, printing each step."""
        self._running = True
        step = 1
        while self._running:
            print("Loop step", step, self._scheduled)
            self.run_step()
            step += 1

    def run(self, coroutine: Generator[Any, None, Any]) -> Any:
        """Run a coroutine until it is done/completed and return its result"""
        from task import Task  # avoid cyclic dependency

        # create a root task for our coroutine
        # the task gets scheduled immediately in its constructor
        task = Task(coroutine, "Initial Task")
        task.add_done_callback(self._done)
        self.run_loop()
        return task.result()

    def stop(self) -> None:
        """Stop running the loop after the current step"""
        self._running = False

    def schedule(self, name: str, callback: Callable, *args: Any) -> None:
        """Schedule a callback for the next step/tick"""
        self._scheduled.append(Handle(name, callback, args))

    def _done(self, _future) -> None:
        """Done callback of the root task: stop the loop."""
        self.stop()
Diff between Loop v5 and Loop v6
--- loop5.py	2022-11-02 15:31:25.547688722 +0100
+++ step15/loop.py	2022-11-02 16:21:46.390948993 +0100
@@ -1,16 +1,31 @@
 from typing import Any, Callable, Generator
 
+from future import Future
 from handle import Handle
 
+# this is just an non runable example how it could possibly work
+#
+# the real implementation is more complex and depends on the operating system,
+# chosen loop implementation, socket type, selector implementation, ...
+
+
+def create_socket(host) -> Socket:
+    # just create a socket connection to a host
+    sock = socket.socket()
+    sock.setblocking(False)
+    sock.connect(host)
+    return sock  # a socket
+
 
 class Loop:
-    """Loop v5"""
+    """Loop v6"""
 
     _instance: "Loop" = None
 
     def __init__(self):
         self._running = False
         self._scheduled = []
+        self._selector = Selector()  # Some Selector class
 
     @classmethod
     def get_current_loop(cls) -> "Loop":
@@ -18,26 +33,32 @@
             cls._instance = Loop()
         return cls._instance
 
+    def create_connection(self, host: str):
+        waiter = Future()
+        socket = create_socket(host)
+        self._selector.add(socket, Handle(self._receive_data, waiter, socket))
+        return waiter
+
+    def _receive_data(future: Future, socket: Socket):
+        data = socket.recv()
+        future.set_result(data)
+
     def run_step(self) -> None:
         """Run a single step/tick of the loop"""
-        # execute all current known handles only.
-        # not the ones added while running the handle callbacks
-        scheduled = self._scheduled.copy()
-        self._scheduled.clear()
-
-        try:
-            handle = scheduled.pop(0)  # fifo: extract first item
-            while handle is not None:
-                handle.run()
-                handle = scheduled.pop(0)  # fifo: extract first item
-        except IndexError:
-            # list is empty
-            pass
-        # a shorter and even more smarter version for the above code could be
-        #
-        # for _ in range(len(self._scheduled)):
-        #     handle = self._scheduled.pop(0)  # fifo: extract first item
-        #     handle.run()
+        if not self._scheduled:
+            timeout = None  # wait forever until data is available
+        else:
+            timeout = 0  # just get the sockets with data
+
+        # wait and block for data depending on the timeout
+        # returns an iterable of handles which corresponding sockets have data
+        handles = self._selector.select(timeout)
+        if handles:
+            self._scheduled.extend(handles)
+
+        for _ in range(len(self._scheduled)):
+            handle = self._scheduled.pop(0)  # fifo: extract first item
+            handle.run()
 
     def run_loop(self) -> None:
         """Run the loop"""
from loop import Loop
from wait import wait


def load_data(host):
    """Coroutine: connect to *host* and return the data received from it."""
    # ask for the loop explicitly instead of relying on the module-level
    # `loop` variable, which is only assigned further down in this script
    connection = Loop.get_current_loop().create_connection(host)
    return (yield from connection)


def add(coroutine1, coroutine2):
    """Wait for both coroutines via ``wait`` and return ``first + second``."""
    results = yield from wait([coroutine1, coroutine2])
    first, second = results
    return first + second


def main():
    """Root coroutine: load data from two hosts and combine both results."""
    first = load_data("1.2.3.4")
    second = load_data("4.2.3.1")
    total = yield from add(first, second)
    return total


# the shared loop; kept at module level so coroutines can use it
loop = Loop.get_current_loop()

if __name__ == "__main__":
    # only run the example when executed as a script, not on import
    result = loop.run(main())
    print("Loop finished with result", result)

The idea is to have a Selector API that performs select calls on the sockets internally. If no Handle is scheduled, the select call blocks the current thread (and therefore our loop) until a socket has data available. Otherwise it only polls, returning without waiting the corresponding Handles of the sockets that already have data. The Selector API is called at every step/tick, before the scheduled Handles are run, so that the socket Handles are scheduled for this step.

To let the Loop track all socket communication, every socket must be created by the Loop. In our case this is done via the create_connection method, which pairs a Future with a socket in a Handle.

Summary

  • All I/O must originate in our Loop.

  • The Loop provides methods for creating network connections.

  • The Loop tracks the network connections and notifies waiting Futures.