Go netpoll
Go netpoll 通过在底层对 epoll/kqueue/iocp 的封装,从而实现了使用同步编程模式达到异步执行的效果。总结来说,所有的网络操作都以网络描述符 netFD 为中心实现。netFD 与底层 PollDesc 结构绑定,当在一个 netFD 上读写遇到 EAGAIN 错误时,就将当前 goroutine 存储到这个 netFD 对应的 PollDesc 中,同时调用 gopark 把当前 goroutine 给 park 住,直到这个 netFD 上再次发生读写事件,才将此 goroutine 给 ready 激活重新运行。显然,在底层通知 goroutine 再次发生读写等事件的方式就是 epoll/kqueue/iocp 等事件驱动机制。
net.Listen("tcp", ":8888")
方法返回了一个 TCPListener
,它是一个实现了 net.Listener
接口的 struct。
通过 listener.Accept()
接收的新连接 TCPConn
则是一个实现了 net.Conn
接口的 struct,它内嵌了 net.conn
struct。
- 不管是 Listener 的 Accept 还是 Conn 的 Read/Write 方法,都是基于一个
netFD
的数据结构的操作 netFD
是一个网络描述符,类似于 Linux 的文件描述符的概念,netFD 中包含一个 poll.FD 数据结构- 而 poll.FD 中包含两个重要的数据结构 Sysfd 和 pollDesc,前者是真正的系统文件描述符,后者对是底层事件驱动的封装,所有的读写超时等操作都是通过调用后者的对应方法实现的
epoll in Golang
我们前面提到的 epoll 的三个基本调用,Go 在源码里实现了对那三个调用的封装:
#include <sys/epoll.h>
int epoll_create(int size);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
// Go 对上面三个调用的封装
func netpollinit()
func netpollopen(fd uintptr, pd *pollDesc) int32
func netpoll(block bool) gList
pollDesc
// /usr/local/Cellar/go/1.16.6/libexec/src/runtime/netpoll.go
// Network poller descriptor.
//
// No heap pointers.
//
//go:notinheap
type pollDesc struct {
link *pollDesc // in pollcache, protected by pollcache.lock
// The lock protects pollOpen, pollSetDeadline, pollUnblock and deadlineimpl operations.
// This fully covers seq, rt and wt variables. fd is constant throughout the PollDesc lifetime.
// pollReset, pollWait, pollWaitCanceled and runtime·netpollready (IO readiness notification)
// proceed w/o taking the lock. So closing, everr, rg, rd, wg and wd are manipulated
// in a lock-free way by all operations.
// NOTE(dvyukov): the following code uses uintptr to store *g (rg/wg),
// that will blow up when GC starts moving objects.
lock mutex // protects the following fields
fd uintptr
closing bool
everr bool // marks event scanning error happened
user uint32 // user settable cookie
rseq uintptr // protects from stale read timers
rg uintptr // pdReady, pdWait, G waiting for read or nil
rt timer // read deadline timer (set if rt.f != nil)
rd int64 // read deadline
wseq uintptr // protects from stale write timers
wg uintptr // pdReady, pdWait, G waiting for write or nil
wt timer // write deadline timer
wd int64 // write deadline
self *pollDesc // storage for indirect interface. See (*pollDesc).makeArg.
}
该结构体中包含用于监控可读和可写状态的变量,我们按照功能将它们分成以下四组:
rseq
和wseq
— 表示文件描述符被重用或者计时器被重置5;rg
和wg
— 表示二进制的信号量,可能为pdReady
、pdWait
、等待文件描述符可读或者可写的 Goroutine 以及nil
;rd
和wd
— 等待文件描述符可读或者可写的截止日期;rt
和wt
— 用于等待文件描述符的计时器;
除了上述八个变量之外,该结构体中还保存了用于保护数据的互斥锁、文件描述符。runtime.pollDesc
结构体会使用 link
字段串联成链表存储在 runtime.pollCache
中:
type pollCache struct {
lock mutex
first *pollDesc
}
runtime.pollCache
是运行时包中的全局变量,该结构体中包含一个用于保护轮询数据的互斥锁和链表头:
// 使用 sync.Once 来确保一个 listener 只持有一个 epoll 实例
var serverInit sync.Once
// netFD.init 会调用 poll.FD.Init 并最终调用到 pollDesc.init,
// 它会创建 epoll 实例并把 listener fd 加入监听队列
func (pd *pollDesc) init(fd *FD) error {
// runtime_pollServerInit 内部调用了 netpollinit 来创建 epoll 实例
serverInit.Do(runtime_pollServerInit)
// runtime_pollOpen 内部调用了 netpollopen 来将 listener fd 注册到
// epoll 实例中,另外,它会初始化一个 pollDesc 并返回
ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
if errno != 0 {
if ctx != 0 {
runtime_pollUnblock(ctx)
runtime_pollClose(ctx)
}
return errnoErr(syscall.Errno(errno))
}
// 把真正初始化完成的 pollDesc 实例赋值给当前的 pollDesc 代表自身的指针,
// 后续使用直接通过该指针操作
pd.runtimeCtx = ctx
return nil
}
netpollinit()
- Init epoll instance
因为文件 I/O、网络 I/O 以及计时器都依赖网络轮询器,所以 Go 语言会通过以下两条不同路径初始化网络轮询器:
internal/poll.pollDesc.init
— 通过net.netFD.init
和os.newFile
初始化网络 I/O 和文件 I/O 的轮询信息时;runtime.doaddtimer
— 向处理器中增加新的计时器时;
网络轮询器的初始化会使用 runtime.poll_runtime_pollServerInit
和 runtime.netpollGenericInit
两个函数:
func poll_runtime_pollServerInit() {
netpollGenericInit()
}
func netpollGenericInit() {
if atomic.Load(&netpollInited) == 0 {
lock(&netpollInitLock)
if netpollInited == 0 {
netpollinit()
atomic.Store(&netpollInited, 1)
}
unlock(&netpollInitLock)
}
}
runtime.netpollGenericInit
会调用平台上特定实现的 runtime.netpollinit
,即 Linux 上的 epoll,它主要做了以下几件事情:
- 是调用 epollcreate1 创建一个新的 epoll 文件描述符,这个文件描述符会在整个程序的生命周期中使用;
- 通过 runtime.nonblockingPipe 创建一个用于通信的管道;
- 使用 epollctl 将用于读取数据的文件描述符打包成 epollevent 事件加入监听;
// /usr/local/go/src/runtime/netpoll_epoll.go
var (
// 全局唯一的 epoll fd,只在 listener fd 初始化之时被指定一次
epfd int32 = -1 // epoll descriptor
)
// netpollinit 会创建一个 epoll 实例,然后把 epoll fd 赋值给 epfd,
// 后续 listener 以及它 accept 的所有 sockets 有关 epoll 的操作都是基于这个全局的 epfd
// called by netpollGenericInit
func netpollinit() {
epfd = epollcreate1(_EPOLL_CLOEXEC)
if epfd >= 0 {
return
}
epfd = epollcreate(1024)
if epfd >= 0 {
closeonexec(epfd)
return
}
println("runtime: epollcreate failed with", -epfd)
throw("runtime: netpollinit failed")
}
初始化网络轮询器,通过 sync.Once 和 netpollInited 变量保证函数只会调用一次;
初始化的管道为我们提供了中断多路复用等待文件描述符中事件的方法,runtime.netpollBreak
会向管道中写入数据唤醒 epoll
:
func netpollBreak() {
for {
var b byte
n := write(netpollBreakWr, unsafe.Pointer(&b), 1)
if n == 1 {
break
}
if n == -_EINTR {
continue
}
if n == -_EAGAIN {
return
}
}
}
因为目前的计时器由网络轮询器管理和触发,它能够让网络轮询器立刻返回并让运行时检查是否有需要触发的计时器。
netpollopen()
- Register fd
调用 internal/poll.pollDesc.init
初始化文件描述符时不止会初始化网络轮询器,还会通过 runtime.poll_runtime_pollOpen
重置轮询信息 runtime.pollDesc
并调用 runtime.netpollopen
初始化轮询事件:
// /usr/local/Cellar/go/1.16.6/libexec/src/runtime/netpoll.go
// call netpollopen
func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
pd := pollcache.alloc()
lock(&pd.lock)
if pd.wg != 0 && pd.wg != pdReady {
throw("runtime: blocked write on free polldesc")
}
if pd.rg != 0 && pd.rg != pdReady {
throw("runtime: blocked read on free polldesc")
}
pd.fd = fd
pd.closing = false
pd.everr = false
pd.rseq++
pd.rg = 0
pd.rd = 0
pd.wseq++
pd.wg = 0
pd.wd = 0
pd.self = pd
unlock(&pd.lock)
var errno int32
errno = netpollopen(fd, pd)
return pd, int(errno)
}
runtime.netpollopen
的实现非常简单,它会调用 epollctl 向全局的轮询文件描述符 epfd 中加入新的轮询事件监听文件描述符的可读和可写状态:
// netpollopen 会被 runtime_pollOpen 调用,注册 fd 到 epoll 实例,
// 同时会利用万能指针把 pollDesc 保存到 epollevent 的一个 8 位的字节数组 data 里
func netpollopen(fd uintptr, pd *pollDesc) int32 {
var ev epollevent
ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
*(**pollDesc)(unsafe.Pointer(&ev.data)) = pd
return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
}
netpoll()
轮询网络并返回一组已经准备就绪的 Goroutine,传入的参数会决定它的行为3;
- 如果参数小于 0,无限期等待文件描述符就绪;
- 如果参数等于 0,非阻塞地轮询网络;
- 如果参数大于 0,阻塞特定时间轮询网络;
当 I/O 事件发生之后,netpoll 是通过什么方式唤醒那些在 I/O wait 的 goroutine 的?答案是通过 epoll_wait
,在 Go 源码中的 src/runtime/netpoll_epoll.go
文件中有一个 func netpoll(block bool) gList
方法,它会内部调用 epoll_wait
获取就绪的 fd 列表,并将每个 fd 对应的 goroutine 添加到链表返回:
// /usr/local/go/src/runtime/netpoll_epoll.go
// netpoll checks for ready network connections.
// Returns list of goroutines that become runnable.
// delay < 0: blocks indefinitely
// delay == 0: does not block, just polls
// delay > 0: block for up to that many nanoseconds
func netpoll(delay int64) gList {
if epfd == -1 {
return gList{}
}
var waitms int32
if delay < 0 {
waitms = -1
} else if delay == 0 {
waitms = 0
} else if delay < 1e6 {
waitms = 1
} else if delay < 1e15 {
waitms = int32(delay / 1e6)
} else {
// An arbitrary cap on how long to wait for a timer.
// 1e9 ms == ~11.5 days.
waitms = 1e9
}
var events [128]epollevent
retry:
// 获取就绪的 fd 列表
n := epollwait(epfd, &events[0], int32(len(events)), waitms)
if n < 0 {
if n != -_EINTR {
println("runtime: epollwait on fd", epfd, "failed with", -n)
throw("runtime: netpoll failed")
}
// If a timed sleep was interrupted, just return to
// recalculate how long we should sleep now.
if waitms > 0 {
return gList{}
}
goto retry
}
// toRun 是一个 g 的链表,存储要恢复的 goroutines,最后返回给调用方
var toRun gList
for i := int32(0); i < n; i++ {
ev := &events[i]
if ev.events == 0 {
continue
}
if *(**uintptr)(unsafe.Pointer(&ev.data)) == &netpollBreakRd {
if ev.events != _EPOLLIN {
println("runtime: netpoll: break fd ready for", ev.events)
throw("runtime: netpoll: break fd ready for something unexpected")
}
if delay != 0 {
// netpollBreak could be picked up by a
// nonblocking poll. Only read the byte
// if blocking.
var tmp [16]byte
read(int32(netpollBreakRd), noescape(unsafe.Pointer(&tmp[0])), int32(len(tmp)))
}
continue
}
var mode int32
if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'r'
}
if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'w'
}
if mode != 0 {
pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
pd.everr = false
if ev.events == _EPOLLERR {
pd.everr = true
}
// 调用 netpollready,传入就绪 fd 的 pollDesc,把 fd 对应的 goroutine 添加到链表 toRun 中
netpollready(&toRun, pd, mode)
}
}
return toRun
}
// netpollready 调用 netpollunblock 返回就绪 fd 对应的 goroutine 的抽象数据结构 g
func netpollready(toRun *gList, pd *pollDesc, mode int32) {
var rg, wg *g
if mode == 'r' || mode == 'r'+'w' {
rg = netpollunblock(pd, 'r', true)
}
if mode == 'w' || mode == 'r'+'w' {
wg = netpollunblock(pd, 'w', true)
}
if rg != nil {
toRun.push(rg)
}
if wg != nil {
toRun.push(wg)
}
}
// netpollunblock 会依据传入的 mode 决定从 pollDesc 的 rg 或者 wg 取出当时 gopark 之时存入的
// goroutine 抽象数据结构 g 并返回
func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g {
// mode == 'r' 代表当时 gopark 是为了等待读事件,而 mode == 'w' 则代表是等待写事件
gpp := &pd.rg
if mode == 'w' {
gpp = &pd.wg
}
for {
// 取出 gpp 存储的 g
old := *gpp
if old == pdReady {
return nil
}
if old == 0 && !ioready {
// Only set READY for ioready. runtime_pollWait
// will check for timeout/cancel before waiting.
return nil
}
var new uintptr
if ioready {
new = pdReady
}
// 重置 pollDesc 的 rg 或者 wg
if atomic.Casuintptr(gpp, old, new) {
if old == pdReady || old == pdWait {
old = 0
}
// 通过万能指针还原成 g 并返回
return (*g)(unsafe.Pointer(old))
}
}
}
而 Go 在多种场景下都可能会调用 netpoll
检查文件描述符状态。寻找到 I/O 就绪的 socket fd,并找到这些 socket fd 对应的轮询器中附带的信息,根据这些信息将之前等待这些 socket fd 就绪的 goroutine 状态修改为 _Grunnable
。执行完 netpoll
之后,会返回一个就绪 fd 列表对应的 goroutine 列表,接下来将就绪的 goroutine 加入到调度队列中,等待调度运行。
首先,在 Go runtime scheduler 正常调度 goroutine 之时就有可能会调用 netpoll
获取到已就绪的 fd 对应的 goroutine 来调度执行:
// /usr/local/go/src/runtime/proc.go
// One round of scheduler: find a runnable goroutine and execute it.
// Never returns.
func schedule() {
// ...
if gp == nil {
gp, inheritTime = findrunnable() // blocks until work is available
}
// ...
}
// Finds a runnable goroutine to execute.
// Tries to steal from other P's, get g from global queue, poll network.
func findrunnable() (gp *g, inheritTime bool) {
// ...
// Poll network.
// This netpoll is only an optimization before we resort to stealing.
// We can safely skip it if there are no waiters or a thread is blocked
// in netpoll already. If there is any kind of logical race with that
// blocked thread (e.g. it has already returned from netpoll, but does
// not set lastpoll yet), this thread will do blocking netpoll below
// anyway.
if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {
if list := netpoll(false); !list.empty() { // non-blocking
gp := list.pop()
injectglist(&list)
casgstatus(gp, _Gwaiting, _Grunnable)
if trace.enabled {
traceGoUnpark(gp, 0)
}
return gp, false
}
}
// ...
}
Go scheduler 的核心方法 schedule
里会调用一个叫 findrunable()
的方法获取可运行的 goroutine 来执行,而在 findrunable()
方法里就调用了 netpoll
获取已就绪的 fd 列表对应的 goroutine 列表。
另外, sysmon
监控线程也可能会调用到 netpoll
:
// /usr/local/go/src/runtime/proc.go
// Always runs without a P, so write barriers are not allowed.
//
//go:nowritebarrierrec
func sysmon() {
// ...
now := nanotime()
if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now {
atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
// 以非阻塞的方式调用 netpoll 获取就绪 fd 列表
list := netpoll(false) // non-blocking - returns list of goroutines
if !list.empty() {
// Need to decrement number of idle locked M's
// (pretending that one more is running) before injectglist.
// Otherwise it can lead to the following situation:
// injectglist grabs all P's but before it starts M's to run the P's,
// another M returns from syscall, finishes running its G,
// observes that there is no work to do and no other running M's
// and reports deadlock.
incidlelocked(-1)
// 将其插入调度器的runnable列表中(全局),等待被调度执行
injectglist(&list)
incidlelocked(1)
}
}
// retake P's blocked in syscalls
// and preempt long running G's
if retake(now) != 0 {
idle = 0
} else {
idle++
}
// check if we need to force a GC
if t := (gcTrigger{kind: gcTriggerTime, now: now}); t.test() && atomic.Load(&forcegc.idle) != 0 {
lock(&forcegc.lock)
forcegc.idle = 0
var list gList
list.push(forcegc.g)
injectglist(&list)
unlock(&forcegc.lock)
}
if debug.schedtrace > 0 && lasttrace+int64(debug.schedtrace)*1000000 <= now {
lasttrace = now
schedtrace(debug.scheddetail > 0)
}
}
}
Go runtime 在程序启动的时候会创建一个独立的 M 作为监控线程,叫 sysmon
,这个线程为系统级的 daemon 线程,无需 P 即可运行, sysmon
每 20us~10ms 运行一次。 sysmon
中以轮询的方式执行以下操作(如上面的代码所示):
- 以非阻塞的方式调用
runtime.netpoll
,从中找出能从网络 I/O 中唤醒的 G,并调用injectglist
,将其插入调度器的 runnable 列表中(全局),调度触发时,有可能从这个全局 runnable 列表获取 G。然后再循环调用startm
,直到所有 P 都不处于_Pidle
状态。 - 调用
retake
,抢占长时间处于_Psyscall
状态的 P。
G 在运行过程中如果被阻塞在某个 system call 操作上,那么不光 G 会阻塞,执行该 G 的 M 也会解绑 P(实质是被 sysmon 抢走了),与 G 一起进入 sleep 状态。如果此时有 idle 的 M,则 P 与其绑定继续执行其他 G;如果没有 idle M,但仍然有其他 G 要去执行,那么就会创建一个新的 M。当阻塞在 system call 上的 G 完成 syscall 调用后,G 会去尝试获取一个可用的 P,如果没有可用的 P,那么 G 会被标记为 _Grunnable
并把它放入全局的 runqueue 中等待调度,之前的那个 sleep 的 M 将再次进入 sleep。
现在清楚为什么 netpoll 为什么一定要使用非阻塞 I/O 了吧?就是为了避免让操作网络 I/O 的 goroutine 陷入到系统调用从而进入内核态,因为一旦进入内核态,整个程序的控制权就会发生转移(到内核),不再属于用户进程了,那么也就无法借助于 Go 强大的 runtime scheduler 来调度业务程序的并发了;而有了 netpoll 之后,借助于非阻塞 I/O ,G 就再也不会因为系统调用的读写而陷入内核态,当 G 被阻塞在某个 network I/O 操作上时,实际上它不是因为陷入内核态被阻塞住了,而是被 Go runtime 调用 gopark 给 park 住了,此时 G 会被放置到某个 wait queue 中,而 M 会尝试运行下一个 _Grunnable
的 G,如果此时没有 _Grunnable
的 G 供 M 运行,那么 M 将解绑 P,并进入 sleep 状态。当 I/O available,在 wait queue 中的 G 会被唤醒,标记为 _Grunnable
,放入某个可用的 P 的 local 队列中,绑定一个 M 恢复执行。
TCP
一个典型的 Go TCP server:
package main
import (
"fmt"
"net"
)
func main() {
listen, err := net.Listen("tcp", ":8888")
if err != nil {
fmt.Println("listen error: ", err)
return
}
for {
conn, err := listen.Accept()
if err != nil {
fmt.Println("accept error: ", err)
break
}
// start a new goroutine to handle the new connection
go HandleConn(conn)
}
}
func HandleConn(conn net.Conn) {
defer conn.Close()
packet := make([]byte, 1024)
for {
// 如果没有可读数据,也就是读 buffer 为空,则阻塞
_, _ = conn.Read(packet)
// 同理,不可写则阻塞
_, _ = conn.Write(packet)
}
}
上面是一个基于 Go 原生网络模型(基于 netpoll)编写的一个 TCP server,模式是 goroutine-per-connection
,在这种模式下,开发者使用的是同步的模式去编写异步的逻辑而且对于开发者来说 I/O 是否阻塞是无感知的,也就是说开发者无需考虑 goroutines 甚至更底层的线程、进程的调度和上下文切换。而 Go netpoll 最底层的事件驱动技术肯定是基于 epoll/kqueue/iocp 这一类的 I/O 事件驱动技术,只不过是把这些调度和上下文切换的工作转移到了 runtime 的 Go scheduler,让它来负责调度 goroutines,从而极大地降低了程序员的心智负担!
use
socat TCP:localhost:8888 -
to debug dynamically
Accept
listener, err := net.ListenTCP("tcp", tcpAddress)
// If listener fails, we panic.
if err != nil {
panic(err)
}
defer listener.Close()
// TestAuthenticateUser()
for {
conn, err := listener.AcceptTCP()
if err != nil {
logger.Error("TCP Listener unable to accept connection: ", err)
continue
}
go handleConn(conn)
}
Go netpoll 通过在底层对 epoll/kqueue/iocp 的封装,从而实现了使用同步编程模式达到异步执行的效果。总结来说,所有的网络操作都以网络描述符 netFD 为中心实现。netFD 与底层 PollDesc 结构绑定,当在一个 netFD 上读写遇到 EAGAIN 错误时,就将当前 goroutine 存储到这个 netFD 对应的 PollDesc 中,同时调用 gopark 把当前 goroutine 给 park 住,直到这个 netFD 上再次发生读写事件,才将此 goroutine 给 ready 激活重新运行。显然,在底层通知 goroutine 再次发生读写等事件的方式就是 epoll/kqueue/iocp 等事件驱动机制。
net.Listen("tcp", ":8888")
方法返回了一个 *TCPListener,它是一个实现了 net.Listener
接口的 struct。
通过 listener.Accept()
接收的新连接 *TCPConn 则是一个实现了 net.Conn
接口的 struct,它内嵌了 net.conn
struct。
- 不管是 Listener 的 Accept 还是 Conn 的 Read/Write 方法,都是基于一个
netFD
的数据结构的操作 netFD
是一个网络描述符,类似于 Linux 的文件描述符的概念,netFD 中包含一个 poll.FD 数据结构- 而 poll.FD 中包含两个重要的数据结构 Sysfd 和 pollDesc,前者是真正的系统文件描述符,后者对是底层事件驱动的封装,所有的读写超时等操作都是通过调用后者的对应方法实现的
// /usr/local/go/src/net/tcpsock.go
// AcceptTCP accepts the next incoming call and returns the new
// connection.
func (l *TCPListener) AcceptTCP() (*TCPConn, error) {
if !l.ok() {
return nil, syscall.EINVAL
}
c, err := l.accept()
if err != nil {
return nil, &OpError{Op: "accept", Net: l.fd.net, Source: nil, Addr: l.fd.laddr, Err: err}
}
return c, nil
}
// /usr/local/go/src/net/tcpsock_posix.go
func (ln *TCPListener) accept() (*TCPConn, error) {
fd, err := ln.fd.accept()
if err != nil {
return nil, err
}
tc := newTCPConn(fd)
if ln.lc.KeepAlive >= 0 {
setKeepAlive(fd, true)
ka := ln.lc.KeepAlive
if ln.lc.KeepAlive == 0 {
ka = defaultTCPKeepAlive
}
setKeepAlivePeriod(fd, ka)
}
return tc, nil
}
// /usr/local/go/src/net/fd_unix.go
// Network file descriptor.
type netFD struct {
pfd poll.FD
...
}
// /usr/local/go/src/net/fd_unix.go
func (fd *netFD) accept() (netfd *netFD, err error) {
d, rsa, errcall, err := fd.pfd.Accept()
if err != nil {
if errcall != "" {
err = wrapSyscallError(errcall, err)
}
return nil, err
}
if netfd, err = newFD(d, fd.family, fd.sotype, fd.net); err != nil {
poll.CloseFunc(d)
return nil, err
}
if err = netfd.init(); err != nil {
netfd.Close()
return nil, err
}
lsa, _ := syscall.Getsockname(netfd.pfd.Sysfd)
netfd.setAddr(netfd.addrFunc()(lsa), netfd.addrFunc()(rsa))
return netfd, nil
}
Get a socket
netpoll
accept socket 的工作流程如下:
- 服务端的 netFD 在
listen
时会创建 epoll instance,并将 listenerFD 加入 epoll 的事件队列 - netFD 在
accept
时将返回的 connFD 也加入 epoll 的事件队列 - netFD 在读写时出现
syscall.EAGAIN
错误,通过 pollDesc 的waitRead
方法将当前的 goroutine park 住,直到 ready,从 pollDesc 的waitRead
中返回
Listener.Accept()
接收来自客户端的新连接通过:调用 netFD.accept
-> 调用 pfd.Accept -> 使用 Linux 的系统调用 accept
来完成新连接的接收,并且会把 accept 的 socket 设置成非阻塞 I/O 模式
// /usr/local/go/src/internal/poll/fd_unix.go
// FD is a file descriptor. The net and os packages use this type as a
// field of a larger type representing a network connection or OS file.
type FD struct {
...
// System file descriptor. Immutable until Close.
Sysfd int
// I/O poller.
pd pollDesc
...
}
// Accept wraps the accept network call.
func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {
...
for {
// 使用 linux 系统调用 accept 接收新连接,创建对应的 socket
s, rsa, errcall, err := accept(fd.Sysfd)
// 因为 listener fd 在创建的时候已经设置成非阻塞的了,
// 所以 accept 方法会直接返回,不管有没有新连接到来;如果 err == nil 则表示正常建立新连接,直接返回;
// 如果 err == syscall.EAGAIN,表示当前通过epoll的方式等待 new socket(当 new socket到来,该G会被唤醒并继续执行)
if err == nil {
return s, rsa, "", err
}
switch err {
// 如果 err != nil,则判断 err 是否 == syscall.EAGAIN,符合条件则进入 pollDesc.waitRead 方法
case syscall.EAGAIN:
if fd.pd.pollable() {
// 如果当前没有发生期待的 I/O 事件,那么 waitRead 会通过 park goroutine 让逻辑 block 在这里(该G进入sleep)
if err = fd.pd.waitRead(fd.isFile); err == nil {
continue
}
}
case syscall.ECONNABORTED:
// This means that a socket on the listen
// queue was closed before we Accept()ed it;
// it's a silly error, so try again.
continue
}
return -1, nil, errcall, err
}
}
// /usr/local/go/src/internal/poll/sys_cloexec.go
// Wrapper around the accept system call that marks the returned file
// descriptor as nonblocking and close-on-exec.
func accept(s int) (int, syscall.Sockaddr, string, error) {
// See ../syscall/exec_unix.go for description of ForkLock.
// It is probably okay to hold the lock across syscall.Accept
// because we have put fd.sysfd into non-blocking mode.
// However, a call to the File method will put it back into
// blocking mode. We can't take that risk, so no use of ForkLock here.
ns, sa, err := AcceptFunc(s)
if err == nil {
syscall.CloseOnExec(ns)
}
if err != nil {
return -1, nil, "accept", err
}
if err = syscall.SetNonblock(ns, true); err != nil {
CloseFunc(ns)
return -1, nil, "setnonblock", err
}
return ns, sa, "", nil
}
pollDesc.wait
内部调用了 runtime_pollWait
来达成无 I/O 事件时 park 住 goroutine 的目的:
// /usr/local/go/src/runtime/netpoll.go
// Network poller descriptor.
//
// No heap pointers.
//
//go:notinheap
type pollDesc struct {
link *pollDesc // in pollcache, protected by pollcache.lock
// The lock protects pollOpen, pollSetDeadline, pollUnblock and deadlineimpl operations.
// This fully covers seq, rt and wt variables. fd is constant throughout the PollDesc lifetime.
// pollReset, pollWait, pollWaitCanceled and runtime·netpollready (IO readiness notification)
// proceed w/o taking the lock. So closing, everr, rg, rd, wg and wd are manipulated
// in a lock-free way by all operations.
// NOTE(dvyukov): the following code uses uintptr to store *g (rg/wg),
// that will blow up when GC starts moving objects.
...
}
// /usr/local/go/src/internal/poll/fd_poll_runtime.go
// waitRead 检测当前这个 pollDesc 的上层 netFD 对应的 fd 是否有『期待的』I/O 事件发生,如果有就直接返回,否则就 park 住当前的 goroutine 并持续等待直至对应的 fd 上发生可读/可写或者其他『期待的』I/O 事件为止
func (pd *pollDesc) waitRead(isFile bool) error {
return pd.wait('r', isFile)
}
func (pd *pollDesc) wait(mode int, isFile bool) error {
if pd.runtimeCtx == 0 {
return errors.New("waiting for unsupported file type")
}
// the G will be blocked here until a new socket comes (and then this G becomes runnable from the sleep status)
res := runtime_pollWait(pd.runtimeCtx, mode)
return convertErr(res, isFile)
}
// /usr/local/go/src/runtime/netpoll.go
func poll_runtime_pollWait(pd *pollDesc, mode int) int {
...
// Puts the current goroutine into a sleep state until I/O is ready
for !netpollblock(pd, int32(mode), false) {
err = netpollcheckerr(pd, int32(mode))
if err != 0 {
return err
}
...
}
return 0
}
// returns true if IO is ready, or false if timedout or closed
// waitio - wait only for completed IO, ignore errors
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
gpp := &pd.rg
if mode == 'w' {
gpp = &pd.wg
}
// set the gpp semaphore to WAIT
for {
old := *gpp
if old == pdReady {
*gpp = 0
return true
}
if old != 0 {
throw("runtime: double wait")
}
if atomic.Casuintptr(gpp, 0, pdWait) {
break
}
}
// waitio 此时是 false,netpollcheckerr 方法会检查当前 pollDesc 对应的 fd 是否是正常的,
// 通常来说 netpollcheckerr(pd, mode) == 0 是成立的,所以这里会执行 gopark
// 把当前 goroutine 给 park 住,直至对应的 fd 上发生可读/可写或者其他『期待的』I/O 事件为止,
// 然后 unpark 返回,在 gopark 内部会把当前 goroutine 的抽象数据结构 g 存入
// gpp(pollDesc.rg/pollDesc.wg) 指针里,以便在后面的 netpoll 函数取出 pollDesc 之后,
// 把 g 添加到链表里返回,接着重新调度 goroutine
if waitio || netpollcheckerr(pd, mode) == 0 {
// 注册 netpollblockcommit 回调给 gopark,在 gopark 内部会执行它,保存当前 goroutine 到 gpp
// the G will be blocked here until it gets a P associated a M
gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
}
// be careful to not lose concurrent READY notification
old := atomic.Xchguintptr(gpp, 0)
if old > pdWait {
throw("runtime: corrupted polldesc")
}
return old == pdReady
}
// /usr/local/go/src/runtime/proc.go
// Puts the current goroutine into a waiting state and calls unlockf.
// If unlockf returns false, the goroutine is resumed.
// unlockf must not access this G's stack, as it may be moved between
// the call to gopark and the call to unlockf.
// gopark 会停住当前的 goroutine 并且调用传递进来的回调函数 unlockf,从上面的源码我们可以知道这个函数是
// netpollblockcommit
func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) {
if reason != waitReasonSleep {
checkTimeouts() // timeouts may expire while two goroutines keep the scheduler busy
}
mp := acquirem()
gp := mp.curg
status := readgstatus(gp)
if status != _Grunning && status != _Gscanrunning {
throw("gopark: bad g status")
}
mp.waitlock = lock
mp.waitunlockf = unlockf
gp.waitreason = reason
mp.waittraceev = traceEv
mp.waittraceskip = traceskip
releasem(mp)
// can't do anything that might move the G between Ms here.
// gopark 最终会调用 park_m,在这个函数内部会调用 unlockf,也就是 netpollblockcommit,
// 然后会把当前的 goroutine,也就是 g 数据结构保存到 pollDesc 的 rg 或者 wg 指针里
mcall(park_m)
}
// park continuation on g0.
func park_m(gp *g) {
_g_ := getg()
if trace.enabled {
traceGoPark(_g_.m.waittraceev, _g_.m.waittraceskip)
}
casgstatus(gp, _Grunning, _Gwaiting)
dropg()
if fn := _g_.m.waitunlockf; fn != nil {
// 调用 netpollblockcommit,把当前的 goroutine,
// 也就是 g 数据结构保存到 pollDesc 的 rg 或者 wg 指针里
ok := fn(gp, _g_.m.waitlock)
_g_.m.waitunlockf = nil
_g_.m.waitlock = nil
if !ok {
if trace.enabled {
traceGoUnpark(gp, 2)
}
casgstatus(gp, _Gwaiting, _Grunnable)
execute(gp, true) // Schedule it back, never returns.
}
}
schedule()
}
Create a Runtime netFD
poll.FD.Accept() 返回之后,会构造一个对应这个新 socket 的 netFD:
func newFD(sysfd, family, sotype int, net string) (*netFD, error) {
ret := &netFD{
pfd: poll.FD{
Sysfd: sysfd,
IsStream: sotype == syscall.SOCK_STREAM,
ZeroReadIsEOF: sotype != syscall.SOCK_DGRAM && sotype != syscall.SOCK_RAW,
},
family: family,
sotype: sotype,
net: net,
}
return ret, nil
}
然后调用 init() 方法完成初始化,这个 init 过程和前面 net.Listen()
是一样的,调用链:netFD.init()
–> poll.FD.Init()
–> poll.pollDesc.init()
:
// /usr/local/go/src/net/fd_unix.go
func (fd *netFD) init() error {
return fd.pfd.Init(fd.net, true)
}
// /usr/local/go/src/internal/poll/fd_unix.go
// Init initializes the FD. The Sysfd field should already be set.
// This can be called multiple times on a single FD.
// The net argument is a network name from the net package (e.g., "tcp"),
// or "file".
// Set pollable to true if fd should be managed by runtime netpoll.
func (fd *FD) Init(net string, pollable bool) error {
// We don't actually care about the various network types.
if net == "file" {
fd.isFile = true
}
if !pollable {
fd.isBlocking = 1
return nil
}
err := fd.pd.init(fd)
if err != nil {
// If we could not initialize the runtime poller,
// assume we are using blocking mode.
fd.isBlocking = 1
}
return err
}
// /usr/local/go/src/internal/poll/fd_poll_runtime.go
// 使用 sync.Once 来确保一个 listener 只持有一个 epoll 实例
var serverInit sync.Once
func (pd *pollDesc) init(fd *FD) error {
// runtime_pollServerInit 内部调用了 netpollinit 来创建 epoll instance
serverInit.Do(runtime_pollServerInit)
// runtime_pollOpen 内部调用了 netpollopen 来将 accept 的 fd 注册到
// epoll 实例中,另外,它会初始化一个 pollDesc 并返回
ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
if errno != 0 {
if ctx != 0 {
runtime_pollUnblock(ctx)
runtime_pollClose(ctx)
}
return errnoErr(syscall.Errno(errno))
}
pd.runtimeCtx = ctx
return nil
}
然后把这个 socket fd 注册到 listener 的 epoll 实例的事件队列中去,等待 I/O 事件。
Write/Read
和 Listener.Accept
是类似,即先调用 conn 的 netFD.Read
,然后内部再调用 poll.FD.Read
,最后使用 Linux 的系统调用 read(syscall.Read
)完成数据读取:
// /usr/local/go/src/net/net.go
// Read implements the Conn Read method.
func (c *conn) Read(b []byte) (int, error) {
if !c.ok() {
return 0, syscall.EINVAL
}
n, err := c.fd.Read(b)
if err != nil && err != io.EOF {
err = &OpError{Op: "read", Net: c.fd.net, Source: c.fd.laddr, Addr: c.fd.raddr, Err: err}
}
return n, err
}
// /usr/local/go/src/net/fd_unix.go
func (fd *netFD) Read(p []byte) (n int, err error) {
n, err = fd.pfd.Read(p)
runtime.KeepAlive(fd)
return n, wrapSyscallError("read", err)
}
// /usr/local/go/src/internal/poll/fd_unix.go
// Read implements io.Reader.
func (fd *FD) Read(p []byte) (int, error) {
...
for {
// 尝试从该 socket 读取数据,因为 socket 在被 listener accept 的时候设置成
// 了非阻塞 I/O,所以这里同样也是直接返回,不管有没有可读的数据
// 如果没可读的数据,err == EAGAIN (35)
n, err := syscall.Read(fd.Sysfd, p)
if err != nil {
n = 0
// err == syscall.EAGAIN 表示当前没有期待的 I/O 事件发生,也就是 socket 不可读
if err == syscall.EAGAIN && fd.pd.pollable() {
// 如果当前没有发生期待的 I/O 事件,那么 waitRead
// 会通过 park goroutine 让逻辑 block 在这里
if err = fd.pd.waitRead(fd.isFile); err == nil {
continue
}
}
...
}
err = fd.eofError(n, err)
return n, err
}
}
// /usr/local/go/src/syscall/syscall_unix.go
func Read(fd int, p []byte) (n int, err error) {
n, err = read(fd, p)
if race.Enabled {
if n > 0 {
race.WriteRange(unsafe.Pointer(&p[0]), n)
}
if err == nil {
race.Acquire(unsafe.Pointer(&ioSync))
}
}
if msanenabled && n > 0 {
msanWrite(unsafe.Pointer(&p[0]), n)
}
return
}
waitRead 最终会调用 runtime_pollWait
,这和 Accept
的情况是一模一样的,即通过pollDesc.waitRead
来park 当前G,直到I/O事件发生才返回,
// /usr/local/go/src/internal/poll/fd_poll_runtime.go
func (pd *pollDesc) waitRead(isFile bool) error {
return pd.wait('r', isFile)
}
func (pd *pollDesc) wait(mode int, isFile bool) error {
...
// the G will be blocked here until a new socket comes (and then this G becomes runnable from the sleep status)
res := runtime_pollWait(pd.runtimeCtx, mode)
return convertErr(res, isFile)
}
conn.Write
和 conn.Read
的原理是一致的,它也是通过类似 pollDesc.waitRead
的 pollDesc.waitWrite
来 park 住 goroutine 直至期待的 I/O 事件发生才返回,而 pollDesc.waitWrite
的内部实现原理和 pollDesc.waitRead
是一样的,都是基于 runtime_pollWait
,这里就不再赘述。
Go netpoll 的问题
Go netpoll 的设计不可谓不精巧、性能也不可谓不高,配合 goroutine 开发网络应用的时候就一个字:爽。因此 Go 的网络编程模式是及其简洁高效的。然而,没有任何一种设计和架构是完美的, goroutine-per-connection
这种模式虽然简单高效,但是在某些极端的场景下也会暴露出问题:goroutine 虽然非常轻量,它的自定义栈内存初始值仅为 2KB,后面按需扩容;海量连接的业务场景下, goroutine-per-connection
,此时 goroutine 数量以及消耗的资源就会呈线性趋势暴涨,首先给 Go runtime scheduler 造成极大的压力和侵占系统资源,然后资源被侵占又反过来影响 runtime 的调度,导致性能大幅下降;此外,我们通过源码可以知道,Go netpoll 会通过 sync.Once
确保只初始化一个 epoll 实例,也就是说它是 single event-loop 模式,接受新连接和处理 I/O 事件是全部放在一个 thread 里的,所以在海量连接同时又高频创建和销毁连接的业务场景下有可能会导致性能瓶颈。