web服务端接收多用户并发上传同一文件,保证文件副本只存在一份(附go语言实现)

news2024/11/24 2:25:36

背景

对于一个文件服务器来说,对于同一文件,应该只保存一份在服务器上。基于这个原则,引发出本篇内容。

本篇仅阐述文件服务器在同一时间接收同一文件的并发问题,这种对于小体量的服务来说并不常见,但是最好还是要留意一下这种极端情况。

实现原理

常见的流程:数据库记录文件的基本属性:文件名、大小、哈希值、文件路径等,以哈希值作为唯一标志。当用户新上传文件时,先查询数据库,若已存在哈希值(客户端计算并传给服务端,客户端最常见的 spark-md5)相同的记录,则不保存文件,直接标记为上传成功,使用已存在的文件副本,即通常所说的秒传实现。

上述流程缺失的就是当数据库中不存在的文件,同一时间上传了多个相同文件时,如果不做处理,服务器上是会存在多个该文件副本。所以当一个用户上传文件时,可以将文件标记为锁定状态,其他用户若上传同一文件,需查看文件的锁定状态,待锁定解除后才能进行操作。

请添加图片描述

代码实现

文字表现力有点差,还是上代码吧!本例中的服务端使用的是 gogin 框架,仅简单模拟了同一文件并发上传的情况。

文件目录结构

	- go.mod
	- go.sum
	- hash_cache.go
	- main.go
	- spark-md5-min.js
	- upload.html

js 客户端

upload.html 选择文件后,可重复点击上传按钮测试并发,或者自己改下脚本。

在这里插入图片描述

<!DOCTYPE html>
<html>
<head>
    <title>文件上传</title>
    <script src="spark-md5.min.js"></script>
</head>
<body>
<h1>文件上传</h1>
<input id="file" type="file" name="file"/>
<button onclick="upload();">上传</button>

<script>
    var file_md5 = {};

    function upload() {
        if (!file_md5.md5) {
            alert("请先选择文件");
            return
        }

        var form = new FormData();
        form.append("md5", file_md5.md5);
        form.append("file", file_md5.file);
        var xhr = new XMLHttpRequest();
        var action = "/upload"; // 上传服务的接口地址
        xhr.open("POST", action);
        xhr.send(form); // 发送表单数据
        xhr.onreadystatechange = function () {
            if (xhr.readyState == 4 && xhr.status == 200) {
                var resultObj = JSON.parse(xhr.responseText);
                // 处理返回的数据......
                console.log(resultObj)
            }
        }
    }

    document.getElementById('file').addEventListener('change', function (event) {
        var blobSlice = File.prototype.slice || File.prototype.mozSlice || File.prototype.webkitSlice,
            file = this.files[0],
            chunkSize = 2097152,                             // Read in chunks of 2MB
            chunks = Math.ceil(file.size / chunkSize),
            currentChunk = 0,
            spark = new SparkMD5.ArrayBuffer(),
            fileReader = new FileReader();

        fileReader.onload = function (e) {
            console.log('read chunk nr', currentChunk + 1, 'of', chunks);
            spark.append(e.target.result);                   // Append array buffer
            currentChunk++;

            if (currentChunk < chunks) {
                loadNext();
            } else {
                console.log('finished loading');
                var md5=spark.end()
                console.info('computed hash', md5);  // Compute hash
                file_md5 = {file:file,md5:md5}
            }
        };

        fileReader.onerror = function () {
            console.warn('oops, something went wrong.');
        };

        function loadNext() {
            var start = currentChunk * chunkSize,
                end = ((start + chunkSize) >= file.size) ? file.size : start + chunkSize;

            fileReader.readAsArrayBuffer(blobSlice.call(file, start, end));
        }

        loadNext();
    });
</script>
</body>
</html>

请添加图片描述

go gin 服务端

main.go4780 为端口开放了一个 http 服务,可访问 http://127.0.0.1:4780/client/upload.html 来访问 html 页面。

为了模拟并发场景,服务端 /upload 接口中故意让其睡眠了 30s

package main

import (
	"crypto/md5"
	"embed"
	"encoding/hex"
	"errors"
	"fmt"
	"github.com/gin-gonic/gin"
	"io"
	"net/http"
	"os"
	"path/filepath"
	"runtime"
	"time"
)

//go:embed upload.html spark-md5.min.js
var client embed.FS

var hashCache = NewHashCache()

func main() {
	engine := gin.New()
	engine.StaticFS("/client", http.FS(client))
	engine.POST("/upload", doUpload)

	engine.Run(":4780")
}

func doUpload(c *gin.Context) {
	printMem("start")
	clientMd5 := c.PostForm("md5")

	// 查询是否有其他正在上传,若有,则等待其上传完毕,根据返回值来做判断
	if hashCache.Has(clientMd5) {
		info, er := hashCache.Wait(clientMd5)
		if er != nil {
			c.String(http.StatusInternalServerError, er.Error())
			return
		}
		if info.Err == nil {
			c.String(http.StatusOK, "上传成功: "+info.SavedPath)
			return
		}
		// 若是出错了,则继续接收
	}

	hashCache.Set(clientMd5)

	// 模拟并发,这里睡一下
	time.Sleep(time.Second * 30)

	savedPath, err := doSaveFile(c, clientMd5)
	if err != nil {
		hashCache.SetDone(clientMd5, "", err)
		c.String(http.StatusInternalServerError, err.Error())
		return
	}
	hashCache.SetDone(clientMd5, savedPath, nil)

	c.String(http.StatusOK, "上传成功: "+savedPath)
} 

func doSaveFile(c *gin.Context, clientMd5 string) (savedPath string, err error) {
	fh, err := c.FormFile("file")
	if err != nil {
		return
	}

	fn := fmt.Sprintf("%s_%d", fh.Filename, time.Now().UnixMilli())
	savedPath = filepath.Join("uploaded", fn)
	err = c.SaveUploadedFile(fh, savedPath)
	if err != nil {
		return
	}

	md5Str, err := getFileMd5(savedPath)
	if err != nil {
		return
	}

	if clientMd5 != md5Str {
		os.Remove(savedPath)
		err = errors.New("哈希不匹配")
		return
	}

	return
}

func getFileMd5(p string) (md5Str string, err error) {
	f, err := os.Open(p)
	if err != nil {
		return
	}
	defer f.Close()

	h := md5.New()
	_, err = io.Copy(h, f)
	if err != nil {
		return
	}
	md5Str = hex.EncodeToString(h.Sum(nil))
	return
}

func printMem(prefix string) {
	var m runtime.MemStats
	runtime.ReadMemStats(&m)
	fmt.Printf("%s: %d Kb\n", prefix, m.Alloc/1024)
}

hash_cache.go 维护了一个 map 用于判断是否有相同的文件上传。

package main

import (
	"errors"
	"sync"
)

type HashCache struct {
	mutex sync.RWMutex
	m     map[string]*HashCacheInfo
}

func NewHashCache() *HashCache {
	return &HashCache{
		m: make(map[string]*HashCacheInfo),
	}
}

type HashCacheInfo struct {
	Done      chan struct{}
	SavedPath string
	Err       error
}

func (this *HashCache) Set(md5Hash string) {
	this.mutex.Lock()
	defer this.mutex.Unlock()
	this.m[md5Hash] = &HashCacheInfo{
		Done: make(chan struct{}),
	}
}

func (this *HashCache) SetDone(md5Hash, savedPath string, err error) error {
	this.mutex.Lock()
	defer this.mutex.Unlock()
	data, ok := this.m[md5Hash]
	if !ok {
		return errors.New("no hash: " + md5Hash)
	}

	data.SavedPath = savedPath
	data.Err = err
	close(data.Done)

	delete(this.m, md5Hash)

	//这里的 data 不能直接释放,wait 那里需要用,垃圾收集器自己去回收吧

	return nil
}

func (this *HashCache) Has(md5Hash string) bool {
	this.mutex.RLock()
	defer this.mutex.RUnlock()
	_, has := this.m[md5Hash]
	return has
}

func (this *HashCache) Wait(md5Hash string) (info HashCacheInfo, err error) {
	this.mutex.RLock()
	data, ok := this.m[md5Hash]
	if !ok {
		this.mutex.RUnlock()
		err = errors.New("no hash: " + md5Hash)
		return
	}
	this.mutex.RUnlock()
	<-data.Done

	info = *data
	return
} 维护了一个 map 用于

服务端日志输出,可见每多提交一次请求,内存占用就会增加。
请添加图片描述

在此服务中,若是同一时间上传了大量相同的文件,会导致内存占用飙升(c.PostForm 解析 formdata 数据时,会将数据读入内存)。如果要解决该问题,需要自己去做数据的读取,如下:

doUpload1 方法的更改

package main

import (
	"crypto/md5"
	"embed"
	"encoding/hex"
	"errors"
	"fmt"
	"github.com/gin-gonic/gin"
	"io"
	"mime/multipart"
	"net/http"
	"os"
	"path/filepath"
	"runtime"
	"time"
)

//go:embed upload.html spark-md5.min.js
var client embed.FS

var hashCache = NewHashCache()

func main() {
	engine := gin.New()
	engine.StaticFS("/client", http.FS(client))
	engine.POST("/upload", doUpload1)

	engine.Run(":4780")
}

func doUpload(c *gin.Context) {
	printMem("start")
	clientMd5 := c.PostForm("md5")

	// 查询是否有其他正在上传,若有,则等待其上传完毕,根据返回值来做判断
	if hashCache.Has(clientMd5) {
		info, er := hashCache.Wait(clientMd5)
		if er != nil {
			c.String(http.StatusInternalServerError, er.Error())
			return
		}
		if info.Err == nil {
			c.String(http.StatusOK, "上传成功: "+info.SavedPath)
			return
		}
		// 若是出错了,则继续接收
	}

	hashCache.Set(clientMd5)

	// 模拟并发,这里睡一下
	//time.Sleep(time.Second * 30)

	savedPath, err := doSaveFile(c, clientMd5)
	if err != nil {
		hashCache.SetDone(clientMd5, "", err)
		c.String(http.StatusInternalServerError, err.Error())
		return
	}
	hashCache.SetDone(clientMd5, savedPath, nil)

	c.String(http.StatusOK, "上传成功: "+savedPath)
}

func doSaveFile(c *gin.Context, clientMd5 string) (savedPath string, err error) {
	fh, err := c.FormFile("file")
	if err != nil {
		return
	}

	fn := fmt.Sprintf("%s_%d", fh.Filename, time.Now().UnixMilli())
	savedPath = filepath.Join("uploaded", fn)
	err = c.SaveUploadedFile(fh, savedPath)
	if err != nil {
		return
	}

	md5Str, err := getFileMd5(savedPath)
	if err != nil {
		return
	}

	if clientMd5 != md5Str {
		os.Remove(savedPath)
		err = errors.New("哈希不匹配")
		return
	}

	return
}

func getFileMd5(p string) (md5Str string, err error) {
	f, err := os.Open(p)
	if err != nil {
		return
	}
	defer f.Close()

	h := md5.New()
	_, err = io.Copy(h, f)
	if err != nil {
		return
	}
	md5Str = hex.EncodeToString(h.Sum(nil))
	return
}

func printMem(prefix string) {
	var m runtime.MemStats
	runtime.ReadMemStats(&m)
	fmt.Printf("%s: %d Kb\n", prefix, m.Alloc/1024)
}

func doUpload1(c *gin.Context) {
	printMem("start")
	reader, err := c.Request.MultipartReader()
	if err != nil {
		c.String(http.StatusBadRequest, err.Error())
		return
	}
	clientMd5, err := readMd5(reader) // 读 md5
	if err != nil {
		c.String(http.StatusBadRequest, err.Error())
		return
	}

	// 查询是否有其他正在上传,若有,则等待其上传完毕,根据返回值来做判断
	if hashCache.Has(clientMd5) {
		info, er := hashCache.Wait(clientMd5)
		if er != nil {
			c.String(http.StatusInternalServerError, er.Error())
			return
		}
		if info.Err == nil {
			er = closeReaderParts(reader)
			if er != nil {
				c.String(http.StatusInternalServerError, er.Error())
			} else {
				c.String(http.StatusOK, "上传成功: "+info.SavedPath)
			}
			return
		}
	}

	hashCache.Set(clientMd5)

	// 模拟并发,这里睡一下
	time.Sleep(time.Second * 30)

	savedPath, err := saveFilePart(reader, clientMd5)
	hashCache.SetDone(clientMd5, savedPath, err)
	if err != nil {
		c.String(http.StatusInternalServerError, err.Error())
		return
	}

	c.String(http.StatusOK, "上传成功: "+savedPath)
}

func readMd5(reader *multipart.Reader) (md5Hash string, err error) {
	part, err := reader.NextPart() // 读 md5
	if err != nil {
		return
	}
	name := part.FormName()
	if name != "md5" {
		err = errors.New("first key is not match")
		return
	}
	buf, err := io.ReadAll(part)
	if err != nil {
		return
	}
	md5Hash = string(buf)
	return
}

func closeReaderParts(reader *multipart.Reader) (err error) {
	for {
		p, er := reader.NextPart()
		if er == io.EOF {
			break
		}
		if er != nil {
			err = er
			return
		}
		p.Close()
	}
	return
}

func saveFilePart(reader *multipart.Reader, clientMd5 string) (fp string, err error) {
	part, err := reader.NextPart() // 读 file
	if err != nil {
		return
	}
	name := part.FormName()
	if name != "file" {
		err = errors.New("key not match")
		return
	}
	fn := fmt.Sprintf("%s_%d", part.FileName(), time.Now().UnixMilli())
	fp = filepath.Join("uploaded", fn)
	f, err := os.Create(fp)
	if err != nil {
		return
	}
	defer f.Close()
	_, err = io.Copy(f, part)
	if err != nil {
		return
	}

	md5Str, err := getFileMd5(fp)
	if err != nil {
		return
	}

	if clientMd5 != md5Str {
		os.Remove(fp)
		err = errors.New("哈希不匹配")
		return
	}

	return

	return
}

服务端日志输出,可见内存已不像之前消耗的多了。
请添加图片描述

这里需要注意:客户端在对 formdata 中添加数据时,需要将 md5 放在第一位,不然逻辑会出错。还有一点就是服务端若是不将请求的 body 数据读完(closeReaderParts 就是做这个的),直接将 api 返回,也会导致 js 客户端请求出错(这点我用 go 的客户端测试是不会有问题的,应该是浏览器实现原因,有知晓的小伙伴可以评论留言)。

其他的实现方式,也可以将 md5 放在请求 url 中(http://127.0.0.1:4780/client/upload?md5=xxx),然后做匹配(这里也像上述一样,如果请求的 body 不读完,客户端会报错)。

总结

本篇只是给个思路,抛砖引玉,介绍了如何实现客户端和服务器端的并发上传控制。通过示例代码,能够确保在并发上传时服务器中只存在一份文件副本。

在实际的生产环境中,可能需要进一步优化和增强这些代码,以满足性能、安全性和可靠性方面的需求。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/719562.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

存储协议——FC协议讲解

目录 FC基础概念 FC协议结构 FC通信 FC交换网络工作流程&#xff1a;&#xff08;以封装SCSI协议为例&#xff09; FC拓扑结构 FC协议的端口类型 FC适配器&#xff08;FC HBA卡&#xff09; FC基础概念 FC最开始为一种传输协议&#xff0c;由于其性能较高&#xff0c;逐…

我的小流量“转正”心得 --- 下载下方深度语义重排的实践

目录 一、背景 二、通过数据分析找到的问题 三、迭代流程 迭代一&#xff1a; 迭代二&#xff1a; 迭代三&#xff1a; 迭代成功的原因&#xff1a; 知识扩展 四、hnswlib调优过程 五、附录 5.1 hnsw 超参选择 一、背景 在分发中下载带来的收入占比排列仅次于搜索。…

重磅|2024年浙大MPA提前批面试政策公布:申请三步走

说曹操曹操到&#xff01;昨天还在说浙大MPA提面吃迟迟未公布的事情&#xff0c;晚些时候就来了&#xff01;等待许久的MPA考生们可以开始着手筹划自己的提面备考了&#xff01;提前批面试真题周期较长&#xff0c;但是需要做准备的内容确实也不少&#xff0c;本期专注浙大的杭…

如何区分bin log 、redo log 跟 undo log?

概要 MySQL 日志包含了错误日志、查询日志、慢查询日志、事务日志、二进制日志等&#xff0c;如果存储引擎使用的是 InnoDB &#xff0c;二进制日志(binlog)和事务日志(包括redo log和undo log) 是肯定绕不过去的&#xff0c;本篇接下来详细为大家介绍这三种日志。 redo log 为…

Android OpenGL ES实现简单绿幕抠图

目录 正文 OES FilterBlendShader Filter最后的效果缺陷 正文 实现绿幕抠图&#xff0c;其实想法很简单。 这里简单粗暴的使用着色器替换。 OES Filter 直接实现在相机预览上的Shader ? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 #extension GL_OE…

Spring Boot 中的 Sleuth 是什么, 如何使用

Spring Boot 是一个非常流行的 Java Web 开发框架&#xff0c;它提供了许多方便的功能&#xff0c;其中之一就是 Sleuth。Sleuth 是一个分布式跟踪系统&#xff0c;用于跟踪应用程序中的请求和操作。在本文中&#xff0c;我们将探讨 Spring Boot 中的 Sleuth 是什么&#xff0c…

git 新建分支,切换分支,上传到远程分支

git 在使用的过程中&#xff0c;有的时候我们需要更换一个分支才存贮数据&#xff0c;作为版本的一个迭代或者是阶段性成果的一个里程碑。 如何来做操作呢&#xff1f; 在git中&#xff0c;可利用checkout命令转换分支&#xff0c;该命令的作用就是切换分支或恢复工作树文件&a…

Linux串口应用编程——STM32MP157

文章目录 替换设备树文件串口API设置行规程struct termios 结构体行规程函数 串口应用——回环 替换设备树文件 挂载boot分区&#xff1a; mount /dev/mmcblk2 /boot拷贝新的设备树文件到boot分区 cp /mnt/stm32mp157c-100ask-512d-lcd-v1.dtb /bootreboot重启&#xff0c;查…

C++ set和map使用

搜索平衡二叉树的封装 1. 关联容器2. 键值对3. 树形结构的关联式容器3.1 set3.1.1 set介绍3.1.2 set 的使用1. set模板参数列表2. set的构造3. set 的迭代器4. set的容量5. set修改5. set的使用 3.2 multiset3.2.1 multiset的介绍3.2.1 multiset的使用 3.3 map3.3.1 map的介绍3…

Could not load the Qt platform plugin “xcb“

qt.core.plugin.loader: QLibraryPrivate::loadPlugin failed on “/home/ly/Qt/6.5.1/gcc_64/plugins/platforms/libqxcb.so” : “Cannot load library /home/ly/Qt/6.5.1/gcc_64/plugins/platforms/libqxcb.so: (libxcb-cursor.so.0: cannot open shared object file: No su…

VTK8.2手动卸载

利用源代码方式安装的VTK, 进行手动卸载[参考] 1、进入.. /VTK-8.2.0/build目录 make2、记录sudo make install的log log在uninstall.sh文件中 touch uninstall.sh && chmod 775 uninstall.sh && echo #!/bin/bash -v > uninstall.sh && sudo m…

【unity实战】制作俯视角射击游戏多种射击效果(一)

文章目录 本期目标前言欣赏开始1. 角色移动和场景搭建2. 绑定枪械2.1 首先将各种枪械的素材添加给人物作为子物体2.2 给枪械也分别添加两个子物体用作标记枪口和弹仓位置 3. 枪械动画4. 切换枪械5. 发射功能5.1 手枪(1) 枪械随着鼠标旋转(2) 射击时间间隔(3) 创建好子弹、弹壳和…

手把手教你如何做手机PCB电磁兼容性设计

电磁兼容性是指电子设备在各种电磁环境中仍能够协调、有效地进行工作的能力。电磁兼容性设计的目的是使电子设备既能抑制各种外来的干扰&#xff0c;使电子设备在特定的电磁环境中能够正常工作&#xff0c;同时又能减少电子设备本身对其它电子设备的电磁干扰。 1、选择合理的导…

【霹雳吧啦Wz】Transformer中Self-Attention以及Multi-Head Attention详解

文章目录 来源Transformer起源Self-Attention1. 求q、k、v2. 计算 a ^ ( s o f t m a x 那块 ) \hat{a} (softmax那块) a^(softmax那块)3. 乘V&#xff0c;计算结果 Multi-Head Attention位置编码 来源 b站视频 前天啥也不懂的时候点开来一看&#xff0c;各种模型和公式&#…

FreeRTOS 低功耗模式设计 STM32平台

1. STM32F105RBT6 的三种低功耗模式 1.1 sleep睡眠模式、stop停机模式、standby 待机模式 1.2 STM32中文参考手册有介绍STM32 低功耗模式的介绍 2. FreeRTOS 采用的是时间片轮转的抢占式任务调度机制&#xff0c;其低功耗设计思路一般是&#xff1a; ① 当运行空闲任务&#…

启动网站调试提示 HTTP 错误 403.14 – Forbidden Web 服务器被配置为不列出此目录的内容。

启动网站调试提示 HTTP 错误 403.14 – Forbidden Web 服务器被配置为不列出此目录的内容。 解决方案第一种.在网站的配置文件里添加第二种.ISS管理界面修改 解决方案 第一种.在网站的配置文件里添加 <system.webServer><directoryBrowse enabled"true" /&…

【RH850/U2A】:休眠唤醒

休眠唤醒 唤醒差异休眠差异休眠是解决整个系统待机时尽可能的减少功耗,相应的唤醒则是低功耗模式下整个系统可以被已知的条件唤醒系统,进而进入全功能模式。 RH850/U2A的配置和RH850/F1KM大同小异,本文只讲述差异部分,其他部分详见 【Davinci开发】:IO唤醒系统 唤醒差异 …

API验证器,帮助ReSharper开启VS插件新时代!

实质上&#xff0c;ReSharper特征可用于C#&#xff0c;VB.net&#xff0c;XML&#xff0c;Asp.net&#xff0c;XAML&#xff0c;和构建脚本。 使用ReSharper&#xff0c;你可以进行深度代码分析&#xff0c;智能代码协助&#xff0c;实时错误代码高亮显示&#xff0c;解决方案范…

WideNet:让网络更宽而不是更深

这是新加坡国立大学在2022 aaai发布的一篇论文。WideNet是一种参数有效的框架&#xff0c;它的方向是更宽而不是更深。通过混合专家(MoE)代替前馈网络(FFN)&#xff0c;使模型沿宽度缩放。使用单独LN用于转换各种语义表示&#xff0c;而不是共享权重。 混合专家(MoEs) 条件计…

STM32 串口代码配置

一、首先开发板上关于串口1的引脚配置已经配置好了&#xff0c;位置在SYSTEM的 usart.c 文件中&#xff08;注意&#xff1a;只配置了串口1的&#xff0c;其他使用时需要自己配置&#xff09; 重要的是明白配置的参数都是什么意思&#xff0c;针对实现不同的串口功能有什么影响…