Redis的Stream 和 实现队列的方式【List、SortedSet、发布订阅、Stream、Java】

news2025/1/12 12:22:15

Redis队列与Stream、Redis 6多线程详解

  • Redis队列与Stream
    • Stream总述
      • 常用操作命令
        • 生产端
        • 消费端
          • 单消费者
          • 消费组
            • 消息消费
  • Redis队列几种实现的总结
    • 基于List的 LPUSH+BRPOP 的实现
    • 基于Sorted-Set的实现
    • PUB/SUB,订阅/发布模式
    • 基于Stream类型的实现
    • 与Java的集成
  • 消息队列问题
    • Stream 消息太多怎么办?(会限制长度 干掉老信息)
    • 消息如果忘记 ACK 会怎样?(堆积在PEL中)
    • PEL 如何避免消息丢失?
    • 死信问题
    • Stream 的高可用
    • 分区 Partition
  • Stream小结

Redis队列与Stream

Redis5.0 最大的新特性就是多出了一个数据结构 Stream,它是一个新的强大的支持多播的可持久化的消息队列,作者声明Redis Stream地借鉴了 Kafka 的设计。

Stream总述

在这里插入图片描述
Redis Stream 的结构如上图所示,每一个Stream都有一个消息链表,将所有加入的消息都串起来,每个消息都有一个唯一的 ID 和对应的内容。==消息是持久化的,Redis 重启后,内容还在。 ==

==每个 Stream 都有唯一的名称,它就是 Redis 的 key,在我们首次使用xadd指令追加消息时自动创建。 ==

每个 Stream 都可以挂多个消费组,每个消费组会有个游标last_delivered_id在 Stream 数组之上往前移动,表示当前消费组已经消费到哪条消息了。每个消费组都有一个 Stream 内唯一的名称,消费组不会自动创建,它需要单独的指令xgroup create进行创建,需要指定从 Stream 的某个消息 ID 开始消费这个 ID 用来初始化last_delivered_id变量。

每个消费组 (Consumer Group) 的状态都是独立的,相互不受影响。也就是说同一份 Stream 内部的消息会被每个消费组都消费到

同一个消费组 (Consumer Group) 可以挂接多个消费者 (Consumer),这些消费者之间是竞争关系,任意一个消费者读取了消息都会使游标last_delivered_id往前移动。每个消费者有一个组内唯一名称。

消费者 (Consumer) 内部会有个状态变量pending_ids,它记录了当前已经被客户端读取,但是还没有 ack的消息。如果客户端没有 ack,这个变量里面的消息 ID 会越来越多,一旦某个消息被 ack,它就开始减少。这个 pending_ids 变量在 Redis 官方被称之为PEL,也就是Pending Entries List,这是一个很核心的数据结构,它用来确保客户端至少消费了消息一次,而不会在网络传输的中途丢失了没处理。

消息 ID 的形式是timestampInMillis-sequence,例如1527846880572-5,它表示当前的消息在毫米时间戳1527846880572时产生,并且是该毫秒内产生的第 5 条消息。消息 ID 可以由服务器自动生成,也可以由客户端自己指定,但是形式必须是整数-整数,而且必须是后面加入的消息的 ID 要大于前面的消息 ID。
消息内容就是键值对,形如 hash 结构的键值对,这没什么特别之处。

常用操作命令

生产端

xadd 追加消息
xdel 删除消息,这里的删除仅仅是设置了标志位,不会实际删除消息。
xrange 获取消息列表,会自动过滤已经删除的消息
xlen 消息长度
del 删除 Stream

xadd streamtest * name mark age 18
在这里插入图片描述
streamtest 表示当前这个队列的名字,也就是我们一般意义上Redis中的key,* 号表示服务器自动生成 ID,后面顺序跟着==“name mark age 18”,是我们存入当前streamtest 这个队列的消息,采用的也是 key/value的存储形式==

返回值1626705954593-0 则是生成的消息 ID,由两部分组成:时间戳-序号。时间戳时毫秒级单位,是生成消息的Redis服务器时间,它是个64位整型。序号是在这个毫秒时间点内的消息序号。它也是个64位整型。

为了保证消息是有序的,因此Redis生成的ID是单调递增有序的。由于ID中包含时间戳部分,为了避免服务器时间错误而带来的问题(例如服务器时间延后了),Redis的每个Stream类型数据都维护一个latest_generated_id属性,用于记录最后一个消息的ID。若发现当前时间戳退后(小于latest_generated_id所记录的),则采用时间戳不变而序号递增的方案来作为新消息ID(这也是序号为什么使用int64的原因,保证有足够多的的序号),从而保证ID的单调递增性质。
如果不是非常特别的需求,强烈建议使用Redis的方案生成消息ID,因为这种时间戳+序号的单调递增的ID方案,几乎可以满足全部的需求,但ID是支持自定义的。
在这里插入图片描述

xrange streamtest - +
其中-表示最小值 , + 表示最大值
在这里插入图片描述
或者我们可以指定消息 ID 的列表:
在这里插入图片描述
xdel streamtest 1626706380924-0
xlen streamtest
在这里插入图片描述
del streamtest 删除整个 Stream
在这里插入图片描述

消费端
单消费者

虽然Stream中有消费者组的概念,但是可以在不定义消费组的情况下进行 Stream 消息的独立消费,当 Stream 没有新消息时,甚至可以阻塞等待。Redis 设计了一个单独的消费指令xread,可以将 Stream 当成普通的消息队列 (list) 来使用。使用 xread 时,我们可以完全忽略消费组 (Consumer Group) 的存在,就好比 Stream 就是一个普通的列表 (list)

xread count 1 streams stream2 0-0
“count 1”表示从 Stream 读取1条消息,缺省当然是头部,“streams”可以理解为Redis关键字,“stream2”指明了要读取的队列名称,“0-0”指从头开始
在这里插入图片描述
xread count 2 streams stream2 1626710882927-0
也可以指定从streams的消息Id开始(不包括命令中的消息id)
在这里插入图片描述
xread count 1 streams stream2 $
$代表从尾部读取,上面的意思就是从尾部读取最新的一条消息,此时默认不返回任何消息
在这里插入图片描述
所以最好以阻塞的方式读取尾部最新的一条消息,直到新的消息的到来
xread block 0 count 1 streams stream2 $
block后面的数字代表阻塞时间,单位毫秒
在这里插入图片描述
此时我们新开一个客户端,往stream2中写入一条消息
在这里插入图片描述
可以看到阻塞解除了,返回了新的消息内容,而且还显示了一个等待时间,这里我们等待了127.87s
在这里插入图片描述
一般来说客户端如果想要使用 xread 进行顺序消费,一定要记住当前消费到哪里了,也就是返回的消息 ID。下次继续调用 xread 时,将上次返回的最后一个消息 ID 作为参数传递进去,就可以继续消费后续的消息。

消费组

创建消费组
Stream 通过xgroup create指令创建消费组 (Consumer Group),需要传递起始消息 ID 参数用来初始化last_delivered_id变量。
xgroup create stream2 cg1 0-0
“stream2”指明了要读取的队列名称,“cg1”表示消费组的名称,“0-0”表示从头开始消费
在这里插入图片描述
xgroup create stream2 cg2 $
$ 表示从尾部开始消费,只接受新消息,当前 Stream 消息会全部忽略
在这里插入图片描述
现在我们可以用xinfo命令来看看stream2的情况:
xinfo stream stream2
在这里插入图片描述
xinfo groups stream2
在这里插入图片描述

消息消费

有了消费组,自然还需要消费者,Stream 提供了 xreadgroup 指令可以进行消费组的组内消费,需要提供消费组名称、消费者名称和起始消息 ID。
它同 xread 一样,也可以阻塞等待新消息。读到新消息后,对应的消息 ID 就会进入消费者的PEL(正在处理的消息) 结构里,客户端处理完毕后使用 xack 指令通知服务器,本条消息已经处理完毕,该消息 ID 就会从 PEL 中移除。
xreadgroup GROUP cg1 c1 count 1 streams stream2 >
“GROUP”属于关键字,“cg1”是消费组名称,“c1”是消费者名称,“count 1”指明了消费数量,> 号表示从当前消费组的 last_delivered_id 后面开始读,每当消费者读取一条消息,last_delivered_id 变量就会前进

在这里插入图片描述
前面我们定义cg1的时候是从头开始消费的,自然就获得Stream2中第一条消息
再执行一次上面的命令
在这里插入图片描述
自然就读取到了下条消息。
我们将Stream2中的消息读取完
xreadgroup GROUP cg1 c1 count 2 streams stream2 >
很自然就没有消息可读了, xreadgroup GROUP cg1 c1 count 1 streams stream2 >
在这里插入图片描述
然后设置阻塞等待
xreadgroup GROUP cg1 c1 block 0 count 1 streams stream2 >
在这里插入图片描述
我们新开一个客户端,发送消息到stream2
xadd stream2 * name lison score 98
在这里插入图片描述
回到原来的客户端,发现阻塞解除,收到新消息
在这里插入图片描述
我们来观察一下观察消费组状态
在这里插入图片描述
如果同一个消费组有多个消费者,我们还可以通过 xinfo consumers 指令观察每个消费者的状态
xinfo consumers stream2 cg1
在这里插入图片描述
可以看到目前c1这个消费者有 5 条待ACK的消息,空闲了441340 ms 没有读取消息。
如果我们确认一条消息
xack stream2 cg1 1626751586744-0
就可以看到待确认消息变成了4条
在这里插入图片描述
xack允许带多个消息id,比如
在这里插入图片描述
同时Stream还提供了命令XPENDIING 用来获消费组或消费内消费者的未处理完毕的消息,每个Pending的消息有4个属性:
消息ID
所属消费者
IDLE,已读取时长
delivery counter,消息被读取次数
命令XCLAIM用以进行消息转移的操作,将某个消息转移到自己的Pending列表中。需要设置组、转移的目标消费者和消息ID,同时需要提供IDLE(已被读取时长),只有超过这个时长,才能被转移。
更多的Redis的Stream命令请大家参考Redis官方文档:
https://redis.io/topics/streams-intro
https://redis.io/commands
同时Redis文档中,在每个命令的详情页右边会显示“Related commands”,可以通过这个列表快速了解相关的命令和进入具体命令的详情页。

Redis队列几种实现的总结

基于List的 LPUSH+BRPOP 的实现

足够简单,消费消息延迟几乎为零,但是需要处理空闲连接的问题。
如果线程一直阻塞在那里,Redis客户端的连接就成了闲置连接,闲置过久,服务器一般会主动断开连接,减少闲置资源占用,这个时候blpop和brpop或抛出异常,所以在编写客户端消费者的时候要小心,如果捕获到异常需要重试。
其他缺点包括:
做消费者确认ACK麻烦,不能保证消费者消费消息后是否成功处理的问题(宕机或处理异常等),通常需要维护一个Pending列表,保证消息处理确认;不能做广播模式,如pub/sub,消息发布/订阅模型;不能重复消费,一旦消费就会被删除;不支持分组消费。

@Component
public class ListVer{
    public final static String RS_LIST_MQ_NS = "rlm:";

    @Autowired
    private JedisPool jedisPool;

    /*消费者接受消息*/
    public List<String> get(String key) {
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            return jedis.brpop(0,RS_LIST_MQ_NS +key);
        } catch (Exception e) {
            throw new RuntimeException("接受消息失败!");
        } finally {
            jedis.close();
        }
    }

    /*生产者发送消息*/
    public void put(String key, String message) {
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            jedis.lpush(RS_LIST_MQ_NS+key,message);
        } catch (Exception e) {
            throw new RuntimeException("发送消息失败!");
        } finally {
            jedis.close();
        }
    }
}

基于Sorted-Set的实现

多用来实现延迟队列,当然也可以实现有序的普通的消息队列,但是消费者无法阻塞的获取消息,只能轮询,不允许重复消息。

@Component
public class ZSetVer {
    public final static String RS_ZS_MQ_NS = "rzsm:";

    @Autowired
    private JedisPool jedisPool;

    /*生产者,消息的发送,实际生产中,相关参数,
    比如订单信息,过期时间等应该传入,可以考虑将订单信息json化存入redis*/
    public void producer() {
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            for (int i = 0; i < 5; i++) {
                String order_id = "000000000"+i;
                double score = System.currentTimeMillis()+(i*1000);
                jedis.zadd(RS_ZS_MQ_NS+"orderId",score, order_id);
                System.out.println("生产订单: " + order_id + " 当前时间:"
                        + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
                System.out.println((3 + i) + "秒后执行");
            }
        } catch (Exception e) {
            throw new RuntimeException("生产消息失败!");
        } finally {
            jedis.close();
        }

    }

    //消费者,取订单
    public void consumerDelayMessage() {
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            while (true) {
                Set<String> order = jedis.zrangeByScore(RS_ZS_MQ_NS+"orderId", 0,
                        System.currentTimeMillis(), 0,1);
                if (order == null || order.isEmpty()) {
                    System.out.println("当前没有等待的任务");
                    try {
                        TimeUnit.MICROSECONDS.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    continue;
                }
                String s = order.iterator().next();

                if (jedis.zrem(RS_ZS_MQ_NS+"orderId", s)>0) {
                    /*业务处理*/
                    System.out.println(s);
                }
            }

        } catch (Exception e) {
            throw new RuntimeException("消费消息失败!");
        } finally {
            jedis.close();
        }
    }

PUB/SUB,订阅/发布模式

优点:
典型的广播模式,一个消息可以发布到多个消费者;多信道订阅,消费者可以同时订阅多个信道,从而接收多类消息;消息即时发送,消息不用等待消费者读取,消费者会自动接收到信道发布的消息。
缺点:
消息一旦发布,不能接收。换句话就是发布时若客户端不在线,则消息丢失,不能寻回;不能保证每个消费者接收的时间是一致的;若消费者客户端出现消息积压,到一定程度,会被强制断开,导致消息意外丢失。通常发生在消息的生产远大于消费速度时;可见,Pub/Sub 模式不适合做消息存储,消息积压类的业务,而是擅长处理广播,即时通讯,即时反馈的业务。

@Component
public class PSVer extends JedisPubSub {
    public final static String RS_PS_MQ_NS = "rpsm:";

    @Autowired
    private JedisPool jedisPool;

    @Override
    public void onMessage(String channel, String message) {

        System.out.println("Accept "+channel+" message:"+message);
    }

    @Override
    public void onSubscribe(String channel, int subscribedChannels) {
        System.out.println("Subscribe "+channel+" count:"+subscribedChannels);
    }

    public void pub(String channel, String message) {
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            jedis.publish(RS_PS_MQ_NS+channel,message);
            System.out.println("发布消息到"+RS_PS_MQ_NS+channel+" message="+message);
        } catch (Exception e) {
            throw new RuntimeException("发布消息失败!");
        } finally {
            jedis.close();
        }
    }

    public void sub(String... channels) {
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            jedis.subscribe(this,channels);
        } catch (Exception e) {
            throw new RuntimeException("订阅频道失败!");
        } finally {
            jedis.close();
        }
    }

}

基于Stream类型的实现

基本上已经有了一个消息中间件的雏形,可以考虑在生产过程中使用,当然真正要在生产中应用,要做的事情还很多,比如消息队列的管理和监控就需要花大力气去实现,而专业消息队列都已经自带或者存在着很好的第三方方案和插件。

@Component
public class StreamVer {
    public final static String RS_STREAM_MQ_NS = "rsm:";

    @Autowired
    private JedisPool jedisPool;

    /**
     * 发布消息到Stream
     * @param key
     * @param message
     * @return
     */
    public StreamEntryID produce(String key,Map<String,String> message){
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            StreamEntryID id = jedis.xadd(RS_STREAM_MQ_NS+key, StreamEntryID.NEW_ENTRY, message);
            System.out.println("发布消息到"+RS_STREAM_MQ_NS+key+" 返回消息id="+id.toString());
            return id;
        } catch (Exception e) {
            throw new RuntimeException("发布消息失败!");
        } finally {
            jedis.close();
        }
    }


    /**
     * 创建消费群组,消费群组不可重复创建
     * @param key
     * @param groupName
     * @param lastDeliveredId
     */
    public void createCustomGroup(String key, String groupName, String lastDeliveredId){
        Jedis jedis = null;
        try {
            StreamEntryID id = null;
            if (lastDeliveredId==null){
                lastDeliveredId = "0-0";
            }
            id = new StreamEntryID(lastDeliveredId);
            jedis = jedisPool.getResource();
            /*makeStream表示没有时是否自动创建stream,但是如果有,再自动创建会异常*/
            jedis.xgroupCreate(RS_STREAM_MQ_NS+key,groupName,id,false);
            System.out.println("创建消费群组成功:"+groupName);
        } catch (Exception e) {
            throw new RuntimeException("创建消费群组失败!",e);
        } finally {
            jedis.close();
        }
    }


    /**
     * 消息消费
     * @param key
     * @param customerName
     * @param groupName
     * @return
     */
    public List<Map.Entry<String, List<StreamEntry>>> consume(String key, String customerName,String groupName){
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            /*消息消费时的参数*/
            XReadGroupParams xReadGroupParams = new XReadGroupParams().block(0).count(1);
            Map<String, StreamEntryID> streams = new HashMap<>();
            streams.put(RS_STREAM_MQ_NS+key,StreamEntryID.UNRECEIVED_ENTRY);
            List<Map.Entry<String, List<StreamEntry>>> result
                    = jedis.xreadGroup(groupName, customerName, xReadGroupParams, streams);
            System.out.println(groupName+"从"+RS_STREAM_MQ_NS+key+"接受消息, 返回消息:"+result);
            return result;
        } catch (Exception e) {
            throw new RuntimeException("消息消费失败!",e);
        } finally {
            jedis.close();
        }
    }

    /**
     * 消息确认
     * @param key
     * @param groupName
     * @param msgId
     */
    public void ackMsg(String key, String groupName,StreamEntryID msgId){
        if (msgId==null) throw new RuntimeException("msgId为空!");
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            System.out.println(jedis.xack(key,groupName,msgId));
            System.out.println(RS_STREAM_MQ_NS+key+",消费群组"+groupName+" 消息已确认");
        } catch (Exception e) {
            throw new RuntimeException("消息确认失败!",e);
        } finally {
            jedis.close();
        }
    }

    /*
    检查消费者群组是否存在,辅助方法
    * */
    public boolean checkGroup(String key, String groupName){
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            List<StreamGroupInfo> xinfoGroupResult = jedis.xinfoGroup(RS_STREAM_MQ_NS+key);
            for(StreamGroupInfo groupinfo : xinfoGroupResult) {
                if(groupName.equals(groupinfo.getName())) return true;
            }
            return false;
        } catch (Exception e) {
            throw new RuntimeException("检查消费群组失败!",e);
        } finally {
            jedis.close();
        }
    }

    public final static int MQ_INFO_CONSUMER = 1;
    public final static int MQ_INFO_GROUP = 2;
    public final static int MQ_INFO_STREAM = 0;
    /**
     * 消息队列信息查看
     * @param type
     */
    public void MqInfo(int type,String key, String groupName){
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            if(type==MQ_INFO_CONSUMER){
                List<StreamConsumersInfo> xinfoConsumersResult = jedis.xinfoConsumers(RS_STREAM_MQ_NS+key, groupName);
                System.out.println(RS_STREAM_MQ_NS+key+" 消费者信息:" + xinfoConsumersResult);
                for( StreamConsumersInfo consumersinfo : xinfoConsumersResult) {
                    System.out.println("-ConsumerInfo:" + consumersinfo.getConsumerInfo());
                    System.out.println("--Name:" + consumersinfo.getName());
                    System.out.println("--Pending:" + consumersinfo.getPending());
                    System.out.println("--Idle:" + consumersinfo.getIdle());
                }
            }else if (type==MQ_INFO_GROUP){
                List<StreamGroupInfo> xinfoGroupResult = jedis.xinfoGroup(RS_STREAM_MQ_NS+key);
                System.out.println(RS_STREAM_MQ_NS+key+"消费者群组信息:" + xinfoGroupResult);
                for(StreamGroupInfo groupinfo : xinfoGroupResult) {
                    System.out.println("-GroupInfo:" + groupinfo.getGroupInfo());
                    System.out.println("--Name:" + groupinfo.getName());
                    System.out.println("--Consumers:" + groupinfo.getConsumers());
                    System.out.println("--Pending:" + groupinfo.getPending());
                    System.out.println("--LastDeliveredId:" + groupinfo.getLastDeliveredId());
                }
            }else{
                StreamInfo xinfoStreamResult = jedis.xinfoStream(RS_STREAM_MQ_NS+key);
                System.out.println(RS_STREAM_MQ_NS+key+"队列信息:" + xinfoStreamResult);
                System.out.println("-StreamInfo:" + xinfoStreamResult.getStreamInfo());
                System.out.println("--Length:" + xinfoStreamResult.getLength());
                System.out.println("--RadixTreeKeys:" + xinfoStreamResult.getRadixTreeKeys());
                System.out.println("--RadixTreeNodes():" + xinfoStreamResult.getRadixTreeNodes());
                System.out.println("--Groups:" + xinfoStreamResult.getGroups());
                System.out.println("--LastGeneratedId:" + xinfoStreamResult.getLastGeneratedId());
                System.out.println("--FirstEntry:" + xinfoStreamResult.getFirstEntry());
                System.out.println("--LastEntry:" + xinfoStreamResult.getLastEntry());
            }
        } catch (Exception e) {
            throw new RuntimeException("消息队列信息检索失败!",e);
        } finally {
            jedis.close();
        }
    }

}

与Java的集成

可以参见cn.tuling.redis.redismq.StreamVer

消息队列问题

从我们上面对Stream的使用表明,Stream已经具备了一个消息队列的基本要素,生产者API、消费者API,消息Broker,消息的确认机制等等,所以在使用消息中间件中产生的问题,这里一样也会遇到。

Stream 消息太多怎么办?(会限制长度 干掉老信息)

要是消息积累太多,Stream 的链表岂不是很长,内容会不会爆掉?xdel 指令又不会删除消息,它只是给消息做了个标志位。
Redis 自然考虑到了这一点,所以它提供了一个定长 Stream 功能。在 xadd 的指令提供一个定长长度 maxlen,就可以将老的消息干掉,确保最多不超过指定长度。

消息如果忘记 ACK 会怎样?(堆积在PEL中)

Stream 在每个消费者结构中保存了正在处理中的消息 ID 列表 PEL,如果消费者收到了消息处理完了但是没有回复 ack,就会导致 PEL 列表不断增长,如果有很多消费组的话,那么这个 PEL 占用的内存就会放大。所以消息要尽可能的快速消费并确认。

PEL 如何避免消息丢失?

在客户端消费者读取 Stream 消息时,Redis 服务器将消息回复给客户端的过程中,客户端突然断开了连接,消息就丢失了。但是 PEL 里已经保存了发出去的消息 ID。待客户端重新连上之后,可以再次收到 PEL 中的消息 ID 列表。不过此时 xreadgroup 的起始消息 ID 不能为参数>,而必须是任意有效的消息 ID,一般将参数设为 0-0,表示读取所有的 PEL 消息以及自last_delivered_id之后的新消息。

死信问题

如果某个消息,不能被消费者处理,也就是不能被XACK,这是要长时间处于Pending列表中,即使被反复的转移给各个消费者也是如此。此时该消息的delivery counter(通过XPENDING可以查询到)就会累加,当累加到某个我们预设的临界值时,我们就认为是坏消息(也叫死信,DeadLetter,无法投递的消息),由于有了判定条件,我们将坏消息处理掉即可,删除即可。删除一个消息,使用XDEL语法,注意,这个命令并没有删除Pending中的消息,因此查看Pending,消息还会在,可以在执行执行XDEL之后,XACK这个消息标识其处理完毕。

Stream 的高可用

Stream 的高可用是建立主从复制基础上的,它和其它数据结构的复制机制没有区别,也就是说在 Sentinel 和 Cluster 集群环境下 Stream 是可以支持高可用的。不过鉴于 Redis 的指令复制是异步的,在 failover 发生时,Redis 可能会丢失极小部分数据,这点 Redis 的其它数据结构也是一样的。
分区 Partition

分区 Partition

Redis 的服务器没有原生支持分区能力,如果想要使用分区,那就需要分配多个 Stream,然后在客户端使用一定的策略来生产消息到不同的 Stream。

Stream小结

Stream 的消费模型借鉴了 Kafka 的消费分组的概念,它弥补了 Redis Pub/Sub 不能持久化消息的缺陷。但是它又不同于 kafka,Kafka 的消息可以分 partition,而 Stream 不行。如果非要分 parition 的话,得在客户端做,提供不同的 Stream 名称,对消息进行 hash 取模来选择往哪个 Stream 里塞。
总的来说,如果是中小项目和企业,在工作中已经使用了Redis,在业务量不是很大,而又需要消息中间件功能的情况下,可以考虑使用Redis的Stream功能。但是如果并发量很高,资源足够支持下,还是以专业的消息中间件,比如RocketMQ、Kafka等来支持业务更好。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1609475.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

MySQL高级(索引-性能分析-profile)

show profiles 能够在做SQL优化时帮助我们了解时间都耗费到哪去了。通过 have_profiling参数&#xff0c;能够看到当前MySQL 是否支持 profile 操作&#xff1a; select have_profiling 默认 profiling 是关闭的 select profiling; 可以通过 set 语句在 session / global 级…

【python】flask操作数据库工具SQLAlchemy,详细用法和应用实战

✨✨ 欢迎大家来到景天科技苑✨✨ &#x1f388;&#x1f388; 养成好习惯&#xff0c;先赞后看哦~&#x1f388;&#x1f388; &#x1f3c6; 作者简介&#xff1a;景天科技苑 &#x1f3c6;《头衔》&#xff1a;大厂架构师&#xff0c;华为云开发者社区专家博主&#xff0c;…

萌新_1 环境安装(基于QQNT框架 Python Flask)

遇到问题加QQ群聊 群主在线解答 点击加入群聊【星辰开发】 一&#xff1a;安装QQ 目前为开发&#xff0c;推荐都安装到一台电脑上 直接安装到本地windows电脑&#xff0c; 优点方便开发 一键安装 Windows 用户一键安装方案 https://github.com/super1207/install_llob/rel…

从零到一品牌电商私域流量代运营规划方案

【干货资料持续更新&#xff0c;以防走丢】 从零到一品牌电商私域流量代运营规划方案 部分资料预览 资料部分是网络整理&#xff0c;仅供学习参考。 PPT共50页&#xff08;完整资料包含以下内容&#xff09; 目录 私域运营方案&#xff1a; 一、项目背景与目标 - 开创数智化…

kaggle电子邮件分类xgboost建模可视化模型评估混淆矩阵范例

目录 概述 依赖环境 代码解读 库的导入 数据读取 扇形图可视化统计 词云图可视化 分布条形图可视化 数据预处理 划分数据集 模型训练 模型预测和评估 ROC曲线评估 混淆矩阵评估 多维度交叉评估 配套源码和数据集 xgboost邮件分类配套数据集和源码下载地址 概述…

RK3568 学习笔记 : u-boot 千兆网络功能验证

前言 开发板型号&#xff1a; 【正点原子】 的 RK3568 开发板 使用 虚拟机 ubuntu 20.04 编译 RK3568 Linux SDK&#xff0c;生成镜像&#xff0c;烧写后&#xff0c;Linux 系统正常启动 开启后可以使用 CTRLC 进入 u-boot 本篇验证一下 u-boot 下网络功能 【正点原子】 rk…

OpenHarmony 视图缩放组件—subsampling-scale-image-view

简介 深度缩放视图&#xff0c;图像显示&#xff0c;手势平移缩放双击等 效果图&#xff08;旋转、缩放、平移&#xff09; 下载安装 ohpm install ohos/subsampling-scale-image-view OpenHarmony ohpm 环境配置等更多内容&#xff0c;请参考如何安装 OpenHarmony ohpm 包 使…

【Linux C | 多线程编程】线程同步 | 信号量(无名信号量) 及其使用例子

&#x1f601;博客主页&#x1f601;&#xff1a;&#x1f680;https://blog.csdn.net/wkd_007&#x1f680; &#x1f911;博客内容&#x1f911;&#xff1a;&#x1f36d;嵌入式开发、Linux、C语言、C、数据结构、音视频&#x1f36d; &#x1f923;本文内容&#x1f923;&a…

先撸清楚:并发/并行、单线程/多线程、同步/异步

前言 在编码的过程中经常会遇到并发/并行、同步/异步、单线程/多线程等术语&#xff0c;在分析JS setTimeout/Promise之前先把这些概念厘清。 通过本篇文章&#xff0c;你将了解&#xff1a; 并发/并行的概念及区别同步/异步的概念及区别单线程/多线程的概念及区别主线程和子线…

Next App Router(中)

1.定义布局 布局是指多个页面共享的 UI。在导航的时候&#xff0c;布局会保留状态、保持可交互性并且不会重新渲染&#xff0c;比如用来实现后台管理系统的侧边导航栏。 定义一个布局&#xff0c;你需要新建一个名为 layout.js的文件&#xff0c;该文件默认导出一个 React 组…

XUbuntu18.04 源码编译Qt4.5.3的过程

由于新公司很多旧的软件都是基于这个版本做的嵌入式开发。 所以想要自己搭一套基于Linux的非嵌入式开发环境&#xff0c;方便用来调试和编译代码。 这样就可以完成在linux下开发&#xff0c;然后直接嵌入式打包&#xff0c;涉及到界面的部分就不需要上机调试看问题了。 所以…

一个完全用rust写的开源操作系统-Starry

1. Starry Starry是2023年全国大学生计算机系统能力大赛操作系统设计赛-内核实现赛的二等奖作品。Starry是在组件化OS的arceos的基础上&#xff0c;进行二次开发的操作系统内核&#xff0c;使用宏内核架构&#xff0c;能够运行Linux应用的内核。 原始的操作系统大赛的仓库为 …

linux 基础命令docker及防火墙iptables详解

应用场景&#xff1a; web应用自动打包和发布 自动化测试&#xff0c;持续集成、发布 在服务环境中部署后台应用 搭建paaS平台 安装应用 apt install docker.io#kali中 配置docker源&#xff0c;文件位置/etc/docker/daemon.json { "registry-mirrors": [ "h…

原牛角源码(修罗bbs)全站程序打包带数据库备份

原牛角源码(修罗bbs)全站程序打包带数据库备份&#xff0c;牛角源码全站数据全站文件、插件打包分享给大家&#xff0c;有兴趣的可以搭建玩玩&#xff01; conf文件夹中自己配置conf.php里面的数据库链接文件&#xff0c;默认管理账号&#xff1a;admin&#xff0c;密码&#…

【大数据】bigtable,分布式数据库的鼻祖

目录 1.概述 2.数据模型 3.API 4.架构 5.一个完整的读写过程 6.如何查找到要的tablet 7.LSM树 1.概述 本文是作者阅读完bigtable论文后对bigtable进行的一个梳理&#xff0c;只涉及核心概念不涉及具体实操&#xff0c;具体实操会在后续的文章中推出。 GFS的出现虽然解…

上位机图像处理和嵌入式模块部署(树莓派4b实现xmlrpc通信)

【 声明&#xff1a;版权所有&#xff0c;欢迎转载&#xff0c;请勿用于商业用途。 联系信箱&#xff1a;feixiaoxing 163.com】 前面&#xff0c;我们也用纯API实现过上位机和开发板之间的通信。当时使用的方法&#xff0c;就是用windows自带的网络sdk和linux自带的api函数来完…

分享4张亚马逊云科技AWS免费云开发和AI证书(有答案)

今天给大家带来特别福利&#xff0c;一口气带来亚马逊云科技AWS4张免费云开发/AI证书(有Credly徽章&#xff0c;有答案)&#xff0c;这四门都是云开发相关的硬核知识&#xff0c;含金量极高。 主要考察如何用AWS AI服务进行开发、以及当下热门的云原生改造&#xff0c;16道题80…

葡萄书--关系图卷积神经网络

异质图和知识图谱 同质图与异质图 同质图指的是图中的节点类型和关系类型都仅有一种 异质图是指图中的节点类型或关系类型多于一种 知识图谱 知识图谱包含实体和实体之间的关系&#xff0c;并以三元组的形式存储&#xff08;<头实体, 关系, 尾实体>&#xff0c;即异…

IP地址定位:揭秘精准定位的技术与应用

在数字化时代&#xff0c;IP地址已成为连接互联网世界的关键标识之一。但是&#xff0c;很多人对于IP地址的精准定位能力存在疑虑。本文将深入探讨IP地址定位的技术原理以及其在实际应用中的精确度。 IP地址查询&#xff1a;IP数据云 - 免费IP地址查询 - 全球IP地址定位平台 …

Python中的设计模式与最佳实践

&#x1f47d;发现宝藏 前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。【点击进入巨牛的人工智能学习网站】。 Python中的设计模式与最佳实践 在软件开发中&#xff0c;设计模式是一种解决常见问题的经过…