Step 15 - And what about real Async IO?
Until now we have not handled real asynchronous IO, such as reading from and writing to network sockets. Because this topic depends heavily on the operating system and the loop implementation in use, it is covered here only schematically, for a Linux-based system.
import socket
from typing import Any, Callable, Generator

from future import Future
from handle import Handle
# this is just a non-runnable example of how it could possibly work
#
# the real implementation is more complex and depends on the operating system,
# chosen loop implementation, socket type, selector implementation, ...
def create_socket(host) -> socket.socket:
    """Create a non-blocking socket and start connecting it to ``host``.

    ``host`` is handed to ``sock.connect`` unchanged.
    NOTE(review): ``socket.socket()`` defaults to AF_INET, whose ``connect``
    expects a ``(host, port)`` tuple - confirm what callers pass in.

    Returns the socket; the connection may still be in progress.
    Raises whatever ``connect`` raises for an immediate failure; the socket
    is closed before the exception is re-raised.
    """
    sock = socket.socket()
    try:
        sock.setblocking(False)
        try:
            sock.connect(host)
        except (BlockingIOError, InterruptedError):
            # expected for a non-blocking socket: the connect has been
            # started and completes in the background
            pass
    except BaseException:
        # do not leak the file descriptor when the connect fails right away
        sock.close()
        raise
    return sock  # a socket
class Loop:
    """Loop v6

    Runs scheduled handles step by step and, before every step, asks a
    selector which registered sockets have data available.
    """

    # number of bytes requested from a readable socket in one read
    _RECV_SIZE = 4096

    # the shared loop instance, created lazily by get_current_loop()
    _instance: "Loop" = None

    def __init__(self):
        self._running = False
        self._scheduled = []
        self._selector = Selector()  # Some Selector class

    @classmethod
    def get_current_loop(cls) -> "Loop":
        """Return the shared loop instance, creating it on first use"""
        if cls._instance is None:
            cls._instance = Loop()
        return cls._instance

    def create_connection(self, host: str) -> "Future":
        """Open a socket to host and return a future for its received data"""
        waiter = Future()
        sock = create_socket(host)
        # same Handle(name, callback, args) form that schedule() uses
        handle = Handle("receive data", self._receive_data, (waiter, sock))
        self._selector.add(sock, handle)
        return waiter

    def _receive_data(self, future: "Future", socket: "socket.socket") -> None:
        """Read the available data from socket and hand it to future"""
        # NOTE(review): the socket stays registered with the selector, so
        # further data would set the result again - confirm this is intended
        data = socket.recv(self._RECV_SIZE)
        future.set_result(data)

    def run_step(self) -> None:
        """Run a single step/tick of the loop"""
        # with pending handles only poll the selector (timeout 0),
        # otherwise wait forever until data is available (timeout None)
        timeout = 0 if self._scheduled else None

        # wait and block for data depending on the timeout
        # returns an iterable of handles whose sockets have data
        handles = self._selector.select(timeout)
        if handles:
            self._scheduled.extend(handles)

        # run only the handles known right now; handles scheduled by the
        # callbacks are appended behind them and wait for the next step
        for _ in range(len(self._scheduled)):
            handle = self._scheduled.pop(0)  # fifo: extract first item
            handle.run()

    def run_loop(self) -> None:
        """Run the loop until stop() is called"""
        self._running = True
        step = 1
        while self._running:
            print("Loop step", step, self._scheduled)
            self.run_step()
            step += 1

    def run(self, coroutine: Generator[Any, None, Any]) -> Any:
        """Run a coroutine until it is done/completed and return its result"""
        from task import Task  # avoid cyclic dependency

        # create a root task for our coroutine
        # the task gets scheduled immediately in its constructor
        task = Task(coroutine, "Initial Task")
        # stop the loop as soon as the root task is done
        task.add_done_callback(self._done)
        self.run_loop()
        return task.result()

    def stop(self) -> None:
        """Stop running the loop"""
        self._running = False

    def schedule(self, name: str, callback: Callable, *args: Any) -> None:
        """Schedule a callback for the next step/tick"""
        self._scheduled.append(Handle(name, callback, args))

    def _done(self, _future) -> None:
        """Done callback of the root task: stop the loop"""
        self.stop()
--- loop5.py 2022-11-02 15:31:25.547688722 +0100
+++ step15/loop.py 2022-11-02 16:21:46.390948993 +0100
@@ -1,16 +1,31 @@
from typing import Any, Callable, Generator
+from future import Future
from handle import Handle
+# this is just an non runable example how it could possibly work
+#
+# the real implementation is more complex and depends on the operating system,
+# chosen loop implementation, socket type, selector implementation, ...
+
+
+def create_socket(host) -> Socket:
+ # just create a socket connection to a host
+ sock = socket.socket()
+ sock.setblocking(False)
+ sock.connect(host)
+ return sock # a socket
+
class Loop:
- """Loop v5"""
+ """Loop v6"""
_instance: "Loop" = None
def __init__(self):
self._running = False
self._scheduled = []
+ self._selector = Selector() # Some Selector class
@classmethod
def get_current_loop(cls) -> "Loop":
@@ -18,26 +33,32 @@
cls._instance = Loop()
return cls._instance
+ def create_connection(self, host: str):
+ waiter = Future()
+ socket = create_socket(host)
+ self._selector.add(socket, Handle(self._receive_data, waiter, socket))
+ return waiter
+
+ def _receive_data(future: Future, socket: Socket):
+ data = socket.recv()
+ future.set_result(data)
+
def run_step(self) -> None:
"""Run a single step/tick of the loop"""
- # execute all current known handles only.
- # not the ones added while running the handle callbacks
- scheduled = self._scheduled.copy()
- self._scheduled.clear()
-
- try:
- handle = scheduled.pop(0) # fifo: extract first item
- while handle is not None:
- handle.run()
- handle = scheduled.pop(0) # fifo: extract first item
- except IndexError:
- # list is empty
- pass
- # a shorter and even more smarter version for the above code could be
- #
- # for _ in range(len(self._scheduled)):
- # handle = self._scheduled.pop(0) # fifo: extract first item
- # handle.run()
+ if not self._scheduled:
+ timeout = None # wait forever until data is available
+ else:
+ timeout = 0 # just get the sockets with data
+
+ # wait and block for data depending on the timeout
+ # returns an iterable of handles which corresponding sockets have data
+ handles = self._selector.select(timeout)
+ if handles:
+ self._scheduled.extend(handles)
+
+ for _ in range(len(self._scheduled)):
+ handle = self._scheduled.pop(0) # fifo: extract first item
+ handle.run()
def run_loop(self) -> None:
"""Run the loop"""
from loop import Loop
from wait import wait
def load_data(host):
    """Coroutine: connect to host via the loop and return the received data"""
    # create_connection returns a waiter that we delegate to until it is done
    waiter = loop.create_connection(host)
    received = yield from waiter
    return received
def add(coroutine1, coroutine2):
    """Coroutine: wait for both coroutines and return the sum of their results"""
    results = yield from wait([coroutine1, coroutine2])
    first, second = results
    return first + second
def main():
    """Coroutine: load data from two hosts and return the combined result"""
    first = load_data("1.2.3.4")
    second = load_data("4.2.3.1")
    combined = yield from add(first, second)
    return combined
# fetch the shared loop instance; load_data() reads this module-level name
loop = Loop.get_current_loop()
# run main() on the loop until it is done and keep its return value
result = loop.run(main())
print("Loop finished with result", result)
The idea is to have a `Selector` API that does `select` calls on sockets internally. If no handle is scheduled and no data is available, the `select` call blocks the current thread (and therefore our loop) until data is available. Otherwise the `Selector` API returns the corresponding `Handle`s for the sockets with available data. The `Selector` API is called at every step/tick before the `Handle`s are run, to schedule the socket `Handle`s for this step.
To allow the `Loop` to track all socket communication, the creation of the socket must originate in the `Loop`. In our case it is done via the `create_connection` method. This allows for pairing a `Future` with a socket in a `Handle`.
Summary
- All I/O must originate in our `Loop`.
- The `Loop` provides methods for creating network connections.
- The `Loop` tracks the network connections and notifies waiting `Future`s.