SpringBoot集成Flink-CDC 采集PostgreSQL变更数据发布到Kafka

news2024/11/25 7:10:09

一、业务价值

监听数据变化,进行异步通知,做系统内异步任务。

架构方案(懒得写了,看图吧):

 

二、修改数据库配置

2.1、更改配置文件postgresql.conf

# 更改wal日志方式为logical(必须)
wal_level = logical # minimal, replica, or logical

# 更改solts最大数量(默认值为10),flink-cdc默认一张表占用一个slots(每个文档都这么说,但根据我的实际操作来看,一个flink-cdc服务占用一个槽,但是要大于默认值10)
max_replication_slots = 20 # max number of replication slots

# 更改wal发送最大进程数(默认值为10),这个值和上面的solts设置一样
max_wal_senders = 20 # max number of walsender processes
# 中断那些停止活动超过指定毫秒数的复制连接,可以适当设置大一点(默认60s)
wal_sender_timeout = 180s # in milliseconds; 0 disable  

2.2、创建数据变更采集用户及赋权

-- 创建pg 高线数据同步用户
create user offline_data_user with password 'offline_data_password';

-- 给用户复制流权限
alter role offline_data_user replication;

-- 给用户登录pmsdb数据库权限
grant connect on database 数据库名 to offline_data_user;

-- 给用户授予数据库XXXX下某些SCHEMA的XXX表的读作权限
grant select on all tables in SCHEMA 某 to offline_data_user;

grant usage on SCHEMA 某 to offline_data_user;

2.3、发布表


-- 设置表发布为true
update pg publication set pubalitables=true where pubname is not null;

-- 发表所有表
create PUBLICATION dbz publication FOR ALL TABLES;

三、SpringBoot集成Flink-CDC

3.1、添加Flink-CDC的依赖

<properties>

    <flink.version>1.16.0</flink.version>
    <flink-pg.version>2.3.0</flink-pg.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-postgres-cdc</artifactId>
        <version>${flink-pg.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>

3.2 构建数据源

数据转换类,将从数据库采集的转成你想要的格式:

{
  "beforeData": "",
  "afterData": "",
  "eventType": "",
  "database": "",
  "schema": "",
  "tableName": "",
  "changeTime": 0
}

数据实体类 DataChangeInfo

package com.jie.flink.cdc.doman;


import lombok.Data;

import java.io.Serializable;

/**
 * @author zhanggj
 * @data 2023/1/31
 */
@Data
public class DataChangeInfo implements Serializable {

    /**
     * 变更前数据
     */
    private String beforeData;
    /**
     * 变更后数据
     */
    private String afterData;
    /**
     * 变更类型 create=新增、update=修改、delete=删除、read=初始读
     */
    private String eventType;
    /**
     * 数据库名
     */
    private String database;
    /**
     * schema
     */
    private String schema;
    /**
     * 表名
     */
    private String tableName;
    /**
     * 变更时间
     */
    private Long changeTime;
}

数据解析类PostgreSQLDeserialization

package com.jie.flink.cdc.flinksource;

import com.esotericsoftware.minlog.Log;
import com.jie.flink.cdc.datafilter.PostgreSQLDataFilter;
import com.jie.flink.cdc.doman.DataChangeInfo;
import com.jie.flink.cdc.util.JsonUtils;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

/**
 * @author zhanggj
 * @data 2023/1/31
 * 数据转换
 */
@Slf4j
public class PostgreSQLDeserialization implements DebeziumDeserializationSchema<String> {

    public static final String TS_MS = "ts_ms";
    public static final String DATABASE = "db";
    public static final String SCHEMA = "schema";
    public static final String TABLE = "table";
    public static final String BEFORE = "before";
    public static final String AFTER = "after";
    public static final String SOURCE = "source";

    /**
     *
     * 反序列化数据,转为变更JSON对象
     * @param sourceRecord
     * @param collector
     * @return void
     * @author lei
     * @date 2022-08-25 14:44:31
     */
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) {
        final String topic = sourceRecord.topic();
        log.debug("收到{}的消息,准备进行转换", topic);

        final DataChangeInfo dataChangeInfo = new DataChangeInfo();

        final Struct struct = (Struct) sourceRecord.value();
        final Struct source = struct.getStruct(SOURCE);
        dataChangeInfo.setBeforeData( getDataJsonString(struct, BEFORE));
        dataChangeInfo.setAfterData(getDataJsonString(struct, AFTER));

        //5.获取操作类型  CREATE UPDATE DELETE
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        dataChangeInfo.setEventType(operation.toString().toLowerCase());
        dataChangeInfo.setDatabase(Optional.ofNullable(source.get(DATABASE)).map(Object::toString).orElse(""));
        dataChangeInfo.setSchema(Optional.ofNullable(source.get(SCHEMA)).map(Object::toString).orElse(""));
        dataChangeInfo.setTableName(Optional.ofNullable(source.get(TABLE)).map(Object::toString).orElse(""));
        dataChangeInfo.setChangeTime(Optional.ofNullable(struct.get(TS_MS)).map(x -> Long.parseLong(x.toString())).orElseGet(System::currentTimeMillis));


        log.info("收到{}的{}类型的消息, 已经转换好了,准备发往sink", topic, dataChangeInfo.getEventType());
        //7.输出数据
        collector.collect(JsonUtils.toJSONString(dataChangeInfo));
    }

    private String getDataJsonString(final Struct struct, final String fieldName) {
        if (Objects.isNull(struct)) {
            return null;
        }
        final Struct element = struct.getStruct(fieldName);
        if (Objects.isNull(element)) {
            return null;
        }
        Map<String, Object> dataMap = new HashMap<>();
        Schema schema = element.schema();
        List<Field> fieldList = schema.fields();
        for (Field field : fieldList) {
            dataMap.put(field.name(), element.get(field));
        }
        return JsonUtils.toJSONString(dataMap);
    }


    @Override
    public TypeInformation<String> getProducedType() {
        return TypeInformation.of(String.class);
    }
}

构建PG数据源PostgreSQLDataChangeSource

package com.jie.flink.cdc.flinksource;

import com.jie.flink.cdc.datafilter.PostgreSQLReadDataFilter;
import com.ververica.cdc.connectors.postgres.PostgreSQLSource;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import lombok.Data;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

import java.util.Properties;
import java.util.UUID;

/**
 * @author zhanggj
 * @data 2023/2/10
 * flink pg 数据源配置
 */
@Data
@Component
public class PostgreSQLDataChangeSource {

    /**
     * 数据库hostname
     */
    private String hostName;

    /**
     * 数据库 端口
     */
    private Integer port;

    /**
     * 库名
     */
    private String database;

    /**
     * 用户名
     */
    @Value("${spring.datasource.username}")
    private String userName;

    /**
     * 密码
     */
    @Value("${spring.datasource.password}")
    private String password;

    /**
     * schema 组
     */
    @Value("${jie.flink-cdc.stream.source.schemas:test_schema}")
    private String[] schemaArray;

    /**
     * 要监听的表
     */
    @Value("${jie.flink-cdc.stream.source.schemas:test_table}")
    private String[] tableArray;

    /**
     * 是否忽略初始化扫描数据
     */
    @Value("${jie.flink-cdc.stream.source.init-read.ignore:false}")
    private Boolean initReadIgnore;

    @Value("${spring.datasource.url}")
    private void splitUrl(String url) {
        final String[] urlSplit = StringUtils.split(url, "/");
        final String[] hostPortSplit = StringUtils.split(urlSplit[1], ":");
        this.hostName = hostPortSplit[0];
        this.port = Integer.parseInt(hostPortSplit[1]);
        this.database = StringUtils.substringBefore(urlSplit[2], "?");
    }

    @Bean("pgDataSource")
    public DebeziumSourceFunction<String> buildPostgreSQLDataSource() {
        Properties properties = new Properties();
        // 指定连接器启动时执行快照的条件:****重要*****
        //initial- 连接器仅在没有为逻辑服务器名称记录偏移量时才执行快照。
        //always- 连接器每次启动时都会执行快照。
        //never- 连接器从不执行快照。
        //initial_only- 连接器执行初始快照然后停止,不处理任何后续更改。
        //exported- 连接器根据创建复制槽的时间点执行快照。这是一种以无锁方式执行快照的绝佳方式。
        //custom- 连接器根据snapshot.custom.class属性的设置执行快照
        properties.setProperty("debezium.snapshot.mode", "initial");
        properties.setProperty("snapshot.mode", "initial");
        // 好像不起作用使用slot.name
        properties.setProperty("debezium.slot.name", "pg_cdc" + UUID.randomUUID());
        properties.setProperty("slot.name", "flink_slot" + UUID.randomUUID());
        properties.setProperty("debezium.slot.drop.on.top", "true");
        properties.setProperty("slot.drop.on.top", "true");
        // 更多参数配置参考debezium官网 https://debezium.io/documentation/reference/1.2/connectors/postgresql.html?spm=a2c4g.11186623.0.0.4d485fb3rgWieD#postgresql-property-snapshot-mode
        // 或阿里文档 https://help.aliyun.com/document_detail/184861.html

        PostgreSQLDeserialization deserialization = null;

        if (initReadIgnore) {
            properties.setProperty("debezium.snapshot.mode", "never");
            properties.setProperty("snapshot.mode", "never");
            deserialization = new PostgreSQLDeserialization(new PostgreSQLReadDataFilter());
        } else {
            deserialization = new PostgreSQLDeserialization();
        }

        return PostgreSQLSource.<String>builder()
                .hostname(hostName)
                .port(port)
                .username(userName)
                .password(password)
                .database(database)
                .schemaList(schemaArray)
                .tableList(tableArray)
                .decodingPluginName("pgoutput")
                .deserializer(deserialization)
                .debeziumProperties(properties)
                .build();
    }
}

3.3、构建kafkaSink

package com.jie.flink.cdc.flinksink;

import lombok.Data;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;


/**
 * @author zhanggj
 * @data 2023/2/10
 * flink kafka sink配置
 */
@Data
@Component
public class FlinkKafkaSink {

    @Value("${jie.flink-cdc.stream.sink.topic:offline_data_topic}")
    private String topic;

    @Value("${spring.kafka.bootstrap-servers}")
    private String kafkaBootstrapServers;

    @Bean("kafkaSink")
    public KafkaSink buildFlinkKafkaSink() {
        return KafkaSink.<String>builder()
                .setBootstrapServers(kafkaBootstrapServers)
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic(topic)
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .build();
    }
}

3.4、创建flink-cdc监听

利用springboot的特性, 实现CommandLineRunner将flink-cdc 作为一个项目启动时需要运行的分支子任务即可

package com.jie.flink.cdc.listener;

import com.jie.flink.cdc.flinksink.DataChangeSink;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

import java.time.Duration;

/**
 * @author zhanggj
 * @data 2023/1/31
 * 监听数据变更
 */
@Component
public class PostgreSQLEventListener implements CommandLineRunner {

    private final DataChangeSink dataChangeSink;
    private final KafkaSink<String> kafkaSink;
    private final DebeziumSourceFunction<String> pgDataSource;

    public PostgreSQLEventListener(final DataChangeSink dataChangeSink,
                                   final KafkaSink<String> kafkaSink,
                                   final DebeziumSourceFunction<String> pgDataSource) {
        this.dataChangeSink = dataChangeSink;
        this.kafkaSink = kafkaSink;
        this.pgDataSource = pgDataSource;
    }

    @Override
    public void run(final String... args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.disableOperatorChaining();
        env.enableCheckpointing(6000L);
        // 配置checkpoint 超时时间
        env.getCheckpointConfig().setCheckpointTimeout(Duration.ofMinutes(60).toMillis());
        //指定 CK 的一致性语义
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        //设置任务关闭的时候保留最后一次 CK 数据
        env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // 避免扫描快照超时
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(100);
        env.getCheckpointConfig().setCheckpointInterval(Duration.ofMinutes(10).toMillis());
        // 指定从 CK 自动重启策略
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 2000L));
        //设置状态后端
        env.setStateBackend(new HashMapStateBackend());

        DataStreamSource<String> pgDataStream = env.addSource(pgDataSource, "PostgreSQL-source")
                        .setParallelism(1);
        // sink到kafka
        pgDataStream.sinkTo(kafkaSink).name("sink2Kafka");

        env.execute("pg_cdc-kafka");
    }

}

四、遇到的问题与解决

1、pg配置没有修改,DBA说一般情况下都有改过wal_level,呵呵,一定要确认wal_level = logical是必须的。

2、Creation of replication slot failed …… FATAL:number of requested standby connections exceeds max_wal_senders (currently 10)

 求DBA大佬吧,需要改

3、Failed to start replication stream at LSN{0/1100AA50}; when setting up multiple connectors for the same database host, please make sure to use a distinct replication slot name for each.

很多文档理提供的创建数据源的代码里都只是指定了一个固定的slot.name 当你启动多个SpringBoot服务时,会报这个错误,我这个代码里直接用了UUID,其他能区分不同服务的也可以的。


        properties.setProperty("debezium.slot.name", "pg_cdc" + UUID.randomUUID());
        properties.setProperty("slot.name", "flink_slot" + UUID.randomUUID());

4、服务启动后一直在扫描快照数据,看日志,报了超时异常(异常找不到了,有空了造个再发出来)。

原因:(官网)During scanning snapshot of database tables, since there is no recoverable position, we can’t perform checkpoints. In order to not perform checkpoints, Postgres CDC source will keep the checkpoint waiting to timeout. The timeout checkpoint will be recognized as failed checkpoint, by default, this will trigger a failover for the Flink job. So if the database table is large, it is recommended to add following Flink configurations to avoid failover because of the timeout checkpoints:【Postgres CDC暂不支持在全表扫描阶段执行Checkpoint。如果您的作业在全表扫描阶段触发Checkpoint,则可能由于Checkpoint超时导致作业Failover。因此,建议您在作业开发页面高级配置的更多Flink配置中配置如下参数,避免在全量同步阶段由于Checkpoint超时导致Failover。】

execution.checkpointing.interval: 10min
execution.checkpointing.tolerable-failed-checkpoints: 100
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 2147483647

代码:

        // 避免扫描快照超时
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(100);
        env.getCheckpointConfig().setCheckpointInterval(Duration.ofMinutes(10).toMillis());

        // 指定从 CK 自动重启策略
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 2000L));

或者改超时时间配置

        // 配置checkpoint 超时时间
        env.getCheckpointConfig().setCheckpointTimeout(Duration.ofMinutes(600).toMillis());

没错,上面的时600分钟,其实对于我们的数据量(8千多万)60分钟这个配置还是不够的(单机),因此用了600分钟,但是,真正运行后报了另外的问题 OOM:Java heap space……

最后,直接关掉了快照数据的扫描

            properties.setProperty("debezium.snapshot.mode", "never");
            properties.setProperty("snapshot.mode", "never");

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/336780.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

PCB设计是不是该去除孤铜

你知道PCB设计是不是该去除孤铜? PCB设计的技巧需要注意很多问题&#xff0c;各个器件的兼容问题&#xff0c;以及成品问题等等都是需要考虑的重要因素。 我们今天的主题是PCB设计的时候是不是该去除孤铜的问题?有人说应该除去&#xff0c;原因大概是&#xff1a; 会造成EMI问…

学习深入理解JVM虚拟机及JavaGuide后的学习笔记

JVM虚拟机 一、JVM组成部分&#xff1a; 1.程序计数器 作用&#xff0c;是记住下一条JVM指令的内存地址&#xff1b;1.多线程情况下&#xff0c;程序计数器用于记录当前线程执行的位置&#xff0c;从而线程切换回来的时候能够知道线程上次运行到哪儿了。2.字节码解释器通过改变…

6 前缀、中缀、后缀表达式

文章目录1 前缀表达式 1. 1 缀表达式的计算机求值 2 中缀表达式3 后缀表达式 3. 1 后缀表达式的计算机求值 3. 2 中缀表达式转换为后缀表达式1 前缀表达式 前缀表达式又称波兰式&#xff0c;前缀表达式的运算符位于操作数之前 举例说明&#xff1a; (34)5-6 对应的前缀表达式就…

Orin + SC16IS752+SP3072 SPI转串口485

文章目录 1.前言2.修改过程2.1 sc16is752 芯片2.1.1引脚说明2.1.2 设备树配置2.2.1 源码分析3 调试1.前言 Orin 有四路串口,对于多数设备来说已经够用。 通过SPI 转串口再转RS485在Orin平台应该属于极个例,所以记录一下。 串口扩展芯片: SC16IS752RS485收发器: SP3072通信…

初次尝试-注册openai并使用chatGPT

1 环境 本次我打算在win11虚拟机下进行(不打算动真机的时区啦)。 2 科学上网 这里就不多介绍了&#xff0c;使用科学上网工具连接外网即可。由于软件可连接限制&#xff0c;我这里选择美国网络。 3 更改时区 这里的时区最好和上述的所连接的地区一致。 3 登录网站 1、…

HOMER docker版本安装详细流程

概述 HOMER是一款100%开源的针对SIP/VOIP/RTC的抓包工具和监控工具。 HOMER是一款强大的、运营商级、可扩展的数据包和事件捕获系统&#xff0c;是基于HEP/EEP协议的VoIP/RTC监控应用程序&#xff0c;并可以使用即时搜索、处理和存储大量的信令、RTC事件、日志和统计信息。 …

Word控件Spire.Doc 【Table】教程(13): 如何在 C# 中向现有的 word 表添加一行

Spire.Doc for .NET是一款专门对 Word 文档进行操作的 .NET 类库。在于帮助开发人员无需安装 Microsoft Word情况下&#xff0c;轻松快捷高效地创建、编辑、转换和打印 Microsoft Word 文档。拥有近10年专业开发经验Spire系列办公文档开发工具&#xff0c;专注于创建、编辑、转…

LwIP系列--软件定时器(超时处理)详解

一、目的在TCP/IP协议栈中ARP缓存的更新、IP数据包的重组、TCP的连接超时和超时重传等都需要超时处理模块&#xff08;软件定时器&#xff09;的参与。本篇主要介绍LwIP中超时处理的实现细节。上图为超时定时器链表&#xff0c;升序排序&#xff0c;其中next_timeout为链表头&a…

pyLoad远程代码执行漏洞(CVE-2023-0297)复现以及原理流量特征分析

声明&#xff1a; 请勿用于非法入侵&#xff0c;仅供学习。传送门 -》中华人民共和国网络安全法 文章目录声明&#xff1a;pyLoad介绍漏洞介绍影响版本不受影响版本漏洞原理漏洞环境搭建以及复现流量特征分析pyLoad介绍 pyLoad是一个用 Python 编写的免费和开源下载管理器&am…

计算GPS两个点之间的距离

参考&#xff1a;Https://blog.csdn.net/u011339749/article/details/125048180任意两点对应的经纬度A(lat0,long0),B(lat1,long1)则C(lat1,long0),D(lat0,long1)。通过A、B、C、D四个点可以确定一个四边形平面。同一纬度相互平行&#xff0c;可知连接ACBD四点构成了一个等腰梯…

干货|PCB板上的丝印位号与极性符号的组装性设计

PCB板上的字符很多&#xff0c;那么字符在后期起着那些非常重要的作用呢&#xff1f;一般常见的字符:“R”代表着电阻&#xff0c;"C”代表着电容&#xff0c;“RV”表示的是可调电阻&#xff0c;“L”表示的是电感&#xff0c;“Q”表示的是三极管&#xff0c;“D”表示的…

剑指Offer 第27天 JZ75 字符流中第一个不重复的字符

字符流中第一个不重复的字符_牛客题霸_牛客网 描述 请实现一个函数用来找出字符流中第一个只出现一次的字符。例如&#xff0c;当从字符流中只读出前两个字符 "go" 时&#xff0c;第一个只出现一次的字符是 "g" 。当从该字符流中读出前六个字符 “google&…

MDS75-16-ASEMI三相整流模块MDS75-16

编辑-Z MDS75-16在MDS封装里采用的6个芯片&#xff0c;是一款工业焊机专用大功率整流模块。MDS75-16的浪涌电流Ifsm为920A&#xff0c;漏电流(Ir)为5mA&#xff0c;其工作时耐温度范围为-40~150摄氏度。MDS75-16采用GPP硅芯片材质&#xff0c;里面有6颗芯片组成。MDS75-16的电…

ThreadPoolExecutor原理解析

1. 工作原理1.1 流程图1.2 执行示意图从上图得知如果当前运行的线程数小于corePoolSize(核心线程数)&#xff0c;则会创建新线程作为核心线程来执行任务(注意&#xff0c;执行这一步需要获取全局锁)。如果运行的线程等于或多于corePoolSize&#xff0c;则将任务加入BlockingQue…

C语言const的用法详解

有时候我们希望定义这样一种变量&#xff0c;它的值不能被改变&#xff0c;在整个作用域中都保持固定。例如&#xff0c;用一个变量来表示班级的最大人数&#xff0c;或者表示缓冲区的大小。为了满足这一要求&#xff0c;可以使用const关键字对变量加以限定&#xff1a;constin…

大型智慧校园系统源码 智慧班牌 智慧安防 家校互联 智慧校园小程序源码

一款针对中小学研发的智慧校园系统源码&#xff0c;智慧学校源码&#xff0c;系统有演示&#xff0c;可正常上线运营正版授权。 技术架构&#xff1a; 后端&#xff1a;Java 框架&#xff1a;springboot 前端页面&#xff1a;vue element-ui 小程序&#xff1a;小程序原生…

【CDP】CDP集群修改solr 存储路径 引发组件的ranger-audit 大量报错的解决方案

前言 我们生产上公司是使用的CDP集群&#xff0c;一次管理员通知&#xff0c;Solr 组件的数据存放路径磁盘空间不够。 我们的solr 组件时为 Ranger 服务提供日志审计功能&#xff0c; 在我们更改了磁盘路径&#xff0c;并重启了Solr 组件&#xff0c;然后发现相关组件&#…

立创eda专业版学习笔记(6)(pcb板移动节点)

先要看一个设置方面的东西&#xff1a; 进入设置-pcb-通用 我鼠标放到竖着的线上面&#xff0c;第一次点左键是这样选中的&#xff1a; 再点一次左键是这样选中的&#xff1a; 这个时候&#xff0c;把鼠标放到转角的地方&#xff0c;点右键&#xff0c;就会出现对于节点的选项…

关于VSCode安装go插件问题

比较常见的go开发编辑工具有VSCode、GoLand等&#xff0c;其中&#xff0c;使用VSCode需要下载相关的go语言插件。但是大多数情况都会下载失败&#xff0c;因为有些资源需要翻墙的原因&#xff0c;有时候翻墙了还是会报错。   本文将介绍一种帮助大家成功下载go插件的方法&am…

流水线使用(测试->构建->部署上线)

流水线介绍&#xff08;可直接查阅云效中流水线介绍&#xff09; 流水线在项目中的使用 1、选择我的流水线—>新建流水线 2、选择流水线模板&#xff08;可以根据需求选择不同模板&#xff09; 3、流水线配置 ①选择代码源&#xff1a;我目前展示的是直接使用codeup中的代码…