sylar高性能服务器-日志(P57-P60)内容记录

news2024/11/26 8:26:59

文章目录

    • P57-P60:序列化模块
      • Varint(编码)
      • Zigzag(压缩)
      • class ByteArray
        • Node(链表结构)
        • 成员变量
        • 构造函数
        • 写入
        • 读取
        • setPosition
        • addCapacity
      • 测试

P57-P60:序列化模块

​ 序列化模块通常用于将数据转换为可以在网络上传输的格式,或者将接收到的网络数据反序列化为程序内部可用的格式。这个模块可以帮助简化网络通信的数据处理过程,提高服务器的性能和可维护性。

sylar设计的的序列化模块底层使用链表的形式存储数据,这样可以节省内存,管理内存碎片。该模块支持所有基本类型数据的写入和读取,可以选择固定字节长度或可变字节长度写入,使用可变字节长度时,使用Zigzag将有符号整型压缩后再进行Varint编码,这样能够节省大量内存空间。再写入数据时,将数据写入链表的最后一个节点中,若写不下时,创建新的节点继续写入数据。

Varint(编码)

Varint是一种使用一个或多个字节序列化整数的方法,会把整数编码为变长字节。对于32位整型数据经过Varint编码后需要10个字节。在实际场景中小数字的使用率远远多于大数字,因此通过Varint编码对于大部分场景都可以起到很好的压缩效果。

Varint 的编码规则如下:

  1. 对于一个整数,用 7 位来存储整数的数值,最高位用来表示是否还有后续字节,1 表示还有,0 表示结束。
  2. 如果整数的数值小于 128,那么只需要一个字节就可以存储,直接将整数的数值存储在最低的 7 位中,最高位为 0。
  3. 如果整数的数值大于等于 128,那么需要多个字节来存储,每个字节的最高位都为 1,其余 7 位用来存储整数的数值。除了最后一个字节,其他字节的最高位为 1,表示还有后续字节。

举个例子

  • 整数300 --二进制–> 100101100
  • 从最低位(右边开始)按7位分组 10 0101100
  • 第一个字节:10101100
  • 第二个字节:0000010
  • 结果:0xAC 0x02

实现代码

void EncodeVarint(uint32_t value) {
    // 32位的数据类型最多需要5个字节,如果是64位则需要10个字节
    uint8_t temp[5];
    uint8_t i = 0;
    while(value > 127) {
        // 取低7位保持不变 将最高位设置为1,表示后续还有字节
        temp[i++] = (value & 0x7f) | 0x80;
        // 右移7位
        value >>= 7;
    }
    // 存储最后一个小于127的剩余值
    temp[i] = value;
    write(tmp, i);
}

(value & 0x7f):取低7位保持不变

| 0x80:将最高位设置为1,表示后续还有字节

 // value & 0x7f
 10101101 (173)
& 01111111 (0x7f)
-----------
  00101101
 // | 0x80
  00101101
| 10000000 (0x80)
-----------
  10101101

Zigzag(压缩)

Zigzag算法将有符号负整数转为正数,这样能够节省字节,因为负数的二进制位几乎全为1。

Zigzag编码将正数映射到偶数,负数映射到奇数。这种映射的目的是为了利用无符号整数的变长编码特性,使得负数和正数都能够通过相同的编码方式进行存储和传输。

编码

static uint64_t EncodeZigzag64(const uint64_t& v) {
    if(v < 0) {
        return ((uint64_t)(-v))  * 2 - 1;
    } else {
        return v * 2;
    }
}

解码

static int64_t DecodeZigzag64(const uint64_t& v) {
    return (v >> 1) ^ -(v & 1);
}

class ByteArray

Node(链表结构)

所有的数据都存在这个链表结构中,在堆区开辟内存存储在ptr中;size为一个节点大小,一般设置为一个页面大小4KB;next指向下一个节点。

struct Node {
    Node(size_t s);
    Node();
    ~Node();

    char* ptr;
    Node* next;
    size_t size;
};
成员变量
// 内存块的大小
size_t m_baseSize;
// 当前操作的位置
size_t m_position;
// 总容量
size_t m_capacity;
// 当前数据的大小
size_t m_size;
// 字节序,默认大端
int8_t m_endian;
// 当一个内存块指针
Node* m_root;
// 当前操作的内存块指针
Node* m_cur;
构造函数
ByteArray::ByteArray(size_t base_size)
    : m_baseSize(base_size)
    , m_position(0)
    , m_capacity(base_size)
    , m_size(0)
    , m_endian(SYLAR_BIG_ENDIAN)
    , m_root(new Node(base_size))
    , m_cur(m_root) {

}
写入

在一个ByteArray对象中,数据都是存储在独立的节点中,当一个节点存满时让指针指向下一个节点继续存储需要写入的数据。所以写入数据我们只需要知道

  • 写入数据大小
  • 当前节点中剩余可写入的容量
void ByteArray::write(const void* buf, size_t size) {
    // 如果写入数据为0,直接返回
    if(size == 0) {
        return;
    }

    // 根据传入的size设置写入这些数据需要的容量
    addCapacity(size);
    // 记录当前节点已经写入的大小
    size_t npos = m_position % m_baseSize;
    // 计算当前节点剩余的可写入空间大小。
    size_t ncap = m_cur->size - npos;
    // 记录当前要写入数据的偏移量
    size_t bpos = 0;

    // 只要还有数据待写入
    while(size > 0) {
        // 剩余容量足够
        if(ncap >= size) {
            // 将要写入的数据拷贝到当前节点中的缓冲区中。
            memcpy(m_cur->ptr + npos, (const char*)buf + bpos, size);
            // 如果当前节点已经写满了(当前节点的偏移量加上写入的数据大小等于当前节点的容量大小)
            if(m_cur->size == (npos + size)) {
                // 移向下一个节点
                m_cur = m_cur->next;
            }
            // 更新当前位置
            m_position += size;
            // 更新缓冲区的偏移量
            bpos += size;
            // 将剩余要写入的数据大小置为0,表示所有数据已经写入完毕
            size = 0;
        // 如果当前节点剩余的可写入空间小于要写入的数据大小
        } else { 
            // 将当前节点剩余的可写入空间大小的数据拷贝到当前节点的缓冲区中
            memcpy(m_cur->ptr + npos, (const char*)buf + bpos, ncap);
            // 更新当前位置
            m_position += ncap;
            // 更新缓冲区的偏移量
            bpos += ncap;
            // 更新剩余要写入的数据大小
            size -= ncap;
            // 当前节点已经满了,移动到下一个节点
            m_cur = m_cur->next;
            // 更新当前节点剩余的可写入空间大小
            ncap = m_cur->size;
            // 新的起点,起始为0
            npos = 0;
        }
    }
    // 更新 ByteArray 对象的大小 m_size,如果当前位置大于原来的大小,则更新为当前位置
    if(m_position > m_size) {
        m_size = m_position;
    }
}

举个例子

假设我们要往一个ByteArray对象中写入下列数据

void* buf = "Hello,World!";
size_t size = 12;
  1. 首先判断写入数据不为空
  2. 调用addCapacity函数确保对象有足够的容量存储要写入的数据
  3. 假设当前节点还未写入过数据,那么几个变量如下
    • npos = 0
    • ncap = 8 (假设一个节点8字节,意思我这个节点还可以写入8字节的数据)
    • bpos = 0,记录当前写入数据的偏移量,我们要写入的数据是存放在缓冲区中,它的作用就是告诉我们从缓冲区哪里开始把数据拿出来,一开始肯定从头开始读,所以初始化为0
  4. ncap = 0 < size = 12,所以进入else分支,因为一个节点只有8个字节容量,我们要写入的数据有12字节
  5. 从buf中获得8个字节的数据,更新当前节点中最后写入的位置m_position + 8,缓冲区的偏移量bpos + 8,剩余要写入的数据为12 - 8 = 4,(已写入[Hello,Wo])
  6. 当前节点已经满了,移动下一个节点,更新ncap剩余写入空间为8,新的写入起点npos = 0
  7. 然后才是进入if分支,把剩余的4个字节写入([rld!]),跳出循环
  8. 检查当前位置是否大于 m_size,显然是,因为当前位置为12,而原来的 m_size 为0,所以更新 m_size 为12。这个m_size就是整个Bytearray对象含有数据的总大小。
读取

上面写入看懂了,读取也是差不多的。

两个read版本的区别在于读取完毕后是否改变m_position的值,比如第一个read,如果当前位置在第3个节点,读完后可能就在第5个节点了,而第二个read就不会改变。

会改变m_position

void ByteArray::read(void* buf, size_t size) {
    // size是我们想要读取的数据长度,getReadSize()返回m_size - m_position,也就是在当期操作位置后面还有多少数据能读
    // 如果要读的数据长度已经超出了ByteArray对象剩余可读的数据长度,抛出错误
    if(size > getReadSize()) {
        throw std::out_of_range("not enough len");
    }

    // 获取当前节点的位置,m_position是一直累加的,所以要对每一个节点的大小取余才能获取在当前节点中的位置
    size_t npos = m_position % m_baseSize;
    // 剩余可读容量
    size_t ncap = m_cur->size - npos;
    // buf偏移量
    size_t bpos = 0;
    while(size > 0) {
        // 该节点剩余的位置比要读的数据多
        if(ncap >= size) {
            // 将剩余的数据都读到buf中
            memcpy((char*)buf + bpos, m_cur->ptr + npos, size);
            // 若正好读完这个节点,则跳到下一个节点,不做这一步也可以,下一次再读可以在else分支里面跳
            if(m_cur->size == (npos + size)) {
                m_cur = m_cur->next;
            }
            m_position += size;
            bpos += size;
            size = 0;
        // 该节点不够读的
        } else {
            // 有多少就读多少
            memcpy((char*)buf + bpos, m_cur->ptr + npos, ncap);
            m_position += ncap;
            bpos += ncap;
            size -= ncap;
            m_cur = m_cur->next;
            ncap = m_cur->size;
            npos = 0;
        }
    }
}

不改变m_position

void ByteArray::read(void* buf, size_t size, size_t position) const{
    if(size > getReadSize()) {
        throw std::out_of_range("not enough len");
    }

    size_t npos = position % m_baseSize;
    size_t ncap = m_cur->size - npos;
    size_t bpos = 0;
    Node* cur = m_cur;
    while(size > 0) {
        if(ncap >= size) {
            memcpy((char*)buf + bpos, cur->ptr + npos, size);
            if(cur->size == (npos + size)) {
                cur = cur->next;
            }
            position += size;
            bpos += size;
            size = 0;
        } else {
            memcpy((char*)buf + bpos, cur->ptr + npos, ncap);
            position += ncap;
            bpos += ncap;
            size -= ncap;
            cur = cur->next;
            ncap = cur->size;
            npos = 0;
        }
    }
}

setPosition
void ByteArray::setPosition(size_t v) {
    // 如果超出总大小,抛出异常
    if(v > m_size) {
        throw std::out_of_range("set_position out of range");
    }
    // 更新当前操作位置
    m_position = v;

    // 链表遍历到当前位置
    m_cur = m_root;
    while(v >= m_cur->size) {
        v -= m_cur->size;
        m_cur = m_cur->next;
    }
    // 如果在循环中找到了指定位置 v 的节点,并且指定位置 v 恰好等于当前节点的大小,
    // 则表示指定位置 v 在当前节点的末尾,需要将当前节点指针移动到下一个节点。
    if(v == m_cur->size) {
        m_cur = m_cur->next;
    }
}
addCapacity
void ByteArray::addCapacity(size_t size) {
    if(size == 0) {
        return;
    }
    
    // 剩余容量
    size_t old_cap = getCapacity();
    // 如果剩余容量足够则不需要增加
    if(old_cap >= size) {
        return;
    }

    // 需要扩充的数据大小
    size = size - old_cap;
    // 根据数据大小求得需要增加的节点大小
    size_t count = (size / m_baseSize) + (((size % m_baseSize) > old_cap) ? 1 : 0);
    // 遍历链表到末尾
    Node* tmp = m_root;
    while(tmp->next) {
        tmp = tmp->next;
    }
    // 第一个扩展的节点
    Node* first = NULL;
    for(size_t i = 0; i < count; ++ i) {
        tmp->next = new Node(m_baseSize);
        if(first == NULL) {
            first = tmp->next;
        }
        // tmp一直指向末尾
        tmp = tmp->next;
        // 扩大容量
        m_capacity += m_baseSize;
    }
    // 若剩余容量为0,则跳到下一个节点,才扩展的,什么数据都没有
    if(old_cap == 0) {
        m_cur = first;
    }
}

测试

代码

#include "../sylar/bytearray.h"
#include "../sylar/sylar.h"

static sylar::Logger::ptr g_logger = SYLAR_LOG_ROOT();

void test() {
#define XX(type, len, write_fun, read_fun, base_len) {\
    std::vector<type> vec; \
    for(int i = 0; i < len; ++i) { \
        vec.push_back(rand()); \
    } \
    sylar::ByteArray::ptr ba(new sylar::ByteArray(base_len)); \
    for(auto& i : vec) { \
        ba->write_fun(i); \
    } \
    ba->setPosition(0); \
    for(size_t i = 0; i < vec.size(); ++i) { \
        type v = ba->read_fun(); \
        SYLAR_ASSERT(v == vec[i]); \
    } \
    SYLAR_ASSERT(ba->getReadSize() == 0); \
    SYLAR_LOG_INFO(g_logger) << #write_fun "/" #read_fun \
                    " (" #type " ) len=" << len \
                    << " base_len=" << base_len \
                    << " size = " << ba->getSize(); \
}
    XX(int8_t, 10, writeFint8, readFint8, 1);
    XX(uint8_t, 10, writeFuint8, readFuint8, 1);
    XX(int16_t,  10, writeFint16,  readFint16, 1);
    XX(uint16_t, 10, writeFuint16, readFuint16, 1);
    XX(int32_t,  10, writeFint32,  readFint32, 1);
    XX(uint32_t, 10, writeFuint32, readFuint32, 1);
    XX(int64_t,  10, writeFint64,  readFint64, 1);
    XX(uint64_t, 10, writeFuint64, readFuint64, 1);

    XX(int32_t,  10, writeInt32,  readInt32, 1);
    XX(uint32_t, 10, writeUint32, readUint32, 1);
    XX(int64_t,  10, writeInt64,  readInt64, 1);
    XX(uint64_t, 10, writeUint64, readUint64, 1);
#undef XX

#define XX(type, len, write_fun, read_fun, base_len) {\
    std::vector<type> vec; \
    for(int i = 0; i < len; ++i) { \
        vec.push_back(rand()); \
    } \
    sylar::ByteArray::ptr ba(new sylar::ByteArray(base_len)); \
    for(auto& i : vec) { \
        ba->write_fun(i); \
    } \
    ba->setPosition(0); \
    for(size_t i = 0; i < vec.size(); ++i) { \
        type v = ba->read_fun(); \
        SYLAR_ASSERT(v == vec[i]); \
    } \
    SYLAR_ASSERT(ba->getReadSize() == 0); \
    SYLAR_LOG_INFO(g_logger) << #write_fun "/" #read_fun \
                    " (" #type " ) len=" << len \
                    << " base_len=" << base_len \
                    << " size=" << ba->getSize(); \
    ba->setPosition(0); \
    SYLAR_ASSERT(ba->writeToFile("/tmp/" #type "_" #len "-" #read_fun ".dat")); \
    sylar::ByteArray::ptr ba2(new sylar::ByteArray(base_len * 2)); \
    SYLAR_ASSERT(ba2->readFromFile("/tmp/" #type "_" #len "-" #read_fun ".dat")); \
    ba2->setPosition(0); \
    SYLAR_ASSERT(ba->toString() == ba2->toString()); \
    SYLAR_ASSERT(ba->getPosition() == 0); \
    SYLAR_ASSERT(ba2->getPosition() == 0); \
}
    XX(int8_t,  10, writeFint8, readFint8, 1);
    XX(uint8_t, 10, writeFuint8, readFuint8, 1);
    XX(int16_t,  10, writeFint16,  readFint16, 1);
    XX(uint16_t, 10, writeFuint16, readFuint16, 1);
    XX(int32_t,  10, writeFint32,  readFint32, 1);
    XX(uint32_t, 10, writeFuint32, readFuint32, 1);
    XX(int64_t,  10, writeFint64,  readFint64, 1);
    XX(uint64_t, 10, writeFuint64, readFuint64, 1);

    XX(int32_t,  10, writeInt32,  readInt32, 1);
    XX(uint32_t, 10, writeUint32, readUint32, 1);
    XX(int64_t,  10, writeInt64,  readInt64, 1);
    XX(uint64_t, 10, writeUint64, readUint64, 1);

#undef XX
}

int main(int argc, char** argv) {
    test();
    return 0;
}

结果

image-20240306195337649

image-20240306195609579

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1494098.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Java EE】文件内容的读写⸺数据流

目录 &#x1f334;数据流的概念&#x1f338;数据流分类 &#x1f333;字节流的读写&#x1f338;InputStream&#xff08;从文件中读取字节内容)&#x1f33b;示例1&#x1f33b;示例2&#x1f33b;利用 Scanner 进行字符读取 &#x1f338;OutputStream(向文件中写内容&…

当CV遇上transformer(一)ViT模型

当CV遇上transformer(一)ViT模型 我们知道计算机视觉(Computer Vision)&#xff0c;主要包括图像分类、目标检测、图像分割等子任务。 自AlexNet被提出以来&#xff0c;CNN成为了计算机视觉领域的主流架构。CNN网络结构主要由卷积层、池化层以及全连接层3部分组成&#xff0c;其…

Uni-ControlNet: All-in-One Control toText-to-Image Diffusion Models——【论文笔记】

本文发表于NeurIPS 2023 项目官网&#xff1a;Uni-ControlNet: All-in-One Control to Text-to-Image Diffusion Models 一、Introduction 近两年来&#xff0c;扩散模型在图像合成任务中表现优异&#xff0c;尤其是文本到图像&#xff08;T2I&#xff09;扩散模型已成为合成高…

腾达路由器检测环境功能破解MISP基础

在虚拟机上用qemu运行腾达路由器的网站固件会遇到无法识别网络的问题&#xff0c;这篇主要是破解这个功能&#xff0c;使腾达路由器成功在虚拟机上运行&#xff0c;方便漏洞复现 本次用到的腾达路由器版本&#xff1a; https://www.tenda.com.cn/download/detail-3683.html下…

Python 开发图形界面程序

用 Python 语言开发图形界面的程序&#xff0c;有2种选择&#xff1a; Tkinter 基于Tk的Python库&#xff0c;这是Python官方采用的标准库&#xff0c;优点是作为Python标准库、稳定、发布程序较小&#xff0c;缺点是控件相对较少。 PySide2/PySide6 基于Qt 的Python库&#x…

玩家至上:竞技游戏设计如何满足现代玩家的需求?

文章目录 一、现代玩家需求分析二、以玩家体验为核心的游戏设计三、个性化与定制化服务四、强化社交互动与社区建设五、持续更新与优化《游戏力&#xff1a;竞技游戏设计实战教程》亮点编辑推荐内容简介目录获取方式 随着科技的飞速发展和游戏产业的不断壮大&#xff0c;现代玩…

软件测试之Web自动化测试

&#x1f345; 视频学习&#xff1a;文末有免费的配套视频可观看 &#x1f345; 关注公众号【互联网杂货铺】&#xff0c;回复 1 &#xff0c;免费获取软件测试全套资料&#xff0c;资料在手&#xff0c;涨薪更快 一、自动化测试基本介绍 1、自动化测试概述&#xff1a; 什么…

Android布局优化之include、merge、ViewStub的使用,7年老Android一次坑爹的面试经历

前言 开发10年&#xff0c;老码农&#xff0c;曾经是爱奇艺架构 点击领取完整开源项目《安卓学习笔记总结最新移动架构视频大厂安卓面试真题项目实战源码讲义》 师&#xff0c;东芝集团高级工程师&#xff0c;三星架构师。5年之内频繁被辞退。内心拔凉拔凉的&#xff0c;在这五…

Android大厂高级面试题灵魂100问,带你彻底弄明白

“2020年技术没有成长&#xff0c;我今年一定要好好努力学习&#xff01;” “在现在这个公司都工作了3年了&#xff0c;一毛钱工资都没有涨…” “年前真倒霉&#xff0c;老板嫌我工资高&#xff0c;被优化了&#xff0c;年后又遇到了疫情&#xff0c;现在都还没有找到合适的工…

141.乐理基础-男声女声音域、模唱、记谱与实际音高等若干问题说明

上一个内容&#xff1a;140.乐理基础-音程的转位-CSDN博客 上一个内容练习的答案&#xff1a;红色箭头指向的是转为&#xff0c;比如第一个只要写成c低g高都是正确的&#xff0c;不一定非要和图中一样 首先在 12.音域、1C到底是那一组的C 里面写了人声的音域&#xff0c;大致默…

最强照片AI无损放大工具

使用人工智能的能力来放大图像&#xff0c;同时为惊人的结果添加自然的细节。 使用深度学习技术&#xff0c;A.I.GigaPixEL可以放大图像并填满其他调整大小的产品所遗漏的细节。 下载地址&#xff1a;最强照片AI无损放大工具.zip

dolphinscheduler试用(一)(边用边修bug。。。。create tenant error)

&#xff08;作者&#xff1a;陈玓玏&#xff09; 前提&#xff1a;部署好了dolphinscheduler&#xff0c;部署篇见https://blog.csdn.net/weixin_39750084/article/details/136306890?spm1001.2014.3001.5501 官方文档见&#xff1a;https://dolphinscheduler.apache.org/…

MyBatis操作数据库(SQL注入)

本文主要来讲解6大标签&#xff0c;以便更好的MyBatis操作数据库&#xff01; <if>标签<trim>标签<where>标签<set>标签<foreach>标签<include>标签 前提需求&#xff1a; MyBatis是一个持久层框架&#xff0c;和Spring没有任何关系&…

【LeetCode】升级打怪之路 Day 14:二叉树的遍历

今日题目&#xff1a; 144. 二叉树的前序遍历94. 二叉树的中序遍历145. 二叉树的后序遍历102. 二叉树的层序遍历107. 二叉树的层序遍历 II199. 二叉树的右视图637. 二叉树的层平均值429. N 叉树的层序遍历515. 在每个树行中找最大值116. 填充每个节点的下一个右侧节点指针117. …

Fiddler入门:下载、安装、配置、抓包、customize rules

一、fiddler下载安装 安装包下载链接&#xff1a;https://www.telerik.com/download/fiddler 随便选个用途&#xff0c;填写邮箱&#xff0c;地区选择China&#xff0c;勾选“I accept the Fiddler End User License Agreement”&#xff0c;点击“DownLoad for windows”&…

⭐每天一道leetcode:28.找出字符串中第一个匹配项的下标(简单;暴力解;KMP算法,有难度)

⭐今日份题目 给你两个字符串 haystack 和 needle &#xff0c;请你在 haystack 字符串中找出 needle 字符串的第一个匹配项的下标&#xff08;下标从 0 开始&#xff09;。如果 needle 不是 haystack 的一部分&#xff0c;则返回 -1 。 示例1 输入&#xff1a;haystack &q…

3.6作业

作业要求&#xff1a;数据库操作的增、删、改 程序代码&#xff1a; #include<myhead.h> int main(int argc, const char *argv[]) {//定义数据库句柄指针sqlite3 * ppDb NULL;//打开数据库&#xff0c;如果数据库不存在&#xff0c;则创建数据库//将数据库句柄由参数…

移动开发:图像查看器

一、新建ImageViewer模块&#xff0c;添加p1-p9图片(注意mdpi后缀) 二、相关代码 1.MainActivity.java文件代码 package com.example.imageviewer;import androidx.appcompat.app.AppCompatActivity;import android.os.Bundle; import android.view.MotionEvent; import and…

Jacob使用教程--通过宏来寻找变量名

说明: 这里做个随比,参考资料请见前面的系列文章 问题展示: 对于一个操作,当我们不知道怎么利用jacob写代码时,而且网上也找不到,可以按照如下操作: 比如,我们要删除 word中的文本框 我们根本不知道文本框,这个变量叫什么,在Microsoft文档哪个父目录下面, 可以通过…

【MySQL】事务?隔离级别?锁?详解MySQL并发控制机制

目录 1.先理清一下概念 2.锁 2.1.分类 2.2.表锁 2.3.行锁&#xff08;MVCC&#xff09; 2.4.间隙锁 2.5.行锁变表锁 2.6.强制锁行 1.先理清一下概念 所谓并发控制指的是在对数据库进行并发操作时如何保证数据的一致性和正确性。在数据库中与并发控制相关的概念有如下几…