从变更到通知:使用Python和MongoDB Change Streams实现即时事件监听

news2025/2/25 17:33:32

MongoDB提供了一种强大的功能,称为Change Streams,它允许应用程序监听数据库中的变更事件,并在数据发生变化时立即做出响应。这在mysql数据库是不具备没有这个功能的。又如:我们在支付环节想一直监听支付回调的状态,就必须定时循环。但是用到Change Streams监听功能,就不要低效率的定时循环做法。
在本文中,我们将探讨如何在Python中使用MongoDB的Change Streams来消费变更事件。
在这里插入图片描述

什么是MongoDB Change Streams?
MongoDB Change Streams是一种实时监听数据库集合变更的能力。它们可以用来监听以下类型的事件:

插入(insert)
更新(update)
删除(delete)
更多…
Change Streams可以在应用层提供数据变更的实时通知,这对于需要即时更新用户界面、触发业务流程或同步数据到其他服务的应用程序非常有用。

为什么使用Change Streams?
使用Change Streams而不是轮询机制有以下几个优势:

性能:轮询会不断查询数据库,这可能会导致不必要的负载。Change Streams仅在数据变更时提供通知,从而减少不必要的数据库交互。
实时性:Change Streams提供近乎实时的数据变更通知,这对于需要快速响应的应用程序至关重要。
可靠性:Change Streams保证不会错过任何变更事件,即使在应用程序重启之后也能从最后的位置继续监听。
如何在Python中使用Change Streams?
要在Python中使用Change Streams,您需要使用pymongo库,这是MongoDB的官方Python驱动程序。以下是如何设置和使用Change Streams的基本步骤:

要配置MongoDB的Change Streams以监控特定类型的变更,你可以使用MongoDB的聚合管道(Aggregation Pipeline)功能。通过提供一个或多个管道阶段的数组,你可以控制Change Streams的输出。

##初始化副本集
连接到任何一个MongoDB实例(通常是配置文件中指定的第一个实例),使用MongoDB Shell来初始化副本集:

mongosh   #mongo的命令行
use admin
rs.initiate({
  _id: "myReplicaSet",   #副本集名称
  members: [
    { _id: 0, host: "host1:27017" },  #host1要换成对应的公网ip才能访问,除非其它情况
    { _id: 1, host: "host2:27017" },
    { _id: 2, host: "host3:27017" }
  ]
})

在这里,host1:27017、host2:27017和host3:27017应该替换为您的MongoDB实例的实际IP地址和端口。_id是一个从0开始的唯一标识符,用于区分不同的成员。

在宝塔内如何处理
安装好mongdoDB
在这里插入图片描述
本地检查:
在这里插入图片描述
加入配置:
目录:
root@iZ2ze50t4ys98i5408heezZ:/www/server/mongodb/bin#

生成配置内

  • keyFile:
    (base) root@VM-4-7-ubuntu:/www/server/mongodb#
    openssl rand -base64 756 > /www/server/mongodb/keyfile
    chmod 400 /www/server/mongodb/keyfile
  • replication:
    replSetName: rs0 # rs0跟命令行创建的id名要一致

在下图加入配置
在这里插入图片描述

在mongdb命令行创建副本

在这里插入图片描述
MongoDB Shell支持命令历史记录功能,您可以使用上下箭头键来检索在Shell中发出的先前命令。

mondosh
rs0 [direct: primary] test> use admin
switched to db admin
rs0 [direct: primary] admin> db.auth("root","ubVvGqDmL7UAdwkG")
{ ok: 1 }
rs0 [direct: primary] admin>
##配置复制xx
var currentConfig = rs.conf();
var newConfig = { _id: "rs0", version: currentConfig.version + 1, members: [ { _id: 0, host: "8.152.219.111:27017" }] };
rs0 [direct: primary] admin> rs.reconfig(newConfig);
{
ok: 1,
'$clusterTime': {
clusterTime: Timestamp({ t: 1733560367, i: 1 }),
signature: {
hash: Binary.createFromBase64('DhedFtGMLmVNqwW7/0tGP91vKoA=', 0),
keyId: Long('7445547217475076102')
}
},
operationTime: Timestamp({ t: 1733560367, i: 1 })
}
rs0 [direct: primary] admin> rs.status() #的输出

在这里插入图片描述

然后重启,先kill -9 端口号,启动mongod命令

##使用客户端2种方式测试连接是否成功
在这里插入图片描述
在这里插入图片描述

以上如果测试通过,表示成功了。

使用代码测试:

monitor_mongo_changes.py

from pymongo import MongoClient
from pymongo.change_stream import ChangeStream

def monitor_mongo_changes(collection_name, database_name):
    """
    监控MongoDB文档变更的函数。

    :param collection_name: 要监控的MongoDB集合名称
    :param database_name: 要监控的MongoDB数据库名称
    """
    # 连接到MongoDB
    url = "mongodb://root:ub***G@8.7.0.1:27017/esg?authSource=admin&replicaSet=rs0"
    client = MongoClient(url,
                         serverSelectionTimeoutMS=10000)  # 设置超时时间为10秒


    # 选择数据库和集合
    db = client[database_name]
    collection = db[collection_name]

    # 创建一个管道,用于匹配特定条件的变更
    # pipeline = [{'$match': {'operationType': 'update'}}]
    pipeline = [
        {'$match': {'operationType': {'$in': ['insert', 'update','delete']}}}
    ]
    # 创建Change Stream并传入管道
    change_stream = collection.watch(pipeline)

    # 监听Change Stream
    for change in change_stream:
        print("Change detected:", change)
    print("end ....")

# 使用函数
monitor_mongo_changes('gress', 'esg')

运行python monitor_mongo_changes.py ,它不会自动退出,一直会监听mongdb数据库esg数据集的修改、插入、删除动作的。

在这里插入图片描述

用到的命令集

启动命令:mongod -f /www/server/mongodb/config.conf
查看启动状态:/etc/init.d/mongodb status
查看进程:ps -ef | grep mongo
mongosh    #进入mongdo的cli环境

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2256200.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Lua使用点号和冒号的区别

首先建立一个table,再分别定义两个方法,如下: local meta {}function meta:test1(...)print(self)print("")for k,v in pairs({...}) doprint(v)end endfunction meta.test2(...)print(self)print("")for k,v in pairs…

Metasploit使用

最近在学Metasploit,Metasploit是一个免费的、可下载的渗透测试框架,通过它可以很容易地获取、开发并对计算机软件漏洞实施攻击,是一个集成了渗透测试全流程的渗透工具。 图一 模块:模块组织按照不同的用途分为7种类型的模块 &am…

如何“安装Android SDK“?

一、下载 https://android-sdk.en.softonic.com/ 二、解压(不能有中文) 三、配置环境变量 1、ANDROID_HOME:D:\android-sdk 2、在Path添加文件路径 四、验证 adb version

排查bug的通用思路

⭐️前言⭐️ APP点击某个按钮没有反应/PC端执行某个操作后,响应较慢,通用的问题排查方法: 从多个角度来排查问题 🍉欢迎点赞 👍 收藏 ⭐留言评论 🍉博主将持续更新学习记录收获,友友们有任何问题可以在评…

前端路径“@/“的使用和配置

环境:vitets 需要安装types/node npm install types/node --save-dev在tsconfig.json中添加 如果有tsconfig.app.json和tsconfig.node.json文件,则在app.json中添加 "compilerOptions": {"baseUrl":".","paths&q…

node.js中实现GETPOST请求

创建基本的服务器 const express require(express); const indexRouter require(./router); // 引入路由 const app express(); const port 3000; // 挂载路由 app.use(/api, indexRouter); app.listen(port, () > {console.log(Server is running on http://localhost…

【Python】练习【24.12.8】

题目出处 《Python程序设计基础(第2版)》,李东方等 主编,电子工业出版社,北京,2020.1 第 3 章:《Python程序的基本流程控制》 题目描述 1、编写程序,从键盘输入两点的坐标(x1, y…

多项式拟合之Math.NET Numerics

**多项式拟合 今日新认识的工业编程思想之传感器测温;热敏电阻测温如何计算通过温度计算阻值的方式:多项式拟合,通常C#中使用Math.NET Numerics Math.NET Numerics 旨在为数值计算提供方法和算法 在科学、工程和日常使用中。涵盖的主题包括…

「Mac玩转仓颉内测版46」小学奥数篇9 - 基础概率计算

本篇将通过 Python 和 Cangjie 双语实现基础概率的计算,帮助学生学习如何解决简单的概率问题,并培养逻辑推理和编程思维。 关键词 小学奥数Python Cangjie概率计算 一、题目描述 假设有一个袋子中有 5 个红球和 3 个蓝球,每次从袋子中随机…

hhdb数据库介绍(10-45)

安全 数据加密 加密规则列表页 仅加载当前页面配置:添加、删除、编辑加密规则,触发局部同步加载。加载成功后,添加配置才能生效。同样也可以执行整体同步加载(页面右上角),来同步加密规则。 搜索&#x…

六安市第二届网络安全大赛复现

misc 听说你也喜欢俄罗斯方块? ppt拼接之后 缺三个角补上 flag{qfnh_wergh_wqef} 流量分析 流量包分离出来一个压缩包 出来一张图片 黑色代表0白色代表1 101010 1000 rab 反的压缩包 转一下 密码:拾叁拾陆叁拾贰陆拾肆 密文:4p4n5758…

深度学习入门课程学习笔记(第24周)

目录 摘要 Abstracts 一、何为决策树 1、决策树的组成 2、决策树的构建 二、基尼系数( CART 算法选用的评估标准) 三、决策树中的预剪枝处理(正则化) 1、限制决策树的深度 2、限制决策树中叶子结点的个数 3、限制决策树…

聊一聊常用类System

大家好,我是G探险者! 今天来聊一聊java常用类System。 事情的起因是项目里面使用了Jasypt 框架对配置项进行加密,主要是密码相关的配置,项目里面的application.yml有关密码的配置项,使用了占位符${PASSWORD}进行了占…

在windows系统用Anaconda搭建运行PyTorch识别安全帽项目的环境

一.背景 我期望基于开源项目实现工业场景中安全帽识别。之前的各种尝试,也不太顺利。发现安全帽识别的开源项目使用的是基于Python的PyTorch实现训练的。上一篇写了Python的安装,发现后续安装其他的并不方便。我为什么选择,下面再详细说原因。…

Elasticsearch入门之HTTP基础操作

RESTful REST 指的是一组架构约束条件和原则。满足这些约束条件和原则的应用程序或设计就是 RESTful。Web 应用程序最重要的 REST 原则是,客户端和服务器之间的交互在请求之间是无状态的。从客户端到服务器的每个请求都必须包含理解请求所必需的信息。如果服务器在…

Unix、GNU、BSD 风格中 ps 参数的区别

注:本文为“不同风格中 ps 命令参数的区别”相关文章合辑。 未去重。 BSD 风格和 UNIX 风格中 ps 参数的区别 作者:Daniel Stori 译者:LCTT Name1e5s | 2017-06-17 10:53 One Last Question ps aux 以及 ps -elf 都是查看进程的方式&…

设计模式の单例工厂原型模式

文章目录 前言一、单例模式1.1、饿汉式静态常量单例1.2、饿汉式静态代码块单例1.3、懒汉式单例(线程不安全)1.4、懒汉式单例(线程安全,同步代码块)1.5、懒汉式单例(线程不安全,同步代码块&#…

深入理解Java的 JIT(即时编译器)

🧑 博主简介:CSDN博客专家,历代文学网(PC端可以访问:https://literature.sinhy.com/#/literature?__c1000,移动端可微信小程序搜索“历代文学”)总架构师,15年工作经验,…

重生之我在21世纪学C++—关系、条件、逻辑操作符

一、关系操作符 1、关系操作符介绍 用于比较的表达式,称为 “关系表达式”(relational expression),里面使用的运算符称为 “关系运算符”(relational operator),主要有下面 6 个: 运算符描述>大于运算符,用于比…

工作:SolidWorks从3D文件导出2D的DWG或DXF类型文件方法

工作:SolidWorks从3D文件导出2D的DWG或DXF类型文件方法 SolidWorks从3D文件导出2D的DWG或2D DXF类型文件方法(一)打开3D文件(二)从装配体到工程图(三)拖出想要的角度的图型(四&#…