mio (Metal IO) is a fairly low-level I/O library in the Rust ecosystem. The official description reads:

Mio is a lightweight I/O library for Rust with a focus on adding as little overhead as possible over the OS abstractions.

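The latest mio release at the time of writing is v0.6.19. This analysis follows the master branch at commit 14f37f283576040c8763f45de6c2b2bbcb82436d, whose API (Registry, Interests) differs from the 0.6 releases.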

We trace the source code starting from an example that ships with the official repository.

use std::collections::HashMap;
use std::io::{self, Read, Write};
use std::str::from_utf8;

use mio::event::Event;
use mio::net::{TcpListener, TcpStream};
use mio::{Events, Interests, Poll, Token};

// Setup some tokens to allow us to identify which event is for which socket.
const SERVER: Token = Token(0);

// Some data we'll send over the connection.
const DATA: &[u8] = b"Hello world!\n";

fn main() -> io::Result<()> {
    env_logger::init();

    // Create a poll instance.
    let mut poll = Poll::new()?;
    // Create storage for events.
    let mut events = Events::with_capacity(128);

    // Setup the TCP server socket.
    let addr = "127.0.0.1:13265".parse().unwrap();
    let server = TcpListener::bind(addr)?;

    // Register the server with poll we can receive events for it.
    poll.registry()
        .register(&server, SERVER, Interests::READABLE)?;

    // Map of `Token` -> `TcpStream`.
    let mut connections = HashMap::new();
    // Unique token for each incoming connection.
    let mut unique_token = Token(SERVER.0 + 1);

    println!("You can connect to the server using `nc`:");
    println!(" $ nc 127.0.0.1 13265");
    println!("You'll see our welcome message and anything you type we'll be printed here.");

    loop {
        poll.poll(&mut events, None)?;

        for event in events.iter() {
            match event.token() {
                SERVER => {
                    // Received an event for the TCP server socket.
                    // Accept an connection.
                    let (connection, address) = server.accept()?;
                    println!("Accepted connection from: {}", address);

                    let token = next(&mut unique_token);
                    poll.registry().register(
                        &connection,
                        token,
                        Interests::READABLE.add(Interests::WRITABLE),
                    )?;

                    connections.insert(token, connection);
                }
                token => {
                    // (maybe) received an event for a TCP connection.
                    let done = if let Some(connection) = connections.get_mut(&token) {
                        handle_connection_event(&mut poll, connection, event)?
                    } else {
                        // Sporadic events happen.
                        false
                    };
                    if done {
                        connections.remove(&token);
                    }
                }
            }
        }
    }
}

fn next(current: &mut Token) -> Token {
    let next = current.0;
    current.0 += 1;
    Token(next)
}

/// Returns `true` if the connection is done.
fn handle_connection_event(
    poll: &mut Poll,
    connection: &mut TcpStream,
    event: &Event,
) -> io::Result<bool> {
    if event.is_writable() {
        // We can (maybe) write to the connection.
        match connection.write(DATA) {
            // We want to write the entire `DATA` buffer in a single go. If we
            // write less we'll return a short write error (same as
            // `io::Write::write_all` does).
            Ok(n) if n < DATA.len() => return Err(io::ErrorKind::WriteZero.into()),
            Ok(_) => {
                // After we've written something we'll reregister the connection
                // to only respond to readable events.
                poll.registry()
                    .reregister(&connection, event.token(), Interests::READABLE)?
            }
            // Would block "errors" are the OS's way of saying that the
            // connection is not actually ready to perform this I/O operation.
            Err(ref err) if would_block(err) => {}
            // Got interrupted (how rude!), we'll try again.
            Err(ref err) if interrupted(err) => {
                return handle_connection_event(poll, connection, event)
            }
            // Other errors we'll consider fatal.
            Err(err) => return Err(err),
        }
    }

    if event.is_readable() {
        let mut connection_closed = false;
        let mut received_data = Vec::with_capacity(4096);
        // We can (maybe) read from the connection.
        loop {
            let mut buf = [0; 256];
            match connection.read(&mut buf) {
                Ok(0) => {
                    // Reading 0 bytes means the other side has closed the
                    // connection or is done writing, then so are we.
                    connection_closed = true;
                    break;
                }
                Ok(n) => received_data.extend_from_slice(&buf[..n]),
                // Would block "errors" are the OS's way of saying that the
                // connection is not actually ready to perform this I/O operation.
                Err(ref err) if would_block(err) => break,
                Err(ref err) if interrupted(err) => continue,
                // Other errors we'll consider fatal.
                Err(err) => return Err(err),
            }
        }

        if let Ok(str_buf) = from_utf8(&received_data) {
            println!("Received data: {}", str_buf.trim_end());
        } else {
            println!("Received (none UTF-8) data: {:?}", &received_data);
        }

        if connection_closed {
            println!("Connection closed");
            return Ok(true);
        }
    }

    Ok(false)
}

fn would_block(err: &io::Error) -> bool {
    err.kind() == io::ErrorKind::WouldBlock
}

fn interrupted(err: &io::Error) -> bool {
    err.kind() == io::ErrorKind::Interrupted
}

We start from the body of main().

Poll::new()

    pub fn new() -> io::Result<Poll> {
        sys::Selector::new().map(|selector| Poll {
            registry: Registry { selector },
        })
    }

Here is the Poll struct:

pub struct Poll {
    registry: Registry,
}

/// Registers I/O resources.
pub struct Registry {
    selector: sys::Selector,
}

sys::Selector has a different implementation on each operating system:

/// `Poll` is backed by the selector provided by the operating system.
///
/// |      OS       |  Selector |
/// |---------------|-----------|
/// | Android       | [epoll]   |
/// | DragonFly BSD | [kqueue]  |
/// | FreeBSD       | [kqueue]  |
/// | Linux         | [epoll]   |
/// | NetBSD        | [kqueue]  |
/// | OpenBSD       | [kqueue]  |
/// | Solaris       | [epoll]   |
/// | Windows       | [IOCP]    |
/// | iOS           | [kqueue]  |
/// | macOS         | [kqueue]  |
///

We follow the Linux epoll implementation. The source file is /src/sys/unix/epoll.rs.

impl Selector {
    pub fn new() -> io::Result<Selector> {
        // According to libuv `EPOLL_CLOEXEC` is not defined on Android API <
        // 21. But `EPOLL_CLOEXEC` is an alias for `O_CLOEXEC` on all platforms,
        // so we use that instead.
        syscall!(epoll_create1(libc::O_CLOEXEC)).map(|ep| Selector {
            #[cfg(debug_assertions)]
            id: NEXT_ID.fetch_add(1, Ordering::Relaxed),
            ep,
        })
    }
  
...  
}

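This code calls the Linux API epoll_create1(), which returns an int: the file descriptor referring to the new epoll instance. That is the ep field in the code. In debug builds each selector also gets an auto-incremented id taken from NEXT_ID, which is defined as: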

/// Unique id for use as `SelectorId`.
#[cfg(debug_assertions)]
static NEXT_ID: AtomicUsize = AtomicUsize::new(1);

Next, the syscall! macro, defined in /src/sys/unix/mod.rs:

// Macro must be defined before any modules that uses them.
macro_rules! syscall {
    ($fn: ident ( $($arg: expr),* $(,)* ) ) => {{
        let res = unsafe { libc::$fn($($arg, )*) };
        if res == -1 {
            Err(io::Error::last_os_error())
        } else {
            Ok(res)
        }
    }};
}

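The macro takes a function name and its actual arguments, calls the function of that name in libc, and turns a return value of -1 into an io::Error built from errno.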
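To make the pattern concrete, here is a minimal sketch of the same macro that builds without the libc crate. The fake_libc module and its two functions (double, always_fails) are hypothetical stand-ins invented for this illustration; only the shape of the macro follows the listing above.

```rust
use std::io;

// Hypothetical stand-in for the `libc` crate, so the sketch has no dependencies.
mod fake_libc {
    // Pretends to be a C function that always reports failure with -1.
    pub unsafe fn always_fails(_x: i32) -> i32 {
        -1
    }

    // Pretends to be a C function that succeeds and returns a value.
    pub unsafe fn double(x: i32) -> i32 {
        x * 2
    }
}

// Same shape as mio's macro: call the named function, map -1 to an io::Error.
macro_rules! syscall {
    ($fn: ident ( $($arg: expr),* $(,)* ) ) => {{
        let res = unsafe { fake_libc::$fn($($arg, )*) };
        if res == -1 {
            Err(io::Error::last_os_error())
        } else {
            Ok(res)
        }
    }};
}

pub fn demo_ok() -> io::Result<i32> {
    syscall!(double(21))
}

pub fn demo_err() -> io::Result<i32> {
    syscall!(always_fails(0))
}
```

Because the result is an io::Result, call sites can chain and_then, map and map_err, which is how the bind and select listings in this article use the macro.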

TcpListener

Next comes let server = TcpListener::bind(addr)?;. Here is the bind method:

    /// 1. Create a new TCP socket.
    /// 2. Set the `SO_REUSEADDR` option on the socket on Unix.
    /// 3. Bind the socket to the specified address.
    /// 4. Calls `listen` on the socket to prepare it to receive new connections.
    pub fn bind(addr: SocketAddr) -> io::Result<TcpListener> {
        sys::TcpListener::bind(addr).map(|sys| TcpListener {
            sys,
            #[cfg(debug_assertions)]
            selector_id: SelectorId::new(),
        })
    }

The doc comment says this method performs the four steps of setting up a listening socket. It delegates to sys::TcpListener::bind(), so we follow that:

    pub fn bind(addr: SocketAddr) -> io::Result<TcpListener> {
        new_ip_socket(addr, libc::SOCK_STREAM).and_then(|socket| {
            // Set SO_REUSEADDR (mirrors what libstd does).
            syscall!(setsockopt(
                socket,
                libc::SOL_SOCKET,
                libc::SO_REUSEADDR,
                &1 as *const libc::c_int as *const libc::c_void,
                size_of::<libc::c_int>() as libc::socklen_t,
            ))
            .and_then(|_| {
                let (raw_addr, raw_addr_length) = socket_addr(&addr);
                syscall!(bind(socket, raw_addr, raw_addr_length))
            })
            .and_then(|_| syscall!(listen(socket, 1024)))
            .map_err(|err| {
                // Close the socket if we hit an error, ignoring the error
                // from closing since we can't pass back two errors.
                let _ = unsafe { libc::close(socket) };
                err
            })
            .map(|_| TcpListener {
                inner: unsafe { net::TcpListener::from_raw_fd(socket) },
            })
        })
    }

Back in main, we continue.

poll.registry().register()

    pub fn registry(&self) -> &Registry {
        &self.registry
    }

The Registry returned here, &self.registry, is what actually performs the registration:

    pub fn register<S>(&self, source: &S, token: Token, interests: Interests) -> io::Result<()>
    where
        S: event::Source + ?Sized,
    {
        trace!(
            "registering event source with poller: token={:?}, interests={:?}",
            token,
            interests
        );
        source.register(self, token, interests)
    }

This in turn calls source.register(). Here is the definition of the Source trait:

pub trait Source {
    /// Register `self` with the given `Registry` instance.
    ///
    /// This function should not be called directly. Use [`Registry::register`]
    /// instead. Implementors should handle registration by delegating the call
    /// to another `Source` type.
    ///
    /// [`Registry::register`]: crate::Registry::register
    fn register(&self, registry: &Registry, token: Token, interests: Interests) -> io::Result<()>;

    /// Re-register `self` with the given `Registry` instance.
    ///
    /// This function should not be called directly. Use
    /// [`Registry::reregister`] instead. Implementors should handle
    /// re-registration by either delegating the call to another `Source` type.
    ///
    /// [`Registry::reregister`]: crate::Registry::reregister
    fn reregister(&self, registry: &Registry, token: Token, interests: Interests)
        -> io::Result<()>;

    /// Deregister `self` from the given `Registry` instance.
    ///
    /// This function should not be called directly. Use
    /// [`Registry::deregister`] instead. Implementors should handle
    /// deregistration by delegating the call to another `Source` type.
    ///
    /// [`Registry::deregister`]: crate::Registry::deregister
    fn deregister(&self, registry: &Registry) -> io::Result<()>;
}

Three methods: register, re-register, and deregister.

In the example the source passed in is &server, a TcpListener:

impl event::Source for TcpListener {
    fn register(&self, registry: &Registry, token: Token, interests: Interests) -> io::Result<()> {
        #[cfg(debug_assertions)]
        self.selector_id.associate_selector(registry)?;
        self.sys.register(registry, token, interests)
    }

    fn reregister(
        &self,
        registry: &Registry,
        token: Token,
        interests: Interests,
    ) -> io::Result<()> {
        self.sys.reregister(registry, token, interests)
    }

    fn deregister(&self, registry: &Registry) -> io::Result<()> {
        self.sys.deregister(registry)
    }
}

register() does two things:

self.selector_id.associate_selector(registry)?;
self.sys.register(registry, token, interests)

Start with the first.

impl SelectorId {
    pub fn new() -> SelectorId {
        SelectorId {
            id: AtomicUsize::new(0),
        }
    }

    pub fn associate_selector(&self, registry: &Registry) -> io::Result<()> {
        let selector_id = self.id.load(Ordering::SeqCst);

        if selector_id != 0 && selector_id != registry.selector.id() {
            Err(io::Error::new(
                io::ErrorKind::Other,
                "socket already registered",
            ))
        } else {
            self.id.store(registry.selector.id(), Ordering::SeqCst);
            Ok(())
        }
    }
}

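This stores registry.selector.id() in SelectorId.id (an AtomicUsize), and returns an error if the source was already associated with a different selector. Note that both the selector_id field and this check exist only in debug builds (#[cfg(debug_assertions)]).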
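The behaviour of this guard is easy to try out in isolation. The sketch below copies the logic but replaces the Registry argument with a plain usize, so the associate method and its selector_id parameter are assumptions made for illustration, not mio's signature.

```rust
use std::io;
use std::sync::atomic::{AtomicUsize, Ordering};

/// Remembers which selector a source was first registered with.
pub struct SelectorId {
    id: AtomicUsize, // 0 means "not associated with any selector yet"
}

impl SelectorId {
    pub fn new() -> SelectorId {
        SelectorId {
            id: AtomicUsize::new(0),
        }
    }

    /// `selector_id` stands in for `registry.selector.id()` (hypothetical parameter).
    pub fn associate(&self, selector_id: usize) -> io::Result<()> {
        let current = self.id.load(Ordering::SeqCst);

        if current != 0 && current != selector_id {
            // Already tied to a different selector: refuse.
            Err(io::Error::new(
                io::ErrorKind::Other,
                "socket already registered",
            ))
        } else {
            // First registration, or the same selector again.
            self.id.store(selector_id, Ordering::SeqCst);
            Ok(())
        }
    }
}
```

Associating twice with the same id succeeds, while a second, different id is rejected. Since NEXT_ID starts at 1, the value 0 is free to mean "unset".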

Now the second. First, the type of sys:

pub struct TcpListener {
    sys: sys::TcpListener,
    #[cfg(debug_assertions)]
    selector_id: SelectorId,
}

The register method of sys::TcpListener:

    fn register(&self, registry: &Registry, token: Token, interests: Interests) -> io::Result<()> {
        SourceFd(&self.as_raw_fd()).register(registry, token, interests)
    }

SourceFd()

First look at the argument passed to SourceFd(), &self.as_raw_fd():

impl AsRawFd for TcpListener {
    fn as_raw_fd(&self) -> RawFd {
        self.inner.as_raw_fd()
    }
}

Here inner is of type net::TcpListener. Following it into the implementation of std::net::TcpListener:

#[stable(feature = "rust1", since = "1.0.0")]
impl AsRawFd for net::TcpListener {
    fn as_raw_fd(&self) -> RawFd { *self.as_inner().socket().as_inner() }
}
impl AsInner<net_imp::TcpListener> for TcpListener {
    fn as_inner(&self) -> &net_imp::TcpListener { &self.0 }
}

Note the alias: use crate::sys_common::net as net_imp;

To summarize the TcpListener layering: mio's TcpListener --> sys::TcpListener --> net::TcpListener (std) --> sys_common::net::TcpListener

The socket() method of sys_common::net::TcpListener:

    pub fn socket(&self) -> &Socket { &self.inner }

The definition of Socket can be found in the standard library under /src/libstd/sys/unix/net.rs:

pub struct Socket(FileDesc);
impl AsInner<c_int> for Socket {
    fn as_inner(&self) -> &c_int { self.0.as_inner() }
}
pub struct FileDesc {
    fd: c_int,
}
impl AsInner<c_int> for FileDesc {
    fn as_inner(&self) -> &c_int { &self.fd }
}

Back to SourceFd(&self.as_raw_fd()).register(registry, token, interests):

/// Adapter for [`RawFd`] providing an [`event::Source`] implementation.
///
/// `SourceFd` enables registering any type with an FD with [`Poll`].
///
/// While only implementations for TCP and UDP are provided, Mio supports
/// registering any FD that can be registered with the underlying OS selector.
/// `SourceFd` provides the necessary bridge.
/// ...

pub struct SourceFd<'a>(pub &'a RawFd);

impl<'a> event::Source for SourceFd<'a> {
    fn register(&self, registry: &Registry, token: Token, interests: Interests) -> io::Result<()> {
        poll::selector(registry).register(*self.0, token, interests)
    }

    fn reregister(
        &self,
        registry: &Registry,
        token: Token,
        interests: Interests,
    ) -> io::Result<()> {
        poll::selector(registry).reregister(*self.0, token, interests)
    }

    fn deregister(&self, registry: &Registry) -> io::Result<()> {
        poll::selector(registry).deregister(*self.0)
    }
}

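SourceFd also implements the event::Source trait. As the doc comment explains, SourceFd is a bridge that makes it possible to register a raw file descriptor with the system selector.

Why the detour? Because sys::Selector works on file descriptors: its register method takes a RawFd.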
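The delegation chain can be modelled in a few lines. In this sketch Registry is a hypothetical stand-in that only records fd -> token in a map (no epoll involved), Token is a plain usize, and Listener is an invented type playing the role of sys::TcpListener.

```rust
use std::cell::RefCell;
use std::collections::HashMap;

pub type RawFd = i32;
pub type Token = usize;

/// Hypothetical stand-in for the OS selector: it only records fd -> token.
pub struct Registry {
    registered: RefCell<HashMap<RawFd, Token>>,
}

impl Registry {
    pub fn new() -> Registry {
        Registry {
            registered: RefCell::new(HashMap::new()),
        }
    }

    /// Looks up the token an fd was registered with, if any.
    pub fn token_for(&self, fd: RawFd) -> Option<Token> {
        self.registered.borrow().get(&fd).copied()
    }
}

pub trait Source {
    fn register(&self, registry: &Registry, token: Token);
}

/// The bridge: anything that can produce an fd can be registered through it.
pub struct SourceFd<'a>(pub &'a RawFd);

impl<'a> Source for SourceFd<'a> {
    fn register(&self, registry: &Registry, token: Token) {
        registry.registered.borrow_mut().insert(*self.0, token);
    }
}

/// Invented type in the role of sys::TcpListener: it owns an fd and delegates.
pub struct Listener {
    pub fd: RawFd,
}

impl Source for Listener {
    fn register(&self, registry: &Registry, token: Token) {
        SourceFd(&self.fd).register(registry, token)
    }
}
```

Every higher-level type only has to expose its fd; the single SourceFd implementation is the one place that talks to the selector.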

The definition of poll::selector():

// ===== Accessors for internal usage =====

pub fn selector(registry: &Registry) -> &sys::Selector {
    &registry.selector
}

From the earlier code we know that &registry.selector is a sys::Selector. Following its register method:

    pub fn register(&self, fd: RawFd, token: Token, interests: Interests) -> io::Result<()> {
        let mut event = libc::epoll_event {
            events: interests_to_epoll(interests),
            u64: usize::from(token) as u64,
        };

        syscall!(epoll_ctl(self.ep, libc::EPOLL_CTL_ADD, fd, &mut event)).map(|_| ())
    }

This ends up calling the Linux system call:

       int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

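Its arguments:

epfd is self.ep, the int returned by epoll_create1(int flags) in sys::Selector::new(): the file descriptor of the epoll instance.

op is libc::EPOLL_CTL_ADD, meaning this operation adds an fd to the interest list.

fd is the RawFd held by the SourceFd analyzed above.

event is &mut event, a pointer to a libc::epoll_event struct: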

pub struct epoll_event {
    pub events: uint32_t,
    pub u64: uint64_t,
}

The corresponding Linux definitions:

          typedef union epoll_data {
               void        *ptr;
               int          fd;
               uint32_t     u32;
               uint64_t     u64;
           } epoll_data_t;

           struct epoll_event {
               uint32_t     events;      /* Epoll events */
               epoll_data_t data;        /* User data variable */
           };

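The bits of the uint32_t events field encode the event types, and epoll_data_t carries a user-defined value, which here is the Token.

Now let's see which interests get registered with the system: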

fn interests_to_epoll(interests: Interests) -> u32 {
    let mut kind = EPOLLET;

    if interests.is_readable() {
        kind = kind | EPOLLIN | EPOLLRDHUP;
    }

    if interests.is_writable() {
        kind |= EPOLLOUT;
    }

    kind as u32
}

EPOLLET selects edge-triggered mode.

By default epoll is level-triggered. For a detailed description of the two modes, see http://man7.org/linux/man-pages/man7/epoll.7.html

In the example, the interests argument for the listener is Interests::READABLE, so this branch is taken:

    if interests.is_readable() {
        kind = kind | EPOLLIN | EPOLLRDHUP;
    }

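This adds the EPOLLIN | EPOLLRDHUP events.

EPOLLIN means the file is readable. EPOLLRDHUP means the peer of a stream socket closed the connection or shut down its writing half (TCP is full duplex, so either side can close just one direction).

So besides data becoming readable, the peer closing the connection also triggers a readiness event.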
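The resulting bit masks can be checked with a small stand-alone sketch. The constants below are the Linux values of the epoll flags, copied in by hand so that no libc crate is needed, and the Interests struct with two bool fields is a simplification invented for this illustration.

```rust
// Linux values of the epoll flags, written out as plain constants.
const EPOLLIN: u32 = 0x001;
const EPOLLOUT: u32 = 0x004;
const EPOLLRDHUP: u32 = 0x2000;
const EPOLLET: u32 = 1 << 31;

/// Simplified stand-in for mio's `Interests` (hypothetical layout).
#[derive(Clone, Copy)]
pub struct Interests {
    pub readable: bool,
    pub writable: bool,
}

pub fn interests_to_epoll(interests: Interests) -> u32 {
    // Edge-triggered mode is always requested.
    let mut kind = EPOLLET;

    if interests.readable {
        // Readable interest also asks for "peer closed" notifications.
        kind |= EPOLLIN | EPOLLRDHUP;
    }

    if interests.writable {
        kind |= EPOLLOUT;
    }

    kind
}
```

For the listener (readable only) this yields 0x80002001; for an accepted connection registered as readable and writable it yields 0x80002005.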

After all this tracing, we finally see the socket behind TcpListener being added to the epoll interest list.

poll.poll()

Now we reach the core method:

poll.poll(&mut events, None)?;

    /// Wait for readiness events
    ///
    /// Blocks the current thread and waits for readiness events for any of the
    /// [`event::Source`]s that have been registered with this `Poll` instance.
    /// The function will block until either at least one readiness event has
    /// been received or `timeout` has elapsed. A `timeout` of `None` means that
    /// `poll` will block until a readiness event has been received.
    ///
    pub fn poll(&mut self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()> {
        self.registry.selector.select(events.sys(), timeout)
    }

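As the doc comment says, the method blocks the current thread while no readiness event is available. The timeout argument here is None, so it blocks until an event arrives. Each call returns one batch of events, which is why the example wraps it in loop {}.

The real work happens in select():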

    pub fn select(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()> {
        let timeout = timeout
            .map(|to| cmp::min(to.as_millis(), libc::c_int::max_value() as u128) as libc::c_int)
            .unwrap_or(-1);

        events.clear();
        syscall!(epoll_wait(
            self.ep,
            events.as_mut_ptr(),
            events.capacity() as i32,
            timeout,
        ))
        .map(|n_events| {
            // This is safe because `epoll_wait` ensures that `n_events` are
            // assigned.
            unsafe { events.set_len(n_events as usize) };
        })
    }

It calls events.clear() and then the system's epoll_wait. When events are ready, epoll_wait returns their count and writes them into events. The signature of epoll_wait:

int epoll_wait(int epfd, struct epoll_event *events,
                      int maxevents, int timeout);
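The timeout conversion at the top of select() is worth isolating: epoll_wait expects milliseconds as a C int, with -1 meaning "wait forever". A minimal sketch of that conversion, using i32 in place of libc::c_int (an assumption that holds on common platforms) and a helper name, timeout_to_millis, invented here:

```rust
use std::cmp;
use std::time::Duration;

/// Converts an optional timeout into the millisecond argument of epoll_wait.
pub fn timeout_to_millis(timeout: Option<Duration>) -> i32 {
    timeout
        // Clamp to the largest value a C int can hold before narrowing.
        .map(|to| cmp::min(to.as_millis(), i32::MAX as u128) as i32)
        // No timeout at all: -1 tells epoll_wait to block indefinitely.
        .unwrap_or(-1)
}
```

Note that as_millis() truncates, so a timeout shorter than one millisecond becomes 0 in this conversion, which makes epoll_wait return immediately.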

event consume

Back in the example: when a client connects to the server, events gets filled in.

        for event in events.iter() {
            match event.token() {
                SERVER => {
                    ...
                }
                token => {
                    ...
                }
            }
        }

Matching on event.token() takes us into the SERVER arm (SERVER is the token the listener was registered with). Its logic:

                    let (connection, address) = server.accept()?;
                    println!("Accepted connection from: {}", address);

                    let token = next(&mut unique_token);
                    poll.registry().register(
                        &connection,
                        token,
                        Interests::READABLE.add(Interests::WRITABLE),
                    )?;

                    connections.insert(token, connection);

  1. Call server.accept() to get the connection and the client's address.
  2. Register the new connection with poll.
  3. Insert the connection into the HashMap for later use.

One at a time. First, TcpListener's accept():

    pub fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> {
        self.inner.accept().and_then(|(inner, addr)| {
            inner
                .set_nonblocking(true)
                .map(|()| (TcpStream::new(inner), addr))
        })
    }

Following into the accept() method of its inner (net::TcpListener):

    #[stable(feature = "rust1", since = "1.0.0")]
    pub fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> {
        // On WASM, `TcpStream` is uninhabited (as it's unsupported) and so
        // the `a` variable here is technically unused.
        #[cfg_attr(target_arch = "wasm32", allow(unused_variables))]
        self.0.accept().map(|(a, b)| (TcpStream(a), b))
    }

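As before, this leads into sys_common::net::TcpListener, so we won't trace it again. Note that mio sets the accepted stream to non-blocking before wrapping it in its own TcpStream.

Next a new token is created and the connection is registered with the Poll. Note the arguments to register(): the event::Source is &connection, a TcpStream.

The Interests value is Interests::READABLE.add(Interests::WRITABLE): both readable and writable readiness. Here is TcpStream's implementation of event::Source: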

impl event::Source for TcpStream {
    fn register(&self, registry: &Registry, token: Token, interests: Interests) -> io::Result<()> {
        #[cfg(debug_assertions)]
        self.selector_id.associate_selector(registry)?;
        self.sys.register(registry, token, interests)
    }

    fn reregister(
        &self,
        registry: &Registry,
        token: Token,
        interests: Interests,
    ) -> io::Result<()> {
        self.sys.reregister(registry, token, interests)
    }

    fn deregister(&self, registry: &Registry) -> io::Result<()> {
        self.sys.deregister(registry)
    }
}

register() sets self.selector_id (in debug builds) and registers with the selector through self.sys.register().

pub struct TcpStream {
    sys: sys::TcpStream,
    #[cfg(debug_assertions)]
    selector_id: SelectorId,
}

Following into sys::TcpStream::register():

impl event::Source for TcpStream {
    fn register(&self, registry: &Registry, token: Token, interests: Interests) -> io::Result<()> {
        SourceFd(&self.as_raw_fd()).register(registry, token, interests)
    }

    fn reregister(
        &self,
        registry: &Registry,
        token: Token,
        interests: Interests,
    ) -> io::Result<()> {
        SourceFd(&self.as_raw_fd()).reregister(registry, token, interests)
    }

    fn deregister(&self, registry: &Registry) -> io::Result<()> {
        SourceFd(&self.as_raw_fd()).deregister(registry)
    }
}

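This goes through SourceFd(), which was analyzed above.

Inserting the connection into the HashMap for later use needs no explanation.

With the TcpStream registered with the Poll, readiness events start to arrive for it: writable readiness typically as soon as the connection is established, readable readiness when data arrives or the peer closes. event.token() routes them to the code below: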

                token => {
                    // (maybe) received an event for a TCP connection.
                    let done = if let Some(connection) = connections.get_mut(&token) {
                        handle_connection_event(&mut poll, connection, event)?
                    } else {
                        // Sporadic events happen.
                        false
                    };
                    if done {
                        connections.remove(&token);
                    }
                }

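if let Some(connection) = connections.get_mut(&token) looks up the saved connection in the HashMap by &token and then calls handle_connection_event(), which checks event.is_writable() and event.is_readable() and handles each. Which event arrives first for a given TcpStream? That is not defined, and the two if blocks do not depend on each other's order.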

/// Returns `true` if the connection is done.
fn handle_connection_event(
    poll: &mut Poll,
    connection: &mut TcpStream,
    event: &Event,
) -> io::Result<bool> {
    if event.is_writable() {
      ...
    }

    if event.is_readable() {
      ...
    }

    Ok(false)
}

Start with if event.is_writable():

    if event.is_writable() {
        // We can (maybe) write to the connection.
        match connection.write(DATA) {
            // We want to write the entire `DATA` buffer in a single go. If we
            // write less we'll return a short write error (same as
            // `io::Write::write_all` does).
            Ok(n) if n < DATA.len() => return Err(io::ErrorKind::WriteZero.into()),
            Ok(_) => {
                // After we've written something we'll reregister the connection
                // to only respond to readable events.
                poll.registry()
                    .reregister(&connection, event.token(), Interests::READABLE)?
            }
            // Would block "errors" are the OS's way of saying that the
            // connection is not actually ready to perform this I/O operation.
            Err(ref err) if would_block(err) => {}
            // Got interrupted (how rude!), we'll try again.
            Err(ref err) if interrupted(err) => {
                return handle_connection_event(poll, connection, event)
            }
            // Other errors we'll consider fatal.
            Err(err) => return Err(err),
        }
    }

When the connection is writable, the code immediately calls connection.write(DATA), and the client receives "Hello world!\n". The signature of write:

fn write(&mut self, buf: &[u8]) -> io::Result<usize>;

If fewer than DATA.len() bytes are written, it returns an Err (WriteZero).

If the write succeeds, the connection is re-registered with only Interests::READABLE.

                // After we've written something we'll reregister the connection
                // to only respond to readable events.
                poll.registry()
                    .reregister(&connection, event.token(), Interests::READABLE)?

If the error is WouldBlock, nothing is done.

If the error is Interrupted, handle_connection_event() is called recursively to retry.
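The four outcomes above can be exercised without a socket. The sketch below is generic over io::Write and, unlike the example, retries an interrupted write with a loop instead of recursion. WriteOutcome, write_whole and ScriptedWriter are names invented for this illustration; ScriptedWriter is a test double, not part of mio.

```rust
use std::io::{self, Write};

/// Outcome of one attempt to send the whole buffer.
#[derive(Debug, PartialEq)]
pub enum WriteOutcome {
    Written,
    NotReady,
}

pub fn write_whole<W: Write>(conn: &mut W, data: &[u8]) -> io::Result<WriteOutcome> {
    loop {
        match conn.write(data) {
            // A short write is treated as an error, as in the example.
            Ok(n) if n < data.len() => return Err(io::ErrorKind::WriteZero.into()),
            Ok(_) => return Ok(WriteOutcome::Written),
            // Not actually ready: wait for the next writable event.
            Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
                return Ok(WriteOutcome::NotReady)
            }
            // Interrupted: simply try again.
            Err(ref err) if err.kind() == io::ErrorKind::Interrupted => continue,
            Err(err) => return Err(err),
        }
    }
}

/// Test double: fails with the scripted error kinds first, then accepts
/// at most `limit` bytes per call.
pub struct ScriptedWriter {
    pub errors: Vec<io::ErrorKind>,
    pub limit: usize,
    pub sink: Vec<u8>,
}

impl Write for ScriptedWriter {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        if !self.errors.is_empty() {
            return Err(self.errors.remove(0).into());
        }
        let n = buf.len().min(self.limit);
        self.sink.extend_from_slice(&buf[..n]);
        Ok(n)
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}
```

A caller would re-register for readable events only after WriteOutcome::Written, and leave the registration untouched after NotReady.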

Next, if event.is_readable():

    if event.is_readable() {
        let mut connection_closed = false;
        let mut received_data = Vec::with_capacity(4096);
        // We can (maybe) read from the connection.
        loop {
            let mut buf = [0; 256];
            match connection.read(&mut buf) {
                Ok(0) => {
                    // Reading 0 bytes means the other side has closed the
                    // connection or is done writing, then so are we.
                    connection_closed = true;
                    break;
                }
                Ok(n) => received_data.extend_from_slice(&buf[..n]),
                // Would block "errors" are the OS's way of saying that the
                // connection is not actually ready to perform this I/O operation.
                Err(ref err) if would_block(err) => break,
                Err(ref err) if interrupted(err) => continue,
                // Other errors we'll consider fatal.
                Err(err) => return Err(err),
            }
        }

        if let Ok(str_buf) = from_utf8(&received_data) {
            println!("Received data: {}", str_buf.trim_end());
        } else {
            println!("Received (none UTF-8) data: {:?}", &received_data);
        }

        if connection_closed {
            println!("Connection closed");
            return Ok(true);
        }
    }

There is a loop {} around connection.read(&mut buf), whose signature is:

    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        (&self.sys).read(buf)
    }

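If it returns 0, the peer has closed the connection (or at least finished writing), so connection_closed = true; is set and the loop ends.

If it returns n, n bytes from the client were read and are appended to received_data.

If the error is WouldBlock, break out of the loop.

If the error is Interrupted, continue the loop.

The data read is then printed. If connection_closed is true the function returns Ok(true); otherwise it falls through to the final Ok(false).

In most cases the loop ends through the WouldBlock branch: the socket was readable, the available bytes were consumed, and the client has sent nothing more. Because the registration is edge-triggered, reading until WouldBlock matters; data left unread would not produce another event until more data arrives.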
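The read loop can be tried out with a scripted reader instead of a real socket. In the sketch below, drain mirrors the loop from the example, while Step and ScriptedReader are test doubles invented for this illustration (a chunk longer than the caller's buffer is simply truncated).

```rust
use std::collections::VecDeque;
use std::io::{self, Read};

/// Reads until the source reports `WouldBlock` or end-of-stream.
/// Returns the bytes read and whether the peer closed the connection.
pub fn drain<R: Read>(conn: &mut R) -> io::Result<(Vec<u8>, bool)> {
    let mut received = Vec::new();
    let mut buf = [0u8; 256];
    loop {
        match conn.read(&mut buf) {
            // Zero bytes: the other side closed or is done writing.
            Ok(0) => return Ok((received, true)),
            Ok(n) => received.extend_from_slice(&buf[..n]),
            // Nothing more to read right now: stop and wait for the next event.
            Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
                return Ok((received, false))
            }
            Err(ref err) if err.kind() == io::ErrorKind::Interrupted => continue,
            Err(err) => return Err(err),
        }
    }
}

/// One scripted result of a `read` call.
pub enum Step {
    Data(&'static [u8]),
    Fail(io::ErrorKind),
}

/// Test double: replays the scripted steps, then reports end-of-stream.
pub struct ScriptedReader(pub VecDeque<Step>);

impl Read for ScriptedReader {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        match self.0.pop_front() {
            Some(Step::Data(d)) => {
                let n = d.len().min(buf.len());
                buf[..n].copy_from_slice(&d[..n]);
                Ok(n)
            }
            Some(Step::Fail(kind)) => Err(kind.into()),
            None => Ok(0),
        }
    }
}
```

A script of data, Interrupted, data, WouldBlock yields all the bytes with the closed flag false; a script that simply runs out yields the bytes with the flag true.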

Back in the caller:

                    
                    let done = ... {
                        handle_connection_event(&mut poll, connection, event)?
                    } else {
                        // Sporadic events happen.
                        false
                    };
                    if done {
                        connections.remove(&token);
                    }

If the call returned Ok(true), connections.remove(&token); removes the connection from the map; dropping the TcpStream closes the socket.
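The token bookkeeping around this (allocate a token per connection, look it up on each event, remove it when done) can be sketched without any I/O. Token here is a local newtype, and FakeConnection and dispatch are hypothetical names standing in for TcpStream and the token arm of the match.

```rust
use std::collections::HashMap;

#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct Token(pub usize);

pub const SERVER: Token = Token(0);

/// Same idea as the example's `next`: hand out the current value, then bump it.
pub fn next(current: &mut Token) -> Token {
    let token = Token(current.0);
    current.0 += 1;
    token
}

/// Stand-in for a connection: it is "done" after a fixed number of events.
pub struct FakeConnection {
    pub events_left: u32,
}

impl FakeConnection {
    /// Returns `true` once the connection is finished.
    fn handle_event(&mut self) -> bool {
        self.events_left = self.events_left.saturating_sub(1);
        self.events_left == 0
    }
}

/// Mirrors the `token => { ... }` arm of the example's match.
pub fn dispatch(connections: &mut HashMap<Token, FakeConnection>, token: Token) {
    let done = if let Some(connection) = connections.get_mut(&token) {
        connection.handle_event()
    } else {
        // Unknown token: a sporadic event, ignore it.
        false
    };
    if done {
        connections.remove(&token);
    }
}
```

Starting the counter at SERVER.0 + 1 keeps connection tokens from ever colliding with the listener's token.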

Waker

One piece not covered above deserves a mention: /src/waker.rs. Waker allows a Poll to be woken up from another thread.

#[derive(Debug)]
pub struct Waker {
    inner: sys::Waker,
}

impl Waker {
    /// Create a new `Waker`.
    pub fn new(registry: &Registry, token: Token) -> io::Result<Waker> {
        sys::Waker::new(poll::selector(&registry), token).map(|inner| Waker { inner })
    }

    /// Wake up the [`Poll`] associated with this `Waker`.
    ///
    /// [`Poll`]: crate::Poll
    pub fn wake(&self) -> io::Result<()> {
        self.inner.wake()
    }
}

The definition is simple, just two methods. Here is the Linux implementation:

#[cfg(any(target_os = "linux", target_os = "android"))]
mod eventfd {
   use crate::sys::Selector;
   use crate::{Interests, Token};

   use std::fs::File;
   use std::io::{self, Read, Write};
   use std::os::unix::io::FromRawFd;

   /// Waker backed by `eventfd`.
   ///
   /// `eventfd` is effectively an 64 bit counter. All writes must be of 8
   /// bytes (64 bits) and are converted (native endian) into an 64 bit
   /// unsigned integer and added to the count. Reads must also be 8 bytes and
   /// reset the count to 0, returning the count.
   #[derive(Debug)]
   pub struct Waker {
       fd: File,
   }

   impl Waker {
       pub fn new(selector: &Selector, token: Token) -> io::Result<Waker> {
           syscall!(eventfd(0, libc::EFD_CLOEXEC | libc::EFD_NONBLOCK)).and_then(|fd| {
               // Turn the file descriptor into a file first so we're ensured
               // it's closed when dropped, e.g. when register below fails.
               let file = unsafe { File::from_raw_fd(fd) };
               selector
                   .register(fd, token, Interests::READABLE)
                   .map(|()| Waker { fd: file })
           })
       }

       pub fn wake(&self) -> io::Result<()> {
           let buf: [u8; 8] = 1u64.to_ne_bytes();
           match (&self.fd).write(&buf) {
               Ok(_) => Ok(()),
               Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
                   // Writing only blocks if the counter is going to overflow.
                   // So we'll reset the counter to 0 and wake it again.
                   self.reset()?;
                   self.wake()
               }
               Err(err) => Err(err),
           }
       }

       /// Reset the eventfd object, only need to call this if `wake` fails.
       fn reset(&self) -> io::Result<()> {
           let mut buf: [u8; 8] = 0u64.to_ne_bytes();
           match (&self.fd).read(&mut buf) {
               Ok(_) => Ok(()),
               // If the `Waker` hasn't been awoken yet this will return a
               // `WouldBlock` error which we can safely ignore.
               Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => Ok(()),
               Err(err) => Err(err),
           }
       }
   }
}

#[cfg(any(target_os = "linux", target_os = "android"))]
pub use self::eventfd::Waker;

As you can see, Waker's new() calls the eventfd() system call:

 int eventfd(unsigned int initval, int flags);

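eventfd() creates a file descriptor for event notification. It can be used for notification between user-space applications, and by the kernel to notify user-space applications. The "eventfd object" behind it is an unsigned 64-bit counter maintained by the kernel.

initval is the counter's initial value. flags may include:

EFD_CLOEXEC sets the close-on-exec flag, so the descriptor is closed automatically when the process calls exec (it is still inherited across fork).

EFD_NONBLOCK sets O_NONBLOCK on the new descriptor. Without it, read blocks while the counter is zero.

EFD_SEMAPHORE gives read semaphore-like semantics.

Operations on the "eventfd object" are simple: write() adds the written value to the counter, and read() returns the counter and resets it to zero (in semaphore mode, it returns 1 and decrements the counter by 1).

In the source above, eventfd() creates the fd, which is then registered with sys::Selector (that is, epoll) with an interest in readable readiness.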

                selector
                    .register(fd, token, Interests::READABLE)
                    .map(|()| Waker { fd: file })

Next, its wake():

        pub fn wake(&self) -> io::Result<()> {
            let buf: [u8; 8] = 1u64.to_ne_bytes();
            match (&self.fd).write(&buf) {
                Ok(_) => Ok(()),
                Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
                    // Writing only blocks if the counter is going to overflow.
                    // So we'll reset the counter to 0 and wake it again.
                    self.reset()?;
                    self.wake()
                }
                Err(err) => Err(err),
            }
        }

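It simply writes a 64-bit 1, which is added to the current counter.

write(&buf) normally succeeds. But if the addition would push the counter past its maximum value 0xfffffffffffffffe, the write blocks until a read happens, or, if the descriptor is non-blocking (the source above sets libc::EFD_NONBLOCK), fails with EAGAIN.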
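The overflow path is hard to hit with a real eventfd, so here is a pure-Rust model of a non-blocking, non-semaphore eventfd counter together with the same wake/reset logic. ModelEventFd is a hypothetical type written for this illustration; it makes no system calls and only imitates the semantics described above.

```rust
use std::cell::Cell;
use std::io;

/// Largest value the counter may hold.
const MAX: u64 = 0xffff_ffff_ffff_fffe;

/// In-memory model of a non-blocking eventfd without EFD_SEMAPHORE.
pub struct ModelEventFd {
    counter: Cell<u64>,
}

impl ModelEventFd {
    pub fn new(initval: u64) -> ModelEventFd {
        ModelEventFd { counter: Cell::new(initval) }
    }

    /// Adds `value` to the counter, or fails with WouldBlock on overflow.
    pub fn write(&self, value: u64) -> io::Result<()> {
        match self.counter.get().checked_add(value) {
            Some(sum) if sum <= MAX => {
                self.counter.set(sum);
                Ok(())
            }
            _ => Err(io::ErrorKind::WouldBlock.into()),
        }
    }

    /// Returns the counter and resets it; WouldBlock if it is zero.
    pub fn read(&self) -> io::Result<u64> {
        let value = self.counter.get();
        if value == 0 {
            Err(io::ErrorKind::WouldBlock.into())
        } else {
            self.counter.set(0);
            Ok(value)
        }
    }

    /// Same structure as the Waker: on overflow, reset and try again.
    pub fn wake(&self) -> io::Result<()> {
        match self.write(1) {
            Ok(()) => Ok(()),
            Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
                self.reset()?;
                self.wake()
            }
            Err(err) => Err(err),
        }
    }

    fn reset(&self) -> io::Result<()> {
        match self.read() {
            Ok(_) => Ok(()),
            // Counter already zero: nothing to reset.
            Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => Ok(()),
            Err(err) => Err(err),
        }
    }
}
```

Starting the model at the maximum value shows the recovery path: the first write fails, reset drains the counter, and the retried write leaves it at 1.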

The code only handles io::ErrorKind::WouldBlock. Is that a problem?

No. The Linux errno documentation, following POSIX, states:

All the error names specified by POSIX.1 must have distinct values, with the exception of EAGAIN and EWOULDBLOCK, which may be the same. On Linux, these two have the same value on all architectures.

They are the same error value, and Rust defines only io::ErrorKind::WouldBlock to represent both EWOULDBLOCK and EAGAIN, which can be confusing in some situations.

On FreeBSD and macOS, Waker is implemented with kqueue() and kevent().

On OpenBSD and Solaris, Waker is implemented with a pipe.

On Windows it is implemented with IOCP (I/O Completion Port).

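Conclusion

This article gave an overview of mio's core flow. Because it focuses on the core flow, many details were left out.

mio's code is small and compact overall, mostly a thin wrapper over the underlying operating system. That matches its stated goal:

with a focus on adding as little overhead as possible

The core logic revolves around sys::Selector and event::Source. Poll acts as a facade (Facade pattern).

UDP is not covered here; readers can analyze it themselves. The code is in /src/net/udp.rs.

The Windows-specific code is in /src/sys/windows.

With mio understood, the groundwork is laid for reading the tokio source code.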