文章目录
- 优化
- 采用SSD硬盘-提升磁盘读写的速度
- 控制 redis 的内存在10G以内,防止fork耗时太长
- fork 注意事项
- 设置内存淘汰策略
- vm.overcommit_memory=1
- 尽可能地使用 hash 哈希存储
- 参数调优
- swapiness
- ulimit
- TCP backlog
- 客户端缓冲优化
- 碎片优化
- 问题
- 缓存与数据库数据不一致
- 将 热点库存分段
- 穿透的解决办法
- redis实现 黑名单
- 如何安全的减库存
- list可以做分页
优化
采用SSD硬盘-提升磁盘读写的速度
控制 redis 的内存在10G以内,防止fork耗时太长
fork 注意事项
fork 时,子进程 需要 拷贝 父进程的 空间内存页表
会 耗时避免复制风暴,采用树状,不要用链状
设置内存淘汰策略
allkeys-lru # 内存淘汰策略
allkeys-lfu #--->保证 redis 存储的都是 热点数据
vm.overcommit_memory=1
防止
OOM
因此一般需要将这个参数的值调整为1
,意思是 把所有可用的物理内存都允许分配给你
只要 有内存就给你来用,这样可以
避免申请内存失败
的问题
cat /proc/sys/vm/overcommit_memory
echo 'vm.overcommit_memory=1' >> /etc/sysctl.conf
sysctl vm.overcommit_memory=1
sysctl -p
尽可能地使用 hash 哈希存储
参数调优
swapiness
cat /proc/version # 查看Linux 内核版本
- 如果版本
<3.5
—>swapiness设置为0
,宁愿swap 也不会 oom ,killer(杀掉进程) - 如果版本
>=3.5
—>swapiness设置为1
,宁愿swap 也不会 oom ,killer
保证 redis 不被杀掉
echo 0> /proc/sys/vm/swappiness
echo vm.swapiness=0 >> /etc/sysctl.conf
ulimit
是用来控制linux上的最大文件链接数
的,默认值可能是1024,一般肯定是不够的
因为你在大量频繁的读写磁盘文件
的时候,或者是进行网络通信的时候,都会跟这个参数有关系
对于一个中间件系统而言肯定是不能使用默认值的
如果你采用默认值,很可能在线上
会出现如下错误:error: too many openfiles
。
echo 'ulimit -n 1000000' >> /etc/profile
ulimit -n 10032 10032 # 不同的版本会不一样
TCP backlog
cat /proc/sys/net/core/somaxconn
echo 511 > /proc/sys/net/core/somaxconn
客户端缓冲优化
客户端缓存
是很多内存异常增长的罪魁祸首
,大部分都是普通客户端输出缓冲区异常增长导致
我们先了解下执行命令的过程,客户端发送一个或者通过pipline 发送一组请求命令给服务端,然后等待服务端的响应。
一般客户端使用阻塞模式
来等待服务端响应,数据在被客户端读取前,数据是存放在客户端缓存区
.输出缓冲区的大小超过了
硬性限制的大小
时,这个客户端会被立即关闭
client-output-buffer-limit
- #表示这个客户端的输出缓冲区不限制大小
client-output-buffer-limit normal 0 0 0
- 设置
从服务器
客户端的硬性
限制为512MB
,软性
限制为128MB
, 软性限制的时长为120s
client-output-buffer-limit slave 512mb 128mb 120
优化建议:
- 应用 不要设计大 key,大 key 尽量拆分。
- 服务端的普通
客户端输出缓存区
通过参数设置,因为 内存告警的阈值大部分是使用率 80% 开始,实际 建议参数可以设置为实例内存的 5%~15%
左右,最好不要超过 20%,避免 OOM。 - 非 特殊情况下
避免使用 monitor 命令或者 rename
该命令。 - 在使用 pipline 的时候,pipeline 不能封装过多的命令,特别是一些返回结果集较多的命令更应该少封装。
- 主从复制
输出缓冲区大小设置参考
: - 缓冲区大小=( 主库写入命令速度 * 操作大小 - 主从库间 网络传输命令速度 * 操作大小 ) * 2
碎片优化
碎片优化 可以
降低内存使用率,提高访问效率
- 手动整理
memery purge
- 自动整理
- 【activedefrag yes 】:
启用 自动碎片 清理开关
- 【active-defrag-ignore-bytes 100mb】:内存碎片空间
达到 多少
才 开启碎片整理 - 【active-defrag-threshold-lower 10】:碎片率达到
百分之多少
才 开启碎片整理 - 【active-defrag-threshold-upper 100 】:内存碎片率
超过多少
,则 尽最大努力整理(占用最大资源去做碎片整理)
问题
缓存与数据库数据不一致
- 先更新缓存,再更新数据库
更新缓存成功,更新数据库失败,会造成数据不一致,很严重!!!这种情况绝对不允许!!
- 先更新数据库,再删除缓存,可以进行重试,使用定时任务活着mq
延时双删
的问题:
延时双删的第二次删除的时间
,应该大于主从复制的时
间,因为有读请求过来时,会去 从库执行,避免了脏读~
休眠时间的确定,在读
数据业务逻辑的耗时基础上加百毫秒
- 设置
缓存过期时间
进行兜底~~
吞吐量降低----> 开启线程池进行删除,或使用MQ
将 热点库存分段
redis 的big key
造成 主线程
阻塞
,所有请求可能超时,超时的越来越多,当新的请求越来越多,造成 redis 连接池资源耗尽
采用异步删除 unlink key
穿透的解决办法
业务规则
过滤 + 布隆过滤器 + Nginx 黑名单
redis实现 黑名单
防止恶意用户,恶意攻击 : 一分钟调用下单超过50次 ,加入临时黑名单 ,10分钟后才可继续操作,一小时允许一次跨时段弱校验
。使用reids的list结构,过期时间一小时
/**
* @param token
* @return true 可下单
*/
public boolean judgeUserToken(String token) {
//获取用户下单次数 1分钟50次
String blackUser = "shop-oms-submit-black-" + token;
if (redis.get(blackUser) != null) {
return false;
}
String keyCount = "shop-oms-submit-count-" + token;
Long nowSecond = LocalDateTime.now().toEpochSecond(ZoneOffset.of("+8"));
//每一小时清一次key 过期时间1小时
Long count = redis.rpush(keyCount, String.valueOf(nowSecond), 60 * 60);
if (count < 50) {
return true;
}
//获取第50次的时间
List<String> secondString = redis.lrange(keyCount, count - 50, count - 49);
Long oldSecond = Long.valueOf(secondString.get(0));
//now > oldSecond + 60 用户可下单
boolean result = nowSecond.compareTo(oldSecond + 60) > 0;
if (!result) {
//触发限制,加入黑名单,过期时间10分钟
redis.set(blackUser, String.valueOf(nowSecond), 10 * 60);
}
return result;
}
如何安全的减库存
redis + mq + mysql 保证库存安全,满足高并发处理,但相对复杂。
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
@RestController
@Slf4j
public class StockService {
@Resource
private StringRedisTemplate redis;
/**
* 扣库存操作,秒杀的处理方案
*
* @param orderCode
* @param skuCode
* @param num
* @return
*/
public boolean subtractStock(String orderCode, String skuCode, Integer num) {
String key = "shop-product-stock" + skuCode;
Object value = redis.opsForValue ().get (key);
if (value == null) {
//前提 提前将商品库存放入缓存 ,如果缓存不存在,视为没有该商品
return false;
}
//先检查 库存是否充足
Integer stock = (Integer) value;
if (stock < num) {
log.info ("库存不足");
return false;
}
//不可在这里直接操作数据库减库存,否则导致数据不安全
//因为此时可能有其他线程已经将redis的key修改了
//redis 减少库存,然后才能操作数据库
Long newStock = redis.opsForValue ().increment (key, -num.longValue ());
//库存充足
if (newStock >= 0) {
log.info ("成功抢购");
//TODO 真正扣库存操作 可用MQ 进行 redis 和 mysql 的数据同步,减少响应时间
// 发送信息到mq ,进行redis和mq的同步
} else {
//库存不足,需要增加刚刚减去的库存
redis.opsForValue ().increment (key, num.longValue ());
log.info ("库存不足,并发");
return false;
}
return true;
}
}
increment 是个原子操作,也可以使用lua脚本
list可以做分页
比如前多少条数据,评论也可以存,因为本身是个集合 lrange
、 lpush
、 rpush