【Java基础(高级篇)】响应式编程

news2025/1/2 4:01:19

文章目录

  • 1. 概述
  • 2. stream 流式编程
  • 3. Reactive-Stream
  • 4. 响应式编程
  • 5. Reactor
    • 5.1 Mono和Flux
    • 5.2 subscribe()
    • 5.3 多线程
    • 5.4 常用操作API示例
    • 5.5 错误处理
    • 5.6 Sinks 工具类

1. 概述

本章将从响应式编程的开始,从 stream 开始逐步递进,如对流式编程或响应式编程十分熟悉的可直接跳过对应小节。本章内容因目前使用有限,仅供参考,目前也不必花费过多时间在该章内容上,待未来使用时再深入研究即可。本章内容可作为 webflux 学习的前置知识。

2. stream 流式编程

最佳实战:凡是写for循环处理数据的统一全部用StreamAPI进行替换

Stream所有数据和操作被组合成流管道流管道组成:

  • 一个数据源(原材料)
  • 零或多个中间操作(加工)
  • 一个终止操作(最终产物)

流的创建:主要有集合的 of 方法,以及 Stream.of

中间操作:可通过查看对应 api 源码注释看具体是中间操作还是终止操作, intermediate operation

  • filter :过滤
  • map :映射,一对一
  • flatMap :散列,一对多
filter、
map、mapToInt、mapToLong、mapToDouble
flatMap、flatMapToInt、flatMapToLong、flatMapToDouble
mapMulti、mapMultiToInt、mapMultiToLong、mapMultiToDouble、
parallel、unordered、onClose、sequential
distinct、sorted、peek、limit、skip、takeWhile、dropWhile

终止操作terminal operation ,必须调用终止操作才会真正执行。

forEach、forEachOrdered、toArray、reduce、collect、toList、min、
max、count、anyMatch、allMatch、noneMatch、findFirst、findAny、iterator

一个使用示例:

public static void main(String[] args) {
    Stream<Integer> stream = Stream.of(1, 2, 3, 4, 5, 6, 7, 8);
    List<Integer> list = stream.filter(i -> i % 2 == 0).peek(System.out::println).toList();
    System.out.println(list);
}

流式编程就像制定工厂流水线,开发人员定制好流水线的原材料,各个加工步骤,最终产物。这条流水线完全制定好才会开始运行并产出。

3. Reactive-Stream

Reactive Streams是JVM面向流的库的标准和规范,是 jdk9 中的 API,方便本地开发基于异步、消息驱动的全事件回调系统:响应式系统。

API Components:注意,使用这些组件是天然异步且由 ForkJoinPool 线程池启用执行,所以程序最后调用 System.in.read() 来控制主线程不完全结束

  • Publisher :发布者接口;产生数据流
  • Subscriber :订阅者接口; 消费数据流
  • Subscription :订阅关系;订阅关系是发布者和订阅者之间的关键接口。订阅者通过订阅来表示对发布者产生的数据的兴趣。订阅者可以请求一定数量的元素,也可以取消订阅
  • Processor :处理器是同时实现了发布者和订阅者接口的组件。它可以接收来自一个发布者的数据,进行处理,并将结果发布给下一个订阅者。处理器在Reactor中充当中间环节,代表一个处理阶段,允许在数据流中进行转换、过滤和其他操作

数据流向Publisher ( dataBuffer) -> N 个 Processor -> Subscriber

public static void main(String[] args) throws Exception {
		// 1. 定义发布者:可发布消息
		try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()){
			// 2. 定义订阅者:订阅者可订阅发布者发布的消息
			Flow.Subscriber<String> subscriber = new Flow.Subscriber<>() {

				// 保存绑定关系
				private Flow.Subscription subscription;

				// 绑定订阅消息时触发
				@Override
				public void onSubscribe(Flow.Subscription subscription) {
					this.subscription = subscription;
					System.out.println("订阅事件发生了");
					subscription.request(1); // 背压模式,订阅者向发布者请求发布信息
					System.out.println("订阅者线程:" + Thread.currentThread()); // Thread[ForkJoinPool.commonPool-worker-1,5,main]
				}

				@Override
				public void onNext(String item) {
					System.out.println("本轮:" + item);
					subscription.request(1);
					if (item.equals("原材料0")) {
//						throw new RuntimeException("自控异常");
					}
					System.out.println("订阅者线程Next:" + Thread.currentThread()); // Thread[ForkJoinPool.commonPool-worker-1,5,main]
				}

				@Override
				public void onError(Throwable throwable) {
					System.out.println("异常了:" + throwable.getMessage());
				}

				@Override
				public void onComplete() {
					System.out.println("完成了");
				}
			};
			// 3. 发布者的订阅者列表中添加这名订阅者,后续发布信息会发送给订阅者
			publisher.subscribe(subscriber);

			// 4. 发布者发布消息
			for (int i = 0; i < 10; i++) {
				publisher.submit("原材料" + i);
			}

			System.out.println("主线程:" + Thread.currentThread()); // Thread[main,5,main]

			publisher.close(); // 这样才会回调 onComplete 方法

			System.in.read();
		}
	}

响应式编程:通常作为观察者模式的拓展,一般会使用线程池、DataBuffer

  • 底层:基于数据缓冲队列 + 消息驱动模型 + 异步回调机制
  • 编码:流式编程 + 链式调用 + 声明式API
  • 效果:优雅全异步 + 消息实时处理 + 高吞吐量 + 占用少量资源

发布订阅模式

在这里插入图片描述

响应式

在这里插入图片描述

4. 响应式编程

响应式编程是一种关注于数据流(data streams)和变化传递(propagation of change)的异步编程方式。 这意味着它可以用既有的编程语言表达静态(如数组)或动态(如事件源)的数据流。

  • 在响应式编程方面,微软跨出了第一步,它在 .NET 生态中创建了响应式扩展库(Reactive Extensions library, Rx)。接着 RxJava 在JVM上实现了响应式编程。后来,在 JVM 平台出现了一套标准的响应式 编程规范,它定义了一系列标准接口和交互规范。并整合到 Java 9 中(使用 Flow 类)。

  • 响应式编程通常作为面向对象编程中的“观察者模式”(Observer design pattern)的一种扩展。 响应式流(reactive streams)与“迭代子模式”(Iterator design pattern)也有相通之处, 因为其中也有 Iterable-Iterator 这样的对应关系。主要的区别在于,Iterator 是基于 “拉取”(pull)方式的,而响应式流是基于“推送”(push)方式的。

  • 使用 iterator 是一种“命令式”(imperative)编程范式,即使访问元素的方法是 Iterable 的唯一职责。关键在于,什么时候执行 next() 获取元素取决于开发者。在响应式流中,相对应的 角色是 Publisher-Subscriber,但是 当有新的值到来的时候 ,却反过来由发布者(Publisher) 通知订阅者(Subscriber),这种“推送”模式是响应式的关键。此外,对推送来的数据的操作 是通过一种声明式(declaratively)而不是命令式(imperatively)的方式表达的:开发者通过 描述“控制流程”来定义对数据流的处理逻辑。

  • 除了数据推送,对错误处理(error handling)和完成(completion)信号的定义也很完善。 一个 Publisher 可以推送新的值到它的 Subscriber(调用 onNext 方法), 同样也可以推送错误(调用 onError 方法)和完成(调用 onComplete 方法)信号。 错误和完成信号都可以终止响应式流。可以用下边的表达式描述:

    onNext x 0..N [onError | onComplete]
    

Java 提供了两种异步编程方式:

  • 回调(Callbacks) :异步方法没有返回值,而是采用一个 callback 作为参数(lambda 或匿名类),当结果出来后回调这个 callback。常见的例子比如 Swings 的 EventListener。(存在回调地狱问题)
  • Futures :异步方法 立即 返回一个 Future,该异步方法要返回结果的是 T 类型,通过 Future封装。这个结果并不是 立刻 可以拿到,而是等实际处理结束才可用。比如, ExecutorService 执行 Callable 任务时会返回 Future 对象。(当对 Future 对象最终调用 get() 方法时,仍然会导致阻塞,并且缺乏对多个值以及更进一步对错误的处理

回调模式是简单的,但是缺点 是在复杂的处理逻辑中,回调中会层层嵌入回调,导致 回调地狱(Callback Hell) 。Reactor 提供了丰富的编排操作,从而代码直观反映了处理流程,并且所有的操作保持在同一层次 (尽量避免了嵌套)。

5. Reactor

reactor是基于Reactive Streams的第四代响应式库规范,用于在JVM上构建非阻塞应用程序。

  • 完全非阻塞的,并提供高效的需求管理。它直接与Java的功能API、CompletableFuture、Stream和Duration交互
  • Reactor提供了两个响应式和可组合的API,Flux[N]Mono[0|1]
  • 适合微服务,提供基于netty背压机制的网络引擎(HTTP、TCP、UDP)
<dependencyManagement> 
    <dependencies>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-bom</artifactId>
            <version>2023.0.0</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

<dependencies>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-core</artifactId>
    </dependency>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-test</artifactId>
        <scope>test</scope>
    </dependency>
    <!-- 单元测试依赖 -->
    <dependency>
        <groupId>org.junit.jupiter</groupId>
        <artifactId>junit-jupiter-api</artifactId>
        <version>5.10.1</version>
        <scope>test</scope>
    </dependency>
</dependencies>

5.1 Mono和Flux

  • Flux:N个元素的流
  • Mono :0 | 1 个元素的流

响应式流:元素(内容) + 信号(完成/异常)

public class Main {

	public static void main(String[] args) throws Exception {
		Flux<Integer> just = Flux.just(1, 2, 3, 45, 0, 8);
		just.subscribe(System.out::println); // 逐个遍历打印
		just.subscribe(new BaseSubscriber<>() {
			@Override
			protected void hookOnNext(Integer value) {
				System.out.println(String.valueOf(value) + Thread.currentThread()); // Thread[main,5,main]
			}
		});
		System.out.println(Thread.currentThread()); // Thread[main,5,main]
	}
}

5.2 subscribe()

该方法传入参数可自定义消费者或订阅者,订阅者一般可继承 BaseSubscriber ,另外可传三个参数分别定义信号感知回调,分别是正常流元素消费,感知异常,感知正常结束。

Flux.just(1, 2, 3, 45, 0, 8).subscribe(
    v -> System.out.println("v=" + 10 / v), // 流元素消费
    throwable -> System.out.println("throwable=" + throwable.getMessage()), // 感知异常结束
    () -> System.out.println("感知流结束") // 感知正常结束
);

doOnXXXBaseSubscriber 以及取消流,订阅者推荐直接继承 BaseSubscriber

可理解为该方法传入的即最终消费者或订阅者,而中间调用的一系列方法是中间处理过程

public static void main(String[] args) throws Exception {
    Flux.just(1, 2, 3, 45, 0, 8).doOnNext(value -> {
        System.out.println("一起玩" + value + Thread.currentThread());
    }).subscribe(new BaseSubscriber<>() {
        @Override
        protected void hookOnSubscribe(Subscription subscription) {
            System.out.println("绑定成功" + Thread.currentThread());
            request(1); // 向发布者请求 1 次数据,n 表示请求 n 次数据
            // requestUnbounded(); // 请求无限次数据,用了该方法则 onNext 中无需再写 request(1)
        }

        @Override
        protected void hookOnNext(Integer value) {
            System.out.println("当前数据:" + value + Thread.currentThread());
            if (value == 45) {
                cancel(); // 取消流
            }
            request(1); // 继续要数据
        }
    });
}

输出:

绑定成功Thread[main,5,main]
一起玩1Thread[main,5,main]
当前数据:1Thread[main,5,main]
一起玩2Thread[main,5,main]
当前数据:2Thread[main,5,main]
一起玩3Thread[main,5,main]
当前数据:3Thread[main,5,main]
一起玩45Thread[main,5,main]
当前数据:45Thread[main,5,main]

5.3 多线程

默认订阅者都是使用当前线程执行,而发布者默认与订阅者使用同一线程,可使用 publishOnsubscribeOn 指定对应 Scheduler

  • publishOn :指定后续执行的线程,影响调用位置起到后续的线程
  • subscribeOn :指定源到后续的所有线程,但不会影响 publishOn 指定的逻辑
@Test
public void testThread() throws IOException {
    System.out.println(Thread.currentThread().getName());

    Scheduler publisherScheduler = Schedulers.newParallel("Publisher");
    Scheduler subscriberScheduler = Schedulers.newParallel("Subscriber");

    Flux.range(1, 3)
        .doOnNext(item -> System.out.println("Publisher Default:" + item + Thread.currentThread().getName()))
        .publishOn(publisherScheduler)
        .doOnNext(item -> System.out.println("Publisher publishOn:" + item + Thread.currentThread().getName()))
        .subscribeOn(subscriberScheduler)
        .doOnNext(item -> System.out.println("Publisher subscribeOn:" + item + Thread.currentThread().getName()))
        .subscribe(item -> System.out.println("Subscriber default:" + item + Thread.currentThread().getName()));

    System.in.read();
}

执行的输出结果是:

main
Publisher Default:1Subscriber-1
Publisher Default:2Subscriber-1
Publisher Default:3Subscriber-1
Publisher publishOn:1Publisher-2
Publisher subscribeOn:1Publisher-2
Subscriber default:1Publisher-2
Publisher publishOn:2Publisher-2
Publisher subscribeOn:2Publisher-2
Subscriber default:2Publisher-2
Publisher publishOn:3Publisher-2
Publisher subscribeOn:3Publisher-2
Subscriber default:3Publisher-2

还有延迟发布方法 delayElements() ,如下将以间隔 500 毫秒的时间逐个发送元素

@Test
public void testDelayElements() throws IOException {
    Flux.range(0,5).delayElements(Duration.ofMillis(500)).log().subscribe();
    System.in.read();
}

并发流多线程分批处理示例:将 1000 条数据,按每 10 个一组分片处理,用 4个线程跑该段逻辑,用 runOn 指定线程池

@Test
public void testParallel() throws IOException {
    Flux.range(1,1000)
        .buffer(10)
        .parallel(4)
        .runOn(Schedulers.newParallel("线程池"))
        .log()
        .flatMap(Flux::fromIterable)
        .collectSortedList(Integer::compareTo)
        .subscribe(v -> System.out.println("v=" + v));
    System.in.read();
}

5.4 常用操作API示例

public class MainTest {

	/**
	 * 日志显示:log(),下面为对应打印的解释
	 * onSubscribe:流被订阅
	 * request(unbounded):请求无限数据
	 * onNext(2): 每个数据到达
	 * onComplete:流结束
	 */
	@Test
	public void testLog() {
		Flux.just(1, 2, 3, 45, 0, 8)
//				.log()
				.filter(i -> i % 2 == 0)
				.log()
				.subscribe();
	}

	/**
	 * 同步环境 生成 0~10 的序列
	 * sink 是通道,sink.next(obj) 向下游发布 obj 对象
	 * sink.complete() 迭代完成
	 */
	@Test
	public void testGenerate() {
		Flux.generate( () -> 0, (state, sink) -> {
			sink.next(state);
			if (state == 10) sink.complete();
			return state + 1;
		}).log().subscribe();
	}

	/**
	 * 多线程环境 create:常用于监听事件,并将事件信息传入管道
	 * [ INFO] (main) onSubscribe(FluxCreate.BufferAsyncSink)
	 * [ INFO] (main) request(unbounded)
	 * 做家务
	 * [ INFO] (main) onNext(家务)
	 */
	@Test
	public void testCreate() {
		interface Listener {
			void afterDoSomeThing(String event);
		};
		class DoSomeThing {

			public void doSomeThing(String thing) {
				System.out.println("做" + thing);
				for (Listener listener : afterListenerList) {
					listener.afterDoSomeThing(thing);
				}
			}

			private final List<Listener> afterListenerList = new ArrayList<>();

			public void register(Listener listener) {
				afterListenerList.add(listener);
			}
		}

		DoSomeThing doSomeThing = new DoSomeThing();
		Flux.create(sink -> doSomeThing.register(sink::next)).log().subscribe();
		doSomeThing.doSomeThing("家务");
	}

	/**
	 * 当不使用缓冲区时,每有 1 个元素便会直接发给订阅者
	 * buffer(n):缓冲区,凑够数量 n 再发送给订阅者,订阅者接收到的将是 n 个元素组成的 ArrayList 集合
	 * request(n)含义:找发布者请求 n 次数据,每次请求 bufferSize 个数据,总共能得到 n * bufferSize 个数据
	 * [ INFO] (main) onNext([1, 2, 3])
	 * [ INFO] (main) onNext([4, 5, 6])
	 * [ INFO] (main) onNext([7, 8, 9])
	 * [ INFO] (main) onNext([10])
	 */
	@Test
	public void testBuffer() {
		Flux.range(1, 10).buffer(3).log().subscribe();
	}

	/**
	 * 预请求
	 * limitRate(n):首次 request(n),请求了 75% * n(四舍五入) 次后直接请求 request(75% * n) ,且后续均 request(75% * n)
	 */
	@Test
	public void testLimitRate() {
		Flux.range(1, 10).log().limitRate(4).subscribe();
	}

	/**
	 * map:一一映射
	 */
	@Test
	public void testMap() {
		Flux.range(1, 10).map(value -> value + 1).log().subscribe();
	}

	/**
	 * handle:类似于 map 可用于实现一对一映射,但更加强大的是sink.next()可以传不同类型
	 */
	@Test
	public void testHandle() {
		Flux.range(1, 10)
				.handle((value, sink) -> {
					if (value % 2 == 0) sink.next(value);
					else sink.next("字符串" + value);
				}).log().subscribe();
	}

	/**
	 * 扁平化处理:{ "张 三", "李 四"} 变为 { "张", "三", "李", "四"}
	 */
	@Test
	public void testFlatMap() {
		Flux.just("张 三", "李 四")
				.flatMap(item -> {
					String[] strings = item.split(" ");
					return Flux.fromArray(strings);
				})
				.log()
				.subscribe();
	}

	/**
	 * 流连接
	 * concatWith:两个流元素类型要求一致
	 * concat:静态方法,元素类型无要求
	 * concatMap:将流中单个元素映射成其他流,再将所有流组合成一个流
	 */
	@Test
	public void testConcat() {
		Flux.just(1, 2).concatWith(Flux.just(3,4)).log().subscribe();

		Flux.concat(Flux.just(1, 2), Flux.just("a",4)).log().subscribe();

		Flux.just(1, 2).concatMap(i -> Flux.just("key" + i, "value" + i)).log().subscribe();
	}

	/**
	 * 把流变形成新数据
	 * transform:无状态转换; 原理,无论多少个订阅者,transform只执行一次
	 * transformDeferred:有状态转换; 每个订阅者transform都只执行一次
	 */
	@Test
	public void testTransform() {
		AtomicInteger atomicInteger = new AtomicInteger(1);
		Flux<String> flux = Flux.just("a", "b", "c").transformDeferred(
				items -> {
					System.out.println("调用次数:" + atomicInteger.getAndIncrement());
					return items.map(String::toUpperCase);
				});
		flux.subscribe(System.out::println);
		flux.subscribe(System.out::println);
	}

	/**
	 * 空流是 Flux.empty(); Flux.just(null) 会报空指针异常
	 * switchIfEmpty:如果是空流则转换成其他流
	 * defaultIfEmpty:如果是空流则传入单个元素
	 */
	@Test
	public void testEmpty() {
		Flux.empty().switchIfEmpty(Flux.empty()).defaultIfEmpty("a").log().subscribe();
	}

	/**
	 * Flux.merge:按照流中每个元素发布的时间顺序合并流
	 * Flux.mergeSequential:按照流发布的时间顺序合并流,如流1中有多个元素且流1元素最先发布,则流1中元素会被合并到最开头
	 */
	@Test
	public void testMerge() throws IOException {
		Flux.merge(
				Flux.range(0, 2).delayElements(Duration.ofMillis(300)),
				Flux.range(5, 2).delayElements(Duration.ofMillis(100)),
				Flux.range(10, 2).delayElements(Duration.ofMillis(200))
		).log().subscribe();
		System.in.read();
	}

	/**
	 * zip:将两个流压缩成元组,多余单个元素会被忽略
	 * Tuple:元组,[value1, value2]
	 */
	@Test
	public void testZip() {
		Flux.range(0, 2).zipWith(Flux.range(0,3)).log().subscribe();
	}
    
    /**
	 * 重试,会从头重试
	 */
	@Test
	public void testRetry() throws IOException {
		Flux.just(1)
				.log()
				.delayElements(Duration.ofSeconds(2))
				.timeout(Duration.ofSeconds(1))
				.retry(2)
				.onErrorReturn(999)
				.subscribe();
		System.in.read();
	}
    
    @Test
	public void testCache() throws IOException {
		Flux<Integer> cache = Flux.range(1, 10)
				.delayElements(Duration.ofSeconds(1))
				.cache(2);// 缓存,不传参表示缓存所有元素
		cache.log().subscribe(); // 会输出 1~10
		new Thread(() -> {
			try {
				TimeUnit.SECONDS.sleep(5);
			} catch (InterruptedException e) {
				throw new RuntimeException(e);
			}
            // 缓存后用子线程输出缓存的3和4,之后和上面订阅者一样使用同一线程池输出
            // 如果不使用 cache 缓存,则默认会每10秒逐个输出 1~10
			cache.subscribe(item -> System.out.println("子线程输出:" + item)); 
		}).start();
		System.in.read();
	}
    
	/**
	 * 阻塞式 API
	 */
	@Test
	public void testBlock() {
		List<Integer> list = Flux.range(1, 1000).collectList().block();
		System.out.println(list);
	}
    
    /**
	 * 响应式中的ThreadLocal,响应式编程中 ThreadLocal机制失效
	 */
	@Test
	public void testContextAPI () {
		Flux.just(1,2,3)
				.transformDeferredContextual((flux,context)->{
					System.out.println("flux = " + flux);
					System.out.println("context = " + context);
					return flux.map(i->i+"==>"+context.get("prefix"));
				})
				//上游能拿到下游的最近一次数据
				.contextWrite(Context.of("prefix","哈哈"))
				//ThreadLocal共享了数据,上游的所有人能看到; Context由下游传播给上游
				.subscribe(v-> System.out.println("v = " + v));
	}

}

5.5 错误处理

前面有介绍过在订阅者处可使用 subscribe() 第二个参数感知错误,这里介绍更多错误处理的 API ,主要为 onErrorXXX()

onErrorReturn :捕获异常,返回默认值

  • onErrorReturn(T fallbackValue) :错误时返回值 fallbackValue ,且结束流传输,订阅者将无法感知到此次异常
  • onErrorReturn(Class<E> type, T fallbackValue) :指定返回的异常类型
  • onErrorReturn(Predicate<? super Throwable> predicate, T fallbackValue) :异常断言
@Test
public void testError() {
    Flux.just(1,0,2)
        .map(item -> 2 / item)
        .onErrorReturn(999)
        .log() // 输出 onNext(2),onNext(999),onComplete()
        .subscribe();
}

onErrorResume :捕获异常,执行兜底方法。兜底方法需返回一个流供后续继续处理,或者再抛出自定义异常(更推荐用 onErrorMap

@Test
public void testError() {
    Flux.just(1,0,2)
        .map(item -> 2 / item)
        .onErrorResume(throwable -> {
            System.out.println("异常:" + throwable.getMessage());
            return Flux.just(9,0,9);
        })
        .log() // onNext(2),onNext(9),onNext(0),cancel()
        .map(item -> 3 / item)
        .onErrorResume(throwable -> Flux.error(new RuntimeException(throwable)))
        .subscribe();
}

onErrorMap :捕获并包装成一个业务异常,并重新抛出

@Test
public void testError() {
    Flux.just(1,0,2)
        .map(item -> 2 / item)
        .onErrorMap(throwable -> new RuntimeException(throwable))
        .subscribe();
}
  • doOnError :发生异常时会执行该方法
  • onErrorContinue :发生异常后会吃掉异常继续执行
  • doFinally :必定执行
@Test
public void testError() {
    Flux.just(1,0,2,3)
        .log() // request(unbounded),onNext(1),onNext(0),request(1),onNext(2),onNext(3),onComplete()
        .map(item -> 2 / item)
        .doOnError(throwable -> {
            System.out.println("可获取到异常" + throwable.getMessage());
        })
        .onErrorContinue((throwable, item) -> {
            System.out.println("发生了异常:" + throwable.getMessage());
            System.out.println("导致异常发生的值:" + throwable.getMessage());
        })
        .doFinally(signalType -> {
            System.out.println("流信号类型" + signalType);
        })
        .subscribe();
}

5.6 Sinks 工具类

	@Test
	public void testSinks() throws InterruptedException, IOException {
//        Sinks.many(); //发送Flux数据
//        Sinks.one(); //发送Mono数据
		
		// Sinks: 接受器,数据管道,所有数据顺着这个管道往下走的

		Sinks.many().unicast(); //单播:  这个管道只能绑定单个订阅者(消费者)
		Sinks.many().multicast();//多播: 这个管道能绑定多个订阅者
		Sinks.many().replay();//重放: 这个管道能重放元素。 是否给后来的订阅者把之前的元素依然发给它;

		// 单播模式
		Sinks.Many<Object> many = Sinks.many().unicast().onBackpressureBuffer(new PriorityQueue<>(2));

		for (int i = 0; i < 5; i++) {
			many.tryEmitNext(i); // 将元素放入管道
		}

        // 单播只能订阅一次,二次订阅会报错,如允许多个订阅者可用 多播模式
		many.asFlux().subscribe(item -> System.out.println("单播模式:" + item));

		// 重放模式:底层利用队列进行缓存之前数据
		Sinks.Many<Object> limit = Sinks.many().replay().limit(2); // 缓存两个

		new Thread(() -> {
			for (int i = 0; i < 5; i++) {
				try {
					limit.tryEmitNext(i); // 将元素放入管道
					TimeUnit.SECONDS.sleep(1);
				} catch (InterruptedException e) {
					throw new RuntimeException(e);
				}
			}
		}).start();
		limit.asFlux().subscribe(item -> System.out.println("重放模式订阅1:" + item)); // 0、1、2、3、4

		TimeUnit.SECONDS.sleep(4);

		limit.asFlux().subscribe(item -> System.out.println("重放模式订阅2:" + item)); // 2、3、4

		System.in.read();
	}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1390139.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

使用 GitHub 远程仓库

使用 GitHub 远程仓库 GitHub 是最大的 Git 版本库托管商&#xff0c;是成千上万的开发者和项目能够合作进行的中心。 大部分 Git 版本库都托管在 GitHub&#xff0c;很多开源项目使用 GitHub 实现 Git 托管、问题追踪、代码审查以及其它事情。本篇文章主要带大家上手 GitHub …

element-ui表单验证时undefined (reading ‘validate‘)

我搜索了一下&#xff0c;大部分都是说不仔细造成的&#xff0c;但是我一一对照了 1、el-form中我定义了ref&#xff0c;并且ref前面也是没有加冒号的 2、el-form中也绑定了rules&#xff0c;并且rules前面加了冒号 2、el-form-item我是加了prop的&#xff0c;并且和rules中…

程序员客栈发布《2023程序员自由职业报告》

自2020年以来&#xff0c;自由职业者的生态系统迅速繁荣&#xff0c;从而塑造了一个全新的职业发展模式。2023年&#xff0c;经济形势严峻&#xff0c;但灵活就业形式越来越流行&#xff0c;包括自由职业、远程办公和平台经济等。越来越多的人选择从事自由职业或者利用互联网平…

costmap_2d包介绍

文章目录 一. costmap_2d包介绍二. Costmap包的执行入口-- move_base中调用三. Costmap包的初始化以及维护3.1 Costmap2DROS类3.1.1 构造函数 Costmap2DROS::Costmap2DROS3.1.2 地图更新线程 Costmap2DROS::mapUpdateLoop3.1.3 地图更新 Costmap2DROS::updateMap()3.1.4 激活各…

【HuggingFace Transformer库学习笔记】基础组件学习:Datasets

基础组件——Datasets datasets基本使用 导入包 from datasets import *加载数据 datasets load_dataset("madao33/new-title-chinese") datasetsDatasetDict({train: Dataset({features: [title, content],num_rows: 5850})validation: Dataset({features: [titl…

高级定时器

本节主要介绍以下内容&#xff1a; 定时器简介 高级定时器功能框图讲解 一、定时器简介 定时器功能 &#xff1a;定时、输出比较、输入捕获、断路输入 定时器分类 &#xff1a;基本定时器、通用定时器、高级定时器 定时器资源 &#xff1a;F103有2个高级定时器、4个通…

C#编程-实现委托

实现委托 委托是可以存储对方法的引用的对象。在C#中,委托允许您动态地改变类中方法的引用。 考虑咖啡售货机的示例,它配置不同口味的咖啡,例如卡布奇诺咖啡和黑咖啡。在选择所需口味的咖啡时,售货机决定混合各种成分,例如奶粉、咖啡粉、热水、卡布奇诺咖啡粉。所有的材…

构建一个最新版本 Maven 项目

文章目录 构建一个最新版本 Maven 项目1. 所用各种软件的版本2. 踩过的坑3. 构建项目过程4. 项目打包方式 构建一个最新版本 Maven 项目 截止 2024 年 1 月 13 日&#xff0c;Apache 官网上 Maven 的最新安全版本为 3.9.6&#xff0c;下载、安装及配置方法见之前的博客&#x…

TIMESAT提取物候信息操作流程

TIMESAT提取物候信息操作流程 软件环境&#xff1a;Matlab R2014aTIMESAT3.2 数据介绍&#xff1a;MODIS A3或Q1的NVI&#xff08;NDVI&#xff09;均测试过这个流程&#xff0c;可行&#xff08;大拇指&#xff09;。 TIMESAT输入n年数据&#xff0c;提取n-1年的物候参数。通…

jmeter--4.参数化的方式

目录 1. 用户定义的变量 2. 用户参数 3. 函数助手 3.1 time获取当前时间 3.2 Random随机数 3.3 随机字符串函数 3.4 字符串变更为大写 4. CSV数据文件设置 5. 接口关联--正则和json等提取 1. 用户定义的变量 线程组->添加->配置元件->用户定义的变量 引用方…

【设计模式-06】Observer观察者模式

简要说明 事件处理模型 场景示例&#xff1a;小朋友睡醒了哭&#xff0c;饿&#xff01; 一、v1版本(披着面向对象的外衣的面向过程) /*** description: 观察者模式-v1版本(披着面向对象的外衣的面向过程)* author: flygo* time: 2022/7/18 16:57*/ public class ObserverMain…

MySQL 从零开始:05 MySQL 数据类型

文章目录 1、数值类型1.1 整形数值1.2 浮点型数值1.3 布尔值 2、日期和时间类型3、字符串类型3.1 CHAR 和 VARCHAR3.2 BINARY 和 VARBINARY3.3 BLOB 和 TEXT3.4 ENUM 类型3.5 SET 类型 4、空间数据类型5、JSON 数据类型5、JSON 数据类型 前面的讲解中已经接触到了表的创建&…

这款软件轻松解决你图片水印问题

随着数字时代的到来&#xff0c;图片已经成为我们生活中不可或缺的一部分。然而&#xff0c;很多时候&#xff0c;我们会遇到带有水印的图片&#xff0c;这不仅影响了图片的视觉效果&#xff0c;还可能遮挡了重要的内容。这时&#xff0c;一款专业的去水印工具就显得尤为重要。…

博途PLC增量式PID和脉冲轴组合控制阀门开度(算法介绍)

这篇博客我们以S7-1200PLC平台来举例,介绍我们的PID闭环控制器如何控制脉冲轴实现阀门角度控制。SMART PLC PID控制器控制伺服驱动器实现关节角度控制详细内容请参考下面文章: https://rxxw-control.blog.csdn.net/article/details/129658364https://rxxw-control.blog.csdn…

Python基础语法(中)—— python列表、字符串、函数

文章目录 5. python中的列表5.1 列表的初始化5.1.1 直接初始化5.1.2 通过append函数初始化5.1.3 通过for语句初始化列表长度和每个位置的数值 5.2访问列表元素5.3使用循环语句遍历列表5.4列表的切片操作5.5列表的复制5.6列表的运算5.7列表的常用操作5.8嵌套列表5.9列表其他小知…

【手撕C语言 第二集】初识C语言

​​ 一、变量的作用域和生命周期 作用域&#xff1a;一个变量在哪里可以使用它&#xff0c;哪里就是它的作用域。 局部变量的作用域&#xff1a;变量所在的局部范围 全局变量的作用域&#xff1a;整个工程 不管整个工程里面有多少源文件&#xff0c;都可以使用全局变量。这样…

力扣电话号码的组合

文章目录 题目说明做题思路代码实现代码解析 题目链接 题目说明 首先我们先分析一下这个题目题目中说呢先给出一个字符串这个字符串其实就是这个九键数字我们要按照要求将数字所代表的字符进行自由组合形成一个字符串并且这个字符串的长度和输入的数字字符串长度相同&#xff0…

《每天一分钟学习C语言·十二》各种指针问题

1、 int arr; int * restrict pt &arr; *pt 100; *arr 10;注&#xff1a;restrict只能修饰指针&#xff0c;被restrict修饰的指针指向一块内存后这块内存就归这个指针管理了&#xff0c;其他任何指针都不能修改这块内存的内容&#xff0c;这是一个约定&#xff0c;当…

备份和容灾讲解

备份和容灾 &#xff08;1&#xff09;容灾&#xff08;容许灾难的发生&#xff09;是一种架构方案&#xff0c;包括了很多方案&#xff0c;如下 本地高可用 双活&#xff08;特指存储&#xff0c;可以理解为两端同时对外提供服务&#xff09;&#xff1a;通过一个双写模块把…

20240115-插入删除 GetRandom O(1)

题目要求 实现 RandomizedSet 类&#xff1a; RandomizedSet() 初始化 RandomizedSet 对象。bool insert(int val) 将不存在的项目 val 插入随机集合。如果项目不存在&#xff0c;则返回 true&#xff0c;否则返回 false。bool remove(int val) 从集合中删除项目 val&#xf…