阻塞与流程控制
通常在并发程序中要尽力避免阻塞式操作,但有时又需要让代码暂时处于阻塞状态,以等待某种条件、信号或数据,然后再继续运行。
对于无缓冲通道,试图从无人写入的通道中读取,或者向无人读取的通道中写入,都会引起阻塞。
重点:利用无缓冲通道的阻塞I/O,可以很容易地在异步执行的多个Goroutine之间构建同步化的流程控制。
// 基于通道的流程控制
// 试图从无人写入的通道中读取,或者向无人读取的通道中写入,都会引起阻塞,利用
// 这一特性可在多"线程"之间建立某种"停——等"机制
// 在下例中,模拟了一个电子时间,每隔1s更新显示一次时间。在父线程中先于子线程写入而读取,产生阻塞;子线程先于父现场而写入,也会产生阻塞。
package main
import (
"fmt"
"time"
)
func clock(c chan string) {
ticker := time.NewTicker(time.Second) // 定时器,周期为1s
for { // 死循环
t := <-ticker.C // C是一个chnnel,每间隔一个定时周期,可以从通道内
// 度取1个时间信息
// 格式化时间展示格式并写入传入函数的channel c
c <- t.Format("02 Jan 2006 15:04:05")
}
}
func main() {
c := make(chan string)
go clock(c)
for {
message := <-c
fmt.Printf("\r%v", message)
}
}
通道型函数参数(只读、只写、可读可写)
可将通道作为参数传递给函数,并在其类型中指明该通道型参数是只读的、只写的,还是既可读又可写的。
func channelReader(c <-chan string) { // 只读通道
message := <-c
}
func channelWriter(c chan<- string) { // 只写通道
c <- "Hello World!"
}
func channelReaderAndWriter(c chan string) { // 可读写通道
message := <-c
c <- "Hello World!"
}
通过指定通道型参数的读写权限,有助于确保通道中数据的完整性,同时指定程序的哪部分可向通道发送数据,哪部分又能从通道接收数据。
select语句
在并发式编程中,经常需要利用多个通道,同时与多个Goroutine建立通信。
顺序遍历来自多个通道的消息显然并非好的设计,因为仅一个通道的阻塞就会影响对其它所有通道消息的处理,例如:
for {
msg1 := <-c1 //
fmt.Println(msg1)
msg2 := <-c2 //
fmt.Println(msg2)
}
假设负责向c1通道写入数据的"子线程"由于某种原因发生了阻塞,没能及时地写入数据,"父线程"将阻塞在从c1通道读取数据的语句,这时负责向c2通道写入数据的另外一个"子线程"将因为c2通道无人读取而发生写阻塞。这种因为一个"线程"发生阻塞导致所有"线程"都跟着一起阻塞的运行模式,显然有悖于并发式编程的设计初衷,应当着意避免。
// 多通道I/O(错误实例:顺序遍历)
// 顺序遍历来自多个通道的消息显然并非好的设计,因为
// 仅一个通道的阻塞就会影响对其它所有通道消息的处理
package main
import (
"fmt"
"time"
)
func proc(c chan rune, ch rune, // rune类型,unicode编码等价于int32
ms time.Duration) {
for {
c <- ch
time.Sleep(ms * time.Millisecond)
}
}
func main() {
c1 := make(chan rune)
c2 := make(chan rune)
go proc(c1, '-', 100) // 初衷:每100ms,打印1个-
go proc(c2, '+', 500) // 初衷:每500ms,打印1个+
for {
ch := <-c1
fmt.Printf("%c", ch)
ch = <-c2
fmt.Printf("%c", ch)
}
}
// 打印输出:
// +-+-+-+-+-+-+-+-+-+-+-+-+
// 实际情况,两个现场都是按照500ms的时间间隔来打印的,其原因在于两个channel/// 的读取数据都发生在同一个线程中,且二者是顺序执行的关系,c2阻塞时,c1也无法
// 执行。
select语句为多个通道创建了一系列接收者,哪个通道先有消息被写入就先接收哪个通道。
for {
select {
case message := <-c1: //
fmt.Println(message)
case message := <-c2: //
fmt.Println(message)
}
}
"父线程"中的select语句以阻塞方式,同时监视连接着多个"子线程"的多个通道,无论哪个"子线程"向其所持有的通道写入了数据,select语句都会立即有所察觉,并根据先到先得的原则,匹配到与发生写入动作的通道相对应的case分支,读取该通道中的数据。
// 多通道选择 (前一示例的正确处理形式)
// select语句为多个通道创建了一系列接收者,
// 哪个通道先有消息被写入就先接收哪个通道
package main
import (
"fmt"
"time"
)
func proc(c chan rune, ch rune,
ms time.Duration) {
for {
c <- ch
time.Sleep(ms * time.Millisecond)
}
}
func main() {
c1 := make(chan rune)
c2 := make(chan rune)
go proc(c1, '-', 100)
go proc(c2, '+', 500)
for {
select {
case ch := <-c1:
fmt.Printf("%c", ch)
case ch := <-c2:
fmt.Printf("%c", ch)
}
}
}
// 打印输出:
// +-----+-----+-----+-----+
要想从多个通道中以最及时的方式接收并处理消息,select语句是个不错的选择,但如果所有的通道都没有消息呢?
- select语句将会长时间甚至永远处于阻塞状态,这对于并发式编程同样是不利的。
可以设置一个超时时间,让select语句于指定的时间后解除阻塞,继续运行。
注:time包的After函数,其参数为某一时间值,该函数会返回1个channel。这个channel会在指定的参数时间之后,会有消息写入(一个时间消息)。
for {
select {
case message := <-c1:
fmt.Println(message)
case message := <-c2:
fmt.Println(message)
case <-time.After(time.Second): // 触发超时
fmt.Println("反正也没消息,不如摸会鱼吧……╮(╯ω╰)╭ ")
}
}
// 永久等待
// 若通道长时间无人写入,针对该
// 通道的select语句将会一直阻塞
package main
import (
"fmt"
"time"
)
func proc(c chan rune, ch rune,
ms time.Duration) {
for i := 0; ; { // 仅执行10次写入操作
if i < 10 {
c <- ch
i++
}
time.Sleep(ms * time.Millisecond)
}
}
func main() {
c1 := make(chan rune)
c2 := make(chan rune)
go proc(c1, '-', 100) // 写10次-
go proc(c2, '+', 500) // 写10次+
for {
select {
case ch := <-c1:
fmt.Printf("%c", ch)
case ch := <-c2:
fmt.Printf("%c", ch)
}
}
}
// 打印输出:
// +-----+-----++++++++
// 主线程在读取10个-与10个+后,就处于了永久阻塞状态。
// 等待超时
// 使用超时时间,可让select语句在长时间收不到消息的
// 情况下不至于一直阻塞,可利用这段时间执行空闲处理
package main
import (
"fmt"
"time"
)
func proc(c chan rune, ch rune,
ms time.Duration) {
for i := 0; ; {
if i < 10 {
c <- ch
i++
}
time.Sleep(ms * time.Millisecond)
}
}
func main() {
c1 := make(chan rune)
c2 := make(chan rune)
go proc(c1, '-', 100)
go proc(c2, '+', 500)
for {
select {
case c := <-c1:
fmt.Printf("%c", c)
case c := <-c2:
fmt.Printf("%c", c)
case t := <-time.After(time.Second): // 触发超时,1s
fmt.Printf("\n%s> Timeout!",
t.Format("2006/01/02 15:04:05"))
// ……还应有相应的退出循环,退出通道等善后操作
}
}
}
// 打印输出:
// +-----+-----++++++++
// 2020/01/04 16:45:57> Timeout!
退出通道
通过设置超时时间,固然可以解除处于阻塞状态的select语句,但有时解除阻塞的条件也许并不是时间。
为select语句添加一个退出通道,通过向退出通道发送消息解除select阻塞。
stop := make(chan bool)
go func() {
if 消息循环可以退出了 {
stop <- true
}
}()
escape := false
for !escape { // 消息循环
select {
... // 处理各种消息
case <-stop:
escape = true
}
}
// 退出通道(给电子时钟的实例添加退出通道操作)
// 为select语句添加退出通道,向退出通道发送消息以结束select循环
package main
import (
"fmt"
"time"
)
func clock(channel chan string) {
ticker := time.NewTicker(time.Second)
for {
t := <-ticker.C
channel <- t.Format(
"02 Jan 2006 15:04:05")
}
}
func main() {
work := make(chan string)
stop := make(chan bool)
go clock(work)
go func() {
time.Sleep(10 * time.Second) // 10s后关闭消息循环
stop <- true
}()
escape := false
for !escape {
select {
case message := <-work:
fmt.Printf("\r%v", message)
case <-stop: // 退出通道
escape = true
}
}
fmt.Println("\nTime's up!")
}
// 打印输出:
// 04 Feb 2020 18:00:48
// Time's up!