Nginx(二): worker 进程处理流程框架解析

JAVA烂猪皮

共 52272字,需浏览 105分钟

 ·

2021-01-17 13:21

走过路过不要错过

点击蓝字关注我们


Nginx 启动起来之后,会有几个进程运行:1. master 进程接收用户命令并做出响应; 2. worker 进程负责处理各网络事件,并同时接收来自master的处理协调命令;

master 主要是一控制命令,我们后面再说,而worker则是处理的nginx的核心任务,请求转发、反向代理、负载均衡等工作。所以我们先来啃啃worker这块硬骨头吧!

0. worker 主循环

worker 的启动是被master 操作的,作为一个 fork 出来的进程,它拥有和master一样的内存数据信息。但它的活动范围相对较小,所以它并不会替代master的位置。

// unix/ngx_process_cycle.cvoidngx_master_process_cycle(ngx_cycle_t *cycle){    char              *title;    u_char            *p;    size_t             size;    ngx_int_t          i;    ngx_uint_t         sigio;    sigset_t           set;    struct itimerval   itv;    ngx_uint_t         live;    ngx_msec_t         delay;    ngx_core_conf_t   *ccf;
sigemptyset(&set); sigaddset(&set, SIGCHLD); sigaddset(&set, SIGALRM); sigaddset(&set, SIGIO); sigaddset(&set, SIGINT); sigaddset(&set, ngx_signal_value(NGX_RECONFIGURE_SIGNAL)); sigaddset(&set, ngx_signal_value(NGX_REOPEN_SIGNAL)); sigaddset(&set, ngx_signal_value(NGX_NOACCEPT_SIGNAL)); sigaddset(&set, ngx_signal_value(NGX_TERMINATE_SIGNAL)); sigaddset(&set, ngx_signal_value(NGX_SHUTDOWN_SIGNAL)); sigaddset(&set, ngx_signal_value(NGX_CHANGEBIN_SIGNAL));
if (sigprocmask(SIG_BLOCK, &set, NULL) == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "sigprocmask() failed"); }
sigemptyset(&set);

size = sizeof(master_process);
for (i = 0; i < ngx_argc; i++) { size += ngx_strlen(ngx_argv[i]) + 1; }
title = ngx_pnalloc(cycle->pool, size); if (title == NULL) { /* fatal */ exit(2); }
p = ngx_cpymem(title, master_process, sizeof(master_process) - 1); for (i = 0; i < ngx_argc; i++) { *p++ = ' '; p = ngx_cpystrn(p, (u_char *) ngx_argv[i], size); }
ngx_setproctitle(title);

ccf = (ngx_core_conf_t *) ngx_get_conf(cycle->conf_ctx, ngx_core_module); // 启动之后会主动启动 worker 进程 ngx_start_worker_processes(cycle, ccf->worker_processes, NGX_PROCESS_RESPAWN); ngx_start_cache_manager_processes(cycle, 0);
ngx_new_binary = 0; delay = 0; sigio = 0; live = 1;
for ( ;; ) { if (delay) { if (ngx_sigalrm) { sigio = 0; delay *= 2; ngx_sigalrm = 0; }
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "termination cycle: %M", delay);
itv.it_interval.tv_sec = 0; itv.it_interval.tv_usec = 0; itv.it_value.tv_sec = delay / 1000; itv.it_value.tv_usec = (delay % 1000 ) * 1000;
if (setitimer(ITIMER_REAL, &itv, NULL) == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "setitimer() failed"); } }
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "sigsuspend");
sigsuspend(&set);
ngx_time_update();
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "wake up, sigio %i", sigio);
if (ngx_reap) { ngx_reap = 0; ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "reap children");
live = ngx_reap_children(cycle); }
if (!live && (ngx_terminate || ngx_quit)) { ngx_master_process_exit(cycle); }
if (ngx_terminate) { if (delay == 0) { delay = 50; }
if (sigio) { sigio--; continue; }
sigio = ccf->worker_processes + 2 /* cache processes */;
if (delay > 1000) { ngx_signal_worker_processes(cycle, SIGKILL); } else { ngx_signal_worker_processes(cycle, ngx_signal_value(NGX_TERMINATE_SIGNAL)); }
continue; }
if (ngx_quit) { ngx_signal_worker_processes(cycle, ngx_signal_value(NGX_SHUTDOWN_SIGNAL)); ngx_close_listening_sockets(cycle);
continue; }
if (ngx_reconfigure) { ngx_reconfigure = 0;
if (ngx_new_binary) { ngx_start_worker_processes(cycle, ccf->worker_processes, NGX_PROCESS_RESPAWN); ngx_start_cache_manager_processes(cycle, 0); ngx_noaccepting = 0;
continue; }
ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "reconfiguring");
cycle = ngx_init_cycle(cycle); if (cycle == NULL) { cycle = (ngx_cycle_t *) ngx_cycle; continue; }
ngx_cycle = cycle; ccf = (ngx_core_conf_t *) ngx_get_conf(cycle->conf_ctx, ngx_core_module); // 收到reconfig命令时,重启worker 进程 ngx_start_worker_processes(cycle, ccf->worker_processes, NGX_PROCESS_JUST_RESPAWN); ngx_start_cache_manager_processes(cycle, 1);
/* allow new processes to start */ ngx_msleep(100);
live = 1; ngx_signal_worker_processes(cycle, ngx_signal_value(NGX_SHUTDOWN_SIGNAL)); }
if (ngx_restart) { ngx_restart = 0; // 收到重启命令时,传递消息给 worker ngx_start_worker_processes(cycle, ccf->worker_processes, NGX_PROCESS_RESPAWN); ngx_start_cache_manager_processes(cycle, 0); live = 1; }
if (ngx_reopen) { ngx_reopen = 0; ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "reopening logs"); ngx_reopen_files(cycle, ccf->user); ngx_signal_worker_processes(cycle, ngx_signal_value(NGX_REOPEN_SIGNAL)); }
if (ngx_change_binary) { ngx_change_binary = 0; ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "changing binary"); ngx_new_binary = ngx_exec_new_binary(cycle, ngx_argv); }
if (ngx_noaccept) { ngx_noaccept = 0; ngx_noaccepting = 1; ngx_signal_worker_processes(cycle, ngx_signal_value(NGX_SHUTDOWN_SIGNAL)); } }}

static voidngx_start_worker_processes(ngx_cycle_t *cycle, ngx_int_t n, ngx_int_t type){ ngx_int_t i; ngx_channel_t ch;
ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "start worker processes");
ngx_memzero(&ch, sizeof(ngx_channel_t));
ch.command = NGX_CMD_OPEN_CHANNEL; // n 代表worker的进程数, 在 nginx.conf 中配置 for (i = 0; i < n; i++) { // 依次启动 worker 进程,实际上就是通过fork进行子进程启动的 ngx_spawn_process(cycle, ngx_worker_process_cycle, (void *) (intptr_t) i, "worker process", type);
ch.pid = ngx_processes[ngx_process_slot].pid; ch.slot = ngx_process_slot; ch.fd = ngx_processes[ngx_process_slot].channel[0];
ngx_pass_open_channel(cycle, &ch); }}
ngx_pid_tngx_spawn_process(ngx_cycle_t *cycle, ngx_spawn_proc_pt proc, void *data, char *name, ngx_int_t respawn){ u_long on; ngx_pid_t pid; ngx_int_t s;
if (respawn >= 0) { s = respawn;
} else { for (s = 0; s < ngx_last_process; s++) { if (ngx_processes[s].pid == -1) { break; } }
if (s == NGX_MAX_PROCESSES) { ngx_log_error(NGX_LOG_ALERT, cycle->log, 0, "no more than %d processes can be spawned", NGX_MAX_PROCESSES); return NGX_INVALID_PID; } }

if (respawn != NGX_PROCESS_DETACHED) {
/* Solaris 9 still has no AF_LOCAL */
if (socketpair(AF_UNIX, SOCK_STREAM, 0, ngx_processes[s].channel) == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "socketpair() failed while spawning \"%s\"", name); return NGX_INVALID_PID; }
ngx_log_debug2(NGX_LOG_DEBUG_CORE, cycle->log, 0, "channel %d:%d", ngx_processes[s].channel[0], ngx_processes[s].channel[1]);
if (ngx_nonblocking(ngx_processes[s].channel[0]) == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, ngx_nonblocking_n " failed while spawning \"%s\"", name); ngx_close_channel(ngx_processes[s].channel, cycle->log); return NGX_INVALID_PID; }
if (ngx_nonblocking(ngx_processes[s].channel[1]) == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, ngx_nonblocking_n " failed while spawning \"%s\"", name); ngx_close_channel(ngx_processes[s].channel, cycle->log); return NGX_INVALID_PID; }
on = 1; if (ioctl(ngx_processes[s].channel[0], FIOASYNC, &on) == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "ioctl(FIOASYNC) failed while spawning \"%s\"", name); ngx_close_channel(ngx_processes[s].channel, cycle->log); return NGX_INVALID_PID; }
if (fcntl(ngx_processes[s].channel[0], F_SETOWN, ngx_pid) == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "fcntl(F_SETOWN) failed while spawning \"%s\"", name); ngx_close_channel(ngx_processes[s].channel, cycle->log); return NGX_INVALID_PID; }
if (fcntl(ngx_processes[s].channel[0], F_SETFD, FD_CLOEXEC) == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "fcntl(FD_CLOEXEC) failed while spawning \"%s\"", name); ngx_close_channel(ngx_processes[s].channel, cycle->log); return NGX_INVALID_PID; }
if (fcntl(ngx_processes[s].channel[1], F_SETFD, FD_CLOEXEC) == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "fcntl(FD_CLOEXEC) failed while spawning \"%s\"", name); ngx_close_channel(ngx_processes[s].channel, cycle->log); return NGX_INVALID_PID; }
ngx_channel = ngx_processes[s].channel[1];
} else { ngx_processes[s].channel[0] = -1; ngx_processes[s].channel[1] = -1; }
ngx_process_slot = s;
// fork 出子进程出来 pid = fork();
switch (pid) {
case -1: ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "fork() failed while spawning \"%s\"", name); ngx_close_channel(ngx_processes[s].channel, cycle->log); return NGX_INVALID_PID;
case 0: ngx_parent = ngx_pid; ngx_pid = ngx_getpid(); // 子进程将调用传入的处理方法,worker 则会进入循环处理事件逻辑中 // 即 ngx_worker_process_cycle 循环 proc(cycle, data); break;
default: break; }
ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "start %s %P", name, pid);
ngx_processes[s].pid = pid; ngx_processes[s].exited = 0;
if (respawn >= 0) { return pid; }
ngx_processes[s].proc = proc; ngx_processes[s].data = data; ngx_processes[s].name = name; ngx_processes[s].exiting = 0;
switch (respawn) {
case NGX_PROCESS_NORESPAWN: ngx_processes[s].respawn = 0; ngx_processes[s].just_spawn = 0; ngx_processes[s].detached = 0; break;
case NGX_PROCESS_JUST_SPAWN: ngx_processes[s].respawn = 0; ngx_processes[s].just_spawn = 1; ngx_processes[s].detached = 0; break;
case NGX_PROCESS_RESPAWN: ngx_processes[s].respawn = 1; ngx_processes[s].just_spawn = 0; ngx_processes[s].detached = 0; break;
case NGX_PROCESS_JUST_RESPAWN: ngx_processes[s].respawn = 1; ngx_processes[s].just_spawn = 1; ngx_processes[s].detached = 0; break;
case NGX_PROCESS_DETACHED: ngx_processes[s].respawn = 0; ngx_processes[s].just_spawn = 0; ngx_processes[s].detached = 1; break; }
if (s == ngx_last_process) { ngx_last_process++; }
return pid;}

// os/unix/ngx_process_cycle.c// worker 主循环服务static voidngx_worker_process_cycle(ngx_cycle_t *cycle, void *data){ ngx_int_t worker = (intptr_t) data;
ngx_process = NGX_PROCESS_WORKER; ngx_worker = worker;
ngx_worker_process_init(cycle, worker); // 进程标题 worker process ngx_setproctitle("worker process"); // 死循环处理 worker 事务 for ( ;; ) { // 大部分逻辑在接受 master 传递过来折命令 if (ngx_exiting) { if (ngx_event_no_timers_left() == NGX_OK) { ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "exiting"); ngx_worker_process_exit(cycle); } }
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "worker cycle"); // 这是其核心任务,检测事件、处理事件 ngx_process_events_and_timers(cycle);
// 大部分逻辑在接受 master 传递过来折命令 if (ngx_terminate) { ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "exiting"); ngx_worker_process_exit(cycle); } // 退出事件 if (ngx_quit) { ngx_quit = 0; ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "gracefully shutting down"); ngx_setproctitle("worker process is shutting down");
if (!ngx_exiting) { ngx_exiting = 1; ngx_set_shutdown_timer(cycle); ngx_close_listening_sockets(cycle); ngx_close_idle_connections(cycle); } } // reopen 事件 if (ngx_reopen) { ngx_reopen = 0; ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "reopening logs"); ngx_reopen_files(cycle, -1); } }}

上面就是nginx worker的主要功能体现, 使用一个死循环提供服务. 有很多是接口master命令进行响应的逻辑, 咱们忽略其对master命令的响应,观其业务核心: ngx_process_events_and_timers .

// event/ngx_event.c// nginx worker 处理io事件和超时队列流程voidngx_process_events_and_timers(ngx_cycle_t *cycle){    ngx_uint_t  flags;    ngx_msec_t  timer, delta;
if (ngx_timer_resolution) { timer = NGX_TIMER_INFINITE; flags = 0;
} else { // 获取timer timer = ngx_event_find_timer(); flags = NGX_UPDATE_TIME;
#if (NGX_WIN32)
/* handle signals from master in case of network inactivity */
if (timer == NGX_TIMER_INFINITE || timer > 500) { timer = 500; }
#endif } // 使用锁进行 tcp 监听 // 该锁基于 shm 实现,多进程共享内存 if (ngx_use_accept_mutex) { // disabled 用于优化监听锁竞争,直到 ngx_accept_disabled 小于0 if (ngx_accept_disabled > 0) { ngx_accept_disabled--;
} else { // 通过 shm 获取一个进程锁,没抢到锁则直接返回了 // 获取到accept锁之后,其会注册 read 事件监听,所以,当其返回后,则意味着数据就绪 if (ngx_trylock_accept_mutex(cycle) == NGX_ERROR) { return; } // 获取到锁,设置 flags if (ngx_accept_mutex_held) { flags |= NGX_POST_EVENTS;
} else { if (timer == NGX_TIMER_INFINITE || timer > ngx_accept_mutex_delay) { timer = ngx_accept_mutex_delay; } } } } // post 事件队列不为空,则触发事件处理 if (!ngx_queue_empty(&ngx_posted_next_events)) { ngx_event_move_posted_next(cycle); timer = 0; }
delta = ngx_current_msec; // 处理事件 ngx_event_actions.process_events, 将会进行阻塞等待 // 此处的 ngx_event_actions 由系统决定如何初始化,如 linux 下 // 使用 event/modules/ngx_epoll_module.c 中的定义 ngx_event_actions = ngx_epoll_module_ctx.actions; // 而其他系统则另外决定, 总体来说可能有以下几种可能 // ngx_devpoll_module_ctx.actions; // ngx_epoll_module_ctx.actions; // ngx_eventport_module_ctx.actions; // ngx_iocp_module_ctx.actions; // ngx_kqueue_module_ctx.actions; // ngx_select_module_ctx.actions; // ngx_poll_module_ctx.actions; /** * 其定义样例如下: static ngx_event_module_t ngx_select_module_ctx = { &select_name, NULL, /* create configuration */ ngx_select_init_conf, /* init configuration */
{ ngx_select_add_event, /* add an event */ ngx_select_del_event, /* delete an event */ ngx_select_add_event, /* enable an event */ ngx_select_del_event, /* disable an event */ NULL, /* add an connection */ NULL, /* delete an connection */ NULL, /* trigger a notify */ ngx_select_process_events, /* process the events */ ngx_select_init, /* init the events */ ngx_select_done /* done the events */ }
}; */ (void) ngx_process_events(cycle, timer, flags); // 计算耗时 delta = ngx_current_msec - delta;
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "timer delta: %M", delta); // 处理 posted 事件,它存放在 ngx_posted_accept_events 队列中 ngx_event_process_posted(cycle, &ngx_posted_accept_events); // 处理完事件后,释放锁 if (ngx_accept_mutex_held) { ngx_shmtx_unlock(&ngx_accept_mutex); } // 处理超时的任务 if (delta) { ngx_event_expire_timers(); } // 读写事件将会被添加到 ngx_posted_events 队列中 ngx_event_process_posted(cycle, &ngx_posted_events);}

以上也就是nginx worker的主要功能框架了:

    1. 先通过shm获取tcp的监听锁, 避免socket惊群;
    2. 获取到锁的worker进程, 将会注册accept的read事件,没有抢到锁的进程不会立即返回,因为他还可以继续处理其他事件,以及在之前被监听到的socket(此处io事件处理决定了worker不会进行空转);
    3. 如果有 ngx_posted_next_events 队列, 则先处理其队列请求;
    4. 根据系统类型调用网络io模块, select 机制接收io事件;
    5. 接入accept事件后, 释放accept锁(基于shm);
    6. 处理过期超时队列;
    7. 处理普通的已接入的socket的读写事件;

一次处理往往只会处理部分事件, 比如可能只是处理了 accept, read 则需要在下一次或n次之后才会处理, 这也是异步机制非阻塞的体现.

1.worker 时序图

下面我先给到一个整个worker的工作时序图, 以便有个整体的认知.

接下来我们从几个点依次简单看看 nginx 是如何处理各细节的.

2. 获取accept锁及注册accept事件

由于nginx是基于多进程实现的并发处理, 那么各进程必然都需要监听相同的端口数据, 如果没有锁控制, 则当有事件到达时, 必然导致各进程同时被唤醒, 即所谓的惊群. 所以, nginx 提供了一个锁机制, 使同一时刻只有一个进程在监听某端口, 从而避免竞争.  实现方式是基于共享内存 shm 实现.(如果是多线程方式会更简单哟)

// event/ngx_event_accept.cngx_int_tngx_trylock_accept_mutex(ngx_cycle_t *cycle){    // 首先获取shm锁, 通过 shm 实现进程数据共享    if (ngx_shmtx_trylock(&ngx_accept_mutex)) {
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "accept mutex locked"); // 如果上一次就是自己执行的accept操作, 则直接返回 // 否则需要重新注册accept监听 if (ngx_accept_mutex_held && ngx_accept_events == 0) { return NGX_OK; } // 注册 accept 事件 if (ngx_enable_accept_events(cycle) == NGX_ERROR) { ngx_shmtx_unlock(&ngx_accept_mutex); return NGX_ERROR; }
ngx_accept_events = 0; ngx_accept_mutex_held = 1;
return NGX_OK; }
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "accept mutex lock failed: %ui", ngx_accept_mutex_held);
if (ngx_accept_mutex_held) { // 如果没有获取到锁,则将之前注册的 accept 事件取消,避免惊群 if (ngx_disable_accept_events(cycle, 0) == NGX_ERROR) { return NGX_ERROR; }
ngx_accept_mutex_held = 0; } // 不管有没有获取到锁, 都会执行后续的逻辑, 因为除了 accept 外, 还有read/write事件需要处理 return NGX_OK;}// core/ngx_shmtx.c, 获取锁,锁的值为当前进程idngx_uint_tngx_shmtx_trylock(ngx_shmtx_t *mtx){ return (*mtx->lock == 0 && ngx_atomic_cmp_set(mtx->lock, 0, ngx_pid));}// 注册 accept 事件监听// event/ngx_event_accept.cngx_int_tngx_enable_accept_events(ngx_cycle_t *cycle){ ngx_uint_t i; ngx_listening_t *ls; ngx_connection_t *c;
ls = cycle->listening.elts; for (i = 0; i < cycle->listening.nelts; i++) {
c = ls[i].connection;
if (c == NULL || c->read->active) { continue; } // 注册accept事件,READ ? // 交由 ngx_event_actions.add 处理, 实际运行由系统决定, 如 ngx_select_add_event if (ngx_add_event(c->read, NGX_READ_EVENT, 0) == NGX_ERROR) { return NGX_ERROR; } }
return NGX_OK;}
// event/module/ngx_select_module.c// 注册一个 io 事件监听, fd_setstatic ngx_int_tngx_select_add_event(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags){ ngx_connection_t *c;
c = ev->data;
ngx_log_debug2(NGX_LOG_DEBUG_EVENT, ev->log, 0, "select add event fd:%d ev:%i", c->fd, event);
if (ev->index != NGX_INVALID_INDEX) { ngx_log_error(NGX_LOG_ALERT, ev->log, 0, "select event fd:%d ev:%i is already set", c->fd, event); return NGX_OK; }
if ((event == NGX_READ_EVENT && ev->write) || (event == NGX_WRITE_EVENT && !ev->write)) { ngx_log_error(NGX_LOG_ALERT, ev->log, 0, "invalid select %s event fd:%d ev:%i", ev->write ? "write" : "read", c->fd, event); return NGX_ERROR; }
if (event == NGX_READ_EVENT) { FD_SET(c->fd, &master_read_fd_set);
} else if (event == NGX_WRITE_EVENT) { FD_SET(c->fd, &master_write_fd_set); }
if (max_fd != -1 && max_fd < c->fd) { max_fd = c->fd; }
ev->active = 1;
event_index[nevents] = ev; ev->index = nevents; nevents++;
return NGX_OK;}

主要就是shm的应用,以及fd_set处理。

3. 通用处理队列实现

在 ngx_process_events_and_timers 中, 我们看到, 在io事件返回之后, 都会多次进行队列处理. 它们的不同仅在于 队列不同. 那么, 它是如何实现这个处理过程的呢?

我们分两块来看这事: 1. 队列的数据结构; 2. 执行队列任务; so... 就这样呗.

// 1. 队列数据结构// 额, 两个循环嵌套的指针就是其结构了typedef struct ngx_queue_s  ngx_queue_t;struct ngx_queue_s {    ngx_queue_t  *prev;    ngx_queue_t  *next;};// 实际上, 此处还会有一个强制类型转换 ngx_event_ttypedef struct ngx_event_s           ngx_event_t;struct ngx_event_s {    void            *data;
unsigned write:1;
unsigned accept:1;
/* used to detect the stale events in kqueue and epoll */ unsigned instance:1;
/* * the event was passed or would be passed to a kernel; * in aio mode - operation was posted. */ unsigned active:1;
unsigned disabled:1;
/* the ready event; in aio mode 0 means that no operation can be posted */ unsigned ready:1;
unsigned oneshot:1;
/* aio operation is complete */ unsigned complete:1;
unsigned eof:1; unsigned error:1;
unsigned timedout:1; unsigned timer_set:1;
unsigned delayed:1;
unsigned deferred_accept:1;
/* the pending eof reported by kqueue, epoll or in aio chain operation */ unsigned pending_eof:1;
unsigned posted:1;
unsigned closed:1;
/* to test on worker exit */ unsigned channel:1; unsigned resolver:1;
unsigned cancelable:1;
#if (NGX_HAVE_KQUEUE) unsigned kq_vnode:1;
/* the pending errno reported by kqueue */ int kq_errno;#endif
/* * kqueue only: * accept: number of sockets that wait to be accepted * read: bytes to read when event is ready * or lowat when event is set with NGX_LOWAT_EVENT flag * write: available space in buffer when event is ready * or lowat when event is set with NGX_LOWAT_EVENT flag * * iocp: TODO * * otherwise: * accept: 1 if accept many, 0 otherwise * read: bytes to read when event is ready, -1 if not known */
int available; // 这个handler 比较重要, 它决定了本事件如何进行处理 ngx_event_handler_pt handler;

#if (NGX_HAVE_IOCP) ngx_event_ovlp_t ovlp;#endif
ngx_uint_t index;
ngx_log_t *log;
ngx_rbtree_node_t timer;
// queue 则是存放整个队列所有数据的地方 /* the posted queue */ ngx_queue_t queue;
#if 0
/* the threads support */
/* * the event thread context, we store it here * if $(CC) does not understand __thread declaration * and pthread_getspecific() is too costly */
void *thr_ctx;
#if (NGX_EVENT_T_PADDING)
/* event should not cross cache line in SMP */
uint32_t padding[NGX_EVENT_T_PADDING];#endif#endif};
// 有了数据结构支持后, 要处理队列就简单了, 只需遍历数据即可 // event/ngx_event_posted.cvoidngx_event_process_posted(ngx_cycle_t *cycle, ngx_queue_t *posted){ ngx_queue_t *q; ngx_event_t *ev;
while (!ngx_queue_empty(posted)) {
q = ngx_queue_head(posted); ev = ngx_queue_data(q, ngx_event_t, queue);
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "posted event %p", ev); // 先删除事件,再进行处理, 这在单进程单线程下没有问题的哟 ngx_delete_posted_event(ev); // 调用 event 对应的handler 处理事件 // 所以核心在于这个 handler 的定义 ev->handler(ev); }}

以上的实现, 虽然是面向过程语言写的, 但因为有 struct 数据类型的支持, 实际上也是面向对象的概念呢.

4. io事件的监听实现

作为一个web服务器或者反向代理服务器, 其核心必然是网络io事件的处理. nginx 会根据不同的操作系统支持, 选择不同的io模型进行io事件的监听, 充分发挥系统的性能. 这也是其制胜之道吧. 具体如何确定哪种类型, 实际上可以在进行编译的时候, 获取系统变量来断定. (稍详细的说明, 见前面代码注释)

我们以 select 的实现来看看细节:

// event/module/ngx_select_module.c// io 事件监听static ngx_int_tngx_select_process_events(ngx_cycle_t *cycle, ngx_msec_t timer,    ngx_uint_t flags){    int                ready, nready;    ngx_err_t          err;    ngx_uint_t         i, found;    ngx_event_t       *ev;    ngx_queue_t       *queue;    struct timeval     tv, *tp;    ngx_connection_t  *c;    // 获取 max_fd, 系统传值需要    if (max_fd == -1) {        for (i = 0; i < nevents; i++) {            c = event_index[i]->data;            if (max_fd < c->fd) {                max_fd = c->fd;            }        }
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "change max_fd: %i", max_fd); }
#if (NGX_DEBUG) if (cycle->log->log_level & NGX_LOG_DEBUG_ALL) { for (i = 0; i < nevents; i++) { ev = event_index[i]; c = ev->data; ngx_log_debug2(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "select event: fd:%d wr:%d", c->fd, ev->write); }
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "max_fd: %i", max_fd); }#endif
if (timer == NGX_TIMER_INFINITE) { tp = NULL;
} else { tv.tv_sec = (long) (timer / 1000); tv.tv_usec = (long) ((timer % 1000) * 1000); tp = &tv; }
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "select timer: %M", timer);
work_read_fd_set = master_read_fd_set; work_write_fd_set = master_write_fd_set; // 在此处交由内核进行处理网络事件,epoll 机制,至少有一个事件到来时返回 // tp 代表是否要超时退出 ready = select(max_fd + 1, &work_read_fd_set, &work_write_fd_set, NULL, tp);
err = (ready == -1) ? ngx_errno : 0;
if (flags & NGX_UPDATE_TIME || ngx_event_timer_alarm) { // 事件结束后,先尝试更新gmtTime 时间信息 ngx_time_update(); }
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "select ready %d", ready);
if (err) { ngx_uint_t level;
if (err == NGX_EINTR) {
if (ngx_event_timer_alarm) { ngx_event_timer_alarm = 0; return NGX_OK; }
level = NGX_LOG_INFO;
} else { level = NGX_LOG_ALERT; }
ngx_log_error(level, cycle->log, err, "select() failed");
if (err == NGX_EBADF) { ngx_select_repair_fd_sets(cycle); }
return NGX_ERROR; }
if (ready == 0) { if (timer != NGX_TIMER_INFINITE) { return NGX_OK; }
ngx_log_error(NGX_LOG_ALERT, cycle->log, 0, "select() returned no events without timeout"); return NGX_ERROR; }
nready = 0; // 遍历所有事件 for (i = 0; i < nevents; i++) { ev = event_index[i]; c = ev->data; found = 0; // 写事件处理 if (ev->write) { if (FD_ISSET(c->fd, &work_write_fd_set)) { found = 1; ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "select write %d", c->fd); }
} // 读或accept事件 else { if (FD_ISSET(c->fd, &work_read_fd_set)) { found = 1; ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "select read %d", c->fd); } } // 读写就绪事件 found 都为1 if (found) { ev->ready = 1; ev->available = -1; // 如果是 accept 事件则取 ngx_posted_accept_events 队列 // 否则取 ngx_posted_events 队列 queue = ev->accept ? &ngx_posted_accept_events : &ngx_posted_events; // 将事件插入到相应队列尾部 ngx_post_event(ev, queue); // 有效就绪事件+1 nready++; } } // 如果两个值不相等,则需要修正下 if (ready != nready) { ngx_log_error(NGX_LOG_ALERT, cycle->log, 0, "select ready != events: %d:%d", ready, nready);
ngx_select_repair_fd_sets(cycle); }
return NGX_OK;}

上面就是io事件的处理的了, 因为是 select 的实现, 所以调用系统的 select() 函数即可接收网络事件了. 具体能获取哪些事件, 实际上前面的工作已经决定了. 此处只是一个执行者的角色. 它是否高效, 则是取决于操作系统的io模型是否高效了. 有兴趣的同学可以看下 epoll 的实现.

5. accept 事件的处理

当系统发现有新的网络连接进来时, 会生成一个accept的事件, 给到应用. nginx 接收到accept事件后, 会放入 ngx_posted_accept_events 中, 然后调用通用队列处理方法处理队列. 此处的 handler 是 ngx_event_accept .  其核心工作就是建立新的socket连接, 以便后续读写.

// event/ngx_event_accept.c// accept 事件处理入口voidngx_event_accept(ngx_event_t *ev){    socklen_t          socklen;    ngx_err_t          err;    ngx_log_t         *log;    ngx_uint_t         level;    ngx_socket_t       s;    ngx_event_t       *rev, *wev;    ngx_sockaddr_t     sa;    ngx_listening_t   *ls;    ngx_connection_t  *c, *lc;    ngx_event_conf_t  *ecf;#if (NGX_HAVE_ACCEPT4)    static ngx_uint_t  use_accept4 = 1;#endif
if (ev->timedout) { if (ngx_enable_accept_events((ngx_cycle_t *) ngx_cycle) != NGX_OK) { return; }
ev->timedout = 0; } // 获取配置信息 ecf = ngx_event_get_conf(ngx_cycle->conf_ctx, ngx_event_core_module);
if (!(ngx_event_flags & NGX_USE_KQUEUE_EVENT)) { ev->available = ecf->multi_accept; }
lc = ev->data; ls = lc->listening; ev->ready = 0;
ngx_log_debug2(NGX_LOG_DEBUG_EVENT, ev->log, 0, "accept on %V, ready: %d", &ls->addr_text, ev->available); // 循环处理socket数据 do { socklen = sizeof(ngx_sockaddr_t);
#if (NGX_HAVE_ACCEPT4) if (use_accept4) { // 调用accept() 方法接入socket连接 s = accept4(lc->fd, &sa.sockaddr, &socklen, SOCK_NONBLOCK); } else { s = accept(lc->fd, &sa.sockaddr, &socklen); }#else s = accept(lc->fd, &sa.sockaddr, &socklen);#endif
if (s == (ngx_socket_t) -1) { err = ngx_socket_errno;
if (err == NGX_EAGAIN) { ngx_log_debug0(NGX_LOG_DEBUG_EVENT, ev->log, err, "accept() not ready"); return; }
level = NGX_LOG_ALERT;
if (err == NGX_ECONNABORTED) { level = NGX_LOG_ERR;
} else if (err == NGX_EMFILE || err == NGX_ENFILE) { level = NGX_LOG_CRIT; }
#if (NGX_HAVE_ACCEPT4) ngx_log_error(level, ev->log, err, use_accept4 ? "accept4() failed" : "accept() failed");
if (use_accept4 && err == NGX_ENOSYS) { use_accept4 = 0; ngx_inherited_nonblocking = 0; continue; }#else ngx_log_error(level, ev->log, err, "accept() failed");#endif
if (err == NGX_ECONNABORTED) { if (ngx_event_flags & NGX_USE_KQUEUE_EVENT) { ev->available--; }
if (ev->available) { continue; } }
if (err == NGX_EMFILE || err == NGX_ENFILE) { if (ngx_disable_accept_events((ngx_cycle_t *) ngx_cycle, 1) != NGX_OK) { return; }
if (ngx_use_accept_mutex) { if (ngx_accept_mutex_held) { ngx_shmtx_unlock(&ngx_accept_mutex); ngx_accept_mutex_held = 0; }
ngx_accept_disabled = 1;
} else { ngx_add_timer(ev, ecf->accept_mutex_delay); } }
return; }
#if (NGX_STAT_STUB) (void) ngx_atomic_fetch_add(ngx_stat_accepted, 1);#endif
ngx_accept_disabled = ngx_cycle->connection_n / 8 - ngx_cycle->free_connection_n; // 获取socket读写指针 c = ngx_get_connection(s, ev->log);
if (c == NULL) { if (ngx_close_socket(s) == -1) { ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_socket_errno, ngx_close_socket_n " failed"); }
return; }
c->type = SOCK_STREAM;
#if (NGX_STAT_STUB) (void) ngx_atomic_fetch_add(ngx_stat_active, 1);#endif // 创建内存空间 c->pool = ngx_create_pool(ls->pool_size, ev->log); if (c->pool == NULL) { ngx_close_accepted_connection(c); return; }
if (socklen > (socklen_t) sizeof(ngx_sockaddr_t)) { socklen = sizeof(ngx_sockaddr_t); }
c->sockaddr = ngx_palloc(c->pool, socklen); if (c->sockaddr == NULL) { ngx_close_accepted_connection(c); return; }
ngx_memcpy(c->sockaddr, &sa, socklen);
log = ngx_palloc(c->pool, sizeof(ngx_log_t)); if (log == NULL) { ngx_close_accepted_connection(c); return; }
/* set a blocking mode for iocp and non-blocking mode for others */
if (ngx_inherited_nonblocking) { if (ngx_event_flags & NGX_USE_IOCP_EVENT) { if (ngx_blocking(s) == -1) { ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_socket_errno, ngx_blocking_n " failed"); ngx_close_accepted_connection(c); return; } }
} else { if (!(ngx_event_flags & NGX_USE_IOCP_EVENT)) { if (ngx_nonblocking(s) == -1) { ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_socket_errno, ngx_nonblocking_n " failed"); ngx_close_accepted_connection(c); return; } } }
*log = ls->log; // 创建各种上下文环境给到socket连接 c->recv = ngx_recv; c->send = ngx_send; c->recv_chain = ngx_recv_chain; c->send_chain = ngx_send_chain;
c->log = log; c->pool->log = log;
c->socklen = socklen; c->listening = ls; c->local_sockaddr = ls->sockaddr; c->local_socklen = ls->socklen;
#if (NGX_HAVE_UNIX_DOMAIN) if (c->sockaddr->sa_family == AF_UNIX) { c->tcp_nopush = NGX_TCP_NOPUSH_DISABLED; c->tcp_nodelay = NGX_TCP_NODELAY_DISABLED;#if (NGX_SOLARIS) /* Solaris's sendfilev() supports AF_NCA, AF_INET, and AF_INET6 */ c->sendfile = 0;#endif }#endif
rev = c->read; wev = c->write;
wev->ready = 1;
if (ngx_event_flags & NGX_USE_IOCP_EVENT) { rev->ready = 1; }
if (ev->deferred_accept) { rev->ready = 1;#if (NGX_HAVE_KQUEUE || NGX_HAVE_EPOLLRDHUP) rev->available = 1;#endif }
rev->log = log; wev->log = log;
/* * TODO: MT: - ngx_atomic_fetch_add() * or protection by critical section or light mutex * * TODO: MP: - allocated in a shared memory * - ngx_atomic_fetch_add() * or protection by critical section or light mutex */
c->number = ngx_atomic_fetch_add(ngx_connection_counter, 1);
#if (NGX_STAT_STUB) (void) ngx_atomic_fetch_add(ngx_stat_handled, 1);#endif
if (ls->addr_ntop) { c->addr_text.data = ngx_pnalloc(c->pool, ls->addr_text_max_len); if (c->addr_text.data == NULL) { ngx_close_accepted_connection(c); return; }
c->addr_text.len = ngx_sock_ntop(c->sockaddr, c->socklen, c->addr_text.data, ls->addr_text_max_len, 0); if (c->addr_text.len == 0) { ngx_close_accepted_connection(c); return; } }
#if (NGX_DEBUG) { ngx_str_t addr; u_char text[NGX_SOCKADDR_STRLEN];
ngx_debug_accepted_connection(ecf, c);
if (log->log_level & NGX_LOG_DEBUG_EVENT) { addr.data = text; addr.len = ngx_sock_ntop(c->sockaddr, c->socklen, text, NGX_SOCKADDR_STRLEN, 1);
ngx_log_debug3(NGX_LOG_DEBUG_EVENT, log, 0, "*%uA accept: %V fd:%d", c->number, &addr, s); }
}#endif
if (ngx_add_conn && (ngx_event_flags & NGX_USE_EPOLL_EVENT) == 0) { if (ngx_add_conn(c) == NGX_ERROR) { ngx_close_accepted_connection(c); return; } }
log->data = NULL; log->handler = NULL; // 处理就绪的io事件,读写事件,此处将会转到 http 模块处理 ls->handler(c);
if (ngx_event_flags & NGX_USE_KQUEUE_EVENT) { ev->available--; }
} while (ev->available);}
// http/ngx_http_request.c// 初始化socket连接, 接入 http模块voidngx_http_init_connection(ngx_connection_t *c){ ngx_uint_t i; ngx_event_t *rev; struct sockaddr_in *sin; ngx_http_port_t *port; ngx_http_in_addr_t *addr; ngx_http_log_ctx_t *ctx; ngx_http_connection_t *hc;#if (NGX_HAVE_INET6) struct sockaddr_in6 *sin6; ngx_http_in6_addr_t *addr6;#endif // 分配数据内存 hc = ngx_pcalloc(c->pool, sizeof(ngx_http_connection_t)); if (hc == NULL) { ngx_http_close_connection(c); return; }
c->data = hc;
/* find the server configuration for the address:port */
port = c->listening->servers;
if (port->naddrs > 1) {
/* * there are several addresses on this port and one of them * is an "*:port" wildcard so getsockname() in ngx_http_server_addr() * is required to determine a server address */
if (ngx_connection_local_sockaddr(c, NULL, 0) != NGX_OK) { ngx_http_close_connection(c); return; } // 根据网络类型处理 switch (c->local_sockaddr->sa_family) {
#if (NGX_HAVE_INET6) case AF_INET6: sin6 = (struct sockaddr_in6 *) c->local_sockaddr;
addr6 = port->addrs;
/* the last address is "*" */
for (i = 0; i < port->naddrs - 1; i++) { if (ngx_memcmp(&addr6[i].addr6, &sin6->sin6_addr, 16) == 0) { break; } }
hc->addr_conf = &addr6[i].conf;
break;#endif
default: /* AF_INET */ sin = (struct sockaddr_in *) c->local_sockaddr;
addr = port->addrs;
/* the last address is "*" */
for (i = 0; i < port->naddrs - 1; i++) { if (addr[i].addr == sin->sin_addr.s_addr) { break; } }
hc->addr_conf = &addr[i].conf;
break; }
} else {
switch (c->local_sockaddr->sa_family) {
#if (NGX_HAVE_INET6) case AF_INET6: addr6 = port->addrs; hc->addr_conf = &addr6[0].conf; break;#endif
default: /* AF_INET */ addr = port->addrs; hc->addr_conf = &addr[0].conf; break; } }
/* the default server configuration for the address:port */ hc->conf_ctx = hc->addr_conf->default_server->ctx;
ctx = ngx_palloc(c->pool, sizeof(ngx_http_log_ctx_t)); if (ctx == NULL) { ngx_http_close_connection(c); return; }
ctx->connection = c; ctx->request = NULL; ctx->current_request = NULL;
c->log->connection = c->number; // 每个http server 都有自己的日志记录控制 c->log->handler = ngx_http_log_error; c->log->data = ctx; c->log->action = "waiting for request";
c->log_error = NGX_ERROR_INFO;
rev = c->read; // 设置接收数据处理器为 ngx_http_wait_request_handler rev->handler = ngx_http_wait_request_handler; c->write->handler = ngx_http_empty_handler;
#if (NGX_HTTP_V2) if (hc->addr_conf->http2) { rev->handler = ngx_http_v2_init; }#endif
#if (NGX_HTTP_SSL) { ngx_http_ssl_srv_conf_t *sscf;
sscf = ngx_http_get_module_srv_conf(hc->conf_ctx, ngx_http_ssl_module);
if (sscf->enable || hc->addr_conf->ssl) { hc->ssl = 1; c->log->action = "SSL handshaking"; rev->handler = ngx_http_ssl_handshake; } }#endif
if (hc->addr_conf->proxy_protocol) { hc->proxy_protocol = 1; c->log->action = "reading PROXY protocol"; }
if (rev->ready) { /* the deferred accept(), iocp */
if (ngx_use_accept_mutex) { ngx_post_event(rev, &ngx_posted_events); return; }
rev->handler(rev); return; } // 将rev 放入到 ngx_event_timer_rbtree 队列中, 红黑树实现 ngx_add_timer(rev, c->listening->post_accept_timeout); // 重用 connection ngx_reusable_connection(c, 1); // 处理 读就绪事件,注册 read 监听 if (ngx_handle_read_event(rev, 0) != NGX_OK) { ngx_http_close_connection(c); return; }}
// event/ngx_event.c// 通用处理: 读事件逻辑ngx_int_tngx_handle_read_event(ngx_event_t *rev, ngx_uint_t flags){ if (ngx_event_flags & NGX_USE_CLEAR_EVENT) {
/* kqueue, epoll */
if (!rev->active && !rev->ready) { if (ngx_add_event(rev, NGX_READ_EVENT, NGX_CLEAR_EVENT) == NGX_ERROR) { return NGX_ERROR; } }
return NGX_OK;
} else if (ngx_event_flags & NGX_USE_LEVEL_EVENT) {
/* select, poll, /dev/poll */ if (!rev->active && !rev->ready) { // ngx_event_actions.add, 实际为 ngx_select_add_event // 注册读事件 if (ngx_add_event(rev, NGX_READ_EVENT, NGX_LEVEL_EVENT) == NGX_ERROR) { return NGX_ERROR; }
return NGX_OK; }
if (rev->active && (rev->ready || (flags & NGX_CLOSE_EVENT))) { if (ngx_del_event(rev, NGX_READ_EVENT, NGX_LEVEL_EVENT | flags) == NGX_ERROR) { return NGX_ERROR; }
return NGX_OK; }
} else if (ngx_event_flags & NGX_USE_EVENTPORT_EVENT) {
/* event ports */
if (!rev->active && !rev->ready) { if (ngx_add_event(rev, NGX_READ_EVENT, 0) == NGX_ERROR) { return NGX_ERROR; }
return NGX_OK; }
if (rev->oneshot && !rev->ready) { if (ngx_del_event(rev, NGX_READ_EVENT, 0) == NGX_ERROR) { return NGX_ERROR; }
return NGX_OK; } }
/* iocp */
return NGX_OK;}

大体上就是,先调用内核的accept() 方法,接入socket, 然后调用 http 模块init handler, 注册读事件, 以便后续可以读取数据。至于什么时候会进行真正地读数据请求,则不一定。

6. read 事件处理

经过前面的accept处理,nginx会注册read事件,且会将handler设置为 ngx_http_wait_request_handler, 当数据就绪后,就会从 通用处理队列 的入口处,转到http处理模块处理 io 事件。

// http/ngx_http_request.c// 处理socket读事件static voidngx_http_wait_request_handler(ngx_event_t *rev){    u_char                    *p;    size_t                     size;    ssize_t                    n;    ngx_buf_t                 *b;    ngx_connection_t          *c;    ngx_http_connection_t     *hc;    ngx_http_core_srv_conf_t  *cscf;
c = rev->data;
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0, "http wait request handler");
if (rev->timedout) { ngx_log_error(NGX_LOG_INFO, c->log, NGX_ETIMEDOUT, "client timed out"); ngx_http_close_connection(c); return; }
if (c->close) { ngx_http_close_connection(c); return; }
hc = c->data; cscf = ngx_http_get_module_srv_conf(hc->conf_ctx, ngx_http_core_module); // 默认1024 缓冲大小 size = cscf->client_header_buffer_size;
b = c->buffer; // 首次接入时,创建初始空间 if (b == NULL) { // 创建缓冲区接收http传过来的数据 b = ngx_create_temp_buf(c->pool, size); if (b == NULL) { ngx_http_close_connection(c); return; }
c->buffer = b;
} else if (b->start == NULL) { // 缓冲冲填满,需要另外增加空间? b->start = ngx_palloc(c->pool, size); if (b->start == NULL) { ngx_http_close_connection(c); return; }
b->pos = b->start; b->last = b->start; b->end = b->last + size; } // 接收数据 n = c->recv(c, b->last, size);
if (n == NGX_AGAIN) {
if (!rev->timer_set) { ngx_add_timer(rev, c->listening->post_accept_timeout); ngx_reusable_connection(c, 1); }
if (ngx_handle_read_event(rev, 0) != NGX_OK) { ngx_http_close_connection(c); return; }
/* * We are trying to not hold c->buffer's memory for an idle connection. */ // 如果还要等待更多数据,释放占有空间 if (ngx_pfree(c->pool, b->start) == NGX_OK) { b->start = NULL; }
return; }
if (n == NGX_ERROR) { ngx_http_close_connection(c); return; }
if (n == 0) { ngx_log_error(NGX_LOG_INFO, c->log, 0, "client closed connection"); ngx_http_close_connection(c); return; }
b->last += n; // 如果配置了 proxy_pass (且匹配了模式), 则直代理逻辑 if (hc->proxy_protocol) { hc->proxy_protocol = 0;
p = ngx_proxy_protocol_read(c, b->pos, b->last);
if (p == NULL) { ngx_http_close_connection(c); return; }
b->pos = p;
if (b->pos == b->last) { c->log->action = "waiting for request"; b->pos = b->start; b->last = b->start; ngx_post_event(rev, &ngx_posted_events); return; } }
c->log->action = "reading client request line"; // 设置不可重用连接 ngx_reusable_connection(c, 0); // 创建 http 连接请求, 分配内存空, 设置下一个 handler 等等 c->data = ngx_http_create_request(c); if (c->data == NULL) { ngx_http_close_connection(c); return; } // 设置读取数据的处理器为 ngx_http_process_request_line, 以便下次使用 rev->handler = ngx_http_process_request_line; ngx_http_process_request_line(rev);}

// http/ngx_http_request.c// 读取body数据,并响应客户端static voidngx_http_process_request_line(ngx_event_t *rev){ ssize_t n; ngx_int_t rc, rv; ngx_str_t host; ngx_connection_t *c; ngx_http_request_t *r;
c = rev->data; r = c->data;
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, rev->log, 0, "http process request line");
if (rev->timedout) { ngx_log_error(NGX_LOG_INFO, c->log, NGX_ETIMEDOUT, "client timed out"); c->timedout = 1; ngx_http_close_request(r, NGX_HTTP_REQUEST_TIME_OUT); return; }
rc = NGX_AGAIN;
for ( ;; ) {
if (rc == NGX_AGAIN) { // 读取header n = ngx_http_read_request_header(r);
if (n == NGX_AGAIN || n == NGX_ERROR) { break; } } // 读取body 数据, 按照http协议解析,非常长 rc = ngx_http_parse_request_line(r, r->header_in);
if (rc == NGX_OK) {
/* the request line has been parsed successfully */
r->request_line.len = r->request_end - r->request_start; r->request_line.data = r->request_start; r->request_length = r->header_in->pos - r->request_start;
ngx_log_debug1(NGX_LOG_DEBUG_HTTP, c->log, 0, "http request line: \"%V\"", &r->request_line);
r->method_name.len = r->method_end - r->request_start + 1; r->method_name.data = r->request_line.data;
if (r->http_protocol.data) { r->http_protocol.len = r->request_end - r->http_protocol.data; } // 处理 uri, 解析路径 if (ngx_http_process_request_uri(r) != NGX_OK) { break; }
if (r->schema_end) { r->schema.len = r->schema_end - r->schema_start; r->schema.data = r->schema_start; }
if (r->host_end) {
host.len = r->host_end - r->host_start; host.data = r->host_start;
rc = ngx_http_validate_host(&host, r->pool, 0);
if (rc == NGX_DECLINED) { ngx_log_error(NGX_LOG_INFO, c->log, 0, "client sent invalid host in request line"); ngx_http_finalize_request(r, NGX_HTTP_BAD_REQUEST); break; }
if (rc == NGX_ERROR) { ngx_http_close_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR); break; }
if (ngx_http_set_virtual_server(r, &host) == NGX_ERROR) { break; }
r->headers_in.server = host; }
if (r->http_version < NGX_HTTP_VERSION_10) {
if (r->headers_in.server.len == 0 && ngx_http_set_virtual_server(r, &r->headers_in.server) == NGX_ERROR) { break; }
ngx_http_process_request(r); break; }

if (ngx_list_init(&r->headers_in.headers, r->pool, 20, sizeof(ngx_table_elt_t)) != NGX_OK) { ngx_http_close_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR); break; }
c->log->action = "reading client request headers";
rev->handler = ngx_http_process_request_headers; ngx_http_process_request_headers(rev);
break; }
if (rc != NGX_AGAIN) {
/* there was error while a request line parsing */
ngx_log_error(NGX_LOG_INFO, c->log, 0, ngx_http_client_errors[rc - NGX_HTTP_CLIENT_ERROR]);
if (rc == NGX_HTTP_PARSE_INVALID_VERSION) { ngx_http_finalize_request(r, NGX_HTTP_VERSION_NOT_SUPPORTED);
} else { ngx_http_finalize_request(r, NGX_HTTP_BAD_REQUEST); }
break; }
/* NGX_AGAIN: a request line parsing is still incomplete */
if (r->header_in->pos == r->header_in->end) {
rv = ngx_http_alloc_large_header_buffer(r, 1);
if (rv == NGX_ERROR) { ngx_http_close_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR); break; }
if (rv == NGX_DECLINED) { r->request_line.len = r->header_in->end - r->request_start; r->request_line.data = r->request_start;
ngx_log_error(NGX_LOG_INFO, c->log, 0, "client sent too long URI"); ngx_http_finalize_request(r, NGX_HTTP_REQUEST_URI_TOO_LARGE); break; } } } // 处理请求, 响应客户端 ngx_http_run_posted_requests(c);}
// http/ngx_http_request.c// 已经处理好的请求处理voidngx_http_run_posted_requests(ngx_connection_t *c){ ngx_http_request_t *r; ngx_http_posted_request_t *pr; // 循环处理数据,直到完成 for ( ;; ) {
if (c->destroyed) { return; }
r = c->data; pr = r->main->posted_requests;
if (pr == NULL) { return; }
r->main->posted_requests = pr->next;
r = pr->request;
ngx_http_set_log_request(c->log, r);
ngx_log_debug2(NGX_LOG_DEBUG_HTTP, c->log, 0, "http posted request: \"%V?%V\"", &r->uri, &r->args); // 写客户端 r->write_event_handler(r); }}

以上就是一个简单视角的 http 请求的处理大体流程了。从中我们大概也理解了,nginx的处理逻辑,和我们想像的方案并没有太大差别,先读取url请求,判断是否特殊转发设置,读取body数据,如果没有特殊设置则定位到相应文件直接响应客户端。(具体如何响应,我们后续再说)

本篇主要站在一个全局的角度,整体上理解nginx的处理请求流程,希望对大家理解nginx有一定的帮助。当然有很多的细节还未厘清,敬请期待。


往期精彩推荐



腾讯、阿里、滴滴后台面试题汇总总结 — (含答案)

面试:史上最全多线程面试题 !

最新阿里内推Java后端面试题

JVM难学?那是因为你没认真看完这篇文章


END


关注作者微信公众号 —《JAVA烂猪皮》


了解更多java后端架构知识以及最新面试宝典


你点的每个好看,我都认真当成了


看完本文记得给作者点赞+在看哦~~~大家的支持,是作者源源不断出文的动力

作者:等你归去来

出处:https://www.cnblogs.com/yougewe/p/13659987.html

浏览 69
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报