实现一个简单的 rust executor

本文大部分内容参考 aysnc-book,介绍了如何实现一个简单的 executor,项目地址 simple-executor

Rust 异步编程涉及到三个比较重要的概念: Future 、 executor 和 reactor 。executor 在 poll Future 的时候,会传递给 Future 一个 waker ,之后由reactor 会等待 Future 涉及的 event ,在 event 完成的时候再通过这个 waker 唤醒 executor 来继续执行这个 Future。

Future

Rust 异步编程的核心是 Future trait。Future trait 有一个关联类型 Output ,它就是 Future 完成的时返回的结果。executor 会逐步 poll 这个 Future,直到 Future 完成

1
2
3
4
5
6
7
8
pub trait Future {
#[stable(feature = "futures_api", since = "1.36.0")]
type Output;

#[lang = "poll"]
#[stable(feature = "futures_api", since = "1.36.0")]
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

当提交 Future 给 executor 的时候,executor 会 poll 这个 Future,如果返回 Poll::Ready(Result),这个 Future 就结束了,否则返回 Poll::Pending 的话,就需要在 Future 下次可以 poll 的时候重新把 Future 提交给 executor,做这件事的其实就是通过调用 Waker 的 wake 方法

Waker 是什么

当一个 task 可以执行的时候,Waker 负责唤醒这个 task ,把这个 task 提交给 executor ,比如通过使用 Waker 的 wake 或者 wake_by_ref 方法,区别是 wake 方法会吃掉 waker 的所有权

实现一个 TimerFuture

定义 TimerFuture

下面实现一个简单的 TimerFuture 。简单起见,这里在初始化 Timer 的时候,就会 spwan 出去一个线程,sleep 指定时间之后,会重新将 Timer 提交给 executor,所以这时候需要一个状态来表示 sleep 是否结束,还需要存一个 Waker,用来在 sleep 结束后重新提交 Timer 。定义如下:

1
2
3
4
pub struct TimerFuture {
completed: bool,
waker: Option<Waker>,
}

由于我们需要在另一个线程修改 TimerFuture 的状态 completed,并且在完成的时候通过 waker 再提交 Future 给 executor,所以我们可以把这两个字段封装起来

1
2
3
4
5
6
7
8
struct Inner {
completed: bool,
waker: Option<Waker>,
}

pub struct TimerFuture {
inner: Inner
}

由于我们要跨线程共享和修改,所以还需要 Arc 和 Mutex 包一下

1
2
3
pub struct TimerFuture {
inner: Arc<Mutex<Inner>>
}

到此为止一个简单的 TimerFuture 结构体我们就定义出来了。接下来,就要为 TimerFuture 实现 Future trait 了

实现 Future trait

我们不需要 TimerFuture 产生什么结果,所以关联类型 Output 直接写成空元组 () 就可以了。poll 的时候判断下 completed 的状态,如果是 true ,就返回 Poll::Ready(()) ,否则返回 Pending ,同时将 context 的 waker 复制给 TimerFuture 的 waker,以便再 sleep 结束的时候可以通过 waker 唤醒本 task

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
impl Future for TimerFuture {
type Output = ();

fn poll(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
let mut shared_state = self.inner.shared_state.lock().unwrap();
if shared_state.completed {
Poll::Ready(())
} else {
shared_state.waker = Some(cx.waker().clone());
Poll::Pending
}
}
}

executor

Rust 的 Future 是 lazy 的,可以通过在 async 函数里 await Future 来驱动它的执行。 那么谁来执行 Future 呢?答案就是 Future executor 。executor 会在有进展(比如 socket 有数据可读、socket 可写)的时候 poll Future 。在最开始的时候 executor 会执行 poll 一次 Future ,之后在 Future 有进展的时候会通过 wake 方法重新提交 Future 给 executor。

这里我们首先可以抽象出来一个 spwaner 负责首次提交 Future,由于 Future 本身并不知道如何将自己提交给 executor ,所以我们可以将 Future 和与 executor 交互的方式封装在 Task 结构体里,我们不是直接提交 Future 而是提交 Task 。那么如何与 executor 交互呢?同样也是,简单起见,我们通过 channel 来做件事,spwaner 持有 channel 发送端的引用,每个 Task 同样也持有 channel 发送端的引用。

到这里 executor 做的事情就显而易见了,executor 持有 channel 的接收端,在一个大 loop 里不断消费从 channel 拿到的 Task 执行。

实现 Task

根据上文,我们可以简单的将 Future pin 在堆上拿到 BoxFuture,将 BoxFuture 的生命周期定义为 static ,Output 定义为空元组。另外由于 Future 可能需要在不同的 thread 上执行,所以需要实现 Send 和 Sync trait,我们可以通过 Mutex 包装下 Future ,通过 Mutex 获得 Sync 和 Send 的能力。那么我们可以定义 Task 如下:

1
2
3
4
struct Task {
future: Mutex<Option<BoxFuture<'static, ()>>>,
sync_sender: SyncSender<Task>, // 通过 sender 提交 Task 给 executor
}

为了能够 poll Future ,我们需要 waker ,当 wake 的时候 waker 可以告知 executor Future 可以再次执行了。我们预期 Task 在 wake 的时候可以再次提交自己给 executor ,因此我们可以为 Task 实现 ArcWake trait ,这样我们可以通过 wake_ref() 或者 into_waker()Arc<impl ArcWake> 转换成 Waker。具体实现如下,在 wake 的时候将 task 再次发送给 channel。

1
2
3
4
5
6
7
8
9
impl ArcWake for Task {
fn wake_by_ref(arc_self: &Arc<Self>) {
let cloned = arc_self.clone();
arc_self
.task_sender
.send(cloned)
.expect("too many tasks queued");
}
}

实现 Spawner

Spwaner 持有 channel 的发送端,内容是 Arc<Task>

1
2
3
struct Spwaner {
sync_sender: SyncSender<Arc<Task>>
}

为什么需要 Arc ?方便后面直接通过 waker_ref() 构造 Waker

1
2
3
4
5
6
pub fn waker_ref<W>(wake: &Arc<W>) -> WakerRef<'_>
where
W: ArcWake,
{
...
}

还需要为 Spawner 实现 spwan 方法,让它可以提交 Future

1
2
3
4
5
6
7
8
9
10
impl Spawner {
pub fn spawn(&self, future: impl Future<Output = ()> + 'static + Send) {
let future = future.boxed();
let task = Arc::new(Task {
future: Mutex::new(future),
sync_sender: self.sync_sender.clone(),
});
self.sync_sender.send(task).expect("too many tasks queued");
}
}

实现 Executor

Executor 持有 channel 的接收端

1
2
3
pub struct Executor {
receiver: Receiver<Arc<Task>>,
}

给 executor 实现 run 方法,拿到 task 的时候,首先把 future 从里面拿出来,构造 waker ,再构造 context ,接着 poll 这个 Future。回想下 TimerFuture 的实现,再 poll 的时候如果返回 pending ,我们会把 waker 传递给它,在它 sleep 结束通过这个 waker 唤醒 task

1
2
3
4
5
6
7
8
9
10
11
12
13
14
impl Executor {
pub fn run(&self) {
while let Ok(task) = self.receiver.recv() {
let mut future_slot = task.future.lock().unwrap();
if let Some(mut future) = future_slot.take() {
let waker = waker_ref(&task);
let cx = &mut Context::from_waker(&waker);
if future.as_mut().poll(cx).is_pending() {
*future_slot = Some(future);
}
}
}
}
}

让这个简单的 executor 跑起来吧

首先需要新建 Spwaner 和 Executor

1
2
3
4
5
pub fn new_executor_and_spawner() -> (Executor, Spawner) {
const MAX_QUEUED_TASKS: usize = 10_000;
let (sync_sender, receiver) = sync_channel(MAX_QUEUED_TASKS);
(Executor { receiver }, Spawner { sync_sender })
}

这样我们可以完成 main 方法了

1
2
3
4
5
6
7
8
9
10
11
12
13
fn main() {
let (executor, spawner) = new_executor_and_spawner();

spawner.spawn(async {
println!("{:?}", Instant::now());
TimerFuture::new(Duration::new(2, 0)).await;
println!("{:?}", Instant::now());
});

drop(spawner); // drop 的目的是为了让 executor 知道没有任务需要执行了

executor.run();
}

总结

实现一个简单的 executor 还是比较容易的,下一步计划基于 mio 写一个真的可以运行的 runtime ,国庆有事干了嘿嘿

Author: suikammd
Link: https://www.suikammd.com/2021/09/28/simple-executor/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.