Select Syscall in Rust

EDIT: Thanks to @steveklabnik for pointing out on Reddit that I was initially doing it wrong with the uninit thingy, that was changed in this commit

Disclaimer

  1. I am not a Rust expert, I am just getting started, so please take everything you read here with a grain of salt.
  2. You probably don’t want to use any of this in production code. There are libraries written by actual Rust developers providing similar features, in a way that will most certainly be more efficient and more reusable, such as Tokio and nix. Additionally, select is rarely use these days, kqueue and epoll tend to be preferred.

Good? Let’s get to it now.

The select syscall

The select syscall is really useful to write systems using an event loop. A good example is a database running as a TCP server. A database would want to maintain client connections open, and respond to them when then send queries to the server, and select can help with that.

select works by accepting four arguments, a list of file descriptors that are ready to be read from, a list of file descriptors that are ready to be written to, a list of file descriptors that have an exceptional condition pending and finally, an optional timeout. We give these file descriptors to select and it’ll return as soon as one or more file descriptors are ready.

A file descriptor, often abbreviated to fd is the identifier of a file or other input/output resource, such as a pipe or network socket. It is based on the idea that, in UNIX, everything is a file. In practical terms file descriptors are non-negative integers.

In the database example, we would give file descriptors for the connected clients, based on the sockets we created when they connected, and select would wait until one of them is ready. Essentially, as soon as the TCP server receives something from one of the clients, the server would be able to respond.

If you’re looking for real world example, Redis does exactly this if configured to use select, in ae_select.c. As noted earlier, it prefers other polling mechanisms, such as kqueue and epoll and select is only a last resort option.

pselect is very similar to select, with the difference that it accepts a timespec instead of a timeval and accepts a mask argument to have better control over signal handling.

Using the libc crate

Rust itself does not provide a way to call select or pselect directly from the standard library, but the Rust developers created the libc library that provides bindings to these, and more.

The nix crate mentioned at the beginning of this post uses the libc crate under the hood. This is how it provides bindings to the select syscall: https://github.com/nix-rust/nix/blob/master/src/sys/select.rs

Show me some code

The following is an example of using select to be notified when a TCP server sent something back to a connected client. For this example I was using a TCP server running in Ruby with the following code running in irb:

irb(main):001:0> require 'socket'
=> true
irb(main):002:0> server = TCPServer.new 'localhost', 2000
irb(main):003:0> client1 = server.accept; client2 = server.accept; client3 = server.accept

The last line does not return until three clients connect, which is what the following Rust code does:

extern crate libc;

use std::net::TcpStream;
use std::os::unix::io::{AsRawFd, RawFd};
use std::{io, mem, ptr, time};

pub struct FdSet(libc::fd_set);

impl FdSet {
    pub fn new() -> FdSet {
        unsafe {
            let mut raw_fd_set = mem::MaybeUninit::<libc::fd_set>::uninit();
            libc::FD_ZERO(raw_fd_set.as_mut_ptr());
            FdSet(raw_fd_set.assume_init())
        }
    }
    pub fn clear(&mut self, fd: RawFd) {
        unsafe { libc::FD_CLR(fd, &mut self.0) }
    }
    pub fn set(&mut self, fd: RawFd) {
        unsafe { libc::FD_SET(fd, &mut self.0) }
    }
    pub fn is_set(&mut self, fd: RawFd) -> bool {
        unsafe { libc::FD_ISSET(fd, &mut self.0) }
    }
}

fn to_fdset_ptr(opt: Option<&mut FdSet>) -> *mut libc::fd_set {
    match opt {
        None => ptr::null_mut(),
        Some(&mut FdSet(ref mut raw_fd_set)) => raw_fd_set,
    }
}

fn to_ptr<T>(opt: Option<&T>) -> *const T {
    match opt {
        None => ptr::null::<T>(),
        Some(p) => p,
    }
}

pub fn select(
    nfds: libc::c_int,
    readfds: Option<&mut FdSet>,
    writefds: Option<&mut FdSet>,
    errorfds: Option<&mut FdSet>,
    timeout: Option<&libc::timeval>,
) -> io::Result<usize> {
    match unsafe {
        libc::select(
            nfds,
            to_fdset_ptr(readfds),
            to_fdset_ptr(writefds),
            to_fdset_ptr(errorfds),
            to_ptr::<libc::timeval>(timeout) as *mut libc::timeval,
        )
    } {
        -1 => Err(io::Error::last_os_error()),
        res => Ok(res as usize),
    }
}

pub fn make_timeval(duration: time::Duration) -> libc::timeval {
    libc::timeval {
        tv_sec: duration.as_secs() as i64,
        tv_usec: duration.subsec_micros() as i32,
    }
}

pub fn connect_to_localhost_2000() -> TcpStream {
    TcpStream::connect("localhost:2000").expect("Failed to connect to localhost 2000")
}

fn main() {
    let mut fd_set = FdSet::new();

    let stream1 = connect_to_localhost_2000();
    let raw_fd1 = stream1.as_raw_fd();

    let stream2 = connect_to_localhost_2000();
    let raw_fd2 = stream2.as_raw_fd();

    let stream3 = connect_to_localhost_2000();
    let raw_fd3 = stream3.as_raw_fd();

    // let raw_fd2 = connect_to_localhost_2000().as_raw_fd(); DOES NOT WORK

    let max_fd = raw_fd1.max(raw_fd2.max(raw_fd3));

    println!("Socket 1: {}", raw_fd1);
    println!("Socket 2: {}", raw_fd2);
    println!("Socket 3: {}", raw_fd3);

    fd_set.set(raw_fd1);
    fd_set.set(raw_fd2);
    fd_set.set(raw_fd3);

    match select(
        max_fd + 1,
        Some(&mut fd_set),                               // read
        None,                                            // write
        None,                                            // error
        Some(&make_timeval(time::Duration::new(10, 0))), // timeout
    ) {
        Ok(res) => {
            println!("select result: {}", res);

            let range = std::ops::Range {
                start: 0,
                end: max_fd + 1,
            };
            for i in range {
                if (fd_set).is_set(i) {
                    println!("Socket {} received something!", i);
                }
            }
        }
        Err(err) => {
            println!("Failed to select: {:?}", err);
        }
    }
}

Let’s break it down, first we declare a Rust struct, FdSet, which wraps libc::fd_set. An fd_set is not the same on every platform, but based on the libc source, we can see that it is defined as an array of integers:

pub struct fd_set {
	#[cfg(all(target_pointer_width = "64",
			  any(target_os = "freebsd", target_os = "dragonfly")))]
	fds_bits: [i64; FD_SETSIZE / 64],
	#[cfg(not(all(target_pointer_width = "64",
				  any(target_os = "freebsd", target_os = "dragonfly"))))]
	fds_bits: [i32; FD_SETSIZE / 32],
}

We can use the macros that come with select to interact with it without having to worry too much about the underlying implementation details, FD_ZERO, FD_CLR, FD_SET and FD_ISSET

The next block in impl FdSet provides Rust functions for the FdSet type to use these macros. In the new function we create a new instance of FdSet, while using MaybeUninit to prevent Rust to do what it does for regular variables and that it essentially should trust us here, we know what we’re doing. You can read more on the topic in the “out-pointers” section of the MaybeUninit docs.

FD_ZERO is used to make sure that the integers that were allocated are in a clean state, technically speaking the OS does not have to clear the bits that were allocated, so we do it, just in case.

The next two functions, to_fdset_ptr and to_ptr are helper functions to convert some Rust-y things such as Option values into C things, like a null pointer.

Next is the actual binding to libc::select, where we accept all the values we want to pass, as idiomatic Rust values, that is some Option wrapped FdSet values, instead of explicit null values.

The return value is also translated from the C tradition of returning -1 if something went wrong to a Result type, which allows us to use pattern matching when dealing with the return value of select.

The timeout passed to select is a timeval, which is a bit verbose to write, so we also create a helper function to instantiate one based on a Rust Duration value.

You can read more about the details of select in the man page, with man 2 select, which is also available online for linux, and macOS.

We will connect three clients to the server so we created a small helper function to do that for us, connect_to_localhost_2000. We use the as_raw_fd function, from the TcpStream type, which returns a RawFd, which an alias for an integer, specifically c_int, which itself is an alias for i32, a 32-bit signed integer, for most platforms.

And now, the main method, we start by creating an FdSet, this will be the one and only fd_set we’ll give to select since we don’t care about writable sockets neither do we care about the ones with exception pending in this example.

We then connect three sockets using connect_to_localhost_2000.


Note that while it might look tempting to write the following if you come from a different language:

let raw_fd = connect_to_localhost_2000().as_raw_fd(); DOES NOT WORK

This will not work due to how Rust automatically releases variables that are not needed anymore. In this case the TcpStream variable returned by connect_to_localhost_2000 is used to call as_raw_fd() but is not needed anymore after that, and Rust will release it. The impact is that Rust knows that the release process for this variable involves closing the socket, which we absolutely do not want here. We need the socket to stay open until the end of the main function, so that select can use it. One way of doing this is to explicitly create a variable for the stream, which will force it to stay in scope for the rest of the function.


Back to main, we create the max_fd variable and set it to the max value of the three raw sockets. This is necessary because the first argument to select must be the the value of the file descriptors given, plus one. Most of the time the file descriptors are incremented, and while running this, my machine was consistently creating these three sockets as 5, 6 and 7, but this is not something that we should rely on, and explicitly grabbing the max value is more reliable.

We then need to prepare the fd_set variable for select, and this is what FD_SET is for, which we use through the fd_set function. It will take care of setting the correct bits inside the fd_set array to store the information of the file descriptors.

We can then call select, with max_fd + 1, as mentioned above. If we have passed min fd value plus one instead, only that socket would have been monitored. If you’re coming from a higher level language, such as Ruby, this might seem odd, but is essentially an “optimized” (one might see it as convoluted) way of passing an array of integers to select. select will know it will not have to look for file descriptors with a value greater than this value.

We also pass a timeout of ten seconds, which is an arbitrary value, and None values for the other arguments.

The function returns a Result, which will be an Err if something went wrong, for instance if we had used the inline version mentioned above, we would have received the following error due to the socket being closed: Failed to select: Os { code: 9, kind: Other, message: "Bad file descriptor" }

On the other hand if the result is successful, we want to know which sockets can be read from. To do that, we need to iterate through the range of all possible file descriptors that could have been described by fd_set, which is all the number between 0 and max_fd, inclusive.

For each of the file descriptors, we use the FD_ISSET macro, through the is_set function to ask if this file descriptor is set in fd_set, it will only be set if the file descriptor can be read from.

If we had been interested in which sockets could be written to, we would have created a different fd_set and use the same approach on each fd_set.

In other words, select modifies the fd_set you give it and sets the internal bits for the file descriptors that are ready, it’s then up to you to look at the content of fd_set and detect which file descriptors are ready. It is also up to you to keep track of which fd_set was given to be notified for readability and which one was given for writability.

One metaphor to explain this process is that we give select a huge piece of paper with a list of file descriptor ids, each followed by an empty checkbox and we tell it to not look past line n. n here is the equivalent of the max_fd argument we just discussed.

select gives it back to us with the checkbox checked for all the ones that are ready.

If you’re testing this locally, you’ll only have ten seconds to do something from irb, feel free to change the value, or pass None if you want an infinite timeout.

When writing to a single client in Ruby, with client2.write '123', I got the following output for my Rust program:

Socket 1: 5
Socket 2: 6
Socket 3: 7
select result: 1
Socket 6 received something!

And we can see that if multiple sockets are ready, they’re all detected, which we can test by writing to two clients with client2.write '123'; client1.write '456', which gives us the following output:

Socket 1: 5
Socket 2: 6
Socket 3: 7
select result: 2
Socket 5 received something!
Socket 6 received something!

It works!

Conclusion

At the risk of repeating myself, the purpose of all this is only to learn more about Rust and select, if you’re writing a real application, look into nix and Tokio instead.

Most of the code was adapted from this Gist I found on Reddit.

You can find the code on GitHub


Liked this post? You might like my free online book about Rebuilding Redis, in Ruby.


Appendix: Same example, but with pselect:

extern crate libc;

use std::net::TcpStream;
use std::os::unix::io::AsRawFd;
use std::os::unix::io::RawFd;
use std::{io, mem, ptr, time};

pub struct FdSet(libc::fd_set);

impl FdSet {
    pub fn new() -> FdSet {
        unsafe {
            let mut raw_fd_set = mem::MaybeUninit::<libc::fd_set>::uninit();
            libc::FD_ZERO(raw_fd_set.as_mut_ptr());
            FdSet(raw_fd_set.assume_init())
        }
    }
    pub fn clear(&mut self, fd: RawFd) {
        unsafe { libc::FD_CLR(fd, &mut self.0) }
    }
    pub fn set(&mut self, fd: RawFd) {
        unsafe { libc::FD_SET(fd, &mut self.0) }
    }
    pub fn is_set(&mut self, fd: RawFd) -> bool {
        unsafe { libc::FD_ISSET(fd, &mut self.0) }
    }
}

fn to_fdset_ptr(opt: Option<&mut FdSet>) -> *mut libc::fd_set {
    match opt {
        None => ptr::null_mut(),
        Some(&mut FdSet(ref mut raw_fd_set)) => raw_fd_set,
    }
}

fn to_ptr<T>(opt: Option<&T>) -> *const T {
    match opt {
        None => ptr::null::<T>(),
        Some(p) => p,
    }
}

pub fn pselect(
    nfds: libc::c_int,
    readfds: Option<&mut FdSet>,
    writefds: Option<&mut FdSet>,
    errorfds: Option<&mut FdSet>,
    timeout: Option<&libc::timespec>,
    sigmask: Option<&libc::sigset_t>,
) -> io::Result<usize> {
    match unsafe {
        libc::pselect(
            nfds,
            to_fdset_ptr(readfds),
            to_fdset_ptr(writefds),
            to_fdset_ptr(errorfds),
            to_ptr(timeout),
            to_ptr(sigmask),
        )
    } {
        -1 => Err(io::Error::last_os_error()),
        res => Ok(res as usize),
    }
}

pub fn make_timespec(duration: time::Duration) -> libc::timespec {
    libc::timespec {
        tv_sec: duration.as_secs() as i64,
        tv_nsec: duration.subsec_nanos() as i64,
    }
}

pub fn connect_to_localhost_2000() -> TcpStream {
    TcpStream::connect("localhost:2000").expect("Failed to connect to localhost 2000")
}

fn main() {
    let ten_seconds = time::Duration::new(10, 0);
    let mut fd_set = FdSet::new();

    let stream1 = connect_to_localhost_2000();
    let raw_fd1 = stream1.as_raw_fd();

    let stream2 = connect_to_localhost_2000();
    let raw_fd2 = stream2.as_raw_fd();

    let stream3 = connect_to_localhost_2000();
    let raw_fd3 = stream3.as_raw_fd();

    let max_fd = raw_fd1.max(raw_fd2.max(raw_fd3));

    println!("Socket 1: {}", raw_fd1);
    println!("Socket 2: {}", raw_fd2);
    println!("Socket 3: {}", raw_fd3);

    fd_set.set(raw_fd1);
    fd_set.set(raw_fd2);
    fd_set.set(raw_fd3);

    match pselect(
        max_fd + 1,
        Some(&mut fd_set),                 // read
        None,                              // write
        None,                              // error
        Some(&make_timespec(ten_seconds)), // timeout
        None,                              // mask
    ) {
        Ok(res) => {
            println!("select result: {}", res);

            let range = std::ops::Range {
                start: 0,
                end: max_fd + 1,
            };
            for i in range {
                if (fd_set).is_set(i) {
                    println!("Socket {} received something!", i);
                }
            }
        }
        Err(err) => {
            println!("Failed to select: {:?}", err);
        }
    }
}
dev  rust 

See also