|
|
|
|
|
import os
|
|
import datetime
|
|
import re
|
|
import time
|
|
import traceback
|
|
import math
|
|
from urllib.parse import urlparse
|
|
from urllib3 import encode_multipart_formdata
|
|
from wsgiref.handlers import format_date_time
|
|
from time import mktime
|
|
import hashlib
|
|
import base64
|
|
import hmac
|
|
from urllib.parse import urlencode
|
|
import json
|
|
import requests
|
|
import azure.cognitiveservices.speech as speechsdk
|
|
|
|
|
|
# Base URL and request paths of the iFLYTEK file-upload service that
# FileUploader concatenates into full endpoint URLs.
LFASR_HOST = "http://upload-ost-api.xfyun.cn/file"
API_INIT = "/mpupload/init"
API_UPLOAD = "/upload"
API_CUT = "/mpupload/upload"
API_CUT_COMPLETE = "/mpupload/complete"
API_CUT_CANCEL = "/mpupload/cancel"

# Size in bytes of each piece sent during a multipart upload (5 MiB).
FILE_PIECE_SIZE = 5 * 1024 * 1024

# Request paths of the task-create and task-query endpoints used by
# ResultExtractor.
PRO_CREATE_URI = "/v2/ost/pro_create"
QUERY_URI = "/v2/ost/query"
|
|
|
|
|
|
|
|
class FileUploader:
    """Upload a local file to the iFLYTEK file service and return its URL.

    Files smaller than ``SIMPLE_UPLOAD_LIMIT`` bytes are sent in one request.
    Larger files are sent in ``FILE_PIECE_SIZE`` pieces through the
    init / upload / complete multipart endpoints. Every public method
    reports failure by returning ``None`` (or ``False`` for ``do_upload``)
    after printing a message.
    """

    # Files smaller than this many bytes (30 MiB) use the single-request upload.
    SIMPLE_UPLOAD_LIMIT = 30 * 1024 * 1024
    # Number of extra attempts made for a piece whose upload returned nothing.
    MAX_PIECE_RETRIES = 3

    def __init__(self, app_id, api_key, api_secret, upload_file_path):
        """Store the credentials and the path of the file to upload."""
        self.app_id = app_id
        self.api_key = api_key
        self.api_secret = api_secret
        self.upload_file_path = upload_file_path

    def get_request_id(self):
        """Return a request id built from the local time, to the minute."""
        return time.strftime("%Y%m%d%H%M")

    def hashlib_256(self, data):
        """Return ``"SHA-256=" + base64(sha256(data))`` for the text ``data``."""
        raw_digest = hashlib.sha256(data.encode("utf-8")).digest()
        return "SHA-256=" + base64.b64encode(raw_digest).decode("utf-8")

    def assemble_auth_header(self, request_url, file_data_type, method="", body=""):
        """Build the signed request headers for ``request_url``.

        Args:
            request_url: Full URL of the endpoint; host and path are signed.
            file_data_type: Value sent as the ``content-type`` header.
            method: HTTP method written into the signed request line.
            body: Accepted for interface compatibility; not used.

        Returns:
            dict with ``host``, ``date``, ``authorization``, ``digest`` and
            ``content-type`` entries.
        """
        parsed = urlparse(request_url)
        host = parsed.hostname
        path = parsed.path
        date = format_date_time(mktime(datetime.datetime.now().timetuple()))
        # NOTE(review): the digest is always computed over the empty string
        # and carries two prefixes ("SHA256=" + "SHA-256=..."). It is part of
        # the signed string, so it is kept exactly as it was — confirm
        # against the service documentation before changing it.
        digest = "SHA256=" + self.hashlib_256("")
        signature_origin = "host: {}\ndate: {}\n{} {} HTTP/1.1\ndigest: {}".format(
            host, date, method, path, digest
        )
        signature_sha = hmac.new(
            self.api_secret.encode("utf-8"),
            signature_origin.encode("utf-8"),
            digestmod=hashlib.sha256,
        ).digest()
        signature_sha = base64.b64encode(signature_sha).decode(encoding="utf-8")
        authorization = 'api_key="%s", algorithm="%s", headers="%s", signature="%s"' % (
            self.api_key,
            "hmac-sha256",
            "host date request-line digest",
            signature_sha,
        )
        return {
            "host": host,
            "date": date,
            "authorization": authorization,
            "digest": digest,
            "content-type": file_data_type,
        }

    def call_api(self, url, file_data, file_data_type):
        """POST ``file_data`` to ``url`` and return the decoded JSON reply.

        Returns ``None`` when the request or the JSON decoding raises.
        """
        headers = self.assemble_auth_header(
            url, file_data_type, method="POST", body=file_data
        )
        try:
            resp = requests.post(url, headers=headers, data=file_data, timeout=8)
            print("上传状态:", resp.status_code, resp.text)
            return resp.json()
        except Exception as e:
            # Best-effort by design: callers treat None as "failed" and retry.
            print("上传失败!Exception :%s" % e)
            return None

    def upload_cut_complete(self, upload_id):
        """Finish a multipart upload and return the file URL, or ``None``."""
        body_dict = {
            "app_id": self.app_id,
            "request_id": self.get_request_id(),
            "upload_id": upload_id,
        }
        url = LFASR_HOST + API_CUT_COMPLETE
        response = self.call_api(url, json.dumps(body_dict), "application/json")
        if response and "data" in response and "url" in response["data"]:
            print("任务上传结束")
            return response["data"]["url"]
        print("分片上传完成失败", response)
        return None

    def upload_file(self):
        """Upload the file, choosing simple or multipart upload by its size.

        Returns:
            The URL of the uploaded file, or ``None`` on any failure,
            including a file that is missing or cannot be stat'ed.
        """
        try:
            file_total_size = os.path.getsize(self.upload_file_path)
        except OSError as e:
            # Report an unreadable path the same way as every other failure
            # instead of letting the exception escape before any upload runs.
            print("无法读取文件:", self.upload_file_path, e)
            return None
        if file_total_size < self.SIMPLE_UPLOAD_LIMIT:
            print("-----不使用分块上传-----")
            return self.simple_upload()
        print("-----使用分块上传-----")
        return self.multipart_upload()

    def simple_upload(self):
        """Upload the whole file in one multipart/form-data request."""
        try:
            with open(self.upload_file_path, mode="rb") as f:
                fields = {
                    "data": (self.upload_file_path, f.read()),
                    "app_id": self.app_id,
                    "request_id": self.get_request_id(),
                }
                file_data, file_data_type = encode_multipart_formdata(fields)[:2]
                url = LFASR_HOST + API_UPLOAD
                response = self.call_api(url, file_data, file_data_type)
                if response and "data" in response and "url" in response["data"]:
                    return response["data"]["url"]
                print("简单上传失败", response)
                return None
        except FileNotFoundError:
            print("文件未找到:", self.upload_file_path)
            return None

    def multipart_upload(self):
        """Run the init / upload / complete sequence for a large file."""
        upload_id = self.prepare_upload()
        if not upload_id:
            return None
        if not self.do_upload(upload_id):
            return None
        file_url = self.upload_cut_complete(upload_id)
        print("分片上传地址:", file_url)
        return file_url

    def prepare_upload(self):
        """Initialise a multipart upload and return its upload id, or ``None``."""
        body_dict = {
            "app_id": self.app_id,
            "request_id": self.get_request_id(),
        }
        url = LFASR_HOST + API_INIT
        response = self.call_api(url, json.dumps(body_dict), "application/json")
        if response and "data" in response and "upload_id" in response["data"]:
            return response["data"]["upload_id"]
        print("预处理失败", response)
        return None

    def do_upload(self, upload_id):
        """Send the file piece by piece; return ``True`` when all pieces went through.

        A piece whose request returns nothing is retried up to
        ``MAX_PIECE_RETRIES`` times, waiting one second before each retry.
        """
        file_total_size = os.path.getsize(self.upload_file_path)
        chunk_size = FILE_PIECE_SIZE
        chunks = math.ceil(file_total_size / chunk_size)
        request_id = self.get_request_id()
        url = LFASR_HOST + API_CUT

        print(
            "文件:",
            self.upload_file_path,
            " 文件大小:",
            file_total_size,
            " 分块大小:",
            chunk_size,
            " 分块数:",
            chunks,
        )

        with open(self.upload_file_path, mode="rb") as content:
            # Slice ids are 1-based.
            for slice_id in range(1, chunks + 1):
                current_size = min(
                    chunk_size, file_total_size - (slice_id - 1) * chunk_size
                )
                fields = {
                    "data": (self.upload_file_path, content.read(current_size)),
                    "app_id": self.app_id,
                    "request_id": request_id,
                    "upload_id": upload_id,
                    "slice_id": slice_id,
                }
                file_data, file_data_type = encode_multipart_formdata(fields)[:2]

                resp = self.call_api(url, file_data, file_data_type)
                retries = 0
                while not resp and retries < self.MAX_PIECE_RETRIES:
                    print("上传重试")
                    # Back off before the retry, not after it, so a retry
                    # that succeeds is not followed by a pointless wait.
                    time.sleep(1)
                    resp = self.call_api(url, file_data, file_data_type)
                    retries += 1
                if not resp:
                    print("分片上传失败")
                    return False

        return True
|
|
|
|
|
|
class ResultExtractor:
    """Create and query transcription tasks and pull the text out of results.

    Requests are signed with HMAC-SHA256 over the host, date, request line
    and body digest, using the API key and secret given to the constructor.
    """

    def __init__(self, appid, apikey, apisecret):
        """Store the credentials and derive the create/query URLs."""
        self.Host = "ost-api.xfyun.cn"
        self.RequestUriCreate = PRO_CREATE_URI
        self.RequestUriQuery = QUERY_URI

        # A host that starts with a digit is addressed over plain HTTP,
        # anything else over HTTPS.
        scheme = "http://" if re.match(r"^\d", self.Host) else "https://"
        self.urlCreate = scheme + self.Host + self.RequestUriCreate
        self.urlQuery = scheme + self.Host + self.RequestUriQuery

        self.HttpMethod = "POST"
        self.APPID = appid
        self.Algorithm = "hmac-sha256"
        self.HttpProto = "HTTP/1.1"
        self.UserName = apikey
        self.Secret = apisecret

        # Initial value only; init_header() refreshes it for every request.
        self.Date = self.httpdate(datetime.datetime.now(datetime.timezone.utc))

        self.BusinessArgsCreate = {
            "language": "zh_cn",
            "accent": "mandarin",
            "domain": "pro_ost_ed",
        }

    def img_read(self, path):
        """Return the raw bytes of the file at ``path``."""
        with open(path, "rb") as fo:
            return fo.read()

    def hashlib_256(self, res):
        """Return ``"SHA-256=" + base64(sha256(res))`` for the text ``res``."""
        raw_digest = hashlib.sha256(res.encode("utf-8")).digest()
        return "SHA-256=" + base64.b64encode(raw_digest).decode("utf-8")

    def httpdate(self, dt):
        """Format ``dt`` as an HTTP date string labelled ``GMT``.

        The fields of ``dt`` are used as they are; no timezone conversion
        is performed here.
        """
        weekday = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"][dt.weekday()]
        month = [
            "Jan",
            "Feb",
            "Mar",
            "Apr",
            "May",
            "Jun",
            "Jul",
            "Aug",
            "Sep",
            "Oct",
            "Nov",
            "Dec",
        ][dt.month - 1]
        return "%s, %02d %s %04d %02d:%02d:%02d GMT" % (
            weekday,
            dt.day,
            month,
            dt.year,
            dt.hour,
            dt.minute,
            dt.second,
        )

    def generateSignature(self, digest, uri):
        """Return the base64 HMAC-SHA256 signature for ``uri`` and ``digest``."""
        signature_str = "host: " + self.Host + "\n"
        signature_str += "date: " + self.Date + "\n"
        signature_str += self.HttpMethod + " " + uri + " " + self.HttpProto + "\n"
        signature_str += "digest: " + digest
        signature = hmac.new(
            self.Secret.encode("utf-8"),
            signature_str.encode("utf-8"),
            digestmod=hashlib.sha256,
        ).digest()
        return base64.b64encode(signature).decode(encoding="utf-8")

    def init_header(self, data, uri):
        """Build the signed headers for a request with body ``data`` to ``uri``.

        The ``Date`` value is regenerated on every call and stored on
        ``self.Date`` so that the signature and the header always agree.
        """
        # A date fixed at construction time grows stale while the caller
        # polls; presumably the service bounds the accepted clock skew —
        # TODO confirm against the API documentation.
        self.Date = self.httpdate(datetime.datetime.now(datetime.timezone.utc))
        digest = self.hashlib_256(data)
        sign = self.generateSignature(digest, uri)
        auth_header = (
            'api_key="%s",algorithm="%s", '
            'headers="host date request-line digest", '
            'signature="%s"' % (self.UserName, self.Algorithm, sign)
        )
        return {
            "Content-Type": "application/json",
            "Accept": "application/json",
            "Method": "POST",
            "Host": self.Host,
            "Date": self.Date,
            "Digest": digest,
            "Authorization": auth_header,
        }

    def get_create_body(self, fileurl):
        """Return the JSON body of a task-create request for ``fileurl``."""
        post_data = {
            "common": {"app_id": self.APPID},
            "business": self.BusinessArgsCreate,
            "data": {"audio_src": "http", "audio_url": fileurl, "encoding": "raw"},
        }
        return json.dumps(post_data)

    def get_query_body(self, task_id):
        """Return the JSON body of a task-query request for ``task_id``."""
        post_data = {
            "common": {"app_id": self.APPID},
            "business": {
                "task_id": task_id,
            },
        }
        return json.dumps(post_data)

    def call(self, url, body, headers):
        """POST ``body`` to ``url`` and return the reply.

        Returns:
            The parsed JSON for a 200 reply (or the raw text when it is not
            JSON), the raw response bytes for any other status code, and
            ``None`` when the request raises.
        """
        try:
            response = requests.post(url, data=body, headers=headers, timeout=8)
            if response.status_code != 200:
                return response.content
            try:
                return json.loads(response.text)
            except json.JSONDecodeError:
                return response.text
        except Exception as e:
            print("Exception :%s" % e)
            return None

    def task_create(self, fileurl):
        """Create a transcription task for ``fileurl`` and return the reply."""
        body = self.get_create_body(fileurl)
        headers_create = self.init_header(body, self.RequestUriCreate)
        return self.call(self.urlCreate, body, headers_create)

    def task_query(self, task_id):
        """Query the task ``task_id`` and return the reply."""
        query_body = self.get_query_body(task_id)
        headers_query = self.init_header(query_body, self.RequestUriQuery)
        return self.call(self.urlQuery, query_body, headers_query)

    @staticmethod
    def _collect_words(st_items, text_parts):
        """Append every word found in ``st_items`` to ``text_parts``.

        String entries are appended as they are. Dict entries are walked
        through ``rt`` -> ``ws`` -> ``cw`` -> ``w``; anything that does not
        have the expected list/dict shape is skipped.
        """
        for st in st_items:
            if isinstance(st, str):
                text_parts.append(st)
                continue
            if not isinstance(st, dict):
                continue
            rt = st.get("rt", [])
            if not isinstance(rt, list):
                continue
            for item in rt:
                if not isinstance(item, dict):
                    continue
                ws_list = item.get("ws", [])
                if not isinstance(ws_list, list):
                    continue
                for ws in ws_list:
                    if not isinstance(ws, dict):
                        continue
                    cw_list = ws.get("cw", [])
                    if not isinstance(cw_list, list):
                        continue
                    for cw in cw_list:
                        if isinstance(cw, dict):
                            w = cw.get("w", "")
                            if w:
                                text_parts.append(w)

    def extract_text(self, result):
        """Extract the transcribed text from an API result.

        Args:
            result: A dict, a JSON string, or any other value.

        Returns:
            * the original string when ``result`` is a string that is not JSON;
            * ``"错误: <message>"`` when the dict carries a non-zero ``code``;
            * ``result["result"]`` when that field is a string;
            * the concatenated words of a ``lattice`` or ``st`` structure;
            * a JSON dump of an unrecognised dict, or ``str(result)`` for a
              value that is not a dict.
        """
        print(f"\n[DEBUG] extract_text 输入类型: {type(result)}")

        if isinstance(result, str):
            print(f"[DEBUG] 字符串内容 (前200字符): {result[:200]}")
            try:
                result = json.loads(result)
                print("[DEBUG] 成功解析字符串为JSON对象")
            except json.JSONDecodeError:
                print("[DEBUG] 无法解析为JSON,返回原始字符串")
                return result

        if not isinstance(result, dict):
            print(f"[WARNING] 非字典类型结果: {type(result)}")
            return str(result)

        print("[DEBUG] 处理字典类型结果")

        if "code" in result and result["code"] != 0:
            error_msg = result.get("message", "未知错误")
            print(
                f"[ERROR] API返回错误: code={result['code']}, message={error_msg}"
            )
            return f"错误: {error_msg}"

        if "result" in result and isinstance(result["result"], str):
            print("[DEBUG] 找到直接结果字段")
            return result["result"]

        if "lattice" in result and isinstance(result["lattice"], list):
            print("[DEBUG] 解析lattice结构")
            text_parts = []
            for lattice in result["lattice"]:
                if not isinstance(lattice, dict):
                    continue
                json_1best = lattice.get("json_1best", {})
                if not json_1best or not isinstance(json_1best, dict):
                    continue
                # "st" may be a single dict or a list of entries.
                st_content = json_1best.get("st")
                if isinstance(st_content, dict):
                    st_list = [st_content]
                elif isinstance(st_content, list):
                    st_list = st_content
                else:
                    st_list = []
                self._collect_words(st_list, text_parts)
            return "".join(text_parts)

        if "st" in result and isinstance(result["st"], list):
            print("[DEBUG] 解析st结构")
            text_parts = []
            self._collect_words(result["st"], text_parts)
            return "".join(text_parts)

        print("[WARNING] 无法识别的结果结构")
        return json.dumps(result, indent=2, ensure_ascii=False)
|
|
|
|
|
|
def _format_response(response):
    """Render an API response for logging without raising on non-JSON values."""
    try:
        return json.dumps(response, indent=2, ensure_ascii=False)
    except (TypeError, ValueError):
        # e.g. the raw bytes that ResultExtractor.call returns for a
        # non-200 reply.
        return repr(response)


def _poll_transcription(result_extractor, task_id, max_attempts=30, poll_interval=5):
    """Query ``task_id`` until it finishes and return the extracted text.

    Returns ``None`` on an invalid reply, an API error code, an unknown
    task status, a missing result, or after ``max_attempts`` queries that
    all reported the task as still pending.
    """
    for attempt in range(max_attempts):
        result = result_extractor.task_query(task_id)

        print(f"\n[QUERY {attempt + 1}] 响应类型: {type(result)}")
        if not isinstance(result, dict):
            print(f"[QUERY {attempt + 1}] 响应内容 (前200字符): {str(result)[:200]}")
            print(f"无效响应类型: {type(result)}")
            return None
        print(f"[QUERY {attempt + 1}] 响应内容: {_format_response(result)}")

        if "code" in result and result["code"] != 0:
            error_msg = result.get("message", "未知错误")
            print(f"API错误: code={result['code']}, message={error_msg}")
            return None

        # A missing, null or non-dict "data" field is treated as "no status".
        task_data = result.get("data")
        if not isinstance(task_data, dict):
            task_data = {}
        task_status = task_data.get("task_status")

        if not task_status:
            print("响应中缺少任务状态字段")
            print("完整响应:", _format_response(result))
            return None

        if task_status in ["3", "4"]:
            print("转写完成···")
            result_content = task_data.get("result")
            if result_content is None:
                print("\n响应中缺少结果字段")
                print("完整响应:", _format_response(result))
                return None
            try:
                result_text = result_extractor.extract_text(result_content)
                print("\n转写结果:\n", result_text)
                return result_text
            except Exception as e:
                print(f"\n提取文本时出错: {str(e)}")
                print(f"错误详情:\n{traceback.format_exc()}")
                print("原始结果内容:", _format_response(result_content))
                return None

        if task_status in ["1", "2"]:
            print(
                f"任务状态:{task_status},等待中... (尝试 {attempt + 1}/{max_attempts})"
            )
            time.sleep(poll_interval)
            continue

        print(f"未知任务状态:{task_status}")
        print("完整响应:", _format_response(result))
        return None

    print(f"超过最大查询次数({max_attempts}),任务可能仍在处理中")
    return None


def audio_to_str(appid, apikey, apisecret, file_path):
    """Transcribe an audio file through the iFLYTEK Open Platform API.

    The file is uploaded with ``FileUploader``, a transcription task is
    created with ``ResultExtractor`` and then polled until it finishes.

    Args:
        appid (str): iFLYTEK Open Platform appid.
        apikey (str): iFLYTEK Open Platform apikey.
        apisecret (str): iFLYTEK Open Platform apisecret.
        file_path (str): Path of the audio file.

    Returns:
        str: The transcribed text, or ``None`` if any step fails.
    """
    if not os.path.exists(file_path):
        print(f"错误:文件 {file_path} 不存在")
        return None

    try:
        file_uploader = FileUploader(
            app_id=appid,
            api_key=apikey,
            api_secret=apisecret,
            upload_file_path=file_path,
        )
        fileurl = file_uploader.upload_file()
        if not fileurl:
            print("文件上传失败")
            return None
        print("文件上传成功,fileurl:", fileurl)

        result_extractor = ResultExtractor(appid, apikey, apisecret)
        print("\n------ 创建任务 -------")
        create_response = result_extractor.task_create(fileurl)

        # The reply may be bytes (non-200 status) or None, neither of which
        # json.dumps can serialise, so it is logged through the tolerant
        # formatter and the failure branch below stays reachable.
        print(f"[DEBUG] 创建任务响应: {_format_response(create_response)}")

        if not isinstance(create_response, dict) or "data" not in create_response:
            print("创建任务失败:", create_response)
            return None

        create_data = create_response["data"]
        task_id = create_data.get("task_id") if isinstance(create_data, dict) else None
        if not task_id:
            print("创建任务失败:", create_response)
            return None
        print(f"任务ID: {task_id}")

        print("\n------ 查询任务 -------")
        print("任务转写中······")
        return _poll_transcription(result_extractor, task_id)

    except Exception as e:
        # Top-level boundary: report the failure and fall back to None.
        print(f"发生异常: {str(e)}")
        print(f"错误详情:\n{traceback.format_exc()}")
        return None
|
|
|
|
|
|
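# Notes on the general text-recognition (OCR) service:
# 1. The image data must not exceed 10 MB after base64 encoding.
# 2. Obtain the appid, apiSecret and apiKey from the iFLYTEK Open Platform
#    console and fill them into this demo.
# 3. Chinese and English are supported, for both handwritten and printed text.
# 4. Recognition of slanted text is improved, and some rare characters are
#    also recognised.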
|
|
|
|
|
|
# Request URL on api.xf-yun.com; presumably the endpoint of the general
# text-recognition service described in the note above — confirm before use.
# No code in the visible part of this file references it.
URL = "https://api.xf-yun.com/v1/private/sf8e6aca1"
|
|
|
|
|
|
class AssembleHeaderException(Exception):
    """Raised when a request URL cannot be split into host and path."""

    def __init__(self, msg):
        # Pass the message to Exception so str(exc) and tracebacks show it.
        super().__init__(msg)
        self.message = msg
|
|
|
|
|
|
class Url:
    """Plain container for the host, path and schema parts of a request URL."""

    def __init__(self, host, path, schema):
        self.host = host
        self.path = path
        self.schema = schema

    def __repr__(self):
        return (
            f"{type(self).__name__}(host={self.host!r}, "
            f"path={self.path!r}, schema={self.schema!r})"
        )
|
|
|
|
|
|
|
|
def sha256base64(data):
    """Return the base64 text of the SHA-256 digest of the bytes ``data``."""
    raw_digest = hashlib.sha256(data).digest()
    return base64.b64encode(raw_digest).decode("utf-8")
|
|
|
|
|
|
def parse_url(requset_url):
    """Split ``requset_url`` into schema, host and path.

    Args:
        requset_url: URL of the form ``<schema>://<host>/<path...>``.

    Returns:
        Url: ``schema`` includes the trailing ``://``; ``path`` starts at the
        first ``/`` after the host and runs to the end of the string.

    Raises:
        AssembleHeaderException: If the URL has no ``://`` separator, an
            empty host, or no ``/`` after the host.
    """
    scheme_end = requset_url.find("://")
    if scheme_end < 0:
        raise AssembleHeaderException("invalid request url:" + requset_url)
    schema = requset_url[: scheme_end + 3]
    remainder = requset_url[scheme_end + 3 :]

    # find() returns -1 when there is no path, so the check below also
    # covers that case; index() would have raised a bare ValueError instead.
    path_start = remainder.find("/")
    if path_start <= 0:
        raise AssembleHeaderException("invalid request url:" + requset_url)

    return Url(remainder[:path_start], remainder[path_start:], schema)
|
|
|
|
|
|
|
|
def assemble_ws_auth_url(requset_url, method="POST", api_key="", api_secret=""):
    """Return ``requset_url`` with signed ``host``/``date``/``authorization`` query parameters.

    The string ``host``, ``date`` and request line is signed with
    HMAC-SHA256 using ``api_secret``; the resulting authorization text is
    base64-encoded and appended to the URL together with host and date.
    """
    parsed = parse_url(requset_url)
    host, path = parsed.host, parsed.path

    # Current time rendered as an HTTP date string.
    timestamp = mktime(datetime.datetime.now().timetuple())
    date = format_date_time(timestamp)

    string_to_sign = "host: {}\ndate: {}\n{} {} HTTP/1.1".format(
        host, date, method, path
    )
    mac = hmac.new(
        api_secret.encode("utf-8"),
        string_to_sign.encode("utf-8"),
        digestmod=hashlib.sha256,
    )
    signature = base64.b64encode(mac.digest()).decode(encoding="utf-8")

    authorization_plain = (
        'api_key="%s", algorithm="%s", headers="%s", signature="%s"'
        % (api_key, "hmac-sha256", "host date request-line", signature)
    )
    authorization = base64.b64encode(authorization_plain.encode("utf-8")).decode(
        encoding="utf-8"
    )

    query = urlencode({"host": host, "date": date, "authorization": authorization})
    return requset_url + "?" + query
|
|
|
|
|
|
def image_to_str(
    endpoint=None,
    key=None,
    unused_param=None,
    file_path=None,
    *,
    request_timeout=30,
    poll_interval=1,
    max_polls=60,
):
    """Recognise the text in an image with the Azure Computer Vision Read API.

    The image is submitted to ``<endpoint>/vision/v3.2/read/analyze`` and the
    returned ``Operation-Location`` URL is polled until the operation
    reports ``succeeded`` or ``failed``.

    Args:
        endpoint (str): Azure Computer Vision endpoint URL.
        key (str): Azure Computer Vision API key.
        unused_param: Not used; kept for call compatibility.
        file_path (str): Path of the image file.
        request_timeout (float): Timeout in seconds passed to each HTTP request.
        poll_interval (float): Seconds to wait between two status queries.
        max_polls (int): Maximum number of status queries before giving up.

    Returns:
        str: The recognised lines joined with single spaces (``""`` when no
        text was found), or ``None`` on any error or when the operation has
        not finished after ``max_polls`` queries.
    """
    # SECURITY NOTE(review): a real-looking endpoint and subscription key are
    # hard-coded as fallbacks. They are kept only so that existing calls
    # without arguments keep working; the key should be rotated and supplied
    # by the caller (or from configuration) instead of living in source.
    if endpoint is None:
        endpoint = "https://ai-siyuwang5414995ai361208251338.cognitiveservices.azure.com/"
    if key is None:
        key = "45PYY2Av9CdMCveAjVG43MGKrnHzSxdiFTK9mWBgrOsMAHavxKj0JQQJ99BDACHYHv6XJ3w3AAAAACOGeVpQ"

    try:
        with open(file_path, "rb") as f:
            image_data = f.read()

        analyze_url = endpoint.rstrip('/') + "/vision/v3.2/read/analyze"
        headers = {
            'Ocp-Apim-Subscription-Key': key,
            'Content-Type': 'application/octet-stream'
        }

        # Without a timeout a stalled connection would block forever.
        response = requests.post(
            analyze_url, headers=headers, data=image_data, timeout=request_timeout
        )
        if response.status_code != 202:
            print(f"分析请求失败: {response.status_code}, {response.text}")
            return None

        operation_url = response.headers["Operation-Location"]
        poll_headers = {'Ocp-Apim-Subscription-Key': key}

        # Bounded polling: an operation that never leaves its pending state
        # must not hang the caller.
        for _ in range(max_polls):
            result_response = requests.get(
                operation_url, headers=poll_headers, timeout=request_timeout
            )
            result = result_response.json()
            status = result["status"]

            if status == "succeeded":
                text_results = []
                if "analyzeResult" in result and "readResults" in result["analyzeResult"]:
                    for read_result in result["analyzeResult"]["readResults"]:
                        text_results.extend(
                            line["text"] for line in read_result["lines"]
                        )
                return " ".join(text_results)

            if status == "failed":
                print(f"文字识别失败: {result}")
                return None

            time.sleep(poll_interval)

        print(f"文字识别超时: 已查询 {max_polls} 次,操作仍未完成")
        return None

    except Exception as e:
        # Top-level boundary: report the failure and fall back to None.
        print(f"发生异常: {e}")
        return None
|
|
|
|
|
|
if __name__ == "__main__":
    # Demo run: transcribe one audio file and recognise the text in one image.
    # NOTE(review): the credentials below are hard-coded in source; they
    # should be rotated and loaded from configuration instead.
    xfyun_credentials = (
        "33c1b63d",
        "40bf7cd82e31ace30a9cfb76309a43a3",
        "OTY1YzIyZWM3YTg0OWZiMGE2ZjA2ZmE4",
    )
    azure_endpoint = "https://ai-siyuwang5414995ai361208251338.cognitiveservices.azure.com/"
    azure_key = "45PYY2Av9CdMCveAjVG43MGKrnHzSxdiFTK9mWBgrOsMAHavxKj0JQQJ99BDACHYHv6XJ3w3AAAAACOGeVpQ"

    audio_path = r"audio_sample_little.wav"
    image_path = r"1.png"

    audio_text = audio_to_str(*xfyun_credentials, audio_path)
    image_text = image_to_str(
        endpoint=azure_endpoint,
        key=azure_key,
        unused_param=None,
        file_path=image_path,
    )

    print("-" * 20)
    print("\n音频转文字结果:", audio_text)
    print("\n图片转文字结果:", image_text)
|
|
|
|
|
|
def azure_speech_to_text(speech_key, speech_region, audio_file_path):
    """Convert an audio file to text with the Azure Speech service.

    A recognizer configured for ``zh-CN`` is created for the file and a
    single ``recognize_once()`` call is made.

    Args:
        speech_key (str): API key of the Azure Speech service.
        speech_region (str): Region of the Azure Speech service.
        audio_file_path (str): Path of the audio file.

    Returns:
        str: The recognised text, or ``None`` when nothing was recognised,
        the recognition was cancelled, or an exception occurred.
    """
    try:
        config = speechsdk.SpeechConfig(subscription=speech_key, region=speech_region)
        config.speech_recognition_language = "zh-CN"
        audio_input = speechsdk.audio.AudioConfig(filename=audio_file_path)
        recognizer = speechsdk.SpeechRecognizer(
            speech_config=config, audio_config=audio_input
        )

        outcome = recognizer.recognize_once()
        reason = outcome.reason

        if reason == speechsdk.ResultReason.RecognizedSpeech:
            print(f"Azure Speech识别成功: {outcome.text}")
            return outcome.text

        if reason == speechsdk.ResultReason.NoMatch:
            print("Azure Speech未识别到语音")
            return None

        if reason == speechsdk.ResultReason.Canceled:
            details = outcome.cancellation_details
            print(f"Azure Speech识别被取消: {details.reason}")
            if details.reason == speechsdk.CancellationReason.Error:
                print(f"错误详情: {details.error_details}")
            return None

        # Any other reason yields no text.
        return None
    except Exception as e:
        print(f"Azure Speech识别出错: {str(e)}")
        return None
|
|
|