MyBatis-Plus多数据源dynamic-datasource解决多线程情境下数据源切换失效问题

news2024/9/24 11:33:03

前言:项目中使用MyBatis-Plus多数据源dynamic-datasource,完成多数据源的切换;但是在并发场景下,我们会发现线程会一直访问默认数据源(配置的Master数据),并没有访问我们在上一步切换后的数据源,之前切换的数据源失效了;显然多数据源对于并发的处理并不友好,那么我们怎么解决这个问题呢。

本文是在springboot项目已集成dynamic-datasource 基础上延伸的问题,项目集成多数据源可以参考:Idea+maven+spring-cloud项目搭建系列–13 整合MyBatis-Plus多数据源dynamic-datasource

1 问题产生的原因:

问题的产生来源于多数据源com.baomidou.dynamic.datasource.toolkit 包下DynamicDataSourceContextHolder 类的问题,当我们打开这个类,会发现,存储当前线程的数据源使用了 ThreadLocal:

package com.baomidou.dynamic.datasource.toolkit;

import java.util.ArrayDeque;
import java.util.Deque;
import org.springframework.core.NamedThreadLocal;
import org.springframework.util.StringUtils;

public final class DynamicDataSourceContextHolder {
	// 线程数据源的存储
    private static final ThreadLocal<Deque<String>> LOOKUP_KEY_HOLDER = new NamedThreadLocal<Deque<String>>("dynamic-datasource") {
        protected Deque<String> initialValue() {
            return new ArrayDeque();
        }
    };

    private DynamicDataSourceContextHolder() {
    }

    public static String peek() {
    	//  访问数据库时 从队列中peek 出来数据源
        return (String)((Deque)LOOKUP_KEY_HOLDER.get()).peek();
    }
	// 放入要切换的数据源
    public static String push(String ds) {
        String dataSourceStr = StringUtils.isEmpty(ds) ? "" : ds;
        ((Deque)LOOKUP_KEY_HOLDER.get()).push(dataSourceStr);
        return dataSourceStr;
    }
	// 从队列获取数据源
    public static void poll() {
        Deque<String> deque = (Deque)LOOKUP_KEY_HOLDER.get();
        deque.poll();
        if (deque.isEmpty()) {
            LOOKUP_KEY_HOLDER.remove();
        }

    }
	// 清除数据源
    public static void clear() {
        LOOKUP_KEY_HOLDER.remove();
    }
}

再来看下 NamedThreadLocal:

// 此处可以看到继承了 ThreadLocal 类
public class NamedThreadLocal<T> extends ThreadLocal<T> {

	private final String name;


	/**
	 * Create a new NamedThreadLocal with the given name.
	 * @param name a descriptive name for this ThreadLocal
	 */
	public NamedThreadLocal(String name) {
		Assert.hasText(name, "Name must not be empty");
		this.name = name;
	}

	@Override
	public String toString() {
		return this.name;
	}

}

简单概况下数据源的切换流程:
当我们进行数据源切换的时候,实际上是向当前线程所持有的LOOKUP_KEY_HOLDER 的ThreadLocal 对象放入数据源,这样在当前线程在进行数据库访问的时候,会得到当前的数据源,然后找到对应的jdbc 连接,完成数据的访问;
因为LOOKUP_KEY_HOLDER 对象是用ThreadLocal 修饰的,也就是说它是线程隔离的,所以当我们在切换完数据源之后,在子线程中维护的LOOKUP_KEY_HOLDER 是空的,再找不到数据源的情况下,就访问到了默认的数据源;

2 问题处理的思路:

既然是由于线程中保存数据源是每个线程隔离的,要想在并发的情形下仍然可以正常的数据源切换,要就需要打破其隔离性:
解决思路1:在开启线程执行任务时 ,先获取到父线程的数据源,然后在子线程内手动完成数据源的切换,保证子父线程数据源的一致性;
解决思路2:在项目中创建一个特殊的线程池,当有任务的执行时,进行拦截,获取父线程的数据源然后手动进行数据源的切换;
解决思路3:项目中覆盖DynamicDataSourceContextHolder 类修改LOOKUP_KEY_HOLDER 的对象,使得子线程在执行任务时,可以拿到父线程的数据源标识,这样也可以保证,子父线程访问数据源的一致性;改方法可以在不入侵原有业务代码的情况下,在业务开发者无感知的情况下,做到统一拦截并进行代理,完成父类数据源的传递;

3 问题解决的办法:

3.1 针对于解决思路1:
在执行线程任务时,进行手动的切换 demo:

// 获取当前父线程的数据源
String parentDb = "";
  new Thread(()->{
         // 切换数据源
        DynamicDataSourceContextHolder.push(parentDb ); 
       try {
       		// do some thing
		  }finally {
		  	// 最后移除数据源
		  	 DynamicDataSourceContextHolder.clear();
		  }    
  }).start();
// 在子线程执行任务时

3.2 针对于解决思路2:
创建一个线程池,当执行任务时,都使用改线程池:
线程配置类:TaskExecutionConfig

import org.springframework.context.annotation.*;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.ThreadPoolExecutor;

@Configuration
public class TaskExecutionConfig {
    // cpu 核心数
    private static final int DEFAULT_THREADS = Math.max(1, Runtime.getRuntime().availableProcessors());


    @Primary
    @Bean(name = {"taskHolderExecutorProxy", "executor"})
    public TaskHolderExecutorProxy threadPoolTaskExecutor() {
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();

        threadPoolTaskExecutor.setCorePoolSize(DEFAULT_THREADS);
        threadPoolTaskExecutor.setMaxPoolSize(DEFAULT_THREADS << 1);
        threadPoolTaskExecutor.setQueueCapacity(Integer.MAX_VALUE);
        threadPoolTaskExecutor.setKeepAliveSeconds(120);
        threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
        threadPoolTaskExecutor.initialize();
        return new TaskHolderExecutorProxy(threadPoolTaskExecutor);
    }
}

线程执行任务时进行拦截进行数据源切换:TaskHolderExecutorProxy

import org.springframework.web.context.request.RequestAttributes;
import org.springframework.web.context.request.RequestContextHolder;

import java.util.concurrent.Executor;

/**
 * 使用多线程并行查询时,非主线程 尝试获取 用户上下文 (即httpServletRequest)时
 * 用户上下文为空,会导致 使用多线程查询的服务 无法使用多租户功能
 * 所以这个proxy在提交任务到线程池之前先保存线程的上下文,
 * 这样非主线程也能拿到主线程的用户上下文,从而使用多租户
 */

public class TaskHolderExecutorProxy implements Executor {

    /**
     * 被代理的线程池
     */
    private final Executor executor;

    public TaskHolderExecutorProxy(Executor executor) {
        this.executor = executor;
    }

    @Override
    public void execute(Runnable command) {
//		保存主线程的 用户上下文
       // RequestAttributes requestAttributes = RequestContextHolder.currentRequestAttributes();
		// 获取当前父线程的数据源
		String parentDb = "";
        executor.execute(() -> {

//			为线程池 设置用户上下文
          //  RequestContextHolder.setRequestAttributes(requestAttributes);
            // 切换数据源
            DynamicDataSourceContextHolder.push(parentDb ); 
            try {
                command.run();
            } finally {
//				清理线程池线程的上下文
//                RequestContextHolder.resetRequestAttributes();
 				// 最后移除数据源
		  		 DynamicDataSourceContextHolder.clear();
            }
        });

    }
}

3.3 针对于解决思路3:重写DynamicDataSourceContextHolder 类覆盖掉MyBatis-Plus 原有的类,并进行代理,在子线程任务执行之前放入父线程的数据源标识,并在子线程任务执行结束之后移除改数据源标识:
3.3.1 首先需要引入一个阿里的jar ,让其可以帮助我们将父线程ThreadLocal 修饰的常量,可以继承到子线程中:

  <!-- https://mvnrepository.com/artifact/com.alibaba/transmittable-thread-local -->
   <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>transmittable-thread-local</artifactId>
        <version>2.12.1</version>
    </dependency>

3.3.2 重写 DynamicDataSourceContextHolder 类:
我们需要在项目中创建一个路径和MyBatis-Plus 下 DynamicDataSourceContextHolder 类 路径相同,类名相同的DynamicDataSourceContextHolder 类:
在这里插入图片描述
DynamicDataSourceContextHolder 中我们重新定义LOOKUP_KEY_HOLDER

package com.baomidou.dynamic.datasource.toolkit;

import org.springframework.util.StringUtils;

import java.util.ArrayDeque;
import java.util.Deque;

public class DynamicDataSourceContextHolder {
    private static final ThreadLocal<Deque<String>> LOOKUP_KEY_HOLDER = new ChildThreadTreadLocal<Deque<String>>("dynamic-datasource") {
        protected Deque<String> initialValue() {
            return new ArrayDeque();
        }
    };

    private DynamicDataSourceContextHolder() {
    }

    public static String peek() {
        return (String)((Deque)LOOKUP_KEY_HOLDER.get()).peek();
    }

    public static String push(String ds) {
        String dataSourceStr = StringUtils.isEmpty(ds) ? "" : ds;
        ((Deque)LOOKUP_KEY_HOLDER.get()).push(dataSourceStr);
        return dataSourceStr;
    }

    public static void poll() {
        Deque<String> deque = (Deque)LOOKUP_KEY_HOLDER.get();
        deque.poll();
        if (deque.isEmpty()) {
            LOOKUP_KEY_HOLDER.remove();
        }

    }

    public static void clear() {
        LOOKUP_KEY_HOLDER.remove();
    }
}

相同包路径下定义ChildThreadTreadLocal类:在该类中我们继承TransmittableThreadLocal 类帮我进行父子线程数据的传递

package com.baomidou.dynamic.datasource.toolkit;

import com.alibaba.ttl.TransmittableThreadLocal;
import org.springframework.util.Assert;

public class ChildThreadTreadLocal<T> extends TransmittableThreadLocal {
    private final String name;

    public ChildThreadTreadLocal(String name) {
        Assert.hasText(name, "Name must not be empty");
        this.name = name;
    }

    public String toString() {
        return this.name;
    }
}

3.3.3 对项目中所以线程任务的执行增加代理
在需要代理的项目跟路径下放入之前pom 下载到maven 仓库的transmittable-thread-local-2.12.1.jar 包
新建buildlocal 文件包,并放入transmittable-thread-local-2.12.1.jar 包:
在这里插入图片描述
3.3.4 项目启动的jvm 参数增加代理:
在这里插入图片描述
-javaagent:xxxx/buildlocal/transmittable-thread-local-2.12.1.jar

3.3.5 对于线上部署docker 时 ,在doker 容器启动时增加代理:
在这里插入图片描述
4 总结:

  • 针对方法1和方法2:都需要侵入代码进行数据源的切换和移除;
  • 针对方法3 因为重新了DynamicDataSourceContextHolder 并且对数据源对象LOOKUP_KEY_HOLDER 使用TransmittableThreadLocal 进行修饰,当启动项目是使用-javaagent:完成代理后,每次在子线程进行任务执行时子线程都可以获取到父线程中的数据源,从而保证了子父线程数据源的一致性,并且该方法不需要入侵原有的业务代码;

5 扩展:
在项目开启-javaagent:xxxx/buildlocal/transmittable-thread-local-2.12.1.jar 线程的代理后,测试ThreadLocal 数据的可见性:


import com.alibaba.ttl.TransmittableThreadLocal;
import com.cric.zhongjian.common.datasource.Master;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@RestController
public class ThreadTestController {
    private final static ThreadLocal<String> threadLocal1 = new ThreadLocal<>();
    private final static ThreadLocal<String> threadLocal2 = new InheritableThreadLocal<>();
    private final static ThreadLocal<String> threadLocal3= new TransmittableThreadLocal<>();

    @Master
    @GetMapping("threadlocal")
    public void testThread() throws InterruptedException {
        List<String > a = Arrays.asList("100,200".split(","));
        threadLocal1.set("x");
        threadLocal2.set("y");
        threadLocal3.set("z");

        new Thread(()->{
            System.out.println( Thread.currentThread().getId()+":"+ Thread.currentThread().getName());
            System.out.println("ExecutorServicex threadLocal1.get() = " + threadLocal1.get());
            System.out.println("ExecutorServicex threadLocal2.get() = " + threadLocal2.get());
            System.out.println("ExecutorServicex threadLocal3.get() = " + threadLocal3.get());
            System.out.println("================== ");
        }).start();
        Thread.sleep(1000);

        threadLocal1.set("a");
        threadLocal2.set("b");
        threadLocal3.set("c");
        ExecutorService fixedThreadPool = Executors.newFixedThreadPool(1);
        fixedThreadPool.submit(()->{
            System.out.println( Thread.currentThread().getId()+":"+ Thread.currentThread().getName());
            System.out.println("ExecutorService1 threadLocal1.get() = " + threadLocal1.get());
            System.out.println("ExecutorService1 threadLocal2.get() = " + threadLocal2.get());
            System.out.println("ExecutorService1 threadLocal3.get() = " + threadLocal3.get());
            System.out.println("================== ");
        });
        Thread.sleep(1000);
        threadLocal1.set("1");
        threadLocal2.set("2");
        threadLocal3.set("3");
        fixedThreadPool.submit(()->{
            System.out.println( Thread.currentThread().getId()+":"+ Thread.currentThread().getName());
            System.out.println("ExecutorService2 threadLocal1.get() = " + threadLocal1.get());
            System.out.println("ExecutorService2 threadLocal2.get() = " + threadLocal2.get());
            System.out.println("ExecutorService2 threadLocal3.get() = " + threadLocal3.get());
            System.out.println("================== ");
        });

        Thread.sleep(1000);
        threadLocal1.set("aa");
        threadLocal2.set("bb");
        threadLocal3.set("cc");
        a.parallelStream().forEach(e->{
            System.out.println(Thread.currentThread().getName()+":parallelStream threadLocal1.get() = " + threadLocal1.get());
            System.out.println(Thread.currentThread().getName()+":parallelStream threadLocal2.get() = " + threadLocal2.get());
            System.out.println(Thread.currentThread().getName()+":parallelStream threadLocal3.get() = " + threadLocal3.get());
            System.out.println("================== ");

        });


    }
}

测试结果:

160:Thread-30
ExecutorServicex threadLocal1.get() = null
ExecutorServicex threadLocal2.get() = y
ExecutorServicex threadLocal3.get() = z
================== 
161:pool-9-thread-1
ExecutorService1 threadLocal1.get() = null
ExecutorService1 threadLocal2.get() = b
ExecutorService1 threadLocal3.get() = c
================== 
161:pool-9-thread-1
ExecutorService2 threadLocal1.get() = null
ExecutorService2 threadLocal2.get() = b
ExecutorService2 threadLocal3.get() = 3
================== 
http-nio-9201-exec-2:parallelStream threadLocal1.get() = aa
http-nio-9201-exec-2:parallelStream threadLocal2.get() = bb
http-nio-9201-exec-2:parallelStream threadLocal3.get() = cc
================== 
ForkJoinPool.commonPool-worker-3:parallelStream threadLocal1.get() = null
ForkJoinPool.commonPool-worker-3:parallelStream threadLocal2.get() = bb
ForkJoinPool.commonPool-worker-3:parallelStream threadLocal3.get() = cc
================== 

可以看到当使用TransmittableThreadLocal 修饰后,在项目中进行子线程任务的执行时,子线程都可以拿到父线程的ThreadLocal 数据;

6 参考:
6.1 TransmittableThreadLocal的使用及原理解析;
6.2 springboot springmvc 拦截线程池线程执行业务逻辑;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/452427.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

如何把电脑文件备份?本指南很详细

概述&#xff1a;当文件发生丢失的时候&#xff0c;如果我们有备份&#xff0c;直接进行还原就可以了。可见备份的重要性。如何把电脑文件备份&#xff1f;本文将为大家详细介绍。 一、为什么要备份文件 我们在电脑上面会保存很多文件&#xff0c;有的文件是很重要的&#xf…

基于vfw的局域网语音聊天室系统源码论文

语音视频聊天 UDP套接字的运用 在实现语音视频聊天时,采用的是基于UDP套接字的点对点模式,而UDP面向的是无连接的数据服务,其套接字的使用如图10所示。 图10 UDP套接字的使用 视频的捕获 利用VFW接口&#xff0c;视频捕获可以分为以下几个步骤&#xff1a; 建立视频采集窗口…

如何把数据库中的数据显示到页面

主要内容&#xff1a;使用JDBC访问数据库中数据&#xff08;Java Web数据可视化案例&#xff09; 文章目录 前期准备&#xff1a;案例&#xff1a;第一步&#xff1a;创建数据库及数据第二步&#xff1a;编写实体类第三步&#xff1a;编写Dao类第四步&#xff1a;编写Servlet代…

SpringBoot中使用redis事务

本文基于SpringBoot 2.X 事务在关系型数据库的开发中经常用到&#xff0c;其实非关系型数据库&#xff0c;比如redis也有对事务的支持&#xff0c;本文主要探讨在SpringBoot中如何使用redis事务。 事务的相关介绍可以参考&#xff1a; 0、起因 在一次线上事故中&#xff0c;我们…

与春相拥,在职读研邂逅中国人民大学与加拿大女王大学金融硕士项目何其有幸

工作几年后的你是否有冒出在职读研的想法&#xff1f;或许你为此纠结了一段时间&#xff0c;在职读研要考虑到的因素众多&#xff0c;年龄、精力分配等&#xff0c;工作几年&#xff0c;经历了职场的磨练&#xff0c;更能知道自己家想要的是什么&#xff0c;对于是否读研会有一…

红海云签约长久数科,引领汽车流通行业人力资源数字化创新

上海铂中数字科技有限公司&#xff08;以下简称“长久数科”&#xff09;是国内领先的数字化汽车供应链服务企业&#xff0c;致力于通过整合客户资源、技术资源、产业链配套资源以及地面服务资源&#xff0c;打造一站式服务的汽车生态大数据SaaS平台。 近日&#xff0c;长久数…

编译原理陈火旺第三版第六章课后题答案

下面的答案仅供参考&#xff01; 1.按照表6.1所示的属性文法&#xff0c;构造表达式(4*71) *2的附注语法树。 答&#xff1a; 首先考虑最底最左边的内部结点,它对应于产生式F→digit,相应的语义规则为F. val: digit.lexval,由于这个结点的子结点digit的属性digit . lexval的…

一文彻底理解Java 17中的新特性密封类

密封类的作用 在面向对象语言中&#xff0c;我们可以通过继承&#xff08;extend&#xff09;来实现类的能力复用、扩展与增强。但有的时候&#xff0c;有些能力我们不希望被继承了去做一些不可预知的扩展。所以&#xff0c;我们需要对继承关系有一些限制的控制手段。而密封类…

android framework-ActivityManagerService(AMS)上

一、SystemServer android-10.0.0_r41\frameworks\base\services\java\com\android\server\SystemServer.java 1.1、startOtherService AMS初始化完成后&#xff0c;会调用systemReady方法。 mActivityManagerService.systemReady(() -> {Slog.i(TAG, "Making service…

springboot +flowable,简单实现工作流基础功能的demo例子

一.简介 对于flowable是什么以及关于此框架的具体信息可以参看此项目的官方文档&#xff1a;https://www.flowable.org/docs/userguide/index.html Flowable is a light-weight business process engine written in Java.这是官网文档对此框架的完美解释&#xff1a;Flowable…

NetApp AFF C 系列全闪存存储解决方案

NetApp AFF C 系列: “C”代表“酷炫”(Cool) 采用最新的容量闪存技术&#xff0c;辅以若干一流的智能技术&#xff0c;您将获得一个经济实惠的闪存解决方案&#xff0c;它重新定义了安全性、可扩展性和可持续性。 为什么选择 AFF C 系列的容量闪存解决方案&#xff1f; 实现…

jmeter获取图片验证码-解密图片并识别

说明&#xff1a; 关于图片验证码的处理方式有三种方法&#xff1a;一是让开发屏蔽验证码&#xff0c;二是让开发后端指定一个万能验证码&#xff0c;三是使用OCR工具进行图片验证码的解密及识别&#xff0c;推荐使用前两种方法最省事&#xff1b; OCRServer工具识别图片验证码…

Netty核心源码分析(二),Netty的Server端接收请求过程源码分析

文章目录 系列文章目录一、连接请求接受过程源码分析1、事件的值2、processSelectedKeys获取事件&#xff08;1&#xff09;doReadMessages方法&#xff08;2&#xff09;pipeline的fireChannelRead方法&#xff08;3&#xff09;ServerBootstrapAcceptor的channelRead方法 3、…

关于数据挖掘和数据集成?

按照数据的生命周期&#xff0c;我们通常将大数据技术分为数据集成、数据存储、批流处理、数据查询与分析、数据调度与编排、数据开发、BI 7 个部分。 可以看到数据集成在数据生命周期最前面的位置&#xff0c;它负责将多个来自不同数据源的数据聚合存放在一个数据存储中&…

分布式任务调度框架Power-Job

分布式任务调度框架的由来及对比 在大型业务业务系统中&#xff0c;不可避免会出现一些需要定时执行需求的场景&#xff0c;例如定时同步数据&#xff0c;定时清洗数据&#xff0c;定时生成报表&#xff0c;大量机器一同执行某个任务&#xff0c;甚至有些需要分布式处理的任务…

中继器+js组件化GIS地图

虽然可以使用JavaScript注入的方式将GIS地图嵌入Axure&#xff0c;但每次使用地图都需要重复嵌入并修改代码&#xff0c;不太方便。那么&#xff0c;能不能实现组件化呢&#xff1f;我们可以使用中继器&#xff08;repeater&#xff09;将常用的地图参数提取出来&#xff0c;通…

力扣题库刷题笔记406-根据身高重建队列

1、题目如下&#xff1a; 2、个人Python代码实现 这里需要单独备注一下截图中第21行代码&#xff1a; 上图可以看到&#xff0c;已经对[5, 2]等元素进行了遍历循环&#xff0c;且[5, 2]左侧确实只存在[7, 0][6, 1]两个元素身高高于他&#xff0c;但是继续[5,0]循环完成后&#…

@Async异步线程:Spring 自带的异步解决方案

前言 在项目应用中&#xff0c;使用MQ异步调用来实现系统性能优化&#xff0c;完成服务间数据同步是常用的技术手段。如果是在同一台服务器内部&#xff0c;不涉及到分布式系统&#xff0c;单纯的想实现部分业务的异步执行&#xff0c;这里介绍一个更简单的异步方法调用。 对于…

FreeRTOS - 计数信号量

一.任务功能 1、修改按键功能&#xff0c;模拟停车位出入功能 2、当按键按下 获取车位 3、当按键抬起 释放车位 二.API接口 函数原型SemaphoreHandle_t xSemaphoreCreateCounting( ①UBaseType_t uxMaxCount,②UBaseType_t uxInitialCount );功能概述创建计数信号量&#xff0c…

详解空气质量API 使用

引言 空气污染是当今世界面临的一大环境问题&#xff0c;而空气质量监测数据是制定环境政策和公众健康计划的重要依据。通过提供空气质量查询 API&#xff0c;开发人员可以方便地获取中国境内多个城市的空气质量数据&#xff0c;从而更好地监测和管理空气质量。 本文将介绍的…