Spring5 框架新功能(Webflux)

news2024/11/24 12:39:49

目录

1、SpringWebflux 介绍

2、响应式编程(Java 实现)

3、响应式编程(Reactor 实现)


1、SpringWebflux 介绍

(1)SpringWebflux 是 Spring5 添加新的模块,用于 web 开发的,功能和 SpringMVC 类似,Webflux 使用当前一种比较流行的响应式编程出现的框架。

(2)使用传统 web 框架,比如 SpringMVC,是基于 Servlet 容器,而Webflux 是一种异步非阻
塞的框架,异步非阻塞的框架在 Servlet3.1 以后才支持,核心是基于 Reactor 的相关 API 实现
的。
(3)什么是异步非阻塞
* 异步和同步
* 非阻塞和阻塞
** 上面都是针对对象不一样
** 异步和同步针对调用者,调用者发送请求,如果等着对方回应之后才去做其他事情就是同
步,如果发送请求之后不等着对方回应就去做其他事情就是异步
** 阻塞和非阻塞针对被调用者,被调用者受到请求之后,做完请求任务之后才给出反馈就是阻
塞,受到请求之后马上给出反馈然后再去做事情就是非阻塞
(4)Webflux 特点
第一 非阻塞式:在有限资源下,提高系统吞吐量和伸缩性,以 Reactor 为基础实现响应式编程
第二 函数式编程:Spring5 框架基于 java8,Webflux 使用 Java8 函数式编程方式实现路由请求
(5)比较 SpringMVC
第一 两个框架都可以使用注解方式,都运行在 Tomcat 等容器中
第二 SpringMVC 采用命令式编程,Webflux 采用异步响应式编程

2、响应式编程(Java 实现)

(1)什么是响应式编程
响应式编程是一种面向数据流和变化传播的编程范式。这意味着可以在编程语言中很方便地表达静态或动态的数据流,而相关的计算模型会自动将变化的值通过数据流进行传播。
电子表格程序就是响应式编程的一个例子。单元格可以包含字面值或类似 "=B1+C1" 的公式,而包含公式的单元格的值会依据其他单元格的值的变化而变化。
(2)Java8 及其之前版本
* 提供的观察者模式两个类 Observer 和 Observable
public class ObserverDemo extends Observable {

    public static void main(String[] args) {
        ObserverDemo observer = new ObserverDemo();
        //观察者
        observer.addObserver((o, arg) ->{
            System.out.println("数据发生改变");
        });

        observer.addObserver((o, arg) -> {
            System.out.println("收到被观察者通知,准备改变");
        });

        observer.setChanged();

        observer.notifyObservers();
    }
}

java9中,Observer 和 Observable被Flow替代,而Flow是真正的响应式编程

public final class Flow {

    private Flow() {} // uninstantiable

    /**
     * A producer of items (and related control messages) received by
     * Subscribers.  Each current {@link Subscriber} receives the same
     * items (via method {@code onNext}) in the same order, unless
     * drops or errors are encountered. If a Publisher encounters an
     * error that does not allow items to be issued to a Subscriber,
     * that Subscriber receives {@code onError}, and then receives no
     * further messages.  Otherwise, when it is known that no further
     * messages will be issued to it, a subscriber receives {@code
     * onComplete}.  Publishers ensure that Subscriber method
     * invocations for each subscription are strictly ordered in <a
     * href="package-summary.html#MemoryVisibility"><i>happens-before</i></a>
     * order.
     *
     * <p>Publishers may vary in policy about whether drops (failures
     * to issue an item because of resource limitations) are treated
     * as unrecoverable errors.  Publishers may also vary about
     * whether Subscribers receive items that were produced or
     * available before they subscribed.
     *
     * @param <T> the published item type
     */
    @FunctionalInterface
    public static interface Publisher<T> {
        /**
         * Adds the given Subscriber if possible.  If already
         * subscribed, or the attempt to subscribe fails due to policy
         * violations or errors, the Subscriber's {@code onError}
         * method is invoked with an {@link IllegalStateException}.
         * Otherwise, the Subscriber's {@code onSubscribe} method is
         * invoked with a new {@link Subscription}.  Subscribers may
         * enable receiving items by invoking the {@code request}
         * method of this Subscription, and may unsubscribe by
         * invoking its {@code cancel} method.
         *
         * @param subscriber the subscriber
         * @throws NullPointerException if subscriber is null
         */
        public void subscribe(Subscriber<? super T> subscriber);
    }

    /**
     * A receiver of messages.  The methods in this interface are
     * invoked in strict sequential order for each {@link
     * Subscription}.
     *
     * @param <T> the subscribed item type
     */
    public static interface Subscriber<T> {
        /**
         * Method invoked prior to invoking any other Subscriber
         * methods for the given Subscription. If this method throws
         * an exception, resulting behavior is not guaranteed, but may
         * cause the Subscription not to be established or to be cancelled.
         *
         * <p>Typically, implementations of this method invoke {@code
         * subscription.request} to enable receiving items.
         *
         * @param subscription a new subscription
         */
        public void onSubscribe(Subscription subscription);

        /**
         * Method invoked with a Subscription's next item.  If this
         * method throws an exception, resulting behavior is not
         * guaranteed, but may cause the Subscription to be cancelled.
         *
         * @param item the item
         */
        public void onNext(T item);

        /**
         * Method invoked upon an unrecoverable error encountered by a
         * Publisher or Subscription, after which no other Subscriber
         * methods are invoked by the Subscription.  If this method
         * itself throws an exception, resulting behavior is
         * undefined.
         *
         * @param throwable the exception
         */
        public void onError(Throwable throwable);

        /**
         * Method invoked when it is known that no additional
         * Subscriber method invocations will occur for a Subscription
         * that is not already terminated by error, after which no
         * other Subscriber methods are invoked by the Subscription.
         * If this method throws an exception, resulting behavior is
         * undefined.
         */
        public void onComplete();
    }

    /**
     * Message control linking a {@link Publisher} and {@link
     * Subscriber}.  Subscribers receive items only when requested,
     * and may cancel at any time. The methods in this interface are
     * intended to be invoked only by their Subscribers; usages in
     * other contexts have undefined effects.
     */
    public static interface Subscription {
        /**
         * Adds the given number {@code n} of items to the current
         * unfulfilled demand for this subscription.  If {@code n} is
         * less than or equal to zero, the Subscriber will receive an
         * {@code onError} signal with an {@link
         * IllegalArgumentException} argument.  Otherwise, the
         * Subscriber will receive up to {@code n} additional {@code
         * onNext} invocations (or fewer if terminated).
         *
         * @param n the increment of demand; a value of {@code
         * Long.MAX_VALUE} may be considered as effectively unbounded
         */
        public void request(long n);

        /**
         * Causes the Subscriber to (eventually) stop receiving
         * messages.  Implementation is best-effort -- additional
         * messages may be received after invoking this method.
         * A cancelled subscription need not ever receive an
         * {@code onComplete} or {@code onError} signal.
         */
        public void cancel();
    }

    /**
     * A component that acts as both a Subscriber and Publisher.
     *
     * @param <T> the subscribed item type
     * @param <R> the published item type
     */
    public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> {
    }

    static final int DEFAULT_BUFFER_SIZE = 256;

    /**
     * Returns a default value for Publisher or Subscriber buffering,
     * that may be used in the absence of other constraints.
     *
     * @implNote
     * The current value returned is 256.
     *
     * @return the buffer size value
     */
    public static int defaultBufferSize() {
        return DEFAULT_BUFFER_SIZE;
    }

}
public class Main {

    public static void main(String[] args) {
        Flow.Publisher<String> publisher = subscriber -> {
            subscriber.onNext("1"); // 1
            subscriber.onNext("2");
            subscriber.onError(new RuntimeException("出错")); // 2
            //  subscriber.onComplete();
        };


        publisher.subscribe(new Flow.Subscriber<>() {
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                subscription.cancel();
            }

            @Override
            public void onNext(String item) {
                System.out.println(item);
            }

            @Override
            public void onError(Throwable throwable) {
                System.out.println("出错");
            }

            @Override
            public void onComplete() {
                System.out.println("publish complete");
            }
        });
    }
}

3、响应式编程(Reactor 实现)

(1)响应式编程操作中,Reactor 是满足 Reactive 规范框架
(2)Reactor 有两个核心类,Mono 和 Flux,这两个类实现接口 Publisher,提供丰富操作
符。Flux 对象实现发布者,返回 N 个元素;Mono 实现发布者,返回 0 或者 1 个元素
(3)Flux 和 Mono 都是数据流的发布者,使用 Flux 和 Mono 都可以发出三种数据信号:元素值,错误信号,完成信号,错误信号和完成信号都代表终止信号,终止信号用于告诉
订阅者数据流结束了,错误信号终止数据流同时把错误信息传递给订阅者

示例图:

 

 (4)代码演示 Flux 和 Mono

第一步 引入依赖
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-core</artifactId>
            <version>3.1.5.RELEASE</version>
        </dependency>
第二步 编程代码
public class TestReator {

    public static void main(String[] args) {
        //just 方法直接声明
        Flux.just(1,2,3,4);
        Mono.just(1);
        //其他的方法
        Integer[] array = {1, 2, 3, 4};
        Flux.fromArray(array);

        List<Integer> list = Arrays.asList(array);
        Flux.fromIterable(list);
        Stream<Integer> stream = list.stream();
        Flux.fromStream(stream);
    }
}
(5)三种信号特点
* 错误信号和完成信号都是终止信号,不能共存的
* 如果没有发送任何元素值,而是直接发送错误或者完成信号,表示是空数据流
* 如果没有错误信号,没有完成信号,表示是无限数据流
(6)调用 just 或者其他方法只是声明数据流,数据流并没有发出,只有进行订阅之后才会触
发数据流,不订阅什么都不会发生的
        Flux.just(1,2,3,4).subscribe(System.out::println);
        Mono.just(1).subscribe(System.out::println);

(7)操作符
* 对数据流进行一道道操作,成为操作符,比如工厂流水线
第一 map 元素映射为新元素

 

第二 flatMap 元素映射为流
* 把每个元素转换流,把转换之后多个流合并大的流

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/188994.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

JDBC用PrepareStatement解决SQL注入

什么是sql注入&#xff1f;SQL注入(SQL injection)是因为应用程序在执行SQL语句的时候没有正确的处理用户输入字符串&#xff0c;将用户输入的恶意字符串拼接到了SQL语句中执行&#xff0c;从而导致了SQL注入。例如&#xff1a;当你的用户名为 qwert or 11;# 密码为123&#xf…

springmvc拦截器及源码分析

springmvc拦截器是我们项目开发中用到的一个功能&#xff0c;常常用于对Handler进行预处理和后处理。本案例来演示一个较简单的springmvc拦截器的使用&#xff0c;并通过分析源码来探究拦截器的执行顺序是如何控制的。1、springmvc拦截器使用1.1 项目初始搭建1.1.1 创建一个mav…

如何用ffmpeg截取视频片段截取时间不准确的坑

之前在工作中&#xff0c;有遇到需要程序化截取视频片段的场景&#xff0c;这里使用ffmpeg命令行就可以很容易实现&#xff0c;这里也记录下我们使用过程中遇到的坑&#xff0c;希望对大家也有所帮助。    举个例子&#xff0c;当我们要截取视频文件中input.mp4的第15秒到第9…

windows+python+bleak+BLE低功耗蓝牙通讯连接

前言 1.为什么选bleak   参考这篇知乎&#xff1a;https://zhuanlan.zhihu.com/p/577687336   windows端使用python连接常规的BLE设备&#xff08;蓝牙4.0&#xff09;&#xff0c;仅考虑bleak模块&#xff08;排除pybluez、pybluez2、pygatt&#xff09;。 2.本文主要参…

【c语言】对结构体数组按照某项规则进行排序

这是基于qsort()函数进行的简单排序。&#xff08;附带其他类型的数组使用qsort()进行的排序&#xff09; 目录 一、qsort()函数 二、compare()函数 1.结构体数组 1&#xff09;升序实现 2&#xff09;降序实现 2.整型数组 为什么不直接返回 a>b&#xff08;a&#x…

CentOS 下PostgreSQL安装、简单配置及数据迁移(存储目录迁移)

目录 数据库安装 数据库初始化 配置修改 1、修改监听范围 2、修改数据库用户密码 3、开启远程连接 附件内容&#xff1a;PostgreSQL数据迁移 方式一&#xff0c;从新初始化数据库在导出导入 方式二&#xff1a;存储文件物理迁移 数据库安装 安装包下载请参考PG官网(根据…

OAuth2介绍(一)

目录 1. 什么是OAuth2.0 2. OAuth2中的角色 3. 认证流程 4. 生活中的Oauth2思维 5. 令牌的特点 6. OAuth2授权方式 6.1 授权码 6.2 隐藏方式 6.3 密码方式 6.4 凭证方式 7. 流程 7.1 资源所有者 7.2 客户 7.3 客户 7.4 认证服务器 7.5 客户 7.6 资源服务器 1.…

【高并发】- 分布式事务都不会?

前言 本章主要对分布式事务进行梳理和讲解。可能在业务设计过程中&#xff0c;各微服务都采用了独立数据库&#xff0c;所以&#xff0c;这些微服务之间的数据共享有了更高的要求&#xff1a;要解决数据一致性的问题。 1. 数据一致性 数据一致性是指&#xff1a;数据被多次操作…

【自然语言处理】主题建模评估:连贯性分数(Coherence Score)

主题建模评估&#xff1a;连贯性分数&#xff08;Coherence Score&#xff09;1.主题连贯性分数 主题连贯性分数&#xff08;Coherence Score&#xff09;是一种客观的衡量标准&#xff0c;它基于语言学的分布假设&#xff1a;具有相似含义的词往往出现在相似的上下文中。 如果…

如何使用ArcGIS计算道路中心线

1.概述 在制图等应用的时候&#xff0c;有时需要将双线的面状道路提取中心线&#xff0c;转换为线状的道路。 由于道路多为不规则的图形&#xff0c;提取难度比较高&#xff0c;加上能提取中心线的软件有限&#xff0c;更加增加了提取的难度。 ArcGIS虽然提供了提取中心线的…

C语言文件操作(二)

文件的随机读写fseek函数#include <stdio.h>int main() {FILE* pf fopen("test.txt", "r");if (NULL pf){perror("fopen");return 1;}char ch fgetc(pf);printf("%c\n", ch);fseek(pf, 2, SEEK_SET);ch fgetc(pf);printf(&q…

Mysql第四期 运算符规则计算】

文章目录写在前面1.算数运算符2.比较运算符3.逻辑运算符4.位运算符5.运算符的优先级拓展&#xff1a;使用正则表达式查询写在前面 基本的运算符号在计算机编程领域都是相通的&#xff0c;会有自己的一些特定符号语言&#xff0c;就像是各地的普通话一样&#xff0c;尽管语音描…

C语言小题,又3个学生的信息,放在结构体数组中,要求输出全部学生的信息。(指向结构体数组的指针)

前言&#xff1a; 此篇是针对 指向结构体数组的指针 方面的练习。 解题思路&#xff1a; 用指向结构体变量的指针来处理&#xff1a; &#xff08;1&#xff09;声明结构体类型 struct Student &#xff0c;并定义结构体数组&#xff0c;同时使之初始化&#xff1b; &#xff…

SpringBoot项目如何引入外部jar及将外部jar打包到项目发布war包

1 Springboot项目如何打成war包 1.1 环境准备 打包成war整体思路就是排查web容器依赖&#xff0c;添加maven-war-plugin插件。接下来就使用Tomcat容器给大家做个示范&#xff0c;亲测有效。 在讲解下说明一下环境&#xff0c;避免因为环境的问题&#xff0c;给大家带来不必要…

设计师都在用的5个设计素材库

作为一名设计师推荐几个设计素材网站&#xff0c;建议收藏起来&#xff01; 1、菜鸟图库 https://www.sucai999.com/?vNTYxMjky 站内平面海报、UI设计、电商淘宝、高清图片、样机模板等素材非常齐全。还有在线抠图、CDR版本转换功能&#xff0c;能有效的为设计师节省找素材时…

嵌入式Linux-线程属性

1. 线程的属性 1.1 概念 如前所述&#xff0c;调用 pthread_create()创建线程&#xff0c;可对新建线程的各种属性进行设置。在 Linux 下&#xff0c;使用pthread_attr_t 数据类型定义线程的所有属性。 调用 pthread_create()创建线程时&#xff0c;参数 attr 设置为 NULL&a…

Three.js 初阶入门篇(一)

系列文章目录 文章目录系列文章目录学习背景一、什么是3D&#xff08;直接看作品吧&#xff09;&#xff1f;汽车作品欣赏鼠标可以随意转动角度打开机盖&#xff08;交互效果&#xff09;尾部3D链接&#x1f517;如下&#xff08;链接打开会有一些慢&#xff09;二、如何创建一…

零入门容器云网络实战-7->Mac环境下为虚拟机磁盘空间进行扩容

在Mac环境下&#xff0c;使用PD软件创建的虚拟机磁盘空间不够时&#xff0c;如何扩容呢&#xff1f; 主要分两大步骤&#xff1a; 先通过PD界面&#xff0c;设置增加多少空间进入虚拟机里&#xff0c;通过fdisk等相关命令&#xff0c;使其增加的空间生效 1、第一大步&#xf…

机器学习之线性模型

定义 线性模型非常常见&#xff0c;但详细了解其中原理是必要的。 一般将样本特征进行线性组合达到预测的目标&#xff0c;如表达式yf(X;W)byf(X;W)byf(X;W)b,其中XXX为输入的样本数据&#xff0c;WWW为权重系数&#xff0c;bbb为偏置系数。 如对于图片样本&#xff0c;一种…

兔年春晚一大怪像,影视演员变成了万能,专业歌手却被晾在一边

怪事年年有&#xff0c;今年特别多。谁也没有想到&#xff0c;兔年春节还没有过去&#xff0c;就出现了一种奇怪的现象。中央电视台春晚&#xff0c;曾经执全国春晚之牛耳&#xff0c;然而谁能想到&#xff0c;四十多年后的今天&#xff0c;曾经的扛把子却变成了鸡肋。 今年央视…