文章目录
- 谷粒商城
- ElasticSearch
- 一、基本概念
- 1、Index(索引)
- 2、Type(类型)
- 3、Document(文档)
- 4、倒排索引机制
- 二、Docker 安装 Es
- 1、下载镜像文件
- 2、创建实例
- 三、初步检索
- 1、_cat
- 2、索引一个文档(保存)
- 3、查询文档
- 4、更新文档
- 5、删除文档&索引
- 6、bulk 批量 API
- 7、样本测试数据
- 四、进阶检索
- 1、SearchAPI
- 2、Query DSL
- 1)、基本语法格式
- 2)、返回部分字段
- 3)、match【匹配查询】
- 4)、match_phrase【短语匹配】
- 5)、multi_match【多字段匹配】
- 6)、bool【复合查询】
- 7)、filter【结果过滤】
- 8)、term
- 9)、aggregations(执行聚合)
- 3、Mapping
- 1)、字段类型
- 2)、映射
- 3)、新版本改变
- 4、分词
- 1)、安装 ik 分词器
- 2)、测试分词器
- 3)、自定义词库
- 五、Elasticsearch-Rest-Client
- 六、附录-安装 nginx
- 性能与压力测试
- 一、性能监控
- 1、jvm 内存模型
- 2、堆
- 3、jconsole 与 jvisualvm
- 4、监控指标
- 5、JVM 分析&调优
- 二、压力测试
- 1、性能指标
- 2、JMeter
- 1、JMeter 安装
- 2、JMeter 压测示例
- 3、JMeter Address Already in use 错误解决
- 缓存&分布式锁
- 一、缓存
- 1、缓存使用
- 2、整合 redis 作为缓存
- 二、缓存失效问题
- 三、缓存数据一致性
- 1、双写模式
- 2、失效模式
- 解决方案
- 四、分布式锁
- 1、分布式锁与本地锁
- 2、分布式锁实现
- 3、Redisson 完成分布式锁
- 五、Spring Cache
- 1、简介
- 2、基础概念
- 3、注解
- 4、表达式语法
- 5、使用
- 异步&线程池
- 一、线程回顾
- 1、初始化线程的 4 种方式
- 2、线程池的七大参数
- 3、常见的 4 种线程池
- 4、开发中为什么使用线程池
- 二、CompletableFuture 异步编排
- 1、创建异步对象
- 2、计算完成时回调方法
- 3、handle 方法
- 4、线程串行化方法
- 5、两任务组合 - 都要完成
- 6、两任务组合 - 一个完成
- 7、多任务组合
- 社交登陆&单点登陆
- 一、社交登陆
- 1、OAuth2.0
- 2、微博登陆准备工作
- 3、微博登陆测试
- 二、SSO(单点登陆)
- 0、前置概念:
- 1)、单点登录业务介绍
- 2)、几个基本概念
- 1、Cookie 接入方式
- 2、Token 接入方式
- 3、有状态登录
- 4、无状态登录
- 5、集成社交登陆
- 三、JWT
- 1、简介
- 2、数据格式
- 3、交互流程
- 4、授权中心流程
- 5、JWT 优势
- 6、使用 JWT 带来的问题
- RbaaitMQ
- 消息中间件
- 一、概述
- 二、RabbitMQ概念
- 三、Docker安装RabbitMQ
- 四、RabbitMQ运行机制
- 五、RabbitMQ整合
- 六、RabbitMQ消息确认机制-可靠抵达
- 可靠抵达-ConfirmCallback
- 可靠抵达-ReturnCallback
- 可靠抵达-Ack消息确认机制
- 七、RabbitMQ延时队列(实现定时任务)
- 消息的TTL(Time To Live)
- Dead Letter Exchanges(DLX)
- 延时队列实现-1
- 延时队列实现-2
- SpringBoot中使用延时队列
- 如何保证消息可靠性-消息丢失
- 如何保证消息可靠性-消息重复
- 如何保证消息可靠性-消息积压
- 接口幂等性
- 一、什么是幂等性
- 二、哪些情况需要防止
- 三、什么情况下需要幂等
- 四、幂等解决方案
- 1、token 机制
- 2、各种锁机制
- 1、数据库悲观锁
- 2、数据库乐观锁
- 3、业务层分布式锁
- 3、各种唯一约束
- 1、数据库唯一约束
- 2、redis set 防重
- 4、防重表
- 5、全局请求唯一 id
- 分布式事务
- 一、本地事务
- 1、事务的基本性质
- 2、事务的隔离级别
- 3、事务的传播行为
- 4、SpringBoot 事务关键点
- 二、分布式事务
- 1、为什么有分布式事务
- 2、CAP 定理与 BASE 理论
- 1、CAP 定理
- 2、面临的问题
- 3、BASE 理论
- 4、强一致性、弱一致性、最终一致性
- 3、分布式事务几种方案
- 1)、2PC 模式
- 2)、柔性事务-TCC 事务补偿型方案
- 3)、柔性事务-最大努力通知型方案
- 4)、柔性事务-可靠消息+最终一致性方案(异步确保型)
- 定时任务
- 一、定时任务
- 1、cron 表达式
- 2、cron 示例
- 3、SpringBoot 整合
- 二、分布式定时任务
- 1、定时任务问题
- 2、扩展-分布式调度
- Sentinel
- 1、简介
- 1、熔断降级限流
- 2、Sentinel 简介
- 集群
谷粒商城
ElasticSearch
一、基本概念
1、Index(索引)
动词,相当于 MySQL 中的 insert;
名词,相当于 MySQL 中的 Database
2、Type(类型)
在 Index(索引)中,可以定义一个或多个类型。
类似于 MySQL 中的 Table;每一种类型的数据放在一起;
3、Document(文档)
保存在某个索引(Index)下,某种类型(Type)的一个数据(Document),文档是 JSON 格式的,Document 就像是 MySQL 中的某个 Table 里面的内容;
4、倒排索引机制
二、Docker 安装 Es
1、下载镜像文件
docker pull elasticsearch:7.4.2
存储和检索数据
docker pull kibana:7.4.2
可视化检索数据
2、创建实例
1、ElasticSearch
mkdir -p /mydata/elasticsearch/config
mkdir -p /mydata/elasticsearch/data
echo "http.host: 0.0.0.0" >> /mydata/elasticsearch/config/elasticsearch.yml
chmod -R 777 /mydata/elasticsearch/ 保证权限
docker run --name elasticsearch -p 9200:9200 -p 9300:9300 \
-e "discovery.type=single-node" \
-e ES_JAVA_OPTS="-Xms64m -Xmx512m" \
-v /mydata/elasticsearch/config/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml \
-v /mydata/elasticsearch/data:/usr/share/elasticsearch/data \
-v /mydata/elasticsearch/plugins:/usr/share/elasticsearch/plugins \
-d elasticsearch:7.4.2
以后再外面装好插件重启即可;
特别注意:
-e ES_JAVA_OPTS="-Xms64m -Xmx256m" \ # 测试环境下,设置 ES 的初始内存和最大内存,否则导致过大启动不了 ES
2、Kibana
docker run --name kibana -e ELASTICSEARCH_HOSTS=http://192.168.56.10:9200 -p 5601:5601 -d kibana:7.4.2
# http://192.168.56.10:9200 一定改为自己虚拟机的地址
三、初步检索
1、_cat
GET /_cat/nodes:查看所有节点
GET /_cat/health:查看 es 健康状况
GET /_cat/master:查看主节点
GET /_cat/indices:查看所有索引 show databases;
2、索引一个文档(保存)
保存一个数据,保存在哪个索引的哪个类型下,指定用哪个唯一标识
PUT customer/external/1;在 customer 索引下的 external 类型下保存 1 号数据为
PUT customer/external/1
{
"name": "John Doe"
}
PUT 和 POST 都可以,
POST 新增。如果不指定 id,会自动生成 id。指定 id 就会修改这个数据,并新增版本号PUT 可以新增可以修改。PUT 必须指定 id;由于 PUT 需要指定 id,我们一般都用来做修改操作,不指定 id 会报错。
3、查询文档
GET customer/external/1
结果:
{
"_index": "customer",
//在哪个索引
"_type": "external",
//在哪个类型
"_id": "1",
//记录 id
"_version": 2,
//版本号
"_seq_no": 1,
//并发控制字段,每次更新就会+1,用来做乐观锁
"_primary_term": 1,
//同上,主分片重新分配,如重启,就会变化
"found": true,
"_source": {
//真正的内容
"name": "John Doe"
}
}
更新携带 ?if_seq_no=0&if_primary_term=1
4、更新文档
POST customer/external/1/_update
{
"doc":{
"name": "John Doew"
}
}
或者
POST customer/external/1
{
"name": "John Doe2"
}
或者
PUT customer/external/1
{
"name": "John Doe"
}
不同:
-
POST 操作会对比源文档数据,如果相同不会有什么操作,文档 version 不增加PUT 操作总会将数据重新保存并增加 version 版本;
-
带_update 对比元数据如果一样就不进行任何操作。
-
看场景;
- 对于大并发更新,不带 update;
- 对于大并发查询偶尔更新,带 update;对比更新,重新计算分配规则。
新同时增加属性
POST customer/external/1/_update
{
"doc": { "name": "Jane Doe", "age": 20 }
}
PUT 和 POST 不带_update 也可以
5、删除文档&索引
DELETE customer/external/1
DELETE customer
6、bulk 批量 API
POST customer/external/_bulk
{"index":{"_id":"1"}}
{"name": "John Doe" }
{"index":{"_id":"2"}}
{"name": "Jane Doe" }
语法格式:
{ action: { metadata }}\n
{ request body
}\n
{ action: { metadata }}\n
{ request body
}\n
复杂实例:
POST /_bulk
{ "delete": { "_index": "website", "_type": "blog", "_id": "123" }}
{ "create": { "_index": "website", "_type": "blog", "_id": "123" }}
{ "title":
"My first blog post" }
{ "index":
{ "_index": "website", "_type": "blog" }}
{ "title":
"My second blog post" }
{ "update": { "_index": "website", "_type": "blog", "_id": "123", "_retry_on_conflict" : 3} }
{ "doc" : {"title" : "My updated blog post"} }
bulk API 以此按顺序执行所有的 action(动作)。如果一个单个的动作因任何原因而失败,它将继续处理它后面剩余的动作。当 bulk API 返回时,它将提供每个动作的状态(与发送的顺序相同),所以您可以检查是否一个指定的动作是不是失败了。
7、样本测试数据
我准备了一份顾客银行账户信息的虚构的 JSON 文档样本。每个文档都有下列的 schema(模式):
{
"account_number": 0,
"balance": 16623,
"firstname": "Bradshaw",
"lastname": "Mckenzie",
"age": 29,
"gender": "F",
"address": "244 Columbus Place",
"employer": "Euron",
"email": "bradshawmckenzie@euron.com",
"city": "Hobucken",
"state": "CO"
}
https://github.com/elastic/elasticsearch/blob/master/docs/src/test/resources/accounts.json?raw=true
导入测试数据
POST bank/account/_bulk
测试数据
四、进阶检索
1、SearchAPI
ES 支持两种基本方式检索 :
- 一个是通过使用 REST request URI 发送搜索参数(uri+检索参数)
- 另一个是通过使用 REST request body 来发送它们(uri+请求体)
1)、检索信息
一切检索从_search 开始
GET bank/_search # 检索 bank 下所有信息,包括 type 和 docs
GET bank/_search?q=*&sort=account_number:asc # 请求参数方式检索
响应结果解释:
took - Elasticsearch 执行搜索的时间(毫秒)
time_out - 告诉我们搜索是否超时
_shards - 告诉我们多少个分片被搜索了,以及统计了成功/失败的搜索分片
hits - 搜索结果
hits.total - 搜索结果
hits.hits - 实际的搜索结果数组(默认为前 10 的文档)
sort - 结果的排序 key(键)(没有则按 score 排序)
score 和 max_score –相关性得分和最高得分(全文检索用)
uri+请求体进行检索
GET bank/_search
{
"query": {
"match_all": {}
},
"sort": [
{
"account_number": {
"order": "desc"
}
}
]
}
HTTP 客户端工具(POSTMAN),get 请求不能携带请求体,我们变为 post 也是一样的我们 POST 一个 JSON 风格的查询请求体到 _search API。
需要了解,一旦搜索的结果被返回,Elasticsearch 就完成了这次请求,并且不会维护任何服务端的资源或者结果的 cursor(游标)
2、Query DSL
1)、基本语法格式
Elasticsearch 提供了一个可以执行查询的 Json 风格的 DSL(domain-specific language 领域特定语言)。这个被称为 Query DSL。该查询语言非常全面,并且刚开始的时候感觉有点复杂,真正学好它的方法是从一些基础的示例开始的。
- 一个查询语句 的典型结构
{
QUERY_NAME: {
ARGUMENT: VALUE,
ARGUMENT: VALUE,...
}
}
- 如果是针对某个字段,那么它的结构如下:
{
QUERY_NAME: {
FIELD_NAME: {
ARGUMENT: VALUE,
ARGUMENT: VALUE,...
}
}
}
GET bank/_search
{
"query": {
"match_all": {}
},
"from": 0,
"size": 5,
"sort": [
{
"account_number": {
"order": "desc"
}
}
]
}
- query 定义如何查询,
- match_all 查询类型【代表查询所有的所有】,es 中可以在 query 中组合非常多的查询类型完成复杂查询
- 除了 query 参数之外,我们也可以传递其它的参数以改变查询结果。如 sort,size
- from+size 限定,完成分页功能
- sort 排序,多字段排序,会在前序字段相等时后续字段内部排序,否则以前序为准
2)、返回部分字段
GET bank/_search
{
"query": {
"match_all": {}
},
"from": 0,
"size": 5,
"_source": ["age","balance"]
}
3)、match【匹配查询】
基本类型(非字符串),精确匹配
GET bank/_search
{
"query": {
"match": {
"account_number": "20"
}
}
}
// match 返回 account_number=20 的
字符串,全文检索
GET bank/_search
{
"query": {
"match": {
"address": "mill"
}
}
}
// 最终查询出 address 中包含 mill 单词的所有记录
// match 当搜索字符串类型的时候,会进行全文检索,并且每条记录有相关性得分。
字符串,多个单词(分词+全文检索)
GET bank/_search
{
"query": {
"match": {
"address": "mill road"
}
}
}
// 最终查询出 address 中包含 mill 或者 road 或者 mill road 的所有记录,并给出相关性得分
4)、match_phrase【短语匹配】
将需要匹配的值当成一个整体单词(不分词)进行检索
GET bank/_search
{
"query": {
"match_phrase": {
"address": "mill road"
}
}
}
// 查出 address 中包含 mill road 的所有记录,并给出相关性得分
5)、multi_match【多字段匹配】
GET bank/_search
{
"query": {
"multi_match": {
"query": "mill",
"fields": ["state","address"]
}
}
}
// state 或者 address 包含 mill
6)、bool【复合查询】
bool 用来做复合查询:
复合语句可以合并 任何 其它查询语句,包括复合语句,了解这一点是很重要的。这就意味着,复合语句之间可以互相嵌套,可以表达非常复杂的逻辑。
must:必须达到 must 列举的所有条件
GET bank/_search
{
"query": {
"bool": {
"must": [
{ "match": { "address": "mill" } },
{ "match": { "gender": "M" } }
]
}
}
}
should:应该达到 should 列举的条件,如果达到会增加相关文档的评分,并不会改变查询的结果。如果 query 中只有 should 且只有一种匹配规则,那么 should 的条件就会被作为默认匹配条件而去改变查询结果
GET bank/_search
{
"query": {
"bool": {
"must": [
{ "match": { "address": "mill" } },
{ "match": { "gender": "M" } }
],
"should": [
{"match": { "address": "lane" }}
]
}
}
}
must_not 必须不是指定的情况
GET bank/_search
{
"query": {
"bool": {
"must": [
{ "match": { "address": "mill" } },
{ "match": { "gender": "M" } }
],
"should": [
{"match": { "address": "lane" }}
],
"must_not": [
{"match": { "email": "baluba.com" }}
]
}
}
}
// address 包含 mill,并且 gender 是 M,如果 address 里面有 lane 最好不过,但是 email 必须不包含 baluba.com
7)、filter【结果过滤】
并不是所有的查询都需要产生分数,特别是那些仅用于 “filtering”(过滤)的文档。为了不计算分数 Elasticsearch 会自动检查场景并且优化查询的执行。
GET bank/_search
{
"query": {
"bool": {
"must": [
{"match": { "address": "mill"}}
],
"filter": {
"range": {
"balance": {
"gte": 10000,
"lte": 20000
}
}
}
}
}
}
8)、term
和 match 一样。匹配某个属性的值。全文检索字段用 match,其他非 text 字段匹配用 term。
GET bank/_search
{
"query": {
"bool": {
"must": [
{"term": {
"age": {
"value": "28"
}
}},
{"match": {
"address": "990 Mill Road"
}}
]
}
}
}
9)、aggregations(执行聚合)
聚合提供了从数据中分组和提取数据的能力。最简单的聚合方法大致等于 SQL GROUP BY 和 SQL 聚合函数。在 Elasticsearch 中,您有执行搜索返回 hits(命中结果),并且同时返回聚合结果,把一个响应中的所有 hits(命中结果)分隔开的能力。这是非常强大且有效的,您可以执行查询和多个聚合,并且在一次使用中得到各自的(任何一个的)返回结果,使用一次简洁和简化的 API 来避免网络往返。
搜索 address 中包含 mill 的所有人的年龄分布以及平均年龄,但不显示这些人的详情。
GET bank/_search
{
"query": {
"match": {
"address": "mill"
}
},
"aggs": {
"group_by_state": {
"terms": {
"field": "age"
}
},
"avg_age": {
"avg": {
"field": "age"
}
}
},
"size": 0
}
// size:0 不显示搜索数据
// aggs:执行聚合。聚合语法如下
"aggs": {
"aggs_name 这次聚合的名字,方便展示在结果集中": {
"AGG_TYPE 聚合的类型(avg,term,terms)": {}
}
},
复杂:
按照年龄聚合,并且请求这些年龄段的这些人的平均薪资
GET bank/account/_search
{
"query": {
"match_all": {}
},
"aggs": {
"age_avg": {
"terms": {
"field": "age",
"size": 1000
},
"aggs": {
"banlances_avg": {
"avg": {
"field": "balance"
}
}
}
}
},
"size": 1000
}
复杂:查出所有年龄分布,并且这些年龄段中 M 的平均薪资和 F 的平均薪资以及这个年龄段的总体平均薪资
GET bank/account/_search
{
"query": {
"match_all": {}
},
"aggs": {
"age_agg": {
"terms": {
"field": "age",
"size": 100
},
"aggs": {
"gender_agg": {
"terms": {
"field": "gender.keyword",
"size": 100
},
"aggs": {
"balance_avg": {
"avg": {
"field": "balance"
}
}
}
},
"balance_avg":{
"avg": {
"field": "balance"
}
}
}
}
}
,
"size": 1000
}
3、Mapping
1)、字段类型
2)、映射
Mapping(映射)
Mapping 是用来定义一个文档(document),以及它所包含的属性(field)是如何存储和索引的。比如,使用 mapping 来定义:
- 哪些字符串属性应该被看做全文本属性(full text fields)。
- 哪些属性包含数字,日期或者地理位置。
- 文档中的所有属性是否都能被索引(_all 配置)。
- 日期的格式。
- 自定义映射规则来执行动态添加属性。
查看 mapping 信息:
GET bank/_mapping
修改 mapping 信息
https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping.html
自动猜测的映射类型
3)、新版本改变
Es7 及以上移除了 type 的概念。
-
关系型数据库中两个数据表示是独立的,即使他们里面有相同名称的列也不影响使用,但 ES 中不是这样的。elasticsearch 是基于 Lucene 开发的搜索引擎,而 ES 中不同 type下名称相同的 filed 最终在 Lucene 中的处理方式是一样的。
- 两个不同 type 下的两个 user_name,在 ES 同一个索引下其实被认为是同一个 filed,你必须在两个不同的 type 中定义相同的 filed 映射。否则,不同 type 中的相同字段名称就会在处理中出现冲突的情况,导致 Lucene 处理效率下降。
- 去掉 type 就是为了提高 ES 处理数据的效率。
-
Elasticsearch 7.x
- URL 中的 type 参数为可选。比如,索引一个文档不再要求提供文档类型。
-
Elasticsearch 8.x
- 不再支持 URL 中的 type 参数。
-
解决:
1)、将索引从多类型迁移到单类型,每种类型文档一个独立索引
2)、将已存在的索引下的类型数据,全部迁移到指定位置即可。详见数据迁移
1、创建索引并指定映射
PUT /my-index
{
"mappings": {
"properties": {
"age":
{ "type": "integer" },
"email":
{ "type": "keyword"
},
"name":
{ "type": "text"
}
}
}
}
2、添加新的字段映射
PUT /my-index/_mapping
{
"properties": {
"employee-id": {
"type": "keyword",
"index": false
}
}
}
3、更新映射
对于已经存在的映射字段,我们不能更新。更新必须创建新的索引进行数据迁移
4、数据迁移
先创建出 new_twitter 的正确映射。然后使用如下方式进行数据迁移
POST _reindex
[固定写法]
{
"source": {
"index": "twitter"
},
"dest": {
"index": "new_twitter"
}
}
// 将旧索引的 type 下的数据进行迁移
POST _reindex
{
"source": {
"index": "twitter",
"type": "tweet"
},
"dest": {
"index": "tweets"
}
}
4、分词
一个 tokenizer(分词器)接收一个字符流,将之分割为独立的 tokens(词元,通常是独立的单词),然后输出 tokens 流。
例如,whitespace tokenizer 遇到空白字符时分割文本。它会将文本 “Quick brown fox!” 分割为 [Quick, brown, fox!]。
该 tokenizer(分词器)还负责记录各个 term(词条)的顺序或 position 位置(用于 phrase 短语和 word proximity 词近邻查询),以及 term(词条)所代表的原始 word(单词)的 start(起始)和 end(结束)的 character offsets(字符偏移量)(用于高亮显示搜索的内容)。
Elasticsearch 提供了很多内置的分词器,可以用来构建 custom analyzers(自定义分词器)。
1)、安装 ik 分词器
注意:不能用默认 elasticsearch-plugin install xxx.zip 进行自动安装
https://github.com/medcl/elasticsearch-analysis-ik/releases?after=v6.4.2 对应 es 版本安装
# 进入 es 容器内部 plugins 目录
docker exec -it 容器 id /bin/bash
wget https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v7.4.2/elasticsearch-analysis-ik-7.4.2.zip
unzip 下载的文件
rm –rf *.zip
mv elasticsearch/ ik
# 可以确认是否安装好了分词器
cd ../bin
elasticsearch plugin list:即可列出系统的分词器
2)、测试分词器
// 使用默认
POST _analyze
{
"text": "我是中国人"
}
请观察结果
// 使用分词器
POST _analyze
{
"analyzer": "ik_smart",
"text": "我是中国人"
}
请观察结果
// 另外一个分词器
ik_max_word
POST _analyze
{
"analyzer": "ik_max_word",
"text": "我是中国人"
}
请观察结果
能够看出不同的分词器,分词有明显的区别,所以以后定义一个索引不能再使用默认的 mapping 了,要手工建立 mapping, 因为要选择分词器。
3)、自定义词库
修改/usr/share/elasticsearch/plugins/ik/config/中的 IKAnalyzer.cfg.xml
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE properties SYSTEM "http://java.sun.com/dtd/properties.dtd">
<properties>
<comment>IK Analyzer 扩展配置</comment>
<!--用户可以在这里配置自己的扩展字典 -->
<entry key="ext_dict"></entry>
<!--用户可以在这里配置自己的扩展停止词字典-->
<entry key="ext_stopwords"></entry>
<!--用户可以在这里配置远程扩展字典 -->
<entry key="remote_ext_dict">http://192.168.128.130/fenci/myword.txt</entry>
<!--用户可以在这里配置远程扩展停止词字典-->
<!-- <entry key="remote_ext_stopwords">words_location</entry> -->
</properties>
原来的 xml
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE properties SYSTEM "http://java.sun.com/dtd/properties.dtd">
<properties>
<comment>IK Analyzer 扩展配置</comment>
<!--用户可以在这里配置自己的扩展字典 -->
<entry key="ext_dict"></entry>
<!--用户可以在这里配置自己的扩展停止词字典-->
<entry key="ext_stopwords"></entry>
<!--用户可以在这里配置远程扩展字典 -->
<!-- <entry key="remote_ext_dict">words_location</entry> -->
<!--用户可以在这里配置远程扩展停止词字典-->
<!-- <entry key="remote_ext_stopwords">words_location</entry> -->
</properties
按照标红的路径利用 nginx 发布静态资源,按照请求路径,创建对应的文件夹以及文件,放在nginx 的 html 下
然后重启 es 服务器,重启 nginx。
在 kibana 中测试分词效果
更新完成后,es 只会对新增的数据用新词分词。历史数据是不会重新分词的。如果想要历史数据重新分词。需要执行:
POST my_index/_update_by_query?conflicts=proceed
五、Elasticsearch-Rest-Client
1)、9300:TCP
- spring-data-elasticsearch:transport-api.jar;
- springboot 版本不同, transport-api.jar 不同,不能适配 es 版本
- 7.x 已经不建议使用,8 以后就要废弃
2)、9200:HTTP
- JestClient:非官方,更新慢
- RestTemplate:模拟发 HTTP 请求,ES 很多操作需要自己封装,麻烦
- HttpClient:同上
- Elasticsearch-Rest-Client:官方 RestClient,封装了 ES 操作,API 层次分明,上手简单
最终选择 Elasticsearch-Rest-Client(elasticsearch-rest-high-level-client)
https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-high.html
1、SpringBoot 整合
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.4.2</version>
</dependency>
2、配置
@Bean
RestHighLevelClient client() {
RestClientBuilder builder = RestClient.builder(new HttpHost("192.168.56.10", 9200, "http"));
return new RestHighLevelClient(builder);
}
3、使用
参照官方文档:
@Test
void test1() throws IOException {
Product product = new Product();
product.setSpuName("华为");
product.setId(10L);
IndexRequest request = new IndexRequest("product").id("20")
.source("spuName","华为","id",20L);
try {
IndexResponse response = client.index(request, RequestOptions.DEFAULT);
System.out.println(request.toString());
IndexResponse response2 = client.index(request, RequestOptions.DEFAULT);
} catch (ElasticsearchException e) {
if (e.status() == RestStatus.CONFLICT) {
}
}
}
六、附录-安装 nginx
-
随便启动一个 nginx 实例,只是为了复制出配置
-
docker run -p 80:80 --name nginx -d nginx:1.10
-
-
将容器内的配置文件拷贝到当前目录:docker container cp nginx:/etc/nginx .
- 修改文件名称:mv nginx conf
- 把这个 conf 移动到/mydata/nginx
- 终止原容器:docker stop nginx
- 执行命令删除原容器:docker rm nginx
-
创建新的 nginx;执行以下命令
docker run -p 80:80 --name nginx \
-v /mydata/nginx/html:/usr/share/nginx/html \
-v /mydata/nginx/logs:/var/log/nginx \
-v /mydata/nginx/conf:/etc/nginx \
-d nginx:1.10
给 nginx 的 html 下面放的所有资源可以直接访问;
性能与压力测试
一、性能监控
1、jvm 内存模型
-
程序计数器 Program Counter Register:
-
记录的是正在执行的虚拟机字节码指令的地址,
-
此内存区域是唯一一个在 JAVA 虚拟机规范中没有规定任何OutOfMemoryError 的区域
-
-
虚拟机:VM Stack
- 描述的是 JAVA 方法执行的内存模型,每个方法在执行的时候都会创建一个栈帧,用于存储局部变量表,操作数栈,动态链接,方法接口等信息
- 局部变量表存储了编译期可知的各种基本数据类型、对象引用
- 线程请求的栈深度不够会报 StackOverflowError 异常
- 栈动态扩展的容量不够会报 OutOfMemoryError 异常
- 虚拟机栈是线程隔离的,即每个线程都有自己独立的虚拟机栈
-
本地方法:Native Stack
- 本地方法栈类似于虚拟机栈,只不过本地方法栈使用的是本地方法
-
堆:Heap
- 几乎所有的对象实例都在堆上分配内存
2、堆
所有的对象实例以及数组都要在堆上分配。堆是垃圾收集器管理的主要区域,也被称为“GC堆”;也是我们优化最多考虑的地方。
堆可以细分为:
- 新生代
- Eden 空间
- From Survivor 空间
- To Survivor 空间
- 老年代
- 永久代/元空间
- Java8 以前永久代,受 jvm 管理,java8 以后元空间,直接使用物理内存。因此,默认情况下,元空间的大小仅受本地内存限制。
垃圾回收
从 Java8 开始,HotSpot 已经完全将永久代(Permanent Generation)移除,取而代之的是一个新的区域—元空间(MetaSpace)
3、jconsole 与 jvisualvm
Jdk 的两个小工具 jconsole、jvisualvm(升级版的 jconsole);通过命令行启动,可监控本地和远程应用。远程应用需要配置
1、jvisualvm 能干什么
监控内存泄露,跟踪垃圾回收,执行时内存、cpu 分析,线程分析…
运行:正在运行的
休眠:sleep
等待:wait
驻留:线程池里面的空闲线程
监视:阻塞的线程,正在等待锁
2、安装插件方便查看 gc
- Cmd 启动 jvisualvm
- 工具->插件
4、监控指标
1、中间件指标
- 当前正在运行的线程数不能超过设定的最大值。一般情况下系统性能较好的情况下,线程数最小值设置 50 和最大值设置 200 比较合适。
- 当前运行的 JDBC 连接数不能超过设定的最大值。一般情况下系统性能较好的情况下,JDBC 最小值设置 50 和最大值设置 200 比较合适。
- GC频率不能频繁,特别是 FULL GC 更不能频繁,一般情况下系统性能较好的情况下,JVM 最小堆大小和最大堆大小分别设置 1024M 比较合适。
2、数据库指标
- SQL 耗时越小越好,一般情况下微秒级别。
- 命中率越高越好,一般情况下不能低于 95%。
- 锁等待次数越低越好,等待时间越短越好。
压测内容 | 压测线程数 | 吞吐量/s | 90%响应时间 | 99%响应时间 |
---|---|---|---|---|
Nginx | 50 | 2335 | 11 | 944 |
Gateway | 50 | 10367 | 8 | 31 |
简单服务 | 50 | 11341 | 8 | 17 |
首页一级菜单渲染 | 50 | 270(db,thymeleaf) | 267 | 365 |
首页渲染(开缓存) | 50 | 290 | 251 | 365 |
首页渲染(开缓存、优化数据库、关日志) | 50 | 700 | 105 | 183 |
三级分类数据获取 | 50 | 2(db)/8(加索引) | … | … |
三级分类(优化业务) | 50 | 111 | 571 | 896 |
三级分类(使用redis作为缓存) | 50 | 411 | 153 | 217 |
首页全量数据获取 | 50 | 7(静态资源) | ||
Nginx+Gateway | 50 | |||
Gateway+简单服务 | 50 | 3126 | 30 | 125 |
全链路 | 50 | 800 | 88 | 310 |
-
中间件越多,性能损失越大,大多都损失在网络交互了;
-
业务:
- Db(MySQL 优化)
- 模板的渲染速度(缓存)
- 静态资源
5、JVM 分析&调优
jvm 调优,调的是稳定,并不能带给你性能的大幅提升。服务稳定的重要性就不用多说了,保证服务的稳定,gc 永远会是 Java 程序员需要考虑的不稳定因素之一。复杂和高并发下的服务,必须保证每次 gc 不会出现性能下降,各种性能指标不会出现波动,gc 回收规律而且干净,找到合适的 jvm 设置。Full gc 最会影响性能,根据代码问题,避免 full gc 频率。可以适当调大年轻代容量,让大对象可以在年轻代触发 yong gc,调整大对象在年轻代的回收频次,尽可能保证大对象在年轻代回收,减小老年代缩短回收时间;
1、几个常用工具
jstack | 查看 jvm 线程运行状态,是否有死锁现象等等信息 |
jinfo | 可以输出并修改运行时的 java 进程的 opts。 |
jps | 与 unix 上的 ps 类似,用来显示本地的 java 进程,可以查看本地运行着几个 java 程序,并显示他们的进程号。 |
jstat | 一个极强的监视 VM 内存工具。可以用来监视 VM 内存内的各种堆和非堆的大小 及其内存使用量。 |
jmap | 打印出某个 java 进程(使用 pid)内存内的所有’对象’的情况(如:产生那些对象, 及其数量) |
2、命令示例
jstat 工具特别强大,有众多的可选项,详细查看堆内各个部分的使用量,以及加载类的数量。使用时,需加上查看进程的进程 id,和所选参数。
jstat -class pid | 显示加载 class 的数量,及所占空间等信息 |
jstat -compiler pid | 显示 VM 实时编译的数量等信息。 |
jstat -gc pid | 可以显示 gc 的信息,查看 gc 的次数,及时间 |
jstat -gccapacity pid | 堆内存统计,三代(young,old,perm)内存使用和占用大小 |
jstat -gcnew pid | 新生代垃圾回收统计 |
jstat -gcnewcapacity pid | 新生代内存统计 |
jstat -gcold pid | 老年代垃圾回收统计 |
除了以上一个参数外,还可以同时加上 两个数字,如:jstat -printcompilation 3024 250 6 是每 250 毫秒打印一次,一共打印 6 次,还可以加上-h3 每三行显示一下标题。
jstat -gcutil pid 1000 100 : 1000ms 统计一次 gc 情况统计 100 次;
jinfo 是 JDK 自带的命令,可以用来查看正在运行的 java 应用程序的扩展参数,包括 Java System 属性和 JVM 命令行参数;也可以动态的修改正在运行的 JVM 一些参数。当系统崩溃时,jinfo 可以从 core 文件里面知道崩溃的 Java 应用程序的配置信息
jinfo pid | 输出当前 jvm 进程的全部参数和系统属性 |
jinfo -flag name pid | 可以查看指定的 jvm 参数的值;打印结果:-无此参数,+有 |
jinfo -flag [+|-]name pid | 开启或者关闭对应名称的参数(无需重启虚拟机) |
jinfo -flag name=value pid | 修改指定参数的值 |
jinfo -flags pid | 输出全部的参数 |
jinfo -sysprops pid | 输出当前 jvm 进行的全部的系统属性 |
jmap 可以生成 heap dump 文件,也可以查看堆内对象分析内存信息等,如果不使用这个命令,还可以使用-XX:+HeapDumpOnOutOfMemoryError 参数来让虚拟机出现 OOM 的时候自动生成 dump 文件。
jmap -dump:live,format=b,file=dump.hprof pid | dump 堆到文件,format 指定输出格式,live 指明是活着的对象,file 指定文件名。eclipse 可以打开这个文件 |
jmap -heap pid | 打印 heap 的概要信息,GC 使用的算法,heap 的配置和使用情况,可以用此来判断内存目前的使用情况以及垃圾回收情况 |
jmap -finalizerinfo pid | 打印等待回收的对象信息 |
jmap -histo:live pid | 打印堆的对象统计,包括对象数、内存大小等。jmap -histo:live 这个命令执行,JVM 会先触发 gc,然后再统计信息 |
jmap -clstats pid | 打印 Java 类加载器的智能统计信息,对于每个类加载器而言,对于每个类加载器而言,它的名称,活跃度,地址,父类加载器,它所加载的类的数量和大小都会被打印。此外,包含的字符串数量和大小也会被打印。 |
-F | 强制模式。如果指定的 pid 没有响应,请使用 jmap -dump 或 jmap -histo 选项。此模式下,不支持 live 子选项。 |
jmap -F -histo pid | jstack 是 jdk 自带的线程堆栈分析工具,使用该命令可以查看或导出 Java 应用程序中线程堆栈信息。 |
jstack pid | 输出当前 jvm 进程的全部参数和系统属性 |
3、调优项
官方文档:https://docs.oracle.com/javase/8/docs/technotes/tools/unix/java.html#BGBCIEFC
二、压力测试
压力测试考察当前软硬件环境下系统所能承受的最大负荷并帮助找出系统瓶颈所在。压测都是为了系统在线上的处理能力和稳定性维持在一个标准范围内,做到心中有数。
使用压力测试,我们有希望找到很多种用其他测试方法更难发现的错误。有两种错误类型是: 内存泄漏,并发与同步。
有效的压力测试系统将应用以下这些关键条件:重复,并发,量级,随机变化。
1、性能指标
- 响应时间(Response Time: RT)
- 响应时间指用户从客户端发起一个请求开始,到客户端接收到从服务器端返回的响应结束,整个过程所耗费的时间。
- HPS(Hits Per Second) :每秒点击次数,单位是次/秒。
- TPS(Transaction per Second):系统每秒处理交易数,单位是笔/秒。
- QPS(Query per Second):系统每秒处理查询次数,单位是次/秒。
- 对于互联网业务中,如果某些业务有且仅有一个请求连接,那么 TPS=QPS=HPS,一般情况下用 TPS 来衡量整个业务流程,用 QPS 来衡量接口查询次数,用 HPS 来表示对服务器单击请求。
- 无论 TPS、QPS、HPS,此指标是衡量系统处理能力非常重要的指标,越大越好,根据经验,一般情况下:
- 金融行业:1000TPS~50000TPS,不包括互联网化的活动
- 保险行业:100TPS~100000TPS,不包括互联网化的活动
- 制造行业:10TPS~5000TPS
- 互联网电子商务:10000TPS~1000000TPS
- 互联网中型网站:1000TPS~50000TPS
- 互联网小型网站:500TPS~10000TPS
- 最大响应时间(Max Response Time) 指用户发出请求或者指令到系统做出反应(响应)的最大时间。
- 最少响应时间(Mininum ResponseTime) 指用户发出请求或者指令到系统做出反应(响应)的最少时间。
- 90%响应时间(90% Response Time) 是指所有用户的响应时间进行排序,第 90%的响应时间。
- 从外部看,性能测试主要关注如下三个指标
- 吞吐量:每秒钟系统能够处理的请求数、任务数。
- 响应时间:服务处理一个请求或一个任务的耗时。
- 错误率:一批请求中结果出错的请求所占比例。
2、JMeter
1、JMeter 安装
https://jmeter.apache.org/download_jmeter.cgi
下载对应的压缩包,解压运行 jmeter.bat 即可
2、JMeter 压测示例
1、添加线程组
线程组参数详解:
- 线程数:虚拟用户数。一个虚拟用户占用一个进程或线程。设置多少虚拟用户数在这里也就是设置多少个线程数。
- Ramp-Up Period(in seconds)准备时长:设置的虚拟用户数需要多长时间全部启动。如果线程数为 10,准备时长为 2,那么需要 2 秒钟启动 10 个线程,也就是每秒钟启动 5 个线程。
- 循环次数:每个线程发送请求的次数。如果线程数为 10,循环次数为 100,那么每个线程发送 100 次请求。总请求数为 10*100=1000 。如果勾选了“永远”,那么所有线程会一直发送请求,一到选择停止运行脚本。
- Delay Thread creation until needed:直到需要时延迟线程的创建。
- 调度器:设置线程组启动的开始时间和结束时间(配置调度器时,需要勾选循环次数为永远)
- 持续时间(秒):测试持续时间,会覆盖结束时间
- 启动延迟(秒):测试延迟启动时间,会覆盖启动时间
- 启动时间:测试启动时间,启动延迟会覆盖它。当启动时间已过,手动只需测试时当前时间也会覆盖它。
- 结束时间:测试结束时间,持续时间会覆盖它。
2、添加 HTTP 请求
3、添加监听器
4、启动压测&查看分析结果
结果分析
- 有错误率同开发确认,确定是否允许错误的发生或者错误率允许在多大的范围内;
- Throughput 吞吐量每秒请求的数大于并发数,则可以慢慢的往上面增加;若在压测的机器性能很好的情况下,出现吞吐量小于并发数,说明并发数不能再增加了,可以慢慢的往下减,找到最佳的并发数;
- 压测结束,登陆相应的 web 服务器查看 CPU 等性能指标,进行数据的分析;
- 最大的 tps,不断的增加并发数,加到 tps 达到一定值开始出现下降,那么那个值就是最大的 tps。
- 最大的并发数:最大的并发数和最大的 tps 是不同的概率,一般不断增加并发数,达到一个值后,服务器出现请求超时,则可认为该值为最大的并发数。
- 压测过程出现性能瓶颈,若压力机任务管理器查看到的 cpu、网络和 cpu 都正常,未达到 90%以上,则可以说明服务器有问题,压力机没有问题。
- 影响性能考虑点包括:
- 数据库、应用程序、中间件(tomact、Nginx)、网络和操作系统等方面
- 首先考虑自己的应用属于 CPU 密集型还是 IO 密集型
3、JMeter Address Already in use 错误解决
windows 本身提供的端口访问机制的问题。
Windows 提供给 TCP/IP 链接的端口为 1024-5000,并且要四分钟来循环回收他们。就导致我们在短时间内跑大量的请求时将端口占满了。
-
cmd 中,用 regedit 命令打开注册表
-
在 HKEY_LOCAL_MACHINE\SYSTEM\CurrentControlSet\Services\Tcpip\Parameters 下,
- 右击 parameters,添加一个新的 DWORD,名字为 MaxUserPort
- 然后双击 MaxUserPort,输入数值数据为 65534,基数选择十进制(如果是分布式运
行的话,控制机器和负载机器都需要这样操作哦)
-
修改配置完毕之后记得重启机器才会生效
https://support.microsoft.com/zh-cn/help/196271/when-you-try-to-connect-from-tcp-ports-greater-than-5000-you-receive-t
TCPTimedWaitDelay:30
缓存&分布式锁
一、缓存
1、缓存使用
为了系统性能的提升,我们一般都会将部分数据放入缓存中,加速访问。而 db 承担数据落盘工作。
哪些数据适合放入缓存?
-
即时性、数据一致性要求不高的
-
访问量大且更新频率不高的数据(读多,写少)
举例:电商类应用,商品分类,商品列表等适合缓存并加一个失效时间(根据数据更新频率来定),后台如果发布一个商品,买家需要 5 分钟才能看到新的商品一般还是可以接受的。
data = cache.load(id);//从缓存加载数据
If(data == null){
data = db.load(id);//从数据库加载数据
cache.put(id,data);//保存到 cache 中
}
return data;
注意:在开发中,凡是放入缓存中的数据我们都应该指定过期时间,使其可以在系统即使没有主动更新数据也能自动触发数据加载进缓存的流程。避免业务崩溃导致的数据永久不一致问题。
2、整合 redis 作为缓存
1、引入 redis-starter
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
2、配置 redis
spring:
redis:
host: 192.168.56.10
port: 6379
3、使用 RedisTemplate 操作 redis
@Autowired
StringRedisTemplate stringRedisTemplate;
@Test
public void testStringRedisTemplate(){
ValueOperations<String, String> ops = stringRedisTemplate.opsForValue();
ops.set("hello","world_"+ UUID.randomUUID().toString());
String hello = ops.get("hello");
System.out.println(hello);
}
4、切换使用 jedis
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<exclusions>
<exclusion>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
</dependency>
二、缓存失效问题
先来解决大并发读情况下的缓存失效问题;
1、缓存穿透
-
缓存穿透是指查询一个一定不存在的数据,由于缓存是不命中,将去查询数据库,但是数据库也无此记录,我们没有将这次查询的 null 写入缓存,这将导致这个不存在的数据每次请求都要到存储层去查询,失去了缓存的意义。
-
在流量大时,可能 DB 就挂掉了,要是有人利用不存在的 key 频繁攻击我们的应用,这就是漏洞。
-
解决:
缓存空结果、并且设置短的过期时间。
2、缓存雪崩
-
缓存雪崩是指在我们设置缓存时采用了相同的过期时间,导致缓存在某一时刻同时失效,请求全部转发到 DB,DB 瞬时压力过重雪崩。
-
解决:
原有的失效时间基础上增加一个随机值,比如 1-5 分钟随机,这样每一个缓存的过期时间的重复率就会降低,就很难引发集体失效的事件。
3、缓存击穿
-
对于一些设置了过期时间的 key,如果这些 key 可能会在某些时间点被超高并发地访问,是一种非常“热点”的数据。这个时候,需要考虑一个问题:如果这个 key 在大量请求同时进来前正好失效,那么所有对这个 key 的数据查询都落到 db,我们称为缓存击穿。
-
解决:
加锁
-
方案一:synchronized + 双重检查(单体应用)
-
方案二:分布式锁
-
三、缓存数据一致性
1、双写模式
2、失效模式
3、改进方法 1-分布式读写锁
分布式读写锁。读数据等待写数据整个操作完成
4、改进方法 2-使用 cananl
解决方案
无论是双写模式还是失效模式,都会导致缓存的不一致问题,即多个实例同时更新会出事。怎么办?
- 如果是用户纬度数据(订单数据、用户数据),这种并发几率非常小,不用考虑这个问题,缓存数据加上过期时间,每隔一段时间触发读的主动更新即可
- 如果是菜单,商品介绍等基础数据,也可以去使用canal订阅binlog的方式
- 缓存数据+过期时间也足够解决大部分业务对于缓存的要求
- 通过加锁保证并发读写,写写的时候按顺序排好队,读读无所谓,所以适合使用读写锁。(业务不关心脏数据,允许临时脏数据可忽略)
总结:
- 我们能放入缓存的数据本就不应该是实时性、一致性要求超高的。所以缓存数据的时候加上过期时间,保证每天拿到当前最新数据即可
- 我们不应该过度设计,增加系统的复杂性
- 遇到实时性、一致性要求高的数据,就应该查数据库,即使慢点。
四、分布式锁
1、分布式锁与本地锁
2、分布式锁实现
使用 RedisTemplate 操作分布式锁
public Map<String, List<Catelog2Vo>> getCatalogJsonFromDbWithRedisLock() {
//1、占分布式锁。去 redis 占坑
String uuid = UUID.randomUUID().toString();
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock",uuid,300,TimeUnit.SECONDS);
if(lock){
System.out.println("获取分布式锁成功...");
//加锁成功... 执行业务
//2、设置过期时间,必须和加锁是同步的,原子的
//redisTemplate.expire("lock",30,TimeUnit.SECONDS);
Map<String, List<Catelog2Vo>> dataFromDb;
try{
dataFromDb = getDataFromDb();
}finally {
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
//删除锁
Long lock1 = redisTemplate.execute(new DefaultRedisScript<Long>(script, Long.class), Arrays.asList("lock"), uuid);
}
//获取值对比+对比成功删除=原子操作 lua 脚本解锁
// String lockValue = redisTemplate.opsForValue().get("lock");
// if(uuid.equals(lockValue)){
//删除我自己的锁
// redisTemplate.delete("lock");//删除锁
// }
return dataFromDb;
}else {
//加锁失败...重试。synchronized ()
//休眠 100ms 重试
System.out.println("获取分布式锁失败...等待重试");
try{
Thread.sleep(200);
}catch (Exception e){
}
return getCatalogJsonFromDbWithRedisLock();//自旋的方式
}
}
3、Redisson 完成分布式锁
1、简介
Redisson 是架设在 Redis 基础上的一个 Java 驻内存数据网格(In-Memory Data Grid)。充分的利用了 Redis 键值数据库提供的一系列优势,基于 Java 实用工具包中常用接口,为使用者提供了一系列具有分布式特性的常用工具类。使得原本作为协调单机多线程并发程序的工具包获得了协调分布式多机多线程并发系统的能力,大大降低了设计和研发大规模分布式系统的难度。同时结合各富特色的分布式服务,更进一步简化了分布式环境中程序相互之间的协作。
官方文档:https://github.com/redisson/redisson/wiki/%E7%9B%AE%E5%BD%95
2、配置
@Configuration
public class MyRedissonConfig {
@Bean(destroyMethod = "shutdown")
public RedissonClient config(){
// 默认连接地址 127.0.0.1:6379
RedissonClient redisson = Redisson.create();
Config config = new Config();
//redis://127.0.0.1:7181
//可以用"rediss://"来启用 SSL 连接
config.useSingleServer().setAddress("redis://192.168.56.10:6379");
return Redisson.create(config);
}
}
3、使用分布式锁
可重入锁(Reentrant Lock)(非公平)
// 获取一把锁,只要锁的名字一样,就是同一把锁
RLock lock = redisson.getLock("anyLock");// 最常见的使用方法
lock.lock(); // 阻塞等待
// 锁自动续期,如果业务超长,运行期间自动给锁续上新的30s。不用担心业务时间长,锁自动过期被删除掉
// 加锁的业务只要运行完成,就不会给当前锁续期,即使不手动解锁,锁默认在30s以后自动删除。
// 加锁以后 10 秒钟自动解锁
// 无需调用 unlock 方法手动解锁
lock.lock(10, TimeUnit.SECONDS); // 自动解锁的时间一定要大于业务的执行时间
//如果传递了超时时间,就发送给redis执行脚本,进行占锁,默认超时就是我们指定的时间
// 如果未指定锁的超时时间,就是用30 *1000(LockWatchdogTimeout的默认时间)
// 只要占锁成功,就会启动一个定时任务(重新给锁设定过期时间,新的过期时间就是看门狗的默认时间)。每隔10s都会自动执行
// 尝试加锁,最多等待 100 秒,上锁以后 10 秒自动解锁
boolean res = lock.tryLock(100,10, TimeUnit.SECONDS);
if (res) {
try {
...
} finally {
lock.unlock();
}
}
读写锁(ReadWriteLock)
分布式可重入读写锁允许同时有多个读锁和一个写锁处于加锁状态。
// 保证一定能读到最新数据,修改期间,写锁是一个排他锁(互斥锁);读锁是一个共享锁
// 写锁没释放,读就必须等待
public String writeValue() {
RReadWriteLock lock = redisson.getReadWriteLock("rw-lock");
// 写数据加写锁,读数据加读锁
RLock rLock = lock.writeLock();
String s = "";
try {
rLock.lock();
s= UUID.randomUUID().toString();
Thread.sleep(30000);
redisTemplate.opsForValue().set("writeValue", s);
} catch (Exception e) {
e.printStackTrace();
} finally {
rLock.unlock();
}
return s;
}
public String readValue() {
RReadWriteLock lock = redisson.getReadWriteLock("rw-lock");
// 读数据加读锁
RLock rLock = lock.readLock();
String s = "";
try {
rLock.lock();
Thread.sleep(30000);
s = redisTemplate.opsForValue().get("writeValue");
} catch (Exception e) {
e.printStackTrace();
} finally {
rLock.unlock();
}
return s;
}
信号量(Semaphore)
/**
* 停车位
* 限流
*/
public String park() throws InterruptedException {
RSemaphore park = redisson.getSemaphore("park");
park.acquire(); // 获取一个信号值 // 阻塞
boolean b = park.tryAcquire(); // 重试,不行就放弃
if (b){
// 执行业务
}else {
return "error";
}
return "ok";
}
public String go() throws InterruptedException {
RSemaphore park = redisson.getSemaphore("park");
park.release(); // 释放一个值
return "ok";
}
闭锁(CountDownLatch)
/**
* 放假,锁门
* 5个班都走完,才能锁大门
*/
public String lockDoor() throws InterruptedException {
RCountDownLatch door = redisson.getCountDownLatch("door");
door.trySetCount(5);
door.await(); // 等待闭锁完成
return "放假了。。。";
}
public String gogogo(Long id){
RCountDownLatch door = redisson.getCountDownLatch("door");
door.countDown(); // 计数减一
return id+"班的人都走了。。。。";
}
五、Spring Cache
1、简介
- Spring 从 3.1 开始定义了 org.springframework.cache.Cache和 org.springframework.cache.CacheManager 接口来统一不同的缓存技术;并支持使用 JCache(JSR-107)注解简化我们开发;
- Cache 接口为缓存的组件规范定义,包含缓存的各种操作集合;Cache 接 口 下 Spring 提 供 了 各 种 xxxCache 的 实 现 ; 如 RedisCache , EhCacheCache ,ConcurrentMapCache 等;
- 每次调用需要缓存功能的方法时,Spring 会检查检查指定参数的指定的目标方法是否已经被调用过;如果有就直接从缓存中获取方法调用后的结果,如果没有就调用方法并缓存结果后返回给用户。下次调用直接从缓存中获取。
- 使用 Spring 缓存抽象时我们需要关注以下两点;
- 1、确定方法需要被缓存以及他们的缓存策略
- 2、从缓存中读取之前缓存存储的数据
2、基础概念
3、注解
4、表达式语法
5、使用
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-cache</artifactId>
</dependency>
@EnableConfigurationProperties(CacheProperties.class)
@Configuration
@EnableCaching
public class MyCacheConfig {
@Bean
RedisCacheConfiguration redisCacheConfiguration(CacheProperties cacheProperties) {
RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig();
config = config.serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(new StringRedisSerializer()));
config = config.serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(new GenericJackson2JsonRedisSerializer()));
CacheProperties.Redis redisProperties = cacheProperties.getRedis();
// 将配置文件中的所有配置都生效
if (redisProperties.getTimeToLive() != null) {
config = config.entryTtl(redisProperties.getTimeToLive());
}
if (redisProperties.getKeyPrefix() != null) {
config = config.prefixCacheNameWith(redisProperties.getKeyPrefix());
}
if (!redisProperties.isCacheNullValues()) {
config = config.disableCachingNullValues();
}
if (!redisProperties.isUseKeyPrefix()) {
config = config.disableKeyPrefix();
}
return config;
}
}
spring:
cache:
type: redis
redis:
time-to-live: 3600000
key-prefix: CACHE_ # 指定了前缀,如果没有就默认使用缓存的名字作为前缀
use-key-prefix: true
cache-null-values: true # 是否缓存控制,防止缓存穿透
// 指定生成的缓存使用的key: key属性指定,接受一个SpEL
@Cacheable(value={"category"}, key = "#root.method.name")
public List<CategoryEntity> getLevelCategory(){
...
return null;
}
// 失效模式
@CacheEvict(value="category", key = "'getLevel1Categorys'")
// 同时进行多种缓存操作 @Caching
// 指定删除某个分区下的所有数据 @CacheEvict(value="category",allEntries=true)
// 存储同一类型的数据,都可以指定成同一个分区。分区名默认是分区前缀
@CachePut //双写模式
Spring-Cache的不足
- 读模式:
- 缓存穿透:查询一个null数据,解决:缓存空数据,ache-null-values=true
- 缓存击穿:大量并发进来同时查询一个正好过期的数据。解决:加锁? 默认是无锁的,(@Cacheable(sync = true))
- 缓存雪崩:大量的key同时过期。解决:加随机事件。
- 写模式:缓存与数据库不一致
- 读写加锁
- 引入Canal,感知到MySQL的更新去更新数据库
- 读多写多,直接去数据库查询就行
总结:
- 常规数据(读多写少,即时性,一致性要求不高的数据);完全可以使用spring-cache
- 特殊数据:特殊设计
原理:CacheManager(RedisCacheManager) -> Cache(RedisCache) -> Cache 负责缓存的读写
异步&线程池
一、线程回顾
1、初始化线程的 4 种方式
1)、继承 Thread
2)、实现 Runnable 接口
3)、实现 Callable 接口 + FutureTask (可以拿到返回结果,可以处理异常)
4)、线程池
方式1 和方式 2:主进程无法获取线程的运算结果。不适合当前场景
方式 3:主进程可以获取线程的运算结果,但是不利于控制服务器中的线程资源。可以导致服务器资源耗尽。
方式 4:通过如下两种方式初始化线程池
Executors.newFiexedThreadPool(3);
//或者
new ThreadPoolExecutor(corePoolSize,
maximumPoolSize,
keepAliveTime,
TimeUnit
unit,
workQueue, threadFactory, handler);
通过线程池性能稳定,也可以获取执行结果,并捕获异常。但是,在业务复杂情况下,一个异步调用可能会依赖于另一个异步调用的执行结果。
2、线程池的七大参数
/**
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
池中一直保持的线程的数量,即使线程空闲。除非设置了 allowCoreThreadTimeOut
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
池中允许的最大的线程数
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
当线程数大于核心线程数的时候,在指定时间后,释放空闲线程
* @param unit the time unit for the {@code keepAliveTime} argument
时间单位
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
阻塞队列,用来存储等待执行的任务,如果当前对线程的需求超过了 corePoolSize大小,就会放在这里等待空闲线程执行。
* @param threadFactory the factory to use when the executor
* creates a new thread
创建线程的工厂,比如指定线程名等
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
拒绝策略,如果线程满了,线程池就会使用拒绝策略。
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
}
运行流程:
1、线程池创建,准备好 core 数量的核心线程,准备接受任务
2、新的任务进来,用 core 准备好的空闲线程执行。
- (1)、core 满了,就将再进来的任务放入阻塞队列中。空闲的 core 就会自己去阻塞队列获取任务执行
- (2)、阻塞队列满了,就直接开新线程执行,最大只能开到 max 指定的数量
- (3)、max 都执行好了。Max-core 数量空闲的线程会在 keepAliveTime 指定的时间后自动销毁。最终保持到 core 大小
- (4)、如果线程数开到了 max 的数量,还有新任务进来,就会使用 reject 指定的拒绝策略进行处理
3、所有的线程创建都是由指定的 factory 创建的。
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(5,
200,
10,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(10000),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
面试:
一个线程池 core 7; max 20 ,queue:50,100 并发进来怎么分配的;
先有 7 个能直接得到执行,接下来 50 个进入队列排队,再多开 13 个继续执行。现在 70 个被安排上了。剩下 30 个默认拒绝策略。
3、常见的 4 种线程池
-
newCachedThreadPool
-
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
-
创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
-
-
newFixedThreadPool
-
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
-
创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等
-
-
newScheduledThreadPool
-
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); }
-
创建一个定长线程池,支持定时及周期性任务执行。
-
-
newSingleThreadExecutor
-
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
-
创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
-
4、开发中为什么使用线程池
- 降低资源的消耗
- 通过重复利用已经创建好的线程降低线程的创建和销毁带来的损耗
- 提高响应速度
- 因为线程池中的线程数没有超过线程池的最大上限时,有的线程处于等待分配任务的状态,当任务来时无需创建新的线程就能执行
- 提高线程的可管理性
- 线程池会根据当前系统特点对池内的线程进行优化处理,减少创建和销毁线程带来的系统开销。无限的创建和销毁线程不仅消耗系统资源,还降低系统的稳定性,使用线程池进行统一分配
二、CompletableFuture 异步编排
Future 是 Java 5 添加的类,用来描述一个异步计算的结果。你可以使用isDone
方法检查计算是否完成,或者使用get
阻塞住调用线程,直到计算完成返回结果,你也可以使用cancel
方法停止任务的执行。
虽然Future
以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不方便,只能通过阻塞或者轮询的方式得到任务的结果。阻塞的方式显然和我们的异步编程的初衷相违背,轮询的方式又会耗费无谓的 CPU 资源,而且也不能及时地得到计算结果,为什么不能用观察者设计模式当计算结果完成及时通知监听者呢?
很多语言,比如 Node.js,采用回调的方式实现异步编程。Java 的一些框架,比如 Netty,自己扩展了 Java 的 Future
接口,提供了addListener
等多个扩展方法;Google guava 也提供了通用的扩展 Future;Scala 也提供了简单易用且功能强大的 Future/Promise 异步编程模式。
作为正统的 Java 类库,是不是应该做点什么,加强一下自身库的功能呢?
在 Java 8 中, 新增加了一个包含 50 个方法左右的类: CompletableFuture,提供了非常强大的Future 的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果,并且提供了转换和组合 CompletableFuture 的方法。
CompletableFuture 类实现了 Future 接口,所以你还是可以像以前一样通过get
方法阻塞或者轮询的方式获得结果,但是这种方式不推荐使用。
CompletableFuture 和 FutureTask 同属于 Future 接口的实现类,都可以获取线程的执行结果。
1、创建异步对象
CompletableFuture 提供了四个静态方法来创建一个异步操作。
public static CompletableFuture<Void> runAsync(Runnable runnable) {
return asyncRunStage(asyncPool, runnable);
}
public static CompletableFuture<Void> runAsync(Runnable runnable,
Executor executor) {
return asyncRunStage(screenExecutor(executor), runnable);
}
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
return asyncSupplyStage(asyncPool, supplier);
}
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
Executor executor) {
return asyncSupplyStage(screenExecutor(executor), supplier);
}
1、runXxxx 都是没有返回结果的,supplyXxx 都是可以获取返回结果的
2、可以传入自定义的线程池,否则就用默认的线程池;
2、计算完成时回调方法
public CompletableFuture<T> whenComplete( BiConsumer<? super T, ? super Throwable> action) {
return uniWhenCompleteStage(null, action);
}
public CompletableFuture<T> whenCompleteAsync( BiConsumer<? super T, ? super Throwable> action) {
return uniWhenCompleteStage(asyncPool, action);
}
public CompletableFuture<T> whenCompleteAsync(
BiConsumer<? super T, ? super Throwable> action, Executor executor) {
return uniWhenCompleteStage(screenExecutor(executor), action);
}
public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn) {
return uniExceptionallyStage(fn);
}
whenComplete 可以处理正常和异常的计算结果,exceptionally 处理异常情况。
whenComplete 和 whenCompleteAsync 的区别:
-
whenComplete:是执行当前任务的线程执行继续执行 whenComplete 的任务。
-
whenCompleteAsync:是执行把 whenCompleteAsync 这个任务继续提交给线程池来进行执行。
方法不以 Async 结尾,意味着 Action 使用相同的线程执行,而 Async 可能会使用其他线程执行(如果是使用相同的线程池,也可能会被同一个线程选中执行)
public class CompletableFutureDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture future = CompletableFuture.supplyAsync(new Supplier<Object>() {
@Override
public Object get() {
System.out.println(Thread.currentThread().getName() + "\t completableFuture");
int i = 10 / 0;
return 1024;
}
}).whenComplete(new BiConsumer<Object, Throwable>() {
@Override
public void accept(Object o, Throwable throwable) {
System.out.println("-------o=" + o.toString());
System.out.println("-------throwable=" + throwable);
}0
}).exceptionally(new Function<Throwable, Object>() {
@Override
public Object apply(Throwable throwable) {
System.out.println("throwable=" + throwable);
return 6666;
}
});
System.out.println(future.get());
}
}
3、handle 方法
public <U> CompletableFuture<U> handle( BiFunction<? super T, Throwable, ? extends U> fn) {
return uniHandleStage(null, fn);
}
public <U> CompletableFuture<U> handleAsync( BiFunction<? super T, Throwable, ? extends U> fn) {
return uniHandleStage(asyncPool, fn);
}
public <U> CompletableFuture<U> handleAsync( BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) {
return uniHandleStage(screenExecutor(executor), fn);
}
和 complete 一样,可对结果做最后的处理(可处理异常),可改变返回值。
// 方法完成后的处理
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程:" + Thread.currentThread().getId());
int i = 10 / 0;
System.out.println("运行结果:" + i);
return i;
}, executor).handle((res, thr) -> {
if (res != null) {
return res * 2;
}
if (thr != null) {
return 0;
}
return 0;
});
4、线程串行化方法
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) {
return uniApplyStage(null, fn);
}
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn) {
return uniApplyStage(asyncPool, fn);
}
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor) {
return uniApplyStage(screenExecutor(executor), fn);
}
public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
return uniAcceptStage(null, action);
}
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {
return uniAcceptStage(asyncPool, action);
}
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor) {
return uniAcceptStage(screenExecutor(executor), action);
}
public CompletableFuture<Void> thenRun(Runnable action) {
return uniRunStage(null, action);
}
public CompletableFuture<Void> thenRunAsync(Runnable action) {
return uniRunStage(asyncPool, action);
}
public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor) {
return uniRunStage(screenExecutor(executor), action);
}
thenApply 方法:当一个线程依赖另一个线程时,获取上一个任务返回的结果,并返回当前任务的返回值。
thenAccept 方法:消费处理结果。接收任务的处理结果,并消费处理,无返回结果。
thenRun 方法:只要上面的任务执行完成,就开始执行 thenRun,只是处理完任务后,执行thenRun 的后续操作
带有 Async 默认是异步执行的。同之前。
以上都要前置任务成功完成。
Function<? super T,? extends U>
-
T:上一个任务返回结果的类型
-
U:当前任务的返回值类型
5、两任务组合 - 都要完成
public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn) {
return biApplyStage(null, other, fn);
}
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn) {
return biApplyStage(asyncPool, other, fn);
}
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn, Executor executor) {
return biApplyStage(screenExecutor(executor), other, fn);
}
public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action) {
return biAcceptStage(null, other, action);
}
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action) {
return biAcceptStage(asyncPool, other, action);
}
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action, Executor executor) {
return biAcceptStage(screenExecutor(executor), other, action);
}
public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action) {
return biRunStage(null, other, action);
}
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action) {
return biRunStage(asyncPool, other, action);
}
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action, Executor executor) {
return biRunStage(screenExecutor(executor), other, action);
}
两个任务必须都完成,触发该任务。
thenCombine:组合两个 future,获取两个 future 的返回结果,并返回当前任务的返回值
thenAcceptBoth:组合两个 future,获取两个 future 任务的返回结果,然后处理任务,没有返回值。
runAfterBoth:组合两个 future,不需要获取 future 的结果,只需两个 future 处理完任务后,处理该任务。
6、两任务组合 - 一个完成
public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn) {
return orApplyStage(null, other, fn);
}
public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn) {
return orApplyStage(asyncPool, other, fn);
}
public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn, Executor executor) {
return orApplyStage(screenExecutor(executor), other, fn);
}
public CompletableFuture<Void> acceptEither( CompletionStage<? extends T> other, Consumer<? super T> action) {
return orAcceptStage(null, other, action);
}
public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action) {
return orAcceptStage(asyncPool, other, action);
}
public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action,Executor executor) {
return orAcceptStage(screenExecutor(executor), other, action);
}
public CompletableFuture<Void> runAfterEither(CompletionStage<?> other, Runnable action) {
return orRunStage(null, other, action);
}
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action) {
return orRunStage(asyncPool, other, action);
}
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action, Executor executor) {
return orRunStage(screenExecutor(executor), other, action);
}
当两个任务中,任意一个 future 任务完成的时候,执行任务。
applyToEither:两个任务有一个执行完成,获取它的返回值,处理任务并有新的返回值。
acceptEither:两个任务有一个执行完成,获取它的返回值,处理任务,没有新的返回值。
runAfterEither:两个任务有一个执行完成,不需要获取 future 的结果,处理任务,也没有返回值。
7、多任务组合
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
return andTree(cfs, 0, cfs.length - 1);
}
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
return orTree(cfs, 0, cfs.length - 1);
}
allOf:等待所有任务完成
anyOf:只要有一个任务完成
社交登陆&单点登陆
一、社交登陆
QQ、微博、github 等网站的用户量非常大,别的网站为了简化自我网站的登陆与注册逻辑,引入社交登陆功能;
步骤:
1)、用户点击 QQ 按钮
2)、引导跳转到 QQ 授权页
3)、用户主动点击授权,跳回之前网页。
1、OAuth2.0
- OAuth: OAuth(开放授权)是一个开放标准,允许用户授权第三方网站访问他们存储在另外的服务提供者上的信息,而不需要将用户名和密码提供给第三方网站或分享他们数据的所有内容。
- OAuth2.0:对于用户相关的 OpenAPI(例如获取用户信息,动态同步,照片,日志,分享等),为了保护用户数据的安全和隐私,第三方网站访问用户数据前都需要显式的向用户征求授权。
- 官方版流程:
- (A)用户打开客户端以后,客户端要求用户给予授权。
- (B)用户同意给予客户端授权。
- (C)客户端使用上一步获得的授权,向认证服务器申请令牌。
- (D)认证服务器对客户端进行认证以后,确认无误,同意发放令牌。
- (E)客户端使用令牌,向资源服务器申请获取资源。
- (F)资源服务器确认令牌无误,同意向客户端开放资源。
2、微博登陆准备工作
1、进入微博开放平台
2、登陆微博,进入微连接,选择网站接入
3、选择立即接入
4、创建自己的应用
5、我们可以在开发阶段进行测试了
记住自己的 app key 和 app secret 我们一会儿用
6、进入高级信息,填写授权回调页的地址
7、添加测试账号(选做)
8、进入文档,按照流程测试社交登陆
3、微博登陆测试
1、引导用户到如下地址
https://api.weibo.com/oauth2/authorize?client_id=YOUR_CLIENT_ID&response_type=code&redirect_uri=YOUR_REGISTERED_REDIRECT_URI
2、用户同意授权,页面跳转至 xxx/?code=CODE
3、使用返回的 code,换取 access token
https://api.weibo.com/oauth2/access_token?client_id=YOUR_CLIENT_ID&client_secret=YOUR_CLIENT_SECRET&grant_type=authorization_code&redirect_uri=YOUR_REGISTERED_REDIRECT_URI&code=CODE
https://api.weibo.com/oauth2/access_token?client_id=4217011631&client_secret=98de9bad1b633e42e01c46746e791047&grant_type=authorization_code&redirect_uri=http://www.gulishop.com/success&code=fef987b3f9ad1169955840b467bfc661
注意,上面这个是 post 请求
{
"access_token": "2.00pDpxyGd3J5bEef6b98778e0ZKsu4",
"remind_in": "157679999",
"expires_in": 157679999,
"uid": "6397634785",
"isRealName": "true"
}
4、使用 AccessToken 调用开发 API 获取用户信息
至此微博登陆调试完成。
Oauth2.0;授权通过后,使用 code 换取 access_token,然后去访问任何开放 API
1)、code 用后即毁
2)、access_token 在几天内是一样的
3)、uid 永久固定
二、SSO(单点登陆)
Single Sign On 一处登陆、处处可用
0、前置概念:
1)、单点登录业务介绍
早期单一服务器,用户认证。
缺点:单点性能压力,无法扩展
分布式,SSO(single sign on)模式
解决 :
- 用户身份信息独立管理,更好的分布式管理。
- 可以自己扩展安全策略
- 跨域不是问题
缺点:
- 认证服务器访问压力较大。
2)、几个基本概念
2.1 什么是跨域 Web SSO。
域名通过“.”号切分后,从右往左看,不包含“.”的是顶级域名,包含一个“.”的是一级域名,包含两个“.”的是二级域名,以此类推。
例如对网址 http://www.cnblogs.com/baibaomen,域名部分是 www.cnblogs.com。
用“.”拆分后从右往左看:
- cookie.setDomain(“.cnblogs.com”);//最多设置到本域的一级域名这里
- cookie.setDomain(“.baidu.com”);//最多设置到本域的一级域名这里
”com”不包含“.”,是顶级域名; “cnblogs.com”包含一个“.”,是一级域名;
www.cnblogs.com 包含两个“.”,是二级域名。
blog.cnblogs.com
news.cnblogs.com
跨域 Web SSO 指的是针对 Web 站点,各级域名不同都能处理的单点登录方案。
2.2 浏览器读写 cookie 的安全性限制:一级或顶级域名不同的网站,
无法读到彼此写的 cookie。
所以 baidu.com 无法读到 cnblogs.com 写的 cookie。
一级域名相同,只是二级或更高级域名不同的站点,可以通过设置 domain 参数共享 cookie读写。这种场景可以选择不跨域的 SSO 方案。
域名相同,只是 https 和 http 协议不同的 URL,默认 cookie 可以共享。知道这一点对处理 SSO 服务中心要登出
2.3 http 协议是无状态协议。浏览器访问服务器时,要让服务器知道你是谁,只有两种方式:
方式一:把“你是谁”写入 cookie。它会随每次 HTTP 请求带到服务端;
方式二:在 URL、表单数据中带上你的用户信息(也可能在 HTTP 头部)。这种方式依赖于从特定的网页入口进入,因为只有走特定的入口,才有机会拼装出相应的信息,提交到服务端。
大部分 SSO 需求都希望不依赖特定的网页入口(集成门户除外),所以后一种方式有局限性。适应性强的方式是第一种,即在浏览器通过 cookie 保存用户信息相关凭据,随每次请求传递到服务端。我们采用的方案是第一种。
1、Cookie 接入方式
2、Token 接入方式
类似社交登陆
3、有状态登录
为了保证客户端 cookie 的安全性,服务端需要记录每次会话的客户端信息,从而识别客户端身份,根据用户身份进行请求的处理,典型的设计如 tomcat 中的 session。
例如登录:用户登录后,我们把登录者的信息保存在服务端session 中,并且给用户一个 cookie值,记录对应的 session。然后下次请求,用户携带 cookie 值来,我们就能识别到对应 session,从而找到用户的信息。
缺点是什么?
- 服务端保存大量数据,增加服务端压力
- 服务端保存用户状态,无法进行水平扩展
- 客户端请求依赖服务端,多次请求必须访问同一台服务器
即使使用 redis 保存用户的信息,也会损耗服务器资源。
4、无状态登录
微服务集群中的每个服务,对外提供的都是 Rest 风格的接口。而 Rest 风格的一个最重要的规范就是:服务的无状态性,即:
- 服务端不保存任何客户端请求者信息
- 客户端的每次请求必须具备自描述信息,通过这些信息识别客户端身份
带来的好处是什么呢?
- 客户端请求不依赖服务端的信息,任何多次请求不需要必须访问到同一台服务
- 服务端的集群和状态对客户端透明
- 服务端可以任意的迁移和伸缩
- 减小服务端存储压力
5、集成社交登陆
1)、用户点击不同的社交登陆按钮,先来我们自己的服务器
https://passport.csdn.net/v1/register/authorization?authType=qq/sina
2)、命令浏览器重定向到用户授权页
用户确认授权
https://graph.qq.com/oauth2.0/authorize
3)、qq 返回的响应,会命令用户重定向到指定位置
4)、服务器的这个位置就可以收到我们的 code 码
收到 code 码,服务器自己用 code 交换 access_token 令牌,并获取到用户的信息。给浏览器只给用户的信息即可;
access_token=UUID
浏览器访问带 UUID_token 而不是 access_token;
三、JWT
1、简介
JWT,全称是 Json Web Token, 是 JSON 风格轻量级的授权和身份认证规范,可实现无状态、分布式的 Web 应用授权;官网:https://jwt.io
GitHub 上 jwt 的 java 客户端:https://github.com/jwtk/jjwt
我们最终可以利用 jwt 实现无状态登录
2、数据格式
JWT 包含三部分数据:
- Header:头部,通常头部有两部分信息:
- token 类型:JWT
- 加密方式:base64(HS256)
- Payload:载荷,就是有效数据,一般包含下面信息:
- 用户身份信息(注意,这里因为采用 base64 编码,可解码,因此不要存放敏感信息)
- 注册声明:如 token 的签发时间,过期时间,签发人等
- 这部分也会采用 base64 编码,得到第二部分数据
- Signature:签名,是整个数据的认证信息。根据前两步的数据,再加上指定的密钥(secret)(不要泄漏,最好周期性更换),通过 base64 编码生成。用于验证整个数据完整和可靠性
3、交互流程
步骤:
- 1、用户登录
- 2、服务的认证,通过后根据 secret 生成 token
- 3、将生成的 token 返回给浏览器
- 4、用户每次请求携带 token
- 5、服务端利用秘钥解读 jwt 签名,判断签名有效后,从 Payload 中获取用户信息
- 6、处理请求,返回响应结果
因为 JWT 签发的 token 中已经包含了用户的身份信息,并且每次请求都会携带,这样服务的就无需保存用户信息,甚至无需去数据库查询,完全符合了 Rest 的无状态规范。
4、授权中心流程
5、JWT 优势
- 易于水平扩展
- 在 cookie-session 方案中,cookie 内仅包含一个 session 标识符,而诸如用户信息、授权列表等都保存在服务端的 session 中。如果把 session 中的认证信息都保存在JWT 中,在服务端就没有 session 存在的必要了。当服务端水平扩展的时候,就不用处理 session 复制(session replication)/ session 黏连(sticky session)或是引入外部 session 存储了[实际上 spring-session 和 hazelcast 能完美解决这个问题]。
- 防护 CSRF(跨站请求伪造)攻击
- 访问某个网站会携带这个域名下的 cookie。所以可能导致攻击。但是我们可以把 jwt放在请求头中发送。
- Jwt 放在请求头中,就必须把 jwt 保存在 cookie 或者 localStorage 中。保存这里 js就会读写,又会导致 xss 攻击。可以设置 cookie,httponly=true 来防止 xss
- 安全
- 只是 base64 编码了,cookie+session 直接将数据保存在服务端,看都看不见,请问哪个更安全?
6、使用 JWT 带来的问题
- 我们不建议使用 jwt+cookie 代替 session+cookie 机制,jwt 更适合 restful api
- jwt token 泄露了怎么办?
- 这个问题可以不考虑,因为 session+cookie 同样泄露了 cookie 的 jsessionid 也会有这个问题
- 我们可以遵循以下规范减少风险
- 使用 https 加密应用
- 返 回jwt 给 客 户 端 时 设 置 httpOnly=true 并 且 使 用cookie 而 不 是LocalStorage 存储 jwt,防止 XSS 攻击和 CSRF 攻击
- secret 如果泄露会导致大面积风险
- 定期更新
- Secret 设计可以和用户关联起来,每个用户不一样。防止全用一个 secret
- 注销和修改密码
- 传统的 session+cookie 方案用户点击注销,服务端清空 session 即可,因为状态保存在服务端。我们不害怕注销后的假登录
- Jwt 会有问题。用户如果注销了或者修改密码了。恶意用户还使用之前非法盗取来的 token,可以在不重新登录的情况下继续使用
- 可以按程度使用如下设计,减少一定的风险
- 清空客户端的 cookie,这样用户访问时就不会携带 jwt,服务端就认为用户需要重新登录。这是一个典型的假注销,对于用户表现出退出的行为,实际上这个时候携带对应的 jwt 依旧可以访问系统。
- 清空或修改服务端的用户对应的 secret,这样在用户注销后,jwt 本身不变,但是由于 secret 不存在或改变,则无法完成校验。这也是为什么将secret 设计成和用户相关的原因
- 借助第三方存储,管理 jwt 的状态,可以以 jwt 为 key,去 redis 校验存在性。但这样,就把无状态的 jwt 硬生生变成了有状态了,违背了 jwt的初衷。实际上这个方案和 session 都差不多了。
- 修改密码则略微有些不同,假设号被到了,修改密码(是用户密码,不是jwt 的 secret)之后,盗号者在原 jwt 有效期之内依旧可以继续访问系统,所以仅仅清空 cookie 自然是不够的,这时,需要强制性的修改 secret
- 可以按程度使用如下设计,减少一定的风险
- 续签问题
- 传统的 cookie 续签方案一般都是框架自带的,session 有效期 30 分钟,30分钟内如果有访问,session 有效期被刷新至 30 分钟。而 jwt 本身的 payload之中也有一个 exp 过期时间参数,来代表一个 jwt 的时效性,而 jwt 想延期这个 exp 就有点身不由己了,因为 payload 是参与签名的,一旦过期时间被修改,整个 jwt 串就变了,jwt 的特性天然不支持续签!
- 可如下解决,但都不是完美方案
- 每次请求刷新 jwt:简单暴力,性能低下,浪费资源。
- 只要快要过期的时候刷新 jwt:jwt 最后的几分钟,换新一下。但是如果用户连续操作了 27 分钟,只有最后的 3 分钟没有操作,导致未刷新jwt,就很难受。
- 完 善refreshToken : 借 鉴oauth2 的 设 计 , 返 回 给 客 户 端 一 个refreshToken,允许客户端主动刷新 jwt。这样做,还不如用 oauth2
- 使用 redis 记录独立的过期时间:jwt 作为 key,在 redis 中保存过期时间,每次使用在 redis 中续期,如果 redis 没有就认为过期。但是这样做,还不如用 session+cookie
- 总结
- 在 Web 应用中,别再把 JWT 当做 session 使用,绝大多数情况下,传统的cookie-session 机制工作得更好
- JWT 适合一次性的命令认证,颁发一个有效期极短的 JWT,即使暴露了危险也很小,由于每次操作都会生成新的 JWT,因此也没必要保存 JWT,真正实现无状态。
RbaaitMQ
消息中间件
异步处理
应用解耦
流量控制
一、概述
- 大多应用中,可通过消息服务中间件来提升系统异步通信、扩展解耦能力
- 消息服务中两个重要概念:
- 消息代理(message broker)和目的地(destination)
当消息发送者发送消息以后,将由消息代理接管,消息代理保证消息传递到指定目的地。
-
消息队列主要有两种形式的目的地
-
队列(queue):点对点消息通信(point-to-point)
-
主题(topic):发布(publish)/订阅(subscribe)消息通信
-
-
点对点式:
- 消息发送者发送消息,消息代理将其放入一个队列中,消息接收者从队列中获取消息内容,消息读取后被移出队列
- 消息只有唯一的发送者和接受者,但并不是说只能有一个接收者
-
发布订阅式:
- 发送者(发布者)发送消息到主题,多个接收者(订阅者)监听(订阅)这个主题,那么就会在消息到达时同时收到消息
-
JMS(Java Message Service)JAVA消息服务:
- 基于JVM消息代理的规范。ActiveMQ、HornetMQ是JMS实现
-
AMQP(Advanced Message Queuing Protocol)
-
高级消息队列协议,也是一个消息代理的规范,兼容JMS
-
RabbitMQ是AMQP的实现
-
- Spring支持
- spring-jms提供了对JMS的支持
- spring-rabbit提供了对AMQP的支持
- 需要ConnectionFactory的实现来连接消息代理
- 提供JmsTemplate、RabbitTemplate来发送消息
- @JmsListener(JMS)、@RabbitListener(AMQP)注解在方法上监听消息代理发布的消息
- @EnableJms、@EnableRabbit开启支持
- Spring Boot自动配置
- JmsAutoConfiguration
- RabbitAutoConfiguration
- 市面的MQ产品
- ActiveMQ、RabbitMQ、RocketMQ、Kafka
二、RabbitMQ概念
RabbitMQ简介:
RabbitMQ是一个由erlang开发的AMQP(Advanved Message Queue Protocol)的开源实现。
核心概念
Message
消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。
Publisher
消息的生产者,也是一个向交换器发布消息的客户端应用程序。
Exchange
交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
Exchange有4种类型:direct(默认),fanout, topic, 和headers,不同类型的Exchange转发消息的策略有所区别
Queue
消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。
Binding
绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。
Exchange 和Queue的绑定可以是多对多的关系。
Connection
网络连接,比如一个TCP连接。
Channel
信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内的虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。
Consumer
消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。
Virtual Host
虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。
Broker
表示消息队列服务器实体
三、Docker安装RabbitMQ
docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p
15672:15672 rabbitmq:management
4369, 25672 (Erlang发现&集群端口)
5672, 5671 (AMQP端口)
15672 (web管理后台端口)
61613, 61614 (STOMP协议端口)
1883, 8883 (MQTT协议端口)
https://www.rabbitmq.com/networking.html
四、RabbitMQ运行机制
AMQP 中的消息路由
AMQP 中消息的路由过程和 Java 开发者熟悉的 JMS 存在一些差别,AMQP 中增加了 Exchange 和Binding 的角色。生产者把消息发布到 Exchange 上,消息最终到达队列并被消费者接收,而 Binding 决定交换器的消息应该发送到那个队列。
Exchange分发消息时根据类型的不同分发策略有区别,目前共四种类型:direct、fanout、topic、headers 。headers 匹配 AMQP 消息的 header 而不是路由键,headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了,所以直接看另外三种类型:
-
Direct Exchange
消息中的路由键(routing key)如果和Binding 中的 binding key 一致, 交换器就将消息发到对应的队列中。路由键与队列名完全匹配,如果一个队列绑定到交换机要求路由键为“dog”,则只转发 routingkey 标记为“dog”的消息,不会转发“dog.puppy”,也不会转发“dog.guard”等等。它是完全匹配、单播的模式。
-
Fanout Exchange
每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去。fanout 交换器不处理路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。fanout 类型转发消息是最快的。
-
Topic Exchange
topic 交换器通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。它同样也会识别两个通配符:符号“#”和符号“*”。#匹配0个或多个单词,*匹配一个单词。
五、RabbitMQ整合
- 引入 spring-boot-starter-amqp
- application.yml配置
- 测试RabbitMQ
- AmqpAdmin:管理组件
- RabbitTemplate:消息发送处理组件
- @RabbitListener 监听消息的方法可以有三种参数(不分数量,顺序)
- Object content, Message message, Channel channel
六、RabbitMQ消息确认机制-可靠抵达
保证消息不丢失,可靠抵达,可以使用事务消息,性能下降250倍,为此引入确认机制
- publisher confirmCallback 确认模式
- publisher returnCallback 未投递到 queue 退回模式
- consumer ack机制
可靠抵达-ConfirmCallback
spring.rabbitmq.publisher-confirms=true
- 在创建 connectionFactory 的时候设置 PublisherConfirms(true) 选项,开启confirmcallback 。
- CorrelationData:用来表示当前消息唯一性。
- 消息只要被 broker 接收到就会执行 confirmCallback,如果是 cluster 模式,需要所有broker 接收到才会调用 confirmCallback。
- 被 broker 接收到只能表示 message 已经到达服务器,并不能保证消息一定会被投递到目标 queue 里。所以需要用到接下来的 returnCallback 。
可靠抵达-ReturnCallback
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true
- confrim 模式只能保证消息到达 broker,不能保证消息准确投递到目标 queue 里。在有些业务场景下,我们需要保证消息一定要投递到目标 queue 里,此时就需要用到return 退回模式。
- 这样如果未能投递到目标 queue 里将调用 returnCallback ,可以记录下详细到投递数据,定期的巡检或者自动纠错都需要这些数据。
可靠抵达-Ack消息确认机制
spring.rabbitmq.listener.simple.acknowledge-mode=manual
-
消费者获取到消息,成功处理,可以回复Ack给Broker
-
basic.ack用于肯定确认;broker将移除此消息
-
basic.nack用于否定确认;可以指定broker是否丢弃此消息,可以批量
-
basic.reject用于否定确认;同上,但不能批量
-
-
默认自动ack,消息被消费者收到,就会从broker的queue中移除
-
queue无消费者,消息依然会被存储,直到消费者消费
-
消费者收到消息,默认会自动ack。但是如果无法确定此消息是否被处理完成,或者成功处理。我们可以开启手动ack模式
- 消息处理成功,ack(),接受下一个消息,此消息broker就会移除
- 消息处理失败,nack()/reject(),重新发送给其他人进行处理,或者容错处理后ack
- 消息一直没有调用ack/nack方法,broker认为此消息正在被处理,不会投递给别人,此时客户端断开,消息不会被broker移除,会投递给别人
七、RabbitMQ延时队列(实现定时任务)
场景:
比如未付款订单,超过一定时间后,系统自动取消订单并释放占有物品。
常用解决方案:
spring的 schedule 定时任务轮询数据库
缺点:
- 消耗系统内存、增加了数据库的压力、存在较大的时间误差
解决:rabbitmq的消息TTL和死信Exchange结合
消息的TTL(Time To Live)
-
消息的TTL就是消息的存活时间。
-
RabbitMQ可以对队列和消息分别设置TTL。
- 对队列设置就是队列没有消费者连着的保留时间,也可以对每一个单独的消息做单独的设置。超过了这个时间,我们认为这个消息就死了,称之为死信。
- 如果队列设置了,消息也设置了,那么会取小的。所以一个消息如果被路由到不同的队列中,这个消息死亡的时间有可能不一样(不同的队列设置)。这里单讲单个消息的TTL,因为它才是实现延迟任务的关键。可以通过设置消息的expiration字段或者x-message-ttl属性来设置时间,两者是一样的效果。
Dead Letter Exchanges(DLX)
-
一个消息在满足如下条件下,会进死信路由,记住这里是路由而不是队列,一个路由可以对应很多队列。(什么是死信)
-
一个消息被Consumer拒收了,并且reject方法的参数里requeue是false。也就是说不会被再次放在队列里,被其他消费者使用。(basic.reject/ basic.nack)requeue=false
-
上面的消息的TTL到了,消息过期了。
-
队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由上
-
-
Dead Letter Exchange其实就是一种普通的exchange,和创建其他exchange没有两样。只是在某一个设置Dead Letter Exchange的队列中有消息过期了,会自动触发消息的转发,发送到Dead Letter Exchange中去。
-
我们既可以控制消息在一段时间后变成死信,又可以控制变成死信的消息被路由到某一个指定的交换机,结合二者,其实就可以实现一个延时队列
-
手动ack&异常消息统一放在一个队列处理建议的两种方式
- catch异常后,手动发送到指定队列,然后使用channel给rabbitmq确认消息已消费
- 给Queue绑定死信队列,使用nack(requque为false)确认消息消费失败
延时队列实现-1
设置队列过期时间实现延时队列
延时队列实现-2
设置消息过期时间实现延时队列
SpringBoot中使用延时队列
- 1、Queue、Exchange、Binding可以@Bean进去
- 2、监听消息的方法可以有三种参数(不分数量,顺序)
- Object content, Message message, Channel channel
- 3、channel可以用来拒绝消息,否则自动ack;
如何保证消息可靠性-消息丢失
消息丢失
- 消息发送出去,由于网络问题没有抵达服务器
- 做好容错方法(try-catch),发送消息可能会网络失败,失败后要有重试机制,可记录到数据库,采用定期扫描重发的方式
- 做好日志记录,每个消息状态是否都被服务器收到都应该记录
- 做好定期重发,如果消息没有发送成功,定期去数据库扫描未成功的消息进行重发
- 消息抵达Broker,Broker要将消息写入磁盘(持久化)才算成功。此时Broker尚未持久化完成,宕机。
- publisher也必须加入确认回调机制,确认成功的消息,修改数据库消息状态。
- 自动ACK的状态下。消费者收到消息,但没来得及消息然后宕机
- 一定开启手动ACK,消费成功才移除,失败或者没来得及处理就noAck并重新入队
如何保证消息可靠性-消息重复
消息重复
- 消息消费成功,事务已经提交,ack时,机器宕机。导致没有ack成功,Broker的消息重新由unack变为ready,并发送给其他消费者
- 消息消费失败,由于重试机制,自动又将消息发送出去
- 成功消费,ack时宕机,消息由unack变为ready,Broker又重新发送
- 消费者的业务消费接口应该设计为幂等性的。比如扣库存有工作单的状态标志
- 使用防重表(redis/mysql),发送消息每一个都有业务的唯一标识,处理过就不用处理
- rabbitMQ的每一个消息都有redelivered字段,可以获取是否是被重新投递过来的,而不是第一次投递过来的
如何保证消息可靠性-消息积压
消息积压
- 消费者宕机积压
- 消费者消费能力不足积压
- 发送者发送流量太大
- 上线更多的消费者,进行正常消费
- 上线专门的队列消费服务,将消息先批量取出来,记录数据库,离线慢慢处理
接口幂等性
一、什么是幂等性
接口幂等性就是用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会因为多次点击而产生了副作用;比如说支付场景,用户购买了商品支付扣款成功,但是返回结果的时候网络异常,此时钱已经扣了,用户再次点击按钮,此时会进行第二次扣款,返回结果成功,用户查询余额返发现多扣钱了,流水记录也变成了两条...,这就没有保证接口的幂等性。
二、哪些情况需要防止
用户多次点击按钮
用户页面回退再次提交
微服务互相调用,由于网络问题,导致请求失败。feign 触发重试机制其他业务情况
三、什么情况下需要幂等
以 SQL 为例,有些操作是天然幂等的。
SELECT * FROM table WHER id=?,无论执行多少次都不会改变状态,是天然的幂等。
UPDATE tab1 SET col1=1 WHERE col2=2,无论执行成功多少次状态都是一致的,也是幂等操作。
delete from user where userid=1,多次操作,结果一样,具备幂等性
insert into user(userid,name) values(1,‘a’) 如 userid 为唯一主键,即重复操作上面的业务,只会插入一条用户数据,具备幂等性。
UPDATE tab1 SET col1=col1+1 WHERE col2=2,每次执行的结果都会发生变化,不是幂等的。
insert into user(userid,name) values(1,‘a’) 如 userid 不是主键,可以重复,那上面业务多次操作,数据都会新增多条,不具备幂等性。
四、幂等解决方案
1、token 机制
1、服务端提供了发送 token 的接口。我们在分析业务的时候,哪些业务是存在幂等问题的,就必须在执行业务前,先去获取 token,服务器会把 token 保存到 redis 中。
2、然后调用业务接口请求时,把 token 携带过去,一般放在请求头部。
3、服务器判断 token 是否存在 redis 中,存在表示第一次请求,然后删除 token,继续执行业务。
4、如果判断 token 不存在 redis 中,就表示是重复操作,直接返回重复标记给 client,这样就保证了业务代码,不被重复执行。
危险性:
1、先删除 token 还是后删除 token;
- (1) 先删除可能导致,业务确实没有执行,重试还带上之前 token,由于防重设计导致,请求还是不能执行。
- (2) 后删除可能导致,业务处理成功,但是服务闪断,出现超时,没有删除 token,别人继续重试,导致业务被执行两边
- (3) 我们最好设计为先删除 token,如果业务调用失败,就重新获取 token 再次请求。
2、Token 获取、比较和删除必须是原子性
-
(1) redis.get(token) 、token.equals、redis.del(token)如果这两个操作不是原子,可能导致,高并发下,都 get 到同样的数据,判断都成功,继续业务并发执行
-
(2) 可以在 redis 使用 lua 脚本完成这个操作
if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end
2、各种锁机制
1、数据库悲观锁
select * from xxxx where id = 1 for update;
悲观锁使用时一般伴随事务一起使用,数据锁定时间可能会很长,需要根据实际情况选用。另外要注意的是,id 字段一定是主键或者唯一索引,不然可能造成锁表的结果,处理起来会非常麻烦。
2、数据库乐观锁
这种方法适合在更新的场景中,
update t_goods set count = count -1 , version = version + 1 where good_id=2 and version = 1
根据 version 版本,也就是在操作库存前先获取当前商品的 version 版本号,然后操作的时候带上此 version 号。我们梳理下,我们第一次操作库存时,得到 version 为 1,调用库存服务version 变成了 2;但返回给订单服务出现了问题,订单服务又一次发起调用库存服务,当订单服务传如的 version 还是 1,再执行上面的 sql 语句时,就不会执行;因为 version 已经变为 2 了,where 条件就不成立。这样就保证了不管调用几次,只会真正的处理一次。
乐观锁主要使用于处理读多写少的问题
3、业务层分布式锁
如果多个机器可能在同一时间同时处理相同的数据,比如多台机器定时任务都拿到了相同数据处理,我们就可以加分布式锁,锁定此数据,处理完成后释放锁。获取到锁的必须先判断这个数据是否被处理过。
3、各种唯一约束
1、数据库唯一约束
插入数据,应该按照唯一索引进行插入,比如订单号,相同的订单就不可能有两条记录插入。我们在数据库层面防止重复。
这个机制是利用了数据库的主键唯一约束的特性,解决了在 insert 场景时幂等问题。但主键的要求不是自增的主键,这样就需要业务生成全局唯一的主键。
如果是分库分表场景下,路由规则要保证相同请求下,落地在同一个数据库和同一表中,要不然数据库主键约束就不起效果了,因为是不同的数据库和表主键不相关。
2、redis set 防重
很多数据需要处理,只能被处理一次,比如我们可以计算数据的 MD5 将其放入 redis 的 set,每次处理数据,先看这个 MD5 是否已经存在,存在就不处理。
4、防重表
使用订单号 orderNo 做为去重表的唯一索引,把唯一索引插入去重表,再进行业务操作,且他们在同一个事务中。这个保证了重复请求时,因为去重表有唯一约束,导致请求失败,避免了幂等问题。这里要注意的是,去重表和业务表应该在同一库中,这样就保证了在同一个事务,即使业务操作失败了,也会把去重表的数据回滚。这个很好的保证了数据一致性。
之前说的 redis 防重也算
5、全局请求唯一 id
调用接口时,生成一个唯一 id,redis 将数据保存到集合中(去重),存在即处理过。可以使用 nginx 设置每一个请求的唯一 id;
proxy_set_header X-Request-Id $request_id;
分布式事务
一、本地事务
1、事务的基本性质
数据库事务的几个特性:原子性(Atomicity )、一致性( Consistency )、隔离性或独立性( Isolation)和持久性(Durabilily),简称就是 ACID;
- 原子性:一系列的操作整体不可拆分,要么同时成功,要么同时失败
- 一致性:数据在事务的前后,业务整体一致。
- 隔离性:事务之间互相隔离。
- 持久性:一旦事务成功,数据一定会落盘在数据库。
在以往的单体应用中,我们多个业务操作使用同一条连接操作不同的数据表,一旦有异常,我们可以很容易的整体回滚;
2、事务的隔离级别
-
READ UNCOMMITTED(读未提交)
该隔离级别的事务会读到其它未提交事务的数据,此现象也称之为脏读。
-
READ COMMITTED(读提交)
一个事务可以读取另一个已提交的事务,多次读取会造成不一样的结果,此现象称为不可重复读问题,Oracle 和 SQL Server 的默认隔离级别。
-
REPEATABLE READ(可重复读)
该隔离级别是 MySQL 默认的隔离级别,在同一个事务里,select 的结果是事务开始时时间点的状态,因此,同样的 select 操作读到的结果会是一致的,但是,会有幻读现象。MySQL的 InnoDB 引擎可以通过 next-key locks 机制(参考下文"行锁的算法"一节)来避免幻读。
-
SERIALIZABLE(序列化)
在该隔离级别下事务都是串行顺序执行的,MySQL 数据库的 InnoDB 引擎会给读操作隐式加一把读共享锁,从而避免了脏读、不可重读复读和幻读问题。
3、事务的传播行为
1、PROPAGATION_REQUIRED:如果当前没有事务,就创建一个新事务,如果当前存在事务,就加入该事务,该设置是最常用的设置。
2、PROPAGATION_SUPPORTS:支持当前事务,如果当前存在事务,就加入该事务,如果当前不存在事务,就以非事务执行。
3、PROPAGATION_MANDATORY:支持当前事务,如果当前存在事务,就加入该事务,如果当前不存在事务,就抛出异常。
4、PROPAGATION_REQUIRES_NEW:创建新事务,无论当前存不存在事务,都创建新事务。
5、PROPAGATION_NOT_SUPPORTED:以非事务方式执行操作,如果当前存在事务,就把当前事务挂起。
6、PROPAGATION_NEVER:以非事务方式执行,如果当前存在事务,则抛出异常。
7、PROPAGATION_NESTED:如果当前存在事务,则在嵌套事务内执行。如果当前没有事务,则执行与 PROPAGATION_REQUIRED 类似的操作。
4、SpringBoot 事务关键点
1、事务的自动配置
TransactionAutoConfiguration
2、事务的坑
在同一个类里面,编写两个方法,内部调用的时候,会导致事务设置失效。原因是没有用到代理对象的缘故。
解决:
- 导入 spring-boot-starter-aop
- @EnableTransactionManagement(proxyTargetClass = true)
- @EnableAspectJAutoProxy(exposeProxy=true)
- AopContext.currentProxy() 调用方法
二、分布式事务
1、为什么有分布式事务
分布式系统经常出现的异常
机器宕机、网络异常、消息丢失、消息乱序、数据错误、不可靠的 TCP、存储数据丢失…
分布式事务是企业集成中的一个技术难点,也是每一个分布式系统架构中都会涉及到的一个东西,特别是在微服务架构中,几乎可以说是无法避免。
2、CAP 定理与 BASE 理论
1、CAP 定理
CAP 原则又称 CAP 定理,指的是在一个分布式系统中
- 一致性(Consistency):
- 在分布式系统中的所有数据备份,在同一时刻是否同样的值。(等同于所有节点访问同一份最新的数据副本)
- 可用性(Availability)
- 在集群中一部分节点故障后,集群整体是否还能响应客户端的读写请求。(对数据更新具备高可用性)
- 分区容错性(Partition tolerance)
- 大多数分布式系统都分布在多个子网络。每个子网络就叫做一个区(partition)。分区容错的意思是,区间通信可能失败。比如,一台服务器放在中国,另一台服务器放在美国,这就是两个区,它们之间可能无法通信。
CAP 原则指的是,这三个要素最多只能同时实现两点,不可能三者兼顾。
一般来说,分区容错无法避免,因此可以认为 CAP 的 P 总是成立。CAP 定理告诉我们,剩下的 C 和 A 无法同时做到。
分布式系统中实现一致性的 raft 算法、paxos
http://thesecretlivesofdata.com/raft/
2、面临的问题
对于多数大型互联网应用的场景,主机众多、部署分散,而且现在的集群规模越来越大,所以节点故障、网络故障是常态,而且要保证服务可用性达到 99.99999%(N 个 9),即保证P 和 A,舍弃 C。
3、BASE 理论
是对 CAP 理论的延伸,思想是即使无法做到强一致性(CAP 的一致性就是强一致性),但可以采用适当的采取弱一致性,即最终一致性。
BASE 是指
- 基本可用(Basically Available)
- 基本可用是指分布式系统在出现故障的时候,允许损失部分可用性(例如响应时间、功能上的可用性),允许损失部分可用性。需要注意的是,基本可用绝不等价于系统不可用。
- 响应时间上的损失:正常情况下搜索引擎需要在 0.5 秒之内返回给用户相应的查询结果,但由于出现故障(比如系统部分机房发生断电或断网故障),查询结果的响应时间增加到了 1~2 秒。
- 功能上的损失:购物网站在购物高峰(如双十一)时,为了保护系统的稳定性,部分消费者可能会被引导到一个降级页面。
- 基本可用是指分布式系统在出现故障的时候,允许损失部分可用性(例如响应时间、功能上的可用性),允许损失部分可用性。需要注意的是,基本可用绝不等价于系统不可用。
- 软状态( Soft State)
- 软状态是指允许系统存在中间状态,而该中间状态不会影响系统整体可用性。分布式存储中一般一份数据会有多个副本,允许不同副本同步的延时就是软状态的体现。mysql replication 的异步复制也是一种体现。
- 最终一致性( Eventual Consistency)
- 最终一致性是指系统中的所有数据副本经过一定时间后,最终能够达到一致的状态。弱一致性和强一致性相反,最终一致性是弱一致性的一种特殊情况。
4、强一致性、弱一致性、最终一致性
- 从客户端角度,多进程并发访问时,更新过的数据在不同进程如何获取的不同策略,决定了不同的一致性。对于关系型数据库,要求更新过的数据能被后续的访问都能看到,这是强一致性。如果能容忍后续的部分或者全部访问不到,则是弱一致性。如果经过一段时间后要求能访问到更新后的数据,则是最终一致性
3、分布式事务几种方案
1)、2PC 模式
数据库支持的 2PC【2 phase commit 二阶提交】,又叫做 XA Transactions。
MySQL 从 5.5 版本开始支持,SQL Server 2005 开始支持,Oracle 7 开始支持。
其中,XA 是一个两阶段提交协议,该协议分为以下两个阶段:
第一阶段:事务协调器要求每个涉及到事务的数据库预提交(precommit)此操作,并反映是否可以提交.
第二阶段:事务协调器要求每个数据库提交数据。
其中,如果有任何一个数据库否决此次提交,那么所有数据库都会被要求回滚它们在此事务中的那部分信息。
- XA 协议比较简单,而且一旦商业数据库实现了 XA 协议,使用分布式事务的成本也比较低。
- XA 性能不理想,特别是在交易下单链路,往往并发量很高,XA 无法满足高并发场景
- XA 目前在商业数据库支持的比较理想,在 mysql 数据库中支持的不太理想,mysql 的XA 实现,没有记录 prepare 阶段日志,主备切换回导致主库与备库数据不一致。
- 许多 nosql 也没有支持 XA,这让 XA 的应用场景变得非常狭隘。
- 也有 3PC,引入了超时机制(无论协调者还是参与者,在向对方发送请求后,若长时间未收到回应则做出相应处理)
2)、柔性事务-TCC 事务补偿型方案
刚性事务:遵循 ACID 原则,强一致性。
柔性事务:遵循 BASE 理论,最终一致性;
与刚性事务不同,柔性事务允许一定时间内,不同节点的数据不一致,但要求最终一致。
一阶段 prepare 行为:调用 自定义 的 prepare 逻辑。
二阶段 commit 行为:调用 自定义 的 commit 逻辑。
二阶段 rollback 行为:调用 自定义 的 rollback 逻辑。
所谓 TCC 模式,是指支持把 自定义 的分支事务纳入到全局事务的管理中。
3)、柔性事务-最大努力通知型方案
按规律进行通知,不保证数据一定能通知成功,但会提供可查询操作接口进行核对。这种方案主要用在与第三方系统通讯时,比如:调用微信或支付宝支付后的支付结果通知。这种方案也是结合 MQ 进行实现,例如:通过 MQ 发送 http 请求,设置最大通知次数。达到通
知次数后即不再通知。
案例:银行通知、商户通知等(各大交易业务平台间的商户通知:多次通知、查询校对、对账文件),支付宝的支付成功异步回调
4)、柔性事务-可靠消息+最终一致性方案(异步确保型)
实现:业务处理服务在业务事务提交之前,向实时消息服务请求发送消息,实时消息服务只记录消息数据,而不是真正的发送。业务处理服务在业务事务提交之后,向实时消息服务确认发送。只有在得到确认发送指令后,实时消息服务才会真正发送。
定时任务
一、定时任务
1、cron 表达式
语法:秒 分 时 日 月 周 年(Spring 不支持)
http://www.quartz-scheduler.org/documentation/quartz-2.3.0/tutorials/crontrigger.html
特殊字符:
,:枚举;
(cron="7,9,23 * * * * ?"):任意时刻的 7,9,23 秒启动这个任务;
-:范围:
(cron="7-20 * * * * ?"):任意时刻的 7-20 秒之间,每秒启动一次
*:任意;
指定位置的任意时刻都可以
/:步长;
(cron="7/5 * * * * ?"):第 7 秒启动,每 5 秒一次;
(cron="*/5 * * * * ?"):任意秒启动,每 5 秒一次;
?:(出现在日和周几的位置):为了防止日和周冲突,在周和日上如果要写通配符使用?
(cron="* * * 1 * ?"):每月的 1 号,启动这个任务;
L:(出现在日和周的位置)”,last:最后一个
(cron="* * * ? * 3L"):每月的最后一个周二
W:
Work Day:工作日
(cron="* * * W * ?"):每个月的工作日触发
(cron="* * * LW * ?"):每个月的最后一个工作日触发
#:第几个
(cron="* * * ? * 5#2"):每个月的第 2 个周 4
2、cron 示例
Expression | Meaning |
---|---|
0 0 12 * * ? | Fire at 12pm (noon) every day |
0 15 10 ? * * | Fire at 10:15am every day |
0 15 10 * * ? | Fire at 10:15am every day |
0 15 10 * * ? * | Fire at 10:15am every day |
0 15 10 * * ? 2005 | Fire at 10:15am every day during the year 2005 |
0 * 14 * * ? | Fire every minute starting at 2pm and ending at 2:59pm, every day |
0 0/5 14 * * ? | Fire every 5 minutes starting at 2pm and ending at 2:55pm, every day |
0 0/5 14,18 * * ? | Fire every 5 minutes starting at 2pm and ending at 2:55pm, AND fire every 5 minutes starting at 6pm and ending at 6:55pm, every day |
0 0-5 14 * * ? | Fire every minute starting at 2pm and ending at 2:05pm, every day |
0 10,44 14 ? 3 WED | Fire at 2:10pm and at 2:44pm every Wednesday in the month of March. |
0 15 10 ? * MON-FRI | Fire at 10:15am every Monday, Tuesday, Wednesday, Thursday and Friday |
0 15 10 15 * ? | Fire at 10:15am on the 15th day of every month |
0 15 10 L * ? | Fire at 10:15am on the last day of every month |
0 15 10 L-2 * ? | Fire at 10:15am on the 2nd-to-last last day of every month |
0 15 10 ? * 6L | Fire at 10:15am on the last Friday of every month |
0 15 10 ? * 6L | Fire at 10:15am on the last Friday of every month |
0 15 10 ? * 6L 2002-2005 | Fire at 10:15am on every last friday of every month during the years 2002, 2003, 2004 and 2005 |
0 15 10 ? * 6#3 | Fire at 10:15am on the third Friday of every month |
0 0 12 1/5 * ? | Fire at 12pm (noon) every 5 days every month, starting on the first day of the month. |
0 11 11 11 11 ? | Fire every November 11th at 11:11am. |
3、SpringBoot 整合
@EnableScheduling
@Scheduled
二、分布式定时任务
1、定时任务问题
1)、同时执行导致的重复
由于同样的服务会部署多个节点,多个节点的定时任务代码可能同时启动。将同样的事情做了多次
使用分布式锁。
2)、任务拆分并发执行
使用 ElasticJob
2、扩展-分布式调度
http://elasticjob.io/docs/elastic-job-cloud/00-overview/
Sentinel
1、简介
1、熔断降级限流
什么是熔断
A 服务调用 B 服务的某个功能,由于网络不稳定问题,或者 B 服务卡机,导致功能时间超长。如果这样子的次数太多。我们就可以直接将 B 断路了(A 不再请求 B 接口),凡是调用 B 的直接返回降级数据,不必等待 B 的超长执行。 这样 B 的故障问题,就不会级联影响到 A。
什么是降级
整个网站处于流量高峰期,服务器压力剧增,根据当前业务情况及流量,对一些服务和页面进行有策略的降级[停止服务,所有的调用直接返回降级数据]。以此缓解服务器资源的的压力,以保证核心业务的正常运行,同时也保持了客户和大部分客户的得到正确的相应。
异同:
相同点:
1、为了保证集群大部分服务的可用性和可靠性,防止崩溃,牺牲小我
2、用户最终都是体验到某个功能不可用
不同点:
1、熔断是被调用方故障,触发的系统主动规则
2、降级是基于全局考虑,停止一些正常服务,释放资源
什么是限流
对打入服务的请求流量进行控制,使服务能够承担不超过自己能力的流量压力
2、Sentinel 简介
官方文档:https://github.com/alibaba/Sentinel/wiki/%E4%BB%8B%E7%BB%8D
项目地址:https://github.com/alibaba/Sentinel
随着微服务的流行,服务和服务之间的稳定性变得越来越重要。Sentinel 以流量为切入点,从流量控制、熔断降级、系统负载保护等多个维度保护服务的稳定性。
Sentinel 具有以下特征:
- 丰富的应用场景:Sentinel 承接了阿里巴巴近 10 年的双十一大促流量的核心场景,例如秒杀(即突发流量控制在系统容量可以承受的范围)、消息削峰填谷、集群流量控制、实时熔断下游不可用应用等。
- 完备的实时监控:Sentinel 同时提供实时的监控功能。您可以在控制台中看到接入应用的单台机器秒级数据,甚至 500 台以下规模的集群的汇总运行情况。
- 广泛的开源生态:Sentinel 提供开箱即用的与其它开源框架/库的整合模块,例如与 Spring Cloud、Dubbo、gRPC 的整合。您只需要引入相应的依赖并进行简单的配置即可快速地接入 Sentinel。
- 完善的 SPI 扩展点:Sentinel 提供简单易用、完善的 SPI 扩展接口。您可以通过实现扩展接口来快速地定制逻辑。例如定制规则管理、适配动态数据源等。
集群
集群的目标
- 高可用,当一台服务器停止服务器后,对于业务及用户毫无影响。停止服务的原因可能由于网卡、路由器、机房、CPU负载过高、内存溢出、自然灾害等不可预期的原因导致,在很多时候也称单点问题。
- 突破数据量限制,一台服务器不能储存大量数据,需要多台分担,每个 存储一部分,共同存储完整集群数据。最好能做到互相备份,及时单节点故障,也能在其他节点找到数据。
- 数据备份容灾,单点故障后,存储的数据仍然可以在必得地方拉起。
- 压力分担,由于多个服务器都能完成各自一部分工作,所以尽量的避免了单点压力的存在
集群的基础形式
-
主从式
- 主从复制,同步方式
- 主从调度,控制方式
-
分片式
- 数据分片存储,片区之间备份
-
选主式
- 为了容灾选主
- 为了调度选主