使用rust实现sock5代理

熟悉一门语言的第一步可能是用它写个代理吧。看了很久 rust 的语法,工作上派不上啥用场,只能自己制造需求了,希望以后能找份写 rust 的工作,或者可以自由学东西的工作。

这个代理有什么特点?

  • 基于 rust

主要目的是练习写 rust ,另外 rust 安全高效

  • 基于 tls websocket

过神秘的墙需要加密连接,否则容易毙掉。常见的做法类似 shadowsocks ,自定义加密协议,但是这种比较麻烦,偷懒直接使用 TLS websocket 来做这件事

  • 池化 websocket 连接

池化连接的好处不容多说,少了建立连接的时间,代理速度更快

代理大体流程是这样的:

rust-proxy-pipeline

代码地址:https://github.com/suikammd/rust-proxy

前置准备

(不是必须)

client端整体流程

proxy-client

代码设计

浏览器和 client 建立的连接是普通的 sock5 连接,server 和浏览器想要访问的目的地是普通的 Tcp 连接,但是 client 和 server 之间是池化的 WebSocket 连接。因为我们需要做的事情主要有 4 件:

(1)解析 sock5 协议,拿到浏览器想要访问的目的地址

(2)自定义数据包,同一个 WebSocket 连接会对应多个浏览器到 client 的 socks5 连接,因此在新的 sock5 连接建立的时候,需要让 server 知道浏览器想要访问的地址( Connect 包),在 sock5 连接断开的时候,需要发送结束包( Close 包),让 server 重新返回等待 Connect 包的状态

(3)池化 WebSocket 连接,涉及到如何实现一个通用的连接池

(4)tokio 提供了双向拷贝方法,需要数据结构实现 AsyncRead + AsyncWrite trait,由于 tokio_tungstenite 定义的 WebSocketStream 并没有实现这两个 trait,所以还需要实现下这两个 trait

解析 socks5 协议

  • 浏览器与 client 确认协议版本及认证方式
VER NMETHODS METHODS
1 1 1-255

VER:SOCKS 版本,这里是 0x05

NMETHODS:METHODS 的数量

METHODS:客户端支持的认证方式

  • client 会从浏览器指定的认证方法中选择一个方法回复
VER METHOD
1 1

METHOD:client 选择的认证方式

目前本文采用了 0x00(即不需要认证)

  • 浏览器发送请求消息,告诉 client 其希望访问的地址
VER CMD RSV ATYP DST.ADDR DST.PORT
1 1 0x00 1 动态 2

CMD:SOCKS 命令码,0x01( Connect 请求)、0x02( BIND 请求)、0x03( UDP 转发)

RSV:保留字符 0x00

ATYP:DST.ADDR类型,0x01(IPv4)、0x03(域名)、0x04(IPv6)

DST.ADDR:目的地址

DST.PORT:目的端口

  • client 在请求完浏览器想要访问的地址后,会将结果返回给浏览器
VER REP RSV ATYP BND.ADDR BND.PORT
1 1 0x00 1 动态 2

内容几乎同浏览器的请求包

自定义包结构

从 client 到 server 的数据包有三种类型,Connect 包(告诉 server 浏览器想要访问的目的地址)、Data 包(浏览器请求数据)、Close 包( socks5 连接断开/结束),如下所示。

1
2
3
4
5
6
7
8
9
10
11
pub enum Packet {
Connect(Addr),
Data(Vec<u8>),
Close(),
}

pub enum Addr {
IpV4(([u8; 4], u16)),
Domain((String, u16)),
IpV6(([u8; 16], u16)),
}

顺便一提,本仓库是基于 tokio_tungstenite 这个 websocket 库实现的。这个库在设计上有很多不合理的地方,比如没有办法做 buffer 复用,可以看以下代码,当我构造一个 Binary 包的时候,我需要传 Vec 进去,即使传别的包也会通过 bin.into() 转换成 Vec,相当于又做了次拷贝,不如直接传 Vec 进去。如果之后优化代理,会考虑重写这个 websocket 库

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
pub enum Message {
/// A text WebSocket message
Text(String),
/// A binary WebSocket message
Binary(Vec<u8>),
/// A ping message with the specified payload
///
/// The payload here must have a length less than 125 bytes
Ping(Vec<u8>),
/// A pong message with the specified payload
///
/// The payload here must have a length less than 125 bytes
Pong(Vec<u8>),
/// A close message with the optional close frame.
Close(Option<CloseFrame<'static>>),
}

impl Message {
/// Create a new binary WebSocket message by converting to Vec<u8>.
pub fn binary<B>(bin: B) -> Message
where
B: Into<Vec<u8>>,
{
Message::Binary(bin.into())
}
}

连接池实现

顾名思义,连接池就是池子里放着一大堆已经建立好的连接,在代理这个场景下(对端固定)连接池的作用毋庸置疑。那么我们这个连接池需要实现哪些功能呢?一言以蔽之,就是从池子里拿连接,将连接放回池子。但是为了让连接池足够快速,在从池子里拿连接的时候可以做这样的优化:如果池子里没有 idle 的连接,可以在池子里放一个 waiter(这里其实是 channel 的发送端),如果有人用完了连接,可以直接把连接给塞给这个 waiter,同时也去创建新的连接,然后通过 future::select 同时等待 waiter 拿到数据或者新连接创建完。

这时候我们可以总结出来 pool 的定义,可以是这样的:

1
2
3
4
5
6
7
8
9
pub struct Pool<T> {
inner: Arc<Mutex<Inner<T>>>,
}

pub struct Inner<T> {
idle: Vec<T>,
waiters: VecDeque<oneshot::Sender<T>>,
max_idle: usize,
}

idle 是空闲连接,waiters 是向池子拿连接时放入的 waiter ,max_idle 是最大空闲连接数

(1)这个pool需要做哪些事情

  • 管理空闲连接:其实就是一个vec

  • 从pool里拿出来一个连接

分为两种情况

(1)idle 有空闲连接,直接 pop idle 里的空闲连接

(2)向 waiters 丢一个新的 waiter,同时通过 get 方法传进来的 make_service 创建一个新的连接。这时候使用 future::select 同时等待有人将用完的连接丢给 waiter ,或者新的连接先创建完。这里还做了个比较有趣的优化,如果有人先将用完的连接丢给 waiter ,新的连接创建的一半岂不是很浪费,没关系,这里我们可以通过 tokio::spwan 出继续完成新连接的创建,再将新创建的连接放到 idle 里

返回的数据结构是这样定义的,一个是连接本身,一个是指向 pool 的弱引用。为什么 inner 需要时 Option 呢?这是因为这样可以比较方便的将连接 take 出来,也不至于触发 Pooled 的析构,用完了可以再 insert 进去。那为什么需要持有 pool 的弱引用呢?因为需要知道放回连接到哪个池子里

1
2
3
4
pub struct Pooled<T> {
pub inner: Option<T>,
pool: Weak<Mutex<Inner<T>>>,
}

看到这里,你可能就会想到,连接是如何放回池子里的了

  • 将连接放回pool

当连接用完的时候,我们并不会关闭连接,而是通过它的析构函数,将连接放回池子里。所以我们可以为 Pooled 实现 Drop trait ,在 drop 里调用 pool 的 put 方法真正将连接放回池子里

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
impl<T> Drop for Pooled<T> {
fn drop(&mut self) {
if self.inner.is_none() {
info!("inner drop is not some?");
return;
}
let t = self.inner.take().unwrap();
if let Some(pool) = self.pool.upgrade() {
if let Ok(mut pool) = pool.lock() {
pool.put(t);
info!("successfully put ws stream back to pool")
}
}
}
}

从拿连接的逻辑我们就知道将连接放回池子应该做哪些事情了。先看有没有 waiter ,如果有 waiter ,直接将连接丢给 waiter,如果没有 waiter ,将连接放回 idle

  • 感知连接的状态

如果pool中的连接自己挂了,没有及时感知到,client拿到了挂的连接,那用户使用体验会很糟糕呀。因此还需要周期性的检查pool中连接的状态,所以这里我们需要定义一个Poolable,如果不是reuseable的,就从pool中移除掉这个连接

1
2
3
4
pub trait Poolable {
// check if the connection is opened
fn reuseable(&self) -> bool;
}

当然其实这个是比较通用的实现,在本代码中并没能做这件事情。因为底层无法感知连接的状态,还是需要上层主动探活,这个逻辑还没实现。

为 WebSocketStream 实现 AsyncRead + AsyncWrite

tokio_tungstenite 设计上有很多让我感到奇怪的地方,在 client 端,不论你想建立的是 TLS 连接还是非 TLS 连接,拿到的都是 WebSocketStream<MaybeTlsStream<TcpStream>> ,server 拿到的都是 WebSocketStream<TlsStream<TcpStream>>,不过还好 MaybeTlsStream 和 TlsStream 都实现了 AysncRead 和 AsyncWrite,而不是分别为 client、server 的连接分别实现一次

对于 AsyncRead ,我们需要实现 poll_read 方法

1
2
3
4
5
6
7
pub trait AsyncRead {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>>;
}

对于 AsyncWrite ,我们需要实现 poll_write poll_flush poll_shutdown

1
2
3
4
5
6
7
8
9
10
11
pub trait AsyncWrite {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, io::Error>>;

fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>>;

fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>>;
}

有两点需要注意,在 copy_bidirectional 完成的时候,会执行 poll_shutdown ,由于我们需要复用底层连接,所以我们不应该真的关闭连接,而是发送 Close 包(告诉对端这次承载的 socks5 连接结束了,可以接受新的 socks 连接了),返回 Poll::Ready(Ok(()))。令一点是 poll_write 返回的 usize 长度不应该超过 buf 的长度(具体原因见另一篇文档 记一次有趣的查bug之旅

总结

使用 rust 写一个能够 work 的代理还是很容易的,但是让它足够鲁棒还是很难的,有很多细节需要考虑,特别是在连接异常情况下的错误处理。接下来还想做的事情是避免连接池中存在死的连接,一个是需要主动探活连接,一个是需要做 client 端重试(底层连接异常,触发上层有限次数重试)

Author: suikammd
Link: https://www.suikammd.com/2021/09/11/rust-proxy/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.