Redis主从复制流程

news2025/1/24 2:24:07

前言

Redis 支持部署多节点,然后按照 1:n 的方式构建主从集群,即一个主库、n 个从库。主从库之间会自动进行数据同步,但是只有主库同时允许读写,从库只允许读。
image.png
搭建主从复制集群的目的:

  • 从库可用于容灾备份
  • 从库可以分摊读流量
  • 提高服务可用性

开启主从复制,你需要关心的配置项有:

# 主库的IP和端口
replicaof <masterip> <masterport>

# 连接主库认证的用户名和密码
masteruser <username>
masterauth <master-password>

replica-read-only yes # 从库是否只读
repl-diskless-sync no # 是否开启无盘同步 直接基于Socket传输
repl-diskless-sync-delay 5 # 无盘同步的延迟时间
repl-diskless-load disabled # 是否开启无盘加载
repl-backlog-size 1mb # 主从复制积压缓冲区大小
repl-backlog-ttl 3600 # repl_backlog过期时间 主库一段时间后没有任何从库连接将会释放backlog

主库在运行时也可以通过命令手动将其配置为从库:

replicaof <host> <port>

数据同步流程

Redis 主从库之间的数据同步可以分为三个阶段:

PSYNC <replid> <offset>

1、全量同步:从库第一次连接到主库后,因为没有任何数据,所以需要做一次全量同步。因为不知道主库的 replid 和 offset,所以会发送PSYNC ? -1。主库会触发bgsave命令生成一份完整的 RDB 文件,然后通过 Socket 发送给从库。从库接收到 RDB 文件后,首先清空自己的数据库,防止数据污染,然后加载 RDB 文件恢复数据。在数据同步期间,主库仍会接收客户端发起的写命令,所以从库此时的数据还不是最新的。因此,主库在同步期间执行的所有写命令还会写一份到 replication buffer,然后一并发送给从库。

新版本 Redis 也支持无盘复制,主库生成的 RDB 数据不落盘,直接 Socket 发给从库,适用于网络带宽高、磁盘性能差的场景。

2、基于长连接的命令传播:第一次全量同步后,主从库之间的长连接会一直保持,主库执行的所有写命令都会发给从库,从库通过回放这些写命令来和主库保持数据一致。
3、断连后的增量同步:长连接如果因为网络原因断开了,从库的数据就又不是最新的了,如果再触发一次全量同步,会给主库增加很大压力。为了解决这个问题,Redis 会在主库开辟一块缓冲区 repl_backlog,主库在命令传播的同时也会写一份到 repl_backlog,断连后的从库恢复连接后,可以通过 repl_backlog 来做增量同步。
image.png
replid 也就是主库的 run_id,它是 Redis 实例的唯一标识,可以通过info命令查看:

127.0.0.1:6379> info
# Server
run_id:50e4d514a576a152541504c334fe0a2d446bf8f6

offset 是复制偏移量,它代表主从库之间数据同步的进度,从库的 offset 越接近主库数据就越新,也可以通过它来监控从库的数据同步延迟情况。

第一次数据同步时,因为从库没有数据,所以 offset 是写死的 -1,代表主库要传输一次全量 RDB 数据,之后从库就需要记录下自己同步的偏移量。为了避免从库因为网络问题断开连接收不到写命令,主库会开辟一块单独的复制积压缓冲区 repl_backlog,默认大小是 1MB,主库在传播写命令时,也会往 repl_backlog 写一份,等待从库恢复连接后可以直接增量同步数据。
如果从库断连时间太久,期间发生的写入量又很大,repl_backlog 就会膨胀的很大,非常占用内存。因此,repl_backlog 被设计成一个固定大小的环形缓冲区,Redis 会采用循环写的方式记录写命令,默认大小个人认为太保守了,你可以根据自己的需要适当调大一点。
repl_backlog 该设置多大合适呢?可以用公式计算一下:

repl_backlog_size = (每秒写入量 * 平均数据大小 - 网络带宽) * 2

假设你的 Redis 服务每秒要写入一万次,平均每次写入数据量在 1KB,网络带宽是 5MB,那么 repl_backlog 最少要 5MB,考虑到断连等一些特殊情况,建议再扩大一倍设为 10MB 比较合适。
repl_backlog 设置的太小会导致主库频繁触发全量同步,每次全量同步都要 fork 子进程生成 RDB 文件,这在一定程度上会影响主库性能,需要特别注意。

源码

给 Redis 从库设置新主库后,从库会和主库建立连接,然后发送PING命令确保主库是正常的,接着发送AUTH命令完成认证,再发送REPLCONF命令跟主库握手,告诉主库自己的一些信息。握手完成以后,从库会发送PSYNC命令给主库,主库回复是全量同步还是增量同步,如果是第一次连接那必然是全量同步,从库开始在本地创建临时文件用于接收 RDB 数据,接收完最后清空数据库,加载 RDB 文件。
image.png
从库整个数据同步的过程,也是一个状态机切换的过程,Redis 定义了一批状态:

typedef enum {
    REPL_STATE_NONE = 0, // 没有复制
    REPL_STATE_CONNECT, // 准备连接主库
    REPL_STATE_CONNECTING, // 连接中
    /* --- 握手环节 是有序的 --- */
    REPL_STATE_RECEIVE_PING_REPLY, // 等待主库回复PING
    REPL_STATE_SEND_HANDSHAKE, // 准备握手
    REPL_STATE_RECEIVE_AUTH_REPLY, // 等待主库回复AUTH
    REPL_STATE_RECEIVE_PORT_REPLY, // 等待主库回复REPLCONF
    REPL_STATE_RECEIVE_IP_REPLY, // 等待主库回复REPLCONF
    REPL_STATE_RECEIVE_CAPA_REPLY, // 等待主库回复REPLCONF
    REPL_STATE_SEND_PSYNC, // 准备发送PSYNC命令
    REPL_STATE_RECEIVE_PSYNC_REPLY, // 等待主库回复PSYNC
    REPL_STATE_TRANSFER, // 等待主库传输RDB数据
    REPL_STATE_CONNECTED, // 全量同步完成,正常连接中
} repl_state;

replicaofCommand()是从库处理 replicaof 命令的入口方法:

  • 集群模式下或故障转移时,命令是不支持调用的
  • 如果参数是no one,把自己切换为主库
  • 否则把自己切换为从库,连接新主库
void replicaofCommand(client *c) {
    // 集群模式和故障转移时 不支持
    if (server.cluster_enabled) {
        addReplyError(c,"REPLICAOF not allowed in cluster mode.");
        return;
    }
    if (server.failover_state != NO_FAILOVER) {
        addReplyError(c,"REPLICAOF not allowed while failing over.");
        return;
    }
    if (!strcasecmp(c->argv[1]->ptr,"no") &&
        !strcasecmp(c->argv[2]->ptr,"one")) {
        // 把自己升级为主库
        if (server.masterhost) {
            replicationUnsetMaster();
            sds client = catClientInfoString(sdsempty(),c);
            serverLog(LL_NOTICE,"MASTER MODE enabled (user request from '%s')",
                client);
            sdsfree(client);
        }
    } else {
        // 连接主库
        long port;

        if (c->flags & CLIENT_SLAVE)
        {
            // 已经是从库了
            addReplyError(c, "Command is not valid when client is a replica.");
            return;
        }
        // 第2个参数读取port
        if ((getLongFromObjectOrReply(c, c->argv[2], &port, NULL) != C_OK))
            return;
        // 已经指向了同一个主库,不做任何处理
        if (server.masterhost && !strcasecmp(server.masterhost,c->argv[1]->ptr)
            && server.masterport == port) {
            serverLog(LL_NOTICE,"REPLICAOF would result into synchronization "
                                "with the master we are already connected "
                                "with. No operation performed.");
            addReplySds(c,sdsnew("+OK Already connected to specified "
                                 "master\r\n"));
            return;
        }
        // 设置新的主库
        replicationSetMaster(c->argv[1]->ptr, port);
        sds client = catClientInfoString(sdsempty(),c);
        serverLog(LL_NOTICE,"REPLICAOF %s:%d enabled (user request from '%s')",
            server.masterhost, server.masterport, client);
        sdsfree(client);
    }
    addReply(c,shared.ok);
}

replicationSetMaster()方法给从库设置主库:

  • 断开被阻塞的客户端连接,因为自己是从库了,阻塞在例如brpop命令的客户端已经没有意义了
  • 记录主库的 IP 端口等信息
  • 断开从库的从库连接,要求它们重新同步数据
  • 状态机改为:待连接主库
  • 连接主库
void replicationSetMaster(char *ip, int port) {
    int was_master = server.masterhost == NULL;
    sdsfree(server.masterhost);
    server.masterhost = NULL;
    if (server.master) {
        freeClient(server.master);
    }
    // 断开被阻塞的客户端连接,自己已经是从库了,阻塞在例如brpop命令的客户端已经没有意义了
    disconnectAllBlockedClients(); /* Clients blocked in master, now slave. */

    // 设置新主库的 主机和端口
    server.masterhost = sdsnew(ip);
    server.masterport = port;

    // 断开当前从库的从库连接,要求重新同步新数据
    disconnectSlaves();

    // 状态机改为 待连接主库
    server.repl_state = REPL_STATE_CONNECT;
    // 连接主库
    connectWithMaster();
}

connectWithMaster()和主库连接连接:

  • 和主库建立连接
  • 注册可读事件:syncWithMaster
  • 状态机改为:主库连接中
int connectWithMaster(void) {
    // 建立连接 处理器是 syncWithMaster
    server.repl_transfer_s = server.tls_replication ? connCreateTLS() : connCreateSocket();
    if (connConnect(server.repl_transfer_s, server.masterhost, server.masterport,
                NET_FIRST_BIND_ADDR, syncWithMaster) == C_ERR) {
        connClose(server.repl_transfer_s);
        server.repl_transfer_s = NULL;
        return C_ERR;
    }
    server.repl_transfer_lastio = server.unixtime;
    // 状态机改为 主库连接中
    server.repl_state = REPL_STATE_CONNECTING;
    serverLog(LL_NOTICE,"MASTER <-> REPLICA sync started");
    return C_OK;
}

TCP 连接建立后,syncWithMaster()方法会被触发:

  • 发送PING,确保主库正常
  • 发送AUTH完成认证
  • 发送REPLCONF命令告诉主库自己的一些信息,例如:IP、端口、是否支持无盘复制
  • 发送PSYNC命令,判断全量同步还是增量同步
  • 如果是全量同步
    • 创建临时文件用于接收 RDB 数据
    • 注册可读事件:readSyncBulkPayload 接收 RDB 数据
void syncWithMaster(connection *conn) {
    
    ......握手前置处理......
    
    if (server.repl_state == REPL_STATE_SEND_PSYNC) {
        // 同步数据 只是发送PSYNC命令
        if (slaveTryPartialResynchronization(conn,0) == PSYNC_WRITE_ERROR) {
            err = sdsnew("Write error sending the PSYNC command.");
            abortFailover("Write error to failover target");
            goto write_error;
        }
        // 状态机改为 等待主库PSYNC回复
        server.repl_state = REPL_STATE_RECEIVE_PSYNC_REPLY;
        return;
    }
    // 主库回复了PSYNC,读取结果
    psync_result = slaveTryPartialResynchronization(conn,1);
    if (psync_result == PSYNC_CONTINUE) {// 增量同步
        serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Master accepted a Partial Resynchronization.");
        if (server.supervised_mode == SUPERVISED_SYSTEMD) {
            redisCommunicateSystemd("STATUS=MASTER <-> REPLICA sync: Partial Resynchronization accepted. Ready to accept connections in read-write mode.\n");
        }
        return;
    }
    if (psync_result == PSYNC_NOT_SUPPORTED) {
        // 主库不支持PSYNC命令 降级发送SYNC命令
        serverLog(LL_NOTICE,"Retrying with SYNC...");
        if (connSyncWrite(conn,"SYNC\r\n",6,server.repl_syncio_timeout*1000) == -1) {
            serverLog(LL_WARNING,"I/O error writing to MASTER: %s",
                strerror(errno));
            goto error;
        }
    }
    // 使用磁盘加载
    if (!useDisklessLoad()) {
        // 创建临时文件
        while(maxtries--) {
            snprintf(tmpfile,256,
                "temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid());
            dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644);
            if (dfd != -1) break;
            sleep(1);
        }
        if (dfd == -1) {
            serverLog(LL_WARNING,"Opening the temp file needed for MASTER <-> REPLICA synchronization: %s",strerror(errno));
            goto error;
        }
        // 记录临时文件和fd
        server.repl_transfer_tmpfile = zstrdup(tmpfile);
        server.repl_transfer_fd = dfd;
    }
    // 注册可读事件,接收RDB文件
    if (connSetReadHandler(conn, readSyncBulkPayload)
            == C_ERR)
    {
        char conninfo[CONN_INFO_LEN];
        serverLog(LL_WARNING,
            "Can't create readable event for SYNC: %s (%s)",
            strerror(errno), connGetInfo(conn, conninfo, sizeof(conninfo)));
        goto error;
    }
    // 状态机改为 等待主库传输RDB文件
    server.repl_state = REPL_STATE_TRANSFER;
    server.repl_transfer_size = -1;
    server.repl_transfer_read = 0;
    server.repl_transfer_last_fsync_off = 0;
    server.repl_transfer_lastio = server.unixtime;
    return;
}

slaveTryPartialResynchronization()方法尝试增量同步,能不能增量同步是主库来判断的,可能主库压根就不支持增量同步,也可以从库落后的太多,超过了积压缓冲区的大小,这种情况下也不得不执行全量同步。

  • 发送PSYNC命令,等待主库回复
  • 解析主库的回复,返回同步类型
int slaveTryPartialResynchronization(connection *conn, int read_reply) {
    /**
     * read_reply
     * 0: 发送PSYNC命令
     * 1: 读取PSYNC回复结果
     */
    if (!read_reply) {
        if (server.cached_master) {
            // 缓存了主库的信息,直接发送 |PSYNC <replid> <offset>| 做增量同步
            psync_replid = server.cached_master->replid;
            snprintf(psync_offset,sizeof(psync_offset),"%lld", server.cached_master->reploff+1);
            serverLog(LL_NOTICE,"Trying a partial resynchronization (request %s:%s).", psync_replid, psync_offset);
        } else {
            // 第一次连主库,发送 |PSYNC ? -1| 做全量同步
            serverLog(LL_NOTICE,"Partial resynchronization not possible (no cached master)");
            psync_replid = "?";
            memcpy(psync_offset,"-1",3);
        }
        if (server.failover_state == FAILOVER_IN_PROGRESS) {
            reply = sendCommand(conn,"PSYNC",psync_replid,psync_offset,"FAILOVER",NULL);
        } else {
            // 发送PSYNC命令
            reply = sendCommand(conn,"PSYNC",psync_replid,psync_offset,NULL);
        }
        // 等待主库回复
        return PSYNC_WAIT_REPLY;
    }
    // 全量同步 主库回复 |+FULLRESYNC <master_replid> <offset>|
    if (!strncmp(reply,"+FULLRESYNC",11)) {
        .......
        return PSYNC_FULLRESYNC;
    }
    // 可以增量同步 主库回复|+CONTINUE|
    if (!strncmp(reply,"+CONTINUE",9)) {
        ......
    	// 增量同步
        replicationResurrectCachedMaster(conn);
        return PSYNC_CONTINUE;
    }
    if (!strncmp(reply,"-NOMASTERLINK",13) ||
        !strncmp(reply,"-LOADING",8))
    {
        // 稍后再试
        return PSYNC_TRY_LATER;
    }
    // 其它回复认为主库不支持PSYNC
    return PSYNC_NOT_SUPPORTED;
}

readSyncBulkPayload()用于接收主库发送的 RDB 数据,有两种格式:

  • 有盘复制:$<count>/r/n data,事先知道数据总长度
  • 无盘复制:$EOF:<XXX>\r\n data <XXX>,事先不知数据总长度,<XXX>是一个 40 字节长度的随机数分隔符
/**
 * 读取第一行信息
 * - 磁盘复制: $<count>/r/n数据
 * - 无盘复制: $EOF:<XXX>\r\n数据<XXX>
 */
if (server.repl_transfer_size == -1) {
    if (connSyncReadLine(conn,buf,1024,server.repl_syncio_timeout*1000) == -1) {
        goto error;
    }
    // 校验第一行回复
    if (buf[0] == '-') {
        goto error;
    } else if (buf[0] == '\0') {
        server.repl_transfer_lastio = server.unixtime;
        return;
    } else if (buf[0] != '$') {
        serverLog(LL_WARNING,"Bad protocol from MASTER, the first byte is not '$' (we received '%s'), are you sure the host and port are right?", buf);
        goto error;
    }
    // 无盘复制数据格式
    if (strncmp(buf+1,"EOF:",4) == 0 && strlen(buf+5) >= CONFIG_RUN_ID_SIZE) {
        usemark = 1;
        memcpy(eofmark,buf+5,CONFIG_RUN_ID_SIZE);
        memset(lastbytes,0,CONFIG_RUN_ID_SIZE);
        /* Set any repl_transfer_size to avoid entering this code path
         * at the next call. */
        // 无盘复制,不知道数据总长度,随便设置个0,避免再次读取第一行
        server.repl_transfer_size = 0;
        serverLog(LL_NOTICE,
            "MASTER <-> REPLICA sync: receiving streamed RDB from master with EOF %s",
            use_diskless_load? "to parser":"to disk");
    } else {
        // 有盘复制
        usemark = 0;
        // 读取主库要传输的数据长度
        server.repl_transfer_size = strtol(buf+1,NULL,10);
    }
    return;
}

如果是走磁盘加载,从库会把接收到的数据写入磁盘,对应的文件描述符是repl_transfer_fd变量。接收数据时,每达到 8MB 就刷一次磁盘,避免最后一次性刷盘带来的延迟。

if ((nwritten = write(server.repl_transfer_fd,buf,nread)) != nread) {
    goto error;
}

if (server.repl_transfer_read >=
    server.repl_transfer_last_fsync_off + REPL_MAX_WRITTEN_BEFORE_FSYNC)
{
    off_t sync_size = server.repl_transfer_read -
                      server.repl_transfer_last_fsync_off;
    rdb_fsync_range(server.repl_transfer_fd,
        server.repl_transfer_last_fsync_off, sync_size);
    server.repl_transfer_last_fsync_off += sync_size;
}

最后清空本地数据库避免数据污染,然后加载接收到的 RDB 文件和主库数据保持一致。

// 清空数据库
emptyDb(-1,empty_db_flags,replicationEmptyDbCallback);
// RDB临时文件强制刷盘
if (fsync(server.repl_transfer_fd) == -1) {
    cancelReplicationHandshake(1);
    return;
}
int old_rdb_fd = open(server.rdb_filename,O_RDONLY|O_NONBLOCK);
// 替换rdb文件名
if (rename(server.repl_transfer_tmpfile,server.rdb_filename) == -1) {
    cancelReplicationHandshake(1);
    if (old_rdb_fd != -1) close(old_rdb_fd);
    return;
}
// 载入新的RDB文件
if (rdbLoad(server.rdb_filename,&rsi,RDBFLAGS_REPLICATION) != C_OK) {
    cancelReplicationHandshake(1);
    if (server.rdb_del_sync_files && allPersistenceDisabled()) {
        serverLog(LL_NOTICE,"Removing the RDB file obtained from "
                            "the master. This replica has persistence "
                            "disabled");
        bg_unlink(server.rdb_filename);
    }
    return;
}
// 状态机改为 已连接,后续做好命令传播即可
server.repl_state = REPL_STATE_CONNECTED;

全量同步完以后,后续主从库之间就是基于长连接的命令传播了,主库会在执行写命令时执行replicationFeedSlaves()方法把命令发送给从库:

void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
    // 和上次数据库编号不一样,要先发送SELECT命令
    if (server.slaveseldb != dictid) {
        robj *selectcmd;    
        // SELECT命令追加到repl_backlog
        if (server.repl_backlog) feedReplicationBacklogWithObject(selectcmd);
        // 遍历从库连接,发送SELECT命令
        listRewind(slaves,&li);
        while((ln = listNext(&li))) {
            client *slave = ln->value;
            if (!canFeedReplicaReplBuffer(slave)) continue;
            addReply(slave,selectcmd);
        }
        if (dictid < 0 || dictid >= PROTO_SHARED_SELECT_CMDS)
            decrRefCount(selectcmd);
    }
    // 实际命令追加到repl_backlog
    if (server.repl_backlog) {
        ......
    }
    //再把实际命令发送给所有从库
    listRewind(slaves,&li);
    while((ln = listNext(&li))) {
        client *slave = ln->value;
        if (!canFeedReplicaReplBuffer(slave)) continue;
        addReplyArrayLen(slave,argc);
        for (j = 0; j < argc; j++)
            addReplyBulk(slave,argv[j]);
    }
}

总结

主从复制是 Redis 实现服务高可用的关键特性之一,主节点通过把写命令异步传播给从节点的方式来实现数据同步。同时,为了避免从库因为网络问题断开导致数据不一致,主库会开辟一块主从复制积压缓冲区 repl_backlog 缓存最近的写命令,待从库恢复连接后,可以直接走增量同步。为了避免缓冲区膨胀,repl_backlog 采用固定大小循环写的方式,一旦从库落后的太久,需要增量同步的日志被主库覆盖掉了,就不得不触发全量同步,因此建议线上可以适当调大缓冲区的大小。
从库第一次连接必须走全量同步,全量同步会影响主库的性能,所以单个 Redis 实例的内存最好控制在 4GB 左右,内存太大不单单执行 RDB 耗时,从库同步时间也会更加耗时,如果 4GB 无法满足业务需求,那么就部署分片集群。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1101887.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Unity3D 基础——使用 Mathf.SmoothDamp 函数制作相机的缓冲跟踪效果

使用 Mathf.SmoothDamp 函数制作相机的缓冲跟踪效果&#xff0c;让物体的移动不是那么僵硬&#xff0c;而是做减速的缓冲效果。将以下的脚本绑定在相机上&#xff0c;然后设定好 target 目标对象&#xff0c;即可看到相机的缓动效果。通过设定 smoothTime 的值&#xff0c;可以…

安达发|AI人工智能APS系统:工业4.0的智能引擎

在工业4.0的背景下&#xff0c;人工智能&#xff08;AI&#xff09;和APS智能排程软件已经成为智能工厂的标准配置。它们通过集成先进的信息技术、通信技术和物联网技术&#xff0c;实现了生产过程的智能化、自动化和数据化&#xff0c;从而提高了生产效率、降低了生产成本、提…

运行.sln 32/64位程序,启动不了,无法显示界面

32/64位启动不了&#xff0c;无法显示界面。后台有进程 》环境变量--轴控下&#xff0c;选择的“串口”选择了虚拟串口(比如&#xff1a;com3&#xff0c;是蓝牙串口)&#xff0c;或者因为当前机台以前虚拟过这个串口&#xff0c;所以存在。串口存在&#xff0c;就会去调用函数…

微信小程序----会议oa项目---首页

目录 什么是Flex弹性布局 图解 代码演示 flex弹性布局的特点 Flex 弹性布局的常见属性 flex-wrap属性 flex-flow属性 轮播图后台数据获取及组件使用 首页布局 什么是Flex弹性布局 Flex 弹性布局&#xff08;Flexbox&#xff09;是一种用于在容器中进行灵活排列和对齐元素的…

Java数据结构——应用DFS算法计算流程图下游节点(1)

问题描述&#xff1a; 前端在绘制流程图的时候&#xff0c;某些情况需要对某个节点之后的流程图进行折叠&#xff0c;因此需要得到某个节点的ID后&#xff0c;计算出这个ID下游之后的所有节点&#xff08;找到的节点&#xff0c;边也就找到了&#xff09; 已知条件&#xff1a…

【vscode编辑器插件】前端 php unity自用插件分享

文章目录 一篇一句前言前端vuegitphpunity后端其他待续完结 一篇一句 “思考是最困难的工作&#xff0c;这也许是为什么很少有人这样做。” - 亨利福特&#xff08;Henry Ford&#xff09; 前言 无论是什么语言&#xff0c;我都会选择使用vscode进行开发&#xff0c;我愿称v…

2023亿发智能数字化解决方案供应商,贵州一体化企业信息管理系统

企业数字化服务的解决方案是指运用数字技术对企业运营进行全方位的数字化升级和优化&#xff0c;提供以数字化服务为核 心的全面解决方案&#xff0c;解决企业在数字化转型过程中面临的技术和业务难题。 数字化服务解决方案的功能 在数字化时代的背景下&#xff0c;贵州企业的…

京东店铺商品评论数据采集,京东商品评论数据接口,京东API接口

京东店铺商品评论数据接口可以获取到商品ID&#xff0c;商品标题&#xff0c;商品优惠券&#xff0c;商品到手价&#xff0c;商品价格&#xff0c;商品优惠价&#xff0c;商品sku属性&#xff0c;商品图片&#xff0c;商品视频&#xff0c;商品sku属性图片&#xff0c;商品属性…

协同过滤电影推荐系统 计算机竞赛

文章目录 1 简介1 设计概要2 课题背景和目的3 协同过滤算法原理3.1 基于用户的协同过滤推荐算法实现原理3.1.1 步骤13.1.2 步骤23.1.3 步骤33.1.4 步骤4 4 系统实现4.1 开发环境4.2 系统功能描述4.3 系统数据流程4.3.1 用户端数据流程4.3.2 管理员端数据流程 4.4 系统功能设计 …

万界星空科技/生产制造管理MES系统/开源MES/免费MES

一、 开源系统概述&#xff1a; 万界星空科技免费MES、开源MES、商业开源MES、市面上最好的开源MES、MES源代码、免费MES、免费智能制造系统、免费排产系统、免费排班系统、免费质检系统、免费生产计划系统、免费仓库管理系统、免费出入库管理系统、免费可视化数字大屏。 万界…

centos 7.9 安装sshpass

1.作用 sshpass是一个用于非交互式SSH密码验证的实用程序。它可以用于自动输入密码以进行SSH登录&#xff0c;从而简化了自动化脚本和批处理作业中的SSH连接过程。 sshpass命令可以与ssh命令一起使用&#xff0c;通过在命令行中提供密码参数来执行远程命令。以下是一个示例命…

KVM/qemu安装UOS 直接让输入用户密码

错误信息 安装后出现&#xff1a; 1、点击刚刚建立的虚拟机最上角感叹号&#xff08;设备管理器&#xff09; ----新建硬件---输入----类型&#xff1a;【通用 USB Mouse】。 ----新建硬件---输入----类型&#xff1a;【通用 USB keyboard】。 2、在设备管理器中----新建硬…

全流程TOUGH系列软件应用

TOUGH系列软件是由美国劳伦斯伯克利实验室开发的&#xff0c;旨在解决非饱和带中地下水、热运移的通用模拟软件。和传统地下水模拟软件Feflow和Modflow不同&#xff0c;TOUGH系列软件采用模块化设计和有限积分差网格剖分方法&#xff0c;通过配合不同状态方程&#xff08;EOS模…

天锐绿盾透明加密、半透明加密、智能加密这三种不同加密模式的区别和适用场景——@德人合科技-公司内部核心文件数据、资料防止外泄系统

由于企事业单位海量的内部数据存储情况复杂&#xff0c;且不同公司、不同部门对于文件加密的需求各不相同&#xff0c;单一的加密系统无法满足多样化的加密需求。天锐绿盾企业加密系统提供多种不同的加密模式&#xff0c;包括透明加密、半透明加密和智能加密&#xff0c;用户可…

印尼禁令频出,Shopee该站也停止销售跨境商品

日前&#xff0c;Shopee印尼站已经正式停止销售来自海外或跨境卖家的商品&#xff0c;这一举措于10月4日开始施行。 Shopee印尼公共政策负责人Radityo Triatmojo表示&#xff0c;该举措系对印尼2023年第31号贸易部长条例&#xff08;Reg 31/2023&#xff09;的响应。该条例旨在…

Compose Canvas基础(2) 图形转换

Compose Canvas基础&#xff08;2&#xff09;图形转换 前言平移 translate缩放 scale旋转 rotate自定义绘图区域及绘制内边距inset组合转换 withTransform完整代码总结 上一篇文章 Compose Canvas基础&#xff08;1&#xff09; drawxxx方法 前言 阅读本文需要一定compose基…

1347. 制造字母异位词的最小步骤数 (中等,Counter)

闲来无事&#xff0c;今天多做一题 条件很宽&#xff0c;可以任意替换&#xff0c;且排列相同也可以所以只要统计每个字母在 s 中比在 t 中多出现的次数之和即可 class Solution:def minSteps(self, s: str, t: str) -> int:n [0] * 26for i in s:n[ord(i) - ord(a)] 1f…

通过商品ID查询天猫商品详情数据,可以拿到商品标题,商品价格,商品库存,商品销量,商品sku数据等,天猫API接口

通过商品ID查询天猫商品详情数据可以用淘宝开放平台的淘宝客商品详情查询接口&#xff08;taobao.tbk.item.info.get&#xff09;来完成。 首先需要申请一个淘宝开放平台的应用&#xff0c;并获取到App Key和App Secret&#xff0c;然后使用淘宝开放平台的淘宝客商品详情查询接…

常见的加密算法和类型

加密的类型有 对称加密算法 | 非对称加密算法 | hash算法 文章目录 对称加密算法非对称加密算法 (重点)hash加密算法 对称加密算法 对称加密算法 使用相同的密钥来进行加密和解密 数据通过密钥加密成密文 而密文也只能通过相同的密钥解密成数据 常见的对称加密算法 AES&…

洗地机什么牌子好用?洗地机排名

洗地机是如今清洁工作中非常重要的设备&#xff0c;它可以提高清洁效率&#xff0c;保持地面卫生&#xff0c;并减轻人力劳动的负担&#xff0c;市面上有许多不同品牌的洗地机&#xff0c;那么洗地机哪个牌子最好用呢?下面我们来介绍一下洗地机排名&#xff0c;并分析其热门型…