Rqtz : 个人主页
共享IT之美,共创机器未来
Sharing the Beauty of IT and Creating the Future of Machines Together
目录
项目背景
编辑专有名词介绍
服务器GUI展示
功能(位置见上图序号)
客户端GUI展示(h5+css+js),平板,手机
动图编辑
视频 图片互传-CSDN直播
一键自动获取IP地址
websocket通信实现
按钮映射到新线程启动websocket服务器
python中使用async异步实现全双工通信,B/S主动发送数据,被动接收数据
图片二进制转换显示到Qt中label控件(涉及到opencv)
上一张,下一张功能实现
整体代码结构
最后
项目背景
由于比赛需要,电脑的window系统无法满足要求,因此就需要安装linux系统,采用双系统安装。安装完成之后,发现在手机端与电脑端,电脑端和电脑端进行通信(传输图片)时,没有window上方便,用传统的方式传输的话,大家可能都倾向于QQ或微信,但是在linux上可能就不是那么方便。因此,基于这个问题,我想要开发一个可以在任何平台都可以运行的图片互传软件,一开始想借助python在各个操作系统上的通用,使用CS架构将服务器和客户端都编写成基于Pyqt的Gui界面,但是他们虽然支持在电脑端的各个操作系统之间运行,但是安卓或IOS平台就不是很优雅,手机不能使用的话,那就失去了方便性。因此,我将服务器端还保持原有的Pyqt的开发方式,在客户端采用Websocket的方式,原因是websocket支持javascripts编写,这样就可以通过浏览器来建立客户端,而浏览器在任何平台都可以使用,包括安卓和IOS平台,这样的BS架构就非常优雅的解决了安卓平台的限制性。同样这个项目也是对自己学习的一个检验。
经过测试,可以在手机端与电脑端,平板端与电脑端,电脑端与电脑端进行全双工的实时通信!
客户端实现请看 Websocket通信实战项目(js)(图片互传应用)(下)客户端H5+css+js实现-CSDN博客
直接看python异步通信async,点击目录
专有名词介绍
- websocket协议:
WebSocket是一种实现在单个TCP连接上进行全双工通信的网络传输协议。这种协议被设计用于改善客户端和服务器之间实时通信的效率,允许双方同时发送和接收信息,而无需像传统HTTP请求那样轮询。
- CS架构:
CS架构则是由客户端和服务器端组成的两层结构,客户端包含业务逻辑和界面展示,服务器端则负责数据管理。这种架构适用于局域网环境,能够提供快速响应和强大的事务处理能力。CS软件通常需要专门安装和维护客户端程序,因此安全性较高,个性化能力较强。然而,这也导致升级和维护成本较高,且兼容性受限于特定操作系统。
- BS架构
BS架构是基于浏览器和服务器的体系结构,用户界面通过Web浏览器实现,主要业务逻辑在服务器端处理。这种架构使得软件能够在不同平台上运行,客户端零维护,但个性化能力较低,响应速度相对较慢。由于不需要专门安装客户端程序,只需一个网络链接即可访问,这极大地方便了用户。然而,BS架构对网络稳定性要求较高,对硬件的直接支持较弱。
服务器GUI展示
功能(位置见上图序号)
- 点击按钮启动websocket服务器
- 一键自动识别本机 ip地址
- 图片接收并显示在窗口中
- 图片数量两张及以上时,可使用上一战,下一张切换图片
- 支持滚动条,按钮放大缩小图片
- 保存客户端发送的图片,支持自定义保存图片路径及名称
- 在服务器端主动向客户端发送选择的图片,并显示图片路径
- 必要信息输出在窗口中,方便观察。
客户端GUI展示(h5+css+js),平板,手机
动图
视频 图片互传-CSDN直播
一键自动获取IP地址
所谓的自动获取ip地址,本质上是在终端中输入查询ip地址的命令,windows上使用ipconfig,linux(这里是ubuntu)和mac上使用ifconfig,但是使用python要自动获取,省去了打开终端输入命令寻找ip的步骤,就需要使用python的os库,下面请看代码
def autoip(self):
if os.name == 'nt':
print("当前操作系统是Windows")
output = os.popen("ipconfig | findstr \"IPv4\"").read()
ip = output.split("\n")
self.myapp.ip.setText(ip[1].split(": ")[1])
elif os.name == 'posix':
print("当前操作系统是Linux")
output = os.popen("ifconfig | awk '/inet /{print $2}'").read()
ip = output.split("\n")
self.myapp.ip.setText(ip[1])
elif os.name == 'darwin':
print("当前操作系统是Mac")
output = os.popen("ifconfig en0 | awk '/inet /{print $2}'").read()
self.myapp.ip.setText(output)
1.判断是哪种操作系统
通过os.name输出的字符串来判断是哪种操作系统:
- ‘nt’ --> Windows系统
- ‘posix’ --> Linux系统
- ‘darwin’ --> Mac系统
2.使用os.popen函数获取命令输出
- windows系统
output = os.popen("ipconfig | findstr \"IPv4\"").read()
解释:
ipconfig:windows查询ip地址的命令
“I” :将命令通过管道传入 findstr命令(windows特有命令)
findstr \"IPv4\"" :查询命令输出中含有IPV4的那一行,注意\"IPv4\"有双引号
read() : 获取输出
- Linux系统
output = os.popen("ifconfig | awk '/inet /{print $2}'").read()
解释:
fconfig:linux查询ip地址的命令
“I” :将命令通过管道传入 awk命令(linux特有命令)
awk '/inet /{print $2}' 查询命令输出中含有IPV4的那一行的第二段字符串
read() : 获取输出
打印之后有两个ip,一个是本地,一个是WLAN,
使用split 函数
ip = output.split("\n")
self.myapp.ip.setText(ip[1])就可以将ip地址设置到qt的linedit控件中
样例:
- Mac系统
output = os.popen("ifconfig en0 | awk '/inet /{print $2}'").read()
查询指定装置en0,其他和上述一样
websocket通信实现
按钮映射到新线程启动websocket服务器
1.将按钮通过信号和曹连接到启动新线程函数中
#初始化信号和槽
self.myapp.start.clicked.connect(self.newprocess)
newprocess为启动新的线程的函数
2.启动子线程函数newprocess实现
#启用子线程
def newprocess(self):
if self.myapp.port.text() == "" or self.myapp.ip.text() == "":
self.myapp.picdata.append("【"+str(time.time())+"】"+"【错误】:"+"请输入端口或者ip地址")
else:
th = threading.Thread(target=self.connect_server)
th.start()
self.myapp.start.setDisabled(True)
解释:
(1) if self.myapp.port.text() == "" or self.myapp.ip.text() == "":
判断端口输入框和ip地址输入框是否为空,为空则发出警告,
(2) th = threading.Thread(target=self.connect_server) th.start()
不为空则可以用使用threading函数来创建一个线程启动websocket服务器。
(3)self.myapp.start.setDisabled(True)
启动成功则可以将按钮设置为不能点击,防止重复启动服务器
问题
为什么要用一个新的线程呢?因为websocket服务器启动时,会阻塞当前线程,当前有一个主线程用于GUI界面的交互(鼠标点击按钮,拖动页面等),如果服务器在主线程启动,且一直没有客户端连接的话,界面就会卡死,所有按钮都无法点击,因为主线程阻塞。所以要用一个新的线程启动服务器。
2.子线程函数connect_server实现,异步,协程
#初始化websocket服务器,异步
def connect_server(self):
self.emitdata.emit("【提示】:"+"服务器监听中")
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
self.start_server = websockets.serve(self.handler,self.myapp.ip.text(), 8899,max_size=7000000)
loop.run_until_complete(self.start_server)
loop.run_forever()
#在显式的stop事件循环后,取消所有任务
for task in asyncio.all_tasks(loop):
task.cancel()
print(task.cancelled())
loop.close()
解释:
(1)
self.emitdata.emit("【提示】:"+"服务器监听中")
由于connect_server是一个子线程,子线程中无法直接访问主线程,emitdata是自定义的信号,通过在合适的位置发射信号,再连接到特定的函数中.
但是我的ui元素是定义在类的self属性中的,子线程可以直接通过self直接访问它,但是经过测试发现我们在子线程中直接向QTextEdit中增添数据时,会报错QObject::connect: Cannot queue arguments of type 'QTextCursor',因此最好还是使用信号和槽的方式来进行主子线程通信.
(2)
loop = asyncio.new_event_loop() asyncio.set_event_loop(loop)
使用python中asyncio库新建一个event_loop事件循环,并且将新创建的事件循环设置为当前线程的事件循环,事件循环是处理异步操作的核心组件
(3)
self.start_server = websockets.serve(self.handler,self.myapp.ip.text(), 8899,max_size=7000000)
为什么是异步的呢,因为websocket服务器的回调函数self.handler必须是一个异步函数,
websockets.serve函数的参数
- 第一个参数:服务器连接成功后调用的函数,必须是异步的
- 第二个参数:ip地址
- 第三个参数:端口号
- 第四个参数:传输的最大字节数
- 还有其他可选参数,这里只用了四个
(4)
loop.run_until_complete(self.start_server) loop.run_forever()
运行传入的协程self.start_server,并让事件循环一直运行下去,self.start_server是一个协程
(5)bug
#在显式的stop事件循环后,取消所有任务 for task in asyncio.all_tasks(loop): task.cancel() print(task.cancelled()) loop.close()
由于loop.run_forever()会让事件循环一直运行下去,期间会阻塞线程,直到显式的使用stop()方法,这个stop方法的调用是在异步发送数据函数中通过捕捉closeflag标志位来实现的.然后在但前的事件循环中取消所有任务,但是发现有个报错,我一直都没有解决,下文也有提到
值的是task4被取消但是仍在挂起状态,这个task4是一个WebSocketServerProtocol.handler().
python中使用async异步实现全双工通信,B/S主动发送数据,被动接收数据
websocket回调函数
#websocket处理函数
async def handler(self,websocket,path):
#创建两个task,分别为发送和接收
sendtask = asyncio.get_event_loop().create_task(self.send(websocket))
receivetask = asyncio.get_event_loop().create_task(self.receive(websocket))
#异步执行
await sendtask
await receivetask
异步发送数据函数
#异步发送数据
async def send(self,websocket):
while True:
#点击发送图片按钮后,标志位为真
if self.sendflag:
#此时的self.curr_bytedata存储的二进制数据为选择的图片
await websocket.send(self.curr_bytedata)
self.sendflag = False
self.emitdata.emit("发送成功!")
#点击断开连接按钮后
if self.closeflag:
#关闭websocket
await websocket.close()
#显式的停止事件循环
loop = asyncio.get_event_loop()
loop.stop()
#跳出循环,终止协程
break
#挂起1s,切换到其他协程
await asyncio.sleep(1)
异步接收数据函数
async def receive(self,websocket):
self.emitdata.emit(f"客户端连接成功,连接到{websocket.remote_address}")
try:
async for message in websocket:
self.curr_bytedata = message
#字节大小
print(len(message))
self.show_image()
except websockets.ConnectionClosedError:
self.emitdata.emit("客户端意外断开连接,请客户端重连")
解释
(1)websocket回调函数handler
- 由于websocket服务器的回调函数必须是一个异步函数self.handler,因此该函数必须加上async前缀,才可以将其变成一个协程。当服务器检查到有客户端链接过来时就会调用这个回调函数handler。
当客户端连接后,
self.handler
将传入以下两个主要参数
- websocket: 这是一个
websockets.WebSocketServerProtocol
实例,它代表服务器端与客户端之间的WebSocket连接。通过这个对象,您可以发送和接收WebSocket帧。- path:这是一个字符串,表示请求的URL路径。对于WebSocket服务器来说,这个值通常是
/
,但理论上可以是任何值,取决于如何配置websockets.serve
函数。
创建出两个task,并且把他们设置到当前的事件循环中。sendtask = asyncio.get_event_loop().create_task(self.send(websocket)) receivetask = asyncio.get_event_loop().create_task(self.receive(websocket))
await 关键字,后面必须跟上一个可等待的对象,例如task,future等,这里面的send和receive就是task对象,使用await关键字就可以将控制权交给evet_loop事件循环。await sendtask await receivetask
- 注意async def为前缀的函数是一个异步函数,必须把它放到事件循环中才可以运行,如果像以往那样子直接调用函数是不会执行的,而是返回一个coroutine对象。
(2)异步发送数据函数与异步接收数据函数
在网上找到的资料几乎全部都是在服务器受到客户端消息时才向客户端发消息,但是我这个的话,发送图片的这个操作完全是有用户自主决定的,即用户想什么时候发送就什么时候发送,如果只在服务器收到消息才发的话,那也太没意思了。那即要求发送又要求实时接收,首先循环是必要的,但是通信过程中用户并不是时时刻刻 在发送,也不是时时刻刻在接收,因此大多数时间都是在等待的,因此我们需要使用异步休眠的方式在发送的协程和接收的协程之间不断的切换,在await等待的过程中做别的事情,以提高程序的效率。
首先创建了一个死循环,不断的判断用户有没有点击发送按钮,即sendflag有没有变为真,判断结束后, await asyncio.sleep(1),异步休眠一秒,这里休眠的作用是可以暂停该协程一秒,来去切换到其他协程,刚刚说了await可以将控制权交给事件循环,事件循环此时就检查当前还有哪些任务可以执行,发现还有一个receivetask可以执行,因此就利用这一秒钟的时间切换到这个receivetask协程,这也就是为什么服务器连接成功后会在窗口打印“客户端连接成功“, 因为利用了这一秒钟执行了receivetask协程中的self.emitdata.emit(f"客户端连接成功,连接到{websocket.remote_address}")。while True: if self.sendflag: ...... await asyncio.sleep(1)
接着进行try,async for message in websocket将会从websocket中检查有无数据, 注意:这也是一个异步的对象,也使用的async for,也会将控制权交给事件循环,如果此时客户端没有发数据的话,事件循环就会检查当前还有哪些协程可以执行,于是又切会sendtask协程,其实我认为1s之后还是会切换回去。总的来说,这个for循环是只要有数据发来就执行,没数据就等待,这个等待可以切换到别的协程中。try: async for message in websocket: self.curr_bytedata = message print(len(message)) #字节大小 self.show_image() except websockets.ConnectionClosedError: self.emitdata.emit("客户端意外断开连接,请客户端重连")
- 如果此时客户端发来数据时,会将发来的图片的二进制数据,赋值给一个变量,在经过self.show_image()处理显示。下方会有介绍
- except 检查报错。
- 如果用户点击了发送按钮,即self.sendflag 为真,
if self.sendflag: #此时的self.curr_bytedata存储的二进制数据为选择的图片 await websocket.send(self.curr_bytedata) self.sendflag = False self.emitdata.emit("发送成功!")
我们将当前用户选择的图片的二进制格式的数据发送给客户端,使用send方法,发送完成后,该task就结束了,一般很短时间内就发送完成,取决于网络,然后重新将标志位标为假,等待用户下一次点击。
(3) 关闭连接和停止事件循环(bug)
#点击断开连接按钮后 if self.closeflag: #关闭websocket await websocket.close() #显式的停止事件循环 loop = asyncio.get_event_loop() loop.stop() #跳出循环,终止协程 break
- 关闭连接后,关闭websocket连接,此时receivetask中的异步循环由于断开了连接,该任务终止,sendtask在关闭连接之后break跳出了循环,sendtask也终止,显式的stop事件循环,最后在conncect_server函数最后有取消掉了没有关闭的任务,但是显示取消失败,并附带报错,和上文提到的bug是同一个,也就是self.handler无法取消,task.canceled()返回false,我也不知道为什么.希望能看出问题的大佬解答!
图片二进制转换显示到Qt中label控件(涉及到opencv)
show_image()显示图像函数实现
#显示图像
def show_image(self):。
binarydata = np.frombuffer(self.curr_bytedata,np.uint8)
self.image = cv2.imdecode(binarydata,cv2.IMREAD_COLOR)
value = cv2.cvtColor(self.image,cv2.COLOR_BGR2RGB)
height, width, channels = self.image.shape
images = QImage(value.data, width, height, width * channels, QImage.Format_RGB888)
#显示图片
self.myapp.image.setPixmap(QPixmap.fromImage(images).scaled(int(width/self.scale_percent),int(height/self.scale_percent)))
解释
binarydata = np.frombuffer(self.curr_bytedata,np.uint8)
将二进制数据self.curr_bytedata转换为NumPy数组,数据类型为np.uint8。
self.image = cv2.imdecode(binarydata,cv2.IMREAD_COLOR)
将二进制数据解码为图self.image,解码格式为彩色(cv2.IMREAD_COLOR)。
value = cv2.cvtColor(self.image,cv2.COLOR_BGR2RGB)
将图像从BGR格式转换为RGB格式
height, width, channels = self.image.shape
获取图像的高度、宽度和通道数
images = QImage(value.data, width, height, width * channels, QImage.Format_RGB888)
转换成QImage在 ui上显示
self.myapp.image.setPixmap(QPixmap.fromImage(images).scaled(int(width/self.scale_percent),int(height/self.scale_percent)))
显示图片,在label控件上
上一张,下一张功能实现
排除重复图片
根据下面代码,在show_image中添加
# 转换成QImage在 ui上显示
#images = QImage(value.data, width, height, width * channels, QImage.Format_RGB888)
#中间插入下面的
flag = False
#将每次显示的不同的图像加入imagelist列表中,为按钮切换上,下张准备
for k in range(len(self.imagelist)):
if self.curr_bytedata == self.imagelist[k]:
flag = True
if flag == False:
self.imagelist.append(self.curr_bytedata)
self.number = len(self.imagelist)
#图片为2张及以上时使能上一张下一张按钮
if self.number > 1:
self.myapp.up.setDisabled(False)
self.myapp.down.setDisabled(False)
#中间插入上面的
#显示图片
#self.myapp.image.setPixmap(QPixmap.fromImage(images).scaled(int(width/self.scale_percent),int(height/self.scale_percent)))
解释
首先flag初始为假,这个for循环是指在存储图片二进制数据的imagelist列表中遍历当前的图片历表中是否有重复的,有的话flag为真。for k in range(len(self.imagelist)): if self.curr_bytedata == self.imagelist[k]: flag = True
当当前的图片数据没有和之前的重复时,就往该列表imagelist中追加新的数据,self.number为这个列表的长度。if flag == False: self.imagelist.append(self.curr_bytedata) self.number = len(self.imagelist)
图片为2张及以上时使能上一张下一张按钮if self.number > 1: self.myapp.up.setDisabled(False) self.myapp.down.setDisabled(False)
上,下一张按钮实现
上一张
#上一张
def uppic(self):
self.number -= 1
if self.number < 1:
self.number = len(self.imagelist)
self.curr_bytedata = self.imagelist[self.number-1]
self.show_image()
else:
self.curr_bytedata = self.imagelist[self.number-1]
self.show_image()
下一张
#下一张
def downpic(self):
self.number += 1
if self.number > len(self.imagelist):
self.number = 1
self.curr_bytedata = self.imagelist[self.number-1]
self.show_image()
else:
self.curr_bytedata = self.imagelist[self.number-1]
self.show_image()
解释
本质上是改变self.number(上面有提到)的值来对应到imagelist图片列表当中的索引,
达到最大值,或最小值时切换到列表的最小值,最大值。
整体代码结构
最后
这篇文章是我初次接触websocket和异步async写的一个小项目,可能有理解不到位的地方.
如果上述有误,请各位大佬及时批评指正,小弟感激不尽。