springboot+disruptor再体验

news2024/9/20 16:41:20

Disruptor是一个高性能队列,常见的还有kafka、rabbitmq等,下面体验一下~

1、Disruptor简介
Disruptor 是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题(在性能测试中发现竟然与I/O操作处于同样的数量级)。基于 Disruptor 开发的系统单线程能支撑每秒 600 万订单,2010 年在 QCon 演讲后,获得了业界关注。

其特点简单总结如下:

  • 开源的java框架,用于生产者-消费者场景;
  • 高吞吐量和低延迟;
  • 有界队列;

disruptor在github网址为:https://github.com/LMAX-Exchange/disruptor
在这里插入图片描述
2、Disruptor概念

  • Ring Buffer:环形的缓冲区,环形数组中的元素采用覆盖方式,避免了jvm的GC;
  • Sequence Disruptor:通过顺序递增的序号来编号管理通过其进行交换的数据(事件),对数据(事件)的处理过程总是沿着序号逐个递增处理;
  • Sequencer:Sequencer 是 Disruptor 的真正核心。此接口有两个实现类 SingleProducerSequencer、MultiProducerSequencer ,它们定义在生产者和消费者之间快速、正确地传递数据的并发算法;
  • Sequence Barrier:用于保持对RingBuffer的 main published Sequence 和Consumer依赖的其它Consumer的 Sequence 的引用;
  • Wait Strategy:定义 Consumer 如何进行等待下一个事件的策略;
  • Event:在 Disruptor 的语义中,生产者和消费者之间进行交换的数据被称为事件(Event)。它不是一个被 Disruptor 定义的特定类型,而是由 Disruptor 的使用者定义并指定;
  • EventProcessor:EventProcessor 持有特定消费者(Consumer)的 Sequence,并提供用于调用事件处理实现的事件循环(Event Loop);
  • EventHandler:定义的事件处理接口,由用户实现,用于处理事件,是 Consumer 的真正实现;
  • Producer:生产者,只是泛指调用 Disruptor 发布事件的用户代码,Disruptor 没有定义特定接口或类型;

3、springboot+disruptor实例

在pom.xml文件中添加依赖

		<dependency>
            <groupId>com.lmax</groupId>
            <artifactId>disruptor</artifactId>
            <version>3.3.4</version>
        </dependency>

消息体Model

@Data
public class MessageModel {
    private String message;
}

构造EventFactory

public class HelloEventFactory implements EventFactory<MessageModel> {
    @Override
    public MessageModel newInstance() {
        return new MessageModel();
    }
}

构造消费者

@Slf4j
public class HelloEventHandler implements EventHandler<MessageModel> {
    @Override
    public void onEvent(MessageModel event, long sequence, boolean endOfBatch) {
        try {
            //这里停止1000ms是为了确定消费消息是异步的
            Thread.sleep(1000);
            log.info("消费者处理消息开始");
            if (event != null) {
                log.info("消费者消费的信息是:{}",event);
            }
        } catch (Exception e) {
            log.info("消费者处理消息失败");
        }
        log.info("消费者处理消息结束");
    }
}

构造MQManager

@Configuration
public class MqManager {

    @Bean("messageModel")
    public RingBuffer<MessageModel> messageModelRingBuffer() {
        //定义用于事件处理的线程池, Disruptor通过java.util.concurrent.ExecutorSerivce提供的线程来触发consumer的事件处理
        ExecutorService executor = Executors.newFixedThreadPool(2);

        //指定事件工厂
        HelloEventFactory factory = new HelloEventFactory();

        //指定ringbuffer字节大小,必须为2的N次方(能将求模运算转为位运算提高效率),否则将影响效率
        int bufferSize = 1024 * 256;

        //单线程模式,获取额外的性能
        Disruptor<MessageModel> disruptor = new Disruptor<>(factory, bufferSize, executor, ProducerType.SINGLE, new BlockingWaitStrategy());

        //设置事件业务处理器---消费者
        disruptor.handleEventsWith(new HelloEventHandler());

        //启动disruptor线程
        disruptor.start();

        //获取ringbuffer环,用于接取生产者生产的事件
        RingBuffer<MessageModel> ringBuffer = disruptor.getRingBuffer();

        return ringBuffer;
    }

}

构造生产者

@Slf4j
@Component
public class HelloEventProducer {

    @Autowired
    private RingBuffer<MessageModel> messageModelRingBuffer;

    public void sayHelloMq(String message) {
        log.info("生产消息: {}",message);
        //获取下一个Event槽的下标
        long sequence = messageModelRingBuffer.next();
        try {
            //给Event填充数据
            MessageModel event = messageModelRingBuffer.get(sequence);
            event.setMessage(message);
            log.info("往消息队列中添加消息:{}", event);
        } catch (Exception e) {
            log.error("failed to add event to messageModelRingBuffer for : e = {},{}",e,e.getMessage());
        } finally {
            //发布Event,激活观察者去消费,将sequence传递给改消费者
            //注意最后的publish方法必须放在finally中以确保必须得到调用;如果某个请求的sequence未被提交将会堵塞后续的发布操作或者其他的producer
            messageModelRingBuffer.publish(sequence);
        }
    }
}

测试

	/**
     * 项目内部使用Disruptor做消息队列
     * @throws Exception
     */
    @Test
    public void sayHelloMqTest() throws Exception{
        helloEventProducer.sayHelloMq("Hello world!");
        log.info("消息队列已发送完毕");
        //这里停止2000ms是为了确定是处理消息是异步的
        Thread.sleep(2000);
    }

运行结果如下
在这里插入图片描述
4、小结
引用disruptor作为内部的高性能队列,应用于生产者-消费者模式中还是非常nice的,后面若有工程需求可以尝试一下。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/139070.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

[C++]STL之string的模拟实现

上一章我们对string的常见接口及使用进行了讲解&#xff0c;接下来我们将对一些常见的接口&#xff0c;包括构造函数&#xff0c;析构函数&#xff0c;运算符重载等等进行模拟实现.方便我们理解string接口实现的原理. 在讲解之前先说一下string的成员变量. 首先是字符串内容_…

微信小程序picker组件遇到的问题以及解决办法

一、picker基本概念二、遇到的问题三、如何解决四、延伸五、效果图一、picker基本概念 先来看一下官方文档中picker的基本概念&#xff1a; 从底部弹起的滚动选择器&#xff0c;现支持三种选择器&#xff0c;通过mode来区分&#xff0c;分别是普通选择器&#xff0c;时间选择器…

Bochs下载安装

文章目录下载Bochs配置BochsBochs Bochs是一个x86硬件平台的开源模拟器。它可以模拟各种硬件的配置。Bochs模拟的是整个PC平台&#xff0c;包括I/O设备、内存和BIOS。更为有趣的是&#xff0c;甚至可以不使用PC硬件来运行Bochs。事实上&#xff0c;它可以在任何编译运行Bochs的…

【Unity3D编辑器扩展】Unity3D中实现Text的字体的替换

推荐阅读 CSDN主页GitHub开源地址Unity3D插件分享简书地址我的个人博客 大家好&#xff0c;我是佛系工程师☆恬静的小魔龙☆&#xff0c;不定时更新Unity开发技巧&#xff0c;觉得有用记得一键三连哦。 一、前言 在开发中会遇到要将场景中的Text的字体全部替换的情况。 所以…

NetInside网络分析帮您解决系统性能问题(一)

前言 某大学信息中心负责人表示&#xff0c;有用户反馈&#xff0c;在通过VPN访问某一IP的80端口时连接时断时续。同时信息中心给到的信息是通过VPN&#xff1a;XXX.XXX.253.5访问IP地址XXX.XXX.130.200的80端口出现访问时断时续问题。 需要通过分析系统看一下实际情况&#…

云原生周刊 | 人类、机器人与 Kubernetes

近日 Grafana 官网发表了一篇博客介绍了 2022 年比较有意思、脑洞大开的一些 Grafana 使用案例&#xff0c;比如监控特斯拉 Model 3 的充电状态、OTA 更新状况等等。 海事技术供应商 Royal IHC 利用 Grafana 展示客户船队的关键性能指标&#xff0c;例如燃料消耗、服务时间、大…

Allegro174版本新功能介绍之打开坐标超链接功能

Allegro174版本新功能介绍之打开坐标超链接功能 Allegro在升级到174的时候默认打开时,报表中的坐标是不带超链接的,如下图 直接点击坐标,是无法自动跳转到坐标所在位置的 但是Allegro174是开放了打开超链接的功能的,具体操作如下 选择Setup选择User Preferences

【 Vue3 + Vite + setup语法糖 + Pinia + VueRouter + Element Plus 第一篇】(持续更新中)

【 Vue3 Vite setup语法糖 Pinia VueRouter Element Plus 第一篇】(持续更新中) 1.使用 Vite脚手架创建 Vue3 项目 终端输入命令 npm create vite 项目名选择 Vue项目并回车根据自己的爱好&#xff0c;选择配置即可 2. 开启 Network 访问地址 npm run dev后 提示 use -…

磨金石教育||商业插画的发展现状如何?学习插画可以月入过万吗?

商业插画是什么&#xff1f;现如今&#xff0c;商业插画已经在生活中随处可见。你买的所有带包装的产品&#xff0c;上面的各种有趣的产品插图&#xff0c;就是插画师做的产品插画。特别是一些零食类的产品&#xff0c;在包装箱上&#xff0c;我们常可以看到各种大眼睛拟人化的…

电脑出现0xc00000e9错误代码的解决方法

每当假期结束回来&#xff0c;经常发现Windows系统的电脑一段时间不开机&#xff0c;开机就出现0xc00000e9的错误代码。为什么明明没有任何操作却出现错误呢&#xff1f;驱动人生带大家一文了解。 出现0xc00000e9错误代码的原因 先来了解一下电脑出现0xc00000e9错误代码的主要…

数字孪生架构

很多同学对数字孪生特别感兴趣&#xff0c;经常有同学问我&#xff1a;数据孪生系统怎么做&#xff1f;有没有教程&#xff1f;除了Unity开发&#xff0c;开发数字孪生还需要掌握什么技能&#xff1f;有人介绍了一个数字孪生的外包&#xff0c;从来没做过&#xff0c;能不能接&…

Spring 中常用的几个工具类

AnnotatedElementUtils 类 获取某个类的某个方法上是否有标注注解&#xff0c;并可以通过其他 API 获取到这个类注解上的属性值&#xff0c;该工具类其他 API 下面截图可以查看。 public static boolean isBeanAnnotated(Method method) {return AnnotatedElementUtils.hasAn…

Redis 应用问题解决

缓存穿透 key 对应的数据在数据源并不存在&#xff0c;每次针对此key的请求从缓存中获取不到&#xff0c;请求会都压到数据源&#xff0c;从而可能压垮数据源。 解决方案 一个一定不存在的缓存及查询不到的数据&#xff0c;由于缓存是不命中时被动写的&#xff0c;并且处于容…

docker 19.03构建跨平台的镜像包并推送到私有仓库

默认的docker构建image镜像是不能跨平台的,如果需要构建跨平台的镜像,需要docker的版本在19.03版本以上,并开启buildx。以下为具体的步骤 版本:docker 19.03。 一.安装/开启 buildx 1.1.手动开启dockerx开关 docker 19.3 暂默认不开启dockerx,需要手动开启 vim /etc/pro…

Scala 数据结构-集合

文章目录Scala 数据结构-集合一、集合简介1、不可变集合继承图2、可变集合继承图二、数组1、不可变数组(1) 创建数组(2) 访问数组(3) 遍历数组(4) 添加元素Scala 数据结构-集合 一、集合简介 1&#xff09;Scala的集合有三大类&#xff1a;序列seq&#xff0c;集合Set&#x…

解决fstab丢失,重启系统变为只读模式

现象描述&#xff1a; 背景&#xff1a;openEuler20.03 在/etc/fstab文件丢失、重启系统后&#xff0c;系统变为只读模式 [rootlocalhost ~]# echo 111 > 1.txt -bash: 1.txt: Read-only file system 解决方法&#xff1a; 查看系统信息&#xff0c;确认挂载信息&#…

【C进阶】数据在内存中的存储

数据在内存中的存储前言一、数据类型介绍&#xff08;一&#xff09;基本概念&#xff08;二&#xff09;类型的基本归类1.整型家族2.浮点型家族3.构造类型4.指针类型5.空类型二、整形在内存中的存储&#xff08;一&#xff09;原码、反码、补码1.概念2.为什么内存中存的是补码…

android input 事件分发 --- 注册input

android input 事件分发 --- 注册input应用注册input事件应用注册input事件 应用如果要监听input的事件&#xff0c;那么肯定就存在一个注册监听input事件的过程&#xff0c;跟随着addView方法我们跟着走一下frameworks/base/core/java/android/view/WindowManagerImpl.java Ov…

Centos7 安装 MongoDB

使用docker安装Mongo 1、拉取镜像 注&#xff1a;需要科学上网 docker pull mongo [rootlocalhost ~]# docker pull mongo Using default tag: latest latest: Pulling from library/mongo 846c0b181fff: Pull complete ef773e84b43a: Pull complete 2bfad1efb664: Pull co…

LeetCode:14. 最长公共前缀

14. 最长公共前缀1&#xff09;题目2&#xff09;思路3&#xff09;代码4&#xff09;结果1&#xff09;题目 编写一个函数来查找字符串数组中的最长公共前缀。 如果不存在公共前缀&#xff0c;返回空字符串 ""。 示例 1&#xff1a; 输入&#xff1a;strs [“flo…