Spring Boot使用Disruptor做内部高性能消息队列

news2025/1/16 8:01:33

 ​

博客主页:     南来_北往

系列专栏:Spring Boot实战


背景

在现代应用开发中,特别是在构建高并发、低延迟的系统时,内部高性能消息队列的作用变得尤为重要。内部高性能消息队列,如Disruptor,为应用提供了一种高效、可靠的数据处理机制,以支持快速水平扩容,保证一致性和可用性,同时优化性能。以下是使用内部高性能消息队列的具体原因:

  1. 解耦和异步处理
    • 消息队列使得生产者和消费者之间的操作解耦,增强系统的可扩展性。
    • 通过将任务发送到消息队列,可以让生产者继续其他操作,而不需等待消费者完成处理,提高了系统的响应速度。
  2. 提高系统吞吐量和性能
    • 内部高性能消息队列通过优化数据结构和算法,减少锁的竞争,提高系统的吞吐量。
    • 例如,Disruptor使用环形缓冲区(RingBuffer)和多线程技术,有效提升消息处理速度。
  3. 保证消息的可靠性和一致性
    • 消息队列通过ACK机制、幂等性设计等方式确保消息能可靠地传输和处理。
    • 在分布式环境下,通过主从同步或异步复制等技术保证数据一致性。
  4. 错峰流控与广播
    • 消息队列可以作为缓冲区,平衡上下游系统的处理能力差异,避免高峰时刻的负载冲击。
    • 支持发布订阅模式,使得多个服务可以同时消费同一消息,实现灵活的消息分发。
  5. 容灾与高可用
    • 通过节点动态增删和消息持久化,消息队列能够提供容灾能力,增强系统的可用性。
    • 在单点故障或系统宕机的情况下,仍能保证消息不丢失,系统恢复后可继续处理。
  6. 灵活的处理策略
    • 支持FIFO、优先级等消息处理策略,满足不同业务需求。
    • 可通过调整配置实现不同的消息投递保证,如至少投递一次或仅投递一次。
  7. 系统的监控和管理
    • 消息队列提供了监控和管理的工具,帮助开发者追踪系统的运行状态,及时发现并处理问题。

总之,内部高性能消息队列在现代系统设计中发挥着关键作用。通过解耦、加速数据处理、提高系统可靠性和灵活性,它不仅提升了系统的整体性能,也简化了系统的设计和维护。对于需要处理大量数据、要求高性能和高可用性的应用场景,使用内部高性能消息队列是一个理想的选择。

Disruptor介绍

Disruptor是一个高性能、低延迟的消息传递框架,由英国外汇交易公司LMAX开发,旨在解决内存队列的延迟问题,实现高吞吐量和低延迟的数据交换

Disruptor之所以在性能上表现突出,主要得益于其独特的设计哲学和底层实现。传统的并发队列(如BlockingQueue)虽然简单易用,但在处理大量并发数据时,性能往往不尽如人意。Disruptor则通过一系列创新的技术手段,极大地提高了并发处理的效率。例如,它使用一种称为“Ring Buffer”的环形数据结构来高效地在生产者和消费者之间传递数据,避免了动态内存分配的性能损耗。同时,Disruptor采用了无锁设计和预分配数据的策略,减少了线程阻塞和数据传递的开销。

Disruptor的github主页:https://github.com/LMAX-Exchange/disruptor

Disruptor 的核心概念 

先从了解 Disruptor 的核心概念开始,来了解它是如何运作的。下面介绍的概念模型,既是领域对象,也是映射到代码实现上的核心对象。 

1. Ring Buffer 

如其名,环形的缓冲区。曾经 RingBuffer 是 Disruptor 中的最主要的对象,但从3.0版本开始,其职责被简化为仅仅负责对通过 Disruptor 进行交换的数据(事件)进行存储和更新。在一些更高级的应用场景中,Ring Buffer 可以由用户的自定义实现来完全替代。 

2. Sequence Disruptor 

通过顺序递增的序号来编号管理通过其进行交换的数据(事件),对数据(事件)的处理过程总是沿着序号逐个递增处理。一个 Sequence 用于跟踪标识某个特定的事件处理者( RingBuffer/Consumer )的处理进度。虽然一个 AtomicLong 也可以用于标识进度,但定义 Sequence 来负责该问题还有另一个目的,那就是防止不同的 Sequence 之间的CPU缓存伪共享(Flase Sharing)问题。

(注:这是 Disruptor 实现高性能的关键点之一,网上关于伪共享问题的介绍已经汗牛充栋,在此不再赘述)。

3. Sequencer

Sequencer 是 Disruptor 的真正核心。此接口有两个实现类 SingleProducerSequencer、MultiProducerSequencer ,它们定义在生产者和消费者之间快速、正确地传递数据的并发算法。

4. Sequence Barrier

用于保持对RingBuffer的 main published Sequence 和Consumer依赖的其它Consumer的 Sequence 的引用。Sequence Barrier 还定义了决定 Consumer 是否还有可处理的事件的逻辑。

5. Wait Strategy

定义 Consumer 如何进行等待下一个事件的策略。(注:Disruptor 定义了多种不同的策略,针对不同的场景,提供了不一样的性能表现)

6. Event

在 Disruptor 的语义中,生产者和消费者之间进行交换的数据被称为事件(Event)。它不是一个被 Disruptor 定义的特定类型,而是由 Disruptor 的使用者定义并指定。

7. EventProcessor

EventProcessor 持有特定消费者(Consumer)的 Sequence,并提供用于调用事件处理实现的事件循环(Event Loop)。

8. EventHandler

Disruptor 定义的事件处理接口,由用户实现,用于处理事件,是 Consumer 的真正实现。

9. Producer

即生产者,只是泛指调用 Disruptor 发布事件的用户代码,Disruptor 没有定义特定接口或类型。

案例-demo

要在Spring Boot中使用Disruptor作为内部高性能消息队列,你需要按照以下步骤操作: 

在你的pom.xml文件中添加Disruptor的依赖: 

<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>3.4.2</version>
</dependency>

 创建一个事件类,用于在Disruptor中传递数据。例如,创建一个名为MyEvent的事件类:

public class MyEvent {
    private String message;

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }
}

 创建一个事件工厂,用于生成事件对象。例如,创建一个名为MyEventFactory的事件工厂:

import com.lmax.disruptor.EventFactory;

public class MyEventFactory implements EventFactory<MyEvent> {
    @Override
    public MyEvent newInstance() {
        return new MyEvent();
    }
}

 创建一个事件处理器,用于处理事件。例如,创建一个名为MyEventHandler的事件处理器:

import com.lmax.disruptor.EventHandler;

public class MyEventHandler implements EventHandler<MyEvent> {
    @Override
    public void onEvent(MyEvent event, long sequence, boolean endOfBatch) throws Exception {
        System.out.println("Event: " + event.getMessage());
    }
}

 在你的Spring Boot应用中配置Disruptor。例如,在一个名为DisruptorConfig的配置类中进行配置:

import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.Executors;

@Configuration
public class DisruptorConfig {
    @Bean
    public Disruptor<MyEvent> disruptor() {
        MyEventFactory factory = new MyEventFactory();
        int bufferSize = 1024; // 设置缓冲区大小
        Disruptor<MyEvent> disruptor = new Disruptor<>(factory, bufferSize, Executors.defaultThreadFactory(), ProducerType.SINGLE, new BlockingWaitStrategy());
        disruptor.handleEventsWith(new MyEventHandler());
        disruptor.start();
        return disruptor;
    }
}

 现在你可以在你的Spring Boot应用中使用Disruptor发送消息了。例如,在一个名为DisruptorService的服务类中发送消息:

import com.lmax.disruptor.RingBuffer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class DisruptorService {
    @Autowired
    private RingBuffer<MyEvent> ringBuffer;

    public void sendMessage(String message) {
        long sequence = ringBuffer.tryNext(); // 请求下一个事件序号
        try {
            MyEvent event = ringBuffer.get(sequence); // 获取该序号对应的事件对象
            event.setMessage(message); // 设置事件内容
        } finally {
            ringBuffer.publish(sequence); // 发布事件
        }
    }
}

 最后,你可以在你的应用中调用DisruptorServicesendMessage方法来发送消息。例如,在一个控制器类中调用该方法:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class MyController {
    @Autowired
    private DisruptorService disruptorService;

    @GetMapping("/send")
    public String sendMessage(@RequestParam("message") String message) {
        disruptorService.sendMessage(message);
        return "Message sent: " + message;
    }
}

现在,当你访问/send?message=Hello时,Disruptor将接收到消息并处理它。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1968114.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

springboot使用jdk生成自签名证书

1.背景 许多生产中服务端应用系统需要ssl认证&#xff0c;走https访问&#xff0c;以满足等保要求。 有些前后端一体的项目工程&#xff0c;完全可以用jdk生成证书&#xff08;本章节介绍此&#xff09;&#xff1b; 若是前后端分离&#xff0c;使用nginx代理部署的&#xf…

Selenium怎么进行自动化测试?8年老鸟的我是这样做的...

自动化测试是软件测试过程中的重要一环&#xff0c;它可以帮助我们提高测试效率、减少重复工作&#xff0c;同时还可以提升测试的准确性。Selenium是一个广泛使用的自动化测试工具&#xff0c;它可以模拟用户在网页上的操作&#xff0c;比如点击、输入、检查元素等。 本文将从…

tomcat配置(java环境配置)

继昨天上线商城系统 [rootstaticserver eleme_web]# cd /usr/local/nginx/conf [rootstaticserver conf]# ls fastcgi.conf koi-utf nginx.conf scgi_params.default fastcgi.conf.default koi-win nginx.conf.bak uwsgi…

5G CPE SC100:5G时代的旗舰级无线路由器

作为星创易联CPE无线路由器SC100的产品经理,我很高兴能够与大家分享这款产品的特点和使用体验。经过我们团队的不懈努力,SC100终于面世,它集多项领先技术于一身,定位高端市场,希望能给用户带来极致的上网体验。下面就让我从硬件规格、无线性能、接口丰富程度、指示灯设计、便携…

简单洗牌算法

&#x1f389;欢迎大家收看&#xff0c;请多多支持&#x1f339; &#x1f970;关注小哇&#xff0c;和我一起成长&#x1f680;个人主页&#x1f680; ⭐目前主更 专栏Java ⭐数据结构 ⭐已更专栏有C语言、计算机网络⭐ 在学习了ArrayList之后&#xff0c;我们可以通过写一个洗…

老百姓:药房“难自医”

股价连创历史新低&#xff0c;董事长又被留置&#xff0c;药房“难自医”。今天我们聊聊正处在风口浪尖的——老百姓。 昨晚&#xff0c;老百姓大药房公告称&#xff0c; 收到公司实际控制人、董事长谢子龙于7月28日被湖南省监委留置、立案调查的通知。随后还是经典环节&#…

pytorch与cuda与TensorRT的版本选择

VScode版本 linux最新版本的vscode&#xff0c;可能无法进行python的调试 选择下载1.85 https://code.visualstudio.com/updates/v1_85 CUDA版本 https://developer.nvidia.com/Cuda-Toolkit-archive 由于受限于TRT的8.6&#xff08;下面会说明&#xff09;&#xff0c;所以…

求值(河南萌新2024)

我真的服了&#xff0c;注意数据范围&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;&#…

秋招突击——7/29——复习{有塔游戏——关联传递性}——新作{随机链表的复制、合并K个升序链表,二叉树——二叉树的中序遍历、二叉树的最大深度、反转二叉树}

文章目录 引言复习有塔游戏——关联传递性实现复习实现参考实现 新作随机链表的复制个人实现参考实现 排序链表个人实现参考实现 二叉树章节二叉树的中序遍历个人实现 二叉树的最大深度个人实现参考实现 反转二叉树个人实现参考实现 总结 引言 旅游完回来了&#xff0c;今天继…

SSM流浪狗信息管理系统-计算机毕业设计源码07154

目录 1 绪论 1.1 研究背景和意义 1.2国内外研究现状 1.3论文结构与章节安排 2 系统分析 2.1 可行性分析 2.1.1 技术可行性分析 2.1.2经济可行性分析 2.1.3操作可行性分析 2.2 系统功能分析 2.2.1 功能性分析 2.2.2 非功能性分析 2.3 系统用例分析 2.4 系统流程分析…

vmware上,虚机经常丢失网卡。导致无法上网。

1、winR 输入 services.msc 2、重启这两个服务。 VMware NAT service和VMware DHCP service

【Rust日报】query.rs - 搜索Rust的一切

rucola - 在终端中管理你的markdown文档 很酷的一个终端软件。 query.rs - 搜索Rust的一切 https://query.rs/ 文档&#xff0c;crate&#xff0c;错误代码等等。 就是Rust生态的google. 使用Rust 10年后&#xff0c;我想吐槽几句 这篇文章分享了作者在使用Rust编程语言中的经验…

“postman请求JSON格式,Body内数据无法被idea后端接收,值为null“问题的解决方式

问题描述&#xff1a; 传递数据一切正常&#xff0c;但是&#xff1a; 原因剖析&#xff1a; 这是因为我们实体类里面属性的命名格式不符合驼峰命名&#xff0c;比如我这种“大写字母开头如CAD”/“一个小写字母一个大写字母如aDddddd”都不行。 解决方法: 方法1&#xff1a…

LangChain大模型应用开发指南-大模型Memory不止于对话

上节课&#xff0c;我我为您介绍了LangChain中最基本的链式结构&#xff0c;以及基于这个链式结构演化出来的ReAct对话链模型。 今天我将由简入繁&#xff0c;为大家拆解LangChain内置的多种记忆机制。本教程将详细介绍这些记忆组件的工作原理、特性以及使用方法。 【一一AGI大…

二叉搜索树的第 k 大的节点

题目描述 给定一棵二叉搜索树&#xff0c;请找出其中第 k 大的节点。 解题基本知识 二叉搜索树&#xff08;Binary Search Tree&#xff09;又名二叉查找树、二叉排序树。它是一棵空树&#xff0c;或者是具有下列性质的二叉树&#xff1a; 若它的左子树不空&#xff0c;则左子…

C语言第九天笔记

数组的概念 什 么是数组 数组是 相同类型&#xff0c; 有序数据的集合。 数 组的特征 数组中的数据被称为数组的 元素&#xff0c;是同构的 数组中的元素存放在内存空间里 (char player_name[6]&#xff1a;申请在内存中开辟6块连续的基于char类 型的变量空间) 衍生概念&…

数据安全、信息安全、网络安全区别与联系

关键字&#xff1a; 信息安全 数据安全 网络安全 [导读] 让人更好理解 “数据安全”、“信息安全”、“网络安全” 三者间的区别与联系了&#xff0c;我们汇总了官方机构给这三者的定义&#xff0c;并且网友也给出了自己的看法&#xff0c;一起来看看。 在 “互联网 ” 被广…

tomcat10环境的搭建及发布一个动态服务

Day 15 # 构建项目 [rootstatic-server eleme_web]# npm run build # 将静态的项目移动到nginx [rootstatic-server eleme_web]# ls [rootstatic-server eleme_web]# ls dist/ css favicon.ico index.html js [rootstatic-server eleme_web]# vim dist/index.html [r…

数据虚拟化和传统数据集成方式(如 ETL)有何区别?

要理解数据虚拟化&#xff0c;我们先说一下什么是虚拟化&#xff08;Virtualization&#xff09;&#xff1f; 所谓虚拟化&#xff08;Virtualization&#xff09;&#xff0c;通常指的是对 IT 资源的抽象&#xff0c;它屏蔽了这些资源的物理性质和边界。IT 资源可以是服务器、…