数据同步的艺术:探索PostgreSQL和Redis的一致性策略

news2024/12/23 22:58:30

作者:后端小肥肠

🍇 我写过的文章中的相关代码放到了gitee,地址:xfc-fdw-cloud: 公共解决方案

🍊 有疑问可私信或评论区联系我。

🥑  创作不易未经允许严禁转载。

1. 前言

在当今高度数字化的世界中,应用程序需要处理海量数据并提供快速响应。为了应对这一挑战,使用Redis作为缓存来减少对数据库的直接访问已经成为一种广泛采用的策略。这种方法不仅能够显著提升应用性能,还能有效降低数据库负载。然而,当我们将PostgreSQL作为主数据库,Redis作为缓存层时,一个关键问题随之而来:如何确保这两个系统之间的数据保持一致?

本文将深入探讨PostgreSQL和Redis之间的数据同步策略,旨在帮助开发者和架构师构建既高效又可靠的数据架构。本文将:

  1. 分析导致数据不一致的根本原因
  2. 探讨各种同步策略的优缺点
  3. 提供实际可行的解决方案

通过本文,读者将获得设计和实现健壮的数据同步机制所需的相关知识,从而在高并发环境中构建更可靠的应用系统。

2. 数据为什么会不一致?

在现代高并发应用中,确保数据的一致性是一个复杂而关键的挑战。让我们深入探讨导致数据不一致的主要原因,以及为什么在PostgreSQL和Redis之间同步数据如此重要。

2.1 数据库:并发访问的瓶颈

在高并发的业务场景下,数据库往往成为系统中最薄弱的环节:

  • 连接限制数据库能够同时处理的连接数是有限的,高并发下很容易达到这个限制。
  • 锁竞争:并发写操作可能导致严重的锁竞争,大幅降低系统吞吐量。
  • 资源消耗每个数据库操作都消耗CPU和I/O资源,并发请求增加会导致资源迅速耗尽。
  • 查询性能复杂查询在高并发下可能导致数据库响应时间显著增加。

这些因素综合导致数据库在高并发场景下性能急剧下降,成为整个系统的瓶颈。

2.2 Redis:缓解数据库压力的关键

为了应对数据库的局限性,引入Redis作为缓存层成为一种普遍策略:

  • 减轻数据库负载将热点数据存储在Redis中,大幅减少对数据库的直接访问。
  • 提高响应速度Redis的内存存储特性使得数据访问速度远快于传统数据库。
  • 支持高并发Redis的单线程模型和非阻塞I/O使其能够高效处理大量并发请求。
  • 数据结构多样性Redis提供了多种数据结构,适用于不同的缓存场景。

使用Redis作为缓冲层,让请求首先访问Redis而不是直接访问PostgreSQL等数据库,可以显著提升系统的整体性能和并发处理能力。

2.3 数据一致性挑战

虽然引入Redis缓存解决了许多性能问题,但也带来了新的挑战,特别是在数据更新方面:

  1. 读取操作从Redis缓存读取数据通常不会引发严重的一致性问题,因为它只涉及单一数据源。
  2. 写入操作当需要更新数据时,问题变得复杂。我们需要同时更新数据库(PostgreSQL)和缓存(Redis),这个过程中很容易出现数据不一致:
    • 更新时序数据库和缓存的更新顺序会影响数据一致性。
    • 部分失败如果更新过程中发生错误,可能导致数据库和缓存状态不一致。
    • 并发更新多个客户端同时更新同一数据可能导致意外的结果。
  3. 缓存失效决定何时使缓存中的数据失效也是一个挑战,过早失效会降低缓存效果,过晚失效则可能导致数据不一致。

2.4 典型的数据读取流程

为了更好地理解这个问题,让我们看一下典型的数据读取流程:

  1. 应用程序接收到数据读取请求。
  2. 首先检查Redis缓存中是否存在所需数据。
  3. 如果Redis中存在数据(缓存命中),直接返回缓存数据。
  4. 如果Redis中不存在数据(缓存未命中),则从PostgreSQL数据库读取数据。
  5. 将从数据库读取的数据写入Redis缓存,以便后续访问。
  6. 返回数据给应用程序。

这个流程在处理读取操作时通常运作良好,但一旦涉及数据更新,就需要额外的机制来确保PostgreSQL和Redis之间的数据一致性。

在接下来的章节中,我们将深入探讨各种同步策略,以及如何在保证数据一致性的同时,维持系统的高性能和可扩展性。

3. redis与Postgres数据同步策略介绍

3.1 先删除缓存后更新数据库

在这种策略中,系统首先删除Redis中的缓存,然后更新数据库。这种方法可能引起如下问题:

  1. 读取过程中的数据不一致如果在缓存被删除后、数据库更新之前,另一个线程尝试读取数据,它会发现缓存为空。此时,它可能会从数据库中读取旧数据并将其写回缓存,导致所谓的脏缓存
  2. 数据更新后的不一致性当数据库最终更新后,缓存中的数据已经是旧数据,从而导致缓存与数据库之间的不一致。

3.2 先更新数据库后删除缓存

这种策略先对数据库进行更新,然后再删除缓存。这样做的潜在问题包括:

  1. 事务失败导致的缓存未删除如果在数据库更新后、删除缓存之前,执行更新的线程出现故障,这会导致缓存未被及时删除。
  2. 并发导致的数据不一致由于数据库的写入和缓存的删除是两个独立操作,无法保证它们之间的执行顺序,可能会导致读取操作在缓存删除前读到旧缓存,进而出现数据不一致。

3.3 延时双删策略

为了解决上述提到的同步问题,可以采用延时双删策略:

  1. 删除缓存首先删除Redis中的相关数据缓存。
  2. 更新数据库然后更新数据库,确保数据的准确性。
  3. 休眠延迟执行短暂的延时(例如500毫秒),确保所有相关数据库操作都已完成。
  4. 再次删除缓存最后再次删除缓存,确保在数据更新期间任何新生成的缓存也会被清除。

伪代码如下:

function delayDoubleDelete(key, updateStatement)
    // 第一步:删除Redis中的缓存
    Redis.delete(key)

    // 第二步:更新数据库
    Database.execute(updateStatement)

    // 第三步:执行短暂的延时,例如500毫秒
    sleep(500)

    // 第四步:再次删除Redis中的缓存
    Redis.delete(key)
end function

3.4. 使用WAL日志实现缓存同步(本文重点!!)

3.4.1. 使用WAL日志实现更新缓存的优势

在实际开发中,使用WAL(Write-Ahead Logging)日志同步Redis和PostgreSQL是一种非常高效的策略。相比于传统的缓存更新方法,WAL日志带来了许多优势:

1. 解耦操作,降低延迟

通过WAL日志,我们可以将数据库更新和缓存更新解耦。这意味着数据库操作可以独立完成,而不需要等待缓存更新,从而显著降低了响应时间,提升了系统的整体性能。

2. 确保数据一致性

WAL日志记录了数据库的所有变更操作,确保缓存更新时可以准确地反映数据库的最新状态。这种方式大大提高了数据的一致性,尤其是在高并发环境下,能够有效减少数据冲突和不一致的情况。

3. 增强容错和恢复能力

WAL日志具备出色的容错能力。即使系统发生故障,我们也可以通过重放WAL日志来恢复缓存中的数据,从而保证缓存与数据库的同步一致,快速恢复系统的正常运行。

4. 灵活性强

使用WAL日志还可以实现更加灵活的缓存更新策略。例如,你可以根据业务需求选择性地更新缓存,或者批量处理多条日志记录后再更新缓存,从而进一步优化系统性能。

总体来说,WAL日志为Redis和PostgreSQL的同步提供了一种高效、可靠的解决方案,特别适合需要高数据一致性和快速恢复能力的应用场景。

3.4.2. 基于Spring Boot实现缓存同步

在 Spring Boot 和 Java 环境中,实现 PostgreSQL 与Redis数据同步的技术流程如下:

在上述流程中:

  1. Java 生产者 监听 PostgreSQL 的 WAL 日志,捕获数据库变更。
  2. 如果存在数据更新,这些更新会被发送到 RabbitMQ。
  3. RabbitMQ 作为消息中间件,负责传递这些更新消息。
  4. Java 消费者 从 RabbitMQ 接收消息,解析这些消息,并据此在 Redis 中执行相应的 CRUD 操作。

这样的设计确保了 PostgreSQL 数据库的变更能够实时同步到 Redis 缓存中,从而维持两者之间的数据一致性。

本文仅展示WAL监听到同步Redis的核心步骤及代码(省去RabbitMQ),如对RabbitMQ构建感兴趣的jym可移步(【架构系列】RabbitMQ应用场景及在实际项目中如何搭建可靠的RabbitMQ架构体系-CSDN博客)。

步骤如下:

1. 修改 postgresql.conf 配置文件

wal_level = logical
max_replication_slots = 4
max_wal_senders = 4

2. 修改pg_hba.conf配置文件

# 允许复制连接
host    replication     all             192.168.100.0/24        md5

 这里,192.168.100.0/24应替换为实际的客户端IP地址或网段,md5表示使用密码认证。

3. 重启PostgreSQL服务

# 使用适当的命令重启PostgreSQL,这取决于你的操作系统和安装方式
sudo systemctl restart postgresql
# 或者
sudo service postgresql restart

 4. 编写监听WAL代码

package com.xfc.pg2redis.service;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.util.List;

@Service
public class DatabaseChangeService {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    private static final String SLOT_NAME = "my_replication_slot";

    @Autowired
    private ChangeLogProcessor changeLogProcessor;

    @PostConstruct
    public void startListening() {
        new Thread(this::initializeReplicationSlot).start();
    }

    private void initializeReplicationSlot() {
        if (!isSlotExists(SLOT_NAME)) {
            createReplicationSlot(SLOT_NAME);
        }
        listenToReplicationSlot();
    }

    private boolean isSlotExists(String slotName) {
        String query = "SELECT COUNT(*) FROM pg_replication_slots WHERE slot_name = ?";
        Integer count = jdbcTemplate.queryForObject(query, new Object[]{slotName}, Integer.class);
        return count != null && count > 0;
    }

    private void createReplicationSlot(String slotName) {
        String query = "SELECT pg_create_logical_replication_slot(?, 'test_decoding')";
        jdbcTemplate.update(query, slotName);
    }

    public void listenToReplicationSlot() {
        String query = "SELECT data FROM pg_logical_slot_get_changes(?, NULL, NULL)";
        while (true) {
            List<String> changes = jdbcTemplate.queryForList(query, new Object[]{SLOT_NAME}, String.class);
            for (String change : changes) {
//                processChange(change);
                changeLogProcessor.processChangeLog(change);
            }
            try {
                Thread.sleep(10000);  // 暂停一段时间再次检查
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

//    private void processChange(String change) {
//        // 处理数据库变更逻辑
//        System.out.println("Detected change: " + change);
//    }
}

 这段代码是一个 Spring Boot 服务类,它使用 JdbcTemplate 从 PostgreSQL 的逻辑复制槽中获取变更数据。服务启动时检查并创建逻辑复制槽,然后启动一个线程不断监听变更,处理变更数据并调用 ChangeLogProcessor 进行进一步处理。

5. 编写解析WAL日志代码

package com.xfc.pg2redis.service;

import com.xfc.pg2redis.utils.RedisUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;

@Service
public class ChangeLogProcessor {

    @Autowired
    private RedisUtils redisUtils;

    public void processChangeLog(String changeLog) {
        // 解析日志内容
        String operation = extractOperation(changeLog);
        String id = extractId(changeLog);
        String data = extractData(changeLog);

        switch (operation) {
            case "INSERT":
                // 将数据插入到 Redis
                redisUtils.set(id, data);
                System.out.println("Inserted into Redis: " + id + " => " + data);
                break;

            case "UPDATE":
                // 更新 Redis 中的数据
                redisUtils.set(id, data);
                System.out.println("Updated in Redis: " + id + " => " + data);
                break;

            case "DELETE":
                // 从 Redis 中删除数据
                redisUtils.del(id);
                System.out.println("Deleted from Redis: " + id);
                break;

            default:
                System.out.println("Unknown operation: " + operation);
        }
    }

    private String extractOperation(String changeLog) {
        if (changeLog.startsWith("INSERT:")) {
            return "INSERT";
        } else if (changeLog.startsWith("UPDATE:")) {
            return "UPDATE";
        } else if (changeLog.startsWith("DELETE:")) {
            return "DELETE";
        }
        return "UNKNOWN";
    }

    private String extractId(String changeLog) {
        // 简单的正则表达式提取 id
        String idPattern = "id\\[character varying\\]:'(.*?)'";
        java.util.regex.Pattern pattern = java.util.regex.Pattern.compile(idPattern);
        java.util.regex.Matcher matcher = pattern.matcher(changeLog);
        return matcher.find() ? matcher.group(1) : "";
    }

    private String extractData(String changeLog) {
        // 提取数据内容,这里假设数据是以 `name`, `price`, `num` 字段组成
        // 这里只是一个简单的提取示例,根据实际情况调整
        String dataPattern = "name\\[character varying\\]:'(.*?)' price\\[bigint\\]:(\\d+) num\\[integer\\]:(\\d+)";
        java.util.regex.Pattern pattern = java.util.regex.Pattern.compile(dataPattern);
        java.util.regex.Matcher matcher = pattern.matcher(changeLog);
        if (matcher.find()) {
            return String.format("{\"name\":\"%s\", \"price\":%s, \"num\":%s}",
                    matcher.group(1), matcher.group(2), matcher.group(3));
        }
        return "";
    }
}

上述代码定义了一个名为 ChangeLogProcessor 的服务类,用于处理 PostgreSQL 生成的变更日志,并根据这些日志更新 Redis 缓存。类中的 processChangeLog 方法接收一个变更日志字符串,通过 extractOperationextractIdextractData 方法提取操作类型、数据标识符(ID)和数据内容。根据操作类型(插入、更新、删除),该方法会调用 RedisUtils 类的方法来同步 Redis 缓存,确保 Redis 中的数据与 PostgreSQL 数据库保持一致。这样的处理流程有效支持了数据在不同存储系统间的一致性维护。 

3.4.3. 效果测试

1. 新增数据

调用新增数据接口:

 日志打印:


 redis效果:

2. 编辑数据

 调用编辑数据接口:


 日志打印:


 redis效果:

3.5. 源码地址:

xfc-fdw-cloud: 公共解决方案

4. 结语

通过本文的讨论,我们深入探讨了如何使用WAL日志实现PostgreSQL与Redis之间的高效数据同步。在现代应用程序中,确保数据库和缓存之间的数据一致性至关重要,而WAL日志提供了一种可靠的解决方案,既能保证高性能,又能确保数据的一致性。通过实际代码示例和流程解析,我们展示了如何在Spring Boot环境中实现这一机制,并验证了其在实际应用中的有效性。若本文对你有帮助。别忘记点点关注哦~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2095076.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

ACL学习笔记

1.ACL快速配置 需求&#xff1a;拒绝PC 1访问PC 3 &#xff08;1&#xff09;配置PC PC 1: PC 2: PC 3: &#xff08;2&#xff09;配置R1的接口IP信息 sys sysname R1 undo info-center enable interface GigabitEthernet0/0/0 ip address 192.168.1.1 255.255.255.0 qui…

超声波智能水表多少钱一个?

超声波智能水表的价格因品牌、功能、规格等因素而异&#xff0c;就拿深圳合众致达科技有限公司智能水电表厂家的超声波智能水表DN15口径产品举例&#xff0c;价格为399元起。具体价格需根据实际需求来确定。 一、影响价格的主要因素 -技术含量&#xff1a;具备远程数据传输、…

DSOJ-id12

1.保留几位小数 #include <iostream>#include <iomanip> //必须包含这个头文件using namespace std;void main( ){ double a 3.141596;cout<<fixed<<setprecision(3)<<a<<endl; //输出小数点后3位 2. 使用了未初始化的局部变量 Point* …

如何使用小乌龟清除认证缓存、还原版本、定位及常用开发工具集成

&#x1f600;前言 本篇博文是关于如何使用小乌龟清除认证缓存、还原版本、定位及常用开发工具集成&#xff0c;希望你能够喜欢 &#x1f3e0;个人主页&#xff1a;晨犀主页 &#x1f9d1;个人简介&#xff1a;大家好&#xff0c;我是晨犀&#xff0c;希望我的文章可以帮助到大…

openGuass之CTE Reuse

一. 前言 ​ CTE 是指with的公共表达式&#xff0c;如下所示是个CTE样例&#xff1a; ​ CTE表达式往往在同一条sql中多次被重复引用&#xff0c;如上图所示的cte被引用了两次&#xff08;c1 和 c2&#xff09;&#xff0c;我们称为2个CTE实例。 ​ 本文只要…

Windows系统Nginx下载安装配置 运行错误处理

Nginx是一款轻量级的web 服务器/反向代理 服务器。本篇文章主要是nginx的下载安装&#xff0c;处理运行中遇到的问题&#xff0c;配置反向代理。主要分为两部分&#xff1a;下载安装和配置。 目录 1.下载安装 2.nginx配置反向代理 1.下载安装 nginx官网&#xff1a;nginx: …

新160个crackme -044-tsrh-crackme

运行分析 提示去除NAG 不去除NAG也能进入主窗口&#xff0c;需要破解Name和Serial PE分析 ASM程序&#xff0c;32位&#xff0c;壳未知 去除NAG ida搜索字符串&#xff0c;发现NAG弹窗标题字符串&#xff0c;双击进入函数 找到了messagebox&#xff0c;即NAG位置00401079 打开x…

网络压缩之动态计算(dynamic computation)

动态计算希望网络可以自由 地调整它需要的计算量。为什么期待网络可以自由地调整它需要的计算量呢? 因为有时候我 们可能同样的模型会想要跑在不同的设备上面&#xff0c;而不同的设备上面的计算资源是不太一样的。所以期待训练好一个网络以后&#xff0c;放到新的设备上面&am…

Python编程基础知识,让编程基础更加扎实(输出个人简介)

&#x1f468;‍&#x1f4bb;个人主页&#xff1a;开发者-曼亿点 &#x1f468;‍&#x1f4bb; hallo 欢迎 点赞&#x1f44d; 收藏⭐ 留言&#x1f4dd; 加关注✅! &#x1f468;‍&#x1f4bb; 本文由 曼亿点 原创 &#x1f468;‍&#x1f4bb; 收录于专栏&#xff1a…

考研--数学(相关公式)

解析几何 知识点1 A(x1,y1) B(x2,y2) 则AB长度 |AB| A、B中点的坐标 &#xff08;&#xff0c;) 知识点2 方程求解 ①点斜式&#xff1a; y-y0k(x0-x) ②斜率式&#xff1a; ykxb ③两点式&#xff1a; …

【笔记篇】Davinci Configurator CanNm模块

目录 1 简介1.1 架构概览2 功能描述2.1 特性2.2 初始化2.3 状态机2.3.1 网络模式2.3.1.1 重复报文状态2.3.1.2 正常状态2.3.1.3 准备休眠状态2.3.2 预休眠模式2.3.3 总线休眠模式2.4 被动模式(对于被动节点)2.5 NM报文格式2.6 NM报文发送2.6.1 重试第一次报文请求2.7 降低总线…

集成电路学习:什么是I2C内部集成电路总线

I2C&#xff1a;内部集成电路总线 I2C&#xff0c;全称Inter-Integrated Circuit&#xff0c;即内部集成电路总线&#xff0c;是由飞利浦公司&#xff08;现为恩智浦半导体&#xff09;在上世纪八十年代初开发的一种同步的串行通信总线。它以其接线简单、硬件实现容易、可扩展性…

Mysql8利用binlog实现数据恢复

文章目录 1binlog基本概念2 binlog相关常用命令3 binlog工具mysqlbinlog4 测试数据准备&导入数据5 模拟误删表6 数据恢复方式说明7 数据恢复分析(偏移量方式恢复)8 数据恢复9 验证10 数据恢复的局限性11 总结 1binlog基本概念 binlog即binary log&#xff0c;二进制日志文件…

【React原理 - 任务调度之中断恢复】

概览 本文紧接上文介绍React调度的时间分片中任务中断和恢复&#xff0c;由于篇幅过长&#xff0c;所以拆成了两篇。上文主要介绍了调度器中的优先级和调度任务的触发、注册和调度循环。本文主要从任务调度入手介绍调度任务之后发送了什么&#xff0c;即在协调器中如何进行到f…

【matlab】数组操作:寻找最大值和最小值及其位置ind2sub函数

【matlab】数组操作&#xff1a;寻找最大值和最小值及其位置ind2sub函数 本文将介绍如何在MATLAB环境中使用内置函数来创建数组&#xff0c;以及如何找到数组中的最大值和最小值及其对应的位置。通过示例代码&#xff0c;我们将一步步展示这一过程&#xff0c;帮助读者更好地理…

探索Python的测试之道:unittest库的奥秘

文章目录 探索Python的测试之道&#xff1a;unittest库的奥秘背景&#xff1a;为何选择unittest&#xff1f;什么是unittest库&#xff1f;如何安装unittest库&#xff1f;简单库函数使用方法场景应用场景一&#xff1a;测试数学运算场景二&#xff1a;测试异常处理场景三&…

armv8 memory model概述

概述 在armv8 架构中&#xff0c;它引入了更多的维度来描述内存模型&#xff0c;从而在此基础上进行硬件优化(但其中一些并未被主流的软件所接受)&#xff0c;在此做一些简单的整理&#xff0c;更多信息请参考 Arm spec 以及 AMBA 协议。下文主要是对Memory 和 Device 两大类的…

Python 算法交易实验86 QTV200日常推进-获取A股日交易额并统计

说明 上一篇说到&#xff0c;交易量可能可以作为策略规则的支持度分析&#xff0c;但是(我现在还不想付费买数据)现成的接口似乎并没有这样的统计。获取某一只股票的日交易数据是相对简单的&#xff0c;市场上也就不到5000只的股票&#xff0c;总数据量应该也不会超过18M(5000…

面向整个价值链的高可信度卫星测试解决方案

在动态行业格局中增强卫星任务能力 在罗德与施瓦茨&#xff0c;我们利用专业知识和量身定制的测试和测量解决方案为卫星行业提供支持。这包括帮助行业参与者满足完整测试路径的要求&#xff0c;以实现完美的系统性能&#xff0c;确保符合最新技术和标准。此外&#xff0c;我们…