搭建大型分布式服务(三十六)SpringBoot 零代码方式整合多个kafka数据源

news2025/1/11 17:45:31

系列文章目录


文章目录

  • 系列文章目录
  • 前言
      • 一、本文要点
      • 二、开发环境
      • 三、创建项目
      • 四、测试一下
      • 五、小结


前言

让我们来看一下网上是怎样使用SpringBoot整合kafka数据源的,都存在哪些痛点?

痛点一:
手撸kafka配置代码,各种硬编码,无法利用SpringBoot的约定大于配置的优势。
在这里插入图片描述
痛点二:
当项目需要消费的topic,而且他们在不同集群时,需要不断地复制粘贴config和factory,如果项目需要5个不同集群的topic以上,那么这些代码将面临巨大维护压力,并且极其容易出错。
在这里插入图片描述
在这里插入图片描述
痛点三:
假如来了个新业务,也是消费kafka,然后做一些业务逻辑处理,你会发现你不得不又搭建一个新工程,然后重复上述步骤,把代码和配置都复制粘贴一遍。
在这里插入图片描述

分析了上述痛点,我们都知道得把配置和业务代码分离,减轻我们的无意义的重复劳作。那么我们怎么把这些跟kafka相关的东西都抽象成一个模块,让我们精力回归到业务开发本身上呢?

例如简单这样配置就完成SpringBoot和kafka的整合,我们只需要关心com.mmc.multi.kafka.starter.OneProcessorcom.mmc.multi.kafka.starter.TwoProcessor 和业务相关的代码开发。

## topic1的kafka配置
spring.kafka.one.enabled=true
spring.kafka.one.consumer.bootstrapServers=${spring.embedded.kafka.brokers}
spring.kafka.one.topic=mmc-topic-one
spring.kafka.one.group-id=group-consumer-one
spring.kafka.one.processor=com.mmc.multi.kafka.starter.OneProcessor // 业务处理类名称
spring.kafka.one.consumer.auto-offset-reset=latest
spring.kafka.one.consumer.max-poll-records=10
spring.kafka.one.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.one.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer

## topic2的kafka配置
spring.kafka.two.enabled=true
spring.kafka.two.consumer.bootstrapServers=${spring.embedded.kafka.brokers}
spring.kafka.two.topic=mmc-topic-two
spring.kafka.two.group-id=group-consumer-two
spring.kafka.two.processor=com.mmc.multi.kafka.starter.TwoProcessor // 业务处理类名称
spring.kafka.two.consumer.auto-offset-reset=latest
spring.kafka.two.consumer.max-poll-records=10
spring.kafka.two.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.two.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer

国籍惯例,先上源码:Github源码

一、本文要点

本文将介绍通过封装一个starter,来实现多kafka数据源的配置,通过通过源码,可以学习以下特性。系列文章完整目录

  • SpringBoot 整合多个kafka数据源
  • SpringBoot 批量消费kafka消息
  • SpringBoot 优雅地启动或停止消费kafka
  • SpringBoot kafka本地单元测试(免集群)
  • SpringBoot 利用map注入多份配置
  • SpringBoot BeanPostProcessor 后置处理器使用方式
  • SpringBoot 将自定义类注册到IOC容器
  • SpringBoot 注入bean到自定义类成员变量

二、开发环境

  • jdk 1.8
  • maven 3.6.2
  • springboot 2.4.3
  • kafka-client 2.6.6
  • idea 2020

三、创建项目

1、引入SpringBoot的starter和kafka相关jar包。

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>


        <!-- kafka -->
        <!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.28</version>
        </dependency>

2、在项目工程resources目录下新建META-INF/spring.factories文件,指定配置类,让它完成kafka相关配置。

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.mmc.multi.kafka.starter.MmcMultiConsumerAutoConfiguration

3、定义一个配置类MmcMultiKafkaProperties,用来接收上述配置。

## topic1的kafka配置
spring.kafka.one.enabled=true
spring.kafka.one.consumer.bootstrapServers=${spring.embedded.kafka.brokers}
spring.kafka.one.topic=mmc-topic-one
spring.kafka.one.group-id=group-consumer-one
spring.kafka.one.processor=com.mmc.multi.kafka.starter.OneProcessor // 业务处理类名称
spring.kafka.one.consumer.auto-offset-reset=latest
spring.kafka.one.consumer.max-poll-records=10
spring.kafka.one.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.one.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer

## topic2的kafka配置
spring.kafka.two.enabled=true
spring.kafka.two.consumer.bootstrapServers=${spring.embedded.kafka.brokers}
spring.kafka.two.topic=mmc-topic-two
spring.kafka.two.group-id=group-consumer-two
spring.kafka.two.processor=com.mmc.multi.kafka.starter.TwoProcessor  // 业务处理类名称
spring.kafka.two.consumer.auto-offset-reset=latest
spring.kafka.two.consumer.max-poll-records=10
spring.kafka.two.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.two.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer

配置类代码如下,其中Consumer是SpringBoot kafka框架源码类,这也是优雅的地方,这样你就可以像使用SpringBoot配置kafka一样使用本starter,使用方式基本保持一致,该支持的配置不用改代码也支持,配置的时候也有智能提示:

@ToString
@Data
@ConfigurationProperties(prefix = "spring")
public class MmcMultiKafkaProperties {

    /**
     * 支持多个kafka配置.
     */
    private Map<String, MmcKafkaProperties> kafka = new HashMap<>();


    /**
     * MmcKafkaProperties.
     */
    @Data
    static class MmcKafkaProperties {
        /**
         * 是否启用.
         */
        private boolean enabled;
        /**
         * 主题(支持配置多个topic,英文逗号分隔).
         */
        private String topic;
        /**
         * 消费组.
         */
        private String groupId;
        /**
         * 并发度.
         */
        private Integer concurrency = 1;
        /**
         * 批量消费.
         */
        private String type = "batch";
        /**
         * 是否在批次内对kafka进行去重,默认为true.
         */
        private boolean duplicate = true;
        /**
         * 处理类.
         */
        private String processor;
        /**
         * 消费者.
         */
        private final KafkaProperties.Consumer consumer = new KafkaProperties.Consumer();
    
        public Map<String, Object> buildConsumerProperties() {
            return new HashMap<>(this.consumer.buildProperties());
        }

    }
}

4、编辑MmcMultiConsumerAutoConfiguration类,根据配置构建不同的kafka factory。

@Slf4j
@Configuration
@EnableConfigurationProperties(MmcMultiKafkaProperties.class)
@ConditionalOnProperty(prefix = "spring.kafka", value = "enabled", matchIfMissing = true)
public class MmcMultiConsumerAutoConfiguration extends BaseConsumerConfiguration {

    @Resource
    private MmcMultiKafkaProperties mmcMultiKafkaProperties;

    @Bean
    public MmcKafkaInputerContainer mmcKafkaInputerContainer(MmcKafkaProcessorFactory factory,
                                                             MmcKafkaBeanPostProcessor beanPostProcessor) throws Exception {

        Map<String, MmcInputer> inputers = new HashMap<>();

        Map<String, MmcMultiKafkaProperties.MmcKafkaProperties> kafkas = mmcMultiKafkaProperties.getKafka();

        // 逐个遍历,并生成consumer
        for (Map.Entry<String, MmcMultiKafkaProperties.MmcKafkaProperties> entry : kafkas.entrySet()) {

            // 唯一消费者名称
            String name = entry.getKey();

            // 消费者配置
            MmcMultiKafkaProperties.MmcKafkaProperties properties = entry.getValue();

            // 是否开启
            if (properties.isEnabled()) {

                // 生成消费者
                MmcKafkaKafkaAbastrctProcessor inputer = factory.buildInputer(name, properties, beanPostProcessor.getSuitableClass());

                // 输入源容器
                ConcurrentMessageListenerContainer<Object, Object> container = concurrentMessageListenerContainer(properties);

                // 设置容器
                inputer.setContainer(container);
                inputer.setName(name);
                inputer.setProperties(properties);

                // 设置消费者
                container.setupMessageListener(inputer);

                // 关闭时候停止消费
                Runtime.getRuntime().addShutdownHook(new Thread(inputer::stop));

                // 直接启动
                container.start();

                // 加入集合
                inputers.put(name, inputer);
            }

        }

        return new MmcKafkaInputerContainer(inputers);
    }

    @Bean
    public MmcKafkaBeanPostProcessor mmcKafkaBeanPostProcessor() {
        return new MmcKafkaBeanPostProcessor();
    }

    @Bean
    public MmcKafkaProcessorFactory processorFactory() {

        return new MmcKafkaProcessorFactory();
    }


}

这里核心点是怎样构建业务处理类MmcKafkaKafkaAbastrctProcessor,为了方便功能的封装,所以这里我们要求所有业务处理类processor都应该继承MmcKafkaKafkaAbastrctProcessor,并定义kafka消息对应的实体类。考虑到很多开发者习惯定义@Service这些类来实现业务逻辑,所以这里我们利用BeanPostProcessor,将所有继承了MmcKafkaKafkaAbastrctProcessor类都收集起来。

public class MmcKafkaBeanPostProcessor implements BeanPostProcessor {

    @Getter
    private Map<String, MmcKafkaKafkaAbastrctProcessor<? extends MmcKafkaMsg>> suitableClass = new ConcurrentHashMap<>();

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {

        if (bean instanceof MmcKafkaKafkaAbastrctProcessor) {

            MmcKafkaKafkaAbastrctProcessor<? extends MmcKafkaMsg> target = (MmcKafkaKafkaAbastrctProcessor<? extends MmcKafkaMsg>) bean;
            suitableClass.putIfAbsent(beanName, target);
            suitableClass.putIfAbsent(bean.getClass().getName(), target);
        }

        return bean;
    }
}

然后跟配置类指定的processor进行一一绑定。

spring.kafka.one.processor=oneProcessor   // 业务处理类名称
spring.kafka.two.processor=twoProcessor   // 业务处理类名称

@Slf4j
@Service
public class OneProcessor extends MmcKafkaKafkaAbastrctProcessor<DemoMsg> {

    @Resource
    private DemoService demoService;

    @Override
    protected Class<DemoMsg> getEntityClass() {
        return DemoMsg.class;
    }

    @Override
    protected void dealMessage(List<DemoMsg> datas) {

        demoService.dealMessage("one", datas.stream().map(x -> (MmcKafkaMsg) x).collect(Collectors.toList()));

    }


}

同时考虑到很多朋友也喜欢定义一个Class的方式来生成业务处理逻辑,所以这里也做了兼容,于是spring.kafka.xxx.processor 配置,既支持bean的名称,也支持Class的完整路径。

public class MmcKafkaProcessorFactory {

    @Resource
    private DefaultListableBeanFactory defaultListableBeanFactory;

    public MmcKafkaKafkaAbastrctProcessor<? extends MmcKafkaMsg> buildInputer(
            String name, MmcMultiKafkaProperties.MmcKafkaProperties properties,
            Map<String, MmcKafkaKafkaAbastrctProcessor<? extends MmcKafkaMsg>> suitableClass) throws Exception {

        // 如果没有配置process,则直接从注册的Bean里查找
        if (!StringUtils.hasText(properties.getProcessor())) {

            return findProcessorByName(name, properties.getProcessor(), suitableClass);
        }

        // 如果配置了process,则从指定配置中生成实例
        // 判断给定的配置是类,还是bean名称
        if (!isClassName(properties.getProcessor())) {

            throw new IllegalArgumentException("It's not a class, wrong value of ${spring.kafka." + name + ".processor}.");
        }

        // 如果ioc容器已经存在该处理实例,则直接使用,避免既配置了process,又使用了@Service等注解
        MmcKafkaKafkaAbastrctProcessor<? extends MmcKafkaMsg> inc = findProcessorByClass(name, properties.getProcessor(), suitableClass);
        if (null != inc) {
            return inc;
        }

        // 指定的processor处理类必须继承MmcKafkaKafkaAbastrctProcessor
        Class<?> clazz = Class.forName(properties.getProcessor());
        boolean isSubclass = MmcKafkaKafkaAbastrctProcessor.class.isAssignableFrom(clazz);
        if (!isSubclass) {
            throw new IllegalStateException(clazz.getName() + " is not subClass of MmcKafkaKafkaAbastrctProcessor.");
        }

        // 创建实例
        Constructor<?> constructor = clazz.getConstructor();
        MmcKafkaKafkaAbastrctProcessor<? extends MmcKafkaMsg> ins = (MmcKafkaKafkaAbastrctProcessor<? extends MmcKafkaMsg>) constructor.newInstance();

        // 注入依赖的变量
        defaultListableBeanFactory.autowireBean(ins);

        return ins;
    }

    private MmcKafkaKafkaAbastrctProcessor<? extends MmcKafkaMsg> findProcessorByName(String name, String processor, Map<String,
            MmcKafkaKafkaAbastrctProcessor<? extends MmcKafkaMsg>> suitableClass) {

        return suitableClass.entrySet()
                .stream()
                .filter(e -> e.getKey().startsWith(name) || e.getKey().equalsIgnoreCase(processor))
                .map(Map.Entry::getValue)
                .findFirst()
                .orElseThrow(() -> new RuntimeException("Can't found any suitable processor class for the consumer which name is " + name
                        + ", please use the config ${spring.kafka." + name + ".processor} or set name of Bean like @Service(\"" + name + "Processor\") "));
    }


    private MmcKafkaKafkaAbastrctProcessor<? extends MmcKafkaMsg> findProcessorByClass(String name, String processor, Map<String,
            MmcKafkaKafkaAbastrctProcessor<? extends MmcKafkaMsg>> suitableClass) {

        return suitableClass.entrySet()
                .stream()
                .filter(e -> e.getKey().startsWith(name) || e.getKey().equalsIgnoreCase(processor))
                .map(Map.Entry::getValue)
                .findFirst()
                .orElse(null);
    }

    private boolean isClassName(String processor) {

        // 使用正则表达式验证类名格式
        String regex = "^[a-zA-Z_$][a-zA-Z\\d_$]*([.][a-zA-Z_$][a-zA-Z\\d_$]*)*$";
        return Pattern.matches(regex, processor);
    }

}

5、考虑到需要消费的kafka消息五花八门,所以我们才抽象了一个父类MmcKafkaKafkaAbastrctProcessor,要求所有业务逻辑处理类都需要继承它,实则上也是释放了我们劳动力,没必要关注消息的反序列化过程。同时,这里默认使用的都是kafka消息的批量拉取方式,如果你要单条处理,也可以逐条遍历处理。这里使用批量拉取有以下好处:
1、可以提升消息处理速度(下一篇文章将介绍kafka单分区实现10w级消息处理);
2、可以在batch里对消息做聚合和去重;

@Slf4j
@Setter
public abstract class MmcKafkaKafkaAbastrctProcessor<T extends MmcKafkaMsg> implements MmcKafkaStringInputer {

    /**
     * Kafka容器.
     */
    protected ConcurrentMessageListenerContainer<Object, Object> container;
    /**
     * 消费者名称.
     */
    protected String name;
    /**
     * 消费者配置.
     */
    protected MmcMultiKafkaProperties.MmcKafkaProperties properties;

    public MmcKafkaKafkaAbastrctProcessor() {

    }

    public MmcKafkaKafkaAbastrctProcessor(String name, MmcMultiKafkaProperties.MmcKafkaProperties properties) {

        this.name = name;
        this.properties = properties;
    }

    /**
     * 消费kafka消息.
     */
    @Override
    public void onMessage(List<ConsumerRecord<String, String>> records) {

        if (null == records || CollectionUtils.isEmpty(records)) {

            log.warn("{} records is null or records.value is empty.", name);
            return;
        }

        Assert.hasText(name, "You must pass the field `name` to the Constructor or invoke the setName() after the class was created.");
        Assert.notNull(properties, "You must pass the field `properties` to the Constructor or invoke the setProperties() after the class was created.");

        try {

            Stream<T> dataStream = records.stream()
                    .map(ConsumerRecord::value)
                    .flatMap(this::doParse)
                    .filter(Objects::nonNull)
                    .filter(this::isRightRecord);

            if (properties.isDuplicate()) {

                dataStream = dataStream.collect(Collectors.groupingBy(this::buildRoutekey))
                        .entrySet()
                        .stream()
                        .map(this::findLasted)
                        .filter(Objects::nonNull);
            }

            List<T> datas = dataStream.collect(Collectors.toList());
            if (CommonUtil.isNotEmpty(datas)) {

                this.dealMessage(datas);

            }


        } catch (Exception e) {

            log.error(name + "-dealMessage error ", e);
        }
    }


    /**
     * 将kafka消息解析为实体,支持json对象或者json数组.
     *
     * @param json kafka消息
     * @return 实体类
     */
    protected Stream<T> doParse(String json) {

        if (json.startsWith("[")) {

            // 数组
            List<T> datas = JsonUtil.parseJsonArray(json, getEntityClass());
            if (CommonUtil.isEmpty(datas)) {

                log.warn("{} doParse error, json={} is error.", name, json);
                return Stream.empty();
            }

            // 反序列对象后,做一些初始化操作
            datas = datas.stream().peek(this::doAfterParse).collect(Collectors.toList());

            return datas.stream();

        } else {

            // 对象
            T data = JsonUtil.parseJsonObject(json, getEntityClass());
            if (null == data) {

                log.warn("{} doParse error, json={} is error.", name, json);
                return Stream.empty();
            }

            // 反序列对象后,做一些初始化操作
            doAfterParse(data);

            return Stream.of(data);
        }
    }

    /**
     * 反序列对象后,做一些初始化操作.
     *
     * @param data 待处理的实体
     */
    protected void doAfterParse(T data) {

    }

    /**
     * 设置kafka容器.
     *
     * @param container kafka容器
     */
    public void setContainer(ConcurrentMessageListenerContainer<Object, Object> container) {

        this.container = container;
    }

    /**
     * 停止kafka容器.
     */
    public void stop() {
        container.stop();
    }

    /**
     * 启动kafka容器.
     */
    public void start() {
        container.start();
    }

    /**
     * 如果单次拉取的kafka消息有重复,则根据某个字段分组,并取最新的一个.
     *
     * @param entry entry
     * @return 最好更新的实体
     */
    protected  T findLasted(Map.Entry<String, List<T>> entry) {

        try {

            Optional<T> d = entry.getValue().stream()
                    .max(Comparator.comparing(T::getRoutekey));

            if (d.isPresent()) {

                return d.get();
            }

        } catch (Exception e) {

            String content = JsonUtil.toJsonStr(entry.getValue());
            log.error("处理消息出错:{}", e.getMessage() + ": " + content, e);
        }
        return null;
    }

    /**
     * 构造实体类的唯一键.
     *
     * @param t 待处理实体
     * @return 实体类的唯一键
     */
    protected String buildRoutekey(T t) {
        return t.getRoutekey();
    }

    /**
     * 过滤消息.
     *
     * @param t 待处理实体
     * @return true:不过滤,false:过滤
     */
    protected boolean isRightRecord(T t) {
        return true;
    }

    /**
     * 获取反序列的实体类类型.
     *
     * @return 反序列的实体类类型
     */
    protected abstract Class<T> getEntityClass();

    /**
     * 处理消息.
     *
     * @param datas 待处理列表
     */
    protected abstract void dealMessage(List<T> datas) throws ExecutionException, InterruptedException;


}

四、测试一下

1、引入kafka测试需要的jar。参考文章:kafka单元测试

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>

2、定义一个消息实体和业务处理类。

@Data
class DemoMsg implements MmcKafkaMsg {

    private String routekey;

    private String name;

    private Long timestamp;

}
@Slf4j
@Service
public class OneProcessor extends MmcKafkaKafkaAbastrctProcessor<DemoMsg> {

    @Resource
    private DemoService demoService;

    @Override
    protected Class<DemoMsg> getEntityClass() {
        return DemoMsg.class;
    }

    @Override
    protected void dealMessage(List<DemoMsg> datas) {

        demoService.dealMessage("one", datas.stream().map(x -> (MmcKafkaMsg) x).collect(Collectors.toList()));

    }


}

3、配置kafka地址和指定业务处理类。

spring.kafka.one.enabled=true
spring.kafka.one.consumer.bootstrapServers=${spring.embedded.kafka.brokers}
spring.kafka.one.topic=mmc-topic-one
spring.kafka.one.group-id=group-consumer-one
spring.kafka.one.processor=com.mmc.multi.kafka.starter.OneProcessor  // 业务处理类名称
spring.kafka.one.consumer.auto-offset-reset=latest
spring.kafka.one.consumer.max-poll-records=10
spring.kafka.one.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.one.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer

4、编写测试类。

@Slf4j
@ActiveProfiles("dev")
@ExtendWith(SpringExtension.class)
@SpringBootTest(classes = {MmcMultiConsumerAutoConfiguration.class, DemoService.class, OneProcessor.class})
@TestPropertySource(value = "classpath:application.properties")
@DirtiesContext
@EmbeddedKafka(topics = {"${spring.kafka.one.topic}"})
class AppTest {


    @Resource
    private EmbeddedKafkaBroker embeddedKafkaBroker;

    @Value("${spring.kafka.one.topic}")
    private String topicOne;

    @Value("${spring.kafka.two.topic}")
    private String topicTwo;

    @Test
    void testDealMessage() throws Exception {

        // 模拟生产数据
        produceMessage();

        Thread.sleep(10 * 1000);
    }

    void produceMessage() {

        Map<String, Object> configs = new HashMap<>(KafkaTestUtils.producerProps(embeddedKafkaBroker));
        Producer<String, String> producer = new DefaultKafkaProducerFactory<>(configs, new StringSerializer(), new StringSerializer()).createProducer();

        for (int i = 0; i < 10; i++) {

            DemoMsg msg = new DemoMsg();
            msg.setRoutekey("routekey" + i);
            msg.setName("name" + i);
            msg.setTimestamp(System.currentTimeMillis());

            String json = JsonUtil.toJsonStr(msg);
            producer.send(new ProducerRecord<>(topicOne, "my-aggregate-id", json));
            producer.send(new ProducerRecord<>(topicTwo, "my-aggregate-id", json));
            producer.flush();

        }
    }
}

5、运行一下,测试通过。
在这里插入图片描述

五、小结

将本项目代码构建成starter,就可以大大提升我们开发效率,我们只需要关心业务代码的开发,github项目源码:轻触这里。如果对你有用可以打个星星哦。下一篇,升级本starter,在kafka单分区下实现十万级消费处理速度。

加我加群一起交流学习!更多干货下载、项目源码和大厂内推等着你

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1220724.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

cocos3.4.2 2d射线检测 和 animation动画

2D的射线检测 ,注:目标必须有2d刚体和2d碰撞器 ,且项目设置内必须是这个物理系统 //起点位置let objs new Vec2(this.node.getWorldPosition().x, this.node.getWorldPosition().y);// 终点 let obje new Vec2(objs.x 100, objs.y);// 射线检测let results PhysicsSystem2…

Unity中Shader纹理的环绕方式

文章目录 前言一、修改环绕方式前的设置准备二、在纹理的设置面板可以修改环绕方式三、在Shader中&#xff0c;实现纹理的环绕方式切换1、在属性面板定义一个和纹理面板一样的纹理环绕方式下拉框2、在Pass中&#xff0c;定义枚举对应的变体3、在片元着色器中&#xff0c;纹理采…

【数据结构与算法】JavaScript实现树结构(一)

文章目录 一、树结构简介1.1.简单了解树结构1.2.树结构的表示方式 二、二叉树2.1.二叉树简介2.2.特殊的二叉树2.3.二叉树的数据存储 三、二叉搜索树3.1.认识二叉搜索树3.2.二叉搜索树应用举例 一、树结构简介 1.1.简单了解树结构 什么是树&#xff1f; 真实的树&#xff1a;…

Redis数据库双写一致性解决方案

程序员的公众号&#xff1a;源1024&#xff0c;获取更多资料&#xff0c;无加密无套路&#xff01; 最近整理了一份大厂面试资料《史上最全大厂面试题》&#xff0c;Springboot、微服务、算法、数据结构、Zookeeper、Mybatis、Dubbo、linux、Kafka、Elasticsearch、数据库等等 …

SpringCloud微服务:Nacos和Eureka的区别

目录 配置&#xff1a; 区别&#xff1a; ephemeral设置为true时 ephemeral设置为false时&#xff08;这里我使用的服务是order-service&#xff09; 1. Nacos与eureka的共同点 都支持服务注册和服务拉取 都支持服务提供者心跳方式做健康检测 2. Nacos与Eu…

vscode调试pytorch的DistributedDataParallel代码

这里写自定义目录标题 一、查找launch.py二、修改launch.json三、特别提醒3.1 错误的写法3.2 正确的写法 一、查找launch.py 使用代码。 find / -name launch.py | grep distributed得到的结果如下 这里我们得到了两个结果&#xff0c;看目标文件的路径名&#xff0c;第二个…

深度学习到智能小车(1)深度学习框架

0.前提 最近新开了一门叫机器学习的课程&#xff0c;老师一直在跟我们讲一些有关这方面的知识&#xff0c;告诉我们一定要学好数学&#xff0c;因为数学是算法的基础。我手上的donkeycar刚好也涉及到Keras深度神经网络&#xff0c;所以出于好奇我去图书馆借回了一本叫《Keras深…

一键免费去除视频水印和字幕的AI工具

最近有学员经常让我分享好用的智能抹除视频水印字幕AI工具&#xff0c;今天就给大家分享一个我经常用到的这款工具——腾讯智影&#xff0c;这个平台提供的智能抹除功能&#xff0c;借助这个工具我们可以将视频中不需要的字幕或者水印删除掉。 不过这款工具每天有三次免费次数…

端口映射软件

今天给大家介绍一个自己制作的工具&#xff0c;本工具可以把本地自己的项目映射到外网可以访问,自己有域名可以使用自己的,没有可以用软件自带的三级域名! Token获取 地址&#xff1a;传送 打开上面网址注册账号&#xff0c;然后点击验证&#xff0c;复制里面的值即可。 软件…

openpyxl获取单元格的主题色的颜色值

&#x1f4e2;作者&#xff1a; 小小明-代码实体 &#x1f4e2;博客主页&#xff1a;https://blog.csdn.net/as604049322 &#x1f4e2;欢迎点赞 &#x1f44d; 收藏 ⭐留言 &#x1f4dd; 欢迎讨论&#xff01; openpyxl 支持以下几种颜色类型&#xff1a; RGB (Red, Green, …

Stable Diffusion - StableDiffusion WebUI 软件升级与扩展兼容

欢迎关注我的CSDN&#xff1a;https://spike.blog.csdn.net/ 本文地址&#xff1a;https://spike.blog.csdn.net/article/details/134463035 目前&#xff0c;StableDiffusion WebUI 的版本是 1.6.0&#xff0c;同步更新 controlnet、tagcomplete、roop、easy-prompt-selector等…

Go 语言变量类型和声明详解

在Go中&#xff0c;有不同的变量类型&#xff0c;例如&#xff1a; int 存储整数&#xff08;整数&#xff09;&#xff0c;例如123或-123float32 存储浮点数字&#xff0c;带小数&#xff0c;例如19.99或-19.99string - 存储文本&#xff0c;例如“ Hello World”。字符串值用…

批量替换WordPress文章内图片链接

在WordPress使用过程中&#xff0c;如果中途更换了域名&#xff0c;原先文章内的图片使用的是原来的域名&#xff0c;就会造成文章页里面的图片链接无法显示。如果从后台文章挨个修改就比较麻烦。可以通过数据库进行批量替换即可。 使用 PHPMyadmin 打开 数据库&#xff0c;登…

我所理解的 UI Toolkit 启蒙阶段(一)

我所理解的 UI Toolkit 启蒙阶段&#xff08;一&#xff09; 对于自己不会的新东西的学习&#xff0c;我认为最合适的路径就是&#xff1a; 实例教学视频 —> 实操熟悉 —> 官方文档查漏补缺 —> 拟定思路实现功能 但这 4 步并非每一步都需要下 100% 的功夫&#x…

Linux C 线程

线程 概述线程和进程的异同如何选择使用进程还是线程 函数获取进程自身ID  pthread_self创建线程  pthread_create退出线程  pthread_exit线程等待  pthread_join 四种线程模型1 &#xff09;单线程2 &#xff09;单线程3 &#xff09;双线程4 &#xff09;三线程 概述…

选择Amazon EC2,走进云端新时代

目录 前言 选择云服务器 / 海外服务器需要关注的重点 Amazon EC2 云服务器的优势所在 文末总结 前言 常言道&#xff0c;工欲善其事必先利其器&#xff0c;无论你是资深开发者&#xff0c;还是普通爱好者&#xff0c;在日常开发和学习生活中都需要用到云服务器提供的丰富的…

自建es数据迁移阿里云方案

一、ElasticSearch数据迁移方法介绍 https://help.aliyun.com/document_detail/170095.html?spma2c4g.26937906.0.0.429240c9ymiXGm 可以通过Logstash、reindex和OSS等多种方式完成阿里云Elasticsearch间数据迁移、Elasticsearch数据迁移至Openstore存储中、自建Elasticsear…

GB28181学习(十六)——基于jrtplib实现tcp被动和主动收流

前言 GB/T28181-2022实时流的传输方式介绍&#xff1a;https://blog.csdn.net/www_dong/article/details/134255185 tcp passive收流 流程图 注意&#xff1a; m字段指定传输方式为TCP/RTP/AVP&#xff1b;sdp信息中增加"asetup:passive"&#xff1b;SIP服务器启…

微服务学习 | Ribbon负载均衡、Nacos注册中心、微服务技术对比

Ribbon负载均衡 负载均衡流程 负载均衡策略 通过定义IRule实现可以修改负载均衡规则&#xff0c;有两种方式&#xff1a; 1. 代码方式:在服务消费者order-service中的OrderApplication类中&#xff0c;定义一个新的IRule: 2.配置文件方式: 在order-service的application.yml…

技术分享 | 如何写好测试用例?

对于软件测试工程师来说&#xff0c;设计测试用例和提交缺陷报告是最基本的职业技能。是非常重要的部分。一个好的测试用例能够指示测试人员如何对软件进行测试。在这篇文章中&#xff0c;我们将介绍测试用例设计常用的几种方法&#xff0c;以及如何编写高效的测试用例。 ## 一…