Test UDS Server
package main
import (
"log"
"net"
"os"
"os/signal"
"syscall"
)
// echoServer serves a single client connection: every chunk read from c is
// printed and then written straight back to the client.
//
// It returns when the client disconnects or when a read or write fails, and
// it always closes c on the way out so the per-connection file descriptor is
// released. A failed write only ends this one connection; it must not bring
// the whole process down, because other clients are served concurrently by
// sibling goroutines.
func echoServer(c net.Conn) {
	defer c.Close()

	// One buffer per connection is enough: each chunk is fully written back
	// before the next Read overwrites it.
	buf := make([]byte, 512)
	for {
		nr, err := c.Read(buf)
		if err != nil {
			// Both io.EOF (client hung up) and real errors end the session.
			return
		}
		data := buf[:nr]
		println("Server got:", string(data))
		if _, err := c.Write(data); err != nil {
			log.Print("Writing client error: ", err)
			return
		}
	}
}
// main listens on the Unix domain socket /tmp/go1.sock and serves every
// accepted connection with echoServer in its own goroutine.
//
// On SIGINT or SIGTERM the listener is closed and the process exits with
// status 0. Closing the listener makes the pending Accept fail, so the accept
// loop checks the shutdown channel before treating an Accept error as fatal;
// otherwise a normal shutdown could race into log.Fatal and exit with
// status 1.
func main() {
	log.Println("Starting echo server")
	ln, err := net.Listen("unix", "/tmp/go1.sock")
	if err != nil {
		log.Fatal("Listen error: ", err)
	}

	sigc := make(chan os.Signal, 1)
	signal.Notify(sigc, os.Interrupt, syscall.SIGTERM)

	// shutdown is closed before the listener so the accept loop can tell an
	// intentional close apart from a genuine Accept failure.
	shutdown := make(chan struct{})
	go func() {
		sig := <-sigc
		log.Printf("Caught signal %s: shutting down.", sig)
		close(shutdown)
		if err := ln.Close(); err != nil {
			log.Print("Close error: ", err)
		}
	}()

	for {
		fd, err := ln.Accept()
		if err != nil {
			select {
			case <-shutdown:
				// Listener closed on purpose: returning from main exits 0.
				return
			default:
			}
			log.Fatal("Accept error: ", err)
		}
		go echoServer(fd)
	}
}
Listen()
// /usr/local/go/src/net/dial.go
// Listen announces on the local network address.
//
// The network must be "tcp", "tcp4", "tcp6", "unix" or "unixpacket".
//
// For TCP networks, if the host in the address parameter is empty or
// a literal unspecified IP address, Listen listens on all available
// unicast and anycast IP addresses of the local system.
// To only use IPv4, use network "tcp4".
// The address can use a host name, but this is not recommended,
// because it will create a listener for at most one of the host's IP
// addresses.
// If the port in the address parameter is empty or "0", as in
// "127.0.0.1:" or "[::1]:0", a port number is automatically chosen.
// The Addr method of Listener can be used to discover the chosen
// port.
//
// See func Dial for a description of the network and address
// parameters.
func Listen(network, address string) (Listener, error) {
	// Delegate to a zero-value ListenConfig under the background context.
	return new(ListenConfig).Listen(context.Background(), network, address)
}
// Listen announces on the local network address.
//
// See func Listen for a description of the network and address
// parameters.
func (lc *ListenConfig) Listen(ctx context.Context, network, address string) (Listener, error) {
addrs, err := DefaultResolver.resolveAddrList(ctx, "listen", network, address, nil)
// For a UDS, addrs here is always addrList[&UnixAddr{Name: "<uds_path>", Net: "unix"}].
// (Excerpt: each `...` line marks upstream code omitted from these notes.)
...
// sysListener carries a copy of the ListenConfig plus the original network
// and address strings down to the protocol-specific listen helpers.
sl := &sysListener{
ListenConfig: *lc,
network: network,
address: address,
}
var l Listener
la := addrs.first(isIPv4)
// Dispatch on the concrete address type; a *UnixAddr goes to listenUnix.
switch la := la.(type) {
case *UnixAddr:
l, err = sl.listenUnix(ctx, la)
default:
...
}
...
return l, nil
}
// /usr/local/go/src/net/unixsock_posix.go
// listenUnix opens a listening Unix domain socket bound to laddr and wraps
// the resulting descriptor in a UnixListener.
func (sl *sysListener) listenUnix(ctx context.Context, laddr *UnixAddr) (*UnixListener, error) {
	sock, err := unixSocket(ctx, sl.network, laddr, nil, "listen", sl.ListenConfig.Control)
	if err != nil {
		return nil, err
	}
	// The returned UnixListener implements the Listener interface.
	ln := &UnixListener{
		fd:     sock,
		path:   sock.laddr.String(),
		unlink: true,
	}
	return ln, nil
}
// unixSocket picks the socket type for the given Unix network name and
// delegates to socket with family AF_UNIX, returning the resulting netFD.
// (Excerpt: each `...` line marks upstream code omitted from these notes.)
func unixSocket(ctx context.Context, net string, laddr, raddr sockaddr, mode string, ctrlFn func(string, string, syscall.RawConn) error) (*netFD, error) {
var sotype int
switch net {
case "unix":
// The "unix" network is a stream socket.
sotype = syscall.SOCK_STREAM
...
}
...
// Protocol 0 and ipv6only=false are passed through to socket.
fd, err := socket(ctx, net, syscall.AF_UNIX, sotype, 0, false, laddr, raddr, ctrlFn)
if err != nil {
return nil, err
}
return fd, nil
}
// /usr/local/go/src/net/sock_posix.go
// socket returns a network file descriptor that is ready for
// asynchronous I/O using the network poller.
// (Excerpt: each `...` line marks upstream code omitted from these notes.)
func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) {
// Original note: at run time this call goes through the C library to issue
// a syscall. NOTE(review): whether libc is involved is platform-dependent;
// confirm for the target OS.
// The returned s is the number of the file descriptor newly created in the
// current process for this UDS.
s, err := sysSocket(family, sotype, proto)
...
// Create the struct that represents the FD at run time. newFD is simple, so
// it is not expanded in these notes. If it fails, the raw descriptor s is
// closed before returning.
if fd, err = newFD(s, family, sotype, net); err != nil {
poll.CloseFunc(s)
return nil, err
}
...
return fd, nil
}
fd_unix.go
// /usr/local/go/src/net/fd_unix.go
// Network file descriptor.
// netFD is the net package's per-socket state; it is what socket and
// (*netFD).accept return in the excerpts of these notes.
type netFD struct {
// pfd is the underlying poll.FD; the netFD methods shown in these notes
// forward Accept, Read, Write and Init to it and read pfd.Sysfd.
pfd poll.FD
// immutable until Close
// family, sotype and net are the values handed to newFD when the
// descriptor is created.
family int
sotype int
isConnected bool // handshake completed or use of association with peer
net string
// laddr and raddr are presumably the local and remote addresses (set via
// setAddr after accept) - TODO confirm against upstream.
laddr Addr
raddr Addr
}
UnixListener
// UnixListener is a Unix domain socket listener. Clients should
// typically use variables of type Listener instead of assuming Unix
// domain sockets.
type UnixListener struct {
// fd is the listening socket's netFD, as returned by unixSocket.
fd *netFD
// path is fd.laddr.String() captured when the listener is created.
path string
// unlink is set to true by listenUnix. Presumably it requests removal of
// the socket file at path on Close - TODO confirm against upstream.
unlink bool
// unlinkOnce presumably guards that removal so it runs at most once -
// TODO confirm against upstream.
unlinkOnce sync.Once
}
它实现了 3 个 interface:
- net.Listener
- syscall.Conn(通过 SyscallConn 方法;注意不是 net.Conn,UnixListener 没有 Read/Write)
- io.Closer
Accept()
从process的角度来说,每个accept的 connection,都对应一个 fd:
# 当一个新的client出现,并与 server 尝试建立连接后(nc -U /tmp/go1.sock),server process 就会多一个 fd
$ lsof -p 2419
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
...
___go_bui 2419 shiwei 9u unix 0xec48c03b93989f9d 0t0 /tmp/go1.sock
# 当又一个新的 client 出现,并与 server 尝试建立连接后(nc -U /tmp/go1.sock),server process 就会又多一个 fd
$ lsof -p 2419
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
...
___go_bui 2419 shiwei 9u unix 0xec48c03b93989f9d 0t0 /tmp/go1.sock
# new fd
___go_bui 2419 shiwei 10u unix 0xec48c03b9398aa8d 0t0 /tmp/go1.sock
// /usr/local/go/src/net/unixsock.go
// Accept implements the Accept method in the Listener interface.
// Returned connections will be of type *UnixConn.
// (Excerpt: each `...` line marks upstream code omitted from these notes.)
func (l *UnixListener) Accept() (Conn, error) {
...
// l.accept returns a *UnixConn, which is handed back as a Conn.
c, err := l.accept()
...
return c, nil
}
// UnixConn is an implementation of the Conn interface for connections
// to Unix domain sockets.
type UnixConn struct {
// The embedded conn supplies the (*conn).Read and (*conn).Write methods
// shown later in these notes.
conn
}
// /usr/local/go/src/net/unixsock_posix.go
// accept takes the next incoming connection from the listening descriptor
// and returns it as a *UnixConn.
func (ln *UnixListener) accept() (*UnixConn, error) {
	connFD, err := ln.fd.accept()
	if err != nil {
		return nil, err
	}
	// Wrap the accepted descriptor in a fresh UnixConn, the run-time
	// representation of one UDS connection.
	uc := newUnixConn(connFD)
	return uc, nil
}
// newUnixConn wraps fd in a UnixConn.
func newUnixConn(fd *netFD) *UnixConn {
	uc := new(UnixConn)
	uc.conn = conn{fd}
	return uc
}
// /usr/local/go/src/net/fd_unix.go
// accept waits for an incoming connection on the listening descriptor fd and
// returns a new, initialised netFD for it.
// (Excerpt: each `...` line marks upstream code omitted from these notes.)
func (fd *netFD) accept() (netfd *netFD, err error) {
// Blocks here until a new connection comes in.
d, rsa, errcall, err := fd.pfd.Accept()
...
// Create the corresponding fd struct at run time. It inherits family,
// sotype and net from the listener. If that fails, the raw descriptor d is
// closed before returning.
if netfd, err = newFD(d, fd.family, fd.sotype, fd.net); err != nil {
poll.CloseFunc(d)
return nil, err
}
// Initialise the new descriptor (see init); close it if that fails.
if err = netfd.init(); err != nil {
netfd.Close()
return nil, err
}
// The Getsockname error is deliberately discarded here.
lsa, _ := syscall.Getsockname(netfd.pfd.Sysfd)
// Record the local (lsa) and remote (rsa) socket addresses on the new netFD.
netfd.setAddr(netfd.addrFunc()(lsa), netfd.addrFunc()(rsa))
return netfd, nil
}
// init initialises the underlying poll.FD for this descriptor's network,
// passing true as Init's second argument.
func (fd *netFD) init() error {
	err := fd.pfd.Init(fd.net, true)
	return err
}
Read()
// Read implements the Conn Read method.
//
// It returns syscall.EINVAL when the connection is not usable. A nil error
// or io.EOF from the descriptor is passed through unchanged; any other error
// is wrapped in an *OpError describing the read.
func (c *conn) Read(b []byte) (int, error) {
	if !c.ok() {
		return 0, syscall.EINVAL
	}
	nread, rerr := c.fd.Read(b)
	switch rerr {
	case nil, io.EOF:
		return nread, rerr
	}
	return nread, &OpError{Op: "read", Net: c.fd.net, Source: c.fd.laddr, Addr: c.fd.raddr, Err: rerr}
}
// /usr/local/go/src/net/fd_unix.go
// Read reads into p through the underlying poll.FD and passes any error
// through wrapSyscallError tagged with "read".
func (fd *netFD) Read(p []byte) (n int, err error) {
	count, rerr := fd.pfd.Read(p)
	// Keep fd reachable until the read above has returned.
	runtime.KeepAlive(fd)
	err = wrapSyscallError("read", rerr)
	return count, err
}
// /usr/local/go/src/internal/poll/fd_unix.go
// Write implements io.Writer.
// NOTE(review): this excerpt sits under the Read() section of these notes but
// shows (*FD).Write; the matching (*FD).Read is not shown here.
// (Excerpt: the elision lines mark upstream code omitted from these notes,
// including the declaration of max.)
func (fd *FD) Write(p []byte) (int, error) {
。。。
// nn counts the bytes written so far.
var nn int
for {
...
// When p holds data, n is the number of bytes written and err is nil.
n, err := syscall.Write(fd.Sysfd, p[nn:max])
if n > 0 {
nn += n
}
if nn == len(p) { // everything in p has been written: return immediately
return nn, err
}
if err == syscall.EAGAIN && fd.pd.pollable() {
// Original note: not yet sure under what circumstances this branch is
// taken. NOTE(review): EAGAIN from write(2) on a non-blocking descriptor
// means the write would block; presumably waitWrite waits until the
// descriptor is writable again, then the loop retries - confirm.
if err = fd.pd.waitWrite(fd.isFile); err == nil {
continue
}
}
...
}
}
// /usr/local/go/src/syscall/syscall_unix.go
// Write writes p to the descriptor fd and returns the number of bytes
// written together with any error.
//
// When faketime is on and fd is 1 or 2, the write goes through faketimeWrite,
// whose negative result is turned into an errno error with a zero count;
// otherwise it goes through write. The race-detector and msan hooks run
// around the write when those are enabled.
func Write(fd int, p []byte) (n int, err error) {
	if race.Enabled {
		race.ReleaseMerge(unsafe.Pointer(&ioSync))
	}

	switch {
	case faketime && (fd == 1 || fd == 2):
		if n = faketimeWrite(fd, p); n < 0 {
			n, err = 0, errnoErr(Errno(-n))
		}
	default:
		n, err = write(fd, p)
	}

	// Both instrumentation hooks only apply when some bytes were written.
	if n > 0 {
		if race.Enabled {
			race.ReadRange(unsafe.Pointer(&p[0]), n)
		}
		if msanenabled {
			msanRead(unsafe.Pointer(&p[0]), n)
		}
	}
	return n, err
}
Write()
// Write implements the Conn Write method.
// (Excerpt: each `...` line marks upstream code omitted from these notes.)
func (c *conn) Write(b []byte) (int, error) {
...
// Forward the write to the connection's netFD.
n, err := c.fd.Write(b)
...
return n, err
}
// /usr/local/go/src/net/fd_unix.go
// Write writes p through the underlying poll.FD and passes any error through
// wrapSyscallError tagged with "write".
func (fd *netFD) Write(p []byte) (nn int, err error) {
	written, werr := fd.pfd.Write(p)
	// Keep fd reachable until the write above has returned.
	runtime.KeepAlive(fd)
	err = wrapSyscallError("write", werr)
	return written, err
}
// /usr/local/go/src/internal/poll/fd_unix.go
// Write implements io.Writer.
// It loops over syscall.Write until all of p has been written or an error
// stops it.
// (Excerpt: each `...` line marks upstream code omitted from these notes.)
func (fd *FD) Write(p []byte) (int, error) {
...
// nn counts the bytes written so far.
var nn int
for {
// For stream descriptors, cap a single syscall at maxRW bytes.
max := len(p)
if fd.IsStream && max-nn > maxRW {
max = nn + maxRW
}
n, err := syscall.Write(fd.Sysfd, p[nn:max])
if n > 0 {
nn += n
}
// Everything has been written: return, together with whatever err was.
if nn == len(p) {
return nn, err
}
// On EAGAIN with a pollable descriptor, call waitWrite and retry if it
// returns nil. NOTE(review): presumably waitWrite waits until the
// descriptor is writable - confirm against upstream.
if err == syscall.EAGAIN && fd.pd.pollable() {
if err = fd.pd.waitWrite(fd.isFile); err == nil {
continue
}
}
...
}
}