如何在 Rust 中实现 HTTP 长轮询

polarisxu

共 4543字,需浏览 10分钟

 ·

2021-10-11 16:17

实时通信顾名思义就是尽可能快地传播新数据。在讨论实时数据时,有两种工作负载:

  • 低延迟、多向流:Websockets。
  • 中等延迟、单向流:短轮询、服务器发送事件 (SSE) 和长轮询。

今天,我们将研究后者,因为它是你在开发 Web 应用程序时最常遇到的工作负载。

短轮询

短轮询

实时通信的第一种方法是短轮询。

在这种情况下,客户端向服务器发送请求,服务器立即回复。如果没有新数据,则响应为空。而大多数时候,情况就是这样的。所以大多数时候,服务器的响应是空的,这本来是可以避免的。

因此,短轮询在网络传输和 CPU 方面都是浪费的,因为每次都需要解析和编码请求。

唯一的优点就是简单。

服务器发送事件 (SSE)

SSE

与 WebSockets 相反,SSE 流是单向的:只有服务器可以将数据发送回客户端。此外,自动重新连接的机制(通常)内置于客户端。

缺点是实现服务器端并不容易。

长轮询

长轮询

最后是长轮询:客户端发出请求,并指示它拥有的最后一条数据,服务器仅在有新数据可用或达到一定时间时才将响应发送回去。

它的优点是实现起来极其简单,因为它不是一个流,而是一个简单的请求—响应方案,因此非常健壮,不需要复杂的自动重连算法,并且可以优雅地处理网络错误。此外,与短轮询相反,长轮询在资源使用方面的浪费较少。

唯一的缺点是,长轮询的延迟不如 WebSockets 好,但在大多数情况下并不重要。

长轮询在 Rust 中非常有效:多亏有 async,使得每个打开的连接使用的资源(一个简单的任务)很少,而许多语言使用整个操作系统线程。

最后,由于长轮询是简单的 HTTP 请求,因此这种技术更有可能不被某种激进的防火墙或网络设备阻止。

Rust 中的长轮询

我们将使用由tokio 团队[1]开发的新 Web 框架:axum[2]。它的性能和简单性在 Rust 界中是无与伦比的。另外,请注意,将此代码移植到另一个 Web 框架很容易。

我们将实现一个简单的聊天服务器,因为聊天是从长轮询中获益最多的教科书应用程序。

有 3 个技巧可以使这个实现起来更高效,可以关注一下。

聊天服务

聊天服务是一个封装了我们所有业务逻辑的对象。为了使示例简单,我们将只进行数据库调用。

这是我们的第一个技巧:为了启用消息排序,我们不使用 UUIDv4。相反,我们使用转换为 UUID 的 ULID[3],因此序列化/反序列化它没有问题:Uuid = Ulid::new().into()

chat.rs

impl ChatService {
    pub fn new(db: DB) -> Self {
        ChatService { db }
    }

    pub async fn create_message(&self, body: String) -> Result {
        if body.len() > 10_000 {
            return Err(Error::InvalidArgument("Message is too large".to_string()));
        }

        let created_at = chrono::Utc::now();
        let id: Uuid = Ulid::new().into();

        let query = "INSERT INTO messages
            (id, created_at, body)
            VALUES ($1, $2, $3)"
;

        sqlx::query(query)
            .bind(id)
            .bind(created_at)
            .bind(&body)
            .execute(&self.db)
            .await?;

        Ok(Message {
            id,
            created_at,
            body,
        })
    }
}

这是我们的第二个技巧:注意 after.unwrap_or(Uuid::nil()) 返回 “0” UUID ( 00000000-0000-0000-0000-000000000000)  。有了 WHERE id > $1 它,我们就可以返回所有消息(如果 afternone)。

例如,恢复客户端的整个状态是很有用的。

    pub async fn find_messages(&self, after: Option) -> Result<Vec, Error> {
        let query = "SELECT *
            FROM messages
            WHERE id > $1"
;

        let messages: Vec = sqlx::query_as::<_, Message>(query)
            .bind(after.unwrap_or(Uuid::nil()))
            .fetch_all(&self.db)
            .await?;

        Ok(messages)
    }
}

网络服务器

接下来,运行 Web 服务器的样板。

由于 .layer(AddExtensionLayer::new(ctx))ServerContext 被注入到所有路由中,因此我们可以调用 ChatService 的方法。

struct ServerContext {
    chat_service: chat::ChatService,
}

#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
    std::env::set_var("RUST_LOG""rust_long_polling=info");
    env_logger::init();

    let database_url = std::env::var("DATABASE_URL")
        .map_err(|_| Error::BadConfig("DATABASE_URL env var is missing".to_string()))?;

    let db = db::connect(&database_url).await?;
    db::migrate(&db).await?;

    let chat_service = chat::ChatService::new(db);
    let ctx = Arc::new(ServerContext::new(chat_service));

    let app = Router::new()
        .route(
            "/messages",
            get(handler_find_messages).post(handler_create_message),
        )
        .or(handler_404.into_service())
        .layer(AddExtensionLayer::new(ctx));

    log::info!("Starting server on 0.0.0.0:8080");
    axum::Server::bind(
        &"0.0.0.0:8080"
            .parse()
            .expect("parsing server's bind address"),
    )
    .serve(app.into_make_service())
    .await
    .expect("running server");

    Ok(())
}

长轮询

最后,我们的第三个技巧:长轮询是一个简单的循环 tokio::time::sleep

通过使用 tokio::time::sleep,活动连接在等待时几乎不会使用任何资源。

如果找到新数据,我们立即返回新数据。否则,我们再等一秒钟。

10 秒后,我们返回空数据。

main.rs

async fn handler_find_messages(
    Extension(ctx): Extension>,
    query_params: Query,
) -> ResultVec>, Error> {
    let sleep_for = Duration::from_secs(1);

    // long polling: 10 secs
    for _ in 0..10u64 {
        let messages = ctx.chat_service.find_messages(query_params.after).await?;
        if messages.len() != 0 {
            return Ok(messages.into());
        }

        tokio::time::sleep(sleep_for).await;
    }

    // return an empty response
    Ok(Vec::new().into())
}

代码在 GitHub 上

像往常一样,你可以在 GitHub 上找到代码:github.com/skerkour/kerkour.com[4]

参考资料

[1]

tokio 团队: https://github.com/tokio-rs

[2]

axum: https://github.com/tokio-rs/axum

[3]

ULID: https://github.com/ulid/spec

[4]

github.com/skerkour/kerkour.com: https://github.com/skerkour/kerkour.com/tree/main/2021/rust_long_polling




往期推荐


我是 polarisxu,北大硕士毕业,曾在 360 等知名互联网公司工作,10多年技术研发与架构经验!2012 年接触 Go 语言并创建了 Go 语言中文网!著有《Go语言编程之旅》、开源图书《Go语言标准库》等。


坚持输出技术(包括 Go、Rust 等技术)、职场心得和创业感悟!欢迎关注「polarisxu」一起成长!也欢迎加我微信好友交流:gopherstudio

浏览 94
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报