【2023】redis-stream配合spring的data-redis详细使用

news2024/11/19 8:22:09

目录

  • 一、简介
  • 1、介绍
  • 2、对比
  • 二、整合spring的data-redis实现
  • 1、使用依赖
  • 2、配置类
  • 2.1、配置RedisTemplate bean
  • 2.2、异常类
  • 3、实体类
  • 3.1、User
  • 3.2、Book
  • 4、发送消息
  • 4.1、RedisStreamUtil工具类
  • 4.2、通过延时队列线程池模拟发送消息
  • 4.3、通过http主动发送消息
  • 5、🌟消息接收
  • 5.1、不绑定消费组---可以实现广播📢效果
  • 方式1:主动读取
  • 测试日志
  • 🍉方式2:通过监听器监听是否有新消息
  • 5.2、指定消费组----实现一个组内只有一个成员可以接收到
  • 5.2.1、配置类
  • 5.2.2、监听器
  • 通过延时队列发送到消息---测试结果:
  • 通过http发送User到子类Book对象数据测试结果
  • 三、完整代码
  • 四、引用

背景:
使用该方式实现,主要是因为项目中有个地方刚好需要异步来实现,而项目又没有配置专业的消息中间件,并且使用的也不是太频繁,就觉得没必要专门安装一个MQ服务了,直接通过现有的redis的stream来实现异步消息接收直接具体的业务逻辑。

一、简介

1、介绍

Redis Stream(Redis Streams)是Redis 5.0版本引入的一种数据结构,用于处理时间序列数据、消息队列和日志流。它提供了高吞吐量、持久性、有序、可扩展的消息传递解决方案。Redis Stream 结构是对传统发布/订阅模式的增强,使你能够更灵活地处理数据流,并提供了以下主要特性:

  1. 多生产者和多消费者:多个生产者可以同时向 Stream 中写入消息,而多个消费者可以独立订阅并消费消息。每个消费者可以有不同的消费速率。

  2. 消费组:Redis Stream引入了消费者组的概念,多个消费者可以加入同一个消费者组并共同消费消息,这确保了消息在消费时不会被多次处理。

  3. 消费者阻塞:消费者可以使用 XREADGROUP 命令以阻塞方式获取消息,只有当有新消息到达时才会被推送给消费者。

  4. 消费者自动确认:Redis Stream 支持自动确认消息,消费者可以告诉 Redis 何时确认已经成功处理了一条消息。

  5. 多 Stream 支持:你可以创建多个 Stream 来存储不同种类的数据,并分别处理它们。

  6. 有序性:消息在 Stream 中按照消息的时间戳有序存储,因此你可以按照消息的顺序读取数据。

  7. 持久性存储:Redis Stream 使用内存数据结构,但也支持将数据异步保存到磁盘,以确保数据不会丢失。

2、对比

对比redis的其他几种实现方式来说功能更加全面,支持可持久化和通过ack确认的方式基本实现了消息丢失的问题,当然对比专业的消息队列中间件来说还是有些不足的。
需要看详细对比可以看 🔗redis队列对比 这篇文章

在这里插入图片描述

二、整合spring的data-redis实现

1、使用依赖

<dependencies>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
  </dependency>
  <dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
    <version>2.11.1</version>
  </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
          </dependencies>

2、配置类

2.1、配置RedisTemplate bean

重点是下面这一句,不能用json的序列化类,否则会序列化失败
redisTemplate.setHashValueSerializer(RedisSerializer.string());

    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) {
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(connectionFactory);
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setValueSerializer(new StringRedisSerializer());
        redisTemplate.setHashKeySerializer(new StringRedisSerializer());
        // 这个地方不可使用 json 序列化,如果使用的是ObjectRecord传输对象时,可能会有问题,会出现一个 java.lang.IllegalArgumentException: Value must not be null! 错误
        redisTemplate.setHashValueSerializer(RedisSerializer.string());
        return redisTemplate;
    }

2.2、异常类

@Slf4j
public class CustomErrorHandler implements ErrorHandler {
    @Override
    public void handleError(Throwable throwable) {
        log.error("发生了异常",throwable);
    }
}

3、实体类

该地方使用了两个实体类,主要是用于测试,如果不是指定的同一个类型时,指定的是父类的类型,是否可以正常反序列化接收消息

3.1、User

@Data
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class Book extends User{
    private String title;
    private String author;
}

3.2、Book

@Data
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class User implements Serializable {
    private String name;
    private Integer age;
}

4、发送消息

发送消息主要是通过redisTemplate.opsForStream().add(record);进行发送到redis中(接收时会分两种方式接收,看后续!)

4.1、RedisStreamUtil工具类

用于实现消息发送、初始化组、key绑定组、清除消费了的消息等方法

  • 在第一次发送消息时需要先绑定接收的组和可key,否则在接收时会报不存在该组的异常
  • 发送消息后,需要把该条消费了的消息清除掉,否则会一直保持在stream中
@Component
@Slf4j
public class RedisStreamUtil {
    public static final String STREAM_KEY_001 = "stream-001";
    @Resource
    private RedisTemplate<String,Object> redisTemplate;


    /**
     * 添加记录到流中
     * @param streamKey
     * @param t
     * @param <T>
     */
    public <T> RecordId add(String streamKey,T t){
        ObjectRecord<String, T> record = StreamRecords.newRecord()
                .in(streamKey)  //key
                .ofObject(t) //消息数据
                .withId(RecordId.autoGenerate());
//        发送消息
        RecordId recordId = redisTemplate.opsForStream().add(record);

        log.info("添加成功,返回的record-id[{}]",recordId);


        return recordId;
    }

    /**
     * 用来创建绑定流和组
     */
    public void addGroup(String key, String groupName){
        redisTemplate.opsForStream().createGroup(key,groupName);
    }
    /**
     * 用来判断key是否存在
     */
    public boolean hasKey(String key){
        if(key==null){
            return false;
        }else{
            return redisTemplate.hasKey(key);
        }

    }
    /**
     * 用来删除掉消费了的消息
     */
    public void delField(String key,RecordId recordIds){
        redisTemplate.opsForStream().delete(key,recordIds);
    }

    /**
     * 用来初始化 实现绑定key和消费组
     */
    public void initStream(String key, String group){
        //判断key是否存在,如果不存在则创建
        boolean hasKey = hasKey(key);
        if(!hasKey){
            Map<String,Object> map = new HashMap<>();
            map.put("key","value");
            RecordId recordId = add(key, map);
            addGroup(key,group);   //第一次初始化时需要把Stream和group绑定,
            delField(key,recordId);  //清除掉该条无用数据
            log.info("stream:{}-group:{} initialize success",key,group);
        }
    }


    public String getStreamKey001(){
        return STREAM_KEY_001;
    }
}

4.2、通过延时队列线程池模拟发送消息

  • 该方法里通过模拟延时5秒后,每隔3秒发送一条数据,发送10条后关闭线程池
/**
 * 在spring初始化时执行,定时发送消息到stream中,用于模拟发送消息
 */
//@Component
public class StreamMessageRunner implements ApplicationRunner {

    @Resource
    private RedisStreamUtil redisStreamUtil;


    @Override
    public void run(ApplicationArguments args) throws Exception {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);

        AtomicInteger integer = new AtomicInteger(0);
        //使用延时队列线程池模拟发送数据消息
        pool.scheduleAtFixedRate(()->{
            User zhangsan = new User("zhangsan"+integer.get(), 1 + integer.get());
            RecordId recordId = redisStreamUtil.add(redisStreamUtil.getStreamKey001(), zhangsan);
            integer.getAndIncrement();
//需要把消费了的消息清除掉,否则会一直保持在stream中,会被重复消费
            redisStreamUtil.delField(redisStreamUtil.getStreamKey001(),recordId);
            if (integer.get()>10){
                System.out.println("---------退出发送消息--------");
                pool.shutdown();
            }
        },5,3, TimeUnit.SECONDS);
    }
}

4.3、通过http主动发送消息

  • 通过分别发送父类和子类比对查看不同效果
@RestController
@RequestMapping("/index")
public class index {
    @Resource
    private RedisStreamUtil redisStreamUtil;

   /**
    * 父类
    */
    @GetMapping("/login")
    public String login(User user){

        RecordId recordId = redisStreamUtil.add(redisStreamUtil.getStreamKey001(), user);
        redisStreamUtil.delField(redisStreamUtil.getStreamKey001(),recordId);

        return "成功!";
    }
    /**
    * 子类
    */
    @GetMapping("/login2")
    public String login(Book book){

        RecordId recordId = redisStreamUtil.add(redisStreamUtil.getStreamKey001(), book);

        redisStreamUtil.delField(redisStreamUtil.getStreamKey001(),recordId);
        return "成功!";
    }
}

5、🌟消息接收

5.1、不绑定消费组---可以实现广播📢效果

节点消费者不绑定消费组,直接和stream进行绑定,即可实现广播的效果,每次有消息发送到该指定节点的stream,都可以接收到。

如下图:有消息发送到redis Stream 里面绑定的A0到B2全部可以接收到这条消息

在这里插入图片描述

方式1:主动读取

通过redisTemplate.opsForStream().read()方法主动去stream中读取消息

/**
 * 独立消费者---可以读取到该key的全部消息
 */
@Component
@Slf4j
public class XreadNonBlockConsumer01 implements InitializingBean, DisposableBean {
    private ThreadPoolExecutor threadPoolExecutor;
    @Resource
    private RedisTemplate<String,Object> redisTemplate;
    private volatile boolean stop = false;


    /**
     * 初始化bean时执行,以轮询的方式主动去stream的指定key里读取消息
     * @throws Exception
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        // 初始化线程池
        threadPoolExecutor = new ThreadPoolExecutor(3, 5, 0, TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(), r -> {
            Thread thread = new Thread(r);
            thread.setDaemon(true);
            thread.setName("xread-nonblock-01");
            return thread;
        });

        StreamReadOptions options = StreamReadOptions.empty()
//                如果没有数据,则阻塞1s,阻塞时间需要小于·spring.redis.timeout·
                .block(Duration.ofMillis(1000))
//                一直阻塞直到获取数据,可能会报超时异常
//                .block(Duration.ofMillis(0))
//                一次获取10条数据
                .count(10);

        StringBuilder readBuilder = new StringBuilder("0-0");

        threadPoolExecutor.execute(()->{
            while (!stop){
                //主动到redis的stream中去读取,options设置了每读取一次阻塞一秒
                List<ObjectRecord<String, User>> objectRecords = redisTemplate.opsForStream()
                        .read(User.class, options,
                                StreamOffset.create(RedisStreamUtil.STREAM_KEY_001, ReadOffset.from(readBuilder.toString())));

                if (CollectionUtils.isEmpty(objectRecords)){
                    log.warn("没有读取到数据");
                    continue;
                }
                objectRecords.stream().forEach(objectRecord->{
                    log.info("获取到的数据信息 id:[{}] book:[{}]", objectRecord.getId(), objectRecord.getValue());
                    readBuilder.setLength(0);
                    readBuilder.append(objectRecord.getId());

                });
            }
        });


    }

    /**
     * 在销毁bean时把线程池关闭
     * @throws Exception
     */
    @Override
    public void destroy() throws Exception {
        stop = true;
        threadPoolExecutor.shutdown();
        threadPoolExecutor.awaitTermination(3,TimeUnit.SECONDS);
    }

}
测试日志

在这里插入图片描述

🍉方式2:通过监听器监听是否有新消息

具体代码和分组的是一样的,只不过不指定组而已,就合并在下面写了
主要通过StreamMessageListenerContainer这个监听器类实现。

主要通过下面这一句:

container.receive(StreamOffset.fromStart(RedisStreamUtil.STREAM_KEY_001), new MonitorStreamListener("独立消费", null, null));

5.2、指定消费组----实现一个组内只有一个成员可以接收到

进行分组之后,一个组内,只会有一个成员可以读到消息,具体如下图,当然在使用时也可以绑定多个组,每个组接收不听的消息。下面方式就相当于mq了,交换机,队列和路由键的关系

在这里插入图片描述

5.2.1、配置类

下面代码具体流程时先创建一个线程池;然后在配置消息监听容器,最后在把用于接收消息的监听器放入到监听容器中去,最后把这个侦听容器注入到bean去

@Configuration
public class RedisStreamConfiguration {

    @Resource
    private RedisStreamUtil redisStreamUtil;
    @Resource
    private RedisConnectionFactory redisConnectionFactory;

    @Bean(initMethod = "start",destroyMethod = "stop")
    public StreamMessageListenerContainer<String, ObjectRecord<String,User>> streamMessageListenerContainer(){
        AtomicInteger index = new AtomicInteger(1);
//        获取本机线程数
        int processors = Runtime.getRuntime().availableProcessors();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(processors, processors, 0,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(50)
                , (r) -> {
            Thread thread = new Thread(r);
            thread.setName("async-stream-consumer-" + index.getAndIncrement());
            thread.setDaemon(true);
            return thread;
        }, new ThreadPoolExecutor.CallerRunsPolicy());

//        消息监听容器,不能在外部实现。创建后,StreamMessageListenerContainer可以订阅Redis流并使用传入的消息
        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String,ObjectRecord<String,User>> options =
                StreamMessageListenerContainer.StreamMessageListenerContainerOptions
                        .builder()
                        // 一次最多获取多少条消息
                        .batchSize(10)
                        // 运行 Stream 的 poll task
                        .executor(executor)
                        // Stream 中没有消息时,阻塞多长时间,需要比 `spring.redis.timeout` 的时间小
                        .pollTimeout(Duration.ofSeconds(1))
                        // ObjectRecord 时,将 对象的 filed 和 value 转换成一个 Map 比如:将Book对象转换成map
//                        .objectMapper(new ObjectHashMapper())
                        // 获取消息的过程或获取到消息给具体的消息者处理的过程中,发生了异常的处理
                        .errorHandler(new CustomErrorHandler())
                        // 将发送到Stream中的Record转换成ObjectRecord,转换成具体的类型是这个地方指定的类型
                        .targetType(User.class)
                        .build();


        StreamMessageListenerContainer<String, ObjectRecord<String, User>> container = StreamMessageListenerContainer.create(redisConnectionFactory, options);


        //        初始化-绑定key和消费组
        redisStreamUtil.initStream(RedisStreamUtil.STREAM_KEY_001,"group-a");

//        不绑定消费组,独立消费
        container.receive(StreamOffset.fromStart(RedisStreamUtil.STREAM_KEY_001), new MonitorStreamListener("独立消费", null, null));

        // 消费组A,不自动ack
        // 从消费组中没有分配给消费者的消息开始消费
//        container.receive(Consumer.from("group-a","consumer-a"),
//                StreamOffset.create(RedisStreamUtil.STREAM_KEY_001, ReadOffset.lastConsumed()),new MonitorStreamListener("消费者组A","group-a", "consumer-a"));

//        自动ack
        container.receiveAutoAck(Consumer.from("group-a","consumer-b"),
                StreamOffset.create(RedisStreamUtil.STREAM_KEY_001, ReadOffset.lastConsumed()),new MonitorStreamListener("消费者组B","group-a", "consumer-b"));

        return container;
    }
}

重要代码解析:
1. .targetType(User.class) :在配置监听容器时,用于指定类型,不指定时默认是string类型,如果你传入的不是string机需要指定;如果配置的是父类的,也可以接收子类的消息,进行转换。但如果是配置的Object类型,接收时就会为路径,不能正常得到传入的对象(不知道为什么,有研究懂的可以解答一下)
2. redisStreamUtil.initStream(RedisStreamUtil.STREAM_KEY_001,"group-a"):在第一次生成时,需要把消费组绑定该stream的key,否则会报错,具体内部执行逻辑可以看initStream()方法(或者自己手动通过命令到redis去绑定:xgroup create stream-001 group-a $)stream-001(key) group-a(消费组)
3. container.receive(StreamOffset.fromStart(RedisStreamUtil.STREAM_KEY_001), new MonitorStreamListener("独立消费", null, null)) :该句是不绑定消费组,也就是广播的方式监听该key中的所有消息(和上面的区别是,该方式是被动的监听消息
4. 🌟container.receiveAutoAck(Consumer.from("group-a","consumer-b") ,StreamOffset.create(RedisStreamUtil.STREAM_KEY_001, ReadOffset.lastConsumed()),new MonitorStreamListener("消费者组B","group-a", "consumer-b"))就是通过该句代码实现分组监听消息的,绑定了消费组和消费者的名字,以及监听器类。然后使用的自动ack的方式回复stream确认接收到了消息(或者通过手动ack的方式返回stream接收到了消息,否则会重复发送),

5.2.2、监听器

用于接收消息然后实现具体业务代码

@Slf4j
public class MonitorStreamListener <T> implements StreamListener<String, ObjectRecord<String,T>> {

    /**
     * 消费者类型:独立消费、消费组消费
     */
    private String consumerType;
    /**
     * 消费组
     */
    private String group;
    /**
     * 消费组中的某个消费者
     */
    private String consumerName;

    public MonitorStreamListener(String consumerType, String group, String consumerName) {
        this.consumerType = consumerType;
        this.group = group;
        this.consumerName = consumerName;
    }
    
    @Override
    public void onMessage(ObjectRecord<String, T> message) {
        log.info("接受到来自redis的消息");

        String stream = message.getStream();
        RecordId id = message.getId();
        User value = (User) message.getValue();
        value.getName();

//        执行具体的接收到消息的业务逻辑
        if (StringUtils.isEmpty(group)) {
            log.info("[{}]: 接收到一个消息 stream:[{}],id:[{}],value:[{}]", consumerType, stream, id, value);

        } else {
            log.info("[{}] group:[{}] consumerName:[{}] 接收到一个消息 stream:[{}],id:[{}],value:[{}]", consumerType,
                    group, consumerName, stream, id, value);
        }
        
        // 当是消费组消费时,如果不是自动ack,则需要在这个地方手动ack
//        redisTemplate.opsForStream()
//                 .acknowledge("key","group","recordId");
        
    }
}
通过延时队列发送到消息---测试结果:

在这里插入图片描述

通过http发送User到子类Book对象数据测试结果

在这里插入图片描述


结果:也是可以正常接受到的

在这里插入图片描述

三、完整代码

🪟完整代码仓库地址

四、引用

https://juejin.cn/post/7029302992364896270#heading-0
https://juejin.cn/post/6844904125822435341%EF%BC%9FsearchId=202310141054532F9807A1000F6680C0DF#heading-1

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1112545.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

003数据安全传输-多端协议传输平台:Protobuf - 部署

文章目录 一、Windows环境二、Linux Centos环境三、protobuf测试3.1 新建.proto文件生成相应的类3.2 .proto生成相应的类的使用3.3 配置VS3.4 test代码 一、Windows环境 在windows下配置&#xff0c;无论protobuf是什么版本&#xff0c;IDE和编译器的版本都要保持一致。 比如…

Linux - 大括号的妙用

示例1 touch demo_{1..10}.txt示例2 touch case_{a,b,c,d}.txt示例3 touch {a,b}{1..4}.txt

第三章 内存管理 十一、虚拟内存的基本概念

目录 一、传统存储管理 1、缺点 二、局部性原理 1、时间局部性&#xff1a; 2、空间局部性&#xff1a; 三、虚拟内存的定义和特征 1、结构 ​编辑 2、定义 3、特征 &#xff08;1&#xff09;多次性: &#xff08;2&#xff09;对换性: &#xff08;3&#xff09;…

【来点小剧场--项目测试报告】个人博客系统测试报告

一、项目背景 个人博客系统采用前后端分离的方法来实现&#xff0c;使用了MySQL数据库来存储相关的数据&#xff0c;同时对Redis进行配置&#xff0c;将session会话存储在redis中以方便分布式运转&#xff0c;最后通过云服务器将项目部署到网络上。前端主要有六个页面构成&…

Vue3 + Nodejs 实战 ,文件上传项目--大文件分片上传+断点续传

目录 1.大文件上传的场景 2.前端实现 2.1 对文件进行分片 2.2 生成hash值&#xff08;唯一标识&#xff09; 2.3 发送上传文件请求 3.后端实现 3.1 接收分片数据临时存储 3.2 合并分片 4.完成段点续传 4.1修改后端 4.2 修改前端 5.测试 博客主页&#xff1a;専心_前端…

[牛客]计算机网络习题笔记_1019

1、物理层&#xff1a;以太网 调制解调器 电力线通信(PLC) SONET/SDH G.709 光导纤维 同轴电缆 双绞线等。 2、数据链路层&#xff08;网络接口层包括物理层和数据链路层&#xff09;&#xff1a;Wi-Fi(IEEE 802.11) WiMAX(IEEE 802.16) ATM DTM 令牌环 以太网 FDD…

高校教务系统登录页面JS分析——华东交通大学

高校教务系统密码加密逻辑及JS逆向 本文将介绍高校教务系统的密码加密逻辑以及使用JavaScript进行逆向分析的过程。通过本文&#xff0c;你将了解到密码加密的基本概念、常用加密算法以及如何通过逆向分析来破解密码。 本文仅供交流学习&#xff0c;勿用于非法用途。 一、密码加…

android studio打开flutter项目报红

一、android studio打开flutter项目报红&#xff0c;如下图&#xff1a; 二、解决方法&#xff1a; 2.1 在这个build.gradle添加以下代码&#xff0c;如图&#xff1a; 2.2 在build.gradle最顶部添加如下代码&#xff1a; def localProperties new Properties() def localPr…

水经注地图服务 5.0.1-rc 版发布

《水经注地图服务》&#xff08;WeServer&#xff09;是一款可快速发布全国乃至全球海量卫星影像的地图发布服务产品。 它可以轻松发布260TB级海量卫星影像&#xff0c;从而使“在内网建立一个离线版的地球”不只是一个梦想&#xff01; ​01 新版发布 水经注地图服务 5.0…

NodeMCU ESP8266 读取按键外部输入信号详解(图文并茂)

NodeMCU ESP8266 读取按键外部输入信号教程&#xff08;图文并茂&#xff09; 文章目录 NodeMCU ESP8266 读取按键外部输入信号教程&#xff08;图文并茂&#xff09;前言按键输入常用接口pinModedigitalRead 示例代码结论 前言 ESP8266如何检测外部信号的输入&#xff0c;通常…

10kV-35kV交联电缆油杯终端

武汉凯迪正大油杯产品简介 KDZD-10 /KDZD-35 油杯终端是我公司在总结了大量的现场经验的基础上&#xff0c;自行开发、设计的一种 10~35kV 以下交联电缆和工频耐压试验的简易试验终端&#xff0c;该油杯操作简便&#xff0c;使用可靠。 目前电缆厂均拥有多条 XLPE 生产线&…

【Git】升级MacOS系统,git命令无法使用

终端执行git命令报错 xcrun: error: invalid active developer path (/Library/Developer/CommandLineTools), missing xcrun at: /Library/Developer/CommandLineTools/usr/bin/xcrun安装这个东东&#xff0c;&#xff1f;需要42小时 最终解决&#xff1a; 下载安装 https…

C语言进阶第七课-----------自定义类型的讲解(结构体枚举联合)

作者前言 &#x1f382; ✨✨✨✨✨✨&#x1f367;&#x1f367;&#x1f367;&#x1f367;&#x1f367;&#x1f367;&#x1f367;&#x1f382; ​&#x1f382; 作者介绍&#xff1a; &#x1f382;&#x1f382; &#x1f382; &#x1f389;&#x1f389;&#x1f389…

2023CRM排行:深度对比16款CRM

客户关系管理系统&#xff08;CRM&#xff09;作为数字化转型的重要载体&#xff0c;选择一个优秀的CRM系统将为企业未来健康增长保障。市场上CRM软件众多&#xff0c;但很难分清哪个适合自己&#xff0c;最近赶在公司选型&#xff0c;我对市场所有软件进行了一个调研&#xff…

postgresql(openGauss)模糊匹配参数

被pg系这个show要求精准匹配参数恶心的不轻。 原理是用.psqlrc&#xff08;openGauss用.gsqlrc&#xff09;文件set一个select常量进去&#xff0c;需要用&#xff1a;调用这个常量。理论上也可以增强其他的各种功能。 我在openGauss做的一个例子 .gsqlrc&#xff08;.psqlrc…

容灾备份——容灾系统介绍

目录 基本概述 容灾关键技术 容灾系统的级别 容灾主要技术 基本概述 容灾与备份的区别 容灾备份——备份技术系统架构与备份网络方案-CSDN博客https://blog.csdn.net/m0_49864110/article/details/123969802?ops_request_misc%257B%2522request%255Fid%2522%253A%252216…

【Java 进阶篇】JavaScript 表单验证详解

JavaScript 表单验证是网页开发中不可或缺的一部分。它允许您确保用户在提交表单数据之前输入了有效的信息。无论您是一个初学者还是一个有经验的开发人员&#xff0c;本文将为您详细介绍如何使用 JavaScript 来进行表单验证。我们将从基础知识开始&#xff0c;逐步深入&#x…

ubuntu20.04运用startup application开机自启动python程序

运用startup application开机自启动python程序。在终端中输入gnome-session-properties,如果显示没有则先进行安装&#xff0c;sudo apt-get update 和sudo apt install StartupApplications(根据显示提示安装)。在显示程序中搜索startup&#xff0c;打开应用程序。 在程序目录…

LeetCode 1595. 连通两组点的最小成本【记忆化搜索,状压DP】2537

本文属于「征服LeetCode」系列文章之一&#xff0c;这一系列正式开始于2021/08/12。由于LeetCode上部分题目有锁&#xff0c;本系列将至少持续到刷完所有无锁题之日为止&#xff1b;由于LeetCode还在不断地创建新题&#xff0c;本系列的终止日期可能是永远。在这一系列刷题文章…

环形链表的约瑟夫问题

前言&#xff1a; 据说著名犹太历史学家Josephus有过如下故事&#xff1a; 在罗马人占领乔塔帕特后&#xff0c;39个犹太人和Josephus及他的朋友躲进一个洞里&#xff0c;39个犹太人决定宁愿死也不要被敌人抓到&#xff0c;于是决定了一个自杀方式&#xff0c;41个人排成一个…