redis实现消息队列的几种方式

news2025/1/25 8:56:33

一、了解

众所周知,redis是我们日常开发过程中使用最多的非关系型数据库,也是消息中间件。实际上除了常用的rabbitmq、rocketmq、kafka消息队列(大家自己下去研究吧~模式都是通用的),我们也能使用redis实现消息队列。因为其他中间件可能更适用于大型/企业级项目,在咱们项目前期不需要这么多的数据,redis跟我们也是高度集成的。这里就简化了技术栈。

二、常用的几种使用redis实现的消息队列方式

1、List数据结构

Redis列表是简单的字符串列表,按照插入顺序排序。你可以添加一个元素到列表的头部(左边)或者尾部(右边)一个列表最多可以包含 232 - 1 个元素 (4294967295, 每个列表超过40亿个元素)。
这里的列表大家可以想想为一个横着的通道,假设我现在往右边插入第一条数据,这个元素就会被放在最左边,接着再放入第二条数据,它就会在左边第二条,以此类推…插入了100条数据。 假设这个时候我要取出第一条,我就从最左边取就好。
这就变相实现了有序消息队列。具体实现大家自己研究
优点:操作方便,可以有序的取出自己插入的数据
缺点:不能进行实时消费,没有消费者

2、pub/sub 订阅消费模式

这就是传统的生产者->队列->消费者的模式。生产者的消息所有订阅者都能收到。

优点:实现了发布订阅模式,可以实时进行消费
缺点:没有消息持久化,在系统崩溃、宕机的时候;消息会丢失

3、sorted set有序集合Redis

有序集合和集合一样也是 string 类型元素的集合,且不允许重复的成员。不同的是每一个元素都会关联一个double分数,redis就是通过分数为集合中的成员进行从大到小的排列。
有序集合的成员是唯一的,但是score是可以重复的。
生成消息直接往s-set中插入数据,将score设置为接收到数据的13位时间戳;需要使用的时候再根据score大小有序取出来就行了。

看到这里是不是大家能想到,既然每条消息都带有时间,那我是不是可以顺手实现延迟队列。
这里只需要将score设置为 接受消息的时间戳+延迟时间 。我在使用的时候获取当天时间戳的数据,这样就实现了延迟消息队列。

优点:操作方便,可以实现延迟队列
缺点:不能实时进行消费

4、stream流 (redis5.0版本以上才有 重点讲)

Redis Stream 提供了消息的持久化和主备复制功能,可以让任何客户端访问任何时刻的数据,并且能记住每一个客户端的访问位置,还能保证消息不丢失。
Redis Stream 的结构如下所示,它有一个消息链表,将所有加入的消息都串起来,每个消息都有一个唯一的 ID 和对应的内容:
在这里插入图片描述
每个stream流都有自己的名称,它是redis的key,也可以理解为队列名称。
Consumer Group :消费组,使用 XGROUP CREATE 命令创建,一个消费组有多个消费者(Consumer)。
last_delivered_id :游标,每个消费组会有个游标 last_delivered_id,任意一个消费者读取了消息都会使游标 last_delivered_id 往前移动。
pending_ids :消费者(Consumer)的状态变量,作用是维护消费者的未确认的 id。 pending_ids 记录了当前已经被客户端读取的消息,但是还没有 ack (Acknowledge character:确认字符)。

stream常用命令

  • XADD 定义stream流,写入消息体
XADD mystream * field1 A field2 B field3 C field4 D
mystream:自定义流名称
*:由redis生成流的id(也可以自定义,但是得保证自增唯一)
field1-A \field2-B\field3-C :保存的消息体,key-value形式

-- 举例
redis> XADD mystream * name Sara surname OConnor
"1601372323627-0"
  • XDEL 删除消息
> XADD mystream * a 1
1538561698944-0
> XADD mystream * b 2
1538561700640-0
> XADD mystream * c 3
1538561701744-0
> XDEL mystream 1538561700640-0
(integer) 1
127.0.0.1:6379> XRANGE mystream - +
1) 1) 1538561698944-0
   2) 1) "a"
      2) "1"
2) 1) 1538561701744-0
   2) 1) "c"
      2) "3"
  • XRANGE 获取消息队列数据
XRANGE key start end [COUNT count]

key:strem流名称
start:开始值,- 表示最小值
end:结束值,+ 表示最大值


-- 举例:
redis> XRANGE mystream - + 2
从mystrem全部数据中取出两条数据

redis> XRANGE mystream + - 1
从mystream倒叙取一条数据
  • XREVRANGE 自动过滤已删除的消息
redis> XADD writers * name Virginia surname Woolf
"1601372731458-0"
redis> XADD writers * name Jane surname Austen
"1601372731459-0"
redis> XADD writers * name Toni surname Morrison
"1601372731459-1"
redis> XADD writers * name Agatha surname Christie
"1601372731459-2"
redis> XADD writers * name Ngozi surname Adichie
"1601372731459-3"
redis> XLEN writers
(integer) 5
redis> XREVRANGE writers + - COUNT 1
1) 1) "1601372731459-3"
   2) 1) "name"
      2) "Ngozi"
      3) "surname"
      4) "Adichie"
redis>
  • XREAD 阻塞或者非阻塞获取消息
# 从 Stream 头部读取两条消息
> XREAD COUNT 2 STREAMS mystream writers 0-0 0-0
1) 1) "mystream"
   2) 1) 1) 1526984818136-0
         2) 1) "duration"
            2) "1532"
            3) "event-id"
            4) "5"
            5) "user-id"
            6) "7782813"
      2) 1) 1526999352406-0
         2) 1) "duration"
            2) "812"
            3) "event-id"
            4) "9"
            5) "user-id"
            6) "388234"
2) 1) "writers"
   2) 1) 1) 1526985676425-0
         2) 1) "name"
            2) "Virginia"
            3) "surname"
            4) "Woolf"
      2) 1) 1526985685298-0
         2) 1) "name"
            2) "Jane"
            3) "surname"
            4) "Austen"
count :数量
milliseconds :可选,阻塞毫秒数,没有设置就是非阻塞模式
key :队列名
id :消息 ID
  • XGROUP CREATE 创建消费者组
XGROUP [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]

key :队列名称,如果不存在就创建
groupname :组名。
$ : 表示从尾部开始消费,只接受新消息,当前 Stream 消息会全部忽略

从头开始消费:
XGROUP CREATE mystream consumer-group-name 0-0  

从尾部开始消费:
XGROUP CREATE mystream consumer-group-name $

以上就是常用的steam流的命令,大家下来自己测试,练习。

三、springboot整合redis stream流

java中提供了连接redis的客户端,jedis和lettuce、redistemplate;RedisTemplate 是 Spring Data Redis 提供的一个高级抽象层,封装了 Jedis 或 Lettuce 等底层客户端。
它提供了丰富的功能,如序列化、事务支持、键过期等。这里主要讲主流的redistemplate整合,大家以后能直接使用。

实时消费

实时消费顾名思义,生产者发送消息,消费者立马进行消费逻辑处理。

  • RedisStreamUtils工具类,方便后续进行stream操作没根据自己项目需求来定义
@Configuration
@SuppressWarnings("all")
public class RedisStreamUtils {

    @Resource
    private RedisTemplate<String, Object> redisTemplate;

    /**
     * 创建消费组
     *
     * @param streamKey   键名称
     * @param group 组名称
     * @return {@link String}
     */
    public String createGroup(String streamKey, String group) {
        return redisTemplate.opsForStream().createGroup(streamKey, group);
    }

    /**
     * 获取消费者信息
     *
     * @param streamKey   键名称
     * @param group 组名称
     * @return {@link StreamInfo.XInfoConsumers}
     */
    public StreamInfo.XInfoConsumers queryConsumers(String streamKey, String group) {
        return redisTemplate.opsForStream().consumers(streamKey, group);
    }

    /**
     * 查询组信息
     *
     * @param streamKey 键名称
     * @return
     */
    public StreamInfo.XInfoGroups queryGroups(String streamKey) {
        return redisTemplate.opsForStream().groups(streamKey);
    }

    // 添加Map消息
    public String addMap(String streamKey, Map<String, Object> value) {
        return Objects.requireNonNull(redisTemplate.opsForStream().add(streamKey, value)).getValue();
    }

    // 读取消息
    public List<MapRecord<String, Object, Object>> read(String streamKey) {
        return redisTemplate.opsForStream().read(StreamOffset.fromStart(streamKey));
    }

    // 确认消费
    public Long ack(String streamKey, String group, String... recordIds) {
        return redisTemplate.opsForStream().acknowledge(streamKey, group, recordIds);
    }

    // 删除消息。当一个节点的所有消息都被删除,那么该节点会自动销毁
    public Long del(String key, String... recordIds) {
        return redisTemplate.opsForStream().delete(key, recordIds);
    }

    // 判断是否存在key
    public boolean hasKey(String key) {
        Boolean aBoolean = redisTemplate.hasKey(key);
        return aBoolean != null && aBoolean;
    }


}
  • RedisConfig配置文件
@Configuration
@Slf4j
@RequiredArgsConstructor
public class RedisConfig {

    private final RedisStreamUtils redisStreamUtil;
    private final Environment environment;

	//消费者处理消息配置
    @Bean
    public Subscription subscription(RedisConnectionFactory factory) {
        AtomicInteger index = new AtomicInteger(1);
        //获取系统处理器数量 创建线程池,开启守护线程
        int processors = Runtime.getRuntime().availableProcessors();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(processors, processors, 0, TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(), r -> {
            Thread thread = new Thread(r);
            thread.setName("async-stream-consumer-" + index.getAndIncrement());
            thread.setDaemon(true);
            return thread;
        });
        //流消息监听容器参数设置 
        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options =
                StreamMessageListenerContainer
                        .StreamMessageListenerContainerOptions
                        .builder()
                        // 一次最多获取多少条消息
                        .batchSize(5)
                        //执行线程池
                        .executor(executor)
                        //阻塞消息读取(延迟消息)
                        .pollTimeout(Duration.ofSeconds(1))
                        //异常处理
                        .errorHandler(throwable -> {
                            log.error("[MQ handler exception]", throwable);
                            throwable.printStackTrace();
                        })
                        .build();

		//通过redis连接工厂,创建流消息监听容器
		var listenerContainer = StreamMessageListenerContainer.create(factory, options);
		//初始化流和消费者处理配置
		//初始化流和消费者处理配置
        Subscription subscription = initStreamAndConsumer(listenerContainer);
		//开启监听容器
        listenerContainer.start();
        return subscription;
    }


	private Subscription initStreamAndConsumer(StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer){
		//↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓
		//这一部分可以不用配置,可以根据自己的实际情况配置
        //该key和group可根据需求自定义配置
        String streamName = "mystream";
        String groupname = "mygroup";
        initStream(streamName, groupname);
        // 手动ask消息
        //消费者处理完消息之后,会进行确认;这里有一个pending状态会变成已处理
        Subscription subscription = listenerContainer.receive(Consumer.from(groupname, "zhuyazhou"),
                StreamOffset.create(streamName, ReadOffset.lastConsumed()), new RedisConsumer(redisStreamUtil));
        // 自动ask消息
           /* Subscription subscription = listenerContainer.receiveAutoAck(Consumer.from(redisMqGroup.getName(), redisMqGroup.getConsumers()[0]),
                    StreamOffset.create(streamName, ReadOffset.lastConsumed()), new ReportReadMqListener());*/
        //这一部分可以不用配置,可以根据自己的实际情况配置
		//↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑
		return subscription;
	}

    private void initStream(String key, String group) {
        boolean hasKey = redisStreamUtil.hasKey(key);
        if (!hasKey) {
            Map<String, Object> map = new HashMap<>(1);
            map.put("field", "value");
            //创建主题
            String result = redisStreamUtil.addMap(key, map);
            //创建消费组
            redisStreamUtil.createGroup(key, group);
            //将初始化的值删除掉
            redisStreamUtil.del(key, result);
            log.info("stream:{}-group:{} initialize success", key, group);
        }
    }
}

大家这里可以想一想,这种写法是不是符合生产过程中的创建队列/消费者的逻辑,是不是不方便。能不能在我需要的时候直接调用方法去创建???假设现在我新增了一个业务需求,需要用不同的业务逻辑去处理,而且我希望定制不同的消费者应答模式,这个时候就需要一个通用方法去实现,这里我是这样做的。还是在工具类中

创建redis流消息监听容器
主要参数 :定义线程池、一次最大获取消息数、超时重新获取、异常处理

  @Bean
    public StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer(RedisConnectionFactory factory) {
        log.info("redis ip:{},port:{}",environment.getProperty("spring.data.redis.host"),environment.getProperty("spring.data.redis.port"));
        AtomicInteger index = new AtomicInteger(1);
        int processors = Runtime.getRuntime().availableProcessors();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(processors, processors, 0, TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(), r -> {
            Thread thread = new Thread(r);
            thread.setName("async-stream-consumer-" + index.getAndIncrement());
            thread.setDaemon(true);
            return thread;
        });
        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options =
                StreamMessageListenerContainer
                        .StreamMessageListenerContainerOptions
                        .builder()
                        // 一次最多获取多少条消息
                        .batchSize(5)
                        .executor(executor)
                        .pollTimeout(Duration.ofSeconds(3))
                        .errorHandler(throwable -> {
                            log.error("[MQ handler exception]", throwable);
                            throwable.printStackTrace();
                        })
                        .build();
        return StreamMessageListenerContainer.create(factory, options);
    }

//业务需求调用此方法即可
public void addNewStreamAndSubscribe(String streamName, String groupName, String consumerId, StreamListener listener) {
        initStream(streamName, groupName);
        subscribeToStream(streamName, groupName, Consumer.from(groupName, consumerId), listener);
    }

    public void addNewStreamAndSubscribe(String streamName, String groupName, String consumerId, RedisConsumer listener,Map<String,Object> recodMap) {
        initStream(streamName, groupName);
        subscribeToStream(streamName, groupName, Consumer.from(groupName, consumerId), listener);
        addMap(streamName, recodMap);
    }

    private void subscribeToStream(String streamName, String groupName, Consumer consumer, StreamListener listener) {
        StreamMessageListenerContainer<String, MapRecord<String, String, String>> container = streamMessageListenerContainer(redisConnectionFactory);
        Subscription subscription = container.receive(Consumer.from(groupName, consumer.getName()),
                StreamOffset.create(streamName, ReadOffset.lastConsumed()), listener);
        //开始消息容器监听
        container.start();
        log.info("Subscribed to stream: {} with group: {} and consumer: {}", streamName, groupName, consumer);
    }

streamMessageListenerContainer中的 .batchSize(1) 设置需要着重说一下。意思是在消费者在监听到数据的时候,一次从redis中取出的多少条数据,假设我设置1,就意味着我的监听器会redis中取出1条未消费的数据,随后进入消费者逻辑,处理完毕之后返回;继续由监听器读取1条数据,在进入消费者逻辑;这个值设置得越小消息处理数据越快,但是也会增加redis链接的资源。
较大的 batchSize 可以减少与 Redis 服务器的交互次数,降低网络通信开销,提高处理效率。
较小的 batchSize 适用于需要低延迟处理的场景,但会增加网络通信开销和 CPU 使用率。

  • RedisConsumer消费者
@Component("RedisConsumer")
@RequiredArgsConstructor
@Slf4j
public class RedisConsumer implements StreamListener<String, MapRecord<String,String,String>> {

    private final RedisStreamUtils redisStreamUtils;
    @Override
    public void onMessage(MapRecord<String, String, String> message) {
        try {
            log.info("RedisConsumer1获取到了消息:{}",message);
            String streamKey = message.getStream();
            RecordId recordId = message.getId();
            Map<String, String> value = message.getValue();

            //获取这个流下 所有的消费者组
            StreamInfo.XInfoGroups xInfoGroups = redisStreamUtils.queryGroups(streamKey);

            //处理逻辑
            //↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓
            log.info("【streamKey】= {},【recordId】= {},【msg】= {}",streamKey,recordId, value);
             //手动确认ack消息,并删除已处理的消息
             //我这里使用手动
            xInfoGroups.forEach(xInfoGroup -> redisStreamUtils.ack(streamKey, xInfoGroup.groupName(), recordId.getValue()));
            //自动确认消息 ---------自己下来研究
            //↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑
            //根据业务场景来看是否需要删除消息
//        redisStreamUtils.del(streamKey, recordId.getValue());
        } catch (Exception e) {
            throw new ServiceException("消费异常");
        }
    }
}
  • RedisConsumer2消费者
@Component("RedisConsumer2")
@RequiredArgsConstructor
@Slf4j
public class RedisConsumer2 implements StreamListener<String, MapRecord<String,String,String>> {

    private final RedisStreamUtils redisStreamUtils;
    @Override
    public void onMessage(MapRecord<String, String, String> message) {
        try {
            log.info("RedisConsumer2获取到了消息:{}",message);
            String streamKey = message.getStream();
            RecordId recordId = message.getId();
            Map<String, String> value = message.getValue();
            //获取这个流下 所有的消费者组
            StreamInfo.XInfoGroups xInfoGroups = redisStreamUtils.queryGroups(streamKey);
            //处理逻辑
            //↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓
            log.info("【streamKey】= {},【recordId】= {},【msg】= {}",streamKey,recordId, value);
            //↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑
            //手动确认ack消息,并删除已处理的消息
            xInfoGroups.forEach(xInfoGroup -> redisStreamUtils.ack(streamKey, xInfoGroup.groupName(), recordId.getValue()));
//        redisStreamUtils.del(streamKey, recordId.getValue());
        } catch (Exception e) {
            throw new ServiceException("消费异常");
        }
    }
}
  • RedisStreamcontroller模拟测试
@RequestMapping(value = "/redisStream")
@RestController
@RequiredArgsConstructor
@Slf4j
@SuppressWarnings("all")
public class RedisStreamController {

    private final RedisStreamUtils redisStreamUtils;
    private final RedisConsumer redisConsumer;
    private final RedisTemplate redisTemplate;

    private final ApplicationContext applicationContext;

    @GetMapping(value = "/addNewStreamAndSubscribe")
    public ResultVO addNewStreamAndSubscribe(@RequestParam("streamKey") String streamKey,
                                              @RequestParam("groupName") String groupName,
                                              @RequestParam("consumer")String consumer,
                                             @RequestParam("consumerClass") String consumerClass){
        try {
            // 获取实现类的实例
            StreamListener consumerInstance = (StreamListener) applicationContext.getBean(consumerClass);
            redisStreamUtils.addNewStreamAndSubscribe(streamKey, groupName, consumer,consumerInstance );
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return ResultVO.success();
    }

    @GetMapping(value = "/addMap")
    public ResultVO addMap(@RequestParam("streamKey") String streamKey,
                           @RequestParam("key")String key,
                           @RequestParam("value")String value) {
        HashMap<String, Object> objectObjectHashMap = new HashMap<>();
        objectObjectHashMap.put(key,value);
        redisStreamUtils.addMap(streamKey,objectObjectHashMap);
        return ResultVO.success();
    }

    @GetMapping(value = "/getGroup")
    public ResultVO getGroup(@RequestParam("streamKey") String streamKey,
                             @RequestParam("groupName") String groupName) {
        boolean b = redisStreamUtils.hasKey(streamKey);
        if(b){
            StreamInfo.XInfoGroups xInfoGroups = redisStreamUtils.queryGroups(streamKey);
            List<Object> list = new ArrayList<>();
            for (StreamInfo.XInfoGroup xInfoGroup : xInfoGroups) {
                StreamInfo.XInfoConsumers xInfoConsumers = null;
                if(StrUtil.isNotEmpty(groupName)){
                    xInfoConsumers = redisStreamUtils.queryConsumers(streamKey, groupName);
                    for (StreamInfo.XInfoConsumer xInfoConsumer : xInfoConsumers) {
                        log.info("group:{},pending:{},consumerCount:{},consumerName:{},lastDeliveryId:{}"
                                ,xInfoGroup.groupName(),xInfoGroup.pendingCount(),xInfoGroup.consumerCount(),xInfoConsumer.consumerName(),xInfoGroup.lastDeliveredId());
                    }
                }
            }
        }else{
            log.info("streamKey不存在:{}",streamKey);
            return ResultVO.error("streamKey不存在");
        }
        return ResultVO.success();
    }

    @GetMapping(value = "/delStream")
    public ResultVO delStream(@RequestParam("streamKey") String streamKey){
        redisTemplate.delete(streamKey);
        return ResultVO.success();
    }

    @GetMapping(value = "/readMsg")
    public ResultVO readMsg(@RequestParam("streamKey") String streamKey,
                            @RequestParam("groupName") String groupName,
                            @RequestParam("consumer") String consumer){
        // 读取消息,每次读取最多 5 条
        List read = redisTemplate.opsForStream().read(
                Consumer.from(groupName, consumer),
                StreamReadOptions.empty().count(10).block(Duration.ofSeconds(1)),
                StreamOffset.create(streamKey, ReadOffset.lastConsumed())
        );
        return ResultVO.success(JSON.toJSONString(read));
    }

项目启动

调用/addNewStreamAndSubscribe接口
  • 创建流、监听容器
  • 消费者绑定流+消费者逻辑处理类
  • 接收生产者消息方式(最新、偏移量)
  • 开启消息容器监听
调用/addMap接口,发送消息

如果只有一个消费者,那么当消费者出现异常的时候,直到服务恢复,会从上一次消费的数据开始进行消费。

假设现在消费者组有两个消费者,都绑定了同一个消息流,这个时候发送消息就是轮询访问。
RedisConsumer1获取到了消息
RedisConsumer2获取到了消息
RedisConsumer1获取到了消息
RedisConsumer2获取到了消息

如果consumer1出现了异常,这个时候consumer2会正常消费所有的数据。
stream本身就支持持久化数据,也是dbs和aof两种。不用担心数据丢失。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2242066.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

单片机智能家居火灾环境安全检测

目录 前言 一、本设计主要实现哪些很“开门”功能&#xff1f; 二、电路设计原理图 电路图采用Altium Designer进行设计&#xff1a; 三、实物设计图 四、程序源代码设计 五、获取资料内容 前言 在现代社会&#xff0c;火灾安全始终是人们关注的重点问题。随着科技的不…

【目标检测】用YOLOv8-Segment训练语义分割数据集(保姆级教学)

前言 这篇教程会手把手带你用 YOLOv8-Segment 搭建一个属于自己的分割任务项目。从环境配置到数据集准备&#xff0c;再到模型训练和测试&#xff0c;所有步骤都有详细说明&#xff0c;适合初学者使用。你将学会如何安装必要的软件&#xff0c;标注自己的数据&#xff0c;并使…

mac2019环境 Airflow+hive+spark+hadoop本地环境安装

1 环境介绍 本地安装可分为两个部分&#xff0c;mac软件环境&#xff0c; python开发环境 ps: 安装过程参考chatgpt、csdn文章 1.1 mac软件环境 目标安装的的软件是hive、apache-spark、hadoop&#xff0c;但是这三个软件又依赖java(spark依赖&#xff09;、ssh&#xff08…

1.7 JS性能优化

从输入url到页面加载完成都做了些什么 输入 URL - 资源定位符 http://www.zhaowa.com - http 协议 域名解析 https://www.zhaowa.com > ip 1. 切HOST&#xff1f; > 浏览器缓存映射、系统、路由、运营商、根服务器 2. 实际的静态文件存放&#xff1f; 大流量 > 多个…

【Ansible常用命令+模块+Playbook+Roles】

Ansible 一、命令1.1 常用命令 二、模块2.1 shell模块2.2 复制模块2.3 用户模块2.4 软件包管理2.5 服务模块2.6 文件模块2.7 收集模块2.8 fetch2.9 cron2.10 group2.11 script2.12 unarchive 三、YAML Roles3.1 目录结构3.2 文件内容tasks/main.yamlnginx.conf.j2vars/main.yam…

Oracle19C AWR报告分析之Wait Classes by Total Wait Time

Oracle19C AWR报告分析之Wait Classes by Total Wait Time 一、分析数据二、详细分析2.1 指标参数介绍2.2 数据库性能分析2.3 综合性能评估 在 Oracle 数据库的 AWR 报告中&#xff0c;Wait Classes by Total Wait Time 是评估数据库性能的重要部分。本篇文章主要是介绍指标参数…

嵌入式硬件电子电路设计(五)MOS管详解(NMOS、PMOS、三极管跟mos管的区别)

引言&#xff1a;在我们的日常使用中&#xff0c;MOS就是个纯粹的电子开关&#xff0c;虽然MOS管也有放大作用&#xff0c;但是几乎用不到&#xff0c;只用它的开关作用&#xff0c;一般的电机驱动&#xff0c;开关电源&#xff0c;逆变器等大功率设备&#xff0c;全部使用MOS管…

问题大集-01-kafka问题

1、问题&#xff1a;Windows下启动单机kafka出现&#xff1a;系统找不到指定路径 解决&#xff1a; 是kafka不能识别本机的java环境&#xff08;JVM&#xff09;&#xff0c;故需要指定java路径&#xff0c; 进入kafka路径下的\bin\windows&#xff0c;找到&#xff1a;kafk…

C++ 的发展

目录 C 的发展总结&#xff1a;​编辑 1. C 的早期发展&#xff08;1979-1985&#xff09; 2. C 标准化过程&#xff08;1985-1998&#xff09; 3. C 标准演化&#xff08;2003-2011&#xff09; 4. C11&#xff08;2011年&#xff09; 5. C14&#xff08;2014年&#xf…

Ubuntu问题 -- 允许ssh使用root用户登陆

目的 新重装的系统, 普通用户可以使用ssh登陆服务器, 但是root不能使用ssh登陆 方法 vim 编辑ssh配置文件 sudo vim /etc/ssh/sshd_config找到 PermitRootLogin 这一行, 把后面值改成 yes 重启ssh sudo service sshd restart然后使用root账号登陆即可

HarmonyOS4+NEXT星河版入门与项目实战--------开发工具与环境准备

文章目录 1、熟悉鸿蒙官网1、打开官网2、下载 DevEco Studio3、HarmonyOS 资源库4、开发指南与API 2、安装 DevEco Studio1、软件安装2、配置开发工具 1、熟悉鸿蒙官网 1、打开官网 百度搜索 鸿蒙开发者官网 点击进入开发者官网&#xff0c;点击开发&#xff0c;可以看到各种…

使用 start-local 脚本在本地运行 Elasticsearch

警告&#xff1a;请勿将这些说明用于生产部署 本页上的说明仅适用于本地开发。请勿将此配置用于生产部署&#xff0c;因为它不安全。请参阅部署选项以获取生产部署选项列表。 使用 start-local 脚本在 Docker 中快速设置 Elasticsearch 和 Kibana 以进行本地开发或测试。 此设…

【大数据学习 | HBASE高级】hbase-phoenix 与二次索引应用

1. hbase-phoenix的应用 1.1 概述&#xff1a; 上面我们学会了hbase的操作和原理&#xff0c;以及外部集成的mr的计算方式&#xff0c;但是我们在使用hbase的时候&#xff0c;有的时候我们要直接操作hbase做部分数据的查询和插入&#xff0c;这种原生的方式操作在工作过程中还…

Nginx server_name配置错误导致路由upstream超时问题

一、问题描述 某次本平台和外部平台接口调用&#xff0c;同样Nginx location配置&#xff0c;测试环境调用正常&#xff0c;生产环境调用返回失败&#xff1b; 相关链接&#xff1a;Nginx官方文档、server_name、How nginx processes a request 二、排查处理 1&#xff09…

Android Studio 控制台输出的中文显示乱码

1. Android Studio 控制台输出的中文显示乱码 1.1. 问题 安卓在调试阶段&#xff0c;需要查看app运行时的输出信息、出错提示信息。乱码&#xff0c;会极大的阻碍开发者前进的信心&#xff0c;不能及时的根据提示信息定位问题&#xff0c;因此我们需要查看没有乱码的打印信息。…

linux001.在Oracle VM VirtualBox中ubuntu虚拟系统扩容

1.打开终端切换到virtualBox安装目录 2.输入命令扩容 如上终端中的代码解释&#xff1a; D:\Program Files\Oracle\VirtualBox>.\VBoxManage modifyhd D:\ubuntu18.04\Ubuntu18.04\Ubuntu18.04.vdi --resize 40960如上代码说明&#xff1a;D:\Program Files\Oracle\Virtual…

【桌面应用程序】Vue-Electron 环境构建、打包与测试(Windows)

前言 Vue 与 Electron 环境构建、打包与测试。 目录 前言 一、基本环境准备 二、配置npm源 三、创建Vue项目 四、添加Electron支持 五、应用启动 ​六、添加UI框架 ElementUI ​七、打包 一、基本环境准备 npm版本&#xff1a;8.6.0node版本&#xff1a;v18.0.0Vue/…

C#获取视频第一帧_腾讯云媒体处理获取视频第一帧

一、 使用步骤&#xff1a; 第一步、腾讯云开启万象 第二步、安装Tencent.QCloud.Cos.Sdk 包 第三步、修改 腾讯云配置 图片存储目录配置 第四步、执行获取图片并保存 二、封装代码 using System.Text; using System.Threading.Tasks;using COSXML.Model.CI; using COSXML.A…

Jav项目实战II基于微信小程序的助农扶贫的设计与实现(开发文档+数据库+源码)

目录 一、前言 二、技术介绍 三、系统实现 四、文档参考 五、核心代码 六、源码获取 全栈码农以及毕业设计实战开发&#xff0c;CSDN平台Java领域新星创作者&#xff0c;专注于大学生项目实战开发、讲解和毕业答疑辅导。获取源码联系方式请查看文末 一、前言 在当前社会…

ffmpeg+D3D实现的MFC音视频播放器,支持录像、截图、音视频播放、码流信息显示等功能

一、简介 本播放器是在vs2019 x86下开发&#xff0c;通过ffmpeg实现拉流解码功能&#xff0c;通过D3D实现视频的渲染功能。截图功能采用libjpeg实现&#xff0c;可以截取jpg图片&#xff0c;图片的默认保存路径是在C:\MYRecPath中。录像功能采用封装好的类Mp4Record实现&#x…