#!/usr/bin/python3 # -*- coding:utf-8 -*- import os import datetime import re import time import traceback import math from urllib.parse import urlparse from urllib3 import encode_multipart_formdata from wsgiref.handlers import format_date_time from time import mktime import hashlib import base64 import hmac from urllib.parse import urlencode import json import requests import azure.cognitiveservices.speech as speechsdk # 常量定义 LFASR_HOST = "http://upload-ost-api.xfyun.cn/file" # 文件上传Host API_INIT = "/mpupload/init" # 初始化接口 API_UPLOAD = "/upload" # 上传接口 API_CUT = "/mpupload/upload" # 分片上传接口 API_CUT_COMPLETE = "/mpupload/complete" # 分片完成接口 API_CUT_CANCEL = "/mpupload/cancel" # 分片取消接口 FILE_PIECE_SIZE = 5242880 # 文件分片大小5M PRO_CREATE_URI = "/v2/ost/pro_create" QUERY_URI = "/v2/ost/query" # 文件上传类 class FileUploader: def __init__(self, app_id, api_key, api_secret, upload_file_path): self.app_id = app_id self.api_key = api_key self.api_secret = api_secret self.upload_file_path = upload_file_path def get_request_id(self): """生成请求ID""" return time.strftime("%Y%m%d%H%M") def hashlib_256(self, data): """计算 SHA256 哈希""" m = hashlib.sha256(bytes(data.encode(encoding="utf-8"))).digest() digest = "SHA-256=" + base64.b64encode(m).decode(encoding="utf-8") return digest def assemble_auth_header(self, request_url, file_data_type, method="", body=""): """组装鉴权头部""" u = urlparse(request_url) host = u.hostname path = u.path now = datetime.datetime.now() date = format_date_time(mktime(now.timetuple())) digest = "SHA256=" + self.hashlib_256("") signature_origin = "host: {}\ndate: {}\n{} {} HTTP/1.1\ndigest: {}".format( host, date, method, path, digest ) signature_sha = hmac.new( self.api_secret.encode("utf-8"), signature_origin.encode("utf-8"), digestmod=hashlib.sha256, ).digest() signature_sha = base64.b64encode(signature_sha).decode(encoding="utf-8") authorization = 'api_key="%s", algorithm="%s", headers="%s", signature="%s"' % ( self.api_key, "hmac-sha256", "host date request-line digest", signature_sha, ) headers = { "host": host, "date": date, "authorization": authorization, "digest": digest, "content-type": file_data_type, } return headers def call_api(self, url, file_data, file_data_type): """调用POST API接口""" headers = self.assemble_auth_header( url, file_data_type, method="POST", body=file_data ) try: resp = requests.post(url, headers=headers, data=file_data, timeout=8) print("上传状态:", resp.status_code, resp.text) return resp.json() except Exception as e: print("上传失败!Exception :%s" % e) return None def upload_cut_complete(self, upload_id): """分块上传完成""" body_dict = { "app_id": self.app_id, "request_id": self.get_request_id(), "upload_id": upload_id, } file_data_type = "application/json" url = LFASR_HOST + API_CUT_COMPLETE response = self.call_api(url, json.dumps(body_dict), file_data_type) if response and "data" in response and "url" in response["data"]: file_url = response["data"]["url"] print("任务上传结束") return file_url else: print("分片上传完成失败", response) return None def upload_file(self): """上传文件,根据文件大小选择分片或普通上传""" file_total_size = os.path.getsize(self.upload_file_path) if file_total_size < 31457280: # 30MB print("-----不使用分块上传-----") return self.simple_upload() else: print("-----使用分块上传-----") return self.multipart_upload() def simple_upload(self): """简单上传文件""" try: with open(self.upload_file_path, mode="rb") as f: file = { "data": (self.upload_file_path, f.read()), "app_id": self.app_id, "request_id": self.get_request_id(), } encode_data = encode_multipart_formdata(file) file_data = encode_data[0] file_data_type = encode_data[1] url = LFASR_HOST + API_UPLOAD response = self.call_api(url, file_data, file_data_type) if response and "data" in response and "url" in response["data"]: return response["data"]["url"] else: print("简单上传失败", response) return None except FileNotFoundError: print("文件未找到:", self.upload_file_path) return None def multipart_upload(self): """分片上传文件""" upload_id = self.prepare_upload() if not upload_id: return None if not self.do_upload(upload_id): return None file_url = self.upload_cut_complete(upload_id) print("分片上传地址:", file_url) return file_url def prepare_upload(self): """预处理,获取upload_id""" body_dict = { "app_id": self.app_id, "request_id": self.get_request_id(), } url = LFASR_HOST + API_INIT file_data_type = "application/json" response = self.call_api(url, json.dumps(body_dict), file_data_type) if response and "data" in response and "upload_id" in response["data"]: return response["data"]["upload_id"] else: print("预处理失败", response) return None def do_upload(self, upload_id): """执行分片上传""" file_total_size = os.path.getsize(self.upload_file_path) chunk_size = FILE_PIECE_SIZE chunks = math.ceil(file_total_size / chunk_size) request_id = self.get_request_id() slice_id = 1 print( "文件:", self.upload_file_path, " 文件大小:", file_total_size, " 分块大小:", chunk_size, " 分块数:", chunks, ) with open(self.upload_file_path, mode="rb") as content: while slice_id <= chunks: current_size = min( chunk_size, file_total_size - (slice_id - 1) * chunk_size ) file = { "data": (self.upload_file_path, content.read(current_size)), "app_id": self.app_id, "request_id": request_id, "upload_id": upload_id, "slice_id": slice_id, } encode_data = encode_multipart_formdata(file) file_data = encode_data[0] file_data_type = encode_data[1] url = LFASR_HOST + API_CUT resp = self.call_api(url, file_data, file_data_type) count = 0 while not resp and (count < 3): print("上传重试") resp = self.call_api(url, file_data, file_data_type) count = count + 1 time.sleep(1) if not resp: print("分片上传失败") return False slice_id += 1 return True class ResultExtractor: def __init__(self, appid, apikey, apisecret): # POST 请求相关参数 self.Host = "ost-api.xfyun.cn" self.RequestUriCreate = PRO_CREATE_URI self.RequestUriQuery = QUERY_URI # 设置 URL if re.match(r"^\d", self.Host): self.urlCreate = "http://" + self.Host + self.RequestUriCreate self.urlQuery = "http://" + self.Host + self.RequestUriQuery else: self.urlCreate = "https://" + self.Host + self.RequestUriCreate self.urlQuery = "https://" + self.Host + self.RequestUriQuery self.HttpMethod = "POST" self.APPID = appid self.Algorithm = "hmac-sha256" self.HttpProto = "HTTP/1.1" self.UserName = apikey self.Secret = apisecret # 设置当前时间 cur_time_utc = datetime.datetime.now(datetime.timezone.utc) self.Date = self.httpdate(cur_time_utc) # 设置测试音频文件参数 self.BusinessArgsCreate = { "language": "zh_cn", "accent": "mandarin", "domain": "pro_ost_ed", } def img_read(self, path): with open(path, "rb") as fo: return fo.read() def hashlib_256(self, res): m = hashlib.sha256(bytes(res.encode(encoding="utf-8"))).digest() result = "SHA-256=" + base64.b64encode(m).decode(encoding="utf-8") return result def httpdate(self, dt): weekday = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"][dt.weekday()] month = [ "Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec", ][dt.month - 1] return "%s, %02d %s %04d %02d:%02d:%02d GMT" % ( weekday, dt.day, month, dt.year, dt.hour, dt.minute, dt.second, ) def generateSignature(self, digest, uri): signature_str = "host: " + self.Host + "\n" signature_str += "date: " + self.Date + "\n" signature_str += self.HttpMethod + " " + uri + " " + self.HttpProto + "\n" signature_str += "digest: " + digest signature = hmac.new( bytes(self.Secret.encode("utf-8")), bytes(signature_str.encode("utf-8")), digestmod=hashlib.sha256, ).digest() result = base64.b64encode(signature) return result.decode(encoding="utf-8") def init_header(self, data, uri): digest = self.hashlib_256(data) sign = self.generateSignature(digest, uri) auth_header = ( 'api_key="%s",algorithm="%s", ' 'headers="host date request-line digest", ' 'signature="%s"' % (self.UserName, self.Algorithm, sign) ) headers = { "Content-Type": "application/json", "Accept": "application/json", "Method": "POST", "Host": self.Host, "Date": self.Date, "Digest": digest, "Authorization": auth_header, } return headers def get_create_body(self, fileurl): post_data = { "common": {"app_id": self.APPID}, "business": self.BusinessArgsCreate, "data": {"audio_src": "http", "audio_url": fileurl, "encoding": "raw"}, } body = json.dumps(post_data) return body def get_query_body(self, task_id): post_data = { "common": {"app_id": self.APPID}, "business": { "task_id": task_id, }, } body = json.dumps(post_data) return body def call(self, url, body, headers): try: response = requests.post(url, data=body, headers=headers, timeout=8) status_code = response.status_code if status_code != 200: info = response.content return info else: try: return json.loads(response.text) except json.JSONDecodeError: return response.text except Exception as e: print("Exception :%s" % e) return None def task_create(self, fileurl): body = self.get_create_body(fileurl) headers_create = self.init_header(body, self.RequestUriCreate) return self.call(self.urlCreate, body, headers_create) def task_query(self, task_id): query_body = self.get_query_body(task_id) headers_query = self.init_header(query_body, self.RequestUriQuery) return self.call(self.urlQuery, query_body, headers_query) def extract_text(self, result): """ 从API响应中提取文本内容 支持多种结果格式,增强错误处理 """ # 调试输出:打印原始结果类型 print(f"\n[DEBUG] extract_text 输入类型: {type(result)}") # 如果是字符串,尝试解析为JSON if isinstance(result, str): print(f"[DEBUG] 字符串内容 (前200字符): {result[:200]}") try: result = json.loads(result) print("[DEBUG] 成功解析字符串为JSON对象") except json.JSONDecodeError: print("[DEBUG] 无法解析为JSON,返回原始字符串") return result # 处理字典类型的结果 if isinstance(result, dict): print("[DEBUG] 处理字典类型结果") # 1. 检查错误信息 if "code" in result and result["code"] != 0: error_msg = result.get("message", "未知错误") print( f"[ERROR] API返回错误: code={result['code']}, message={error_msg}" ) return f"错误: {error_msg}" # 2. 检查直接包含文本结果的情况 if "result" in result and isinstance(result["result"], str): print("[DEBUG] 找到直接结果字段") return result["result"] # 3. 检查lattice结构(详细结果) if "lattice" in result and isinstance(result["lattice"], list): print("[DEBUG] 解析lattice结构") text_parts = [] for lattice in result["lattice"]: if not isinstance(lattice, dict): continue # 获取json_1best内容 json_1best = lattice.get("json_1best", {}) if not json_1best or not isinstance(json_1best, dict): continue # 处理st字段 - 修正:st可能是字典或列表 st_content = json_1best.get("st") st_list = [] if isinstance(st_content, dict): st_list = [st_content] # 转为列表统一处理 elif isinstance(st_content, list): st_list = st_content for st in st_list: if isinstance(st, str): # 直接是字符串结果 text_parts.append(st) elif isinstance(st, dict): # 处理字典结构的st rt = st.get("rt", []) if not isinstance(rt, list): continue for item in rt: if isinstance(item, dict): ws_list = item.get("ws", []) if isinstance(ws_list, list): for ws in ws_list: if isinstance(ws, dict): cw_list = ws.get("cw", []) if isinstance(cw_list, list): for cw in cw_list: if isinstance(cw, dict): w = cw.get("w", "") if w: text_parts.append(w) return "".join(text_parts) # 4. 检查简化结构(直接包含st) if "st" in result and isinstance(result["st"], list): print("[DEBUG] 解析st结构") text_parts = [] for st in result["st"]: if isinstance(st, str): text_parts.append(st) elif isinstance(st, dict): rt = st.get("rt", []) if isinstance(rt, list): for item in rt: if isinstance(item, dict): ws_list = item.get("ws", []) if isinstance(ws_list, list): for ws in ws_list: if isinstance(ws, dict): cw_list = ws.get("cw", []) if isinstance(cw_list, list): for cw in cw_list: if isinstance(cw, dict): w = cw.get("w", "") if w: text_parts.append(w) return "".join(text_parts) # 5. 其他未知结构 print("[WARNING] 无法识别的结果结构") return json.dumps(result, indent=2, ensure_ascii=False) # 6. 非字典类型结果 print(f"[WARNING] 非字典类型结果: {type(result)}") return str(result) def audio_to_str(appid, apikey, apisecret, file_path): """ 调用讯飞开放平台接口,获取音频文件的转写结果。 参数: appid (str): 讯飞开放平台的appid。 apikey (str): 讯飞开放平台的apikey。 apisecret (str): 讯飞开放平台的apisecret。 file_path (str): 音频文件路径。 返回值: str: 转写结果文本,如果发生错误则返回None。 """ # 检查文件是否存在 if not os.path.exists(file_path): print(f"错误:文件 {file_path} 不存在") return None try: # 1. 文件上传 file_uploader = FileUploader( app_id=appid, api_key=apikey, api_secret=apisecret, upload_file_path=file_path, ) fileurl = file_uploader.upload_file() if not fileurl: print("文件上传失败") return None print("文件上传成功,fileurl:", fileurl) # 2. 创建任务并查询结果 result_extractor = ResultExtractor(appid, apikey, apisecret) print("\n------ 创建任务 -------") create_response = result_extractor.task_create(fileurl) # 调试输出创建响应 print( f"[DEBUG] 创建任务响应: {json.dumps(create_response, indent=2, ensure_ascii=False)}" ) if not isinstance(create_response, dict) or "data" not in create_response: print("创建任务失败:", create_response) return None task_id = create_response["data"]["task_id"] print(f"任务ID: {task_id}") # 查询任务 print("\n------ 查询任务 -------") print("任务转写中······") max_attempts = 30 attempt = 0 while attempt < max_attempts: result = result_extractor.task_query(task_id) # 调试输出查询响应 print(f"\n[QUERY {attempt + 1}] 响应类型: {type(result)}") if isinstance(result, dict): print( f"[QUERY {attempt + 1}] 响应内容: {json.dumps(result, indent=2, ensure_ascii=False)}" ) else: print( f"[QUERY {attempt + 1}] 响应内容 (前200字符): {str(result)[:200]}" ) # 检查响应是否有效 if not isinstance(result, dict): print(f"无效响应类型: {type(result)}") return None # 检查API错误码 if "code" in result and result["code"] != 0: error_msg = result.get("message", "未知错误") print(f"API错误: code={result['code']}, message={error_msg}") return None # 获取任务状态 task_data = result.get("data", {}) task_status = task_data.get("task_status") if not task_status: print("响应中缺少任务状态字段") print("完整响应:", json.dumps(result, indent=2, ensure_ascii=False)) return None # 处理不同状态 if task_status in ["3", "4"]: # 任务已完成或回调完成 print("转写完成···") # 提取结果 result_content = task_data.get("result") if result_content is not None: try: result_text = result_extractor.extract_text(result_content) print("\n转写结果:\n", result_text) return result_text except Exception as e: print(f"\n提取文本时出错: {str(e)}") print(f"错误详情:\n{traceback.format_exc()}") print( "原始结果内容:", json.dumps(result_content, indent=2, ensure_ascii=False), ) return None else: print("\n响应中缺少结果字段") print("完整响应:", json.dumps(result, indent=2, ensure_ascii=False)) return None elif task_status in ["1", "2"]: # 任务待处理或处理中 print( f"任务状态:{task_status},等待中... (尝试 {attempt + 1}/{max_attempts})" ) time.sleep(5) attempt += 1 else: print(f"未知任务状态:{task_status}") print("完整响应:", json.dumps(result, indent=2, ensure_ascii=False)) return None else: print(f"超过最大查询次数({max_attempts}),任务可能仍在处理中") return None except Exception as e: print(f"发生异常: {str(e)}") print(f"错误详情:\n{traceback.format_exc()}") return None """ 1、通用文字识别,图像数据base64编码后大小不得超过10M 2、appid、apiSecret、apiKey请到讯飞开放平台控制台获取并填写到此demo中 3、支持中英文,支持手写和印刷文字。 4、在倾斜文字上效果有提升,同时支持部分生僻字的识别 """ # 图像识别接口地址 URL = "https://api.xf-yun.com/v1/private/sf8e6aca1" class AssembleHeaderException(Exception): def __init__(self, msg): self.message = msg class Url: def __init__(self, host, path, schema): self.host = host self.path = path self.schema = schema pass # calculate sha256 and encode to base64 def sha256base64(data): sha256 = hashlib.sha256() sha256.update(data) digest = base64.b64encode(sha256.digest()).decode(encoding="utf-8") return digest def parse_url(requset_url): stidx = requset_url.index("://") host = requset_url[stidx + 3 :] schema = requset_url[: stidx + 3] edidx = host.index("/") if edidx <= 0: raise AssembleHeaderException("invalid request url:" + requset_url) path = host[edidx:] host = host[:edidx] u = Url(host, path, schema) return u # build websocket auth request url def assemble_ws_auth_url(requset_url, method="POST", api_key="", api_secret=""): u = parse_url(requset_url) host = u.host path = u.path now = datetime.datetime.now() date = format_date_time(mktime(now.timetuple())) # print(date) # 可选:打印Date值 signature_origin = "host: {}\ndate: {}\n{} {} HTTP/1.1".format( host, date, method, path ) # print(signature_origin) # 可选:打印签名原文 signature_sha = hmac.new( api_secret.encode("utf-8"), signature_origin.encode("utf-8"), digestmod=hashlib.sha256, ).digest() signature_sha = base64.b64encode(signature_sha).decode(encoding="utf-8") authorization_origin = ( 'api_key="%s", algorithm="%s", headers="%s", signature="%s"' % (api_key, "hmac-sha256", "host date request-line", signature_sha) ) authorization = base64.b64encode(authorization_origin.encode("utf-8")).decode( encoding="utf-8" ) # print(authorization_origin) # 可选:打印鉴权原文 values = {"host": host, "date": date, "authorization": authorization} return requset_url + "?" + urlencode(values) def image_to_str(endpoint=None, key=None, unused_param=None, file_path=None): """ 调用Azure Computer Vision API识别图片中的文字。 参数: endpoint (str): Azure Computer Vision endpoint URL。 key (str): Azure Computer Vision API key。 unused_param (str): 未使用的参数,保持兼容性。 file_path (str): 图片文件路径。 返回值: str: 图片中的文字识别结果,如果发生错误则返回None。 """ # 默认配置 if endpoint is None: endpoint = "https://ai-siyuwang5414995ai361208251338.cognitiveservices.azure.com/" if key is None: key = "45PYY2Av9CdMCveAjVG43MGKrnHzSxdiFTK9mWBgrOsMAHavxKj0JQQJ99BDACHYHv6XJ3w3AAAAACOGeVpQ" try: # 读取图片文件 with open(file_path, "rb") as f: image_data = f.read() # 构造请求URL analyze_url = endpoint.rstrip('/') + "/vision/v3.2/read/analyze" # 设置请求头 headers = { 'Ocp-Apim-Subscription-Key': key, 'Content-Type': 'application/octet-stream' } # 发送POST请求开始分析 response = requests.post(analyze_url, headers=headers, data=image_data) if response.status_code != 202: print(f"分析请求失败: {response.status_code}, {response.text}") return None # 获取操作位置 operation_url = response.headers["Operation-Location"] # 轮询结果 import time while True: result_response = requests.get(operation_url, headers={'Ocp-Apim-Subscription-Key': key}) result = result_response.json() if result["status"] == "succeeded": # 提取文字 text_results = [] if "analyzeResult" in result and "readResults" in result["analyzeResult"]: for read_result in result["analyzeResult"]["readResults"]: for line in read_result["lines"]: text_results.append(line["text"]) return " ".join(text_results) if text_results else "" elif result["status"] == "failed": print(f"文字识别失败: {result}") return None # 等待1秒后重试 time.sleep(1) except Exception as e: print(f"发生异常: {e}") return None if __name__ == "__main__": # 输入讯飞开放平台的 appid,secret、key 和文件路径 appid = "33c1b63d" apikey = "40bf7cd82e31ace30a9cfb76309a43a3" apisecret = "OTY1YzIyZWM3YTg0OWZiMGE2ZjA2ZmE4" audio_path = r"audio_sample_little.wav" # 确保文件路径正确 image_path = r"1.png" # 确保文件路径正确 # 音频转文字 audio_text = audio_to_str(appid, apikey, apisecret, audio_path) # 图片转文字 image_text = image_to_str(endpoint="https://ai-siyuwang5414995ai361208251338.cognitiveservices.azure.com/", key="45PYY2Av9CdMCveAjVG43MGKrnHzSxdiFTK9mWBgrOsMAHavxKj0JQQJ99BDACHYHv6XJ3w3AAAAACOGeVpQ", unused_param=None, file_path=image_path) print("-"* 20) print("\n音频转文字结果:", audio_text) print("\n图片转文字结果:", image_text) def azure_speech_to_text(speech_key, speech_region, audio_file_path): """ 使用Azure Speech服务将音频文件转换为文本。 参数: speech_key (str): Azure Speech服务的API密钥。 speech_region (str): Azure Speech服务的区域。 audio_file_path (str): 音频文件路径。 返回值: str: 转换后的文本,如果发生错误则返回None。 """ try: # 设置语音配置 speech_config = speechsdk.SpeechConfig(subscription=speech_key, region=speech_region) speech_config.speech_recognition_language = "zh-CN" # 设置为中文 # 设置音频配置 audio_config = speechsdk.audio.AudioConfig(filename=audio_file_path) # 创建语音识别器 speech_recognizer = speechsdk.SpeechRecognizer(speech_config=speech_config, audio_config=audio_config) # 执行语音识别 result = speech_recognizer.recognize_once() # 检查识别结果 if result.reason == speechsdk.ResultReason.RecognizedSpeech: print(f"Azure Speech识别成功: {result.text}") return result.text elif result.reason == speechsdk.ResultReason.NoMatch: print("Azure Speech未识别到语音") return None elif result.reason == speechsdk.ResultReason.Canceled: cancellation_details = result.cancellation_details print(f"Azure Speech识别被取消: {cancellation_details.reason}") if cancellation_details.reason == speechsdk.CancellationReason.Error: print(f"错误详情: {cancellation_details.error_details}") return None except Exception as e: print(f"Azure Speech识别出错: {str(e)}") return None