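With microservices now widespread, internal services communicate mostly over long-lived connections, yet Go's native net library cannot provide enough performance or control: it cannot sense connection state, utilization is low when there are many connections, and the number of goroutines cannot be controlled. netpoll was created to gain full control over the network layer, and to explore ahead of business needs so that the results can eventually benefit the business.
With the native library, every new connection gets its own goroutine. Under the long-connection pattern typical of microservices, a large number of idle long-lived connections puts a heavy burden on epoll, and the large number of parked goroutines ties up a lot of resources.
netpoll instead uses asynchronous callbacks plus a Routine Pool. When a read or write event occurs on a connection, the event callback triggers the business logic: the callback function is submitted to the Routine Pool, which concentrates the compute, while the upper-layer logic still keeps a single-threaded experience.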
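The callback plus pool idea can be sketched as follows. This is a minimal illustration, not netpoll's own pool: `routinePool`, `Submit` and `handleEvents` are hypothetical names, and the "events" are simulated by a plain loop rather than by epoll.

```go
package main

import (
	"fmt"
	"sync"
)

// routinePool is a hypothetical fixed-size worker pool.
// A bounded set of goroutines serves callbacks for any number of connections.
type routinePool struct {
	tasks chan func()
	wg    sync.WaitGroup
}

func newRoutinePool(workers int) *routinePool {
	p := &routinePool{tasks: make(chan func(), 64)}
	for i := 0; i < workers; i++ {
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			for task := range p.tasks {
				task()
			}
		}()
	}
	return p
}

// Submit queues a callback for execution by one of the workers.
func (p *routinePool) Submit(task func()) { p.tasks <- task }

// Close stops accepting tasks and waits for the queued ones to finish.
func (p *routinePool) Close() {
	close(p.tasks)
	p.wg.Wait()
}

// handleEvents simulates n readable events and returns how many callbacks ran.
func handleEvents(workers, n int) int {
	pool := newRoutinePool(workers)
	var mu sync.Mutex
	handled := 0
	for i := 0; i < n; i++ {
		pool.Submit(func() {
			mu.Lock()
			handled++
			mu.Unlock()
		})
	}
	pool.Close()
	return handled
}

func main() {
	fmt.Println(handleEvents(4, 1000))
}
```

Here four goroutines serve a thousand events, whereas a goroutine-per-connection model would park one goroutine for every idle connection.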
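Synchronous mode: data is read directly from the kernel. Hardware copies to the kernel, and the kernel copies to user memory: two copies.
Reading from the kernel: hardware copies to the kernel, and the kernel copies to an in-memory buffer: two copies.
Asynchronous call: data is copied from the buffer to the business logic: one more copy.
Reading from the kernel: hardware copies to the kernel, and the kernel copies to an in-memory buffer: two copies.
Asynchronous call: the NoCopy API hands the data to the business logic: no additional copy.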
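The difference between the last two paths comes down to whether the business logic receives a private copy or a view into the buffer. The sketch below shows only that user-space step; `readWithCopy`, `readNoCopy` and `sharesMemory` are hypothetical helpers, and the kernel-side copies are not modeled.

```go
package main

import "fmt"

// readWithCopy mimics the buffered path: the data already sits in buf and the
// caller receives a private copy, which costs one extra copy.
func readWithCopy(buf []byte, n int) []byte {
	out := make([]byte, n)
	copy(out, buf[:n])
	return out
}

// readNoCopy mimics the NoCopy path: the caller receives a view into buf.
func readNoCopy(buf []byte, n int) []byte {
	return buf[:n]
}

// sharesMemory reports whether two non-empty slices start at the same address.
func sharesMemory(a, b []byte) bool {
	return len(a) > 0 && len(b) > 0 && &a[0] == &b[0]
}

func main() {
	buf := []byte("hello netpoll")
	fmt.Println(sharesMemory(buf, readWithCopy(buf, 5)))
	fmt.Println(sharesMemory(buf, readNoCopy(buf, 5)))
}
```

The price of the view is a lifetime constraint: the slice is only valid while the underlying buffer is, which is why the Reader interface pairs its read methods with an explicit Release.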
// Reader is a collection of operations for nocopy reads.
//
// For ease of use, it is recommended to implement Reader as a blocking interface,
// rather than simply fetching the buffer.
// For example, the return of calling Next(n) should be blocked if there are fewer than n bytes, unless timeout.
// The return value is guaranteed to meet the requirements or an error will be returned.
type Reader interface {
// Next returns a slice containing the next n bytes from the buffer,
// advancing the buffer as if the bytes had been returned by Read.
//
// If there are fewer than n bytes in the buffer, Next returns will be blocked
// until data enough or an error occurs (such as a wait timeout).
//
// The slice p is only valid until the next call to the Release method.
// Next is not globally optimal, and Skip, ReadString, ReadBinary methods
// are recommended for specific scenarios.
//
// Return: len(p) must be n or 0, and p and error cannot be nil at the same time.
Next(n int) (p []byte, err error)
// Peek returns the next n bytes without advancing the reader.
// Other behavior is the same as Next.
Peek(n int) (buf []byte, err error)
// Skip the next n bytes and advance the reader, which is
// a faster implementation of Next when the next data is not used.
Skip(n int) (err error)
// Until reads until the first occurrence of delim in the input,
// returning a slice stops with delim in the input buffer.
// If Until encounters an error before finding a delimiter,
// it returns all the data in the buffer and the error itself (often ErrEOF or ErrConnClosed).
// Until returns err != nil only if line does not end in delim.
Until(delim byte) (line []byte, err error)
// ReadString is a faster implementation of Next when a string needs to be returned.
// It replaces:
//
// var p, err = Next(n)
// return string(p), err
//
ReadString(n int) (s string, err error)
// ReadBinary is a faster implementation of Next when it needs to
// return a copy of the slice that is not shared with the underlying layer.
// It replaces:
//
// var p, err = Next(n)
// var b = make([]byte, n)
// copy(b, p)
// return b, err
//
ReadBinary(n int) (p []byte, err error)
// ReadByte is a faster implementation of Next when a byte needs to be returned.
// It replaces:
//
// var p, err = Next(1)
// return p[0], err
//
ReadByte() (b byte, err error)
// Slice returns a new Reader containing the next n bytes from this reader,
// the operation is zero-copy, similar to b = p [:n].
Slice(n int) (r Reader, err error)
// Release the memory space occupied by all read slices. This method needs to be executed actively to
// recycle the memory after confirming that the previously read data is no longer in use.
// After invoking Release, the slices obtained by the method such as Next, Peek, Skip will
// become an invalid address and cannot be used anymore.
Release() (err error)
// Len returns the total length of the readable data in the reader.
Len() (length int)
}
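As a usage sketch of the Next / Release pattern described above, the following decodes a length-prefixed frame. `sliceReader` is a toy, non-blocking stand-in written for this example (a real Reader would block until enough data arrives), and `nextReleaser` and `decodeFrame` are hypothetical names; the 4-byte big-endian header is an assumed wire format.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// nextReleaser is the subset of the Reader interface used in this sketch.
type nextReleaser interface {
	Next(n int) ([]byte, error)
	Release() error
}

// sliceReader is a toy, non-blocking stand-in for a real Reader.
type sliceReader struct {
	buf []byte
	off int
}

// Next returns a view of the next n bytes and advances the read offset.
func (r *sliceReader) Next(n int) ([]byte, error) {
	if n < 0 || r.off+n > len(r.buf) {
		return nil, errors.New("not enough data")
	}
	p := r.buf[r.off : r.off+n]
	r.off += n
	return p, nil
}

// Release drops the consumed prefix; slices returned earlier must no longer be used.
func (r *sliceReader) Release() error {
	r.buf, r.off = r.buf[r.off:], 0
	return nil
}

// decodeFrame reads a 4-byte big-endian length followed by that many payload bytes.
func decodeFrame(r nextReleaser) (string, error) {
	defer r.Release()
	header, err := r.Next(4)
	if err != nil {
		return "", err
	}
	size := int(binary.BigEndian.Uint32(header))
	payload, err := r.Next(size)
	if err != nil {
		return "", err
	}
	// Copy out before Release runs, because payload is only a view.
	return string(payload), nil
}

func main() {
	frame := []byte{0, 0, 0, 5, 'h', 'e', 'l', 'l', 'o'}
	msg, err := decodeFrame(&sliceReader{buf: frame})
	fmt.Println(msg, err)
}
```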
// Writer is a collection of operations for nocopy writes.
//
// The usage of the design is a two-step operation, first apply for a section of memory,
// fill it and then submit. E.g:
//
// var buf, _ = Malloc(n)
// buf = append(buf[:0], ...)
// Flush()
//
// Note that it is not recommended to submit self-managed buffers to Writer.
// Since the writer is processed asynchronously, if the self-managed buffer is used and recycled after submission,
// it may cause inconsistent life cycle problems. Of course this is not within the scope of the design.
type Writer interface {
// Malloc returns a slice containing the next n bytes from the buffer,
// which will be written after submission(e.g. Flush).
//
// The slice p is only valid until the next submit(e.g. Flush).
// Therefore, please make sure that all data has been written into the slice before submission.
Malloc(n int) (buf []byte, err error)
// WriteString is a faster implementation of Malloc when a string needs to be written.
// It replaces:
//
// var buf, err = Malloc(len(s))
// n = copy(buf, s)
// return n, err
//
// The argument string s will be referenced based on the original address and will not be copied,
// so make sure that the string s will not be changed.
WriteString(s string) (n int, err error)
// WriteBinary is a faster implementation of Malloc when a slice needs to be written.
// It replaces:
//
// var buf, err = Malloc(len(b))
// n = copy(buf, b)
// return n, err
//
// The argument slice b will be referenced based on the original address and will not be copied,
// so make sure that the slice b will not be changed.
WriteBinary(b []byte) (n int, err error)
// WriteByte is a faster implementation of Malloc when a byte needs to be written.
// It replaces:
//
// var buf, _ = Malloc(1)
// buf[0] = b
//
WriteByte(b byte) (err error)
// WriteDirect is used to insert an additional slice of data on the current write stream.
// For example, if you plan to execute:
//
// var bufA, _ = Malloc(nA)
// WriteBinary(b)
// var bufB, _ = Malloc(nB)
//
// It can be replaced by:
//
// var buf, _ = Malloc(nA+nB)
// WriteDirect(b, nB)
//
// where buf[:nA] = bufA, buf[nA:nA+nB] = bufB.
WriteDirect(p []byte, remainCap int) error
// MallocAck will keep the first n malloc bytes and discard the rest.
// The following behavior:
//
// var buf, _ = Malloc(8)
// buf = buf[:5]
// MallocAck(5)
//
// equivalent as
// var buf, _ = Malloc(5)
//
MallocAck(n int) (err error)
// Append the argument writer to the tail of this writer and set the argument writer to nil,
// the operation is zero-copy, similar to p = append(p, w.p).
Append(w Writer) (err error)
// Flush will submit all malloc data and must confirm that the allocated bytes have been correctly assigned.
// Its behavior is equivalent to the io.Writer that already has parameters(slice b).
Flush() (err error)
// MallocLen returns the total length of the writable data that has not yet been submitted in the writer.
MallocLen() (length int)
}
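The two-step write (reserve with Malloc, fill, then submit with Flush) can be illustrated with a toy writer. `toyWriter` and `encodeFrame` are hypothetical and only mirror the shape of the interface above; unlike a real nocopy Writer, this toy copies into a `bytes.Buffer` on Flush.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// toyWriter is a hypothetical two-step writer: Malloc reserves space,
// Flush submits everything reserved so far to dst.
type toyWriter struct {
	chunks [][]byte
	dst    *bytes.Buffer
}

// Malloc reserves n bytes and returns them for the caller to fill in.
func (w *toyWriter) Malloc(n int) ([]byte, error) {
	buf := make([]byte, n)
	w.chunks = append(w.chunks, buf)
	return buf, nil
}

// Flush submits all reserved chunks in order and forgets them.
func (w *toyWriter) Flush() error {
	for _, c := range w.chunks {
		if _, err := w.dst.Write(c); err != nil {
			return err
		}
	}
	w.chunks = nil
	return nil
}

// encodeFrame writes a 4-byte big-endian length followed by the payload.
func encodeFrame(payload string) ([]byte, error) {
	w := &toyWriter{dst: &bytes.Buffer{}}
	header, _ := w.Malloc(4)
	binary.BigEndian.PutUint32(header, uint32(len(payload)))
	body, _ := w.Malloc(len(payload))
	copy(body, payload)
	if w.dst.Len() != 0 {
		return nil, fmt.Errorf("data became visible before Flush")
	}
	if err := w.Flush(); err != nil {
		return nil, err
	}
	return w.dst.Bytes(), nil
}

func main() {
	out, err := encodeFrame("ok")
	fmt.Printf("%q %v\n", out, err)
}
```

Nothing reaches the destination until Flush, which is why every reserved slice must be completely filled before submission.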
// LinkBuffer implements ReadWriter.
type LinkBuffer struct {
    length     int32
    mallocSize int
    // Head pointer at the front of the list; on Release it catches up with the read pointer and releases the nodes along the way
    head *linkBufferNode // release head
    // Read pointer; marks the node currently being read
    read *linkBufferNode // read head
    // Flush pointer; on Flush it catches up with the write pointer. The read pointer can read at most up to the flush pointer
    flush *linkBufferNode // malloc head
    // Write pointer; marks the node up to which data has been written
    write *linkBufferNode // malloc tail
    // Buffers taken from mcache, returned on Release
    caches [][]byte // buf allocated by Next when cross-package, which should be freed when release
}
// linkBufferNode is the node definition.
type linkBufferNode struct {
    buf      []byte          // data buffer
    off      int             // read offset
    malloc   int             // write offset
    refer    int32           // reference count of the node
    readonly bool            // read-only node, introduced by Refer, WriteString, WriteBinary, etc., default false
    origin   *linkBufferNode // the root node of the extends (the original node being referenced)
    next     *linkBufferNode // the next node of the linked buffer
}
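How the read, flush and write pointers cooperate can be shown with a much smaller toy. `toyLinkBuffer` is a hypothetical simplification: one node per Malloc, no reference counting, no Release, and Next always copies, whereas the real structure is designed to avoid that copy.

```go
package main

import "fmt"

type node struct {
	buf  []byte
	off  int // read offset inside buf
	next *node
}

// toyLinkBuffer keeps three cursors into a singly linked list of nodes.
type toyLinkBuffer struct {
	read, flush, write *node
	length             int // flushed bytes that have not been read yet
	pending            int // malloc'ed bytes that have not been flushed yet
}

func newToyLinkBuffer() *toyLinkBuffer {
	n := &node{} // empty sentinel node
	return &toyLinkBuffer{read: n, flush: n, write: n}
}

// Malloc appends a fresh node of n bytes after the write pointer.
func (b *toyLinkBuffer) Malloc(n int) []byte {
	nd := &node{buf: make([]byte, n)}
	b.write.next = nd
	b.write = nd
	b.pending += n
	return nd.buf
}

// Flush lets the flush pointer catch up with the write pointer,
// which makes everything malloc'ed so far readable.
func (b *toyLinkBuffer) Flush() {
	b.flush = b.write
	b.length += b.pending
	b.pending = 0
}

// Next returns the next n readable bytes, or nil if fewer than n are flushed.
func (b *toyLinkBuffer) Next(n int) []byte {
	if n <= 0 || n > b.length {
		return nil
	}
	b.length -= n
	out := make([]byte, 0, n)
	for len(out) < n {
		if b.read.off == len(b.read.buf) {
			b.read = b.read.next // current node exhausted, move on
			continue
		}
		take := n - len(out)
		if rest := len(b.read.buf) - b.read.off; take > rest {
			take = rest
		}
		out = append(out, b.read.buf[b.read.off:b.read.off+take]...)
		b.read.off += take
	}
	return out
}

func main() {
	b := newToyLinkBuffer()
	copy(b.Malloc(3), "abc")
	fmt.Println(b.Next(3) == nil) // not flushed yet, so nothing is readable
	b.Flush()
	fmt.Println(string(b.Next(3)))
}
```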
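Listener and Connection never touch the Poller directly. Instead, each hands its FD over to a proxy, FDOperator, which decouples it from the Poller.
The Poller, in turn, knows nothing about the upper layers: it simply fires the corresponding callback for whichever FD had a read or write event, which keeps the logic cleaner.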
// FDOperator is a collection of operations on file descriptors.
type FDOperator struct {
    // FD is file descriptor, poll will bind when register.
    FD int
    // The FDOperator provides three operations of reading, writing, and hanging.
    // The poll actively fires the FDOperator when the fd changes and does not check the return value of FDOperator.
    // Read callback, generally used by Listener.
    OnRead func(p Poll) error
    // Write callback, used by Dialer.
    OnWrite func(p Poll) error
    // Callback for when the FD is unmounted (hang-up).
    OnHup func(p Poll) error
    // The following is the required fn, which must exist when used, or directly panic.
    // Fns are only called by the poll when handles connection events.
    // Read callback, used by Connection.
    Inputs func(vs [][]byte) (rs [][]byte)
    // Read ACK callback: after N bytes have been read from the kernel, it is called with N to tell the user process that the read succeeded.
    InputAck func(n int) (err error)
    // Outputs will be locked if len(rs) > 0, and needs to be unlocked by OutputAck.
    // Write callback, used by Connection.
    Outputs func(vs [][]byte) (rs [][]byte, supportZeroCopy bool)
    // Write ACK callback: after N bytes have been sent to the kernel, it is called with N to tell the user process that the write succeeded.
    OutputAck func(n int) (err error)
    // poll is the registered location of the file descriptor,
    // i.e. the poll this FD is currently mounted on.
    poll Poll
    // private, used by operatorCache
    // The next cached FDOperator.
    next  *FDOperator
    state int32 // CAS: 0(unused) 1(inuse) 2(do-done)
}
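// Control registers the corresponding event listening for this FDOperator.
func (op *FDOperator) Control(event PollEvent) error
// do marks the FDOperator as handling an IO event.
func (op *FDOperator) do() (can bool)
// done marks the IO event handling as finished; the FDOperator returns to the in-use state.
func (op *FDOperator) done()
// inuse moves the FDOperator from the unused state to the in-use state.
func (op *FDOperator) inuse()
// unused moves the FDOperator from the in-use state to the unused state.
func (op *FDOperator) unused()
// isUnused reports whether the FDOperator is in the unused state.
func (op *FDOperator) isUnused() bool
// reset resets the FDOperator.
func (op *FDOperator) reset()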
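The three states noted on the `state` field (0 unused, 1 inuse, 2 do-done) suggest a small compare-and-swap state machine. The sketch below is one way such transitions could look; the type `operatorState` and all method bodies are illustrative assumptions, not netpoll's code.

```go
package main

import (
	"fmt"
	"runtime"
	"sync/atomic"
)

const (
	stateUnused int32 = 0
	stateInuse  int32 = 1
	stateDoing  int32 = 2
)

// operatorState is a hypothetical holder for the state field only.
type operatorState struct{ state int32 }

// do tries to move inuse -> doing; it fails if the operator is unused
// or another goroutine is already handling an event.
func (s *operatorState) do() bool {
	return atomic.CompareAndSwapInt32(&s.state, stateInuse, stateDoing)
}

// done moves doing -> inuse once event handling has finished.
func (s *operatorState) done() { atomic.StoreInt32(&s.state, stateInuse) }

// inuse moves unused -> inuse, yielding while the state is in flux.
func (s *operatorState) inuse() {
	for !atomic.CompareAndSwapInt32(&s.state, stateUnused, stateInuse) {
		if atomic.LoadInt32(&s.state) == stateInuse {
			return
		}
		runtime.Gosched()
	}
}

// unused moves inuse -> unused; it waits for an in-flight do/done pair to end.
func (s *operatorState) unused() {
	for !atomic.CompareAndSwapInt32(&s.state, stateInuse, stateUnused) {
		if atomic.LoadInt32(&s.state) == stateUnused {
			return
		}
		runtime.Gosched()
	}
}

func (s *operatorState) isUnused() bool {
	return atomic.LoadInt32(&s.state) == stateUnused
}

func main() {
	s := &operatorState{}
	fmt.Println(s.do()) // an unused operator cannot handle events
	s.inuse()
	fmt.Println(s.do()) // the first handler wins
	fmt.Println(s.do()) // a second handler is rejected while the first runs
	s.done()
	s.unused()
	fmt.Println(s.isUnused())
}
```

The point of the extra "doing" state is that an operator cannot be recycled into the cache while a poller is still running one of its callbacks.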
// Pooling
type operatorCache struct {
    locked int32
    first  *FDOperator   // the first available FDOperator
    cache  []*FDOperator // cache exists here to optimize GC
}
func (c *operatorCache) alloc() *FDOperator {
    c.lock()
    if c.first == nil {
        // Allocate a batch of FDOperators at once.
        const opSize = unsafe.Sizeof(FDOperator{})
        n := block4k / opSize
        if n == 0 {
            n = 1
        }
        // Must be in non-GC memory because can be referenced
        // only from epoll/kqueue internals.
        for i := uintptr(0); i < n; i++ {
            pd := &FDOperator{}
            c.cache = append(c.cache, pd)
            pd.next = c.first
            c.first = pd
        }
    }
    op := c.first
    c.first = op.next
    c.unlock()
    return op
}
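When a read event occurs, the FDOperator acts as the proxy and reads the byte stream into the Connection's NoCopy Buffer,
then acknowledges the read. Once the acknowledgement completes, OnEvent publishes a task to the Routine Pool for execution.
The Connection itself only needs to expose read and write interfaces to the upper layer, manage its own buffer, and, at read acknowledgement, tell OnEvent that a request has arrived. This keeps its own logic simple and self-contained.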
// onEvent is the collection of event processing.
// OnPrepare, OnRequest, CloseCallback share the lock processing,
// which is a CAS lock and can only be cleared by OnRequest.
type onEvent struct {
    ctx               context.Context
    onConnectCallback atomic.Value
    onRequestCallback atomic.Value
    closeCallbacks    atomic.Value // value is latest *callbackNode
}
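// CloseCallback is called back when the connection is closed.
type CloseCallback func(connection Connection) error
// OnPrepare is called when the Connection is initialized; it injects some parameters into the connection and creates a context for OnConnect to use.
type OnPrepare func(connection Connection) context.Context
// OnConnect is called back once the Connection is ready; it creates a context for OnRequest to use.
type OnConnect func(ctx context.Context, connection Connection) context.Context
// OnRequest is called back when there is a read/write event. On error, the handler should actively close the connection; the returned error is not processed in any way.
// Only one OnRequest runs at a time. If the connection is not closed and the data is not fully read, the goroutine enters an infinite loop.
type OnRequest func(ctx context.Context, connection Connection) error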
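The "only one OnRequest at a time" rule, and the warning about looping forever when data is left unread, can both be seen in a small model of a CAS processing lock. `toyConn`, `onData` and `demo` are hypothetical; the `unread` counter stands in for the connection's input buffer, and the handler runs inline instead of in a pool.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// toyConn models just enough of a connection for this sketch.
type toyConn struct {
	processing int32 // 0 = free, 1 = an OnRequest loop is running
	unread     int32 // bytes not yet consumed by the handler
}

// onData is invoked for every read event. Only the caller that wins the CAS
// runs the handler loop; every other caller returns false immediately.
func (c *toyConn) onData(n int32, onRequest func(c *toyConn)) bool {
	atomic.AddInt32(&c.unread, n)
	if !atomic.CompareAndSwapInt32(&c.processing, 0, 1) {
		return false
	}
	for {
		// The handler is re-invoked while unread data remains, so a handler
		// that never consumes anything would keep this loop spinning.
		for atomic.LoadInt32(&c.unread) > 0 {
			onRequest(c)
		}
		atomic.StoreInt32(&c.processing, 0)
		// Re-check: data may have arrived between the last read and the unlock.
		if atomic.LoadInt32(&c.unread) == 0 ||
			!atomic.CompareAndSwapInt32(&c.processing, 0, 1) {
			return true
		}
	}
}

// demo delivers a second event while the first handler is still running.
func demo() (calls int, nestedRan bool) {
	c := &toyConn{}
	nestedRan = true
	c.onData(3, func(c *toyConn) {
		calls++
		if calls == 1 {
			// The lock is held, so this event does not start a second handler.
			nestedRan = c.onData(2, nil)
		}
		atomic.StoreInt32(&c.unread, 0) // consume everything that is buffered
	})
	return calls, nestedRan
}

func main() {
	fmt.Println(demo())
}
```

The bytes from the second event are not lost: they are picked up by the handler that already holds the lock.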
Every instance implements a Close method. When Close is triggered, it in turn triggers a chain of Close calls on its internal members.
// Close this server with deadline.
func (s *server) Close(ctx context.Context) error {
    s.operator.Control(PollDetach)
    s.ln.Close()
    var ticker = time.NewTicker(time.Second)
    defer ticker.Stop()
    var hasConn bool
    for {
        hasConn = false
        // Walk all connections and try to close them. If some cannot be closed yet,
        // keep polling until every connection has been closed safely.
        s.connections.Range(func(key, value interface{}) bool {
            var conn, ok = value.(gracefulExit)
            if !ok || conn.isIdle() {
                value.(Connection).Close()
            }
            hasConn = true
            return true
        })
        if !hasConn { // all connections have been closed
            return nil
        }
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-ticker.C:
            continue
        }
    }
}
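The polling shutdown loop above can be tried in isolation with a toy connection type. This is a sketch under assumptions: `toyConn`, `closeAll` and the `demo*` helpers are hypothetical, and the toy deletes a connection from the map right after closing it, whereas in the server above the map entry is presumably removed by the connection's own close handling.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

type toyConn struct {
	busy   int32
	closed int32
}

func (c *toyConn) isIdle() bool { return atomic.LoadInt32(&c.busy) == 0 }
func (c *toyConn) Close()       { atomic.StoreInt32(&c.closed, 1) }

// closeAll closes idle connections and polls until none remain or ctx expires.
func closeAll(ctx context.Context, conns *sync.Map, interval time.Duration) error {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		remaining := false
		conns.Range(func(key, value interface{}) bool {
			c := value.(*toyConn)
			if c.isIdle() {
				c.Close()
				conns.Delete(key)
			} else {
				remaining = true
			}
			return true
		})
		if !remaining {
			return nil
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
		}
	}
}

// demoShutdown runs closeAll against one idle and one busy connection;
// the busy one becomes idle after idleAfter.
func demoShutdown(idleAfter, timeout time.Duration) error {
	conns := &sync.Map{}
	conns.Store(1, &toyConn{})
	busy := &toyConn{busy: 1}
	conns.Store(2, busy)
	go func() {
		time.Sleep(idleAfter)
		atomic.StoreInt32(&busy.busy, 0)
	}()
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	return closeAll(ctx, conns, 5*time.Millisecond)
}

func demoGraceful() error { return demoShutdown(20*time.Millisecond, time.Second) }
func demoTimeout() error  { return demoShutdown(time.Second, 30*time.Millisecond) }

func main() {
	fmt.Println(demoGraceful())
	fmt.Println(demoTimeout())
}
```

The context deadline is what bounds the shutdown: a connection that never becomes idle makes the call return the context's error instead of hanging.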
// onClose means close by user.
func (c *connection) onClose() error {
    // Close the connection.
    if c.closeBy(user) {
        // If Close is called during OnPrepare, poll is not registered.
        if c.operator.poll != nil {
            c.operator.Control(PollDetach)
        }
        // Wake up any blocked reads and writes.
        c.triggerRead()
        c.triggerWrite(ErrConnClosed)
        // true means competing with OnRequest for the lock, which ensures
        // OnRequest is not running while the connection is being closed.
        c.closeCallback(true)
        return nil
    }
    if c.isCloseBy(poller) {
        // Connection with OnRequest of nil
        // relies on the user to actively close the connection to recycle resources.
        c.closeCallback(true)
    }
    return nil
}
// closeCallback .
// It can be confirmed that closeCallback and onRequest will not be executed concurrently.
// If onRequest is still running, it will trigger closeCallback on exit.
func (c *connection) closeCallback(needLock bool) (err error) {
    if needLock && !c.lock(processing) {
        return nil
    }
    var latest = c.closeCallbacks.Load()
    if latest == nil {
        return nil
    }
    for callback := latest.(*callbackNode); callback != nil; callback = callback.pre {
        callback.fn(c)
    }
    return nil
}
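// gracefulExit is the graceful-exit interface: it reports whether a connection is idle,
// so that connections are closed, as far as possible, only while idle.
type gracefulExit interface {
    isIdle() (yes bool)
    Close() (err error)
}
// callbackNode is a callback node; callbacks are invoked in stack (last in, first out) order.
type callbackNode struct {
    fn  CloseCallback
    pre *callbackNode
}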
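The stack-style invocation follows directly from each node pointing at its predecessor. A minimal sketch, with hypothetical names `cbNode`, `cbStack` and `runOrder`, and without the synchronization a concurrent `add` would need:

```go
package main

import (
	"fmt"
	"strings"
	"sync/atomic"
)

type cbNode struct {
	fn  func()
	pre *cbNode
}

// cbStack stores only the latest node; older ones are reached through pre.
type cbStack struct {
	latest atomic.Value // holds *cbNode
}

// add pushes a callback on top of the stack.
// Note: concurrent calls to add would race between Load and Store.
func (s *cbStack) add(fn func()) {
	node := &cbNode{fn: fn}
	if latest := s.latest.Load(); latest != nil {
		node.pre = latest.(*cbNode)
	}
	s.latest.Store(node)
}

// run walks from the latest node back to the oldest one.
func (s *cbStack) run() {
	latest := s.latest.Load()
	if latest == nil {
		return
	}
	for cb := latest.(*cbNode); cb != nil; cb = cb.pre {
		cb.fn()
	}
}

// runOrder registers one callback per name and reports the invocation order.
func runOrder(names ...string) string {
	var s cbStack
	var order []string
	for _, name := range names {
		name := name // capture the current value for the closure
		s.add(func() { order = append(order, name) })
	}
	s.run()
	return strings.Join(order, ",")
}

func main() {
	fmt.Println(runOrder("a", "b", "c"))
}
```

Callbacks registered last run first, so cleanup registered by outer layers runs after cleanup registered later by inner ones.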
// CloseCallback will be called when the connection is closed.
// Return: error is unused which will be ignored directly.
type CloseCallback func(connection Connection) error