目录
- 谷粒商城知识点补充
- 前言
- 1 java8新特性之lambda表达式
- 1.1 为什么使用
- 1.2 从匿名类到Lambda的转换
- 1.3 语法
- 总结
- 2 Stream API
- 2.1 概述
- 2.2 创建
- 2.2.1 通过集合
- 2.2.2 通过数组
- 2.2.3 通过Stream的of()
- 2.2.4 创建无限流
- 2.3 中间操作
- 2.3.1 筛选与切片
- 1)、filter
- 2)、limit
- 3)、skip(n)
- 4)、distinct
- 2.3.2 映 射
- 1)、map
- 2)、flatMap
- 2.3.2 排序
- 1)、sorted()
- 2)、sorted(Comparator com)
- 2.4 终止操作
- 2.4.1 匹配与查找
- 1)、allMatch
- 2)、anyMatch
- 3)、noneMatch
- 4)、findFirst
- 5)、findAny
- 6)、count
- 7)、max
- 8)、min
- 9)、forEach
- 2.4.2 归约
- 1)、reduce
- 2.4.3 收集
- 1)、Collectors.toList()
- 2)、Collectors.toSet()
- 3)、Collectors.toCollection(HashSet::new)
- 4)、Collectors.maxBy()
- 5)、Collectors.minBy()
- 6)、Collectors.summingDouble()
- 7)、Collectors.averagingDouble()
- 8)、Collectors.counting()
- 9)、Collectors.summarizingDouble()
- 10)、Collectors.groupingBy()
- 11)、Collectors.groupingBy()
- 12)、Collectors.partitioningBy()
- 13)、Collectors.joining()
- 3 异步和线程池
- 3.1 线程回顾
- 3.1.1 初始化线程的4 种方式
- 3.1.2 线程池的七大参数
- 3.1.3 常见的4 种线程池
- 3.1.4 开发中为什么使用线程池
- 3.2 CompletableFuture 异步编排
- 3.2.1 创建异步对象
- 3.2.2 计算完成时回调方法
- 3.2.3 handle 方法
- 3.2.4 线程串行化方法
- 3.2.5 两任务组合- 都要完成
- 3.2.6 两任务组合- 一个完成
- 3.2.7 多任务组合
- 4 消息中间件 RabbitMQ
- 4.1 MQ前言
- 4.2 概述
- 4.3 RabbitMQ概念
- 4.4 RabbitMQ安装
- 4.5 RabbitMQ运行机制
- 4.6 SpringBoot整合RabbitMQ
- 4.7 AmqpAdmin使用
- 4.8 RabbitTemplate使用
- 4.9 RabbitListener&RabbitHandler接收消息
- 4.10 可靠投递-发送端确认
- 4.11 可靠投递-消费端确认
- 4.12 可靠投递-Ack消息确认机制
- 4.13 RabbitMQ延时队列(实现定时任务)
- 4.14 延时队列定时关单模拟
- 5 性能压测
- 5.4 性能压测
- 5.4.1 压力测试-基本介绍
- 1)、介绍
- 2)、性能指标
- 3)、jvm 内存模型
- 5.4.2 压力测试-jemter安装与使用
- 1)、下载与安装
- 2)、解决jmeter打不开问题
- 3)、使用
- 5.4.3 压力测试- jmeter在windows下地址占用bug解决
- 5.4.4 性能监控-堆内存与垃圾回收
- 5.4.5 性能监控-jvisualvm使用
- 1)、jvisualvm 能干什么
- 2)、安装插件方便查看gc
- 5.4.6 性能压测-优化-中间件对性能的影响
- 1)、中间件指标
- 2)、数据库指标
- 5.4.7 JVM 分析&调优
- 1)、几个常用工具
- 2)、命令示例
- 3)、调优项
- 5.4.8 性能压测(总)
- 6 接口幂等性
- 6.1 什么是幂等性
- 6.2 哪些情况需要防止
- 6.3 什么情况下需要幂等
- 6.4 幂等解决方案
- 6.4.1 token 机制
- 6.4.2 各种锁机制
- 1)、数据库悲观锁
- 2)、数据库乐观锁
- 3)、业务层分布式锁
- 6.4.3 各种唯一约束
- 1)、数据库唯一约束
- 2)、redis set 防重
- 3)、防重表
- 4)、全局请求唯一 id
- 7 ElasticSearch
- 7.1 简介
- 7.2 基本概念
- 7.3 Docker安装Elasticsearch
- 7.4 初步检索
- 7.4.1 _cat
- 7.4.2 索引一个文档(保存)
- 7.4.3 查询文档
- 7.4.4 更新文档
- 7.4.5 删除文档&索引
- 7.4.6 ES的批量操作——bulk
- 7.4.7 样本测试数据
- 7.5 进阶检索
- 7.5.1 SearchAPI
- 1)检索信息
- 7.5.2 Query DSL
- 1) 基本语法格式
- 2) 返回部分字段
- 3) match【匹配查询】
- 4) match_phrase【短语匹配】
- 5) multi_match【多字段匹配】
- 6) bool【复合查询】
- 7) filter【结果过滤】
- 8) term
- 9) aggregations(执行聚合)
- 7.5.3 Mapping
- 1)字段类型
- 2) 映射
- 3) 新版本改变
- 1. 创建映射
- 2. 添加新的字段映射
- 3. 更新映射
- 4.数据迁移
- 7.5.4 分词
- 1) 安装ik 分词器
- 2)修改linux网络设置
- 3) 测试分词器
- 4)自定义词库
- 7.6 Elasticsearch-Rest-Client
- 7.6.1 9300: TCP
- 7.6.2 9200: HTTP
- 7.7 SpringBoot整合ElasticSearch
- 7.7.1 导入依赖
- 7.7.2 测试
- 1) 保存数据
- 2)获取数据
- 8 分布式事务
- 前言 本地事务在分布式下的问题
- 8.1 本地事务
- 8.1.1 事务的基本性质
- 8.1.2 事务的隔离级别
- 8.1.3 事务的传播行为
- 8.1.4 SpringBoot 事务关键点
- 8.1.5 实际案例剖析
- 8.2 分布式事务
- 8.2.1 为什么有分布式事务
- 8.2.2 CAP 定理
- 8.2.3 BASE 理论
- 8.2.4 强一致性、弱一致性、最终一致性
- 8.2.5 分布式事务几种方案
- 8.2.6 seata解决分布式事务问题
- 8.2.7 最终一致性库存解锁逻辑
- 9 SpringCloud Alibaba-Sentinel
- 9.1 简介
- 9.1.1 熔断降级限流
- 9.2 Sentinel
- 9.2.1 Sentinel简介
- **9.2.2 Hystrix 与Sentinel 比较**
- 9.2.3 整合Feign+Sentinel 测试熔断降级
- 9.2.4 测试启动中出现的问题
谷粒商城知识点补充
前言
在学习谷粒商城项目中,会遇到一些知识已经学过但是没有使用就忘记的,或者是没有学过的,这篇文章就是将谷粒商城中所有不懂的知识点进行一个补充和完善的。
1 java8新特性之lambda表达式
1.1 为什么使用
Lambda是一个 匿名函数 ,我们可以把 Lambda 表达式理解为是 一段可以传递的代码 (将代码像数据一样进行传递)。使用它可以写出更简洁、更灵活的代码。作为一种更紧凑的代码风格,使 Java 的语言表达能力得到了提升。
1.2 从匿名类到Lambda的转换
1.3 语法
- -> :lambda操作符 或 箭头操作符
- -> 左边: lambda形参列表(其实就是接口中的抽象方法的形参)
- -> 右边: lambda体(其实就是重写的抽象方法的方法体)
总结
-> 左边: lambda形参列表的参数类型可以省略(类型推断),如果形参列表只有一个参数,其一对()也可以省略
-> 右边: lambda体应该使用一对{}包裹;如果lambda体只执行一条语句(可能是return语句),可以省略这一对{}和return关键字
2 Stream API
2.1 概述
2.2 创建
2.2.1 通过集合
//创建 Stream方式一:通过集合
@Test
public void test1(){
List<Employee> employees = EmployeeData.getEmployees();
// default Stream<E> stream() : 返回一个顺序流
Stream<Employee> stream = employees.stream();
// default Stream<E> parallelStream() : 返回一个并行流
Stream<Employee> parallelStream = employees.parallelStream();
}
2.2.2 通过数组
//创建 Stream方式二:通过数组
@Test
public void test2(){
int[] arr = new int[]{1,2,3,4,5,6};
//调用Arrays类的static <T> Stream<T> stream(T[] array): 返回一个流
IntStream stream = Arrays.stream(arr);
Employee e1 = new Employee(1001,"Tom");
Employee e2 = new Employee(1002,"Jerry");
Employee[] arr1 = new Employee[]{e1,e2};
Stream<Employee> stream1 = Arrays.stream(arr1);
}
2.2.3 通过Stream的of()
//创建 Stream方式三:通过Stream的of()
@Test
public void test3(){
Stream<Integer> stream = Stream.of(1, 2, 3, 4, 5, 6);
}
2.2.4 创建无限流
//创建 Stream方式四:创建无限流
@Test
public void test4(){
// 迭代
// public static<T> Stream<T> iterate(final T seed, final UnaryOperator<T> f)
//遍历前10个偶数
Stream.iterate(0, t -> t + 2).limit(10).forEach(System.out::println);
// 生成
// public static<T> Stream<T> generate(Supplier<T> s)
Stream.generate(Math::random).limit(10).forEach(System.out::println);
}
2.3 中间操作
多个中间操作 可以连接起来形成一个 流水线 ,除非流水线上触发终止操作,否则 中间操作不会执行任何的处理 !而在 终止操作时一次性全部处理,称为“惰性求值”。
2.3.1 筛选与切片
1)、filter
接收 Lambda,从流中排除某些元素
// (1)、filter——接收 Lambda , 从流中排除某些元素。
@Test
public void testFilter() {
//这里加入了终止操作 ,不然中间操作一系列不会执行
//中间操作只有在碰到终止操作才会执行
emps.stream()
.filter((e)->e.getAge()>18)
.forEach(System.out::println);//终止操作
}
注意:这里filter主要是过滤一些条件,这里的话就是把年龄小于18岁的Employee对象给过滤掉,然后用forEach给遍历一下,因为中间操作只有碰到终止操作才会执行,不然的话,看不到过滤效果。以下的操作都大部分都forEach了一下,为方便看到效果。filter用的还是很多的.
2)、limit
截断流,使其元素不超过给定数量
// (2)、limit——截断流,使其元素不超过给定数量。
@Test
public void testLimit() {
emps.stream()
.filter((e)->e.getAge()>8)
.limit(4)//跟数据库中的limit有异曲同工之妙
.forEach(System.out::println);//终止操作
}
注意:这里用了上面的filter跟limit,代码意思是:过滤掉年龄小于8的,只要4条数据。这种".“式操作很有意思,就是中间操作都可以一直”.",直到得到你想要的要求。
3)、skip(n)
跳过元素,返回一个扔掉了前n个元素的流。若流中元素不足n个,则返回一个空流。与limit(n) 互补
// (3)、skip(n) —— 跳过元素,返回一个扔掉了前 n 个元素的流。若流中元素不足 n 个,则返回一个空流。与 limit(n) 互补
@Test
public void testSkip() {
emps.stream()
.filter((e)->e.getAge()>8)
.skip(2)//这里可以查找filter过滤后的数据,前两个不要,要后面的,与limit相反
.forEach(System.out::println);//终止操作
}
注意:这里同样使用了filter中间操作,也可以不用,代码意思是:过滤掉年龄小于8岁的employee对象,然后前两个对象不要,只要后面的对象。跟limit意思相反。
4)、distinct
筛选,通过流所生成元素的 hashCode() 和 equals() 去除重复元素
// (4)、distinct——筛选,通过流所生成元素的 hashCode() 和 equals() 去除重复元素
@Test
public void testDistinct() {
emps.stream()
.distinct()//去除重复的元素,因为通过流所生成元素的 hashCode() 和 equals() 去除重复元素,所以对象要重写hashCode跟equals方法
.forEach(System.out::println);//终止操作
}
注意:distinct,去除重复的元素,因为通过流所生成元素的 hashCode() 和 equals() 去除重复元素,所以对象要重写hashCode跟equals方法,我在Employee对象中重写了这两个方法。
2.3.2 映 射
1)、map
接收Lambda,将元素转换成其他形式或提取信息。接收一个函数作为参数,该函数会被应用到每个元素上,并将其映射成一个新的元素。
2)、flatMap
接收一个函数作为参数,将流中的每个值都转换成另一个流,然后把所有流连接成一个流。
// map-接收Lambda,将元素转换成其他形式或提取信息。接收一个函数作为参数,该函数会被应用到每个元素上,并将其映射成一个新的元素。
@Test
public void testMapAndflatMap() {
List<String> list=Arrays.asList("aaa","bbb","ccc","ddd");
list.stream()
.map((str)->str.toUpperCase())//里面是Function
.forEach(System.out::println);
System.out.println("----------------------------------");
//这里是只打印名字,map映射,根据Employee::getName返回一个name,映射成新的及结果name
emps.stream()
.map(Employee::getName)
.forEach(System.out::println);
System.out.println("======================================");
//流中流
Stream<Stream<Character>> stream = list.stream()
.map(StreamAPI::filterCharacter);
//{{a,a,a},{b,b,b}}
//map是一个个流(这个流中有元素)加入流中
stream.forEach(sm->{
sm.forEach(System.out::println);
});
System.out.println("=============引进flatMap=============");
// 只有一个流
Stream<Character> flatMap = list.stream()
.flatMap(StreamAPI::filterCharacter);
//flatMap是将一个个流中的元素加入流中
//{a,a,a,b,b,b}
flatMap.forEach(System.out::println);
}
/**
* 测试map跟flatMap的区别
* 有点跟集合中的add跟addAll方法类似
* add是将无论是元素还是集合,整体加到其中一个集合中去[1,2,3.[2,3]]
* addAll是将无论是元素还是集合,都是将元素加到另一个集合中去。[1,2,3,2,3]
* @param str
* @return
*/
public static Stream<Character> filterCharacter(String str){
List<Character> list=new ArrayList<>();
for (Character character : str.toCharArray()) {
list.add(character);
}
return list.stream();
}
注意:map跟flatMap还是有区别的,map是一个个流(这个流中有元素)加入流中,flatMap是将一个个流中的元素加入流中.
2.3.2 排序
1)、sorted()
自然排序(Comparable)
2)、sorted(Comparator com)
定制排序(Comparator)
@Test
public void testSorted() {
//自然排序
List<String> list=Arrays.asList("ccc","aaa","bbb","ddd","eee");
list.stream()
.sorted()
.forEach(System.out::println);
System.out.println("=======定制排序=========");
//定制排序
emps.stream()
.sorted((x, y) -> {
if(x.getAge() == y.getAge()){
return x.getName().compareTo(y.getName());
}else{
return Integer.compare(x.getAge(), y.getAge());
}
}).forEach(System.out::println);
}
2.4 终止操作
- 终端操作会从流的流水线生成结果。其结果可以是任何不是流的值,例如: List 、 Integer ,甚至是 void 。
- 流进行了终止操作后,不能再次使用。
2.4.1 匹配与查找
1)、allMatch
检查是否匹配所有元素
System.out.println("==========allMatch==============");
boolean allMatch = emps.stream()
.allMatch((e)->e.getStatus().equals(Status.BUSY));
System.out.println(allMatch);
2)、anyMatch
检查是否至少匹配一个元素
System.out.println("==========anyMatch==============");
boolean anyMatch = emps.stream()
.anyMatch((e)->e.getAge()>10);
System.out.println(anyMatch);
3)、noneMatch
检查是否没有匹配的元素
System.out.println("==========noneMatch==============");
boolean noneMatch = emps.stream()
.noneMatch((e)->e.getStatus().equals(Status.BUSY));
System.out.println(noneMatch);
4)、findFirst
返回第一个元素
System.out.println("==========findFirst==============");
Optional<Employee2> findFirst = emps.stream()
.sorted((e1, e2) -> Double.compare(e1.getSalary(), e2.getSalary()))//按照工资排序并输出第一个
.findFirst();
System.out.println(findFirst);
5)、findAny
返回当前流中的任意元素
System.out.println("==========findAny==============");
Optional<Employee2> findAny = emps.stream()
.filter((e)->e.getStatus().equals(Status.BUSY))
.findAny();
System.out.println(findAny);
6)、count
返回流中元素的总个数
System.out.println("==========count==============");
long count = emps.stream()
.count();
System.out.println(count);
7)、max
返回流中最大值
System.out.println("==========max==============");
Optional<Double> max = emps.stream()
.map(Employee2::getSalary)
.max(Double::compare);
System.out.println(max);
8)、min
返回流中最小值
System.out.println("==========min==============");
Optional<Employee2> min = emps.stream()
.min((e1,e2)->Double.compare(e1.getSalary(), e2.getSalary()));
System.out.println(min);
9)、forEach
在中间操作代码处,已经多次使用了,这里不再赘述。
2.4.2 归约
1)、reduce
reduce(T identity, BinaryOperator) / reduce(BinaryOperator)
可以将流中元素反复结合起来,得到一个值
@Test
public void testReduce() {
List<Integer> list= Arrays.asList(1,2,3,4,5,6,7,8,9,10);
Integer sum = list.stream()
.reduce(0,(x,y)->x+y);
System.out.println(sum);
Optional<Double> reduce = emps.stream()
.map(Employee2::getSalary)
.reduce(Double::sum);
System.out.println(reduce.get());
}
2.4.3 收集
Collector接口中方法的实现决定了如何对流执行收集操作(如收集到List、Set、Map)。但是Collectors实用类提供了很多静态方法,可以方便地创建常见收集器实例,具体方法与实例如下表:
1)、Collectors.toList()
List<String> collect = emps.stream()
.map(Employee2::getName)
.collect(Collectors.toList());
collect.forEach(System.out::println);
2)、Collectors.toSet()
Set<String> collect2 = emps.stream()
.map(Employee2::getName)
.collect(Collectors.toSet());
collect2.forEach(System.out::println);
3)、Collectors.toCollection(HashSet::new)
HashSet<String> collect3 = emps.stream()
.map(Employee2::getName)
.collect(Collectors.toCollection(HashSet::new));
collect3.forEach(System.out::println);
4)、Collectors.maxBy()
Optional<Double> collect = emps.stream()
.map(Employee2::getSalary)
.collect(Collectors.maxBy(Double::compare));
System.out.println(collect.get());
Optional<Employee2> collect2 = emps.stream()
.collect(Collectors.maxBy((e1, e2) -> Double.compare(e1.getSalary(), e2.getSalary())));
System.out.println(collect2.get());
5)、Collectors.minBy()
Optional<Double> collect4 = emps.stream()
.map(Employee2::getSalary)
.collect(Collectors.minBy(Double::compare));
System.out.println(collect4);
Optional<Employee2> collect3 = emps.stream()
.collect(Collectors.minBy((e1,e2)->Double.compare(e1.getSalary(),e2.getSalary())));
System.out.println(collect3.get());
6)、Collectors.summingDouble()
Double collect5 = emps.stream()
.collect(Collectors.summingDouble(Employee2::getSalary));
System.out.println(collect5);
7)、Collectors.averagingDouble()
Double collect6 = emps.stream()
.collect(Collectors.averagingDouble((e)->e.getSalary()));
Double collect7 = emps.stream()
.collect(Collectors.averagingDouble(Employee2::getSalary));
System.out.println("collect6:"+collect6);
System.out.println("collect7:"+collect7);
8)、Collectors.counting()
//总数
Long collect8 = emps.stream()
.collect(Collectors.counting());
System.out.println(collect8);
9)、Collectors.summarizingDouble()
DoubleSummaryStatistics collect9 = emps.stream()
.collect(Collectors.summarizingDouble(Employee2::getSalary));
long count = collect9.getCount();
double average = collect9.getAverage();
double max = collect9.getMax();
double min = collect9.getMin();
double sum = collect9.getSum();
System.out.println("count:"+count);
System.out.println("average:"+average);
System.out.println("max:"+max);
System.out.println("min:"+min);
System.out.println("sum:"+sum);
10)、Collectors.groupingBy()
//分组
@Test
public void testCollect3() {
Map<Status, List<Employee2>> collect = emps.stream()
.collect(Collectors.groupingBy((e)->e.getStatus()));
System.out.println(collect);
Map<Status, List<Employee2>> collect2 = emps.stream()
.collect(Collectors.groupingBy(Employee2::getStatus));
System.out.println(collect2);
}
11)、Collectors.groupingBy()
//多级分组
@Test
public void testCollect4() {
Map<Status, Map<String, List<Employee2>>> collect = emps.stream()
.collect(Collectors.groupingBy(Employee2::getStatus, Collectors.groupingBy((e)->{
if(e.getAge() >= 60)
return "老年";
else if(e.getAge() >= 35)
return "中年";
else
return "成年";
})));
System.out.println(collect);
}
12)、Collectors.partitioningBy()
//多级分组
@Test
public void testCollect4() {
Map<Status, Map<String, List<Employee2>>> collect = emps.stream()
.collect(Collectors.groupingBy(Employee2::getStatus, Collectors.groupingBy((e)->{
if(e.getAge() >= 60)
return "老年";
else if(e.getAge() >= 35)
return "中年";
else
return "成年";
})));
System.out.println(collect);
}
13)、Collectors.joining()
//组接字符串
@Test
public void testCollect6() {
String collect = emps.stream()
.map((e)->e.getName())
.collect(Collectors.joining());
System.out.println(collect);
String collect3 = emps.stream()
.map(Employee2::getName)
.collect(Collectors.joining(","));
System.out.println(collect3);
String collect2 = emps.stream()
.map(Employee2::getName)
.collect(Collectors.joining(",", "prefix", "subfix"));
System.out.println(collect2);
}
@Test
public void testCollect7() {
Optional<Double> collect = emps.stream()
.map(Employee2::getSalary)
.collect(Collectors.reducing(Double::sum));
System.out.println(collect.get());
}
3 异步和线程池
3.1 线程回顾
3.1.1 初始化线程的4 种方式
1)、继承Thread
2)、实现Runnable 接口
3)、实现Callable 接口+ FutureTask (可以拿到返回结果,可以处理异常)
4)、线程池
方式1 和方式2:主进程无法获取线程的运算结果。不适合当前场景
方式3:主进程可以获取线程的运算结果,但是不利于控制服务器中的线程资源。可以导致
服务器资源耗尽。
方式4:通过如下两种方式初始化线程池
Executors.newFiexedThreadPool(3);
//或者
new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit unit,
workQueue, threadFactory, handler);
通过线程池性能稳定,也可以获取执行结果,并捕获异常。但是,在业务复杂情况下,一
个异步调用可能会依赖于另一个异步调用的执行结果。
public static class Thread01 extends Thread{
@Override
public void run() {
System.out.println("当前线程:" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("运行结果:" + i);
}
}
public static class Runnable01 implements Runnable{
@Override
public void run() {
System.out.println("当前线程:" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("运行结果:" + i);
}
}
public static class Callable01 implements Callable<Integer>{
@Override
public Integer call() throws Exception {
System.out.println("当前线程:" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("运行结果:" + i);
return i;
}
}
/** * 1)、继承Thread * Thread01 thread = new Thread01(); * thread.start();//启动线程 * 2)、实现Runnable 接口 * Runnable01 runnable01 = new Runnable01(); * new Thread(runnable01).start(); * 3)、实现Callable 接口+ FutureTask (可以拿到返回结果,可以处理异常) * FutureTask<Integer> futureTask = new FutureTask<>(new Callable01()); * new Thread(futureTask).start(); * //阻塞等待整个线程执行完成,获取返回结果 * Integer integer = futureTask.get(); * 4)、线程池【ExecutorService】 * 给线程池直接提交任务。 * service.execute(new Runnable01()); * 1、创建: * 1)、Executors * 2)、new ThreadPoolExecutor * * Future:可以获取到异步结果 * *区别: * 1、2不能得到返回值,3可以获取返回值。 * 1、2、3都不能控制资源 * 4可以控制资源,性能稳定。 */ //我们以后在任务代码里面,以上三种启动线程的方式都不用。【将所有的多线程异步任务都交给线程池执行】 // new Thread(()-> System.out.println("hello")).start(); //当前系统中池只有一两个,每个异步任务,提交给线程池让他自己去执行就行
3.1.2 线程池的七大参数
- @param corePoolSize the number of threads to keep in the pool, even if they are idle, unless {@code allowCoreThreadTimeOut} is set
池中一直保持的线程的数量,即使线程空闲。除非设置了allowCoreThreadTimeOut- @param maximumPoolSize the maximum number of threads to allow in the pool
池中允许的最大的线程数- @param keepAliveTime when the number of threads is greater than the core, this is the maximum time that excess idle threads will wait for new tasks before terminating.
当线程数大于核心线程数的时候,线程在最大多长时间没有接到新任务就会终止释放,最终线程池维持在corePoolSize 大小- @param unit the time unit for the {@code keepAliveTime} argument
时间单位- @param workQueue the queue to use for holding tasks before they are executed. This queue will hold only the {@code Runnable} tasks submitted by the {@code execute} method.
阻塞队列,用来存储等待执行的任务,如果当前对线程的需求超过了corePoolSize
大小,就会放在这里等待空闲线程执行。- @param threadFactory the factory to use when the executor creates a new thread
创建线程的工厂,比如指定线程名等- @param handler the handler to use when execution is blocked because the thread bounds and queue capacities are reached
拒绝策略,如果线程满了,线程池就会使用拒绝策略。
// 最原始的方式
ThreadPoolExecutor executor = new ThreadPoolExecutor(5,
200,
10,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(100000),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
运行流程:
1、线程池创建,准备好core 数量的核心线程,准备接受任务
2、新的任务进来,用core 准备好的空闲线程执行。
(1) 、core 满了,就将再进来的任务放入阻塞队列中。空闲的core 就会自己去阻塞队
列获取任务执行
(2) 、阻塞队列满了,就直接开新线程执行,最大只能开到max 指定的数量
(3) 、max 都执行好了。Max-core 数量空闲的线程会在keepAliveTime 指定的时间后自
动销毁。最终保持到core 大小
(4) 、如果线程数开到了max 的数量,还有新任务进来,就会使用reject 指定的拒绝策
略进行处理
3、所有的线程创建都是由指定的factory 创建的。
面试:
一个线程池core 7; max 20 ,queue:50,100 并发进来怎么分配的;
先有7 个能直接得到执行,接下来50 个进入队列排队,在多开13 个继续执行。现在70 个
被安排上了。剩下30 个默认拒绝策略。
/** * 七大参数 * corePoolSize:[5]核心线程数【一直存在除非(allowCoreThreadTimeOut)】;线程池,创建好以后就准备就绪的线程数量,就等待来接收异步任务去执行。 * 5个 Thread thread = new Thread(); thread.start(); * * maximumPoolSize:[200] 最大线程数量;控制资源 * keepAliveTime:存活时间。如果当前的线程数量大于 core数量。 * 释放空闲的线程(maximumPoolSize - corePoolSize)。只要线程空闲大于指定的keepAliveTime; *unit:时间单位 * BlockingQueue<Runnable> workQueue:阻塞队列。如果任务有很多,就会将目前多的任务放在队列里面。 * 只要有线程空闲,就回去队列里面取出新的任务继续执行。 * threadFactory:线程的创建工厂。 * RejectedExecutionHandler handler:如果队列满了,按照我们指定的拒绝策略拒绝执行任务。 * * * * 工作顺序: * 1)、线程池创建,准备好core数量的核心线程,准备接收任务 * 1.1、core满了,就将再进来的任务放入阻塞队列中。空闲的core就会自己去阻塞队列获取任务执行 * 1.2、阻塞队列满了,就直接开新线程执行,最大只能开到马西西指定的数量 * 1.3、max满了就用RejectedExecutionHandler拒绝任务 * 1.4、max都执行完成,有很多空闲,在指定的时间keepAliveTime以后,释放max-core这些线程 * * new LinkedBlockingDeque<>():默认是Integer的最大值。内存不够。 *面试题1:一个线程池core 7; max 20 ,queue:50,100 并发进来怎么分配的; * 7个会立即执行,50个会进入阻塞队列,因为最大线程数量是20,所以再开13个进行执行。剩下的30个就使用拒绝策略。 * 如果不想抛弃还要执行:CallerRunsPolicy 同步方式 * * */
3.1.3 常见的4 种线程池
- newCachedThreadPool
- 创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若
无可 回收,则新建线程。
- 创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若
- newFixedThreadPool
- 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
- newScheduledThreadPool
- 创建一个定长线程池,支持定时及周期性任务执行。
- newSingleThreadExecutor
- 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务
按照指定顺序(FIFO, LIFO, 优先级)执行。
//快速创建线程池
Executors.newCachedThreadPool(); //core是0 所有都可回收
Executors.newFixedThreadPool(10);//固定大小 core=max 都不可以回收
Executors.newScheduledThreadPool(10); //定时任务的线程池
Executors.newSingleThreadExecutor(); //单线程的线程池,后台从队列里面获取任务 挨个执行
3.1.4 开发中为什么使用线程池
- 降低资源的消耗
- 通过重复利用已经创建好的线程降低线程的创建和销毁带来的损耗
- 提高响应速度
- 因为线程池中的线程数没有超过线程池的最大上限时,有的线程处于等待分配任务
的状态,当任务来时无需创建新的线程就能执行
- 因为线程池中的线程数没有超过线程池的最大上限时,有的线程处于等待分配任务
- 提高线程的可管理性
- 线程池会根据当前系统特点对池内的线程进行优化处理,减少创建和销毁线程带来
的系统开销。无限的创建和销毁线程不仅消耗系统资源,还降低系统的稳定性,使
用线程池进行统一分配
- 线程池会根据当前系统特点对池内的线程进行优化处理,减少创建和销毁线程带来
3.2 CompletableFuture 异步编排
业务场景:
查询商品详情页的逻辑比较复杂,有些数据还需要远程调用,必然需要花费更多的时间。
假如商品详情页的每个查询,需要如下标注的时间才能完成;那么,用户需要5.5s 后才能看到商品详情页的内容。很显然是不能接受的。
如果有多个线程同时完成这6 步操作,也许只需要1.5s 即可完成响应。
Future 是Java 5 添加的类,用来描述一个异步计算的结果。你可以使用isDone
方法检查计算是否完成,或者使用get
阻塞住调用线程,直到计算完成返回结果,你也可以使用cancel
方法停止任务的执行。
虽然Future
以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不方便,只能通过阻塞或者轮询的方式得到任务的结果。阻塞的方式显然和我们的异步编程的初衷相违背,轮询的方式又会耗费无谓的CPU 资源,而且也不能及时地得到计算结果,为什么不能用观察者设计模式当计算结果完成及时通知监听者呢?
很多语言,比如Node.js,采用回调的方式实现异步编程。Java 的一些框架,比如Netty,自己扩展了Java 的Future
接口,提供了addListener
等多个扩展方法;Google guava 也提供了通用的扩展Future;Scala 也提供了简单易用且功能强大的Future/Promise 异步编程模式。
作为正统的Java 类库,是不是应该做点什么,加强一下自身库的功能呢?
在Java 8 中, 新增加了一个包含50 个方法左右的类: CompletableFuture,提供了非常强大的Future 的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果,并且提供了转换和组合CompletableFuture 的方法。
CompletableFuture 类实现了Future 接口,所以你还是可以像以前一样通过get
方法阻塞或者轮询的方式获得结果,但是这种方式不推荐使用。
CompletableFuture 和FutureTask 同属于Future 接口的实现类,都可以获取线程的执行结果。
3.2.1 创建异步对象
CompletableFuture 提供了四个静态方法来创建一个异步操作。
1、runXxxx 都是没有返回结果的,supplyXxx 都是可以获取返回结果的
2、可以传入自定义的线程池,否则就用默认的线程池;
public static ExecutorService executor = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("main....start....");
// CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
// System.out.println("当前线程:" + Thread.currentThread().getId());
// int i = 10 / 2;
// System.out.println("运行结果:" + i);
// }, executor);
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程:" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("运行结果:" + i);
return i;
}, executor);
Integer integer = future.get();
System.out.println("main....end...." + integer);
}
以下测试都需要使用到自定义的线程池:Executors.newFixedThreadPool(10);
3.2.2 计算完成时回调方法
whenComplete 可以处理正常和异常的计算结果,exceptionally 处理异常情况。
whenComplete 和whenCompleteAsync 的区别:
whenComplete:是执行当前任务的线程执行继续执行whenComplete 的任务,同一个线程。
whenCompleteAsync:是执行把whenCompleteAsync 这个任务继续提交给线程池来进行执行,可能是新的线程。
方法不以Async 结尾,意味着Action 使用相同的线程执行,而Async 可能会使用其他线程执行(如果是使用相同的线程池,也可能会被同一个线程选中执行)。
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("main....start....");
// CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
// System.out.println("当前线程:" + Thread.currentThread().getId());
// int i = 10 / 2;
// System.out.println("运行结果:" + i);
// }, executor);
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程:" + Thread.currentThread().getId());
int i = 10 / 0;
System.out.println("运行结果:" + i);
return i;
}, executor).whenComplete((res,exception)->{
//虽然能得到异常信息,但是没法修改返回数据。
System.out.println("异步任务完成了。。。结果是:" + res + ";异常是:" + exception);
}).exceptionally(throwable -> {
//可以感知异常,同时返回默认值
return 10;
});
Integer integer = future.get();
System.out.println("main....end...." + integer);
}
lambda表达式中的参数解释:
res:返回结果
exception:异常
3.2.3 handle 方法
和complete 一样,可对结果做最后的处理(可处理异常),可改变返回值。
/**
* 方法执行完成后的处理
*/
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程:" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("运行结果:" + i);
return i;
}, executor).handle((res,thr)->{
if (res != null){
return res * 2;
}
if (thr != null){
return 0;
}
return 0;
});
// R apply(T t, U u)
Integer integer = future.get();
System.out.println("main....end...." + integer);
Async:异步。重新开线程进行任务
sync:同步。公用一个线程进行任务。
3.2.4 线程串行化方法
thenApply 方法:当一个线程依赖另一个线程时,获取上一个任务返回的结果,并返回当前任务的返回值。
thenAccept 方法:消费处理结果。接收任务的处理结果,并消费处理,无返回结果。
thenRun 方法:只要上面的任务执行完成,就开始执行thenRun,只是处理完任务后,执行thenRun 的后续操作。
带有Async 默认是异步执行的。同之前。
以上都要前置任务成功完成。
Function<? super T,? extends U>
T:上一个任务返回结果的类型
U:当前任务的返回值类型
thenrun
thenAcceptAsync
thenApplyAsync
/**
* 线程串行化
* 1)、thenRun:不能获取到上一步的执行结果,无返回值
* .thenRunAsync(()->{
* System.out.println("任务2启动了....");
* },executor);
* 2)、thenAcceptAsync能接受上一步结果,但是无返回值
* .thenAcceptAsync(res ->{
* System.out.println("线程2启动了..." + res);
* },executor);
* // void accept(T t);
* 3)、thenApplyAsync:能接受上一步结果,有返回值
* .thenApplyAsync(res -> {
* System.out.println("线程2启动了..." + res);
* return "Hello" + res;
* }, executor);
* // R apply(T t);
*/
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程:" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("运行结果:" + i);
return i;
}, executor).thenApplyAsync(res -> {
System.out.println("线程2启动了..." + res);
return "Hello" + res;
}, executor);
// R apply(T t);
3.2.5 两任务组合- 都要完成
两个任务必须都完成,触发该任务。
thenCombine:组合两个future,获取两个future 的返回结果,并返回当前任务的返回值
thenAcceptBoth:组合两个future,获取两个future 任务的返回结果,然后处理任务,没有
返回值。
runAfterBoth:组合两个future,不需要获取future 的结果,只需两个future 处理完任务后,
处理该任务。
runAfterBothAsync
thenAcceptBothAsync
thenCombineAsync
/**
* 两个都完成
*/
CompletableFuture<Object> future01 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务1线程:" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("任务1结束:" + i);
return i;
}, executor);
CompletableFuture<Object> future02 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务1线程:" + Thread.currentThread().getId());
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务2结束:");
return "Hello";
}, executor);
// 不能感知到前面两个的结果
future01.runAfterBothAsync(future02,()->{
System.out.println("任务3开始...");
},executor);
// 能接受前面两个的返回值
// void accept(T t, U u);
future01.thenAcceptBothAsync(future02,(f1,f2)->{
System.out.println("任务3开始...之前的结果:" + f1 + ">-----" + f2);
},executor);
// 能接受前面两个的返回值,并且自己也能处理返回值
// R apply(T t, U u);
CompletableFuture<String> future = future01.thenCombineAsync(future02, (f1, f2) -> {
return f1 + ":" + f2 + "- > Haha";
}, executor);
3.2.6 两任务组合- 一个完成
当两个任务中,任意一个future 任务完成的时候,执行任务。
applyToEither:两个任务有一个执行完成,获取它的返回值,处理任务并有新的返回值。
acceptEither:两个任务有一个执行完成,获取它的返回值,处理任务,没有新的返回值。
runAfterEither:两个任务有一个执行完成,不需要获取future 的结果,处理任务,也没有返
回值。
runAfterEitherAsync
acceptEitherAsync
applyToEitherAsync
CompletableFuture<Object> future01 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务1线程:" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("任务1结束:" + i);
return i;
}, executor);
CompletableFuture<Object> future02 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务1线程:" + Thread.currentThread().getId());
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务2结束:");
return "Hello";
}, executor);
/**
* 两个任务,只要有一个完成,我们就执行任务3
* runAfterEitherAsync:不感知结果,自己没有返回值
* acceptEitherAsync:感知结果,自己没有返回值
* acceptEitherAsync:感知结果,自己有返回值
*/
future01.runAfterEitherAsync(future02,()->{
System.out.println("任务3开始...之前的结果:");
},executor);
// void accept(T t);
future01.acceptEitherAsync(future02,(res)->{
System.out.println("任务3开始...之前的结果:" + res);
},executor);
// R apply(T t);
CompletableFuture<String> future = future01.applyToEitherAsync(future02, res -> {
System.out.println("任务3开始...之前的结果:" + res);
return res.toString() + "->哈哈";
}, executor);
3.2.7 多任务组合
allOf:等待所有任务完成
anyOf:只要有一个任务完成
allOf
anyOf
/**
* 多任务完成
*/
CompletableFuture<String> futureImg = CompletableFuture.supplyAsync(() -> {
System.out.println("查询商品的图片信息");
return "hello.jpg";
}, executor);
CompletableFuture<String> futureAttr = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
System.out.println("查询商品的属性");
} catch (InterruptedException e) {
e.printStackTrace();
}
return "黑色+256G";
}, executor);
CompletableFuture<String> futureDesc = CompletableFuture.supplyAsync(() -> {
System.out.println("查询商品介绍");
return "华为";
}, executor);
// CompletableFuture<Void> allOf = CompletableFuture.allOf(futureImg, futureAttr, futureDesc);
// allOf.get();//等待所有结果完成
// System.out.println("main....end...."+futureImg.get()+"->"+futureAttr.get()+"->"+futureDesc.get());
CompletableFuture<Object> anyOf = CompletableFuture.anyOf(futureImg, futureAttr, futureDesc);
// allOf.get();//等待所有结果完成
System.out.println("main....end...."+anyOf.get());
4 消息中间件 RabbitMQ
4.1 MQ前言
消息队列应用场景一:异步处理
第一种模式我们必须等各个操作的做完才能返回响应,例如:发送邮件、发送短信能不能收到其实并不是侧重点,因此。可以启动两个线程来执行,也就是第二种模式,在此基础上还可以进行优化就是使用消息中间件,将注册消息存入消息队列中让邮件服务、短信服务慢慢去执行从而提升性能。
消息队列应用场景二:应用解耦
例如当我们下订单需要去调用库存系统的接口,但是库存系统的接口经常需要升级,从而导致需要去修改订单系统的源代码,因此,我们可以将订单信息写入消息队列中不管库存系统如何升级,只需要订阅去执行即可从而达到解耦的作用。
消息队列应用场景三:流量控制(流量削峰)
例如秒杀系统,当百万级别的请求向后台发送后台是会宕机的,因此,将请求消息写入消息对了中由后台慢慢的去处理,提高系统的高可用性。
4.2 概述
消息代理:替我们接收、发送消息的服务器
目的地:消息发送到哪里
点对点模式:有很多的消息的接收者,但消息的接受者只能有一个,谁能拿到消息需要靠抢;
RabbitMQ是基于AMQP协议实现的并且兼容JMS,ActiveMQ是基于JMS实现的。JMS和AMQP的区别在于:JMS面向纯java平台不不支持跨平台而AMQP是可以跨平台,假如后台服务有用PHP编写则可以兼容。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-jqI3OCPa-1673534274035)(null)]
JMS和AMQP的简单对比 :
①AMQP的消息模型中direct exchange是类比JMS中P2P(Queue),AMQP的其它四种消息模型则是类比于JMS的Topic
②JMS支持的各种消息类型,AMQP只支持byte[]但也无妨最后都可以json序列化后传输
Spring对JMS、AMQP都是支持的并且提供了自动配置和常用注解
4.3 RabbitMQ概念
RabbitMQ简介:
RabbitMQ是一个由erlang开发的AMQP(Advanved Message Queue Protocol)的开源实现。
核心概念
Message
消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成, 这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可 能需要持久性存储)等。
Publisher
消息的生产者,也是一个向交换器发布消息的客户端应用程序。
Exchange
交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
Exchange有4种类型:direct(默认),fanout, topic, 和headers,不同类型的Exchange转发消息的策略有所区别。
Queue
消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直 在队列里面,等待消费者连接到这个队列将其取走。
Binding
绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交 换器理解成一个由绑定构成的路由表。
Exchange 和Queue的绑定可以是多对多的关系。
Connection
网络连接,比如一个TCP连接。
Channel
信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内的虚拟连接,AMQP 命令都是通过信道 发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都 是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。
Consumer
消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。
Virtual Host
虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加 密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥 有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时 指定,RabbitMQ 默认的 vhost 是 / 。
Broker
表示消息队列服务器实体。
总结:
工作流程: 首先,生产者客户端会向消息中间件服务器发送Message,Message由消息头(一些参数设置)和消息体(真正的消息内容)组成,发送消息的时候还要指定交换机,交换机和队列具有绑定关系:交换机通过路由键将消息发送给指定的接收队列,消费者监听了这个队列,队列中的内容就会被消费者实时拿到(通过信道拿到)。
无论是生产者发送消息还是消费者客户端者接收消息需要使用connnection去和RabbitMQ创建一个长连接,长连接类似于高速公里,信道类似于高速公路中的每个车道:收发数据都在连接中开辟信道进行收发;
RabbitMQ还有一个虚拟主机即类似于Docker中的容器彼此互不干扰,不需要创建多个RabbitMQ只需要创建多个虚拟机即可实现向java后台、PHP后台发送消息。
长连接的好处是当客户端宕机之后,RabbitMQ实时感知有消费者下线,继而将不会向消费者客户端发送消息而是将消息持久化存储起来保证消息不会大面积丢失。
4.4 RabbitMQ安装
docker安装RabbitMQ命令:
docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management
4369, 25672 (Erlang发现&集群端口)
5672, 5671 (AMQP端口)
15672 (web管理后台端口)
61613, 61614 (STOMP协议端口)
1883, 8883 (MQTT协议端口)
rabbitmq相应文档
自启动:
docker update rabbitmq --restart=always
ps:docker 安装 rabbitmq,拉取不成功,提示:
docker: Error response from daemon: Get “https://registry-1.docker.io/v2/”: net/http: request canceled while waiting for connection (Client.Timeout exceeded while awaiting headers).
重启虚拟机,多重启几次。
登录地址: ip:15672
首次登录的账号密码都是:guest
OverView介绍:
查看对应的协议和端口号
RabbitMQ配置文件的迁移,从老版本的RabbitMQ中下载配置文件
上传至新版本的RabbitMQ配置文件
Connections介绍:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ouvzacLo-1673534273827)(null)]
Channels介绍:
Exchanges介绍:
添加新的交换机
队列介绍:
Admin介绍:
虚拟主机介绍:
查看自己创建的虚拟主机:
进入后可配置权限
删除虚拟主机
设置最大连接数
显示集群消息
4.5 RabbitMQ运行机制
AMQP 中的消息路由
AMQP 中消息的路由过程和 Java 开发者熟悉的 JMS 存在一些差别,AMQP 中增加了 Exchange 和
Binding 的角色。生产者把消息发布到 Exchange 上,消息最终到达队列并被消费者接收,而 Binding 决定交 换器的消息应该发送到那个队列。
Exchange 类型
Exchange分发消息时根据类型的不同分发策略有区别,目前共四种类型:direct、fanout、topic、headers 。headers 匹配 AMQP 消息的 header 而不是路由键,headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了,所以直接 看另外三种类型:
-
消息中的路由键(routing key)如果和Binding 中的 binding key 一致, 交换器 就将消息发到对应的队列中。路由键与队 列名完全匹配,如果一个队列绑定到交换 机要求路由键为“dog”,则只转发 routingkey 标记为“dog”的消息,不会转发“dog.puppy”,也不会转发“dog.guard” 等等。它是完全匹配、单播的模式。
-
每个发到fanout类型交换器的消息都会分到所有绑定的队列上去。fanout交换器不处理路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。
fanout 类型转发消息是最快的。
-
topic 交换器通过模式匹配分配消息的 路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。 它将路由键和绑定键的字符串切分成单 词,这些单词之间用点隔开。它同样也 会识别两个通配符:符号“#”和符号“ * ”。#匹配0个或多个单词**,***匹配一 个单词。
创建一个交换机
创建一个队列
将交换机与队列进行绑定
Unbind:可以解除绑定
-
测试交换机 的三种类型
直接交换机:精确匹配路由键
根据这张图创建所有要用的交换机、队列、以及绑定关系
依次创建4个队列
1.创建 直接交换机
绑定关系
发布消息
查看消息
Nack:表示收到了消息不告诉服务器,消息队列中的消息数量不会减少
ACK:告诉服务器收到了消息,队列中的消息就没了
2.Fanout-Exchange
扇形交换机是广播的方式,与其绑定的队列,无论是否有路由键都会收到消息
创建扇形交换机
绑定队列
发布消息.
3.Topic-Exchange
主题交换机用于模糊匹配,#匹配0个或多个单词,*匹配一个单词
添加交换机
绑定队列
发布消息
4.6 SpringBoot整合RabbitMQ
<!--引入 操作Rabbitmq依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.配置
配置文件前缀如下图所示:
application.properties配置文件配置:
spring.rabbitmq.host=192.168.56.10
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/
3. 启动RabbitMQ
主启动类加上 @EnableRabbit注解
/**
* 使用RabbitMQ
* 1、引入amqp场景:RabbitAutoConfiguration 就会自动生效
*
* 2、给容器中自动配置了
* RabbitTemplate、AmqpAdmin、CachingConnectionFactory、RabbitMessagingTemplate;
* 所有的属性都是 spring.rabbitmq
* @ConditionalOnProperty(prefix = "spring.rabbitmq", name = "dynamic", matchIfMissing = true)
* public class RabbitProperties
*
* 3、给配置文件中配置 spring.rabbitmq 信息
* 4、@EnableRabbit :@EnableXxxx:开启功能
*
*/
@EnableRabbit
@SpringBootApplication
public class GulimallOrderApplication {
public static void main(String[] args) {
SpringApplication.run(GulimallOrderApplication.class, args);
}
}
4.7 AmqpAdmin使用
1.使用AmqpAdmin创建交换机、队列、绑定关系
交换机的类型如下图所示
ps:快捷键 f4:查看继承关系
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class GulimallOrderApplicationTests {
@Autowired
AmqpAdmin amqpAdmin;
/**
* 1、如何创建 Exchange 、Queue 、Binding
* 1)、使用 AmqpAdmin 进行创建
* 2、如何收发消息
*/
@Test
public void createExchange() {
//amqpAdmin
//Exchange
/**
* public DirectExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments)
* durable:持久化
* autoDelete:自动删除
*/
DirectExchange directExchange = new DirectExchange("hello-java-exchange", true, false, null);
amqpAdmin.declareExchange(directExchange);
log.info("Exchange[{}]创建成功","hello-java-exchange");
}
2.创建队列
@Test
public void createQueue(){
//public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
Queue queue = new Queue("hello-java-queue", true, false, false);
amqpAdmin.declareQueue(queue);
log.info("Queue[{}]创建成功","hello-java-queue");
}
3.绑定
@Test
public void createBinding(){
/**
* public Binding(String destination, DestinationType destinationType, String exchange, String routingKey,
* Map<String, Object> arguments)
* destination:目的地
* destinationType:目的地类型
* exchange:交换机
* routingKey:路由键
* arguments:自定义参数
* 将 exchange指定的交换机和 destination目的地进行绑定,使用routingKey作为指定的路由键
*/
Binding binding = new Binding("hello-java-queue",
Binding.DestinationType.QUEUE,
"hello-java-exchange",
"hello.java",
null);
amqpAdmin.declareBinding(binding);
log.info("Binding[{}]创建成功","hello-java-binding");
}
4.8 RabbitTemplate使用
1.使用RabbitTemplate工具类发送json类型消息
前提条件:给容器中注入json转化器
如果容器中有相应配置,就用配置的,没有就用默认的:发送序列化后的数据。
@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void sendMessageTest(){
OrderReturnReasonEntity reasonEntity = new OrderReturnReasonEntity();
reasonEntity.setId(1L);
reasonEntity.setCreateTime(new Date());
reasonEntity.setName("哈哈");
//1、发送消息,如果发送的消息是个对象,我们会使用序列化机制,将对象写出去,对象必须实现Serializable
String msg = "Hello World!";
//2、发送的对象类型的消息,可以是一个JSON
rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",reasonEntity);
log.info("消息发送完成{}",reasonEntity);
}
自己编写配置,实现发送JSON数据。
@Configuration
public class MyRabbitConfig {
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
}
4.9 RabbitListener&RabbitHandler接收消息
@RabbitListener使用前提:必须有@EnableRabbit并且标注方法的类必须在组件中
@RabbitListener标注类上监听多个队列
@RabbitHandler标注在方法上用于接受不同类型的消息对象
@Service("orderItemService")
public class OrderItemServiceImpl extends ServiceImpl<OrderItemDao, OrderItemEntity> implements OrderItemService {
@Override
public PageUtils queryPage(Map<String, Object> params) {
IPage<OrderItemEntity> page = this.page(
new Query<OrderItemEntity>().getPage(params),
new QueryWrapper<OrderItemEntity>()
);
return new PageUtils(page);
}
/**
* queues:声明需要监听的所有队列
*
* org/springframework/amqp/core/Message
*
* 参数可以写一下类型
* 1、Message message:原生消息详细消息。头+体
* 2、T<发送的消息的类型> OrderReturnReasonEntity content:假如发送消息的类型为 OrderReturnReasonEntity 则接受的消息类型也可以为 OrderReturnReasonEntity
* 3、Channel channel:当前传输数据的通道
*
*
* Queue: 可以很多人都来监听。只要收到消息,队列删除消息,而且只能有一个收到此消息
* 场景:
* 1)、订单服务启动多个;同一个消息,只能有一个客户端收到
* 2)、只有一个消息完全处理完,方法运行结束,我们就可以接收到下一个消息
* @param message
* @param content
* @param channel
* @throws InterruptedException
*/
// @RabbitListener(queues = {"hello-java-queue"})
@RabbitHandler
public void receiveMessage(Message message,
OrderReturnReasonEntity content,
Channel channel) throws InterruptedException {
// (id=1, name=哈哈, sort=null, status=null, createTime=Sun Dec 11 22:17:53 CST 2022)
System.out.println("接收到消息..."+content);
byte[] body = message.getBody();
//消息头属性
MessageProperties properties = message.getMessageProperties();
// Thread.sleep(3000);
System.out.println("消息处理完成=>" + content.getName());
}
模拟多个客户端监听Queue。只要收到消息,队列就删除消息,而且只能有一个客户端收到此消息的场景:
1.订单服务启动多个,同一个消息,只能有一个客户端收到
模拟多个订单服务
模拟发送多个消息
@RestController
public class RabbitController {
@Autowired
RabbitTemplate rabbitTemplate;
@GetMapping("/sendMq")
public String sendMq(@RequestParam(value = "num",defaultValue = "10") Integer num){
for (int i = 0; i < num; i++) {
if (i % 2 == 0){
OrderReturnReasonEntity reasonEntity = new OrderReturnReasonEntity();
reasonEntity.setId(1L);
reasonEntity.setCreateTime(new Date());
reasonEntity.setName("哈哈-"+i);
rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",reasonEntity,new CorrelationData(UUID.randomUUID().toString()));
}else{
OrderEntity orderEntity = new OrderEntity();
orderEntity.setOrderSn(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",orderEntity,new CorrelationData(UUID.randomUUID().toString()));
}
}
return "ok";
}
}
结果查看,说明:同一个消息,只能有一个客户端收到
9000
9001
2. 只有当一个消息完全处理完,方法运行结束,客户端才可以接收下一个消息
模拟复杂业务逻辑
结果查看
@RabbitListener标注类上监听多个队列
@RabbitHandler标注在方法上用于接受不同类型的消息对象
模拟向队列发送不同消息对象
@RabbitHandler标注在方法上,重载区分不同的消息
@RabbitHandler
public void receiveMessage(Message message,
OrderReturnReasonEntity content,
Channel channel) throws InterruptedException {
// (id=1, name=哈哈, sort=null, status=null, createTime=Sun Dec 11 22:17:53 CST 2022)
System.out.println("接收到消息..."+content);
byte[] body = message.getBody();
//消息头属性
MessageProperties properties = message.getMessageProperties();
// Thread.sleep(3000);
System.out.println("消息处理完成=>" + content.getName());
}
@RabbitHandler
public void receiveMessage2(OrderEntity content,
Channel channel) throws InterruptedException {
System.out.println("接收到消息..."+content.getClass());
System.out.println("消息处理完成=>" + content.getOrderSn());
}
查看结果
4.10 可靠投递-发送端确认
ComfirmCallBack:当生成者发送消息给broker,broker接收时触发的回调函数
开启ComfirmCallBack回调函数逇步骤:
①开启
# 开启发送端确认
spring.rabbitmq.publisher-confirms=true
②编写回调函数
@Configuration
public class MyRabbitConfig {
@Autowired
RabbitTemplate rabbitTemplate;
/**
* 定制 RabbitTemplate
* 1、服务器收到消息就回调
* 1、spring.rabbitmq.publisher-confirms=true
* 2、设置确认回调ConfirmCallback
*/
@PostConstruct // MyRabbitConfig对象创建完成以后,执行这个方法
public void initRabbitTemplate(){
//设置确认回调
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
*
* @param correlationData 当前消息的唯一关联数据(这个是消息的唯一id)
* @param ack 消息是否成功收到
* @param cause 失败的原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("confirm...CorrelationData["+correlationData+"]==>ack["+ack+"]==>cause["+cause+"]");
}
});
}
测试:
ReturnCallBack:当交换机由于某些原因未将消息传送到指定的队列时,触发的回调函数
开启ReturnCallBack回调函数的步骤:
①开启
# 开启发送端消息抵达队列的确认
spring.rabbitmq.publisher-returns=true
# 只要抵达队列,以异步方式优先回调我们这个returnConfirm
spring.rabbitmq.template.mandatory=true
②编写回调函数
/**
* 2、消息正确地抵达队列进行回调
* 1、spring.rabbitmq.publisher-returns=true
* spring.rabbitmq.template.mandatory=true
* 2、设置确认回调ReturnCallback
* 3、消费端确认(保证每个消息被正确消费,此时才可以broker删除这个消息)。
*/
@PostConstruct // MyRabbitConfig对象创建完成以后,执行这个方法
//@PostConstruct该注解被用来修饰一个非静态的void()方法。被@PostConstruct修饰的方法会在服务器加载Servlet的时候运行,并且只会被服务器执行一次。PostConstruct在构造函数之后执行,init()方法之前执行。
public void initRabbitTemplate(){
//设置消息抵达队列的确认回调
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
/**
* 只要消息没有投递给指定的队列,就触发这个失败回调
* @param message 投递失败的消息详细信息
* @param replyCode 回复的状态码
* @param replyText 回复的文本内容
* @param exchange 当时这个消息发给哪个交换机
* @param routingKey 当时这个消息用哪个路由键
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("Fail Message["+message+"]==>replyCode["+replyCode+"]==>replyText["+replyText+"]==>exchange["+exchange+"]==>["+routingKey+"]");
}
});
}
发送消息时还可以设置消息的唯一id
测试:失败回调
4.11 可靠投递-消费端确认
说明: 消费者消费消息默认采用的是自动ACK也就是自动签收,broker通过通道将消息都传递给你之后自动将消息移除队列,这个就是自动ACK。采用自动ACK将会出现一些问题:当消费者接收到许多条消息时,依次处理这些消息但是在此期间宕机了将会导致后续未处理的消息丢失。
解决方案:手动ACK
手动ACK可以看作是签收操作。
①配置
#手动ack消息
spring.rabbitmq.listener.simple.acknowledge-mode=manual
②手动ACK回复
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ZPZyXsWy-1673534390207)(null)]
@RabbitHandler
public void receiveMessage(Message message,
OrderReturnReasonEntity content,
Channel channel) throws InterruptedException {
// (id=1, name=哈哈, sort=null, status=null, createTime=Sun Dec 11 22:17:53 CST 2022)
System.out.println("接收到消息..."+content);
byte[] body = message.getBody();
//消息头属性
MessageProperties properties = message.getMessageProperties();
// Thread.sleep(3000);
System.out.println("消息处理完成=>" + content.getName());
//在Channel 内是 按顺序自增的。
long deliveryTag = message.getMessageProperties().getDeliveryTag();
System.out.println("deliveryTag==>" + deliveryTag);
//签收货物,非批量模式
try{
if (deliveryTag % 2 == 0){
//收货
channel.basicAck(deliveryTag,false);
System.out.println("签收了货物..."+ deliveryTag);
}else{
// 退货 requeue=false 丢弃 requeue=true 发回服务器,服务器重新入队。
//long deliveryTag, boolean multiple, boolean requeue
channel.basicNack(deliveryTag,false,true);
//long deliveryTag, boolean requeue
// channel.basicReject();
System.out.println("没有签收货物..." + deliveryTag);
}
}catch (Exception e){
//网络中断
}
}
如何签收:
业务成功就应该签收:channel.basicAck(deliveryTag,false);
业务处理失败就应该拒签,让别人处理:channel.basicNack(deliveryTag,false,true);
ps:总结
* 定制 RabbitTemplate * 1、服务收到消息就回调 P-----B * 1、spring.rabbitmq.publisher-confirms=true * 2、设置确认回调ConfirmCallback * 2、消息正确地抵达队列进行回调 B --- Q * 1、spring.rabbitmq.publisher-returns=true * spring.rabbitmq.template.mandatory=true * 2、设置确认回调ReturnCallback * 3、消费端确认(保证每个消息被正确消费,此时才可以broker删除这个消息)。 Q ----- C * spring.rabbitmq.listener.direct.acknowledge-mode=manual 手动签收 * 1、默认是自动确认的,只要消息接收到,客户端就会自动确认,服务端就会移除这个消息 * 问题: * 我们收到很多消息,自动回复给服务器ack,只有一个头消息处理成功,宕机了。发生消息丢失; * 手动确认模式:只要我们没有明确告诉MQ,货物被签收,没有Ack ,消息就一直是 unacked 状态。即使 Consumer宕机,消息不会丢失,会重新变为 Ready状态。 * 下一次有新的Consumer连接进来就发送给它。 * 2、如何签收: * channel.basicNack(deliveryTag,false,false);签收;业务成功就应该签收 * channel.basicNack(deliveryTag,false,true);拒签;业务失败,拒签
4.12 可靠投递-Ack消息确认机制
-
消费者获取到消息,成功处理,可以回复Ack给Broker
- basic.ack用于肯定确认;broker将移除此消息
- basic.nack用于否定确认;可以指定broker是否丢弃此消息,可以批量
- basic.reject用于否定确认;同上,但不能批量
-
默认自动ack,消息被消费者收到,就会从broker的queue中移除
-
queue无消费者,消息依然会被存储,直到消费者消费
-
消费者收到消息,默认会自动ack。但是如果无法确定此消息是否被处理完成,或者成功处理。我们可以开启手动ack模式
- 消息处理成功,ack(),接受下一个消息,此消息broker就会移除
- 消息处理失败,nack()/reject(),重新发送给其他人进行处理,或者容错处理后ack
- 消息一直没有调用ack/nack方法,broker认为此消息正在被处理,不会投递给别人,此时客户端断开,消息不会被broker移除,会投递给别人。
4.13 RabbitMQ延时队列(实现定时任务)
使用消息队列的目的是:保证数据的最终一致性。
场景:
比如未付款订单,超过一定时间后,系统自动取消订单并释放占有物品。
常用解决方案:
spring的 schedule 定时任务轮询数据库
缺点:
- 消耗系统内存、增加了数据库的压力、存在较大的时间误差
解决:rabbitmq的消息TTL和死信Exchange结合
采用定时任务的方式:每隔一段时间进行全表的扫描,会消耗系统内存和增加数据库的压力,最致命的是存在较大的时间误差
假如:10:00定时任务开始执行,则10:01有用户下订单但未支付,10:30的时候定时任务再次执行,这个订单还差1分钟才能进行关单操作,因此,下一次扫描到它要等到11:00,存在着29分钟的误差时间。
采用消息队列可以完美的解决定时任务所带来的缺陷
假如:10:00下订单,再下订单之前先给消息队列发送一条下单消息,等30分钟自动发送关闭订单消息,监听服务收到消息,去查看此订单是否完成支付,若未完成支付则关闭订单。误差也就一两分钟。对于解锁库存也是同理。
如果恰好在一次扫描后完成业务逻辑,那么就会等待两个扫描周期才能扫到过期的订单,不能保证时效性
延迟队列存储的对象肯定是对应的延时消息,所谓"延时消息"是指当消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费。
设置消息的过期时间: 在过期时间内都没有被消费则此消息将会被丢弃并称之为死信
设置队列的过期时间:在此过期时间内都没有队列被客户端连接则队列里的所有消息都被成为死信
PS:一些相关知识解释:
消息的TTL(Time To Live)
- 消息的TTL就是消息的存活时间。
- RabbitMQ可以对队列和消息分别设置TTL。
- 对队列设置就是队列没有消费者连着的保留时间,也可以对每一个单独的消息做单独的设置。超过了这个时间,我们认为这个消息就死了,称之为死信。
- 如果队列设置了,消息也设置了,那么会取小的。所以一个消息如果被路由到不同的队列中,这个消息死亡的时间有可能不一样(不同的队列设置)。这里单讲单个消息的TTL,因为它才是实现延迟任务的关键。可以通过设置消息的expiration字段或者xmessage-ttl属性来设置时间,两者是一样的效果。
死信路由: 消息过期未被消费的,则消息会被交给一个指定的路由器,这个路由器由于只接收死信所以被成为死信路由
Dead Letter Exchanges(DLX)
-
一个消息在满足如下条件下,会进死信路由,记住这里是路由而不是队列,一个路由可以对应很多队列。(什么是死信)
- 一个消息被Consumer拒收了,并且reject方法的参数里requeue是false。也就是说不会被再次放在队列里,被其他消费者使用。(basic.reject/ basic.nack)requeue=false
- 上面的消息的TTL到了,消息过期了。
- 队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由上
-
Dead Letter Exchange其实就是一种普通的exchange,和创建其他exchange没有两样。只是在某一个设置Dead Letter Exchange的队列中有消息过期了,会自动触发消息的转发,发送到Dead Letter Exchange中去。
-
我们既可以控制消息在一段时间后变成死信,又可以控制变成死信的消息被路由到某一个指定的交换机,结合二者,其实就可以实现一个延时队列。
-
手动ack&异常消息统一放在一个队列处理建议的两种方式
- catch异常后,手动发送到指定队列,然后使用channel给rabbitmq确认消息已消费
- 给Queue绑定死信队列,使用nack(requque为false)确认消息消费失败
TTL:RabbitMQ可以针对Queue设置x-expires 或者 针对Message设置 x-message-ttl,来控制消息的生存时间,如果超时(两者同时设置以最先到期的时间为准),则消息变为dead letter(死信)
死信路由DLX:RabbitMQ的Queue可以配置x-dead-letter-exchange 和x-dead-letter-routing-key(可选)两个参数,如果队列内出现了dead letter,则按照这两个参数重新路由转发到指定的队列。
x-dead-letter-exchange:出现dead letter之后将dead letter重新发送到指定exchange
x-dead-letter-routing-key:出现dead letter之后将dead letter重新按照指定的routing-key发送
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-vEOl6L68-1673534381161)(E:\books\文档\谷粒商城之高级篇知识补充\image-20221228174738585.png)]
RabbitMQ实现延时队列的原理:通过设置队列的过期时间使消息变成死信,此队列是不能被任何服务监听的,当消息过期【成为死信】后,因为给队列设置了特定的参数,所以rabbitmq 将死信取出后不会丢弃,而是通过 x-dead-letter-exchange参数的设置:出现dead letter之后将dead letter重新发送到指定exchange,也就是 死信交换机;此外通过 x-dead-letter-routing-key参数的设置:出现dead letter之后将dead letter重新按照指定的routing-key发送,也就是说死信交换机通过指定的路由键找到指定绑定关系的队列,最终的结果就是:死信跑到了指定的队列中去,如果有消费者监听了这个队列里面的内容【这个队列里面的内容一定是生产者发的消息过期了之后的】,那么就可以做相应的操作了。
指定队列只接收死信也就是延时消息,服务器专门监听指定队列从而达到定时任务的效果。
实现1:给队列设置过期时间,推荐使用
实现方式2:给消息设置过期时间,不推荐使用
不推荐使用的原因是:RabbitMQ采用的是懒检查,假如第一个消息设置的是5分钟过期,第二个消息设置的是2分钟过期,第三个消息设置的是30s过期,RabbitMQ过来一看消息5分钟后才过期,那么5分钟之后才会来将消息路由并不会关注后面消息的过期时间。
4.14 延时队列定时关单模拟
按照下图逻辑,模拟下单成功1分钟后,收到关闭订单的消息
订单分布式主体逻辑
- 订单超时未支付触发订单过期状态修改与库存解锁
创建订单时消息会被发送至队列order.delay.queue,经过TTL的时间后消息会变成死信以order.release.order的路由键经交换机转发至队列order.release.order.queue,再通过监听该队列的消息来实现过期订单的处理
- 如果该订单已支付,则无需处理
- 否则说明该订单已过期,修改该订单的状态并通过路由键order.release.other发送消息至队列stock.release.stock.queue进行库存解锁
- 库存锁定后延迟检查是否需要解锁库存
在库存锁定后通过路由键stock.locked发送至延迟队列stock.delay.queue,延迟时间到,死信通过路由键stock.release转发至stock.release.stock.queue,通过监听该队列进行判断当前订单状态,来确定库存是否需要解锁
- 由于关闭订单和库存解锁都有可能被执行多次,因此要保证业务逻辑的幂等性,在执行业务是重新查询当前的状态进行判断
- 订单关闭和库存解锁都会进行库存解锁的操作,来确保业务异常或者订单过期时库存会被可靠解锁
编写队列、交换机,绑定关系
容器中的 Binding、Queue、Exchange 都会自动创建(RabbitMQ没有的情况)
RabbitMQ只要有,@Bean声明的属性发生变化也不会覆盖
这个地方提前将库存解锁的要用到的一个交换机和两个队列创建
@Configuration
public class MyMQConfig {
//@Bean Binding, Queue, Exchange
/**
* 延时队列
* @Bean注解 可以将 容器中的 Binding, Queue, Exchange 都会自动创建(RabbitMQ没有的情况下)
* RabbitMQ 中只要有, @Bean声明属性发生变化也不会覆盖
* @return
*/
@Bean
public Queue orderDelayQueue(){
Map<String, Object> arguments = new HashMap<>();
/**
* x-dead-letter-exchange: order-event-exchange
* x-dead-letter-routing-key: order
* x-message-ttl: 60000
*/
//死信交换机
arguments.put("x-dead-letter-exchange","order-event-exchange");
//死信路由键
arguments.put("x-dead-letter-routing-key","order.release.order");
// 消息过期时间 1分钟
arguments.put("x-message-ttl",60000);
//String name--- 队列名字, boolean durable ---是否持久化, boolean exclusive---是否排他,
// boolean autoDelete---是否自动删除, Map<String, Object> arguments--属性map
Queue queue = new Queue("order.delay.queue", true, false, false, arguments);
return queue;
}
/**
* 普通队列
*/
@Bean
public Queue orderReleaseOrderQueue(){
Queue queue = new Queue("order.release.order.queue", true, false, false);
return queue;
}
@Bean
public Exchange orderEventExchange(){
//String name, boolean durable, boolean autoDelete, Map<String, Object> arguments
return new TopicExchange("order-event-exchange", // name
true,// durable
false); // autoDelete
// 还有一个参数是Map<String, Object> arguments
}
/**
* 创建订单的binding
*/
@Bean
public Binding orderCreateOrderBinding(){
//String destination, DestinationType destinationType, String exchange, String routingKey,
// Map<String, Object> arguments
return new Binding("order.delay.queue",
Binding.DestinationType.QUEUE,
"order-event-exchange",
"order.create.order",
null);
}
@Bean
public Binding orderReleaseOrderBinding(){
return new Binding("order.release.order.queue",
Binding.DestinationType.QUEUE,
"order-event-exchange",
"order.release.order",
null);
}
}
监听关单事件
MyMQConfig
/**
* 模拟订单1分钟过期后变成死信,然后控制台打印关闭订单
*/
@RabbitListener(queues = "order.release.order.queue")
public void listener(OrderEntity entity, Channel channel, Message message) throws IOException {
System.out.println("收到过期的订单消息:准备关闭订单" + entity.getOrderSn());
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
模拟订单完成
HelloController
@Autowired
RabbitTemplate rabbitTemplate;
@ResponseBody
@GetMapping("/test/createOrder")
public String testCreateOrder(){
//订单下单成功
OrderEntity entity = new OrderEntity();
entity.setOrderSn(UUID.randomUUID().toString());
entity.setModifyTime(new Date());
//给MQ发送消息
rabbitTemplate.convertAndSend("order-event-exchange","order.create.order",entity);
return "ok";
}
1分钟后:控制台打印
5 性能压测
5.4 性能压测
5.4.1 压力测试-基本介绍
1)、介绍
压力测试考察当前软硬件环境下系统所能承受的最大负荷并帮助找出系统瓶颈所在。压测都是为了系统在线上的处理能力和稳定性维持在一个标准范围内,做到心中有数。
使用压力测试,我们有希望找到很多种用其他测试方法更难发现的错误。有两种错误类型是:
- 内存泄漏
- 并发与同步。
有效的压力测试系统将应用以下这些关键条件:重复,并发,量级,随机变化。
2)、性能指标
3)、jvm 内存模型
5.4.2 压力测试-jemter安装与使用
1)、下载与安装
- 地址 :https://www.apache.org/dist/jmeter/binaries/apache-jmeter-5.5.zip.sha512
- 直接解压即可。运行bin/jmeter.bat文件。
2)、解决jmeter打不开问题
3)、使用
- 设置简体中文
- 添加线程组
- 右键点击 "测试计划” → “添加” → “线程(用户)” → “线程组”
-
配置线程组参数
线程组主要参数详解:
线程数:虚拟用户数。一个虚拟用户占用一个进程或线程。模拟多少用户访问也就填写多少个线程数量。
Ramp-Up时间(秒):设置的虚拟用户数需要多长时间全部启动。如果线程数为
100
,准备时长为5
,那么需要
5
秒钟启动100
个线程,也就是每秒钟启动20
个线程。 相当于每秒模拟20
个用户进行访问,设置为零我理解为并发访问。循环次数:如果线程数为
100
,循环次数为100
。那么总请求数为100*100=10000
。如果勾选了“永远”,那么所有线程会一直发送请求,直到选择停止运行脚本。Delay Thread creation until needed:直到需要时延迟线程的创建。
调度器:设置线程组启动的开始时间和结束时间(配置调度器时,需要勾选循环次数为永远)
持续时间(秒):测试持续时间,会覆盖结束时间
启动延迟(秒):测试延迟启动时间,会覆盖启动时间
启动时间:测试启动时间,启动延迟会覆盖它。当启动时间已过,手动只需测试时当前时间也会覆盖它。
结束时间:测试结束时间,持续时间会覆盖它。
- 添加测试接口
-
右键点击 “你的线程组” → “添加” → “取样器” → “HTTP请求”
-
填写接口请求参数,我这里对本地的
Spring-boot
服务进行测试
Http请求主要参数详解
协议:向目标服务器发送HTTP请求协议,可以是
HTTP
或HTTPS
,默认为HTTP
。服务器名称或IP :
HTTP
请求发送的目标服务器名称或IP
。端口号:目标服务器的端口号,默认值为80
方法:发送
HTTP
请求的方法,可用方法包括GET
、POST
、HEAD
、PUT
、OPTIONS
、TRACE
、DELETE
等。路径:目标
URL
路径(URL
中去掉服务器地址、端口及参数后剩余部分)。内容编码:编码方式,默认为
ISO-8859-1
编码,这里配置为utf-8
。参数:同请求一起发送参数 ,在请求中发送的
URL
参数,用户可以将URL
中所有参数设置在本表中,表中每行为一个参数(对应URL
中的key=value
),注意参数传入中文时需要勾选“编码”
- 添加监听器—查看结果树
- 右键点击 “你的线程组” → “添加” → “监听器” → “察看结果树”
- 这里,我们修改响应数据格式(你返回什么格式就选什么,我这里是返回json),运行Http请求,可以看到本次请求返回的响应数据。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-tiTG4Twe-1673534390491)(null)]
- 添加监听器—聚合报告
- 右键点击 “你的线程组” → “添加” → “监听器” → “聚合报告”,用以存放性能测试报告
- 添加监听器—汇总图
这些都可以勾选,显示图表。
- 性能测试
- 为了测试出效果,我这里模拟100个用户并发访问获取数据,循环6次,线程组数据修改如下:
- 回到聚合报告运行本次压力测试
- 分析测试报告(先得让本次压力测试运行完毕)
聚合报告参数详解:
Label:每个
JMeter
的element
(例如我这里只有一个Spring WebFlux
)都有一个Name
属性,这里显示的就是Name
属性的值。样本(Samples):请求数——表示这次测试中一共发出了多少个请求,我这里模拟了
100
个用户循环6
次也就为100*6=600
。平均值(Average):平均响应时间(单位:
ms
)。默认是单个Request
的平均响应时间,当使用了Transaction Controller
时,也可以是Transaction
为单位显示平均响应时间。中位数(Median):也就是
50%
用户的响应时间。90% 百分位(Line):
90%
用户的响应时间。相邻几个*%
同意。最小值(Min):最小响应时间。
最大值(Max):最大响应时间。
异常(Error) %:错误率——错误请求数/请求总数。
吞吐量(Throughput):吞吐量——默认情况下表示每秒完成的请求数(
Request per Second
),当使用了Transaction Controller
时,也可以表示类似LoadRunner
的Transaction per Second
数 。接收 KB/Sec:每秒从服务器端接收到的数据量,相当于
LoadRunner
中的Throughput/Sec
。发送 KB/Sec:每秒向服务器发送的数据量,相当于
LoadRunner
中的Throughput/Sec
。
结果分析
有错误率同开发确认,确定是否允许错误的发生或者错误率允许在多大的范围内;
Throughput 吞吐量每秒请求的数大于并发数,则可以慢慢的往上面增加;若在压测的机器性能很好的情况下,出现吞吐量小于并发数,说明并发数不能再增加了,可以慢慢的
往下减,找到最佳的并发数;压测结束,登陆相应的web 服务器查看CPU 等性能指标,进行数据的分析;
最大的tps,不断的增加并发数,加到tps 达到一定值开始出现下降,那么那个值就是
最大的tps。最大的并发数:最大的并发数和最大的tps 是不同的概率,一般不断增加并发数,达到一个值后,服务器出现请求超时,则可认为该值为最大的并发数。
压测过程出现性能瓶颈,若压力机任务管理器查看到的cpu、网络和cpu 都正常,未达到90%以上,则可以说明服务器有问题,压力机没有问题。
影响性能考虑点包括:
- 数据库、应用程序、中间件(tomact、Nginx)、网络和操作系统等方面
首先考虑自己的应用属于CPU 密集型还是IO 密集型
5.4.3 压力测试- jmeter在windows下地址占用bug解决
JMeter Address Already in useJMeter Address Already in use
5.4.4 性能监控-堆内存与垃圾回收
伊甸园区能放下,就放,放不下,小GC一次,存活的放幸存者区;实在放不下,就放老年代。老年代如果还放不下,就来一个大GC,将新生代和老年代内存清理;放得下就放,放不下就报异常。
YGC触发的时机:伊甸园区的内存不够了;
FGC触发的时间:老年代的内存都不够了;
FGC要比 YGC慢很多我们要避免 FGC次数太多,影响整体性能。
老年代主要放:存活时间久;大对象
伊甸园区GC之后,将放到 幸存者区。
visual GC 垃圾回收机制:
要避免 FGC,因为 FGC比 YGC 花费的时间多很多。
补充知识:
图解Java 垃圾回收机制
垃圾回收算法共有 4种:①标记清算算法;②复制算法;③标记整理算法;④分代收集算法。
这里主要讲第④种。
1. 分代收集算法
对于一个大型的系统,当创建的对象和方法变量比较多时,堆内存中的对象也会比较多,如果逐一分析对象是否该回收,那么势必造成效率低下。分代收集算法是基于这样一个事实:不同的对象的生命周期(存活情况)是不一样的,而不同生命周期的对象位于堆中不同的区域,因此对堆内存不同区域采用不同的策略进行回收可以提高 JVM 的执行效率。当代商用虚拟机使用的都是分代收集算法:新生代对象存活率低,就采用复制算法;老年代存活率高,就用标记清除算法或者标记整理算法。Java堆内存一般可以分为新生代、老年代和永久代三个模块。
1). 新生代(Young Generation)
新生代的目标就是尽可能快速的收集掉那些生命周期短的对象,一般情况下,所有新生成的对象首先都是放在新生代的。新生代内存按照 8:1:1 的比例分为一个eden区和两个survivor(survivor0,survivor1)区,大部分对象在Eden区中生成。在进行垃圾回收时,先将eden区存活对象复制到survivor0区,然后清空eden区,当这个survivor0区也满了时,则将eden区和survivor0区存活对象复制到survivor1区,然后清空eden和这个survivor0区,此时survivor0区是空的,然后交换survivor0区和survivor1区的角色(即下次垃圾回收时会扫描Eden区和survivor1区),即保持survivor0区为空,如此往复。特别地,当survivor1区也不足以存放eden区和survivor0区的存活对象时,就将存活对象直接存放到老年代。如果老年代也满了,就会触发一次FullGC,也就是新生代、老年代都进行回收。注意,新生代发生的GC也叫做MinorGC,MinorGC发生频率比较高,不一定等 Eden区满了才触发。
2). 老年代(Old Generation)
老年代存放的都是一些生命周期较长的对象,就像上面所叙述的那样,在新生代中经历了N次垃圾回收后仍然存活的对象就会被放到老年代中。此外,老年代的内存也比新生代大很多(大概比例是1:2),当老年代满时会触发Major GC(Full GC),老年代对象存活时间比较长,因此FullGC发生的频率比较低。
3). 永久代(Permanent Generation)
永久代主要用于存放静态文件,如Java类、方法等。永久代对垃圾回收没有显著影响,但是有些应用可能动态生成或者调用一些class,例如使用反射、动态代理、CGLib等bytecode框架时,在这种时候需要设置一个比较大的永久代空间来存放这些运行过程中新增的类。
2.小结:
由于对象进行了分代处理,因此垃圾回收区域、时间也不一样。垃圾回收有两种类型,Minor GC 和 Full GC。
-
Minor GC:对新生代进行回收,不会影响到年老代。因为新生代的 Java 对象大多死亡频繁,所以 Minor GC 非常频繁,一般在这里使用速度快、效率高的算法,使垃圾回收能尽快完成。
-
Full GC:也叫 Major GC,对整个堆进行回收,包括新生代和老年代。由于Full GC需要对整个堆进行回收,所以比Minor GC要慢,因此应该尽可能减少Full GC的次数,导致Full GC的原因包括:老年代被写满、永久代(Perm)被写满和System.gc()被显式调用等。
3. 垃圾收集器
如果说垃圾收集算法是内存回收的方法论,那么垃圾收集器就是内存回收的具体实现。下图展示了7种作用于不同分代的收集器,其中用于回收新生代的收集器包括Serial、PraNew、Parallel Scavenge,回收老年代的收集器包括Serial Old、Parallel Old、CMS,还有用于回收整个Java堆的G1收集器。不同收集器之间的连线表示它们可以搭配使用。
Serial收集器(复制算法): 新生代单线程收集器,标记和清理都是单线程,优点是简单高效;
Serial Old收集器 (标记-整理算法): 老年代单线程收集器,Serial收集器的老年代版本;
ParNew收集器 (复制算法): 新生代收并行集器,实际上是Serial收集器的多线程版本,在多核CPU环境下有着比Serial更好的表现;
Parallel Scavenge收集器 (复制算法): 新生代并行收集器,追求高吞吐量,高效利用 CPU。吞吐量 = 用户线程时间/(用户线程时间+GC线程时间),高吞吐量可以高效率的利用CPU时间,尽快完成程序的运算任务,适合后台应用等对交互相应要求不高的场景;
Parallel Old收集器 (标记-整理算法): 老年代并行收集器,吞吐量优先,Parallel Scavenge收集器的老年代版本;
CMS(Concurrent Mark Sweep)收集器(标记-清除算法): 老年代并行收集器,以获取最短回收停顿时间为目标的收集器,具有高并发、低停顿的特点,追求最短GC回收停顿时间。
G1(Garbage First)收集器 (标记-整理算法): Java堆并行收集器,G1收集器是JDK1.7提供的一个新收集器,G1收集器基于“标记-整理”算法实现,也就是说不会产生内存碎片。此外,G1收集器不同于之前的收集器的一个重要特点是:G1回收的范围是整个Java堆(包括新生代,老年代),而前六种收集器回收的范围仅限于新生代或老年代。
4.内存分配与回收策略
Java技术体系中所提倡的自动内存管理最终可以归结为自动化地解决了两个问题:给对象分配内存 以及 回收分配给对象的内存。一般而言,对象主要分配在新生代的Eden区上,如果启动了本地线程分配缓存(TLAB),将按线程优先在TLAB上分配。少数情况下也可能直接分配在老年代中。总的来说,内存分配规则并不是一层不变的,其细节取决于当前使用的是哪一种垃圾收集器组合,还有虚拟机中与内存相关的参数的设置。
1) 对象优先在Eden分配,当Eden区没有足够空间进行分配时,虚拟机将发起一次MinorGC。现在的商业虚拟机一般都采用复制算法来回收新生代,将内存分为一块较大的Eden空间和两块较小的Survivor空间,每次使用Eden和其中一块Survivor。 当进行垃圾回收时,将Eden和Survivor中还存活的对象一次性地复制到另外一块Survivor空间上,最后处理掉Eden和刚才的Survivor空间。(HotSpot虚拟机默认Eden和Survivor的大小比例是8:1)当Survivor空间不够用时,需要依赖老年代进行分配担保。
2) <b><font color=red>大对象直接进入老年代</font></b>。所谓的大对象是指,需要大量连续内存空间的Java对象,最典型的大对象就是那种很长的字符串以及数组。
3) <b><font color=red>长期存活的对象将进入老年代。</font></b>当对象在新生代中经历过一定次数(默认为15)的Minor GC后,就会被晋升到老年代中。
4) <b><font color=red>动态对象年龄判定。</font></b>为了更好地适应不同程序的内存状况,虚拟机并不是永远地要求对象年龄必须达到了MaxTenuringThreshold才能晋升老年代,如果在Survivor空间中相同年龄所有对象大小的总和大于Survivor空间的一半,年龄大于或等于该年龄的对象就可以直接进入老年代,无须等到MaxTenuringThreshold中要求的年龄。
需要注意的是,Java的垃圾回收机制是Java虚拟机提供的能力,用于在空闲时间以不定时的方式动态回收无任何引用的对象占据的内存空间。也就是说,垃圾收集器回收的是无任何引用的对象占据的内存空间而不是对象本身。
5.4.5 性能监控-jvisualvm使用
- Jdk 的两个小工具jconsole、jvisualvm(升级版的jconsole);通过命令行启动,可监控本地和远程应用。远程应用需要配置。
1)、jvisualvm 能干什么
- 监控内存泄露,跟踪垃圾回收,执行时内存、cpu 分析,线程分析…
- [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-P4Uv7nDm-1673534391074)(null)]
- 运行:正在运行的
休眠:sleep
等待:wait
驻留:线程池里面的空闲线程
监视:阻塞的线程,正在等待锁
2)、安装插件方便查看gc
- Cmd 启动jvisualvm
- 工具->插件
-
如果503 错误解决:
-
打开网址https://visualvm.github.io/pluginscenters.html
-
cmd 查看自己的jdk 版本,找到对应的
-
复制下面查询出来的链接。并重新设置上即可
-
5.4.6 性能压测-优化-中间件对性能的影响
1)、中间件指标
2)、数据库指标
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-VTaOAI1q-1673534390789)(null)]
5.4.7 JVM 分析&调优
1)、几个常用工具
2)、命令示例
3)、调优项
官方文档:https://docs.oracle.com/javase/8/docs/technotes/tools/unix/java.html#BGBCIEFC
5.4.8 性能压测(总)
按表格中进行压测。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-mHRfptyG-1673534457190)(null)]
- 压测中间件
线程数这里统一 都是 50,避免设置太高导致 内存爆满等情况。
- Nginx
压测 1分10秒
汇总报告
聚合报告
docker stats :实时查看 状态
内存比较浪费,因为 Nginx 基本只做转发,并不实际处理请求,主要是空闲状态。
- 网关压测
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-scCd6yRD-1673534457573)(null)]
-
压测一个简单服务
IndexController
//压力测试
//简单服务
//搭配中间件
@ResponseBody
@GetMapping("/hello")
public String hello(){
return "hello";
}
- 简单服务 搭配 网关
- id: product_route
uri: lb://gulimall-product
predicates:
- Path=/api/product/**,/hello
filters:
- RewritePath=/api/(?<segment>/?.*), /$\{segment}
#http://localhost:88/api/product/gory/list/tree http://localhost:10000/product/gory/list/tree
- 网关 + Nginx + 简单服务
- 首页全量数据获取压测(包括静态资源)
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-LKF7bMHK-1673534456817)(null)]
- 压测首页一级菜单渲染
- 压测首页一级菜单渲染开缓存(优化1:开缓存:线上生产时需要;开发时关闭)
略有提升
-
压测 首页开缓存 、数据库优化、日志级别提高(这里的首页就是首页一级分类)
补充:日志等级由低到高:debug<info<warn<Error<Fatal;
优化2
数据库字段 parent_cid 没有索引的条件下
/**
* 查找 1级分类
* parent_cid = 0 或者 cat_level = 1
* @return
*/
@Override
public List<CategoryEntity> getLevel1Categorys() {
long l = System.currentTimeMillis();//压力测试
List<CategoryEntity> categoryEntities = baseMapper.selectList(new QueryWrapper<CategoryEntity>().eq("parent_cid", 0));
System.out.println("消耗时间:" + (System.currentTimeMillis() - l));//测试没有索引的条件下,具体时间耗费
return categoryEntities;
}
优化3
给 parent_cid 添加索引
- 压测 三级分类数据获取
吞吐量实在太低。
- 动静分离
动态请求转交给 微服务;静态请求直接返回。
后台服务专心处理动态资源,继而可以提升很大的吞吐量。
①将静态资源文件 放在 Nginx下:这里将商品服务 static下的index资源复制过来,商品服务下的可以删除。
②给index.html中的静态资源加上前缀。
css:
js:
img:
③在 gulimall.conf 中修改,添加 路径
测试
- 在开缓存,日志提高、有索引、动静分离的情况下,首页全量数据压测。
①先将 线程数设置为50进行测试,看优化效果。
确实有提升。
②将线程 设置 为 200,模拟 线上应用内存崩溃宕机。
可以看到 老年代以及伊甸园区内存都溢满了,FGC次数频繁:
这是因为我们给商品服务设置的内存太小了,仅有100M,所以很容易爆满。
idea控制台报错,堆溢出等问题:
首页访问不了,因为 线上应用期间,CPU 内存爆满、卡死,将应用挤下线。
优化:加大内存,避免 老年代和新生代 内存满了之后一直GC,拖慢时间。
200线程数压测。
伊甸园区内存可能爆满,GC次数多(频繁创建临时对象),老年代GC次数减少。
- 优化三级分类查询。
1.将数据库的多次查询变为一次,之前是嵌套查询,导致和数据库做很多次交互
baseMapper.selectList(new QueryWrapper<CategoryEntity>().eq("parent_cid", parentCid));
将上面的查询代码抽取为一个方法,继而减少 查询三级分类这个方法对数据库的多次查询,优化DB操作。
最终结果改造:
@Override
public Map<String, List<Catelog2Vo>> getCatalogJson() {
/**
* 1.将数据库的多次查询变为一次,之前是嵌套查询,导致和数据库做很多次交互
* 后续查询操作从这次查询结果中取结果
*/
List<CategoryEntity> selectList = baseMapper.selectList(null);//不传任何条件,就代表查询所有。
//1.查出所有1级分类
List<CategoryEntity> level1Categorys = getParent_cid(selectList,0L);//1级分类的 父id 是 0
//2.封装数据
Map<String, List<Catelog2Vo>> parent_cid = level1Categorys.stream().collect(Collectors.toMap(k -> k.getCatId().toString(), v -> {
//1.每一个的一级分类,查到这个一级分类的二级分类
List<CategoryEntity> categoryEntities = getParent_cid(selectList,v.getCatId());//将正在遍历的元素v的catId作为 parent_cid的值,就可以找出它的子分类
//2.封装上面的结果
List<Catelog2Vo> catelog2Vos = null;
if (categoryEntities != null) {
catelog2Vos = categoryEntities.stream().map(l2 -> {
Catelog2Vo catelog2Vo = new Catelog2Vo(v.getCatId().toString(), null, l2.getCatId().toString(), l2.getName());
//1.找到当前二级分类的三级分类,封装成 vo
List<CategoryEntity> level3Catelog = getParent_cid(selectList,l2.getCatId());
// 三级分类有数据的情况下
if (level3Catelog != null){
List<Catelog2Vo.Catelog3Vo> collect = level3Catelog.stream().map(l3 -> {
//2.封装成指定格式
Catelog2Vo.Catelog3Vo catelog3Vo = new Catelog2Vo.Catelog3Vo(l2.getCatId().toString(), l3.getCatId().toString(), l3.getName());
return catelog3Vo;
}).collect(Collectors.toList());
catelog2Vo.setCatalog3List(collect);
}
return catelog2Vo;
}).collect(Collectors.toList());
}
return catelog2Vos;
}));
return parent_cid;
}
private List<CategoryEntity> getParent_cid(List<CategoryEntity> selectList,Long parent_cid) {
// return baseMapper.selectList(new QueryWrapper<CategoryEntity>().eq("parent_cid", parentCid));
//从已知的集合 selectList (查询所有)中找 出 ParentCid == 指定的值
List<CategoryEntity> collect = selectList.stream().filter(item -> item.getParentCid() == parent_cid).collect(Collectors.toList());
return collect;
}
为了开发期间方便,改为100m
测试:
可以看出,即使优化了业务操作,其吞吐量也不是很高,所以我们接下来使用分布式系统中的性能提升大神器:缓存。合理运用缓存,可以给系统一个质的提升。
总结:
- 中间件越多,性能损失越大,大多都损失在网络交互了;
- 业务:
- Db(MySQL 优化)
- 模板的渲染速度(缓存)
- 静态资源
6 接口幂等性
6.1 什么是幂等性
6.2 哪些情况需要防止
6.3 什么情况下需要幂等
6.4 幂等解决方案
6.4.1 token 机制
if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end
6.4.2 各种锁机制
1)、数据库悲观锁
2)、数据库乐观锁
3)、业务层分布式锁
6.4.3 各种唯一约束
1)、数据库唯一约束
2)、redis set 防重
3)、防重表
4)、全局请求唯一 id
7 ElasticSearch
7.1 简介
全文搜索属于最常见的需求,开源的 Elasticsearch 是目前全文搜索引擎的首选。它可以快速地储存、搜索和分析海量数据。维基百科、Stack Overflow、Github 都采用它。
文档:
官方文档:https://www.elastic.co/guide/en/elasticsearch/reference/current/index.html
官方中文:https://www.elastic.co/guide/cn/elasticsearch/guide/current/foreword_id.html
社区中文:
https://es.xiaoleilu.com/index.html
http://doc.codingdict.com/elasticsearch/0/
(其中中文文档都是比较旧的版本)
7.2 基本概念
1、Index(索引):
动词,相当于 MySQL 中的 insert;
名词,相当于 MySQL 中的 Database
2、Type(类型):
在 Index(索引)中,可以定义一个或多个类型。
类似于 MySQL 中的 Table;每一种类型的数据放在一起
3、Document(文档)
保存在某个索引(Index)下,某种类型(Type)的一个数据(Document),文档是 JSON 格式的,Document 就像是 MySQL 中的某个 Table 里面的内容
4、为什么ES搜索快? 倒排索引
7.3 Docker安装Elasticsearch
- 将虚拟机存储升到1G。基础篇是512m,因为要安装elasticsearch。直接在vb中设置即可
- 查看可用虚拟机内存
- 下载镜像文件
docker pull elasticsearch:7.4.2 存储和检索数据
docker pull kibana:7.4.2 可视化检索数据
注意版本要对应
- 创建实例
mkdir -p /mydata/elasticsearch/config
mkdir -p /mydata/elasticsearch/data
echo "http.host: 0.0.0.0" >> /mydata/elasticsearch/config/elasticsearch.yml
chmod -R 777 /mydata/elasticsearch/ 保证权限,否则启动不了。一会儿就自动停止
docker run --name elasticsearch -p 9200:9200 -p 9300:9300 \
-e "discovery.type=single-node" \
-e ES_JAVA_OPTS="-Xms64m -Xmx512m" \
-v /mydata/elasticsearch/config/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml \
-v /mydata/elasticsearch/data:/usr/share/elasticsearch/data \
-v /mydata/elasticsearch/plugins:/usr/share/elasticsearch/plugins \
-d elasticsearch:7.4.2
以后在外面装好插件重启即可;
特别注意:
-e ES_JAVA_OPTS=“-Xms64m -Xmx256m” \ 测试环境下,设置 ES 的初始内存和最大内存,否则导致过大启动不了 ES
- 访问测试
出现下面json数据,即证明安装成功。
- 安装Kibana
docker run --name kibana -e ELASTICSEARCH_HOSTS=http://59.110.106.16:9200 -p 5601:5601 \
-d kibana:7.4.2
http://192.168.56.10:9200 一定改为自己虚拟机的地址
- 访问测试
59.110.106.16:5601
出现如下界面,即证明安装成功。
- 注意我们没有设置开机自启,所以我们每次均需要进行启动。su :从普通用户切换为root用户。密码是:vagrant
docker ps -a :查看以往启动记录。
docker start 容器ID
7.4 初步检索
7.4.1 _cat
GET /_cat/nodes:查看所有节点
GET /_cat/health:查看 es 健康状况
GET /_cat/master:查看主节点
GET /_cat/indices:查看所有索引 show databases;
7.4.2 索引一个文档(保存)
保存一个数据,保存在哪个索引的哪个类型下,指定用哪个唯一标识PUT customer/external/1;在 customer 索引下的 external 类型下保存 1 号数据为
# # 在customer索引下的external类型下保存1号数据
PUT customer/external/1
PUT http://59.110.106.16:9200/customer/external/1
Content-Type: application/json
{
"name": "John Doe"
}
返回数据
返回数据:
带有下划线开头的,称为元数据,反映了当前的基本信息。
{
"_index": "customer", 表明该数据在哪个数据库下;
"_type": "external", 表明该数据在哪个类型下;
"_id": "1", 表明被保存数据的id;
"_version": 1, 被保存数据的版本
"result": "created", 这里是创建了一条数据,如果重新put一条数据,则该状态会变为updated,并且版本号也会发生变化。
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"_seq_no": 1,
"_primary_term": 1
}
PUT 和 POST 都可以,
POST 新增。如果不指定 id,会自动生成 id, 新增这个数据。指定 id 就会修改这个数据,并新增版本号
PUT 可以新增可以修改。PUT 必须指定 id;由于 PUT 需要指定 id,我们一般都用来做修改操作,不指定 id 会报错(405)。
PUT: 带ID,第一次是新增操作,第二次是更新操作,如果不带ID,报405错误。
7.4.3 查询文档
GET customer/external/1
### GET request with a header
GET http://59.110.106.16:9200/customer/external/1
Accept: application/json
返回数据:
{
"_index": "customer",
"_type": "external",
"_id": "1",
"_version": 2,
"_seq_no": 1, //并发控制字段,每次更新都会+1,用来做乐观锁
"_primary_term": 1,//同上,主分片重新分配,如重启,就会变化
"found": true,
"_source": { //存储的信息
"name": "John Doe"
}
}
乐观锁用法:通过“
if_seq_no=1&if_primary_term=1
”,当序列号匹配的时候,才进行修改,否则不修改。
示例:
当seq_no=1&primary_term=1时,就修改
###
PUT http://59.110.106.16:9200/customer/external/1?if_seq_no=1&if_primary_term=1
Content-Type: application/json
{
"name": "John Doe"
}
修改成功,修改之后seq_no变成2
7.4.4 更新文档
POST customer/external/1/_update
{
"doc":{
"name": "John Doew"
}
}
或者
POST customer/external/1
{
"name": "John Doe2"
}
或者
PUT customer/external/1
{
"name": "John Doe"
}
- 注意带不带_update的语法不同,带_update的话就需要加doc
- POST 操作会对比源文档数据,如果相同不会有什么操作,文档version 不增加;返回的result中是“noop”:no operation,不操作
- PUT 操作总会将数据重新保存并增加version 版本;
- 看场景:
- 对于大并发更新,不带update;
- 对于大并发查询偶尔更新,带update;对比更新,重新计算分配规则。
- 更新同时增加属性
发送之后,与元素据对比,发现相同,不做任何操作
POST http://59.110.106.16:9200/customer/external/1/_update
Content-Type: application/json
{
"doc":{
"name": "John"
}
}
7.4.5 删除文档&索引
DELETE customer/external/1
DELETE customer
注:elasticsearch并没有提供删除类型的操作,只提供了删除索引和文档的操作。
删除id=1的数据
DELETE http://59.110.106.16:9200/customer/external/1
删除customer索引
DELETE http://59.110.106.16:9200/customer
响应
{
"acknowledged": true
}
7.4.6 ES的批量操作——bulk
用Kibana之中的Dev Tools
示例1:
POST customer/external/_bulk
{"index":{"_id":"1"}} # index叫做索引,保存操作
{"name": "John Doe" }
{"index":{"_id":"2"}}
{"name": "Jane Doe" }
语法格式: 两行一组,第一行为操作,第二行为数据
{ action: { metadata }}\n
{ request body}\n
{ action: { metadata }}\n
{ request body}\n
这里的批量操作,当发生某一条执行发生失败时,其他的数据仍然能够接着执行,也就是说彼此之间是独立的。
bulk API 以此按顺序执行所有的 action(动作)。如果一个单个的动作因任何原因而失败,它将继续处理它后面剩余的动作。当 bulk API 返回时,它将提供每个动作的状态(与发送的顺序相同),所以您可以检查是否一个指定的动作是不是失败了。
返回数据
#! Deprecation: [types removal] Specifying types in bulk requests is deprecated.
{
"took" : 318, 花费了多少ms
"errors" : false, 没有发生任何错误
"items" : [ 每个数据的结果
{
"index" : { 保存
"_index" : "customer", 索引
"_type" : "external", 类型
"_id" : "1", 文档
"_version" : 1, 版本
"result" : "created", 创建
"_shards" : {
"total" : 2,
"successful" : 1,
"failed" : 0
},
"_seq_no" : 0,
"_primary_term" : 1,
"status" : 201 新建完成
}
},
{
"index" : { 第二条记录
"_index" : "customer",
"_type" : "external",
"_id" : "2",
"_version" : 1,
"result" : "created",
"_shards" : {
"total" : 2,
"successful" : 1,
"failed" : 0
},
"_seq_no" : 1,
"_primary_term" : 1,
"status" : 201
}
}
]
}
示例2:
POST /_bulk
{"delete":{"_index":"website","_type":"blog","_id":"123"}}
{"create":{"_index":"website","_type":"blog","_id":"123"}}
{"title":"my first blog post"}
{"index":{"_index":"website","_type":"blog"}}
{"title":"my second blog post"}
{"update":{"_index":"website","_type":"blog","_id":"123"}}
{"doc":{"title":"my updated blog post"}}
7.4.7 样本测试数据
一份顾客银行账户信息的虚构的 JSON 文档样本。每个文档都有下列的 schema(模式):
{
"account_number": 0,
"balance": 16623,
"firstname": "Bradshaw",
"lastname": "Mckenzie",
"age": 29,
"gender": "F",
"address": "244 Columbus Place",
"employer": "Euron",
"email": "bradshawmckenzie@euron.com",
"city": "Hobucken",
"state": "CO"
}
测试数据地址https://gitee.com/xlh_blog/common_content/blob/master/es%E6%B5%8B%E8%AF%95%E6%95%B0%E6%8D%AE.json
POST bank/account/_bulk
//上面的数据
//GET _cat/indices, 查看索引
//刚导入了1000条数据
yellow open bank dno5JY9tTrGHdsjIMkvQyA 1 1 1000 0 414.2kb 414.2kb
7.5 进阶检索
7.5.1 SearchAPI
ES 支持两种基本方式检索:
- 一个是通过使用 REST request URI 发送搜索参数(uri+检索参数)
- 另一个是通过使用 REST request body 来发送它们(uri+请求体)
1)检索信息
- 一切检索从_search 开始
返回的数据如上图。
- uri+请求体进行检索
GET bank/_search
{
"query": {
"match_all": {}
},
"sort": [
{
"account_number": {
"order": "desc"
}
}
]
}
HTTP 客户端工具(POSTMAN),get 请求不能携带请求体,我们变为post 也是一样的
我们POST 一个JSON 风格的查询请求体到_search API。
需要了解,一旦搜索的结果被返回,Elasticsearch 就完成了这次请求,并且不会维护任何
服务端的资源或者结果的cursor(游标)
7.5.2 Query DSL
1) 基本语法格式
Elasticsearch 提供了一个可以执行查询的Json 风格的DSL(domain-specific language 领域特定语言)。这个被称为Query DSL。该查询语言非常全面,并且刚开始的时候感觉有点复杂,
真正学好它的方法是从一些基础的示例开始的。
- 一个查询语句的典型结构
- 如果是针对某个字段,那么它的结构如下:
GET bank/_search
{
"query": {
"match_all": {}
},
"from": 0,
"size": 1,
"sort": [
{
"account_number": {
"order": "desc"
}
}
]
}
- query 定义如何查询
- match_all 查询类型【代表查询所有的所有】,es 中可以在query 中组合非常多的查询类型完成复杂查询
- 除了query 参数之外,我们也可以传递其它的参数以改变查询结果。如sort,size ; from+size 限定,完成分页功能
- sort 排序,多字段排序,会在前序字段相等时后续字段内部排序,否则以前序为准
2) 返回部分字段
GET bank/_search
{
"query": {
"match_all": {}
},
"from": 0,
"size": 5,
"_source": [
"age",
"balance"
]
}
3) match【匹配查询】
- 基本类型(非字符串),精确匹配
match 返回account_number=20 的
GET bank/_search
{
"query": {
"match": {
"account_number": "20"
}
}
}
- 字符串,全文检索
GET bank/_search
{
"query": {
"match": {
"address": "mill"
}
}
}
最终查询出address 中包含mill 单词的所有记录
match 当搜索字符串类型的时候,会进行全文检索,并且每条记录有相关性得分。
- 字符串,多个单词(分词+全文检索)
GET bank/_search
{
"query": {
"match": {
"address": "mill road"
}
}
}
最终查询出address 中包含mill 或者road 或者mill road 的所有记录,并给出相关性得分
4) match_phrase【短语匹配】
GET bank/_search
{
"query": {
"match_phrase": {
"address": "mill road"
}
}
}
将需要匹配的值当成一个整体单词(不分词)进行检索
查出address 中包含mill road 的所有记录,并给出相关性得分
5) multi_match【多字段匹配】
GET bank/_search
{
"query": {
"multi_match": {
"query": "mill",
"fields": [
"state",
"address"
]
}
}
}
state 或者address 包含mill
6) bool【复合查询】
bool 用来做复合查询:
复合语句可以合并任何其它查询语句,包括复合语句,了解这一点是很重要的。这就意味
着,复合语句之间可以互相嵌套,可以表达非常复杂的逻辑。
- must:必须达到must 列举的所有条件
GET bank/_search
{
"query": {
"bool": {
"must": [
{
"match": {
"address": "mill"
}
},
{
"match": {
"gender": "F"
}
}
]
}
}
}
- should:应该达到should 列举的条件,如果达到会增加相关文档的评分,并不会改变查询的结果。如果query 中只有should 且只有一种匹配规则,那么should 的条件就会
被作为默认匹配条件而去改变查询结果。
GET bank/_search
{
"query": {
"bool": {
"must": [
{
"match": {
"address": "mill"
}
},
{
"match": {
"gender": "M"
}
}
],
"should": [
{
"match": {
"address": "lane"
}
}
]
}
}
}
- must_not 必须不是指定的情况
GET bank/_search
{
"query": {
"bool": {
"must": [
{
"match": {
"address": "mill"
}
},
{
"match": {
"gender": "M"
}
}
],
"should": [
{
"match": {
"address": "lane"
}
}
],
"must_not": [
{
"match": {
"email": "baluba.com"
}
}
]
}
}
}
address 包含mill,并且gender 是M,如果address 里面有lane 最好不过,但是email 必
须不包含baluba.com
7) filter【结果过滤】
并不是所有的查询都需要产生分数,特别是那些仅用于“filtering”(过滤)的文档。为了不计算分数Elasticsearch 会自动检查场景并且优化查询的执行。
GET bank/_search
{
"query": {
"bool": {
"must": [
{
"match": {
"address": "mill"
}
}
],
"filter": {
"range": {
"balance": {
"gte": 10000,
"lte": 20000
}
}
}
}
}
}
8) term
和match 一样。匹配某个属性的值。全文检索字段用match,其他非text 字段匹配用term。
9) aggregations(执行聚合)
聚合提供了从数据中分组和提取数据的能力。最简单的聚合方法大致等于SQL GROUP BY 和SQL 聚合函数。在Elasticsearch 中,您有执行搜索返回hits(命中结果),并且同时返
回聚合结果,把一个响应中的所有hits(命中结果)分隔开的能力。这是非常强大且有效的,您可以执行查询和多个聚合,并且在一次使用中得到各自的(任何一个的)返回结果,使用
一次简洁和简化的API 来避免网络往返。
-
搜索address 中包含mill 的所有人的年龄分布以及平均年龄,但不显示这些人的详情。
GET bank/_search { "query": { # 查询出包含mill的 "match": { "address": "Mill" } }, "aggs": { #基于查询聚合 "ageAgg": { # 聚合的名字,随便起 "terms": { # 看值的可能性分布 "field": "age", "size": 10 } }, "ageAvg": { "avg": { # 看age值的平均 "field": "age" } }, "balanceAvg": { "avg": { # 看balance的平均 "field": "balance" } } }, "size": 0 # 不看详情 }
size:0 不显示搜索数据 aggs:执行聚合。聚合语法如下 "aggs": { "aggs_name 这次聚合的名字,方便展示在结果集中": { "AGG_TYPE 聚合的类型(avg,term,terms)": {} } },
-
复杂:按照年龄聚合,并且请求这些年龄段的这些人的平均薪资
GET bank/account/_search
{
"query": {
"match_all": {}
},
"aggs": {
"age_avg": {
"terms": {
"field": "age",
"size": 1000
},
"aggs": {
"banlances_avg": {
"avg": {
"field": "balance"
}
}
}
}
},
"size": 1000
}
- 复杂:查出所有年龄分布,并且这些年龄段中M 的平均薪资和F 的平均薪资以及这个年龄段的总体平均薪资
GET bank/_search
{
"query": {
"match_all": {}
},
"aggs": {
"ageAgg": {
"terms": { # 看age分布
"field": "age",
"size": 100
},
"aggs": { # 子聚合
"genderAgg": {
"terms": { # 看gender分布
"field": "gender.keyword" # 注意这里,文本字段应该用.keyword
},
"aggs": { # 子聚合
"balanceAvg": {
"avg": { # 男性的平均
"field": "balance"
}
}
}
},
"ageBalanceAvg": {
"avg": { #age分布的平均(男女)
"field": "balance"
}
}
}
}
},
"size": 0
}
7.5.3 Mapping
1)字段类型
-
text
⽤于全⽂索引,搜索时会自动使用分词器进⾏分词再匹配 -
keyword
不分词,搜索时需要匹配完整的值
2) 映射
3) 新版本改变
1. 创建映射
- 创建索引并指定映射
PUT /my_index
{
"mappings": {
"properties": {
"age": {
"type": "integer"
},
"email": {
"type": "keyword" # 指定为keyword
},
"name": {
"type": "text" # 全文检索。保存时候分词,检索时候进行分词匹配
}
}
}
}
2. 添加新的字段映射
PUT /my_index/_mapping
{
"properties": {
"employee-id": {
"type": "keyword",
"index": false # 字段不能被检索。检索
}
}
}
- 这里的 “index”: false,表明新增的字段不能被检索,只是一个冗余字段。
3. 更新映射
- 对于已经存在的映射字段,我们不能更新。更新必须创建新的索引进行数据迁移
4.数据迁移
- 先创建出new_twitter 的正确映射。然后使用如下方式进行数据迁移
6.0以后写法
POST _reindex 【固定写法】
{
"source": {
"index": "twitter"
},
"dest": {
"index": "new_twitter"
}
}
老版本写法
POST reindex
{
"source":{
"index":"twitter",
"twitter":"twitter"
},
"dest":{
"index":"new_twitters"
}
}
更多见:https://www.elastic.co/guide/en/elasticsearch/reference/7.6/docs-reindex.html
- 将旧索引的type 下的数据进行迁移
POST _reindex
{
"source": {
"index": "twitter",
"type": "tweet"
},
"dest": {
"index": "tweets"
}
}
- 举例 :想要将年龄修改为integer
- 原来
GET /bank/_search
查出
"age":{"type":"long"}
- 先创建新的索引
PUT /newbank
{
"mappings": {
"properties": {
"account_number": {
"type": "long"
},
"address": {
"type": "text"
},
"age": {
"type": "integer"
},
"balance": {
"type": "long"
},
"city": {
"type": "keyword"
},
"email": {
"type": "keyword"
},
"employer": {
"type": "keyword"
},
"firstname": {
"type": "text"
},
"gender": {
"type": "keyword"
},
"lastname": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"state": {
"type": "keyword"
}
}
}
}
- 查看“newbank”的映射:
GET /newbank/_mapping:
能够看到age的映射类型被修改为了integer.
"age":{"type":"integer"}
- 将bank中的数据迁移到newbank中
POST _reindex
{
"source": {
"index": "bank",
"type": "account"
},
"dest": {
"index": "newbank"
}
}
运行输出:
#! Deprecation: [types removal] Specifying types in reindex requests is deprecated.
{
"took" : 768,
"timed_out" : false,
"total" : 1000,
"updated" : 0,
"created" : 1000,
"deleted" : 0,
"batches" : 1,
"version_conflicts" : 0,
"noops" : 0,
"retries" : {
"bulk" : 0,
"search" : 0
},
"throttled_millis" : 0,
"requests_per_second" : -1.0,
"throttled_until_millis" : 0,
"failures" : [ ]
}
- 查看newbank中的数据
GET /newbank/_search
输出
"hits" : {
"total" : {
"value" : 1000,
"relation" : "eq"
},
"max_score" : 1.0,
"hits" : [
{
"_index" : "newbank",
"_type" : "_doc", # 没有了类型
7.5.4 分词
1) 安装ik 分词器
- 注意:不能用默认elasticsearch-plugin install xxx.zip 进行自动安装
- 将es和kibana都设置为虚拟机自启的时候开机启动,以后我们新安装的都这样设置。
- 下载es对应版本的ik分词器,下载地址:https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v7.4.2/elasticsearch-analysis-ik-7.4.2.zip
- 由于此时我们的虚拟机很多功能都还没有下载,比如说(wegt),我们可以采取在主机下载解压后,传到相应文件目录。
- 进入es 容器内部plugins 目录 (外部挂载在/mydata/elasticsearch/plugins目录下)
- 记录踩坑
使用xftp传输文件一直报错。(已经是root用户)
解决办法:修改权限 chmod 777 目的地文件夹。
- 自动安装好(可以进入bin目录看看是否有了)
2)修改linux网络设置
- 老师课件上ping不通外网
vi eth1
GATYWAY=192.168.56.1 一般ip地址是192.168.56.10 直接取第一位
DNS1=114.114.114.114
DNS2=8.8.8.8 //解析地址
3) 测试分词器
4)自定义词库
- 修改/usr/share/elasticsearch/plugins/ik/config中的IKAnalyzer.cfg.xml
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE properties SYSTEM "http://java.sun.com/dtd/properties.dtd">
<properties>
<comment>IK Analyzer 扩展配置</comment>
<!--用户可以在这里配置自己的扩展字典 -->
<entry key="ext_dict"></entry>
<!--用户可以在这里配置自己的扩展停止词字典-->
<entry key="ext_stopwords"></entry>
<!--用户可以在这里配置远程扩展字典 -->
<entry key="remote_ext_dict">http://192.168.56.10/es/fenci.txt</entry>
<!--用户可以在这里配置远程扩展停止词字典-->
<!-- <entry key="remote_ext_stopwords">words_location</entry> -->
</properties>
修改完成后,需要重启elasticsearch容器,否则修改不生效。docker restart elasticsearch
更新完成后,es只会对于新增的数据用更新分词。历史数据是不会重新分词的。如果想要历史数据重新分词,需要执行:
POST my_index/_update_by_query?conflicts=proceed
- 安装nginx
mkdir nginx
# 在主机中创建nginx目录
docker pull nginx:1.10
# 随便启动一个nginx实例,只是为了复制出配置,放到docker里作为镜像的统一配置
docker run -p 80:80 --name nginx -d nginx:1.10
# 把nginx里的东西复制出来
cd /mydata/nginx
docker container cp nginx:/etc/nginx .
然后在外部 /mydata/nginx/nginx 有了一堆文件
mv /mydata/nginx/nginx /mydata/nginx/conf
# 停掉nginx
docker stop nginx
docker rm nginx
# 创建新的nginx,使用刚才复制出来的配置文件
docker run -p 80:80 --name nginx \
-v /mydata/nginx/html:/usr/share/nginx/html \
-v /mydata/nginx/logs:/var/log/nginx \
-v /mydata/nginx/conf:/etc/nginx \
-d nginx:1.10
# 注意一下这个路径映射到了/usr/share/nginx/html,我们在nginx配置文件中是写/usr/share/nginx/html,不是写/mydata/nginx/html
# 设置开机自启
docker update nginx --restart=always
- 测试
mkdir /mydata/nginx/html/es
cd /mydata/nginx/html/es
vim fenci.txt
输入尚硅谷
- 测试http://192.168.56.10/es/fenci.txt
7.6 Elasticsearch-Rest-Client
java操作es有两种方式
7.6.1 9300: TCP
- spring-data-elasticsearch:transport-api.jar;
- springboot版本不同,ransport-api.jar不同,不能适配es版本
- 7.x已经不建议使用,8以后就要废弃
7.6.2 9200: HTTP
有诸多包
-
jestClient: 非官方,更新慢;
-
RestTemplate:模拟HTTP请求,ES很多操作需要自己封装,麻烦;
-
HttpClient:同上;
-
Elasticsearch-Rest-Client:官方RestClient,封装了ES操作,API层次分明,上手简单;
最终选择Elasticsearch-Rest-Client(elasticsearch-rest-high-level-client)
https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-high.html
7.7 SpringBoot整合ElasticSearch
-
创建项目gulimall-search
-
选择依赖web,但不要在里面选择es
- https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-high-compatibility.html 文档地址
7.7.1 导入依赖
- 这里的版本要和所按照的ELK版本匹配。
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.4.2</version>
</dependency>
- 在spring-boot-dependencies中所依赖的ES版本位6.8.5,要改掉
<properties>
<java.version>1.8</java.version>
<elasticsearch.version>7.4.2</elasticsearch.version>
</properties>
- 请求测试项,比如es添加了安全访问规则,访问es需要添加一个安全头,就可以通过requestOptions设置
官方建议把requestOptions创建成单实例
/**
* 1.导入依赖
* 2.编写配置
*/
@Configuration
public class GulimallElasticSearchConfig {
public static final RequestOptions COMMON_OPTIONS;
static {
RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();
// builder.addHeader("Authorization", "Bearer " + TOKEN);
// builder.setHttpAsyncResponseConsumerFactory(
// new HttpAsyncResponseConsumerFactory
// .HeapBufferedResponseConsumerFactory(30 * 1024 * 1024 * 1024));
COMMON_OPTIONS = builder.build();
}
@Bean
public RestHighLevelClient esRestClient(){
RestHighLevelClient client = new RestHighLevelClient(
RestClient.builder(
new HttpHost("192.168.56.10", 9200, "http")));
return client;
}
}
7.7.2 测试
1) 保存数据
-
https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-high-document-index.html
-
保存方式分为同步和异步,异步方式多了个listener回调
/**
* 测试存储数据到es
*/
@Data
class User{
private String username;
private String gender;
private Integer age;
}
@Test
public void indexData() throws IOException {
// 设置索引
IndexRequest indexRequest = new IndexRequest("users");
indexRequest.id("1"); //数据的id
/**
* 后面我们基本都是使用这种方法
*/
User user = new User();
user.setUsername("zhangsan");
user.setAge(18);
user.setGender("男");
String jsonString = JSON.toJSONString(user);
//设置要保存的内容,指定数据和类型
indexRequest.source(jsonString, XContentType.JSON); //要保存的内容
//执行创建索引和保存数据
IndexResponse index = client.index(indexRequest, GulimallElasticSearchConfig.COMMON_OPTIONS);
//提取有用的响应数据
System.out.println(index);
}
2)获取数据
- https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-high-search.html
@ToString
@Data
static class Account {
private int account_number;
private int balance;
private String firstname;
private String lastname;
private int age;
private String gender;
private String address;
private String employer;
private String email;
private String city;
private String state;
}
/**
* 复杂检索 在bank中搜索address中包含mill的所有人的年龄分布以及平均年龄,平均薪资
*/
@Test
public void searchData() throws IOException {
//1.创建检索请求
SearchRequest searchRequest = new SearchRequest();
//1.1)指定索引
searchRequest.indices("bank");
//1.2)构造检索条件
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.query(QueryBuilders.matchQuery("address","Mill"));
//1.2.1)按照年龄分布进行聚合
TermsAggregationBuilder ageAgg = AggregationBuilders.terms("ageAgg").field("age").size(10);
sourceBuilder.aggregation(ageAgg);
//1.2.2)计算平均年龄
AvgAggregationBuilder ageAvg = AggregationBuilders.avg("ageAvg").field("age");
sourceBuilder.aggregation(ageAvg);
//1.2.3)计算平均薪资
AvgAggregationBuilder balanceAvg = AggregationBuilders.avg("balanceAvg").field("balance");
sourceBuilder.aggregation(balanceAvg);
System.out.println("检索条件:"+sourceBuilder);
searchRequest.source(sourceBuilder);
//2.执行检索
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
System.out.println("检索结果:"+searchResponse);
//3. 将检索结果封装为Bean
SearchHits hits = searchResponse.getHits();
SearchHit[] searchHits = hits.getHits();
for (SearchHit searchHit : searchHits) {
String sourceAsString = searchHit.getSourceAsString();
Account account = JSON.parseObject(sourceAsString, Account.class);
System.out.println(account);
}
//4.获取聚合信息
Aggregations aggregations = searchResponse.getAggregations();
Terms ageAgg1 = aggregations.get("ageAgg");
//Buckets分析信息
for (Terms.Bucket bucket : ageAgg1.getBuckets()) {
String keyAsString = bucket.getKeyAsString();
System.out.println("年龄:"+keyAsString+"==>"+bucket.getDocCount());
}
Avg ageAvg1 = aggregations.get("ageAvg");
System.out.println("平均年龄:"+ageAvg1.getValue());
Avg balanceAvg1 = aggregations.get("balanceAvg");
System.out.println("平均薪资:"+balanceAvg1.getValue());
}
1. 检索条件:{"query":{"match":{"address":{"query":"Mill","operator":"OR","prefix_length":0,"max_expansions":50,"fuzzy_transpositions":true,"lenient":false,"zero_terms_query":"NONE","auto_generate_synonyms_phrase_query":true,"boost":1.0}}},"aggregations":{"ageAgg":{"terms":{"field":"age","size":10,"min_doc_count":1,"shard_min_doc_count":0,"show_term_doc_count_error":false,"order":[{"_count":"desc"},{"_key":"asc"}]}},"ageAvg":{"avg":{"field":"age"}},"balanceAvg":{"avg":{"field":"balance"}}}}
检索结果:{"took":27,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":4,"relation":"eq"},"max_score":5.4032025,"hits":[{"_index":"bank","_type":"account","_id":"970","_score":5.4032025,"_source":{"account_number":970,"balance":19648,"firstname":"Forbes","lastname":"Wallace","age":28,"gender":"M","address":"990 Mill Road","employer":"Pheast","email":"forbeswallace@pheast.com","city":"Lopezo","state":"AK"}},{"_index":"bank","_type":"account","_id":"136","_score":5.4032025,"_source":{"account_number":136,"balance":45801,"firstname":"Winnie","lastname":"Holland","age":38,"gender":"M","address":"198 Mill Lane","employer":"Neteria","email":"winnieholland@neteria.com","city":"Urie","state":"IL"}},{"_index":"bank","_type":"account","_id":"345","_score":5.4032025,"_source":{"account_number":345,"balance":9812,"firstname":"Parker","lastname":"Hines","age":38,"gender":"M","address":"715 Mill Avenue","employer":"Baluba","email":"parkerhines@baluba.com","city":"Blackgum","state":"KY"}},{"_index":"bank","_type":"account","_id":"472","_score":5.4032025,"_source":{"account_number":472,"balance":25571,"firstname":"Lee","lastname":"Long","age":32,"gender":"F","address":"288 Mill Street","employer":"Comverges","email":"leelong@comverges.com","city":"Movico","state":"MT"}}]},"aggregations":{"lterms#ageAgg":{"doc_count_error_upper_bound":0,"sum_other_doc_count":0,"buckets":[{"key":38,"doc_count":2},{"key":28,"doc_count":1},{"key":32,"doc_count":1}]},"avg#ageAvg":{"value":34.0},"avg#balanceAvg":{"value":25208.0}}}
GulimallSearchApplicationTests.Account(account_number=970, balance=19648, firstname=Forbes, lastname=Wallace, age=28, gender=M, address=990 Mill Road, employer=Pheast, email=forbeswallace@pheast.com, city=Lopezo, state=AK)
GulimallSearchApplicationTests.Account(account_number=136, balance=45801, firstname=Winnie, lastname=Holland, age=38, gender=M, address=198 Mill Lane, employer=Neteria, email=winnieholland@neteria.com, city=Urie, state=IL)
GulimallSearchApplicationTests.Account(account_number=345, balance=9812, firstname=Parker, lastname=Hines, age=38, gender=M, address=715 Mill Avenue, employer=Baluba, email=parkerhines@baluba.com, city=Blackgum, state=KY)
GulimallSearchApplicationTests.Account(account_number=472, balance=25571, firstname=Lee, lastname=Long, age=32, gender=F, address=288 Mill Street, employer=Comverges, email=leelong@comverges.com, city=Movico, state=MT)
年龄:38==>2
年龄:28==>1
年龄:32==>1
平均年龄:34.0
平均薪资:25208.0
检索条件:
{
"query": {
"match": {
"address": {
"query": "Mill",
"operator": "OR",
"prefix_length": 0,
"max_expansions": 50,
"fuzzy_transpositions": true,
"lenient": false,
"zero_terms_query": "NONE",
"auto_generate_synonyms_phrase_query": true,
"boost": 1.0
}
}
},
"aggregations": {
"ageAgg": {
"terms": {
"field": "age",
"size": 10,
"min_doc_count": 1,
"shard_min_doc_count": 0,
"show_term_doc_count_error": false,
"order": [{
"_count": "desc"
}, {
"_key": "asc"
}]
}
},
"ageAvg": {
"avg": {
"field": "age"
}
},
"balanceAvg": {
"avg": {
"field": "balance"
}
}
}
}
检索结果: {
"took": 27,
"timed_out": false,
"_shards": {
"total": 1,
"successful": 1,
"skipped": 0,
"failed": 0
},
"hits": {
"total": {
"value": 4,
"relation": "eq"
},
"max_score": 5.4032025,
"hits": [{
"_index": "bank",
"_type": "account",
"_id": "970",
"_score": 5.4032025,
"_source": {
"account_number": 970,
"balance": 19648,
"firstname": "Forbes",
"lastname": "Wallace",
"age": 28,
"gender": "M",
"address": "990 Mill Road",
"employer": "Pheast",
"email": "forbeswallace@pheast.com",
"city": "Lopezo",
"state": "AK"
}
}, {
"_index": "bank",
"_type": "account",
"_id": "136",
"_score": 5.4032025,
"_source": {
"account_number": 136,
"balance": 45801,
"firstname": "Winnie",
"lastname": "Holland",
"age": 38,
"gender": "M",
"address": "198 Mill Lane",
"employer": "Neteria",
"email": "winnieholland@neteria.com",
"city": "Urie",
"state": "IL"
}
}, {
"_index": "bank",
"_type": "account",
"_id": "345",
"_score": 5.4032025,
"_source": {
"account_number": 345,
"balance": 9812,
"firstname": "Parker",
"lastname": "Hines",
"age": 38,
"gender": "M",
"address": "715 Mill Avenue",
"employer": "Baluba",
"email": "parkerhines@baluba.com",
"city": "Blackgum",
"state": "KY"
}
}, {
"_index": "bank",
"_type": "account",
"_id": "472",
"_score": 5.4032025,
"_source": {
"account_number": 472,
"balance": 25571,
"firstname": "Lee",
"lastname": "Long",
"age": 32,
"gender": "F",
"address": "288 Mill Street",
"employer": "Comverges",
"email": "leelong@comverges.com",
"city": "Movico",
"state": "MT"
}
}]
},
"aggregations": {
"lterms#ageAgg": {
"doc_count_error_upper_bound": 0,
"sum_other_doc_count": 0,
"buckets": [{
"key": 38,
"doc_count": 2
}, {
"key": 28,
"doc_count": 1
}, {
"key": 32,
"doc_count": 1
}]
},
"avg#ageAvg": {
"value": 34.0
},
"avg#balanceAvg": {
"value": 25208.0
}
}
}
-
mysql GROUP_CONCAT(DISTINCT ssav.
attr_value
) -
https://www.yiibai.com/mysql/group_concat.html
-
thymeleaf中的# 和$符号这些区别。
${} 变量表达式:用于访问容器上下文环境中的变量;
**#{} ** 消息表达式(井号表达式,资源表达式):通常与th:text属性一起使用,指明声明了th:text的标签的文本是#{}中的key所对应的value,而标签内的文本将不会显示。
-
循环遍历的几种方式
-
Map
-
mybatisplus中的foreach #{}
-
类和对象构造方法
Ctrl + w 快速选中一个单词
我修改成了 alt +w
-
方法引用 SkuInfoEntity::getSkuId
-
jquery发送的ajax请求
-
函数式接口
-
thymeleaf中取出地址栏中的数据 ${} controller中是model,放入model中
-
springmvc 中的model
-
threadLocal
多线程访问同一个共享变量的时候容易出现并发问题,特别是多个线程对一个变量进行写入的时候,为了保证线程安全,一般使用者在访问共享变量的时候需要进行额外的同步措施才能保证线程安全性。ThreadLocal是除了加锁这种同步方式之外的一种保证一种规避多线程访问出现线程不安全的方法,当我们在创建一个变量后,如果每个线程对其进行访问的时候访问的都是线程自己的变量这样就不会存在线程不安全问题。
ThreadLocal是JDK包提供的,它提供线程本地变量,如果创建一乐ThreadLocal变量,那么访问这个变量的每个线程都会有这个变量的一个副本,在实际多线程操作的时候,操作的是自己本地内存中的变量,从而规避了线程安全问题,如下图所示
-
为了在页面展示信息,我们可以将其放入请求域中,model…
给model中放的数据,如果是转发,会放在请求域中,如果是重定向,会自动拼接在地址上。
-
单元测试写了,没有运行按钮
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-aDbqB2CD-1673534545234)(null)]
8 分布式事务
前言 本地事务在分布式下的问题
本地事务会失效不回滚的两种情况:
①锁库存假失败,由于网络原因导致连接超时,但是锁库存已经操作成功。此时,订单数据回滚而锁库存数据没有回滚。
②其它远程服务调用失败,订单数据回滚,但是已经执行成功的远程服务调用的数据库数据无法回滚
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-m6SRb06m-1673534544691)(null)]
8.1 本地事务
8.1.1 事务的基本性质
数据库事务的几个特性:原子性(Atomicity )、一致性( Consistency )、隔离性或独立性( Isolation)
和持久性(Durabilily),简称就是ACID;
- 原子性:一系列的操作整体不可拆分,要么同时成功,要么同时失败
- 一致性:数据在事务的前后,业务整体一致。
- 转账。A:1000;B:1000; 转200 事务成功; A:800 B:1200
- 隔离性:事务之间互相隔离。
- 持久性:一旦事务成功,数据一定会落盘在数据库。
在以往的单体应用中,我们多个业务操作使用同一条连接操作不同的数据表,一旦有异常,我们可以很容易的整体回滚;
Business:我们具体的业务代码
Storage:库存业务代码;扣库存
Order:订单业务代码;保存订单
Account:账号业务代码;减账户余额
比如买东西业务,扣库存,下订单,账户扣款,是一个整体;必须同时成功或者失败。
一个事务开始,代表以下的所有操作都在同一个连接里面;
8.1.2 事务的隔离级别
- READ UNCOMMITTED(读未提交)
该隔离级别的事务会读到其它未提交事务的数据,此现象也称之为脏读。 - READ COMMITTED(读已提交)
一个事务可以读取另一个已提交的事务,多次读取会造成不一样的结果,此现象称为不可重复读问题,Oracle 和SQL Server 的默认隔离级别。 - REPEATABLE READ(可重复读)
该隔离级别是MySQL 默认的隔离级别,在同一个事务里,select 的结果是事务开始时时间点的状态,因此,同样的select 操作读到的结果会是一致的,但是,会有幻读现象。MySQL
的InnoDB 引擎可以通过next-key locks 机制(参考下文"行锁的算法"一节)来避免幻读。 - SERIALIZABLE(序列化)
在该隔离级别下事务都是串行顺序执行的,MySQL 数据库的InnoDB 引擎会给读操作隐式加一把读共享锁,从而避免了脏读、不可重读复读和幻读问题。
8.1.3 事务的传播行为
1、PROPAGATION_REQUIRED:如果当前没有事务,就创建一个新事务,如果当前存在事务,就加入该事务,该设置是最常用的设置。
2、PROPAGATION_SUPPORTS:支持当前事务,如果当前存在事务,就加入该事务,如果当前不存在事务,就以非事务执行。
3、PROPAGATION_MANDATORY:支持当前事务,如果当前存在事务,就加入该事务,如果当前不存在事务,就抛出异常。
4、PROPAGATION_REQUIRES_NEW:创建新事务,无论当前存不存在事务,都创建新事务。
5、PROPAGATION_NOT_SUPPORTED:以非事务方式执行操作,如果当前存在事务,就把当前事务挂起。
6、PROPAGATION_NEVER:以非事务方式执行,如果当前存在事务,则抛出异常。
7、PROPAGATION_NESTED:如果当前存在事务,则在嵌套事务内执行。如果当前没有事务,
则执行与PROPAGATION_REQUIRED 类似的操作。
8.1.4 SpringBoot 事务关键点
1、事务的自动配置
TransactionAutoConfiguration
2、事务的坑
在同一个类里面,编写两个方法,内部调用的时候,会导致事务设置失效。原因是没有用到代理对象的缘故。
解决:
0)、导入spring-boot-starter-aop
1)、@EnableTransactionManagement(proxyTargetClass = true)
2)、@EnableAspectJAutoProxy(exposeProxy=true)
3)、AopContext.currentProxy() 调用方法
8.1.5 实际案例剖析
案例一:
方法B()和方法A()共用一个事务,方法C则创建一个新事务,若出现异常则方法B()和方法A()会回滚,方法C()则不会
案例二:
方法B()设置了事务的超时时间,但是方法B()和方法A()共用方法A()的事务,因此,以方法A设置的超时时间为准。
SpringBoot事务的坑
事务失效的原因:绕过了代理
①未启用事务
@EnableTransactionManagement 注解用来启用spring事务自动管理事务的功能,这个注解千万不要忘记写了。
② 方法不是public类型的。
@Transactional 可以用在类上、接口上、public方法上,如果将@Trasactional用在了非public方法上,事务将无效。
③数据源未配置事务管理器
@Bean
public PlatformTransactionManager transactionManager(DataSource dataSource) {
return new DataSourceTransactionManager(dataSource);
}
④自身调用问题
spring是通过aop的方式,对需要spring管理事务的bean生成了代理对象,然后通过代理对象拦截了目标方法的执行,在方法前后添加了事务的功能,所以必须通过代理对象调用目标方法的时候,事务才会起效。
看下面代码,大家思考一个问题:当外部直接调用m1的时候,m2方法的事务会生效么?
@Component
public class UserService {
public void m1(){
this.m2();
}
@Transactional
public void m2(){
//执行db操作
}
}
显然不会生效,因为m1中通过this的方式调用了m2方法,而this并不是代理对象,this.m2()不会被事务拦截器,所以事务是无效的,如果外部直接调用通过UserService这个bean来调用m2方法,事务是有效的,上面代码可以做一下调整,如下,@1在UserService中注入了自己,此时会产生更为严重的问题:循环依赖
@Component
public class UserService {
@Autowired //@1
private UserService userService;
public void m1() {
this.userService.m2();
}
@Transactional
public void m2() {
//执行db操作
}
}
⑤ 异常类型错误
spring事务回滚的机制:对业务方法进行try catch,当捕获到有指定的异常时,spring自动对事务进行回滚,那么问题来了,哪些异常spring会回滚事务呢?
并不是任何异常情况下,spring都会回滚事务,默认情况下,RuntimeException和Error的情况下,spring事务才会回滚。
也可以自定义回滚的异常类型(需继承RuntimeException):
@Transactional(rollbackFor = {异常类型列表})
⑥异常被吞了
当业务方法抛出异常,spring感知到异常的时候,才会做事务回滚的操作,若方法内部将异常给吞了,那么事务无法感知到异常了,事务就不会回滚了。
如下代码,事务操作2发生了异常,但是被捕获了,此时事务并不会被回滚
@Transactional
public void m1(){
事务操作1
try{
事务操作2,内部抛出了异常
}catch(Exception e){
}
}
⑦业务和spring事务代码必须在一个线程中
spring事务实现中使用了ThreadLocal,ThreadLocal大家应该知道吧,可以实现同一个线程中数据共享,必须是同一个线程的时候,数据才可以共享,这就要求业务代码必须和spring事务的源码执行过程必须在一个线程中,才会受spring事务的控制,比如下面代码,方法内部的子线程内部执行的事务操作将不受m1方法上spring事务的控制,这个大家一定要注意
@Transactional
public void m1() {
new Thread() {
一系列事务操作
}.start();
}
解决方案:
本地事务失效的原因:同一个对象内事务方法互相调用默认失效,原因绕过了代理对象,事务使用代理对象来控制
解决:使用代理对象来调用事务方法
方法B()和方法C()的事务属性设置会失效,原因是绕过了代理,SpringBoot的事务是通过AOP代理实现的
解决事务失效的步骤:
1.引入aspectj依赖
<!-- 导入aop依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
aop引入了 aspectj依赖
2. 开启aspectj动态代理功能,以后所有的动态代理都是aspectj创建的。通过设置exposeProxy暴露代理对象
3. 本类互调用对象
@Transactional(timeout = 30)
public void A(){
// B();
// C();
OrderServiceImpl service =(OrderServiceImpl)AopContext.currentProxy();
service.B();
service.C();
int i = 10/0;
}
@Transactional(propagation = Propagation.REQUIRED,timeout = 20)
public void B(){
}
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void C(){
}
*本地事务失效问题 * 同一个对象内事务方法互调默认失效,原因:绕过了代理对象,事务使用代理对象来控制的 * 解决:使用代理对象来调用事务方法 * 1)、引入spring-boot-starter-aop:引入了aspectj * 2)、@EnableAspectJAutoProxy(exposeProxy = true):开启 aspectj 动态代理功能。以后所有的动态代理都是aspectj创建的(即使没有接口也会创建代理对象) * exposeProxy = true:对外暴露代理对象 * 3)、本类互调用对象 * OrderServiceImpl orderService = (OrderServiceImpl) AopContext.currentProxy(); * orderService.b(); * orderService.c(); *
以下是参考网友的:
ps:可以参考:
分布式事务:
订单服务下订单---------
库存服务锁库存---------->分布式事务
用户服务扣减积分-------/
事务保证:
1、订单服务异常,库存锁定不运行,全部回滚,撤销操作
2、库存服务事务自治,锁定失败全部回滚,订单感受到,继续回滚
3、库存服务锁定成功了,但是网络原因返回数据途中问题?
4、库存服务锁定成功了,库存服务下面的逻辑发生故障,订单回滚了,怎么处理?
利用消息队列实现最终一致
库存服务锁定成功后发给消息队列消息(当前库存工作单),过段时间自动解锁,解锁时先查询订单的支付状态,解锁成功修改库存工作单详情项状态为已解锁。
- 1、远程服务假失败:远程服务其实成功了,由于网络故障等没有返回导致:订单回滚,库存却扣减
- 2、远程服务执行完成,下面的其他方法出现问题导致:已执行的远程请求,肯定不能回滚
事务传播问题中,传播后事务设置还是原来的,如果不想用原来设置,必须new事务
注意同类中调用的话,被调用事务会失效,原因在于aop。事务基于代理,同对象的方法动态代理都是同一个。解决方案是使用代理对象调用。引用aop-starter后,使用aspectJ,开启AspectJ动态代理,原来默认使用的是jdk动态代理。
使用@EnableAspectJAutoProxy(exposeProxy=true)后,就取代了jdk动态代理。它没有接口也可以创建动态代理。设置true是为了对外暴露代理对象。
AopContext.currentProxy()然后强转,就是当前代理对象。
public interface AService {
public void a();
public void b();
}
@Service()
public class AServiceImpl1 implements AService{
@Transactional(propagation = Propagation.REQUIRED)
public void a() {
this.b();
}
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void b() {
}
}
此处的this指向目标对象,因此调用this.b()将不会执行b事务切面,即不会执行事务增强,
因此b方法的事务定义“@Transactional(propagation = Propagation.REQUIRES_NEW)”将不会实施,
即结果是b和a方法的事务定义是一样的(我们可以看到事务切面只对a方法进行了事务增强,没有对b方法进行增强)
Q1:b中的事务会不会生效?
A1:不会,a的事务会生效,b中不会有事务,因为a中调用b属于内部调用,没有通过代理,所以不会有事务产生。
Q2:如果想要b中有事务存在,要如何做?
A2:<aop:aspectj-autoproxy expose-proxy=“true”> ,设置expose-proxy属性为true,将代理暴露出来,使用AopContext.currentProxy()获取当前代理,将this.b()改为((UserService)AopContext.currentProxy()).b()
解决方案:
public void a() {
((AService) AopContext.currentProxy()).b();//即调用AOP代理对象的b方法即可执行事务切面进行事务增强
}
8.2 分布式事务
8.2.1 为什么有分布式事务
分布式系统经常出现的异常
机器宕机、网络异常、消息丢失、消息乱序、数据错误、不可靠的TCP、存储数据丢失…
分布式事务是企业集成中的一个技术难点,也是每一个分布式系统架构中都会涉及到的一个东西,特别是在微服务架构中,几乎可以说是无法避免。
8.2.2 CAP 定理
1、CAP 定理
CAP 原则又称CAP 定理,指的是在一个分布式系统中
-
一致性(Consistency):
在分布式系统中的所有数据备份,在同一时刻是否同样的值。(等同于所有节点访问同一份最新的数据副本)
-
可用性(Availability)
在集群中一部分节点故障后,集群整体是否还能响应客户端的读写请求。(对数据更新具备高可用性) -
分区容错性(Partition tolerance)
大多数分布式系统都分布在多个子网络。每个子网络就叫做一个区(partition)。
分区容错的意思是,区间通信可能失败。比如,一台服务器放在中国,另一台服务器放在美国,这就是两个区,它们之间可能无法通信。
CAP 原则指的是,这三个要素最多只能同时实现两点,不可能三者兼顾,分区容错必须都要
- CP要求一致性(有一个没同步好就不可用)
- AP要求高可用
一般来说,分区容错无法避免,因此可以认为CAP 的P 总是成立。CAP 定理告诉我们,
剩下的C 和A 无法同时做到。
分布式系统中实现一致性的raft 算法、paxos
动画详解:
选举与同步理论
raft是一个实现分布式一致性的协议
结点的状态:
- follower
- candidate
- leader
选举leader:
- 默认都以follower状态启动,follower监听不到leader,就称为一个candidate
- 投票给自己,然后告诉其他人,同时也收到别人的投票信息。根据投票信息和投票信息里带的信息(如那个节点里的数据)
- 收到投票后,改选一个自己觉得最靠谱的。某一节点收到票数超过一半就变成leader
raft有两个超时时间控制领导选举:
选举超时:从follower到candidate的时间,150ms-300ms(自旋时间),这个时间段内没收到leader的心跳就变为候选者。
- 自旋时间结束后变成candidate,开始一轮新的选举(老师上课举的例子是)
- 投出去票后重新计时自旋
- leader就发送追加日志给follower,follower就正常
消息发送的,心跳时间:如10ms,leader收到投票后,下一次心跳时就带上消息,follower收到消息后重置选举时间
- leader宕机,follower收不到心跳,开始新的选举
写数据:
- 接下来所有的数据都要先给leader,leader派发给follower
- 比如领导收到信息5后,领导先在leader的log中写入变化set 5。(上面的动态红颜色代表没提交),此时5还没提交,而是改了leader的log后,
- leader下一次心跳时,顺便带着信息让follower也去改变follower的log,follower写入日志成功后,发送确认ack 5给leader,
- leader收到大多数的ack后,leader就自己正式写入数据,然后告诉follower提交写入硬盘/内存吧(这个过程和响应客户端是同时的)。这个过程叫做日志复制(也有过半机制)
- 然后leader响应说集群写入好了
如果有的结点消息滞后了:
5台机器因为局域网隔离又分为3、2生成两个leader怎么办:
对于1,2结点那个leader,更新log后收不到大多数的ack(得超过1个ack),所以改log不成功,一直保存不成功
对于345结点的leader:收到消息后更新log并且收到ack过半且超过1个,成功保存。
此时网络又通了,以更高轮选举的leader为主,退位一个leader。那1,2结点日志都回滚,同步新leader的log。这样就都一致性了
另外注意:集群一般都是单数,因为有过半机制。比如原来集群6个机器,分为2半后,各3个,选leader时谁都拿不到6/2+1=4个投票,所以都没有leader
更多动画(可以自己选择宕机情况)raft.github.io
但一般都是保证AP,舍弃C
自我总结Raft算法的原理说明:
Raft算法的原理说明:
首先,在Raft中一个节点有三种角色:①追随者(Follower)②候选人(Candidate)③领导者(Leader)
一开始,所有节点都是追随者状态,如果没有监听到领导者给他们发的信息,他们可以变成候选人(他觉得没有人命令他了,他就可以当领导了),候选人将会给追随者发起投票选举,追随者们将会投票给候选人,如果候选人得到了大多数票则它将会成为领导者。这个过程被成为:领导选举。
追随者是如何成为候选人的呢?首先,节点有一个自旋超时时间(150ms-300ms),谁自旋结束的快谁就是候选者,候选人发起选举【先投自己一票,然后将投票请求发送给其他节点】,如果节点在此轮选举中还没有投过票【没有投票给其他人】,那么节点将会投票给它【投票成功:节点将重置自己的选举超时时间】,一旦候选人收到大多数投票那么它将成为领导者。成为领导者之后则开始心跳联络,定期向节点发出我还在的消息,节点回复收到,这种状态直到领导者挂掉为止。
两个时间控制选取领导选举:
①自旋时间【选举超时】:是随从想要变成候选者的这段时间【想不想当领导】,一般是 150ms - 300ms ,我们一般称为节点的自旋时间。
②心跳时间【会发送日志】:消息的发送时间。领导者每隔一段时间就要给随从发送消息【相当于维护一个心跳连接】,节点收到消息之后就会自动重置自己的选举时间【自旋时间】,继而整个领导者节点就会和其他节点一直维护这个心跳连接。【心跳时间较短,否则超过300ms就会变成候选者】
投票如果失败,会一直投票,直到选到一个领导者。分布式系统中,为了保证一致性,所有改变将需要听从领导者,假设客户端发来一条 SET 5 命令,首先,领导者会将这条命令写入到log中(此时是没有提交的状态 uncommitted:意味着其他人从我这读数据还是读不到 5 这个数据的),然后会将 SET 5 这个命令记录下的日志发送给它的追随者【在下一次心跳时发送出去】,追随者们也收到了 SET 5 这个日志并写入,领导者接收到大多数节点的响应回复–已经将这条命令写入log中了【注意,此时所有节点日志中的这条命令都是uncommited的】,之后领导就会提交(committed),现在领导的数据就变成了 5,然后在下一次心跳时间领导就会响应给客户端保存成功并且通知它的随从节点:我已经提交了,你们也可以提交了,那么其他节点都是commited的,数据就变成了 5,这个过程被成为:日志复制。
日志复制总结:
领导者需要将所有的改变复制发送给其他节点,【使用日志的方式发送,且是在每一次心跳的时候发送出去的】,其他节点同步改变日志。
日志复制过程在分区中的体现【容忍分区错误】:由于网络原因,A、B被划分为1区,C、D、E被划分为2区,1区和2区之间不能通信,A原来是领导者所以在1区它还是领导者,2区经过多轮选举选出了新的领导者,现在有Client1给1区发 SET 10 的命令,A保存命令至日志然后通知B也保存日志,但是通知没有得到大多数节点的回复因此是uncommited的状态,Client2给2区发 SET 100 命令,2区领导者保存命令至日志,同时通知其它节点页保存命令至日志并且收到大多数节点的回复,2区领导者将会commit并会通知其它节点也去commit的。最终,当1区和2区的通信恢复了,由于2区的领导者是经过多轮选举【看版本号】选出的所以它成为了所有节点的领导者,原来1区的领导者就变成了追随者,1区A、B节点发现跟领导者的日志不一致,马上回滚还未提交的日志并匹配新领导者的日志【同步日志】,至此所有节点的数据是一致的。
Raft算法总结:通过领导选举和日志复制保证分布式系统的一致性,即使有分区错误,也能容忍,并保证一致性。
CP面临的问题:
对于多数大型互联网应用场景,主机众多、部署分散,而且现在的集群规模越来越大,所以节点故障、网络故障是常态,而且要保证服务可用性达到99.9999999(N个9),即保证P和A,舍弃C。舍弃C的含义是:保证数据的最终一致性而不是去追求强一致性。
8.2.3 BASE 理论
是对CAP 理论的延伸,思想是即使无法做到强一致性(CAP 的一致性就是强一致性),但可以采用适当的采取弱一致性,即最终一致性。
BASE 是指
-
基本可用(Basically Available)
- 基本可用是指分布式系统在出现故障的时候,允许损失部分可用性(例如响应时间、功能上的可用性)。需要注意的是,基本可用绝不等价于系统不可用。
- 响应时间上的损失:正常情况下搜索引擎需要在0.5 秒之内返回给用户相应的查询结果,但由于出现故障(比如系统部分机房发生断电或断网故障),查询结果的响应时间增加到了1~2 秒。
- 功能上的损失:购物网站在购物高峰(如双十一)时,为了保护系统的稳定性,部分消费者可能会被引导到一个降级页面。
- 基本可用是指分布式系统在出现故障的时候,允许损失部分可用性(例如响应时间、功能上的可用性)。需要注意的是,基本可用绝不等价于系统不可用。
-
软状态( Soft State)
- 软状态是指允许系统存在中间状态,而该中间状态不会影响系统整体可用性。分布式存储中一般一份数据会有多个副本,允许不同副本同步的延时就是软状态的体现。mysql replication 的异步复制也是一种体现。
-
最终一致性( Eventual Consistency)
- 最终一致性是指系统中的所有数据副本经过一定时间后,最终能够达到一致的状态。弱一致性和强一致性相反,最终一致性是弱一致性的一种特殊情况。
8.2.4 强一致性、弱一致性、最终一致性
从客户端角度,多进程并发访问时,更新过的数据在不同进程如何获取的不同策略,决定了不同的一致性。对于关系型数据库,要求更新过的数据能被后续的访问都能看到,这是强一致性(本地事务就是强一致性,要么成功,要么失败)。如果能容忍后续的部分或者全部访问不到,则是弱一致性。如果经过一段时间后要求能访问到更新后的数据,则是最终一致性。
8.2.5 分布式事务几种方案
1)、2PC 模式
数据库支持的2PC【2 phase commit 二阶提交】,又叫做XA Transactions。MySQL 从5.5 版本开始支持,SQL Server 2005 开始支持,Oracle 7 开始支持。
其中,XA 是一个两阶段提交协议,该协议分为以下两个阶段:
- 第一阶段:事务协调器要求每个涉及到事务的数据库预提交(precommit)此操作,并反映是否可以提交.
- 第二阶段:事务协调器要求每个数据库提交数据。
其中,如果有任何一个数据库否决此次提交,那么所有数据库都会被要求回滚它们在此事务中的那部分信息。
如图所示,如果有订单服务和库存服务要求分布式事务,要求有一个总的事务管理器
总的事务管理让事务分为两个阶段,
第一个阶段是预备(log)。
第二个阶段是正式提交(commit)
总事务管理器接收到两个服务都预备好了log(收到ack),就告诉他们commit如果有一个没准备好,就回滚所有人。
- XA 协议比较简单,而且一旦商业数据库实现了XA 协议,使用分布式事务的成本也比较低。
- XA 性能不理想,特别是在交易下单链路,往往并发量很高,XA 无法满足高并发场景
- XA 目前在商业数据库支持的比较理想,在mysql 数据库中支持的不太理想,mysql 的XA 实现,没有记录prepare 阶段日志,主备切换回导致主库与备库数据不一致。
- 许多nosql 也没有支持XA,这让XA 的应用场景变得非常狭隘。
- 也有3PC,引入了超时机制(无论协调者还是参与者,在向对方发送请求后,若长时间未收到回应则做出相应处理)
2)、柔性事务-TCC 事务补偿型方案
刚性事务:遵循ACID 原则,强一致性。
柔性事务:遵循BASE 理论,最终一致性;
与刚性事务不同,柔性事务允许一定时间内,不同节点的数据不一致,但要求最终一致。
一阶段prepare 行为:调用自定义的prepare 逻辑。
二阶段commit 行为:调用自定义的commit 逻辑。
二阶段rollback 行为:调用自定义的rollback 逻辑。
Try代码模块中需要Coder自己编写业务逻辑,Confirm代码块中会提交数据(例如:加2),那么在Cancel中则需要Coder编写回滚逻辑(例如:减2)
所谓TCC 模式,是指支持把自定义的分支事务纳入到全局事务的管理中。
3)、柔性事务-最大努力通知型方案
按规律进行通知,不保证数据一定能通知成功,但会提供可查询操作接口进行核对。这种方案主要用在与第三方系统通讯时,比如:调用微信或支付宝支付后的支付结果通知。这种方案也是结合MQ 进行实现,例如:通过MQ 发送http 请求,设置最大通知次数。达到通知次数后即不再通知。
案例:银行通知、商户通知等(各大交易业务平台间的商户通知:多次通知、查询校对、对账文件),支付宝的支付成功异步回调。
大业务调用订单,库存,积分。最后积分失败,则一遍遍通知他们回滚
让子业务监听消息队列
如果收不到就重新发
4)、柔性事务-可靠消息+最终一致性方案(异步确保型)
实现:业务处理服务在业务事务提交之前,向实时消息服务请求发送消息,实时消息服务只记录消息数据,而不是真正的发送。业务处理服务在业务事务提交之后,向实时消息服务确认发送。只有在得到确认发送指令后,实时消息服务才会真正发送。
8.2.6 seata解决分布式事务问题
1、Seata简单概念
Seata使用的是2PC的模式
Seata快速开始传送门:Seata 快速开始
- Seata是什么
Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式,为用户打造一站式的分布式解决方案。
- 专业术语
TC (Transaction Coordinator) - 事务协调者
维护全局和分支事务的状态,驱动全局事务提交或回滚。
TM (Transaction Manager) - 事务管理器
定义全局事务的范围:开始全局事务、提交或回滚全局事务。
RM (Resource Manager) - 资源管理器
管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。
2、SEATA 的分布式交易解决方案
Seata的工作模式: 首先,TM会告诉TC全局事务开始了,由各个事务分支向TC汇报事务的状态,是成功还是回滚。如果有一个事务分支汇报回滚,则通过TC让之前提交的事务都会回滚,回滚的依赖于Seata中的Magic表,用于记录提交之前的版本和数据。
2、Seata分布式事务体验
要执行下单,
- TM告诉TC开启一个全局事务。
- storage注册分支事务,实时向TC汇报分支状态
- account失败,告诉TC失败了,TC回滚全部全局事务。
我们只需要使用一个 @GlobalTransactional
注解在业务方法上:
@GlobalTransactional
public void purchase(String userId, String commodityCode, int orderCount) {
......
}
开启Seata分布式事务的步骤:
1.为每一个微服务创建undo_log
我们有业务步骤,但是SEATA AT
模式需要 UNDO_LOG
表,记录之前执行的操作。每个涉及的子系统对应的数据库都要新建表。
-- 注意此处0.3.0+ 增加唯一索引 ux_undo_log
CREATE TABLE `undo_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`branch_id` bigint(20) NOT NULL,
`xid` varchar(100) NOT NULL,
`context` varchar(128) NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int(11) NOT NULL,
`log_created` datetime NOT NULL,
`log_modified` datetime NOT NULL,
`ext` varchar(100) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
2.导入Seata依赖
注意:这里老师直接将Seata 依赖导入common中,实际测试时,所有项目都启动不了
解决方案:
如果是给公共服务中引入的 seata 依赖,就需要给其他不使用 seata的服务中配置如下内容:
排除 common 服务里面的 seata 依赖,这里因为我们暂时只 给 订单、库存服务里面使用,其他微服务都需要进行如下操作:排除seata 依赖。
此处我采用给需要的服务引入 seata 依赖就行。这里暂时就向 库存服务和 订单服务里面引入 seata 依赖。
这里我们使用 2.1.0 的版本 ,使用 seata-server 0.7.1
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
<version>2.1.0.RELEASE</version>
</dependency>
seata:0.7.1 下载地址:https://github.com/seata/seata/releases/download/v0.7.1/seata-server-0.7.1.zip
seata快速开始:https://seata.io/zh-cn/docs/user/quickstart.html
3、修改seata-server的配置文件
将registry.conf中的type修改为nacos并修改serverAddr为本机注册中心地址
此时,启动 seata-server,可以在 nacos中看到一个 serverAddr服务注册进来。
- nacos地址
- 查看服务
4.所有想要用到分布式事务的微服务使用seata DataSourceProxy代理自己的数据源
- 注入 DataSourceProxy
由于Seata通过代理数据源现分支持事务,如果没有注册,事务无法成功返回滚
@Configuration
public class DataSourceConfig {
@Bean
@ConfigurationProperties(prefix = "spring.datasource")
public DruidDataSource druidDataSource() {
return new DruidDataSource();
}
/**
* 需要将 DataSourceProxy 设置为主数据源,否则事务无法回滚
*
* @param druidDataSource The DruidDataSource
* @return The default datasource
*/
@Primary
@Bean("dataSource")
public DataSource dataSource(DruidDataSource druidDataSource) {
return new DataSourceProxy(druidDataSource);
}
}
与上面配置数据源的方式等价,这么配置
注意细节:高版本之后无需配置数据源,这步可忽略
@Configuration
public class MySeataConfig {
@Autowired
DataSourceProperties dataSourceProperties;
@Bean
public DataSource dataSource(DataSourceProperties dataSourceProperties){
// properties.initializeDataSourceBuilder().type(type).build();
HikariDataSource dataSource = dataSourceProperties.initializeDataSourceBuilder().type(HikariDataSource.class).build();
if (StringUtils.hasText(dataSourceProperties.getName())) {
dataSource.setPoolName(dataSourceProperties.getName());
}
return new DataSourceProxy(dataSource);//使用 seata 进行包装数据源
}
}
在order、ware中都配置好上面的配置。
5、@GlobalTransactional
在大事务的入口标记注解@GlobalTransactional
开启全局事务,并且每个小事务标记注解@Transactional
6、其他配置
要求每个微服务要有register.conf和file.conf
将register.conf和file.conf复制到需要开启分布式事务的根目录,并修改file.conf
vgroup_mapping.${application.name}-fescar-service-group = “default”
官方要求:
- file.conf 的 service.vgroup_mapping 配置必须和
spring.application.name
一致
org.springframework.cloud:spring-cloud-starter-alibaba-seata
在的org.springframework.cloud.alibaba.seata.GlobalTransactionAutoConfiguration
类中,默认会使用作服务名注册到Seata${spring.application.name}-fescar-service-group
Server上,如果和file.conf
中的配置不一致,会提示no available server to connect
错误也可以通过配置
spring.cloud.alibaba.seata.tx-service-group
修改后缩,但必须和file.conf
中的配置保持一致
在大事务上@GlobalTransactional,小事务上@Transactional即可。
* seata控制分布式事务 * 1)、每一个微服务先必须创建 undo_log; * 2)、安装事务协调器;seata-server: https://github.com/seata/seata/releases * 3)、整合 * 1、导入依赖 spring-cloud-starter-alibaba-seata seata-all-0.7.1 * 2、解压并启动seata-server: * registry.conf: 注册中心配置:修改 registry type =nacos * file.conf: * 3、所有想要用到分布式事务的微服务使用 seata DataSourceProxy代理自己的数据源 * 4、每个微服务,都必须导入 * registry.conf * file.conf: vgroup_mapping.{application.name}-fescar-service-group = "default" * 5、启动测试分布式事务 * 6、给分布式大事务的入口标注 @GlobalTransactional * 7、每一个远程的小事务用 @Transactional
8.2.7 最终一致性库存解锁逻辑
seata 的AT模式(实际上就是 2pc模式)并不适合于高并发场景,原因在于:加锁导致整个线程变成串行化执行,效率太低下了
seata的TCC模式、SAGA模式可以自行学习:
传送门:https://github.com/seata/seata-samples
以下情况适合使用 at 模式:
比如发布商品后需要保存商品,这里面发生了几次远程调用,适合使用at模式。
在高并发,如下单,at模式有很多锁,影响效率。所以不使用at tcc。使用消息方式。
失败了之后发消息。库存服务本身也可以使用自动解锁模式。消息队列。
自动解锁:库存服务订阅消息队列,库存解锁发给消息队列
保存库存工作单和库存工作单详情,
锁定库存后数据库记录。后面的事务失败后看前面的库存,有没解锁的就解锁。
定期全部检索很麻烦,所以引入延迟队列。
锁库存后害怕订单失败,锁库存后发送给消息队列,只不过要暂存一会先别被消费。半小时以后再消费就可以知道大事务成功没有。
具体逻辑:
增强版逻辑
9 SpringCloud Alibaba-Sentinel
9.1 简介
9.1.1 熔断降级限流
什么是熔断
A 服务调用B 服务的某个功能,由于网络不稳定问题,或者B 服务卡机,导致功能时间超长。如果这样子的次数太多。我们就可以直接将B 断路了(A 不再请求B 接口),凡是调用B 的直接返回降级数据,不必等待B 的超长执行。这样B 的故障问题,就不会级联影响到A。
什么是降级
整个网站处于流量高峰期,服务器压力剧增,根据当前业务情况及流量,对一些服务和页面进行有策略的降级[停止服务,所有的调用直接返回降级数据]。以此缓解服务器资源的的压力,以保证核心业务的正常运行,同时也保持了客户和大部分客户的得到正确的相应。
异同:
相同点:
1、为了保证集群大部分服务的可用性和可靠性,防止崩溃,牺牲小我2、用户最终都是体验到某个功能不可用
不同点:
1、熔断是被调用方故障,触发的系统主动规则2、降级是基于全局考虑,停止一些正常服务,释放资源
什么是限流
对打入服务的请求流量进行控制,使服务能够承担不超过自己能力的流量压力。
9.2 Sentinel
9.2.1 Sentinel简介
官方文档:https://github.com/alibaba/Sentinel/wiki/%E4%BB%8B%E7%BB%8D
项目地址:https://github.com/alibaba/Sentinel
随着微服务的流行,服务和服务之间的稳定性变得越来越重要。Sentinel 以流量为切入点,
从流量控制、熔断降级、系统负载保护等多个维度保护服务的稳定性。
Sentinel 具有以下特征:
- 丰富的应用场景:Sentinel 承接了阿里巴巴近 10 年的双十一大促流量的核心场景,例如秒杀(即突发流量控制在系统容量可以承受的范围)、消息削峰填谷、集群流量控制、实时熔断下游不可用应用等。
- 完备的实时监控:Sentinel 同时提供实时的监控功能。您可以在控制台中看到接入应用的单台机器秒级数据,甚至 500 台以下规模的集群的汇总运行情况。
- 广泛的开源生态:Sentinel 提供开箱即用的与其它开源框架/库的整合模块,例如与 Spring Cloud、Apache Dubbo、gRPC、Quarkus 的整合。您只需要引入相应的依赖并进行简单的配置即可快速地接入 Sentinel。同时 Sentinel 提供 Java/Go/C++ 等多语言的原生实现。
- 完善的 SPI 扩展机制:Sentinel 提供简单易用、完善的 SPI 扩展接口。您可以通过实现扩展接口来快速地定制逻辑。例如定制规则管理、适配动态数据源等。
Sentinel 分为两个部分:
- 核心库(Java 客户端)不依赖任何框架/库,能够运行于所有 Java 运行时环境,同时对 Dubbo / Spring Cloud 等框架也有较好的支持。
- 控制台(Dashboard)基于 Spring Boot 开发,打包后可以直接运行,不需要额外的 Tomcat 等应用容器。
Sentinel 基本概念
- 资源
资源是Sentinel 的关键概念。它可以是Java 应用程序中的任何内容,例如,由应用程序提供的服务,或由应用程序调用的其它应用提供的服务,甚至可以是一段代码。在接下来的文档中,我们都会用资源来描述代码块。
**只要通过Sentinel API 定义的代码,就是资源,能够被Sentinel 保护起来。**大部分情况下,
可以使用方法签名,URL,甚至服务名称作为资源名来标示资源。
- 规则
围绕资源的实时状态设定的规则,可以包括流量控制规则、熔断降级规则以及系统保护规
则。所有规则可以动态实时调整。
9.2.2 Hystrix 与Sentinel 比较
9.2.3 整合Feign+Sentinel 测试熔断降级
相关文档:https://github.com/alibaba/Sentinel/wiki/%E4%B8%BB%E9%A1%B5
Spring Boot/Spring Cloud整合 Sentinel
下面先在秒杀服务中整合测试使用sentinel。
- 1、初体验
①导入依赖【公共服务下】
<!--引入 sentinel -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
</dependency>
② 配置
秒杀服务
# sentinel有关配置
#可视化界面控制台
spring.cloud.sentinel.transport.dashboard=localhost:8333
#微服务和控制台之间传输数据的服务传输的端口号:默认8719
spring.cloud.sentinel.transport.port=8719
③下载并启动 sentinel控制台。
通过引入 的 sentinel依赖版本:
使用 sentinel 1.6.3版本的控制台。并启动。
④使用 http://seckill.gulimall.com/currentSeckillSkus
请求测试。
因为sentinel 控制台启用的是懒加载方式,所以我们需要先发送请求,控制台才会显示。
发送一次请求之后,控制台显示。
- 2、编写流控规则:
有两个地方可以编写:第一个:
第二个:
编写规则:每秒发送一次请求。
如果一秒内多次点击:报错。
- 3、补充效果:
- 01 控制台->实时监控显示图表数据:暴露当前web应用的所有端点信息。
引入依赖:
<!--引入sentinel的审计模块:监控springboot应用的实时健康信息等-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
配置:
#暴露web所有节点
management.endpoints.web.exposure.include=*
- 02 返回自定义的错误消息提示
公共服务 的 BizCodeEnume 添加错误码:
TOO_MANY_REQUEST(10002,"请求流量过大")
秒杀服务编写配置类:
@Configuration
public class SeckillSentinelConfig {
public SeckillSentinelConfig(){
WebCallbackManager.setUrlBlockHandler(new UrlBlockHandler() {
@Override
public void blocked(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse, BlockException e) throws IOException {
R error = R.error(BizCodeEnume.TOO_MANY_REQUEST.getCode(), BizCodeEnume.TOO_MANY_REQUEST.getMsg());
httpServletResponse.setCharacterEncoding("UTF-8");//设置字符编码
httpServletResponse.setContentType("application/json");//文本内容使用JSON
httpServletResponse.getWriter().write(JSON.toJSONString(error));
}
});
}
}
测试错误提示:
接下来我们把全服务都引入 actuator统计审计信息
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
配置:
# sentinel有关配置
#可视化界面控制台
spring.cloud.sentinel.transport.dashboard=localhost:8333
#暴露web所有节点
management.endpoints.web.exposure.include=*
出现问题:order 服务提示 循环依赖问题:
解决:
订单服务的 MyRabbitConfig
将商城 业务测试一遍,控制台效果:
每个微服务都有图表信息:监控数据了。
具体文档参考:流量控制
- 4、熔断降级
参考文档
* 4、使用Sentinel来保护feign远程调用:熔断; * 1)、调用方的熔断保护:feign.sentinel.enabled=true * 2)、调用方手动指定远程服务的降级策略。远程服务被降级处理。触发我们的熔断回调方法 * 3)、超大流量的时候,必须牺牲一些远程服务。在服务的提供方(远程服务)指定降级策略; * 提供方是在运行,但是不运行自己的业务逻辑,返回的是默认的降级数据(限流的数据)。
给 商品服务加入以下配置
#开启远程熔断保护
feign.sentinel.enabled=true
测试商品服务和秒杀服务,这里商品服务作为调用方,秒杀服务作为提供方,商品服务远程调用 秒杀服务的这个请求:GET:http://gulimall-seckill/sku/seckill/{skuId}
①这里测试调用方的熔断保护
给 商品服务的秒杀远程接口编写一个 回调实现。
在降级类中实现对应的
feign接口
,并重写降级方法
@Slf4j
@Component
public class SeckillFeignServiceFallBack implements SeckillFeignService {
@Override
public R getSkuSeckillInfo(Long skuId) {
log.info("熔断方法调用...getSkuSeckillInfo");
return R.error(BizCodeEnume.TOO_MANY_REQUEST.getCode(),BizCodeEnume.TOO_MANY_REQUEST.getMsg());
}
}
feign的降级,在远程调用的类配置限流处理规则
在@FeignClient
设置fallback
属性
@FeignClient(value = "gulimall-seckill",fallback = SeckillFeignServiceFallBack.class)
public interface SeckillFeignService {
@GetMapping("/sku/seckill/{skuId}")
R getSkuSeckillInfo(@PathVariable("skuId") Long skuId);
}
效果:当秒杀服务停止后,商品服务如果调用秒杀服务,则直接返回编写的降级方法,返回错误状态码。
②测试 调用方手动降级远程服务。
在 商品服务的链路中,对需要调用的远程秒杀服务请求做降级。
上面的降级表示:当 1秒内点击调用该请求的次数超过5次,然后如果响应时间超过我们设置 的 1ms,在我们设置的 10s时间窗口内,调用该方法都会自动熔断。
**③ 测试服务提供方手动降级:**比如我们在网站流量很大的时候,可以在后台对一些不重要的服务做 降级策略,但是我们一般不会对 服务的提供方做降级策略。这里我们只是测试一下。
给秒杀服务的 http://seckill.gulimall.com/currentSeckillSkus
请求做降级处理,
效果就是:提供方是在运行,但是不运行自己的业务逻辑,返回的是默认的降级数据(限流的数据)。
调用方:叫熔断数据:调不过来了,熔断;
手动给业务方法降级:叫降级数据。
注意概念的区分。
因为 给调用方手动降级也是很方便的,那就给全服务都开启sentinel的feign代理功能。【网关服务不要引入,否则报错】
feign.sentinel.enabled=true
此外,我们给全服务都加上 我们自己配置的 sentinel 错误数据。
@Configuration
public class SeckillSentinelConfig {
public SeckillSentinelConfig(){
WebCallbackManager.setUrlBlockHandler(new UrlBlockHandler() {
@Override
public void blocked(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse, BlockException e) throws IOException {
R error = R.error(BizCodeEnume.TOO_MANY_REQUEST.getCode(), BizCodeEnume.TOO_MANY_REQUEST.getMsg());
httpServletResponse.setCharacterEncoding("UTF-8");//设置字符编码
httpServletResponse.setContentType("application/json");//文本内容使用JSON
httpServletResponse.getWriter().write(JSON.toJSONString(error));
}
});
}
}
这样我们全服务如果被sentinel限流或者降级之后,返回的都是我们自己设置的JSON串。
- 5、自定义受保护的资源
我们定义资源主要使用的方式有:
①主流框架的默认适配
②抛出异常的方式定义资源
③注解方式定义资源
之前使用的是框架的默认适配,现在我们介绍 后面两种常用的方式。
* 5、自定义受保护的资源 * 1)、try (Entry entry = SphU.entry("seckillSkus")) { * //业务逻辑 * }catch(Exception e){ * * } * * 2)、基于注解。 * @SentinelResource(value = "getCurrentSeckillSkusResource",blockHandler = "blockHandler") * * 无论是1,2方式一定要配置被限流以后的默认返回。 * url请求可以设置统一返回:SeckillSentinelConfig 即:每一个自定义的资源需要单独的设置限流返回数据; 而所有的url请求则不用,因为会被自动扫描到,而且我们又自己编写了一个处理限流的类。
抛出异常的方式定义资源
以秒杀服务为例,修改一段代码测试:
可以为我们自定义的资源进行流控规则配置。
一秒内多次访问,效果:
此外,还可以使用注解的方式进行资源的自定义,不过没有上面这种方式方便。
使用注解:
Sentinel 支持通过
@SentinelResource
注解定义资源并配置blockHandler
和fallback
函数来进行限流之后的处理。
效果展示:
可以看到 两个资源都显示了。配置流控规则:
一秒内多次发送请求:
- 6、网关流控
ps:避坑指南:这里之前全服务引入 sentinel 开启对 feign的管理之后,网关服务启动报错,所以将 网关服务中的这一段配置注释掉。
feign.sentinel.enabled=true
参考文档:https://github.com/alibaba/Sentinel/wiki/%E7%BD%91%E5%85%B3%E9%99%90%E6%B5%81
①网关服务引入依赖:
<!-- 引入 sentinel跟网管层的整合 -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-sentinel-gateway</artifactId>
<version>2.1.0.RELEASE</version>
</dependency>
测试:这里需要使用 1.7.1版本的sentinel控制台,这样才能看到效果。
随便测试一个请求:http://seckill.gulimall.com/currentSeckillSkus
可以看到网关这里就有了相应的簇点链路:
②对秒杀服务做网关层面的流控规则配置:
测试一秒内多次请求:
例如,我们对秒杀服务进行更加精确的流控管理:
使用postman测试:
返回错误信息。
③自定义网关流控返回。
网关服务编写配置类:
@Configuration
public class SentinelGatewayConfig {
// TODO 响应式编程
public SentinelGatewayConfig() {
GatewayCallbackManager.setBlockHandler(new BlockRequestHandler() {
//网关限流了请求,就会调用此回调 Mono Flux
@Override
public Mono<ServerResponse> handleRequest(ServerWebExchange serverWebExchange, Throwable throwable) {
R error = R.error(BizCodeEnume.TOO_MANY_REQUEST.getCode(), BizCodeEnume.TOO_MANY_REQUEST.getMsg());
String errJson = JSON.toJSONString(error);
Mono<ServerResponse> body = ServerResponse.ok().body(Mono.just(errJson), String.class);
return body;
}
});
}
}
流控规则配置:
postman测试:
1秒内多次发送请求:
- 7、Sleuth+Zipkin 服务链路追踪
-
为什么用
微服务架构是一个分布式架构,它按业务划分服务单元,一个分布式系统往往有很多个服务单元。由于服务单元数量众多,业务的复杂性,如果出现了错误和异常,很难去定位。主要体现在,一个请求可能需要调用很多个服务,而内部服务的调用复杂性,决定了问题难以定位。所以微服务架构中,必须实现分布式链路追踪,去跟进一个请求到底有哪些服务参与,参与的顺序又是怎样的,从而达到每个请求的步骤清晰可见,出了问题,很快定位。
链路追踪组件有Google 的Dapper,Twitter 的Zipkin,以及阿里的Eagleeye (鹰眼)等,它们都是非常优秀的链路追踪开源组件。 -
基本术语
-
Span(跨度):基本工作单元,发送一个远程调度任务就会产生一个Span,Span 是一
个64 位ID 唯一标识的,Trace 是用另一个64 位ID 唯一标识的,Span 还有其他数据信
息,比如摘要、时间戳事件、Span 的ID、以及进度ID。 -
Trace(跟踪):一系列Span 组成的一个树状结构。请求一个微服务系统的API 接口,
这个API 接口,需要调用多个微服务,调用每个微服务都会产生一个新的Span,所有
由这个请求产生的Span 组成了这个Trace。 -
Annotation(标注):用来及时记录一个事件的,一些核心注解用来定义一个请求的开
始和结束。这些注解包括以下:-
cs - Client Sent -客户端发送一个请求,这个注解描述了这个Span 的开始
-
sr - Server Received -服务端获得请求并准备开始处理它,如果将其sr 减去cs 时间戳
便可得到网络传输的时间。 -
ss - Server Sent (服务端发送响应)–该注解表明请求处理的完成(当请求返回客户
端),如果ss 的时间戳减去sr 时间戳,就可以得到服务器请求的时间。 -
cr - Client Received (客户端接收响应)-此时Span 的结束,如果cr 的时间戳减去
cs 时间戳便可以得到整个请求所消耗的时间。
官方文档:
https://cloud.spring.io/spring-cloud-static/spring-cloud-sleuth/2.1.3.RELEASE/single/spring-cloud-sleuth.html -
-
如果服务调用顺序如下
那么用以上概念完整的表示出来如下:
Span 之间的父子关系如下:
- 整合Sleuth
①引入依赖:(公共服务)
<!--服务提供者与消费者导入依赖-->
<!--链路追踪 sleuth<-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-sleuth</artifactId>
</dependency>
②打开debug 日志
这里以商品服务为例:
#开启debug日志
logging.level.org.springframework.cloud.openfeign=debug
logging.level.org.springframework.cloud.sleuth=debug
③商品服务详情页刷新
控制台显示:
发起一次远程调用,观察控制台
DEBUG [user-service,541450f08573fff5,541450f08573fff5,false]
user-service:服务名
541450f08573fff5:是TranceId,一条链路中,只有一个TranceId
541450f08573fff5:是spanId,链路中的基本工作单元id
false:表示是否将数据输出到其他服务,true 则会把信息输出到其他可视化的服务上观察
- 整合
zipkin
可视化观察
通过Sleuth 产生的调用链监控信息,可以得知微服务之间的调用链路,但监控信息只输出到控制台不方便查看。我们需要一个图形化的工具-zipkin。Zipkin 是Twitter 开源的分布式跟踪系统,主要用来收集系统的时序数据,从而追踪系统的调用问题。zipkin 官网地址如下:
https://zipkin.io/
①docker 安装zipkin 服务器
docker run -d -p 9411:9411 openzipkin/zipkin
②导入
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-zipkin</artifactId>
</dependency>
zipkin 依赖也同时包含了sleuth,可以省略sleuth 的引用【公共服务引入该依赖,使得全服务通用。如果启动报错,就单独给每个服务引入。】
③添加zipkin 相关配置
spring:
application:
name: user-service
zipkin:
base-url: http://192.168.56.10:9411/ # zipkin 服务器的地址
# 关闭服务发现,否则Spring Cloud 会把zipkin 的url 当做服务名称
discoveryClientEnabled: false
sender:
type: web # 设置使用http 的方式传输数据
sleuth:
sampler:
probability: 1 # 设置抽样采集率为100%,默认为0.1,即10%
或者 使用 properties方式配置【全服务配置】
#服务追踪
# zipkin 服务器的地址
spring.zipkin.base-url=http://192.168.56.10:9411/
# 关闭服务发现,否则Spring Cloud 会把zipkin 的url 当做服务名称
spring.zipkin.discovery-client-enabled=false
# 设置使用http 的方式传输数据
spring.zipkin.sender.type=web
# 设置抽样采集率为100%,默认为0.1,即10%
spring.sleuth.sampler.probability=1
④发送远程请求,测试zipkin。
服务调用链追踪信息统计
其中可以看到请求的方式,请求时间,异步等信息
此外,还可以查询依赖。
5. Zipkin 数据持久化
Zipkin 默认是将监控数据存储在内存的,如果Zipkin 挂掉或重启的话,那么监控数据就会丢失。所以如果想要搭建生产可用的Zipkin,就需要实现监控数据的持久化。而想要实现数据持久化,自然就是得将数据存储至数据库。好在Zipkin 支持将数据存储至:
- 内存(默认)
- MySQL
- Elasticsearch
- Cassandra
Zipkin 数据持久化相关的官方文档地址如下:
https://github.com/openzipkin/zipkin#storage-component
Zipkin 支持的这几种存储方式中,内存显然是不适用于生产的,这一点开始也说了。而使用MySQL 的话,当数据量大时,查询较为缓慢,也不建议使用。Twitter 官方使用的是Cassandra作为Zipkin 的存储数据库,但国内大规模用Cassandra 的公司较少,而且Cassandra 相关文档也不多。
综上,故采用Elasticsearch 是个比较好的选择,关于使用Elasticsearch 作为Zipkin 的存储数
据库的官方文档如下:
elasticsearch-storage:
https://github.com/openzipkin/zipkin/tree/master/zipkin-server#elasticsearch-storage
zipkin-storage/elasticsearch
https://github.com/openzipkin/zipkin/tree/master/zipkin-storage/elasticsearch
通过docker 的方式
docker run --env STORAGE_TYPE=elasticsearch --env ES_HOSTS=192.168.56.10:9200
openzipkin/zipkin-dependencies
使用es 时Zipkin Dependencies 支持的环境变量
9.2.4 测试启动中出现的问题
-
- 在公共服务中引入sentinel依赖,但是在启动 第三方服务中的时候,出现如下错误:
The Bean Validation API is on the classpath but no implementation could be found
出现这个原因,网上查看了下原因:版本不兼容问题。
解决方案
因为第三方微服务中暂时用不到校验,所以我们可以把校验依赖排除掉。在引入的common服务中可以排除validation-api
这个依赖。
<dependency>
<groupId>com.atguigu.gulimall</groupId>
<artifactId>gulimall-common</artifactId>
<version>0.0.1-SNAPSHOT</version>
<exclusions>
<exclusion>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
</exclusion>
<exclusion>
<groupId>javax.validation</groupId>
<artifactId>validation-api</artifactId>
</exclusion>
</exclusions>
</dependency>
-
- 项目启动之后秒杀服务中出现空指针异常,我们可以在出现空指针异常的地方加入一个判断,就是当他不为空的时候才进行一下操作。
java.lang.NullPointerException: null
at com.atguigu.gulimall.seckill.service.impl.SeckillServiceImpl.saveSessionInfo(SeckillServiceImpl.java:238) ~[classes/:na]
at com.atguigu.gulimall.seckill.service.impl.SeckillServiceImpl.uploadSeckillSkuLatest3Days(SeckillServiceImpl.java:76) ~[classes/:na]
at com.atguigu.gulimall.seckill.scheduled.SeckillSkuScheduled.uploadSeckillSkuLatest3Days(SeckillSkuScheduled.java:52) ~[classes/:na]
at sun.reflect.GeneratedMethodAccessor92.invoke(Unknown Source) ~[na:na]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_131]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_131]
at org.springframework.scheduling.support.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:84) ~[spring-context-5.1.9.RELEASE.jar:5.1.9.RELEASE]
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) ~[spring-context-5.1.9.RELEASE.jar:5.1.9.RELEASE]
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:93) [spring-context-5.1.9.RELEASE.jar:5.1.9.RELEASE]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_131]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_131]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_131]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [na:1.8.0_131]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_131]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_131]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_131]
报错解决。