文章目录
- 全文检索
- 任务描述
- 技术难点
- 任务目标
- 实现过程
- 1. java读取Json文件,并导入MySQL数据库中
- 2. 利用Logstah完成MySQL到ES的数据同步
- 3. 开始编写功能接口
- 3.1 全文检索接口
- 3.2 查询详情
- 4. 前端调用
全文检索
任务描述
- 在获取到数据之后如何在ES中进行数据建模,以方便之后搜索接口的实现
- 接下来,要考虑的问题是,如何实现MySQL和ES的数据同步
- 接下来是技术实现,要如何实现基于关键词进行全文检索和对于某一条数据的查询详情
- 在接口实现之后,前端调用后端暴露的接口来进行数据获取,并在页面进行展示
技术难点
- 数据同步
- ES的检索的实现
- 精确定位MySQL表中的数据
任务目标
- 根据关键词进行全文检索
- 查询详情
实现过程
1. java读取Json文件,并导入MySQL数据库中
public List<Workticket> getWorkticket(){
ObjectMapper objectMapper = new ObjectMapper();
List<Workticket> jsonObjects = null;
try {
jsonObjects = objectMapper.readValue(new File("D:\\data_hanchuan\\workticket.json"), List.class);
} catch (IOException e) {
e.printStackTrace();
}
return jsonObjects;
}
上述代码将json文件的数据封装成对象,然后调用MP的批量增加方法(deviceService.saveBatch(list);),将其添加到hanchuan数据库中
2. 利用Logstah完成MySQL到ES的数据同步
【注】Logstash、ES以及Kibana必须版本一致
主要参考logstash这篇博客,完成从MySQL到ES的数据同步。下面是其中的一张表的 .conf文件(几张表就对应几个conf文件)
input {
stdin {}
jdbc {
type => "jdbc"
# 数据库连接地址
jdbc_connection_string => "jdbc:mysql://localhost:3306/hanchuan?characterEncoding=UTF-8&autoReconnect=true&allowPublicKeyRetrieval=true"
# 数据库连接账号密码;
jdbc_user => "root"
jdbc_password => "root"
# MySQL依赖包路径;
jdbc_driver_library => "D:\apply_soft\elasticsearch_all_soft\logstash-7.6.1\bin\result\mysql-connector-j-8.0.31.jar"
# the name of the driver class for mysql
jdbc_driver_class => "com.mysql.jdbc.Driver"
# 数据库重连尝试次数
connection_retry_attempts => "3"
# 判断数据库连接是否可用,默认false不开启
jdbc_validate_connection => "true"
# 数据库连接可用校验超时时间,默认3600S
jdbc_validation_timeout => "3600"
# 开启分页查询(默认false不开启);
jdbc_paging_enabled => "true"
# 单次分页查询条数(默认100000,若字段较多且更新频率较高,建议调低此值);
jdbc_page_size => "3000"
# statement为查询数据sql,如果sql较复杂,建议配通过statement_filepath配置sql文件的存放路径;
# sql_last_value为内置的变量,存放上次查询结果中最后一条数据tracking_column的值,此处即为ModifyTime;
# statement_filepath => "mysql/jdbc.sql"
statement => "SELECT id,defective_appearance,leakage_type,anbiao1,subsystem,duty_group,anbiao2,defective_why,
elimination_person,department,accept_describe,accept_group from defect where id > :sql_last_value order by id desc"
# 是否将字段名转换为小写,默认true(如果有数据序列化、反序列化需求,建议改为false);
lowercase_column_names => false
# Value can be any of: fatal,error,warn,info,debug,默认info;
sql_log_level => warn
#
# 是否记录上次执行结果,true表示会将上次执行结果的tracking_column字段的值保存到last_run_metadata_path指定的文件中;
record_last_run => true
# 需要记录查询结果某字段的值时,此字段为true,否则默认tracking_column为timestamp的值;
use_column_value => true
# 需要记录的字段,用于增量同步,需是数据库字段
tracking_column => "id"
# record_last_run上次数据存放位置;
last_run_metadata_path => "result/defect/last_id.txt"
# 是否清除last_run_metadata_path的记录,需要增量同步时此字段必须为false;
clean_run => false
#
# 同步频率(分 时 天 月 年),默认每分钟同步一次;
schedule => "* * * * *"
}
}
filter {
mutate {
//挑选其中的一个字段充当title字段
rename => {"defective_appearance" => "title"}
//将其id值设置为”数据库表名001_id“ 方便之后查询详情接口的实现
update => {"id" => "defect001_%{id}"}
// 将其他字段填充到message字段当中
add_field => {
"message" =>["%{title};%{leakage_type};%{anbiao1};%{subsystem};%{duty_group};%{anbiao2};%{defective_why};%{elimination_person};%{department};%{accept_describe};%{accept_group};"]
}
//将多余字段删除,使表的结构始终呈现为{id,title,message}形式
remove_field => ["leakage_type","anbiao1","subsystem","duty_group","anbiao2","defective_why","elimination_person","department","accept_describe","accept_group"]
}
}
output {
elasticsearch {
# host => "192.168.1.1"
# port => "9200"
# 配置ES集群地址
hosts => ["localhost:9200"]
# 索引名字,必须小写
index => "hanchuan001"
}
stdout {
codec => json_lines
}
}
最终我们ES中的数据结构就是下面这个样子
3. 开始编写功能接口
3.1 全文检索接口
@Override
public MetaTotal searchAllHighLight(String msg, int pageNo, int pageSize) throws IOException {
if (pageNo <= 1) {
pageNo = 1;
}
SearchRequest request = new SearchRequest(resultIndex);
// 进行搜索
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
boolQueryBuilder.must(QueryBuilders.matchQuery("message", msg))
.should(QueryBuilders.matchQuery("title", msg));
// sourceBuilder.size(2000);
// 分页
sourceBuilder.from(pageNo);
sourceBuilder.size(pageSize);
// 进行高亮设置
HighlightBuilder highlightBuilder = new HighlightBuilder();
HighlightBuilder.Field field = new HighlightBuilder.Field("message")
.preTags("<span style='color:red'>")
.postTags("</span>");
HighlightBuilder.Field field1 = new HighlightBuilder.Field("title")
.preTags("<span style='color:red'>")
.postTags("</span>");
highlightBuilder.field(field).field(field1);
sourceBuilder.query(boolQueryBuilder);
sourceBuilder.highlighter(highlightBuilder);
// 加入到request中
request.source(sourceBuilder);
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
List<Meta> list = new ArrayList<>();
for (SearchHit hit : response.getHits().getHits()) {
//----进行高亮字段的替换
Map<String, HighlightField> highlightFields = hit.getHighlightFields();
HighlightField message = highlightFields.get("message");
HighlightField title = highlightFields.get("title");
// 未高亮之前的结果
Map<String, Object> sourceAsMap = hit.getSourceAsMap();
// 1.找到message中出现关键字的地方进行高亮替换
if (message != null) {
Text[] fragments = message.fragments();
String n_mess = "";
for (Text text : fragments) {
n_mess += text;
}
sourceAsMap.put("message", n_mess);
}
// 2.找到title中出现关键字的地方进行高亮替换
if (title != null) {
Text[] fragments = title.fragments();
String n_title = "";
for (Text text : fragments) {
n_title += text;
}
sourceAsMap.put("title", n_title);
}
//----结束----
Meta meta = new Meta();
try {
BeanUtils.populate(meta, hit.getSourceAsMap());
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
} catch (InvocationTargetException e) {
throw new RuntimeException(e);
}
list.add(meta);
}
MetaTotal metas = new MetaTotal();
metas.setList(list);
metas.setTotal(response.getHits().getTotalHits().value);
System.out.println(metas.getTotal() + "总记录数");
return metas;
}
在业务逻辑代码写好之后在控制层暴露接口
@ResponseBody
@GetMapping("/search/{keyword}/{pageNo}/{pageSize}")
public Result searchByMsg(@PathVariable String keyword,
@PathVariable int pageNo,
@PathVariable int pageSize) throws IOException {
MetaTotal metas = service.searchAllHighLight(keyword,pageNo,pageSize);
Page<Meta> page = new Page<>(pageNo,pageSize);
page.setRecords(metas.getList());
page.setTotal(metas.getTotal());
return new Result().code(200).message("查询成功").data("list",metas.getList()).data("total",metas.getTotal());
}
3.2 查询详情
根据前端传递的id值,进行解析,找到对应的数据库表,进行详情查看。
@RequestMapping("/details/{id}")
@ResponseBody
public Result look_details2(@PathVariable("id") String id, Map<String,Object> map){
String[] str = id.split("001_");
if (str[0].equals("defect")){
Defect defect = defectService.getById(str[1]);
return new Result().code(200).message("详情结果").data("details",defect);
} else if (str[0].equals("device")) {
Device device = deviceService.getById(str[1]);
return new Result().code(200).message("详情结果").data("details",device);
} else if (str[0].equals("riskcontroller")) {
Riskcontroller riskcontroller = riskControllerService.getById(str[1]);
return new Result().code(200).message("详情结果").data("details",riskcontroller);
}else if (str[0].equals("security")) {
Security security = securityService.getById(str[1]);
return new Result().code(200).message("详情结果").data("details",security);
}else if (str[0].equals("workticket")) {
Workticket workticket = workticketService.getById(str[1]);
return new Result().code(200).message("详情结果").data("details",workticket);
}
return new Result().code(500).message("查询失败");
}
4. 前端调用