蛮荆

Go netpoll Code Reading

2023-06-12

概述

下面是一个基础的服务器网络程序,主要包含如下功能:

  • 监听 TCP 连接,绑定 8888 端口
  • 收到新的客户端连接后,启动一个新的 goroutine 进行处理
  • 收到客户端的数据后,不做任何处理,原样返回
package main

import (
	"log"
	"net"
)

func main() {
	// 初始化监听
	listener, err := net.ListenTCP("tcp", &net.TCPAddr{
		IP:   []byte("127.0.0.1"),
		Port: 8888,
	})
	if err != nil {
		panic(err)
	}

	for {
		// 接收请求
		conn, err := listener.Accept()
		if err != nil {
			panic(err)
		}

		// 启动 1 个 goroutine 处理请求
		go handle(conn)
	}
}

// 处理客户端连接请求
func handle(conn net.Conn) {
	defer func() {
		_ = conn.Close()
	}()

	buf := make([]byte, 1024)
	for {
		// 接收数据
		n, err := conn.Read(buf[:])
		if err != nil {
			log.Printf("conn Read %v", err)
			break
		}
		// 如果接收到了数据,原样返回
		if n > 0 {
			// 发送数据
			_, err = conn.Write(buf)
		}
	}
}

上述代码采用了类似 同步模型 代码的方式实现了功能,但是这种方式真的可以支撑高性能网络编程吗?答案就在隐藏在同步模型后面的底层系统调用和网络轮询器。

前置知识复习

在正式开始研究源代码之前,先来复习两个基础知识点。

1. 多路复用接口

I/O 多路复用于处理同一个事件循环中的多个 I/O 事件,这里的「多路」指多个 IO 事件,「复用」指处理事件的程序 (线程) 是同一个。

Go 网络标准库和一般的 接口 约束形式不同,并没有明确给出具体的 多路复用接口,但是不同平台上面都实现了如下几个方法:

// 初始化网络轮询器 
func netpollinit() {}

// 检测网络文件描述符是否被网络轮询器使用
func netpollIsPollDescriptor(fd uintptr) bool {}

// 创建监听事件并监听网络文件描述符
func netpollopen(fd uintptr, pd *pollDesc) int32 {}

// 删除网络文件描述符
func netpollclose(fd uintptr) int32 {}

// 检测网络轮询器并返回已经就绪的 goroutine 列表
func netpoll(delay int64) gList {}

// 唤醒网络轮询器
func netpollBreak() {}

例如 Linux 实现直接复用了底层 epoll 的相关方法, 方法定义在 $GOROOT/src/runtime/netpoll_epoll.go 文件中, MacOS 实现直接复用了底层 kqueue, 方法定义在 $GOROOT/src/runtime/netpoll_kqueue.go 文件中,其他平台以此类推。

最后,编译器利用条件编译规则,根据不同的平台编译对应的代码,例如 Linux 直接编译 $GOROOT/src/runtime/netpoll_epoll.go 文件。

2. epoll API

epoll 是 Linux 系统提供的一种 I/O 多路复用机制,它可以同时监听多个文件描述符的 I/O 事件,当其中任意一个文件描述符发生 I/O 事件时,就会触发相应的回调函数。 与传统的 select 和 poll 模型相比,epoll 的性能更好,具有更高的可扩展性和更好的业务逻辑处理能力。

epoll 的三个核心 API 如下:

  1. epoll_create: 创建一个新的 epoll 实例,返回一个 epoll 文件描述符,该文件描述符可用于 epoll_ctlepoll_wait 函数调用
  2. epoll_ctl : 管理 epoll 实例中的所有文件描述符 (内部使用红黑树数据结构进行管理),可以注册、修改或删除要监听的文件描述符,设置相应的事件类型和回调函数
  3. epoll_wait : 等待任意文件描述符监听的事件发生,当有事件触发时,函数返回一个非零值,并将所有到达的事件按顺序存入队列 (数组) 中

内部实现

结合文章开头的示例代码,接下来我们一起探究 网络轮询器 的内部实现,相关文件目录为 $GOROOT/src/runtime,笔者的 Go 版本为 go1.19 linux/amd64

本文着重分析一下 netpoll 的数据结构以及 IO 读写流程中涉及到的一些底层方法。

文件描述符数据结构

文件描述符

FD 对象表示最基础的文件描述符抽象,net 和 os 包使用该类型来表示网络连接或操作系统文件。

type FD struct {
	// 对 Sysfd 加锁,串行化 Read 和 Write
	fdmu fdMutex

	// 操作系统的文件描述符
	Sysfd int

	// 网络轮询 IO 描述符
	pd pollDesc

	// 描述符关闭信号
	csema uint32

	// 是否阻塞模式
	isBlocking uint32

	// 区分当前描述符是一个 stream, 还是一个基于包的描述符 (区分 TCP/UDP)
	// 不可变
	IsStream bool

	// 读取零字节是否表示 EOF
	ZeroReadIsEOF bool

	// 区分当前描述符是一个文件,还是一个 socket
	isFile bool
}

文件描述符初始化方法如下:

func (fd *FD) Init(net string, pollable bool) error {
    ...
	err := fd.pd.init(fd)
    ...
	return err
}

网络文件描述符

netFD 对象表示网络文件描述符。

type netFD struct {
	pfd poll.FD // 包装了一个 FD 结构体

	// 下列字段在 Close 之前不可变
	family      int
	sotype      int
	isConnected bool
	net         string
	laddr       Addr
	raddr       Addr
}

newFD 方法实例化一个 netFD 对象,并返回该对象的指针。

func newFD(sysfd, family, sotype int, net string) (*netFD, error) {
	ret := &netFD{
		...
	}
	return ret, nil
}

网络文件描述符方法如下:

func (fd *netFD) init() error {
	return fd.pfd.Init(fd.net, true)
}

网络轮询 IO 文件描述符

pollDesc 对象表示网络轮询 IO 文件描述符,主要用于被 Go 的网络轮询器监听状态变化,是网络底层实现中的核心对象。

这里有一个需要学习的知识点: rg 字段和 wg 字段的数据类型都是 atomic.Uintptr, 而且可以用来表示 4 种数据:

  1. pdReady 信号
  2. pdWait 信号
  3. goroutine
  4. nil
// pollDesc 包含两种信号量,rg 和 wg, 可以表示多种状态
// Tips: 通过将字段设置为 atomic.Uintptr 类型 (效果和 guintptr 类似),可以支持多种类型表示

// 几种信号量状态:
// pdReady - IO 准备就绪
// pdWait - goroutine 准备休眠  
// G pointer - goroutine 阻塞
const (
    pdReady uintptr = 1
    pdWait  uintptr = 2
)

type pollDesc struct {
	link *pollDesc // 链表结构 (后面的元素) 指针
	fd   uintptr   
	
	atomicInfo atomic.Uint32
	
	rg atomic.Uintptr // 表示信号量,可能为 pdReady、pdWait、等待文件描述符可读的 goroutine 或者 nil
	wg atomic.Uintptr // 表示信号量,可能为 pdReady、pdWait、等待文件描述符可写的 goroutine 或者 nil

	lock    mutex     // 保护下面的字段
	closing bool
	rseq    uintptr   // 表示文件描述符被重用或者计时器被重置
	rt      timer     // 可读截至时间计时器
	rd      int64     // 等待文件描述符可读截至时间,-1 表示过期 (goroutine 被唤醒)
	wseq    uintptr   // 表示文件描述符被重用或者计时器被重置
	wt      timer     // 可写截至时间计时器
	wd      int64     // 等待文件描述符可写截至时间,-1 表示过期 (goroutine 被唤醒)
}

轮询文件描述符管理

pollCache 对象用来管理网络 IO 文件描述符,内置了一个互斥锁字段和一个 pollDesc 对象链表。

type pollCache struct {
	lock  mutex
	// 指向一个 pollDesc 链表
	first *pollDesc
}

数据结构图

网络文件描述符

Listen 流程

TCP 监听流程图

Listener 接口

type Listener interface {
	// 返回一个实现了 Conn 接口的连接实例
	Accept() (Conn, error)
	
	Close() error
	
	Addr() Addr
}

TCP 监听对象

type TCPListener struct {
    // 包装了一个 netFD 对象
	fd *netFD   
	lc ListenConfig
}

TCP 监听

ListenTCP 方法返回一个 TCP 监听对象的指针。

func ListenTCP(network string, laddr *TCPAddr) (*TCPListener, error) {
	...
	
	sl := &sysListener{network: network, address: laddr.String()}
	ln, err := sl.listenTCP(context.Background(), laddr)
	
	...

	return ln, nil
}

func (sl *sysListener) listenTCP(ctx context.Context, laddr *TCPAddr) (*TCPListener, error) {
	fd, err := internetSocket(ctx, sl.network, laddr, nil, syscall.SOCK_STREAM, 0, "listen", sl.ListenConfig.Control)
    
	...
	
	return &TCPListener{fd: fd, lc: sl.ListenConfig}, nil
}

func internetSocket(ctx context.Context, net string, laddr, raddr sockaddr, sotype, proto int, mode string, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) {
	...
	
	family, ipv6only := favoriteAddrFamily(net, laddr, raddr, mode)
	return socket(ctx, net, family, sotype, proto, ipv6only, laddr, raddr, ctrlFn)
}

获取系统配置

listenerBacklog 方法缓存了系统全连接队列配置参数值,内部通过内嵌 sync.Once 的方式,保证了仅调用一次 maxListenerBacklog 方法。

func listenerBacklog() int {
	listenerBacklogCache.Do(func() { listenerBacklogCache.val = maxListenerBacklog() })
	return listenerBacklogCache.val
}

maxListenerBacklog 方法用于获取系统全连接队列配置参数值。

// Linux 读取配置文件
func maxListenerBacklog() int {
	fd, err := open("/proc/sys/net/core/somaxconn")
	...
	n, _, ok := dtoi(f[0])
	return n
}

创建 socket

socket 方法返回一个网络文件描述符,该描述符使用网络轮询器异步接收数据。

func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) {
    // 使用系统调用创建一个 socket 文件描述符
	// 并将 socket 文件描述符包装为 netFD 对象 
	s, err := sysSocket(family, sotype, proto)
	if fd, err = newFD(s, family, sotype, net); err != nil {
        poll.CloseFunc(s)
        return nil, err
    }
	
	...

	if laddr != nil && raddr == nil {
		switch sotype {
		// 基于流: TCP
		case syscall.SOCK_STREAM, syscall.SOCK_SEQPACKET:
			// 1. 获取系统配置
			// 2. 绑定并监听端口
		    if err := fd.listenStream(laddr, listenerBacklog(), ctrlFn); err != nil {
				
            }
            ...
        // 基于数据报: UDP
		case syscall.SOCK_DGRAM:
            ...
		}
	}

	return fd, nil
}

绑定并监听端口

listenStream 方法内部实现了 TCP 的绑定端口和监听端口,并完成了 epoll 的初始化工作。

func (fd *netFD) listenStream(laddr sockaddr, backlog int, ctrlFn func(string, string, syscall.RawConn) error) error {
	...
	
	var lsa syscall.Sockaddr
	if lsa, err = laddr.sockaddr(fd.family); err != nil {
		return err
	}

	...
	
	// 绑定端口由系统调用实现
	if err = syscall.Bind(fd.pfd.Sysfd, lsa); err != nil {
		return os.NewSyscallError("bind", err)
	}
	// 监听端口由系统调用实现
	if err = listenFunc(fd.pfd.Sysfd, backlog); err != nil {
		return os.NewSyscallError("listen", err)
	}
	// 初始化 epoll
    if err = fd.init(); err != nil {
        return err
    }
	
	...
	
	return nil
}

epoll 初始化

serverInit 类型是 sync.Once 的类型别名,保证了 poll_runtime_pollServerInit 方法只会被调用一次 (也就是单个进程全局只有一个 epoll 实例,避免惊群效应)。

func (pd *pollDesc) init(fd *FD) error {
	// runtime_pollServerInit 通过链接器指向了 poll_runtime_pollServerInit
	// 初始化 epoll
	serverInit.Do(runtime_pollServerInit)
	
	// runtime_pollOpen 通过链接器指向了 poll_runtime_pollOpen
	// 将文件描述符加入 epoll 监听
	ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
	....
	return nil
}
func poll_runtime_pollServerInit() {
	netpollGenericInit()
}

func netpollGenericInit() {
	if atomic.Load(&netpollInited) == 0 {
        ...
		if netpollInited == 0 {
			netpollinit()
		}
		...
	}
}

netpollinit 方法实现了 多路复用接口,主要用于网络轮询器 epoll 具体的初始化工作,和上面的 netpollGenericInit 方法一样,该方法也只会被调用一次。

var (
    // epoll 全局对象 (也是一个文件描述符)
	// 相当于调用 epoll_create 函数返回的对象
	// 后续的 epoll_ctl, epoll_wait 函数都是基于这个对象操作的
    epfd int32 = -1

    // 数据读写管道
    netpollBreakRd, netpollBreakWr uintptr

    // 标识变量,避免重复调用 netpollBreak 方法
    netpollWakeSig uint32
)

func netpollinit() {
	// 创建 epoll 描述符,赋值到全局变量 epfd
	epfd = epollcreate1(_EPOLL_CLOEXEC)
	if epfd < 0 {
		epfd = epollcreate(1024)
		...
	}
	
	// 创建一个通信管道
	r, w, errno := nonblockingPipe()
	
    ...

	// 将用于读取数据的文件描述符转换为 epollevent 结构,进行监听
	ev := epollevent{
		events: _EPOLLIN,
	}
	*(**uintptr)(unsafe.Pointer(&ev.data)) = &netpollBreakRd
	
	errno = epollctl(epfd, _EPOLL_CTL_ADD, r, &ev)

	...
	
	netpollBreakRd = uintptr(r)
	netpollBreakWr = uintptr(w)
}

netpollopen 方法实现了 多路复用接口,将新的文件描述符和监听事件加入到全局变量 epfd 表示的网络轮询文件描述符。

func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
    ...
    errno := netpollopen(fd, pd)
    ...
}

func netpollopen(fd uintptr, pd *pollDesc) int32 {
	var ev epollevent
	ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
	*(**pollDesc)(unsafe.Pointer(&ev.data)) = pd
	return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
}

小结

ListenTCP 方法内部实现了创建 socket,绑定端口,监听端口三个操作,相对于传统的 C 系列语言编程,将初始化过程简化为一个方法 API, 当方法执行完成后,epoll 也已经完成初始化工作,进入轮询状态等待连接到来以及 IO 事件。

ListenTCP

接收 TCP 连接流程

TCP 连接对象

type TCPConn struct {
	conn
}

type conn struct {
    fd *netFD
}

Conn 接口

Conn 表示通用的面向流的网络连接。

type Conn interface {
	Read(b []byte) (n int, err error)
	
	Write(b []byte) (n int, err error)

	Close() error
	
	LocalAddr() Addr

	RemoteAddr() Addr
	
	SetDeadline(t time.Time) error
	
	SetReadDeadline(t time.Time) error
	
	SetWriteDeadline(t time.Time) error
}

接收 TCP 连接

TCPListener (TCP 监听对象) 的 Accept 方法返回一个 TCP 连接对象。

func (l *TCPListener) Accept() (Conn, error) {
    ...
	c, err := l.accept()
	...
	return c, nil
}

func (ln *TCPListener) accept() (*TCPConn, error) {
	fd, err := ln.fd.accept()
    ...	
	tc := newTCPConn(fd)
	...
	return tc, nil
}

func (fd *netFD) accept() (netfd *netFD, err error) {
	d, rsa, errcall, err := fd.pfd.Accept()
    ...
	return netfd, nil
}

FD.Accept 方法内部不断轮询调用 accept 方法获取 TCP 连接并处理相应的错误。

func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {
    ...
	
	for {
		// 轮询调用 accept 方法获取 TCP 连接
		s, rsa, errcall, err := accept(fd.Sysfd)
		if err == nil {
			return s, rsa, "", err
		}
		switch err {
            ...
		}
		return -1, nil, errcall, err
	}
}

accept 方法内部封装了一层 系统调用 accept,返回一个非阻塞的文件描述符。

func accept(s int) (int, syscall.Sockaddr, string, error) {
	// 先尝试 accept4 调用,如果报错了,改用 accept
	// nonblock: 设置为非阻塞模式 
	
	// accept4 通过 1 次系统调用完成 accept 和设置 nonblock 两个操作
	ns, sa, err := Accept4Func(s, syscall.SOCK_NONBLOCK|syscall.SOCK_CLOEXEC)
	
	switch err {
	case nil:
		return ns, sa, "", nil
		
        ...
	}

    // accept 通过 2 次系统调用完成 accept 和设置 nonblock 两个操作 
	ns, sa, err = AcceptFunc(s)
	
    ...
	
	if err = syscall.SetNonblock(ns, true); err != nil {
		...
	}
	return ns, sa, "", nil
}

newTCPConn 方法返回一个包装好的 TCP 连接对象。

func newTCPConn(fd *netFD) *TCPConn {
	c := &TCPConn{conn{fd}}
	setNoDelay(c.fd, true)
	return c
}

接收 TCP 连接流程图

TCPAccept

数据接收和发送

接收方法

接收数据的对象是具体的 TCP 连接,所以从 conn.Read 方法开始。

func (c *conn) Read(b []byte) (int, error) {
	...
	n, err := c.fd.Read(b)
    ...
	return n, err
}

func (fd *netFD) Read(p []byte) (n int, err error) {
	n, err = fd.pfd.Read(p)
	// 文件描述符保活机制
	runtime.KeepAlive(fd)
	return n, wrapSyscallError(readSyscallName, err)
}

FD.Read 方法内部不断轮询 系统调用 Read 并处理相应的错误。

func (fd *FD) Read(p []byte) (int, error) {
    ...
	
	for {
		n, err := ignoringEINTRIO(syscall.Read, fd.Sysfd, p)
		if err != nil {
			n = 0
			if err == syscall.EAGAIN && fd.pd.pollable() {
				// 如果没有可用数据,抛出 syscall.EAGAIN
				// 将当前连接所在的 goroutine 休眠
				if err = fd.pd.waitRead(fd.isFile); err == nil {
					continue
				}
			}
		}
		err = fd.eofError(n, err)
		return n, err
	}
}

func (pd *pollDesc) waitRead(isFile bool) error {
	return pd.wait('r', isFile)
}


func (pd *pollDesc) wait(mode int, isFile bool) error {
	...
	
	// runtime_pollWait 通过链接器指向了 poll_runtime_pollWait
	res := runtime_pollWait(pd.runtimeCtx, mode)
	return convertErr(res, isFile)
}

poll_runtime_pollWait 方法等待网络文件描述符准备好读或写 (读写取决于参数 mode)。

func poll_runtime_pollWait(pd *pollDesc, mode int) int {
    ...
	
	for !netpollblock(pd, int32(mode), false) {
		errcode = netpollcheckerr(pd, int32(mode))
		if errcode != pollNoError {
			return errcode
		}
	}
	return pollNoError
}

netpollblock 方法用于检测网络文件描述符准备好读或写。

// 如果 IO 已经准备好,返回 true
// 如果 IO 已经超时或关闭,返回 false
// 如果 waitio 参数为 true, 阻塞等待 IO 完成, 忽略错误
// 禁止使用同一种模式并发调用 netpollblock
// 因为 pollDesc 只能为每种模式保存 1 个等待的 goroutine
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
	gpp := &pd.rg
	if mode == 'w' {
		gpp = &pd.wg
	}
	
	for {
		if gpp.CompareAndSwap(pdReady, 0) {
			return true
		}
		if gpp.CompareAndSwap(0, pdWait) {
			break
		}
		if v := gpp.Load(); v != pdReady && v != 0 {
			throw("runtime: double wait")
		}
	}
	
	if waitio || netpollcheckerr(pd, mode) == pollNoError {
		// 休眠 goroutine, 等待 IO 完成
		gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
	}

	...
	
	return old == pdReady
}

func netpollblockcommit(gp *g, gpp unsafe.Pointer) bool {
	r := atomic.Casuintptr((*uintptr)(gpp), pdWait, uintptr(unsafe.Pointer(gp)))
	if r {
		// 增加等待网络轮询器的 goroutine 数量
		// 调度器使用这个值决定是否阻塞,如果没有其他工作的情况下,调度器会阻塞等待网络轮询器的 IO 事件
		atomic.Xadd(&netpollWaiters, 1)
	}
	return r
}

发送方法

发送数据的对象是具体的 TCP 连接,所以从 conn.Write 方法开始。

func (c *conn) Write(b []byte) (int, error) {
	...
	n, err := c.fd.Write(b)
    ...
	return n, err
}

func (fd *netFD) Write(p []byte) (nn int, err error) {
	nn, err = fd.pfd.Write(p)
	// 文件描述符保活机制
	runtime.KeepAlive(fd)
	return nn, wrapSyscallError(writeSyscallName, err)
}

FD.Write 方法内部不断轮询 系统调用 Write 并处理相应的错误。

func (fd *FD) Write(p []byte) (int, error) {
    ...
	
	var nn int
	for {
		...
		
		n, err := ignoringEINTRIO(syscall.Write, fd.Sysfd, p[nn:max])
        
		...
		
		if err == syscall.EAGAIN && fd.pd.pollable() {
			if err = fd.pd.waitWrite(fd.isFile); err == nil {
				continue
			}
		}
        
		...
	}
}

代码执行到这里,后面的流程就和 Read 接收数据 流程一样了,这里不再赘述。

func (pd *pollDesc) waitWrite(isFile bool) error {
	return pd.wait('w', isFile)
}

小结

数据发送和接收流程图

网络轮询器

netpoll 方法用于检测网络轮询器并返回已经就绪的 goroutine 列表。

// 轮询检测准备就绪的网络连接
// 返回一个可运行 (可读/可写/可读写) 的 goroutine 列表
// 参数规则:
//  delay < 0: 无限阻塞
//  delay == 0: 非阻塞
//  delay > 0: 阻塞时间 (单位: 纳秒)
func netpoll(delay int64) gList {
	if epfd == -1 {
		return gList{}
	}
    
	...
	
	// 每次读取 128 个 IO 事件
	var events [128]epollevent
retry:
	// 调用 epoll_wait 获取接收到的 IO 事件
	n := epollwait(epfd, &events[0], int32(len(events)), waitms)
	if n < 0 {
        ...
		
		if waitms > 0 {
			return gList{}
		}
		goto retry
	}
	
	var toRun gList
	for i := int32(0); i < n; i++ {
		ev := &events[i]
		
		...

		var mode int32
		if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
			mode += 'r'
		}
		if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
			mode += 'w'
		}
		if mode != 0 {
			pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
			pd.setEventErr(ev.events == _EPOLLERR)
			netpollready(&toRun, pd, mode)
		}
	}
	return toRun
}

netpollready 方法表示网络文件描述符关联的 IO 事件已经就绪,并将参数 pd 网络文件描述符内部的 goroutine 添加到参数队列中。

// 参数 toRun 是一个 goroutine 列表
// 参数 mode 规则
//  'r': IO 读
//  'w': IO 写
//  'r'+'w': IO 读写
func netpollready(toRun *gList, pd *pollDesc, mode int32) {
	var rg, wg *g
	if mode == 'r' || mode == 'r'+'w' {
		rg = netpollunblock(pd, 'r', true)
	}
	if mode == 'w' || mode == 'r'+'w' {
		wg = netpollunblock(pd, 'w', true)
	}
	if rg != nil {
		toRun.push(rg)
	}
	if wg != nil {
		toRun.push(wg)
	}
}

netpollunblock 方法将网络文件描述符中的读信号或者写信号转换为 pdReady 状态,然后返回存储在内部的 goroutine。

func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g {
	... 
	
	for {
		...
		var new uintptr
		if ioready {
			new = pdReady
		}
		if gpp.CompareAndSwap(old, new) {
			if old == pdWait {
				old = 0
			}
			return (*g)(unsafe.Pointer(old))
		}
	}
}

小结

网络轮询器调用关系图

netpoll 方法会返回一个可运行的 goroutine 列表,然后调用方会将返回的 goroutine 逐个加入处理器的本地队列或者全局队列。 从图中可以看到调用方主要有 4 个,其中调度线程 schedule 和监控线程 sysmon 在 GMP 调度器一文中已经讲过了,这里不再赘述,剩下的 GC 和 STW 后面有机会再讲。

超时控制

接收数据超时

conn.SetReadDeadline 方法设置连接的接收数据超时时间。

func (c *conn) SetReadDeadline(t time.Time) error {
    ...
	if err := c.fd.SetReadDeadline(t); err != nil {
		return &OpError{Op: "set", Net: c.fd.net, Source: nil, Addr: c.fd.laddr, Err: err}
	}
	return nil
}

func (fd *netFD) SetWriteDeadline(t time.Time) error {
	return fd.pfd.SetWriteDeadline(t)
}

func (fd *FD) SetWriteDeadline(t time.Time) error {
	return setDeadlineImpl(fd, t, 'w')
}

func setDeadlineImpl(fd *FD, t time.Time, mode int) error {
	...
	
	runtime_pollSetDeadline(fd.pd.runtimeCtx, d, mode)
	return nil
}

poll_runtime_pollSetDeadline 方法会设置参数 pd 网络文件描述符内部的定时器 (goroutine 持有),并在定时器到期后进行相关的操作。

func poll_runtime_pollSetDeadline(pd *pollDesc, d int64, mode int) {
	// 主要是对 pd 进行定时器的相关设置,这里直接跳过这部分内容
	...
	
	// 如果截止时间已经过期,取消等待 IO 而导致的阻塞
	var rg, wg *g
	if pd.rd < 0 {
		rg = netpollunblock(pd, 'r', false)
	}
	if pd.wd < 0 {
		wg = netpollunblock(pd, 'w', false)
	}
	
	// 如果有取消读事件的 goroutine, 则进行唤醒
	if rg != nil {
		netpollgoready(rg, 3)
	}
    // 如果有取消写事件的 goroutine, 则进行唤醒
	if wg != nil {
		netpollgoready(wg, 3)
	}
}

func netpollgoready(gp *g, traceskip int) {
    atomic.Xadd(&netpollWaiters, -1)
    goready(gp, traceskip+1)
}

发送数据超时

发送数据超时和接收数据流程基本一致,只是调用的方法不同,这里就不再展开了。

关闭连接

conn.Close 方法用于关闭网络连接。

func (c *conn) Close() error {
    ...
	err := c.fd.Close()
    ...
	return err
}

func (fd *netFD) Close() error {
	runtime.SetFinalizer(fd, nil)
	return fd.pfd.Close()
}

func (fd *FD) Close() error {
	...
	fd.pd.evict()
    ...
	return err
}

evict 方法会关闭网络文件描述符,并取消所有阻塞在等待该文件描述符的 IO 事件。

func (pd *pollDesc) evict() {
	...
	runtime_pollUnblock(pd.runtimeCtx)
}

func poll_runtime_pollUnblock(pd *pollDesc) {
    ...

	pd.closing = true
	var rg, wg *g
	rg = netpollunblock(pd, 'r', false)
	wg = netpollunblock(pd, 'w', false)
	
	...
	
	if rg != nil {
		netpollgoready(rg, 3)
	}
	if wg != nil {
		netpollgoready(wg, 3)
	}
}

流程图

关闭连接流程图

小结

本文用一个基础的服务器网络程序为示例,分析了网络标准库中的端口监听、接收连接、发送/接收数据, 关闭连接 4 个主要流程的 Linux 版本实现代码。 Go 网络标准库通过在底层封装 epoll 实现了 IO 多路复用,通过网络轮询器加 GMP 调度器避免了传统网络编程中的线程切换和 IO 阻塞,两者的完美配合是 Go 网络编程高性能的基石。

扩展阅读

转载申请

本作品采用 知识共享署名 4.0 国际许可协议 进行许可,转载时请注明原文链接,图片在使用时请保留全部内容,商业转载请联系作者获得授权。