写在文章开头
我们都知道解决c10k
问题的最好方案就是通过在io多路复用
的基础上通过reactor
模型实现高性能的网络并发程序,借助这个设计,redis的主线程也是基于io多路复用
以reactor
模型的思路实现了一个高性能的单线程内存数据,本文将带领读者从源码的角度来查看redis
关于reactor
模型的设计。
详解redis中的reactor模型
reactor模型扫盲
在此之前我们先来了解一下reactor
模型,在高性能网络并发程序的设计中,reactor
模型通过reactor
接收用户连接事件、读事件、写事件这些网络事件,得到连接事件之后通过acceptor
为其分配handler
,后续的这些客户端的读写事件都会交由handler
完成读写事件的处理,由此实现尽可能少的线程处理尽可能多的连接。
详解reactor的实现
上文我们简单的对reactor
模型进行了简单的扫盲,接下来我们将从redis
的源码来了解redis
对于reactor
模型的实现,我们都知道reactor
模型是通过reactor接收连接、读、写三种事件的,这一点我们可以直接在main
方法看到aemain
的调用,该方法内部本质就是通过epoll模型进行非阻塞获取就的网络事件:
int main(int argc, char **argv) { //前置初始化步骤 //...... //事件循环轮询前置操作 aesetbeforesleepproc(server.el,beforesleep); //执行事件驱动框架,循环处理各种触发的事件 aemain(server.el); //事件循环后置操作 aedeleteeventloop(server.el); return 0; }
我们步入aemain
方法,可以看到只要eventloop
没有停止就会无限循环调用aeprocessevents
获取并处理就绪的事件:
void aemain(aeeventloop *eventloop) { eventloop->stop = 0; while (!eventloop->stop) { //...... //轮询并处理就绪的事件 aeprocessevents(eventloop, ae_all_events); } }
步入aeprocessevents
方法,我们就可以看到redis
通过对于epoll
的封装函数aeapipoll
非阻塞获取就绪的io事件
,注意笔者所强调的非阻塞获取,这也就是为什么redis仅仅用一个主线程即可实现reactor模型的原因所在。
int aeprocessevents(aeeventloop *eventloop, int flags) { //...... //非阻塞获取就绪事件 numevents = aeapipoll(eventloop, tvp); for (j = 0; j < numevents; j++) { //...... //处理事件 processed++; } } /* check time events */ if (flags & ae_time_events) processed += processtimeevents(eventloop); return processed; /* return the number of processed file/time events */ }
对此我们再次步入aeapipoll
实现可以看到redis
对于epoll
的调用epoll_wait
,得到事件数retval
之后,直接基于retval
遍历eventloop
的events
这里面存储的就是所有收到的事件aefiredevent
,redis
会根据其事件类型累加对应的事件mask
值,例如如果是得到的事件类型是epollin
则mask值会加上ae_readable
(1),若是标准输出事件epollout
则累加ae_writable
即2:
对应的我们给出这段基于epoll
实现reacor
的实现,可以看到其reactor
通过事件轮询获取对应的事件类型再将其封装为aefileevent
存到事件数组eventloop->fired
中:
static int aeapipoll(aeeventloop *eventloop, struct timeval *tvp) { aeapistate *state = eventloop->apidata; int retval, numevents = 0; retval = epoll_wait(state->epfd,state->events,eventloop->setsize, tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1); if (retval > 0) { int j; numevents = retval; //遍历事件 for (j = 0; j < numevents; j++) { int mask = 0; struct epoll_event *e = state->events+j; //根据事件类型累加读写的mask值 if (e->events & epollin) mask |= ae_readable; if (e->events & epollout) mask |= ae_writable; if (e->events & epollerr) mask |= ae_writable; if (e->events & epollhup) mask |= ae_writable; //将该事件存到fired数组中 eventloop->fired[j].fd = e->data.fd; eventloop->fired[j].mask = mask; } } //返回事件数 return numevents; }
详解事件的封装
上文我们提到一个aefileevent
事件的概念,该个事件结构如下图所示,它通过mask
标记当前io事件类型,在epoll
轮询到事件时,它并通过rfileproc
读事件处理指针和wfileproc
写文件处理保存针对网络io事件
的处理函数,注意这个处理函数我们完全可以直接理解为reactor
模型中的handler
,最后用clientdata
记录客户端私有数据的指针:
typedef struct aefileevent { //记录事件读写类型,如果是读事件readable则mask+1,若是写事件writable则加2 int mask; /* one of ae_(readable|writable) */ //读事件处理器指针指向读事件处理函数handler aefileproc *rfileproc; //写事件处理器指针指向读事件处理函数handler aefileproc *wfileproc; //记录客户端私有数据指针 void *clientdata; } aefileevent;
这里我们以服务端socket
初始化阶段为例展示一下aefileevent
对应处理器的初始化过程,我们在redis
服务端启动的main
函数可以看到initserver
的调用,该方法会为当前服务端socket套接字的文件描述符绑定读事件的处理器accepttcphandler
:
对应的我们给出这一段事件绑定handler
的逻辑的核心代码段:
int main(int argc, char **argv) { //...... //server初始化,其内部会完成数据结构、键值对数据库初始化、网络框架初始化工作 initserver(); } void initserver(void) { //...... for (j = 0; j < server.ipfd_count; j++) { //为每一个监听服务端socket的读事件绑定对应的tcp处理器accepttcphandler,并将其注册到eventloop中 if (aecreatefileevent(server.el, server.ipfd[j], ae_readable, accepttcphandler,null) == ae_err) { redispanic( "unrecoverable error creating server.ipfd file event."); } } //...... }
轮询并分发到handler
上述步骤完成redis server
的事件注册之后,main
方法的aemain
函数就会通过epoll
轮询eventloop
中是否有就绪的io事件,如果redis server
的fd
的读事件就绪就会交给当前对应的读处理器完成redis
客户端初始化工作,后续redis
客户端套接字的fd
也会将读写事件注册到eventloop
中,如此一来所有的服务端和客户端socket
的读写事件都会注册到epoll
上,让epoll
作为reactor
进行轮询,然后根据读写事件分配到各自的handler
即rfileproc/wfileproc
指针所指向的函数上。
这里我们补充的一下rfileproc/wfileproc
指针指向的函数列表:
rfileproc
:如果是redis
服务端则该指针指向accepttcphandler
处理新连接,如果是客户端则指向readqueryfromclient
处理客户端的命令。wfileproc
:该指针服务端和客户端都一样,指向sendreplytoclient
用于将响应结果发送给客户端。
对应的我们给出上述描述的核心代码段,可以看到main
方法会调用aemain
开始事件轮询:
int main(int argc, char **argv) { //前置初始化步骤 //...... //事件循环轮询前置操作 aesetbeforesleepproc(server.el,beforesleep); //执行事件驱动框架,循环处理各种触发的事件 aemain(server.el); //事件循环后置操作 aedeleteeventloop(server.el); return 0; }
步入aemain
即可看到无限循环传入eventloop
查看是否有就绪的事件:
void aemain(aeeventloop *eventloop) { eventloop->stop = 0; while (!eventloop->stop) { //...... //传入eventloop查看是否有socket的事件就绪 aeprocessevents(eventloop, ae_all_events); } }
继续步入aeprocessevents
即看到轮询就绪事件、acceptor
调用accepttcphandler
分发到读写的处理器handler
上、后续客户端都会基于读写handler
完成事件处理这样一套核心的reactor
模型设计:
int aeprocessevents(aeeventloop *eventloop, int flags) { //...... //调用epoll获取所有就绪的socket的读写事件 numevents = aeapipoll(eventloop, tvp); for (j = 0; j < numevents; j++) { //获取当前事件的读写类型为mask赋值 aefileevent *fe = &eventloop->events[eventloop->fired[j].fd]; int mask = eventloop->fired[j].mask; int fd = eventloop->fired[j].fd; int rfired = 0; //如果是读事件则交给rfileproc指向的函数,可以是服务端socket的连接处理器accepttcphandler,也可能是客户端的命令处理器readqueryfromclient if (fe->mask & mask & ae_readable) { rfired = 1; fe->rfileproc(eventloop,fd,fe->clientdata,mask); } //如果是写事件则调用wfileproc指向的sendreplytoclient将结果发送给客户端 if (fe->mask & mask & ae_writable) { if (!rfired || fe->wfileproc != fe->rfileproc) fe->wfileproc(eventloop,fd,fe->clientdata,mask); } processed++; } } //...... }
小结
自此我们将redis单线程的reactor模型设计都分析完成了,希望对你有帮助。
到此这篇关于redis所实现的reactor模型的文章就介绍到这了,更多相关redis reactor模型内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论