【梦辛工作室】java实现简易消息队列处理器 可分区 分区顺序消费MxMQ

news2024/9/26 5:25:08

大家好哇,又是我,梦辛工作室的灵,最近在巩固JUC并发包,突然想到如果自己的应用体量不大,但有需要消息队列来实现应用解耦和削峰来缓解服务器突增压力,比如抢票时,突然有比较用户同时抢票,就容易造成服务器同时连接数较多,拒绝其他用户的使用,就想着可以用消息队列来缓解,但是体量有不大,还没必要用MQ框架,那就直接自己写一个,这样,抢票请求来了就直接丢给队列处理器,然后再延迟查询处理结果,这样能减轻不少压力,老样子,先看下实现效果:
在这里插入图片描述
然后看下测试代码:

public class TestOptional {
    @Test
    public void doTestOptional(){

        MxMQ<Message> mxMQ = MxMQ.getInstance();

        /**
         * 添加分区 无消息一直阻塞
         */
        mxMQ.addPartion("test", new MQHandler<Message>() {
            @Override
            public void hand(Message message) {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                System.out.println(message.getMessage());
            }
        });
        /**
         * 添加分区 无消息且等待时长超过20秒自动移除该分区
         */
        mxMQ.addPartionAutoRemove("test2", new MQHandler<Message>() {
            @Override
            public void hand(Message message) {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                System.out.println(message.getMessage());
            }
        });

        for(int index = 0;index < 20;index++){
            int finalIndex = index;
            Message message = new Message("test_" + finalIndex);
            Message message2 = new Message("test2_" + finalIndex);
            try {
                mxMQ.sendMessage("test",message);
                mxMQ.sendMessage("test2",message2);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }

        while (true){}

    }
}

还可以自定义不同分区不同的处理器,逻辑自由定义,下面看下几个关键类:
MxMQRunnable:

package com.mx.mxmq;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

public class MxMQRunnable<T> implements Runnable{

    boolean isRun = false;
    ArrayBlockingQueue<T> arrayBlockingQueue = null;
    MQHandler<T> mqHandler = null;
    int state = 0;

    MxMQ.QueueEmpty queueEmpty = null;

    public void setQueueEmpty(MxMQ.QueueEmpty queueEmpty) {
        this.queueEmpty = queueEmpty;
    }

    public MxMQRunnable(MQHandler<T> mqHandler){
        isRun = true;
        arrayBlockingQueue = new ArrayBlockingQueue(50);
        this.mqHandler = mqHandler;
        state = MxMQ.STATE_WAIT;
    }

    public MxMQRunnable(int number,MQHandler<T> mqHandler){
        arrayBlockingQueue = new ArrayBlockingQueue(number);
        this.mqHandler = mqHandler;
        state = MxMQ.STATE_WAIT;
    }

    public void setState(int state) {
        this.state = state;
    }

    @Override
    public void run() {
        while (isRun){
            try {
                T t = null;
                if(state == MxMQ.STATE_WAIT){
                   t = arrayBlockingQueue.take();
                } else {
                   t = arrayBlockingQueue.poll(20,TimeUnit.SECONDS);
                   if(t == null){
                       close();
                       queueEmpty.empty(this);
                       break;
                   }
                }
                if(mqHandler != null){
                    mqHandler.hand(t);
                }
            } catch (Exception e) {
                 e.printStackTrace();
            }
        }
    }

    public boolean sendMessage(T t) throws InterruptedException {
        return arrayBlockingQueue.offer(t,20, TimeUnit.SECONDS);
    }

    public boolean removeMessage(T t){
        return arrayBlockingQueue.remove(t);
    }

    public void close(){
        isRun = false;
    }

}

MxMQ:

package com.mx.mxmq;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MxMQ<T> {

    public static final int STATE_WAIT = 0;
    public static final int STATE_REMOVE = 1;

    private MxMQ(){
        executors = Executors.newCachedThreadPool();
        partionRunMap = new ConcurrentHashMap<>();
    }

    public static MxMQ getInstance() {
        if(instance == null){
            synchronized (MxMQ.class){
                if(instance == null){
                    instance = new MxMQ();
                }
            }
        }
        return instance;
    }

    private static volatile MxMQ instance = null;

    private ConcurrentHashMap<String,MxMQRunnable<T>> partionRunMap = null;

    private ExecutorService executors =  null;

    /**
     * 添加分区
     * @param partion 分区
     * @param mxHandler 处理器
     * @return
     */
    public boolean addPartion(String partion,MQHandler<T> mxHandler){
        if(partionRunMap.get(partion) == null){
            MxMQRunnable<T> curMxMQRunnable = new MxMQRunnable<T>(mxHandler);
            partionRunMap.put(partion,curMxMQRunnable);
            executors.execute(curMxMQRunnable);
            System.out.println(partion+"被添加");
            return true;
        }
        return false;
    }

    /**
     * 当分区里面没有任务超过20秒后就会自动移除分区
     * @param partion 分区
     * @param mxHandler 处理器
     * @return
     */
    public boolean addPartionAutoRemove(String partion,MQHandler<T> mxHandler){
        if(partionRunMap.get(partion) == null){
            MxMQRunnable<T> curMxMQRunnable = new MxMQRunnable<T>(mxHandler);
            curMxMQRunnable.setState(STATE_REMOVE);
            curMxMQRunnable.setQueueEmpty(new QueueEmpty() {
                @Override
                public void empty(MxMQRunnable mxMQRunnable) {
                    removePartion(partion);
                }
            });
            partionRunMap.put(partion,curMxMQRunnable);
            executors.execute(curMxMQRunnable);
            System.out.println(partion+"被添加");
            return true;
        }
        return false;
    }

    public boolean removePartion(String partion){
        if(partionRunMap.get(partion) != null){
            MxMQRunnable<T> remove = partionRunMap.remove(partion);
            remove.close();
            System.out.println(partion+"被移除");
            return true;
        }
        return false;
    }

    public boolean sendMessage(String partion,T t) throws InterruptedException {
        MxMQRunnable<T> tMxMQRunnable = partionRunMap.get(partion);
        if(tMxMQRunnable != null){
            tMxMQRunnable.sendMessage(t);
            return true;
        }
        return false;
    }

    public boolean removeMessage(String partion,T t){
        MxMQRunnable<T> tMxMQRunnable = partionRunMap.get(partion);
        if(tMxMQRunnable != null){
            return tMxMQRunnable.removeMessage(t);
        }
        return false;
    }

    interface QueueEmpty{
        void empty(MxMQRunnable mxMQRunnable);
    }

}

MQHandler:

package com.mx.mxmq;

public interface MQHandler<T> {
    void hand(T t);
}

Message:

package com.mx.mxmq;

public class Message {
    String message;

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }

    public Message(String message){
        this.message = message;
    }
}

好了,收,大概就是这样子,主要应用场景为:需要轻量级的顺序队列消费 应用场景

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/712538.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

C++之GNU C的__attribute__((constructor))优先级使用(一百四十九)

简介&#xff1a; CSDN博客专家&#xff0c;专注Android/Linux系统&#xff0c;分享多mic语音方案、音视频、编解码等技术&#xff0c;与大家一起成长&#xff01; 优质专栏&#xff1a;Audio工程师进阶系列【原创干货持续更新中……】&#x1f680; 人生格言&#xff1a; 人生…

C++ 设计模式----“对象性能“模式

“对象性能”模式 面向对象很好地解决了“抽象”的问题&#xff0c;但是必不可免地要付出一定的代价。对于通常情况来讲&#xff0c;面向对象的成本大都可以忽略不计。但是某些情况&#xff0c;面向对象所带来的成本必须谨慎处理。 典型模式 Singleto Flyweighta Singlet…

51单片机笔记

51单片机笔记 一、编程区域 1.1 用户应用程序区&#xff08;AP区&#xff09; 是指用户自己编写的程序区 1.2 ISP监控程序区&#xff1a; ISP区是指芯片出厂时就已经固化在单片机内部的一段程序&#xff0c;STC单片机可以进行ISP串行下载程序&#xff0c;这就是因为芯片在出…

在Blender和Zbrush中创建激光指示器,新手硬表面建模码住!

大家好&#xff0c;今天云渲染小编给大家带来的分享是硬表面建模&#xff0c;CG艺术家Lyubov使用Blender和Zbrush创建激光指示器的幕后花絮。 介绍 我叫 Lyubov&#xff0c;来自俄罗斯圣彼得堡&#xff0c;是一名 3D 建模的初学者。虽然学习还不到一年&#xff0c;但是我对它…

etcd安装

ETCD安装 windows版本 下载 下载地址https://github.com/etcd-io/etcd/releases 安装 其实也不用安装&#xff0c;下载解压后&#xff0c;得到如下 选中etcd.exe&#xff0c;右键→属性→兼容性→以管理员身份运行此程序勾上&#xff0c;当然&#xff0c;每次运行时候右键…

kettle作业循环实现

kettle作业循环实现 使用kettle作业中的JavaScript实现作业循环&#xff0c;这里是固定循环10次 JavaScript2 parent_job.setVariable("max",10); parent_job.setVariable("count",1); true;检验字段的值 JavaScript var current parent_job.getVari…

MySQL事务+存储引擎

文章目录 MySQL事务存储引擎1 事务1.1 事务的概念1.2 事务的ACID特点1.3 导致问题1.4 事务控制语句1.4.1 查看修改隔离级别1.4.2 使用set设置控制事务1.4.3 查看事务自动提交功能 2 存储引擎2.1 存储格式2.2 常用存储引擎2.3 查看表使用的存储引擎2.4 修改存储引擎2.5 InnoDB行…

证券市场基本概念

证券市场基本概念 一、 指数分类1.1 什么是指数1.2 指数分类 二 、交易所及板块2.1 交易所及板块2.2 股票代码规则 三 、指数、ETF、股票的关系3.1 指数和股票的关系3.2 指数和指数ETF的关系3.3 ETF概念 四、 股票行业分类4.1 申万行业分类4.2 股票与申万行业分类的关系 五 、指…

idea - 插件之 codeium(安装篇)

idea - 插件之 codeium 插件官网地址&#xff1a;https://codeium.com/ Idea 版本&#xff1a;2021.3.2 关于插件作用就不多做介绍&#xff0c;接下来开始正文。 由于目前有很多博客文章没有对 Idea 安装进行详细的讲解和遇到问题的处理讲解&#xff0c;所以我经过踩坑后&am…

【C++学习】C++的动态内存管理 | new和delete的底层 | 初识模板

目录 1. C的动态内存管理 2. new和delete的底层 3. 定位new 4. new和malloc 的区别总结 5. 模板 写在最后&#xff1a; 1. C的动态内存管理 上一篇文章已经大致介绍完new和delete的用法&#xff0c; 以及C和C语言两种动态内存管理方式的区别&#xff0c;这里简单总结一…

【MMCV python安装指南】

MMCV python安装指南 MMCV 介绍安装教程1.系统环境2.python版本3.torch版本4.mmcv版本 安装示例 MMCV 介绍 mmcv 是用于计算机视觉研究的基础 Python 库&#xff0c;支持 MMLAB 中的许多研究项目&#xff0c;例如 MMDetection https://github.com/open-mmlab/mmdetection MMAct…

arc163 C 思维构造

题意&#xff1a;https://atcoder.jp/contests/arc163/tasks/arc163_c 思路&#xff1a;本题构造主要就是围绕 初始放入2&#xff0c;3&#xff0c;6&#xff0c;然后一直将一个拆解直到满足大小。 /*keep on going and never give up*/ #include<cstdio> #include<…

【择校】肠子悔青,录取分数断层,超过第二名44分,超过最后一名146分!

一、学校及专业介绍 中南民族大学&#xff08;South-Central Minzu University&#xff09;坐落于湖北省武汉市&#xff0c;中华人民共和国国家民族事务委员会直属高校&#xff0c;位列湖北省“国内一流大学建设高校”、“少数民族高层次骨干人才计划”资格高校、全国深化创新创…

15. python从入门到精通——Pygame游戏编程

目录 游戏的原理 安装Pygame Pygame常用模块 Pygame的基本使用 实例:制作一个跳跃的小球游戏&#xff0c;如果碰到窗口边缘会改变小球移动方向 实现步骤&#xff1a; 运行效果&#xff1a; 小球图片&#xff1a; python代码&#xff1a; 开发Flappy Bird游戏 …

生成古风少女图片【InsCode Stable Diffusion美图活动一期】

作者简介&#xff1a;一名云计算网络运维人员、每天分享网络与运维的技术与干货。 座右铭&#xff1a;低头赶路&#xff0c;敬事如仪 个人主页&#xff1a;网络豆的主页​​​​​ 目录 写在前面 Stable Diffusion 模型在线使用地址&#xff1a; 工具介绍 一.如何使用S…

基于Java的酒店管理系统设计与实现(源码+文档+报告+任务书)

本系统采用Java语言进行开发&#xff0c;实现了跨平台的部署。使用了MySQL数据库进行数据存储&#xff0c;保证了数据的稳定性与可靠性。通过使用Spring框架&#xff0c;实现了对各个模块的解耦&#xff0c;使得系统更易于维护与升级。前端使用了Node.jsVue以提供易用、美观的用…

spring复习:(4)AbstractEnvironment

该类中指定了激活profile属性的名称(spring.profiles.active)&#xff0c;默认profile属性的名称(spring.profiles.default)&#xff0c;以及默认的profile的名字(default)。

springboot项目集成nacos配置中心踩坑

前提 在使用nacos的配置中心功能&#xff0c;发现在application.yml中配置地址后仍然读取不到配置中心地址&#xff0c;配置项和值都是正确的。但就是读不到&#xff0c;现在来分析下 配置项 spring:application:name: test-servicemain:allow-bean-definition-overriding: …

Ubuntu 20.04 LTS x86_64 SPEC CPU 2006 cpu2006-1.2.iso 测试笔记

环境 安装依赖项 sudo apt install gfortran 挂载iso sudo mkdir /mnt/cpu2006 sudo mount cpu2006-1.2.iso /mnt/cpu2006 安装 cd /mnt/cpu2006 sh install.sh -d /home/speccpu/cpu2006 SPEC CPU2006 InstallationTop of the CPU2006 tree is /mnt/cpu2006Installing F…

【Vue3】学习笔记-toRef

作用 创建一个ref对象&#xff0c;其value值指向另一个对象中的某个属性。 语法 const nametoRef(person,‘name’) 应用 要将响应式对象中的某个属性单独提供给外部使用时 #扩展 toRefs与toRef功能一直&#xff0c;但可以批量创建多个ref对象,语法&#xff1a;toRefs(per…