redis实现分布式延时队列

news2024/12/23 16:56:42

文章目录

  • 延时队列简介
  • 应用场景
  • 案例:
  • 考虑:
  • 实现:
    • 整体思路:
    • 具体实现
      • 生产者
      • 消费者
    • 运行结果
  • redis分布式延时队列优势
  • redis分布式延时队列劣势

延时队列简介

延时队列是一种特殊的消息队列,它允许将消息在一定的延迟时间后再进行消费。延时队列的主要特点是可以延迟消息的处理时间,以满足定时任务或者定时事件的需求。

总之,延时队列通过延迟消息的消费时间,提供了一种方便、可靠的方式来处理定时任务和定时事件。它在分布式系统中具有重要的作用,能够提高系统的可靠性和性能。

延时队列的实现方式可以有多种,本文介绍一种redis实现的分布式延时队列。

应用场景

  • 定时任务:可以将需要在特定时间执行的任务封装为延时消息,通过延时队列来触发任务的执行。

  • 订单超时处理:可以将订单消息发送到延时队列中,并设置订单的超时时间,超过时间后,消费者从队列中获取到超时的订单消息,进行相应的处理。

  • 消息重试机制:当某个消息处理失败时,可以将该消息发送到延时队列中,并设置一定的重试时间,超过时间后再次尝试处理。

案例:

12306火车票购买,抢了订单后,45分钟没有支付,自动取消订单

考虑:

数据持久化:redis是支持的,可以使用rdb,也可以使用aof

有序存储:因为只要最小的没过期,后面的肯定就没过期,这样的话检查最小的节点就行了,考虑使用redis中的zset结构

高可用:考虑哨兵或者cluster

高伸缩:因为12306用户量非常大,可能导致redis中存储的任务空间非常大,所以考虑扩展节点,从这个角度来说,使用cluster集群模式,哨兵只有一个节点即主节点写数据。

实现:

整体思路:

  • 生产消费者模型:因为12306的用户量非常大,所以考虑生产者和消费者有多个节点;
  • 采用cluster模式实现高可用以及高伸缩性
  • 采用zset存储延时任务(zadd key score member,score表示时间);
  • 为了让数据均匀分布在cluster集群中的多个主节点中:构建多个zset,每个zset对应一个消费者,生产者随机向某个zset中生产数据。

具体实现

生产者

需要安装hiredis-cluster集群,安装编译如下:

git clone https://github.com/Nordix/hiredis-cluster.git
cd hiredis-cluster
mkdir build
cd build
cmake -DCMAKE_BUILD_TYPE=RelWithDebInfo -
DENABLE_SSL=ON ..
make
sudo make install
sudo ldconfig

需要安装libevent库,最后编译时执行gcc producer.c -o producer -levent -lhiredis_cluster -lhiredis -lhiredis_ssl编译生产者可执行程序

#include <hiredis_cluster/adapters/libevent.h>
#include <hiredis_cluster/hircluster.h>
#include <event.h>
#include <event2/listener.h>
#include <event2/bufferevent.h>
#include <event2/buffer.h>
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <string.h>
#include <sys/time.h>

int64_t g_taskid = 0;

#define MAX_KEY 10

static int64_t hi_msec_now() {
    int64_t msec;
    struct timeval now;
    int status;
    status = gettimeofday(&now, NULL);
    if (status < 0) {
        return -1;
    }
    msec = (int64_t)now.tv_sec * 1000LL + (int64_t)(now.tv_usec / 1000LL);
    return msec;
}

static int _vscnprintf(char *buf, size_t size, const char *fmt, va_list args) {
    int n;
    n = vsnprintf(buf, size, fmt, args);
    if (n <= 0) {
        return 0;
    }
    if (n <= (int)size) {
        return n;
    }
    return (int)(size-1);
}

static int _scnprintf(char *buf, size_t size, const char *fmt, ...) {
    va_list args;
    int n;
    va_start(args, fmt);
    n = _vscnprintf(buf, size, fmt, args);
    va_end(args);
    return n;
}

void connectCallback(const redisAsyncContext *ac, int status) {
    if (status != REDIS_OK) {
        printf("Error: %s\n", ac->errstr);
        return;
    }
    printf("Connected to %s:%d\n", ac->c.tcp.host, ac->c.tcp.port);
}

void disconnectCallback(const redisAsyncContext *ac, int status) {
    if (status != REDIS_OK) {
        printf("Error: %s\n", ac->errstr);
        return;
    }
    printf("Disconnected from %s:%d\n", ac->c.tcp.host, ac->c.tcp.port);
}

void addTaskCallback(redisClusterAsyncContext *cc, void *r, void *privdata) {
    redisReply *reply = (redisReply *)r;
    if (reply == NULL) {
        if (cc->errstr) {
            printf("errstr: %s\n", cc->errstr);
        }
        return;
    }

    int64_t now = hi_msec_now() / 10;
    printf("add task success reply: %lld now=%ld\n", reply->integer, now);
}

int addTask(redisClusterAsyncContext *cc, char *desc) {
    /* 转化为厘米秒 */
    int64_t now = hi_msec_now() / 10;
    g_taskid++;
    
    /* key */
    char key[256] = {0};
	// 为了让数据均匀分布在cluster集群中的多个主节点中:// 构建多个zset,每个zset对应一个消费者,生产者随机向某个zset中生产数据,
	// 生产者可以有很多个,只需要保证向task_group:0-task_group:9中均匀的生产数据即可
    int len = _scnprintf(key, 255, "task_group:%ld", g_taskid % MAX_KEY);
    key[len] = '\0';
    
    /* member */
    char mem[1024] = {0};
    len = _scnprintf(mem, 1023, "task:%ld:%s", g_taskid, desc);
    mem[len] = '\0';
    
    int status;
	// 为每一个任务延时5秒中去处理
    status = redisClusterAsyncCommand(cc, addTaskCallback, "",
                                      "zadd %s %ld %s", key, now+500, mem);

    printf("redisClusterAsyncCommand:zadd %s %ld %s\n", key, now+500, mem);
    if (status != REDIS_OK) {
        printf("error: err=%d errstr=%s\n", cc->err, cc->errstr);
    }
    return 0;
}

void stdio_callback(struct bufferevent *bev, void *arg) {
    redisClusterAsyncContext *cc = (redisClusterAsyncContext *)arg;
    struct evbuffer *evbuf = bufferevent_get_input(bev);
    char *msg = evbuffer_readln(evbuf, NULL, EVBUFFER_EOL_LF);
    if (!msg) return;

    if (strcmp(msg, "quit") == 0) {
        printf("safe exit!!!\n");
        exit(0);
        return;
    }
    if (strlen(msg) > 1024-5-13-1) {
        printf("[err]msg is too long, try again...\n");
        return;
    }

    addTask(cc, msg);
    printf("stdio read the data: %s\n", msg);
}

int main(int argc, char **argv) {
    printf("Connecting...\n");
	// 连接cluster集群,可以从cluster集群中任意一个节点出发连接集群
    redisClusterAsyncContext *cc =
        redisClusterAsyncConnect("127.0.0.1:7006", HIRCLUSTER_FLAG_NULL);
    printf("redisClusterAsyncContext...\n");
    if (cc && cc->err) {
        printf("Error: %s\n", cc->errstr);
        return 1;
    }

    struct event_base *base = event_base_new();
    redisClusterLibeventAttach(cc, base);
    redisClusterAsyncSetConnectCallback(cc, connectCallback);
    redisClusterAsyncSetDisconnectCallback(cc, disconnectCallback);

    // nodeIterator ni;
    // initNodeIterator(&ni, cc->cc);
    // cluster_node *node;
    // while ((node = nodeNext(&ni)) != NULL) {
    //     printf("node %s:%d role:%d pad:%d\n", node->host, node->port, node->role, node->pad);
    // }
    struct bufferevent *ioev = bufferevent_socket_new(base, 0, BEV_OPT_CLOSE_ON_FREE);
    bufferevent_setcb(ioev, stdio_callback, NULL, NULL, cc);
    bufferevent_enable(ioev, EV_READ | EV_PERSIST);

    printf("Dispatch..\n");
    event_base_dispatch(base);

    printf("Done..\n");
    redisClusterAsyncFree(cc);
    event_base_free(base);
    return 0;
}

// 需要安装 hiredis-cluster libevent
// gcc producer.c -o producer -levent -lhiredis_cluster -lhiredis -lhiredis_ssl

说明:

这里构建了10个zset,分别是task_group:0,task_group:1,…,task_group:9作为10个zset的key,zset的数据其实就代表着消费者的数量,通常消费者的功能是一摸一样的,生产者就不管你有多少个了,只需要将任务均匀的打散在不同的zset中就行了(具体实现可以搞一个全局的id,每一次添加任务时id++,然后再对zset个数10取模,最终可以得到0-9之间的一个数,然后再与task_group拼接,这样就可以将任务均匀的打散在不同的zset中)。

消费者

消费者是采用skynet+lua脚本实现的,每个消费者会不断的去检查redis中的任务有没有过期,如果过期,就取出来删除(这里只是demo,只是打印之后删除任务)

local skynet = require "skynet"

local function table_dump( object )
    if type(object) == 'table' then
        local s = '{ '
        for k,v in pairs(object) do
            if type(k) ~= 'number' then k = string.format("%q", k) end
            s = s .. '['..k..'] = ' .. table_dump(v) .. ','
        end
        return s .. '} '
    elseif type(object) == 'function' then
        return tostring(object)
    elseif type(object) == 'string' then
        return string.format("%q", object)
    else
        return tostring(object)
    end
end

local mode, key = ...
if mode == "slave" then
    local rediscluster = require "skynet.db.redis.cluster"
    local function onmessage(data,channel,pchannel)
        print("onmessage",data,channel,pchannel)
    end
    skynet.start(function ()
        local db = rediscluster.new({
                {host="127.0.0.1",port=7001},
            },
            {read_slave=true,auth=nil,db=0,},
            onmessage
        )
        assert(db, "redis-cluster startup error")
        skynet.fork(function ()
            while true do
                local res = db:zrange(key, 0, 0, "withscores")
                if not next(res) then
                    skynet.sleep(50)
                else
                    local expire = tonumber(res[2])
                    local now = skynet.time()*100
                    if now >= expire then
                        print(("%s is comsumed:expire_time:%d"):format(res[1], expire))
                        db:zrem(key, res[1])
                    else
                        skynet.sleep(10)
                    end
                end
            end
        end)
    end)

else
    skynet.start(function ()	-- // 启动10个程序,并把"slave"传入mode,task_group:i传入到key中,即每个程序只消费一个
        for i=0,9 do
            skynet.newservice(SERVICE_NAME, "slave", "task_group:"..i)

运行结果

在这里插入图片描述

redis分布式延时队列优势

1.Redis zset支持高性能的 score 排序。

2.Redis是在内存上进行操作的,速度非常快。

3.Redis可以搭建集群,当消息很多时候,我们可以用集群来提高消息处理的速度,提高可用性。

4.Redis具有持久化机制,当出现故障的时候,可以通过AOF和RDB方式来对数据进行恢复,保证了数据的可靠性

redis分布式延时队列劣势

使用 Redis 实现的延时消息队列也存在数据持久化, 消息可靠性的问题:

  • 没有重试机制 - 处理消息出现异常没有重试机制, 这些需要自己去实现, 包括重试次数的实现等;
  • 没有 ACK 机制 - 例如在获取消息并已经删除了消息情况下, 正在处理消息的时候客户端崩溃了, 这条正在处理的这些消息就会丢失, MQ 是需要明确的返回一个值给 MQ 才会认为这个消息是被正确的消费了。

总结:如果对消息可靠性要求较高, 推荐使用 MQ 来实现

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1147905.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【系统安全】等保二级、三级自查对比

目录 1、定义&#xff1a;二级 VS 三级 2、要求&#xff1a;二级 VS 三级 3、适用企业、单位 1、定义&#xff1a;二级 VS 三级 等保&#xff1a;全称网络安全等级保护&#xff0c;是指对信息系统进行分级&#xff0c;对不同等级的信息系统进行不同的安全保护和监管的工作。…

仿写知乎日报第二周

新学到的 新学到了WKWebView&#xff1a; WKWebView是是苹果推崇的一个新的类&#xff0c;它用于将一个网页嵌套在软件里。这里我是将点击cell后的内容中放入WKWebView对象。WKWebView的使用&#xff1a; 首先&#xff0c;要导入这个类&#xff1a; #import <WebKit/WebK…

3.7 移动端TB(D)R架构基础

一、各类电子设备功耗对比 桌面级主流性能平台&#xff0c;功耗一般为300W&#xff08;R7/I7X60级别显卡&#xff09;&#xff0c;游戏主机150-200W入门和旗舰游戏本平台功耗为100W主流笔记本为50-60W&#xff0c;超极本为15-25W&#xff0c;旗舰平板为8-15W旗舰手机为5-8W&am…

《python语言程序设计》(2018版)第5章编程题 第41题第3次路过。总结一下。没有走完的路

这道题最大的需要就是能够进行两个数值的对比&#xff0c;同时还能让更多的数值依次进入到对比中。 这道题的解题版本 这个版本只是能统计出谁是最大数。但是无法统计最大数出现了多少次。 number "" count 0 data_number 0 while number ! 0:number eval(inpu…

No172.精选前端面试题,享受每天的挑战和学习

🤍 前端开发工程师(主业)、技术博主(副业)、已过CET6 🍨 阿珊和她的猫_CSDN个人主页 🕠 牛客高级专题作者、在牛客打造高质量专栏《前端面试必备》 🍚 蓝桥云课签约作者、已在蓝桥云课上架的前后端实战课程《Vue.js 和 Egg.js 开发企业级健康管理项目》、《带你从入…

MinIO安装

Minio是一个开源的分布式对象存储服务器&#xff0c;它兼容Amazon S3服务接口。它可以用于构建私有云存储&#xff0c;为应用程序提供可扩展的对象存储功能。 安装 docker安装 docker run -d -p 9000:9000 -p 50000:50000 --name minio \ -e "MINIO_ROOT_USERadminpili…

泰州市旅游景点门票预订管理系统 vue+uniapp微信小程序

本文从管理员、用户的功能要求出发&#xff0c;泰州市旅游景点管理小程序中的功能模块主要是实现用户、景点类型、景区信息、门票预定。经过认真细致的研究&#xff0c;精心准备和规划&#xff0c;最后测试成功&#xff0c;系统可以正常使用。分析功能调整与泰州市旅游景点管理…

Leetcode—275.H指数II【中等】

2023每日刷题&#xff08;十三&#xff09; Leetcode—275.H指数II 算法思想 实现代码 int minValue(int a, int b) {return a < b ? a : b; }int hIndex(int* citations, int citationsSize){int left, right;left 0;right citationsSize - 1;while(left < right) …

AD教程(一)工程组成及创建

AD教程&#xff08;一&#xff09;工程组成及创建 工程组成 原理图库 绘制电阻模型、芯片模型、电容模型等&#xff0c;即将元件模型绘制出来。 原理图 将绘制的原件模型放置到原理图中&#xff0c;然后再添加连接的导线、网络标号。器件和器件之间的连接关系&#xff0c;在原…

粤嵌实训医疗项目--day04(Vue + SpringBoot)

往期回顾 粤嵌实训医疗项目--day03&#xff08;Vue SpringBoot&#xff09;-CSDN博客粤嵌实训医疗项目day02&#xff08;Vue SpringBoot&#xff09;-CSDN博客粤嵌实训医疗项目--day01&#xff08;VueSpringBoot&#xff09;-CSDN博客 目录 一、用户详细信息查询 (查询信息与…

业务设计——用户敏感信息展示脱敏及其反脱敏

业务需求 将用户敏感信息脱敏展示到前端是出于保护用户隐私和信息安全的考虑。 敏感信息包括但不限于手机号码、身份证号、银行卡号等&#xff0c;这些信息泄露可能导致用户个人信息的滥用、身份盗用等严重问题。脱敏是一种常用的保护用户隐私的方式&#xff0c;它的目的是减少…

华为云资源搭建过程

网络搭建 EIP&#xff1a; 弹性EIP&#xff0c;支持IPv4和IPv6。 弹性公网IP&#xff08;Elastic IP&#xff09;提供独立的公网IP资源&#xff0c;包括公网IP地址与公网出口带宽服务。可以与弹性云服务器、裸金属服务器、虚拟IP、弹性负载均衡、NAT网关等资源灵活地绑定及解绑…

FL Studio21.2.0.3421最新汉化破解版中文解锁下载完整版本

音乐在人们心中的地位日益增高&#xff0c;近几年音乐选秀的节目更是层出不穷&#xff0c;喜爱音乐&#xff0c;创作音乐的朋友们也是越来越多&#xff0c;音乐的类型有很多&#xff0c;好比古典&#xff0c;流行&#xff0c;摇滚等等。对新手友好程度基本上在首位&#xff0c;…

Affinity Photo 2.2.1 高端专业Mac PS修图软件

Affinity Photo Mac中文版是一款面向专业摄影师和其他视觉艺术家的专业图像处理软件&#xff0c;拥有众多专业高端功能&#xff0c;如Raw处理、PSD导入和导出、16位通道的编辑和ICC色彩管理以及兼容大量图片格式。是现在最快、最顺、最精准的专业修图软件。Affinity Photo Mac是…

微信小程序vue+uniapp旅游景点门票预订系统 名胜风景推荐系统

与此同时越来越多的旅游公司建立了自己的基于微信小程序的名胜风景推荐平台&#xff0c;管理员通过网站可以添加用户、景点分类、景点信息、在线预订、最新推荐&#xff0c;用户可以对景点信息进行在线预订&#xff0c;以及开展电子商务等。互联网的世界里蕴藏无限生机&#xf…

calloc、malloc、realloc函数的区别及用法

三者都是分配内存&#xff0c;都是stdlib.h库里的函数&#xff0c;但是也存在一些差异。 &#xff08;1&#xff09;malloc函数。其原型void *malloc(unsigned int num_bytes)&#xff1b; num_byte为要申请的空间大小&#xff0c;需要我们手动的去计算&#xff0c;如int *p …

VBA还能这么玩?Word文档一秒自动排版

Hello,各位小伙伴们大家好呀,真的是好久不见了。旅行者1号也才用20个小时回传数据到地球。距离 Yogurt 上次正儿八经的教程推文已经快 700 天了,时间过得真快呀,感谢各位的不离不弃。 这两天群里比较活跃,便上去看看发生了啥事儿,不看不知道,一看,这推文的素材不就来…

「实验记录」CS144 Lab0 networking warmup

文章目录 一、Motivation二、SolutionsS1 - Writing webgetS2 - An in-memory reliable byte stream 三、Results四、Source 一、Motivation 第一个小测试 webget 是想让我们体验并模拟一下在浏览器中键入 URL 后获得远程服务器传来的内容&#xff0c;这并没有太大的难度&…

Spring cloud教程Gateway服务网关

Spring cloud教程|Gateway服务网关 写在前面的话&#xff1a; 本笔记在参考网上视频以及博客的基础上&#xff0c;只做个人学习笔记&#xff0c;如有侵权&#xff0c;请联系删除&#xff0c;谢谢&#xff01; Spring Cloud Gateway 是 Spring Cloud 的一个全新项目&#xff0c;…

数字频带传输——二进制数字调制及MATLAB仿真

文章目录 前言一、OOK1、表达式2、功率谱密度3、调制框图 二、2PSK1、表达式2、功率谱密度 三、2FSK1、表达式 四、MATLAB 仿真1、MATLAB 源码2、仿真及结果①、输入信号及频谱图②、2ASK 调制③、2PSK 调制④、2FSK 调制⑤、随机相位 2FSK 调制 五、资源自取 前言 数字频带信…