SQLAlchemy 使用封装实例

news2024/11/13 20:46:30

类封装

database.py

#! /usr/bin/env python
# -*- coding: utf-8 -*-

import sys
import json
import logging
from datetime import datetime

from core.utils import classlock, parse_bool
from core.config import (
    MYSQL_HOST,
    MYSQL_PORT,
    MYSQL_USER,
    MYSQL_PASS,
    MYSQL_DATABASE,
    MYSQL_TIMEOUT
)

from sqlalchemy import create_engine, Column, desc, not_, func
from sqlalchemy import Integer, String, Boolean, DateTime, Text, Enum     # Text存储大不固定长的字符串
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.exc import SQLAlchemyError

Base = declarative_base()

log = logging.getLogger("log")

SCHEMA_VERSION = "1.0.0"


class User(Base):
    __tablename__ = "user"

    id = Column(Integer(), primary_key=True)
    file_size = Column(Integer(), nullable=False)      # nullable 不可为空
    md5 = Column(String(32), nullable=False)
    crc32 = Column(String(8), nullable=False)
    sha1 = Column(String(40), nullable=False)
    sha256 = Column(String(64), nullable=False)
    sha512 = Column(String(128), nullable=False)
    memory = Column(Boolean, nullable=False, default=False)
    ssdeep = Column(String(255), nullable=True)
    start_time = Column(DateTime(timezone=False), nullable=True, default=datetime.now)

    def __repr__(self):   # 查询返回的结果
        return "<User('{0}','{1}')>".format(self.id, self.sha256)

    def to_dict(self):
        """将对象转换为dict.
        @return: dict
        """
        d = {}
        for column in self.__table__.columns:
            d[column.name] = getattr(self, column.name)
        return d

    def to_json(self):
        """将对象转换为JSON.
        @return: JSON data
        """
        return json.dumps(self.to_dict())

class Version(Base):
    """用于确定实际数据库架构发布的表."""
    __tablename__ = "version"

    version_num = Column(String(32), nullable=False, primary_key=True)

class Database(object):
    """
    分析队列数据库
    此类处理为内部队列创建数据库用户经营它还提供了一些与之交互的功能
    """

    def __init__(self, schema_check=True, echo=False):
        """
        @param dsn: 数据库连接字符串.
        @param schema_check: 禁用或启用数据库架构版本检查.
        @param echo: echo sql 查询.
        """
        self._lock = None
        self.schema_check = schema_check
        self.echo = echo

    def connect(self, schema_check=None, dsn=None, create=True):
        """连接到数据库后端."""
        if schema_check is not None:
            self.schema_check = schema_check

        if not dsn:
            dsn = "mysql://{0}:{1}@{2}:{3}/{4}".format(MYSQL_USER, MYSQL_PASS, MYSQL_HOST, MYSQL_PORT, MYSQL_DATABASE)
            #dsn = "mysql://{username}:{password}@{hostname}:{port}/{database}"

        self._connect_database(dsn)

        # 禁用SQL日志记录。打开它进行调试.
        self.engine.echo = self.echo

        # 连接超时.
        self.engine.pool_timeout = MYSQL_TIMEOUT

        # 获取数据库会话.
        self.Session = sessionmaker(bind=self.engine)

        if create:
            self._create_tables()

    def _create_tables(self):
        """创建所有数据库表等."""
        try:
            Base.metadata.create_all(self.engine)
        except SQLAlchemyError as e:
            raise ("无法创建或连接到数据库: %s" % e)

        # 处理架构版本控制.
        # TODO: it's a little bit dirty, needs refactoring.
        tmp_session = self.Session()
        if not tmp_session.query(Version).count():
            # 设置数据库架构版本.
            tmp_session.add(Version(version_num=SCHEMA_VERSION))
            try:
                tmp_session.commit()
            except SQLAlchemyError as e:
                raise ("无法设置架构版本: %s" % e)
                tmp_session.rollback()
            finally:
                tmp_session.close()
        else:
            # 检查数据库版本是否为预期版本.
            last = tmp_session.query(Version).first()
            tmp_session.close()
            if last.version_num != SCHEMA_VERSION and self.schema_check:
                log.warning(
                    "数据库架构版本不匹配:找到 %s,应为 %s.",
                    last.version_num, SCHEMA_VERSION
                )
                log.error(
                    "(可选)进行备份,然后通过运行migrate应用最新的数据库迁移。"
                )
                sys.exit(1)

    def __del__(self):
        """断开连接池."""
        self.engine.dispose()

    def _connect_database(self, connection_string):
        """连接到数据库.
        @param connection_string: 指定数据库的连接字符串
        """
        try:
            if connection_string.startswith("sqlite"):
                # 使用“check_same_thread”在多个线程上禁用sqlite安全检查.
                self.engine = create_engine(connection_string, connect_args={"check_same_thread": False})
            elif connection_string.startswith("postgres"):
                # 禁用SSL模式以避免使用sqlalchemy和多进程时出现一些错误.
                # See: http://www.postgresql.org/docs/9.0/static/libpq-ssl.html#LIBPQ-SSL-SSLMODE-STATEMENTS
                # TODO 检查这是否仍然相关。特别是假设我们不再使用多处理.
                self.engine = create_engine(connection_string, connect_args={"sslmode": "disable"})
            else:
                self.engine = create_engine(connection_string)
        except ImportError as e:
            lib = str(e).split()[-1].strip("'")

            if lib == "MySQLdb":
                log.error(
                    "缺少MySQL数据库驱动程序(在Linux上使用 `pip install mysql-python` 安装,或在Windows上使用 `pip-install mysqlclient`)"
                )

            if lib == "psycopg2":
                log.error(
                    "缺少PostgreSQL数据库驱动程序 (使用 `pip install psycopg2`)"
                )

            log.error(
                "缺少未知的数据库驱动程序,无法导入 %s" % lib
            )
            sys.exit(-1)

    @classlock
    def add_user(self, file_size, md5, crc32, sha1, sha256, sha512, memory, ssdeep=None):
        session = self.Session()

        # 将空字符串和None值转换为有效的int
        # if not timeout:
        #     timeout = 0
        # if not priority:
        #     priority = 1
        #
        try:
            memory = parse_bool(memory)
        except ValueError:
            memory = False
        #
        # try:
        #     enforce_timeout = parse_bool(enforce_timeout)
        # except ValueError:
        #     enforce_timeout = False

        user = User()
        user.file_size = file_size
        user.md5 = md5
        user.crc32 = crc32
        user.sha1 = sha1
        user.sha256 = sha256
        user.sha512 = sha512
        user.memory = memory
        user.ssdeep = ssdeep

        session.add(user)

        try:
            session.commit()
        except SQLAlchemyError as e:
            log.error("数据库添加 user 错误: {0}".format(e))
            return False
        finally:
            session.close()
        return True

    @classlock
    def select_user(self, id=None):
        session = self.Session()

        try:
            search = session.query(User)

            if id:
                search = search.filter_by(id=id)

            # 排序
            # search = search.order_by(id)
            # search = search.order_by(desc(User.id))  倒叙

            tasks = search.all()

            return tasks
        except SQLAlchemyError as e:
            log.error("数据库查看所有 user 错误: {0}".format(e))
            return []
        finally:
            session.close()

    @classlock
    def update_user(self, id, new_file_size):
        session = self.Session()

        try:
            search = session.query(User).filter(User.id == id).first()
            search.file_size = new_file_size
            # session.query(User).filter(User.id == 1).update({'file_size': 'new_file_size'})

            session.commit()
        except SQLAlchemyError as e:
            log.error("数据库更新 user 错误: {0}".format(e))
            return False
        finally:
            session.close()
        return True

    @classlock
    def delete_user(self, id):
        session = self.Session()

        try:
            search = session.query(User).filter(User.id == id).first()
            if search:
                session.delete(search)
                session.commit()
        except SQLAlchemyError as e:
            log.error("数据库删除 user 错误: {0}".format(e))
            return False
        finally:
            session.close()
        return True

调用运行

merage.py

#! /usr/bin/env python
# -*- coding: utf-8 -*-

import time
import logging

from core.database import Database

log = logging.getLogger("log")

class Merge(object):
    def __init__(self):
        db = Database()
        db.connect()

        res = db.add_user(32, "11", "22", "33", "44", "55", "off")
        if not res:
           print("添加错误")

        print(db.select_user())

        # res = db.update_user(1, 50)
        # if not res:
        #     print("更新错误")

        # res = db.delete_user(2)
        # if not res:
        #     print("删除错误")


if __name__ == '__main__':
    merge = Merge()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1080364.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

calc方法和vue中calc不生效踩坑

calc方法 calc()方法是css用来计算的,比如一个场景,上下固定高度,中间自适应,就可以使用这个方法。 预编译less也是可以使用这个方法的 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewp…

AR动态贴纸SDK,让创作更加生动有趣

在当今的社交媒体时代&#xff0c;视频已经成为了人们表达自我、分享生活的重要方式。然而&#xff0c;如何让你的视频在众多的信息中脱颖而出&#xff0c;吸引更多的关注和点赞呢&#xff1f;答案可能就在你的手中——美摄AR动态贴纸SDK。 美摄AR动态贴纸SDK是一款专为视频编辑…

设计模式02———建造者模式 c#

首先我们打开一个项目 在这个初始界面我们需要做一些准备工作 建基础通用包 创建一个Plane 重置后 缩放100倍 加一个颜色 更换天空盒&#xff08;个人喜好&#xff09; 任务&#xff1a;使用【UI】点击生成6种车零件组装不同类型车 【建造者模式】 首先资源商店下载车模型 将C…

C#实现五子棋小游戏:简单、有趣的编程项目

目录 引言什么是五子棋游戏规则开发环境准备安装C#开发环境选择合适的集成开发环境(IDE)游戏设计与功能分析游戏界面设计实现棋盘的绘制与操作实现落子功能实现输赢判断说明引言 什么是五子棋 五子棋是一种源于中国的传统棋类游戏,常见于中国、日本、韩国等亚洲国家,是亚洲…

c++视觉处理-----cv::findContours函数和图像进行去噪、平滑、边缘检测和轮廓检测,动态检测图形

cv::findContours cv::findContours 是OpenCV中用于查找图像中对象轮廓的函数。轮廓是对象的边界&#xff0c;通常用于对象检测、分割和形状分析。cv::findContours 函数的基本用法如下&#xff1a; cv::findContours(image, contours, hierarchy, mode, method, offset cv:…

单细胞分析+实验验证,多重buff加身,学会你也能发7分+。

今天给同学们分享一篇单细胞分析实验验证的生信文章“Construction of a hypoxia-immune-related prognostic panel based on integrated single-cell and bulk RNA sequencing analyses in gastric cancer”&#xff0c;这篇文章于2023年4月26日发表在Front Immunol 期刊上&am…

JUC并发编程(一):Java内存模型(JMM)及三大特性:可见性、有序性、原子性

1.简介 在当今高流量、高并发的互联网业务场景下&#xff0c;并发编程技术显得尤为重要&#xff0c;不管是哪一门编程语言&#xff0c;掌握并发编程技术是个人进阶的必经之路。时隔一个半月没有写技术博客文章&#xff0c;有点生疏了。。。闲话少叙&#xff0c;接下来我将围绕…

在使用nohup命令后台训练pytorch模型时,关闭ssh窗口导致的训练任务失败解决方法

下班前使用终端通过SSH登陆服务器&#xff0c;用nohup命令后台训练了一个pytorch模型。第二天来公司上班发现模型训练终止&#xff0c;报如下问题。 WARNING:torch.distributed.elastic.agent.server.api:Received 1 death signal, shutting down workers WARNING:torch.distr…

2023年陕西省安全员B证证考试题库及陕西省安全员B证试题解析

题库来源&#xff1a;安全生产模拟考试一点通公众号小程序 2023年陕西省安全员B证证考试题库及陕西省安全员B证试题解析是安全生产模拟考试一点通结合&#xff08;安监局&#xff09;特种作业人员操作证考试大纲和&#xff08;质检局&#xff09;特种设备作业人员上岗证考试大…

2023年危险化学品经营单位主要负责人证考试题库及危险化学品经营单位主要负责人试题解析

题库来源&#xff1a;安全生产模拟考试一点通公众号小程序 2023年危险化学品经营单位主要负责人证考试题库及危险化学品经营单位主要负责人试题解析是安全生产模拟考试一点通结合&#xff08;安监局&#xff09;特种作业人员操作证考试大纲和&#xff08;质检局&#xff09;特…

联想携中国移动打造车路协同方案 助力重庆实现32类车联网场景

10月11日&#xff0c;联想集团在中国移动全球合作伙伴大会上首次分享了与中国移动等合作伙伴共同打造的5G车路协同案例——重庆两江协同创新区车路协同应用。联想利用基于5G智能算力技术&#xff0c;在总里程55公里路段实现了32类车联网场景。 据了解&#xff0c;重庆两江协同创…

Spark任务优化分析

一、背景 首先需要掌握 Spark DAG、stage、task的相关概念 Spark的job、stage和task的机制论述 - 知乎 task数量和rdd 分区数相关 二、任务慢的原因分析 找到运行时间比较长的stage 再进去看里面的task 可以看到某个task 读取的数据量明显比其他task 较大。 如果是sql 任…

Python爬虫提高排名

在如今竞争激烈的互联网时代&#xff0c;网站的SEO优化变得尤为重要。而Python爬虫作为一种强大的工具&#xff0c;可以帮助网站主们提升搜索排名&#xff0c;吸引更多的流量和用户。本文将为您揭秘如何利用Python爬虫来改善您的SEO优化&#xff0c;并帮助您提升搜索排名。无论…

线性代数 --- 矩阵的QR分解,A=QR

矩阵的QR分解&#xff0c;格拉姆施密特过程的矩阵表示 首先先简单的回顾一下Gram-Schmidt正交化过程的核心思想&#xff0c;如何把一组线性无关的向量构造成一组标准正交向量&#xff0c;即&#xff0c;如何把矩阵A变成矩阵Q的过程。 给定一组线性无关的向量a,b,c&#xff0c;我…

2023年【危险化学品经营单位主要负责人】模拟考试及危险化学品经营单位主要负责人作业考试题库

题库来源&#xff1a;安全生产模拟考试一点通公众号小程序 危险化学品经营单位主要负责人模拟考试是安全生产模拟考试一点通总题库中生成的一套危险化学品经营单位主要负责人作业考试题库&#xff0c;安全生产模拟考试一点通上危险化学品经营单位主要负责人作业手机同步练习。…

支持在线状态检查的仪表板miniboard

什么是 miniboard &#xff1f; miniboard 是带有选项卡和在线状态检查的轻量级仪表板。可以通过 GUI 或 yaml 文件进行配置。 采用 shoutrrr 通知。 什么是 Shoutrrr &#xff1f; Shoutrrr 是一个类似 caronc/apprise 的通知库&#xff0c;支持多种通知服务。 安装 在群晖上…

矿物鉴定VR实践教学平台:打造全新的沉浸式学习体验

在科技的帮助下&#xff0c;我们的学习和培训方式正在发生着深刻的变化。其中&#xff0c;虚拟现实&#xff08;VR&#xff09;技术带来的沉浸式学习体验&#xff0c;为我们提供了一种全新的学习和实践方式。本文将详细介绍一款使用VR技术的教学工具——矿物鉴定VR实践教学平台…

国外无人机蜂群作战样式进展及反蜂群策略研究

源自&#xff1a;现代防御技术 作者&#xff1a;王瑞杰, 王得朝, 丰璐, 赵正党, 陈浙梁 摘 要 科技进步和军事需求的联合推动下&#xff0c;无人机蜂群作战成为一种新兴的并能够改变战争规则的颠覆性作战样式&#xff0c;各军事强国围绕“蜂群技术和战术”展开了激烈的竞争…

[GWCTF 2019]你的名字 - SSTI注入(waf绕过)

[GWCTF 2019]你的名字 1 解题流程1.1 分析1.2 解题 2 思考总结 1 解题流程 1.1 分析 1、页面只有一个输入框&#xff0c;输入什么回显什么 2、根据特性应该是SSTI注入 1.2 解题 fuzz&#xff1a;过滤则长度1512 过滤&#xff1a;{{}}、class、mro、builtins、file、func_gl…

Android Studio展示Activty生命周期

前言 本文章以及之后文章的程序版本使用Android Studio 2022.3.1 Patch 1 版本编辑&#xff0c;使用语言为java&#xff0c;最低支持API 27 Android 8.1&#xff0c;构建工具版本如下&#xff1a; 本文章主要是介绍Activty跳转和删除&#xff0c;以备后续使用&#xff0c;所以就…