翻译能力:
- 使用讯飞的AI翻译能力:机器翻译 niutrans - 语音扩展 - 讯飞开放平台
- API: 机器翻译niutrans API 文档 | 讯飞开放平台文档中心
执行效果:
原文档:
翻译还原的文档:
源码如下:
import datetime
import hashlib
import base64
import hmac
import json
import requests
import threading
from threading import Thread
from queue import Queue, Full, Empty
from docx import Document
from concurrent.futures import ThreadPoolExecutor# 全局配置
APPId = "xxx"
APISecret = "xxxx"
APIKey = "xxxx"
HOST = "ntrans.xfyun.cn"
# 线程池
class ThreadPool:
def __init__(self, size: int, max_workers: int):
self.max_workers = max_workers # 最大可运行线程数
self._has_th = [] # 已启动线程数
self._pending = Queue(maxsize=max_workers) # 阻塞线程数
self._task = Queue(maxsize=size) # 任务等待队列
self._close = False # 线程池是否关闭def worker(self, *args):
"""
任务的执行者
:param args: 运行任务与任务参数
:return:
"""
th = threading.current_thread()
self._has_th.append(th)
# 执行创建线程时分配的任务
func, *args = args
func(*args)
# 线程阻塞,进入等待任务队列
while True:
self._pending.put(1)
try:
# 获取任务
task, *arg = self._task.get(timeout=1)
except Empty:
self._pending.get()
return
# 从等待队列取出并执行任务
self._pending.get()
task(arg)def submit(self, func, args):
# 线程池关闭
if self._close:
raise RuntimeError("thread closed")
# 当无空闲线程且当前线程数小于最大可运行线程数,创建新的线程。
elif self._pending.empty() and len(self._has_th) < self.max_workers:
th = Thread(target=self.worker, args=(func, args))
# th.setDaemon(True) # 主进程结束是否结束子进程 默认False
th.start()
else:
try:
# 当所有线程都在运行,且线程数达到最大值,任务进入等待队列
self._task.put((func, args), block=True, timeout=25)
except Full:
# 等待进入队列超时
raise TimeoutError("任务提交超时")def close(self):
self._close = True
# 鉴权
class get_result(object):
def __init__(self, host):
# 应用ID(到控制台获取)
self.APPID = APPId
# 接口APISercet(到控制台机器翻译服务页面获取)
self.Secret = APISecret
# 接口APIKey(到控制台机器翻译服务页面获取)
self.APIKey = APIKey# 以下为POST请求
self.Host = host
self.RequestUri = "/v2/ots"
# 设置url
# print(host)
self.url = "https://" + host + self.RequestUri
self.HttpMethod = "POST"
self.Algorithm = "hmac-sha256"
self.HttpProto = "HTTP/1.1"# 设置当前时间
curTime_utc = datetime.datetime.utcnow()
self.Date = self.httpdate(curTime_utc)
# 设置业务参数
# 语种列表参数值请参照接口文档:https://www.xfyun.cn/doc/nlp/niutrans/API.html
self.BusinessArgs = {
"from": "en",
"to": "cn",
}def hashlib_256(self, res):
m = hashlib.sha256(bytes(res.encode(encoding='utf-8'))).digest()
result = "SHA-256=" + base64.b64encode(m).decode(encoding='utf-8')
return resultdef httpdate(self, dt):
"""
Return a string representation of a date according to RFC 1123
(HTTP/1.1).The supplied date must be in UTC.
"""
weekday = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"][dt.weekday()]
month = ["Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep",
"Oct", "Nov", "Dec"][dt.month - 1]
return "%s, %02d %s %04d %02d:%02d:%02d GMT" % (weekday, dt.day, month,
dt.year, dt.hour, dt.minute, dt.second)def generateSignature(self, digest):
signatureStr = "host: " + self.Host + "\n"
signatureStr += "date: " + self.Date + "\n"
signatureStr += self.HttpMethod + " " + self.RequestUri \
+ " " + self.HttpProto + "\n"
signatureStr += "digest: " + digest
signature = hmac.new(bytes(self.Secret.encode(encoding='utf-8')),
bytes(signatureStr.encode(encoding='utf-8')),
digestmod=hashlib.sha256).digest()
result = base64.b64encode(signature)
return result.decode(encoding='utf-8')def init_header(self, data):
digest = self.hashlib_256(data)
# print(digest)
sign = self.generateSignature(digest)
authHeader = 'api_key="%s", algorithm="%s", ' \
'headers="host date request-line digest", ' \
'signature="%s"' \
% (self.APIKey, self.Algorithm, sign)
# print(authHeader)
headers = {
"Content-Type": "application/json",
"Accept": "application/json",
"Method": "POST",
"Host": self.Host,
"Date": self.Date,
"Digest": digest,
"Authorization": authHeader
}
return headersdef get_body(self, text):
content = str(base64.b64encode(text.encode('utf-8')), 'utf-8')
postdata = {
"common": {"app_id": self.APPID},
"business": self.BusinessArgs,
"data": {
"text": content,
}
}
body = json.dumps(postdata)
# print(body)
return bodydef call_url(self, text):
if self.APPID == '' or self.APIKey == '' or self.Secret == '':
print('Appid 或APIKey 或APISecret 为空!请打开demo代码,填写相关信息。')
else:
code = 0
body = self.get_body(text)
headers = self.init_header(body)
# print(self.url)
response = requests.post(self.url, data=body, headers=headers, timeout=8)
status_code = response.status_code
# print(response.content)
if status_code != 200:
# 鉴权失败
print("Http请求失败,状态码:" + str(status_code) + ",错误信息:" + response.text)
print("请根据错误信息检查代码,接口文档:https://www.xfyun.cn/doc/nlp/niutrans/API.html")
return None
else:
# 鉴权成功
respData = json.loads(response.text)
# print(respData)
# 以下仅用于调试
code = str(respData["code"])
if code != '0':
print("请前往https://www.xfyun.cn/document/error-code?code=" + code + "查询解决办法")
else:
return respData["data"]["result"]["trans_result"]["dst"]return None
def ai_translate(text):
if text.isspace():
print("the data is null")
return
ret = get_result(HOST).call_url(text)
print(ret)
return ret
def split_form(parm_form):
print("split_form: ")
try:
for row in parm_form.rows: # 从表格第一行开始循环读取表格数据
for cell in row.cells: # 遍历每一个单元格
for j in range(len(row.cells)):
if ("\n" != row.cells[j].text) & ("\r" != row.cells[j].text) & (0 != len(row.cells[j].text)) & \
("\r\n" != row.cells[j].text) & (not row.cells[j].text.isspace()):
row.cells[j].text = ai_translate(row.cells[j].text)
print(row.cells[j].text)
except AttributeError as e:
print("parm has no rows")
def split_paragraph_runs(parm_paragraph):
print("split_paragraph_runs: ")
try:
runs = parm_paragraph.runs
for i, run in enumerate(runs):
if run.text:
if ("\n" != run.text) & ("\r" != run.text) & (0 != len(run.text)) & ("\r\n" != run.text) & (
not run.text.isspace()):
run.text = ai_translate(run.text)
except AttributeError as e:
print("parm has no text")
if __name__ == '__main__':
thread_pool = ThreadPoolExecutor(max_workers=6)
start_time = datetime.datetime.now()
doc = Document('../source/src1.docx') # exist.docx为文件名
# 表格内容翻译
tables = doc.tables # 获取文件中的表格集
print("表格数量:", len(tables)) # 获取文件中的表格数量
for table in tables: # 遍历每一个表格
thread_pool.submit(split_form, table)# 文本内容翻译
for paragraph in doc.paragraphs:
if ("\n" != paragraph.text) & ("\r" != paragraph.text) & (0 != len(paragraph.text)) & (
"\r\n" != paragraph.text) & (not paragraph.text.isspace()):
thread_pool.submit(split_paragraph_runs, paragraph)
thread_pool.shutdown(wait=True)
doc.save('../source/dst.docx')end_time = datetime.datetime.now()
print("cost total time:", (end_time - start_time).seconds)