不需要close
Sequential Producer
package main
import (
"fmt"
"time"
)
type producer struct {
data chan int
}
func main() {
prod := &producer{
data: make(chan int, 10),
}
// producer
go func() {
var i int32 = 0
for {
i = calculateNextInt(i)
prod.data <- int(i)
}
}()
// consumer
for {
a := <-prod.data
fmt.Printf("consume %v\n", a)
}
}
func calculateNextInt(prev int32) int32 {
time.Sleep(1 * time.Second) // pretend this is an expensive operation
return prev + 1
}
Parallel Producer
package main
import (
"fmt"
"sync/atomic"
"time"
)
const ProducerNum = 5
type producer struct {
data chan int
quit chan chan error
}
func (p *producer) Close() error {
ch := make(chan error)
p.quit <- ch
return <-ch
}
func main() {
prod := &producer{
data: make(chan int, 10),
}
// producer
var i int32 = 0
var processCnt = 0
for processCnt < ProducerNum {
go func() {
for {
j := atomic.AddInt32(&i, 1)
calculateNextInt()
prod.data <- int(j)
}
}()
processCnt++
}
// consumer
for {
a := <-prod.data
fmt.Printf("consume %v\n", a)
}
}
func calculateNextInt() {
time.Sleep(1 * time.Second) // pretend this is an expensive operation
}
需要close
package main
import (
"fmt"
"time"
)
type producer struct {
data chan int
quit chan chan error
}
func (p *producer) Close() error {
ch := make(chan error)
p.quit <- ch
return <-ch
}
func main() {
prod := &producer{
data: make(chan int),
quit: make(chan chan error),
}
// producer
go func() {
var i = 0
for {
i = calculateNextInt(i)
select {
case prod.data <- i:
case ch := <-prod.quit:
close(prod.data)
// If the producer had an error while shutting down,
// we could write the error to the ch channel here.
close(ch)
return
}
}
}()
// consumer
for i := range prod.data {
fmt.Printf("i=%v\n", i)
if i >= 5 {
err := prod.Close()
if err != nil {
// cannot happen in this example
fmt.Printf("unexpected error: %v\n", err)
}
}
}
}
func calculateNextInt(prev int) int {
time.Sleep(1 * time.Second) // pretend this is an expensive operation
return prev + 1
}
Reference