RabbitMQ-基础学习

news2025/1/17 18:04:37

在虚拟机上安装Erlang的GCC环境,装erlong,然后安装rabbitmq

参考:安装说明链接

安装web端面板

在这里插入图片描述

创建交换机

在这里插入图片描述

先学习一下工作模式(详细介绍可见官网)

在这里插入图片描述

上代码

1.Hello Word模式

在这里插入图片描述

写在测试类中:
Providucer

@Test
	void contextLoads()throws Exception {
		//1.创建链接
		ConnectionFactory factory = new ConnectionFactory();
		//2。设置参数
		factory.setHost("192.168.63.130");
		factory.setPort(5672);
		factory.setVirtualHost("/peng");
		factory.setUsername("peng");
		factory.setPassword("peng");
		//3.创建链接Connection
		Connection connection = factory.newConnection();

		//4.创建Channel
		Channel channel = connection.createChannel();
		//5.创建队列Queue
		//queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
		/**
		 * 1.queue 队列名
		 * 2.durable 是否持久化
		 * 3.exclusive (1)是否独占站,只能有一个消费的监听者 (2)当Connection关闭时是否删除队列
		 * 4.autoDelete 是否自动删除,当没有consumer会自动删除掉
		 * 5.arguments:
		 * */

		channel.queueDeclare("peng",true,false,false,null);

		//6.发送消息
		//basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
		/**
		 * 1.exchange:交换机名称
		 * 2.routingKey:路由名称
		 * 3.props:配置信息
		 * 4.body:发送的消息数据
		 */
		String body="第一个消息";
		channel.basicPublish("","peng",null,body.getBytes());
		//7.释放资源
		channel.close();
		connection.close();
	}

Consumer

@Test
	void contextLoads()throws Exception {
		//1.创建链接
		ConnectionFactory factory = new ConnectionFactory();
		//2。设置参数
		factory.setHost("192.168.63.130");
		factory.setPort(5672);
		factory.setVirtualHost("/peng");
		factory.setUsername("peng");
		factory.setPassword("peng");
		//3.创建链接Connection
		Connection connection = factory.newConnection();

		//4.创建Channel
		Channel channel = connection.createChannel();
		//5.创建队列Queue
		//queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
		/**
		 * 1.queue 队列名
		 * 2.durable 是否持久化
		 * 3.exclusive (1)是否独占站,只能有一个消费的监听者 (2)当Connection关闭时是否删除队列
		 * 4.autoDelete 是否自动删除,当没有consumer会自动删除掉
		 * 5.arguments:
		 * */

		channel.queueDeclare("peng",true,false,false,null);
		//basicConsume(String queue, DeliverCallback deliverCallback, CancelCallback cancelCallback)
		/**
		 * 1.queue:队列名
		 * 2.deliverCallback:是否自动确认收到
		 * 3.cancelCallback:回调对象
		 */
		Consumer consumer= new DefaultConsumer(channel){
			/**
			 * 1.consumerTag:
			 * 2.envelope:
			 * 3.properties:
			 * 4.body:
			 * @param consumerTag
			 * @param envelope
			 * @param properties
			 * @param body
			 * @throws IOException
			 */
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
				System.out.println("consumerTag"+consumerTag);
				System.out.println("envelope"+envelope.getExchange());
				System.out.println("properties"+envelope.getRoutingKey());
				System.out.println("properties"+properties);
				System.out.println("body"+new String(body));
			}
		};
		channel.basicConsume("peng",true,consumer);
	}

2.Work Queues模式

在这里插入图片描述
生产者生产,两个消费者循环消费
P:

package com.providucer.factory;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * @ClassName: Providucerfactory
 * @author: 鹏
 * @date: 2023/7/4 14:42
 */

public class ProvideFactory {
    public static void main(String[] args) throws Exception{
        //1.创建链接
        ConnectionFactory factory = new ConnectionFactory();
        //2。设置参数
        factory.setHost("192.168.63.130");
        factory.setPort(5672);
        factory.setVirtualHost("/peng");
        factory.setUsername("peng");
        factory.setPassword("peng");
        //3.创建链接Connection
        Connection connection = factory.newConnection();
        //4.创建Channel
        Channel channel = connection.createChannel();
        //5.创建队列Queue
        //queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
        /**
         * 1.queue 队列名
         * 2.durable 是否持久化
         * 3.exclusive (1)是否独占站,只能有一个消费的监听者 (2)当Connection关闭时是否删除队列
         * 4.autoDelete 是否自动删除,当没有consumer会自动删除掉
         * 5.arguments:
         * */
        channel.queueDeclare("pengwork",true,false,false,null);
        //6.发送消息
        //basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
        /**
         * 1.exchange:交换机名称
         * 2.routingKey:路由名称
         * 3.props:配置信息
         * 4.body:发送的消息数据
         */
        for (int i = 1; i <= 10; i++) {
            String body="第"+i+"个消息";
            channel.basicPublish("","pengwork",null,body.getBytes());
        }
        //7.释放资源
        channel.close();
        connection.close();
    }
}

C1:

package com.consumer.factory;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @ClassName: ConsumerFactory
 * @author: 鹏
 * @date: 2023/7/4 14:44
 */

public class ConsumerFactory {
    public static void main(String[] args)throws Exception {
        //1.创建链接
        ConnectionFactory factory = new ConnectionFactory();
        //2。设置参数
        factory.setHost("192.168.63.130");
        factory.setPort(5672);
        factory.setVirtualHost("/peng");
        factory.setUsername("peng");
        factory.setPassword("peng");
        //3.创建链接Connection
        Connection connection = factory.newConnection();

        //4.创建Channel
        Channel channel = connection.createChannel();
        //5.创建队列Queue
        //queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
        /**
         * 1.queue 队列名
         * 2.durable 是否持久化
         * 3.exclusive (1)是否独占站,只能有一个消费的监听者 (2)当Connection关闭时是否删除队列
         * 4.autoDelete 是否自动删除,当没有consumer会自动删除掉
         * 5.arguments:
         * */

        channel.queueDeclare("pengwork",true,false,false,null);
        //basicConsume(String queue, DeliverCallback deliverCallback, CancelCallback cancelCallback)
        /**
         * 1.queue:队列名
         * 2.deliverCallback:是否自动确认收到
         * 3.cancelCallback:回调对象
         */
        Consumer consumer= new DefaultConsumer(channel){
            /**
             * 1.consumerTag:
             * 2.envelope:
             * 3.properties:
             * 4.body:
             * @param consumerTag
             * @param envelope
             * @param properties
             * @param body
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
/*                System.out.println("consumerTag"+consumerTag);
                System.out.println("envelope"+envelope.getExchange());
                System.out.println("properties"+envelope.getRoutingKey());
                System.out.println("properties"+properties);*/
                System.out.println("body"+new String(body));
            }
        };
        channel.basicConsume("pengwork",true,consumer);
    }
}

C2:

package com.consumer.factory;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @ClassName: ConsumerFactory
 * @author: 鹏
 * @date: 2023/7/4 14:44
 */

public class ConsumerFactory1 {
    public static void main(String[] args)throws Exception {
        //1.创建链接
        ConnectionFactory factory = new ConnectionFactory();
        //2。设置参数
        factory.setHost("192.168.63.130");
        factory.setPort(5672);
        factory.setVirtualHost("/peng");
        factory.setUsername("peng");
        factory.setPassword("peng");
        //3.创建链接Connection
        Connection connection = factory.newConnection();

        //4.创建Channel
        Channel channel = connection.createChannel();
        //5.创建队列Queue
        //queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
        /**
         * 1.queue 队列名
         * 2.durable 是否持久化
         * 3.exclusive (1)是否独占站,只能有一个消费的监听者 (2)当Connection关闭时是否删除队列
         * 4.autoDelete 是否自动删除,当没有consumer会自动删除掉
         * 5.arguments:
         * */

        channel.queueDeclare("pengwork",true,false,false,null);
        //basicConsume(String queue, DeliverCallback deliverCallback, CancelCallback cancelCallback)
        /**
         * 1.queue:队列名
         * 2.deliverCallback:是否自动确认收到
         * 3.cancelCallback:回调对象
         */
        Consumer consumer= new DefaultConsumer(channel){
            /**
             * 1.consumerTag:
             * 2.envelope:
             * 3.properties:
             * 4.body:
             * @param consumerTag
             * @param envelope
             * @param properties
             * @param body
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
/*                System.out.println("consumerTag"+consumerTag);
                System.out.println("envelope"+envelope.getExchange());
                System.out.println("properties"+envelope.getRoutingKey());
                System.out.println("properties"+properties);*/
                System.out.println("body"+new String(body));
            }
        };
        channel.basicConsume("pengwork",true,consumer);
    }
}

消费结果:
在这里插入图片描述
在这里插入图片描述

3.Publish/Subscribe订阅模式

在这里插入图片描述
在这里插入图片描述
消费着只需要绑定相应的队列,生产者需要创建交换机

public class PubFactory {
    public static void main(String[] args) throws Exception{
        //1.创建链接
        ConnectionFactory factory = new ConnectionFactory();
        //2。设置参数
        factory.setHost("192.168.63.130");
        factory.setPort(5672);
        factory.setVirtualHost("/peng");
        factory.setUsername("peng");
        factory.setPassword("peng");
        //3.创建链接Connection
        Connection connection = factory.newConnection();
        //4.创建Channel
        Channel channel = connection.createChannel();
        //5.创建交换机
        //exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
        String exchange="test_fanout";
        channel.exchangeDeclare(exchange, BuiltinExchangeType.FANOUT,
                true,false,false,null);
        String queue1Name = "test_fanout_queue1";
        String queue2Name = "test_fanout_queue2";
        //1.exchange:交换机名称
        //2.type:交换机类型
        /**
         *        DIRECT("direct"),定向
         *         FANOUT("fanout"),扇形(广播)
         *         TOPIC("topic"),通配符方式
         *         HEADERS("headers")参数匹配
         */
        //3.durable:是否持久化
        //4.autoDelete:是福哦自动删除
        //5.internal: 内部使用一般用false
        //6.arguments: 参数

        //channel.exchangeDeclare();
        //6.创建队列
        channel.queueDeclare(queue1Name,true,false,false,null);
        channel.queueDeclare(queue2Name,true,false,false,null);
        //7.绑定交换机与队列
        /**
         * 1.queue:队列名称
         * 2.exchange:交换机名称
         * 3.routingKey:路由键,绑定规则
         *      如果交换机的类型为fanout,routingKey设置为""
         */
        channel.queueBind(queue1Name,exchange,"");
        channel.queueBind(queue2Name,exchange,"");
        String body="日志信息:接收成功";
        channel.basicPublish(exchange,"",null,body.getBytes());
        //8.释放资源
        channel.close();
        connection.close();
    }
}

4.Routing路由模式

路由模式相当于增加一层限制,只有通过相应的限制交换机才能将消息发布到对应的队列,也就是在发布的时候路由参数数设置值,且交换机类型必须为direct
channel.basicPublish(exchange,"error",null,body.getBytes());此处限制队列路由为error的可以发送


public class RoutingFactory {
    public static void main(String[] args) throws Exception{
        //1.创建链接
        ConnectionFactory factory = new ConnectionFactory();
        //2。设置参数
        factory.setHost("192.168.63.130");
        factory.setPort(5672);
        factory.setVirtualHost("/peng");
        factory.setUsername("peng");
        factory.setPassword("peng");
        //3.创建链接Connection
        Connection connection = factory.newConnection();
        //4.创建Channel
        Channel channel = connection.createChannel();
        //5.创建交换机
        //exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
        String exchange="test_direct";
        channel.exchangeDeclare(exchange, BuiltinExchangeType.DIRECT,
                true,false,false,null);
        String queue1Name = "test_fanout_queue1";
        String queue2Name = "test_fanout_queue2";
        //1.exchange:交换机名称
        //2.type:交换机类型
        /**
         *        DIRECT("direct"),定向
         *         FANOUT("fanout"),扇形(广播)
         *         TOPIC("topic"),通配符方式
         *         HEADERS("headers")参数匹配
         */
        //3.durable:是否持久化
        //4.autoDelete:是福哦自动删除
        //5.internal: 内部使用一般用false
        //6.arguments: 参数

        //channel.exchangeDeclare();
        //6.创建队列
        channel.queueDeclare(queue1Name,true,false,false,null);
        channel.queueDeclare(queue2Name,true,false,false,null);
        //7.绑定交换机与队列
        /**
         * 1.queue:队列名称
         * 2.exchange:交换机名称
         * 3.routingKey:路由键,绑定规则
         *      如果交换机的类型为fanout,routingKey设置为""
         */
        channel.queueBind(queue1Name,exchange,"error");
        channel.queueBind(queue2Name,exchange,"info");
        channel.queueBind(queue2Name,exchange,"error");
        channel.queueBind(queue2Name,exchange,"warming");
        String body="日志信息:接收成功";
        channel.basicPublish(exchange,"error",null,body.getBytes());
        //8.释放资源
        channel.close();
        connection.close();
    }
}

5. Topics模式

相对于routing在队列增加了匹配规则,让交换机发送与队列接受更加灵活*匹配一个单词,#匹配多个单词
channel.exchangeDeclare(exchange, BuiltinExchangeType.TOPIC, true,false,false,null);
设置为BuiltinExchangeType.TOPIC
在这里插入图片描述

public class TopicsFactory {
    public static void main(String[] args) throws Exception{
        //1.创建链接
        ConnectionFactory factory = new ConnectionFactory();
        //2。设置参数
        factory.setHost("192.168.63.130");
        factory.setPort(5672);
        factory.setVirtualHost("/peng");
        factory.setUsername("peng");
        factory.setPassword("peng");
        //3.创建链接Connection
        Connection connection = factory.newConnection();
        //4.创建Channel
        Channel channel = connection.createChannel();
        //5.创建交换机
        //exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
        String exchange="test_topic";
        channel.exchangeDeclare(exchange, BuiltinExchangeType.TOPIC,
                true,false,false,null);
        String queue1Name = "test_topic_queue1";
        String queue2Name = "test_topic_queue2";
        //1.exchange:交换机名称
        //2.type:交换机类型
        /**
         *        DIRECT("direct"),定向
         *         FANOUT("fanout"),扇形(广播)
         *         TOPIC("topic"),通配符方式
         *         HEADERS("headers")参数匹配
         */
        //3.durable:是否持久化
        //4.autoDelete:是福哦自动删除
        //5.internal: 内部使用一般用false
        //6.arguments: 参数

        //channel.exchangeDeclare();
        //6.创建队列
        channel.queueDeclare(queue1Name,true,false,false,null);
        channel.queueDeclare(queue2Name,true,false,false,null);
        //7.绑定交换机与队列
        /**
         * 1.queue:队列名称
         * 2.exchange:交换机名称
         * 3.routingKey:路由键,绑定规则
         *      如果交换机的类型为fanout,routingKey设置为""
         */
        channel.queueBind(queue1Name,exchange,"*.*");
        channel.queueBind(queue2Name,exchange,"*.one");
        channel.queueBind(queue2Name,exchange,"*.two");
        channel.queueBind(queue2Name,exchange,"ok.*");
        String body="日志信息:接收成功";
        channel.basicPublish(exchange,"error",null,body.getBytes());
        channel.basicPublish(exchange,"123.one",null,body.getBytes());
        channel.basicPublish(exchange,"123.two",null,body.getBytes());
        //8.释放资源
        channel.close();
        connection.close();
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/723995.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

3 springboot更改tomcat的端口和启动时的banner

3.1 更改tomcat端口 点击resources下的application.properties。 然后&#xff0c;添加以下信息&#xff0c;即可把端口号更改为8081。 # 更改项目的端口号 server.port80813.2 更改启动时的banner 首先&#xff0c;进入网站&#xff1a;https://www.bootschool.net/ascii-art…

git切换账户问题

之前一直用另一个github账户提交代码 今天新创建了一个github账户 用这个账户git项目修改后&#xff0c;push时有问题 1 先执行下面命令&#xff0c;切换了用户 git config --local user.name “xxx” git config --local user.email “xxx” 执行 git config user.name 查看…

电脑端anconda的安装和配置

1.下载官网Anaconda | The World’s Most Popular Data Science Platform 1.1如果上述不行就去清华园源下载镜像Index of / 2.点击下载软件安装&#xff0c;按照图的安装步骤就可以了 安装完毕后点击next就可以了 3.测试是否安装配置成功 WINR键调出运行窗口&#xff0c;输入…

AI免费写作

随着科技的不断发展&#xff0c;人工智能(AI)正逐渐渗透进各个领域&#xff0c;包括以前我们认为只有人类才能胜任的创作型任务——写作。本文将通过深入浅出的方式&#xff0c;为大家剖析AI写作的具体运作机制&#xff0c;并结合案例&#xff0c;带大家一起探索AI写作的无穷可…

方向盘脱手检测原理及主流方案

随着高阶辅助驾驶逐渐普及&#xff0c;逐渐从驾驶员驾驶过渡到人机共驾最终到自动驾驶。而目前阶段受限于技术以及发规等&#xff0c;主要还是人机共驾&#xff0c;由于车辆是辅助人来进行驾驶&#xff0c;因此驾驶员还需要起到主要的监测作用&#xff0c;此时对驾驶员的监控变…

凝心聚力,奋楫启程—易我文化系列课《战略方向定位》讲座圆满举行

易我文化系列课自开课以来&#xff0c;受到了易我员工的一致好评和热烈欢迎。2023年6月20日&#xff0c;易我文化系列课再次发力&#xff0c;《战略方向定位》讲座如期举行&#xff0c;并且取得了圆满的成功。 本次讲座特别邀请易我总经理——万建华先生为大家授课&#xff0c…

Github Pages 快速搭建个人网站教程

官方教程&#xff1a;https://pages.github.com/ 1 创建仓库 命名为 你的名字.github.io 克隆项目 git clone https://github.com/username/username.github.io加入index.html页面 在克隆的项目中&#xff0c;加入一个index.html html文件简单写几个dom <!DOCTYPE html…

最牛,python接口自动化测试-fixtures固件使用详细(实战)

目录&#xff1a;导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 全局设置请求头部…

2023年7月实时获取地图边界数据方法,省市区县街道多级联动【附实时geoJson数据下载】

首先&#xff0c;来看下效果图 在线体验地址&#xff1a;https://geojson.hxkj.vip&#xff0c;并提供实时geoJson数据文件下载 可下载的数据包含省级geojson行政边界数据、市级geojson行政边界数据、区/县级geojson行政边界数据、省市区县街道行政编码四级联动数据&#xff0…

@Data失效 Lombok使用与失效

Data失效 1注入pom </dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.16.18</version><scope>provided</scope></dependency>2下载插件&#xf…

极速冲浪:影视网站推荐

在快节奏的现代生活中&#xff0c;影视娱乐成为了人们放松和娱乐的重要方式。随着高速互联网的普及&#xff0c;极速冲浪在各种影视网站上成为了我们追逐电影、剧集和综艺节目的常态。这些影视网站不仅提供了海量的内容资源&#xff0c;还通过便捷的在线观看和下载功能&#xf…

Go语言程序设计(五)切片

一、切片的定义 在Go语言中,切片(Slice)是数组的一个引用,它会生成一个指向数组的指针,并通过切片长度关联到底层数组部分或者全部元素。切片还提供了一系列对数组的管理功能(append、copy)&#xff0c;可以随时动态扩充存储空间&#xff0c;并且可以被随意传递而不会导致所管理…

[RapidVideOCR周边] RapidVideOCR初级教程(界面版 下载解压即可使用)

引言 考虑到提取视频字幕的小伙伴大多不是程序员行当&#xff0c;为了降低使用门槛&#xff0c;特此推出界面版的RapidVideOCR Desktop.RapidVideOCR Desktop需要搭配VideoSubFinder使用。它们两个关系如下图所示&#xff1a; #mermaid-svg-keuknVOG1YkfjOkw {font-family:&qu…

在Linux中部署Ansible

Ansible是自动化运维工具&#xff0c;基于模块化工作&#xff0c;本身没有批量部署的能力。 Ansible只是提供一种框架&#xff0c;Ansible运行的模块才有批量部署的能力。 Ansible使用SSH协议对设备进行管理&#xff0c;只需在主控端部署Ansible环境&#xff0c;被控端无需做…

CSS+HTML实现元素定位

文章目录 相对定位 position: relative;总结 绝对定位 position: absolute;总结 固定定位 position: fixed;总结 z-index样式总结 数量角标透明度设置 opacity返回顶部 相对定位 position: relative; position: relative; 【示例代码&#xff1a;】 <!DOCTYPE html> &…

【广州华锐互动】智慧物流3D可视化数据分析展示平台

智慧物流3D可视化数据分析展示平台是一种基于数字孪生技术的新型物流管理工具&#xff0c;它可以帮助物流企业实现对物流过程的全面监控和管理。相比传统的物流管理方式&#xff0c;智慧物流3D可视化数据分析展示平台具有以下意义&#xff1a; 1.提高物流效率&#xff1a;智慧…

vue自定义菜单栏并循环便利使用

浅尝vue 前言&#xff1a; 在网上找了蛮多关于自定义表单对象进行循环处理&#xff0c;写的我都看的一脸懵&#xff0c;最后还是直接修改组件完善了&#xff0c;直接用v-for 进行循环绑定实现了。本例实现了自定义菜单栏和vue-router 路由指向菜单并进行路由跳转&#xff0c;主…

Idea打包Jar文件

https://blog.csdn.net/qq_35356840/article/details/98725948 注意这个坑&#xff1a;

html前端输入框模糊查询

1、一个页面内多个模糊查询情况&#xff1a; <!DOCTYPE html> <html> <head> <meta charset"UTF-8" /> <meta name"viewport" content"widthdevice-width, initial-scale1.0, user-scalable0, minimum-scale1.0, maximum-…

PHP中的变量

在PHP中变量是用于储存信息的容器&#xff0c;我们命令服务器去干活的时候&#xff0c;往往需要产生一些数据&#xff0c;需要临时性存放起来&#xff0c;方便取用赋值方法与数学中的代数相类似 1、在PHP中变量是用于储存信息的容器&#xff0c;类似于数学中的集合 2、赋值方法…