Mongodb 开启oplog,java监听oplog并写入关系型数据库

news2025/2/25 12:20:56

开启Oplog

windows mongodb bin目录下找到配置文件/bin/mongod.cfg,配置如下:

replication:
  replSetName: local
  oplogSizeMB: 1024

在这里插入图片描述
双击mongo.exe
在这里插入图片描述
在这里插入图片描述
执行

rs.initiate({_id: "local", members: [{_id: 0, host: "localhost:27017"}]})

若出现如下情况则成功

{
	"ok" : 1,
	"operationTime" : Timestamp(1627503341, 1),
	"$clusterTime" : {
		"clusterTime" : Timestamp(1627503341, 1),
		"signature" : {
			"hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
			"keyId" : NumberLong(0)
		}
	}
}

监听Oplog日志

pom

 	<parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.10</version>
        <relativePath/>
    </parent>
    
		<dependency>
        	<groupId>org.springframework.boot</groupId>
       	 	<artifactId>spring-boot-starter-data-mongodb</artifactId>
   	 	</dependency>
 		 <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
        <dependency>
            <groupId>org.mongodb</groupId>
            <artifactId>mongodb-driver</artifactId>
            <version>3.12.7</version>
        </dependency>
        <dependency>
            <groupId>com.vividsolutions</groupId>
            <artifactId>jts</artifactId>
            <version>1.13</version>
        </dependency>
        <dependency>
            <groupId>org.hibernate</groupId>
            <artifactId>hibernate-spatial</artifactId>
            <version>5.3.0.Beta1</version>
        </dependency>
        <dependency>
            <groupId>org.hibernate</groupId>
            <artifactId>hibernate-java8</artifactId>
            <version>5.3.0.Beta1</version>
        </dependency>
        <dependency>
            <groupId>com.bedatadriven</groupId>
            <artifactId>jackson-datatype-jts</artifactId>
            <version>2.3</version>
        </dependency>
        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
            <scope>runtime</scope>
        </dependency>

配置

spring.datasource.driver-class-name=org.postgresql.Driver
spring.datasource.url=jdbc:postgresql://localhost:5432/databaseName?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&currentSchema=public
spring.datasource.username=postgres
spring.datasource.password=123456
spring.jpa.database=postgresql
spring.jpa.show-sql=true
spring.jpa.hibernate.ddl-auto=update
spring.jpa.properties.hibernate.dialect=org.hibernate.spatial.dialect.postgis.PostgisDialect
server.port=10050
spring.data.mongodb.uri=mongodb://admin:123456@localhost:27017/?authSource=admin
spring.data.mongodb.database=databseName

代码

  import com.mongodb.CursorType;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Filters;
import com.mongodb.util.JSON;
import lombok.extern.slf4j.Slf4j;
import org.bson.BsonTimestamp;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

import javax.annotation.Resource;
import javax.persistence.EntityManager;
import javax.persistence.Query;

@Slf4j
@Component
public class OplogListener implements ApplicationListener<ContextRefreshedEvent> {
    @Resource
    private MongoTemplate mongoTemplate;
    @Resource
    private EntityManager entityManager;


    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        MongoDatabase db = mongoTemplate.getMongoDatabaseFactory().getMongoDatabase("local");
        MongoCollection<Document> oplog = db.getCollection("oplog.rs");

        BsonTimestamp startTS = getStartTimestamp();
        BsonTimestamp endTS = getEndTimestamp();

        Bson filter = Filters.and(Filters.gt("ts", startTS));

        MongoCursor<Document> cursor = oplog.find(filter).cursorType(CursorType.TailableAwait).iterator();

        while (true) {
            if (cursor.hasNext()) {
                Document doc = cursor.next();
                String operation = doc.getString("op");

                if (!"n".equals(operation)) {
                    String namespace = doc.getString("ns");
                    String[] nsParts = StringUtils.split(namespace, ".");
                    String collectionName = nsParts[1];
                    String databaseName = nsParts[0];
                    Document object = (Document) doc.get("o");
                    log.info("同步数据:databse-{}  collention-{}  data-{}", databaseName, collectionName, object);
                    if ("i".equals(operation)) {
                        insert((Document) doc.get("o"), databaseName, collectionName);
                    } else if ("u".equals(operation)) {
                        update((Document) doc.get("o"), (Document) doc.get("o2"), databaseName, collectionName);
                    } else if ("d".equals(operation)) {
                        delete((Document) doc.get("o"), databaseName, collectionName);
                    }
                }
            }
        }
    }

    private BsonTimestamp getStartTimestamp() {
        long currentSeconds = System.currentTimeMillis() / 1000;
        return new BsonTimestamp((int) currentSeconds, 1);
    }

    private BsonTimestamp getEndTimestamp() {
        return new BsonTimestamp(0, 1);
    }

    private void insert(Document object, String databaseName, String collectionName) {
        entityManager.getTransaction().begin();
        try {
            String json = JSON.serialize(object);
            Query query = entityManager.createNativeQuery("INSERT INTO " + collectionName + " (json) VALUES (:json)");
            query.setParameter("json", json);
            query.executeUpdate();
            entityManager.getTransaction().commit();
        } catch (Exception e) {
            entityManager.getTransaction().rollback();
            throw new RuntimeException(e);
        }
    }

    private void update(Document object, Document update, String databaseName, String collectionName) {
        entityManager.getTransaction().begin();
        try {
            String json = JSON.serialize(object);
            String updateJson = JSON.serialize(update);
            Query query = entityManager.createNativeQuery("UPDATE " + collectionName + " SET json = :json WHERE json = :updateJson");
            query.setParameter("json", json);
            query.setParameter("updateJson", updateJson);
            query.executeUpdate();
            entityManager.getTransaction().commit();
        } catch (Exception e) {
            entityManager.getTransaction().rollback();
            throw new RuntimeException(e);
        }
    }

    private void delete(Document object, String databaseName, String collectionName) {
        entityManager.getTransaction().begin();
        try {
            String json = JSON.serialize(object);
            Query query = entityManager.createNativeQuery("DELETE FROM " + collectionName + " WHERE json = :json");
            query.setParameter("json", json);
            query.executeUpdate();
            entityManager.getTransaction().commit();
        } catch (Exception e) {
            entityManager.getTransaction().rollback();
            throw new RuntimeException(e);
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1285649.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【已解决】MySQL:执行存储过程报错(MySQL字符集和排序方式冲突)

目录 问题现象&#xff1a; 问题分析&#xff1a; 解决方法&#xff1a; 拓展&#xff1a; 1、转换条件两边的字段或值为二进制数据&#xff1a; 2、转换条件两边的字段或值的字符集和排序方式&#xff1a; 3、修改列、表、库的字符集和排序方式 参考链接&#xff1a; 问…

托盘四向穿梭车自动化密集库供应|单机智能向系统智能跨越的HEGERLS托盘四向车系统

随着物流产业的迅猛发展&#xff0c;托盘四向穿梭式自动化密集仓储系统可认为是在穿梭车货架系统基础上提出的一种新仓储概念。托盘四向穿梭式立体库因其在流通仓储体系中所具有的高效密集存储功能优势、运作成本优势与系统化智能化管理优势&#xff0c;已发展为仓储物流的主流…

第73讲:深入理解MySQL数据库InnoDB存储引擎:内存结构、磁盘结构与后台线程全面解析

文章目录 1.InnoDB存储引擎的架构2.InnoDB存储引擎的内存结构2.1.Buffer Pool缓冲池2.2.Change Buffer更改缓冲区2.3.自适应Hash索引2.4.Log Buffer日志缓冲区 3.InnoDB存储引擎的磁盘结构3.1.System Tablespace系统表空间3.2.File-Per-Table Tablespaces每个表都有单独的表空间…

基于轻量级模型GHoshNet开发构建眼球眼疾识别分析系统,构建全方位多层次参数对比分析实验

工作中经常会使用到轻量级的网络模型来进行开发&#xff0c;所以平时也会常常留意使用和记录&#xff0c;在前面的博文中有过很多相关的实践工作&#xff0c;感兴趣的话可以自行移步阅读即可。 《移动端轻量级模型开发谁更胜一筹&#xff0c;efficientnet、mobilenetv2、mobil…

VS2022 显示参数类型

VS2022 显示参数类型 VS2022的智能感知功能非常强大&#xff0c;提供了类似clangd的IntelliSense。 设置方法 有时候需要代码补全&#xff0c;代码类型补全提示&#xff0c;极有可能消耗内存和运存。所以记录一下开关这个。

IDEA启动失败报错解决思路

IDEA启动失败报错解决思路 背景&#xff1a;在IDEA里安装插件失败&#xff0c;重启后直接进不去了&#xff0c;然后分析问题解决问题的过程记录下来。方便下次遇到快速解决。也是一种解决问题的思路&#xff0c;分享出去。 启动报错信息 Internal error. Please refer to https…

无频闪护眼灯哪个好?顶级无蓝光频闪护眼台灯推荐

国家卫生健康委员会疾控局宋士勋表示&#xff0c;根据近期发布的2021年监测数据来看&#xff0c;截至2020年&#xff0c;我国儿童青少年总体的近视率是52.7%&#xff0c;从不同年龄段来看&#xff0c;幼儿园6岁孩子的近视率达到14.3%&#xff0c;小学达到35.6%&#xff0c;初中…

Tubulysin C 微管蛋白C 205304-88-7

Tubulysin C 微管蛋白C 205304-88-7 英文名称&#xff1a;Tubulysin C 中文名称&#xff1a;微管蛋白C 化学名称&#xff1a;(2S,4R)-4-[[2-[(1R,3R)-1-乙酰氧基-4-甲基-3-[[(2S,3S)-3-甲基-2-[[(2R)-1 -甲基哌啶-2-羰基]氨基]戊酰基]-(丙酰氧基甲基)氨基]戊基]-1,3-噻唑-4-羰基…

3、RocketMQ源码分析(三)

RocketMQ源码-NameServer架构设计及启动流程 本文我们来分析NameServer相关代码&#xff0c;在正式分析源码前&#xff0c;我们先来回忆下NameServer的功能&#xff1a; NameServer是一个非常简单的Topic路由注册中心&#xff0c;其角色类似Dubbo中的zookeeper&#xff0c;支…

【3】密评-物理和环境安全测评

0x01 依据 GB/T 39786 -2021《信息安全技术 信息系统密码应用基本要求》针对等保三级系统要求&#xff1a; 物理和环境层面&#xff1a; a&#xff09;宜采用密码技术进行物理访问身份鉴别,保证重要区域进入人员身份的真实性&#xff1b; b&#xff09;宜采用密码技术保证电子门…

Vue JAVA开发常用模板

1.VsCode添加模板 左下角设置》用户代码片段 新建全局代码片段》将模板粘贴仅文件&#xff08;prefix用于指定触发关键字&#xff09; 添加成功过后输入配置的关键字即可使用 1.1 vue2模板 {// Example:"Print to console": {"prefix": "vue2",…

matplotlib多子图

matplotlib画图中一个轴占据多个子图 - 知乎 import matplotlib.pyplot as plt fig plt.figure() gs fig.add_gridspec(2,4) ax1 fig.add_subplot(gs[0, 0:2]) ax2 fig.add_subplot(gs[0, 2:]) axa fig.add_subplot(gs[1, 1]) axb fig.add_subplot(gs[1, 2]) axc fig.add…

Stable diffusion ai图像生成本地部署教程

前言 本文将用最干最简单的方式告诉你怎么将Stable Diffusion AI图像生成软件部署到你的本地环境 关于Stable Diffusion的实现原理和训练微调请看我其他文章 部署Stable Diffusion主要分为三个部分 下载模型&#xff08;模型可以认为是被训练好的&#xff0c;生成图像的大脑…

MIT线性代数笔记-第23讲-微分方程,exp(At)

目录 23.微分方程&#xff0c; e x p ( A t ) exp(At) exp(At)用矩阵求解微分方程矩阵指数二阶常微分方程 打赏 23.微分方程&#xff0c; e x p ( A t ) exp(At) exp(At) 用矩阵求解微分方程 例&#xff1a; { d u 1 d t − u 1 2 u 2 d u 2 d t u 1 − 2 u 2 \left \{ \b…

(C语言)计算n的阶乘

要求使用双精度 #include<stdio.h> double factorial(int n) {if(n 1)return 1;return n * factorial(n-1); } int main() {int n ;double res;scanf("%d",&n);res factorial(n);printf("%lf",res); return 0; } 运行截图&#xff1a; 注&am…

第四期丨酷雷曼无人机技能培训

第4期无人机技能培训 2023年10月25日&#xff0c;酷雷曼无人机技能培训及执照考试第四期成功举办&#xff0c;自7月份首期开办以来&#xff0c;已按照每月一期的惯例连续举办四期&#xff0c;取得了极为热烈的反响。 随着无人机培训的重要性及影响力逐渐扩大&#xff0c;参加培…

documents4j 文档格式转换

【版权所有&#xff0c;文章允许转载&#xff0c;但须以链接方式注明源地址&#xff0c;否则追究法律责任】【创作不易&#xff0c;点个赞就是对我最大的支持】 前言 仅作为学习笔记&#xff0c;供大家参考 总结的不错的话&#xff0c;记得点赞收藏关注哦&#xff01; 目录 …

LeetCode刷题---两两交换链表中的节点

个人主页&#xff1a;元清加油_【C】,【C语言】,【数据结构与算法】-CSDN博客 个人专栏&#xff1a;http://t.csdnimg.cn/D9LVS 前言&#xff1a;这个专栏主要讲述递归递归、搜索与回溯算法&#xff0c;所以下面题目主要也是这些算法做的 我讲述题目会把讲解部分分为3个部分…

电梯安全远程监控系统的主要作用和意义

电梯是现代城市生活中必不可少的交通工具&#xff0c;为了保证其安全可靠的运行&#xff0c;电梯运行监测系统应运而生。本文将介绍电梯安全远程监控的工作原理、重要性 一、电梯安全远程监控系统的作用   ◆实时监控和故障预警&#xff1a;电梯安全远程监控系统可以实时监测…

NodeJs脚手架(Koa)的简单使用

文章目录 前言一、与express的区别express-generator 提供的功能如下koa-generator 提供的功能如下两个生成器共同支持的项目骨架描述如下 二、使用步骤安装 Koa 生成器使用koa2创建项目PM2的使用 三、基础目录说明配置文件package.json入口文件 bin/www核心文件 app.jsroutes …