tokio源码阅读2
tokio源码阅读2
Handle.rs
/// Handle to the runtime.
///
/// The handle is internally reference-counted and can be freely cloned. A handle can be
/// obtained using the [`Runtime::handle`] method.
///
/// [`Runtime::handle`]: crate::runtime::Runtime::handle()
#[derive(Debug, Clone)]
// When the `rt` feature is *not* enabled, this type is still defined, but not
// included in the public API.
pub struct Handle {
/// The scheduler-level handle wrapped by this public handle. This is the
/// value that `enter` passes to `context::try_set_current`.
pub(crate) inner: scheduler::Handle,
}
/// Scheduler-level handle: the `scheduler::Handle` type stored in the public
/// `Handle`'s `inner` field.
///
/// One variant exists per enabled scheduler flavor, each holding an `Arc` to
/// that flavor's own handle type, so cloning only bumps a reference count.
#[derive(Debug, Clone)]
pub(crate) enum Handle {
/// Handle to the current-thread scheduler (requires the `rt` feature).
#[cfg(feature = "rt")]
CurrentThread(Arc<current_thread::Handle>),
/// Handle to the multi-thread scheduler (requires `rt-multi-thread`).
#[cfg(feature = "rt-multi-thread")]
MultiThread(Arc<multi_thread::Handle>),
// TODO: This is to avoid triggering "dead code" warnings many other places
// in the codebase. Remove this during a later cleanup
#[cfg(not(feature = "rt"))]
#[allow(dead_code)]
Disabled,
}
/// Handle to the multi thread scheduler
///
/// This is the type referred to as `multi_thread::Handle` by the
/// `MultiThread` variant of the scheduler handle enum, where it is held
/// behind an `Arc`.
pub(crate) struct Handle {
/// Task spawner
pub(super) shared: worker::Shared,
/// Resource driver handles
pub(crate) driver: driver::Handle,
/// Blocking pool spawner
pub(crate) blocking_spawner: blocking::Spawner,
/// Current random number generator seed
pub(crate) seed_generator: RngSeedGenerator,
/// User-supplied hooks to invoke for things
pub(crate) task_hooks: TaskHooks,
}
/// Handle to the current thread scheduler
pub(crate) struct Handle {
/// Scheduler state shared across threads
shared: Shared,
/// Resource driver handles
pub(crate) driver: driver::Handle,
/// Blocking pool spawner
pub(crate) blocking_spawner: blocking::Spawner,
/// Current random number generator seed
pub(crate) seed_generator: RngSeedGenerator,
/// User-supplied hooks to invoke for things
pub(crate) task_hooks: TaskHooks,
/// If this is a `LocalRuntime`, flags the owning thread ID.
pub(crate) local_tid: Option<ThreadId>,
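}
从这里可以看到,scheduler::Handle 是一个枚举,包含单线程(CurrentThread)和多线程(MultiThread)两种变体。进一步对比两个调度器各自的 Handle 可以发现:多线程版本比单线程版本少一个 local_tid 字段,并且二者 shared 字段的可见性不同(单线程为模块私有,多线程为 pub(super))。需要注意,这并不意味着单线程的 Handle 不需要跨线程共享——它同样被包在 Arc 中,源码注释也写明 shared 是 “Scheduler state shared across threads”;可见性的差异只说明访问该字段的代码所在的模块范围不同。local_tid 则是为了支持前面提到的 LocalRuntime 模式,用来记录拥有该 runtime 的线程 ID。其余字段与多线程版本基本一致,因此可以归纳出 Handle 由以下部分构成: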
- 调度上下文共享
- 阻塞池生成器
- 随机数生成器
- 任务钩子
- 资源驱动句柄(driver::Handle,即注释中的 “Resource driver handles”)
/// Runtime context guard.
///
/// Returned by [`Runtime::enter`] and [`Handle::enter`], the context guard exits
/// the runtime context on drop.
///
/// [`Runtime::enter`]: fn@crate::runtime::Runtime::enter
#[derive(Debug)]
#[must_use = "Creating and dropping a guard does nothing"]
pub struct EnterGuard<'a> {
/// The guard produced by `context::try_set_current`; it is held only so
/// that it is dropped together with this `EnterGuard`.
_guard: context::SetCurrentGuard,
/// Zero-sized marker that makes the guard borrow the `Handle` for `'a`,
/// so the guard cannot outlive the handle reference it was created from.
_handle_lifetime: PhantomData<&'a Handle>,
}
/// Enters the runtime context. This allows you to construct types that must
/// have an executor available on creation such as [`Sleep`] or
/// [`TcpStream`]. It will also allow you to call methods such as
/// [`tokio::spawn`] and [`Handle::current`] without panicking.
///
/// # Panics
///
/// When calling `Handle::enter` multiple times, the returned guards
/// **must** be dropped in the reverse order that they were acquired.
/// Failure to do so will result in a panic and possible memory leaks.
///
/// # Examples
///
/// ```
/// # #[cfg(not(target_family = "wasm"))]
/// # {
/// use tokio::runtime::Runtime;
///
/// let rt = Runtime::new().unwrap();
///
/// let _guard = rt.enter();
/// tokio::spawn(async {
/// println!("Hello world!");
/// });
/// # }
/// ```
///
/// Do **not** do the following, this shows a scenario that will result in a
/// panic and possible memory leak.
///
/// ```should_panic,ignore-wasm
/// use tokio::runtime::Runtime;
///
/// let rt1 = Runtime::new().unwrap();
/// let rt2 = Runtime::new().unwrap();
///
/// let enter1 = rt1.enter();
/// let enter2 = rt2.enter();
///
/// drop(enter1);
/// drop(enter2);
/// ```
///
/// [`Sleep`]: struct@crate::time::Sleep
/// [`TcpStream`]: struct@crate::net::TcpStream
/// [`tokio::spawn`]: fn@crate::spawn
pub fn enter(&self) -> EnterGuard<'_> {
EnterGuard {
_guard: match context::try_set_current(&self.inner) {
Some(guard) => guard,
None => panic!("{}", crate::util::error::THREAD_LOCAL_DESTROYED_ERROR),
},
_handle_lifetime: PhantomData,
}
}
Handle::enter(Runtime::enter 同理)通过返回一个 EnterGuard,把当前 Handle 设置为当前 OS 线程 TLS 中的“当前 Runtime”,使该线程临时进入 Tokio 的运行时上下文。

EnterGuard 采用 RAII 机制,在 drop 时自动退出该上下文,并要求多次 enter 时必须按后进先出(LIFO)的顺序释放,以维持 TLS 中 Runtime 栈的一致性;顺序错误会 panic。

PhantomData<&'a Handle> 用于在编译期把 EnterGuard 与 Handle 的借用生命周期绑定起来,防止在 Handle 的引用失效后仍然持有 EnterGuard。

若在线程退出阶段 TLS 已被销毁,try_set_current 返回 None,enter 将 panic。这是对线程生命周期的保护,而非对 Runtime 生命周期的检查。
/// Returns a `Handle` view over the currently running `Runtime`.
///
/// # Panics
///
/// This will panic if called outside the context of a Tokio runtime. That means that you must
/// call this on one of the threads **being run by the runtime**, or from a thread with an active
/// `EnterGuard`. Calling this from within a thread created by `std::thread::spawn` (for example)
/// will cause a panic unless that thread has an active `EnterGuard`.
///
/// # Examples
///
/// This can be used to obtain the handle of the surrounding runtime from an async
/// block or function running on that runtime.
///
/// ```
/// # #[cfg(not(target_family = "wasm"))]
/// # {
/// # use std::thread;
/// # use tokio::runtime::Runtime;
/// # fn dox() {
/// # let rt = Runtime::new().unwrap();
/// # rt.spawn(async {
/// use tokio::runtime::Handle;
///
/// // Inside an async block or function.
/// let handle = Handle::current();
/// handle.spawn(async {
/// println!("now running in the existing Runtime");
/// });
///
/// # let handle =
/// thread::spawn(move || {
/// // Notice that the handle is created outside of this thread and then moved in
/// handle.spawn(async { /* ... */ });
/// // This next line would cause a panic because we haven't entered the runtime
/// // and created an EnterGuard
/// // let handle2 = Handle::current(); // panic
/// // So we create a guard here with Handle::enter();
/// let _guard = handle.enter();
/// // Now we can call Handle::current();
/// let handle2 = Handle::current();
/// });
/// # handle.join().unwrap();
/// # });
/// # }
/// # }
/// ```
#[track_caller]
pub fn current() -> Self {
Handle {
inner: scheduler::Handle::current(),
}
}
这个方法只能在 Tokio runtime 的上下文中调用:要么在 runtime 自己驱动的线程上,要么先手动通过 enter() 进入 runtime 上下文(持有 EnterGuard)之后再调用;否则会 panic。
#[track_caller]
pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
let fut_size = mem::size_of::<F>();
if fut_size > BOX_FUTURE_THRESHOLD {
self.spawn_named(Box::pin(future), SpawnMeta::new_unnamed(fut_size))
} else {
self.spawn_named(future, SpawnMeta::new_unnamed(fut_size))
}
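}
这个设计比较有意思:当 Future 的大小超过 BOX_FUTURE_THRESHOLD 时,tokio 会先用 Box::pin 把它放到堆上再继续 spawn;否则直接按值传递。需要注意两点:(1)该阈值并不固定为 2048 字节——在 tokio 源码中,debug 构建下是 2048,release 构建下更大(16384),具体以所读版本的常量定义为准;(2)Task 本身始终是堆分配的,未装箱的 Future 会内联存放在 Task 的内存单元里。提前装箱的主要目的,是避免体积很大的 Future 在 spawn 的多层调用中被按值搬移,造成大量栈拷贝甚至栈溢出,而不是为了提升调度速度。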
context.rs
use crate::loom::thread::AccessError;
use crate::task::coop;
use std::cell::Cell;
#[cfg(any(feature = "rt", feature = "macros"))]
use crate::util::rand::FastRand;
cfg_rt! {
mod blocking;
pub(crate) use blocking::{disallow_block_in_place, try_enter_blocking_region, BlockingRegionGuard};
mod current;
pub(crate) use current::{with_current, try_set_current, SetCurrentGuard};
mod runtime;
pub(crate) use runtime::{EnterRuntime, enter_runtime};
mod scoped;
use scoped::Scoped;
use crate::runtime::{scheduler, task::Id};
use std::task::Waker;
cfg_taskdump! {
use crate::runtime::task::trace;
}
}
cfg_rt_multi_thread! {
mod runtime_mt;
pub(crate) use runtime_mt::{current_enter_context, exit_runtime};
}
/// Per-thread runtime state. A single instance lives in the `CONTEXT`
/// thread-local; most fields only exist when the `rt` feature is enabled.
struct Context {
/// Uniquely identifies the current thread
#[cfg(feature = "rt")]
thread_id: Cell<Option<ThreadId>>,
/// Handle to the runtime scheduler running on the current thread.
#[cfg(feature = "rt")]
current: current::HandleCell,
/// Handle to the scheduler's internal "context"
#[cfg(feature = "rt")]
scheduler: Scoped<scheduler::Context>,
/// Task id slot read by `current_task_id()` and swapped by
/// `set_current_task_id()`; `None` until one is set.
#[cfg(feature = "rt")]
current_task_id: Cell<Option<Id>>,
/// Tracks if the current thread is currently driving a runtime.
/// Note, that if this is set to "entered", the current scheduler
/// handle may not reference the runtime currently executing. This
/// is because other runtime handles may be set to current from
/// within a runtime.
#[cfg(feature = "rt")]
runtime: Cell<EnterRuntime>,
/// Per-thread fast RNG state. Starts as `None` and is created on first
/// use by `thread_rng_n`, which writes the advanced state back.
#[cfg(any(feature = "rt", feature = "macros"))]
rng: Cell<Option<FastRand>>,
/// Tracks the amount of "work" a task may still do before yielding back to
/// the scheduler
budget: Cell<coop::Budget>,
/// Task-dump trace state, exposed through `with_trace`. Only compiled on
/// the platform/feature combination listed in the `cfg` below.
#[cfg(all(
tokio_unstable,
feature = "taskdump",
feature = "rt",
target_os = "linux",
any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64")
))]
trace: trace::Context,
}
// The thread-local `Context` instance. It is built from a `const`
// initializer, so every field starts in its "empty" state: no thread id, no
// task id, no RNG, runtime `NotEntered`, and an unconstrained budget.
tokio_thread_local! {
static CONTEXT: Context = const {
Context {
#[cfg(feature = "rt")]
thread_id: Cell::new(None),
// Tracks the current runtime handle to use when spawning,
// accessing drivers, etc...
#[cfg(feature = "rt")]
current: current::HandleCell::new(),
// Tracks the current scheduler internal context
#[cfg(feature = "rt")]
scheduler: Scoped::new(),
#[cfg(feature = "rt")]
current_task_id: Cell::new(None),
// Tracks if the current thread is currently driving a runtime.
// Note, that if this is set to "entered", the current scheduler
// handle may not reference the runtime currently executing. This
// is because other runtime handles may be set to current from
// within a runtime.
#[cfg(feature = "rt")]
runtime: Cell::new(EnterRuntime::NotEntered),
#[cfg(any(feature = "rt", feature = "macros"))]
rng: Cell::new(None),
budget: Cell::new(coop::Budget::unconstrained()),
#[cfg(all(
tokio_unstable,
feature = "taskdump",
feature = "rt",
target_os = "linux",
any(
target_arch = "aarch64",
target_arch = "x86",
target_arch = "x86_64"
)
))]
trace: trace::Context::new(),
}
}
}
/// Draws a value from this thread's `FastRand` via `fastrand_n(n)`.
///
/// The generator is created lazily on first use, and its advanced state is
/// stored back into the thread-local so the next call continues the sequence.
/// Presumably the result lies in `0..n` (verify against `FastRand::fastrand_n`).
///
/// Uses `CONTEXT.with`, so it panics if the thread-local has already been
/// destroyed.
#[cfg(any(feature = "macros", all(feature = "sync", feature = "rt")))]
pub(crate) fn thread_rng_n(n: u32) -> u32 {
CONTEXT.with(|ctx| {
// `Cell::get` copies the state out; the updated copy is written back below.
let mut rng = ctx.rng.get().unwrap_or_else(FastRand::new);
let ret = rng.fastrand_n(n);
ctx.rng.set(Some(rng));
ret
})
}
/// Runs `f` with a reference to this thread's cooperative-scheduling budget
/// cell and returns its result.
///
/// Returns `Err(AccessError)` instead of panicking when the thread-local is
/// no longer accessible.
pub(crate) fn budget<R>(f: impl FnOnce(&Cell<coop::Budget>) -> R) -> Result<R, AccessError> {
CONTEXT.try_with(|ctx| f(&ctx.budget))
}
cfg_rt! {
use crate::runtime::ThreadId;
/// Returns the `ThreadId` cached for the current thread, allocating one with
/// `ThreadId::next()` and caching it on the first call.
///
/// Returns `Err(AccessError)` when the thread-local is no longer accessible.
pub(crate) fn thread_id() -> Result<ThreadId, AccessError> {
CONTEXT.try_with(|ctx| {
match ctx.thread_id.get() {
Some(id) => id,
None => {
// First request on this thread: allocate an id and remember it.
let id = ThreadId::next();
ctx.thread_id.set(Some(id));
id
}
}
})
}
/// Stores `id` as the current task id for this thread and returns the value
/// that was stored before.
///
/// If the thread-local is no longer accessible nothing is stored and `None`
/// is returned.
pub(crate) fn set_current_task_id(id: Option<Id>) -> Option<Id> {
CONTEXT.try_with(|ctx| ctx.current_task_id.replace(id)).unwrap_or(None)
}
/// Returns the task id currently stored for this thread, or `None` if none is
/// set or the thread-local is no longer accessible.
pub(crate) fn current_task_id() -> Option<Id> {
CONTEXT.try_with(|ctx| ctx.current_task_id.get()).unwrap_or(None)
}
/// Hands `waker` to the active scheduler context's `defer`, or wakes it
/// right away when `with_scheduler` reports no scheduler context.
#[track_caller]
pub(crate) fn defer(waker: &Waker) {
with_scheduler(|maybe_scheduler| {
if let Some(scheduler) = maybe_scheduler {
scheduler.defer(waker);
} else {
// Called from outside of the runtime, immediately wake the
// task.
waker.wake_by_ref();
}
});
}
/// Runs `f` with `v` installed in this thread's `scheduler` slot (via
/// `Scoped::set`) and returns `f`'s result.
///
/// Uses `CONTEXT.with`, so it panics if the thread-local has already been
/// destroyed.
pub(super) fn set_scheduler<R>(v: &scheduler::Context, f: impl FnOnce() -> R) -> R {
CONTEXT.with(|c| c.scheduler.set(v, f))
}
/// Calls `f` with the scheduler context of the current thread, if any.
///
/// `f` receives `None` when the thread's `runtime` state is not
/// `EnterRuntime::Entered { .. }`, or when the thread-local is no longer
/// accessible. Otherwise the call is delegated to `Scoped::with`, which
/// supplies the `Option<&scheduler::Context>` argument.
#[track_caller]
pub(super) fn with_scheduler<R>(f: impl FnOnce(Option<&scheduler::Context>) -> R) -> R {
// `f` is `FnOnce` but is needed by two closures (the `try_with` body and the
// error fallback). Parking it in an `Option` lets whichever one runs take it.
let mut f = Some(f);
CONTEXT.try_with(|c| {
let f = f.take().unwrap();
if matches!(c.runtime.get(), EnterRuntime::Entered { .. }) {
c.scheduler.with(f)
} else {
f(None)
}
})
// If TLS access failed the closure above never ran, so `f` is still present.
.unwrap_or_else(|_| (f.take().unwrap())(None))
}
cfg_taskdump! {
/// Runs `f` with this thread's `trace::Context` and returns its result, or
/// `None` when the thread-local is no longer accessible.
///
/// SAFETY: Callers of this function must ensure that trace frames always
/// form a valid linked list.
pub(crate) unsafe fn with_trace<R>(f: impl FnOnce(&trace::Context) -> R) -> Option<R> {
CONTEXT.try_with(|c| f(&c.trace)).ok()
}
}
}
context.rs 定义的是 Tokio 的线程本地运行时上下文模型(TLS Context Model):每一个 OS 线程在首次访问 CONTEXT 时,都会懒加载创建一个独立的 Context 实例。

在多线程 Runtime 的构造过程中,builder 所在的主线程会通过 handle.enter() 将当前 Runtime 的 Handle 写入该线程的 CONTEXT.current,从而使主线程临时进入 Tokio Runtime 上下文。

worker 线程则是在各自启动并进入调度循环时,通过内部机制动态填充各自的 scheduler、thread_id、runtime 等字段。

二者并不是“谁给谁拷贝配置”,而是通过不同路径填充各自的 TLS Context。以下是主要的属性与解释:
thread_id
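thread_id: Cell<Option<ThreadId>>
作用:给当前 OS 线程分配一个 Tokio 内部唯一的线程 ID。它是懒分配的:第一次调用 thread_id() 时才通过 ThreadId::next() 生成并缓存。
用于:
- 判断“当前线程是否是某个资源的所属线程”,例如前面 current_thread::Handle 里的 local_tid(LocalRuntime 的所属线程)
- trace(待核实)
- taskdump(待核实)
- scheduler 统计(待核实)
这是 Tokio 的“线程身份证”。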
current: current::HandleCell
当前线程所“绑定”的 Runtime Handle
也就是说:
tokio::spawn、Handle::current、Sleep::new 本质上都是:从这里读出当前 runtime 的 Handle。这正是 enter() 写入的地方。
scheduler: Scoped<scheduler::Context>
这个是:“当前线程正在驱动的调度器内部上下文”。注意这个关键词:Scoped。从 set_scheduler 的用法(c.scheduler.set(v, f))可以看出,它是在执行闭包 f 的期间把 v 设为当前值。它的作用是:
- 支持嵌套 runtime
- 支持如下的嵌套结构:
  外层 runtime
  └─ 内层 runtime::block_on(...)
之前担心的问题:
“嵌套 enter 会不会乱套?”
答案就在这:
就是靠 Scoped 做“栈式隔离”。
current_task_id
current_task_id: Cell<Option<Id>>
这是:“现在这个线程正在 poll 的 Task ID”。主要用于:
- tracing
- taskdump
- metrics
- debugging
它和调度逻辑本身关系不大,属于“可观测性设施”。
runtime: Cell<EnterRuntime>
runtime: Cell<EnterRuntime>
作用是:标记当前线程是否正处于 runtime 驱动状态。也就是说:
- 防止在 runtime 里“非法递归进入”
- 防止 Scheduler 上下文错乱
- 配合 with_scheduler 做安全判断(只有处于 Entered 状态时才会去读取 scheduler)
之前讨论的“为什么 block_on 不能乱嵌套?”,核心开关就在这里。
rng: Cell<Option<FastRand>>
这是:当前线程的快速随机源,服务于:
- 调度扰动
- 采样
- backoff
对应之前读到的 seed_generator 和 FastRand,这里是它们在 TLS 侧的具体落点。
budget: Cell<coop::Budget>
这是:Tokio 的“协作式让出(cooperative scheduling)”核心控制器,负责:
- 限制某个 task 连续执行多少步
- 防止单个 Future 长时间霸占线程
这对应调度理论里的:调度公平性 / 防止饥饿(starvation)。
park.rs
这个文件应该要结合driver.rs阅读