微服务-实用篇
- 一、微服务治理
- 1.微服务远程调用
- 2.Eureka注册中心
- Eureka的作用:
- 搭建EurekaServer服务
- Client服务注册
- 服务发现
- Ribbon负载均衡策略配置
- Ribbon配置饥饿加载
- 3.nacos注册中心
- 使用nacos注册中心服务
- nacos区域负载均衡
- nacos环境隔离-namespace
- Nacos和Eureka的对比
- nacos配置管理
- 配置管理步骤
- 配置热更新
- 多环境配置共享
- 4.http客户端Feign
- Feigin的使用步骤
- Feign的日志配置
- Feign的性能优化
- 5.统一网关Gateway
- 作用
- 搭建网关服务
- 路由的过滤器配置
- 全局过滤器
- 过滤器链执行顺序
- 二、异步通信
- 1.什么是AMQP?
- 2.部署RabbitMQ
- 3.SpingAMQP如何发送消息
- 4.SpingAMQP如何接收消息
- 5.WorkQueue模型
- 6.交换机
- 7.FanoutExchange
- 8.DirectExchange
- 9.TopicExchange
- 10.消息转换器
- 三、分布式搜索
- 1.初识elasticsearch
- 2.安装elastic
- 3.索引库的操作
- 4.文档操作
- 5.RestClient
- RestClient的初步使用
- RestClient增删改查
- 6.elasticsearch搜索功能
- DSL查询语法
- 7.RestClient查询文档
- 8.聚合
- 9.RestClient实现聚合
- 10.自动补全
- 11.RestClient实现自动补全
- 12.数据同步
- 13.ES集群
- 13.ES集群
一、微服务治理
1.微服务远程调用
-
springCloud提供了RestTemplate,可以发起远程http协议的调用
-
使用
-
注入bean
@Bean public RestTemplate restTemplate() { return new RestTemplate(); }
-
restTemplate使用
@RestController @RequestMapping("order") public class OrderController { @Autowired private OrderService orderService; @Autowired private RestTemplate restTemplate; private static final String UserBaseApiURL = "http://localhost:8081/user/"; @GetMapping("{orderId}") public Order queryOrderByUserId(@PathVariable("orderId") Long orderId) { // 根据id查询订单并返回 Order order = orderService.queryOrderById(orderId); // 远程调用查询用户 User user = restTemplate.getForObject(UserBaseApiURL + order.getUserId(), User.class); order.setUser(user); return order; } }
-
2.Eureka注册中心
-
Eureka的作用:
-
消费者该如何获取服务提供者具体信息?
- 服务提供者启动时向eureka注册自己的信息
- eureka保存这些信息
- 消费者根据服务名称向eureka拉取提供者信息
-
如果有多个服务提供者,消费者该如何选择?
- 服务消费者利用负载均衡算法,从服务列表中挑选一个
-
消费者如何感知服务提供者的健康状态?
- 服务者会每隔30秒向EurekaServer发送心跳请求,报告健康状态
- eureka会更新记录服务列表信息,心跳不正常会被剔除
- 消费者就可以拉取到最新的信息
-
-
搭建EurekaServer服务
-
创建项目引入依赖
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId> </dependency>
-
编写application.yml配置文件
server: port: 10086 spring: application: name: eureka-server # 服务名 eureka: client: service-url: # 服务地址 defaultZone: http://127.0.0.1:10086/eureka/
-
在启动类上开启自动装配
@EnableEurekaServer // 开启eureka自动装配 @SpringBootApplication public class EurekaApplication { public static void main(String[] args) { SpringApplication.run(EurekaApplication.class , args); } }
-
-
Client服务注册
-
引入eureka-client依赖
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> </dependency>
-
在application.yml中配置eureka地址
spring: datasource: url: jdbc:mysql://localhost:3306/cloud_order?useSSL=false username: root password: 123456 driver-class-name: com.mysql.jdbc.Driver application: name: orderservice # 服务名 eureka: client: service-url: # 服务地址 defaultZone: http://127.0.0.1:10086/eureka/
-
-
服务发现
-
给RestTemplate添加@LoadBalanced注解
@Bean @LoadBalanced public RestTemplate restTemplate() { return new RestTemplate(); }
-
用服务提供者的服务名称远程调用
@RestController @RequestMapping("order") public class OrderController { @Autowired private OrderService orderService; @Autowired private RestTemplate restTemplate; private static final String UserBaseApiURL = "http://userservice/user/"; @GetMapping("{orderId}") public Order queryOrderByUserId(@PathVariable("orderId") Long orderId) { // 根据id查询订单并返回 Order order = orderService.queryOrderById(orderId); // 远程调用查询用户 User user = restTemplate.getForObject(UserBaseApiURL + order.getUserId(), User.class); order.setUser(user); return order; } }
-
-
Ribbon负载均衡策略配置
-
-
代码方式,注入IRule实例bean
@Bean public IRule randomIRule() { return new RandomRule(); }
-
-
2.配置文件方式: 在application.yml文件中添加配置项
userservice: ribbon: NFLoadBalancerRuleClassName: com.netfix.loadbalancer.RandomRule
-
-
Ribbon配置饥饿加载
-
ribbon默认采用懒加载,即第一次访问时才会去创建LoadBalanceClient,请求时间会很长
-
在application.yml中开启饥饿加载
ribbon: eager-load: enabled: true # 开启饥饿加载 clients: - userservice # 指定对userservice这个服务饥饿加载
-
3.nacos注册中心
-
使用nacos注册中心服务
-
引入依赖
<!-- nacos的管理依赖 --> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-alibaba-dependencies</artifactId> <version>2.2.5.RELEASE</version> <type>pom</type> <scope>import</scope> </dependency> <!-- nacos客户端依赖包 --> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId> </dependency>
-
修改application.yml文件nacos配置
spring: application: name: orderservice # 服务名 cloud: nacos: server-addr: localhost:8848 # nacos端口号
-
-
nacos区域负载均衡
-
nacos默认负载均衡策略为服务轮询,一个服务往往会在多地部署多个实例,相同区域内的服务之间相互调用时长消耗更短,因此区域内优先调用比较合理,nacos提供了这样的负载均衡策略,不过区域内的策略为随机策略,当区域内没有可用服务时再访问其他区域的可用服务。
-
application.yml配置区域
spring: application: name: userservice # 服务名 cloud: nacos: server-addr: localhost:8848 discovery: cluster-name: HZ # 集群名称
-
为Ribbon配置区域优先策略
userservice: ribbon: NFLoadBalancerRuleClassName: com.alibaba.cloud.nacos.ribbon.NacosRule
-
-
nacos环境隔离-namespace
-
不同的时期有不同的环境,比如开发时需要开发环境,namespace就可以进行环境隔离,不同环境之间的服务无法调用。
-
application.yaml 配置namespace
spring: application: name: orderservice # 服务名 cloud: nacos: server-addr: localhost:8848 discovery: cluster-name: HZ # 集群名称 namespace: fd3445b3-8e01-446a-91d5-03ab8b5a7205 # 环境id(从nacos控制台查看)
-
-
Nacos和Eureka的对比
-
共同点
- 都支持服务注册和服务拉取
- 都支持服务提供者心跳方式做健康检测
-
区别
-
Nacos支持服务端主动监测提供者状态:临时实例采用心跳模式,非临时实例采用主动健康监测。
spring: application: name: orderservice # 服务名 cloud: nacos: server-addr: localhost:8848 discovery: cluster-name: HZ # 集群名称 ephemeral: false # 非临时实例
-
临时实例心跳不正常会被剔除,非临时实例则不会被剔除。
-
Nacos支持服务列表变更的消息推送模式,服务列表更新及时。
-
Nacos集群默认采用AP方式,当集群中存在非临时实例时,采用CP模式;Eureka采用AP模式。
-
-
-
nacos配置管理
-
配置管理步骤
-
在控制台添加配置文件
-
添加依赖
<!-- nacos配置管理依赖 --> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId> </dependency>
-
在bootstrap.yml文件中配置
spring: application: name: userservice profiles: active: dev # 开发环境 cloud: nacos: server-addr: localhost:8848 config: file-extension: yaml # 文件后缀
-
使用@Value注解注入
@Value("${pattern.format}") private String format;
-
-
配置热更新
-
通过@Value方式注入的配置属性需要在类上添加@RefreshScope注解即可实现配置热更新
@Slf4j @RestController @RequestMapping("/user") @RefreshScope // 配置热更新注解 public class UserController { @Autowired private UserService userService; // 配置属性注入 @Value("${pattern.format}") private String format; }
-
通过@ConfigurationProperties方式注入的属性自动热更新
@Component @ConfigurationProperties(prefix = "pattern") @Data public class PatternProperties { private String format; }
-
-
多环境配置共享
- 微服务会从nacos读取的配置文件:
- [服务名]-[spring.proflle.active].yaml
- [服务名].yaml,默认配置,多环境共享
- 优先级
- [服务名]-[环境].yaml > [服务名].yaml > 本地配置
- 微服务会从nacos读取的配置文件:
-
4.http客户端Feign
-
Feigin的使用步骤
-
引入依赖
<!-- fegin依赖 --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-openfeign</artifactId> </dependency>
-
启动类添加@EnableFeignClients注解
@MapperScan("cn.itcast.order.mapper") @SpringBootApplication @EnableFeignClients public class OrderApplication { public static void main(String[] args) { SpringApplication.run(OrderApplication.class, args); } }
-
编写FeignClient接口
@FeignClient("userservice") public interface UserClient { @GetMapping("user/{id}") User findById(@PathVariable("id") Long id); }
-
使用FeignClient中定义的方法代替RestTemplate
@RestController @RequestMapping("order") public class OrderController { @Autowired private OrderService orderService; @Autowired private UserClient userClient; @GetMapping("{orderId}") public Order queryOrderByUserId(@PathVariable("orderId") Long orderId) { // 根据id查询订单并返回 Order order = orderService.queryOrderById(orderId); // 远程调用查询用户 User user = userClient.findById(order.getUserId()); order.setUser(user); return order; } }
-
-
Feign的日志配置
-
方式一是配置文件
feign: client: config: default: # 这里是default就是全局配置,如果是写服务名,则针对某个微服务的配置 logger-level: FULL # 日志级别
-
方式二是JAVA代码配置类
public class FeignClientConfiguration { @Bean public Logger.Level feignLogLevel(){ return Logger.Level.FULL; } }
@MapperScan("cn.itcast.order.mapper") @SpringBootApplication // 在启动类上添加的配置类属于全局配置 @EnableFeignClients(defaultConfiguration = FeignAutoConfiguration.class) public class OrderApplication { public static void main(String[] args) { SpringApplication.run(OrderApplication.class, args); } }
// 如果需要对某一服务进行配置在服务接口上添加即可 @FeignClient(value = "userservice" , configuration = FeignClientConfiguration.class) public interface UserClient { @GetMapping("user/{id}") User findById(@PathVariable("id") Long id); }
-
-
Feign的性能优化
-
引入依赖
<dependency> <groupId>io.github.openfeign</groupId> <artifactId>feign-httpclient</artifactId> </dependency>
-
配置连接池
feign: client: config: default: # 这里是default就是全局配置,如果是写服务名,则针对某个微服务的配置 logger-level: FULL # 日志级别 httpclient: enabled: true # 开启feign对HttpClient的支持 max-connections: 200 # 最大连接数 max-connections-per-route: 50 # 每个路径的最大连接数
-
5.统一网关Gateway
-
作用
- 对用户请求做身份验证,权限认证
- 将用户请求路由到微服务,并实现负载均衡
- 将用户请求做限流
-
搭建网关服务
-
创建新的module,引入SpringCloudGateWay的依赖和服务发现依赖
<dependencies> <!-- nacos服务发现依赖 --> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId> </dependency> <!-- 网关gateway依赖 --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-gateway</artifactId> </dependency> </dependencies>
-
编写路由配置及nacos地址
server: port: 10010 spring: application: name: gateway cloud: nacos: server-addr: localhost:80 # nacos地址 gateway: routes: # 网关路由配置 - id: user-service # 路由id 自定义 uri: lb://userservice # 路由的目标地址 lb是负载均衡 predicates: # 路由断言 - Path=/user/** # 这个是按照路径匹配,只要以/user/开头就符合要求 - id: order-service uri: lb://orderservice predicates: # 路由断言 - Path=/order/** # 这个是按照路径匹配,只要以/order/开头就符合要求
-
-
路由的过滤器配置
-
为某个服务添加过滤器
spring: application: name: gateway cloud: nacos: server-addr: localhost:8848 # nacos地址 gateway: routes: # 网关路由配置 - id: user-service # 路由id 自定义 uri: lb://userservice # 路由的目标地址 lb是负载均衡 predicates: # 路由断言 - Path=/user/** # 这个是按照路径匹配,只要以/user/开头就符合要求 - id: order-service uri: lb://orderservice predicates: # 路由断言 - Path=/order/** # 这个是按照路径匹配,只要以/order/开头就符合要求 filters: - AddRequestHeader=color, blue # 局部过滤器
-
添加默认过滤器(全局)
spring: application: name: gateway cloud: nacos: server-addr: localhost:8848 # nacos地址 gateway: routes: # 网关路由配置 - id: user-service # 路由id 自定义 uri: lb://userservice # 路由的目标地址 lb是负载均衡 predicates: # 路由断言 - Path=/user/** # 这个是按照路径匹配,只要以/user/开头就符合要求 - id: order-service uri: lb://orderservice predicates: # 路由断言 - Path=/order/** # 这个是按照路径匹配,只要以/order/开头就符合要求 default-filters: - AddRequestHeader=color, blue # 全局过滤器
-
-
全局过滤器
@Order(1) // 过滤器等级 越低优先值越高 @Component public class AuthorizeFilter implements GlobalFilter { @Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { ServerHttpRequest request = exchange.getRequest(); HttpHeaders headers = request.getHeaders(); String token = headers.getFirst("token"); if (token != null && token.equals("abc")) { return chain.filter(exchange); } exchange.getResponse().setStatusCode(HttpStatus.UNAUTHORIZED); return exchange.getResponse().setComplete(); } }
-
过滤器链执行顺序
- 每一个过滤器都必须指定一个int类型的order值,order值越小,优先级越高,执行顺序越靠前。
- GlobalFilter通过实现Ordered接口,或者添加@Order注解来指定Order值,由我们自己指定
- 路由过滤器和defaultFilter的order由Spring指定,默认是按照声明顺序从1递增
- 当过滤器的order值一样时,会按照default>路由过滤器>GlobalFilter的循序执行
二、异步通信
1.什么是AMQP?
- 应用层消息通信的一种协议,与语言和平台无关
2.部署RabbitMQ
RabbitMQ部署指南.md
3.SpingAMQP如何发送消息
-
引入AMQP的Starter依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
-
配置RabbitMQ地址
spring: rabbitmq: addresses: 192.168.88.101 # 地址名 port: 5672 # 端口 virtual-host: / # 虚拟主机 username: itcast password: 123321
-
新建测试类
@RunWith(SpringRunner.class) @SpringBootTest public class SpringAMQPTest { @Autowired private RabbitTemplate rabbitTemplate; @Test public void testSimpleQueue() { String queueName = "simple.queue"; String message = "hello , Spring amqp!"; rabbitTemplate.convertAndSend(queueName , message); } }
4.SpingAMQP如何接收消息
-
配置地址
spring: rabbitmq: addresses: 192.168.88.101 # 地址名 port: 5672 # 端口 virtual-host: / # 虚拟主机 username: itcast password: 123321
-
新建类
@Component public class SpringRabbitListener { @RabbitListener(queues = "simple.queue") // 在启动前要确保该队列存在! public void listenSimpleQueue(String msg) { System.out.println("消费者1接收到消息 = " + msg); } }
5.WorkQueue模型
-
多个模型绑定到一个队列,用一个消息会被一个消费者处理
-
通过设置prefetch来控制消费者预取的消息数量
logging: pattern: dateformat: MM-dd HH:mm:ss:SSS spring: rabbitmq: addresses: 192.168.88.101 # 地址名 port: 5672 # 端口 virtual-host: / # 虚拟主机 username: itcast password: 123321 listener: simple: prefetch: 1 # 修改消费者提前把握的最大数量
-
示例:
-
设置发布50条消息
@Test public void testSimpleWorkQueue() { String queueName = "simple.queue"; for (int i = 0; i < 50; i++) { String message = "hello , Spring amqp! - " + (i + 1); rabbitTemplate.convertAndSend(queueName , message); } }
-
两个接收者
@Component public class SpringRabbitListener { @RabbitListener(queues = "simple.queue") public void listenSimpleQueue(String msg) { System.out.println("消费者1接收到消息 = " + msg); } @RabbitListener(queues = "simple.queue") public void listenSimpleQueue2(String msg) throws InterruptedException { System.err.println("消费者2接收到消息 = " + msg); Thread.sleep(500); } }
-
6.交换机
- 交换机的作用是什么?
- 接收publisher发送的消息
- 将消息按照规则路由到与之绑定的队列
- 不能缓存消息,路由失败,消息丢失
- FanoutExChange的会将消息路由到每个绑定的队列
7.FanoutExchange
-
特点:
- 会将交换机接收的消息转发给绑定的所有队列,所有与之监听的消费者全部都会收到消息
-
编写配置类
@Configuration public class FanoutConfig { // 声明FanoutExchange交换机 @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange("itcast.fanout"); } // 声明第一个队列 @Bean public Queue queue1() { return new Queue("fanout.queue1"); } // 绑定队列1和交换机 @Bean public Binding bindingQueue1(Queue queue1 , FanoutExchange fanoutExchange) { return BindingBuilder.bind(queue1).to(fanoutExchange); } // 声明第二个队列 @Bean public Queue queue2() { return new Queue("fanout.queue2"); } // 绑定队列2和交换机 @Bean public Binding bindingQueue2(Queue queue2 , FanoutExchange fanoutExchange) { return BindingBuilder.bind(queue2).to(fanoutExchange); } }
-
编写消费者代码
@RabbitListener(queues = "fanout.queue1") public void listenFanoutQueue1(String msg) { System.out.println("消费者1接收到消息 = " + msg); } @RabbitListener(queues = "fanout.queue2") public void listenFanoutQueue2(String msg) throws InterruptedException { System.err.println("消费者2接收到消息 = " + msg); }
8.DirectExchange
-
特点:
- 该交换机需要设置key值,当该交换机收到消息时,交换机会转发给指定key值的队列。存在多个相同的key值时,则群发
-
案例
-
编写接收者,绑定交换机(注解方式)
@RabbitListener(bindings = @QueueBinding( value = @Queue( name = "direct.queue1"), exchange = @Exchange( name = "itcast.direct" , type = ExchangeTypes.DIRECT), key = {"red" , "blue" } )) public void listenDirectQueue1(String msg) { System.out.println("消费者1接收到消息 = " + msg); } @RabbitListener(bindings = @QueueBinding( value = @Queue( name = "direct.queue2"), exchange = @Exchange( name = "itcast.direct" , type = ExchangeTypes.DIRECT), key = {"red" , "pink" } )) public void listenDirectQueue2(String msg) throws InterruptedException { System.err.println("消费者2接收到消息 = " + msg); }
-
编写发布者
@Test public void testDirectQueue() { String queueName = "simple.queue"; String message = "hello , Spring amqp!"; // 交换机name String exchangeName = "itcast.direct"; rabbitTemplate.convertAndSend(exchangeName , "red" , message); }
-
9.TopicExchange
-
特点
- TopicExchange与DirectExchange类似,区别在于routingKey必须是多个单词的列表,并且以 . 分割。
- Queue与Exchange指定BindingKey时可以使用通配符:
- #:代指0个或多个单词
- *:代指一个单词
-
示例:
-
编写接收者
@RabbitListener(bindings = @QueueBinding( value = @Queue( name = "topic.queue1"), exchange = @Exchange( name = "itcast.topic" , type = ExchangeTypes.TOPIC), key = {"china.*" } )) public void listenTopicQueue1(String msg) { System.out.println("消费者1接收到消息 = " + msg); } @RabbitListener(bindings = @QueueBinding( value = @Queue( name = "topic.queue2"), exchange = @Exchange( name = "itcast.topic" , type = ExchangeTypes.TOPIC), key = {"china.*" } )) public void listenTopicQueue2(String msg) { System.err.println("消费者2接收到消息 = " + msg); }
-
编写发送者
@Test public void testTopicQueue() { String queueName = "simple.queue"; String message = "hello , Spring amqp!"; // 交换机name String exchangeName = "itcast.topic"; rabbitTemplate.convertAndSend(exchangeName , "china.news" , message); }
-
10.消息转换器
-
RabbieMQ是可以传递java对象的,通过MessageConverter实现,但是默认是JDk的序列化,最好为其配置JSON对象转化器,注意发布与接收方必须使用相同的MessageConverter
-
案例
-
消息发布方
-
引入JackSon依赖
<dependency> <groupId>com.fasterxml.jackson.dataformat</groupId> <artifactId>jackson-dataformat-xml</artifactId> <version>2.16.1</version> </dependency>
-
注入Bean
@Bean // 注入json消息转化器 public MessageConverter messageConverter() { return new Jackson2JsonMessageConverter(); }
-
发布消息
@Test public void ObjectQueue() { String queueName = "object.queue"; HashMap<String, String> hashMap = new HashMap<>(); hashMap.put("name" , "小强"); hashMap.put("age" , "18"); rabbitTemplate.convertAndSend(queueName , hashMap); }
-
-
消息接收方
-
引入JackSon依赖
<dependency> <groupId>com.fasterxml.jackson.dataformat</groupId> <artifactId>jackson-dataformat-xml</artifactId> <version>2.16.1</version> </dependency>
-
注入Bean
@Bean // 注入json消息转化器 public MessageConverter messageConverter() { return new Jackson2JsonMessageConverter(); }
-
定义队列
@Bean public Queue ObjectQueue() { return new Queue("object.queue"); }
-
接收消息
@RabbitListener(queues = "object.queue") public void listenObjectQueue2(HashMap<String , Object> msg) { System.err.println("消费者接收到消息 = " + msg); }
-
-
三、分布式搜索
1.初识elasticsearch
-
什么是elasticsearch?
- 一个开源的分布式搜索引擎,可以用来实现搜索、日志统计、分析、系统监控等功能
-
什么是elastic stack (ELK)?
- 是指以elasticsearch为核心的技术栈,包括beats、logstash、kibana、elasticsearch
2.安装elastic
安装elasticsearch.md
3.索引库的操作
-
mapping属性
- type:字段数据类型,常见的简单类型有:
- 字符串:text(可分词文本)、keyword(精确值,例如:品牌、国家)
- 数值:long、integer、short、byte、double、float
- 布尔:boolean
- 日期:date
- 对象:object
- index:是否创建索引,默认为true
- analyzer:使用哪种分词器
- properties:该字段的子字段
- type:字段数据类型,常见的简单类型有:
-
索引库操作
-
创建索引库
PUT /索引库名
# 创建索引库 PUT /heima { "mappings": { "properties": { "info": { "type": "text", "analyzer": "ik_smart" }, "email" : { "type": "keyword", "index": false }, "name" : { "properties": { "firstName" : { "type" : "keyword", "index" : false }, "lastName" : { "type" : "keyword", "index" : false } } } } } }
-
查看索引库
GET /索引库名
# 查询索引库 GET /heima
-
删除索引库
DELETE /索引库名
# 删除索引库 DELETE /黑马
-
添加字段
PUT /索引库名/_mapping
# 添加新字段 PUT /heima/_mapping { "properties": { "color" : { "type" : "keyword", "index" : false } } }
-
4.文档操作
-
添加文档
-
模板
POST /索引库名/_doc/文档id { "字段1" : "值1", "字段2" : "值2", "字段3" : { "子属性1" : "值3", "子属性2" : "值4", } }
-
示例
# 添加文档 POST /heima/_doc/1 { "info" : "尚硅谷,让天下没有学完的技术!", "email" : "1482939313@qq.com", "name" : { "firstName" : "尚", "lastName" : "硅谷" } }
-
-
查看文档
-
模板
GET /索引库名/_doc/文档id
-
示例
GET /heima/_doc/1
-
-
删除文档
-
模板
DELETE /索引库名/_doc/文档id
-
示例
DELETE /heima/_doc/1
-
-
修改文档
-
方式一:全量修改
-
特点
id存在则修改,不存在则创建
-
模板
PUT /索引库名/_doc/文档id { "字段1" : "值1", "字段2" : "值2", }
-
-
方式二:局部修改
-
模板
POST /索引库名/_update/文档id { "doc" : { "字段1" : "值1", "字段2" : "值2", } }
-
-
5.RestClient
-
RestClient的初步使用
-
引入依赖
<dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>elasticsearch-rest-high-level-client</artifactId> <version>7.12.1</version> </dependency>
<!-- 注意版本控制 --> <properties> <java.version>1.8</java.version> <elasticsearch.version>7.12.1</elasticsearch.version> </properties>
-
初始化RestHighLevelClient
package cn.itcast.hotel; @SpringBootTest class HotelDemoApplicationTests { private RestHighLevelClient client; @BeforeEach void setUp() { this.client = new RestHighLevelClient(RestClient.builder( HttpHost.create("http://192.168.88.101:9200") )); } @AfterEach void tearDown() throws IOException { this.client.close(); } @Test void contextLoads() { System.out.println(this.client); } }
-
-
RestClient增删改查
-
操作索引库
-
创建索引库
// 新建索引 @Test public void addIndex() throws IOException { // 创建Request对象 CreateIndexRequest request = new CreateIndexRequest("hotel"); // 准备请求参数 RestClientConstant.HOTELTEMPLATE 为新建索引的json结构 request.source(RestClientConstant.HOTELTEMPLATE , XContentType.JSON); // 发送请求 client.indices().create(request , RequestOptions.DEFAULT); }
-
删除和判断索引库
// 删除索引 @Test public void deleteIndex() throws IOException { // 创建Request对象 DeleteIndexRequest request = new DeleteIndexRequest("hotel"); // 发送请求 client.indices().delete(request , RequestOptions.DEFAULT); } // 判断索引库是否存在 @Test public void existsIndex() throws IOException { // 创建Request对象 GetIndexRequest request = new GetIndexRequest("hotel"); // 发送请求 boolean exists = client.indices().exists(request, RequestOptions.DEFAULT); System.out.println( exists ? "索引库存在" : "索引库不存在"); }
-
-
操作文档
-
新增文档
// 新增文档 @Test public void addDocument() throws IOException { // 获取信息 Hotel hotel = hotelService.getById(36934L); HotelDoc hotelDoc = new HotelDoc(hotel); // 创建Request对象 并设置id IndexRequest request = new IndexRequest("hotel").id(hotel.getId().toString()); // 设置请求 request.source(JSON.toJSONString(hotelDoc) , XContentType.JSON); // 发送请求 client.index(request , RequestOptions.DEFAULT); }
-
查询文档
// 新增文档 @Test public void findDocument() throws IOException { // 创建Request对象 并设置id GetRequest request = new GetRequest("hotel").id("36934"); // 发送请求 GetResponse response = client.get(request, RequestOptions.DEFAULT); String json = response.getSourceAsString(); HotelDoc hotelDoc = JSON.parseObject(json, HotelDoc.class); System.out.println("hotelDoc = " + hotelDoc); }
-
更新文档
// 更新文档 @Test public void updateDocument() throws IOException { // 创建Request对象 并设置id UpdateRequest request = new UpdateRequest("hotel", "36934"); // 设置更新信息 每两个参数为一对 key,value request.doc( "city" , "美国" ); // 发送请求 client.update(request , RequestOptions.DEFAULT); }
-
删除文档
// 删除文档 @Test public void deleteDocument() throws IOException { // 创建Request对象 并设置id DeleteRequest req = new DeleteRequest("hotel", "36934"); // 发送请求 client.delete(req , RequestOptions.DEFAULT); }
-
批量操作
// 批量操作 @Test public void bulkDocument() throws IOException { // 获取数据 List<Hotel> hotelList = hotelService.list(); // 创建Request对象 BulkRequest bulkRequest = new BulkRequest(); // 写入 hotelList.stream().forEach( item -> { HotelDoc hotelDoc = new HotelDoc(item); bulkRequest.add(new IndexRequest("hotel") .id(item.getId().toString()).source(JSON.toJSONString(hotelDoc) , XContentType.JSON)); } ); // 发送请求 client.bulk(bulkRequest , RequestOptions.DEFAULT); }
-
-
6.elasticsearch搜索功能
-
DSL查询语法
-
基本语法
GET /indexName/_search { "query" : { "查询类型" : { "查询条件" : "条件值" } } }
-
查询所有
GET /hotel/_search { "query" : { “match_all” : { } } }
-
全文检索查询
-
会对查询text进行分词,查询倒排索引
-
match和multi_match的区别:
match查询单字段,multi_match根据多个字段查询,参与查询字段越多,查询性能越差
# match查询 GET /hotel/_search { "query": { "match": { "all": "上海如家" } } } # multi_match查询 GET /hotel/_search { "query": { "multi_match": { "query": "上海如家", "fields": ["brand" , "name" , "business"] } } }
-
-
精确查询
# term查询 GET /hotel/_search { "query": { "term": { "brand": { "value": "如家" } J } } } # range查询 范围 GET /hotel/_search { "query": { "range": { "price": { "gte": 400, "lte": 500 } } } }
-
地理查询
-
geo_bounding_box:查询geo_point值落在某个矩形范围的所有文档
# geo_bounding_box 边界框查询 GET /hotel/_search { "query": { "geo_bounding_box" : { "location" : { "top_left" : { "lat" : 31.1, "lon" : 121.5 }, "bottom_right" : { "lat" : 30.9, "lon" : 121.7 } } } } }
-
geo_distance:查询到指定中心点小于某个距离值的所有文档
# geo_distance 距离中心点查询 GET /hotel/_search { "query": { "geo_distance" : { "distance" : "15km", "location" : "31.21,121.5" } } }
-
-
复合查询
-
function score query 可以修改文档的相关性算分(query socre),根据新得到的算分排序。
-
function score query定义的三要素
- 过滤条件:哪些文档要加分
- 算分函数:如何计算function score
- 加权方式:funcation score 与 query score如何运算
# function sorce GET /hotel/_search { "query": { "function_score": { "query": { "match": { "all": "上海" } }, "functions": [ { "filter": { "term": { "brand": "万豪" } }, "weight": 10 } ], "boost_mode": "multiply" } } }
-
复合查询Boolean Query是一个或多个查询子句的组合。子查询的组合方式有:
- must:必须匹配每个子查询,类似“与”
- should:选择性匹配子查询,类似“或”
- must_not:必须不匹配,不参与算分,类似“非”
- filter:必须匹配,不参与算分
# 搜索名字包含“如家”,价格不高于400,在坐标31.21,121.5周围10km范围内的酒店 # 搜索名字包含“如家”,价格不高于400,在坐标31.21,121.5周围10km范围内的酒店 GET /hotel/_search { "query": { "bool": { "must": [ { "match": { "name": "如家" } } ], "must_not": [ { "range": { "price": { "gt": 400 } } } ], "filter": [ { "geo_distance": { "distance": "10km", "location": { "lat": 31.21, "lon": 121.5 } } } ] } } }
-
-
排序
-
elasticsearch默认根据相关度算分,也可以指定
-
简单类型
GET /hotel/_search { "query": { "match": { "all": "北京" } }, "sort": [ { "price": { "order": "desc" } } ] }
-
地理坐标
GET /hotel/_search { "query": { "match": { "all": "上海" } }, "sort": [ { "_geo_distance": { "location": "31.21,121.5", "order": "asc", "unit": "km" } } ] }
-
-
分页
-
elasticsearch默认只返回top10的数据
GET /hotel/_search { "query": { "match_all": {} }, "from": 10, // 分页开始页 "size": 10 // 分页数量 }
-
-
高亮
GET /hotel/_search { "query": { "match": { "all": "如家" } }, "highlight": { "fields": { "name": { "require_field_match": "false" } } } }
-
7.RestClient查询文档
@Test
public void query() throws IOException {
// 创建request对象
SearchRequest searchRequest = new SearchRequest("hotel");
// 构造DSL语句
// searchRequest.source().query(QueryBuilders.matchAllQuery());
searchRequest.source().query(QueryBuilders.matchQuery("all" , "上海"))
.sort("price" , SortOrder.ASC)
.highlighter(new HighlightBuilder().field("name").requireFieldMatch(false))
.from(0)
.size(50);
// 发送请求
SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);
System.out.println("查询结果总条数为:" + response.getHits().getTotalHits() + "条");
System.out.println("高亮部分为:" + response.getHits().getHits()[0].getHighlightFields());
List<HotelDoc> hotelDocList = parseRestClintResponse(response, HotelDoc.class);
hotelDocList.stream().forEach(System.out :: println);
}
private <T> List<T> parseRestClintResponse(SearchResponse response , Class<T> clazz) {
SearchHits hits = response.getHits();
ArrayList<T> list = new ArrayList<>();
SearchHit[] hitsArray = hits.getHits();
Arrays.stream(hitsArray).forEach( item -> list.add(JSON.parseObject(item.getSourceAsString() , clazz)));
return list;
}
8.聚合
-
什么是聚合?
- 聚合可以实现对文档数据的统计、分析、运算
-
聚合的分类
- 桶聚合:用来对文档作分组
- TermAggregation:按照文档字段值分组
- Date Histogram:按照日期阶梯分组,例如一周为一组,或者一月为一组
- 度量聚合:用以计算一些值,比如:最大值、最小值、avg等
- Avg:平均值
- Max:最大值
- Min:最小值
- Stats:同时求Avg、Max、Min、sum等
- 管道聚合:其他聚合的结果为基础做聚合
- 桶聚合:用来对文档作分组
-
示例:
-
DSL实现Bucket聚合
GET /hotel/_search { "query": { "range": { "price": { "lte": 500 } } }, "size" : 0, "aggs": { "brandAgg": { "terms": { "field": "brand", "size": 100, "order": { "_count": "asc" } } } } }
-
DSL实现Metrics
GET /hotel/_search { "size": 0, "aggs": { "brandAggs": { "terms": { "field": "brand", "size": 30, "order": { "score_aggs.avg": "desc" } }, "aggs": { "score_aggs": { "stats": { "field": "score" } } } } } }
-
9.RestClient实现聚合
/**
* 聚合查询
*/
@Test
public void queryAggregation() throws IOException {
SearchRequest request = new SearchRequest("hotel");
request.source().size(0);
request.source().aggregation(AggregationBuilders
.terms("brandAgg").field("brand").size(20).order(BucketOrder.aggregation("scoreStats.avg" , false))
.subAggregation(AggregationBuilders.stats("scoreStats")
.field("score")));
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
Aggregations aggregations = response.getAggregations();
Terms brandAgg = aggregations.get("brandAgg");
List<? extends Terms.Bucket> buckets = brandAgg.getBuckets();
buckets.stream().forEach( item -> {
System.out.print(item.getKeyAsString());
Aggregations aggregations1 = item.getAggregations();
List<Aggregation> aggregations2 = aggregations1.asList();
Aggregation aggregation = aggregations2.get(0);
// 解析出aggregation内的内容
if (aggregation instanceof Stats) {
Stats stats = (Stats) aggregation;
double avgScore = stats.getAvg();
double minScore = stats.getMin();
double maxScore = stats.getMax();
long count = stats.getCount();
System.out.println("平均分:" + avgScore + ", 最低分:" + minScore + ", 最高分:" + maxScore + ", 总数:" + count);
}
}
);
}
10.自动补全
-
自定义分词器
// 酒店数据索引库 PUT /hotel { "settings": { "analysis": { "analyzer": { "text_anlyzer": { "tokenizer": "ik_max_word", "filter": "py" }, "completion_analyzer": { "tokenizer": "keyword", "filter": "py" } }, "filter": { "py": { "type": "pinyin", "keep_full_pinyin": false, "keep_joined_full_pinyin": true, "keep_original": true, "limit_first_letter_length": 16, "remove_duplicated_term": true, "none_chinese_pinyin_tokenize": false } } } }, "mappings": { "properties": { "id":{ "type": "keyword" }, "name":{ "type": "text", "analyzer": "text_anlyzer", "search_analyzer": "ik_smart", "copy_to": "all" }, "address":{ "type": "keyword", "index": false }, "price":{ "type": "integer" }, "score":{ "type": "integer" }, "brand":{ "type": "keyword", "copy_to": "all" }, "city":{ "type": "keyword" }, "starName":{ "type": "keyword" }, "business":{ "type": "keyword", "copy_to": "all" }, "location":{ "type": "geo_point" }, "pic":{ "type": "keyword", "index": false }, "all":{ "type": "text", "analyzer": "text_anlyzer", "search_analyzer": "ik_smart" }, "suggestion":{ "type": "completion", "analyzer": "completion_analyzer" } } } }
// 自动补全查询 POST /hotel/_search { "suggest": { "suggestions": { "text": "sd", "completion": { "field": "suggestion", "skip_duplicates": true, "size": 10 } } } }
11.RestClient实现自动补全
/**
* 自动补全
*/
@Test
public void autoComplete() throws IOException {
SearchRequest request = new SearchRequest("hotel");
request.source()
.suggest(new SuggestBuilder().addSuggestion(
"suggestions",
SuggestBuilders
.completionSuggestion("suggestion")
.size(10)
.prefix("sd")
));
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
Suggest suggest = response.getSuggest();
CompletionSuggestion suggestion = suggest.getSuggestion("suggestions");
List<CompletionSuggestion.Entry.Option> options = suggestion.getOptions();
options.stream().forEach(item -> {
System.out.println(item.getHit());
});
}
12.数据同步
-
数据同步的几种实现方式
- 同步调用:实现简单,粗暴、业务耦合度高
- 异步通知:低耦合,实现难度一般、依赖mp的可靠性
- 监听binlog:完全解除服务间耦合、开始binlog增加数据库负担、实现复杂度高
-
利用MQ实现mysql与elasticsearch数据同步
-
定义常量类
public class MqConstant { // 交换机 public static final String HOTEL_EXCHANGE_TOPIC = "hotel.exchange.topic"; // update队列 public static final String HOTEL_UPDATE_QUEUE = "hotel.update"; // delete队列 public static final String HOTEL_DELETE_QUEUE = "hotel.delete"; }
-
在消费者微服务中声明exchange、queue、RoutingKey(rabbitMQ懒加载,只有消费者在监听,不监听不会创建交换机队列等)
@Configuration public class MqConfig { // 交换机 @Bean public TopicExchange topicExchange() { return new TopicExchange(MqConstant.HOTEL_EXCHANGE_TOPIC , true , false); } // 更新队列 @Bean(name = "updateQueue") public Queue updateQueue(){ return new Queue(MqConstant.HOTEL_UPDATE_QUEUE , true); } // 删除队列 @Bean(name = "deleteQueue") public Queue deleteQueue(){ return new Queue(MqConstant.HOTEL_DELETE_QUEUE , true); } // 绑定队列交换机 @Bean public Binding bindingUpdateQueue(@Qualifier("updateQueue") Queue updateQueue , TopicExchange topicExchange){ return BindingBuilder.bind(updateQueue).to(topicExchange).with(MqConstant.HOTEL_UPDATE_QUEUE); } // 绑定队列交换机 @Bean public Binding bindingDeleteQueue(@Qualifier("deleteQueue") Queue deleteQueue , TopicExchange topicExchange){ return BindingBuilder.bind(deleteQueue).to(topicExchange).with(MqConstant.HOTEL_DELETE_QUEUE); } }
-
生产者发布
@RestController @RequestMapping("hotel") @CrossOrigin("*") public class HotelController { @Autowired private IHotelService hotelService; @Autowired private RabbitTemplate rabbitTemplate; @PutMapping() public void updateById(@RequestBody Hotel hotel){ if (hotel.getId() == null) { throw new InvalidParameterException("id不能为空"); } hotelService.updateById(hotel); rabbitTemplate.convertAndSend(MqConstant.HOTEL_EXCHANGE_TOPIC , MqConstant.HOTEL_UPDATE_QUEUE , hotel.getId()); } @DeleteMapping("/{id}") public void deleteById(@PathVariable("id") Long id) { hotelService.removeById(id); rabbitTemplate.convertAndSend(MqConstant.HOTEL_EXCHANGE_TOPIC , MqConstant.HOTEL_DELETE_QUEUE , id); } }
-
消费者监听
@Component public class HotelListener { @Autowired private IHotelService hotelService; // 更新监听 @RabbitListener(queues = MqConstant.HOTEL_UPDATE_QUEUE) public void updateListen(Long id) { System.out.println("更新es文档..."); hotelService.updateEs(id); } // 删除监听 @RabbitListener(queues = MqConstant.HOTEL_DELETE_QUEUE) public void deleteListen(Long id) { System.out.println("删除es文档..."); hotelService.deleteEs(id); } }
-
13.ES集群
- 各节点的职责
- master eligible节点的作用是什么?
- 参与集群选主
- 主节点可以管理集群状态、管理分片信息、处理创建和删除索引库的请求
- data节点的作用是什么?
- 数据CRUD
- coordinator节点的作用是什么?
- 路由请求到其他节点
- 合并查询到的结果,返回用户
- master eligible节点的作用是什么?
- 分布式新增如何确定分配分片?
- coordinating node根据id做hash运算,得到结果对shard数量取余,余数就是对应的分片
- 分布式查询
- 分散阶段:coordinator node将查询请求分发给不同分片
- 收集阶段:将查询结果汇总到coordinator node,整理并返回给用户
- ES集群的故障转移
- 集群的master节点会监控集群中的节点状态,如果发现有节点宕机,会立即将宕机节点的分片数据迁移到其他节点,确保数据安全,这叫故障转移
nt.HOTEL_DELETE_QUEUE);
}
}
```
-
生产者发布
@RestController @RequestMapping("hotel") @CrossOrigin("*") public class HotelController { @Autowired private IHotelService hotelService; @Autowired private RabbitTemplate rabbitTemplate; @PutMapping() public void updateById(@RequestBody Hotel hotel){ if (hotel.getId() == null) { throw new InvalidParameterException("id不能为空"); } hotelService.updateById(hotel); rabbitTemplate.convertAndSend(MqConstant.HOTEL_EXCHANGE_TOPIC , MqConstant.HOTEL_UPDATE_QUEUE , hotel.getId()); } @DeleteMapping("/{id}") public void deleteById(@PathVariable("id") Long id) { hotelService.removeById(id); rabbitTemplate.convertAndSend(MqConstant.HOTEL_EXCHANGE_TOPIC , MqConstant.HOTEL_DELETE_QUEUE , id); } }
-
消费者监听
@Component public class HotelListener { @Autowired private IHotelService hotelService; // 更新监听 @RabbitListener(queues = MqConstant.HOTEL_UPDATE_QUEUE) public void updateListen(Long id) { System.out.println("更新es文档..."); hotelService.updateEs(id); } // 删除监听 @RabbitListener(queues = MqConstant.HOTEL_DELETE_QUEUE) public void deleteListen(Long id) { System.out.println("删除es文档..."); hotelService.deleteEs(id); } }
13.ES集群
- 各节点的职责
- master eligible节点的作用是什么?
- 参与集群选主
- 主节点可以管理集群状态、管理分片信息、处理创建和删除索引库的请求
- data节点的作用是什么?
- 数据CRUD
- coordinator节点的作用是什么?
- 路由请求到其他节点
- 合并查询到的结果,返回用户
- master eligible节点的作用是什么?
- 分布式新增如何确定分配分片?
- coordinating node根据id做hash运算,得到结果对shard数量取余,余数就是对应的分片
- 分布式查询
- 分散阶段:coordinator node将查询请求分发给不同分片
- 收集阶段:将查询结果汇总到coordinator node,整理并返回给用户
- ES集群的故障转移
- 集群的master节点会监控集群中的节点状态,如果发现有节点宕机,会立即将宕机节点的分片数据迁移到其他节点,确保数据安全,这叫故障转移