1. Elasticsearch preparation
1.1 Pinyin tokenizer
GitHub address: https://github.com/infinilabs/analysis-pinyin/releases?page=6
The plugin version must match your Elasticsearch version exactly.
After installing the plugin into the ES plugins directory, restart ES:
docker restart es
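To confirm the plugin loaded, a quick smoke test against the analyzer named pinyin that the plugin registers should return pinyin tokens (the exact tokens depend on the plugin's default settings):

GET /_analyze
{
  "analyzer": "pinyin",
  "text": "刘德华"
}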
1.2 Define the index
PUT /app_info_article
{
  "settings": {
    "analysis": {
      "analyzer": {
        "text_analyzer": {
          "tokenizer": "ik_max_word",
          "filter": "py"
        },
        "completion_analyzer": {
          "tokenizer": "keyword",
          "filter": "py"
        }
      },
      "filter": {
        "py": {
          "type": "pinyin",
          "keep_full_pinyin": false,
          "keep_joined_full_pinyin": true,
          "keep_original": true,
          "limit_first_letter_length": 16,
          "remove_duplicated_term": true,
          "none_chinese_pinyin_tokenize": false
        }
      }
    }
  },
  "mappings": {
    "properties": {
      "id": {
        "type": "long"
      },
      "publishTime": {
        "type": "date"
      },
      "layout": {
        "type": "integer"
      },
      "images": {
        "type": "keyword",
        "index": false
      },
      "staticUrl": {
        "type": "keyword",
        "index": false
      },
      "authorId": {
        "type": "long"
      },
      "authorName": {
        "type": "text"
      },
      "title": {
        "type": "text",
        "analyzer": "text_analyzer",
        "search_analyzer": "ik_max_word",
        "copy_to": "all"
      },
      "content": {
        "type": "text",
        "analyzer": "text_analyzer",
        "search_analyzer": "ik_max_word",
        "copy_to": "all"
      },
      "all": {
        "type": "text",
        "analyzer": "ik_max_word"
      },
      "suggestion": {
        "type": "completion",
        "analyzer": "completion_analyzer"
      }
    }
  }
}
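To see what the custom ik_max_word + pinyin chain produces, you can test the index-scoped analyzer directly (illustrative; the token list varies with the py filter settings above):

GET /app_info_article/_analyze
{
  "analyzer": "text_analyzer",
  "text": "如家酒店"
}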
1.3 Add documents to the index
For details, see my other post: xxl-job shard broadcast + multithreading for efficient scheduled synchronization of an Elasticsearch index
The POJO class corresponding to app_info_article:
import lombok.Data;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;

@Data
public class SearchArticleVo {
    // article id
    private Long id;
    // article title
    private String title;
    // publish time
    private Date publishTime;
    // article layout
    private Integer layout;
    // cover image(s)
    private String images;
    // author id
    private Long authorId;
    // author name
    private String authorName;
    // static url
    private String staticUrl;
    // article content
    private String content;
    // status
    private int enable;
    // terms for auto-completion
    private List<String> suggestion;

    // seed the completion field with the title and the author name
    public void initSuggestion() {
        suggestion = new ArrayList<String>();
        suggestion.add(this.title);
        suggestion.add(this.authorName);
    }
}
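After initSuggestion() runs, the JSON pushed to ES by the sync job below looks roughly like this (field values are made up for illustration; note that fastjson serializes Date as epoch milliseconds by default):

{
  "id": 1001,
  "title": "Java 17 新特性一览",
  "publishTime": 1704088800000,
  "layout": 1,
  "images": "http://example.com/cover.jpg",
  "authorId": 11,
  "authorName": "admin",
  "staticUrl": "http://example.com/1001.html",
  "content": "正文内容",
  "enable": 1,
  "suggestion": ["Java 17 新特性一览", "admin"]
}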
Core code
@XxlJob("syncIndex")
public void syncIndex() {
    // 1. Read the job parameter, e.g. {"minSize":100,"size":10}
    String jobParam = XxlJobHelper.getJobParam();
    Map<String, Integer> jobData = JSON.parseObject(jobParam, Map.class);
    int minSize = jobData.get("minSize"); // minimum total row count before sharding kicks in
    int size = jobData.get("size");       // page size of the small inner pages
    // 2. Query the total number of rows to process: total = IArticleClient.searchTotal()
    Long total = articleClient.searchTotal();
    // 3. If the total is at or below minSize, only shard 0 does the work; every other shard exits immediately
    int cn = XxlJobHelper.getShardIndex(); // index of the current node
    if (total <= minSize && cn != 0) {
        return;
    }
    // 4. Split the work: each shard handles one large contiguous row range
    // 4.1 number of nodes
    int n = XxlJobHelper.getShardTotal();
    // 4.2 number of rows this node handles (ceiling division)
    int count = (int) (total % n == 0 ? total / n : (total / n) + 1);
    // 4.3 row range of the current node: limit #{index},#{count}
    int indexStart = cn * count;
    int indexEnd = cn * count + count - 1; // index of the last row in this shard's range
    // 5. Walk the range in small pages and bulk-index each page
    int index = indexStart; // index of the first page
    System.out.println("shard count [" + n + "], current shard index [" + cn + "], row range [" + indexStart + "-" + indexEnd + "]");
    do {
        // 5.1 query one page
        // 5.2 push it into ES
        push(index, size, indexEnd);
        // 5.3 move on to the next page
        index = index + size;
    } while (index <= indexEnd);
}
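A worked example of the shard math: with total = 100, n = 3 shards and size = 10, count = ceil(100/3) = 34, so shard 0 covers rows 0-33, shard 1 covers 34-67, and shard 2 covers 68-101. The last range deliberately overshoots the real row count; the database simply returns fewer rows for the final page, and push() clamps the page size against indexEnd.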
/**
 * Batch-import one page of data
 * @param index    index of the first row of the page
 * @param size     page size
 * @param indexEnd index of the last row this shard may touch
 */
public void push(int index, int size, int indexEnd) {
    pool.execute(() -> {
        // clamp the page size so the last page does not run past indexEnd
        int pageSize = index + size > indexEnd ? indexEnd - index + 1 : size;
        System.out.println("this thread handles page [index=" + index + ", size=" + pageSize + "]");
        // 1) query one page from the database
        List<SearchArticleVo> searchArticleVos = articleClient.searchPage(index, pageSize);
        // e.g. with indexEnd = 6: page 1 starts at index = 0; page 2 starts at index = 5,
        // and the clamp yields indexEnd - index + 1 = 2
        if (searchArticleVos == null || searchArticleVos.isEmpty()) {
            return; // nothing to index; the check must happen before iterating the list
        }
        // 2) create the BulkRequest with an immediate refresh policy
        BulkRequest bulkRequest = new BulkRequest()
                .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
        for (SearchArticleVo searchArticleVo : searchArticleVos) {
            // A: build the completion terms, then create the IndexRequest
            searchArticleVo.initSuggestion();
            IndexRequest indexRequest = new IndexRequest("app_info_article")
                    // B: fill the request with the document JSON
                    .id(searchArticleVo.getId().toString())
                    .source(com.alibaba.fastjson.JSON.toJSONString(searchArticleVo), XContentType.JSON);
            // 3) add each IndexRequest to the BulkRequest
            bulkRequest.add(indexRequest);
        }
        // 4) send the BulkRequest through the RestHighLevelClient
        try {
            restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
        } catch (IOException e) {
            e.printStackTrace();
        }
    });
}
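The snippet above assumes a few fields on the job class; a minimal sketch of what they might look like (the pool size is an assumption, and IArticleClient is the Feign interface mentioned in the comments):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Autowired
private IArticleClient articleClient;            // Feign client for the article service
@Autowired
private RestHighLevelClient restHighLevelClient; // ES high-level REST client
// thread pool driving the concurrent page pushes; sizing is an assumption
private final ExecutorService pool = Executors.newFixedThreadPool(8);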
Run the task once from the xxl-job scheduling console and the documents are indexed.
1.4 Auto-completion query
// auto-completion query
GET /app_info_article/_search
{
  "suggest": {
    "title_suggest": {           // a name for this suggest clause
      "text": "java",            // the prefix keyword
      "completion": {
        "field": "suggestion",   // the completion field to query
        "skip_duplicates": true, // skip duplicate suggestions
        "size": 10               // return the top 10 results
      }
    }
  }
}
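The interesting part of the response is the options array under the named suggest clause; trimmed to the relevant fields, it looks roughly like this (values are illustrative):

{
  "suggest": {
    "title_suggest": [
      {
        "text": "java",
        "options": [
          { "text": "Java 17 新特性一览", "_score": 1.0 },
          { "text": "javascript 入门", "_score": 1.0 }
        ]
      }
    ]
  }
}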
2. Code flow
2.1 Core business code
AssociateController
@RestController
@RequestMapping(value = "/api/v1/associate")
public class AssociateController {

    @Autowired
    private AssociateService associateService;

    /**
     * Word auto-completion
     */
    @PostMapping(value = "/search")
    public ResponseResult search(@RequestBody UserSearchDto dto) throws IOException {
        return associateService.search(dto);
    }
}
The core search method
@Autowired
private RestHighLevelClient restHighLevelClient;

/**
 * Word auto-completion
 * @param dto carries the search prefix
 * @return the suggested words
 */
@Override
public ResponseResult search(UserSearchDto dto) throws IOException {
    // 1) create a SearchRequest against the index
    SearchRequest request = new SearchRequest("app_info_article");
    // 2) add a completion suggester under a name of our choosing
    request.source().suggest(
            new SuggestBuilder()
                    .addSuggestion(
                            // the name used to read the result back out
                            "article_suggest",
                            SuggestBuilders
                                    // the completion field to query
                                    .completionSuggestion("suggestion")
                                    // skip duplicates
                                    .skipDuplicates(true)
                                    // the prefix to complete
                                    .prefix(dto.getSearchWords())
                                    .size(10)
                    )
    );
    // 3) execute the search
    SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
    // 4) parse the result set
    CompletionSuggestion suggests = response.getSuggest().getSuggestion("article_suggest");
    List<Map<String, String>> options = new ArrayList<Map<String, String>>();
    for (CompletionSuggestion.Entry.Option option : suggests.getOptions()) {
        Map<String, String> dataMap = new HashMap<String, String>();
        dataMap.put("associateWords", option.getText().toString());
        options.add(dataMap);
    }
    return ResponseResult.okResult(options);
}
Result-set parsing (diagram omitted; see the loop above and the sample response in section 1.4).
2.2 Testing
Request URL: http://127.0.0.1:8801/app/search/api/v1/associate/search/
The /app/search prefix is handled by nginx and the gateway, which route the request through to the service's /api/v1/associate/search endpoint.
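A request/response pair might look like this (the body field matches UserSearchDto.searchWords used above; the response wrapper shape is an assumption based on ResponseResult):

curl -X POST http://127.0.0.1:8801/app/search/api/v1/associate/search \
     -H "Content-Type: application/json" \
     -d '{"searchWords": "jav"}'

Illustrative response:
{"code": 200, "data": [{"associateWords": "Java 17 新特性一览"}]}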
Test 1, Test 2, Test 3 (screenshots omitted).
PS: the blue highlighting of the suggested words is done by the front end.