业务数据如何存储一直以来都是项目开发中的一个比较重要的话题。我们要从资源的利用率,业务场景和技术实现多个方面考虑存储的问题。“抛开业务谈技术就是耍流氓”,所有技术架构都要站在实际的业务场景中分析。比如个人端的产品,这种就属于读多写少的业务,对于这种系统的架构就要更多的考虑增加缓存来减少数据库读压力;B端的产品,基本上属于写多读少的业务,客户数量不多但是单个客户的qps会非常高,要减少这种业务对数据库的压力,数据写入一般要采用批处理。物联网类型的产品,基本上也是属于写多读少的场景,一个设备会有几百上千的测点数据上报,并且上报的频率非常均匀,对于这种类型的系统我们要保证数据能够及时写入数据库并且不能对数据库产生太大的压力,一般我们就采用批量写库方案。下面就针对这种写多读少的物联网系统设计方案分享一下个人的经验。
我最近在做的一个系统是接收储能系统上报的数据,设备每10s上报一轮数据,云端接收到数据后做一些基本的处理判断逻辑,就将数据存入数据库。业务端展示数据分为历史数据和实时数据,历史数据用于其他系统或业务部门分析,实时数据需要做一些预警和最新数据展示。所以我设计的系统机构图大致如下:
系统使用kafka消息队列将接收数据和处理数据的两个服务进行解耦,在这个系统中,最为核心的服务就是数据服务,它要负责数据的处理、入库、查询等多个功能。在批量写数据库时我们要考虑两个方面:一个是批量插入数据多少条比较合适,另外一个就是如果数据积累比较缓慢最长等待多久就要执行一次批量写。
综合以上的分析,我设计的批量写入数据库采用 缓存队列+定时任务 方式实现,这样既能满足批量写入数据库的要求,又能满足及时更新数据库。
既然数据有缓存,就存在缓存数据丢失的风险,规避数据丢失的问题,行业内采用比较多的方案就是WAL(Write-Ahead Logging)。对于我的系统就是数据服务从kafka消费到数据后先写本地日志,再将数据放入缓存队列,然后通知kafka消费数据成功;数据在缓存队列中积累到批量写入数据库的条数或到达一定时间后就插入数据库。在这个过程中如果服务宕机导致缓存队列的数据没有入库,我们也可以通过本地日志找回数据,再次启动服务后重新加载插入数据库。
对于日志中的数据,会有一个标识标记当前已经入库的位置,当缓存队列的数据写入数据库后,更新日志标记点,标记点之前的数据都是写入数据库成功的,标记点之后的数据是目前还在缓存中并未写入数据库。如果程序正常退出,会有一个钩子函数清理缓存队列,将队列中的所有数据都写入数据库,清理标记点;如果程序异常退出,那么标记点之后的数据都是不安全的,在程序下一次正常启动时检查这个标记点,将标记点之后的数据重新加载进程序并写入数据库。这里的标记点通过在项目部署目录下创建一个文件来实现:程序每次写入数据库后更新这个标记点并将它写入文件,程序正常退出就通过钩子函数删除这个文件。下次程序启动后检查文件是否存在:不存在就是正常退出不用做任何处理;存在就是异常退出,这时就要读取标记点的值,把它之后的所有数据重新写入数据库。
通过上面的分析,整个写入数据的流程图如下:
对于标记点的设计结构如下:
由于设备上报的数据有多种类型,那么缓存队列就存在多个,每个队列对应的标记点就有多个,这里选择数据存入缓存队列的时间戳作为标记,占用4个字节,每次写入数据库成功后就将这个时间戳写入到文件,多个缓存队列就依次向后写入文件,为了提高写文件性能,使用mmap将文件映射到内存中,通过修改内存中的值达到写文件的目的。
以上分析了整个项目的架构和实现逻辑,接下来就通过代码实现上面的这些逻辑。
talk is cheap, show me the code
一、项目结构:
使用SpringBoot3.2 + kafka3.6 + jdk17。
maven依赖如下:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>provided</scope>
<version>1.18.30</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!-- redis相关依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>
<!-- 数据库相关依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.5.5</version>
<exclusions>
<exclusion>
<groupId>org.mybatis</groupId>
<artifactId>mybatis-spring</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>3.0.3</version>
</dependency>
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<version>8.0.33</version>
</dependency>
二、日志记录实现
写日志部分,就不单独实现了,采用logback框架写日志,单个日志文件大小设置为1GB,日志滚动策略采用大小+日期方式。配置文件在resource目录下的 logback-spring.xml 文件如下:
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<include resource="org/springframework/boot/logging/logback/defaults.xml" />
<property name="LOG_PATH" value="logs"/>
<property name="FILE_LOG_PATTERN" value="%d{yyyy-MM-dd HH:mm:ss.SSS}|%t|%-5p|%c|%m%n"/>
<!-- 业务日志:记录系统接收到的数据,用于数据丢失处理 -->
<appender name="redo" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${LOG_PATH}/redo.log</file>
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS}|%m%n</pattern>
<charset>UTF-8</charset>
</encoder>
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<FileNamePattern>${LOG_PATH}/redo-%i.log.%d{yyyy-MM-dd}</FileNamePattern>
<maxHistory>7</maxHistory>
<maxFileSize>1GB</maxFileSize>
<totalSizeCap>40GB</totalSizeCap>
</rollingPolicy>
</appender>
<logger name="redo" level="INFO" additivity="false">
<appender-ref ref="redo"/>
</logger>
<!-- 控制台 -->
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS}|%thread|[%-5level]|%logger{36}.%method|%msg%n</pattern>
<charset>UTF-8</charset>
</encoder>
</appender>
<!-- 错误日志:输出所有错误日志信息 -->
<appender name="FILE-ERROR" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${LOG_PATH}/error/error.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<fileNamePattern>${LOG_PATH}/error/%d{yyyy-MM-dd}-%i.log.gz</fileNamePattern>
<maxHistory>30</maxHistory>
<maxFileSize>500MB</maxFileSize>
<totalSizeCap>10GB</totalSizeCap>
</rollingPolicy>
<encoder>
<pattern>${FILE_LOG_PATTERN}</pattern>
<charset>utf8</charset>
</encoder>
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<level>ERROR</level>
<onMatch>ACCEPT</onMatch>
<onMismatch>DENY</onMismatch>
</filter>
</appender>
<!-- WARN日志:输出所有WARN日志信息 -->
<appender name="FILE-WARN" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${LOG_PATH}/warn/warn.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<fileNamePattern>${LOG_PATH}/warn/%d{yyyy-MM-dd}-%i.log.gz</fileNamePattern>
<maxHistory>30</maxHistory>
<maxFileSize>500MB</maxFileSize>
<totalSizeCap>10GB</totalSizeCap>
</rollingPolicy>
<encoder>
<pattern>${FILE_LOG_PATTERN}</pattern>
<charset>utf8</charset>
</encoder>
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<level>WARN</level>
<onMatch>ACCEPT</onMatch>
<onMismatch>DENY</onMismatch>
</filter>
</appender>
<!-- 消息日志:输出所有消息日志信息 -->
<appender name="FILE-INFO" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${LOG_PATH}/info/info.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<fileNamePattern>${LOG_PATH}/info/%d{yyyy-MM-dd}-%i.log.gz</fileNamePattern>
<maxHistory>30</maxHistory>
<maxFileSize>500MB</maxFileSize>
<totalSizeCap>10GB</totalSizeCap>
</rollingPolicy>
<encoder>
<pattern>${FILE_LOG_PATTERN}</pattern>
<charset>utf8</charset>
</encoder>
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<level>INFO</level>
<onMatch>ACCEPT</onMatch>
<onMismatch>DENY</onMismatch>
</filter>
</appender>
<!-- 调试日志:输出所有消息日志信息 -->
<appender name="FILE-DEBUG" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${LOG_PATH}/debug/debug.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<fileNamePattern>${LOG_PATH}/debug/%d{yyyy-MM-dd}-%i.log.gz</fileNamePattern>
<maxHistory>30</maxHistory>
<maxFileSize>500MB</maxFileSize>
<totalSizeCap>10GB</totalSizeCap>
</rollingPolicy>
<encoder>
<pattern>${FILE_LOG_PATTERN}</pattern>
<charset>utf8</charset>
</encoder>
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<level>DEBUG</level>
<onMatch>ACCEPT</onMatch>
<onMismatch>DENY</onMismatch>
</filter>
</appender>
<root level="INFO">
<appender-ref ref="CONSOLE"/>
<appender-ref ref="FILE-ERROR"/>
<appender-ref ref="FILE-WARN"/>
<appender-ref ref="FILE-INFO"/>
<appender-ref ref="FILE-DEBUG"/>
</root>
</configuration>
日志格式为: 日志时间|消息主题|消息内容 ,消息主题就是kafka的主题,时间戳是接收到消息的时间戳,这个时间戳用于异常退出时下次启动服务恢复消息时判断数据是在标记点前后的依据,消息内容就是具体消费的信息。示例数据如下:
2024-09-23 15:03:19.378|batch-message|{"data":"{\"id\":\"1727074999121\",\"code\":\"code1--1\",\"name\":\"name1--1\",\"random\":\"0eaf04363d5a49419967d500dc149e5a\",\"createTime\":\"2024-09-23 15:03:19\"}","dataType":"Work1Data"}
2024-09-23 15:03:19.734|batch-message|{"data":"{\"id\":\"1727074999359\",\"code\":\"code2--1\",\"name\":\"name2--1\",\"random\":\"89b8b315f4e84fedb5af799ce4860c56\",\"createTime\":\"2024-09-23 15:03:19\"}","dataType":"Work2Data"}
三、消费KAFKA数据
数据是通过消费kafka队列接收消息,如果消息被成功接收并且写入本地日志和缓存队列,那么就提交ack给kafka,实现代码如下:
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
import org.xingo.common.JacksonUtils;
import org.xingo.consumer.batch.BatchWorkFactory;
import org.xingo.consumer.batch.impl.BatchWork1;
import org.xingo.consumer.batch.impl.BatchWork2;
import org.xingo.domain.Work1Data;
import org.xingo.domain.Work2Data;
import java.util.List;
/**
* @Author xingo
* @Date 2024/9/13
*/
@Slf4j
@Component
public class KafkaConsumer {
Logger redoLogger = LoggerFactory.getLogger("redo");
@Autowired
private BatchWorkFactory batchWorkFactory;
/**
* 数据消费者
*/
@KafkaListener(topics = "batch-message")
public void linsten01(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {
if(records != null) {
for (ConsumerRecord<String, String> record : records) {
// 写本地日志
long ts = System.currentTimeMillis();
System.out.println(record.value());
redoLogger.info("{}|{}", record.topic(), record.value());
// 解析消息内容,保存到本地缓存队列
try {
JsonNode json = JacksonUtils.getObjectMapper().readTree(record.value());
String dataType = json.get("dataType").asText();
if("Work1Data".equals(dataType)) {
Work1Data work1Data = JacksonUtils.parseObject(json.get("data").asText(), Work1Data.class);
work1Data.setTs(ts);
batchWorkFactory.get(BatchWork1.class).add(work1Data);
} else if("Work2Data".equals(dataType)) {
Work2Data work2Data = JacksonUtils.parseObject(json.get("data").asText(), Work2Data.class);
work2Data.setTs(ts);
batchWorkFactory.get(BatchWork2.class).add(work2Data);
}
} catch (JsonProcessingException e) {
log.error("解析数据异常", e);
}
}
//提交offset消费成功
ack.acknowledge();
}
}
}
这里面json反序列化使用到了Jackson工具类:
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.databind.ser.std.ToStringSerializer;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateDeserializer;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalTimeDeserializer;
import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateSerializer;
import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer;
import com.fasterxml.jackson.datatype.jsr310.ser.LocalTimeSerializer;
import java.text.SimpleDateFormat;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Locale;
import java.util.TimeZone;
/**
* json工具
*
* @Author xingo
* @Date 2023/12/15
*/
public class JacksonUtils {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
static {
// Long类型处理,避免前端处理长整型时精度丢失
SimpleModule module1 = new SimpleModule();
module1.addSerializer(Long.class, ToStringSerializer.instance);
module1.addSerializer(Long.TYPE, ToStringSerializer.instance);
JavaTimeModule module2 = new JavaTimeModule();
// java8日期处理
module2.addSerializer(LocalDateTime.class,
new LocalDateTimeSerializer(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
module2.addSerializer(LocalDate.class,
new LocalDateSerializer(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
module2.addSerializer(LocalTime.class,
new LocalTimeSerializer(DateTimeFormatter.ofPattern("HH:mm:ss")));
module2.addDeserializer(LocalDateTime.class,
new LocalDateTimeDeserializer(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
module2.addDeserializer(LocalDate.class,
new LocalDateDeserializer(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
module2.addDeserializer(LocalTime.class,
new LocalTimeDeserializer(DateTimeFormatter.ofPattern("HH:mm:ss")));
OBJECT_MAPPER
// 添加modules
.registerModules(module1, module2, new Jdk8Module())
// 日期类型不转换为时间戳
.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false)
.configure(SerializationFeature.WRITE_DURATIONS_AS_TIMESTAMPS, false)
// 反序列化的时候如果多了其他属性,不抛出异常
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
// 如果是空对象的时候,不抛异常
.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false)
// 空对象不序列化
.setSerializationInclusion(JsonInclude.Include.NON_NULL)
// 日期格式化
.setDateFormat(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"))
// 设置时区
.setTimeZone(TimeZone.getTimeZone("GMT+8"))
// 驼峰转下划线
// .setPropertyNamingStrategy(PropertyNamingStrategies.SNAKE_CASE)
// 语言
.setLocale(Locale.SIMPLIFIED_CHINESE);
}
/**
* 反序列化
* @param json json字符串
* @param clazz 发序列化类型
* @return
* @param <T>
*/
public static <T> T parseObject(String json, Class<T> clazz) {
if(json == null) {
return null;
}
try {
return OBJECT_MAPPER.readValue(json, clazz);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
return null;
}
public static <T> T parseObject(String json, TypeReference<T> type) {
if(json == null) {
return null;
}
try {
return OBJECT_MAPPER.readValue(json, type);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
return null;
}
public static <T> T parseObject(byte[] bytes, TypeReference<T> type) {
if(bytes == null) {
return null;
}
try {
return OBJECT_MAPPER.readValue(bytes, type);
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
/**
* 反序列化列表
* @param json
* @return
* @param <T>
*/
public static <T> List<T> parseArray(String json) {
if(json == null) {
return null;
}
try {
TypeReference<List<T>> type = new TypeReference<List<T>>(){};
return OBJECT_MAPPER.readValue(json, type);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
return null;
}
/**
* 写为json串
* @param obj 对象
* @return
*/
public static String toJSONString(Object obj) {
if(obj == null) {
return null;
}
try {
return OBJECT_MAPPER.writeValueAsString(obj);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
return null;
}
public static byte[] toJSONBytes(Object obj) {
if(obj == null) {
return null;
}
try {
return OBJECT_MAPPER.writeValueAsBytes(obj);
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
/**
* 获取jackson对象
* @return
*/
public static ObjectMapper getObjectMapper() {
return OBJECT_MAPPER;
}
}
四、数据缓存队列
缓存数据的队列有两种数据结构可选:数组和链表,链表适用于数据量不定的场景,随机增减节点非常方便;数组长度一定,随机增减性能比较低,对于顺序读写性能比链表要好。这里的缓存主要是尾部添加,一次读取入库的场景,综合来看,数组性能会好一些,由于缓存数据条数不一定,只有最大长度限制,选择ArrayList这个结构非常方便。至于并发问题,在读写时加锁处理。
对于数据缓存这部分逻辑,我们定义一个抽象类叫BatchWork,它主要用于公共处理逻辑的抽取,对于不同数据结构的对象,只需要继承这个父类,并且实现该对象批量处理数据的逻辑即可。
批处理抽象父类定义如下:
import lombok.extern.slf4j.Slf4j;
import org.xingo.domain.WorkData;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
/**
* 批处理任务父类
*
* @Author xingo
* @Date 2024/9/13
*/
@Slf4j
public abstract class BatchWork<T extends WorkData> {
/**
* 最大时间间隔,超过这个时间间隔还没有执行数据批量插入就执行定时任务
*/
public static final int DIFF_TIMESTAMP = 30_000;
/**
* 执行时间间隔:30s
*/
public static final int period = 30; // 执行的间隔时间
/**
* 批量插入数据最大条数
*/
protected short batchSize = 1000;
/**
* 最近一次执行数据插入时间戳
*/
protected long lastTimestamp = 0L;
/**
* 任务名称
*/
protected BatchWorkEnum batchWork;
/**
* 系统操作线程池:构建一个核心池大小是8、线程池最大线程数是16的线程池,可以根据主机CPU核数进行调整
*/
public static final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(8, 16, 5, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(8), new ThreadFactory() {
private final AtomicInteger number = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("batch-savedb-" + number.getAndIncrement());
return thread;
}
});
/**
* 定时任务线程池:定时检查缓存队列,通过判断缓存队列最后一次执行时间来判断是否要将缓存队列数据插入数据库表中
*/
private static final ScheduledExecutorService taskExecutor = Executors.newScheduledThreadPool(16, new ThreadFactory() {
private final AtomicInteger number = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("batch-work-" + number.getAndIncrement());
return thread;
}
});
/**
* 缓存数据列表
*/
private List<T> datas = new ArrayList<>(batchSize);
/**
* 锁对象
*/
private final ReentrantLock lock = new ReentrantLock();
/**
* 启动定时执行任务方法,在项目启动时启动
*/
public void run() {
if(batchWork == null) {
throw new RuntimeException("属性 [batchWork] 不能为空");
}
// 最后更新时间与当前时间差大于时间间隔,将执行一次批处理任务
Runnable task = () -> {
long diff = System.currentTimeMillis() - lastTimestamp;
if(diff >= DIFF_TIMESTAMP) {
batch();
}
};
Random random = new Random();
long initialDelay = random.nextInt(10, 30); // 初始延迟时间
// 为了让任务尽可能均匀分布,所有要批量处理的任务初始时间随机生成
taskExecutor.scheduleAtFixedRate(task, initialDelay, period, TimeUnit.SECONDS);
BatchWorkFactory.WORK_NUMS.incrementAndGet();
log.info("{}批量任务开始执行", batchWork);
}
/**
* 服务关闭时执行
*/
public void clear() {
log.info("项目关闭开始清理{}批量任务", batchWork);
this.batch();
BatchWorkFactory.WORK_NUMS.decrementAndGet();
log.info("{}批量任务已结束", batchWork);
}
/**
* 添加数据方法:将要插入数据添加到缓存队列
* @param data
*/
public void add(T data) {
lock.lock();
try {
datas.add(data);
} finally {
lock.unlock();
}
this.cache(data);
// 添加数据后判断缓存数据条数,如果缓存条数超过了批处理阈值,执行一次批处理,结合定时任务就实现了“最大缓存条数或时间间隔”内插入数据的逻辑
if(datas.size() >= batchSize) {
batch();
}
}
/**
* 批量新增数据:将一批数据添加到缓存队列
* @param batch
*/
public void batchAdd(List<T> batch) {
lock.lock();
try {
datas.addAll(batch);
} finally {
lock.unlock();
}
if(datas.size() >= batchSize) {
batch();
}
}
/**
* 批量处理方法:具体子类要实现这个方法
*/
public abstract void batchInsertDb(List<T> datas);
/**
* 数据缓存:缓存最新一条数据
* @param data
*/
public abstract void cache(T data);
/**
* 批处理方法:
* 将当前缓存的批处理数据赋值给其他对象,当前缓存列表重新申请一个空间接收数据
*/
private void batch() {
log.info("调用{}批量处理方法", batchWork);
if(!datas.isEmpty()) {
List<T> copyList = null;
lock.lock();
try {
if(!datas.isEmpty()) {
// 定义一个局部变量指向原有的数组,原有的数组重新申请空间用于接收数据
copyList = datas;
datas = new ArrayList<>(batchSize);
}
} finally {
lock.unlock();
}
if(copyList != null && !copyList.isEmpty()) {
final List<T> dbList = copyList;
// 插入数据是比较耗时的过程,这里放入异步线程池慢慢执行,主线程继续接收数据
threadPool.execute(() -> {
log.info("{}写入数据库开始|{}", batchWork, dbList.size());
long s = System.currentTimeMillis();
this.batchInsertDb(dbList);
lastTimestamp = System.currentTimeMillis();
long _ts = dbList.get(dbList.size() - 1).getTs();
log.info("{}写入数据库完成|{}|{}|{}ms", batchWork, dbList.size(), new Timestamp(_ts), (lastTimestamp - s));
// 插入数据完成后,更新标记点
MarkPointLog.markPoint(batchWork, _ts);
});
}
}
}
}
demo项目模拟了两类数据批处理:
批处理任务1:
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import org.xingo.consumer.batch.BatchWork;
import org.xingo.consumer.batch.BatchWorkEnum;
import org.xingo.domain.Work1Data;
import org.xingo.common.JacksonUtils;
import org.xingo.mapper.Work1Mapper;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
* 批处理子类1
*
* @Author xingo
* @Date 2024/9/13
*/
@Slf4j
@Service
public class BatchWork1 extends BatchWork<Work1Data> {
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired
private Work1Mapper work1Mapper;
public BatchWork1() {
this.batchWork = BatchWorkEnum.Work1;
}
@Override
public void batchInsertDb(List<Work1Data> datas) {
try {
work1Mapper.batchInsert(datas);
} catch (Exception e) {
log.error("批量插入数据异常", e);
// 这里是批量插入数据如果异常,转为单条插入数据,一般的异常都是数据库主键冲突导致的
datas.forEach(data -> {
try {
work1Mapper.insert(data);
} catch (Exception ex) {
log.error("单条插入数据异常", ex);
}
});
}
log.info("批量处理数据|{}|{}", this.batchWork, datas.size());
}
@Override
public void cache(Work1Data data) {
// 缓存最新一条数据供其他业务使用
redisTemplate.opsForValue().set("data:work1", JacksonUtils.toJSONString(data), 30, TimeUnit.SECONDS);
}
}
批处理任务2:
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import org.xingo.consumer.batch.BatchWork;
import org.xingo.consumer.batch.BatchWorkEnum;
import org.xingo.common.JacksonUtils;
import org.xingo.domain.Work2Data;
import org.xingo.mapper.Work2Mapper;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
* 批处理子类2
*
* @Author xingo
* @Date 2024/9/13
*/
@Slf4j
@Service
public class BatchWork2 extends BatchWork<Work2Data> {
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired
private Work2Mapper work2Mapper;
public BatchWork2() {
this.batchWork = BatchWorkEnum.Work2;
}
@Override
public void batchInsertDb(List<Work2Data> datas) {
try {
work2Mapper.batchInsert(datas);
} catch (Exception e) {
log.error("批量插入数据异常", e);
datas.forEach(data -> {
try {
work2Mapper.insert(data);
} catch (Exception ex) {
log.error("单条插入数据异常", ex);
}
});
}
log.info("批量处理数据|{}|{}", this.batchWork, datas.size());
}
@Override
public void cache(Work2Data data) {
redisTemplate.opsForValue().set("data:work2", JacksonUtils.toJSONString(data), 30, TimeUnit.SECONDS);
}
}
这里只有两个批处理任务,如果批处理任务有更多个,那么就需要一个统一的地方管理这些任务:
定义一个工厂类管理批量任务,创建、销毁、获取这些批量任务都通过这个工厂类来实现:
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.ClassPathScanningCandidateComponentProvider;
import org.springframework.core.type.filter.AssignableTypeFilter;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 批量任务工厂
*
* @Author xingo
* @Date 2024/9/13
*/
@Slf4j
@Component
public class BatchWorkFactory {
/**
* 批量任务集合
*/
private final Map<Class<? extends BatchWork>, BatchWork> map = new HashMap<>();
@Autowired
private ApplicationContext applicationContext;
@Autowired
private MarkPointLog markPointLog;
/**
* 工厂初始化是否完成
*/
private boolean ok = false;
/**
* 任务启动完成
*/
public static final AtomicInteger WORK_NUMS = new AtomicInteger(0);
@PostConstruct
public void run() {
// 构建bean缓存:通过spring提供的类扫描目录下的所有子类加载到集合中做统一管理
ClassPathScanningCandidateComponentProvider provider = new ClassPathScanningCandidateComponentProvider(false);
provider.addIncludeFilter(new AssignableTypeFilter(BatchWork.class));
Set<BeanDefinition> components = provider.findCandidateComponents("org/xingo/consumer/batch/impl");
for (BeanDefinition component : components) {
try {
Class<? extends BatchWork> clazz = (Class<? extends BatchWork>) Class.forName(component.getBeanClassName());
BatchWork bean = applicationContext.getBean(clazz);
map.put(clazz, bean);
log.info("初始化加载类实例信息|{}|{}", component.getBeanClassName(), bean);
} catch (Exception e) {
log.error("加载类异常", e);
}
}
// 初始标记服务
markPointLog.initMarkPointLog();
// 批处理任务启动方法
map.values().forEach(BatchWork::run);
try {
// 标记任务启动成功
while (WORK_NUMS.get() != map.size()) {
TimeUnit.MICROSECONDS.sleep(5);
}
ok = true;
} catch (Exception e) {
e.printStackTrace();
}
log.info("批处理任务全部启动");
}
/**
* 正常关闭处理
*/
@PreDestroy
public void clear() {
try {
// 批处理任务关闭方法
map.values().forEach(BatchWork::clear);
// 等待批量任务全部处理完成
while (WORK_NUMS.get() != 0) {
TimeUnit.SECONDS.sleep(1);
log.info("等待批处理任务结束,剩余{}个任务关闭", WORK_NUMS.get());
}
// 清理标记文件
markPointLog.destroyMarkPointLog();
log.info("批处理任务全部关闭");
} catch (Exception e) {
log.error("清理本地批处理任务异常", e);
}
}
/**
* 获取批量执行任务类
* @param workName
* @return
*/
public BatchWork get(Class<? extends BatchWork> workName) {
if(ok) {
return map.get(workName);
} else {
int cnt = 0;
while (!ok) {
try {
TimeUnit.MILLISECONDS.sleep(3);
if(cnt++ >= 10000) {
break;
}
} catch (Exception e) {
log.error("获取批处理任务异常", e);
}
}
return map.get(workName);
}
}
}
在BathWork抽象类中,数据批量插入数据库后要记录下当前插入数据的时间点工具类:MarkPointLog:
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.math.BigDecimal;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.List;
import java.util.Scanner;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
/**
* 数据标记点,每次插入数据后就要更新标记点
*
* @Author xingo
* @Date 2024/9/13
*/
@Slf4j
@Component
public class MarkPointLog {
@Value("${user.dir}")
private String dirPath;
@Autowired
private LoadDataService loadDataService;
/**
* 标记服务正常关闭文件路径:
* 服务正常关闭时会将文件删除,
* 下次启动时如果文件存在,表示上次是异常关闭的,那么就会处理历史消息重新发布一次处理
*/
private String filePath = null;
/**
* 写文件
*/
private static MappedByteBuffer buffer = null;
/**
* 锁对象
*/
private static final ReentrantLock lock = new ReentrantLock();
/**
* 初始化
*/
public void initMarkPointLog() {
filePath = dirPath.endsWith(File.separator) ? (dirPath + "work") : (dirPath + File.separator + "work");
File file = new File(filePath);
if(file.exists()) {
RandomAccessFile rw = null;
try {
long minTs = System.currentTimeMillis();
rw = new RandomAccessFile(file, "r");
SimpleDateFormat datetimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
for (BatchWorkEnum mark : BatchWorkEnum.values()) {
long ts = rw.readLong();
if(ts < minTs) {
minTs = ts;
}
log.error("系统异常关闭最后提交数据时间戳|{}|{}|{}", mark, ts, datetimeFormat.format(ts));
}
final long min = minTs;
new Thread(() -> loadDataService.loadData(min), "loadLogWork").start();
} catch (Exception e) {
log.error("读取状态文件异常", e);
} finally {
if(rw != null) {
try {
rw.close();
} catch (IOException e) {
log.error("读取状态文件异常", e);
}
}
}
}
// 映射运行状态文件
FileChannel channel = null;
try {
channel = new RandomAccessFile(file, "rw").getChannel();
int size = BatchWorkEnum.values().length * 8;
buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, size);
buffer.putLong(0);
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (channel != null) {
channel.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* 清理标记点文件
*/
public void destroyMarkPointLog() {
if(filePath != null) {
File file = new File(filePath);
if(file.exists()) {
boolean rs = file.delete();
log.info("删除标识文件完成|{}|{}", filePath, rs);
}
}
}
/**
* 最大标记点
* @param workEnum
* @param ts
*/
public static void markPoint(BatchWorkEnum workEnum, long ts) {
lock.lock();
try {
int position = workEnum.getIdx() * 8;
buffer.position(position);
buffer.putLong(ts);
buffer.force();
} finally {
lock.unlock();
}
}
}
当数据在缓存队列中还未插入数据库之前系统宕机了,这就存在数据丢失风险,那么在系统启动后就要重新消费这部分数据:
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.TimeUnit;
/**
* @Author xingo
* @Date 2024/9/23
*/
@Slf4j
@Component
public class LoadDataService {
@Value("${user.dir}")
private String dirPath;
@Autowired
private KafkaTemplate kafkaTemplate;
/**
* 加载数据到系统等待再次入库处理
*
* @param timestamp
*/
public void loadData(long timestamp) {
// 最小时间再减去一个时间间隔,认为这段时间内的数据都是不安全的
long startTimestamp = timestamp - BatchWork.DIFF_TIMESTAMP;
long endTimestamp = System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(5);
String logPath = dirPath + File.separator + "logs";
File kafkaLogs = new File(logPath);
File file1 = null, file2 = null;
String yesterday = LocalDate.now().plusDays(-1).format(DateTimeFormatter.ofPattern("yyyy-MM-dd"));
// 查找当前的日志文件
if(kafkaLogs.exists() && kafkaLogs.isDirectory()) {
File[] files = kafkaLogs.listFiles();
if(files != null) {
for(File file : files) {
String fname = file.getName();
if(fname.equals("redo.log")) {
log.info("匹配日志文件|{}", fname);
file1 = file;
} else if(fname.endsWith(yesterday)) {
log.info("匹配历史日志文件|{}", fname);
file2 = file;
}
}
}
}
if(file1 != null) {
sendDataToQueue(file1, startTimestamp, endTimestamp);
}
if(file2 != null) {
sendDataToQueue(file2, startTimestamp, endTimestamp);
}
}
/**
* 一行数据最大长度,用于数据查找时回退
*/
private int LINE_MAX_SIZE = 1000;
/**
* 发送数据到消息
* @param file
* @param startTimestamp
* @param endTimestamp
*/
private void sendDataToQueue(File file, long startTimestamp, long endTimestamp) {
// 查找数据:二分法查找要加载的数据
RandomAccessFile raf = null;
SimpleDateFormat datetimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
try {
raf = new RandomAccessFile(file, "rw");
long start = 0L;
long end = file.length();
long point = 0L;
String line = null;
// 2024-09-23 10:20:25.387|batch-message|{"data":{"id":"1727058025075","code":"code1--1727058025075","name":"name1--1727058025075","random":"bdfa8ca9d5084c25b9267f34e7bff6cf","createTime":"2024-09-23 10:20:25"},"dataType":"Work1Data"}
boolean find = false;
while (true) {
if(end - start < 60) {
break;
}
point = (start + end) / 2;
raf.seek(Math.max(0, point - LINE_MAX_SIZE));
line = raf.readLine();
if (StringUtils.isBlank(line)) {
break;
}
String[] arr = line.split("\\|");
if (arr.length != 3) {
line = raf.readLine();
if (line == null || "".equals(line)) {
break;
}
arr = line.split("\\|");
} else {
try {
datetimeFormat.parse(arr[0]);
} catch (ParseException e) {
line = raf.readLine();
if (line == null || "".equals(line)) {
break;
}
arr = line.split("\\|");
}
}
long time = datetimeFormat.parse(arr[0]).getTime();
if (time < startTimestamp) {
start = point;
} else {
find = true;
break;
}
}
if (find) {
raf.seek(start);
line = raf.readLine();
String[] arr = line.split("\\|");
if (arr.length != 3) {
line = raf.readLine();
} else {
try {
datetimeFormat.parse(arr[0]);
} catch (ParseException e) {
line = raf.readLine();
}
}
while (StringUtils.isNotBlank(line)) {
arr = line.split("\\|");
long time = datetimeFormat.parse(arr[0]).getTime();
if(time > endTimestamp) {
break;
}
// 这里将可能丢失的数据重新发送到消息队列再次消费持久化
if(time >= startTimestamp) {
kafkaTemplate.send(arr[1], arr[2]);
log.info("加载可能丢失的数据|{}", arr[2]);
}
line = raf.readLine();
}
}
} catch (Exception e) {
log.error("读取文件异常", e);
} finally {
try {
if(raf != null) {
raf.close();
}
} catch (IOException e) {
log.error("关闭文件异常", e);
}
}
}
}
是否加载数据主要是根据项目目录下释放存在一个标记文件:work
。这个文件在项目启动时创建,在项目正常关闭时删除这个文件。在数据批量插入成功后会把标记点也写入到这个文件中,由于存在多个缓存队列,每个缓存队列需要写入到这个文件的不同位置,这里需要定义一个枚举类记录每个缓存队列写入到这个文件的位置:
import org.xingo.consumer.batch.impl.BatchWork1;
import org.xingo.consumer.batch.impl.BatchWork2;
/**
* 批量任务名
*
* @Author xingo
* @Date 2024/9/13
*/
public enum BatchWorkEnum {
/**
* 批量任务1
*/
Work1((byte) 0, BatchWork1.class),
/**
* 批量任务2
*/
Work2((byte) 1, BatchWork2.class),
;
/**
* 批量任务队列的序列号,是一个从0开始递增的值;
* 它主要用于批量数据入库后最新数据时间点记录文件查找和写入时使用
* 如果新增缓存队列这个序号递增,不能更改已经存在的序号
* 如果要调整或删除已有的序号,要保证系统正常退出后再操作,否则会有数据丢失风险
*/
private byte idx;
/**
* 批量任务对应的类
*/
private Class<? extends BatchWork> work;
BatchWorkEnum(byte idx, Class<? extends BatchWork> work) {
this.idx = idx;
this.work = work;
}
public byte getIdx() {
return idx;
}
public Class<? extends BatchWork> getWork() {
return work;
}
}
五、数据对象处理
上面这些逻辑都完成了,最后一步就是数据持久化,我这里使用mysql数据库做持久化测试,数据对象和持久化代码如下:
统一的父类:
import java.io.Serializable;
/**
* @Author xingo
* @Date 2024/9/13
*/
public class WorkData implements Serializable {
/**
* 数据放入缓存队列时间
*/
protected long ts;
public Long getTs() {
return ts;
}
public void setTs(Long ts) {
this.ts = ts;
}
}
两个实体类:
实体类1:
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import java.time.LocalDateTime;
/**
* @Author xingo
* @Date 2024/9/13
*/
@Data
@TableName("t_work1")
public class Work1Data extends WorkData {
private Long id;
private String code;
private String name;
private String random;
private LocalDateTime createTime;
}
实体类2:
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import java.time.LocalDateTime;
/**
* @Author xingo
* @Date 2024/9/13
*/
@Data
@TableName("t_work2")
public class Work2Data extends WorkData {
private Long id;
private String code;
private String name;
private String random;
private LocalDateTime createTime;
}
xml文件:
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="org.xingo.mapper.Work1Mapper">
<insert id="batchInsert" parameterType="hashmap">
INSERT INTO t_work1 (id, code, name, random, ts, create_time) VALUES
<foreach collection="datas" item="item" index="index" separator=",">
(#{item.id}, #{item.code}, #{item.name}, #{item.random}, #{item.ts}, #{item.createTime})
</foreach>
</insert>
</mapper>
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="org.xingo.mapper.Work2Mapper">
<insert id="batchInsert" parameterType="hashmap">
INSERT INTO t_work2 (id, code, name, random, ts, create_time) VALUES
<foreach collection="datas" item="item" index="index" separator=",">
(#{item.id}, #{item.code}, #{item.name}, #{item.random}, #{item.ts}, #{item.createTime})
</foreach>
</insert>
</mapper>
Mapper接口:
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Mapper;
import org.xingo.domain.Work1Data;
import java.util.List;
/**
* @Author xingo
* @Date 2024/9/23
*/
@Mapper
public interface Work1Mapper extends BaseMapper<Work1Data> {
void batchInsert(List<Work1Data> datas);
}
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Mapper;
import org.xingo.domain.Work2Data;
import java.util.List;
/**
* @Author xingo
* @Date 2024/9/23
*/
@Mapper
public interface Work2Mapper extends BaseMapper<Work2Data> {
void batchInsert(List<Work2Data> datas);
}
数据库建表SQL语句:
CREATE TABLE `t_work1` (
`id` bigint NOT NULL,
`code` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,
`name` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,
`random` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,
`create_time` datetime NULL DEFAULT NULL,
`ts` bigint NULL DEFAULT NULL,
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci ROW_FORMAT = Dynamic;
CREATE TABLE `t_work2` (
`id` bigint NOT NULL,
`code` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,
`name` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,
`random` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,
`create_time` datetime NULL DEFAULT NULL,
`ts` bigint NULL DEFAULT NULL,
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci ROW_FORMAT = Dynamic;
六、测试验证
- 正常情况:验证正常插入情况非常简单,编写一个测试方法不断的向kafka的主题中写入数据,观察数据消费和入库情况,通过对比发送数据和接收数据是否存在差异来确定服务是否有问题。
- 正常退出:在正常运行情况下,对批量处理服务执行
kill -15
关闭程序,正常退出情况下项目根目录下的work
文件会被清除,那么下次程序启动时就不需要重新加载日志数据。 - 异常退出:在正常运行情况下对批量处理服务执行kill -9 命令杀死进程,观察项目根目录下的
work
文件还存在,并没有被清除,这就表明程序在上次退出时是异常退出的,那么在程序再次启动时会重新加载日志数据,避免程序异常导致数据丢失的风险。