RocketMQ基础API使用以及基本原理探究

news2024/11/24 15:31:44

文章目录

    • 同步发送
    • 异步发送
    • 单向发送
    • 拉模式
      • 随机获取一个queue的消息
      • 指定一个queue的消息
    • 顺序消息
    • 广播消息
    • 延迟消息
    • 批量消息
    • 过滤消息
      • Tag过滤
      • sql过滤
    • 事务消息
    • RocketMQ常见问题
      • RocketMQ如何保证消息不丢失?
      • RocketMQ的消息持久化机制
      • RocketMQ如何保证消息顺序
      • RocketMQ事务消息原理

同步发送

等待消息返回后再继续进行下面的操作

适合可靠性要求较高,数据量小,实时响应

消费者:

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * @Author jiangxuzhao
 * @Description
 * @Date 2023/6/9
 */
public class SyncConsumer {
    public static void main(String[] args) throws MQClientException {
        // 推模式,消费者等到Broker把消息推过来
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PushComsumer");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.subscribe("Sync","*");

        // 并行消费消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for(int i=0;i<list.size();i++){
                    System.out.println(i+"_消息消费成功_"+new String(list.get(i).getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.println("消费者启动成功");
    }
}

生产者:

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.nio.charset.StandardCharsets;

/**
 * @Author jiangxuzhao
 * @Description
 * @Date 2023/6/9
 */
public class SyncProducer {
    public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("SyncProducer");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        System.out.println("生产者启动成功");

        for(int i=0;i<2;i++){
            Message message = new Message("Sync", "SyncTags", (i+"_SyncProducer").getBytes(StandardCharsets.UTF_8));
            // 同步发送
            SendResult sendResult = producer.send(message);
            System.out.println(i+"_消息发送成功"+sendResult);
        }

        producer.shutdown();

    }
}

其中消费者的输出为:

0_消息消费成功_0_SyncProducer
0_消息消费成功_1_SyncProducer

前面都是0说明这种推模式broker是分两次推送的

异步发送

不等待消息返回直接进入后续的流程

消费者:

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * @Author jiangxuzhao
 * @Description
 * @Date 2023/6/9
 */
public class AsyncConsumer {
    public static void main(String[] args) throws MQClientException {
        // 推模式,消费者等到Broker把消息推过来
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PushComsumer");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.subscribe("Async","*");

        // 并行消费消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for(int i=0;i<list.size();i++){
                    System.out.println(i+"_消息消费成功_"+new String(list.get(i).getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.println("消费者启动成功");
    }
}

生产者:

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;

/**
 * @Author jiangxuzhao
 * @Description
 * @Date 2023/6/10
 */
public class AsyncProducer {
    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("AsyncProducer");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        System.out.println("生产者启动成功");
        CountDownLatch countDownLatch = new CountDownLatch(100);

        for(int i=0;i<100;i++){
            final int index= i;

            Message message = new Message("Async", "AsyncTags", (i+"_AsyncProducer").getBytes(StandardCharsets.UTF_8));
            // 异步发送,回调方法由broker调用
            producer.send(message, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.println(index +"_消息发送成功_"+sendResult);
                    countDownLatch.countDown();
                }

                @Override
                public void onException(Throwable throwable) {
                    System.out.println(index +"_消息发送失败_"+throwable.getStackTrace());
                    countDownLatch.countDown();
                }
            });
        }

        // 主线程等待异步发送结束
        countDownLatch.await();
        System.out.println("发送结束");
        producer.shutdown();
    }
}

如果没有countDownLatch.await(),会出现下面的情况:

生产者启动成功
发送结束
0_消息发送失败_[Ljava.lang.StackTraceElement;@10c89bd0
10_消息发送失败_[Ljava.lang.StackTraceElement;@3d181392
11_消息发送失败_[Ljava.lang.StackTraceElement;@69da09fa
....

就是producer主线程先关闭了,但是异步发送还没结束,深入源码看这个send是用ThreadPoolExecutor异步线程池

其中消费者的输出为:

....
0_消息消费成功_78_AsyncProducer
0_消息消费成功_98_AsyncProducer
0_消息消费成功_99_AsyncProducer
0_消息消费成功_86_AsyncProducer
0_消息消费成功_87_AsyncProducer
0_消息消费成功_69_AsyncProducer
0_消息消费成功_9_AsyncProduce

单向发送

只负责发送,不管消息是否发送成功

适合收集日志,可靠性要求不高的场景

消费者:

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * @Author jiangxuzhao
 * @Description
 * @Date 2023/6/9
 */
public class OnewayConsumer {
    public static void main(String[] args) throws MQClientException {
        // 推模式,消费者等到Broker把消息推过来
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PushComsumer");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.subscribe("Oneway","*");

        // 并行消费消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for(int i=0;i<list.size();i++){
                    System.out.println(i+"_消息消费成功_"+new String(list.get(i).getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.println("消费者启动成功");
    }
}

发送者:

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.nio.charset.StandardCharsets;

/**
 * @Author jiangxuzhao
 * @Description
 * @Date 2023/6/9
 */
public class OnewayProducer {
    public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("SyncProducer");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        System.out.println("生产者启动成功");

        for(int i=0;i<2;i++){
            Message message = new Message("Oneway", "OnewayTags", (i+"_OnewayProducer").getBytes(StandardCharsets.UTF_8));
            // 单向发送
            producer.send(message);
            System.out.println(i+"_消息发送成功");
        }

        producer.shutdown();

    }
}

消费者输出:

消费者启动成功
0_消息消费成功_0_OnewayProducer
0_消息消费成功_1_OnewayProduce

拉模式

随机获取一个queue的消息

生产者:

随便指定一种发送模式,这里选了异步发送

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;

/**
 * @Author jiangxuzhao
 * @Description
 * @Date 2023/6/11
 */
public class LitePullProducer {
    public static void main(String[] args) throws MQClientException, InterruptedException, RemotingException {
        DefaultMQProducer producer = new DefaultMQProducer("AsyncProducer");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        System.out.println("生产者启动成功");
        CountDownLatch countDownLatch = new CountDownLatch(100);

        for(int i=0;i<100;i++){
            final int index= i;

            Message message = new Message("LitePull", "LitePullTags", (i+"_LitePullProducer").getBytes(StandardCharsets.UTF_8));
            // 异步发送,回调方法由broker调用
            producer.send(message, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.println(index +"_消息发送成功_"+sendResult);
                    countDownLatch.countDown();
                }

                @Override
                public void onException(Throwable throwable) {
                    System.out.println(index +"_消息发送失败_"+throwable.getStackTrace());
                    countDownLatch.countDown();
                }
            });
        }

        // 主线程等待异步发送结束
        countDownLatch.await();
        System.out.println("发送结束");
        producer.shutdown();
    }
}

消费者:

import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * @Author jiangxuzhao
 * @Description 拉模式-随机获取一个queue的消息
 * @Date 2023/6/11
 */
public class LitePullConsumer {
    public static void main(String[] args) throws MQClientException {
        DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("LitePullConsumer");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.subscribe("LitePull","*");
        consumer.start();

        while(true){
            List<MessageExt> messageExts = consumer.poll();
            System.out.println("消息接收成功");
            messageExts.forEach(n->{
                System.out.println("消息消费成功_"+n);
            });
        }

    }
}

消费者输出:

可以看出一次拉出来的消息queueId都是一样的。

消息接收成功
消息接收成功
消息接收成功
消息消费成功_MessageExt [brokerName=broker-a, queueId=2, storeSize=199, queueOffset=0, sysFlag=0, bornTimestamp=1686479335622, bornHost=/127.0.0.1:53451, storeTimestamp=1686479335660, storeHost=/127.0.0.1:10911, msgId=7F00000100002A9F000000000004DB87, commitLogOffset=318343, bodyCRC=44743681, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='LitePull', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=26, UNIQ_KEY=7F5E0001F19018B4AAC23776D8C40001, CLUSTER=DefaultCluster, TAGS=LitePullTags}, body=[54, 95, 76, 105, 116, 101, 80, 117, 108, 108, 80, 114, 111, 100, 117, 99, 101, 114], transactionId='null'}]
消息消费成功_MessageExt [brokerName=broker-a, queueId=2, storeSize=199, queueOffset=1, sysFlag=0, bornTimestamp=1686479335622, bornHost=/127.0.0.1:53451, storeTimestamp=1686479335661, storeHost=/127.0.0.1:10911, msgId=7F00000100002A9F000000000004DEA3, commitLogOffset=319139, bodyCRC=1669641829, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='LitePull', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=26, UNIQ_KEY=7F5E0001F19018B4AAC23776D8C40007, CLUSTER=DefaultCluster, TAGS=LitePullTags}, body=[49, 95, 76, 105, 116, 101, 80, 117, 108, 108, 80, 114, 111, 100, 117, 99, 101, 114], transactionId='null'}]
消息消费成功_MessageExt [brokerName=broker-a, queueId=2, storeSize=200, queueOffset=2, sysFlag=0, bornTimestamp=1686479335637, bornHost=/127.0.0.1:53451, storeTimestamp=1686479335661, storeHost=/127.0.0.1:10911, msgId=7F00000100002A9F000000000004DF6A, commitLogOffset=319338, bodyCRC=1570600805, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='LitePull', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=26, UNIQ_KEY=7F5E0001F19018B4AAC23776D8D5000F, CLUSTER=DefaultCluster, TAGS=LitePullTags}, body=[49, 54, 95, 76, 105, 116, 101, 80, 117, 108, 108, 80, 114, 111, 100, 117, 99, 101, 114], transactionId='null'}]
消息消费成功_MessageExt [brokerName=broker-a, queueId=2, storeSize=200, queueOffset=3, sysFlag=0, bornTimestamp=1686479335637, bornHost=/127.0.0.1:53451, storeTimestamp=1686479335661, storeHost=/127.0.0.1:10911, msgId=7F00000100002A9F000000000004E032, commitLogOffset=319538, bodyCRC=852476292, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='LitePull', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=26, UNIQ_KEY=7F5E0001F19018B4AAC23776D8D50010, CLUSTER=DefaultCluster, TAGS=LitePullTags}, body=[49, 55, 95, 76, 105, 116, 101, 80, 117, 108, 108, 80, 114, 111, 100, 117, 99, 101, 114], transactionId='null'}]
消息消费成功_MessageExt [brokerName=broker-a, queueId=2, storeSize=200, queueOffset=4, sysFlag=0, bornTimestamp=1686479335637, bornHost=/127.0.0.1:53451, storeTimestamp=1686479335663, storeHost=/127.0.0.1:10911, msgId=7F00000100002A9F000000000004E4E1, commitLogOffset=320737, bodyCRC=924559879, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='LitePull', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=26, UNIQ_KEY=7F5E0001F19018B4AAC23776D8D5000B, CLUSTER=DefaultCluster, TAGS=LitePullTags}, body=[49, 53, 95, 76, 105, 116, 101, 80, 117, 108, 108, 80, 114, 111, 100, 117, 99, 101, 114], transactionId='null'}]
消息消费成功_MessageExt [brokerName=broker-a, queueId=2, storeSize=200, queueOffset=5, sysFlag=0, bornTimestamp=1686479335637, bornHost=/127.0.0.1:53451, storeTimestamp=1686479335666, storeHost=/127.0.0.1:10911, msgId=7F00000100002A9F000000000004E801, commitLogOffset=321537, bodyCRC=1481186534, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='LitePull', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=26, UNIQ_KEY=7F5E0001F19018B4AAC23776D8D50017, CLUSTER=DefaultCluster, TAGS=LitePullTags}, body=[49, 52, 95, 76, 105, 116, 101, 80, 117, 108, 108, 80, 114, 111, 100, 117, 99, 101, 114], transactionId='null'}]
消息消费成功_MessageExt [brokerName=broker-a, queueId=2, storeSize=200, queueOffset=6, sysFlag=0, bornTimestamp=1686479335637, bornHost=/127.0.0.1:53451, storeTimestamp=1686479335666, storeHost=/127.0.0.1:10911, msgId=7F00000100002A9F000000000004E991, commitLogOffset=321937, bodyCRC=1743184709, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='LitePull', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=26, UNIQ_KEY=7F5E0001F19018B4AAC23776D8D5001A, CLUSTER=DefaultCluster, TAGS=LitePullTags}, body=[50, 56, 95, 76, 105, 116, 101, 80, 117, 108, 108, 80, 114, 111, 100, 117, 99, 101, 114], transactionId='null'}]
消息消费成功_MessageExt [brokerName=broker-a, queueId=2, storeSize=200, queueOffset=7, sysFlag=0, bornTimestamp=1686479335636, bornHost=/127.0.0.1:53451, storeTimestamp=1686479335667, storeHost=/127.0.0.1:10911, msgId=7F00000100002A9F000000000004EBE9, commitLogOffset=322537, bodyCRC=1446355043, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='LitePull', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=26, UNIQ_KEY=7F5E0001F19018B4AAC23776D8D4000A, CLUSTER=DefaultCluster, TAGS=LitePullTags}, body=[49, 50, 95, 76, 105, 116, 101, 80, 117, 108, 108, 80, 114, 111, 100, 117, 99, 101, 114], transactionId='null'}]
消息消费成功_MessageExt [brokerName=broker-a, queueId=2, storeSize=200, queueOffset=8, sysFlag=0, bornTimestamp=1686479335637, bornHost=/127.0.0.1:53451, storeTimestamp=1686479335667, storeHost=/127.0.0.1:10911, msgId=7F00000100002A9F000000000004F099, commitLogOffset=323737, bodyCRC=535165864, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='LitePull', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=26, UNIQ_KEY=7F5E0001F19018B4AAC23776D8D50014, CLUSTER=DefaultCluster, TAGS=LitePullTags}, body=[50, 49, 95, 76, 105, 116, 101, 80, 117, 108, 108, 80, 114, 111, 100, 117, 99, 101, 114], transactionId='null'}]
消息消费成功_MessageExt [brokerName=broker-a, queueId=2, storeSize=200, queueOffset=9, sysFlag=0, bornTimestamp=1686479335640, bornHost=/127.0.0.1:53451, storeTimestamp=1686479335672, storeHost=/127.0.0.1:10911, msgId=7F00000100002A9F000000000004F869, commitLogOffset=325737, bodyCRC=1182517109, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='LitePull', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=26, UNIQ_KEY=7F5E0001F19018B4AAC23776D8D80023, CLUSTER=DefaultCluster, TAGS=LitePullTags}, body=[51, 55, 95, 76, 105, 116, 101, 80, 117, 108, 108, 80, 114, 111, 100, 117, 99, 101, 114], transactionId='null'}]
消息接收成功
消息消费成功_MessageExt [brokerName=broker-a, queueId=1, storeSize=199, queueOffset=0, sysFlag=0, bornTimestamp=1686479335624, bornHost=/127.0.0.1:53451, storeTimestamp=1686479335655, storeHost=/127.0.0.1:10911, msgId=7F00000100002A9F000000000004D86B, commitLogOffset=317547, bodyCRC=461876872, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='LitePull', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=25, UNIQ_KEY=7F5E0001F19018B4AAC23776D8C80008, CLUSTER=DefaultCluster, TAGS=LitePullTags}, body=[56, 95, 76, 105, 116, 101, 80, 117, 108, 108, 80, 114, 111, 100, 117, 99, 101, 114], transactionId='null'}]
......

指定一个queue的消息

生产者:

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;

/**
 * @Author jiangxuzhao
 * @Description
 * @Date 2023/6/11
 */
public class LitePullProducerAssign {
    public static void main(String[] args) throws MQClientException, InterruptedException, RemotingException {
        DefaultMQProducer producer = new DefaultMQProducer("AsyncProducer");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        System.out.println("生产者启动成功");
        CountDownLatch countDownLatch = new CountDownLatch(100);

        for(int i=0;i<100;i++){
            final int index= i;

            Message message = new Message("LitePullAssign", "LitePullAssignTags", (i+"_LitePullProducerAssign").getBytes(StandardCharsets.UTF_8));
            // 异步发送,回调方法由broker调用
            producer.send(message, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.println(index +"_消息发送成功_"+sendResult);
                    countDownLatch.countDown();
                }

                @Override
                public void onException(Throwable throwable) {
                    System.out.println(index +"_消息发送失败_"+throwable.getStackTrace());
                    countDownLatch.countDown();
                }
            });
        }

        // 主线程等待异步发送结束
        countDownLatch.await();
        System.out.println("发送结束");
        producer.shutdown();
    }
}

消费者:

import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

/**
 * @Author jiangxuzhao
 * @Description 拉模式-指定一个queue的消息
 * @Date 2023/6/11
 */
public class LitePullConsumerAssign {
    public static void main(String[] args) throws MQClientException {
        DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("LitePullConsumerAssign");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.start();
        Collection<MessageQueue> messageQueues = consumer.fetchMessageQueues("LitePullAssign");
        ArrayList<MessageQueue> messageQueues1 = new ArrayList<>(messageQueues);

        consumer.assign(messageQueues1);
        consumer.seek(messageQueues1.get(0),10);
        while(true){
            List<MessageExt> messageExts = consumer.poll();
            System.out.println("消息接收成功");
            messageExts.forEach(n->{
                System.out.println("消息消费成功_"+n);
            });
        }

    }
}

在检验的时候,需要生产者先把消息写到broker里面去,consumer才能拉取到,消费者输出如下:

消息消费成功_MessageExt [brokerName=broker-a, queueId=3, storeSize=218, queueOffset=57, sysFlag=0, bornTimestamp=1686486526339, bornHost=/127.0.0.1:54222, storeTimestamp=1686486526371, storeHost=/127.0.0.1:10911, msgId=7F00000100002A9F000000000005F037, commitLogOffset=389175, bodyCRC=1241947329, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='LitePullAssign', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=72, UNIQ_KEY=7F5E0001F35618B4AAC237E491830029, CLUSTER=DefaultCluster, TAGS=LitePullAssignTags}, body=[52, 52, 95, 76, 105, 116, 101, 80, 117, 108, 108, 80, 114, 111, 100, 117, 99, 101, 114, 65, 115, 115, 105, 103, 110], transactionId='null'}]
消息消费成功_MessageExt [brokerName=broker-a, queueId=3, storeSize=218, queueOffset=58, sysFlag=0, bornTimestamp=1686486526339, bornHost=/127.0.0.1:54222, storeTimestamp=1686486526371, storeHost=/127.0.0.1:10911, msgId=7F00000100002A9F000000000005F111, commitLogOffset=389393, bodyCRC=2065607606, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='LitePullAssign', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=72, UNIQ_KEY=7F5E0001F35618B4AAC237E49183002A, CLUSTER=DefaultCluster, TAGS=LitePullAssignTags}, body=[52, 51, 95, 76, 105, 116, 101, 80, 117, 108, 108, 80, 114, 111, 100, 117, 99, 101, 114, 65, 115, 115, 105, 103, 110], transactionId='null'}]
消息消费成功_MessageExt [brokerName=broker-a, queueId=3, storeSize=218, queueOffset=59, sysFlag=0, bornTimestamp=1686486526339, bornHost=/127.0.0.1:54222, storeTimestamp=1686486526371, storeHost=/127.0.0.1:10911, msgId=7F00000100002A9F000000000005F2C5, commitLogOffset=389829, bodyCRC=1548273985, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='LitePullAssign', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=72, UNIQ_KEY=7F5E0001F35618B4AAC237E491830023, CLUSTER=DefaultCluster, TAGS=LitePullAssignTags}, body=[51, 55, 95, 76, 105, 116, 101, 80, 117, 108, 108, 80, 114, 111, 100, 117, 99, 101, 114, 65, 115, 115, 105, 103, 110], transactionId='null'}]
消息接收成功
消息消费成功_MessageExt [brokerName=broker-a, queueId=3, storeSize=218, queueOffset=60, sysFlag=0, bornTimestamp=1686486526339, bornHost=/127.0.0.1:54222, storeTimestamp=1686486526371, storeHost=/127.0.0.1:10911, msgId=7F00000100002A9F000000000005F39F, commitLogOffset=390047, bodyCRC=322785486, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='LitePullAssign', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=72, UNIQ_KEY=7F5E0001F35618B4AAC237E49183002C, CLUSTER=DefaultCluster, TAGS=LitePullAssignTags}, body=[52, 54, 95, 76, 105, 116, 101, 80, 117, 108, 108, 80, 114, 111, 100, 117, 99, 101, 114, 65, 115, 115, 105, 103, 110], transactionId='null'}]
消息消费成功_MessageExt [brokerName=broker-a, queueId=3, storeSize=218, queueOffset=61, sysFlag=0, bornTimestamp=1686486526339, bornHost=/127.0.0.1:54222, storeTimestamp=1686486526371, storeHost=/127.0.0.1:10911, msgId=7F00000100002A9F000000000005F479, commitLogOffset=390265, bodyCRC=1881692935, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='LitePullAssign', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=72, UNIQ_KEY=7F5E0001F35618B4AAC237E49183002E, CLUSTER=DefaultCluster, TAGS=LitePullAssignTags}, body=[52, 57, 95, 76, 105, 116, 101, 80, 117, 108, 108, 80, 114, 111, 100, 117, 99, 101, 114, 65, 115, 115, 105, 103, 110], transactionId='null'}]

顺序消息

生产者局部有序地发送到一个queue中,但多个queue之间是全局无序的。

生产者:通过MessageQueueSelector将消息有序地发送到同一个queue中。

package order;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.nio.charset.StandardCharsets;
import java.util.List;

/**
 * @Author jiangxuzhao
 * @Description
 * @Date 2023/6/11
 */
public class OrderProducer {
    public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("OrderProducer");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        // i的消息都是按顺序发送的
        for(int i=0;i<5;i++){
            for(int j=0;j<10;j++){
                Message message = new Message("Order","OrderTag",("order_"+i+"_step_"+j).getBytes(StandardCharsets.UTF_8));
                // MessageQueue分配
                SendResult sendResult = producer.send(message, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                        Integer id=(Integer) arg;
                        int index= id%mqs.size();
                        return mqs.get(index); // 返回要发送到的MessageQueue
                    }
                }, i); // 传入的就是arg
                System.out.println("消息发送成功_"+sendResult);
            }
        }
        producer.shutdown();
    }
}

消费者:通过MessageListenerOrderly消费者每次读取消息都只从一个queue中获取。

package order;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.*;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * @Author jiangxuzhao
 * @Description
 * @Date 2023/6/11
 */
public class OrderConsumer {
    public static void main(String[] args) throws MQClientException {
        // 推模式,消费者等到Broker把消息推过来
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PushComsumer");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.subscribe("Order","*");

        // 顺序消费
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                for(int i=0;i<msgs.size();i++){
                    System.out.println(i+"_消息消费成功_"+new String(msgs.get(i).getBody()));
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

        consumer.start();
        System.out.println("消费者启动成功");
    }
}

消费者输出:

外循环都是为3的时候,内循环按照顺序消费

消费者启动成功
0_消息消费成功_order_3_step_0
0_消息消费成功_order_3_step_1
0_消息消费成功_order_3_step_2
0_消息消费成功_order_3_step_3
0_消息消费成功_order_3_step_4
0_消息消费成功_order_3_step_5
0_消息消费成功_order_3_step_6
0_消息消费成功_order_3_step_7
0_消息消费成功_order_3_step_8
0_消息消费成功_order_3_step_9

如果改成并发消费,输出就乱了

消费者启动成功
0_消息消费成功_order_3_step_7
0_消息消费成功_order_3_step_9
0_消息消费成功_order_3_step_4
0_消息消费成功_order_3_step_8
0_消息消费成功_order_3_step_2
0_消息消费成功_order_3_step_5
0_消息消费成功_order_3_step_0
0_消息消费成功_order_3_step_6
0_消息消费成功_order_3_step_1
0_消息消费成功_order_3_step_3

广播消息

广播消息并没有特定的消息消费者样例,这是因为这涉及到消费者的集群消费横式。

  • MessageModel.BROADCASTING:广播消息。一条消息会发给所有订阅了对应主题的消费者,不管消费者是不是同一个消费者组。
  • MessageModel.CLUSTERING:集群消息。每一条消息只会被同一个消费者组中的一个实例消费。

生产者:

选择了同步发送

package broadcast;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.nio.charset.StandardCharsets;

/**
 * @Author jiangxuzhao
 * @Description
 * @Date 2023/6/9
 */
public class BroadcastProducer {
    public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("SyncProducer");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        System.out.println("生产者启动成功");

        for(int i=0;i<2;i++){
            Message message = new Message("Broadcast", "BroadcastTags", (i+"_BroadcastProducer").getBytes(StandardCharsets.UTF_8));
            // 同步发送
            SendResult sendResult = producer.send(message);
            System.out.println(i+"_消息发送成功"+sendResult);
        }

        producer.shutdown();

    }
}

消费者:

MessageModel.BROADCASTING:广播消息

package broadcast;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

import java.util.List;

/**
 * @Author jiangxuzhao
 * @Description
 * @Date 2023/6/9
 */
public class BroadcastConsumer {
    public static void main(String[] args) throws MQClientException {
        // 推模式,消费者等到Broker把消息推过来
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PushComsumer");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.subscribe("Broadcast","*");
        consumer.setMessageModel(MessageModel.BROADCASTING);

        // 并行消费消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for(int i=0;i<list.size();i++){
                    System.out.println(i+"_消息消费成功_"+new String(list.get(i).getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.println("消费者启动成功");
    }
}

我这里开启了两个消费者

BroadcastConsumer输出:

消费者启动成功
0_消息消费成功_3_BroadcastProducer
0_消息消费成功_0_BroadcastProducer
0_消息消费成功_2_BroadcastProducer
0_消息消费成功_1_BroadcastProducer
0_消息消费成功_4_BroadcastProducer
0_消息消费成功_7_BroadcastProducer
0_消息消费成功_5_BroadcastProducer
0_消息消费成功_6_BroadcastProducer
0_消息消费成功_8_BroadcastProducer
0_消息消费成功_9_BroadcastProducer

BroadcastConsumer2输出:

消费者启动成功
0_消息消费成功_0_BroadcastProducer
0_消息消费成功_2_BroadcastProducer
0_消息消费成功_3_BroadcastProducer
0_消息消费成功_1_BroadcastProducer
0_消息消费成功_5_BroadcastProducer
0_消息消费成功_4_BroadcastProducer
0_消息消费成功_6_BroadcastProducer
0_消息消费成功_7_BroadcastProducer
0_消息消费成功_8_BroadcastProducer
0_消息消费成功_9_BroadcastProducer

MessageModel.CLUSTERING:集群消息

消费者改下:

package broadcast;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

import java.util.List;

/**
 * @Author jiangxuzhao
 * @Description
 * @Date 2023/6/9
 */
public class BroadcastConsumer {
    public static void main(String[] args) throws MQClientException {
        // 推模式,消费者等到Broker把消息推过来
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PushComsumer");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.subscribe("Broadcast","*");
        consumer.setMessageModel(MessageModel.CLUSTERING);

        // 并行消费消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for(int i=0;i<list.size();i++){
                    System.out.println(i+"_消息消费成功_"+new String(list.get(i).getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.println("消费者启动成功");
    }
}

我这里开启了两个消费者

BroadcastConsumer输出:

消费者启动成功
0_消息消费成功_0_BroadcastProducer
0_消息消费成功_4_BroadcastProducer
0_消息消费成功_8_BroadcastProducer
0_消息消费成功_3_BroadcastProducer
0_消息消费成功_7_BroadcastProducer

BroadcastConsumer2输出:

消费者启动成功
0_消息消费成功_0_BroadcastProducer
0_消息消费成功_4_BroadcastProducer
0_消息消费成功_8_BroadcastProducer

每个消费者消费了部分

延迟消息

消息在producer.send以后不会立刻发出去,而是会在producer端延迟一段时间

生产者:

package schedule;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.nio.charset.StandardCharsets;
import java.time.LocalTime;

/**
 * @Author jiangxuzhao
 * @Description
 * @Date 2023/6/12
 */
public class ScheduleProducer {
    public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("SyncProducer");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        System.out.println("生产者启动成功");

        for(int i=0;i<2;i++){
            Message message = new Message("Schedule", "ScheduleTags", (i+"_ScheduleProducer").getBytes(StandardCharsets.UTF_8));
            // 延迟发送,单向发送
            message.setDelayTimeLevel(2); // 延迟5s
            producer.send(message);
            System.out.println(i+"_消息发送成功"+ LocalTime.now());
        }

        producer.shutdown();

    }
}

消费者:

package schedule;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.time.LocalTime;
import java.util.List;

/**
 * @Author jiangxuzhao
 * @Description
 * @Date 2023/6/12
 */
public class ScheduleConsumer {
    public static void main(String[] args) throws MQClientException {
        // 推模式,消费者等到Broker把消息推过来
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ScheduleConsumer");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.subscribe("Schedule","*");

        // 并行消费消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for(int i=0;i<list.size();i++){
                    System.out.println(i+"_消息消费成功_"+ LocalTime.now());
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.println("消费者启动成功");
    }
}

假设消费者可以立刻接收到生产者已经发出的消息

生产者输出:

生产者启动成功
0_消息发送成功22:41:44.096
1_消息发送成功22:41:44.102
22:41:44.148 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:9876] result: true
22:41:44.151 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:10911] result: true

消费者输出:

消费者启动成功
0_消息消费成功_22:41:49.165
0_消息消费成功_22:41:49.165

批量消息

批量消息是指将多条消息合井成一个批量消息,一次发送出去。这样的好处是可以减少网络IO,提升吞吐量。

批量消息的使用限制:

  • 消息大小不能超过4M,虽然源码注释不能超1M,但是实际使用不超过4M即可。平衡整体的性能,建议保持1M左右。
  • 相同的Topic
  • 相同的waitStoreMsgOK
  • 不能是延迟消息、事务消息等

生产者:

其实就是producer.send一个ArrayList

package batch;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;

/**
 * @Author jiangxuzhao
 * @Description
 * @Date 2023/6/9
 */
public class BatchProducer {
    public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("SyncProducer");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        System.out.println("生产者启动成功");

        ArrayList<Message> list=new ArrayList<>();
        for(int i=0;i<10;i++){
            Message message = new Message("Batch", "BatchTags", (i+"_BatchProducer").getBytes(StandardCharsets.UTF_8));
            list.add(message);
        }
        // 同步发送,批量发送
        SendResult sendResult=producer.send(list);
        System.out.println("消息发送成功_"+sendResult);
        producer.shutdown();

    }
}

消费者:

package batch;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * @Author jiangxuzhao
 * @Description
 * @Date 2023/6/9
 */
public class BatchConsumer {
    public static void main(String[] args) throws MQClientException {
        // 推模式,消费者等到Broker把消息推过来
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PushComsumer");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.subscribe("Batch","*");

        // 并行消费消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for(int i=0;i<list.size();i++){
                    System.out.println(i+"_消息消费成功_"+new String(list.get(i).getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.println("消费者启动成功");
    }
}

消费者输出:

消费者启动成功
0_消息消费成功_0_BatchProducer
0_消息消费成功_2_BatchProducer
0_消息消费成功_1_BatchProducer
0_消息消费成功_3_BatchProducer
0_消息消费成功_4_BatchProducer
0_消息消费成功_5_BatchProducer
0_消息消费成功_6_BatchProducer
0_消息消费成功_7_BatchProducer
0_消息消费成功_8_BatchProducer
0_消息消费成功_9_BatchProducer

过滤消息

  • 过滤消息是在broker端进行的,broker服务会比较繁忙,consumer是将过滤条件推给broker端的。
  • 只有推模式可以使用SQL过滤

Tag过滤

消费者:

package filter;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * @Author jiangxuzhao
 * @Description 过滤消息消费者-Tag方式
 * @Date 2023/6/13
 */
public class FilterTagConsumer {
    public static void main(String[] args) throws MQClientException {
        // 推模式,消费者等到Broker把消息推过来
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PushComsumer");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        // 订阅指定Tag进行消息过滤
        consumer.subscribe("Filter","TagA || TagC");

        // 并行消费消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for(int i=0;i<list.size();i++){
                    System.out.println(i+"_消息消费成功_"+new String(list.get(i).getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.println("消费者启动成功");
    }
}

生产者:

package filter;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.nio.charset.StandardCharsets;

/**
 * @Author jiangxuzhao
 * @Description 过滤消息生产者-Tag方式
 * @Date 2023/6/13
 */
public class FilterTagProducer {
    public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("SyncProducer");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        System.out.println("生产者启动成功");
        String[] Tags=new String[]{"TagA","TagB","TagC"};

        for(int i=0;i<10;i++){
            // 指定tag
            Message message = new Message("Filter", Tags[i%Tags.length], (Tags[i%Tags.length]+"_FilterTagProducer").getBytes(StandardCharsets.UTF_8));
            // 同步发送
            SendResult sendResult = producer.send(message);
            System.out.println(i+"_消息发送成功"+sendResult);
        }

        producer.shutdown();

    }
}

消费者输出:

接收地异常慢

消费者启动成功
0_消息消费成功_TagA_FilterTagProducer
.....

sql过滤

消费者:

package filter;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * @Author jiangxuzhao
 * @Description 过滤消息消费者-Sql方式
 * @Date 2023/6/13
 */
public class FilterSqlConsumer {
    public static void main(String[] args) throws MQClientException {
        // 推模式,消费者等到Broker把消息推过来
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PushComsumer");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        // MessageSelector.bySql过滤消息
        consumer.subscribe("Filter", MessageSelector.bySql("TAGS is not null and TAGS in ('TagA', 'TagC')"+
                "and jiangxuzhao is not null and jiangxuzhao between 0 and 3"));

        // 并行消费消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for(int i=0;i<list.size();i++){
                    System.out.println(i+"_消息消费成功_"+new String(list.get(i).getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.println("消费者启动成功");
    }
}

生产者:

package filter;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.nio.charset.StandardCharsets;

/**
 * @Author jiangxuzhao
 * @Description 过滤消息生产者-Sql方式
 * @Date 2023/6/13
 */
public class FilterSqlProducer {
    public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("SyncProducer");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        System.out.println("生产者启动成功");
        String[] Tags=new String[]{"TagA","TagB","TagC"};

        for(int i=0;i<10;i++){
            // 指定tag
            Message message = new Message("Filter", Tags[i%Tags.length], (Tags[i%Tags.length]+"_"+i+"_FilterTagProducer").getBytes(StandardCharsets.UTF_8));
            // 在消息中放入过滤属性
            message.putUserProperty("jiangxuzhao",String.valueOf(i));
            // 同步发送
            SendResult sendResult = producer.send(message);
            System.out.println(Tags[i%Tags.length]+"_jiangxuzhao:"+i+"_消息发送成功"+sendResult);
        }

        producer.shutdown();

    }
}

消费者输出:还是很慢

消费者启动成功
0_消息消费成功_TagA_0_FilterTagProducer
0_消息消费成功_TagA_3_FilterTagProducer

事务消息

主要和生产者有关系。

事务消息是在分布式系统中保证最终一致性的两阶段提交的消息实现。他可以保证本地事务执行与消息发送两个操作的原子性,也就是这两个操作一起成功或者一起失败。

在这里插入图片描述

事务消息机制的关键是在发送消息时会将消息转为一个half半消息,井存入RocketMQ内部的一个Topic(RMQ_ SYS_ TRANS_ HALF_ TOPIC),这个Topic对消费者是不可见的。再经过一系列事务检查通过后,再将消息转存到目标Topic,这样对消费者就可见了。

  • 事务消息不支持延迟消息和批量消息。
  • 为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次,但是用户可以通过 Broker 配置文件的transactionCheckMax参数来修改此限制,如果已经检查某条消息超过N次的话 (N = transactionCheckMax)则 Broker将丢弃此消息,井在默认情況下同时打印错误日志。可以通过重写AbstractTransactionCheckListener类来修改这个行为。
  • 事务性消息可能不止一次被检查或消费。

生产者:

package transaction;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @Author jiangxuzhao
 * @Description
 * @Date 2023/6/14
 */
public class TransactionProducer {
    public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
        // 事务消息生产者
        TransactionMQProducer producer = new TransactionMQProducer("Transaction");
        producer.setNamesrvAddr("127.0.0.1:9876");
        // 异步提交事务状态,提升性能
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("thread-jiangxuzhao");
                return thread;
            }
        });
        producer.setExecutorService(pool);
        // 本地事件监听器,与想要发送的Tag挂钩
        producer.setTransactionListener(new TransactionListenerImpl());
        producer.start();
        System.out.println("生产者启动成功");

        String[] Tags=new String[]{"TagA","TagB","TagC","TagD","TagE"};
        for(int i=0;i<10;i++){
            // 指定tag
            Message message = new Message("Transaction", Tags[i%Tags.length], (Tags[i%Tags.length]+"_TransactionProducer").getBytes(StandardCharsets.UTF_8));
            // 事务消息发送
            TransactionSendResult transactionSendResult = producer.sendMessageInTransaction(message, null);
            System.out.println(i+"_消息发送成功"+transactionSendResult);
        }

        // 主线程等待异步的线程回查都结束
        Thread.sleep(100000);

        producer.shutdown();
    }
}

消费者:

package transaction;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * @Author jiangxuzhao
 * @Description
 * @Date 2023/6/14
 */
public class TransactionConsumer {
    public static void main(String[] args) throws MQClientException {
        // 推模式,消费者等到Broker把消息推过来
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PushComsumer");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.subscribe("Transaction","*");

        // 并行消费消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for(int i=0;i<list.size();i++){
                    System.out.println(i+"_消息消费成功_"+new String(list.get(i).getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.println("消费者启动成功");
    }
}

本地事件监听者:

package transaction;

import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

/**
 * @Author jiangxuzhao
 * @Description
 * @Date 2023/6/14
 */
public class TransactionListenerImpl implements TransactionListener {
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        String tag = msg.getTags();
        if (StringUtils.contains("TagA",tag)) {
            return LocalTransactionState.COMMIT_MESSAGE; // 直接提交
        }
        if (StringUtils.contains("TagB",tag)) {
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }else {
            return LocalTransactionState.UNKNOW;
        }
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        String tag = msg.getTags();
        if (StringUtils.contains("TagC",tag)) {
            return LocalTransactionState.COMMIT_MESSAGE; // 直接提交
        }
        if (StringUtils.contains("TagD",tag)) {
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }else {
            return LocalTransactionState.UNKNOW;
        }
    }
}

消费者输出:

TagA、TagC的消息是消费成功的

消费者启动成功
0_消息消费成功_TagA_TransactionProducer
0_消息消费成功_TagA_TransactionProducer
0_消息消费成功_TagC_TransactionProducer
0_消息消费成功_TagC_TransactionProducer
0_消息消费成功_TagC_TransactionProducer

RocketMQ常见问题

RocketMQ如何保证消息不丢失?

在这里插入图片描述
在这里插入图片描述

我们将消息流程分为三大部分,每一部分都有可能会丟失数据。

  • 生产阶段:Producer通过网络将消息发送给Broker,这个发送可能会发生丢失。比如网络延迟不可达等。
  • 存储阶段:Broker肯定是先把消息放到内存的,然后根据刷盘策路持久化到硬盘中,刚收到Producer的消息,放入内存,但是异常宕机了,导致消息丢失。
  • 消费阶段:消费失败。比如先提交ack再消费,消费过程中出现异常,该消息就出现了丢失。

解决方案:

  • 生产阶段:使用同步发送失败重试机制;异步发送重写回调方法检查发送结果;Ack确认机制。
  • 存储阶段:同步刷盘机制;集群模式采用同步复制。
  • 消费阶段:正常消费处理完成才提交ACK;如果处理异常返回重试标识。

RocketMQ的消息持久化机制

RocketMQ的消息持久化机制是指将消息存储在磁盘上,以确保消息能够可靠地存储和检素。RocketMQ 的消息持久化机制涉及到以下三个角色:Commitlog、ConsumeQueue 和 IndexFile.

  • CommitLog:消息真正的存储文件,所有的消息(所有messageQueue的消息)都存在 CommitLog文件中。

RocketMQ默认会将消息数据先存储到内存中的一个缓冲区,每当缓冲区中积累了一定量的消息或者一定时间后,就会将缓冲区中的消息批量写入到磁盘上的CommitLog 文件中(同步刷盘写入cache,继而直接写入磁盘;异步刷盘写入cache,就不管写入磁盘了)。消息在写入 CommitLog 文件后就可以被消费者消费了。

Commitlog文件的大小固定1G,写满之后生成新的文件。

并且采用的是顺序写的方式。

  • ConsumeQueue: 消息消费逻辑队列,类似数据库的索引文件。

RocketMQ 中每个主题下的每个消息队列都会对应一个 ConsumeQueue。 ConsumeQueue存储了消息的offset以及该offset对应的消息在CommitLog文件中的位置信息,便于消费者快速定位井消费消息。

每个ConsumeQueue文件固定由30万个固定大小20byte的数据块组成:据块的内容包括:commitLogOffset(8byte,消息在commitlog文件中的位置)+msgSize(4byte,消息在文件中占用的长度)+msg TagCode(8byte, 消息的tag的Hash值)。

  • IndexFile:消息索引文件,主要存储消息key与offset的对应关系,提升消息检索速度。

如果生产者在发送消息时设置了消息Key,那么RocketMQ会将消息Key值和CommitLog Offset存在IndexFie文件中,这样当消费者需要根据消息Key查询消息时,就可以直接在IndexFile文件中查找对应的CommitLog Offset,然后通过 ConsumeQueue文件快速定位并消费消息。

IndexFile文件大小固定400M,可以保存2000W个索引。

三个角色构成的消息存储结构如下:
在这里插入图片描述

RocketMQ如何保证消息顺序

RocketMQ架构本身是无法保证消息有序的,但是提供了相应的API保证消息有序消费。RocketMQ APl利用FIFO先进先出的特性,保证生产者消息有序进入同一队列,消费者在同一队列消费就能达到消息的有序消费。

  • 使用MessageQueueSelector编写有序消息生产者

有序消息生产者会按照一定的规则将消息发送到同一个队列中,从而保证同一个队列中的消息是有序的。RocketMQ 并不保证整个主题内所有队列的消息都是按照发送顺序排列的。

  • 使用MessageListenerOrderly进行顺序消费与之对应的MessageListenerConcurrently并行消费 (push模式)

MessageListenerOrderly是RocketMQ 专门提供的一种顺序消费的接口,它可以让消费者按照消息发送的顺序,一个一个地处理消息。这个接口支持按照消息的重试次数进行顺序消费、订单ID等作为消息键来实现顺序消费、批量消费等操作。

通过加锁的方式实现(有超时机制),一个队列同时只有一个消费者;井且存在一个定时任务,每隔一段时间就会延长锁的时间,直到整个消息队列全部消费结束。

  • 消费端自己保证消息顺序消费(pull模式)

  • 消费者井发消费时设置消费线程为1

RocketMQ 的消费者可以开启多个消费线程同时消费同一个队列中的消息,如果要保证消息的顺序,需要将消费线程数设置为1。这样,在同一个队列中,每个消息只会被单个消费者线程消费,从而保证消息的顺序性

RocketMQ事务消息原理

RocketMQ 的事务消息是一种保证消息可靠性的机制。在RocketMQ中,事务消息的实现原理主要是通过两个发送阶段和一个确认阶段来实现的。(个人理解就是分布式事务中的两阶段提交的实现)

  • 发送消息的预处理阶段:在发送事务消息之前,RocketMQ 会将消息的状态设置为 "Preparing”,并将消息存储到消息存储库中。
  • 执行本地事务:当预处理阶段完成后,消息发送者需要执行本地事务,并返回执行结果 (commit 或 rollback)
  • 消息的二次确认阶段:根据本地事务的执行结果,如果是commit,则 RocketMQ 将消息的状态设置为 “Committing”:否则将消息的状态设置为 “Rollback”
  • 完成事务:最后在消息的消费者消费该消息时,RocketMQ 会根据消息的状态来决定是否提交该消息。如果消息的状态是“Commiting”,则直接提交该消息;否则
    忽略该消息。

需要注意的是,如果在消息发送的过程中出现异常或者网络故降等问题,RocketMQ 会触发消息回查机制。在回查过程中,RocketMQ 会调用消息发送方提供的回查接口来确认事务的提交状态,从而解决消息投递的不确定性。
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/648487.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Day07 Python函数详解

文章目录 第四章 Python函数使用4.1. 函数介绍4.2. 函数的定义与使用4.2.1. 函数的定义4.2.2. 调用 4.3. 函数的参数4.4. 函数的返回值4.4.1. 返回值介绍4.4.2. None类型 4.5. 函数说明4.5.1. 函数注释4.5.2. 函数的4中定义方式4.5.3. 函数的调用 4.6. 函数的嵌套调用4.7. 函数…

哪款 IMG BXS GPU 适合您的汽车?

Imagination 是汽车行业领先的图形处理器供应商。Imagination 的 GPU IP 经过了九代更新迭代&#xff0c;为车辆提供了舒适性和安全性&#xff0c;在汽车行业的总出货量接近 5 亿。通过将响应迅速的 HMI&#xff08;人机界面&#xff09;与功能日益强大的高级驾驶员辅助系统相结…

Linux:主机状态监控

查看系统的资源占用 可以通过top命令&#xff0c;查看系统CPU、内存使用情况 top命令内容详解&#xff1a; 第一行&#xff1a;top&#xff1a;命令名称&#xff0c;10.49.16&#xff1a;当前系统时间&#xff0c;up 4:40&#xff1a;启动了4小时40分&#xff0c;4 users&#…

Aspose.Pdf使用教程:为PDF文件添加swf注释

Aspose.PDF 是一款高级PDF处理API&#xff0c;可以在跨平台应用程序中轻松生成&#xff0c;修改&#xff0c;转换&#xff0c;呈现&#xff0c;保护和打印文档。无需使用Adobe Acrobat。此外&#xff0c;API提供压缩选项&#xff0c;表创建和处理&#xff0c;图形和图像功能&am…

STL之priority_queue与仿函数

目录 一.仿函数1.介绍2.示例 二.priority_queue1.介绍2.成员函数3.模拟实现4.使用 三.其他1.typename Container::value_type 一.仿函数 1.介绍 函数对象&#xff0c;又称仿函数&#xff0c;是可以像函数一样使用的对象&#xff0c;其原理就是重载了函数调用符&#xff1a;()…

浅谈数据中台之标签管理平台

在现如今的大数据时代&#xff0c;相信大家一定了解或者听说过下列几个场景&#xff1a; 购物APP&#xff1a;千人千面&#xff0c;意思不同用户使用相关的产品感觉是不一样的&#xff0c;不同用户看到的购物APP首页推荐内容和其他相关推荐流信息可能是完全不同的。 社交APP&…

实例:使用网络分析仪进行电缆测试

本应用测试针对非标称50Ω的线缆&#xff0c;包括同轴、双绞线、差分高速数据线的测试&#xff0c;包括阻抗参数、S参数&#xff08;插损、驻波、Smith图等等&#xff09;&#xff0c;也可以绘制眼图。 根据电缆的性能&#xff0c;如频率范围、长度、是否差分&#xff0c;设置…

Linux:root用户

root用户对Linux系统拥有最大的操作权限。 普通用户的权限一般都在home目录下&#xff0c;超过home目录后&#xff0c;普通用户在很多地方只有只读和执行的权限&#xff0c;但没有修改权限。 1、su命令&#xff1a;切换到root用户的命令语法&#xff1a; su -root “-”符号是可…

精密空调监控:不会这个技巧,千万不要尝试

随着科技的不断进步和信息化的发展&#xff0c;精密空调设备被广泛应用于数据中心、通信基站、医疗设施、实验室等对温度和湿度要求严格的环境中&#xff0c;以保证设备的正常运行和数据的安全性。 借助动环监控系统&#xff0c;精密空调可以实时了解设备的运行状态、温湿度的变…

数据库迁移 | Oracle数据迁移方案之技术两三点

今年Oracle似乎又火了&#xff0c;火得要下掉&#xff0c;目前中国大概有240数据库企业&#xff0c;在国产信创的大趋势下&#xff0c;一片欣欣向荣&#xff0c;国库之春已然来临。到今天为止&#xff0c;Oracle依旧是市场份额最大的数据库&#xff0c;天下苦秦久矣&#xff0c…

关于使用keil瑞萨RA4M2踩过的坑

一、之前在rasc添加的组件不能删除。 下面在rasc添加ThreadX&#xff0c;不只是RTOS&#xff0c;其他组件也出现这种情况。 当去掉组件不使用&#xff0c;重新配置。但是组件还是显示在软件包&#xff0c;导致编译出错。 解决方式&#xff0c;自己琢磨发现&#xff1a; 找到工…

腾讯视频技术团队偷懒了?!

&#x1f449;腾小云导读 PC Web 端、手机 H5 端、小程序端、App 安卓端、App iOS 端......在多端时代&#xff0c;一个应用往往需要支持多端。若每个端都独立开发一套系统来支持&#xff0c;将消耗巨大的人力和经费&#xff01;腾讯视频团队想到一个“偷懒”的方法——能不能只…

共建智慧工厂物联网平台方案 | 6.10 IoTDB X EMQ 主题 Meetup 回顾

6 月 10 日&#xff0c;IoTDB X EMQ 智慧工厂主题 Meetup 在深圳成功举办。工业物联网时序数据库研发商天谋科技、物联网数据基础设施软件供应商 EMQ 的两位技术大牛&#xff0c;针对多行业制造流程中数据传输、故障感知、决策执行等常见难题&#xff0c;通过数据基础设施平台的…

更智能、更强大:OpenAI发布升级版gpt-3.5-turbo-0613/16k速度提升,长度飙升4倍

OpenAI开发者平台最近推出了两个引人注目的GPT升级版本&#xff1a;gpt-3.5-turbo-0613和gpt-3.5-turbo-16k。这些新版本带来了一系列令人兴奋的功能和增强&#xff0c;为开发者提供了更加灵活和强大的自然语言处理工具。本文将为您介绍这两个版本的主要特点和优势。 gpt-3.5-t…

vue使用外部字体自定义LCD字体(晶管体)

大屏监控中常用到液晶字体效果&#xff0c;如下图所示&#xff1a; 一、下载字体格式 1、下载地址【Techno > LCD fonts | dafont.com】 二、解压字体 1、下载后&#xff0c;解压后都是.ttf文件&#xff0c;在Font Squirrel &#xff08;这个地址打开&#xff0c;直接可以…

Multi-headed Self-attention(多头自注意力)机制介绍

对于输入的序列 来说&#xff0c;与RNN/LSTM的处理过程不同&#xff0c;Self-attention机制能够并行对进行计算&#xff0c;这大大提高了对特征进行提取&#xff08;即获得&#xff09;的速度。结合上述Self-attention的计算过程&#xff0c;并行计算的原理如下图所示&#xff…

储存卡格式化,分享3个正确方法!

Dam是个摄影师&#xff0c;经常使用储存卡存储各种照片、视频。正好他明天又要出外景&#xff0c;但害怕内存不够&#xff0c;想把储存卡格式化&#xff0c;又担心自己操作失误。因此求助如何正确格式化储存卡。 储存卡为我们存储文件等带来了诸多便利。有时候&#xff0c;我们…

无人机上仅使用CPU实时运行Yolov5(OpenVINO实现)(下篇)

​上期中我们讲了Yolov5的前两节环境配置及简单运行&#xff0c;在本期中我们带来后面两节在不同处理器下的实验数据及如何训练自己的模型。​ 三、在不同处理器上的延迟与效果 为了查看Yolov5在不同设备上的延迟与效果&#xff0c;下面我们对Inter的i3、i5、i7三种处理器在同…

「深度学习之优化算法」笔记(三)之粒子群算法

1. 粒子群算法简介 粒子群算法&#xff08;Particle Swarm Optimization,PSO&#xff09;是一种模仿鸟群、鱼群觅食行为发展起来的一种进化算法。其概念简单易于编程实现且运行效率高、参数相对较少&#xff0c;应用非常广泛。粒子群算法于1995年提出&#xff0c;距今&#xff…

新世界-旧世界

以下内容是这两天朋友问答形成的一些观点&#xff0c;堆成一篇文章。看似没有关联性&#xff0c;但你仔细品味&#xff0c;你会感觉到它们其实讲的是一个事。至于是一个啥事&#xff0c;我不说&#xff0c;你们自己猜。 &#xff08;1&#xff09; 今年年初看见篇文章&#xff…