初试rabbitmq

news2024/12/24 20:56:28

rabbitmq的七种模式 

 Hello word

客户端引入依赖

<!--rabbitmq 依赖客户端-->
 <dependency>
     <groupId>com.rabbitmq</groupId>
     <artifactId>amqp-client</artifactId>
     <version>5.8.0</version>
 </dependency>

生产者



import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception {
                //创建一个连接工厂
                ConnectionFactory factory = new ConnectionFactory();
                factory.setHost("43.139.59.28");
                factory.setUsername("guest");
                factory.setPassword("guest");
                //channel 实现了自动 close 接口 自动关闭 不需要显示关闭
                try(Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
                /**
                * 生成一个队列
                * 1.队列名称
                * 2.队列里面的消息是否持久化 默认消息存储在内存中
                * 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费
                * 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除
                * 5.其他参数
                */
                channel.queueDeclare(QUEUE_NAME,false,false,false,null);
                String message="hello world!!!!";
                /**
                * 发送一个消息
                * 1.发送到那个交换机
                * 2.路由的 key 是哪个
                * 3.其他的参数信息
                * 4.发送消息的消息体
                */
                channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
                System.out.println("消息发送完毕");
           }
    }
}

消费者



import com.rabbitmq.client.*;

public class Consumer {
       private final static String QUEUE_NAME = "hello";

      public static void main(String[] args) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("43.139.59.28");
            factory.setUsername("guest");
            factory.setPassword("guest");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            System.out.println("等待接收消息....");
            //推送的消息如何进行消费的接口回调
            DeliverCallback deliverCallback=(consumerTag, delivery)->{
            String message= new String(delivery.getBody());
            System.out.println(message);
            };
            //取消消费的一个回调接口 如在消费的时候队列被删除掉了
            CancelCallback cancelCallback=(consumerTag)->{
            System.out.println("消息消费被中断");
            };
             /**
              * 消费者消费消息
              * 1.消费哪个队列
              * 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答
              * 3.消费者未成功消费的回调
              */
             channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
      }
}

获取消息

 工作队列

封装获取getChannel的工具类

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class RabbitMqUtils {
     //得到一个连接的 channel
     public static Channel getChannel() throws Exception{
         //创建一个连接工厂
         ConnectionFactory factory = new ConnectionFactory();
         factory.setHost("43.139.59.28");
         factory.setUsername("guest");
         factory.setPassword("guest");
         Connection connection = factory.newConnection();
         Channel channel = connection.createChannel();
         return channel;
     }
}

接收消息工作1线程

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

public class Worker01 {
     private static final String QUEUE_NAME="hello";

     public static void main(String[] args) throws Exception {
         Channel channel = RabbitMqUtils.getChannel();
         DeliverCallback deliverCallback=(consumerTag, delivery)->{
         String receivedMessage = new String(delivery.getBody());
         System.out.println("接收到消息:"+receivedMessage);
         };
         CancelCallback cancelCallback=(consumerTag)->{
         System.out.println(consumerTag+"消费者取消消费接口回调逻辑");
         };
         System.out.println("C1 消费者启动等待消费......");
         channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
     }
}

 接收消息工作线程2

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

public class Worker02 {
    private static final String QUEUE_NAME="hello";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        DeliverCallback deliverCallback=(consumerTag, delivery)->{
            String receivedMessage = new String(delivery.getBody());
            System.out.println("接收到消息:"+receivedMessage);
        };
        CancelCallback cancelCallback=(consumerTag)->{
            System.out.println(consumerTag+"消费者取消消费接口回调逻辑");
        };
        System.out.println("C2 消费者启动等待消费......");
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    }
}

发送10次消息

线程1和线程2平分消息

发布订阅 

 RabbitMQ 消息传递模型的核心思想是: 生产者生产的消息从不会直接发送到队列。实际上,通常生产者甚至都不知道这些消息传递传递到了哪些队列中。

总共有以下类型: 直接(direct), 主题 (topic) , 标题 (headers) , 扇出 (fanout)

fanout

接收者1 

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

public class ReceiveLogs01 {
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        /**
        * 生成一个临时的队列 队列的名称是随机的
        * 当消费者断开和该队列的连接时 队列自动删除
        */
        String queueName = channel.queueDeclare().getQueue();
        //把该临时队列绑定我们的 exchange 其中 routingkey(也称之为 binding key)为空字符串
        channel.queueBind(queueName, EXCHANGE_NAME, "");
        System.out.println("等待接收消息,把接收到的消息打印在屏幕.....");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
           String message = new String(delivery.getBody(), "UTF-8");
           System.out.println("控制台打印接收到的消息"+message);
        };
     channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
   }
}

接收者2

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import org.apache.commons.io.FileUtils;

import java.io.File;

public class ReceiveLogs02 {
    private static final String EXCHANGE_NAME = "logs";
    public static void main(String[] argv) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        /**
        * 生成一个临时的队列 队列的名称是随机的
        * 当消费者断开和该队列的连接时 队列自动删除
        */
        String queueName = channel.queueDeclare().getQueue();
        //把该临时队列绑定我们的 exchange 其中 routingkey(也称之为 binding key)为空字符串
        channel.queueBind(queueName, EXCHANGE_NAME, "");
        System.out.println("等待接收消息,把接收到的消息写到文件.....");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
           String message = new String(delivery.getBody(), "UTF-8");
//           File file = new File("C:\\work\\rabbitmq_info.txt");
//           FileUtils.writeStringToFile(file,message,"UTF-8");
           System.out.println("数据写入文件成功"+message);
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
    }
}

 发送者

import com.rabbitmq.client.Channel;

import java.util.Scanner;

public class EmitLog {
     private static final String EXCHANGE_NAME = "logs";

     public static void main(String[] argv) throws Exception {
         try (Channel channel = RabbitMqUtils.getChannel()) {
            /**
            * 声明一个 exchange
            * 1.exchange 的名称
            * 2.exchange 的类型
            */
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
            Scanner sc = new Scanner(System.in);
            System.out.println("请输入信息");
            while (sc.hasNext()) {
               String message = sc.nextLine();
               channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
               System.out.println("生产者发出消息" + message);
            }
         }
     }
}

 结果

 Direct

接收者1 ,写入错误日志

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import org.apache.commons.io.FileUtils;

import java.io.File;

public class ReceiveLogsDirect01 {
    private static final String EXCHANGE_NAME = "direct_logs";
      public static void main(String[] argv) throws Exception {
         Channel channel = RabbitMqUtils.getChannel();
         channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
         String queueName = "disk";
         channel.queueDeclare(queueName, false, false, false, null);
         channel.queueBind(queueName, EXCHANGE_NAME, "error");
         System.out.println("等待接收消息.....");
         DeliverCallback deliverCallback = (consumerTag, delivery) -> {
             String message = new String(delivery.getBody(), "UTF-8");
             message="接收绑定键:"+delivery.getEnvelope().getRoutingKey()+",消息:"+message;
//             File file = new File("C:\\work\\rabbitmq_info.txt");
//             FileUtils.writeStringToFile(file,message,"UTF-8");
             System.out.println("错误日志已经接收"+message);
         };
         channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
         });
      }
}

 接收者2,打印控制台信息

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

public class ReceiveLogsDirect02 {
        private static final String EXCHANGE_NAME = "direct_logs";
        public static void main(String[] argv) throws Exception {
           Channel channel = RabbitMqUtils.getChannel();
           channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
           String queueName = "console";
           channel.queueDeclare(queueName, false, false, false, null);
           channel.queueBind(queueName, EXCHANGE_NAME, "info");
           channel.queueBind(queueName, EXCHANGE_NAME, "warning");
           System.out.println("等待接收消息.....");
           DeliverCallback deliverCallback = (consumerTag, delivery) -> {
              String message = new String(delivery.getBody(), "UTF-8");
              System.out.println(" 接收绑定键 :"+delivery.getEnvelope().getRoutingKey()+", 消息:"+message);
           };
           channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
           });
        }
}

 发送者

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;

import java.util.HashMap;
import java.util.Map;

public class EmitLogDirect {
    private static final String EXCHANGE_NAME = "direct_logs";
     public static void main(String[] argv) throws Exception {
     try (Channel channel = RabbitMqUtils.getChannel()) {
         channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
         //创建多个 bindingKey
         Map<String, String> bindingKeyMap = new HashMap<>();
         bindingKeyMap.put("info","普通 info 信息");
         bindingKeyMap.put("warning","警告 warning 信息");
         bindingKeyMap.put("error","错误 error 信息");
         //debug 没有消费这接收这个消息 所有就丢失了
         bindingKeyMap.put("debug","调试 debug 信息");
         for (Map.Entry<String, String> bindingKeyEntry: bindingKeyMap.entrySet()){
            String bindingKey = bindingKeyEntry.getKey();
            String message = bindingKeyEntry.getValue();
            channel.basicPublish(EXCHANGE_NAME,bindingKey, null, 
            message.getBytes("UTF-8"));
            System.out.println("生产者发出消息:" + message);
         }
     }
     }
}

接收到信息

Topics  

主题1 

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

public class ReceiveLogsTopic01 {
       private static final String EXCHANGE_NAME = "topic_logs";
       public static void main(String[] argv) throws Exception {
          Channel channel = RabbitMqUtils.getChannel();
          channel.exchangeDeclare(EXCHANGE_NAME, "topic");
          //声明 Q1 队列与绑定关系
          String queueName="Q1";
          channel.queueDeclare(queueName, false, false, false, null);
          channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.*");
          System.out.println("等待接收消息.....");
          DeliverCallback deliverCallback = (consumerTag, delivery) -> {
             String message = new String(delivery.getBody(), "UTF-8");
             System.out.println(" 接收队列 :"+queueName+" 绑 定 键:"+delivery.getEnvelope().getRoutingKey()+",消息:"+message);
          };
          channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
          });
       }
}

 主题2

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

public class ReceiveLogsTopic02 {
     private static final String EXCHANGE_NAME = "topic_logs";
      public static void main(String[] argv) throws Exception {
          Channel channel = RabbitMqUtils.getChannel();
          channel.exchangeDeclare(EXCHANGE_NAME, "topic");
          //声明 Q2 队列与绑定关系
          String queueName="Q2";
          channel.queueDeclare(queueName, false, false, false, null);
          channel.queueBind(queueName, EXCHANGE_NAME, "*.*.rabbit");
          channel.queueBind(queueName, EXCHANGE_NAME, "lazy.#");
          System.out.println("等待接收消息.....");
          DeliverCallback deliverCallback = (consumerTag, delivery) -> {
             String message = new String(delivery.getBody(), "UTF-8");
             System.out.println(" 接收队列 :"+queueName+" 绑 定 键:"+delivery.getEnvelope().getRoutingKey()+",消息:"+message);
          };
          channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
          });
      }
}

发送者

import com.rabbitmq.client.Channel;

import java.util.HashMap;
import java.util.Map;

public class EmitLogTopic {
    private static final String EXCHANGE_NAME = "topic_logs";
     public static void main(String[] argv) throws Exception {
         try (Channel channel = RabbitMqUtils.getChannel()) {
         channel.exchangeDeclare(EXCHANGE_NAME, "topic");
         /**
         * Q1-->绑定的是
         * 中间带 orange 带 3 个单词的字符串(*.orange.*)
         * Q2-->绑定的是
         * 最后一个单词是 rabbit 的 3 个单词(*.*.rabbit)
         * 第一个单词是 lazy 的多个单词(lazy.#)
         *
         */
         Map<String, String> bindingKeyMap = new HashMap<>();
         bindingKeyMap.put("quick.orange.rabbit","被队列 Q1Q2 接收到");
         bindingKeyMap.put("lazy.orange.elephant","被队列 Q1Q2 接收到");
         bindingKeyMap.put("quick.orange.fox","被队列 Q1 接收到");
         bindingKeyMap.put("lazy.brown.fox","被队列 Q2 接收到");
         bindingKeyMap.put("lazy.pink.rabbit","虽然满足两个绑定但只被队列 Q2 接收一次");
         bindingKeyMap.put("quick.brown.fox","不匹配任何绑定不会被任何队列接收到会被丢弃");
         bindingKeyMap.put("quick.orange.male.rabbit","是四个单词不匹配任何绑定会被丢弃");
         bindingKeyMap.put("lazy.orange.male.rabbit","是四个单词但匹配 Q2");
           for (Map.Entry<String, String> bindingKeyEntry: bindingKeyMap.entrySet()){
              String bindingKey = bindingKeyEntry.getKey();
              String message = bindingKeyEntry.getValue();
              channel.basicPublish(EXCHANGE_NAME,bindingKey, null, 
             message.getBytes("UTF-8"));
              System.out.println("生产者发出消息" + message);
           }
         }
     }
}

 结果

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/892162.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

相对于多进程,你真的知道为什么要使用多线程吗(C/C++多线程编程)

目录 前言 线程VS进程 POSIX线程库的使用 线程创建 线程等待 线程分离 线程状态 可结合态线程实例 分离态线程实例 线程退出 线程的同步与互斥 同步互斥的概念 互斥锁&#xff08;互斥&#xff09; 互斥锁的使用步骤 总结说明 信号量 信号量的使用步骤 条件变…

数据包如何游走于 Iptables 规则之间?

在前文《Linux路由三大件》中&#xff0c;我们提到了 iptables 可以修改数据包的特征从而影响其路由。这个功能无论是传统场景下的 防火墙&#xff0c;还是云原生场景下的 服务路由&#xff08;k8s service&#xff09;、网络策略(calico network policy) 等都有依赖。 虽然业…

7.逻辑结构VS物理结构

第四章 文件管理 7.逻辑结构VS物理结构 ​   fopen这个函数做的事情就是打开了“test.txt”这个文件&#xff0c;并且“w”说明是以“写”的方式打开的&#xff0c;以“写”的方式打开才能往这个文件里写入数据&#xff0c;如果文件打开了那么fp这个指针就可以指向和这个文件…

Eclipse如何设置快捷键

在eclopse设置注释行和取消注释行 // 打开eclipse&#xff0c;依次打开&#xff1a;Window -> Preferences -> General -> Key&#xff0c;

数据结构--关键路径

数据结构–关键路径 AOE⽹ 在 带权有向图 \color{red}带权有向图 带权有向图中&#xff0c;以 顶点表示事件 \color{red}顶点表示事件 顶点表示事件&#xff0c;以 有向边表示活动 \color{red}有向边表示活动 有向边表示活动&#xff0c;以 边上的权值表示完成该活动的开销 \…

HCIE--------------------------------------第一节OSPF快速收敛(OSPF与BGP联动)

一、OSPF快速收敛概述 OSPF快速收敛是为了提高路由的收敛速度而做的扩展特性&#xff0c;包括&#xff1a;PRC&#xff08;Partial Route Calculation&#xff0c;部分路由计算&#xff09;和智能定时器。 同时&#xff0c;OSPF支持故障恢复快速收敛&#xff0c;例如通过OSPF …

Linux Server 20.04 Qt5.14.2配置Jetson Orin Nano Developer Kit 交叉编译环境

最近公司给了我一块Jetson Orin Nano的板子&#xff0c;让我搭建交叉编译环境&#xff0c;所以有了下面的文章 一 :Qt5.14.2交叉编译环境安装 1.准备 1.1设备环境 1.1.1 Server: Ubuntu20.04: Qt 源码 5.14.2 Qt 软件 5.14.2 gcc 版本 9.4.0 g 版本 9.4.0 1.1.2 Jetson …

在 React 中获取数据的6种方法

一、前言 数据获取是任何 react 应用程序的核心方面。对于 React 开发人员来说&#xff0c;了解不同的数据获取方法以及哪些用例最适合他们很重要。 但首先&#xff0c;让我们了解 JavaScript Promises。 简而言之&#xff0c;promise 是一个 JavaScript 对象&#xff0c;它将…

openGauss学习笔记-42 openGauss 高级数据管理-触发器

文章目录 openGauss学习笔记-42 openGauss 高级数据管理-触发器42.1 语法格式42.2 参数说明42.3 示例 openGauss学习笔记-42 openGauss 高级数据管理-触发器 触发器会在指定的数据库事件发生时自动执行函数。 42.1 语法格式 创建触发器 CREATE TRIGGER trigger_name { BEFORE…

Java8实战-总结16

Java8实战-总结16 引入流流与集合只能遍历一次外部迭代与内部迭代 引入流 流与集合 只能遍历一次 和迭代器类似&#xff0c;流只能遍历一次。遍历完之后&#xff0c;这个流就已经被消费掉了。可以从原始数据源那里再获得一个新的流来重新遍历一遍&#xff0c;就像迭代器一样…

使用qsqlmysql操作mysql提示Driver not loaded

环境: win10 IDE: qt creator 编译器: mingw32 这里简单的记录下。我遇到的情况是在IDE使用debug和release程序都是运行正常&#xff0c;但是当我编译成发布版本之后。老是提示Driver not load。 这就很奇诡了。 回顾了下编译的时候是需要在使用qt先编译下libqsqlmysql.dll的…

从入门到精通Python隧道代理的使用与优化

哈喽&#xff0c;Python爬虫小伙伴们&#xff01;今天我们来聊聊如何从入门到精通地使用和优化Python隧道代理&#xff0c;让我们的爬虫程序更加稳定、高效&#xff01;今天我们将对使用和优化进行一个简单的梳理&#xff0c;并且会提供相应的代码示例。 1. 什么是隧道代理&…

V2board缓存投毒漏洞复现

1.什么是缓存投毒 缓存投毒&#xff08;Cache poisoning&#xff09;&#xff0c;通常也称为域名系统投毒&#xff08;domain name system poisoning&#xff09;&#xff0c;或DNS缓存投毒&#xff08;DNS cache poisoning&#xff09;。它是利用虚假Internet地址替换掉域名系…

数据结构—排序

8.排序 8.1排序的概念 什么是排序&#xff1f; 排序&#xff1a;将一组杂乱无章的数据按一定规律顺序排列起来。即&#xff0c;将无序序列排成一个有序序列&#xff08;由小到大或由大到小&#xff09;的运算。 如果参加排序的数据结点包含多个数据域&#xff0c;那么排序往…

Arduino 入门学习笔记10 使用I2C的OLED屏幕

Arduino 入门学习笔记10 使用I2C的OLED屏幕 一、准备工具二、JMD0.96C-1介绍1. 显示屏参数2. SSD1306驱动芯片介绍&#xff1a; 三、使用Arduino开发步骤1. 安装库&#xff08;1&#xff09;Adafruit_GFX_Library 库&#xff08;2&#xff09;Adafruit_SSD1306 驱动库&#xff…

HCIP——STP配置案例

STP配置案例 一、简介二、实现说明1、华为实现说明2、其他厂商实现 三、STP原理1、协商原则2、角色和状态3、报文格式4、BPDU报文处理流程4.1 BPDU报文的分类4.2 BPDU报文的处理流程4.3 BPDU报文格式 四、使用注意事项五、配置举例1、组网需求2、配置思路3、操作步骤4、配置文件…

多维时序 | MATLAB实现WOA-CNN鲸鱼算法优化卷积神经网络的数据多变量时间序列预测

多维时序 | MATLAB实现WOA-CNN鲸鱼算法优化卷积神经网络的数据多变量时间序列预测 目录 多维时序 | MATLAB实现WOA-CNN鲸鱼算法优化卷积神经网络的数据多变量时间序列预测效果一览基本介绍程序设计参考资料 效果一览 基本介绍 多维时序 | MATLAB实现WOA-CNN鲸鱼算法优化卷积神经…

大模型技术实践(一)|ChatGLM2-6B基于UCloud UK8S的创新应用

近半年来&#xff0c;通过对多款主流大语言模型进行了调研&#xff0c;我们针对其训练方法和模型特点进行逐一分析&#xff0c;方便大家更加深入了解和使用大模型。本文将重点分享ChatGLM2-6B基于UCloud云平台的UK8S实践应用。 01各模型结构及特点 自从2017年6月谷歌推出Transf…

【OpenCV学习笔记】我的OpenCV学习之路

刚开始接触OpenCV是因为需要进行图像的处理&#xff0c;由于之前没有接触过&#xff0c;所以只能自己进行学习&#xff0c;下面将学习的过程做简单记录分享。 OpenCV专栏链接 OpenCV学习笔记 一、引言 OpenCV&#xff08;Open Source Computer Vision Library&#xff09;是…

【C# 基础精讲】文件读取和写入

文件读取和写入是计算机程序中常见的操作&#xff0c;用于从文件中读取数据或将数据写入文件。在C#中&#xff0c;使用System.IO命名空间中的类来进行文件读写操作。本文将详细介绍如何在C#中进行文件读取和写入&#xff0c;包括读取文本文件、写入文本文件、读取二进制文件和写…