MQ功能实现的具体代码:
import { v4 as uuidx } from 'uuid';
import emitter from '@/utils/mitt'
class Message {
// 过期时间,0表示马上就消费
exp: number;
// 消费标识,避免重复消费
tag : string;
// 消息体
body : any;
constructor( exp: number , body : any ) {
if (exp == 0) {
this.exp = 0;
}else {
this.exp = Date.now().valueOf() + exp;
}
this.body = body;
this.tag = uuidx();
}
}
export default class TsMQ {
static tsMQ : TsMQ;
repository : Map<string, any[]>;
/*
获取单列的MQ对象
*/
static getInstance() : TsMQ {
if(this.tsMQ == null) {
this.tsMQ = new TsMQ();
}
return this.tsMQ;
}
constructor() {
this.repository = new Map<string,any[]>();
// 把消息放入Mitt进行推送
setInterval(()=> {
Array.from(this.repository.keys()).forEach( key => {
let poll = this.repository.get(key) as any[];
if(poll.length > 0) {
poll.forEach( item => {
if (item.exp == 0 || item.exp <= Date.now().valueOf() - 100) {
emitter.emit(key,item.body);
let single : any[] = this.repository.get(key) as any[];
single = single.filter(dispose => {
return dispose.tag !== item.tag;
});
this.repository.set(key,single);
}
});
}
});
},100)
}
/*
* @describe 放消息入队列
* @param queue : string 队列名称
* @param exp : number 消息消费时间
* @param message : any 消息体
*/
pushMessage( queue : string , exp : number, message : any ) {
if(this.repository.has(queue)) {
let single : any[] = this.repository.get(queue) as any[];
single.push(new Message(exp,message));
this.repository.set(queue,single);
}else {
let temp = [];
temp.push(new Message(exp,message));
this.repository.set(queue,temp);
}
}
/*
* 直接获取消息,可以配合做本地缓存
* 就要去掉constructor的轮询推送
*/
takeMessage( queue : string ) : any {
let single : any[] = this.repository.get(queue) as any[];
if( single && single.length > 0) {
let message = single.shift();
this.repository.set(queue,single);
return message;
}else {
return '队列没有消息!';
}
}
}
提示:其中需要用到三方的uuid和mitt ,然后要消息持久化可以用到pinia 来让消息持久化,本案列没有采用持久化
使用方式:
<script setup lang="ts">
import TsMQ from '@/utils/TsMQ'
import emitter from '@/utils/mitt'
let tsMQ : TsMQ = TsMQ.getInstance();
//订阅消息
emitter.on("HelloWord",e => {
console.log(`收到消息:${JSON.stringify(e)}\n消息时间:${new Date().toLocaleString()}`)
})
//投送消息
function tsMQs() {
console.log(`M2投递时间:${new Date().toLocaleString()}`)
tsMQ.pushMessage("HelloWord",1000 * 20,{ name : 'M2', second:20 });
tsMQ.pushMessage("HelloWord",1000 * 3,{ name : 'M1', second:3 });
// MQ功能实现的具体代码,提示:其中需要用到三方的uuid和mitt,然后要消息持久化可以用到pinia来让消息持久化,本案列没有采用持久化
}
function takeMQs() {
console.log(tsMQ.takeMessage('1'));
}
</script>
<template>
<div id="app" style="display: flex;flex-direction: column;justify-content: center;height: 100%;width: 100%;margin: 0;padding: 0">
<span @click="tsMQs">MQ投送</span>
<span @click="takeMQs">MQ获取</span>
</div>
</template>
<style scoped>
</style>
效果:
总结:我们可以看到我们实现了这个功能 ,其中可以用来作为缓存使用,同理可写