业务数据批量插入数据库实践

news2024/11/18 15:44:35

业务数据如何存储一直以来都是项目开发中的一个比较重要的话题。我们要从资源的利用率,业务场景和技术实现多个方面考虑存储的问题。“抛开业务谈技术就是耍流氓”,所有技术架构都要站在实际的业务场景中分析。比如个人端的产品,这种就属于读多写少的业务,对于这种系统的架构就要更多的考虑增加缓存来减少数据库读压力;B端的产品,基本上属于写多读少的业务,客户数量不多但是单个客户的qps会非常高,要减少这种业务对数据库的压力,数据写入一般要采用批处理。物联网类型的产品,基本上也是属于写多读少的场景,一个设备会有几百上千的测点数据上报,并且上报的频率非常均匀,对于这种类型的系统我们要保证数据能够及时写入数据库并且不能对数据库产生太大的压力,一般我们就采用批量写库方案。下面就针对这种写多读少的物联网系统设计方案分享一下个人的经验。
我最近在做的一个系统是接收储能系统上报的数据,设备每10s上报一轮数据,云端接收到数据后做一些基本的处理判断逻辑,就将数据存入数据库。业务端展示数据分为历史数据和实时数据,历史数据用于其他系统或业务部门分析,实时数据需要做一些预警和最新数据展示。所以我设计的系统机构图大致如下:
系统架构图
系统使用kafka消息队列将接收数据和处理数据的两个服务进行解耦,在这个系统中,最为核心的服务就是数据服务,它要负责数据的处理、入库、查询等多个功能。在批量写数据库时我们要考虑两个方面:一个是批量插入数据多少条比较合适,另外一个就是如果数据积累比较缓慢最长等待多久就要执行一次批量写。
综合以上的分析,我设计的批量写入数据库采用 缓存队列+定时任务 方式实现,这样既能满足批量写入数据库的要求,又能满足及时更新数据库。
既然数据有缓存,就存在缓存数据丢失的风险,规避数据丢失的问题,行业内采用比较多的方案就是WAL(Write-Ahead Logging)。对于我的系统就是数据服务从kafka消费到数据后先写本地日志,再将数据放入缓存队列,然后通知kafka消费数据成功;数据在缓存队列中积累到批量写入数据库的条数或到达一定时间后就插入数据库。在这个过程中如果服务宕机导致缓存队列的数据没有入库,我们也可以通过本地日志找回数据,再次启动服务后重新加载插入数据库。
对于日志中的数据,会有一个标识标记当前已经入库的位置,当缓存队列的数据写入数据库后,更新日志标记点,标记点之前的数据都是写入数据库成功的,标记点之后的数据是目前还在缓存中并未写入数据库。如果程序正常退出,会有一个钩子函数清理缓存队列,将队列中的所有数据都写入数据库,清理标记点;如果程序异常退出,那么标记点之后的数据都是不安全的,在程序下一次正常启动时检查这个标记点,将标记点之后的数据重新加载进程序并写入数据库。这里的标记点通过在项目部署目录下创建一个文件来实现:程序每次写入数据库后更新这个标记点并将它写入文件,程序正常退出就通过钩子函数删除这个文件。下次程序启动后检查文件是否存在:不存在就是正常退出不用做任何处理;存在就是异常退出,这时就要读取标记点的值,把它之后的所有数据重新写入数据库。
通过上面的分析,整个写入数据的流程图如下:
处理数据业务流程
对于标记点的设计结构如下:
标记点结构
由于设备上报的数据有多种类型,那么缓存队列就存在多个,每个队列对应的标记点就有多个,这里选择数据存入缓存队列的时间戳作为标记,占用4个字节,每次写入数据库成功后就将这个时间戳写入到文件,多个缓存队列就依次向后写入文件,为了提高写文件性能,使用mmap将文件映射到内存中,通过修改内存中的值达到写文件的目的。

以上分析了整个项目的架构和实现逻辑,接下来就通过代码实现上面的这些逻辑。
talk is cheap, show me the code

一、项目结构:

使用SpringBoot3.2 + kafka3.6 + jdk17。
maven依赖如下:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <scope>provided</scope>
    <version>1.18.30</version>
</dependency>
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-lang3</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<!-- redis相关依赖 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
</dependency>
<!-- 数据库相关依赖 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
    <groupId>com.baomidou</groupId>
    <artifactId>mybatis-plus-boot-starter</artifactId>
    <version>3.5.5</version>
    <exclusions>
        <exclusion>
            <groupId>org.mybatis</groupId>
            <artifactId>mybatis-spring</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.mybatis.spring.boot</groupId>
    <artifactId>mybatis-spring-boot-starter</artifactId>
    <version>3.0.3</version>
</dependency>
<dependency>
    <groupId>com.mysql</groupId>
    <artifactId>mysql-connector-j</artifactId>
    <version>8.0.33</version>
</dependency>

二、日志记录实现

写日志部分,就不单独实现了,采用logback框架写日志,单个日志文件大小设置为1GB,日志滚动策略采用大小+日期方式。配置文件在resource目录下的 logback-spring.xml 文件如下:

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <include resource="org/springframework/boot/logging/logback/defaults.xml" />
    <property name="LOG_PATH" value="logs"/>
    <property name="FILE_LOG_PATTERN" value="%d{yyyy-MM-dd HH:mm:ss.SSS}|%t|%-5p|%c|%m%n"/>

    <!-- 业务日志:记录系统接收到的数据,用于数据丢失处理 -->
    <appender name="redo" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>${LOG_PATH}/redo.log</file>
        <encoder>
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS}|%m%n</pattern>
            <charset>UTF-8</charset>
        </encoder>
        <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
            <FileNamePattern>${LOG_PATH}/redo-%i.log.%d{yyyy-MM-dd}</FileNamePattern>
            <maxHistory>7</maxHistory>
            <maxFileSize>1GB</maxFileSize>
            <totalSizeCap>40GB</totalSizeCap>
        </rollingPolicy>
    </appender>

    <logger name="redo" level="INFO" additivity="false">
        <appender-ref ref="redo"/>
    </logger>

    <!-- 控制台 -->
    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS}|%thread|[%-5level]|%logger{36}.%method|%msg%n</pattern>
            <charset>UTF-8</charset>
        </encoder>
    </appender>

    <!-- 错误日志:输出所有错误日志信息 -->
    <appender name="FILE-ERROR" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>${LOG_PATH}/error/error.log</file>
        <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
            <fileNamePattern>${LOG_PATH}/error/%d{yyyy-MM-dd}-%i.log.gz</fileNamePattern>
            <maxHistory>30</maxHistory>
            <maxFileSize>500MB</maxFileSize>
            <totalSizeCap>10GB</totalSizeCap>
        </rollingPolicy>
        <encoder>
            <pattern>${FILE_LOG_PATTERN}</pattern>
            <charset>utf8</charset>
        </encoder>
        <filter class="ch.qos.logback.classic.filter.LevelFilter">
            <level>ERROR</level>
            <onMatch>ACCEPT</onMatch>
            <onMismatch>DENY</onMismatch>
        </filter>
    </appender>

    <!-- WARN日志:输出所有WARN日志信息 -->
    <appender name="FILE-WARN" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>${LOG_PATH}/warn/warn.log</file>
        <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
            <fileNamePattern>${LOG_PATH}/warn/%d{yyyy-MM-dd}-%i.log.gz</fileNamePattern>
            <maxHistory>30</maxHistory>
            <maxFileSize>500MB</maxFileSize>
            <totalSizeCap>10GB</totalSizeCap>
        </rollingPolicy>
        <encoder>
            <pattern>${FILE_LOG_PATTERN}</pattern>
            <charset>utf8</charset>
        </encoder>
        <filter class="ch.qos.logback.classic.filter.LevelFilter">
            <level>WARN</level>
            <onMatch>ACCEPT</onMatch>
            <onMismatch>DENY</onMismatch>
        </filter>
    </appender>

    <!-- 消息日志:输出所有消息日志信息 -->
    <appender name="FILE-INFO" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>${LOG_PATH}/info/info.log</file>
        <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
            <fileNamePattern>${LOG_PATH}/info/%d{yyyy-MM-dd}-%i.log.gz</fileNamePattern>
            <maxHistory>30</maxHistory>
            <maxFileSize>500MB</maxFileSize>
            <totalSizeCap>10GB</totalSizeCap>
        </rollingPolicy>
        <encoder>
            <pattern>${FILE_LOG_PATTERN}</pattern>
            <charset>utf8</charset>
        </encoder>
        <filter class="ch.qos.logback.classic.filter.LevelFilter">
            <level>INFO</level>
            <onMatch>ACCEPT</onMatch>
            <onMismatch>DENY</onMismatch>
        </filter>
    </appender>

    <!-- 调试日志:输出所有消息日志信息 -->
    <appender name="FILE-DEBUG" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>${LOG_PATH}/debug/debug.log</file>
        <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
            <fileNamePattern>${LOG_PATH}/debug/%d{yyyy-MM-dd}-%i.log.gz</fileNamePattern>
            <maxHistory>30</maxHistory>
            <maxFileSize>500MB</maxFileSize>
            <totalSizeCap>10GB</totalSizeCap>
        </rollingPolicy>
        <encoder>
            <pattern>${FILE_LOG_PATTERN}</pattern>
            <charset>utf8</charset>
        </encoder>
        <filter class="ch.qos.logback.classic.filter.LevelFilter">
            <level>DEBUG</level>
            <onMatch>ACCEPT</onMatch>
            <onMismatch>DENY</onMismatch>
        </filter>
    </appender>

    <root level="INFO">
        <appender-ref ref="CONSOLE"/>
        <appender-ref ref="FILE-ERROR"/>
        <appender-ref ref="FILE-WARN"/>
        <appender-ref ref="FILE-INFO"/>
        <appender-ref ref="FILE-DEBUG"/>
    </root>
</configuration>

日志格式为: 日志时间|消息主题|消息内容 ,消息主题就是kafka的主题,时间戳是接收到消息的时间戳,这个时间戳用于异常退出时下次启动服务恢复消息时判断数据是在标记点前后的依据,消息内容就是具体消费的信息。示例数据如下:

2024-09-23 15:03:19.378|batch-message|{"data":"{\"id\":\"1727074999121\",\"code\":\"code1--1\",\"name\":\"name1--1\",\"random\":\"0eaf04363d5a49419967d500dc149e5a\",\"createTime\":\"2024-09-23 15:03:19\"}","dataType":"Work1Data"}
2024-09-23 15:03:19.734|batch-message|{"data":"{\"id\":\"1727074999359\",\"code\":\"code2--1\",\"name\":\"name2--1\",\"random\":\"89b8b315f4e84fedb5af799ce4860c56\",\"createTime\":\"2024-09-23 15:03:19\"}","dataType":"Work2Data"}

三、消费KAFKA数据

数据是通过消费kafka队列接收消息,如果消息被成功接收并且写入本地日志和缓存队列,那么就提交ack给kafka,实现代码如下:

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
import org.xingo.common.JacksonUtils;
import org.xingo.consumer.batch.BatchWorkFactory;
import org.xingo.consumer.batch.impl.BatchWork1;
import org.xingo.consumer.batch.impl.BatchWork2;
import org.xingo.domain.Work1Data;
import org.xingo.domain.Work2Data;

import java.util.List;

/**
 * @Author xingo
 * @Date 2024/9/13
 */
@Slf4j
@Component
public class KafkaConsumer {

    Logger redoLogger = LoggerFactory.getLogger("redo");

    @Autowired
    private BatchWorkFactory batchWorkFactory;

    /**
     * 数据消费者
     */
    @KafkaListener(topics = "batch-message")
    public void linsten01(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {
        if(records != null) {
            for (ConsumerRecord<String, String> record : records) {
                // 写本地日志
                long ts = System.currentTimeMillis();
                System.out.println(record.value());
                redoLogger.info("{}|{}", record.topic(), record.value());
            
                // 解析消息内容,保存到本地缓存队列
                try {
                    JsonNode json = JacksonUtils.getObjectMapper().readTree(record.value());
                    String dataType = json.get("dataType").asText();
                    if("Work1Data".equals(dataType)) {
                        Work1Data work1Data = JacksonUtils.parseObject(json.get("data").asText(), Work1Data.class);
                        work1Data.setTs(ts);
                        batchWorkFactory.get(BatchWork1.class).add(work1Data);
                    } else if("Work2Data".equals(dataType)) {
                        Work2Data work2Data = JacksonUtils.parseObject(json.get("data").asText(), Work2Data.class);
                        work2Data.setTs(ts);
                        batchWorkFactory.get(BatchWork2.class).add(work2Data);
                    }
                } catch (JsonProcessingException e) {
                    log.error("解析数据异常", e);
                }
            }

            //提交offset消费成功
            ack.acknowledge();
        }
    }
}

这里面json反序列化使用到了Jackson工具类:

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.databind.ser.std.ToStringSerializer;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateDeserializer;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalTimeDeserializer;
import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateSerializer;
import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer;
import com.fasterxml.jackson.datatype.jsr310.ser.LocalTimeSerializer;

import java.text.SimpleDateFormat;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Locale;
import java.util.TimeZone;

/**
 * json工具
 *
 * @Author xingo
 * @Date 2023/12/15
 */
public class JacksonUtils {

    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    static {
        // Long类型处理,避免前端处理长整型时精度丢失
        SimpleModule module1 = new SimpleModule();
        module1.addSerializer(Long.class, ToStringSerializer.instance);
        module1.addSerializer(Long.TYPE, ToStringSerializer.instance);

        JavaTimeModule module2 = new JavaTimeModule();
        // java8日期处理
        module2.addSerializer(LocalDateTime.class,
                new LocalDateTimeSerializer(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
        module2.addSerializer(LocalDate.class,
                new LocalDateSerializer(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
        module2.addSerializer(LocalTime.class,
                new LocalTimeSerializer(DateTimeFormatter.ofPattern("HH:mm:ss")));
        module2.addDeserializer(LocalDateTime.class,
                new LocalDateTimeDeserializer(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
        module2.addDeserializer(LocalDate.class,
                new LocalDateDeserializer(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
        module2.addDeserializer(LocalTime.class,
                new LocalTimeDeserializer(DateTimeFormatter.ofPattern("HH:mm:ss")));

        OBJECT_MAPPER
                // 添加modules
                .registerModules(module1, module2, new Jdk8Module())
                // 日期类型不转换为时间戳
                .configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false)
                .configure(SerializationFeature.WRITE_DURATIONS_AS_TIMESTAMPS, false)
                // 反序列化的时候如果多了其他属性,不抛出异常
                .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
                // 如果是空对象的时候,不抛异常
                .configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false)
                // 空对象不序列化
                .setSerializationInclusion(JsonInclude.Include.NON_NULL)
                // 日期格式化
                .setDateFormat(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"))
                // 设置时区
                .setTimeZone(TimeZone.getTimeZone("GMT+8"))
                // 驼峰转下划线
                // .setPropertyNamingStrategy(PropertyNamingStrategies.SNAKE_CASE)
                // 语言
                .setLocale(Locale.SIMPLIFIED_CHINESE);
    }

    /**
     * 反序列化
     * @param json      json字符串
     * @param clazz     发序列化类型
     * @return
     * @param <T>
     */
    public static <T> T parseObject(String json, Class<T> clazz) {
        if(json == null) {
            return null;
        }
        try {
            return OBJECT_MAPPER.readValue(json, clazz);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
        return null;
    }

    public static <T> T parseObject(String json, TypeReference<T> type) {
        if(json == null) {
            return null;
        }
        try {
            return OBJECT_MAPPER.readValue(json, type);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
        return null;
    }

    public static <T> T parseObject(byte[] bytes, TypeReference<T> type) {
        if(bytes == null) {
            return null;
        }
        try {
            return OBJECT_MAPPER.readValue(bytes, type);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * 反序列化列表
     * @param json
     * @return
     * @param <T>
     */
    public static <T> List<T> parseArray(String json) {
        if(json == null) {
            return null;
        }
        try {
            TypeReference<List<T>> type = new TypeReference<List<T>>(){};
            return OBJECT_MAPPER.readValue(json, type);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * 写为json串
     * @param obj   对象
     * @return
     */
    public static String toJSONString(Object obj) {
        if(obj == null) {
            return null;
        }
        try {
            return OBJECT_MAPPER.writeValueAsString(obj);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
        return null;
    }

    public static byte[] toJSONBytes(Object obj) {
        if(obj == null) {
            return null;
        }
        try {
            return OBJECT_MAPPER.writeValueAsBytes(obj);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * 获取jackson对象
     * @return
     */
    public static ObjectMapper getObjectMapper() {
        return OBJECT_MAPPER;
    }
}

四、数据缓存队列

缓存数据的队列有两种数据结构可选:数组和链表,链表适用于数据量不定的场景,随机增减节点非常方便;数组长度一定,随机增减性能比较低,对于顺序读写性能比链表要好。这里的缓存主要是尾部添加,一次读取入库的场景,综合来看,数组性能会好一些,由于缓存数据条数不一定,只有最大长度限制,选择ArrayList这个结构非常方便。至于并发问题,在读写时加锁处理。
对于数据缓存这部分逻辑,我们定义一个抽象类叫BatchWork,它主要用于公共处理逻辑的抽取,对于不同数据结构的对象,只需要继承这个父类,并且实现该对象批量处理数据的逻辑即可。
批处理抽象父类定义如下:

import lombok.extern.slf4j.Slf4j;
import org.xingo.domain.WorkData;

import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 批处理任务父类
 * 
 * @Author xingo
 * @Date 2024/9/13
 */
@Slf4j
public abstract class BatchWork<T extends WorkData> {

    /**
     * 最大时间间隔,超过这个时间间隔还没有执行数据批量插入就执行定时任务
     */
    public static final int DIFF_TIMESTAMP = 30_000;
    /**
     * 执行时间间隔:30s
     */
    public static final int period = 30;    // 执行的间隔时间
    /**
     * 批量插入数据最大条数
     */
    protected short batchSize = 1000;

    /**
     * 最近一次执行数据插入时间戳
     */
    protected long lastTimestamp = 0L;

    /**
     * 任务名称
     */
    protected BatchWorkEnum batchWork;

    /**
     * 系统操作线程池:构建一个核心池大小是8、线程池最大线程数是16的线程池,可以根据主机CPU核数进行调整
     */
    public static final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(8, 16, 5, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(8), new ThreadFactory() {

        private final AtomicInteger number = new AtomicInteger(1);

        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r);
            thread.setName("batch-savedb-" + number.getAndIncrement());

            return thread;
        }
    });

    /**
     * 定时任务线程池:定时检查缓存队列,通过判断缓存队列最后一次执行时间来判断是否要将缓存队列数据插入数据库表中
     */
    private static final ScheduledExecutorService taskExecutor = Executors.newScheduledThreadPool(16, new ThreadFactory() {

        private final AtomicInteger number = new AtomicInteger(1);

        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r);
            thread.setName("batch-work-" + number.getAndIncrement());

            return thread;
        }
    });

    /**
     * 缓存数据列表
     */
    private List<T> datas = new ArrayList<>(batchSize);

    /**
     * 锁对象
     */
    private final ReentrantLock lock = new ReentrantLock();

    /**
     * 启动定时执行任务方法,在项目启动时启动
     */
    public void run() {
        if(batchWork == null) {
            throw new RuntimeException("属性 [batchWork] 不能为空");
        }

        // 最后更新时间与当前时间差大于时间间隔,将执行一次批处理任务
        Runnable task = () -> {
            long diff = System.currentTimeMillis() - lastTimestamp;
            if(diff >= DIFF_TIMESTAMP) {
                batch();
            }
        };

        Random random = new Random();
        long initialDelay = random.nextInt(10, 30);     // 初始延迟时间
        // 为了让任务尽可能均匀分布,所有要批量处理的任务初始时间随机生成
        taskExecutor.scheduleAtFixedRate(task, initialDelay, period, TimeUnit.SECONDS);

        BatchWorkFactory.WORK_NUMS.incrementAndGet();
        log.info("{}批量任务开始执行", batchWork);
    }

    /**
     * 服务关闭时执行
     */
    public void clear() {
        log.info("项目关闭开始清理{}批量任务", batchWork);
        this.batch();
        BatchWorkFactory.WORK_NUMS.decrementAndGet();
        log.info("{}批量任务已结束", batchWork);
    }

    /**
     * 添加数据方法:将要插入数据添加到缓存队列
     * @param data
     */
    public void add(T data) {
        lock.lock();
        try {
            datas.add(data);
        } finally {
            lock.unlock();
        }
        this.cache(data);
        // 添加数据后判断缓存数据条数,如果缓存条数超过了批处理阈值,执行一次批处理,结合定时任务就实现了“最大缓存条数或时间间隔”内插入数据的逻辑
        if(datas.size() >= batchSize) {
            batch();
        }
    }

    /**
     * 批量新增数据:将一批数据添加到缓存队列
     * @param batch
     */
    public void batchAdd(List<T> batch) {
        lock.lock();
        try {
            datas.addAll(batch);
        } finally {
            lock.unlock();
        }
        if(datas.size() >= batchSize) {
            batch();
        }
    }

    /**
     * 批量处理方法:具体子类要实现这个方法
     */
    public abstract void batchInsertDb(List<T> datas);

    /**
     * 数据缓存:缓存最新一条数据
     * @param data
     */
    public abstract void cache(T data);

    /**
     * 批处理方法:
     * 将当前缓存的批处理数据赋值给其他对象,当前缓存列表重新申请一个空间接收数据
     */
    private void batch() {
        log.info("调用{}批量处理方法", batchWork);
        if(!datas.isEmpty()) {
            List<T> copyList = null;
            lock.lock();
            try {
                if(!datas.isEmpty()) {
                    // 定义一个局部变量指向原有的数组,原有的数组重新申请空间用于接收数据
                    copyList = datas;
                    datas = new ArrayList<>(batchSize);
                }
            } finally {
                lock.unlock();
            }
            if(copyList != null && !copyList.isEmpty()) {
                final List<T> dbList = copyList;
                // 插入数据是比较耗时的过程,这里放入异步线程池慢慢执行,主线程继续接收数据
                threadPool.execute(() -> {
                    log.info("{}写入数据库开始|{}", batchWork, dbList.size());
                    long s = System.currentTimeMillis();
                    this.batchInsertDb(dbList);
                    lastTimestamp = System.currentTimeMillis();
                    long _ts = dbList.get(dbList.size() - 1).getTs();
                    log.info("{}写入数据库完成|{}|{}|{}ms", batchWork, dbList.size(), new Timestamp(_ts), (lastTimestamp - s));
                    // 插入数据完成后,更新标记点
                    MarkPointLog.markPoint(batchWork, _ts);
                });
            }
        }
    }

}

demo项目模拟了两类数据批处理:
批处理任务1:

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import org.xingo.consumer.batch.BatchWork;
import org.xingo.consumer.batch.BatchWorkEnum;
import org.xingo.domain.Work1Data;
import org.xingo.common.JacksonUtils;
import org.xingo.mapper.Work1Mapper;

import java.util.List;
import java.util.concurrent.TimeUnit;

/**
 * 批处理子类1
 * 
 * @Author xingo
 * @Date 2024/9/13
 */
@Slf4j
@Service
public class BatchWork1 extends BatchWork<Work1Data> {

    @Autowired
    private StringRedisTemplate redisTemplate;
    @Autowired
    private Work1Mapper work1Mapper;

    public BatchWork1() {
        this.batchWork = BatchWorkEnum.Work1;
    }

    @Override
    public void batchInsertDb(List<Work1Data> datas) {
        try {
            work1Mapper.batchInsert(datas);
        } catch (Exception e) {
            log.error("批量插入数据异常", e);
            // 这里是批量插入数据如果异常,转为单条插入数据,一般的异常都是数据库主键冲突导致的
            datas.forEach(data -> {
                try {
                    work1Mapper.insert(data);
                } catch (Exception ex) {
                    log.error("单条插入数据异常", ex);
                }
            });
        }
        log.info("批量处理数据|{}|{}", this.batchWork, datas.size());
    }

    @Override
    public void cache(Work1Data data) {
        // 缓存最新一条数据供其他业务使用
        redisTemplate.opsForValue().set("data:work1", JacksonUtils.toJSONString(data), 30, TimeUnit.SECONDS);
    }
}

批处理任务2:

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import org.xingo.consumer.batch.BatchWork;
import org.xingo.consumer.batch.BatchWorkEnum;
import org.xingo.common.JacksonUtils;
import org.xingo.domain.Work2Data;
import org.xingo.mapper.Work2Mapper;

import java.util.List;
import java.util.concurrent.TimeUnit;

/**
 * 批处理子类2
 * 
 * @Author xingo
 * @Date 2024/9/13
 */
@Slf4j
@Service
public class BatchWork2 extends BatchWork<Work2Data> {

    @Autowired
    private StringRedisTemplate redisTemplate;
    @Autowired
    private Work2Mapper work2Mapper;

    public BatchWork2() {
        this.batchWork = BatchWorkEnum.Work2;
    }

    @Override
    public void batchInsertDb(List<Work2Data> datas) {
        try {
            work2Mapper.batchInsert(datas);
        } catch (Exception e) {
            log.error("批量插入数据异常", e);
            datas.forEach(data -> {
                try {
                    work2Mapper.insert(data);
                } catch (Exception ex) {
                    log.error("单条插入数据异常", ex);
                }
            });
        }
        log.info("批量处理数据|{}|{}", this.batchWork, datas.size());
    }

    @Override
    public void cache(Work2Data data) {
        redisTemplate.opsForValue().set("data:work2", JacksonUtils.toJSONString(data), 30, TimeUnit.SECONDS);
    }
}

这里只有两个批处理任务,如果批处理任务有更多个,那么就需要一个统一的地方管理这些任务:
定义一个工厂类管理批量任务,创建、销毁、获取这些批量任务都通过这个工厂类来实现:

import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.ClassPathScanningCandidateComponentProvider;
import org.springframework.core.type.filter.AssignableTypeFilter;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 批量任务工厂
 *
 * @Author xingo
 * @Date 2024/9/13
 */
@Slf4j
@Component
public class BatchWorkFactory {

    /**
     * 批量任务集合
     */
    private final Map<Class<? extends BatchWork>, BatchWork> map = new HashMap<>();

    @Autowired
    private ApplicationContext applicationContext;
    @Autowired
    private MarkPointLog markPointLog;

    /**
     * 工厂初始化是否完成
     */
    private boolean ok = false;

    /**
     * 任务启动完成
     */
    public static final AtomicInteger WORK_NUMS = new AtomicInteger(0);

    @PostConstruct
    public void run() {
        // 构建bean缓存:通过spring提供的类扫描目录下的所有子类加载到集合中做统一管理
        ClassPathScanningCandidateComponentProvider provider = new ClassPathScanningCandidateComponentProvider(false);
        provider.addIncludeFilter(new AssignableTypeFilter(BatchWork.class));
        Set<BeanDefinition> components = provider.findCandidateComponents("org/xingo/consumer/batch/impl");
        for (BeanDefinition component : components) {
            try {
                Class<? extends BatchWork> clazz = (Class<? extends BatchWork>) Class.forName(component.getBeanClassName());
                BatchWork bean = applicationContext.getBean(clazz);
                map.put(clazz, bean);
                log.info("初始化加载类实例信息|{}|{}", component.getBeanClassName(), bean);
            } catch (Exception e) {
                log.error("加载类异常", e);
            }
        }

        // 初始标记服务
        markPointLog.initMarkPointLog();

        // 批处理任务启动方法
        map.values().forEach(BatchWork::run);
        try {
            // 标记任务启动成功
            while (WORK_NUMS.get() != map.size()) {
                TimeUnit.MICROSECONDS.sleep(5);
            }
            ok = true;
        } catch (Exception e) {
            e.printStackTrace();
        }
        log.info("批处理任务全部启动");
    }

    /**
     * 正常关闭处理
     */
    @PreDestroy
    public void clear() {
        try {
            // 批处理任务关闭方法
            map.values().forEach(BatchWork::clear);

            // 等待批量任务全部处理完成
            while (WORK_NUMS.get() != 0) {
                TimeUnit.SECONDS.sleep(1);
                log.info("等待批处理任务结束,剩余{}个任务关闭", WORK_NUMS.get());
            }

            // 清理标记文件
            markPointLog.destroyMarkPointLog();

            log.info("批处理任务全部关闭");
        } catch (Exception e) {
            log.error("清理本地批处理任务异常", e);
        }
    }

    /**
     * 获取批量执行任务类
     * @param workName
     * @return
     */
    public BatchWork get(Class<? extends BatchWork> workName) {
        if(ok) {
            return map.get(workName);
        } else {
            int cnt = 0;
            while (!ok) {
                try {
                    TimeUnit.MILLISECONDS.sleep(3);
                    if(cnt++ >= 10000) {
                        break;
                    }
                } catch (Exception e) {
                    log.error("获取批处理任务异常", e);
                }
            }
            return map.get(workName);
        }
    }

}

在BathWork抽象类中,数据批量插入数据库后要记录下当前插入数据的时间点工具类:MarkPointLog:

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.math.BigDecimal;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.List;
import java.util.Scanner;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 数据标记点,每次插入数据后就要更新标记点
 * 
 * @Author xingo
 * @Date 2024/9/13
 */
@Slf4j
@Component
public class MarkPointLog {

    @Value("${user.dir}")
    private String dirPath;
    @Autowired
    private LoadDataService loadDataService;

    /**
     * 标记服务正常关闭文件路径:
     * 服务正常关闭时会将文件删除,
     * 下次启动时如果文件存在,表示上次是异常关闭的,那么就会处理历史消息重新发布一次处理
     */
    private String filePath = null;
    /**
     * 写文件
     */
    private static MappedByteBuffer buffer = null;

    /**
     * 锁对象
     */
    private static final ReentrantLock lock = new ReentrantLock();

    /**
     * 初始化
     */
    public void initMarkPointLog() {
        filePath = dirPath.endsWith(File.separator) ? (dirPath + "work") : (dirPath + File.separator + "work");
        File file = new File(filePath);
        if(file.exists()) {
            RandomAccessFile rw = null;
            try {
                long minTs = System.currentTimeMillis();
                rw = new RandomAccessFile(file, "r");
                SimpleDateFormat datetimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
                for (BatchWorkEnum mark : BatchWorkEnum.values()) {
                    long ts = rw.readLong();
                    if(ts < minTs) {
                        minTs = ts;
                    }
                    log.error("系统异常关闭最后提交数据时间戳|{}|{}|{}", mark, ts, datetimeFormat.format(ts));
                }

                final long min = minTs;
                new Thread(() -> loadDataService.loadData(min), "loadLogWork").start();
            } catch (Exception e) {
                log.error("读取状态文件异常", e);
            } finally {
                if(rw != null) {
                    try {
                        rw.close();
                    } catch (IOException e) {
                        log.error("读取状态文件异常", e);
                    }
                }
            }
        }

        // 映射运行状态文件
        FileChannel channel = null;
        try {
            channel = new RandomAccessFile(file, "rw").getChannel();
            int size = BatchWorkEnum.values().length * 8;
            buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, size);
            buffer.putLong(0);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (channel != null) {
                    channel.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 清理标记点文件
     */
    public void destroyMarkPointLog() {
        if(filePath != null) {
            File file = new File(filePath);
            if(file.exists()) {
                boolean rs = file.delete();
                log.info("删除标识文件完成|{}|{}", filePath, rs);
            }
        }
    }

    /**
     * 最大标记点
     * @param workEnum
     * @param ts
     */
    public static void markPoint(BatchWorkEnum workEnum, long ts) {
        lock.lock();
        try {
            int position = workEnum.getIdx() * 8;
            buffer.position(position);
            buffer.putLong(ts);
            buffer.force();
        } finally {
            lock.unlock();
        }
    }

}

当数据在缓存队列中还未插入数据库之前系统宕机了,这就存在数据丢失风险,那么在系统启动后就要重新消费这部分数据:

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.TimeUnit;

/**
 * @Author xingo
 * @Date 2024/9/23
 */
@Slf4j
@Component
public class LoadDataService {

    @Value("${user.dir}")
    private String dirPath;
    @Autowired
    private KafkaTemplate kafkaTemplate;

    /**
     * 加载数据到系统等待再次入库处理
     *
     * @param timestamp
     */
    public void loadData(long timestamp) {
        // 最小时间再减去一个时间间隔,认为这段时间内的数据都是不安全的
        long startTimestamp = timestamp - BatchWork.DIFF_TIMESTAMP;
        long endTimestamp = System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(5);

        String logPath = dirPath + File.separator + "logs";
        File kafkaLogs = new File(logPath);
        File file1 = null, file2 = null;
        String yesterday = LocalDate.now().plusDays(-1).format(DateTimeFormatter.ofPattern("yyyy-MM-dd"));

        // 查找当前的日志文件
        if(kafkaLogs.exists() && kafkaLogs.isDirectory()) {
            File[] files = kafkaLogs.listFiles();
            if(files != null) {
                for(File file : files) {
                    String fname = file.getName();
                    if(fname.equals("redo.log")) {
                        log.info("匹配日志文件|{}", fname);
                        file1 = file;
                    } else if(fname.endsWith(yesterday)) {
                        log.info("匹配历史日志文件|{}", fname);
                        file2 = file;
                    }
                }
            }
        }
        if(file1 != null) {
            sendDataToQueue(file1, startTimestamp, endTimestamp);
        }
        if(file2 != null) {
            sendDataToQueue(file2, startTimestamp, endTimestamp);
        }
    }

    /**
     * 一行数据最大长度,用于数据查找时回退
     */
    private int LINE_MAX_SIZE = 1000;

    /**
     * 发送数据到消息
     * @param file
     * @param startTimestamp
     * @param endTimestamp
     */
    private void sendDataToQueue(File file, long startTimestamp, long endTimestamp) {
        // 查找数据:二分法查找要加载的数据
        RandomAccessFile raf = null;
        SimpleDateFormat datetimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        try {
            raf = new RandomAccessFile(file, "rw");
            long start = 0L;
            long end = file.length();
            long point = 0L;
            String line = null;
            // 2024-09-23 10:20:25.387|batch-message|{"data":{"id":"1727058025075","code":"code1--1727058025075","name":"name1--1727058025075","random":"bdfa8ca9d5084c25b9267f34e7bff6cf","createTime":"2024-09-23 10:20:25"},"dataType":"Work1Data"}
            boolean find = false;
            while (true) {
                if(end - start < 60) {
                    break;
                }
                point = (start + end) / 2;
                raf.seek(Math.max(0, point - LINE_MAX_SIZE));
                line = raf.readLine();
                if (StringUtils.isBlank(line)) {
                    break;
                }
                String[] arr = line.split("\\|");
                if (arr.length != 3) {
                    line = raf.readLine();
                    if (line == null || "".equals(line)) {
                        break;
                    }
                    arr = line.split("\\|");
                } else {
                    try {
                        datetimeFormat.parse(arr[0]);
                    } catch (ParseException e) {
                        line = raf.readLine();
                        if (line == null || "".equals(line)) {
                            break;
                        }
                        arr = line.split("\\|");
                    }
                }

                long time = datetimeFormat.parse(arr[0]).getTime();

                if (time < startTimestamp) {
                    start = point;
                } else {
                    find = true;
                    break;
                }
            }

            if (find) {
                raf.seek(start);
                line = raf.readLine();
                String[] arr = line.split("\\|");
                if (arr.length != 3) {
                    line = raf.readLine();
                } else {
                    try {
                        datetimeFormat.parse(arr[0]);
                    } catch (ParseException e) {
                        line = raf.readLine();
                    }
                }

                while (StringUtils.isNotBlank(line)) {
                    arr = line.split("\\|");
                    long time = datetimeFormat.parse(arr[0]).getTime();
                    if(time > endTimestamp) {
                        break;
                    }
                    // 这里将可能丢失的数据重新发送到消息队列再次消费持久化
                    if(time >= startTimestamp) {
                        kafkaTemplate.send(arr[1], arr[2]);
                        log.info("加载可能丢失的数据|{}", arr[2]);
                    }

                    line = raf.readLine();
                }
            }
        } catch (Exception e) {
            log.error("读取文件异常", e);
        } finally {
            try {
                if(raf != null) {
                    raf.close();
                }
            } catch (IOException e) {
                log.error("关闭文件异常", e);
            }
        }
    }
}

是否加载数据主要是根据项目目录下释放存在一个标记文件:work。这个文件在项目启动时创建,在项目正常关闭时删除这个文件。在数据批量插入成功后会把标记点也写入到这个文件中,由于存在多个缓存队列,每个缓存队列需要写入到这个文件的不同位置,这里需要定义一个枚举类记录每个缓存队列写入到这个文件的位置:

import org.xingo.consumer.batch.impl.BatchWork1;
import org.xingo.consumer.batch.impl.BatchWork2;

/**
 * 批量任务名
 *
 * @Author xingo
 * @Date 2024/9/13
 */
public enum BatchWorkEnum {

    /**
     * 批量任务1
     */
    Work1((byte) 0, BatchWork1.class),

    /**
     * 批量任务2
     */
    Work2((byte) 1, BatchWork2.class),


    ;

    /**
     * 批量任务队列的序列号,是一个从0开始递增的值;
     * 它主要用于批量数据入库后最新数据时间点记录文件查找和写入时使用
     * 如果新增缓存队列这个序号递增,不能更改已经存在的序号
     * 如果要调整或删除已有的序号,要保证系统正常退出后再操作,否则会有数据丢失风险
     */
    private byte idx;

    /**
     * 批量任务对应的类
     */
    private Class<? extends BatchWork> work;

    BatchWorkEnum(byte idx, Class<? extends BatchWork> work) {
        this.idx = idx;
        this.work = work;
    }

    public byte getIdx() {
        return idx;
    }

    public Class<? extends BatchWork> getWork() {
        return work;
    }
}

五、数据对象处理

上面这些逻辑都完成了,最后一步就是数据持久化,我这里使用mysql数据库做持久化测试,数据对象和持久化代码如下:
统一的父类:

import java.io.Serializable;

/**
 * @Author xingo
 * @Date 2024/9/13
 */
public class WorkData implements Serializable {

    /**
     * 数据放入缓存队列时间
     */
    protected long ts;

    public Long getTs() {
        return ts;
    }

    public void setTs(Long ts) {
        this.ts = ts;
    }
}

两个实体类:

实体类1:

import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;

import java.time.LocalDateTime;

/**
 * @Author xingo
 * @Date 2024/9/13
 */
@Data
@TableName("t_work1")
public class Work1Data extends WorkData {

    private Long id;

    private String code;

    private String name;

    private String random;

    private LocalDateTime createTime;
}

实体类2:

import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;

import java.time.LocalDateTime;

/**
 * @Author xingo
 * @Date 2024/9/13
 */
@Data
@TableName("t_work2")
public class Work2Data extends WorkData {

    private Long id;

    private String code;

    private String name;

    private String random;

    private LocalDateTime createTime;
}

xml文件:

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="org.xingo.mapper.Work1Mapper">

    <insert id="batchInsert" parameterType="hashmap">
        INSERT INTO t_work1 (id, code, name, random, ts, create_time) VALUES
        <foreach collection="datas" item="item" index="index" separator=",">
            (#{item.id}, #{item.code}, #{item.name}, #{item.random}, #{item.ts}, #{item.createTime})
        </foreach>
    </insert>
</mapper>
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="org.xingo.mapper.Work2Mapper">

    <insert id="batchInsert" parameterType="hashmap">
        INSERT INTO t_work2 (id, code, name, random, ts, create_time) VALUES
        <foreach collection="datas" item="item" index="index" separator=",">
            (#{item.id}, #{item.code}, #{item.name}, #{item.random}, #{item.ts}, #{item.createTime})
        </foreach>
    </insert>
</mapper>

Mapper接口:

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Mapper;
import org.xingo.domain.Work1Data;

import java.util.List;

/**
 * @Author xingo
 * @Date 2024/9/23
 */
@Mapper
public interface Work1Mapper extends BaseMapper<Work1Data> {

    void batchInsert(List<Work1Data> datas);
}
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Mapper;
import org.xingo.domain.Work2Data;

import java.util.List;

/**
 * @Author xingo
 * @Date 2024/9/23
 */
@Mapper
public interface Work2Mapper extends BaseMapper<Work2Data> {

    void batchInsert(List<Work2Data> datas);
}

数据库建表SQL语句:

CREATE TABLE `t_work1`  (
  `id` bigint NOT NULL,
  `code` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,
  `name` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,
  `random` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,
  `create_time` datetime NULL DEFAULT NULL,
  `ts` bigint NULL DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci ROW_FORMAT = Dynamic;

CREATE TABLE `t_work2`  (
  `id` bigint NOT NULL,
  `code` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,
  `name` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,
  `random` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,
  `create_time` datetime NULL DEFAULT NULL,
  `ts` bigint NULL DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci ROW_FORMAT = Dynamic;

六、测试验证

  1. 正常情况:验证正常插入情况非常简单,编写一个测试方法不断的向kafka的主题中写入数据,观察数据消费和入库情况,通过对比发送数据和接收数据是否存在差异来确定服务是否有问题。
  2. 正常退出:在正常运行情况下,对批量处理服务执行 kill -15 关闭程序,正常退出情况下项目根目录下的 work 文件会被清除,那么下次程序启动时就不需要重新加载日志数据。
  3. 异常退出:在正常运行情况下对批量处理服务执行kill -9 命令杀死进程,观察项目根目录下的 work 文件还存在,并没有被清除,这就表明程序在上次退出时是异常退出的,那么在程序再次启动时会重新加载日志数据,避免程序异常导致数据丢失的风险。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2162257.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

代码随想录算法训练营Day7 | 454.四数相加Ⅱ、383.赎金信、15.三数之和、18.四数之和

454.四数相加Ⅱ 题目 454. 四数相加 II - 力扣&#xff08;LeetCode&#xff09; 给你四个整数数组 nums1、nums2、nums3 和 nums4 &#xff0c;数组长度都是 n &#xff0c;请你计算有多少个元组 (i, j, k, l) 能满足&#xff1a; 0 < i, j, k, l < nnums1[i] nums…

weblogic中间件漏洞复现

后台弱口令getshell 1.开启环境 cd vulhub-master/weblogic/weak_password docker-compose up -d docker ps 2.f访问靶场 访问/console/login/LoginForm.jsp这个目录进行登录&#xff0c; 默认账号密码&#xff1a;weblogic/Oracle123 需要注意的是单个账号进行登录时&…

C++_CH18_构造函数与析构函数

C_CH18_构造函数与析构函数 1 类的默认成员函数 在编写类的时候&#xff0c;C编译器会默认生成6个默认的函数&#xff0c;但是不显示出来&#xff1a; 需要关注以下两个方面: 第一:我们不写时&#xff0c;编译器默认生成的函数行为是什么&#xff0c;是否满足我们的需求。 …

LabVIEW界面输入值设为默认值

在LabVIEW中&#xff0c;将前面板上所有控件的当前输入值设为默认值&#xff0c;可以通过以下步骤实现&#xff1a; 使用控件属性节点&#xff1a;你可以创建一个属性节点来获取所有控件的引用。 右键点击控件&#xff0c;选择“创建” > “属性节点”。 设置属性节点为“D…

实践出真知!8个案例速通栅格系统

在现代设计中&#xff0c;栅格系统作为一种重要的布局方案&#xff0c;能够有效提升设计的秩序感。对于 UI 设计领域&#xff0c;栅格系统也广泛用于跨屏幕的响应式设计&#xff0c;帮助设计师打造更好的多端体验。本文将简要介绍栅格系统的基本概念和搭建方法&#xff0c;并提…

什么是unix中的fork函数?

一、前言 在本专栏之前的文档中已经介绍过unix进程环境相关的概念了&#xff0c;本文将开始介绍unix中一个进程如何创建出新进程&#xff0c;主要是通过fork函数来实现此功能。本文将包含如下内容&#xff1a; 1.fork函数简介 2.父进程与子进程的特征 3.如何使用fork创建新进程…

依赖不对应导致java文件不能正常显示

项目中若出现非正常显示的java文件&#xff0c;检查下是否依赖版本不对应。&#xff08;前提必须是maven项目&#xff09;

网络原理(4)——网络层(IP)、数据链路层

1. IP 协议 基本概念&#xff1a; 主机&#xff1a;配有 IP 地址&#xff0c;但是不进行路由控制的设备 路由器&#xff1a;即配有 IP 地址&#xff0c;又能进行路由控制 节点&#xff1a;主机和路由器的统称 IP 协议报头格式 1) 4 位版本&#xff1a;实际上只有两个取值&…

通义灵码AI 程序员正式发布:写代码谁还动手啊

虽然见不到面 但你已深潜我心 前几天&#xff0c;在 2024 年的杭州云栖大会上&#xff0c;随着通义大模型能力的全面提升&#xff0c;阿里云通义灵码这位中国的首位 AI 程序员也迎来重大的升级。 一年前这位 AI 程序员还只能完成基础的编程任务&#xff0c;到现在可以做到几…

Leetcode 543. 124. 二叉树的直径 树形dp C++实现

问题&#xff1a;Leetcode 543. 二叉树的直径&#xff08;边权型&#xff09; 给你一棵二叉树的根节点&#xff0c;返回该树的 直径 。 二叉树的 直径 是指树中任意两个节点之间最长路径的 长度 。这条路径可能经过也可能不经过根节点 root 。两节点之间路径的 长度 由它们之…

探索未来:MultiOn,AI的下一个革命

文章目录 探索未来&#xff1a;MultiOn&#xff0c;AI的下一个革命背景&#xff1a;为什么选择MultiOn&#xff1f;MultiOn是什么&#xff1f;如何安装MultiOn&#xff1f;简单的库函数使用方法场景应用常见问题及解决方案总结 探索未来&#xff1a;MultiOn&#xff0c;AI的下一…

图表示学习中的Transformer:Graphormer的突破

人工智能咨询培训老师叶梓 转载标明出处 在自然语言处理和计算机视觉等领域&#xff0c;Transformer架构已经成为主导选择。然而&#xff0c;在图级别的预测任务中&#xff0c;它的表现并不如主流的图神经网络&#xff08;GNN&#xff09;变体。这一现象引发了一个思考&#x…

指针变量的自增、自减运算

指针变量的自增、自减运算相比较于普通变量的自增、自减运算又什么区别呢&#xff1f; 让我们先来复习一下普通变量的自增、自减运算 int main() {int i; //定义一个整型变量printf("请输入一个数字&#xff1a;\n");scanf("%d&qu…

JetBrains系列产品无限重置免费试用方法

JetBrains系列产品无限重置免费试用方法 写在前面安装插件市场安装插件 写在前面 支持的产品&#xff1a; IntelliJ IDEA AppCode CLion DataGrip GoLand PhpStorm PyCharm Rider RubyMine WebStorm为了保证无限重置免费试用方法的稳定性&#xff0c;推荐下载安装2021.2.2及其…

QT Creator cmake 自定义项目结构, 编译输出目录指定

1. 目的 将不同的源文件放到不同的目录下进行管理&#xff0c; 如下&#xff1a; build: 编译输出目录 include: 头文件目录 rsources: 资源文件目录 src: cpp文件目录 2. 创建完cmake工程后修改CMakeLists.txt 配置 注 &#xff1a; 这里头文件目录是include, 所以在includ…

CSS05-复合选择器

一、什么是复合选择器 1-1、后代选择器&#xff08;重要&#xff09; 示例1&#xff1a; 示例2&#xff1a; 示例3&#xff1a; 1-2、子选择器 示例&#xff1a; 1-3、并集选择器&#xff08;重要&#xff09; 示例&#xff1a; 1-4、伪类选择器 1、链接伪类选择器 注意事项&am…

CVPR最牛图像评价算法!

本文所涉及所有资源均在 传知代码平台可获取。 目录 概述 一、论文思路 1.多任务学习框架&#xff1a; 2.视觉-语言对应关系&#xff1a; 3.动态损失权重&#xff1a; 4.模型优化和评估&#xff1a; 二、模型介绍 三、详细实现方法 1.图像编码器和语言编码器&#xff08;Image…

德蒂企鹅PAEDIPROTECT:德国医研力作,专为敏感肌婴幼儿量身打造

新生儿的诞生总是伴随着喜悦&#xff0c;也充满着手忙脚乱&#xff0c;尤其是敏感肌宝宝的皮肤护理。宝宝的皮肤如同初绽的花瓣&#xff0c;皮肤角质层薄而脆弱&#xff0c;容易受到外界刺激物的影响&#xff0c;水分流失快&#xff0c;经常会出现干燥、瘙痒、红斑甚至湿疹等症…

【ARM】AMBA和总线

AMBA AMBA&#xff08;Advanced Microcontroller Bus Architecture&#xff09; 总线是由ARM公司提出的一种开放性的片上总线标准&#xff0c;它独立于处理器和工艺技术&#xff0c;具有高速度低功耗等特点。 总线&#xff1a;系统芯片中各个模块之间需要有接口来连接。总线作…

爬虫类Chrome去除前端无限debugger反调试(轻松分析算法)

文章目录 引言方法1(简易抓包或者分析js适用)方法2(解决实际问题-最简单的方法)方法3(解决实际问题-麻烦点也是学会fiddler的一个功能)第一步&#xff1a;熟悉界面的大致功能意思第二步&#xff1a;保存出需要替换的代码&#xff0c;记住保存位置&#xff0c;待会儿要用第三步&…