RabbitMQ六种工作模式01

news2025/1/22 12:26:37

01: Work Queue工作队列模式

 

 

 

 

 

 

 

 

//接口所有的属性都是静态常量属性
public interface RabbitContent {

    //队列
    String QEUEU_HELLO = "hello";
    String QUEUE_WORKING ="working";
    String QUEUE_BAIDU ="baidu";
    String QUEUE_SINA ="sina";


    //交换机
    String CHANGE_WEATHER="weather";
    String CHANGE_ROUTING="weather_routing";
    String CHANGE_TOPIC="weather_topic";
}

 SMS:

public class SMS implements Serializable {
    private static final long serialVersionUID = 3185271845751784313L;
    private String name;
    private String phone;
    private String content;
//+  get/set  无参带参构造方法
}
WorkProducer:
package cpm.pb.working;


import com.google.gson.Gson;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import cpm.pb.test.utils.RabbitContent;
import cpm.pb.test.utils.RabbitUtils;

import java.io.IOException;

public class WorkProducer {

    public static void main(String[] args) throws Exception {
        Connection connection = RabbitUtils.getConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(RabbitContent.QUEUE_WORKING, false, false, false, null);

        //在这里面我们共发送200条消息
        for(int i = 1; i <=200; i++) {
            SMS  sms = new SMS("admin"+i, "12345678976"+i, "购票成功");
            //把对象转换成字符串
            //JSONlib   fastJson   Jankson, gson
            String s = new Gson().toJson(sms);
            //System.out.println(s);
            //发送消息
            channel.basicPublish("", RabbitContent.QUEUE_WORKING, null, s.getBytes());
        }
        System.out.println("消息发送成功...");
        channel.close();
        connection.close();
    }
}

运行:

 消费者1:

SMSOrder1:

public class SMSOrder1 {
    public static void main(String[] args) throws Exception {
        Connection connection = RabbitUtils.getConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(RabbitContent.QUEUE_WORKING, false, false, false, null);

        channel.basicQos(1);    //排队


        channel.basicConsume(RabbitContent.QUEUE_WORKING, false, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("SMSOrder1收到消息: " + new String(body));

                //签收
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });

    }
}

赋值三个  三个消费者  SMSOrder1:  SMSOrder2: SMSOrder3

分别运行:

 让每个消费者  做延迟:

 try {
                    Thread.sleep(20);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                channel.basicAck(envelope.getDeliveryTag(), false);

每个消费者会消费不同的消息:

发布(publish)/订阅(Subscribe)模式:

 

 

 

 

 

Weather.Java:

/**
 * 气象局[供应商和应用商操作]
 */
public class Weather {
    public static void main(String[] args) throws Exception {
        //这里我们不创建队列
        //只创建交换机
        Connection connection = RabbitUtils.getConnection();
        Channel channel = connection.createChannel();

        //创建Fanout类型的交换机
        channel.exchangeDeclare(RabbitContent.CHANGE_WEATHER, BuiltinExchangeType.FANOUT,false,false,false, null);

        //发送消息我们把数据送入先关的交换机
        channel.basicPublish(RabbitContent.CHANGE_WEATHER, "",null, "北京38°晴转阴".getBytes());

        channel.close();
        connection.close();
    }
}

 Baidu.java:

public class Baidu {
    public static void main(String[] args) throws Exception {
        //这里我们不创建队列
        //只创建交换机
        Connection connection = RabbitUtils.getConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(RabbitContent.QUEUE_BAIDU,false,false, false, null);

        //交换机和队列进行绑定*****************
        channel.queueBind(RabbitContent.QUEUE_BAIDU, RabbitContent.CHANGE_WEATHER,"");
        channel.basicConsume(RabbitContent.QUEUE_BAIDU, false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("百度收到消息:" + new String(body));
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }
}

Sina.java:

public class Sina {
    public static void main(String[] args) throws Exception {
        //这里我们不创建队列
        //只创建交换机
        Connection connection = RabbitUtils.getConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(RabbitContent.QUEUE_SINA,false,false, false, null);

        //交换机和队列进行绑定
        channel.queueBind(RabbitContent.QUEUE_SINA, RabbitContent.CHANGE_WEATHER, "");

        channel.basicConsume(RabbitContent.QUEUE_SINA, false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("新浪收到消息:" + new String(body));
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }
}

 路由Routing模式:

 

 

 

 手动添加一个交换机

 

 Baidu.java:

public class Baidu {
    public static void main(String[] args) throws Exception {
        //这里我们不创建队列
        //只创建交换机
        Connection connection = RabbitUtils.getConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(RabbitContent.QUEUE_BAIDU,false,false, false, null);

        channel.queueBind(RabbitContent.QUEUE_BAIDU, RabbitContent.CHANGE_ROUTING, "china.henan.zhengzhou.20991011");

        channel.basicConsume(RabbitContent.QUEUE_BAIDU, false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("百度收到了消息:" + new String(body));
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        });
    }
}

Sina.java:

public class Sina {
    public static void main(String[] args) throws Exception {
        //这里我们不创建队列
        //只创建交换机
        Connection connection = RabbitUtils.getConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(RabbitContent.QUEUE_SINA,false,false, false, null);

        channel.queueBind(RabbitContent.QUEUE_SINA, RabbitContent.CHANGE_ROUTING, "us.cal.la.20991012");

        channel.basicConsume(RabbitContent.QUEUE_SINA, false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("新浪收到了消息:" + new String(body));
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        });
    }
}

 

 主题Topic模式(常用):

 

 

 TopicProducer.java:

public class TopicProducer {
    public static void main(String[] args) throws Exception {
        //这里我们不创建队列
        //只创建交换机
        Connection connection = RabbitUtils.getConnection();
        Channel channel = connection.createChannel();

        Map<String, String> area = new LinkedHashMap<String, String>();
        area.put("china.hebei.shijiazhuang.20991011", "中国河北石家庄20991011天气数据");
        area.put("china.shandong.qingdao.20991011", "中国山东青岛20991011天气数据");
        area.put("china.henan.zhengzhou.20991011", "中国河南郑州20991011天气数据");
        area.put("us.cal.la.20991011", "美国加州洛杉矶20991011天气数据");

        area.put("china.hebei.shijiazhuang.20991012", "中国河北石家庄20991012天气数据");
        area.put("china.shandong.qingdao.20991012", "中国山东青岛20991012天气数据");
        area.put("china.henan.zhengzhou.20991012", "中国河南郑州20991012天气数据");
        area.put("us.cal.la.20991012", "美国加州洛杉矶20991012天气数据");


        //发送数据
        Iterator<Map.Entry<String, String>> iterator = area.entrySet().iterator();
        while(iterator.hasNext()) {
            Map.Entry<String, String> next = iterator.next();
            //发送数据
            channel.basicPublish(RabbitContent.CHANGE_TOPIC, next.getKey(), true, null, next.getValue().getBytes());
        }

        System.out.println("消息发送成功");


        //channel.close();
        //connection.close();

    }

}

Baidu.java:

public class Baidu {
    public static void main(String[] args) throws Exception {
        //这里我们不创建队列
        //只创建交换机
        Connection connection = RabbitUtils.getConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(RabbitContent.QUEUE_BAIDU,false,false, false, null);

        channel.queueBind(RabbitContent.QUEUE_BAIDU, RabbitContent.CHANGE_TOPIC, "china.#");

        channel.basicConsume(RabbitContent.QUEUE_BAIDU, false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("百度收到了消息:" + new String(body));
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        });
    }
}

Sina.java:

public class Sina {
    public static void main(String[] args) throws Exception {
        //这里我们不创建队列
        //只创建交换机
        Connection connection = RabbitUtils.getConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(RabbitContent.QUEUE_SINA,false,false, false, null);

        channel.queueBind(RabbitContent.QUEUE_SINA, RabbitContent.CHANGE_TOPIC, "us.*.*.*");

        channel.basicConsume(RabbitContent.QUEUE_SINA, false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("新浪收到了消息:" + new String(body));
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        });

    }
}

 MQ的消息确认机制 Confirm&return 与消费者没有关系

 

 

 

 TopicProducer.java:

public class TopicProducer {
    public static void main(String[] args) throws Exception {
        //这里我们不创建队列
        //只创建交换机
        Connection connection = RabbitUtils.getConnection();
        Channel channel = connection.createChannel();

        Map<String, String> area = new LinkedHashMap<String, String>();
        area.put("china.hebei.shijiazhuang.20991011", "中国河北石家庄20991011天气数据");
        area.put("china.shandong.qingdao.20991011", "中国山东青岛20991011天气数据");
        area.put("china.henan.zhengzhou.20991011", "中国河南郑州20991011天气数据");
        area.put("us.cal.la.20991011", "美国加州洛杉矶20991011天气数据");

        area.put("china.hebei.shijiazhuang.20991012", "中国河北石家庄20991012天气数据");
        area.put("china.shandong.qingdao.20991012", "中国山东青岛20991012天气数据");
        area.put("china.henan.zhengzhou.20991012", "中国河南郑州20991012天气数据");
        area.put("us.cal.la.20991012", "美国加州洛杉矶20991012天气数据");

        channel.confirmSelect();  //开启监听事件

        channel.addConfirmListener(new ConfirmListener() {

            //消息被mq接受,状态
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("消息被mq接受:" + deliveryTag + "-----" + multiple);
            }

            //消息被mq举手的状态
            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("消息被mq拒收:" + deliveryTag + "-----" + multiple);
            }
        });


        //消息返回操作
        channel.addReturnListener(new ReturnCallback() {
            @Override
            public void handle(Return msg) {
                System.out.println("返回消息的状态吗" + msg.getReplyCode());
                System.out.println("返回消息的文本信息" + msg.getReplyText());
                System.out.println("返回消息的交换机" + msg.getExchange());
                System.out.println("返回消息的路由" + msg.getRoutingKey());
                System.out.println("返回消息为" + new String(msg.getBody()));
            }
        });

        //发送数据
        Iterator<Map.Entry<String, String>> iterator = area.entrySet().iterator();
        while(iterator.hasNext()) {
            Map.Entry<String, String> next = iterator.next();
            //发送数据
            channel.basicPublish(RabbitContent.CHANGE_TOPIC, next.getKey(), true, null, next.getValue().getBytes());
        }

        System.out.println("消息发送成功");


        //channel.close();
        //connection.close();

    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/584451.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【操作系统笔记】南京大学jyy老师

系列综述&#xff1a; &#x1f49e;目的&#xff1a;本系列是个人整理为了操作系统学习&#xff0c;整理期间苛求每个知识点&#xff0c;平衡理解简易度与深入程度。 &#x1f970;来源&#xff1a;材料主要源于南京大学操作系统jyy老师课程进行的&#xff0c;每个知识点的修正…

Windows系统内核溢出漏洞提权

目录 Windows内核溢出漏洞原理 溢出漏洞简介 什么是缓冲区 缓冲区溢出 缓冲区溢出目的 Windows内核溢出漏洞利用流程 提权实战思路 手工提权测试 辅助提权 EXP如何寻找 使用MSF提权 关于提权时可能遇到的问题 如果提权的时候发现无法执行命令的话&#xff0c;可以上…

零售EDI:如何与Transgourmet 建立EDI连接?

Transgourmet是一家总部位于法国的批发和供应商公司&#xff0c;为酒店、餐馆和快餐行业提供食品和非食品产品。在欧洲拥有广泛的市场覆盖&#xff0c;经营范围涵盖法国、德国、奥地利、波兰、罗马尼亚和瑞士等国家。 Transgourmet EDI 需求分析 1.传输协议 Transgourmet选择…

一文看懂企业性能测试,指标解析+代码演示,简洁易懂!

目录 前言&#xff1a; 一、性能测试流程简介 二、性能测试指标分析 三、性能测试代码演示 四、性能测试结论 五、总结 前言&#xff1a; 性能测试在企业应用中是非常重要的一环&#xff0c;它可以帮助企业对自身的应用和系统进行全面评估&#xff0c;提高其性能、稳定性…

simbertmilvus实现相似句检索

朋友们&#xff0c;simbert模型是一个较好的相似句检索模型&#xff0c;但是在大规模检索中&#xff0c;需要实现快速检索&#xff0c;这个时候离不开milvus等向量检索库&#xff0c;下面用实际代码来讲一下simbert之milvus应用。 import numpy as np from bert4keras.backen…

addr2line 使用,定位kernel panic 代码位置

在kernel崩溃时&#xff0c;方便定位代码。 需要打开kernel配置CONFIG_DEBUG_INFO。 需要有System.map和vmlinux文件&#xff0c;一般在out目录。 一般panic的时候会有给出panic的指针&#xff0c;如下down_write。 el1_data说明发生异常了&#xff0c;进入和entry.S文件&a…

视频转换、视频压缩、录屏等工具合集:迅捷视频工具箱

这是一款功能强大的视频处理软件&#xff0c;提供了多种视频处理功能。可以使用该软件进行视频剪辑、视频转换、音频转换、视频录像、视频压缩、字幕贴图等多种操作。软件界面简洁易用&#xff0c;操作方便&#xff0c;可以满足各种视频处理需求。 基本功能 视频压缩&#xff…

华为设备这14个广域网命令,值得网工收藏

华为设备广域网命令是网络管理员在运维过程中常用的一类命令。该命令集涵盖了DCC配置命令、PPP配置命令、MP配置命令、PPPoE命令、ATM配置命令、帧中继配置命令、HDLC配置命令、LAPB配置命令、X.25配置命令、IP-Trunk配置命令、ISDN配置命令、Modem配置命令、RTC终端接入配置命…

商场楼层索引图怎么做?商场内部地图导航怎么做?

商场内部地图导航怎么做&#xff1f;最近&#xff0c;某论坛上有一个帖子&#xff0c;主题是谈谈“逛商场中最糗的事情”&#xff0c;网友们纷纷跟帖回应&#xff0c;讲述自己在商场里遇到的尴尬&#xff0c;从在停车场找车如何困难&#xff0c;还有在商场里“迷路”的经历………

解决Ubuntu系统/usr/lib/xorg/Xorg占用显卡内存问题

问题描述&#xff1a; 服务器新安装的Ubuntu系统&#xff0c;开机默认/usr/lib/xorg/Xorg线程会占用显卡内存&#xff0c;占用内存大小为4Mb&#xff0c;虽然占用量不大&#xff0c;但是对于强迫症患者来说实在太不友好&#xff01; 解决方法&#xff1a;将xorg的线程移动到集…

为什么我们需要API接口?API接口的核心又是什么?

API&#xff08;Application Programming Interface&#xff09;是一种连接不同软件之间的标准化的接口&#xff0c;可以让不同软件间进行数据交互和通信。API接口的作用很多&#xff0c;以下是几个主要的原因&#xff1a; 1.提高软件系统的灵活性和可扩展性。API接口可以将不…

大数据 | Hadoop HA高可用搭建保姆级教程(大二学长的万字笔记)

知识目录 一、写在前面&#x1f388;二、集群准备&#x1f35f;2.1 集群规划2.2 集群解释 三、说明&#x1f511;3.1 主机名说明3.2 用户名说明3.3 操作目录说明3.3 必要工具说明 四、上传资料&#x1f335;4.1 资料准备4.2 脚本准备4.3 配置文件准备 五、解压与修改文件&#…

华为OD机试真题 Java 实现【递增字符串】【2023Q1 200分】,附详细解题思路

一、题目描述 定义字符串完全由“A’和B"组成&#xff0c;当然也可以全是"A"或全是"B。如果字符串从前往后都是以字典序排列的&#xff0c;那么我们称之为严格递增字符串。 给出一个字符串5&#xff0c;允许修改字符串中的任意字符&#xff0c;即可以将任…

色彩空间转换 HSV,GRAY

RGB色彩空间是一种比较常见的色彩空间&#xff0c;除此之外比较常见的色彩空间还包括GRAY色彩空间&#xff08;灰度图像&#xff09;、YCrCb色彩空间、HSV色彩空间、HLS色彩空间、CIEL&#xff0a;a&#xff0a;b&#xff0a;色彩空间、CIEL&#xff0a;u&#xff0a;v&#xf…

租售keysight E8257D 50G模拟信号发生器 销售/回收

是德&#xff08;Keysight&#xff09; E8257D 模拟信号发生器 Keysight E8257D (Agilent) PSG 模拟信号发生器提供业界领先的输出功率、电平精度和高达 67 GHz 的相位噪声性能&#xff08;工作频率可达 70 GHz&#xff09;。Agilent PSG 模拟信号发生器的高输出功率和卓越的电…

SpringBoot + MyBatis报错:Invalid bound statement (not found)解决

背景&#xff1a;XML配置文件规范 使用Mybatis的注解方式&#xff0c;主要是来完成一些简单的增删改查功能。如果需要实现复杂的SQL功能&#xff0c;建议使用XML来配置映射语句&#xff0c;也就是将SQL语句写在XML配置文件中。 在Mybatis中使用XML映射文件方式开发&#xff0c…

从速度、质量到成本,Grid分布式并行测试在web自动化测试中尽显优势。

目录 前言&#xff1a; 一、Grid分布式并行测试简介 二、Grid分布式并行测试的优势 三、Grid分布式并行测试架构 四、Grid分布式并行测试封装 五、结语 前言&#xff1a; WEB自动化测试已经成为了软件开发流程中不可或缺的一部分。测试人员通过编写脚本&#xff0c;模拟用…

由于找不到msvcp140.dll文件,我们要怎么解决这种情况?

在使用电脑的过程中&#xff0c;我们经常会遇到各种各样的问题&#xff0c;其中之一就是缺少msvcp140.dll文件。这个问题通常会导致某些软件无法正常运行&#xff0c;而且很多人对于如何解决这个问题并不是很清楚。本文将会介绍多种修复方法&#xff0c;并对比哪种方法比较方便…

海睿思分享 | 颠覆传统方式的数仓构建工具

你还在为构建数仓的低效率而发愁吗&#xff1f; 你还在为数仓构建不能体系化而苦恼吗&#xff1f; 也许大家都不愿意承认&#xff0c;但是绝大部分的企业当前是没有统一、标准、公共、全局的模型设计的&#xff0c;而仅仅是把数据同步上来&#xff0c;然后基于业务需求做烟囱式…

pwn中利用off by null的一个思路,构造假chunk的难以触及pre_size咋整

题目分享 children_tcache 链接&#xff1a;https://pan.baidu.com/s/1jARmxmGaoN_VADlb6m0D8A?pwdra0l 提取码&#xff1a;ra0l 参考博客&#xff1a; tcache在pwn题中常见的利用姿势 - 先知社区 (aliyun.com) 开始&#xff1a; 这道题的具体写法我就不说了&#xff0…