RabbitMQ 学习笔记

news2024/10/23 2:49:25

RabbitMQ学习笔记

 

一些概念

Broker :RabbitMQ服务。

virtual host: 其实就是分组。

Connection:连接,生产者消费者与Broker之间的TCP连接。

Channel:网络信道,轻量级的Connection,使用Channel可以减少Connection的建立,减少开销。

Message:消息,由 PropertiesBody组成,Properties可以对消息的优先级、延迟等特性进行记录,Body存储消息体的内容。

Exchange:交换机,没有消息存储功能,负责分发消息。

BindingExchangeQueue之间的虚拟连接,其中可以包含Routing Key

Routing Key:路由规则,用于确定如何分发、接收消息。

Queue:消息队列,保存消息并将其转发给消费者进行消费。

安装

Windows安装

安装erLang语言

进入官网

image-20220723085850289

 

 

下载完之后一直下一步安装即可,安装完成后进入目录,配置环境变量

image-20220723092150573

image-20220723092301127

安装RabbitMQ服务端

Release RabbitMQ 3.7.3 · rabbitmq/rabbitmq-server (github.com)

image-20220723091828280

一直下一步安装即可

安装完成后打开安装目录,进入到这个文件夹打开命令行

image-20220723093324568

输入命令安装插件

rabbitmq-plugins enable rabbitmq_management

完成后双击rabbitmq-server.bat

打开http://localhost:15672/

用户名密码是guest/guest

image-20220723093515104

image-20220723093550183

Linux下使用 Docker 安装

直接拉取最新版

docker pull rabbitmq

运行容器

docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management

进入容器

docker exec -it rabbitmq /bin/bash

开启管理插件

rabbitmq-plugins enable rabbitmq_management

image-20220723103556298

打开管理网站 http://localhost:15672/

4369, 25672 (Erlang发现&集群端口)

5672, 5671 (AMQP端口)

15672 (web管理后台端口)

61613, 61614 (STOMP协议端口)

1883, 8883 (MQTT协议端口)

用户名密码均为 guest

image-20220723103a414689

实操

官网例子

简单模式

11111

配置文件 application-easy.yml

spring:
  rabbitmq:
    host: 123.123.123.123
    port: 5672
    username: Gettler
    password: ********
    virtual-host: /
    queue: easy-queue

生产者:

package com.gettler.rabbitmq.easy;

import com.gettler.rabbitmq.RabbitmqApplication;
import com.gettler.rabbitmq.config.RabbitMqConnectionFactory;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;

@ActiveProfiles("easy")
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitmqApplication.class, webEnvironment =
        SpringBootTest.WebEnvironment.RANDOM_PORT)
public class ProducerTest {
    // 队列名称
    @Value("${spring.rabbitmq.queue}")
    public String QUEUE_NAME;
    private static final Logger logger = LoggerFactory.getLogger(ProducerTest.class);

    @Test
    public void testProducer() throws Exception {
        // 创建一个connection
        Connection connection = RabbitMqConnectionFactory.getSingleInstanceConnection();
        // 创建一个channel
        Channel channel = connection.createChannel();
        /*
          创建一个队列
          1.队列名称
          2.队列里面的消息是否持久化(默认为false,代表消息存储在内存中)
          3.该队列是否只供一个消费者进行消费,是否进行共享(true表示可以多个消费者消费)
          4.表示最后一个消费者断开连接以后,该队列是否自动删除(true表示自动删除)
          5.其他参数
         */
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        String message = "hello, this is an easy message";
        /*
          发送一个消息
          1.发送到那个交换机(空代表默认交换机)
          2.路由key
          3.其他的参数信息
          4.发送消息的消息体
         */
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        logger.info("消息发送完毕");
    }
}

消费者:

package com.gettler.rabbitmq.easy;

import com.gettler.rabbitmq.RabbitmqApplication;
import com.gettler.rabbitmq.config.RabbitMqConnectionFactory;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;

@ActiveProfiles("easy")
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitmqApplication.class, webEnvironment =
        SpringBootTest.WebEnvironment.RANDOM_PORT)
public class ConsumerTest {
    // 队列名称
    @Value("${spring.rabbitmq.queue}")
    public String QUEUE_NAME;
    private static final Logger logger = LoggerFactory.getLogger(ConsumerTest.class);

    @Test
    public void testConsumer() throws Exception {
        // 创建一个connection
        Connection connection = RabbitMqConnectionFactory.getSingleInstanceConnection();
        // 创建一个channel
        Channel channel = connection.createChannel();
        // 消费消息的回调
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            logger.info("消费消息成功,消息内容为:" + new String(message.getBody()));
        };
        // 取消消费的回调
        CancelCallback cancelCallback = (consumerTag) -> {
            logger.info("消息消费被中断");
        };
        /*
          消费者消费消息
          1.消费的队列名称
          2.消费成功之后是否要自动应答(true代表自动应答,false代表手动应答)
          3.消费者消费消息的回调(函数式接口)
          4.消费者取消消费的回调(函数式接口)
         */
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
    }
}

image-20240620161232526

工作模式

在这里插入图片描述

配置文件 application-work.yml

spring:
  rabbitmq:
    host: 123.123.123.123
    port: 5672
    username: Gettler
    password: ********
    virtual-host: /
    queue: work-queue

生产者:

package com.gettler.rabbitmq.work;

import com.gettler.rabbitmq.RabbitmqApplication;
import com.gettler.rabbitmq.config.RabbitMqConnectionFactory;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.Scanner;

@ActiveProfiles("work")
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitmqApplication.class, webEnvironment =
        SpringBootTest.WebEnvironment.RANDOM_PORT)
public class ProducerTest {
    // 队列名称
    @Value("${spring.rabbitmq.queue}")
    public String QUEUE_NAME;
    private static final Logger logger = LoggerFactory.getLogger(ProducerTest.class);

    @Value("${spring.rabbitmq.host}")
    private String host;

    @Value("${spring.rabbitmq.username}")
    private String username;

    @Value("${spring.rabbitmq.password}")
    private String password;

    @Test
    public void testProducer() throws Exception {
        System.out.println(this.host);
        // 创建一个connection
        Connection connection = RabbitMqConnectionFactory.getSingleInstanceConnection();
        // 创建一个channel
        Channel channel = connection.createChannel();
        // 声明交换机
        channel.exchangeDeclare("fanout", BuiltinExchangeType.FANOUT);
        /*
          创建一个队列
          1.队列名称
          2.队列里面的消息是否持久化(默认为false,代表消息存储在内存中)
          3.该队列是否只供一个消费者进行消费,是否进行共享(true表示可以多个消费者消费)
          4.表示最后一个消费者断开连接以后,该队列是否自动删除(true表示自动删除)
          5.其他参数
         */
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()) {
            String message = scanner.next();
            /*
              发送一个消息
              1.发送到那个交换机(空代表默认交换机)
              2.路由key
              3.其他的参数信息
              4.发送消息的消息体
             */
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            logger.info("消息发送完毕");
        }
    }
}

消费者A:

package com.gettler.rabbitmq.work;

import com.gettler.rabbitmq.RabbitmqApplication;
import com.gettler.rabbitmq.config.RabbitMqConnectionFactory;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;

@ActiveProfiles("work")
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitmqApplication.class, webEnvironment =
        SpringBootTest.WebEnvironment.RANDOM_PORT)
public class ConsumerATest {
    // 队列名称
    @Value("${spring.rabbitmq.queue}")
    public String QUEUE_NAME;
    private static final Logger logger = LoggerFactory.getLogger(ConsumerATest.class);

    @Test
    public void testConsumerA() throws Exception {
        // 创建一个connection
        Connection connection = RabbitMqConnectionFactory.getSingleInstanceConnection();
        // 创建一个channel
        Channel channel = connection.createChannel();
        // 消费消息的回调
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            logger.info("消费消息成功,消息内容为:" + new String(message.getBody()));
        };
        // 取消消费的回调
        CancelCallback cancelCallback = (consumerTag) -> {
            logger.info("消息消费被中断");
        };
        /*
          消费者消费消息
          1.消费的队列名称
          2.消费成功之后是否要自动应答(true代表自动应答,false代表手动应答)
          3.消费者消费消息的回调(函数式接口)
          4.消费者取消消费的回调(函数式接口)
         */
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
    }
}

消费者B:

package com.gettler.rabbitmq.work;

import com.gettler.rabbitmq.RabbitmqApplication;
import com.gettler.rabbitmq.config.RabbitMqConnectionFactory;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;

@ActiveProfiles("work")
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitmqApplication.class, webEnvironment =
        SpringBootTest.WebEnvironment.RANDOM_PORT)
public class ConsumerBTest {
    // 队列名称
    @Value("${spring.rabbitmq.queue}")
    public String QUEUE_NAME;
    private static final Logger logger = LoggerFactory.getLogger(ConsumerBTest.class);

    @Test
    public void testConsumerB() throws Exception {
        // 创建一个connection
        Connection connection = RabbitMqConnectionFactory.getSingleInstanceConnection();
        // 创建一个channel
        Channel channel = connection.createChannel();
        // 消费消息的回调
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            logger.info("消费消息成功,消息内容为:" + new String(message.getBody()));
        };
        // 取消消费的回调
        CancelCallback cancelCallback = (consumerTag) -> {
            logger.info("消息消费被中断");
        };
        /*
          消费者消费消息
          1.消费的队列名称
          2.消费成功之后是否要自动应答(true代表自动应答,false代表手动应答)
          3.消费者消费消息的回调(函数式接口)
          4.消费者取消消费的回调(函数式接口)
         */
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
    }
}

image-20240620161656576

路由模式

配置文件 application-direct.yml

spring:
  rabbitmq:
    host: 123.123.123.123
    port: 5672
    username: Gettler
    password: ********
    virtual-host: /

生产者:

package com.gettler.rabbitmq.direct;

import com.gettler.rabbitmq.RabbitmqApplication;
import com.gettler.rabbitmq.config.RabbitMqConnectionFactory;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.HashMap;
import java.util.Map;

@ActiveProfiles("direct")
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitmqApplication.class, webEnvironment =
        SpringBootTest.WebEnvironment.RANDOM_PORT)
public class ProducerTest {
    private static final Logger logger = LoggerFactory.getLogger(ProducerTest.class);


    @Test
    public void testProducer() throws Exception {
        // 创建channel
        Connection connection = RabbitMqConnectionFactory.getSingleInstanceConnection();
        Channel channel = connection.createChannel();
        // 声明交换机
        channel.exchangeDeclare("direct", BuiltinExchangeType.DIRECT);
        Map<String, String> messageMap = new HashMap<>();
        messageMap.put("info", "普通 info 信息");
        messageMap.put("warning", "警告 warning 信息");
        messageMap.put("error", "错误 error 信息");
        messageMap.put("debug", "调试 debug 信息");
        for (Map.Entry<String, String> mes : messageMap.entrySet()) {
            String routingKey = mes.getKey();
            String message = mes.getValue();
            channel.basicPublish("direct", routingKey, null, message.getBytes());
            logger.info("消息发送完毕");
        }
    }
}

消费者A:

package com.gettler.rabbitmq.direct;

import com.gettler.rabbitmq.RabbitmqApplication;
import com.gettler.rabbitmq.config.RabbitMqConnectionFactory;
import com.rabbitmq.client.*;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;

@ActiveProfiles("direct")
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitmqApplication.class, webEnvironment =
        SpringBootTest.WebEnvironment.RANDOM_PORT)
public class ConsumerATest {
    private static final Logger logger = LoggerFactory.getLogger(ConsumerATest.class);

    @Test
    public void testConsumerA() throws Exception {
        // 创建一个connection
        Connection connection = RabbitMqConnectionFactory.getSingleInstanceConnection();
        // 创建一个channel
        Channel channel = connection.createChannel();
        // 创建channel
        // 声明交换机
        channel.exchangeDeclare("direct", BuiltinExchangeType.DIRECT);
        // 声明临时队列
        channel.queueDeclare("console", false, false, false, null);
        // 绑定队列与交换机
        channel.queueBind("console", "direct", "info");
        channel.queueBind("console", "direct", "warning");
        // 消费消息
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            logger.info("获得消息:" + new String(message.getBody()));
        };
        CancelCallback cancelCallback = (consumerTag) -> {
            logger.info("消息消费被中断");
        };
        channel.basicConsume("console", true, deliverCallback, cancelCallback);
    }
}

消费者B:

package com.gettler.rabbitmq.direct;

import com.gettler.rabbitmq.RabbitmqApplication;
import com.gettler.rabbitmq.config.RabbitMqConnectionFactory;
import com.rabbitmq.client.*;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;

@ActiveProfiles("direct")
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitmqApplication.class, webEnvironment =
        SpringBootTest.WebEnvironment.RANDOM_PORT)
public class ConsumerBTest {
    private static final Logger logger = LoggerFactory.getLogger(ConsumerBTest.class);

    @Test
    public void testConsumerB() throws Exception {
        // 创建一个connection
        Connection connection = RabbitMqConnectionFactory.getSingleInstanceConnection();
        // 创建一个channel
        Channel channel = connection.createChannel();
        // 声明交换机
        channel.exchangeDeclare("direct", BuiltinExchangeType.DIRECT);
        // 声明临时队列
        channel.queueDeclare("disk", false, false, false, null);
        // 绑定队列与交换机
        channel.queueBind("disk", "direct", "error");
        // 消费消息
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            logger.info("获得消息:" + new String(message.getBody()));
        };
        CancelCallback cancelCallback = (consumerTag) -> {
            logger.info("消息消费被中断");
        };
        channel.basicConsume("disk", true, deliverCallback, cancelCallback);
    }
}

image-20240620161838310

广播模式

配置文件 application-fanout.yml

spring:
  rabbitmq:
    host: 123.123.123.123
    port: 5672
    username: Gettler
    password: ********
    virtual-host: /

生产者:

package com.gettler.rabbitmq.fanout;

import com.gettler.rabbitmq.RabbitmqApplication;
import com.gettler.rabbitmq.config.RabbitMqConnectionFactory;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;

@ActiveProfiles("fanout")
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitmqApplication.class, webEnvironment =
        SpringBootTest.WebEnvironment.RANDOM_PORT)
public class ProducerTest {
    private static final Logger logger = LoggerFactory.getLogger(ProducerTest.class);

    @Test
    public void testProducer() throws Exception {
        // 创建channel
        Connection connection = RabbitMqConnectionFactory.getSingleInstanceConnection();
        Channel channel = connection.createChannel();
        // 声明交换机
        channel.exchangeDeclare("fanout", BuiltinExchangeType.FANOUT);
        // 发送10条消息
        for (int i = 0; i < 10; i++) {
            String message = i + "";
            channel.basicPublish("fanout", "", null, message.getBytes());
            logger.info("消息发送完毕" + message);
        }
    }
}

消费者A:

package com.gettler.rabbitmq.fanout;

import com.gettler.rabbitmq.RabbitmqApplication;
import com.gettler.rabbitmq.config.RabbitMqConnectionFactory;
import com.rabbitmq.client.*;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;

@ActiveProfiles("fanout")
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitmqApplication.class, webEnvironment =
        SpringBootTest.WebEnvironment.RANDOM_PORT)
public class ConsumerATest {
    private static final Logger logger = LoggerFactory.getLogger(ConsumerATest.class);

    @Test
    public void testConsumerA() throws Exception {
        // 创建一个connection
        Connection connection = RabbitMqConnectionFactory.getSingleInstanceConnection();
        // 创建一个channel
        Channel channel = connection.createChannel();
        // 声明交换机
        channel.exchangeDeclare("fanout", BuiltinExchangeType.FANOUT);
        // 声明临时队列
        String queueName = channel.queueDeclare().getQueue();
        // 绑定队列与交换机
        channel.queueBind(queueName, "fanout", "");
        // 消费消息
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            logger.info("获得消息:" + new String(message.getBody()));
        };
        CancelCallback cancelCallback = (consumerTag) -> {
            logger.info("消息消费被中断");
        };
        channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
    }
}

消费者B:

package com.gettler.rabbitmq.fanout;

import com.gettler.rabbitmq.RabbitmqApplication;
import com.gettler.rabbitmq.config.RabbitMqConnectionFactory;
import com.rabbitmq.client.*;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;

@ActiveProfiles("fanout")
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitmqApplication.class, webEnvironment =
        SpringBootTest.WebEnvironment.RANDOM_PORT)
public class ConsumerBTest {
    private static final Logger logger = LoggerFactory.getLogger(ConsumerBTest.class);

    @Test
    public void testConsumerB() throws Exception {
        // 创建一个connection
        Connection connection = RabbitMqConnectionFactory.getSingleInstanceConnection();
        // 创建一个channel
        Channel channel = connection.createChannel();
        // 声明交换机
        channel.exchangeDeclare("fanout", BuiltinExchangeType.FANOUT);
        // 声明临时队列
        String queueName = channel.queueDeclare().getQueue();
        // 绑定队列与交换机
        channel.queueBind(queueName, "fanout", "");
        // 消费消息
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            logger.info("获得消息:" + new String(message.getBody()));
        };
        CancelCallback cancelCallback = (consumerTag) -> {
            logger.info("消息消费被中断");
        };
        channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
    }
}

image-20240620162526952

主题模式

配置文件 application-topic.yml

spring:
  rabbitmq:
    host: 123.123.123.123
    port: 5672
    username: Gettler
    password: ********
    virtual-host: /

生产者:

package com.gettler.rabbitmq.topic;

import com.gettler.rabbitmq.RabbitmqApplication;
import com.gettler.rabbitmq.config.RabbitMqConnectionFactory;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.HashMap;
import java.util.Map;

/**
 * @author Gettler
 * @date 2024/06/13
 */
@ActiveProfiles("topic")
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitmqApplication.class, webEnvironment =
        SpringBootTest.WebEnvironment.RANDOM_PORT)
public class ProducerTest {
    private static final Logger logger = LoggerFactory.getLogger(ProducerTest.class);

    @Test
    public void testProducer() throws Exception {
        // 创建channel
        Connection connection = RabbitMqConnectionFactory.getSingleInstanceConnection();
        Channel channel = connection.createChannel();
        // 声明交换机
        channel.exchangeDeclare("topic", BuiltinExchangeType.TOPIC);
        Map<String, String> messageMap = new HashMap<>();
        messageMap.put("class1.DB.exam", "一班数据库考试通知");
        messageMap.put("class1.OS.exam", "一班操作系统考试通知");
        messageMap.put("class2.DB.exam", "二班数据库考试通知");
        messageMap.put("class2.OS.exam", "二班操作系统考试通知");
        for (Map.Entry<String, String> mes : messageMap.entrySet()) {
            String routingKey = mes.getKey();
            String message = mes.getValue();
            channel.basicPublish("topic", routingKey, null, message.getBytes());
            logger.info("消息发送完毕");
        }
    }
}

消费者A(模拟一班的学生):

package com.gettler.rabbitmq.topic;

import com.gettler.rabbitmq.RabbitmqApplication;
import com.gettler.rabbitmq.config.RabbitMqConnectionFactory;
import com.rabbitmq.client.*;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;

/**
 * @author Gettler
 * @date 2024/06/13
 */
@ActiveProfiles("topic")
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitmqApplication.class, webEnvironment =
        SpringBootTest.WebEnvironment.RANDOM_PORT)
public class StudentOfClass1Consumer {
    private static final Logger logger = LoggerFactory.getLogger(StudentOfClass1Consumer.class);

    @Test
    public void testStudentOfClass1Consumer() throws Exception {
        // 创建一个connection
        Connection connection = RabbitMqConnectionFactory.getSingleInstanceConnection();
        // 创建一个channel
        Channel channel = connection.createChannel();
        // 声明交换机
        channel.exchangeDeclare("topic", BuiltinExchangeType.TOPIC);
        // 创建Q1队列
        channel.queueDeclare("student_of_class1", false, false, false, null);
        // 绑定队列与交换机
        channel.queueBind("student_of_class1", "topic", "class1.#");
        // 消费消息
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            logger.info("获得消息:" + new String(message.getBody()));
        };
        CancelCallback cancelCallback = (consumerTag) -> {
            logger.info("消息消费被中断");
        };
        channel.basicConsume("student_of_class1", true, deliverCallback, cancelCallback);
    }
}

消费者B(模拟操作系统老师):

package com.gettler.rabbitmq.topic;

import com.gettler.rabbitmq.RabbitmqApplication;
import com.gettler.rabbitmq.config.RabbitMqConnectionFactory;
import com.rabbitmq.client.*;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;

/**
 * @author Gettler
 * @date 2024/06/13
 */
@ActiveProfiles("topic")
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitmqApplication.class, webEnvironment =
        SpringBootTest.WebEnvironment.RANDOM_PORT)
public class TeacherConsumer {
    private static final Logger logger = LoggerFactory.getLogger(TeacherConsumer.class);

    @Test
    public void testTeacherConsumer() throws Exception {
        // 创建一个connection
        Connection connection = RabbitMqConnectionFactory.getSingleInstanceConnection();
        // 创建一个channel
        Channel channel = connection.createChannel();
        // 声明交换机
        channel.exchangeDeclare("topic", BuiltinExchangeType.TOPIC);
        // 创建Q1队列
        channel.queueDeclare("teacher_of_OS", false, false, false, null);
        // 绑定队列与交换机
        channel.queueBind("teacher_of_OS", "topic", "#.OS.#");
        // 消费消息
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println("获得消息:" + new String(message.getBody()));
        };
        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println("消息消费被中断");
        };
        channel.basicConsume("teacher_of_OS", true, deliverCallback, cancelCallback);
    }
}

image-20240620162754734

谷粒商城 RabbitMQ 学习笔记

新建Maven项目

添加依赖

<dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.0.0</version>
</dependency>

编写发送端

package org.example;

import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;  
import com.rabbitmq.client.Connection;  
import com.rabbitmq.client.ConnectionFactory;  
  
public class Send  
{  
    //队列名称  
    private final static String QUEUE_NAME = "helloMQ";  
  
    public static void main(String[] argv) throws java.io.IOException, TimeoutException  
    {  
        /** 
         * 创建连接连接到MabbitMQ 
         */  
        ConnectionFactory factory = new ConnectionFactory();  
        //设置MabbitMQ所在主机ip或者主机名  
        factory.setHost("localhost");  
        //创建一个连接  
        Connection connection = factory.newConnection();  
        //创建一个频道  
        Channel channel = connection.createChannel();  
        //指定一个队列  
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);  
        //发送的消息  
        String message = "hello world!";  
        //往队列中发出一条消息  
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());  
        System.out.println(" [x] Sent '" + message + "'");  
        //关闭频道和连接  
        channel.close();  
        connection.close();  
     }  
}  

编写接收端

package org.example;

import com.rabbitmq.client.*;

import java.io.IOException;

public class Recv {

    // 队列名称
    private final static String QUEUE_NAME = "helloMQ";

    public static void main(String[] argv) throws Exception {

        // 打开连接和创建频道,与发送端一样
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //声明队列,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列。
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        //创建消费者
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                    byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            }
        };
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

运行接收端

image-20220723101156639

运行发送端,每运行一次发送一次消息

image-20220723101246973

管理网站上有接收端的连接(发送端发送后便断开连接了)

image-20220723101256826

添加依赖
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
编写配置文件
spring.rabbitmq.host=192.168.3.200
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/
创建Exchange
public void createExchange() {
    DirectExchange directExchange = new DirectExchange("hello-java-exchange", true, false);
    amqpAdmin.declareExchange(directExchange);
}
创建Queue
public void createQueue() {
    Queue queue = new Queue("hello-java-queue", true, false, false);
    amqpAdmin.declareQueue(queue);
}
连接Queue和Exchange
public void createBinding() {
    Binding binding = new Binding("hello-java-queue", Binding.DestinationType.QUEUE, "hello-java-exchange", "hello.java", null);
    amqpAdmin.declareBinding(binding);
}
发送消息
public void sendMessage() {
    String msg = "hello world";
    List<String> s = new ArrayList<>();
    s.add(msg);
    s.add("List");
    rabbitTemplate.convertAndSend("hello-java-exchange", "hello.java", s, new CorrelationData(UUID.randomUUID().toString()));
}
接收消息

想要接受对象消息,需使用JSON序列化机制,进行消息转换

编写MyRabbitConfig配置类

@Configuration
public class MyRabbitConfig {
    @Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * 使用JSON序列化机制,进行消息转换
     * @return
     */
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}

使用RabbitListener注解监听队列,该注解参数可以是Object content, Message message, Channel channel。

@RabbitListener(queues = {"hello-java-queue"})
public void receiveMessage(Object message) {
    System.out.println("接受到消息内容:" + message);
}
可靠抵达

编写配置文件

# 开启发送端确认
spring.rabbitmq.publisher-confirm-type=correlated
# 开启发送端消息抵达队列的确认
spring.rabbitmq.publisher-returns=true
# 抵达队列后以异步发送优先回调抵达队列后的回调returnconfirm
spring.rabbitmq.template.mandatory=true
# 手动ack消息
spring.rabbitmq.listener.simple.acknowledge-mode=manual

将MyRabbitConfig修改为

@Configuration
public class MyRabbitConfig {
    @Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * 使用JSON序列化机制,进行消息转换
     *
     * @return
     */
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @PostConstruct // MyRabbitConfig对象创建完成后执行该方法
    public void initRabbitTemplate() {
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            /**
             * 消息抵达节点的话ack就为true
             * @param correlationData   当前消息的唯一关联数据(消息唯一ID)
             * @param ack 消息是否成功收到
             * @param cause 失败原因
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                System.out.println("confirming...correlationData{" + correlationData + "},ack{" + ack + "},cause{" + cause + "}");
            }
        });
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            /**
             * 没抵达队列,触发这个失败回调函数
             * @param message
             * @param replyCode
             * @param replyText
             * @param exchange
             * @param routingKey
             */
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                System.out.println("Unreachable...message{" + message + "},replyCode{" + replyText + "},exchange{" + exchange + "},routingKey{" + routingKey + "}");
            }
        });
    }
}

监听队列方法修改为

@RabbitListener(queues = {"hello-java-queue"})
public void receiveMessage(Message message, List list, Channel channel) throws IOException {
    System.out.println("接受到消息内容:" + list);
    // channel内按顺序递增
    long deliveryTag = message.getMessageProperties().getDeliveryTag();
    System.out.println(deliveryTag);
    // 签收
    try {
        channel.basicAck(deliveryTag, false); // 是否批量签收
    } catch (Exception e) {
        // 网络中断
        // b1 = false 丢弃, b1 = true 发回服务器,服务器重新入队。
        channel.basicNack(deliveryTag, false, false);
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1843630.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于SpringBoot+Vue北部湾地区助农平台设计和实现(源码+LW+调试文档+讲解等)

&#x1f497;博主介绍&#xff1a;✌全网粉丝1W,CSDN作者、博客专家、全栈领域优质创作者&#xff0c;博客之星、平台优质作者、专注于Java、小程序技术领域和毕业项目实战✌&#x1f497; &#x1f31f;文末获取源码数据库&#x1f31f; 感兴趣的可以先收藏起来&#xff0c;还…

Centos SFTP搭建

SFTP配置、连接及挂载教程_sftp连接-CSDN博客1、确认是否安装yum list installed | grep openssh-server 2、创建用户和组 sudo groupadd tksftpgroup sudo useradd -g tksftpgroup -d /home/www/tk_data -s /sbin/nologin tksftp01 sudo passwd tksftp013. 配置SFTP注意&a…

【ElasticSearch】ElasticSearch基本概念

ES 是一个开源的高扩展的分布式全文检索引擎&#xff0c;它是对开源库 Luence 的封装&#xff0c;提供 REST API 接口 MySQL 更适合数据的存储和关系管理&#xff0c;即 CRUD&#xff1b;而 ES 更适合做海量数据的检索和分析&#xff0c;它可以秒级地从数据库中检索出我们感兴…

数据结构和算法之复杂度比较

数据结构和算法之复杂度比较 参考如下网址&#xff1a;https://www.bigocheatsheet.com/ 方便快速查询 1. 复杂度比较 2. 常见数据结构复杂度 3. 常见算法复杂度

计网课设-发送TCP数据包

一、效果展示 二、代码实现 import nmap import socket import tkinter as tk from tkinter import messagebox,Listbox from threading import Thread#获取自身IP&#xff0c;从而确定当前局域网范围 def get_ip_address():#创建了一个socket对象&#xff0c;socket.AF_INET表…

计算机网络:应用层 - 文件传输协议 FTP 电子邮件

计算机网络&#xff1a;应用层 - 文件传输协议 FTP & 电子邮件 文件传输协议 FTP电子邮件 文件传输协议 FTP 文件传送协议 FTP(File Transfer Protocol)&#xff0c;曾是互联网祝频讲解上使用得最广泛的文件传送协议。 其特点是&#xff1a;若要存取一个文件&#xff0c;…

【ARMv8/v9 GIC 系列 3 -- GIC 的 类型寄存器 GICD_TYPER】

文章目录 GIC 类型寄存器 GICD_TYPERESPI_Range, 位[31:27]RSS, 位[26]No1N, 位[25]A3V, 位[24]IDBits, 位[23:19]DVIS, 位[18]LPIs, 位[17]MBIS, 位[16]NUM_LPIs, 位[15:11]SecurityExtn, 位[10]NMI, 位[9]ESPI, 位[8]CPUNumber, 位[7:5]ITLinesNumber, 位[4:0]GIC 类型寄存器…

嵌入式实验---实验四 DMA传输实验

一、实验目的 1、掌握STM32F103DMA传输程序设计流程&#xff1b; 2、熟悉STM32固件库的基本使用。 二、实验原理 1、利用外部按键KEY1来控制DMA的传送&#xff0c;每按一次KEY1&#xff0c;DMA就传送一次数据到USART1&#xff08;串口1&#xff09;&#xff1b; 2、该串口…

SAP BC OBB8 自解释字段50个字符加到100个字符的长度

开整 SE11 复制TEXT1_052 -> ZTEXT1_052 并把域 改成TEXT100 se11 修改T052 激活 报错了&#xff0c;是个视图的问题 参考 SAP COEP V_COEP列不一致的问题及处理_sap coep表报错-CSDN博客 更新一下 再激活成功了 但是OBB8 保存的还是50个字符长度 &#xff0c;中…

Ollma本地大模型沉浸式翻译【403报错解决】

最终效果 通过Chrome的 沉浸式翻译 插件&#xff0c;用OpenAI通用接口调用本地的Ollma上的模型&#xff0c;实现本地的大模型翻译文献。 官方文档指导的Ollama的配置&#xff1a;一定要配置环境变量&#xff0c;否则会出现【403报错】

【Unity设计模式】状态编程模式

前言 最近在学习Unity游戏设计模式&#xff0c;看到两本比较适合入门的书&#xff0c;一本是unity官方的 《Level up your programming with game programming patterns》 ,另一本是 《游戏编程模式》 这两本书介绍了大部分会使用到的设计模式&#xff0c;因此很值得学习 本…

PXE高效批量网络装机(补充) 实验部分

然后把防火墙、安全机制全都给关闭掉&#xff0c;不要让它们干扰后续的实验&#xff1a; 然后安装那几个需要用到的软件包&#xff1a; 如果重启了系统vsftpd是不能自动启动起来的&#xff0c;如果想让该服务每次开机都自动的启动起来&#xff0c;可以执行下图中的命令&#xf…

Python学习笔记15:进阶篇(四)文件的读写。

文件操作 学习编程操作中&#xff0c;我觉得文件操作是必不可少的一部分。不管是读书的时候学习的c&#xff0c;c&#xff0c;工作的前学的java&#xff0c;现在学的Python&#xff0c;没学过的php和go&#xff0c;都有文件操作的模块以及库的支持&#xff0c;重要性毫无疑问。…

eNSP学习——OSPF在帧中继网络中的配置

目录 主要命令 原理概述 实验目的 实验场景 实验拓扑 实验编址 实验步骤 1、基本配置 2、在帧中继上搭建OSPF网络 主要命令 //检查帧中继的虚电路状态 display fr pvc-info//检查帧中继的映射表 display fr map-info//手工指定OSPF邻居,采用单播方式发送报文 [R1]os…

这些帮助你成长的IOS应用,建议收藏

TrackIt TrackIt是一款功能丰富的任务清单、日程管理和习惯打卡应用&#xff0c;旨在帮助用户提高效率和专注力。通过这些功能&#xff0c;用户可以更好地规划时间和任务&#xff0c;从而实现个人目标和养成良好习惯。 在目标设定方面&#xff0c;SMART原则是一个常用的方法&a…

首个AI高考评测结果出炉,GPT-4o排名第二

近日&#xff0c;上海人工智能实验室利用其自主研发的“司南”评测体系OpenCompass&#xff0c;对国内外多个知名大模型进行了一场特殊的“高考”。这些来自阿里巴巴、智谱AI、Mistral等机构&#xff0c;以及OpenAI的GPT-4o等“考生”&#xff0c;接受了新课标I卷“语数外”的全…

wins系统资源监视器任务管理器运行监控CPU、内存、磁盘、网络运行状态

目录 1.Windows系统资源监视器的详细介绍2.通过任务管理器打开资源监视器3.任务管理中总体观察观察cpu、pid、应用程序、I/O次数或者说读写字节数 4.观察CPU观察cpu核心数&#xff0c;以及哪些占用cpu频率过高 5.观察内存观察各个应用占用的内存大小和对应线程 6.观察磁盘活动观…

hrome插件: JSONView 插件让你告别数据混乱!

在现代网页开发中&#xff0c;处理和查看JSON数据已经成为日常工作的一部分。对于开发者来说&#xff0c;如何快速、方便地查看和调试JSON数据显得尤为重要。正是在这样的背景下&#xff0c;JSONView插件应运而生&#xff0c;成为开发者们的得力助手&#xff0c;今天咱们来聊聊…

【机器学习】机器的登神长阶——AIGC

目录 什么是AIGC 普通用户接触AIGC网站推荐 通义千问 白马 普通用户如何用好AIGC 关键提示词的作用 AIGC的影响 就业市场&#xff1a; 教育领域&#xff1a; 创意产业&#xff1a; 经济活动&#xff1a; 社交媒体与信息传播&#xff1a; AIGC面临的挑战 什么是AIGC…

板凳-------unix 网络编程 卷1-1简介

unix网络编程进程通信 unpipc.h https://blog.csdn.net/u010527630/article/details/33814377?spm1001.2014.3001.5502 订阅专栏 1>解压源码unpv22e.tar.gz。 $tar zxvf unpv22e.tar.gz //这样源码就被解压到当前的目录下了 2>运行configure脚本&#xff0c;以生成正确…