SpringBoot整合【RocketMQ】

news2025/1/20 2:49:01

目录

1.POM文件添加依赖及yml配置

2.RocketmqUtil

3.生产者(异步发送示例)

4.消费者

5.测试


1.POM文件添加依赖及yml配置

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.2.2</version>
        </dependency>
rocketmq:
  name-server: 127.0.0.1:9876
  producer:
    group: My_Group
    send-message-timeout: 3000
    retry-times-when-send-failed: 3
    retry-times-when-send-async-failed: 3

2.RocketmqUtil

package com.kaying.marketing.platform.common.util.rocketMq;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

/**
 * @Description: RocketMQ消息的生产者
 * @Author: hwk
 */

@Component
@Slf4j
public class RocketMqUtil {
    @Autowired
    private RocketMQTemplate rocketMqTemplate;

    public void sendMsg(String topic,String data) {
        rocketMqTemplate.convertAndSend(topic,data);
        log.info("【RocketMQ】发送同步消息:{}", data);
    }

    public void asyncSend(String topic, String tag, String data,Integer messageDelayLevel) {
        rocketMqTemplate.asyncSend(topic + ":" + tag, MessageBuilder.withPayload(data).build(), new SendCallback() {
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        // 消息发送成功
                        log.error("消息发送成功"+sendResult);
                    }

                    @Override
                    public void onException(Throwable throwable) {
                        // 消息发送异常
                        log.error("异步发送消息异常。topic:" + topic + ";tag:" + tag + ";mqMsg" + data, throwable);
                    }
                },
                3000L,
                // messageDelayLevel = 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 1d
                // messageDelayLevel = 1  2  3   4   5  6  7  8  9  10 11 12 13 14  15  16  17 18 19
                messageDelayLevel);
    }

    /**
     * 发送同步消息:消息响应后发送下一条消息
     *
     * @param topic 消息主题
     * @param tag   消息tag
     * @param key   业务号
     * @param data  消息内容
     */
    public void sendSyncMsg(String topic, String tag, String key, String data) {
        //消息
        Message message = MessageBuilder.withPayload(data).setHeader(RocketMQHeaders.KEYS, key).build();
        //主题
        String destination = topic + ":" + tag;
        SendResult sendResult = rocketMqTemplate.syncSend(destination, message);

        log.info("【RocketMQ】发送同步消息:{}", sendResult);
    }

    /**
     * 发送异步消息:异步回调通知消息发送的状况
     *
     * @param topic 消息主题
     * @param tag   消息tag
     * @param key   业务号
     * @param data  消息内容
     */
    public void sendAsyncMsg(String topic, String tag, String key, String data) {
        //消息
        Message message = MessageBuilder.withPayload(data).setHeader(RocketMQHeaders.KEYS, key).build();
        //主题
        String destination = topic + ":" + tag;
        rocketMqTemplate.asyncSend(destination, message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("【RocketMQ】发送异步消息:{}", sendResult);
            }

            @Override
            public void onException(Throwable e) {
                log.info("【RocketMQ】发送异步消息异常:{}", e.getMessage());
            }
        });
    }


    /**
     * 发送单向消息:消息发送后无响应,可靠性差,效率高
     *
     * @param topic 消息主题
     * @param tag   消息tag
     * @param key   业务号
     * @param data  消息内容
     */
    public void sendOneWayMsg(String topic, String tag, String key, String data) {
        //消息
        Message message = MessageBuilder.withPayload(data).setHeader(RocketMQHeaders.KEYS, key).build();
        //主题
        String destination = topic + ":" + tag;
        rocketMqTemplate.sendOneWay(destination, message);
    }


    /**
     * 同步延迟消息
     *
     * @param topic      主题
     * @param tag        标签
     * @param key        业务号
     * @param data       消息体
     * @param timeout    发送消息的过期时间
     * @param delayLevel 延迟等级-----固定等级:1到18分别对应1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
     *
     */
    public void sendSyncDelayMsg(String topic, String tag, String key, String data, long timeout, int delayLevel) {
        // messageDelayLevel = 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 1d
        // messageDelayLevel = 1  2  3   4   5  6  7  8  9  10 11 12 13 14  15  16  17 18 19
        //消息
        Message message = MessageBuilder.withPayload(data).setHeader(RocketMQHeaders.KEYS, key).build();
        //主题
        String destination = topic + ":" + tag;
        SendResult sendResult = rocketMqTemplate.syncSend(destination, message, timeout, delayLevel);
        log.info("【RocketMQ】发送同步延迟消息:{}", sendResult);
    }


    /**
     * 异步延迟消息
     *
     * @param topic      主题
     * @param tag        标签
     * @param key        业务号
     * @param data       消息体
     * @param timeout    发送消息的过期时间
     * @param delayLevel 延迟等级-----固定等级:1到18分别对应1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
     */
    public void sendAsyncDelayMsg(String topic, String tag, String key, String data, long timeout, int delayLevel) {
        //消息
        Message message = MessageBuilder.withPayload(data).setHeader(RocketMQHeaders.KEYS, key).build();
        //主题
        String destination = topic + ":" + tag;
        rocketMqTemplate.asyncSend(destination, message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("【RocketMQ】发送异步延迟消息:{}", sendResult);
            }

            @Override
            public void onException(Throwable e) {
                log.info("【RocketMQ】发送异步延迟消息异常:{}", e.getMessage());
            }
        }, timeout, delayLevel);
    }


    /**
     * 同步顺序消息
     *
     * @param topic 主题
     * @param tag   标签
     * @param key   业务号
     * @param data  消息体
     */
    public void sendSyncOrderlyMsg(String topic, String tag, String key, String data) {
        //消息
        Message message = MessageBuilder.withPayload(data).setHeader(RocketMQHeaders.KEYS, key).build();
        //主题
        String destination = topic + ":" + tag;
        SendResult sendResult = rocketMqTemplate.syncSendOrderly(destination, message, key);
        log.info("【RocketMQ】发送同步顺序消息:{}", sendResult);
    }


    /**
     * 异步顺序消息
     *
     * @param topic 主题
     * @param tag   标签
     * @param key   业务号
     * @param data  消息体
     */
    public void sendAsyncOrderlyMsg(String topic, String tag, String key, String data) {
        //消息
        Message message = MessageBuilder.withPayload(data).setHeader(RocketMQHeaders.KEYS, key).build();
        //主题
        String destination = topic + ":" + tag;
        rocketMqTemplate.asyncSendOrderly(destination, message, key, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("【RocketMQ】发送异步顺序消息:{}", sendResult);
            }
            @Override
            public void onException(Throwable e) {
                log.info("【RocketMQ】发送异步顺序消息异常:{}", e.getMessage());
            }
        });
    }
}

3.生产者(异步发送示例)

//异步发送消息代码示例
rocketMqUtil.sendAsyncMsg(RocketConstant.TEST_TOPIC1, RocketConstant.TEST_TAG1, UUID.randomUUID().toString(), "测试消息一");

4.消费者

简单的负载均衡消费的示例(指定topic和tag,相同的组即为负载均衡消费)

也可以指定不同的topic和不同的tag进行消息区分

注意线上和本地连接同一个MQ也会导致负载均衡,导致线上消息丢失

    @RocketMQMessageListener(consumerGroup = "1",
            topic = RocketConstant.TEST_TOPIC1,
            selectorExpression = RocketConstant.TEST_TAG1)
    @Service
    public class RocketConsumerTag1 implements RocketMQListener<String> {
        @Override
        public void onMessage(String message) {
            String orderNo = message;
            log.info("tag1,接收:{}", orderNo);

        }
    }

    @RocketMQMessageListener(consumerGroup ="1",
            topic = RocketConstant.TEST_TOPIC1,
            selectorExpression = RocketConstant.TEST_TAG1)
    @Service
    public class RocketConsumerTag2 implements RocketMQListener<String> {
        @Override
        public void onMessage(String message) {
            String orderNo = message;
            log.info("tag2,接收:{}", orderNo);

        }
    }

5.测试

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1496103.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

html--3D爱心

文章目录 代码效果 代码 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>爱心</title><style type"text/css">*{margin: 0px;border: 0px;}body{overflow: hidden;background-…

简单整理vue-router,路由知识

1.项目中引入 1.1 安装注册 1.2 封装抽离 在main.js中 书写,会造成单个js文件过于臃肿的情况,需要将路由配置部分抽离出来,在src下新建router文件夹,新建index.js文件 import Vue from vue import VueRouter from vue-router import HomeView from ../views/HomeView.vue im…

Wireshark——获取指定协议的数据包

1、问题 使用Wireshark捕获了大量的数据包&#xff0c;但是只想要指定协议的数据包。 2、方法 例如&#xff0c;只想要Modbus/TCP协议的数据包。 在应用显示过滤器中输入协议的名称&#xff08;小写&#xff09;&#xff0c;回车。 选择文件&#xff0c;导出特定分组。 将所…

Matlab 机器人工具箱 RobotArm类

文章目录 1 RobotArm1.1 方法1.2 注意2 RobotArm.RobotArm3 RobotArm.cmove4 其他官网:Robotics Toolbox - Peter Corke 1 RobotArm 串联机械臂类 1.1 方法 方法描述plot显示机器人的图形表示teach驱动物理和图形机器人mirror使用机器人作为从机来驱动图形</

影响哈默纳科Harmonic减速机使用寿命的5大因素

哈默纳科HarmonicDrive减速机以其轻量、小型、传动效率高、减速范围广、精度高等特点&#xff0c;被广泛应用于各种传动系统中。然而&#xff0c;尽管哈默纳科Harmonic减速机具有诸多优势&#xff0c;但其使用寿命仍可能受到多种因素的影响。 首先&#xff0c;环境因素对哈默纳…

【ESP32 IDF快速入门】点亮第一个LED灯与流水灯

文章目录 前言一、有哪些工作模式&#xff1f;1.1 GPIO的详细介绍1.2 GPIO的内部框图输入模式输出部分 二、GPIO操作函数2.1 GPIO 汇总2.2 GPIO操作函数gpio_config配置引脚reset 引脚函数设置引脚电平选中对应引脚设置引脚的方向 2.3 点亮第一个灯 三、流水灯总结 前言 ESP32…

基于深度学习的苹果叶片病害检测系统(含UI界面、yolov8、Python代码、数据集)

项目介绍 项目中所用到的算法模型和数据集等信息如下&#xff1a; 算法模型&#xff1a;     yolov8 yolov8主要包含以下几种创新&#xff1a;         1. 可以任意更换主干结构&#xff0c;支持几百种网络主干。 数据集&#xff1a;     网上下载的数据集&#x…

UE4 Niagara 关卡1.4官方案例解析

sprites can face the camera&#xff0c;or they can face any arbitrary vector&#xff0c;in this case the vector between the center of the system and the particle itself&#xff08;粒子可以面对摄影机&#xff0c;也可以面对任意向量&#xff0c;在这个实例中的向…

为国产信创服务器提供LDAP统一身份认证方案

金融信创作为 8 大行业信创之首&#xff0c;早已成为其他行业信创建设的参考。金融行业有着极为复杂的业务场景&#xff0c;对系统有着极高的稳定可靠需求&#xff0c;因此&#xff0c;在寻找微软 AD 国产化替代方案时&#xff0c;常会涉及到更深层次的场景。例如&#xff0c;最…

免费体验重保利器!AI加持智胜攻防,企业安全巡查活动等你加入

两会时刻&#xff0c;重保启动 今年&#xff0c;亚信安全护航重保 又有新“利器”加持 新增AI智能降噪算法的 “外部攻击面管理”服务 升级加入攻防“编制” 国内TOP级攻防专家团队&#xff0c;亚信安全北极狐高级攻防实验室赋能支撑&#xff0c;正式推出“攻防利器系列行动…

【b站咸虾米】1 Vue介绍 2021最新Vue从基础到实例高级_vue2_vuecli脚手架博客案例

课程地址&#xff1a;【2021最新Vue从基础到实例高级_vue2_vuecli脚手架博客案例】 https://www.bilibili.com/video/BV1pz4y1S7bC/?share_sourcecopy_web&vd_sourceb1cb921b73fe3808550eaf2224d1c155 感觉尚硅谷的Vue看完忘得差不多了&#xff0c;且之前学过咸虾米的unia…

Python与FPGA——sobel边缘检测

文章目录 前言一、sobel边缘检测二、Python sobel边缘检测三、FPGA sobel边缘检测总结 前言 边缘存在于目标、背景区域之间&#xff0c;它是图像分割所依赖的较重要的依据&#xff0c;也是图像匹配的重要特征。边缘检测在图像处理和计算机视觉中&#xff0c;尤其在图像的特征提…

Day37 socket、TCP、UDP

socket类型 流式套接字(SOCK_STREAM) TCP 提供了一个面向连接、可靠的数据传输服务&#xff0c;数据无差错、无重复的发送且按发送顺序接收。内设置流量控制&#xff0c;避免数据流淹没慢的接收方。数据被看作是字节流&#xff0c;无长度限制。 数据报套接字(SOCK_DGRAM) UD…

InnoDB存储引擎对MVCC的实现

MVCC MVCC的目的 在搞清楚MVCC之前,我们要搞懂一个问题,MVCC到底解决的是什么问题? 我用一句话概括,那就是为了解决读-写可以一起的问题! 在我们的印象里,InnoDB可以读读并发,不能读写并发,或者写写并发 这是很正常的想法,因为如果读写并发的化,会有并发问题 而对于写写…

设计模式:什么是设计模式?①

一、什么是设计模式&#xff1f; 1. 是一类程序设计思想 2. 是在大量实践过程中摸索总结出的标准经验提炼 3. 具有多样性和丰富性&#xff0c;不同情况应用的思想不同 二、设计模式的好处 1. 代码生产力和效率的提升 2. 让代码表现更为规整&#xff0c;简洁。阅读维护管理的成本…

InfluxDB SHOW SERIES语句按照什么顺序返回?

本作品采用知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议进行许可。 本作品 (李兆龙 博文, 由 李兆龙 创作)&#xff0c;由 李兆龙 确认&#xff0c;转载请注明版权。 文章目录 引言样例SHOW SERIES比较原理结论结束语 引言 influxdb的计算引擎为了做到自底而上的…

曲线曲面 - 连续性, 坐标变换矩阵

连续性 有两种&#xff1a;参数连续性&#xff08;Parametric Continuity&#xff09;、几何连续性&#xff08;Geometric Continuity&#xff09;参数连续性&#xff1a; 零阶参数连续性&#xff0c;记为&#xff0c;指相邻两段曲线在结合点处具有相同的坐标 一阶参数连续性&…

前缀和+哈希表:联手合击Leetcode 560.和为k的子数组

题目 给你一个整数数组 nums 和一个整数 k &#xff0c;请你统计并返回 该数组中和为 k 的子数组的个数 。 子数组是数组中元素的连续非空序列。 示例 1&#xff1a; 输入&#xff1a;nums [1,1,1], k 2 输出&#xff1a;2示例 2&#xff1a; 输入&#xff1a;nums [1,2…

GPT-4技术解析:与Claude3、Gemini、Sora的技术差异与优势对比

【最新增加Claude3、Gemini、Sora、GPTs讲解及AI领域中的集中大模型的最新技术】 2023年随着OpenAI开发者大会的召开&#xff0c;最重磅更新当属GPTs&#xff0c;多模态API&#xff0c;未来自定义专属的GPT。微软创始人比尔盖茨称ChatGPT的出现有着重大历史意义&#xff0c;不亚…

【笔记】OpenHarmony和HarmonyOS区别及应用开发简介

一、概念 OpenHarmony(OH) &#xff1a; OpenAtom OpenHarmonyHarmonyOS(HO)&#xff1a;开发 | 华为开发者联盟 (huawei.com) HO当前最高是3.1&#xff0c;在华为mate 60上面也是。关于4.0、5.0和next这类版本说法都是面向用户的&#xff0c;不是开发人员。对于程序员&#…