【Golang】源码 - net 包 - unixsock

Posted by 西维蜀黍 on 2021-07-17, Last Modified on 2021-09-21

Test UDS Server

package main

import (
	"log"
	"net"
	"os"
	"os/signal"
	"syscall"
)

// echoServer echoes every chunk it reads from c back to the same connection
// until the peer disconnects or an I/O error occurs.
//
// echoServer owns c: the connection is closed before the function returns,
// so the file descriptor created by Accept is released once the client goes
// away. A failure on one connection only ends that connection's goroutine;
// it never terminates the whole server process.
func echoServer(c net.Conn) {
	defer c.Close()

	// One buffer per connection is enough: each chunk is written back in
	// full before the next Read overwrites it.
	buf := make([]byte, 512)
	for {
		nr, err := c.Read(buf)
		if nr > 0 {
			// A Read may return data together with an error; echo the data
			// first so nothing the client sent is dropped.
			data := buf[:nr]
			log.Println("Server got:", string(data))
			if _, werr := c.Write(data); werr != nil {
				log.Print("Writing client error: ", werr)
				return
			}
		}
		if err != nil {
			// io.EOF (client hung up) or a real read error: either way this
			// connection is finished.
			return
		}
	}
}

// main starts the echo server on the Unix domain socket /tmp/go1.sock and
// serves every accepted connection in its own goroutine until SIGINT or
// SIGTERM is received.
//
// On a signal the listener is closed, which makes the blocked Accept return
// an error. The done channel lets the accept loop tell that deliberate
// shutdown apart from a real Accept failure, so a clean shutdown always
// exits with status 0 instead of racing with log.Fatal.
func main() {
	log.Println("Starting echo server")
	ln, err := net.Listen("unix", "/tmp/go1.sock")
	if err != nil {
		log.Fatal("Listen error: ", err)
	}

	sigc := make(chan os.Signal, 1)
	signal.Notify(sigc, os.Interrupt, syscall.SIGTERM)

	done := make(chan struct{})
	go func() {
		sig := <-sigc
		log.Printf("Caught signal %s: shutting down.", sig)
		// Mark the shutdown before closing so the accept loop never
		// mistakes the resulting Accept error for a failure.
		close(done)
		if err := ln.Close(); err != nil {
			log.Print("Close error: ", err)
		}
	}()

	for {
		conn, err := ln.Accept()
		if err != nil {
			select {
			case <-done:
				// Listener was closed on purpose; leave main normally.
				return
			default:
			}
			log.Fatal("Accept error: ", err)
		}

		go echoServer(conn)
	}
}

Listen()

// /usr/local/go/src/net/dial.go

// Listen announces on the local network address.
//
// The network must be "tcp", "tcp4", "tcp6", "unix" or "unixpacket".
//
// For TCP networks, if the host in the address parameter is empty or
// a literal unspecified IP address, Listen listens on all available
// unicast and anycast IP addresses of the local system.
// To only use IPv4, use network "tcp4".
// The address can use a host name, but this is not recommended,
// because it will create a listener for at most one of the host's IP
// addresses.
// If the port in the address parameter is empty or "0", as in
// "127.0.0.1:" or "[::1]:0", a port number is automatically chosen.
// The Addr method of Listener can be used to discover the chosen
// port.
//
// See func Dial for a description of the network and address
// parameters.
func Listen(network, address string) (Listener, error) {
	// Delegate to ListenConfig.Listen using a zero-value ListenConfig and a
	// background context.
	var lc ListenConfig
	return lc.Listen(context.Background(), network, address)
}

// Listen announces on the local network address.
//
// See func Listen for a description of the network and address
// parameters.
//
// Excerpt: lines consisting only of "..." mark code elided by the article.
func (lc *ListenConfig) Listen(ctx context.Context, network, address string) (Listener, error) {
	addrs, err := DefaultResolver.resolveAddrList(ctx, "listen", network, address, nil)
  // For a UDS, addrs here is always
  // addrList{&UnixAddr{Name: "<uds_path>", Net: "unix"}}.
	...
	// sysListener bundles a copy of the config with the requested
	// network and address.
	sl := &sysListener{
		ListenConfig: *lc,
		network:      network,
		address:      address,
	}
	var l Listener
	la := addrs.first(isIPv4)
	// Dispatch on the concrete address type; a *UnixAddr takes the Unix
	// domain socket path.
	switch la := la.(type) {
	case *UnixAddr:
		l, err = sl.listenUnix(ctx, la)
	default:
		...
	}
  ...
	return l, nil
}
// /usr/local/go/src/net/unixsock_posix.go
// listenUnix creates the listening Unix domain socket for laddr via
// unixSocket (mode "listen", using the config's Control hook) and wraps the
// resulting netFD in a UnixListener.
func (sl *sysListener) listenUnix(ctx context.Context, laddr *UnixAddr) (*UnixListener, error) {
	fd, err := unixSocket(ctx, sl.network, laddr, nil, "listen", sl.ListenConfig.Control)
	if err != nil {
		return nil, err
	}
  // Return a *UnixListener, which implements the Listener interface.
  // unlink is set to true and path is taken from the socket's local address.
	return &UnixListener{fd: fd, path: fd.laddr.String(), unlink: true}, nil
}

// unixSocket maps the network name to a socket type ("unix" becomes
// SOCK_STREAM) and creates an AF_UNIX socket through socket, returning the
// resulting netFD.
//
// Excerpt: lines consisting only of "..." mark code elided by the article
// (the remaining network cases and the handling of mode).
func unixSocket(ctx context.Context, net string, laddr, raddr sockaddr, mode string, ctrlFn func(string, string, syscall.RawConn) error) (*netFD, error) {
	var sotype int
	switch net {
	case "unix":
		sotype = syscall.SOCK_STREAM
  ...
	}

  ...

	// Protocol 0 and ipv6only=false are passed for Unix domain sockets.
	fd, err := socket(ctx, net, syscall.AF_UNIX, sotype, 0, false, laddr, raddr, ctrlFn)
	if err != nil {
		return nil, err
	}
	return fd, nil
}
// /usr/local/go/src/net/sock_posix.go

// socket returns a network file descriptor that is ready for
// asynchronous I/O using the network poller.
//
// Excerpt: lines consisting only of "..." mark code elided by the article.
func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) {
  // Calling this function triggers a syscall at run time. The article says
  // this goes through the C library.
  // NOTE(review): that is platform-dependent — confirm for the target OS.
  // The returned s is the number of the file descriptor newly created in the
  // current process (for this UDS).
	s, err := sysSocket(family, sotype, proto)
  ...
  // Create the struct that represents the FD at run time; newFD's
  // implementation is simple, so it is not expanded here.
	if fd, err = newFD(s, family, sotype, net); err != nil {
		// Wrapping failed: close the raw descriptor so it does not leak.
		poll.CloseFunc(s)
		return nil, err
	}

	...
	return fd, nil
}

fd_unix.go

// /usr/local/go/src/net/fd_unix.go
// Network file descriptor.
//
// netFD pairs the poll-level descriptor (pfd) with the socket metadata the
// net package keeps for it: address family, socket type, network name and
// the local and remote addresses.
type netFD struct {
	pfd poll.FD

	// immutable until Close
	family      int
	sotype      int
	isConnected bool // handshake completed or use of association with peer
	net         string
	laddr       Addr
	raddr       Addr
}

UnixListener

// UnixListener is a Unix domain socket listener. Clients should
// typically use variables of type Listener instead of assuming Unix
// domain sockets.
type UnixListener struct {
	fd         *netFD    // network file descriptor of the listening socket
	path       string    // listenUnix sets this from fd.laddr.String()
	unlink     bool      // listenUnix sets this to true
	unlinkOnce sync.Once // presumably guards a one-time unlink of path on Close — not shown here; confirm
}

它实现了3个 interface

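  • Listener(net.Listener)
  • Conn(这里指 syscall.Conn,通过 SyscallConn 方法实现;注意不是 net.Conn,UnixListener 并没有 Read/Write 方法)
  • Closer(io.Closer)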

Accept()

从process的角度来说,每个accept的 connection,都对应一个 fd:

# 当一个新的client出现,并与 server 尝试建立连接后(nc -U /tmp/go1.sock),server process 就会多一个 fd
$ lsof -p 2419
COMMAND    PID   USER   FD     TYPE             DEVICE SIZE/OFF                NODE NAME
...
___go_bui 2419 shiwei    9u    unix 0xec48c03b93989f9d      0t0                     /tmp/go1.sock

# 当又一个新的client出现,并与 server 尝试建立连接后(nc -U /tmp/go1.sock),server process 就会又多一个 fd

$ lsof -p 2419
COMMAND    PID   USER   FD     TYPE             DEVICE SIZE/OFF                NODE NAME
...
___go_bui 2419 shiwei    9u    unix 0xec48c03b93989f9d      0t0                     /tmp/go1.sock
# new fd
___go_bui 2419 shiwei   10u    unix 0xec48c03b9398aa8d      0t0                     /tmp/go1.sock
// /usr/local/go/src/net/unixsock.go

// Accept implements the Accept method in the Listener interface.
// Returned connections will be of type *UnixConn.
//
// Excerpt: lines consisting only of "..." mark code elided by the article.
func (l *UnixListener) Accept() (Conn, error) {
	...
  // l.accept returns a *UnixConn.
	c, err := l.accept()
	...
	return c, nil
}

// UnixConn is an implementation of the Conn interface for connections
// to Unix domain sockets.
type UnixConn struct {
	// conn is embedded, so the methods defined on *conn (such as Read and
	// Write) are promoted to *UnixConn.
	conn
}
// /usr/local/go/src/net/unixsock_posix.go
// accept waits for the next connection on the listener's netFD and wraps the
// accepted descriptor in a *UnixConn.
func (ln *UnixListener) accept() (*UnixConn, error) {
	fd, err := ln.fd.accept()
	if err != nil {
		return nil, err
	}
  // Create a new UnixConn that represents this UDS connection at run time.
	return newUnixConn(fd), nil
}

// newUnixConn wraps fd in a UnixConn by populating the embedded conn.
func newUnixConn(fd *netFD) *UnixConn { return &UnixConn{conn{fd}} }
// /usr/local/go/src/net/fd_unix.go
// accept accepts one connection on the listening descriptor and returns a new
// netFD for it, inheriting family, socket type and network name from fd.
//
// Excerpt: lines consisting only of "..." mark code elided by the article
// (the handling of err and errcall).
func (fd *netFD) accept() (netfd *netFD, err error) {
  // Blocks here until a new connection arrives.
	d, rsa, errcall, err := fd.pfd.Accept()
	...

  // Create the corresponding fd struct at run time.
	if netfd, err = newFD(d, fd.family, fd.sotype, fd.net); err != nil {
		// Wrapping failed: close the raw accepted descriptor so it does not leak.
		poll.CloseFunc(d)
		return nil, err
	}
	if err = netfd.init(); err != nil {
		netfd.Close()
		return nil, err
	}
	// The Getsockname error is deliberately discarded here; lsa and rsa are
	// converted with addrFunc and stored on the new netFD.
	lsa, _ := syscall.Getsockname(netfd.pfd.Sysfd)
	netfd.setAddr(netfd.addrFunc()(lsa), netfd.addrFunc()(rsa))
	return netfd, nil
}

// init initializes the underlying poll.FD for this network name.
// NOTE(review): the second argument (true) presumably marks the descriptor
// as pollable so it is registered with the network poller — confirm against
// internal/poll.
func (fd *netFD) init() error {
	return fd.pfd.Init(fd.net, true)
}

Read()

// Read implements the Conn Read method.
//
// It returns syscall.EINVAL when c.ok() reports false. Any error other than
// io.EOF is wrapped in an *OpError carrying the operation name, network and
// both addresses; io.EOF is returned unwrapped.
func (c *conn) Read(b []byte) (int, error) {
	if !c.ok() {
		return 0, syscall.EINVAL
	}
	n, err := c.fd.Read(b)
	if err != nil && err != io.EOF {
		err = &OpError{Op: "read", Net: c.fd.net, Source: c.fd.laddr, Addr: c.fd.raddr, Err: err}
	}
	return n, err
}
// /usr/local/go/src/net/fd_unix.go
// Read reads into p through the poll-level descriptor and passes the error
// through wrapSyscallError tagged with "read".
func (fd *netFD) Read(p []byte) (n int, err error) {
	n, err = fd.pfd.Read(p)
	// Keep fd reachable until the read on its embedded pfd has returned.
	runtime.KeepAlive(fd)
	return n, wrapSyscallError("read", err)
}

// /usr/local/go/src/internal/poll/fd_unix.go
// Write implements io.Writer.
//
// NOTE(review): this excerpt sits under the article's Read() section, yet it
// shows the write path (FD.Write). The read path presumably goes through
// FD.Read in the same file — confirm and swap the excerpt if so.
//
// Excerpt: the placeholder lines mark code elided by the article, including
// the declaration of max.
func (fd *FD) Write(p []byte) (int, error) {
	。。。
	var nn int
	for {
		...
    // On a successful write, n is the number of bytes written and err is nil.
		n, err := syscall.Write(fd.Sysfd, p[nn:max])
		if n > 0 {
			nn += n
		}
		if nn == len(p) { // everything in p has been written, so return right away
			return nn, err
		}
		if err == syscall.EAGAIN && fd.pd.pollable() {
      // Author's note: not yet sure under which circumstances this branch is taken.
      // Reviewer's note: it runs when the write syscall reports EAGAIN on a
      // pollable descriptor (for a non-blocking descriptor EAGAIN means the
      // write would block). waitWrite is then called, and the loop retries
      // the write if it returns nil.
			if err = fd.pd.waitWrite(fd.isFile); err == nil {
				continue
			}
		}
		...
	}
}

// /usr/local/go/src/syscall/syscall_unix.go
// Write writes p to the file descriptor fd and returns the number of bytes
// written together with any error.
//
// NOTE(review): this excerpt sits under the article's Read() section although
// it is the write-side syscall wrapper — confirm the intended excerpt.
func Write(fd int, p []byte) (n int, err error) {
	// Race-detector hook, active only when race.Enabled is set.
	if race.Enabled {
		race.ReleaseMerge(unsafe.Pointer(&ioSync))
	}
	if faketime && (fd == 1 || fd == 2) {
		// With faketime set, writes to descriptors 1 and 2 go through
		// faketimeWrite; a negative result is converted into an errno error.
		n = faketimeWrite(fd, p)
		if n < 0 {
			n, err = 0, errnoErr(Errno(-n))
		}
	} else {
		n, err = write(fd, p)
	}
	// Report the first n bytes of p as read to the race detector and to
	// msan; both are skipped when nothing was written.
	if race.Enabled && n > 0 {
		race.ReadRange(unsafe.Pointer(&p[0]), n)
	}
	if msanenabled && n > 0 {
		msanRead(unsafe.Pointer(&p[0]), n)
	}
	return
}

Write()

// Write implements the Conn Write method.
//
// Excerpt: lines consisting only of "..." mark code elided by the article;
// the visible part forwards b to the connection's netFD.
func (c *conn) Write(b []byte) (int, error) {
	...
	n, err := c.fd.Write(b)
	...
	return n, err
}
// /usr/local/go/src/net/fd_unix.go
// Write writes p through the poll-level descriptor and passes the error
// through wrapSyscallError tagged with "write".
func (fd *netFD) Write(p []byte) (nn int, err error) {
	nn, err = fd.pfd.Write(p)
	// Keep fd reachable until the write on its embedded pfd has returned.
	runtime.KeepAlive(fd)
	return nn, wrapSyscallError("write", err)
}

// /usr/local/go/src/internal/poll/fd_unix.go
// Write implements io.Writer.
//
// It loops until all of p has been written, counting the bytes written so
// far in nn.
//
// Excerpt: lines consisting only of "..." mark code elided by the article.
func (fd *FD) Write(p []byte) (int, error) {
	...
	var nn int
	for {
		// For stream descriptors, cap a single syscall at maxRW bytes.
		max := len(p)
		if fd.IsStream && max-nn > maxRW {
			max = nn + maxRW
		}
		n, err := syscall.Write(fd.Sysfd, p[nn:max])
		if n > 0 {
			nn += n
		}
		// Everything has been written: return, passing along whatever err
		// the last syscall reported.
		if nn == len(p) {
			return nn, err
		}
		// EAGAIN on a pollable descriptor: call waitWrite and retry the
		// write if it returns nil.
		if err == syscall.EAGAIN && fd.pd.pollable() {
			if err = fd.pd.waitWrite(fd.isFile); err == nil {
				continue
			}
		}
		...
	}
}

Reference