ROCKETMQ极简介绍,顺序,事务示例

news2024/11/25 10:38:04

整体架构

 

Name Server

管理Broker实例的注册,提供心跳检测机制

路由管理: Producer和Conumser通过NameServer可以获取整个Broker集群的路由信息

生产者 Producer

以生产者组的形式出现,一个生产者组可以同时发送多个主题的消息

Broker

存储消息、转发消息

Consumer消费者

以消费组的形式出现

同一类消费者的集合,这类Consumer消费的是同一个Topic类型的消息

消息模型

 

  • 消息写入能力的水平扩展,RocketMQ 对 Topic进行了分区,这种操作被称为队列(MessageQueue)
  • ConsumerGroup下的消费者主要有两种负载均衡模式,即广播模式,和集群模式(一般使用这个)
    • 集群模式下,同一个 ConsumerGroup 中的 Consumer 实例是负载均衡消费
    • 广播模式下,同一个 ConsumerGroup 中的每个Consumer 实例都处理全部的队列

可靠性

生产者可靠性 - 重试策略

如果同步模式发送失败,则轮转到下一个Broker进行重试,重试2次

如果异步模式发送失败,则轮转到当前Broker进行重试,重试2次

Broker 可靠性 - 刷盘与同步机制

 

刷盘机制

刷盘方式

说明

特点

同步刷盘

写PageCache,立即刷盘,刷盘完成,返回成功

数据安全,吞吐量不大

异步刷盘

写PageCache,返回成功

依靠刷盘机制刷盘

PageCache中的消息积累到一定的量

或定时触发一次写磁盘操作

吞吐量大,性能高,PageCache可能丢失

同步机制

同步机制

说明

特地

同步复制(推荐)

主从,都写入成功后,返回成功

易恢复,写入延迟大,降低系统吞吐量

异步复制

写主成功,就返回成功

数据可能丢失,写入性能高,系统吞吐量大

消息者可靠性 - 重试策略

  • 返回CONSUME_SUCCESS才算消费完成
  • 16次消费都失败,进入死信队列
  • CONSUME_LATER按不同messageDelayLevel时间进行再次消费,最长时间为2个小时

Exactly Once需要依托于本地事务表

首选选定唯一键,msgId,或者业务唯一键,例如订单Id

如果 本地事务表中,没有就插入之后执行消费。

实例- 事务消息,顺序消息,tag过滤

一般使用pull模式消费,一个应用一个topic,多个tags模式

pom

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.1.1</version>
</dependency>

配置

#nameserver地址 host:port;host:port
rocketmq.name-server = 192.168.1.8:9876
#消费者不配置
rocketmq.producer.group= wenlei-producer-group

普通消息,带tag,keys

//普通消息,带tag,keys
@Autowired
private RocketMQTemplate rocketMQTemplate;
public SendResult  commonMsg() {
    Message message = MessageBuilder.withPayload("消息体")
       .setHeader("KEYS", "我是Key").build();
  //topic:tag 标记要发送的tag
    SendResult sendResult = rocketMQTemplate
          .syncSend("wenlei-topic:tag1", message);
    log.info("sendResult:{},{},sendStatus{}",
             sendResult.getMsgId(),keys,sendResult.getSendStatus().name());
    return sendResult;
}

顺序消息

public SendResult  order() { 
    String shardingKey =  UUID.randomUUID().toString();
    Message message = MessageBuilder
      .withPayload("顺序消息体").setHeader("KEYS", shardingKey).build();
    SendResult sendResult = rocketMQTemplate
      .syncSendOrderly("wenlei-topic:tag1", message,shardingKey);
    log.info("sendResult:{},{},sendStatus{}"
             ,sendResult.getMsgId(),shardingKey
             ,sendResult.getSendStatus().name());
    return sendResult;
}

事务消息

一个rocketMQTemplate 只能有一个RocketMQLocalTransactionListener, 下面是做额外的ExtRocketMQTemplate

@ExtRocketMQTemplateConfiguration
@Component("extRocketMQTemplate")
public class ExtRocketMQTemplate extends RocketMQTemplate {
}

RocketMQLocalTransactionListener 执行本地事务,查询本地事务的状态。

@Slf4j
// 绑定extRocketMQTemplate
@RocketMQTransactionListener(
  rocketMQTemplateBeanName ="extRocketMQTemplate")
public class TransactionMsgListener 
  implements RocketMQLocalTransactionListener {
    @Override    
    public RocketMQLocalTransactionState
    executeLocalTransaction(Message msg, Object arg) {
        try {
           log.info("本地的业务工作");
            return RocketMQLocalTransactionState.COMMIT;
        }catch (Exception e){
            e.printStackTrace();
            return RocketMQLocalTransactionState.UNKNOWN;
        }
    }

  @Override    
  public RocketMQLocalTransactionState 
    checkLocalTransaction(Message msg) {
        log.info("本地的业务工作的状态");
        if(成功状态){
            return RocketMQLocalTransactionState.COMMIT;
        }else if(失败状态){
            return RocketMQLocalTransactionState.ROLLBACK;
        }
        return RocketMQLocalTransactionState.UNKNOWN;
    }
}
// 发送事务消息
public TransactionSendResult  tranction() {
    String transactionId = UUID.randomUUID().toString();
    TransactionSendResult result = this.extRocketMQTemplate
      .sendMessageInTransaction("wenlei-topic:tag2",
            MessageBuilder.withPayload(param)
            .setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
            .build(), param);
    return result;
}

按tag消费

@Component
@RocketMQMessageListener(
        // topic:消息的发送者使用同一个topic      
          topic = "wenlei-topic",
        //group:在RocketMQ中消费者和发送者组没有关系        
         consumerGroup = "test-group",
        //tag:设置为 * 时,表示全部。       
         selectorExpression = "tag1 || tag2 || tag3",
        //消费模式:默认 CLUSTERING ( CLUSTERING:负载均衡 )
        //( BROADCASTING:广播机制 ) 一般不用     
          messageModel = MessageModel.CLUSTERING  )
@Slf4j
public class MyConsumer implements RocketMQListener<MessageExt> {
    @Override    public void onMessage(MessageExt message) {
        log.info("consumer:{},tag:{},keys:{}",
        new String(message.getBody(), Charset.forName("utf8")),
        message.getTags(),message.getKeys());
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/648121.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

计算机组成原理 之 第五章 中央处理器

1. CPU的功能和基本结构 &#xff08;1-1&#xff09;运算器的基本结构 a. 寄存器与ALU通讯方式一&#xff1a;专用数据通路方式 是专用数据通路方式&#xff08;并行传递&#xff09; 多路选择器&#xff08;MUX&#xff09;或三态门 b. 寄存器与ALU通讯方式二&#xff1a;CP…

chatgpt赋能python:Python中添加SEO元素的实践

Python中添加SEO元素的实践 在SEO&#xff08;Search Engine Optimization&#xff09;优化中&#xff0c;添加正确的SEO元素对网站的排名和可见性至关重要。Python作为一种广泛应用于网络开发的编程语言&#xff0c;提供了许多用于添加和管理SEO元素的工具和技术。在这篇文章…

chatgpt赋能python:Python怎么求完数

Python怎么求完数 什么是完数&#xff1f; 在数学中&#xff0c;完数指一个正整数&#xff0c;它的所有因子&#xff08;除了本身以外&#xff09;之和恰好等于该数本身。例如&#xff0c;6是一个完数&#xff0c;因为6的因子为1、2和3&#xff0c;而1 2 3 6。 Python如何…

springboot+vue在线课程大纲知识点管理系统

对于之前在线课程管理系统的管理&#xff0c;大部分都是使用传统的人工方式去管理&#xff0c;这样导致了管理效率低下、出错频率高。而且&#xff0c;时间一长的话&#xff0c;积累下来的数据信息不容易保存&#xff0c;对于查询、更新还有维护会带来不少问题。对于数据交接也…

每天一道算法题第3天--排序子序列

排序子序列 1.题目2.题目解析3.代码 1.题目 链接: 排序子序列 2.题目解析 【题目解析】&#xff1a; 本题要求解的是排序子序列&#xff0c;排序子序列为非递增或者非递减&#xff0c;很多同学在这个非递增、非递减问题上很纠 结&#xff0c;注意&#xff1a;非递减就是a[i…

facebook文本生成音乐项目-audiocraft 安装教程

文章目录 所需环境安装ffmpeg克隆项目仓库安装相关依赖库运行项目模型下载自动下载模型失败MusicGen 模型下载地址 所需环境 ffmpegpython>3.9gitcuda118&#xff08;torch>2.0&#xff09; 安装ffmpeg 下载地址 下载后解压&#xff0c;然后将解压后的目录配置到系统…

chatgpt赋能python:Python怎么求最大值

Python怎么求最大值 如果您正在寻找一种简便快捷的方法来从一组数字中找到最大值&#xff0c;那么Python就是您的选择。作为一种易于学习和使用的编程语言&#xff0c;Python在数据处理和分析方面越来越受欢迎。本文将介绍如何使用Python来找出一组数字中的最大值&#xff0c;…

MySQL数据库基本命令操作

MySQL数据库基本命令操作 一、MySQL基本命令操作指令二、查看数据库结构1&#xff0e;查看当前服务器中的数据库2&#xff0e;查看数据库中包含的表3&#xff0e;查看表的结构&#xff08;字段&#xff09; 三、SQL语句1、SQL语言分类 四、SQL语句操作1、创建及删除数据库和表2…

Nucleo-F411RE (STM32F411)LL库体验 2 -sysclk的配置

Nucleo-F411RE &#xff08;STM32F411&#xff09;LL库体验 2 -sysclk的配置 1、Nucleo-F411RE 时钟源 Nucleo-F411RE开发版只有一个8M的晶振&#xff0c;看起来像是给st-link提供时钟的&#xff0c;所以猜测F411RE时钟的来源应该来自st-link&#xff08;stm32F103c8t6&#…

数据库是什么?为什么要使用它?

作者&#xff1a;Insist-- 个人主页&#xff1a;insist--个人主页 作者会持续更新网络知识和python基础知识&#xff0c;期待你的关注 目录 一、数据库是什么&#xff1f; 二、为什么要⽤数据库 1、方便用户 2、安全的保存数据 3、利用数据库分析 三、数据库的分类 1、关…

chatgpt赋能python:Python添加包的方法

Python添加包的方法 Python是一种高级编程语言&#xff0c;拥有着强大的库和模块。在开发过程中&#xff0c;很大一部分时间会用于查找、安装和更新各种包或依赖库。本文将介绍Python添加包的方法&#xff0c;以帮助开发者更快地找到并安装所需的包及依赖库。 什么是包 在Py…

Java 中的重载(overload)和重写(override)

​ 重载和重写都是面向对象编程中的概念&#xff0c;但我们或许还听说过一种叫做覆写&#xff08;overwrite&#xff09;的概念。C 是拥有这个概念的&#xff0c;Java 只有 overload 和 override&#xff0c;Python 只有隐式的 overload 和 override&#xff0c;没有 o…

海思平台上USB WIFI的移植与局域网无线调试和视频流预览

目录 1.海思平台上USB WIFI移植概述 1.1、移植WIFI背景 1.2、移植的起点 1.3、实验案例 2.AP模式USB WIFI驱动移植 2.1、源码 2.2、修改移植 3.AP模式USB WIFI传输视频实战 3.1、部署USB WIFI驱动使之工作为AP 3.2、准备测试用例 3.3、测试实验 4.USB WIFI做sta模式…

python函数详解(超详细)

❄️作者介绍&#xff1a;奇妙的大歪❄️ &#x1f380;个人名言&#xff1a;但行前路&#xff0c;不负韶华&#xff01;&#x1f380; &#x1f43d;个人简介&#xff1a;云计算网络运维专业人员&#x1f43d; 前言 首先零基础是能学python的&#xff0c;很多编程大神入门之前…

Qt中的坐标体系和内存回收

目录 坐标体系 窗口的坐标原点 窗口的相对坐标 示例 内存回收 1. 自动垃圾回收机制 2. 对象树机制 示例 坐标体系 窗口的坐标原点 在Qt中&#xff0c;坐标系统一般是以窗口左上角为原点&#xff0c;向右为正方向X轴&#xff0c;向下为正方向Y轴。 窗口的相对坐标 在一个…

node.js+vue企业人事管理系统q731f

中小企业人事管理系统的主要开发目标如下&#xff1a; &#xff08;1&#xff09;实现管理系统信息关系的系统化、规范化和自动化&#xff1b; &#xff08;2&#xff09;减少维护人员的工作量以及实现员工对信息的控制和管理。 &#xff08;3&#xff09;方便查询信息及管理信…

chatgpt赋能python:Python编程中的警告是什么?

Python编程中的警告是什么&#xff1f; 在Python编程中&#xff0c;警告是指在程序运行时出现的一些提示信息。这些警告通常不会导致程序崩溃&#xff0c;但却表明程序中存在某些问题。例如&#xff0c;警告可以是一个不推荐使用的语法或者可能导致性能退化的某个编码模式。通…

发布关于Strve.js的动态,被Vite团队核心成员点赞的那些事!

近日&#xff0c;在浏览之前发的推特文章时&#xff0c;发现了曾经被Vite团队核心成员 Matias Capeletto 点赞的一篇动态&#xff0c;心里格外的高兴。 被大佬认可&#xff0c;也是值得开心的一件事情。毕竟独立开发一个前端框架&#xff0c;其中的艰辛也只有像参与过类似设计框…

IAB讨论互联网碎片化:互操作性是互联网的关键属性

日前&#xff0c;一场针对互联网碎片化的讨论由互联网体系结构委员会&#xff08;IAB&#xff09;发起&#xff0c;主题涉及互联网碎片化的定义、概念、治理方式与未来发展。互联网治理论坛&#xff08;IGF&#xff09;咨询顾问维姆德格泽尔与大西洋理事会成员康斯坦丁诺斯科迈…

我的个人网站 —— 直接使用GPT4

前期回顾 打造极简风格动效 —— 5 分钟轻松实现惊艳、震撼人心的视觉效果_彩色之外的博客-CSDN博客css Loading 实战教学https://blog.csdn.net/m0_57904695/article/details/131156011?spm1001.2014.3001.5501 目录 ✈ 线上预览&#xff1a; ✅ G4 WEB 效果图例 &…