Python与PostgreSQL的深度整合:CRUD操作全指南

news2024/12/28 6:13:06

Python与PostgreSQL的深度整合:CRUD操作全指南

1. 环境准备

1.1 安装必要的包

pip install sqlalchemy psycopg2-binary sqlmodel

1.2 数据库连接

from sqlalchemy import create_engine
from sqlmodel import Session, SQLModel

# 连接字符串格式
DATABASE_URL = "postgresql://username:password@localhost:5432/dbname"

# 创建引擎
engine = create_engine(DATABASE_URL)

# 创建所有表
SQLModel.metadata.create_all(engine)

2. 模型定义

2.1 基本模型

from datetime import datetime
from typing import Optional
from sqlmodel import SQLModel, Field

class User(SQLModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    username: str = Field(index=True)
    email: str = Field(unique=True)
    created_at: datetime = Field(default_factory=datetime.utcnow)
    is_active: bool = Field(default=True)

2.2 关系模型

class Post(SQLModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    title: str
    content: str
    user_id: int = Field(foreign_key="user.id")
    created_at: datetime = Field(default_factory=datetime.utcnow)

3. CRUD 操作

3.1 创建操作 (Create)

def create_user(username: str, email: str) -> User:
    with Session(engine) as session:
        # 创建用户对象
        user = User(username=username, email=email)
        
        try:
            # 添加到会话
            session.add(user)
            # 提交事务
            session.commit()
            # 刷新对象(获取数据库生成的值)
            session.refresh(user)
            return user
        except Exception as e:
            session.rollback()
            raise e

# 使用示例
new_user = create_user("john_doe", "john@example.com")

3.2 查询操作 (Read)

from sqlmodel import select

def get_user_by_id(user_id: int) -> Optional[User]:
    with Session(engine) as session:
        return session.get(User, user_id)

def get_user_by_email(email: str) -> Optional[User]:
    with Session(engine) as session:
        statement = select(User).where(User.email == email)
        return session.exec(statement).first()

def get_all_users(skip: int = 0, limit: int = 100) -> list[User]:
    with Session(engine) as session:
        statement = select(User).offset(skip).limit(limit)
        return session.exec(statement).all()

# 复杂查询示例
def get_active_users_with_posts():
    with Session(engine) as session:
        statement = (
            select(User, Post)
            .join(Post)
            .where(User.is_active == True)
            .order_by(User.created_at.desc())
        )
        return session.exec(statement).all()

3.3 更新操作 (Update)

def update_user(user_id: int, **kwargs) -> Optional[User]:
    with Session(engine) as session:
        user = session.get(User, user_id)
        if not user:
            return None
        
        # 更新提供的字段
        for key, value in kwargs.items():
            if hasattr(user, key):
                setattr(user, key, value)
        
        try:
            session.add(user)
            session.commit()
            session.refresh(user)
            return user
        except Exception as e:
            session.rollback()
            raise e

# 使用示例
updated_user = update_user(1, email="newemail@example.com", is_active=False)

3.4 删除操作 (Delete)

def delete_user(user_id: int) -> bool:
    with Session(engine) as session:
        user = session.get(User, user_id)
        if not user:
            return False
        
        try:
            session.delete(user)
            session.commit()
            return True
        except Exception as e:
            session.rollback()
            raise e

4. 高级用法

4.1 事务管理

from sqlalchemy.orm import Session

def transfer_posts(from_user_id: int, to_user_id: int) -> bool:
    with Session(engine) as session:
        try:
            # 开始事务
            with session.begin():
                # 更新所有帖子的用户ID
                statement = (
                    update(Post)
                    .where(Post.user_id == from_user_id)
                    .values(user_id=to_user_id)
                )
                session.exec(statement)
                
                # 如果需要,可以在这里执行更多操作
                
            return True
        except Exception as e:
            print(f"Error: {e}")
            return False

4.2 批量操作

def bulk_create_users(users_data: list[dict]) -> list[User]:
    with Session(engine) as session:
        try:
            users = [User(**data) for data in users_data]
            session.add_all(users)
            session.commit()
            
            for user in users:
                session.refresh(user)
            
            return users
        except Exception as e:
            session.rollback()
            raise e

4.3 异步操作

from sqlmodel.ext.asyncio.session import AsyncSession
from sqlalchemy.ext.asyncio import create_async_engine

# 异步数据库URL
ASYNC_DATABASE_URL = "postgresql+asyncpg://user:password@localhost/dbname"

async_engine = create_async_engine(ASYNC_DATABASE_URL)

async def async_get_user(user_id: int) -> Optional[User]:
    async with AsyncSession(async_engine) as session:
        statement = select(User).where(User.id == user_id)
        result = await session.exec(statement)
        return result.first()

5. 最佳实践

5.1 连接池配置

engine = create_engine(
    DATABASE_URL,
    pool_size=5,  # 连接池大小
    max_overflow=10,  # 超出pool_size后的最大连接数
    pool_timeout=30,  # 连接池获取连接的超时时间
    pool_recycle=1800,  # 连接重置时间(秒)
)

5.2 异常处理

from sqlalchemy.exc import IntegrityError, OperationalError

def safe_create_user(username: str, email: str) -> tuple[Optional[User], str]:
    try:
        user = create_user(username, email)
        return user, "Success"
    except IntegrityError:
        return None, "User with this email already exists"
    except OperationalError:
        return None, "Database connection error"
    except Exception as e:
        return None, f"Unexpected error: {str(e)}"

5.3 模型验证

from pydantic import EmailStr, validator

class UserCreate(SQLModel):
    username: str
    email: EmailStr
    
    @validator('username')
    def username_must_be_valid(cls, v):
        if len(v) < 3:
            raise ValueError('Username must be at least 3 characters long')
        return v

6. 性能优化

6.1 查询优化

# 使用 select_from 优化多表查询
def get_user_posts_optimized(user_id: int):
    with Session(engine) as session:
        statement = (
            select(User, Post)
            .select_from(User)
            .join(Post)
            .where(User.id == user_id)
            .options(selectinload(User.posts))
        )
        return session.exec(statement).all()

6.2 缓存实现

from functools import lru_cache

@lru_cache(maxsize=100)
def get_cached_user(user_id: int) -> Optional[User]:
    return get_user_by_id(user_id)

7. 调试和监控

7.1 SQL语句日志

import logging

logging.basicConfig()
logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO)

7.2 性能分析

from sqlalchemy import event

@event.listens_for(engine, "before_cursor_execute")
def before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
    conn.info.setdefault('query_start_time', []).append(time.time())

@event.listens_for(engine, "after_cursor_execute")
def after_cursor_execute(conn, cursor, statement, parameters, context, executemany):
    total = time.time() - conn.info['query_start_time'].pop()
    print(f"Query took {total:.2f} seconds")

8. 部署注意事项

  1. 使用环境变量管理数据库连接信息
  2. 实现连接重试机制
  3. 正确配置连接池
  4. 实现数据库迁移策略
  5. 定期备份数据
  6. 监控数据库性能
  7. 实现错误报告机制

9. 常见问题解决

  1. 连接池耗尽
  2. 死锁处理
  3. 长事务管理
  4. 并发访问控制
  5. 大数据集处理
  6. 内存使用优化

10. 参考资源

  • SQLModel 官方文档
  • SQLAlchemy 官方文档
  • PostgreSQL 官方文档
  • Python 数据库最佳实践

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2266786.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

C# 窗体应用程序嵌套web网页(基于谷歌浏览器内核)

有一个winform项目&#xff0c;需要借助一个web项目来显示&#xff0c;并且对web做一些操作,web页目是需要用谷歌内核&#xff0c;基于谷歌 Chromium项目的开源Web Browser控件来开发写了一个demo。 安装步骤 第一步&#xff1a;右键项目&#xff0c;点击 管理NuGet程序包 , 输…

SRA Toolkit简单使用(prefetch和fastq-dump)

工具下载网址&#xff1a; 01. 下载 SRA Toolkit ncbi/sra-tools 维基https://github.com/ncbi/sra-tools/wiki/01.-Downloading-SRA-Toolkit 我下载的是linux 3.0.10版&#xff0c;目前最新版如下&#xff1a;https://ftp-trace.ncbi.nlm.nih.gov/sra/sdk/3.1.1/sratoolkit.3…

Spring Boot介绍、入门案例、环境准备、POM文件解读

文章目录 1.Spring Boot(脚手架)2.微服务3.环境准备3.1创建SpringBoot项目3.2导入SpringBoot相关依赖3.3编写一个主程序&#xff1b;启动Spring Boot应用3.4编写相关的Controller、Service3.5运行主程序测试3.6简化部署 4.Hello World探究4.1POM文件4.1.1父项目4.1.2父项目的父…

【开源框架】从零到一:AutoGen Studio开源框架-UI层环境安装与智能体操作全攻略

一、什么是AutoGen AutoGen是微软推出的一款工具&#xff0c;旨在帮助开发者轻松创建基于大语言模型的复杂应用程序。在传统上&#xff0c;开发者需要具备设计、实施和优化工作流程的专业知识&#xff0c;而AutoGen则通过自动化这些流程&#xff0c;简化了搭建和优化的过程。 …

【论文阅读】MedCLIP: Contrastive Learning from Unpaired Medical Images and Text

【论文阅读】MedCLIP: Contrastive Learning from Unpaired Medical Images and Text 1.论文背景与动机2.MedCLIP的贡献3.提出的方法4.构建语义相似矩阵的过程5. 实验6. 结论与局限性 论文地址&#xff1a; pdf github地址&#xff1a;项目地址 Zifeng Wang, Zhenbang Wu, Di…

雷电模拟器安装Lxposed

雷电模拟器最新版支持Lxposed。记录一下安装过程 首先到官网下载并安装最新版&#xff0c;我安装的时候最新版是9.1.34.0&#xff0c;64位 然后开启root和系统文件读写 然后下载magisk-delta-6并安装 ,这个是吾爱破解论坛提供的&#xff0c;号称适配安卓7以上所有机型&#x…

全解:Redis RDB持久化和AOF持久化

&#x1f9d1; 博主简介&#xff1a;CSDN博客专家&#xff0c;历代文学网&#xff08;PC端可以访问&#xff1a;https://literature.sinhy.com/#/literature?__c1000&#xff0c;移动端可微信小程序搜索“历代文学”&#xff09;总架构师&#xff0c;15年工作经验&#xff0c;…

VMware虚拟机安装银河麒麟操作系统KylinOS教程(超详细)

目录 引言1. 下载2. 安装 VMware2. 安装银河麒麟操作系统2.1 新建虚拟机2.2 安装操作系统2.3 网络配置 3. 安装VMTools 创作不易&#xff0c;禁止转载抄袭&#xff01;&#xff01;&#xff01;违者必究&#xff01;&#xff01;&#xff01; 创作不易&#xff0c;禁止转载抄袭…

HTML5实现喜庆的新年快乐网页源码

HTML5实现喜庆的新年快乐网页源码 前言一、设计来源1.1 主界面1.2 关于新年界面1.3 新年庆祝活动界面1.4 新年活动组织界面1.5 新年祝福订阅界面1.6 联系我们界面 二、效果和源码2.1 动态效果2.2 源代码 源码下载结束语 HTML5实现喜庆的新年快乐网页源码&#xff0c;春节新年网…

5G学习笔记之Non-Public Network

目录 0. NPN系列 1. 概述 2. SNPN 2.1 SNPN概述 2.2 SNPN架构 2.3 SNPN部署 2.3.1 完全独立 2.3.2 共享PLMN基站 2.3.3 共享PLMN基站和PLMN频谱 3. PNI-NPN 3.1 PNI-NPN概述 3.2 PNI-NPN部署 3.2.1 UPF独立 3.2.2 完全共享 0. NPN系列 1. NPN概述 2. NPN R18 3. 【SNPN系列】S…

大语言模型(LLM)中大数据的压缩存储及其重要性

在大型语言模型&#xff08;LLM&#xff09;中&#xff0c;KV Cache&#xff08;键值缓存&#xff09;的压缩方法及其重要性。 为什么要压缩KV Cache&#xff1f; 计算效率&#xff1a;在生成文本的过程中&#xff0c;每个生成的token都需要与之前所有的token的键值&#xff…

canvas之进度条

canvas之进度条 效果&#xff1a; 封装的组件 <template><div class"circle" :style"{ width: props.radius px, height: props.radius px }"><div class"circle-bg" :style"{ width: props.radius - 5 px, height: pr…

Boost之log日志使用

不讲理论&#xff0c;直接上在程序中可用代码&#xff1a; 一、引入Boost模块 开发环境&#xff1a;Visual Studio 2017 Boost库版本&#xff1a;1.68.0 安装方式&#xff1a;Nuget 安装命令&#xff1a; #只安装下面几个即可 Install-package boost -version 1.68.0 Install…

18.springcloud_openfeign之扩展组件二

文章目录 一、前言二、子容器默认组件FeignClientsConfigurationDecoder的注入Contract约定 对注解的支持对类上注解的支持对方法上注解的支持对参数上注解的支持MatrixVariablePathVariableRequestParamRequestHeaderSpringQueryMapRequestPartCookieValue FormattingConversi…

7-8 N皇后问题

目录 题目描述 输入格式: 输出格式: 输入样例: 输出样例: 解题思路&#xff1a; 详细代码&#xff08;dfs&#xff09;&#xff1a; 简单代码&#xff08;打表&#xff09;&#xff1a; 题目描述 在NN格的国际象棋盘上摆放N个皇后&#xff0c;使其不能互相攻击&#xff0c;即任…

现代网络负载均衡与代理导论

大家觉得有有参考意义和帮助记得及时关注和点赞&#xff01;&#xff01;&#xff01; Service mesh 是近两年网络、容器编排和微服务领域最火热的话题之一。Envoy 是目前 service mesh 数据平面的首选组件。Matt Klein 是 Envoy 的设计者和核心开发。 文章循序渐进&#xff0…

Kubernetes Gateway API-2-跨命名空间路由

1 跨命名空间路由 Gateway API 具有跨命名空间路由的核心支持。当多个用户或团队共享底层网络基础设施时,这很有用,但必须对控制和配置进行分段,以尽量减少访问和容错域。 Gateway 和 Route(HTTPRoute,TCPRoute,GRPCRoute) 可以部署到不同的命名空间中,路由可以跨命名空间…

Wend看源码-Java-集合学习(List)

摘要 本篇文章深入探讨了基于JDK 21版本的Java.util包中提供的多样化集合类型。在Java中集合共分类为三种数据结构&#xff1a;List、Set和Queue。本文将详细阐述这些数据类型的各自实现&#xff0c;并按照线程安全性进行分类&#xff0c;分别介绍非线程安全与线程安全的实现方…

集成方案 | Docusign + 蓝凌 EKP,打造一站式合同管理平台,实现无缝协作!

本文将详细介绍 Docusign 与蓝凌 EKP 的集成步骤及其效果&#xff0c;并通过实际应用场景来展示 Docusign 的强大集成能力&#xff0c;以证明 Docusign 集成功能的高效性和实用性。 在当今数字化办公环境中&#xff0c;企业对于提高工作效率和提升用户体验的需求日益迫切。蓝凌…

突围边缘:OpenAI开源实时嵌入式API,AI触角延伸至微观世界

当OpenAI宣布开源其名为openai-realtime-embedded-sdk的实时嵌入式API时&#xff0c;整个科技界都为之震惊。这一举动意味着&#xff0c;曾经遥不可及的强大AI能力&#xff0c;如今可以被嵌入到像ESP32这样的微型控制器中&#xff0c;真正地将AI的触角延伸到了物联网和边缘计算…