RabbitMQ:work结构

news2025/1/17 3:16:50

> 只需要在消费者端,添加Qos能力以及更改为手动ack即可让消费者,根据自己的能力去消费指定的消息,而不是默认情况下由RabbitMQ平均分配了,生产者不变,正常发布消息到默认的exchange

>  消费者指定Qoa和手动ack

生产者

package com.qf.mq2302.work;

import com.qf.mq2302.utils.MQUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Send {
    public static final String QUEUE_NAME="work";

    public static void main(String[] args) throws Exception {

        //1.获取连接对象
        Connection conn = MQUtils.getConnection();

        //2. 创建一个channel对象,对于MQ的大部分操作,都定义在了channel对象上
        Channel channel = conn.createChannel();

        //3.声明了一个队列
        /**
         * queue – the name of the queue
         * durable – true代表创建的队列是持久化的(当mq重启后,该对立依然存在)
         * exclusive – 该队列是不是排他的 (该对立是否只能由当前创建该队列的连接使用)
         * autoDelete – 该队列是否可以被mq服务器自动删除
         * arguments – 队列的其他参数,可以为null
         */
     channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        String message = "Hello doubleasdasda!";

        //生产者如何发送消息,使用下面的方法即可
        /**
         * exchange – 交换机的名字 ,如果是空串,说明是把消息发给了默认交换机
         * routingKey – 路由的key,当发送消息给默认交换机时,routingkey代表队列的名字
         * other properties - 消息的其他属性,可以为null
         * body – 消息的内容,注意,要是有 字节数组
         */
        for (int i = 0; i < 21; i++) {
            channel.basicPublish("", QUEUE_NAME, null, (message+i).getBytes());
        }
        System.out.println(" [x] Sent '" + message + "'");

        //关闭资源
        channel.close();
        conn.close();
    }
}

消费者一

package com.qf.mq2302.work;

import com.qf.mq2302.utils.MQUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;

import java.io.IOException;

public class Recv {
   private  final  static  String QUEUE_NAME="work";

    public static void main(String[] args) throws Exception {
        //1.获取连接对象
        Connection conn = MQUtils.getConnection();

        //2. 创建一个channel对象,对于MQ的大部分操作,都定义在了channel对象上
        Channel channel = conn.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        //3.该消费者收到消息之后的处理逻辑,写在DeliverCallback对象中
        DeliverCallback deliverCallback =new DeliverCallback() {
            @Override
            public void handle(String consumerTag, Delivery message) throws IOException {

            //从Delivery对象中可以获取到生产者,发送的消息的字节数组
                byte[] body = message.getBody();
                String msg = new String(body, "utf-8");

                try {
                    Thread.sleep(400);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                //在这里写消费者的业务逻辑,例如,发送邮件
                System.out.println("消费者01:"+msg);


                //手动ack
                //从message对象中取
                long deliveryTag = message.getEnvelope().getDeliveryTag();
                /**
                 * 第一个参数:消息编号
                 * 第二个参数: false,代表只确认这一个消息
                 */
                channel.basicAck(deliveryTag,false);
            }
        };

        //设置该消费者,每次只能从mq中获取一条消息
        channel.basicQos(1);
        //4.让当前消费者开始消费(QUEUE_NAME)队列中的消息
        /**
          *把消费者的确认模式,设置为 手动 ack
         *
         */
      channel.basicConsume(QUEUE_NAME,false,deliverCallback,consumerTag -> {});





    }





}

消费者二

package com.qf.mq2302.work;

import com.qf.mq2302.utils.MQUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;

import java.io.IOException;

public class Recv02 {
   private  final  static  String QUEUE_NAME="work";

    public static void main(String[] args) throws Exception {
        //1.获取连接对象
        Connection conn = MQUtils.getConnection();

        //2. 创建一个channel对象,对于MQ的大部分操作,都定义在了channel对象上
        Channel channel = conn.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        //3.该消费者收到消息之后的处理逻辑,写在DeliverCallback对象中
        DeliverCallback deliverCallback =new DeliverCallback() {
            @Override
            public void handle(String consumerTag, Delivery message) throws IOException {

            //从Delivery对象中可以获取到生产者,发送的消息的字节数组
                byte[] body = message.getBody();
                String msg = new String(body, "utf-8");

                try {
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                //在这里写消费者的业务逻辑,例如,发送邮件
                System.out.println("消费者02:"+msg);


                long deliveryTag = message.getEnvelope().getDeliveryTag();
                channel.basicAck(deliveryTag,false);
            }
        };
        //注意:这个是可以存三个,而不是一次发三个
        channel.basicQos(3);
        //4.让当前消费者开始消费(QUEUE_NAME)队列中的消息
        /**
         * queue – the name of the queue
         * autoAck – true 代表当前消费者是不是自动确认模式。true代表自动确认。
         * deliverCallback – 当有消息发送给该消费者时,消费者如何处理消息的逻辑
         * cancelCallback – 当消费者被取消掉时,如果要执行代码,写到这里
         */
      channel.basicConsume(QUEUE_NAME,false,deliverCallback,consumerTag -> {});





    }





}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/983311.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

图床项目详解

文章目录 一、图床项目介绍二、图床项目架构三、图床功能实现3.1 注册功能3.2 登录功能3.3 用户文件列表3.4 上传文件3.5 上传文件之秒传3.6 获取共享文件列表或下载榜3.7 分享/ 删除文件/ 更新下载数3.8 取消分享/ 转存/ 更新下载计数3.9 图床分享图片 一、图床项目介绍 实现…

感应型静电消除器的组成和工作原理

感应型静电消除器是一种常用于消除物体表面静电的设备。它通过感测周围环境的静电电荷变化&#xff0c;并采取相应的措施来中和或消除这些电荷&#xff0c;以防止静电造成的问题。 感测型静电消除器通常由以下几个关键组件组成&#xff1a; 1. 静电感测器&#xff1a;用于检测…

CUDA相关知识科普

显卡 显卡&#xff08;Video card&#xff0c;Graphics card&#xff09;全称显示接口卡&#xff0c;又称显示适配器&#xff0c;是计算机最基本配置、最重要的配件之一。就像电脑联网需要网卡&#xff0c;主机里的数据要显示在屏幕上就需要显卡。因此&#xff0c;显卡是电脑进…

ChatGPT是如何辅助高效撰写论文及使用ChatGPT注意事项

ChatGPT发布近1年&#xff0c;各大高校对它的态度也发生了极大转变&#xff0c;今年3月发布ChatGPT禁令的牛剑等世界顶级名校也在近期解除了ChatGPT禁令&#xff0c;发布了生成式人工智能使用指南。 ChatGPT一定程度上可以解放科研人员的劳动力&#xff0c;与其直接禁止不如教…

【深入理解Linux内核锁】六、信号量

我的圈子: 高级工程师聚集地 我是董哥,高级嵌入式软件开发工程师,从事嵌入式Linux驱动开发和系统开发,曾就职于世界500强企业! 创作理念:专注分享高质量嵌入式文章,让大家读有所得! 文章目录 1、信号量介绍2、信号量的API3、API实现3.1 semaphore3.2 sema_init3.3 down…

口袋参谋:高流量权重标题,都是利用了这套工具玩法!

​近来无事&#xff0c;与几位电商大佬们一起喝茶聊天。在谈到提升宝贝流量最直接的方式&#xff0c;大家异口同声的说到&#xff1a;“搜索流量&#xff01;” 根据我近十年的电商经验&#xff0c;一个好的标题&#xff0c;不仅要契合宝贝核心关键词&#xff0c;同时也要契合…

网络技术十:交换机端口安全技术

交换机端口安全技术 802.1X 定义 起源于WLAN协议802.11&#xff0c;解决局域网终端的接入认证问题 认证方式 本地认证&#xff1a;由设备端内置本地服务器对客户端进行认证 远程集中认证&#xff1a;由远程的认证服务器对客户端进行认证 端口接入控制方式 基于端口认证…

配电房智能化系统

配电房智能化系统依托电易云-智慧电力物联网&#xff0c;综合利用现代先进技术&#xff0c;通过对配电房的监控、数据采集、自动控制和管理&#xff0c;实现对配电房的安全、可靠、高效、节能和环保监控的综合管理系统。 配电房智能化系统功能&#xff1a; 1.运行状态实时监测…

【C++漂流记】简易理解引用的基本语法和使用及其注意实现

引用是C中的一种数据类型&#xff0c;它允许我们使用一个已经存在的变量来创建一个新的名称或别名&#xff0c;以便可以通过这个别名来访问和修改原始变量的值。引用的本质是一个别名或者一个变量的别名。 文章目录 基本语法引用的注意事项引用做函数参数引用的本质常量引用 基…

04 卷积神经网络搭建

一、数据集 MNIST数据集是从NIST的两个手写数字数据集&#xff1a;Special Database 3 和Special Database 1中分别取出部分图像&#xff0c;并经过一些图像处理后得到的[参考]。 MNIST数据集共有70000张图像&#xff0c;其中训练集60000张&#xff0c;测试集10000张。所有图…

vue表格不显示列号123456

我在网上找了半天&#xff0c;都是如何添加列号123456的&#xff0c;没有找到不显示列号的参考&#xff0c;现在把这个解决了&#xff0c;特此记录一下。 没有加右边的就会显示&#xff0c;加上右边的就隐藏了

python+django协同过滤算法的音乐推荐系统研究vue

本系统提供给管理员对用户、音乐分类、歌手、热门歌曲等诸多功能进行管理。本系统对于用户输入的任何信息都进行了一定的验证&#xff0c;为管理员操作提高了效率&#xff0c;也使其数据安全性得到了保障。本音乐推荐研究以Django作为框架&#xff0c;B/S模式以及MySql作为后台…

QtCreator CMakeLists.txt添加模块(Modules)

find_package(QT NAMES Qt6 Qt5 REQUIRED COMPONENTS Widgets Sql) find_package(Qt${QT_VERSION_MAJOR} REQUIRED COMPONENTS Widgets Sql) target_link_libraries(HookeViscometer PRIVATE Qt${QT_VERSION_MAJOR}::Widgets Qt${QT_VERSION_MAJOR}::Sql) 蓝色部分为添加的Q…

13分钟聊聊并发包中常用同步组件并手写一个自定义同步组件

本篇文章通过AQS自己来实现一个同步组件&#xff0c;并从源码级别聊聊JUC并发包中的常用同步组件 本篇文章需要的前置知识就是AQS&#xff0c;阅读本篇文章大概需要13分钟 自定义同步组件 为了更容易理解其他同步组件&#xff0c;我们先来使用AQS自己来实现一个常用的可重入…

(数字图像处理MATLAB+Python)第十二章图像编码-第三、四节:有损编码和JPEG

文章目录 一&#xff1a;有损编码&#xff08;1&#xff09;预测编码A&#xff1a;概述B&#xff1a;DM编码C&#xff1a;最优预测器 &#xff08;2&#xff09;变换编码A&#xff1a;概述B&#xff1a;实现变换编码的主要问题 二&#xff1a;JPEG 一&#xff1a;有损编码 &am…

Kafka3.0.0版本——消费者(消费者总体工作流程图解)

一、消费者总体工作流程图解 角色划分&#xff1a;生产者、zookeeper、kafka集群、消费者、消费者组。如下图所示: 生产者发送消息给leader&#xff0c;followerr主动从leader同步数据&#xff0c;一个消费者可以消费某一个分区数据或者一个消费者可以消费多个分区数据。如下图…

9月6日上课内容 redis高可用

RDB 持久化 RDB持久化是指在指定的时间间隔内将内存中当前进程中的数据生成快照保存到硬盘(因此也称作快照持久化)&#xff0c;用二进制压缩存储&#xff0c;保存的文件后缀是rdb&#xff1b;当Redis重新启动时&#xff0c;可以读取快照文件恢复数据。 1. 触发条件 RDB持久化…

解锁前端Vue3宝藏级资料 Vue3全面解析 第二章 Vue3 基础语法指令

本章主要介绍vue3中的基础指令使用方式和一些开发技巧。分为基础指令&#xff0c;逻辑指令&#xff0c;列表指令&#xff0c;事件&#xff0c;MVVM数据绑定与监听。本章中所有代码例子都是在使用Vite 创建的 vue项目中来完成的。 基础语法指令 2.1 基础指令2.1.1 设置变量2.1.2…

记一次生产环境服务卡死排查记录

接现场运维报告某java服务CPU狂飙&#xff0c;服务处于卡死无响应状态 询问现场运维什么场景造成的&#xff0c;答复是偶发现象&#xff0c;没有规律&#xff0c;和请求高峰期并没有关系。 因为服务是负载均衡的&#xff08;A、B两台&#xff09;&#xff0c;临时处理让运维重…

【AIGC系列】Stable Diffusion 小白快速入门课程大纲

一、前言 本文是《Stable Diffusion 从入门到企业级应用实战》系列课程的前置学习引导部分&#xff0c;《Stable Diffusion新手完整学习地图课程》的课程大纲。该课程主要的培训对象是&#xff1a; 没有人工智能背景,想快速上手Stable Diffusion的初学者&#xff1b;想掌握St…