RocketMq顺序消息

news2025/1/12 3:58:44

RocketMq顺序消息

  • 1.RocketMq 架构图
  • 2.RocketMq顺序消息
    • 2.1部分消息有序
      • 2.1.1 生产者构建
      • 2.1.2 生产者保证有序
      • 2.1.3 消费者保证有序性
  • 3.使用rocketmq-spring-boot-starter发送消息如何指定tag与key?
  • 问题
    • 1.MessagingException: sendDefaultImpl call timeout

1.RocketMq 架构图

在这里插入图片描述

2.RocketMq顺序消息

顺序消息分为全局有序消息和部分有序消息,全局有序消息是指一个topic下所有的消息都是有序的,而部分有序消息是指同一类型的消息有序,举个例子,如订单创建、订单支付、订单完成。同一个订单的这三种消息保证有序就可以了,订单之间可以不用有序,这就是部分有序。

2.1部分消息有序

2.1.1 生产者构建

1.依赖

       <!-- 实现对 RocketMQ 的自动化配置 -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.1.1</version>
        </dependency>

2.配置

rocketmq:
  name-server: 10.10.10.130:9876
  producer:
    group: my-producer-group
    send-message-timeout: 15000

3.代码

    @Autowired
	private RocketMQTemplate rocketMQTemplate;

    rocketMQTemplate.syncSendOrderly(RealtimeSubscribe.SCADA_QUEUE_KEY,gn + "," + av,item);

2.1.2 生产者保证有序

要保证生产者发送消息的有序性,必须满足以下两个条件:

1、消息的发送必须是同步发送
如果是异步的,由于网络抖动等原因,有可能导致消息到达broker的顺序与发送不一致。使用spring RocketMQTemplate 类,可以使用sync开头的方法,表示同步发送。如下:

rocketMQTemplate.syncSendOrderly(RealtimeSubscribe.SCADA_QUEUE_KEY,gn + "," + av,item);

2、要保证顺序的消息必须发送到同一个queue里

在RocketMq的实现里,为了实现高并发,一个topic是可以设置为多个队列的,这多个队列有可能是在一个broker里,也有可能是在多个broker里。如果要保证顺序的消息被分别发到不同的队列,那有可能会被不同机器的不同线程同时消费,这就达不到顺序的要求。如何做到向同一个队列发送呢?队列选择它有一个hashkey的设计,只要保证需要顺序性的消息的hashkey一致,那么这些消息就会向同一个队列发送。如下代码:
第1步 传入hashKey

   public SendResult syncSendOrderly(String destination, Object payload, String hashKey) {
        return syncSendOrderly(destination, payload, hashKey, producer.getSendMsgTimeout());
    }

第2步,根据hashKey 选择队列

public class SelectMessageQueueByHash implements MessageQueueSelector {
    public SelectMessageQueueByHash() {
    }

    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        int value = arg.hashCode();
        if (value < 0) {
            value = Math.abs(value);
        }

        value %= mqs.size();
        return (MessageQueue)mqs.get(value);
    }
}

在5月11号的时候修改发送代码如下,传入相同的hashKey之后,所有的消息便往同一个队列发送了。

rocketMQTemplate.syncSendOrderly(RealtimeSubscribe.SCADA_QUEUE_KEY,gn + "," + av,item);

在这里插入图片描述

实际上,采用队列选择器的方法不能保证消息的严格顺序,我们的目的是将消息发送到同一个队列中,如果某个broker挂了,那么队列就会减少一部分,如果采用取余的方式投递,将可能导致同一个业务中的不同消息被发送到不同的队列中,导致同一个业务的不同消息被存入不同的队列中,短暂的造成部分消息无序。同样的,如果增加了服务器,那么也会造成短暂的造成部分消息无序。

2.1.3 消费者保证有序性

3.使用rocketmq-spring-boot-starter发送消息如何指定tag与key?

使用rocketmq-spring-boot-starter发送消息如何指定tag与key

问题

1.MessagingException: sendDefaultImpl call timeout

org.springframework.messaging.MessagingException: sendDefaultImpl call timeout; nested exception is org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: sendDefaultImpl call timeout
	at org.apache.rocketmq.spring.core.RocketMQTemplate.syncSend(RocketMQTemplate.java:551)
	at org.apache.rocketmq.spring.core.RocketMQTemplate.syncSend(RocketMQTemplate.java:472)
	at org.apache.rocketmq.spring.core.RocketMQTemplate.syncSend(RocketMQTemplate.java:460)
	at org.apache.rocketmq.spring.core.RocketMQTemplate.doSend(RocketMQTemplate.java:867)
	at org.apache.rocketmq.spring.core.RocketMQTemplate.doSend(RocketMQTemplate.java:55)
	at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
	at org.springframework.messaging.core.AbstractMessageSendingTemplate.convertAndSend(AbstractMessageSendingTemplate.java:151)
	at org.springframework.messaging.core.AbstractMessageSendingTemplate.convertAndSend(AbstractMessageSendingTemplate.java:129)
	at org.springframework.messaging.core.AbstractMessageSendingTemplate.convertAndSend(AbstractMessageSendingTemplate.java:122)
	at com.sx.mapi.subscribe.RealtimeSubscribe.onResponse(RealtimeSubscribe.java:100)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.springframework.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:282)
	at org.springframework.cloud.context.scope.GenericScope$LockedScopedProxyFactoryBean.invoke(GenericScope.java:499)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
	at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749)
	at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:691)
	at com.sx.mapi.subscribe.RealtimeSubscribe$$EnhancerBySpringCGLIB$$1.onResponse(<generated>)
	at com.magus.jdbc.net.OPSubscribe.onEvent(OPSubscribe.java:291)
Caused by: org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: sendDefaultImpl call timeout
	at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:667)
	at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1343)
	at org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:344)
	at org.apache.rocketmq.spring.core.RocketMQTemplate.syncSend(RocketMQTemplate.java:543)
	... 20 more

解决方法:加上如下配置

rocketmq:
  producer:
    send-message-timeout: 15000

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1294910.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

高精度加法,减法,乘法,除法(下)(C语言)

前言 上一篇博客我们分享了高精度加法&#xff0c;减法,这一期我将为大家讲解高精度乘法和高精度除法。那让我们开始吧&#xff01; 对加法和减法感兴趣的话就点我 文章目录 1&#xff0c;乘法2&#xff0c;除法3&#xff0c;尾声 1&#xff0c;乘法 让我们想想我们平时做数学…

STC15F100E单片机模拟串口

文章目录 一、芯片简介二、开发环境三、软件模拟串口参考 一、芯片简介 STC15F100系列单片机是宏晶科技生产的单时钟/机器周期(1T)的单片机&#xff0c;新一代8051单片机&#xff0c;指令代码完全兼容传统8051&#xff0c;但是速度快6-12倍。 内部集成R/C时钟&#xff0c;5MHz…

52 代码审计-PHP项目类RCE及文件包含下载删除

目录 漏洞关键字:演示案例:xhcms-无框架-文件包含跨站-搜索或应用-includeearmusic-无框架-文件下载-搜索或应用功能-down等zzzcms-无框架-文件删除RCE-搜索或应用-unlink、eval 漏洞关键字: SQL注入&#xff1a; select insert update mysql_query mysql等 文件上传&#xff…

守护安全,六氟化硫气体泄漏报警装置校准服务

在电力工业中&#xff0c;六氟化硫&#xff08;SF6&#xff09;气体是一种重要的介质&#xff0c;它用作封闭式中、高压开关的灭弧和绝缘气体。六氟化硫气体的卓越性能实现了装置经济化、低维护化的操作。与普通装置相比&#xff0c;可以节省最多90&#xff05;的空间。 六氟化…

Themis: Fast, Strong Order-Fairness in Byzantine Consensus

目录 笔记后续的研究方向摘要引言秩序井然 Themis: Fast, Strong Order-Fairness in Byzantine Consensus CCS 2023 笔记 后续的研究方向 摘要 我们介绍了Themis&#xff0c;这是一种将交易的公平排序引入&#xff08;许可的&#xff09;拜占庭共识协议的方案&#xff0c;最…

ubuntu系统下搭建本地物联网mqtt服务器的步骤

那么假如我们需要做一些终端设备&#xff0c;例如温湿度传感器、光照等物联网采集设备要接入呢&#xff1f;怎么样才能将数据报送到服务器呢&#xff1f; 以下内容基于我们ubuntu系统下的emqx成功启动的基础上。我们可以用浏览器键入控制板的地址&#xff0c;如果启动成功&…

python pyaudio显示音频波形图

python pyaudio显示音频波形图 代码如下&#xff1a; import numpy as np import matplotlib.pylab as plb import wave# 读取 wav wf wave.open("./output.wav", "rb")# 获取音频相关参数&#xff1a;声道数、量化位数、采样频率、采样帧数 nchannels,…

案例二:SQL Server数据库的备份和还原

1、备份类型。 在 SQL Server 中提供了三种常用的备份类型&#xff0c;分别是完整备份&#xff0e;差异备份和事务日志备份。 完整备份&#xff1a; 完整备份包括对整个数据库、部分事务日志、数据库结构和文件结构的备份。完整备份代表的是备份完成时刻的数据库。 完整备份是…

「Verilog学习笔记」格雷码计数器

专栏前言 本专栏的内容主要是记录本人学习Verilog过程中的一些知识点&#xff0c;刷题网站用的是牛客网 timescale 1ns/1nsmodule gray_counter(input clk,input rst_n,output reg [3:0] gray_out );reg [3:0] binary_cnt ; reg flag ; always (posedge clk or negedge r…

【高效开发工具系列】Mac删除启动台图标

&#x1f49d;&#x1f49d;&#x1f49d;欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:kwan 的首页,持续学…

IP地址如何用于流量管理?

随着互联网的普及和网络流量的不断增加&#xff0c;流量管理成为了网络运营中至关重要的一环。而IP地址作为互联网中的重要标识符&#xff0c;也可以被广泛应用于流量管理中。 IP地址是互联网协议&#xff08;IP&#xff09;中用于标识和定位网络设备的32位二进制地址。通过IP地…

Redis Reactor事件驱动模型源码

前置学习&#xff1a;Redis server启动源码-CSDN博客 Redis采用单线程Reactor模型 三个关键角色&#xff0c;即 reactor、acceptor、handler 三类处理事件&#xff0c;即连接事件、写事件、读事件。 建立连接&#xff08;Acceptor&#xff09;、监听accept、read、write事件…

使用高防IP防护有哪些优势

高防IP是针对互联网服务器在遭受大流量的DDoS攻击后导致服务不可用的情况下&#xff0c;推出的付费增值服务&#xff0c;用户可以通过配置高防IP&#xff0c;将攻击流量引流到高防IP&#xff0c;确保源站的稳定可靠。高防IP相当于搭建完转发的服务器。 高防IP有两种接入方式&a…

《Easy3d+Qt+VTK》学习

《Easy3dQtVTK》学习-1、编译与配置 一、编译二、配置注 一、编译 1、 资源下载&#xff1a;easy3d giuhub 2、解压缩 3、用qt打开CMakeLists.txt即可 4、点击项目&#xff0c;选择debug或者release&#xff0c;图中3处可自行选择&#xff0c;因为我的qt版本是6&#xff0c…

unity 2d 入门 飞翔小鸟 小鸟跳跃 碰撞停止挥动翅膀动画(十)

1、切换到动画器 点击make transition和exit关联起来 2、设置参数 勾选掉Has Exit Time 3、脚本给动画器传参 using System.Collections; using System.Collections.Generic; using UnityEngine;public class Fly : MonoBehaviour {//获取小鸟&#xff08;刚体&#xff09;p…

云原生系列1

1、虚拟机集群环境准备 VirtualBox类似vmware的虚拟化软件&#xff0c;去官网https://www.virtualbox.org/下载最新版本免费的&#xff0c;VirtualBox中鼠标右ctrl加home跳出鼠标到wins中。 VirtualBox安装步骤 https://blog.csdn.net/rfc2544/article/details/131338906 cent…

【广州华锐互动】VR煤矿生产事故体验系统为矿工提供一个身临其境的安全实训环境

随着科技的不断发展&#xff0c;虚拟现实&#xff08;VR&#xff09;技术已经逐渐走进我们的生活&#xff0c;为我们带来了前所未有的沉浸式体验。在许多领域&#xff0c;如教育、医疗、娱乐等&#xff0c;VR技术都发挥着重要作用。然而&#xff0c;当这项技术被用于模拟煤矿坍…

angular状态管理方案(ngrx)

完全基于redux的ngrx方案&#xff0c;我们看看在angular中如何实现。通过一个简单的计数器例子梳理下整个流程 一 安装 &#xff1a;npm i ngrx/store 这里特别要注意一点&#xff1a;安装 ngrx/store的时候会出现和angular版本不一致的问题 所以检查一下angular/core的版本…

东北地理所最新Nature通讯文章

作为城市的重要组织部分&#xff0c;城市湿地在水源供给、增湿降温、雨洪调蓄等多个方面发挥着极其重要的作用&#xff0c;2024年国际湿地日主题定为“湿地与人类福祉”。在此背景下&#xff0c;中国科学院东北地理与农业生态研究所毛德华研究员等在12月5日出版的Nature发表题为…

CentOS系统装机流程

目录 1、进入装机页面 2、配置分区 3、设置语言 4、软件安装&#xff08;我这里选的是最小化安装&#xff0c;一般情况下应该选Server&#xff09; 5、时区配置 ​编辑 6、Root登录密码 7、开始装机&#xff0c;重启后装机完成 1、进入装机页面 2、配置分区 3、设置语言…