python操作mongodb底层封装并验证pymongo是否应该关闭连接

news2025/1/20 14:52:54

一、pymongo简介

https://pymongo.readthedocs.io/en/stable/github地址:https://github.com/mongodb/mongo-python-driver

mymongo安装命令:pip install pymongo==4.7.2

mymongo接口文档:PyMongo 4.7.2 Documentation

PyMongo发行版包含Python与MongoDB数据库交互的工具。bson包是用于Python的bson格式的实现。pymongo包是MongoDB的原生Python驱动程序。gridfs包是pymongo之上的一个gridfs实现。pymongo是python与mongodb交互的首选方式。

二、封装接口

import uuid
from datetime import datetime, timezone
from pydantic import BaseModel,ConfigDict
from pycommon.utils.mongo.mongo_collection_pool import mongo_collection_pool
from pymongo.cursor import Cursor
from pymongo.collection import Collection

class BaseDbService:
    def __init__(self, mongo_settings: MongoSettings, encryptor_key: str) -> None:
        self.mongo_settings = mongo_settings
        self.encryptor_key = encryptor_key

    def create(self, data: T, user_id: str, tenant_id: str = None, collection_name: str = None) -> str:
        collection: Collection = mongo_collection_pool.get_collection(self.mongo_settings, collection_name, tenant_id)
        if not get_attr_value(data, 'id'):
            set_attr_value(data, 'id', f"{uuid.uuid1()}")
        set_attr_value(data, 'created_by', user_id)
        set_attr_value(data, 'created_time', datetime.now(timezone.utc))
        request = data.model_dump(by_alias = True)

        id = collection.insert_one(request).inserted_id
        logger.log_info(f'{type(data).__name__} id {id} is created')
        return id
    
    def create_list(self, datas: list[T], user_id: str, tenant_id: str = None, collection_name: str = None) -> list[str]:
        collection = mongo_collection_pool.get_collection(self.mongo_settings, collection_name, tenant_id)
        request_list = []
        for data in datas:
            if not get_attr_value(data, 'id'):
                set_attr_value(data, 'id', f"{uuid.uuid1()}")
            set_attr_value(data, 'created_by', user_id)
            set_attr_value(data, 'created_time', datetime.now(timezone.utc))
            request = data.model_dump(by_alias = True)
            
            request_list.append(request)

        ids = collection.insert_many(request_list).inserted_ids
        logger.log_info(f'{type(data).__name__} ids {ids} are created')
        return ids

    def update(self, data: T, user_id: str, tenant_id: str = None, collection_name: str = None):
        if not get_attr_value(data, 'id'):
            raise ValueError(f'{type(data).__name__} id is null or empty when update, user_id={user_id}, tenant_id={tenant_id}, collection_name={collection_name}')
        
        set_attr_value(data, 'updated_by', user_id)
        set_attr_value(data, 'updated_time', datetime.now(timezone.utc))
        collection = mongo_collection_pool.get_collection(self.mongo_settings, collection_name, tenant_id)
        # response is dict type
        original_data = collection.find_one(data.id)
        if original_data:
            if "createdBy" in original_data:
                set_attr_value(data, "created_by", original_data["createdBy"])
            if "createdTime" in original_data:
                set_attr_value(data, "created_time", original_data["createdTime"])
            request = data.model_dump(by_alias = True)
            collection.find_one_and_replace({"_id": data.id}, request)
            logger.log_info(f'{type(data).__name__} id {id} is updated')
        else:
            logger.log_info(f'{type(data).__name__} id {id} does not exist when update')

    def get(self, id: str, responseType: TResponse, tenant_id: str = None, collection_name: str = None) -> TResponse:
        if not id:
            logger.log_info(f'{type(responseType).__name__} id is null or empty when get')
            return None

        collection = mongo_collection_pool.get_collection(self.mongo_settings, collection_name, tenant_id)
        result = collection.find_one(id)
        return responseType.model_validate(result)
    
    def get_list(self, ids: list[str], entityType: T, tenant_id: str = None, collection_name: str = None) -> list[T]:
        if not ids:
            logger.log_info(f'{type(entityType).__name__} ids is null or empty when get_list')
            return None

        collection = mongo_collection_pool.get_collection(self.mongo_settings, collection_name, tenant_id)
        response = list(collection.find({"_id": { '$in': ids }}))
        if not response:
            return []
        result = []
        for item in response:
            result_item = entityType.model_validate(item)
            result.append(result_item)
        return result

整体思路是创建BaseDbService实例时指定目标数据库,进行具体的CRUD操作时再指定表名以及查询结果要转换成的类型(pymongo查询出结果后默认是dict类型),这样底层的数据库服务就能操作任意库的所有表了,而且最终获取的数据是具体类型的对象。

实际上用户大致会按照下述方式使用BaseDbService:

if __name__ == '__main__':
    mongo_settings = MongoSettings(connection_string="mongodb://localhost:27017/", database_name="test-activity")
    db_service = BaseDbService(mongo_settings, "bWRHdzkzVDFJbWNB")

    collection_name = "pycommon"
    tenant_id = "test-tenant-id"
    result = db_service.get("74bae4fe-13ec-11ef-be60-7404f152b5a1", ActivityQueryItem, tenant_id, collection_name)

三、数据库连接池机制 

第二步中对数据库表操作之前都要先执行mongo_collection_pool.get_collection(self.mongo_settings, collection_name, tenant_id)来获取collection的连接,mongo_collection_pool是一个单例对象,内部有一个缓存机制来存储所有已经操作过的client、database和collection,连接从不关闭,这样可以保证连接不会重复的创建销毁从而提升性能。

看到这里有些细心的朋友可能会提问连接从不关闭,不会造成当前用户过多导致新用户无法连接的情况吗?本节先展示数据库连接池,大家的疑问咱们在下边有验证

from pymongo import MongoClient
from pymongo.database import Database
from pymongo.collection import Collection
from pycommon.models.mongo.mongo_settings import MongoSettings

class MongoCollectionPool:
    def __init__(self):
        self.mongo_client_cache = {}
        self.mongo_database_cache = {}
        self.mongo_collection_cache = {}
    
    def get_collection(self, mongo_settings: MongoSettings, collection_name: str, tenant_id: str) -> Collection:
        client_cache_key = mongo_settings.connection_string
        database_name = mongo_settings.database_name
        database_cache_key = client_cache_key + "_" + mongo_settings.database_name
        collection_name = collection_name + "_" + tenant_id if tenant_id else collection_name
        collection_cache_key = client_cache_key + "_" + mongo_settings.database_name + "_" + collection_name

        mongo_client: MongoClient = None
        if client_cache_key in self.mongo_client_cache:
            mongo_client = self.mongo_client_cache[client_cache_key]
        else:
            mongo_client = MongoClient(mongo_settings.connection_string)
            self.mongo_client_cache[client_cache_key] = mongo_client

        database: Database = None
        if database_cache_key in self.mongo_database_cache:
            database = self.mongo_database_cache[database_cache_key]
        else:
            database = mongo_client[database_name]
            self.mongo_database_cache[database_cache_key] = database

        if collection_cache_key in self.mongo_collection_cache:
            return self.mongo_collection_cache[collection_cache_key]
        else:
            collection = database[collection_name]
            self.mongo_collection_cache[collection_cache_key] = collection
            return collection

# 单例对象
mongo_collection_pool = MongoCollectionPool()
from typing import Optional
from pycommon.models.base_core_model import BaseCoreModel

class MongoSettings(BaseCoreModel):
    connection_string: Optional[str] = None
    database_name: Optional[str] = None

四、mongodb数据库连接是否需要手动关闭

在判断是否需要手动关闭连接时,先要了解连接是在什么时机开启的(这样我们就能知道做哪些操作会创建新的连接),以及对象释放时是否会自动关闭(这样我们就可以根据不同的项目类型来设计不同的架构)。

为了查看mongodb当前的数据库状态,我们需要用到mongosh工具(我这里用的是mongosh-2.2.6-win32-x64,大家需要的话我可以私发到个人邮箱)。

mongosh安装方式:解压后把bin目录下的两个文件拷贝到C:\Windows\System32。

查看mongodb数据库状态:打开cmd -> 输入mongosh -> 输入db.serverStatus().connections

connections中文说明:serverStatus - MongoDB 手册 v 6 。 0

connections英文说明:serverStatus - MongoDB Manual v7.0

测试项目情况:定时任务项目(执行完毕立即退出)

下面是我测试时了解到的情况(详情请看截图):

  • mongodb默认的可用线程高达100w
  • 创建MongoClient时,active+1、current+2、threaded+2、available-2
  • 创建DataBase时,exhaustHello+1
  • 创建Collection时,数据库状态无变化
  • 执行collectionA.find_one()方法时,current+1、threaded+1、available-1
  • 继续执行collectionA.find()时,数据库状态无变化
  • 继续执行collectionB.find()时,数据库状态无变化
  • 项目结束后数据库状态恢复原状

得出结论:

  • mongodb的可用性很强大,对于web服务类型这种常活程序来说只处理一个数据库且并发不高时,手动关闭更好不关也可以接受
  • 对于定时任务这种用完即销毁的程序来说不需要考虑关闭连接的问题

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1717162.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

AI绘画Stable Diffusion【实战】一下给我整破防了!!SD制作商业海报——香水海报

大家好,我是画画的小强 今天给大家分享关于SD制作商业海报案例——香水海报。内容很干,但很实用,后续也会持续分享一些其他商业海报的制作案例,喜欢的朋友请持续关注 人“狠”话不多,大家请上车!&#xff…

VB.net 进行CAD二次开发(二)

利用参考文献2,添加面板 执行treeControl New UCTreeView()时报一个错误: 用户代码未处理 System.ArgumentException HResult-2147024809 Message控件不支持透明的背景色。 SourceSystem.Windows.Forms StackTrace: 在 System.Windows…

Java流与链表:探索java.util.stream与LinkedList的交汇点

在现代Java开发中,流(Streams)和链表(LinkedList)都是强大且常用的数据处理工具。java.util.stream提供了高效的方式来处理数据流,而LinkedList则是java.util包中的经典集合实现。本文将探索它们的交汇点&a…

手拉手springboot整合kafka发送消息

环境介绍技术栈springbootmybatis-plusmysqlrocketmq软件版本mysql8IDEAIntelliJ IDEA 2022.2.1JDK17Spring Boot3.1.7kafka2.13-3.7.0 创建topic时,若不指定topic的分区(Partition主题分区数)数量使,则默认为1个分区(partition) springboot加入依赖kafk…

MySQL排序字段无法唯一标识一条数据,导致分页查询结果出现重复或遗漏问题

问题描述 MySQL排序字段无法唯一标识一条数据,且排序字段可能有多个,多个排序字段无法唯一标识一条数据,即排序字段标识的相同数据过多,可能导致分页查询出现重复或遗漏的情况。 具体说明:      如果在排序条件中…

Stable Diffusion AI绘画:从创意词汇到艺术图画的魔法之旅

文章目录 一、Stable Diffusion的工作原理二、从提示词到模型出图的过程三、Stable Diffusion在艺术创作中的应用《Stable Diffusion AI绘画从提示词到模型出图》内容简介作者简介楚天 目录前言/序言本书特色特别提示 获取方式 在科技的飞速发展中,Stable Diffusion…

江协科技STM32学习-1 购买24Mhz采样逻辑分析仪

前言: 本文是根据哔哩哔哩网站上“江协科技STM32”视频的学习笔记,在这里会记录下江协科技STM32开发板的配套视频教程所作的实验和学习笔记内容。本文大量引用了江协科技STM32教学视频和链接中的内容。 引用: STM32入门教程-2023版 细致讲…

机器人系统ros2-开发学习实践16-RViz 用户指南

RViz 是 ROS(Robot Operating System)中的一个强大的 3D 可视化工具,用于可视化机器人模型、传感器数据、路径规划等。以下是RViz用户指南,帮助你了解如何使用RViz来进行机器人开发和调试。 启动可视化工具 ros2 run rviz2 rviz2…

【React篇】组件错误边界处理(组件错误引起的页面白屏)

我们知道在生产环境react错误会导致整个页面崩溃,显示为空白页面。 比如下图的错误,导致了左侧页面直接白屏: 由于某一个组件报错导致整个页面崩溃是很严重的问题,那么我们应该如何去降低代码报错带来的影响呢? 我们…

Rockchip芯片 写SN,IMEI,Mac等 写attenstation key 写Remote Key Provisioning

下载AP 写SN等 关机下 按住“音量” 插入USB线 进入loader 方式,在该模式下面写号,设备必须是已经有烧写过固件。 输入sn,点写入,成功。 点读取,成功。 两种设备模式:maskrom 和 loader 模式 maskrom 进…

灶新趋势,跌下神坛的电磁炉,为何被人嫌弃

在厨电市场的广阔天地中,电燃灶与电磁炉作为两种截然不同的烹饪工具,其间的竞争与演变始终牵动着消费者的心弦。近年来,电燃灶以其独特的优势崭露头角,而电磁炉则似乎从昔日的辉煌中跌下神坛,遭到越来越多人的嫌弃。这…

[排序算法]插入排序+希尔排序全梳理!

目录 1.排序是什么?1.1排序的概念1.2排序运用1.3常见的排序算法 2.插入排序分类3.直接插入排序基本思想具体步骤:动图演示代码实现直接插入排序的特性总结: 4. 希尔排序基本思想具体步骤动图演示代码实现希尔排序的特性总结: 5.总…

一种改进的经验小波变换方法(Python环境)

经验小波变换EWT是Gilles基于小波分析理论提出的一种新的自适应信号分解方法,该方法主要分为三个步骤:1.根据傅里叶谱的特性自适应划分频谱,获得一组边界;2.根据边界序列和Meyer小波构造滤波器组;3.滤波重构&#xff0…

Django——Admin站点(Python)

#前言: 该博客为小编Django基础知识操作博客的最后一篇,主要讲解了关于Admin站点的一些基本操作,小编会继续尽力更新一些优质文章,同时欢迎大家点赞和收藏,也欢迎大家关注等待后续文章。 一、简介: Djan…

Midjourney应用:电商模特换装

今天我们应用的是Midjourney应用:电商模特换装 网上找到一件衣服,没有模特 方法一:两图片融合,BLEND命令,效果不是很理想失真 方法二:服装图片垫图说明细节缺失https://cdn.discordapp.com/attachments/1…

map/set和unordered_map/unordered_set的区别及其适用情况

本专栏内容为:C学习专栏,分为初阶和进阶两部分。 通过本专栏的深入学习,你可以了解并掌握C。 💓博主csdn个人主页:小小unicorn ⏩专栏分类:C 🚚代码仓库:小小unicorn的代码仓库&…

我喜欢的vscode插件

有个更全的:提高编程效率的30个VScode插件 Image preview(图片预览) any-rule(正则表达式大全) px to rem & rpx & vw(cssrem)(px和rem之间转换) 小程序开发助手 Auto Close Tag A…

【Vulhub】Fastjson 1.2.24_rce复现

文章目录 一,Fastjson是什么?二,fastjson漏洞原理三,判断是否有fastjson反序列化四,复现Fastjson 1.2.24_rce(vulhub)环境配置1.判断是否存在Fastjson反序列化2.反弹shell3.启动RMI服务器4.构造恶意POST请求 一&#x…

【赠书第26期】AI绘画教程:Midjourney使用方法与技巧从入门到精通

文章目录 前言 1 Midjourney入门指南 1.1 注册与登录 1.2 界面熟悉 1.3 基础操作 2 Midjourney进阶技巧 2.1 描述词优化 2.2 参数调整 2.3 迭代生成 3 Midjourney高级应用 3.1 创意启发 3.2 团队协作 3.3 商业应用 4 总结与展望 5 推荐图书 6 粉丝福利 前言 在…

使用QT可视化操作信号与槽函数详解

新书上架~👇全国包邮奥~ python实用小工具开发教程http://pythontoolsteach.com/3 欢迎关注我👆,收藏下次不迷路┗|`O′|┛ 嗷~~ 目录 一、引言 二、QT信号与槽机制概述 三、实际操作步骤 四、案例演示 五、总结 一、引言 在…