要说明的几点
一、为了方便调试及跟踪代码所以采用了单进程的模式运行程序
二、自备一份源码
三、在阅读函数之前,我觉得要带有几个问题去看
- 事件循环什么时候开始?
- 怎么添加事件?
- 事件如何分发处理?
一、事件循环的创建
当我们启动程序,会进入下面函数,顾名思义就是处理单进程循环的函数
void ngx_single_process_cycle(ngx_cycle_t *cycle) { ngx_uint_t i; if (ngx_set_environment(cycle, null) == null) { /* fatal */ exit(2); } //调用每个模块的初始化 for (i = 0; cycle->modules[i]; i++) { if (cycle->modules[i]->init_process) { if (cycle->modules[i]->init_process(cycle) == ngx_error) { /* fatal */ exit(2); } } } for ( ;; ) { ngx_log_debug0(ngx_log_debug_event, cycle->log, 0, "worker cycle"); //处理定时器事件及网络事件 ngx_process_events_and_timers(cycle); ... } }
在调用模块的初始化时就会调用ngx_event_core_module模块的ngx_event_process_init函数
ngx_event_process_init实现如下,隐藏部分不需要的代码过程
static ngx_int_t ngx_event_process_init(ngx_cycle_t *cycle) { ... for (m = 0; cycle->modules[m]; m++) { //只有ngx_epoll_module是ngx_event_module类型,epoll模块 if (cycle->modules[m]->type != ngx_event_module) { continue; } if (cycle->modules[m]->ctx_index != ecf->use) { continue; } module = cycle->modules[m]->ctx; //初始化epoll ,创建epoll if (module->actions.init(cycle, ngx_timer_resolution) != ngx_ok) { /* fatal */ exit(2); } break; } }
当类型是ngx_event_module的模块只有ngx_epoll_module和ngx_event_core_module,而只有ngx_epoll_module模块的actions.init接口不为空,因此会调用ngx_epoll_init,在该函数里面掉了epoll_create 创建了epoll,并将值赋给全局的ep
static int ep = -1;
二、连接的创建
2.1 创建管理连接及其读写事件的数组
依然在ngx_event_process_init函数中,数组大小为配置文件中 worker_connections 的值
static ngx_int_t ngx_event_process_init(ngx_cycle_t *cycle) { ... cycle->connections = ngx_alloc(sizeof(ngx_connection_t) * cycle->connection_n, cycle->log); if (cycle->connections == null) { return ngx_error; } c = cycle->connections; //分配读事件 cycle->read_events = ngx_alloc(sizeof(ngx_event_t) * cycle->connection_n, cycle->log); if (cycle->read_events == null) { return ngx_error; } rev = cycle->read_events; for (i = 0; i < cycle->connection_n; i++) { rev[i].closed = 1; rev[i].instance = 1; } //分配写事件 cycle->write_events = ngx_alloc(sizeof(ngx_event_t) * cycle->connection_n, cycle->log); if (cycle->write_events == null) { return ngx_error; } wev = cycle->write_events; for (i = 0; i < cycle->connection_n; i++) { wev[i].closed = 1; } }
2.2 创建一个空闲连接的链表
依然在ngx_event_process_init函数中,表示没有使用的连接
static ngx_int_t ngx_event_process_init(ngx_cycle_t *cycle) { ... do { i--; c[i].data = next; c[i].read = &cycle->read_events[i]; c[i].write = &cycle->write_events[i]; c[i].fd = (ngx_socket_t) -1; next = &c[i]; } while (i); cycle->free_connections = next; cycle->free_connection_n = cycle->connection_n; ... }
此时数据结构如下
2.3 遍历所有的cycle->listening元素,并为其添加读事件
static ngx_int_t ngx_event_process_init(ngx_cycle_t *cycle) { ... ls = cycle->listening.elts; for (i = 0; i < cycle->listening.nelts; i++) { ... rev->handler = (c->type == sock_stream) ? ngx_event_accept : ngx_event_recvmsg; ... if (ngx_add_event(rev, ngx_read_event, 0) == ngx_error) { return ngx_error; } ... } ... }
这里特别的说明下cycle->listening在哪里设置?设置了什么信息?后面会有用。
三、http服务器端口的设置
在进入事件循环之前,nginx会先对配置文件做解析处理,也就是ngx_conf_parse中,当解析到http配置项时,会调用ngx_http_module模块的配置项处理函数ngx_http_block,在其中做了很多http相关的设置
3.1 cycle->listening元素的添加
函数调用过程如下:
ngx_http_block -> ngx_http_add_listening ->ngx_create_listening
最终调用ngx_create_listening
static ngx_listening_t * ngx_http_add_listening(ngx_conf_t *cf, ngx_http_conf_addr_t *addr) { ngx_listening_t *ls; ... ls = ngx_create_listening(cf, addr->opt.sockaddr, addr->opt.socklen); if (ls == null) { return null; } ls->addr_ntop = 1; //设置ngx_listening_t的 接口handler ls->handler = ngx_http_init_connection; ... }
该函数做了2件事
- 将监听信息放入cycle->listening数组
- 设置ngx_listening_t的 接口handler为ngx_http_init_connection(后面会用到)
下面是ngx_create_listening 函数,用来说明1
ngx_listening_t * ngx_create_listening(ngx_conf_t *cf, struct sockaddr *sockaddr, socklen_t socklen) { ... ls = ngx_array_push(&cf->cycle->listening); ... }
3.2 开启了服务器端口的监听
调用了ngx_open_listening_sockets函数,函数调用过程如下:
ngx_init_cycle -> ngx_open_listening_sockets
看到bind 和listen就应该很快的反应过来这是一个服务器端的常用代码,所以是在这里开启了服务器端口的监听
ngx_int_t ngx_open_listening_sockets(ngx_cycle_t *cycle) { ... for (i = 0; i < cycle->listening.nelts; i++) { if (bind(s, ls[i].sockaddr, ls[i].socklen) == -1) { ... } if (listen(s, ls[i].backlog) == -1) { ... } } ... }
至此,服务器监听设置已经完成了,同时每个需要监听的端口到放入了cycle->listening。
我们再回到2.3,epoll开始将监听端口的连接放入了epoll管理,当客户端来连接请求时,就会触发epoll读事件,这里要注意的是,其中rev代表的是nginx定义的事件类。
rev->handler = (c->type == sock_stream) ? ngx_event_accept: ngx_event_recvmsg;
四、执行事件循环
让我们继续回到ngx_single_process_cycle函数,当模块调用完init_process后,开始进入了一个死循环的过程,显然这里应该会是一个事件循环入口,而我们使用epoll,那代码最终肯定会定格到 epoll_wait,那接下来的目的就是找到这个代码段,继续往下看代码
for ( ;; ) { ngx_process_events_and_timers(cycle); }
ngx_process_events_and_timers函数如下:其中ngx_process_events 就是ngx_epoll_process_events函数
void ngx_process_events_and_timers(ngx_cycle_t *cycle) { ... //epoll 等待 (void) ngx_process_events(cycle, timer, flags); delta = ngx_current_msec - delta; ngx_log_debug1(ngx_log_debug_event, cycle->log, 0, "timer delta: %m", delta); ngx_event_process_posted(cycle, &ngx_posted_accept_events); if (ngx_accept_mutex_held) { ngx_shmtx_unlock(&ngx_accept_mutex); } ngx_event_expire_timers(); ngx_event_process_posted(cycle, &ngx_posted_events); }
ngx_epoll_process_events函数如下,去掉了大部分跟逻辑无关的代码,做了以下工作
- epoll_wait等待所有io事件来临
- 遍历触发的事件列表,调用其handler接口,在这里因为我们是单进程,固没有进程锁,flags 始终为0
static ngx_int_t ngx_epoll_process_events(ngx_cycle_t *cycle, ngx_msec_t timer, ngx_uint_t flags) { events = epoll_wait(ep, event_list, (int) nevents, timer); ... for (i = 0; i < events; i++) { c = event_list[i].data.ptr; instance = (uintptr_t) c & 1; c = (ngx_connection_t *) ((uintptr_t) c & (uintptr_t) ~1); rev = c->read; revents = event_list[i].events; if ((revents & epollin) && rev->active) { rev->ready = 1; rev->available = -1; if (flags & ngx_post_events) { queue = rev->accept ? &ngx_posted_accept_events : &ngx_posted_events; ngx_post_event(rev, queue); } else { rev->handler(rev); } } wev = c->write; if ((revents & epollout) && wev->active) { if (flags & ngx_post_events) { ngx_post_event(wev, &ngx_posted_events); } else { wev->handler(wev); } } } return ngx_ok; }
这就很明显了,epoll事件循环体就在此,根据事件的触发方式来区分读写事件,因为flags为0,所以这里对事件的处理都是调用了nginx定义的事件结构体ngx_event_t的handler接口,而不会调用ngx_post_event,因此接下来根据handler接口实现的不同来说明连接的建立过程及读写事件的处理
五、客户端的连接的接入
先引入3.2节中描述的代码,其中 c->type 就是sock_stream ,在其他地方赋值了,不在过多描述
rev->handler = (c->type == sock_stream) ? ngx_event_accept: ngx_event_recvmsg;
所以调用了ngx_event_accept,那该函数做了哪些事?下面贴出一段代码来说明(忽略一些socket 的配置代码)
void ngx_event_accept(ngx_event_t *ev) { ngx_listening_t *ls; ngx_connection_t *c, *lc; ... lc = ev->data; ls = lc->listening; ... c = ngx_get_connection(s, ev->log); ... c->recv = ngx_recv; c->send = ngx_send; c->recv_chain = ngx_recv_chain; c->send_chain = ngx_send_chain; c->log = log; c->pool->log = log; c->socklen = socklen; c->listening = ls; c->local_sockaddr = ls->sockaddr; c->local_socklen = ls->socklen; ... if (ngx_add_conn && (ngx_event_flags & ngx_use_epoll_event) == 0) { if (ngx_add_conn(c) == ngx_error) { ngx_close_accepted_connection(c); return; } } ls->handler(c); ... }
5.1 取出一个未使用的连接,并将该连接纳入epoll去管理
从连接池中拿出一个连接,在调用ngx_add_conn 宏,此时该连接已经将socket的描述符fd设置进去了其中ngx_add_conn 定义如下
#define ngx_add_conn ngx_event_actions.add_conn
而ngx_event_actions是一个全局的变量,在ngx_epoll_init函数 赋值
static ngx_int_t ngx_epoll_init(ngx_cycle_t *cycle, ngx_msec_t timer) { ... ngx_event_actions = ngx_epoll_module_ctx.actions; ... }
因此ngx_add_conn就是ngx_epoll_add_connection函数,就是将客户端的连接纳入epoll的管理,如下
static ngx_int_t ngx_epoll_add_connection(ngx_connection_t *c) { struct epoll_event ee; ee.events = epollin|epollout|epollet|epollrdhup; ee.data.ptr = (void *) ((uintptr_t) c | c->read->instance); ngx_log_debug2(ngx_log_debug_event, c->log, 0, "epoll add connection: fd:%d ev:%08xd", c->fd, ee.events); if (epoll_ctl(ep, epoll_ctl_add, c->fd, &ee) == -1) { ngx_log_error(ngx_log_alert, c->log, ngx_errno, "epoll_ctl(epoll_ctl_add, %d) failed", c->fd); return ngx_error; } c->read->active = 1; c->write->active = 1; return ngx_ok; }
注意看,在上面函数中,epoll事件的data已经指向了 nginx里面定义的连接类ngx_connection_t 了,这也就将epoll中的事件与nginx定义的ngx_connection_t 关联起来了
至此,epoll开始循环监听每一个连接的事件了。
5.2 设置了连接的发送和接收函数
c->recv = ngx_recv; c->send = ngx_send;
这里的ngx_recv
//recv 函数宏定义 #define ngx_recv ngx_io.recv //ngx_io为全局定义 ngx_os_io_t ngx_io; //被该结构体赋值,接收发送函数就很明显了 ngx_os_io_t ngx_os_io = { ngx_unix_recv, ngx_readv_chain, ngx_udp_unix_recv, ngx_unix_send, ngx_udp_unix_send, ngx_udp_unix_sendmsg_chain, ngx_writev_chain, 0 }; ngx_unix_recv
5.3 调用监听的连接类的接口handler
这个干了什么?有点懵。往前看 3.1节,在解析配置文件时,设置了该接口,其实就是ngx_http_init_connection函数,在其中设置了 客户端连接类的读写事件接口分别是ngx_http_wait_request_handler 以及ngx_http_empty_handler,就不展开了
void ngx_http_init_connection(ngx_connection_t *c) { rev = c->read; rev->handler = ngx_http_wait_request_handler; c->write->handler = ngx_http_empty_handler; }
总结
一:解析配置文件,确认需要监听的端口(配置信息),也就是需要listen几个fd,同时设置该监听端口的handler接口,将需要listen的端口信息放入cycle->listen,并设置了监听接口体的接口为ngx_http_init_connection
二:调用epoll模块的接口,初始化及创建epoll
三:将需要监听的端口(一种描述的)纳入epoll管理,同时设置其回调接口ngx_event_accept
四:当检测到客户端的连接调用ngx_event_accept,在该函数中将客户端的连接又纳入epoll管理(其实这是epoll模型的一种常用的写法),同时调用了ngx_http_init_connection函数,该函数设置了客户端连接的读写事件处理接口handler
以上为个人经验,希望能给大家一个参考,也希望大家多多支持代码网。
发表评论