利用 DynamoDB 和 S3 结合 gzip 压缩,最大化存储玩家数据

news2024/10/6 10:30:21

前言

一些传统游戏架构中,采用 MySQL 存储玩家存档数据,利用分库分表分散单库单表的存储和性能压力,从而达到支持更多玩家的目的。随着数据量增长,数据表中 varchar 类型已经无法满足游戏中单字段的存储需求,而 blob 字段的应用对于这种架构下改造成本是最低的,因此一些游戏开始在最初设计的时候,数据库表结构就采用了 Blob 字段作为其玩家的游戏任务、道具等数据的存储。

Blob 字段在 MySQL 5.6 / 5.7 中存在 bug(MySQL Bugs: #96466),这个 bug 有概率导致数据库集群崩溃,造成数据丢失。即使在 MySQL 8.0 中,由于引擎本身设计的限制,在单表 20GB 以上,高频的更新就会导致数据库出现性能受限。并且随着表增大,性能问题会越来越明显。

随着当游戏业务爆发时增长的时候,传统关系型数据库在分库分表的时候,需要进行应用改造,同时存在一定的停机维护时间。而且这些扩展完成后,在游戏的夕阳期进行收缩也需要进行应用改造,这无疑对业务开发和基础运维的部门造成了很多额外的工作量。

DynamoDB 在应用到这个场景上是非常适用的。在业务发展任意阶段,都可以实现 0 停机的扩展,自动伸缩的特性。而且这一切对于应用层是完全透明的。同时在日常运维中也可以贴合业务负载进行动态扩缩容,从而进一步降低成本。

亚马逊云科技开发者社区为开发者们提供全球的开发技术资源。这里有技术文档、开发案例、技术专栏、培训视频、活动与竞赛等。帮助中国开发者对接世界最前沿技术,观点,和项目,并将中国优秀开发者或技术推荐给全球云社区。如果你还没有关注/收藏,看到这里请一定不要匆匆划过,点这里让它成为你的技术宝库!

概述

本文主要讲述在游戏场景下,根据 DynamoDB 的限制(每个项目都必须小于 400KB),在限制下尽可能存储更多的数据和当存储量超出限制时,扩展存储的最大化利用空间。重点描述如何利用 DynamoDB+S3 保存玩家存档中的大数据量属性,避免数据存在 S3 上后,在数据写入 S3 时,发生读取到 S3 旧存档的情况。同时利用 gzip 压缩减少数据大小,减少 IO 的开销提升性能。

架构图

实战编码

目标

  1. 所有数据保存前都进行 gzip 压缩,读取后都用 gzip 解压。
  2. S3 存储和 DynamoDB 的 binary 字段存储可以自适应。如果用户数据压缩后如果大于指定的值则写入 S3,否则直接保存到当前数据库项目中的字段。
  3. DynamoDB 项目读取的时候,解析解压后的字段,如果字符串以 s3:// 开头,则继续从 S3 中获取数据
  4. 设置 S3 读锁字段,判断当前状态是否正在写入 S3,以阻塞读进程。在每个项目需要写入 S3 前都会设置 read_lock为Ture,S3 写成功后则设置为 False。读取记录后,read_lock 是否为 True,如果是判断被阻塞,进程会等待一段时间后进行重试,直到重试次数超出指定的值。重试超时后,读进程会认为写进程可能由于某种原因导致写永远无法成功,于是会将 read_lock 设置成 False。

第一步:初始化环境参数

from time import sleep
import boto3
import gzip
import random
import json
import hashlib
import logging

# 写入 S3 的门槛,超过这个值数据会写入 S3,否则保存在数据库内,默认值 350KB
UPLOAD_TO_S3_THRESHOLD_BYTES = 358400
# 用户数据库保存的目标S3存储桶
USER_DATA_BUCKET = 'linyesh-user-data'
# 遇到 S3 有读锁,重新请求最大次数,超出次数限制锁会被自动清除
S3_READ_LOCK_RETRY_TIMES = 10
# 遇到 S3 有读锁,读请求重试间隔时间
S3_READ_RETRY_INTERVAL = 0.2

dynamodb = boto3.resource('dynamodb')
s3 = boto3.client('s3')
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

参数说明

  • UPLOAD_TO_S3_THRESHOLD_BYTES:为字段最大的数据存储长度限制。单位为:字节数。由于 DynamoDB 一个项目(Item)数据大小限制为 400KB。我们除了数据存档中最大字段还必须预留一部分空间给其他字段,避免整个 Item 超出 400KB。
  • USER_DATA_BUCKET:S3 用于存储超出 400KB 后的玩家大字段数据。需要提前建好,具体步骤参考:创建存储桶
  • S3_READ_LOCK_RETRY_TIMES:限制当玩家在 S3 上的存档处在写入状态时候,读请求重试的次数。在项目处于读锁状态的时候,读进程会等待一段时间后重试。
  • S3_READ_RETRY_INTERVAL:读锁状态下,重试读的间隔时间,单位:秒。

注意:S3_READ_LOCK_RETRY_TIMES乘以S3_READ_RETRY_INTERVAL 的时间理论上必须小于S3存档上传时间的最大值,因此实际使用本文中的代码应该根据存档可能的大小来调整这 2 个参数。否则可能存档会有大概率会发生脏读的情况。

第二步:创建 DynamoDB 表

def create_tables():
    """
    创建表
    :return:
    """
    response = dynamodb.create_table(
        TableName='players',
        KeySchema=[
            {
                'AttributeName': 'username',
                'KeyType': 'HASH'
            }
        ],
        AttributeDefinitions=[
            {
                'AttributeName': 'username',
                'AttributeType': 'S'
            }
        ],
        ProvisionedThroughput={
            'ReadCapacityUnits': 5,
            'WriteCapacityUnits': 5
        }
    )

    # Wait until the table exists.
    response.wait_until_exists()

    # Print out some data about the table.
    logger.debug(response.item_count)

第三步:编写辅助逻辑

指数级回退函数

def run_with_backoff(function, retries=5, **function_parameters):
    base_backoff = 0.1  # base 100ms backoff
    max_backoff = 10  # sleep for maximum 10 seconds
    tries = 0
    while True:
        try:
            return function(function_parameters)
        except (ConnectionError, TimeoutError):
            if tries >= retries:
                raise
            backoff = min(max_backoff, base_backoff * (pow(2, tries) + random.random()))
            logger.debug(f"sleeping for {backoff:.2f}s")
            sleep(backoff)
            tries += 1

S3 路径判断函数

def is_s3_path(content):
    return content.startswith('s3://')

S3 文件获取

def get_s3_object(key):
    response = s3.get_object(Bucket=USER_DATA_BUCKET, Key=s3_key_generator(key))
    return response['Body']

检查大小超限

def check_threshold(current_size):
     return current_size > UPLOAD_TO_S3_THRESHOLD_BYTES

S3 Key 生成函数

这个函数可以将玩家的存档随机分配到 S3 桶下不同的 Prefix 中,这有利于提高 S3 中 IO 的性能。

def s3_key_generator(key):  
    s3_prefix = hashlib.md5((key).encode('utf-8')).hexdigest()[:8]  
    return s3_prefix + '/' + key 

文件上传到 S3

def upload_content_to_s3(obj_param):  
    s3_key = s3_key_generator(obj_param['key'])  
    try:  
        response = s3.put_object(  
            Body=obj_param['content_bytes'],  
            Bucket=USER_DATA_BUCKET,  
            Key=s3_key)  
        return "s3://%s/%s" % (USER_DATA_BUCKET, s3_key)  
    except Exception as e:  
        logger.error(e)  
        raise e  

第四步:编写主体逻辑

写入单个项目到 DynamoDB 数据库

def put_item(load_data):  
    gzip_data = gzip.compress(load_data)  # 压缩数据  
    logger.debug('压缩后大小%.2fKB,原始大小 %.2fKB,压缩率 %.2f%%' % (  
        len(gzip_data) / 1024.0,  
        len(load_data) / 1024.0,  
        100.0 * len(gzip_data) / len(load_data)))  
  
    table = dynamodb.Table('players')  
    player_username = 'player' + str(random.randint(1, 1000))  
    if check_threshold(len(gzip_data)):  
        try:  
            # 读锁保护  
            table.update_item(  
                Key={  
                    'username': player_username,  
                },  
                UpdateExpression="set read_lock = :read_lock",  
                ExpressionAttributeValues={  
                    ':read_lock': True,  
                },  
            )  
  
            # 写入数据到 S3  
            s3_path = run_with_backoff(upload_content_to_s3, key=player_username, content_bytes=gzip_data)  
            # 解除读锁保护,同时存储数据在 S3 上到路径  
            response = table.put_item(  
                Item={  
                    'username': player_username,  
                    'read_lock': False,  
                    'inventory': gzip.compress(s3_path.encode(encoding='utf-8', errors='strict')),  
                }  
            )  
            logger.debug('成功上传大纪录到S3,路径:%s' % s3_path)  
        except Exception as e:  
            logger.debug('存档失败')  
            logger.error(e)  
    else:  
        response = table.put_item(  
            Item={  
                'username': player_username,  
                'inventory': gzip_data,  
            }  
        )  
        logger.debug('成功上传纪录, username=%s' % player_username) 

读取数据库中一条玩家记录

def get_player_profile(uid):  
    """ 
    读取记录 
    :param uid: 玩家 id 
    :return: 
    """  
    table = dynamodb.Table('players')  
    player_name = 'player' + str(uid)  
  
    retry_count = 0  
    while True:  
        response = table.get_item(  
            Key={  
                'username': player_name,  
            }  
        )  
  
        if 'Item' not in response:  
            logger.error('Not Found')  
            return {}  
  
        item = response['Item']  
        # 检查读锁信息, 如果存在锁根据参数设置,间隔一段时间重新读取记录  
        if 'read_lock' in item and item['read_lock']:  
            retry_count += 1  
            logger.info('当前第%d次重试' % retry_count)  
            # 如果超时无法读取记录,则消除读锁,并重新读取记录  
            if retry_count < S3_READ_LOCK_RETRY_TIMES:  
                sleep(S3_READ_RETRY_INTERVAL)  
                continue  
            else:  
                table.update_item(  
                    Key={  
                        'username': player_name,  
                    },  
                    UpdateExpression="set read_lock = :read_lock",  
                    ExpressionAttributeValues={  
                        ':read_lock': False,  
                    },  
                )  
  
        inventory_bin = gzip.decompress(item['inventory'].value)  # 解压缩数据  
        inventory_str = inventory_bin.decode("utf-8")  
        if is_s3_path(inventory_str):  
            player_data = gzip.decompress(get_s3_object(player_name).read())  
            inventory_json = json.loads(player_data)  
        else:  
            inventory_json = json.loads(inventory_str)  
  
        user_profile = {**response['Item'], **{'inventory': inventory_json}}  
        return user_profile  

最后,编写测试逻辑

准备几个不同大小的 json 文件,观察写入数据库中的变化。

if __name__ == '__main__':  
    path_example = 'small.json'  
    # path_example = '500kb.json'  
    # path_example = '2MB.json'  
    with open(path_example, 'r') as load_f:  
        load_str = json.dumps(json.load(load_f))  
        test_data = load_str.encode(encoding='utf-8', errors='strict')  
    put_item(test_data)  
  
    # player_profile = get_player_profile(238)  
    # logger.info(player_profile)  

如果需要测试读锁,可以将数据库中单个项目的 read_lock 手动设置成 True,然后观察读取逻辑在这个过程中的变化。

总结

在本次测试中发现,json 格式的数据使用 gzip 后,压缩率约为 25% 左右,理论上我们可以把单个项目(item) 中可以存储最大约为 1.6MB 的数据项。即便有少量压缩后超过 400KB 的数据,也可以存储到 S3 上,仅在 DynamoDB 中存储元数据和大字段数据在 S3 上的路径。

gzip 会带来一些额外的计算和 IO 开销,但是这些开销主要会落在游戏服务器上,对于数据库来说反而减少了 IO 的开销。

在大多数场景下,玩家数据即便不压缩也很少会超过 400KB。这种情况下,建议可以尝试对比压缩启用和不启用两种场景的性能数据。以决定哪种方式更适合自己的游戏。

限制

对于存在单用户有高并发存档需求的游戏而言,以上设计中并未包含在数据存储在 S3 上后,出现并发写的场景考虑。如果有此场景的需求,需要一些应用逻辑或者架构调整。

本篇作者

林业

Amazon 解决方案架构师,负责基于 Amazon 的云计算方案的咨询与架构设计。拥有超过 14 年研发经验,曾打造千万级用户 APP,多项 Github 开源项目贡献者。在游戏、IOT、智慧城市、汽车、电商等多个领域都拥有丰富的实践经验。

文章来源:https://dev.amazoncloud.cn/column/article/630a281576658473a321ffeb?sc_medium=regulartraffic&amp;sc_campaign=crossplatform&amp;sc_channel=CSDN 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/555412.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

数据结构 -最短路径dijkstra(迪杰斯特拉)算法讲解及代码实现

迪杰斯特拉算法是一种广义的贪心算法&#xff0c;求出局部最优解&#xff0c;再去求全局最优解 图文讲解&#xff1a; 举例图&#xff1a;&#xff08;起始点为1&#xff09; 辅助数组&#xff1a; s&#xff1a;记录了目标顶点到其他顶点的最短路径是否求得&#xff08;求得…

代码调试技巧

目录 1.为什么要进行调试&#xff1f; 2.调试的基本步骤 3.关于Debug版本和Release版本 4.调试技巧 5.调试总结 我还是喜欢真实的世界&#xff0c;因为在那里&#xff0c;我可以通过自己的努力来改变残酷的现实 本专栏适用于有一定C语言基础并且还要继续学习的人 往期…

CryoEM - 冷冻电镜 CryoSPARC 软件的安装与环境配置

欢迎关注我的CSDN&#xff1a;https://spike.blog.csdn.net/ 本文地址&#xff1a;https://blog.csdn.net/caroline_wendy/article/details/130809095 CryoSPARC 软件是一种用于冷冻电镜数据处理的创新工具&#xff0c;可以快速、准确地重建生物分子的三维结构。CryoSPARC 软件…

【使用ChatGPT制作PPT】

内容目录 一、利用ChatGPT生成PPT内容1. 打开ChatGPT&#xff1a;2. 输入需求&#xff1a;3. 复制&#xff1a; 二、制作生成PPT1. 打开PPT制作网站&#xff1a;2. 左侧网页版-导入创建-粘贴Markdown内容-导入创建3. 自行更改副标题、演讲者、选择模板、演示及下载 一、利用Cha…

Linux网络——shell编程之iptables防火墙

Linux网络——shell编程之iptables防火墙 一、概述1.iptables2.netfilter 和 iptables的关系 二、iptables中的四表五链1.四表五链的关系2.四表3.五链 三、匹配顺序1.数据包到达防火墙的匹配流程2.规则链之间的匹配顺序3.规则链内的匹配顺序 四、iptables 防火墙的配置方法1.ip…

python tesseract-ocr + jTessBoxEditorFX 训练自定义字库

在使用tesseract-ocr进行字符识别时&#xff0c;我们使用了官方提供的字库&#xff0c;例如英文字库、中文字库&#xff0c;但这些字库并不一定能满足我们所有的需求。所以有些时候&#xff0c;我们就需要训练属于自己的自定义字库。废话少说&#xff0c;直接开干。 第一步&am…

联想首次展示全栈算力方案服务,品牌换新亮相

1、联想算力&#xff0c;第一次真正被所有人感知。 2、基于软硬服一体化的优势&#xff0c;联想打造了丰富多样的四维算力服务&#xff0c;即融合化、场景化、订阅化、绿色化&#xff0c;可以满足不同企业、不同行业的定制化需求。 5月20日&#xff0c;主题为“联想方案服务&am…

2023中兴软件类笔试

1.下列Python代码&#xff1a;将近似输出什么&#xff1f; import numpy as np print np.sqrt(6*np.sum(1/np.arange(1,1000000, dtypenp.float)**2))这段代码是用来计算圆周率的巴塞尔问题&#xff08;Basel problem&#xff09;的近似值&#xff0c;输出结果将近似为3.14159…

使用SMTP协议发送邮件

剧情介绍 今天心血来潮&#xff0c;学了一下Python3&#xff0c;里面有个章节是发送邮件&#xff0c;用示例里面的代码&#xff0c;运行后报错&#xff0c;然后记录一下问题是如何解决的&#xff0c;大家可以看一下&#xff0c;可以有效避坑。 SMTP协议介绍 SMTP&#xff08…

Mysql数据库备份 一天一次 保存最新五天 每天凌晨三点备份

Mysql数据库备份 一天一次 保存最新五天 每天凌晨一点三十备份 步骤一 先查看 sudo systemctl status crond 是否存在 不存在执行下面代码 sudo yum install cronie sudo systemctl start crond sudo systemctl enable crond sudo systemctl status crond 步骤二 Cd /home …

从零开始 Spring Boot 33:Null-safety

从零开始 Spring Boot 33&#xff1a;Null-safety 图源&#xff1a;简书 (jianshu.com) Null-safety&#xff08;null安全&#xff09;实际上是Java这个“古老”语言的历史包袱&#xff0c;很多新的语言&#xff08;比如go或kotlin&#xff09;在诞生起就在语言层面提供对null…

软件测试需要学习什么?好学吗?需要学多久?到底是报班好还是自学好?

目录 前言&#xff1a; 【文章的末尾给大家留下了大量的福利哦。】 一&#xff1a;软件测试好学吗&#xff1f;需要学习多久&#xff1f; 二&#xff1a;那么选择软件测试行业有什么优势呢&#xff1f; 三&#xff1a;再来说说大家最关心的——软件测试人员的薪资怎么样? …

Spring : XML配置 JavaBean

文章目录 前言一、xml 加载 Bean 对象总结XML加载Bean对象 前言 跟着大佬走&#xff01;&#xff01;&#xff01;&#xff01; https://github.com/DerekYRC/mini-spring 提示&#xff1a;以下是本篇文章正文内容&#xff0c;下面案例可供参考 一、xml 加载 Bean 对象 大家先…

【C语言】数组名作函数参数

数组名作函数参数 引例思考例2通用性指针形参和数组形参几点说明 引例 在主函数中输入10个整数&#xff0c;并存入一个一维数组中&#xff1b;然后在被调函数中&#xff0c;将0号元素的值改为原值的10倍&#xff1b;最后在主函数中输出结果。 思路&#xff1a; 若想在被调函数…

10:00进去,10:05就出来了,这问的也太变态了···

从外包出来&#xff0c;没想到死在另一家厂子了。 自从加入这家公司&#xff0c;每天都在加班&#xff0c;钱倒是给的不少&#xff0c;所以也就忍了。没想到5月一纸通知&#xff0c;所有人不许加班&#xff0c;薪资直降30%&#xff0c;顿时有吃不起饭的赶脚。 好在有个兄弟内推…

SSM框架-SpringMVC

1. SpringMVC 1.1 Spring与Web环境集成 ApplicationContext应用上下文获取方式 应用上下文对象是通过new ClasspathXmlApplicationContext(spring配置文件) 方式获取的&#xff0c;但是每次从容器中获得Bean时都要编写new ClasspathXmlApplicationContext(spring配置文件) &…

ActiveMq消息队列

ActiveMq是一种开源的java程序&#xff0c;支持Java消息服务(JMS) 1.1 版本 一、持久化机制 1、KahaDB&#xff1a;5.4及之后版本&#xff0c;默认使用日志文件 activemq.xml默认使用KahaDB持久化存储&#xff0c;默认配置安装路径data目录下 <persistenceAdapter> …

Django框架之模板其他补充

本篇文章是对django框架模板内容的一些补充。包含注释、html转义和csrf内容。 目录 注释 单行注释 多行注释 HTML转义 Escape Safe Autoescape CSRF 防止csrf方式 表单中使用 ajax请求添加 注释 单行注释 语法&#xff1a;{# 注释内容 #} 示例&#xff1a; {# 注…

09 FPGA—利用状态机实现可乐售卖机(附代码)

1. 理论 FPGA 是并行执行的&#xff0c;如果我们想要处理具有前后顺序的事件&#xff0c;就需要引入状态机。举个例子&#xff0c;将人看成 FPGA ,我们可以在散步的时候听歌和聊天这是并行执行的&#xff0c;但一天的行程安排却是以时间段前后执行的。 状态机简写为 FSM&#…

java前后端分离有详细内容吗?

微服务架构java前后端分离都有哪些具体内容&#xff1f;目前&#xff0c;有不少客户朋友经常询问我们类似的问题。其实&#xff0c;在新的经济发展形势下&#xff0c;提质增效的低代码开发平台微服务架构早已成为不少新老客户的选择&#xff0c;它们不仅能提高办公协作效率&…