在设计 MMORPG(大规模多人在线角色扮演游戏)时,数据库系统是游戏架构中至关重要的一部分。数据库不仅承担了游戏中各种数据(如玩家数据、物品数据、游戏世界状态等)的存储和管理任务,还必须高效地支持并发访问、事务处理和复杂的查询。为了确保系统的可扩展性和维护性,我们需要对数据库操作进行封装和模块化设计。
为了实现上述目标,本文设计了一个基于 Twisted 的数据库封装系统。Twisted 是一个异步框架,适用于处理大量并发任务。结合 Twisted 和数据库连接池(adbapi.ConnectionPool),我们可以高效地执行异步数据库操作。
1. DatabaseError
作用:自定义异常类,用于在数据库操作发生错误时抛出详细的错误信息,包含错误消息和错误码(默认为500)。
2. DatabaseOperation
作用:抽象基类,定义了数据库操作的统一接口。所有具体的数据库操作类(如 SelectOperation, InsertOperation 等)都继承自此类,必须实现 excute 方法来执行数据库事务。
关键方法:
executeQuery:执行实际的 SQL 查询。
handleSuccess:操作成功时的回调函数。
handleFailure:操作失败时的回调函数。
3. SelectOperation
作用:继承自 DatabaseOperation,封装了 SELECT 查询操作。提供了 executeQuery 方法来执行 SQL 查询,并返回查询结果。
关键方法:
executeQuery:执行 SELECT 查询,并根据提供的表名、列名、查询条件等生成 SQL 语句。
excute:实现 DatabaseOperation 中的抽象方法,执行查询操作。
4. InsertOperation
作用:继承自 DatabaseOperation,封装了 INSERT 插入操作。通过 executeQuery 方法生成插入的 SQL 语句并执行。
关键方法:
executeQuery:构建并执行 INSERT SQL 语句,将数据插入指定的表。
excute:实现 DatabaseOperation 中的抽象方法,执行插入操作。
5. UpdateOperation
作用:继承自 DatabaseOperation,封装了 UPDATE 更新操作。通过 executeQuery 方法生成更新的 SQL 语句并执行。
关键方法:
executeQuery:构建并执行 UPDATE SQL 语句,用新值更新指定的行。
excute:实现 DatabaseOperation 中的抽象方法,执行更新操作。
6. DeleteOperation
作用:继承自 DatabaseOperation,封装了 DELETE 删除操作。通过 executeQuery 方法生成删除的 SQL 语句并执行。
关键方法:
executeQuery:构建并执行 DELETE SQL 语句,根据指定条件删除记录。
excute:实现 DatabaseOperation 中的抽象方法,执行删除操作。
7. DatabaseManager
作用:负责数据库连接池的管理和数据库操作的执行。它使用 adbapi.ConnectionPool 创建数据库连接池,执行操作并处理事务。
关键方法:
getConnection:返回数据库连接池的实例。
executeOperation:接受一个数据库操作对象,调用 runInteraction 方法来执行异步数据库事务,并处理操作成功或失败的回调。
8. GameQueryPlayerId
作用:继承自 SelectOperation,封装了查询玩家信息的操作。它指定查询条件为玩家的名称,并通过 excute 方法执行查询。
关键方法:
excute:执行 SelectOperation 中的 executeQuery 方法,查询玩家 ID。
set_query_name:设置查询的玩家名称。
代码
from twisted.enterprise import adbapi
from twisted.internet.defer import Deferred
import pymysql
import traceback
from twisted.internet import reactor
from functools import partial
from abc import ABC, abstractmethod
# 异常类定义
class DatabaseError(Exception):
def __init__(self, message, code=500):
self.message = message
self.code = code
# 抽象的数据库操作类
class IDatabaseOperation(ABC):
@abstractmethod
def executeQuery(self, txn, table: str, columns: list, values: dict = {}, condition: dict = None) -> any:
pass
def handleSuccess(self, result: any):
print("Operation succeeded with result:", result)
def handleFailure(self, error: Exception):
# 这里做一些额外的错误处理,比如记录日志或者返回友好的错误信息
print("Operation failed:", error)
@abstractmethod
def excute(self, txn):
#这里封装代码
pass
class ABC_SelectOperation(IDatabaseOperation):
def executeQuery(self, txn, table: str, columns: list = None, values: dict = None, condition: dict = None) -> any:
try:
column_str = ", ".join(columns) if columns else "*"
query = f"SELECT {column_str} FROM {table}"
if condition:
condition_str = " AND ".join([f"{key} = %s" for key in condition.keys()])
query += f" WHERE {condition_str}"
print(f"Executing query: {query}, with values: {tuple(condition.values()) if condition else ()}")
txn.execute(query, tuple(condition.values()) if condition else ()) # use condition values if any
result = txn.fetchall()
return result
except Exception as e:
print(f"Error executing query: {e}")
traceback.print_exc()
# 返回一个失败的结果以便继续后续操作
return {"error": str(e)}
@abstractmethod
def excute(self, txn):
pass
class ABC_InsertOperation(IDatabaseOperation):
def executeQuery(self, txn, table: str, columns: list = None, values: dict = None, condition: dict = None) -> any:
try:
column_str = ", ".join(columns)
placeholders = ", ".join(["%s"] * len(columns)) # Create placeholders based on column length
# Extract values from the dictionary for each column
value_tuple = tuple(values[col] for col in columns)
query = f"INSERT INTO {table} ({column_str}) VALUES ({placeholders})"
print(f"Executing insert query: {query}, with values: {value_tuple}")
txn.execute(query, value_tuple) # Use parameterized query
return txn.lastrowid # Return the inserted record ID
except Exception as e:
print(f"Error executing insert query: {e}")
traceback.print_exc()
# 返回一个失败的结果以便继续后续操作
return {"error": str(e)}
@abstractmethod
def excute(self, txn):
pass
class ABC_UpdateOperation(IDatabaseOperation):
def executeQuery(self, txn, table: str, columns: list = None, values: dict = None, condition: dict = None) -> any:
try:
if condition and not isinstance(condition, dict):
raise TypeError("Condition must be a dictionary")
set_str = ", ".join([f"{col} = %s" for col in columns])
query = f"UPDATE {table} SET {set_str}"
# Ensure that values is passed as a tuple for update
value_tuple = tuple(values[col] for col in columns)
if condition:
condition_str = " AND ".join([f"{key} = %s" for key in condition.keys()])
query += f" WHERE {condition_str}"
value_tuple += tuple(condition.values()) # Append condition values
print(f"Executing update query: {query}, with values: {value_tuple}")
txn.execute(query, value_tuple) # Use parameterized query
txn.connection.commit()
return txn.rowcount # Return the number of updated rows
except Exception as e:
print(f"Error executing update query: {e}")
traceback.print_exc()
# 返回一个失败的结果以便继续后续操作
return {"error": str(e)}
@abstractmethod
def excute(self, txn):
pass
class ABC_DeleteOperation(IDatabaseOperation):
def executeQuery(self, txn, table: str, columns: list = None, values: dict = None, condition: dict = None) -> any:
try:
if not condition:
raise ValueError("Condition for deletion cannot be empty.")
condition_str = " AND ".join([f"{key} = %s" for key in condition.keys()])
query = f"DELETE FROM {table} WHERE {condition_str}"
print(f"Executing delete query: {query}, with values: {tuple(condition.values())}")
txn.execute(query, tuple(condition.values())) # Use condition values for parameterized query
return txn.rowcount
except Exception as e:
print(f"Error executing delete query: {e}")
traceback.print_exc()
# 返回一个失败的结果以便继续后续操作
return {"error": str(e)}
@abstractmethod
def excute(self):
pass
# 数据库管理类,负责数据库连接池和事务
class DatabaseManager:
def __init__(self, db_config):
# 初始化数据库连接池
self.dbConnectionPool = adbapi.ConnectionPool("pymysql", **db_config)
def getConnection(self):
return self.dbConnectionPool
def executeOperation(self, operation: DatabaseOperation) -> Deferred:
try:
# 使用 partial 创建一个指定了参数的函数
deferred = self.dbConnectionPool.runInteraction(operation.excute)
deferred.addCallback(operation.handleSuccess)
deferred.addErrback(operation.handleFailure)
return deferred
except Exception as e:
error = DatabaseError(str(e), 500)
operation.handleFailure(error)
return None
# 示例数据库配置
db_config = {
'host': 'localhost',
'user': 'root',
'password': 'root',
'database': 'test',
}
class GameQueryPlayerId(ABC_SelectOperation):
def __init__(self):
self.select_columns = ["name", "id"]
self.elect_condition = {"name": "new_name"} # 在此给出查询条件
def excute(self, txn):
# 执行查询操作
return self.executeQuery(txn,"test", columns=self.select_columns, condition=self.elect_condition )
def set_query_name(self, name):
self.elect_condition["name"] = name
# 示例操作
def main():
# 创建DatabaseManager实例
db_manager = DatabaseManager(db_config)
ccGameQueryPlayerId = GameQueryPlayerId()
ccGameQueryPlayerId.set_query_name("new_name")
deferred = db_manager.executeOperation(ccGameQueryPlayerId)
reactor.run()
if __name__ == "__main__":
main()