(一)基于Spring Reactor框架响应式异步编程|道法术器

news2024/11/25 16:18:26

                       


Spring WebFlux 响应式异步编程|道法术器(一)

 Spring WeFlux响应式编程整合另一种方案|道法术器(二)

 


R2DBC简介

Spring data R2DBC是更大的Spring data 系列的一部分,它使得实现基于R2DBC的存储库变得容易。R2DBC代表反应式关系数据库连接,这是一种使用反应式驱动程序集成SQL数据库的规范。Spring Data R2DBC使用属性的Spring抽象和Repository支持应用于R2DBC。它使得在反应式应用程序堆栈中使用关系数据访问技术构建Spring驱动的应用程序变得更加容易。

Spring Data R2DBC的目标是在概念上变得简单。为了实现这一点,它不提供缓存、延迟加载、写后处理或ORM框架的许多其他特性。这使得Spring Data R2DBC成为一个简单、有限、固执己见的对象映射器。

Spring Data R2DBC允许一种 functional 方法与数据库交互,提供R2dbcEntityTemplate作为应用程序的入口点。

首先选择数据库驱动程序并创建R2dbcEntityTemplate实例


使用传统 web 框架,比如 SpringMVC,这些基于 Servlet 容器。
Webflux 是一种异步非阻塞的框架,异步非阻塞的框架在 Servlet3.1 以后才支持,核心是基于 Reactor 的相关 API 实现的。
什么是异步非阻塞
异步和同步
异步和同步针对调用者,调用者发送请求,如果等着对方回应之后才去做其他事情就是同步,如果发送请求之后不等着对方回应就去做其他事情就是异步。
非阻塞和阻塞
阻塞和非阻塞针对被调用者,被调用者收到请求之后,做完请求任务之后才给出反馈就是阻塞,收到请求之后马上给出反馈然后再去做事情就是非阻塞。
Webflux 特点:
第一 非阻塞式:在有限资源下,提高系统吞吐量和伸缩性,以 Reactor 为基础实现响应式编程。
第二 函数式编程:Spring5 框架基于 java8,Webflux 使用 Java8 函数式编程方式实现路由请求。

 


Spring Data R2DBC可以与Spring Data JPA结合使用,其实R2DBC与原来的JPA使用方式差别不大,使用非常简单。
只是Spring Data JPA中方法返回的是真实的值,而R2DBC中,返回的是数据流Mono,Flux。

简单介绍一个Spring Data JPA。Spring Data JPA是Spring基于ORM框架、JPA规范的基础上封装的一套 JPA (Java Persistence API) 应用框架,简单说,就是类似Mybatis,Hibernate的框架(Spring Data JPA底层通过Hibernate操作数据库)。

Repository是Spring Data R2DBC中的重要概念,封装了对一个实体的操作,相当于一个dao(Data Access Object,数据访问对象)
 


官网连接:Spring Data R2DBC - Reference Documentation


5. Requirements

The Spring Data R2DBC 1.x binaries require:

  • JDK level 8.0 and above

  • Spring Framework 5.3.8 and above

  • R2DBC Arabba-SR10 and above

  • 这是官网对搭建非阻塞响应式编程的环境要求:


 一,本节将从简单的搭建开,体验下响应式非阻塞编程的大致概况:

    1.1 搭建环境:

    

<!--设置spring-boot依赖的版本 -->
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.6.5</version> <!--2.4.11-->
    <relativePath/> <!-- lookup parent from repository -->
</parent>

  

<!-- 响应式编程集成-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>${mysql-connector-java.version}</version>
</dependency>
<!--R2DBC是基于Reactive Streams标准来设计的。通过使用R2DBC,你可以使用reactive API来操作数据。同时R2DBC只是一个开放的标准,而各个具体的数据库连接实现,需要实现这个标准。-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-r2dbc</artifactId>
</dependency>
<dependency>
    <groupId>com.github.jasync-sql</groupId>
    <artifactId>jasync-r2dbc-mysql</artifactId>
    <version>1.2.3</version>
</dependency>

额外可有可无
<!--reactor-test测试相关类-->
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-test</artifactId>
</dependency>

 第二: 基础配置application.yml文件

server:
  port: 9999
  servlet:
    context-path: /
spring:
  #连接数据库的url,前缀不再是jdbc而是换成r2dbc
  #这里可以配置连接池相关的其它属性,这里为了简洁不配置
  r2dbc:
    url: mysql://localhost:3306/tope-pay-user?&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=GMT%2B8&useSSL=false
    username: root
    password: 123456

logging:
  level:
    org.springframework.r2dbc: INFO  #输出执行的sql
    org.springframework.cloud.web.reactive: info
    reactor.ipc.netty: info










第三: javaConfig文件编写,读取初始化化R2dbc连接的相关参数

package org.jd.websocket.auth.data.reactor.config;

import com.github.jasync.r2dbc.mysql.JasyncConnectionFactory;
import com.github.jasync.sql.db.mysql.pool.MySQLConnectionFactory;
import io.r2dbc.spi.ConnectionFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.r2dbc.R2dbcProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.r2dbc.core.R2dbcEntityOperations;
import org.springframework.data.r2dbc.core.R2dbcEntityTemplate;
import org.springframework.r2dbc.connection.R2dbcTransactionManager;
import org.springframework.transaction.ReactiveTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;

import java.net.URI;
import java.net.URISyntaxException;

/**
 * R2dbcProperties 看源代码中,数据库连接池的相关配置
 */
@Configuration
@EnableTransactionManagement // 开启事务的支持
public class DatabaseConfiguration {

    @Bean
    @Qualifier("mysqlConnectionFactory")
    public ConnectionFactory connectionFactory(R2dbcProperties properties) throws URISyntaxException {
        // 从R2dbcProperties中,解析出 host、port、database
        URI uri = new URI(properties.getUrl());
        String host = uri.getHost();
        int port = uri.getPort();
        String database = uri.getPath().substring(1); // 去掉首位的 / 斜杠
        // 创建 Configuration 配置配置对象
        com.github.jasync.sql.db.Configuration configuration = new com.github.jasync.sql.db.Configuration(
                properties.getUsername(), host, port, properties.getPassword(), database);
        // 创建 ConnectionFactory 对象
        JasyncConnectionFactory jasyncConnectionFactory = new JasyncConnectionFactory(new MySQLConnectionFactory(configuration));
        return jasyncConnectionFactory;
    }

    @Bean
    public R2dbcEntityOperations mysqlR2dbcEntityOperations(@Qualifier("mysqlConnectionFactory") ConnectionFactory connectionFactory) {
        return new R2dbcEntityTemplate(connectionFactory);
    }

    @Bean
    public ReactiveTransactionManager transactionManager(R2dbcProperties properties) throws URISyntaxException {
        return new R2dbcTransactionManager(this.connectionFactory(properties));
    }
}

四:数据持久层: 响应式非阻塞编程 

package org.jd.websocket.auth.data.reactor.repository;


import org.jd.websocket.auth.data.reactor.entity.RSysSystem;
import org.springframework.data.repository.reactive.ReactiveCrudRepository;
import org.springframework.data.repository.reactive.ReactiveSortingRepository;
import org.springframework.stereotype.Repository;


/**
 * 持久层:非阻塞异步访问
 */
@Repository
public interface RSysSystemReactiveRepository extends ReactiveCrudRepository<RSysSystem, Long>, ReactiveSortingRepository<RSysSystem, Long> {
}

五:业务层:

package org.jd.websocket.auth.data.reactor.service;

import org.jd.websocket.auth.data.reactor.entity.RSysSystem;
import reactor.core.publisher.Mono;


public interface RSysSystemService {
    /**
     * 通过ID查找单条记录
     *
     * @param systemId 系统服务ID
     * @return {@link Mono<RSysSystem>}
     */
    Mono<RSysSystem> findById(Long systemId);

    /**
     * 插入记录信息
     *
     * @param system
     * @return {@link Mono<RSysSystem>)
     */
    Mono<RSysSystem> insert(RSysSystem system);

    /**
     * 通过ID查询是否存在记录
     *
     * @param systemId 系统ID
     * @return {@link Mono<Boolean>}
     */
    Mono<Boolean> exists(Long systemId);

    /**
     * 查询记录数
     *
     * @return {@link Mono<Long>}
     */
    Mono<Long> count();
}

package org.jd.websocket.auth.data.reactor.service.impl;f

import lombok.extern.slf4j.Slf4j;
import org.jd.websocket.auth.data.reactor.entity.RSysSystem;
import org.jd.websocket.auth.data.reactor.repository.RSysSystemReactiveRepository;
import org.jd.websocket.auth.data.reactor.service.RSysSystemService;
import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import reactor.core.publisher.Mono;
import javax.annotation.Resource;


/**
 * 构建全调用链路异步响应式编程
 * 系统响应式查询服务
 */
@Slf4j
@Service
public class RSysSystemServiceImpl implements RSysSystemService {
    
    @Resource
    private RSysSystemReactiveRepository sysSystemReactiveRepository;


    @Override
    public Mono<RSysSystem> findById(Long systemId) {
        return sysSystemReactiveRepository.findById(systemId);
    }

    @Transactional(rollbackFor = Exception.class)
    @Override
    public Mono<RSysSystem> insert(RSysSystem system) {
        return sysSystemReactiveRepository.save(system);
    }

    @Override
    public Mono<Boolean> exists(Long systemId) {
        return sysSystemReactiveRepository.existsById(systemId);
    }

    @Override
    public Mono<Long> count() {
        return sysSystemReactiveRepository.count();
    }
}

 六:服务器访问层

基于注解方式编程

package org.jd.websocket.auth.data.reactor.controller;


import org.jd.websocket.auth.data.reactor.entity.RSysSystem;
import org.jd.websocket.auth.data.reactor.service.RSysSystemService;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;


import javax.annotation.Resource;


@RestController
@RequestMapping("/system")
public class SysSystemController {
    @Resource
    private RSysSystemService rSysSystemService;

    @GetMapping("/getSysSystemById/{systemId}")
    public Mono<RSysSystem> getSySystem(@PathVariable("systemId") Long systemId) {
        Mono<RSysSystem> result = rSysSystemService.findById(systemId);
        System.out.println("result:" + result.toString());
        return result;
    }


}

七: 领域模型类

package org.jd.websocket.auth.data.reactor.entity;

import lombok.Data;
import lombok.RequiredArgsConstructor;
import org.springframework.data.annotation.Id;
import org.springframework.data.annotation.Transient;
import org.springframework.data.annotation.Version;
import org.springframework.data.relational.core.mapping.Column;
import org.springframework.data.relational.core.mapping.Table;
import org.springframework.format.annotation.DateTimeFormat;

/**
 * 属性上的注解使用Spring-data中的相关注解
 */
import java.io.Serializable;
import java.time.LocalDateTime;
import java.util.List;

@Data
@RequiredArgsConstructor
@Table(value = "sys_system")
public class RSysSystem implements Serializable {
    @Transient
    private static final long serialVersionUID = 7481799808203597699L;

    // 主键自增
    @Id
    @Column(value = "system_id")
    private Long systemId;

    /**
     * 系统名称
     * 字段映射和约束条件
     * //对应数据库表中哪个列字段及对该字段的自定义
     */
    @Column(value = "system_name")
    private String systemName;

    /**
     * 详细功能描述: 描述该系统主要包含那些那些模块,每个模块的大致功能
     */
    @Column(value = "system_detail_desc")
    private String systemDetailDesc;
    /**
     * 系统跳转到功能版块路径
     */
    @Column(value = "path_function_url")
    private String pathFunctionUrl;
    /**
     * 系统包含那些模块
     * 该字段不参与数据库映射
     */
    @Transient
    private List<RSysModule> sysModules;
    /**
     *
     * 创建时间
     */


    @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    @Column(value = "create_time")
    private LocalDateTime createTime;
    /**
     * 更新时间
     */

    @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    @Column(value = "update_time")
    private LocalDateTime updateTime;
    /**
     * 版本号(用于乐观锁, 默认为 1)
     * 使用 @Version 注解标注对应的实体类。
     * 可以通过 @TableField 进行数据自动填充。
     */
    @Version
    private Integer version;
}

测试脚本:


CREATE TABLE `sys_system` (
  `system_id` bigint(11) NOT NULL AUTO_INCREMENT COMMENT '系统主键',
  `system_name` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL DEFAULT '' COMMENT '系统短名称',
  `system_detail_desc` varchar(200) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL COMMENT '系统简介',
  `path_function_url` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT '' COMMENT '跳转到版块页路径',
  `create_time` timestamp NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_time` timestamp NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  `version` int(11) NOT NULL DEFAULT '0' COMMENT '版本号',
  PRIMARY KEY (`system_id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

 

运行测试结果:

http://localhost:9999/system/getSysSystemById/1


 可能会遇到时间字段(LocalDateTime)转换的问题:使用下面的配置转换类即可

package org.jd.websocket.auth.data.reactor.config;

import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;
import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer;
import org.springframework.boot.autoconfigure.jackson.Jackson2ObjectMapperBuilderCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.io.IOException;
import java.time.*;
import java.time.format.DateTimeFormatter;

@Configuration
public class LocalDateTimeSerializerConfig {
    @Bean
    public Jackson2ObjectMapperBuilderCustomizer jackson2ObjectMapperBuilderCustomizer() {
        return builder -> {
            //序列化
            builder.serializerByType(LocalDateTime.class, new LocalDateTimeSerializer(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
            //反序列化
            builder.deserializerByType(LocalDateTime.class, new LocalDateTimeDeserializer());
        };
    }

    // 反序列化
    public static class LocalDateTimeDeserializer extends JsonDeserializer<LocalDateTime> {
        @Override
        public LocalDateTime deserialize(JsonParser p, DeserializationContext deserializationContext)
                throws IOException {
            long timestamp = p.getValueAsLong();
            if (timestamp > 0) {
                return LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault());
            } else {
                return null;
            }
        }
    }
}

至此,基础搭建完成,后续会持续系列多篇讲解,撸下源代码及相关知识........待续..... 

参考序列:

* 官方文档
* https://github.com/spring-projects/spring-data-examples/tree/master/r2dbc/example
* https://www.reactiveprinciples.org/  中文官网

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/824347.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

网络四元组

问题描述与解释 四元组&#xff0c;简单理解就是在 TCP 协议中&#xff0c;去确定一个客户端连接的组成要素&#xff0c;它包括 1、源 IP 地址 2、目标 IP 地址 3、源端口号 4、目标端口号 正常情况下&#xff0c;我们对于网络通信的认识可能是这样&#xff08;如图&#xff09…

【力扣每日一题】2023.8.2 翻转卡片游戏

目录 题目&#xff1a; 示例&#xff1a; 分析&#xff1a; 代码&#xff1a; 题目&#xff1a; 示例&#xff1a; 分析&#xff1a; 这道题不是什么翻转卡片游戏&#xff0c;这就是纯纯的文字游戏&#xff0c;要是能看懂题目那就是非常简单&#xff0c;接下来我就给大家分…

相机存储卡数据恢复,掌握这2个方法就够啦!

“上次和朋友出去旅游拍了好多好看的视频和照片&#xff0c;都特别有纪念意义。但将相机存储卡插入电脑后&#xff0c;很多照片和视频都消失了&#xff0c;怎么恢复相机存储卡里照片呢&#xff1f;求帮助&#xff01;” 对于喜欢拍摄的朋友来说&#xff0c;相机的存储卡真的是个…

elb 直接配置到后端服务器组

出现上图报错的原因是&#xff0c;前面elb配置了https证书&#xff0c;后端的nginx也配置了证书&#xff0c;导致冲突。 需要修改后端的nginx配置文件&#xff0c;将证书配置注释掉。 如果出现健康检查异常&#xff0c;需要在对应服务器的安全组上配置elb所在的网段的访问权限…

不同USB口上的颜色各有什么含义和区别?

在生活中&#xff0c;当我们仔细观察手机、电视、电脑、音箱等电子设备时&#xff0c;就会发现USB端口的颜色有很多。单纯的你可能会认为只是为了好看&#xff0c;实际上不同的颜色代表着不同性能。 01.USB接口的概念 USB通用串行总线&#xff08;Universal Serial Bus&#x…

JVM内存模型【入门】

计算机结构简图 JVM内存模型 详细说明&#xff1a;https://blog.csdn.net/m0_71777195/article/details/126247090 什么是JVM&#xff1f; JVM是Java Virtual Machine&#xff08;Java虚拟机&#xff09;的缩写&#xff0c;JVM是一个虚构出来的计算机&#xff0c;有着自己完善…

VMware vSphere整体解决方案及实验拓扑

VMware vSphere整体解决方案及实验拓扑 VMware vSphere完整的解决方案 VMware vSphere有两个核心组件&#xff1a;ESXI&#xff0c;vCenter。ESXI实现的是单机虚拟化&#xff0c;而vCenter实现集群虚拟化&#xff0c;把所有的ESXI统一进行管理。当然了&#xff0c;要想是实现…

IntelliJ IDEA 2023.2新特性详解第二弹!

4 性能分析器 4.1 从 Run&#xff08;运行&#xff09;工具窗口使用分析功能 2023.2 中&#xff0c;可直接从 Run&#xff08;运行&#xff09;工具窗口轻松访问 IntelliJ 分析器的功能。 使用新按钮&#xff0c;点击即可调用&#xff1a; Attach IntelliJ Profiler&#xff…

基于“RWEQ+”集成技术在土壤风蚀模拟与风蚀模数估算、变化归因分析中的应用

土壤风蚀是一个全球性的环境问题。中国是世界上受土壤风蚀危害最严重的国家之一&#xff0c;土壤风蚀是中国干旱、半干旱及部分湿润地区土地荒漠化的首要过程。中国风蚀荒漠化面积达160.74104km2&#xff0c;占国土总面积的16.7%&#xff0c;严重影响这些地区的资源开发和社会经…

Linux【网络基础】之宏观认识

文章目录 一、计算机网络背景二、计算机网络协议&#xff08;1&#xff09;网络协议的概念&#xff08;2&#xff09;协议分层&#xff08;3&#xff09;数据封装与分用&#xff08;4&#xff09;地址管理 一、计算机网络背景 学习计算机网络我们首先要有宏观的认识&#xff0…

03|Oracle学习(主键约束、联合主键)

1. 主键约束介绍 主键&#xff1a;数据表中的一个或多个字段&#xff0c;用于唯一标识数据表中的一条记录。 2. 添加主键约束 2.1 在创建表时添加约束 写法1&#xff1a; CREATE TABLE tb_students(stu_num char(5) primary key,stu_name varchar(10) not null,stu_sex cha…

在政策+市场双轮驱动下,深眸科技助力机器视觉行业走向成熟

近年来&#xff0c;随着人工智能发展的不断提速&#xff0c;机器视觉作为其重要的前沿分支&#xff0c;凭借着机器代替人眼来做测量和判断的能力&#xff0c;广泛应用于工业领域的制造生产环节&#xff0c;用来保证产品质量、控制生产流程、感知环境等&#xff0c;并迸发出强劲…

滴滴数据服务体系建设实践

什么是数据服务化 大数据开发的主要流程分为数据集成、数据开发、数据生产和数据回流四个阶段。数据集成打通了业务系统数据进入大数据环境的通道&#xff0c;通常包含周期性导入离线表、实时采集并清洗导入离线表和实时写入对应数据源三种方式&#xff0c;当前滴滴内部同步中心…

sql入门基础-2

Dml语句 对数据的增删改查 关键字 Insert增 Update删 Delete改 添加数据 给指定字段添加数据 Insert into 表明 (字段名1&#xff0c;字段名2) values&#xff08;值1&#xff0c;值2&#xff09;; 给全部字段添加数据--(根据位置对应添加到字段下) Insert into 表名 values…

24考研数据结构-树与二叉树的基本概念

目录 第五章&#xff1a;树5.1树的基本概念5.1.1树的定义5.1.2 基本术语5.1.3 树的性质 5.2二叉树的概念5.2.1 二叉树的定义与特性5.2.2 几种特殊的二叉树5.2.3 二叉树的性质5.2.4 完全二叉树的性质5.2.5 二叉树的存储结构1. 顺序存储重要的基本操作非完全二叉树2. 链式存储逆向…

Flowable-子流程-事件子流程

目录 定义图形标记XML内容使用示例视频教程 定义 事件子流程是 BPMN 2.0 中加入的新元素&#xff0c;它是指通过事件触发的子流程&#xff0c;可以存在于在流程 级别&#xff0c;或者任何子流程级别。和内嵌子流程类似&#xff0c;把一系列的活动归结到一起处理&#xff0c;不…

vue: dev-tools控制台不显示问题

安装好dev-tools 导致控制台不显示的原因&#xff1a; 使用的vue压缩版本 vue.min.js。配置问题导致 1.查看public/index.html中引用的vue是否为压缩版本&#xff08;vue.min.js&#xff09;。 需要把压缩版换成vue.js <% if (process.env.NODE_ENV production) { %> &…

C语言中char、short、int、long各占多少字节

1byte 8bit 一个字节占8个二进制位 windows操作系统&#xff0c;32位机中&#xff0c; char&#xff1a; 1个字节 short&#xff1a; 2个字节 int&#xff1a; 4个字节 long&#xff1a; 4个字节 以下是windows操作系统&#xff0c;32位机下的代码测试结果&#xff08;3…

嵌入式高星github项目仓库

awesome-embedded soft-and-hard freemodbus

C# 外观模式

概述 外观模式&#xff08;Facade Pattern&#xff09;是一种结构型设计模式&#xff0c;它提供了一个统一的接口&#xff0c;用于访问子系统中的一组接口。外观模式隐藏了子系统的复杂性&#xff0c;使得客户端可以通过简单的接口与子系统进行交互。 外观模式定义了一个高层…