文章目录
- 单例模式介绍
- 模块简介
- 安装
- 简单的连接使用
- 单例模式的连接
- 单例类的实现
- 配置的使用
- 单例模式的测试
- 单例连接的调用
https://gitee.com/allen-huang/python
单例模式介绍
适用场景:
单例模式只允许创建一个对象,因此节省内存,加快对象访问速度,因此对象需要被公用的场合适合使用,如多个模块使用同一个数据源连接对象等等。如:
- 需要频繁实例化然后销毁的对象。
- 创建对象时耗时过多或者耗资源过多,但又经常用到的对象。
- 有状态的工具类对象。
- 频繁访问数据库或文件的对象。
以下都是单例模式的经典使用场景:
- 资源共享的情况下,避免由于资源操作时导致的性能或损耗等。如上述中的日志文件,应用配置。还有windows系统的回收站和任务管理器,只能打开一个。
- 控制资源的情况下,方便资源之间的互相通信。多线程的线程池的设计一般就是采用单例模式,这是由于线程池要方便对池中的线程进行控制。
接下来就是以 pymongo模块的来举例说明
模块简介
pymongo 是 python 操作 mongodb 的官方库,它pymongo 提供了mongdb和python交互的所有方法,文档地址:https://www.mongodb.com/docs/drivers/pymongo/
安装
pip install pymongo
简单的连接使用
先使用常规的连接方式来创建 pymongo 的连接看下,下面展示的是一个测试用例代码,具体源码在 gitee 上
源码的具体链接:https://gitee.com/allen-huang/python/blob/master/python-code/do-mongodb/test_client.py
- 方式1:
...
def test_client1(self):
"""
简单连接1
@return:
"""
# 创建连接
client = MongoClient(host="127.0.0.1", port=27017, username="admin", password="YrnKzEubSv6")
# 选择一个库下面的集合,如果集合不存在,则自动新建
coll = client['test']['user']
print(coll.find_one({'uid': 1}))
pass
...
- 方式2:
...
def test_client2(self):
"""
简单连接方式2:使用连接字符串
@return:
"""
# 创建连接
uri = f"mongodb://admin:YrnKzEubSv6@127.0.0.1:27017/?maxPoolSize=20"
client = MongoClient(uri, connectTimeoutMS=5000)
# 选择一个库下面的集合,如果集合不存在,则自动新建
coll = client['test']['user']
print(coll.find_one({'uid': 1}))
pass
...
单例模式的连接
这里创建单例模式,是基于__new__方法结合 pymongo的连接池来实现,pymongo 的连接池是内部已经集成了的,只要设置 maxPoolSize 参数就行。
单例类的实现
import threading
from pymongo import MongoClient
from conf.load_config import Config
class MongoPool(object):
"""
mongodb连接池创建
"""
_pool = None
_instance = None
_lock = threading.Lock() # 线程锁,是属于同步锁
def __new__(cls):
if not cls._instance: # 在并发进来的时候,只会创建一次,如果已经创建了,就不用排队再去等待上锁了再判断实例是否为空,这样子可以提高性能。
with cls._lock: # 类似JAVA的synchronized,自动获取和释放锁
if not cls._instance:
cls._instance = cls.get_pool()
return cls._instance
@classmethod
def get_pool(cls):
"""
获取连接池实例
"""
user = Config['mongodb']['user']
host = Config['mongodb']['host']
port = Config['mongodb']['port']
password = Config['mongodb']['password']
maxPoolSize = Config['mongodb']['maxPoolSize']
connectTimeoutMS = Config['mongodb']['connectTimeoutMS']
# 连接mongodb的URI
uri = f"mongodb://{user}:{password}@{host}:{port}/?maxPoolSize={maxPoolSize}"
client = MongoClient(uri, connectTimeoutMS=connectTimeoutMS)
return client
配置的使用
- 配置的格式:
是基于 yaml 来的,根据不同服务器环境变量来 自动匹配配置文件,pyYaml的使用参考这一篇文章。
安装:
pip install pyyaml
- 环境变量来控制不同的配置
可以根据服务器各自的环境变量 来匹配(开发、测试、生产环境)的配置文件,这样子的好处就是各自的环境配置都不受到影响,提高的系统的稳定
我这边测试的是mac本地,在.zshrc文件中,环境变量共用了之前 chatgpt 项目设置好的环境变量CHATGPT_PYTHON_ENV
,在测试和生产环境的原理也一样,只需要改成CHATGPT_PYTHON_ENV="test"
和CHATGPT_PYTHON_ENV="prod"
就行。
- 配置加载代码
源码的链接地址:https://gitee.com/allen-huang/python/tree/master/conf/load_config.py
import os
from os.path import abspath, dirname
import yaml
# 获取各个环境对应的配置文件
CONFIG_FILES = {
'dev': 'config-dev.yaml',
'test': 'config-test.yaml',
'prod': 'config-prod.yaml'
}
# 获取配置
Config = {}
def load_config():
global Config
# 获取当前运行环境
env = os.getenv('CHATGPT_PYTHON_ENV', 'dev')
if env not in CONFIG_FILES:
raise ValueError('没有该环境的配置文件')
with open(abspath(dirname(__file__)) + '/' + CONFIG_FILES[env], 'r', encoding='utf-8') as f:
try:
Config = yaml.safe_load(f)
except yaml.YAMLError as e:
raise e
# 读取当前项目的运行环境,如果没有则默认是dev环境
try:
load_config()
except Exception as e:
print(e)
单例模式的测试
开启10个线程测试, 这里不采用单元测试,而是采用__main__里面执行线程,因为单元测试使用多线程会有问题,线程执行不全。
源码地址:https://gitee.com/allen-huang/python/blob/master/python-code/do-mongodb/test_client_singleton.py
import threading
import time
from db.mongo_pool import MongoPool
def task(arg):
mongo_obj = MongoPool()
# 连接池对象是否是同一个地址
str = f"执行编号:{arg}, 连接对象的内存地址: {id(mongo_obj)}"
# 等待2秒
time.sleep(2)
print(str)
# 使用多线程进行测试
if __name__ == '__main__':
thread_list = []
for i in range(10):
t = threading.Thread(target=task, args=(i,))
thread_list.append(t)
t.start()
for t in thread_list:
t.join()
单例连接的调用
下面是测试用例的代码,具体源码看链接https://gitee.com/allen-huang/python/blob/master/python-code/do-mongodb/test_client.py
...
def test_client3(self):
"""
简单连接方式3:使用单例
@return:
"""
data = MongoPool().test.user.find_one({'uid': 1})
print(data)
...