概述

最近在做的一个项目,需要对大量数据进行一些基本的统计和处理,整个程序的思路很简单,但处理起来却很慢,特别是有二重循环的地方,龟速前进,眼看着16核32线程 的服务器只有一个线程被利用,束手无策。之前一直听说Go是一门对并发编程有很好的支持的语言,七牛的许牛大力推崇Go语言,于是就开始了对Go并行编程的探索之旅。

一些准备工作

在正式开始Go并行编程之前,首先我们需要准备Go的编程开发环境:Go编译器、Go编辑器。
golang的官网上有go的下载链接:
http://code.google.com/p/go/downloads/list
我选择的是go1.2.1.windows-amd64.msi安装包,下载完后直接点击该安装包,按照默认选项安装即可。安装完后,可以在命令行中查看go版本以检查是否安装成功:

1
2
C:\Users\Bill>go version
go version go1.2.1 windows/amd64
然后,我们需要安装一个用于Go开发的IDE。当然也可以用通用的文本编辑器就可以,比如notepad、vi等,但这都是一些轻量级的,对于初学者来说还是不太合适的。 网上有很多推荐的IDE,主要有LiteIDE、Eclipse+Goclipse、GoIde等,以及vim/emacs搭配go插件等,这里我选择的是LiteIDE,下载地址:
http://sourceforge.net/projects/liteide/files/x21/
下载安装后,我们就可以开始go编程了。可以先运行一个Hello world程序,这里就不演示了,具体见:Go 指南

关于Go的一些特性的介绍这里也不讲了,有兴趣的可以移步酷壳的这两篇文章Go 语言简介(上)— 语法Go 语言简介(下)— 特性。下面我们直接进入Go并发编程。

Go并发编程

#0x01.goroutine

优雅的并发编程范式,完善的并发支持,出色的并发性能是Go语言区别于其他语言的一大特色。在Go中,通过一种叫做goroutine的go协程这种轻量级线程来支持 并发编程范式。协程是比进程和线程更轻量级的线程,go语言标准库提供的所有系统调用操作都会出让CPU给其他goroutine,协程的切换管理不依赖于系统的线程和 进程,也不依赖于CPU的核心数量。下面我们来看一个简单的goroutine的实例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package main

import "fmt"

func main() {
  arr := [10]int{}
  for i := 0; i < 10; i++ {
      fmt.Print("Result of ", i, ":")
      go func() {
          arr[i] = i + i*i
          fmt.Println(arr[i])
      }()
  }
  fmt.Println("Done")
}
Go从main package的main()函数开始执行,这段代码的功能是计算arr[i]=i+i*i,其中i取值从0到9,每一个i计算完后立即输出。其中go为golang的关键字,启动 一个协程(goroutine)。程序的运行的结果是什么呢?我们期望的结果应该是这样的吧:
1
2
3
4
5
6
7
8
9
10
11
Result of 0:0
Result of 1:2
Result of 2:6
Result of 3:12
Result of 4:20
Result of 5:30
Result of 6:42
Result of 7:56
Result of 8:72
Result of 9:90
Done
可是在LiteIDE中运行后,却会发现结果是这样子的:
1
Result of 0:Result of 1:Result of 2:Result of 3:Result of 4:Result of 5:Result of 6:Result of 7:Result of 8:Result of 9:Done
这是为神马捏?怎么第9~12行的go func(){...}() 这段代码好像更笨就没有执行嘛?!莫非这是golang的bug?

其实,这与golang的程序执行顺序有关。go程序从初始化main package并执行main()函数开始,当main()函数返回时,程序退出,且程序并不等待其他goroutine (非主goroutine)结束。于是上面的程序中,主函数虽然启动了10gegoroutine,但都没来得及执行,程序就已经退出了。那么怎么解决这个问题捏?很显然,我们 在退出程序之前,需要判断这些创建的goroutine执行完了没。我们可以用一个全局变量来计数执行了的协程数,如果计数变量小于10,我们就等待或sleep。

#0x02.并发通讯

等一等,多个协程读写同一个变量,我们是不是需要对这个变量枷锁呀?答案是肯定的,我们可以采用类似与C/C++的线程通讯、数据共享的思路来实现,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package main

import (
  "fmt"
  "runtime"
  "sync"
)

func main() {
  var cnt int = 0  // 全局计数器
  mylock := &sync.Mutex{}  // 互斥锁
  arr := [10]int{}
  for i := 0; i < 10; i++ {
      fmt.Print("Result of ", i, ":")
      go func() {
          arr[i] = i + i*i
          fmt.Println(arr[i])
          mylock.Lock() // 写之前枷锁
          cnt++
          mylock.Unlock() // 写之后释放锁
      }()
  }
  for {
      mylock.Lock() // 读之前枷锁
      temp := cnt
      mylock.Unlock() // 读之后释放锁
      runtime.Gosched() // 协程切换
      if temp >= 10 {
          break
      }
  }
  fmt.Println("Done")
}
执行以上程序,应该能够得到我们之前预期的那样的结果吧?可以运行之后,我们发现,结果却是这样的:
1
2
Result of 0:Result of 1:Result of 2:Result of 3:Result of 4:Result of 5:Result of 6:Result of 7:Result of 8:Result of 9:panic: runtime error: index out of range
...
出现了运行时错误,数组下表越界了?!第17行还是没有输出?!经过一番修改,得到如下代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package main

import (
  "fmt"
  "runtime"
  "sync"
)

func main() {
  var cnt int = 0  // 全局计数器
  mylock := &sync.Mutex{}  // 互斥锁
  arr := [11]int{}
  for i := 0; i < 10; i++ {
      go func() {
          arr[i] = i + i*i
          mylock.Lock() // 写之前枷锁
          cnt++
          mylock.Unlock() // 写之后释放锁
      }()
  }
  for {
      mylock.Lock() // 读之前枷锁
      temp := cnt
      mylock.Unlock() // 读之后释放锁
      runtime.Gosched() // 协程切换
      if temp >= 10 {
          break
      }
  }
  for i := 0; i < 11; i++ {
      fmt.Println("Result of ", i, ":", arr[i])
  }
  fmt.Println("Done")
}
这下应该能得到预期的结果了吧。可以运行程序后,有傻眼了,结果是:
1
2
3
4
5
6
7
8
9
10
11
12
Result of  0 : 0
Result of  1 : 0
Result of  2 : 0
Result of  3 : 0
Result of  4 : 0
Result of  5 : 0
Result of  6 : 0
Result of  7 : 0
Result of  8 : 0
Result of  9 : 0
Result of  10 : 110
Done
神马?arr[0]~arr[9]肿么都是0,肿么会冒出一个arr[10]=110?!即使把第15、16行顺序互换,得到的结果还是一样!!!这真是一场噩梦啊!!!仔细对比许书上的例子, 我们发现,这里使用的是匿名函数创建协程,匿名函数中使用了全局的变量,而每次使用go关键字创建协程后,程序不是继续往下执行,而是继续返回到for这一行来执行,首先i++, 然后判断i<10,成立就继续执行并创建协程,10个协程创建完了之后,才真正开始匿名函数的执行,而此时i已经是10了,对于每一个协程i都是10,因此最终只计算了一个arr[10]=110, 这也是为什么申请10个单位的数组时会出现运行时错误。而这完全不是我们所要的结果,那么怎么办呢?我们不妨用带参数的匿名函数来试试?如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package main

import (
  "fmt"
  "runtime"
  "sync"
)

func main() {
  var cnt int = 0  // 全局计数器
  mylock := &sync.Mutex{}  // 互斥锁
  arr := [11]int{}
  for i := 0; i < 10; i++ {
      go func(i int) { // 这里的i是形参
          arr[i] = i + i*i
          mylock.Lock() // 写之前枷锁
          cnt++
          mylock.Unlock() // 写之后释放锁
      }(i) // 这里的i是实参
  }
  for {
      mylock.Lock() // 读之前枷锁
      temp := cnt
      mylock.Unlock() // 读之后释放锁
      runtime.Gosched() // 协程切换
      if temp >= 10 {
          break
      }
  }
  for i := 0; i < 11; i++ {
      fmt.Println("Result of ", i, ":", arr[i])
  }
  fmt.Println("Done")
}
终于,我们总算得到了想要的结果了,经过不懈努力终于大功告成。这里没创建一个协程,都有一个实参值传给形参,在匿名函数中就不存在共享的全局变量i了。 然而,在让我们回头看看我们的代码,却会发现,这与一般的多线程程序有什么区别的,而且写起来会感觉更麻烦,有过之而无不及啊!

想象一下,在一个大的系统中具有无数的锁、无数的共享变量、无数的业务逻辑与错误处理分支,那将是一场噩梦。这噩梦就是众多C/C++开发者正在经历的,其实Java和C#开发 者也好不到哪里去。Go语言既然以并发编程作为语言的最核心优势,当然不至于将这样的问题用这么无奈的方式来解决。Go语言提供的是另一种通信模型,即以消息机制而非共享 内存作为通信方式。

#0x03.channel

Go语言提供的消息通信机制被称为channel,接下来我们将详细介绍channel。现在,让我们用Go语言社区的那句著名的口号来开始这一小节:

不要通过共享内存来通信,而应该通过通信来共享内存。

channel是Go语言在语言级别提供的goroutine间的通信方式,可以使用channel在两个或多个goroutine之间传递消息。channel是进程内的通信方式,因此通过channel传递 对象的过程和调用函数时的参数传递行为比较一致,比如也可以传递指针等。如果需要跨进程通信,建议用分布式系统的方法来解决,比如使用Socket或者HTTP等通信协议。 channel是类型相关的。也就是说,一个channel只能传递一种类型的值,这个类型需要在声明channel时指定。我们先看下用channel的方式重写上面的例子是什么样子的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package main

import "fmt"

func main() {
  chs := make([]chan int, 10) // 申请一个10维的channel数组
  arr := [11]int{}
  for i := 0; i < 10; i++ {
      chs[i] = make(chan int) // 对每个channel初始化
      go func(ch chan int, i int) {
          arr[i] = i + i*i
          ch <- 1  // 写channel,应该在函数体的最后一行,许书(P94,代码清单4-4)上有bug
      }(chs[i], i)
  }
  for _, ch := range chs {
      <-ch // 读channel
  }
  for i := 0; i < 11; i++ {
      fmt.Println("Result of ", i, ":", arr[i])
  }
  fmt.Println("Done")
}
在这段代码中,我们给每个goroutine分配了一个channel,在每个goroutine执行完时对该goroutine的channel进行写操作,而在第16行进行读操作,对于每个channel, 只有完成了写操作之后,才可以进行读取,否则会处于阻塞状态。因此在10个goroutine没有执行完之前,main函数是不会退出的。这样是不是比共享内存的方式更简单 而优雅呢?其实,我们还可以对代码继续进行简化,可以将匿名函数的传递的channel参数省去,而在匿名函数内部直接使用全局的chs[i],匿名函数可以修改为如下:
1
2
3
4
go func(i int) {
  arr[i] = i + i*i
  chs[i] <- 1  // 写channel
}(i)

到这里,go的并行编程基本已经入门了,关于channel的更多详细的用法可以参见参考资料。

参考资料

[1] golang 官方主页:http://golang.org/
[2] go语言编程,许世伟

Original Link: http://ibillxia.github.io/blog/2014/03/16/go-concurrent-programming-first-try/
Attribution - NON-Commercial - ShareAlike - Copyright © Bill Xia