改造python3中的http.server为简单的文件下载服务

news2025/1/13 7:42:11

改造

修改python3中的http.server.SimpleHTTPRequestHandler,实现简单的文件上传下载服务

 simple_http_file_server.py:

# !/usr/bin/env python3

import datetime
import email
import html
import http.server
import io
import mimetypes
import os
import posixpath
import re
import shutil
import sys
import urllib.error
import urllib.parse
import urllib.request
import socket
from http import HTTPStatus
import threading
import contextlib
 
__version__ = "0.1"
__all__ = ["MySimpleHTTPRequestHandler"]
 
class MySimpleHTTPRequestHandler(http.server.BaseHTTPRequestHandler):
	server_version = "SimpleHTTP/" + __version__
	extensions_map = _encodings_map_default = {
		'.gz': 'application/gzip',
		'.Z': 'application/octet-stream',
		'.bz2': 'application/x-bzip2',
		'.xz': 'application/x-xz',
	}
 
	def __init__(self, *args, directory=None, **kwargs):
		if directory is None:
			directory = os.getcwd()
		self.directory = os.fspath(directory)
		super().__init__(*args, **kwargs)
 
	def do_GET(self):
		f = self.send_head()
		if f:
			try:
				self.copyfile(f, self.wfile)
			finally:
				f.close()
 
	def do_HEAD(self):
		f = self.send_head()
		if f:
			f.close()
 
	def send_head(self):
		path = self.translate_path(self.path)
		f = None
		if os.path.isdir(path):
			parts = urllib.parse.urlsplit(self.path)
			if not parts.path.endswith('/'):
				# redirect browser - doing basically what apache does
				self.send_response(HTTPStatus.MOVED_PERMANENTLY)
				new_parts = (parts[0], parts[1], parts[2] + '/',
							 parts[3], parts[4])
				new_url = urllib.parse.urlunsplit(new_parts)
				self.send_header("Location", new_url)
				self.end_headers()
				return None
			for index in "index.html", "index.htm":
				index = os.path.join(path, index)
				if os.path.exists(index):
					path = index
					break
			else:
				return self.list_directory(path)
 
		ctype = self.guess_type(path)
		if path.endswith("/"):
			self.send_error(HTTPStatus.NOT_FOUND, "File not found")
			return None
		try:
			f = open(path, 'rb')
		except OSError:
			self.send_error(HTTPStatus.NOT_FOUND, "File not found")
			return None
		try:
			fs = os.fstat(f.fileno())
			# Use browser cache if possible
			if ("If-Modified-Since" in self.headers
					and "If-None-Match" not in self.headers):
				# compare If-Modified-Since and time of last file modification
				try:
					ims = email.utils.parsedate_to_datetime(self.headers["If-Modified-Since"])
				except (TypeError, IndexError, OverflowError, ValueError):
					# ignore ill-formed values
					pass
				else:
					if ims.tzinfo is None:
						# obsolete format with no timezone, cf.
						# https://tools.ietf.org/html/rfc7231#section-7.1.1.1
						ims = ims.replace(tzinfo=datetime.timezone.utc)
					if ims.tzinfo is datetime.timezone.utc:
						# compare to UTC datetime of last modification
						last_modif = datetime.datetime.fromtimestamp(
							fs.st_mtime, datetime.timezone.utc)
						# remove microseconds, like in If-Modified-Since
						last_modif = last_modif.replace(microsecond=0)
 
						if last_modif <= ims:
							self.send_response(HTTPStatus.NOT_MODIFIED)
							self.end_headers()
							f.close()
							return None
 
			self.send_response(HTTPStatus.OK)
			self.send_header("Content-type", ctype)
			self.send_header("Content-Length", str(fs[6]))
			self.send_header("Last-Modified",
							 self.date_time_string(fs.st_mtime))
			self.end_headers()
			return f
		except:
			f.close()
			raise
 
	def list_directory(self, path):
		try:
			list_dir = os.listdir(path)
		except OSError:
			self.send_error(HTTPStatus.NOT_FOUND, "No permission to list_dir directory")
			return None
		list_dir.sort(key=lambda a: a.lower())
		r = []
		try:
			display_path = urllib.parse.unquote(self.path, errors='surrogatepass')
		except UnicodeDecodeError:
			display_path = urllib.parse.unquote(path)
		display_path = html.escape(display_path, quote=False)
		enc = sys.getfilesystemencoding()
 
		form = """
			<h1>文件上传</h1>\n
			<form ENCTYPE="multipart/form-data" method="post">\n
				<input name="file" type="file"/>\n
				<input type="submit" value="upload"/>\n
			</form>\n"""
		title = 'Directory listing for %s' % display_path
		r.append('<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01//EN" '
				 '"http://www.w3.org/TR/html4/strict.dtd">')
		r.append('<html>\n<head>')
		r.append('<meta http-equiv="Content-Type" '
				 'content="text/html; charset=%s">' % enc)
		r.append('<title>%s</title>\n</head>' % title)
		r.append('<body>%s\n<h1>%s</h1>' % (form, title))
		r.append('<hr>\n<ul>')
		for name in list_dir:
			fullname = os.path.join(path, name)
			displayname = linkname = name
			# Append / for directories or @ for symbolic links
			if os.path.isdir(fullname):
				displayname = name + "/"
				linkname = name + "/"
			if os.path.islink(fullname):
				displayname = name + "@"
				# Note: a link to a directory displays with @ and links with /
			r.append('<li><a href="%s">%s</a></li>' % (urllib.parse.quote(linkname, errors='surrogatepass'),
													   html.escape(displayname, quote=False)))
		r.append('</ul>\n<hr>\n</body>\n</html>\n')
		encoded = '\n'.join(r).encode(enc, 'surrogate escape')
		f = io.BytesIO()
		f.write(encoded)
		f.seek(0)
		self.send_response(HTTPStatus.OK)
		self.send_header("Content-type", "text/html; charset=%s" % enc)
		self.send_header("Content-Length", str(len(encoded)))
		self.end_headers()
		return f
 
	def translate_path(self, path):
		# abandon query parameters
		path = path.split('?', 1)[0]
		path = path.split('#', 1)[0]
		# Don't forget explicit trailing slash when normalizing. Issue17324
		trailing_slash = path.rstrip().endswith('/')
		try:
			path = urllib.parse.unquote(path, errors='surrogatepass')
		except UnicodeDecodeError:
			path = urllib.parse.unquote(path)
		path = posixpath.normpath(path)
		words = path.split('/')
		words = filter(None, words)
		path = self.directory
		for word in words:
			if os.path.dirname(word) or word in (os.curdir, os.pardir):
				# Ignore components that are not a simple file/directory name
				continue
			path = os.path.join(path, word)
		if trailing_slash:
			path += '/'
		return path
 
	def copyfile(self, source, outputfile):
		shutil.copyfileobj(source, outputfile)
 
	def guess_type(self, path):
		base, ext = posixpath.splitext(path)
		if ext in self.extensions_map:
			return self.extensions_map[ext]
		ext = ext.lower()
		if ext in self.extensions_map:
			return self.extensions_map[ext]
		guess, _ = mimetypes.guess_type(path)
		if guess:
			return guess
		return 'application/octet-stream'
 
	def do_POST(self):
		r, info = self.deal_post_data()
		self.log_message('%s, %s => %s' % (r, info, self.client_address))
		enc = sys.getfilesystemencoding()
		res = [
			'<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01//EN" '
			'"http://www.w3.org/TR/html4/strict.dtd">',
			'<html>\n<head>',
			'<meta http-equiv="Content-Type" content="text/html; charset=%s">' % enc,
			'<title>%s</title>\n</head>' % "Upload Result Page",
			'<body><h1>%s</h1>\n' % "Upload Result"
		]
		if r:
			res.append('<p>SUCCESS: %s</p>\n' % info)
		else:
			res.append('<p>FAILURE: %s</p>' % info)
		res.append('<a href=\"%s\">back</a>' % self.headers['referer'])
		res.append('</body></html>')
		encoded = '\n'.join(res).encode(enc, 'surrogate escape')
		f = io.BytesIO()
		f.write(encoded)
		length = f.tell()
		f.seek(0)
		self.send_response(200)
		self.send_header("Content-type", "text/html")
		self.send_header("Content-Length", str(length))
		self.end_headers()
		if f:
			self.copyfile(f, self.wfile)
			f.close()
 
	def deal_post_data(self):
		content_type = self.headers['content-type']
		if not content_type:
			return False, "Content-Type header doesn't contain boundary"
		boundary = content_type.split("=")[1].encode()
		remain_bytes = int(self.headers['content-length'])
		line = self.rfile.readline()
		remain_bytes -= len(line)
		if boundary not in line:
			return False, "Content NOT begin with boundary"
		line = self.rfile.readline()
		remain_bytes -= len(line)
		fn = re.findall(r'Content-Disposition.*name="file"; filename="(.*)"', line.decode())
		if not fn:
			return False, "Can't find out file name..."
		path = self.translate_path(self.path)
		fn = os.path.join(path, fn[0])
		line = self.rfile.readline()
		remain_bytes -= len(line)
		line = self.rfile.readline()
		remain_bytes -= len(line)
		try:
			out = open(fn, 'wb')
		except IOError:
			return False, "Can't create file to write, do you have permission to write?"
 
		preline = self.rfile.readline()
		remain_bytes -= len(preline)
		while remain_bytes > 0:
			line = self.rfile.readline()
			remain_bytes -= len(line)
			if boundary in line:
				preline = preline[0:-1]
				if preline.endswith(b'\r'):
					preline = preline[0:-1]
				out.write(preline)
				out.close()
				return True, "File '%s' upload success!" % fn
			else:
				out.write(preline)
				preline = line
		return False, "Unexpect Ends of data."
 

def _get_best_family(*address):
	infos = socket.getaddrinfo(
		*address,
		type=socket.SOCK_STREAM,
		flags=socket.AI_PASSIVE,
	)
	family, type, proto, canonname, sockaddr = next(iter(infos))
	return family, sockaddr




def serve_forever(port=8000, bind=None, directory="."):
	"""
	This runs an HTTP server on port 8000 (or the port argument).
	"""

	# ensure dual-stack is not disabled; ref #38907
	class DualStackServer(http.server.ThreadingHTTPServer):
		def server_bind(self):
			# suppress exception when protocol is IPv4
			with contextlib.suppress(Exception):
				self.socket.setsockopt(
					socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)
			return super().server_bind()

		def finish_request(self, request, client_address):
			self.RequestHandlerClass(request, client_address, self,
										directory=directory)


	HandlerClass=MySimpleHTTPRequestHandler
	ServerClass=DualStackServer
	protocol="HTTP/1.0"
	ServerClass.address_family, addr = _get_best_family(bind, port)
	HandlerClass.protocol_version = protocol
	with ServerClass(addr, HandlerClass) as httpd:
		host, port = httpd.socket.getsockname()[:2]
		url_host = f'[{host}]' if ':' in host else host
		print(
			f"Serving HTTP on {host} port {port} "
			f"(http://{url_host}:{port}/) ..."
		)
		try:
			httpd.serve_forever()
		except KeyboardInterrupt:
			print("\nKeyboard interrupt received, exiting.")
			sys.exit(0)
 
if __name__ == '__main__':
	import argparse
	parser = argparse.ArgumentParser()
	parser.add_argument('--bind', '-b', metavar='ADDRESS',
						help='specify alternate bind address '
							 '(default: all interfaces)')
	parser.add_argument('--directory', '-d', default=os.getcwd(),
						help='specify alternate directory '
							 '(default: current directory)')
	parser.add_argument('port', action='store', default=8000, type=int,
						nargs='?',
						help='specify alternate port (default: 8000)')
	args = parser.parse_args()

	# 在主线程中执行
	# serve_forever(
	# 	port=args.port,
	# 	bind=args.bind,
	#	directory = args.directory
	# )

	# 在子线程中执行,这样主线程可以执行异步工作
	thread1 = threading.Thread(name='t1',target= serve_forever,
			kwargs={
				"port" : args.port,
				"bind" : args.bind,
				"directory" : args.directory
			}
		)
	thread1.start()
	# thread1.join()
	

	

使用帮助:

>python3 simple_http_file_server.py --help              
usage: simple_http_file_server.py [-h] [--cgi] [--bind ADDRESS] [--directory DIRECTORY] [port]

positional arguments:
  port                  specify alternate port (default: 8000)

optional arguments:
  -h, --help            show this help message and exit
  --cgi                 run as CGI server
  --bind ADDRESS, -b ADDRESS
                        specify alternate bind address (default: all interfaces)
  --directory DIRECTORY, -d DIRECTORY
                        specify alternate directory (default: current directory)

使用示例:

>python3 simple_http_file_server.py -d ~ 12345
Serving HTTP on :: port 12345 (http://[::]:12345/) ...
::ffff:127.0.0.1 - - [30/Nov/2023 09:35:06] "GET / HTTP/1.1" 200 -

。。。

访问:

浏览器中访问:http://127.0.0.1:12345/


参考:

        Python3 实现简单HTTP服务器(附带文件上传)_python3 -m http.server-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1274481.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

游戏反Frida注入检测方案

在游戏安全对抗过程中&#xff0c;有不少外挂的实现基于对游戏内存模块进行修改&#xff0c;这类外挂通常会使用内存修改器&#xff0c;除此之外&#xff0c;还有一种门槛相对更高、也更难检测的「注入挂」。 据FairGuard游戏安全数据统计&#xff0c;在游戏面临的众多安全风险…

uniapp微信小程序实现地图展示控件

最终实现效果&#xff1a; 地图上展示控件&#xff0c;并可以点击。 目录 一、前言 二、在地图上展示控件信息 点击后可进行绘制面图形 1.使用cover-view将控件在地图上展示 2.设置控件样式&#xff0c;使用好看的图标 3.控件绑定点击事件 一、前言 原本使用的是control…

应用于智慧城管的AI边缘盒子+AI算法一体化解决方案

智慧城管支持十几种城市违规违法场景的识别&#xff0c;覆盖着市容环境、街面秩序、宣传广告、市政设施等类别&#xff0c;能够在很大程度上引导责任单位进行自动整改和自治&#xff0c;积极引导市民加强自我约束、自我教育、自我管理&#xff0c;努力增强市民参与城市管理的责…

Maya 2024(3D建模、动画和渲染软件)

Maya 2024是一款非常强大的3D建模、动画和渲染软件&#xff0c;它提供了许多新功能和改进&#xff0c;以帮助建模师、动画师和渲染师更加高效地进行创作。 在建模方面&#xff0c;Maya 2024引入了Symmetry&#xff08;对称&#xff09;功能&#xff0c;可以在网格两侧生成均匀…

(分类)KNN算法- 参数调优

在此专栏的上一篇文章的基础上&#xff0c;进行交叉实验获取最佳的K值 上一篇文章&#xff1a;KNN算法案例-鸢尾花分类 数据拆分的过程&#xff1a; 交叉验证&#xff08;Cross Validation&#xff09; 是一种在机器学习中广泛使用的模型评估和参数调优方法。在训练模型时&…

设计模式精讲:掌握工厂方法与抽象工厂的精髓

设计模式精讲&#xff1a;掌握工厂方法与抽象工厂的精髓 一、引言&#xff1a;如何学习设计模式&#xff1f;二、工厂方法&#xff08;也叫工厂模式&#xff09;2.1、代码结构2.2、符合的设计原则2.3、小结 三、抽象工厂3.1、代码结构3.2、符合的设计原则3.3、小结 总结 一、引…

麒麟操作系统根目录权限777修复方法

在麒麟操作系统中&#xff0c;误操作执行&#xff1a;chmod -R 777 /后&#xff0c;整个根目录权限都变成了777&#xff0c;先找一台相同版本的系统&#xff0c;越干净越好&#xff0c;把该系统的权限导出。 1.在好的机器上执行&#xff1a; #cd / 到根目录 # sudo…

Docker容器中的OpenCV:轻松构建可移植的计算机视觉环境

前言 「作者主页」&#xff1a;雪碧有白泡泡 「个人网站」&#xff1a;雪碧的个人网站 构建可移植的计算机视觉环境 文章目录 前言引言简介&#xff1a;目的和重要性&#xff1a; 深入理解Docker和OpenCVDocker的基本概念和优势&#xff1a;OpenCV简介和应用领域&#xff1a;…

四、虚拟机网络配置

目录 1、VMware网卡配置模式 1.1 桥接模式 1.2 NAT模式 1.3 仅主机模式 ​​​​​​​2、编辑虚拟机的网络编辑器 ​​​​​​​3、编辑Window的虚拟网卡 ​​​​​​​4、修改IP地址为静态 4.1 查看网卡名字 4.2 编辑修改网卡IP地址的配置文件 4.3 重启网络: 4.…

【Docker】Swarm的overlay网络

对于理解swarm的网络来讲&#xff0c;个人认为最重要的两个点&#xff1a; 第一是外部如何访问部署运行在swarm集群内的服务&#xff0c;可以称之为入方向流量&#xff0c;在swarm里我们通过ingress来解决。 第二是部署在swarm集群里的服务&#xff0c;如何对外进行访问&…

探索Playwright的现代自动化测试力量

在当今数字化时代&#xff0c;Web应用程序的质量和稳定性对于企业的成功至关重要。为了确保Web应用程序的无缝运行&#xff0c;自动化测试工具成为了开发人员和测试团队的重要工具。多年来&#xff0c;Selenium一直是自动化测试的黄金标准&#xff0c;然而&#xff0c;在不久前…

从0开始学习JavaScript--JavaScript CommonJS 和 Node.js 模块系统

JavaScript 的模块系统是实现模块化开发的关键&#xff0c;而 CommonJS 规范及其在 Node.js 中的应用是其中最为突出的例子。本文将深入研究 JavaScript 中的 CommonJS 模块规范&#xff0c;解析其核心概念&#xff0c;并通过丰富的示例代码展示其在 Node.js 中的实际应用。 C…

MySQL核心知识点整理大全1-笔记

MySQL 是一种流行的关系型数据库管理系统&#xff0c;它是以C和C语言编写的&#xff0c;最初是由瑞典公司MySQL AB开发的&#xff0c;现在是由Oracle公司维护和支持。MySQL是开源软件&#xff0c;可在Windows、Linux、Mac OS、FreeBSD等各种操作系统上运行。MySQL的主要特点是速…

数字图像处理(实践篇)十四 图像金字塔

目录 一 图像金字塔 二 涉及的函数 三 实践 一 图像金字塔 在某些情况下&#xff0c;需要处理不同分辨率的&#xff08;相同&#xff09;图像。比如&#xff0c;在图像中搜索某些目标&#xff08;比如人脸&#xff09;的时候&#xff0c;不确定该目标在所述图像中会以多大的…

智能优化算法应用:基于象群算法无线传感器网络(WSN)覆盖优化 - 附代码

智能优化算法应用&#xff1a;基于象群算法无线传感器网络(WSN)覆盖优化 - 附代码 文章目录 智能优化算法应用&#xff1a;基于象群算法无线传感器网络(WSN)覆盖优化 - 附代码1.无线传感网络节点模型2.覆盖数学模型及分析3.象群算法4.实验参数设定5.算法结果6.参考文献7.MATLAB…

HTML5+CSS3小实例:纯CSS实现文字组成肖像特效

实例:纯CSS实现文字组成肖像特效 技术栈:HTML+CSS 效果: 源码: 【HTML】 <!DOCTYPE html> <html><head><meta http-equiv="content-type" content="text/html; charset=utf-8"><meta name="viewport" conten…

Java SpringBoot Controller常见写法

文章目录 环境Controller调用脚本运行结果总结 环境 系统: windows 11 工具: java, idea, git bash Controller 接口常见有以下几种方式 其中&#xff1a; Tobj 调用脚本 我的是windows 系统&#xff0c;使用 git bash 窗口运行, 用 cmd 或者 power shell 会有问题 curl …

《合成孔径雷达成像算法与实现》_使用CS算法对RADARSAT-1数据进行成像

CSA 简介&#xff1a;Chirp Scaling 算法 (简称 CS 算法&#xff0c;即 CSA) 避免了 RCMC 中的插值操作。该算法基于 Scaling 原理&#xff0c;通过对 chirp 信号进行频率调制&#xff0c;实现了对信号的尺度变换或平移。基于这种原理&#xff0c;可以通过相位相乘代替时域插值…

ssh连接docker容器处理备忘

1、查看容器ip&#xff0c;记下来之后要用 docker inspect elastic | grep IPAddress 2、使用root进入docker容器 docker exec -it -u root elastic /bin/bash 3、安装openssh #更新apt apt-get update#安装ssh client apt-get install openssh-client#安装ssh server apt-…

子类拷贝构造函数会调用父类拷贝构造函数吗?

一. 编译器提供的默认子类拷贝构造函数会调用父类拷贝构造函数。 #include <iostream> #include <string> using namespace std;class Parent { public:Parent(string home_address "中国") : m_home_address(home_address) {cout << "调用…