您现在的位置是:首页 > 文章详情

goroutine and channel

日期:2019-04-07点击:255

[TOC]

1 goroutine

package main import ( "fmt" "time" ) func spinner() { for { for _, c := range `\|/` { fmt.Printf("\r%c", c) time.Sleep(100 * time.Millisecond) } } } func fabi(n int) int { if n < 2 { return n } return fabi(n-1) + fabi(n-2) } func main() { go spinner() res := fabi(45) fmt.Println(res) }

goroutine 是golang并发编程的概念。主routine就是main函数所在的routine,当主routine退出时,所有的其他routine会自动被退出。除了主routine和程序退出之外,没有其他办法能让一个routine退出。不过后续可以通过channel的方式通知routine自动退出。

2 并发的clock

并发的clock分为server和client,其中server监听某个端口,当有client连接后,server会向client每个1s发送time消息。

// client.go package main import ( "fmt" "io" "log" "net" "os" ) func main() { conn, err := net.Dial("tcp", "localhost:8000") if err != nil { log.Fatal("Failed to Dial") } defer conn.Close() mustCopy(os.Stdout, conn) } func mustCopy(w io.Writer, r io.Reader) { // opy copies from src to dst until either EOF is reached on src or an error occurs len, err := io.Copy(w, r) if err != nil { log.Fatal(err) } fmt.Printf("%d bytes are received from the server", len) }
// server.go package main import ( "flag" "io" "log" "net" "os" "strconv" "time" ) func main() { port := flag.Uint("port", 8000, "The port of the clock server") flag.Parse() listener, err := net.Listen("tcp", "localhost:"+strconv.Itoa(int(*port))) if err != nil { log.Fatal(err) } for { conn, err := listener.Accept() if err != nil { log.Print(err) continue } defer conn.Close() go handleConn(conn) } } func handleConn(c net.Conn) { defer c.Close() for { timeStr := os.Getenv("TZ") + "--" + time.Now().Format("15:04:05\n") _, err := io.WriteString(c, timeStr) if err != nil { return } time.Sleep(1 * time.Second) } }
//clockWall.go 见习题8.1 package main import ( "fmt" "io" "log" "net" "os" "strings" "time" ) func main() { for _, cityAndServer := range os.Args[1:] { tmp := strings.Split(cityAndServer, "=") if len(tmp) != 2 { fmt.Println("The input is incorrect!") continue } city := tmp[0] server := tmp[1] fmt.Printf("city:%s -- server:%s\n", city, server) go clientDial(city, server) } // 这儿sleep的原因是如果不sleep的话,main routine走到这就直接退出了,从而导致所有的子routine也一起退出 time.Sleep(100 * time.Minute) } func clientDial(city string, server string) { conn, err := net.Dial("tcp", server) if err != nil { fmt.Printf("Failed tp Dial!err:%s", err.Error()) return } fmt.Println("Dial success") defer conn.Close() copyData(os.Stdout, conn) } func copyData(w io.Writer, r io.Reader) { if _, err := io.Copy(w, r); err != nil { log.Fatal(err) } }

3 Channel

Channel是routine间的通信机制。

chan := make(chan int) // the type of chan is chan int. 无缓冲的channel chan := make(chan int, 3) // 带缓冲的channel

注意上面的chan的type是chan int. 同map一样,chan的传递也是引用传递,即赋值和参数赋值都是传递的引用,也就是说所有的对象其实都指向同一个底层的对象。
channel的发送和接收都比较简单。

chan := make(chan int) chan <- struct // 发送 i := <-chan // 接收

chan还支持close操作。不能在一个close的channel上进行发送操作,否则会产生异常。但是在接收端还可以接收已经发送成功的数据,当数据传输完成后,会得到1个零值。

带缓冲的channel

package main import ( "fmt" "io" "log" "net" "os" ) func main() { conn, err := net.Dial("tcp", "localhost:8080") if err != nil { log.Fatalf("Failed to Dial!err:%s", err.Error()) } defer conn.Close() done := make(chan struct{}) // chan struct{} 是个类型,不是两个入参 go func() { io.Copy(os.Stdout, conn) fmt.Println("done") done <- struct{}{} }() mustCopy(conn, os.Stdin) conn.Close() <-done } func mustCopy(dst io.Writer, src io.Reader) { if _, err := io.Copy(dst, src); err != nil { log.Fatal("Failed to copy") } }

如下实现了main groutine 和 子groutine间的通信,当子groutine结束时,需要通知main groutine也stop掉。
对于无缓冲的channel,在<-done 会阻塞住, 直到发送端发送消息。

串联的channel

想实现如上的功能,分为3部分:

  • counter-生成数值
  • squarer - 平方数值
  • printer - 打印
    所以会用到两个channel:natural and squares.

所以,见如下code实现(ver1)

//ver1 package main import ( "fmt" "time" ) func main() { naturals := make(chan uint) squares := make(chan uint) // squares go func() { x := <-naturals fmt.Printf("Squares rcv: %d\n", x) squares <- x * x fmt.Printf("Squares send %d\n", x*x) }() // printer go func() { val := <-squares fmt.Printf("Printer rcv: %d\n", val) }() i := uint(0) for { i++ naturals <- i time.Sleep(1 * time.Second) // 每个1s产生1个数值 } }

但是run之后,会报如下的error

fatal error: all goroutines are asleep - deadlock!

|

In https://golang.org/ref/spec#Program_execution, A complete program is created by linking a single, unimported package called the main package with all the packages it imports, transitively. The main package must have package name main and declare a function mainthat takes no arguments and returns no value.
func main() { … }
Program execution begins by initializing the main package and then invoking the function main. When that function invocation returns, the program exits. It does not wait for other (non-main) goroutines to complete.

上述错误的原因是在子 goroutine中没有for语句,run一遍之后就自动推出了。所以,当执行完第一遍之后,就只剩下main groutine了。此时naturals <-i没有接收方了,所以就肯定会阻塞这里,即dead lock.
关于channel发送和接收的顺序,其实两者都是阻塞住的,也就是接收端会一直阻塞住直到收到一个消息,发送方会一直阻塞住直到消息被接收端接收。对于无缓存channel,发送成功和接收成功是同时发生的。比如

package main import ( "log" "time" ) func main() { ch := make(chan int) go func() { for { val := <-ch log.Printf("Rcv:%d\n", val) time.Sleep(2 * time.Second) } }() i := 0 for { i++ ch <- i log.Printf("Send %d\n", i) time.Sleep(1 * time.Second) } }

从输出可以看出,尽管发生和接受的sleep时间不一致,但是在输出的时刻是一致的。

2019/03/09 22:35:56 Rcv:1 2019/03/09 22:35:56 Send 1 2019/03/09 22:35:58 Rcv:2 2019/03/09 22:35:58 Send 2 2019/03/09 22:36:00 Rcv:3 2019/03/09 22:36:00 Send 3

上述是发送无尽个数值,假如想发送有限个数列呢 ?其实问题就在于怎么让后面的Squarer,Printer知道数列已经发送完成了?可以通过close(naturals)来实现

//ver1 package main import ( "fmt" "time" ) func main() { naturals := make(chan uint, 10) squares := make(chan uint, 10) //naturals go func() { for i := uint(0); i < 10; i++ { naturals <- i time.Sleep(1 * time.Second) } close(naturals) }() // squares go func() { for { x := <-naturals fmt.Printf("Squares rcv: %d\n", x) squares <- x * x fmt.Printf("Squares send %d\n", x*x) } }() // printer for { val := <-squares fmt.Printf("Printer rcv: %d\n", val) } }

当1个channel被close之后,在通过这个channel发送会产生一个pannic, 同时接收者也不会阻塞,它会立刻返回1个0值 。所以,上述的Suqares and Printer会一直打印0值。

Printer rcv: 0 Printer rcv: 0 Printer rcv: 0 Printer rcv: 0 Squares send 0 Squares rcv: 0 Squares send 0 Squares rcv: 0

那么有没办法让接收端知道channel是否已经被close ? 有!

//ver1 package main import ( "fmt" "time" ) func main() { naturals := make(chan uint, 10) squares := make(chan uint, 10) //naturals go func() { for i := uint(0); i < 5; i++ { naturals <- i time.Sleep(1 * time.Second) } close(naturals) }() // squares go func() { for { x, ok := <-naturals if !ok { fmt.Println("naturals has been closed") break } fmt.Printf("Squares rcv: %d\n", x) squares <- x * x fmt.Printf("Squares send %d\n", x*x) } }() // printer for { val, ok := <-squares if !ok { fmt.Println("the squares has been closed!") break } fmt.Printf("Printer rcv: %d\n", val) } }

在接收端我们可以通过增加一个参数ok, 来判断channel是否被close。true表示接收值成功,false表示channel已经被close,并且没有值可以读取。
但是在如上的code,还是会输出

fatal error: all goroutines are asleep - deadlock!

原因是在line27,break会退出子groutine,但是channel square并没有被close。此时在line38 依然会等待读取,但此时squares已经没有send了。所以就会形成dead lock
修改方式是:

//ver1 package main import ( "fmt" "time" ) func main() { naturals := make(chan uint, 10) squares := make(chan uint, 10) //naturals go func() { for i := uint(0); i < 5; i++ { naturals <- i time.Sleep(1 * time.Second) } close(naturals) }() // squares go func() { // 在此处添加一个defer close defer close(squares) for { x, ok := <-naturals if !ok { fmt.Println("naturals has been closed") break } fmt.Printf("Squares rcv: %d\n", x) squares <- x * x fmt.Printf("Squares send %d\n", x*x) } }() // printer for { val, ok := <-squares if !ok { fmt.Println("the squares has been closed!") break } fmt.Printf("Printer rcv: %d\n", val) } }

更优雅的用法是range,range依次从channel中读取数据,当channel被关闭且没有数据时跳出。

//ver1 package main import ( "fmt" "time" ) func main() { naturals := make(chan uint, 10) squares := make(chan uint, 10) //naturals go func() { defer close(naturals) for i := uint(0); i < 10; i++ { naturals <- i time.Sleep(1 * time.Second) } }() // squares go func() { // 在此处添加一个defer close defer close(squares) for x := range naturals { fmt.Printf("squares rcv: %d\n", x) squares <- x * x } fmt.Println("naturals has been closed") }() // printer for x := range squares { fmt.Printf("printer rcv: %d\n", x) } fmt.Println("squares has been closed") }

其实你并不需要关闭每一个channel。只要当需要告诉接收者goroutine,所有的数据已经全部发送时才需要关闭channel。不管一个channel是否被关闭,当它没有被引用时将会被Go语言
的垃圾自动回收器回收。(不要将关闭一个打开文件的操作和关闭一个channel操作混淆。对于每个打开的文件,都需要在不使用的使用调用对应的Close方法来关闭文件。)

单方向的channel

单方向的channel也就是只能发送或只能接收的channel。比如说,上文的counter的channel只需要发送功能,printer只负责接收功能,为了code的安全性,所以有单方向的channel的设计。

package main import ( "fmt" "time" ) func counter(out chan<-int) { defer close(out) for i:= 0; i < 10; i++ { out <-i time.Sleep(1*time.Second) } fmt.Println("counter is closed") } func square(in <-chan int, out chan <- int) { defer close(out) for x := range in { out <- x * x } fmt.Println("square is closed") } func printer(in <-chan int) { for x := range in { fmt.Printf("Rcv: %d\n", x) } } func main() { naturals := make(chan int) squares := make(chan int) go counter(naturals) // 将chan int隐式转换为<-chan int(send) go square(naturals, squares) // 将chan int隐式转换为chan<-int(rcv), <-chan int(send) printer(squares) }

带缓冲的channel

带缓冲的channel实际上是将发送和接收解耦了,可以在无人接收的同时进行发生。效率高一些。
无缓冲的channel在同步性上更高一点。

package main import ( "fmt" "time" ) func main() { done := make(chan int, 3) go func() { time.Sleep(910 * time.Microsecond) done <- 3 }() go func() { time.Sleep(890 * time.Microsecond) done <- 1 }() go func() { time.Sleep(900 * time.Microsecond) done <- 2 }() fmt.Println(<-done) }

输出为1.

4 并发的循环

现在想调用gopl.io的thumbnail包来处理图片,gopl的相关定义是:

package thumbnail // ImageFile reads an image from infile and writes // a thumbnail-size version of it in the same directory. // It returns the generated file name, e.g. "foo.thumb.jpeg". func ImageFile(infile string) (string, error) { ext := filepath.Ext(infile) // e.g., ".jpg", ".JPEG" outfile := strings.TrimSuffix(infile, ext) + ".thumb" + ext return outfile, ImageFile2(outfile, infile) }
  • 首先能想到的就是串行的处理方式
func makeThumbnails(filenames []string) { for _, f := range filenames { thumbnail.ImageFile(f) } }
  • 但是考虑到效率,现在使用并行的方式来处理(incorrect code)
func makeThumbnails1(filenames []string) { for _, filename := range filenames { go thumbnail.ImageFile(filename) } }

上述的实现是不正确的,因为在makeThumbnails执行完之后,main goroutine就自动退出了, 不会等待worker goroutine执行完。

  • 那么怎么让main groutine知道worker goroutine执行完了呢?方法就是channel
func makeThumbnails3(filenames []string) { done := make(chan struct{}) for _, f :=range filenames { go func() { log.Printf("Start process image %s\n", f) _, err := thumbnail.ImageFile(f) if err != nil { log.Println(err) } done <- struct{}{} }() } for range filenames { <-done } }

run 之后,发现输出有问题

PS D:\07-go\src\gopl> .\main.exe .\1.jpg .\2.jpg .\3.jpg 2019/03/11 21:58:39 Start process image .\2.jpg 2019/03/11 21:58:39 Start process image .\3.jpg 2019/03/11 21:58:39 Start process image .\3.jpg

执行了2次3.jpg, 为什么呢?其实在前面讲过,函数字面量会保存f的值。也就是如下的逻辑:
1) goroutine 1: 执行到log.Printf, 此时f=1.jpg, 还为进行ImageFile.
2)goroutine 2: 此时f变为2.jpg, 当goroutine1 执行ImageFile时,就变成了执行2.jpg的变换。
3) 其实这也就是goroutine间的共享变量的问题。

  • 应该采用显式传入filename的方式
func makeThumbnails3(filenames []string) { done := make(chan struct{}) for _, f :=range filenames { go func(infile string) { log.Printf("Start process image %s\n", infile) _, err := thumbnail.ImageFile(infile) if err != nil { log.Println(err) } done <- struct{}{} }(f) } for range filenames { <-done } }
  • 假如想实现遇到第一个error之后,直接退出并打印error的功能呢?
func makeThumbnails4(filenames []string) error{ errCh := make(chan error) for _, f := range filenames { go func(infile string) { log.Println("Start processing %s", infile) _, err := thumbnail.ImageFile(infile) errCh <- err }(f) } for range filenames { if err := <- errCh; err != nil { log.Println(err) return err } } return nil }

但是这里有个bug,当make函数遇到第1个error退出后,此时对于channel errCh就没有接收者了。但是对于其他仍在run的worker goroutine会block在发送端,从而导致goroutine泄漏。
可能会导致整个程序的卡住或者out of memory的错误

  • 解决办法就是采用带缓冲的channel。 我们在前面也说过,带缓冲的channel可以去掉channel发送和接收端的耦合。
func makeThumbnails4(filenames []string) error{ errCh := make(chan error, len(filenames)) for _, f := range filenames { go func(infile string) { log.Println("Start processing %s", infile) _, err := thumbnail.ImageFile(infile) errCh <- err }(f) } for range filenames { if err := <- errCh; err != nil { log.Println(err) return err } } return nil }
  • 假如我们想实现多个goroutine,但是有不知道goroutine的个数,应该怎么管理goroutine呢?又怎么能知道start了多少个goroutine,然后又close了多少goroutine呢?
    解决办法就是Sync.WaitGroup
package main import ( "fmt" "gopl.io/ch8/thumbnail" "log" "os" "sync" ) func makeThumbnails(filenames []string) { for _, f := range filenames { thumbnail.ImageFile(f) } } func makeThumbnails1(filenames []string) { for _, filename := range filenames { go thumbnail.ImageFile(filename) } } func makeThumbnails3(filenames []string) { done := make(chan struct{}) for _, f :=range filenames { go func(infile string) { log.Printf("Start process image %s\n", infile) _, err := thumbnail.ImageFile(infile) if err != nil { log.Println(err) } done <- struct{}{} }(f) } for range filenames { <-done } } func makeThumbnails4(filenames []string) error{ errCh := make(chan error, len(filenames)) for _, f := range filenames { go func(infile string) { log.Println("Start processing %s", infile) _, err := thumbnail.ImageFile(infile) errCh <- err }(f) } for range filenames { if err := <- errCh; err != nil { log.Println(err) return err } } return nil } func makeThumbnails6(filenames []string) int64 { ch := make(chan int64) var wg sync.WaitGroup for _, f := range filenames { wg.Add(1) go func(infile string) { defer wg.Done() f, _ := thumbnail.ImageFile(infile) info, _ := os.Stat(f) ch <- info.Size() }(f) } go func() { wg.Wait() close(ch) }() var total int64 for size := range ch { total += size } return total } func main() { filenames := os.Args[1:] sizes := makeThumbnails6(filenames) fmt.Printf("The sizes of files is %d\n", sizes) }

如上实现了统计变换后图片的大小。
那么现在就有个问题,为什么要把wg.Wait放到goroutine里?
假如不放到goroutine中,就需要考虑放在for total的前面还是后面?
假如放在循环之前,则程序永远都不会结束。因为对于ch走不到接收的地方,则send端会一直block住,从而形成dead lock.
如果放在之后,则也会形成dead lock, 因为worker goroutine发送完之后就退出了,就永远没有send了,则ch则会一直block在接收端。
因此,就需要将close操作也放到一个额外的goroutine中。
上面的程序代码结构, 是当我们使用并发循环,但又不知道迭代次数时很通常而且很地道的写法。

基于select的多路复用

package main import ( "fmt" "os" "time" ) func launch() { fmt.Println("LAUNCH") } func main() { abort := make(chan struct{}) go func() { os.Stdin.Read(make([]byte, 1)) abort <- struct{}{} }() tick := time.Tick(1*time.Second) for countdown := 10; countdown > 0; countdown-- { fmt.Println(countdown) select { case <-abort: fmt.Println("Rcv abort signal") return case <-tick: fmt.Println("Time out") } } launch() }
package main import ( "fmt" "time" ) func main() { ch := make(chan int, 1) // 注意这里的size是1 for i := 0; i < 10; i++ { select { case ch <- i: fmt.Printf("Send %d\n", i) case x := <-ch: fmt.Printf("Rcv %d\n", x) } time.Sleep(1*time.Second) } }

当channel size是1,输出永远是偶数,原因自己考虑吧。
那么当channel >1时,结果就不确定了,因为在select的多个case都满足时,就会自动选择其中1个。

原文链接:https://yq.aliyun.com/articles/697219
关注公众号

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。

持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。

文章评论

共有0条评论来说两句吧...

文章二维码

扫描即可查看该文章

点击排行

推荐阅读

最新文章