【2024】JAVA实现响应式编程Reactor具体API文档使用说明

news2024/12/26 12:01:30

目录💻

  • 前言
  • 一、简介
    • 1、响应式编程概述
      • 背景知识
      • 什么是响应式编程
      • 具体概述
      • 应用场景:
      • 常用的库和框架
  • 二、 Reactor实现响应式编程
    • 1、Flux 和 Mono介绍
        • Flux:
        • Mono:
        • Flux 和 Mono 的区别:
        • Flux 和 Mono 的关系:
    • 2、常用API使用
      • 添加依赖
      • 2.1、生产流
          • 常用汇总
        • 2.1.1、直接创建
        • 2.1.2、使用Sinks工具类
      • 2.2、中间操作
          • 常用汇总
        • 2.2.1、变换数据
        • 2.2.2、Context API
        • 2.2.3、合并组合Flux
        • 2.2.3、并发控制Flux
        • 2.2.4、doOnxxx:感知事件相关的 API
        • 2.2.4、onErrorXXX:异常处理相关的 API
        • 2.2.4、其他工具 API
      • 2.3、订阅流
          • 常用汇总
          • 详情使用

前言

响应式编程 (Reactive Programming) 是一种声明式编程范式,专注于数据流和变化的传播。随着软件系统日益复杂,对高并发、实时性和弹性的需求不断增长,响应式编程正逐渐成为主流。特别从Spring Boot3开始逐渐越来越重视使用,并且Spring框架为了全面拥抱响应式编程,提供了Spring WebFlux、Spring Data Reactive等模块,为Java开发者构建响应式应用提供了强大的支持。

一、简介

1、响应式编程概述

背景知识

为了应对高并发服务器端开发场景,在2009 年,微软提出了一个更优雅地实现异步编程的方式——Reactive Programming,我们称之为响应式编程。随后,Netflix 和LightBend 公司提供了RxJava 和Akka Stream 等技术,使得Java 平台也有了能够实现响应式编程的框架。

在2017 年9 月28 日,Spring 5 正式发布。Spring 5 发布最大的意义在于,它将响应式编程技术的普及向前推进了一大步。而同时,作为在背后支持Spring 5 响应式编程的框架Spring Reactor,也进入了里程碑式的3.1.0 版本。

什么是响应式编程

响应式编程是一种面向数据流和变化传播的编程范式。这意味着可以在编程语言中很方便地表达静态或动态的数据流,而相关的计算模型会自动将变化的值通过数据流进行传播。

响应式编程基于reactor(Reactor 是一个运行在 Java8 之上的响应式框架)的思想,当你做一个带有一定延迟的才能够返回的io操作时,不会阻塞,而是立刻返回一个流,并且订阅这个流,当这个流上产生了返回数据,可以立刻得到通知并调用回调函数处理数据。

电子表格程序就是响应式编程的一个例子。单元格可以包含字面值或类似"=B1+C1"的公式,而包含公式的单元格的值会依据其他单元格的值的变化而变化。

响应式传播核心特点之一:变化传播:一个单元格变化之后,会像多米诺骨牌一样,导致直接和间接引用它的其他单元格均发生相应变化。

具体概述

响应式编程 (Reactive Programming) 是一种声明式的编程范式,它关注于数据流和变化的传播。这意味着可以通过定义数据流和它们之间的关系来构建应用程序,当数据发生变化时,应用程序会自动做出响应。

核心概念:

  1. Publisher:发布者;产生数据流
  2. Subscriber:订阅者;消费数据流
  3. Subscription:订阅关系
    • 订阅关系是发布者和订阅者之间的关键接口。订阅者通过订阅来表示对发布者产生的数据的兴趣。订阅者可以请求一定数量的元素,也可以取消订阅。
  4. Processor:处理器
    • 处理器是同时实现了发布者和订阅者接口的组件,它可以接收来自一个发布者的数据,进行处理,并将结果发布给下一个订阅者。处理器在Reactor中充当中间环节,代表一个处理阶段,允许你在数据流中进行转换、过滤和其他操作。

这种模型遵循Reactive Streams规范,确保了异步流的一致性和可靠性。
在这里插入图片描述

应用场景:

  • 实时数据流处理: 例如股票交易系统、传感器数据监控、网络游戏等。
  • 用户界面开发: 例如响应式Web应用、移动应用等。
  • 微服务架构: 响应式编程可以帮助构建更加弹性和可扩展的微服务系统。
  • 大数据处理: 响应式编程可以用于处理大规模数据集,例如使用Spark Streaming或Apache Flink。

常用的库和框架

  • RxJava: Java的响应式扩展库,提供了丰富的操作符和工具。
  • Reactor: Java的响应式编程框架,由Pivotal开发,是Spring WebFlux的基础。
  • Kotlin Coroutines: Kotlin的协程库,提供了轻量级的异步编程模型,可以与响应式编程框架集成。
  • Spring WebFlux: Spring框架的响应式Web框架,用于构建响应式Web应用。

二、 Reactor实现响应式编程

目前java要实现响应式编程主要就是使用Reactor进行使用, Reactor 也是 Spring WebFlux 的基础,所以在使用上可以与 Spring 框架无缝集成,构建响应式 Web 应用。使用下面我们要介绍的就是Reactor的常用API的使用,基本上学会了Reactor,就可以直接使用Spring WebFlux 构建高性能、可扩展的 Web 应用。

并且使用Spring WebFlux 构建的非阻塞 I/O 和事件驱动模型可以充分利用系统资源,可以有效的提高应用程序的性能和效率。相对于Spring MVC 这种阻塞式来说在性能上会得到很大的提升,

而且响应式编程可以更好地处理并发和异步操作,提高系统的弹性和可扩展性。

1、Flux 和 Mono介绍

在 Reactor 中,Flux 和 Mono 是两个核心组件,用于表示异步数据流。它们都实现了 Reactive Streams 规范中的 Publisher 接口,可以发出零个或多个元素。

Flux:
  • 表示一个可以发出零个或多个元素的异步序列。

  • 可以用于表示任何类型的数据流,例如用户输入事件、传感器数据、数据库查询结果等。

  • 提供了丰富的操作符,用于对数据流进行转换、过滤、合并、延迟等操作。

    Flux<String> names = Flux.just("Alice", "Bob", "Charlie");
    
    names.subscribe(System.out::println); // 输出: Alice Bob Charlie
    
    
Mono:
  • 表示一个最多发出一个元素的异步序列(1个或者0个)。
  • 通常用于表示单个结果,例如数据库查询结果、HTTP 请求响应等。
  • 也提供了一些操作符,用于对结果进行转换和处理。
Mono<String> name = Mono.just("Alice");

name.subscribe(System.out::println); // 输出: Alice

Flux 和 Mono 的区别:
特性FluxMono
发射元素个数零个或多个零个或一个
使用场景表示数据流表示单个结果
操作符提供丰富的操作符提供一些操作符
Flux 和 Mono 的关系:
  • Mono 可以看作是 Flux 的特例,它最多只发出一个元素。
  • 可以使用 Flux.from(mono) 将 Mono 转换为 Flux。
  • 可以使用 mono.flux() 将 Mono 转换为 Flux。
  • 可以使用 flux.single() 或 flux.singleOrEmpty() 将 Flux 转换为 Mono,但前提是 Flux 必须只包含一个元素或为空。

2、常用API使用

在具体使用上Flux和Mono的API使用都差不多,在分类上,我们可以大致分为三类,用来产生生产流的和中间做转换的,还有结束流的

添加依赖

            <dependency>
                <groupId>io.projectreactor</groupId>
                <artifactId>reactor-core</artifactId>

            </dependency>
            <dependency>
                <groupId>io.projectreactor</groupId>
                <artifactId>reactor-test</artifactId>
                <scope>test</scope>
            </dependency>

2.1、生产流

也就是创建Flux和Mono的API

常用汇总
方法作用
just(T... data)创建一个发射指定元素的 Flux
fromIterable(Iterable<? extends T> it)从 Iterable(集合) 创建一个 Flux
fromArray(T[] array)从数组创建 Flux
fromStream(Stream<? extends T> s)从Stream中创建 Flux,意味着我们可以把jdk8的Stream也直接进行无缝衔接转为Flux
range(int start, int count)创建发射指定范围的 Flux,一般可以用作计数器或者读秒配合delayElements()
empty()创建一个不发射任何元素的 Flux,mono也一样。
Flux<T> error(Throwable error创建一个发射错误信号的 Flux
2.1.1、直接创建
  1. Flux<T> just(T... data):创建一个发射指定元素的 Flux

        public void just() throws IOException {
            Flux<String> just = Flux.just("a", "b", "c", "d", "e");
            just.subscribe(System.out::println);
    
            System.in.read();
        }
    
    /*得到的结果		
    a
    b
    c
    d
    e
    
  2. Flux<T> fromIterable(Iterable<? extends T> it) :从 Iterable(集合) 创建一个 Flux

             
        public void fromIterable() throws IOException {
            List<String> list = Arrays.asList("a", "b", "c", "d", "e");
    
            Flux<String> just = Flux.fromIterable(list);
            just.subscribe(System.out::println);
    
            System.in.read();
        }
    /*得到的结果	    
    a
    b
    c
    d
    e
    
  3. Flux<T> fromArray(T[] array):从数组创建 Flux

        public void fromArray() throws IOException {
        String[] arrays = {"a", "b", "c", "d", "e"};
    
        Flux<String> just = Flux.fromArray(arrays);
        just.subscribe(System.out::println);
    
        System.in.read();
    	}
    /*得到的结果	    
    a
    b
    c
    d
    e
    
  4. Flux<T> fromStream(Stream<? extends T> s):从Stream中创建 Flux,意味着我们可以把jdk8的Stream也直接进行无缝衔接转为Flux

    	public void fromStream() throws IOException {
        	Stream<String> stream = Stream.of("a", "b", "c", "d", "e");
    
        	Flux<String> just = Flux.fromStream(stream);
        	just.subscribe(System.out::println);
    
        	System.in.read();
    	}
    /*得到的结果	    
    a
    b
    c
    d
    e
    
  5. Flux<Integer> range(int start, int count):创建发射指定范围的 Flux,一般可以用作计数器或者读秒配合delayElements()

        public void range() throws IOException {
            Flux<Integer> just = Flux.range(0,5)
                    .delayElements(Duration.ofSeconds(1)); //一秒发射一次
    
            just.subscribe(System.out::println);
    
            System.in.read();
        }
    /* 得到的结果
    0
    1
    2
    3
    4
    
  6. Flux<T> empty():创建一个不发射任何元素的 Flux,mono也一样。

        public void empty() throws IOException {
            Mono<Integer> mono = Mono.empty();
            Flux<Integer> flux = Flux.empty();
            flux.subscribe(System.out::println);
    
            System.in.read();
        }
    /* 订阅者不会接受到任何结果
    
  7. Flux<T> error(Throwable error):创建一个发射错误信号的 Flux

        public void error() throws IOException {
    
            Flux<Integer> just = Flux.error(new RuntimeException("执行错误!"));
            just.subscribe(System.out::println);
    
            System.in.read();
        }
    /*
    reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.RuntimeException: 执行错误!
    
2.1.2、使用Sinks工具类

Sinks 是一个用于创建各种类型的 Sink工厂类,然后在通过asFlux方法把Sink转为Flux。具体分别为:

  1. One<T> one():创建一个只接收一个数据的 Sink。

        public void one(){
            Sinks.One<Integer> one = Sinks.one();
            one.tryEmitValue(1);
            one.tryEmitValue(2);
            
            Mono<Integer> mono = one.asMono();
            mono.subscribe(System.out::println);
        }
    /* 不管发射多少个消息到one里都只会读取一个
    1
    
  2. many():创建一个可以接收多个数据的 Sink。

    • unicast():单播,只能有一个消费者订阅转换后到flux,第二个去订阅时会报错
        public void unicast() throws IOException {
            Sinks.Many<Integer> sink = Sinks.many()
                    .unicast()
                    .onBackpressureBuffer();
    // 生产消息
            sink.tryEmitNext(1); 
            sink.tryEmitNext(2);
    
            Flux<Integer> flux = sink.asFlux();
            flux.subscribe(v->System.out.println("p1:"+v));
    	//如果有第二个消费者来消费消息,会直接抛出错误
            flux.subscribe(v->System.out.println("p2:"+v));
            sink.tryEmitNext(3);
            sink.tryEmitNext(4);
    
            System.in.read();
        }
    /* 执行到订阅者2时会报错
    
    • multicast():多播。
      Sinks.many() 方法创建的多播 Sink 可以选择不同的策略解决背压问题。不同策略之间的区别主要体现在 如何处理下游消费者无法及时处理数据的情况。

      背压 (Backpressure):在响应式编程中,背压指的是下游消费者无法及时处理上游生产者发送的数据,导致数据堆积,最终可能导致内存溢出等问题。也就是生产者生产的速度大于消费者消费的速度

      • onBackpressureBuffer():如果下游消费者无法及时处理数据,则 Sink 会将数据缓存到一个缓冲区中,直到消费者能够处理为止,缓冲区可以设置大小

            public void onBackpressureBuffer() throws IOException {
                Sinks.Many<Integer> sink = Sinks.many()
                        .multicast()
                        .onBackpressureBuffer(); //开启缓冲区
        
                sink.tryEmitNext(1);
                sink.tryEmitNext(2);
        
        
                Flux<Integer> flux = sink.asFlux();
                //在订阅之前发送的消息会先被放在缓冲区
                flux.subscribe(v->System.out.println("p1:"+v));
                sink.tryEmitNext(3);
                sink.tryEmitNext(4);
        
                flux.subscribe(v->System.out.println("p2:"+v));
                sink.tryEmitNext(5);
                sink.tryEmitNext(6);
        
                System.in.read();
            }
        /* 第二个订阅的消费者会从订阅的时间开始接收消息,前面存放缓冲区的消息无法接收到
        p1:1
        p1:2
        p1:3
        p1:4
        p1:5
        p2:5
        p1:6
        p2:6
        
      • directBestEffort():如果下游消费者无法及时处理数据,则 Sink 会尽力将数据发送给消费者,但可能会丢弃一些数据。

            public void directBestEffort() throws IOException {
                Sinks.Many<Integer> sink = Sinks.many()
                        .multicast()
                        .directBestEffort();
        
                sink.tryEmitNext(1);
                sink.tryEmitNext(2);
                sink.tryEmitNext(3);
                sink.tryEmitNext(4);
        
        
                Flux<Integer> flux = sink.asFlux();
                flux.subscribe(v->System.out.println("p1:"+v));
                sink.tryEmitNext(5);
                sink.tryEmitNext(6);
        
                flux.subscribe(v->System.out.println("p2:"+v));
                sink.tryEmitNext(7);
                sink.tryEmitNext(8);
        
                System.in.read();
            }
        /*
        p1:5
        p1:6
        p1:7
        p2:7
        p1:8
        p2:8
        
      • directAllOrNothing():如果下游消费者无法及时处理数据,则 Sink 会直接丢弃所有后续数据,并发出一个 onError 信号。操作同上

  3. empty():创建一个空的 Sink

2.2、中间操作

常用汇总
方法作用
map()将每个元素映射到另一个类型的值。
flatMap()将每个元素映射到一个新的 Publisher,并将其扁平化成一个新的 Flux
transform()transformer会立即执行传递给它的转换函数,并生成新的 Flux。
filter()根据指定条件过滤
take(long n)获取到指定长度的结果结束
skip(long skipped)从第几个元素开始获取
distinct()去重,去除重复元素
contextWrite()在整个响应式链中传递上下文信息
merge()合并多个flux,按照源里每个元素的写入的时间排序,不会保证顺序
zip()压缩多个flux,按照下标把每个flux的元素都放入一个数组
combineLatest()把多个flux按照下标把值压缩再一起进行处理
publishOn(Scheduler scheduler)并发处理,指定后续操作的执行线程池
doOnComplete(Runnable onComplete)当数据流完成时执行指定的操作,可以用于清理资源、记录日志等。
doOnEach(Consumer<? super Signal<T>> signalConsumer)每个元素(流的数据和信号)到达的时候触发
onErrorReturn()吃掉异常,消费者无异常感知,返回一个兜底默认值,并结束flux流
onErrorContinue()忽略当前异常,仅通知记录,继续推进
retry()当发生错误时,重新订阅 Flux 流,最多尝试指定次数
buffer()缓冲指定元素再消费
cache()缓存数据,把订阅的数据缓存,可以设置缓存大小。当第二个订阅者来获取数据的时候就只能到缓存区去取数据
handle()自定义流中元素处理规则,内部可以自定义设置处理规则,然后再通过sink.next(),把处理后的数据发送到下一个节点
2.2.1、变换数据
  1. map():将每个元素映射到另一个类型的值。

        public void map(){
            Flux<String> flux = Flux.just("a", "b", "c", "d", "e")
                    .map(String::toUpperCase)
                    .log();
    
            flux.subscribe(System.out::println);
        }
    
  2. flatMap():将每个元素映射到一个新的 Publisher,并将其扁平化成一个新的 Flux

        public void flatMap() throws IOException {
            Flux<String> flux = Flux.just("a1", "b2", "c3", "d4", "e5")
                    .flatMap(v->
                            Flux.just(v.split(""))
                    );
    
            flux.subscribe(System.out::println);
    
            System.in.read();
        }
    

transformDeferredtransform都是用来转换 Flux 数据流的运算符。区别在于 转换逻辑的执行时机

  1. transformDeferred() 运算符会延迟执行传递给它的转换函数,直到下游订阅者订阅时才会执行。每个订阅者都会独立地执行转换逻辑,生成自己的转换结果。适用于复杂、有状态的转换

        public void transformDeferred() throws IOException {
            AtomicInteger atomicInteger = new AtomicInteger();
    
            Flux<String> flux = Flux.just("a", "b", "c")
                    .transformDeferred(v->{  //
                        int andIncrement = atomicInteger.getAndIncrement(); //把值加一
                        return v.map(it->it+andIncrement);
    
                    });
            //transformDeferred中,每个订阅者都会去执行一次transformDeferred,等于每个订阅者都会有自己独立的结果
            flux.subscribe(v-> System.out.println("订阅者1:"+v));
            flux.subscribe(v-> System.out.println("订阅者2:"+v));
    
            System.in.read();
        }
    /* transformDeferred每个订阅者都会独立生成结果
    订阅者1:a0
    订阅者1:b0
    订阅者1:c0
    订阅者2:a1
    订阅者2:b1
    订阅者2:c1
    
  2. transform():transformer会立即执行传递给它的转换函数,并生成新的 Flux。由于转换逻辑立即执行,所有订阅者都会共享同一个转换结果。适用于简单、无状态的转换

        public void transform() throws IOException {
            AtomicInteger atomicInteger = new AtomicInteger();
    
            Flux<String> flux = Flux.just("a", "b", "c")
                    .transform(v->{
                        int andIncrement = atomicInteger.getAndIncrement();
                        return v.map(it->it+andIncrement);
                    });
             //不管多少个订阅者都会共享transform这一个转换的结果,等于transform只会执行一次
            flux.subscribe(v-> System.out.println("订阅者1:"+v));
            flux.subscribe(v-> System.out.println("订阅者2:"+v));
    
            System.in.read();
        }
    /*transform所有订阅者都会共享同一个转换结果
    订阅者1:a0
    订阅者1:b0
    订阅者1:c0
    订阅者2:a0
    订阅者2:b0
    订阅者2:c0
    
  3. filter():根据指定条件过滤

        public void filter() throws IOException {
            Flux<String> flux = Flux.just("a", "b", "c")
                    .filter(v->
                        v.equals("b")
                    );
            flux.subscribe(System.out::println);
    
            System.in.read();
        }
    /*生成结果
    b
    
  4. take(long n):获取到指定长度的结果结束

        public void take() throws IOException {
            Flux<String> flux = Flux.just("a", "b", "c","d","e")
                    .take(3);
            flux.subscribe(System.out::println);
    
            System.in.read();
        }
    /*生成结果
    a
    b
    c
    
  5. skip(long skipped):从第几个元素开始获取

        public void skip() throws IOException {
            Flux<String> flux = Flux.just("a", "b", "c","d","e")
                    .skip(3);
            flux.subscribe(System.out::println);
    
            System.in.read();
        }
    /*生成结果
    d
    e
    
  6. distinct():去重,去除重复元素

        public void distinct() throws IOException {
            Flux<String> flux = Flux.just("a", "a", "c","a","e")
                    .distinct();
            flux.subscribe(System.out::println);
    
            System.in.read();
        }
    /*
    a
    c
    e
    
2.2.2、Context API

在Flux响应式编程中,Context API用于在整个响应式链中传递上下文信息,例如订阅者信息、调度器、钩子等。它类似于线程本地存储,但作用于响应式流。
可以使用Context API添加钩子函数,例如在响应式流的每个元素处理前后执行特定操作。

用法:

  1. transformDeferredContextual():两个参数一个是当前的flux对象,一个是Context上下文对象
  2. contextWrite():一个参数,就是Context对象
  3. Context:通过of方法把数据通过key,value的方式写入到flux中
    • of(Object key1, Object value1, Object key2, Object value2) :如果有多组参数要传入,通过of的(k, v, k, v)的方式这样写
        public void context() throws IOException {
    
            Flux<String> fluxs = Flux.just(1, 2, 3)
                    .transformDeferredContextual((flux, context) -> { //它会在每个元素处理之前,提供当前的 Flux 和下面添加的上下文context对象信息。
                        System.out.println("flux:" + flux);
                        System.out.println("context:" + context);
    
                        return flux.map(i -> i + "====>" + context.get("key1")); //对每个元素进行处理,将元素与上下文中的 "key" 值拼接在一起。
                    }).map(String::toUpperCase);
    
    	//	可以在后面把context通过contextWrite添加到上下文中
            Context context = Context.of("key1", "zhangsan","key2", "list");
            fluxs.contextWrite(context) //设置上下文中的 "key" 为 "zhangsan"。
                    .subscribe(v-> System.out.println("v = " + v));
    
            System.in.read();
        }
    /* 返回的结果
    flux:FluxArray 当前flux
    context:Context2{key1=zhangsan, key2=list}   Context对象,几组k,v,名字就是几
    v = 1====>ZHANGSAN
    v = 2====>ZHANGSAN
    v = 3====>ZHANGSAN	
    
2.2.3、合并组合Flux
  1. merge():合并多个flux,按照源里每个元素的写入的时间排序,不会保证顺序

        public void merge() throws InterruptedException {
            Flux<String> flux = Flux.just("a","b");
            Mono<String> mono = Mono.just("c");
    
            Flux.merge(mono,flux)
                    .subscribe(System.out::println);
    
            Thread.sleep(5000);
        }
    /* 输出结果,可能会出现a、c、b
    a
    b
    c
    
  2. concat():合并多个flux,会严格保证元素的顺序。

        public void concat() throws InterruptedException {
            Flux<String> flux1 = Flux.just("a","b");
            Flux<String> flux2 = Flux.just("c","d");
    
            Flux.concat(flux1,flux2)
                    .subscribe(System.out::println);
    
            Thread.sleep(5000);
        }
    /* 输出结果,一定会说先输出完flux1,再到flux2
    a
    b
    c
    d
    
  3. zip():压缩多个flux,按照下标把每个flux的元素都放入一个数组,如果多个flux的长度不一样,以最短的为长度

        public void zip() throws IOException {
            Flux<String> flux1 = Flux.just("a", "b", "c");
            Flux<Integer> flux2 = Flux.just(1, 2, 3,4);
            Flux<String> flux3 = Flux.just("A", "B", "C","D","E");
    
            Flux.zip(flux1, flux2, flux3)
                    .map(Tuple2::toString)
                    .subscribe(System.out::println);
    
            System.in.read();
        }
    /* 输出结果。会保证每个数组的长度都一样
    [a,1,A]
    [b,2,B]
    [c,3,C]
    
  4. zipWith:压缩拼接,作用同上,只不过是只能一个一个的压缩。

        public void zipWith() throws IOException {
            Flux<String> flux1 = Flux.just("a", "b", "c");
    
             Flux.just(1, 2, 3, 4)
                    .zipWith(flux1)
                    .subscribe(System.out::println);
    
            System.in.read();
        }
    /* 输出结果。
    [1,a]
    [2,b]
    [3,c]
    
  5. combineLatest():把多个flux按照下标把值压缩再一起进行处理

        public void combineLatest() throws IOException {
            Flux<String> flux1 = Flux.just("a", "b", "c");
            Flux<Integer> flux2 = Flux.just(1, 2, 3,4);
            Flux<String> flux3 = Flux.just("A", "B", "C");
    
    //        两个的写法
    //        Flux.combineLatest(flux1, flux2, (f1,f2)->f1+"-"+f2)
    //                .map(String::toUpperCase)
    //                .subscribe(System.out::println);
    
    //        多个的写法
            Iterable<Publisher<?>> publishers = Arrays.asList(flux1, flux2,flux3);
            Flux.combineLatest(publishers,fs->fs[0]+"-"+fs[1]+"-"+fs[2])
                    .map(String::toUpperCase)
                    .subscribe(System.out::println);
    
            System.in.read();
        }
    /* 获取按照下标拼接后的值
    C-4-A
    C-4-B
    C-4-C
    
    
2.2.3、并发控制Flux

Scheduler:调度器
Reactor 中,Schedulers 提供了多种指定线程的方式,可以根据不同的场景选择合适的 Scheduler 来执行任务。下面是常用的Scheduler

  • Schedulers.immediate():默认,无执行上下文,当前线程运行所有操作

  • Schedulers.single():使用固定的单线程

  • Schedulers.boundedElastic():使用一个有界弹性线程池执行任务。最大线程数为 CPU 核心数 * 10。

  • Schedulers.parallel():使用一个固定大小的线程池执行任务,线程池的大小默认为 CPU 核心数

  • Schedulers.fromExecutor(Executor executor):使用自定义的 Executor 执行任务

  • Schedulers.newParallel(String name, int parallelism):也可以通过new的方式自定义Scheduler ,直接指定线程池大小,作用同上

  1. parallel(int parallelism):并行处理,将 Flux 的数据流分成多个并行的轨道

  2. runOn():指定后续操作的执行线程,一般会配合parallel()一起使用,使用parallel控制并发数,再通过runOn绑定线程池

        public void parallel() throws IOException {
            Flux.just("a","b","c","d","e")
                    .log()
                    .parallel(4) //指定并行数
                    .runOn(Schedulers.newParallel("yy"))  //指定线程名称
                    .map(String::toUpperCase)
                    .log()
                    .subscribe();
    
            System.in.read();
        }
    /* 通过log查看使用runOn前后线程,可以看到Map操作都是使用的runOn指定的yy名称的线程执行的
     [           main] reactor.Flux.Array.1                     : | onNext(a)
     [           main] reactor.Flux.Array.1                     : | onNext(b)
     [           yy-2] reactor.Parallel.Map.2                   : onNext(A)
     [           main] reactor.Flux.Array.1                     : | onNext(c)
     [           main] reactor.Flux.Array.1                     : | onNext(d)
     [           yy-3] reactor.Parallel.Map.2                   : onNext(B)
     [           yy-5] reactor.Parallel.Map.2                   : onNext(D)
     [           main] reactor.Flux.Array.1                     : | onNext(e)
     [           yy-4] reactor.Parallel.Map.2                   : onNext(C)
     [           yy-2] reactor.Parallel.Map.2                   : onNext(E)
    
    
  3. publishOn(Scheduler scheduler):指定下游操作执行的线程,作用差不多相当于上面两个的组合。不会影响上游操作的执行线程,只影响其后的操作符

        public void publishOn() throws  InterruptedException {
        //自定义线程池
            Scheduler scheduler = Schedulers.fromExecutor(new ThreadPoolExecutor(
                    4,
                    8,
                    60,
                    TimeUnit.SECONDS,
                    new LinkedBlockingQueue<>(100),
                    (r)->{
                        Thread thread = new Thread(r);
                        thread.setName("yy:"+ thread.getName());
                        return thread;
            }));
    
            Flux.just("a", "b", "c", "d", "e")
                    .log()
                    .publishOn(scheduler) //指定
                    .map(String::toUpperCase)
                    .log().subscribe();
    
            Thread.sleep(1000);
        }
    /* 执行结果,因为比较快所以只用了一条线程
     [           main] reactor.Flux.Array.1                     : | onNext(a)
     [           main] reactor.Flux.Array.1                     : | onNext(b)
     [           main] reactor.Flux.Array.1                     : | onNext(c)
     [           main] reactor.Flux.Array.1                     : | onNext(d)
     [           main] reactor.Flux.Array.1                     : | onNext(e)
     [           main] reactor.Flux.Array.1                     : | onComplete()
     [    yy:Thread-2] reactor.Flux.MapFuseable.2               : | onNext(A)
     [    yy:Thread-2] reactor.Flux.MapFuseable.2               : | onNext(B)
     [    yy:Thread-2] reactor.Flux.MapFuseable.2               : | onNext(C)
     [    yy:Thread-2] reactor.Flux.MapFuseable.2               : | onNext(D)
     [    yy:Thread-2] reactor.Flux.MapFuseable.2               : | onNext(E)
    
  4. subscribeOn(Scheduler scheduler):指定订阅发生和上游操作执行的线程

        public void subscribeOn() throws InterruptedException {
            Scheduler scheduler = Schedulers.fromExecutor(new ThreadPoolExecutor(
                    4,
                    8,
                    60,
                    TimeUnit.SECONDS,
                    new LinkedBlockingQueue<>(100),
                    (r)->{
                        Thread thread = new Thread(r);
                        thread.setName("yy:"+ thread.getName());
                        return thread;
                    }));
            
            Flux.just("a", "b", "c", "d", "e")
                    .log()
                    .subscribeOn(scheduler)
                    .map(String::toUpperCase)
                    .log().subscribe();
            Thread.sleep(1000);
        }
    /* 使用subscribeOn后,会把上游的操作也全部使用指定的线程
     [    yy:Thread-1] reactor.Flux.Array.1                     : | onNext(a)
     [    yy:Thread-1] reactor.Flux.Map.2                       : onNext(A)
     [    yy:Thread-1] reactor.Flux.Array.1                     : | onNext(b)
     [    yy:Thread-1] reactor.Flux.Map.2                       : onNext(B)
     [    yy:Thread-1] reactor.Flux.Array.1                     : | onNext(c)
     [    yy:Thread-1] reactor.Flux.Map.2                       : onNext(C)
     [    yy:Thread-1] reactor.Flux.Array.1                     : | onNext(d)
     [    yy:Thread-1] reactor.Flux.Map.2                       : onNext(D)
     [    yy:Thread-1] reactor.Flux.Array.1                     : | onNext(e)
     [    yy:Thread-1] reactor.Flux.Map.2                       : onNext(E)
    
2.2.4、doOnxxx:感知事件相关的 API

在 Reactor 中,感知事件相关的 API 主要用于处理数据流中的特殊事件,例如数据流的完成、错误以及取消等。这些 API 可以帮助我们更好地控制数据流的行为,并对不同的事件做出相应的处理。

  1. doOnComplete(Runnable onComplete):当数据流完成时执行指定的操作,可以用于清理资源、记录日志等。

        public void doOnComplete() throws  InterruptedException {
            Flux<String> flux = Flux.just("a", "b", "c")
                    .map(String::toUpperCase)
                    .doOnComplete(() -> System.out.println("执行完成!"));
    
            flux.subscribe(v-> System.out.println("v1:"+v));
            flux.subscribe(v-> System.out.println("v2:"+v));
    
            Thread.sleep(1000);
        }
    /* 输出结果,会再消费完成时感知
    v1:A
    v1:B
    v1:C
    执行完成!
    v2:A
    v2:B
    v2:C
    执行完成!
    
  2. doOnError(Consumer<? super Throwable> onError):当数据流发生错误时执行指定的操作,用作处理异常

        public void doOnError() throws  InterruptedException {
            Flux<String> flux = Flux.just("a", "b", "c")
                    .map(v-> {
                        if (v.equals("b")) {
                            throw new RuntimeException("b");
                        }
                        return v;
                    })
                    .doOnError((v) -> System.out.println("发生异常==>"+v));
            flux.subscribe(v-> System.out.println("v1:"+v));
    
            Thread.sleep(1000);
        }
    /*
    v1:a
    发生异常==>java.lang.RuntimeException: b
    
  3. doOnEach(Consumer<? super Signal<T>> signalConsumer):每个元素(流的数据和信号)到达的时候触发

  4. doOnNext(Consumer<? super T> onNext):每个数据(流的数据)到达的时候触发

        public void doOnNext() throws  InterruptedException {
            Flux<String> flux = Flux.just("a", "b", "c")
                    .map(String::toUpperCase)
                    .doOnEach((v) -> System.out.println("Each读取到==>"+v))
                    .doOnNext((v) -> System.out.println("Next读取到==>"+v));
            flux.subscribe(v-> System.out.println("v1:"+v));
    
            Thread.sleep(1000);
        }
    /* doOnNext只能读取到元素,doOnEach可以获取到信号
    Each读取到==>doOnEach_onNext(A)
    Next读取到==>A
    v1:A
    Each读取到==>doOnEach_onNext(B)
    Next读取到==>B
    v1:B
    Each读取到==>doOnEach_onNext(C)
    Next读取到==>C
    v1:C
    Each读取到==>onComplete()
    
  5. doOnCancel(Runnable onCancel):流被取消时触发,如通过take再还没读取完就取消。正常结束不会被触发

  6. doFinally(Consumer<SignalType> onFinally):流被订阅执行完成终止时触发,包括正常结束和异常结束

        public void doOnCancel() throws  InterruptedException {
            Flux<String> flux = Flux.just("a", "b", "c")
                    .doOnCancel(() -> System.out.println("流被取消"))
                    .doFinally((v) -> System.out.println("执行结束"+v))
                    .take(2);
    
            flux.subscribe(v-> System.out.println("v1:"+v));
            flux.subscribe(v-> System.out.println("v2:"+v));
    
            Thread.sleep(1000);
        }
    /* 正常结束不会有doOnCancel的触发
    v1:a
    v1:b
    流被取消
    执行结束cancel
    v2:a
    v2:b
    流被取消
    执行结束cancel
    
  7. doOnRequest(LongConsumer consumer):流被订阅者请求数据时触发

  8. doOnSubscribe(Consumer<? super Subscription> onSubscribe):流被订阅开始订阅时触发

        public void doOnRequest() throws  InterruptedException {
            Flux<String> flux = Flux.just("a", "b", "c")
                    .map(String::toUpperCase)
                    .doOnRequest((n) -> System.out.println("流被请求"+n))
                    .doOnSubscribe((n) -> System.out.println("流被订阅"+n));
    
            flux.subscribe(v-> System.out.println("v1:"+v));
            flux.subscribe(v-> System.out.println("v2:"+v));
    
            Thread.sleep(1000);
        }
    /* 订阅者会需要先订阅在请求数据,所以订阅会被触发在前面
    流被订阅reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber@60e06f7d
    流被请求9223372036854775807
    v1:A
    v1:B
    v1:C
    流被订阅reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber@59b32539
    流被请求9223372036854775807
    v2:A
    v2:B
    v2:C
    
2.2.4、onErrorXXX:异常处理相关的 API
  1. onErrorReturn():吃掉异常,消费者无异常感知,返回一个兜底默认值,并结束flux流

        public void onErrorReturn() throws  InterruptedException {
            Flux<String> flux = Flux.just("a", "b", "c")
                    .map(v->{
                        if(v.equals("b"))
                            throw new RuntimeException("b");
                        else
                            return v;
                    }).onErrorReturn("Error"); //发生异常返回兜底
            flux.subscribe(v-> System.out.println("v1:"+v));
            
            Thread.sleep(1000);
        }
    /* 会返回兜底的异常输出
    v1:a
    v1:Error
    
  2. onErrorResume():当发生错误时,使用另一个 Flux 继续执行流程。相当于一个补偿结果

        public void onErrorResume() throws  InterruptedException {
            Flux<String> flux = Flux.just("a", "b", "c")
                    .map(v->{
                        if(v.equals("b"))
                            throw new RuntimeException("b");
                        else
                            return v;
                    }).onErrorResume(e->{
                        System.err.println("发生异常==>"+e);
                        return Flux.just("D","E");
                    });
            flux.subscribe(v-> System.out.println("v1:"+v));
    
            Thread.sleep(1000);
        }
    /* 发生异常后,原始的flux会停止,会去执行onErrorResume内部的flux
    v1:a
    发生异常==>java.lang.RuntimeException: b
    v1:D
    v1:E
    
  3. onErrorMap():捕获并包装成一个业务异常,并重新抛出,消费者有感知

       public void onErrorMap() throws  InterruptedException {
            Flux<String> flux = Flux.just("a", "b", "c")
                    .map(v->{
                        if(v.equals("b"))
                            throw new RuntimeException("b");
                        else
                            return v;
                    }).onErrorMap(RuntimeException.class,e -> new IllegalArgumentException("计算失败:"+e.getMessage())
    
                    );
    
            flux.subscribe(v-> System.out.println("v1:"+v));
    
            Thread.sleep(1000);
        }
    /** 会捕获异常,然后在转换为onErrorMap内部定义的异常返回
    v1:a
    reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalArgumentException: 计算失败:b
    Caused by: java.lang.IllegalArgumentException: 计算失败:b
    
  4. onErrorContinue():忽略当前异常,仅通知记录,继续推进

        public void onErrorContinue() throws  InterruptedException {
            Flux<String> flux = Flux.just("a", "b", "c")
                    .map(v->{
                        if(v.equals("b"))
                            throw new RuntimeException("b");
                        else
                            return v;
                    }).onErrorContinue((e, obj) ->  忽略错误,并打印错误信息和当前元素
                            System.err.println("发生异常: " + e + ", 数据: " + obj)
                    );
            flux.subscribe(v-> System.out.println("v1:"+v));
    
            Thread.sleep(1000);
        }
    /**
    v1:a
    发生异常: java.lang.RuntimeException: b, 数据: b
    v1:c
    
  5. retry():当发生错误时,重新订阅 Flux 流,最多尝试指定次数

       public void retry() throws  InterruptedException {
            Flux<String> flux = Flux.just("a", "b", "c")
                    .map(v->{
                        if(v.equals("b"))
                            throw new RuntimeException("b");
                        else
                            return v;
                    }).retry(2); //最多尝试两次
            flux.subscribe(v-> System.out.println("v1:"+v));
    
            Thread.sleep(1000);
        }
    /* 发生错误时尝试重新订阅,如果超过设置的次数,则抛出错误
    v1:a
    v1:a
    v1:a
    
    reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.RuntimeException: b
    
2.2.4、其他工具 API
  1. buffer():缓冲指定元素再消费,缓冲区:缓冲n个元素: 消费一次最多可以拿到n个元素; 凑满数批量发给消费者。

  2. blockFirst():阻塞当前线程,直到 Flux 发出其第一个元素 ,然后返回该元素

        public void blockFirst() throws  InterruptedException {
            List<String> block = Flux.just("a", "b", "c","d","e")
                    .map(String::toUpperCase)
                    .buffer(3) //创建缓冲区,接收到3个元素转为集合在一起返回订阅者
                    .blockFirst(); //阻塞当前线程,收到第一个元素就结束阻塞
            System.out.println(block);
    
            Thread.sleep(1000);
        }
    /* 首先是通过buffer设置缓冲区,然后把前三个元素一起发生过来到blockFirst,所以接受到的第一个元素就是被buffer转为数组的元素
    [A, B, C]
    
  3. bufferUntilChanged():作用和buffer一样,也是设置缓冲区,只不过不是固定的长度,而是根据条件分隔

        public void bufferUntilChanged() throws  InterruptedException {
            Flux<List<String>> flux = Flux.just("a1", "b1", "c2", "d2", "e1")
                    .map(String::toUpperCase)
                    .bufferUntilChanged(i -> i.contains("2")); //设置分割缓冲区的条件
    
            flux.subscribe(v-> System.out.println("v1:"+v));
    
            Thread.sleep(1000);
        }
    /** 包含2则拆分,然后2结束拆分,不会去跳着拆分
    v1:[A1, B1]
    v1:[C2, D2]
    v1:[E1]
    
  4. cache():缓存数据,把订阅的数据缓存,可以设置缓存大小,当第二个订阅者来获取数据的时候就只能到缓存区去取数据

        public void cache() throws IOException {
            Flux<String> cache = Flux.just("a", "b", "c" ,"d" , "e")
                    .cache(2);
            cache.subscribe(v-> System.out.println("v1:"+v));
            cache.subscribe(v-> System.out.println("v2:"+v));
    
            System.in.read();
        }
    /* 订阅者1直接获取到全部数据,然后把数据缓存到缓冲区,但只要两个大小,所以订阅者2就只拿到了最后放入缓冲区的两个数据
    v1:a
    v1:b
    v1:c
    v1:d
    v1:e
    v2:d
    v2:e
    
    1. handle():自定义流中元素处理规则,内部可以自定义设置处理规则,然后再通过sink.next(),把处理后的数据发送到下一个节点
        public void handle() throws InterruptedException {
            Flux.just("a", "b", "c")
                    .delayElements(Duration.ofSeconds(1))
                    .handle((v, sink)->{
                        sink.next("自定义增强:"+v);
                    })
                    .subscribe(System.out::println);
    
            Thread.sleep(5000);
        }
    /* 获取结果
    自定义增强:a
    自定义增强:b
    自定义增强:c
    

2.3、订阅流

常用汇总
方法作用
collectList()把flux转为Mono的list。
block()阻塞当前线程等待完成。
subscribe()订阅流,订阅的时候可以有多种方式订阅。除了直接订阅的,还可以捕获异常,以及监听结束等。
详情使用
  1. collectList():把flux转为Mono的list。这个开发中会很常用

        public void collectList() throws InterruptedException {
            Mono<List<String>> mono = Flux.just("a", "b", "c", "d", "e")
                    .collectList();
    
            mono.subscribe(v-> System.out.println("v:"+v));
    
            Thread.sleep(1000);
        }
    /* 结果
    v:[a, b, c, d, e]
    
  2. block():阻塞当前线程等待完成。一般不建议这样写,这样就丧失了响应式编程的本质了

        public void block() throws InterruptedException {
            Mono<List<String>> mono = Flux.just("a", "b", "c", "d", "e")
                    .collectList();
    
    
            List<String> block = mono.block();
            System.out.println(block);
    
            Thread.sleep(1000);
        }
    /* 直接阻塞获取到结果
    [a, b, c, d, e]
    
  3. subscribe():订阅流,订阅的时候可以有多种方式订阅。除了前面用的,直接订阅的,还可以捕获异常,以及监听结束等。

        public void subscribe() throws InterruptedException {
            Flux<String> flux = Flux.just("a", "b", "c", "d", "e")
                    .map(String::toUpperCase);
    
            flux.subscribe(v-> System.out.println("v:"+v),
                    throwable -> System.out.println("异常"+throwable.getMessage()),
                    ()-> System.out.println("流执行完成!!"));
    
            Thread.sleep(1000);
        }
    /* 正常结束,如果我异常结束会被捕获到
    v:A
    v:B
    v:C
    v:D
    v:E
    流执行完成!!
    
  4. BaseSubscriber():subscribe还可以通过BaseSubscriber来自定义消费者。

       public void BaseSubscriber() throws InterruptedException {
            Flux<String> flux = Flux.just("a", "b", "c", "d", "e")
                    .map(String::toUpperCase);
            
    //        Flux<String> flux2 = Flux.just("a", "b", "c", "d", "e")
    //                .map(v->{
    //                    if (v.equals("b")) {throw new RuntimeException("b");}
    //                    return v;
    //                });
    
            flux.subscribe(new BaseSubscriber<String>(){
                @Override
                protected void hookOnSubscribe(Subscription subscription) {
                    System.out.println(Thread.currentThread()+"流开始了:"+subscription);
                    request(1);
                }
    
                @Override
                protected void hookOnNext(String value) {
                    System.out.println(Thread.currentThread()+"开始接收元素:"+value);
                    request(1); //持续接收元素
                }
    
                @Override
                protected void hookOnComplete() {
                    System.out.println(Thread.currentThread()+"流正常结束");
                }
    
                @Override
                protected void hookOnError(Throwable throwable) {
                    System.out.println(Thread.currentThread()+"流错误结束:"+throwable);
                }
    
                @Override
                protected void hookOnCancel() {
                    System.out.println(Thread.currentThread()+"流被取消");
                }
    
                @Override
                protected void hookFinally(SignalType type) {
                    System.out.println(Thread.currentThread()+"最终回调");
                }
            });
    
            Thread.sleep(6000);
        }
    /* 到对应的节点时会 触发对应的回调方法
    
    Thread[main,5,main]流开始了:reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber@782e6b40
    Thread[main,5,main]开始接收元素:A
    Thread[main,5,main]开始接收元素:B
    Thread[main,5,main]开始接收元素:C
    Thread[main,5,main]开始接收元素:D
    Thread[main,5,main]开始接收元素:E
    Thread[main,5,main]流正常结束
    Thread[main,5,main]最终回调
    

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2107747.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

2024 高教社杯 数学建模国赛 (A题)深度剖析|“板凳龙” 闹元宵|数学建模完整代码+建模过程全解全析

当大家面临着复杂的数学建模问题时&#xff0c;你是否曾经感到茫然无措&#xff1f;作为2022年美国大学生数学建模比赛的O奖得主&#xff0c;我为大家提供了一套优秀的解题思路&#xff0c;让你轻松应对各种难题&#xff01; CS团队倾注了大量时间和心血&#xff0c;深入挖掘解…

设计模式之装饰器模式:让对象功能扩展更优雅的艺术

一、什么是装饰器模式 装饰器模式&#xff08;Decorator Pattern&#xff09;是一种结构型设计模式&#xff08;Structural Pattern&#xff09;&#xff0c;它允许用户通过一种灵活的方式来动态地给一个对象添加一些额外的职责。就增加功能来说&#xff0c;装饰器模式相比使用…

“Jmeter-InfluxDB-Grafana“常见错误有哪些如何解决?

常见错误&#xff1a; 1.网络不同&#xff0c;检查网络IP是否写对&#xff0c;端口号有没有放开&#xff08;Centos7端口号命令&#xff09;&#xff0c;防火墙是否关闭 firewall-cmd --add-port3000/tcp --permanent firewall-cmd --add-port3000/udp --permanent firewall-…

中国剩余定理和扩展中国剩余定理(模板)

给你一元线性同余方程组&#xff0c;如下&#xff1a; 其中&#xff0c;当 , , ... , 两两互质的话就是中国剩余定理 &#xff0c; 不互质的话就是扩展中国剩余定理。 给出中国剩余定理的计算过程和扩展中国剩余定理的推理过程&#xff1a; #include<bits/stdc.h> us…

MT3516A-ASEMI三相整流桥MT3516A

编辑&#xff1a;ll MT3516A-ASEMI三相整流桥MT3516A 型号&#xff1a;MT3516A 品牌&#xff1a;ASEMI 封装&#xff1a;D-63 批号&#xff1a;2024 类型&#xff1a;三相整流桥 电流&#xff08;ID&#xff09;&#xff1a;35A 电压(VF)&#xff1a;1600V 安装方式&a…

C++开发基础之宏定义:入门、中级、高级用法示例解析

前言 在C开发中&#xff0c;宏定义是一种非常重要的预处理功能&#xff0c;能够简化代码、提高可读性、减少重复性工作。然而&#xff0c;宏的使用也存在一些潜在的风险&#xff0c;滥用宏可能导致代码难以调试和维护。在这篇博客中&#xff0c;我们将从入门、中级到高级&…

【数据库|第9期】SQL Server、Access和Sqlite 的字段别名详解

日期&#xff1a;2024年8月28日 作者&#xff1a;Commas 签名&#xff1a;(ง •_•)ง 积跬步以致千里,积小流以成江海…… 注释&#xff1a;如果您觉得有所帮助&#xff0c;帮忙点个赞&#xff0c;也可以关注我&#xff0c;我们一起成长&#xff1b;如果有不对的地方&#xf…

redis缓存的目的、场景、实现、一致性问题

文章目录 1、加缓存的目的&#xff08;作用&#xff09;&#xff1a;2、加缓存的场景&#xff1a;读多写少3、加不加缓存的标准&#xff1a;4、缓存的实现&#xff1a;5、缓存的实现方案&#xff1a;6、缓存的粒度问题7、缓存的一致性问题 专辑详情和声音详情属于并发量较高的数…

2024 高教社杯 数学建模国赛 (B题)深度剖析|生产过程中的决策问题|数学建模完整代码+建模过程全解全析

当大家面临着复杂的数学建模问题时&#xff0c;你是否曾经感到茫然无措&#xff1f;作为2022年美国大学生数学建模比赛的O奖得主&#xff0c;我为大家提供了一套优秀的解题思路&#xff0c;让你轻松应对各种难题&#xff01; CS团队倾注了大量时间和心血&#xff0c;深入挖掘解…

入门数据结构JAVA DS——如何实现简易的单链表(用JAVA实现)

前言 链表&#xff08;Linked List&#xff09;是一种线性数据结构&#xff0c;它由一系列节点组成&#xff0c;每个节点包含两个部分&#xff1a;存储数据的部分和指向下一个节点的指针&#xff08;或引用&#xff09;。链表的结构使得它能够动态地增长和收缩&#xff0c;适合…

Python操作ES集群API

前言&#xff1a;本博客仅作记录学习使用&#xff0c;部分图片出自网络&#xff0c;如有侵犯您的权益&#xff0c;请联系删除 学习B站博主教程笔记&#xff1a; 最新版适合自学的ElasticStack全套视频&#xff08;Elk零基础入门到精通教程&#xff09;Linux运维必备—Elastic…

光明乳业以“轻”礼庆团圆!第七届莫斯利安保加利亚国际酸奶文化节圆满落幕

近日&#xff0c;第七届莫斯利安保加利亚国际酸奶文化节圆满落下帷幕。今年国际酸奶文化节恰逢中秋佳节之际&#xff0c;光明莫斯利安联合上海博物馆&#xff0c;以其缂丝馆藏《灵仙祝寿图》为灵感&#xff0c;推出了一系列联名限定产品和周边&#xff0c;寓意健康团圆长长久久…

Elastic Stack--ES的DSL语句查询

前言&#xff1a;本博客仅作记录学习使用&#xff0c;部分图片出自网络&#xff0c;如有侵犯您的权益&#xff0c;请联系删除 学习B站博主教程笔记&#xff1a; 最新版适合自学的ElasticStack全套视频&#xff08;Elk零基础入门到精通教程&#xff09;Linux运维必备—Elastic…

工业边缘网关:智能制造的实时数据枢纽-天拓四方

在工业4.0的浪潮中&#xff0c;工业边缘网关已成为智能制造和工业物联网&#xff08;IIoT&#xff09;领域的关键技术。作为连接工业现场设备与云端平台的桥梁&#xff0c;边缘网关实现了数据的实时采集、处理和传输&#xff0c;为企业的生产管理和决策提供了重要支持。本文将重…

如何查看Pod的Container资源占用情况

云原生学习路线导航页&#xff08;持续更新中&#xff09; 方法一&#xff1a;直接查看pod的资源占用 kubectl top pods ${pod-name} -n ${ns} 方法二&#xff1a;通过运行的进程&#xff0c;查看pod的某个容器资源占用 1.找到pod所在node容器号&#xff1a;kubectl descri…

【Fastapi】使用Toml作为配置文件格式

【Fastapi】使用Toml作为配置文件格式 giteegithubtoml介绍我为什么用 toml作为配置文件格式具体使用&#xff08;没提到的请参考[官网](https://toml.io/cn/v1.0.0)&#xff09;文件格式代码中使用 gitee https://gitee.com/zz1521145346/fastapi_frame.git github https:/…

从羊城杯docCrack学习恶意宏

前言 一道涉及恶意宏的逆向题目&#xff0c;不算难。 知识点 关于OLE文件 office文档&#xff08;如.doc、.ppt、.xls等&#xff09;其实都是复合文档&#xff08;OLE&#xff09;&#xff0c;该文件格式全称为OLE复合文档格式&#xff0c;它允许多个数据流和存储在单个文件…

昂科烧录器支持ALLYSTAR华大北斗的GNSS芯片HD8020

芯片烧录行业领导者-昂科技术近日发布最新的烧录软件更新及新增支持的芯片型号列表&#xff0c;其中ALLYSTAR华大北斗的GNSS芯片HD8020已经被昂科的通用烧录平台AP8000所支持。 HD8020是一款能够实现单芯片解决方案&#xff0c;满足位置感知、物流运输等导航定位需求的SOC芯片…

java Abstract Queued Synchronizer

AbstractQueuedSynchronizer&#xff08;简称 AQS&#xff09;是 Java 中用于实现锁和同步器的一个基础框架&#xff0c;位于 java.util.concurrent.locks 包中。它提供了一种基于 FIFO&#xff08;先进先出&#xff09;队列的机制&#xff0c;帮助构建多线程之间的同步工具&am…

谈一谈MVCC

一 MVCC的定义 MVCC&#xff08;Multi-Version Concurrency Control&#xff0c;多版本并发控制&#xff09;是一种用于数据库管理系统&#xff08;DBMS&#xff09;中的并发控制方法&#xff0c;它允许数据库读写操作不加锁地并发执行&#xff0c;从而提高了数据库系统的并发性…