消息队列 (9)-消费者核心类的实现

news2025/1/22 19:43:13

目录

  • 前言
  • 消费者类设计思路
    • 核心API
    • 总体代码

前言

我们上一篇博客,写了虚拟主机的实现, 在虚拟主机中需要用到俩个未实现的类,分别是验证绑定关键字和消费者类,接下来我们实现消费者类的核心代码

消费者类设计思路

在这个类中,首先我们要持有virtualHost对象来操作数据, 然后我们指定一个线程池负责具体的回调函数,通过一个扫描队列来不停的扫描所有的队列,看那个队列有新的消息,如果有就放到阻塞队列中去,消费者每次从阻塞队列中取出一个消息来响应。如果是多个消费者都订阅了一个消息,那么就使用轮询的方式来获取消息
在这里插入图片描述

核心API

属性

虚拟主机
线程池
阻塞 队列
扫描线程

方法
①往阻塞队列中添加消息

// 往阻塞队列中添加消息
    public void notifyConsume(String queueName) throws InterruptedException {
        tokenQueue.put(queueName);
    }

②订阅消息

我们的思路是,先找到对应的队列,然后去查看队列中是否有消息,如果有就要消费掉这些消息

 //添加订阅者
    public void addConsumer(String consumerTag, String queueName, boolean autoAck, Consumer consumer) throws MqException {
        // 1 找到对应的队列
        MSGQueue queue = virtuaHost.getMemoryDataCenter().getQueue(queueName);

        if (queue == null){
            throw new MqException("[ConsumerManager] 队列不存在! queueName=" + queueName);
        }
        ConsumEnv consumEnv = new ConsumEnv(consumerTag,queueName,autoAck,consumer);
        synchronized (queue){
            queue.addConsumEnv(consumEnv);
            // 如果当前队列中已经有了一些消息了, 需要立即就消费掉.
            int n = virtuaHost.getMemoryDataCenter().getMessageCount(queueName);
            for (int i = 0; i < n; i++) {
                // 这个方法调用一次就消费一条消息.
                consumeMessage(queue);
            }
        }
    }

③消费消息
关于消费消息,我们按照轮询的方式来依次消费

 // 消费消息
    private void consumeMessage(MSGQueue queue) {
        // 1. 按照轮询的方式, 找个消费者出来.
        ConsumEnv luckyDog = queue.chooseConsumEnv();
        if (luckyDog == null){
            // 说明没有消费者
            return;
        }
        // 2. 从队列中取出一个消息
        Message message = virtuaHost.getMemoryDataCenter().pollMessage(queue.getName());
        if (message == null){
            // 说明没有消息,不能消费
            return;
        }
        // 3. 把消息带入到消费者的回调方法中, 丢给线程池执行.
        workPool.submit(()->{
            try {
                //1,将消息放到待确认的集合中, 这个操作在回调函数之前
                virtuaHost.getMemoryDataCenter().addMessageWaitAck(queue.getName(),message);
                //2. 执行回调函数
                luckyDog.getConsumer().handleDelivery(luckyDog.getConsumerTag(),message.getBasicProperties(),message.getBody());
                System.out.println("[ConsumerManager] 消息被成功消费, queueName = "+queue.getName());
                //3. 如果是自动应答, 就可以之间删除消息
                // 如果是手动应答,  就先什么也不做
                if (luckyDog.isAutoAck()){
                    // 1删除硬盘上的消息
                    if (message.getDeliverMode() == 2){
                        virtuaHost.getDiskDataCenter().deleteMessage(queue,message);
                    }
                    //2 删除待确认的消息
                    virtuaHost.getMemoryDataCenter().removeMessageWaitAck(queue.getName(), message.getMessageId());
                    // 3 删除内存中的消息
                    virtuaHost.getMemoryDataCenter().removeMessage(message.getMessageId());
                    System.out.println("[ConsumerManager] 消息被成功消费! queueName=" + queue.getName());
                }
            } catch (IOException | ClassNotFoundException | MqException e) {
                e.printStackTrace();
            }
        });
    }

总体代码

package com.example.demo.mqServer.core;

import com.example.demo.Common.ConsumEnv;
import com.example.demo.Common.Consumer;
import com.example.demo.Common.MqException;
import com.example.demo.mqServer.VirtuaHost;

import java.io.IOException;
import java.util.concurrent.*;

/*
* 通过这个类, 来实现来实现消费者消费消息的核心功能
* */
public class ConsumerManager {
    // 持有上层对象 VirtualHost 调用 ,来操作数据
    private VirtuaHost virtuaHost;
    // 指定一个线程池, 负责执行具体的回调函数
    private ExecutorService workPool = Executors.newFixedThreadPool(4);
    //  存放令牌的队列  - 阻塞队列
    private BlockingDeque<String> tokenQueue = new LinkedBlockingDeque<>();
    //  扫描线程
    private Thread scannerThread = null;
    //
    public ConsumerManager(VirtuaHost virtuaHost) {
        this.virtuaHost = virtuaHost;
        scannerThread =new Thread(()->{
            while (true){
                try {
                    String queueName = tokenQueue.take();
                    MSGQueue queue = virtuaHost.getMemoryDataCenter().getQueue(queueName);
                    if (queue == null){
                        throw new MqException("[ConsumerManager] 取令牌后发现, 该队列名不存在! queueName=" + queueName);
                    }
                    synchronized (queue){
                        consumeMessage(queue);
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (MqException e) {
                    e.printStackTrace();
                }
            }
        });
        scannerThread.setDaemon(true);
        scannerThread.start();
    }
    // 往阻塞队列中添加消息
    public void notifyConsume(String queueName) throws InterruptedException {
        tokenQueue.put(queueName);
    }
    // 增加订阅
    public void addConsumer(String consumerTag, String queueName, boolean autoAck, Consumer consumer) throws MqException {
       // 先找到对应的队列
        MSGQueue queue = virtuaHost.getMemoryDataCenter().getQueue(queueName);
        if (queue == null){
            throw new MqException("[ConsumerManager] 队列不存在! queueName=" + queueName);
        }
        ConsumEnv consumEnv = new ConsumEnv(consumerTag,queueName,autoAck,consumer);
        synchronized (queue){
            queue.addConsumEnv(consumEnv);
            // 如果当前队列中已经有了一些消息了, 需要立即就消费掉.
            int n = virtuaHost.getMemoryDataCenter().getMessageCount(queueName);
            for (int i = 0; i < n; i++) {
                // 这个方法调用一次就消费一条消息.
                consumeMessage(queue);
            }
        }
    }
    private void consumeMessage(MSGQueue queue) {
        // 1. 按照轮询的方式, 找个消费者出来.
        ConsumEnv luckyDog = queue.chooseConsumEnv();
        if (luckyDog == null){
            // 说明没有消费者
            return;
        }
        // 2. 从队列中取出一个消息
        Message message = virtuaHost.getMemoryDataCenter().pollMessage(queue.getName());
        if (message == null){
            // 说明没有消息,不能消费
            return;
        }
        // 3. 把消息带入到消费者的回调方法中, 丢给线程池执行.
        workPool.submit(()->{
            try {
                //1,将消息放到待确认的集合中, 这个操作在回调函数之前
                virtuaHost.getMemoryDataCenter().addMessageWaitAck(queue.getName(),message);
                //2. 执行回调函数
                luckyDog.getConsumer().handleDelivery(luckyDog.getConsumerTag(),message.getBasicProperties(),message.getBody());
                System.out.println("[ConsumerManager] 消息被成功消费, queueName = "+queue.getName());
                //3. 如果是自动应答, 就可以之间删除消息
                // 如果是手动应答,  就先什么也不做
                if (luckyDog.isAutoAck()){
                    // 1删除硬盘上的消息
                    if (message.getDeliverMode() == 2){
                        virtuaHost.getDiskDataCenter().deleteMessage(queue,message);
                    }
                    //2 删除待确认的消息
                    virtuaHost.getMemoryDataCenter().removeMessageWaitAck(queue.getName(), message.getMessageId());
                    // 3 删除内存中的消息
                    virtuaHost.getMemoryDataCenter().removeMessage(message.getMessageId());
                    System.out.println("[ConsumerManager] 消息被成功消费! queueName=" + queue.getName());
                }
            } catch (IOException | ClassNotFoundException | MqException e) {
                e.printStackTrace();
            }
        });
    }


}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/852408.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

第十六次CCF计算机软件能力认证

第一题&#xff1a;小中大 在数据分析中&#xff0c;最小值最大值以及中位数是常用的统计信息。 老师给了你 n 个整数组成的测量数据&#xff0c;保证有序&#xff08;可能为升序或降序)&#xff0c;可能存在重复的数据。 请统计出这组测量数据中的最大值、中位数以及最小值&am…

【数据结构】‘双向链表’冲冲冲

&#x1f490; &#x1f338; &#x1f337; &#x1f340; &#x1f339; &#x1f33b; &#x1f33a; &#x1f341; &#x1f343; &#x1f342; &#x1f33f; &#x1f344;&#x1f35d; &#x1f35b; &#x1f364; &#x1f4c3;个人主页 &#xff1a;阿然成长日记 …

【雕爷学编程】Arduino动手做(199)---8x32位WS2812B全彩屏模块6

37款传感器与模块的提法&#xff0c;在网络上广泛流传&#xff0c;其实Arduino能够兼容的传感器模块肯定是不止37种的。鉴于本人手头积累了一些传感器和执行器模块&#xff0c;依照实践出真知&#xff08;一定要动手做&#xff09;的理念&#xff0c;以学习和交流为目的&#x…

git 报错 protocol ‘https‘ is not supported解决

报错原因&#xff1a;选择不了其他分支代码&#xff0c;甚至都看不到其他分支&#xff0c;我这边解决了两次报错&#xff0c;情况如下&#xff1a; 第一种报错&#xff1a; idea中刷新分支报错如下&#xff1a; Fetch Failed protocol https is not supported 话不多说&#…

echats词云无法显示空白问题解决

<script src"https://cdn.jsdelivr.net/npm/echarts5.4.3/dist/echarts.min.js"></script> <script src"https://oisanjavax.github.io/echarts-wordcloud/dist/echarts-wordcloud.min.js"></script>展示为空 原因为导入版本过高不…

如何共享笔记本电脑网络

为了让手机连接笔记本网络&#xff0c;我们通常通过在笔记本上安装诸如WiFi共享精灵来实现&#xff0c;其实没有那么麻烦&#xff1a; 1、在电脑上打开“设置”—>选择“网络和Internet”—>选择“移动热点”&#xff08;如果系统是Windows 7或更低版本&#xff0c;则需要…

Ansible从入门到精通【六】

大家好&#xff0c;我是早九晚十二&#xff0c;目前是做运维相关的工作。写博客是为了积累&#xff0c;希望大家一起进步&#xff01; 我的主页&#xff1a;早九晚十二 专栏名称&#xff1a;Ansible从入门到精通 立志成为ansible大佬 ansible templates 模板&#xff08;templa…

机器学习复习题

1 单选题 ID3算法、C4.5算法、CART算法都是&#xff08; &#xff09;研究方向的算法。 A . 决策树 B. 随机森林 C. 人工神经网络 D. 贝叶斯学习 参考答案&#xff1a;A &#xff08; &#xff09;作为机器学习重要算法之一&#xff0c;是一种利用多个树分类器进行分类和预测…

【交换排序】冒泡排序 与 快速排序

交换排序基本思想&#xff1a; 所谓交换&#xff0c;就是根据序列中两个记录键值的比较结果来对换这两个记录在序列中的位置&#xff0c;交换排序的特点是&#xff1a;将键值较大的记录向序列的尾部移动&#xff0c;键值较小的记录向序列的前部移动。 目录 1.冒泡排序 2.快…

Day 76:通用BP神经网络 (3. 综合测试)

1 代码&#xff1a; package dl;import java.util.Arrays;/*** Full ANN with a number of layers.** author Fan Min minfanphd163.com.*/ public class FullAnn extends GeneralAnn {/*** The layers.*/AnnLayer[] layers;/*********************** The first constructor.*…

Python爬虫——selenium的安装和基本使用

1.什么是selenium&#xff1f; selenium是一个用于web应用程序测试的工具selenium测试直接运行在浏览器中&#xff0c;就像真正的用户在操作一样支持通过各种driver&#xff08;FrifoxDriver&#xff0c;ItenrentExploreDriver&#xff0c;OperaDriver&#xff0c;ChromeDrive…

【linux开发基础知识】

基础--图形界面 基础--终端 基础-用户/组 基础-目录 基础-文件 Shell-基础 Shell-参数 Shell-if Shell-while/for Shell-until/case Shell-函数 Shell-正则表达式 Shell-常用命令 开发-svn 开发-编辑(vim) 开发-编译 开发-调试 开发-部署 建议 ● http://linux.vbir…

海外应用商店关键词优化之如何提高应用可见度

应用商店关键词优化是确保用户可以在Google Play和Apple应用商店中找到我们应用的重要一步。我们需要选择正确的关键词&#xff0c;将它们放在正确的位置&#xff0c;并合并一系列不同的关键词&#xff0c;同时确保拥有良好的转化率。 1、了解当前的元数据是关键字选择的第一步…

selenium环境搭建

文章目录 1、下载谷歌浏览器2、下载谷歌驱动 1、下载谷歌浏览器 浏览器下载完成后&#xff0c;在任务管理器中禁止浏览器的自动更新。因为驱动版本必须和浏览器一致&#xff0c;如果浏览器更新了&#xff0c;驱动就用不起了。 2、下载谷歌驱动 谷歌驱动需要和谷歌浏览器版本…

Eigen在QT中的配置

Eigen简介 Eigen支持包括固定大小、任意大小的所有矩阵操作&#xff0c;甚至是稀疏矩阵&#xff1b;支持所有标准的数值类型&#xff0c;并且可以扩展为自定义的数值类型&#xff1b;支持多种矩阵分解及其几何特征的求解&#xff1b;它不支持的模块生态系统提供了许多专门的功能…

浏览器自动访问打开网址的软件小工具模拟测试

用微软框架写了个浏览器自动访问和打开网址的工具&#xff0c;进行测试模拟&#xff1a; 1、获取链接方式&#xff0c;可通过API接口返回JSON链接格式&#xff0c;也可以集成到文档手动录入链接由软件进行循环运行。 2、配置一些参数&#xff1a;数量、次数、时间间隔等 看下演…

常用dbGet命令

我正在「拾陆楼」和朋友们讨论有趣的话题&#xff0c;你⼀起来吧&#xff1f;拾陆楼知识星球入口 Examples of dbGet Command 1. Find the top name of the design dbGet top.name 2. Get all the attributes of a selected object dbGet selected.?? If you press tab key…

华为OD机试真题 Java 实现【判断字符串子序列】【2023 B卷 100分】,倒序遍历

目录 专栏导读一、题目描述二、输入描述三、输出描述四、解题思路五、Java算法源码六、效果展示1、输入2、输出3、说明 华为OD机试 2023B卷题库疯狂收录中&#xff0c;刷题点这里 专栏导读 本专栏收录于《华为OD机试&#xff08;JAVA&#xff09;真题&#xff08;A卷B卷&#…

智汇云舟入选IDC《中国智慧城市数字孪生技术评估,2023》报告

8月7日&#xff0c;国际数据公司&#xff08;IDC&#xff09;发布了《中国智慧城市数字孪生技术评估&#xff0c;2023》报告。智汇云舟凭借在数字孪生领域的创新技术与产品&#xff0c;入选《2023中国数字孪生城市技术提供商图谱》。 报告通过公开征集的形式进行申报&am…

PCkit3如何刷固件

PCkit3如何刷固件 一般在MAPLAB安装时&#xff0c;在安装路径下面都会自带所有烧写器的固件包&#xff0c;找到对应的固件包利用MPLAB进行刷新就行了&#xff0c;具体步骤如下&#xff1a; 首先打开MPLAB软件&#xff0c;然后Programmer->Settings…然后点击configuration …