# 需求:在“Python-主线程控制子线程-3”的基础上,新增通过 UDP 接收指令的功能,代替从键盘输入指令。
# 修改后的程序,主线程可以获取子线程的结果
import threading
import time
import queue
import traceback
from loguru import logger
import socket
class WorkerThread(threading.Thread):
    """Background worker that publishes an incrementing counter.

    Each loop iteration calls :meth:`do_work` and puts the returned value on
    ``result_queue`` so that another thread can consume it.  The loop keeps
    running, pausing up to one second between iterations, until
    :meth:`stop` sets ``stop_event``.
    """

    # Class-level starting value of the counter.  ``do_work`` reads it on the
    # first call and then shadows it with a per-instance attribute, so every
    # worker instance counts up from this value independently.
    static_variable = 0

    def __init__(self, result_queue):
        """Initialise the worker.

        Args:
            result_queue: Object exposing ``put``; receives one result per
                loop iteration.
        """
        super().__init__()
        self.stop_event = threading.Event()
        self.result_queue = result_queue

    def run(self):
        """Thread body: produce results until ``stop_event`` is set."""
        thread_id = threading.get_ident()
        print(f"Worker thread {thread_id} has started.")
        while not self.stop_event.is_set():
            print(f"Worker thread {thread_id} is running.")
            result = self.do_work()
            self.result_queue.put(result)
            # Event.wait() instead of time.sleep(): it returns as soon as
            # stop() sets the event, so shutdown is not delayed by the pause.
            self.stop_event.wait(1)
        print(f"Worker thread {thread_id} has stopped.")

    @classmethod
    def increment_static_variable(cls):
        """Increment the class-level ``static_variable`` shared as the default."""
        cls.static_variable += 1

    def do_work(self):
        """Simulate one unit of work.

        Returns:
            The incremented per-instance counter (1 on the first call when the
            class-level ``static_variable`` is 0).
        """
        self.static_variable += 1
        return self.static_variable

    def stop(self):
        """Ask the worker loop to finish and wait for the thread to end.

        Safe to call on a thread that was never started or has already
        finished, and from within the worker thread itself (in which case no
        join is attempted, since a thread cannot join itself).
        """
        self.stop_event.set()
        if self.is_alive() and self is not threading.current_thread():
            self.join()  # wait for the worker thread to terminate
class KeyListener:
    """Receive text commands over UDP and forward them through a queue.

    A UDP socket is bound in the constructor.  :meth:`start` spawns a thread
    running :meth:`udp_recv`, which decodes every datagram and puts the
    resulting string on ``result_queue_listener``.  :meth:`stop` ends the
    loop and closes the socket.
    """

    # Seconds a blocking ``recvfrom`` may wait before the receive loop
    # re-checks ``stop_event``.  Closing a socket from another thread is not
    # guaranteed to wake a blocked ``recvfrom``, so the loop polls instead.
    RECV_TIMEOUT = 0.2

    def __init__(self, result_queue_listener, address=("127.0.0.1", 12580)):
        """Bind the UDP socket.

        Args:
            result_queue_listener: Object exposing ``put``; receives each
                decoded command string.
            address: ``(host, port)`` to bind to.  Defaults to the address
                the script has always used.

        Raises:
            OSError: If the address cannot be bound (the socket is closed
                before the error propagates).
        """
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.address = address
        try:
            self.server_socket.bind(self.address)
            self.server_socket.settimeout(self.RECV_TIMEOUT)
        except OSError:
            # Do not leak the descriptor when the port is unavailable.
            self.server_socket.close()
            raise
        self.stop_event = threading.Event()
        self.result_queue_listener = result_queue_listener
        self._thread = None  # set by start()

    def udp_recv(self):
        """Receive datagrams until ``stop_event`` is set.

        Every datagram (up to 1024 bytes) is decoded, printed and queued.
        Bytes that are not valid UTF-8 are replaced rather than raising, so
        a malformed packet cannot kill the listener thread.
        """
        while not self.stop_event.is_set():
            try:
                receive_dat, _ = self.server_socket.recvfrom(1024)
            except socket.timeout:
                # Nothing arrived within RECV_TIMEOUT; re-check stop_event.
                continue
            except OSError as exc:
                if self.stop_event.is_set():
                    # Expected: stop() closed the socket under us.
                    break
                logger.warning(f"UDP receive failed: {exc}")
                # Back off briefly so a persistent error cannot busy-spin.
                self.stop_event.wait(self.RECV_TIMEOUT)
                continue
            command = receive_dat.decode(errors="replace")
            print(command)
            self.result_queue_listener.put(command)

    def start(self):
        """Start the listener thread that runs :meth:`udp_recv`."""
        # Named after the keyboard listener this class replaced; it now
        # listens for UDP commands.
        keyboard_thread = threading.Thread(target=self.udp_recv)
        self._thread = keyboard_thread
        keyboard_thread.start()
        logger.info(f"Keyboard monitor thread {keyboard_thread.ident} has started~")

    def stop(self):
        """Stop the receive loop, close the socket and wait for the thread.

        Safe to call more than once.
        """
        self.stop_event.set()
        self.server_socket.close()
        listener = self._thread
        if (
            listener is not None
            and listener.is_alive()
            and listener is not threading.current_thread()
        ):
            # The loop wakes at least every RECV_TIMEOUT seconds, so a
            # bounded join is enough and can never hang the caller.
            listener.join(self.RECV_TIMEOUT * 5)
def end_child_thread():
    """Stop the module-level ``worker_thread`` if it is running.

    Reads the global ``worker_thread``.  When the thread is alive it is
    asked to stop (``stop()`` also joins it) and a message is logged if it
    is somehow still alive afterwards; otherwise a message notes that there
    is no running worker.  Any exception is logged instead of propagated.
    """
    try:
        if worker_thread.is_alive():
            # Log before stopping: stop() blocks until the thread has ended.
            logger.info(f"Stopping worker thread {worker_thread.ident}...")
            worker_thread.stop()
            if worker_thread.is_alive():
                logger.info(f"Worker thread {worker_thread.ident} is still active")
        else:
            logger.info(f"Worker thread {worker_thread.ident} doesn't exist ..")
    except Exception as e:
        # Format the exception into the message; concatenating str + Exception
        # raises TypeError and would mask the original error.
        logger.exception(f"中止线程失败:{e}")
if __name__ == '__main__':
    # Script entry point.  Commands arrive over UDP via KeyListener:
    #   '0' -> stop the worker thread
    #   '1' -> start a new worker thread if none is running
    #   '2' -> stop the worker and the listener, then exit
    # Worker results are drained from result_queue and printed.
    result_queue = queue.Queue()
    result_queue_listener = queue.Queue()
    main_thread_id = threading.main_thread().ident
    logger.info(f"Main thread {main_thread_id} has started~")
    # Bind the UDP socket before starting the worker: if the port is
    # unavailable the constructor raises and no non-daemon thread is left
    # running behind the failed start-up.
    key_listener = KeyListener(result_queue_listener)
    worker_thread = WorkerThread(result_queue)
    worker_thread.start()
    logger.info(f"worker thread {worker_thread.ident} has started~")
    key_listener.start()
    exit_program = False
    try:
        while not exit_program:
            if not result_queue_listener.empty():
                # Strip surrounding whitespace so a sender that appends a
                # newline still matches the single-character commands.
                command = result_queue_listener.get().strip()
                if command == '0':
                    logger.info(f"工作线程状态{worker_thread}")
                    end_child_thread()
                    logger.info(f"工作线程状态{worker_thread}")
                elif command == '1':
                    try:
                        # Only one worker at a time; a stopped Thread object
                        # cannot be restarted, so a fresh one is created.
                        if not worker_thread.is_alive():
                            worker_thread = WorkerThread(result_queue)
                            worker_thread.start()
                            logger.info(f"Started new worker thread {worker_thread.ident}")
                    except Exception as e:
                        # Format the exception; str + Exception raises TypeError.
                        logger.exception(f"新建工作线程失败:{e}")
                elif command == '2':
                    end_child_thread()
                    key_listener.stop()
                    print(f"停止工作线程{worker_thread}")
                    print(f"停止监听线程{key_listener}")
                    exit_program = True
                    break
            time.sleep(0.1)
            if not result_queue.empty():
                result = result_queue.get()
                print("主线程获取的子线程结果:", result)
    except KeyboardInterrupt:
        # Ctrl+C is not an Exception subclass; without this branch the
        # non-daemon threads would keep the process alive.
        logger.info("KeyboardInterrupt received, shutting down~")
        end_child_thread()
        key_listener.stop()
    except Exception as e:
        print("Exception caught:", e)
        traceback.print_exc()  # print the full stack trace
        end_child_thread()
        key_listener.stop()
        print(f"except工作线程{worker_thread}")
        print(f"except监听线程{key_listener}")
    # Main thread exits.
    logger.info(f"Main thread {main_thread_id} is exiting~")
# 使用网络调试助手向程序发送指令,运行效果如下: