SpringBoot + Disruptor 实现特快高并发处理

news2024/11/15 20:55:27

使用Disruptor做消息队列,解决内存队列的延迟问题(在性能测试中发现竟然与I/O操作处于同样的数量级)
【基于 Disruptor 开发的系统单线程能支撑每秒 600 万订单】

核心概念:

  • Ring Buffer 环形的缓冲区,从3.0版本开始,其职责被简化为仅仅负责对通过 Disruptor 进行交换的数据(事件)进行存储和更新。在一些更高级的应用场景中,Ring Buffer 可以由用户的自定义实现来完全替代。
  • Sequence Disruptor 通过顺序递增的序号来编号管理通过其进行交换的数据(事件),对数据(事件)的处理过程总是沿着序号逐个递增处理。一个 Sequence 用于跟踪标识某个特定的事件处理者( RingBuffer/Consumer )的处理进度。
  • Sequencer 是 Disruptor 的真正核心。此接口有两个实现类 SingleProducerSequencer、MultiProducerSequencer ,它们定义在生产者和消费者之间快速、正确地传递数据的并发算法。
  • 用于保持对RingBuffer的 main published Sequence 和Consumer依赖的其它Consumer的 Sequence 的引用。Sequence Barrier 还定义了决定 Consumer 是否还有可处理的事件的逻辑。
  • 定义 Consumer 如何进行等待下一个事件的策略。(注:Disruptor 定义了多种不同的策略,针对不同的场景,提供了不一样的性能表现)
  • 在 Disruptor 的语义中,生产者和消费者之间进行交换的数据被称为事件(Event)。它不是一个被 Disruptor 定义的特定类型,而是由 Disruptor 的使用者定义并指定。
  • EventProcessor 持有特定消费者(Consumer)的 Sequence,并提供用于调用事件处理实现的事件循环(Event Loop)。
  • EventHandler 定义的事件处理接口,由用户实现,用于处理事件,是 Consumer 的真正实现。
  • Producer即生产者,只是泛指调用 Disruptor 发布事件的用户代码,Disruptor 没有定义特定接口或类型。

在这里插入图片描述

一、依赖

<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>3.4.4</version>
</dependency>

二、消息体Model

/**
 * 消息体
 */
@Data
public class MessageModel {
    private String message;
}

三、构造EventFactory

public class HelloEventFactory implements EventFactory<MessageModel> {
    @Override
    public MessageModel newInstance() {
        return new MessageModel();
    }
}

四、构造EventHandler-消费者

@Slf4j
public class HelloEventHandler implements EventHandler<MessageModel> {
    @Override
    public void onEvent(MessageModel event, long sequence, boolean endOfBatch) {
        try {
            //这里停止1000ms是为了确定消费消息是异步的
            Thread.sleep(1000);
            log.info("消费者处理消息开始");
            if (event != null) {
                log.info("消费者消费的信息是:{}",event);
            }
        } catch (Exception e) {
            log.info("消费者处理消息失败");
        }
        log.info("消费者处理消息结束");
    }
}

五、构造BeanManager

/**
 * 获取实例化对象
 */
@Component
public class BeanManager implements ApplicationContextAware {

    private static ApplicationContext applicationContext = null;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    public static ApplicationContext getApplicationContext() { return applicationContext; }

    public static Object getBean(String name) {
        return applicationContext.getBean(name);
    }

    public static <T> T getBean(Class<T> clazz) {
        return applicationContext.getBean(clazz);
    }
}

六、构造MQManager

@Configuration
public class MQManager {

    @Bean("messageModel")
    public RingBuffer<MessageModel> messageModelRingBuffer() {
        //定义用于事件处理的线程池, Disruptor通过java.util.concurrent.ExecutorSerivce提供的线程来触发consumer的事件处理
        ExecutorService executor = Executors.newFixedThreadPool(2);

        //指定事件工厂
        HelloEventFactory factory = new HelloEventFactory();

        //指定ringbuffer字节大小,必须为2的N次方(能将求模运算转为位运算提高效率),否则将影响效率
        int bufferSize = 1024 * 256;

        //单线程模式,获取额外的性能
        Disruptor<MessageModel> disruptor = new Disruptor<>(factory, bufferSize, executor,
                ProducerType.SINGLE, new BlockingWaitStrategy());

        //设置事件业务处理器---消费者
        disruptor.handleEventsWith(new HelloEventHandler());

        // 启动disruptor线程
        disruptor.start();

        //获取ringbuffer环,用于接取生产者生产的事件
        RingBuffer<MessageModel> ringBuffer = disruptor.getRingBuffer();

        return ringBuffer;
    }
}

七、构造Mqservice和实现类-生产者

public interface DisruptorMqService {

    /**
     * 消息
     * @param message
     */
    void sayHelloMq(String message);
}
@Slf4j
@Component
@Service
public class DisruptorMqServiceImpl implements DisruptorMqService {

    @Autowired
    private RingBuffer<MessageModel> messageModelRingBuffer;


    @Override
    public void sayHelloMq(String message) {
        log.info("record the message: {}",message);
        //获取下一个Event槽的下标
        long sequence = messageModelRingBuffer.next();
        try {
            //给Event填充数据
            MessageModel event = messageModelRingBuffer.get(sequence);
            event.setMessage(message);
            log.info("往消息队列中添加消息:{}", event);
        } catch (Exception e) {
            log.error("failed to add event to messageModelRingBuffer for : e = {},{}",e,e.getMessage());
        } finally {
            //发布Event,激活观察者去消费,将sequence传递给改消费者
            //注意最后的publish方法必须放在finally中以确保必须得到调用;如果某个请求的sequence未被提交将会堵塞后续的发布操作或者其他的producer
            messageModelRingBuffer.publish(sequence);
        }
    }
}

八、构造测试类及方法

@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest(classes = DemoApplication.class)
public class DemoApplicationTests {

    @Autowired
    private DisruptorMqService disruptorMqService;
    /**
     * 项目内部使用Disruptor做消息队列
     * @throws Exception
     */
    @Test
    public void sayHelloMqTest() throws Exception{
        disruptorMqService.sayHelloMq("消息到了,Hello world!");
        log.info("消息队列已发送完毕");
        //这里停止2000ms是为了确定是处理消息是异步的
        Thread.sleep(2000);
    }
}
2020-04-05 14:31:18.543  INFO 7274 --- [           main] c.e.u.d.d.s.Impl.DisruptorMqServiceImpl  : record the message: 消息到了,Hello world!
2020-04-05 14:31:18.545  INFO 7274 --- [           main] c.e.u.d.d.s.Impl.DisruptorMqServiceImpl  : 往消息队列中添加消息:MessageModel(message=消息到了,Hello world!)
2020-04-05 14:31:18.545  INFO 7274 --- [           main] c.e.utils.demo.DemoApplicationTests      : 消息队列已发送完毕
2020-04-05 14:31:19.547  INFO 7274 --- [pool-1-thread-1] c.e.u.d.disrupMq.mq.HelloEventHandler    : 消费者处理消息开始
2020-04-05 14:31:19.547  INFO 7274 --- [pool-1-thread-1] c.e.u.d.disrupMq.mq.HelloEventHandler    : 消费者消费的信息是:MessageModel(message=消息到了,Hello world!)
2020-04-05 14:31:19.547  INFO 7274 --- [pool-1-thread-1] c.e.u.d.disrupMq.mq.HelloEventHandler    : 消费者处理消息结束

其实 生成者 -> 消费者 模式是很常见的,通过一些消息队列也可以轻松做到上述的效果。不同的地方在于,Disruptor 是在内存中以队列的方式去实现的,而且是无锁的。这也是 Disruptor 为什么高效的原因。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1489752.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

SQL 查询一张卡的最新使用记录

SQL 查询一张卡的最新使用记录 1. 问题描述 1. 问题描述 一张卡&#xff0c;有一个底表记录这个卡的基本信息&#xff0c;还有一个使用卡的记录表&#xff0c;记录着&#xff0c;这张卡的使用记录&#xff0c;但我们要获取这张卡的最新使用记录&#xff0c;该如何写SQL呢&…

【Linux命令】fuser

fuser 使用文件或文件结构识别进程。 详细 fuser命令用于报告进程使用的文件和网络套接字。fuser命令列出了本地进程的进程号&#xff0c;哪些本地进程使用file&#xff0c;参数指定的本地或远程文件。 每个进程号后面都跟随一个字母&#xff0c;该字母指示进程如何使用该文…

Python实现CCI工具判断信号:股票技术分析的工具系列(5)

Python实现CCI工具判断信号&#xff1a;股票技术分析的工具系列&#xff08;5&#xff09; 介绍算法解释 代码rolling函数介绍完整代码data代码CCI.py 介绍 在股票技术分析中&#xff0c;CCI (商品路径指标&#xff09;是一种常用的技术指标&#xff0c;用于衡量股价是否处于超…

MATLAB知识点:使用for循环时需要注意的事项

​讲解视频&#xff1a;可以在bilibili搜索《MATLAB教程新手入门篇——数学建模清风主讲》。​ MATLAB教程新手入门篇&#xff08;数学建模清风主讲&#xff0c;适合零基础同学观看&#xff09;_哔哩哔哩_bilibili 节选自​第4章&#xff1a;MATLAB程序流程控制 在使用for循环…

HarmonyOS—HAP唯一性校验逻辑

HAP是应用安装的基本单位&#xff0c;在DevEco Studio工程目录中&#xff0c;一个HAP对应一个Module。应用打包时&#xff0c;每个Module生成一个.hap文件。 应用如果包含多个Module&#xff0c;在应用市场上架时&#xff0c;会将多个.hap文件打包成一个.app文件&#xff08;称…

第 125 场 LeetCode 双周赛题解

A 超过阈值的最少操作数 I 排序然后查找第一个大于等于 k 的元素所在的位置 class Solution { public:int minOperations(vector<int> &nums, int k) {sort(nums.begin(), nums.end());return lower_bound(nums.begin(), nums.end(), k) - nums.begin();} };B 超过阈…

数据结构(一)综述

一、常见的数据结构 数据结构优点缺点数组查找快增删慢链表增删快查找慢哈希表增删、查找都快数据散列&#xff0c;对存储空间有浪费栈顶部元素插入和取出快除顶部元素外&#xff0c;存取其他元素都很慢队列顶部元素取出和尾部元素插入快存取其他元素都很慢二叉树增删、查找都快…

自学高效备考2025年AMC8数学竞赛:2000-2024年AMC8真题解析

今天继续来随机看五道AMC8的真题和解析&#xff0c;根据实践经验&#xff0c;对于想了解或者加AMC8美国数学竞赛的孩子来说&#xff0c;吃透AMC8历年真题是备考最科学、最有效的方法之一。即使不参加AMC8竞赛&#xff0c;吃透了历年真题600道和背后的知识体系&#xff0c;那么小…

深入理解Tomcat

目录&#xff1a; TomcatTomcat简介如何下载tomcatTomcat工作原理Tomcat架构图Tomcat组件Server组件Service组件Connector组件Engine组件Host组件Context组件 配置虚拟主机(Host)配置Context Tomcat Tomcat简介 Tomcat服务器是Apache的一个开源免费的Web容器。它实现了JavaEE…

计算机网络-物理层-传输媒体

传输媒体的分类 导向型-同轴电缆 导向型-双绞线 导向型-光纤 非导向型

卡密交易系统 卡密社区SUP系统源码 分销系统平台 分销商城系统开发

卡密社区SUP系统总控源码主站分销系统功能源码 跟以前的卡盟那种控制端差不多总控可以给别人开通&#xff0c;分销&#xff0c;主站&#xff0c;类似自己做系统商一样&#xff0c;自助发卡&#xff0c;卡密交易系统。 搭建环境Nginx1.22 mysql 5.7 php8.1 rids 7.2 安装方法…

避坑——Matlab c# 联合编程——Native

相同的库&#xff0c;Matlab生成供.net调用的库时会有两套&#xff0c;也就是Native&#xff08;本地&#xff09;&#xff0c;两套库各有优缺点&#xff0c;这这里就不说了&#xff0c;可以翻看网上其他博文 主要是MWStructArray&#xff0c;MWArray等数据交换对象有两套&…

C语言:qsort的使用方法

目录 1. qsort是什么&#xff1f; 2. 为什么要使用qsort 3. qsort的使用 3.1 qsort的返回值和参数 3.2 qsort的compare函数参数 3.3 int类型数组的qsort完整代码 4. qsort完整代码 1. qsort是什么&#xff1f; qsort中的q在英语中是quick&#xff0c;快速的意思了&#…

Platformview在iOS与Android上的实现方式对比

Android中早期版本Platformview的实现基于Virtual Display。VirtualDisplay方案的原理是&#xff0c;先将Native View绘制到虚显&#xff0c;然后Flutter通过从虚显输出中获取纹理并将其与自己内部的widget树进行合成&#xff0c;最后作为Flutter在 Android 上更大的纹理输出的…

【经验】f-string 的一些点

【经验】f-string 的一些点 省几个字别数错了对齐它现在几点 省几个字 让 f-string 给你写表达式&#xff0c;在 f-string 中使用 来自动打印表达式 a 10 b 25 print(f"{a b }") >>> a b 35别数错了 对于过大的数字难以一眼看出来它的数量级&#xf…

Access AR Foundation 5.1 in Unity 2022

如果已经下载安装了ARF但版本是5.0.7 可以通过下面的方式修改 修改后面的数字会自动更新 更新完成后查看版本 官方文档 Access AR Foundation 5.1 in Unity 2021 | AR Foundation | 5.1.2

93. 通用防重幂等设计

文章目录 一、防重与幂等的区别二、幂等性的应用场景三、幂等性与防重关系四、处理流程 一、防重与幂等的区别 防重与幂等是在 Web 应用程序和分布式系统中重要而又非常常见的问题。 防重 防重是指在多次提交同样的请求过程中&#xff0c;系统会检测和消除重复的数据&#xf…

网络安全: Kali Linux 使用 docker-compose 部署 openvas

目录 一、实验 1.环境 2.Kali Linux 安装docker与docker-compose 3.Kali Linux 使用docker-compose方式部署 openvas 4. KaliLinux 使用openvas 二、问题 1. 信息安全漏洞库 2.信息安全漏洞共享平台 3.Windows 更新指南与查询 4.CVE 查询 5.docker-compose 如何修改o…

适配Ollama的前端界面Open WebUI

在前文 本地大模型运行框架Ollama 中&#xff0c;老苏留了个尾巴&#xff0c;限于篇幅只是提了一下 Open WebUI&#xff0c;有网友留言说自己安装没搞定&#xff0c;今天我们来补上 文章传送门&#xff1a;本地大模型运行框架Ollama 什么是 Open WebUI &#xff1f; Open WebUI…

深度学习_19_卷积

理论&#xff1a; 目前问题在于识别图片所需要的模型权重数量会比较大 一般图片像素在12M也就是一千两百万像素&#xff0c;要用模型对其整体识别的话&#xff0c;需要至少一千两百万权重&#xff0c;那也仅仅是线性模型&#xff0c;若用多层感知机的话&#xff0c;模型的数据…