技术学习-消息队列

news2025/2/28 20:15:38

什么是消息队列

可以简单理解为存放消息的队列,数据结构模型和队列一样,都是先进先出。主要用不同线程(Thread)/进程(Process)

为什么需要消息队列

(1)不同进程之间传递消息是,因为进程的耦合度高,改动一个进程,引发必须修改另一个进程。为了隔离两个进程,因此需要在进程间抽离出一层(一个模块),所有两进程之间传递的消息,都必须通过消息队列来传递,不会影响另一个。
(2)在两个不停运行的线程中,一个线程对另一个线程,进行通信,往往容易因为消息量大,而导致另一个线程处理不过来,导致请求丢失或是请求顺序错乱。当出现多生产多消费者的情况时,这种消息错乱的问题会更加明显。因此,我们需要一个消息队列维护消息的规范。

消息队列的应用场景

(1)上游不关心下游处理结果的场景
这一类场景就是类似请求生产者-消费者的场景。这种场景往往就是生产者(只需要生产,将消息给到消费者,打印机拿到消息后只需要消费,并且生产者不需要等待回应结果。举例说,A线程是负责对数据进行计算分析,B线程是负责对A分析的结果进行打印输出的,A+B线程完成一个数据计算分析后,打印输出的任务。A完成一次计算分析任务是不需要去等待B的回应,A只管计算分析,B只管拿到A的结果输出。这种情况下去使用消息队列是比较合适的,A只需要把分析结果写入消息队列,B只需要拿消息队列里面的消息进行打印,这样两者是互不影响的,也能解决并发引起的顺序和丢失问题。
值得注意的是,消息队列传递的是线程的输入输出数据,而且上游线程不需要关注输出数据。

(2)基于数据驱动的场景
这两类任务往往是多个线程配合完成一个工作,而且每一个线程的处理时长都比较久,外部传进来一个data数据包,A线程进行操作1,B线程要等待A线程处理后的data数据包完成操作2,C线程也同样需要B线程的处理结果进行操作3。
这种场景最常用的解决方案,往往是根据经验统计每一个任务的执行时间,然后人工定制一个排班时间表,然后通过时间表执行,这样往往会出现执行时长变化导致的任务无法正常执行
这时候消息队列主要的任务不是传输输入输出数据,传输的是信号,是通知下游线程执行的信号。同样这种场景也是和场景(1)一样,生产者不需要等待回应结果。那为什么不用场景(1)那样去直接用消息队列去传输处理完的data包呢?根据不同的业务可能会有以下原因:a,data数据包体积大,用在消息队列上面传输影响存储性能,这种情况往往是data数据包在一个地方因为某些原因不方便转移,而ABC线程是分别取到那里进行处理的。

(3)上游关注下游执行结果,但是执行时间很长。
有时候上游需要关注执行结果,但是执行结果时间很长(典型的就是调用离线处理,或是跨公网调用),也常用回调网关+MQ来解耦。
例如:微信支付,跨公网调用微信接口,执行时间会比较长,但调用方式又非常关注执行结果

这时候的解决方案流程就是
1)调用方直接跨公网调用微信接口
2)微信返回调用成功,此时不代表返回成功
3)微信执行完成后,回调统一网关
4)网关将返回结果通知MQ
5)请求方收到通知结果

这里需要注意的是,不应该由回调网关调用上游来通知结果,如果是这样的话,每次新增调用方,回调网关都需要修改代码,仍然会反向依赖,使用回调网关+MQ的方案,新增任何对微信支付的调用,都不需要修改代码,因为微信支付直接把调用接口的行为结果返回给你,告诉调用者已经成功调用了支付功能,但是微信后台可能也存在一个消息队列去接收这些支出请求去处理,处理完成了才会通知网关回调通知,至于支付有没有真正成功,就由网关这边进行进行MQ通知。所以这种场景,上游得到的执行结果也不是真正的执行结果,只是通知主线程可以进行下一个异步处理的通知,因为也会有调用支付成功的返回,但是又因为内部系统原因,没有支付成功的情况。所以,消息队列并不适合上游需要实时处理结果的场景

总结:
什么时候使用MQ?
1)数据驱动的任务依赖
2)上游不关心多下游执行结果
3)异步返回执行时间长

其他思考

MQ的缺点

(1)系统复杂程度提升,逻辑变得复杂。同时引入MQ组件,将会对原来的系统设计,通信方式产生大变化。同时系统的改造需要花费大量的资源,同时也要承担引入新组件带来的系统不稳定性。
(2)系统可用性降低,引入mq组件后,通信模型将会发生改变,消息的传递将会依赖于MQ,假如MQ挂了,将会直接导致系统崩溃,这应该怎么处理
(3)一致性问题。一个任务需要ABCD四个系统轮流去处理,A 系统处理完了直接返回成功了,大家都以为你这个请求就成功了;但是问题是,要是 BCD 三个系统那里,BD 两个系统写库成功了,结果 C 系统写库失败了,咋整?这样就会导致数据不一致。

流量整形,削峰填谷

(1)为什么要流量整形?
流量冲击(高并发情况下带来的突发流量):上游调用方(push)不限速。很快就会把下游压垮,这种场景往往发生在上游逻辑容易突然暴增,而同时下游的操作非常多,完成一次需要较多的时间。例如:上游发起下单操作,下游完成秒杀业务逻辑(库存检查,库存加锁,余额检查,订单生成,余额扣减,库存扣减,生成流水,余额解锁,库存解锁),上游业务简单,每秒发生10000个请求,下游业务复杂,每秒只能处理2000个请求,上游不限速下单,会导致下游系统处理不了庞大信息量,引发雪崩。
(2)常见的优化方案
a.上游队列缓冲(put阻塞),限速发送
b.下游队列缓冲(定时或者批量拉取pull,可以起到削平流量),限速执行

ps:如果上流发送流量过大,mq提供拉模式可以起到下游自我保护的作用,会不会引起mq队列的消息堆积呢?
答:下游MQ-client拉取消息,消息接收方能够批量获取消息,需要下游消息接收方进行优化(提供批处理,比如批量写),否则整体吞吐量低,也会导致mq堆积。

高并发系统保护策略

1.缓存
缓存不单单能够提升系统访问速度,也是保护数据库,保护系统的有效方式。大型网站一般主要是”读“,数据先进数据库,然后再走缓存。在大型”写“系统中,先走缓存,在走数据库,对数据库进行批处理操作。(积累一些数据,批量写入;内存里面的缓存队列,mq像是缓存队列)
2.降级
根据服务器压力,指定某些服务或者页面的级别(需求不同,降级策略也不同),为此释放服务器资源,保证核心任务的正常运行
根据服务方式:可以拒接服务,可以延迟服务,也有时候可以随机服务。
根据服务范围:可以砍掉某个功能,可以砍掉某些模块
如果不是核心链路,那么就把这个服务降级掉。打个比喻,现在的APP都讲究千人千面,拿到数据后,做个性化排序展示,如果在大流量下,这个排序就可以降级掉!
3.限流
限制系统的输入和输出流量以达到保护系统的目的。一般来说系统的吞吐量是可以被测算的,一旦达到阈值,就需要限制流量。比如:延迟处理,拒绝处理,部分处理等等

实际场景中常用的限流策略:
(1)nginx前端限流,
按照一定的规则如ip,账号,调用逻辑等在nginx层面做限流
(2)业务应用系统限流
(a)客户端限流(验证码;获取动态请求路径pathvariable,达到接口地址隐藏的效果)
(b)服务端限流(redis限速器,延迟队列)
(3)数据库限流
数据库连接池化,mysql(如max_connections),redis(如tcp-backlog)都会有类似的限制连接数的配置

限流算法

(1)计算器

计算器是一种比较简单的限流算法,用途比较广泛,在接口层面,很多地方使用这种方式限流。在一段时间内,进行计数,与阈值进行比较,到了时间临界点,将计数清0.

示例代码:

public class CustomerDemo {
    private static long timeStamp = System.currentTimeMillis();
    //限制为1s内 限制100请求
    private static long limitCount = 100;
    private static long interval = 1000;

    //请求数
    private static long reqCount = 0;

    public static boolean grant(){ //判断是否需要限流
        long now = System.currentTimeMillis();
        if (now < timeStamp+interval){ //当前时间在1s内
            if (reqCount < limitCount){//当前请求的数量不超过最大限制数,允许请求
                ++reqCount;
                return true;
            }else{//否则限流
                return false;
            }
        }else{//超过了当前时间,计数器清0 
            timeStamp = System.currentTimeMillis();
            reqCount = 0;
            return  false;
        }
    }

    public static void main(String[] args) {
        for (int i =0;i<500;i++){//模拟500个订单同时请求
            new Thread(new Runnable() {
                @Override
                public void run() {
                    if(grant()){
                        System.out.println("执行业务逻辑");
                    }else{
                        System.out.println("限流");
                    }
                }
            }).start();
        }
    }
}

这里需要注意的是,存在一个时间临界点缺陷的问题。举个栗子,在12:01:00到12:01:58这段时间内没有用户请求,然后在12:01:59这一瞬时发出100个请求,OK,然后在12:02:00这一瞬时又发出了100个请求。这里你应该能感受到,在这个临界点可能会承受恶意用户的大量请求,甚至超出系统预期的承受。

滑动窗口

由于计数器存在临界点缺陷,后来出现了 滑动窗口算法来解决
滑动窗口的意思是说把固定时间片,进行划分,并且随着时间的流逝,进行移动,这样就巧妙的避开了计数器的临界点问题。也就是说这些固定数量的可以移动的格子,将会进行计数判断阈值,因此格子的数量影响这滑动窗口算法的精度。
在这里插入图片描述

漏桶算法

虽然滑动窗口有效避免了时间临界点的问题,但是依然有时间片的概念,而漏桶算法在这方面比滑动窗口而言更加先进。
思想:有一个固定的桶,进水的速率是不确定的,但是出水的速率是恒定的,当水满的时候会溢出。

代码实现

public class LeakBucketDemo {
    //时间刻度
    private static long time = System.currentTimeMillis();
    //桶里面现在的水
    private static int water = 0;
    //桶的大小
    private static int size = 10;
    //出水率
    private static int rate = 3;

    public static boolean grant(){
        //计算出水数量
        long now = System.currentTimeMillis();
        int out = (int)((now - time)/700*rate);//出水量
        //漏水后的剩余
        water = Math.max(0,water-out);//避免剩余水量为负数
        time = now;
        if((water+1)<size){
            ++water;
            return true;
        }else{
            return  false;
        }
    }

    public static void main(String[] args) {
        for (int i =0;i<500;i++){ //模拟500个订单同时请求
            new Thread(new Runnable() {
                @Override
                public void run() {
                    if(grant()){
                        System.out.println("执行业务逻辑");
                    }else{
                        System.out.println("限流");
                    }
                }
            }).start();
        }
    }
}

令牌桶算法

注意到,漏桶的出水速度是恒定的,那么意味着如果瞬时大流量的话,将有大部分请求被丢弃掉(也就是所谓的溢出)。为了解决这个问题,令牌桶进行了算法的改进
思想:令牌桶算法和漏桶算法不同的是,令牌桶是将以恒定的速度生成令牌放入到桶中,然后让请求过来了,拿到了令牌就请求,否则就丢弃;而漏桶是把请求放入漏桶,恒定速度去处理。这种方式可以避免了瞬间大流量而丢掉大量的请求。

在这里插入图片描述

生成令牌的速度是恒定的,而请求去拿令牌是没有速度限制的。这意味,面对瞬时大流量,该算法可以在短时间内请求拿到大量令牌,而且拿令牌的过程并不是消耗很大的事情。(有一点生产令牌,消费令牌的意味) 不论是对于令牌桶拿不到令牌被拒绝,还是漏桶的水满了溢出,都是为了保证大部分流量的正常使用,而牺牲掉了少部分流量,这是合理的,如果因为极少部分流量需要保证的话,那么就可能导致系统达到极限而挂掉,得不偿失。

代码实现

public class TokenBucketDemo {
    private static long time = System.currentTimeMillis();
    private static int createTokenRate = 3;
    private static  int size = 10;

    //当前令牌数量
    private static int token = 0;

    public static boolean grant(){
        long now  = System.currentTimeMillis();
        //当前时间需要生产令牌数
        int in = (int)((now-time)/50*createTokenRate);
        token = Math.min(size,token+in);
        time = now;
        if (token>0){
            --token;
            return true;
        }else{
            return false;
        }
    }

    public static void main(String[] args) {
        for (int i =0;i<500;i++){ //模拟500个订单同时请求
            new Thread(new Runnable() {
                @Override
                public void run() {
                    if(grant()){
                        System.out.println("执行业务逻辑");
                    }else{
                        System.out.println("限流");
                    }
                }
            }).start();
        }
    }
}

常见消息队列对比

在这里插入图片描述

C++实现简单消息队列

在目前的工作中也用到多线程技术去完成一些相关的业务,其中利用消息队列在多线程之间通信中起到了非常重要的作用。因为我现在工作主要是用c++,所以下面附上自己在工作中要到c++简单实现的消息队列代码
MessageQueue.h

#ifndef MESSAGEQUEUE_H
#define MESSAGEQUEUE_H

#include <queue>
#include <map>
#include <string>
#include <pthread.h>

#define MSG_QUIT                    0

using namespace std;

typedef struct {
	int code;
	void* data;
} MSG;

typedef struct {
	pthread_mutex_t qmutex;
	pthread_cond_t qready;
} CondMutex;

typedef struct {
	int id;
	string strJson;
} CONFIG_PACKET;

class MessageQueue
{
public:
	MessageQueue();
	virtual ~MessageQueue();

	bool push(int code, void* data, int max_msg = 100);
	bool pop(MSG** pmsg);
	void wait(void);
	void wait(MSG** pmsg);

protected:
	CondMutex m_mutex;
	queue<MSG*> m_msgs;
	map<int, int> msg_cnt;
};

#endif // MESSAGEQUEUE_H

MessageQueue.cpp

#include "MessageQueue.h"

MessageQueue::MessageQueue()
{
	pthread_mutex_init(&m_mutex.qmutex, NULL);
	pthread_cond_init(&m_mutex.qready, NULL);
}

MessageQueue::~MessageQueue()
{
	pthread_mutex_destroy(&m_mutex.qmutex);
	pthread_cond_destroy(&m_mutex.qready);
}

bool MessageQueue::push(int code, void* data, int max_msg)
{
	bool bSuccess = false;
	pthread_mutex_lock(&m_mutex.qmutex);
	int& cnt = msg_cnt[code];
	if (cnt < max_msg) {
		MSG* pmsg = new MSG;
		pmsg->code = code;
		pmsg->data = data;
		m_msgs.push(pmsg);
		cnt++;
		pthread_cond_signal(&m_mutex.qready);
		bSuccess = true;
	}
	pthread_mutex_unlock(&m_mutex.qmutex);
	return bSuccess;
}

bool MessageQueue::pop(MSG** pmsg)
{
	*pmsg = NULL;
	pthread_mutex_lock(&m_mutex.qmutex);

	if (!m_msgs.empty()) {
		*pmsg = m_msgs.front();
		msg_cnt[(*pmsg)->code]--;
		m_msgs.pop();
	}
	pthread_mutex_unlock(&m_mutex.qmutex);
	return *pmsg != NULL;
}

void MessageQueue::wait(void)
{
	pthread_mutex_lock(&m_mutex.qmutex);

	while (m_msgs.empty()) {
		pthread_cond_wait(&m_mutex.qready, &m_mutex.qmutex);
	}
	pthread_mutex_unlock(&m_mutex.qmutex);
}

void MessageQueue::wait(MSG** pmsg)
{
	*pmsg = NULL;
	pthread_mutex_lock(&m_mutex.qmutex);

	while (m_msgs.empty()) {
		pthread_cond_wait(&m_mutex.qready, &m_mutex.qmutex);
	}

	*pmsg = m_msgs.front();
	msg_cnt[(*pmsg)->code]--;
	m_msgs.pop();
	pthread_mutex_unlock(&m_mutex.qmutex);
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/361170.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于SVPWM改进的永磁同步电机直接转矩控制二更

导读&#xff1a;本期对基于SVPWM的永磁同步电机直接转矩控制进行全面的分析和仿真搭建。之后与传统的DTC进行比较&#xff0c;凸显基于SVPWM改进的DTC方法的有效性。如果需要文中的仿真模型&#xff0c;关注微信公众号&#xff1a;浅谈电机控制&#xff0c;留言获取。一、 传统…

Long型数据后端查询结果为null,返回前端显示-1,使用@JsonSerialize注解

使用场景 在开发中&#xff0c;我们将对象序列化为JSON传输给前端&#xff0c;有时候我们的某个或者某些字段需要特殊处理&#xff0c;比如我们有一个日期字段&#xff0c;我们希望当日期为NULL时给前端不返回NULL而返回为未完成等信息&#xff0c;或者我们有一些状态字段&…

达梦数据库(DM8)集成使用 Geoserver(2.22.2) 以及其他对应版本详解

达梦数据库&#xff08;DM8&#xff09;集成使用 Geoserver&#xff08;2.22.2&#xff09; 以及其他对应版本详解系统环境版本Geoserver 驱动对应版本达梦 8 集成 Geoserver 过程试错过程问题总结项目需要国产化&#xff0c;选择使用达梦数据库&#xff0c;在技术测试阶段&…

K-近邻算法(KNN)

K-近邻算法&#xff08;KNN&#xff09; K nearest neighbour 0、导引 如何进行电影分类 众所周知&#xff0c;电影可以按照题材分类&#xff0c;然而题材本身是如何定义的?由谁来判定某部电影属于哪 个题材?也就是说同一题材的电影具有哪些公共特征?这些都是在进行电影…

美创科技荣获“PostgreSQL中国最佳运维服务商”

近日&#xff0c;由中国开源软件推进联盟PostgreSQL分会&中科院软件所&CSDN联合举办主办的“中国PostgreSQL数据库生态大会”在北京中科院软件所隆重召开。美创科技受邀参加&#xff0c;三位顶级数据库技术专家亮相&#xff0c;分享美创科技在数据库内核技术、PostgreS…

IMX6ULL学习笔记(17)——工程管理

一、简介 之前我们把所有源码文件放在一个文件夹下。 这样做存在两个主要问题&#xff0c;第一&#xff0c;代码存放混乱不易阅读。第二&#xff0c;程序可移植性差。如果工程源文件达到几十、甚至数百个的时候&#xff0c;这样一股脑全部放到根目录下就会使工程显得混乱不堪。…

STM32开发(13)----获取唯一设备标识符UID

获取唯一设备标识符UID前言一、什么事UID二、实验过程1.CubeMx配置2.代码实现3.实验结果总结前言 这一章节介绍如何获取STM32芯片中的唯一的ID号的两种方法。 一、什么事UID 在许多项目中&#xff0c;识别设备是必要的。从简单的设备描述到更复杂的设备&#xff0c;如 USB 串…

Open3d入门

目录 点云数据 1 主成分分析 1.1 Method 1.2 Results 2 表面法线估计 2.1 Method 2.2 Results 3 体素网格下采样 3.1 Method 3.2 Results 点云数据 常用数据下载&#xff08;免积分&#xff09; 1 主成分分析 1.1 Method 对点云进行主成分分析&#xff08;PCA&…

[5/101] 101次面试之经典面试题

目录 01、什么是黑盒测试? 02、为什么要做黑盒测试? 03、你在软件生命周期中的哪些测试阶段用到过黑盒测试? 04、什么是白盒测试&#xff1f; 05、白盒测试与黑盒测试有什么区别&#xff1f; 06、为什么要对程序进行单元测试&#xff1f; 07、由谁来做单元测试&#…

RK3566添加湿度传感器以及浅析hal层

RK3566添加一款温湿度传感器gxht3x.挂在i2c总线下。驱动部分就不多做解析。大致流程硬件接好i2c线以及vcc gnd。后看数据手册。初始化寄存器&#xff0c;然后要读数据的话读那个寄存器&#xff0c;读出来的数据要做一个转化,然后实现open read write ioctl函数就行了。本文主要…

【LeetCode】剑指 Offer 10- Ⅲ. 矩形覆盖 p79 -- Java Version

题目链接&#xff1a;无 1. 题目介绍&#xff08;10- Ⅲ. 矩形覆盖 &#xff09; 我们可以用2x1的小矩形横着或者竖着去覆盖更大的矩形。请问用n个2x1的小矩形无重叠地覆盖一个2xn的大矩形&#xff0c;总共有多少种方法&#xff1f; 【测试用例】&#xff1a; 示例 1&#xff…

[学习笔记]Rocket.Chat业务数据备份

Rocket.Chat 的业务数据主要存储于mongodb数据库的rocketchat库中&#xff0c;聊天中通过发送文件功能产生的文件储存于/app/uploads中&#xff08;文件方式设置为"FileSystem"&#xff09;&#xff0c;因此在对Rocket.Chat做数据移动或备份主要分为两步&#xff0c;…

JavaScript高级程序设计读书分享之4章——4.2执行上下文与作用域

JavaScript高级程序设计(第4版)读书分享笔记记录 适用于刚入门前端的同志 执行上下文 变量或函数的上下文决定 了它们可以访问哪些数据&#xff0c;以及它们的行为。在浏览器中&#xff0c;全局上下文就是我们常说的 window 对象&#xff08;第 12 章会详细介绍&#xff09;&am…

【技术分享】Web自动化之Selenium安装

Web 应用程序的验收测试常常涉及一些手工任务&#xff0c;例如打开一个浏览器&#xff0c;并执行一个测试用例中所描述的操作。但是手工执行的任务容易出现人为的错误&#xff0c;也比较费时间。因此&#xff0c;将这些任务自动化&#xff0c;就可以消除人为因素。Selenium 可以…

Js中blob、file、FormData、DataView、TypedArray

引言 最开始我们看网页时&#xff0c;对网页的需求不高&#xff0c;显示点文字&#xff0c;显示点图片就很满足了&#xff0c;所以对于浏览器而言其操作的数据其实并不多&#xff08;比如读取本地图片显示出来&#xff0c;或上传图片到服务器&#xff09;&#xff0c;那么浏览器…

网络安全之认识挖矿木马

一、什么是挖矿木马&#xff1f; 比特币是以区块链技术为基础的虚拟加密货币&#xff0c;比特币具有匿名性和难以追踪的特点&#xff0c;经过十余年的发展&#xff0c;已成为网络黑产最爱使用的交易媒介。大多数勒索病毒在加密受害者数据后&#xff0c;会勒索代价高昂的比特币…

GEE学习笔记 六十三:新的地图图层ui.Map.CloudStorageLayer

在GEE中导出数据有一种方式是直接导出地图到Google Cloud Storage中&#xff0c;也就是Export.map.toCloudStorage(xxx)&#xff0c;这种方式是将我们计算生成影像导出成为静态瓦片的格式存放在Google Cloud Storage中。我们可以在其他的前端程序比如OpenLayer、Mapbox GL JS等…

Python开发-学生管理系统

文章目录1、需求分析2、系统设计3、系统开发必备4、主函数设计5、 学生信息维护模块设计6、 查询/统计模块设计7、排序模块设计8、 项目打包1、需求分析 学生管理系统应具备的功能&#xff1a; ●添加学生及成绩信息 ●将学生信息保存到文件中 ●修改和删除学生信息 ●查询学生…

Docker之路(1.Docker概述、组成以及特点)

1.docker为什么会出现&#xff1f; 一款产品或者项目来说&#xff0c;一般有三个环境&#xff0c;日常/测试环境、预发环境、正式/线上环境 这么多环境&#xff0c;对其环境的配置是十分麻烦的&#xff0c;每一个机器都要部署环境&#xff0c;有的会有集群Redis、Hadoop等&…

最全es6数组方法

1.arr.push()从后面添加元素,返回值为添加完后的数组的长度 let arr [1,2,3,4,5] console.log(arr.push(5)) // 6 console.log(arr) // [1,2,3,4,5,5]2.arr.pop()从后面删除元素,只能是一个&#xff0c;返回值是删除的元素 let arr [1,2,3,4,5] console.log(arr.pop())//5 …