Canal+Kafka实现MySQL与Redis数据同步(二)

news2024/11/18 12:16:39

Canal+Kafka实现MySQL与Redis数据同步(二)

创建MQ消费者进行同步

在application.yml配置文件加上kafka的配置信息:

spring:
  kafka:
      # Kafka服务地址
    bootstrap-servers: 127.0.0.1:9092
    consumer:
      # 指定一个默认的组名
      group-id: consumer-group1
      #序列化反序列化
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringDeserializer
      value-serializer: org.apache.kafka.common.serialization.StringDeserializer
      # 批量抓取
      batch-size: 65536
      # 缓存容量
      buffer-memory: 524288

根据上面Kafka消费命令那里,我们知道了json数据的结构,可以创建一个CanalBean对象进行接收:

public class CanalBean {
    //数据
    private List<TbCommodityInfo> data;
    //数据库名称
    private String database;
    private long es;
    //递增,从1开始
    private int id;
    //是否是DDL语句
    private boolean isDdl;
    //表结构的字段类型
    private MysqlType mysqlType;
    //UPDATE语句,旧数据
    private String old;
    //主键名称
    private List<String> pkNames;
    //sql语句
    private String sql;
    private SqlType sqlType;
    //表名
    private String table;
    private long ts;
    //(新增)INSERT、(更新)UPDATE、(删除)DELETE、(删除表)ERASE等等
    private String type;
    //getter、setter方法
}
public class MysqlType {
    private String id;
    private String commodity_name;
    private String commodity_price;
    private String number;
    private String description;
    //getter、setter方法
}
public class SqlType {
    private int id;
    private int commodity_name;
    private int commodity_price;
    private int number;
    private int description;
}

最后就可以创建一个消费者CanalConsumer进行消费:

@Component
public class CanalConsumer {
    //日志记录
    private static Logger log = LoggerFactory.getLogger(CanalConsumer.class);
    //redis操作工具类
    @Resource
    private RedisClient redisClient;
    //监听的队列名称为:canaltopic
    @KafkaListener(topics = "canaltopic")
    public void receive(ConsumerRecord<?, ?> consumer) {
        String value = (String) consumer.value();
        log.info("topic名称:{},key:{},分区位置:{},下标:{},value:{}", consumer.topic(), consumer.key(),consumer.partition(), consumer.offset(), value);
        //转换为javaBean
        CanalBean canalBean = JSONObject.parseObject(value, CanalBean.class);
        //获取是否是DDL语句
        boolean isDdl = canalBean.getIsDdl();
        //获取类型
        String type = canalBean.getType();
        //不是DDL语句
        if (!isDdl) {
            List<TbCommodityInfo> tbCommodityInfos = canalBean.getData();
            //过期时间
            long TIME_OUT = 600L;
            if ("INSERT".equals(type)) {
                //新增语句
                for (TbCommodityInfo tbCommodityInfo : tbCommodityInfos) {
                    String id = tbCommodityInfo.getId();
                    //新增到redis中,过期时间是10分钟
                    redisClient.setString(id, JSONObject.toJSONString(tbCommodityInfo), TIME_OUT);
                }
            } else if ("UPDATE".equals(type)) {
                //更新语句
                for (TbCommodityInfo tbCommodityInfo : tbCommodityInfos) {
                    String id = tbCommodityInfo.getId();
                    //更新到redis中,过期时间是10分钟
                    redisClient.setString(id, JSONObject.toJSONString(tbCommodityInfo), TIME_OUT);
                }
            } else {
                //删除语句
                for (TbCommodityInfo tbCommodityInfo : tbCommodityInfos) {
                    String id = tbCommodityInfo.getId();
                    //从redis中删除
                    redisClient.deleteKey(id);
                }
            }
        }
    }
}

测试MySQL与Redis同步

mysql对应的表结构如下:

CREATE TABLE `tb_commodity_info` (
  `id` varchar(32) NOT NULL,
  `commodity_name` varchar(512) DEFAULT NULL COMMENT '商品名称',
  `commodity_price` varchar(36) DEFAULT '0' COMMENT '商品价格',
  `number` int(10) DEFAULT '0' COMMENT '商品数量',
  `description` varchar(2048) DEFAULT '' COMMENT '商品描述',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='商品信息表';

首先在MySQL创建表。然后启动项目,接着新增一条数据:

INSERT INTO `canaldb`.`tb_commodity_info` (`id`, `commodity_name`, `commodity_price`, `number`, `description`) VALUES ('3e71a81fd80711eaaed600163e046cc3', '叉包', '3.99', '3', '大叉包,老喜欢');

tb_commodity_info表查到新增的数据:

img

Redis也查到了对应的数据,证明同步成功!

img

如果更新呢?试一下Update语句:

UPDATE `canaldb`.`tb_commodity_info` SET `commodity_name`='青菜包',`description`='便宜的青菜包' WHERE `id`='3e71a81fd80711eaaed600163e046cc3';

img

img

没有问题!

总结

canal的缺点:

  1. canal只能同步增量数据。
  2. 不是实时同步,是准实时同步。
  3. 存在一些bug,不过社区活跃度较高,对于提出的bug能及时修复。
  4. MQ顺序性问题。
    网的回答,大家参考一下
    img

尽管有一些缺点,毕竟没有一样技术(产品)是完美的,合适最重要。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1226345.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

JavaScript实现飞机发射子弹详解(内含源码)

JavaScript实现飞机发射子弹 前言实现过程源码展示源码讲解HTML结构CSS结构js结构 前言 文本主要讲解如何利用JavaScript实现飞机发射子弹&#xff0c;实现过程以及源码讲解。实现效果图如下&#xff1a; 实现过程 首先&#xff0c;找到飞机和子弹的UI图&#xff0c;gif图最…

C++虚函数(定义,作用,原理,案例)

一.定义&#xff1a; C的虚函数是在父类(基类)中声明的的函数&#xff0c;它可在子类(派生类)中重写。二.作用 虚函数的目的是实现多态性&#xff0c;即在程序运行时根据对象的实际类型确定调用哪个函数。三.使用方法&#xff1a; 在基类中声明虚函数时&#xff0c;需要在函…

全链路监控--pinpoint

一、pinpoint架构原理 架构组成 Pinpoint Agent:和自己运行的应用关联起来的探针 Pinpoint Collector:收集各种性能数据 Pinpoint-Web: 将收集到的数据显成为 WEB网页显示 HBase Storage: 存储收集到的数据 工作原理 pinpoint的核心思想是在各个服务节点之间彼此调用时&a…

【自然语言处理】【大模型】赋予大模型使用工具的能力:Toolformer与ART

赋予大模型使用工具的能力&#xff1a;Toolformer与ART ​ 本文介绍两种赋予大模型使用外部工具能力的方法&#xff1a;Toolformer和ART。 Toolformer论文地址&#xff1a;https://arxiv.org/pdf/2302.04761.pdf ART论文地址&#xff1a;https://arxiv.org/pdf/2303.09014.pd…

《视觉SLAM十四讲》-- 回环检测

文章目录 10 回环检测10.1 概述10.1.1 回环检测的意义10.1.2 回环检测的方法10.1.3 准确率和召回率 10.2 词袋模型10.3 字典10.3.1 字典的结构10.3.2 实践&#xff1a;创建字典 10.4 相似度计算10.4.1 理论部分10.4.2 实践&#xff1a;相似度的计算 10.5 实验分析与评述 10 回环…

Java(二)(String的常见方法,ArrayList的常见方法)

String 创建string对象 package Helloworld;public class dome1 {public static void main(String[] args) {// 1.直接双引号得到字符串对象,封装字符串对象String name "lihao";System.out.println(name);// 2. new String 创建字符串对象,并调用构造器初始化字符…

html综合笔记:设计实验室主页

&#xff11; 主页来源及效果 Overview - Lab Website Template docs (gitbook.io) greenelab/lab-website-template: An easy-to-use, flexible website template for labs (github.com) 2 创建网页 3 主要的一些file 3.1 index.md 主页面 3.1.1 intro 3.1.2 highlight …

庖丁解牛:NIO核心概念与机制详解 02 _ 缓冲区的细节实现

文章目录 PreOverview状态变量概述Position 访问方法 Pre 庖丁解牛&#xff1a;NIO核心概念与机制详解 01 接下来我们来看下缓冲区内部细节 Overview 接下来将介绍 NIO 中两个重要的缓冲区组件&#xff1a;状态变量和访问方法 (accessor) 状态变量是"内部统计机制&quo…

【汇编】处理字符问题

文章目录 前言一、处理字符问题1.1 汇编语言如何处理字符1.2 asciiascii码是什么&#xff1f;ascii码表是什么&#xff1f; 1.3 汇编语言字符示例代码 二、大小写转换2.1 问题&#xff1a;对datasg中的字符串2.2 逻辑与和逻辑或2.3 程序&#xff1a;解决大小写转换的问题一个新…

我终于体会到了:代码竟然不可以运行,为什么呢?代码竟然可以运行,为什么呢?

废话不多说&#xff0c;直接上图 初看只当是段子&#xff0c;再看已是段中人 事情经过&#xff1a; 我在写动态顺序表的尾插函数时&#xff0c;写出了如下代码&#xff0c;可以跑&#xff0c;但是这段代码有一个bug暂时先不提 //动态顺序表的尾插 void SLPushBack(SL* psl, …

python中列表的基础解释

列表&#xff1a; 一种可以存放多种类型数据的数据结构 列表的创建&#xff1a; 1.用【】创建列表 #创建一个空列表 list1[] #创建一个非空列表 list2 [zhang,li,ying,1,2,3] #输出内容及类型 print(list1,type(list1)) print(list2,type(list2))结果&#xff1a; 2.使用list…

详细步骤记录:持续集成Jenkins自动化部署一个Maven项目

Jenkins自动化部署 提示&#xff1a;本教程基于CentOS Linux 7系统下进行 Jenkins的安装 1. 下载安装jdk11 官网下载地址&#xff1a;https://www.oracle.com/cn/java/technologies/javase/jdk11-archive-downloads.html 本文档教程选择的是jdk-11.0.20_linux-x64_bin.tar.g…

Linux中系统时间同步

在Windwos中&#xff0c;系统时间的设置很简单&#xff0c;界面操作&#xff0c;通俗易懂&#xff0c;而且设置后&#xff0c;重启&#xff0c;关机都没关系。系统时间会自动保存在BIOS时钟里面&#xff0c;启动计算机的时候&#xff0c;系统会自动在BIOS里面取硬件时间&#x…

开发知识点-uniapp微信小程序-开发指南

uniapp uni.chooseLocationgetCurrentPages美团外卖微信小程序开发uniapp-美团外卖微信小程序开发P1 成果展示P2外卖小程序后端&#xff0c;学习给小程序写http接口P3 主界面配置P4 首页组件拆分P13 外卖列表布局筛选组件商家 布局测试数据创建样式 请求商家外卖数据封装请求并…

汇编-指针

一个变量如果包含的是另一个变量的地址&#xff0c; 则该变量就称为指针(pointer) 。指针是操作数组和数据结构的极好工具&#xff0c;因为它包含的地址在运行时是可以修改的。 .data arrayB byte 10h, 20h, 30h, 40h ptrB dword arrayB ptrB1 dword OFFSET arrayBarray…

4.6每日一题(多元函数的隐函数求导)

三元方程确定的二元函数类型的隐函数 方法一&#xff1a;两边对x求偏导&#xff0c;把y看成常数 注&#xff1a;z可以把x和y同时代入求出答案 方法二&#xff1a;带公式

数据结构【DS】栈

共享栈 共享栈的目的是什么&#xff1f; 目的:有效利用存储空间。 共享栈的存取数据时间复杂度为&#xff1f; 存取数据时间复杂度为O(1) 共享栈如何判空&#xff1f;如何判满&#xff1f; 两个栈的栈顶指针都指向栈顶元素&#xff0c;&#x1d461;&#x1d45c;&#x1d45d;…

【0基础学Java第十课】-- 认识String类

10. 认识String类 10.1 String类的重要性10.2 常用方法10.2.1 字符串构造10.2.2 String对象的比较10.2.3 字符串查找10.2.4 转化10.2.5 字符串替换10.2.6 字符串拆分10.2.7 字符串截取10.2.8 字符串的不可变性10.2.9 字符串修改 10.3 StringBuilder和StringBuffer10.3.1 String…

Java 11及更高版本的Oracle JDK版本

2021 年 9 月 14 日&#xff0c;Oracle 发布了可以长期支持的 JDK17 版本&#xff0c;那么从 JDK11 到 JDK17&#xff0c;到底带来了哪些特性呢&#xff1f;亚毫秒级的 ZGC 效果到底怎么样呢&#xff1f;值得我们升级吗&#xff1f;而且升级过程会遇到哪些问题呢&#xff1f;带…