使用TCP方式拉取Canal数据

news2024/11/18 23:43:09

1 Canal对接Kafka联调

1.1 配置修改

canal.properties

修改 zk:

canal.zkServers = 10.51.50.219:2181

instance.properties

开启配置项:

canal.mq.dynamicTopic 是 Canal 的 MQ 动态 Topic 配置项:

  • test_javaedge_01 是kafka 的 topic
  • test_db.users 要监控的数据库、表
  • test_db.users 表发生变化时,Canal 将会把变化的数据推送到名为 test_javaedge_01:test_db.users 的 MQ Topic 中。
canal.mq.dynamicTopic=test_javaedge_01:test_db\\.users

开启一个消费者

[root@javaedge-kafka-dev bin]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_javaedge_01

datagrip 新增数据:

消费到该数据:

2 使用TCP方式拉取Canal数据

现在 serverMode 改回tcp。重启

javaedge@JavaEdgedeMac-mini deployer % jps
71002 CanalLauncher
javaedge@JavaEdgedeMac-mini deployer %

canal 同步程序

package com.javaedge.canal;

import com.alibaba.fastjson.JSON;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.common.base.CaseFormat;

import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.List;

public class CanalClientApp {
    public static void main(String[] args) throws Exception {

        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("localhost", 11111),
                "example",
                null, null);

        while (true) {
            connector.connect();
            connector.subscribe("test_db.users");
            Message message = connector.get(100);
            List<CanalEntry.Entry> entries = message.getEntries();
            if (entries.size()>0) {
                for (CanalEntry.Entry entry : entries) {
                    String tableName = entry.getHeader().getTableName();

                    CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                    List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
                    CanalEntry.EventType eventType = rowChange.getEventType();

                    if (eventType == CanalEntry.EventType.INSERT) {
                        for (CanalEntry.RowData rowData : rowDatasList) {
                            List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
                            HashMap<Object, Object> map = new HashMap<>();
                            for (CanalEntry.Column column : afterColumnsList) {
                                String key = CaseFormat.LOWER_UNDERSCORE.to(CaseFormat.LOWER_CAMEL, column.getName());
                                map.put(key, column.getValue());
                            }
                            System.out.println("tableName=" + tableName + "  map=" + JSON.toJSONString(map));
                        }
                    }
                }

            }

        }

    }
}

运行程序。操作 user 数据表,新增一行数据:

程序输出:

显然,后续不管你想把数据同步到哪儿去,都完全自由!

数据链路

MySQL -》canal server(tcp)-》canal client-》kafka。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1001405.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

混合云代表,青云科技QingCloud从此走向小而美之路

你有你的活法&#xff0c; 我有我的活法 只要活出自己&#xff0c; 活出个样子都是美事 【全球元观察 &#xff5c; 热点关注】 刚注意到青云科技QingCloud的最新发布2023年半年度报告&#xff0c;2023上半年同比减亏约7000万元&#xff0c;研发人员减少83人。从这两个数字…

贝wa儿歌(安卓)

本次贝wa儿歌为大家提供的是特殊版本的&#xff0c;软件需要注册登录才能才能使用VIP权限&#xff0c;安装好后&#xff0c;打开如果提示更新请点击取消&#xff0c;一定不要更新哦。 贝瓦儿歌不仅只是儿歌&#xff0c;其实有很多分类的&#xff0c;包括有国学&#xff0c;动画…

小米6/6X/米8/米9手机刷入鸿蒙HarmonyOS.4.0系统-刷机包下载-遥遥领先

小米手机除了解锁root权限&#xff0c;刷GSI和第三方ROM也是米粉的一大爱好&#xff0c;这不&#xff0c;在华为发布了HarmonyOS.4.0系统后不久&#xff0c;我们小米用户也成功将自己的手机干山了HarmonyOS.4.0系统。虽然干上去HarmonyOS.4.0系统目前BUG非常多&#xff0c;根本…

系统架构:软件工程速成

文章目录 参考第一章 概述软件工程概述软件过程 参考 软件工程速成(期末考研复试软考)均适用. 支持4K 第一章 概述 软件工程概述 定义&#xff1a;采用工程的概念、原理、技术和方法来开发与维护软件。 三要素&#xff1a; 方法&#xff1a;完成软件开发各项任务的技术方…

人工智能AI 全栈体系(二)

第一章 神经网络是如何实现的 上节描述的网络结构比较特殊&#xff0c;不具有一般性。比如前面我们讲过的权重都是1或者-1&#xff0c;这是很特殊的情况&#xff0c;实际上权重可以是任何数值&#xff0c;可以是正的&#xff0c;也可以是负的&#xff0c;也可以是带小数的。权…

网络安全岗位面试经验总结(附面试题)

思路流程: 信息收集 服务器的相关信息&#xff08;真实ip&#xff0c;系统类型&#xff0c;版本&#xff0c;开放端口&#xff0c;WAF等&#xff09; 网站指纹识别&#xff08;包括&#xff0c;cms&#xff0c;cdn&#xff0c;证书等&#xff09;&#xff0c;dns记录 whois信…

Linux工具——gcc

目录 一&#xff0c;gcc简介 二&#xff0c;C语言源文件的编译过程 1.预处理 2.编译 3.汇编 4.链接 5.动静态库 一&#xff0c;gcc简介 相信有不少的小白和我一样在学习Linux之前只听说过visual studio。其实这个gcc这个编译器实现的功能便是和visual studio一样的功能&…

纷享销客入选中国信通院《高质量数字化转型产品及服务全景图》

近期&#xff0c;在中国信息通信研究院主办的“2023数字生态发展大会”暨中国信通院“铸基计划”年中上&#xff0c;重磅发布了《高质量数字化转型产品及服务全景图&#xff08;2023&#xff09;》&#xff0c;纷享销客凭借先进的技术能力和十余年客户业务场景应用理解&#xf…

Linux fcntl函数

/*#include <unistd.h>#include <fcntl.h>int fcntl(int fd, int cmd, ... ); 参数&#xff1a;- fd&#xff1a;需要操作的文件描述符- cmd&#xff1a;表示对文件描述符如何操作- F_DUPFD:复制文件描述符&#xff0c;复制的是参数fd&#xff0c;得到一个新的文件…

LeetCode:2. 两数相加

给你两个 非空 的链表&#xff0c;表示两个非负的整数。它们每位数字都是按照 逆序 的方式存储的&#xff0c;并且每个节点只能存储 一位 数字。 请你将两个数相加&#xff0c;并以相同形式返回一个表示和的链表。 你可以假设除了数字 0 之外&#xff0c;这两个数都不会以 0 …

Java学习笔记------抽象类和抽象方法

抽象方法 抽象方法&#xff1a;将共性的行为&#xff08;方法&#xff09;抽取到父类之后&#xff0c;由于每一个子类执行的内容是不一样的&#xff0c;所以&#xff0c;在父类中不能确定具体的方法体&#xff0c;该方法就可以定义为抽象方法抽象类&#xff1a;如果一个类中存…

d3.js 的使用

这篇文章相当于之前 svg 的补充。 因为 svg 代码肯定不是人为去专门写的。 在这里推荐制作 svg 的第三方库 - D3.js 用于定制数据可视化的JavaScript库 - D3 官网地址&#xff1a; D3 by Observable | The JavaScript library for bespoke data visualization 简单使用 画…

嵌入式基础知识-信息安全与加密

本篇来介绍计算机领域的信息安全以及加密相关基础知识&#xff0c;这些在嵌入式软件开发中也同样会用到。 1 信息安全 1.1 信息安全的基本要素 保密性&#xff1a;确保信息不被泄露给未授权的实体。包括最小授权原则、防暴露、信息加密、物理加密。完整性&#xff1a;保证数…

nginx-日志处理

access.log #正常请求的日志 error.log #访问错误日志&#xff0c;404&#xff0c;500等请求在这里 buffer&#xff1a;设置缓冲区&#xff0c;访问日志不会直接打到磁盘上&#xff0c;而是先积攒到缓冲区&#xff0c;缓冲区满了后在统一往…

H5打包APP和IOS实现免签,超级签,mam签,h5分发

博主技术笔记 博主开源微服架构前后端分离技术博客项目源码地址&#xff0c;欢迎各位star 微信公众号&#xff0c;每天给大家提供技术干货 http://yun.viphssp.top/root 打包出来的模板&#xff1a;https://yun.viphssp.top/mq/3/html/index.html

解决a标签内容中img标签和p标签垂直方向间隔太大的问题

现象如下&#xff1a; 对应的html结构&#xff1a; 解决办法&#xff1a;给a标签设置&#xff1a;display: inline-block和line-height属性。 然后问题解决&#xff1a; 具体原理如下&#xff08;由chatgpt回答&#xff09;&#xff1a; display: inline-block 可以减少垂直方…

java可以跨平台的原因是什么?

因为不同平台可以安装对应的JVM&#xff08;Java Virtual Machine&#xff09;&#xff0c;它是C/ C写的&#xff0c;JVM可以屏蔽所有和平台相关的信息&#xff0c;并帮助把Java文件经过编译后生成的和平台无关的class类文件&#xff08;也就是字节码&#xff09;从硬盘读取到内…

Java面试题(持续更新中)

一、Java基础集合多线程JVM 1.Java基础 1.1面向对象和面向过程的区别 面向过程&#xff1a;面向过程的性能比面向对象高。因为类调用时需要实例化&#xff0c;消耗比较大&#xff0c;比较消耗资源&#xff0c;所以当性能是最重要的考量因素的时候&#xff0c;比如单片机、Li…

内存卡数据恢复软件推荐,简直是高效恢复好帮手!

“朋友们在使用内存卡时有没有好的内存卡数据恢复软件推荐呀&#xff1f;我的内存卡对我来说真的很重要&#xff0c;但是我不小心把里面的数据删除了&#xff0c;我应该怎么做才能恢复里面的数据呢&#xff1f;” 内存卡为我们的生活提供了很多的便利&#xff0c;我们可以在里面…

输入时并未按照格式,没注意汉字符号

&#x1f388;问题现象&#xff1a; 运行出来的代码没得到想要的结果&#xff1a; &#x1f388;原因分析&#xff1a; 程序运行起来了&#xff0c;计算的结果是错误的&#xff0c;这个最好的解决办法就是调试&#xff0c;一步步的看代码在每个阶段的值是不是我们期望的&…