处理流程对比
在进行了解fasthttp底层代码实现之前,我们先对两者处理请求的方式进行一个回顾和对比,了解完两者的基本的情况之后,再对fasthttp的实现最进一步分析。
net/http处理流程
在小许文章《图文讲透golang标准库 net/http实现原理 -- 服务端》中讲的比较详细了,这里再把大致流程整理以下,整体流程如下:
1. 将路由和对应的handler注册到一个 map 中,用做后续键值路由匹配
2. 注册完之后就是开启循环监听连接,每获取到一个连接就会创建一个 goroutine进行处理
3. 在创建好的 goroutine 里面会循环的等待接收请求数据,然后根据请求的地址去键值路由map中匹配对应的handler
4. 执行匹配到的处理器handler
net/http 的实现是一个连接新建一个 goroutine,如果在连接数非常多的时候,,每个连接都会创建一个 goroutine 就会给系统带来一定的压力。这也就造成了 net/http在处理高并发时的瓶颈。
每次来了一个连接,都要实例化一个连接对象,这谁受得了,哈哈
fasthttp处理流程
再看看fasthttp处理请求的流程:
1. 启动监听
2. 循环监听端口获取连接,建立workerpool
3. 循环尝试获取连接 net.conn,先会去 ready 队列里获取 workerchan,获取不到就会去对象池获取
4. 将获取到的的连接net.conn 发送到 workerchan 的 channel 中
5. 开启一个 goroutine 一直循环获取 workerchan 这个 channel 中的数据
6. 获取到channel中的net.conn之后就会对请求进行处理
workerchan 其实就是一个连接处理对象,这个对象里面有一个 channel 用来传递连接;每个 workerchan 在后台都会有一个 goroutine 循环获取 channel 中的连接,然后进行处理。
workerchan是在workerpool临时对象分别存取
fasthttp为什么快
fasthttp的优化主要有以下几个点:
• 连接复用,如slice中有可复用的workerchan就从ready这个slice中获取,没有可复用的就在workerchanpool创建一个,万一池子满了(默认是 256 * 1024个)就报错。
• 对于内存复用,就是大量使用了sync.pool(你知道的,sync.pool复用对象有啥好处),有人统计过,用了整整30个sync.pool,context、request对象、header、response对象都用了sync.pool ....
• 利用unsafe.pointer指针进行[]byte 和 string 转换,避免[]byte到string转换时带来的内存分配和拷贝带来的消耗 。
知道了fasthttp为什么快,接下来我们看下它是如何处理监听处理请求的,在哪些地方用到了这些特性。
底层实现
简单案例
import ( "github.com/buaazp/fasthttprouter" "github.com/valyala/fasthttp" "log" ) func main() { //创建路由 r := fasthttprouter.new() r.get("/", index) if err := fasthttp.listenandserve(":8083", r.handler); err != nil { log.fatalf("listenandserve fatal: %s", err) } } func index(ctx *fasthttp.requestctx) { ctx.writestring("hello xiaou code!") }
这个案例同样是几样代码就启动了一个服务。
创建路由、为不同的路由执行关联不同的处理函数handler,接着跟net/http一样调用 listenandserve 函数进行启动服务监听,等待请求进行处理。
workerpool结构
workerpool 对象表示 连接处理 工作池,这样可以控制连接建立后的处理方式,而不是像标准库 net/http 一样,对每个请求连接都启动一个 goroutine 处理, 内部的 ready 字段存储空闲的 workerchan 对象,workerchanpool 字段表示管理 workerchan 的对象池。
workerpool结构体如下:
type workerpool struct { //匹配请求对应的handler workerfunc servehandler //最大同时处理的请求数 maxworkerscount int logallerrors bool //最大空闲工作时间 maxidleworkerduration time.duration logger logger //互斥锁 lock sync.mutex //work数量 workerscount int muststop bool // 空闲的 workerchan ready []*workerchan //是否关闭workerpool stopch chan struct{} //sync.pool workerchan 的对象池 workerchanpool sync.pool connstate func(net.conn, connstate) }
workerfunc :这个属性挺重要的,因为给它赋值的是server.serveconn
ready:存储了空闲的workerchan
workerchanpool:是workerchan 的对象池,在sync.pool中存取临时对象,可减少内存分配
启动服务
listenandserve是启动服务监听的入口,内部的调用过程如下:
server.serve
serve方法为来自给监听到的连接提供处理服务,直到超过了最大限制(256 * 1024)才会报错。
func (s *server) serve(ln net.listener) error { //最大连接处理数 maxworkerscount := s.getconcurrency() s.mu.lock() s.ln = append(s.ln, ln) if s.done == nil { s.done = make(chan struct{}) } if s.concurrencych == nil { s.concurrencych = make(chan struct{}, maxworkerscount) } s.mu.unlock() //workerpool进行初始化 wp := &workerpool{ workerfunc: s.serveconn, maxworkerscount: maxworkerscount, logallerrors: s.logallerrors, maxidleworkerduration: s.maxidleworkerduration, logger: s.logger(), connstate: s.setstate, } //开启协程,处理协程池的清理工作 wp.start() atomic.addint32(&s.open, 1) defer atomic.addint32(&s.open, -1) for { // 阻塞等待,获取连接net.conn if c, err = acceptconn(s, ln, &lastperiperrortime); err != nil { ... return err } s.setstate(c, statenew) atomic.addint32(&s.open, 1) //处理获取到的连接net.conn if !wp.serve(c) { //未能处理,说明已达到最大worker限制 ... } c = nil } }
从上面的注释中我们可以看出 server 方法主要做了以下几件事:
1. 初始化 worker pool,并启动
2. net.listener循环接收请求
3. 将接收到的请求交给workerchan 处理
注意:这里如果超过了设定的最大连接数(默认是 256 * 1024个)就直接报错了
start开启协程池
workerpool进行初始化之后接着就调用start开启,这里主要是指定sync.pool变量workerchanpool的创建函数。
接着开启一个协程,该goroutine的目的是进行定时清理 workerpool 中的 ready 中保存的空闲 workerchan,清理频率为每 10s 启动一次。
清理规则是使用二进制搜索算法找出最近可以清理的工作者的索引
func (wp *workerpool) start() { //wp的关闭channel是否为空 if wp.stopch != nil { return } wp.stopch = make(chan struct{}) stopch := wp.stopch //指定workerchanpool的创建函数 wp.workerchanpool.new = func() interface{} { return &workerchan{ ch: make(chan net.conn, workerchancap), } } //开启协程 go func() { var scratch []*workerchan for { //清理空闲超时的 workerchan wp.clean(&scratch) select { case <-stopch: return default: // 间隔10 s time.sleep(wp.getmaxidleworkerduration()) } } }() }
开启一个清理goroutine的目的是为了避免在流量高峰创建了大量协程,之后不再使用,造成协程浪费。
清理流程是在wp.clean()方法中实现的。
接收连接
acceptconn函数通过调用net.listener的accept方法去接受连接,这里获取连接的方式跟net/http调用的其实都是一样的。
func acceptconn(s *server, ln net.listener, lastperiperrortime *time.time) (net.conn, error) { for { c, err := ln.accept() if err != nil { //err判断 ... } //校验是否net.tcpconn连接 // 校验每个ip对应的连接数 if s.maxconnsperip > 0 { pic := wrapperipconn(s, c) if pic == nil { ... continue } c = pic } return c, nil } }
获取 workerchan
func (wp *workerpool) serve(c net.conn) bool { //获取 workerchan ch := wp.getch() if ch == nil { return false } //将连接放到channel中 ch.ch <- c //返回true return true }
这里调用的getch()函数实现了获取workerchan,获取到之后将之前接受的连接net.conn放到workerchan结构体的channel通道中。
我们看下workerchan这个结构体
type workerchan struct { lastusetime time.time ch chan net.conn }
lastusetime:最后一次被使用的时间,这个值在进行清理workerchan的时候是会用到的
ch:用来传递获取到的连接net.conn,获取到连接时接收,处理请求时获取
getch方法:
func (wp *workerpool) getch() *workerchan { var ch *workerchan createworker := false wp.lock.lock() //从ready队列中拿workerchan ready := wp.ready n := len(ready) - 1 if n < 0 { if wp.workerscount < wp.maxworkerscount { createworker = true wp.workerscount++ } } else { //ready队列不为空,从队尾拿workerchan ch = ready[n] //队尾置为nil ready[n] = nil //重新将ready赋值给wp.ready wp.ready = ready[:n] } wp.lock.unlock() //ready中获取不到workerchan,则从对象池中新建一个 if ch == nil { if !createworker { return nil } vch := wp.workerchanpool.get() ch = vch.(*workerchan) //开启一个goroutine执行 go func() { //处理ch中channel中的数据 wp.workerfunc(ch) //处理完后将workerchan放回对象池 wp.workerchanpool.put(vch) }() } return ch }
getch()方法的目的就是获取workerchan,流程如下:
• 先会去 ready 空闲队列中获取 workerchan
• ready 获取不到则从对象池中创建一个新的 workerchan
• 并启动 goroutine 用来处理 channel 中的数据
workpool中的ready是一个filo的栈,每次从队尾取出workchan
处理连接
func (wp *workerpool) workerfunc(ch *workerchan) { var c net.conn var err error for c = range ch.ch { //channel的值是nil,退出 if c == nil { break } //执行请求,并处理 if err = wp.workerfunc(c); err != nil && err != errhijacked { ... } ... //将当前workerchan放入ready队列 if !wp.release(ch) { break } } wp.lock.lock() wp.workerscount-- wp.lock.unlock() }
执行流程
• 先遍历workerchan的channel,看是否有连接net.conn
• 获取到连接之后就执行workerfunc 函数处理请求
• 请求处理完之后将当前workerchan放入ready队列
workerfunc 函数实际上是 server 的 serveconn 方法
一开始开代码的时候我还没发现呢,细看了之后在server.serve()启动服务时将server.serveconn()方法赋值给了workerpool的workerfunc()。
要想了解实现的朋友可以搜下这方面的代码
func (s *server) serveconn(c net.conn) error { ... err := s.serveconn(c) ... }
里面的代码会比较多,不过里面的流程就是是获取到请求的参数,找到对应的 handler 进行请求处理,然后返回 响应给客户端。
这里的实现代码可以看到context、request对象的sync.pool实现,这里就不一一贴出来了。
总结
fasthttp和net/http在实现上还是有较大区别,通过对实现原理的分析,知道了fasthttp速度快是利用了大量sync.pool对象复用 、[]byte 和 string利用万能指针unsafe.pointer进行转换等优化技巧。
如果你的业务需要支撑较高的 qps 并且保持一致的低延迟时间,那么采用 fasthttp 是一个较好的选择。不过net/http兼容性更高,在多数情况下反而是更好的选择!
以上就是浅析go中fasthttp与net/http的性能对比及应用的详细内容,更多关于go fasthttp的资料请关注代码网其它相关文章!
发表评论