1. 背景介绍
在多线程环境下使用 PyAudio 可能会导致段错误(Segmentation Fault)或其他不可预期的行为。这是因为 PyAudio 在多线程环境下可能会出现资源冲突或线程安全问题。
PyAudio 是一个用于音频输入输出的 Python 库,它依赖于 PortAudio 库。在多线程环境下,如果多个线程同时创建和销毁 PyAudio
实例,可能会导致资源冲突,从而引发段错误。
在多线程环境下,尽量避免在每个线程中创建和销毁 PyAudio
实例。相反,应该在主线程中创建一个共享的 PyAudio
实例,并在子线程中使用它。
import pyaudio
import threading
# 在主线程中创建共享的 PyAudio 实例
p = pyaudio.PyAudio()
def audio_thread_function():
# 在子线程中使用共享的 PyAudio 实例
stream = p.open(format=pyaudio.paInt16,
channels=1,
rate=44100,
input=True,
frames_per_buffer=1024)
# 音频处理逻辑
stream.stop_stream()
stream.close()
# 创建并启动多个音频线程
threads = []
for _ in range(4):
thread = threading.Thread(target=audio_thread_function)
thread.start()
threads.append(thread)
# 等待所有线程完成
for thread in threads:
thread.join()
# 在主线程中释放 PyAudio 资源
p.terminate()
2. 实际问题和解决方案
2.1 问题代码
import time
import pyaudio
import numpy as np
import librosa
import soundfile as sf
import threading
import os
# 查找设备索引
def find_device_index(device_name, is_input=True):
p = pyaudio.PyAudio()
device_count = p.get_device_count()
for i in range(device_count):
device_info = p.get_device_info_by_index(i)
if device_name in device_info['name']:
if is_input and device_info['maxInputChannels'] > 0:
p.terminate()
return i
elif not is_input and device_info['maxOutputChannels'] > 0:
p.terminate()
return i
p.terminate()
raise ValueError(f"Device '{device_name}' not found or not a {'input' if is_input else 'output'} device.")
# 播放音频文件
def play_audio(file_path, device_index, start_event):
start_event.wait() # 等待事件被设置
audio_data, sr = librosa.load(file_path, sr=None)
# 创建 PyAudio 实例
p = pyaudio.PyAudio()
# 打开流
stream = p.open(format=pyaudio.paFloat32,
channels=1,
rate=sr,
output=True,
output_device_index=device_index)
# 播放音频
stream.write(audio_data.astype(np.float32).tobytes())
# 关闭流
stream.stop_stream()
stream.close()
p.terminate()
# 录音
def record_audio(device_index, output_file, start_event, stop_event):
start_event.wait() # 等待事件被设置
p = pyaudio.PyAudio()
# 打开输入流
stream = p.open(format=pyaudio.paFloat32,
channels=1,
rate=44100,
input=True,
input_device_index=device_index,
frames_per_buffer=1024)
print(f"Recording to {output_file}...")
frames = []
while not stop_event.is_set(): # 检查停止事件
data = stream.read(1024)
frames.append(data)
print("Recording finished.")
# 关闭流
stream.stop_stream()
stream.close()
p.terminate()
# 保存录音
audio_data = b''.join(frames)
audio_array = np.frombuffer(audio_data, dtype=np.float32)
sf.write(output_file, audio_array, 44100)
# 主程序
if __name__ == "__main__":
# 根据设备名称查找设备索引
CUBE_4NANO_DYNA_INDEX = find_device_index("Cube 4Nano Dyna", is_input=False)
SOUNDMATRIX_A10_OUTPUT_INDEX = find_device_index("SoundMatrix A10", is_input=False)
SOUNDMATRIX_A10_INPUT_INDEX = find_device_index("SoundMatrix A10", is_input=True)
# 获取音频文件列表
audio_file1_folder = './Soundplay'
audio_file2_folder = './A10play'
audio_file1_list = [f for f in os.listdir(audio_file1_folder) if f.endswith('.wav')]
audio_file2_list = [f for f in os.listdir(audio_file2_folder) if f.endswith('.wav')]
# 创建事件对象
start_event = threading.Event()
stop_event = threading.Event()
# 遍历每一对组合
for audio_file1 in audio_file1_list:
for audio_file2 in audio_file2_list:
file_path1 = os.path.join(audio_file1_folder, audio_file1)
file_path2 = os.path.join(audio_file2_folder, audio_file2)
print(file_path1, file_path2)
# 创建线程
play_thread1 = threading.Thread(target=play_audio, args=(file_path1, CUBE_4NANO_DYNA_INDEX, start_event))
play_thread2 = threading.Thread(target=play_audio,
args=(file_path2, SOUNDMATRIX_A10_OUTPUT_INDEX, start_event))
output_file_name = f"{os.path.splitext(audio_file1)[0]}_{os.path.splitext(audio_file2)[0]}_soundmatrix.wav"
output_file_path = os.path.join('./A10rec', output_file_name)
record_thread = threading.Thread(target=record_audio, args=(
SOUNDMATRIX_A10_INPUT_INDEX, output_file_path, start_event, stop_event))
# 启动录音和播放线程
record_thread.start() # 启动录音线程
play_thread1.start() # 播放第一个音频
play_thread2.start() # 播放第二个音频
# 设置事件,开始播放和录音
start_event.set()
# 等待播放线程完成
play_thread1.join()
play_thread2.join()
# 设置停止事件,结束录音
stop_event.set()
record_thread.join()
# 重置事件以便下次使用
start_event.clear()
stop_event.clear()
print("All tasks completed.")
报错:
./Soundplay/cafeteria_SNR0_副本3.wav ./A10play/SER0.wav
Recording to ./A10rec/cafeteria_SNR0_副本3_SER0_soundmatrix.wav...
Recording finished.
./Soundplay/cafeteria_SNR0_副本2.wav ./A10play/SER0.wav
Process finished with exit code 139 (interrupted by signal 11:SIGSEGV)
2.2 解决方案:共享 PyAudio
实例
import time
import pyaudio
import numpy as np
import librosa
import soundfile as sf
import threading
import os
# 查找设备索引
def find_device_index(device_name, is_input=True, p=None):
if p is None:
p = pyaudio.PyAudio()
need_terminate = True
else:
need_terminate = False
device_count = p.get_device_count()
for i in range(device_count):
device_info = p.get_device_info_by_index(i)
if device_name in device_info['name']:
if is_input and device_info['maxInputChannels'] > 0:
if need_terminate:
p.terminate()
return i
elif not is_input and device_info['maxOutputChannels'] > 0:
if need_terminate:
p.terminate()
return i
if need_terminate:
p.terminate()
raise ValueError(f"Device '{device_name}' not found or not a {'input' if is_input else 'output'} device.")
# 播放音频文件
def play_audio(file_path, device_index, start_event, p):
start_event.wait() # 等待事件被设置
audio_data, sr = librosa.load(file_path, sr=None)
# 打开流
stream = p.open(format=pyaudio.paFloat32,
channels=1,
rate=sr,
output=True,
output_device_index=device_index)
# 播放音频
stream.write(audio_data.astype(np.float32).tobytes())
# 关闭流
stream.stop_stream()
stream.close()
# 录音
def record_audio(device_index, output_file, start_event, stop_event, p):
start_event.wait() # 等待事件被设置
buffer_size = 2048
# 打开输入流
stream = p.open(format=pyaudio.paFloat32,
channels=1,
rate=44100,
input=True,
input_device_index=device_index,
frames_per_buffer=buffer_size)
print(f"Recording to {output_file}...")
frames = []
while not stop_event.is_set(): # 检查停止事件
data = stream.read(buffer_size)
frames.append(data)
print("Recording finished.")
# 关闭流
stream.stop_stream()
stream.close()
# 保存录音
audio_data = b''.join(frames)
audio_array = np.frombuffer(audio_data, dtype=np.float32)
sf.write(output_file, audio_array, 44100)
# 主程序
if __name__ == "__main__":
# 创建共享的 PyAudio 实例
p = pyaudio.PyAudio()
# 根据设备名称查找设备索引
CUBE_4NANO_DYNA_INDEX = find_device_index("Cube 4Nano Dyna", is_input=False, p=p)
SOUNDMATRIX_A10_OUTPUT_INDEX = find_device_index("SoundMatrix A10", is_input=False, p=p)
SOUNDMATRIX_A10_INPUT_INDEX = find_device_index("SoundMatrix A10", is_input=True, p=p)
# 获取音频文件列表
audio_file1_folder = './Soundplay'
audio_file2_folder = './A10play'
audio_file1_list = [f for f in os.listdir(audio_file1_folder) if f.endswith('.wav')]
audio_file2_list = [f for f in os.listdir(audio_file2_folder) if f.endswith('.wav')]
# 创建事件对象
start_event = threading.Event()
stop_event = threading.Event()
# 遍历每一对组合
for audio_file1 in audio_file1_list:
for audio_file2 in audio_file2_list:
file_path1 = os.path.join(audio_file1_folder, audio_file1)
file_path2 = os.path.join(audio_file2_folder, audio_file2)
print(file_path1, file_path2)
# 创建线程
play_thread1 = threading.Thread(target=play_audio, args=(file_path1, CUBE_4NANO_DYNA_INDEX, start_event, p))
play_thread2 = threading.Thread(target=play_audio,
args=(file_path2, SOUNDMATRIX_A10_OUTPUT_INDEX, start_event, p))
output_file_name = f"{os.path.splitext(audio_file1)[0]}_{os.path.splitext(audio_file2)[0]}_soundmatrix.wav"
output_file_path = os.path.join('./A10rec', output_file_name)
record_thread = threading.Thread(target=record_audio, args=(
SOUNDMATRIX_A10_INPUT_INDEX, output_file_path, start_event, stop_event, p))
# 启动录音和播放线程
record_thread.start() # 启动录音线程
play_thread1.start() # 播放第一个音频
play_thread2.start() # 播放第二个音频
# 设置事件,开始播放和录音
start_event.set()
# 等待播放线程完成
play_thread1.join()
play_thread2.join()
# 设置停止事件,结束录音
stop_event.set()
record_thread.join()
# 重置事件以便下次使用
start_event.clear()
stop_event.clear()
time.sleep(3)
# 释放 PyAudio 资源
p.terminate()