分布式搜索引擎2——深入elasticsearch

news2025/1/13 15:29:30

数据聚合

聚合的分类

聚合(aggregations)可以实现对文档数据的统计、分析、运算。聚合常见的有三类:

  • 桶(Bucket)聚合:用来对文档做分组

    • TermAggregation:按照文档字段值分组
    • Date Histogram:按照日期阶梯分组,例如一周为一组,或者一月为一组
  • 度量(Metric)聚合:用以计算一些值,比如:最大值、最小值、平均值等

    • Avg:求平均值
    • Max:求最大值
    • Min:求最小值
    • Stats: 同时求max、min、avg、sum等
  • 管道(pipeline)聚合:其它聚合的结果为基础做聚合

Bucket聚合

DSL实现

现在,我们要统计所有数据中的酒店品牌有几种,此时可以根据酒店品牌的名称做聚合。

类型为term类型,DSL示例:

GET/hotel/_search{
	"size": 0,//设置size为0,结果中不包含文档,只包含聚合结果
    "aggs " : {//定义聚合
        "brandAgg": {//给聚合起个名字
            "terms" : {//聚合的类型,按照品牌值聚合,所以选择term
                "field" : "brand",//参与聚合的字段
                "size" : 20//希望获取的聚合结果数量
            }   
        }   
    }    
}

聚合结果排序

默认情况下,Bucket聚合会统计Bucket内的文档数量,记为_count,并且按照_count降序排序。我们可以修改结果排序方式:

# 聚合
GET /hotel/_search
{
  "size": 0,
  "aggs": {
    "brandAgg": {
      "terms": {
        "field": "brand",
        "order": {
          "_count": "asc"
        }, 
        "size": 20
      }
    }
  }
}

限定聚合范围

默认情况下,Bucket聚合是对索引库的所有文档做聚合,我们可以限定要聚合的文档范围,只要添加query条件即可:

GET /hotel/_search
{
  "query": {
    "range": {
      "price": {
        "lte": 200 //只对价格低于200的做聚合
      }
    }
  }, 
  "size": 0,
  "aggs": {
    "brandAgg": {
      "terms": {
        "field": "brand",
        "size": 20
      }
    }
  }
}

Metrics聚合

DSL实现

例如,我们要求获取每个品牌的用户评分的min、max、avg等值.

我们可以利用stats聚合:

GET /hotel/_search
{
  "size": 0,
  "aggs": {
    "brandAgg": {
      "terms": {
        "field": "brand",
        "size": 20,
        "order": {
          "score_stats.avg": "desc"//根据平均评分降序排列
        }
      },
      "aggs": {//是brands聚合的子聚合,也就是分组后对每组分别计算
        "score_stats": {//聚合名称
          "stats": {//聚合类型,这里stats可以计算min、max、avg等
            "field": "score"//聚合字段,这里是score
          }
        }
      }
    }
  }
}

RestAPI实现聚合

我们以品牌聚合为例,演示下Java的RestClient使用,先看请求组装:

1

再看下聚合结果解析

2

@Test
void testAggregation() throws IOException {
    //1.准备Request
    SearchRequest request = new SearchRequest("hotel");
    //2.准备DSL
    //2.1设置size
    request.source().size(0);
    request.source().aggregation(AggregationBuilders
            .terms("brandAgg")
            .field("brand")
            .size(20)
    );
    //3.发出请求
    SearchResponse response = client.search(request, RequestOptions.DEFAULT);
    //4.解析结果
    Aggregations aggregations = response.getAggregations();
    Terms brandAgg = aggregations.get("brandAgg");
    List<? extends Terms.Bucket> buckets = brandAgg.getBuckets();
    for (Terms.Bucket bucket : buckets) {
        String key = bucket.getKeyAsString();
        long docCount = bucket.getDocCount();
        System.out.println(key + ":" + docCount);
    }
}

聚合案列

案例
在lUserService中定义方法,实现对品牌、城市、星级的聚合
需求:搜索页面的品牌、城市等信息不应该是在页面写死,而是通过聚合索引库中的酒店数据得来的:

在lUserService中定义一个方法,实现对品牌、城市、星级的聚合,方法声明如

/**
 * 查询城市、星级、品牌的聚合结果
 * @return 聚合结果,格式:["城市":["上海","北京""],"品牌" :["如家","希尔顿"]}
 */
Map<String, List<String>> filters();
@Override
public Map<String, List<String>> filters() {
    try {
        //1.准备Request
        SearchRequest request = new SearchRequest("hotel");
        //2.准备DSL
        //2.1设置size
        request.source().size(0);
        //2.2聚合
        buildAggregation(request);
        //3.发出请求
        SearchResponse response = client.search(request, RequestOptions.DEFAULT);
        //4.解析结果
        Map<String, List<String>> results = new HashMap<>();
        Aggregations aggregations = response.getAggregations();
        //根据名称获取聚合结果
        List<String> brandList = getAggByName(aggregations, "brandAgg");
        List<String> cityList = getAggByName(aggregations, "cityAgg");
        List<String> starList = getAggByName(aggregations, "starAgg");
        //放入map
        results.put("品牌", brandList);
        results.put("城市", cityList);
        results.put("星级", starList);

        return results;
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}

private static List<String> getAggByName(Aggregations aggregations, String name) {
    Terms brandAgg = aggregations.get(name);
    List<? extends Terms.Bucket> buckets = brandAgg.getBuckets();
    List<String> brandList = new ArrayList<>();
    for (Terms.Bucket bucket : buckets) {
        //获取key
        String key = bucket.getKeyAsString();
        brandList.add(key);
    }
    return brandList;
}

private void buildAggregation(SearchRequest request) {
    request.source().aggregation(AggregationBuilders
            .terms("brandAgg")
            .field("brand")
            .size(20)
    );
    request.source().aggregation(AggregationBuilders
            .terms("cityAgg")
            .field("city")
            .size(20)
    );
    request.source().aggregation(AggregationBuilders
            .terms("starAgg")
            .field("starName")
            .size(20)
    );
}

对接前端接口

前端页面会向服务端发起请求,查询品牌、城市、星级等字段的聚合结果:

3.1

3.2

可以看到请求参数与之前search时的RequestParam完全一致,这是在限定聚合时的文档范围。
例如:用户搜索“外滩”,价格在300~600,那聚合必须是在这个搜索条件基础上完成。

因此我们需要:

  1. 编写controller接口,接收该请求

    @PostMapping("/filters")
    public Map<String, List<String>> getFilters(@RequestBody RequestParams params) {
        return hotelService.filters(params);
    }
    
  2. 修改IHotelServicet#getFilters()方法,添加RequestParam参数

    Map<String, List<String>> filters(RequestParams params);
    
  3. 修改getFilters方法的业务,聚合时添加query条件

        @Override
        public Map<String, List<String>> filters(RequestParams params) {
            try {
                //1.准备Request
                SearchRequest request = new SearchRequest("hotel");
                //2.准备DSL
                //2.1query
                buildBasicQuery(params, request);
                //2.2设置size
                request.source().size(0);
                //2.3聚合
                buildAggregation(request);
                //3.发出请求
                SearchResponse response = client.search(request, RequestOptions.DEFAULT);
                //4.解析结果
                Map<String, List<String>> results = new HashMap<>();
                Aggregations aggregations = response.getAggregations();
                //根据名称获取聚合结果
                List<String> brandList = getAggByName(aggregations, "brandAgg");
                List<String> cityList = getAggByName(aggregations, "cityAgg");
                List<String> starList = getAggByName(aggregations, "starAgg");
                //放入map
                results.put("brand", brandList);
                results.put("city", cityList);
                results.put("starName", starList);
    
                return results;
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    
        private static List<String> getAggByName(Aggregations aggregations, String name) {
            Terms brandAgg = aggregations.get(name);
            List<? extends Terms.Bucket> buckets = brandAgg.getBuckets();
            List<String> brandList = new ArrayList<>();
            for (Terms.Bucket bucket : buckets) {
                //获取key
                String key = bucket.getKeyAsString();
                brandList.add(key);
            }
            return brandList;
        }
    
        private void buildAggregation(SearchRequest request) {
            request.source().aggregation(AggregationBuilders
                    .terms("brandAgg")
                    .field("brand")
                    .size(20)
            );
            request.source().aggregation(AggregationBuilders
                    .terms("cityAgg")
                    .field("city")
                    .size(20)
            );
            request.source().aggregation(AggregationBuilders
                    .terms("starAgg")
                    .field("starName")
                    .size(20)
            );
        }
    
        private void buildBasicQuery(RequestParams params, SearchRequest request) throws IOException {
            //1.构建BooleanQuery
            BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
            //关键字搜索
            String key = params.getKey();
            if (key == null || "".equals(key)) {
                boolQuery.must(QueryBuilders.matchAllQuery());
            } else {
                boolQuery.must(QueryBuilders.matchQuery("all", key));
            }
            //city精确匹配
            String city = params.getCity();
            if (!(city == null || "".equals(city))) {
                boolQuery.filter(QueryBuilders.termQuery("city", city));
            }
            //brand精确匹配
            String brand = params.getBrand();
            if (!(brand == null || "".equals(brand))) {
                boolQuery.filter(QueryBuilders.termQuery("brand", brand));
            }
            //startName精确查询
            String startName = params.getStartName();
            if (!(startName == null || "".equals(startName))) {
                boolQuery.filter(QueryBuilders.termQuery("startName", startName));
            }
            //价格
            Integer minPrice = params.getMinPrice();
            Integer maxPrice = params.getMaxPrice();
            if (minPrice != null && maxPrice != null) {
                boolQuery.filter(QueryBuilders.rangeQuery("price").gte(minPrice).lte(maxPrice));
            }
            //2.算分控制
            FunctionScoreQueryBuilder functionScoreQuery =
                    QueryBuilders.functionScoreQuery(
                            //原始查询,相关性算分的查询
                            boolQuery,
                            //function score的数组
                            new FunctionScoreQueryBuilder.FilterFunctionBuilder[]{
                                    //其中的一个function score元素
                                    new FunctionScoreQueryBuilder.FilterFunctionBuilder(
                                            //过滤条件
                                            QueryBuilders.termQuery("isAD", true),
                                            //算分函数
                                            ScoreFunctionBuilders.weightFactorFunction(10)
                                    )
                            });
            request.source().query(functionScoreQuery);
        }
    }
    

自动补全

拼音分词器

自动补全需求说明

当用户在搜索框输入字符时,我们应该提示出与该字符有关的搜索项,如图:

4

使用拼音分词

要实现根据字母做补全,就必须对文档按照拼音分词。在GitHub上恰好有elasticsearch的拼音分词插件。地址:https://github.com/medcl/elasticsearch-analysis-pinyin

安装方式与IK分词器一样,分三步:

  1. 解压
  2. 上传到虚拟机中,elasticsearch的plugin目录
  3. 重启elasticsearch
  4. 测试
# 测试拼音分词器
POST /_analyze
{
  "text": "如家酒店真不错",
  "analyzer": "pinyin"
}
{
  "tokens" : [
    {
      "token" : "ru",
      "start_offset" : 0,
      "end_offset" : 0,
      "type" : "word",
      "position" : 0
    },
    {
      "token" : "rjjdzbc",
      "start_offset" : 0,
      "end_offset" : 0,
      "type" : "word",
      "position" : 0
    },
    {
      "token" : "jia",
      "start_offset" : 0,
      "end_offset" : 0,
      "type" : "word",
      "position" : 1
    },
    {
      "token" : "jiu",
      "start_offset" : 0,
      "end_offset" : 0,
      "type" : "word",
      "position" : 2
    },
    {
      "token" : "dian",
      "start_offset" : 0,
      "end_offset" : 0,
      "type" : "word",
      "position" : 3
    },
    {
      "token" : "zhen",
      "start_offset" : 0,
      "end_offset" : 0,
      "type" : "word",
      "position" : 4
    },
    {
      "token" : "bu",
      "start_offset" : 0,
      "end_offset" : 0,
      "type" : "word",
      "position" : 5
    },
    {
      "token" : "cuo",
      "start_offset" : 0,
      "end_offset" : 0,
      "type" : "word",
      "position" : 6
    }
  ]
}

自定义分词器

elasticsearch中分词器(analyzer)的组成包含三部分:

  • character filters:在tokenizer之前对文本进行处理。例如删除字符、替换字符
  • tokenizer:将文本按照一定的规则切割成词条(term)。例如keyword,就是不分词;还有ik_smart
  • tokenizer filter:将tokenizer输出的词条做进一步处理。例如大小写转换、同义词处理、拼音处理等

5

我们可以在创建索引库时,通过settings来配置自定义的analyzer(分词器)∶

PUT /test{
	"settings": {
        "analysis" : {
            "analyzer" : { //自定义分词器
                "my_analyzer" : {//分词器名称
                    "tokenizer" : "ik_max_word",
                    "filter" : "pinyin"
                }
            }
        }
    }
}

此处的拼音分词器还需要进行进一步配置:

PUT /test{
    "settings": {
        "analysis": {
            "analyzer" : { //自定义分词器
                "my_analyzer" : {//分词器名称
                    "tokenizer" : "ik_max_word",
                    "filter": "py"
                }
            },
            "filter": { //自定义tokenizer filter
                "py": {//过滤器名称"type" : "pinyin",//过滤器类型,这里是pinyin
                    "keep_full_pinyin" : false,
                    "keep_joined_full_pinyin": true,
                    "keep_original" : true,
                    "limit_first_letter_length": 16,
                    "remove_duplicated_term" : true,
                    "none_chinese_pinyin_tokenize": false
                }
            }
        }
    }
}

拼音分词器适合在创建倒排索引的时候使用,但不能在搜索的时候使用。

6

因此字段在创建倒排索引时应该用my_analyzer分词器;字段在搜索时应该使用ik_smart分词器;

PUT /test{
  "settings": {
    "analysis": {
      "analyzer": {
        "my_analyzer": {
          "tokenizer": "ik_max_word",
          "filter": "py"
        }
      },
      "filter": {
        "py": {...}
      }
    }
  },
  "mappings": {
    "properties": {
      "name": {
        "type": "text",
        "analyzer": "my_analyzer",
        "search_analyzer": "ik_smart"
      }
    }
  }
}

自动补全查询

completion suggester查询

elasticsearch提供了Completion Suggester查询来实现自动补全功能。这个查询会匹配以用户输入内容开头的词条并返回。为了提高补全查询的效率,对于文档中字段的类型有一些约束:

  • 参与补全查询的字段必须是completion类型。
//创建索引库PUT test
{
  "mappings": {
    "properties": {
      "title": {
        "type": "completion"
      }
    }
  }
}
//示例数据
POST test/ _doc
{
  "title" : ["Sony","WH-1000XM3"]
}
POST test/_doc
{
  "title" : ["SK-II""PITERA"]
}
POST test/_doc
{
  "title" : ["Nintendo","switch"]
}

查询语法如下:

//自动补全查询
GET /test/_search
{
  "suggest": {
    "title_suggest": {
      "text": "s",//关键字
      "completion": {
        "field": "title",//补全查询的字段
        "skip_duplicates": true//跳过重复的
        "size": 10//获取前10条结果
      }
    }
  }
}

自动补全对字段的要求:

  • 类型是completion类型
  • 字段值是多词条的数组

实现酒店搜索框

案例
实现hotel索引库的自动补全、拼音搜索功能
实现思路如下:

  1. 修改hotel索引库结构,设置自定义拼音分词器

  2. 修改索引库的name、all字段,使用自定义分词器

  3. 索引库添加一个新字段suggestion,类型为completion类型,使用自定义的分词器

    DELETE /hotel
    # 酒店数据索引库
    PUT /hotel
    {
      "settings": {
        "analysis": {
          "analyzer": {
            "text_anlyzer": {
              "tokenizer": "ik_max_word",
              "filter": "py"
            },
            "completion_analyzer": {
              "tokenizer": "keyword",
              "filter": "py"
            }
          },
          "filter": {
            "py": {
              "type": "pinyin",
              "keep_full_pinyin": false,
              "keep_joined_full_pinyin": true,
              "keep_original": true,
              "limit_first_letter_length": 16,
              "remove_duplicated_term": true,
              "none_chinese_pinyin_tokenize": false
            }
          }
        }
      },
      "mappings": {
        "properties": {
          "id":{
            "type": "keyword"
          },
          "name":{
            "type": "text",
            "analyzer": "text_anlyzer",
            "search_analyzer": "ik_smart",
            "copy_to": "all"
          },
          "address":{
            "type": "keyword",
            "index": false
          },
          "price":{
            "type": "integer"
          },
          "score":{
            "type": "integer"
          },
          "brand":{
            "type": "keyword",
            "copy_to": "all"
          },
          "city":{
            "type": "keyword"
          },
          "starName":{
            "type": "keyword"
          },
          "business":{
            "type": "keyword",
            "copy_to": "all"
          },
          "location":{
            "type": "geo_point"
          },
          "pic":{
            "type": "keyword",
            "index": false
          },
          "all":{
            "type": "text",
            "analyzer": "text_anlyzer",
            "search_analyzer": "ik_smart"
          },
          "suggestion":{
              "type": "completion",
              "analyzer": "completion_analyzer"
          }
        }
      }
    }
    
  4. 给HotelDoc类添加suggestion字段,内容包含brand、business

    
    @Data
    @NoArgsConstructor
    public class HotelDoc {
        private Long id;
        private String name;
        private String address;
        private Integer price;
        private Integer score;
        private String brand;
        private String city;
        private String starName;
        private String business;
        private String location;
        private String pic;
        private Object distance;
        private String isAD;
        private List<String> suggestion;
    
        public HotelDoc(Hotel hotel) {
            this.id = hotel.getId();
            this.name = hotel.getName();
            this.address = hotel.getAddress();
            this.price = hotel.getPrice();
            this.score = hotel.getScore();
            this.brand = hotel.getBrand();
            this.city = hotel.getCity();
            this.starName = hotel.getStarName();
            this.business = hotel.getBusiness();
            this.location = hotel.getLatitude() + ", " + hotel.getLongitude();
            this.pic = hotel.getPic();
            if (this.business.contains("/")) {
                // business有多个值,进行切割 like:江湾/五角场商业区
                String[] arr = this.business.split("/");
                // 添加元素
                this.suggestion = new ArrayList<>();
                this.suggestion.add(this.brand);
                Collections.addAll(this.suggestion, arr);
            } else if (this.business.contains("、")) {
                // business有多个值,进行切割 like:江湾、五角场商业区
                String[] arr = this.business.split("、");
                // 添加元素
                this.suggestion = new ArrayList<>();
                this.suggestion.add(this.brand);
                Collections.addAll(this.suggestion, arr);
            } else {
                this.suggestion = Arrays.asList(this.brand, this.business);
            }
        }
    }
    
  5. 重新导入数据到hotel库

    运行ElasticsearchDocumentTest中的testBulkRequest

SDL测试

# 自动补全查询
POST /hotel/_search
{
  "suggest": {
    "title_suggest": {
      "text": "h", 
      "completion": {
        "field": "suggestion", 
        "skip_duplicates": true,
        "size": 10 
      }
    }
  }
}

7

RestAPI实现

发送请求

8

结果处理

9

@Test
void testSuggest() throws IOException {
    // 1.准备Request
    SearchRequest request = new SearchRequest("hotel");
    // 2.准备DSL
    request.source().suggest(new SuggestBuilder().addSuggestion(
            "suggestion",
            SuggestBuilders.completionSuggestion("suggestion")
                    .prefix("h")
                    .skipDuplicates(true)
                    .size(10)
    ));
    // 3.发起请求
    SearchResponse response = client.search(request, RequestOptions.DEFAULT);
    // 4.解析结果
    Suggest suggest = response.getSuggest();
    // 4.1根据名称获取补全结果
    CompletionSuggestion suggestion = suggest.getSuggestion("suggestion");
    // 4.2获取options并遍历
    for (Suggest.Suggestion.Entry.Option options : suggestion.getOptions()) {
        // 4.3获取一个option 中的text,也就是补全的词条
        String text = options.getText().string();
        System.out.println(text);
    }
}

运行结果:

10

案例
实现酒店搜索页面输入框的自动补全
查看前端页面,可以发现当我们在输入框键入时,前端会发起ajax请求:

实现案列

案例
实现酒店搜索页面输入框的自动补全

查看前端页面,可以发现当我们在输入框键入时,前端会发起ajax请求:

11

实现:

@GetMapping("/suggestion")
public List<String> getSuggestions(@RequestParam("key") String prefix){
    return hotelService.getSuggestions(prefix);
}
List<String> getSuggestions(String prefix);
@Override
public List<String> getSuggestions(String prefix) {
    try {
        // 1.准备Request
        SearchRequest request = new SearchRequest("hotel");
        // 2.准备DSL
        request.source().suggest(new SuggestBuilder().addSuggestion(
                "suggestion",
                SuggestBuilders.completionSuggestion("suggestion")
                        .prefix(prefix)
                        .skipDuplicates(true)
                        .size(10)
        ));
        // 3.发起请求
        SearchResponse response = client.search(request, RequestOptions.DEFAULT);
        // 4.解析结果
        Suggest suggest = response.getSuggest();
        // 4.1根据名称获取补全结果
        CompletionSuggestion suggestion = suggest.getSuggestion("suggestion");
        // 4.2获取options并遍历
        List<String> list = new ArrayList<>(suggestion.getOptions().size());
        for (Suggest.Suggestion.Entry.Option options : suggestion.getOptions()) {
            // 4.3获取一个option 中的text,也就是补全的词条
            String text = options.getText().string();
            list.add(text);
        }
        return list;
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}

数据同步

数据同步问题分析

elasticsearch中的酒店数据来自于mysql数据库,因此mysql数据发生改变时,elasticsearch也必须跟着改变,这个就是elasticsearch与mysql之间的数据同步

在微服务中,负责酒店管理(操作mysql )的业务与负责酒店搜索(操作elasticsearch )的业务可能在两个不同的微服务上,数据同步该如何实现呢?

方案一:同步调用

12

  • 优点:实现简单,粗暴
  • 缺点:业务耦合度高

方案二:异步通知

13

  • 优点:低耦合,实现难度一般
  • 缺点:依赖mq的可靠性

方案三:监听binlog

14

  • 优点:完全解除服务间耦合
  • 缺点:开启binlog增加数据库负担、实现复杂度高

案例

案例 利用MQ实现mysql与elasticsearch数据同步

利用课前资料提供的hotel-admin项目作为酒店管理的微服务。当酒店数据发生增、删、改时,要求对elasticsearch中数据也要完成相同操作。

步骤:

  • 导入课前资料提供的hotel-admin项目,启动并测试酒店数据的CRUD

  • 声明exchange、queue、RoutingKey

    package cn.itcast.hotel.constants;
    
    public class MqConstants {
        /**
         * 交换机
         */
        public static final String HOTEL_EXCHANGE = "hotel.topic";
        /**
         * 监听新增和修改的队列
         */
        public static final String HOTEL_INSERT_QUEUE = "hotel.insert.queue";
        /**
         * 监听删除的队列
         */
        public static final String HOTEL_DELETE_QUEUE = "hotel.delete.queue";
        /**
         * 新增或修改的RoutingKey
         */
        public static final String HOTEL_INSERT_KEY = "hotel.insert";
        /**
         * 删除的RoutingKey
         */
        public static final String HOTEL_DELETE_KEY = "hotel.delete";
    }
    
    package cn.itcast.hotel.config;
    
    import cn.itcast.hotel.constants.MqConstants;
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.core.TopicExchange;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class MqConfig {
        @Bean
        public TopicExchange topicExchange() {
            return new TopicExchange(MqConstants.HOTEL_EXCHANGE, true, false);
        }
    
        @Bean
        public Queue insertQueue() {
            return new Queue(MqConstants.HOTEL_INSERT_QUEUE, true);
        }
    
        @Bean
        public Queue deleteQueue() {
            return new Queue(MqConstants.HOTEL_DELETE_QUEUE, true);
        }
    
        @Bean
        public Binding insertQueueBinding() {
            return BindingBuilder.bind(insertQueue()).to(topicExchange()).with(MqConstants.HOTEL_INSERT_KEY);
        }
    
        @Bean
        public Binding deleteQueueBinding() {
            return BindingBuilder.bind(deleteQueue()).to(topicExchange()).with(MqConstants.HOTEL_DELETE_KEY);
        }
    
    }
    
  • 在hotel-admin中的增、删、改业务中完成消息发送

    @PostMapping
    public void saveHotel(@RequestBody Hotel hotel) {
        hotelService.save(hotel);
        //发送mq消息
        rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE, MqConstants.HOTEL_INSERT_KEY, hotel.getId());
    }
    
    @PutMapping()
    public void updateById(@RequestBody Hotel hotel) {
        if (hotel.getId() == null) {
            throw new InvalidParameterException("id不能为空");
        }
        hotelService.updateById(hotel);
        //发送mq消息
        rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE, MqConstants.HOTEL_INSERT_KEY, hotel.getId());
    }
    
    @DeleteMapping("/{id}")
    public void deleteById(@PathVariable("id") Long id) {
        hotelService.removeById(id);
        //发送mq消息
        rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE, MqConstants.HOTEL_DELETE_KEY, id);
    }
    
  • 在hotel-demo中完成消息监听,并更新elasticsearch中数据

    package cn.itcast.hotel.mq;
    
    import cn.itcast.hotel.constants.MqConstants;
    import cn.itcast.hotel.service.IHotelService;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    @Component
    public class HotelListener {
    
        @Autowired
        private IHotelService hotelService;
    
        /**
         * 监听酒店新增或修改的业务
         * @param id 酒店id
         */
        @RabbitListener(queues = MqConstants.HOTEL_INSERT_QUEUE)
        private void listenHotelInsertOrUpdate(Long id) {
            hotelService.insertById(id);
        }
    
        /**
         * 监听酒店删除的业务
         * @param id 酒店id
         */
        @RabbitListener(queues = MqConstants.HOTEL_DELETE_QUEUE)
        private void listenHotelDelete(Long id) {
            hotelService.deleteById(id);
        }
    }
    
    void deleteById(Long id);
    
    void insertById(Long id);
    
    @Override
    public void deleteById(Long id) {
        try {
            // 1.准备Request
            DeleteRequest request = new DeleteRequest("hotel", id.toString());
            // 2.发送请求
            client.delete(request, RequestOptions.DEFAULT);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
    
    @Override
    public void insertById(Long id) {
        try {
            // 0.根据id查酒店数据
            Hotel hotel = getById(id);
            //转换为文档类型
            HotelDoc hotelDoc = new HotelDoc(hotel);
    
            //1.准备Request对象
            IndexRequest request = new IndexRequest("hotel").id(hotel.getId().toString());
            //2.准备Json文档
            request.source(JSON.toJSONString(hotelDoc), XContentType.JSON);
            //3.发送请求
            client.index(request, RequestOptions.DEFAULT);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
    
  • 启动并测试数据同步功能

15

elasticsearch集群

搭建ES集群

ES集群结构

单机的elasticsearch做数据存储,必然面临两个问题:海量数据存储问题、单点故障问题。

  • 海量数据存储问题:将索引库从逻辑上拆分为N个分片(shard),存储到多个节点

  • 单点故障问题:将分片数据在不同节点备份( replica )

16

搭建ES集群

我们计划利用3个docker容器模拟3个es的节点。

首先编写一个docker-compose文件,内容如下:

version: '2.2'
services:
  es01:
    image: elasticsearch:7.12.1
    container_name: es01
    environment:
      - node.name=es01
      - cluster.name=es-docker-cluster
      - discovery.seed_hosts=es02,es03
      - cluster.initial_master_nodes=es01,es02,es03
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    volumes:
      - data01:/usr/share/elasticsearch/data
    ports:
      - 9200:9200
    networks:
      - elastic
  es02:
    image: elasticsearch:7.12.1
    container_name: es02
    environment:
      - node.name=es02
      - cluster.name=es-docker-cluster
      - discovery.seed_hosts=es01,es03
      - cluster.initial_master_nodes=es01,es02,es03
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    volumes:
      - data02:/usr/share/elasticsearch/data
    ports:
      - 9201:9200
    networks:
      - elastic
  es03:
    image: elasticsearch:7.12.1
    container_name: es03
    environment:
      - node.name=es03
      - cluster.name=es-docker-cluster
      - discovery.seed_hosts=es01,es02
      - cluster.initial_master_nodes=es01,es02,es03
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    volumes:
      - data03:/usr/share/elasticsearch/data
    networks:
      - elastic
    ports:
      - 9202:9200
volumes:
  data01:
    driver: local
  data02:
    driver: local
  data03:
    driver: local

networks:
  elastic:
    driver: bridge

es运行需要修改一些linux系统权限,修改/etc/sysctl.conf文件

vi /etc/sysctl.conf

添加下面的内容:

vm.max_map_count=262144

然后执行命令,让配置生效:

sysctl -p

通过docker-compose启动集群:

docker-compose up -d

集群状态监控

kibana可以监控es集群,不过新版本需要依赖es的x-pack 功能,配置比较复杂。

这里推荐使用cerebro来监控es集群状态,官方网址:https://github.com/lmenezes/cerebro

注意:在 Java 9 及以上版本中,模块化系统的安全性增强导致了 java.lang.ClassLoader.defineClass 方法默认不可访问。因此,在 Windows 系统中使用 .\cerebro.bat 启动 Cerebro 时,需要通过添加 --add-opens 参数来打开此方法的访问权限,才能避免 java.lang.reflect.InaccessibleObjectException 异常。所以此处采用jdk1.8运行

创建索引库

1)利用kibana的DevTools创建索引库

在DevTools中输入指令:

PUT /itcast
{
  "settings": {
    "number_of_shards": 3, // 分片数量
    "number_of_replicas": 1 // 副本数量
  },
  "mappings": {
    "properties": {
      // mapping映射定义 ...
    }
  }
}

2)利用cerebro创建索引库

利用cerebro还可以创建索引库:

17

填写索引库信息:

18

点击右下角的create按钮:

19

查看分片效果

回到首页,即可查看索引库分片效果:

20

ES集群的节点角色

elasticsearch中集群节点有不同的职责划分:

21

elasticsearch中的每个节点角色都有自己不同的职责,因此建议集群部署时,每个节点都有独立的角色。

22

集群脑裂问题

默认情况下,每个节点都是master eligible节点,因此一旦master节点宕机,其它候选节点会选举一个成为主节点。当主节点与其他节点网络故障时,可能发生脑裂问题。

为了避免脑裂,需要要求选票超过(eligible节点数量+1)/2才能当选为主,因此eligible节点数量最好是奇数。对应配置项是discovery.zen.minimum_master_nodes,在es7.0以后,已经成为默认配置,因此一般不会发生脑裂问题

ES集群的分布式存储

当新增文档时,应该保存到不同分片,保证数据均衡,那么coordinating node如何确定数据该存储到哪个分片呢?

23

说明:

  • _routing默认是文档的id
  • 算法与分片数量有关,因此索引库一旦创建,分片数量不能修改!

24

ES集群的分布式查询

elasticsearch的查询分成两个阶段:

  • scatter phase:分散阶段,coordinating node会把请求分发到每一个分片
  • gather phase:聚集阶段,coordinating node汇总data node的搜索结果并处理为最终结果集返回给用户

25

ES集群的故障转移

集群的master节点会监控集群中的节点状态,如果发现有节点宕机,会立即将宕机节点的分片数据迁移到其它节点,确保数据安全,这个叫做故障转移。

26

故障转移:

  • master宕机后,EligibleMaster选举为新的主节点。
  • master节点监控分片、节点状态,将故障节点上的分片转移到正常节点,确保数据安全。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/513138.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

第10章:数据处理增删改

一、插入数据 CREATE TABLE emp1 (id int(11) ,name varchar(15) ,hire_date date ,salary double(10,2) )1.添加一条数据 ①没有指明添加的字段&#xff0c;一定按照顺序添加 insert into emp1 values(1,wang,2000-4-4,5900)②指明添加的字段&#xff08;推荐&#xff09;…

【 图像水印 2019 CVPR】 StegaStamp 论文翻译

【 图像水印 2019 CVPR】 StegaStamp 论文翻译 论文题目&#xff1a;StegaStamp: Invisible Hyperlinks in Physical Photographs 中文题目&#xff1a;物理照片中不可见的超链接 论文链接&#xff1a;https://arxiv.org/abs/1904.05343 论文代码&#xff1a;https://github.co…

QxRibbon 知:openEuler 23.03 搭建 Qt5 开发环境

文章目录 安装 openEuler 23.03 虚拟机安装 GNOME 桌面环境安装 Qt5 开发环境构建 QxRibbon参考资料 安装 openEuler 23.03 虚拟机 VMware 安装 openEuler 23.03 虚拟机 平台&#xff1a;x86_64 虚拟机配置&#xff1a;4核、4G内存、100G磁盘 ISO 镜像&#xff1a;https://mir…

FT2000+ qemu kvm 红旗 crash 分析 频繁设置CPU online导致进程卡死、不调度故障

测试程序 /** tcti.cpp参考&#xff1a; https://www.cnblogs.com/organic/p/17321523.htmlg -stdc11 -lpthread trigger_cgroup_timer_inactive.cpp -o inactive_timer ./inactive_timer 100000 10000 */#include <errno.h> #include <iostream> #include <pt…

Redis进阶(集群,雪崩,击穿,穿透.......)

Redis进阶 Redis事务_事务的概念与ACID特性 Redis的事物不保证原子性 数据库层面事务 在数据库层面&#xff0c;事务是指一组操作&#xff0c;这些操作要么全都被成功执行&#xff0c;要么全都不执行。 数据库事务的四大特性 A&#xff1a;Atomic&#xff0c;原子性&#xf…

Docker笔记5 | 容器的基本操作

5 | 容器的基本操作 1 启动容器1.1 启动方式1.2 新建容器并启动1.3 docker run时的运行过程1.4 启动已终止容器1.5 后台运行1.6 查看容器信息 2 终止容器3 进入容器3.1 docker attach3.2 docker exec 4 导入导出容器4.1 导出容器4.2 导入容器 5 删除容器 1 启动容器 1.1 启动方…

Linux内核主要组成部分有哪些?

Linux 内核由几大子系统构成&#xff0c;分别为进程调度、进程间通信&#xff08;IPC&#xff09; 、内存管理、虚拟 文件系统和网络接口。这几大子系统既相互独立又有非常紧密的关联。图 3-5 展示了内核的 几大子系统之间以及这些子系统和计算机系统的其他模块之间的关系。 接…

ADS-B教学实验方案

ADS-B教学系统是为了让学生学习ADS-B原理、ADS-B系统组成、ADS-B信号处理技术。可以通过ADS-B教学系统进一步研究分析ADS-B位置的精度、准确性、稳定性、实时性&#xff0c;设计基于ADS-B的空中碰撞告警系统&#xff0c;混合空域的空中交通管理系统(UTM)设计。也可以研究ADS-B报…

《花雕学AI》你不知道的AI 机器人:29个让你大开眼界的事实

AI 机器人是人工智能技术的最具代表性的应用之一&#xff0c;它们可以模仿人类的行为和思维&#xff0c;完成各种复杂的任务&#xff0c;如识别图像、语音和文字&#xff0c;进行对话、翻译和推理&#xff0c;控制机械臂、汽车和飞机等。AI 机器人的发展速度令人惊叹&#xff0…

Windows命令提示行使用指南二(批处理)

命令提示行使用指南 前言四、批处理简介五、如何编写批处理1、Hello world2、做加法3、查找文件&#xff0c;并输出到文本。4、批量重命名5、自动记录开机时间 前言 cmd 是 Windows 操作系统中的命令行界面&#xff08;CLI&#xff09;&#xff0c;也称为命令提示符&#xff0…

Linux shell编程 数组排序算法

冒泡排序 循环对比相邻的元素&#xff0c;交换较大元素到后面的位置 大循环根据列表中存在的元素数量循环n-1次&#xff0c;保证所有元素都能被排序完成 小循环从前向后遍历&#xff0c;循环一次循环范围减少一位&#xff08;由于后面的已经排列完成无需再比较&#xff09;小循…

【halcon知识】应用仿射变换

一、说明 无论什么样的变换&#xff0c;都离不开齐次变换矩阵。一般地&#xff0c;先准备一个空的齐次变换矩阵&#xff0c;这个矩阵随便填写&#xff1a;1&#xff09;填入旋转类参数就是旋转矩阵&#xff0c;2——填入仿射参数就可进行仿射变换&#xff0c;3&#xff09;填入…

Kali-linux攻击WordPress和其他应用程序

今天越来越多的企业利用SAAS&#xff08;Software as a Service&#xff09;工具应用在他们的业务中。例如&#xff0c;他们经常使用WordPress作为他们网站的内容管理系统&#xff0c;或者在局域网中使用Drupal框架。从这些应用程序中找到漏洞&#xff0c;是非常有价值的。 为…

[JAVA数据结构]堆

目录 1.堆的概念 2.堆的创建 3.堆的插入与删除 3.1堆的插入 3.2堆的删除 1.堆的概念 如果有一个关键码的集合K {k0&#xff0c;k1&#xff0c; k2&#xff0c;…&#xff0c;kn-1}&#xff0c;把它的所有元素按完全二叉树的顺序存储方式存储在一个一维数组中&#xff0c;…

【Linux】远程桌面连接服务器报错:未启用对服务器的远程访问......

&#x1f341;博主简介 &#x1f3c5;云计算领域优质创作者   &#x1f3c5;华为云开发者社区专家博主   &#x1f3c5;阿里云开发者社区专家博主 &#x1f48a;交流社区&#xff1a;运维交流社区 欢迎大家的加入&#xff01; 文章目录 前述操作环境说明&#xff1a;远程报…

<数据结构>NO4.带头双向循环链表

文章目录 前言1. 头文件2. 函数实现1&#xff09;创建哨兵位节点2&#xff09;新增一个节点3&#xff09;打印链表4&#xff09;头插5&#xff09;尾插6&#xff09;头删7&#xff09;尾删8&#xff09;查找9&#xff09;pos前插入10&#xff09;删除pos处节点11&#xff09;销…

Redis 缓存穿透、缓存击穿与缓存雪崩

文章目录 1. 缓存穿透解决方法 2. 缓存击穿解决方法 3. 缓存雪崩解决方法 在 redis 的应用场景中&#xff0c;需要考虑缓存在某些场景下可能出现的问题&#xff1a; 缓存穿透 缓存击穿 缓存雪崩 以下缓存问题的讨论都是基于以下应用架构讨论的&#xff1a; 1. 缓存穿透 对应…

数据备份系列:Rsync 备份实战记录(二)

一、Rsync Cron 场景使用 在对数据备份要求实时性不高的情况下&#xff0c;可优先考虑该场景&#xff0c;选择一个合适的时间&#xff0c;对数据进行定时远程增量同步。 在《数据备份系列&#xff1a;Rsync 备份详解&#xff08;一&#xff09;》中我们已经对服务搭建以及远程…

DAD-DAS模型

DAD-DAS模型 文章目录 DAD-DAS模型[toc]1 产品服务:需求方程2 实际利率:费雪方程3 通货膨胀:菲利普斯方程4 预期通货膨胀&#xff1a;适应性预期5 货币政策规则&#xff1a;泰勒方程6 动态总供给-总需求方程&#xff08;DAS-DAD&#xff09;7 总供给冲击模拟 1 产品服务:需求方…

【JavaEE初阶】文件操作——IO

摄影分享~ 文章目录 文件文件路径&#xff08;Path&#xff09; 文件的类型Java中操作文件File概述 文件内容的读写——数据流字节流InputStream概述OutputStream 概述字符流FileInputStream 概述利用 Scanner 进行字符读取 实例练习 文件 文件&#xff1a;File这个概念&…