背景
之前已简单使用ES及Kibana和在线转Base64工具实现了检索文档的demo,并已实现WebHook的搭建和触发流程接口。
实现读取本地文件入库ES
总体思路:基于前面已经搭建的WebHook触发流程,接收到push更新消息之后,使用本地的git工具拉取最新变动。这些文件与我们的ES应用在同一台机器上,然后Java可以读取这些文件转码并交给ES处理。
我们先处理核心部分,也就是使用Java读取各种文档,如PDF、Word、txt等格式的文件解析并在ES中创建索引。
文件属性类
根据自己的需要,文件属性应至少包括文件名、文件类型、作者等字段,由于目标是可以通过浏览器页面直接打开文件,则需要包含文件的网络url(注意不是本地url地址)。
import lombok.Data;
@Data
public class FileSource {
private String title;
private String summary;
private String fileType;
private String fileUrl;
private String content;
private String author;
private String fileVersion;
private String createDate;
}
使用Data注解可以自动生成Get、Set方法,不用自己复制粘贴了。
写入流程的实现
- 使用tika库自动获取文件类型
public static String getFileTypeByDefaultTika(String filePathUrl) throws IOException, URISyntaxException {
// 从 URL 创建一个 File 对象
File file = new File(new URL("file:///" + filePathUrl).toURI());
// 使用 Tika 来检测文件的 MIME 类型
Tika tika = new Tika();
MediaType mediaType = MediaType.parse(tika.detect(file));
// 从 MIME 类型中提取文件的基本类型(如 pdf、image、video 等)
String fileType = mediaType.getSubtype();
return fileType;
}
- 根据文件类型判断排除音视频类文件
String fileType = getFileTypeByDefaultTika(pathUrl);
if (!fileType.contains("video")
&& !fileType.contains("image")
&& !"application/zip".equals(fileType)) {
……
}
- 解析文件内容为Base64
public static String FileToBase64(String filePath) throws IOException {
byte[] fileContent = Files.readAllBytes(Paths.get(filePath));
return Base64.getEncoder().encodeToString(fileContent);
}
- 调用ES客户端进行写入,包括管道预处理文档
source.setFileType(fileType);
String base64 = FileToBase64(pathUrl);
source.setContent(base64);
String body = JSON.toJSONString(source);
IndexRequest indexRequest = new IndexRequest().index("docwrite")
.source(body, XContentType.JSON)
.setPipeline("attachment") //上传时使用attachment pipline进行提取文件
.timeout(TimeValue.timeValueMinutes(10));
client.index(indexRequest, RequestOptions.DEFAULT);
这段代码是关于Elasticsearch的操作,具体是将一个文件转换为Base64格式,然后将其内容索引到Elasticsearch的指定索引中。
以下是对这段代码的详细解释:
source.setFileType(fileType);
- 这行代码为
source
对象设置一个文件的MIME类型或扩展名。
- 这行代码为
String base64 = FileToBase64(pathUrl);
- 调用
FileToBase64
函数,它接受一个文件路径,然后返回该文件的Base64编码内容。 pathUrl
是一个文件的本地路径或URL。- 结果的Base64编码字符串存储在
base64
变量中。
- 调用
source.setContent(base64);
- 将上述得到的Base64编码字符串设置为
source
对象的内容。
- 将上述得到的Base64编码字符串设置为
String body = JSON.toJSONString(source);
- 使用Fastjson将
source
对象转换为JSON格式的字符串。 - 这个JSON字符串存储在
body
变量中。
- 使用Fastjson将
IndexRequest indexRequest = new IndexRequest().index("docwrite")
- 创建一个新的
IndexRequest
对象,这是Elasticsearch Java客户端用于索引文档的请求对象。 - 指定索引的名称为"docwrite"。
- 创建一个新的
.source(body, XContentType.JSON)
- 设置请求体的内容为上面创建的
body
JSON字符串。 XContentType.JSON
表示请求体的内容类型是JSON。
- 设置请求体的内容为上面创建的
setPipeline("attachment")
- 为此索引请求设置一个pipeline,名为"attachment"。在Elasticsearch中,pipeline通常用于在索引文档之前对其进行某种处理或转换。在这里,它可能是为了处理或提取附件的内容。
.timeout(TimeValue.timeValueMinutes(10));
- 为此索引请求设置一个10分钟的超时时间。如果在这10分钟内请求未完成,它可能会超时。
client.index(indexRequest, RequestOptions.DEFAULT);
- 使用Elasticsearch客户端的
index
方法发送上面创建的indexRequest
。
调试过程出现SpringBoot启动报错实例化es客户端相关的错误:
Error creating bean with name 'elasticsearchRestHighLevelClient' defined in class
。
解决办法是添加如下的maven依赖吗,并将es客户端版本提高到7.15:
<!-- Spring Boot Elasticsearch Starter -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
测试索引流程运行
curl -XPOST -H "Content-Type: application/json" -d '{"key1":"value1", "key2":"value2"}' http://localhost:8080/gitbucket/webhook
服务端打印收到的消息,没有报错,证明流程正常:
返回:我收到推送消息啦!
在Kibana查询ES中是否存在包含“License”的文件内容:
GET /docwrite/_search
{
"query": {
"match": {
"attachment.content": {
"query": "License",
"analyzer": "ik_smart"
}
}
}
}
结果可以正确返回:
至此,后端ES索引流程基本完成了。
后续思考
后续需要实现的是从webhook消息中识别有效信息,使用git工作流获取更新,对新增文件进行上述索引流程。需要优化的是索引文件的属性尚不完整,文件的版本如何区分,以免重复录入文件,文件删除时是否从ES索引中删除等等这些流程。