文章目录
- 多路复用
多路复用
操作系统级的I/O模型有:
- 阻塞I/O
- 非阻塞I/O
- 信号驱动I/O
- 异步I/O
- 多路复用I/O
Linux下,一切皆文件。包括普通文件、目录文件、字符设备文件(键盘、鼠标)、块设备文件(硬盘、光驱)、套接字socket等等。文件描述符(File descriptor,FD)是访问文件资源的抽象句柄,读写文件都要通过它。文件描述符就是个非负整数,每个进程默认都会打开3个文件描述符:0标准输入、1标准输出、2标准错误。由于内存限制,文件描述符是有上限的,可通过ulimit –n查看,文件描述符用完后应及时关闭。
阻塞I/O
非阻塞I/O
read和write默认是阻塞模式。
ssize_t read(int fd, void *buf, size_t count);
ssize_t write(int fd, const void *buf, size_t nbytes);
通过系统调用fcntl可将文件描述符设置成非阻塞模式。
int flags = fcntl(fd, F_GETFL, 0);
fcntl(fd, F_SETFL, flags | O_NONBLOCK);
多路复用I/O
select系统调用可同时监听1024个文件描述符的可读或可写状态。poll用链表存储文件描述符,摆脱了1024的上限。各操作系统实现了自己的I/O多路复用函数,如epoll、 evport 和kqueue等。
go多路复用函数以netpoll为前缀,针对不同的操作系统做了不同的封装,以达到最优的性能。在编译go语言时会根据目标平台选择特定的分支进行编译。
利用go channel的多路复用实现倒计时发射的demo。
package main
import (
"fmt"
"os"
"time"
)
//倒计时
func countDown(countCh chan int, n int, finishCh chan struct{}) {
if n <= 0 { //从n开始倒数
return
}
ticker := time.NewTicker(1 * time.Second) //创建一个周期性的定时器,每隔1秒执行一次
for {
countCh <- n //把n放入管道
<-ticker.C //等1秒钟
n-- //n减1
if n <= 0 { //n减到0时退出
ticker.Stop() //停止定时器
finishCh <- struct{}{} //成功结束
break //退出for循环
}
}
}
//中止
func abort(ch chan struct{}) {
buffer := make([]byte, 1)
os.Stdin.Read(buffer) //阻塞式IO,如果标准输入里没数据,该行一直阻塞。注意在键盘上敲完后要按下Enter才会把输入发给Stdin
ch <- struct{}{}
}
func main() {
countCh := make(chan int)
finishCh := make(chan struct{})
go countDown(countCh, 10, finishCh) //开一个子协程,去往countCh和finishCh里放数据
abortCh := make(chan struct{})
go abort(abortCh) //开一个子协程,去往abortCh里放数据
LOOP:
for { //循环监听
select { //同时监听3个channel,谁先准备好就执行谁,然后进入下一次for循环
case n := <-countCh:
fmt.Println(n)
case <-finishCh:
fmt.Println("finish")
break LOOP //退出for循环。在使用for select时,单独一个break不能退出for循环
case <-abortCh:
fmt.Println("abort")
break LOOP //退出for循环
}
}
}
timeout实现
-
ctx,cancel:=context.WithCancel(context.Background())调用cancel()将关闭ctx.Done()对应的管道
-
ctx,cancel:=context.WithTimeout(context.Background(),time.Microsecond*100)调用cancel或者到达超时时间都将关闭ctx.Done()对应的管道
-
ctx.Done()管道关闭后读操作将立即返回
函数超时控制的4种实现。
package main
import (
"context"
"fmt"
"time"
)
const (
WorkUseTime = 500 * time.Millisecond
Timeout = 100 * time.Millisecond
)
//模拟一个耗时较长的任务
func LongTimeWork() {
time.Sleep(WorkUseTime)
return
}
//模拟一个接口处理函数
func Handle1() {
deadline := make(chan struct{}, 1)
workDone := make(chan struct{}, 1)
go func() { //把要控制超时的函数放到一个协程里
LongTimeWork()
workDone <- struct{}{}
}()
go func() { //把要控制超时的函数放到一个协程里
time.Sleep(Timeout)
deadline <- struct{}{}
}()
select { //下面的case只执行最早到来的那一个
case <-workDone:
fmt.Println("LongTimeWork return")
case <-deadline:
fmt.Println("LongTimeWork timeout")
}
}
//模拟一个接口处理函数
func Handle2() {
workDone := make(chan struct{}, 1)
go func() { //把要控制超时的函数放到一个协程里
LongTimeWork()
workDone <- struct{}{}
}()
select { //下面的case只执行最早到来的那一个
case <-workDone:
fmt.Println("LongTimeWork return")
case <-time.After(Timeout):
fmt.Println("LongTimeWork timeout")
}
}
//模拟一个接口处理函数
func Handle3() {
//通过显式sleep再调用cancle()来实现对函数的超时控制
//调用cancel()将关闭ctx.Done()对应的管道
ctx, cancel := context.WithCancel(context.Background())
workDone := make(chan struct{}, 1)
go func() { //把要控制超时的函数放到一个协程里
LongTimeWork()
workDone <- struct{}{}
}()
go func() {
//100毫秒后调用cancel(),关闭ctx.Done()
time.Sleep(Timeout)
cancel()
}()
select { //下面的case只执行最早到来的那一个
case <-workDone:
fmt.Println("LongTimeWork return")
case <-ctx.Done(): //ctx.Done()是一个管道,调用了cancel()都会关闭这个管道,然后读操作就会立即返回
fmt.Println("LongTimeWork timeout")
}
}//LongTimeWork timeout
//模拟一个接口处理函数
func Handle4() {
//借助于带超时的context来实现对函数的超时控制
//调用cancel()或到达超时时间都将关闭ctx.Done()对应的管道
ctx, cancel := context.WithTimeout(context.Background(), Timeout)
defer cancel() //纯粹出于良好习惯,函数退出前调用cancel()
workDone := make(chan struct{}, 1)
go func() { //把要控制超时的函数放到一个协程里
LongTimeWork()
workDone <- struct{}{}
}()
select { //下面的case只执行最早到来的那一个
case <-workDone:
fmt.Println("LongTimeWork return")
case <-ctx.Done(): //ctx.Done()是一个管道,context超时或者调用了cancel()都会关闭这个管道,然后读操作就会立即返回
fmt.Println("LongTimeWork timeout")
}
}
func main() {
Handle1()
Handle2()
Handle3()
Handle4()
}