1、业务场景
用一个 Spring Boot 的项目去实现对管控设备的监控、日志收集等。同时需要将接收到的日志进行入库,每天存一张表,如device_log_20231026…
2、Syslog 接收端(接收日志的服务器,即运行 Java 程序的服务器;在 syslog 术语中也称为 syslog 服务端)
2.1 导包
Syslog需要用到的jar包:
<dependency>
<groupId>org.graylog2</groupId>
<artifactId>syslog4j</artifactId>
<version>0.9.61</version>
</dependency>
Mybatis Plus 需要的jar包:(因为笔者此处使用MP进行数据库的交互)
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.5.2</version>
</dependency>
2.2 Syslog 接收端
数据库实体类 DeviceSyslog.java
package com.haitai.domain.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.Data;
import java.io.Serializable;
import java.util.Date;
/**
* Entity holding one syslog record received from a managed device.
* <p>
* Mapped to the logical table name {@code device_log}. In this project the
* DynamicDateTableNameHandler rewrites that name to a dated table
* ({@code device_log_yyyyMMdd}) when SQL is executed.
*
* @Author xincheng.du
* @Date 2023/10/24 13:50
*/
@Data
@TableName("device_log")
public class DeviceSyslog implements Serializable {
/**
* Primary key, declared with {@code IdType.AUTO} (database auto-increment).
*/
@TableId(type = IdType.AUTO)
private Long id;
/**
* IP of the device that sent the log (column {@code ip}).
*/
@TableField("ip")
private String ip;
/**
* Log message text (column {@code message}).
*/
@TableField("message")
private String message;
/**
* Log level (column {@code level}).
*/
@TableField("level")
private Integer level;
/**
* Time of the log event (column {@code log_time}); serialized to JSON as
* {@code yyyy-MM-dd HH:mm:ss}.
*/
@TableField("log_time")
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
private Date logTime;
/**
* Time the record was stored (column {@code create_time}); serialized to
* JSON as {@code yyyy-MM-dd HH:mm:ss}.
*/
@TableField("create_time")
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
private Date createTime;
}
DeviceSyslogMapper.java
package com.haitai.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.haitai.domain.entity.DeviceSyslog;
/**
* MyBatis-Plus mapper for {@link DeviceSyslog}.
* <p>
* Declares no methods of its own; all operations come from {@link BaseMapper}.
*
* @Author xincheng.du
* @Date 2023/5/25 14:10
*/
public interface DeviceSyslogMapper extends BaseMapper<DeviceSyslog> {
}
SyslogUdpUtil.java
此处的 IP 和 Port 填写运行 Java 程序的服务器 IP 和需要开放的端口(对应配置项 syslog.udp.ip 和 syslog.udp.port)。因为我的接收端和发送端在同一个局域网,所以我此处的 IP 是 192.168.110.28,端口设置为 3780。
package com.haitai.syslog;
import com.haitai.service.DeviceSyslogService;
import lombok.extern.slf4j.Slf4j;
import org.graylog2.syslog4j.SyslogConstants;
import org.graylog2.syslog4j.server.SyslogServer;
import org.graylog2.syslog4j.server.SyslogServerEventIF;
import org.graylog2.syslog4j.server.SyslogServerIF;
import org.graylog2.syslog4j.server.SyslogServerSessionlessEventHandlerIF;
import org.graylog2.syslog4j.server.impl.net.udp.UDPNetSyslogServerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.net.SocketAddress;
import java.util.Objects;
/**
* Syslog 接收端
*
* @Author xincheng.du
* @Date 2023/10/25 15:13
*/
@Slf4j
@Component
public class SyslogUdpUtil {
@Value("${syslog.udp.ip}")
private String ip;
@Value("${syslog.udp.port}")
private int port;
@Resource
private DeviceSyslogService deviceSyslogService;
/**
* 项目启动时,启动一个Syslog服务端监听指定端口,接收设备的日志消息
*/
@PostConstruct
public void init() {
new Thread(() -> {
log.info("服务端({})监听设备syslog日志", ip + ":" + port);
// 服务端
SyslogServerIF serverInstance = SyslogServer.getInstance(SyslogConstants.UDP);
UDPNetSyslogServerConfig serverConfig = (UDPNetSyslogServerConfig) serverInstance.getConfig();
serverConfig.setHost(ip);
serverConfig.setPort(port);
// 防止数据过大被截取导致不完整
serverConfig.setMaxMessageSize(SyslogConstants.SYSLOG_BUFFER_SIZE * 10);
serverConfig.addEventHandler(new SyslogServerSessionlessEventHandlerIF() {
@Override
public void event(SyslogServerIF syslogServerIF, SocketAddress socketAddress, SyslogServerEventIF syslogServerEventIF) {
if (Objects.nonNull(syslogServerEventIF)) {
deviceSyslogService.saveSyslog(socketAddress, syslogServerEventIF);
}
}
@Override
public void exception(SyslogServerIF syslogServerIF, SocketAddress socketAddress, Exception e) {}
@Override
public void initialize(SyslogServerIF syslogServerIF) {}
@Override
public void destroy(SyslogServerIF syslogServerIF) {}
});
// 启动服务端
serverInstance.run();
}).start();
}
}
DeviceSyslogService.java
package com.haitai.service;
import com.baomidou.mybatisplus.extension.service.IService;
import com.haitai.domain.entity.DeviceSyslog;
import com.haitai.domain.param.syslog.DeviceSyslogQueryParam;
import com.haitai.domain.vo.TableDataInfo;
import org.graylog2.syslog4j.server.SyslogServerEventIF;
import java.net.SocketAddress;
/**
* Service interface for device syslog records: paging, persistence and
* management of the per-day log tables.
*
* @Author xincheng.du
* @Date 2023/10/25 11:06
*/
public interface DeviceSyslogService extends IService<DeviceSyslog> {
/**
* Returns one page of syslog records.
*
* @param param query parameters
* @return the page of records
*/
TableDataInfo pageSyslog(DeviceSyslogQueryParam param);
/**
* Persists one received syslog event.
*
* @param socketAddress socket address the event was received from
* @param syslogServerEventIF the received syslog event
*/
void saveSyslog(SocketAddress socketAddress, SyslogServerEventIF syslogServerEventIF);
/**
* Creates a log table.
*
* @param tableName name of the table to create
*/
void createTable(String tableName);
/**
* Checks whether a table exists.
*
* @param tableName name of the table to look for
* @return {@code true} if the table exists
*/
boolean existTable(String tableName);
}
DeviceSyslogServiceImpl.java
其中 Constants.DEVICE_LOG_BASE_TABLE_NAME = "device_log"
package com.haitai.service.impl;
import cn.hutool.core.date.DateUtil;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.haitai.config.mybatis.RequestDataHelper;
import com.haitai.constant.Constants;
import com.haitai.constant.HttpStatus;
import com.haitai.domain.entity.DeviceSyslog;
import com.haitai.domain.param.syslog.DeviceSyslogQueryParam;
import com.haitai.domain.vo.TableDataInfo;
import com.haitai.mapper.DeviceSyslogMapper;
import com.haitai.service.DeviceSyslogService;
import com.haitai.utils.LocalCache;
import lombok.extern.slf4j.Slf4j;
import org.graylog2.syslog4j.server.SyslogServerEventIF;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.sql.*;
import java.util.Date;
import java.util.HashMap;
import java.util.Objects;
/**
 * Business logic for device syslog records.
 * <p>
 * Records are stored in one table per day named
 * {@code <DEVICE_LOG_BASE_TABLE_NAME>_yyyyMMdd}. The day a statement is routed to
 * is passed to the dynamic table name handler through {@link RequestDataHelper}
 * under the key {@code "date"}; every method that sets it also clears it again
 * so the value cannot leak to later work on the same (pooled) thread.
 * <p>
 * NOTE(review): the {@code message} column is {@code varchar(1000)} while the
 * UDP receiver in this project accepts messages up to
 * {@code SYSLOG_BUFFER_SIZE * 10}; longer messages may be rejected or truncated
 * by the database - confirm the intended limit.
 *
 * @Author xincheng.du
 * @Date 2023/10/24 14:18
 */
@Slf4j
@Service
public class DeviceSyslogServiceImpl extends ServiceImpl<DeviceSyslogMapper, DeviceSyslog> implements DeviceSyslogService {

    /** Key under which the routing date is published to the table name handler. */
    private static final String REQUEST_DATE_KEY = "date";

    /** Only plain identifiers may be concatenated into DDL. */
    private static final java.util.regex.Pattern SAFE_TABLE_NAME =
            java.util.regex.Pattern.compile("[A-Za-z0-9_]+");

    /**
     * Database user name.
     */
    @Value("${spring.datasource.druid.master.username}")
    private String username;

    /**
     * Database password.
     */
    @Value("${spring.datasource.druid.master.password}")
    private String password;

    /**
     * Database JDBC url.
     */
    @Value("${spring.datasource.druid.master.url}")
    private String url;

    /**
     * Returns one page of records from the table of the requested day.
     *
     * @param param filter and paging parameters; when its log time is
     *              {@code null} the current day is queried
     * @return page content and total count, ordered by creation time descending
     */
    @Override
    public TableDataInfo pageSyslog(DeviceSyslogQueryParam param) {
        HashMap<String, Object> map = new HashMap<>();
        map.put(REQUEST_DATE_KEY, Objects.nonNull(param.getLogTime()) ? param.getLogTime() : new Date());
        RequestDataHelper.setRequestData(map);
        try {
            LambdaQueryWrapper<DeviceSyslog> deviceLambdaQueryWrapper = new LambdaQueryWrapper<>();
            deviceLambdaQueryWrapper
                    .eq(Objects.nonNull(param.getId()), DeviceSyslog::getId, param.getId())
                    .like(Objects.nonNull(param.getIp()), DeviceSyslog::getIp, param.getIp())
                    .like(Objects.nonNull(param.getMessage()), DeviceSyslog::getMessage, param.getMessage())
                    .eq(Objects.nonNull(param.getLevel()), DeviceSyslog::getLevel, param.getLevel())
                    // Newest records first.
                    .orderByDesc(DeviceSyslog::getCreateTime);
            Page<DeviceSyslog> page = new Page<>(param.getPage().longValue(), param.getLimit().longValue());
            Page<DeviceSyslog> result = baseMapper.selectPage(page, deviceLambdaQueryWrapper);
            TableDataInfo rspData = new TableDataInfo();
            rspData.setCode(HttpStatus.SUCCESS);
            rspData.setMsg("查询成功");
            rspData.setRows(result.getRecords());
            rspData.setTotal(result.getTotal());
            return rspData;
        } finally {
            // Do not leave the routing date behind on a pooled request thread.
            RequestDataHelper.setRequestData(null);
        }
    }

    /**
     * Stores one received syslog event in today's table, creating the table
     * first when it does not exist yet.
     *
     * @param socketAddress       address the event was received from; the IP is
     *                            recorded only for {@link InetSocketAddress}
     * @param syslogServerEventIF the received event
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public void saveSyslog(SocketAddress socketAddress, SyslogServerEventIF syslogServerEventIF) {
        Date now = new Date();
        String tableName = Constants.DEVICE_LOG_BASE_TABLE_NAME + "_" + DateUtil.format(now, "yyyyMMdd");
        // Create the day's table on first use.
        if (!existTable(tableName)) {
            createTable(tableName);
        }
        DeviceSyslog deviceSyslog = new DeviceSyslog();
        if (socketAddress instanceof InetSocketAddress) {
            deviceSyslog.setIp(resolveIp((InetSocketAddress) socketAddress));
        }
        deviceSyslog.setLevel(syslogServerEventIF.getLevel());
        deviceSyslog.setMessage(syslogServerEventIF.getMessage());
        deviceSyslog.setLogTime(syslogServerEventIF.getDate());
        deviceSyslog.setCreateTime(now);

        // Route the insert with the same instant that was used to pick/create the
        // table; otherwise an event handled around midnight could be sent to the
        // next day's table, which may not exist yet.
        HashMap<String, Object> requestData = new HashMap<>();
        requestData.put(REQUEST_DATE_KEY, now);
        RequestDataHelper.setRequestData(requestData);
        try {
            save(deviceSyslog);
        } finally {
            RequestDataHelper.setRequestData(null);
        }
    }

    /**
     * Creates a daily log table if it does not exist and records the outcome
     * in {@link LocalCache}. Failures are logged, never thrown.
     *
     * @param tableName table to create; must consist of letters, digits and
     *                  underscores only, otherwise nothing is executed
     */
    @Override
    public void createTable(String tableName) {
        // DDL cannot use bind parameters, so the identifier is validated instead.
        if (tableName == null || !SAFE_TABLE_NAME.matcher(tableName).matches()) {
            log.error("系统日志表创建失败,表名不合法:{}", tableName);
            return;
        }
        // IF NOT EXISTS keeps this idempotent when the table was created
        // concurrently or the cached "missing" flag is stale.
        String createTableSQL = "CREATE TABLE IF NOT EXISTS `" + tableName + "` (" +
                " `id` bigint NOT NULL AUTO_INCREMENT," +
                " `ip` varchar(50) DEFAULT NULL," +
                " `message` varchar(1000) DEFAULT NULL," +
                " `level` int DEFAULT NULL," +
                " `log_time` datetime DEFAULT NULL," +
                " `create_time` datetime DEFAULT NULL," +
                " PRIMARY KEY (`id`)" +
                ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb3;";
        try (Connection connection = DriverManager.getConnection(url, username, password);
             Statement statement = connection.createStatement()) {
            statement.execute(createTableSQL);
            log.info("系统日志表创建成功,表名:{}", tableName);
            LocalCache.set(tableName, Boolean.TRUE);
        } catch (SQLException | RuntimeException e) {
            LocalCache.set(tableName, Boolean.FALSE);
            log.error("系统日志表创建失败,表名:{}", tableName, e);
        }
    }

    /**
     * Checks whether a table exists in the database the JDBC url connects to.
     * The answer is cached in {@link LocalCache}.
     *
     * @param tableName table to look for
     * @return {@code true} if the table exists; {@code false} if it does not,
     *         if {@code tableName} is {@code null}, or if the lookup failed
     */
    @Override
    public boolean existTable(String tableName) {
        if (tableName == null) {
            return false;
        }
        Object cached = LocalCache.get(tableName);
        if (cached instanceof Boolean) {
            return (Boolean) cached;
        }
        try (Connection connection = DriverManager.getConnection(url, username, password)) {
            // Restrict the lookup to the connected database; a null catalog may
            // match same-named tables in other databases on the server.
            String catalog = connection.getCatalog();
            if (catalog != null && catalog.isEmpty()) {
                catalog = null;
            }
            boolean exists = false;
            try (ResultSet resultSet = connection.getMetaData().getTables(catalog, null, tableName, null)) {
                // The name argument is a LIKE pattern ('_' matches any character),
                // so confirm an exact name match on the returned rows.
                while (resultSet.next()) {
                    if (tableName.equalsIgnoreCase(resultSet.getString("TABLE_NAME"))) {
                        exists = true;
                        break;
                    }
                }
            }
            LocalCache.set(tableName, Boolean.valueOf(exists));
            return exists;
        } catch (SQLException | RuntimeException e) {
            log.error("判断表名:{} 是否存在出错,原因:{}", tableName, e.getMessage(), e);
            return false;
        }
    }

    /**
     * Returns the textual IP of the sender without triggering a reverse DNS
     * lookup (which {@code getHostName()} may do).
     */
    private static String resolveIp(InetSocketAddress address) {
        return address.getAddress() != null
                ? address.getAddress().getHostAddress()
                : address.getHostString();
    }
}
缓存工具类 LocalCache.java
package com.haitai.utils;
import cn.hutool.cache.CacheUtil;
import cn.hutool.cache.impl.TimedCache;
import cn.hutool.core.date.DateUnit;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
/**
 * Process-local cache utility backed by a single hutool {@link TimedCache}.
 * <p>
 * Entries expire after {@link #DEFAULT_TIMEOUT} unless a specific expiry is
 * given; expired entries are pruned by a scheduled task every
 * {@link #CLEAN_TIMEOUT}.
 *
 * @Author xincheng.du
 * @Date 2023/7/4 13:52
 */
@SuppressWarnings("unused")
public class LocalCache {

    private LocalCache() {}

    /**
     * Default time to live of an entry, in milliseconds (5 minutes).
     */
    private static final long DEFAULT_TIMEOUT = 5 * DateUnit.MINUTE.getMillis();

    /**
     * Interval between prune runs, in milliseconds (5 minutes).
     */
    private static final long CLEAN_TIMEOUT = 5 * DateUnit.MINUTE.getMillis();

    /**
     * The backing cache.
     */
    private static final TimedCache<String, Object> TIMED_CACHE = CacheUtil.newTimedCache(DEFAULT_TIMEOUT);

    static {
        // Start the scheduled prune task.
        TIMED_CACHE.schedulePrune(CLEAN_TIMEOUT);
    }

    /**
     * Stores a value with the default expiry.
     *
     * @param key   cache key
     * @param value cache value
     */
    public static void set(String key, Object value) {
        TIMED_CACHE.put(key, value);
    }

    /**
     * Stores a value with an explicit expiry.
     *
     * @param key    cache key
     * @param value  cache value
     * @param expire time to live in milliseconds
     */
    public static void set(String key, Object value, long expire) {
        TIMED_CACHE.put(key, value, expire);
    }

    /**
     * Stores a value with an explicit expiry given in an arbitrary unit.
     *
     * @param key    cache key
     * @param value  cache value
     * @param expire time to live, expressed in {@code unit}
     * @param unit   unit of {@code expire}; {@code null} means milliseconds
     */
    public static void set(String key, Object value, long expire, TimeUnit unit) {
        TIMED_CACHE.put(key, value, toMillis(expire, unit));
    }

    /**
     * Converts a duration to milliseconds. {@link TimeUnit#toMillis(long)}
     * saturates instead of overflowing for very large values.
     */
    private static long toMillis(long expire, TimeUnit unit) {
        if (unit == null) {
            return expire;
        }
        long millis = unit.toMillis(expire);
        // A positive sub-millisecond duration truncates to 0, and hutool treats a
        // timeout of 0 as "never expires"; round up to the smallest real expiry.
        if (expire > 0 && millis == 0) {
            return 1L;
        }
        return millis;
    }

    /**
     * Reads a value and refreshes its last-access time.
     *
     * @param key cache key
     * @return the cached value, or {@code null} if absent or expired
     */
    public static Object getWithUpdateLastAccess(String key) {
        return TIMED_CACHE.get(key);
    }

    /**
     * Reads a value without refreshing its last-access time.
     *
     * @param key cache key
     * @return the cached value, or {@code null} if absent or expired
     */
    public static Object get(String key) {
        return TIMED_CACHE.get(key, false);
    }

    /**
     * Returns all keys currently held by the cache.
     *
     * @return set of keys
     */
    public static Set<String> keySet() {
        return TIMED_CACHE.keySet();
    }

    /**
     * Removes a single entry.
     *
     * @param key cache key
     */
    public static void remove(String key) {
        TIMED_CACHE.remove(key);
    }

    /**
     * Removes several entries.
     *
     * @param keys keys to remove; {@code null} is treated as empty
     */
    public static void removeAll(List<String> keys) {
        if (keys == null) {
            return;
        }
        for (String key : keys) {
            TIMED_CACHE.remove(key);
        }
    }

    /**
     * Returns all keys currently held by the cache (same as {@link #keySet()}).
     *
     * @return set of keys
     */
    public static Set<String> getAllKeys() {
        return keySet();
    }

    /**
     * Returns every key that contains the given keyword.
     *
     * @param keyword substring to look for; {@code null} matches nothing
     * @return matching keys, possibly empty
     */
    public static List<String> getContainsKeys(String keyword) {
        List<String> matches = new ArrayList<>();
        if (keyword == null) {
            return matches;
        }
        for (String key : getAllKeys()) {
            if (key.contains(keyword)) {
                matches.add(key);
            }
        }
        return matches;
    }

    /**
     * Removes every entry from the cache.
     */
    public static void clear() {
        TIMED_CACHE.clear();
    }
}
动态表名拦截器 DynamicDateTableNameHandler.java:
package com.haitai.config.mybatis;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.date.DateUtil;
import com.baomidou.mybatisplus.extension.plugins.handler.TableNameHandler;
import com.haitai.constant.Constants;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.StringUtils;
import java.util.Date;
import java.util.Map;
import java.util.Objects;
/**
 * Table name handler that appends a {@code _yyyyMMdd} date suffix to the
 * device log base table name. All other table names pass through unchanged.
 *
 * @Author xincheng.du
 * @Date 2023/10/25 10:25
 */
@NoArgsConstructor
public class DynamicDateTableNameHandler implements TableNameHandler {

    /**
     * Resolves the physical table name for a statement.
     *
     * @param sql       the SQL being processed (not inspected here)
     * @param tableName table name found in the SQL
     * @return the dated device log table name when {@code tableName} is the
     *         device log base name, otherwise {@code tableName} itself
     */
    @Override
    public String dynamicTableName(String sql, String tableName) {
        boolean isDeviceLogTable = StringUtils.isNotBlank(tableName)
                && Constants.DEVICE_LOG_BASE_TABLE_NAME.equals(tableName);
        if (!isDeviceLogTable) {
            return tableName;
        }
        Map<String, Object> requestData = RequestDataHelper.getRequestData();
        Object requested = CollUtil.isNotEmpty(requestData) ? requestData.get("date") : null;
        // Fall back to the current day when no date was supplied for this thread.
        Date effectiveDate = Objects.nonNull(requested) ? (Date) requested : new Date();
        return Constants.DEVICE_LOG_BASE_TABLE_NAME + "_" + DateUtil.format(effectiveDate, "yyyyMMdd");
    }
}
将动态表名拦截器添加到MP的拦截器链中 MybatisPlusConfig.java :
package com.haitai.config.mybatis;
import com.baomidou.mybatisplus.annotation.DbType;
import com.baomidou.mybatisplus.extension.plugins.MybatisPlusInterceptor;
import com.baomidou.mybatisplus.extension.plugins.inner.DynamicTableNameInnerInterceptor;
import com.baomidou.mybatisplus.extension.plugins.inner.PaginationInnerInterceptor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * MyBatis-Plus plugin configuration: registers the dynamic table name and
 * pagination inner interceptors.
 *
 * @author dideng.zhang
 * @version 1.0
 * @date 2023/5/19 14:29
 */
@Configuration
public class MybatisPlusConfig {

    /**
     * Builds the MyBatis-Plus interceptor chain.
     * <p>
     * Inner interceptors run in registration order. The dynamic table name
     * interceptor is registered before pagination so that the table name has
     * already been rewritten when pagination derives its COUNT query from the
     * statement; MyBatis-Plus recommends this order.
     *
     * @return the configured interceptor
     */
    @Bean
    public MybatisPlusInterceptor mybatisPlusInterceptor() {
        MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();

        // 1. Dynamic table name (rewrites the SQL).
        DynamicTableNameInnerInterceptor dynamicTableNameInnerInterceptor = new DynamicTableNameInnerInterceptor();
        dynamicTableNameInnerInterceptor.setTableNameHandler(new DynamicDateTableNameHandler());
        interceptor.addInnerInterceptor(dynamicTableNameInnerInterceptor);

        // 2. Pagination (reads the already rewritten SQL).
        PaginationInnerInterceptor paginationInnerInterceptor = new PaginationInnerInterceptor();
        paginationInnerInterceptor.setDbType(DbType.MYSQL);
        interceptor.addInnerInterceptor(paginationInnerInterceptor);

        return interceptor;
    }
}
请求参数传递辅助类 RequestDataHelper.java:
package com.haitai.config.mybatis;
import cn.hutool.core.collection.CollUtil;
import java.util.Map;
/**
 * Passes per-request parameters to code that runs later on the same thread,
 * using a {@link ThreadLocal}.
 * <p>
 * Callers should remove the data once they are done (via {@link #clear()} or
 * {@code setRequestData(null)}), otherwise the values stay attached to the
 * thread and are seen by whatever runs on it next.
 */
public class RequestDataHelper {

    private RequestDataHelper() {}

    /**
     * Per-thread request parameters.
     */
    private static final ThreadLocal<Map<String, Object>> REQUEST_DATA = new ThreadLocal<>();

    /**
     * Sets the request parameters for the current thread.
     *
     * @param requestData parameter map; {@code null} removes the current
     *                    thread's data
     */
    public static void setRequestData(Map<String, Object> requestData) {
        if (requestData == null) {
            REQUEST_DATA.remove();
            return;
        }
        REQUEST_DATA.set(requestData);
    }

    /**
     * Returns a single request parameter of the current thread.
     *
     * @param param parameter name
     * @param <T>   type the caller expects the value to have
     * @return the parameter value, or {@code null} when no data is set, the
     *         map is empty, or the parameter is absent
     */
    @SuppressWarnings("unchecked") // the caller chooses T; a mismatch fails at the call site
    public static <T> T getRequestData(String param) {
        Map<String, Object> dataMap = getRequestData();
        if (dataMap == null || dataMap.isEmpty()) {
            return null;
        }
        return (T) dataMap.get(param);
    }

    /**
     * Returns all request parameters of the current thread.
     *
     * @return the parameter map, or {@code null} when none is set
     */
    public static Map<String, Object> getRequestData() {
        return REQUEST_DATA.get();
    }

    /**
     * Removes the current thread's request parameters.
     */
    public static void clear() {
        REQUEST_DATA.remove();
    }
}
3、Syslog 发送端(发送日志的服务器,即被管控的设备)
3.1 rsyslog.conf配置修改
(1)打开服务器的 /etc/rsyslog.conf 配置文件:
vi /etc/rsyslog.conf
(2)在文件末尾添加:
*.* @192.168.110.28:3780
此处的 ip 和 port 和你上述代码中的对应即可,同时还要保证这两台设备能相互访问。这里的一个“@”代表使用UDP通信,两个“@”代表使用TCP通信。
(3)保存并关闭文件,先按 esc,然后输入:
:wq
(4)在接收端(运行 Java 程序的服务器)的防火墙上开放 3780 端口(UDP):
firewall-cmd --permanent --add-port=3780/udp
(5)重新加载防火墙规则,使上一步的配置生效:
firewall-cmd --reload
(6)在发送端重启 rsyslog 服务:
systemctl restart rsyslog
4、测试
启动 Java 项目,过一会儿,可以看到我们的表已经创建成功了,同时已经有数据入库。