当前位置: 代码网 > it编程>前端脚本>Golang > 如何解析golang中Context在HTTP服务中的角色

如何解析golang中Context在HTTP服务中的角色

2024年05月15日 Golang 我要评论
问题背景在go语言的http服务中,我们常常会使用到context来取消一个请求,或者取消数据的读取。偶然的一次尝试,让我对context有了一定的兴趣。接下来本文围绕下面的例子,分析http如何利用

问题背景

在go语言的http服务中,我们常常会使用到context来取消一个请求,或者取消数据的读取。偶然的一次尝试,让我对context有了一定的兴趣。

接下来本文围绕下面的例子,分析http如何利用context来控制请求的取消和影响数据读取。

例子

我们开启一个http服务,发送大量数据给每个请求,代码如下:

srv.go:http服务

package main

import (
	"fmt"
	"net/http"
)

func hello(w http.responsewriter, r *http.request) {
	for i := 0; i < 100*10000; i++ {
		w.write([]byte("hello world"))
	}
}

func main() {
	fmt.println("listening 8888:")
	http.handlefunc("/hello", hello)
	_ = http.listenandserve(":8888", nil)
}

client.go: 发送请求的客户端

package main

import (
	"context"
	"fmt"
	"io"
	"log"
	"net/http"
	"time"
)

func main() {

	client := http.client{}
	request, err := http.newrequest(http.methodpost, "http://127.0.0.1:8888/hello", nil)
	ctx, cancelfunc := context.withcancel(request.context())
	request = request.withcontext(ctx)
	if err != nil {
		return
	}
	response, err := client.do(request)
	if err != nil {
		log.fatal(err)
	}
	cache := make([]byte, 128)
	timer := time.newtimer(time.millisecond)
	go func() {
		select {
		case <-timer.c:
			cancelfunc()
		}
	}()
	for {
		read, err := response.body.read(cache)
		if err == nil {
			fmt.println(string(cache[:read]))
			continue
		}
		if err == io.eof {
			fmt.println(string(cache[:read]))
			break
		}
		log.fatal(err)
	}

}

代码很简单,就不做注释啦。分别启动服务和client,我们将得到如下结果:

我们看到这句话process finished with the exit code 1,程序非正常退出,那么首先是追踪这个错误,下面我们追踪这个错误。

错误追踪

首先清楚这个“context canceled” 是客户端打印出来的:

log.fatal(err)
// 这个错误来源于读取response中的数据时得到错误,而且这个错误非io.eof错误

断点入口:

read, err := response.body.read(cache)

我们会进入transport.go文件中:

func (es *bodyeofsignal) read(p []byte) (n int, err error) { // 这里表明我们读取的body是bodyeofsignal类型
	es.mu.lock()
	closed, rerr := es.closed, es.rerr
	es.mu.unlock()
	if closed {
		return 0, errreadonclosedresbody
	}
	if rerr != nil {
		return 0, rerr
	}

	n, err = es.body.read(p)// 我们在这里读到了错误,这里是什么错误,在后面将会介绍
	if err != nil {
		es.mu.lock()
		defer es.mu.unlock()
		if es.rerr == nil {
			es.rerr = err
		}
		err = es.condfn(err) // 通过这个方法对错误进行判别,得到上层传下来的错误信息
	}
	return
}

然后我们继续进入到bodyeofsignal的condfn(error)函数中:

func (es *bodyeofsignal) condfn(err error) error {
	if es.fn == nil {
		return err //1
	}
	err = es.fn(err) // 如果fn不为空,这里会继续到bodyeofsignal去得到上层的错误信息;fn为空,显然错误和上层就没有关系,就在上面1处就返回了。除此,因为client从这个body读的数据,这里的错误是通过fn从上层获取。
	es.fn = nil
	return err
}

那我们继续到es.fn(err)中一探究竟:

body := &bodyeofsignal{
			body: resp.body,
			earlyclosefn: func() error {
				waitforbodyread <- false
				<-eofc // will be closed by deferred call at the end of the function
				return nil

			},
			fn: func(err error) error {// 就到了这里,这一段代码源自transport.go中的封装内部类persistconn的方法readloop,顾名思义:循环读取
			// 这里会简单的皮判断错误是不是io.eof,然后作进一步处理
				iseof := err == io.eof
				waitforbodyread <- iseof
				if iseof {
					<-eofc // see comment above eofc declaration
				} else if err != nil {
					if cerr := pc.canceled(); cerr != nil {// 继续调试我们就到了这里,显然不是io.eof错误
						return cerr // 返回的是pc.canceled()
					}
				}
				return err
			},
		}

继续到pc.canceled()中:

func (pc *persistconn) canceled() error {
	pc.mu.lock()
	defer pc.mu.unlock()
	return pc.cancelederr // 返回的这个错误,那么下一步便需要知道这个cancelederr是什么?如何被赋值?
}

1. 是什么?

cancelederr          error // set non-nil if conn is canceled 
//是一种错误,且如果非空,则连接被取消,那么这个错误是一个连接状态的标志或者连接断开的原因

2. 如何被赋值?

根据cancelederr,我们找被赋值的函数如下:

func (pc *persistconn) cancelrequest(err error) {
	pc.mu.lock()
	defer pc.mu.unlock()
	pc.cancelederr = err // 在这里被赋值
	pc.closelocked(errrequestcanceled)
}

错误追踪先到这里。接下来我们换一个角度,我们从context的角度来看。

context

这里就不讲context了,有兴趣的伙伴去官网获取吧!!!回到客户端代码,给request传入了一个withcancel context,看看这个函数做了什么:

func withcancel(parent context) (ctx context, cancel cancelfunc) {
	if parent == nil {
		panic("cannot create context from nil parent")
	}
	c := newcancelctx(parent) // 包装父类context
	propagatecancel(parent, &c)
	return &c, func() { 
		c.cancel(true, canceled) // 返回一个取消函数
	}
}

进入到c.cancel(),会发现canceled作为一个错误类型,定义如下:

// canceled is the error returned by context.err when the context is canceled.
var canceled = errors.new("context canceled")// 这个不是客户端打印的吗?是不是很激动,找到了错误信息的祖宗
...
//而cancel函数定义如下:
// cancel closes c.done, cancels each of c's children, and, if
// removefromparent is true, removes c from its parent's children.
func (c *cancelctx) cancel(removefromparent bool, err error) {
	...
	c.err = err //这里做了一个赋值,即把这个错误传给cancelctx了,它是context的一个内部类
	...
	// 做一些子context的通知以及错误的传递,说取消了,不用干了
}

context先到这里,在context里找到了错误信息的来源,接下来看看错误是如何传给前面我们谈到的cancelederr。

似乎还有一个入口没有看,就是http.client.do的方法:

我们打断点进入到roundtrip方法的调用入口,看看下面是如何感知context被取消:

resp, err = rt.roundtrip(req) //这个在send()方法内部调用

...

// send issues an http request.
// caller should close resp.body when done reading from it.
func send(ireq *request, rt roundtripper, deadline time.time) (resp *response, didtimeout func() bool, err error) {
	...
	resp, err = rt.roundtrip(req) 
	...
}

然后跟着roundtrip(…), 进入到:

func (t *transport) roundtrip(req *request) (*response, error) {
	...
	var resp *response
		if pconn.alt != nil {
			// http/2 path.
			t.setreqcanceler(cancelkey, nil) // not cancelable with cancelrequest
			resp, err = pconn.alt.roundtrip(req)
		} else {
			resp, err = pconn.roundtrip(treq) // 继续可到这里,我们看看这个pconn,刚好就是前面提到的persistconn,它里面包含了cancelederr,似乎我们离真相更近了
		}
}

进入到persistconn的实现方法roundtrip(),我们看看这个for循环:

var respheadertimer <-chan time.time
cancelchan := req.request.cancel
ctxdonechan := req.context().done() //这个request是setrequestcancel(req *request, rt roundtripper, deadline time.time)中重新定义的request,里实现了超时取消的机制,这里的监听便是超时的监听,并不是我们取消的监听
pcclosed := pc.closech
canceled := false
for {
		testhookwaitresloop()
		select { // select开启对channel的轮询
		case err := <-writeerrch:
			if debugroundtrip {
				req.logf("writeerrch resv: %t/%#v", err, err)
			}
			if err != nil {
				pc.close(fmt.errorf("write error: %v", err))
				return nil, pc.maproundtriperror(req, startbyteswritten, err)
			}
			if d := pc.t.responseheadertimeout; d > 0 {
				if debugroundtrip {
					req.logf("starting timer for %v", d)
				}
				timer := time.newtimer(d)
				defer timer.stop() // prevent leaks
				respheadertimer = timer.c
			}
		case <-pcclosed:
			pcclosed = nil
			if canceled || pc.t.replacereqcanceler(req.cancelkey, nil) {
				if debugroundtrip {
					req.logf("closech recv: %t %#v", pc.closed, pc.closed)
				}
				return nil, pc.maproundtriperror(req, startbyteswritten, pc.closed)
			}
		case <-respheadertimer:
			if debugroundtrip {
				req.logf("timeout waiting for response headers.")
			}
			pc.close(errtimeout)
			return nil, errtimeout
		case re := <-resc:
			if (re.res == nil) == (re.err == nil) {
				panic(fmt.sprintf("internal error: exactly one of res or err should be set; nil=%v", re.res == nil))
			}
			if debugroundtrip {
				req.logf("resc recv: %p, %t/%#v", re.res, re.err, re.err)
			}
			if re.err != nil {
				return nil, pc.maproundtriperror(req, startbyteswritten, re.err)
			}
			return re.res, nil
		case <-cancelchan:
			canceled = pc.t.cancelrequest(req.cancelkey, errrequestcanceled)
			cancelchan = nil
		case <-ctxdonechan:
			canceled = pc.t.cancelrequest(req.cancelkey, req.context().err())
			cancelchan = nil
			ctxdonechan = nil
		}
	}

因而这里的监听不是在客户端取消的context的监听,根据客户端的输出显示,表明请求已经发送到服务端,请求并未超时,response也返回了,那么这里的函数监听是与我们读取数据没有联系。

小编最开始也以为是在这里监听返回,然而这里打断点,怎么进不来。

在前面提到,连接是类型为persistconn,其次是读取数据过程中,context的取消会产生影响,那么表明错误发生在tcp连接中的读取数据。

接下来,根据连接建立过程,看看http做了什么?其次是真正的数据读取来自哪里?

pconn, err := t.getconn(treq, cm)
...
func (t *transport) getconn(treq *transportrequest, cm connectmethod) (pc *persistconn, err error) {
	req := treq.request
	trace := treq.trace
	ctx := req.context() //这里去了request的context
	w := &wantconn{
			cm:         cm,
			key:        cm.key(),
			ctx:        ctx, //传给w
			ready:      make(chan struct{}, 1),
			beforedial: testhookprependingdial,
			afterdial:  testhookpostpendingdial,
		}
	...
	
	select{
	case <-w.ready:
		if w.err != nil {
				// if the request has been canceled, that's probably
				// what caused w.err; if so, prefer to return the
				// cancellation error (see golang.org/issue/16049).
				//如果建立连接前,请求被取消,这里会监听到取消的err
				select {
				case <-req.cancel:
					return nil, errrequestcanceledconn
				case <-req.context().done():
					return nil, req.context().err()
				case err := <-cancelc:
					if err == errrequestcanceled {
						err = errrequestcanceledconn
					}
					return nil, err
				default:
					// return below
				}
			}
	return w.pc, w.err//这里返回的是persistconn
		...	

通过这个w建立连接,进入到dialconn(ctx context.context, cm connectmethod) (pconn *persistconn, err error)。 在这里面开启了一个协程pconn.readloop(),读取连接里面的数据。

(t *transport) dialconn(ctx context.context, cm connectmethod) (pconn *persistconn, err error) {
	...
	go pconn.readloop()
}

因为错误与数据读取有直接联系,至少错误发生readloop中的某一个地方:

for alive {
		...

		var resp *response
		if err == nil {
			resp, err = pc.readresponse(rc, trace) // 得到response
		} else {
			err = transportreadfromservererror{err}
			closeerr = err
		}
		...

		waitforbodyread := make(chan bool, 2)
		body := &bodyeofsignal{ //对上面读取的resp.body进行封装,这里封装主要是传递请求取消的错误
			body: resp.body,
			earlyclosefn: func() error {
				waitforbodyread <- false
				<-eofc // will be closed by deferred call at the end of the function
				return nil

			},
			fn: func(err error) error {// 
				iseof := err == io.eof
				waitforbodyread <- iseof
				if iseof {
					<-eofc // see comment above eofc declaration
				} else if err != nil {
					if cerr := pc.canceled(); cerr != nil {
						return cerr
					}
				}
				return err
			},
		}

		resp.body = body
		...

		// before looping back to the top of this function and peeking on
		// the bufio.reader, wait for the caller goroutine to finish
		// reading the response body. (or for cancellation or death)
		// 这里有开启监听,显然是监听读的过程中发生的取消和超时等
		select {
		case bodyeof := <-waitforbodyread:
			replaced := pc.t.replacereqcanceler(rc.cancelkey, nil) // before pc might return to idle pool
			alive = alive &&
				bodyeof &&
				!pc.saweof &&
				pc.wroterequest() &&
				replaced && tryputidleconn(trace)
			if bodyeof {
				eofc <- struct{}{}
			}
		case <-rc.req.cancel:
			alive = false
			pc.t.cancelrequest(rc.req)
		case <-rc.req.context().done(): //这里便监听了客户顿context的取消
			alive = false //结束循环
			pc.t.cancelrequest(rc.cancelkey, rc.req.context().err())//传递err
		case <-pc.closech:
			alive = false
		}

		testhookreadloopbeforenextread()
	}

熟悉context的便知道,当我们调用context的cancel方法时,在前面的context的cancel()方法中有如下代码:

	d, _ := c.done.load().(chan struct{}) // 拿到done方法的返回值channel
	if d == nil {
		c.done.store(closedchan)
	} else {
		close(d)// 关闭channel,而关闭时会向channel写入值
	}

再回到:

ccase <-rc.req.context().done():// 当contex取消,便进入这个代码块
			alive = false
			pc.t.cancelrequest(rc.cancelkey, rc.req.context().err())

进入到cancelrequest(…)的rc.req.context().err()

func (c *cancelctx) err() error {
	c.mu.lock()
	err := c.err//这里似曾相识,前面我们说到context调用取消函数时,会给c.err赋值为cancelerr
	c.mu.unlock()
	return err
}

因而传入cancelrequest的err便是cancelerr,我们进入cancelrequest:

func (t *transport) cancelrequest(key cancelkey, err error) bool {
	// this function must not return until the cancel func has completed.
	// see: https://golang.org/issue/34658
	t.reqmu.lock()
	defer t.reqmu.unlock()
	cancel := t.reqcanceler[key]// 这里的key正是我们传入的请求的cancelkey,拿到reqcanceler中的func(error)
	delete(t.reqcanceler, key)
	if cancel != nil {
		cancel(err) // 进入cancel
	}

	return cancel != nil
}

进入cancel(err):

func (pc *persistconn) cancelrequest(err error) {//这个函数不正是我们前面追踪错误所看见的,这也表明我们追踪是正确的
	pc.mu.lock()
	defer pc.mu.unlock()
	pc.cancelederr = err 
	pc.closelocked(errrequestcanceled)
}

到这里我们的err就传给了body bodyeofsignal,整个错误传递流程便走通了。

还剩最后一个问题,bodyeofsignal的read函数中n, err = es.body.read§ 所遇到的错误是什么?

n, err = es.body.read(p)// 调试发现是网络连接关闭错误,这里表明我们执行完err的传递根本原因在于连接被关闭
	if err != nil {
		es.mu.lock()
		defer es.mu.unlock()
		if es.rerr == nil {
			es.rerr = err
		}
		err = es.condfn(err)
	}
	return

那么关闭连接又是在哪里呢?

我们回到cancelrequest函数:

pc.closelocked(errrequestcanceled) //这里便关闭了连接

这样err整个传递逻辑和原因便都走同通了!

总结

经过上面的分析,将整个context取消过程总结如下:

1.当创建一个带有取消的context,会把context的内部类中的err变量赋值为cancelerr;

2.客户端的调用cancelfunc,会向context的done所绑定的channel写入值;

3.当channel写入值后,transport.go中的readloop方法会监听这个channel的写入,从而把context取消的err传给persistconn,并关闭连接;

4.关闭连接后,数据读取便会遇到连接关闭的网络错误错误,当遇到这个错误,在bodysignal中进行错误处理,这里并不感知连接的关闭,只利用fn分别错误类型,当错误为io.eof,直接将这个错误置为nil,若不是,便通过bodysignal获取到连接中的错误,再返回这个错误;

5.最后通过body.read()方法将错误打印出来。

6.这里复杂在于,每个角色只做自己的工作,遇到错误不是直接返回,而是等待其他角色来读取错误;具体表现为:context负责生成错误消息、传递取消指令给persistconn;persistconn基于bodysignal建立读取数据和连接的关联,响应context的取消并关闭连接,拿到context的错误信息;client读取数据和错误;bodysignal:分析错误,并传递数据和persistconn的错误消息给client。 

以上为个人经验,希望能给大家一个参考,也希望大家多多支持代码网。

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com