需求:主线程创建工作子线程和键盘输入监听子线程。
当用户输入 '0' 后, 工作子线程会收到停止信号并退出,此时键盘输入监听线程仍然运行;
当用户输入 '1' 后,会建立新的工作子线程;
当用户输入 '2' 后,整个程序退出。
import threading
import time
from loguru import logger
exit = False
class WorkerThread(threading.Thread):
def __init__(self):
super().__init__()
self.stop_event = threading.Event()
def run(self):
thread_id = threading.get_ident()
print(f"Worker thread {thread_id} has started.")
while not self.stop_event.is_set():
print(f"Worker thread {thread_id} is running.")
time.sleep(1)
print(f"Worker thread {thread_id} has stopped.")
def stop(self):
self.stop_event.set()
self.join() # 等待子线程结束
def end_child_thread():
try:
if worker_thread.is_alive():
worker_thread.stop()
logger.info(f"Stopping worker thread {worker_thread.ident}...")
if worker_thread.is_alive():
logger.info(f"Worker thread {worker_thread.ident} is still active")
else:
logger.info(f"Worker thread {worker_thread.ident} doesn't exist ..")
except Exception as e:
logger.info("中止线程失败:" + e)
def key_listener():
keyboard_thread_id = threading.get_ident() # 获取键盘监听线程的ID
global worker_thread # 声明为全局变量
global exit
print("""
Press '0' to stop worker thread,
'1' to start a new worker thread,
'2' to end the program.
...""")
while True:
print(f"Keyboard listener thread {keyboard_thread_id} has started.")
command = input("请输入指令~")
if command == '0':
logger.info(f"工作线程状态{worker_thread}")
end_child_thread()
logger.info(f"工作线程状态{worker_thread}")
elif command == '1':
try:
if worker_thread.is_alive():
pass
else:
worker_thread = WorkerThread()
worker_thread.start()
logger.info(f"Started new worker thread {worker_thread.ident}")
except Exception as e:
logger.info("新建工作线程失败:" + e)
elif command == '2':
end_child_thread()
exit = True
break
else:
pass
time.sleep(0.1)
if __name__ == '__main__':
main_thread_id = threading.main_thread().ident
logger.info(f"Main thread {main_thread_id} has started~")
worker_thread = WorkerThread()
keyboard_thread = threading.Thread(target=key_listener) # 创建键盘输入监听线程
keyboard_thread.start() # 启动键盘输入监听线程
worker_thread.start()
logger.info(f"Keyboard monitor thread {keyboard_thread.ident} has started~")
logger.info(f"worker thread {worker_thread.ident} has started~")
try:
while not exit:
pass
except:
print("手动停止程序...")
worker_thread.stop() # 在主线程退出时停止子线程
worker_thread.join() # 等待子线程结束
print('exit')
# 主线程退出
logger.info(f"Main thread {main_thread_id} is exiting~")
运行效果: