背景
GrayLog作为ELK的替代产品,是新生代的日志采集框架。在一个采集节点日志的需求中,因为节点很多,产生的日志也很多,因此尝试了使用GrayLog进行日志的采集。下面记录一下使用GrayLog中遇到的坑和解决方案。
一、部署与启动
采用Docker方式部署。需要部署三个容器:
graylog: 采集日志的服务,应用方对接graylog服务,进行日志的采集。
elasticsearch: graylog采集的日志最终存储到elasticsearch中。
mongodb: 存储graylog服务的元数据。
项目中使用的graylog版本是5.2.2,es版本是7.7.0,mongodb是最新版本。
Docker部署官方文档地址
启动mongo容器命令:
sudo docker run -d \
--name mongo \
--network demo\
-v /data/llmservice/mongodb:/data/db \
-p 27017:27017 \
mongo
启动es容器命令:
docker run --name elasticsearch --restart=always --privileged=true -d -e ES_JAVA_OPTS="-Xms512m -Xmx512m" -e "discovery.type=single-node" -p 9200:9200 -p 9300:9300 elasticsearch:7.7.0
启动graylog服务命令:
首先,生成加密密码,以密码为admin为例:
echo -n admin | shasum -a 256
然后,利用生成的密码启动容器:
sudo docker run -d --privileged=true \
--link mongo --link elasticsearch \
--name graylog \
--network demo\
-v /data/llmservice/graylog/log/:/usr/share/graylog/data/log \
-v /data/llmservice/graylog/journal/:/usr/share/graylog/data/journal \
-v /data/llmservice/graylog/contentpacks/:/usr/share/graylog/data/contentpacks \
-v /data/llmservice/graylog/plugin/:/usr/share/graylog/data/plugin \
-p 9009:9000 \
-p 12201:12201/udp \
-p 12201:12201 \
-p 1514:1514 \
-p 1514:1514/udp \
-e GRAYLOG_PASSWORD_SECRET=demo@qq.com.cn \
-e GRAYLOG_ROOT_PASSWORD_SHA2=7cff97eaf95509b6fe517568dbcd58e24190e8a34b8910907b54df0b957859bc \
-e GRAYLOG_HTTP_EXTERNAL_URI="http://192.168.0.101:9009/" \
-e GRAYLOG_ELASTICSEARCH_HOSTS="http://192.168.0.101:9200" \
-e GRAYLOG_ROOT_TIMEZONE="Asia/Shanghai" \
graylog/graylog:5.2.2
其中,journal文件夹挂载映射时,启动容器容易出现没有write权限的异常。解决方式是将宿主机上先创建journal文件夹,并且给这个文件夹chmod授777或775的权限即可。
graylog的9000端口是web界面的访问端口,也是API调用的接口。
12201和1514是Input的监听端口,默认是监听tcp协议,/udp表示同时监听udp协议。
GRAYLOG_ROOT_PASSWORD_SHA2使用第一步生成的加密密码,在登录web界面或调用API时,需要用到密码(解密的密码),用户名默认是admin。
如果想改用户名,需要在graylog容器内容的/usr/share/graylog/data/config/graylog.conf中修改root_username用户。此用户名和密码用于web界面的登录和API接口的调用auth认证。
GRAYLOG_HTTP_EXTERNAL_URI配置web界面访问的url地址。一般就是服务器ip和容器9000映射的端口。
GRAYLOG_ELASTICSEARCH_HOSTS配置es的地址。
GRAYLOG_ROOT_TIMEZONE配置时区,不配置时区,graylog默认使用美国时区,与国内时间对不上。
重点:
上面的-e后面的系统配置,可以在graylog.conf配置文件中查看,里面的配置项,都可以在启动命令里加-e,然后加GRAYLOG_前缀,然后所有字母都大写,就可以在启动命令里设置了。
GrayLog参数配置官方地址
二、创建Inputs
访问web管理界面,登录,选择System–>Inputs,创建Inputs监听。
填写Title,其他默认即可。这里选GELF UDP协议,选择原因来自官网:
What is the best way to integrate my applications to Graylog?
We recommend that you use GELF. It’s easy for your application developers and eliminates the need to store the messages locally. Also, GELF can just send what app person wants so you don’t have to build extractors or do any extra processing in Graylog.将我的应用程序集成到Graylog的最佳方式是什么?
我们建议您使用GELF。这对应用程序开发人员来说很容易,并且无需在本地存储消息。此外,GELF可以发送应用程序用户想要的内容,这样您就不必在Graylog中构建提取器或进行任何额外的处理。
出处地址
三、SpringBoot项目集成GrayLog,采集特定日志
网上讲SpringBoot集成GrayLog的博客,大都是demo级别的集成,只是简单的将log4j记录的日志推送到了GrayLog中。但是在实际项目中,不可能把所有的log.info的信息都采集到GrayLog中,而是将特定的日志采集到GrayLog中。需要做以下操作:
3.1 依赖gelf包
<dependency>
<groupId>de.siegmar</groupId>
<artifactId>logback-gelf</artifactId>
<version>4.0.0</version>
</dependency>
3.2 logback-spring.xml里配置gelf Appender
<configuration>
<conversionRule conversionWord="clr" converterClass="org.springframework.boot.logging.logback.ColorConverter" />
<conversionRule conversionWord="wex" converterClass="org.springframework.boot.logging.logback.WhitespaceThrowableProxyConverter" />
<conversionRule conversionWord="wEx" converterClass="org.springframework.boot.logging.logback.ExtendedWhitespaceThrowableProxyConverter" />
<property name="CONSOLE_LOG_PATTERN" value="${CONSOLE_LOG_PATTERN:-%clr(%d{yyyy-MM-dd HH:mm:ss.SSS}){faint} %clr(${LOG_LEVEL_PATTERN:-%5p}) %clr(${PID:- }){magenta} %clr(---){faint} %clr([%15.15t]){faint} %clr(%-40.40logger{39}){cyan} %clr(:){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}}"/>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>${CONSOLE_LOG_PATTERN}</pattern>
<charset>UTF-8</charset>
</encoder>
</appender>
<appender name="GELF" class="de.siegmar.logbackgelf.GelfUdpAppender">
<filter class = "com.demo.engine.logger.EngineFilter"/>
<graylogHost>graylog服务ip</graylogHost>
<graylogPort>12201</graylogPort>
<maxChunkSize>65467</maxChunkSize>
<encoder class="com.demo.engine.logger.EngineEncoder">
<!--
配置应用名称(服务名称),通过staticField标签可以自定义一些固定的日志字段
-->
<!--<staticField>app_name:${app_name}</staticField>
<staticField>desc:${desc}</staticField>-->
</encoder>
</appender>
<!-- 控制台输出日志级别 -->
<root level="info">
<appender-ref ref="GELF" />
<appender-ref ref="STDOUT" />
</root>
</configuration>
配置到此,项目中的log.info等日志就可以采集到GrayLog服务中了。只需要在logback-spring.xml里配置对GrayLog服务的ip和端口,就可以无缝使用日志采集框架采集日志了。
3.3 过滤日志,采集特定日志
定义日志框架的过滤器,例如只采集loggername为demo记录的日志。
先定义日志过滤器:
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.filter.Filter;
import ch.qos.logback.core.spi.FilterReply;
public class EngineFilter extends Filter<ILoggingEvent> {
@Override
public FilterReply decide(ILoggingEvent iLoggingEvent) {
String loggerName = iLoggingEvent.getLoggerName();
if(loggerName.equals("demo")){//loggerName是demo则采集,否则不采集
return FilterReply.ACCEPT;
}else {
return FilterReply.DENY;
}
}
}
在logback-spring.xml里配置过滤器:
最后,在需要GrayLog采集日志的地方,定义名为demo的日志采集器:
private static final Logger log = LoggerFactory.getLogger("demo");
使用此对象log.info的日志,会采集到GrayLog中。而定义的其他采集器的log.info则不会采集到GrayLog中。
四、自定义日志字段存入GrayLog
通过log.info()方法存入GrayLog中的日志,只能是一个字符串,因为log.info()的参数是String类型。在实际项目中,往往要对日志通过某个字段进行检索或统计,只存一个字符串无法满足这样的要求。
在logback-spring.xml中,可以配置来定义一些静态的字段和固定的值,但是无法满足动态往GrayLog中添加字段和动态设置值的要求。如何满足这个需求成了能否使用GrayLog的关键。
项目中通过引入gelf包来采集日志到GrayLog中,翻看jar包的源码,找到了jar包发送日志到GrayLog的核心代码GelfEncoder的encode()方法,如下:
@Override
public byte[] encode(final ILoggingEvent event) {
final Map<String, Object> additionalFields = new HashMap<>(staticFields);
addFieldMapperData(event, additionalFields, builtInFieldMappers);
addFieldMapperData(event, additionalFields, fieldMappers);
final GelfMessage gelfMessage = new GelfMessage(
originHost,
shortPatternLayout.doLayout(event),
fullPatternLayout.doLayout(event),
event.getTimeStamp(),
LevelToSyslogSeverity.convert(event),
additionalFields
);
final byte[] json = gelfMessageToJson(gelfMessage);
if (appendNewline) {
final byte[] sep = System.lineSeparator().getBytes(StandardCharsets.UTF_8);
final ByteBuffer bb = ByteBuffer.allocate(json.length + sep.length);
bb.put(json);
for (final byte b : sep) {
bb.put(b);
}
bb.flip();
return bb.array();
}
return json;
}
从源码可以看出,源码中有添加字段的豁口:
addFieldMapperData(event, additionalFields, builtInFieldMappers);
addFieldMapperData(event, additionalFields, fieldMappers);
查看addFieldMapperData方法源码:
private void addFieldMapperData(final ILoggingEvent event, final Map<String, Object> additionalFields,
final List<GelfFieldMapper<?>> mappers) {
for (final GelfFieldMapper<?> fieldMapper : mappers) {
try {
fieldMapper.mapField(event, (key, value) -> {
try {
addField(additionalFields, key, value);
} catch (final IllegalArgumentException e) {
addWarn("Could not add field " + key, e);
}
});
} catch (final Exception e) {
addError("Exception in field mapper", e);
}
}
}
可以看到其遍历了mappers参数,然后调用了addField()方法,来动态添加字段。
所以现在的问题转换成如何往mappers里添加字段。
mappers是一个GelfFieldMapper对象的集合。查看gelf包的官方文档,GelfFieldMapper类作者只简单写了几句,并没有提及如何初始化这个类的集合。所以只能自己初始化这个类的集合。
自己初始化的方式就是实现GelfEncoder类,重写encode方法,在encode方法中自定义GelfEncoder类,加入到集合中。代码如下:
public class EngineEncoder extends GelfEncoder {
@Override
public byte[] encode(ILoggingEvent event) {
super.addFieldMapper(new EngineFiledMapper());
return super.encode(event);
}
}
可以看到,重写的encode方法,调用了父类的addFieldMapper方法,往fieldMappers属性中手动添加了一个GelfEncoder对象,剩下的逻辑还是父类的encode方法逻辑。
GelfEncoder是一个接口,需要写其实现类,在实现类中,定义我们要动态添加的字段和值,如下:
public class EngineFiledMapper implements GelfFieldMapper {
@Override
public void mapField(ILoggingEvent event, BiConsumer valueHandler) {
if(event.getLoggerName().equals("demo")){
String message = event.getMessage();
JSONObject jsonObject = JSONObject.parseObject(message);
Set<String> strings = jsonObject.keySet();
for(String key:strings){
valueHandler.accept(key,jsonObject.get(key));
}
}
}
}
上述代码的含义是,在log.info()中,记录一个json格式的字符串,在GelfFieldMapper 中,解析json对象的key和value,然后加入到valueHandler中。
最后,在logback-spring.xml中配置我们定义的encoder类:
所以,在采集特定日志时,log.info中记录一个特定的json对象,json对象里包含我们的自定义字段,然后在GelfFieldMapper 的实现类中,解析这个json,动态添加字段。
最后,在GrayLog中,定义的json字符串,被放入了message字段,json里定义的字段,成了单独的字段存在GrayLog中,如下图:
五、GrayLog大字段日志无法存储问题解决
在项目中发现,存储日志的某个字段值过大时,无法存入GrayLog中。国内帖子并没有此问题的解决方案,通过参考stackoverflow的相关帖子,找到了解决思路,下面记录解决过程。
大字段没有存进去的原因是graylog根据字段往es建立索引时,字段的类型是es自动设定的,graylog中并没有给用户提供设置字段在es中索引类型的接口。这就造成了es里把某些字段设置成了keyword类型,当这个字段存入过大的数据时,超出了keyword类型的限制,就无法存入es里了,GrayLog采集也就失败了。
解决方案是我们手动初始化es中相关字段的索引类型,初始化好之后,再让graylog往es里采集日志。操作步骤是:
查看es中graylog的相关索引:
curl -X GET "localhost:9200/_cat/indices"
graylog相关的索引为:
此时es里的索引字段已经确定类型了,es无法直接修改索引字段的类型,所以,这里先停止graylog服务,把这些索引从es里删除:
curl -X DELETE 'http://localhost:9200/graylog_0'
curl -X DELETE 'http://localhost:9200/gl-events_0'
curl -X DELETE 'http://localhost:9200/gl-system-events_0'
然后,再启动graylog服务,又会自动创建上面三个索引。(**注意:**此时不要让graylog采集日志,否则es又会自动生成索引字段的类型)
然后手动指定es索引自定义字段的类型,graylog会默认生成一些索引的字段,如下:
curl -X GET "localhost:9200/graylog_0/_mapping"
输出:
{
"graylog_0": {
"mappings": {
"dynamic_templates": [{
"internal_fields": {
"match": "gl2_*",
"match_mapping_type": "string",
"mapping": {
"type": "keyword"
}
}
},
{
"store_generic": {
"match_mapping_type": "string",
"mapping": {
"type": "keyword"
}
}
}],
"properties": {
"full_message": {
"type": "text",
"analyzer": "standard"
},
"gl2_accounted_message_size": {
"type": "long"
},
"gl2_message_id": {
"type": "keyword"
},
"gl2_processing_timestamp": {
"type": "date",
"format": "uuuu-MM-dd HH:mm:ss.SSS"
},
"gl2_receive_timestamp": {
"type": "date",
"format": "uuuu-MM-dd HH:mm:ss.SSS"
},
"message": {
"type": "text",
"analyzer": "standard"
},
"source": {
"type": "text",
"analyzer": "analyzer_keyword",
"fielddata": true
},
"streams": {
"type": "keyword"
},
"timestamp": {
"type": "date",
"format": "uuuu-MM-dd HH:mm:ss.SSS"
}
}
}
}
}
这些字段我们无需关注,我们只手动定义自定义字段的类型,命令如下:
curl -X PUT -H "Content-Type: application/json" -d '{
"properties": {
"input": {
"type": "text",
"analyzer": "standard"
},
"startTime": {
"type": "keyword"
}
}
}' "localhost:9200/graylog_0/_mapping"
我们自定义的input字段,可能存大数据,所以手动指定成text类型。如果让es自动生成,会生成keyword类型。
手动设置好之后,就可以让GrayLog正常采集日志了。
注意:
- 在采集特大日志时,比如图片的base64转码,GrayLog还是无法采集,但是手动往es对应的索引里添加数据是能添加的,这个猜测是GrayLog服务内部处理特大日志有问题导致的,只能通过升级GrayLog版本让其自身解决。
- 在手动设置es的字段类型时,不能设置Object或嵌套类型,即使我们的自定义字段是map类型或者复杂类型,es里也无法设置Object类型。原因是graylog往es里存数据时,会把值改为String类型。所以即使我们程序中定义的一个字段为map类型,graylog也会将其转成String类型,这也是graylog设计缺陷的一方面。
六、GrayLog的查询和统计接口
项目中用到了日志的查询和统计接口,这里记录一下使用方式。
官方参考地址
定义工具类如下:
public class QueryLogUtils {
private static final String SEARCH_MESSAGE = "/api/search/messages";
private static final String SEARCH_AGGREGATE = "/api/search/aggregate";
//查询日志
public static GrayLogQueryResponse queryLog(String url,String username,String password,GrayLogQueryParam grayLogQueryParam){
RestTemplate restTemplate = new RestTemplate();
HttpHeaders headers = new HttpHeaders();
headers.add("X-Requested-By","cli");
headers.add("Accept","application/json");
headers.add("Content-Type","application/json");
headers.setBasicAuth(username, password);
HttpEntity<GrayLogQueryParam> requestEntity = new HttpEntity<>(grayLogQueryParam,headers);
restTemplate.getInterceptors().add(new BasicAuthenticationInterceptor(username, password));
ResponseEntity<Map> response = restTemplate.exchange(url+SEARCH_MESSAGE, HttpMethod.POST, requestEntity, Map.class);
Map responseBody = response.getBody();
GrayLogQueryResponse grayLogQueryResponse = JSONObject.parseObject(JSON.toJSONString(responseBody), GrayLogQueryResponse.class);
return grayLogQueryResponse;
}
//统计日志
public static GrayLogQueryResponse queryAggregate(String url,String username,String password,GrayLogAggregateParam grayLogAggregateParam){
RestTemplate restTemplate = new RestTemplate();
HttpHeaders headers = new HttpHeaders();
headers.add("X-Requested-By","cli");
headers.add("Accept","application/json");
headers.add("Content-Type","application/json");
headers.setBasicAuth(username, password);
HttpEntity<GrayLogAggregateParam> requestEntity = new HttpEntity<>(grayLogAggregateParam,headers);
restTemplate.getInterceptors().add(new BasicAuthenticationInterceptor(username, password));
ResponseEntity<Map> response = restTemplate.exchange(url+SEARCH_AGGREGATE, HttpMethod.POST, requestEntity, Map.class);
Map responseBody = response.getBody();
GrayLogQueryResponse grayLogQueryResponse = JSONObject.parseObject(JSON.toJSONString(responseBody), GrayLogQueryResponse.class);
return grayLogQueryResponse;
}
}
封装查询参数实体类:
/**
*
* 参数样例:
* {
* "query": "uid:f2f2d454ce99d3c0a575d78088e7753f",
* "fields": [
* "input",
* "startTime"
* ],
* "from": 2,
* "size": 15,
* "timerange": {
* "type": "keyword",
* "keyword": "last 1000 minutes"
* },
* "sort": "startTime",
* "sort_order": "asc"
* }
* @date 2024/1/39:44
*/
@Data
public class GrayLogQueryParam {
private String query; //查询条件
private List<String> fields; //查询字段
private Integer from;//从第几条开始
private Integer size;//到第几条结束
private Map<String,String> timerange; //时间范围条件
private String sort; //排序字段
private String sort_order; //排序类型
public static class Builder {
private String query;
private List<String> fields=new ArrayList<>();
private Integer from;
private Integer size;
private Map<String, String> timerange;
private String sort;
private String sort_order;
public Builder query(String query) {
this.query = query;
return this;
}
public Builder addField(String field) {
this.fields.add(field);
return this;
}
public Builder from(Integer from) {
this.from = from;
return this;
}
public Builder size(Integer size) {
this.size = size;
return this;
}
public Builder timerange(String key,String value) {
if(this.timerange==null){
this.timerange = new HashMap<>();
}
this.timerange.put(key,value);
return this;
}
public Builder sort(String sort) {
this.sort = sort;
return this;
}
public Builder sort_order(String sort_order) {
this.sort_order = sort_order;
return this;
}
public GrayLogQueryParam build() {
return new GrayLogQueryParam(this);
}
}
public static Builder builder() {
return new Builder();
}
private GrayLogQueryParam(Builder builder) {
this.query = builder.query;
this.fields = builder.fields;
this.from = builder.from;
this.size = builder.size;
this.timerange = builder.timerange;
this.sort = builder.sort;
this.sort_order = builder.sort_order;
}
}
封装统计参数实体类(按某几个字段进行count统计):
/**
* 官方文档参考地址: https://go2docs.graylog.org/5-2/making_sense_of_your_log_data/simple_search_scripting_api.htm?tocpath=Searching%20Your%20Log%20Data%7C_____6
* 参数样例:
* {
* "query": "sceneId:1",
* "group_by": [
* {
* "field": "id"
* },
* {
* "field": "xxx"
* },
* {
* "field": "xxx"
* }
* ],
* "metrics": [
* {
* "function": "count",
* "field": "id"
* }
* ]
* }
*/
@Data
public class GrayLogAggregateParam {
private String query; //查询条件
private List<Map> group_by; //分组统计字段
private Map<String,String> timerange; //时间范围条件
private List<Map> metrics; //分组函数,average, count, latest, max, min, percentile, stdDev, sum, sumOfSquares, variance.
public static class Builder {
private String query;
private List<Map> group_by=new ArrayList<>();
private Map<String, String> timerange;
private List<Map> metrics = new ArrayList<>();
public Builder query(String query) {
this.query = query;
return this;
}
public Builder addGroupBy(String field) {
Map<String,String> map = new HashMap<>();
map.put("field",field);
//定义统计条数
map.put("limit","10000");
this.group_by.add(map);
return this;
}
public Builder timerange(String key,String value) {
if(this.timerange==null){
this.timerange = new HashMap<>();
}
this.timerange.put(key,value);
return this;
}
/**
*
* @param function 函数,avg,count,sum等
* @param field 函数统计的字段
* @return
*/
public Builder addMetrics(String function,String field) {
Map<String,String> map = new HashMap<>();
map.put("function",function);
map.put("field",field);
this.metrics.add(map);
return this;
}
public GrayLogAggregateParam build() {
return new GrayLogAggregateParam(this);
}
}
public static Builder builder() {
return new Builder();
}
private GrayLogAggregateParam(Builder builder) {
this.query = builder.query;
this.group_by = builder.group_by;
this.timerange = builder.timerange;
this.metrics = builder.metrics;
}
}
封装GrayLog API返回值实体类:
/**
* 日志查询接口响应格式如下;
* {
* "schema": [
* {
* "column_type": "field",
* "type": "string",
* "field": "uid",
* "name": "field: uid"
* },
* {
* "column_type": "field",
* "type": "string",
* "field": "input",
* "name": "field: input"
* },
* {
* "column_type": "field",
* "type": "string",
* "field": "startTime",
* "name": "field: startTime"
* }
* ],
* "datarows": [
* [
* "f2f2d454ce99d3c0a575d78088e7753f",
* "{\"nextNode\":\"2\"}",
* "2024-01-02 18:15:06"
* ]
* ],
* "metadata": {
* "effective_timerange": {
* "from": "2023-12-30T15:57:02.330Z",
* "to": "2024-01-03T03:17:02.331Z",
* "type": "absolute"
* }
* }
* }
*
*
* 日志统计接口响应如下:
* {
* "schema": [
* {
* "column_type": "grouping",
* "type": "string",
* "field": "id",
* "name": "grouping: id"
* },
* {
* "column_type": "grouping",
* "type": "string",
* "field": "xxx",
* "name": "grouping: xxx"
* },
* {
* "column_type": "grouping",
* "type": "string",
* "field": "xxx",
* "name": "grouping: xxx"
* },
* {
* "column_type": "metric",
* "type": "numeric",
* "function": "count",
* "field": "id",
* "name": "metric: count(id)"
* }
* ],
* "datarows": [
* [
* "1",
* "B_1",
* "2",
* 11
* ],
* [
* "2",
* "B_B_1",
* "33",
* 5
* ],
* [
* "2",
* "B_B_13",
* "17",
* 3
* ],
* [
* "2",
* "B_B_15",
* "8",
* 3
* ],
* [
* "17",
* "xxx",
* "19",
* 3
* ],
* [
* "8",
* "xxx",
* "-1",
* 3
* ],
* [
* "19",
* "xxx",
* "21",
* 2
* ],
* [
* "21",
* "xxx",
* "22",
* 2
* ],
* [
* "22",
* "xxx",
* "24",
* 2
* ],
* [
* "24",
* "xxx",
* "26",
* 2
* ]
* ],
* "metadata": {
* "effective_timerange": {
* "from": "2024-01-02T10:57:35.975Z",
* "to": "2024-01-03T10:57:35.975Z",
* "type": "absolute"
* }
* }
* }
*
* @date 2024/1/311:10
*/
@Data
public class GrayLogQueryResponse {
private List<Schema> schema;
private List<List<Object>> datarows;
//解析schema和datarows,返回map对象的日志节点数据
public List logList(){
List list = new ArrayList<>();
List<Schema> schemaList = this.schema;
List<List<Object>> datarows = this.datarows;
for(List<Object> row:datarows){
Map<String,Object> map = new HashMap<>();
for(int i=0;i<row.size();i++){
Schema schema = schemaList.get(i);
String field = schema.getField();
map.put(field,row.get(i));
}
String input = map.get("input").toString();
if(!input.equals("-")){//-代表没有值
inputMap = JSON.parseObject(input, Map.class);
}
list.add(inputMap)
return list;
}
//获取日志统计对象列表
public List<ComponentNodeAggregate> aggregateList(){
List<ComponentNodeAggregate> list = new ArrayList<>();
List<Schema> schemaList = this.schema;
List<List<Object>> datarows = this.datarows;
for(List<Object> row:datarows){
Map<String,Object> map = new HashMap<>();
for(int i=0;i<row.size();i++){
Schema schema = schemaList.get(i);
String field = schema.getField();
String column_type = schema.getColumn_type();
if("metric".equals(column_type)){//统计字段,填充count属性
map.put("count",row.get(i));
}else{
map.put(field,row.get(i));
}
}
ComponentNodeAggregate componentNodeAggregate = JSON.parseObject(JSON.toJSONString(map), ComponentNodeAggregate.class);
list.add(componentNodeAggregate);
}
return list;
}
@Data
public class Schema{
private String column_type;
private String type;
private String field;
private String name;
}
}
封装统计返回值对象:
@Data
public class ComponentNodeAggregate {
private String id;
private String xxx;
private String xxx;
private Integer count;
}
注意: 在查询graylog时,遵循es的查询语法,具体可参考:官网地址
最后
logback-gelf的官网地址