【Spring】SpringBoot整合ShardingSphere并实现多线程分批插入10000条数据(进行分库分表操作)。

news2024/11/26 16:37:05

  📝个人主页:哈__

期待您的关注 

一、ShardingSphere简介

ShardingSphere是一套开源的分布式数据库中间件解决方案组成的生态圈,它由Sharding-JDBC、Sharding-Proxy和Sharding-Sidecar(计划中)这3款相互独立的产品组成。 他们均提供标准化的数据分片、分布式事务和数据库治理功能,可适用于如Java同构、异构语言、容器、云原生等各种多样化的应用场景。

ShardingSphere定位为关系型数据库中间件,旨在充分合理地在分布式的场景下利用关系型数据库的计算和存储能力,而并非实现一个全新的关系型数据库。 它与NoSQL和NewSQL是并存而非互斥的关系。NoSQL和NewSQL作为新技术探索的前沿,放眼未来,拥抱变化,是非常值得推荐的。反之,也可以用另一种思路看待问题,放眼未来,关注不变的东西,进而抓住事物本质。 关系型数据库当今依然占有巨大市场,是各个公司核心业务的基石,未来也难于撼动,我们目前阶段更加关注在原有基础上的增量,而非颠覆。----来自官方

 1.Sharding-JDBC

定位为轻量级Java框架,在Java的JDBC层提供的额外服务。 它使用客户端直连数据库,以jar包形式提供服务,无需额外部署和依赖,可理解为增强版的JDBC驱动,完全兼容JDBC和各种ORM框架。

  • 适用于任何基于Java的ORM框架,如:JPA, Hibernate, Mybatis, Spring JDBC Template或直接使用JDBC。
  • 基于任何第三方的数据库连接池,如:DBCP, C3P0, BoneCP, Druid, HikariCP等。
  • 支持任意实现JDBC规范的数据库。目前支持MySQL,Oracle,SQLServer和PostgreSQL。

2.Sharding-Proxy 

定位为透明化的数据库代理端,提供封装了数据库二进制协议的服务端版本,用于完成对异构语言的支持。 目前先提供MySQL版本,它可以使用任何兼容MySQL协议的访问客户端(如:MySQL Command Client, MySQL Workbench等)操作数据,对DBA更加友好。

  • 向应用程序完全透明,可直接当做MySQL使用。
  • 适用于任何兼容MySQL协议的客户端。

 

3.Sharding-Sidecar(TBD) 

定位为Kubernetes或Mesos的云原生数据库代理,以DaemonSet的形式代理所有对数据库的访问。 通过无中心、零侵入的方案提供与数据库交互的的啮合层,即Database Mesh,又可称数据网格。

Database Mesh的关注重点在于如何将分布式的数据访问应用与数据库有机串联起来,它更加关注的是交互,是将杂乱无章的应用与数据库之间的交互有效的梳理。使用Database Mesh,访问数据库的应用和数据库终将形成一个巨大的网格体系,应用和数据库只需在网格体系中对号入座即可,它们都是被啮合层所治理的对象。

二、为什么用到ShardingSphere 

从性能方面来说,由于关系型数据库大多采用B+树类型的索引,在数据量超过阈值的情况下,索引深度的增加也将使得磁盘访问的IO次数增加,进而导致查询性能的下降;同时,高并发访问请求也使得集中式数据库成为系统的最大瓶颈。

从可用性的方面来讲,服务化的无状态型,能够达到较小成本的随意扩容,这必然导致系统的最终压力都落在数据库之上。而单一的数据节点,或者简单的主从架构,已经越来越难以承担。数据库的可用性,已成为整个系统的关键。

从运维成本方面考虑,当一个数据库实例中的数据达到阈值以上,对于DBA的运维压力就会增大。数据备份和恢复的时间成本都将随着数据量的大小而愈发不可控。一般来讲,单一数据库实例的数据的阈值在1TB之内,是比较合理的范围。

在传统的关系型数据库无法满足互联网场景需要的情况下,将数据存储至原生支持分布式的NoSQL的尝试越来越多。 但NoSQL对SQL的不兼容性以及生态圈的不完善,使得它们在与关系型数据库的博弈中始终无法完成致命一击,而关系型数据库的地位却依然不可撼动。

三、数据分片

水平分片又称为横向拆分。它不再将数据根据业务逻辑分类,而是通过某个字段(或某几个字段),根据某种规则将数据分散至多个库或表中,每个分片仅包含数据的一部分。 例如:根据主键分片,偶数主键的记录放入0库(或表),奇数主键的记录放入1库(或表),如下图所示。

简单的来说,水平分片就是把一张大表的数据进行一个水平切割,将切割出来的不同的部分添加到不同的表当中,我们举这样的一个例子,在一家银行当中,最开始只开放了一个业务窗口,因为一开始的业务量不大,一个窗口足以解决这一天当中的所有问题,但是由于业务员的出色的业务能力,越来越多的人开始到这个银行办理业务了,这时一个窗口就不够了,需要多开几个窗口分担业务压力。我们这样设定一下,一共开放5个窗口,去哪个窗口取决于个人的身份证最后一位%5取余+1,如果是X那么就直接到1号窗口。

那么对于实际的业务来说,我们也是如此,一张订单表我们可以根据订单号进行取余操作分配表。

除了分表之外我们还可以分库,具体的思想还是一致的。

四、SpringBoot整合ShardingSphere

1.创建我们的数据库ds0和ds1。分别创建我们的表格order0,order1,order2。(两个数据库都运行一下)

SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;

-- ----------------------------
-- Table structure for t_order0
-- ----------------------------
DROP TABLE IF EXISTS `t_order0`;
CREATE TABLE `t_order0`  (
  `order_id` bigint(20) NOT NULL AUTO_INCREMENT,
  `user_id` int(11) NOT NULL,
  `order_name` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL,
  PRIMARY KEY (`order_id`) USING BTREE
) ENGINE = InnoDB  CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Compact;

-- ----------------------------
-- Table structure for t_order1
-- ----------------------------
DROP TABLE IF EXISTS `t_order1`;
CREATE TABLE `t_order1`  (
  `order_id` bigint(20) NOT NULL AUTO_INCREMENT,
  `user_id` int(11) NOT NULL,
  `order_name` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL,
  PRIMARY KEY (`order_id`) USING BTREE
) ENGINE = InnoDB  CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Compact;

-- ----------------------------
-- Table structure for t_order2
-- ----------------------------
DROP TABLE IF EXISTS `t_order2`;
CREATE TABLE `t_order2`  (
  `order_id` bigint(20) NOT NULL AUTO_INCREMENT,
  `user_id` int(11) NOT NULL,
  `order_name` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL,
  PRIMARY KEY (`order_id`) USING BTREE
) ENGINE = InnoDB  CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Compact;

SET FOREIGN_KEY_CHECKS = 1;

2.引入依赖

这里的依赖是为了实现我的们的目标,进行多线程分库分表插入。

 <dependency>
            <groupId>org.apache.shardingsphere</groupId>
            <artifactId>shardingsphere-jdbc-core-spring-boot-starter</artifactId>
            <version>5.0.0</version>
        </dependency>
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>3.5.2</version>
        </dependency>
        <dependency>
            <groupId>com.mysql</groupId>
            <artifactId>mysql-connector-j</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.8.18</version>
        </dependency>

3.添加配置文件。创建application.yml

我来讲解一下这些配置文件都是干啥的,都写到注释了。

spring:
  shardingsphere:
    props:
      #d打印Sql语句
      sql-show: true
    datasource:
      #创建我们的ds0数据源
      ds0:
        #下边这些都是老套路了
        driver-class-name: com.mysql.cj.jdbc.Driver
        jdbc-url: jdbc:mysql://localhost:3306/ds0?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=GMT&allowPublicKeyRetrieval=true
        password: 2020
        type: com.zaxxer.hikari.HikariDataSource
        username: root
      #创建我们的ds1数据源
      ds1:
        #一样的老套路
        driver-class-name: com.mysql.cj.jdbc.Driver
        jdbc-url: jdbc:mysql://localhost:3306/ds1?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=GMT&allowPublicKeyRetrieval=true
        password: 2020
        type: com.zaxxer.hikari.HikariDataSource
        username: root
      names: ds0,ds1
    #这里就比较重要了,这里是定义我们的分库分表的规则
    rules:
      sharding:
        #分片算法
        sharding-algorithms:
          #为分库定义一个算法 到底是如何分的库
          custom-db-inline:
            props:
              # 这里是具体的算法,我们根据userId取余进行分库,余数是几就分到ds几
              algorithm-expression: ds$->{user_id%2}
            type: INLINE
          # 如何分表
          custom-table-inline:
            props:
              # 根据orderId取余分表
              algorithm-expression: t_order$->{order_id%3}
            type: INLINE
        tables:
          # 这是我们的逻辑表 因为我们根本没有t_order这个表,这是我们的t_order0 1 2抽象出来的
          t_order:
            # 这是我们的真实表
            actual-data-nodes: ds$->{0..1}.t_order$->{0..2}
            database-strategy:
              standard:
                # 分库算法的名称 也就是上边的
                sharding-algorithm-name: custom-db-inline
                sharding-column: user_id
            table-strategy:
              standard:
                # 分表算法名称
                sharding-algorithm-name: custom-table-inline
                sharding-column: order_id
async:
  executor:
    thread:
      core_pool_size: 5
      max_pool_size: 20
      queue_capacity: 90000
      name:
        prefix: async-
mybatis-plus:
  global-config:
    db-config:
      id-type: assign_id

4.创建我们的框架结构

 

三层Order的代码如下。

// Order实体
@Data
@TableName("t_order")
@SuppressWarnings("serial")
public class Order extends Model<Order> {

    @TableId(type = IdType.ASSIGN_ID)
    private Long orderId;

    private Integer userId;

    private String orderName;

    @Override
    public Serializable pkVal() {
        return this.orderId;
    }
}

//mapper
@Mapper
public interface OrderMapper extends BaseMapper<Order> {
}



//Order的service接口
public interface OrderService extends IService<Order> {
}


//接口实现
@Service
public class OrderServiceImpl extends ServiceImpl<OrderMapper, Order> implements OrderService {
}

ExecutorConfig,配置我们的线程池。

@Configuration
public class ExecutorConfig {
    @Value("${async.executor.thread.core_pool_size}")
    private int corePoolSize;
    @Value("${async.executor.thread.max_pool_size}")
    private int maxPoolSize;
    @Value("${async.executor.thread.queue_capacity}")
    private int queueCapacity;
    @Value("${async.executor.thread.name.prefix}")
    private String namePrefix;

    @Bean(name = "asyncServiceExecutor")
    public Executor asyncServiceExecutor() {
        //在这里修改
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //配置核心线程数
        executor.setCorePoolSize(corePoolSize);
        //配置最大线程数
        executor.setMaxPoolSize(maxPoolSize);
        //配置队列大小
        executor.setQueueCapacity(queueCapacity);
        //配置线程池中的线程的名称前缀
        executor.setThreadNamePrefix(namePrefix);
        // rejection-policy:当pool已经达到max size的时候,如何处理新任务
        // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //执行初始化
        executor.initialize();
        return executor;
    }
}

 创建AsyncService接口和实现类。

public interface AsyncService {
     void add(List<Order> orderList, CountDownLatch countDownLatch);
}
@Service
@Slf4j
public class AsyncServiceImpl implements AsyncService {
    @Resource
    private OrderServiceImpl orderService;
    @Async("asyncServiceExecutor")
    @Transactional(rollbackFor = Exception.class)
    @Override
    public void add(List<Order> orderList, CountDownLatch countDownLatch) {
        try {
            log.debug(Thread.currentThread().getName()+"开始插入数据");
            orderService.saveBatch(orderList);
            log.debug(Thread.currentThread().getName()+"插入数据完成");
        }finally {
            countDownLatch.countDown();
        }

    }
}

 要使用多线程异步调用要在启动程序上加上注解。

@SpringBootApplication
@EnableAsync
@EnableTransactionManagement
public class ShardingSphereApplication {
    public static void main(String[] args) {
        SpringApplication.run(ShardingSphereApplication.class, args);
    }

}

现在来看我们的AysncController。我定义了一个getData的方法,用于模拟生成我们的数据,当然我设置的名称都差不多,一共一万条数据,通过user_id进行分库,通过order_id进行分表,userId使用的是for循环的i索引,orderId使用的是雪花算法生成的Id序列。

在testAsyncInsert方法中。使用ListUtils的方法进行数据切片,每两千条数据切割成一个list,然后执行异步添加操作。待所有线程执行完毕之后,打印输出语句。

@RestController
public class AsyncController {
    @Autowired
    private AsyncService asyncService;

    @GetMapping("/test")
    public String testAsyncInsert(){
        CountDownLatch c;
        try {
            List<Order> data = getData();
            List<List<Order>> partition = ListUtil.partition(data, 2000);
            c= new CountDownLatch(partition.size());
            for (List<Order> list : partition) {
                asyncService.add(list,c);
            }
            c.await();
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            System.out.println("所有的数据插入完毕");
        }
        return "执行完毕";
    }
    private List<Order> getData(){
        List<Order> list = new ArrayList<>();
        for(int i = 0;i<10000;i++){
            Order o = new Order();
            o.setOrderName("苹果"+i);
            o.setUserId(i+1);
            list.add(o);
        }
        return list;
    }
}

看结果。 大家可以自己去验证一下。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1565318.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

hadoop 高可用(HA)、HDFS HA、Yarn HA

目录 hadoop 高可用(HA) HDFS高可用 HDFS高可用架构 QJM 主备切换&#xff1a; Yarn高可用 hadoop 高可用(HA) HDFS高可用 HDFS高可用架构 QJM 主备切换&#xff1a; Yarn高可用

Oracle基础-PL/SQL编程 备份

1、PL/SQL简介 PL/SQL块结构 约定&#xff1a;为了方便&#xff0c;本文后面把PL/SQL简称PL。 PL程序都是以块&#xff08;BLOCK&#xff09;为基本单位&#xff0c;整个PL块分三部分&#xff1a;声明部分&#xff08;使用DECLARE开头&#xff09;、执行部分(以BEGIN开头)和异…

IP地址获取不到的原因是什么?

在数字化时代的今天&#xff0c;互联网已成为我们日常生活和工作中不可或缺的一部分。而IP地址&#xff0c;作为互联网通信的基础&#xff0c;其重要性不言而喻。然而&#xff0c;有时我们可能会遇到IP地址获取不到的问题&#xff0c;这会给我们的网络使用带来诸多不便。那么&a…

利用开源AI引擎:构建本地化部署的工业级智能安全监测系统

随着科技的不断进步&#xff0c;传统的安全管理方法已经无法满足现代社会对于安全保障的高标准要求。环境隐患的及时发现与处理&#xff0c;对于保障人民生命财产安全、维护社会稳定具有重要意义。本文将探讨如何通过图像处理、图像识别和目标检测技术&#xff0c;实现对环境隐…

力扣2684---矩阵中移动的最大次数(DFS,Java、中等题)

目录 题目描述&#xff1a; 思路描述&#xff1a; 代码&#xff1a; 纯递归&#xff1a; 带有记忆化搜索的递归&#xff1a; 题目描述&#xff1a; 给你一个下标从 0 开始、大小为 m x n 的矩阵 grid &#xff0c;矩阵由若干 正 整数组成。 你可以从矩阵第一列中的 任一 单…

Jenkins执行策略(图文讲解)

Jenkins执行策略-图文讲解 一&#xff1a;手动执行1、手动执行流程2、手动执行操作 二、通过构建触发器——定时执行1、定时执行流程2、定时执行操作 三、当开发部署成功之后进行执行——在测试项配置——关注的项目1、执行流程2、操作流程 四、测试代码有更新的时候自动构建1、…

破解密码:掌握2024年的营销归因

Cracking the Code: Mastering Marketing Attribution in 2024 营销归因是识别哪些营销渠道和触及点有助于销售或转化的过程。随着消费者继续通过多个渠道与品牌互动&#xff0c;掌握营销归因对企业来说变得越来越重要。在这篇文章中&#xff0c;我们将探讨破解代码和有效衡量…

算法基础--递推

&#x1f600;前言 递推算法在计算机科学中扮演着重要的角色。通过递推&#xff0c;我们可以根据已知的初始条件&#xff0c;通过一定的规则推导出后续的结果&#xff0c;从而解决各种实际问题。本文将介绍递推算法的基础知识&#xff0c;并通过一些入门例题来帮助读者更好地理…

如何从文本数据中提取子列表

提取文本数据中的子列表可以通过各种方式实现&#xff0c;具体取决于文本数据的结构和提取子列表的条件。例如&#xff1a;使用字符串操作和条件判断、使用正则表达式、使用自然语言处理工具、使用自定义解析器等几种模式&#xff0c;那么对于在日常使用中会有那些问题呢 &…

黄金票据的复现

实验环境以及工具 服务器&#xff1a;Windows server 2003 用户&#xff1a;Windows 7旗舰版 工具&#xff1a;mimikatz 搭建服务器环境 参考&#xff1a;内网横向——域渗透之黄金票据复现-CSDN博客 创建用户 使用gpupdate刷新策略&#xff1b; 搭建win7环境 设置ip ‘…

IDEA无法连接虚拟机中的Redis的解决方案,无法连接Jedis,无法ping通虚拟机的解决方案

首先&#xff0c;笔者先说明一下自身的情况&#xff0c;怎么连接都连不上&#xff0c;网上的教程全部都看了一遍&#xff0c;基本上没用得上的&#xff0c;这篇文章里面的解决方案包括了笔者能在网上找到了最全面的办法总结&#xff0c;最后终于是连上了 目录 一.连接Jedis出错…

专题三_二分查找(3)

目录 153. 寻找旋转排序数组中的最小值 解析 题解 LCR 173. 点名 解析 题解 153. 寻找旋转排序数组中的最小值 153. 寻找旋转排序数组中的最小值 - 力扣&#xff08;LeetCode&#xff09; 解析 题解 class Solution { public:int findMin(vector<int>& nums) …

Qt实现Kermit协议(三)

3 实现 3.2 KermitSendFile 该模块实现了Kermit发送文件功能。 序列图如下&#xff1a; 3.2.1 KermitSendFile定义 class QSerialPort; class KermitSendFile : public QObject, public Kermit {Q_OBJECT public:explicit KermitSendFile(QSerialPort *serial, QObject *…

软考中项新版第1章脑图发布,用5幅图掌控信息化发展的考点地图

2024年1月开年&#xff0c;软考系统集成项目管理工程师官方教程&#xff0c;迎来了阔别7年的大改版&#xff0c;改版之后的软考中项考试&#xff0c;离同宗兄弟高项考试渐行渐远。 中项第3版教程&#xff0c;仅仅从教程来看&#xff0c;其难度已经不亚于高级的信息系统项目管理…

15.Python访问数据库

如果数据量较少&#xff0c;则我们可以将数据保存到文件中&#xff1b;如果数据量较 大&#xff0c;则我们可以将数据保存到数据库中。 1 SQLite数据库 SQLite是嵌入式系统使用的关系数据库&#xff0c;目前的主流版本是SQLite 3。SQLite是开源的&#xff0c;采用C语言编写而…

使用TCP协议就一定零丢包了吗?

简述数据包发送流程 为了简化模型&#xff0c;我们把中间的服务器给省略掉&#xff0c;假设这是个端到端的通信。且为了保证消息的可靠性&#xff0c;它们之间用的是TCP协议进行通信。 为了发送数据包&#xff0c;两端首先会通过三次握手&#xff0c;建立TCP连接。 一个数据包&…

STM32 uC/OS-III

What is uC/OS-III? C/OS-III 的发音为“Micro C O S Three”&#xff0c;这意味着 C/OS-III 是基于 C 语言编写的第三代 小型操作系统&#xff0c;当然这里所说的第三代是相对于 C/OS 的前两个版本 C/OS 和 C/OS-II 而言 的&#xff0c;后面也会介绍这三个版本的差别。C/OS/…

华为OD机试 - 最大社交距离(Java 2024 C卷 100分)

华为OD机试 2024C卷题库疯狂收录中&#xff0c;刷题点这里 专栏导读 本专栏收录于《华为OD机试&#xff08;JAVA&#xff09;真题&#xff08;A卷B卷C卷&#xff09;》。 刷的越多&#xff0c;抽中的概率越大&#xff0c;每一题都有详细的答题思路、详细的代码注释、样例测试…

this.$route.back()时的组件缓存

1.this.$route.back()回到上一个路径会重新加载 跳转时,前一个路由的内容会被销毁,当回来时,重新创建树,组件内有保存了距离,没有一开始是0. 2.keep-alive写在router-view上面,这个地方所代表的路由会被保存,因此可以写在上面,保存,当返回时,如果是这个路由,里面的内容是一样…

深入理解数据结构(1):复杂度详解

文章主题&#xff1a;复杂度详解&#x1f331;所属专栏&#xff1a;深入理解数据结构&#x1f4d8;作者简介&#xff1a;更新有关深入理解数据结构知识的博主一枚&#xff0c;记录分享自己对数据结构的深入解读。&#x1f604;个人主页&#xff1a;[₽]的个人主页&#x1f525;…