【Golang】生产者消费者问题(Producer–consumer Problem)

Posted by 西维蜀黍 on 2021-07-29, Last Modified on 2021-07-29

不需要close

Sequential Producer

package main

import (
	"fmt"
	"time"
)

// producer holds the channel over which produced integers are handed to the
// consumer.
type producer struct {
	// data carries each produced value from the producing goroutine to the
	// receiving loop.
	data chan int
}

// main wires one producer goroutine to a consumer loop through a channel with
// a buffer of 10. Neither side ever stops and the channel is never closed, so
// the program runs until it is killed.
func main() {
	prod := &producer{data: make(chan int, 10)}

	// Producer: each value is derived from the previous one, starting from 0,
	// and is sent as soon as it has been computed.
	go func() {
		for next := calculateNextInt(0); ; next = calculateNextInt(next) {
			prod.data <- int(next)
		}
	}()

	// Consumer: block on the channel and print every value received.
	for {
		fmt.Printf("consume %v\n", <-prod.data)
	}
}

// calculateNextInt returns prev + 1 after sleeping for one second; the sleep
// stands in for an expensive computation.
func calculateNextInt(prev int32) int32 {
	const simulatedCost = 1 * time.Second
	time.Sleep(simulatedCost)
	next := prev + 1
	return next
}

Parallel Producer

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// ProducerNum is the number of producer goroutines that main starts.
const ProducerNum = 5

// producer bundles the channels shared between the producing side and its
// owner.
type producer struct {
	// data carries the produced values.
	data chan int
	// quit carries shutdown requests; each request is itself a channel on
	// which the requester waits for the outcome.
	quit chan chan error
}

// Close sends a fresh, unbuffered reply channel on p.quit and then blocks
// until a value arrives on that reply channel or the channel is closed.
// It returns the error received; a closed reply channel yields nil.
func (p *producer) Close() error {
	reply := make(chan error)
	p.quit <- reply
	err := <-reply
	return err
}

// main starts ProducerNum producer goroutines that all feed one channel with
// a buffer of 10, then consumes from that channel forever. The producers
// share a single counter that is advanced atomically, so every value is
// handed out exactly once; the order in which values reach the consumer is
// not coordinated between goroutines.
func main() {
	prod := &producer{data: make(chan int, 10)}

	// Producers: claim the next number first, then do the slow work, then
	// publish the claimed number.
	var counter int32
	for started := 0; started < ProducerNum; started++ {
		go func() {
			for {
				claimed := atomic.AddInt32(&counter, 1)
				calculateNextInt()
				prod.data <- int(claimed)
			}
		}()
	}

	// Consumer: block on the channel and print every value received.
	for {
		fmt.Printf("consume %v\n", <-prod.data)
	}
}
// calculateNextInt sleeps for one second to stand in for an expensive
// operation. It takes no input and returns nothing.
func calculateNextInt() {
	const simulatedCost = 1 * time.Second
	time.Sleep(simulatedCost)
}

需要close

package main

import (
   "fmt"
   "time"
)

// producer bundles the channels shared between the producing goroutine and
// the code that shuts it down.
type producer struct {
   // data carries the produced values.
   data chan int
   // quit carries shutdown requests; each request is itself a channel on
   // which the requester waits for the outcome.
   quit chan chan error
}

// Close sends a fresh, unbuffered reply channel on p.quit and then blocks
// until a value arrives on that reply channel or the channel is closed.
// It returns the error received; a closed reply channel yields nil.
func (p *producer) Close() error {
   reply := make(chan error)
   p.quit <- reply
   err := <-reply
   return err
}

// main runs one producer goroutine and a consumer loop over unbuffered
// channels. Once the consumer has seen a value of 5 or more it asks the
// producer to stop via Close; the producer answers by closing the data
// channel, which in turn ends the consumer's range loop and the program.
func main() {
   prod := &producer{data: make(chan int), quit: make(chan chan error)}

   // Producer: compute the next value, then either hand it over or honour a
   // pending shutdown request, whichever is ready.
   go func() {
      for next := calculateNextInt(0); ; next = calculateNextInt(next) {
         select {
         case reply := <-prod.quit:
            close(prod.data)
            // A shutdown error, if there were one, could be sent on
            // reply here before closing it.
            close(reply)
            return
         case prod.data <- next:
         }
      }
   }()

   // Consumer: print each value; from 5 onwards request shutdown.
   for v := range prod.data {
      fmt.Printf("i=%v\n", v)
      if v < 5 {
         continue
      }
      if err := prod.Close(); err != nil {
         // cannot happen in this example
         fmt.Printf("unexpected error: %v\n", err)
      }
   }
}
// calculateNextInt returns prev + 1 after sleeping for one second; the sleep
// stands in for an expensive computation.
func calculateNextInt(prev int) int {
   const simulatedCost = 1 * time.Second
   time.Sleep(simulatedCost)
   next := prev + 1
   return next
}

Reference