使用双异步后,如何保证数据一致性?

news2024/11/14 2:07:59

在这里插入图片描述

目录

    • 一、前情提要
    • 二、通过Future获取异步返回值
      • 1、FutureTask 是基于 AbstractQueuedSynchronizer实现的
      • 2、FutureTask执行流程
      • 3、get()方法执行流程
    • 三、FutureTask源码具体分析
      • 1、FutureTask源码
        • 2、将异步方法的返回值改为```Future<Integer>```,将返回值放到```new AsyncResult<>();```中;
      • 3、通过```Future<Integer>.get()```获取返回值:
      • 4、这里也可以通过新线程+Future获取Future返回值
      • 在BUG中磨砺,在优化中成长

大家好,我是哪吒。

一、前情提要

在上一篇文章中,我们通过双异步的方式导入了10万行的Excel,有个小伙伴在评论区问我,如何保证插入后数据的一致性呢?

很简单,通过对比Excel文件行数和入库数量是否相等即可。

那么,如何获取异步线程的返回值呢?

在这里插入图片描述

二、通过Future获取异步返回值

我们可以通过给异步方法添加Future返回值的方式获取结果。

FutureTask 除了实现 Future 接口外,还实现了 Runnable 接口。因此,FutureTask 可以交给 Executor 执行,也可以由调用线程直接执行FutureTask.run()。

1、FutureTask 是基于 AbstractQueuedSynchronizer实现的

AbstractQueuedSynchronizer简称AQS,它是一个同步框架,它提供通用机制来原子性管理同步状态、阻塞和唤醒线程,以及 维护被阻塞线程的队列。
基于 AQS 实现的同步器包括: ReentrantLock、Semaphore、ReentrantReadWriteLock、 CountDownLatch 和 FutureTask。

基于 AQS实现的同步器包含两种操作:

  1. acquire,阻塞调用线程,直到AQS的状态允许这个线程继续执行,在FutureTask中,get()就是这个方法;
  2. release,改变AQS的状态,使state变为非阻塞状态,在FutureTask中,可以通过run()和cancel()实现。

2、FutureTask执行流程

在这里插入图片描述

  1. 执行@Async异步方法;
  2. 建立新线程async-executor-X,执行Runnable的run()方法,(FutureTask实现RunnableFuture,RunnableFuture实现Runnable);
  3. 判断状态state;
    • 如果未新建或者不处于AQS,直接返回;
    • 否则进入COMPLETING状态,执行异步线程代码;
  4. 如果执行cancel()方法改变AQS的状态时,会唤醒AQS等待队列中的第一个线程线程async-executor-1;
  5. 线程async-executor-1被唤醒后
    • 将自己从AQS队列中移除;
    • 然后唤醒next线程async-executor-2;
    • 改变线程async-executor-1的state;
    • 等待get()线程取值。
  6. next等待线程被唤醒后,循环线程async-executor-1的步骤
    • 被唤醒
    • 从AQS队列中移除
    • 唤醒next线程
    • 改变异步线程状态
  7. 新建线程async-executor-N,监听异步方法的state
    • 如果处于EXCEPTIONAL以上状态,抛出异常;
    • 如果处于COMPLETING状态,加入AQS队列等待;
    • 如果处于NORMAL状态,返回结果;

3、get()方法执行流程

get()方法通过判断状态state观测异步线程是否已结束,如果结束直接将结果返回,否则会将等待节点扔进等待队列自旋,阻塞住线程。

自旋直至异步线程执行完毕,获取另一边的线程计算出结果或取消后,将等待队列里的所有节点依次唤醒并移除队列。

在这里插入图片描述

  1. 如果state小于等于COMPLETING,表示任务还在执行中;
    • 如果线程被中断,从等待队列中移除等待节点WaitNode,抛出中断异常;
    • 如果state大于COMPLETING;
      • 如果已有等待节点WaitNode,将线程置空;
      • 返回当前状态;
    • 如果任务正在执行,让出时间片;
    • 如果还未构造等待节点,则new一个新的等待节点;
    • 如果未入队列,CAS尝试入队;
    • 如果有超时时间参数;
      • 计算超时时间;
      • 如果超时,则从等待队列中移除等待节点WaitNode,返回当前状态state;
      • 阻塞队列nanos毫秒。
    • 否则阻塞队列;
  2. 如果state大于COMPLETING;
    • 如果执行完毕,返回结果;
    • 如果大于等于取消状态,则抛出异常。

很多小朋友对读源码,嗤之以鼻,工作3年、5年,还是没认真读过任何源码,觉得读了也没啥用,或者读了也看不懂~

其实,只要把源码的执行流程通过画图的形式呈现出来,你就会幡然醒悟,原来是这样的~

简而言之:

1. 如果异步线程还没执行完,则进入CAS自旋;
2. 其它线程获取结果或取消后,重新唤醒CAS队列中等待的线程;
3. 再通过get()判断状态state;
4. 直至返回结果或(取消、超时、异常)为止。

三、FutureTask源码具体分析

1、FutureTask源码

通过定义整形状态值,判断state大小,这个思想很有意思,值得学习。

public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    void run();
}
public class FutureTask<V> implements RunnableFuture<V> {

	// 最初始的状态是new 新建状态
	private volatile int state;
    private static final int NEW          = 0; // 新建状态
    private static final int COMPLETING   = 1; // 完成中
    private static final int NORMAL       = 2; // 正常执行完
    private static final int EXCEPTIONAL  = 3; // 异常
    private static final int CANCELLED    = 4; // 取消
    private static final int INTERRUPTING = 5; // 正在中断
    private static final int INTERRUPTED  = 6; // 已中断

	public V get() throws InterruptedException, ExecutionException {
	    int s = state;
	    // 任务还在执行中
	    if (s <= COMPLETING)
	        s = awaitDone(false, 0L);
	    return report(s);
	}
	
	private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
        	// 线程被中断,从等待队列中移除等待节点WaitNode,抛出中断异常
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }

            int s = state;
            // 任务已执行完毕或取消
            if (s > COMPLETING) {
            	// 如果已有等待节点WaitNode,将线程置空
                if (q != null)
                    q.thread = null;
                return s;
            }
            // 任务正在执行,让出时间片
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
            // 还未构造等待节点,则new一个新的等待节点
            else if (q == null)
                q = new WaitNode();
            // 未入队列,CAS尝试入队
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            // 如果有超时时间参数
            else if (timed) {
            	// 计算超时时间
                nanos = deadline - System.nanoTime();
                // 如果超时,则从等待队列中移除等待节点WaitNode,返回当前状态state
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                // 阻塞队列nanos毫秒
                LockSupport.parkNanos(this, nanos);
            }
            else
            	// 阻塞队列
                LockSupport.park(this);
        }
    }
    
	private V report(int s) throws ExecutionException {
		// 获取outcome中记录的返回结果
        Object x = outcome;
        // 如果执行完毕,返回结果
        if (s == NORMAL)
            return (V)x;
            // 如果大于等于取消状态,则抛出异常
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }
}
2、将异步方法的返回值改为Future<Integer>,将返回值放到new AsyncResult<>();中;
@Async("async-executor")
public void readXls(String filePath, String filename) {
    try {
    	// 此代码为简化关键性代码
        List<Future<Integer>> futureList = new ArrayList<>();
        for (int time = 0; time < times; time++) {
            Future<Integer> sumFuture = readExcelDataAsyncFutureService.readXlsCacheAsync();
            futureList.add(sumFuture);
        }
    }catch (Exception e){
        logger.error("readXlsCacheAsync---插入数据异常:",e);
    }
}
@Async("async-executor")
public Future<Integer> readXlsCacheAsync() {
    try {
        // 此代码为简化关键性代码
        return new AsyncResult<>(sum);
    }catch (Exception e){
        return new AsyncResult<>(0);
    }
}

3、通过Future<Integer>.get()获取返回值:

public static boolean getFutureResult(List<Future<Integer>> futureList, int excelRow) throws Exception{
    int[] futureSumArr = new int[futureList.size()];
    for (int i = 0;i<futureList.size();i++) {
        try {
            Future<Integer> future = futureList.get(i);
            while (true) {
                if (future.isDone() && !future.isCancelled()) {
                    Integer futureSum = future.get();
                    logger.info("获取Future返回值成功"+"----Future:" + future
                            + ",Result:" + futureSum);
                    futureSumArr[i] += futureSum;
                    break;
                } else {
                    logger.info("Future正在执行---获取Future返回值中---等待3秒");
                    Thread.sleep(3000);
                }
            }
        } catch (Exception e) {
            logger.error("获取Future返回值异常: ", e);
        }
    }
    
    boolean insertFlag = getInsertSum(futureSumArr, excelRow);
    logger.info("获取所有异步线程Future的返回值成功,Excel插入结果="+insertFlag);
    return insertFlag;
}

4、这里也可以通过新线程+Future获取Future返回值

不过感觉多此一举了,就当练习Future异步取返回值了~

public static Future<Boolean> getFutureResultThreadFuture(List<Future<Integer>> futureList, int excelRow) throws Exception {
    ExecutorService service = Executors.newSingleThreadExecutor();
    final boolean[] insertFlag = {false};
    service.execute(new Runnable() {
        public void run() {
            try {
                insertFlag[0] = getFutureResult(futureList, excelRow);
            } catch (Exception e) {
                logger.error("新线程+Future获取Future返回值异常: ", e);
                insertFlag[0] = false;
            }
        }
    });
    service.shutdown();
    return new AsyncResult<>(insertFlag[0]);
}

获取异步线程结果后,我们可以通过添加事务的方式,实现Excel入库操作的数据一致性。

但Future会造成主线程的阻塞,这个就很不友好了,有没有更优解呢?


在BUG中磨砺,在优化中成长

使用双异步后,从 191s 优化到 2s

增加索引 + 异步 + 不落地后,从 12h 优化到 15 min

使用懒加载 + 零拷贝后,程序的秒开率提升至99.99%

性能优化2.0,新增缓存后,程序的秒开率不升反降


🏆文章收录于:100天精通Java从入门到就业

全网最细Java零基础手把手入门教程,系列课程包括:Java基础、Java8新特性、Java集合、高并发、性能优化等,适合零基础和进阶提升的同学。

🏆哪吒多年工作总结:Java学习路线总结,搬砖工逆袭Java架构师

华为OD机试 2023B卷题库疯狂收录中,刷题点这里

刷的越多,抽中的概率越大,每一题都有详细的答题思路、详细的代码注释、样例测试,发现新题目,随时更新,全天CSDN在线答疑。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1404456.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Databend 开源周报第 129 期

Databend 是一款现代云数仓。专为弹性和高效设计&#xff0c;为您的大规模分析需求保驾护航。自由且开源。即刻体验云服务&#xff1a;https://app.databend.cn 。 Whats On In Databend 探索 Databend 本周新进展&#xff0c;遇到更贴近你心意的 Databend 。 支持标准流 标…

关于图像分割项目的可视化脚本

1. 前言 之前实现了目标检测和图像分类任务的可视化脚本&#xff0c;本章将最后一个分割任务的可视化脚本实现 效果展示如下&#xff1a; 代码会在当前目录保存展示好的图片&#xff0c;从左到右依次为&#xff0c;原图、mask图、mask覆盖在原图的掩膜图 关于目标检测的可视化…

x-cmd pkg | hurl - HTTP 请求处理工具

目录 简介首次用户功能特点竞品和相关作品进一步探索 简介 Hurl 是 HTTP 请求处理工具&#xff0c;支持使用简单的纯文本格式定义的 HTTP 请求。它的用途非常广泛&#xff0c;既可以用于获取数据&#xff0c;也可以用于测试HTTP会话。 它可以链式处理请求&#xff0c;捕获数值…

在Go中处理HTTPS请求:一场加密的舞蹈

嘿&#xff0c;Go语言的爱好者们&#xff0c;你们准备好跳一场加密的舞蹈了吗&#xff1f;今天&#xff0c;我们要一起探讨如何在Go中处理那些神秘的HTTPS请求。 首先&#xff0c;我们要明白HTTPS是什么。简单来说&#xff0c;HTTPS就是给HTTP穿上了一层"加密的外套"…

如何根据openai官网的FileID下载文件

我的chatgpt网站&#xff0c;哈哈&#xff1a; https://chat.xutongbao.top/ file-type的版本需要注意&#xff1a; "file-type": "^15.0.0", const FileType require(file-type)const assistantsDownloadFileOnAzure async (req, res) > {let { apiK…

抖音出的AI工具火了!自动生成抖音文案,一键脚本数字人成片!

一些结论 抖音即创是一个一站式的智能创意生产与管理平台。 视频创作: AI视频脚本、数字人、一键成片 图文创作: 商品卡、图文工具 直播创作: AI背景、AI文案 抖音即创目前处于公测&#xff0c;全部功能免费使用&#xff01; 抖音即创是什么&#xff1f; “抖音即创”是一…

复杂高层建筑环境多模态导航服务和引导管理机器人系统设计(预告)

课题基础 机器人工程ROS方向应用型本科毕业设计重点课题学生验收成果 将上面这篇所涉及的算法等应用到如下环境中。 Gazebo新环境AWS RoboMaker Hospital医院场景适用于ROS1和ROS2 高层可以简化为多层测试。最典型的就是两层及以上。 简介 随着城市化进程的加速和高层建筑…

08-微服务Seata分布式事务使用

一、分布式事务简介 1.1 概念 事务ACID&#xff1a; A&#xff08;Atomic&#xff09;&#xff1a;原子性&#xff0c;构成事务的所有操作&#xff0c;要么都执行完成&#xff0c;要么全部不执行&#xff0c;不可能出现部分成功部分失 败的情况。 C&#xff08;Consistency&…

软件设计师——法律法规(四)

&#x1f4d1;前言 本文主要是【法律法规】——软件设计师——法律法规的文章&#xff0c;如果有什么需要改进的地方还请大佬指出⛺️ &#x1f3ac;作者简介&#xff1a;大家好&#xff0c;我是听风与他&#x1f947; ☁️博客首页&#xff1a;CSDN主页听风与他 &#x1f304…

encodeURI 和 encodeURIComponent

encodeURI 和 encodeURIComponent 是用来处理加密 decodeURI 和 decodeURIComponent 是用来处理解密 encodeURI 和encodeURIComponent 区别&#xff1a; 唯一区别就是编码的字符范围 encodeURI方法不会对下列字符编码 ASCII字母 数字 ~!#$&():/,;?’ encodeURIComponent方…

电脑录屏软件大比拼,哪个最适合你?

现如今&#xff0c;电脑录屏软件成为了许多用户记录、分享和教学的重要工具。从游戏玩家到专业制作人员&#xff0c;都需要高效的录屏软件。本文将介绍三款优秀的电脑录屏软件&#xff0c;通过详细的步骤和简洁的介绍&#xff0c;帮助用户轻松掌握这些工具的使用方法。 电脑录屏…

基于springboot+vue的台球管理系统

摘要 台球管理系统是一款基于Spring Boot和Vue.js技术栈构建的现代化系统&#xff0c;旨在提供全面而高效的台球场馆管理服务。该系统通过整合前后端技术&#xff0c;实现了场馆预约、会员管理、比赛统计等核心功能&#xff0c;为台球场馆管理员和玩家提供了便捷、智能的管理和…

在铸造铸铁平台时应用工艺有哪些——河北北重

铸造铸铁平台时&#xff0c;常用的工艺包括砂型铸造、金属型铸造和连铸工艺。 砂型铸造&#xff1a;砂型铸造是最常用的铸造工艺之一&#xff0c;适用于中小型铸铁平台的生产。该工艺使用砂模具&#xff0c;将铁水倒入模具中&#xff0c;待冷却后取出成型。砂型铸造工艺成本较低…

Flutter 滚动布局:sliver模型

一、滚动布局 Flutter中可滚动布局基本都来自Sliver模型&#xff0c;原理和安卓传统UI的ListView、RecyclerView类似&#xff0c;滚动布局里面的每个子组件的样式往往是相同的&#xff0c;由于组件占用内存较大&#xff0c;所以在内存上我们可以缓存有限个组件&#xff0c;滚动…

说说你对选择排序的理解?如何实现?应用场景?

一、是什么 选择排序&#xff08;Selection sort&#xff09;是一种简单直观的排序算法&#xff0c;无论什么数据进去都是 O(n) 的时间复杂度&#xff0c;所以用到它的时候&#xff0c;数据规模越小越好 其基本思想是&#xff1a;首先在未排序的数列中找到最小(or最大)元素&a…

工业计算机应用——AGV自动导引车行业

工业计算机在AGV行业的应用 自动导引车(AGV)是现代物流系统中的重要组成部分,能够在无人操作的情况下自动完成货物的搬运和运输。随着工业自动化的发展,工业计算机在AGV行业的应用越来越广泛,为AGV系统的智能化和高效化提供了有力支持。 一、工业计算机在AGV行业的应用场…

ICC2:如何优化网表中的assign语法

更多学习内容请关注「拾陆楼」知识星球 拾陆楼知识星球入口 问题来自星球提问: ICC2中有两种解决方法: 1) set_app_options -name opt.port.eliminate_verilog_assign -value true 工具优化时自己插buffer解决 2) change_name -hier -rule verilog 需要注意的是: 第一个opti…

Git学习笔记(第6章):GitHub操作(远程库操作)

目录 6.1 远程库操作 6.1.1 创建远程库 6.1.2 命名远程库 6.1.3 本地库推送到远程库(push) 6.1.4 远程库拉取到本地库(pull) 6.1.5 远程库克隆到本地库(clone) 6.2 团队内协作 6.3 跨团队协作 6.4 SSH免密登录 6.1 远程库操作 命令 作用 git remote -v 查看所有远程…

《吐血整理》进阶系列教程-拿捏Fiddler抓包教程(16)-Fiddler如何充当第三者,再识AutoResponder标签-上篇

1.简介 Fiddler充当第三者&#xff0c;主要是通过AutoResponder标签在客户端和服务端之间&#xff0c;Fiddler抓包&#xff0c;然后改包&#xff0c;最后发送。AutoResponder这个功能可以算的上是Fiddler最实用的功能&#xff0c;可以让我们修改服务器端返回的数据&#xff0c…

MySQL-SQL-DCL

DCL-介绍 DCL-管理用户 1、查询用户 2、创建用户 3、修改用户密码 4、删除用户 注意&#xff1a; DCL-权限控制 1、查询权限 2、授予权限 授予全部权限 3、撤销权限 注意&#xff1a;