golang fasthttp, why are you so good!
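Our service access-layer gateway needed an entry point, and that is how fasthttp came into view. I originally meant to just use it, but most people say fasthttp is more than 10 times faster than Go's official net/http package. True to the fine tradition of curiosity killing the cat, I was lured in step by step and started analyzing fasthttp's network model, how it parses and assembles the HTTP protocol, and how it proxies and forwards HTTP requests. A single article would be too long to read comfortably, so I am splitting this into two. The next one covers how fasthttp parses HTTP requests and assembles HTTP responses, so that I can use it well later and come back to it for review.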
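fasthttp network model
Traditional network models... there is plenty about them online, so look them up yourself.
Let's go straight to how fasthttp handles network requests.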
1. The Server listens on an address and port and, in an infinite loop, accepts incoming client connections via acceptConn.
2. When a connection arrives, the Server enters the workerPool's wp.Serve logic.
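3. A connection is not served just because it arrived: there is a concurrency limit, 256 * 1024 by default (DefaultConcurrency), which is the maximum number of workerChans (workers) the workerPool may hand out.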
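4. Before a connection is handled, a workerChan must be obtained from the workerPool. Only once one is obtained can its requests be processed; otherwise the server replies with 503 Service Unavailable ("Server.Concurrency limit exceeded") and closes the connection.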
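5. Once a workerChan is obtained, a worker goroutine handles the connection: an idle worker is reused if one is available, otherwise a new worker goroutine is started. The accept loop passes conn to that worker through the workerChan channel, so many connections can be handled concurrently.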
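In short, the number of concurrently served connections is capped by the number of workerChans. If a workerChan can be obtained from the workerPool, a worker goroutine handles the connection, and the loop goes back to accepting the next client connection.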
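The accept-and-dispatch flow above can be sketched as a small, self-contained Go program. This is a simplified model of the idea, not fasthttp's actual workerpool.go: Conn is a hypothetical stand-in for net.Conn, and the idle-worker cleanup (MaxIdleWorkerDuration), the workerChanPool and the stop channel are left out. It shows the concurrency limit (Serve returns false once maxWorkers goroutines are busy), the hand-off of a connection over a worker's own channel, and the FILO ready stack.

```go
package main

import (
	"fmt"
	"sync"
)

// Conn is a hypothetical stand-in for net.Conn so the sketch runs without sockets.
type Conn int

// workerChan: one channel per worker goroutine, used to hand it a connection.
type workerChan struct{ ch chan Conn }

// workerPool is a simplified model of the idea behind fasthttp's workerPool
// (not its actual code): at most maxWorkers goroutines, with idle workers kept
// in a FILO "ready" stack so the most recently used one is reused first.
type workerPool struct {
	handle     func(Conn)
	maxWorkers int

	mu           sync.Mutex
	workersCount int
	ready        []*workerChan
	inFlight     sync.WaitGroup // sketch-only: lets demo() wait for handlers
}

// Serve passes c to a worker and reports false if the concurrency limit is reached.
func (wp *workerPool) Serve(c Conn) bool {
	ch := wp.getCh()
	if ch == nil {
		return false
	}
	wp.inFlight.Add(1)
	ch.ch <- c // the accept loop hands the connection over and moves on
	return true
}

func (wp *workerPool) getCh() *workerChan {
	wp.mu.Lock()
	defer wp.mu.Unlock()
	if n := len(wp.ready); n > 0 {
		ch := wp.ready[n-1] // FILO: take the most recently released worker
		wp.ready[n-1] = nil
		wp.ready = wp.ready[:n-1]
		return ch
	}
	if wp.workersCount >= wp.maxWorkers {
		return nil // limit reached: the caller should reply 503 and close the conn
	}
	wp.workersCount++
	ch := &workerChan{ch: make(chan Conn, 1)}
	go wp.workerFunc(ch)
	return ch
}

func (wp *workerPool) workerFunc(ch *workerChan) {
	for c := range ch.ch {
		wp.handle(c)
		wp.mu.Lock()
		wp.ready = append(wp.ready, ch) // back on the stack, ready for the next conn
		wp.mu.Unlock()
		wp.inFlight.Done()
	}
}

// Stop closes idle workers; the sketch assumes no handler is still running.
func (wp *workerPool) Stop() {
	wp.mu.Lock()
	defer wp.mu.Unlock()
	for _, ch := range wp.ready {
		close(ch.ch)
	}
	wp.ready = nil
}

// demo: limit 2, two blocked handlers, a third conn is rejected, then a worker is reused.
func demo() (results []bool, workers int) {
	gate := make(chan struct{})
	wp := &workerPool{maxWorkers: 2, handle: func(Conn) { <-gate }}
	for c := Conn(1); c <= 3; c++ {
		results = append(results, wp.Serve(c))
	}
	close(gate) // unblock the two busy workers
	wp.inFlight.Wait()
	results = append(results, wp.Serve(4)) // served by an idle, reused worker
	wp.inFlight.Wait()
	wp.Stop()
	wp.mu.Lock()
	workers = wp.workersCount
	wp.mu.Unlock()
	return results, workers
}

func main() {
	results, workers := demo()
	fmt.Println(results, workers) // [true true false true] 2
}
```

In fasthttp, a false return from wp.Serve is what triggers the 503 response and c.Close() in Server.Serve; note that the fourth connection is served without starting a third goroutine.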
That is how it handles requests. So where does its performance come from?
1. The four places in the flow that use the most memory all use object pools:
* ctxPool: RequestCtx objects, which store HTTP request and response data
* readerPool: bufio.Reader objects, which read from the client conn and buffer incoming request data
* writerPool: bufio.Writer objects, which buffer the response data written to the client conn
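* workerPool: obtaining a workerChan also goes through an object pool (workerChanPool), plus a ready slice that holds returned workerChans. The ready slice has an optimization: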
// workerPool serves incoming connections via a pool of workers
// in FILO order, i.e. the most recently stopped worker will serve the next
// incoming connection.
// Such a scheme keeps CPU caches hot (in theory).
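Roughly: to exploit hot data in the CPU cache, the most recently used workerChan is handed out first, since its data is more likely to still be in the CPU cache, which (in theory) improves performance.
2. Reads and writes on conn go through the standard library's bufio.Reader and bufio.Writer. They add a buffer in front of conn, reducing the cost of many small I/O operations on it.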
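A minimal sketch of the two ideas above, pooling and buffering, assuming a hypothetical countingConn in place of a real net.Conn that merely counts how many Write calls reach it. acquireWriter and releaseWriter are illustrative helpers modeled on the idea behind Server.writerPool, not fasthttp's exact code.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"sync"
)

// defaultWriteBufferSize mirrors the 4KB default described for WriteBufferSize.
const defaultWriteBufferSize = 4096

// countingConn is a hypothetical stand-in for net.Conn that only counts
// how many Write calls (roughly: write syscalls) actually reach it.
type countingConn struct{ writes, bytes int }

func (c *countingConn) Write(p []byte) (int, error) {
	c.writes++
	c.bytes += len(p)
	return len(p), nil
}

// writerPool plays the role of Server.writerPool: *bufio.Writer values
// (and their 4KB buffers) are reused across connections instead of reallocated.
var writerPool sync.Pool

func acquireWriter(w io.Writer) *bufio.Writer {
	if v := writerPool.Get(); v != nil {
		bw := v.(*bufio.Writer)
		bw.Reset(w) // rebind the existing buffer to the new connection
		return bw
	}
	return bufio.NewWriterSize(w, defaultWriteBufferSize)
}

func releaseWriter(bw *bufio.Writer) {
	bw.Reset(nil) // drop the reference to the old connection before pooling
	writerPool.Put(bw)
}

// writeHeaders writes n small header lines to conn, either directly or
// through a pooled bufio.Writer.
func writeHeaders(conn *countingConn, n int, buffered bool) error {
	if !buffered {
		for i := 0; i < n; i++ {
			if _, err := fmt.Fprintf(conn, "X-Header-%d: v\r\n", i); err != nil {
				return err
			}
		}
		return nil
	}
	bw := acquireWriter(conn)
	defer releaseWriter(bw)
	for i := 0; i < n; i++ {
		if _, err := fmt.Fprintf(bw, "X-Header-%d: v\r\n", i); err != nil {
			return err
		}
	}
	return bw.Flush() // everything fits in 4KB, so a single Write reaches conn
}

func main() {
	direct, buffered := &countingConn{}, &countingConn{}
	if err := writeHeaders(direct, 20, false); err != nil {
		panic(err)
	}
	if err := writeHeaders(buffered, 20, true); err != nil {
		panic(err)
	}
	// prints: direct: 20 writes, buffered: 1 writes, 310 bytes each
	fmt.Printf("direct: %d writes, buffered: %d writes, %d bytes each\n",
		direct.writes, buffered.writes, direct.bytes)
}
```

The unbuffered path makes one Write on the connection per header line, while the buffered path makes a single Write on Flush; the pooled *bufio.Writer and its buffer can then be reused for the next connection.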
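What happens after a request is handled
1. The RequestCtx, reader buffer and writer buffer objects are returned to their pools, and references that are no longer needed are set to nil so that the GC can reclaim objects that are no longer referenced.
Two scenarios need to be distinguished here: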
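Short connections: when the server runs in short-connection mode (DisableKeepalive), it adds the response header Connection: close to tell the client the connection will not be reused. After serveConn returns, the worker closes the connection, returns its workerChan to the pool and cleans up resources.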
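Long (keep-alive) connections: after reading one request from a connection, the server reads conn again. If the client has not sent another request, this read blocks until one arrives; if a timeout is configured (IdleTimeout, falling back to ReadTimeout), the read returns a timeout error instead, after which the workerChan is returned and resources are cleaned up.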
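Summary
fasthttp has many strengths, but given how far my understanding goes and how well I can explain things, I feel I have not covered it fully or clearly. I will stop here; this article ends.
Because the code is long, I list the main code behind its network model at the end here.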
// Server implements HTTP server.
//
// Default Server settings should satisfy the majority of Server users.
// Adjust Server settings only if you really understand the consequences.
//
// It is forbidden copying Server instances. Create new Server instances
// instead.
//
// It is safe to call Server methods from concurrently running goroutines.
type Server struct {
	noCopy noCopy //nolint:unused,structcheck

	// Handler for processing incoming requests.
	//
	// Take into account that no `panic` recovery is done by `fasthttp` (thus any `panic` will take down the entire server).
	// Instead the user should use `recover` to handle these situations.
	// Handler that runs the business logic for each request
	Handler RequestHandler

	// ErrorHandler for returning a response in case of an error while receiving or parsing the request.
	//
	// The following is a non-exhaustive list of errors that can be expected as argument:
	//   * io.EOF
	//   * io.ErrUnexpectedEOF
	//   * ErrGetOnly
	//   * ErrSmallBuffer
	//   * ErrBodyTooLarge
	//   * ErrBrokenChunks
	// Handler executed when reading data from conn fails
	ErrorHandler func(ctx *RequestCtx, err error)

	// HeaderReceived is called after receiving the header
	//
	// non zero RequestConfig field values will overwrite the default configs
	HeaderReceived func(header *RequestHeader) RequestConfig

	// Server name for sending in response headers.
	//
	// Default server name is used if left blank.
	Name string

	// The maximum number of concurrent connections the server may serve.
	//
	// DefaultConcurrency is used if not set.
	//
	// Concurrency only works if you either call Serve once, or only ServeConn multiple times.
	// It works with ListenAndServe as well.
	// Number of connections that can be served concurrently
	Concurrency int

	// Whether to disable keep-alive connections.
	//
	// The server will close all the incoming connections after sending
	// the first response to client if this option is set to true.
	//
	// By default keep-alive connections are enabled.
	// Controls whether the server keeps connections alive: if true, responses
	// carry Connection: close; otherwise Connection: keep-alive
	DisableKeepalive bool

	// Per-connection buffer size for requests' reading.
	// This also limits the maximum header size.
	//
	// Increase this buffer if your clients send multi-KB RequestURIs
	// and/or multi-KB headers (for example, BIG cookies).
	//
	// Default buffer size is used if not set.
	// Buffer size of the bufio.Reader used to read request data from conn;
	// defaults to 4KB if not set
	ReadBufferSize int

	// Per-connection buffer size for responses' writing.
	//
	// Default buffer size is used if not set.
	// Buffer size of the bufio.Writer used to write response data;
	// defaults to 4KB if not set
	WriteBufferSize int

	// ReadTimeout is the amount of time allowed to read
	// the full request including body. The connection's read
	// deadline is reset when the connection opens, or for
	// keep-alive connections after the first byte has been read.
	//
	// By default request read timeout is unlimited.
	// Server read timeout: with no incoming request, reading conn blocks until
	// ReadTimeout and then returns an i/o timeout error; 0 (default) means no timeout
	ReadTimeout time.Duration

	// WriteTimeout is the maximum duration before timing out
	// writes of the response. It is reset after the request handler
	// has returned.
	//
	// By default response write timeout is unlimited.
	// Server write timeout: writing to conn blocks at most WriteTimeout before
	// returning an i/o timeout error; 0 (default) means no timeout
	WriteTimeout time.Duration

	// IdleTimeout is the maximum amount of time to wait for the
	// next request when keep-alive is enabled. If IdleTimeout
	// is zero, the value of ReadTimeout is used.
	// Read timeout while waiting for the next request on a keep-alive
	// connection; takes precedence over ReadTimeout
	IdleTimeout time.Duration

	// Maximum number of concurrent client connections allowed per IP.
	//
	// By default unlimited number of concurrent connections
	// may be established to the server from a single IP address.
	MaxConnsPerIP int

	// Maximum number of requests served per connection.
	//
	// The server closes connection after the last request.
	// 'Connection: close' header is added to the last response.
	//
	// By default unlimited number of requests may be served per connection.
	MaxRequestsPerConn int

	// MaxKeepaliveDuration is a no-op and only left here for backwards compatibility.
	// Deprecated: Use IdleTimeout instead.
	MaxKeepaliveDuration time.Duration

	// Whether to enable tcp keep-alive connections.
	//
	// Whether the operating system should send tcp keep-alive messages on the tcp connection.
	//
	// By default tcp keep-alive connections are disabled.
	// Enables TCP keep-alive
	TCPKeepalive bool

	// Period between tcp keep-alive messages.
	//
	// TCP keep-alive period is determined by the operating system by default.
	// TCP keep-alive period
	TCPKeepalivePeriod time.Duration

	// Maximum request body size.
	//
	// The server rejects requests with bodies exceeding this limit.
	//
	// Request body size is limited by DefaultMaxRequestBodySize by default.
	// Request body size limit; raise it for large file uploads
	MaxRequestBodySize int

	// Aggressively reduces memory usage at the cost of higher CPU usage
	// if set to true.
	//
	// Try enabling this option only if the server consumes too much memory
	// serving mostly idle keep-alive connections. This may reduce memory
	// usage by more than 50%.
	//
	// Aggressive memory usage reduction is disabled by default.
	// Reduces memory usage by reusing allocated memory
	ReduceMemoryUsage bool

	// Rejects all non-GET requests if set to true.
	//
	// This option is useful as anti-DoS protection for servers
	// accepting only GET requests. The request size is limited
	// by ReadBufferSize if GetOnly is set.
	//
	// Server accepts all the requests by default.
	GetOnly bool

	// Will not pre parse Multipart Form data if set to true.
	//
	// This option is useful for servers that desire to treat
	// multipart form data as a binary blob, or choose when to parse the data.
	//
	// Server pre parses multipart form data by default.
	// Whether to skip pre-parsing requests with Content-Type: multipart/form-data
	DisablePreParseMultipartForm bool

	// Logs all errors, including the most frequent
	// 'connection reset by peer', 'broken pipe' and 'connection timeout'
	// errors. Such errors are common in production serving real-world
	// clients.
	//
	// By default the most frequent errors such as
	// 'connection reset by peer', 'broken pipe' and 'connection timeout'
	// are suppressed in order to limit output log traffic.
	LogAllErrors bool

	// Header names are passed as-is without normalization
	// if this option is set.
	//
	// Disabled header names' normalization may be useful only for proxying
	// incoming requests to other servers expecting case-sensitive
	// header names. See https://github.com/valyala/fasthttp/issues/57
	// for details.
	//
	// By default request and response header names are normalized, i.e.
	// The first letter and the first letters following dashes
	// are uppercased, while all the other letters are lowercased.
	// Examples:
	//
	//     * HOST -> Host
	//     * content-type -> Content-Type
	//     * cONTENT-lenGTH -> Content-Length
	DisableHeaderNamesNormalizing bool

	// SleepWhenConcurrencyLimitsExceeded is a duration to sleep for if
	// the concurrency limit is exceeded (default [when is 0]: don't sleep
	// and accept new connections immediately).
	// How long the server sleeps when the concurrency limit is reached
	SleepWhenConcurrencyLimitsExceeded time.Duration

	// NoDefaultServerHeader, when set to true, causes the default Server header
	// to be excluded from the Response.
	//
	// The default Server header value is the value of the Name field or an
	// internal default value in its absence. With this option set to true,
	// the only time a Server header will be sent is if a non-zero length
	// value is explicitly provided during a request.
	NoDefaultServerHeader bool

	// NoDefaultDate, when set to true, causes the default Date
	// header to be excluded from the Response.
	//
	// The default Date header value is the current date value. When
	// set to true, the Date will not be present.
	NoDefaultDate bool

	// NoDefaultContentType, when set to true, causes the default Content-Type
	// header to be excluded from the Response.
	//
	// The default Content-Type header value is the internal default value. When
	// set to true, the Content-Type will not be present.
	NoDefaultContentType bool

	// ConnState specifies an optional callback function that is
	// called when a client connection changes state. See the
	// ConnState type and associated constants for details.
	ConnState func(net.Conn, ConnState)

	// Logger, which is used by RequestCtx.Logger().
	//
	// By default standard logger from log package is used.
	Logger Logger

	// KeepHijackedConns is an opt-in disable of connection
	// close by fasthttp after connections' HijackHandler returns.
	// This allows to save goroutines, e.g. when fasthttp used to upgrade
	// http connections to WS and connection goes to another handler,
	// which will close it when needed.
	KeepHijackedConns bool

	tlsConfig  *tls.Config
	nextProtos map[string]ServeHandler

	concurrency      uint32
	concurrencyCh    chan struct{}
	perIPConnCounter perIPConnCounter
	serverName       atomic.Value

	// RequestCtx object pool
	ctxPool sync.Pool

	// bufio.Reader object pool
	readerPool sync.Pool

	// bufio.Writer object pool
	writerPool sync.Pool

	hijackConnPool sync.Pool

	// We need to know our listeners so we can close them in Shutdown().
	ln []net.Listener

	mu   sync.Mutex
	open int32
	stop int32
	done chan struct{}
}
// workerPool serves incoming connections via a pool of workers
// in FILO order, i.e. the most recently stopped worker will serve the next
// incoming connection.
//
// Such a scheme keeps CPU caches hot (in theory).
// Pool of workerChan objects
type workerPool struct {
	// Function for serving server connections.
	// It must leave c unclosed.
	WorkerFunc ServeHandler

	MaxWorkersCount int

	LogAllErrors bool

	MaxIdleWorkerDuration time.Duration

	Logger Logger

	lock         sync.Mutex
	workersCount int
	mustStop     bool

	ready []*workerChan

	stopCh chan struct{}

	workerChanPool sync.Pool

	connState func(net.Conn, ConnState)
}
// Serve serves incoming connections from the given listener.
//
// Serve blocks until the given listener returns permanent error.
func (s *Server) Serve(ln net.Listener) error {
	var lastOverflowErrorTime time.Time
	var lastPerIPErrorTime time.Time
	var c net.Conn
	var err error

	maxWorkersCount := s.getConcurrency()

	s.mu.Lock()
	{
		s.ln = append(s.ln, ln)
		if s.done == nil {
			s.done = make(chan struct{})
		}

		if s.concurrencyCh == nil {
			s.concurrencyCh = make(chan struct{}, maxWorkersCount)
		}
	}
	s.mu.Unlock()

	wp := &workerPool{
		WorkerFunc:      s.serveConn,
		MaxWorkersCount: maxWorkersCount,
		LogAllErrors:    s.LogAllErrors,
		Logger:          s.logger(),
		connState:       s.setState,
	}
	wp.Start()

	// Count our waiting to accept a connection as an open connection.
	// This way we can't get into any weird state where just after accepting
	// a connection Shutdown is called which reads open as 0 because it isn't
	// incremented yet.
	atomic.AddInt32(&s.open, 1)
	defer atomic.AddInt32(&s.open, -1)

	for {
		if c, err = acceptConn(s, ln, &lastPerIPErrorTime); err != nil {
			wp.Stop()
			if err == io.EOF {
				return nil
			}
			return err
		}
		s.setState(c, StateNew)
		atomic.AddInt32(&s.open, 1)
		if !wp.Serve(c) {
			atomic.AddInt32(&s.open, -1)
			s.writeFastError(c, StatusServiceUnavailable,
				"The connection cannot be served because Server.Concurrency limit exceeded")
			c.Close()
			s.setState(c, StateClosed)
			if time.Since(lastOverflowErrorTime) > time.Minute {
				s.logger().Printf("The incoming connection cannot be served, because %d concurrent connections are served. "+
					"Try increasing Server.Concurrency", maxWorkersCount)
				lastOverflowErrorTime = time.Now()
			}

			// The current server reached concurrency limit,
			// so give other concurrently running servers a chance
			// accepting incoming connections on the same address.
			//
			// There is a hope other servers didn't reach their
			// concurrency limits yet :)
			//
			// See also: https://github.com/valyala/fasthttp/pull/485#discussion_r239994990
			if s.SleepWhenConcurrencyLimitsExceeded > 0 {
				time.Sleep(s.SleepWhenConcurrencyLimitsExceeded)
			}
		}
		c = nil
	}
}