tokio 库提供了 copy_bidirectional 方法,可以很方便地对于实现了 AsyncRead+AsyncWrite 的数据结构实现双向拷贝。在上一篇文章(还在持续修改)中,我基于 websocket 实现了一个简单代理,使用的 websocket 库是 tokio_tungstenite 。
原本在client端我自定义了类似双向拷贝的函数,而不是直接使用 copy_bidirectional ,这是由于 tokio_tungstenite 基于 TcpStream 构建的 WebSocketStream 并没有实现 AsyncRead+AsyncWrite 。另外,由于 tokio_tungstenite 返回的的 WebSocketStream 没有对外暴露好用的 read 和 write 方法,只是实现了 Sink 和 Stream,不过它对外提供了 split 方法,可以将 stream 的读写分别拆分开来,拆分出来的读无脑将读到的数据写到 TcpStream ,拆分出来的写无脑将从 TcpStream 读到的写到自己的 stream 。这样带来的问题是,在读写分离的情况下,不能够优雅的感知 Close 包,无法优雅的退出上述读写的流程。简单展示下原本的实现:
1 | // copy data from websocket to tcp |
发现原本正常工作的 curl block 住了
于是我就想自己包一下这个 WebSocketStream 并为它实现 AsyncRead + AsyncWrite ,这样我就可以使用 copy_bidirectional 的能力来帮我处理一部分 corner case,比如连接异常断开之类的。总之,这样我就应该可以直接使用 copy_bidirectional了, inbound 是 TcpStream, outbound 是 WebSocketStream
1 | let _ = copy_bidirectional(&mut inbound, &mut outbound).await |
当我吭哧吭哧实现完,我的预期是立马就能跑起来(哈哈哈,蜜汁自信),我继续使用 curl 来测试代理
1 | curl -vvv --socks5 127.0.0.1:8081 t.cn |
本来预期返回 302 ,结果 curl 却 block 住了
1 | * Trying 127.0.0.1:8081... |
难道是 AsyncRead + AsyncWrite 的实现有问题?简单过了下代码,也没有发现特别明显的问题,难道是库问题?当我产生这个想法后,开始了漫长的曲线救国之路
更具体的问题表现
代码的改动只在 client 端,但是以防万一还是尝试同时抓包下 client 端和 server 端是不是有问题。由于 client 和 server 建立的 websocket 连接的端口不好确定,所以这里直接考虑抓 server 收到和发出去的包
1 | sudo tcpdump -i lo -X dst port 8080 |
发现 websocket 握手是没有问题的, server 也将从 t.cn 收到的 302 包发给了 client ,既然连接上有数据,那为什么 client 读不到呢?下面是 client AsyncRead 的实现(具体实现省略掉了),我这里在具体实现前加了行日志,这样当 poll_read 的时候就应该打印出来 poll.....
1 | impl AsyncRead for WebSocketStreamConnection { |
当我重新运行代码,并没有预期的日志被打印出来。这时候我交换了下在 copy_bidrectional 里的 inbound 和 outbound 的位置,重新运行代码,预期的日志打印出来了(这时候意识到可能是线程卡死了,比陷入无限循环中)。难道是 tokio 的实现有问题,更进一步的难道 tokio_tungstenite 的实现有问题?
也许是 tokio 的 copy_bidirectional 出问题了
(在这一步深入了 tokio copy_bidirectional 的细节。。因为想要分享出来这次经历,也看了看 tokio_tungstenite 的实现)
tokio copy_bidrectional
1 | let _ = copy_bidirectional(&mut inbound, &mut outbound).await |
copy_bidirectional 会构建一个 CopyBidirectional Future
1 | struct CopyBidirectional<'a, A: ?Sized, B: ?Sized> { |
在 poll 的具体实现里,会对 inbound 和 outbound 做等价的双向拷贝
1 | let a_to_b = transfer_one_direction(cx, a_to_b, &mut *a, &mut *b)?; |
在 transfer_one_direction 的实现里,通过一个大 loop ,实现了 TransferState 的状态转移,TransferState 会从 Running 转换到 ShuttingDown 最后到 Done。核心逻辑在 Running 这一步,这里通过 CopyBuffer 避免了在双向拷贝过程中多次初始化 buffer 的开销,在 CopyBuffer 的 poll_copy 方法里,将数据从 reader 拷贝到 writer
1 | let count = ready!(buf.poll_copy(cx, r.as_mut(), w.as_mut(), prefix.clone()))?; |
在 poll_copy里 也是一个大 loop ,不停的 poll_read reader ,将读到的数据写到 writer ,同时修改 CopyBuffer 的状态,比如当读完的时候设置 read_done 为 true ,正常读完、数据写到 writer 的时候会修改 pos 和 cap ,pos 和 cap 之间的数据是预期要写到 writer 的数据
至此,copy_directional 的逻辑介绍的差不多了,看起来交换 copy_bidirectional 的参数两者应该完全等价呀(当然如果这时候我再细心一些。。就没有下面的文章了哈哈)。
那么是不是线程卡死在 poll_copy 了呢?
- 第一步是想看线程卡死在从 inbound(TcpStream)到 outbound ( a -> b ),还是从 outbound 到 inbound ( b -> a )。于是在
transfer_one_direction的签名里增加prefix: String,同时也要修改 bufpoll_copy的签名,增加prefix: String
1 | fn transfer_one_direction<A, B>( |
1 | pub(super) fn poll_copy<R, W>( |
这一步需要 patch tokio 的代码,以便 debug 的时候可以方便修改 copy_bidirectional 的逻辑
1 | [patch.crates-io] |
重新运行发现,一直在疯狂打印 a -> b,也就是卡死在了从 TcpStream 拷贝数据到 WebSocketStream 的逻辑里。
- 又在
poll_copy的 loop 里增加了很多日志
1 | pub(super) fn poll_copy<R, W>( |
重新运行,发现在疯狂输出 #4 这行日志,而且 pos 是 69 ,cap 是 68 !内心窃喜,看来是 tokio 的 bug ?可是我之前写的一个简单的 echo server 是可以正常工作的,那一定还是我的问题了。这时候仔细看了看下面的代码,突然想起来我在为 WebSocketStream 实现 poll_write 的时候,返回的长度是 buf.len() + 1(1是我这边自定义的包头长度),问题一目了然。
1 | // If our buffer has some data, let's write it out! |
当时就觉得很好笑=。=,不过查问题的过程还是很有意义的。这时候就会闪过某人讲的话,you can‘t master it if you don’t understand it