记一次有趣的查bug之旅

tokio 库提供了 copy_bidirectional 方法,可以很方便地对于实现了 AsyncRead+AsyncWrite 的数据结构实现双向拷贝。在上一篇文章(还在持续修改)中,我基于 websocket 实现了一个简单代理,使用的 websocket 库是 tokio_tungstenite

原本在client端我自定义了类似双向拷贝的函数,而不是直接使用 copy_bidirectional ,这是由于 tokio_tungstenite 基于 TcpStream 构建的 WebSocketStream 并没有实现 AsyncRead+AsyncWrite 。另外,由于 tokio_tungstenite 返回的的 WebSocketStream 没有对外暴露好用的 read 和 write 方法,只是实现了 Sink 和 Stream,不过它对外提供了 split 方法,可以将 stream 的读写分别拆分开来,拆分出来的读无脑将读到的数据写到 TcpStream ,拆分出来的写无脑将从 TcpStream 读到的写到自己的 stream 。这样带来的问题是,在读写分离的情况下,不能够优雅的感知 Close 包,无法优雅的退出上述读写的流程。简单展示下原本的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// copy data from websocket to tcp
pub async fn server_read_from_websocket_to_tcp<T>(
mut tcp_stream: T,
websocket_stream: &mut SplitStream<WebSocketStream<TlsStream<TcpStream>>>,
) -> ProxyResult<()>
where
T: AsyncWrite + Unpin,
{
while let Some(msg) = websocket_stream.next().await {
match Packet::to_packet(msg?)? {
Packet::Connect(_) => {
return Err(ProxyError::Unknown(
"unexpected packet type `Connect`".into(),
))
}
Packet::Data(data) => tcp_stream.write_all(&data).await?,
Packet::Close() => return Ok(()),
}
}
Ok(())
}

// copy data from tcp to websocket
pub async fn server_read_from_tcp_to_websocket<T>(
mut tcp_stream: T,
websocket_sink: &mut SplitSink<WebSocketStream<TlsStream<TcpStream>>, Message>,
) -> ProxyResult<()>
where
T: AsyncRead + Unpin,
{
loop {
let mut buffer = vec![0; 1024];
let len = tcp_stream.read(&mut buffer).await?;
if len == 0 {
return Ok(());
}

unsafe {
buffer.set_len(len);
}
websocket_sink.send(Message::binary(buffer)).await?;
}
}

发现原本正常工作的 curl block 住了

于是我就想自己包一下这个 WebSocketStream 并为它实现 AsyncRead + AsyncWrite ,这样我就可以使用 copy_bidirectional 的能力来帮我处理一部分 corner case,比如连接异常断开之类的。总之,这样我就应该可以直接使用 copy_bidirectional了, inbound 是 TcpStream, outbound 是 WebSocketStream

1
let _ = copy_bidirectional(&mut inbound, &mut outbound).await

当我吭哧吭哧实现完,我的预期是立马就能跑起来(哈哈哈,蜜汁自信),我继续使用 curl 来测试代理

1
curl -vvv --socks5 127.0.0.1:8081 t.cn

本来预期返回 302 ,结果 curl 却 block 住了

1
2
3
4
5
6
7
8
9
*   Trying 127.0.0.1:8081...
* SOCKS5 connect to IPv4 47.95.48.149:80 (locally resolved)
* SOCKS5 request granted.
* Connected to 127.0.0.1 (127.0.0.1) port 8081 (#0)
> GET / HTTP/1.1
> Host: t.cn
> User-Agent: curl/7.78.0
> Accept: */*
>

难道是 AsyncRead + AsyncWrite 的实现有问题?简单过了下代码,也没有发现特别明显的问题,难道是库问题?当我产生这个想法后,开始了漫长的曲线救国之路

更具体的问题表现

代码的改动只在 client 端,但是以防万一还是尝试同时抓包下 client 端和 server 端是不是有问题。由于 client 和 server 建立的 websocket 连接的端口不好确定,所以这里直接考虑抓 server 收到和发出去的包

1
2
sudo tcpdump -i lo -X dst port 8080
sudo tcpdump -i lo -X src port 8080

发现 websocket 握手是没有问题的, server 也将从 t.cn 收到的 302 包发给了 client ,既然连接上有数据,那为什么 client 读不到呢?下面是 client AsyncRead 的实现(具体实现省略掉了),我这里在具体实现前加了行日志,这样当 poll_read 的时候就应该打印出来 poll.....

1
2
3
4
5
6
7
8
9
10
impl AsyncRead for WebSocketStreamConnection {
fn poll_read(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
println!("poll.....");
...
}
}

当我重新运行代码,并没有预期的日志被打印出来。这时候我交换了下在 copy_bidrectional 里的 inbound 和 outbound 的位置,重新运行代码,预期的日志打印出来了(这时候意识到可能是线程卡死了,比陷入无限循环中)。难道是 tokio 的实现有问题,更进一步的难道 tokio_tungstenite 的实现有问题?

也许是 tokio 的 copy_bidirectional 出问题了

(在这一步深入了 tokio copy_bidirectional 的细节。。因为想要分享出来这次经历,也看了看 tokio_tungstenite 的实现)

tokio copy_bidrectional

1
let _ = copy_bidirectional(&mut inbound, &mut outbound).await

copy_bidirectional 会构建一个 CopyBidirectional Future

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
struct CopyBidirectional<'a, A: ?Sized, B: ?Sized> {
a: &'a mut A,
b: &'a mut B,
a_to_b: TransferState,
b_to_a: TransferState,
}

enum TransferState {
Running(CopyBuffer),
ShuttingDown(u64),
Done(u64),
}

pub(super) struct CopyBuffer {
read_done: bool,
need_flush: bool,
pos: usize,
cap: usize,
amt: u64,
buf: Box<[u8]>,
}

在 poll 的具体实现里,会对 inbound 和 outbound 做等价的双向拷贝

1
2
let a_to_b = transfer_one_direction(cx, a_to_b, &mut *a, &mut *b)?;
let b_to_a = transfer_one_direction(cx, b_to_a, &mut *b, &mut *a)?;

transfer_one_direction 的实现里,通过一个大 loop ,实现了 TransferState 的状态转移,TransferState 会从 Running 转换到 ShuttingDown 最后到 Done。核心逻辑在 Running 这一步,这里通过 CopyBuffer 避免了在双向拷贝过程中多次初始化 buffer 的开销,在 CopyBufferpoll_copy 方法里,将数据从 reader 拷贝到 writer

1
let count = ready!(buf.poll_copy(cx, r.as_mut(), w.as_mut(), prefix.clone()))?;

poll_copy里 也是一个大 loop ,不停的 poll_read reader ,将读到的数据写到 writer ,同时修改 CopyBuffer 的状态,比如当读完的时候设置 read_done 为 true ,正常读完、数据写到 writer 的时候会修改 pos 和 cap ,pos 和 cap 之间的数据是预期要写到 writer 的数据

至此,copy_directional 的逻辑介绍的差不多了,看起来交换 copy_bidirectional 的参数两者应该完全等价呀(当然如果这时候我再细心一些。。就没有下面的文章了哈哈)。

那么是不是线程卡死在 poll_copy 了呢?

  • 第一步是想看线程卡死在从 inbound(TcpStream)到 outbound ( a -> b ),还是从 outbound 到 inbound ( b -> a )。于是在 transfer_one_direction 的签名里增加 prefix: String ,同时也要修改 buf poll_copy 的签名,增加 prefix: String
1
2
3
4
5
6
7
8
9
10
11
12
fn transfer_one_direction<A, B>(
cx: &mut Context<'_>,
state: &mut TransferState,
r: &mut A,
w: &mut B,
prefix: String,
) -> Poll<io::Result<u64>>
where
A: AsyncRead + AsyncWrite + Unpin + ?Sized,
B: AsyncRead + AsyncWrite + Unpin + ?Sized,
{
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
pub(super) fn poll_copy<R, W>(
&mut self,
cx: &mut Context<'_>,
mut reader: Pin<&mut R>,
mut writer: Pin<&mut W>,
prefix: String,
) -> Poll<io::Result<u64>>
where
R: AsyncRead + ?Sized,
W: AsyncWrite + ?Sized,
{
println!("out {} start copy lalalala", prefix);
loop {
// If our buffer is empty, then we need to read some data to
// continue.
println!("in {} start copy lalalala", prefix);
...
}
}

这一步需要 patch tokio 的代码,以便 debug 的时候可以方便修改 copy_bidirectional 的逻辑

1
2
[patch.crates-io]
tokio = { path="/home/suika/code/github/tokio/tokio" }

重新运行发现,一直在疯狂打印 a -> b,也就是卡死在了从 TcpStream 拷贝数据到 WebSocketStream 的逻辑里。

  • 又在 poll_copy 的 loop 里增加了很多日志
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
pub(super) fn poll_copy<R, W>(
&mut self,
cx: &mut Context<'_>,
mut reader: Pin<&mut R>,
mut writer: Pin<&mut W>,
prefix: String,
) -> Poll<io::Result<u64>>
where
R: AsyncRead + ?Sized,
W: AsyncWrite + ?Sized,
{
debug_assert!(true, "abc");
println!("out {} start copy lalalala", prefix);
loop {
// If our buffer is empty, then we need to read some data to
// continue.
println!("in {} start copy lalalala", prefix);
if self.pos == self.cap && !self.read_done {
...
}
println!("#2 {} start copy lalalala", prefix);

let n = buf.filled().len();
...
}

// If our buffer has some data, let's write it out!
while self.pos < self.cap {
println!("#3 {} start copy lalalala", prefix);
...
}
println!("#4 {} start copy lalalala, pos {} cap {} read_done {}", prefix, self.pos, self.cap, self.read_done);

// If we've written all the data and we've seen EOF, flush out the
// data and finish the transfer.
if self.pos == self.cap && self.read_done {
ready!(writer.as_mut().poll_flush(cx))?;
return Poll::Ready(Ok(self.amt));
}
}
}

重新运行,发现在疯狂输出 #4 这行日志,而且 pos 是 69 ,cap 是 68 !内心窃喜,看来是 tokio 的 bug ?可是我之前写的一个简单的 echo server 是可以正常工作的,那一定还是我的问题了。这时候仔细看了看下面的代码,突然想起来我在为 WebSocketStream 实现 poll_write 的时候,返回的长度是 buf.len() + 1(1是我这边自定义的包头长度),问题一目了然。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// If our buffer has some data, let's write it out!
while self.pos < self.cap {
println!("#3 {} start copy lalalala", prefix);
let me = &mut *self;
let i = ready!(writer.as_mut().poll_write(cx, &me.buf[me.pos..me.cap]))?;
if i == 0 {
return Poll::Ready(Err(io::Error::new(
io::ErrorKind::WriteZero,
"write zero byte into writer",
)));
} else {
self.pos += i;
self.amt += i as u64;
self.need_flush = true;
}
}
println!("#4 {} start copy lalalala, pos {} cap {} read_done {}", prefix, self.pos, self.cap, self.read_done);

当时就觉得很好笑=。=,不过查问题的过程还是很有意义的。这时候就会闪过某人讲的话,you can‘t master it if you don’t understand it

Author: suikammd
Link: https://www.suikammd.com/2021/09/22/record-a-weird-bug-fix-travel/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.