1 并行与并发的编程区别
1.1 并发与并行
1.1.1 说明
我们举个例子,如果有条高速公路 A 上面并排有 8 条车道,那么最大的并行车辆就是 8 辆此条高速公路 A 同时并排行走的车辆小于等于 8 辆的时候,车辆就可以并行运行。
CPU 也是这个原理,一个 CPU 相当于一个高速公路 A,核心数或者线程数就相当于并排可以通行的车道;而多个CPU 就相当于并排有多条高速公路,而每个高速公路并排有多个车道。
当谈论并发的时候一定要加个单位时间,也就是说单位时间内并发量是多少?
离开了单位时间其实是没有意义的。俗话说,一心不能二用,这对计算机也一样,原则上一个 CPU 只能分配给一个进程,以便运行这个进程。我们通常使用的计算机中只有一个 CPU,也就是说只有一颗心,要让它一心多用同时运行多个进程,就必须使用并发技术。实现并发技术相当复杂,最容易理解的是“时间片轮转进程调度算法”。
并发(concurrency):把任务在不同的时间点交给处理器进行处理。在同一时间点,任务并不会同时运行。
并行(parallelism):把每一个任务分配给每一个处理器独立完成。在同一时间点,任务一定是同时运行。
1.1.2 图例
1.2 异步编程
1.2.1 分类
1.利用资源的多线程编程方案
并发编程
2.利用单一资源,但是内部进行事件循环处理
并行编程
1.2.2 并行的说明
在并发的CPU理论中我们能知道,其实一个CPU只能承载一个线程处理
但是通过时间片分配算法,让代码在宏观上看起来是异步的,但是微观上是同步的
1.3 并行异步开发思路
1.3.1 事件循环机制实现并行异步
目的就是在主题中维护一个队列,然后循环队列
1.3.2 JAVA模拟示例
//Task
package com.kerwin.event;
public abstract class Task {
public abstract void run();
}
//ScheduleMicrotask
package com.kerwin.event;
public class ScheduleMicrotask extends Task {
@Override
public void run() {
}
}
//Timer
package com.kerwin.event;
public class Timer extends Task {
@Override
public void run() { }
}
//Main
package com.kerwin.event;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
public class Main {
//微任务队列-优先级高 private static Queue<ScheduleMicrotask> scheduleMicrotasks = new LinkedBlockingQueue<>();
//事件队列-优先级低
private static Queue<Timer> timers = new LinkedBlockingQueue<>();
public static void processAsync(){
while(!scheduleMicrotasks.isEmpty() || !timers.isEmpty()){
Task task = null;
if((task = scheduleMicrotasks.poll()) != null){
}else if((task = timers.poll()) != null){}
task.run();
}
}
public static void main(String[] args){
System.out.println("main start!");
timers.offer(new Timer(){
@Override
public void run() {
System.out.println("timer - event - A");
scheduleMicrotasks.offer(new ScheduleMicrotask(){
@Override
public void run() {
System.out.println("ScheduleMicrotask - A - in Timer A");
}
});
scheduleMicrotasks.offer(new ScheduleMicrotask(){
@Override
public void run() {
System.out.println("ScheduleMicrotask - B - in Timer A");
}
});
}
});
scheduleMicrotasks.offer(new ScheduleMicrotask(){
@Override
public void run() {
System.out.println("ScheduleMicrotask - C - in MAIN ");
timers.offer(new Timer(){
@Override
public void run() {
System.out.println("timer - event - B - in ScheduleMicrotask - C ");
}
});
}
});
System.out.println("main end!");
processAsync();
}
}
1.3.3 Dart实现方案
图例
说明
一个Dart应用有一个消息循环和两个消息队列-- event队列和microtask队列。
event队列包含所有外来的事件:I/O,mouse events,drawing events,timers,isolate之间的message等。
microtask 队列在Dart中是必要的,因为有时候事件处理想要在稍后完成一些任务但又希望是在执行下一个事件消息之前。
event队列包含Dart和来自系统其它位置的事件。但microtask队列只包含来自当前isolate的内部代码。
正如下面的流程图,当main方法退出后,event循环就开始它的工作。首先它会以FIFO的顺序执行micro task,当所有micro task执行完后它会从event 队列中取事件并执行。如此反复,直到两个队列都为空。
当一个Dart应用开始的标志是它的main isolate执行了main方法。当main方法退出后,main isolate的线程就会去逐一处理消息队列中的消息。
正如下面的流程图,当main方法退出后,event循环就开始它的工作。首先它会以FIFO的顺序执行micro task,当所有micro task执行完后它会从event 队列中取事件并执行。如此反复,直到两个队列都为空。
当一个Dart应用开始的标志是它的main isolate执行了main方法。当main方法退出后,main isolate的线程就会去逐一处理消息队列中的消息
实例
import 'dart:async';
/// 并行异步编程
void main(){
print("main begin");
Timer.run(() {
print("timer - event - A");
scheduleMicrotask(() {
print("ScheduleMicrotask - A - in Timer A");
});
scheduleMicrotask(() {
print("ScheduleMicrotask - B - in Timer A");
});
});
scheduleMicrotask(() {
print("ScheduleMicrotask - C - in MAIN ");
Timer.run(() {
print("timer - event - B - in ScheduleMicrotask - C ");
});
});
print("main end");
}
2 Future
2.1 概述
Dart 的异步对象,类似于 Javascript 中的 Promise。
Future 表示一个异步操作返回的结果;
Future 是一个泛型类;
Future 实现异步的原因是通过 dart 的 event loop,这里不过多赘述。
2.2 基本使用
2.2.1 说明
2.2.2 代码
import 'package:http/http.dart' as http;
Future getIp() {
final url = 'https://httpbin.org/ip';
return http.get(url).then((res) {
return res.body;
});
}
void main() {
getIp().then((res) {
print(res);
}).catchError((err) {
print(err);
});
}
2.3 方法
2.3.1 then
通过 .then 的形式,获取 Future 对象执行成功后的结果。
示例
//第一个参数 onValue
void main() {
testThen().then((res) {
print(res);
// 异步数据
// 可以将数据处理后,继续 return 出去,实现链式调用
return "加工后的${res}";
}).then((res) {
print(res);
// 加工后的异步数据
});
}
// 返回一个 Future<String> 对象,其中 String 可以省略,会被 dart 推断出来
Future<String> testThen() {
// return 一个 Future 对象
return new Future(() {
// 发送给 .then 的数据
return "异步数据";
});
}
//第二个参数 onError
void main() {
testThen().then((res) {
// 不执行 onValue 的方式
print(res);
return "加工后的${res}";
}, onError: (err) {
// 接收到 Future 的错误
print(err);
// 这是一个异常
});
}
Future<String> testThen() {
return new Future(() {
throw ('这是一个异常');
});
}
2.3.2 catchError
它比 then/onError 的接受范围要广,它不仅能接受 Future 中的异常,还能接受 then 中的异常;
如果没有填 then/onError
示例
void main() {
testThen().then((res) {
print(res);
return "加工后的${res}";
},onError: (err) {
print("onError: ${err}");
throw ("这是 onError 抛出的异常");
}).catchError((err) {
print("catchError: ${err}");
});
}
Future<String> testThen() {
return new Future(() {
throw ('这是一个异常');
});
}
// 执行结果
// onError: 这是一个异常
// catchError: 这是 onError 抛出的异常
2.3.3 whenComplete
在 Future 完成之后总是会调用,不管是错误导致的完成还是正常执行完毕,并且返回一个 Future 对象。类似于 JavaScript 中的 Promise 中的 finally。
示例
void main() {
testThen().then((res) {
print(res);
return "加工后的${res}";
}, onError: (err) {
print("onError: ${err}");
throw ("这是 onError 抛出的异常");
}).catchError((err) {
print("catchError: ${err}");
}).whenComplete(() {
print("执行完成");
});
}
Future<String> testThen() {
return new Future(() {
throw ('这是一个异常');
});
}
// 执行结果
// onError: 这是一个异常
// catchError: 这是 onError 抛出的异常
// 执行完成
2.4 静态方法
2.4.1 Future.delayed()
执行一个延时任务。
示例
void main(List<String> args) {
print('start....');
Future.delayed(Duration(seconds: 3), () => print("延时任务"));
print('end....');
// 执行结果:
// start....
// end....
// 延时任务
}
//注: delayed 中实现的延时操作通过 Timer 来实现的,
//在实际开发中,如果只是一个单纯的延时操作,建议使用 Timer。
void main(List<String> args) {
print('start....');
new Timer(Duration(seconds: 3), () => print("延时任务"));
print('end....');
// 执行结果:
// start....
// end....
// 延时任务
}
2.4.2 Future.value()
创建一个返回指定 value 值的 Future :
void main() {
print(1);
Future.value("abc").then((res) => print(res));
print(2);
// 执行结果
// 1
// 2
// abc
}
2.5 执行顺序
2.5.1 说明
future是一个异步任务的封装,提供了让自己进行任务转换的过程
2.5.2 方案
Future() - 默认为timer
Future.sync() - 同步在主线执行
Future.microtask() - 转换微任务
Future.value(val) 根据值选择,同时可以控制顺序
2.6 不常用,但是同样挺重要的方法
2.6.1 Future.wait()
执行多个 Future,等待所有的 Future 执行完成。
void main() {
Future.wait([futureA(), futureB()])
.then((res) {
print(res);
});
}
Future futureA() {
return Future.delayed(Duration(seconds: 3), () {
print("futureA 完成了");
});
}
Future futureB() {
return Future.delayed(Duration(seconds: 5), () {
print("futureB 完成了");
});
}
// 执行结果
// futureA 完成了
// futureB 完成了
// [null, null]
2.6.2 Future.any()
执行多个 Future,哪个先执行完成,就返回对应的数据。
void main() {
Future.any([futureA(), futureB()])
.then((res) {
// 此时 res 获取的是 futureA() 执行的结果
print(res);
});
}
Future futureA() {
return Future.delayed(Duration(seconds: 3), () {
print("futureA 完成了");
});
}
Future futureB() {
return Future.delayed(Duration(seconds: 5), () {
print("futureB 完成了");
});
}
// 执行结果
// futureA 完成了
// null ----------- Future.any 返回的结果
// futureB 完成了
2.6.3 timeout
设置异步操作的超时时长,返回的同样是个 Future.
import 'dart:async';
void main() {
futureA();
}
Future futureA() {
return Future(() {
new Timer(Duration(seconds: 5), () {
print("测试");
});
}).timeout(Duration(seconds: 3)).then((res) {
// 设置超时
print(res);
print("超时后执行的 then");
});
}
// 执行结果
// null
// 超时后执行的 then
// 测试
3 Stream
3.1 简介
Stream是一个异步的事件队列,也是响应式编程的实现
编程和核心就是处理数据,从上游拿到数据,经过处理后传递给下游。
后续随着需要处理的数据越来越多,有了集合的概念,包装了一系列数据在上下游之间进行传递。
随着计算资源的发展,业务数据的处理可能要跨多个线程,这样数据处理流程就从单个线程的同步处理变成跨多个线程的异步处理。此时集合通常充当多个线程之间协作的缓冲池,当缓冲池满了的时候上游线程阻塞,空了的时候下游线程阻塞。
在开发过程中经常需要开发人员写很多遍历数据集合的操作,就出现了集合的迭代器来简化遍历。在遍历过程中常会有筛选、排序、映射转换、合并或者拆分等操作。
流的出现就是简化针对集合的这些操作,方便开发。流关注的是针对数据源的一系列操作而不是数据的存储。
所以流就是一个数据源+管道(多个中间操作)+终端操作组成。
数据源:定义流操作的原始数据是哪里来的,可以是集合、数组、生成器函数、io管道以及其他流作为数据来源。
中间操作:针对流中的数据进行处理,处理产生的结果还是流:比如过滤filter、一对多flatMap、一对一的变换map、排序sorted等等操作。
终端操作:定义了流中元素最终要经历的操作,比如最大最小值、个数统计、转换成集合或其他类型、提供对元素遍历的接口等。终端操作的执行意味着定义的流开始消耗,等最后一个元素经历过终端操作后,这个流就相当于完成了。
流的中间操作是惰性的:中间操作并不会立即执行,而是等终端操作开始执行时才会被执行。
中间操作又被分为
(1)有状态的中间操作:比如排序、去重等操作,需要知道整个序列的所有元素才可以完成操作。
(2)无状态的中间操作:比如过滤、映射等操作,单个元素就可以完成操作,不需要知道其他元素的状态。所以这种中间操作是可以并行处理的。
短路操作:如果在提供无限流输入时,它可能会产生一个有限的流,那么他就是短路的。如果在无限流作为输入时,它可能在有限的时间内终止,这个终端操作是短路的。
流不存储数据,流的中间操作并不修改数据源的结构。流的短路操作让其可以处理无线数据源。流的延迟特性让其可以优化中间操作。
流的惰性让流可以处理异步事件。
大致了解了流的特性之后,我们开始看Dart中的流是如何使用的。
Future和Stream是Dart异步库中的核心,Future代表的是单个异步事件,Stream代表的是一个异步事件序列
3.2 创建流
3.2.1 通过生成器创建流
async* 标记的方法称为异步生成器,在其中可以通过yield生成单个元素,yield*生成多个元素,最终汇集成流
示例
Stream<String> createStream() async* {
for (int i = 0; i < 100; i++) {
await Future.delayed(Duration(seconds: 1));
yield "第${i}份数据";
}
}
main() {
Stream<String> result = createStream();
Future future = result.forEach((element) {
print(element);
});
print('future开始!');
}
3.2.2 通过流命名构造函数创建流
根据已有的数据序列创建流序列。
Stream.fromIterable(Iterable<T> elements)
根据Future创建流
Stream.fromFuture(Future<T> future) Stream.fromFutures(Iterable<Future<T>> futures)
3.2.3 通过StreamController创建流
其实上面(2)中创建流的方式底层都是通过StreamController来创建的。
StreamController<int> controller = StreamController();
Stream<int> stream = controller.stream;
3.3 流的订阅
3.3.1 说明
Stream可以通过listen方法来创建流的订阅,需要回调函数。返回的是订阅类StreamSubscription;
通订阅流可以监听流中的数据或事件。
3.3.2 单订阅流
默认情况下创建的流都是单订阅流,单订阅流只能被订阅一次,第二次监听会报错!监听开始之前的元素不会被订阅。但 Stream 可以通过 transform() 方法(返回另一个 Stream)进行连续调用。通过 Stream.asBroadcastStream() 可以将一个单订阅模式的 Stream 转换成一个多订阅模式的 Stream,isBroadcast 属性可以判断当前 Stream 所处的模式。
3.3.3 广播订阅
广播流允许存在任意数量的 listener,并且无论是否存在 listener,它都能产生事件,所以中途加入的 listener 不会侦听到已发生的事件。
单订阅流常用于数据的传递,广播流用于事件的分发。
3.4 常用方法
3.4.1 Sink
Sink是流控制器最基础的接口,定义了一个通用的数据接收器所需要的方法,
3.4.2 EventSink
EventSink是对Sink的扩展,添加了处理Error的方法。
3.4.3 StreamConsumer
StreamConsumer 定义了可以接受流输入的槽
3.4.4 StreamSink
StreamSink定义了可以同步或异步接收流事件的接口方法。
3.4.5 StreamController
StreamController 是流控制器的核心接口,包含了流控制器该有的大多数接口方法。
其中
stream用于向外提供创建的Stream。
sink 是该控制器关联的流的数据来源。可以使用sink.add 方法向流中添加数据。
onListen, 当控制器中的流被监听的时候,会回调该方法。
onPause, 当流的监听主动暂停的时候,会回调该方法。
onResume, 当流的监听主动恢复监听的时候,会回调该方法。
onCancel,当流的监听取消监听的时候,会回调该方法。
流控制器中有两个工厂构造函数,分别构造了单订阅,和广播流以及同步和异步流。
3.5 案例
3.5.1 基本使用
import 'dart:async';
/// stream 主要是为了实现消息通信方案
void main(){
StreamController controller = StreamController();
controller.stream.listen((event) {
print("接受到数据:$event");
});
// //单一订阅模式下二次监听报错
// controller.stream.listen((event) {
// print("接受到数据:$event");
// });
controller.sink.add("abc");
controller.sink.add("123");
}
3.5.2 订阅模式-广播
import 'dart:async';
/// stream 主要是为了实现消息通信方案
void main(){
StreamController controller = StreamController();
controller.stream.listen((event) {
print("接受到数据:$event"); });
// //单一订阅模式下二次监听报错
// controller.stream.listen((event) {
// print("接受到数据:$event");
// });
controller.sink.add("abc");
controller.sink.add("123");
}
3.5.3 stream 创建方案
import 'dart:async';
/// stream 创建方案
void main(){
testFromFuture();
// testFromFutures();
// testFromIterable();
// testPeriodic1();
// testPeriodic2();
}
Future<String> getData(){
return Future.delayed(Duration(seconds: 5),(){
return "当前时间:${DateTime.now()}";
});
}
void testFromFuture(){
Stream.fromFuture(getData())
.listen((event) {
print("event:$event");
})
.onDone(() {
print("done ");
});
}
void testFromFutures(){
Stream.fromFutures([getData(),getData(),getData()])
.listen((event) {
print("event:$event");
})
.onDone(() {
print("done ");
});
}
void testFromIterable(){
var datas = [1,'hello',2,'he',5];
Stream.fromIterable(datas)
.listen((event) {
print("event:$event");
})
.onDone(() {
print("done ");
});
}
void testPeriodic1(){
Duration interval = Duration(seconds: 2);
int i = 0;
Stream.periodic(interval)
.listen((event) {
print("event:$event");
})
.onDone(() {
print("done ");
});
}
void testPeriodic2(){
Duration interval = Duration(seconds: 2);
int i = 0;
//take(5) 指定最大提取
Stream.periodic(interval,(data)=>data)
.listen((event) {
print("event:$event");
})
.onDone(() {
print("done ");
});
}
3.5.4 API使用
import 'dart:async';
/// stream 创建方案
void main() {
// testTake();
// testWhile();
// testWhere();
// testDistinct();
// testSkip();
testSkipWhile();
}
void testTake() {
Duration interval = Duration(seconds: 1);
int i = 0;
//take(5) 指定最大提取
Stream.periodic(interval, (data) => data).take(5).listen((event) {
print("event:$event");
}).onDone(() {
print("done ");
});
}
///循环
void testWhile() {
Duration interval = Duration(seconds: 1);
int i = 0;
//take(5) 指定最大提取
Stream.periodic(interval, (data) => data).takeWhile((element) {
print("takeWhile element:$element");
return element <= 5;
}).listen((event) {
print("event:$event");
}).onDone(() {
print("done ");
});
}
/// 条件过滤
void testWhere() {
Duration interval = Duration(seconds: 2);
int i = 0;
//take(5) 指定最大提取
Stream.periodic(interval, (data) => data)
.takeWhile((element) {
return element <= 5;
})
.where((data) => data % 2 == 0)
.listen((event) {
print("event:$event");
})
.onDone(() {
print("done ");
});
}
/// 去重
void testDistinct() {
var data = [1, 2, 'a', 'a', 1, 1, 3, 4];
Stream.fromIterable(data).distinct().listen((event) {
print("testDistinct listen:$event");
}).onDone(() {
print("testDistinct done");
});
}
void testSkip() {
var data = [1, 2, 'a', 'a', 1, 1, 3, 4];
Stream.fromIterable(data).skip(2).listen((event) {
print("testDistinct listen:$event");
}).onDone(() {
print("testDistinct done");
});
}
void testSkipWhile() {
Stream.periodic(Duration(seconds: 1),(data) => data)
.takeWhile((element) => element <= 6)
.skipWhile((element){
return element <= 3;
}).listen((event) {
print("testDistinct listen:$event");
}).onDone(() {
print("testDistinct done");
});
}