1. 环境
python版本:3.11.9
2. 完整代码
import sqlite3
import time
import wave # 使用wave库可读、写wav类型的音频文件
from funasr import AutoModel
import sounddevice as sd
import numpy as np
from modelscope import pipeline, Tasks
from pypinyin import lazy_pinyin
import pyaudio # 使用pyaudio库可以进行录音,播放,生成wav文件
# 模型参数设置
chunk_size = [0, 10, 5]
encoder_chunk_look_back = 7
decoder_chunk_look_back = 5
is_task_running= True
model = AutoModel(model="D:\SpeechRecognize\speech_paraformer-large-vad-punc_asr_nat-zh-cn-16k-common-vocab8404-pytorch")
# 假设模型要求的采样率为 16000
fs = 16000
duration = 3 #时间
chunk_stride = chunk_size[1] * 960
cache = {}
window_size = 3
# 连接到 SQLite 数据库,如果不存在则会创建新的数据库文件
conn = sqlite3.connect('speech_recognition.db')
cursor = conn.cursor()
# 创建表格
cursor.execute('''
CREATE TABLE IF NOT EXISTS speech_data
(text TEXT, time_stamp TEXT, batch TEXT)
''')
def record(time): # 录音程序
# 定义数据流块
CHUNK = 1024 # 音频帧率(也就是每次读取的数据是多少,默认1024)
FORMAT = pyaudio.paInt16 # 采样时生成wav文件正常格式
CHANNELS = 1 # 音轨数(每条音轨定义了该条音轨的属性,如音轨的音色、音色库、通道数、输入/输出端口、音量等。可以多个音轨,不唯一)
RATE = 16000 # 采样率(即每秒采样多少数据)
RECORD_SECONDS = time # 录音时间
WAVE_OUTPUT_FILENAME = "./output.wav" # 保存音频路径
p = pyaudio.PyAudio() # 创建PyAudio对象
stream = p.open(format=FORMAT, # 采样生成wav文件的正常格式
channels=CHANNELS, # 音轨数
rate=RATE, # 采样率
input=True, # Ture代表这是一条输入流,False代表这不是输入流
frames_per_buffer=CHUNK) # 每个缓冲多少帧
print("* 开始录音") # 开始录音标志
frames = [] # 定义frames为一个空列表
for i in range(0, int(RATE / CHUNK * RECORD_SECONDS)): # 计算要读多少次,每秒的采样率/每次读多少数据*录音时间=需要读多少次
data = stream.read(CHUNK) # 每次读chunk个数据
frames.append(data) # 将读出的数据保存到列表中
print("* 结束语音") # 结束录音标志
stream.stop_stream() # 停止输入流
stream.close() # 关闭输入流
p.terminate() # 终止pyaudio
wf = wave.open(WAVE_OUTPUT_FILENAME, 'wb') # 以'wb‘二进制流写的方式打开一个文件
wf.setnchannels(CHANNELS) # 设置音轨数
wf.setsampwidth(p.get_sample_size(FORMAT)) # 设置采样点数据的格式,和FOMART保持一致
wf.setframerate(RATE) # 设置采样率与RATE要一致
wf.writeframes(b''.join(frames)) # 将声音数据写入文件
wf.close() # 数据流保存完,关闭文件
while is_task_running:
start_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
myrecording = sd.rec(int(fs * duration), samplerate=fs, channels=1)
sd.wait()
speech_chunk = myrecording.flatten()
# 噪声处理
filtered_chunk = np.convolve(speech_chunk, np.ones(window_size) / window_size, mode='same')
speech_chunk = filtered_chunk
is_final = False
res = model.generate(input=speech_chunk, cache=cache, is_final=is_final, chunk_size=chunk_size,
encoder_chunk_look_back=encoder_chunk_look_back,
decoder_chunk_look_back=decoder_chunk_look_back)
text_result=''.join(lazy_pinyin(str(res[0]['text']))).replace(" ", "")
# 唤醒词
s1=''.join(lazy_pinyin(str("小爱")))
if s1 in text_result:
#关闭循环
is_task_running ==False
print("已唤醒,开始录音")
record(5) # 定义录音时间,单位/s
inference_pipeline = pipeline(
task=Tasks.auto_speech_recognition,
model='D:/SpeechRecognize/speech_seaco_paraformer_large_asr_nat-zh-cn-16k-common-vocab8404-pytorch',
model_revision="v2.0.4")
rec_result = inference_pipeline('./output.wav', hotword='')
same = ''.join(lazy_pinyin(rec_result[0]["text"].replace(" ", "")))
print("语音转文字" + same)
#匹配字符关键词
#关键词1 、、、、
g1 = ''.join(lazy_pinyin(str("打开空调")))
if g1 in same:
#通讯发送消息,我会提供五种硬件通讯方式 MTTT、Socket、ModBusTcpIP、串口、HTTP请求
print("发送给设备")
is_task_running == True
cursor.execute("INSERT INTO speech_data (text, time_stamp, batch) VALUES (?,?,?)",
(text_result, start_time, 'eerr'))
conn.commit()
3. 硬件通讯
很多人搞不懂,如何用软件控制硬件,但是实际上没大家想的那么复杂,一般的硬件都会提供接口,只要找到厂家要他的通讯方式和通讯内容,就可以实现用软件控制硬件
3.1ModbusTCPIP
比较通用的工业通讯协议,读写PLC数据
from pymodbus.client import ModbusTcpClient
def read_data(ip, port, postion):
# 创建 Modbus TCP 客户端并连接
client = ModbusTcpClient(ip, port=port) # 请替换为实际的设备 IP 和端口
client.connect()
try:
# 读取保持寄存器
num = 0
result = client.read_holding_registers(postion, 1)
for value in result.registers:
print(value)
num = value
except Exception as e:
print("Exception:", e)
finally:
# 关闭连接
client.close()
return str(num)
def send_data(ip, port, postion, num):
client = ModbusTcpClient(ip, port=port)
client.connect()
try:
# 发送数据到保持寄存器
write_result = client.write_registers(postion, [num]) # 从地址 20 开始写入数据
if not write_result.isError():
print("Write Success")
else:
print("Write Error:", write_result)
except Exception as e:
print("Exception:", e)
finally:
client.close()
return 2
3.2MQTT
最近比较流行的工业协议
读取消息
import paho.mqtt.client as mqtt
def on_connect(client, userdata, flags, rc):
print("Connected with result code "+str(rc))
client.subscribe("your_topic")
def on_message(client, userdata, msg):
print(msg.topic+" "+str(msg.payload))
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect("broker_ip_address", 1883, 60)
client.loop_forever()
写入消息
import paho.mqtt.publish as publish
publish.single("your_topic", "your_message", hostname="broker_ip_address", port=1883)
3.3 Socket TCPIP
def socketddd(ip, port, code):
# 要发送的内容
content = str(code)+'\a\r\n'
# 创建套接字
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 连接服务器
server_address = (str(ip), int(port))
s.connect(server_address)
# 发送数据
s.sendall(content.encode())
# 关闭连接
s.close()
3.4 串口
比较基础的串口
def task1():
serials = serial.Serial('COM5', 9600, timeout=0.5)
while is_task_running1:
if serials.isOpen():
print("open success")
send_data_hex = bytes.fromhex('5A 06 00 00 60\r\n')
serials.write(send_data_hex) # 编码
else:
print("open failed")
time.sleep(1) # 每隔 5 秒执行一次
4. 效果
5.问题
1.必须有麦克风才能跑起来
2.关于模型包,可以直接从模型社区下载(也可私信我)
3.最后的效果与你电脑的显卡有直接联系
4.关于唤醒后定时录音,实际上可以通过计算音频波形判断是否有音频输入,一般2秒没有音频就默认结束录音
5.关于通讯,实际上本质就是报文,在局域网和公网只要连接上即可