RocketMQ-消息消费模式 顺序消费

news2025/1/10 12:04:18

RocketMQ-消息消费模式 顺序消费

  • RocketMQ-消息消费模式
    • 集群模式
      • 集群模式的演示(本身就默认)
      • Rocketmq存储队列
    • 广播模式
  • 顺序消费
    • 如何改实现顺序消费


RocketMQ-消息消费模式

集群模式

在消费模式为集群的情况下,如果机器是集群的,消息只会给集群中的其中一台机器消费到
在这里插入图片描述
在这里插入图片描述

集群模式的演示(本身就默认)

假设我们生产者生产了十条信息 ,当我们集群了两台消费者服务器的时候,就会每个服务器执行五条

在这里插入图片描述在这里插入图片描述

Rocketmq存储队列

在消息中间件每个topic是有4个写和读队列,主要是解决并发性能的问题的
如果只有一个队列,保证线程安全,必须得给队列进行写操作的时候上锁。
多几个队列,降低并发度,等待时间就短一些。

为什么是四个队列?

因为大多数服务器只有四核,意味着同时最多只能有CPU同时工作
在这里插入图片描述

广播模式

在消费模式为集群的情况下,如果机器是集群的,消费是会给集群中的所有机器所消费到

public class Consumer {
    public static void main(String[] args) throws Exception {
        //定义消息消费者(在同一个JVM中,消费者的组名不能重复)
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("helloConsumerGroup");
        //设置nameServer地址
        consumer.setNamesrvAddr("43.143.161.59:9876");
        //设置订阅的主题
        consumer.subscribe("helloTopic","*");
        //设置消费模式
        consumer.setMessageModel(MessageModel.BROADCASTING);
        //设置消息的监听器
        consumer.setMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for(MessageExt msg:list){
                    String s = new String(msg.getBody(), Charset.defaultCharset());
                    System.out.println("线程:"+Thread.currentThread()+",消息的内容:"+s);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //启动消费者
        consumer.start();
    }
}

运行结果:

生产者发送了十条消息之后,消费者集群的每个服务器均收到十条数据
在这里插入图片描述
在这里插入图片描述


顺序消费

实现生产顺序:12345消费顺序12345
哪些消息要实现顺序消费,就要让那些消息进入到同一个队列当中,对于消费者来说,一个队列对于一个线程

假设我们没有实现顺序消费的时候

创建生产者

1.创建实体类

@Setter
@Getter
public class OrderStep {
    private long orderId;
    private String desc;

    @Override
    public String toString() {
        return "OrderStep{" +
                "orderId=" + orderId +
                ", desc='" + desc + '\'' +
                '}';
    }
}

2.创建测试类

public class OrderUtil {
    public static List<OrderStep> buildOrders(){
        List<OrderStep> orderList = new ArrayList<OrderStep>();

        OrderStep orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("创建");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111065L);
        orderDemo.setDesc("创建");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103117235L);
        orderDemo.setDesc("创建");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111065L);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103117235L);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111065L);
        orderDemo.setDesc("完成");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("推送");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103117235L);
        orderDemo.setDesc("完成");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("完成");
        orderList.add(orderDemo);
        return orderList;
    }
}

3.创建生产者类

public class Producer {
        public static void main(String[] args) throws Exception {
            DefaultMQProducer producer = new DefaultMQProducer("orderlyProducerGroup");
            producer.setNamesrvAddr("43.143.161.59:9876");
            producer.start();
            String topic = "orderTopic";
            List<OrderStep> orderSteps = OrderUtil.buildOrders();
            for(OrderStep step:orderSteps){
                Message msg = new Message(topic,step.toString().getBytes(Charset.defaultCharset()));
                producer.sendOneway(msg);
            }
            producer.shutdown();
        }
}

创建消费者类

public class Consumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("orderlyConsumerGroup");
        consumer.setNamesrvAddr("43.143.161.59:9876");
        consumer.subscribe("orderTopic","*");
        consumer.setMessageModel(MessageModel.BROADCASTING);
        consumer.setMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for(MessageExt msg:list){
                    String s = new String(msg.getBody(), Charset.defaultCharset());
                    System.out.println("线程:"+Thread.currentThread()+",消息的内容:"+s);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //启动消费者
        consumer.start();
    }
}

运行结果:

可以看出和我们生产数据的顺序完全不同,整个订单的顺序都反了
在这里插入图片描述

如何改实现顺序消费

生产者类

public class Producer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("orderlyProducerGroup");
        producer.setNamesrvAddr("43.143.161.59:9876");
        producer.start();
        String topic = "orderTopic";
        List<OrderStep> orderSteps = OrderUtil.buildOrders();
        //设置队列选择器
        MessageQueueSelector selector = new MessageQueueSelector() {
            @Override
            public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
                System.out.println("队列个数"+list.size());
                Long orderId = (Long) o;
                int index = (int)(orderId % list.size());
                return list.get(index);
            }
        };
        for(OrderStep step:orderSteps){
            Message msg = new Message(topic,step.toString().getBytes(Charset.defaultCharset()));
            //指定消息选择器,换入的参数
            producer.send(msg,selector,step.getOrderId());
        }
        producer.shutdown();
    }
}

消费者类

public class Consumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("orderlyConsumerGroup");
        consumer.setNamesrvAddr("43.143.161.59:9876");
        consumer.subscribe("orderTopic","*");
        //从什么地方开始消费
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        //一个队列对应一个线程
        consumer.setMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
                for(MessageExt msg:list){
                    System.out.println("当前线程:"+Thread.currentThread()+":,队列ID"+msg.getQueueId()+",消息内容:"+new String(msg.getBody(),Charset.defaultCharset()));
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        //启动消费者
        consumer.start();
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/338619.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【数据结构】双向链表的模拟实现(无头)

目录 前言&#xff1a; 1、认识双向链表中的结点 2、认识并创建无头双向链表 3、实现双向链表当中的一些方法 3.1、遍历输出方法&#xff08;display&#xff09; 3.2、得到链表的长度&#xff08;size&#xff09; 3.3、查找关键字key是否包含在双链表中(contains) 3.…

基于I2S通讯MAX98357模块的JetsonNano声音外放

前言有很多方法可以为 Jetson 设备添加音频功能。USB 扬声器和USB 麦克风是一种简单的解决方案&#xff0c;但它们确实占用了宝贵的 USB 插槽&#xff0c;这些插槽可能更适合用于键盘、蓝牙功能、Internet Keys 和其他配件。在 Jetson 设备上&#xff0c;NVIDIA 通过 40 针 GPI…

微软发布会精华回顾:“台式电脑”抢了风头

Lightbot北京时间2016年10月26日晚10点&#xff0c;微软在纽约发布了名为 Surface Studio 的一体机、名为 Surface Dial 的配件以及外观未变的顶配版 Surface Book。同时&#xff0c;微软宣布了 Windows 10 下一个重要版本——“Creators Update”的数项新功能&#xff0c;包括…

【Linux】冯诺依曼体系结构和操作系统概念

文章目录&#x1f3aa; 冯诺依曼体系结构&#x1f680;1.体系概述&#x1f680;2.CPU和内存的数据交换&#x1f680;3.体系结构中数据的流动&#x1f3aa; 操作系统概念理解&#x1f680;1.简述&#x1f680;2.设计目的&#x1f680;3.定位&#x1f680;4.理解&#x1f680;5.管…

AOP面向切面编程思想。

目录 一、AOP工作流程 1、基本概念 2、AOP工作流程 二、AOP核心配置 1、AOP切入点表达式 2、AOP通知类型 三、AOP通知获取数据 1、获取参数 2、获取返回值 3、获取异常 四、AOP事务管理 1、Spring事务简介 2、Spring事务角色 3、事务属性 一、AOP工作流程 1、…

Linux内核启动(理论,0.11版本)分段与分页

为什么要虚拟内存 我们知道&#xff0c;在之前上微机原理时&#xff0c;我们的程序是可以直接访问内存的&#xff0c;而且访问的是直接的物理内存&#xff0c;在实模式下&#xff0c;寄存器是16位的&#xff0c;数组总线&#xff08;data bus&#xff09;是16位的&#xff0c;…

设计模式-值类型与引用类型、深拷贝与浅拷贝、原型模式详解

一. 值类型和引用类型 1. 前言 (1). 分类 值类型包括&#xff1a;布尔类型、浮点类型(float、double、decimal、byte)、字符类型(char)、整型&#xff08;int、long、short等&#xff09;、枚举(entum)、结构体(struct)。 引用类型&#xff1a;数组、字符串(string)、类、接口…

DamiCMS SQL注入分析

2023年将会持续于B站、CSDN等各大平台更新&#xff0c;可加入粉丝群与博主交流:838681355&#xff0c;为了老板大G共同努力。 一、入口文件(单入口文件模式) 看一下Index.php文件代码&#xff1a;引入了php_safe.php文件 查看一下php_safe.php防御文件&#xff1a; 对变量e…

2019_41 考研408

2019年(单链表)41.(13分)设线性表采用带头结点的单链表保存&#xff0c;链表中的结点定义如下:typedef struct node {int data;struct node* next;}NODE;请设计一个空间复杂度为O(1)且时间上尽可能高效的算法&#xff0c;重新排列L中的各结点&#xff0c;得到线性表L(q,a,,a,an…

【正则表达式】获取html代码文本内所有<script>标签内容

文章目录一. 背景二. 思路与过程1. 正则表达式中需要限定<script>开头与结尾2. 增加标签格式的限定3. 不限制<script>首尾的内部内容4. 中间的内容不能出现闭合的情况三. 结果与代码四. 正则辅助工具一. 背景 之前要对学生提交的html代码进行检查&#xff0c;在获…

牛客小白月赛66

牛客小白月赛66_ACM/NOI/CSP/CCPC/ICPC算法编程高难度练习赛_牛客竞赛OJ (nowcoder.com)冒着期末挂科的风险打了打&#xff0c;缓解了一下网瘾&#xff0c;感觉还行最近为了期末鸽了很多期的div3&#xff0c;一学期末就手痒想训&#xff0c;感觉再不打人要没了&#xff0c;结果…

linux性能优化-内存回收

linux文件页、脏页、匿名页 缓存和缓冲区&#xff0c;就属于可回收内存。它们在内存管理中&#xff0c;通常被叫做文件页&#xff08;File-backed Page&#xff09;。通过内存映射获取的文件映射页&#xff0c;也是一种常见的文件页。它也可以被释放掉&#xff0c;下次再访问的…

DOM编程-显示网页时钟

<!DOCTYPE html> <html> <head> <meta charset"utf-8"> <title>显示网页时钟</title> </head> <body bgcolor"antiquewhite"> <script type"text/javascrip…

剑指offer(中等)

目录 二维数组中的查找 重建二叉树 矩阵中的路径 剪绳子 剪绳子② 数值的整数次方 表示数值的字符串 树的子结构 栈的压入、弹出序列 从上到下打印二叉树① 从上到下打印二叉树③ 二叉搜索树的后序遍历序列 二叉树中和为某一值的路径 复杂链表的复制 二叉搜索树与…

C++复习笔记8

泛型编程&#xff1a;编写的是与类型无关的通用代码&#xff0c;是代码复用的一种手段&#xff0c;模板是泛型编程的基础。 1.函数模板&#xff1a;类型参数化&#xff0c;增加代码复用性。例如对于swap函数&#xff0c;不同类型之间进行交换都需要进行重载&#xff0c;但是函数…

K_A12_003 基于STM32等单片机采集光敏二极管模块参数 串口与OLED0.96双显示

K_A12_003 基于STM32等单片机采集光敏二极管模块参数 串口与OLED0.96双显示一、资源说明二、基本参数参数引脚说明三、驱动说明IIC地址/采集通道选择/时序对应程序:四、部分代码说明1、接线引脚定义1.1、STC89C52RC光敏二极管模块1.2、STM32F103C8T6光敏二极管模块五、基础知识…

面向 3DoF+的虚拟视图合成算法研究(陈 莹)

面向 3DoF的虚拟视图合成算法研究&#xff08;陈 莹&#xff09;论文贡献多视点联合的虚拟视图合成算法视图合成中多视点伪影消除算法面向虚拟视图合成算法的 3DoF系统基于深度的虚拟视图合成算法视点映射&#xff08;3D-Warping&#xff09;三维空间映射变换&#xff08;3D-Wa…

TYPE-C 手机/电脑同时充电直播 视频采集卡方案

Type-C音视频采集卡有什么作用&#xff1f; ​能够帮助专业用户和游戏玩家迅速搭建简单、高性价比的音视频解决方案。可将新闻联播、体育竞赛、视频教学课程、网络视频等&#xff0c;通过HDMI高清视频信号分段或整体录制在本地计算机共享使用。支持多种带HDMI接口的游戏机设备…

生物素-琥珀酰亚胺酯Biotin-NHS;CAS号:35013-72-0;可对溶液中的抗体,蛋白质和任何其他含伯胺的大分子进行简单有效的生物素标记。

结构式&#xff1a; ​ 生物素-琥珀酰亚胺酯Biotin NHS CAS号&#xff1a;35013-72-0 英文名称&#xff1a;Biotin-NHS 中文名称&#xff1a;D-生物素 N-羟基琥珀酰亚胺酯&#xff1b;生物素&#xff0d;琥珀酰亚胺酯 CAS号&#xff1a;35013-72-0 密度&#xff1a;1.50.1 …

vue项目第二天

项目中使用element-ui库中文网https://element.eleme.cn/#/zh-CN安装命令npm install element-ui安装按需加载babel插件npm install babel-plugin-component -Dnpm i //可以通过npm i 的指令让配置刷新重新配置一下项目中使用element-ui组件抽离文件中按需使用element ui &…