
tokio源码阅读3
tokio源码阅读3
scheduler
这个在tokio里面是一整个文件夹,受限于时间原因,目前只挑一些较为重要的部分进行阅读。 我们前面知道了scheduler/mod.rs是一个抽象封装库,所以这里就不赘述了。 当scheduler进来的时候,会根据前面的配置进入对应的Context,而Context具体的定义在worker里面;
multi_thread/worker.rs
pub(crate) struct Context {
/// Worker
worker: Arc<Worker>,
/// Core data
core: RefCell<Option<Box<Core>>>,
/// Tasks to wake after resource drivers are polled. This is mostly to
/// handle yielded tasks.
pub(crate) defer: Defer,
}这里是scheduler的基本构成,我们接着下去看:
pub(super) struct Worker {
/// Reference to scheduler's handle
handle: Arc<Handle>,
/// Index holding this worker's remote state
index: usize,
/// Used to hand-off a worker's core to another thread.
core: AtomicCell<Core>,
}大致的执行流程:
while alive:
poll_driver_if_needed
if local_task:
poll
else if stolen_task:
poll
else:
parktask/harness.rs
Worker 拿到 Notified(Task):
Harness.poll():
match poll_inner():
Notified → scheduler.yield_now(task_again)
Complete → complete() // 写 output, 唤醒 join, release
Dealloc → dealloc() // refcount 归零
Done → 什么都不做
poll_inner():
if state.transition_to_running() 失败:
// 要么 cancelled,要么需要 dealloc,要么别人正在跑
else:
res = poll_future(core, cx_with_task_waker)
if res == Ready:
return Complete
// Pending:
if transition_to_idle() == Cancelled:
cancel_task(core)
return {Done | Notified | Dealloc}task/raw.rs
L Scheduler:
拿到 Notified(Task)
M Worker poll Task:
→ Task.run() / RawTask::poll()
→ vtable.poll(ptr)
→ Harness::<T,S>::poll()
→ poll_future() → Future::poll()handle.rs
Runtime::new(builder)
→ Driver::new(Cfg { enable_io, enable_time, ... })
→ create_io_stack(...)
→ io::Driver (epoll/kqueue)
→ signal::Driver
→ process::Driver
→ create_clock(...)
→ create_time_driver(...)
→ time::Driver { timers + clock + IoStack }time/wheel/mod.rs
Wheel:
插入 timer(TimerHandle):
when = item.when()
if when <= elapsed:
return Err(Elapsed) // 调用者立即 fire
level = level_for(elapsed, when)
levels[level].add(item)
poll(now_ms):
loop:
if pending 队列非空:
return 一条 pending
next = next_expiration()
if next.deadline <= now_ms:
// 这一轮要处理
process_expiration(next)
elapsed = next.deadline
else:
elapsed = now_ms
break
// 可能 process_expiration 中塞了新 pending
return pending.pop_back()完整的流程图:
总结
所有 async 函数在编译期都会变成实现了 Future 的状态机。整个程序就是一个大 Future 里套很多小 Future,.await 就是在当前 Future 的 poll 里去 poll 子 Future。
Tokio 这边会把最外层的 Future 封装成一个 Task 丢进调度器。每个 worker 在自己的循环里从队列拿任务,然后对它调用 Future::poll。如果在 poll 过程中需要等待 IO 或定时器,Future 会用 cx.waker() 把自己的 waker 注册到对应的 driver 里,并返回 Poll::Pending,worker 就去处理其他任务或进入 park。
当 IO/Timer 真正就绪时,driver 会通过之前存的 waker 调用 wake(),把对应任务重新丢回调度队列。某个 worker 被 unpark,拿到这个任务,再次 poll 它,从上次 .await 停下的位置继续执行。如此往复,直到 Future 返回 Ready,Tokio 负责写出结果、唤醒 JoinHandle,并在引用计数清零后释放这整个 Task。 如果暂时所有队列都空了,worker 通过 Driver::park 进入休眠,等下一次有任务或 IO/Timer 事件再被唤醒。