一、spring-data-elasticsearch 引入es版本适配
二、jar升级
- 在项目工程根pom.xml文件中增加maven依赖管理在这里插入图片描述
<properties>
<elasticsearch.spring.version>4.2.0</elasticsearch.spring.version>
<elasticsearch.version>7.12.0</elasticsearch.version>
<spring.version>5.3.26</spring.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client</artifactId>
<version>${elasticsearch.version}</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>${elasticsearch.version}</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>transport</artifactId>
<version>${elasticsearch.version}</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>transport</artifactId>
<version>${elasticsearch.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-elasticsearch</artifactId>
<version>${elasticsearch.spring.version}</version>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-analysis-mmseg</artifactId>
<version>5.2.0</version>
</dependency>
<!-- Spring Framework start -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-orm</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-oxm</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-beans</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-tx</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context-support</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-web</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>${spring.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webmvc</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-websocket</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-messaging</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jdbc</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-aop</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-expression</artifactId>
<version>${spring.version}</version>
</dependency>
<!-- Spring Framework end -->
</dependencies>
</dependencyManagement>
再在项目使用maven依赖树命令,查看依赖是否更新成功
mvn dependency:tree >tree.txt
特别的 es7.10需要升级httpclient
<!-- httpclient 依赖-->
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.10</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
<version>4.4.12</version>
</dependency>
三、 工具类封装
3.1 ElasticsearchConfig es7 RestHighLevelClient 配置
/**
* es7 RestHighLevelClient 配置
*
* @author yixiaoqun
* @date 2023/7/26
* @Copyright 深圳立创电子商务有限公司
*/
@Configuration
@ComponentScan(basePackages = "com.lcsc.zoverseas.service.es.config")
public class ElasticsearchConfig {
private final static Logger logger = LoggerFactory.getLogger(ElasticsearchConfig.class);
@Value("${elastic.host}")
private String esHost;
@Value("${elastic.scheme}")
private String esScheme;
@Value("${elastic.port}")
private Integer esPort;
@Value("${elastic.username}")
private String esUserName;
@Value("${elastic.password}")
private String esUserPassword;
@Value("${elastic.connect.timeout}")
private Integer connectTimeout;
@Value("${elastic.socket.timeout}")
private Integer socketTimeout;
@Bean
public RestHighLevelClient restHighLevelClient() {
//es验证账号密码
final CredentialsProvider provider = new BasicCredentialsProvider();
//填写账号密码
provider.setCredentials(AuthScope.ANY,
new UsernamePasswordCredentials(esUserName, esUserPassword));
return new RestHighLevelClient(
RestClient.builder(new HttpHost(esHost, esPort, esScheme))
.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
httpClientBuilder.disableAuthCaching();
return httpClientBuilder.setDefaultCredentialsProvider(provider);
}
})
.setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {
@Override
public RequestConfig.Builder customizeRequestConfig(
RequestConfig.Builder requestConfigBuilder) {
return requestConfigBuilder.setConnectTimeout(null==connectTimeout?10000:connectTimeout) // 连接超时(默认为1秒)
.setSocketTimeout(null==socketTimeout?60000:socketTimeout);// 套接字超时(默认为30秒)
}
}));
}
}
3.2 ElasticsearchOperateServiceImpl
es7查询服务类
/**
* es7 操作实现
*
* @author yixiaoqun
* @date 2023/7/26
* @Copyright 深圳立创电子商务有限公司
*/
@Service
public class ElasticsearchOperateServiceImpl implements ElasticsearchOperateService {
private final static Logger logger = LoggerFactory.getLogger(ElasticsearchOperateServiceImpl.class);
@Autowired
private RestHighLevelClient restHighLevelClient;
//批量操作的对象
private static BulkProcessor bulkProcessor;
private BulkProcessor createBulkProcessor() {
BulkProcessor.Listener listener = new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId, BulkRequest request) {
logger.error("1. 【beforeBulk】批次[{}] 携带 {} 请求数量", executionId, request.numberOfActions());
}
@Override
public void afterBulk(long executionId, BulkRequest request,
BulkResponse response) {
if (!response.hasFailures()) {
logger.error("2. 【afterBulk-成功】批量 [{}] 完成在 {} ms", executionId, response.getTook().getMillis());
} else {
BulkItemResponse[] items = response.getItems();
for (BulkItemResponse item : items) {
if (item.isFailed()) {
logger.error("2. 【afterBulk-失败】批量 [{}] 出现异常的原因 : {}", executionId, item.getFailureMessage());
break;
}
}
}
}
@Override
public void afterBulk(long executionId, BulkRequest request,
Throwable failure) {
List<DocWriteRequest<?>> requests = request.requests();
List<String> esIds = requests.stream().map(DocWriteRequest::id).collect(Collectors.toList());
logger.error("3. 【afterBulk-failure失败】es执行bluk失败,失败的esId为:{}", esIds, failure);
}
};
BulkProcessor.Builder builder = BulkProcessor.builder(((bulkRequest, bulkResponseActionListener) -> {
restHighLevelClient.bulkAsync(bulkRequest, RequestOptions.DEFAULT, bulkResponseActionListener);
}), listener);
//到达10000条时刷新
builder.setBulkActions(10000);
//内存到达8M时刷新
builder.setBulkSize(new ByteSizeValue(8L, ByteSizeUnit.MB));
//设置的刷新间隔10s
builder.setFlushInterval(TimeValue.timeValueSeconds(10));
//设置允许执行的并发请求数。
builder.setConcurrentRequests(8);
//设置重试策略
builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1), 3));
return builder.build();
}
/**
* 创建索引
*
* @param index 索引
* @return
*/
@Override
public boolean createIndex(String index) throws IOException {
if (isIndexExist(index)) {
logger.error("Index is exits!");
return false;
}
//1.创建索引请求
CreateIndexRequest request = new CreateIndexRequest(index);
//2.执行客户端请求
CreateIndexResponse response = restHighLevelClient.indices().create(request, RequestOptions.DEFAULT);
logger.error("创建索引{}成功", index);
return response.isAcknowledged();
}
/**
* 删除索引
*
* @param index
* @return
*/
@Override
public boolean deleteIndex(String index) throws IOException {
if (!isIndexExist(index)) {
logger.error("Index is not exits!");
return false;
}
//删除索引请求
DeleteIndexRequest request = new DeleteIndexRequest(index);
//执行客户端请求
AcknowledgedResponse delete = restHighLevelClient.indices().delete(request, RequestOptions.DEFAULT);
logger.error("删除索引{}成功", index);
return delete.isAcknowledged();
}
/**
* 判断索引是否存在
*
* @param index
* @return
*/
@Override
public boolean isIndexExist(String index) throws IOException {
GetIndexRequest request = new GetIndexRequest(index);
return restHighLevelClient.indices().exists(request, RequestOptions.DEFAULT);
}
/**
* 数据添加,正定ID
*
* @param jsonObject 要增加的数据
* @param index 索引,类似数据库
* @param id 数据ID, 为null时es随机生成
* @return
*/
@Override
public String addData(JSONObject jsonObject, String index, String id) throws IOException {
//创建请求
IndexRequest request = new IndexRequest(index);
//规则 put /test_index/_doc/1
request.id(id);
request.timeout(TimeValue.timeValueSeconds(1));
//将数据放入请求 json
request.source(jsonObject, XContentType.JSON);
//客户端发送请求
IndexResponse response = restHighLevelClient.index(request, RequestOptions.DEFAULT);
logger.error("添加数据成功 索引为: {}, response 状态: {}, id为: {}", index, response.status().getStatus(), response.getId());
return response.getId();
}
/**
* 数据添加 随机id
*
* @param jsonObject 要增加的数据
* @param index 索引,类似数据库
* @return
*/
@Override
public String addData(JSONObject jsonObject, String index) throws IOException {
return addData(jsonObject, index, UUID.randomUUID().toString().replaceAll("-", "").toUpperCase());
}
/**
* 通过model添加数据至索引
*
* @param productModel
* @param index
* @throws IOException
*/
@Override
public void addDataByModel(ProductModel productModel, String index) throws IOException {
IndexRequest indexRequest = new IndexRequest("test_demo");
indexRequest.source(JSON.toJSONString(productModel), XContentType.JSON);
indexRequest.timeout(TimeValue.timeValueSeconds(1));
indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
//数据为存储而不是更新
indexRequest.create(false);
indexRequest.id(productModel.getSourcePartId() + "");
restHighLevelClient.indexAsync(indexRequest, RequestOptions.DEFAULT, new ActionListener<IndexResponse>() {
@Override
public void onResponse(IndexResponse indexResponse) {
ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
if (shardInfo.getFailed() > 0) {
for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
logger.error("将id为:{}的数据存入ES时存在失败的分片,原因为:{}", indexRequest.id(), failure.getCause());
}
}
}
@Override
public void onFailure(Exception e) {
logger.error("{}:存储es时异常,数据信息为", indexRequest.id(), e);
}
});
}
/**
* 通过model批量添加数据至索引
*
* @param productModels
* @param index
* @throws IOException
*/
@Override
public void addDataBatchByModel(List<ProductModel> productModels, String index) throws IOException {
if (null == bulkProcessor) {
bulkProcessor = createBulkProcessor();
}
List<IndexRequest> indexRequests = new ArrayList<>();
productModels.forEach(e -> {
IndexRequest request = new IndexRequest(index);
//填充id
request.id(e.getSourcePartId() + "");
//先不修改id
request.source(JSON.toJSONString(e), XContentType.JSON);
request.opType(DocWriteRequest.OpType.CREATE);
indexRequests.add(request);
});
indexRequests.forEach(bulkProcessor::add);
}
/**
* 通过ID删除数据
*
* @param index 索引,类似数据库
* @param id 数据ID
*/
@Override
public DeleteResponse deleteDataById(String index, String id) throws IOException {
//删除请求
DeleteRequest request = new DeleteRequest(index, id);
//执行客户端请求
DeleteResponse delete = restHighLevelClient.delete(request, RequestOptions.DEFAULT);
logger.error("索引为: {}, id为: {}删除数据成功", index, id);
return delete;
}
/**
* 通过ID 更新数据
*
* @param object 要增加的数据
* @param index 索引,类似数据库
* @param id 数据ID
* @return
*/
@Override
public void updateDataById(Object object, String index, String id) throws IOException {
//更新请求
UpdateRequest update = new UpdateRequest(index, id);
//保证数据实时更新
//update.setRefreshPolicy("wait_for");
update.timeout("1s");
update.doc(JSON.toJSONString(object), XContentType.JSON);
//执行更新请求
UpdateResponse update1 = restHighLevelClient.update(update, RequestOptions.DEFAULT);
logger.error("索引为: {}, id为: {}, 更新数据成功", index, id);
}
/**
* 根据model批量更新索引数据
*
* @param productModels
* @param index
*/
@Override
public void updateDataBatchByModel(List<ProductModel> productModels, String index) {
List<UpdateRequest> updateRequests = new ArrayList<>();
productModels.forEach(e -> {
//获取id
UpdateRequest updateRequest = new UpdateRequest();
updateRequest.index("test_demo");
//更新的id
updateRequest.id(e.getSourcePartId() + "");
//更新的数据
Map<String, Object> map = new HashMap<>();
map.put("title", "美国社会动荡");
updateRequest.doc(map);
updateRequests.add(updateRequest);
});
updateRequests.forEach(bulkProcessor::add);
}
/**
* 通过ID 更新数据,保证实时性
*
* @param object 要增加的数据
* @param index 索引,类似数据库
* @param id 数据ID
* @return
*/
@Override
public void updateDataByIdNoRealTime(Object object, String index, String id) throws IOException {
//更新请求
UpdateRequest update = new UpdateRequest(index, id);
//保证数据实时更新
update.setRefreshPolicy("wait_for");
update.timeout("1s");
update.doc(JSON.toJSONString(object), XContentType.JSON);
//执行更新请求
UpdateResponse update1 = restHighLevelClient.update(update, RequestOptions.DEFAULT);
logger.error("索引为: {}, id为: {}, 更新数据成功", index, id);
}
/**
* 创建索引并指定别名
*
* @param indexName
* @param aliasName
* @return
* @throws IOException
*/
@Override
public Boolean createIndexWithAlias(String indexName, String aliasName) throws IOException {
CreateIndexRequest request = new CreateIndexRequest(indexName);
if (StringUtils.isNotEmpty(aliasName)) {
request.alias(new Alias(aliasName));
}
CreateIndexResponse createIndexResponse = restHighLevelClient.indices().create(request, RequestOptions.DEFAULT);
return createIndexResponse.isAcknowledged();
}
/**
* 删除索引别名
*
* @param indexName
* @param aliasName
* @return
* @throws IOException
*/
@Override
public Boolean deleteAlias(String indexName, String aliasName) throws IOException {
DeleteAliasRequest deleteAliasRequest = new DeleteAliasRequest(indexName, aliasName);
org.elasticsearch.client.core.AcknowledgedResponse acknowledgedResponse = restHighLevelClient.indices().deleteAlias(deleteAliasRequest, RequestOptions.DEFAULT);
return acknowledgedResponse.isAcknowledged();
}
/**
* 更新索引别名
*
* @param alias
* @param index
*/
@Override
public void updateIndexAlias(String alias, String index) {
try {
this.addAlias(index, alias);
} catch (IOException e) {
logger.error("updateIndexAlias", e);
}
}
/**
* 增加索引别名
*
* @param indexName
* @param aliasName
* @return
* @throws IOException
*/
@Override
public Boolean addAlias(String indexName, String aliasName) throws IOException {
IndicesAliasesRequest aliasesRequest = new IndicesAliasesRequest();
IndicesAliasesRequest.AliasActions aliasAction =
new IndicesAliasesRequest.AliasActions(IndicesAliasesRequest.AliasActions.Type.ADD)
.index(indexName)
.alias(aliasName);
aliasesRequest.addAliasAction(aliasAction);
AcknowledgedResponse acknowledgedResponse = restHighLevelClient.indices().updateAliases(aliasesRequest, RequestOptions.DEFAULT);
return acknowledgedResponse.isAcknowledged();
}
/**
* 批量更新索引别名
*
* @param alias
* @param oldIndexs
* @param newIndexs
*/
@Override
public void updateIndexAliasBatch(String alias, List<String> oldIndexs, List<String> newIndexs) {
try {
//删除alias、index关联
if (oldIndexs != null && !oldIndexs.isEmpty()) {
for (String oldIndex : oldIndexs) {
this.deleteAlias(oldIndex, alias);
}
}
//新增alias、index关联
if (newIndexs != null && !newIndexs.isEmpty()) {
for (String newIndex : newIndexs) {
this.addAlias(newIndex, alias);
}
}
} catch (IOException e) {
logger.error("updateIndexAlias", e);
}
}
/**
* 批量导入
*
* @param indexName
* @param isAutoId 使用自动id 还是使用传入对象的id
* @param source
* @return
* @throws IOException
*/
public BulkResponse importAll(String indexName, boolean isAutoId, String source) throws IOException {
if (0 == source.length()) {
//todo 抛出异常 导入数据为空
}
BulkRequest request = new BulkRequest();
JSONArray array = JSON.parseArray(source);
//todo 识别json数组
if (isAutoId) {
for (Object s : array) {
request.add(new IndexRequest(indexName).source(s, XContentType.JSON));
}
} else {
for (Object s : array) {
request.add(new IndexRequest(indexName).id(JSONObject.parseObject(s.toString()).getString("id")).source(s, XContentType.JSON));
}
}
return restHighLevelClient.bulk(request, RequestOptions.DEFAULT);
}
/**
* 获取当前时间
*
* @return
*/
private String generateCurrentData() {
Long timeStamp = System.currentTimeMillis(); //获取当前时间戳
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String sd = sdf.format(new Date(Long.parseLong(String.valueOf(timeStamp)))); // 时间戳转换成时间
return sd;
}
/**
* 根据 id 删除指定索引中的文档
*
* @param indexName
* @param id
* @return
* @throws IOException
*/
@Override
public DeleteResponse deleteDoc(String indexName, String id) throws IOException {
DeleteRequest request = new DeleteRequest(indexName, id);
return restHighLevelClient.delete(request, RequestOptions.DEFAULT);
}
/**
* 根据 id 更新指定索引中的文档
*
* @param indexName
* @param id
* @return
* @throws IOException
*/
@Override
public UpdateResponse updateDoc(String indexName, String id, String updateJson) throws IOException {
UpdateRequest request = new UpdateRequest(indexName, id);
request.doc(XContentType.JSON, updateJson);
return restHighLevelClient.update(request, RequestOptions.DEFAULT);
}
/**
* 根据 id 更新指定索引中的文档
*
* @param indexName
* @param id
* @return
* @throws IOException
*/
@Override
public UpdateResponse updateDoc(String indexName, String id, Map<String, Object> updateMap) throws IOException {
UpdateRequest request = new UpdateRequest(indexName, id);
request.doc(updateMap);
return restHighLevelClient.update(request, RequestOptions.DEFAULT);
}
/**
* 根据某字段的 k-v 更新索引中的文档
*
* @param fieldName
* @param value
* @param indexName
* @throws IOException
*/
@Override
public void updateByQuery(String fieldName, String value, String... indexName) throws IOException {
UpdateByQueryRequest request = new UpdateByQueryRequest(indexName);
//单次处理文档数量
request.setBatchSize(100)
.setQuery(new TermQueryBuilder(fieldName, value))
.setTimeout(TimeValue.timeValueMinutes(2));
restHighLevelClient.updateByQuery(request, RequestOptions.DEFAULT);
}
/**
* 添加文档 手动指定id
*
* @param indexName
* @param id
* @param source
* @return
* @throws IOException
*/
@Override
public IndexResponse addDoc(String indexName, String id, String source) throws IOException {
IndexRequest request = new IndexRequest(indexName);
if (null != id) {
request.id(id);
}
request.source(source, XContentType.JSON);
return restHighLevelClient.index(request, RequestOptions.DEFAULT);
}
/**
* 添加文档 使用自动id
*
* @param indexName
* @param source
* @return
* @throws IOException
*/
@Override
public IndexResponse addDoc(String indexName, String source) throws IOException {
return addDoc(indexName, null, source);
}
/**
* 创建索引
*
* @param indexName
* @param settings
* @param mapping
* @return
* @throws IOException
*/
@Override
public CreateIndexResponse createIndex(String indexName, String settings, String mapping) throws IOException {
CreateIndexRequest request = new CreateIndexRequest(indexName);
if (null != settings && !"".equals(settings)) {
request.settings(settings, XContentType.JSON);
}
if (null != mapping && !"".equals(mapping)) {
request.mapping(mapping, XContentType.JSON);
}
return restHighLevelClient.indices().create(request, RequestOptions.DEFAULT);
}
/**
* 根据条件删除
*
* @param request
* @param aDefault
* @return
*/
@Override
public BulkByScrollResponse deleteByQuery(DeleteByQueryRequest request, RequestOptions aDefault) {
BulkByScrollResponse bulkByScrollResponse = null;
try {
bulkByScrollResponse = restHighLevelClient.deleteByQuery(request, aDefault);
} catch (Exception e) {
e.printStackTrace();
}
return bulkByScrollResponse;
}
}
ElasticsearchQueryDataServiceImpl查询实现
/** es7 查询实现
* @author yixiaoqun
* @date 2023/7/26
* @Copyright 深圳立创电子商务有限公司
*/
@Service
public class ElasticsearchQueryDataServiceImpl implements ElasticsearchQueryDataService {
private final static Logger logger = LoggerFactory.getLogger(ElasticsearchQueryDataServiceImpl.class);
@Autowired
private RestHighLevelClient restHighLevelClient;
/**
* 查询索引数据 列表 不高亮
* @param searchSourceBuilder
* @param classz
* @param indexes
* @param <T>
* @return
*/
@Override
public <T> List<T> queryListEs(SearchSourceBuilder searchSourceBuilder, Class<T> classz, String... indexes) {
// 查询的数据列表
List<T> list = new ArrayList<>();
try {
// 执行查询es数据
queryEsData(searchSourceBuilder, classz, list, indexes);
} catch (IOException e) {
logger.error("精确查询数据失败,错误信息:" + e.getMessage());
// throw new BusinessTipException("99999","精确查询数据失败");
}
return list;
}
/**
* 统计行数
* @param searchSourceBuilder
* @param indexes
* @return
*/
@Override
public long countEs(SearchSourceBuilder searchSourceBuilder, String... indexes) {
try{
// 创建查询请求对象,将查询对象配置到其中
SearchRequest searchRequest = new SearchRequest(indexes);
searchRequest.source(searchSourceBuilder);
// 执行查询,然后处理响应结果
SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
// 根据状态和数据条数验证是否返回了数据
if (RestStatus.OK.equals(searchResponse.status()) && searchResponse.getHits().getTotalHits().value > 0) {
return searchResponse.getHits().getTotalHits().value;
}
}catch(Exception e){
logger.error("countEs",e);
}
return 0;
}
/**
* 高亮查询
* @param boolQueryBuilder
* @param classz
* @param indexes
* @param <T>
* @return
*/
@Override
public <T> List<T> boolQueryListEs(BoolQueryBuilder boolQueryBuilder, Class<T> classz, String... indexes) {
// 查询的数据列表
List<T> list = new ArrayList<>();
try {
// 构建查询源构建器
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(boolQueryBuilder);
searchSourceBuilder.size(100);
// 甚至返回字段
// 如果查询的属性很少,那就使用includes,而excludes设置为空数组
// 如果排序的属性很少,那就使用excludes,而includes设置为空数组
// String[] includes = {"title", "categoryName", "price"};
// String[] excludes = {};
// searchSourceBuilder.fetchSource(includes, excludes);
// 高亮设置
// 设置高亮三要素: field: 你的高亮字段 , preTags :前缀 , postTags:后缀
HighlightBuilder highlightBuilder = new HighlightBuilder().field("title").preTags("<font color='red'>").postTags("</font>");
highlightBuilder.field("spec").preTags("<font color='red'>").postTags("</font>");
searchSourceBuilder.highlighter(highlightBuilder);
// 创建查询请求对象,将查询对象配置到其中
SearchRequest searchRequest = new SearchRequest(indexes);
searchRequest.source(searchSourceBuilder);
// 执行查询,然后处理响应结果
SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
// 根据状态和数据条数验证是否返回了数据
if (RestStatus.OK.equals(searchResponse.status()) && searchResponse.getHits().getTotalHits().value > 0) {
SearchHits hits = searchResponse.getHits();
for (SearchHit hit : hits) {
// 将 JSON 转换成对象
T bean = JSON.parseObject(hit.getSourceAsString(), classz);
// 获取高亮的数据
HighlightField highlightField = hit.getHighlightFields().get("title");
System.out.println("高亮名称:" + highlightField.getFragments()[0].string());
// 替换掉原来的数据
Text[] fragments = highlightField.getFragments();
if (fragments != null && fragments.length > 0) {
StringBuilder title = new StringBuilder();
for (Text fragment : fragments) {
title.append(fragment);
}
// 获取method对象,其中包含方法名称和参数列表
Method setTitle = classz.getMethod("setTitle", String.class);
if (setTitle != null) {
// 执行method,bean为实例对象,后面是方法参数列表;setTitle没有返回值
setTitle.invoke(bean, title.toString());
}
}
list.add(bean);
}
}
} catch (Exception e) {
// log.error("布尔查询失败,错误信息:" + StackTraceUtil.getStackTraceAsString(e));
// throw new MyBusinessException("99999", "布尔查询失败");
}
return list;
}
/**
* 执行es查询
* @param indexes
* @param beanClass
* @param list
* @param searchSourceBuilder
* @param <T>
* @throws IOException
*/
private <T> void queryEsData(SearchSourceBuilder searchSourceBuilder,Class<T> beanClass, List<T> list, String ... indexes) throws IOException {
// 创建查询请求对象,将查询对象配置到其中
SearchRequest searchRequest = new SearchRequest(indexes);
searchRequest.source(searchSourceBuilder);
// 执行查询,然后处理响应结果
SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
// 根据状态和数据条数验证是否返回了数据
if (RestStatus.OK.equals(searchResponse.status()) && searchResponse.getHits().getTotalHits().value > 0) {
SearchHits hits = searchResponse.getHits();
for (SearchHit hit : hits) {
// 将 JSON 转换成对象
// ProductModel userInfo = JSON.parseObject(hit.getSourceAsString(), ProductModel.class);
// 将 JSON 转换成对象
T bean = JSON.parseObject(hit.getSourceAsString(), beanClass);
list.add(bean);
}
}
}
/**
* 获取所有索引
* @return
*/
public Set<String> getAllIndexName(){
Set<String> indices=null;
try {
GetAliasesRequest request = new GetAliasesRequest();
GetAliasesResponse getAliasesResponse = restHighLevelClient.indices().getAlias(request,RequestOptions.DEFAULT);
Map<String, Set<AliasMetadata>> map = getAliasesResponse.getAliases();
indices = map.keySet();
} catch (IOException e) {
e.printStackTrace();
}
return indices;
}
/**
* 获取分词结果
* @param text
* @return
*/
public List<String> getAnalyze(String text){
try{
List<String> list = new ArrayList<>();
Request request = new Request("GET", URLEncoder.encode("_analyze"));
JSONObject entity = new JSONObject();
entity.put("analyzer", "hanlp");
entity.put("text", text);
request.setJsonEntity(entity.toJSONString());
Response response = restHighLevelClient.getLowLevelClient().performRequest(request);
JSONObject tokens = JSONObject.parseObject(EntityUtils.toString(response.getEntity()));
JSONArray arrays = tokens.getJSONArray("tokens");
for (int i = 0; i < arrays.size(); i++)
{
JSONObject obj = JSON.parseObject(arrays.getString(i));
list.add(obj.getString("token"));
}
return list;
}catch (Exception e){
e.printStackTrace();
throw new RuntimeException("获取分词结果失败");
}
}
}