概述
最近在做的一个项目,需要对大量数据进行一些基本的统计和处理,整个程序的思路很简单,但处理起来却很慢,特别是有二重循环的地方,龟速前进,眼看着16核32线程 的服务器只有一个线程被利用,束手无策。之前一直听说Go是一门对并发编程有很好的支持的语言,七牛的许牛大力推崇Go语言,于是就开始了对Go并行编程的探索之旅。
一些准备工作
在正式开始Go并行编程之前,首先我们需要准备Go的编程开发环境:Go编译器、Go编辑器。 在golang的官网上有go的下载链接: http://code.google.com/p/go/downloads/list 我选择的是go1.2.1.windows-amd64.msi安装包,下载完后直接点击该安装包,按照默认选项安装即可。安装完后,可以在命令行中查看go版本以检查是否安装成功: ``` C:\Users\Bill>go version go version go1.2.1 windows/amd64 ``` 然后,我们需要安装一个用于Go开发的IDE。当然也可以用通用的文本编辑器就可以,比如notepad、vi等,但这都是一些轻量级的,对于初学者来说还是不太合适的。 网上有很多推荐的IDE,主要有LiteIDE、Eclipse+Goclipse、GoIde等,以及vim/emacs搭配go插件等,这里我选择的是LiteIDE,下载地址: http://sourceforge.net/projects/liteide/files/x21/ 下载安装后,我们就可以开始go编程了。可以先运行一个Hello world程序,这里就不演示了,具体见:Go 指南
关于Go的一些特性的介绍这里也不讲了,有兴趣的可以移步酷壳的这两篇文章Go 语言简介(上)— 语法 和 Go 语言简介(下)— 特性。下面我们直接进入Go并发编程。
Go并发编程
#0x01.goroutine
优雅的并发编程范式,完善的并发支持,出色的并发性能是Go语言区别于其他语言的一大特色。在Go中,通过一种叫做goroutine的go协程这种轻量级线程来支持 并发编程范式。协程是比进程和线程更轻量级的线程,go语言标准库提供的所有系统调用操作都会出让CPU给其他goroutine,协程的切换管理不依赖于系统的线程和 进程,也不依赖于CPU的核心数量。下面我们来看一个简单的goroutine的实例。 ``` go package main import "fmt" func main() { arr := [10]int{} for i := 0; i < 10; i++ { fmt.Print("Result of ", i, ":") go func() { arr[i] = i + i*i fmt.Println(arr[i]) }() } fmt.Println("Done") } ``` Go从main package的main()函数开始执行,这段代码的功能是计算arr[i]=i+i*i,其中i取值从0到9,每一个i计算完后立即输出。其中go为golang的关键字,启动 一个协程(goroutine)。程序的运行的结果是什么呢?我们期望的结果应该是这样的吧: ``` Result of 0:0 Result of 1:2 Result of 2:6 Result of 3:12 Result of 4:20 Result of 5:30 Result of 6:42 Result of 7:56 Result of 8:72 Result of 9:90 Done ``` 可是在LiteIDE中运行后,却会发现结果是这样子的: ``` Result of 0:Result of 1:Result of 2:Result of 3:Result of 4:Result of 5:Result of 6:Result of 7:Result of 8:Result of 9:Done ``` 这是为神马捏?怎么第9~12行的go func(){...}() 这段代码好像更笨就没有执行嘛?!莫非这是golang的bug?
其实,这与golang的程序执行顺序有关。go程序从初始化main package并执行main()函数开始,当main()函数返回时,程序退出,且程序并不等待其他goroutine (非主goroutine)结束。于是上面的程序中,主函数虽然启动了10gegoroutine,但都没来得及执行,程序就已经退出了。那么怎么解决这个问题捏?很显然,我们 在退出程序之前,需要判断这些创建的goroutine执行完了没。我们可以用一个全局变量来计数执行了的协程数,如果计数变量小于10,我们就等待或sleep。
#0x02.并发通讯
等一等,多个协程读写同一个变量,我们是不是需要对这个变量枷锁呀?答案是肯定的,我们可以采用类似与C/C++的线程通讯、数据共享的思路来实现,如下: ``` go package main import ( "fmt" "runtime" "sync" ) func main() { var cnt int = 0 // 全局计数器 mylock := &sync.Mutex{} // 互斥锁 arr := [10]int{} for i := 0; i < 10; i++ { fmt.Print("Result of ", i, ":") go func() { arr[i] = i + i*i fmt.Println(arr[i]) mylock.Lock() // 写之前枷锁 cnt++ mylock.Unlock() // 写之后释放锁 }() } for { mylock.Lock() // 读之前枷锁 temp := cnt mylock.Unlock() // 读之后释放锁 runtime.Gosched() // 协程切换 if temp >= 10 { break } } fmt.Println("Done") } ``` 执行以上程序,应该能够得到我们之前预期的那样的结果吧?可以运行之后,我们发现,结果却是这样的: ``` Result of 0:Result of 1:Result of 2:Result of 3:Result of 4:Result of 5:Result of 6:Result of 7:Result of 8:Result of 9:panic: runtime error: index out of range ... ``` 出现了运行时错误,数组下表越界了?!第17行还是没有输出?!经过一番修改,得到如下代码: ``` go package main import ( "fmt" "runtime" "sync" ) func main() { var cnt int = 0 // 全局计数器 mylock := &sync.Mutex{} // 互斥锁 arr := [11]int{} for i := 0; i < 10; i++ { go func() { arr[i] = i + i*i mylock.Lock() // 写之前枷锁 cnt++ mylock.Unlock() // 写之后释放锁 }() } for { mylock.Lock() // 读之前枷锁 temp := cnt mylock.Unlock() // 读之后释放锁 runtime.Gosched() // 协程切换 if temp >= 10 { break } } for i := 0; i < 11; i++ { fmt.Println("Result of ", i, ":", arr[i]) } fmt.Println("Done") } ``` 这下应该能得到预期的结果了吧。可以运行程序后,有傻眼了,结果是: ``` Result of 0 : 0 Result of 1 : 0 Result of 2 : 0 Result of 3 : 0 Result of 4 : 0 Result of 5 : 0 Result of 6 : 0 Result of 7 : 0 Result of 8 : 0 Result of 9 : 0 Result of 10 : 110 Done ``` 神马?arr[0]~arr[9]肿么都是0,肿么会冒出一个arr[10]=110?!即使把第15、16行顺序互换,得到的结果还是一样!!!这真是一场噩梦啊!!!仔细对比许书上的例子, 我们发现,这里使用的是匿名函数创建协程,匿名函数中使用了全局的变量,而每次使用go关键字创建协程后,程序不是继续往下执行,而是继续返回到for这一行来执行,首先i++, 然后判断i<10,成立就继续执行并创建协程,10个协程创建完了之后,才真正开始匿名函数的执行,而此时i已经是10了,对于每一个协程i都是10,因此最终只计算了一个arr[10]=110, 这也是为什么申请10个单位的数组时会出现运行时错误。而这完全不是我们所要的结果,那么怎么办呢?我们不妨用带参数的匿名函数来试试?如下: ``` go package main import ( "fmt" "runtime" "sync" ) func main() { var cnt int = 0 // 全局计数器 mylock := &sync.Mutex{} // 互斥锁 arr := [11]int{} for i := 0; i < 10; i++ { go func(i int) { // 这里的i是形参 arr[i] = i + i*i mylock.Lock() // 写之前枷锁 cnt++ mylock.Unlock() // 写之后释放锁 }(i) // 这里的i是实参 } for { mylock.Lock() // 读之前枷锁 temp := cnt mylock.Unlock() // 读之后释放锁 runtime.Gosched() // 协程切换 if temp >= 10 { break } } for i := 0; i < 11; i++ { fmt.Println("Result of ", i, ":", arr[i]) } fmt.Println("Done") } ``` 终于,我们总算得到了想要的结果了,经过不懈努力终于大功告成。这里没创建一个协程,都有一个实参值传给形参,在匿名函数中就不存在共享的全局变量i了。 然而,在让我们回头看看我们的代码,却会发现,这与一般的多线程程序有什么区别的,而且写起来会感觉更麻烦,有过之而无不及啊!
想象一下,在一个大的系统中具有无数的锁、无数的共享变量、无数的业务逻辑与错误处理分支,那将是一场噩梦。这噩梦就是众多C/C++开发者正在经历的,其实Java和C#开发 者也好不到哪里去。Go语言既然以并发编程作为语言的最核心优势,当然不至于将这样的问题用这么无奈的方式来解决。Go语言提供的是另一种通信模型,即以消息机制而非共享 内存作为通信方式。
#0x03.channel
Go语言提供的消息通信机制被称为channel,接下来我们将详细介绍channel。现在,让我们用Go语言社区的那句著名的口号来开始这一小节:
channel是Go语言在语言级别提供的goroutine间的通信方式,可以使用channel在两个或多个goroutine之间传递消息。channel是进程内的通信方式,因此通过channel传递 对象的过程和调用函数时的参数传递行为比较一致,比如也可以传递指针等。如果需要跨进程通信,建议用分布式系统的方法来解决,比如使用Socket或者HTTP等通信协议。 channel是类型相关的。也就是说,一个channel只能传递一种类型的值,这个类型需要在声明channel时指定。我们先看下用channel的方式重写上面的例子是什么样子的 ``` go package main import "fmt" func main() { chs := make([]chan int, 10) // 申请一个10维的channel数组 arr := [11]int{} for i := 0; i < 10; i++ { chs[i] = make(chan int) // 对每个channel初始化 go func(ch chan int, i int) { arr[i] = i + i*i ch <- 1 // 写channel,应该在函数体的最后一行,许书(P94,代码清单4-4)上有bug }(chs[i], i) } for _, ch := range chs { <-ch // 读channel } for i := 0; i < 11; i++ { fmt.Println("Result of ", i, ":", arr[i]) } fmt.Println("Done") } ``` 在这段代码中,我们给每个goroutine分配了一个channel,在每个goroutine执行完时对该goroutine的channel进行写操作,而在第16行进行读操作,对于每个channel, 只有完成了写操作之后,才可以进行读取,否则会处于阻塞状态。因此在10个goroutine没有执行完之前,main函数是不会退出的。这样是不是比共享内存的方式更简单 而优雅呢?其实,我们还可以对代码继续进行简化,可以将匿名函数的传递的channel参数省去,而在匿名函数内部直接使用全局的chs[i],匿名函数可以修改为如下: ``` go go func(i int) { arr[i] = i + i*i chs[i] <- 1 // 写channel }(i) ```不要通过共享内存来通信,而应该通过通信来共享内存。
到这里,go的并行编程基本已经入门了,关于channel的更多详细的用法可以参见参考资料。
参考资料
[1] golang 官方主页:http://golang.org/ [2] go语言编程,许世伟