Tokio source reading (1)
codemap
- First read the docs in runtime/mod.rs (especially the "Detailed runtime behavior" section) on docs.rs.
- Then read the code in this order: runtime.rs + builder.rs + config.rs → handle.rs + context.rs → park.rs → scheduler/ + runtime::task (read alongside the coop docs) → driver/ + io/ + time/ (to complete the reactor + timer mental model).
- Finally, look at the "observability layer": metrics.rs, task_hooks.rs, dump.rs.
runtime documentation
The basic components of the runtime:
//! * An **I/O event loop**, called the driver, which drives I/O resources and
//! dispatches I/O events to tasks that depend on them.
//! * A **scheduler** to execute [tasks] that use these I/O resources.
//! * A **timer** for scheduling work to run after a set period of time.
Overall, then, the runtime is built around three pieces: the I/O event loop (the driver), the scheduler, and the timer.
//! +------------------------------------------------------+
//! | Do you want work-stealing or multi-thread scheduler? |
//! +------------------------------------------------------+
//! | Yes | No
//! | |
//! | |
//! v |
//! +------------------------+ |
//! | Multi-threaded Runtime | |
//! +------------------------+ |
//! |
//! V
//! +--------------------------------+
//! | Do you execute `!Send` Future? |
//! +--------------------------------+
//! | Yes | No
//! | |
//! V |
//! +--------------------------+ |
//! | Local Runtime (unstable) | |
//! +--------------------------+ |
//! |
//! v
//! +------------------------+
//! | Current-thread Runtime |
//! +------------------------+
This decision tree gives you three options depending on whether you want a multi-threaded or single-threaded runtime: the multi-threaded runtime, the current-thread runtime, and the local runtime (unstable; I'll skip that one for now and come back to it later).
//! The tokio runtime is not NUMA (Non-Uniform Memory Access) aware.
//! You may want to start multiple runtimes instead of a single runtime
//! for better performance on NUMA systems.
I should read up on NUMA later. In short: on NUMA machines, memory access cost depends on which CPU node owns the memory, which is why one runtime per node can beat a single runtime spanning nodes.
//! Tokio provides multiple task scheduling strategies, suitable for different
//! applications. The [runtime builder] or `#[tokio::main]` attribute may be
//! used to select which scheduler to use.
This says the runtime offers multiple task scheduling strategies, and you choose one either by building the runtime yourself via the runtime builder or through the `#[tokio::main]` attribute.
//! The multi-thread scheduler executes futures on a _thread pool_, using a
//! work-stealing strategy. By default, it will start a worker thread for each
//! CPU core available on the system. This tends to be the ideal configuration
//! for most applications. The multi-thread scheduler requires the `rt-multi-thread`
//! feature flag.
The multi-thread scheduler runs futures on a thread pool with a work-stealing strategy. By default it starts one worker thread per CPU core, which is the recommended configuration for most applications, and it is enabled via the `rt-multi-thread` feature flag. An important point: the default worker count follows the number of CPU cores (more cores, more workers). You can configure more worker threads than cores, but that rarely helps, because a worker is essentially a CPU-bound loop that polls futures, so threads beyond the core count mostly add unnecessary context switches.
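As a quick illustration of that knob, here is a minimal sketch (assuming the `rt-multi-thread` feature plus the drivers are enabled) that caps the worker count explicitly instead of relying on the per-core default:

```rust
use tokio::runtime::Builder;

fn main() -> std::io::Result<()> {
    // Cap the worker pool instead of using the default of one worker per core.
    // More workers than cores rarely helps: each worker is a CPU-bound loop
    // that polls futures, so extra threads mostly add context switches.
    let rt = Builder::new_multi_thread()
        .worker_threads(4) // defaults to the number of CPU cores if omitted
        .enable_all()
        .build()?;

    rt.block_on(async {
        println!("running on a bounded worker pool");
    });
    Ok(())
}
```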
//! The current-thread scheduler provides a _single-threaded_ future executor.
//! All tasks will be created and executed on the current thread. This requires
//! the `rt` feature flag.
The current-thread scheduler is a single-threaded executor: all tasks are created and executed on the current thread. It is enabled via the `rt` feature flag.
//! When configuring a runtime by hand, no resource drivers are enabled by
//! default. In this case, attempting to use networking types or time types will
//! fail. In order to enable these types, the resource drivers must be enabled.
//! This is done with [`Builder::enable_io`] and [`Builder::enable_time`]. As a
//! shorthand, [`Builder::enable_all`] enables both resource drivers.
When you build the runtime by hand, no resource drivers (I/O driver, time driver) are enabled by default, so networking and time types will fail. Enable them with [`Builder::enable_io`] and [`Builder::enable_time`], or use [`Builder::enable_all`] as a shorthand.
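A minimal sketch of building a runtime by hand and enabling the drivers individually (this assumes the `rt`, `net`, and `time` features; `enable_all` would be the shorthand):

```rust
use std::time::Duration;
use tokio::runtime::Builder;

fn main() -> std::io::Result<()> {
    // A hand-built runtime has no resource drivers by default; enable them
    // explicitly (or call enable_all() instead).
    let rt = Builder::new_current_thread()
        .enable_io()   // I/O driver (reactor): needed for networking types
        .enable_time() // time driver: needed for tokio::time
        .build()?;

    rt.block_on(async {
        // Without enable_time() this sleep would panic at runtime.
        tokio::time::sleep(Duration::from_millis(1)).await;
    });
    Ok(())
}
```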
//! The runtime may spawn threads depending on its configuration and usage. The
//! multi-thread scheduler spawns threads to schedule tasks and for `spawn_blocking`
//! calls.
The runtime spawns threads depending on its configuration and usage: the multi-thread scheduler spawns worker threads to schedule tasks, plus additional threads for `spawn_blocking` calls.
//! While the `Runtime` is active, threads may shut down after periods of being
//! idle. Once `Runtime` is dropped, all runtime threads have usually been
//! terminated, but in the presence of unstoppable spawned work are not
//! guaranteed to have been terminated. See the
//! [struct level documentation](Runtime#shutdown) for more details.
While the runtime is active, threads that stay idle for a while may be shut down. Once the `Runtime` is dropped, its threads have usually all terminated, but if there is spawned work that cannot be stopped, termination is not guaranteed. Whether shutdown blocks and waits is covered in the struct-level docs.
//! At its most basic level, a runtime has a collection of tasks that need to be
//! scheduled. It will repeatedly remove a task from that collection and
//! schedule it (by calling [`poll`]). When the collection is empty, the thread
//! will go to sleep until a task is added to the collection.
//!
//! However, the above is not sufficient to guarantee a well-behaved runtime.
//! For example, the runtime might have a single task that is always ready to be
//! scheduled, and schedule that task every time. This is a problem because it
//! starves other tasks by not scheduling them. To solve this, Tokio provides
//! the following fairness guarantee:
This is the runtime's basic execution loop: it keeps a collection of tasks, repeatedly takes one out and schedules it (by calling `poll`), and when the collection is empty the thread sleeps until a task is added. That alone is not enough for a well-behaved runtime, though: a single task that is always ready could be scheduled over and over and starve everything else. To address this, Tokio gives the following fairness guarantee.
//! > If the total number of tasks does not grow without bound, and no task is
//! > [blocking the thread], then it is guaranteed that tasks are scheduled
//! > fairly.
In other words: as long as the number of tasks stays bounded and no task blocks the thread, task scheduling is guaranteed to be fair.
//! > Under the following two assumptions:
//! >
//! > * There is some number `MAX_TASKS` such that the total number of tasks on
//! > the runtime at any specific point in time never exceeds `MAX_TASKS`.
//! > * There is some number `MAX_SCHEDULE` such that calling [`poll`] on any
//! > task spawned on the runtime returns within `MAX_SCHEDULE` time units.
//! > Then, there is some number `MAX_DELAY` such that when a task is woken, it
//! > will be scheduled by the runtime within `MAX_DELAY` time units.
Two assumptions are stated here: `MAX_TASKS` bounds how many tasks exist on the runtime at any point in time, and `MAX_SCHEDULE` bounds how long any single call to `poll` takes to return (every poll of a task finishes within that time, whether or not the task is done). Under those two assumptions there is some `MAX_DELAY` such that a woken task gets scheduled within `MAX_DELAY` time units. Note that these are assumptions about how the application's tasks behave, not limits the runtime enforces on them.
//! (Here, `MAX_TASKS` and `MAX_SCHEDULE` can be any number and the user of
//! the runtime may choose them. The `MAX_DELAY` number is controlled by the
//! runtime, and depends on the value of `MAX_TASKS` and `MAX_SCHEDULE`.)
`MAX_TASKS` and `MAX_SCHEDULE` are whatever numbers the application happens to satisfy; `MAX_DELAY` is determined by the runtime and depends on those two.
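To make the `MAX_SCHEDULE` assumption concrete, here is a small sketch of keeping each poll short by yielding cooperatively; `do_some_work` is a hypothetical stand-in for a bounded chunk of CPU work (assumes the `macros` and `rt-multi-thread` features):

```rust
use tokio::task;

// Hypothetical stand-in for a bounded chunk of CPU work.
fn do_some_work(_chunk: u32) {}

#[tokio::main]
async fn main() {
    // Without the yield_now().await, the whole loop would be one long poll;
    // on a current-thread runtime that would starve every other task.
    let busy = task::spawn(async {
        for chunk in 0..1_000 {
            do_some_work(chunk);
            task::yield_now().await; // end this poll; the task goes back to the queue
        }
    });

    let other = task::spawn(async {
        println!("still gets scheduled in between");
    });

    busy.await.unwrap();
    other.await.unwrap();
}
```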
//! The current thread runtime maintains two FIFO queues of tasks that are ready
//! to be scheduled: the global queue and the local queue. The runtime will prefer
//! to choose the next task to schedule from the local queue, and will only pick a
//! task from the global queue if the local queue is empty, or if it has picked
//! a task from the local queue 31 times in a row. The number 31 can be
//! changed using the [`global_queue_interval`] setting.
The current-thread runtime is simpler: it keeps two FIFO queues of ready tasks, a global queue and a local queue. It prefers the local queue and only takes from the global queue when the local queue is empty, or after it has taken 31 tasks in a row from the local queue; that interval is configurable via `global_queue_interval`.
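The interval is exposed on the builder; a minimal sketch (31 is just the documented default for the current-thread runtime):

```rust
use tokio::runtime::Builder;

fn main() -> std::io::Result<()> {
    // How many local-queue picks happen before the scheduler checks the
    // global queue. Smaller values favor fairness toward externally
    // spawned tasks; larger values favor locality.
    let rt = Builder::new_current_thread()
        .global_queue_interval(31)
        .enable_all()
        .build()?;

    rt.block_on(async {
        println!("scheduler configured");
    });
    Ok(())
}
```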
//! The multi thread runtime uses [the lifo slot optimization]: Whenever a task
//! wakes up another task, the other task is added to the worker thread's lifo
//! slot instead of being added to a queue. If there was already a task in the
//! lifo slot when this happened, then the lifo slot is replaced, and the task
//! that used to be in the lifo slot is placed in the thread's local queue.
//! When the runtime finishes scheduling a task, it will schedule the task in
//! the lifo slot immediately, if any. When the lifo slot is used, the [coop
//! budget] is not reset. Furthermore, if a worker thread uses the lifo slot
//! three times in a row, it is temporarily disabled until the worker thread has
//! scheduled a task that didn't come from the lifo slot. The lifo slot can be
//! disabled using the [`disable_lifo_slot`] setting. The lifo slot is separate
//! from the local queue, so other worker threads cannot steal the task in the
//! lifo slot.
The multi-thread runtime adds a per-worker LIFO slot alongside the local queue: when a task wakes another task, the woken task goes into the current worker's LIFO slot instead of a queue, and it is scheduled next on that same worker (the coop budget is not reset for it). If the slot is already occupied, the old occupant is pushed to the local queue. A worker that uses the LIFO slot three times in a row temporarily stops using it until it schedules a task from somewhere else, and the slot can be disabled with `disable_lifo_slot`. The slot is separate from the local queue, so other workers cannot steal from it. The point is to let a freshly woken task run immediately on the same thread, which improves cache locality, avoids queue/atomic overhead, and lowers latency for message-passing patterns.
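A sketch of the message-passing pattern the LIFO slot is meant to speed up: when the sender completes, the woken receiver normally lands in the current worker's LIFO slot and is polled next on the same thread (this is an internal optimization, so the behavior is observable but not guaranteed; assumes the `sync`, `macros`, and `rt-multi-thread` features):

```rust
use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel::<u32>();

    let receiver = tokio::spawn(async move {
        let v = rx.await.unwrap();
        println!("got {v}");
    });

    tokio::spawn(async move {
        // Sending wakes the receiver; the woken task typically goes into
        // this worker's LIFO slot and runs next, keeping the data hot in cache.
        tx.send(42).unwrap();
    });

    receiver.await.unwrap();
}
```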
runtime.rs+config.rs+builder.rs
runtime.rs
/// The runtime provides an I/O driver, task scheduler, [timer], and
/// blocking pool, necessary for running asynchronous tasks.
Compared to the mod.rs description, this adds one more component: the blocking pool.
/// Instances of `Runtime` can be created using [`new`], or [`Builder`].
/// However, most users will use the [`#[tokio::main]`][main] annotation on
/// their entry point instead.
Three ways to create a runtime: `Runtime::new()`, the `Builder`, or (most commonly) the `#[tokio::main]` attribute on the entry point.
/// Shutting down the runtime is done by dropping the value, or calling
/// [`shutdown_background`] or [`shutdown_timeout`].
Three ways to shut the runtime down: dropping the `Runtime` value, or calling `shutdown_background` or `shutdown_timeout`.
/// Tasks spawned through [`Runtime::spawn`] keep running until they yield.
/// Then they are dropped. They are not *guaranteed* to run to completion, but
/// *might* do so if they do not yield until completion.
During shutdown, tasks spawned with `Runtime::spawn` keep running only until they next yield, and are then dropped. They are not guaranteed to run to completion (in the business-logic sense), though they might, if they reach `Poll::Ready` without yielding first.
/// Blocking functions spawned through [`Runtime::spawn_blocking`] keep running
/// until they return.
Blocking closures started with `Runtime::spawn_blocking`, on the other hand, keep running until they return on their own.
/// The thread initiating the shutdown blocks until all spawned work has been
/// stopped. This can take an indefinite amount of time. The `Drop`
/// implementation waits forever for this.
During shutdown, the thread that initiates it blocks until all work started via spawn and spawn_blocking has stopped, which can take arbitrarily long. Since Tokio does not forcibly cancel async or blocking work, if any of it never finishes, the `Runtime`'s `Drop` will block forever.
/// The [`shutdown_background`] and [`shutdown_timeout`] methods can be used if
/// waiting forever is undesired. When the timeout is reached, spawned work that
/// did not stop in time and threads running it are leaked. The work continues
/// to run until one of the stopping conditions is fulfilled, but the thread
/// initiating the shutdown is unblocked.
If waiting forever is not acceptable, use `shutdown_background` or `shutdown_timeout`. These do not wait for blocking work to finish: once the timeout is reached, any work (and the threads running it) that has not stopped in time is leaked and no longer managed by Tokio. That work keeps running until it returns on its own, but the thread that initiated the shutdown is unblocked immediately.
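A minimal sketch of bounding shutdown when some blocking work never finishes (the hour-long sleep stands in for a stuck blocking task):

```rust
use std::time::Duration;
use tokio::runtime::Runtime;

fn main() -> std::io::Result<()> {
    let rt = Runtime::new()?;

    // A blocking task that will not finish any time soon; dropping the
    // runtime would wait on it forever.
    rt.spawn_blocking(|| std::thread::sleep(Duration::from_secs(3600)));

    // Give spawned work one second to stop, then unblock this thread.
    // The stuck task and its thread are leaked, not cancelled.
    rt.shutdown_timeout(Duration::from_secs(1));
    Ok(())
}
```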
/// Once the runtime has been dropped, any outstanding I/O resources bound to
/// it will no longer function. Calling any method on them will result in an
/// error.
Once the runtime is dropped, any I/O resources bound to it (e.g. `TcpStream`, `TcpListener`) lose their reactor and stop functioning; calling any async method on them returns an error.
/// # Sharing
///
/// There are several ways to establish shared access to a Tokio runtime:
///
/// * Using an <code>[Arc]\<Runtime></code>.
/// * Using a [`Handle`].
/// * Entering the runtime context.
///
/// Using an <code>[Arc]\<Runtime></code> or [`Handle`] allows you to do various
/// things with the runtime such as spawning new tasks or entering the runtime
/// context. Both types can be cloned to create a new handle that allows access
/// to the same runtime. By passing clones into different tasks or threads, you
/// will be able to access the runtime from those tasks or threads.
There are three ways to share a runtime: an `Arc<Runtime>`, a `Handle`, or entering the runtime context. Both `Arc<Runtime>` and `Handle` can be cloned and passed to other tasks or threads, giving them access to the same runtime (spawning tasks, entering the context, and so on).
/// The difference between <code>[Arc]\<Runtime></code> and [`Handle`] is that
/// an <code>[Arc]\<Runtime></code> will prevent the runtime from shutting down,
/// whereas a [`Handle`] does not prevent that. This is because shutdown of the
/// runtime happens when the destructor of the `Runtime` object runs.
///
/// Calls to [`shutdown_background`] and [`shutdown_timeout`] require exclusive
/// ownership of the `Runtime` type. When using an <code>[Arc]\<Runtime></code>,
/// this can be achieved via [`Arc::try_unwrap`] when only one strong count
/// reference is left over.
The difference: an `Arc<Runtime>` prevents the runtime from shutting down (shutdown happens when the `Runtime` object's destructor runs, and that cannot happen while clones of the `Arc` are held), whereas a `Handle` does not. `shutdown_background` and `shutdown_timeout` need exclusive ownership of the `Runtime`; with an `Arc<Runtime>` you can recover that via `Arc::try_unwrap` once only one strong reference remains.
/// The runtime context is entered using the [`Runtime::enter`] or
/// [`Handle::enter`] methods, which use a thread-local variable to store the
/// current runtime. Whenever you are inside the runtime context, methods such
/// as [`tokio::spawn`] will use the runtime whose context you are inside.
The third way is entering the runtime context via `Runtime::enter` or `Handle::enter`. These store the current runtime in a thread-local variable; while you are inside that context, free functions like `tokio::spawn` or `tokio::time::sleep` automatically use that runtime.
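A small sketch of that third sharing mechanism: entering the context so that the free function `tokio::spawn` resolves to this runtime (assumes the `rt-multi-thread` feature):

```rust
use tokio::runtime::Runtime;

fn main() -> std::io::Result<()> {
    let rt = Runtime::new()?;

    // While the EnterGuard is alive, this thread's TLS points at `rt`,
    // so tokio::spawn works even outside block_on.
    let handle = {
        let _guard = rt.enter();
        tokio::spawn(async { 1 + 1 })
    };

    // The runtime still has to drive the task to completion.
    let result = rt.block_on(handle).unwrap();
    assert_eq!(result, 2);
    Ok(())
}
```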
/// Creates a new runtime instance with default configuration values.
///
/// This results in the multi threaded scheduler, I/O driver, and time driver being
/// initialized.
///
/// Most applications will not need to call this function directly. Instead,
/// they will use the [`#[tokio::main]` attribute][main]. When a more complex
/// configuration is necessary, the [runtime builder] may be used.
///
/// See [module level][mod] documentation for more details.
///
/// # Examples
///
/// Creating a new `Runtime` with default configuration values.
///
/// ```
/// use tokio::runtime::Runtime;
///
/// let rt = Runtime::new()
/// .unwrap();
///
/// // Use the runtime...
/// ```
///
/// [mod]: index.html
/// [main]: ../attr.main.html
/// [threaded scheduler]: index.html#threaded-scheduler
/// [runtime builder]: crate::runtime::Builder
#[cfg(feature = "rt-multi-thread")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))]
pub fn new() -> std::io::Result<Runtime> {
Builder::new_multi_thread().enable_all().build()
}
This is the default constructor: `Runtime::new()` builds a multi-threaded runtime with all drivers enabled (`Builder::new_multi_thread().enable_all().build()`).
/// Returns a handle to the runtime's spawner.
///
/// The returned handle can be used to spawn tasks that run on this runtime, and can
/// be cloned to allow moving the `Handle` to other threads.
///
/// Calling [`Handle::block_on`] on a handle to a `current_thread` runtime is error-prone.
/// Refer to the documentation of [`Handle::block_on`] for more.
///
/// # Examples
///
/// ```
/// # #[cfg(not(target_family = "wasm"))]
/// # {
/// use tokio::runtime::Runtime;
///
/// let rt = Runtime::new()
/// .unwrap();
///
/// let handle = rt.handle();
///
/// // Use the handle...
/// # }
/// ```
pub fn handle(&self) -> &Handle {
&self.handle
}
`Runtime::handle` returns a handle to the runtime's spawner, which can be cloned and moved to other threads. One caveat: calling `Handle::block_on` on a handle to a `current_thread` runtime is error-prone, because only `Runtime::block_on` drives that runtime's I/O and time drivers; if nothing else is driving them, futures waiting on I/O or timers are never woken and the call can hang.
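A sketch of the cloning pattern described above: move a cloned `Handle` to another OS thread and spawn onto the same runtime from there (a multi-thread runtime, so the `Handle::block_on` pitfall does not apply):

```rust
use tokio::runtime::Runtime;

fn main() -> std::io::Result<()> {
    let rt = Runtime::new()?;

    // A cloned Handle can be moved to another thread and used to spawn
    // tasks onto the same runtime.
    let handle = rt.handle().clone();
    let join = std::thread::spawn(move || handle.spawn(async { 40 + 2 }))
        .join()
        .unwrap();

    // Drive the spawned task to completion from the owning thread.
    let result = rt.block_on(join).unwrap();
    assert_eq!(result, 42);
    Ok(())
}
```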
/// Spawns a future onto the Tokio runtime.
///
/// This spawns the given future onto the runtime's executor, usually a
/// thread pool. The thread pool is then responsible for polling the future
/// until it completes.
///
/// The provided future will start running in the background immediately
/// when `spawn` is called, even if you don't await the returned
/// `JoinHandle`.
///
/// See [module level][mod] documentation for more details.
///
/// [mod]: index.html
///
/// # Examples
///
/// ```
/// # #[cfg(not(target_family = "wasm"))]
/// # {
/// use tokio::runtime::Runtime;
///
/// # fn dox() {
/// // Create the runtime
/// let rt = Runtime::new().unwrap();
///
/// // Spawn a future onto the runtime
/// rt.spawn(async {
/// println!("now running on a worker thread");
/// });
/// # }
/// # }
/// ```
#[track_caller]
pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
let fut_size = mem::size_of::<F>();
if fut_size > BOX_FUTURE_THRESHOLD {
self.handle
.spawn_named(Box::pin(future), SpawnMeta::new_unnamed(fut_size))
} else {
self.handle
.spawn_named(future, SpawnMeta::new_unnamed(fut_size))
}
}
This is the core of `spawn`: the future is handed to the runtime's executor (for the multi-thread runtime, the pool of worker threads), which is then responsible for polling it until it completes. The future starts running in the background as soon as `spawn` is called, even if the returned `JoinHandle` is never awaited. (Futures larger than `BOX_FUTURE_THRESHOLD` are boxed before being handed off.)
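To see the "starts immediately, even without await" point, a minimal sketch (the short sleep exists only so the process stays alive long enough for the worker to run the task; completion is still not guaranteed):

```rust
use std::time::Duration;
use tokio::runtime::Runtime;

fn main() -> std::io::Result<()> {
    let rt = Runtime::new()?;

    // The future starts running on a worker as soon as spawn is called,
    // even though the JoinHandle is dropped without being awaited.
    let _handle = rt.spawn(async {
        println!("running in the background");
    });

    // Only here so the runtime isn't dropped before the worker gets to it.
    std::thread::sleep(Duration::from_millis(50));
    Ok(())
}
```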
/// Runs a future to completion on the Tokio runtime. This is the
/// runtime's entry point.
///
/// This runs the given future on the current thread, blocking until it is
/// complete, and yielding its resolved result. Any tasks or timers
/// which the future spawns internally will be executed on the runtime.
///
/// # Non-worker future
///
/// Note that the future required by this function does not run as a
/// worker. The expectation is that other tasks are spawned by the future here.
/// Awaiting on other futures from the future provided here will not
/// perform as fast as those spawned as workers.
///
/// # Multi thread scheduler
///
/// When the multi thread scheduler is used this will allow futures
/// to run within the io driver and timer context of the overall runtime.
///
/// Any spawned tasks will continue running after `block_on` returns.
///
/// # Current thread scheduler
///
/// When the current thread scheduler is enabled `block_on`
/// can be called concurrently from multiple threads. The first call
/// will take ownership of the io and timer drivers. This means
/// other threads which do not own the drivers will hook into that one.
/// When the first `block_on` completes, other threads will be able to
/// "steal" the driver to allow continued execution of their futures.
///
/// Any spawned tasks will be suspended after `block_on` returns. Calling
/// `block_on` again will resume previously spawned tasks.
///
/// # Panics
///
/// This function panics if the provided future panics, or if called within an
/// asynchronous execution context.
`block_on` runs a future to completion on the current thread, blocking until it resolves; it is the runtime's entry point, and tasks or timers spawned inside it run on the runtime. The future passed to `block_on` does not itself run as a worker task, so the expectation is that it mostly spawns other tasks rather than doing the heavy awaiting itself. On the multi-thread runtime, the future runs within the runtime's I/O and timer context, and spawned tasks keep running after `block_on` returns. On the current-thread runtime, `block_on` can be called concurrently from multiple threads: the first caller takes ownership of the I/O and timer drivers and the others hook into it; when the first call completes, another thread can "steal" the drivers to keep making progress. Spawned tasks are suspended when `block_on` returns and resume on the next call. It panics if the future panics or if it is called from inside an async execution context.
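A minimal sketch of `block_on` as the entry point, with the real work spawned as a worker task rather than awaited directly in the outer future:

```rust
use tokio::runtime::Runtime;

fn main() -> std::io::Result<()> {
    let rt = Runtime::new()?;

    // The outer future is not a worker task; it mostly just spawns and joins.
    let answer = rt.block_on(async {
        let worker = tokio::spawn(async { 6 * 7 });
        worker.await.unwrap()
    });

    assert_eq!(answer, 42);
    Ok(())
}
```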
config.rs
#![cfg_attr(
any(not(all(tokio_unstable, feature = "full")), target_family = "wasm"),
allow(dead_code)
)]
use crate::runtime::{Callback, TaskCallback};
use crate::util::RngSeedGenerator;
pub(crate) struct Config {
/// How many ticks before pulling a task from the global/remote queue?
pub(crate) global_queue_interval: Option<u32>,
/// How many ticks before yielding to the driver for timer and I/O events?
pub(crate) event_interval: u32,
/// Callback for a worker parking itself
pub(crate) before_park: Option<Callback>,
/// Callback for a worker unparking itself
pub(crate) after_unpark: Option<Callback>,
/// To run before each task is spawned.
pub(crate) before_spawn: Option<TaskCallback>,
/// To run after each task is terminated.
pub(crate) after_termination: Option<TaskCallback>,
/// To run before each poll
#[cfg(tokio_unstable)]
pub(crate) before_poll: Option<TaskCallback>,
/// To run after each poll
#[cfg(tokio_unstable)]
pub(crate) after_poll: Option<TaskCallback>,
/// The multi-threaded scheduler includes a per-worker LIFO slot used to
/// store the last scheduled task. This can improve certain usage patterns,
/// especially message passing between tasks. However, this LIFO slot is not
/// currently stealable.
///
/// Eventually, the LIFO slot **will** become stealable, however as a
/// stop-gap, this unstable option lets users disable the LIFO task.
pub(crate) disable_lifo_slot: bool,
/// Random number generator seed to configure runtimes to act in a
/// deterministic way.
pub(crate) seed_generator: RngSeedGenerator,
/// How to build poll time histograms
pub(crate) metrics_poll_count_histogram: Option<crate::runtime::HistogramBuilder>,
#[cfg(tokio_unstable)]
/// How to respond to unhandled task panics.
pub(crate) unhandled_panic: crate::runtime::UnhandledPanic,
}
This `Config` struct is the scheduler's internal configuration, passed by the builder to both the current-thread and multi-thread schedulers: scheduling parameters (queue/event intervals, the LIFO slot switch, the RNG seed) plus callback hooks around parking, spawning, polling, and termination.
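For orientation, a sketch of the public `Builder` knobs that end up in these `Config` fields (if I read the builder correctly, `before_park`/`after_unpark` come from `on_thread_park`/`on_thread_unpark`, and the spawn/poll hooks are only reachable behind `tokio_unstable`):

```rust
use tokio::runtime::Builder;

fn main() -> std::io::Result<()> {
    let rt = Builder::new_multi_thread()
        .event_interval(61)        // -> Config::event_interval (61 is the default)
        .global_queue_interval(31) // -> Config::global_queue_interval
        .on_thread_park(|| {})     // -> Config::before_park
        .on_thread_unpark(|| {})   // -> Config::after_unpark
        .enable_all()
        .build()?;

    rt.block_on(async {
        println!("runtime built with explicit scheduler knobs");
    });
    Ok(())
}
```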
builder.rs
/// Creates the configured `Runtime`.
///
/// The returned `Runtime` instance is ready to spawn tasks.
///
/// # Examples
///
/// ```
/// # #[cfg(not(target_family = "wasm"))]
/// # {
/// use tokio::runtime::Builder;
///
/// let rt = Builder::new_multi_thread().build().unwrap();
///
/// rt.block_on(async {
/// println!("Hello from the Tokio runtime");
/// });
/// # }
/// ```
pub fn build(&mut self) -> io::Result<Runtime> {
match &self.kind {
Kind::CurrentThread => self.build_current_thread_runtime(),
#[cfg(feature = "rt-multi-thread")]
Kind::MultiThread => self.build_threaded_runtime(),
}
}
This is the builder's entry point: depending on `self.kind`, `build` constructs either the current-thread runtime or (behind the `rt-multi-thread` feature) the multi-threaded runtime.
fn build_current_thread_runtime(&mut self) -> io::Result<Runtime> {
use crate::runtime::runtime::Scheduler;
let (scheduler, handle, blocking_pool) =
self.build_current_thread_runtime_components(None)?;
Ok(Runtime::from_parts(
Scheduler::CurrentThread(scheduler),
handle,
blocking_pool,
))
}
fn build_current_thread_runtime_components(
&mut self,
local_tid: Option<ThreadId>,
) -> io::Result<(CurrentThread, Handle, BlockingPool)> {
use crate::runtime::scheduler;
use crate::runtime::Config;
let (driver, driver_handle) = driver::Driver::new(self.get_cfg())?;
// Blocking pool
let blocking_pool = blocking::create_blocking_pool(self, self.max_blocking_threads);
let blocking_spawner = blocking_pool.spawner().clone();
// Generate a rng seed for this runtime.
let seed_generator_1 = self.seed_generator.next_generator();
let seed_generator_2 = self.seed_generator.next_generator();
// And now put a single-threaded scheduler on top of the timer. When
// there are no futures ready to do something, it'll let the timer or
// the reactor to generate some new stimuli for the futures to continue
// in their life.
let (scheduler, handle) = CurrentThread::new(
driver,
driver_handle,
blocking_spawner,
seed_generator_2,
Config {
before_park: self.before_park.clone(),
after_unpark: self.after_unpark.clone(),
before_spawn: self.before_spawn.clone(),
#[cfg(tokio_unstable)]
before_poll: self.before_poll.clone(),
#[cfg(tokio_unstable)]
after_poll: self.after_poll.clone(),
after_termination: self.after_termination.clone(),
global_queue_interval: self.global_queue_interval,
event_interval: self.event_interval,
#[cfg(tokio_unstable)]
unhandled_panic: self.unhandled_panic.clone(),
disable_lifo_slot: self.disable_lifo_slot,
seed_generator: seed_generator_1,
metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(),
},
local_tid,
);
let handle = Handle {
inner: scheduler::Handle::CurrentThread(handle),
};
Ok((scheduler, handle, blocking_pool))
}
One detail to note here: two seed generators are derived (`seed_generator_1` and `seed_generator_2`), one handed to the `Config` and one to the scheduler itself. Giving each scheduling component its own generator avoids a globally locked RNG and the multi-thread contention that would come with it, and keeps the scheduling randomness of different components independent of each other.
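To make the "split one generator into independent per-component generators" idea concrete, here is a purely hypothetical sketch of that pattern; it is not Tokio's `RngSeedGenerator`, just an illustration of deriving independent child states without any shared, locked RNG:

```rust
// Hypothetical illustration only: a parent generator hands out independent
// child generators, so each scheduler/worker owns its RNG state and no
// global lock is needed.
struct SeedGen {
    state: u64,
}

impl SeedGen {
    fn new(seed: u64) -> Self {
        Self { state: seed }
    }

    // Derive a child generator with its own, unrelated-looking state.
    fn next_generator(&mut self) -> SeedGen {
        // splitmix64-style mixing; any deterministic mixer works for the sketch.
        self.state = self.state.wrapping_add(0x9E37_79B9_7F4A_7C15);
        let mut z = self.state;
        z = (z ^ (z >> 30)).wrapping_mul(0xBF58_476D_1CE4_E5B9);
        z = (z ^ (z >> 27)).wrapping_mul(0x94D0_49BB_1331_11EB);
        SeedGen::new(z ^ (z >> 31))
    }
}

fn main() {
    let mut parent = SeedGen::new(42);
    let for_config = parent.next_generator();    // e.g. seed_generator_1
    let for_scheduler = parent.next_generator(); // e.g. seed_generator_2
    assert_ne!(for_config.state, for_scheduler.state);
}
```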
fn build_threaded_runtime(&mut self) -> io::Result<Runtime> {
use crate::loom::sys::num_cpus;
use crate::runtime::{Config, runtime::Scheduler};
use crate::runtime::scheduler::{self, MultiThread};
let worker_threads = self.worker_threads.unwrap_or_else(num_cpus);
let (driver, driver_handle) = driver::Driver::new(self.get_cfg())?;
// Create the blocking pool
let blocking_pool =
blocking::create_blocking_pool(self, self.max_blocking_threads + worker_threads);
let blocking_spawner = blocking_pool.spawner().clone();
// Generate a rng seed for this runtime.
let seed_generator_1 = self.seed_generator.next_generator();
let seed_generator_2 = self.seed_generator.next_generator();
let (scheduler, handle, launch) = MultiThread::new(
worker_threads,
driver,
driver_handle,
blocking_spawner,
seed_generator_2,
Config {
before_park: self.before_park.clone(),
after_unpark: self.after_unpark.clone(),
before_spawn: self.before_spawn.clone(),
#[cfg(tokio_unstable)]
before_poll: self.before_poll.clone(),
#[cfg(tokio_unstable)]
after_poll: self.after_poll.clone(),
after_termination: self.after_termination.clone(),
global_queue_interval: self.global_queue_interval,
event_interval: self.event_interval,
#[cfg(tokio_unstable)]
unhandled_panic: self.unhandled_panic.clone(),
disable_lifo_slot: self.disable_lifo_slot,
seed_generator: seed_generator_1,
metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(),
},
);
let handle = Handle { inner: scheduler::Handle::MultiThread(handle) };
// Spawn the thread pool workers
let _enter = handle.enter();
launch.launch();
Ok(Runtime::from_parts(Scheduler::MultiThread(scheduler), handle, blocking_pool))
}
The code here is nicely factored: the single-threaded and multi-threaded construction paths are almost identical, except for the following:
let worker_threads = self.worker_threads.unwrap_or_else(num_cpus);
// 和
let handle = Handle { inner: scheduler::Handle::MultiThread(handle) };
// Spawn the thread pool workers
let _enter = handle.enter();
launch.launch();
Why does the multi-threaded path have this extra piece? Because launching the workers needs an entry point: before `handle.enter()`, the current thread has no runtime context. Once `handle.enter()` runs, the builder thread has a "context anchor" in place, and from it all the workers can be started via `launch.launch()`. So the overall construction flow is roughly:
Builder
↓
choose the scheduler type
↓
create the unified Driver (the carrier for I/O + Timer)
↓
create the Blocking Pool
↓
assemble the Runtime

The multi-threaded path just adds a few steps:
Builder
↓
choose the scheduler type
↓
create the unified Driver (the carrier for I/O + Timer)
↓
create the Blocking Pool
↓
establish the runtime context anchor (handle.enter())
↓
launch the worker threads
↓
assemble the Runtime

/// Enables both I/O and time drivers.
///
/// Doing this is a shorthand for calling `enable_io` and `enable_time`
/// individually. If additional components are added to Tokio in the future,
/// `enable_all` will include these future components.
///
/// # Examples
///
/// ```
/// # #[cfg(not(target_family = "wasm"))]
/// # {
/// use tokio::runtime;
///
/// let rt = runtime::Builder::new_multi_thread()
/// .enable_all()
/// .build()
/// .unwrap();
/// # }
/// ```
pub fn enable_all(&mut self) -> &mut Self {
#[cfg(any(
feature = "net",
all(unix, feature = "process"),
all(unix, feature = "signal")
))]
self.enable_io();
#[cfg(all(
tokio_unstable,
feature = "io-uring",
feature = "rt",
feature = "fs",
target_os = "linux",
))]
self.enable_io_uring();
#[cfg(feature = "time")]
self.enable_time();
self
}
Summary
The runtime's closed scheduling loop roughly follows this cycle:
1️⃣ The Scheduler holds all Tasks
2️⃣ A Worker takes a Task out of the Scheduler
3️⃣ The Worker polls that Task's Future
4️⃣ If it returns Pending:
- the Future registers its Waker with the Driver (I/O / Timer)
5️⃣ When an I/O / Timer event occurs:
- the Driver fires the Waker
6️⃣ The Waker pushes the Task back into the Scheduler
7️⃣ Back to step 2️⃣

Tokio Runtime architecture and execution model: summary
The Tokio Runtime is built from three core components:
Scheduler: schedules and re-schedules tasks
Driver (I/O + Timer): turns operating-system events into asynchronous wakeups
Blocking Pool: isolates synchronous, blocking work that cannot be made async
Tokio offers several runtime flavors:
- the current_thread runtime is the minimal implementation: all tasks are scheduled and executed on a single OS thread;
- the multi_thread runtime is the most complete implementation: a work-stealing thread pool, and the official default and recommended choice.
The runtime's core execution loop is a standard asynchronous scheduling model:
the Scheduler manages all Tasks;
Worker threads take Tasks from the Scheduler and poll them;
when a Future returns Pending, its Waker is registered with the I/O/Timer Driver;
when an I/O or timer event occurs, the Driver fires the Waker;
the Waker re-injects the Task into the Scheduler for the next round of scheduling.

In essence, Tokio is an event-driven, user-space scheduling system, not just a thin executor for async syntax sugar.