Source Code Analysis of netpoll, ByteDance's Network Library

Background

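Now that microservices dominate, internal services mostly communicate over long-lived connections. Go's native net library does not provide enough performance or control for this. It cannot sense connection state, resource utilization drops when there are many connections, and the number of goroutines cannot be controlled. netpoll was built to gain full control over the network layer, explore ahead of business needs, and ultimately feed those results back into the business.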

Designing a Network Library


How to solve the goroutine-per-connection problem

Why does the native library have this problem?

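In the native library, every new connection gets its own goroutine. With the long-lived connections typical of microservices, a large number of connections sit idle most of the time. These idle connections put a heavy burden on epoll, and the many sleeping goroutines (each holding its own stack) tie up a lot of resources.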


Netpoll's Main/Sub Reactor Model

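netpoll combines asynchronous callbacks with a Routine Pool (a goroutine pool). When a read or write event occurs on a connection, the callback triggers the business logic: the callback function is submitted to the Routine Pool, which concentrates the computing resources. The upper-layer logic still keeps a single-threaded experience, because only one handler runs per connection at a time.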


How to avoid the performance loss caused by copying?

Native library

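Synchronous: data is read directly from the kernel. It is copied from the hardware into the kernel, then from the kernel into user memory, for two copies.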


GNet

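Data is read from the kernel. It is copied from the hardware into the kernel, then from the kernel into a user-space buffer, for two copies.

The asynchronous callback then copies the data from that buffer into the business logic, which adds one more copy.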


Netpoll

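Data is read from the kernel. It is copied from the hardware into the kernel, then from the kernel into a user-space buffer, for two copies.

The asynchronous callback hands the data to the business logic through the NoCopy API, so no additional copy is made.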
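A toy buffer can illustrate the difference between gnet's last copy and the NoCopy approach. The `buffer` type here is hypothetical, not netpoll's. One accessor copies bytes into a fresh slice; the other returns a sub-slice that aliases the buffer's own memory.

```go
package main

// buffer is a hypothetical, simplified read buffer that already holds the
// bytes copied in from the kernel.
type buffer struct {
	data []byte
	off  int
}

// nextCopy models the extra-copy style: the caller receives its own slice.
func (b *buffer) nextCopy(n int) []byte {
	p := make([]byte, n)
	copy(p, b.data[b.off:b.off+n])
	b.off += n
	return p
}

// nextNoCopy models a NoCopy-style Next: it returns a sub-slice that aliases
// the buffer's memory, so no bytes are moved.
func (b *buffer) nextNoCopy(n int) []byte {
	p := b.data[b.off : b.off+n : b.off+n]
	b.off += n
	return p
}
```

Aliasing is what makes the NoCopy API cheap. It is also why the Reader below requires an explicit Release before its memory may be reused.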


NoCopyBuffer

// Reader is a collection of operations for nocopy reads.
//
// For ease of use, it is recommended to implement Reader as a blocking interface,
// rather than simply fetching the buffer.
// For example, the return of calling Next(n) should be blocked if there are fewer than n bytes, unless timeout.
// The return value is guaranteed to meet the requirements or an error will be returned.
type Reader interface {
	// Next returns a slice containing the next n bytes from the buffer,
	// advancing the buffer as if the bytes had been returned by Read.
	//
	// If there are fewer than n bytes in the buffer, Next returns will be blocked
	// until data enough or an error occurs (such as a wait timeout).
	//
	// The slice p is only valid until the next call to the Release method.
	// Next is not globally optimal, and Skip, ReadString, ReadBinary methods
	// are recommended for specific scenarios.
	//
	// Return: len(p) must be n or 0, and p and error cannot be nil at the same time.
	Next(n int) (p []byte, err error)

	// Peek returns the next n bytes without advancing the reader.
	// Other behavior is the same as Next.
	Peek(n int) (buf []byte, err error)

	// Skip the next n bytes and advance the reader, which is
	// a faster implementation of Next when the next data is not used.
	Skip(n int) (err error)

	// Until reads until the first occurrence of delim in the input,
	// returning a slice stops with delim in the input buffer.
	// If Until encounters an error before finding a delimiter,
	// it returns all the data in the buffer and the error itself (often ErrEOF or ErrConnClosed).
	// Until returns err != nil only if line does not end in delim.
	Until(delim byte) (line []byte, err error)

	// ReadString is a faster implementation of Next when a string needs to be returned.
	// It replaces:
	//
	//  var p, err = Next(n)
	//  return string(p), err
	//
	ReadString(n int) (s string, err error)

	// ReadBinary is a faster implementation of Next when it needs to
	// return a copy of the slice that is not shared with the underlying layer.
	// It replaces:
	//
	//  var p, err = Next(n)
	//  var b = make([]byte, n)
	//  copy(b, p)
	//  return b, err
	//
	ReadBinary(n int) (p []byte, err error)

	// ReadByte is a faster implementation of Next when a byte needs to be returned.
	// It replaces:
	//
	//  var p, err = Next(1)
	//  return p[0], err
	//
	ReadByte() (b byte, err error)

	// Slice returns a new Reader containing the next n bytes from this reader,
	// the operation is zero-copy, similar to b = p [:n].
	Slice(n int) (r Reader, err error)

	// Release the memory space occupied by all read slices. This method needs to be executed actively to
	// recycle the memory after confirming that the previously read data is no longer in use.
	// After invoking Release, the slices obtained by the method such as Next, Peek, Skip will
	// become an invalid address and cannot be used anymore.
	Release() (err error)

	// Len returns the total length of the readable data in the reader.
	Len() (length int)
}
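To make the Reader contract concrete, here is a hypothetical, non-blocking `sliceReader` that implements a few of the methods over a single byte slice. It comes with a `readFrame` helper that decodes a length-prefixed frame. This is a sketch under stated assumptions: unlike netpoll's Reader, it returns the hypothetical `errShort` instead of blocking until enough data arrives.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"errors"
)

// errShort is hypothetical: a real netpoll Reader blocks (until timeout) instead.
var errShort = errors.New("not enough data")

// sliceReader is a hypothetical, non-blocking stand-in for part of the Reader
// interface, backed by one byte slice. Returned slices alias buf (zero-copy).
type sliceReader struct {
	buf  []byte
	read int // bytes consumed by Next/Skip/Until
}

func (r *sliceReader) Len() int { return len(r.buf) - r.read }

// Peek returns the next n bytes without advancing.
func (r *sliceReader) Peek(n int) ([]byte, error) {
	if r.Len() < n {
		return nil, errShort
	}
	return r.buf[r.read : r.read+n], nil
}

// Next returns the next n bytes and advances past them.
func (r *sliceReader) Next(n int) ([]byte, error) {
	p, err := r.Peek(n)
	if err == nil {
		r.read += n
	}
	return p, err
}

// Skip advances past n bytes without handing them out.
func (r *sliceReader) Skip(n int) error {
	_, err := r.Next(n)
	return err
}

// Until returns data up to and including delim; without delim it returns
// everything that is left together with an error.
func (r *sliceReader) Until(delim byte) ([]byte, error) {
	i := bytes.IndexByte(r.buf[r.read:], delim)
	if i < 0 {
		line, _ := r.Next(r.Len())
		return line, errShort
	}
	return r.Next(i + 1)
}

// Release drops the consumed prefix. In netpoll the released memory is
// recycled, which is why earlier slices must not be used afterwards.
func (r *sliceReader) Release() {
	r.buf = r.buf[r.read:]
	r.read = 0
}

// readFrame decodes one frame with a 4-byte big-endian length prefix:
// Peek the header first, then Next the whole frame.
func readFrame(r *sliceReader) ([]byte, error) {
	hdr, err := r.Peek(4)
	if err != nil {
		return nil, err
	}
	size := int(binary.BigEndian.Uint32(hdr))
	frame, err := r.Next(4 + size)
	if err != nil {
		return nil, err
	}
	return frame[4:], nil
}
```

Peeking at the header before calling Next means that, with a blocking Reader, the handler waits for exactly one complete frame and never over-reads.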



// Writer is a collection of operations for nocopy writes.
//
// The usage of the design is a two-step operation, first apply for a section of memory,
// fill it and then submit. E.g:
//
//  var buf, _ = Malloc(n)
//  buf = append(buf[:0], ...)
//  Flush()
//
// Note that it is not recommended to submit self-managed buffers to Writer.
// Since the writer is processed asynchronously, if the self-managed buffer is used and recycled after submission,
// it may cause inconsistent life cycle problems. Of course this is not within the scope of the design.
type Writer interface {
	// Malloc returns a slice containing the next n bytes from the buffer,
	// which will be written after submission(e.g. Flush).
	//
	// The slice p is only valid until the next submit(e.g. Flush).
	// Therefore, please make sure that all data has been written into the slice before submission.
	Malloc(n int) (buf []byte, err error)

	// WriteString is a faster implementation of Malloc when a string needs to be written.
	// It replaces:
	//
	//  var buf, err = Malloc(len(s))
	//  n = copy(buf, s)
	//  return n, err
	//
	// The argument string s will be referenced based on the original address and will not be copied,
	// so make sure that the string s will not be changed.
	WriteString(s string) (n int, err error)

	// WriteBinary is a faster implementation of Malloc when a slice needs to be written.
	// It replaces:
	//
	//  var buf, err = Malloc(len(b))
	//  n = copy(buf, b)
	//  return n, err
	//
	// The argument slice b will be referenced based on the original address and will not be copied,
	// so make sure that the slice b will not be changed.
	WriteBinary(b []byte) (n int, err error)

	// WriteByte is a faster implementation of Malloc when a byte needs to be written.
	// It replaces:
	//
	//  var buf, _ = Malloc(1)
	//  buf[0] = b
	//
	WriteByte(b byte) (err error)

	// WriteDirect is used to insert an additional slice of data on the current write stream.
	// For example, if you plan to execute:
	//
	//  var bufA, _ = Malloc(nA)
	//  WriteBinary(b)
	//  var bufB, _ = Malloc(nB)
	//
	// It can be replaced by:
	//
	//  var buf, _ = Malloc(nA+nB)
	//  WriteDirect(b, nB)
	//
	// where buf[:nA] = bufA, buf[nA:nA+nB] = bufB.
	WriteDirect(p []byte, remainCap int) error

	// MallocAck will keep the first n malloc bytes and discard the rest.
	// The following behavior:
	//
	//  var buf, _ = Malloc(8)
	//  buf = buf[:5]
	//  MallocAck(5)
	//
	// equivalent as
	//  var buf, _ = Malloc(5)
	//
	MallocAck(n int) (err error)

	// Append the argument writer to the tail of this writer and set the argument writer to nil,
	// the operation is zero-copy, similar to p = append(p, w.p).
	Append(w Writer) (err error)

	// Flush will submit all malloc data and must confirm that the allocated bytes have been correctly assigned.
	// Its behavior is equivalent to the io.Writer that already has parameters(slice b).
	Flush() (err error)

	// MallocLen returns the total length of the writable data that has not yet been submitted in the writer.
	MallocLen() (length int)
}





// LinkBuffer implements ReadWriter.
type LinkBuffer struct {
	length     int32
	mallocSize int

	// Head pointer, at the front of the list. On Release it catches up with the
	// read pointer and releases the nodes it passes along the way.
	head *linkBufferNode // release head
	// Read pointer: marks the node currently being read.
	read *linkBufferNode // read head
	// Flush pointer: on Flush it catches up with the write pointer. The read
	// pointer can read at most up to the flush pointer.
	flush *linkBufferNode // malloc head
	// Write pointer: marks the node up to which data has been written.
	write *linkBufferNode // malloc tail

	// Buffers taken from mcache; they are returned on Release.
	caches [][]byte // buf allocated by Next when cross-package, which should be freed when release
}





// linkBufferNode is the node definition of the LinkBuffer linked list.
type linkBufferNode struct {
	buf      []byte          // buffer: the data buffer
	off      int             // read-offset
	malloc   int             // write-offset
	refer    int32           // reference count of the node
	readonly bool            // read-only node, introduced by Refer, WriteString, WriteBinary, etc., default false
	origin   *linkBufferNode // the root node of the extends (the original node being referenced)
	next     *linkBufferNode // the next node of the linked buffer
}
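The four-pointer layout of LinkBuffer can be sketched with a deliberately simplified model. The `linkBuf` and `node` types are hypothetical and leave out reference counting, read-only nodes, and pooling. The sketch follows the rules stated in the comments above. Data becomes readable only after Flush. Next is zero-copy when the bytes sit in one node and copies when they span nodes. Release moves head up to read.

```go
package main

// node is a hypothetical, trimmed-down linkBufferNode.
type node struct {
	buf    []byte
	off    int // read offset
	malloc int // write offset
	next   *node
}

// linkBuf is a hypothetical simplification of LinkBuffer that keeps only the
// four pointers and the counters needed to enforce "read up to flush".
type linkBuf struct {
	nodeSize                 int
	length                   int // flushed bytes not yet read
	unflushed                int // written bytes not yet flushed
	head, read, flush, write *node
	released                 int // nodes dropped by Release (for illustration)
}

func newLinkBuf(nodeSize int) *linkBuf {
	n := &node{buf: make([]byte, nodeSize)}
	return &linkBuf{nodeSize: nodeSize, head: n, read: n, flush: n, write: n}
}

// Write copies p in after the write pointer, adding nodes when one fills up.
func (b *linkBuf) Write(p []byte) {
	for len(p) > 0 {
		if b.write.malloc == len(b.write.buf) {
			b.write.next = &node{buf: make([]byte, b.nodeSize)}
			b.write = b.write.next
		}
		c := copy(b.write.buf[b.write.malloc:], p)
		b.write.malloc += c
		b.unflushed += c
		p = p[c:]
	}
}

// Flush moves the flush pointer up to the write pointer, publishing the data.
func (b *linkBuf) Flush() {
	b.flush = b.write
	b.length += b.unflushed
	b.unflushed = 0
}

// Next returns n flushed bytes: a zero-copy sub-slice when they sit in one
// node, otherwise a freshly allocated copy.
func (b *linkBuf) Next(n int) ([]byte, bool) {
	if n <= 0 || n > b.length {
		return nil, false
	}
	b.length -= n
	for b.read.off == b.read.malloc { // skip fully consumed nodes;
		b.read = b.read.next // safe: at least n flushed bytes lie ahead
	}
	if b.read.malloc-b.read.off >= n {
		p := b.read.buf[b.read.off : b.read.off+n]
		b.read.off += n
		return p, true
	}
	p := make([]byte, 0, n)
	for len(p) < n {
		for b.read.off == b.read.malloc {
			b.read = b.read.next
		}
		take := b.read.malloc - b.read.off
		if rest := n - len(p); take > rest {
			take = rest
		}
		p = append(p, b.read.buf[b.read.off:b.read.off+take]...)
		b.read.off += take
	}
	return p, true
}

// Release lets head catch up with read, dropping the nodes it passes.
func (b *linkBuf) Release() {
	for b.head != b.read {
		b.head = b.head.next
		b.released++
	}
}
```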


Netpoll's Layered Design

FDOperator (MainReactor)

Design

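Listener and Connection never touch the Poller directly. Each one hands its FD over to an FDOperator proxy, which decouples it from the Poller.

The Poller, in turn, knows nothing about the layers above it. It simply fires the corresponding callback on any FD where a read or write event occurred, which keeps the logic clean.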
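The decoupling can be sketched as follows. The hypothetical `poller`, `operator`, and `event` types stand in for the real Poll, FDOperator, and epoll events. The poller only maps file descriptors to callbacks, and all Listener or Connection logic lives inside those callbacks.

```go
package main

// event is a hypothetical readiness notification, as epoll/kqueue would report it.
type event struct {
	fd                      int
	readable, writable, hup bool
}

// operator plays the role of FDOperator: it is the only thing the poller sees.
type operator struct {
	FD      int
	OnRead  func() error
	OnWrite func() error
	OnHup   func() error
}

// poller knows file descriptors and callbacks, nothing about listeners or connections.
type poller struct {
	ops map[int]*operator
}

func (p *poller) register(op *operator) { p.ops[op.FD] = op }

// dispatch fires the callbacks for each ready fd and, like the FDOperator
// comment says, does not check their return values.
func (p *poller) dispatch(events []event) {
	for _, ev := range events {
		op, ok := p.ops[ev.fd]
		if !ok {
			continue
		}
		if ev.hup && op.OnHup != nil {
			_ = op.OnHup()
			delete(p.ops, ev.fd) // the fd is gone; later events are ignored
			continue
		}
		if ev.readable && op.OnRead != nil {
			_ = op.OnRead()
		}
		if ev.writable && op.OnWrite != nil {
			_ = op.OnWrite()
		}
	}
}
```

Because the poller only sees `operator`, a listener (whose OnRead accepts) and a connection (whose OnRead fills its buffer) are interchangeable from its point of view.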


Implementation

// FDOperator is a collection of operations on file descriptors.
type FDOperator struct {
	// FD is file descriptor, poll will bind when register.
	FD int

	// The FDOperator provides three operations of reading, writing, and hanging.
	// The poll actively fire the FDOperator when fd changes, no check the return value of FDOperator.
	// Read callback, typically used by Listener.
	OnRead func(p Poll) error
	// Write callback, used by Dialer.
	OnWrite func(p Poll) error
	// Hang-up callback, invoked when the FD is hung up and unloaded.
	OnHup func(p Poll) error

	// The following is the required fn, which must exist when used, or directly panic.
	// Fns are only called by the poll when handles connection events.
	// Read callback for Connection: returns the buffers the poller reads into.
	Inputs func(vs [][]byte) (rs [][]byte)
	// Read ACK callback: after n bytes have been read from the kernel, InputAck
	// reports those n bytes so the user side knows the read succeeded.
	InputAck func(n int) (err error)

	// Outputs will locked if len(rs) > 0, which need unlocked by OutputAck.
	// Write callback for Connection: returns the pending data to send.
	Outputs func(vs [][]byte) (rs [][]byte, supportZeroCopy bool)
	// Write ACK callback: after n bytes have been handed to the kernel, OutputAck
	// reports those n bytes so the user side knows the write succeeded.
	OutputAck func(n int) (err error)

	// poll is the registered location of the file descriptor,
	// i.e. the Poll this FD is attached to.
	poll Poll

	// private, used by operatorCache
	// next FDOperator in the cache's free list
	next  *FDOperator
	state int32 // CAS: 0(unused) 1(inuse) 2(do-done)
}



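// Control registers the corresponding event listening for this FDOperator.
func (op *FDOperator) Control(event PollEvent) error

// do marks the FDOperator as currently processing an IO event.
func (op *FDOperator) do() (can bool)

// done marks the IO event as finished and returns the FDOperator to the in-use state.
func (op *FDOperator) done()

// inuse moves the FDOperator from the unused state to the in-use state.
func (op *FDOperator) inuse()

// unused moves the FDOperator from the in-use state to the unused state.
func (op *FDOperator) unused()

// isUnused reports whether the FDOperator is in the unused state.
func (op *FDOperator) isUnused() bool

// reset resets the FDOperator.
func (op *FDOperator) reset()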
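The state comment `CAS: 0(unused) 1(inuse) 2(do-done)` suggests how these methods interact. The following sketches those transitions on a hypothetical `fdOp` type. It was written from the comments rather than copied from netpoll, so treat the details as assumptions, for example the spinning in `inuse` and `unused`.

```go
package main

import (
	"runtime"
	"sync/atomic"
)

// Hypothetical names for the three states listed in the struct comment.
const (
	stateUnused int32 = iota // 0: sitting in the cache
	stateInuse               // 1: bound to an fd
	stateDoing               // 2: an IO event is being processed
)

type fdOp struct{ state int32 }

// do claims the operator for one IO event. It fails if the operator is unused
// or another event is already being handled.
func (op *fdOp) do() bool {
	return atomic.CompareAndSwapInt32(&op.state, stateInuse, stateDoing)
}

// done ends the IO event and returns the operator to "in use".
func (op *fdOp) done() { atomic.StoreInt32(&op.state, stateInuse) }

// inuse moves unused -> in use, yielding while an event is still in progress.
func (op *fdOp) inuse() {
	for !atomic.CompareAndSwapInt32(&op.state, stateUnused, stateInuse) {
		if atomic.LoadInt32(&op.state) == stateInuse {
			return
		}
		runtime.Gosched()
	}
}

// unused moves in use -> unused, waiting for any running event to call done.
func (op *fdOp) unused() {
	for !atomic.CompareAndSwapInt32(&op.state, stateInuse, stateUnused) {
		if atomic.LoadInt32(&op.state) == stateUnused {
			return
		}
		runtime.Gosched()
	}
}

func (op *fdOp) isUnused() bool { return atomic.LoadInt32(&op.state) == stateUnused }
```

The CAS in `do` is what keeps two pollers, or a poller and a detaching goroutine, from working on the same operator at once.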



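// Pooling: FDOperators are recycled through a free list.
type operatorCache struct {
	locked int32
	first  *FDOperator   // first available FDOperator in the free list
	cache  []*FDOperator // keeps every allocated FDOperator referenced, so the GC never reclaims one the poller may still point to
}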





func (c *operatorCache) alloc() *FDOperator {
	c.lock()
	if c.first == nil {
		// Allocate a whole batch of FDOperators (about one 4 KB block's worth) at once.
		const opSize = unsafe.Sizeof(FDOperator{})
		n := block4k / opSize
		if n == 0 {
			n = 1
		}
		// Must be in non-GC memory because can be referenced
		// only from epoll/kqueue internals.
		for i := uintptr(0); i < n; i++ {
			pd := &FDOperator{}
			c.cache = append(c.cache, pd)
			pd.next = c.first
			c.first = pd
		}
	}
	op := c.first
	c.first = op.next
	c.unlock()
	return op
}
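alloc only pops from the free list. Below is a simplified sketch of the whole pool. The `opCache` and `op` types and the `free` method are hypothetical, and a `sync.Mutex` replaces the original's `locked` field and `lock()` helper. Note that the `all` slice, like `cache`, keeps every operator referenced.

```go
package main

import "sync"

// op is a hypothetical stand-in for FDOperator.
type op struct {
	fd   int
	next *op // free-list link, like FDOperator.next
}

// opCache is a hypothetical simplification of operatorCache.
type opCache struct {
	mu    sync.Mutex // the original uses its own lock on the locked field instead
	first *op        // head of the free list
	all   []*op      // every operator ever allocated, kept referenced
	batch int        // how many operators to allocate per refill
}

// alloc pops an operator, refilling the free list in one batch when it is empty.
func (c *opCache) alloc() *op {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.first == nil {
		n := c.batch
		if n < 1 {
			n = 1
		}
		for i := 0; i < n; i++ {
			o := &op{}
			c.all = append(c.all, o)
			o.next = c.first
			c.first = o
		}
	}
	o := c.first
	c.first = o.next
	o.next = nil
	return o
}

// free resets an operator and pushes it back onto the free list.
func (c *opCache) free(o *op) {
	c.mu.Lock()
	defer c.mu.Unlock()
	*o = op{next: c.first}
	c.first = o
}
```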

OnEvent (SubReactor)

Design

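When a read event occurs, the FDOperator acts as a proxy and reads the byte stream into the Connection's NoCopy Buffer.

It then acknowledges the read. Once the acknowledgement completes, OnEvent publishes a task to the Routine Pool for execution.

The Connection itself only has to provide read/write interfaces to the upper layer, manage its own buffers, and tell OnEvent during the read acknowledgement that a request has arrived. This keeps its logic simple and self-contained.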


Implementation

// onEvent is the collection of event processing.
// OnPrepare, OnRequest, CloseCallback share the lock processing,
// which is a CAS lock and can only be cleared by OnRequest.
type onEvent struct {
	ctx               context.Context
	onConnectCallback atomic.Value
	onRequestCallback atomic.Value
	closeCallbacks    atomic.Value // value is latest *callbackNode
}



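// CloseCallback is invoked when the connection is closed; its returned error is ignored.
type CloseCallback func(connection Connection) error

// OnPrepare is called while the Connection is being initialized. It can inject
// settings into the connection and creates the context passed to OnConnect.
type OnPrepare func(connection Connection) context.Context

// OnConnect is called once the Connection is ready. It creates the context
// passed to OnRequest.
type OnConnect func(ctx context.Context, connection Connection) context.Context

// OnRequest is called when the connection has data to read. On error, the handler
// should close the connection itself; the returned error is not processed.
// Only one OnRequest runs per connection at a time. If the handler neither closes
// the connection nor reads all the data, the goroutine falls into a dead loop.
type OnRequest func(ctx context.Context, connection Connection) error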

Netpoll's Attempts at Graceful Shutdown

Every instance implements a Close method. When Close is triggered, it in turn closes the instance's internal members one after another.


// Close this server with deadline.
func (s *server) Close(ctx context.Context) error {
	s.operator.Control(PollDetach)
	s.ln.Close()

	var ticker = time.NewTicker(time.Second)
	defer ticker.Stop()
	var hasConn bool
	for {
		hasConn = false
		// Walk all connections and close the idle ones. Busy connections are left
		// alone and retried on the next tick, until every connection is gone or ctx expires.
		s.connections.Range(func(key, value interface{}) bool {
			var conn, ok = value.(gracefulExit)
			if !ok || conn.isIdle() {
				value.(Connection).Close()
			}
			hasConn = true
			return true
		})
		if !hasConn { // all connections have been closed
			return nil
		}

		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
			continue
		}
	}
}



// onClose means close by user.
func (c *connection) onClose() error {
	// The user is closing the connection.
	if c.closeBy(user) {
		// If Close is called during OnPrepare, poll is not registered.
		if c.operator.poll != nil {
			c.operator.Control(PollDetach)
		}
		// Wake up any reader or writer blocked on this connection.
		c.triggerRead()
		c.triggerWrite(ErrConnClosed)
		// needLock = true: compete with OnRequest for the processing lock, so the
		// close callbacks never run while OnRequest is executing.
		c.closeCallback(true)
		return nil
	}
	if c.isCloseBy(poller) {
		// Connection with OnRequest of nil
		// relies on the user to actively close the connection to recycle resources.
		c.closeCallback(true)
	}
	return nil
}



// closeCallback .
// It can be confirmed that closeCallback and onRequest will not be executed concurrently.
// If onRequest is still running, it will trigger closeCallback on exit.
func (c *connection) closeCallback(needLock bool) (err error) {
	if needLock && !c.lock(processing) {
		return nil
	}
	var latest = c.closeCallbacks.Load()
	if latest == nil {
		return nil
	}
	for callback := latest.(*callbackNode); callback != nil; callback = callback.pre {
		callback.fn(c)
	}
	return nil
}



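// gracefulExit is the graceful-shutdown interface. It reports whether a connection
// is idle, so that connections are closed, where possible, only while idle.
type gracefulExit interface {
	isIdle() (yes bool)
	Close() (err error)
}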





// callbackNode is a node in the close-callback list; callbacks run in stack (LIFO) order.
type callbackNode struct {
	fn  CloseCallback
	pre *callbackNode
}
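Each new callbackNode points at the previous one, so closeCallback walks the list from the newest registration to the oldest. The sketch below shows that stack using hypothetical `callbacks` and `cbNode` types, with an `atomic.Value` holding the latest node as in `onEvent.closeCallbacks`. It assumes registrations do not race with each other.

```go
package main

import "sync/atomic"

type closeFn func(id string) error

// cbNode mirrors callbackNode: each new callback points at the previous one.
type cbNode struct {
	fn  closeFn
	pre *cbNode
}

// callbacks holds the latest node in an atomic.Value.
type callbacks struct{ latest atomic.Value }

// add pushes fn on top of the stack (single writer assumed).
func (c *callbacks) add(fn closeFn) {
	node := &cbNode{fn: fn}
	if v := c.latest.Load(); v != nil {
		node.pre = v.(*cbNode)
	}
	c.latest.Store(node)
}

// run walks from the newest node to the oldest, so callbacks fire in LIFO order.
func (c *callbacks) run(id string) {
	v := c.latest.Load()
	if v == nil {
		return
	}
	for n := v.(*cbNode); n != nil; n = n.pre {
		_ = n.fn(id) // errors are ignored, as documented for CloseCallback
	}
}
```

LIFO order mirrors `defer`: a callback registered later, which may depend on resources set up by an earlier one, is torn down first.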




Study hard and make progress every day.