Rabbitmq的几种工作模式

news2025/1/21 5:57:19

工具类

public class RabbitMQConnection {
    public static Connection getConnection() throws Exception{
        //1.创建connectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //2.配置Host
        connectionFactory.setHost("127.0.0.1");
        //3.设置Port
        connectionFactory.setPort(5672);
        //4.设置账户和密码
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        //5.设置VirtualHost
        connectionFactory.setVirtualHost("0517");
        return connectionFactory.newConnection();
    }
}

点对点(简单)的队列

图解:

        

生产者代码
public class Producer {
    private final static String QUEUE_NAME = "hello";
    public static void main(String[] args) throws Exception {
        //创建一个连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setUsername("guest");
        factory.setPassword("guest");
        //channel 实现了自动 close 接口 自动关闭 不需要显示关闭
        try(Connection connection = factory.newConnection(); Channel channel =
                connection.createChannel()) {
            /**
             * 生成一个队列
             * 1.队列名称
             * 2.队列里面的消息是否持久化 默认消息存储在内存中
             * 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费
             * 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除
             * 5.其他参数
             */
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
            String message="hello world";
            /**
             * 发送一个消息
             * 1.发送到那个交换机
             * 2.路由的 key 是哪个
             * 3.其他的参数信息
             * 4.发送消息的消息体
             */
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
            System.out.println("消息发送完毕");
        }
    }
}
消费者代码
public class Consumer {
    private final static String QUEUE_NAME = "hello";
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setUsername("guest");
        factory.setPassword("guest");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        System.out.println("等待接收消息....");
        //推送的消息如何进行消费的接口回调
        DeliverCallback deliverCallback=(consumerTag, delivery)->{
            String message= new String(delivery.getBody());
            System.out.println(message);
        };
        //取消消费的一个回调接口 如在消费的时候队列被删除掉了
        CancelCallback cancelCallback=(consumerTag)->{
            System.out.println("消息消费被中断");
        };
        /**
         * 消费者消费消息
         * 1.消费哪个队列
         * 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答
         * 3.消费者未成功消费的回调
         */
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    }
}
重点解析:点对点没什么可说的,就是生产者产生消息给到消息队列,第一次已推送的形式给到消费者,之后就是消费者端主动的拉取,需要在生产端创建好队列或在图形化页面创建好队列

工作(公平性)队列模式

图解:

       

生产者代码
public class Task01 {
    private static final String QUEUE_NAME="hello";
    public static void main(String[] args) throws Exception {
        try(Channel channel=RabbitMqUtils.getChannel();) {
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
            //从控制台当中接受信息
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNext()){
                String message = scanner.next();
                channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
                System.out.println("发送消息完成:"+message);
            }
        }
    }
}
消费者代码

消费者1:

public class Consumer1 {
    private final static String QUEUE_NAME = "hello";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();

        //推送的消息如何进行消费的接口回调
        DeliverCallback deliverCallback=(consumerTag, delivery)->{
            String receivedMessage = new String(delivery.getBody());
            System.out.println("接收到消息:"+receivedMessage);
        };
        CancelCallback cancelCallback=(consumerTag)->{
            System.out.println(consumerTag+"消费者取消消费接口回调逻辑");
        };
        System.out.println("C1 消费者启动等待消费......");
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    }
}

消费者2:

public class Consumer2 {
    private final static String QUEUE_NAME = "hello";
    public static void main(String[] args) throws Exception {

        Channel channel = RabbitMqUtils.getChannel();

        //推送的消息如何进行消费的接口回调
        DeliverCallback deliverCallback=(consumerTag, delivery)->{
            String receivedMessage = new String(delivery.getBody());
            System.out.println("接收到消息:"+receivedMessage);
        };
        CancelCallback cancelCallback=(consumerTag)->{
            System.out.println(consumerTag+"消费者取消消费接口回调逻辑");
        };
        System.out.println("C2 消费者启动等待消费......");
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    }
}
重点解析
        工作(公平性)队列模式,和点对点差不多,就是生产者将消息直接存放到队列中,然后队列默认采用轮询的形式选择消费者进行消费

        当然也可以设置channel.basicQos(i)的形式进行公平分发(谁处理快,谁做的多)

        这里公平的意思是谁做的多,谁处理的多,并不是平均分配的意思

发布订阅模式

图解:

       

生产者代码
public class ProducerFanout {
    //定义交换机名称
    private static final String EXCHANGE_NAME = "fanout_exchange";

    public static void main(String[] args) throws  Exception{
        //创建连接
        Connection connection = RabbitMQConnection.getConnection();
        //创建通道
        Channel channel = connection.createChannel();
        //通道关联交换机(创建交换机)(fanout类型会自动创建)
        //channel.exchangeDeclare(EXCHANGE_NAME, "fanout", true);
        //channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, true);
        String msg = "程子强你好";
        channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());
        channel.close();
        connection.close();
    }
}
消费者代码

消费者1:

public class MailConsumer {
    /**
     * 定义邮件队列
     */
    private static final String QUEUE_NAME = "fanout_email_queue";
    /**
     * 定义交换机的名称
     */
    private static final String EXCHANGE_NAME = "fanout_exchange";

    public static void main(String[] args) throws  Exception{
        System.out.println("邮件消费者...");
        // 创建我们的连接
        Connection connection = RabbitMQConnection.getConnection();
        // 创建我们通道
        final Channel channel = connection.createChannel();
        // 关联队列消费者关联队列
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("邮件消费者获取消息:" + msg);
            }
        };
        // 开始监听消息 自动签收
        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);

    }
}

消费者2:

public class SmsConsumer {
    /**
     * 定义短信队列
     */
    private static final String QUEUE_NAME = "fanout_email_sms";
    /**
     * 定义交换机的名称
     */
    private static final String EXCHANGE_NAME = "fanout_exchange";
    public static void main(String[] args) throws Exception{
        System.out.println("短信消费者...");
    // 创建我们的连接
    Connection connection = RabbitMQConnection.getConnection();
    // 创建我们通道
    final Channel channel = connection.createChannel();
    // 关联队列消费者关联队列
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
    DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            String msg = new String(body, "UTF-8");
            System.out.println("短信消费者获取消息:" + msg);
        }
    };
    // 开始监听消息 自动签收
        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);

}
}
重点解析
       发布订阅模式,和前两种模式不同这里用到了一个fanout类型的交换机(具体交换机的类型和概念小伙伴们可以自行查阅下,这里主要讲工作模式),生产者将消息发送给这个交换机,这个交换机把消息发送给每一个和其绑定的队列(注意fanout类型的交换机不需要key所以生产者传递直接传""就好)

路由模式Routing

图解:

       

生产者代码
public class ReceiveLogsDirect {
    private static final String EXCHANGE_NAME = "direct_logs";
    public static void main(String[] args) throws Exception {
        try (Connection connection = RabbitMQConnection.getConnection(); Channel channel =
                connection.createChannel()) {
            //创建交换机
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT,true);
            //创建多个 bindingKey
            Map<String, String> bindingKeyMap = new HashMap<>();
            bindingKeyMap.put("info","普通 info 信息");
            bindingKeyMap.put("warning","警告 warning 信息");
            bindingKeyMap.put("error","错误 error 信息");
            //debug 没有消费这接收这个消息 所有就丢失了
            bindingKeyMap.put("debug","调试 debug 信息");

            for (Map.Entry<String, String> bindingKeyEntry: bindingKeyMap.entrySet()){
                String bindingKey = bindingKeyEntry.getKey();
                String message = bindingKeyEntry.getValue();
                channel.basicPublish(EXCHANGE_NAME,bindingKey, null,
                        message.getBytes("UTF-8"));
                System.out.println("生产者发出消息:" + message);
            }
        }
    }
}
消费者代码

消费者1:

public class ReceiveLogsDirect01 {
    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] args) throws  Exception{
        Connection connection = RabbitMQConnection.getConnection();
        Channel channel = connection.createChannel();
        //写不写都可以,如果代码创建在生产端写,如果是浏览器创建,就不需要写这段代码
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT,true);
        String queueName = "disk";
        channel.queueBind(queueName, EXCHANGE_NAME, "error");
        System.out.println("等待接收消息.....");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            message="接收绑定键:"+delivery.getEnvelope().getRoutingKey()+",消息:"+message;
            File file = new File("E:\\xxx\\rabbitmq_info.txt");//路径任意写
            FileUtils.writeStringToFile(file,message,"UTF-8");
            System.out.println("错误日志已经接收");
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
        });
    }
}

消费者2:

public class ReceiveLogsDirect02 {
    private static final String EXCHANGE_NAME = "direct_logs";
    public static void main(String[] argv) throws Exception {
        Connection connection = RabbitMQConnection.getConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT,true);
        String queueName = "console";
        channel.queueBind(queueName, EXCHANGE_NAME, "info");
        channel.queueBind(queueName, EXCHANGE_NAME, "warning");
        System.out.println("等待接收消息.....");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" 接收绑定键 :"+delivery.getEnvelope().getRoutingKey()+", 消息:"+message);
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
        });
    }
}
重点解析
       路由模式,用到了direct类型的交换机,简单讲就是队列通过key和交换机进行绑定,生产者那边传入的key和消息给交换机,如果该队列绑定的key与其传入的key相同则,交换机讲该消息传给对应的队列,一个队列可以绑定多个key

通配符模式Topics(主题)

图解:

       

生产者代码
public class ProducerTopic {
    /**
     * 定义交换机的名称
     */
    private static final String EXCHANGE_NAME = "topic_exchange";

    public static void main(String[] args) throws Exception{
        //  创建Connection
        Connection connection = RabbitMQConnection.getConnection();
        // 创建Channel
        Channel channel = connection.createChannel();
        // 通道关联交换机
        channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);
        String msg = "我是xxx";
        channel.basicPublish(EXCHANGE_NAME, "czq.hhh.aa", null, msg.getBytes());
        channel.close();
        connection.close();
    }
}
消费者代码

消费者1:

public class SmsConsumer {
    /**
     * 定义短信队列
     */
    private static final String QUEUE_NAME = "topic_sms_queue";
    /**
     * 定义交换机的名称
     */
    private static final String EXCHANGE_NAME = "topic_exchange";

    public static void main(String[] args) throws Exception{
        System.out.println("短信消费者...");
        //  创建Connection
        Connection connection = RabbitMQConnection.getConnection();
        // 创建Channel
        Channel channel = connection.createChannel();
        // 关联队列消费者关联队列
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "czq.#");
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("短信消费者获取消息:" + msg);
            }
        };
        // 开始监听消息 自动签收
        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);

    }
}

消费者2:

public class MailConsumer {
    /**
     * 定义邮件队列
     */
    private static final String QUEUE_NAME = "topic_email_queue";
    /**
     * 定义交换机的名称
     */
    private static final String EXCHANGE_NAME = "topic_exchange";

    public static void main(String[] args) throws Exception{
        System.out.println("邮件消费者...");
        //  创建Connection
        Connection connection = RabbitMQConnection.getConnection();
        // 创建Channel
        Channel channel = connection.createChannel();
        // 关联队列消费者关联队列
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.boyatop.#");
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("邮件消费者获取消息:" + msg);
            }
        };
        // 开始监听消息 自动签收
        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);

    }
}
重点解析
       通配符模式,用到了topic类型的交换机,简单讲与通配符模式原理大致,差别在于根据队列绑定的路由建模糊转发到具体的队列中存放。其中#号表示支持匹配多个词;*号表示只能匹配一个词,假如同一个队列与交换机直接设置的多个模糊的key都符合传入的,那么也只传送一次

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1976372.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

kafka从浅入深

一、什么是kafka&#xff1f; kafka本质上是一个消息队列MQ&#xff08;Message Queue&#xff09;&#xff0c;用做数据流转。 1.使用消息队列的好处&#xff1f; 1.1、解耦&#xff1a;允许独立扩展或修改队列两头的处理过程&#xff1b; 1.2、可恢复性&#xff1a;即使一个…

Unity 资源之 Break Items - Toon VFX破碎物品与卡通硬币动画分享

Unity 特效资源分享 - 破碎物品与卡通硬币动画 一、前言二&#xff0c;资源包内容三、免费获取资源包 一、前言 今天为大家带来一份超级实用的视觉特效资源分享&#xff01;我们精心整理了 6 个令人惊叹的破碎物品效果和 1 个萌趣十足的卡通硬币动画视觉特效&#xff0c;让您的…

编译和汇编的区别

一、编译 编译是将高级语言&#xff08;如C、C、Java等&#xff09;编写的源代码转换成计算机可以直接执行的低级语言&#xff08;通常是机器语言或汇编语言&#xff09;的过程 编译 —— 将人类可读的源代码转换为计算机可执行的指令集 编译过程 通常包括词法分析、语法分…

bootloader开发总结

bootloader开发总结 首先明白了BootLoader和应用程序之间跳转&#xff0c;就明白了大概。上电启动程序&#xff0c;会有一个程序入口&#xff0c;这个入口由0x33fff6(28335的)地址决定。 应用程序也会有一个启动入口&#xff0c;这个用户可以自己决定。 bin文件是高地址在前&a…

【数据结构】非线性表----二叉树详解

二叉树与普通的树的本质上的区别实际上只有一个——子结点的数量。 普通的树&#xff1a;任意数量的子结点 二叉树&#xff1a;只有两个子结点&#xff0c;也称为左孩子和右孩子结点。 二叉树一共有五种形态&#xff1a; 1.空二叉树。 2.只有一个根结点。 3.根结点只有左子树…

【OpenCV C++20 学习笔记】图像缩放-高斯金字塔

图像缩放-高斯金字塔 原理高斯金字塔 代码实现放大缩小形成金字塔 原理 在图像处理中&#xff0c;经常需要将图像转化成不同的尺寸&#xff0c;即放大或缩小。 除了直接用resize()函数重新设置图片尺寸&#xff0c;另一种常用的方法就是“图像金字塔”。 图像金字塔是从底层的…

vector的底层原理剖析及其实现

vector 一、定义二、常用接口及模拟实现三、vector迭代器失效问题四、使用memcpy拷贝会出现的问题五、二维数组vector<vector< T >> vv 一、定义 vector 是 C 标准模板库&#xff08;Standard Template Library, STL&#xff09;中的一个非常有用的容器。它是一个…

23款奔驰GLS450加装原厂电吸门配置,提升车辆舒适性和便利性

今天是一台22款奔驰GLS450&#xff0c;车主是佛山的 以前被不良商家坑了 装了副厂的电吸门 刚开始就很正常 用了半年之后 就开始开不了门&#xff0c;被锁在里面&#xff0c;刚开始车主以为是零件坏了 后来越来越频繁&#xff0c;本来是为了家里老人小孩关门方便而升级的&#…

J031_使用TCP协议支持与多个客户端同时通信

一、需求文档 使用TCP协议支持与多个客户端同时通信。 1.1 Client package com.itheima.tcp2;import java.io.DataOutputStream; import java.io.OutputStream; import java.net.Socket; import java.util.Scanner;public class Client {public static void main(String[] a…

软件设计之Java入门视频(22)

软件设计之Java入门视频(22) 视频教程来自B站尚硅谷&#xff1a; 尚硅谷Java入门视频教程&#xff0c;宋红康java基础视频 相关文件资料&#xff08;百度网盘&#xff09; 提取密码&#xff1a;8op3 idea 下载可以关注 软件管家 公众号 学习内容&#xff1a; 该视频共分为1-7…

Flask 介绍

Flask 介绍 为什么要学 Flask框架对比设计哲学功能特点适用场景学习曲线总结 Flask 的特点Flask 常用扩展包Flask 的基本组件Flask 的应用场景官方文档官方文档链接文档内容概述学习建议 Flask 是一个使用 Python 编写的轻量级 Web 应用框架。它旨在让 Web 开发变得快速、简单且…

ACl访问控制实验

要求&#xff1a;PC1可以telnet登录r1&#xff0c;不能ping通r1&#xff0c;pc1可以ping通r2&#xff0c;但不能telnet登录r2&#xff0c;pc2的所有限制与pc1相反 实验思路&#xff1a;因为华为的ensp默认允许所有&#xff0c;所以只写拒绝规则就行 rule 5 deny icmp source 19…

只需0.5秒 Stability AI新模型超快生成3D图像

生成式人工智能&#xff08;AI&#xff09;明星初创公司Stability AI 8月发布最新突破性3D模型Stable Fast 3D&#xff0c;将单张图片生成3D图像的速度大幅提升。Stability AI今年3月发布的3D模型SV3D需要多达10分钟生成3D资产&#xff0c;基于TripoSR的新模型Stable Fast 3D完…

【面试官:我看你SQL语句掌握的怎么样?面试SQL语句专题3】

&#x1f680; 算法题 &#x1f680; &#x1f332; 算法刷题专栏 | 面试必备算法 | 面试高频算法 &#x1f340; &#x1f332; 越难的东西,越要努力坚持&#xff0c;因为它具有很高的价值&#xff0c;算法就是这样✨ &#x1f332; 作者简介&#xff1a;硕风和炜&#xff0c;…

【教程】Python语言的地球科学常见数据—— IMS积雪覆盖数据的处理

将ASCII数据转化为netCDF数据、分析新疆北疆、青藏高原和东北地区气候态积雪分布、分析新疆北疆、青藏高原和东北地区积雪面积变化规律。 美国国家冰雪中心&#xff08;NSIDC&#xff09;从 1997 年 2 月至今的北半球雪盖和海冰的地图。这些数据以 ASCII 文本和 GeoTIFF 格式提…

AIWEB1综合靶场通关教程,从外网打到内网【附靶场环境】

前言 靶场获取后台回【aiweb1】 下载之后设置为nat模式 启动即可&#xff0c;不需要登录 靶机复现 主机发现 访问即可 信息收集robots.txt文件 访问尝试&#xff0c;原来是什么也没有的&#xff0c;404 我们去访问这个上级目录&#xff0c;发现有一个id 注入测试 语法错误&am…

基于Protobuf的RPC

先上UserServer提供服务的函数要求proto文件内容&#xff1a; syntax"proto3"; package fixbug; option cc_generic_servicestrue; message LoginRequest {bytes name1;bytes pwd2; } message LoginResponse {ResultCode result1;bool sucess2; } #调用远程服务的入…

JAVA游戏源码:跑酷|大学生练手项目

学习java朋友们&#xff0c;福利来了&#xff0c;今天小编给大家带来了一款跑酷源码。注意&#xff1a;此源码仅供学习使用!! 源码搭建和讲解 启动main入口&#xff1a; //************************************************************************ // ************完整源码…

AcWing食物链

Q1&#xff1a;怎么判断X和Y是不是同类? A:判断这俩是不是在一个集合中,如果在同一个集合中&#xff0c;那么判断X到祖先节点的距离D[X]和D[Y]到祖先节点的距离是否有D[X]%3D[Y]%3,也就是3同余 若果是&#xff0c;那么是同类。如果X和Y不在一个集合里面&#xff0c;那么把X和Y…

护网总结汇报PPT一键生成,还要啥售前工程师

【文末送&#xff1a;技战法】 干技术的&#xff0c;特别是干安服的&#xff0c;你让我日个站觉得没问题&#xff0c;你让我写个文档我挠挠头&#xff0c;抓抓背也能凑一篇&#xff0c;但是你要让我写个ppt&#xff0c;那我觉得你在为难我。 报告我都写好了&#xff0c;为啥还…