Modular Async Modeling

Sept. 5th, 2024

In many modern programming languages, you usually have access to async primitives and general support for asynchronous programming. C++ has coroutines. Go has goroutines (lol). Javascript, Rust, C#, and many others have async/await. This is not the case in Zig where the way to reap the benefits of async modeling is to use an event loop[1].

There are a few different implementations in the Zig ecosystem, such as libxev, zig-aio, and the various ones that exist for C[2][3] that can be utilized through @cImport. For zzz, the web server hosting this site, I have rolled my own to achieve very specific goals.

[1]: Some async implementations use event loops under the hood, such as Javascript.
[2]: libuv
[3]: libevent

Importance of Modularity

The design of zzz is tightly coupled with the concept of portability and modularity. Functionality that is platform dependent, such as I/O, should support sane defaults for common platforms while also providing methods for swapping in user defined modules.

One of my core goals when creating zzz was to have a networking library that operates the same, irrespective of platform. This modularity is core for utilization in embedded or bare metal domains where socket abstractions (and apis such as io_uring or iocp) may not exist and you must roll your own functions for handling I/O. This means that we must have the ability for the consumer of this library to provide their own asynchronous backends.

The Event Loop

Initally, we want to consider what the core functionality these asynchronous backends need to provide. There are a couple we can immediately consider:

  • queue accept
  • queue recv
  • queue send

We must also consider how we interact with all of these queued events:

  • submit these jobs to be run
  • reap the completed jobs to be handled

These functions need to be transparent to the zzz core and they (may) need to each carry their own internal payloads.

            
                pub const Completion = struct {
                    context: *anyopaque,
                    result: i32,
                };

                pub const Async = struct {
                    runner: *anyopaque,
                    completions: [256]Completion,

                    _queue_accept: *const fn (self: *Async, context: *anyopaque, socket: Socket) AsyncError!void,
                    _queue_recv: *const fn (self: *Async, context: *anyopaque, socket: Socket, buffer: []u8) AsyncError!void,
                    _queue_send: *const fn (self: *Async, context: *anyopaque, socket: Socket, buffer: []const u8) AsyncError!void,
                    _reap: *const fn (self: *Async) AsyncError![]Completion,
                    _submit: *const fn (self: *Async) AsyncError!void,
                };
            
        

Note: There will be an abstracted Socket type that evaluates at compile time to be the socket of the system or an integer if there isn't one.

The Async struct is what abstracts every backend passed to zzz. It contains a field, runner: *anyopaque[4], storing a pointer to whatever the internal implementation uses.

It also contains a buffer of 256 Completion structs, each storing a result (i32) and a field, context: *anyopaque, for internal implementation use.

[4]: *anyopaque is the same as void* in C.

Integrating io_uring

Now that we have an idea of what we need in our generic Async struct, we can design a backend using my favorite async I/O api, io_uring.

            
                pub const AsyncIoUring = struct {
                    runner: *anyopaque,
                    completions: [256]Completion,

                    pub fn init(uring: *std.os.linux.IoUring) AsyncIoUring {
                        return AsyncIoUring{
                            .runner = uring,
                            .completions = [_]Completion{undefined} ** 256,
                        };
                    }
                    pub fn queue_accept(self: *Async, context: *anyopaque, socket: std.posix.socket_t) AsyncError!void {
                        const uring: *std.os.linux.IoUring = @ptrCast(@alignCast(self.runner));
                        _ = uring.accept(@as(u64, @intFromPtr(context)), socket, null, null, 0) catch unreachable;
                    }

                    pub fn queue_recv(self: *Async, context: *anyopaque, socket: std.posix.socket_t, buffer: []u8) AsyncError!void {
                        const uring: *std.os.linux.IoUring = @ptrCast(@alignCast(self.runner));
                        _ = uring.recv(@as(u64, @intFromPtr(context)), socket, .{ .buffer = buffer }, 0) catch unreachable;
                    }

                    pub fn queue_send(self: *Async, context: *anyopaque, socket: std.posix.socket_t, buffer: []const u8) AsyncError!void {
                        const uring: *std.os.linux.IoUring = @ptrCast(@alignCast(self.runner));
                        _ = uring.send(@as(u64, @intFromPtr(context)), socket, buffer, 0) catch unreachable;
                    }

                    pub fn submit(self: *Async) AsyncError!void {
                        const uring: *std.os.linux.IoUring = @ptrCast(@alignCast(self.runner));
                        _ = uring.submit() catch unreachable;
                    }

                    pub fn reap(self: *Async) AsyncError![]Completion {
                        const uring: *std.os.linux.IoUring = @ptrCast(@alignCast(self.runner));
                        var cqes: [256]std.os.linux.io_uring_cqe = [_]std.os.linux.io_uring_cqe{undefined} ** 256;
                        const count = uring.copy_cqes(cqes[0..], 1) catch unreachable;

                        const min = @min(self.completions.len, count);

                        for (0..min) |i| {
                            self.completions[i] = Completion{
                                .result = cqes[i].res,
                                .context = @ptrFromInt(@as(usize, @intCast(cqes[i].user_data))),
                            };
                        }

                        return self.completions[0..min];
                    }

                };
            
        

The AsyncIoUring implementation stores an instance of a uring in the runner: *anyopaque field, allowing for all invocations to have access to this uring. This backend can then utilize the functionality provided by the io_uring api to queue network operations.

These operations also utilize the context: *anyopaque field from the Completion, to attach user data (the internal state of this connection[5]) to the completion queue entry. submit and reap are self-explainatory, submitting completions or reaping and returning processed ones respectively.

[5]: You can see how it is used later in the Core Loop section
            
                pub fn to_async(self: *AsyncIoUring) Async {
                    return Async{
                        .runner = self.runner,
                        .completions = self.completions,
                        ._queue_accept = queue_accept,
                        ._queue_recv = queue_recv,
                        ._queue_send = queue_send,
                        ._submit = submit,
                        ._reap = reap,
                    };
                }
            
        

We then provide a method called to_async that just converts this implementation to an Async instance. This implementation will now work with zzz and a variation of this snippet is running and serving this site at the moment.

The Core Loop

There is an initial preamble that is required for creating the first accept and submitting it to the backend. The core loop of zzz works by reaping atleast one completion from the backend, iterating through all of the reaped completions, and handling each case appropriately (and likely submitting new actions to the backend).

             
                const first = ...;
                _ = try backend.queue_accept(&first, server_socket);
                try backend.submit();

                while (true) {
                    const completions = try backend.reap();
                    const completions_count = completions.len;
                    assert(completions_count > 0);

                    reap_loop: for (completions[0..completions_count]) |completion| {
                        const p: *Provision = @ptrCast(@alignCast(completion.context));

                        switch (p.job) {
                            .Accept => {
                                // Handle Accept...
                            },

                            .Recv => {
                                // Handle Recv...
                            },

                            .Send => {
                                // Handle Send...
                            },
                        }
                    }

                    try backend.submit();
                }
             
        

Various implementation details have been omitted for brevity but the philsophy around the core loop remains the same. The Provision type is internal and is not important for this example but remains to highlight how the context field of the Completion is utilized.