改造python3中的http.server为简单的文件上传下载服务

news2025/2/25 8:50:54

改造

修改python3中的http.server.SimpleHTTPRequestHandler,实现简单的文件上传下载服务

 simple_http_file_server.py:

# !/usr/bin/env python3

import datetime
import email
import html
import http.server
import io
import mimetypes
import os
import posixpath
import re
import shutil
import sys
import urllib.error
import urllib.parse
import urllib.request
import socket
from http import HTTPStatus
import threading
import contextlib
 
__version__ = "0.1"
__all__ = ["MySimpleHTTPRequestHandler"]
 
class MySimpleHTTPRequestHandler(http.server.BaseHTTPRequestHandler):
	server_version = "SimpleHTTP/" + __version__
	extensions_map = _encodings_map_default = {
		'.gz': 'application/gzip',
		'.Z': 'application/octet-stream',
		'.bz2': 'application/x-bzip2',
		'.xz': 'application/x-xz',
	}
 
	def __init__(self, *args, directory=None, **kwargs):
		if directory is None:
			directory = os.getcwd()
		self.directory = os.fspath(directory)
		super().__init__(*args, **kwargs)
 
	def do_GET(self):
		f = self.send_head()
		if f:
			try:
				self.copyfile(f, self.wfile)
			finally:
				f.close()
 
	def do_HEAD(self):
		f = self.send_head()
		if f:
			f.close()
 
	def send_head(self):
		path = self.translate_path(self.path)
		f = None
		if os.path.isdir(path):
			parts = urllib.parse.urlsplit(self.path)
			if not parts.path.endswith('/'):
				# redirect browser - doing basically what apache does
				self.send_response(HTTPStatus.MOVED_PERMANENTLY)
				new_parts = (parts[0], parts[1], parts[2] + '/',
							 parts[3], parts[4])
				new_url = urllib.parse.urlunsplit(new_parts)
				self.send_header("Location", new_url)
				self.end_headers()
				return None
			for index in "index.html", "index.htm":
				index = os.path.join(path, index)
				if os.path.exists(index):
					path = index
					break
			else:
				return self.list_directory(path)
 
		ctype = self.guess_type(path)
		if path.endswith("/"):
			self.send_error(HTTPStatus.NOT_FOUND, "File not found")
			return None
		try:
			f = open(path, 'rb')
		except OSError:
			self.send_error(HTTPStatus.NOT_FOUND, "File not found")
			return None
		try:
			fs = os.fstat(f.fileno())
			# Use browser cache if possible
			if ("If-Modified-Since" in self.headers
					and "If-None-Match" not in self.headers):
				# compare If-Modified-Since and time of last file modification
				try:
					ims = email.utils.parsedate_to_datetime(self.headers["If-Modified-Since"])
				except (TypeError, IndexError, OverflowError, ValueError):
					# ignore ill-formed values
					pass
				else:
					if ims.tzinfo is None:
						# obsolete format with no timezone, cf.
						# https://tools.ietf.org/html/rfc7231#section-7.1.1.1
						ims = ims.replace(tzinfo=datetime.timezone.utc)
					if ims.tzinfo is datetime.timezone.utc:
						# compare to UTC datetime of last modification
						last_modif = datetime.datetime.fromtimestamp(
							fs.st_mtime, datetime.timezone.utc)
						# remove microseconds, like in If-Modified-Since
						last_modif = last_modif.replace(microsecond=0)
 
						if last_modif <= ims:
							self.send_response(HTTPStatus.NOT_MODIFIED)
							self.end_headers()
							f.close()
							return None
 
			self.send_response(HTTPStatus.OK)
			self.send_header("Content-type", ctype)
			self.send_header("Content-Length", str(fs[6]))
			self.send_header("Last-Modified",
							 self.date_time_string(fs.st_mtime))
			self.end_headers()
			return f
		except:
			f.close()
			raise
 
	def list_directory(self, path):
		try:
			list_dir = os.listdir(path)
		except OSError:
			self.send_error(HTTPStatus.NOT_FOUND, "No permission to list_dir directory")
			return None
		list_dir.sort(key=lambda a: a.lower())
		r = []
		try:
			display_path = urllib.parse.unquote(self.path, errors='surrogatepass')
		except UnicodeDecodeError:
			display_path = urllib.parse.unquote(path)
		display_path = html.escape(display_path, quote=False)
		enc = sys.getfilesystemencoding()
 
		form = """
			<h1>文件上传</h1>\n
			<form ENCTYPE="multipart/form-data" method="post">\n
				<input name="file" type="file"/>\n
				<input type="submit" value="upload"/>\n
			</form>\n"""
		title = 'Directory listing for %s' % display_path
		r.append('<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01//EN" '
				 '"http://www.w3.org/TR/html4/strict.dtd">')
		r.append('<html>\n<head>')
		r.append('<meta http-equiv="Content-Type" '
				 'content="text/html; charset=%s">' % enc)
		r.append('<title>%s</title>\n</head>' % title)
		r.append('<body>%s\n<h1>%s</h1>' % (form, title))
		r.append('<hr>\n<ul>')
		for name in list_dir:
			fullname = os.path.join(path, name)
			displayname = linkname = name
			# Append / for directories or @ for symbolic links
			if os.path.isdir(fullname):
				displayname = name + "/"
				linkname = name + "/"
			if os.path.islink(fullname):
				displayname = name + "@"
				# Note: a link to a directory displays with @ and links with /
			r.append('<li><a href="%s">%s</a></li>' % (urllib.parse.quote(linkname, errors='surrogatepass'),
													   html.escape(displayname, quote=False)))
		r.append('</ul>\n<hr>\n</body>\n</html>\n')
		encoded = '\n'.join(r).encode(enc, 'surrogate escape')
		f = io.BytesIO()
		f.write(encoded)
		f.seek(0)
		self.send_response(HTTPStatus.OK)
		self.send_header("Content-type", "text/html; charset=%s" % enc)
		self.send_header("Content-Length", str(len(encoded)))
		self.end_headers()
		return f
 
	def translate_path(self, path):
		# abandon query parameters
		path = path.split('?', 1)[0]
		path = path.split('#', 1)[0]
		# Don't forget explicit trailing slash when normalizing. Issue17324
		trailing_slash = path.rstrip().endswith('/')
		try:
			path = urllib.parse.unquote(path, errors='surrogatepass')
		except UnicodeDecodeError:
			path = urllib.parse.unquote(path)
		path = posixpath.normpath(path)
		words = path.split('/')
		words = filter(None, words)
		path = self.directory
		for word in words:
			if os.path.dirname(word) or word in (os.curdir, os.pardir):
				# Ignore components that are not a simple file/directory name
				continue
			path = os.path.join(path, word)
		if trailing_slash:
			path += '/'
		return path
 
	def copyfile(self, source, outputfile):
		shutil.copyfileobj(source, outputfile)
 
	def guess_type(self, path):
		base, ext = posixpath.splitext(path)
		if ext in self.extensions_map:
			return self.extensions_map[ext]
		ext = ext.lower()
		if ext in self.extensions_map:
			return self.extensions_map[ext]
		guess, _ = mimetypes.guess_type(path)
		if guess:
			return guess
		return 'application/octet-stream'
 
	def do_POST(self):
		r, info = self.deal_post_data()
		self.log_message('%s, %s => %s' % (r, info, self.client_address))
		enc = sys.getfilesystemencoding()
		res = [
			'<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01//EN" '
			'"http://www.w3.org/TR/html4/strict.dtd">',
			'<html>\n<head>',
			'<meta http-equiv="Content-Type" content="text/html; charset=%s">' % enc,
			'<title>%s</title>\n</head>' % "Upload Result Page",
			'<body><h1>%s</h1>\n' % "Upload Result"
		]
		if r:
			res.append('<p>SUCCESS: %s</p>\n' % info)
		else:
			res.append('<p>FAILURE: %s</p>' % info)
		res.append('<a href=\"%s\">back</a>' % self.headers['referer'])
		res.append('</body></html>')
		encoded = '\n'.join(res).encode(enc, 'surrogate escape')
		f = io.BytesIO()
		f.write(encoded)
		length = f.tell()
		f.seek(0)
		self.send_response(200)
		self.send_header("Content-type", "text/html")
		self.send_header("Content-Length", str(length))
		self.end_headers()
		if f:
			self.copyfile(f, self.wfile)
			f.close()
 
	def deal_post_data(self):
		content_type = self.headers['content-type']
		if not content_type:
			return False, "Content-Type header doesn't contain boundary"
		boundary = content_type.split("=")[1].encode()
		remain_bytes = int(self.headers['content-length'])
		line = self.rfile.readline()
		remain_bytes -= len(line)
		if boundary not in line:
			return False, "Content NOT begin with boundary"
		line = self.rfile.readline()
		remain_bytes -= len(line)
		fn = re.findall(r'Content-Disposition.*name="file"; filename="(.*)"', line.decode())
		if not fn:
			return False, "Can't find out file name..."
		path = self.translate_path(self.path)
		fn = os.path.join(path, fn[0])
		line = self.rfile.readline()
		remain_bytes -= len(line)
		line = self.rfile.readline()
		remain_bytes -= len(line)
		try:
			out = open(fn, 'wb')
		except IOError:
			return False, "Can't create file to write, do you have permission to write?"
 
		preline = self.rfile.readline()
		remain_bytes -= len(preline)
		while remain_bytes > 0:
			line = self.rfile.readline()
			remain_bytes -= len(line)
			if boundary in line:
				preline = preline[0:-1]
				if preline.endswith(b'\r'):
					preline = preline[0:-1]
				out.write(preline)
				out.close()
				return True, "File '%s' upload success!" % fn
			else:
				out.write(preline)
				preline = line
		return False, "Unexpect Ends of data."
 

def _get_best_family(*address):
	infos = socket.getaddrinfo(
		*address,
		type=socket.SOCK_STREAM,
		flags=socket.AI_PASSIVE,
	)
	family, type, proto, canonname, sockaddr = next(iter(infos))
	return family, sockaddr




def serve_forever(port=8000, bind=None, directory="."):
	"""
	This runs an HTTP server on port 8000 (or the port argument).
	"""

	# ensure dual-stack is not disabled; ref #38907
	class DualStackServer(http.server.ThreadingHTTPServer):
		def server_bind(self):
			# suppress exception when protocol is IPv4
			with contextlib.suppress(Exception):
				self.socket.setsockopt(
					socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)
			return super().server_bind()

		def finish_request(self, request, client_address):
			self.RequestHandlerClass(request, client_address, self,
										directory=directory)


	HandlerClass=MySimpleHTTPRequestHandler
	ServerClass=DualStackServer
	protocol="HTTP/1.0"
	ServerClass.address_family, addr = _get_best_family(bind, port)
	HandlerClass.protocol_version = protocol
	with ServerClass(addr, HandlerClass) as httpd:
		host, port = httpd.socket.getsockname()[:2]
		url_host = f'[{host}]' if ':' in host else host
		print(
			f"Serving HTTP on {host} port {port} "
			f"(http://{url_host}:{port}/) ..."
		)
		try:
			httpd.serve_forever()
		except KeyboardInterrupt:
			print("\nKeyboard interrupt received, exiting.")
			sys.exit(0)
 
if __name__ == '__main__':
	import argparse
	parser = argparse.ArgumentParser()
	parser.add_argument('--bind', '-b', metavar='ADDRESS',
						help='specify alternate bind address '
							 '(default: all interfaces)')
	parser.add_argument('--directory', '-d', default=os.getcwd(),
						help='specify alternate directory '
							 '(default: current directory)')
	parser.add_argument('port', action='store', default=8000, type=int,
						nargs='?',
						help='specify alternate port (default: 8000)')
	args = parser.parse_args()

	# 在主线程中执行
	# serve_forever(
	# 	port=args.port,
	# 	bind=args.bind,
	#	directory = args.directory
	# )

	# 在子线程中执行,这样主线程可以执行异步工作
	thread1 = threading.Thread(name='t1',target= serve_forever,
			kwargs={
				"port" : args.port,
				"bind" : args.bind,
				"directory" : args.directory
			}
		)
	thread1.start()
	# thread1.join()
	

	

使用帮助:

>python3 simple_http_file_server.py --help              
usage: simple_http_file_server.py [-h] [--cgi] [--bind ADDRESS] [--directory DIRECTORY] [port]

positional arguments:
  port                  specify alternate port (default: 8000)

optional arguments:
  -h, --help            show this help message and exit
  --cgi                 run as CGI server
  --bind ADDRESS, -b ADDRESS
                        specify alternate bind address (default: all interfaces)
  --directory DIRECTORY, -d DIRECTORY
                        specify alternate directory (default: current directory)

使用示例:

>python3 simple_http_file_server.py -d ~ 12345
Serving HTTP on :: port 12345 (http://[::]:12345/) ...
::ffff:127.0.0.1 - - [30/Nov/2023 09:35:06] "GET / HTTP/1.1" 200 -

。。。

访问:

浏览器中访问:http://127.0.0.1:12345/


参考:

        Python3 实现简单HTTP服务器(附带文件上传)_python3 -m http.server-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1282116.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

OpenCV-python:图像像素类型转换与归一化

目录 1.图像像素类型转换 2. 图像像素转换适用情形 3.图像归一化 4.归一化方法支持 5.归一化函数 6.知识笔记 1.图像像素类型转换 图像像素类型转换是指将图像的像素值从一种类型转换为另一种类型。常见的像素类型包括无符号整数类型&#xff08;如8位无符号整数、16位无符…

树_左叶子之和

//给定二叉树的根节点 root &#xff0c;返回所有左叶子之和。 // // // // 示例 1&#xff1a; // // // // //输入: root [3,9,20,null,null,15,7] //输出: 24 //解释: 在这个二叉树中&#xff0c;有两个左叶子&#xff0c;分别是 9 和 15&#xff0c;所以返回 24 //…

彩色成像的基础和应用 原理 Principles(一)

下面我将不定期尽可能出一系列&#xff08;我觉的非常好&#xff09;翻译的文章来解释颜色这们学科。【下图为此次翻译的书籍封面】 Introduction: 颜色是一种与光的物理学&#xff0c;物质的化学&#xff0c;物体的几何特性以及人…

电脑回收站还原的文件在哪里找到?如何找回回收站还原的文件

电脑回收站是一种非常有用的功能&#xff0c;可以帮助我们恢复无意中删除的文件。然而&#xff0c;许多人可能不清楚还原的文件在哪里可以找到。本文将为您带来详细解答&#xff0c;并帮助您找回回收站还原的文件。 电脑回收站还原的文件在哪里找到 当我们使用电脑的回收站功…

微信小程序开发平台系统源码 附带完整的搭建教程

随着移动互联网的快速发展&#xff0c;微信小程序作为一种新型的应用形态&#xff0c;凭借其轻量化、易用性等特点&#xff0c;逐渐成为了移动开发领域的新宠。 以下是部分代码示例&#xff1a; 系统特色功能一览&#xff1a; 1.完善的开发工具&#xff1a;本系统提供了一整套…

设计一个在裸机下使用的简单软件定时器(3):功能测试

0 前言 在RTOS中&#xff0c;我们经常用到软件定时器来为我们处理一些对于实时性要求不高的定时任务。在裸机开发中&#xff0c;我们可能也有很多需要定时执行的任务&#xff0c;为了优雅地执行这些定时任务&#xff0c;本文设计一个在裸机下使用的简单软件定时器&#xff0c;…

java基础之HashSet详解

HashSet详解 HashSet是基于HashMap实现的一个单列存储的集合类&#xff0c;将所有的数据存在HashMap的key值中&#xff0c;而value全部使用一个Object对象存储 继承关系 public class HashSet<E> extends AbstractSet<E> implements Set<E>, Cloneable…

Unity | 渡鸦避难所-1 | 修复资源导入后呈现洋红色(Built-in 转 URP)

1 前言 Unity 编辑器导入 Asset Store 的资源包后&#xff0c;在预览和使用时&#xff0c;发现对象显示为洋红色 以小狐狸为例&#xff0c;打开资源包中的场景&#xff0c;可以看到小狐狸和地板均显示为洋红色 这是因为 Asset Store 中的资源包大部分是针对内置渲染管线项目制…

Python代码部署的三种加密方案,其中一种你肯定不知道

文章目录 前言一、代码混淆二、代码打包三、代码编译3.1 pyarmor快速使用3.2 pyarmor进阶使用关于Python技术储备一、Python所有方向的学习路线二、Python基础学习视频三、精品Python学习书籍四、Python工具包项目源码合集①Python工具包②Python实战案例③Python小游戏源码五、…

tNavigator 23.2 x64

Rock Flow Dynamics&#xff08;RFD&#xff09;很高兴地宣布发布我们旗舰产品tNavigator的最新版本。版本 23.2 现在可供用户使用。 tNavigator长期以来一直被认为是油藏工程师和地质学家的强大工具&#xff0c;可为复杂的油藏行为提供准确的建模和模拟。最新版本为所有模块带…

uni-app 微信小程序之整合colorui

1. 介绍 ColorUI uni-app版本支持多端&#xff0c;兼容性经过近上万使用者测试、反馈、改进&#xff0c;目前已非常稳定&#xff01; 说白了&#xff0c;就是uni-app版本的 tailwindcss&#xff0c;只是uni-app版本更适合使用 colorui 2. 开始 下载源码解压获得 /Colorui-U…

钉钉提交审批意见,并上传附件接口集成

一&#xff1a;适配器 DingtalkApprovalFilesExecute 参考方案链接&#xff1a;轻易云数据集成平台 二&#xff1a;请求接口。配置参数 接口文档&#xff1a;使用了新旧接口 服务端API发起带有附件的审批流并下载附件 - 钉钉开放平台 接口&#xff1a;topapi/processinsta…

mac 配置hosts

hosts 目录 /etc/hosts 配置方式 ip 域名 保存退出后运行&#xff1a;(清楚dns缓存) sudo killall -HUP mDNSResponder

(C++)和为s的两个数字--双指针算法

个人主页&#xff1a;Lei宝啊 愿所有美好如期而遇 和为S的两个数字_牛客题霸_牛客网输入一个升序数组 array 和一个数字S&#xff0c;在数组中查找两个数&#xff0c;使得他们的和正好是S&#xff0c;如果。题目来自【牛客题霸】https://www.nowcoder.com/practice/390da4f7a…

网工学习9-STP配置

如图 1 所示&#xff0c;当前网络中存在环路&#xff0c; SwitchA 、SwitchB 、SwitchC 和 SwitchD 都运行 STP&#xff0c;通过 彼此交互信息发现网络中的环路&#xff0c;并有选择的对某个端口进行阻塞&#xff0c;最终将环形网络结构修剪成无 环路的树形网络结构&#xff…

PAD平板签约投屏-高端活动的选择

传统的现场纸质签约仪式除了缺乏仪式感之外还缺少互动性&#xff0c;如果要将签约的过程投放到大屏幕上更是需要额外的硬件设备成本。相比于传统的纸质签约仪式&#xff0c;平板现场电子签约的形式更加的新颖、更富有科技感、更具有仪式感。 平板签约投屏是应用于会议签字仪式的…

Excel如何设置在未打印时显示虚线打印时不显示虚线

记得之前分享过一个BOM表模板&#xff0c;但是在我打印时&#xff0c;发现明明是留空白的地方却打印出来的虚线 后来&#xff0c;看了自己的页面布局&#xff0c;原来是网格线设置错误了 当我设置为查看时显示网格线&#xff0c;打印时不显示网格线&#xff0c;这样就正常了

WPS项目编号(序号)无法继续前一列表

问题&#xff1a;在编写文档中&#xff0c;序号无法继续前一列表&#xff0c;序号之间无法自动连接。 解决方法&#xff1a;使用格式刷。格式刷是复制格式的操作&#xff0c;可以用于选中已有格式的单元格&#xff0c;复制到需要设置格式的单元格。 参考文章&#xff1a;在wps…

前端可视化大屏自适应终极解决方案autofit.js

可视化大屏适配/自适应现状 可视化大屏的适配是一个老生常谈的话题了&#xff0c;现在其实不乏一些大佬开源的自适应插件、工具但是我为什么还要重复造轮子呢&#xff1f;因为目前市面上适配工具每一个都无法做到完美的效果&#xff0c;做出来的东西都差不多&#xff0c;最终实…

编程题:电话号码

&#x1f4d1;打牌 &#xff1a; da pai ge的个人主页 &#x1f324;️个人专栏 &#xff1a; da pai ge的博客专栏 ☁️宝剑锋从磨砺出&#xff0c;梅花香自苦寒来 &#x1f4d1;题目解析 这个题目比较…