1、进入自己的项目
复制 APPID、APISecret、APIKey(稍后分别填入代码中的 appid、api_secret、api_key)
2、添加好听的发音人
复制该发音人的参数(即代码中的 vcn)
3、需要替换的代码部分:
把 appid、api_secret、api_key 换成自己的,并把 vcn 换成自己喜欢的发音人的参数
4、完整代码:
import _thread as thread
import base64
import datetime
import hashlib
import hmac
import json
from urllib.parse import urlparse
import ssl
from datetime import datetime
from time import mktime
from urllib.parse import urlencode
from wsgiref.handlers import format_date_time
import websocket
import os
import logging
# Configure root logging: INFO and above, formatted as "time - level - message".
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
class Ws_Param(object):
    """Credentials plus endpoint, able to produce a signed request URL.

    Attributes:
        APPID: Application id, stored as given.
        APIKey: API key embedded in the authorization parameter.
        APISecret: Secret used as the HMAC-SHA256 key.
        host: Network location (netloc) parsed from ``gpt_url``.
        path: Path component parsed from ``gpt_url``.
        gpt_url: The unsigned endpoint URL, stored as given.
    """

    def __init__(self, APPID, APIKey, APISecret, gpt_url):
        """Store the credentials and split ``gpt_url`` into host and path."""
        endpoint = urlparse(gpt_url)
        self.APPID = APPID
        self.APIKey = APIKey
        self.APISecret = APISecret
        self.host = endpoint.netloc
        self.path = endpoint.path
        self.gpt_url = gpt_url

    def create_url(self):
        """Return ``gpt_url`` with ``authorization``, ``date`` and ``host`` query parameters.

        The signature is an HMAC-SHA256 (keyed with ``APISecret``) over the
        host line, the date line and the ``GET <path> HTTP/1.1`` request line,
        base64-encoded and wrapped in a base64-encoded authorization string.
        """
        # HTTP-date string for the current time, truncated to whole seconds.
        date = format_date_time(mktime(datetime.now().timetuple()))

        # The three signed lines, newline-separated with no trailing newline.
        signature_origin = "\n".join((
            "host: " + self.host,
            "date: " + date,
            "GET " + self.path + " HTTP/1.1",
        ))
        mac = hmac.new(
            self.APISecret.encode('utf-8'),
            signature_origin.encode('utf-8'),
            digestmod=hashlib.sha256,
        )
        signature = base64.b64encode(mac.digest()).decode(encoding='utf-8')

        authorization_origin = (
            f'api_key="{self.APIKey}", algorithm="hmac-sha256", '
            f'headers="host date request-line", signature="{signature}"'
        )
        authorization = base64.b64encode(
            authorization_origin.encode('utf-8')
        ).decode(encoding='utf-8')

        query = urlencode({
            "authorization": authorization,
            "date": date,
            "host": self.host,
        })
        return self.gpt_url + '?' + query
def on_error(ws, error):
    """Log an error reported for the WebSocket connection.

    Args:
        ws: The connection the error belongs to (unused).
        error: The error object or message to log.
    """
    logging.error("### error: %s", error)
def on_close(ws, close_status_code, close_msg):
    """Log that the WebSocket connection has been closed.

    Args:
        ws: The connection that closed (unused).
        close_status_code: Close status code passed by the caller (unused).
        close_msg: Close message passed by the caller (unused).
    """
    closed_banner = "### closed ###"
    logging.info(closed_banner)
def on_open(ws):
    """Start ``run(ws)`` on a new thread once the connection is open.

    A failure to start the thread is logged rather than raised.

    Args:
        ws: The opened connection, handed to ``run`` as its only argument.
    """
    worker_args = (ws,)
    try:
        thread.start_new_thread(run, worker_args)
    except Exception as exc:
        logging.error(f"启动新线程失败: {exc}")
def on_message(ws, message):
    """Handle one server frame: append its audio to the output file, close on the last frame.

    The frame is JSON with a ``header`` carrying ``code`` and ``status``. A
    non-zero ``code`` is logged as a request error and nothing is written.
    Otherwise any base64 audio under ``payload.audio.audio`` is decoded and
    appended to ``ws.save_file_name``. A ``status`` of 2 marks the final
    frame; the connection is closed only after that frame's audio has been
    written, so a failing ``ws.close()`` can no longer lose the last chunk.

    Any exception raised while handling the frame is logged, not propagated.

    Args:
        ws: The connection; must provide ``save_file_name`` and ``close()``.
        message: The raw JSON text of the frame.
    """
    try:
        message = json.loads(message)
        header = message['header']
        if header['code'] != 0:
            logging.error(f"### 请求出错: {message}")
            return

        payload = message.get("payload")
        status = header['status']
        try:
            if payload and payload != "null":
                audio = payload.get("audio")
                if audio:
                    # Append mode: each frame carries the next slice of audio.
                    with open(ws.save_file_name, 'ab') as f:
                        f.write(base64.b64decode(audio["audio"]))
        finally:
            # Close on the final frame even if writing failed, but only
            # after the write has been attempted.
            if status == 2:
                logging.info("### 合成完毕")
                ws.close()
    except Exception as e:
        logging.error(f"处理消息时出错: {e}")
def run(ws, *args):
    """Build the synthesis request and send it over ``ws`` as one JSON message.

    The request uses ``ws.appid`` as the app id, ``ws.vcn`` as the ``vcn``
    value and ``ws.text`` (UTF-8, then base64) as the text payload; every
    other field is a fixed value. A failure in ``ws.send`` is logged rather
    than raised.

    Args:
        ws: Connection object providing ``appid``, ``vcn``, ``text`` and ``send``.
        *args: Ignored.
    """
    header_section = {
        "app_id": ws.appid,
        "status": 0,
    }

    oral_section = {
        "spark_assist": 1,
        "oral_level": "mid",
    }

    # Requested audio output format.
    audio_format = {
        "encoding": "lame",
        "sample_rate": 16000,
        "channels": 1,
        "bit_depth": 16,
        "frame_size": 0,
    }
    pybuf_format = {
        "encoding": "utf8",
        "compress": "raw",
        "format": "plain",
    }
    tts_section = {
        "vcn": ws.vcn,
        "speed": 50,
        "volume": 50,
        "pitch": 50,
        "bgs": 0,
        "reg": 0,
        "rdn": 0,
        "rhy": 0,
        "scn": 5,
        "version": 0,
        "L5SilLen": 0,
        "ParagraphSilLen": 0,
        "audio": audio_format,
        "pybuf": pybuf_format,
    }

    # The text travels as base64 of its UTF-8 bytes.
    encoded_text = base64.b64encode(ws.text.encode('utf-8')).decode("UTF8")
    text_section = {
        "encoding": "utf8",
        "compress": "raw",
        "format": "json",
        "status": 0,
        "seq": 0,
        "text": encoded_text,
    }

    request = {
        "header": header_section,
        "parameter": {
            "oral": oral_section,
            "tts": tts_section,
        },
        "payload": {
            "text": text_section,
        },
    }

    try:
        ws.send(json.dumps(request))
    except Exception as exc:
        logging.error(f"发送消息时出错: {exc}")
def main(appid, api_secret, api_key, url, text, vcn, save_file_name):
    """Synthesize ``text`` over a WebSocket connection and save the audio to a file.

    Builds the signed URL, creates the WebSocket app with this module's
    callbacks, attaches the request data to it, removes any existing output
    file, then runs the connection until it ends. An exception raised by
    ``run_forever`` is logged rather than propagated.

    Args:
        appid: Application id sent in the request header.
        api_secret: Secret used to sign the URL.
        api_key: API key embedded in the authorization parameter.
        url: Unsigned WebSocket endpoint URL.
        text: Text to synthesize.
        vcn: Value sent as the ``vcn`` request parameter.
        save_file_name: Path of the audio file to write.
    """
    signed_url = Ws_Param(appid, api_key, api_secret, url).create_url()
    ws = websocket.WebSocketApp(
        signed_url,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
        on_open=on_open,
    )
    websocket.enableTrace(False)

    # The callbacks read their inputs from attributes on the app object.
    ws.appid = appid
    ws.text = text
    ws.vcn = vcn
    ws.save_file_name = save_file_name

    # on_message appends to the file, so start from a clean slate.
    target = ws.save_file_name
    if os.path.exists(target):
        os.remove(target)

    tls_options = {"cert_reqs": ssl.CERT_REQUIRED}  # verify the server certificate
    try:
        ws.run_forever(sslopt=tls_options)
    except Exception as exc:
        logging.error(f"运行WebSocket连接时出错: {exc}")
if __name__ == "__main__":
    # Demo run. The "###" values are presumably placeholders to be replaced
    # with your own APPID / APISecret / APIKey before running.
    demo_request = {
        "appid": "###",
        "api_secret": "###",
        "api_key": "###",
        "url": "wss://cbm01.cn-huabei-1.xf-yun.com/v1/private/mcd9m97e6",
        "text": "我喜欢你!",
        "vcn": "x4_lingyuyan_oral",
        "save_file_name": "./test.mp3",
    }
    main(**demo_request)