认识微服务
微服务是一种经过良好架构设计的分布式架构方案,微服务的特征有:
- 单一职责:微服务拆分粒度小,每一个服务都对应唯一的业务能力,做到单一职责,避免重复开发
- 面向服务:微服务对外暴露业务接口
- 自治:团队独立,技术独立,数据独立,部署独立
- 隔离性强:微服务调用做好隔离,容错,降级,避免出现级联问题
各架构设计的特点
- 单体式架构
- 简单方便,高度耦合,扩展性差,适合小型项目。例如:学生管理系统
- 分布式架构特点
- 松耦合,扩展性好,但架构复杂,难度大。适合大型互联网项目。例如:京东,淘宝
- 微服务:一种良好的分布式架构方案
- 优点:拆分粒度更小,服务更独立、耦合度更低
- 缺点:架构非常复杂,运维、监控、部署难度提高
微服务技术对比
微服务中父pom文件
在这个文件中管理下面子工程的依赖版本号,然后子工程中引入依赖不需要写版本号
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>cn.itcast.demo</groupId>
<artifactId>cloud-demo</artifactId>
<version>1.0</version>
<modules>
<module>user-service</module>
<module>order-service</module>
</modules>
<packaging>pom</packaging>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.9.RELEASE</version>
<relativePath/>
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
<spring-cloud.version>Hoxton.SR10</spring-cloud.version>
<mysql.version>5.1.47</mysql.version>
<mybatis.version>2.1.1</mybatis.version>
</properties>
<dependencyManagement>
<dependencies>
<!-- springCloud -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<!-- mysql驱动 -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.version}</version>
</dependency>
<!--mybatis-->
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>${mybatis.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
</project>
EureKa注册中心
使用场景
假如我们的服务提供者user-service部署了多个实例,如图:
大家思考几个问题:
- order-service在发起远程调用的时候,该如何得知user-service实例的ip地址和端口?
- 有多个user-service实例地址,order-service调用时该如何选择?
- order-service如何得知某个user-service实例是否依然健康,是不是已经宕机?
Eureka的结构和作用
这些问题都需要利用SpringCloud中的注册中心来解决,其中最广为人知的注册中心就是Eureka,其结构如下:
回答之前的各个问题。
问题1:order-service如何得知user-service实例地址?
获取地址信息的流程如下:
- user-service服务实例启动后,将自己的信息注册到eureka-server(Eureka服务端)。这个叫服务注册
- eureka-server保存服务名称到服务实例地址列表的映射关系
- order-service根据服务名称,拉取实例地址列表。这个叫服务发现或服务拉取
问题2:order-service如何从多个user-service实例中选择具体的实例?
- order-service从实例列表中利用负载均衡算法选中一个实例地址
- 向该实例地址发起远程调用
问题3:order-service如何得知某个user-service实例是否依然健康,是不是已经宕机?
- user-service会每隔一段时间(默认30秒)向eureka-server发起请求,报告自己状态,称为心跳
- 当超过一定时间没有发送心跳时,eureka-server会认为微服务实例故障,将该实例从服务列表中剔除
- order-service拉取服务时,就能将故障实例排除了
注意:一个微服务,既可以是服务提供者,又可以是服务消费者,因此eureka将服务注册、服务发现等功能统一封装到了eureka-client端
因此,接下来我们动手实践的步骤包括:
搭建eureka-server
创建eureka-serve服务
在父工程上右键新建 Module
选择 Maven 创建
输入服务名称,点击创建即可
引入eureka依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
</dependency>
添加启动类
@EnableEurekaServer
@SpringBootApplication
public class EurekaApplication {
public static void main(String[] args) {
SpringApplication.run(EurekaApplication.class,args);
}
}
编写配置文件
server:
port: 10086
spring:
application:
name: eurekaserver
eureka:
client:
service-url:
defaultZone: http://127.0.0.1:10086/eureka # Eureka 中台地址
访问页面
启动 EurekaApplication ,本地访问 127.0.0.1:10086
服务注册
我们将 user-server 注册到 eureka-server 中
添加依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
添加配置
spring:
application:
name: orderserver #服务名必填
eureka:
client:
service-url:
defaultZone: http://127.0.0.1:10086/eureka
启动多个实例
在服务名称上右键选择 Copy Configuration
修改新实例的端口号
现在 OrderServer 就会有两个实例
我们按照相同的方法吧 user-server 注册到 eureka-server 中
服务发现
目前我们打开 order-server 的 controller,调用获取订单的方法
可以看到只能获取到用户id,如果想要获取用户信息,则必须调用另外一个服务的接口
添加 RestTemplate 实例
在上下文添加 RestTemplate Bean,同时添加 @LoadBalanced 注解,这个注解的作用是开启负载均衡
@Bean
@LoadBalanced
public RestTemplate restTemplate(){
return new RestTemplate();
}
改写queryOrderById方法
调用 http://localhost:8080/order/101 接口查看返回
这样我们不必关心另外一个服务的端口号,直接将服务名替换成ip地址和端口号即可
Ribbon负载均衡
负载均衡原理
SpringCloud底层其实是利用了一个名为Ribbon的组件,来实现负载均衡功能的。
那么我们发出的请求明明是http://userservice/user/1,怎么变成了http://localhost:8081的呢?
请求图
负载均衡策略
负载均衡的规则都定义在IRule接口中,而IRule有很多不同的实现类:
不同规则的含义如下:
内置负载均衡规则类 | 规则描述 |
---|---|
RoundRobinRule | 简单轮询服务列表来选择服务器。它是Ribbon默认的负载均衡规则。 |
AvailabilityFilteringRule | 对以下两种服务器进行忽略: (1)在默认情况下,这台服务器如果3次连接失败,这台服务器就会被设置为“短路”状态。短路状态将持续30秒,如果再次连接失败,短路的持续时间就会几何级地增加。 (2)并发数过高的服务器。如果一个服务器的并发连接数过高,配置了AvailabilityFilteringRule规则的客户端也会将其忽略。并发连接数的上限,可以由客户端的..ActiveConnectionsLimit属性进行配置。 |
WeightedResponseTimeRule | 为每一个服务器赋予一个权重值。服务器响应时间越长,这个服务器的权重就越小。这个规则会随机选择服务器,这个权重值会影响服务器的选择。 |
ZoneAvoidanceRule | 以区域可用的服务器为基础进行服务器的选择。使用Zone对服务器进行分类,这个Zone可以理解为一个机房、一个机架等。而后再对Zone内的多个服务做轮询。 |
BestAvailableRule | 忽略那些短路的服务器,并选择并发数较低的服务器。 |
RandomRule | 随机选择一个可用的服务器。 |
RetryRule | 重试机制的选择逻辑 |
默认的实现就是ZoneAvoidanceRule,是一种轮询方案
自定义负载均衡
方式一
在 orderserver 启动类中添加 IRule 实例
@Bean
public IRule randomRule(){
return new RandomRule();
}
这种方式是全局生效的,从 orderserver 服务发出的请求都将采用随机策略来做负载均衡
方式二
userserver: # 给某个微服务配置负载均衡规则,这里是userserver服务
ribbon:
NFLoadBalancerRuleClassName: com.netflix.loadbalancer.RandomRule # 负载均衡规则
这种方式只会让 userserver 的服务使用修改后的负载均衡策略
一般用默认的负载均衡规则,不做修改
饥饿加载
Ribbon默认是采用懒加载,即第一次访问时才会去创建LoadBalanceClient,请求时间会很长。
而饥饿加载则会在项目启动时创建,降低第一次访问的耗时,通过下面配置开启饥饿加载:
ribbon:
eager-load:
enabled: true #开启饥饿加载
clients: #指定那些服务生效,接收一个list,只有一个时可以直接写在后面,多个时使用 - xxxServer
- userserver
Nacso注册中心
安装并启动
Nacos是阿里巴巴的产品,现在是SpringCloud中的一个组件。相比Eureka功能更加丰富,在国内受欢迎程度较高。
从官网下载并解压到非中文目录即可
启动方法:打开 bin 目录双击运行
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-uaRHlP7W-1687829496139)(C:/Users/yikonsh/AppData/Roaming/Typora/typora-user-images/image-20230317155057222.png)]
运行地址:http://192.168.60.74:8848/nacos/index.html
默认账号密码都是:nacos
服务注册
在父工程添加依赖
<!--nacos-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
<version>2.2.5.RELEASE</version>
</dependency>
子工程添加依赖,同时注释掉 eureka
<!-- nacos客户端依赖包 -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
修改配置文件
spring:
application:
name: orderserver #服务名必填
cloud:
nacos:
server-addr: localhost:8848
# 注释掉原来的eureka的配置
#eureka:
# client:
# service-url:
# defaultZone: http://127.0.0.1:10086/eureka
查看 nacos 服务列表
服务多级存储模型
一个服务可以有多个实例,例如我们的user-service,可以有:
- 127.0.0.1:8081
- 127.0.0.1:8082
- 127.0.0.1:8083
假如这些实例分布于全国各地的不同机房,例如:
- 127.0.0.1:8081,在上海机房
- 127.0.0.1:8082,在上海机房
- 127.0.0.1:8083,在杭州机房
Nacos就将同一机房内的实例 划分为一个集群。
也就是说,user-service是服务,一个服务可以包含多个集群,如杭州、上海,每个集群下可以有多个实例,形成分级模型,如图:
设置集群
修改配置项,添加 cluster-name 属性
spring:
application:
name: userserver
cloud: # nacos服务注册
nacos:
server-addr: localhost:8848
discovery:
cluster-name: HZ # 集群名称 HZ代指杭州
然后再复制一个实例配置,设置一个新的实例名为 SH
-Dserver.port=9092 -Dspring.cloud.nacos.discovery.cluster-name=SH
当前启动的实例如下
刷新 nacos 面板
NacosRule负载均衡
首先也要给 orderserve 添加一个 HZ 集群
spring:
application:
name: orderserver #服务名必填
cloud:
nacos:
server-addr: localhost:8848
discovery:
cluster-name: HZ #设置集群名称HZ
设置负载均衡规则
userserver: # 给某个微服务配置负载均衡规则,这里是userserver服务
ribbon:
NFLoadBalancerRuleClassName: com.alibaba.cloud.nacos.ribbon.NacosRule # 负载均衡规则
这样设置完成后将会优先访问同集群内的服务,如果同集群中的服务失效,则会访问其他集群中的服务,同时也会触发一条警告信息,提示当前同集群的服务出现故障
权重配置
实际部署中会出现这样的场景:
服务器设备性能有差异,部分实例所在机器性能较好,另一些较差,我们希望性能好的机器承担更多的用户请求。
但默认情况下NacosRule是同集群内随机挑选,不会考虑机器的性能问题。
因此,Nacos提供了权重配置来控制访问频率,权重越大则访问频率越高。
在nacos控制台,找到 userserver 的实例列表,点击编辑,即可修改权重:
注意:如果权重修改为0,则该实例永远不会被访问
环境隔离
只有在相同环境下的服务才能被互相访问到
新建环境
给服务设置所属空间,添加 namespace 属性,值填写生成的id
spring:
application:
name: userserver
cloud: # nacos服务注册
nacos:
server-addr: localhost:8848
discovery:
cluster-name: HZ # 集群名称 HZ代指杭州
namespace: 34024135-04a1-4cec-a909-08bf1d76c362 # 命名空间id
然后将服务重启,查看nacos面板
默认的 public 空间就只剩下 orderserver 实例
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-2ewceARG-1687829496152)(C:\Users\25019\AppData\Roaming\Typora\typora-user-images\image-20230318164427536.png)]
dev空间只有 userserver 实例,两个空间中的实例不能互相调用
Nacos与Eureka的区别
Nacos的服务实例分为两种l类型:
-
临时实例:如果实例宕机超过一定时间,会从服务列表剔除,默认的类型。
-
非临时实例:如果实例宕机,不会从服务列表剔除,也可以叫永久实例。
配置一个服务实例为永久实例:
spring:
cloud:
nacos:
discovery:
ephemeral: false # 设置为非临时实例
Nacos和Eureka整体结构类似,服务注册、服务拉取、心跳等待,但是也存在一些差异:
-
Nacos与eureka的共同点
- 都支持服务注册和服务拉取
- 都支持服务提供者心跳方式做健康检测
-
Nacos与Eureka的区别
- Nacos支持服务端主动检测提供者状态:临时实例采用心跳模式,非临时实例采用主动检测模式
- 临时实例心跳不正常会被剔除,非临时实例则不会被剔除
- Nacos支持服务列表变更的消息推送模式,服务列表更新更及时
- Nacos集群默认采用AP方式,当集群中存在非临时实例时,采用CP模式;Eureka采用AP方式
Nacos实现配置管理
统一配置管理
添加配置信息
首先点击配置列表
引入nacos配置依赖
这里提前在父工程设置的依赖版本
<!--nacos配置管理依赖-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
<version>${nacos.config.version}</version>
</dependency>
添加 bootstrap.yml
bootstrap.yml 配置文件的读取时机要比 application.yml 文件提前,所以在这个配置文件中设置 nacos 服务地址和要读取的配置信息
spring:
application:
name: userserver #服务名
profiles:
active: dev # 环境名
cloud:
nacos:
server-addr: localhost:8848 # 服务地址
config:
file-extension: yaml # 文件后缀名
记得要把 application.yml 配置文件中的重复配置删除掉,以及命名空间去掉
测试是否读取到
添加一个方法,使用 nacos 中的配置来完成日期格式化功能
通过 @Value 注解读取配置文件中的信息,然后实现日期格式化
@Value("${pattern.dateformat}")
String dateformat;
@GetMapping("now")
public String now(){
return LocalDateTime.now().format(DateTimeFormatter.ofPattern(dateformat));
}
访问:http://localhost:9091/user/now 接口查看
然后再访问:9090端口同样可以
实现配置热更新
方式一
通过 @RefreshScope 注解的方式来实现热更新,在通过 @Value 注解的所在类上添加这个注解来实现
方式二
使用 @ConfigurationProperties 注解代替 @Value 注解。
新建配置类,添加 @ConfigurationProperties(prefix = “pattern”) 注解,表示读取以 pattern 开头的配置文件,自动读取配置并且给相同名字的属性赋值
package cn.itcast.user.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/**
* @author songzx
* @date 2023/3/18
* @apiNote
*/
@Data
@Component
@ConfigurationProperties(prefix = "pattern")
public class PatternProperties {
String dateformat;
}
然后再代码中通过 PatternProperties 类获取配置文件中的值
@Autowired
PatternProperties patternProperties;
@GetMapping("now")
public String now(){
return LocalDateTime.now().format(DateTimeFormatter.ofPattern(patternProperties.getDateformat()));
}
多环境配置共享
nacos中新建配置文件
当我们有多个环境时,比如有:dev、test、uat等环境都需要共同使用相同的配置时,我们可以来新建一个多环境共享的配置,配置方法为在 nacos 配置管理中添加一个 [服务名].yaml 的配置文件,不需要携带环境名
dev环境读取配置
在属性配置类中添加 name 属性
新增接口,返回当前的配置文件
@GetMapping("prop")
public PatternProperties patternProperties(){
return patternProperties;
}
查看返回
新增test环境并读取配置
在服务名称上右键选择编辑配置
设置环境为 test 环境
启动服务查看接口返回的配置值
多环境配置共享优先级
我们在本地配置文件中也添加 name 属性
然后再 nacos 中的 dev 环境配置文件也添加 name 属性
然后重启服务,分别访问 test 环境和 dev 环境查看返回
由此得出如下优先级
Nacos集群搭建
1.集群结构图
官方给出的Nacos集群图:
其中包含3个nacos节点,然后一个负载均衡器代理3个Nacos。这里负载均衡器可以使用nginx。
我们计划的集群结构:
三个nacos节点的地址:
节点 | ip | port |
---|---|---|
nacos1 | 192.168.150.1 | 8845 |
nacos2 | 192.168.150.1 | 8846 |
nacos3 | 192.168.150.1 | 8847 |
2.搭建集群
搭建集群的基本步骤:
- 搭建数据库,初始化数据库表结构
- 下载nacos安装包
- 配置nacos
- 启动nacos集群
- nginx反向代理
2.1.初始化数据库
Nacos默认数据存储在内嵌数据库Derby中,不属于生产可用的数据库。
官方推荐的最佳实践是使用带有主从的高可用数据库集群,主从模式的高可用数据库可以参考传智教育的后续高手课程。
这里我们以单点的数据库为例来讲解。
首先新建一个数据库,命名为,而后导入下面的SQL:
CREATE TABLE `config_info` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'id',
`data_id` varchar(255) NOT NULL COMMENT 'data_id',
`group_id` varchar(255) DEFAULT NULL,
`content` longtext NOT NULL COMMENT 'content',
`md5` varchar(32) DEFAULT NULL COMMENT 'md5',
`gmt_create` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`gmt_modified` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '修改时间',
`src_user` text COMMENT 'source user',
`src_ip` varchar(50) DEFAULT NULL COMMENT 'source ip',
`app_name` varchar(128) DEFAULT NULL,
`tenant_id` varchar(128) DEFAULT '' COMMENT '租户字段',
`c_desc` varchar(256) DEFAULT NULL,
`c_use` varchar(64) DEFAULT NULL,
`effect` varchar(64) DEFAULT NULL,
`type` varchar(64) DEFAULT NULL,
`c_schema` text,
PRIMARY KEY (`id`),
UNIQUE KEY `uk_configinfo_datagrouptenant` (`data_id`,`group_id`,`tenant_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin COMMENT='config_info';
/******************************************/
/* 数据库全名 = nacos_config */
/* 表名称 = config_info_aggr */
/******************************************/
CREATE TABLE `config_info_aggr` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'id',
`data_id` varchar(255) NOT NULL COMMENT 'data_id',
`group_id` varchar(255) NOT NULL COMMENT 'group_id',
`datum_id` varchar(255) NOT NULL COMMENT 'datum_id',
`content` longtext NOT NULL COMMENT '内容',
`gmt_modified` datetime NOT NULL COMMENT '修改时间',
`app_name` varchar(128) DEFAULT NULL,
`tenant_id` varchar(128) DEFAULT '' COMMENT '租户字段',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_configinfoaggr_datagrouptenantdatum` (`data_id`,`group_id`,`tenant_id`,`datum_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin COMMENT='增加租户字段';
/******************************************/
/* 数据库全名 = nacos_config */
/* 表名称 = config_info_beta */
/******************************************/
CREATE TABLE `config_info_beta` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'id',
`data_id` varchar(255) NOT NULL COMMENT 'data_id',
`group_id` varchar(128) NOT NULL COMMENT 'group_id',
`app_name` varchar(128) DEFAULT NULL COMMENT 'app_name',
`content` longtext NOT NULL COMMENT 'content',
`beta_ips` varchar(1024) DEFAULT NULL COMMENT 'betaIps',
`md5` varchar(32) DEFAULT NULL COMMENT 'md5',
`gmt_create` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`gmt_modified` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '修改时间',
`src_user` text COMMENT 'source user',
`src_ip` varchar(50) DEFAULT NULL COMMENT 'source ip',
`tenant_id` varchar(128) DEFAULT '' COMMENT '租户字段',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_configinfobeta_datagrouptenant` (`data_id`,`group_id`,`tenant_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin COMMENT='config_info_beta';
/******************************************/
/* 数据库全名 = nacos_config */
/* 表名称 = config_info_tag */
/******************************************/
CREATE TABLE `config_info_tag` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'id',
`data_id` varchar(255) NOT NULL COMMENT 'data_id',
`group_id` varchar(128) NOT NULL COMMENT 'group_id',
`tenant_id` varchar(128) DEFAULT '' COMMENT 'tenant_id',
`tag_id` varchar(128) NOT NULL COMMENT 'tag_id',
`app_name` varchar(128) DEFAULT NULL COMMENT 'app_name',
`content` longtext NOT NULL COMMENT 'content',
`md5` varchar(32) DEFAULT NULL COMMENT 'md5',
`gmt_create` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`gmt_modified` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '修改时间',
`src_user` text COMMENT 'source user',
`src_ip` varchar(50) DEFAULT NULL COMMENT 'source ip',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_configinfotag_datagrouptenanttag` (`data_id`,`group_id`,`tenant_id`,`tag_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin COMMENT='config_info_tag';
/******************************************/
/* 数据库全名 = nacos_config */
/* 表名称 = config_tags_relation */
/******************************************/
CREATE TABLE `config_tags_relation` (
`id` bigint(20) NOT NULL COMMENT 'id',
`tag_name` varchar(128) NOT NULL COMMENT 'tag_name',
`tag_type` varchar(64) DEFAULT NULL COMMENT 'tag_type',
`data_id` varchar(255) NOT NULL COMMENT 'data_id',
`group_id` varchar(128) NOT NULL COMMENT 'group_id',
`tenant_id` varchar(128) DEFAULT '' COMMENT 'tenant_id',
`nid` bigint(20) NOT NULL AUTO_INCREMENT,
PRIMARY KEY (`nid`),
UNIQUE KEY `uk_configtagrelation_configidtag` (`id`,`tag_name`,`tag_type`),
KEY `idx_tenant_id` (`tenant_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin COMMENT='config_tag_relation';
/******************************************/
/* 数据库全名 = nacos_config */
/* 表名称 = group_capacity */
/******************************************/
CREATE TABLE `group_capacity` (
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键ID',
`group_id` varchar(128) NOT NULL DEFAULT '' COMMENT 'Group ID,空字符表示整个集群',
`quota` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '配额,0表示使用默认值',
`usage` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '使用量',
`max_size` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '单个配置大小上限,单位为字节,0表示使用默认值',
`max_aggr_count` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '聚合子配置最大个数,,0表示使用默认值',
`max_aggr_size` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '单个聚合数据的子配置大小上限,单位为字节,0表示使用默认值',
`max_history_count` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '最大变更历史数量',
`gmt_create` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`gmt_modified` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '修改时间',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_group_id` (`group_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin COMMENT='集群、各Group容量信息表';
/******************************************/
/* 数据库全名 = nacos_config */
/* 表名称 = his_config_info */
/******************************************/
CREATE TABLE `his_config_info` (
`id` bigint(64) unsigned NOT NULL,
`nid` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
`data_id` varchar(255) NOT NULL,
`group_id` varchar(128) NOT NULL,
`app_name` varchar(128) DEFAULT NULL COMMENT 'app_name',
`content` longtext NOT NULL,
`md5` varchar(32) DEFAULT NULL,
`gmt_create` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
`gmt_modified` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
`src_user` text,
`src_ip` varchar(50) DEFAULT NULL,
`op_type` char(10) DEFAULT NULL,
`tenant_id` varchar(128) DEFAULT '' COMMENT '租户字段',
PRIMARY KEY (`nid`),
KEY `idx_gmt_create` (`gmt_create`),
KEY `idx_gmt_modified` (`gmt_modified`),
KEY `idx_did` (`data_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin COMMENT='多租户改造';
/******************************************/
/* 数据库全名 = nacos_config */
/* 表名称 = tenant_capacity */
/******************************************/
CREATE TABLE `tenant_capacity` (
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键ID',
`tenant_id` varchar(128) NOT NULL DEFAULT '' COMMENT 'Tenant ID',
`quota` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '配额,0表示使用默认值',
`usage` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '使用量',
`max_size` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '单个配置大小上限,单位为字节,0表示使用默认值',
`max_aggr_count` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '聚合子配置最大个数',
`max_aggr_size` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '单个聚合数据的子配置大小上限,单位为字节,0表示使用默认值',
`max_history_count` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '最大变更历史数量',
`gmt_create` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`gmt_modified` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '修改时间',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_tenant_id` (`tenant_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin COMMENT='租户容量信息表';
CREATE TABLE `tenant_info` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'id',
`kp` varchar(128) NOT NULL COMMENT 'kp',
`tenant_id` varchar(128) default '' COMMENT 'tenant_id',
`tenant_name` varchar(128) default '' COMMENT 'tenant_name',
`tenant_desc` varchar(256) DEFAULT NULL COMMENT 'tenant_desc',
`create_source` varchar(32) DEFAULT NULL COMMENT 'create_source',
`gmt_create` bigint(20) NOT NULL COMMENT '创建时间',
`gmt_modified` bigint(20) NOT NULL COMMENT '修改时间',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_tenant_info_kptenantid` (`kp`,`tenant_id`),
KEY `idx_tenant_id` (`tenant_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin COMMENT='tenant_info';
CREATE TABLE `users` (
`username` varchar(50) NOT NULL PRIMARY KEY,
`password` varchar(500) NOT NULL,
`enabled` boolean NOT NULL
);
CREATE TABLE `roles` (
`username` varchar(50) NOT NULL,
`role` varchar(50) NOT NULL,
UNIQUE INDEX `idx_user_role` (`username` ASC, `role` ASC) USING BTREE
);
CREATE TABLE `permissions` (
`role` varchar(50) NOT NULL,
`resource` varchar(255) NOT NULL,
`action` varchar(8) NOT NULL,
UNIQUE INDEX `uk_role_permission` (`role`,`resource`,`action`) USING BTREE
);
INSERT INTO users (username, password, enabled) VALUES ('nacos', '$2a$10$EuWPZHzz32dJN7jexM34MOeYirDdFAZm2kuWj7VEOJhhZkDrxfvUu', TRUE);
INSERT INTO roles (username, role) VALUES ('nacos', 'ROLE_ADMIN');
2.2.下载nacos
nacos在GitHub上有下载地址:https://github.com/alibaba/nacos/tags,可以选择任意版本下载。
本例中才用1.4.1版本:
2.3.配置Nacos
将这个包解压到任意非中文目录下,如图:
目录说明:
- bin:启动脚本
- conf:配置文件
进入nacos的conf目录,修改配置文件cluster.conf.example,重命名为cluster.conf:
然后添加内容:
127.0.0.1:8845
127.0.0.1.8846
127.0.0.1.8847
然后修改application.properties文件,添加数据库配置
spring.datasource.platform=mysql
db.num=1
db.url.0=jdbc:mysql://127.0.0.1:3306/nacos?characterEncoding=utf8&connectTimeout=1000&socketTimeout=3000&autoReconnect=true&useUnicode=true&useSSL=false&serverTimezone=UTC
db.user.0=root
db.password.0=123
2.4.启动
将nacos文件夹复制三份,分别命名为:nacos1、nacos2、nacos3
然后分别修改三个文件夹中的application.properties,
nacos1:
server.port=8845
nacos2:
server.port=8846
nacos3:
server.port=8847
然后分别启动三个nacos节点:
startup.cmd
2.5.nginx反向代理
找到课前资料提供的nginx安装包:
解压到任意非中文目录下:
修改conf/nginx.conf文件,配置如下:
upstream nacos-cluster {
server 127.0.0.1:8845;
server 127.0.0.1:8846;
server 127.0.0.1:8847;
}
server {
listen 80;
server_name localhost;
location /nacos {
proxy_pass http://nacos-cluster;
}
}
启动nginx
start nginx.exe
而后在浏览器访问:http://localhost/nacos即可。
代码中application.yml文件配置如下:
spring:
cloud:
nacos:
server-addr: localhost:80 # Nacos地址
2.6.优化
-
实际部署时,需要给做反向代理的nginx服务器设置一个域名,这样后续如果有服务器迁移nacos的客户端也无需更改配置.
-
Nacos的各个节点应该部署到多个不同服务器,做好容灾和隔离
Feign远程调用
先来看我们以前利用RestTemplate发起远程调用的代码:
存在下面的问题:
•代码可读性差,编程体验不统一
•参数复杂URL难以维护
Feign是一个声明式的http客户端,官方地址:https://github.com/OpenFeign/feign
其作用就是帮助我们优雅的实现http请求的发送,解决上面提到的问题。
Feign替代RestTemplate
首先在 orderserver 中添加依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
启动类中添加注解
编写Feign的客户端
@FeignClient("userserver") // 声明要调用的服务名称
public interface UserClient {
@GetMapping("/user/{id}")
User getUser(@PathVariable("id") Long id);
}
这个客户端主要是基于SpringMVC的注解来声明远程调用的信息,比如:
- 服务名称:userservice
- 请求方式:GET
- 请求路径:/user/{id}
- 请求参数:Long id
- 返回值类型:User
这样,Feign就可以帮助我们发送http请求,无需自己使用RestTemplate来发送了。
测试
@Service
public class OrderService {
@Autowired
OrderMapper orderMapper;
@Autowired
UserClient userClient;
public Order queryOrderById(Long orderId) {
// 1.查询订单
Order order = orderMapper.findById(orderId);
// 2.调用userserver服务中的接口
User user = userClient.getUser(order.getUserId());
// 3.设置用户信息
order.setUser(user);
// 4.返回数据
return order;
}
}
然后分别访问:http://localhost:8080/order/101、http://localhost:8080/order/102、http://localhost:8080/order/103、http://localhost:8080/order/104,来查看 userserver 服务日志打印
可以发现 Feign 自带了负载均衡
Feign自定义配置
Feign可以支持很多的自定义配置,如下表所示:
类型 | 作用 | 说明 |
---|---|---|
feign.Logger.Level | 修改日志级别 | 包含四种不同的级别:NONE、BASIC、HEADERS、FULL |
feign.codec.Decoder | 响应结果的解析器 | http远程调用的结果做解析,例如解析json字符串为java对象 |
feign.codec.Encoder | 请求参数编码 | 将请求参数编码,便于通过http请求发送 |
feign. Contract | 支持的注解格式 | 默认是SpringMVC的注解 |
feign. Retryer | 失败重试机制 | 请求失败的重试机制,默认是没有,不过会使用Ribbon的重试 |
一般情况下,默认值就能满足我们使用,如果要自定义时,只需要创建自定义的@Bean覆盖默认Bean即可。
其中日志的级别分为四种:
- NONE:不记录任何日志信息,这是默认值。
- BASIC:仅记录请求的方法,URL以及响应状态码和执行时间
- HEADERS:在BASIC的基础上,额外记录了请求和响应的头信息
- FULL:记录所有请求和响应的明细,包括头信息、请求体、元数据。
修改日志输出
方式一
通过配置文件的方式来修改
我们可以针对于某个服务来设置,这样只会对访问这个服务时执行该级别的日志输出
feign:
client:
config:
userservice: # 针对某个微服务的配置
loggerLevel: FULL # 日志级别
也可以对全局微服务做修改
feign:
client:
config:
default: # 这里用default就是全局配置,如果是写服务名称,则是针对某个微服务的配置
loggerLevel: FULL # 日志级别
效果测试,修改前
修改后
方式二
也可以基于Java代码来修改日志级别,先声明一个类,然后声明一个Logger.Level的对象:
public class DefaultFeignConfiguration {
@Bean
public Logger.Level feignLogLevel(){
return Logger.Level.BASIC; // 日志级别为BASIC
}
}
如果要全局生效,将其放到启动类的 @EnableFeignClients
这个注解中:
@EnableFeignClients(defaultConfiguration = DefaultFeignConfiguration .class)
如果是局部生效,则把它放到对应的 @FeignClient
这个注解中:
@FeignClient(value = "userservice", configuration = DefaultFeignConfiguration .class)
Feign性能优化
Feign底层发起http请求,依赖于其它的框架。其底层客户端实现包括:
•URLConnection:默认实现,不支持连接池
•Apache HttpClient :支持连接池
•OKHttp:支持连接池
因此提高Feign的性能主要手段就是使用连接池代替默认的URLConnection。
这里我们用Apache的HttpClient来演示。
1)引入依赖
在order-service的pom文件中引入Apache的HttpClient依赖:
<!--httpClient的依赖 -->
<dependency>
<groupId>io.github.openfeign</groupId>
<artifactId>feign-httpclient</artifactId>
</dependency>
2)配置连接池
在order-service的application.yml中添加配置:
feign:
client:
config:
default: # default全局的配置
loggerLevel: BASIC # 日志级别,BASIC就是基本的请求和响应信息
httpclient:
enabled: true # 开启feign对HttpClient的支持
max-connections: 200 # 最大的连接数
max-connections-per-route: 50 # 每个路径的最大连接数
总结,Feign的优化:
1.日志级别尽量用basic
2.使用HttpClient或OKHttp代替URLConnection
① 引入feign-httpClient依赖
② 配置文件开启httpClient功能,设置连接池参数
Feign最佳实践
继承方式
一样的代码可以通过继承来共享:
1)定义一个API接口,利用定义方法,并基于SpringMVC注解做声明。
2)Feign客户端和Controller都集成改接口
优点:
- 简单
- 实现了代码共享
缺点:
-
服务提供方、服务消费方紧耦合
-
参数列表中的注解映射并不会继承,因此Controller中必须再次声明方法、参数列表、注解
抽取方式
将Feign的Client抽取为独立模块,并且把接口有关的POJO、默认的Feign配置都放到这个模块中,提供给所有消费者使用。
例如,将UserClient、User、Feign的默认配置都抽取到一个feign-api包中,所有微服务引用该依赖包,即可直接使用。
基于抽取方式的实现
首先新建一个 feign-api Module,并且导入 feign 依赖
<!--feign客户端-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
然后将之前的 Client 和 pojo 复制过来
然后在 orderserver 中引入 feign-api
然后把 orderserver 中有关 feign 的调用替换成我们定义的 feign-api 中的相关文件
注意要在 orderserver 启动类上的 @EnableFeignClients 注解添加我们要使用的那个 Client ,否则无法扫描到 Client 从而无法实现自动注入
这里的扫描包的方式有两种:
方式一:
指定Feign应该扫描的包:
@EnableFeignClients(basePackages = "cn.itcast.feign.clients")
方式二:
指定需要加载的Client接口:
@EnableFeignClients(clients = {UserClient.class})
微服务网关
网关的作用
- 对用户请求做身份验证,权限校验
- 将用户的请求路由到微服务,并实现负载均衡
- 对用户的请求做限流
搭建GateWay网关
新建Module并引入依赖
<!--网关-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-gateway</artifactId>
</dependency>
<!--nacos服务发现依赖-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
新建启动类
@SpringBootApplication
public class GateApplication {
public static void main(String[] args) {
SpringApplication.run(GateApplication.class,args);
}
}
添加配置
server:
port: 10010 # 网关端口
spring:
application:
name: gateway # 服务名称
cloud:
nacos:
server-addr: localhost:8848 # nacos地址
gateway:
routes: # 网关路由配置
- id: user-server # 路由id,自定义,只要唯一即可
# uri: http://127.0.0.1:8081 # 路由的目标地址 http就是固定地址
uri: lb://userserver # 路由的目标地址 lb就是负载均衡,后面跟服务名称
predicates: # 路由断言,也就是判断请求是否符合路由规则的条件
- Path=/user/** # 这个是按照路径匹配,只要以/user/开头就符合要求
- id: order-server
uri: lb://orderserver
predicates:
- Path=/order/**
启动GateWay项目
分别访问
-
http://localhost:10010/order/101
-
http://localhost:10010/user/1
如图,我们已经实现了网关搭建
网关路由的流程图
总结:
网关搭建步骤:
-
创建项目,引入nacos服务发现和gateway依赖
-
配置application.yml,包括服务基本信息、nacos地址、路由
路由配置包括:
-
路由id:路由的唯一标示
-
路由目标(uri):路由的目标地址,http代表固定地址,lb代表根据服务名负载均衡
-
路由断言(predicates):判断路由的规则,
-
路由过滤器(filters):对请求或响应做处理
断言工厂
我们在配置文件中写的断言规则只是字符串,这些字符串会被Predicate Factory读取并处理,转变为路由判断的条件
例如Path=/user/**是按照路径匹配,这个规则是由
org.springframework.cloud.gateway.handler.predicate.PathRoutePredicateFactory
类来
处理的,像这样的断言工厂在SpringCloudGateway还有十几个:
名称 | 说明 | 示例 |
---|---|---|
After | 是某个时间点后的请求 | - After=2037-01-20T17:42:47.789-07:00[America/Denver] |
Before | 是某个时间点之前的请求 | - Before=2031-04-13T15:14:47.433+08:00[Asia/Shanghai] |
Between | 是某两个时间点之前的请求 | - Between=2037-01-20T17:42:47.789-07:00[America/Denver], 2037-01-21T17:42:47.789-07:00[America/Denver] |
Cookie | 请求必须包含某些cookie | - Cookie=chocolate, ch.p |
Header | 请求必须包含某些header | - Header=X-Request-Id, \d+ |
Host | 请求必须是访问某个host(域名) | - Host=.somehost.org,.anotherhost.org |
Method | 请求方式必须是指定方式 | - Method=GET,POST |
Path | 请求路径必须符合指定规则 | - Path=/red/{segment},/blue/** |
Query | 请求参数必须包含指定参数 | - Query=name, Jack或者- Query=name |
RemoteAddr | 请求者的ip必须是指定范围 | - RemoteAddr=192.168.1.1/24 |
Weight | 权重处理 |
我们只需要掌握Path这种路由工程就可以了。
路由过滤器配置
过滤器种类
Spring提供了31种不同的路由过滤器工厂。例如:
名称 | 说明 |
---|---|
AddRequestHeader | 给当前请求添加一个请求头 |
RemoveRequestHeader | 移除请求中的一个请求头 |
AddResponseHeader | 给响应结果中添加一个响应头 |
RemoveResponseHeader | 从响应结果中移除有一个响应头 |
RequestRateLimiter | 限制请求的流量 |
针对某个服务添加过滤器
在 orderserver 服务中添加过滤器
spring:
cloud:
gateway:
routes: # 网关路由配置
- id: order-server
uri: lb://orderserver
predicates:
- Path=/order/**
filters: # 过滤器
- AddRequestHeader=Truth, hello springcloud! # 添加请求头,逗号相当于等于的意思,表示为: Truth=hello springcloud
然后再 orderserver 中获取这个参数
@GetMapping("{orderId}")
public Order queryOrderByUserId(@PathVariable("orderId") Long orderId,
@RequestHeader(value = "Truth",required = false) String Truth) {
System.out.println("Truth:" + Truth);
// 根据id查询订单并返回
return orderService.queryOrderById(orderId);
}
测试访问
添加默认过滤器
如果要对所有的路由都生效,则可以将过滤器工厂写到default下。格式如下:
spring:
cloud:
gateway:
routes: # 网关路由配置
- id: order-server
uri: lb://orderserver
predicates:
- Path=/order/**
default-filters: # 默认过滤器。对所有服务都生效
- AddRequestHeader=Truth, hello springcloud!
全局过滤器
我们可以实现 GlobalFilter 接口来自定义过滤方法
添加 AuthorizeFilter 类,添加如下代码
/**
* @author songzx
* @create 2023-03-21 16:13
*/
@Order(-1) // @Order 的作用是设置拦截器的优先级,数字越小,优先级越高
@Component
public class AuthorizeFilter implements GlobalFilter {
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
// 1.获取请求体
ServerHttpRequest request = exchange.getRequest();
// 2.从请求体中获取请求路径中携带的参数:author
MultiValueMap<String, String> params = request.getQueryParams();
String author = params.getFirst("author");
// 3.判断参数值是否等于admin,是就放行,否则不放行
if("admin".equals(author)){
return chain.filter(exchange);
}
// 4.不予放行,实施拦截
// 4.1 拦截前设置状态码为401
exchange.getResponse().setStatusCode(HttpStatus.UNAUTHORIZED);
// 4.2 结束处理
return exchange.getResponse().setComplete();
}
}
这样通过网关访问微服务接口时必须携带 author=admin 的参数才能被放行
测试如下
过滤器执行顺序
请求进入网关会碰到三类过滤器:当前路由的过滤器、DefaultFilter、GlobalFilter
请求路由后,会将当前路由过滤器和DefaultFilter、GlobalFilter,合并到一个过滤器链(集合)中,排序后依次执行每个过滤器:
排序的规则是什么呢?
- 每一个过滤器都必须指定一个int类型的order值,order值越小,优先级越高,执行顺序越靠前。
- GlobalFilter通过实现Ordered接口,或者添加@Order注解来指定order值,由我们自己指定
- 路由过滤器和defaultFilter的order由Spring指定,默认是按照声明顺序从1递增。
- 当过滤器的order值一样时,会按照 defaultFilter > 路由过滤器 > GlobalFilter的顺序执行。
跨域问题处理
在 gateway 配置文件中添加如下配置
spring:
cloud:
gateway:
globalcors: # 全局的跨域处理
add-to-simple-url-handler-mapping: true # 解决options请求被拦截问题
corsConfigurations:
'[/**]':
allowedOrigins: # 允许哪些网站的跨域请求
- "http://localhost:5050"
allowedMethods: # 允许的跨域ajax的请求方式
- "GET"
- "POST"
- "DELETE"
- "PUT"
- "OPTIONS"
allowedHeaders: "*" # 允许在请求中携带的头信息
allowCredentials: true # 是否允许携带cookie
maxAge: 360000 # 这次跨域检测的有效期
请求示例:
在 5050 端口的地址上请求 10010 的接口,默认情况下会出现跨域问题,但是我们添加完配置后不会有跨域问题
Docker
什么是Docker
Docker是一个快速交付应用,快速运行应用的技术:
- 可以将程序及其依赖、运行环境一起打包成一个镜像,可以迁移到任意的 Linux 操作系统
- 运行时利用沙箱机制形成容器隔离,各个应用互不干扰
- 启动、移除都可以通过一行命令完成,方便快捷
Docker和虚拟机的区别
Docker可以让一个应用在任何操作系统中非常方便的运行。而以前我们接触的虚拟机,也能在一个操作系统中,运行另外一个操作系统,保护系统中的任何应用。
两者有什么差异呢?
虚拟机(virtual machine)是在操作系统中模拟硬件设备,然后运行另一个操作系统,比如在 Windows 系统里面运行 Ubuntu 系统,这样就可以运行任意的Ubuntu应用了。
Docker仅仅是封装函数库,并没有模拟完整的操作系统,如图:
对比来看:
小结:
Docker和虚拟机的差异:
-
docker是一个系统进程;虚拟机是在操作系统中的操作系统
-
docker体积小、启动速度快、性能好;虚拟机体积大、启动速度慢、性能一般
Docker架构
Docker中有几个重要的概念:
镜像(Image):Docker将应用程序及其所需的依赖、函数库、环境、配置等文件打包在一起,称为镜像。
容器(Container):镜像中的应用程序运行后形成的进程就是容器,只是Docker会给容器进程做隔离,对外不可见。
一切应用最终都是代码组成,都是硬盘中的一个个的字节形成的文件。只有运行时,才会加载到内存,形成进程。
而镜像,就是把一个应用在硬盘上的文件、及其运行环境、部分系统函数库文件一起打包形成的文件包。这个文件包是只读的。
容器呢,就是将这些文件中编写的程序、函数加载到内存中允许,形成进程,只不过要隔离起来。因此一个镜像可以启动多次,形成多个容器进程。
例如你下载了一个QQ,如果我们将QQ在磁盘上的运行文件及其运行的操作系统依赖打包,形成QQ镜像。然后你可以启动多次,双开、甚至三开QQ,跟多个妹子聊天。
DockerHub
开源应用程序非常多,打包这些应用往往是重复的劳动。为了避免这些重复劳动,人们就会将自己打包的应用镜像,例如Redis、MySQL镜像放到网络上,共享使用,就像GitHub的代码共享一样。
-
DockerHub:DockerHub是一个官方的Docker镜像的托管平台。这样的平台称为Docker Registry。
-
国内也有类似于DockerHub 的公开服务,比如 网易云镜像服务、阿里云镜像库等。
我们一方面可以将自己的镜像共享到DockerHub,另一方面也可以从DockerHub拉取镜像:
Docker架构
我们要使用Docker来操作镜像、容器,就必须要安装Docker。
Docker是一个CS架构的程序,由两部分组成:
-
服务端(server):Docker守护进程,负责处理Docker指令,管理镜像、容器等
-
客户端(client):通过命令或RestAPI向Docker服务端发送指令。可以在本地或远程向服务端发送指令。
如图:
小结
镜像:
- 将应用程序及其依赖、环境、配置打包在一起
容器:
- 镜像运行起来就是容器,一个镜像可以运行多个容器
Docker结构:
-
服务端:接收命令或远程请求,操作镜像或容器
-
客户端:发送命令或者请求到Docker服务端
DockerHub:
- 一个镜像托管的服务器,类似的还有阿里云镜像服务,统称为DockerRegistry
Linux关闭防火墙
查看防火墙状态
systemctl status firewalld.service
关闭防火墙
systemctl stop firewalld.service
永久关闭
systemctl disable firewalld.service
安装Docker
Docker 分为 CE 和 EE 两大版本。CE 即社区版(免费,支持周期 7 个月),EE 即企业版,强调安全,付费使用,支持周期 24 个月。
Docker CE 分为 stable
test
和 nightly
三个更新频道。
官方网站上有各种环境下的 安装指南,这里主要介绍 Docker CE 在 CentOS上的安装。
CentOS安装Docker
Docker CE 支持 64 位版本 CentOS 7,并且要求内核版本不低于 3.10, CentOS 7 满足最低内核的要求,所以我们在CentOS 7安装Docker。
1.1.卸载(可选)
如果之前安装过旧版本的Docker,可以使用下面命令卸载:
yum remove docker \
docker-client \
docker-client-latest \
docker-common \
docker-latest \
docker-latest-logrotate \
docker-logrotate \
docker-selinux \
docker-engine-selinux \
docker-engine \
docker-ce
1.2.安装docker
首先需要大家虚拟机联网,安装yum工具
yum install -y yum-utils \
device-mapper-persistent-data \
lvm2 --skip-broken
然后更新本地镜像源:
# 设置docker镜像源
yum-config-manager \
--add-repo \
https://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo
sed -i 's/download.docker.com/mirrors.aliyun.com\/docker-ce/g' /etc/yum.repos.d/docker-ce.repo
yum makecache fast
然后输入命令:
yum install -y docker-ce
docker-ce为社区免费版本。稍等片刻,docker即可安装成功。
1.3.启动docker
Docker应用需要用到各种端口,逐一去修改防火墙设置。非常麻烦,因此建议大家直接关闭防火墙!
启动docker前,一定要关闭防火墙后!!
启动docker前,一定要关闭防火墙后!!
启动docker前,一定要关闭防火墙后!!
# 关闭
systemctl stop firewalld
# 禁止开机启动防火墙
systemctl disable firewalld
通过命令启动docker:
systemctl start docker # 启动docker服务
systemctl stop docker # 停止docker服务
systemctl restart docker # 重启docker服务
然后输入命令,可以查看docker版本:
docker -v
如图:
1.4.重启docker
systemctl restart docker
1.5.配置镜像加速
docker官方镜像仓库网速较差,我们需要设置国内镜像服务:
参考阿里云的镜像加速文档:https://cr.console.aliyun.com/cn-hangzhou/instances/mirrors
Docker基本操作
镜像名称
首先来看下镜像的名称组成:
- 镜名称一般分两部分组成:[repository]:[tag]。
- 在没有指定tag时,默认是latest,代表最新版本的镜像
如图:
这里的mysql就是repository,5.7就是tag,合一起就是镜像名称,代表5.7版本的MySQL镜像。
镜像命令
常见的镜像操作命令如图:
拉取镜像
我们可以访问官方网站,来查询要拉取的镜像
docker pull nginx:laster
查看已安装镜像
docker images
删除镜像
docker rmi nginx:latest
将镜像保存为tar文件
docker save -o nginx.tar nginx:latest
使用load加载镜像
docker load -i nginx.tar
容器的基本操作
基本操作图
容器操作的命令如图:
容器保护三个状态:
- 运行:进程正常运行
- 暂停:进程暂停,CPU不再运行,并不释放内存
- 停止:进程终止,回收进程占用的内存、CPU等资源
其中:
-
docker run:创建并运行一个容器,处于运行状态
-
docker pause:让一个运行的容器暂停
-
docker unpause:让一个容器从暂停状态恢复运行
-
docker stop:停止一个运行的容器
-
docker start:让一个停止的容器再次运行
-
docker rm:删除一个容器
容器基本操作1
我们以启动 nginx 容器为例
启动nginx容器
docker run --name nginxcontent -p 7070:80 -d nginx
查看运行中的容器
docker ps
测试访问
访问:http://182.43.250.34:7070
查看容器日志
docker logs nginxcontent
持续监听查看日志
docker logs -f nginxcontent
容器基本操作2
查看正在运行的容器
docker ps
查看所有容器
docker ps -a
-a 参数可以查看所有容器,包括没有运行的容器
停止正在运行的容器
docker stop 容器名称
重启已经停止的容器
docker start 容器名称
删除容器
docker rm 容器名称
上面这种删除默认只能删除没有运行的容器,而不能删除正在运行的容器
强制删除正在运行的容器
docker rm -f 容器名称
进入容器
docker exec -it 容器名称 bash
容器操作案例以Redis为例
启动一个Redis容器
进入容器内部添加一个 num=666
Docker数据卷
数据卷的作用
将容器和数据分离,解耦合,方便操作容器内数据,保证数据安全。通过数据卷和容器内的文件做相互关联,我们可以直接操作宿主机上的文件,从而达到实现修改容器内数据的目的。
数据卷的基本操作
创建一个数据卷
docker volume create 数据卷名称
下图是创建一个名为 html 的数据卷
查看所有数据卷
docker volume ls
查看指定数据卷的详细信息
docker volume inspect 数据卷名称
查询html数据卷的详细信息
删除所有未使用的数据卷
docker volume prune 数据卷名称
删除指定数据卷
docker volume rm 数据卷名称
数据卷挂载操作
使用 -v 数据卷名称:/容器内目标路径
,其中容器内目标路径可以在 DockerHub 上查到
下面是启动 nginx 容器并挂载 html 数据卷的命令
docker run --name mn -p 7070:80 -v html:/usr/share/nginx/html -d nginx
然后查看 html 数据卷对应宿主机的真实文件地址
docker volume inspect html
然后 cd 到查询到的位置,并且列出该文件夹下的文件
可以发现容器内的文件在我们主机上也对应的存在一份
现在访问 7070 端口来查看页面
然后尝试修改主机上的 html 文件
然后直接刷新页面
我们可以发现页面也跟着改变,说明通过数据卷来修改了容器内的文件
在运行容器时,如果数据卷不存在,docker 会自动创建数据卷,不需要刻意提前创建数据卷
数据卷挂载案例以MySQL为例
首先将准备好的 mysql.tar 上传到 /tmp/mysql,并且新建 config 和 data 两个文件夹
接着将准备好的 hmy.cnf 文件上传到 config 文件中
然后使用 docker 加载 tar 镜像
docker load -i mysql.tar
加载完镜像后可以启动 mysql 容器了,输入下面的命令
docker run --name mysql8 \
-e MYSQL_ROOT_PASSWORD=fawu123.. \
-p 3306:3306 \
-v /tmp/mysql/conf/hmy.cnf:/etc/mysql/conf.d/hmy.cnf/hmy.cnf \
-v /tmp/mysql/data:/var/lib/mysql \
-d \
mysql:8.0.24
docker run --name mysql \
-e MYSQL_ROOT_PASSWORD=abc123 \
-p 3307:3306 \
-v /tmp/mysql/config/hmy.cnf:/etc/mysql/conf.d/hmy.cnf/hmy.cnf \
-v /tmp/mysql/data:/var/lib/mysql \
-d \
mysql:5.7.25
上面的命令解释
-
反斜杠表示命令换行符,表示命令没有结束继续往下读取
-
-e 设置系统环境变量,这里设置mysql密码为 abc123
-
-p 宿主机端口和容器端口映射
-
-v 挂载数据卷
-
-d 运行的容器名称
踩坑点:换行执行命令时,镜像名称要单独在一行,不能放在 -d 后面运行,否则启动会出错
测试链接
自定义镜像Dockerfile
更新详细语法说明,请参考官网文档: https://docs.docker.com/engine/reference/builder
基于Ubuntu构建Java项目
首先将事先准备好的资料复制到 docker-demo 文件夹中
其中 Dockerfile 文件内容如下
# 指定基础镜像
FROM ubuntu:16.04
# 配置环境变量,JDK的安装目录
ENV JAVA_DIR=/usr/local
# 拷贝jdk和java项目的包
COPY ./jdk8.tar.gz $JAVA_DIR/
COPY ./docker-demo.jar /tmp/app.jar
# 安装JDK
RUN cd $JAVA_DIR \
&& tar -xf ./jdk8.tar.gz \
&& mv ./jdk1.8.0_144 ./java8
# 配置环境变量
ENV JAVA_HOME=$JAVA_DIR/java8
ENV PATH=$PATH:$JAVA_HOME/bin
# 暴露端口
EXPOSE 8090
# 入口,java项目的启动命令
ENTRYPOINT java -jar /tmp/app.jar
然后执行构建镜像的命令
docker build -t javaweb:1.0 .
javaweb:1.0
表示要构建的镜像名称和tag
注意命令后面跟上空格 .,小数点的意思是表示当前目录
正在构建中
构建完成,查看当前的所有镜像
启动镜像
docker run --name web -p 8090:8090 -d javaweb:1.0
测试访问
基于java8构建Java项目
在上面的例子中,我们每次构建都要从新的环境去安装 java1.8 jdk环境,如果有多个服务需要构建,那么每次都要重复的执行一遍,所以我们可以基于别人弄好的一个java8项目来构建镜像
修改 Dockerfile 文件成如下
# 基于java:8-alpine构建
FROM java:8-alpine
# 拷贝目录下的 docker-demo.jar 到镜像中的 tmp/app.jar
COPY ./docker-demo.jar /tmp/app.jar
# 指定容器的端口
EXPOSE 8090
# 入口,java项目的启动命令
ENTRYPOINT java -jar /tmp/app.jar
然后再次执行构建命令
docker build -t javaweb:1.2 .
查看构建好的镜像
启动镜像
docker run --name web2 -p 8091:8090 -d javaweb:1.2
测试访问
CentOS7安装DockerCompose
2.1.下载
Linux下需要通过命令下载:
# 安装
curl -L https://github.com/docker/compose/releases/download/1.23.1/docker-compose-`uname -s`-`uname -m` > /usr/local/bin/docker-compose
如果下载速度较慢,或者下载失败,可以使用课前资料提供的docker-compose文件:
上传到/usr/local/bin/
目录也可以。
修改文件权限
修改文件权限:
# 修改权限
chmod +x /usr/local/bin/docker-compose
2.3.Base自动补全命令:
# 补全命令
curl -L https://raw.githubusercontent.com/docker/compose/1.29.1/contrib/completion/bash/docker-compose > /etc/bash_completion.d/docker-compose
如果这里出现错误,需要修改自己的hosts文件:
echo "199.232.68.133 raw.githubusercontent.com" >> /etc/hosts
这里如果修改了hosts文件还是不行,重启一下 docker 再次运行即可
使用CockerCompose部署集群
服务打包
首先将各个微服务打包
每个微服务文件夹里面都只有一个 jar 包和 Dockerfile 文件
其中 Dockerfile 文件内容如下
FROM java:8-alpine
COPY ./app.jar /tmp/app.jar
ENTRYPOINT java -jar /tmp/app.jar
然后 docker-compose.yml 内容如下
version: "3.2"
services:
nacos:
image: nacos/nacos-server
environment:
MODE: standalone
ports:
- "8848:8848"
mysql:
image: mysql:5.7.25
environment:
MYSQL_ROOT_PASSWORD: 123
volumes:
- "$PWD/mysql/data:/var/lib/mysql"
- "$PWD/mysql/conf:/etc/mysql/conf.d/"
userservice:
build: ./user-service
orderservice:
build: ./order-service
gateway:
build: ./gateway
ports:
- "10010:10010"
运行所有容器
然后执行运行命令
docker-compose up -d
运行成功截图
查看正在运行的容器,可以发现所有的微服务都在运行中
查看日志
docker-compose logs
重启 docker-compose
docker-compose restart gateway userservice orderservice
停止启动的所有容器
docker-compose down
down
命令会停止所有容器,并删除相关的网络和卷。在容器停止时,docker-compose
会自动移除容器名称、网络和卷等相关资源
docker-compose stop
stop
命令会停止所有容器,但是容器名称、网络和卷等相关资源不会被删除。如果你想启动容器,可以使用 docker-compose start
命令。
Docker私有镜像仓库
搭建镜像仓库可以基于Docker官方提供的DockerRegistry来实现。
官网地址:https://hub.docker.com/_/registry
3.1.简化版镜像仓库
Docker官方的Docker Registry是一个基础版本的Docker镜像仓库,具备仓库管理的完整功能,但是没有图形化界面。
搭建方式比较简单,命令如下:
docker run -d \
--restart=always \
--name registry \
-p 5000:5000 \
-v registry-data:/var/lib/registry \
registry
命令中挂载了一个数据卷registry-data到容器内的/var/lib/registry 目录,这是私有镜像库存放数据的目录。
访问http://YourIp:5000/v2/_catalog 可以查看当前私有镜像服务中包含的镜像
3.2.带有图形化界面版本
使用DockerCompose部署带有图象界面的DockerRegistry,命令如下:
version: '3.0'
services:
registry:
image: registry
volumes:
- ./registry-data:/var/lib/registry
ui:
image: joxit/docker-registry-ui:static
ports:
- 8080:80
environment:
- REGISTRY_TITLE=传智教育私有仓库
- REGISTRY_URL=http://registry:5000
depends_on:
- registry
3.3.配置Docker信任地址
我们的私服采用的是http协议,默认不被Docker信任,所以需要做一个配置:
# 打开要修改的文件
vi /etc/docker/daemon.json
# 添加内容:
"insecure-registries":["http://182.43.250.34:7071"]
# 重加载
systemctl daemon-reload
# 重启docker
systemctl restart docker
启动服务
docker-compose up -d
访问 182.43.250.34:7071 来查看UI界面
上传镜像到私有仓库
首先要对已有镜像进行重新 tag,以 nginx 为例
docker tag nginx:latest 182.43.250.34:7071/nginx:1.0
然后执行推送命令
docker push 182.43.250.34:7071/nginx:1.0
刷新 http://182.43.250.34:7071 查看
从私有镜像拉取
docker pull 182.43.250.34:7071/nginx:1.0
异步通信MQ
同步通信的优缺点
优点
- 时效性高,可以立即得到结果
缺点
- 耦合度高
- 性能和吞吐能力下降
- 有额外的资源消耗
- 有级联失败问题
异步通信的优缺点
优点
- 耦合度低
- 吞吐量提升
- 故障隔离
- 流量销峰
缺点
- 依赖于Broker的可靠性、安全性、吞吐能力
- 架构复杂了,业务没有明显的流程线,不好追踪管理
各个MQ框架的区别
安装RabbitMq
使用 load 的方式加载课前准备好的 mq.tar 镜像包
docker load -i mq.tar
查看加载好的镜像
运行mq,其中设置账号 szx,密码为 abc123
docker run \
-e RABBITMQ_DEFAULT_USER=szx \
-e RABBITMQ_DEFAULT_PASS=abc123 \
--name mq \
--hostname mq1 \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3-management
运行成功后访问可以查看 MQ 的 UI 界面
http://182.43.250.34:15672
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ANq4wl8T-1687829496187)(https://szx-bucket1.oss-cn-hangzhou.aliyuncs.com/picgo/image-20230327200044070.png)]
RabbitMq快速入门
Hello RabbitMQ
添加消息发送者
package cn.itcast.mq.helloworld;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class PublisherTest {
@Test
public void testSendMessage() throws IOException, TimeoutException {
// 1.建立连接
ConnectionFactory factory = new ConnectionFactory();
// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
factory.setHost("182.43.250.34");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("szx");
factory.setPassword("abc123");
// 1.2.建立连接
Connection connection = factory.newConnection();
// 2.创建通道Channel
Channel channel = connection.createChannel();
// 3.创建队列
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 4.发送消息
String message = "hello, rabbitmq!";
channel.basicPublish("", queueName, null, message.getBytes());
System.out.println("发送消息成功:【" + message + "】");
// 5.关闭通道和连接
channel.close();
connection.close();
}
}
添加消息消费者
package cn.itcast.mq.helloworld;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ConsumerTest {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.建立连接
ConnectionFactory factory = new ConnectionFactory();
// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
factory.setHost("182.43.250.34");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("szx");
factory.setPassword("abc123");
// 1.2.建立连接
Connection connection = factory.newConnection();
// 2.创建通道Channel
Channel channel = connection.createChannel();
// 3.创建队列
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 4.订阅消息
channel.basicConsume(queueName, true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
// 5.处理消息
String message = new String(body);
System.out.println("接收到消息:【" + message + "】");
}
});
System.out.println("等待接收消息。。。。");
}
}
运行两次发送者,然后再运行消息消费者,打印如下
SpringAMQP-简单队列模型
引入依赖
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
定义消息发送者
首先添加配置
spring:
rabbitmq:
host: 182.43.250.34 # 主机名
port: 5672 # 端口
virtual-host: / # 虚拟主机
username: szx # 用户名
password: abc123 # 密码
编写代码
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void test(){
// 队列名称
String queueName = "simple.queue";
// 消息
String message = "hello, spring amqp!";
// 发送消息
rabbitTemplate.convertAndSend(queueName, message);
}
}
此时消息队列中有一条消息待消费
定义消息消费者
同样先添加配置文件
spring:
rabbitmq:
host: 182.43.250.34 # 主机名
port: 5672 # 端口
virtual-host: / # 虚拟主机
username: szx # 用户名
password: abc123 # 密码
然后需要添加一个 Bean 交由 Spring 管理,当监听到消息后会自动触发这个方法
@Component
public class SpringRabbitListener {
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg) throws InterruptedException {
System.out.println("spring 消费者接收到消息:【" + msg + "】");
}
}
然后需要启动 Spring 运行函数,查看打印
此时消息队列中的消息就没有了
WorkQueue
Work queues,也被称为(Task queues),任务模型。简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息。
当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。
此时就可以使用work 模型,多个消费者共同处理消息处理,速度就能大大提高了。
消息发送
修改代码,设置一秒钟发送50个消息
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest2 {
@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void test() throws InterruptedException {
// 队列名称
String queueName = "simple.queue";
// 消息
String message = "hello, spring message_____";
for (int i = 1; i <= 50; i++) {
// 发送消息
rabbitTemplate.convertAndSend(queueName, message + i);
// 一秒钟内发送50条消息
Thread.sleep(20);
}
}
}
消息接收
可以定义多个消费者来消费,消费的是同一个队列中的消息
// 设置消费者1每秒处理40个
@RabbitListener(queues = "simple.queue")
public void listenWorkQueueMessage1(String msg) throws InterruptedException {
System.out.println("spring 消费者1接收到消息:【" + msg + "】," + LocalDateTime.now());
Thread.sleep(20);
}
// 设置消费者2每秒处理10个
@RabbitListener(queues = "simple.queue")
public void listenWorkQueueMessage2(String msg) throws InterruptedException {
System.err.println("spring 消费者2接收到消息____:【" + msg + "】," + LocalDateTime.now());
Thread.sleep(200);
}
// 设置消费者3每秒处理10个
@RabbitListener(queues = "simple.queue")
public void listenWorkQueueMessage3(String msg) throws InterruptedException {
System.out.println("spring 消费者3接收到消息____:【" + msg + "】," + LocalDateTime.now());
Thread.sleep(20);
}
修改配置文件,每次只能预取一个消息,这样放置性能不好的消费者也去获取大量消息来处理
spring:
rabbitmq:
listener:
simple:
prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
运行效果
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-jCeQ4XUk-1687829496190)(https://szx-bucket1.oss-cn-hangzhou.aliyuncs.com/picgo/image-20230327213538064.png)]
可以看到,消费者2执行的消费比较少,其他两个执行的比较多
发布/订阅
可以看到,在订阅模型中,多了一个exchange角色,而且过程略有变化:
- Publisher:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
- Exchange:交换机,图中的X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有以下3种类型:
- Fanout:广播,将消息交给所有绑定到交换机的队列
- Direct:定向,把消息交给符合指定routing key 的队列
- Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
- Consumer:消费者,与以前一样,订阅队列,没有变化
- Queue:消息队列也与以前一样,接收消息、缓存消息。
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
Fanout
Fanout 也可以理解为广播,消息发送给交换机后,交换机会吧消息发送给每一个与之绑定的队列,所以这两个队列都能获取到消息
完成如下示例
- 声明一个交换机,名称叫做:itcast.fanout
- 声明两个队列,分别是:fanout.queue1, fanout.queue2
添加配置类
在 consumer 中添加交换机
package cn.itcast.mq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author songzx
* @date 2023/4/16
* @apiNote
*/
@Configuration
public class FanoutConfig {
// 声明交换机
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("itcast.fanout");
}
// 声明第一个队列
@Bean
public Queue queue1(){
return new Queue("fanout.queue1");
}
// 将第一个队列和交换机绑定
@Bean
public Binding bindingQueue(Queue queue1,FanoutExchange fanoutExchange){
return BindingBuilder.bind(queue1).to(fanoutExchange);
}
// 声明第二个队列
@Bean
public Queue queue2(){
return new Queue("fanout.queue2");
}
// 将第二个队列和交换机绑定
@Bean
public Binding bindingQueue2(Queue queue2,FanoutExchange fanoutExchange){
return BindingBuilder.bind(queue2).to(fanoutExchange);
}
}
消息发送
package cn.itcast.mq.helloworld;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
/**
* @author songzx
* @date 2023/3/27
* @apiNote
*/
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void testSendFanout(){
// 交换机名称
String exChangeName = "itcast.fanout";
// 消息内容
String message = "hello every one";
// 发送消息给交换机
rabbitTemplate.convertAndSend(exChangeName,"",message);
}
}
消息接收
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {
System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
}
@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {
System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
}
Direct
- 队列与交换机的绑定,不能是任意绑定了,而是要指定一个
RoutingKey
(路由key) - 消息的发送方在 向 Exchange发送消息时,也必须指定消息的
RoutingKey
。 - Exchange不再把消息交给每一个绑定的队列,而是根据消息的
Routing Key
进行判断,只有队列的Routingkey
与消息的Routing key
完全一致,才会接收到消息
下面通过注解的方式来实现上面的图
消息接收
@RabbitListener(bindings = @QueueBinding(
value = @Queue("directQueue1"),
exchange = @Exchange("itcast.exchange"),
key = {"blue","red"}
))
public void directQueue1(String msg){
System.out.println("directQueue1收到消息:" + msg);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue("directQueue2"),
exchange = @Exchange("itcast.exchange"),
key = {"yellow","red"}
))
public void directQueue2(String msg){
System.out.println("directQueue2收到消息:" + msg);
}
消息发送
@Test
public void testDirectExChange(){
// 交换机名称
String exChangeName = "itcast.exchange";
// 消息内容
String message = "hello blue";
// 发送消息给交换机
rabbitTemplate.convertAndSend(exChangeName,"blue",message);
}
Topic
Topic
类型的Exchange
与Direct
相比,都是可以根据RoutingKey
把消息路由到不同的队列。只不过Topic
类型Exchange
可以让队列在绑定Routing key
的时候使用通配符!
Routingkey
一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
通配符规则:
#
:匹配一个或多个词
*
:匹配不多不少恰好1个词
举例:
item.#
:能够匹配item.spu.insert
或者 item.spu
item.*
:只能匹配item.spu
消息接收
@RabbitListener(bindings = @QueueBinding(
value = @Queue("topicQueue1"),
exchange = @Exchange(value = "topic",type = ExchangeTypes.TOPIC),
key = "china.#"
))
public void topicQueue1(String msg){
System.out.println("topicQueue1收到消息:" + msg);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue("topicQueue2"),
exchange = @Exchange(value = "topic",type = ExchangeTypes.TOPIC),
key = "#.weather"
))
public void topicQueue2(String msg){
System.out.println("topicQueue2收到消息:" + msg);
}
消息发送
@Test
public void testTopicExChange(){
// 交换机名称
String exChangeName = "topic";
// 消息内容
String message = "测试新闻发送";
// 发送消息给交换机
rabbitTemplate.convertAndSend(exChangeName,"china.news",message);
}
消息转换器
默认情况下,我们只能发送字节消息。无法发送其他类型的消息,所以需要安装消息转换器,来将消息转成json的格式来发送
安装消息转换器的依赖
添加依赖
<!--消息转换器-->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
配置消息bean
在启动类中添加 Bean,这个 Bean 需要在发送消息和接收消息的工程中都配置
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
发送消息
@Test
public void testMessageConverter(){
Map<String,Object> msg = new HashMap<>();
msg.put("name","里斯");
msg.put("age",15);
rabbitTemplate.convertAndSend("fanout.queue1",msg);
}
队列必须提前声明好
接收消息
@RabbitListener(queues = "fanout.queue1")
public void objectQueue(Map<String,Object> msp){
System.out.println("接收到消息:" + msp);
}
分布式搜索引擎
什么是elasticsearch?
- 一个开源的分布式搜索引擎,可以用来实现搜索、日志统计、分析、系统监控等功能
什么是elastic stack(ELK)?
- 是以elasticsearch为核心的技术栈,包括beats、Logstash、kibana、elasticsearch
什么是Lucene?
- 是Apache的开源搜索引擎类库,提供了搜索引擎的核心API
安装es、kibana
安装
参考课前资料:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-kIqsTosK-1687829496193)(https://szx-bucket1.oss-cn-hangzhou.aliyuncs.com/picgo/image-20210720203805350.png)]
分词器
参考课前资料:
总结
分词器的作用是什么?
- 创建倒排索引时对文档分词
- 用户搜索时,对输入的内容分词
IK分词器有几种模式?
- ik_smart:智能切分,粗粒度
- ik_max_word:最细切分,细粒度
IK分词器如何拓展词条?如何停用词条?
- 利用config目录的IkAnalyzer.cfg.xml文件添加拓展词典和停用词典
- 在词典中添加拓展词条或者停用词条
索引库操作
索引库就类似数据库表,mapping映射就类似表的结构。
我们要向es中存储数据,必须先创建“库”和“表”。
mapping映射属性
mapping是对索引库中文档的约束,常见的mapping属性包括:
- type:字段数据类型,常见的简单类型有:
- 字符串:text(可分词的文本)、keyword(精确值,例如:品牌、国家、ip地址)
- 数值:long、integer、short、byte、double、float、
- 布尔:boolean
- 日期:date
- 对象:object
- index:是否创建索引,默认为true
- analyzer:使用哪种分词器
- properties:该字段的子字段
例如下面的json文档:
{
"age": 21,
"weight": 52.1,
"isMarried": false,
"info": "黑马程序员Java讲师",
"email": "zy@itcast.cn",
"score": [99.1, 99.5, 98.9],
"name": {
"firstName": "云",
"lastName": "赵"
}
}
对应的每个字段映射(mapping):
- age:类型为 integer;参与搜索,因此需要index为true;无需分词器
- weight:类型为float;参与搜索,因此需要index为true;无需分词器
- isMarried:类型为boolean;参与搜索,因此需要index为true;无需分词器
- info:类型为字符串,需要分词,因此是text;参与搜索,因此需要index为true;分词器可以用ik_smart
- email:类型为字符串,但是不需要分词,因此是keyword;不参与搜索,因此需要index为false;无需分词器
- score:虽然是数组,但是我们只看元素的类型,类型为float;参与搜索,因此需要index为true;无需分词器
- name:类型为object,需要定义多个子属性
- name.firstName;类型为字符串,但是不需要分词,因此是keyword;参与搜索,因此需要index为true;无需分词器
- name.lastName;类型为字符串,但是不需要分词,因此是keyword;参与搜索,因此需要index为true;无需分词器
创建索引库
基本语法
- 请求方式:PUT
- 请求路径:/索引库名,可以自定义
- 请求参数:mapping映射
示例
PUT /索引库名称
{
"mappings": {
"properties": {
"字段名":{
"type": "text",
"analyzer": "ik_smart"
},
"字段名2":{
"type": "keyword",
"index": "false"
},
"字段名3":{
"properties": {
"子字段": {
"type": "keyword"
}
}
},
// ...略
}
}
}
下面创建一个名为 heima 的索引库
PUT /heima
{
"mappings": {
"properties": {
"info":{
"type": "text",
"analyzer": "ik_smart"
},
"email":{
"type": "keyword",
"index": false
},
"name":{
"properties": {
"firstName":{
"type":"keyword"
},
"lastName":{
"type":"keyword"
}
}
}
}
}
}
运行出现如下图,表示创建索引成功
查询、删除、修改索引库
查询索引库
GET /heima
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-oMUGsA3l-1687829496194)(https://szx-bucket1.oss-cn-hangzhou.aliyuncs.com/picgo/image-20230423193044096.png)]
修改索引库,修改索引库时只能添加新的字段,不能修改已有的字段
PUT /heima/_mappings
{
"properties":{
"age":{
"type":"integer"
}
}
}
删除索引库
DELETE /heima
文档操作
添加、删除、查询
添加文档语法
POST /索引名称/_doc/要添加数据的id
示例代码
# 添加文档
POST /heima/_doc/1
{
"info":"黑马程序员Java讲师",
"age":18,
"email":"zy@163.com",
"name":{
"firstName":"赵",
"lastName":"云"
}
}
查询
# 查询文档
GET /heima/_doc/1
删除
# 删除文档
DELETE /heima/_doc/1
修改文档
全量修改,如果id不存在,则会新增一条
# 全量更新
PUT /heima/_doc/1
{
"info":"黑马程序员Java讲师",
"age":18,
"email":"ZhaoYun@163.com",
"name":{
"firstName":"赵",
"lastName":"云"
}
}
局部修改,只修改一个字段
# 局部修改,只修改某个字段
POST /heima/_update/1
{
"doc": {
"age":20
}
}
RestClient
导入
<properties>
<java.version>1.8</java.version>
<elasticsearch.version>7.12.1</elasticsearch.version>
</properties>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
</dependency>
默认安装的是 7.6.1 版本,要使用固定 7.12.1 版本
初始化Client
编写测试方法
@SpringBootTest
public class HotelIndexTest {
private RestHighLevelClient client;
@Test
public void testInitClient(){
System.out.println(client);
}
@BeforeEach
void setClient(){
this.client = new RestHighLevelClient(RestClient.builder(
HttpHost.create("http://8.130.118.240:9200")
));
}
@AfterEach
void removeClient() throws IOException {
this.client.close();
}
}
索引操作
首先我们要根据 SQL 表结构,自行设置 mapping 映射的 JSON,然后通过 RestClient 来在 Java 代码中添加索引
创建索引库
// 创建索引库
@Test
void createHotelIndex() throws IOException {
// 1.创建Request对象
CreateIndexRequest request = new CreateIndexRequest("hotel");
// 2.请求参数
request.source(MAPPING_TEMPLATE, XContentType.JSON);
// 3.发起请求
client.indices().create(request, RequestOptions.DEFAULT);
}
其中 MAPPING_TEMPLATE 是我们提前准备好的 mapping 映射的 JSON 文件
package cn.itcast.hotel.constants;
/**
* @author songzx
* @create 2023-04-26 16:55
*/
public class HotelConstants {
public static final String MAPPING_TEMPLATE = "{\n" +
" \"mappings\": {\n" +
" \"properties\": {\n" +
" \"id\": {\n" +
" \"type\": \"keyword\"\n" +
" },\n" +
" \"name\":{\n" +
" \"type\": \"text\",\n" +
" \"analyzer\": \"ik_max_word\",\n" +
" \"copy_to\": \"all\"\n" +
" },\n" +
" \"address\":{\n" +
" \"type\": \"keyword\",\n" +
" \"index\": false\n" +
" },\n" +
" \"price\":{\n" +
" \"type\": \"integer\"\n" +
" },\n" +
" \"score\":{\n" +
" \"type\": \"integer\"\n" +
" },\n" +
" \"brand\":{\n" +
" \"type\": \"keyword\",\n" +
" \"copy_to\": \"all\"\n" +
" },\n" +
" \"city\":{\n" +
" \"type\": \"keyword\",\n" +
" \"copy_to\": \"all\"\n" +
" },\n" +
" \"starName\":{\n" +
" \"type\": \"keyword\"\n" +
" },\n" +
" \"business\":{\n" +
" \"type\": \"keyword\"\n" +
" },\n" +
" \"location\":{\n" +
" \"type\": \"geo_point\"\n" +
" },\n" +
" \"pic\":{\n" +
" \"type\": \"keyword\",\n" +
" \"index\": false\n" +
" },\n" +
" \"all\":{\n" +
" \"type\": \"text\",\n" +
" \"analyzer\": \"ik_max_word\"\n" +
" }\n" +
" }\n" +
" }\n" +
"}";
}
执行完成后,我们在网页中查看该索引
判断索引是否存在
// 判断索引库是否存在
@Test
void existsHotelIndex() throws IOException {
// 1.创建Request对象
GetIndexRequest request = new GetIndexRequest("hotel");
// 2.发起请求
boolean exists = client.indices().exists(request, RequestOptions.DEFAULT);
// 3.输出结果
System.out.println(exists);
}
运行结果
删除索引
// 删除索引库
@Test
void deleteHotelIndex() throws IOException {
// 1.创建Request对象
DeleteIndexRequest request = new DeleteIndexRequest("hotel");
// 2.发起请求
client.indices().delete(request, RequestOptions.DEFAULT);
}
文档操作
添加文档
索引库对应的实体类如下
@Data
@TableName("tb_hotel")
public class Hotel {
@TableId(type = IdType.INPUT)
private Long id;
private String name;
private String address;
private Integer price;
private Integer score;
private String brand;
private String city;
private String starName;
private String business;
private String longitude;
private String latitude;
private String pic;
}
但是在经纬度处理方面可映射文件的 geo_point 类型不一致,所以要转换实体类
@Data
@NoArgsConstructor
public class HotelDoc {
private Long id;
private String name;
private String address;
private Integer price;
private Integer score;
private String brand;
private String city;
private String starName;
private String business;
private String location;
private String pic;
public HotelDoc(Hotel hotel) {
this.id = hotel.getId();
this.name = hotel.getName();
this.address = hotel.getAddress();
this.price = hotel.getPrice();
this.score = hotel.getScore();
this.brand = hotel.getBrand();
this.city = hotel.getCity();
this.starName = hotel.getStarName();
this.business = hotel.getBusiness();
this.location = hotel.getLatitude() + ", " + hotel.getLongitude();
this.pic = hotel.getPic();
}
}
通过id查询出一条数据,转换成文档实体类
// 添加文档
@Test
void testAddDocument() throws IOException {
Hotel hotel = hotelService.getById("56227L");
HotelDoc hotelDoc = new HotelDoc(hotel);
// 1.创建request对象
IndexRequest request = new IndexRequest("hotel").id(hotelDoc.getId().toString());
// 2.准备json数据
request.source(JSON.toJSONString(hotelDoc),XContentType.JSON);
// 3.发送请求
client.index(request,RequestOptions.DEFAULT);
}
执行完成后查看是否添加成功
根据id查询文档
// 查询文档
@Test
void testGetDocument() throws IOException {
// 1.准备request
GetRequest request = new GetRequest("hotel", "56227");
// 2.发送请求
GetResponse response = client.get(request, RequestOptions.DEFAULT);
// 3.解析响应数据
String json = response.getSourceAsString();
// 4.将json转成实体类
HotelDoc hotelDoc = JSON.parseObject(json, HotelDoc.class);
System.out.println(hotelDoc);
}
返回结果
根据id删除文档
// 删除文档
@Test
void testDeleteDocument() throws IOException {
// 1.准备request
DeleteRequest request = new DeleteRequest("hotel", "56227");
// 2.调用删除方法
client.delete(request,RequestOptions.DEFAULT);
}
修改文档
修改文档分成:
- 全量修改:本质是先根据id删除,再新增
- 增量修改:修改文档中的指定字段值
全量修改的语法和新增语法一致,下面演示增量修改的语法
// 修改文档-增量修改
@Test
void testUpdateDocument() throws IOException {
// 1.准备request
UpdateRequest request = new UpdateRequest("hotel","56227");
// 2.设置map数据
HashMap<String, Object> hashMap = new HashMap<>();
hashMap.put("price",299);
hashMap.put("starName","三星级");
// 3.设置dock
request.doc(hashMap);
// 4.执行修改方法
client.update(request,RequestOptions.DEFAULT);
}
修改前
修改后
批量新增文档
// 批量新增文档
@Test
void testBulkRequest() throws IOException {
// 1.查询酒店的所有数据到list集合
List<Hotel> hotelList = hotelService.list();
// 2.创建request
BulkRequest bulkRequest = new BulkRequest();
// 3.使用for循环,将多个文档数据添加到request
for (Hotel hotel : hotelList) {
// 3.1转换成文档类型HotelDoc
HotelDoc hotelDoc = new HotelDoc(hotel);
// 3.2创建新增文档的request对象
bulkRequest.add(
new IndexRequest("hotel")
.id(hotelDoc.getId().toString())
.source(JSON.toJSONString(hotelDoc),XContentType.JSON)
);
}
// 4.发送请求
client.bulk(bulkRequest,RequestOptions.DEFAULT);
}
查询操作
match_all查询
DSL语法
# 查询所有
GET /hotel/_search
{
"query": {
"match_all": {}
}
}
Java语法
// 测试查询所有数据
@Test
void testQueryMatchAll() throws IOException {
// 1.准备查询request
SearchRequest searchRequest = new SearchRequest("hotel");
// 2.准备DSL
searchRequest.source().query(QueryBuilders.matchAllQuery());
// 3.发送请求得到响应
SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);
// 4.解析数据
SearchHits searchHits = response.getHits();
// 4.1获取总条数
long total = searchHits.getTotalHits().value;
System.out.println("获取到" + total + "条数据");
// 4.2文档数据
SearchHit[] hits = searchHits.getHits();
// 4.3遍历数据
for (SearchHit hit : hits) {
// 获取文档json
String json = hit.getSourceAsString();
// 反序列化
HotelDoc hotelDoc = JSON.parseObject(json, HotelDoc.class);
System.out.println(hotelDoc);
}
}
解析SearchResponse方法
// 解析数据公共方法
void parsingData(SearchResponse response){
// 4.解析数据
SearchHits searchHits = response.getHits();
// 5.获取总数
long total = searchHits.getTotalHits().value;
// 6.遍历数据
SearchHit[] hits = searchHits.getHits();
for (SearchHit hit : hits) {
String json = hit.getSourceAsString();
HotelDoc hotelDoc = JSON.parseObject(json, HotelDoc.class);
System.out.println(hotelDoc);
}
}
match查询
DSL查询
# 单字段查询
GET /hotel/_search
{
"query": {
"match": {
"all": "外滩酒店"
}
}
}
Java代码
@Test
void testMatchSearch() throws IOException {
// 1.准备request
SearchRequest searchRequest = new SearchRequest("hotel");
// 2.准备DSL数据
searchRequest.source().query(
QueryBuilders.matchQuery("all","上海")
).from(5).size(6);
// 3.发起查询
SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);
// 4.解析数据
SearchHits searchHits = response.getHits();
// 5.获取总数
long total = searchHits.getTotalHits().value;
// 6.遍历数据
SearchHit[] hits = searchHits.getHits();
for (SearchHit hit : hits) {
String json = hit.getSourceAsString();
HotelDoc hotelDoc = JSON.parseObject(json, HotelDoc.class);
System.out.println(hotelDoc);
}
}
多字段查询
DSL
# 多字段查询
GET /hotel/_search
{
"query": {
"multi_match": {
"query": "上海外滩",
"fields": ["brand","name","business"]
}
}
}
java代码
// 多字段查询
@Test
void testMultiMatchSearch() throws IOException {
// 1.准备request
SearchRequest searchRequest = new SearchRequest("hotel");
// 2.准备DSL数据
searchRequest.source().query(
QueryBuilders.multiMatchQuery("上海","name","business")
);
// 3.发起查询
SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);
// 公共解析方法
parsingData(response);
}
term精准查询
DSL语句
# 精准查询
GET /hotel/_search
{
"query": {
"term": {
"city": {
"value": "上海"
}
}
}
}
java代码
// 精准查询
@Test
void testTermSearch() throws IOException {
SearchRequest request = new SearchRequest("hotel");
request.source().query(
QueryBuilders.termQuery("city","上海")
);
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
parsingData(response);
}
range范围查询
range 查询,用于数据过滤查询,比如下面查询价格大于等于400小于等于500的数据
DSL语法
GET /hotel/_search
{
"query": {
"range": {
"price": {
"gte": 400,
"lte": 500
}
}
}
}
range 查询,用于数据过滤查询,比如下面查询价格大于400小于500的数据
GET /hotel/_search
{
"query": {
"range": {
"price": {
"gte": 400,
"lte": 500
}
}
}
}
java代码
// 范围查询
@Test
void testRangeQuery() throws IOException {
SearchRequest request = new SearchRequest("hotel");
request.source()
.query(QueryBuilders.rangeQuery("price").gt(400).lt(500));
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
parsingData(response);
}
bool布尔多条件查询
DSL
GET /hotel/_search
{
"query": {
"bool": {
"must": [
{
"match": {
"all": "如家"
}
},
{
"term": {
"city": {
"value": "上海"
}
}
}
],
"must_not": [
{
"range": {
"price": {
"gte": 400
}
}
}
],
"filter": [
{
"geo_distance": {
"distance": "10km",
"location": {
"lat": 31.21,
"lon": 121.5
}
}
}
]
}
}
}
java代码
// 布尔查询,多条件查询
@Test
void testBooleQuery() throws IOException {
SearchRequest request = new SearchRequest("hotel");
// 准备boolQuery
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
// 查询名字包含上海如家的
boolQuery.must(QueryBuilders.matchQuery("name","上海如家"));
// 查询地址等于上海的
boolQuery.must(QueryBuilders.termQuery("city","上海"));
// 过滤掉价格高于400的,只返回不高于400的数据
boolQuery.filter(QueryBuilders.rangeQuery("price").lt(400));
// 发起查询,并按照价格升序排序
request.source().query(boolQuery).sort("price",SortOrder.ASC);
// 发起查询
SearchResponse search = client.search(request, RequestOptions.DEFAULT);
parsingData(search);
}
查询结果
分页和排序查询
搜索名字包含如家的酒店,按照评分降序排序,评分相同时按照价格升序排序
DSL
GET /hotel/_search
{
"query": {
"match": {
"name": "如家"
}
},
"sort": [
{
"score": "desc"
},
{
"price": "asc"
}
]
}
Java
// 分页和排序
@Test
void testPageSearch() throws IOException {
// 页码,和每页条数
int page = 1,size = 10;
// 准备request数据
SearchRequest request = new SearchRequest("hotel");
// 查询包含上海如家的数据
request.source().query(QueryBuilders.matchQuery("all","上海如家"));
// 按照价格升序排序
request.source().sort("price",SortOrder.ASC);
// 使用form size 分页查询数据
request.source().from((page - 1) * size).size(page * size);
// 发起查询
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
parsingData(response);
}
按照地理位置排序查询
查找距离自己最近的酒店
DSL语法
GET /hotel/_search
{
"query": {
"match_all": {}
},
"sort": [
{
"_geo_distance": {
"location": {
"lat": 30.913828,
"lon": 121.50091
},
"order": "asc",
"unit": "km"
}
}
]
}
算分函数语法
算分函数function_scope query
查询名字中包含外滩的酒店,并且吧7天酒店的排名往前靠
GET hotel/_search
{
"query": {
"function_score": {
"query": {
"match": {
"all": "外滩"
}
},
"functions": [
{
"filter": {
"term": {
"brand": "7天酒店"
}
},
"weight": 10
}
],
"boost_mode": "sum"
}
}
}
查询结果高亮显示
DSL 语法
# 高亮查询
GET /hotel/_search
{
"query": {
"match": {
"all": "如家"
}
},
"highlight": {
"fields": {
"name": {
"require_field_match": "false"
}
}
}
}
Java代码
// 高亮查询
@Test
void testHighlight() throws IOException {
SearchRequest searchRequest = new SearchRequest("hotel");
searchRequest.source().query(QueryBuilders.matchQuery("all","如家"));
// 处理高亮
searchRequest.source().highlighter(new HighlightBuilder().field("name").requireFieldMatch(false));
SearchResponse search = client.search(searchRequest, RequestOptions.DEFAULT);
parsingData(search);
}
解析高亮代码
// 解析数据公共方法
void parsingData(SearchResponse response){
// 4.解析数据
SearchHits searchHits = response.getHits();
// 5.获取总数
long total = searchHits.getTotalHits().value;
System.out.println("共有:" + total + "条数据");
// 6.遍历数据
SearchHit[] hits = searchHits.getHits();
for (SearchHit hit : hits) {
String json = hit.getSourceAsString();
HotelDoc hotelDoc = JSON.parseObject(json, HotelDoc.class);
// 解析高亮
Map<String, HighlightField> highlightFields = hit.getHighlightFields();
if(!CollectionUtils.isEmpty(highlightFields)){
HighlightField highlightField = highlightFields.get("name");
if (highlightField != null) {
String name = highlightField.getFragments()[0].string();
hotelDoc.setName(name);
}
}
System.out.println(hotelDoc);
}
}
多条件查询
查询条件如下
首先完善参数对象,添加对应的字段
@Data
public class HotelParams {
String key;
Integer page;
Integer size;
String sortBy;
String city;
String brand;
String starName;
Integer minPrice;
Integer maxPrice;
}
查询逻辑
@Autowired
RestHighLevelClient client;
@Override
public PageHotel hotelFilters(HotelParams params) {
try {
// 查询
SearchRequest searchRequest = new SearchRequest("hotel");
// 构建查询条件
buildQuery(params, searchRequest);
// 查询
SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);
// 解析数据
return parsingData(response);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
private void buildQuery(HotelParams params, SearchRequest searchRequest) {
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
// 查询关键字
String key = params.getKey();
if(key == null || "".equals(key)){
boolQuery.must(QueryBuilders.matchAllQuery());
}else{
boolQuery.must(QueryBuilders.matchQuery("all",key));
}
// 查询城市
if(!StringUtils.isEmpty(params.getCity())){
boolQuery.filter(QueryBuilders.termQuery("city", params.getCity()));
}
// 查询品牌
if(!StringUtils.isEmpty(params.getBrand())){
boolQuery.filter(QueryBuilders.termQuery("brand", params.getBrand()));
}
// 查询星级
if(!StringUtils.isEmpty(params.getStarName())){
boolQuery.filter(QueryBuilders.termQuery("starName", params.getStarName()));
}
// 价格
if(params.getMinPrice() != null && params.getMaxPrice() != null){
boolQuery.filter(QueryBuilders.rangeQuery("price").gte(params.getMinPrice()).lte(params.getMaxPrice()));
}
// 分页
int page = params.getPage();
int size = params.getSize();
searchRequest.source().query(boolQuery).from((page - 1) * size).size(size);
}
查询距离我最近的酒店,并显示距离
前端会传递过来一个经纬度
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-B7Bj9IN6-1687829496197)(https://szx-bucket1.oss-cn-hangzhou.aliyuncs.com/picgo/image-20230522210043421.png)]
需要在查询对象中添加对应的属性
添加查询逻辑,核心代码
// 根据距离查询
if(!StringUtils.isEmpty(params.getLocation())){
searchRequest.source().sort(SortBuilders
.geoDistanceSort("location",new GeoPoint(params.getLocation()))
.order(SortOrder.ASC)
.unit(DistanceUnit.KILOMETERS)
);
}
增加距离的解析
// 解析数据公共方法
PageHotel parsingData(SearchResponse response){
// 4.解析数据
SearchHits searchHits = response.getHits();
// 5.获取总数
long total = searchHits.getTotalHits().value;
// 6.遍历数据
SearchHit[] hits = searchHits.getHits();
ArrayList<HotelDoc> hotelDocArrayList = new ArrayList<>();
for (SearchHit hit : hits) {
String json = hit.getSourceAsString();
HotelDoc hotelDoc = JSON.parseObject(json, HotelDoc.class);
// 增加距离解析
Object[] sortValues = hit.getSortValues();
if(sortValues.length > 0){
hotelDoc.setDistance(sortValues[0]);
}
hotelDocArrayList.add(hotelDoc);
}
return new PageHotel(total,hotelDocArrayList);
}
效果展示
广告置顶功能
通过代码来控制算分
首先添加属性 isAD 来表示是否是广告
然后编写 DSL 找两个数据来更新文档
POST /hotel/_update/517915
{
"doc": {
"isAD": true
}
}
POST /hotel/_update/36934
{
"doc": {
"isAD": true
}
}
添加算分查询
private void buildQuery(HotelParams params, SearchRequest searchRequest) {
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
// 查询关键字
String key = params.getKey();
if(key == null || "".equals(key)){
boolQuery.must(QueryBuilders.matchAllQuery());
}else{
boolQuery.must(QueryBuilders.matchQuery("all",key));
}
// 查询城市
if(!StringUtils.isEmpty(params.getCity())){
boolQuery.filter(QueryBuilders.termQuery("city", params.getCity()));
}
// 查询品牌
if(!StringUtils.isEmpty(params.getBrand())){
boolQuery.filter(QueryBuilders.termQuery("brand", params.getBrand()));
}
// 查询星级
if(!StringUtils.isEmpty(params.getStarName())){
boolQuery.filter(QueryBuilders.termQuery("starName", params.getStarName()));
}
// 价格
if(params.getMinPrice() != null && params.getMaxPrice() != null){
boolQuery.filter(QueryBuilders.rangeQuery("price").gte(params.getMinPrice()).lte(params.getMaxPrice()));
}
// 算分控制
FunctionScoreQueryBuilder functionScoreQueryBuilder =
QueryBuilders.functionScoreQuery(
// 原始查询,相关算分查询
boolQuery,
// function score 的数组
new FunctionScoreQueryBuilder.FilterFunctionBuilder[]{
// 其中一个function score 元素
new FunctionScoreQueryBuilder.FilterFunctionBuilder(
// 查询条件
QueryBuilders.termQuery("isAD",true),
// 算分函数
ScoreFunctionBuilders.weightFactorFunction(10)
)
}
);
searchRequest.source().query(functionScoreQueryBuilder);
// 分页
int page = params.getPage();
int size = params.getSize();
searchRequest.source().from((page - 1) * size).size(size);
// 根据距离查询
if(!StringUtils.isEmpty(params.getLocation())){
searchRequest.source().sort(SortBuilders
.geoDistanceSort("location",new GeoPoint(params.getLocation()))
.order(SortOrder.ASC)
.unit(DistanceUnit.KILOMETERS)
);
}
}
效果展示
数据聚合
什么是聚合
聚合是对文档数据的统计、分析、计算
聚合的常见种类有哪些
- Bucket:对文档进行分组,并统计每组数量
- Metric:对文档数据做计算,如 Max、Min
- Pipeline:基于其他聚合结果再聚合
参与聚合的字段类型必须是
- keyword
- 数值
- 日期
- 布尔
DSL实现聚合
按照酒店品牌做聚合
GET hotel/_search
{
"size": 0,
"aggs": {
"brandAgge": {
"terms": {
"field": "brand",
"size": 20,
"order": {
"_count": "desc"
}
}
}
}
}
查询结果
默认按照 _count 降序返回,可以设置 "_count": "ase"
,实现升序返回
"order": {
"_count": "desc"
}
另外聚合查询时,会吧所有的数据查出来做聚合,这对内存的消耗很大,我们可以通过 query 来限制查询范围
例如:查询价格小于等于200的酒店做聚合
GET hotel/_search
{
"query": {
"range": {
"price": {
"lte": 200
}
}
},
"size": 0,
"aggs": {
"brandAgge": {
"terms": {
"field": "brand",
"size": 20,
"order": {
"_count": "asc"
}
}
}
}
}
查询结果
小结:
- aggs代表聚合,与query同级,此时query的作用是什么
- 限定聚合的文档范围
- 聚合必须的三要素
- 聚合名称
- 聚合类型
- 聚合字段
- 聚合可配置的属性有哪些
- size 指定聚合结果的数量
- order 指定聚合结果的排序方式
- field 指定聚合字段
嵌套聚合
嵌套聚合,求出每个品牌的平均得分,并按照平均分降序返回
GET hotel/_search
{
"size": 0,
"aggs": {
"brandAgge": {
"terms": {
"field": "brand",
"size": 20,
"order": {
"scoreAgg.avg": "desc"
}
},
"aggs": {
"scoreAgg": {
"stats": {
"field": "score"
}
}
}
}
}
}
返回结果
RestAPI实现聚合
// 聚合查询
@Test
void aggregateQuery() throws IOException {
// 1.准备请求体
SearchRequest request = new SearchRequest("hotel");
// 2.准备DSL
request.source().size(0);
// 2.1 聚合查询
TermsAggregationBuilder brandAgg = AggregationBuilders.terms("brandAgg")
.field("brand")
.order(BucketOrder.aggregation("scoreAvg", false))
.size(20);
// 2.2 添加三个子聚合:平均值、最大值、最小值
brandAgg.subAggregation(AggregationBuilders.avg("scoreAvg").field("score"));
brandAgg.subAggregation(AggregationBuilders.max("scoreMax").field("score"));
brandAgg.subAggregation(AggregationBuilders.min("scoreMin").field("score"));
// 2.3 将桶聚合查询添加到请求体重
request.source().aggregation(brandAgg);
// 3 发出查询
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
// 4. 解析结果
Terms brandTerms = response.getAggregations().get("brandAgg");
List<? extends Terms.Bucket> brandTermsBuckets = brandTerms.getBuckets();
System.out.println("|品牌|数量|平均分|最高分|最低分|");
for (Terms.Bucket bucket : brandTermsBuckets) {
String keyAsString = bucket.getKeyAsString();
long docCount = bucket.getDocCount();
// 获取平均分、最高分、最低分
Avg scoreAvg = bucket.getAggregations().get("scoreAvg");
Max scoreMax = bucket.getAggregations().get("scoreMax");
Min scoreMin = bucket.getAggregations().get("scoreMin");
System.out.print(keyAsString + "|");
System.out.print(docCount + "|");
System.out.print(scoreAvg.getValue() + "|");
System.out.print(scoreMax.getValue() + "|");
System.out.println(scoreMin.getValue() + "|");
}
}
返回结果
实现根据查询结果聚合
@Override
public HashMap<String, Object> filterList(HotelParams params) {
try {
SearchRequest request = new SearchRequest("hotel");
// 构建查询条件
buildQuery(params, request);
// 添加聚合
request.source().size(0);
// 酒店品牌做聚合
request.source().aggregation(AggregationBuilders.terms("brandAgg").field("brand").size(100));
// 酒店城市做聚合
request.source().aggregation(AggregationBuilders.terms("cityAgg").field("city").size(100));
// 酒店评分聚合
request.source().aggregation(AggregationBuilders.terms("starAgg").field("starName").size(100));
// 查询
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
// 解析数据响应数据
Aggregations aggregations = response.getAggregations();
// 获取查询结果中的品牌聚合信息
List<String> brandAgg = getAggregationByName(aggregations, "brandAgg");
// 获取查询结果中的城市聚合信息
List<String> cityAgg = getAggregationByName(aggregations, "cityAgg");
// 获取查询结果中的品牌聚合信息
List<String> starAgg = getAggregationByName(aggregations, "starAgg");
HashMap<String, Object> filterMap = new HashMap<>();
filterMap.put("brand",brandAgg);
filterMap.put("city",cityAgg);
filterMap.put("star",starAgg);
return filterMap;
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
封装的 getAggregationByName
方法,用来根据聚合名称,获取聚合结果
// 根据聚合名称,获取聚合结果
private List<String> getAggregationByName(Aggregations aggregations,String aggName){
Terms terms = aggregations.get(aggName);
List<? extends Terms.Bucket> buckets = terms.getBuckets();
List<String> list = new ArrayList<>();
for (Terms.Bucket bucket : buckets) {
String brandName = bucket.getKeyAsString();
list.add(brandName);
}
return list;
}
实现根据拼音自动补全
安装拼音分词器
我们可以实现根据拼音自动补全功能
GitHub地址:拼音分词器)
在线安装
es 是容器的名称,根据你的实际情况调整
下载的版本必须和你的 elasticsearch 版本一致
# 进入容器内部
docker exec -it es /bin/bash
# 在线下载并安装
./bin/elasticsearch-plugin install https://github.com/medcl/elasticsearch-analysis-pinyin/releases/download/v7.12.1/elasticsearch-analysis-pinyin-7.12.1.zip
#退出
exit
#重启容器
docker restart es
测试分词器是否安装成功
POST /_analyze
{
"text": ["如家酒店还不错"],
"analyzer": "pinyin"
}
查看结果
自定义分词器
在上面的例子中,我们通过拼音分词器实现了拼音分词的效果,但是中文没有保留下来,并且每一个字都有一个拼音,并不是一个词语来分词,这对我们来说是没有用的,所以,我们需要自定义分词器,来实现通过拼音分词时,按照词语来分词,并且保留中文和拼音。
elasticsearch中分词器(analyzer)的组成包含三部分
- character filter :在 tokennizer 之前对文本进行处理,例如:删除字符串、替换字符
- tokenizer:将文本按照一定的规则切割词条(term),例如:keyword,就是不分词,还有 ik_smart
- tokenizer filter:将 tokenizer 输出的词条做进一步处理。例如大小写转换、同义词处理、拼音处理等
实现自定义分词器,运行如下函数
PUT /test
{
"settings": {
"analysis": {
"analyzer": {
"my_analyzer": {
"tokenizer": "ik_max_word",
"filter": "py"
}
},
"filter": {
"py": {
"type": "pinyin",
"keep_full_pinyin": false,
"keep_joined_full_pinyin": true,
"keep_original": true,
"limit_first_letter_length": 16,
"remove_duplicated_term": true,
"none_chinese_pinyin_tokenize": false
}
}
}
},
"mappings": {
"properties": {
"name": {
"type": "text",
"analyzer": "my_analyzer",
"search_analyzer": "ik_smart"
}
}
}
}
上面这段的含义:
- 首先添加一个 test 索引库
- 设置分词器,分词器的名称叫做 my_analyzer
- 设置 tokenizer 使用 ik_max_word 分词
- filter 设置使用拼音分词器过滤,这里使用的代词 py
- 然后对 py 分词器作进一步设置,设置了他的分词策略
- 最后添加 mappings 映射,映射 name 字段,使用的分词器是 my_analyzer
测试自定义分词器
自定义分词器只能在当前索引中使用
POST /test/_analyze
{
"text": ["如家酒店还不错"],
"analyzer": "my_analyzer"
}
分词效果
测试在文档查询时的效果
首先往 test 索引库中添加几条文档
POST /test/_doc
{
"name":"狮子"
}
POST /test/_doc
{
"name":"柿子"
}
测试查询
情景一:通过拼音查询字段
GET test/_search
{
"query": {
"match": {
"name": "sz"
}
}
}
情景二:通过中文来查询
GET test/_search
{
"query": {
"match": {
"name": "动物园里有狮子"
}
}
}
自动补全
自动补全有特殊的要求:
- 参与补全查询的字段类型必须是:completion 类型
- 字段的内容一般是用来补全的多个词条组成
首先创建一个索引库
PUT test2
{
"mappings": {
"properties": {
"title":{
"type": "completion"
}
}
}
}
往索引库中添加文档
POST test2/_doc
{
"title": [
"Sony",
"WH-1000XM3"
]
}
POST test2/_doc
{
"title": [
"SK-II",
"PITERA"
]
}
POST test2/_doc
{
"title": [
"Nintendo",
"switch"
]
}
测试补全查询
GET test2/_search
{
"suggest": {
"titleSuggest": {
"text": "s",
"completion": {
"field": "title",
"skip_duplicates": true,
"size": 10
}
}
}
}
解释:
- 使用 suggest 查询,而不是使用 query 查询文档
- titleSuggest 自定义的字段,会作为属性返回
"text": "s"
:自动补全的开头词语"skip_duplicates": true
:去除重复值"size": 10
:查询前10条数据
实现酒店数据自动补全功能
修改索引库数据
先删除旧的索引库,在添加新的索引库
DELETE hotel
PUT /hotel
{
"settings": {
"analysis": {
"analyzer": {
"text_anlyzer": {
"tokenizer": "ik_max_word",
"filter": "py"
},
"completion_analyzer": {
"tokenizer": "keyword",
"filter": "py"
}
},
"filter": {
"py": {
"type": "pinyin",
"keep_full_pinyin": false,
"keep_joined_full_pinyin": true,
"keep_original": true,
"limit_first_letter_length": 16,
"remove_duplicated_term": true,
"none_chinese_pinyin_tokenize": false
}
}
}
},
"mappings": {
"properties": {
"id":{
"type": "keyword"
},
"name":{
"type": "text",
"analyzer": "text_anlyzer",
"search_analyzer": "ik_smart",
"copy_to": "all"
},
"address":{
"type": "keyword",
"index": false
},
"price":{
"type": "integer"
},
"score":{
"type": "integer"
},
"brand":{
"type": "keyword",
"copy_to": "all"
},
"city":{
"type": "keyword"
},
"starName":{
"type": "keyword"
},
"business":{
"type": "keyword",
"copy_to": "all"
},
"location":{
"type": "geo_point"
},
"pic":{
"type": "keyword",
"index": false
},
"all":{
"type": "text",
"analyzer": "text_anlyzer",
"search_analyzer": "ik_smart"
},
"suggestion":{
"type": "completion",
"analyzer": "completion_analyzer"
}
}
}
}
修改JAVA对象
添加 suggestion 字段
package cn.itcast.hotel.pojo;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@Data
@NoArgsConstructor
public class HotelDoc {
private Long id;
private String name;
private String address;
private Integer price;
private Integer score;
private String brand;
private String city;
private String starName;
private String business;
private String location;
private String pic;
private Object distance;
private Boolean isAD;
private List<String> suggestion;
public HotelDoc(Hotel hotel) {
this.id = hotel.getId();
this.name = hotel.getName();
this.address = hotel.getAddress();
this.price = hotel.getPrice();
this.score = hotel.getScore();
this.brand = hotel.getBrand();
this.city = hotel.getCity();
this.starName = hotel.getStarName();
this.business = hotel.getBusiness();
this.location = hotel.getLatitude() + ", " + hotel.getLongitude();
this.pic = hotel.getPic();
if(this.business.contains("/")){
String[] list = this.business.split("/");
this.suggestion = new ArrayList<>();
this.suggestion.add(this.brand);
Collections.addAll(this.suggestion,list);
}else{
this.suggestion = Arrays.asList(this.business,this.brand);
}
}
}
重新执行添加文档的方法
// 批量新增文档
@Test
void testBulkRequest() throws IOException {
// 1.查询酒店的所有数据到list集合
List<Hotel> hotelList = hotelService.list();
// 2.创建request
BulkRequest bulkRequest = new BulkRequest();
// 3.使用for循环,将多个文档数据添加到request
for (Hotel hotel : hotelList) {
// 3.1转换成文档类型HotelDoc
HotelDoc hotelDoc = new HotelDoc(hotel);
// 3.2创建新增文档的request对象
bulkRequest.add(
new IndexRequest("hotel")
.id(hotelDoc.getId().toString())
.source(JSON.toJSONString(hotelDoc),XContentType.JSON)
);
}
// 4.发送请求
client.bulk(bulkRequest,RequestOptions.DEFAULT);
}
测试查询结果
GET hotel/_search
{
"suggest": {
"hotel_suggestion": {
"text": "pu",
"completion": {
"field": "suggestion",
"skip_duplicates": true,
"size": 100
}
}
}
}
使用Java代码实现自动补全查询
// 自动补全查询
@Test
void testSuggest() throws IOException {
// 准备 Request
SearchRequest request = new SearchRequest("hotel");
// 准备 DSL
request.source().suggest(new SuggestBuilder().addSuggestion(
"suggestion",
SuggestBuilders.completionSuggestion("suggestion")
.prefix("sh")
.skipDuplicates(true)
.size(10)
));
// 发起请求
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
// 解析数据
Suggest suggest = response.getSuggest();
// 根据名称获取补全结果
CompletionSuggestion suggestion = suggest.getSuggestion("suggestion");
// 获取所有的options
List<CompletionSuggestion.Entry.Option> options = suggestion.getOptions();
// 遍历options从中获取text
for (CompletionSuggestion.Entry.Option option : options) {
String text = option.getText().toString();
System.out.println(text);
}
}
调用接口实现自动补全
添加 controller
@GetMapping("suggestion")
List<String> suggestionSearch(@RequestParam("key") String key){
return hotelService.suggestionSearch(key);
}
实现 suggestionSearch 方法
@Override
public List<String> suggestionSearch(String key) {
try {
SearchRequest request = new SearchRequest("hotel");
request.source().suggest(new SuggestBuilder().addSuggestion(
"suggestion",
SuggestBuilders.completionSuggestion("suggestion")
.prefix(key)
.skipDuplicates(true)
.size(10)
));
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
Suggest suggest = response.getSuggest();
CompletionSuggestion suggestion = suggest.getSuggestion("suggestion");
return suggestion.getOptions().stream()
.map(Suggest.Suggestion.Entry.Option::getText)
.map(Text::toString)
.collect(Collectors.toList());
} catch (IOException e) {
throw new RuntimeException(e);
}
}
最终效果
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-uUohxJTi-1687829496204)(https://szx-bucket1.oss-cn-hangzhou.aliyuncs.com/picgo/image-20230529212109611.png)]
数据同步
数据同步的三种方式
方式一:同步调用
- 优点:实现简单,粗暴
- 缺点:业务耦合度太高
方式二:异步通知
- 优点:低耦合,实现难度一般
- 缺点:依赖mq的可靠性
方式三:监听binlog
- 优点:完全解除服务间的耦合
- 确定:开启binlog增加数据库的负担,实现复杂度高
导入hotel-admin
在课程资料中导入 hotel-admin
运行查看
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-yWAW4M9l-1687829496205)(https://szx-bucket1.oss-cn-hangzhou.aliyuncs.com/picgo/image-20230529221353718.png)]
声明队列和交换机
队列和交换机的声明需要在消费者声明,所以需要在 hotel-demo 项目中完成以下操作
引入依赖
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
添加配置
spring:
rabbitmq:
host: 182.43.250.34 # 主机名
port: 5672 # 端口
virtual-host: / # 虚拟主机
username: szx # 用户名
password: abc123 # 密码
声明常量类,用来配置交换机名称、队列名称、routingKey
package cn.itcast.hotel.constants;
/**
* @author songzx
* @date 2023/5/29
* @apiNote
*/
public class MqConstants {
/**
* 交换机
*/
public final static String HOTEL_EXCHANGE = "hotel_topic";
/**
* 监听新增和修改队列
*/
public final static String HOTEL_INSERT_QUEUE = "hotel_insert_queue";
/**
* 监听删除队列
*/
public final static String HOTEL_DELETE_QUEUE = "hotel_delete_queue";
/**
* 新增获取修改的routingKey
*/
public final static String HOTEL_INERT_KEY = "hotel_insert_key";
/**
* 删除的routingKey
*/
public final static String HOTEL_DELETE_KEY = "hotel_delete_key";
}
使用 Bean 注入的方式来声明交换机和队列
添加 MqConfig 配置类
package cn.itcast.hotel.config;
import cn.itcast.hotel.constants.MqConstants;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author songzx
* @date 2023/5/29
* @apiNote
*/
@Configuration
public class MqConfig {
/**
* 声明交换机
* @return
*/
@Bean
public TopicExchange topicExchange(){
return new TopicExchange(MqConstants.HOTEL_EXCHANGE,true,false);
}
/**
* 插入获取修改队列
* @return
*/
@Bean
public Queue insertQueue(){
return new Queue(MqConstants.HOTEL_INSERT_QUEUE,true);
}
/**
* 删除队列
* @return
*/
@Bean
public Queue deleteQueue(){
return new Queue(MqConstants.HOTEL_DELETE_QUEUE,true);
}
/**
* 将新增和修改队列绑定到交换机,并声明routingKey
* @return
*/
@Bean
public Binding insertQueueBinding(){
return BindingBuilder.bind(insertQueue()).to(topicExchange()).with(MqConstants.HOTEL_INERT_KEY);
}
/**
* 将删除队列绑定到交换机,并声明routingKey
* @return
*/
@Bean
public Binding deleteQueueBinding(){
return BindingBuilder.bind(deleteQueue()).to(topicExchange()).with(MqConstants.HOTEL_DELETE_KEY);
}
}
添加消息发送
以下操作需要在 hotel-admin 项目中完成
引入依赖
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
添加配置
spring:
rabbitmq:
host: 182.43.250.34 # 主机名
port: 5672 # 端口
virtual-host: / # 虚拟主机
username: szx # 用户名
password: abc123 # 密码
添加常量配置,将上面的 MqConstants 复制过来
添加发送消息代码
@Autowired
private IHotelService hotelService;
@Autowired
private RabbitTemplate rabbitTemplate;
@PostMapping
public void saveHotel(@RequestBody Hotel hotel){
hotelService.save(hotel);
// 往mq发送消息
rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE,MqConstants.HOTEL_INERT_KEY,hotel.getId());
}
@PutMapping()
public void updateById(@RequestBody Hotel hotel){
if (hotel.getId() == null) {
throw new InvalidParameterException("id不能为空");
}
hotelService.updateById(hotel);
// 往mq发送消息
rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE,MqConstants.HOTEL_INERT_KEY,hotel.getId());
}
@DeleteMapping("/{id}")
public void deleteById(@PathVariable("id") Long id) {
hotelService.removeById(id);
// 往mq发送消息
rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE,MqConstants.HOTEL_DELETE_KEY,id);
}
Sentinel
雪崩问题
什么是雪崩问题:微服务之间相互调用,因为调用链中的一个服务故障,引起的整个链路都无法访问的情况
如何避免雪崩问题:
- 超时处理:设定超时时间,请求超过一定的时间没有响应就返回错误信息,不会无休止的等待。
- 舱壁模式:限定每个业务能使用的线程数,避免耗尽整个tomcat的资源,因此也叫线程隔离。
- 熔断降级:由断路器统计业务执行的异常比例,如果超出闻值则会熔断该业务,拦截访问该业务的一切请求。
- 流量控制:限制业务访问的QPS,避免服务因流量的突增而故障。
服务保护的对比
安装Sentinel控制台
可以访问GitHUb官网下载,也可以从课程资料中获取
运行启动命令:
java -jar sentinel-dashboard-1.8.1.jar
# 或者指定端口启动,我这里为例防止和后面的微服务冲突,所以指定端口7001启动
java -Dserver.port=7001 -jar sentinel-dashboard-1.8.1.jar
默认启动的是8080端口,所以我们访问:http://localhost:7001/
默认账号密码都是:Sentinel
登录成功后默认只有一个欢迎页
这是因为我们目前还没有一个微服务接入
拿出我们之前学习的 cloud-demo 项目来继续学习
同时启动nacos,查看服务列表
微服务整合Sentinel
在 order-service 服务中安装依赖
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
</dependency>
如果出现安装失败的问题,检查父组件的 Spring Cloud Alibaba 的依赖管理,是否添加作用域
<!--管理 alibaba-cloud 依赖的的文件-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-dependencies</artifactId>
<version>2.2.5.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
添加配置文件
server:
port: 8088
spring:
cloud:
sentinel:
transport:
dashboard: localhost:7001
重启完成后我们访问几次order服务下的接口:
http://localhost:8080/order/101
刷新 sentinel 控制台
流量控制
簇点链路
当请求进入微服务时,首先会访问DispatcherServlet,然后进入Controller、Service、Mapper,这样的一个调用链就叫做簇点链路。簇点链路中被监控的每一个接口就是一个资源。
默认情况下sentinel会监控SpringMVC的每一个端点(Endpoint,也就是controller中的方法),因此SpringMVC的每一个端点(Endpoint)就是调用链路中的一个资源。
例如,我们刚才访问的order-service中的OrderController中的端点:/order/{orderId}
流控、熔断等都是针对簇点链路中的资源来设置的,因此我们可以点击对应资源后面的按钮来设置规则:
- 流控:流量控制
- 降级:降级熔断
- 热点:热点参数限流,是限流的一种
- 授权:请求的权限控制
快速入门
点击接口后面的流控按钮
然后点击新增保存当前配置
打开接口压测工具,如果没有用过jmeter,可以参考课前资料提供的文档《Jmeter快速入门.md》
启动后查看返回