Rust网络框架Pingora源码阅读1

共 23729字,需浏览 48分钟

 ·

2024-04-12 03:10

要想深入Pingora应该是需要阅读源代码的,所以分析一下源代码,虽然Pingora没有提供丰富的示例,但是提供了一些不错的文档,比如它的internals.md文档,提供了很多细节和示意图,本系列文章会引用很多其中的示意图,Pingora的源码分析应该会分为2篇文章或更多。

Pingora代码版本: v0.1.0

internals.md在https://github.com/cloudflare/pingora/blob/main/docs/user_guide/internals.md

一个例子

看源码总是需要一个入口的,所以可以根据下面的代码来观察一下Pingora的大致运行逻辑。

这个例子来自我自己的之前的文章:  https://youerning.top/post/pingora/tutorial-1

      
      // https://youerning.top
use async_trait::async_trait;
use log::info;
use pingora_core::services::background::background_service;
use std::{sync::Arc, time::Duration};
use structopt::StructOpt;

use pingora_core::server::configuration::Opt;
use pingora_core::server::Server;
use pingora_core::upstreams::peer::HttpPeer;
use pingora_core::{Result, Error, ErrorType};
use pingora_load_balancing::{health_check, selection::RoundRobin, LoadBalancer};
use pingora_proxy::{ProxyHttp, Session};

// 随便定义一个可以实现trait的struct对象
// 为了简单起见当然是要包含Pingora提供的负载均衡对象啦.
pub struct LB(Arc<LoadBalancer<RoundRobin>>);


#[async_trait]
impl ProxyHttp for LB {
    type CTX = ();
    fn new_ctx(&self) -> Self::CTX {}

    // ProxyHttp唯一必须要自己定义的方法, 也就是每次请求来会调用这个方法以选择后端(upstream)
    async fn upstream_peer(&self, _session: &mut Session, _ctx: &mut ()) -> Result<Box<HttpPeer>> {
        let upstream = match self
            .0
            // 这里使用的负载均衡算法是RoundRobin, 所以key不重要, key是用用于hash算法的
            // 后面做灰度发布会用到hash算法
            .select(b""256) { 
                Some(upstream) => upstream,
                None => {
                    // 因为本代码中创建了一个健康检查的服务,所以可能会出现没有后端的情况
                    return Err(Error::new(ErrorType::new("没有健康的后端可以选择.")))
                }
            };
            

        info!("选择的后端是: {:?}", upstream);
        let peer = Box::new(HttpPeer::new(upstream, false"".to_string()));
        Ok(peer)
    }

    // Pingora提供了很多的钩子函数,这个钩子函数用于修改或者说过滤客户端请求
    async fn upstream_request_filter(
        &self,
        _session: &mut Session,
        upstream_request: &mut pingora_http::RequestHeader,
        _ctx: &mut Self::CTX,
    ) -> Result<()> {
        // 将发送给后端的请求插入一个Host请求头
        upstream_request
            .insert_header("Host""youering.top")
            .unwrap();
        Ok(())
    }
}

fn main() {
    // 初始化日志服务,默认级别是Error及以上才会显示, 可以通过RUST_LOG=INFO来调整日志输出级别
    env_logger::init();
 
    // 1.
    // 读取命令行参数
    let opt = Opt::from_args();
    let mut my_server = Server::new(Some(opt)).unwrap();
    my_server.bootstrap();

    // 127.0.0.1:343 是一个不存在的服务, 这样就会在Pingora的输出看到一个错误,说127.0.0.1:343不健康
    // 不健康的服务就不会被选择了
    let mut upstreams =
        LoadBalancer::try_from_iter(["127.0.0.1:10081""127.0.0.1:10082""127.0.0.1:343"]).unwrap();

    // 这里创建了一个健康检查的服务,健康检查服务会保证不健康的服务不会被选择
    let hc = health_check::TcpHealthCheck::new();
    upstreams.set_health_check(hc);
    upstreams.health_check_frequency = Some(Duration::from_secs(1));
 
    // 2.1
    // 创建一个后台服务
    let background = background_service("健康检查", upstreams);
    let upstreams = background.task();
    
 // 2.2
    // 创建一个代理服务
    let mut lb = pingora_proxy::http_proxy_service(&my_server.configuration, LB(upstreams));
    lb.add_tcp("0.0.0.0:10080");
 
    // 3.
    my_server.add_service(lb);
    my_server.add_service(background);
    // 4.
    my_server.run_forever();
}

上面的大致逻辑如下:

  1. 创建一个Server对象
  2. 创建特定的Service对象,本文有http_proxy_servicebackground_service
  3. Service对象列表依次加入或者说注册到Server对象中
  4. 启动Server对象

从上面的描述可以得出,Pingora有两个比较重要的对象,即ServerService,事实也的确如此。下面是官方对于这两个对象的描述示意图。

      
                                     ┌───────────┐
                    ┌─────────>│  Service  │
                    │          └───────────┘
┌────────┐          │          ┌───────────┐
│ Server │──Spawns──┼─────────>│  Service  │
└────────┘          │          └───────────┘
                    │          ┌───────────┐
                    └─────────>│  Service  │
                               └───────────┘

Server

Server主要是用于处理信号(比如升级, 关闭),  读取配置文件, 错误处理,管理Service对象等。

new

创建一个Server对象需要一个Opt对象,它会解析命令行参数,而Server对象会根据这些参数来配置Server的各项配置,比较主要的是ServerConf配置文件的路径以及是否后台运行。

      
      pub fn new(opt: Option<Opt>) -> Result<Server> {
    let (tx, rx) = watch::channel(false);

    // 1.
    let conf = if let Some(opt) = opt.as_ref() {
        // 2.
        opt.conf.as_ref().map_or_else(
            || {
                // options, no conf, generated
                ServerConf::new_with_opt_override(opt).ok_or_else(|| {
                    Error::explain(ErrorType::ReadError, "Conf generation failed")
                })
            },
            |_| {
                // options and conf loaded
                ServerConf::load_yaml_with_opt_override(opt)
            },
        )
    } else {
        ServerConf::new()
        .ok_or_else(|| Error::explain(ErrorType::ReadError, "Conf generation failed"))
    }?;

    // 3.
    Ok(Server {
        services: vec![],
        listen_fds: None,
        shutdown_watch: tx,
        shutdown_recv: rx,
        configuration: Arc::new(conf),
        options: opt,
        sentry: None,
    })
}

代码逻辑如下:

  1. 获取Opt对象, Opt对象是通过let opt = Opt::from_args();创建的,它会读取命令行参数
  2. 读取Opt对象的conf字段,也就是ServerConf配置文件的路径,如果不存在会创建一个空的Yaml文件, 命令行的daemon参数会覆盖ServerConfdaemon参数
  3. 返回一个Server对象

bootstrap

Server创建成功之后就可以尝试预启动,这个方法有两个作用。

  1. 测试服务是否可以正常运行,可以通过-t或者--test命令行参数启用,如果指定了这个参数,就会在这一步退出,什么都不做。
  2. 从旧的Server对象获取之前监听的句柄列表, 可以通过-u or --upgrade命令行参数启用,这也说明本次启动是为了升级服务。
      
      pub fn bootstrap(&mut self) {
    info!("Bootstrap starting");

    // 如果是测试就退出
    if self.options.as_ref().map_or(false, |o| o.test) {
        info!("Server Test passed, exiting");
        std::process::exit(0);
    }

    // 如果是upgrade会接替旧的句柄
    match self.load_fds(self.options.as_ref().map_or(false, |o| o.upgrade)) {
        Ok(_) => {
            info!("Bootstrap done");
        }
        Err(e) => {
            // 错误处理
        }
    }
}

add_service

这个方法比较简单,只要传入的参数符合impl Service + 'static的类型约定就行,下面是它的源代码。

      
      pub fn add_service(&mut self, service: impl Service + 'static) {
    self.services.push(Box::new(service));
}

所以你可以实现自己的Service来让Server创建对应异步任务,值得一提的是,Pingora定义了两种Service,即后台(background)服务监听(listener)服务, 两者的名字都足够直白了,前者是后台服务,默默的在后台运行,比如健康检查的服务,而监听服务就是监听客户请求然后处理的服务。

关于Service的内容会在另一篇文章在详细介绍,现在就大致有个概念就行。

run_forever

当需要的服务都添加了之后,就该让服务永远的运行了,所以它启动的方法叫run_forever

永远只是一个愿景...

      
      pub fn run_forever(&mut self) {
    info!("Server starting");

    let conf = self.configuration.as_ref();
 
    // 1.
    if conf.daemon {
        info!("Daemonizing the server");
        fast_timeout::pause_for_fork();
        daemonize(&self.configuration);
        fast_timeout::unpause();
    }


    // 2.
    let mut runtimes: Vec<Runtime> = Vec::new();
    while let Some(service) = self.services.pop() {
        let threads = service.threads().unwrap_or(conf.threads);
        let runtime = Server::run_service(
            service,
            self.listen_fds.clone(),
            self.shutdown_recv.clone(),
            threads,
            conf.work_stealing,
        );
        runtimes.push(runtime);
    }

    // blocked on main loop so that it runs forever
    // Only work steal runtime can use block_on()
    // 3.
    let server_runtime = Server::create_runtime("Server"1true);
    let shutdown_type = server_runtime.get_handle().block_on(self.main_loop());

    // 收到关闭信号的收尾逻辑就省略了。。。
    std::process::exit(0);
}

注意: 由于本函数可能会为了在后台运行做fork操作,所以一旦这个函数本调用,那么在此函数之前创建的线程会丢失。

代码逻辑如下

  1. 如果配置了后台运行参数,就后台运行,通过fork操作完成,fork没怎么用过,所以不深入了。
  2. 根据Service列表的数量创建对应数量的Runtime, 也就是TokioRuntime对象
  3. 主线程在创建一个运行时,阻塞运行main_loop函数以监听关闭信号。

main_loop

因为这部分比较简单,我们就先看上面的第三步吧。

      
          async fn main_loop(&self) -> ShutdownType {
        // 1.
        let mut graceful_upgrade_signal = unix::signal(unix::SignalKind::quit()).unwrap();
        let mut graceful_terminate_signal = unix::signal(unix::SignalKind::terminate()).unwrap();
        let mut fast_shutdown_signal = unix::signal(unix::SignalKind::interrupt()).unwrap();
        
        // 2. 
        tokio::select! {
            // 3.直接退出
            _ = fast_shutdown_signal.recv() => {
                info!("SIGINT received, exiting");
                ShutdownType::Quick
            },
            // 4. 收到优雅关闭的信号当然就通知其他线程关闭啦
            _ = graceful_terminate_signal.recv() => {
                info!("Broadcasting graceful shutdown");
                // 通过之前创建shutdown_watch通知其他线程
                match self.shutdown_watch.send(true) {
                    Ok(_) => { info!("Graceful shutdown started!"); }
                    Err(e) => {
                        error!("Graceful shutdown broadcast failed: {e}");
                    }
                }
                info!("Broadcast graceful shutdown complete");
                ShutdownType::Graceful
            }
            // 5. 优雅升级就稍微复杂一点
            _ = graceful_upgrade_signal.recv() => {
                // 6.
                if let Some(fds) = &self.listen_fds {
                    let fds = fds.lock().await;
                    match fds.send_to_sock(
                        self.configuration.as_ref().upgrade_sock.as_str())
                    {
                        // 省略具体逻辑
                    }
                    sleep(Duration::from_secs(CLOSE_TIMEOUT)).await;
                    // 6.
                    match self.shutdown_watch.send(true) {
                        // 省略具体逻辑
                    }
                    info!("Broadcast graceful shutdown complete");
                    ShutdownType::Graceful
                } else {
                    info!("No socks to send, shutting down.");
                    ShutdownType::Graceful
                }
            },
        }
    }

关于各种信号的解释可以参考: https://www.gnu.org/software/libc/manual/html_node/Termination-Signals.html

代码逻辑如下:

  1. 获取各种信号的Signal对象, 三种不同的信号分别对应不同的操作。
  2. 通过tokio::select!宏同时轮训三种信号。
  3. 快速关闭进程,啥都不管了,连接丢不丢失无所谓了。
  4. 通知其他线程关闭,那么其他线程会收到通知之后就不处理新请求了,然后主线程会等待3分钟,无论其他线程完成与否,都会等待3分钟。
  5. 优雅升级就会稍稍复杂一点
  6. 首先通过约定的unix socket路径将当前监听的句柄发给对新的服务
  7. 通知其他线程可以停止,然后旧的服务主线程也会等待3分钟,所以如果有些长连接一直不断开还是会丢失连接呀。。。不知道我理解的对不对。

run_service

启动注册的各种服务

      
      fn run_service(
    mut service: Box<dyn Service>,
    fds: Option<ListenFds>,
    shutdown: ShutdownWatch,
    threads: usize,
    work_stealing: bool,
) -> Runtime
// 

    // 1.创建运行时
    let service_runtime = Server::create_runtime(service.name(), threads, work_stealing);
    // 2.获取运行时的句柄并创建一个异步任务
    service_runtime.get_handle().spawn(async move {
        service.start_service(fds, shutdown).await;
        info!("service exited.")
    });
    service_runtime
}

注意: 我们要让runtimeasync之外,否则话runtime会被drop掉

这段代码的逻辑比较清晰,就是创建运行时并运行服务,最后返回创建的运行时,之所以包装一下tokio的运行时是因为还构造了一个不做工作窃取(NoSteal)的运行时,选择哪个跟具体的工作负载有关。

但是理论上来说,不做工作窃取的运行时可能在多核(比如4核及以上)的情况下要表现好于有工作窃取功能的运行时,这是因为工作窃取需要在线程之间同步,而多核的情况下同步会影响性能的(不同CPU核之间传递数据需要加锁的啦),所以Pingora提供了不做工作窃取的运行时。

值得注意的是,我这里之所以说4核及以上是基于字节跳动的异步运行时monoio的性能测试报告,所以我说的不一定准,仅做参考,虽然monoio用的底层调度机制不是epoll而是io_ring(似乎也搞epoll???没仔细研究过),但我觉得还是有一定的参考价值的。

默认运行时的创建比较简单,非工作窃取的运行时我没仔细看,所以就不深入运行时的创建了。

start_service

最后就是服务的启动,启动没有太多可讲的,就是通过运行时的spawn方法创建一个异步任务调用Servicestart_service方法,至于start_serverice具体做了啥,且待下回分解。

总结

Pingora有两个核心对象, ServerService, 前者控制整个程序的生命周期,后者负责具体的业务逻辑,比如代理服务。

参考链接

  • https://github.com/cloudflare/pingora/blob/main/docs/user_guide/internals.md
  • https://github.com/cloudflare/pingora
  • https://www.gnu.org/software/libc/manual/html_node/Termination-Signals.html
  • https://github.com/bytedance/monoio/blob/master/docs/zh/benchmark.md
浏览 8
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报