diff --git "a/lib/functions.py" "b/lib/functions.py"
new file mode 100644--- /dev/null
+++ "b/lib/functions.py"
@@ -0,0 +1,4108 @@
+# NOTE!!NOTE!!!NOTE!!NOTE!!!NOTE!!NOTE!!!NOTE!!NOTE!!!
+# THE WORD "CHAPTER" IN THE CODE DOES NOT MEAN
+# IT'S THE REAL CHAPTER OF THE EBOOK SINCE NO STANDARDS
+# ARE DEFINING A CHAPTER ON .EPUB FORMAT. THE WORD "BLOCK"
+# IS USED TO PRINT IT OUT TO THE TERMINAL, AND "CHAPTER" TO THE CODE
+# WHICH IS LESS GENERIC FOR THE DEVELOPERS
+
+import argparse, asyncio, csv, fnmatch, hashlib, io, json, math, os, platform, random, shutil, socket, subprocess, sys, tempfile, threading, time, traceback
+import unicodedata, urllib.request, uuid, zipfile, ebooklib, gradio as gr, psutil, pymupdf4llm, regex as re, requests, stanza, torch, uvicorn
+
+from soynlp.tokenizer import LTokenizer
+from pythainlp.tokenize import word_tokenize
+from sudachipy import dictionary, tokenizer
+from PIL import Image
+from tqdm import tqdm
+from bs4 import BeautifulSoup, NavigableString, Tag
+from collections import Counter
+from collections.abc import Mapping
+from collections.abc import MutableMapping
+from datetime import datetime
+from ebooklib import epub
+from glob import glob
+from iso639 import languages
+from markdown import markdown
+from multiprocessing import Pool, cpu_count
+from multiprocessing import Manager, Event
+from multiprocessing.managers import DictProxy, ListProxy
+from num2words import num2words
+from pathlib import Path
+from pydub import AudioSegment
+from pydub.utils import mediainfo
+from queue import Queue, Empty
+from types import MappingProxyType
+from urllib.parse import urlparse
+from starlette.requests import ClientDisconnect
+
+from lib import *
+from lib.classes.voice_extractor import VoiceExtractor
+from lib.classes.tts_manager import TTSManager
+#from lib.classes.redirect_console import RedirectConsole
+#from lib.classes.argos_translator import ArgosTranslator
+
+context = None
+is_gui_process = False
+active_sessions = set()
+
+#import logging
+#logging.basicConfig(
+# level=logging.INFO, # DEBUG for more verbosity
+# format="%(asctime)s [%(levelname)s] %(message)s"
+#)
+
+class DependencyError(Exception):
+ def __init__(self, message=None):
+ super().__init__(message)
+ print(message)
+ # Automatically handle the exception when it's raised
+ self.handle_exception()
+
+ def handle_exception(self):
+ # Print the full traceback of the exception
+ traceback.print_exc()
+ # Print the exception message
+ error = f'Caught DependencyError: {self}'
+ print(error)
+ # Exit the script if it's not a web process
+ if not is_gui_process:
+ sys.exit(1)
+
+class SessionTracker:
+ def __init__(self):
+ self.lock = threading.Lock()
+
+ def start_session(self, id):
+ with self.lock:
+ session = context.get_session(id)
+ if session['status'] is None:
+ session['status'] = 'ready'
+ return True
+ return False
+
+ def end_session(self, id, socket_hash):
+ active_sessions.discard(socket_hash)
+ with self.lock:
+ session = context.get_session(id)
+ session['cancellation_requested'] = True
+ session['tab_id'] = None
+ session['status'] = None
+ session[socket_hash] = None
+
+class SessionContext:
+ def __init__(self):
+ self.manager = Manager()
+ self.sessions = self.manager.dict()
+ self.cancellation_events = {}
+
+ def get_session(self, id):
+ if id not in self.sessions:
+ self.sessions[id] = recursive_proxy({
+ "script_mode": NATIVE,
+ "id": id,
+ "tab_id": None,
+ "process_id": None,
+ "status": None,
+ "event": None,
+ "progress": 0,
+ "cancellation_requested": False,
+ "device": default_device,
+ "system": None,
+ "client": None,
+ "language": default_language_code,
+ "language_iso1": None,
+ "audiobook": None,
+ "audiobooks_dir": None,
+ "process_dir": None,
+ "ebook": None,
+ "ebook_list": None,
+ "ebook_mode": "single",
+ "chapters_dir": None,
+ "chapters_dir_sentences": None,
+ "epub_path": None,
+ "filename_noext": None,
+ "tts_engine": default_tts_engine,
+ "fine_tuned": default_fine_tuned,
+ "voice": None,
+ "voice_dir": None,
+ "custom_model": None,
+ "custom_model_dir": None,
+ "temperature": default_engine_settings[TTS_ENGINES['XTTSv2']]['temperature'],
+ "length_penalty": default_engine_settings[TTS_ENGINES['XTTSv2']]['length_penalty'],
+ "num_beams": default_engine_settings[TTS_ENGINES['XTTSv2']]['num_beams'],
+ "repetition_penalty": default_engine_settings[TTS_ENGINES['XTTSv2']]['repetition_penalty'],
+ "top_k": default_engine_settings[TTS_ENGINES['XTTSv2']]['top_k'],
+ "top_p": default_engine_settings[TTS_ENGINES['XTTSv2']]['top_p'],
+ "speed": default_engine_settings[TTS_ENGINES['XTTSv2']]['speed'],
+ "enable_text_splitting": default_engine_settings[TTS_ENGINES['XTTSv2']]['enable_text_splitting'],
+ "text_temp": default_engine_settings[TTS_ENGINES['BARK']]['text_temp'],
+ "waveform_temp": default_engine_settings[TTS_ENGINES['BARK']]['waveform_temp'],
+ "final_name": None,
+ "output_format": default_output_format,
+ "output_split": default_output_split,
+ "output_split_hours": default_output_split_hours,
+ "metadata": {
+ "title": None,
+ "creator": None,
+ "contributor": None,
+ "language": None,
+ "identifier": None,
+ "publisher": None,
+ "date": None,
+ "description": None,
+ "subject": None,
+ "rights": None,
+ "format": None,
+ "type": None,
+ "coverage": None,
+ "relation": None,
+ "Source": None,
+ "Modified": None,
+ },
+ "toc": None,
+ "chapters": None,
+ "cover": None,
+ "duration": 0,
+ "playback_time": 0
+ }, manager=self.manager)
+ return self.sessions[id]
+
+ def find_id_by_hash(self, socket_hash):
+ for id, session in self.sessions.items():
+ if socket_hash in session:
+ return session.get('id')
+ return None
+
+ctx_tracker = SessionTracker()
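+
+# Illustrative sketch (not called anywhere at import time): how a caller is
+# expected to obtain a per-user session proxy. The uuid-based id below is an
+# assumption for demonstration only; real ids come from the GUI or --session.
+def _demo_session_context():
+ demo_context = SessionContext()
+ demo_id = str(uuid.uuid4())
+ session = demo_context.get_session(demo_id) # lazily creates the proxy dict
+ session['status'] = 'ready'
+ return proxy2dict(session) # plain dict snapshot, safe to serialize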
+
+def recursive_proxy(data, manager=None):
+ if manager is None:
+ manager = Manager()
+ if isinstance(data, dict):
+ proxy_dict = manager.dict()
+ for key, value in data.items():
+ proxy_dict[key] = recursive_proxy(value, manager)
+ return proxy_dict
+ elif isinstance(data, list):
+ proxy_list = manager.list()
+ for item in data:
+ proxy_list.append(recursive_proxy(item, manager))
+ return proxy_list
+ elif isinstance(data, (str, int, float, bool, type(None))):
+ return data
+ else:
+ error = f"Unsupported data type: {type(data)}"
+ print(error)
+ return
+
+def prepare_dirs(src, session):
+ try:
+ resume = False
+ os.makedirs(os.path.join(models_dir,'tts'), exist_ok=True)
+ os.makedirs(session['session_dir'], exist_ok=True)
+ os.makedirs(session['process_dir'], exist_ok=True)
+ os.makedirs(session['custom_model_dir'], exist_ok=True)
+ os.makedirs(session['voice_dir'], exist_ok=True)
+ os.makedirs(session['audiobooks_dir'], exist_ok=True)
+ session['ebook'] = os.path.join(session['process_dir'], os.path.basename(src))
+ if os.path.exists(session['ebook']):
+ if compare_files_by_hash(session['ebook'], src):
+ resume = True
+ if not resume:
+ shutil.rmtree(session['chapters_dir'], ignore_errors=True)
+ os.makedirs(session['chapters_dir'], exist_ok=True)
+ os.makedirs(session['chapters_dir_sentences'], exist_ok=True)
+ shutil.copy(src, session['ebook'])
+ return True
+ except Exception as e:
+ DependencyError(e)
+ return False
+
+def check_programs(prog_name, command, options):
+ try:
+ subprocess.run(
+ [command, options],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ check=True,
+ text=True,
+ encoding='utf-8'
+ )
+ return True, None
+ except FileNotFoundError:
+ e = f'''********** Error: {prog_name} is not installed! If your OS Calibre package version
+ is not compatible, you can still run ebook2audiobook.sh (linux/mac) or ebook2audiobook.cmd (windows) **********'''
+ DependencyError(e)
+ return False, None
+ except subprocess.CalledProcessError:
+ e = f'Error: There was an issue running {prog_name}.'
+ DependencyError(e)
+ return False, None
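+
+# Usage sketch: the two external tools the pipeline depends on are probed the
+# same way convert_ebook() does before starting a conversion. Note that a
+# failed check instantiates DependencyError, which exits when not in GUI mode.
+def _demo_check_programs():
+ calibre_ok, _ = check_programs('Calibre', 'ebook-convert', '--version')
+ ffmpeg_ok, _ = check_programs('FFmpeg', 'ffmpeg', '-version')
+ return calibre_ok and ffmpeg_ok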
+
+def analyze_uploaded_file(zip_path, required_files):
+ try:
+ if not os.path.exists(zip_path):
+ error = f"The file does not exist: {os.path.basename(zip_path)}"
+ print(error)
+ return False
+ files_in_zip = {}
+ empty_files = set()
+ with zipfile.ZipFile(zip_path, 'r') as zf:
+ for file_info in zf.infolist():
+ file_name = file_info.filename
+ if file_info.is_dir():
+ continue
+ base_name = os.path.basename(file_name)
+ files_in_zip[base_name.lower()] = file_info.file_size
+ if file_info.file_size == 0:
+ empty_files.add(base_name.lower())
+ required_files = [file.lower() for file in required_files]
+ missing_files = [f for f in required_files if f not in files_in_zip]
+ required_empty_files = [f for f in required_files if f in empty_files]
+ if missing_files:
+ print(f"Missing required files: {missing_files}")
+ if required_empty_files:
+ print(f"Required files with 0 KB: {required_empty_files}")
+ return not missing_files and not required_empty_files
+ except zipfile.BadZipFile:
+ error = "The file is not a valid ZIP archive."
+ raise ValueError(error)
+ except Exception as e:
+ error = f"An error occurred: {e}"
+ raise RuntimeError(error)
+
+def extract_custom_model(file_src, session, required_files=None):
+ try:
+ model_path = None
+ if required_files is None:
+ required_files = models[session['tts_engine']][default_fine_tuned]['files']
+ model_name = re.sub('.zip', '', os.path.basename(file_src), flags=re.IGNORECASE)
+ model_name = get_sanitized(model_name)
+ with zipfile.ZipFile(file_src, 'r') as zip_ref:
+ files = zip_ref.namelist()
+ files_length = len(files)
+ tts_dir = session['tts_engine']
+ model_path = os.path.join(session['custom_model_dir'], tts_dir, model_name)
+ if os.path.exists(model_path):
+ print(f'{model_path} already exists, bypassing files extraction')
+ return model_path
+ os.makedirs(model_path, exist_ok=True)
+ required_files_lc = set(x.lower() for x in required_files)
+ with tqdm(total=files_length, unit='files') as t:
+ for f in files:
+ base_f = os.path.basename(f).lower()
+ if base_f in required_files_lc:
+ out_path = os.path.join(model_path, base_f)
+ with zip_ref.open(f) as src, open(out_path, 'wb') as dst:
+ shutil.copyfileobj(src, dst)
+ t.update(1)
+ if is_gui_process:
+ os.remove(file_src)
+ if model_path is not None:
+ msg = f'Extracted files to {model_path}'
+ print(msg)
+ return model_path
+ else:
+ error = f'An error occurred while unzipping {file_src}'
+ print(error)
+ return None
+ except asyncio.exceptions.CancelledError as e:
+ DependencyError(e)
+ if is_gui_process:
+ os.remove(file_src)
+ return None
+ except Exception as e:
+ DependencyError(e)
+ if is_gui_process:
+ os.remove(file_src)
+ return None
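+
+# Usage sketch (assumed zip path; assumes the global context is already set):
+# a custom model zip is first validated with analyze_uploaded_file(), then
+# unpacked under the session's custom_model_dir by extract_custom_model().
+def _demo_install_custom_model(id):
+ session = context.get_session(id)
+ required = models[session['tts_engine']][default_fine_tuned]['files']
+ zip_path = '/tmp/my_custom_model.zip' # hypothetical upload
+ if analyze_uploaded_file(zip_path, required):
+ return extract_custom_model(zip_path, session, required)
+ return None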
+
+def hash_proxy_dict(proxy_dict):
+ return hashlib.md5(str(proxy_dict).encode('utf-8')).hexdigest()
+
+def calculate_hash(filepath, hash_algorithm='sha256'):
+ hash_func = hashlib.new(hash_algorithm)
+ with open(filepath, 'rb') as f:
+ while chunk := f.read(8192): # Read in chunks to handle large files
+ hash_func.update(chunk)
+ return hash_func.hexdigest()
+
+def compare_files_by_hash(file1, file2, hash_algorithm='sha256'):
+ return calculate_hash(file1, hash_algorithm) == calculate_hash(file2, hash_algorithm)
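+
+# Usage sketch (assumed paths): files are hashed in 8 KB chunks so large ebooks
+# can be compared without loading them fully into memory; prepare_dirs() uses
+# this to decide whether an interrupted conversion can be resumed.
+def _demo_compare_files_by_hash():
+ src = '/tmp/book.epub'
+ dst = '/tmp/proc/book.epub'
+ return os.path.exists(dst) and compare_files_by_hash(src, dst)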
+
+def compare_dict_keys(d1, d2):
+ if not isinstance(d1, Mapping) or not isinstance(d2, Mapping):
+ return d1 == d2
+ d1_keys = set(d1.keys())
+ d2_keys = set(d2.keys())
+ missing_in_d2 = d1_keys - d2_keys
+ missing_in_d1 = d2_keys - d1_keys
+ if missing_in_d2 or missing_in_d1:
+ return {
+ "missing_in_d2": missing_in_d2,
+ "missing_in_d1": missing_in_d1,
+ }
+ for key in d1_keys.intersection(d2_keys):
+ nested_result = compare_dict_keys(d1[key], d2[key])
+ if nested_result:
+ return {key: nested_result}
+ return None
+
+def proxy2dict(proxy_obj):
+ def recursive_copy(source, visited):
+ # Handle circular references by tracking visited objects
+ if id(source) in visited:
+ return None # Stop processing circular references
+ visited.add(id(source)) # Mark as visited
+ if isinstance(source, dict):
+ result = {}
+ for key, value in source.items():
+ result[key] = recursive_copy(value, visited)
+ return result
+ elif isinstance(source, list):
+ return [recursive_copy(item, visited) for item in source]
+ elif isinstance(source, set):
+ return list(source)
+ elif isinstance(source, (int, float, str, bool, type(None))):
+ return source
+ elif isinstance(source, DictProxy):
+ # Explicitly handle DictProxy objects
+ return recursive_copy(dict(source), visited) # Convert DictProxy to dict
+ elif isinstance(source, ListProxy):
+ # Explicitly handle ListProxy objects
+ return [recursive_copy(item, visited) for item in source]
+ else:
+ return str(source) # Convert non-serializable types to strings
+ return recursive_copy(proxy_obj, set())
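+
+# Roundtrip sketch (not executed at import): recursive_proxy() mirrors plain
+# nested data into Manager proxies shared between processes, and proxy2dict()
+# turns it back into a plain dict (sets become lists, unknown types strings).
+def _demo_proxy_roundtrip():
+ manager = Manager()
+ shared = recursive_proxy({'progress': 0, 'chapters': [['intro']]}, manager=manager)
+ shared['progress'] = 100
+ return proxy2dict(shared) # expected: {'progress': 100, 'chapters': [['intro']]}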
+
+def convert2epub(id):
+ session = context.get_session(id)
+ if session['cancellation_requested']:
+ print('Cancel requested')
+ return False
+ try:
+ title = False
+ author = False
+ util_app = shutil.which('ebook-convert')
+ if not util_app:
+ error = "The 'ebook-convert' utility is not installed or not found."
+ print(error)
+ return False
+ file_input = session['ebook']
+ if os.path.getsize(file_input) == 0:
+ error = f"Input file is empty: {file_input}"
+ print(error)
+ return False
+ file_ext = os.path.splitext(file_input)[1].lower()
+ if file_ext not in ebook_formats:
+ error = f'Unsupported file format: {file_ext}'
+ print(error)
+ return False
+ if file_ext == '.pdf':
+ import fitz
+ msg = 'Input file is a PDF. Flattening it to Markdown...'
+ print(msg)
+ doc = fitz.open(session['ebook'])
+ pdf_metadata = doc.metadata
+ filename_no_ext = os.path.splitext(os.path.basename(session['ebook']))[0]
+ title = pdf_metadata.get('title') or filename_no_ext
+ author = pdf_metadata.get('author') or False
+ markdown_text = pymupdf4llm.to_markdown(session['ebook'])
+ # Remove single asterisks for italics (but not bold **)
+ markdown_text = re.sub(r'(?<!\*)\*(?!\*)', '', markdown_text)
+
+def get_ebook_title(epubBook, all_docs):
+ # 1. Try the EPUB Dublin Core title metadata
+ meta_title = epubBook.get_metadata('DC', 'title')
+ if meta_title and meta_title[0][0] and meta_title[0][0].strip():
+ return meta_title[0][0].strip()
+ # 2. Try <title> in the head of the first XHTML document
+ if all_docs:
+ html = all_docs[0].get_content().decode("utf-8")
+ soup = BeautifulSoup(html, "html.parser")
+ title_tag = soup.select_one("head > title")
+ if title_tag and title_tag.text.strip():
+ return title_tag.text.strip()
+ # 3. Try <img alt="..."> if no visible <title>
+ img = soup.find("img", alt=True)
+ if img:
+ alt = img['alt'].strip()
+ if alt and "cover" not in alt.lower():
+ return alt
+ return None
+
+def get_cover(epubBook, session):
+ try:
+ if session['cancellation_requested']:
+ msg = 'Cancel requested'
+ print(msg)
+ return False
+ cover_image = None
+ cover_path = os.path.join(session['process_dir'], session['filename_noext'] + '.jpg')
+ for item in epubBook.get_items_of_type(ebooklib.ITEM_COVER):
+ cover_image = item.get_content()
+ break
+ if not cover_image:
+ for item in epubBook.get_items_of_type(ebooklib.ITEM_IMAGE):
+ if 'cover' in item.file_name.lower() or 'cover' in item.get_id().lower():
+ cover_image = item.get_content()
+ break
+ if cover_image:
+ # Open the image from bytes
+ image = Image.open(io.BytesIO(cover_image))
+ # Convert to RGB if needed (JPEG doesn't support alpha)
+ if image.mode in ('RGBA', 'P'):
+ image = image.convert('RGB')
+ image.save(cover_path, format='JPEG')
+ return cover_path
+ return True
+ except Exception as e:
+ DependencyError(e)
+ return False
+
+def get_chapters(epubBook, session):
+ try:
+ msg = r'''
+*******************************************************************************
+NOTE:
+The warning "Character xx not found in the vocabulary."
+MEANS THE MODEL CANNOT INTERPRET THAT CHARACTER AND MAY GENERATE
+A HALLUCINATION (AS WELL AS MISPLACED PUNCTUATION). TO IMPROVE THIS MODEL,
+THIS CHARACTER NEEDS TO BE ADDED TO A NEW TRAINING MODEL.
+YOU CAN IMPROVE IT YOURSELF OR ASK A MODEL TRAINING EXPERT.
+*******************************************************************************
+ '''
+ print(msg)
+ if session['cancellation_requested']:
+ print('Cancel requested')
+ return False
+ # Step 1: Extract TOC (Table of Contents)
+ try:
+ toc = epubBook.toc # Extract TOC
+ toc_list = [
+ nt for item in toc if hasattr(item, 'title')
+ if (nt := normalize_text(
+ str(item.title),
+ session['language'],
+ session['language_iso1'],
+ session['tts_engine']
+ )) is not None
+ ]
+ except Exception as toc_error:
+ error = f"Error extracting TOC: {toc_error}"
+ print(error)
+ # Get spine item IDs
+ spine_ids = [item[0] for item in epubBook.spine]
+ # Filter only spine documents (i.e., reading order)
+ all_docs = [
+ item for item in epubBook.get_items_of_type(ebooklib.ITEM_DOCUMENT)
+ if item.id in spine_ids
+ ]
+ if not all_docs:
+ return [], []
+ title = get_ebook_title(epubBook, all_docs)
+ chapters = []
+ stanza_nlp = False
+ if session['language'] in year_to_decades_languages:
+ stanza.download(session['language_iso1'])
+ stanza_nlp = stanza.Pipeline(session['language_iso1'], processors='tokenize,ner')
+ is_num2words_compat = get_num2words_compat(session['language_iso1'])
+ msg = 'Analyzing numbers, math signs, dates and times to convert them into words...'
+ print(msg)
+ for doc in all_docs:
+ sentences_list = filter_chapter(doc, session['language'], session['language_iso1'], session['tts_engine'], stanza_nlp, is_num2words_compat)
+ if sentences_list is None:
+ break
+ elif len(sentences_list) > 0:
+ chapters.append(sentences_list)
+ if len(chapters) == 0:
+ error = 'No chapters found!'
+ print(error)
+ return None, None
+ return toc, chapters
+ except Exception as e:
+ error = f'Error extracting main content pages: {e}'
+ DependencyError(error)
+ return None, None
+
+def filter_chapter(doc, lang, lang_iso1, tts_engine, stanza_nlp, is_num2words_compat):
+
+ def tuple_row(node, last_text_char=None):
+ try:
+ for child in node.children:
+ if isinstance(child, NavigableString):
+ text = child.strip()
+ if text:
+ yield ("text", text)
+ last_text_char = text[-1] if text else last_text_char
+
+ elif isinstance(child, Tag):
+ name = child.name.lower()
+ if name in heading_tags:
+ title = child.get_text(strip=True)
+ if title:
+ yield ("heading", title)
+ last_text_char = title[-1] if title else last_text_char
+
+ elif name == "table":
+ yield ("table", child)
+
+ else:
+ return_data = False
+ if name in proc_tags:
+ for inner in tuple_row(child, last_text_char):
+ return_data = True
+ yield inner
+ # Track last char if this is text or heading
+ if inner[0] in ("text", "heading") and inner[1]:
+ last_text_char = inner[1][-1]
+
+ if return_data:
+ if name in break_tags:
+ # Only yield break if last char is NOT alnum or space
+ if not (last_text_char and (last_text_char.isalnum() or last_text_char.isspace())):
+ yield ("break", TTS_SML['break'])
+ elif name in heading_tags or name in pause_tags:
+ yield ("pause", TTS_SML['pause'])
+
+ else:
+ yield from tuple_row(child, last_text_char)
+
+ except Exception as e:
+ error = f'filter_chapter() tuple_row() error: {e}'
+ DependencyError(error)
+ return None
+
+ try:
+ heading_tags = [f'h{i}' for i in range(1, 5)]
+ break_tags = ['br', 'p']
+ pause_tags = ['div', 'span']
+ proc_tags = heading_tags + break_tags + pause_tags
+ raw_html = doc.get_body_content().decode("utf-8")
+ soup = BeautifulSoup(raw_html, 'html.parser')
+ body = soup.body
+ if not body or not body.get_text(strip=True):
+ return []
+ # Skip known non-chapter types
+ epub_type = body.get("epub:type", "").lower()
+ if not epub_type:
+ section_tag = soup.find("section")
+ if section_tag:
+ epub_type = section_tag.get("epub:type", "").lower()
+ excluded = {
+ "frontmatter", "backmatter", "toc", "titlepage", "colophon",
+ "acknowledgments", "dedication", "glossary", "index",
+ "appendix", "bibliography", "copyright-page", "landmark"
+ }
+ if any(part in epub_type for part in excluded):
+ return []
+ # remove scripts/styles
+ for tag in soup(["script", "style"]):
+ tag.decompose()
+ tuples_list = list(tuple_row(body))
+ if not tuples_list:
+ error = 'No tuples_list from body created!'
+ print(error)
+ return None
+ text_list = []
+ handled_tables = set()
+ prev_typ = None
+ for typ, payload in tuples_list:
+ if typ == "heading":
+ text_list.append(payload.strip())
+ elif typ == "break":
+ if prev_typ != 'break':
+ text_list.append(TTS_SML['break'])
+ elif typ == 'pause':
+ if prev_typ != 'pause':
+ text_list.append(TTS_SML['pause'])
+ elif typ == "table":
+ table = payload
+ if table in handled_tables:
+ prev_typ = typ
+ continue
+ handled_tables.add(table)
+ rows = table.find_all("tr")
+ if not rows:
+ prev_typ = typ
+ continue
+ headers = [c.get_text(strip=True) for c in rows[0].find_all(["td", "th"])]
+ for row in rows[1:]:
+ cells = [c.get_text(strip=True).replace('\xa0', ' ') for c in row.find_all("td")]
+ if not cells:
+ continue
+ if len(cells) == len(headers) and headers:
+ line = " — ".join(f"{h}: {c}" for h, c in zip(headers, cells))
+ else:
+ line = " — ".join(cells)
+ if line:
+ text_list.append(line.strip())
+ else:
+ text = payload.strip()
+ if text:
+ text_list.append(text)
+ prev_typ = typ
+ max_chars = language_mapping[lang]['max_chars'] - 4
+ clean_list = []
+ i = 0
+ while i < len(text_list):
+ current = text_list[i]
+ if current == "‡break‡":
+ if clean_list:
+ prev = clean_list[-1]
+ if prev in ("‡break‡", "‡pause‡"):
+ i += 1
+ continue
+ if prev and (prev[-1].isalnum() or prev[-1] == ' '):
+ if i + 1 < len(text_list):
+ next_sentence = text_list[i + 1]
+ merged_length = len(prev.rstrip()) + 1 + len(next_sentence.lstrip())
+ if merged_length <= max_chars:
+ # Merge with space handling
+ if not prev.endswith(" ") and not next_sentence.startswith(" "):
+ clean_list[-1] = prev + " " + next_sentence
+ else:
+ clean_list[-1] = prev + next_sentence
+ i += 2
+ continue
+ else:
+ clean_list.append(current)
+ i += 1
+ continue
+ clean_list.append(current)
+ i += 1
+ text = ' '.join(clean_list)
+ if not re.search(r"[^\W_]", text):
+ error = 'No valid text found!'
+ print(error)
+ return None
+ if stanza_nlp:
+ # Check if there are positive integers, so possible dates to convert
+ date_spans = get_date_entities(text, stanza_nlp)
+ re_ordinal = re.compile(r'(?<!\w)(\d+)(?:st|nd|rd|th)(?!\w)', re.IGNORECASE)
+ re_num = re.compile(r'\d+(?:\.\d+)?')
+ result = []
+ last_pos = 0
+ for start, end, date_text in date_spans:
+ result.append(text[last_pos:start])
+ processed = date_text
+ # 1) convert 4-digit years to their spoken form
+ processed = re.sub(r'\b\d{4}\b', lambda m: year2words(m.group(), lang, lang_iso1, is_num2words_compat), processed)
+ # 2) convert ordinals, e.g. "16th" -> "sixteenth"
+ if is_num2words_compat:
+ processed = re_ordinal.sub(
+ lambda m: num2words(int(m.group(1)), to="ordinal", lang=(lang_iso1 or "en")),
+ processed
+ )
+ else:
+ processed = re_ordinal.sub(
+ lambda m: math2words(m.group(), lang, lang_iso1, tts_engine, is_num2words_compat),
+ processed
+ )
+ # 3) convert other numbers (skip 4-digit years)
+ def _num_repl(m):
+ s = m.group(0)
+ # leave years alone (already handled above)
+ if re.fullmatch(r"\d{4}", s):
+ return s
+ n = float(s) if "." in s else int(s)
+ if is_num2words_compat:
+ return num2words(n, lang=(lang_iso1 or "en"))
+ else:
+ return math2words(m, lang, lang_iso1, tts_engine, is_num2words_compat)
+
+ processed = re_num.sub(_num_repl, processed)
+ result.append(processed)
+ last_pos = end
+ result.append(text[last_pos:])
+ text = ''.join(result)
+ else:
+ if is_num2words_compat:
+ text = re_ordinal.sub(
+ lambda m: num2words(int(m.group(1)), to="ordinal", lang=(lang_iso1 or "en")),
+ text
+ )
+ else:
+ text = re_ordinal.sub(
+ lambda m: math2words(int(m.group(1)), lang, lang_iso1, tts_engine, is_num2words_compat),
+ text
+ )
+ text = re.sub(
+ r"\b\d{4}\b",
+ lambda m: year2words(m.group(), lang, lang_iso1, is_num2words_compat),
+ text
+ )
+ text = roman2number(text)
+ text = clock2words(text, lang, lang_iso1, tts_engine, is_num2words_compat)
+ text = math2words(text, lang, lang_iso1, tts_engine, is_num2words_compat)
+ # build a translation table mapping each bad char to a space
+ specialchars_remove_table = str.maketrans({ch: ' ' for ch in specialchars_remove})
+ text = text.translate(specialchars_remove_table)
+ text = normalize_text(text, lang, lang_iso1, tts_engine)
+ # Ensure space before and after punctuation_list
+ #pattern_space = re.escape(''.join(punctuation_list))
+ #punctuation_pattern_space = r'(?<!\s)(?=[' + pattern_space + r'])'
+ return get_sentences(text, lang)
+ except Exception as e:
+ error = f'filter_chapter() error: {e}'
+ DependencyError(error)
+ return None
+
+def get_sentences(text, lang):
+
+ def join_ideogramms(tokens):
+ buffer = ''
+ try:
+ for token in tokens:
+ # 1) Yield SML tokens as-is, flushing any pending buffer first
+ if token in sml_tokens:
+ if buffer:
+ yield buffer
+ buffer = ''
+ yield token
+ continue
+ # 2) Flush the buffer before it would exceed max_chars
+ if len(buffer) + len(token) > max_chars:
+ yield buffer
+ buffer = ''
+ # 3) Append the token (word, punctuation, whatever) unless it's a sml token (already checked)
+ buffer += token
+ # 4) Flush any trailing text
+ if buffer:
+ yield buffer
+ except Exception as e:
+ DependencyError(e)
+ if buffer:
+ yield buffer
+
+ try:
+ max_chars = language_mapping[lang]['max_chars'] - 4
+ min_tokens = 5
+ # List or tuple of tokens that must never be appended to buffer
+ sml_tokens = tuple(TTS_SML.values())
+ sml_list = re.split(rf"({'|'.join(map(re.escape, sml_tokens))})", text)
+ sml_list = [s for s in sml_list if s.strip() or s in sml_tokens]
+ pattern_split = '|'.join(map(re.escape, punctuation_split_hard_set))
+ pattern = re.compile(rf"(.*?(?:{pattern_split}){''.join(punctuation_list_set)})(?=\s|$)", re.DOTALL)
+ hard_list = []
+ for s in sml_list:
+ if s in [TTS_SML['break'], TTS_SML['pause']] or len(s) <= max_chars:
+ hard_list.append(s)
+ else:
+ parts = split_inclusive(s, pattern)
+ if parts:
+ for text_part in parts:
+ text_part = text_part.strip()
+ if text_part:
+ hard_list.append(text_part)
+ else:
+ s = s.strip()
+ if s:
+ hard_list.append(s)
+ # Check if some hard_list entries exceed max_chars, so split on soft punctuation
+ pattern_split = '|'.join(map(re.escape, punctuation_split_soft_set))
+ pattern = re.compile(rf"(.*?(?:{pattern_split}))(?=\s|$)", re.DOTALL)
+ soft_list = []
+ for s in hard_list:
+ if s in [TTS_SML['break'], TTS_SML['pause']] or len(s) <= max_chars:
+ soft_list.append(s)
+ elif len(s) > max_chars:
+ parts = [p for p in split_inclusive(s, pattern) if p]
+ if parts:
+ buffer = ''
+ for idx, part in enumerate(parts):
+ # Predict length if we glue this part
+ predicted_length = len(buffer) + (1 if buffer else 0) + len(part)
+ # Peek ahead to see if gluing will exceed max_chars
+ if predicted_length <= max_chars:
+ buffer = (buffer + ' ' + part).strip() if buffer else part
+ else:
+ # If we overshoot, check if buffer ends with punctuation
+ if buffer and not any(buffer.rstrip().endswith(p) for p in punctuation_split_soft_set):
+ # Try to backtrack to last punctuation inside buffer
+ last_punct_idx = max((buffer.rfind(p) for p in punctuation_split_soft_set if p in buffer), default=-1)
+ if last_punct_idx != -1:
+ soft_list.append(buffer[:last_punct_idx+1].strip())
+ leftover = buffer[last_punct_idx+1:].strip()
+ buffer = leftover + ' ' + part if leftover else part
+ else:
+ # No punctuation, just split as-is
+ soft_list.append(buffer.strip())
+ buffer = part
+ else:
+ soft_list.append(buffer.strip())
+ buffer = part
+ if buffer:
+ cleaned = re.sub(r'[^\p{L}\p{N} ]+', '', buffer)
+ if any(ch.isalnum() for ch in cleaned):
+ soft_list.append(buffer.strip())
+ else:
+ cleaned = re.sub(r'[^\p{L}\p{N} ]+', '', s)
+ if any(ch.isalnum() for ch in cleaned):
+ soft_list.append(s.strip())
+ else:
+ cleaned = re.sub(r'[^\p{L}\p{N} ]+', '', s)
+ if any(ch.isalnum() for ch in cleaned):
+ soft_list.append(s.strip())
+
+ if lang in ['zho', 'jpn', 'kor', 'tha', 'lao', 'mya', 'khm']:
+ result = []
+ for s in soft_list:
+ if s in [TTS_SML['break'], TTS_SML['pause']]:
+ result.append(s)
+ else:
+ tokens = segment_ideogramms(s)
+ if isinstance(tokens, list):
+ result.extend([t for t in tokens if t.strip()])
+ else:
+ tokens = tokens.strip()
+ if tokens:
+ result.append(tokens)
+ return list(join_ideogramms(result))
+ else:
+ sentences = []
+ for s in soft_list:
+ if s in [TTS_SML['break'], TTS_SML['pause']] or len(s) <= max_chars:
+ sentences.append(s)
+ else:
+ words = s.split(' ')
+ text_part = words[0]
+ for w in words[1:]:
+ if len(text_part) + 1 + len(w) <= max_chars:
+ text_part += ' ' + w
+ else:
+ text_part = text_part.strip()
+ if text_part:
+ sentences.append(text_part)
+ text_part = w
+ if text_part:
+ cleaned = re.sub(r'[^\p{L}\p{N} ]+', '', text_part).strip()
+ if not any(ch.isalnum() for ch in cleaned):
+ continue
+ sentences.append(text_part)
+ return sentences
+ except Exception as e:
+ error = f'get_sentences() error: {e}'
+ print(error)
+ return None
+
+def get_ram():
+ vm = psutil.virtual_memory()
+ return vm.total // (1024 ** 3)
+
+def get_vram():
+ os_name = platform.system()
+ # NVIDIA (Cross-Platform: Windows, Linux, macOS)
+ try:
+ from pynvml import nvmlInit, nvmlDeviceGetHandleByIndex, nvmlDeviceGetMemoryInfo
+ nvmlInit()
+ handle = nvmlDeviceGetHandleByIndex(0) # First GPU
+ info = nvmlDeviceGetMemoryInfo(handle)
+ vram = info.total
+ return int(vram // (1024 ** 3)) # Convert to GB
+ except ImportError:
+ pass
+ except Exception as e:
+ pass
+ # AMD (Windows)
+ if os_name == "Windows":
+ try:
+ cmd = 'wmic path Win32_VideoController get AdapterRAM'
+ output = subprocess.run(cmd, capture_output=True, text=True, shell=True)
+ lines = output.stdout.splitlines()
+ vram_values = [int(line.strip()) for line in lines if line.strip().isdigit()]
+ if vram_values:
+ return int(vram_values[0] // (1024 ** 3))
+ except Exception as e:
+ pass
+ # AMD (Linux)
+ if os_name == "Linux":
+ try:
+ cmd = "lspci -v | grep -i 'VGA' -A 12 | grep -i 'preallocated' | awk '{print $2}'"
+ output = subprocess.run(cmd, capture_output=True, text=True, shell=True)
+ if output.stdout.strip().isdigit():
+ return int(output.stdout.strip()) // 1024
+ except Exception as e:
+ pass
+ # Intel (Linux Only)
+ intel_vram_paths = [
+ "/sys/kernel/debug/dri/0/i915_vram_total", # Intel dedicated GPUs
+ "/sys/class/drm/card0/device/resource0" # Some integrated GPUs
+ ]
+ for path in intel_vram_paths:
+ if os.path.exists(path):
+ try:
+ with open(path, "r") as f:
+ vram = int(f.read().strip()) // (1024 ** 3)
+ return vram
+ except Exception as e:
+ pass
+ # macOS (OpenGL Alternative)
+ if os_name == "Darwin":
+ try:
+ from OpenGL.GL import glGetIntegerv
+ from OpenGL.GLX import GLX_RENDERER_VIDEO_MEMORY_MB_MESA
+ vram = int(glGetIntegerv(GLX_RENDERER_VIDEO_MEMORY_MB_MESA) // 1024)
+ return vram
+ except ImportError:
+ pass
+ except Exception as e:
+ pass
+ msg = 'Could not detect GPU VRAM capacity!'
+ print(msg)
+ return 0
+
+def get_sanitized(text, replacement="_"):
+ text = text.replace('&', 'And')
+ forbidden_chars = r'[<>:"/\\|?*\x00-\x1F ()]'
+ sanitized = re.sub(r'\s+', replacement, text)
+ sanitized = re.sub(forbidden_chars, replacement, sanitized)
+ sanitized = sanitized.strip("_")
+ return sanitized
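+
+# Illustrative sketch: whitespace and filename-unsafe characters collapse to
+# the replacement character and '&' is spelled out, e.g. the call below is
+# expected to return 'War_And_Peace__Vol._1'.
+def _demo_get_sanitized():
+ return get_sanitized('War & Peace: Vol. 1')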
+
+def get_date_entities(text, stanza_nlp):
+ try:
+ doc = stanza_nlp(text)
+ date_spans = []
+ for ent in doc.ents:
+ if ent.type == 'DATE':
+ date_spans.append((ent.start_char, ent.end_char, ent.text))
+ return date_spans
+ except Exception as e:
+ error = f'get_date_entities() error: {e}'
+ print(error)
+ return False
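+
+# Illustrative sketch (English pipeline assumed): with a stanza NER pipeline
+# loaded, get_date_entities() returns (start_char, end_char, text) spans for
+# DATE entities, which the date-to-words pass in filter_chapter() rewrites.
+def _demo_get_date_entities():
+ nlp = stanza.Pipeline('en', processors='tokenize,ner') # models must be downloaded first
+ return get_date_entities('The treaty was signed on 16 June 1945.', nlp)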
+
+def get_num2words_compat(lang_iso1):
+ try:
+ test = num2words(1, lang=lang_iso1.replace('zh', 'zh_CN'))
+ return True
+ except NotImplementedError:
+ return False
+ except Exception as e:
+ return False
+
+def set_formatted_number(text: str, lang, lang_iso1: str, is_num2words_compat: bool, max_single_value: int = 999_999_999_999_999_999):
+ # match up to 18 digits, optional “,…” groups (allowing spaces or NBSP after comma), optional decimal of up to 12 digits
+ # handle optional range with dash/en dash/em dash between numbers, and allow trailing punctuation
+ number_re = re.compile(
+ r'(?<![\w.])'
+ r'(\d{1,18}(?:,[\s\u00A0]?\d{3})*(?:\.\d{1,12})?)'
+ r'([-\u2013\u2014])?'
+ r'(\d{1,18}(?:,[\s\u00A0]?\d{3})*(?:\.\d{1,12})?)?'
+ r'([.,;:!?]*)'
+ )
+ def normalize_commas(num_str) -> str:
+ """Normalize number string to standard comma format: 1,234,567"""
+ tok = num_str.replace('\u00A0', '').replace(' ', '')
+ if '.' in tok:
+ integer_part, decimal_part = tok.split('.', 1)
+ integer_part = integer_part.replace(',', '')
+ integer_part = "{:,}".format(int(integer_part))
+ return f"{integer_part}.{decimal_part}"
+ else:
+ integer_part = tok.replace(',', '')
+ return "{:,}".format(int(integer_part))
+
+ def clean_single_num(num_str):
+ tok = unicodedata.normalize('NFKC', num_str)
+ if tok.lower() in ('inf', 'infinity', 'nan'):
+ return tok
+ clean = tok.replace(',', '').replace('\u00A0', '').replace(' ', '')
+ try:
+ num = float(clean) if '.' in clean else int(clean)
+ except (ValueError, OverflowError):
+ return tok
+ if not math.isfinite(num) or abs(num) > max_single_value:
+ return tok
+
+ # Normalize commas before final output
+ tok = normalize_commas(tok)
+
+ if is_num2words_compat:
+ new_lang_iso1 = lang_iso1.replace('zh', 'zh_CN')
+ return num2words(num, lang=new_lang_iso1)
+ else:
+ phoneme_map = language_math_phonemes.get(
+ lang,
+ language_math_phonemes.get(default_language_code, language_math_phonemes['eng'])
+ )
+ return ' '.join(phoneme_map.get(ch, ch) for ch in str(num))
+
+ def clean_match(match):
+ first_num = clean_single_num(match.group(1))
+ dash_char = match.group(2) or ''
+ second_num = clean_single_num(match.group(3)) if match.group(3) else ''
+ trailing = match.group(4) or ''
+ if second_num:
+ return f"{first_num}{dash_char}{second_num}{trailing}"
+ else:
+ return f"{first_num}{trailing}"
+
+ return number_re.sub(clean_match, text)
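+
+# Usage sketch (assumed language codes 'eng'/'en'): grouped numbers are
+# normalized to a standard comma format and spelled out, and ranges such as
+# "1995-1997" keep their dash between the two spelled-out values.
+def _demo_set_formatted_number():
+ sample = 'A first print run of 1,250,000 copies.'
+ return set_formatted_number(sample, 'eng', 'en', get_num2words_compat('en'))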
+
+def year2words(year_str, lang, lang_iso1, is_num2words_compat):
+ try:
+ year = int(year_str)
+ first_two = int(year_str[:2])
+ last_two = int(year_str[2:])
+ lang_iso1 = lang_iso1 if lang in language_math_phonemes.keys() else default_language_code
+ lang_iso1 = lang_iso1.replace('zh', 'zh_CN')
+ if not year_str.isdigit() or len(year_str) != 4 or last_two < 10:
+ if is_num2words_compat:
+ return num2words(year, lang=lang_iso1)
+ else:
+ return ' '.join(language_math_phonemes[lang].get(ch, ch) for ch in year_str)
+ if is_num2words_compat:
+ return f"{num2words(first_two, lang=lang_iso1)} {num2words(last_two, lang=lang_iso1)}"
+ else:
+ return ' '.join(language_math_phonemes[lang].get(ch, ch) for ch in first_two) + ' ' + ' '.join(language_math_phonemes[lang].get(ch, ch) for ch in last_two)
+ except Exception as e:
+ error = f'year2words() error: {e}'
+ print(error)
+ raise
+ return False
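+
+# Illustrative sketch: four-digit years are split into two pairs, so '1984' is
+# read "nineteen eighty-four" rather than "one thousand nine hundred ...".
+def _demo_year2words():
+ return year2words('1984', 'eng', 'en', get_num2words_compat('en'))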
+
+def clock2words(text, lang, lang_iso1, tts_engine, is_num2words_compat):
+ time_rx = re.compile(r'(\d{1,2})[:.](\d{1,2})(?:[:.](\d{1,2}))?')
+ lang_lc = (lang or "").lower()
+ lc = language_clock.get(lang_lc) if 'language_clock' in globals() else None
+ _n2w_cache = {}
+
+ def n2w(n: int) -> str:
+ key = (n, lang_lc, is_num2words_compat)
+ if key in _n2w_cache:
+ return _n2w_cache[key]
+ if is_num2words_compat:
+ word = num2words(n, lang=(lang_iso1 or 'en'))
+ else:
+ word = math2words(n, lang, lang_iso1, tts_engine, is_num2words_compat)
+ _n2w_cache[key] = word
+ return word
+
+ def repl_num(m: re.Match) -> str:
+ # Parse hh[:mm[:ss]]
+ try:
+ h = int(m.group(1))
+ mnt = int(m.group(2))
+ sec = m.group(3)
+ sec = int(sec) if sec is not None else None
+ except Exception:
+ return m.group(0)
+ # basic validation; if out of range, keep original
+ if not (0 <= h <= 23 and 0 <= mnt <= 59 and (sec is None or 0 <= sec <= 59)):
+ return m.group(0)
+ # If no language clock rules, just say numbers plainly
+ if not lc:
+ parts = [n2w(h)]
+ if mnt != 0:
+ parts.append(n2w(mnt))
+ if sec is not None and sec > 0:
+ parts.append(n2w(sec))
+ return " ".join(parts)
+
+ next_hour = (h + 1) % 24
+ special_hours = lc.get("special_hours", {})
+ # Build main phrase
+ if mnt == 0 and (sec is None or sec == 0):
+ if h in special_hours:
+ phrase = special_hours[h]
+ else:
+ phrase = lc["oclock"].format(hour=n2w(h))
+ elif mnt == 15:
+ phrase = lc["quarter_past"].format(hour=n2w(h))
+ elif mnt == 30:
+ # German "halb drei" (= 2:30) uses next hour
+ if lang_lc == "deu":
+ phrase = lc["half_past"].format(next_hour=n2w(next_hour))
+ else:
+ phrase = lc["half_past"].format(hour=n2w(h))
+ elif mnt == 45:
+ phrase = lc["quarter_to"].format(next_hour=n2w(next_hour))
+ elif mnt < 30:
+ phrase = lc["past"].format(hour=n2w(h), minute=n2w(mnt)) if mnt != 0 else lc["oclock"].format(hour=n2w(h))
+ else:
+ minute_to_hour = 60 - mnt
+ phrase = lc["to"].format(next_hour=n2w(next_hour), minute=n2w(minute_to_hour))
+ # Append seconds if present
+ if sec is not None and sec > 0:
+ second_phrase = lc["second"].format(second=n2w(sec))
+ phrase = lc["full"].format(phrase=phrase, second_phrase=second_phrase)
+ return phrase
+
+ return time_rx.sub(repl_num, text)
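+
+# Illustrative sketch (assumed language codes): digital clock times such as
+# "14:30" become spoken phrases, using language_clock rules when they exist
+# for the language and plain spelled-out numbers otherwise.
+def _demo_clock2words():
+ sample = 'The train leaves at 14:30.'
+ return clock2words(sample, 'eng', 'en', TTS_ENGINES['XTTSv2'], get_num2words_compat('en'))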
+
+def math2words(text, lang, lang_iso1, tts_engine, is_num2words_compat):
+
+ def repl_ambiguous(match):
+ # handles "num SYMBOL num" and "SYMBOL num"
+ if match.group(2) and match.group(2) in ambiguous_replacements:
+ return f"{match.group(1)} {ambiguous_replacements[match.group(2)]} {match.group(3)}"
+ if match.group(3) and match.group(3) in ambiguous_replacements:
+ return f"{ambiguous_replacements[match.group(3)]} {match.group(4)}"
+ return match.group(0)
+
+ def _ordinal_to_words(m):
+ n = int(m.group(1))
+ if is_num2words_compat:
+ try:
+ from num2words import num2words
+ return num2words(n, to="ordinal", lang=(lang_iso1 or "en"))
+ except Exception:
+ pass
+ # If num2words isn't available/compatible, keep original token as-is.
+ return m.group(0)
+
+ # Matches any digits + optional space/NBSP + st/nd/rd/th, not glued into words.
+ re_ordinal = re.compile(r'(?<!\w)(\d+)[\s\u00A0]?(?:st|nd|rd|th)(?!\w)', re.IGNORECASE)
+ text = re_ordinal.sub(_ordinal_to_words, text)
+ return text
+
+def roman2number(text):
+ # Only convert standalone Roman numeral tokens of length >= 2
+ # This avoids: 19C, 19°C, °C, AC/DC, CD-ROM, single-letter "I"
+ roman_map = {'I': 1, 'V': 5, 'X': 10, 'L': 50, 'C': 100, 'D': 500, 'M': 1000}
+ def _roman_value(token):
+ total = 0
+ prev = 0
+ for ch in reversed(token):
+ val = roman_map[ch]
+ total = total - val if val < prev else total + val
+ prev = max(prev, val)
+ return total
+ text = re.sub(r'(?<![\w°/-])[IVXLCDM]{2,}(?![\w/-])', lambda m: str(_roman_value(m.group(0))), text)
+ return text
+
+ def replace_abbrev(match) -> str:
+ token = match.group(1)
+ for k, expansion in mapping.items():
+ if token.lower() == k.lower():
+ return expansion
+ return token # fallback
+ mapping = abbreviations_mapping[lang]
+ # Sort keys by descending length so longer ones match first
+ keys = sorted(mapping.keys(), key=len, reverse=True)
+ # Build a regex that only matches whole “words” (tokens) exactly
+ pattern = re.compile(
+ r'(?<!\w)(' + '|'.join(map(re.escape, keys)) + r')(?!\w)'
+ )
+ return pattern.sub(replace_abbrev, text)
+
+ if sentence_number in missing_sentences or sentence_number > resume_sentence or (sentence_number == 0 and resume_sentence == 0):
+ if sentence_number <= resume_sentence and sentence_number > 0:
+ msg = f'**Recovering missing file sentence {sentence_number}'
+ print(msg)
+ sentence = sentence.strip()
+ success = tts_manager.convert_sentence2audio(sentence_number, sentence) if sentence else True
+ if success:
+ total_progress = (t.n + 1) / total_iterations
+ progress_bar(total_progress)
+ is_sentence = sentence.strip() not in TTS_SML.values()
+ percentage = total_progress * 100
+ t.set_description(f'{percentage:.2f}%')
+ msg = f" | {sentence}" if is_sentence else f" | {sentence}"
+ print(msg)
+ else:
+ return False
+ if sentence.strip() not in TTS_SML.values():
+ sentence_number += 1
+ t.update(1) # advance for every iteration, including SML
+ end = sentence_number - 1 if sentence_number > 1 else sentence_number
+ msg = f"End of Block {chapter_num}"
+ print(msg)
+ if chapter_num in missing_chapters or sentence_number > resume_sentence:
+ if chapter_num <= resume_chapter:
+ msg = f'**Recovering missing file block {chapter_num}'
+ print(msg)
+ if combine_audio_sentences(chapter_audio_file, start, end, session):
+ msg = f'Combining block {chapter_num} to audio, sentence {start} to {end}'
+ print(msg)
+ else:
+ msg = 'combine_audio_sentences() failed!'
+ print(msg)
+ return False
+ return True
+ except Exception as e:
+ DependencyError(e)
+ return False
+
+def assemble_chunks(txt_file, out_file):
+ try:
+ ffmpeg_cmd = [
+ shutil.which('ffmpeg'), '-hide_banner', '-nostats', '-y',
+ '-safe', '0', '-f', 'concat', '-i', txt_file,
+ '-c:a', default_audio_proc_format, '-map_metadata', '-1', '-threads', '1', out_file
+ ]
+ process = subprocess.Popen(
+ ffmpeg_cmd,
+ env={},
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT,
+ encoding='utf-8',
+ errors='ignore'
+ )
+ for line in process.stdout:
+ print(line, end='') # Print each line of stdout
+ process.wait()
+ if process.returncode == 0:
+ return True
+ else:
+ error = process.returncode
+ print(error, ffmpeg_cmd)
+ return False
+ except subprocess.CalledProcessError as e:
+ DependencyError(e)
+ return False
+ except Exception as e:
+ error = f"assemble_chanks() Error: Failed to process {txt_file} → {out_file}: {e}"
+ print(error)
+ return False
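+
+# Usage sketch (assumed paths): assemble_chunks() consumes an ffmpeg concat
+# demuxer list, one "file 'path'" entry per audio chunk, and writes the
+# concatenated audio to out_file in default_audio_proc_format.
+def _demo_assemble_chunks():
+ txt_file = '/tmp/chunk_0000.txt'
+ out_file = f'/tmp/chunk_0000.{default_audio_proc_format}'
+ with open(txt_file, 'w') as f:
+ f.write(f"file '/tmp/sentences/0.{default_audio_proc_format}'\n")
+ f.write(f"file '/tmp/sentences/1.{default_audio_proc_format}'\n")
+ return assemble_chunks(txt_file, out_file)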
+
+def combine_audio_sentences(chapter_audio_file, start, end, session):
+ try:
+ chapter_audio_file = os.path.join(session['chapters_dir'], chapter_audio_file)
+ chapters_dir_sentences = session['chapters_dir_sentences']
+ batch_size = 1024
+ sentence_files = [
+ f for f in os.listdir(chapters_dir_sentences)
+ if f.endswith(f'.{default_audio_proc_format}')
+ ]
+ sentences_ordered = sorted(
+ sentence_files, key=lambda x: int(os.path.splitext(x)[0])
+ )
+ selected_files = [
+ os.path.join(chapters_dir_sentences, f)
+ for f in sentences_ordered
+ if start <= int(os.path.splitext(f)[0]) <= end
+ ]
+ if not selected_files:
+ print('No audio files found in the specified range.')
+ return False
+ with tempfile.TemporaryDirectory() as tmpdir:
+ chunk_list = []
+ for i in range(0, len(selected_files), batch_size):
+ batch = selected_files[i:i + batch_size]
+ txt = os.path.join(tmpdir, f'chunk_{i:04d}.txt')
+ out = os.path.join(tmpdir, f'chunk_{i:04d}.{default_audio_proc_format}')
+ with open(txt, 'w') as f:
+ for file in batch:
+ f.write(f"file '{file.replace(os.sep, '/')}'\n")
+ chunk_list.append((txt, out))
+ try:
+ with Pool(cpu_count()) as pool:
+ results = pool.starmap(assemble_chunks, chunk_list)
+ except Exception as e:
+ error = f"combine_audio_sentences() multiprocessing error: {e}"
+ print(error)
+ return False
+ if not all(results):
+ error = "combine_audio_sentences() One or more chunks failed."
+ print(error)
+ return False
+ # Final merge
+ final_list = os.path.join(tmpdir, 'sentences_final.txt')
+ with open(final_list, 'w') as f:
+ for _, chunk_path in chunk_list:
+ f.write(f"file '{chunk_path.replace(os.sep, '/')}'\n")
+ if assemble_chunks(final_list, chapter_audio_file):
+ msg = f'********* Combined block audio file saved in {chapter_audio_file}'
+ print(msg)
+ return True
+ else:
+ error = "combine_audio_sentences() Final merge failed."
+ print(error)
+ return False
+ except Exception as e:
+ DependencyError(e)
+ return False
+
+def combine_audio_chapters(id):
+
+ def get_audio_duration(filepath):
+ try:
+ ffprobe_cmd = [
+ shutil.which('ffprobe'),
+ '-v', 'error',
+ '-show_entries', 'format=duration',
+ '-of', 'json',
+ filepath
+ ]
+ result = subprocess.run(ffprobe_cmd, capture_output=True, text=True)
+ try:
+ return float(json.loads(result.stdout)['format']['duration'])
+ except Exception:
+ return 0
+ except subprocess.CalledProcessError as e:
+ DependencyError(e)
+ return 0
+ except Exception as e:
+ error = f"get_audio_duration() Error: Failed to process {txt_file} → {out_file}: {e}"
+ print(error)
+ return 0
+
+ def generate_ffmpeg_metadata(part_chapters, session, output_metadata_path, default_audio_proc_format):
+ try:
+ out_fmt = session['output_format']
+ is_mp4_like = out_fmt in ['mp4', 'm4a', 'm4b', 'mov']
+ is_vorbis = out_fmt in ['ogg', 'webm']
+ is_mp3 = out_fmt == 'mp3'
+ def tag(key):
+ return key.upper() if is_vorbis else key
+ ffmpeg_metadata = ';FFMETADATA1\n'
+ if session['metadata'].get('title'):
+ ffmpeg_metadata += f"{tag('title')}={session['metadata']['title']}\n"
+ if session['metadata'].get('creator'):
+ ffmpeg_metadata += f"{tag('artist')}={session['metadata']['creator']}\n"
+ if session['metadata'].get('language'):
+ ffmpeg_metadata += f"{tag('language')}={session['metadata']['language']}\n"
+ if session['metadata'].get('description'):
+ ffmpeg_metadata += f"{tag('description')}={session['metadata']['description']}\n"
+ if session['metadata'].get('publisher') and (is_mp4_like or is_mp3):
+ ffmpeg_metadata += f"{tag('publisher')}={session['metadata']['publisher']}\n"
+ if session['metadata'].get('published'):
+ try:
+ if '.' in session['metadata']['published']:
+ year = datetime.strptime(session['metadata']['published'], '%Y-%m-%dT%H:%M:%S.%f%z').year
+ else:
+ year = datetime.strptime(session['metadata']['published'], '%Y-%m-%dT%H:%M:%S%z').year
+ except Exception:
+ year = datetime.now().year
+ else:
+ year = datetime.now().year
+ if is_vorbis:
+ ffmpeg_metadata += f"{tag('date')}={year}\n"
+ else:
+ ffmpeg_metadata += f"{tag('year')}={year}\n"
+ if session['metadata'].get('identifiers') and isinstance(session['metadata']['identifiers'], dict):
+ if is_mp3 or is_mp4_like:
+ isbn = session['metadata']['identifiers'].get('isbn')
+ if isbn:
+ ffmpeg_metadata += f"{tag('isbn')}={isbn}\n"
+ asin = session['metadata']['identifiers'].get('mobi-asin')
+ if asin:
+ ffmpeg_metadata += f"{tag('asin')}={asin}\n"
+ start_time = 0
+ for filename, chapter_title in part_chapters:
+ filepath = os.path.join(session['chapters_dir'], filename)
+ duration_ms = len(AudioSegment.from_file(filepath, format=default_audio_proc_format))
+ clean_title = re.sub(r'(^#)|[=\\]|(-$)', lambda m: '\\' + (m.group(1) or m.group(0)), chapter_title.replace(TTS_SML['pause'], ''))
+ ffmpeg_metadata += '[CHAPTER]\nTIMEBASE=1/1000\n'
+ ffmpeg_metadata += f'START={start_time}\nEND={start_time + duration_ms}\n'
+ ffmpeg_metadata += f"{tag('title')}={clean_title}\n"
+ start_time += duration_ms
+ with open(output_metadata_path, 'w', encoding='utf-8') as f:
+ f.write(ffmpeg_metadata)
+ return output_metadata_path
+ except Exception as e:
+ error = f"generate_ffmpeg_metadata() Error: Failed to process {txt_file} → {out_file}: {e}"
+ print(error)
+ return False
+
+ def export_audio(ffmpeg_combined_audio, ffmpeg_metadata_file, ffmpeg_final_file):
+ try:
+ if session['cancellation_requested']:
+ print('Cancel requested')
+ return False
+ cover_path = None
+ ffmpeg_cmd = [shutil.which('ffmpeg'), '-hide_banner', '-nostats', '-i', ffmpeg_combined_audio]
+ if session['output_format'] == 'wav':
+ ffmpeg_cmd += ['-map', '0:a', '-ar', '44100', '-sample_fmt', 's16']
+ elif session['output_format'] == 'aac':
+ ffmpeg_cmd += ['-c:a', 'aac', '-b:a', '192k', '-ar', '44100']
+ elif session['output_format'] == 'flac':
+ ffmpeg_cmd += ['-c:a', 'flac', '-compression_level', '5', '-ar', '44100', '-sample_fmt', 's16']
+ else:
+ ffmpeg_cmd += ['-f', 'ffmetadata', '-i', ffmpeg_metadata_file, '-map', '0:a']
+ if session['output_format'] in ['m4a', 'm4b', 'mp4', 'mov']:
+ ffmpeg_cmd += ['-c:a', 'aac', '-b:a', '192k', '-ar', '44100', '-movflags', '+faststart+use_metadata_tags']
+ elif session['output_format'] == 'mp3':
+ ffmpeg_cmd += ['-c:a', 'libmp3lame', '-b:a', '192k', '-ar', '44100']
+ elif session['output_format'] == 'webm':
+ ffmpeg_cmd += ['-c:a', 'libopus', '-b:a', '192k', '-ar', '48000']
+ elif session['output_format'] == 'ogg':
+ ffmpeg_cmd += ['-c:a', 'libopus', '-compression_level', '0', '-b:a', '192k', '-ar', '48000']
+ ffmpeg_cmd += ['-map_metadata', '1']
+ ffmpeg_cmd += ['-af', 'loudnorm=I=-16:LRA=11:TP=-1.5,afftdn=nf=-70', '-strict', 'experimental', '-threads', '1', '-y', ffmpeg_final_file]
+ process = subprocess.Popen(
+ ffmpeg_cmd,
+ env={},
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT,
+ encoding='utf-8',
+ errors='ignore'
+ )
+ for line in process.stdout:
+ print(line, end='')
+ process.wait()
+ if process.returncode == 0:
+ if session['output_format'] in ['mp3', 'm4a', 'm4b', 'mp4']:
+ if session['cover'] is not None:
+ cover_path = session['cover']
+ msg = f'Adding cover {cover_path} into the final audiobook file...'
+ print(msg)
+ if session['output_format'] == 'mp3':
+ from mutagen.mp3 import MP3
+ from mutagen.id3 import ID3, APIC, error
+ audio = MP3(ffmpeg_final_file, ID3=ID3)
+ try:
+ audio.add_tags()
+ except error:
+ pass
+ with open(cover_path, 'rb') as img:
+ audio.tags.add(
+ APIC(
+ encoding=3,
+ mime='image/jpeg',
+ type=3,
+ desc='Cover',
+ data=img.read()
+ )
+ )
+ elif session['output_format'] in ['mp4', 'm4a', 'm4b']:
+ from mutagen.mp4 import MP4, MP4Cover
+ audio = MP4(ffmpeg_final_file)
+ with open(cover_path, 'rb') as f:
+ cover_data = f.read()
+ audio["covr"] = [MP4Cover(cover_data, imageformat=MP4Cover.FORMAT_JPEG)]
+ if audio:
+ audio.save()
+ final_vtt = f"{Path(ffmpeg_final_file).stem}.vtt"
+ proc_vtt_path = os.path.join(session['process_dir'], final_vtt)
+ final_vtt_path = os.path.join(session['audiobooks_dir'], final_vtt)
+ shutil.move(proc_vtt_path, final_vtt_path)
+ return True
+ else:
+ error = process.returncode
+ print(error, ffmpeg_cmd)
+ return False
+ except Exception as e:
+ DependencyError(e)
+ return False
+
+ try:
+ session = context.get_session(id)
+ chapter_files = [f for f in os.listdir(session['chapters_dir']) if f.endswith(f'.{default_audio_proc_format}')]
+ chapter_files = sorted(chapter_files, key=lambda x: int(re.search(r'\d+', x).group()))
+ chapter_titles = [c[0] for c in session['chapters']]
+ if len(chapter_files) == 0:
+ print('No block files exist!')
+ return None
+ # Calculate total duration
+ durations = []
+ for file in chapter_files:
+ filepath = os.path.join(session['chapters_dir'], file)
+ durations.append(get_audio_duration(filepath))
+ total_duration = sum(durations)
+ exported_files = []
+ if session.get('output_split'):
+ part_files = []
+ part_chapter_indices = []
+ cur_part = []
+ cur_indices = []
+ cur_duration = 0
+ max_part_duration = session['output_split_hours'] * 3600
+ needs_split = total_duration > (int(session['output_split_hours']) * 2) * 3600
+ for idx, (file, dur) in enumerate(zip(chapter_files, durations)):
+ if cur_part and (cur_duration + dur > max_part_duration):
+ part_files.append(cur_part)
+ part_chapter_indices.append(cur_indices)
+ cur_part = []
+ cur_indices = []
+ cur_duration = 0
+ cur_part.append(file)
+ cur_indices.append(idx)
+ cur_duration += dur
+ if cur_part:
+ part_files.append(cur_part)
+ part_chapter_indices.append(cur_indices)
+
+ for part_idx, (part_file_list, indices) in enumerate(zip(part_files, part_chapter_indices)):
+ with tempfile.TemporaryDirectory() as tmpdir:
+ batch_size = 1024
+ chunk_list = []
+ for i in range(0, len(part_file_list), batch_size):
+ batch = part_file_list[i:i + batch_size]
+ txt = os.path.join(tmpdir, f'chunk_{i:04d}.txt')
+ out = os.path.join(tmpdir, f'chunk_{i:04d}.{default_audio_proc_format}')
+ with open(txt, 'w') as f:
+ for file in batch:
+ path = os.path.join(session['chapters_dir'], file).replace("\\", "/")
+ f.write(f"file '{path}'\n")
+ chunk_list.append((txt, out))
+ with Pool(cpu_count()) as pool:
+ results = pool.starmap(assemble_chunks, chunk_list)
+ if not all(results):
+ print(f"assemble_segments() One or more chunks failed for part {part_idx+1}.")
+ return None
+ # Final merge for this part
+ combined_chapters_file = os.path.join(
+ session['process_dir'],
+ f"{get_sanitized(session['metadata']['title'])}_part{part_idx+1}.{default_audio_proc_format}" if needs_split else f"{get_sanitized(session['metadata']['title'])}.{default_audio_proc_format}"
+ )
+ final_list = os.path.join(tmpdir, f'part_{part_idx+1:02d}_final.txt')
+ with open(final_list, 'w') as f:
+ for _, chunk_path in chunk_list:
+ f.write(f"file '{chunk_path.replace(os.sep, '/')}'\n")
+ if not assemble_chunks(final_list, combined_chapters_file):
+ print(f"assemble_segments() Final merge failed for part {part_idx+1}.")
+ return None
+
+ metadata_file = os.path.join(session['process_dir'], f'metadata_part{part_idx+1}.txt')
+ part_chapters = [(chapter_files[i], chapter_titles[i]) for i in indices]
+ generate_ffmpeg_metadata(part_chapters, session, metadata_file, default_audio_proc_format)
+
+ final_file = os.path.join(
+ session['audiobooks_dir'],
+ f"{session['final_name'].rsplit('.', 1)[0]}_part{part_idx+1}.{session['output_format']}" if needs_split else session['final_name']
+ )
+ if export_audio(combined_chapters_file, metadata_file, final_file):
+ exported_files.append(final_file)
+ else:
+ with tempfile.TemporaryDirectory() as tmpdir:
+ # 1) build a single ffmpeg file list
+ txt = os.path.join(tmpdir, 'all_chapters.txt')
+ merged_tmp = os.path.join(tmpdir, f'all.{default_audio_proc_format}')
+ with open(txt, 'w') as f:
+ for file in chapter_files:
+ path = os.path.join(session['chapters_dir'], file).replace("\\", "/")
+ f.write(f"file '{path}'\n")
+
+ # 2) merge into one temp file
+ if not assemble_chunks(txt, merged_tmp):
+ print("assemble_segments() Final merge failed.")
+ return None
+
+ # 3) generate metadata for entire book
+ metadata_file = os.path.join(session['process_dir'], 'metadata.txt')
+ all_chapters = list(zip(chapter_files, chapter_titles))
+ generate_ffmpeg_metadata(all_chapters, session, metadata_file, default_audio_proc_format)
+
+ # 4) export in one go
+ final_file = os.path.join(
+ session['audiobooks_dir'],
+ session['final_name']
+ )
+ if export_audio(merged_tmp, metadata_file, final_file):
+ exported_files.append(final_file)
+ return exported_files if exported_files else None
+ except Exception as e:
+ DependencyError(e)
+ return False
+
+def delete_unused_tmp_dirs(web_dir, days, session):
+ dir_array = [
+ tmp_dir,
+ web_dir,
+ os.path.join(models_dir, '__sessions'),
+ os.path.join(voices_dir, '__sessions')
+ ]
+ current_user_dirs = {
+ f"proc-{session['id']}",
+ f"web-{session['id']}",
+ f"voice-{session['id']}",
+ f"model-{session['id']}"
+ }
+ current_time = time.time()
+ threshold_time = current_time - (days * 24 * 60 * 60) # Convert days to seconds
+ for dir_path in dir_array:
+ if os.path.exists(dir_path) and os.path.isdir(dir_path):
+ for dir in os.listdir(dir_path):
+ if dir in current_user_dirs:
+ full_dir_path = os.path.join(dir_path, dir)
+ if os.path.isdir(full_dir_path):
+ try:
+ dir_mtime = os.path.getmtime(full_dir_path)
+ dir_ctime = os.path.getctime(full_dir_path)
+ if dir_mtime < threshold_time and dir_ctime < threshold_time:
+ shutil.rmtree(full_dir_path, ignore_errors=True)
+ msg = f"Deleted expired session: {full_dir_path}"
+ print(msg)
+ except Exception as e:
+ error = f"Error deleting {full_dir_path}: {e}"
+ print(error)
+
+def compare_file_metadata(f1, f2):
+ if os.path.getsize(f1) != os.path.getsize(f2):
+ return False
+ if os.path.getmtime(f1) != os.path.getmtime(f2):
+ return False
+ return True
+
+def get_compatible_tts_engines(language):
+ compatible_engines = [
+ tts for tts in models.keys()
+ if language in language_tts.get(tts, {})
+ ]
+ return compatible_engines
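+
+# Illustrative sketch (assumed language code): pick the first engine that
+# advertises support for the requested language, mirroring what convert_ebook()
+# does when args['tts_engine'] is None.
+def _demo_pick_engine(language='eng'):
+ engines = get_compatible_tts_engines(language)
+ return engines[0] if engines else None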
+
+def convert_ebook_batch(args, ctx=None):
+ if isinstance(args['ebook_list'], list):
+ ebook_list = args['ebook_list'][:]
+ for file in ebook_list: # Use a shallow copy
+ if any(file.endswith(ext) for ext in ebook_formats):
+ args['ebook'] = file
+ print(f'Processing eBook file: {os.path.basename(file)}')
+ progress_status, passed = convert_ebook(args, ctx)
+ if passed is False:
+ print(f'Conversion failed: {progress_status}')
+ sys.exit(1)
+ args['ebook_list'].remove(file)
+ reset_ebook_session(args['session'])
+ return progress_status, passed
+ else:
+ print('The ebook source is not a list!')
+ sys.exit(1)
+
+def convert_ebook(args, ctx=None):
+ try:
+ global is_gui_process, context
+ error = None
+ id = None
+ info_session = None
+ if args['language'] is not None:
+ if not os.path.splitext(args['ebook'])[1]:
+ error = f"{args['ebook']} needs a format extension."
+ print(error)
+ return error, False
+ if not os.path.exists(args['ebook']):
+ error = 'File does not exist or directory is empty.'
+ print(error)
+ return error, False
+ try:
+ if len(args['language']) == 2:
+ lang_array = languages.get(part1=args['language'])
+ if lang_array:
+ args['language'] = lang_array.part3
+ args['language_iso1'] = lang_array.part1
+ elif len(args['language']) == 3:
+ lang_array = languages.get(part3=args['language'])
+ if lang_array:
+ args['language'] = lang_array.part3
+ args['language_iso1'] = lang_array.part1
+ else:
+ args['language_iso1'] = None
+ except Exception as e:
+ pass
+
+ if args['language'] not in language_mapping.keys():
+ error = 'The language you provided is not (yet) supported'
+ print(error)
+ return error, False
+
+ if ctx is not None:
+ context = ctx
+
+ is_gui_process = args['is_gui_process']
+ id = args['session'] if args['session'] is not None else str(uuid.uuid4())
+
+ session = context.get_session(id)
+ session['script_mode'] = args['script_mode'] if args['script_mode'] is not None else NATIVE
+ session['ebook'] = args['ebook']
+ session['ebook_list'] = args['ebook_list']
+ session['device'] = args['device']
+ session['language'] = args['language']
+ session['language_iso1'] = args['language_iso1']
+ session['tts_engine'] = args['tts_engine'] if args['tts_engine'] is not None else get_compatible_tts_engines(args['language'])[0]
+ session['custom_model'] = args['custom_model'] if not is_gui_process or args['custom_model'] is None else os.path.join(session['custom_model_dir'], args['custom_model'])
+ session['fine_tuned'] = args['fine_tuned']
+ session['voice'] = args['voice']
+ session['temperature'] = args['temperature']
+ session['length_penalty'] = args['length_penalty']
+ session['num_beams'] = args['num_beams']
+ session['repetition_penalty'] = args['repetition_penalty']
+ session['top_k'] = args['top_k']
+ session['top_p'] = args['top_p']
+ session['speed'] = args['speed']
+ session['enable_text_splitting'] = args['enable_text_splitting']
+ session['text_temp'] = args['text_temp']
+ session['waveform_temp'] = args['waveform_temp']
+ session['audiobooks_dir'] = args['audiobooks_dir']
+ session['output_format'] = args['output_format']
+ session['output_split'] = args['output_split']
+ session['output_split_hours'] = args['output_split_hours'] if args['output_split_hours'] is not None else default_output_split_hours
+
+ info_session = f"\n*********** Session: {id} **************\nStore it in case of interruption, crash, reuse of custom model or custom voice,\nyou can resume the conversion with --session option"
+
+ if not is_gui_process:
+ session['voice_dir'] = os.path.join(voices_dir, '__sessions', f"voice-{session['id']}", session['language'])
+ os.makedirs(session['voice_dir'], exist_ok=True)
+ # Uploaded voice files now live in their respective language folder, so move any .wav files or legacy 'bark' folder left at the voice_dir root by previous versions
+ voice_root = os.path.dirname(session['voice_dir'])
+ legacy_items = glob(os.path.join(voice_root, '*.wav'))
+ if os.path.isdir(os.path.join(voice_root, 'bark')) and not os.path.exists(os.path.join(session['voice_dir'], 'bark')):
+ legacy_items.append(os.path.join(voice_root, 'bark'))
+ for src in legacy_items:
+ shutil.move(src, os.path.join(session['voice_dir'], os.path.basename(src)))
+ session['custom_model_dir'] = os.path.join(models_dir, '__sessions',f"model-{session['id']}")
+ if session['custom_model'] is not None:
+ if not os.path.exists(session['custom_model_dir']):
+ os.makedirs(session['custom_model_dir'], exist_ok=True)
+ src_path = Path(session['custom_model'])
+ src_name = src_path.stem
+ if not os.path.exists(os.path.join(session['custom_model_dir'], src_name)):
+ required_files = models[session['tts_engine']]['internal']['files']
+ if analyze_uploaded_file(session['custom_model'], required_files):
+ model = extract_custom_model(session['custom_model'], session)
+ if model is not None:
+ session['custom_model'] = model
+ else:
+ error = f"{os.path.basename(session['custom_model'])} could not be extracted or mandatory files are missing"
+ else:
+ error = f"{os.path.basename(session['custom_model'])} is not a valid model or some required files are missing"
+ if session['voice'] is not None:
+ voice_name = get_sanitized(os.path.splitext(os.path.basename(session['voice']))[0])
+ final_voice_file = os.path.join(session['voice_dir'], f'{voice_name}.wav')
+ if not os.path.exists(final_voice_file):
+ extractor = VoiceExtractor(session, session['voice'], voice_name)
+ status, msg = extractor.extract_voice()
+ if status:
+ session['voice'] = final_voice_file
+ else:
+ error = f'VoiceExtractor.extract_voice() failed! {msg}'
+ print(error)
+ if error is None:
+ if session['script_mode'] == NATIVE:
+ ok, e = check_programs('Calibre', 'ebook-convert', '--version')
+ if not ok:
+ error = f'check_programs() Calibre failed: {e}'
+ ok, e = check_programs('FFmpeg', 'ffmpeg', '-version')
+ if not ok:
+ error = f'check_programs() FFmpeg failed: {e}'
+ if error is None:
+ old_session_dir = os.path.join(tmp_dir, f"ebook-{session['id']}")
+ session['session_dir'] = os.path.join(tmp_dir, f"proc-{session['id']}")
+ if os.path.isdir(old_session_dir):
+ os.rename(old_session_dir, session['session_dir'])
+ session['process_dir'] = os.path.join(session['session_dir'], f"{hashlib.md5(session['ebook'].encode()).hexdigest()}")
+ session['chapters_dir'] = os.path.join(session['process_dir'], "chapters")
+ session['chapters_dir_sentences'] = os.path.join(session['chapters_dir'], 'sentences')
+ if prepare_dirs(args['ebook'], session):
+ session['filename_noext'] = os.path.splitext(os.path.basename(session['ebook']))[0]
+ msg = ''
+ msg_extra = ''
+ vram_avail = get_vram()
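+ # Low VRAM fallback: with 4GB or less (0 means detection failed) switch BARK to its small models via SUNO_USE_SMALL_MODELS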
+ if vram_avail <= 4:
+ msg_extra += 'VRAM capacity could not be detected - ' if vram_avail == 0 else 'VRAM under 4GB - '
+ if session['tts_engine'] == TTS_ENGINES['BARK']:
+ os.environ['SUNO_USE_SMALL_MODELS'] = 'True'
+ msg_extra += f"Switching BARK to SMALL models - "
+ else:
+ if session['tts_engine'] == TTS_ENGINES['BARK']:
+ os.environ['SUNO_USE_SMALL_MODELS'] = 'False'
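+ # Fall back to CPU when the requested accelerator (cuda or mps) is not available to torch, and warn with a link to the GPU wiki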
+ if session['device'] == 'cuda':
+ session['device'] = session['device'] if torch.cuda.is_available() else 'cpu'
+ if session['device'] == 'cpu':
+ msg += f"GPU not recognized by torch! Read {default_gpu_wiki} - Switching to CPU - "
+ elif session['device'] == 'mps':
+ session['device'] = session['device'] if torch.backends.mps.is_available() else 'cpu'
+ if session['device'] == 'cpu':
+ msg += f"MPS not recognized by torch! Read {default_gpu_wiki} - Switching to CPU - "
+ if session['device'] == 'cpu':
+ if session['tts_engine'] == TTS_ENGINES['BARK']:
+ os.environ['SUNO_OFFLOAD_CPU'] = 'True'
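+ # Probe the optional deepspeed package; if the import fails, disable use_deepspeed for XTTSv2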
+ if default_engine_settings[TTS_ENGINES['XTTSv2']]['use_deepspeed']:
+ try:
+ import deepspeed
+ except Exception:
+ default_engine_settings[TTS_ENGINES['XTTSv2']]['use_deepspeed'] = False
+ msg_extra += 'deepspeed not installed or package is broken, setting use_deepspeed to False - '
+ else:
+ msg_extra += 'deepspeed detected and ready!'
+ if msg == '':
+ msg = f"Using {session['device'].upper()} - "
+ msg += msg_extra
+ if is_gui_process:
+ show_alert({"type": "warning", "msg": msg})
+ print(msg)
+ session['epub_path'] = os.path.join(session['process_dir'], '__' + session['filename_noext'] + '.epub')
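+ # Convert the source ebook to EPUB, then read its DC metadata, cover, TOC and chapters before starting the audio conversion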
+ if convert2epub(id):
+ epubBook = epub.read_epub(session['epub_path'], {'ignore_ncx': True})
+ metadata = dict(session['metadata'])
+ for key, value in metadata.items():
+ data = epubBook.get_metadata('DC', key)
+ if data:
+ for value, attributes in data:
+ metadata[key] = value
+ metadata['language'] = session['language']
+ metadata['title'] = metadata['title'] or Path(session['ebook']).stem.replace('_',' ')
+ metadata['creator'] = False if not metadata['creator'] or metadata['creator'] == 'Unknown' else metadata['creator']
+ session['metadata'] = metadata
+ try:
+ if len(session['metadata']['language']) == 2:
+ lang_array = languages.get(part1=session['language'])
+ if lang_array:
+ session['metadata']['language'] = lang_array.part3
+ except Exception as e:
+ pass
+ if session['metadata']['language'] != session['language']:
+ error = f"WARNING!!! language selected {session['language']} differs from the EPUB file language {session['metadata']['language']}"
+ print(error)
+ session['cover'] = get_cover(epubBook, session)
+ if session['cover']:
+ session['toc'], session['chapters'] = get_chapters(epubBook, session)
+ session['final_name'] = get_sanitized(session['metadata']['title'] + '.' + session['output_format'])
+ if session['chapters'] is not None:
+ if convert_chapters2audio(id):
+ msg = 'Conversion successful. Combining sentences and chapters...'
+ show_alert({"type": "info", "msg": msg})
+ exported_files = combine_audio_chapters(id)
+ if exported_files is not None:
+ chapters_dirs = [
+ dir_name for dir_name in os.listdir(session['process_dir'])
+ if fnmatch.fnmatch(dir_name, "chapters_*") and os.path.isdir(os.path.join(session['process_dir'], dir_name))
+ ]
+ shutil.rmtree(os.path.join(session['voice_dir'], 'proc'), ignore_errors=True)
+ if is_gui_process:
+ if len(chapters_dirs) > 1:
+ if os.path.exists(session['chapters_dir']):
+ shutil.rmtree(session['chapters_dir'], ignore_errors=True)
+ if os.path.exists(session['epub_path']):
+ os.remove(session['epub_path'])
+ if os.path.exists(session['cover']):
+ os.remove(session['cover'])
+ else:
+ if os.path.exists(session['process_dir']):
+ shutil.rmtree(session['process_dir'], ignore_errors=True)
+ else:
+ if os.path.exists(session['voice_dir']):
+ if not any(os.scandir(session['voice_dir'])):
+ shutil.rmtree(session['voice_dir'], ignore_errors=True)
+ if os.path.exists(session['custom_model_dir']):
+ if not any(os.scandir(session['custom_model_dir'])):
+ shutil.rmtree(session['custom_model_dir'], ignore_errors=True)
+ if os.path.exists(session['session_dir']):
+ shutil.rmtree(session['session_dir'], ignore_errors=True)
+ progress_status = f'Audiobook(s) {", ".join(os.path.basename(f) for f in exported_files)} created!'
+ session['audiobook'] = exported_files[-1]
+ print(info_session)
+ return progress_status, True
+ else:
+ error = 'combine_audio_chapters() error: exported_files not created!'
+ else:
+ error = 'convert_chapters2audio() failed!'
+ else:
+ error = 'get_chapters() failed!'
+ else:
+ error = 'get_cover() failed!'
+ else:
+ error = 'convert2epub() failed!'
+ else:
+ error = f"Temporary directory {session['process_dir']} not removed due to failure."
+ else:
+ error = f"Language {args['language']} is not supported."
+ if session['cancellation_requested']:
+ error = 'Cancelled'
+ else:
+ if not is_gui_process and id is not None:
+ error += info_session
+ print(error)
+ return error, False
+ except Exception as e:
+ print(f'convert_ebook() Exception: {e}')
+ return e, False
+
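+# Recursively copy saved values into the live session proxy; only keys already present in the session are restored, nested dicts are merged in place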
+def restore_session_from_data(data, session):
+ try:
+ for key, value in data.items():
+ if key in session: # Check if the key exists in session
+ if isinstance(value, dict) and isinstance(session[key], dict):
+ restore_session_from_data(value, session[key])
+ else:
+ session[key] = value
+ except Exception as e:
+ DependencyError(e)
+
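+# Reset all per-ebook fields of a session (paths, chapters, cover, metadata, progress) so a new conversion starts from a clean state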
+def reset_ebook_session(id):
+ session = context.get_session(id)
+ data = {
+ "ebook": None,
+ "chapters_dir": None,
+ "chapters_dir_sentences": None,
+ "epub_path": None,
+ "filename_noext": None,
+ "chapters": None,
+ "cover": None,
+ "status": None,
+ "progress": 0,
+ "duration": 0,
+ "playback_time": 0,
+ "cancellation_requested": False,
+ "event": None,
+ "metadata": {
+ "title": None,
+ "creator": None,
+ "contributor": None,
+ "language": None,
+ "identifier": None,
+ "publisher": None,
+ "date": None,
+ "description": None,
+ "subject": None,
+ "rights": None,
+ "format": None,
+ "type": None,
+ "coverage": None,
+ "relation": None,
+ "Source": None,
+ "Modified": None
+ }
+ }
+ restore_session_from_data(data, session)
+
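+# Return every IPv4 and IPv6 address bound to the host network interfaces, e.g. ['127.0.0.1', '192.168.1.10', '::1']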
+def get_all_ip_addresses():
+ ip_addresses = []
+ for interface, addresses in psutil.net_if_addrs().items():
+ for address in addresses:
+ if address.family == socket.AF_INET:
+ ip_addresses.append(address.address)
+ elif address.family == socket.AF_INET6:
+ ip_addresses.append(address.address)
+ return ip_addresses
+
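+# Forward an alert state dict to the matching Gradio notification, e.g. show_alert({'type': 'info', 'msg': 'Conversion successful!'})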
+def show_alert(state):
+ if isinstance(state, dict):
+ if state['type'] is not None:
+ if state['type'] == 'error':
+ gr.Error(state['msg'])
+ elif state['type'] == 'warning':
+ gr.Warning(state['msg'])
+ elif state['type'] == 'info':
+ gr.Info(state['msg'])
+ elif state['type'] == 'success':
+ gr.Success(state['msg'])
+
+def web_interface(args, ctx):
+ global context, is_gui_process
+ context = ctx
+ script_mode = args['script_mode']
+ is_gui_process = args['is_gui_process']
+ is_gui_shared = args['share']
+ title = 'Ebook2Audiobook'
+ glass_mask_msg = 'Initialization, please wait...'
+ ebook_src = None
+ language_options = [
+ (
+ f"{details['name']} - {details['native_name']}" if details['name'] != details['native_name'] else details['name'],
+ lang
+ )
+ for lang, details in language_mapping.items()
+ ]
+ voice_options = []
+ tts_engine_options = []
+ custom_model_options = []
+ fine_tuned_options = []
+ audiobook_options = []
+ options_output_split_hours = ['1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11', '12']
+
+ src_label_file = 'Select a File'
+ src_label_dir = 'Select a Directory'
+
+ visible_gr_tab_xtts_params = interface_component_options['gr_tab_xtts_params']
+ visible_gr_tab_bark_params = interface_component_options['gr_tab_bark_params']
+ visible_gr_group_custom_model = interface_component_options['gr_group_custom_model']
+ visible_gr_group_voice_file = interface_component_options['gr_group_voice_file']
+
+ theme = gr.themes.Origin(
+ primary_hue='green',
+ secondary_hue='amber',
+ neutral_hue='gray',
+ radius_size='lg',
+ font_mono=['JetBrains Mono', 'monospace', 'Consolas', 'Menlo', 'Liberation Mono']
+ )
+
+ header_css = '''
+
+ '''
+
+ with gr.Blocks(theme=theme, title=title, css=header_css, delete_cache=(86400, 86400)) as app:
+ with gr.Tabs(elem_id='gr_tabs'):
+ gr_tab_main = gr.TabItem('Main Parameters', elem_id='gr_tab_main', elem_classes='tab_item')
+ with gr_tab_main:
+ with gr.Row(elem_id='gr_row_tab_main'):
+ with gr.Column(elem_id='gr_col_1', scale=3):
+ with gr.Group(elem_id='gr1'):
+ gr_ebook_file = gr.File(label=src_label_file, elem_id='gr_ebook_file', file_types=ebook_formats, file_count='single', allow_reordering=True, height=140)
+ gr_ebook_mode = gr.Radio(label='', elem_id='gr_ebook_mode', choices=[('File','single'), ('Directory','directory')], value='single', interactive=True)
+ with gr.Group(elem_id='gr_group_language'):
+ gr_language = gr.Dropdown(label='Language', elem_id='gr_language', choices=language_options, value=default_language_code, type='value', interactive=True)
+ gr_group_voice_file = gr.Group(elem_id='gr_group_voice_file', visible=visible_gr_group_voice_file)
+ with gr_group_voice_file:
+ gr_voice_file = gr.File(label='*Cloning Voice Audio File', elem_id='gr_voice_file', file_types=voice_formats, value=None, height=140)
+ gr_row_voice_player = gr.Row(elem_id='gr_row_voice_player')
+ with gr_row_voice_player:
+ gr_voice_player = gr.Audio(elem_id='gr_voice_player', type='filepath', interactive=False, show_download_button=False, container=False, visible=False, show_share_button=False, show_label=False, waveform_options=gr.WaveformOptions(show_controls=False), scale=0, min_width=60)
+ gr_voice_list = gr.Dropdown(label='', elem_id='gr_voice_list', choices=voice_options, type='value', interactive=True, scale=2)
+ gr_voice_del_btn = gr.Button('🗑', elem_id='gr_voice_del_btn', elem_classes=['small-btn'], variant='secondary', interactive=True, visible=False, scale=0, min_width=60)
+ gr_optional_markdown = gr.Markdown(elem_id='gr_markdown_optional', value=' * Optional')
+ with gr.Group(elem_id='gr_group_device'):
+ gr_device = gr.Dropdown(label='Processor Unit', elem_id='gr_device', choices=[('CPU','cpu'), ('GPU','cuda'), ('MPS','mps')], type='value', value=default_device, interactive=True)
+ gr_logo_markdown = gr.Markdown(elem_id='gr_logo_markdown', value=f'''
+
+ '''
+ )
+ with gr.Column(elem_id='gr_col_2', scale=3):
+ with gr.Group(elem_id='gr_group_engine'):
+ gr_tts_engine_list = gr.Dropdown(label='TTS Engine', elem_id='gr_tts_engine_list', choices=tts_engine_options, type='value', interactive=True)
+ gr_tts_rating = gr.HTML()
+ gr_fine_tuned_list = gr.Dropdown(label='Fine Tuned Models (Presets)', elem_id='gr_fine_tuned_list', choices=fine_tuned_options, type='value', interactive=True)
+ gr_group_custom_model = gr.Group(visible=visible_gr_group_custom_model)
+ with gr_group_custom_model:
+ gr_custom_model_file = gr.File(label=f"Upload Fine Tuned Model", elem_id='gr_custom_model_file', value=None, file_types=['.zip'], height=140)
+ with gr.Row(elem_id='gr_row_custom_model'):
+ gr_custom_model_list = gr.Dropdown(label='', elem_id='gr_custom_model_list', choices=custom_model_options, type='value', interactive=True, scale=2)
+ gr_custom_model_del_btn = gr.Button('🗑', elem_id='gr_custom_model_del_btn', elem_classes=['small-btn'], variant='secondary', interactive=True, visible=False, scale=0, min_width=60)
+ gr_custom_model_markdown = gr.Markdown(elem_id='gr_markdown_custom_model', value=' * Optional')
+ with gr.Group(elem_id='gr_group_output_format'):
+ with gr.Row(elem_id='gr_row_output_format'):
+ gr_output_format_list = gr.Dropdown(label='Output Format', elem_id='gr_output_format_list', choices=output_formats, type='value', value=default_output_format, interactive=True, scale=2)
+ gr_output_split = gr.Checkbox(label='Split Output File', elem_id='gr_output_split', value=default_output_split, interactive=True, scale=1)
+ gr_output_split_hours = gr.Dropdown(label='Max hours / part', elem_id='gr_output_split_hours', choices=options_output_split_hours, type='value', value=default_output_split_hours, interactive=True, visible=False, scale=2)
+ gr_session = gr.Textbox(label='Session', elem_id='gr_session', interactive=False)
+ gr_tab_xtts_params = gr.TabItem('XTTSv2 Fine Tuned Parameters', elem_id='gr_tab_xtts_params', elem_classes='tab_item', visible=visible_gr_tab_xtts_params)
+ with gr_tab_xtts_params:
+ gr.Markdown(
+ elem_id='gr_markdown_tab_xtts_params',
+ value='''
+ ### Customize XTTSv2 Parameters
+ Adjust the settings below to influence how the audio is generated. You can control the creativity, speed, repetition, and more.
+ '''
+ )
+ gr_xtts_temperature = gr.Slider(
+ label='Temperature',
+ minimum=0.05,
+ maximum=10.0,
+ step=0.05,
+ value=float(default_engine_settings[TTS_ENGINES['XTTSv2']]['temperature']),
+ elem_id='gr_xtts_temperature',
+ info='Higher values lead to more creative, unpredictable outputs. Lower values make it more monotone.'
+ )
+ gr_xtts_length_penalty = gr.Slider(
+ label='Length Penalty',
+ minimum=0.3,
+ maximum=5.0,
+ step=0.1,
+ value=float(default_engine_settings[TTS_ENGINES['XTTSv2']]['length_penalty']),
+ elem_id='gr_xtts_length_penalty',
+ info='Adjusts how much longer sequences are preferred. Higher values encourage the model to produce longer and more natural speech.',
+ visible=False
+ )
+ gr_xtts_num_beams = gr.Slider(
+ label='Number Beams',
+ minimum=1,
+ maximum=10,
+ step=1,
+ value=int(default_engine_settings[TTS_ENGINES['XTTSv2']]['num_beams']),
+ elem_id='gr_xtts_num_beams',
+ info='Controls how many alternative sequences the model explores. Higher values improve speech coherence and pronunciation but increase inference time.',
+ visible=False
+ )
+ gr_xtts_repetition_penalty = gr.Slider(
+ label='Repetition Penalty',
+ minimum=1.0,
+ maximum=10.0,
+ step=0.1,
+ value=float(default_engine_settings[TTS_ENGINES['XTTSv2']]['repetition_penalty']),
+ elem_id='gr_xtts_repetition_penalty',
+ info='Penalizes repeated phrases. Higher values reduce repetition.'
+ )
+ gr_xtts_top_k = gr.Slider(
+ label='Top-k Sampling',
+ minimum=10,
+ maximum=100,
+ step=1,
+ value=int(default_engine_settings[TTS_ENGINES['XTTSv2']]['top_k']),
+ elem_id='gr_xtts_top_k',
+ info='Lower values restrict outputs to more likely words and increase speed at which audio generates.'
+ )
+ gr_xtts_top_p = gr.Slider(
+ label='Top-p Sampling',
+ minimum=0.1,
+ maximum=1.0,
+ step=0.01,
+ value=float(default_engine_settings[TTS_ENGINES['XTTSv2']]['top_p']),
+ elem_id='gr_xtts_top_p',
+ info='Controls cumulative probability for word selection. Lower values make the output more predictable and increase speed at which audio generates.'
+ )
+ gr_xtts_speed = gr.Slider(
+ label='Speed',
+ minimum=0.5,
+ maximum=3.0,
+ step=0.1,
+ value=float(default_engine_settings[TTS_ENGINES['XTTSv2']]['speed']),
+ elem_id='gr_xtts_speed',
+ info='Adjusts how fast the narrator will speak.'
+ )
+ gr_xtts_enable_text_splitting = gr.Checkbox(
+ label='Enable Text Splitting',
+ value=default_engine_settings[TTS_ENGINES['XTTSv2']]['enable_text_splitting'],
+ elem_id='gr_xtts_enable_text_splitting',
+ info='Coqui-tts builtin text splitting. Can help against hallucinations but can also make them worse.',
+ visible=False
+ )
+ gr_tab_bark_params = gr.TabItem('BARK Fine Tuned Parameters', elem_id='gr_tab_bark_params', elem_classes='tab_item', visible=visible_gr_tab_bark_params)
+ with gr_tab_bark_params:
+ gr.Markdown(
+ elem_id='gr_markdown_tab_bark_params',
+ value='''
+ ### Customize BARK Parameters
+ Adjust the settings below to influence how the audio is generated: emotional and voice behavior can be more random or more conservative.
+ '''
+ )
+ gr_bark_text_temp = gr.Slider(
+ label='Text Temperature',
+ minimum=0.0,
+ maximum=1.0,
+ step=0.01,
+ value=float(default_engine_settings[TTS_ENGINES['BARK']]['text_temp']),
+ elem_id='gr_bark_text_temp',
+ info='Higher values lead to more creative, unpredictable outputs. Lower values make it more conservative.'
+ )
+ gr_bark_waveform_temp = gr.Slider(
+ label='Waveform Temperature',
+ minimum=0.0,
+ maximum=1.0,
+ step=0.01,
+ value=float(default_engine_settings[TTS_ENGINES['BARK']]['waveform_temp']),
+ elem_id='gr_bark_waveform_temp',
+ info='Higher values lead to more creative, unpredictable outputs. Lower values make it more conservative.'
+ )
+ gr_state_update = gr.State(value={"hash": None})
+ gr_read_data = gr.JSON(visible=False, elem_id='gr_read_data')
+ gr_write_data = gr.JSON(visible=False, elem_id='gr_write_data')
+ gr_tab_progress = gr.Textbox(elem_id='gr_tab_progress', label='Progress', interactive=False)
+ gr_group_audiobook_list = gr.Group(elem_id='gr_group_audiobook_list', visible=False)
+ with gr_group_audiobook_list:
+ gr_audiobook_vtt = gr.Textbox(elem_id='gr_audiobook_vtt', label='', interactive=False, visible=False)
+ gr_audiobook_sentence = gr.Textbox(elem_id='gr_audiobook_sentence', label='Audiobook', value='...', interactive=False, visible=True, lines=3, max_lines=3)
+ gr_audiobook_player = gr.Audio(elem_id='gr_audiobook_player', label='',type='filepath', autoplay=False, waveform_options=gr.WaveformOptions(show_recording_waveform=False), show_download_button=False, show_share_button=False, container=True, interactive=False, visible=True)
+ gr_audiobook_player_playback_time = gr.Number(label='', interactive=False, visible=True, elem_id="gr_audiobook_player_playback_time", value=0.0)
+ with gr.Row(elem_id='gr_row_audiobook_list'):
+ gr_audiobook_download_btn = gr.DownloadButton(elem_id='gr_audiobook_download_btn', label='↧', elem_classes=['small-btn'], variant='secondary', interactive=True, visible=True, scale=0, min_width=60)
+ gr_audiobook_list = gr.Dropdown(elem_id='gr_audiobook_list', label='', choices=audiobook_options, type='value', interactive=True, visible=True, scale=2)
+ gr_audiobook_del_btn = gr.Button(elem_id='gr_audiobook_del_btn', value='🗑', elem_classes=['small-btn'], variant='secondary', interactive=True, visible=True, scale=0, min_width=60)
+ gr_convert_btn = gr.Button(elem_id='gr_convert_btn', value='📚', elem_classes='icon-btn', variant='primary', interactive=False)
+
+ gr_modal = gr.HTML(visible=False)
+ gr_glass_mask = gr.HTML(f'{glass_mask_msg}')
+ gr_confirm_field_hidden = gr.Textbox(elem_id='confirm_hidden', visible=False)
+ gr_confirm_yes_btn = gr.Button(elem_id='confirm_yes_btn', value='', visible=False)
+ gr_confirm_no_btn = gr.Button(elem_id='confirm_no_btn', value='', visible=False)
+
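+ # Release the session associated with a disconnected socket hash, if any, via the session tracker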
+ def cleanup_session(req: gr.Request):
+ socket_hash = req.session_hash
+ if any(socket_hash in session for session in context.sessions.values()):
+ session_id = context.find_id_by_hash(socket_hash)
+ ctx_tracker.end_session(session_id, socket_hash)
+
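+ # Return the content of the .vtt file sitting next to the given audio path, or None when it is missing or unreadable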
+ def load_vtt_data(path):
+ if not path or not os.path.exists(path):
+ return None
+ try:
+ vtt_path = Path(path).with_suffix('.vtt')
+ if not os.path.exists(vtt_path):
+ return None
+ with open(vtt_path, "r", encoding="utf-8-sig", errors="replace") as f:
+ content = f.read()
+ return content
+ except Exception:
+ return None
+
+ def show_modal(type, msg):
+ return f'''
+ {msg}
+ {show_confirm() if type == 'confirm' else ''}
+ '''
+
+ def show_confirm():
+ return '''
+
+
+
+
+ '''
+
+ def show_rating(tts_engine):
+
+ def yellow_stars(n):
+ return "".join(
+ "★" for _ in range(n)
+ )
+
+ def color_box(value):
+ if value <= 4:
+ color = "#4CAF50" # Green = low
+ elif value <= 8:
+ color = "#FF9800" # Orange = medium
+ else:
+ color = "#F44336" # Red = high
+ return f"{value} GB"
+
+ rating = default_engine_settings[tts_engine]['rating']
+
+ return f"""
+
+ GPU VRAM: {color_box(rating["GPU VRAM"])}
+ CPU: {yellow_stars(rating["CPU"])}
+ RAM: {color_box(rating["RAM"])}
+ Realism: {yellow_stars(rating["Realism"])}
+
+ """
+
+ def alert_exception(error):
+ gr.Error(error)
+ DependencyError(error)
+
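+ # Rebuild all UI component values from the stored session when the socket hash is known; otherwise return no-op updates for the 24 outputs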
+ def restore_interface(id, req: gr.Request):
+ try:
+ session = context.get_session(id)
+ socket_hash = req.session_hash
+ if not session.get(socket_hash):
+ outputs = tuple([gr.update() for _ in range(24)])
+ return outputs
+ session = context.get_session(id)
+ ebook_data = None
+ file_count = session['ebook_mode']
+ if isinstance(session['ebook_list'], list) and file_count == 'directory':
+ #ebook_data = session['ebook_list']
+ ebook_data = None
+ elif isinstance(session['ebook'], str) and file_count == 'single':
+ ebook_data = session['ebook']
+ else:
+ ebook_data = None
+ ### XTTSv2 Params
+ session['temperature'] = session['temperature'] if session['temperature'] else default_engine_settings[TTS_ENGINES['XTTSv2']]['temperature']
+ session['length_penalty'] = default_engine_settings[TTS_ENGINES['XTTSv2']]['length_penalty']
+ session['num_beams'] = default_engine_settings[TTS_ENGINES['XTTSv2']]['num_beams']
+ session['repetition_penalty'] = session['repetition_penalty'] if session['repetition_penalty'] else default_engine_settings[TTS_ENGINES['XTTSv2']]['repetition_penalty']
+ session['top_k'] = session['top_k'] if session['top_k'] else default_engine_settings[TTS_ENGINES['XTTSv2']]['top_k']
+ session['top_p'] = session['top_p'] if session['top_p'] else default_engine_settings[TTS_ENGINES['XTTSv2']]['top_p']
+ session['speed'] = session['speed'] if session['speed'] else default_engine_settings[TTS_ENGINES['XTTSv2']]['speed']
+ session['enable_text_splitting'] = default_engine_settings[TTS_ENGINES['XTTSv2']]['enable_text_splitting']
+ ### BARK Params
+ session['text_temp'] = session['text_temp'] if session['text_temp'] else default_engine_settings[TTS_ENGINES['BARK']]['text_temp']
+ session['waveform_temp'] = session['waveform_temp'] if session['waveform_temp'] else default_engine_settings[TTS_ENGINES['BARK']]['waveform_temp']
+ return (
+ gr.update(value=ebook_data), gr.update(value=session['ebook_mode']), gr.update(value=session['device']),
+ gr.update(value=session['language']), update_gr_tts_engine_list(id), update_gr_custom_model_list(id),
+ update_gr_fine_tuned_list(id), gr.update(value=session['output_format']), update_gr_audiobook_list(id), gr.update(value=load_vtt_data(session['audiobook'])),
+ gr.update(value=float(session['temperature'])), gr.update(value=float(session['length_penalty'])), gr.update(value=int(session['num_beams'])),
+ gr.update(value=float(session['repetition_penalty'])), gr.update(value=int(session['top_k'])), gr.update(value=float(session['top_p'])), gr.update(value=float(session['speed'])),
+ gr.update(value=bool(session['enable_text_splitting'])), gr.update(value=float(session['text_temp'])), gr.update(value=float(session['waveform_temp'])), update_gr_voice_list(id),
+ gr.update(value=session['output_split']), gr.update(value=session['output_split_hours']), gr.update(active=True)
+ )
+ except Exception as e:
+ error = f'restore_interface(): {e}'
+ alert_exception(error)
+ outputs = tuple([gr.update() for _ in range(24)])
+ return outputs
+
+ def refresh_interface(id):
+ session = context.get_session(id)
+ return (
+ gr.update(interactive=False), gr.update(value=None), update_gr_audiobook_list(id),
+ gr.update(value=session['audiobook']), gr.update(visible=False), update_gr_voice_list(id)
+ )
+
+ def change_gr_audiobook_list(selected, id):
+ session = context.get_session(id)
+ session['audiobook'] = selected
+ if selected is not None:
+ audio_info = mediainfo(selected)
+ session['duration'] = float(audio_info['duration'])
+ visible = True if len(audiobook_options) else False
+ return gr.update(value=selected), gr.update(value=selected), gr.update(value=load_vtt_data(selected)), gr.update(visible=visible)
+
+ def update_gr_glass_mask(str=glass_mask_msg, attr=''):
+ return gr.update(value=f'{str}')
+
+ def state_convert_btn(upload_file=None, upload_file_mode=None, custom_model_file=None, session=None):
+ try:
+ if session is None:
+ return gr.update(variant='primary', interactive=False)
+ else:
+ if hasattr(upload_file, 'name') and not hasattr(custom_model_file, 'name'):
+ return gr.update(variant='primary', interactive=True)
+ elif isinstance(upload_file, list) and len(upload_file) > 0 and upload_file_mode == 'directory' and not hasattr(custom_model_file, 'name'):
+ return gr.update(variant='primary', interactive=True)
+ else:
+ return gr.update(variant='primary', interactive=False)
+ except Exception as e:
+ error = f'state_convert_btn(): {e}'
+ alert_exception(error)
+
+ def disable_components():
+ outputs = tuple([gr.update(interactive=False) for _ in range(9)])
+ return outputs
+
+ def enable_components():
+ outputs = tuple([gr.update(interactive=True) for _ in range(9)])
+ return outputs
+
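+ # Track the selected ebook file(s) in the session; clearing the selection while a conversion is running requests cancellation and shows a wait modal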
+ def change_gr_ebook_file(data, id):
+ try:
+ session = context.get_session(id)
+ session['ebook'] = None
+ session['ebook_list'] = None
+ if data is None:
+ if session['status'] == 'converting':
+ session['cancellation_requested'] = True
+ msg = 'Cancellation requested, please wait...'
+ yield gr.update(value=show_modal('wait', msg),visible=True)
+ return
+ if isinstance(data, list):
+ session['ebook_list'] = data
+ else:
+ session['ebook'] = data
+ session['cancellation_requested'] = False
+ except Exception as e:
+ error = f'change_gr_ebook_file(): {e}'
+ alert_exception(error)
+ return gr.update(visible=False)
+
+ def change_gr_ebook_mode(val, id):
+ session = context.get_session(id)
+ session['ebook_mode'] = val
+ if val == 'single':
+ return gr.update(label=src_label_file, value=None, file_count='single')
+ else:
+ return gr.update(label=src_label_dir, value=None, file_count='directory')
+
+ def change_gr_voice_file(f, id):
+ if f is not None:
+ state = {}
+ if len(voice_options) > max_custom_voices:
+ error = f'You are allowed to upload a max of {max_custom_voices} voices'
+ state['type'] = 'warning'
+ state['msg'] = error
+ elif os.path.splitext(f.name)[1] not in voice_formats:
+ error = f'The audio file format selected is not valid.'
+ state['type'] = 'warning'
+ state['msg'] = error
+ else:
+ session = context.get_session(id)
+ voice_name = os.path.splitext(os.path.basename(f))[0].replace('&', 'And')
+ voice_name = get_sanitized(voice_name)
+ final_voice_file = os.path.join(session['voice_dir'], f'{voice_name}.wav')
+ extractor = VoiceExtractor(session, f, voice_name)
+ status, msg = extractor.extract_voice()
+ if status:
+ session['voice'] = final_voice_file
+ msg = f"Voice {voice_name} added to the voices list"
+ state['type'] = 'success'
+ state['msg'] = msg
+ else:
+ error = 'Voice extraction failed! Check if your audio file is compatible.'
+ state['type'] = 'warning'
+ state['msg'] = error
+ show_alert(state)
+ return gr.update(value=None)
+ return gr.update()
+
+ def change_gr_voice_list(selected, id):
+ session = context.get_session(id)
+ session['voice'] = next((value for label, value in voice_options if value == selected), None)
+ visible = True if session['voice'] is not None else False
+ min_width = 60 if session['voice'] is not None else 0
+ return gr.update(value=session['voice'], visible=visible, min_width=min_width), gr.update(visible=visible)
+
+ def click_gr_voice_del_btn(selected, id):
+ try:
+ if selected is not None:
+ session = context.get_session(id)
+ speaker_path = os.path.abspath(selected)
+ speaker = re.sub(r'\.wav$|\.npz$', '', os.path.basename(selected))
+ builtin_root = os.path.join(voices_dir, session['language'])
+ sessions_root = os.path.join(voices_dir, '__sessions')
+ is_in_sessions = os.path.commonpath([speaker_path, os.path.abspath(sessions_root)]) == os.path.abspath(sessions_root)
+ is_in_builtin = os.path.commonpath([speaker_path, os.path.abspath(builtin_root)]) == os.path.abspath(builtin_root)
+ # Check if voice is built-in
+ is_builtin = any(
+ speaker in settings.get('voices', {})
+ for settings in (default_engine_settings[engine] for engine in TTS_ENGINES.values())
+ )
+ if is_builtin and is_in_builtin:
+ error = f'Voice file {speaker} is a builtin voice and cannot be deleted.'
+ show_alert({"type": "warning", "msg": error})
+ return gr.update(), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False)
+ try:
+ selected_path = Path(selected).resolve()
+ parent_path = Path(session['voice_dir']).parent.resolve()
+ if parent_path in selected_path.parents:
+ msg = f'Are you sure you want to delete {speaker}?'
+ return (
+ gr.update(value='confirm_voice_del'),
+ gr.update(value=show_modal('confirm', msg), visible=True),
+ gr.update(visible=True),
+ gr.update(visible=True)
+ )
+ else:
+ error = f'{speaker} is part of the global voices directory. Only your own custom uploaded voices can be deleted!'
+ show_alert({"type": "warning", "msg": error})
+ return gr.update(), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False)
+ except Exception as e:
+ error = f'Could not delete the voice file {selected}!\n{e}'
+ alert_exception(error)
+ return gr.update(), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False)
+ # Fallback/default return if not selected or after errors
+ return gr.update(), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False)
+ except Exception as e:
+ error = f'click_gr_voice_del_btn(): {e}'
+ alert_exception(error)
+ return gr.update(), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False)
+
+ def click_gr_custom_model_del_btn(selected, id):
+ try:
+ if selected is not None:
+ session = context.get_session(id)
+ selected_name = os.path.basename(selected)
+ msg = f'Are you sure you want to delete {selected_name}?'
+ return gr.update(value='confirm_custom_model_del'), gr.update(value=show_modal('confirm', msg), visible=True), gr.update(visible=True), gr.update(visible=True)
+ return gr.update(), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False)
+ except Exception as e:
+ error = f'Could not delete the custom model {selected}!\n{e}'
+ alert_exception(error)
+ return gr.update(), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False)
+
+ def click_gr_audiobook_del_btn(selected, id):
+ try:
+ if selected is not None:
+ session = context.get_session(id)
+ selected_name = Path(selected).stem
+ msg = f'Are you sure you want to delete {selected_name}?'
+ return gr.update(value='confirm_audiobook_del'), gr.update(value=show_modal('confirm', msg), visible=True), gr.update(visible=True), gr.update(visible=True)
+ return gr.update(), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False)
+ except Exception as e:
+ error = f'Could not delete the audiobook {selected}!\n{e}'
+ alert_exception(error)
+ return gr.update(), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False)
+
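+ # Apply the deletion confirmed in the modal, dispatching on 'method': voice file(s), custom model directory, or audiobook plus its .vtt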
+ def confirm_deletion(voice_path, custom_model, audiobook, id, method=None):
+ try:
+ if method is not None:
+ session = context.get_session(id)
+ if method == 'confirm_voice_del':
+ selected_name = Path(voice_path).stem
+ pattern = re.sub(r'\.wav$', '*.wav', voice_path)
+ files2remove = glob(pattern)
+ for file in files2remove:
+ os.remove(file)
+ shutil.rmtree(os.path.join(os.path.dirname(voice_path), 'bark', selected_name), ignore_errors=True)
+ msg = f'Voice file {selected_name} deleted!'
+ session['voice'] = None
+ show_alert({"type": "warning", "msg": msg})
+ return gr.update(), gr.update(), gr.update(visible=False), update_gr_voice_list(id), gr.update(visible=False), gr.update(visible=False)
+ elif method == 'confirm_custom_model_del':
+ selected_name = os.path.basename(custom_model)
+ shutil.rmtree(custom_model, ignore_errors=True)
+ msg = f'Custom model {selected_name} deleted!'
+ session['custom_model'] = None
+ show_alert({"type": "warning", "msg": msg})
+ return update_gr_custom_model_list(id), gr.update(), gr.update(visible=False), gr.update(), gr.update(visible=False), gr.update(visible=False)
+ elif method == 'confirm_audiobook_del':
+ selected_name = Path(audiobook).stem
+ if os.path.isdir(audiobook):
+ shutil.rmtree(audiobook, ignore_errors=True)
+ elif os.path.exists(audiobook):
+ os.remove(audiobook)
+ vtt_path = Path(audiobook).with_suffix('.vtt')
+ if os.path.exists(vtt_path):
+ os.remove(vtt_path)
+ msg = f'Audiobook {selected_name} deleted!'
+ session['audiobook'] = None
+ show_alert({"type": "warning", "msg": msg})
+ return gr.update(), update_gr_audiobook_list(id), gr.update(visible=False), gr.update(), gr.update(visible=False), gr.update(visible=False)
+ return gr.update(), gr.update(), gr.update(visible=False), gr.update(), gr.update(visible=False), gr.update(visible=False)
+ except Exception as e:
+ error = f'confirm_deletion(): {e}!'
+ alert_exception(error)
+ return gr.update(), gr.update(), gr.update(visible=False), gr.update(), gr.update(visible=False), gr.update(visible=False)
+
+ def prepare_audiobook_download(selected):
+ if os.path.exists(selected):
+ return selected
+ return None
+
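+ # Rebuild the voice dropdown: builtin voices of the selected language, English extras for XTTSv2 languages, BARK speaker presets, and the session's own uploaded voices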
+ def update_gr_voice_list(id):
+ try:
+ nonlocal voice_options
+ session = context.get_session(id)
+ lang_dir = session['language'] if session['language'] != 'con' else 'con-' # Bypass Windows CON reserved name
+ file_pattern = "*.wav"
+ eng_options = []
+ bark_options = []
+ builtin_options = [
+ (os.path.splitext(f.name)[0], str(f))
+ for f in Path(os.path.join(voices_dir, lang_dir)).rglob(file_pattern)
+ ]
+ if session['language'] in language_tts[TTS_ENGINES['XTTSv2']]:
+ builtin_names = {t[0]: None for t in builtin_options}
+ eng_dir = Path(os.path.join(voices_dir, "eng"))
+ eng_options = [
+ (base, str(f))
+ for f in eng_dir.rglob(file_pattern)
+ for base in [os.path.splitext(f.name)[0]]
+ if base not in builtin_names
+ ]
+ if session['tts_engine'] == TTS_ENGINES['BARK']:
+ lang_array = languages.get(part3=session['language'])
+ if lang_array:
+ lang_iso1 = lang_array.part1
+ lang = lang_iso1.lower()
+ speakers_path = Path(default_engine_settings[TTS_ENGINES['BARK']]['speakers_path'])
+ pattern_speaker = re.compile(r"^.*?_speaker_(\d+)$")
+ bark_options = [
+ (pattern_speaker.sub(r"Speaker \1", f.stem), str(f.with_suffix(".wav")))
+ for f in speakers_path.rglob(f"{lang}_speaker_*.npz")
+ ]
+ voice_options = builtin_options + eng_options + bark_options
+ session['voice_dir'] = os.path.join(voices_dir, '__sessions', f"voice-{session['id']}", session['language'])
+ os.makedirs(session['voice_dir'], exist_ok=True)
+ if session['voice_dir'] is not None:
+ parent_dir = Path(session['voice_dir']).parent
+ voice_options += [
+ (os.path.splitext(f.name)[0], str(f))
+ for f in parent_dir.rglob(file_pattern)
+ if f.is_file()
+ ]
+ if session['tts_engine'] in [TTS_ENGINES['VITS'], TTS_ENGINES['FAIRSEQ'], TTS_ENGINES['TACOTRON2'], TTS_ENGINES['YOURTTS']]:
+ voice_options = [('Default', None)] + sorted(voice_options, key=lambda x: x[0].lower())
+ else:
+ voice_options = sorted(voice_options, key=lambda x: x[0].lower())
+ default_voice_path = models[session['tts_engine']][session['fine_tuned']]['voice']
+ if session['voice'] is None:
+ if voice_options[0][1] is not None:
+ default_name = Path(default_voice_path).stem
+ for name, value in voice_options:
+ if name == default_name:
+ session['voice'] = value
+ break
+ else:
+ values = [v for _, v in voice_options]
+ if default_voice_path in values:
+ session['voice'] = default_voice_path
+ else:
+ session['voice'] = voice_options[0][1]
+ else:
+ current_voice_name = Path(session['voice']).stem
+ current_voice_path = next(
+ (path for name, path in voice_options if name == current_voice_name and path == session['voice']), False
+ )
+ if current_voice_path:
+ session['voice'] = current_voice_path
+ else:
+ session['voice'] = default_voice_path
+ return gr.update(choices=voice_options, value=session['voice'])
+ except Exception as e:
+ error = f'update_gr_voice_list(): {e}!'
+ alert_exception(error)
+ return gr.update()
+
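+ # Refresh the TTS engine dropdown with the engines compatible with the session language, keeping the current choice when still valid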
+ def update_gr_tts_engine_list(id):
+ try:
+ nonlocal tts_engine_options
+ session = context.get_session(id)
+ tts_engine_options = get_compatible_tts_engines(session['language'])
+ session['tts_engine'] = session['tts_engine'] if session['tts_engine'] in tts_engine_options else tts_engine_options[0]
+ return gr.update(choices=tts_engine_options, value=session['tts_engine'])
+ except Exception as e:
+ error = f'update_gr_tts_engine_list(): {e}!'
+ alert_exception(error)
+ return gr.update()
+
+ def update_gr_custom_model_list(id):
+ try:
+ nonlocal custom_model_options
+ session = context.get_session(id)
+ custom_model_tts_dir = check_custom_model_tts(session['custom_model_dir'], session['tts_engine'])
+ custom_model_options = [('None', None)] + [
+ (
+ str(dir),
+ os.path.join(custom_model_tts_dir, dir)
+ )
+ for dir in os.listdir(custom_model_tts_dir)
+ if os.path.isdir(os.path.join(custom_model_tts_dir, dir))
+ ]
+ session['custom_model'] = session['custom_model'] if session['custom_model'] in [option[1] for option in custom_model_options] else custom_model_options[0][1]
+ return gr.update(choices=custom_model_options, value=session['custom_model'])
+ except Exception as e:
+ error = f'update_gr_custom_model_list(): {e}!'
+ alert_exception(error)
+ return gr.update()
+
+ def update_gr_fine_tuned_list(id):
+ try:
+ nonlocal fine_tuned_options
+ session = context.get_session(id)
+ fine_tuned_options = [
+ name for name, details in models.get(session['tts_engine'],{}).items()
+ if details.get('lang') == 'multi' or details.get('lang') == session['language']
+ ]
+ session['fine_tuned'] = session['fine_tuned'] if session['fine_tuned'] in fine_tuned_options else default_fine_tuned
+ return gr.update(choices=fine_tuned_options, value=session['fine_tuned'])
+ except Exception as e:
+ error = f'update_gr_fine_tuned_list(): {e}!'
+ alert_exception(error)
+ return gr.update()
+
+ def change_gr_device(device, id):
+ session = context.get_session(id)
+ session['device'] = device
+
+ def change_gr_language(selected, id):
+ if selected:
+ session = context.get_session(id)
+ prev = session['language']
+ session['language'] = selected
+ return[
+ gr.update(value=session['language']),
+ update_gr_tts_engine_list(id),
+ update_gr_custom_model_list(id),
+ update_gr_fine_tuned_list(id)
+ ]
+ return (gr.update(), gr.update(), gr.update(), gr.update())
+
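+ # Make sure a per-engine subdirectory exists under the session custom model directory and return its path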
+ def check_custom_model_tts(custom_model_dir, tts_engine):
+ dir_path = None
+ if custom_model_dir is not None and tts_engine is not None:
+ dir_path = os.path.join(custom_model_dir, tts_engine)
+ if not os.path.isdir(dir_path):
+ os.makedirs(dir_path, exist_ok=True)
+ return dir_path
+
+ def change_gr_custom_model_file(f, t, id):
+ if f is not None:
+ state = {}
+ try:
+ if len(custom_model_options) > max_custom_model:
+ error = f'You are allowed to upload a max of {max_custom_model} models'
+ state['type'] = 'warning'
+ state['msg'] = error
+ else:
+ session = context.get_session(id)
+ session['tts_engine'] = t
+ required_files = models[session['tts_engine']]['internal']['files']
+ if analyze_uploaded_file(f, required_files):
+ model = extract_custom_model(f, session)
+ if model is None:
+ error = f'Cannot extract custom model zip file {os.path.basename(f)}'
+ state['type'] = 'warning'
+ state['msg'] = error
+ else:
+ session['custom_model'] = model
+ msg = f'{os.path.basename(model)} added to the custom models list'
+ state['type'] = 'success'
+ state['msg'] = msg
+ else:
+ error = f'{os.path.basename(f)} is not a valid model or some required files are missing'
+ state['type'] = 'warning'
+ state['msg'] = error
+ except ClientDisconnect:
+ error = 'Client disconnected during upload. Operation aborted.'
+ state['type'] = 'error'
+ state['msg'] = error
+ except Exception as e:
+ error = f'change_gr_custom_model_file() exception: {str(e)}'
+ state['type'] = 'error'
+ state['msg'] = error
+ show_alert(state)
+ return gr.update(value=None)
+ return gr.update()
+
+ def change_gr_tts_engine_list(engine, id):
+ session = context.get_session(id)
+ session['tts_engine'] = engine
+ default_voice_path = models[session['tts_engine']][session['fine_tuned']]['voice']
+ if default_voice_path is None:
+ session['voice'] = default_voice_path
+ bark_visible = False
+ if session['tts_engine'] == TTS_ENGINES['XTTSv2']:
+ visible_custom_model = True
+ if session['fine_tuned'] != 'internal':
+ visible_custom_model = False
+ return (
+ gr.update(value=show_rating(session['tts_engine'])),
+ gr.update(visible=visible_gr_tab_xtts_params), gr.update(visible=False), gr.update(visible=visible_custom_model), update_gr_fine_tuned_list(id),
+ gr.update(label=f"*Upload {session['tts_engine']} Model (Should be a ZIP file with {', '.join(models[session['tts_engine']][default_fine_tuned]['files'])})"),
+ gr.update(label=f"My {session['tts_engine']} custom models")
+ )
+ else:
+ if session['tts_engine'] == TTS_ENGINES['BARK']:
+ bark_visible = visible_gr_tab_bark_params
+ return (
+ gr.update(value=show_rating(session['tts_engine'])), gr.update(visible=False), gr.update(visible=bark_visible),
+ gr.update(visible=False), update_gr_fine_tuned_list(id), gr.update(label=f"*Upload Fine Tuned Model not available for {session['tts_engine']}"), gr.update(label='')
+ )
+
+ def change_gr_fine_tuned_list(selected, id):
+ if selected:
+ session = context.get_session(id)
+ visible = False
+ if session['tts_engine'] == TTS_ENGINES['XTTSv2']:
+ if selected == 'internal':
+ visible = visible_gr_group_custom_model
+ session['fine_tuned'] = selected
+ return gr.update(visible=visible)
+ return gr.update()
+
+ def change_gr_custom_model_list(selected, id):
+ session = context.get_session(id)
+ session['custom_model'] = next((value for label, value in custom_model_options if value == selected), None)
+ visible = True if session['custom_model'] is not None else False
+ return gr.update(visible=not visible), gr.update(visible=visible)
+
+ def change_gr_output_format_list(val, id):
+ session = context.get_session(id)
+ session['output_format'] = val
+ return
+
+ def change_gr_output_split(bool, id):
+ session = context.get_session(id)
+ session['output_split'] = bool
+ return gr.update(visible=bool)
+
+ def change_gr_output_split_hours(selected, id):
+ session = context.get_session(id)
+ session['output_split_hours'] = selected
+ return
+
+ def change_gr_audiobook_player_playback_time(str, id):
+ session = context.get_session(id)
+ session['playback_time'] = float(str)
+ return
+
+ def change_param(key, val, id, val2=None):
+ session = context.get_session(id)
+ session[key] = val
+ state = {}
+ if key == 'length_penalty':
+ if val2 is not None:
+ if float(val) > float(val2):
+ error = 'Length penalty must always be lower than num beams if greater than 1.0, or equal if 1.0'
+ state['type'] = 'warning'
+ state['msg'] = error
+ show_alert(state)
+ elif key == 'num_beams':
+ if val2 is not None:
+ if float(val) < float(val2):
+ error = 'Num beams must always be higher than length penalty, or equal if its value is 1.0'
+ state['type'] = 'warning'
+ state['msg'] = error
+ show_alert(state)
+ return
+
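+ # Build the conversion args from the UI values and run convert_ebook(), once for a single file or in a loop over a directory selection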
+ def submit_convert_btn(
+ id, device, ebook_file, tts_engine, language, voice, custom_model, fine_tuned, output_format, temperature,
+ length_penalty, num_beams, repetition_penalty, top_k, top_p, speed, enable_text_splitting, text_temp, waveform_temp,
+ output_split, output_split_hours
+ ):
+ try:
+ session = context.get_session(id)
+ args = {
+ "is_gui_process": is_gui_process,
+ "session": id,
+ "script_mode": script_mode,
+ "device": device.lower(),
+ "tts_engine": tts_engine,
+ "ebook": ebook_file if isinstance(ebook_file, str) else None,
+ "ebook_list": ebook_file if isinstance(ebook_file, list) else None,
+ "audiobooks_dir": session['audiobooks_dir'],
+ "voice": voice,
+ "language": language,
+ "custom_model": custom_model,
+ "fine_tuned": fine_tuned,
+ "output_format": output_format,
+ "temperature": float(temperature),
+ "length_penalty": float(length_penalty),
+ "num_beams": session['num_beams'],
+ "repetition_penalty": float(repetition_penalty),
+ "top_k": int(top_k),
+ "top_p": float(top_p),
+ "speed": float(speed),
+ "enable_text_splitting": enable_text_splitting,
+ "text_temp": float(text_temp),
+ "waveform_temp": float(waveform_temp),
+ "output_split": output_split,
+ "output_split_hours": output_split_hours
+ }
+ error = None
+ if args['ebook'] is None and args['ebook_list'] is None:
+ error = 'Error: a file or directory is required.'
+ show_alert({"type": "warning", "msg": error})
+ elif args['num_beams'] < args['length_penalty']:
+ error = 'Error: num beams must be greater or equal than length penalty.'
+ show_alert({"type": "warning", "msg": error})
+ else:
+ session['status'] = 'converting'
+ session['progress'] = len(audiobook_options)
+ if isinstance(args['ebook_list'], list):
+ ebook_list = args['ebook_list'][:]
+ for file in ebook_list:
+ if any(file.endswith(ext) for ext in ebook_formats):
+ print(f'Processing eBook file: {os.path.basename(file)}')
+ args['ebook'] = file
+ progress_status, passed = convert_ebook(args)
+ if passed is False:
+ if session['status'] == 'converting':
+ error = 'Conversion cancelled.'
+ break
+ else:
+ error = 'Conversion failed.'
+ break
+ else:
+ show_alert({"type": "success", "msg": progress_status})
+ args['ebook_list'].remove(file)
+ reset_ebook_session(args['session'])
+ count_file = len(args['ebook_list'])
+ if count_file > 0:
+ msg = f"{len(args['ebook_list'])} remaining..."
+ else:
+ msg = 'Conversion successful!'
+ yield gr.update(value=msg)
+ session['status'] = 'ready'
+ else:
+ print(f"Processing eBook file: {os.path.basename(args['ebook'])}")
+ progress_status, passed = convert_ebook(args)
+ if passed is False:
+ if session['status'] == 'converting':
+ error = 'Conversion cancelled.'
+ else:
+ error = 'Conversion failed.'
+ session['status'] = 'ready'
+ else:
+ show_alert({"type": "success", "msg": progress_status})
+ reset_ebook_session(args['session'])
+ msg = 'Conversion successful!'
+ return gr.update(value=msg)
+ if error is not None:
+ show_alert({"type": "warning", "msg": error})
+ except Exception as e:
+ error = f'submit_convert_btn(): {e}'
+ alert_exception(error)
+ return gr.update(value='')
+
+ def update_gr_audiobook_list(id):
+ try:
+ nonlocal audiobook_options
+ session = context.get_session(id)
+ audiobook_options = [
+ (f, os.path.join(session['audiobooks_dir'], str(f)))
+ for f in os.listdir(session['audiobooks_dir'])
+ if not f.lower().endswith(".vtt") # exclude VTT files
+ ]
+ audiobook_options.sort(
+ key=lambda x: os.path.getmtime(x[1]),
+ reverse=True
+ )
+ session['audiobook'] = (
+ session['audiobook']
+ if session['audiobook'] in [option[1] for option in audiobook_options]
+ else None
+ )
+ if len(audiobook_options) > 0:
+ if session['audiobook'] is not None:
+ return gr.update(choices=audiobook_options, value=session['audiobook'])
+ else:
+ return gr.update(choices=audiobook_options, value=audiobook_options[0][1])
+ return gr.update(choices=audiobook_options)
+ except Exception as e:
+ error = f'update_gr_audiobook_list(): {e}!'
+ alert_exception(error)
+ return gr.update()
+
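+ # Restore a session from the JSON previously saved to the browser localStorage, drop stale paths, recreate per-session directories and register the socket hash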
+ def change_gr_read_data(data, state, req: gr.Request):
+ try:
+ msg = 'Error while loading saved session. Please try to delete your cookies and refresh the page'
+ if data is None:
+ data = context.get_session(str(uuid.uuid4()))
+ session = context.get_session(data['id'])
+ if data.get('tab_id') == session.get('tab_id') or len(active_sessions) == 0:
+ restore_session_from_data(data, session)
+ session['status'] = None
+ if not ctx_tracker.start_session(session['id']):
+ error = "Your session is already active. If that is not the case, please close your browser and relaunch it."
+ return gr.update(), gr.update(), gr.update(value=''), update_gr_glass_mask(str=error)
+ else:
+ active_sessions.add(req.session_hash)
+ session[req.session_hash] = req.session_hash
+ session['cancellation_requested'] = False
+ if isinstance(session['ebook'], str):
+ if not os.path.exists(session['ebook']):
+ session['ebook'] = None
+ if session['voice'] is not None:
+ if not os.path.exists(session['voice']):
+ session['voice'] = None
+ if session['custom_model'] is not None:
+ if not os.path.exists(session['custom_model_dir']):
+ session['custom_model'] = None
+ if session['fine_tuned'] is not None:
+ if session['tts_engine'] is not None:
+ if session['tts_engine'] in models.keys():
+ if session['fine_tuned'] not in models[session['tts_engine']].keys():
+ session['fine_tuned'] = default_fine_tuned
+ else:
+ session['tts_engine'] = default_tts_engine
+ session['fine_tuned'] = default_fine_tuned
+ if session['audiobook'] is not None:
+ if not os.path.exists(session['audiobook']):
+ session['audiobook'] = None
+ if session['status'] == 'converting':
+ session['status'] = 'ready'
+ session['system'] = (f"{platform.system()}-{platform.release()}").lower()
+ session['custom_model_dir'] = os.path.join(models_dir, '__sessions', f"model-{session['id']}")
+ session['voice_dir'] = os.path.join(voices_dir, '__sessions', f"voice-{session['id']}", session['language'])
+ os.makedirs(session['custom_model_dir'], exist_ok=True)
+ os.makedirs(session['voice_dir'], exist_ok=True)
+ # Uploaded voice files now live in their respective language folder; move any stray .wav files and the legacy 'bark' folder left at the voice_dir root by previous versions
+ legacy_root = os.path.dirname(session['voice_dir'])
+ legacy_items = glob(os.path.join(legacy_root, '*.wav'))
+ if os.path.isdir(os.path.join(legacy_root, 'bark')) and not os.path.exists(os.path.join(session['voice_dir'], 'bark')):
+ legacy_items.append(os.path.join(legacy_root, 'bark'))
+ for src in legacy_items:
+ shutil.move(src, os.path.join(session['voice_dir'], os.path.basename(src)))
+ if is_gui_shared:
+ msg = f' Note: access time limit: {interface_shared_tmp_expire} days'
+ session['audiobooks_dir'] = os.path.join(audiobooks_gradio_dir, f"web-{session['id']}")
+ delete_unused_tmp_dirs(audiobooks_gradio_dir, interface_shared_tmp_expire, session)
+ else:
+ msg = f' Note: if no activity is detected after {tmp_expire} days, your session will be cleaned up.'
+ session['audiobooks_dir'] = os.path.join(audiobooks_host_dir, f"web-{session['id']}")
+ delete_unused_tmp_dirs(audiobooks_host_dir, tmp_expire, session)
+ if not os.path.exists(session['audiobooks_dir']):
+ os.makedirs(session['audiobooks_dir'], exist_ok=True)
+ previous_hash = state['hash']
+ new_hash = hash_proxy_dict(MappingProxyType(session))
+ state['hash'] = new_hash
+ session_dict = proxy2dict(session)
+ show_alert({"type": "info", "msg": msg})
+ return gr.update(value=session_dict), gr.update(value=state), gr.update(value=session['id']), gr.update()
+ except Exception as e:
+ error = f'change_gr_read_data(): {e}'
+ alert_exception(error)
+ return gr.update(), gr.update(), gr.update(), gr.update()
+
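+ # Serialize the session to JSON for the browser on each timer tick; skip writing when the hashed session content has not changed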
+ def save_session(id, state):
+ try:
+ if id:
+ if id in context.sessions:
+ session = context.get_session(id)
+ if session:
+ if session['event'] == 'clear':
+ session_dict = session
+ else:
+ previous_hash = state['hash']
+ new_hash = hash_proxy_dict(MappingProxyType(session))
+ if previous_hash == new_hash:
+ return gr.update(), gr.update(), gr.update()
+ else:
+ state['hash'] = new_hash
+ session_dict = proxy2dict(session)
+ if session['status'] == 'converting':
+ if session['progress'] != len(audiobook_options):
+ session['progress'] = len(audiobook_options)
+ return gr.update(value=json.dumps(session_dict, indent=4)), gr.update(value=state), update_gr_audiobook_list(id)
+ return gr.update(value=json.dumps(session_dict, indent=4)), gr.update(value=state), gr.update()
+ return gr.update(), gr.update(), gr.update()
+ except Exception as e:
+ error = f'save_session(): {e}!'
+ alert_exception(error)
+ return gr.update(), gr.update(value=e), gr.update()
+
+ def clear_event(id):
+ if id:
+ session = context.get_session(id)
+ if session['event'] is not None:
+ session['event'] = None
+
+ gr_ebook_file.change(
+ fn=state_convert_btn,
+ inputs=[gr_ebook_file, gr_ebook_mode, gr_custom_model_file, gr_session],
+ outputs=[gr_convert_btn]
+ ).then(
+ fn=change_gr_ebook_file,
+ inputs=[gr_ebook_file, gr_session],
+ outputs=[gr_modal]
+ )
+ gr_ebook_mode.change(
+ fn=change_gr_ebook_mode,
+ inputs=[gr_ebook_mode, gr_session],
+ outputs=[gr_ebook_file]
+ )
+ gr_voice_file.upload(
+ fn=change_gr_voice_file,
+ inputs=[gr_voice_file, gr_session],
+ outputs=[gr_voice_file]
+ ).then(
+ fn=update_gr_voice_list,
+ inputs=[gr_session],
+ outputs=[gr_voice_list]
+ )
+ gr_voice_list.change(
+ fn=change_gr_voice_list,
+ inputs=[gr_voice_list, gr_session],
+ outputs=[gr_voice_player, gr_voice_del_btn]
+ )
+ gr_voice_del_btn.click(
+ fn=click_gr_voice_del_btn,
+ inputs=[gr_voice_list, gr_session],
+ outputs=[gr_confirm_field_hidden, gr_modal, gr_confirm_yes_btn, gr_confirm_no_btn]
+ )
+ gr_device.change(
+ fn=change_gr_device,
+ inputs=[gr_device, gr_session],
+ outputs=None
+ )
+ gr_language.change(
+ fn=change_gr_language,
+ inputs=[gr_language, gr_session],
+ outputs=[gr_language, gr_tts_engine_list, gr_custom_model_list, gr_fine_tuned_list]
+ ).then(
+ fn=update_gr_voice_list,
+ inputs=[gr_session],
+ outputs=[gr_voice_list]
+ )
+ gr_tts_engine_list.change(
+ fn=change_gr_tts_engine_list,
+ inputs=[gr_tts_engine_list, gr_session],
+ outputs=[gr_tts_rating, gr_tab_xtts_params, gr_tab_bark_params, gr_group_custom_model, gr_fine_tuned_list, gr_custom_model_file, gr_custom_model_list]
+ ).then(
+ fn=update_gr_voice_list,
+ inputs=[gr_session],
+ outputs=[gr_voice_list]
+ )
+ gr_fine_tuned_list.change(
+ fn=change_gr_fine_tuned_list,
+ inputs=[gr_fine_tuned_list, gr_session],
+ outputs=[gr_group_custom_model]
+ ).then(
+ fn=update_gr_voice_list,
+ inputs=[gr_session],
+ outputs=[gr_voice_list]
+ )
+ gr_custom_model_file.upload(
+ fn=change_gr_custom_model_file,
+ inputs=[gr_custom_model_file, gr_tts_engine_list, gr_session],
+ outputs=[gr_custom_model_file]
+ ).then(
+ fn=update_gr_custom_model_list,
+ inputs=[gr_session],
+ outputs=[gr_custom_model_list]
+ )
+ gr_custom_model_list.change(
+ fn=change_gr_custom_model_list,
+ inputs=[gr_custom_model_list, gr_session],
+ outputs=[gr_fine_tuned_list, gr_custom_model_del_btn]
+ )
+ gr_custom_model_del_btn.click(
+ fn=click_gr_custom_model_del_btn,
+ inputs=[gr_custom_model_list, gr_session],
+ outputs=[gr_confirm_field_hidden, gr_modal, gr_confirm_yes_btn, gr_confirm_no_btn]
+ )
+ gr_output_format_list.change(
+ fn=change_gr_output_format_list,
+ inputs=[gr_output_format_list, gr_session],
+ outputs=None
+ )
+ gr_output_split.change(
+ fn=change_gr_output_split,
+ inputs=[gr_output_split, gr_session],
+ outputs=gr_output_split_hours
+ )
+ gr_output_split_hours.change(
+ fn=change_gr_output_split_hours,
+ inputs=[gr_output_split_hours, gr_session],
+ outputs=None
+ )
+ gr_audiobook_vtt.change(
+ fn=lambda: gr.update(value=''),
+ inputs=[],
+ outputs=[gr_audiobook_sentence]
+ ).then(
+ fn=None,
+ inputs=[gr_audiobook_vtt],
+ js='(data)=>{window.load_vtt?.(URL.createObjectURL(new Blob([data],{type: "text/vtt"})));}'
+ )
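+        # Restore the plain browser tab title whenever the progress textbox changes (progress info is
+        # presumably mirrored into document.title by the tab_progress helper observed in the JS below).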
+ gr_tab_progress.change(
+ fn=None,
+ inputs=[gr_tab_progress],
+ outputs=[],
+ js=f'() => {{ document.title = "{title}"; }}'
+ )
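+        # Hidden field fed from JS (throttled to about once per second); the handler presumably stores
+        # the playback position in the session so the player can resume from it after a reload.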
+ gr_audiobook_player_playback_time.change(
+ fn=change_gr_audiobook_player_playback_time,
+ inputs=[gr_audiobook_player_playback_time, gr_session],
+ outputs=[]
+ )
+ gr_audiobook_download_btn.click(
+ fn=lambda audiobook: show_alert({"type": "info", "msg": f'Downloading {os.path.basename(audiobook)}'}),
+ inputs=[gr_audiobook_list],
+ outputs=None,
+ show_progress='minimal'
+ )
+ gr_audiobook_list.change(
+ fn=change_gr_audiobook_list,
+ inputs=[gr_audiobook_list, gr_session],
+ outputs=[gr_audiobook_download_btn, gr_audiobook_player, gr_audiobook_vtt, gr_group_audiobook_list]
+ )
+ gr_audiobook_del_btn.click(
+ fn=click_gr_audiobook_del_btn,
+ inputs=[gr_audiobook_list, gr_session],
+ outputs=[gr_confirm_field_hidden, gr_modal, gr_confirm_yes_btn, gr_confirm_no_btn]
+ )
+ ########### XTTSv2 Params
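+        # Each XTTS control forwards (value, session id) to change_param(); length_penalty and num_beams
+        # also pass each other's current value, presumably because length_penalty only applies when
+        # beam search is enabled (num_beams > 1).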
+ gr_xtts_temperature.change(
+ fn=lambda val, id: change_param('temperature', val, id),
+ inputs=[gr_xtts_temperature, gr_session],
+ outputs=None
+ )
+ gr_xtts_length_penalty.change(
+ fn=lambda val, id, val2: change_param('length_penalty', val, id, val2),
+ inputs=[gr_xtts_length_penalty, gr_session, gr_xtts_num_beams],
+ outputs=None,
+ )
+ gr_xtts_num_beams.change(
+ fn=lambda val, id, val2: change_param('num_beams', val, id, val2),
+ inputs=[gr_xtts_num_beams, gr_session, gr_xtts_length_penalty],
+ outputs=None,
+ )
+ gr_xtts_repetition_penalty.change(
+ fn=lambda val, id: change_param('repetition_penalty', val, id),
+ inputs=[gr_xtts_repetition_penalty, gr_session],
+ outputs=None
+ )
+ gr_xtts_top_k.change(
+ fn=lambda val, id: change_param('top_k', val, id),
+ inputs=[gr_xtts_top_k, gr_session],
+ outputs=None
+ )
+ gr_xtts_top_p.change(
+ fn=lambda val, id: change_param('top_p', val, id),
+ inputs=[gr_xtts_top_p, gr_session],
+ outputs=None
+ )
+ gr_xtts_speed.change(
+ fn=lambda val, id: change_param('speed', val, id),
+ inputs=[gr_xtts_speed, gr_session],
+ outputs=None
+ )
+ gr_xtts_enable_text_splitting.change(
+ fn=lambda val, id: change_param('enable_text_splitting', val, id),
+ inputs=[gr_xtts_enable_text_splitting, gr_session],
+ outputs=None
+ )
+ ########### BARK Params
+ gr_bark_text_temp.change(
+ fn=lambda val, id: change_param('text_temp', val, id),
+ inputs=[gr_bark_text_temp, gr_session],
+ outputs=None
+ )
+ gr_bark_waveform_temp.change(
+ fn=lambda val, id: change_param('waveform_temp', val, id),
+ inputs=[gr_bark_waveform_temp, gr_session],
+ outputs=None
+ )
+ ############ Timer to save session to localStorage
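+        # Created inactive; presumably re-activated by restore_interface(), which lists gr_timer among its outputs.
+        # Every 9 s: save_session() feeds gr_write_data (persisted to localStorage below), then clear_event() resets the session 'event' flag.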
+ gr_timer = gr.Timer(9, active=False)
+ gr_timer.tick(
+ fn=save_session,
+ inputs=[gr_session, gr_state_update],
+ outputs=[gr_write_data, gr_state_update, gr_audiobook_list]
+ ).then(
+ fn=clear_event,
+ inputs=[gr_session],
+ outputs=None
+ )
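+        # Conversion chain: flip the button state, lock the input controls, run the conversion,
+        # then unlock the controls and refresh the interface.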
+ gr_convert_btn.click(
+ fn=state_convert_btn,
+ inputs=None,
+ outputs=[gr_convert_btn]
+ ).then(
+ fn=disable_components,
+ inputs=[],
+ outputs=[gr_ebook_mode, gr_language, gr_voice_file, gr_voice_list, gr_device, gr_tts_engine_list, gr_fine_tuned_list, gr_custom_model_file, gr_custom_model_list]
+ ).then(
+ fn=submit_convert_btn,
+ inputs=[
+ gr_session, gr_device, gr_ebook_file, gr_tts_engine_list, gr_language, gr_voice_list,
+ gr_custom_model_list, gr_fine_tuned_list, gr_output_format_list,
+ gr_xtts_temperature, gr_xtts_length_penalty, gr_xtts_num_beams, gr_xtts_repetition_penalty, gr_xtts_top_k, gr_xtts_top_p, gr_xtts_speed, gr_xtts_enable_text_splitting,
+ gr_bark_text_temp, gr_bark_waveform_temp, gr_output_split, gr_output_split_hours
+ ],
+ outputs=[gr_tab_progress]
+ ).then(
+ fn=enable_components,
+ inputs=[],
+ outputs=[gr_ebook_mode, gr_language, gr_voice_file, gr_voice_list, gr_device, gr_tts_engine_list, gr_fine_tuned_list, gr_custom_model_file, gr_custom_model_list]
+ ).then(
+ fn=refresh_interface,
+ inputs=[gr_session],
+ outputs=[gr_convert_btn, gr_ebook_file, gr_audiobook_list, gr_audiobook_player, gr_modal, gr_voice_list]
+ )
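+        # Mirror gr_write_data into the browser's localStorage: storage is cleared first and only
+        # rewritten when the payload's 'event' is not 'clear'.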
+ gr_write_data.change(
+ fn=None,
+ inputs=[gr_write_data],
+ js="""
+ (data)=>{
+ try{
+ if(data){
+ localStorage.clear();
+ if(data['event'] != 'clear'){
+ //console.log('save: ', data);
+ window.localStorage.setItem('data', JSON.stringify(data));
+ }
+ }
+ }catch(e){
+ console.log('gr_write_data.change error: '+e)
+ }
+ }
+ """
+ )
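+        # gr_read_data is presumably filled from localStorage by startup JS elsewhere; its change
+        # re-creates the session, restores every control (and the save timer), then hides the glass mask once a session exists.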
+ gr_read_data.change(
+ fn=change_gr_read_data,
+ inputs=[gr_read_data, gr_state_update],
+ outputs=[gr_write_data, gr_state_update, gr_session, gr_glass_mask]
+ ).then(
+ fn=restore_interface,
+ inputs=[gr_session],
+ outputs=[
+ gr_ebook_file, gr_ebook_mode, gr_device, gr_language,
+ gr_tts_engine_list, gr_custom_model_list, gr_fine_tuned_list,
+ gr_output_format_list, gr_audiobook_list, gr_audiobook_vtt,
+ gr_xtts_temperature, gr_xtts_length_penalty, gr_xtts_num_beams, gr_xtts_repetition_penalty,
+ gr_xtts_top_k, gr_xtts_top_p, gr_xtts_speed, gr_xtts_enable_text_splitting, gr_bark_text_temp,
+ gr_bark_waveform_temp, gr_voice_list, gr_output_split, gr_output_split_hours, gr_timer
+ ]
+ ).then(
+ fn=lambda session: update_gr_glass_mask(attr='class="hide"') if session else gr.update(),
+ inputs=[gr_session],
+ outputs=[gr_glass_mask]
+ )
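+        # Both buttons route through confirm_deletion(); the hidden field (set by the *_del_btn handlers above)
+        # identifies what to delete, and the 'No' button omits it, presumably so the call is treated as a cancel.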
+ gr_confirm_yes_btn.click(
+ fn=confirm_deletion,
+ inputs=[gr_voice_list, gr_custom_model_list, gr_audiobook_list, gr_session, gr_confirm_field_hidden],
+ outputs=[gr_custom_model_list, gr_audiobook_list, gr_modal, gr_voice_list, gr_confirm_yes_btn, gr_confirm_no_btn]
+ )
+ gr_confirm_no_btn.click(
+ fn=confirm_deletion,
+ inputs=[gr_voice_list, gr_custom_model_list, gr_audiobook_list, gr_session],
+ outputs=[gr_custom_model_list, gr_audiobook_list, gr_modal, gr_voice_list, gr_confirm_yes_btn, gr_confirm_no_btn]
+ )
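+        # Client-side bootstrap injected on load: init_elements() wires the audio player (VTT cue display,
+        # throttled playback-time reporting, theme-aware styling) and load_vtt() loads subtitle cues for it.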
+ app.load(
+ fn=None,
+ js=r'''
+ ()=>{
+ try {
+ if (typeof(window.init_elements) !== "function") {
+ window.init_elements = () => {
+ try {
+ let lastCue = null;
+ let fade_timeout = null;
+ let last_time = 0;
+ if (gr_root && gr_checkboxes && gr_radios && gr_audiobook_player_playback_time && gr_audiobook_sentence && gr_tab_progress) {
+ let set_playback_time = false;
+ gr_audiobook_player.addEventListener("loadedmetadata", () => {
+ //console.log("loadedmetadata:", window.playback_time);
+ if (window.playback_time > 0) {
+ gr_audiobook_player.currentTime = window.playback_time;
+ }
+ set_playback_time = true;
+ },{once: true});
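+                    // Keep the sentence box in sync with the active VTT cue and report the playback
+                    // position back to Gradio at most once per second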
+ gr_audiobook_player.addEventListener("timeupdate", () => {
+                      if (set_playback_time) {
+ window.playback_time = gr_audiobook_player.currentTime;
+ const cue = findCue(window.playback_time);
+ if (cue && cue !== lastCue) {
+ if (fade_timeout) {
+ gr_audiobook_sentence.style.opacity = "1";
+ } else {
+ gr_audiobook_sentence.style.opacity = "0";
+ }
+ gr_audiobook_sentence.style.transition = "none";
+ gr_audiobook_sentence.value = cue.text;
+ clearTimeout(fade_timeout);
+ fade_timeout = setTimeout(() => {
+ gr_audiobook_sentence.style.transition = "opacity 0.1s ease-in";
+ gr_audiobook_sentence.style.opacity = "1";
+ fade_timeout = null;
+ }, 33);
+ lastCue = cue;
+ } else if (!cue && lastCue !== null) {
+ gr_audiobook_sentence.value = "...";
+ lastCue = null;
+ }
+ const now = performance.now();
+ if (now - last_time > 1000) {
+ //console.log("timeupdate", window.playback_time);
+ gr_audiobook_player_playback_time.value = String(window.playback_time);
+ gr_audiobook_player_playback_time.dispatchEvent(new Event("input", { bubbles: true }));
+ last_time = now;
+ }
+ }
+ });
+ gr_audiobook_player.addEventListener("ended", () => {
+ gr_audiobook_sentence.value = "...";
+ lastCue = null;
+ });
+
+ ///////////////
+
+ // Observe programmatic changes
+ new MutationObserver(tab_progress).observe(gr_tab_progress, { attributes: true, childList: true, subtree: true, characterData: true });
+ // Also catch user edits
+ gr_tab_progress.addEventListener("input", tab_progress);
+
+ ///////////////
+
+ const url = new URL(window.location);
+ const theme = url.searchParams.get("__theme");
+                    let audioFilter = "";
+                    let elColor = "#666666";
+                    // Dark mode: an explicit ?__theme= parameter wins, otherwise follow the OS color-scheme preference
+                    const prefersDark = theme ? theme === "dark" : window.matchMedia?.("(prefers-color-scheme: dark)").matches;
+                    if (prefersDark) {
+                      if (gr_audiobook_player) {
+                        audioFilter = "invert(1) hue-rotate(180deg)";
+                      }
+                      elColor = "#fff";
+                    }
+                    gr_checkboxes.forEach(el => { el.style.border = "1px solid " + elColor; });
+                    gr_radios.forEach(el => { el.style.border = "1px solid " + elColor; });
+ if (!gr_audiobook_player.style.transition) {
+ gr_audiobook_player.style.transition = "filter 1s ease";
+ }
+ gr_audiobook_player.style.filter = audioFilter;
+ }
+ } catch (e) {
+ console.log("init_elements error:", e);
+ }
+ };
+ }
+ if (typeof(window.load_vtt) !== "function") {
+ window.load_vtt_timeout = null;
+ window.load_vtt = (path) => {
+ try {
+ if (gr_audiobook_player && gr_audiobook_player_playback_time && gr_audiobook_sentence) {
+ // Remove any