如何在 Rust 中实现 HTTP 长轮询
实时通信顾名思义就是尽可能快地传播新数据。在讨论实时数据时,有两种工作负载:
低延迟、多向流:Websockets。 中等延迟、单向流:短轮询、服务器发送事件 (SSE) 和长轮询。
今天,我们将研究后者,因为它是你在开发 Web 应用程序时最常遇到的工作负载。
短轮询
实时通信的第一种方法是短轮询。
在这种情况下,客户端向服务器发送请求,服务器立即回复。如果没有新数据,则响应为空。而大多数时候,情况就是这样的。所以大多数时候,服务器的响应是空的,这本来是可以避免的。
因此,短轮询在网络传输和 CPU 方面都是浪费的,因为每次都需要解析和编码请求。
唯一的优点就是简单。
服务器发送事件 (SSE)
与 WebSockets 相反,SSE 流是单向的:只有服务器可以将数据发送回客户端。此外,自动重新连接的机制(通常)内置于客户端。
缺点是实现服务器端并不容易。
长轮询
最后是长轮询:客户端发出请求,并指示它拥有的最后一条数据,服务器仅在有新数据可用或达到一定时间时才将响应发送回去。
它的优点是实现起来极其简单,因为它不是一个流,而是一个简单的请求—响应方案,因此非常健壮,不需要复杂的自动重连算法,并且可以优雅地处理网络错误。此外,与短轮询相反,长轮询在资源使用方面的浪费较少。
唯一的缺点是,长轮询的延迟不如 WebSockets 好,但在大多数情况下并不重要。
长轮询在 Rust 中非常有效:多亏有 async
,使得每个打开的连接使用的资源(一个简单的任务)很少,而许多语言使用整个操作系统线程。
最后,由于长轮询是简单的 HTTP 请求,因此这种技术更有可能不被某种激进的防火墙或网络设备阻止。
Rust 中的长轮询
我们将使用由tokio 团队[1]开发的新 Web 框架:axum[2]。它的性能和简单性在 Rust 界中是无与伦比的。另外,请注意,将此代码移植到另一个 Web 框架很容易。
我们将实现一个简单的聊天服务器,因为聊天是从长轮询中获益最多的教科书应用程序。
有 3 个技巧可以使这个实现起来更高效,可以关注一下。
聊天服务
聊天服务是一个封装了我们所有业务逻辑的对象。为了使示例简单,我们将只进行数据库调用。
这是我们的第一个技巧:为了启用消息排序,我们不使用 UUIDv4
。相反,我们使用转换为 UUID 的 ULID[3],因此序列化/反序列化它没有问题:Uuid = Ulid::new().into()
chat.rs
impl ChatService {
pub fn new(db: DB) -> Self {
ChatService { db }
}
pub async fn create_message(&self, body: String) -> Result {
if body.len() > 10_000 {
return Err(Error::InvalidArgument("Message is too large".to_string()));
}
let created_at = chrono::Utc::now();
let id: Uuid = Ulid::new().into();
let query = "INSERT INTO messages
(id, created_at, body)
VALUES ($1, $2, $3)";
sqlx::query(query)
.bind(id)
.bind(created_at)
.bind(&body)
.execute(&self.db)
.await?;
Ok(Message {
id,
created_at,
body,
})
}
}
这是我们的第二个技巧:注意 after.unwrap_or(Uuid::nil())
返回 “0” UUID ( 00000000-0000-0000-0000-000000000000
) 。有了 WHERE id > $1
它,我们就可以返回所有消息(如果 after
是 none
)。
例如,恢复客户端的整个状态是很有用的。
pub async fn find_messages(&self, after: Option) -> Result<Vec, Error> {
let query = "SELECT *
FROM messages
WHERE id > $1";
let messages: Vec = sqlx::query_as::<_, Message>(query)
.bind(after.unwrap_or(Uuid::nil()))
.fetch_all(&self.db)
.await?;
Ok(messages)
}
}
网络服务器
接下来,运行 Web 服务器的样板。
由于 .layer(AddExtensionLayer::new(ctx))
,ServerContext
被注入到所有路由中,因此我们可以调用 ChatService
的方法。
struct ServerContext {
chat_service: chat::ChatService,
}
#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
std::env::set_var("RUST_LOG", "rust_long_polling=info");
env_logger::init();
let database_url = std::env::var("DATABASE_URL")
.map_err(|_| Error::BadConfig("DATABASE_URL env var is missing".to_string()))?;
let db = db::connect(&database_url).await?;
db::migrate(&db).await?;
let chat_service = chat::ChatService::new(db);
let ctx = Arc::new(ServerContext::new(chat_service));
let app = Router::new()
.route(
"/messages",
get(handler_find_messages).post(handler_create_message),
)
.or(handler_404.into_service())
.layer(AddExtensionLayer::new(ctx));
log::info!("Starting server on 0.0.0.0:8080");
axum::Server::bind(
&"0.0.0.0:8080"
.parse()
.expect("parsing server's bind address"),
)
.serve(app.into_make_service())
.await
.expect("running server");
Ok(())
}
长轮询
最后,我们的第三个技巧:长轮询是一个简单的循环 tokio::time::sleep
。
通过使用 tokio::time::sleep
,活动连接在等待时几乎不会使用任何资源。
如果找到新数据,我们立即返回新数据。否则,我们再等一秒钟。
10 秒后,我们返回空数据。
main.rs
async fn handler_find_messages(
Extension(ctx): Extension>,
query_params: Query,
) -> ResultVec>, Error> {
let sleep_for = Duration::from_secs(1);
// long polling: 10 secs
for _ in 0..10u64 {
let messages = ctx.chat_service.find_messages(query_params.after).await?;
if messages.len() != 0 {
return Ok(messages.into());
}
tokio::time::sleep(sleep_for).await;
}
// return an empty response
Ok(Vec::new().into())
}
代码在 GitHub 上
像往常一样,你可以在 GitHub 上找到代码:github.com/skerkour/kerkour.com[4]。
参考资料
tokio 团队: https://github.com/tokio-rs
[2]axum: https://github.com/tokio-rs/axum
[3]ULID: https://github.com/ulid/spec
[4]github.com/skerkour/kerkour.com: https://github.com/skerkour/kerkour.com/tree/main/2021/rust_long_polling
我是 polarisxu,北大硕士毕业,曾在 360 等知名互联网公司工作,10多年技术研发与架构经验!2012 年接触 Go 语言并创建了 Go 语言中文网!著有《Go语言编程之旅》、开源图书《Go语言标准库》等。
坚持输出技术(包括 Go、Rust 等技术)、职场心得和创业感悟!欢迎关注「polarisxu」一起成长!也欢迎加我微信好友交流:gopherstudio