Elasticsearch (Part 3)
Data Aggregation
Types of aggregations
Docs: https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations.html
Aggregations let you summarize, analyze, and compute over document data. There are three common categories:
- Bucket aggregations: group documents into buckets
  - TermAggregation: group by a field's value
  - Date Histogram: group by date intervals, e.g. one bucket per week or per month
- Metrics aggregations: compute values such as max, min, and avg
  - Avg: average
  - Max: maximum
  - Min: minimum
  - Stats: max, min, avg, sum, etc. at once
- Pipeline aggregations: aggregate over the results of other aggregations (no DSL example follows, so see the Java sketch after this list)
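The sections below walk through bucket and metrics aggregations in the DSL but never show a pipeline aggregation, so here is a minimal hedged sketch using the Java high-level REST client introduced later in these notes. The names brandAgg, scoreAgg, and avgScoreAcrossBrands are illustrative, and request is a SearchRequest as in the RestAPI section; the sketch computes each brand's average score, then averages those averages across all brand buckets with an avg_bucket pipeline aggregation:
// Per-brand buckets, each with an avg metric inside...
request.source().size(0);
request.source().aggregation(AggregationBuilders
        .terms("brandAgg")
        .field("brand")
        .size(20)
        .subAggregation(AggregationBuilders.avg("scoreAgg").field("score")));
// ...and a sibling pipeline aggregation over the buckets path brandAgg>scoreAgg
request.source().aggregation(PipelineAggregatorBuilders
        .avgBucket("avgScoreAcrossBrands", "brandAgg>scoreAgg"));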
Implementing bucket aggregations in the DSL
Suppose we want to know how many hotel brands appear in the data. We can aggregate on the brand name; since we are grouping by an exact field value, the aggregation type is terms. Example DSL:
GET /hotel/_search
{
  "size": 0, // size 0: the result contains no documents, only aggregation results
  "aggs": { // define the aggregations
    "brandAgg": { // give the aggregation a name
      "terms": { // aggregation type: terms, since we group by the brand value
        "field": "brand", // the field to aggregate on
        "size": 20 // how many buckets to return
      }
    }
  }
}
Bucket aggregations: sorting the results
By default, a bucket aggregation counts the documents in each bucket (reported as _count) and sorts the buckets by _count in descending order.
We can change the sort order:
GET /hotel/_search
{
  "size": 0,
  "aggs": {
    "brandAgg": {
      "terms": {
        "field": "brand",
        "size": 20,
        "order": {
          "_count": "asc" // sort by _count ascending
        }
      }
    }
  }
}
Bucket aggregations: limiting the scope
By default, a bucket aggregation runs over every document in the index. To restrict which documents are aggregated, just add a query clause:
GET /hotel/_search
{
  "query": {
    "range": {
      "price": {
        "lte": 200 // only aggregate documents priced at 200 or less
      }
    }
  },
  "size": 0,
  "aggs": {
    "brandAgg": {
      "terms": {
        "field": "brand",
        "size": 20
      }
    }
  }
}
Implementing metrics aggregations in the DSL
For example, suppose we want the min, max, and avg of the user score for each brand.
We can nest a stats aggregation inside the terms aggregation:
GET /hotel/_search
{
  "size": 0,
  "aggs": {
    "brandAgg": {
      "terms": {
        "field": "brand",
        "size": 20
      },
      "aggs": {
        "scoreAgg": {
          "stats": {
            "field": "score"
          }
        }
      }
    }
  }
}
Sorting the buckets by the average score:
GET /hotel/_search
{
  "size": 0,
  "aggs": {
    "brandAgg": {
      "terms": {
        "field": "brand",
        "size": 20,
        "order": {
          "scoreAgg.avg": "desc" // sort by the avg computed by the nested scoreAgg
        }
      },
      "aggs": {
        "scoreAgg": {
          "stats": {
            "field": "score"
          }
        }
      }
    }
  }
}
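The same ordered, nested aggregation can also be built with the Java client used in the next section. A hedged sketch, where the names mirror the DSL above and BucketOrder comes from org.elasticsearch.search.aggregations:
// terms on brand, ordered by the avg produced by the nested stats aggregation
request.source().size(0);
request.source().aggregation(AggregationBuilders
        .terms("brandAgg")
        .field("brand")
        .size(20)
        .order(BucketOrder.aggregation("scoreAgg.avg", false)) // false = descending
        .subAggregation(AggregationBuilders.stats("scoreAgg").field("score")));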
Implementing aggregations with the RestAPI
Parsing aggregation results
Case study: define a method in IUserService that aggregates hotels by brand, city, and star rating.
Declare the interface method:
Map<String, List<String>> filters();
Implementation:
@Override
public Map<String, List<String>> filters() {
    // 1. Prepare the request
    SearchRequest request = new SearchRequest("hotel");
    // 2. Prepare the DSL: no documents, three terms aggregations
    request.source().size(0);
    request.source().aggregation(AggregationBuilders
            .terms("brandAgg")
            .field("brand")
            .size(100));
    request.source().aggregation(AggregationBuilders
            .terms("cityAgg")
            .field("city")
            .size(100));
    request.source().aggregation(AggregationBuilders
            .terms("starAgg")
            .field("starName")
            .size(100));
    // 3. Send the request
    SearchResponse search;
    Map<String, List<String>> map = new HashMap<>();
    try {
        search = client.search(request, RequestOptions.DEFAULT);
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
    // 4. Parse each aggregation into a list of bucket keys;
    //    the map keys are the filter labels shown by the front end
    List<String> brandList = getAggByName(search, "brandAgg");
    map.put("品牌", brandList);
    List<String> cityList = getAggByName(search, "cityAgg");
    map.put("城市", cityList);
    List<String> starList = getAggByName(search, "starAgg");
    map.put("星级", starList);
    return map;
}
private static List<String> getAggByName(SearchResponse search, String aggName) {
    // Pull the named terms aggregation out of the response and collect the bucket keys
    Aggregations aggregations = search.getAggregations();
    Terms terms = aggregations.get(aggName);
    List<? extends Terms.Bucket> buckets = terms.getBuckets();
    List<String> list = new ArrayList<>();
    for (Terms.Bucket bucket : buckets) {
        list.add(bucket.getKeyAsString());
    }
    return list;
}
Hooking up the front end
The front end passes along the user's current search conditions, and the aggregations must respect them. The filters method therefore takes the RequestParams and applies the same query (buildBasicQuery) before aggregating:
@Override
public Map<String, List<String>> filters(RequestParams params) {
    // 1. Prepare the request
    SearchRequest request = new SearchRequest("hotel");
    // 2. Apply the same query conditions as the search page, then the aggregations
    buildBasicQuery(params, request);
    request.source().size(0);
    buildAggregation(request);
    // 3. Send the request
    SearchResponse search;
    Map<String, List<String>> map = new HashMap<>();
    try {
        search = client.search(request, RequestOptions.DEFAULT);
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
    // 4. Parse the results
    List<String> brandList = getAggByName(search, "brandAgg");
    map.put("品牌", brandList);
    List<String> cityList = getAggByName(search, "cityAgg");
    map.put("城市", cityList);
    List<String> starList = getAggByName(search, "starAgg");
    map.put("星级", starList);
    return map;
}
private static void buildAggregation(SearchRequest request) {
    request.source().aggregation(AggregationBuilders
            .terms("brandAgg")
            .field("brand")
            .size(100));
    request.source().aggregation(AggregationBuilders
            .terms("cityAgg")
            .field("city")
            .size(100));
    request.source().aggregation(AggregationBuilders
            .terms("starAgg")
            .field("starName")
            .size(100));
}
// getAggByName is identical to the version shown in the previous section
private static void buildBasicQuery(RequestParams params, SearchRequest request) {
    // 1. Build the bool query
    BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
    // Keyword search
    String key = params.getKey();
    if (StringUtils.isBlank(key)) {
        boolQuery.must(QueryBuilders.matchAllQuery());
    } else {
        boolQuery.must(QueryBuilders.matchQuery("all", key));
    }
    // City filter
    if (params.getCity() != null && !params.getCity().equals("")) {
        boolQuery.filter(QueryBuilders.termQuery("city", params.getCity()));
    }
    // Brand filter
    if (params.getBrand() != null && !params.getBrand().equals("")) {
        boolQuery.filter(QueryBuilders.termQuery("brand", params.getBrand()));
    }
    // Star-rating filter
    if (params.getStarName() != null && !params.getStarName().equals("")) {
        boolQuery.filter(QueryBuilders.termQuery("starName", params.getStarName()));
    }
    // Price range filter
    if (params.getMinPrice() != null && params.getMaxPrice() != null) {
        boolQuery.filter(QueryBuilders.rangeQuery("price").gte(params.getMinPrice()).lte(params.getMaxPrice()));
    }
    // 2. Scoring: boost advertised hotels (isAD = true) with a weight of 10
    FunctionScoreQueryBuilder functionScoreQueryBuilder = QueryBuilders
            .functionScoreQuery(boolQuery, new FunctionScoreQueryBuilder.FilterFunctionBuilder[]{
                    new FunctionScoreQueryBuilder.FilterFunctionBuilder(
                            QueryBuilders.termQuery("isAD", true),
                            ScoreFunctionBuilders.weightFactorFunction(10)
                    )
            });
    request.source().query(functionScoreQueryBuilder);
}
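For completeness, a hedged sketch of the controller endpoint the search page would call; the path, parameter style, and hotelService field are assumptions, not from the original notes:
// Hypothetical endpoint exposing the filter aggregations to the front end
@PostMapping("/hotel/filters")
public Map<String, List<String>> getFilters(@RequestBody RequestParams params) {
    return hotelService.filters(params);
}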
Autocomplete
Using the pinyin analyzer
Project page: https://github.com/medcl/elasticsearch-analysis-pinyin
- Unzip the plugin
- Upload it to elasticsearch's plugin directory on the virtual machine
- Restart es
- Test it (see the sketch below)
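One way to run the test step from Java, a hedged sketch using the high-level REST client's analyze API (the sample text is arbitrary; you can equally run POST /_analyze from DevTools):
// Run the pinyin analyzer over sample text and print the produced tokens
AnalyzeRequest analyzeRequest = AnalyzeRequest.withGlobalAnalyzer("pinyin", "如家酒店还不错");
AnalyzeResponse analyzeResponse = client.indices().analyze(analyzeRequest, RequestOptions.DEFAULT);
for (AnalyzeResponse.AnalyzeToken token : analyzeResponse.getTokens()) {
    System.out.println(token.getTerm());
}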
Custom analyzers
An analyzer in elasticsearch has three parts:
- character filters: process the text before the tokenizer, e.g. removing or replacing characters
- tokenizer: splits the text into terms according to some rule, e.g. keyword (no splitting) or ik_smart
- token filters: post-process the terms produced by the tokenizer, e.g. lowercasing, synonyms, pinyin conversion
A custom analyzer can be configured in the settings when creating an index:
// Custom pinyin analyzer
PUT /test
{
  "settings": {
    "analysis": {
      "analyzer": {
        "my_analyzer": {
          "tokenizer": "ik_max_word",
          "filter": "py"
        }
      },
      "filter": {
        "py": {
          "type": "pinyin",
          "keep_full_pinyin": false,
          "keep_joined_full_pinyin": true,
          "keep_original": true,
          "limit_first_letter_length": 16,
          "remove_duplicated_term": true,
          "none_chinese_pinyin_tokenize": false
        }
      }
    }
  }
}
The pinyin analyzer is suitable at index time, but not at search time: if the search input were also converted to pinyin, homophones would match each other (a search for 狮子 would also hit documents containing 虱子, since both produce "shizi").
So the field should be indexed with my_analyzer but searched with ik_smart:
PUT /test
{
  "settings": {
    "analysis": {
      "analyzer": {
        "my_analyzer": {
          "tokenizer": "ik_max_word",
          "filter": "py"
        }
      },
      "filter": {
        "py": {
          "type": "pinyin",
          "keep_full_pinyin": false,
          "keep_joined_full_pinyin": true,
          "keep_original": true,
          "limit_first_letter_length": 16,
          "remove_duplicated_term": true,
          "none_chinese_pinyin_tokenize": false
        }
      }
    }
  },
  "mappings": {
    "properties": {
      "name": {
        "type": "text",
        "analyzer": "my_analyzer",
        "search_analyzer": "ik_smart"
      }
    }
  }
}
Completion suggester queries
es provides the Completion Suggester query for autocomplete. It matches and returns terms that start with what the user has typed. To keep these lookups efficient, there are constraints on the field:
- the field queried for completion must be of type completion
- the field content is typically an array of the terms to complete against, e.g. ["如家", "三亚"]
Query syntax:
GET /test2/_search
{
  "suggest": {
    "titleSuggest": {
      "text": "s",
      "completion": {
        "field": "title",
        "skip_duplicates": true,
        "size": 10
      }
    }
  }
}
Case study: add autocomplete and pinyin search to the hotel index.
1. Recreate the hotel index with the custom pinyin analyzers
2. Switch the name and all fields of the index to the custom analyzer
3. Add a new suggestion field of type completion, using the completion analyzer
4. Add a suggestion field to the HotelDoc class, populated from brand and business (see the sketch after the mapping below)
5. Re-import the data into the hotel index
PUT /hotel
{
  "settings": {
    "analysis": {
      "analyzer": {
        "text_anlyzer": {
          "tokenizer": "ik_max_word",
          "filter": "py"
        },
        "completion_analyzer": {
          "tokenizer": "keyword",
          "filter": "py"
        }
      },
      "filter": {
        "py": {
          "type": "pinyin",
          "keep_full_pinyin": false,
          "keep_joined_full_pinyin": true,
          "keep_original": true,
          "limit_first_letter_length": 16,
          "remove_duplicated_term": true,
          "none_chinese_pinyin_tokenize": false
        }
      }
    }
  },
  "mappings": {
    "properties": {
      "id": {
        "type": "keyword"
      },
      "name": {
        "type": "text",
        "analyzer": "text_anlyzer",
        "search_analyzer": "ik_smart",
        "copy_to": "all"
      },
      "address": {
        "type": "keyword",
        "index": false
      },
      "price": {
        "type": "integer"
      },
      "score": {
        "type": "integer"
      },
      "brand": {
        "type": "keyword",
        "copy_to": "all"
      },
      "city": {
        "type": "keyword"
      },
      "starName": {
        "type": "keyword"
      },
      "business": {
        "type": "keyword",
        "copy_to": "all"
      },
      "location": {
        "type": "geo_point"
      },
      "pic": {
        "type": "keyword",
        "index": false
      },
      "all": {
        "type": "text",
        "analyzer": "text_anlyzer",
        "search_analyzer": "ik_smart"
      },
      "suggestion": {
        "type": "completion",
        "analyzer": "completion_analyzer"
      }
    }
  }
}
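Step 4 (the HotelDoc change) is not shown above; a hedged sketch of what it might look like, assuming business can hold several district names separated by 、:
// In HotelDoc: the source terms for the completion field
private List<String> suggestion;

// In the HotelDoc(Hotel hotel) constructor, after copying the other fields:
if (this.business != null && this.business.contains("、")) {
    // Split multi-district values so each district becomes its own completion term
    String[] arr = this.business.split("、");
    this.suggestion = new ArrayList<>(Arrays.asList(arr));
    this.suggestion.add(this.brand);
} else {
    this.suggestion = Arrays.asList(this.brand, this.business);
}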
Implementing autocomplete with the RestAPI
The request is built with SuggestBuilders, and the suggestions are parsed out of the Suggest section of the response.
Case study: autocomplete for the search box on the hotel search page.
@Override
public List<String> getSuggestions(String prefix) {
    // 1. Prepare the request: a completion suggestion on the suggestion field
    SearchRequest request = new SearchRequest("hotel");
    request.source().suggest(new SuggestBuilder().addSuggestion(
            "suggestions",
            SuggestBuilders.completionSuggestion("suggestion")
                    .prefix(prefix)
                    .skipDuplicates(true)
                    .size(10)
    ));
    // 2. Send the request
    SearchResponse search;
    try {
        search = client.search(request, RequestOptions.DEFAULT);
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
    // 3. Parse the options of the "suggestions" entry
    Suggest suggest = search.getSuggest();
    CompletionSuggestion suggestions = suggest.getSuggestion("suggestions");
    List<String> list = new ArrayList<>();
    for (CompletionSuggestion.Entry.Option option : suggestions.getOptions()) {
        list.add(option.getText().string());
    }
    return list;
}
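A hedged sketch of the controller endpoint the input box would call (the path and parameter name are assumptions):
// Hypothetical endpoint: GET /hotel/suggestion?key=sh
@GetMapping("/hotel/suggestion")
public List<String> getSuggestions(@RequestParam("key") String prefix) {
    return hotelService.getSuggestions(prefix);
}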
Data Synchronization
The data sync problem
The hotel data in es comes from a mysql database, so whenever the mysql data changes, es must change with it; this is data synchronization.
Option 1: synchronous calls
- Pros: simple and direct to implement
- Cons: tight coupling between services
Option 2: asynchronous notification via MQ
- Pros: low coupling, moderate implementation effort
- Cons: depends on the reliability of the mq
Option 3: listening to the binlog
- Pros: completely decouples the services
- Cons: enabling the binlog adds load on the database, and the implementation is more complex
Case study: use MQ to synchronize mysql and es.
Define constants for the exchange, queues, and routing keys. Note the fields must be public (not private, as originally written), otherwise MqConfig and the listener cannot reference them:
package cn.itcast.hotel.constants;

/**
 * @author xc
 * @date 2023/5/12 13:06
 */
public class MqConstants {
    /**
     * Exchange name
     */
    public final static String HOTEL_EXCHANGE = "hotel.topic";
    /**
     * Queue for insert and update messages
     */
    public final static String HOTEL_INSERT_QUEUE = "hotel.insert.queue";
    /**
     * Queue for delete messages
     */
    public final static String HOTEL_DELETE_QUEUE = "hotel.delete.queue";
    /**
     * RoutingKey for inserts and updates
     */
    public final static String HOTEL_INSERT_KEY = "hotel.insert";
    /**
     * RoutingKey for deletes
     */
    public final static String HOTEL_DELETE_KEY = "hotel.delete";
}
Declare the exchange, queues, and bindings:
package cn.itcast.hotel.config;

import cn.itcast.hotel.constants.MqConstants;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author xc
 * @date 2023/5/12 13:30
 */
@Configuration
public class MqConfig {
    @Bean
    public TopicExchange topicExchange() {
        // durable exchange, not auto-deleted
        return new TopicExchange(MqConstants.HOTEL_EXCHANGE, true, false);
    }

    @Bean
    public Queue insertQueue() {
        return new Queue(MqConstants.HOTEL_INSERT_QUEUE, true);
    }

    @Bean
    public Queue deleteQueue() {
        return new Queue(MqConstants.HOTEL_DELETE_QUEUE, true);
    }

    @Bean
    public Binding insertQueueBinding() {
        return BindingBuilder.bind(insertQueue()).to(topicExchange()).with(MqConstants.HOTEL_INSERT_KEY);
    }

    @Bean
    public Binding deleteQueueBinding() {
        return BindingBuilder.bind(deleteQueue()).to(topicExchange()).with(MqConstants.HOTEL_DELETE_KEY);
    }
}
Publishing messages (in the hotel admin controller; only the hotel id is sent):
@PostMapping
public void saveHotel(@RequestBody Hotel hotel) {
    hotelService.save(hotel);
    // After writing to mysql, notify the search service with the hotel id
    rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE, MqConstants.HOTEL_INSERT_KEY, hotel.getId());
}

@PutMapping
public void updateById(@RequestBody Hotel hotel) {
    if (hotel.getId() == null) {
        throw new InvalidParameterException("id不能为空");
    }
    hotelService.updateById(hotel);
    rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE, MqConstants.HOTEL_INSERT_KEY, hotel.getId());
}

@DeleteMapping("/{id}")
public void deleteById(@PathVariable("id") Long id) {
    hotelService.removeById(id);
    rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE, MqConstants.HOTEL_DELETE_KEY, id);
}
Consuming messages (service methods in the search service that update es):
@Override
public void deleteById(Long id) {
    // Remove the document from the hotel index
    DeleteRequest request = new DeleteRequest("hotel", id.toString());
    try {
        client.delete(request, RequestOptions.DEFAULT);
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}

@Override
public void insertById(Long id) {
    // Query mysql for the latest data, then (re)index the document
    Hotel hotel = getById(id);
    HotelDoc hotelDoc = new HotelDoc(hotel);
    IndexRequest request = new IndexRequest("hotel").id(hotel.getId().toString());
    request.source(JSON.toJSONString(hotelDoc), XContentType.JSON);
    try {
        client.index(request, RequestOptions.DEFAULT);
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}
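The notes show the service methods but not the MQ listener that invokes them; a hedged sketch (class, package, and interface names are illustrative):
// Hypothetical listener: consumes hotel ids from the two queues and calls the service
@Component
public class HotelListener {

    @Autowired
    private IHotelService hotelService;

    @RabbitListener(queues = MqConstants.HOTEL_INSERT_QUEUE)
    public void listenHotelInsertOrUpdate(Long id) {
        hotelService.insertById(id);
    }

    @RabbitListener(queues = MqConstants.HOTEL_DELETE_QUEUE)
    public void listenHotelDelete(Long id) {
        hotelService.deleteById(id);
    }
}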
Clusters
ES cluster structure
A single elasticsearch node used for data storage inevitably faces two problems: storing massive data, and being a single point of failure.
- Massive data: split the index logically into N shards stored across multiple nodes
- Single point of failure: replicate each shard's data onto different nodes
Deploying an es cluster
We will run several es instances in docker containers on one machine to simulate a cluster. In production, deploy at most one es instance per server node.
The cluster can be deployed with docker-compose, which requires the Linux VM to have at least 4GB of memory.
1. Create the es cluster
First write a docker-compose file:
version: '2.2'
services:
  es01:
    image: elasticsearch:7.12.1
    container_name: es01
    environment:
      - node.name=es01
      - cluster.name=es-docker-cluster
      - discovery.seed_hosts=es02,es03
      - cluster.initial_master_nodes=es01,es02,es03
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    volumes:
      - data01:/usr/share/elasticsearch/data
    ports:
      - 9200:9200
    networks:
      - elastic
  es02:
    image: elasticsearch:7.12.1
    container_name: es02
    environment:
      - node.name=es02
      - cluster.name=es-docker-cluster
      - discovery.seed_hosts=es01,es03
      - cluster.initial_master_nodes=es01,es02,es03
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    volumes:
      - data02:/usr/share/elasticsearch/data
    ports:
      - 9201:9200
    networks:
      - elastic
  es03:
    image: elasticsearch:7.12.1
    container_name: es03
    environment:
      - node.name=es03
      - cluster.name=es-docker-cluster
      - discovery.seed_hosts=es01,es02
      - cluster.initial_master_nodes=es01,es02,es03
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    volumes:
      - data03:/usr/share/elasticsearch/data
    networks:
      - elastic
    ports:
      - 9202:9200
volumes:
  data01:
    driver: local
  data02:
    driver: local
  data03:
    driver: local
networks:
  elastic:
    driver: bridge
Running es requires raising a Linux kernel limit. Edit /etc/sysctl.conf:
vi /etc/sysctl.conf
Add the following line:
vm.max_map_count=262144
Then apply the change:
sysctl -p
Start the cluster with docker-compose:
docker-compose up -d
2. Monitoring cluster state
kibana can monitor an es cluster, but newer versions depend on es's x-pack feature and are relatively complex to configure.
Instead, we recommend cerebro: https://github.com/lmenezes/cerebro
Open http://localhost:9000 to reach the management UI.
Enter the address and port of any node in your elasticsearch cluster and click connect.
A green status bar means the cluster status is green (healthy).
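Besides cerebro, cluster health can also be checked programmatically; a hedged sketch with the Java high-level REST client (any node's address works for the client):
// Query cluster health; the status is GREEN, YELLOW, or RED
ClusterHealthRequest healthRequest = new ClusterHealthRequest();
ClusterHealthResponse health = client.cluster().health(healthRequest, RequestOptions.DEFAULT);
System.out.println(health.getStatus());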
3. Create an index
1) With kibana's DevTools
Enter in DevTools:
PUT /itcast
{
  "settings": {
    "number_of_shards": 3, // number of primary shards
    "number_of_replicas": 1 // number of replicas per shard
  },
  "mappings": {
    "properties": {
      // mapping definitions ...
    }
  }
}
2) With cerebro
cerebro can also create an index: fill in the index settings and click the create button in the bottom-right corner.
4. Check the shard layout
Back on the cerebro home page you can see how the index's shards are distributed across the nodes.
ES cluster node roles
Nodes in an es cluster can take on different roles:
- master eligible: can be elected master; the master manages cluster state, index creation and deletion, and shard allocation
- data: stores data and performs search, aggregation, and CRUD operations
- ingest: pre-processes documents before indexing
- coordinating: routes requests to other nodes, then merges and returns the results (every node acts as a coordinating node)
Each role puts different demands on hardware, so for production clusters it is recommended to give each node a single dedicated role.
ES cluster split brain
By default every node is master eligible, so when the master goes down the remaining candidates elect a new one. But if the master merely loses network contact with the other nodes, both partitions may end up with their own master: a split brain.
To avoid this, a candidate must win more than (number of eligible nodes + 1) / 2 votes to become master, so the number of eligible nodes should be odd. For example, with 3 eligible nodes a master needs at least 2 votes, which at most one partition can gather. Since es 7.0 this quorum is managed automatically.
Distributed storage in an ES cluster
New documents should be spread across the shards to keep the data balanced. So how does the coordinating node decide which shard a document goes to?
es computes the target shard with a hash:
shard = hash(_routing) % number_of_shards
Notes:
- _routing defaults to the document id
- the formula depends on the shard count, so the number of shards cannot be changed once the index is created
Failover in an ES cluster
The master monitors the state of the nodes in the cluster. If it detects that a node has gone down, it immediately migrates that node's shard data to other nodes to keep the data safe. This is called failover.