Tokio Source Code Analysis

2022-12-10 rust language

Runtime

To use Tokio you must provide an async runtime environment; spawned tasks run inside it. The runtime can be single-threaded or multi-threaded, and both flavors can coexist in one process. For IO-bound tasks the multi-threaded runtime is recommended, but cross-thread communication becomes more complex and adds thread-switching overhead, so performance can actually degrade in some scenarios.
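
For instance, both flavors can be built explicitly and coexist in one process; a minimal sketch using the public Builder API (the worker_threads(4) value is arbitrary):

use tokio::runtime::Builder;

fn main() {
    // Multi-threaded runtime: tasks may migrate across worker threads.
    let mt = Builder::new_multi_thread()
        .worker_threads(4)
        .enable_all()
        .build()
        .unwrap();

    // Single-threaded runtime: everything runs on the thread calling block_on().
    let ct = Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();

    mt.block_on(async { println!("on the multi-thread runtime") });
    ct.block_on(async { println!("on the current-thread runtime") });
}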

The user-space coroutine implementation is based on a cooperative scheduling strategy; see Making the Tokio scheduler 10x faster for details.

Internally, the current Handle implementation is obtained through scheduler::Handle::current(); the relevant call path is shown below.

// runtime/scheduler/mod.rs
use crate::runtime::context;
impl Handle {
    pub(crate) fn current() -> Handle {
        match context::with_current(Clone::clone) {
            Ok(handle) => handle,
            Err(e) => panic!("{}", e),
        }
    }
}

// runtime/context/current.rs
pub(crate) fn with_current<F, R>(f: F) -> Result<R, TryCurrentError>
where F: FnOnce(&scheduler::Handle) -> R,
{
    match CONTEXT.try_with(|ctx| ctx.current.handle.borrow().as_ref().map(f)) {
        Ok(Some(ret)) => Ok(ret),
        Ok(None) => Err(TryCurrentError::new_no_context()),
        Err(_access_error) => Err(TryCurrentError::new_thread_local_destroyed()),
    }
}

Here CONTEXT is a thread-local (TLS) variable; runtime.enter()[runtime/handle.rs] sets the chosen runtime through context::try_set_current(). The flavors include:

  • CurrentThread executes only on the current thread.
  • MultiThread executes across multiple threads; the type itself is an empty struct used purely as a marker.

Taking MultiThread as the example, part of the build process starting from Builder is traced below.

// runtime/builder.rs
impl Builder {
    pub fn new_multi_thread() -> Builder {
        Builder::new(Kind::MultiThread, 61)
    }

    pub fn enable_all(&mut self) -> &mut Self {
        self.enable_io();
        self.enable_time();
        self
    }

    pub fn build(&mut self) -> io::Result<Runtime> {
        match &self.kind {
            Kind::CurrentThread => self.build_current_thread_runtime(),
            #[cfg(all(feature = "rt-multi-thread", not(target_os = "wasi")))]
            Kind::MultiThread => self.build_threaded_runtime(),
            #[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(target_os = "wasi")))]
            Kind::MultiThreadAlt => self.build_alt_threaded_runtime(),
        }
    }
}
// runtime/runtime.rs  created in build_threaded_runtime()[runtime/builder.rs]
pub struct Runtime {
    scheduler: Scheduler,  // thin enum Scheduler[runtime/runtime.rs] wrapper that distinguishes the runtime flavor
    handle: Handle,  // wraps the object created by MultiThread::new()[multi/mod.rs]
    blocking_pool: BlockingPool,
}
pub(super) enum Scheduler {
    CurrentThread(CurrentThread),
    MultiThread(MultiThread),  // inner value created in build_threaded_runtime() via MultiThread::new()[multi/mod.rs]
    MultiThreadAlt(MultiThreadAlt),
}
// runtime/handle.rs
pub struct Handle {
    pub(crate) inner: scheduler::Handle,
}
// runtime/scheduler/mod.rs
pub(crate) enum Handle {
    CurrentThread(Arc<current_thread::Handle>),
    MultiThread(Arc<multi_thread::Handle>),
    MultiThreadAlt(Arc<multi_thread_alt::Handle>),
    Disabled,
}

As shown above, Runtime goes through several layers of wrapping before reaching the actual MultiThread implementation.

The call flow is shown below; build_threaded_runtime() performs an enter() + launch() pair, which is what actually starts the worker threads.

Builder::build_threaded_runtime()[runtime/builder.rs]
 |-driver::Driver::new()[runtime/driver.rs]
 |-blocking::create_blocking_pool()[runtime/blocking/mod.rs]
 | |-BlockingPool::new()[runtime/blocking/pool.rs]
 |-MultiThread::new()[runtime/scheduler/multi_thread/mod.rs]
 | |-Parker::new()[runtime/scheduler/multi_thread/park.rs]
 | |-worker::create()[runtime/scheduler/multi_thread/worker.rs] creates the per-worker Cores, one per thread
 |   |-queue::local()[runtime/scheduler/multi_thread/queue.rs] creates the local run queue
 |   |-OwnedTasks::new()
 |-scheduler::Handle::MultiThread()[runtime/scheduler/multi_thread/mod.rs] builds a new Handle
 |-Handle::enter()[runtime/handle.rs]
 | |-try_set_current()[runtime/context/current.rs] sets the runtime context
 |-Launch::launch()
 | |-runtime::spawn_blocking() drains all the workers and runs each of them via this function
 |-Runtime::from_parts()[runtime/runtime.rs] assembles the Runtime from the Scheduler, Handle and BlockingPool

OwnedTasks/LocalOwnedTasks::bind()[runtime/task/list.rs]
new_task()[runtime/task/mod.rs] wraps the RawTask, producing the Task, Notified and JoinHandle values
 |-RawTask::new()[runtime/task/raw.rs]
 | |-Cell::new()[runtime/task/core.rs] the Cell holds the three core parts Header, Core and Trailer, plus the poll() implementation
 |-JoinHandle::new()[runtime/task/join.rs]


Core::poll()[runtime/task/core.rs] is what actually polls the Future; state changes are coordinated through the Harness lifecycle. Its signature:

pub(super) fn poll(&self, mut cx: Context<'_>) -> Poll<T::Output>

As shown above, for MultiThread the scheduler::Handle::current() call returns the Handle implemented in runtime/scheduler/multi_thread/handle.rs. Callers usually continue through handle.driver().io(), handle.driver().signal() and similar accessors, which lead back to the Handle implemented in runtime/driver.rs.

// src/runtime/scheduler/mod.rs
#[derive(Debug, Clone)]
pub(crate) enum Handle {
    #[cfg(feature = "rt")]
    CurrentThread(Arc<current_thread::Handle>),

    #[cfg(all(feature = "rt-multi-thread", not(target_os = "wasi")))]
    MultiThread(Arc<multi_thread::Handle>),

    #[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(target_os = "wasi")))]
    MultiThreadAlt(Arc<multi_thread_alt::Handle>),

    #[cfg(not(feature = "rt"))]
    #[allow(dead_code)]
    Disabled,
}

// src/runtime/driver.rs
#[derive(Debug)]
pub(crate) struct Handle {
    /// IO driver handle
    pub(crate) io: IoHandle,

    /// Signal driver handle
    #[cfg_attr(any(not(unix), loom), allow(dead_code))]
    pub(crate) signal: SignalHandle,

    /// Time driver handle
    pub(crate) time: TimeHandle,

    /// Source of `Instant::now()`
    #[cfg_attr(not(all(feature = "time", feature = "test-util")), allow(dead_code))]
    pub(crate) clock: Clock,
}

As mentioned, MultiThread here is an empty struct: it occupies no memory and serves purely as a marker/placeholder.

Runtime shutdown is driven by dropping the Runtime.

Runtime::drop()[runtime/runtime.rs]
 |-MultiThread::shutdown()[runtime/scheduler/multi_thread/mod.rs] dispatched per Scheduler variant
   |-Handle::shutdown()[runtime/scheduler/multi_thread/handle.rs]
     |-Handle::close()[runtime/scheduler/multi_thread/worker.rs]
       |-Shared::close()[runtime/scheduler/inject/shared.rs] marks the inject queue closed
       |-Handle::notify_all()[runtime/scheduler/multi_thread/worker.rs] walks the remotes and unparks every worker

In other words, shutdown happens automatically once the runtime goes out of scope and its drop() runs.

Spawn

As noted above, there are two spawn() entry points for creating tasks: the tokio crate root and the Runtime API. The former is wired up in tokio/src/lib.rs as follows.

pub mod task;
cfg_rt! {
    pub use task::spawn;
}

So the actual implementation lives in src/task/spawn.rs.

task::spawn()[task/spawn.rs] receives the actual Future
 |-task::spawn_inner()
   |-Handle::spawn() runs within the current Context
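
Typical usage of this path, as a trivial sketch:

use tokio::task;

#[tokio::main]
async fn main() {
    // task::spawn hands the future to the current runtime's scheduler Handle.
    let handle = task::spawn(async { 1 + 1 });
    assert_eq!(handle.await.unwrap(), 2);
}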

The latter goes through Runtime; both paths eventually converge. MultiThread is used as the example below.

runtime/task/list.rs
pub(crate) struct OwnedTasks<S: 'static> {
    list: List<S>,
    pub(crate) id: NonZeroU64,
    closed: AtomicBool,
}
type List<S> = sharded_list::ShardedList<Task<S>, <Task<S> as Link>::Target>;

util/sharded_list.rs
pub(crate) struct ShardedList<L, T> {
    lists: Box<[Mutex<LinkedList<L, T>>]>,  // one shard per core; each shard is a linked list
    count: AtomicUsize,
    shard_mask: usize,
}
pub(crate) struct ShardGuard<'a, L, T> {
    lock: MutexGuard<'a, LinkedList<L, T>>,
    count: &'a AtomicUsize,
    id: usize,
}

MultiThread::Handle::spawn()[multi/handle.rs]
 |-MultiThread::Handle::bind_new_task() returns the JoinHandle
   |-OwnedTasks::bind()[runtime/task/list.rs] invoked as me.shared.owned.bind()
   | |-super::new_task()[runtime/task/mod.rs] the actual task creation; returns the Notified object
   | | |-RawTask::new() runtime/task/raw.rs
   | |-OwnedTasks::bind_inner()[runtime/task/list.rs] checks whether the list is already closed
   |   |-ShardedList::lock_shard()[util/sharded_list.rs] returns a ShardGuard
   |   |-ShardGuard::push()
   |-MultiThread::Handle::schedule_option_task_without_yield()[runtime/task/list.rs] schedules the task
     |-MultiThread::Handle::schedule_task()[multi/worker.rs] fetches the Context via with_current and schedules
       |-Handle::schedule_local() schedules onto the current worker's local queue
       |-Handle::push_remote_task() otherwise pushes onto the global task queue
       |-Handle::notify_parked_remote()

Handle::spawn_thread()[runtime/handle.rs] appears to be where the multi-threaded runtime spawns its worker threads.

Park/Unpark

#[derive(Debug)]
pub(crate) struct ParkThread {
    inner: Arc<Inner>,
}

/// Unblocks a thread that was blocked by `ParkThread`.
#[derive(Clone, Debug)]
pub(crate) struct UnparkThread {
    inner: Arc<Inner>,
}

#[derive(Debug)]
struct Inner {
    state: AtomicUsize,
    mutex: Mutex<()>,
    condvar: Condvar,
}

#[derive(Debug)]
pub(crate) struct CachedParkThread {
    _anchor: PhantomData<Rc<()>>,
}

In essence this wraps a Condvar to implement kernel-level blocking and wakeup.
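
A minimal sketch of the same idea using only std (Tokio's real Inner additionally tracks an EMPTY/PARKED/NOTIFIED state in the state: AtomicUsize so the fast path can skip the mutex):

use std::sync::{Arc, Condvar, Mutex};

struct Parker {
    inner: Arc<Inner>,
}

struct Inner {
    notified: Mutex<bool>,
    condvar: Condvar,
}

impl Parker {
    // Block the current thread until someone calls unpark().
    fn park(&self) {
        let mut notified = self.inner.notified.lock().unwrap();
        while !*notified {
            notified = self.inner.condvar.wait(notified).unwrap();
        }
        *notified = false; // consume the notification
    }

    // Wake the parked thread (or make the next park() return immediately).
    fn unpark(&self) {
        *self.inner.notified.lock().unwrap() = true;
        self.inner.condvar.notify_one();
    }
}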

Future

As introduced earlier, a runtime like Tokio needs its own leaf futures underneath. Below is an extremely simple Future that runs on Tokio: once the given deadline has passed it prints a Hello World message.

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};

struct Delay {
    when: Instant,
}

impl Future for Delay {
    type Output = &'static str;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<&'static str>
    {
        if Instant::now() >= self.when {
            println!("Hello World!!!");
            Poll::Ready("done")
        } else {
            // Ignore this line for now.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

#[tokio::main]
async fn main() {
    let when = Instant::now() + Duration::from_millis(1000);
    let future = Delay { when };

    let out = future.await;
    assert_eq!(out, "done");
}

Note that this saturates one CPU core: wake_by_ref() immediately reschedules the task, so poll() spins until the deadline.
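
The busy loop can be avoided by arranging a wakeup only when the deadline passes. The sketch below follows the pattern from the Tokio tutorial; a production version would store and update the waker instead of spawning a helper thread on every poll:

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::thread;
use std::time::Instant;

struct Delay {
    when: Instant,
}

impl Future for Delay {
    type Output = &'static str;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<&'static str> {
        if Instant::now() >= self.when {
            Poll::Ready("done")
        } else {
            // Schedule a single wakeup at the deadline instead of waking immediately.
            let waker = cx.waker().clone();
            let when = self.when;
            thread::spawn(move || {
                let now = Instant::now();
                if now < when {
                    thread::sleep(when - now);
                }
                waker.wake();
            });
            Poll::Pending
        }
    }
}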

LeafFuture

When code calls let mut stream = TcpStream::connect(&addr).await?;, the compiler generates a state machine roughly equivalent to the following (schematic).

let mut stream = {
    let mut future = TcpStream::connect(&addr);
    loop {
        // schematic only: the Pin/Context plumbing is omitted
        match future.poll(cx) {
            Poll::Ready(Ok(s)) => break Ok(s),
            Poll::Ready(Err(e)) => break Err(e),
            Poll::Pending => yield, // hand control back to the executor
        }
    }
}?;

The TcpStream here is one of the leaf futures that Tokio has to implement itself.

IO

// src/net/tcp/stream.rs
pub struct TcpStream {
    io: PollEvented<mio::net::TcpStream>,
}
pub async fn connect<A: ToSocketAddrs>(addr: A) -> io::Result<TcpStream> { /* ... */ }

// src/io/poll_evented.rs
pub(crate) struct PollEvented<E: Source> {
    io: Option<E>,
    registration: Registration,
}

When an IO resource is created, e.g. a TcpListener accepts an fd, that fd gets wrapped into a TcpStream. A TcpStream is really just a PollEvented; many user-facing types (UnixStream and friends) are likewise thin PollEvented wrappers.

PollEvented registers the fd and its interest when created, and deregisters on drop. IO readiness state and its waiters are managed in one unified slab (this is what Registration corresponds to). A PollEvented holds two parts, the io object and the registration; the registration in turn holds the driver Handle and a reference into the slab.

When the PollEvented is created, it fetches the current driver Handle from TLS and registers its Interest with it. During registration the driver allocates a slot in its slab to store the state (the ScheduledIo) and records that slot's index in the returned registration.
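
As a mental model (hypothetical types, not Tokio's actual definitions), the readiness tracking boils down to an atomic readiness word plus a waiter list; the driver sets bits and wakes the stored wakers:

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;
use std::task::Waker;

// Simplified stand-in for ScheduledIo, for illustration only.
struct FakeScheduledIo {
    readiness: AtomicUsize,     // bitmask of ready events (readable, writable, ...)
    waiters: Mutex<Vec<Waker>>, // tasks waiting on this fd
}

impl FakeScheduledIo {
    // Called from the driver when mio reports an event for this slot.
    fn set_readiness(&self, ready: usize) {
        self.readiness.fetch_or(ready, Ordering::SeqCst);
        for waker in self.waiters.lock().unwrap().drain(..) {
            waker.wake();
        }
    }
}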

Sleep

Taking MultiThread as the example again, the concrete Handle implementation can be traced; the relevant code lives in src/time and src/runtime/time.

tokio::time::sleep_until() src/time/sleep.rs
 |-Sleep::new_timeout()
   |-TimerEntry::new() the returned Sleep object wraps this entry

Sleep implements the Future trait.

pub struct Sleep {
    inner: Inner,
    #[pin]
    entry: TimerEntry,
}

impl Future for Sleep {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
        match ready!(self.as_mut().poll_elapsed(cx)) {
            Ok(()) => Poll::Ready(()),
            Err(e) => panic!("timer error: {}", e),
        }
    }
}

The detailed call chain is as follows.

Sleep::poll()
 |-Sleep::poll_elapsed()
   |-TimerEntry::poll_elapsed()
     |-TimerEntry::reset()
       |-TimerShared::extend_expiration()
       |-Handle::register()  src/runtime/time/mod.rs

The time driver itself is created during runtime construction, via create_time_driver()[runtime/driver.rs].

Run Queues

// local run queue implementation
runtime/scheduler/multi_thread/queue.rs
/// Producer handle. May only be used from a single thread.
pub(crate) struct Local<T: 'static> {
    inner: Arc<Inner<T>>,
}
/// Consumer handle. May be used from many threads.
pub(crate) struct Steal<T: 'static>(Arc<Inner<T>>);
pub(crate) struct Inner<T: 'static> {
    head: AtomicUnsignedLong,
    tail: AtomicUnsignedShort,
    buffer: Box<[UnsafeCell<MaybeUninit<task::Notified<T>>>; LOCAL_QUEUE_CAPACITY]>,
}

Local exposes push_back() and related operations.

Handle::schedule_task()[runtime/scheduler/multi_thread/worker.rs]
 |-Handle::schedule_local()

Context::run()[runtime/scheduler/multi_thread/worker.rs] the worker's main loop (see the sketch after this tree)
 |-Core::tick() bumps the tick counter
 |-Context::maintenance() handles periodic maintenance work
 |-Core::next_task()[runtime/scheduler/multi_thread/worker.rs] picks the next task to run, trying the local worker first
 |-Context::run_task() actually runs the task
 | |-Local::push_back_or_overflow()[runtime/scheduler/multi_thread/queue.rs]
 |   |-Local::push_overflow()
 |-Core::steal_work() local queue empty: try stealing from other workers, then the global queue
 | |-Steal::steal_into() performs the steal
 | |-Handle::next_remote_task()[runtime/scheduler/multi_thread/worker.rs] falls back to the global queue, which takes a lock
 |   |-Shared::pop()[runtime/scheduler/inject/shared.rs]
 |     |-Shared::pop_n() pops N tasks (here N = 1), wrapped as a Pop
 |     |-Pop::next()[runtime/scheduler/inject/pop.rs]
 |-Context::run_task() runs whatever was found
 |-Context::park() nothing to run: wait
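
Stripped of details, the loop above amounts to the following paraphrase (hypothetical trait and method names mirroring the annotations; not Tokio's actual code):

// Toy model of the worker loop in runtime/scheduler/multi_thread/worker.rs.
struct Task;

trait WorkerOps {
    fn is_shutdown(&self) -> bool;
    fn tick(&mut self);
    fn maintenance(&mut self);
    fn next_local_task(&mut self) -> Option<Task>;
    fn steal_work(&mut self) -> Option<Task>;
    fn next_remote_task(&mut self) -> Option<Task>;
    fn run_task(&mut self, task: Task);
    fn park(&mut self);
}

fn run(w: &mut impl WorkerOps) {
    while !w.is_shutdown() {
        w.tick();        // Core::tick()
        w.maintenance(); // Context::maintenance()

        // 1) local queue, 2) steal from siblings, 3) global inject queue.
        let mut next = w.next_local_task();
        if next.is_none() {
            next = w.steal_work();
        }
        if next.is_none() {
            next = w.next_remote_task();
        }

        match next {
            Some(task) => w.run_task(task),
            None => w.park(), // nothing runnable: block on the driver
        }
    }
}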

// runtime/task/raw.rs
/// Raw task handle
#[derive(Clone)]
pub(crate) struct RawTask {
    ptr: NonNull<Header>,
}

// where the global task queue's contents actually live
// runtime/scheduler/inject/synced.rs
pub(crate) struct Synced {
    /// True if the queue is closed.
    pub(super) is_closed: bool,

    /// Linked-list head.
    pub(super) head: Option<task::RawTask>,

    /// Linked-list tail.
    pub(super) tail: Option<task::RawTask>,
}

// runtime/scheduler/inject/shared.rs
pub(crate) struct Shared<T: 'static> {
    /// Number of pending tasks in the queue. This helps prevent unnecessary
    /// locking in the hot path.
    pub(super) len: AtomicUsize,
    _p: PhantomData<T>,
}

// state shared across all workers, including the global queue
// runtime/scheduler/multi_thread/worker.rs
pub(crate) struct Shared {
    /// Per-worker remote state. All other workers have access to this and is
    /// how they communicate between each other.
    remotes: Box<[Remote]>,

    /// Global task queue used for:
    ///  1. Submit work to the scheduler while **not** currently on a worker thread.
    ///  2. Submit work to the scheduler when a worker run queue is saturated
    pub(super) inject: inject::Shared<Arc<Handle>>,

    /// Data synchronized by the scheduler mutex
    pub(super) synced: Mutex<Synced>,
}

// runtime/scheduler/multi_thread/handle.rs
pub(crate) struct Handle {
    /// Task spawner
    pub(super) shared: worker::Shared,

    /// Resource driver handles
    pub(crate) driver: driver::Handle,

    /// Blocking pool spawner
    pub(crate) blocking_spawner: blocking::Spawner,

    /// Current random number generator seed
    pub(crate) seed_generator: RngSeedGenerator,
}

// note: pop() also takes the locked Synced state as a parameter
fn next_remote_task(&self) -> Option<Notified> {
    if self.shared.inject.is_empty() {
        return None;
    }
    let mut synced = self.shared.synced.lock();
    // safety: passing in correct `idle::Synced`
    unsafe { self.shared.inject.pop(&mut synced.inject) }
}

In addition, runtime/coop.rs implements cooperative scheduling: each task is given an operation budget per poll, and once the budget is exhausted Tokio's leaf futures return Pending so the task yields back to the scheduler.
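
The practical consequence: a task that loops without hitting an .await yield point can starve its worker. Besides the automatic budget, a task can also yield explicitly:

#[tokio::main]
async fn main() {
    let handle = tokio::spawn(async {
        for _ in 0..1_000 {
            // ...some CPU-bound step...
            // Yield so this task does not monopolize its worker thread;
            // coop's budget forces similar yields inside Tokio's leaf futures.
            tokio::task::yield_now().await;
        }
    });
    handle.await.unwrap();
}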

Connect Walkthrough

In short, TcpStream::connect() is itself just a Future generated from an async fn.

TcpStream::connect() net/tcp/stream.rs an async fn
 |-to_socket_addrs() resolves the address (DNS if necessary); each resolved address is tried via the functions below
 |-TcpStream::connect_addr() establishes the connection
   |-TcpStream::connect_mio()
     |-TcpStream::new()
     | |-PollEvented::new()
     |   |-PollEvented::new_with_interest()
     |     |-PollEvented::new_with_interest_and_handle()
     |       |-Registration::new_with_interest_and_handle() the third argument comes from scheduler::Handle::current(), i.e. the scheduler Handle; the actual call is handle.driver().io().add_source(io, interest)
     |         |-Handle::driver() dispatches by flavor: MultiThread, CurrentThread, etc.
     |-poll_fn() wraps a small Future (see future/poll_fn.rs) that waits for the socket to become writable, i.e. for the non-blocking connect to complete
       |-poll_write_ready() and related readiness helpers

Task

// runtime/task/core.rs
pub(crate) struct Header {
    pub(super) state: State,
    pub(super) queue_next: UnsafeCell<Option<NonNull<Header>>>,
    pub(super) vtable: &'static Vtable,
    pub(super) owner_id: UnsafeCell<Option<NonZeroU64>>,
    pub(super) tracing_id: Option<tracing::Id>,
}
pub(super) struct CoreStage<T: Future> {
    stage: UnsafeCell<Stage<T>>,
}
pub(super) struct Core<T: Future, S> {
    pub(super) scheduler: S,
    pub(super) task_id: Id,
    pub(super) stage: CoreStage<T>,
}
pub(super) struct Cell<T: Future, S> {
    pub(super) header: Header,
    pub(super) core: Core<T, S>,
    pub(super) trailer: Trailer,
}

// runtime/task/raw.rs
pub(crate) struct RawTask {
    ptr: NonNull<Header>,
}

// runtime/task/mod.rs
pub(crate) struct Task<S: 'static> {
    raw: RawTask,
    _p: PhantomData<S>,
}
pub(crate) struct Notified<S: 'static>(Task<S>);
pub(crate) struct LocalNotified<S: 'static> {
    task: Task<S>,
    _not_send: PhantomData<*const ()>,
}
pub(crate) struct UnownedTask<S: 'static> {
    raw: RawTask,
    _p: PhantomData<S>,
}




LocalSet

With a multi-threaded runtime, tasks may execute on different threads. For tasks that must stay on one fixed thread (typically because they are !Send), the LocalSet facility can be used; see the sketch after the links below.
https://rust-book.junmajinlong.com/ch100/02_understand_tokio_task.html
https://www.cnblogs.com/SantiagoZhang/p/16505647.html
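
A minimal LocalSet sketch: spawn_local() accepts !Send futures (here one holding an Rc), and everything runs on the thread driving the LocalSet:

use std::rc::Rc;
use tokio::task::LocalSet;

fn main() {
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();

    let local = LocalSet::new();
    local.spawn_local(async {
        // Rc is !Send, so tokio::spawn() would reject this future.
        let data = Rc::new(42);
        println!("local task sees {data}");
    });

    // LocalSet implements Future: drive all local tasks to completion here.
    rt.block_on(local);
}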

Block-on and Spawn

Runtime::block_on() is a thin entry point that dispatches on the Scheduler variant (CurrentThread, MultiThread, ...). For MultiThread:

Runtime::block_on()[runtime/runtime.rs]
 |-MultiThread::block_on()[runtime/scheduler/multi_thread/mod.rs]
   |-runtime::context::enter_runtime()[runtime/context/runtime.rs] enters the runtime context, then calls
     |-BlockingRegionGuard::block_on()[runtime/context/blocking.rs]
       |-CachedParkThread::block_on()[runtime/park.rs] the Future ends up here; a Waker is built and the future is polled in a loop
         |-CachedParkThread::waker()

// the Waker vtable, defined in runtime/task/waker.rs
static WAKER_VTABLE: RawWakerVTable =
    RawWakerVTable::new(clone_waker, wake_by_val, wake_by_ref, drop_waker);
fn raw_waker(header: NonNull<Header>) -> RawWaker {
    let ptr = header.as_ptr() as *const ();
    RawWaker::new(ptr, &WAKER_VTABLE)
}

Waking runs through this vtable: RawTask::wake_by_val()[runtime/task/raw.rs] is the entry that reschedules the task onto its scheduler.
The runtime implements both the Send and Sync traits, so it can be wrapped in an Arc and the same runtime shared across threads.
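
In practice that means one runtime can serve many threads, e.g. (sketch):

use std::sync::Arc;
use std::thread;
use tokio::runtime::Runtime;

fn main() {
    let rt = Arc::new(Runtime::new().unwrap());

    let handles: Vec<_> = (0..4)
        .map(|i| {
            let rt = Arc::clone(&rt);
            thread::spawn(move || {
                // Each OS thread drives its own future on the shared runtime.
                rt.block_on(async move { println!("thread {i} done") });
            })
        })
        .collect();

    for h in handles {
        h.join().unwrap();
    }
}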




MultiThread::block_on()[runtime/scheduler/multi_thread/mod.rs] ends up in CachedParkThread::block_on()[runtime/park.rs], the blocking poll loop:
pub(crate) fn block_on<F: Future>(&mut self, f: F) -> Result<F::Output, AccessError> {
    use std::task::Context;
    use std::task::Poll::Ready;

    let waker = self.waker()?;
    let mut cx = Context::from_waker(&waker);

    pin!(f);

    loop {
        if let Ready(v) = crate::runtime::coop::budget(|| f.as_mut().poll(&mut cx)) {
            return Ok(v);
        }
        self.park();
    }
}
This loop just keeps polling the future until it completes. The other entry point is spawn(), reachable in two ways:
* Through Runtime[runtime/handle.rs], which dispatches to the concrete Scheduler; for MultiThread that is the Handle in runtime/scheduler/multi_thread/handle.rs.
* Through tokio::spawn(), the most common form; lib.rs re-exports it at crate level via pub use task::spawn.
Handle::spawn() runtime/handle.rs
 |-Handle::spawn_named()
   |-runtime::task::Id::next() allocates the task ID
   |-MultiThread::Handle::spawn() invoked as self.inner.spawn(future, id), taking MultiThread as the example
task::spawn() task/spawn.rs
 |-task::spawn_inner()
   |-MultiThread::Handle::spawn() reached via context::with_current

Both paths require the future to be Send + 'static.




MultiThread


runtime/scheduler/multi_thread/worker.rs
/// State shared across all workers
pub(crate) struct Shared {
    pub(crate) owned: OwnedTasks<Arc<Handle>>, // every task spawned onto this executor

    /// Per-worker remote state. All other workers have access to this and is
    /// how they communicate between each other.
    remotes: Box<[Remote]>,

    /// Global task queue used for:
    ///  1. Submit work to the scheduler while **not** currently on a worker thread.
    ///  2. Submit work to the scheduler when a worker run queue is saturated
    pub(super) inject: inject::Shared<Arc<Handle>>,

    /// Coordinates idle workers
    idle: Idle,

    /// Data synchronized by the scheduler mutex
    pub(super) synced: Mutex<Synced>,

    /// Cores that have observed the shutdown signal
    ///
    /// The core is **not** placed back in the worker to avoid it from being
    /// stolen by a thread that was spawned as part of `block_in_place`.
    #[allow(clippy::vec_box)] // we're moving an already-boxed value
    shutdown_cores: Mutex<Vec<Box<Core>>>,

    /// The number of cores that have observed the trace signal.
    pub(super) trace_status: TraceStatus,

    /// Scheduler configuration options
    config: Config,

    /// Collects metrics from the runtime.
    pub(super) scheduler_metrics: SchedulerMetrics,

    pub(super) worker_metrics: Box<[WorkerMetrics]>,

    /// Only held to trigger some code on drop. This is used to get internal
    /// runtime metrics that can be useful when doing performance
    /// investigations. This does nothing (empty struct, no drop impl) unless
    /// the `tokio_internal_mt_counters` `cfg` flag is set.
    _counters: Counters,
}

The multi-thread scheduler's Handle was shown above. The IO driver reserves two mio tokens for its own wakeup and for signal delivery (runtime/io/driver.rs):

const TOKEN_WAKEUP: mio::Token = mio::Token(0);
const TOKEN_SIGNAL: mio::Token = mio::Token(1);

runtime/io/scheduled_io.rs
pub(crate) struct ScheduledIo {
    pub(super) linked_list_pointers: UnsafeCell<linked_list::Pointers<Self>>,
    readiness: AtomicUsize, // only 32 bits are used, for 32-bit platform compatibility
    waiters: Mutex<Waiters>,
}
Driver::park() runtime/io/driver.rs
 |-Driver::turn() polls mio for IO events and dispatches them
   |-ScheduledIo::set_readiness()

The matching wakeup path is Handle::unpark().

/// Future returned by `readiness()`; this type itself implements Future
struct Readiness<'a> {
    scheduled_io: &'a ScheduledIo,
    state: State,
    /// Entry in the waiter `LinkedList`.
    waiter: UnsafeCell<Waiter>,
}

runtime/io/registration.rs
pub(crate) struct Registration {
    /// Handle to the associated runtime.
    /// TODO: this can probably be moved into `ScheduledIo`.
    handle: scheduler::Handle,

    /// Reference to state stored by the driver.
    shared: Arc<ScheduledIo>,
}


TcpStream and PollEvented themselves were shown in the IO section above; construction goes through io/poll_evented.rs:

pub(crate) fn new(io: E) -> io::Result<Self> {
    PollEvented::new_with_interest(io, Interest::READABLE | Interest::WRITABLE)
}
pub(crate) fn new_with_interest(io: E, interest: Interest) -> io::Result<Self> {
    Self::new_with_interest_and_handle(io, interest, scheduler::Handle::current())
}
pub(crate) fn new_with_interest_and_handle(mut io: E, interest: Interest, handle: scheduler::Handle) -> io::Result<Self> {
    let registration = Registration::new_with_interest_and_handle(&mut io, interest, handle)?;
    Ok(Self {
        io: Some(io),
        registration,
    })
}

runtime/io/registration.rs
pub(crate) fn new_with_interest_and_handle(io: &mut impl Source, interest: Interest, handle: scheduler::Handle) -> io::Result<Registration> {
    let shared = handle.driver().io().add_source(io, interest)?;  // the returned ScheduledIo is allocated from the driver's RegistrationSet
    Ok(Registration { handle, shared })
}

The implementation of add_source() lives in runtime/io/driver.rs.




runtime/io/driver.rs
/// I/O driver, backed by Mio.
pub(crate) struct Driver {
    /// True when an event with the signal token is received
    signal_ready: bool,

    /// Reuse the `mio::Events` value across calls to poll.
    events: mio::Events,

    /// The system event queue.
    poll: mio::Poll,
}

/// A reference to an I/O driver.
pub(crate) struct Handle {
    /// Registers I/O resources.
    registry: mio::Registry,

    /// Tracks all registrations
    registrations: RegistrationSet,

    /// State that should be synchronized
    synced: Mutex<registration_set::Synced>,

    /// Used to wake up the reactor from a call to `turn`.
    /// Not supported on `Wasi` due to lack of threading support.
    #[cfg(not(target_os = "wasi"))]
    waker: mio::Waker,

    pub(crate) metrics: IoDriverMetrics,
}

impl Handle {
    pub(super) fn add_source(&self, source: &mut impl mio::event::Source, interest: Interest) -> io::Result<Arc<ScheduledIo>> {
        let scheduled_io = self.registrations.allocate(&mut self.synced.lock())?;
        let token = scheduled_io.token();

        // we should remove the `scheduled_io` from the `registrations` set if registering
        // the `source` with the OS fails. Otherwise it will leak the `scheduled_io`.
        if let Err(e) = self.registry.register(source, token, interest.to_mio()) {
            // safety: `scheduled_io` is part of the `registrations` set.
            unsafe {
                self.registrations
                    .remove(&mut self.synced.lock(), &scheduled_io)
            };

            return Err(e);
        }

        // TODO: move this logic to `RegistrationSet` and use a `CountedLinkedList`
        self.metrics.incr_fd_count();

        Ok(scheduled_io)
    }
}



Almost every subsystem comes with its own Driver and Handle pair; signal (https://docs.rs/tokio/latest/tokio/signal/index.html) is one of the simpler examples. The various Handle definitions:


runtime/signal/mod.rs:33:pub(crate) struct Handle {
runtime/time/handle.rs:5:pub(crate) struct Handle {
runtime/io/driver.rs:32:pub(crate) struct Handle {
runtime/driver.rs:21:pub(crate) struct Handle {
runtime/handle.rs:14:pub struct Handle {
runtime/scheduler/current_thread/mod.rs:32:pub(crate) struct Handle {
runtime/scheduler/multi_thread_alt/handle.rs:17:pub(crate) struct Handle {
runtime/scheduler/multi_thread/handle.rs:21:pub(crate) struct Handle {

signal()
 |-signal_with_handle()
   |-signal_enable()
     |-Handle::check_inner()
pub fn signal(kind: SignalKind) -> io::Result<Signal> {
    let handle = scheduler::Handle::current();
    let rx = signal_with_handle(kind, handle.driver().signal())?;

    Ok(Signal {
        inner: RxFuture::new(rx),
    })
}

runtime/scheduler/defer.rs
pub(crate) struct Defer {
    deferred: RefCell<Vec<Waker>>,  // a plain Vec of deferred Wakers
}

runtime/scheduler/multi_thread/worker.rs
pub(crate) struct Context {
    worker: Arc<Worker>,
    core: RefCell<Option<Box<Core>>>,
    pub(crate) defer: Defer,
}

run()[runtime/scheduler/multi_thread/worker.rs] enters the runtime the same way:

Handle::block_on()[runtime/handle.rs]
 |-context::enter_runtime()[runtime/context/runtime.rs]

The runtime context (the CONTEXT TLS variable described earlier) is set inside this enter_runtime() call.



src/util/rand.rs contains a FastRand implementation; it maps a random value into a range without the modulo operator, using the trick from https://lemire.me/blog/2016/06/27/a-fast-alternative-to-the-modulo-reduction/
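
The trick from that post maps a uniform 32-bit value into [0, n) with one multiply and a shift instead of %; a sketch:

// Lemire's fast alternative to `x % n` for a uniform 32-bit input.
fn fast_reduce(x: u32, n: u32) -> u32 {
    (((x as u64) * (n as u64)) >> 32) as u32
}

fn main() {
    assert!(fast_reduce(0xdead_beef, 10) < 10);
}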

The core of the scheduling loop lives in runtime/scheduler/multi_thread/worker.rs.

For graceful shutdown, see https://tokio.rs/tokio/topics/shutdown; a sketch of one common pattern follows.
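
One piece of that guide, sketched with the public API: finish the main future, then bound how long the runtime may spend winding down background and blocking tasks:

use std::time::Duration;
use tokio::runtime::Runtime;

fn main() {
    let rt = Runtime::new().unwrap();

    rt.block_on(async {
        // ... run the application until a shutdown signal arrives ...
    });

    // Give outstanding tasks up to 10 seconds before giving up.
    rt.shutdown_timeout(Duration::from_secs(10));
}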



Inject Queue

The global inject queue is deliberately split in two. inject::Shared (runtime/scheduler/inject/shared.rs) holds only an atomic len, which lets the hot path check for emptiness without locking, while the actual linked list (head/tail RawTask pointers plus the is_closed flag) lives in inject::Synced (runtime/scheduler/inject/synced.rs); both structs appear in the run-queues section above. In the multi-thread scheduler, that inject::Synced is embedded as the inject field of the worker-level Synced guarded by the scheduler mutex, which is why code can lock the mutex and then reach the queue through the guard's .inject field:
// runtime/scheduler/multi_thread/worker.rs
pub(super) fn close(&self) {
    if self.shared.inject.close(&mut self.shared.synced.lock().inject) {
        self.notify_all();
    }
}

Pops from the global queue go through next_remote_task(), shown earlier in the run-queues section.



MIO

MIO is a lightweight asynchronous IO library for Rust that wraps each operating system's native API: epoll on Linux, IOCP on Windows, kqueue on macOS, and so on. Its interface follows an epoll-style readiness model, so on readiness-based systems most calls map one-to-one, while IOCP's completion model requires a fair amount of adaptation code.

A minimal example follows; note that the "os-poll" and "net" features must be enabled.

use log::info;

use mio::net::TcpListener;
use mio::{Events, Interest, Poll, Token};

pub fn main() {
    env_logger::init();

    const SERVER: Token = Token(0); // to identify which event.
    let mut poll = Poll::new().unwrap();
    let mut events = Events::with_capacity(128); // storage for events.

    let addr = "127.0.0.1:9090".parse().unwrap();
    let mut server = TcpListener::bind(addr).unwrap();
    poll.registry()
        .register(&mut server, SERVER, Interest::READABLE)
        .unwrap();
    info!("Listening {}.", addr);

    loop {
        poll.poll(&mut events, None).unwrap(); // blocking until we got an event.
        for event in events.iter() {
            match event.token() {
                SERVER => {
                    let (conn, peer) = server.accept().unwrap();
                    info!("Got new connection {}.", peer);
                    drop(conn);
                }
                _ => unreachable!(),
            }
        }
    }
}

Miscellaneous

Commit 4165601b introduced a new multi-threaded runtime implementation that improves scheduling efficiency; however, beyond 16 cores its mutex causes a significant performance drop, so for now it ships only as an experimental variant. See tokio/src/runtime/scheduler/multi_thread_alt for the implementation.
