Rust网络框架Pingora源码阅读1
要想深入Pingora
应该是需要阅读源代码的,所以分析一下源代码,虽然Pingora
没有提供丰富的示例,但是提供了一些不错的文档,比如它的internals.md
文档,提供了很多细节和示意图,本系列文章会引用很多其中的示意图,Pingora
的源码分析应该会分为2篇文章或更多。
Pingora
代码版本: v0.1.0
internals.md在https://github.com/cloudflare/pingora/blob/main/docs/user_guide/internals.md
一个例子
看源码总是需要一个入口的,所以可以根据下面的代码来观察一下Pingora
的大致运行逻辑。
这个例子来自我自己的之前的文章: https://youerning.top/post/pingora/tutorial-1
// https://youerning.top
use async_trait::async_trait;
use log::info;
use pingora_core::services::background::background_service;
use std::{sync::Arc, time::Duration};
use structopt::StructOpt;
use pingora_core::server::configuration::Opt;
use pingora_core::server::Server;
use pingora_core::upstreams::peer::HttpPeer;
use pingora_core::{Result, Error, ErrorType};
use pingora_load_balancing::{health_check, selection::RoundRobin, LoadBalancer};
use pingora_proxy::{ProxyHttp, Session};
// 随便定义一个可以实现trait的struct对象
// 为了简单起见当然是要包含Pingora提供的负载均衡对象啦.
pub struct LB(Arc<LoadBalancer<RoundRobin>>);
#[async_trait]
impl ProxyHttp for LB {
type CTX = ();
fn new_ctx(&self) -> Self::CTX {}
// ProxyHttp唯一必须要自己定义的方法, 也就是每次请求来会调用这个方法以选择后端(upstream)
async fn upstream_peer(&self, _session: &mut Session, _ctx: &mut ()) -> Result<Box<HttpPeer>> {
let upstream = match self
.0
// 这里使用的负载均衡算法是RoundRobin, 所以key不重要, key是用用于hash算法的
// 后面做灰度发布会用到hash算法
.select(b"", 256) {
Some(upstream) => upstream,
None => {
// 因为本代码中创建了一个健康检查的服务,所以可能会出现没有后端的情况
return Err(Error::new(ErrorType::new("没有健康的后端可以选择.")))
}
};
info!("选择的后端是: {:?}", upstream);
let peer = Box::new(HttpPeer::new(upstream, false, "".to_string()));
Ok(peer)
}
// Pingora提供了很多的钩子函数,这个钩子函数用于修改或者说过滤客户端请求
async fn upstream_request_filter(
&self,
_session: &mut Session,
upstream_request: &mut pingora_http::RequestHeader,
_ctx: &mut Self::CTX,
) -> Result<()> {
// 将发送给后端的请求插入一个Host请求头
upstream_request
.insert_header("Host", "youering.top")
.unwrap();
Ok(())
}
}
fn main() {
// 初始化日志服务,默认级别是Error及以上才会显示, 可以通过RUST_LOG=INFO来调整日志输出级别
env_logger::init();
// 1.
// 读取命令行参数
let opt = Opt::from_args();
let mut my_server = Server::new(Some(opt)).unwrap();
my_server.bootstrap();
// 127.0.0.1:343 是一个不存在的服务, 这样就会在Pingora的输出看到一个错误,说127.0.0.1:343不健康
// 不健康的服务就不会被选择了
let mut upstreams =
LoadBalancer::try_from_iter(["127.0.0.1:10081", "127.0.0.1:10082", "127.0.0.1:343"]).unwrap();
// 这里创建了一个健康检查的服务,健康检查服务会保证不健康的服务不会被选择
let hc = health_check::TcpHealthCheck::new();
upstreams.set_health_check(hc);
upstreams.health_check_frequency = Some(Duration::from_secs(1));
// 2.1
// 创建一个后台服务
let background = background_service("健康检查", upstreams);
let upstreams = background.task();
// 2.2
// 创建一个代理服务
let mut lb = pingora_proxy::http_proxy_service(&my_server.configuration, LB(upstreams));
lb.add_tcp("0.0.0.0:10080");
// 3.
my_server.add_service(lb);
my_server.add_service(background);
// 4.
my_server.run_forever();
}
上面的大致逻辑如下:
- 创建一个
Server
对象 - 创建特定的
Service
对象,本文有http_proxy_service
和background_service
- 将
Service
对象列表依次加入或者说注册到Server
对象中 - 启动
Server
对象
从上面的描述可以得出,Pingora
有两个比较重要的对象,即Server
和Service
,事实也的确如此。下面是官方对于这两个对象的描述示意图。
┌───────────┐
┌─────────>│ Service │
│ └───────────┘
┌────────┐ │ ┌───────────┐
│ Server │──Spawns──┼─────────>│ Service │
└────────┘ │ └───────────┘
│ ┌───────────┐
└─────────>│ Service │
└───────────┘
Server
Server
主要是用于处理信号(比如升级, 关闭), 读取配置文件, 错误处理,管理Service
对象等。
new
创建一个Server
对象需要一个Opt
对象,它会解析命令行参数,而Server
对象会根据这些参数来配置Server
的各项配置,比较主要的是ServerConf
配置文件的路径以及是否后台运行。
pub fn new(opt: Option<Opt>) -> Result<Server> {
let (tx, rx) = watch::channel(false);
// 1.
let conf = if let Some(opt) = opt.as_ref() {
// 2.
opt.conf.as_ref().map_or_else(
|| {
// options, no conf, generated
ServerConf::new_with_opt_override(opt).ok_or_else(|| {
Error::explain(ErrorType::ReadError, "Conf generation failed")
})
},
|_| {
// options and conf loaded
ServerConf::load_yaml_with_opt_override(opt)
},
)
} else {
ServerConf::new()
.ok_or_else(|| Error::explain(ErrorType::ReadError, "Conf generation failed"))
}?;
// 3.
Ok(Server {
services: vec![],
listen_fds: None,
shutdown_watch: tx,
shutdown_recv: rx,
configuration: Arc::new(conf),
options: opt,
sentry: None,
})
}
代码逻辑如下:
- 获取
Opt
对象,Opt
对象是通过let opt = Opt::from_args();
创建的,它会读取命令行参数 - 读取
Opt
对象的conf
字段,也就是ServerConf
配置文件的路径,如果不存在会创建一个空的Yaml
文件, 命令行的daemon
参数会覆盖ServerConf
的daemon
参数 - 返回一个
Server
对象
bootstrap
当Server
创建成功之后就可以尝试预启动,这个方法有两个作用。
- 测试服务是否可以正常运行,可以通过
-t
或者--test
命令行参数启用,如果指定了这个参数,就会在这一步退出,什么都不做。 - 从旧的
Server
对象获取之前监听的句柄列表, 可以通过-u
or--upgrade
命令行参数启用,这也说明本次启动是为了升级服务。
pub fn bootstrap(&mut self) {
info!("Bootstrap starting");
// 如果是测试就退出
if self.options.as_ref().map_or(false, |o| o.test) {
info!("Server Test passed, exiting");
std::process::exit(0);
}
// 如果是upgrade会接替旧的句柄
match self.load_fds(self.options.as_ref().map_or(false, |o| o.upgrade)) {
Ok(_) => {
info!("Bootstrap done");
}
Err(e) => {
// 错误处理
}
}
}
add_service
这个方法比较简单,只要传入的参数符合impl Service + 'static
的类型约定就行,下面是它的源代码。
pub fn add_service(&mut self, service: impl Service + 'static) {
self.services.push(Box::new(service));
}
所以你可以实现自己的Service
来让Server
创建对应异步任务,值得一提的是,Pingora
定义了两种Service
,即后台(background)服务
和监听(listener)服务
, 两者的名字都足够直白了,前者是后台服务,默默的在后台运行,比如健康检查的服务,而监听服务就是监听客户请求然后处理的服务。
关于Service
的内容会在另一篇文章在详细介绍,现在就大致有个概念就行。
run_forever
当需要的服务都添加了之后,就该让服务永远的运行了,所以它启动的方法叫run_forever
。
永远只是一个愿景...
pub fn run_forever(&mut self) {
info!("Server starting");
let conf = self.configuration.as_ref();
// 1.
if conf.daemon {
info!("Daemonizing the server");
fast_timeout::pause_for_fork();
daemonize(&self.configuration);
fast_timeout::unpause();
}
// 2.
let mut runtimes: Vec<Runtime> = Vec::new();
while let Some(service) = self.services.pop() {
let threads = service.threads().unwrap_or(conf.threads);
let runtime = Server::run_service(
service,
self.listen_fds.clone(),
self.shutdown_recv.clone(),
threads,
conf.work_stealing,
);
runtimes.push(runtime);
}
// blocked on main loop so that it runs forever
// Only work steal runtime can use block_on()
// 3.
let server_runtime = Server::create_runtime("Server", 1, true);
let shutdown_type = server_runtime.get_handle().block_on(self.main_loop());
// 收到关闭信号的收尾逻辑就省略了。。。
std::process::exit(0);
}
注意: 由于本函数可能会为了在后台运行做fork操作,所以一旦这个函数本调用,那么在此函数之前创建的线程会丢失。
代码逻辑如下
- 如果配置了后台运行参数,就后台运行,通过fork操作完成,fork没怎么用过,所以不深入了。
- 根据
Service
列表的数量创建对应数量的Runtime
, 也就是Tokio
的Runtime
对象 - 主线程在创建一个运行时,阻塞运行
main_loop
函数以监听关闭信号。
main_loop
因为这部分比较简单,我们就先看上面的第三步吧。
async fn main_loop(&self) -> ShutdownType {
// 1.
let mut graceful_upgrade_signal = unix::signal(unix::SignalKind::quit()).unwrap();
let mut graceful_terminate_signal = unix::signal(unix::SignalKind::terminate()).unwrap();
let mut fast_shutdown_signal = unix::signal(unix::SignalKind::interrupt()).unwrap();
// 2.
tokio::select! {
// 3.直接退出
_ = fast_shutdown_signal.recv() => {
info!("SIGINT received, exiting");
ShutdownType::Quick
},
// 4. 收到优雅关闭的信号当然就通知其他线程关闭啦
_ = graceful_terminate_signal.recv() => {
info!("Broadcasting graceful shutdown");
// 通过之前创建shutdown_watch通知其他线程
match self.shutdown_watch.send(true) {
Ok(_) => { info!("Graceful shutdown started!"); }
Err(e) => {
error!("Graceful shutdown broadcast failed: {e}");
}
}
info!("Broadcast graceful shutdown complete");
ShutdownType::Graceful
}
// 5. 优雅升级就稍微复杂一点
_ = graceful_upgrade_signal.recv() => {
// 6.
if let Some(fds) = &self.listen_fds {
let fds = fds.lock().await;
match fds.send_to_sock(
self.configuration.as_ref().upgrade_sock.as_str())
{
// 省略具体逻辑
}
sleep(Duration::from_secs(CLOSE_TIMEOUT)).await;
// 6.
match self.shutdown_watch.send(true) {
// 省略具体逻辑
}
info!("Broadcast graceful shutdown complete");
ShutdownType::Graceful
} else {
info!("No socks to send, shutting down.");
ShutdownType::Graceful
}
},
}
}
关于各种信号的解释可以参考: https://www.gnu.org/software/libc/manual/html_node/Termination-Signals.html
代码逻辑如下:
- 获取各种信号的
Signal
对象, 三种不同的信号分别对应不同的操作。 - 通过
tokio::select!
宏同时轮训三种信号。 - 快速关闭进程,啥都不管了,连接丢不丢失无所谓了。
- 通知其他线程关闭,那么其他线程会收到通知之后就不处理新请求了,然后主线程会等待3分钟,无论其他线程完成与否,都会等待3分钟。
- 优雅升级就会稍稍复杂一点
- 首先通过约定的
unix socket
路径将当前监听的句柄发给对新的服务 - 通知其他线程可以停止,然后旧的服务主线程也会等待3分钟,所以如果有些长连接一直不断开还是会丢失连接呀。。。不知道我理解的对不对。
run_service
启动注册的各种服务
fn run_service(
mut service: Box<dyn Service>,
fds: Option<ListenFds>,
shutdown: ShutdownWatch,
threads: usize,
work_stealing: bool,
) -> Runtime
//
{
// 1.创建运行时
let service_runtime = Server::create_runtime(service.name(), threads, work_stealing);
// 2.获取运行时的句柄并创建一个异步任务
service_runtime.get_handle().spawn(async move {
service.start_service(fds, shutdown).await;
info!("service exited.")
});
service_runtime
}
注意: 我们要让
runtime
在async
之外,否则话runtime
会被drop掉
这段代码的逻辑比较清晰,就是创建运行时并运行服务,最后返回创建的运行时,之所以包装一下tokio
的运行时是因为还构造了一个不做工作窃取
(NoSteal
)的运行时,选择哪个跟具体的工作负载有关。
但是理论上来说,不做工作窃取
的运行时可能在多核(比如4核及以上)的情况下要表现好于有工作窃取
功能的运行时,这是因为工作窃取
需要在线程之间同步,而多核的情况下同步会影响性能的(不同CPU核之间传递数据需要加锁的啦),所以Pingora
提供了不做工作窃取
的运行时。
值得注意的是,我这里之所以说4核及以上是基于字节跳动的异步运行时monoio
的性能测试报告,所以我说的不一定准,仅做参考,虽然monoio
用的底层调度机制不是epoll
而是io_ring
(似乎也搞epoll???没仔细研究过),但我觉得还是有一定的参考价值的。
默认运行时的创建比较简单,非工作窃取
的运行时我没仔细看,所以就不深入运行时的创建了。
start_service
最后就是服务的启动,启动没有太多可讲的,就是通过运行时的spawn
方法创建一个异步任务调用Service
的start_service
方法,至于start_serverice
具体做了啥,且待下回分解。
总结
Pingora
有两个核心对象, Server
和Service
, 前者控制整个程序的生命周期,后者负责具体的业务逻辑,比如代理服务。
参考链接
- https://github.com/cloudflare/pingora/blob/main/docs/user_guide/internals.md
- https://github.com/cloudflare/pingora
- https://www.gnu.org/software/libc/manual/html_node/Termination-Signals.html
- https://github.com/bytedance/monoio/blob/master/docs/zh/benchmark.md