Spring Cloud学习(十一)【深入Elasticsearch 分布式搜索引擎03】

news2025/1/16 13:54:14

文章目录

  • 数据聚合
    • 聚合的种类
    • DSL实现聚合
    • RestAPI实现聚合
  • 自动补全
    • 拼音分词器
    • 自定义分词器
    • 自动补全查询
      • completion suggester查询
      • RestAPI实现自动补全
  • 数据同步
    • 数据同步思路分析
    • 实现elasticsearch与数据库数据同步
  • 集群
    • 搭建ES集群
      • 创建es集群
      • 集群状态监控
      • 创建索引库
        • 1)利用kibana的DevTools创建索引库
        • 2)利用cerebro创建索引库
      • 查看分片效果
    • ES集群的节点角色
    • 集群脑裂问题
    • 集群分布式存储
    • 集群分布式查询
    • 集群故障转移


数据聚合

聚合的种类

聚合(aggregations)可以实现对文档数据的统计、分析、运算。聚合常见的有三类:

  • 桶(Bucket)聚合:用来对文档做分组

    • TermAggregation:按照文档字段值分组
    • Date Histogram:按照日期阶梯分组,例如一周为一组,或者一月为一组
  • 度量(Metric)聚合:用以计算一些值,比如:最大值、最小值、平均值等

    • Avg:求平均值
    • Max:求最大值
    • Min:求最小值
    • Stats:同时求max、min、avg、sum等
  • 管道(pipeline)聚合:其它聚合的结果为基础做聚合

可以类比mysql数据库,(桶=》group by 分组,度量=》聚合函数,管道=》)

参与聚合的字段类型必须是:

  • keyword
  • 数值
  • 日期
  • 布尔

DSL实现聚合

DSL实现Bucket聚合

现在,我们要统计所有数据中的酒店品牌有几种,此时可以根据酒店品牌的名称做聚合。
类型为 term 类型,DSL示例:

GET /hotel/_search
{
  "size": 0,  // 设置size为0,结果中不包含文档,只包含聚合结果
  "aggs": { // 定义聚合
    "brandAgg": { //给聚合起个名字
      "terms": { // 聚合的类型,按照品牌值聚合,所以选择term
        "field": "brand", // 参与聚合的字段
        "size": 20 // 希望获取的聚合结果数量
      }
    }
  }
}

Bucket聚合-聚合结果排序

默认情况下,Bucket 聚合会统计 Bucket 内的文档数量,记为 _count,并且按照 _count 降序排序。
我们可以修改结果排序方式:

GET /hotel/_search
{
  "size": 0, 
  "aggs": {
    "brandAgg": {
      "terms": {
        "field": "brand",
        "order": {
          "_count": "asc" // 按照_count升序排列
        },
        "size": 20
      }
    }
  }
}

Bucket聚合-限定聚合范围

默认情况下,Bucket聚合是对索引库的所有文档做聚合,我们可以限定要聚合的文档范围,只要添加 query 条件即可:

GET /hotel/_search
{
  "query": {
    "range": {
      "price": {
        "lte": 200 // 只对200元以下的文档聚合
      }
    }
  }, 
  "size": 0, 
  "aggs": {
    "brandAgg": {
      "terms": {
        "field": "brand",
        "size": 20
      }
    }
  }
}

aggs代表聚合,与query同级,此时query的作用是?

  • 限定聚合的的文档范围

聚合必须的三要素:

  • 聚合名称
  • 聚合类型
  • 聚合字段

聚合可配置属性有:

  • size:指定聚合结果数量
  • order:指定聚合结果排序方式
  • field:指定聚合字段

DSL实现Metrics 聚合

例如,我们要求获取每个品牌的用户评分的 min、max、avg 等值.
我们可以利用 stats 聚合:

GET /hotel/_search
{
  "size": 0, 
  "aggs": {
    "brandAgg": { 
      "terms": { 
        "field": "brand", 
        "size": 20
      },
      "aggs": { // 是brands聚合的子聚合,也就是分组后对每组分别计算
        "score_stats": { // 聚合名称
          "stats": { // 聚合类型,这里stats可以计算min、max、avg等
            "field": "score" // 聚合字段,这里是score
          }
        }
      }
    }
  }
}

RestAPI实现聚合

我们以品牌聚合为例,演示下 JavaRestClient 使用,先看请求组装:

在这里插入图片描述

再看下聚合结果解析

在这里插入图片描述

在IUserService中定义方法,实现对品牌、城市、星级的聚合

需求:搜索页面的品牌、城市等信息不应该是在页面写死,而是通过聚合索引库中的酒店数据得来的:

在这里插入图片描述

在IUserService中定义一个方法,实现对品牌、城市、星级的聚合,方法声明如下:

在这里插入图片描述

对接前端接口

前端页面会向服务端发起请求,查询品牌、城市、星级等字段的聚合结果:
在这里插入图片描述

可以看到请求参数与之前search时的RequestParam完全一致,这是在限定聚合时的文档范围。
例如:用户搜索“外滩”,价格在300~600,那聚合必须是在这个搜索条件基础上完成。
因此我们需要:

  1. 编写controller接口,接收该请求
  2. 修改IUserService#getFilters()方法,添加RequestParam参数
  3. 修改getFilters方法的业务,聚合时添加query条件
@Test
void testAggregation() throws IOException {
    // 1. 准备Request
    SearchRequest request = new SearchRequest("hotel");
    // 2. 准备DSL
    // 2.1 设置size
    request.source().size(0);
    // 2.2 聚合
    request.source().aggregation(AggregationBuilders
            .terms("brandAgg")
            .field("brand")
            .size(10)
    );
    // 3. 发出请求
    SearchResponse response = client.search(request, RequestOptions.DEFAULT);
    // 4. 解析结果
    Aggregations aggregations = response.getAggregations();
    // 4.1 根据聚合名称获取聚合结果
    Terms brandTerms = aggregations.get("brandAgg");
    // 4.2 获取 buckets
    List<? extends Terms.Bucket> buckets = brandTerms.getBuckets();
    for (Terms.Bucket bucket : buckets) {
        String key = bucket.getKeyAsString();
        System.out.println(key);
    }
}

Controller

@PostMapping("filters")
public Map<String, List<String>> getFilters(@RequestBody RequestParams params){
    return hotelService.filters(params);
}

Service接口

@Override
public Map<String, List<String>> filters(RequestParams params) {
    try {
        // 1. 准备Request
        SearchRequest request = new SearchRequest("hotel");
        // 2. 准备DSL
        // 2.1 query
        buildBasicQuery(params, request);
        // 2.2 设置size
        request.source().size(0);
        // 2.3 聚合
        buildAggregation(request);
        // 3. 发出请求
        SearchResponse response = client.search(request, RequestOptions.DEFAULT);
        // 4. 解析结果
        Map<String, List<String>> result = new HashMap<>();
        Aggregations aggregations = response.getAggregations();
        // 4.1 根据品牌名称,获取品牌结果
        List<String> brandList = getAggByName(aggregations, "brandAgg");
        // 4.2 根据城市名称,获取城市结果
        List<String> cityList = getAggByName(aggregations, "cityAgg");
        // 4.3 根据星级名称,获取星级结果
        List<String> starList = getAggByName(aggregations, "starAgg");

        // 4.4 放入map
        result.put("品牌", brandList);
        result.put("城市", cityList);
        result.put("星级", starList);
        return result;
    } catch (IOException e) {
        throw new RuntimeException(e);
    } 
}

private List<String> getAggByName(Aggregations aggregations, String aggName) {
    // 4.1 根据聚合名称获取聚合结果
    Terms brandTerms = aggregations.get(aggName);
    // 4.2 获取 buckets
    List<? extends Terms.Bucket> buckets = brandTerms.getBuckets();
    // 4.3 遍历
    List<String> brandList = new ArrayList<>();
    for (Terms.Bucket bucket : buckets) {
        String key = bucket.getKeyAsString();
        brandList.add(key);
    }
    return brandList;
}

private void buildAggregation(SearchRequest request) {
    request.source().aggregation(AggregationBuilders
            .terms("brandAgg")
            .field("brand")
            .size(100)
    );
    request.source().aggregation(AggregationBuilders
            .terms("cityAgg")
            .field("city")
            .size(100)
    );
    request.source().aggregation(AggregationBuilders
            .terms("starAgg")
            .field("star")
            .size(100)
    );
}

private void buildBasicQuery(RequestParams params, SearchRequest request) {
    // 1. 构建BooleanQuery
    BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
    // 关键字搜索
    String key = params.getKey();
    if(key == null || "".equals(key)){
        boolQuery.must(QueryBuilders.matchAllQuery());
    }else{
        boolQuery.must(QueryBuilders.matchQuery("all", key));
    }
    // 条件过滤
    // 城市条件
    if (params.getCity() != null && !params.getCity().equals("")){
        boolQuery.filter(QueryBuilders.termQuery("city", params.getCity()));
    }
    // 品牌条件
    if (params.getBrand() != null && !params.getBrand().equals("")){
        boolQuery.filter(QueryBuilders.termQuery("brand", params.getBrand()));
    }
    // 星级条件
    if (params.getStarName() != null && !params.getStarName().equals("")){
        boolQuery.filter(QueryBuilders.termQuery("starName", params.getBrand()));
    }
    // 价格
    if (params.getMinPrice() != null && params.getMaxPrice() != null){
        boolQuery.filter(QueryBuilders
                 .rangeQuery("price").gte(params.getMinPrice()).lte(params.getMaxPrice()));
    }
    // 2. 算分控制
    FunctionScoreQueryBuilder functionScoreQuery = QueryBuilders.functionScoreQuery(
            // 原始查询,相关性算分查询
            boolQuery,
            // function score 的数组
            new FunctionScoreQueryBuilder.FilterFunctionBuilder[]{
                    // 其中的一个 function score 元素
                    new FunctionScoreQueryBuilder.FilterFunctionBuilder(
                            // 过滤条件
                            QueryBuilders.termQuery("isAD", true),
                            // 算分函数
                            ScoreFunctionBuilders.weightFactorFunction(10)
                    )
            });

    request.source().query(functionScoreQuery);
}

自动补全

自动补全需求说明

当用户在搜索框输入字符时,我们应该提示出与该字符有关的搜索项,如图:

在这里插入图片描述

拼音分词器

要实现根据字母做补全,就必须对文档按照拼音分词。在GitHub上恰好有elasticsearch的拼音分词插件。地址:https://github.com/medcl/elasticsearch-analysis-pinyin

安装方式与IK分词器一样,分三步:

  1. 解压
  2. 上传到虚拟机中,elasticsearch的plugin目录
  3. 重启elasticsearch
  4. 测试
POST /_analyze
{
 "text": "如家酒店整挺好",
 "analyzer": "pinyin"
}

在这里插入图片描述

自定义分词器

elasticsearch中分词器(analyzer)的组成包含三部分:

  • character filters:在tokenizer之前对文本进行处理。例如删除字符、替换字符
  • tokenizer:将文本按照一定的规则切割成词条(term)。例如keyword,就是不分词;还有ik_smart
  • tokenizer filter:将tokenizer输出的词条做进一步处理。例如大小写转换、同义词处理、拼音处理等

在这里插入图片描述

我们可以在创建索引库(自定义分词器只对指定的索引库适用)时,通过settings来配置自定义的analyzer(分词器):

在这里插入图片描述

拼音分词器适合在创建倒排索引的时候使用,但不能在搜索的时候使用。
创建倒排索引时:

在这里插入图片描述

因此字段在创建倒排索引时应该用 my_analyzer 分词器;字段在搜索时应该使用 ik_smart 分词器;

在这里插入图片描述

DELETE /test

# 自定义拼音分词器
PUT /test
{
  "settings": {
    "analysis": {
      "analyzer": { 
        "my_analyzer": {
          "tokenizer": "ik_max_word",
          "filter": "py"
        }
      },
      "filter": {
        "py": { 
          "type": "pinyin",
          "keep_full_pinyin": false,
          "keep_joined_full_pinyin": true,
          "keep_original": true,
          "limit_first_letter_length": 16,
          "remove_duplicated_term": true,
          "none_chinese_pinyin_tokenize": false
        }
      }
    }
  },
  "mappings": {
    "properties": {
      "name":{
        "type": "text",
        "analyzer": "my_analyzer",  
        "search_analyzer": "ik_smart"
      }
    }
  }
}

POST /test/_doc/1
{
  "id": 1,
  "name": "狮子"
}
POST /test/_doc/2
{
  "id": 2,
  "name": "虱子"
}

GET /test/_search
{
  "query": {
    "match": {
      "name": "掉入狮子笼咋办"
    }
  }
}

如何使用拼音分词器?

  1. 下载pinyin分词器
  2. 解压并放到elasticsearch的plugin目录
  3. 重启即可

如何自定义分词器?

  1. 创建索引库时,在settings中配置,可以包含三部分
  2. character filter
  3. tokenizer
  4. filter

拼音分词器注意事项?

  • 为了避免搜索到同音字,搜索时不要使用拼音分词器

自动补全查询

completion suggester查询

elasticsearch提供了Completion Suggester 查询来实现自动补全功能。这个查询会匹配以用户输入内容开头的词条并返回。为了提高补全查询的效率,对于文档中字段的类型有一些约束:

  • 参与补全查询的字段必须是completion类型。
  • 字段的内容一般是用来补全的多个词条形成的数组。

在这里插入图片描述
查询语法如下:

在这里插入图片描述

# 自动补全的索引库
PUT test2
{
  "mappings": {
    "properties": {
      "title":{
        "type": "completion"
      }
    }
  }
}
# 示例数据
POST test2/_doc
{
  "title": ["Sony", "WH-1000XM3"]
}
POST test2/_doc
{
  "title": ["SK-II", "PITERA"]
}
POST test2/_doc
{
  "title": ["Nintendo", "switch"]
}


# 自动补全查询
GET /test2/_search
{
  "suggest": {
    "titelSuggest": {
      "text": "s",
      "completion": {
        "field": "title",
        "skip_duplicates": true,
        "size": 10
      }
    }
  }
}

在这里插入图片描述
自动补全对字段的要求:

  • 类型是completion类型
  • 字段值是多词条的数组

酒店数据自动补全

实现hotel索引库的自动补全、拼音搜索功能

实现思路如下:

  1. 修改hotel索引库结构,设置自定义拼音分词器
  2. 修改索引库的name、all字段,使用自定义分词器
  3. 索引库添加一个新字段suggestion,类型为completion类型,使用自定义的分词器
  4. 给HotelDoc类添加suggestion字段,内容包含brand、business
  5. 重新导入数据到hotel库

注意:name、all是可分词的,自动补全的brand、business是不可分词的,要使用不同的分词器组合

# 酒店数据索引库
PUT /hotel
{
  "settings": {
    "analysis": {
      "analyzer": {
        "text_anlyzer": {
          "tokenizer": "ik_max_word",
          "filter": "py"
        },
        "completion_analyzer": {
          "tokenizer": "keyword",
          "filter": "py"
        }
      },
      "filter": {
        "py": {
          "type": "pinyin",
          "keep_full_pinyin": false,
          "keep_joined_full_pinyin": true,
          "keep_original": true,
          "limit_first_letter_length": 16,
          "remove_duplicated_term": true,
          "none_chinese_pinyin_tokenize": false
        }
      }
    }
  },
  "mappings": {
    "properties": {
      "id":{
        "type": "keyword"
      },
      "name":{
        "type": "text",
        "analyzer": "text_anlyzer",
        "search_analyzer": "ik_smart",
        "copy_to": "all"
      },
      "address":{
        "type": "keyword",
        "index": false
      },
      "price":{
        "type": "integer"
      },
      "score":{
        "type": "integer"
      },
      "brand":{
        "type": "keyword",
        "copy_to": "all"
      },
      "city":{
        "type": "keyword"
      },
      "starName":{
        "type": "keyword"
      },
      "business":{
        "type": "keyword",
        "copy_to": "all"
      },
      "location":{
        "type": "geo_point"
      },
      "pic":{
        "type": "keyword",
        "index": false
      },
      "all":{
        "type": "text",
        "analyzer": "text_anlyzer",
        "search_analyzer": "ik_smart"
      },
      "suggestion":{
          "type": "completion",
          "analyzer": "completion_analyzer"
      }
    }
  }
}

GET /hotel/_search
{
  "query": {
    "match_all": {}
  }
}


GET /hotel/_search
{
  "suggest": {
    "titelSuggest": {
      "text": "h",
      "completion": {
        "field": "suggestion",
        "skip_duplicates": true,
        "size": 10
      }
    }
  }
}

RestAPI实现自动补全

先看请求参数构造的API:

在这里插入图片描述

再来看结果解析:

在这里插入图片描述

实现酒店搜索页面输入框的自动补全

查看前端页面,可以发现当我们在输入框键入时,前端会发起ajax请求:

在这里插入图片描述

在服务端编写接口,接收该请求,返回补全结果的集合,类型为List<String>

controller

@GetMapping("suggestion")
public List<String> getSuggestions(@RequestParam("key") String prefix){
    return hotelService.getSuggestions(prefix);
}

service

@Override
public List<String> getSuggestions(String prefix) {
    try {
        // 1. 准备Request
        SearchRequest request = new SearchRequest("hotel");
        // 2. 准备DSL
        request.source().suggest(new SuggestBuilder().addSuggestion(
                "suggestions",
                SuggestBuilders.completionSuggestion("suggestion")
                .prefix(prefix)
                .skipDuplicates(true)
                .size(10)
        ));
        // 3. 发送请求
        SearchResponse response = client.search(request, RequestOptions.DEFAULT);
        // 4. 解析结果
        Suggest suggest = response.getSuggest();
        // 4.1 根据补全查询名称,获取补全结果
        CompletionSuggestion suggestions =  suggest.getSuggestion("suggestions");
        // 4.2 获取options
        List<CompletionSuggestion.Entry.Option> options = suggestions.getOptions();
        // 4.3 遍历
        List<String> list = new ArrayList<>(options.size());
        for (CompletionSuggestion.Entry.Option option : options) {
            String text = option.getText().toString();
            list.add(text);
        }
        return list;
    } catch (IOException e) {
        throw new RuntimeException();
    }
}

数据同步

数据同步思路分析

elasticsearch中的酒店数据来自于mysql数据库,因此mysql数据发生改变时,elasticsearch也必须跟着改变,这个就是elasticsearch与mysql之间的数据同步

在这里插入图片描述

方案一:同步调用

在这里插入图片描述

方案二:异步通知

在这里插入图片描述

方案三:监听binlog

在这里插入图片描述

方式一:同步调用

  • 优点:实现简单,粗暴
  • 缺点:业务耦合度高

方式二:异步通知

  • 优点:低耦合,实现难度一般
  • 缺点:依赖mq的可靠性

方式三:监听binlog

  • 优点:完全解除服务间耦合
  • 缺点:开启binlog增加数据库负担、实现复杂度高

实现elasticsearch与数据库数据同步

利用MQ实现mysql与elasticsearch数据同步

利用课前资料提供的hotel-admin项目作为酒店管理的微服务。当酒店数据发生增、删、改时,要求对elasticsearch中数据也要完成相同操作。
步骤:

  • 导入课前资料提供的hotel-admin项目,启动并测试酒店数据的CRUD
  • 声明exchange、queue、RoutingKey
  • 在hotel-admin中的增、删、改业务中完成消息发送
  • 在hotel-demo中完成消息监听,并更新elasticsearch中数据
  • 启动并测试数据同步功能

在这里插入图片描述

  1. 导入项目
    在这里插入图片描述
  2. 声明exchange、queue、RoutingKey(两类消息,两种队列)

导入amqp依赖

<!--amqp-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

yaml文件中配置rabbitmq

spring:
  rabbitmq:
    host: 10.211.55.6
    port: 5672
    username: itcast
    password: 123321
    virtual-host: /

MqConstants.java

public class MqConstants {

    /**
     * 交换机
     */
    public final static String HOTEL_EXCHANGE = "hotel.topic";

    /**
     * 监听新增和修改的队列
     */
    public final static String HOTEL_INSERT_QUEUE = "hotel.insert.queue";

    /**
     * 监听删除的队列
     */
    public final static String HOTEL_DELETE_QUEUE = "hotel.delete.queue";

    /**
     * 新增或修改的RoutingKey
     */
    public final static String HOTEL_INSERT_KEY = "hotel.insert";

    /**
     * 删除的RoutingKey
     */
    public final static String HOTEL_DELETE_KEY = "hotel.delete";
}

MqConfig.java

@Configuration
public class MqConfig {

    @Bean
    public TopicExchange topicExchange(){
        return new TopicExchange(MqConstants.HOTEL_EXCHANGE, true, false);
    }

    @Bean
    public Queue insertQueue(){
        return new Queue(MqConstants.HOTEL_INSERT_QUEUE, true);
    }

    @Bean
    public Queue deleteQueue(){
        return new Queue(MqConstants.HOTEL_DELETE_QUEUE, true);
    }

    @Bean
    public Binding insertQueueBinding(){
        return BindingBuilder.bind(insertQueue()).to(topicExchange()).with(MqConstants.HOTEL_INSERT_KEY);
    }

    @Bean
    public Binding deleteQueueBinding(){
        return BindingBuilder.bind(deleteQueue()).to(topicExchange()).with(MqConstants.HOTEL_DELETE_KEY);
    }
}
  1. 在hotel-admin中的增、删、改业务中完成消息发送

导入依赖,配置 yaml 文件

controller中

@RestController
@RequestMapping("hotel")
public class HotelController {

    @Autowired
    private IHotelService hotelService;
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/{id}")
    public Hotel queryById(@PathVariable("id") Long id){
        return hotelService.getById(id);
    }

    @GetMapping("/list")
    public PageResult hotelList(
            @RequestParam(value = "page", defaultValue = "1") Integer page,
            @RequestParam(value = "size", defaultValue = "1") Integer size
    ){
        Page<Hotel> result = hotelService.page(new Page<>(page, size));

        return new PageResult(result.getTotal(), result.getRecords());
    }

    @PostMapping
    public void saveHotel(@RequestBody Hotel hotel){
        hotelService.save(hotel);
        rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE, MqConstants.HOTEL_INSERT_KEY, hotel.getId());
    }

    @PutMapping()
    public void updateById(@RequestBody Hotel hotel){
        if (hotel.getId() == null) {
            throw new InvalidParameterException("id不能为空");
        }
        hotelService.updateById(hotel);
        rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE, MqConstants.HOTEL_INSERT_KEY, hotel.getId());
    }

    @DeleteMapping("/{id}")
    public void deleteById(@PathVariable("id") Long id) {
        hotelService.removeById(id);
        rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE, MqConstants.HOTEL_DELETE_KEY, id);

    }
}

在这里插入图片描述

  1. 在hotel-demo中完成消息监听,并更新elasticsearch中数据

HotelListener.java

@Component
public class HotelListener {

    @Autowired
    private IHotelService hotelService;

    /**
     * 监听酒店新增或修改的业务
     * @param id
     */
    @RabbitListener(queues = MqConstants.HOTEL_INSERT_QUEUE)
    public void listenHotelInsertOrUpdate(Long id){
        hotelService.insertById(id);
    }

    /**
     * 监听酒店新删除的业务
     * @param id
     */
    @RabbitListener(queues = MqConstants.HOTEL_DELETE_QUEUE)
    public void listenHotelDelete(Long id){
        hotelService.deleteById(id);
    }
}

service

@Override
public void deleteById(Long id) {
    try {
        // 1. 准备Request
        DeleteRequest request = new DeleteRequest("hotel", id.toString());
        // 2. 准备发送请求
        client.delete(request, RequestOptions.DEFAULT);
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}

@Override
public void insertById(Long id) {
    try {
        // 0. 根据id查询酒店数据
        Hotel hotel = getById(id);
        // 转换为文档类型
        HotelDoc hotelDoc = new HotelDoc(hotel);

        // 1. 准备Request对象
        IndexRequest request = new IndexRequest("hotel").id(hotel.getId().toString());
        // 2. 准备Json文档
        request.source(JSON.toJSONString(hotelDoc), XContentType.JSON);
        // 3. 发送请求
        client.index(request, RequestOptions.DEFAULT);
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}

集群

ES集群结构

单机的elasticsearch做数据存储,必然面临两个问题:海量数据存储问题、单点故障问题。

  • 海量数据存储问题:将索引库从逻辑上拆分为N个分片(shard),存储到多个节点
  • 单点故障问题:将分片数据在不同节点备份(replica )

在这里插入图片描述

搭建ES集群

我们会在单机上利用docker容器运行多个es实例来模拟es集群。不过生产环境推荐大家每一台服务节点仅部署一个es的实例。

部署es集群可以直接使用docker-compose来完成,但这要求你的Linux虚拟机至少有4G的内存空间

创建es集群

首先编写一个docker-compose文件,内容如下:

version: '2.2'
services:
  es01:
    image: elasticsearch:7.12.1
    container_name: es01
    environment:
      - node.name=es01
      - cluster.name=es-docker-cluster
      - discovery.seed_hosts=es02,es03
      - cluster.initial_master_nodes=es01,es02,es03
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    volumes:
      - data01:/usr/share/elasticsearch/data
    ports:
      - 9200:9200
    networks:
      - elastic
  es02:
    image: elasticsearch:7.12.1
    container_name: es02
    environment:
      - node.name=es02
      - cluster.name=es-docker-cluster
      - discovery.seed_hosts=es01,es03
      - cluster.initial_master_nodes=es01,es02,es03
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    volumes:
      - data02:/usr/share/elasticsearch/data
    ports:
      - 9201:9200
    networks:
      - elastic
  es03:
    image: elasticsearch:7.12.1
    container_name: es03
    environment:
      - node.name=es03
      - cluster.name=es-docker-cluster
      - discovery.seed_hosts=es01,es02
      - cluster.initial_master_nodes=es01,es02,es03
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    volumes:
      - data03:/usr/share/elasticsearch/data
    networks:
      - elastic
    ports:
      - 9202:9200
volumes:
  data01:
    driver: local
  data02:
    driver: local
  data03:
    driver: local

networks:
  elastic:
    driver: bridge

es运行需要修改一些linux系统权限,修改/etc/sysctl.conf文件

vi /etc/sysctl.conf

添加下面的内容:

vm.max_map_count=262144

然后执行命令,让配置生效:

sysctl -p

通过docker-compose启动集群:

docker-compose up -d

集群状态监控

kibana可以监控es集群,不过新版本需要依赖es的x-pack 功能,配置比较复杂。

这里推荐使用cerebro来监控es集群状态,官方网址:https://github.com/lmenezes/cerebro

解压即可使用,非常方便。

解压好的目录如下:

在这里插入图片描述

进入对应的bin目录:

在这里插入图片描述

双击其中的cerebro.bat文件即可启动服务。

在这里插入图片描述

访问http://localhost:9000 即可进入管理界面:

在这里插入图片描述

输入你的elasticsearch的任意节点的地址和端口,点击connect即可:

在这里插入图片描述

绿色的条,代表集群处于绿色(健康状态)。

创建索引库

1)利用kibana的DevTools创建索引库

在DevTools中输入指令:

PUT /itcast
{
  "settings": {
    "number_of_shards": 3, // 分片数量
    "number_of_replicas": 1 // 副本数量
  },
  "mappings": {
    "properties": {
      // mapping映射定义 ...
    }
  }
}
2)利用cerebro创建索引库

利用cerebro还可以创建索引库:

在这里插入图片描述

填写索引库信息:

在这里插入图片描述

点击右下角的create按钮:

在这里插入图片描述

查看分片效果

回到首页,即可查看索引库分片效果:

在这里插入图片描述

每个索引库的分片数量、副本数量都是在创建索引库时指定的,并且分片数量一旦设置以后无法修改。语法如下:

在这里插入图片描述

ES集群的节点角色

elasticsearch中集群节点有不同的职责划分:

在这里插入图片描述

elasticsearch中的每个节点角色都有自己不同的职责,因此建议集群部署时,每个节点都有独立的角色。

在这里插入图片描述

集群脑裂问题

默认情况下,每个节点都是master eligible节点,因此一旦master节点宕机,其它候选节点会选举一个成为主节点。当主节点与其他节点网络故障时,可能发生脑裂问题。

为了避免脑裂,需要要求选票超过 ( eligible节点数量 + 1 )/ 2 才能当选为主,因此eligible节点数量最好是奇数。对应配置项是discovery.zen.minimum_master_nodes,在es7.0以后,已经成为默认配置,因此一般不会发生脑裂问题

在这里插入图片描述

master eligible节点的作用是什么?

  • 参与集群选主
  • 主节点可以管理集群状态、管理分片信息、处理创建和删除索引库的请求

data节点的作用是什么?

  • 数据的CRUD

coordinator节点的作用是什么?

  • 路由请求到其它节点
  • 合并查询到的结果,返回给用户

集群分布式存储

当新增文档时,应该保存到不同分片,保证数据均衡,那么coordinating node如何确定数据该存储到哪个分片呢?
elasticsearch会通过hash算法来计算文档应该存储到哪个分片:

在这里插入图片描述

说明:

  • _routing默认是文档的id
  • 算法与分片数量有关,因此索引库一旦创建,分片数量不能修改!

新增文档流程:

在这里插入图片描述

集群分布式查询

elasticsearch的查询分成两个阶段:

  • scatter phase:分散阶段,coordinating node会把请求分发到每一个分片
  • gather phase:聚集阶段,coordinating node汇总data node的搜索结果,并处理为最终结果集返回给用户

在这里插入图片描述

分布式新增如何确定分片?

  • coordinating node根据id做hash运算,得到结果对shard数量取余,余数就是对应的分片

分布式查询的两个阶段

  • 分散阶段: coordinating node将查询请求分发给不同分片
  • 收集阶段:将查询结果汇总到coordinating node ,整理并返回给用户

集群故障转移

集群的master节点会监控集群中的节点状态,如果发现有节点宕机,会立即将宕机节点的分片数据迁移到其它节点,确保数据安全,这个叫做故障转移。

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

故障转移:

  • master宕机后,EligibleMaster选举为新的主节点。
  • master节点监控分片、节点状态,将故障节点上的分片转移到正常节点,确保数据安全。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1239729.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

7种SQL的进阶用法

1.自定义排序&#xff08;ORDER BY FIELD&#xff09; 在MySQL中ORDER BY排序除了可以用ASC和DESC之外&#xff0c;还可以使用自定义排序方式来实现。 CREATE TABLE movies ( id INT PRIMARY KEY AUTO_INCREMENT, movie_name VARCHAR(255), actors VARCHAR(255), price DEC…

一次解决套接字操作超时错误的过程

作者&#xff1a;朱金灿 来源&#xff1a;clever101的专栏 为什么大多数人学不会人工智能编程&#xff1f;>>> 在windows客户端使用QTcpSocket连接一个ubuntu服务端程序&#xff0c;出现套接字操作超时的错误。开始感觉还莫名其妙的&#xff0c;因为之前连接都是好好…

【三维几何学习】自制简单的网格可视化软件 — Mesh Visualization

自制简单的网格可视化软件 — Mesh Visualization 引言一、整体框架1.1 三角形网格1.2 界面管理1.3 VTK可视化界面 二、核心源码2.1 三角形网格&#xff1a;TriMesh类2.2 界面Widget_Mesh_Manager2.3 VTK可视化2.4 main 引言 使用PyQt自制简单的网格可视化软件 - 视频展示 本是…

Kubernetes+Gitlab+Jenkins+ArgoCD多集群部署

KubernetesGitlabJenkinsArgoCD多集群部署 文章目录 KubernetesGitlabJenkinsArgoCD多集群部署1. KubernetesGitlabJenkinsArgoCD多集群部署2. 添加WebHooks自动触发3. Jenkins-构建-执行Shell4. 制作镜像及修改Yaml文件4.1 Dockerfile4.2 Build-Shell 5.自动部署Demo测试5.1 推…

VMware 16 Pro 安装以及下载

1、下载地址&#xff1a; https://www.aliyundrive.com/s/nj3PSD4TN9G 2、安装文件 右击打开 下一步 密钥&#xff1a;ZF3R0-FHED2-M80TY-8QYGC-NPKYF 到此&#xff0c;安装完毕

Mysql 递归查询子类Id的所有父类Id

文章目录 问题描述先看结果表结构展示实现递归查询集合查询结果修复数据 问题描述 最近开发过程中遇到一个问题,每次添加代理关系都要去递归查询一下它在不在这个代理关系树上.很麻烦也很浪费资源.想着把代理关系的父类全部存起来 先看结果 表结构展示 表名(t_agent_user_rela…

MySQL数据库时间计算的用法

今天给大家分享如何通过MySQL内置函数实现时间的转换和计算&#xff0c;在工作当中&#xff0c;测试人员经常需要查询数据库表的日期时间&#xff0c;但发现开发人员存入数据库表的形式都是时间戳形式&#xff0c;不利于测试人员查看&#xff0c;测试人员只能利用工具对时间戳进…

visual studio 如何建立 C 语言项目

安装这个 模块。 新建 空项目 创建完成 写demo 点击运行&#xff1a;

数据结构与算法编程题9

将两个递增的有序链表合并为一个递增的有序链表。要求结果链表仍使用原来两个链表的存储空间, 不另外占用其它的存储空间。表中不允许有重复的数据 a: 1, 2, 4, 5, 7, 8, 9, 10 b: 1, 2, 3, 6, 7, 8 #include <iostream> using namespace std;typedef int Elemtype; #def…

试试MyBatis-Plus可视化代码生成器,太香了,你一定会感谢我

前言 在基于Mybatis的开发模式中&#xff0c;很多开发者还会选择Mybatis-Plus来辅助功能开发&#xff0c;以此提高开发的效率。虽然Mybatis也有代码生成的工具&#xff0c;但Mybatis-Plus由于在Mybatis基础上做了一些调整&#xff0c;因此&#xff0c;常规的生成工具生成的代码…

redis运维(十六) 有序集合

一 有序集合 把握一点&#xff1a; 各种redis 命令都提供各种语言对应的API 接口,后续API是关键 ① 概念 1、sorted set --> 有序集合2、redis有序集合也是集合类型的一部分&#xff0c;所以它保留了集合中元素不能重复的特性3、但是不同的是,有序集合给每个元素多设置…

【LeetCode刷题-链表】--25.K个一组翻转链表

25.K个一组翻转链表 思路&#xff1a; 把链表节点按照k个一组分组&#xff0c;可以使用一个指针head依次指向每组的头节点&#xff0c;这个指针每次向前移动k步&#xff0c;直至链表结尾&#xff0c;对于每个分组&#xff0c; 先判断它的长度是否大于等于k&#xff0c;若是&am…

微博头条文章开放接口报错 auth by Null spi

接口文档地址 https://open.weibo.com/wiki/Toutiao/api 接口说明 https://api.weibo.com/proxy/article/publish.json 请求方式 POST 请求参数 参数名称类型是否必需描述titlestring是文章标题&#xff0c;限定32个中英文字符以内contentstring是正文内容&#xff0c;限制9…

Servlet---HttpServlet、HttpServletRequest、HttpServletResponseAPI详解

文章目录 HttpServlet基础方法doXXX方法Servlet的生命周期 HttpServletRequest获取请求中的信息获取请求传递的参数获取 query string 里的数据获取form表单里的数据获取JSON里的数据如何解析JSON格式获取数据返回数据 HttpServletResponse设置响应的Header设置不同的状态码设置…

C++(模板进阶)

目录 前言&#xff1a; 本章学习目标&#xff1a; 1.非类型模版参数 1.1使用方法 1.2注意事项 1.3 实际引用 2.模版特化 2.1概念 2.2函数模板特化 2.3类模板特化 2.3.1全特化 2.3.2偏特化 3.模版分离编译 ​编辑 3.1失败原因 ​编辑 3.2解决方案 4 总结 前言&…

乐划锁屏插画大赏热度持续,进一步促进价值内容的创造与传播

锁屏,原本只是为了防止手机在口袋里“误触”而打造的功能,现如今逐渐成为文化传播领域的热门入口。乐划锁屏不断丰富锁屏内容和场景玩法,通过打造“乐划锁屏插画大赏”系列活动为广大内容创作者提供了更多展示自我的机会,丰富平台内容。 从2020年到2023年,乐划锁屏插画大赏已连…

[点云分割] 基于颜色的区域增长分割

效果&#xff1a; 代码&#xff1a; #include <iostream> #include <thread> #include <vector>#include <pcl/point_types.h> #include <pcl/io/pcd_io.h> #include <pcl/search/search.h> #include <pcl/search/kdtree.h> #inclu…

Node.js入门指南(一)

目录 Node.js入门 什么是Node.js Node.js的作用 Node.js安装 Node.js编码注意事项 Buffer(缓冲器&#xff09; 定义 使用 fs模块 概念 文件写入 文件读取 文件移动与重命名 文件删除 文件夹操作 查看资源状态 路径问题 path模块 Node.js入门 什么是Node.js …

基于单片机声光控智能路灯系统仿真设计

**单片机设计介绍&#xff0c; 基于单片机声光控智能路灯系统仿真设计 文章目录 一 概要二、功能设计设计思路 三、 软件设计原理图 五、 程序六、 文章目录 一 概要 基于单片机的声光控智能路灯系统是一种利用单片机技术实现智能控制的路灯系统。它通过感知环境音量和光照强度…

vue中为什么data属性是一个函数而不是一个对象

面试官&#xff1a;为什么data属性是一个函数而不是一个对象&#xff1f; 一、实例和组件定义data的区别 vue实例的时候定义data属性既可以是一个对象&#xff0c;也可以是一个函数 const app new Vue({el:"#app",// 对象格式data:{foo:"foo"},// 函数格…