实现一个简单的 channel & 介绍下 tokio channel 的实现

Golang 并发编程的核心数据结构就是 channel 了,核心观点是通过通信来共享内存,而不是通过共享内存来通信。channel 确实好用,避免了直接写各种复杂的通信原语,在 rust 中当然也有相应的实现,tokio channel 的实现非常巧妙,很值得学习。本文目的是简单介绍下 channel 的基本概念,然后实现一个最简版 mpsc unbuffered channel ,最后再介绍下 tokio 里 channel 的实现

最简版 rust channel 实现

这一部分实现一个最简版 channel ,这里的 channel 是一个有着无限 buffer 的 channel ,允许多个写入者,并且写入 channel 的操作永远不会阻塞,但是如果 channel 是空的,从 channel 消费数据的消费者会被阻塞,这里其实就是实现一个 mpsc unbuffered channel。本部分参考了 陈天 · Rust编程第一课 35 讲,陈天这一课讲得非常通俗易懂,以 TDD 的模式,从接口测试样例开始,逐步完成整个 channel 的设计。这里同样也这么做,采用 TDD 模式,并且对齐标准库的 mspc 的接口。

channel 功能及实现

我们预期实现这些功能

  1. sender 产生数据, receiver 消费数据
  2. 允许多个 sender
  3. channel 为空
  4. sender 全部退出
  5. receiver 退出

sender 产生数据, receiver 消费数据

同样采用 TDD 的模式,先考虑我们需要有哪些功能,假定功能已经实现了,编写相应的 test

1
2
3
4
5
6
#[test]
fn test_chanel_basic() {
let (mut s, mut r) = unbounded();
s.send(1).unwrap();
assert_eq!(r.recv().unwrap(), 1);
}

通过 unbounded 创建 channel ,拿到相应的 sender 和 receiver ,sender 通过 send 发送数据 ,receiver 通过 recv 接收数据。为什么需要 unwrap ?在 receiver 被 drop 或者 sender 全部被 drop 的时候,相应的发送和接受需要返回 Error

简单起见,我们可以直接用 VecDeque 实现,由于 sender 和 receiver 需要共同修改这个 buffer ,所以需要加锁;由于需要在不同 thread 之间共享,sender、receiver 需要通过 Arc 持有 Shared

1
2
3
4
5
6
7
8
9
10
11
struct Shared<T> {
inner: Mutex<VecDeque<T>>,
}

struct Sender<T> {
inner: Arc<Shared<T>>,
}

struct Receiver<T> {
inner: Arc<Shared<T>>,
}

允许多个 sender

通过 unbounded 只能拿到一个 sender ,如果支持多个 sender 就意味着 sender 需要实现 Clone

1
2
3
4
5
6
7
8
9
10
11
12
13
#[test]
fn test_clone() {
let (mut s, mut r) = unbounded();
let mut s1 = s.clone();
let mut s2 = s.clone();
s.send(1).unwrap();
s1.send(2).unwrap();
s2.send(3).unwrap();

assert_eq!(1, r.recv().unwrap());
assert_eq!(2, r.recv().unwrap());
assert_eq!(3, r.recv().unwrap());
}

channel 为空

channel 为空时需要阻塞消费者,那么消费者需要如何被唤醒呢?通过 Condvar !Condvar 可以阻塞当前线程,然后等待事件发生再唤醒当前线程,这个等待不会占用额外的 cpu,Condvar wait 参数时 MutexGuard ,在 wait 的时候会释放掉锁,在被 notify 或者 notify_all 的时候又会重新获取锁,使用者不用担心锁的问题,感觉挺有意思的,具体实现可以之后详细研究下

1
2
3
4
struct Shared<T> {
inner: Mutex<VecDeque<T>>,
available: Condvar,
}

那么 test 该如何写呢?我们可以让 receiver 不停消费,消费完一批数据之后,等待一段时间,sender 再发送一批数据,receiver 可以继续消费。因为 receiver 只有一个,我们为 receiver 实现 Iterator 会非常有用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#[test]
fn test_receiver_block() {
let (mut s, mut r) = unbounded();

thread::spawn(move || {
for (idx, i) in r.into_iter().enumerate() {
assert_eq!(idx, i)
}
});

for i in 0..100usize {
s.send(i).unwrap();
}

thread::sleep(Duration::from_millis(1));

for i in 100..200usize {
s.send(i).unwrap();
}
}

sender 全部退出

sender 全部退出的时候,receiver recv 需要报错,简单的做法是保存 sender 的数量。在新的 sender 被创建 / clone 出来的时候,数量+1 ,sender 被 drop 的时候数量 -1 ,因此我们需要为 sender 实现自定义 Drop 方法,并且在 sender 数量变为 0 的时候,通知正在等待的 receiver 退出。下面就是最终的结构了,虽然不够高效,但是对于普通的 thread 级别的共享 channel 而言已经是一个可以 work 的版本了

1
2
3
4
5
6
struct Shared<T> {
inner: Mutex<VecDeque<T>>,
available: Condvar,
sender_cnt: AtomicUsize,
receiver_cnt: AtomicUsize,
}

test 可以这么写

1
2
3
4
5
6
#[test]
fn test_no_send() {
let (s, r) = unbounded();
drop(s);
assert!(r.recv().is_err());
}

receiver 退出

同上,receiver 退出的时候,receiver 对应的 receiver_cnt 需要减 -1,sender send 的时候,如果发现 receiver 全部退出,也需要报错。test 如下

1
2
3
4
5
#[test]
fn test_no_receiver() {
let (s, _) = unbounded();
assert!(s.send().is_err());
}

其他更详细的内容可以参考陈天老师的极客时间课程。我们也可以延伸下,如果想做一个粗糙的 mpsc bounded channel,只需要简单再增加一个 condvar 就可以了,两个 condvar 一个表示 buffer 中有数据可读,一个表示有数据可写。不过通过 condvar 粒度太大了,我们可以通过信号量,sender 想要发送数据的时候需要拿到 permit 才能发,不然只能等着

tokio channel 实现介绍

上面介绍的只是一个 toy channel ,生产环境下对 channel 性能有着更高的要求

底层数据结构

tokio 里面的 channel 实现非常巧妙,channel 底层是由链表构成的,每个链表上的元素叫做 block ,block 可以存放 channel 的多个元素。当尝试读数据的时候,会从链表的头部开始读,当某个 block 被读完的时候,会被重置放到链表的尾部,可以节省内存分配的消耗;当尝试写数据的时候,会从链表的尾部开始写,当前 block 不足的时候会分配新的 block 或者使用被重置的 block。整个结构图如下所示,代码里有很多细节值得参考。

tokio-channel-implemantation

unbounded channel

tokio-channel-waker

写入数据

sender 向 channel 写数据无非就是操作底层数据结构,主要是找到当前可以写入的 block 的相应的位置,然后向该位置写入数据,数据写入之后,如果有等待读取数据的 receiver ,则会通过 waker 唤醒这个 receiver

  • sender 实现了 clone ,并且这些 sender 都可以被安全的传递到其他线程中,如果有并发发送数据的情况,是否需要加锁呢?

    答案是不用的,每个 sender 知道自己要写入哪个位置就可以了,也就是通过 tail_position 这个 AtomiUsize 类型的变量来共享下一次写入位置,每次写入只要 fetch_add + 1 即可

    1
    2
    3
    4
    5
    6
    7
    8
    9
    /// List queue transmit handle.
    pub(crate) struct Tx<T> {
    /// Tail in the `Block` mpmc list.
    block_tail: AtomicPtr<Block<T>>,

    /// Position to push the next message. This references a block and offset
    /// into the block.
    tail_position: AtomicUsize,
    }
  • 唤醒 receiver

    在 receiver 拿不到数据的时候,会注册 waker 到底层的 channel 。在有数据的可读的时候,通过这个 waker 唤醒注册它的 receiver

    1
    2
    3
    4
    pub(crate) struct AtomicWaker {
    state: AtomicUsize,
    waker: UnsafeCell<Option<Waker>>,
    }

读取数据

receiver 尝试从 channel 中读数据,如果读得到就会退出,读不到就会把 waker 注册到 channel

bounded channel

与 unbounded channel 相比,写入数据可能也会被阻塞。这里我们需要关注,如何在可写时如何唤醒 sender ,在可读时如何唤醒 receiver,唤醒 receiver 的逻辑大致同 unbounded channel,因此先略过。sender 如何被阻塞呢?简单概括下是通过信号量,sender 想要发送数据的时候需要拿到一个“允许”发送的许可,不然只能被阻塞。与 unbounded channel 相比,bounded receiver 消费了数据之后,还要增加 permit ,唤醒 waitlist 里在等待写入数据的 sender 。这里只是大致了解了下实现,没有深究,以后有更多时间了会再补充进去。

总结

这篇文档简单介绍了基于 VecDeque 实现一个简单的 channel ,这个 channel 是 thread 级别的,有大粒度的锁,性能比较拉垮。tokio channel 的实现有很多值得借鉴的地方,代码还是很值得仔细品味的。

Author: suikammd
Link: https://www.suikammd.com/2021/11/20/rust-simple-channel/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.