RocketMQ消息汇总

news2024/9/21 12:32:46
当物理文件删除了 队列中的下标的消息也被删除了 但是即使物理删除了 队列中的偏移量还是会持续上升



每天凌晨4点  定时清理
在 RocketMQ 中,消息的物理删除是通过定期清理 CommitLog 文件来实现的。CommitLog 文件中存储的是所有主题和队列的消息,一旦这些文件中的数据超过了文件保留时间(fileReservedTime)或者文件大小限制,老旧的文件将会被删除。对于 ConsumeQueue 文件,它们的清理依赖于 CommitLog。如果 CommitLog 中的消息被删除了,那么对应的 ConsumeQueue 文件中的条目也会失效,并在随后的清理中被删除

nameserver每次不想写可以配置成环境变量



<dependencies>
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>5.3.0</version>
    </dependency>
</dependencies>

ACL访问控制列表

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-acl</artifactId>
    <version>5.3.0</version>
</dependency>
基础演示
package com.example;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;

/**
 * @author hrui
 * @date 2024/8/2 18:07
 */
public class MQProducer {
    public static void main(String[] args) {
        //创建生产者
        DefaultMQProducer producer = new DefaultMQProducer("mq_producer_group_test");
        //设置nameserver地址
        producer.setNamesrvAddr("10.8.0.1:9876");

        try {
            //启动生产者
            producer.start();
            //发送消息
            User user = new User("hrui",18,"北京");
            Message message = new Message("mq_topic_test",serialize(user));//生产环境最好把自动创建Topic关闭
            SendResult sendResult = producer.send(message);
            //[sendStatus=SEND_OK, msgId=24098A282821613099CCCEAC6E72517F7C0818B4AAC20921F6960000, offsetMsgId=AC15217300002A9F00000000000B186D, messageQueue=MessageQueue [topic=mq_topic_test, brokerName=broker-a, queueId=2], queueOffset=0]
            System.out.println("消息已发送,返回结果:"+sendResult);
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            producer.shutdown();
        }
    }

    public static byte[] serialize(Object object) throws IOException {
        try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
             ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream)) {
            objectOutputStream.writeObject(object);
            return byteArrayOutputStream.toByteArray();
        }
    }
}
package com.example;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.List;

/**
 * @author hrui
 * @date 2024/8/2 18:30
 */
public class MQConsumer {
    public static void main(String[] args) {
        //两种消费模式
        //DefaultMQPushConsumer:采用长轮询机制,模拟推送效果,但本质上是主动拉取。适合低延迟、高实时性的场景。
        //DefaultMQPullConsumer:消费者明确主动拉取消息,控制权完全在消费者手中,适合需要严格控制消息拉取节奏的场景。
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("mq_consumer_group_test");
        //设置namesrv地址
        consumer.setNamesrvAddr("10.8.0.1:9876");
        try {
            //订阅topic
            consumer.subscribe("mq_topic_test", "*");
            //注册监听器,Broker推送消息触发
            //1. MessageListenerOrderly(顺序消费):保证消息按顺序处理
            //2. MessageListenerConcurrently(并发消费):消息并发处理,不保证顺序
            consumer.registerMessageListener(new MessageListenerConcurrently(){

                @Override//consumeConcurrentlyContext 是个消费上下文对象
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                    //不同线程
                    System.out.println(Thread.currentThread().getName());
                    System.out.println("list.size="+list.size());
                    for (MessageExt messageExt : list) {
                        //将messageExt里的User 反序列化
                        User user = deserialize(messageExt.getBody());
                        System.out.println("消费者接收到消息: " + user);

                        //获取当前消息队列
                        System.out.println("当前消息队列: " + consumeConcurrentlyContext.getMessageQueue());//MessageQueue [topic=mq_topic_test, brokerName=broker-a, queueId=0]

                        //获取下一次消费时的延迟级别
                        int delayLevel = consumeConcurrentlyContext.getDelayLevelWhenNextConsume();
                        System.out.println("消费时的延迟级别: " + delayLevel);
                        //设置下一次消费时的延迟级别
                        //consumeConcurrentlyContext.setDelayLevelWhenNextConsume(2);

                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            //启动消费者
            consumer.start();
        }catch (Exception e){
            e.printStackTrace();
        }
    }

    // 反序列化方法
    public static User deserialize(byte[] data) {
        try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(data);
             ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream)) {
            return (User) objectInputStream.readObject();
        } catch (IOException | ClassNotFoundException e) {
            e.printStackTrace();
            return null;
        }
    }
}

上面示例,如果有多个消费者,消息只会被其中一个消费者消费一次 

Topic,Broker,messageQueue,queueOffset,tags大概说明
单机环境下

topic是个逻辑概念,可以在配置环境conf/broker.conf中配置 autoCreateTopicEnable=true  # 允许自动创建 Topic

在单机环境下,一个Topic主题创建后,默认会有4个队列(messageQueue),每条消息都有唯一标识,避免重复发送

当生产者发送一条消息到MQ时候,需要选择一个Topic,随机(或者说MQ有自己的策略),放到4个队列中的其中一个

而偏移量(queueOffset)其实就是该消息在该队列中的下标

tags:消息tag用于消息过滤

主从或者多个Broker下

注意注意注意::::主从关系的broker 一般只有主Broker和消费者关联,从Broker只做数据备份

多个Broker,当Topic创建之后,多个Broker可能有相同的Topic主题

但是生产者发送的消息还是会根据MQ的策略选择其中一个Broker的该主题下一般4个队列选择一个队列存放 

DefaultMQProducer

默认的消息生产者实现类,当自动创建主题开启,可以创建主题,设置队列数量(不指定,默认一个Broker 4个队列)

消息长度验证

消息发送之前,确保生产者发送的消息符合相应规范,具体规范要求是,主题名称(Topic),消息体(body)不能为空,消息长度>0并且小于4M(1024*1024*4)

死信队列的工作机制

在 RocketMQ 中,当消费者消费消息时,出现以下情况之一时,消息会被投递到死信队列:

  1. 消息被重复消费多次仍然失败:RocketMQ 中默认的最大重试次数是 16 次。如果消息在多次重试后仍然消费失败,则会被放入死信队列。

  2. 消费者抛出异常且没有成功消费消息:如果消费者在处理消息时抛出异常,并且消费未成功,则该消息会被重新放回队列并尝试重新消费。经过多次尝试仍然失败,最终会进入死信队列。

1.简单消息(普通消息)
可靠同步消息:

生产者向RocketMQ执行发送API时,同步等待,直到MQ服务器返回发送结果.

package com.example.simple;

import com.example.User;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;

/**
 * @author hrui
 * @date 2024/8/2 20:45
 */
public class SyncMQProducer {
    public static void main(String[] args) {
        //创建生产者
        DefaultMQProducer producer = new DefaultMQProducer("mq_producer_group_test");
        //设置nameserver地址
        producer.setNamesrvAddr("10.8.0.1:9876");

        try {
            //启动生产者
            producer.start();
            //发送消息
            User user = new User("hrui", 18, "北京");
            Message message = new Message("mq_topic_test", serialize(user));
            //同步发送消息,等待返回结果
            SendResult sendResult = producer.send(message);
            System.out.println("消息已发送, 返回结果: " + sendResult);
            //根据返回结果进行后续处理
            if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
                System.out.println("消息发送成功");
            } else {
                System.out.println("消息发送失败");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.shutdown();
        }
    }

    public static byte[] serialize(Object object) throws IOException {
        try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
             ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream)) {
            objectOutputStream.writeObject(object);
            return byteArrayOutputStream.toByteArray();
        }
    }

}

可靠异步消息

生产者在发送消息后,不等待Broker的返回结果,而是继续执行后续的业务逻辑。当消息发送成功或失败时,Broker会通过回调函数在一个新的线程中通知生产者结果。这种方式可以提高系统的吞吐量和响应速度。

同步发送和异步发送都是用send方法

异步的send方法多了一个参数 SEndCallback回调

package com.example.simple;

import com.example.User;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * @author hrui
 * @date 2024/8/2 20:53
 */
public class AsyncMQProducer {
    public static void main(String[] args) {
        //创建生产者
        DefaultMQProducer producer = new DefaultMQProducer("mq_producer_group_test");
        //设置nameserver地址
        producer.setNamesrvAddr("10.8.0.1:9876");

        try {
            //启动生产者
            producer.start();
            //发送消息
            User user = new User("hrui", 18, "北京");
            Message message = new Message("mq_topic_test", serialize(user));
            //使用 CountDownLatch 确保异步发送完成
            CountDownLatch countDownLatch = new CountDownLatch(1);
            //异步发送消息,设置回调函数处理返回结果
            producer.send(message, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    //发送成功的回调
                    System.out.println("消息发送成功:"+sendResult);
                    countDownLatch.countDown();  // 减少计数器
                }

                @Override
                public void onException(Throwable e) {
                    //发送失败的回调
                    System.out.println("消息发送失败:"+e.getMessage());
                    e.printStackTrace();
                    countDownLatch.countDown();  // 减少计数器
                }
            });
            //等待消息发送完成(设置超时时间为 5 秒)
            boolean await = countDownLatch.await(5, TimeUnit.SECONDS);
            if (!await) {
                System.out.println("消息发送超时");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //关闭生产者
            producer.shutdown();//这里不要关闭,你这里直接关闭了,MQ回调是异步的,等回调完再关
        }
    }

    public static byte[] serialize(Object object) throws IOException {
        try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
             ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream)) {
            objectOutputStream.writeObject(object);
            return byteArrayOutputStream.toByteArray();
        }
    }
}
单向消息

生产者只负责发送消息,不关心发送结果,也不需要等待服务器的响应。这种方式非常适合需要极高吞吐量的场景,例如日志采集。你只管发,成功失败无关紧要

简单发送sendOneway

package com.example.simple;

import com.example.User;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;

/**
 * @author hrui
 * @date 2024/8/2 21:23
 */
public class OnewayMQProducer {
    public static void main(String[] args) {
        //创建生产者
        DefaultMQProducer producer = new DefaultMQProducer("mq_producer_group_test");
        //设置nameserver地址
        producer.setNamesrvAddr("10.8.0.1:9876");

        try {
            //启动生产者
            producer.start();
            System.out.println("生产者启动成功");

            //发送消息
            User user = new User("hrui", 18, "北京");
            Message message = new Message("mq_topic_test", serialize(user));

            //单向发送消息
            producer.sendOneway(message);
            System.out.println("消息已发送");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //关闭生产者
            producer.shutdown();
        }
    }

    public static byte[] serialize(Object object) throws IOException {
        try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
             ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream)) {
            objectOutputStream.writeObject(object);
            return byteArrayOutputStream.toByteArray();
        }
    }
}

以上示例消费者都可以用 

package com.example;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.List;

/**
 * @author hrui
 * @date 2024/8/2 18:30
 */
public class MQConsumer {
    public static void main(String[] args) {
        //两种消费模式
        //DefaultMQPushConsumer:采用长轮询机制,模拟推送效果,但本质上是主动拉取。适合低延迟、高实时性的场景。
        //DefaultMQPullConsumer:消费者明确主动拉取消息,控制权完全在消费者手中,适合需要严格控制消息拉取节奏的场景。
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("mq_consumer_group_test");
        //设置namesrv地址
        consumer.setNamesrvAddr("xxx.xxx.xxx:9876");
        try {
            //订阅topic
            consumer.subscribe("mq_topic_test", "*");
            //注册监听器,Broker推送消息触发
            //1. MessageListenerOrderly(顺序消费):保证消息按顺序处理
            //2. MessageListenerConcurrently(并发消费):消息并发处理,不保证顺序
            consumer.registerMessageListener(new MessageListenerConcurrently(){

                @Override//consumeConcurrentlyContext 是个消费上下文对象
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                    System.out.println(Thread.currentThread().getName());
                    System.out.println("list.size="+list.size());
                    for (MessageExt messageExt : list) {
                        //将messageExt里的User 反序列化
                        User user = deserialize(messageExt.getBody());
                        System.out.println("消费者接收到消息: " + user);

                        //获取当前消息队列
                        System.out.println("当前消息队列: " + consumeConcurrentlyContext.getMessageQueue());//MessageQueue [topic=mq_topic_test, brokerName=broker-a, queueId=0]

                        //获取下一次消费时的延迟级别
                        int delayLevel = consumeConcurrentlyContext.getDelayLevelWhenNextConsume();
                        System.out.println("消费时的延迟级别: " + delayLevel);
                        //设置下一次消费时的延迟级别
                        //consumeConcurrentlyContext.setDelayLevelWhenNextConsume(2);

                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            //启动消费者
            consumer.start();
        }catch (Exception e){
            e.printStackTrace();
        }
    }

    // 反序列化方法
    public static User deserialize(byte[] data) {
        try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(data);
             ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream)) {
            return (User) objectInputStream.readObject();
        } catch (IOException | ClassNotFoundException e) {
            e.printStackTrace();
            return null;
        }
    }
}
2.顺序消息

顺序消息指消费者消费某个topic的某个队列中的消息是顺序的.

要保证顺序消息两个前提

生产者:确保将相同顺序要求的消息发送到同一个Broker的同一个队列

消费者:确保只消费特定的队列内的消息

不然无法保证顺序消息

最简单的实现方式,全局就一个Broker,Broker内就一个队列

修改conf/broker.conf

# Broker 配置文件 broker.conf

defaultTopicQueueNums=1

但是这样设置会导致其他Topic也只有一个队列,事实上我们在代码里也可以指定,或者在dashboard页面中创建Topic时候指定用哪个Broker中有几个队列

能否这么做,我用不同的Topic代表不同的消息,而在创建顺序消息的Topic时候,我指定就一个Broker有并且这个broker里的队列只有一个

因自己这边本身就是单机部署的MQ

                主题                 对应放置的消息

   并且把顺序消息放到一个broker的一个队列中

下面代码用来创建Topic

package com.example.topic;

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;

/**
 * @author hrui
 * @date 2024/8/3 15:17
 */
public class CreateTopics {

    public static void main(String[] args) {
        // 创建带有ACL凭证的RPCHook
        AclClientRPCHook aclHook = new AclClientRPCHook(new SessionCredentials("xxxxx", "xxxxx"));

        // 创建DefaultMQAdminExt实例并设置RPCHook
        DefaultMQAdminExt adminExt = new DefaultMQAdminExt(aclHook);
        adminExt.setNamesrvAddr("xxx.xxx.xxx:9876");
        //adminExt.setInstanceName("admin-instance");

        try {
            adminExt.start();

            // 创建普通消息主题
            createTopic(adminExt, "normalTopic", 4, 4, new String[]{"xxx.xxx.xxx:10911"});

            // 创建顺序消息主题,只在broker-a上创建,且只有一个队列
            createTopic(adminExt, "sequentialTopic", 1, 1, new String[]{"10.8.0.1:10911"});

            // 创建延时消息主题
            createTopic(adminExt, "delayedTopic", 4, 4, new String[]{"xxx.xxx.xxx:10911"});

            // 创建广播消息主题
            createTopic(adminExt, "broadcastTopic", 4, 4, new String[]{"xxx.xxx.xxx:10911"});

            // 创建事务消息主题
            createTopic(adminExt, "transactionTopic", 4, 4, new String[]{"xxx.xxx.xxx:10911"});

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            adminExt.shutdown();
        }
    }

    public static void createTopic(DefaultMQAdminExt adminExt, String topicName, int writeQueueNums, int readQueueNums, String[] brokers) throws Exception {
        TopicConfig topicConfig = new TopicConfig(topicName);
        topicConfig.setWriteQueueNums(writeQueueNums);
        topicConfig.setReadQueueNums(readQueueNums);
        topicConfig.setPerm(6);

        for (String broker : brokers) {
            adminExt.createAndUpdateTopicConfig(broker, topicConfig);
        }
    }
}

顺序消费,指定存入一个Broker的队列  生产者  因为上面创建sequentialTopic主题时候,里面就放了一个队列

package com.example.order;

import com.example.User;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.RPCHook;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.List;

public class OrderMQProducer {
    public static void main(String[] args) {
        DefaultMQProducer producer = new DefaultMQProducer("mq_producer_group_test", getAclRPCHook());
        producer.setNamesrvAddr("xxx.xxx.xxx:9876");

        try {
            producer.start();

            // 发送多条消息
            for (int i = 0; i < 100; i++) {
                User user = new User("hrui-" + i, 18 + i, "北京");
                Message message = new Message("sequentialTopic", serialize(user));
                // 选择具体某个队列
                SendResult sendResult = producer.send(message, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> list, Message message, Object arg) {
                        // 假设只在 broker-a 上创建了一个队列,那么直接返回该队列
                        for (MessageQueue mq : list) {
                            if ("broker-a".equals(mq.getBrokerName()) && mq.getQueueId() == 0) {
                                return mq;
                            }
                        }
                        // 如果没有找到指定的队列,则返回默认队列
                        return list.get(0);
                    }
                }, null);

                System.out.println("消息发送结果:" + sendResult);
            }

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.shutdown();
        }
    }

    public static byte[] serialize(Object object) throws IOException {
        try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
             ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream)) {
            objectOutputStream.writeObject(object);
            return byteArrayOutputStream.toByteArray();
        }
    }

    // 访问权限
    public static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("xxxxxx", "xxxxxx"));
    }
}

消费者订阅sequentialTopic  并且里面只有一个队列

package com.example.order;

import com.example.User;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.List;

/**
 * 要保证顺序消费
 * 消费者侧:确保每个消费者实例只消费特定的队列。
 */
public class OrderMQConsumer {
    public static void main(String[] args) {
        // 创建消费者并配置ACL
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_consumer_group", getAclRPCHook());
        consumer.setNamesrvAddr("xxxx.xxx.xxx:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        try {
            // 订阅顺序消息主题
            consumer.subscribe("sequentialTopic", "*");

            // 注册顺序消息监听器
            consumer.registerMessageListener(new MessageListenerOrderly() {
                @Override
                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                    for (MessageExt msg : msgs) {
                        System.out.println("消费消息: " + deserialize(msg.getBody()));
                    }
                    return ConsumeOrderlyStatus.SUCCESS;
                }
            });

            // 启动消费者
            consumer.start();
            System.out.println("顺序消息消费者启动成功");

        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }
    // 反序列化方法
    public static User deserialize(byte[] data) {
        try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(data);
             ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream)) {
            return (User) objectInputStream.readObject();
        } catch (IOException | ClassNotFoundException e) {
            e.printStackTrace();
            return null;
        }
    }
    // 访问权限
    public static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("xxxxxx", "xxxxxx"));
    }
}

3.广播消息

是向所有订阅了该主题的订阅者发送消息.订阅同一个topic的多个消费者,能全量收到生产者发送

的所有消息

生产者可以随意发送消息

广播模式主要在消费者端设置下广播模式的类型 

MessageModel.BROADCASTING:广播消息.一条消息会发给所有订阅了对应主题的消费者,不管消费者是不是同一个消费者组.

MessageModel.CLUSTERING:集群消息.每条消息只会被同一个消费者组中的一个实例消费.

关于

consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

这个设置

重新启动只会消费新的数据,不会消费老数据

package com.example.broadcast;

import com.example.User;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;

/**
 * 广播消息
 * @author hrui
 * @date 2024/8/3 17:17
 */
public class BroadcastProducer {

    public static void main(String[] args) {
        DefaultMQProducer producer = new DefaultMQProducer("broadcast-producer-group",getAclRPCHook());

        //设置nameserver地址
        producer.setNamesrvAddr("xxx.xxx.xxx:9876");
        producer.setSendMsgTimeout(10000); // 设置发送超时时间为10秒  如果循环发送消息  可能报超时  设置下
        try {
            producer.start();
            for (int i = 0; i < 10; i++) {

                User user = new User();
                user.setName("hrui_"+i);
                user.setAge(18);
                user.setAddress("北京");
                Message message = new Message("broadcastTopic", "tagA", serialize(user));

                //发送消息
                SendResult sendResult = producer.send(message);
                //根据返回结果进行后续处理
                if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
                    System.out.println("消息发送成功");
                } else {
                    System.out.println("消息发送失败");
                }
            }
        }catch(Exception e){
            e.printStackTrace();
        }finally{
            producer.shutdown();
        }
    }
    public static byte[] serialize(Object object) throws IOException {
        try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
             ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream)) {
            objectOutputStream.writeObject(object);
            return byteArrayOutputStream.toByteArray();
        }
    }
    public static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("xxxxx", "xxxxx"));
    }
}

package com.example.broadcast;

import com.example.User;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.List;

/**
 * 广播消息消费者
 * 该类实现了一个广播消息的消费者,能够从RocketMQ集群中接收并处理消息。
 * 使用了RocketMQ的ACL进行权限控制。
 *
 * @author hrui
 * @date 2024/8/3 17:17
 */
public class BroadcastConsumer {

    public static void main(String[] args) {
        // 创建消费者实例,并指定消费者组名和ACL权限验证
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("broadcast-consumer-group", getAclRPCHook());

        // 设置NameServer地址
        consumer.setNamesrvAddr("xxxx.xxx.xxx:9876");
        //RocketMQ设计的目标是让所有的消费者接收到它们启动后发送到主题的所有消息,而不是从队列的历史开始位置重播所有消息
        // 设置从队列的最开始位置消费  如果不设置默认consumeFromWhere:CONSUME_FROM_LAST_OFFSET
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);


        ConsumeFromWhere consumeFromWhere = consumer.getConsumeFromWhere();
        System.out.println("consumeFromWhere:"+consumeFromWhere);

        // 设置消息模式为广播模式
        consumer.setMessageModel(MessageModel.CLUSTERING);

        try {
            // 订阅主题并指定Tag,*表示全部Tag
            consumer.subscribe("broadcastTopic", "*");

            // 注册消息监听器,处理接收到的消息
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    for (MessageExt msg : msgs) {
                        // 反序列化消息体
                        User user = deserialize(msg.getBody());
                        System.out.println("收到消息:"+user);
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });

            // 启动消费者
            consumer.start();
            System.out.println("Broadcast Consumer started.");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 这里没有需要在finally中关闭的资源,但如果有资源需要关闭,可以在这里处理
        }
    }

    /**
     * 反序列化方法,将字节数组转换为User对象
     *
     * @param data 字节数组
     * @return User对象
     */
    public static User deserialize(byte[] data) {
        try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(data);
             ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream)) {
            return (User) objectInputStream.readObject();
        } catch (IOException | ClassNotFoundException e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * 获取ACL权限验证的RPCHook
     *
     * @return RPCHook
     */
    public static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("xxxxx", "xxxxx"));
    }
}

4.延迟消息

延迟消息与普通消息的不同之处在于,他们要等到指定的时间之后才会被传递

可以在conf/broker.conf中配置

延迟消息大概逻辑是  系统为延迟任务创建了topic  当然 你只需要关注于自己的topic就行了

分了18个等级对应18个队列

package com.example.schedule;

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;

/**
 * @author hrui
 * @date 2024/8/3 19:34
 */
public class ScheduledProducer {

    public static void main(String[] args) throws Exception {
        // 创建生产者,指定生产者组名
        DefaultMQProducer producer = new DefaultMQProducer("ScheduledProducerGroup",getAclRPCHook());
        // 设置NameServer地址
        producer.setNamesrvAddr("xxx.xxx.xxx:9876");
        producer.setSendMsgTimeout(10000); // 设置发送超时时间为10秒   可能报超时  设置下

        // 启动生产者
        producer.start();

        int delayLevel = 3; // 延迟级别,对应某个具体的延迟时间(如1分钟)

        // 创建消息实例,指定topic,tag和消息体
        Message message = new Message("delayedTopic", "TagA", "OrderID188", "Hello scheduled message".getBytes());
        // 设置延迟级别
        message.setDelayTimeLevel(delayLevel);

        // 发送消息
        SendResult sendResult = producer.send(message);
        System.out.println("发送结果: " + sendResult);

        // 关闭生产者
        producer.shutdown();
    }

    // 访问权限
    public static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("xxxxx", "xxxxxx"));
    }

}
package com.example.schedule;

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;

/**
 * @author hrui
 * @date 2024/8/3 19:34
 */
public class ScheduleConsumer {
    public static void main(String[] args) throws MQClientException {
        // 创建消费者,指定消费者组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ScheduledConsumerGroup",getAclRPCHook());
        // 设置NameServer地址
        consumer.setNamesrvAddr("xxx.xxx.xxx:9876");

        // 订阅topic
        consumer.subscribe("delayedTopic", "*");

        // 注册消息监听器
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            for (MessageExt msg : msgs) {
                String keys = msg.getKeys();
                System.out.println("Receive message[msgId=" + msg.getMsgId() + "] " +
                        "(body: " + new String(msg.getBody()) + ", keys: " + keys + ") " +
                        "at time: " + System.currentTimeMillis());
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });

        // 启动消费者
        consumer.start();
        System.out.println("延迟任务消费者启动");
    }

    public static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("xxxxxx", "xxxxx"));
    }
}

5.批量消息

批量消息为了减少IO

批量消息(Batch Message)是一种将多条消息组合成一个单一的消息发送的方式,以提高消息的发送效率。通过这种方式,可以减少网络调用的次数,从而提高吞吐量。

消息不要超过4M   超过就分批次

package com.example.bach;

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;

import java.util.ArrayList;
import java.util.List;

/**
 * @author hrui
 * @date 2024/8/3 21:56
 */
public class BatchProducer {
    public static void main(String[] args) throws Exception {
        // 创建生产者实例
        DefaultMQProducer producer = new DefaultMQProducer("batch_producer_group",getAclRPCHook());
        producer.setNamesrvAddr("xxx.xxx.xxx:9876");
        producer.setSendMsgTimeout(20000); // 设置发送超时时间为10秒  如果循环发送消息  可能报超时  设置下
        producer.start();

        // 创建批量消息
        String topic = "BatchTopic";
        List<Message> messages = new ArrayList<>();
        messages.add(new Message(topic, "TagA", "OrderID001", "Hello World 0".getBytes()));
        messages.add(new Message(topic, "TagA", "OrderID002", "Hello World 1".getBytes()));
        messages.add(new Message(topic, "TagA", "OrderID003", "Hello World 2".getBytes()));

        // 发送批量消息
        SendResult sendResult = producer.send(messages);
        //根据返回结果进行后续处理
        if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
            System.out.println("消息发送成功");
        } else {
            System.out.println("消息发送失败");
        }

        // 关闭生产者
        producer.shutdown();
    }
    public static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("xxxxx", "xxxxxx"));
    }
}
package com.example.bach;

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;

import java.util.List;

/**
 * @author hrui
 * @date 2024/8/3 22:13
 */
public class BatchConsumer {
    public static void main(String[] args) throws MQClientException {
        // 创建消费者实例,并指定消费者组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("BatchConsumerGroup", getAclRPCHook());
        // 设置NameServer地址
        consumer.setNamesrvAddr("xxx.xxx.xxx:9876");
        // 订阅topic
        consumer.subscribe("BatchTopic", "*");

        // 注册消息监听器,处理接收到的消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.println("当前线程:"+Thread.currentThread().getName()+",消息集合的大小:"+msgs.size());//不同线程推
                for (MessageExt msg : msgs) {
                    System.out.println(Thread.currentThread().getName() + " 收到新消息: " + new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // 启动消费者
        consumer.start();
        System.out.println("批量消费者已启动.");
    }

    public static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("xxxxx", "xxxxxx"));
    }
}

当超过4M了怎么办

package com.example.batch;

import com.example.bach.ListSplitter;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.List;

/**
 * @author hrui
 * @date 2024/8/3 22:21
 */
public class BatchProducer4M {

    public static void main(String[] args) throws MQClientException, InterruptedException, RemotingException, MQBrokerException, UnsupportedEncodingException {
        // 创建一个生产者实例,并指定生产者组名
        DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroup", getAclRPCHook());
        // 设置NameServer地址
        producer.setNamesrvAddr("xxx.xxx.xxx:9876");
        producer.setSendMsgTimeout(50000); // 设置发送超时时间为50秒

        // 启动生产者
        producer.start();

        // 创建消息列表
        List<Message> messages = new ArrayList<>();
        for (int i = 0; i < 100000; i++) {
            byte[] body = ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET);
            Message msg = new Message("BatchTopic", "TagA", "OrderID188", body);
            messages.add(msg);
        }

        // 使用ListSplitter将大消息分割成小消息
        ListSplitter splitter = new ListSplitter(messages);

        while (splitter.hasNext()) {
            try {
                List<Message> listItem = splitter.next();
                SendResult sendResult = producer.send(listItem);
                if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
                    System.out.println("消息发送成功");
                } else {
                    System.out.println("消息发送失败");
                }
            } catch (Exception e) {
                e.printStackTrace();
                // 处理发送失败的情况
            }
        }

        // 关闭生产者
        producer.shutdown();
    }

    public static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("xxxxxx", "xxxxxxx"));
    }
}
package com.example.bach;

import org.apache.rocketmq.common.message.Message;

import java.util.Iterator;
import java.util.List;
import java.util.Map;

/**
 * 将所有传入的消息按照 <SIZE_LIMIT> 的要求进行分块
 */
public class ListSplitter implements Iterator<List<Message>> {
    private final int SIZE_LIMIT = 1024 * 1024 * 4; // 4MB
    private final List<Message> messages;
    private int currIndex;

    public ListSplitter(List<Message> messages) {
        this.messages = messages;
    }

    @Override
    public boolean hasNext() {
        return currIndex < messages.size();
    }

    @Override
    public List<Message> next() {
        int startIndex = getStartIndex();
        int nextIndex = startIndex;
        int totalSize = 0;
        for (; nextIndex < messages.size(); nextIndex++) {
            Message message = messages.get(nextIndex);
            int tmpSize = calcMessageSize(message);
            if (tmpSize + totalSize > SIZE_LIMIT) {
                break;
            } else {
                totalSize += tmpSize;
            }
        }
        List<Message> subList = messages.subList(startIndex, nextIndex);
        currIndex = nextIndex;
        return subList;
    }

    /**
     * 获取从currIndex开始第一个符合<=SIZE_LIMIT的元素
     * @return currIndex
     */
    private int getStartIndex() {
        Message currMessage = messages.get(currIndex);
        int tmpSize = calcMessageSize(currMessage);
        while (tmpSize > SIZE_LIMIT) {
            currIndex += 1;
            Message message = messages.get(currIndex);
            tmpSize = calcMessageSize(message);
        }
        return currIndex;
    }

    /**
     * 计算消息的大小,包括主题,属性和日志开销
     * @param message
     * @return 消息大小(字节)
     */
    private int calcMessageSize(Message message) {
        int tmpSize = message.getTopic().length() + message.getBody().length;
        Map<String, String> properties = message.getProperties();
        for (Map.Entry<String, String> entry : properties.entrySet()) {
            tmpSize += entry.getKey().length() + entry.getValue().length();
        }
        tmpSize = tmpSize + 180; // 增加日志的开销20字节
        return tmpSize;
    }

    @Override
    public void remove() {
        throw new UnsupportedOperationException("不支持删除操作");
    }
}
6.过滤消息

Tag标记主要就是用来过滤的,如果将Topic看出1级目录  那么Tag可以看出2级

RocketMQ 提供了两种方式:基于标签(Tag)的过滤和基于 SQL92 的过滤

基于标签(Tag)的过滤

基于标签的过滤是 RocketMQ 默认支持的过滤方式。生产者发送消息时指定一个标签,消费者订阅消息时也指定相应的标签,从而只接收指定标签的消息。

按 Tag 过滤

consumer.subscribe("TopicTest", "TagA || TagB");
 

package com.example.filter;

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;

/**
 * @author hrui
 * @date 2024/8/3 23:07
 */
public class TagProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("TagProducerGroup", getAclRPCHook());
        producer.setNamesrvAddr("xxx.xxx.xxx:9876");
        producer.setSendMsgTimeout(10000); // 设置发送超时时间为10秒  如果循环发送消息  可能报超时  设置下
        producer.start();

        //发送带有TagA标签的消息
        Message messageA = new Message("TagTopic", "TagA", "Hello TagA".getBytes());
        SendResult sendResultA = producer.send(messageA);
        System.out.println("Send Result A: " + sendResultA);

        //发送带有TagB标签的消息
        Message messageB = new Message("TagTopic", "TagB", "Hello TagB".getBytes());
        SendResult sendResultB = producer.send(messageB);
        System.out.println("Send Result B: " + sendResultB);

        producer.shutdown();
    }

    public static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("xxxxxx", "xxxxxx"));
    }
}

package com.example.filter;

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;

/**
 * @author hrui
 * @date 2024/8/3 23:07
 */
public class TagConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TagConsumerGroup", getAclRPCHook());
        consumer.setNamesrvAddr("xxxx.xxx.xxx:9876");

        //只订阅带有TagA标签的消息
        consumer.subscribe("TagTopic", "TagA");

        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            for (MessageExt msg : msgs) {
                System.out.println("Received Message: " + new String(msg.getBody()));
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });

        consumer.start();
        System.out.println("Tag Consumer Started.");
    }

    public static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("xxxxx", "xxxxxx"));
    }
}
基于 SQL92 的过滤

基于 SQL92 的过滤需要在 Broker 上配置 enablePropertyFilter=true,并且生产者发送消息时添加自定义属性,消费者通过 SQL92 表达式进行过滤。

SQL92 过滤 需要用到  MessageSelector

按 SQL92 表达式过滤

consumer.subscribe("TopicTest", MessageSelector.bySql("a > 5"));
 

package com.example.filter;

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;

/**
 * @author hrui
 * @date 2024/8/3 23:11
 */
public class SQLProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("SQLProducerGroup", getAclRPCHook());
        producer.setNamesrvAddr("xxx.xxx.xxx:9876");
        producer.setSendMsgTimeout(10000); // 设置发送超时时间为10秒  如果循环发送消息  可能报超时  设置下
        producer.start();

        //发送带有自定义属性的消息
        Message message = new Message("SQLTopic", "TagA", "Hello SQL".getBytes());
        message.putUserProperty("age", "20");
        SendResult sendResult = producer.send(message);
        System.out.println("Send Result: " + sendResult);


        Message message2 = new Message("SQLTopic", "TagB", "Hello SQL".getBytes());
        message.putUserProperty("age", "10");
        SendResult sendResult2= producer.send(message2);
        System.out.println("Send Result: " + sendResult2);

        producer.shutdown();
    }

    public static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("xxxxx", "xxxxx"));
    }
}
package com.example.filter;

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;

public class SQLConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("SQLConsumerGroup", getAclRPCHook());
        consumer.setNamesrvAddr("xxx.xxx.xxx:9876");

        // 通过SQL92表达式订阅消息
        consumer.subscribe("SQLTopic", MessageSelector.bySql("age > 18"));

        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            System.out.println("接收到消息:");
            for (MessageExt msg : msgs) {
                System.out.println("Received Message: " + new String(msg.getBody()));
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });

        consumer.start();
        System.out.println("SQL Consumer Started.");
    }

    public static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("xxxxxx", "xxxxxx"));
    }
}
7.事务消息

RocketMQ的事务是一个两阶段提交消息实现,确保分布式系统的最终一致性

事务消息的三种状态

1.TransactionStatus.CommitTransaction:提交事务,表示允许消费者消费该消息

2.TransactionStatus.RollbackTransaction:回滚事务,表示该消息将被删除,不允许消费

3.TransactionStatus.Unknow:中间状态,表示需要MQ回查才能确定状态

RocketMQ事务消息主要在生产者处理.

  1. 发送事务消息
    • 事务消息的生产者先发送一条半消息(half message)到Broker。这条消息会被标记为未决状态(pending status)。
  2. Broker接收半消息并回复
    • Broker接收到半消息后,会持久化这条消息,并回复生产者确认接收成功。这时消息还不会被消费者消费。
  3. 执行本地事务
    • 生产者接收到Broker的确认回复后,开始执行本地事务操作。
  4. 提交本地事务状态
    • 本地事务执行完毕后,生产者会根据事务执行的结果提交事务状态(commit或rollback)给Broker。
  5. Broker处理事务状态
    • 如果生产者提交的是commit状态,Broker会将这条消息的状态更新为可消费状态,消费者可以消费这条消息。
    • 如果生产者提交的是rollback状态,Broker会删除这条消息,消费者不会看到这条消息。
  6. 事务状态回查
    • 如果在步骤4中,生产者提交事务状态失败或超时,Broker会定期回查生产者的事务状态(通过TransactionListener),确认本地事务的最终状态。根据回查结果,Broker会决定提交还是回滚消息。

从图中的流程可以看出,这一流程图正确反映了RocketMQ事务消息的处理步骤:

  1. 发送事务消息(半消息)。
  2. Broker接收并确认半消息。
  3. 生产者执行本地事务。
  4. 生产者提交本地事务状态给Broker。
  5. Broker根据事务状态更新消息状态。
  6. 若事务状态提交失败,Broker进行回查。(如果第4不生产者提交事务状态失败,超时,或者Unknow状态时候,开启回查)

package com.example.transaction;

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;

/**
 * @author hrui
 * @date 2024/8/4 0:31
 */
public class TransactionProducer {

    public static void main(String[] args) throws Exception {
        //创建一个事务性消息生产者,并指定生产者组名
        TransactionMQProducer producer = new TransactionMQProducer("TransactionProducerGroup", getAclRPCHook());
        producer.setNamesrvAddr("xxxx.xxx.xxxx:9876");
        producer.setSendMsgTimeout(20000); //设置发送超时时间为10秒   可能报超时  设置下
        //设置事务监听器
        producer.setTransactionListener(new TransactionListener() {
            @Override
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                System.out.println("执行本地事务,消息:" + new String(msg.getBody()));
                // 执行本地事务
                boolean localTransactionSuccess = executeLocalTransactionLogic();
                return localTransactionSuccess ? LocalTransactionState.COMMIT_MESSAGE : LocalTransactionState.ROLLBACK_MESSAGE;
            }

            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                System.out.println("回查本地事务,消息:" + new String(msg.getBody()));
                // 根据本地事务执行结果返回COMMIT_MESSAGE或ROLLBACK_MESSAGE
                boolean localTransactionSuccess = checkLocalTransactionResult();
                return localTransactionSuccess ? LocalTransactionState.COMMIT_MESSAGE : LocalTransactionState.ROLLBACK_MESSAGE;
            }
        });

        // 启动生产者
        producer.start();

        // 发送事务消息
        Message message = new Message("TransactionTopic", "TagA", "Hello Transaction Message".getBytes());
        SendResult sendResult = producer.sendMessageInTransaction(message, null);
        System.out.println("发送结果:" + sendResult);

        // 保持生产者运行以接收事务回查
        Thread.sleep(100000);

        // 关闭生产者
        producer.shutdown();
    }

    private static boolean executeLocalTransactionLogic() {
        // 执行本地事务逻辑,如数据库操作
        // 返回true表示本地事务成功,返回false表示失败
        System.out.println("本地事务执行成功");
        return true;
    }

    private static boolean checkLocalTransactionResult() {
        // 检查本地事务执行结果
        // 返回true表示本地事务成功,返回false表示失败
        return true;
    }

    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("xxxxx", "xxxxxx"));
    }
}
package com.example.transaction;

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;

import java.util.List;

/**
 * @author hrui
 * @date 2024/8/4 0:30
 */
public class TransactionConsumer {
    public static void main(String[] args) throws Exception {
        //创建消费者,指定消费者组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TransactionConsumerGroup", getAclRPCHook());
        consumer.setNamesrvAddr("xxx.xxx.xxx:9876");

        //订阅主题
        consumer.subscribe("TransactionTopic", "*");

        //注册消息监听器
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("接收到消息:" + new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        //启动消费者
        consumer.start();
        System.out.println("消费者启动完成。");
    }

    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("xxxxxx", "xxxxxx"));
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1977389.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

关于图片导入Eagle弹出“抱歉,eagle发生了一些问题”的解决办法 | 如何查看Eagle调试报告查询错误文件方法

教程不易&#xff0c;希望得到关注 先说解决办法 使用格式工厂将所有图片或报错图片文件再次转为JPG文件&#xff0c;即可正常导入。 官网入口 http://www.pcgeshi.com/ 吐槽一下现在搜索软件搜“格式工厂官网”第一页全是盗版软件和流氓网页&#xff0c;什么什么金X 风X格式…

使用 Streamlit 和 Python 构建 Web 应用程序

一.介绍 在本文中&#xff0c;我们将探讨如何使用 Streamlit 构建一个简单的 Web 应用程序。Streamlit 是一个功能强大的 Python 库&#xff0c;允许开发人员快速轻松地创建交互式 Web 应用程序。Streamlit 旨在让 Python 开发人员尽可能轻松地创建 Web 应用程序。以下是一些主…

TCP/UDP Socket 测试小工具,作为网工不可以不知道

背景 阿祥今天推荐一款TCP/UDP Socket 测试工具&#xff0c;所谓TCP/IP调试工具是用于在TCP/UDP的应用层上进行通信连接、数据传输的Windows工具。所谓应用层上就是说&#xff0c;TCP调试工具是不涉及TCP/IP协议层实现的问题&#xff0c;而只是利用TCP/IP进行数据传输的工具。 …

建模杂谈系列246 数据模型

说明 如果说微服务化(API接口、Web页面、Docker镜像)是架构方面的基准&#xff0c;那么数据模型就是逻辑处理方面的基准 内容 以下是一个样例&#xff1a; import redef extract_utf8_chars(input_string None):# 定义一个正则表达式&#xff0c;用于匹配所有的UTF-8字符utf…

OpenStack Yoga版安装笔记(十一)nova安装(上)

1、官方文档 OpenStack Installation Guidehttps://docs.openstack.org/install-guide/ 本次安装是在Ubuntu 22.04上进行&#xff0c;基本按照OpenStack Installation Guide顺序执行&#xff0c;主要内容包括&#xff1a; 环境安装 &#xff08;已完成&#xff09;OpenStack…

一文详解大模型蒸馏工具TextBrewer

原文&#xff1a;https://zhuanlan.zhihu.com/p/648674584 本文分享自华为云社区《TextBrewer&#xff1a;融合并改进了NLP和CV中的多种知识蒸馏技术、提供便捷快速的知识蒸馏框架、提升模型的推理速度&#xff0c;减少内存占用》&#xff0c;作者&#xff1a;汀丶。 TextBre…

谷粒商城实战笔记-122~124-全文检索-ElasticSearch-分词

文章目录 一&#xff0c;122-全文检索-ElasticSearch-分词-分词&安装ik分词二&#xff0c;124-全文检索-ElasticSearch-分词-自定义扩展词库1&#xff0c;创建nginx容器1.1 创建nginx文件夹1.2 创建nginx容器获取nginx配置1.3 复制nginx容器配置文件1.4 删除临时的nginx容器…

《Milvus Cloud向量数据库指南》——什么是高可用:深入理解数据库系统中的高可用性架构

什么是高可用:深入理解数据库系统中的高可用性架构 在信息技术日新月异的今天,高可用性(High Availability,简称HA)已成为衡量一个系统,尤其是数据库系统稳定性和可靠性的重要标准。高可用性的核心目标在于确保系统能够持续不断地提供服务,最大限度地减少因维护活动、硬…

从零开始安装Jupyter Notebook和Jupyter Lab图文教程

前言 随着人工智能热浪&#xff08;机器学习、深度学习、卷积神经网络、强化学习、AGC以及大语言模型LLM, 真的是一浪又一浪&#xff09;的兴起&#xff0c;小伙伴们Python学习的热情达到了空前的高度。当我20年前接触Python的时候&#xff0c;做梦也没有想到Python会发展得怎么…

【初阶数据结构题目】10. 链表的回文结构

链表的回文结构 点击链接做题 思路1&#xff1a;创建新的数组&#xff0c;遍历原链表&#xff0c;遍历原链表&#xff0c;将链表节点中的值放入数组中&#xff0c;在数组中判断是否为回文结构。 例如&#xff1a; 排序前&#xff1a;1->2->2->1 设置数组来存储链表&a…

KubeSphere 最佳实战:探索 K8s GPU 资源的管理,在 KubeSphere 上部署 AI 大模型 Ollama

转载&#xff1a;KubeSphere 最佳实战&#xff1a;探索 K8s GPU 资源的管理&#xff0c;在 KubeSphere 上部署 AI 大模型 Ollama 随着人工智能、机器学习、AI 大模型技术的迅猛发展&#xff0c;我们对计算资源的需求也在不断攀升。特别是对于需要处理大规模数据和复杂算法的 AI…

数据恢复软件:电脑丢失文件,及时使用数据恢复软件恢复!

数据恢复软件什么时候会用到&#xff1f; 答&#xff1a;如果真的不小心删除文件&#xff0c;清空回收站&#xff0c;电脑重装系统等情况发生&#xff0c;我们要懂的及时停止使用电子设备&#xff0c;使用可靠的数据恢复软件&#xff0c;帮助我们恢复这些电子设备的数据&#…

【SQL Server 】故障排除:端口冲突排查、网络问题诊断及日志分析与监控6.1 端口冲突排查

目录 第6章&#xff1a;故障排除 端口冲突排查 示例&#xff1a;使用 PowerShell 排查端口冲突 网络问题诊断 示例&#xff1a;使用 Wireshark 捕获 SQL Server 网络流量 日志分析与监控 示例&#xff1a;使用 SQL Server Profiler 监控网络连接 安全注意事项 第6章&am…

Celery:Python异步任务处理的终极利器

文章目录 **Celery&#xff1a;Python异步任务处理的终极利器**第一部分&#xff1a;背景介绍异步任务处理的挑战为什么选择Celery&#xff1f;引入Celery 第二部分&#xff1a;Celery概述什么是Celery&#xff1f; 第三部分&#xff1a;安装Celery使用pip安装Celery 第四部分&…

腰部 KOL 发展潜力预测与企业定制 AI 智能名片 O2O 商城小程序的协同发展

摘要&#xff1a;随着社交媒体和内容创作平台的蓬勃发展&#xff0c;KOL&#xff08;关键意见领袖&#xff09;在品牌推广和营销领域的作用日益凸显。在头部 KOL 资源竞争激烈的当下&#xff0c;腰部 KOL 成为了新的运营重点。然而&#xff0c;挖掘有潜力的腰部 KOL 并非易事。…

【机器学习】重塑游戏世界:机器学习如何赋能游戏创新与体验升级

&#x1f4dd;个人主页&#x1f339;&#xff1a;Eternity._ &#x1f339;&#x1f339;期待您的关注 &#x1f339;&#x1f339; ❀目录 &#x1f50d;1. 引言&#xff1a;游戏世界的变革前夜&#x1f4d2;2. 机器学习驱动的游戏创新&#x1f31e;智能化游戏设计与开发&…

项目实战_图书管理系统(简易版)

你能学到什么 一个简单的项目——图书管理系统&#xff08;浏览器&#xff1a;谷歌&#xff09;基础版我们只做两个功能&#xff08;因为其它的功能涉及的会比较多&#xff0c;索性就放在升级版里了&#xff0c;基础版先入个门&#xff09; 登录: ⽤⼾输⼊账号,密码完成登录功…

华水2022年专升本计算机培养方案

华水2022年专升本计算机培养方案 文章目录 华水2022年专升本计算机培养方案计科第一学期第二学期第三学期第四学期 软工第一学期第二学期第三学期第四学期 计科 第一学期 通识必修课 大学外语线性代数离散数学 专业基础课 高级语言程序设计 专业选修课 Java 第二学期 通识…

我知道越来越多的专业摄影师在他们的修饰工作流程中使用 Portraiture,因为它可以让你在保持重要纹理的同时使皮肤非常光滑

Portraiture4.5新版功能亮点&#xff1a; 1. 高级皮肤修饰技术&#xff1a;4.5版本引入了更为先进的皮肤修饰算法&#xff0c;能够更自然地平滑皮肤&#xff0c;同时保留必要的皮肤纹理和细节&#xff0c;实现专业级别的人像修饰效果。 Portraiture4.5新版 2. 智能面部特征识…

计算机的错误计算(五十一)

摘要 探讨 的符号。 例1. 请确定 的符号[1]。 在计算过程中&#xff0c;若保留8位、16位、20位有效数字&#xff0c;则计算过程与结果分别如下: 若在Windows 10&#xff0c;Visual Studio 2010下计算&#xff1a; #include <math.h>double ysin(pow(2,(double)1…