Canal整合SpringBoot详解(二)

news2024/11/18 9:21:01

文章目录

    • Canal整合SpringBoot详解(二)
      • 什么是canal
      • 案例2:Canal+Kafka实现mysql和elasticsearch的数据同步⭐
        • Docker搭建elasticsearch7.8.0(单机版本)⭐
        • Docker安装elasticsearch-head5⭐
          • 解决es-head 406错误问题
            • 直接修改容器内文件(需要下载vim命令)
        • Docker安装kibana(注意:kibana的版本要和elasticsearch的版本相同才行)⭐
        • 修改我们刚刚的SpringBoot项目⭐
          • 把ConfigCanalRedisConsumer类注释掉(或者可以修改instance.properties的topic名称)⭐
          • 给pom.xml添加ElasticSearch的依赖⭐
          • ElasticSearchConfig.class
          • 添加索引的test方法
          • 新建ConfigCanalElasticSearchConsumer类(kafka消费者类,监听指定topic,把canal发送的消息同步到ElasticSearch中)⭐

Canal整合SpringBoot详解(二)

什么是canal

  • canal主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费
  • canal工作原理:
    • canal的工作原理就是把自己伪装成MySQL slave从节点,模拟MySQL slave的交互协议向MySQL Mater发送 dump协议,MySQL mater收到canal发送过来的dump请求,开始推送binary log给canal,然后canal解析binary log,再发送到存储目的地,比如RocketMQ、Kafka、ElasticSearch等等。
  • canal能做什么:
    • 数据库镜像
    • 数据库实时备份
    • 索引构建和实时维护
    • 业务cache(缓存)刷新
    • 带业务逻辑的增量数据处理

案例2:Canal+Kafka实现mysql和elasticsearch的数据同步⭐

案例目的:

1:实现canal只监控canal-test-db1数据库下的t_config(主要同步这个表)和.t_user表。

2:当我们修改canal-test-db1数据库下的t_config表的内容会自动同步到ElasticSearch中;

3:当我们修改canal-test-db1数据库下的t_user表则不会同步。(虽然t_user表也被canal监控,但是这个案例就要做到在被监控的情况下,而不被同步),说白了就是只同步t_config表。

Docker搭建elasticsearch7.8.0(单机版本)⭐
  • 1:docker可能会拉取不了es,此时可以配置一个很好用的镜像源(daocloud),下载非常快:
curl -sSL https://get.daocloud.io/daotools/set_mirror.sh | sh -s http://f1361db2.m.daocloud.io
sudo systemctl restart docker
  • 2:创建挂载目录:
mkdir -p /usr/local/docker/elasticsearch/config
mkdir -p /usr/local/docker/elasticsearch/data

chmod 777 /usr/local/docker/elasticsearch/config
chmod 777 /usr/local/docker/elasticsearch/data
  • 3:编写es配置文件:
vi /usr/local/docker/elasticsearch/config/elasticsearch.yml

内容如下:

cluster.name: “es-cluser01”
node.name: es-node1
network.host: 0.0.0.0
http.cors.enabled: true
http.cors.allow-origin: "*"
cluster.initial_master_nodes: ["es-node1"] #这个一定要填,集群默认的主节点名称(node.name)
  • 4:永久调大虚拟机内存:(不然启动不了)
vim /etc/sysctl.conf

在最后面添加的内容如下:

vm.max_map_count=262144
  • 5:刷新配置:
sysctl -p
  • 6:运行elasticsearch容器:(访问该服务器ip:9200即可访问)
    • ES_JAVA_OPTS两个Xms的值都要一致,不然会报错。(这个很坑!!)
docker run --name elasticsearch \
-p 9200:9200 \
-p 9300:9300 \
-e “discovery.type=single-node” \
-e ES_JAVA_OPTS="-Xms256m -Xmx256m" \
-v /usr/local/docker/elasticsearch/config/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml \
-v /usr/local/docker/elasticsearch/data:/usr/share/elasticsearch/data \
-v /usr/local/docker/elasticsearch/plugins:/usr/share/elasticsearch/plugins \
-d elasticsearch:7.8.0

在这里插入图片描述

Docker安装elasticsearch-head5⭐
  • 1:拉取镜像:
docker pull mobz/elasticsearch-head:5
  • 2:启动镜像:
docker run -d -p 9100:9100 --name=elasticsearch-head mobz/elasticsearch-head:5
  • 3:进入容器:
docker exec -it elasticsearch-head /bin/bash
解决es-head 406错误问题
  • 方式1:直接修改容器内文件
  • 方式2:使用容器数据卷的方式(推荐。可以使用容器数据卷的方式修改vendor.js 文件⭐)
直接修改容器内文件(需要下载vim命令)
  • 1:
mv /etc/apt/sources.list /etc/apt/sources.list.bak
    echo "deb http://mirrors.163.com/debian/ jessie main non-free contrib" >> /etc/apt/sources.list
    echo "deb http://mirrors.163.com/debian/ jessie-proposed-updates main non-free contrib" >>/etc/apt/sources.list
    echo "deb-src http://mirrors.163.com/debian/ jessie main non-free contrib" >>/etc/apt/sources.list
    echo "deb-src http://mirrors.163.com/debian/ jessie-proposed-updates main non-free contrib" >>/etc/apt/sources.list
  • 2:更新源
apt update
  • 3:安装vim:(按Y即可)
apt-get install vim
  • 4:修改vendor.js 文件:
vim _site/vendor.js 

修改1:在6886行,把contentType: "application/x-www-form-urlencoded,修改成contentType: “application/json;charset=UTF-8”

修改2:7573行 var inspectData = s.contentType === “application/x-www-form-urlencoded” &&

修改成var inspectData = s.contentType === “application/json;charset=UTF-8” &&

  • 5:重启容器:
docker restart elasticsearch-head
Docker安装kibana(注意:kibana的版本要和elasticsearch的版本相同才行)⭐
  • 1:拉取镜像:(注意:kibana的版本要和elasticsearch的版本相同才行)
docker pull kibana:7.8.0
  • 2:编辑配置文件:
mkdir -p /usr/local/kibana/config/
vi /usr/local/kibana/config/kibana.yml

内容如下:(修改elasticsearch.hosts为你的elasticsearch地址列表)

server.name: kibana
server.host: "0"
elasticsearch.hosts: [ "http://192.168.184.201:9200" ]
xpack.monitoring.ui.container.elasticsearch.enabled: true
  • 3:启动:
docker run -d \
  --name=kibana \
  --restart=always \
  -p 5601:5601 \
  -v /usr/local/kibana/config/kibana.yml:/usr/share/kibana/config/kibana.yml \
  kibana:7.8.0
  • 4:访问kibana:(服务器ip:5601)

在这里插入图片描述
在这里插入图片描述

修改我们刚刚的SpringBoot项目⭐
把ConfigCanalRedisConsumer类注释掉(或者可以修改instance.properties的topic名称)⭐

在这里插入图片描述

给pom.xml添加ElasticSearch的依赖⭐
        <!--        注意ElasticSearch依赖版本需要和我们连接的ElasticSearch版本一致-->
        <!--        elasticSearch-->
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>7.8.0</version>
        </dependency>
		<!--        restHighLevelClient版本和elasticSearch一致-->
        <!--        restHighLevelClient-->
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>7.8.0</version>
        </dependency>
ElasticSearchConfig.class
package com.boot.config;

import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


/**
 * @author youzhengjie 2022-09-01 22:46:04
 */
@Configuration
public class ElasticSearchConfig {

    @Bean
    public RestHighLevelClient restHighLevelClient() {
        HttpHost httpHost = new HttpHost("192.168.184.201", 9200, "http");
        RestClientBuilder restClientBuilder = RestClient.builder(httpHost);
        return new RestHighLevelClient(restClientBuilder);
    }
}
添加索引的test方法
package com.boot;

import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.io.IOException;

@SpringBootTest
public class ConfigCanalTest {

    @Autowired
    private RestHighLevelClient restHighLevelClient;

    private static final String INDEX="config-canal-es";

    /*
     1:创建es索引
     */
    /*

    PUT config-canal-es
    {
        "mappings":{
        "properties":{
            "configInfo":{
               "type":"text",
               "analyzer":"standard"
           },
            "datetime":{
               "type":"keyword"
           },
           "desc":{
               "type":"text"
           }

             }
        }
   }

     */
    @Test //代码实现上面的添加索引。
    //注意:使用XContentFactory.jsonBuilder()创建索引,不需要把"mappings":{}这个算上去。不然会报错。也就是说不可以写startObject("mappings").endObject()
    void addConfigIndexToES() throws IOException {

        CreateIndexRequest createIndexRequest = new CreateIndexRequest(INDEX);
        XContentBuilder xContentBuilder = XContentFactory.jsonBuilder()
                .startObject() //{
                .startObject("properties")//"properties"{
                .startObject("configInfo") //"configInfo"{
                .field("type", "text") // "type":"text",
                .field("analyzer", "standard")//"analyzer":"standard"
                .endObject()//},
                .startObject("datetime")//"datetime":{
                .field("type", "keyword")//"type":"keyword"
                .endObject()//},
                .startObject("desc")//"desc":{
                .field("type", "text")//"type":"text"
                .endObject()//}
                .endObject()//} ,properties的结束
                .endObject();//}

        createIndexRequest.mapping(xContentBuilder);
        restHighLevelClient.indices().create(createIndexRequest, RequestOptions.DEFAULT);
    }

    @Test//删除索引
    void deleteConfigToES() throws IOException {

        DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(INDEX);
        restHighLevelClient.indices().delete(deleteIndexRequest,RequestOptions.DEFAULT);

    }

}
新建ConfigCanalElasticSearchConsumer类(kafka消费者类,监听指定topic,把canal发送的消息同步到ElasticSearch中)⭐
package com.boot.comsumer;

import com.alibaba.fastjson.JSONObject;
import com.boot.entity.Config;
import com.boot.entity.config_canal.ConfigCanalBean;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.rest.RestStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

/**
 * kafka消费者(监听名为canal-test-topic的topic),同步ElasticSearch
 * @author youzhengjie 2022-09-01 16:54:28
 */
@Component
@Slf4j
public class ConfigCanalElasticSearchConsumer {

    @Autowired
    private RestHighLevelClient restHighLevelClient;

    //es的index,相当于mysql的数据库:(数据库.表名)
    private static final String ES_INDEX = "config-canal-es";

    //过期时间(单位:小时)
    private static final int TIME_OUT = 24;


    /**
     * @param consumer 接收消费记录(消息)
     * @param ack 手动提交消息
     */
    @KafkaListener(topics = "canal-test-topic")
    public void receive(ConsumerRecord<String, String> consumer, Acknowledgment ack) {

        try {
            //获取canal的消息
            String value = (String) consumer.value();
            log.info("topic名称:{},key:{},分区位置:{},下标:{},value:{}", consumer.topic(), consumer.key(), consumer.partition(), consumer.offset(), value);

            //转换为javaBean
            ConfigCanalBean canalBean = JSONObject.parseObject(value, ConfigCanalBean.class);
            /*
            由于我们canal.instance配置了监控canal-test-db1.t_config表和canal-test-db1.t_user表(生产环境下可以启动多个canal,每一个canal监听一张需要同步的表)
            所以我们要对这两张表分开处理。(可以通过他们的表名(canalBean.getTable())来区分)
            如果canalBean.getTable()获取的表名是t_config,则同步到es,如果不是则不管。
             */
            log.warn("["+canalBean+"]");
            if("t_config".equals(canalBean.getTable())){
                //获取是否是DDL语句
                boolean isDdl = canalBean.isDdl();
                //获取当前sql语句的类型(比如INSERT、DELETE等等)
                String type = canalBean.getType();
                List<Config> datas = canalBean.getData();
                if ("t_config".equals(canalBean.getTable())) {
                    //如果不是DDL语句
                    if (!isDdl) {
                        //INSERT和UPDATE都是一样的操作
                        if ("INSERT".equals(type) || "UPDATE".equals(type)) {
                            //新增语句
                            for (Config config : datas) {
                                // 增加、修改处理
                                IndexRequest indexRequest = new IndexRequest(ES_INDEX);
                                indexRequest.id(config.getConfigId()+""); //id
                                ConcurrentHashMap<String, Object> dataMap = new ConcurrentHashMap<>();
                                dataMap.put("configInfo",(config.getConfigInfo()!=null)?config.getConfigInfo():"");
                                dataMap.put("datetime",(config.getDatetime()!=null)?config.getDatetime():"");
                                dataMap.put("desc",(config.getDesc()!=null)?config.getDesc():"");
                                indexRequest.source(dataMap);

                                IndexResponse response= restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);

                                RestStatus status = response.status();
                                log.info("status="+status.toString());
                            }
                        }else if("DELETE".equals(type)){
                            //删除语句
                            if(datas!=null && datas.size()>0){
                                for (Config config : datas) {
                                    DeleteRequest deleteRequest = new DeleteRequest();
                                    deleteRequest.id(config.getConfigId()+"");
                                    DeleteResponse deleteResponse = restHighLevelClient.delete(deleteRequest, RequestOptions.DEFAULT);
                                    log.info("deleteResponse:"+deleteResponse);
                                }
                            }
                        }

                    }
                }
            }
            //最后,如果上面的代码没有报错的情况下,可以确认消息了。(很重要)
            ack.acknowledge();
        }catch (Exception e){
            throw new RuntimeException();
        }
    }

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1142212.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

实用篇-Eureka注册中心

一、提供者与消费者 服务提供者&#xff1a;一次业务中&#xff0c;被其他微服务调用的服务。(提供接口给其他微服务) 服务消费者&#xff1a;一次业务中&#xff0c;调用其他微服务的服务。(调用其他微服务提供的接口) 例如前面的案例中&#xff0c;order-service微服务是服…

系列七、动态代理

一、概述 二、Jdk动态代理案例 2.1、Star /*** Author : 一叶浮萍归大海* Date: 2023/10/27 17:16* Description:*/ public interface Star {/*** 唱歌* param name 歌曲名字* return*/String sing(String name);/*** 跳舞*/void dance(); } 2.2、BigStar /*** Author : 一叶…

AcWing 1.2.1 最长上升子序列模型 + 动态规划 + 图解(详细)

&#xff08;1&#xff09;acwing 4557. 最长上升子序列 4557. 最长上升子序列 - AcWing题库 给定一个长度为 N 的整数序列 a1,a2,…,aN。请你计算该序列的最长上升子序列的长度。上升子序列是指数值严格单调递增的子序列 输入格式 第一行包含整数 N第二行包含 N个整数 a1,a…

LLM系列 | 23:多模态大模型:浦语·灵笔InternLM-XComposer解读、实战和思考

引言 ​简介 模型解读 模型架构 训练 实战 环境准备 本地实测 服务部署 总结 引言 谁念西风独自凉&#xff0c;萧萧黄叶闭疏窗&#xff0c;沉思往事立残阳。 Created by DALLE 3 小伙伴们好&#xff0c;我是《小窗幽记机器学习》的小编&#xff1a;卖热干面的小女孩…

在Golang中理解错误处理

处理Golang中临时错误和最终错误的策略和示例 作为一名精通Golang的开发人员&#xff0c;您了解有效的错误处理是编写健壮可靠软件的关键因素。在复杂系统中&#xff0c;错误可能采取各种形式&#xff0c;包括临时故障和最终失败。在本文中&#xff0c;我们将探讨处理Golang中…

源码解析SpringMVC之RequestMapping注解原理

1、启动初始化 核心&#xff1a;得到应用上下文中存在的全部bean后依次遍历&#xff0c;分析每一个目标handler & 目标方法存在的注解RequestMapping&#xff0c;将其相关属性封装为实例RequestMappingInfo。最终将 uri & handler 之间的映射关系维护在类AbstractHand…

Java入门篇 之 数据类型(简单介绍)

博主回归学习状态的第三篇文章&#xff0c;希望对大家有所帮助 今日份励志文案:你若决定灿烂&#xff0c;山无遮&#xff0c;海无拦 加油&#xff01; Java中一共存在2种数据类型 1 . 基本数据类型,基本数据类型四种和八种之说(具体看下图) 四种说的是&#xff0c;整数型&…

vscode打开settings.json方法

cmd shift p&#xff0c;输入setting Open Workspace Settings 也会打开UI设置界面&#xff1b; Open User Settings (JSON) 会打开用户设置 settings.json 文件&#xff1b; Open Workspace Settings (JSON) 会打开工作区设置 settings.json 文件 vscode存在两种设置 sett…

损失函数和目标函数|知识补充

这张图中&#xff0c;横坐标size表示房屋的大小&#xff0c;纵坐标price表示房屋的价格&#xff0c;现在需要建立模型来表示两者之间的关系。 对于给定的输入x&#xff0c;模型会有一个输出f(x)&#xff0c;用一个函数来度量拟合的程度&#xff0c;也就是真实值和预测值之间的…

前端工程化面试题及答案【集合】

前言&#xff1a; 欢迎浏览和关注本专栏《 前端就业宝典 》&#xff0c; 不管是扭螺丝还是造火箭&#xff0c; 多学点知识总没错。 这个专栏是扭螺丝之上要造火箭级别的知识&#xff0c;会给前端工作学习的小伙伴带来意想不到的帮助。 本专栏将前端知识拆整为零&#xff0c;主要…

工业相机常见的工作模式、触发方式

参考&#xff1a;机器视觉——工业相机的触发应用(1) - 知乎 工业相机常见的工作模式一般分为&#xff1a; 触发模式连续模式同步模式授时同步模式 触发模式&#xff1a;相机收到外部的触发命令后&#xff0c;开始按照约定时长进行曝光&#xff0c;曝光结束后输出一帧图像。…

子集生成算法:给定一个集合,枚举所有可能的子集

给定一个集合&#xff0c;枚举所有可能的子集。 &#xff08;为简单起见&#xff0c;本文讨论的集合中没有重复元素&#xff09; 1、方法一&#xff1a;增量构造法 第一种思路是一次选出一个元素放到集合中&#xff0c;程序如下&#xff1a; void print_subset(int n, int …

C++系列之list的模拟实现

&#x1f497; &#x1f497; 博客:小怡同学 &#x1f497; &#x1f497; 个人简介:编程小萌新 &#x1f497; &#x1f497; 如果博客对大家有用的话&#xff0c;请点赞关注再收藏 &#x1f31e; list的节点类 template struct list_Node { public: list_Node* _prev; list_…

Tomcat服务部署和优化

目录 一、Tomcat&#xff1a; 1、Tomcat作用&#xff1a; 2、Tomcat的核心组件&#xff1a; 3、servlet作用&#xff1a; 4、Tomcat的核心功能&#xff1a; 二、tomcat配置 一、Tomcat&#xff1a; 是一个开源的web应用服务器&#xff0c;nginx主要处理静态页面&#xff…

不再受害:如何预防和应对.mallab勒索病毒攻击

导言&#xff1a; 我们的数据成了我们的珍宝&#xff0c;但也成了黑客们追逐的目标。其中&#xff0c;.mallab勒索病毒就是一个充满阴谋和神秘的数字威胁&#xff0c;它采用高度复杂的方法将您的数据锁在数字牢笼中。本文91数据恢复将深入探讨.mallab勒索病毒的起源、工作方式…

【RabbitMQ 实战】12 镜像队列

一、镜像队列的概念 RabbitMQ的镜像队列是将消息副本存储在一组节点上&#xff0c;以提高可用性和可靠性。镜像队列将队列中的消息复制到一个或多个其他节点上&#xff0c;并使这些节点上的队列保持同步。当一个节点失败时&#xff0c;其他节点上的队列不受影响&#xff0c;因…

视频转换器WinX HD Video Converter mac中文特点介绍

WinX HD Video Converter mac是一款功能强大的视频转换器&#xff0c;它可以将各种不同格式的视频文件转换为其他视频格式&#xff0c;以便用户在各种设备上进行播放。WinX HD Video Converter是一个功能强大、易于使用的视频转换器&#xff0c;适用于各种类型的用户&#xff0…

可图性判断(图论)

如图所示&#xff1a; 1.去arr[i]首元素&#xff0c; 后面arr[i]个元素减一 2.排序&#xff0c;以此类推 3.最后如果出现负数则不可图 4.最后元素为0&#xff0c;则可图 问题 L: Degree Sequence of Graph G代码如下&#xff1a;

C#版字节跳动SDK - SKIT.FlurlHttpClient.ByteDance

前言 在我们日常开发工作中对接第三方开放平台&#xff0c;找一款封装完善且全面的SDK能够大大的简化我们的开发难度和提高工作效率。今天给大家推荐一款C#开源、功能完善的字节跳动SDK&#xff1a;SKIT.FlurlHttpClient.ByteDance。 项目官方介绍 可能是全网唯一的 C# 版字节…

基于nodejs+vue全国公考岗位及报考人数分析

目 录 摘 要 I ABSTRACT II 目 录 II 第1章 绪论 1 1.1背景及意义 1 1.2 国内外研究概况 1 1.3 研究的内容 1 第2章 相关技术 3 2.1 nodejs简介 4 2.2 express框架介绍 6 2.4 MySQL数据库 4 第3章 系统分析 5 3.1 需求分析 5 3.2 系统可行性分析 5 3.2.1技术可行性&#xff1a;…