八股初始:RocketMQ

news2024/11/16 13:35:23

一、消息队列介绍

消息队列是什么

对于 MQ 来说,其实不管是 RocketMQ、Kafka 还是其他消息队列,它们的本质都是:一发一存一消费

将 MQ 掰开了揉碎了来看,都是「一发一存一消费」,再直白点就是一个「转发器」。生产者先将消息投递一个叫做「队列」的容器中,然后再从这个容器中取出消息,最后再转发给消费者,仅此而已。

上面这个图便是消息队列最原始的模型,它包含了两个关键词:消息和队列。

1、消息:就是要传输的数据,可以是最简单的文本字符串,也可以是自定义的复杂格式(只要能按预定格式解析出来即可)。

2、队列:大家应该再熟悉不过了,是一种先进先出数据结构。它是存放消息的容器,消息从队尾入队,从队头出队,入队即发消息的过程,出队即收消息的过程。

MQ的应用场景

目前,MQ 的应用场景非常多,大家能倒背如流的是:系统解耦、异步通信和流量削峰。除此之外,还有延迟通知、最终一致性保证、顺序消息、流式处理等等。

高可用松耦合架构设计

举一个实际例子,比如说电商业务中最常见的「订单支付」场景:在订单支付成功后,需要更新订单状态、更新用户积分、通知商家有新订单、更新推荐系统中的用户画像等等。

引入 MQ 后,订单支付现在只需要关注它最重要的流程:更新订单状态即可。其他不重要的事情全部交给 MQ 来通知。这便是 MQ 解决的最核心的问题:系统解耦。

削峰填谷

诸如秒杀、抢红包、企业开门红等大型活动时皆会带来较高的流量脉冲,或因没做相应的保护而导致系统超负荷甚至崩溃,或因限制太过导致请求大量失败而影响用户体验,削峰填谷是解决该问题的最佳方式;

分布式事务消息

电商的交易系统、支付红包等场景需要确保数据的最终一致性,大量引入 MQ 的分布式事务,既可以实现系统之间的解耦,又可以保证最终的数据一致性。

RocketMQ介绍

     RocketMQ是一款分布式、队列模型的消息中间件,是阿里巴巴集团自主研发的专业消息中间件,借鉴参考了JMS规范的MQ实现,更参考了优秀的开源消息中间件KAFKA,实现了业务消峰、分布式事务的优秀框架。

   产品基于高可用分布式集群技术,提供消息订阅和发布、消息轨迹查询、定时(延时)消息、资源统计、监控报警等一系列消息云服务,是企业级互联网架构的核心产品。MQ 历史超过9年,为分布式应用系统提供异步解耦、削峰填谷的能力,同时具备海量消息堆积、高吞吐、可靠重试等互联网应用所需的特性,是阿里巴巴双11使用的核心产品。

特点:

    • 是一个队列模型的消息中间件,具有高性能、高可靠、高实时、分布式等特点
    • Producer、Consumer、队列都可以分布式
    • Producer 向一些队列轮流发送消息,队列集合称为 Topic,Consumer 如果做广播消费,则一个 Consumer 实例消费这个 Topic 对应的所有队列,如果做集群消费,则多个 Consumer 实例平均消费这个 Topic 对应的队列集合
    • 能够保证严格的消息顺序
    • 支持拉(pull)和推(push)两种消息模式
    • 高效的订阅者水平扩展能力
    • 实时的消息订阅机制
    • 亿级消息堆积能力
    • 支持多种消息协议,如 JMS、OpenMessaging 等
    • 较少的依赖

消息队列的选择(参考):该如何选择消息队列? - 武培轩 - 博客园

消息中间件的对比

二、核心概念

生产者Producer

负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到broker服务器。与 Name Server 集群中的其中一个节点(随机)建立长链接(Keep-alive),定期从 Name Server 读取 Topic 路由信息,并向提供 Topic 服务的 Master Broker 建立长链接,且定时向 Master Broker 发送心跳。

RocketMQ提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要Broker返回确认信息,单向发送不需要。

同步发送:同步发送指消息发送⽅发出数据后会在收到接收⽅发回响应之后才发下⼀个数据包。⼀般⽤于重要通知消息,例如重要通知邮件、营销短信。

异步发送:异步发送指发送⽅发出数据后,不等接收⽅发回响应,接着发送下个数据包,⼀般⽤于可能链路耗时较⻓⽽对响应时间敏感的业务场景,例如⽤户视频上传后通知启动转码服务。

单向发送:单向发送是指只负责发送消息⽽不等待服务器回应且没有回调函数触发,适⽤于某些耗时⾮常短但对可靠性要求并不⾼的场景,例如⽇志收集。

生产者组Producer Group

生产者组,简单来说就是多个发送同一类消息的生产者称之为一个生产者组。

消费者Consumer

与 Name Server 集群中的其中一个节点(随机)建立长连接,定期从 Name Server 拉取 Topic 路由信息,并向提供 Topic 服务的 Master Broker、Slave Broker 建立长连接,且定时向 Master Broker、Slave Broker 发送心跳。Consumer 既可以从 Master Broker 订阅消息,也可以从 Slave Broker 订阅消息,订阅规则由 Broker 配置决定。

push consumer:消费者端设置Listener

pull consumer:应用可主动从Broker获取消息,主动拉取会存在消费记录位置问题(如果不记录位置,会出现重复消费)

消费者组Consumer Group

消费者组,和生产者类似,消费同一类消息的多个 Consumer 实例组成一个消费者组。

通过组机制(Group),RocketMQ可以天然支持分布式(如下所示)。

名字服务Name Server

名称服务充当路由消息的提供者。是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。在消息队列 RocketMQ 中提供命名服务,更新和发现 Broker 服务。

   NameServer即名称服务,两个功能:

    • 接收broker的请求,注册broker的路由信息
    • 接收client(producer/consumer)的请求,根据某个topic获取其到broker的路由信息NameServer没有状态,可以横向扩展。每个broker在启动的时候会到NameServer注册;Producer在发送消息前会根据topic到NameServer获取路由(到broker)信息;Consumer也会定时获取topic路由信息。

ps:无状态服务

无状态服务(stateless service)对单次请求的处理,不依赖其他请求,也就是说,处理一次请求所需的全部信息,要么都包含在这个请求里,要么可以从外部获取到(比如说数据库),服务器本身不存储任何信息 。

nameServer与consumer、producer以及broker之间的联系(参考):RocketMQ nameserver、broker、生产者和消费者之间的关系_LinYaoGai的博客-CSDN博客_rocketmq broker topic 关系

代理服务器Broker Server

消息中转角色,负责存储消息,转发消息。可以理解为消息队列服务器,提供了消息的接收、存储、拉取和转发服务。broker是RocketMQ的核心,它不能挂的,所以需要保证broker的高可用。

  broker分为 Master Broker 和 Slave Broker,一个 Master Broker 可以对应多个 Slave Broker,但是一个 Slave Broker 只能对应一个 Master Broker。

  Master与Slave的对应关系通过指定相同的BrokerName,不同的BrokerId来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。

  每个Broker与Name Server集群中的所有节点建立长连接,定时注册Topic信息到所有Name Server。Broker 启动后需要完成一次将自己注册至 Name Server 的操作;随后每隔 30s 定期向 Name Server 上报 Topic 路由信息。

消息主题Topic

表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ进行消息订阅的基本单位。

标签Tag

为消息设置的标志,用于同一主题下区分不同类型的消息。来自同一业务单元的消息,可以根据不同业务目的在同一主题下设置不同标签。标签能够有效地保持代码的清晰度和连贯性,并优化RocketMQ提供的查询系统。消费者可以根据Tag实现对不同子主题的不同消费逻辑,实现更好的扩展性。

消息队列MessageQueue

对于每个Topic都可以设置一定数量的消息队列用来进行数据的读取,可以理解成ConsumerQueue。里面存入包括topic、起始偏移量、消息长度等内容,消费者消费信息时,根据topic查询consumerqueue文件,找到对应的topic,开始读取消息,此时读取到的数据是消息的起始偏移量和消息长度,根据消息的起始偏移量从commitlog中查找对应的偏移量位置,然后根据消息长度取commitlog中的数据,即取到了指定的消息内容。

三、源码包结构说明

rocketmq-acl:权限相关管理

rocketmq-broker:包含主要的业务逻辑,消息收发,主从同步,pagecache

rocketmq-client:一般我们使用的Consumer和Producer,都是这个包下的

rocketmq-common:公共数据结构等等    

rocketmq-distribution:编译模块,编译输出

rocketmq-example:官方示例

rocketmq-filter:过滤

rocketmq-logappender:日志相关模块

rocketmq-logging:日志相关模块

rocketmq-namesrv:namesrv服务,用于服务协调

rocketmq-openmessaging:提供消息接口

rocketmq-remoting:通信组件模块,封装netty底层通信

rocketmq-srvutil:提供了一些工具类方法

rocketmq-store:消息存储

rocketmq-test:测试模块

rocketmq-tools:消息管理工具,例如mqadmin工具

四、环境搭建(基于Docker)

RocketMQ环境搭建

查找rocketmq镜像

docker search rocketmq

[root@localhost ~]# docker search rocketmq
INDEX       NAME                                           DESCRIPTION                                     STARS     OFFICIAL   AUTOMATED
docker.io   docker.io/foxiswho/rocketmq                    rocketmq                                        54                   
docker.io   docker.io/rocketmqinc/rocketmq                 Image repository for Apache RocketMQ            51                   
docker.io   docker.io/styletang/rocketmq-console-ng        rocketmq-console-ng                             34                   
docker.io   docker.io/apacherocketmq/rocketmq              Docker Image for Apache RocketMQ                19                   
docker.io   docker.io/laoyumi/rocketmq                                                                     10                   [OK]

以第一个为例,查找这个镜像下的版本,选择一个broker和server下载

查看镜像版本

curl https://registry.hub.docker.com/v1/repositories/foxiswho/rocketmq/tags\
| tr -d '[\[\]" ]' | tr '}' '\n'\
| awk -F: -v image='foxiswho/rocketmq' '{if(NR!=NF && $3 != ""){printf("%s:%s\n",image,$3)}}'

为了管理上的方便,rocketmq console也是必不可少的工具

docker pull docker.io/styletang/rocketmq-console-ng

启动nameserver、broker以及rocketmq-console

docker run -d -p 9876:9876 --name rmqserver  foxiswho/rocketmq:server-4.3.2
docker run -d -p 10911:10911 -p 10909:10909 --name rmqbroker --link rmqserver:namesrv -e "NAMESRV_ADDR=namesrv:9876" -e "JAVA_OPTS=-Duser.home=/opt" -e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m" foxiswho/rocketmq:broker-4.3.2

docker run -d --name rmqconsole -p 8180:8080 --link rmqserver:namesrv\
 -e "JAVA_OPTS=-Drocketmq.namesrv.addr=namesrv:9876\
 -Dcom.rocketmq.sendMessageWithVIPChannel=false"\
 -t styletang/rocketmq-console-ng

访问:http://192.168.106.129:8180/#/,可见环境搭建成功。

ps:

  1. broker服务ip需要修改下,进入broker容器内部,修改 etc/rocketmq/broker.conf 里的brokerIP1=192.168.106.129,否则连接错误

项目集成RocketMQ

引入依赖

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.8.0</version>
        </dependency>

测试发送消息

public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        producer.setNamesrvAddr("192.168.106.129:9876");
        producer.start();
        Message message = new Message("test","生产者发送消息".getBytes());
        producer.send(message);
        producer.shutdown();
    }

测试消费消息

public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer");
        consumer.setNamesrvAddr("192.168.106.129:9876");
        //3 绑定消息的主题(主题订阅)
        consumer.subscribe("test","*");
        //4 消费者监听处理消息的方法
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (MessageExt msg : list) {
                    System.out.println("消费程:"+Thread.currentThread().getName()+
                            ",消息ID:"+msg.getMsgId()+
                            ",消息内容:"+new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //启动消费者
        consumer.start();
    }

五、RocketMQ基础使用

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.1.1</version>
        </dependency>	

发送消息的三种方式

同步

必须要等消息持久化到磁盘中以后,rocketmq会给生产者返回一个信息,持久化完成。

异步

不需要等消息持久化到磁盘中,就可以执行后面的业务逻辑,可以获得到执行结果。

producer.send(message, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.println("发送成功");
                }

                @Override
                public void onException(Throwable throwable) {
                    System.out.println("发送失败");
                }
            });
// 注:发送完之后不要立即执行producer.shutdown();否则发不出去

单向

把消息发送到消息中间件中,不关心返回的结果的。

producer.sendOneway(message);

RocketMQ消费模式

集群消费模式

CLUSTERING,可以理解为同组公共消费。公共资源我拿了你就没有。即同一 Topic 下,一个 ConsumerGroup 下如果有多个实例(可以是多个进程,或者多个机器),那么这些实例会均摊消费这些消息,但我消费了这条消费你就不会再消费。消费者默认是集群消费方式。适用于大部分消息业务。

广播消费模式

BROADCASTING,可以理解为同组各自消费。即同一 Topic 下,同一消息会被多个实例各自都消费一次。所以,广播         消费模式中的 ConsumerGroup 概念没有太大的意义。这适用于一些分发消息的场景。

集群模式,一条消息只能被消费一次;广播模式,一条消息可以被多个服务消费。

//设置广播模式
consumer.setMessageModel(MessageModel.BROADCASTING);
//设置集群模式(默认)
consumer.setMessageModel(MessageModel.CLUSTERING);

RocketMQ消费方式

(1)pull方式

由消费者客户端主动向消息中间件(MQ消息服务器代理)拉取消息;采用Pull方式,如何设置Pull消息的频率需要重点去考虑,举个例子来说,可能1分钟内连续来了1000条消息,然后2小时内没有新消息产生(概括起来说就是“消息延迟与忙等待”)。如果每次Pull的时间间隔比较久,会增加消息的延迟,即消息到达消费者的时间加长,MQ中消息的堆积量变大;若每次Pull的时间间隔较短,但是在一段时间内MQ中并没有任何消息可以消费,那么会产生很多无效的Pull请求的RPC开销,影响MQ整体的网络性能;

DefaultMQPullConsumer pullConsumer = new DefaultMQPullConsumer("consumerGroup");

(2)push方式

由消息中间件(MQ消息服务器代理)主动地将消息推送给消费者;采用Push方式,可以尽可能实时地将消息发送给消费者进行消费。但是,在消费者的处理消息的能力较弱的时候(比如,消费者端的业务系统处理一条消息的流程比较复杂,其中的调用链路比较多导致消费时间比较久。概括起来地说就是“慢消费问题”),而MQ不断地向消费者Push消息,消费者端的缓冲区可能会溢出,导致异常;

使用代码:

public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer");
        consumer.setNamesrvAddr("192.168.106.129:9876");
        //3 绑定消息的主题
        consumer.subscribe("test","tag1||taga");
        // consumer.setMessageModel(MessageModel.BROADCASTING);
        //4 消费者监听处理消息的方法
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (MessageExt msg : list) {
                    System.out.println("消费程:"+Thread.currentThread().getName()+
                            ",消息ID:"+msg.getMsgId()+
                            ",消息内容:"+new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //启动消费者
        consumer.start();
    }
@Component
@RocketMQMessageListener(consumerGroup = "${rocketmq.consumer.groupName}", topic = "test")
public class MyMQListener implements RocketMQListener {
    @Override
    public void onMessage(Object o) {
        System.out.println("接收到消息,开始消费"+o.toString());
    }
}

长轮询机制:

RocketMQ的消费方式都是基于拉模式拉取消息的,而在这其中有一种长轮询机制(对普通轮询的一种优化),来平衡上面Push/Pull模型的各自缺点。基本设计思路是:消费者如果第一次尝试Pull消息失败(比如:Broker端没有可以消费的消息),并不立即给消费者客户端返回Response的响应,而是先hold住并且挂起请求(将请求保存至pullRequestTable本地缓存变量中),然后Broker端的后台独立线程—PullRequestHoldService会从pullRequestTable本地缓存变量中不断地去取,具体的做法是查询待拉取消息的偏移量是否小于消费队列最大偏移量,如果条件成立则说明有新消息达到Broker端。

ps:想知道push实现过程可以参考这个:RocketMQ的push模式机制 - 知乎

延时消息

RocketMQ提供了延时消息类型,简单来说就是生产者在发送消息的时候指定一个延时时间,当到达延时时间之后消息才能够被投送到消费者。目前指定的延时时间间隔有1s,5s,10s,30s,1m,2m,3m,4m,5m,6m,7m,8m,9m,10m,20m,30m,1h,2h,用等级来表示时间间隔。

延时消息实现原理图

业务代码commitLog.java-->asyncPutMessage

// 判断消息类型      
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
        if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
                || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
            // Delay Delivery
            if (msg.getDelayTimeLevel() > 0) {
                if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                    msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
                }
				// 把消息存到SCHEDULE_TOPIC_XXXX这个topic中
                topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
                queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

                // Backup real topic, queueId
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
                msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

                msg.setTopic(topic);
                msg.setQueueId(queueId);
            }
        }

延时消息处理

 RocketMQ提供了定时任务服务ScheduleMessageService,通过定时任务的方式不断的读取topic为SCHEDULE_TOPIC_XXXX何queueId为延时等级的消息进行消息还原处理,这样消息被还原之后消费者就可以拉取消息了。

public void executeOnTimeup() {
			//获取消费者消息
            ConsumeQueue cq =
                ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC,
                    delayLevel2QueueId(delayLevel));
 
            long failScheduleOffset = offset;
 
            if (cq != null) {
				//读取消息
                SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
                if (bufferCQ != null) {
                    try {
                        long nextOffset = offset;
                        int i = 0;
                        ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
                        for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                            long offsetPy = bufferCQ.getByteBuffer().getLong();
                            int sizePy = bufferCQ.getByteBuffer().getInt();
                            long tagsCode = bufferCQ.getByteBuffer().getLong();
 
                            if (cq.isExtAddr(tagsCode)) {
                                if (cq.getExt(tagsCode, cqExtUnit)) {
                                    tagsCode = cqExtUnit.getTagsCode();
                                } else {
                                    //can't find ext content.So re compute tags code.
                                    log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
                                        tagsCode, offsetPy, sizePy);
                                    long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
                                    tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
                                }
                            }
 
                            long now = System.currentTimeMillis();
                            long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
 
                            nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
 
                            long countdown = deliverTimestamp - now;
 							// 到了延时时间还原消息
                            if (countdown <= 0) {
								//获取消息
                                MessageExt msgExt =
                                    ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
                                        offsetPy, sizePy);
 
                                if (msgExt != null) {
                                    try {
										//还原消息信息topic名称等
                                        MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
										//重新将消息持久化到commitlog中
                                        PutMessageResult putMessageResult =
                                            ScheduleMessageService.this.defaultMessageStore
                                                .putMessage(msgInner);
 
                                        if (putMessageResult != null
                                            && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
                                            continue;
                                        } else {
                                            // XXX: warn and notify me
                                            log.error(
                                                "ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",
                                                msgExt.getTopic(), msgExt.getMsgId());
                                            ScheduleMessageService.this.timer.schedule(
                                                new DeliverDelayedMessageTimerTask(this.delayLevel,
                                                    nextOffset), DELAY_FOR_A_PERIOD);
                                            ScheduleMessageService.this.updateOffset(this.delayLevel,
                                                nextOffset);
                                            return;
                                        }
                                    } catch (Exception e) {
                                        /*
                                         * XXX: warn and notify me
                                         */
                                        log.error(
                                            "ScheduleMessageService, messageTimeup execute error, drop it. msgExt="
                                                + msgExt + ", nextOffset=" + nextOffset + ",offsetPy="
                                                + offsetPy + ",sizePy=" + sizePy, e);
                                    }
                                }
                            } else {
                                // 没到延时时间,进行下一次定时任务
                                ScheduleMessageService.this.timer.schedule(
                                    new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
                                    countdown);
                                ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                                return;
                            }
                        } // end of for
 
                        nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
                        ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
                            this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
                        ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                        return;
                    } finally {
 
                        bufferCQ.release();
                    }
                } // end of if (bufferCQ != null)
                else {
 
                    long cqMinOffset = cq.getMinOffsetInQueue();
                    if (offset < cqMinOffset) {
                        failScheduleOffset = cqMinOffset;
                        log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset="
                            + cqMinOffset + ", queueId=" + cq.getQueueId());
                    }
                }
            } // end of if (cq != null)
 
            ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
                failScheduleOffset), DELAY_FOR_A_WHILE);
        }

发送消息示例

public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        producer.setNamesrvAddr("192.168.106.129:9876");
        producer.start();
        int totalMessageSend = 10;
        for (int i = 0;i<totalMessageSend;i++){
            Message message = new Message("test","生产者发送消息".getBytes());
            // 设置延时等级
            message.setDelayTimeLevel(i);
            producer.send(message);
        }
        producer.shutdown();
    }

顺序消息

消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。

顺序消费的原理解析,在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。

生产者

public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        producer.setNamesrvAddr("192.168.106.129:9876");
        producer.start();
        int totalMessageSend = 20;
        for (int i = 0;i<totalMessageSend;i++){
            Message message = new Message("test2","*",("生产者发送消息sync"+i).getBytes());
            //message.setDelayTimeLevel(0);
            producer.send(message,new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    Long id = (Long) arg;  //可以根据订单id选择发送queue
                    long index = id % mqs.size();
                    return mqs.get((int) index);
                }
            }, 1L); // 这个1可以是订单id
        }
        // Thread.sleep(1000);
        producer.shutdown();
    }

消费者:

public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer");
        consumer.setNamesrvAddr("192.168.106.129:9876");
        //3 绑定消息的主题
        consumer.subscribe("test2","*");
        // consumer.setMessageModel(MessageModel.BROADCASTING);
        //4 消费者监听处理消息的方法
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (MessageExt msg : list) {
                    System.out.println("消费程:"+Thread.currentThread().getName()+
                            ",消息ID:"+msg.getMsgId()+
                            ",消息内容:"+new String(msg.getBody())+"队列id:"+msg.getQueueId());
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //启动消费者
        consumer.start();
    }

执行结果:

消息过滤

Tag标签过滤

producer

Message message = new Message("test","taga",("生产者发送消息sync"+i).getBytes());
consumer

consumer.subscribe("test","tag1||taga");

sql92(参考官方样例)

RocketMQ只定义了一些基本语法来支持这个特性。你也可以很容易地扩展它,只有使用push模式的消费者才能用使用SQL92标准的sql语句。。

    • 数值比较,比如:>,>=,<,<=,BETWEEN,=;
    • 字符比较,比如:=,<>,IN;
    • IS NULL 或者 IS NOT NULL;
    • 逻辑符号 AND,OR,NOT;

常量支持类型为:

    • 数值,比如:123,3.1415;
    • 字符,比如:'abc',必须用单引号包裹起来;
    • NULL,特殊的常量
    • 布尔值,TRUEFALSE

样例:

生产者

DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.start();
Message msg = new Message("TopicTest",
   tag,
   ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// 设置一些属性
msg.putUserProperty("a", String.valueOf(i));
SendResult sendResult = producer.send(msg);

producer.shutdown();

消费者

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
// 只有订阅的消息有这个属性a, a >=0 and a <= 3
consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3");
consumer.registerMessageListener(new MessageListenerConcurrently() {
   @Override
   public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
       return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
   }
});
consumer.start();

springboot整合RocketMQ

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.1.1</version>
        </dependency>	

可以使用 RocketMQTemplate rocketMQTemplate进行发送消息

@Component
public class RocketMqHelper {
    /**
     * 日志
     */
    private static final Logger LOG = LoggerFactory.getLogger(RocketMqHelper.class);

    /**
     * rocketmq模板注入
     */
    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @PostConstruct
    public void init() {
        LOG.info("---RocketMq助手初始化---");
    }

    /**
     * 发送异步消息
     *
     * @param topic   消息Topic
     * @param message 消息实体
     */
    public void asyncSend(Enum topic, Message<?> message) {
        asyncSend(topic.name(), message, getDefaultSendCallBack());
    }


    /**
     * 发送异步消息
     *
     * @param topic        消息Topic
     * @param message      消息实体
     * @param sendCallback 回调函数
     */
    public void asyncSend(Enum topic, Message<?> message, SendCallback sendCallback) {
        asyncSend(topic.name(), message, sendCallback);
    }

    /**
     * 发送异步消息
     *
     * @param topic   消息Topic
     * @param message 消息实体
     */
    public void asyncSend(String topic, Message<?> message) {
        rocketMQTemplate.asyncSend(topic, message, getDefaultSendCallBack());
    }

    /**
     * 发送异步消息
     *
     * @param topic        消息Topic
     * @param message      消息实体
     * @param sendCallback 回调函数
     */
    public void asyncSend(String topic, Message<?> message, SendCallback sendCallback) {
        rocketMQTemplate.asyncSend(topic, message, sendCallback);
    }

    /**
     * 发送异步消息
     *
     * @param topic        消息Topic
     * @param message      消息实体
     * @param sendCallback 回调函数
     * @param timeout      超时时间
     */
    public void asyncSend(String topic, Message<?> message, SendCallback sendCallback, long timeout) {
        rocketMQTemplate.asyncSend(topic, message, sendCallback, timeout);
    }

    /**
     * 发送异步消息
     *
     * @param topic        消息Topic
     * @param message      消息实体
     * @param sendCallback 回调函数
     * @param timeout      超时时间
     * @param delayLevel   延迟消息的级别
     */
    public void asyncSend(String topic, Message<?> message, SendCallback sendCallback, long timeout, int delayLevel) {
        rocketMQTemplate.asyncSend(topic, message, sendCallback, timeout, delayLevel);
    }

    /**
     * 发送顺序消息
     *
     * @param message
     * @param topic
     * @param hashKey
     */
    public void syncSendOrderly(Enum topic, Message<?> message, String hashKey) {
        syncSendOrderly(topic.name(), message, hashKey);
    }


    /**
     * 发送顺序消息
     *
     * @param message
     * @param topic
     * @param hashKey
     */
    public void syncSendOrderly(String topic, Message<?> message, String hashKey) {
        LOG.info("发送顺序消息,topic:" + topic + ",hashKey:" + hashKey);
        rocketMQTemplate.syncSendOrderly(topic, message, hashKey);
    }

    /**
     * 发送顺序消息
     *
     * @param message
     * @param topic
     * @param hashKey
     * @param timeout
     */
    public void syncSendOrderly(String topic, Message<?> message, String hashKey, long timeout) {
        LOG.info("发送顺序消息,topic:" + topic + ",hashKey:" + hashKey + ",timeout:" + timeout);
        rocketMQTemplate.syncSendOrderly(topic, message, hashKey, timeout);
    }

    public void sendFilterProperty(String topic, String message, Map<String,Object> map){
        rocketMQTemplate.convertAndSend(topic,message,map);
    }

    /**
     * 默认CallBack函数
     *
     * @return
     */
    private SendCallback getDefaultSendCallBack() {
        return new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                LOG.info("---发送MQ成功---");
            }

            @Override
            public void onException(Throwable throwable) {
                LOG.error("---发送MQ失败---"+throwable.getMessage(), throwable.getMessage());
            }
        };
    }


    @PreDestroy
    public void destroy() {
        LOG.info("---RocketMq助手注销---");
    }
}

六、事务消息

Apache RocketMQ在4.3.0版中已经支持分布式事务消息,这里RocketMQ采用了2PC(两阶段提交)的思想来实现了提交事务消息,同时增加一个补偿逻辑来处理二阶段超时或者失败的消息。

RocketMQ在其消息定义的基础上,对事务消息扩展了两个相关的概念:

Half(Prepare) Message——半消息(预处理消息)

半消息是一种特殊的消息类型,该状态的消息暂时不能被Consumer消费。当一条事务消息被成功投递到Broker上,但是Broker并没有接收到Producer发出的二次确认时,该事务消息就处于"暂时不可被消费"状态,该状态的事务消息被称为半消息。

Message Status Check——消息状态回查

由于网络抖动、Producer重启等原因,可能导致Producer向Broker发送的二次确认消息没有成功送达。如果Broker检测到某条事务消息长时间处于半消息状态,则会主动向Producer端发起回查操作,查询该事务消息在Producer端的事务状态(Commit 或 Rollback)。可以看出,Message Status Check主要用来解决分布式事务中的超时问题。

上图说明了事务消息的大致方案,其中分为两个流程:正常事务消息的发送及提交、事务消息的补偿流程。

1.事务消息发送及提交:

(1) 发送消息(half消息)。

(2) 服务端响应消息写入结果。

(3) 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)。

(4) 根据本地事务状态执行Commit或者Rollback(Commit操作生成消息索引,消息对消费者可见)

2.补偿流程:

(1) 对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查”

(2) Producer收到回查消息,检查回查消息对应的本地事务的状态

(3) 根据本地事务状态,重新Commit或者Rollback

其中,补偿阶段用于解决消息Commit或者Rollback发生超时或者失败的情况。

half消息

在RocketMQ事务消息的主要流程中,一阶段的消息如何对用户不可见。其中,事务消息相对普通消息最大的特点就是一阶段发送的消息对用户是不可见的。那么,如何做到写入消息但是对用户不可见呢?RocketMQ事务消息的做法是:如果消息是half消息,将备份原消息的主题与消息消费队列,然后改变主题为RMQ_SYS_TRANS_HALF_TOPIC。由于消费组未订阅该主题,故消费端无法消费half类型的消息,然后RocketMQ会开启一个定时任务,从Topic为RMQ_SYS_TRANS_HALF_TOPIC中拉取消息进行消费,根据生产者组获取一个服务提供者发送回查事务状态请求,根据事务状态来决定是提交或回滚消息。(ps:类似延时消息原理)

七、消息持久化

我们知道 RocketMQ 是一款高性能、高可靠的分布式消息中间件,高性能和高可靠是很难兼得的。因为要保证高可靠,那么数据就必须持久化到磁盘上,将数据持久化到磁盘,那么可能就不能保证高性能了。

RocketMQ 在持久化的设计上,采取的是「消息顺序写、随机读的策略」,利用磁盘顺序写的速度,让磁盘的写速度不会成为系统的瓶颈。并且采用 MMPP 这种“零拷贝”技术,提高消息存盘和网络发送的速度。极力满足 RocketMQ 的高性能、高可靠要求。

消息存储架构

RocketMQ持久化机制架构图

在 RocketMQ 持久化机制中,涉及到了三个角色:

    • 「CommitLog」:消息真正的存储文件,所有消息都存储在 CommitLog 文件中。
    • 「ConsumeQueue」:消息消费逻辑队列,类似数据库的索引文件。
    • 「IndexFile」:消息索引文件,主要存储消息 Key 与 offset 对应关系,提升消息检索速度。

CommitLog 文件是存放消息数据的地方,所有的消息都将存入到 CommitLog 文件中。生产者将消息发送到 RocketMQ 的 Broker 后,Broker 服务器会将「消息顺序写入到 CommitLog 文件中」,这也就是 RocketMQ 高性能的原因,因为我们知道磁盘顺序写特别快,RocketMQ 充分利用了这一点,极大的提高消息写入效率。

但是消费者消费消息的时候,可能就会遇到麻烦,每一个消费者只能订阅一个主题,消费者关心的是订阅主题下的所有消息,但是同一主题的消息在 CommitLog 文件中可能是不连续的,那么「消费者消费消息的时候,需要将 CommitLog 文件加载到内存中遍历查找订阅主题下的消息,频繁的 IO 操作,性能就会急速下降」。

为了解决这个问题,RocketMQ 引入了 Consumequeue 文件。「Consumequeue 文件可以看作是索引文件,类似于 MySQL 中的二级索引」。在存放了同一主题下的所有消息,消费者消费的时候只需要去对应的 Consumequeue 组中取消息即可。Consumequeue 文件不会存储消息的全量信息,原理类似MySQL 索引

关于indexFile(索引文件)提供了一种可以通过key或时间区间来查询消息的方法,IndexFile的底层存储设计为在文件系统中实现HashMap结构,故rocketmq的索引文件其底层实现为hash索引。

ps:上图针对的是集群模式下的消费者消费信息,那么广播模式怎么实现的,一个消费这消费所有消息。

简单说就是两方面:

1:广播模式消费场景下的rebalance过程相比集群模式而言不存在分配MessageQueue的过程,每个consumer负责订阅的topic下的所有MessageQueue。

参考源码:RebalanceImpl

private void rebalanceByTopic(final String topic, final boolean isOrder) {
        switch (messageModel) {
            case BROADCASTING: {
                Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
                if (mqSet != null) {
                    // 负载均衡的过程中没有进行AllocateMessageQueueStrategy进行分配
                    boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
                    if (changed) {
                        this.messageQueueChanged(topic, mqSet, mqSet);
                        log.info("messageQueueChanged {} {} {} {}",
                            consumerGroup,
                            topic,
                            mqSet,
                            mqSet);
                    }
                } else {
                    log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
                }
                break;
            }
            case CLUSTERING: {
                Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
                List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
                if (null == mqSet) {
                    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                        log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
                    }
                }

                if (null == cidAll) {
                    log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
                }

                if (mqSet != null && cidAll != null) {
                    List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
                    mqAll.addAll(mqSet);

                    Collections.sort(mqAll);
                    Collections.sort(cidAll);

                    AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

                    List<MessageQueue> allocateResult = null;
                    try {
                        allocateResult = strategy.allocate(
                            this.consumerGroup,
                            this.mQClientFactory.getClientId(),
                            mqAll,
                            cidAll);
                    } catch (Throwable e) {
                        log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
                            e);
                        return;
                    }

                    Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
                    if (allocateResult != null) {
                        allocateResultSet.addAll(allocateResult);
                    }

                    boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
                    if (changed) {
                        log.info(
                            "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
                            strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
                            allocateResultSet.size(), allocateResultSet);
                        this.messageQueueChanged(topic, mqSet, allocateResultSet);
                    }
                }
                break;
            }
            default:
                break;
        }
    }

2:针对广播模式,我们使用本地文件LocalFileOffsetStore来存储消费位移。

参考源码:DefaultMQPushConsumerImpl

public synchronized void start() throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST:
                log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
                    this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
                this.serviceState = ServiceState.START_FAILED;

                this.checkConfig();

                this.copySubscription();

                if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                    this.defaultMQPushConsumer.changeInstanceNameToPID();
                }

                this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);

                this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
                this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
                this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
                this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);

                this.pullAPIWrapper = new PullAPIWrapper(
                    mQClientFactory,
                    this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
                this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);

                if (this.defaultMQPushConsumer.getOffsetStore() != null) {
                    this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
                } else {
                    switch (this.defaultMQPushConsumer.getMessageModel()) {
                        case BROADCASTING:
                            // 使用本地存储
                            this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                            break;
                        case CLUSTERING:
                            this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                            break;
                        default:
                            break;
                    }
                    this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
                }
                this.offsetStore.load();

                if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
                    this.consumeOrderly = true;
                    this.consumeMessageService =
                        new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
                } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
                    this.consumeOrderly = false;
                    this.consumeMessageService =
                        new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
                }

                this.consumeMessageService.start();

                boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
                if (!registerOK) {
                    this.serviceState = ServiceState.CREATE_JUST;
                    this.consumeMessageService.shutdown(defaultMQPushConsumer.getAwaitTerminationMillisWhenShutdown());
                    throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                        null);
                }

                mQClientFactory.start();
                log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
                    + this.serviceState
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                    null);
            default:
                break;
        }

        this.updateTopicSubscribeInfoWhenSubscriptionChanged();
        this.mQClientFactory.checkClientInBroker();
        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
        this.mQClientFactory.rebalanceImmediately();
    }

消息刷盘

在 RocketMQ 中提供了「同步刷盘」「异步刷盘」两种刷盘方式,可以通过 Broker 配置文中中的 flushDiskType 参数来设置(SYNC_FLUSH、ASYNC_FLUSH)。

「异步刷盘方式(默认)」:消息写入到内存的 PAGECACHE中,就立刻给客户端返回写操作成功,当 PAGECACHE 中的消息积累到一定的量时,触发一次写操作,将 PAGECACHE 中的消息写入到磁盘中。这种方式「吞吐量大,性能高,但是 PAGECACHE 中的数据可能丢失,不能保证数据绝对的安全」

「同步刷盘方式」:消息写入内存的 PAGECACHE 后,立刻通知刷盘线程刷盘,然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消息写成功的状态。这种方式「可以保证数据绝对安全,但是吞吐量不大」

欢迎关注公众号

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/339864.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

记一次上环境获取资源失败的案例

代码结构以及资源位置 测试代码 RestController RequestMapping("/json") public class JsonController {GetMapping("/user/1")public String queryUserInfo() throws Exception {// 如果使用全路径, 必须使用/开头String path JsonController.class.ge…

《计算机组成与设计》03. 计算机的算术运算

文章目录整数运算加法与减法乘法普通十进制乘法硬件中实现步骤例子乘法器的设计除法普通十进制除法硬件中实现步骤例子除法器的设计浮点数运算科学计数法、规格化数浮点表示单精度浮点数双精度浮点数移码表示法IEEE 754指数偏移值&#xff08;exponent bias&#xff09;规格化的…

计算机网络4:计算机网络体系结构

目录计算机网络体系结构1.网络模型2.每一层的代表含义2.1 OSI7层模型2.2 五层协议2.3 TCP/IP 四层协议3.数据在各层之间的传输过程4.为什么要进行分层计算机网络体系结构 1.网络模型 2.每一层的代表含义 2.1 OSI7层模型 &#xff08;1&#xff09;物理层&#xff1a;比特流–…

STC15中断系统介绍

STC15中断系统介绍✨本篇参考来源于STC官方stc15系列手册:538页- 589页。&#xff08;文末提供该摘取部分的文档资料&#xff09; &#x1f389;在官方提供的手册资料中&#xff0c;一个系列一份手册&#xff0c;手册内容涵盖了数据手册和参考手册以及例程案例。对于学习着来说…

彻底搞懂分布式系统服务注册与发现原理

目录 引入服务注册与发现组件的原因 单体架构 应用与数据分离

火遍全球的ChatGPT技术简介与主干网络代码

如果说当下最火的AI技术和话题是什么&#xff0c;恐怕很难绕开ChatGPT。各大厂商都在表示未来要跟进ChatGPT技术&#xff0c;开发在自然语言处理智能系统&#xff0c;可见其影响力。本篇博客追个热度&#xff0c;来简单的介绍下ChatGPT到底是一项什么技术&#xff0c;究竟如何完…

深入理解innodb存储格式,双写机制,buffer pool底层结构和淘汰策略

MySql系列整体栏目 内容链接地址【一】深入理解mysql索引本质https://blog.csdn.net/zhenghuishengq/article/details/121027025【二】深入理解mysql索引优化以及explain关键字https://blog.csdn.net/zhenghuishengq/article/details/124552080【三】深入理解mysql的索引分类&a…

【网络编程】Java中的Socket

文章目录前言socket是什么&#xff1f;Java中的SocketJava实现网络上传文件前言 所谓Socket&#xff08;套接字&#xff09;&#xff0c;就是对网络中不同主机上的应用进程之间进行双向通信的端点的抽象。一个套接字就是网络上进程通信的一端&#xff0c;提供了应用层进程利用…

kafka入门篇

文章目录前言介绍概念与说明安装启动配置命令操作创建topic查看topic列表发送消息&#xff08;启动一个生产者&#xff09;消费消息&#xff08;启动一个消费者&#xff09;查询topic信息删除topic集群关机使用报错java连接示例前言 作为入门篇&#xff0c;主要是了解Kafka的概…

在windows下载安装netcat(nc)命令

参考文章 一、netcat(nc)下载 网盘下载 netcat(nc)下载地址&#xff1a;netcat 1.11 for Win32/Win64 二、配置环境变量 在Path里添加netcat的存放路径 参数 说明 -C 类似-L选项&#xff0c;一直不断连接[1.13版本新加的功能] -d 后台执行 -e prog 程序重定向&am…

能取代90%人工作的ChatGPT到底牛在哪?

&#x1f4e3;&#x1f4e3;&#x1f4e3;&#x1f4e3;&#x1f4e3;&#x1f4e3;&#x1f4e3; &#x1f38d;大家好&#xff0c;我是慕枫 &#x1f38d;前阿里巴巴高级工程师&#xff0c;InfoQ签约作者、阿里云专家博主&#xff0c;一直致力于用大白话讲解技术知识 &#x…

Web 框架 Flask 快速入门(二)表单

课程地址&#xff1a;Python Web 框架 Flask 快速入门 文章目录&#x1f334; 表单1、表单介绍2、表单的简单实现1. 代码2. 代码的执行逻辑3、使用wtf扩展实现4、bug记录&#xff1a;表单验证总是失败&#x1f334; 表单 1、表单介绍 当我们在网页上填写账号密码进行登录的时…

Spring 面试题(一):Spring 如何处理全局异常?

❤️ 博客首页&#xff1a;水滴技术 &#x1f680; 支持水滴&#xff1a;点赞&#x1f44d; 收藏⭐ 留言&#x1f4ac; &#x1f338; 订阅专栏&#xff1a;Spring 教程&#xff1a;从入门到精通 文章目录1、如何处理全局异常2、代码示例2.1、定义统一的“响应结果对象”2.2、…

Leetcode 回溯详解

回溯法 回溯法有“通用解题法”之称&#xff0c;用它可以系统地搜索问题的所有解。回溯法是一个既带有系统性又带有跳跃性的搜索算法。 在包含问题的所有解的解空间树中&#xff0c;按照深度优先搜索(DFS)&#xff09;的策略&#xff0c;从根结点出发深度探索解空间树。当探索…

MWORKS--同元软控MWORKS介绍、安装与使用

MWORKS--同元软控MWORKS介绍、安装与使用1 同元软控介绍1.1 同元软控简介1.2 同元软控发展历史2 MWORKS介绍2.1 MWORKS简介2.2 MWORKS产品描述3 装备数字化3.1 发展3.2 内涵3.3 系统模型发展成为产品的一部分3.4 MWORKS系统模型数据管理3.4 MWORKS为装备数字化提供的套件参考1 …

springcloud集成seata(AT)分布式事务

目录 一、 下载seata server和seata源码 二、配置启动seata 2.1 在nacos控制台&#xff0c;新建一个seata的名称空间&#xff0c;用于存放seata的专用配置 2.2 创建seata server的mysql库 2.3 在nacos上配置seata相关配置 &#xff08;seata名称空间&#xff09; 2.4 启动…

家政服务小程序实战教程08-宫格导航

小程序一般会在首页显示商品的分类&#xff0c;这类需求我们在微搭中是使用宫格导航组件来实现。 01 组件说明 宫格导航组件可以在导航配置里设置菜单&#xff0c;可以手动添加&#xff0c;也可以变量绑定 因为我们一般的分类是动态变化的&#xff0c;品类会不断的调整&#…

阿里代码规范插件中,Apache Beanutils为什么被禁止使用?

在实际的项目开发中&#xff0c;对象间赋值普遍存在&#xff0c;随着双十一、秒杀等电商过程愈加复杂&#xff0c;数据量也在不断攀升&#xff0c;效率问题&#xff0c;浮出水面。 问&#xff1a;如果是你来写对象间赋值的代码&#xff0c;你会怎么做&#xff1f; 答&#xf…

[Java 进阶面试题] CAS 和 Synchronized 优化过程

最有用的东西,是你手里的钱,有钱就有底气,还不快去挣钱~ 文章目录CAS 和 Synchronized 优化过程1. CAS1.1 CAS的原理1.2 CAS实现自增自减的原子性1.3 CAS实现自旋锁1.4 CAS针对ABA问题的优化2. synchronized2.1 synchronized加锁阶段分析2.2 synchronized优化CAS 和 Synchroniz…

nodejs+vue大学生在线选课系统vscode - Visual Studio Code

3、数据库进行设计&#xff0c;建立约束和联系。 4、创建程序框架&#xff0c;代码分成三层结构&#xff1a;接口层、业务层、表示层&#xff0c;设计窗口和主窗口&#xff0c;主窗口菜单项依照系统模块图设计。 5、设计数据访问的接口&#xff0c;供各模块调用。完成登录功能…