golang fasthttp: why are you so excellent!

About 14,160 characters · roughly a 29-minute read


2020-09-19 08:21

fasthttp came onto my radar because of requirements for a gateway at the service access layer. I originally planned to just use it and move on, but most people claim fasthttp outperforms Go's official net/http package by more than 10x, and since curiosity killed the cat, I got pulled in step by step and started analyzing fasthttp's network model, its HTTP protocol parsing and assembly, and its proxying of HTTP requests. Because the full write-up would be too long to read comfortably, it is split into two articles; the next one will cover how fasthttp parses the HTTP request protocol and assembles the HTTP response protocol. The goal is to help me use it well in the future and to fill in gaps.


fasthttp's network model

    

There are plenty of write-ups about the traditional network models online, so I won't repeat them here.

Let's go straight into how fasthttp handles network requests.


1. The Server listens on an address and port, and loops forever handling client connection requests; acceptConn waits for incoming client connections.

2. When a connection request arrives, the Server enters the workerpool's wp.Serve handling logic.

3. It does not handle an unbounded number of connections: there is a concurrency limit on connection handling, 256 * 1024 by default, which is the number of workerChans in the workerpool.

4. Before a request is handled, a workerChan must first be obtained from the workerpool; only after acquiring the workerChan channel can the request be processed, otherwise the client is told that the request limit has been reached.

5. Once a request obtains a workerChan from the workerpool, a worker goroutine is started to handle it; the main goroutine passes the conn to the worker goroutine over the workerChan, which is how multiple requests are handled concurrently.

In short, the number of concurrently handled requests is controlled by the number of workerChans: if a workerChan can be obtained from the workerpool, a worker goroutine is started to handle the request, and the loop goes back to waiting for the next client connection.


The above covers how it handles user requests; so what makes its performance so good?


1. The four most memory-hungry parts of the whole flow all use object pools:

    * ctxPool: RequestCtx objects, which hold the HTTP request and response data

    * readerPool: bufio.Reader objects, which read from the request conn and buffer the request data

    * writerPool: bufio.Writer objects, which write to the conn and buffer the response data

    * workerPool: workerChans are also obtained from an object pool, and there is a ready slice holding returned workerChans; ready hides an optimization:

```go
// workerPool serves incoming connections via a pool of workers
// in FILO order, i.e. the most recently stopped worker will serve the next
// incoming connection.
//
// Such a scheme keeps CPU caches hot (in theory).
```


Roughly speaking: to exploit hot data in the CPU caches, it reuses the most recently used workerChan whenever possible, which makes CPU cache hits more likely and improves performance.

2. When reading and writing conn data it uses the standard library's bufio.Reader and bufio.Writer, which put a buffer in front of the conn so that repeated small reads and writes do not each pay the I/O cost of touching the conn.


What happens after a request has been handled?


1. The RequestCtx, reader buffer, and writer buffer objects are returned to their pools, and references that are no longer needed are set to nil, so the GC can reclaim the unreferenced objects on its next run.
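A sketch of that release step, with `requestCtx` and `releaseCtx` as hypothetical stand-ins for fasthttp's RequestCtx recycling:

```go
package main

import (
	"fmt"
	"net"
	"sync"
)

// requestCtx is an illustrative stand-in for fasthttp's RequestCtx;
// the field names here are not fasthttp's.
type requestCtx struct {
	conn net.Conn
	body []byte
}

var ctxPool = sync.Pool{New: func() interface{} { return &requestCtx{} }}

// releaseCtx clears references before returning the object to the
// pool, so the conn and the body's backing array are no longer
// reachable through the pooled object and can be collected.
func releaseCtx(ctx *requestCtx) {
	ctx.conn = nil
	ctx.body = nil
	ctxPool.Put(ctx)
}

func main() {
	ctx := ctxPool.Get().(*requestCtx)
	ctx.body = []byte("payload")
	releaseCtx(ctx)
	reused := ctxPool.Get().(*requestCtx) // may be the very same object
	fmt.Println(reused.body == nil)       // true: no stale request data leaks
}
```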


Two scenarios need to be distinguished here.


Short connection: in short-connection mode, after handling the request the server does not actively close the connection; instead it returns the response header Connection: close and lets the client close the connection. This keeps the server from accumulating ports in the TIME_WAIT state. The workerChan is then returned and resources are cleaned up.

Keep-alive: after the server finishes reading one request from a connection, it reads the conn again for the next one; if the client has not sent another request yet, the read blocks until one arrives. If a custom ReadTimeout is configured, the read returns a timeout error once it expires, the workerChan is returned, and resources are cleaned up.


Summary


fasthttp seems to have many strong points, but given my current level of understanding and my ability to explain them, I cannot cover everything fully and clearly, so I will stop here. End of this article.


Code speaks for itself, so finally here is the main code related to its network model.

```go
// Server implements HTTP server.
//
// Default Server settings should satisfy the majority of Server users.
// Adjust Server settings only if you really understand the consequences.
//
// It is forbidden copying Server instances. Create new Server instances
// instead.
//
// It is safe to call Server methods from concurrently running goroutines.
type Server struct {
	noCopy noCopy //nolint:unused,structcheck

	// Handler for processing incoming requests.
	//
	// Take into account that no `panic` recovery is done by `fasthttp` (thus any `panic` will take down the entire server).
	// Instead the user should use `recover` to handle these situations.
	//
	// The handler that business logic runs for each request.
	Handler RequestHandler

	// ErrorHandler for returning a response in case of an error while receiving or parsing the request.
	//
	// The following is a non-exhaustive list of errors that can be expected as argument:
	//   * io.EOF
	//   * io.ErrUnexpectedEOF
	//   * ErrGetOnly
	//   * ErrSmallBuffer
	//   * ErrBodyTooLarge
	//   * ErrBrokenChunks
	//
	// The handler executed when an error occurs while reading conn data.
	ErrorHandler func(ctx *RequestCtx, err error)

	// HeaderReceived is called after receiving the header
	//
	// non zero RequestConfig field values will overwrite the default configs
	HeaderReceived func(header *RequestHeader) RequestConfig

	// Server name for sending in response headers.
	//
	// Default server name is used if left blank.
	Name string

	// The maximum number of concurrent connections the server may serve.
	//
	// DefaultConcurrency is used if not set.
	//
	// Concurrency only works if you either call Serve once, or only ServeConn multiple times.
	// It works with ListenAndServe as well.
	//
	// The concurrency limit for request handling.
	Concurrency int

	// Whether to disable keep-alive connections.
	//
	// The server will close all the incoming connections after sending
	// the first response to client if this option is set to true.
	//
	// By default keep-alive connections are enabled.
	//
	// Server-side switch for keep-alive: if true, responses carry
	// 'Connection: close'; otherwise connections are kept alive.
	DisableKeepalive bool

	// Per-connection buffer size for requests' reading.
	// This also limits the maximum header size.
	//
	// Increase this buffer if your clients send multi-KB RequestURIs
	// and/or multi-KB headers (for example, BIG cookies).
	//
	// Default buffer size is used if not set.
	//
	// Buffer size for the bufio.Reader that reads conn request data;
	// defaults to 4KB if unset.
	ReadBufferSize int

	// Per-connection buffer size for responses' writing.
	//
	// Default buffer size is used if not set.
	//
	// Buffer size for the bufio.Writer that writes response data;
	// defaults to 4KB if unset.
	WriteBufferSize int

	// ReadTimeout is the amount of time allowed to read
	// the full request including body. The connection's read
	// deadline is reset when the connection opens, or for
	// keep-alive connections after the first byte has been read.
	//
	// By default request read timeout is unlimited.
	//
	// If no request arrives, the conn read blocks until ReadTimeout and
	// then returns an i/o timeout error; the default 0 means no timeout.
	ReadTimeout time.Duration

	// WriteTimeout is the maximum duration before timing out
	// writes of the response. It is reset after the request handler
	// has returned.
	//
	// By default response write timeout is unlimited.
	//
	// A conn write blocks until WriteTimeout and then returns an i/o
	// timeout error; the default 0 means no timeout.
	WriteTimeout time.Duration

	// IdleTimeout is the maximum amount of time to wait for the
	// next request when keep-alive is enabled. If IdleTimeout
	// is zero, the value of ReadTimeout is used.
	//
	// Read timeout in keep-alive mode; takes precedence over ReadTimeout.
	IdleTimeout time.Duration

	// Maximum number of concurrent client connections allowed per IP.
	//
	// By default unlimited number of concurrent connections
	// may be established to the server from a single IP address.
	MaxConnsPerIP int

	// Maximum number of requests served per connection.
	//
	// The server closes connection after the last request.
	// 'Connection: close' header is added to the last response.
	//
	// By default unlimited number of requests may be served per connection.
	MaxRequestsPerConn int

	// MaxKeepaliveDuration is a no-op and only left here for backwards compatibility.
	// Deprecated: Use IdleTimeout instead.
	MaxKeepaliveDuration time.Duration

	// Whether to enable tcp keep-alive connections.
	//
	// Whether the operating system should send tcp keep-alive messages on the tcp connection.
	//
	// By default tcp keep-alive connections are disabled.
	//
	// Enables TCP keep-alive.
	TCPKeepalive bool

	// Period between tcp keep-alive messages.
	//
	// TCP keep-alive period is determined by operation system by default.
	//
	// The TCP keep-alive period.
	TCPKeepalivePeriod time.Duration

	// Maximum request body size.
	//
	// The server rejects requests with bodies exceeding this limit.
	//
	// Request body size is limited by DefaultMaxRequestBodySize by default.
	//
	// Raise this limit for large file uploads.
	MaxRequestBodySize int

	// Aggressively reduces memory usage at the cost of higher CPU usage
	// if set to true.
	//
	// Try enabling this option only if the server consumes too much memory
	// serving mostly idle keep-alive connections. This may reduce memory
	// usage by more than 50%.
	//
	// Aggressive memory usage reduction is disabled by default.
	//
	// Reduces memory usage by reusing allocated memory.
	ReduceMemoryUsage bool

	// Rejects all non-GET requests if set to true.
	//
	// This option is useful as anti-DoS protection for servers
	// accepting only GET requests. The request size is limited
	// by ReadBufferSize if GetOnly is set.
	//
	// Server accepts all the requests by default.
	GetOnly bool

	// Will not pre parse Multipart Form data if set to true.
	//
	// This option is useful for servers that desire to treat
	// multipart form data as a binary blob, or choose when to parse the data.
	//
	// Server pre parses multipart form data by default.
	//
	// Whether to skip pre-parsing requests with
	// Content-Type: multipart/form-data.
	DisablePreParseMultipartForm bool

	// Logs all errors, including the most frequent
	// 'connection reset by peer', 'broken pipe' and 'connection timeout'
	// errors. Such errors are common in production serving real-world
	// clients.
	//
	// By default the most frequent errors such as
	// 'connection reset by peer', 'broken pipe' and 'connection timeout'
	// are suppressed in order to limit output log traffic.
	LogAllErrors bool

	// Header names are passed as-is without normalization
	// if this option is set.
	//
	// Disabled header names' normalization may be useful only for proxying
	// incoming requests to other servers expecting case-sensitive
	// header names. See https://github.com/valyala/fasthttp/issues/57
	// for details.
	//
	// By default request and response header names are normalized, i.e.
	// The first letter and the first letters following dashes
	// are uppercased, while all the other letters are lowercased.
	// Examples:
	//
	//   * HOST -> Host
	//   * content-type -> Content-Type
	//   * cONTENT-lenGTH -> Content-Length
	DisableHeaderNamesNormalizing bool

	// SleepWhenConcurrencyLimitsExceeded is a duration to be slept of if
	// the concurrency limit in exceeded (default [when is 0]: don't sleep
	// and accept new connections immidiatelly).
	//
	// How long the server sleeps once its concurrency limit is reached.
	SleepWhenConcurrencyLimitsExceeded time.Duration

	// NoDefaultServerHeader, when set to true, causes the default Server header
	// to be excluded from the Response.
	//
	// The default Server header value is the value of the Name field or an
	// internal default value in its absence. With this option set to true,
	// the only time a Server header will be sent is if a non-zero length
	// value is explicitly provided during a request.
	NoDefaultServerHeader bool

	// NoDefaultDate, when set to true, causes the default Date
	// header to be excluded from the Response.
	//
	// The default Date header value is the current date value. When
	// set to true, the Date will not be present.
	NoDefaultDate bool

	// NoDefaultContentType, when set to true, causes the default Content-Type
	// header to be excluded from the Response.
	//
	// The default Content-Type header value is the internal default value. When
	// set to true, the Content-Type will not be present.
	NoDefaultContentType bool

	// ConnState specifies an optional callback function that is
	// called when a client connection changes state. See the
	// ConnState type and associated constants for details.
	ConnState func(net.Conn, ConnState)

	// Logger, which is used by RequestCtx.Logger().
	//
	// By default standard logger from log package is used.
	Logger Logger

	// KeepHijackedConns is an opt-in disable of connection
	// close by fasthttp after connections' HijackHandler returns.
	// This allows to save goroutines, e.g. when fasthttp used to upgrade
	// http connections to WS and connection goes to another handler,
	// which will close it when needed.
	KeepHijackedConns bool

	tlsConfig  *tls.Config
	nextProtos map[string]ServeHandler

	concurrency      uint32
	concurrencyCh    chan struct{}
	perIPConnCounter perIPConnCounter
	serverName       atomic.Value

	ctxPool        sync.Pool // RequestCtx object pool
	readerPool     sync.Pool // bufio.Reader object pool
	writerPool     sync.Pool // bufio.Writer object pool
	hijackConnPool sync.Pool

	// We need to know our listeners so we can close them in Shutdown().
	ln []net.Listener

	mu   sync.Mutex
	open int32
	stop int32
	done chan struct{}
}
```

```go
// workerPool serves incoming connections via a pool of workers
// in FILO order, i.e. the most recently stopped worker will serve the next
// incoming connection.
//
// Such a scheme keeps CPU caches hot (in theory).
//
// It also maintains a workerChan object pool.
type workerPool struct {
	// Function for serving server connections.
	// It must leave c unclosed.
	WorkerFunc ServeHandler

	MaxWorkersCount int

	LogAllErrors bool

	MaxIdleWorkerDuration time.Duration

	Logger Logger

	lock         sync.Mutex
	workersCount int
	mustStop     bool

	ready []*workerChan

	stopCh chan struct{}

	workerChanPool sync.Pool

	connState func(net.Conn, ConnState)
}
```
```go
// Serve serves incoming connections from the given listener.
//
// Serve blocks until the given listener returns permanent error.
func (s *Server) Serve(ln net.Listener) error {
	var lastOverflowErrorTime time.Time
	var lastPerIPErrorTime time.Time
	var c net.Conn
	var err error

	maxWorkersCount := s.getConcurrency()

	s.mu.Lock()
	{
		s.ln = append(s.ln, ln)
		if s.done == nil {
			s.done = make(chan struct{})
		}

		if s.concurrencyCh == nil {
			s.concurrencyCh = make(chan struct{}, maxWorkersCount)
		}
	}
	s.mu.Unlock()

	wp := &workerPool{
		WorkerFunc:      s.serveConn,
		MaxWorkersCount: maxWorkersCount,
		LogAllErrors:    s.LogAllErrors,
		Logger:          s.logger(),
		connState:       s.setState,
	}
	wp.Start()

	// Count our waiting to accept a connection as an open connection.
	// This way we can't get into any weird state where just after accepting
	// a connection Shutdown is called which reads open as 0 because it isn't
	// incremented yet.
	atomic.AddInt32(&s.open, 1)
	defer atomic.AddInt32(&s.open, -1)

	for {
		if c, err = acceptConn(s, ln, &lastPerIPErrorTime); err != nil {
			wp.Stop()
			if err == io.EOF {
				return nil
			}
			return err
		}
		s.setState(c, StateNew)
		atomic.AddInt32(&s.open, 1)
		if !wp.Serve(c) {
			atomic.AddInt32(&s.open, -1)
			s.writeFastError(c, StatusServiceUnavailable,
				"The connection cannot be served because Server.Concurrency limit exceeded")
			c.Close()
			s.setState(c, StateClosed)
			if time.Since(lastOverflowErrorTime) > time.Minute {
				s.logger().Printf("The incoming connection cannot be served, because %d concurrent connections are served. "+
					"Try increasing Server.Concurrency", maxWorkersCount)
				lastOverflowErrorTime = time.Now()
			}

			// The current server reached concurrency limit,
			// so give other concurrently running servers a chance
			// accepting incoming connections on the same address.
			//
			// There is a hope other servers didn't reach their
			// concurrency limits yet :)
			//
			// See also: https://github.com/valyala/fasthttp/pull/485#discussion_r239994990
			if s.SleepWhenConcurrencyLimitsExceeded > 0 {
				time.Sleep(s.SleepWhenConcurrencyLimitsExceeded)
			}
		}
		c = nil
	}
}
```





Go语言中文网