准实时刷新集群中各节点本地缓存的解决方案

news2025/1/13 14:17:20

目录

背景

Redis发布订阅

MQ广播消息

配置中心Nacos,Zookeeper监听

注册中心获取服务节点ip端口接口调用

本地定时任务兜底


背景

我们在系统开发过程中,为了减少数据库和redis缓存的查询以提升接口性能,有时候会把一些常用的、变动不是很频繁的数据放到本地缓存,因为我们的服务是多节点部署的,在缓存的数据发生变动时,怎么快速的将整个集群中的各个节点上的数据同步更新呢?

这里总结了如下几种常用的解决方案:

  • Redis发布订阅
  • MQ广播消息
  • 配置中心nacos,zookeeper监听
  • 注册中心获取服务节点ip端口进行接口调用
  • 定时任务兜底

        下面介绍一下几种方案的实现思路,同时也给出了主要的实现代码,具体的刷新逻辑可以根据自己的需要进行实现。

Redis发布订阅

        借助redis的发布订阅机制,服务使用Redis注册一个channel监听,当有数据变更的时候往channel发布一个消息,这样集群中的各个节点都会收到这个消息执行本地缓存的刷新操作。

        测试代码如下所示:

@Resource
RedisTemplate redisTemplate;

@Test
public void test() throws InterruptedException {
	new Thread(()->refreshCacheListener()).start();
	new Thread(()->refreshCacheListener()).start();
	Thread.sleep(1000L);
	redisTemplate.convertAndSend("refreshCache","刷新本地缓存了");
	Thread.sleep(1000L);
	redisTemplate.convertAndSend("refreshCache","又刷新了");
}

private void refreshCacheListener() {
	RedisConnection connection = redisTemplate.getConnectionFactory().getConnection();
	connection.subscribe((message,pattern)->{
		log.info("刷新消息:thread:{},message:{}",Thread.currentThread().getId(),new String(message.getBody()));
		//TODO 根据收到的消息执行刷新本地缓存的逻辑
	},"refreshCache".getBytes());
}

运行输出如下:

刷新消息:thread:110,message:"刷新本地缓存了"
刷新消息:thread:111,message:"刷新本地缓存了"
刷新消息:thread:111,message:"又刷新了"
刷新消息:thread:110,message:"又刷新了"

MQ广播消息

        借助MQ的广播消息的机制,当缓存数据变更的时候发布一个广播消息,集群中的各个节点都会收到这个消息执行本地缓存的刷新操作。

        以RabbitMQ实现广播消息举例:通过rabbitmq的fanout广播类型交换机,然后程序中会声明一个临时队列绑定到广播交换机上,这样每个机器都会声明一个不同名字的临时队列绑定到广播交换机上,往广播交换机上发送一个消息,所有的机器都会消费到该消息,临时队列不会持久化,服务重启或者停止后,原先的临时队列会被删除,然后重新创建新的临时队列进行绑定。 

        RabbitMQ广播消息代码示例如下 

@Slf4j
@Component
public class RefreshCacheConsumer {
 
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(), //注意这里不要定义队列名称,系统会随机产生
            exchange = @Exchange(value = "refreshCacheExchange",type = ExchangeTypes.FANOUT)
    ))
    public void process(String payload) {
        log.info("刷新广播消息receive:{}",payload)
        //TODO 根据收到的消息执行刷新本地缓存的逻辑
    }
}

配置中心Nacos,Zookeeper监听

Nacos配置中心 

        借助Nacos配置中心的监听机制实现本地缓存刷新的原理也和上面类似,服务监听一个配置中心文件的内容变更,当缓存数据变更时发布新的配置内容,大致代码如下所示

@Slf4j
@Component
public class CacheRefresher implements InitializingBean {
    @Resource
    private NacosConfigManager nacosConfigManager;
 
    @Override
    public void afterPropertiesSet() throws Exception {
        nacosConfigManager.getConfigService().addListener("nacos-refresh-cache.properties", "DEFAULT_GROUP",
            new Listener() {
 
                @Override
                public Executor getExecutor() {
                    return Executors.newSingleThreadExecutor();
                }
 
                @Override
                public void receiveConfigInfo(String configInfo) {
                    log.info("刷新缓存recieve:{}", configInfo);
					//TODO 根据收到的消息执行刷新本地缓存的逻辑
                }
        });
    }
}


public class ConfigServerDemo {

    public static void main(String[] args) throws Exception {
        String serverAddr = "localhost";
        String dataId = "nacos-refresh-cache.properties";
        String group = "DEFAULT_GROUP";
        Properties properties = new Properties();
        properties.put(PropertyKeyConst.SERVER_ADDR, serverAddr);
        ConfigService configService = NacosFactory.createConfigService(properties);
       
        //缓存变更,发布配置
        configService.publishConfig(dataId,group,"refreshCache");

        Thread.sleep(3000);
        content = configService.getConfig(dataId, group, 5000);
        System.out.println(content);
    }
}

Zookeeper节点监听 

        利用zookeeper的watch机制,服务监听节点的数据变更事件,在事件中调用刷新缓存的逻辑,当有缓存数据变更时更新zookeeper对应节点的数据即可。关于zookeeper的使用如果不太了解的话,可以看下我前面的一篇介绍zookeeper的curator客户端使用的博客 (8)zookeeper开源客户端Curator使用介绍

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
 
public class TestNodeCacheListener {
 
	/** zookeeper地址 */
	static final String CONNECT_ADDR = "192.168.74.4:2181,192.168.74.5:2181,192.168.74.6:2181";
	/** session超时时间 */
	static final int SESSION_OUTTIME = 5000;//ms 
	
	@SuppressWarnings("resource")
	public static void main(String[] args) throws Exception {
		//1 重试策略:初试时间为1s 重试10次
		RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
		CuratorFramework client = CuratorFrameworkFactory.builder()
				.connectString(CONNECT_ADDR)
				.connectionTimeoutMs(SESSION_OUTTIME)
				.retryPolicy(retryPolicy)
				.build();
		client.start();
		
		//dataIsCompressed:true/false 表示是否对节点数据进行压缩
		final NodeCache cache = new NodeCache(client, "/super", false);
		//start(true) 里面的true表示对节点进行初始化
		cache.start(false);
		
		cache.getListenable().addListener(new NodeCacheListener() {
			
			@Override
			public void nodeChanged() throws Exception {
				ChildData data = cache.getCurrentData(); 
				if (null != data) { 
					System.out.println("路径为:" + cache.getCurrentData().getPath());
					System.out.println("数据为:" + new String(cache.getCurrentData().getData()));
					System.out.println("状态为:" + cache.getCurrentData().getStat());
					System.out.println("---------------------------------------");
				} else { 
					System.out.println("节点被删除!"); 
				}
 
			}
		});
		
		Thread.sleep(1000);
		client.create().forPath("/super", "123".getBytes());
		
		Thread.sleep(1000);
		client.setData().forPath("/super", "456".getBytes());
		
		Thread.sleep(1000);
		client.delete().forPath("/super");
		
		Thread.sleep(Integer.MAX_VALUE);
	}
}

        程序输出如下所示: 

路径为:/super
数据为:123
状态为:429496729934,429496729934,1542033727201,1542033727201,0,0,0,0,3,0,429496729934
 
---------------------------------------
路径为:/super
数据为:456
状态为:429496729934,429496729935,1542033727201,1542033728271,1,0,0,0,3,0,429496729934
 
---------------------------------------
节点被删除!

注册中心获取服务节点ip端口接口调用

        上面的几个方案都是异步更新的,无法立即同步获取缓存刷新的结果。我们的服务都是集群部署的,可以提供一个缓存刷新的接口,缓存变更时获取集群所有节点的ip和端口列表,挨个调用缓存刷新接口,这样我们就可以同步获取到缓存刷新的结果了,如果刷新失败可以进行重试操作。

        我们项目开发中一般都会使用注册中心,通过注册中心可以获取服务的所有节点的ip和端口列表,我们以Nacos为例,大致代码如下所示

@Resource
NacosRegistration registration;
@Resource
RestTemplate restTemplate;

@Test
public void test2() throws Exception {
	//变更的缓存key
	String merchantNo="T001";
	NamingService namingService = registration.getNacosNamingService();
	List<Instance> instanceList = namingService.getAllInstances("pay-service");
	if(CollectionUtils.isNotEmpty(instanceList)){
		//遍历集群中的服务列表,挨个调用接口进行缓存刷新
		for(Instance instance:instanceList){
			String url = new StringBuilder()
					.append("http://")
					.append(instance.getIp())
					.append(":")
					.append(instance.getPort())
					.append("/refreshCache?merchantNo=")
					.append(merchantNo).toString();
			log.info("当前服务节点,url:{}",url);
			ResponseEntity<Response> entity = restTemplate.getForEntity(url, Response.class);
			Response response = entity.getBody();
		}
	}
}

本地定时任务兜底

        上面几种解决方法或多或少都有失败的可能性,可以通过定时任务刷新本地缓存进行兜底。

        定时任务可以通过spring schedule、Timer、quartz等方案进行实现。

        spring schedule定时任务大致代码如下所示:

/**
 * 初始延迟5分钟,每执行完一次后,固定间隔10分钟执行一次<br/>
 * 
 */
@Scheduled(initialDelay = 1000*60*5,fixedDelay = 1000*60*10)
@Override
public Response refreshCacheTask() {
	log.info("刷新缓存信息start===============================");
	
	log.info("刷新缓存信息信息end================================="));
	return Response.success();
}

        我所了解的几种常见的刷新本地缓存的方案介绍完了, 如果还有新的方案,欢迎评论区补充哈。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/615647.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

[数据结构 -- C语言] 堆实现Top-K问题,原来王者荣耀的排名是这样实现的,又涨知识了

目录 1、什么是Top-K问题&#xff1f; 1.1 Top-K基本思路 2、Top-K问题逻辑分析 2.1 建堆&#xff0c;大小为K的小堆 2.2 将剩余的N - K 个元素依次与堆顶元素比较&#xff0c;大于就替换 2.3 打印堆 3、TopK实现代码 4、Top-K问题完整代码 结果展示&#xff1a; TopK…

做IT运维的,哪有人不疯的

网飞最新的剧集《怒呛人生》大受欢迎的一大原因就是&#xff1a;发疯。 在2023年&#xff0c;发疯已经从一种人身攻击&#xff0c;拯救语言匮乏的恶评转移成一个中性词&#xff0c;在某些语境下&#xff0c;等同于冒犯、破罐子破摔。连快乐都不敢的东亚人&#xff0c;为啥发疯…

C++:智能指针

目录 一. 智能指针的概念及原理 1.1 什么是智能指针 1.2 智能指针的原理 二. 智能指针的拷贝问题 三. auto_ptr 3.1 auto_ptr的拷贝构造和赋值问题 3.2 auto_ptr的模拟实现 四. unique_ptr 五. shared_ptr 5.1 shared_ptr的常用接口 5.2 shared_ptr的拷贝构造和赋值…

软件工程导论(四)软件编码测试与维护

一、软件编程 1.1良好的编程习惯 变量命名有意义并且使用统一的命名规则 编写自文档代码&#xff08;序言性注释 or 行内注释&#xff09; 提前进行可维护性考量&#xff08;可以用常量的方式存在的数值最好以变量的方式存在&#xff09; 良好的视觉安排可以提高代码的可读性(…

ChatGPT训练一次要耗多少电?

如果开个玩笑&#xff1a;问ChatGPT最大的贡献是什么&#xff1f; “我觉得它对全球变暖是有一定贡献的。”知名自然语言处理专家、计算机科学家吴军在4月接受某媒体采访时如是说。 随着ChatGPT引爆AIGC&#xff0c;国内外巨头纷纷推出自己的AI大模型&#xff0c;大家为人工智…

2023 开放原子全球开源峰会“开发者之夜”高能剧透!

开发者之夜~即将高燃启动 最潮&#xff01;最嗨&#xff01;最青春&#xff01; 肆意&#xff01;亲切&#xff01;嗨 FUN 派&#xff01; 这是一场面向开发者的线下狂欢&#xff01; 也是一场精心准备的答谢盛宴&#xff01; 更是一场开源圈的老友聚会&#xff01; 开发者之夜…

IP地址中的子网掩码和CIDR

将常规的子网掩码转换为二进制&#xff0c;将发现子网掩格式为连续的二进制1跟连续0&#xff0c;其中子网掩码中为1的部分表示网络ID&#xff0c;子网掩中为0的表示主机ID。比如255.255.0.0转换为二进制为11111111 11111111 00000000 00000000。 ​ 在前面所举的例子中为什么不…

Yakit: 集成化单兵安全能力平台使用教程·Web Fuzzer篇

Yakit: 集成化单兵安全能力平台使用教程Web Fuzzer篇 1.数据包共享2.数据包扫描3.使用Web Fuzzer进行模糊测试4.常用 fuzz 标签5.热加载Fuzz1.数据包共享 分享/导入功能可用于信息分享,分享可以设置有效时长和分享密码,凭分享id和密码可以导入分享者的请求包 注意:数据包是…

uni-app 自定义组件之星级评价分数

效果图&#xff1a; 1.自定义组件starsRating.vue文件&#xff08;放在components文件夹内&#xff09; 代码截图&#xff1a; 对应的代码&#xff1a; <image click“btnStars1” class“starsicon” :src“starsObject[0]” mode“widthFix”> <image click“…

redis基础-----安装及使用场景基础操作

需要使用的网址 Redis中文网 Download | Redis 数据库及缓存架构选型网址&#xff1a; DB-Engines Ranking - popularity ranking of database management systems 常识&#xff1a; 存储方面&#xff1a; 磁盘&#xff1a; 1&#xff0c;寻址&#xff1a;ms 2&#xff…

达梦数据库读写分离集群异常测试(⾼可⽤)及双主(类似脑裂)问题处理

目录 测试前准备... 4 断电测试... 4 一、备库204断电... 4 二、断电数据新增测试... 5 1、备库204断电... 5 2、主库200新增数据&#xff0c;203备库查询正常... 5 3、204服务器启动并启动守护进程&#xff0c;测试&#xff0c;正常... 6 三、主库断电测试... 6 1、主…

python使用requests+excel进行接口自动化测试(建议收藏)

前言 在当今的互联网时代中&#xff0c;接口自动化测试越来越成为软件测试的重要组成部分。Python是一种简单易学&#xff0c;高效且可扩展的语言&#xff0c;自然而然地成为了开发人员的首选开发语言。而requests和xlwt这两个常用的Python标准库&#xff0c;能够帮助我们轻松…

archive log list :报错的解决办法

装好oracle数据库之后&#xff0c; 没事在练习sql语句&#xff0c; 看看一些基本的字典表啊啥的 但是当我执行 archive log list这个的时候居然给我报错&#xff0c; 这句话的意思是&#xff1a; 查看数据库的备份和恢复策略&#xff0c;并确定归档文件的具体位置&#xff…

小觅相机去畸变--Apriltag标签检测--Apriltag_ros

小觅相机型号:深度版,视场角50 ROS版本:nodelet 1.使用Calibrator获取相机的标定参数,或者用小觅相机自带的sdk获取: calibrator可以参考:ROS系统-摄像头标定camera calibration 小觅自带sdk参考:获取图像标定参数 或者小觅的ROS包编译后,会生成 这个get_img_para…

请求从前端到后端跟踪调试

请求慢的原因很多&#xff0c;当出现前端反应接口慢时&#xff0c;而通过后端日志查看请求处理时间并不慢时&#xff0c;往往会手足无措&#xff0c;当面对网络问题出现手足无措时&#xff0c;这就是在提醒你该抓包分析了&#xff0c;那么一般如何根据抓包文件去分析慢请求呢&a…

【MySQL】数据库报错集

一. 报错列表 1.1. Out of range value for column “xx” at row x 阐述&#xff1a;第 “x” 行的列 “xx” 超出范围 原因&#xff1a;建表时&#xff0c;类型bigint且长度20&#xff0c;如下字段的值超过其可输入的范围了 解决&#xff1a;修改该值为该列所设的长度即可 …

Cookie和Session原理详解

目录 前言 Cookie Session 会话机制 Cookie和Session的区别 Servlet中对Session和Cookie的封装 代码实例&#xff1a;实现用户登录 约定前后端交互的接口 前端页面&#xff1a; 后端实现 login index 总结 前言 在web的发展史中&#xff0c;我们知道浏览器和服务…

【模拟电子技术】理论考核回顾

写在前面&#xff1a; 1&#xff1a;好好学习&#xff0c;早日学会看B站华成英老师的课&#xff0c;不然就会像我一样最后快挂科了。 2&#xff1a;杂谈&#xff1a;我觉得一个“智者”可以因为我不会做题来侮辱我的智商&#xff0c;但是不能借此侮辱我没好好复习。 3&#…

AI换脸(支持视频换脸,支持cpu、低算力)【附代码】

可直接选择一张人脸去替换另一张图片或者视频中的人脸。本项目仅提供人脸替换部分&#xff0c;不需要数据集&#xff0c;不用训练&#xff01; 目录 项目说明 环境说明 准备工作 如何使用 免责声明 项目说明 本项目参考源码&#xff1a;GitHub - s0md3v/roop: one-click…

[数据结构 -- 手撕排序算法第一篇] 堆排序,一篇带你搞懂堆排序

目录 1、堆的应用 -- 堆排序 1.1 堆排序的思路分析 2、建堆 2.1 向上调整建堆&#xff1a;O(N*logN) 2.1.1 向上调整代码 2.1.2 向上调整建堆代码 2.2 向下调整建堆&#xff1a;O(N) 2.2.1 向下调整代码 2.2.2 向下调整建堆代码 3、堆排序实现代码 4、堆排序测试 1、…