twisted实现MMORPG 游戏数据库操作封装设计与实现

news2025/2/15 19:45:14

在设计 MMORPG(大规模多人在线角色扮演游戏)时,数据库系统是游戏架构中至关重要的一部分。数据库不仅承担了游戏中各种数据(如玩家数据、物品数据、游戏世界状态等)的存储和管理任务,还必须高效地支持并发访问、事务处理和复杂的查询。为了确保系统的可扩展性和维护性,我们需要对数据库操作进行封装和模块化设计。

为了实现上述目标,本文设计了一个基于 Twisted 的数据库封装系统。Twisted 是一个异步框架,适用于处理大量并发任务。结合 Twisted 和数据库连接池(adbapi.ConnectionPool),我们可以高效地执行异步数据库操作。
在这里插入图片描述

在这里插入图片描述

1. DatabaseError
作用:自定义异常类,用于在数据库操作发生错误时抛出详细的错误信息,包含错误消息和错误码(默认为500)。

2. DatabaseOperation
作用:抽象基类,定义了数据库操作的统一接口。所有具体的数据库操作类(如 SelectOperation, InsertOperation 等)都继承自此类,必须实现 excute 方法来执行数据库事务。
关键方法:
executeQuery:执行实际的 SQL 查询。
handleSuccess:操作成功时的回调函数。
handleFailure:操作失败时的回调函数。

3. SelectOperation
作用:继承自 DatabaseOperation,封装了 SELECT 查询操作。提供了 executeQuery 方法来执行 SQL 查询,并返回查询结果。
关键方法:
executeQuery:执行 SELECT 查询,并根据提供的表名、列名、查询条件等生成 SQL 语句。
excute:实现 DatabaseOperation 中的抽象方法,执行查询操作。

4. InsertOperation
作用:继承自 DatabaseOperation,封装了 INSERT 插入操作。通过 executeQuery 方法生成插入的 SQL 语句并执行。
关键方法:
executeQuery:构建并执行 INSERT SQL 语句,将数据插入指定的表。
excute:实现 DatabaseOperation 中的抽象方法,执行插入操作。


5. UpdateOperation
作用:继承自 DatabaseOperation,封装了 UPDATE 更新操作。通过 executeQuery 方法生成更新的 SQL 语句并执行。
关键方法:
executeQuery:构建并执行 UPDATE SQL 语句,用新值更新指定的行。
excute:实现 DatabaseOperation 中的抽象方法,执行更新操作。


6. DeleteOperation
作用:继承自 DatabaseOperation,封装了 DELETE 删除操作。通过 executeQuery 方法生成删除的 SQL 语句并执行。
关键方法:
executeQuery:构建并执行 DELETE SQL 语句,根据指定条件删除记录。
excute:实现 DatabaseOperation 中的抽象方法,执行删除操作。
7. DatabaseManager
作用:负责数据库连接池的管理和数据库操作的执行。它使用 adbapi.ConnectionPool 创建数据库连接池,执行操作并处理事务。
关键方法:
getConnection:返回数据库连接池的实例。
executeOperation:接受一个数据库操作对象,调用 runInteraction 方法来执行异步数据库事务,并处理操作成功或失败的回调。
8. GameQueryPlayerId
作用:继承自 SelectOperation,封装了查询玩家信息的操作。它指定查询条件为玩家的名称,并通过 excute 方法执行查询。
关键方法:
excute:执行 SelectOperation 中的 executeQuery 方法,查询玩家 ID。
set_query_name:设置查询的玩家名称。

代码

from twisted.enterprise import adbapi
from twisted.internet.defer import Deferred
import pymysql
import traceback
from twisted.internet import reactor
from functools import partial
from abc import ABC, abstractmethod
# 异常类定义
class DatabaseError(Exception):
    def __init__(self, message, code=500):
        self.message = message
        self.code = code


# 抽象的数据库操作类
class IDatabaseOperation(ABC):
    @abstractmethod
    def executeQuery(self, txn, table: str, columns: list, values: dict = {}, condition: dict = None) -> any:
        pass

    def handleSuccess(self, result: any):
        print("Operation succeeded with result:", result)

    def handleFailure(self, error: Exception):
        # 这里做一些额外的错误处理,比如记录日志或者返回友好的错误信息
        print("Operation failed:", error)

    @abstractmethod
    def excute(self, txn):
        #这里封装代码
        pass

class ABC_SelectOperation(IDatabaseOperation):
    def executeQuery(self, txn, table: str, columns: list = None, values: dict = None, condition: dict = None) -> any:
        try:
            column_str = ", ".join(columns) if columns else "*"
            query = f"SELECT {column_str} FROM {table}"

            if condition:
                condition_str = " AND ".join([f"{key} = %s" for key in condition.keys()])
                query += f" WHERE {condition_str}"

            print(f"Executing query: {query}, with values: {tuple(condition.values()) if condition else ()}")
            txn.execute(query, tuple(condition.values()) if condition else ())  # use condition values if any
            result = txn.fetchall()
            return result
        except Exception as e:
            print(f"Error executing query: {e}")
            traceback.print_exc()
            # 返回一个失败的结果以便继续后续操作
            return {"error": str(e)}

    @abstractmethod
    def excute(self, txn):
        pass


class ABC_InsertOperation(IDatabaseOperation):
    def executeQuery(self, txn, table: str, columns: list = None, values: dict = None, condition: dict = None) -> any:
        try:
            column_str = ", ".join(columns)
            placeholders = ", ".join(["%s"] * len(columns))  # Create placeholders based on column length

            # Extract values from the dictionary for each column
            value_tuple = tuple(values[col] for col in columns)

            query = f"INSERT INTO {table} ({column_str}) VALUES ({placeholders})"

            print(f"Executing insert query: {query}, with values: {value_tuple}")
            txn.execute(query, value_tuple)  # Use parameterized query
            return txn.lastrowid  # Return the inserted record ID
        except Exception as e:
            print(f"Error executing insert query: {e}")
            traceback.print_exc()
            # 返回一个失败的结果以便继续后续操作
            return {"error": str(e)}

    @abstractmethod
    def excute(self, txn):
        pass

class ABC_UpdateOperation(IDatabaseOperation):
    def executeQuery(self, txn, table: str, columns: list = None, values: dict = None, condition: dict = None) -> any:
        try:
            if condition and not isinstance(condition, dict):
                raise TypeError("Condition must be a dictionary")

            set_str = ", ".join([f"{col} = %s" for col in columns])
            query = f"UPDATE {table} SET {set_str}"

            # Ensure that values is passed as a tuple for update
            value_tuple = tuple(values[col] for col in columns)

            if condition:
                condition_str = " AND ".join([f"{key} = %s" for key in condition.keys()])
                query += f" WHERE {condition_str}"
                value_tuple += tuple(condition.values())  # Append condition values

            print(f"Executing update query: {query}, with values: {value_tuple}")
            txn.execute(query, value_tuple)  # Use parameterized query
            txn.connection.commit()
            return txn.rowcount  # Return the number of updated rows
        except Exception as e:
            print(f"Error executing update query: {e}")
            traceback.print_exc()
            # 返回一个失败的结果以便继续后续操作
            return {"error": str(e)}

    @abstractmethod
    def excute(self, txn):
        pass


class ABC_DeleteOperation(IDatabaseOperation):
    def executeQuery(self, txn, table: str, columns: list = None, values: dict = None, condition: dict = None) -> any:
        try:
            if not condition:
                raise ValueError("Condition for deletion cannot be empty.")
            condition_str = " AND ".join([f"{key} = %s" for key in condition.keys()])
            query = f"DELETE FROM {table} WHERE {condition_str}"

            print(f"Executing delete query: {query}, with values: {tuple(condition.values())}")
            txn.execute(query, tuple(condition.values()))  # Use condition values for parameterized query
            return txn.rowcount
        except Exception as e:
            print(f"Error executing delete query: {e}")
            traceback.print_exc()
            # 返回一个失败的结果以便继续后续操作
            return {"error": str(e)}

    @abstractmethod
    def excute(self):
        pass



# 数据库管理类,负责数据库连接池和事务
class DatabaseManager:
    def __init__(self, db_config):
        # 初始化数据库连接池
        self.dbConnectionPool = adbapi.ConnectionPool("pymysql", **db_config)

    def getConnection(self):
        return self.dbConnectionPool

    def executeOperation(self, operation: DatabaseOperation) -> Deferred:
        try:
            # 使用 partial 创建一个指定了参数的函数

            deferred = self.dbConnectionPool.runInteraction(operation.excute)
            deferred.addCallback(operation.handleSuccess)
            deferred.addErrback(operation.handleFailure)
            return deferred
        except Exception as e:
            error = DatabaseError(str(e), 500)
            operation.handleFailure(error)
            return None


# 示例数据库配置
db_config = {
    'host': 'localhost',
    'user': 'root',
    'password': 'root',
    'database': 'test',
}


class GameQueryPlayerId(ABC_SelectOperation):
    def __init__(self):
        self.select_columns = ["name", "id"]
        self.elect_condition = {"name": "new_name"}  # 在此给出查询条件
    def excute(self, txn):

        # 执行查询操作
        return self.executeQuery(txn,"test", columns=self.select_columns, condition=self.elect_condition )

    def set_query_name(self, name):
        self.elect_condition["name"] = name

# 示例操作
def main():

    # 创建DatabaseManager实例
    db_manager = DatabaseManager(db_config)
    ccGameQueryPlayerId = GameQueryPlayerId()
    ccGameQueryPlayerId.set_query_name("new_name")
    deferred = db_manager.executeOperation(ccGameQueryPlayerId)



    reactor.run()


if __name__ == "__main__":
    main()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2298889.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

电脑端调用摄像头拍照:从基础到实现

文章目录 1. 了解navigator.mediaDevices.getUserMedia API2. 创建 HTML 结构3. 编写 JavaScript 代码3.1 打开摄像头3.2 拍照 4. 完整代码5. 测试6. 注意事项及部署 在现代 Web 开发中,调用摄像头进行拍照是一个常见的功能,尤其是在需要用户上传头像、进…

部署 DeepSeek R1各个版本所需硬件配置清单

DeepSeek-R1 通过其卓越的推理性能和灵活的训练机制,在 2025 年的春节期间受到了广泛关注。 DeepSeek-R1 是一款高性能的 AI 推理模型,主要通过强化学习技术来增强模型在复杂任务场景下的推理能力。 在本地部署 DeepSeek-R1 时,尤其是完整的…

算法18(力扣136)只出现一次的数字

1、问题 给你一个 非空 整数数组 nums,除了某个元素只出现一次以外,其余每个元素均出现两次。找出那个只出现了一次的元素。 你必须设计并实现线性时间复杂度的算法来解决此问题,且该算法只使用常量额外空间。 2、示例 (1&…

SiliconCloud 支持deepseek,送2000w token

SiliconCloud SiliconCloud 邀请奖励持续进行,2000 万 Tokens 送不停! 邀请好友赚 2000 万 Tokens:每成功邀请一位新用户通过手机号码注册,您将获得 2000 万 Tokens;注册即送 2000 万 Tokens:受邀好友作为…

在nodejs中使用RabbitMQ(六)sharding消息分片

RabbitMQ 的分片插件(rabbitmq_sharding)允许将消息分布到多个队列中,这在消息量很大或处理速度要求高的情况下非常有用。分片功能通过将消息拆分到多个队列中来平衡负载,从而提升消息处理的吞吐量和可靠性。它能够在多个队列之间…

STM32 I2C通信协议说明

目录 背景 I2C协议 数据的有效性 I2C通信开始和停止条件 I2C数据传输 发送 响应 正常情况: 异常情况: 主机结束接收 写寄存器的标准流程 读寄存器的标准流程 仲裁机制 时钟同步 SDA线的仲裁 程序 背景 对单片机的三大通信中的I2C通信进…

Keysight E5071C (Agilent) 网络分析仪的特性和规格

安捷伦E5071C网络分析仪 Keysight E5071C网络分析仪 Keysight E5071C (Agilent) 网络分析仪的其他特性和规格包括: 宽动态范围:测试端口动态范围 > 123 dB(典型值) 快速测量速度:41 ms 全 2 端口校准,…

总结:如何在SpringBoot中使用https协议以及自签证书?

总结:如何在SpringBoot中使用https协议以及自签证书? 前提一:什么是http协议?前提二:什么是https协议?一生成自签证书二 将证书转换为PKCS12格式三 配置SpringBoot(1)修改配置文件&a…

基于SSM+uniapp的数学辅导小程序+LW示例参考

1.项目介绍 系统角色:管理员、普通用户功能模块:用户管理、学习中心、知识分类管理、学习周报管理、口算练习管理、试题管理、考试管理、错题本等技术选型:SSM,Vue(后端管理web),uniapp等测试环…

利用AI智能体创建云端文档知识库并集成第三方数据源(上)

许多开发者在管理和集成多种云端的数据源时经常面对各种各样的困难,所以希望能够构建一个聊天机器人来协调这些数据源,针对业务问题并提供全面的答案。本文介绍了一种解决方案,帮助大家开发一个能够从文档和数据库中回答查询的聊天机器人&…

聚铭网络入围2025年度江苏省政府采购信息安全设备协议供货名单

近日,2025年度江苏省党政机关、事业单位及团体组织信息安全设备框架协议采购项目入围结果公布。聚铭网络凭借自身专业实力和技术优势脱颖而出,成功入围22个分包。 此次采购项目是江苏省政府采购领域级别最高、覆盖面最广的项目之一。从资格评选到后期材料…

vue+springboot+webtrc+websocket实现双人音视频通话会议

前言 最近一些时间我有研究,如何实现一个视频会议功能,但是找了好多资料都不太理想,最终参考了一个文章 WebRTC实现双端音视频聊天(Vue3 SpringBoot) 只不过,它的实现效果里面只会播放本地的mp4视频文件&…

堡垒机调用xshell 无反应

安装sso_client 确认db_path.ini xhsell路径 如图调整为本机安装的路径即可。 实战问题: 操作完成之后 Chrome还是无法调用,使用360浏览器没问题。

python后端调用Deep Seek API

python后端调用Deep Seek API 需要依次下载 ●Ollama ●Deepseek R1 LLM模型 ●嵌入模型nomic-embed-text / bge-m3 ●AnythingLLM 参考教程: Deepseek R1打造本地化RAG知识库:安装部署使用详细教程 手把手教你:deepseek R1基于 AnythingLLM API 调用本地…

Easy系列PLC 线性变换功能块(模拟量相关功能块汇总)

线性转换函数S_RTR 线性转换函数S_RTR(SCL和ST代码)_线性函数的scl语言如何编写-CSDN博客文章浏览阅读440次。博客介绍了线性转换函数S_RTR,包括其在PLC中的应用,如何与工艺PID组合使用,以及在张力开环控制中的具体实践。还提到了函数的C99兼容性,并提供了S_RTR的功能块源…

【VB语言】EXCEL中VB宏的应用

【VB语言】EXCEL中VB宏的应用 文章目录 [TOC](文章目录) 前言一、EXCEL-VB1.实验过程2.代码 二、EXCEL-VB 生成.c.h文件1.实验过程2.代码 四、参考资料总结 前言 1.WPS-VB扩展包 提示:以下是本篇文章正文内容,下面案例可供参考 一、EXCEL-VB 1.实验过…

【人工智能】如何选择合适的大语言模型,是能否提高工作效率的关键!!!

DeepSeek R1入门指南 导读一、提示语差异1.1 指令侧重点不同1.2 语言风格差异1.3 知识运用引导不同 二、挑选原则2.1 模型选择2.2 提示语设计2.3 避免误区 结语 导读 大家好,很高兴又和大家见面啦!!! 在前面的内容中&#xff0c…

Unity使用反射进行Protobuf(CS/SC)协议,json格式

protobuf生成的协议,有挺多协议的.利用反射生成dto进行伪协议的响应 和 发送请求 应用场景: 请求(CS)_后端先写完了,前端还搞完时,可使用此请求,可自测 响应(SC)_可自行构建一个响应,对数据进行测试 // 请求 使用物品 CS message ReqUseItem{optional Opcodes MessageID1[def…

初学 mybatis

前言 回顾之前 不使用 mybatis 框架,我们是怎么通过Java 操作数据库的 "jdbc" 前提:使用maven 构建的项目 1 添加 关于jdbc 的依赖,以及辅助操作数据库的 commons-dubli jar包 截取 前后端项目 2 添加配置文件里面内容有&…

C语言进阶习题(4结构体)【1】通讯录的实现

目录 1.使用结构体实现通讯录功能2.思路3. 代码实现3.1 test.c3.2 contact.c3.3 contact.h 1.使用结构体实现通讯录功能 主要功能有:显示通讯录信息,增加通讯录中人的信息,删除通讯录中人的信息,查找通信录中信息,修改…