【Golang】使用 - 优雅重启/关闭进程

Posted by 西维蜀黍 on 2021-04-05, Last Modified on 2024-09-18

Graceful Exit

func Notify(c chan<- os.Signal, sig ...os.Signal)

Notify causes package signal to relay incoming signals to c. If no signals are provided, all incoming signals will be relayed to c. Otherwise, just the provided signals will.

Package signal will not block sending to c: the caller must ensure that c has sufficient buffer space to keep up with the expected signal rate. For a channel used for notification of just one signal value, a buffer of size 1 is sufficient.

It is allowed to call Notify multiple times with the same channel: each call expands the set of signals sent to that channel. The only way to remove signals from the set is to call Stop.

It is allowed to call Notify multiple times with different channels and the same signals: each channel receives copies of incoming signals independently.

Ref

Demo

Demo1

package main

import (
	"fmt"
	"os"
	"os/signal"
)

// main parks until the process is interrupted (for example with Ctrl-C),
// prints the signal it was given, and returns.
func main() {
	// The channel has room for one signal: package signal never blocks
	// while delivering, so an unbuffered channel could drop the signal if
	// this goroutine were not already waiting on it.
	interrupts := make(chan os.Signal, 1)
	signal.Notify(interrupts, os.Interrupt)

	// Wait for the first delivered signal and report it.
	fmt.Println("Got signal:", <-interrupts)
}

Demo2

package main

import (
    "fmt"
    "os"
    "os/signal"
    "syscall"
)

// main waits for SIGINT or SIGTERM on a background goroutine and exits once
// that goroutine reports that a signal has been handled.
//
// Running the program blocks at "awaiting signal"; pressing Ctrl-C (shown by
// the terminal as ^C) sends SIGINT, which makes it print "interrupt" and then
// "exiting".
func main() {
	incoming := make(chan os.Signal, 1)
	finished := make(chan bool, 1)

	// Relay only SIGINT and SIGTERM to the incoming channel.
	signal.Notify(incoming, syscall.SIGINT, syscall.SIGTERM)

	// waitForSignal prints the first signal received and then tells main it
	// may stop waiting.
	waitForSignal := func() {
		received := <-incoming
		fmt.Println()
		fmt.Println(received)
		finished <- true
	}
	go waitForSignal()

	fmt.Println("awaiting signal")
	<-finished
	fmt.Println("exiting")
}
$ go run signals.go
awaiting signal
^C
interrupt
exiting

Graceful Restart

If you have a Golang HTTP service, chances are, you will need to restart it on occasion to upgrade the binary or change some configuration. And if you (like me) have been taking graceful restart for granted because the webserver took care of it, you may find this recipe very handy because with Golang you need to roll your own.

There are actually two problems that need to be solved here. First is the UNIX side of the graceful restart, i.e. the mechanism by which a process can restart itself without closing the listening socket. The second problem is ensuring that all in-progress requests are properly completed or timed-out.

值得思考的一种情况是,如果在处理这些 in-progress requests 的过程中,又有 request 来了,by right 需要将这些后来的 request处理完后,才能真正进行 restart;而如果现在处理这些后来的 request 的时候,新的 request又来了呢?so on and so forth。

Via 负载均衡

In practice, 服务端代码经常需要升级。对于线上系统的升级,常用的做法是在服务前面放置负载均衡(如 Nginx),以保证升级时至少有一个服务可用,然后依次(灰度)升级各个实例。而另一种更方便的方法是在应用上做热重启,直接升级应用而不停服务。

package main

import (
	"context"
	"flag"
	"fmt"
	"net"
	"net/http"
	"os"
	"os/exec"
	"os/signal"
	"syscall"
)

// Package-level state shared by main, setupSignal and forkProcess.
var (
	// upgrade is bound to the -upgrade flag in init; when true, main rebuilds
	// the listener from file descriptor 3 instead of binding a new socket.
	upgrade bool
	// ln is the listener the HTTP server accepts connections on.
	ln      net.Listener
	// server is the HTTP server started in main and shut down in setupSignal.
	server  *http.Server
)

// init registers the -upgrade flag. As the usage string says, it is not meant
// to be set by a user: forkProcess passes it to the child it starts.
func init() {
	flag.BoolVar(&upgrade, "upgrade", false, "user can't use this")
}

// hello writes a greeting that contains the PID of the serving process and
// the PID of its parent, which makes it easy to see which process answered.
func hello(w http.ResponseWriter, r *http.Request) {
	pid, ppid := os.Getpid(), os.Getppid()
	fmt.Fprintf(w, "hello world from pid:%d, ppid: %d\n", pid, ppid)
}

// main starts the HTTP server on :8999 and then blocks in setupSignal until a
// handled signal arrives.
//
// When started with -upgrade, the listening socket is not bound again; it is
// rebuilt from file descriptor 3, which is where forkProcess places the
// parent's listener for the child.
func main() {
	flag.Parse()
	http.HandleFunc("/", hello)
	server = &http.Server{Addr: ":8999"}

	var err error
	if !upgrade {
		// Fresh start: bind the address ourselves.
		if ln, err = net.Listen("tcp", server.Addr); err != nil {
			fmt.Printf("listen %s fail, error: %s\n", server.Addr, err)
			os.Exit(1)
		}
	} else {
		// Upgrade: adopt the socket handed over as fd 3.
		inherited := os.NewFile(3, "")
		if ln, err = net.FileListener(inherited); err != nil {
			fmt.Printf("fileListener fail, error: %s\n", err)
			os.Exit(1)
		}
		// The *os.File wrapper is closed once the listener has been built.
		inherited.Close()
	}

	// Serve in the background so this goroutine can wait for signals.
	go func() {
		if serveErr := server.Serve(ln); serveErr != nil && serveErr != http.ErrServerClosed {
			fmt.Printf("serve error: %s\n", serveErr)
		}
	}()

	setupSignal()
	fmt.Println("over")
}

// setupSignal blocks until the process is asked to restart or stop, and
// returns once the HTTP server has been shut down.
//
//   - SIGUSR2: start a replacement process via forkProcess, then shut this
//     server down so in-flight requests can finish. If the replacement cannot
//     be started, the error is reported and this process keeps serving and
//     waiting for further signals, so a failed upgrade does not leave the
//     port without a server.
//   - SIGINT / SIGTERM: shut the server down.
func setupSignal() {
	ch := make(chan os.Signal, 1)
	signal.Notify(ch, syscall.SIGUSR2, syscall.SIGINT, syscall.SIGTERM)
	defer signal.Stop(ch)

	for sig := range ch {
		switch sig {
		case syscall.SIGUSR2:
			fmt.Println("SIGUSR2 received")
			if err := forkProcess(); err != nil {
				fmt.Printf("fork process error: %s\n", err)
				// No replacement is running: stay up and wait for the
				// next signal instead of shutting down.
				continue
			}
			if err := server.Shutdown(context.Background()); err != nil {
				fmt.Printf("shutdown after forking process error: %s\n", err)
			}
			return
		case syscall.SIGINT, syscall.SIGTERM:
			if err := server.Shutdown(context.Background()); err != nil {
				fmt.Printf("shutdown error: %s\n", err)
			}
			return
		}
	}
}

func forkProcess() error {
	flags := []string{"-upgrade"}
	fmt.Printf("forkProcess - arg: %v", os.Args[0])
	cmd := exec.Command(os.Args[0], flags...)
	cmd.Stderr = os.Stderr
	cmd.Stdout = os.Stdout
	l, _ := ln.(*net.TCPListener)
	lfd, err := l.File()
	if err != nil {
		return err
	}
	cmd.ExtraFiles = []*os.File{lfd}
	cmd.
	return cmd.Start()
}

Via 热重载(Hot Reload)

热重载技术允许服务在不中断的情况下重新加载配置文件或代码。某些框架(如 Nginx、Unicorn 等)支持通过发送特定信号来实现热重载。

Via Graceful Upgrades

Just Exec()

Ok, how hard can it be. Let’s just Exec() the new binary (without doing a fork first). This does exactly what we want, by replacing the currently running code with the new code from disk.

// The following is pseudo-Go.

// main sketches the "just Exec()" upgrade (pseudo-Go, not compilable as is):
// obtain a listener, serve in the background, and replace the running
// program with the binary on disk once an upgrade is requested.
func main() {
	var ln net.Listener
	if isUpgrade {
		// Upgrade path: rebuild the listener from descriptor fdNumber.
		ln = net.FileListener(os.NewFile(uintptr(fdNumber), "listener"))
	} else {
		// Normal start: bind the address.
		ln = net.Listen(network, address)
	}
	
	go handleRequests(ln)

	// Block until an upgrade is requested.
	<-waitForUpgradeRequest

	// Replace the current process image without forking first; this step
	// cannot be undone if the new process fails to start up.
	syscall.Exec(os.Argv[0], os.Argv[1:], os.Environ())
}

Unfortunately this has a fatal flaw since we can’t “undo” the exec. Imagine a configuration file with too much white space in it or an extra semicolon. The new process would try to read that file, get an error and exit.

Even if the exec call works, this solution assumes that initialisation of the new process is practically instantaneous. We can get into a situation where the kernel refuses new connections because the listen queue is overflowing.

Specifically, the new binary is going to spend some time after Exec() to initialise, which delays calls to Accept(). This means the backlog of new connections grows until some are dropped. Plain exec is out of the game.

Delay Accept()

package main

import (
	"fmt"
	"log"
	"net"
	"time"
)

// handleConnection serves one client until the peer closes the connection or
// an I/O error occurs. Every chunk read is printed to stdout and acknowledged
// with "Message received\n". The connection is always closed on return.
func handleConnection(conn net.Conn) {
	defer conn.Close()

	const ack = "Message received\n"
	chunk := make([]byte, 1024)

	for {
		size, readErr := conn.Read(chunk)
		if readErr != nil {
			// A plain end-of-stream is the normal way for a client to
			// leave, so only other read failures are logged.
			if readErr.Error() != "EOF" {
				log.Println("Error reading data:", readErr)
			}
			return
		}

		fmt.Printf("Received: %s\n", chunk[:size])

		if _, writeErr := conn.Write([]byte(ack)); writeErr != nil {
			log.Println("Error sending response:", writeErr)
			return
		}
	}
}
// main binds :8080 but deliberately waits ten minutes before its first
// Accept call, to show what happens to clients that connect while the
// process is not yet accepting. After the delay it accepts connections
// forever, handling each one on its own goroutine.
func main() {
	// Start listening on port 8080.
	ln, err := net.Listen("tcp", ":8080")
	if err != nil {
		log.Fatal(err)
	}
	defer ln.Close()
	fmt.Println("TCP server is listening on port 8080...")

	// Stand-in for a slow start-up: the socket is listening, but nothing
	// calls Accept during this window.
	time.Sleep(10 * time.Minute)

	for {
		conn, acceptErr := ln.Accept()
		if acceptErr == nil {
			// Each client is served concurrently.
			go handleConnection(conn)
			continue
		}
		log.Println("Error accepting connection:", acceptErr)
	}
}
# run the server

# establish a conn
$ telnet localhost 8080
Trying ::1...
Connected to localhost.
Escape character is '^]'.
aaaa

$ sudo lsof -i -P | grep 8080
___1go_bu  4002         weishi    3u  IPv6 0xa481a7d36a23313b      0t0    TCP *:8080 (LISTEN)
telnet     4035         weishi    5u  IPv6 0x836c848941dff377      0t0    TCP localhost:57536->localhost:8080 (ESTABLISHED)

Listen() all the things

Since just using exec is out of the question, we can try the next best thing. Let's fork and exec a new process which then goes through its usual start up routine. At some point it will create a few sockets by listening on some addresses, except that won’t work out of the box due to errno 48, otherwise known as Address Already In Use. The kernel is preventing us from listening on the address and port combination used by the old process.

Of course, there is a flag to fix that: SO_REUSEPORT. This tells the kernel to ignore the fact that there is already a listening socket for a given address and port, and just allocate a new one.

// main sketches the SO_REUSEPORT approach (pseudo-Go, not compilable as is):
// the old process keeps serving on its own socket while it starts a new
// process that is expected to listen on the same address.
func main() {
	// ListenWithReusePort stands for "listen with SO_REUSEPORT set".
	ln := net.ListenWithReusePort(network, address)

	go handleRequests(ln)

	// Block until an upgrade is requested.
	<-waitForUpgradeRequest

	// Start a new copy of this program with the same arguments.
	cmd := exec.Command(os.Argv[0], os.Argv[1:])
	cmd.Start()

	// Block until the new process reports in.
	<-waitForNewProcess
}

Now both processes have working listening sockets and the upgrade works. Right?

SO_REUSEPORT is a little bit peculiar in what it does inside the kernel. As systems programmers, we tend to think of a socket as the file descriptor that is returned by the socket call. The kernel however makes a distinction between the data structure of a socket, and one or more file descriptors pointing at it. It creates a separate socket structure if you bind using SO_REUSEPORT, not just another file descriptor. The old and the new process are thus referring to two separate sockets, which happen to share the same address. This leads to an unavoidable race condition: new-but-not-yet-accepted connections on the socket used by the old process will be orphaned and terminated by the kernel. GitHub wrote an excellent blog post about this problem.

The engineers at GitHub solved the problems with SO_REUSEPORT by using an obscure feature of the sendmsg syscall called ancillary data. It turns out that ancillary data can include file descriptors. Using this API made sense for GitHub, since it allowed them to integrate elegantly with HAProxy. Since we have the luxury of changing the program we can use simpler alternatives.

NGINX: share sockets via fork and exec

NGINX is the tried and trusted workhorse of the Internet, and happens to support graceful upgrades. As a bonus we also use it at Cloudflare, so we were confident in its implementation.

It is written in a process-per-core model, which means that instead of spawning a bunch of threads NGINX runs a process per logical CPU core. Additionally, there is a primary process which orchestrates graceful upgrades.

The primary is responsible for creating all listen sockets used by NGINX and sharing them with the workers. This is fairly straightforward: first, the FD_CLOEXEC bit is cleared on all listen sockets. This means that they are not closed when the exec() syscall is made. The primary then does the customary fork() / exec() dance to spawn the workers, passing the file descriptor numbers as an environment variable.

Graceful upgrades make use of the same mechanism. We can spawn a new primary process (PID 1176) by following the NGINX documentation. This inherits the existing listeners from the old primary process (PID 1017) just like workers do. The new primary then spawns its own workers:

 CGroup: /system.slice/nginx.service
       	├─1017 nginx: master process /usr/sbin/nginx -g daemon on; master_process on;
       	├─1019 nginx: worker process
       	├─1021 nginx: worker process
       	├─1024 nginx: worker process
       	├─1026 nginx: worker process
       	├─1027 nginx: worker process
       	├─1028 nginx: worker process
       	├─1029 nginx: worker process
       	├─1030 nginx: worker process
       	├─1176 nginx: master process /usr/sbin/nginx -g daemon on; master_process on;
       	├─1187 nginx: worker process
       	├─1188 nginx: worker process
       	├─1190 nginx: worker process
       	├─1191 nginx: worker process
       	├─1192 nginx: worker process
       	├─1193 nginx: worker process
       	├─1194 nginx: worker process
       	└─1195 nginx: worker process

At this point there are two completely independent NGINX processes running. PID 1176 might be a new version of NGINX, or could use an updated config file. When a new connection arrives for port 80, one of the 16 worker processes is chosen by the kernel.

After executing the remaining steps, we end up with a fully replaced NGINX:

   CGroup: /system.slice/nginx.service
       	├─1176 nginx: master process /usr/sbin/nginx -g daemon on; master_process on;
       	├─1187 nginx: worker process
       	├─1188 nginx: worker process
       	├─1190 nginx: worker process
       	├─1191 nginx: worker process
       	├─1192 nginx: worker process
       	├─1193 nginx: worker process
       	├─1194 nginx: worker process
       	└─1195 nginx: worker process

Now, when a request arrives the kernel chooses between one of the eight remaining processes.

This process is rather fickle, so NGINX has a safeguard in place. Try requesting a second upgrade while the first hasn’t finished, and you’ll find the following message in the error log:

[crit] 1176#1176: the changing binary signal is ignored: you should shutdown or terminate before either old or new binary's process

This is very sensible, there is no good reason why there should be more than two processes at any given point in time. In the best case, we also want this behaviour from our Go solution.

Demo

package main

import (
	"flag"
	"fmt"
	"log"
	"net"
	"os"
	"os/signal"
	"syscall"

	"github.com/cloudflare/tableflip"
)

// handleConnection talks to a single client: it prints whatever the client
// sends and replies "Message received\n" to each chunk, until the client
// disconnects or a read/write fails. The connection is closed on return.
func handleConnection(conn net.Conn) {
	defer conn.Close()

	// Scratch space reused for every read from this client.
	scratch := make([]byte, 1024)

	for {
		got, err := conn.Read(scratch)
		switch {
		case err == nil:
			// Data arrived; handled below.
		case err.Error() == "EOF":
			// The client closed the connection; nothing to report.
			return
		default:
			log.Println("Error reading data:", err)
			return
		}

		received := string(scratch[:got])
		fmt.Printf("Received: %s\n", received)

		// Acknowledge the chunk.
		if _, err = conn.Write([]byte("Message received\n")); err != nil {
			log.Println("Error sending response:", err)
			return
		}
	}
}

// main runs a TCP server on localhost:8080 whose listener is obtained through
// a tableflip Upgrader, and triggers an upgrade every time SIGHUP is received.
// It blocks until the upgrader's Exit channel fires.
//
// The statement order matters here: the listener is requested from upg.Fds
// before upg.Ready is called, and main waits on upg.Exit last.
func main() {
	var (
		listenAddr = "localhost:8080"
		pidFile    = flag.String("pid-file", "", "`Path` to pid file")
	)

	flag.Parse()
	// Prefix every log line with this process's PID so output from the
	// processes involved in an upgrade can be told apart.
	log.SetPrefix(fmt.Sprintf("%d ", os.Getpid()))

	upg, err := tableflip.New(tableflip.Options{
		PIDFile: *pidFile,
	})
	if err != nil {
		panic(err)
	}
	defer upg.Stop()

	// Do an upgrade on SIGHUP
	go func() {
		sig := make(chan os.Signal, 1)
		signal.Notify(sig, syscall.SIGHUP)
		// A failed upgrade is only logged; the loop keeps waiting for the
		// next SIGHUP.
		for range sig {
			log.Println("SIGHUP received, upgrading")
			err := upg.Upgrade()
			if err != nil {
				log.Println("upgrade failed:", err)
			}
		}
	}()

	// Ask the upgrader for the listener rather than calling net.Listen
	// directly. NOTE(review): presumably this returns an inherited socket
	// when the process was started by an upgrade; confirm in tableflip docs.
	ln, err := upg.Fds.Listen("tcp", listenAddr)
	if err != nil {
		log.Fatalln("Can't listen:", err)
	}

	// Accept loop; it ends, closing the listener, on the first Accept error.
	go func() {
		defer ln.Close()

		log.Printf("listening on %s", ln.Addr())

		for {
			c, err := ln.Accept()
			if err != nil {
				return
			}

			// Handle the connection in a new goroutine.
			go handleConnection(c)
		}
	}()

	log.Printf("ready")
	// NOTE(review): Ready presumably tells tableflip that initialisation has
	// finished; confirm in tableflip docs.
	if err := upg.Ready(); err != nil {
		panic(err)
	}
	// Block until the upgrader signals that this process should exit.
	<-upg.Exit()
}
# run the server
$ go run sw.go -pid-file ./pid

$ sudo lsof -i -P | grep 8080
sw         7257         weishi    7u  IPv4 0x4fce1b642fdcf067      0t0    TCP localhost:8080 (LISTEN)
sw         7257         weishi   11u  IPv4 0x4fce1b642fdcf067      0t0    TCP localhost:8080 (LISTEN)

# establish a conn
$ telnet localhost 8080
Trying ::1...
telnet: connect to address ::1: Connection refused
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
abc
Message received

$ sudo lsof -i -P | grep 8080
sw         7257         weishi    7u  IPv4 0x4fce1b642fdcf067      0t0    TCP localhost:8080 (LISTEN)
sw         7257         weishi   11u  IPv4 0x4fce1b642fdcf067      0t0    TCP localhost:8080 (LISTEN)
sw         7257         weishi   12u  IPv4 0x787fa2ad0e407131      0t0    TCP localhost:8080->localhost:57736 (ESTABLISHED)
telnet     7461         weishi    5u  IPv4 0xd79464cff9f26014      0t0    TCP localhost:57736->localhost:8080 (ESTABLISHED)

Ref

Reference