Spring Boot3.x集成Disruptor4.0

news2025/1/10 1:51:41

Disruptor介绍

Disruptor是一个高性能内存队列,研发的初衷是解决内存队列的延迟问题(在性能测试中发现竟然与I/O操作处于同样的数量级)。基于Disruptor开发的系统单线程能支撑每秒600万订单,2010年在QCon演讲后,获得了业界关注。2011年,企业应用软件专家Martin Fowler专门撰写长文介绍。同年它还获得了Oracle官方的Duke大奖。

Disruptor 是一个 Java 的并发编程框架,大大的简化了并发程序开发的难度,在性能上也比 Java 本身提供的一些并发包要好。它源于LMAX对并发性 、性能和非阻塞算法的研究,如今构成了其Exchange基础架构的核心部分。

Disruptor的队列功能和传统的MQ队列服务不同(比如:kafka\rabbitMq等等),Disruptor而是一个基于JDK的高性能内存队列,例如与Java的BlockingQueue进行对比。与队列一样,Disruptor的目的是在同一进程内的线程之间传递数据(例如消息或事件)。

目前,包括Apache Storm、Camel、Log4j 2在内的很多知名项目都应用了Disruptor以获取高性能。在美团技术团队它也有不少应用,有的项目架构借鉴了它的设计机制。

Disruptor功能

  • 高性能消息传递:Disruptor 能够通过避免锁和减少线程间的数据交换来提高性能。
  • 支持多生产者和多消费者:可以由多个生产者向队列中添加事件,同时多个消费者处理这些事件。
  • 事件处理模型:Disruptor 使用预分配事件的环形数组结构,每个事件槽可以被重复使用,减少了对象创建的开销。
  • 内存屏障优化:利用内存屏障来减少不必要的CPU缓存刷新,提高效率。

Disruptor优点

  • 极高的吞吐量和低延迟:通过减少锁的使用和优化内存操作,Disruptor 能够实现极高的数据处理速率和低延迟。
  • 避免了线程阻塞:使用无锁的设计,避免了传统队列中的线程阻塞问题。
  • 资源利用率高:通过重复使用事件对象,减少了垃圾回收的压力。

Disruptor缺点

  • 复杂性:Disruptor 的使用和理解比标准的队列或者其他并发模型要复杂,需要更多的学习和调试。
  • 适用场景有限:主要适用于需要极高性能和低延迟的系统,对于一般的应用场景可能是过度设计。
  • 调试困难:由于其无锁的设计和复杂的内部结构,当出现问题时,调试可能比较困难。
  • 总的来说,Disruptor 是一个专为高性能计算设计的工具,适用于那些对性能有极端要求的场景。对于普通应用或者数据量不大的情况,使用传统的并发模型可能更为合适。

Disruptor特征

Disruptor的目标之一是在低延迟环境中使用,在低延迟系统中,必须减少或移除内存分配;

在基于Java的系统中,目的是减少由于垃圾收集导致的系统停顿;为了支持这一点,用户可以预先分配Disruptor中事件所需的存储空间(也就是声明RingBuffer的大小)。

在构造RingBuffer期间,EventFactory由用户提供,并将在Disruptor的Ring Buffer中每个事件元素创建时候被调用。将新数据发布到Disruptor时,API将允许用户获取构造的对象,以便他们可以调用方法或更新该存储对象上的字段,Disruptor保证这些操作只要正确实现就是并发安全的。

官方文档

github开源地址

GitHub - LMAX-Exchange/disruptor: High Performance Inter-Thread Messaging Library

github介绍文档

LMAX Disruptor

开发示例

我们以一个项目来演示,开发一个订单业务消息处理服务,来模拟采用Disruptor队列来对订单进行管理;

采用一个生产者,多个消费者模式,并且多个消费者按不同的顺序进行链路排例,对生产者消息进行消费;

如:

某电商平台存在以下服务功能

  • 订单管理服务:生成订单后,过行订单管理与跟踪
  • 用户等级服务:用户购买商品后,重新评估用户星级等级,提供对标服务
  • 电商客服服务:负责售前售后服务,有3组,分别为A组、B组、C组,每个订单只分配到其中一组提供对接客服服务
  • 仓储管理服务:管理平台所有商品仓库,并提供已购买商品
  • 物流投递服务:负责从仓储中获取商品,投送到用户手中

电商平台每完成一笔支付订单,将消息发送到此Disruptor示例服务。

  • 首先分别经过订单管理服务和用户等级服务,注意:两个服务均为独立消费消息
  • 完成前置两个服务消费后,才能执行电商客服A组、电商客服B组、电商客服C组其中任意一个服务独立消费消息,注意:三选一,不能全部执行
  • 完成前置电商客服服务消费后,执行仓储管理服务消费消息
  • 完成前置仓储管理服务消费后,最后执行物流投递服务

消费链路流程如下:

工程环境

  • JDK:17
  • SpringBoot:3.1.3-SNAPSHOT

注:假设你已创建基础工程,并完成SpringBoot组件引入

引入Disruptor依赖

<dependencies>
    <dependency>
        <groupId>com.lmax</groupId>
        <artifactId>disruptor</artifactId>
        <version>4.0.0</version>
    </dependency>
</dependencies>

项目Disruptor配置

package com.example.disruptor;

import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.server.RequestPredicate;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerResponse;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.*;
import static org.springframework.web.reactive.function.server.RequestPredicates.accept;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;

/**
 * @Description 服务配置类
 */
@Slf4j
@Configuration(proxyBeanMethods = false)
public class WebConfig {

    private static final RequestPredicate JSON_ACCEPT = accept(new MediaType(MediaType.APPLICATION_JSON, StandardCharsets.UTF_8));
    //缓冲区大小,必需是2的N次方
    final static int BUFFER_SIZE = 1024 * 1024;

    /**
     * 创建基于环的可重用队列存储,实现消息数据存储与推送到处理器执行
     * @return
     */
    @Bean("orderRingBuffer")
    public RingBuffer<OrderEvent> createRingBuffer(){
        // 创建消息处理器,注:正常业务模式下,由不同的业务类实现;此处为了简化演示,从createEventHandler()方法中获取模拟实现类;
        EventHandler<OrderEvent> orderHandler = createEventHandler("消息序例:{}, 发送到《订单管理服务》, 订单详情:{}");
        EventHandler<OrderEvent> userLevelHandler = createEventHandler("消息序例:{}, 发送到《用户等级服务》, 订单详情:{}");
        EventHandler<OrderEvent> customerService0Handler = createEventHandler(0, 3, "消息序例:{}, 发送到《电商客服A组》, 订单详情:{}");
        EventHandler<OrderEvent> customerService1Handler = createEventHandler(1, 3, "消息序例:{}, 发送到《电商客服B组》, 订单详情:{}");
        EventHandler<OrderEvent> customerService2Handler = createEventHandler(2, 3, "消息序例:{}, 发送到《电商客服C组》, 订单详情:{}");
        EventHandler<OrderEvent> storageHandler = createEventHandler("消息序例:{}, 发送到《仓储管理服务》, 订单详情:{}");
        EventHandler<OrderEvent> deliveryHandler = createEventHandler("消息序例:{}, 发送到《物流投递服务》, 订单详情:{}");

        //创建环形缓冲区处理器事件生成器
        Disruptor<OrderEvent> disruptor = new Disruptor<OrderEvent>(OrderEvent::new,
                //缓冲区大小
                BUFFER_SIZE,
                //默认线程工厂
                Executors.defaultThreadFactory(),
                //ProducerType.SINGLE(表示生产者只有一个)和ProducerType.MULTY(表示有多个生产者)
                ProducerType.SINGLE,
                /**
                 可用事件策略:
                 BlockingWaitStrategy:用了ReentrantLock的等待&&唤醒机制实现等待逻辑,是默认策略,比较节省CPU
                 BusySpinWaitStrategy:持续自旋,JDK9之下慎用(最好别用)
                 DummyWaitStrategy:返回的Sequence值为0,正常环境是用不上的
                 LiteBlockingWaitStrategy:基于BlockingWaitStrategy,在没有锁竞争的时候会省去唤醒操作,但是作者说测试不充分,不建议使用
                 TimeoutBlockingWaitStrategy:带超时的等待,超时后会执行业务指定的处理逻辑
                 LiteTimeoutBlockingWaitStrategy:基于TimeoutBlockingWaitStrategy,在没有锁竞争的时候会省去唤醒操作
                 SleepingWaitStrategy:三段式,第一阶段自旋,第二阶段执行Thread.yield交出CPU,第三阶段睡眠执行时间,反复的的睡眠
                 YieldingWaitStrategy:二段式,第一阶段自旋,第二阶段执行Thread.yield交出CPU
                 PhasedBackoffWaitStrategy:四段式,第一阶段自旋指定次数,第二阶段自旋指定时间,第三阶段执行Thread.yield交出CPU,第四阶段调用成员变量的waitFor方法,这个成员变量可以被设置为BlockingWaitStrategy、LiteBlockingWaitStrategy、SleepingWaitStrategy这三个中的一个
                 注意:
                 BlockingWaitStrategy 是最低效的策略,但其对CPU的消耗最小并且在各种不同部署环境中能提供更加一致的性能表现
                 SleepingWaitStrategy 的性能表现跟BlockingWaitStrategy差不多,对CPU的消耗也类似,但其对生产者线程的影响最小,适合用于异步日志类似的场景
                 YieldingWaitStrategy 的性能是最好的,适合用于低延迟的系统。在要求极高性能且事件处理线数小于CPU逻辑核心数的场景中,推荐使用此策略;例如,CPU开启超线程的特性
                 */
                new YieldingWaitStrategy());

        //消息消费配置处理器执行链路:orderHandler, userLevelHandler》customerService[0~2]Handler》storageHandler》deliveryHandler
        //分别独立执行:orderHandler(订单管理服务), userLevelHandler(用户等级服务)
        disruptor.handleEventsWith(orderHandler, userLevelHandler)
                //前置消费后,再执行其中任意一个处理器:customerService[0~2]Handler(电商客服A组\电商客服B组\电商客服C组,三选一)
                .then(customerService0Handler, customerService1Handler, customerService2Handler)
                //前置消费后,再执行处理器:storageHandler(仓储管理服务)
                .then(storageHandler)
                //前置消费后,再执行处理器:deliveryHandler(物流投递服务)
                .then(deliveryHandler);

        //启动disruptor服务
        disruptor.start();
        return disruptor.getRingBuffer();
    }

    /**
     * 创建消息处理器
     * @param msg
     * @return
     */
    private EventHandler<OrderEvent> createEventHandler(final String msg){
        return (event, sequence, endOfBatch)->{
            log.info(msg, sequence,event);
            //业务逻辑代码...
        };
    }

    /**
     * 创建消息处理器,支持相同处理器多选一(取模计算)
     * @param index
     * @param handlerCount
     * @param msg
     * @return
     */
    private EventHandler<OrderEvent> createEventHandler(final int index, final int handlerCount, final String msg){
        return (event, sequence, endOfBatch)->{
            if (sequence % handlerCount == index) {
                log.info(msg, sequence, event);
                //业务逻辑代码...
            }
        };
    }

    /**
     * 后台服务请求router
     * @param orderRingBuffer
     * @return
     */
    @Bean
    public RouterFunction<ServerResponse> webRouterFunction(RingBuffer<OrderEvent> orderRingBuffer) {
        return route()
                .POST("/order/push", JSON_ACCEPT, (request)->{
                    String orderId = request.queryParam("orderId").orElse("");
                    String name = request.queryParam("name").orElse("");
                    String price = request.queryParam("price").orElse("");
                    orderRingBuffer.publishEvent((orderEvent, sequence) -> {
                        orderEvent.setOrderId(orderId);
                        orderEvent.setName(name);
                        orderEvent.setPrice(Double.parseDouble(price));
                    });
                    return ServerResponse.status(HttpStatus.OK).bodyValue("ok,200");
                })
                .build();
    }
}

订单对象

package com.example.disruptor;

import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
public class OrderEvent {
    private String orderId;
    private String name;
    private Double price;
}

启动类

package com.example;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class DisruptorSamplesApplication {
    public static void main(String[] args) {
        SpringApplication.run(DisruptorSamplesApplication.class, args);
    }
}

YML配置文件

# 本地服务访问
server:
  # 服务端口
  port: 8080
  # 服务IP
  address: 0.0.0.0
# 配置日志
logging:
  level:
    org.springframework: info
# 开启debug模式
debug: false

工程测试

Postman发送POST请求,将模拟表单数据提交到服务端;

服务打印日志

2024-04-30T18:08:05.227+08:00  INFO 39828 --- [pool-4-thread-1] com.example.disruptor.WebConfig: 消息序例:0, 发送到《订单管理服务》, 订单详情:OrderEvent(orderId=VL-20240495001, name=苹果1斤, price=7.65)
2024-04-30T18:08:05.227+08:00  INFO 39828 --- [pool-4-thread-2] com.example.disruptor.WebConfig: 消息序例:0, 发送到《用户等级服务》, 订单详情:OrderEvent(orderId=VL-20240495001, name=苹果1斤, price=7.65)
2024-04-30T18:08:05.230+08:00  INFO 39828 --- [pool-4-thread-3] com.example.disruptor.WebConfig: 消息序例:0, 发送到《电商客服A组》, 订单详情:OrderEvent(orderId=VL-20240495001, name=苹果1斤, price=7.65)
2024-04-30T18:08:05.230+08:00  INFO 39828 --- [pool-4-thread-6] com.example.disruptor.WebConfig: 消息序例:0, 发送到《仓储管理服务》, 订单详情:OrderEvent(orderId=VL-20240495001, name=苹果1斤, price=7.65)
2024-04-30T18:08:05.230+08:00  INFO 39828 --- [pool-4-thread-7] com.example.disruptor.WebConfig: 消息序例:0, 发送到《物流投递服务》, 订单详情:OrderEvent(orderId=VL-20240495001, name=苹果1斤, price=7.65)

通过日志清楚的展示了,消费者消息处理器按程序执行配置的链路顺序正确打印;

结束

以上介绍了disruptor的基本信息与特点,并通过代码工程演示了dirsruptor在项目中如何开发,以及使用场景等;本文内容介绍有限,实际应用过程中disruptor还有很多其它用法,并未通过本文全整的展述,比如:如何使用多个消费者在线程池下完成消费,以及多生产者模式;可以通过官方文档与源码了解更多dirsruptor用法与功能;本文如有不足之处,欢迎指正与交流;

参考:

高性能队列——Disruptor-CSDN博客

LMAX Disruptor

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1649413.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

C++手写协程项目(协程实现线程结构体、线程调度器定义,线程挂起函数、线程切换函数、线程恢复函数、线程结束函数、线程结束判断函数,模块测试)

协程结构体定义 之前我们使用linux下协程函数实现了线程切换&#xff0c;使用的是ucontext_t结构体&#xff0c;和基于这个结构体的四个函数。现在我们要用这些工具来实现我们自己的一个线程结构体&#xff0c;并实现线程调度和线程切换、挂起。 首先我们来实现以下线程结构体…

【iOS】——浅析CALayer

文章目录 一、CALayer介绍二、UIview与CALayer1.区别2.联系 三、CALayer的使用1.初始化方法2.常用属性 四.CALayer坐标系1.position属性和anchorPoint属性2.position和anchorPoint的关系3.position、anchorPoint和frame的关系 五、CALayerDelegate六、CALayer绘图机制1.绘图流程…

官方教程来啦!上手体验YashanDB主备部署、同步延迟和自动切换能力

在上一篇深度干货 | 如何兼顾性能与可靠性&#xff1f;一文解析YashanDB主备高可用技术中&#xff0c;我们深入探讨了YashanDB高可用的架构设计原理和关键技术&#xff0c;本文将聚焦于实践操作&#xff0c;快速体验YashanDB的主备高可用能力。 概要 YashanDB提供了不同部署形…

C++程序设计教案

文章目录&#xff1a; 一&#xff1a;软件安装环境 第一种&#xff1a;vc2012 第二种&#xff1a;Dev-C 第三种&#xff1a;小熊猫C 二&#xff1a;语法基础 1.相关 1.1 注释 1.2 换行符 1.3 规范 1.4 关键字 1.5 ASCll码表 1.6 转义字符 2.基本框架 2.1 第一种&…

如果insightface/instantID安装失败怎么办(关于InsightFaceLoader_Zho节点的报错)

可能性有很多&#xff0c;但是今天帮朋友解决问题的时候又收集了一种新的思路。 首先&#xff0c;可以先按照这篇文章里边提到的方法去安装&#xff1a; 【全网最详细】ComfyUI下&#xff0c;Insightface安装指南-聚梦小课堂_insightface如何安装-CSDN博客 其次&#xff0c;…

解决Python中的 `ModuleNotFoundError: No module named ‘fcmeans‘` 错误

ModuleNotFoundError: No module named fcmeans 解决Python中的 ModuleNotFoundError: No module named fcmeans 错误如何解决这个错误fcmeans 库简介应用实例 解决Python中的 ModuleNotFoundError: No module named fcmeans 错误 在进行数据科学或机器学习项目时&#xff0c;…

Linux内核之获取文件系统超级块:sget用法实例(六十八)

简介&#xff1a; CSDN博客专家&#xff0c;专注Android/Linux系统&#xff0c;分享多mic语音方案、音视频、编解码等技术&#xff0c;与大家一起成长&#xff01; 优质专栏&#xff1a;Audio工程师进阶系列【原创干货持续更新中……】&#x1f680; 优质专栏&#xff1a;多媒…

大眼橙C1 Air投影仪:千元预算内的明智之选

在科技日新月异的今天&#xff0c;投影仪已经不再是会议室或教室的专属&#xff0c;而是越来越多地走入了寻常百姓家。家庭影院的概念越来越流行&#xff0c;尤其在都市人之间逐渐成为一股风尚。市场上投影仪非常多&#xff0c;如何选到一台合适的投影仪也成为困扰广大用户的一…

了解TMS运输管理系统,实现物流高效运转

TMS运输管理系统&#xff08;Transportation Management System&#xff09;是一种集成物流和信息技术的解决方案&#xff0c;通过优化运输流程、实时跟踪货物信息和自动化管理操作&#xff0c;提高物流效率&#xff0c;降低运营成本&#xff0c;实现高效运输。 TMS运输管理系…

软件设计师-重点的构造型设计模式

一、桥接模式&#xff08;Bridge&#xff09;&#xff1a; 意图&#xff1a; 将抽象部分与其实现部分分离&#xff0c;使它们都可以独立地变化。 结构&#xff1a; 适用性&#xff1a; 不希望在抽象和它的实现部分之间有一个固定的绑定关系。例如&#xff0c;这种情况可能是…

探索大模型能力--prompt工程

1 prompt工程是什么 1.1 什么是Prompt&#xff1f; LLM大语言模型终究也只是一个工具&#xff0c;我们不可能每个人都去训一个大模型&#xff0c;但是我们可以思考如何利用好大模型&#xff0c;让他提升我们的工作效率。就像计算器工具一样&#xff0c;要你算10的10倍&#x…

【计算机网络】计算机网络的性能指标

计算机网络的性能指标被用来从不同方面度量计算机网络的性能。常用的八个计算机网络性能指标&#xff1a;速率、带宽、吞吐量、时延、时延带宽积、往返时间、利用率、丢包率。 一.速率 (1) 数据量 比特&#xff08;bit&#xff0c;记为小写b&#xff09;是计算机中数据量的基…

JavaWEB 框架安全:Spring 漏洞序列.(CVE-2022-22965)

什么叫 Spring 框架. Spring 框架是一个用于构建企业级应用程序的开源框架。它提供了一种全面的编程和配置模型&#xff0c;可以简化应用程序的开发过程。Spring 框架的核心特性包括依赖注入&#xff08;Dependency Injection&#xff09;、面向切面编程&#xff08;Aspect-Or…

c++ 线程交叉场景试验

1.需求 1.处理一个列表的数据&#xff0c;要求按照列表的数据处理10个数据 2.可以使用多线程处理&#xff0c;但是针对每个线程&#xff0c;1~10的处理顺序不能变。 3.每个数据的处理必须原子&#xff0c;即只有一个线程可以针对某个数据进行处理&#xff0c;但是10个数据是可…

2024年CSC公派联合培养博士项目申报即将开始~

一、选派计划 联合培养博士研究生面向全国各博士学位授予单位选拔。 联合培养博士研究生的留学期限、资助期限为6-24个月。留学期限应根据拟留学单位学制、外方录取通知&#xff08;或正式邀请信&#xff09;中列明的留学时间确定。个人申报的资助期限应不超过留学期限&#…

79、贪心-跳跃游戏II

思路&#xff1a; 首先理解题意&#xff1a;从首位置跳最少多少次到达末尾。 第一种&#xff1a;使用递归&#xff0c;将所有跳转路径都获取到进行求出最小值。 第二种&#xff1a;使用动态规划&#xff0c;下一次最优取决上一次的最优解 第三针&#xff1a;贪心&#xff…

python数据分析常用基础语法

Python语言基础——语法基础 前言一、变量的介绍与使用变量的介绍变量命名规则变量的使用拓展 二、标识符标识符命名命名规则注意事项 三、数据类型数据类型的介绍数据类型的查看示例 四、输入与输出输入和输出的介绍format格式化输出占位符 五、代码缩进与注释代码缩进 前言 …

Spring Cloud 整合Sentinel

1、引入依赖 版本说明 alibaba/spring-cloud-alibaba Wiki GitHub 父pom <spring.cloud.version>Hoxton.SR12</spring.cloud.version> <spring.cloud.alibaba.version>2.2.10-RC1</spring.cloud.alibaba.version>Sentinel应用直接引用starter <…

0508_IO2

练习&#xff1a; 将一张图片修改为德国国旗 1 #include <stdio.h>2 #include <string.h>3 #include <stdlib.h>4 #include <sys/types.h>5 #include <unistd.h>6 #include <sys/stat.h>7 #include <fcntl.h>8 #include <pthrea…

Codigger:Web应用赋能的分布式操作系统让用户卓越体验

Codigger&#xff0c;作为一个分布式操作系统&#xff0c;其独特之处在于其采用的浏览器/服务器&#xff08;Browser/Server&#xff0c;简称B/S&#xff09;架构。这种架构的核心思想是&#xff0c;通过浏览器来进入工作界面&#xff0c;页面交互部分事务逻辑在前端&#xff0…