RocketMQ详解+实战+常见面试题

news2025/1/12 10:53:43

目录

RocketMQ是什么?

RocketMQ的主要特点包括

RocketMQ 的消息模型 

队列模型/点对点

发布/订阅模型

RocketMQ 的消息模型

RocketMQ中重要的组件

RocketMQ消费模式

RoctetMQ基本架构

springboot实战+讲解

基本样例

单向发送

异步发送

同步发送

推模式

拉模式-随机获取一个queue

拉模式-指定获取一个queue

顺序消息

顺序消息生产者

顺序消息消费者

广播消息

广播消息消费模式

延迟消息

预定日程生产者

预定日程消费者

批量消息

批量消息

分批发送批量消息

过滤消息

过滤消息-tag过滤消费者

过滤消息-SQL过滤生产者

过滤消息-SQL过滤消费者

事务消息

事务消息生产者

本地事务监听器

RocketMQ使用中常见的问题

RocketMQ如何保证消息顺序

RocketMQ的事务消息原理


RocketMQ是什么?

RocketMQ是一种开源的分布式消息中间件系统,最初由阿里巴巴开发并开源。它是一种快速、可靠、可伸缩的消息队列系统,旨在解决分布式系统中的异步通信和数据传输问题。

RocketMQ的主要特点包括

1. 高吞吐量:RocketMQ具有出色的性能表现,能够处理大规模消息的传输和处理。它通过支持并行化和异步传输等机制来实现高吞吐量。

2. 可靠性:RocketMQ在消息传输过程中提供了可靠性保证。它采用了主从复制的方式来确保消息的高可用性,并提供了严格的消息顺序传输保证。

3. 分布式支持:RocketMQ是为分布式系统设计的,它可以在多个节点上进行部署,并支持横向扩展。它提供了自动的负载均衡和故障转移机制,以实现高可用性和可伸缩性。

4. 灵活的消息模型:RocketMQ支持多种消息模型,包括发布/订阅模型和点对点模型。这使得开发人员可以根据实际需求选择适合的消息传输方式。

5. 实时性:RocketMQ具有低延迟和高实时性的特点,适用于需要快速响应的场景,如实时数据分析、流式处理等。

6. 可管理性:RocketMQ提供了可视化的管理工具和监控指标,方便管理员对消息队列进行配置、监控和管理。

7. 丰富的特性:RocketMQ还提供了许多其他功能,如事务消息、延迟消息、批量消息发送、消息过滤等,以满足不同场景下的需求。

总的来说,RocketMQ是一个功能强大的消息中间件系统,具备高吞吐量、可靠性、分布式支持和灵活的消息模型等特点,适用于构建可靠的分布式系统和实时数据处理系统。

RocketMQ 的消息模型 

RocketMQ的消息模型是基于发布/订阅模式和点对点模式的混合模型。

消息队列有两种消息模型分别是队列模型(点对点)和发布/订阅模型。

队列模型/点对点

队列模型是最开始的一种消息队列模型,对应着消息队列“发-存-收”的模型。生产者往某个队列里面发送消息,一个队列可以存储多个生产者的消息,一个队列也可以有多个消费者,但是消费者之间是竞争关系,也就是说每条消息只能被一个消费者消费。

发布/订阅模型

如果需要将一份消息数据分发给多个消费者,并且每个消费者都要求收到全量的消息。很显然,队列模型无法满足这个需求。于是,发布/订阅模型就诞生了。

在发布-订阅模型中,消息的发送方称为发布者(Publisher),消息的接收方称为订阅者(Subscriber),服务端存放消息的容器称为主题(Topic)。发布者将消息发送到主题中,订阅者在接收消息之前需要先“订阅主题”。“订阅”在这里既是一个动作,同时还可以认为是主题在消费时的一个逻辑副本,每份订阅中,订阅者都可以接收到主题的所有消息。

RocketMQ 的消息模型

RocketMQ中重要的组件

  1. Producer(生产者):Producer负责将消息发送到RocketMQ的Broker,它将消息封装成消息对象并指定消息的主题(Topic)。生产者可以根据需要选择同步发送或异步发送消息。
  2. Consumer(消费者):Consumer从RocketMQ的Broker订阅消息,并对消息进行消费处理。消费者可以按照订阅的主题(Topic)和标签(Tag)来过滤需要的消息。
  3. Broker(消息代理):Broker是RocketMQ的核心组件,负责存储和转发消息。Broker接收生产者发送的消息,并将其存储在内部的存储引擎中,等待消费者订阅并消费。Broker还负责处理消费者的消费进度和消息的复制和同步。
  4. Name Server(命名服务器):Name Server是RocketMQ集群的管理节点,它负责管理和维护整个RocketMQ集群的元数据信息。Producer和Consumer通过Name Server来发现Broker的位置和状态信息。
  5. Topic(主题):主题是消息的逻辑分类,Producer将消息发送到特定的主题,而Consumer可以订阅并消费特定的主题。一个主题可以有多个消息队列,每个消息队列在一个Broker上。
  6. Message Queue(消息队列):消息队列是RocketMQ中消息的存储和传输单位。每个主题可以被分为多个消息队列,每个消息队列在一个Broker上,可以并行地处理消息。
  7. Consumer Group(消费者组):消费者组是一组具有相同消费逻辑的Consumer实例的集合。在一个消费者组中,每个消息队列只能由一个Consumer实例进行消费,但一个Consumer组可以有多个Consumer实例,从而实现负载均衡和高可用性。
  8. Offset:在Topic的消费过程中,由于消息需要被不同的组进行多次消费,所以消费完的消息并不会立即被删除,这就需要RocketMQ为每个消费组在每个队列上维护一个消费位置(Consumer Offset),这个位置之前的消息都被消费过,之后的消息都没有被消费过,每成功消费一条消息,消费位置就加一。也可以这么说,Queue 是一个长度无限的数组,Offset 就是下标。

RocketMQ消费模式

消费模式有两种:Clustering(集群消费)和Broadcasting(广播消费)。

默认情况下就是集群消费,这种模式下一个消费者组共同消费一个主题的多个队列,一个队列只会被一个消费者消费,如果某个消费者挂掉,分组内其它消费者会接替挂掉的消费者继续消费。

而广播消费消息会发给消费者组中的每一个消费者进行消费。

RoctetMQ基本架构

 RocketMQ 一共有四个部分组成:NameServer,Broker,Producer 生产者,Consumer 消费者,它们对应了:发现、发、存、收,为了保证高可用,一般每一部分都是集群部署的。

springboot实战+讲解

实战之前,我们需要先搭建一个基于Maven的springboot项目,只需要加入以下依赖:

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.9.6</version>
        </dependency>

基本样例

消息生产者分别通过三种方式发送消息:

  • 同步发送:等待消息返回后再继续进行下面的操作。
  • 异步发送:不等待消息返回直接进入后续流程。broker将结果返回后调用callback函数,并使用CountDownLatch计数。
  • 单向发送:只负责发送,不管消息是否发送成功。

单向发送

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.nio.charset.StandardCharsets;

/**
* 单向发送
*/
public class OnewayProducer {
    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("AsyncProducer");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        for (int i = 0; i < 10; i++) {
            Message message = new Message("Simple","TagA", "Simple-Oneway".getBytes(StandardCharsets.UTF_8));
            producer.sendOneway(message);
            System.out.printf("%d 消息发送完成 %n" , i);
        }
        Thread.sleep(5000);
        producer.shutdown();
    }
}

异步发送

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
* 异步发送
*/
public class AsyncProducer {
    public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("AsyncProducer");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        CountDownLatch countDownLatch = new CountDownLatch(100);//计数
        for (int i = 0; i < 10; i++) {
            Message message = new Message("Simple", "TagA", "Simple-Async".getBytes(StandardCharsets.UTF_8));
            final int index = i;
            producer.send(message, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    countDownLatch.countDown();
                    System.out.printf("%d 消息发送成功%s%n", index, sendResult);
                }

                @Override
                public void onException(Throwable throwable) {
                    countDownLatch.countDown();
                    System.out.printf("%d 消息失败%s%n", index, throwable);
                    throwable.printStackTrace();
                }
            }
                         );
        }
        countDownLatch.await(5, TimeUnit.SECONDS);
        producer.shutdown();
    }
}

同步发送

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.nio.charset.StandardCharsets;

/**
 * 同步发送
 */
public class SyncProducer {
    public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("SyncProducer");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        for (int i = 0; i < 2; i++) {
            Message msg = new Message("Simple", //主题
                    "TagA",  //设置消息Tag,用于消费端根据指定Tag过滤消息。
                    "Simple-Sync".getBytes(StandardCharsets.UTF_8) //消息体。
            );
            SendResult send = producer.send(msg);
            System.out.printf(i + ".发送消息成功:%s%n", send);
        }
        producer.shutdown();
    }
}

消费者消费消息分两种:

  • 拉模式:消费者主动去Broker上拉取消息。
  • 推模式:消费者等待Broker把消息推送过来。

推模式

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/**
* 推模式
*/
public class Consumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer("SimplePushConsumer");
        pushConsumer.setNamesrvAddr("127.0.0.1:9876");
        pushConsumer.subscribe("Simple","*");
        pushConsumer.setMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                list.forEach( n->{
                    System.out.printf("收到消息: %s%n" , n);
                });
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        pushConsumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

拉模式-随机获取一个queue

import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * 拉模式-随机获取一个queue
 */
public class PullLiteConsumer {
    public static void main(String[] args) throws MQClientException {
        DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("SimpleLitePullConsumer");
        litePullConsumer.setNamesrvAddr("127.0.0.1:9876");
        litePullConsumer.subscribe("Simple","");
        litePullConsumer.start();
        while (true) {
            List<MessageExt> poll = litePullConsumer.poll();
            System.out.printf("消息拉取成功 %s%n" , poll);
        }
    }
}

拉模式-指定获取一个queue

public class PullLiteConsumerAssign {
    public static void main(String[] args) throws Exception {
        DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("SimpleLitePullConsumer");
        litePullConsumer.setNamesrvAddr("127.0.0.1:9876");
        litePullConsumer.start();
        Collection<MessageQueue> messageQueues = litePullConsumer.fetchMessageQueues("Simple");
        List<MessageQueue> list = new ArrayList<>(messageQueues);
        litePullConsumer.assign(list);
        litePullConsumer.seek(list.get(0), 10);
        try {
            while (true) {
                List<MessageExt> messageExts = litePullConsumer.poll();
                System.out.printf("%s %n", messageExts);
            }
        } finally {
            litePullConsumer.shutdown();
        }
    }
}

顺序消息

顺序消息指生产者局部有序发送到一个queue,但多个queue之间是全局无序的。

  • 顺序消息生产者样例:通过MessageQueueSelector将消息有序发送到同一个queue中。
  • 顺序消息消费者样例:通过MessageListenerOrderly消费者每次读取消息都只从一个queue中获取(通过加锁的方式实现)。

顺序消息生产者

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.nio.charset.StandardCharsets;
import java.util.List;

/**
 * 顺序消息生产者
 * 通过MessageQueueSelector将消息有序发送到同一个queue中。
 */
public class OrderProducer {
    public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("OrderProducer");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        for (int j = 0; j < 5; j++) {
            for (int i = 0; i < 10; i++) {
                Message message = new Message("OrderTopic", "TagA",
                        ("order_" + j + "_step_" + i).getBytes(StandardCharsets.UTF_8));
                SendResult sendResult = producer.send(message, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
                        Integer id = (Integer) o;
                        int index = id % list.size();
                        return list.get(index);
                    }
                }, j,30000);
                System.out.printf("%s%n", sendResult);
            }
        }
        producer.shutdown();
    }
}

顺序消息消费者

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * 顺序消息消费者
 * 通过MessageListenerOrderly消费者每次读取消息都只从一个queue中获取(通过加锁的方式实现)。
 */
public class OrderConsumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("OrderConsumer");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.subscribe("OrderTopic","*");
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
                list.forEach(n->{
                    System.out.println("QueueId:"+n.getQueueId() + "收到消息内容 "+new String(n.getBody()));
                });
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

广播消息

广播消息并没有特定的消息消费者样例,这是因为这涉及到消费者的集群消费模式。

  • MessageModel.BROADCASTING:广播消息。一条消息会发给所有订阅了对应主题的消费者,不管消费者是不是同一个消费者组。
  • MessageModel.CLUSTERING:集群消息。每一条消息只会被同一个消费者组中的一个实例消费。

广播消息消费模式

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

import java.util.List;

/**
 * 广播消息消费模式
 * 广播消息并没有特定的消息消费者样例,这是因为这涉及到消费者的集群消费模式。
 * ● MessageModel.BROADCASTING:广播消息。一条消息会发给所有订阅了对应主题的消费者,不管消费者是不是同一个消费者组。
 * ● MessageModel.CLUSTERING:集群消息。每一条消息只会被同一个消费者组中的一个实例消费。
 */
public class BroadcastConsumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("BroadCastConsumer");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.subscribe("Simple","*");
        consumer.setMessageModel(MessageModel.BROADCASTING); //广播模式
//        consumer.setMessageModel(MessageModel.CLUSTERING);//集群模式
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                list.forEach(n->{
                    System.out.println("QueueId:"+n.getQueueId() + "收到消息内容 "+new String(n.getBody()));
                });
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Broadcast Consumer Started.%n");
    }
}

延迟消息

延迟消息实现的效果就是在调用producer.send方法后,消息并不会立即发送出去,而是会等一段时间再发送出去。这是RocketMQ特有的一个功能。

  • message.setDelayTimeLevel(3):预定日常定时发送。1到18分别对应messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h;可以在dashboard中broker配置查看。
  • msg.setDelayTimeMs(10L):指定时间定时发送。默认支持最大延迟时间为3天,可以根据broker配置:timerMaxDelaySec修改。(5.0才有,这里不做展示了)

预定日程生产者

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.nio.charset.StandardCharsets;
import java.time.LocalTime;

/**
 * 预定日程定时发送
 */
public class ScheduleProducer {
    public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("ScheduleProducer");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        for (int i = 0; i < 2; i++) {
            Message msg = new Message("Schedule", //主题
                    "TagA",  //设置消息Tag,用于消费端根据指定Tag过滤消息。
                    "ScheduleProducer".getBytes(StandardCharsets.UTF_8) //消息体。
            );
            //1到18分别对应messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
            msg.setDelayTimeLevel(3);
            producer.send(msg,30000);
            System.out.printf(i + ".发送消息成功:%s%n", LocalTime.now());
        }
        producer.shutdown();
    }
}

预定日程消费者

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.List;

/**
 * 预定日程消费者
 */
public class ScheduleConsumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer("SimplePushConsumer");
        pushConsumer.setNamesrvAddr("127.0.0.1:9876");
        pushConsumer.subscribe("Schedule","*");
        pushConsumer.setMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                list.forEach( n->{
                    System.out.printf("接收时间:%s %n", LocalTime.now());
                });
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        pushConsumer.start();
        System.out.printf("Simple Consumer Started.%n");
    }
}

批量消息

批量消息是指将多条消息合并成一个批量消息,一次发送出去。这样的好处是可以减少网络IO,提升吞吐量。

批量消息的使用限制:

  • 消息大小不能超过4M,虽然源码注释不能超1M,但是实际使用不超过4M即可。平衡整体的性能,建议保持1M左右。
  • 相同的Topic,
  • 相同的waitStoreMsgOK
  • 不能是延迟消息、事务消息等

批量消息

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;

/**
 * 批量发送消息
 */
public class BatchProducer {
    public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("BatchProducer");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        ArrayList<Message> list = new ArrayList<>();
        list.add(new Message("simple","TagA", "BatchProducer0".getBytes(StandardCharsets.UTF_8)));
        list.add(new Message("simple","TagA", "BatchProducer1".getBytes(StandardCharsets.UTF_8)));
        list.add(new Message("simple","TagA", "BatchProducer2".getBytes(StandardCharsets.UTF_8)));
        SendResult send = producer.send(list);
        System.out.printf(".发送消息成功:%s%n", send);
        producer.shutdown();
    }
}

分批发送批量消息

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/**
 * 分批批量发送消息
 * 注意修改SIZE_LIMIT为 = 10 * 1000,不然发送消息时会提示消息体积过大
 */
public class SplitBatchProducer {
    public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("SplitBatchProducer");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        ArrayList<Message> list = new ArrayList<>();
        for (int i = 0; i < 10000; i++) {
            list.add(new Message("simple","TagA", ("SplitBatchProducer"+i).getBytes(StandardCharsets.UTF_8)));
        }
        ListSplitter splitter = new ListSplitter(list);
        while (splitter.hasNext()) {
            List<Message> listItem = splitter.next();
            SendResult sendResult = producer.send(listItem,30000);
            System.out.printf(".发送消息成功:%s%n", sendResult);
        }
        producer.shutdown();
    }
}

class ListSplitter implements Iterator<List<Message>> {
    private static final int SIZE_LIMIT = 10 * 1000;  // 每个消息批次的最大大小
    private final List<Message> messages;	// 待发送的消息列表
    private int currIndex; // 当前拆分到的位置

    public ListSplitter(List<Message> messages) {
        this.messages = messages;
    }

    @Override
    public boolean hasNext() {
        return currIndex < messages.size();
    }

    @Override
    public List<Message> next() {
        int nextIndex = currIndex;
        int totalSize = 0;
        for (; nextIndex < messages.size(); nextIndex++) {
            Message message = messages.get(nextIndex);
            int tmpSize = message.getTopic().length() + message.getBody().length;
            Map<String, String> properties = message.getProperties();
            for (Map.Entry<String, String> entry : properties.entrySet()) {
                tmpSize += entry.getKey().length() + entry.getValue().length();
            }
            tmpSize = tmpSize + 20;
            // 如果超过了单个批次所允许的大小,就将此消息之前的消息作为下一个子列表返回
            if (tmpSize > SIZE_LIMIT) {
                // 如果是第一条消息就超出大小限制,就跳过这条消息再继续扫描
                if (nextIndex - currIndex == 0) {
                    nextIndex++;
                }
                break;
            }
            // 如果当前子列表大小已经超出所允许的单个批次大小,那么就暂停添加消息
            if (tmpSize + totalSize > SIZE_LIMIT) {
                break;
            } else {
                totalSize += tmpSize;
            }
        }
        // 返回从currIndex到nextIndex之间的所有消息
        List<Message> subList = messages.subList(currIndex, nextIndex);
        currIndex = nextIndex;
        return subList;
    }
}

过滤消息

使用Tag方式过滤(通过consumer.subscribe("TagFilterTest", "TagA || TagC")实现):

在大多数情况下,可以使用Message的Tag属性来简单快速的过滤信息。

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.nio.charset.StandardCharsets;

/**
 * 过滤消息-tag过滤生产者
 */
public class TagFilterProducer {
    public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("SyncProducer");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        String[] tags = new String[] {"TagA","TagB","TagC"};
        for (int i = 0; i < 15; i++) {
            Message msg = new Message("FilterTopic", //主题
                    tags[i % tags.length],  //设置消息Tag,用于消费端根据指定Tag过滤消息。
                    ("TagFilterProducer_"+tags[i % tags.length]).getBytes(StandardCharsets.UTF_8) //消息体。
            );
            SendResult send = producer.send(msg);
            System.out.printf(i + ".发送消息成功:%s%n", send);
        }
        producer.shutdown();
    }
}

过滤消息-tag过滤消费者

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * 过滤消息-tag过滤消费者
 */
public class TagFilterConsumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer("SimplePushConsumer");
        pushConsumer.setNamesrvAddr("127.0.0.1:9876");
        pushConsumer.subscribe("FilterTopic","TagA || TagC");
        pushConsumer.setMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                list.forEach( n->{
                    System.out.printf("收到消息: %s%n" , new String(n.getBody()));
                });
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        pushConsumer.start();
        System.out.printf("TagFilter Consumer Started.%n");
    }
}

使用Sql方式过滤(通过MessageSelector.bySql(String sql)参数实现):

Tag是RocketMQ中特有的一个消息属性。

RocketMQ的最佳实践中就建议使用RocketMQ时,一个应用可以就用一个Topic,而应用中的不同业务就用Tag来区分。

Tag方式有一个很大的限制,就是一个消息只能有一个Tag,这在一些比较复杂的场景就有点不足了。 这时候可以使用SQL表达式来对消息进行过滤。

过滤消息-SQL过滤生产者

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.nio.charset.StandardCharsets;

/**
* 过滤消息-SQL过滤生产者
*/
public class SqlFilterProducer {
    public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("SyncProducer");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        String[] tags = new String[] {"TagA","TagB","TagC"};
        for (int i = 0; i < 15; i++) {
            Message msg = new Message("FilterTopic", //主题
                                      tags[i % tags.length],  //设置消息Tag,用于消费端根据指定Tag过滤消息。
                                      ("TagFilterProducer_"+tags[i % tags.length] +  "_i_" + i).getBytes(StandardCharsets.UTF_8) //消息体。
                                     );
            msg.putUserProperty("lasse", String.valueOf(i));
            SendResult send = producer.send(msg);
            System.out.printf(i + ".发送消息成功:%s%n", send);
        }
        producer.shutdown();
    }
}

过滤消息-SQL过滤消费者

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * 过滤消息-SQL过滤消费者
 */
public class SqlFilterConsumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer("SimplePushConsumer");
        pushConsumer.setNamesrvAddr("127.0.0.1:9876");
        pushConsumer.subscribe("FilterTopic", MessageSelector.bySql("(TAGS is not null And TAGS IN ('TagA','TagC'))"
        + "and (lasse is not null and lasse between 0 and 3)"));
        pushConsumer.setMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                list.forEach( n->{
                    System.out.printf("收到消息: %s%n" , new String(n.getBody()));
                });
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        pushConsumer.start();
        System.out.printf("SqlFilter Consumer Started.%n");
    }
}

SQL92语法:

RocketMQ只定义了一些基本语法来支持这个特性。我们可以很容易地扩展它。

  • 数值比较,比如:>,>=,<,<=,BETWEEN,=;
  • 字符比较,比如:=,<>,IN;
  • IS NULL ,IS NOT NULL;
  • 逻辑符号 AND,OR,NOT;

常量支持类型为:

  • 数值,比如:123,3.1415;
  • 字符,比如:'abc',必须用单引号包裹起来;
  • NULL,特殊的常量
  • 布尔值,TRUE 或 FALSE

使用注意:

  • 只有推模式的消费者可以使用SQL过滤。拉模式是用不了的;
  • 另外消息过滤是在Broker端进行的,提升网络传输性能,但是broker服务会比较繁忙。(consumer将过滤条件推送给broker端)

事务消息

事务消息生产者

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.common.message.Message;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.*;

/**
 * 事务消息生产者
 */
public class TransactionProducer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        TransactionMQProducer producer = new TransactionMQProducer("TransProducer");
        producer.setNamesrvAddr("127.0.0.1:9876");
        //使用executorService异步提交事务状态,从而提高系统的性能和可靠性
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-msg-check-thread");
                return thread;
            }
        });
        producer.setExecutorService(executorService);

        //本地事务监听器
        TransactionListener transactionListener = new TransactionListenerImpl();
        producer.setTransactionListener(transactionListener);

        producer.start();//启动消息生产者同时半事务发送1.开启事务
        try {
            Message message = new Message("TransactionTopic", null,
                    ("A向B系统转100块钱").getBytes(StandardCharsets.UTF_8));
            TransactionSendResult result = producer.sendMessageInTransaction(message, null);
            System.out.printf("%s%n", result.getSendStatus());//半事务消息是否成功
        } catch (Exception e) {
                //回滚操作
        }


        Thread.sleep(100000);//等待broker端回调
        producer.shutdown();
    }
}

本地事务监听器

import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * 本地事务监听器
 */
public class TransactionListenerImpl implements TransactionListener {

    @Override
    /**
     * 在提交完事务消息后执行。
     * 返回COMMIT_MESSAGE状态的消息会立即被消费者消费到。
     * 返回ROLLBACK_MESSAGE状态的消息会被丢弃。
     * 返回UNKNOWN状态的消息会由Broker过一段时间再来回查事务的状态。
     */
    public LocalTransactionState executeLocalTransaction(Message message, Object o) {
        //一端代码疯狂操作
        switch (0) {
            case 0:
                //情况一:本地事务成功
                return LocalTransactionState.COMMIT_MESSAGE;
            case 1:
                //情况二:本地事务失败
                return LocalTransactionState.ROLLBACK_MESSAGE;
            default:
                //情况三:业务复杂,中间过程或者依赖其他操作的返回结果,丢进事务回查(checkLocalTransaction方法↓)
                return LocalTransactionState.UNKNOW;
        }

    }

    //事务回查,默认是60s检查一次
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
        //打印每次回查时间
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println(sdf.format(new Date()));
        //一端代码疯狂操作
        switch (0) {
            case 0:
                //情况一:本地事务成功
                return LocalTransactionState.COMMIT_MESSAGE;
            case 1:
                //情况二:本地事务失败
                return LocalTransactionState.ROLLBACK_MESSAGE;
            default:
                //情况三:业务复杂,中间过程或者依赖其他操作的返回结果,丢进事务回查(checkLocalTransaction方法↓)
                return LocalTransactionState.UNKNOW;
        }
    }
}

 事务消息消费者

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * 消费者使用的尽最大可能性确保成功消费(重试机制+死信队列特殊处理),所以B系统的处理比较简单,开启事务确保消费成功即可。
 * @author Lasse
 */
public class TransactionConsumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TransactionConsumer");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.subscribe("TransactionTopic","*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {

                try{
                    //开启事务
                    for (MessageExt msg : msgs) {
                        //执行本地事务
                        System.out.println("一顿操作幂等性代码");
                        //本地事务成功
                        System.out.println("commit"+msg.getTransactionId());
                    }
                }catch (Exception e){
                    e.printStackTrace();
                    System.out.println("本地事务失败,重试消费");
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //启动消费者
        consumer.start();
    }
}

RocketMQ使用中常见的问题

我们将消息流程分为三大部分,每一部分都有可能会丢失数据。

  • 生产阶段:Producer通过网络将消息发送给Broker,这个发送可能会发生丢失。比如网络延迟不可达等。
  • 存储阶段:Broker肯定是先把消息放到内存的,然后根据刷盘策略持久化到硬盘中。刚收到Producer的消息,放入内存,但是异常宕机了,导致消息丢失。
  • 消费阶段:消费失败。比如先提交ack再消费,处理过程中出现异常,该消息就出现了丢失。

解决方案:

  • 生产阶段:使用同步发送失败重试机制;异步发送重写回调方法检查发送结果;Ack确认机制。
  • 存储阶段:同步刷盘机制;集群模式采用同步复制。
  • 消费阶段:正常消费处理完成才提交ACK;如果处理异常返回重试标识。

RocketMQ如何保证消息顺序

RocketMQ架构本身是无法保证消息有序的,但是提供了相应的API保证消息有序消费。RocketMQ API利用FIFO先进先出的特性,保证生产者消息有序进入同一队列,消费者在同一队列消费就能达到消息的有序消费。

  1. 创建顺序消息:首先,Producer需要确保按照业务需求将消息发送为顺序消息。这意味着在发送消息时,需要指定相同的消息队列选择策略,并保证相同的业务键(Business Key)或消息队列选择器(Message Queue Selector)。
  2. 定义消息队列选择策略:RocketMQ提供了多种消息队列选择策略,用于确定消息发送到哪个队列。其中,按照业务键或消息队列选择器来选择消息队列是实现顺序消费的关键。你可以自定义消息队列选择器来实现自定义的顺序规则。
  3. 设置消费模式:对于顺序消费,Consumer需要设置为集群模式(Cluster Mode),而不是广播模式(Broadcast Mode)。在集群模式下,同一消费者组中的每个Consumer实例只会消费特定队列的消息,从而实现顺序消费。
  4. 注册顺序消费监听器:Consumer需要注册顺序消费的监听器(MessageListener),并实现相应的逻辑处理。监听器将按照顺序接收到的消息进行消费,并保证消息的有序性。

需要注意的是,顺序消费在一些特殊情况下可能会受到限制。例如,如果Broker节点发生故障导致消息迁移或重新负载,可能会破坏消息的严格顺序。此外,由于RocketMQ的分布式特性,顺序消费也会受到网络延迟和负载均衡等因素的影响。因此,在设计应用程序时,需要权衡顺序消费的需求和系统的实际情况。

RocketMQ的事务消息原理

RocketMQ 的事务消息是一种保证消息可靠性的机制。在RocketMQ中,事务消息的实现原理主要是通过两个发送阶段和一个确认阶段来实现的。

  • 发送消息的预处理阶段:在发送事务消息之前,RocketMQ 会将消息的状态设置为“Preparing”,并将消息存储到消息存储库中。
  • 执行本地事务:当预处理阶段完成后,消息发送者需要执行本地事务,并返回执行结果(commit 或 rollback)。
  • 消息的二次确认阶段:根据本地事务的执行结果,如果是 commit,则 RocketMQ 将消息的状态设置为“Committing”;否则将消息的状态设置为“Rollback”。
  • 完成事务:最后在消息的消费者消费该消息时,RocketMQ 会根据消息的状态来决定是否提交该消息。如果消息的状态是“Committing”,则直接提交该消息;否则忽略该消息。

需要注意的是,如果在消息发送的过程中出现异常或者网络故障等问题,RocketMQ 会触发消息回查机制。在回查过程中,RocketMQ 会调用消息发送方提供的回查接口来确认事务的提交状态,从而解决消息投递的不确定性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/615915.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

chatgpt赋能python:Python多行换行:如何在Python代码中正确处理多行

Python多行换行&#xff1a;如何在Python代码中正确处理多行 在Python中&#xff0c;代码往往需要按照一定的格式进行排版&#xff0c;以使代码更易于阅读和维护。在编写长代码时经常会遇到一个问题&#xff1a;如何正确处理多行&#xff1f;本文将介绍Python中多行换行的方法…

嵌入式系统中SPI 子系统基本原理实现

1、SPI hardware SPI&#xff1a;Serial Perripheral Interface&#xff0c;串行外围设备接口&#xff0c;由 Motorola 公司提出&#xff0c;是一种高速、全双工、同步通信总线。SPI 以主从方式工作&#xff0c;通常是有一个主设备和一个或多个从设备&#xff0c;无应答机制。…

Mendix 创客访谈录 | 低代码开启新世界

本期创客 李垚 西门子汽车事业部IOT专家 任职于西门子中国DI事业部&#xff0c;主要负责汽车行业西门子产品业务拓展与支持&#xff0c;服务于中国区汽车生产企业。本人主要负责Siplant、边缘计算&#xff0c;低代码技术等产品的售前、售后技术支持和故障排除, 解决客户在产品…

VSCode+GDB+Qemu调试ARM64 linux内核

俗话说&#xff0c;工欲善其事 必先利其器。linux kernel是一个非常复杂的系统&#xff0c;初学者会很难入门。 如果有一个方便的调试环境&#xff0c;学习效率至少能有5-10倍的提升。 为了学习linux内核&#xff0c;通常有这两个需要 可以摆脱硬件&#xff0c;方便的编译和…

React新版扩展特性

目录 Hooks 三个常用的Hook State Hook Effect Hook Ref Hook Context Router 6 声明式路由 编程式路由导航 Hooks (1) Hook是react 18.8.0版本新增的特性/语法 (2) 可以让我们在函数式组件中使用state以及其他的react特性 三个常用的Hook (1) State Hook: React.useSt…

Redis进阶:缓存穿透|缓存击穿|缓存雪崩问题

Redis应用问题 1. 缓存穿透问题1.1 问题描述1.2 解决方案方法一&#xff1a;空值缓存方法二&#xff1a;设置可访问的名单&#xff08;白名单&#xff09;方法三&#xff1a;采用布隆过滤器方法四&#xff1a;进行实时监控 2. 缓存击穿问题2.1 问题描述2.2 解决方案方法一&…

在金融行业工作有哪些必须熟练掌握的 Excel 公式?

金融中需要掌握的 Excel 公式有很多&#xff0c;以下是一些重要的公式和技巧&#xff1a; 1. SUM、AVERAGE、MIN、MAX 等基本统计函数 2. IF、AND、OR 等逻辑函数 3. VLOOKUP、HLOOKUP、INDEX、MATCH 等查找函数 4. PMT、PV、FV、NPV、IRR 等财务函数 5. CONCATENATE、LEF…

nw.js桌面软件开发系列 第0.1节 HTML5和桌面软件开发的碰撞

第0.1节 HTML5和桌面软件开发的碰撞 当我们谈论桌面软件开发技术的时候&#xff0c;你会想到什么&#xff1f;如果不对技术本身进行更为深入的探讨&#xff0c;在我的世界里&#xff0c;有这么多技术概念可以被罗列出来&#xff08;请原谅我本质上是一个Windows程序员的事实&a…

HarmoneyOS--从简单页面跳转开始2

此处对上个页面跳转适当增加内容&#xff0c;主要为List组件和grid组件的使用&#xff0c;适当熟悉最基本的容器Row和Column的使用 Login.ets // ts-nocheck import router from ohos.router; Entry Component struct TextDemo {State name:string State password:string bui…

vue2 + antd 封装动态表单组件(三)

最近有小伙伴在问动态表单如何再次封装&#xff0c;比如配合模态框或者抽屉封装多一层&#xff0c;这样可以大大提高开发效率&#xff0c;结合之前的写的 vue2 antd 封装动态表单组件&#xff08;一&#xff09; vue2 antd 封装动态表单组件&#xff08;二&#xff09; 做了…

【技术选型】时序数据库选型

文章目录 1、前言2、概述2.1 时序数据库的定义2.2 时序数据库的概念2.3 时序数据库的趋势 3、时序数据库对比3.1 influxdb3.2 Prometheus3.3 TDengine3.4 DolphinDB 4、选型结论 1、前言 时序数据治理是数据治理领域核心、打通IT与OT域数据链路&#xff0c;是工业物联网基石、…

Macos中Automator自动操作实现文本加解密、Json格式化、字数统计等操作

文章目录 一、说明打开Automator效果 二、文本加密三、文本解密四、Json格式化五、字数统计 一、说明 在 Automator 的工作流程中&#xff0c;动作是按照从上到下的顺序执行的&#xff0c;每一个动作的输出默认会成为下一个动作的输入。 打开Automator Command 空格 &#…

万万没想到系列,世界上最知名的失败建筑设计合集!

​ 大家好&#xff0c;这里是建模助手。 我们生活在由建筑包围的世界里&#xff0c;生活的面貌造就了建筑的多样性。而矗立的建筑也无言的记录着时代&#xff0c;尤其是一些建筑大师们的作品&#xff0c;可谓是集艺术和美学于一体的一流名作。 但&#xff0c;这不是凡事都有例…

虚拟内存原理介绍

文章目录 1. 虚拟内存介绍2. 虚拟寻址3. 虚拟地址空间3. 页表4. 地址翻译5. TLB加速地址翻译6. 多级页表7. 页面置换算法 1. 虚拟内存介绍 我们知道系统中的所有进程都是共享CPU和主存资源&#xff0c;但这样就会存在一个问题&#xff0c;这么多进程怎么知道主存上的一块内存是…

使用Java语言开发高效易用的--雅书阁商城管理系统

使用Java语言开发雅书阁商城管理系统 如果你正在寻找一种简单而优雅的方式来管理图书&#xff0c;那么使用Java语言开发雅书阁商城管理系统就是一个好选择。下面我们来详细介绍这个系统的开发过程。 效果展示 1.首页 2.注册界面 3.登录成功商城首页 4.购物车 5.电子书…

避开测试开发的常见陷阱:一份实战指南

陷阱一&#xff1a;过度依赖自动化测试 过度依赖自动化测试可能导致对复杂的用户交互和体验不够重视。自动化测试的力量在于它的一致性和覆盖广泛的可能性&#xff0c;但人工测试也同样重要&#xff0c;尤其是对于用户体验和复杂的用户交互。 示例&#xff1a;在一个电商网站…

ROS和ROS2使用

ubuntu20.04下安装qt5.12 https://blog.csdn.net/lj19990824/article/details/121013721 Ubuntu 20.04在桌面左侧边栏添加QT creator快捷图标 https://blog.csdn.net/kavieen/article/details/118695038 Qt和ROS&#xff1a;https://github.com/chengyangkj?tabrepositories…

操作系统原理 —— 内存覆盖与交换(十九)

什么情况下需要覆盖与交换 要弄清楚什么是覆盖与交换的概念&#xff0c;首先我们要知道在什么情况下才会使用到覆盖与交换。 在早期的计算机内存很小的时候&#xff0c;比如 IBM 推出的第一台 PC 机最大只支持 1 MB 大小的内存&#xff0c;因此会经常出现内存大小不够的情况&…

c++函数重载与运算符重载基础

什么是重载 重载&#xff0c;简单说&#xff0c;就是函数或者方法有相同的名称&#xff0c;但是参数列表不相同的情形&#xff0c;这样的同名不同参数的函数或者方法之间&#xff0c;互相称之为重载函数或者方法。 重载的作用&#xff1a;重载函数常用来实现功能类似而所处理的…

【C语言】数组和字符串

目录 数组和字符串 概述 一维数组 一维数组的定义和使用 一维数组的初始化 数组名 二维数组 字符数组与字符串 字符数组与字符串区别 数组和字符串 概述 在程序设计中&#xff0c;为了方便处理数据把具有相同类型的若干变量按有序形式组织起来——称为数组。 数组就…