RabbitMQ-java使用消息队列

news2024/12/23 4:09:27

1 java操作消息队列

1.1 java实现生产者

新建一个springboot项目,导入依赖

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.14.2</version>
</dependency>

在这里插入图片描述
导入依赖后,实现生产者和消费者,首先是生产者,生产者负责将消息发送到消息队列

    public static void main(String[] args) {
        //使用ConnectionFactory来创建连接
        ConnectionFactory factory = new ConnectionFactory();

        //设定连接信息,基操
        factory.setHost("8.130.172.119");
        factory.setPort(5672);  //注意这里写5672,是amqp协议端口
        factory.setUsername("admin");
        factory.setPassword("password");
        factory.setVirtualHost("/test");

        //创建连接
        try(Connection connection = factory.newConnection()){

        }catch (Exception e){
            e.printStackTrace();
        }
    }

在这里插入图片描述
直接在程序种定义并创建消息队列,客户端需要通过连接(connection)创建一个新的通道(Channel),同一个连接下可以很多个通道,这样就不用创建很多个连接也能支持分开发送。

try(Connection connection = factory.newConnection();
    Channel channel = connection.createChannel()){   //通过Connection创建新的Channel
  	//声明队列,如果此队列不存在,会自动创建
    channel.queueDeclare("yyds", false, false, false, null);
  	//将队列绑定到交换机
    channel.queueBind("yyds", "amq.direct", "my-yyds");
  	//发布新的消息,注意消息需要转换为byte[]
    channel.basicPublish("amq.direct", "my-yyds", null, "Hello World!".getBytes());
}catch (Exception e){
    e.printStackTrace();
}

其中queeuDeclare方法的参数如下:

  • queue: 队列的名称 (默认创建后routingKey和队列名称一致)
  • durable: 是否持久化。
  • exclusive: 是否排他,如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。排他队列是基于Connection可见,同-个Connection的不同Channel是可以同时访问同-个连接创建的排他队列,并且,如果一个Connection已经声明了一个排他队列,其他的Connection是不允许建立同名的排他队列的,即使该队列是持久化的,一旦Connection关闭或者客户端退出,该排他队列都会自动被删除。
  • autoDelete: 是否自动删除
  • arguments:设置队列的其他一些参数,这里我们暂时不需要什么其他参数

其中queueBind方法的参数如下:

  • queue: 需要绑定的队列名称
  • exchange: 需要绑定的交换机名称。
  • routingKey:

其中basicPublic方法的参数如下:

  • exchange: 对应的Exchange名称,我们这里就使用第二个直连交换机。
  • routingKey: 这里我们填写绑定时指定的routingKey,其实和之前在管理页面操作一样
  • props: 其他的配置。
  • body: 消息本体

当前队列状态
在这里插入图片描述
运行测试类
在这里插入图片描述
出现新的队列
在这里插入图片描述
点击队列名称进入详情
在这里插入图片描述
获取到java发送的消息
在这里插入图片描述

1.2 java实现消费者

直接从指定的队列去取出数据,不需要再管交换机,但是还是通过connection去创建channel。消费者代码如下

package com.example.rbbitmqtest;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;
    public static void main(String[] args) throws IOException, TimeoutException {
        //使用ConnectionFactory来创建连接
        ConnectionFactory factory = new ConnectionFactory();

        //设定连接信息,基操
        factory.setHost("8.130.172.119");
        factory.setPort(5672);  //注意这里写5672,是amqp协议端口
        factory.setUsername("admin");
        factory.setPassword("password");
        factory.setVirtualHost("/test");

        //这里不使用try-with-resource,因为消费者是一直等待新的消息到来,然后按照
        //我们设定的逻辑进行处理,所以这里不能在定义完成之后就关闭连接
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        //创建一个基本的消费者
        channel.basicConsume("yyds", false, (s, delivery) -> {
            System.out.println(new String(delivery.getBody()));
            //basicAck是确认应答,第一个参数是当前的消息标签,后面的参数是
            //是否批量处理消息队列中所有的消息,如果为false表示只处理当前消息
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            //basicNack是拒绝应答,最后一个参数表示是否将当前消息放回队列,如果
            //为false,那么消息就会被丢弃
            //channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);
            //跟上面一样,最后一个参数为false,只不过这里省了
            //channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);
        }, s -> {});
    }

当创建完basicConsume后会一直等待消息,不会自动关闭。

其中basucConsume方法参数如下:

  • queue - 消息队列名称,直接指定。
  • autoAck今自动应答,消费者从消息队列取出数据后,需要跟服务器进行确认应答,当服务器收到确认后,会自动将消息删除,如果开启自动应答,那么消息发出后会直接删除。
  • deliver - 消息接收后的函数回调,我们可以在回调中对消息进行处理,处理完成后,需要给服务器确认应答
  • cancel - 当消费者取消订阅时进行的函数回调,这里暂时用不到。
    其中第二个参数为false时,需要手动调用ack的四种方式,若为true,则会默认为ack中的第二个方式,即拿到消息后就把消息消耗掉队列就不存在。
    在这里插入图片描述

测试channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false,false);
其中第一个false代表是否把后边消息全部处理了,false代表不处理
第二个false代表是否丢回消息队列,若为true,得到消息后还会把消息返回给消息队列

在这里插入图片描述
在这里插入图片描述
此时消息队列没有消息了
在这里插入图片描述
在界面把队列删除
在这里插入图片描述
以上为RabbitMQ的简单使用,通过java连接到服务器。

2 SpringBoot整合消息队列客户端

新建一个springboot项目

导入依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

在这里插入图片描述

修改配置文件

在这里插入图片描述

创建配置类

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfiguration {
    @Bean("directExchange")  //定义交换机Bean,可以很多个
    public Exchange exchange(){
        return ExchangeBuilder.directExchange("amq.direct").build();
    }

    @Bean("yydsQueue")     //定义消息队列
    public Queue queue(){
        return QueueBuilder
          				.nonDurable("yyds")   //非持久化类型
          				.build();
    }

    @Bean("binding")
    public Binding binding(@Qualifier("directExchange") Exchange exchange,
                           @Qualifier("yydsQueue") Queue queue){
      	//将我们刚刚定义的交换机和队列进行绑定
        return BindingBuilder
                .bind(queue)   //绑定队列
                .to(exchange)  //到交换机
                .with("my-yyds")   //使用自定义的routingKey
                .noargs();
    }
}

在这里插入图片描述

2.1 创建一个生产者

当前的管理端队列为空
在这里插入图片描述

直接在测试类中:

@SpringBootTest
class SpringCloudMqApplicationTests {

  	//RabbitTemplate为我们封装了大量的RabbitMQ操作,已经由Starter提供,因此直接注入使用即可
    @Resource
    RabbitTemplate template;

    @Test
    void publisher() {
      	//使用convertAndSend方法一步到位,参数基本和之前是一样的
      	//最后一个消息本体可以是Object类型,非常方便
        template.convertAndSend("amq.direct", "my-yyds", "Hello World11!");
    }

}

运行测试类
在这里插入图片描述
新的消息队列
在这里插入图片描述
查看详情
在这里插入图片描述
取出消息
在这里插入图片描述

2.2 创建消费者

因为消费者实际上就是一直等待消息然后处理的角色,因此只需要创建一个监听器就行了,它会一直等待消息到来然后在进行处理:

@Component  //注册为Bean
public class TestListener {

    @RabbitListener(queues = "yyds")   //定义此方法为队列yyds的监听器,一旦监听到新的消息,就会接受并处理
    public void test(Message message){
        System.out.println(new String(message.getBody()));
    }
}

在这里插入图片描述

修改pom文件,改变依赖

在这里插入图片描述
在这里插入图片描述

修改依赖改为web方式后会一直启动,这样监听器也会一直监听

在这里插入图片描述

启动后可以看到队列的消息即被监听到了。

在这里插入图片描述

测试监听效果

在这里插入图片描述

控制台不用重启可以监听到消息

在这里插入图片描述

若需要确保消息能够被消费者接收并处理,然后得到消费者的反馈,也可以,修改测试类中生产者测试代码:

@Test
void publisher() {
  	//会等待消费者消费然后返回响应结果
    Object res = template.convertSendAndReceive("amq.direct", "my-yyds", "Hello World!");
    System.out.println("收到消费者响应:"+res);
}

在这里插入图片描述

修改消费者监听代码

    @RabbitListener(queues = "yyds")   //定义此方法为队列yyds的监听器,一旦监听到新的消息,就会接受并处理
    public String test(String message){
        System.out.println(message);
        return "响应成功";
    }

在这里插入图片描述
重启application,然后运行生产者发送消息测试类.在这里插入图片描述

如果说需要直接接收一个JSON格式消息,并希望得到实体类。

在这里插入图片描述
在这里插入图片描述

创建用于JSON转换的Bean,这样就可以接收的JSON格式转化为实体类对象,发送的时候也是以JSON格式发送

在这里插入图片描述

消费者指定转换器,修改接收对象。然后重启application服务

在这里插入图片描述

在rabbitmq网页管理端发送json数据{"id":1,"name":"LB"}
在这里插入图片描述

接收成功

在这里插入图片描述

测试类发送实体类格式信息

在这里插入图片描述

接收到消息

在这里插入图片描述
这样就实现了Springboot操作rabbitmq消息队列。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1059780.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

ctfshow web入门 php特性 web113-web125

1.web113 和上题一样,/proc/self/root代表根目录&#xff0c;进行目录溢出&#xff0c;超过is_file能处理的最大长度就不认为是个文件了 payload: compress.zlib://flag.php /proc/self/root/proc/self/root/proc/self/root/proc/self/root/proc/self/root/p roc/self/root/p…

操作系统内存管理相关

1. 虚拟内存 1.1 什么是虚拟内存 虚拟内存是计算机系统内存管理的一种技术&#xff0c;我们可以手动设置自己电脑的虚拟内存。不要单纯认为虚拟内存只是“使用硬盘空间来扩展内存“的技术。虚拟内存的重要意义是它定义了一个连续的虚拟地址空间&#xff0c;并且 把内存扩展到硬…

SLAM从入门到精通(用python实现机器人运动控制)

【 声明&#xff1a;版权所有&#xff0c;欢迎转载&#xff0c;请勿用于商业用途。 联系信箱&#xff1a;feixiaoxing 163.com】 在ROS下面&#xff0c;开发的方法很多&#xff0c;可以是c&#xff0c;可以是python。大部分接口操作类的应用&#xff0c;其实都可以用python来开…

(五)激光线扫描-位移台标定

线激光属于主动测量方式,但是由于线激光的特性,我们只能通过提取激光中心线获取这一条线上的高度信息,那么要进行三维重建的话,就需要通过平移或者是旋转的方式,来让线激光扫描被测物体的完整轮廓,也就是整个表面。激光线的密度越高还原出来的物体越细腻,但由于数据量大…

计算机中丢失vcomp140.dll解决方案,可以使用这几个最新方法来修复

今天早上&#xff0c;当我打开电脑时&#xff0c;突然看到一个提示窗口&#xff0c;显示找不到 vcomp140.dll 文件。我一下子懵了&#xff0c;不知道这是怎么回事&#xff0c;也不知道如何解决这个问题。于是&#xff0c;我开始了寻找答案的旅程。 首先&#xff0c;我了解到 v…

<C++> 异常

C语言传统的处理错误的方式 传统的错误处理机制&#xff1a; 终止程序&#xff0c;如assert&#xff0c;缺陷&#xff1a;用户难以接受。如发生内存错误&#xff0c;除0错误时就会终止程序。返回错误码&#xff0c;缺陷&#xff1a;需要程序员自己去查找对应的错误。如系统的…

Autosar诊断实战系列21-UDS连续帧(CF)数据接收代码级分析

本文框架 前言1. 长帧数据的连续帧接收2. 连续帧的处理前言 在本系列笔者将结合工作中对诊断实战部分的应用经验进一步介绍常用UDS服务的进一步探讨及开发中注意事项, Dem/Dcm/CanTp/Fim模块配置开发及注意事项,诊断与BswM/NvM关联模块的应用开发及诊断capl测试脚本开发等诊…

计算机类毕业设计选题60套!太全了!快收藏!

&#x1f495;&#x1f495;作者&#xff1a;计算机源码社 &#x1f495;&#x1f495;个人简介&#xff1a;本人七年开发经验&#xff0c;擅长Java、Python、PHP、.NET、微信小程序、爬虫、大数据等&#xff0c;大家有这一块的问题可以一起交流&#xff01; &#x1f495;&…

基于MDK-Keil环境如何把STM32程序直接下载到SRAM运行

1. 前言 对于 Cortex-M 内核的微控制器&#xff0c;它们都可以支持在 RAM 中执行程序&#xff0c;有些非 ARM 的微控制器是不支持的。 在内部 SRAM 执行程序&#xff0c;有基于以下几方面的原因&#xff1a; 1、所使用的设备可能具有OTP&#xff08;One-time Programmable&a…

《CPU设计实战》第四章lab3记录找bug

修bug之路 1. debug_wb_pc 一个信号一个信号找下去&#xff0c;发现ID_stage.v中load_op未赋值 assign load_op inst_lw; 代码解释 module decoder_5_32(input [ 4:0] in,output [31:0] out ); //这个循环被命名为 gen_for_dec_5_32。 genvar i; generate for (i0; i<…

c++中的动态内存管理

目录 1.内存分布 2.c语言动态内存管理 3.c动态内存管理 4.operator new 与operator delete 函数 5.定位new 6.malloc/free 与 new/delete 的区别 1.内存分布 首先我们需要了解一下数据在内存中的分布&#xff0c;请看以下代码&#xff1a; int globalVar 1; static in…

Day 04 python学习笔记

Python数据容器 元组 元组的声明 变量名称&#xff08;元素1&#xff0c;元素2&#xff0c;元素3&#xff0c;元素4…….&#xff09; &#xff08;元素类型可以不同&#xff09; eg: tuple_01 ("hello", 1, 2,-20,[11,22,33]) print(type(tuple_01))结果&#x…

基于Java的4S店汽车商城系统设计与实现(源码+lw+部署文档+讲解等)

文章目录 前言具体实现截图论文参考详细视频演示为什么选择我自己的网站自己的小程序&#xff08;小蔡coding&#xff09;有保障的售后福利 代码参考源码获取 前言 &#x1f497;博主介绍&#xff1a;✌全网粉丝10W,CSDN特邀作者、博客专家、CSDN新星计划导师、全栈领域优质创作…

C++17中头文件filesystem的使用

C17引入了std::filesystem库(文件系统库, filesystem library)&#xff0c;相关类及函数的声明在头文件filesystem中&#xff0c;命名空间为std::filesystem。 1.path类&#xff1a;文件路径相关操作&#xff0c;如指定的路径是否存在等&#xff0c;其介绍参见&#xff1a;http…

STM32复习笔记(四):看门狗

目录 &#xff08;一&#xff09;简介 &#xff08;二&#xff09;IWDG IWDG的CUBEMX工程配置 IWDG相关函数&#xff08;非常少&#xff0c;所以直接贴上来&#xff09;&#xff1a; &#xff08;三&#xff09;WWDG &#xff08;一&#xff09;简介 看门狗分为独立看门…

【Java】微服务——Nacos注册中心

目录 1.Nacos快速入门1.1.服务注册到nacos1&#xff09;引入依赖2&#xff09;配置nacos地址3&#xff09;重启 2.服务分级存储模型2.1.给user-service配置集群2.2.同集群优先的负载均衡 3.权重配置4.环境隔离4.1.创建namespace4.2.给微服务配置namespace 5.Nacos与Eureka的区别…

[C++随想录] 优先级队列的模拟实现

优先级队列的模拟实现 底层结构初始化向下调整 && 向上调整push && poptop && empty && size源码 底层结构 namespace muyu {template <class T, class Continer std::vector<T>, class Compare less<T> >class priority_…

C#停车场管理系统

目录 一、绪论1.1内容简介及意义1.2开发工具及技术介绍 二、总体设计2.1系统总体架构2.2登录模块总体设计2.3主界面模块总体设计2.4停车证管理模块总体设计2.5停车位管理模块总体设计2.6员工管理模块总体设计2.7其他模块总体设计 三、详细设计3.1登录模块设计3.2主界面模块设计…

力扣:119. 杨辉三角 II(Python3)

题目&#xff1a; 给定一个非负索引 rowIndex&#xff0c;返回「杨辉三角」的第 rowIndex 行。 在「杨辉三角」中&#xff0c;每个数是它左上方和右上方的数的和。 来源&#xff1a;力扣&#xff08;LeetCode&#xff09; 链接&#xff1a;力扣&#xff08;LeetCode&#xff09…

【Linux】ping命令详解

目录 一、ping概述 二、Ping用法 三、ping参数详解 四、使用 五、Wireshark抓取ICMP请求应答消息 一、ping概述 ping 命令用于测试与目标主机之间的连接。它向目标主机发送一个ICMP&#xff08;Internet Control Message Protocol&#xff09;Internet控制报文协议回显请求…