【redis】springboot 用redis stream实现MQ消息队列 考虑异常ack重试场景

news2025/1/23 22:03:58

redis stream是redis5引入的特性,一定程度上借鉴了kafka等MQ的设计,部署的redis版本必须 >= 5

本文主要讲的是思路,结合简单的源码分析(放心,无需深入大量源码);讲述在redis stream文档缺乏,网上资料欠缺,gpt回答不上来的情况下,博主是如何用两三天的时间 从没接触过redis stream 到分析完成了redis stream mq功能 。博主始终认为 有明确的思路 才能知道什么代码是正确的 能复制拿来用,什么代码只是单纯跑起来demo的 绝对达不到生产级别。
本文源自csdn博主:孟秋与你 ,博主虽才疏学浅 却也是在资料极少的情况下 ,辛苦研究源码、整理思路 撰写的本文,转载请声明出处。

文章目录

  • redisTemplate API的熟悉
  • 配置
    • redis mq config
    • 监听器:
    • 定时器
  • 优化方向

(本文基于springboot3.3 jdk17 redis6环境,
理论上springboot2 redis5也是通用教程 可能会有细微的api差异 稍微分析一下源码方法都能处理)

redisTemplate API的熟悉

我们在操作redis的时候 通常是使用spring-data-redis提供的redisTemplate或者jedis 本文以redisTemplate为例。
(实际业务场景可能需要考虑用jedis替换 因为mq通常在数据量、并发量都大的场景;redisTemplate的优势在于和springboot的完美集成,且不需要考虑通过连接池来管理线程安全问题)

用过redisTemplate的同学应该都会自己封装一下工具类,因为redisTemplate封装的不够好,不管怎么样 我们都需要先看看这个类

redisTemplate.opsForHash()redisTemplate.opsForValue()

各位应该很熟悉了, stream是一种新引入的格式,那么我们直接在RedisTemplate类里面搜stream就好了,正常都会有对应API
(没对应API那就是spring版本太老了 spring那个老版本出来的时候 redis还没出到5 )

搜到了opsForStream()方法在这里插入图片描述 继续查看方法 如下图: 在这里插入图片描述

这里说明一下,redis的streamKey就类似mq的topic, group是消费者组,cousumer是消费者,acknowledge即ack 应答机制 告诉mq已经成功消费了,claim是强制将消息转至其它消费者 通常用于消费失败/多次消费失败的场景,pending存放的是未ack的消息 就比如消费某个消息时 出现了异常 没能执行到ack 这些消息就会放在pending list 确保消息不丢失。

通过api,加上我们掌握的mq基本知识,大概就能理解是怎么一回事了。demo搭建不难,但是代码要上生产,我们就必须考虑消息消费失败了怎么办 该如何重试,也就是说重点的api在acknowledge和pending上面。

一个简单的封装

	
	@Component
	public class RedisStreamUtil {
	    @Autowired
	    private RedisTemplate<String, Object> redisTemplate;
	
	    /**
	     * 创建消费组
	     *
	     * @param key   键名称
	     * @param group 组名称
	     * @return {@link String}
	     */
	    public String createGroup(String key, String group) {
	        return redisTemplate.opsForStream().createGroup(key, group);
	    }
	
	    /**
	     * 获取消费者信息
	     *
	     * @param key   键名称
	     * @param group 组名称
	     * @return {@link StreamInfo.XInfoConsumers}
	     */
	    public StreamInfo.XInfoConsumers queryConsumers(String key, String group) {
	        return redisTemplate.opsForStream().consumers(key, group);
	    }
	
	    /**
	     * 查询组信息
	     *
	     * @param key 键名称
	     * @return
	     */
	    public StreamInfo.XInfoGroups queryGroups(String key) {
	        return redisTemplate.opsForStream().groups(key);
	    }
	
	    /**
	     * 添加Map消息
	     * @param key
	     * @param value
	     */
	    public String addMap(String key, Map<String, Object> value) {
	        return redisTemplate.opsForStream().add(key, value).getValue();
	    }
	
	    /**
	     * 读取消息
	     * @param key
	     */
	    public List<MapRecord<String, Object, Object>> read(String key) {
	        return redisTemplate.opsForStream().read(StreamOffset.fromStart(key));
	    }
	
	    /**
	     * 确认消费
	     * @param key
	     * @param group
	     * @param recordIds
	     */
	    public Long ack(String key, String group, String... recordIds) {
	        return redisTemplate.opsForStream().acknowledge(key, group, recordIds);
	    }
	
	    /**
	     * 删除消息
	     * 当一个节点的所有消息都被删除,那么该节点会自动销毁
	     * @param key
	     * @param recordIds
	     */
	    public Long del(String key, String... recordIds) {
	        return redisTemplate.opsForStream().delete(key, recordIds);
	    }
	
	    /**
	     *  判断是否存在key
	     * @param key
	     */
	    public boolean hasKey(String key) {
	        Boolean flag= redisTemplate.hasKey(key);
	        return flag != null && flag;
	    }
	
	}
	

注意:会有循环依赖的问题,如果没有那就是springboot版本太低,低版本默认是开启允许循环依赖的,高版本默认不允许(2.7已经不允许了 具体版本不记得了)

解决方法1: 在yml配置里面允许循环依赖

server:
  port: 8586

spring:
  application:
    name: springboot3-demo
  data:
    redis:
      port: 6579
      host: 192.168.1.1
      password: xxxxxxx
      database: 1
      lettuce:
        pool:
          max-wait: 5000ms
          max-active: 1000

  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://localhost:3306/test?characterEncoding=utf8&serverTimezone=UTC&rewriteBatchedStatements=true
    type: com.alibaba.druid.pool.DruidDataSource
    username: root
    password: root
# 允许循环依赖
  main:
    allow-circular-references: true

解决方法2:该工具类不交给spring托管 代码如下图所示
在spring bean初始化的时候 把redisTemplate bean赋值到工具类即可,工具类方法变成静态方法
在这里插入图片描述

配置

redis mq config

以下代码展示了如何配置多个生产者,也是这个代码最难写。


package com.qiuhuanhen.springboot3demo.redis.config;

import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.StrUtil;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.qiuhuanhen.springboot3demo.redis.RedisStreamUtil;
import com.qiuhuanhen.springboot3demo.redis.consumer.RedisConsumer;
import com.qiuhuanhen.springboot3demo.redis.consumer.listener.RedisConsumersListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.RedisServerCommands;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;

import javax.annotation.Resource;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ThreadPoolExecutor;

@Configuration
@Slf4j
public class RedisConfig{

    @Autowired
    private RedisStreamUtil redisStreamUtil;

    @Autowired
    private ThreadPoolExecutor threadPoolExecutor;

    @Autowired
    private Map<String, RedisConsumer> redisConsumer;


    /**
     * redis序列化
     *
     * @param redisConnectionFactory
     * @return {@code RedisTemplate<String, Object>}
     */
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(redisConnectionFactory);
        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        om.activateDefaultTyping(om.getPolymorphicTypeValidator(), ObjectMapper.DefaultTyping.NON_FINAL);
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(om,Object.class);
        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
        template.setKeySerializer(stringRedisSerializer);
        template.setHashKeySerializer(stringRedisSerializer);
        template.setValueSerializer(jackson2JsonRedisSerializer);
        template.setHashValueSerializer(jackson2JsonRedisSerializer);
        template.afterPropertiesSet();
        return template;
    }

    @Bean
    public List<Subscription> subscriptions(RedisConnectionFactory factory) {
        List<Subscription> subscriptions = new ArrayList<>();
        subscriptions.add( createSubscription(factory, "orderStream", "orderGroup", "orderConsumer"));
        subscriptions.add( createSubscription(factory, "productStream", "productGroup", "productConsumer"));
        return subscriptions;
    }

    /**
     * @param factory
     * @param streamName   类似 topic
     * @param groupName    消费组是 Redis Streams 中的一个重要特性,它允许多个消费者协作消费同一个流中的消息。每个消费组可以有多个消费者。
     * @param consumerName 这是消费组中的具体消费者名称。每个消费者会从消费组中领取消息进行处理。
     * @return
     */
    private Subscription createSubscription(RedisConnectionFactory factory, String streamName, String groupName, String consumerName) {

        initStream(streamName, groupName);

        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options =
                StreamMessageListenerContainer
                        .StreamMessageListenerContainerOptions
                        .builder()
                        // 每次从Redis Stream中读取消息的最大条数 (32为rocketmq的pullBatchSize默认数量)
                        .batchSize(32)
                        .executor(threadPoolExecutor)
                        // 轮询拉取消息的时间 (如果流中没有消息,它会等待这么久的时间,然后再次检查。)
                        .pollTimeout(Duration.ofSeconds(1))
                        .errorHandler(throwable -> {
                            log.error("[redis MQ handler exception]", throwable);
                            throwable.printStackTrace();
                        })
                        .build();

        var listenerContainer = StreamMessageListenerContainer.create(factory, options);

        // 手动ask消息
//        Subscription subscription = listenerContainer.receive(Consumer.from(groupName, consumerName),
//                //  创建一个流的偏移量实例。 含义: 指定从哪个偏移量开始读取消息。ReadOffset.lastConsumed()表示从上次消费的位置开始。
//                StreamOffset.create(streamName, ReadOffset.lastConsumed()), redisConsumersListener);

        // 自动ask消息
//            Subscription subscription = listenerContainer.receiveAutoAck(Consumer.from(groupName, consumerName),
//                    StreamOffset.create(streamName, ReadOffset.lastConsumed()), redisConsumersListener);

        // 手动创建 核心在于 cancelOnError(t -> false)  出现异常不退出
        StreamMessageListenerContainer.ConsumerStreamReadRequest<String> build = StreamMessageListenerContainer.StreamReadRequest.builder(StreamOffset.create(streamName, ReadOffset.lastConsumed()))
                .consumer(Consumer.from(groupName, consumerName))
                .autoAcknowledge(false)
                // 重要!
                .cancelOnError(t -> false).build();

        Subscription subscription = listenerContainer.register(build, new RedisConsumersListener(redisStreamUtil));
        listenerContainer.start();
        return subscription;
    }

    /**
     * 初始化流 保证stream流程是正常的
     *
     * @param key
     * @param group
     */
    private void initStream(String key, String group) {
        boolean hasKey = redisStreamUtil.hasKey(key);
        if (!hasKey) {
            Map<String, Object> map = new HashMap<>(1);
            map.put("field", "value");
            //创建主题
            String result = redisStreamUtil.addMap(key, map);
            //创建消费组
            redisStreamUtil.createGroup(key, group);
            //将初始化的值删除掉
            redisStreamUtil.del(key, result);
            log.info("stream:{}-group:{} initialize success", key, group);
        }
    }


    /**
     * 校验 Redis 版本号,是否满足最低的版本号要求!
     */
    private static void checkRedisVersion(RedisTemplate<String, ?> redisTemplate) {
        // 获得 Redis 版本
        Properties info = redisTemplate.execute((RedisCallback<Properties>) RedisServerCommands::info);
        String version = MapUtil.getStr(info, "redis_version");
        // 校验最低版本必须大于等于 5.0.0
        int majorVersion = Integer.parseInt(StrUtil.subBefore(version, '.', false));
        if (majorVersion < 5) {
            throw new IllegalStateException(StrUtil.format("您当前的 Redis 版本为 {},小于最低要求的 5.0.0 版本!", version));
        }
    }
}


监听器:

(核心是实现StreamListener接口)

@Slf4j
public class RedisConsumersListener implements StreamListener<String, MapRecord<String, String, String>> {

    private RedisStreamUtil redisStreamUtil;

    public RedisConsumersListener(RedisStreamUtil redisStreamUtil) {
        this.redisStreamUtil = redisStreamUtil;
    }

    /**
     * 监听器
     *
     * @param message
     */
    @Override
    public void onMessage(MapRecord<String, String, String> message) {
        // stream的key值
        String streamName = message.getStream();
        //消息ID
        RecordId recordId = message.getId();
        //消息内容
        Map<String, String> msg = message.getValue();
			// do something 处理 (这里一般是通过设计模式获取实现类方法 统一处理)
       
            //逻辑处理完成后,ack消息,删除消息,group为消费组名称
            StreamInfo.XInfoGroups xInfoGroups = redisStreamUtil.queryGroups(streamName);
            xInfoGroups.forEach(xInfoGroup -> redisStreamUtil.ack(streamName, xInfoGroup.groupName(), recordId.getValue()));
            redisStreamUtil.del(streamName, recordId.getValue());
        }
        log.info("【streamName】= " + streamName + ",【recordId】= " + recordId + ",【msg】=" + msg);

    }
}



感兴趣可以看博主踩到的坑, 看完思路才能自行判断 代码是否能直接复制使用 (个人感觉这才是分析技术最精彩的地方 有正确的思路才能在使用新技术时披荆斩棘); 不感兴趣可以直接跳到下一目录

===== ====== ====== 踩坑start ===== ==== ===== =====
一开始使用的是receive方法 (被注释的部分)

        // 手动ask消息
//        Subscription subscription = listenerContainer.receive(Consumer.from(groupName, consumerName),
//                //  创建一个流的偏移量实例。 含义: 指定从哪个偏移量开始读取消息。ReadOffset.lastConsumed()表示从上次消费的位置开始。
//                StreamOffset.create(streamName, ReadOffset.lastConsumed()), redisConsumersListener);

这也是网上使用最多的方法,通过方法名我们可以判断出 receiveAutoAck是会自动ack的,不出异常还好,那如果出现异常呢 如何ack? 所以我们肯定是要手动控制的。
在这里插入图片描述
我们可以看看源码 它们的差异:
在这里插入图片描述
是的,就是一个是否自动ack的差别。

既然引入了消息队列,那说明数据量是比较大的,所以肯定是需要考虑异常情况下 消息不能丢失的,于是博主在消费时,故意编写了异常模拟不触发ack的场景. 结果发现 一旦消费出现异常 没有ack时,pending list不再新增数据,在项目重启后数据又增加了,但是再次消息异常时 pending list又阻塞了,这种现象非常奇怪! 难道一个消息没ack redis stream就阻塞吗?这显然不符合设计。 反复思考后,看起来像是出现异常后就停止了轮询,这个mq就像极了是一次性的。
但是和轮询相关的 也就一个pollTimeout参数,它能掀起多大的火花呢?

于是继续看代码 配置redis mq时,都有哪些api. 使用receive方法后 返回的是一个Subscription ,Subscription类有isActive()方法 ,于是在定时器中打印subsciption.isActive() 发现它竟然为false

于是我们追踪这个方法:
在这里插入图片描述
追踪到了StreamPollTask类
在这里插入图片描述
如果是task类 那么应该会有run方法 ,我们直接在里面搜run()

在这里插入图片描述
run方法里面主要就这两个方法
this.pollState.running();
this.doLoop();
第一个running方法 一眼看到头,没什么东西 ;我们看doLoop() 这个方法看起来是循环执行,如果任务中断了 说明是loop出问题了
在这里插入图片描述
里面有行代码:

    if (this.cancelSubscriptionOnError.test(ex)) {
                    this.cancel();
                }

也就是说在cancelSubscriptionOnError.test为true的时候 会取消执行
在这里插入图片描述

还记得isActive()方法吗 它正是去判断该状态的.

通过构造方法 可以看出 该参数是StreamMessageListenerContainer.StreamReadRequest streamRequest 传进来的
在这里插入图片描述

StreamMessageListenerContainer.StreamReadRequest在我们查看listenerContainer.receive源码时 有过一面之缘:
在这里插入图片描述

我们再看看StreamReadRequest.builder出来的StreamReadRequestBuilder类:
在这里插入图片描述
至此,分析完成了闭环,因为receive方法创建出来 默认是遇到异常就取消执行 这明显不符合实际使用,这个设计个人感觉非常欠佳。

这便是为什么使用以下代码来创建的原因

     StreamMessageListenerContainer.ConsumerStreamReadRequest<String> build = StreamMessageListenerContainer.StreamReadRequest.builder(StreamOffset.create(streamName, ReadOffset.lastConsumed()))
                .consumer(Consumer.from(groupName, consumerName))
                .autoAcknowledge(false)
                // 重要!
                .cancelOnError(t -> false).build();

===== ====== ====== 踩坑end ===== ==== ===== =====

定时器

代码比较乱 注释代码比较多的原因 不是因为瞎写,而是那些api 在实际业务中可能会使用到,所以特地写在下面了


  // 定期处理 pending list 中的消息
    @Scheduled(cron = "0/20 * * * * ?")
    public void processPendingMessages() {
        String streamKey = "orderStream"; // Redis Stream 的键
        String groupName = "orderGroup";  // 消费者组的名称
        String consumerName = "orderConsumer"; // 当前消费者的名称

        for (Subscription each : subscription) {
            System.out.println(each.isActive());
        }
        StreamOperations<String, String, String> streamOps = redisTemplate.opsForStream();

        // 获取 pending list 中未确认的消息概要
        PendingMessagesSummary pendingSummary = streamOps.pending(streamKey, groupName);

        // 所有pending消息的数量
        long totalPendingMessages = pendingSummary.getTotalPendingMessages();

        if (pendingSummary.getTotalPendingMessages() == 0L) {
            return;
        }

        // 消费组名称
        String groupName1 = pendingSummary.getGroupName();

        // pending队列中的最小ID
        String minMessageId = pendingSummary.minMessageId();

        // pending队列中的最大ID
        String maxMessageId = pendingSummary.maxMessageId();

        if (pendingSummary.getTotalPendingMessages() > 0) {
            // 读取消费者pending队列的前10条记录,从ID=0的记录开始,一直到ID最大值
//            PendingMessages pendingMessages = streamOps.pending(streamKey, Consumer.from(groupName, consumerName), Range.closed("0", "+"), 10);

            // 获取 pending list 中具体的消息
            PendingMessages pendingMessages = streamOps.pending(streamKey, groupName, Range.unbounded(), 10000);
            int size = pendingMessages.size();
            // 获取当前批次的消息
            PendingMessage currentBatchMin = pendingMessages.get(0);
            PendingMessage currentBatchMax = pendingMessages.get(size-1);

            pendingMessages.forEach(
                    pendingMessage ->
                    {

                        // 消息被获取的次数 可以根据次数做不同业务 超过一定次数未消费 考虑是否要ack并del
                        long deliveryCount = pendingMessage.getTotalDeliveryCount();

                        // 读取每个未确认的消息
//                List<MapRecord<String,String,String>> messages = streamOps.read(
//                        StreamReadOptions.empty(),
//                        StreamOffset.create(streamKey,ReadOffset.lastConsumed())
                        StreamOffset.create(streamKey,ReadOffset.from("0"))
//                );

                        List<MapRecord<String, String, String>> messages = streamOps.range(streamKey, Range.closed(currentBatchMin.getId().toString(), currentBatchMax.getId().toString()), Limit.limit().count(10000));

                        for (MapRecord<String, String, String> message : messages) {
                            try {
                                // 处理消息
                                processMessage(message);
                                // 成功处理后确认消息
                                streamOps.acknowledge(streamKey, groupName, message.getId());
                                streamOps.delete(streamKey, message.getId());
                            } catch (Exception e) {
                                // 处理异常情况
                                e.printStackTrace();
                            }
                        }

                    }
            );
        }
    }

至于如何触发就比较简单了,往redis添加一个streamKey即可

   @GetMapping("/stream")
    public String testStream() {

        String mystream = "";
        for (int i = 0; i < 10; i++) {
            Oper oper = new Oper();
            oper.setTestId(11111111L);
            oper.setTestDesc("订单消息队列");
            oper.setVersion(i);
            oper.setTestXxx(LocalDateTime.now().toString());
            Map<String, Object> map = new HashMap<>();
            map.put("oper", oper);
            try {
                Thread.sleep(10);
                mystream = redisStreamUtil.addMap("orderStream", map);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }

        return String.valueOf(mystream);
    }

优化方向

  1. 建立一个消费者抽象类,定义消费方法

  2. 建议一个降级处理抽象类,定义补偿方法(即消费失败时的处理)

  3. 定义spring的properties类 把生产者消费者字段写到里面

  4. redis需要部署集群,可在博主的主页搜索哨兵,有哨兵架构教程。

  5. 实际业务中,消费消息很可能是存入数据库,在入库完成之后 redis ack完成之前,如果这一瞬间突然宕机了,而数据量又非常大,可能会导致消费重复的情况,因为没有完成ack 下次还是会把该数据从pending list里面取出来。

    解决方案1 :考虑是加redisson锁
    解决方案2:数据库存入消息id字段并建立唯一索引
    (唯一索引的魅力体现出来了)

至此,一份生产级别的redis stream mq架构成立。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1985658.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

港科夜闻 | 香港科大,中大,港大及国大获旭日慈善基金捐款港币五千万元,支持基础数学研究及人才发展...

关注并星标 每周阅读港科夜闻 建立新视野 开启新思维 1、香港科大、中大、港大及国大获旭日慈善基金捐款港币五千万元&#xff0c;支持基础数学研究及人才发展。香港科大校长叶玉如教授在会上代表四所大学&#xff0c;就旭日慈善基金会对推动高等教育及基础研究发展的慷慨支持&…

探索智谱AI的视频生成神器:CogVideoX完全指南

引言 在当今数字化和内容创作高度发达的时代&#xff0c;视频已经成为信息传播和营销的重要工具。然而&#xff0c;对于许多缺乏视频制作经验或资源的个人和企业而言&#xff0c;如何快速、高效地创建吸引人的视频仍然是一个挑战。智谱AI推出的CogVideoX&#xff0c;作为一款先…

PuerTS和HybridCLR哪个更适合开发微信小游戏

1&#xff09;PuerTS和HybridCLR哪个更适合开发微信小游戏 2&#xff09;使用了Play Asset Delivery提交版本被Google报错 3&#xff09;怎样设置normalize来改变摄像机位置 4&#xff09;如何禁用增强型输入法中除某些输入操作之外的输入操作 这是第397篇UWA技术知识分享的推送…

CodeWave常用功能

1、CodeWave添加H5或PC端 CodeWave在左侧侧边栏&#xff0c;可通过“”按钮&#xff0c;直接添加PC端或H5端&#xff0c;或添加页面。 2、修改主题颜色 CodeWave左侧栏对应端的更多按钮中&#xff0c;可对权限及主题色进行修改。 在主题样式修改页面&#xff0c;右侧提供了预…

视频监控汇聚平台LntonCVS视频监控管理平台解决方案和常见的接入方式

一、视频融合平台 LntonCVS是一款支持多种协议和设备接入的视频汇聚流媒体平台。它能够统一管理和整合不同品牌、不同协议的视频资源&#xff0c;构建视频数据资源池&#xff0c;并通过视频资源目录为各类业务场景提供丰富、实时、高清的视频资源。 二、接入方式 1. 前端设备…

成都跃享未来教育咨询抖音小店共绘未来发展

在数字经济的浪潮中&#xff0c;教育行业正经历着前所未有的变革与升级。成都&#xff0c;这座历史悠久而又充满活力的城市&#xff0c;正以其独特的地理位置、深厚的文化底蕴和前瞻性的发展战略&#xff0c;孕育着教育创新的新篇章。其中&#xff0c;成都跃享未来教育咨询抖音…

水域救援设备,保护水域安全_鼎跃安全

季作为一年中最炎热的季节&#xff0c;不仅带来了难耐的高温&#xff0c;也悄然间加剧了水域安全问题的严峻性。这一时期&#xff0c;正值学生群体享受悠长暑假的宝贵时光&#xff0c;他们往往倾向于寻找清凉之地以解酷暑&#xff0c;水域因此成为了不少学生的首选之地。然而&a…

Linux(CentOS)环境搭建Gitea做私有的git服务器

基本分三大步骤&#xff0c;1.安装Gitea&#xff0c;2.安装MySQL&#xff08;或者SQlite等其中一款数据库&#xff09;3.安装Git 一.Gitea Gitea文档地址&#xff1a;文档 - Docs (gitea.io) Gitea的官网&#xff1a;https://gitea.io Gitea最新版本的下载地址&#xff1a;…

【一竞技CS2】Twistzz秋季小组赛rating最高指挥

1、BLAST秋季小组赛于刚刚落下帷幕&#xff0c;数据统计显示&#xff0c;Liquid战队选手Twistzz是秋季小组赛里Rating最高指挥。 2、HLTV发布本周最新世界排名。TOP10战队方面凭借着在小组赛双杀NAVI头名晋级&#xff0c;Liquid战队新阵容一跃进入TOP10的行列&#xff0c;目前位…

湖北职称评审条件是什么?

其实湖北职称评审&#xff0c;要求很多&#xff0c;具体是根据评审专业大类来划分的&#xff0c;不同的专业要求略微有不同&#xff0c;主要是表现在相应的资料准备上&#xff0c;那么职称具体的有哪些要求&#xff1f; 别老听别人说湖北职称申报要求是什么&#xff0c;甘建二告…

基于R语言生物信息学大数据分析与绘图

随着高通量测序以及生物信息学的发展&#xff0c;R语言在生物大数据分析以及数据挖掘中发挥着越来越重要的作用。想要成为一名优秀的生物数据分析者与科研团队不可或缺的人才&#xff0c;除了掌握对生物大数据挖掘与分析技能之外&#xff0c;还要具备一定的统计分析能力与SCI论…

文本加密工具类-支持MD5、SHA1、SHA256、SHA224、SHA512、SHA384、SHA3、RIPMD160算法

文本加密工具类 1.算法简介1.1 MD51.2 SHA-11.3 SHA-2&#xff08;推荐使用&#xff09;1.4 SHA-3&#xff08;推荐使用&#xff09;1.5 RIPEMD-160 2.工具类案例2.1POM导入2.2代码编写2.3 输出示例 1.算法简介 1.1 MD5 MD5 (Message-Digest Algorithm 5) 描述&#xff1a;M…

一文理清生产管理的“4管”和“8理”!

一提到生产管理&#xff0c;很多人的第一反应可能是车间里忙碌的身影、流水线上飞速运转的机器&#xff0c;还有一张张密密麻麻的生产计划表。但实际上&#xff0c;生产管理远不止于此。 “科学管理之父”弗雷德里克温斯洛泰勒认为&#xff1a;管理就是确切地知道你要别人干什…

【Python】数据类型之元组

列表&#xff08;list&#xff09;是一个有序且可变的容器&#xff0c;在里面可以存放多个不同类型的元素。 元组&#xff08;tuple&#xff09;是一个有序且不可变的容器&#xff0c;在里面可以存放多个不同类型的元素。 1、定义 元组中的元素与元素之间用逗号相隔&#xf…

mybatis插件代码生成。

mybatis插件代码生成。 第一步连接数据库&#xff1a;第二步&#xff0c;选择数据库表&#xff1a;第三步&#xff0c;进行配置选择第四步、就生成了有关于表的实体类和其他的表数据。 第一步连接数据库&#xff1a; 在右边&#xff0c;拉出数据库的操作栏 输入用户名密码&am…

虚拟机Windows10系统安装QEMU

文章目录 1. QEMU安装1.1 安装准备1.1.1 安装平台1.1.2 软件下载 1.2 安装QEMU1.2.1 找到下载的QEMU软件&#xff0c;双击开始安装1.2.2 设置语言1.2.3 安装向导&#xff0c;点击 Next1.2.4 点击“I Agree”1.2.5 点击Next1.2.6 设置软件安装位置1.2.7 点击 finish1.2.8 编辑系…

Wi-Fi 7信号标志着行业新的关注重点:稳定性

多链路操作和 6GHz 频段保证了比以往更高的可靠性 Wi-Fi 无疑是我们生活中一项广泛使用的成功技术,但它仍存在一些缺陷,如服务质量不稳定、网速较慢或网络总是中断等问题,给人一种可靠性不佳的印象。 随着 Wi-Fi 7 在今年问世,这一代 Wi-Fi 的重点将转向改善其可靠性。以往每…

【C++】STL | vector 详解及重要函数的实现

目录 前言 总代码 vector类框架建立&#xff08;模板与成员变量&#xff09; 构造、析构、swap 与 赋值重载 构造 析构 swap 赋值重载 reserve 扩容&#xff08;重要&#xff01;&#xff01;&#xff09;、size、capacity operator[ ]重载 insert 插入 逻辑讲解 i…

手撸高性能日志系统(一):百万日志,秒秒落盘(小试牛刀篇)

一、需求一丢&#xff0c;谁累成狗 最近由于某些需要&#xff0c;计划手撸一个高性能的日志系统。需求很简单&#xff1a; 1、 不允许丢一条日志信息&#xff08;很重要很重要&#xff09; 2、支持多线程&#xff0c;必须线程安全 3、性能要越优越好&#xff0c;尽量百万可秒级…

【逗老师的无线电】QRZ快速得到Incoming请求的准确QSO时间

各位友台&#xff0c;有没有遇到过别人从QRZ发过来了Incoming的QSO请求&#xff0c;但是我完全不记得QSO的时间和波段&#xff0c;盲猜要猜好久。尤其是下面这种&#xff0c;8月份发来的6月份的通联记录&#xff0c;这我天天FT8&#xff0c;上哪翻当天的记录啊&#xff08;大概…