【Golang】 设计模式 - Pipeline

Posted by 西维蜀黍 on 2021-10-03, Last Modified on 2023-08-23

Pipeline Pattern

Pipeline这种技术在可以很容易的把代码按单一职责的原则拆分成多个高内聚低耦合的小模块,然后可以很方便地(通过channel)拼装起来去完成比较复杂的功能。

Informally, a pipeline is a series of stages connected by channels, where each stage is a group of goroutines running the same function. In each stage, the goroutines

  • receive values from upstream via inbound channels
  • perform some function on that data, usually producing new values
  • send values downstream via outbound channels

Each stage has any number of inbound and outbound channels, except the first and last stages, which have only outbound or inbound channels, respectively. The first stage is sometimes called the source or producer; the last stage, the sink or consumer.

首先,我们需一个 echo()函数,其会把一个整型数组放到一个Channel中,并返回这个Channel

func echo(nums []int) <-chan int {
  out := make(chan int)
  go func() {
    for _, n := range nums {
      out <- n
    }
    close(out)
  }()
  return out
}

然后,我们依照这个模式,我们可以写下这个函数。

平方函数

func sq(in <-chan int) <-chan int {
  out := make(chan int)
  go func() {
    for n := range in {
      out <- n * n
    }
    close(out)
  }()
  return out
}

过滤奇数函数

func odd(in <-chan int) <-chan int {
  out := make(chan int)
  go func() {
    for n := range in {
      if n%2 != 0 {
        out <- n
      }
    }
    close(out)
  }()
  return out
}

求和函数

func sum(in <-chan int) <-chan int {
  out := make(chan int)
  go func() {
    var sum = 0
    for n := range in {
      sum += n
    }
    out <- sum
    close(out)
  }()
  return out
}

然后,我们的用户端的代码如下所示:(注:**你可能会觉得,sum()odd()sq()太过于相似。

var nums = []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
for n := range sum(sq(odd(echo(nums)))) {
  fmt.Println(n)
}

上面的代码类似于我们执行了Unix/Linux命令: echo $nums | sq | sum

同样,如果你不想有那么多的函数嵌套,你可以使用一个代理函数来完成。

type EchoFunc func ([]int) (<- chan int) 
type PipeFunc func (<- chan int) (<- chan int) 
func pipeline(nums []int, echo EchoFunc, pipeFns ... PipeFunc) <- chan int {
  ch  := echo(nums)
  for i := range pipeFns {
    ch = pipeFns[i](ch)
  }
  return ch
}

然后,就可以这样做了:

var nums = []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}    
for n := range pipeline(nums, gen, odd, sq, sum) {
    fmt.Println(n)
  }

Fan-in/Out

动用Go语言的 Go Routine和 Channel还有一个好处,就是可以写出1对多,或多对1的pipeline,也就是Fan In/ Fan Out。

Fan-in

A function can read from multiple inputs and proceed until all are closed by multiplexing the input channels onto a single channel that’s closed when all the inputs are closed. This is called fan-in.

Example 1

func fanIn(input1, input2 <-chan string) <-chan string {
    c := make(chan string)
    go func() {
        for {
            select {
            case s := <-input1:  c <- s
            case s := <-input2:  c <- s
            }
        }
    }()
    return c
}

func main() {
    c := fanIn(boring("Joe"), boring("Ann"))
    for i := 0; i < 10; i++ {
        fmt.Println(<-c)
    }
    fmt.Println("You're both boring; I'm leaving.")
}

func boring(msg string) <-chan string { // Returns receive-only channel of strings.
    c := make(chan string)
    go func() { // We launch the goroutine from inside the function.
        for i := 0; ; i++ {
            c <- fmt.Sprintf("%s %d", msg, i)
            time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
        }
    }()
    return c // Return the channel to the caller.
}

Example 2

我们想通过并发的方式来对一个很长的数组中的质数进行求和运算,我们想先把数组分段求和,然后再把其集中起来。

package main

import (
	"fmt"
	"math"
	"sync"
)

func makeRange(min, max int) []int {
	a := make([]int, max-min+1)
	for i := range a {
		a[i] = min + i
	}
	return a
}
func main() {
	nums := makeRange(1, 10000)
	in := echo(nums)
	const nProcess = 5
	var chans [nProcess]<-chan int
	for i := range chans {
		chans[i] = sum(prime(in))
	}
	for n := range sum(merge(chans[:])) {
		fmt.Println(n)
	}
}

func is_prime(value int) bool {
	for i := 2; i <= int(math.Floor(float64(value)/2)); i++ {
		if value%i == 0 {
			return false
		}
	}
	return value > 1
}
func prime(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		for n := range in {
			if is_prime(n) {
				out <- n
			}
		}
		close(out)
	}()
	return out
}

func merge(cs []<-chan int) <-chan int {
	var wg sync.WaitGroup
	out := make(chan int)
	wg.Add(len(cs))
	for _, c := range cs {
		go func(c <-chan int) {
			for n := range c {
				out <- n
			}
			wg.Done()
		}(c)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

我们可以看到,

  • 我们先制造了从1到10000的一个数组,
  • 然后,把这堆数组全部 echo到一个channel里 – in
  • 此时,生成 5 个 Channel,然后都调用 sum(prime(in)) ,于是每个Sum的Go Routine都会开始计算和
  • 最后再把所有的结果再求和拼起来,得到最终的结果。

用图片表示一下,整个程序的结构如下所示:

Fan-out

Multiple functions can read from the same channel until that channel is closed; this is called fan-out. This provides a way to distribute work amongst a group of workers to parallelize CPU use and I/O.

Reference