springboot——消息中间件

news2024/11/16 11:26:53

消息的概念

  1. 从广义角度来说,消息其实就是信息,但是和信息又有所不同。信息通常被定义为一组数据,而消息除了具有数据的特征之外,还有消息的来源与接收的概念。通常发送消息的一方称为消息的生产者,接收消息的一方称为消息的消费者
    • 消息的发送方
      • 生产者
    • 消息的接收方
      • 消费者
  2. 为什么要设置生产者和消费者呢?这就是要说到消息的意义了。
    • 信息通常就是一组数据,但是消息由于有了生产者和消费者,就出现了消息中所包含的信息可以被二次解读,生产者发送消息,可以理解为生产者发送了一个信息,也可以理解为生产者发送了一个命令;消费者接收消息,可以理解为消费者得到了一个信息,也可以理解为消费者得到了一个命令。
    • 对比一下我们会发现信息是一个基本数据,而命令则可以关联下一个行为动作,这样就可以理解为基于接收的消息相当于得到了一个行为动作,使用这些行为动作就可以组织成一个业务逻辑,进行进一步的操作。
    • 总的来说,消息其实也是一组信息,只是为其赋予了全新的含义,因为有了消息的流动,并且是有方向性的流动,带来了基于流动的行为产生的全新解读。开发者就可以基于消息的这种特殊解,将其换成代码中的指令。
  3. 对于消息的理解,初学者总认为消息内部的数据非常复杂,这是一个误区。比如我发送了一个消息,要求接受者翻译发送过去的内容。初学者会认为消息中会包含被翻译的文字,以及本次操作要执行翻译操作而不是打印操作。其实这种现象有点过度解读了,发送的消息中仅仅包含被翻译的文字,但是可以通过控制不同的人接收此消息来确认要做的事情。
    • 例如发送被翻译的文字仅到A程序,而A程序只能进行翻译操作,这样就可以发送简单的信息完成复杂的业务了,是通过接收消息的主体不同,进而执行不同的操作,而不会在消息内部定义数据的操作行为,当然如果开发者希望消息中包含操作种类信息也是可以的,只是提出消息的内容可以更简单,更单一。
  4. 对于消息的生产者与消费者的工作模式,还可以将消息划分成两种模式同步消息与异步消息。
    • 所谓同步消息就是生产者发送完消息,等待消费者处理,消费者处理完将结果告知生产者,然后生产者继续向下执行业务。这种模式过于卡生产者的业务执行连续性,在现在的企业级开发中,上述这种业务场景通常不会采用消息的形式进行处理。
    • 所谓异步消息就是生产者发送完消息,无需等待消费者处理完毕,生产者继续向下执行其他动作。
      • 比如生产者发送了一个日志信息给日志系统,发送过去以后生产者就向下做其他事情了,无需关注日志系统的执行结果。日志系统根据接收到的日志信息继续进行业务执行,是单纯的记录日志,还是记录日志并报警,这些和生产者无关,这样生产者的业务执行效率就会大幅度提升。并且可以通过添加多个消费者来处理同一个生产者发送的消息来提高系统的高并发性,改善系统工作效率,提高用户体验。一旦某一个消费者由于各种问题宕机了,也不会对业务产生影响,提高了系统的高可用性
  5. 消息也可以叫做队列,一般都称为:消息队列。
  6. 消息的使用场景:
    在这里插入图片描述

Java处理消息的标准规范

  1. 目前企业级开发中广泛使用的消息处理技术共三大类(三大规范),具体如下:
    • JMS
    • AMQP
    • MQTT(了解即可)
  2. 为什么是三大类,而不是三个技术呢?因为这些都是规范,就像JDBC技术,是个规范,开发针对规范开发,运行还要靠实现类。
    • 例如MySQL提供了JDBC的实现,最终运行靠的还是实现。并且这三类规范都是针对异步消息进行处理的,也符合消息的设计本质,处理异步的业务。

JMS

  1. JMS(Java Message Service),这是一个规范,作用等同于JDBC规范,提供了与消息服务相关的API接口。
  2. JMS消息模型,JMS规范中规范了消息有两种模型(从消息的生产和消费过程来进行区分)。分别是点对点模型发布订阅模型
    • 点对点模型:peer-2-peer,生产者会将消息发送到一个保存消息的容器中,通常使用队列模型,使用队列保存消息。一个队列的消息只能被一个消费者消费,或未被及时消费导致超时。这种模型下,生产者和消费者是一对一绑定的。
    • 发布订阅模型:publish-subscribe,生产者将消息发送到一个保存消息的容器中,也是使用队列模型来保存。但是消息可以被多个消费者消费,生产者和消费者完全独立,相互不需要感知对方的存在。
  3. JMS消息种类,根据消息中包含的数据种类划分,可以将消息划分成6种消息。
    • TextMessage
    • MapMessage
    • BytesMessage
    • StreamMessage
    • ObjectMessage
    • Message (只有消息头和属性)
  4. 总结:JMS主张不同种类的消息,消费方式不同,可以根据使用需要,选择不同种类的消息。但是这一点也成为其诟病之处。整体上来说,JMS就是典型的保守派,什么都按照J2EE的规范来,做一套规范,定义若干个标准,每个标准下又提供一大批API。
    • 目前对JMS规范实现的消息中间件技术还是挺多的,毕竟是皇家御用。例如ActiveMQ、Redis、HornetMQ。但是也有一些不太规范的实现,参考JMS的标准设计,但是又不完全满足其规范,例如:RabbitMQ、RocketMQ。

AMQP

  1. JMS的问世为消息中间件提供了很强大的规范性支撑,但是使用的过程中就开始被人发现诟病:比如JMS设置的极其复杂的多种类消息处理机制

    • 本来分门别类处理挺好的,为什么会被诟病呢?
      • 原因就在于JMS的设计是J2EE规范,站在Java开发的角度思考问题。但是现实往往是复杂度很高的。
        • 比如我有一个.NET开发的系统A,有一个Java开发的系统B,现在要从A系统给B系统发业务消息,结果两边数据格式不统一,没法操作。JMS不是可以统一数据格式吗?提供了6种数据种类,总有一款适合你啊。NO,一个都不能用。因为A系统的底层语言不是Java语言开发的,根本不支持那些对象。这就意味着如果想使用现有的业务系统A继续开发已经不可能了,必须推翻重新做使用Java语言开发的A系统。
    • 这时候有人就提出说,你搞那么复杂,整那么多种类干什么?找一种大家都支持的消息数据类型不就解决这个跨平台的问题了吗?大家一想,对啊,于是AMQP孕育而生。
      • 单从上面的说明中其实可以明确感知到,AMQP的出现解决的是消息传递时使用的消息种类的问题,化繁为简,但是其并没有完全推翻JMS的操作API,所以说AMQP仅仅是一种协议,规范了数据传输的格式而已。
  2. AMQP(advanced message queuing protocol):一种协议(高级消息队列协议,也是消息代理规范),规范了网络交换的数据格式,兼容JMS操作。

    • 优点
      • 具有跨平台性,服务器供应商,生产者,消费者可以使用不同的语言来实现
  3. AMQP在JMS的消息模型基础上又进行了进一步的扩展,除了点对点和发布订阅的模型,开发了几种全新的消息模型,适应各种各样的消息发送。

    • AMQP消息模型
      • direct exchange
      • fanout exchange
      • topic exchange
      • headers exchange
      • system exchange
    • AMQP消息种类:byte[]。(为啥使用byte类型,目的是为了跨平台,不管你是用什么品牌技术,都可以读取字节。)
  4. 目前实现了AMQP协议的消息中间件技术也很多,而且都是较为流行的技术,例如:RabbitMQ、StormMQ、RocketMQ

MQTT

  1. MQTT(Message Queueing Telemetry Transport)消息队列遥测传输,专为小设备设计,是物联网(IOT)生态系统中主要成分之一。由于与JavaEE企业级开发没有交集,此处不作过多的说明。

KafKa

  1. Kafka,一种高吞吐量的分布式发布订阅消息系统,提供实时消息功能。Kafka技术并不是作为消息中间件为主要功能的产品,但是其拥有发布订阅的工作模式,也可以充当消息中间件来使用,而且目前企业级开发中其身影也不少见。

  2. springboot整合各种各样的消息中间件:各种消息中间件必须先安装再使用。

SpringBoot整合ActiveMQ

ActiveMQ的安装

  1. ActiveMQ是MQ产品中的元老级产品,早期标准MQ产品之一,在AMQP协议没有出现之前,占据了消息中间件市场的绝大部分份额,后期因为AMQP系列产品的出现,迅速走弱,目前仅在一些线上运行的产品中出现,新产品开发较少采用。
  2. windows版安装包下载地址:https://activemq.apache.org/components/classic/download/
  3. 下载的安装包是解压缩就能使用的zip文件,解压缩完毕后会得到如下文件
    在这里插入图片描述
  4. 启动服务器:运行bin目录下的win32或win64目录下的activemq.bat命令即可,根据自己的操作系统选择即可,默认对外服务端口61616。
    在这里插入图片描述
  5. 访问web管理服务:ActiveMQ启动后会启动一个Web控制台服务,可以通过该服务管理ActiveMQ。
http://127.0.0.1:8161/

web管理服务默认端口8161,访问后可以打开ActiveMQ的管理界面,首先输入访问用户名和密码,初始化用户名和密码相同,均为:admin,成功登录后进入管理后台界面
在这里插入图片描述
在这里插入图片描述
ActiveMQ运行后占用的端口有:61616、5672、61613、1883、61614,如果启动失败,请先管理对应端口即可。

ActiveMQ的整合

整合步骤;加坐标,做配置,调接口

在这里插入图片描述

  1. 步骤①:导入springboot整合ActiveMQ的starter
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
  1. 步骤②:配置ActiveMQ的服务器地址
spring:
  activemq:
    broker-url: tcp://localhost:61616
  1. 步骤③:使用JmsMessagingTemplate操作ActiveMQ
    • 发送消息需要先将消息的类型转换成字符串,然后再发送,所以是convertAndSend,定义消息发送的位置,和具体的消息内容,此处使用id作为消息内容。
    • 接收消息需要先将消息接收到,然后再转换成指定的数据类型,所以是receiveAndConvert,接收消息除了提供读取的位置,还要给出转换后的数据的具体类型。

MessageService

public interface MessageService {
//    自定义方法

//    把消息放入消息队列中
    void sendMessage(String id);

//    把消息从消息队列中取出来
    String doMessage();
}
@Service
public class MessageServiceActivemqImpl implements MessageService {
    @Autowired
    private JmsMessagingTemplate messagingTemplate;

    @Override
    public void sendMessage(String id) {
        System.out.println("待发送短信的订单已纳入处理队列,id:"+id);
        //第一个参数是消息队列名
        messagingTemplate.convertAndSend("order.queue.id",id);
    }

    @Override
    public String doMessage() {
    //第一个参数是消息队列名
        String id = messagingTemplate.receiveAndConvert("order.queue.id",String.class);
        System.out.println("已完成短信发送业务,id:"+id);
        return id;
    }
}

MessageController

@RestController
@RequestMapping("/msgs")
public class MessageController {

    @Autowired
    private MessageService messageService;

    @GetMapping
    public String doMessage(){
        String id = messageService.doMessage();
        return id;
    }
}

orderService

public interface OrderService {
    void order(String id);
}

orderServiceImpl

@Service
public class OrderServiceImpl implements OrderService {

    @Autowired
    private MessageService messageService;

//    处理消息
    @Override
    public void order(String id) {
        //一系列操作,包含各种服务调用,处理各种业务
        System.out.println("订单处理开始");
        //短信消息处理
        messageService.sendMessage(id);
        System.out.println("订单处理结束");
        System.out.println();
    }
}

OrderController

@RestController
@RequestMapping("/orders")
public class OrderController {

    @Autowired
    private OrderService orderService;

    @PostMapping("{id}")
    public void order(@PathVariable String id){
        orderService.order(id);
    }
}
  1. 步骤④:使用消息监听器在服务器启动后,监听指定位置,当消息出现后,立即消费消息(就不用手动获取消息了,上面的MessageController的操作就可以取消了)
@Component
public class MessageListener {
    @JmsListener(destination = "order.queue.id") //监听ActiveMQ中指定名称的消息队列
    @SendTo("order.other.queue.id")//将下面方法的返回值传递到另一个消息队列中,参数就是消息队列的名称。
    public String receive(String id){
        System.out.println("已完成短信发送业务,id:"+id);
        return "new:"+id;
    }
}

使用注解@JmsListener定义当前方法监听ActiveMQ中指定名称的消息队列。
如果当前消息队列处理完还需要继续向下传递当前消息到另一个队列中使用注解@SendTo即可,这样即可构造连续执行的顺序消息队列。

  1. 步骤⑤:切换消息模型由点对点模型到发布订阅模型,修改jms配置即可
spring:
  activemq:
    broker-url: tcp://localhost:61616
  jms:
    pub-sub-domain: true

pub-sub-domain默认值为false,即点对点模型,修改为true后就是发布订阅模型。

  1. 总结
      1. springboot整合ActiveMQ提供了JmsMessagingTemplate对象作为客户端操作消息队列
      1. 操作ActiveMQ需要配置ActiveMQ服务器地址,默认端口61616
      1. 企业开发时通常使用监听器来处理消息队列中的消息,设置监听器使用注解@JmsListener
      1. 配置jms的pub-sub-domain属性可以在点对点模型和发布订阅模型间切换消息模型
      1. 使用activeMQ之前记得打开MQ服务器

SpringBoot整合RabbitMQ

  1. RabbitMQ是MQ产品中的目前较为流行的产品之一,它遵从AMQP协议。RabbitMQ的底层实现语言使用的是Erlang,所以安装RabbitMQ需要先安装Erlang。

Erlang安装

  1. windows版安装包下载地址:https://www.erlang.org/downloads
  2. 下载完毕后得到exe安装文件,一键傻瓜式安装,安装完毕需要重启,需要重启,需要重启。
    • 安装的过程中可能会出现依赖Windows组件的提示,根据提示下载安装即可,都是自动执行的。
  3. Erlang安装后需要配置环境变量,否则RabbitMQ将无法找到安装的Erlang。需要配置项如下,作用等同JDK配置环境变量的作用。
  • ERLANG_HOME
  • PATH
    在这里插入图片描述
    在这里插入图片描述

rabbitmq-安装

  1. windows版安装包下载地址:https://rabbitmq.com/install-windows.html

  2. 下载完毕后得到exe安装文件,一键傻瓜式安装,安装完毕后会得到如下文件(安装后最好重启一下)
    在这里插入图片描述

  3. 启动服务器的命令:

rabbitmq-service.bat start		# 启动服务
rabbitmq-service.bat stop		# 停止服务
rabbitmqctl status				# 查看服务状态
  1. 运行sbin目录下的rabbitmq-service.bat命令即可,start参数表示启动,stop参数表示退出,默认对外服务端口5672。 有没有感觉5672的服务端口很熟悉?activemq与rabbitmq有一个端口冲突问题,这两个不能同时运行。(可以修改一个端口)
    • 注意:启动rabbitmq的过程实际上是开启rabbitmq对应的系统服务,需要管理员权限方可执行。
      • 不是管理员权限会发送访问错误
        在这里插入图片描述
        在这里插入图片描述
  2. 不喜欢命令行的可以使用任务管理器中的服务页,找到RabbitMQ服务,使用鼠标右键菜单控制服务的启停。
    在这里插入图片描述
  3. 访问web管理服务
    • RabbitMQ也提供有web控制台服务,但是此功能是一个插件,需要先启用才可以使用。
rabbitmq-plugins.bat list							# 查看当前所有插件的运行状态
rabbitmq-plugins.bat enable rabbitmq_management		# 启动rabbitmq_management插件

启动插件后可以在插件运行状态中查看是否运行,运行后通过浏览器即可打开服务后台管理界面

http://localhost:15672

rabbitmq的web管理服务默认端口15672,访问后可以打开RabbitMQ的管理界面,如下:

在这里插入图片描述
首先输入访问用户名和密码,初始化用户名和密码相同,均为:guest,成功登录后进入管理后台界面:在这里插入图片描述

springboot整合RabbitMq(direct模型)

在这里插入图片描述

RabbitMQ满足AMQP协议,因此不同的消息模型对应的制作不同,先使用最简单的direct模型开发。

  1. 步骤①:导入springboot整合amqp的starter,amqp协议默认实现为rabbitmq方案
 <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!--rabbitmq-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

    </dependencies>
  1. 步骤②:配置RabbitMQ的服务器地址
spring:
  rabbitmq:
    host: localhost
    port: 5672  # 配置端口号(默认为5672)
  1. 步骤③:初始化直连模式系统设置

由于RabbitMQ不同模型要使用不同的交换机,因此需要先初始化RabbitMQ相关的对象,例如队列,交换机等

package com.knife.service.impl.rabbitmq.direct.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration //作为配置类
public class RabbitConfigDirect {


//   Queue的包选择 : import org.springframework.amqp.core.Queue;
//    定义一个存储消息的消息队列对象
    @Bean
    public Queue directQueue(){

//        return new Queue("direct_queue",true,false,false);
//         第一个参数      指定消息队列对象的名称
//         第二个参数      durable:是否持久化,默认为true
//        第三个参数      exclusive:是否当前连接专用,默认为false,连接关闭后队列即被删除
//       第四个参数      autoDelete:是否自动删除,当生产者活消费者不适应此队列,自动删除,默认为false

        return new Queue("direct_queue");
    }


//    定义一个交换机对象
    @Bean
    public DirectExchange directExchange(){
//          参数是自定义的交换机名称
        return new DirectExchange("directExchange");
    }

//    把队列和交换机绑定在一起,绑定关系
    @Bean
    public Binding bindingDirect(){
//        direct为绑定的名称
        return BindingBuilder.bind(directQueue()).to(directExchange()).with("direct");
    }
    
    
    
    
//    可以定义多个消息队列对象,
    @Bean
    public Queue directQueue2(){
        return new Queue("direct_queue2");
    }

//    多个消息队列对象可以用同一台交换机绑定
    @Bean
    public Binding bindingDirect2(){
        return BindingBuilder.bind(directQueue2()).to(directExchange()).with("direct2");
    }

}

队列Queue与直连交换机DirectExchange创建后,还需要绑定他们之间的关系Binding,这样就可以通过交换机操作对应队列。

  1. 步骤④:使用AmqpTemplate操作RabbitMQ
@Service
public class MessageServiceRabbitmqDirectImpl implements MessageService {

    @Autowired
    private AmqpTemplate amqpTemplate;

    @Override
    public void sendMessage(String id) {
        System.out.println("待发送短信的订单已纳入处理队列(rabbitmq direct),id:"+id);
//        第一参数:要求传入一个交换机的名称
//        第二个参数:是routingKey的名称,即绑定消息队列对象与交换机关系的名称
//        第三个参数:要传的消息
        amqpTemplate.convertAndSend("directExchange","direct",id);

//        采用默认的交换机与默认的绑定关系
//        amqpTemplate.convertAndSend(id);
    }


//    一般消息都是自动执行的,所以不需要手动获取消息,直接使用listener
    @Override
    public String doMessage() {
        return null;
    }
}

amqp协议中的操作API接口名称看上去和jms规范的操作API接口很相似,但是传递参数差异很大。

  1. 步骤⑤:使用消息监听器在服务器启动后,监听指定位置,当消息出现后,立即消费消息
@Component
public class MessageListener {

//    定义指定监听的消息队列的名称
    @RabbitListener(queues = "direct_queue")
    public void receive(String id){
        System.out.println("已完成短信发送业务(rabbitmq direct),id:"+id);
    }

}
@Component
public class MessageListener2 {

//    可以配置多个监听器
//    如果有两组监听器,并且都监听同一个消息队列,它们会轮询处理
    @RabbitListener(queues = "direct_queue")
    public void receive(String id){
        System.out.println("已完成短信发送业务(rabbitmq direct two),id:"+id);
    }

}

使用注解@RabbitListener定义当前方法监听RabbitMQ中指定名称的消息队列。
在这里插入图片描述

springboot整合RabbitMq(topic模型)

在这里插入图片描述

  1. 步骤①:同上(direct模型)
  2. 步骤②:同上(direct模型)
  3. 步骤③:初始化主题模式系统设置
@Configuration //作为配置类
public class RabbitConfigTopic {


//   Queue的包选择 : import org.springframework.amqp.core.Queue;
//    定义一个存储消息的消息队列对象
    @Bean
    public Queue topicQueue(){

//        return new Queue("topic_queue",true,false,false);
//         第一个参数      指定消息队列对象的名称
//         第二个参数      durable:是否持久化,默认为true
//        第三个参数      exclusive:是否当前连接专用,默认为false,连接关闭后队列即被删除
//       第四个参数      autoDelete:是否自动删除,当生产者活消费者不适应此队列,自动删除,默认为false

        return new Queue("topic_queue");
    }


//    定义一个交换机对象
    @Bean
    public TopicExchange topicExchange(){
//          参数是自定义的交换机名称
        return new TopicExchange("topicExchange");
    }

//    把队列和交换机绑定在一起,绑定关系
    @Bean
    public Binding bindingTopic(){
//主题模式支持routingKey匹配模式,
// *表示匹配一个单词,#表示匹配任意内容,这样就可以通过主题交换机将消息分发到不同的队列中
        return BindingBuilder.bind(topicQueue()).to(topicExchange()).with("topic.*.id");
    }




//    可以定义多个消息队列对象,
    @Bean
    public Queue topicQueue2(){
        return new Queue("topic_queue2");
    }

//    多个消息队列对象可以用同一台交换机绑定
    @Bean
    public Binding bindingTopic2(){
//        主题模式支持routingKey匹配模式,*表示匹配一个单词,
//        #表示匹配任意内容,这样就可以通过主题交换机将消息分发到不同的队列中
        return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("topic.orders.*");
    }

}

主题模式支持routingKey匹配模式,*表示匹配一个单词,#表示匹配任意内容,这样就可以通过主题交换机将消息分发到不同的队列中,详细内容请参看RabbitMQ系列课程。

匹配键topic.*.*topic.#
topic.order.idtruetrue
order.topic.idfalsefalse
topic.sm.order.idfalsetrue
topic.sm.idfalsetrue
topic.id.ordertruetrue
topic.idfalsetrue
topic.orderfalsetrue

步骤④:使用AmqpTemplate操作RabbitMQ

@Service
public class MessageServiceRabbitmqTopicImpl implements MessageService {

    @Autowired
    private AmqpTemplate amqpTemplate;

    @Override
    public void sendMessage(String id) {
        System.out.println("待发送短信的订单已纳入处理队列(rabbitmq topic),id:"+id);

//        第一参数:要求传入一个交换机的名称
//        第二个参数:是routingKey的名称,即绑定消息队列对象与交换机关系的名称
//        第三个参数:要传的消息
        amqpTemplate.convertAndSend("topicExchange","topic.orders.id",id);

//        采用默认的交换机与默认的绑定关系
//        amqpTemplate.convertAndSend(id);
    }


//    一般消息都是自动执行的,所以不需要手动获取消息,直接使用listener
    @Override
    public String doMessage() {
        return null;
    }
}

发送消息后,根据当前提供的routingKey与绑定交换机时设定的routingKey进行匹配,规则匹配成功消息才会进入到对应的队列中。

步骤⑤:使用消息监听器在服务器启动后,监听指定队列

@Component
public class MessageListener {

//    定义指定监听的消息队列的名称
    @RabbitListener(queues = "topic_queue")
    public void receive(String id){
        System.out.println("已完成短信发送业务(rabbitmq topic),id:"+id);
    }

    //    可以配置多个监听器
//    如果有两组监听器,并且都监听同一个消息队列,它们会轮询处理
    @RabbitListener(queues = "topic_queue2")
    public void receive2(String id){
        System.out.println("已完成短信发送业务(rabbitmq topic two),id:"+id);
    }
}

使用注解@RabbitListener定义当前方法监听RabbitMQ中指定名称的消息队列。
在这里插入图片描述

  1. 总结
      1. springboot整合RabbitMQ提供了AmqpTemplate对象作为客户端操作消息队列
      1. 操作ActiveMQ需要配置ActiveMQ服务器地址,默认端口5672
      1. 企业开发时通常使用监听器来处理消息队列中的消息,设置监听器使用注解@RabbitListener
      1. RabbitMQ有5种消息模型,使用的队列相同,但是交换机不同。交换机不同,对应的消息进入的策略也不同

SpringBoot整合RocketMQ

  1. RocketMQ由阿里研发,后捐赠给apache基金会,目前是apache基金会顶级项目之一,也是目前市面上的MQ产品中较为流行的产品之一,它遵从AMQP协议。

RocketMQ安装

  1. windows版安装包下载地址:https://rocketmq.apache.org/
  2. 下载完毕后得到zip压缩文件,解压缩即可使用,解压后得到如下文件

在这里插入图片描述

  1. RocketMQ安装后需要配置环境变量,具体如下:
    • ROCKETMQ_HOME
    • PATH
    • NAMESRV_ADDR (建议): 127.0.0.1:9876

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述
关于NAMESRV_ADDR对于初学者来说建议配置此项,也可以通过命令设置对应值,操作略显繁琐,建议配置。系统学习RocketMQ知识后即可灵活控制该项。

RocketMQ工作模式

  1. 在RocketMQ中,处理业务的服务器称为broker,生产者与消费者不是直接与broker联系的,而是通过命名服务器(NameServer)进行通信。broker启动后会通知命名服务器(NameServer)自己已经上线,这样命名服务器中就保存有所有的broker信息。当生产者与消费者需要连接broker时,通过命名服务器找到对应的处理业务的broker,因此命名服务器在整套结构中起到一个信息中心的作用。并且broker启动前必须保障命名服务器先启动

在这里插入图片描述

RocketMq启动服务器

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

mqnamesrv		# 启动命名服务器
mqbroker		# 启动broker

应该选择对应的.cmd启动

运行bin目录下的mqnamesrv命令即可启动命名服务器,默认对外服务端口9876。
运行bin目录下的mqbroker命令即可启动broker服务器,如果环境变量中没有设置NAMESRV_ADDR则需要在运行mqbroker指令前通过set指令设置NAMESRV_ADDR的值,并且每次开启均需要设置此项。

RocketMq测试服务器启动状态

  1. RocketMQ提供有一套测试服务器功能的测试程序,运行bin目录下的tools命令即可使用。
tools org.apache.rocketmq.example.quickstart.Producer		# 生产消息
tools org.apache.rocketmq.example.quickstart.Consumer		# 消费消息

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

整合RocketMQ(异步消息)

在这里插入图片描述

  1. 步骤①:导入springboot整合RocketMQ的starter,此坐标不由springboot维护版本
 <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!--rocketmq-->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.2.1</version>
        </dependency>

    </dependencies>
  1. 步骤②:配置RocketMQ的服务器地址
rocketmq:
  name-server: localhost:9876
  producer:
    group: group_rocketmq  # 设置默认的生产消费者所属组(自定义的名称)

设置默认的生产者消费者所属组group。

  1. 步骤③:使用RocketMQTemplate操作RocketMQ
@Service
public class MessageServiceRocketmqImpl implements MessageService {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @Override
    public void sendMessage(String id) {
        System.out.println("待发送短信的订单已纳入处理队列(rocketmq),id:"+id);
        // rocketMQTemplate.convertAndSend("order_id",id); //同步消息
//        异步生产消息以后的回调方法
        SendCallback callback = new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
            //                消息发送成功要做什么
                System.out.println("消息发送成功");
            }
            @Override
            public void onException(Throwable e) {
            //                消息发送失败要做什么
                System.out.println("消息发送失败!!!!!");
            }
        };
        rocketMQTemplate.asyncSend("order_id",id,callback);//发送异步消息
    }
}

使用asyncSend方法发送异步消息。

  1. 步骤④:使用消息监听器在服务器启动后,监听指定位置,当消息出现后,立即消费消息
    • RocketMQ的监听器必须按照标准格式开发,实现RocketMQListener接口,泛型为消息类型。
    • 使用注解@RocketMQMessageListener定义当前类监听RabbitMQ中指定组、指定名称的消息队列。
@Component
// topic参数:你消费的信息来源与哪里(消息的生产者)
//consumerGroup参数:指定你是哪一个消费者组对应的信息(要和生产组名一致(yml文件配置的))
@RocketMQMessageListener(topic = "order_id",consumerGroup = "group_rocketmq")
public class MessageListener implements RocketMQListener<String> {//这里的泛型是指定消息的种类


//    后面接收到的消息都会传到参数id里面去
    @Override
    public void onMessage(String id) {
    //        这里面对消息进行处理
        System.out.println("已完成短信发送业务(rocketmq),id:"+id);
    }
}
  1. 总结
      1. springboot整合RocketMQ使用RocketMQTemplate对象作为客户端操作消息队列
      1. 操作RocketMQ需要配置RocketMQ服务器地址,默认端口9876
      1. 企业开发时通常使用监听器来处理消息队列中的消息,设置监听器使用注解@RocketMQMessageListener

SpringBoot整合Kafka

Kafka的安装

  1. windows版安装包下载地址:https://kafka.apache.org/downloads
  2. 下载完毕后得到tgz压缩文件,使用解压缩软件解压缩即可使用,解压后得到如下文件

在这里插入图片描述
建议使用windows版2.8.1版本。

  1. 启动服务器
    • kafka服务器的功能相当于RocketMQ中的broker,kafka运行还需要一个类似于命名服务器的服务。在kafka安装目录中自带一个类似于命名服务器的工具,叫做zookeeper,它的作用是注册中心。
      • 运行bin目录下的windows目录下的zookeeper-server-start命令即可启动注册中心,默认对外服务端口2181。
      • 运行bin目录下的windows目录下的kafka-server-start命令即可启动kafka服务器,默认对外服务端口9092。
zookeeper-server-start.bat ..\..\config\zookeeper.properties		# 启动zookeeper
kafka-server-start.bat ..\..\config\server.properties				# 启动kafka
  1. 创建主题
    和之前操作其他MQ产品相似,kakfa也是基于主题操作,操作之前需要先初始化topic。
# 创建topic
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic itheima
# 查询topic
kafka-topics.bat --zookeeper 127.0.0.1:2181 --list					
# 删除topic
kafka-topics.bat --delete --zookeeper localhost:2181 --topic itheima
  1. 测试服务器启动状态
  2. Kafka提供有一套测试服务器功能的测试程序,运行bin目录下的windows目录下的命令即可使用。
kafka-console-producer.bat --broker-list localhost:9092 --topic itheima							# 测试生产消息
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic itheima --from-beginning	# 测试消息消费

整合Kafka

  1. 步骤①:导入springboot整合Kafka的starter,此坐标由springboot维护版本
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
  1. 步骤②:配置Kafka的服务器地址
spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: order

​ 设置默认的生产者消费者所属组id。

  1. 步骤③:使用KafkaTemplate操作Kafka
@Service
public class MessageServiceKafkaImpl implements MessageService {
    @Autowired
    private KafkaTemplate<String,String> kafkaTemplate;

    @Override
    public void sendMessage(String id) {
        System.out.println("待发送短信的订单已纳入处理队列(kafka),id:"+id);
        kafkaTemplate.send("itheima2022",id);
    }
}

使用send方法发送消息,需要传入topic名称。

  1. 步骤④:使用消息监听器在服务器启动后,监听指定位置,当消息出现后,立即消费消息
@Component
public class MessageListener {
    @KafkaListener(topics = "itheima2022")
    public void onMessage(ConsumerRecord<String,String> record){
        System.out.println("已完成短信发送业务(kafka),id:"+record.value());
    }
}

使用注解@KafkaListener定义当前方法监听Kafka中指定topic的消息,接收到的消息封装在对象ConsumerRecord中,获取数据从ConsumerRecord对象中获取即可。

  1. 总结
      1. springboot整合Kafka使用KafkaTemplate对象作为客户端操作消息队列
      1. 操作Kafka需要配置Kafka服务器地址,默认端口9092
      1. 企业开发时通常使用监听器来处理消息队列中的消息,设置监听器使用注解@KafkaListener。接收消息保存在形参ConsumerRecord对象中

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1372364.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

C语言之三子棋小游戏的应用

文章目录 前言一、前期准备模块化设计 二、框架搭建三、游戏实现打印棋盘代码优化玩家下棋电脑下棋判断输赢 四、结束 前言 三子棋是一种民间传统游戏&#xff0c;又叫九宫棋、圈圈叉叉棋、一条龙、井字棋等。游戏分为双方对战&#xff0c;双方依次在9宫格棋盘上摆放棋子&#…

chatglm3的api调用

conda activate chatglm3 cd openai_api_demo python openai_api.py 启动ok&#xff0c;然后内网映射后 anaconda启动jupyter !pip install openai1.6.1 -i https://pypi.tuna.tsinghua.edu.cn/simple/ """ This script is an example of using the OpenAI …

uniapp自定义底部导航栏

1.新建 nav-custom.vue组件 <template><view class"nav-box" :style"{height:heightpx,background:bgColor}"><!-- 自定义导航栏 --><view class"status_bar" :style"{height:statusBarHeightpx}"><!-- u…

rke2 Offline Deploy Rancher v2.8.0 latest (helm 离线部署 rancher v2.8.0)

文章目录 预备条件为什么是三个节点&#xff1f;​预备条件配置私有仓库介质清单安装 helm安装 cert-manager下载介质镜像入库helm 部署卸载 安装 rancher镜像入库helm 安装 验证 预备条件 所有支持的操作系统都使用 64-bit x86 架构。Rancher 兼容当前所有的主流 Linux 发行版…

export default 和exprot

1.默认导入和默认导出 语法: export default {需要输出的内容} 接收: import 成员变量的名字 from 文件夹的路径 案例&#xff1a; a.mjs文件夹下默认导出 export default{a:10,b:20,show(){console.log(123);} } 在b.mjs文件中用成员变量进行接收 import AA from &q…

手撕单链表(单向,不循环,不带头结点)的基本操作

&#x1d649;&#x1d65e;&#x1d658;&#x1d65a;!!&#x1f44f;&#x1f3fb;‧✧̣̥̇‧✦&#x1f44f;&#x1f3fb;‧✧̣̥̇‧✦ &#x1f44f;&#x1f3fb;‧✧̣̥̇:Solitary-walk ⸝⋆ ━━━┓ - 个性标签 - &#xff1a;来于“云”的“羽球人”。…

5,sharding-jdbc入门-sharding-jdbc广播表

执行sql #在数据库 user_db、order_db_1、order_db_2中均要建表 CREATE TABLE t_dict (dict_id BIGINT (20) NOT NULL COMMENT 字典id,type VARCHAR (50) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT 字典类型,code VARCHAR (50) CHARACTER SET utf8 COLLAT…

windows和liunx对比及Linux分类

windows一定比liunx差吗&#xff0c;这绝对是天大误解&#xff0c;不是说你常用的开始是liunx就代表windows差 windows和liunx对比 有人说Linux性能远高于Windows&#xff0c;这个笔者是不认可的&#xff0c;给Linux套上一个图形界面&#xff0c;你再使劲美化一下&#xff0c…

【论文精读】A Survey on Large Language Model based Autonomous Agents

A Survey on Large Language Model based Autonomous Agents 前言Abstract1 Introduction2 LLM-based Autonomous Agent Construction2.1 Agent Architecture Design2.1.1 Profiling Module2.1.2 Memory ModuleMemory StructuresMemory FormatsMemory Operations 2.1.3 Plannin…

Android Canvas图层saveLayer剪切clipPath原图addCircle绘制对应圆形区域并放大,Kotlin(3)

Android Canvas图层saveLayer剪切clipPath原图addCircle绘制对应圆形区域并放大&#xff0c;Kotlin&#xff08;3&#xff09; 在文章2 Android Canvas图层saveLayer剪切clipPath原图addCircle绘制对应圆形区域&#xff0c;Kotlin&#xff08;2&#xff09;-CSDN博客 的基础上&…

上海雏鸟科技无人机灯光秀跨年表演点亮三国五地夜空

2023年12月31日晚&#xff0c;五场别开生面的无人机灯光秀跨年表演在新加坡圣淘沙、印尼雅加达、中国江苏无锡、浙江衢州、陕西西安等五地同步举行。据悉&#xff0c;这5场表演背后均出自上海的一家无人机企业之手——上海雏鸟科技。 在新加坡圣淘沙西乐索海滩&#xff0c;500架…

【Python学习】Python学习11-元组

目录 【Python学习】Python学习11-元组 前言创建语法创建语法特殊形式访问元组操作元组元组运算符元组内置函数Python列表函数&方法参考 文章所属专区 Python学习 前言 本章节主要说明Python的Python 的元组与列表类似&#xff0c;不同之处在于元组的元素不能修改。通过小…

我的 AI 成长星球,邀请你加入

大家好啊&#xff0c;我是董董灿。 2023年终总结时我这个小白坚持写作一整年&#xff0c;赚了多少&#xff1f;提到了一点&#xff0c;2024希望自己创建一个免费星球。 其实一直就想弄一个高质量的 AI 知识交流平台&#xff0c;方便大家一起交流和学习&#xff0c;同时提高对 …

Python虚拟环境轻松配置:Jupyter Notebook中的内核管理指南

问题 在Python开发中&#xff0c;一些人在服务器上使用Jupyter Notebook中进行开发。一般是创建虚拟环境后&#xff0c;向Jupyter notebook中添加虚拟环境中的Kernel&#xff0c;后续新建Notebook中在该Kernel中进行开发&#xff0c;这里记录一下如何创建Python虚拟环境以及添…

关于Vue前端接口对接的思考

关于Vue前端接口对接的思考 目录概述需求&#xff1a; 设计思路实现思路分析1.vue 组件分类和获取数值的方式2.http 通信方式 分类 如何对接3.vue 组件分类和赋值方式&#xff0c; 参考资料和推荐阅读 Survive by day and develop by night. talk for import biz , show your p…

C/C++调用matlab

C/C调用matlab matlab虽然可以生成C/C的程序&#xff0c;但其能力很有限&#xff0c;很多操作无法生成C/C程序&#xff0c;比如函数求解、优化、拟合等。为了解决这个问题&#xff0c;可以采用matlab和C/C联合编程的方式进行。使用matlab将关键操作打包成dll环境&#xff0c;再…

仿蓝奏云网盘 /file/list SQL注入漏洞复现

0x01 产品简介 仿蓝奏网盘是一种类似于百度网盘的文件存储和共享解决方案。它为用户提供了一个便捷的平台,可以上传、存储和分享各种类型的文件,方便用户在不同设备之间进行文件传输和访问。 0x02 漏洞概述 仿蓝奏云网盘 /file/list接口处存在SQL注入漏洞,登录后台的攻击…

代码随想录day20 开始二叉搜索树

654.最大二叉树 题目 给定一个不含重复元素的整数数组。一个以此数组构建的最大二叉树定义如下&#xff1a; 二叉树的根是数组中的最大元素。左子树是通过数组中最大值左边部分构造出的最大二叉树。右子树是通过数组中最大值右边部分构造出的最大二叉树。 通过给定的数组构…

零基础学习数学建模——(二)数学建模的步骤

本篇博客将详细介绍数学建模的步骤。 文章目录 引例&#xff1a;年夜饭的准备第一步&#xff1a;模型准备第二步&#xff1a;模型假设第三步&#xff1a;模型建立第四步&#xff1a;模型求解第五步&#xff1a;结果分析第六步&#xff1a;模型检验第七步&#xff1a;模型应用及…

Kubernets(K8S)启动和运行01 快速入门

简介 Kubernetes is an open source orchestrator for deploying containerized applications. It was originally developed by Google, inspired by a decade of experience deploying scalable, reliable systems in containers via application-oriented APIs. Kubernete…