SpringBoot + Disruptor 实现特快高并发处理,支撑每秒 600 万订单无压力!

news2025/1/16 4:42:28

背景

工作中遇到项目使用Disruptor做消息队列,对你没看错,不是Kafka也不是rabbitmq。Disruptor有个最大的优点就是快,还有一点它是开源的哦,下面做个简单的记录。

Disruptor介绍

Disruptor 是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题(在性能测试中发现竟然与I/O操作处于同样的数量级)。

基于 Disruptor 开发的系统单线程能支撑每秒 600 万订单,2010 年在 QCon 演讲后,获得了业界关注。

Disruptor是一个开源的Java框架,它被设计用于在生产者—消费者(producer-consumer problem,简称PCP)问题上获得尽量高的吞吐量(TPS)和尽量低的延迟。

从功能上来看,Disruptor 是实现了“队列”的功能,而且是一个有界队列。那么它的应用场景自然就是“生产者-消费者”模型的应用场合了。

Disruptor是LMAX在线交易平台的关键组成部分,LMAX平台使用该框架对订单处理速度能达到600万TPS,除金融领域之外,其他一般的应用中都可以用到Disruptor,它可以带来显著的性能提升。

其实Disruptor与其说是一个框架,不如说是一种设计思路,这个设计思路对于存在“并发、缓冲区、生产者—消费者模型、事务处理”这些元素的程序来说,Disruptor提出了一种大幅提升性能(TPS)的方案。

Disruptor的github主页:https://github.com/LMAX-Exchange/disruptor

Disruptor 的核心概念

先从了解 Disruptor 的核心概念开始,来了解它是如何运作的。下面介绍的概念模型,既是领域对象,也是映射到代码实现上的核心对象。
1. Ring Buffer
如其名,环形的缓冲区。曾经 RingBuffer 是 Disruptor 中的最主要的对象,但从3.0版本开始,其职责被简化为仅仅负责对通过 Disruptor 进行交换的数据(事件)进行存储和更新。在一些更高级的应用场景中,Ring Buffer 可以由用户的自定义实现来完全替代。
2. Sequence Disruptor
通过顺序递增的序号来编号管理通过其进行交换的数据(事件),对数据(事件)的处理过程总是沿着序号逐个递增处理。一个 Sequence 用于跟踪标识某个特定的事件处理者( RingBuffer/Consumer )的处理进度。

虽然一个 AtomicLong 也可以用于标识进度,但定义 Sequence 来负责该问题还有另一个目的,那就是防止不同的 Sequence 之间的CPU缓存伪共享(Flase Sharing)问题。

注:这是 Disruptor 实现高性能的关键点之一,网上关于伪共享问题的介绍已经汗牛充栋,在此不再赘述。
3. Sequencer

Sequencer 是 Disruptor 的真正核心。此接口有两个实现类 SingleProducerSequencer、MultiProducerSequencer ,它们定义在生产者和消费者之间快速、正确地传递数据的并发算法。
4. Sequence Barrier
用于保持对RingBuffer的 main published Sequence 和Consumer依赖的其它Consumer的 Sequence 的引用。 Sequence Barrier 还定义了决定 Consumer 是否还有可处理的事件的逻辑。
5. Wait Strategy
定义 Consumer 如何进行等待下一个事件的策略。 (注:Disruptor 定义了多种不同的策略,针对不同的场景,提供了不一样的性能表现)
6. Event
在 Disruptor 的语义中,生产者和消费者之间进行交换的数据被称为事件(Event)。它不是一个被 Disruptor 定义的特定类型,而是由 Disruptor 的使用者定义并指定。
7. EventProcessor
EventProcessor 持有特定消费者(Consumer)的 Sequence,并提供用于调用事件处理实现的事件循环(Event Loop)。
8. EventHandler
Disruptor 定义的事件处理接口,由用户实现,用于处理事件,是 Consumer 的真正实现。
9. Producer
即生产者,只是泛指调用 Disruptor 发布事件的用户代码,Disruptor 没有定义特定接口或类型。
在这里插入图片描述

案例-demo

通过下面8个步骤,你就能将Disruptor Get回家啦:

1、添加pom.xml依赖

<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>3.4.4</version>
</dependency>

2、消息体Model


/**
 * 消息体
 */
@Data
public class MessageModel {
    private String message;
}

3、构造EventFactory

public class HelloEventFactory implements EventFactory<MessageModel> {
    @Override
    public MessageModel newInstance() {
        return new MessageModel();
    }
}

4、构造EventHandler-消费者

@Slf4j
public class HelloEventHandler implements EventHandler<MessageModel> {
    @Override
    public void onEvent(MessageModel event, long sequence, boolean endOfBatch) {
        try {
            //这里停止1000ms是为了确定消费消息是异步的
            Thread.sleep(1000);
            log.info("消费者处理消息开始");
            if (event != null) {
                log.info("消费者消费的信息是:{}",event);
            }
        } catch (Exception e) {
            log.info("消费者处理消息失败");
        }
        log.info("消费者处理消息结束");
    }
}

5、构造BeanManager

/**
 * 获取实例化对象
 */
@Component
public class BeanManager implements ApplicationContextAware {

    private static ApplicationContext applicationContext = null;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    public static ApplicationContext getApplicationContext() { return applicationContext; }

    public static Object getBean(String name) {
        return applicationContext.getBean(name);
    }

    public static <T> T getBean(Class<T> clazz) {
        return applicationContext.getBean(clazz);
    }
}

6、构造MQManager

@Configuration
public class MQManager {

    @Bean("messageModel")
    public RingBuffer<MessageModel> messageModelRingBuffer() {
        //定义用于事件处理的线程池, Disruptor通过java.util.concurrent.ExecutorSerivce提供的线程来触发consumer的事件处理
        ExecutorService executor = Executors.newFixedThreadPool(2);

        //指定事件工厂
        HelloEventFactory factory = new HelloEventFactory();

        //指定ringbuffer字节大小,必须为2的N次方(能将求模运算转为位运算提高效率),否则将影响效率
        int bufferSize = 1024 * 256;

        //单线程模式,获取额外的性能
        Disruptor<MessageModel> disruptor = new Disruptor<>(factory, bufferSize, executor,
                ProducerType.SINGLE, new BlockingWaitStrategy());

        //设置事件业务处理器---消费者
        disruptor.handleEventsWith(new HelloEventHandler());

        // 启动disruptor线程
        disruptor.start();

        //获取ringbuffer环,用于接取生产者生产的事件
        RingBuffer<MessageModel> ringBuffer = disruptor.getRingBuffer();

        return ringBuffer;
    }
}

7、构造Mqservice和实现类-生产者

public interface DisruptorMqService {

    /**
     * 消息
     * @param message
     */
    void sayHelloMq(String message);
}

@Slf4j
@Component
@Service
public class DisruptorMqServiceImpl implements DisruptorMqService {

    @Autowired
    private RingBuffer<MessageModel> messageModelRingBuffer;


    @Override
    public void sayHelloMq(String message) {
        log.info("record the message: {}",message);
        //获取下一个Event槽的下标
        long sequence = messageModelRingBuffer.next();
        try {
            //给Event填充数据
            MessageModel event = messageModelRingBuffer.get(sequence);
            event.setMessage(message);
            log.info("往消息队列中添加消息:{}", event);
        } catch (Exception e) {
            log.error("failed to add event to messageModelRingBuffer for : e = {},{}",e,e.getMessage());
        } finally {
            //发布Event,激活观察者去消费,将sequence传递给改消费者
            //注意最后的publish方法必须放在finally中以确保必须得到调用;如果某个请求的sequence未被提交将会堵塞后续的发布操作或者其他的producer
            messageModelRingBuffer.publish(sequence);
        }
    }
}

8、构造测试类及方法


@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest(classes = DemoApplication.class)
public class DemoApplicationTests {

    @Autowired
    private DisruptorMqService disruptorMqService;
    /**
     * 项目内部使用Disruptor做消息队列
     * @throws Exception
     */
    @Test
    public void sayHelloMqTest() throws Exception{
        disruptorMqService.sayHelloMq("消息到了,Hello world!");
        log.info("消息队列已发送完毕");
        //这里停止2000ms是为了确定是处理消息是异步的
        Thread.sleep(2000);
    }
}

测试运行结果
5总结
其实 生成者 -> 消费者 模式是很常见的,通过一些消息队列也可以轻松做到上述的效果。不同的地方在于,Disruptor 是在内存中以队列的方式去实现的,而且是无锁的。这也是 Disruptor 为什么高效的原因。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1104641.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

中国式复杂报表制作工具及技巧,解决90%效率问题

最大的数据杀手——中国式复杂报表 作为资料人&#xff0c;在日常生活和工作之中&#xff0c;我们是否经常被要求制作如下图所示的一些数据统计表格&#xff1a; 总的来看&#xff0c;很多人经常发现自己虽然有数据&#xff0c;却不知道用什么图表来进行数据最佳形式的价值表…

「我在淘天做技术」一篇文章告诉你商品团队在做哪些有意思的事?

作者:许令波(君山) 近期淘天集团秋季 2024 届校园招聘正式启动&#xff0c;预计将发放 2000 多个 offer&#xff0c;其中技术类岗位占比超过 50%。为了方便大家更真实地了解淘天技术的布局和现状&#xff0c;我们策划了「我在淘天做技术」系列&#xff0c;首次全面分享淘天技术…

【限时优惠】RHCE9.0培训考证-红帽官方授权中心

【微|信|公|众|号&#xff1a;厦门微思网络】 官网&#xff1a; www.xmws.cn 相信关注红帽认证的小伙伴都已经知道了&#xff1a;2022 年 5 月 18 日&#xff0c;红帽公司宣布推出红帽企业 Linux 9 (RHCE 9)&#xff0c;这是世界领先的企业 Linux 平台的最新版本。 特别提醒的是…

怎样成功部署CRM销售管理系统?

部署CRM销售管理系统可以是自上而下的落实&#xff0c;也可以自下而上让基层员工提出他们的建议&#xff0c;毕竟他们才是系统的使用者。成功部署CRM销售管理系统离不开以下几点要素&#xff1a; 1、全渠道沟通 在通讯技术发达的今天&#xff0c;人们可以在任何地方进行视频通…

Unity3D Shader新手入门教程:3D溶解与腐蚀特效详解

引言 在游戏开发中&#xff0c;特效是非常重要的一部分&#xff0c;它能够增加游戏的趣味性和可玩性。其中&#xff0c;Shader特效是一种非常常见和常用的特效&#xff0c;它能够通过改变物体表面的渲染方式来实现各种各样的特效效果。本文将详细介绍Unity3D中的Shader 3D溶解与…

华为云应用中间件DCS系列—Redis实现(电商网站)秒杀抢购示例

云服务、API、SDK&#xff0c;调试&#xff0c;查看&#xff0c;我都行 阅读短文您可以学习到&#xff1a;应用中间件系列之Redis实现&#xff08;电商网站&#xff09;秒杀抢购示例 1 什么是DEVKIT 华为云开发者插件&#xff08;Huawei Cloud Toolkit&#xff09;&…

游戏设计模式专栏(十二):在Cocos游戏开发中运用代理模式

点击上方亿元程序员关注和★星标 引言 大家好&#xff0c;我是亿元程序员&#xff0c;一位有着8年游戏行业经验的主程。 本系列是《和8年游戏主程一起学习设计模式》&#xff0c;让糟糕的代码在潜移默化中升华&#xff0c;欢迎大家关注分享收藏订阅。 代理模式&#xff08…

【Tomcat】为Tomcat服务配置本地Apr库以提升性能

关于 apr 和 apr-util 对 Tomcat 服务的性能提升的说明&#xff1a; 要测APR给tomcat带来的好处最好的方法是在慢速网络上&#xff08;模拟Internet&#xff09;&#xff0c;将Tomcat线程数开到300以上的水平&#xff0c;然后模拟一大堆并发请求。如果不配APR&#xff0c;基本…

el-pagination怎么修改样式,分页修改样式

/* 分页距离右边20&#xff0c;距离底边20 */ .pagination-container .el-pagination{position:absolute;right:20px;bottom:20px;} 自己写一个分页组件&#xff0c;用到绝对定位和相对定位

Cornerstone for Mac:高效SVN管理的黄金标准

在当今的软件开发领域&#xff0c;版本控制系统是不可或缺的一部分。其中&#xff0c;Subversion&#xff08;SVN&#xff09;是一个广泛使用的版本控制系统&#xff0c;有助于团队协同工作&#xff0c;实现代码的版本管理和追踪。对于Mac用户来说&#xff0c;Cornerstone是一款…

PLC寄存器基础知识

这篇博客介绍的内容其实是微机原理的相关知识&#xff0c;如果没有修过微机原理&#xff0c;可以找相关书籍看一看&#xff0c;众所知周PLC也是属于微控制器。下面我们看下西门子PLC常用的寄存器地址关系。 1、西门子寄存器地址关系 待续......

togaf入门介绍

TOGAF标准是一个开放的、行业共识的企业架构框架。 它是一个基础框架&#xff0c;这意味着它适用于任何环境下的任何类型的架构的开发。这一基础框架是由The Open Group TOGAF补充的库&#xff0c;该库是一个广泛和不断增长的指导材料组合&#xff0c;为在具体情况下应用TOGAF …

SAP S4 BAPI更新BP税号类型CN0自动覆盖CN5

BAPI更新BP税号类型CN0自动覆盖CN5 使用cl_md_bp_maintain>maintain更新BP税号CN0的数据&#xff0c;更新结果都会变成CN5类型&#xff0c;CN1类型一切正常。 1、BP税号 2、跟踪方法中代码 查看底层逻辑&#xff0c;发现CN0都被强制替换成CN5了&#xff0c;BP GUI界面还能…

MultiPlayerShoot----C++学习记录01打包测试项目

首先将多人游戏插件plug文件夹移至项目目录 打开config/DefaultEngine.ini&#xff08;5.0的虚幻引擎内容略不一样&#xff09;和Game.ini对里边的内容进行编辑。 DefaultEngine.ini [/Script/Engine.GameEngine] NetDriverDefinitions(DefName"GameNetDriver",Driv…

win 10怎么录屏?教你轻松捕捉屏幕活动

在当今科技快速发展的时代&#xff0c;录屏已成为信息分享、教学、游戏直播等方面的重要工具。无论是为了制作教程、分享游戏过程还是保存重要信息&#xff0c;录屏功能都发挥着举足轻重的作用。可是很多人不知道win 10怎么录屏&#xff0c;本文将详细介绍win10的三种常用录屏方…

物流监管:智慧仓储数据可视化监控平台

随着市场竞争加剧和市场需求的不断提高&#xff0c;企业亟需更加高效、智能且可靠的仓储物流管理方式&#xff0c;以提升企业的物流效率&#xff0c;减少其输出成本&#xff0c;有效应对市场上的变化和挑战。 图扑自研 HT for Web 产品搭建的 2D 智慧仓储可视化平台&#xff0c…

操作系统进程2---进程成员以及fork

在上一次我们认识了什么是进程以及进程在操作系统中是如何被管理的。今天我们来认识一下pcb中的成员Linux中我们可以使用ps命令中的ajx选项来输出当前系统中所有的进程 而我们就先从pid和ppid来入手。 文章目录 1.进程中的pid和ppid2.父进程与子进程的简单认识3.系统调用函数…

【2023集创赛】信诺达杯三等奖:关于LM386N-1音频功率放大器性能的测量指南

本文为2023年第七届全国大学生集成电路创新创业大赛&#xff08;“集创赛”&#xff09;信诺达杯全国三等奖作品分享&#xff0c;参加极术社区的【有奖征集】分享你的2023集创赛作品&#xff0c;秀出作品风采&#xff0c;分享2023集创赛作品扩大影响力&#xff0c;更有丰富电子…

牛客:FZ35 滑动窗口最小值

FZ35 滑动窗口最小值 文章目录 FZ35 滑动窗口最小值题目描述题解思路题解代码 题目描述 题解思路 遍历数组&#xff0c;然后遍历窗口找到最小值&#xff0c;加入到结果集里面 题解代码 func minSlidingWindow( nums []int , k int ) []int {// write code heren : len(nums…

基于内存的分布式NoSQL数据库Redis(五)数据存储与RDB设计

文章目录 知识点18&#xff1a;数据存储设计知识点19&#xff1a;Redis持久化&#xff1a;RDB设计知识点20&#xff1a;Redis持久化&#xff1a;RDB测试后记 知识点18&#xff1a;数据存储设计 目标&#xff1a;掌握常见数据存储的设计 实施 问题 数据存储如何保证数据安全&am…