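Contents
- 1. Types of aggregations
- 2. Bucket aggregations in DSL
- 3. Metric aggregations in DSL
- 4. Aggregations with RestClient
- 5. Requirement: returning the filter options
- 6. Aggregations with filter conditions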
1. Types of aggregations
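Aggregations compute statistics, analyses, and calculations over document data (similar to aggregate functions in MySQL). Elasticsearch has three common kinds:
- Bucket aggregations: group documents
  Common ones:
  Terms aggregation: groups by the value of a document field
  Date Histogram: groups by date interval, e.g. one bucket per week or per month
- Metric aggregations: compute values such as the maximum, minimum, or average
  Common ones:
  Avg: average
  Max: maximum
  Min: minimum
  Stats: max, min, avg, sum, etc. in one pass
- Pipeline aggregations: aggregate further on the results of other aggregations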
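Because aggregations group or compute on exact values, a field used in an aggregation must be one of the following types, not analyzed (tokenized) text:
keyword
numeric
date
boolean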
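To make the three categories concrete, here is a hypothetical in-memory analogue in plain Java. It is not Elasticsearch code. Grouping by brand plays the role of a bucket aggregation, and averaging the score in each group is a metric aggregation. Picking the brand with the highest average from those results mirrors what a pipeline aggregation such as max_bucket does. The Hotel class, brand names, and scores are made up for illustration.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// In-memory illustration only: Elasticsearch runs these steps on the server.
class AggCategoriesDemo {
    // Hypothetical minimal document: a keyword-like brand and a numeric score
    static final class Hotel {
        final String brand;
        final int score;

        Hotel(String brand, int score) {
            this.brand = brand;
            this.score = score;
        }
    }

    // Bucket step (group by brand) + metric step (average score per group)
    static Map<String, Double> avgScoreByBrand(List<Hotel> hotels) {
        return hotels.stream().collect(Collectors.groupingBy(
                h -> h.brand, TreeMap::new, Collectors.averagingInt(h -> h.score)));
    }

    // Pipeline step: reads only the previous aggregation's output,
    // similar in spirit to a max_bucket pipeline aggregation
    static String brandWithMaxAvg(Map<String, Double> avgByBrand) {
        return avgByBrand.entrySet().stream()
                .max(Map.Entry.comparingByValue())
                .map(Map.Entry::getKey)
                .orElse(null);
    }

    public static void main(String[] args) {
        List<Hotel> hotels = List.of(
                new Hotel("7Days", 37), new Hotel("7Days", 41),
                new Hotel("Hilton", 46), new Hotel("Rujia", 43));
        Map<String, Double> avg = avgScoreByBrand(hotels);
        System.out.println(avg);                  // {7Days=39.0, Hilton=46.0, Rujia=43.0}
        System.out.println(brandWithMaxAvg(avg)); // Hilton
    }
}
```

The pipeline step never looks at individual documents, only at the per-bucket results. That is the defining trait of pipeline aggregations.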
2. Bucket aggregations in DSL
A bucket aggregation in DSL has three essential parts:
- aggregation name
- aggregation type
- aggregation field
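Syntax
Example: count how many hotel brands there are across all documents by aggregating on the brand name:
GET /hotel/_search
{
  "size": 0,            // size 0: the response contains only the aggregation results, no documents
  "aggs": {             // define the aggregations
    "brandAgg": {       // name of this aggregation
      "terms": {        // aggregation type: terms, because we group by brand value
        "field": "brand", // field to aggregate on
        "size": 20        // maximum number of buckets to return
      }
    }
  }
}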
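For intuition, the terms request above behaves roughly like the following in-memory Java sketch. Documents are grouped by their exact brand value, and each bucket records its document count (_count). Buckets are sorted by that count in descending order, and only the first size buckets are kept. This is a hypothetical illustration, not how Elasticsearch computes it internally; on a real multi-shard index the counts can be approximate. The class and method names are made up.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// In-memory analogue of a terms aggregation; not Elasticsearch code.
class TermsAggSketch {
    // Returns bucket key -> doc count, sorted by count descending, at most `size` buckets
    static LinkedHashMap<String, Long> terms(List<String> fieldValues, int size) {
        // Group identical values into buckets and count the documents in each
        Map<String, Long> counts = fieldValues.stream()
                .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
        return counts.entrySet().stream()
                // _count descending; ties broken by key so this sketch is deterministic
                .sorted(Map.Entry.<String, Long>comparingByValue().reversed()
                        .thenComparing(Map.Entry.<String, Long>comparingByKey()))
                .limit(size) // like "size": keep only the top buckets
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue,
                        (a, b) -> a, LinkedHashMap::new));
    }

    public static void main(String[] args) {
        List<String> brands = List.of("Rujia", "Hilton", "Rujia", "7Days", "Rujia", "Hilton");
        System.out.println(terms(brands, 20)); // {Rujia=3, Hilton=2, 7Days=1}
    }
}
```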
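Bucket aggregation - sorting the results
By default, a bucket aggregation counts the documents in each bucket, reports that number as _count, and sorts the buckets by _count in descending order. To change the order, add an order field:
GET /hotel/_search
{
  "size": 0,
  "aggs": {
    "brandAgg": {
      "terms": {
        "field": "brand",
        "order": {
          "_count": "asc" // sort by _count, ascending
        },
        "size": 20
      }
    }
  }
}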
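The order parameter changes only how the buckets are sorted, not how they are counted. Below is a small hypothetical in-memory sketch, not the Elasticsearch API. It switches between the default _count descending order and the "_count": "asc" order shown above.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// In-memory illustration of terms bucket ordering; not Elasticsearch code.
class TermsOrderSketch {
    // countAscending = true mimics "order": {"_count": "asc"}; false is the default (desc)
    static List<String> orderedKeys(List<String> values, boolean countAscending, int size) {
        Map<String, Long> counts = values.stream()
                .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
        Comparator<Map.Entry<String, Long>> byCount = Map.Entry.comparingByValue();
        if (!countAscending) {
            byCount = byCount.reversed();
        }
        return counts.entrySet().stream()
                .sorted(byCount.thenComparing(Map.Entry.<String, Long>comparingByKey()))
                .limit(size) // "size" is applied after sorting
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> brands = List.of("Rujia", "Hilton", "Rujia", "7Days", "Rujia", "Hilton");
        System.out.println(orderedKeys(brands, false, 20)); // [Rujia, Hilton, 7Days]
        System.out.println(orderedKeys(brands, true, 20));  // [7Days, Hilton, Rujia]
    }
}
```

Because size is applied after sorting, a small size combined with ascending order returns the least common brands rather than the most common ones.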
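Bucket aggregation - limiting the scope
By default, a bucket aggregation covers every document in the index. To narrow the scope, add a query condition; query sits at the same level as aggs:
GET /hotel/_search
{
  "query": {
    "range": {
      "price": {
        "lte": 200 // aggregate only documents priced at 200 yuan or less
      }
    }
  },
  "size": 0,
  "aggs": {
    "brandAgg": {
      "terms": {
        "field": "brand",
        "size": 20
      }
    }
  }
}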
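Because the query is applied first, the aggregation only sees matching documents. The following hypothetical in-memory sketch (not Elasticsearch code) performs the same two steps. It first keeps documents with price <= 200, then counts them per brand. The Doc class and sample data are made up.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// In-memory analogue of "query + aggs": filter first, then bucket; not Elasticsearch code.
class ScopedAggSketch {
    // Hypothetical document with a brand and a price
    static final class Doc {
        final String brand;
        final int price;

        Doc(String brand, int price) {
            this.brand = brand;
            this.price = price;
        }
    }

    // Step 1: range filter (price <= maxPrice); step 2: count documents per brand
    static Map<String, Long> brandCountsUpTo(List<Doc> docs, int maxPrice) {
        return docs.stream()
                .filter(d -> d.price <= maxPrice)
                .collect(Collectors.groupingBy(d -> d.brand, TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        List<Doc> docs = List.of(
                new Doc("Rujia", 159), new Doc("Hilton", 899),
                new Doc("Rujia", 189), new Doc("7Days", 120));
        System.out.println(brandCountsUpTo(docs, 200)); // {7Days=1, Rujia=2}
    }
}
```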
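Summary:
- aggs defines the aggregations and sits at the same level as query; a query limits which documents are aggregated
- An aggregation has three parts: name, type, and field
- Common properties: size (number of buckets returned), order (bucket sort order), field (the field to aggregate on)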
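3. Metric aggregations in DSL
Example: get the min, max, avg, etc. of the user rating (score) for each brand.
The stats metric aggregation computes these values. Because they are needed per brand, first add a terms bucket aggregation on brand and nest the stats aggregation inside it:
GET /hotel/_search
{
  "size": 0,
  "aggs": {
    "brandAgg": {
      "terms": {
        "field": "brand",
        "size": 20
      },
      "aggs": {               // sub-aggregation of brandAgg: computed separately for each bucket (nested aggregation)
        "score_stats": {      // aggregation name
          "stats": {          // aggregation type: stats returns min, max, avg, sum and count
            "field": "score"  // aggregation field
          }
        }
      }
    }
  }
}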
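The nesting works in two steps: documents are first split into brand buckets, and then stats runs inside each bucket. The following hypothetical in-memory sketch (not Elasticsearch code) uses Java's DoubleSummaryStatistics. It reports the same figures that a stats aggregation returns: count, min, max, avg, and sum. The Hotel class and sample data are made up.

```java
import java.util.DoubleSummaryStatistics;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// In-memory analogue of terms(brand) -> stats(score); not Elasticsearch code.
class NestedStatsSketch {
    // Hypothetical document with a brand and a score
    static final class Hotel {
        final String brand;
        final double score;

        Hotel(String brand, double score) {
            this.brand = brand;
            this.score = score;
        }
    }

    // Outer bucket: brand; inner metric: count/min/max/avg/sum of score per bucket
    static Map<String, DoubleSummaryStatistics> scoreStatsByBrand(List<Hotel> hotels) {
        return hotels.stream().collect(Collectors.groupingBy(
                h -> h.brand, TreeMap::new, Collectors.summarizingDouble(h -> h.score)));
    }

    public static void main(String[] args) {
        List<Hotel> hotels = List.of(
                new Hotel("Rujia", 43), new Hotel("Rujia", 47), new Hotel("Hilton", 46));
        scoreStatsByBrand(hotels).forEach((brand, s) -> System.out.printf(
                "%s: count=%d min=%.1f max=%.1f avg=%.1f sum=%.1f%n",
                brand, s.getCount(), s.getMin(), s.getMax(), s.getAverage(), s.getSum()));
    }
}
```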
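4. Aggregations with RestClient
Compared with the DSL, the Java RestClient builds the same request through request.source().aggregation(...) with AggregationBuilders. It then reads the result from response.getAggregations(), looking each aggregation up by the name given in the request.
Test code: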
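@Test
void testBucket() throws IOException {
    // 1. Prepare a request against the hotel index
    SearchRequest request = new SearchRequest("hotel");
    // 2. size 0: return aggregation results only, no documents
    request.source().size(0);
    // 3. terms aggregation named brandAgg on the brand field, up to 20 buckets
    request.source().aggregation(AggregationBuilders
            .terms("brandAgg")
            .field("brand")
            .size(20));
    // 4. Send the request (client is an injected RestHighLevelClient)
    SearchResponse response = client.search(request, RequestOptions.DEFAULT);
    // 5. Look up the aggregation by the name used in the request
    Aggregations aggregations = response.getAggregations();
    Terms brandTerms = aggregations.get("brandAgg");
    // 6. Each bucket key is one brand name
    List<? extends Terms.Bucket> buckets = brandTerms.getBuckets();
    for (Terms.Bucket bucket : buckets) {
        String brandName = bucket.getKeyAsString();
        System.out.println(brandName);
    }
}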
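5. Requirement: returning the filter options
The brand, city, and other options on the search page should not be hardcoded in the front end. They should come from aggregating the hotel data in the index.
Given this shape, a Map<String, List<String>> is the natural data structure. Each key is a filter label (the code uses the Chinese labels 品牌 brand, 城市 city, 星级 star rating), and each value is the list of options for that label: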
/**
 * Queries the aggregation results for city, star rating, and brand
 * @return aggregation results, e.g. {"城市": ["上海", "北京"], "品牌": ["如家", "希尔顿"]}
 */
Map<String, List<String>> filters();
Next, the Service layer. Declare the method in the service interface:
public interface IHotelService extends IService<Hotel> {
    Map<String, List<String>> filters();
}
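Implement the interface:
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

@Service
public class HotelService extends ServiceImpl<HotelMapper, Hotel> implements IHotelService {
    @Resource
    RestHighLevelClient client;

    @Override
    public Map<String, List<String>> filters() {
        try {
            SearchRequest request = new SearchRequest("hotel");
            // Add the aggregations
            buildAggregation(request);
            SearchResponse response = client.search(request, RequestOptions.DEFAULT);
            // All aggregation results
            Aggregations aggregations = response.getAggregations();
            // Brand options
            List<String> brandFilterList = getAggByName(aggregations, "brandAgg");
            Map<String, List<String>> filterMap = new HashMap<>();
            filterMap.put("品牌", brandFilterList);
            // City options
            List<String> cityFilterList = getAggByName(aggregations, "cityAgg");
            filterMap.put("城市", cityFilterList);
            // Star rating options
            List<String> starFilterList = getAggByName(aggregations, "starNameAgg");
            filterMap.put("星级", starFilterList);
            return filterMap;
        } catch (IOException e) {
            // Keep the original exception as the cause
            throw new RuntimeException(e);
        }
    }

    /**
     * Builds the aggregation part of the DSL
     * @param request the search request to add the aggregations to
     */
    private void buildAggregation(SearchRequest request) {
        // Only aggregation results are needed, not documents
        request.source().size(0);
        request.source().aggregation(AggregationBuilders
                .terms("brandAgg")
                .field("brand")
                .size(100));
        request.source().aggregation(AggregationBuilders
                .terms("cityAgg")
                .field("city")
                .size(100));
        request.source().aggregation(AggregationBuilders
                .terms("starNameAgg")
                .field("starName")
                .size(100));
    }

    /**
     * Parses a terms aggregation result
     * @param aggregations all aggregation results
     * @param name aggregation name
     * @return the bucket keys of that aggregation
     */
    private List<String> getAggByName(Aggregations aggregations, String name) {
        Terms terms = aggregations.get(name);
        List<? extends Terms.Bucket> buckets = terms.getBuckets();
        return buckets.stream()
                .map(Terms.Bucket::getKeyAsString)
                .collect(Collectors.toList());
    }
}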
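The client bean injected above is declared in the application class:
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class HotelDemoApplication {
    public static void main(String[] args) {
        SpringApplication.run(HotelDemoApplication.class, args);
    }

    @Bean
    public RestHighLevelClient client() {
        return new RestHighLevelClient(RestClient.builder(
                HttpHost.create("http://10.4.130.220:9200")
        ));
    }
}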
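A unit test to check the result:
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

import javax.annotation.Resource;
import java.util.List;
import java.util.Map;

@SpringBootTest
class HotelDemoApplicationTests {
    @Resource
    IHotelService hotelService;

    @Test
    void testFilter() {
        Map<String, List<String>> filters = hotelService.filters();
        System.out.println(filters);
    }
}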
Run the unit test. It prints the brand, city, and star rating options aggregated from the index.
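filters() repeats one pattern three times: read a terms aggregation by name, collect its bucket keys, and store them under a label. Below is a sketch of a table-driven variant. To keep it runnable without a cluster, the Elasticsearch response is replaced by a hypothetical Map from aggregation name to bucket keys. The English labels are placeholders; the service above uses 品牌, 城市, 星级. The sketch also uses LinkedHashMap, which keeps the labels in a fixed order, whereas HashMap makes no ordering guarantee.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Table-driven assembly of the filter map; the Elasticsearch response is stubbed as a plain Map.
class FilterMapSketch {
    // Aggregation name -> label sent to the front end; LinkedHashMap keeps this order
    static final Map<String, String> AGG_LABELS = new LinkedHashMap<>();

    static {
        AGG_LABELS.put("brandAgg", "brand");
        AGG_LABELS.put("cityAgg", "city");
        AGG_LABELS.put("starNameAgg", "starName");
    }

    // bucketKeysByAgg stands in for aggregations.get(name) followed by getAggByName
    static Map<String, List<String>> toFilterMap(Map<String, List<String>> bucketKeysByAgg) {
        Map<String, List<String>> filters = new LinkedHashMap<>();
        // A missing aggregation yields an empty option list instead of null
        AGG_LABELS.forEach((aggName, label) ->
                filters.put(label, bucketKeysByAgg.getOrDefault(aggName, List.of())));
        return filters;
    }

    public static void main(String[] args) {
        Map<String, List<String>> fakeResponse = Map.of(
                "brandAgg", List.of("Rujia", "Hilton"),
                "cityAgg", List.of("Shanghai", "Beijing"));
        System.out.println(toFilterMap(fakeResponse));
        // {brand=[Rujia, Hilton], city=[Shanghai, Beijing], starName=[]}
    }
}
```

With this layout, adding a new filter option only takes one more entry in the label table and one more terms aggregation in the request.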
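6. Aggregations with filter conditions
Same requirement as above, but the project's front end now sends parameters to the endpoint that returns the filter options. Without them, the options would be aggregated over every document in the index.
For example, after searching for Shanghai Hongqiao (上海虹桥), the city filter below would still list other cities. The aggregation therefore needs the same filter conditions as the search.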
- DTO that receives the request parameters:
import lombok.Data;

@Data
public class RequestParam {
    private String key;
    private Integer page;     // page number
    private Integer size;     // page size
    private String sortBy;
    private String brand;
    private String starName;
    private String city;
    private Integer minPrice;
    private Integer maxPrice;
    private String location;
}
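- Define the endpoint:
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.util.List;
import java.util.Map;

@RestController
@RequestMapping("/hotel")
public class HotelSearchController {
    @Resource
    IHotelService hotelService;

    // RequestParam here is the DTO above, not Spring's @RequestParam annotation
    @PostMapping("/filters")
    public Map<String, List<String>> getFiltersOption(@RequestBody RequestParam requestParam) {
        return hotelService.filters(requestParam);
    }
}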
- Service interface:
public interface IHotelService extends IService<Hotel> {
    Map<String, List<String>> filters(RequestParam requestParam);
}
- Implementation: add the parameter and the filter conditions.
The documents are filtered first and then aggregated:
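import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.functionscore.FunctionScoreQueryBuilder;
import org.elasticsearch.index.query.functionscore.ScoreFunctionBuilders;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Assumption: the original does not show where isEmpty comes from; Spring's StringUtils is one option
import static org.springframework.util.StringUtils.isEmpty;

@Service
public class HotelService extends ServiceImpl<HotelMapper, Hotel> implements IHotelService {
    @Resource
    RestHighLevelClient client;

    // Builds the query/filter part of the DSL
    private void buildBasicQuery(RequestParam requestParam, SearchRequest request) {
        // Bool query: the original query that produces the base relevance score
        BoolQueryBuilder booleanQuery = QueryBuilders.boolQuery();
        // Keyword
        String key = requestParam.getKey();
        if (!isEmpty(key)) {
            booleanQuery.must(QueryBuilders.matchQuery("all", key));
        } else {
            booleanQuery.must(QueryBuilders.matchAllQuery());
        }
        // City
        if (!isEmpty(requestParam.getCity())) {
            booleanQuery.filter(QueryBuilders.termQuery("city", requestParam.getCity()));
        }
        // Brand
        if (!isEmpty(requestParam.getBrand())) {
            booleanQuery.filter(QueryBuilders.termQuery("brand", requestParam.getBrand()));
        }
        // Star rating: the field is starName, the same field the aggregation uses
        if (!isEmpty(requestParam.getStarName())) {
            booleanQuery.filter(QueryBuilders.termQuery("starName", requestParam.getStarName()));
        }
        // Price range
        if (requestParam.getMaxPrice() != null && requestParam.getMinPrice() != null) {
            booleanQuery.filter(QueryBuilders.rangeQuery("price")
                    .gte(requestParam.getMinPrice())
                    .lte(requestParam.getMaxPrice()));
        }
        // function_score: adjust the relevance score
        FunctionScoreQueryBuilder functionScoreQuery = QueryBuilders.functionScoreQuery(
                // 1st argument: the original query, which yields the original relevance score
                booleanQuery,
                // 2nd argument: array of filter functions (only one here)
                new FunctionScoreQueryBuilder.FilterFunctionBuilder[]{
                        new FunctionScoreQueryBuilder.FilterFunctionBuilder(
                                // filter: only advertised hotels (isAD = true) get the extra weight
                                QueryBuilders.termQuery("isAD", true),
                                // score function: weight 10, multiplied into the original score (default boost_mode)
                                ScoreFunctionBuilders.weightFactorFunction(10)
                        )
                });
        request.source().query(functionScoreQuery);
    }

    @Override
    public Map<String, List<String>> filters(RequestParam requestParam) {
        try {
            SearchRequest request = new SearchRequest("hotel");
            // Compound query: apply the same filters as the search
            buildBasicQuery(requestParam, request);
            // Aggregate over the filtered documents
            buildAggregation(request);
            SearchResponse response = client.search(request, RequestOptions.DEFAULT);
            // All aggregation results
            Aggregations aggregations = response.getAggregations();
            // Brand options
            List<String> brandFilterList = getAggByName(aggregations, "brandAgg");
            Map<String, List<String>> filterMap = new HashMap<>();
            filterMap.put("品牌", brandFilterList);
            // City options
            List<String> cityFilterList = getAggByName(aggregations, "cityAgg");
            filterMap.put("城市", cityFilterList);
            // Star rating options
            List<String> starFilterList = getAggByName(aggregations, "starNameAgg");
            filterMap.put("星级", starFilterList);
            return filterMap;
        } catch (IOException e) {
            // Keep the original exception as the cause
            throw new RuntimeException(e);
        }
    }

    /**
     * Builds the aggregation part of the DSL
     * @param request the search request to add the aggregations to
     */
    private void buildAggregation(SearchRequest request) {
        // Only aggregation results are needed, not documents
        request.source().size(0);
        request.source().aggregation(AggregationBuilders
                .terms("brandAgg")
                .field("brand")
                .size(100));
        request.source().aggregation(AggregationBuilders
                .terms("cityAgg")
                .field("city")
                .size(100));
        request.source().aggregation(AggregationBuilders
                .terms("starNameAgg")
                .field("starName")
                .size(100));
    }

    /**
     * Parses a terms aggregation result
     * @param aggregations all aggregation results
     * @param name aggregation name
     * @return the bucket keys of that aggregation
     */
    private List<String> getAggByName(Aggregations aggregations, String name) {
        Terms terms = aggregations.get(name);
        List<? extends Terms.Bucket> buckets = terms.getBuckets();
        return buckets.stream()
                .map(Terms.Bucket::getKeyAsString)
                .collect(Collectors.toList());
    }
}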
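Checking the result again, the returned filter options now cover only the hotels that match the current search.
Finally, although the Java code is long, the DSL it sends is simple. It sets "size": 0 and uses a function_score query that wraps the bool query built from the request parameters. It also defines three terms aggregations (brandAgg, cityAgg, starNameAgg) on brand, city, and starName, each with "size": 100.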
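buildBasicQuery adds a clause only when the corresponding parameter is set, and the aggregations then run over whatever matches. The sketch below applies the same idea in memory: skip empty parameters, filter, then aggregate. It is not Elasticsearch code, and Hotel, Params, and the sample data are hypothetical. The function_score part is left out on purpose: it changes scores, not which documents match, so it does not affect the aggregation results.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// In-memory analogue of "bool filter, then terms aggregation"; not Elasticsearch code.
class FilteredFacetsSketch {
    // Hypothetical document type
    static final class Hotel {
        final String city;
        final String brand;
        final String starName;
        final int price;

        Hotel(String city, String brand, String starName, int price) {
            this.city = city;
            this.brand = brand;
            this.starName = starName;
            this.price = price;
        }
    }

    // Hypothetical request parameters (subset of RequestParam)
    static final class Params {
        String city;
        String brand;
        String starName;
        Integer minPrice;
        Integer maxPrice;
    }

    static boolean isEmpty(String s) {
        return s == null || s.isEmpty();
    }

    // Like buildBasicQuery: a condition is added only when its parameter is present
    static Predicate<Hotel> buildFilter(Params p) {
        Predicate<Hotel> filter = h -> true; // nothing set: every document matches
        if (!isEmpty(p.city)) filter = filter.and(h -> p.city.equals(h.city));
        if (!isEmpty(p.brand)) filter = filter.and(h -> p.brand.equals(h.brand));
        if (!isEmpty(p.starName)) filter = filter.and(h -> p.starName.equals(h.starName));
        if (p.minPrice != null && p.maxPrice != null) {
            filter = filter.and(h -> h.price >= p.minPrice && h.price <= p.maxPrice);
        }
        return filter;
    }

    // Filter first, then build a terms-style facet: keys sorted by count desc, then by key
    static List<String> facet(List<Hotel> hotels, Params p, Function<Hotel, String> field) {
        Map<String, Long> counts = hotels.stream()
                .filter(buildFilter(p))
                .collect(Collectors.groupingBy(field, Collectors.counting()));
        return counts.entrySet().stream()
                .sorted(Map.Entry.<String, Long>comparingByValue().reversed()
                        .thenComparing(Map.Entry.<String, Long>comparingByKey()))
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Hotel> hotels = List.of(
                new Hotel("Shanghai", "Rujia", "2-star", 180),
                new Hotel("Shanghai", "Hilton", "5-star", 900),
                new Hotel("Beijing", "Rujia", "2-star", 200));
        Params p = new Params();
        p.city = "Shanghai";
        // With the city filter applied, Beijing no longer appears as a city option
        System.out.println(facet(hotels, p, h -> h.city));  // [Shanghai]
        System.out.println(facet(hotels, p, h -> h.brand)); // [Hilton, Rujia]
    }
}
```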