【Golang】类型 - Channel

Posted by 西维蜀黍 on 2020-11-24, Last Modified on 2024-01-09

Channel

Channels are type safe message queues that have the intelligence to control the behavior of any goroutine attempting to receive or send on it. A channel acts as a conduit between two goroutines and will synchronize the exchange of any resource that is passed through it. It is the channel’s ability to control the goroutines interaction that creates the synchronization mechanism.

When a channel is created with no capacity, it is called an unbuffered channel. In turn, a channel created with capacity is called a buffered channel.

熟悉Golang的人都知道一句名言:“使用通信来共享内存,而不是通过共享内存来通信”。这句话有两层意思,Go语言确实在sync包中提供了传统的锁机制,但更推荐使用channel来解决并发问题。

它的操作符是箭头 <-

ch <- v    // 发送值v到Channel ch中
v := <-ch  // 从Channel ch中接收数据,并将数据赋值给v

Channel类型

Channel类型的定义格式如下:

ChannelType = ( "chan" | "chan<-" | "<-chan" ) ElementType .

它包括三种类型的定义。可选的<-代表channel的方向。如果没有指定方向,那么Channel就是双向的,既可以接收数据,也可以发送数据。

chan T          // 可以接收和发送类型为 T 的数据
chan<- float64  // 只可以用来发送 float64 类型的数据
<-chan int      // 只可以用来接收 int 类型的数据

<-总是优先和最左边的类型结合。(The <- operator associates with the leftmost chan possible)

chan<- chan int    // 等价 chan<- (chan int)
chan<- <-chan int  // 等价 chan<- (<-chan int)
<-chan <-chan int  // 等价 <-chan (<-chan int)
chan (<-chan int)

设置 Channel 容量 - Buffered Channels

Channels can be buffered. Provide the buffer length as the second argument to make to initialize a buffered channel:

make(chan int, 100)
  • Sends to a buffered channel block only when the buffer is full. Receives block when the buffer is empty.
  • 一个nil channel不会通信。

可以通过内建的close()方法可close Channel。

你可以在多个goroutine从/往 一个channel 中 receive/send 数据, 不必考虑额外的同步措施。

Channel可以作为一个先入先出(FIFO)的队列,接收的数据和发送的数据的顺序是一致的。

如果没有为 Channel 设置容量,则为Blocking Buffer,或者容量设置为0, 说明Channel没有 Buffer。

  • In this case, sends and receives block until the other side is ready. This allows goroutines to synchronize without explicit locks or condition variables.

Channel Directions

Channels are a typed conduit through which you can send and receive values with the channel operator, <-.

ch <- v    // Send v to channel ch.
v := <-ch  // Receive from ch, and
           // assign value to v.

(The data flows in the direction of the arrow.)

Like maps and slices, channels must be created before use:

ch := make(chan int)

By default, sends and receives block until the other side is ready. This allows goroutines to synchronize without explicit locks or condition variables.

The example code sums the numbers in a slice, distributing the work between two goroutines. Once both goroutines have completed their computation, it calculates the final result.

package main

import "fmt"

func sum(s []int, c chan int) {
	sum := 0
	for _, v := range s {
		sum += v
	}
	c <- sum // send sum to c
}

func main() {
	s := []int{7, 2, 8, -9, 4, 0}

	c := make(chan int)
	go sum(s[:len(s)/2], c)
	go sum(s[len(s)/2:], c)
	x, y := <-c, <-c // receive from c

	fmt.Println(x, y, x+y) // -5 17 12
}

Example

package main

import "fmt"

func ping(pings chan<- string, msg string) {
    // This ping function only accepts a channel for sending values. It would be a compile-time error to try to receive on this channel.
    pings <- msg
}

func pong(pings <-chan string, pongs chan<- string) {
    // The pong function accepts one channel for receives (pings) and a second for sends (pongs).
    msg := <-pings
    pongs <- msg
}

func main() {
    pings := make(chan string, 1)
    pongs := make(chan string, 1)
    ping(pings, "passed message")
    pong(pings, pongs)
    fmt.Println(<-pongs)
}

output:

$ go run channel-directions.go
passed message

Send Channel

send语句用来往Channel中发送数据, 如ch <- 3。 它的定义如下:

SendStmt = Channel "<-" Expression .
Channel  = Expression .

在通讯(communication)开始前channel和expression必选先求值出来(evaluated),比如下面的(3+4)先计算出7然后再发送给channel。

c := make(chan int)
defer close(c)
go func() { c <- 3 + 4 }()
i := <-c
fmt.Println(i)

send被执行前(proceed)通讯(communication)一直被阻塞着。如前所言,无缓存的channel只有在receiver准备好后send才被执行。如果有缓存,并且缓存未满,则send会被执行。

往为nil 的 channel发送数据,一直会被阻塞。

Receive from Channel

<-ch用来从channel ch中接收数据,这个表达式会一直被block,直到有数据可以接收或者 ch 被 close。

从一个nil channel(未初始化的channel)中接收数据会一直被block。

从一个已经被close的channel中接收数据不会被阻塞,而是立即返回,按照 FIFO,先一个一个接收已发送的数据(如果仍然存在未没接收的数据),(如果已经不存在没接收的数据)就会接收到元素类型的零值(zero value)。

如前所述,你可以使用一个额外的返回参数(ok)来检查channel是否已经被close。

x, ok := <-ch

如果OK 是false,表明接收的x是产生的零值,这个channel被 close 了或者为 nil (未被初始化)。

证明 - 当 Channel close 时,Channel Receiver会收到通知

以下程序证明了:当 Channel close时,Channel Receiver会收到通知,因而不会继续被block:

package main

import (
	"fmt"
	"time"
)

func main() {
	jobs := make(chan int, 5)
	done := make(chan bool)

	go func() {
		for {
			j, more := <-jobs
			if more {
				fmt.Println("received job", j, time.Now())
			} else {
				fmt.Println("received all jobs", time.Now())
				done <- true
				return
			}
		}
	}()

	for j := 1; j <= 3; j++ {
		jobs <- j
		fmt.Println("sent job", j, time.Now())
		if j == 2 {
			time.Sleep(2 * time.Second)
			close(jobs)
			break
		}
		time.Sleep(5 * time.Second)
	}
	fmt.Println("sent all jobs", time.Now())

	<-done
}

ouput:

sent job 1 2021-05-26 12:06:59.600808 +0800 +08 m=+0.000072351
received job 1 2021-05-26 12:06:59.600823 +0800 +08 m=+0.000087571
sent job 2 2021-05-26 12:07:04.605585 +0800 +08 m=+5.004801492
received job 2 2021-05-26 12:07:04.60561 +0800 +08 m=+5.004826602
received all jobs 2021-05-26 12:07:06.606126 +0800 +08 m=+7.005324137
sent all jobs 2021-05-26 12:07:06.606106 +0800 +08 m=+7.005304197

从代码逻辑可以看出:

  • 在发送 job 2 后,会 sleep 2秒,这时候启动的 goroutine会被block 在 j, more := <-jobs ,因为 jobs Channels 中没有数据
  • 在发送 job 2 且 sleep 2秒后, jobs Channels 被 close后,此时启动的 goroutine不被block,且拿到的 more 为 false,这可以从print出来的 received all jobs 2021-05-26 12:07:06.606126 +0800 +08 m=+7.005324137 推断出来

Channel 存在的三种状态

  • nil:即未被初始化(通过make),而只是进行了声明,或者手动赋值为 nil
  • Active:正常的channel,可读或者可写
  • closed:已关闭,千万不要误认为关闭channel后,channel的值是nil

Unbuffered channels与Buffered channels

Unbuffered channels是指缓冲区大小为0的channel,这种channel的接收者会被阻塞,直至接收到消息,发送者会被阻塞,直至接收者接收到消息,这种机制可以用于两个goroutine进行状态同步;

Buffered channels拥有缓冲区,当缓冲区已满时,发送者会阻塞;当缓冲区为空时,接收者会阻塞。

Unbuffered Channels Unbuffered channels have no capacity and therefore require both goroutines to be ready to make any exchange. When a goroutine attempts to send a resource to an unbuffered channel and there is no goroutine waiting to receive the resource, the channel will lock the sending goroutine and make it wait. When a goroutine attempts to receive from an unbuffered channel, and there is no goroutine waiting to send a resource, the channel will lock the receiving goroutine and make it wait.

In the diagram above, we see an example of two goroutines making an exchange using an unbuffered channel. In step 1, the two goroutines approach the channel and then in step 2, the goroutine on the left sticks his hand into the channel or performs a send. At this point, that goroutine is locked in the channel until the exchange is complete. Then in step 3, the goroutine on the right places his hand into the channel or performs a receive. That goroutine is also locked in the channel until the exchange is complete. In step 4 and 5 the exchange is made and finally in step 6, both goroutines are free to remove their hands and go on their way.

Synchronization is inherent in the interaction between the send and the receive. One can not happen without the other. The nature of an unbuffered channel is guaranteed synchronization.

Buffered Channels Buffered channels have capacity and therefore can behave a bit differently. When a goroutine attempts to send a resource to a buffered channel and the channel is full, the channel will lock the goroutine and make it wait until a buffer becomes available. If there is room in the channel, the send can take place immediately and the goroutine can move on. When a goroutine attempts to receive from a buffered channel and the buffered channel is empty, the channel will lock the goroutine and make it wait until a resource has been sent.

In the diagram above, we see an example of two goroutines adding and removing items from a buffered channel independently. In step 1, the goroutine on the right is removing a resource from the channel or performing a receive. In step 2, the goroutine on the right can remove the resource independent of the goroutine on the left adding a new resource to the channel. In step 3, both goroutines are adding and removing a resource from the channel at the same time and in step 4 both goroutines are done.

Synchronization still occurs within the interactions of receives and sends, however when the queue has buffer availability, the sends will not lock. Receives will not lock when there is something to receive from the channel. Consequently, if the buffer is full or if there is nothing to receive, a buffered channel will behave very much like an unbuffered channel.

Blocking Channel (Unbuffered Channels)

缺省情况下,发送和接收会一直阻塞着,这称为Blocking Channel (Unbuffered Channels)。

在以下情况中,sender会一直被blocked,因为没有receiver

func main() {
	messages := make(chan string)
  // sender
	go func() {
		messages <- "ping"
    // sent 永远不会被 print,因为永远会被blocked在一行(因为一直没有receiver,就算到了程序结束的时候)
		fmt.Println("sent")
	}()
  // receiver
	go func() {

	}()
	time.Sleep(100 * time.Second)
}

类似地,最后一个receiver会一直被blocked,因为没有第二个sender:

func main() {
	messages := make(chan string)

	go func() {
		messages <- "ping"
	}()

	go func() {
		fmt.Println(<-messages)
		fmt.Println(<-messages) // never printed
	}()
	time.Sleep(100 * time.Second)
}

//output: 
ping
ping

这种方式可以用来在gororutine中进行同步,而不必使用显示的锁或者条件变量。

Example 1

如官方的例子中x, y := <-c, <-c这句会一直等待计算结果发送到channel中。

import "fmt"

func sum(s []int, c chan int) {
	sum := 0
	for _, v := range s {
		sum += v
	}
	c <- sum // send sum to c
}

func main() {
	s := []int{7, 2, 8, -9, 4, 0}

	c := make(chan int)
	go sum(s[:len(s)/2], c)
	go sum(s[len(s)/2:], c)
	x, y := <-c, <-c // receive from c

	fmt.Println(x, y, x+y)
}

Example 2

Channels are the pipes that connect concurrent goroutines. You can send values into channels from one goroutine and receive those values into another goroutine.

Create a new channel with make(chan val-type). Channels are typed by the values they convey.

Send a value into a channel using the channel <- syntax. Here we send “ping” to the messages channel we made above, from a new goroutine.

The <-channel syntax receives a value from the channel. Here we’ll receive the “ping” message we sent above and print it out.

When we run the program the “ping” message is successfully passed from one goroutine to another via our channel.

By default sends and receives block until both the sender and receiver are ready. This property allowed us to wait at the end of our program for the “ping” message without having to use any other synchronization.

package main

import "fmt"

func main() {

    messages := make(chan string)

    go func() { messages <- "ping" }()

    msg := <-messages
    fmt.Println(msg)
}

running:

$ go run channels.go 
ping

Bufferred Channel

By default channels are unbuffered, meaning that they will only accept sends (chan <-) if there is a corresponding receive (<- chan) ready to receive the sent value. Buffered channels accept a limited number of values without a corresponding receiver for those values.

Channels can be buffered. Provide the buffer length as the second argument to make to initialize a buffered channel:

ch := make(chan int, 100)

Example

package main

import "fmt"

func main() {
	ch := make(chan int, 2)
	ch <- 1 // won't be blocked here, as buffer is not full
	ch <- 2 // won't be blocked here, as buffer is not full
	fmt.Println("before receive channel")
	fmt.Println(<-ch)
	fmt.Println(<-ch)
}

output:

before receive channel
1
2

Sends to a buffered channel block only when the buffer is full. Receives block when the buffer is empty.

package main

import "fmt"

func main() {
	ch := make(chan int, 2)
	ch <- 1 // won't be blocked here, as buffer is not full
	ch <- 2 // won't be blocked here, as buffer is not full
	ch <- 2 // will be blocked here, which will trigger deadlock, as buffer is not full
	fmt.Println("before receive channel")
	fmt.Println(<-ch)
	fmt.Println(<-ch)
}

output:

fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan send]:
main.main()
        /Users/shiwei/SW/GoPlayground/sw.go:9 +0x9b

Process finished with exit code 2

值得一提的是,通常我们会将 buffered channel用在多个gorontine中,以上例子只是为了demo(没有体现出使用 buffered channel 的好处,对于解决某个特定问题来说)。

range

A sender can close a channel to indicate that no more values will be sent. Receivers can test whether a channel has been closed by assigning a second parameter to the receive expression: after

v, ok := <-ch

ok is false if there are no more values to receive and the channel is closed.

The loop for i := range c receives values from the channel repeatedly until it is closed.

Note: Only the sender should close a channel, never the receiver. Sending on a closed channel will cause a panic.

Another note: Channels aren’t like files; you don’t usually need to close them. Closing is only necessary when the receiver must be told there are no more values coming, such as to terminate a range loop.

但是,在遍历时,如果channel 没有关闭,那么会一直等待下去,出现 deadlock 的错误。

Example 1

package main

import (
	"fmt"
)

func fibonacci(n int, c chan int) {
	x, y := 0, 1
	for i := 0; i < n; i++ {
		c <- x
		x, y = y, x+y
	}
	close(c)
}

func main() {
	c := make(chan int, 10)
	go fibonacci(cap(c), c)
	for i := range c {
		fmt.Println(i)
	}
}

Output:

0
1
1
2
3
5
8
13
21
34

Example 2

package main

import (
	"fmt"
	"time"
)

func main() {
	c := make(chan int)
	go func() {
		for i := 0; i < 10; i = i + 1 {
			c <- i
			time.Sleep(1 * time.Second)
		}
		close(c)
	}()
	for i := range c {
		fmt.Printf("time: %v, i: %v\n", time.Now(), i)
	}
	fmt.Println("Finished")
}

output:

time: 2021-05-25 22:52:19.371425 +0800 +08 m=+0.000074731, i: 0
time: 2021-05-25 22:52:20.371405 +0800 +08 m=+1.000096894, i: 1
time: 2021-05-25 22:52:21.372109 +0800 +08 m=+2.000842357, i: 2
time: 2021-05-25 22:52:22.372196 +0800 +08 m=+3.000971690, i: 3
time: 2021-05-25 22:52:23.372497 +0800 +08 m=+4.001314263, i: 4
time: 2021-05-25 22:52:24.372517 +0800 +08 m=+5.001376337, i: 5
time: 2021-05-25 22:52:25.37295 +0800 +08 m=+6.001851660, i: 6
time: 2021-05-25 22:52:26.373637 +0800 +08 m=+7.002579942, i: 7
time: 2021-05-25 22:52:27.374022 +0800 +08 m=+8.003006845, i: 8
time: 2021-05-25 22:52:28.374147 +0800 +08 m=+9.003173458, i: 9
Finished

range c产生的迭代值为Channel中发送的值,它会一直迭代直到channel被 close 。

如果把close(c)注释掉,程序会一直阻塞在for …… range那一行。

time: 2021-05-25 22:54:00.500467 +0800 +08 m=+0.000131651, i: 0
time: 2021-05-25 22:54:01.500468 +0800 +08 m=+1.000173934, i: 1
time: 2021-05-25 22:54:02.504109 +0800 +08 m=+2.003856943, i: 2
time: 2021-05-25 22:54:03.504406 +0800 +08 m=+3.004195906, i: 3
time: 2021-05-25 22:54:04.504601 +0800 +08 m=+4.004433119, i: 4
time: 2021-05-25 22:54:05.504618 +0800 +08 m=+5.004491722, i: 5
time: 2021-05-25 22:54:06.505093 +0800 +08 m=+6.005008355, i: 6
time: 2021-05-25 22:54:07.505197 +0800 +08 m=+7.005154989, i: 7
time: 2021-05-25 22:54:08.505187 +0800 +08 m=+8.005186492, i: 8
time: 2021-05-25 22:54:09.505178 +0800 +08 m=+9.005219076, i: 9

select

Go’s select lets you wait on multiple channel operations. Combining goroutines and channels with select is a powerful feature of Go.

A select blocks until one of its cases can run, then it executes that case. It chooses one at random if multiple are ready.

Example 1

package main

import (
    "fmt"
    "time"
)

func main() {
    // Each channel will receive a value after some amount of time, to simulate e.g. blocking RPC operations executing in concurrent goroutines.
    c1 := make(chan string)
    c2 := make(chan string)

    go func() {
        time.Sleep(1 * time.Second)
        c1 <- "one"
    }()
    go func() {
        time.Sleep(2 * time.Second)
        c2 <- "two"
    }()

    for i := 0; i < 2; i++ {
        // We’ll use select to await both of these values simultaneously, printing each one as it arrives.
        select {
        case msg1 := <-c1:
            fmt.Println("received", msg1)
        case msg2 := <-c2:
            fmt.Println("received", msg2)
        }
    }
}
# Note that the total execution time is only ~2 seconds since both the 1 and 2 second Sleeps execute concurrently.
$ time go run select.go 
received one
received two

real    0m2.245s

Example 2

select语句选择一组可能的send操作和receive操作去处理。它类似switch,但是只是用来处理通讯(communication)操作。

它的case可以是send语句,也可以是receive语句,亦或者default

receive语句可以将值赋值给一个或者两个变量。它必须是一个receive操作。

最多允许有一个default case,它可以放在case列表的任何位置,尽管我们大部分会将它放在最后。

package main

import "fmt"

func fibonacci(c, quit chan int) {
	x, y := 0, 1
	for {
		select {
		case c <- x:
			x, y = y, x+y
		case <-quit:
			fmt.Println("quit")
			return
		}
	}
}
func main() {
	c := make(chan int)
	quit := make(chan int)
	go func() {
		for i := 0; i < 10; i++ {
			fmt.Println(<-c)
		}
		quit <- 0
	}()
	fibonacci(c, quit)
}

output:

0
1
1
2
3
5
8
13
21
34
quit

如果有同时多个case去处理,比如同时有多个channel可以接收数据,那么Go会伪随机的选择一个case处理(pseudo-random)。如果没有case需要处理,则会选择default去处理,如果default case存在的情况下。如果没有default case,则select语句会阻塞,直到某个case需要处理。

需要注意的是,nil channel上的操作会一直被阻塞,如果没有default case,只有nil channel的select会一直被阻塞。

select语句和switch语句一样,它不是循环,它只会选择一个case来处理,如果想一直处理channel,你可以在外面加一个无限的for循环:

for {
	select {
	case c <- x:
		x, y = y, x+y
	case <-quit:
		fmt.Println("quit")
		return
	}
}

Example 3 - Deafault Selection

The default case in a select is run if no other case is ready.

Use a default case to try a send or receive without blocking:

package main

import (
	"fmt"
	"time"
)

func main() {
	tick := time.Tick(100 * time.Millisecond)
	boom := time.After(500 * time.Millisecond)
	fmt.Println("start: ", time.Now())
	for {
		select {
		case <-tick:
			fmt.Println("tick.")
		case <-boom:
			fmt.Println("BOOM!")
			return
		default:
			fmt.Println("    .", time.Now())
			time.Sleep(50 * time.Millisecond)
		}
	}
}

Ouput:

start:  2021-05-30 00:06:25.112943 +0800 +08 m=+0.000076131
    . 2021-05-30 00:06:25.11308 +0800 +08 m=+0.000213931
    . 2021-05-30 00:06:25.163775 +0800 +08 m=+0.050907950
tick.
    . 2021-05-30 00:06:25.214493 +0800 +08 m=+0.101626169
    . 2021-05-30 00:06:25.264906 +0800 +08 m=+0.152038849
tick.
    . 2021-05-30 00:06:25.315246 +0800 +08 m=+0.202378358
    . 2021-05-30 00:06:25.365353 +0800 +08 m=+0.252484648
tick.
    . 2021-05-30 00:06:25.415521 +0800 +08 m=+0.302652447
    . 2021-05-30 00:06:25.46862 +0800 +08 m=+0.355750735
tick.
    . 2021-05-30 00:06:25.518711 +0800 +08 m=+0.405841375
    . 2021-05-30 00:06:25.569271 +0800 +08 m=+0.456400844
tick.
BOOM!

Example 4 - Timeout

select有很重要的一个应用就是超时处理。 因为上面我们提到,如果没有case需要处理,select语句就会一直阻塞着。这时候我们可能就需要一个超时操作,用来处理超时的情况。

下面这个例子我们会在2秒后往channel c1中发送一个数据,但是select设置为1秒超时,因此我们会打印出timeout 1,而不是result 1

import "time"
import "fmt"

func main() {
    c1 := make(chan string, 1)
    go func() {
        time.Sleep(time.Second * 2)
        c1 <- "result 1"
    }()

    select {
    case res := <-c1:
        fmt.Println(res)
    case <-time.After(time.Second * 1):
        fmt.Println("timeout 1")
    }
}

其实它利用的是time.After方法,它返回一个类型为<-chan Time的单向的channel,在指定的时间发送一个当前时间给返回的channel中。

Example 5 - Timers

timer是一个定时器,代表未来的一个单一事件,你可以告诉timer你要等待多长时间,它提供一个Channel,在将来的那个时间那个Channel提供了一个时间值。下面的例子中第二行会阻塞2秒钟左右的时间,直到时间到了才会继续执行。

timer1 := time.NewTimer(time.Second * 2)
<-timer1.C
fmt.Println("Timer 1 expired")

当然如果你只是想单纯的等待的话,可以使用time.Sleep来实现。

你还可以使用timer.Stop来停止计时器。

package main

import (
	"fmt"
	"time"
)

func main() {
	fmt.Printf("start, current time: %v\n", time.Now())
	timer2 := time.NewTimer(1 * time.Second)
	go func() {
		<-timer2.C // after 1s, channel received here
		fmt.Printf("Timer 2 expired, current time: %v\n", time.Now())
	}()
	time.Sleep(5 * time.Second)

	fmt.Printf("check Timer 2 has stopped or not, current time: %v\n", time.Now())
	notStop := timer2.Stop() // return false if the timer has already expired or been stopped.
	if notStop {
		fmt.Println("Timer 2 stopped")
	}
}

如果注释掉 time.Sleep(5 * time.Second),output:

start, current time: 2021-05-25 23:28:06.155431 +0800 +08 m=+0.000060591
check Timer 2 has stopped or not, current time: 2021-05-25 23:28:06.155587 +0800 +08 m=+0.000216171
Timer 2 stopped

如果不注释掉 time.Sleep(5 * time.Second),output:

start, current time: 2021-05-25 23:31:36.563422 +0800 +08 m=+0.000061261
Timer 2 expired, current time: 2021-05-25 23:31:37.566743 +0800 +08 m=+1.003413541
check Timer 2 has stopped or not, current time: 2021-05-25 23:31:41.564029 +0800 +08 m=+5.000824104

Example 6 - Ticker

ticker是一个定时触发的计时器,它会以一个间隔(interval)往Channel发送一个事件(当前时间),而Channel的接收者可以以固定的时间间隔从Channel中读取事件。下面的例子中ticker每500毫秒触发一次,你可以观察输出的时间。

package main

import (
	"fmt"
	"time"
)

func main() {
	ticker := time.NewTicker(time.Millisecond * 500)
	go func() {
		for t := range ticker.C {
			fmt.Println("Tick at", t)
		}
	}()

	time.Sleep(2 * time.Second)
}

类似timer, ticker也可以通过Stop方法来停止。一旦它停止,接收者不再会从channel中接收数据了。

Output:

Tick at 2021-05-25 23:32:53.587043 +0800 +08 m=+0.500905900
Tick at 2021-05-25 23:32:54.088342 +0800 +08 m=+1.002220770
Tick at 2021-05-25 23:32:54.589596 +0800 +08 m=+1.503489999

未初始化的 Channel

  • 往为 nil 的 channel发送数据,一直会被阻塞。
  • 从一个 nil channel(未初始化的channel)中接收数据会一直被block。

在通常情况下,我们需要调用 make 来初始化 Channel。

如果我们不初始化 Channel,看看会发生什么:

package main

import "fmt"

func fibonacci(c, quit chan int) {
	x, y := 0, 1
	for {
		select {
		case c <- x:
			x, y = y, x+y
		case <-quit:
			fmt.Println("quit")
			return
		}
	}
}
func main() {
	//c := make(chan int)
	var c chan int
	go func() {
		c <- 1
	}()
	fmt.Println(<-c)
}

output:

fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan receive (nil chan)]:
main.main()
        /Users/shiwei/SW/GoPlayground/sw.go:23 +0x62

goroutine 6 [chan send (nil chan)]:
main.main.func1(0x0)
        /Users/shiwei/SW/GoPlayground/sw.go:21 +0x37
created by main.main
        /Users/shiwei/SW/GoPlayground/sw.go:20 +0x42

Process finished with exit code 2

Close Channel

向 closed 的 Channel 发送数据

如果channel c已经被 close,继续往它发送数据会导致panic: send on closed channel

这是make sense的,因为当我们 close Channel的时候,expectation是我们不会再往里面发送数据(虽然在 close Channel之后,我们仍然可以从 Channel 接收之前发送的数据),因而如果我们再向 Channel 中发送数据,就理应 panic。

package main

import "time"

func main() {
	go func() {
		time.Sleep(time.Hour)
	}()
	c := make(chan int, 10)
	c <- 1
	c <- 2
	close(c)
	c <- 3
}
$ /usr/local/go/bin/go build -o /private/var/folders/0g/klmrv6hx1vq0sx_yf1y6r6mm0000gn/T/___go_build_sw_go /Users/shiwei/SW/GoPlayground/sw.go #gosetup
/private/var/folders/0g/klmrv6hx1vq0sx_yf1y6r6mm0000gn/T/___go_build_sw_go
panic: send on closed channel

goroutine 1 [running]:
main.main()
        /Users/shiwei/SW/GoPlayground/sw.go:13 +0xb3

从 closed 的 Channel 中读取数据

但是,从这个 closed 的 channel 中,不但可以读取出在 close 之前已发送的数据,还可以不断的读取到零值(而且同时不会出现任何panic,只是读取到的零值没有意义罢了):

package main

import (
	"fmt"
)

func main() {
	c := make(chan int, 10)
	c <- 1
	c <- 2
	close(c)
	fmt.Println(<-c) //1
	fmt.Println(<-c) //2
	fmt.Println(<-c) //0
	fmt.Println(<-c) //0
}

ouput:

1
2
0
0

在 close Channel 时,Channel Receiver会受到通知

在 close Channel 时,Channel Receiver会受到通知,因而不会再被block:

package main

import (
	"fmt"
	"time"
)

func main() {
	jobs := make(chan int, 5)
	done := make(chan bool)

	go func() {
		for {
			j, more := <-jobs
			if more {
				fmt.Println("received job", j)
			} else {
				fmt.Println("received all jobs", time.Now())
				done <- true
				return
			}
		}
	}()

	for j := 1; j <= 3; j++ {
		jobs <- j
		fmt.Println("sent job", j, time.Now())
	}
	time.Sleep(10 * time.Second)
	close(jobs)
	fmt.Println("sent all jobs")

	<-done
}

我们来分析一下这个程序做了什么:

  • for j := 1; j <= 3; j++ 会发送3个 int 到 jobs Channel中

    • 在第一个 int还没被发送到 jobs Channel 之前,启动的goroutine会被block,因为 jobs Channel为 nil
    • 在第一个 int 被发送到 jobs Channel 之后,启动的goroutine不再被block,因而 print received job <job_i>

range

如果通过range读取,channel close 后for循环会跳出:

package main

import "fmt"

func main() {

    queue := make(chan string, 2)
    queue <- "one"
    queue <- "two"
    close(queue)

    // This range iterates over each element as it’s received from queue. Because we closed the channel above, the iteration terminates after receiving the 2 elements.
    for elem := range queue {
        fmt.Println(elem)
    }
}

This example also showed that it’s possible to close a non-empty channel (where there are data remaining values be received).

output:

$ go run range-over-channels.go
one
two

判断 Channel 是否已经 close

通过i, ok := <-c可以获取Channel的状态,即该Channel 是否已经被 close 且 Channel 中的数据均已经被读取,如果 ok 是 false,表明该 Channel 已经被 close ,且 Channel 中的数据均已经被读取。

// example 1
package main

import (
	"fmt"
)

func main() {
	c := make(chan int, 10)
	close(c)

	i, ok := <-c
	fmt.Printf("%d, %t", i, ok) //0, false
}

// example 2
func main() {
	ch := make(chan int, 100)
	ch <- 1 // won't be blocked here, as buffer is not full
	ch <- 2 // won't be blocked here, as buffer is not full
	a, b := <-ch
	fmt.Println(a, b)

	close(ch)
	c, d := <-ch
	fmt.Println(c, d)

	e, f := <-ch
	fmt.Println(e, f)
	//1 true
	//2 true
	//0 false
}

Example

Goroutine 同步

We can use channels to synchronize execution across goroutines. Here’s an example of using a blocking receive to wait for a goroutine to finish. When waiting for multiple goroutines to finish, you may prefer to use a WaitGroup.

import (
	"fmt"
	"time"
)

func worker(done chan bool) {
  // This is the function we’ll run in a goroutine. The done channel will be used to notify another goroutine that this function’s work is done.
  
	time.Sleep(time.Second)

	done <- true // Send a value to notify that we’re done.
}

func main() {
	done := make(chan bool, 1)
	go worker(done) // Start a worker goroutine, giving it the channel to notify on.

	<-done // Block until we receive a notification from the worker on the channel.
}

output:

$ go run channel-synchronization.go      
working...done  

Reference