import subprocess
from huggingface_hub import snapshot_download, hf_hub_download

def sh(cmd): subprocess.check_call(cmd, shell=True)

# Replace the CPU onnxruntime wheel with the GPU build so ONNX-based models can run on CUDA.
sh("pip uninstall onnxruntime onnxruntime-gpu -y && pip install onnxruntime-gpu")

import os
import shutil

# Pre-seed torch hub's checkpoint cache with the bundled checkpoints so they
# are not re-downloaded at runtime.
src = "checkpoints"  # your source folder
dst = "/home/user/.cache/torch/hub/checkpoints"

# Create destination folder if it doesn't exist
os.makedirs(dst, exist_ok=True)

# Copy each item from src → dst
for item in os.listdir(src):
    s = os.path.join(src, item)
    d = os.path.join(dst, item)
    if os.path.isdir(s):
        # Copy directory
        shutil.copytree(s, d, dirs_exist_ok=True)
    else:
        # Copy file
        shutil.copy2(s, d)

print("✅ Done copying checkpoints!")

import spaces
import io
import torch
import inspect
import pyannote.audio.core.task as task_module
from pathlib import Path
from pydub import AudioSegment
import math

# Collect all classes from pyannote.audio.core.task
safe_globals = [torch.torch_version.TorchVersion]
for name, obj in inspect.getmembers(task_module):
    if inspect.isclass(obj):
        safe_globals.append(obj)

# Allow these classes to be used when unpickling weights with weights_only=True
torch.serialization.add_safe_globals(safe_globals)

from typing import List, Dict
import time
from time_util import timer
import os, pathlib, sys, ctypes
import uuid

# Preload cuDNN's CNN component so the GPU runtimes loaded below can resolve it
ctypes.CDLL("/usr/local/lib/python3.10/site-packages/nvidia/cudnn/lib/libcudnn_cnn.so.9")

# print(os.environ.get('LD_LIBRARY_PATH', ''))
import torch, ctranslate2, os
import numpy as np
from pydub import AudioSegment
from faster_whisper import WhisperModel
from pyannote.audio import Pipeline
from pyannote.audio.pipelines.utils.hook import ProgressHook
import gradio as gr
from pydub import AudioSegment
import srt
import io
from pydub import AudioSegment
import math
from datetime import timedelta
import torchaudio
import tigersound.look2hear.models

# @spaces.GPU()
# def print_ort():
#     import onnxruntime as ort
#     print(ort.get_available_providers())
# print_ort()

current_dir = os.path.dirname(os.path.abspath(__file__))
snapshot_download("IndexTeam/IndexTTS-2", local_dir=os.path.join(current_dir, "checkpoints"))

dnr_model = tigersound.look2hear.models.TIGERDNR.from_pretrained("JusperLee/TIGER-DnR").to("cuda").eval()

from indextts.infer_v2 import IndexTTS2

MODE = 'local'

tts = IndexTTS2(
    model_dir="./checkpoints",
    cfg_path=os.path.join("./checkpoints", "config.yaml"),
    use_fp16=True,
    use_deepspeed=False,
    use_cuda_kernel=False,
)

os.environ["PROCESSED_RESULTS"] = f"{os.getcwd()}/proprocess_results"

from lipsync import apply_lipsync

import logging
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("httpcore").setLevel(logging.WARNING)

def split_subtitles_max_duration(
    subtitles,
    max_seconds: float = 10.0,
    min_last_chunk_seconds: float = 1.0,
):
    """
    Take a list of srt.Subtitle and return a new list where
    no subtitle duration is longer than max_seconds, except that
    the *last* chunk is allowed to exceed max_seconds slightly
    if the leftover duration would otherwise be less than
    min_last_chunk_seconds.

    Text is split by words roughly evenly across the chunks.
    """
    max_td = timedelta(seconds=max_seconds)
    new_subs = []
    new_index = 1

    for sub in subtitles:
        start = sub.start
        end = sub.end
        duration = end - start
        total_secs = duration.total_seconds()

        # If already short enough, just copy it
        if total_secs <= max_seconds:
            new_subs.append(
                srt.Subtitle(
                    index=new_index,
                    start=start,
                    end=end,
                    content=sub.content,
                )
            )
            new_index += 1
            continue

        # Need to split this subtitle
        words = sub.content.split()
        if not words:
            # No text, skip
            continue

        # --- Determine number of chunks, avoiding a tiny last chunk ---
        base_chunks = int(total_secs // max_seconds)
        remainder = total_secs - base_chunks * max_seconds

        if base_chunks == 0:
            # total_secs > max_seconds due to earlier check, but just in case
            num_chunks = 1
        else:
            if remainder == 0:
                num_chunks = base_chunks
            elif remainder < min_last_chunk_seconds:
                # Don't create a tiny last chunk; merge its time into previous chunks
                num_chunks = base_chunks
            else:
                num_chunks = base_chunks + 1

        # Ensure at least one chunk
        num_chunks = max(1, num_chunks)

        # Words per chunk (roughly even)
        words_per_chunk = max(1, int(math.ceil(len(words) / num_chunks)))

        chunk_start = start
        word_idx = 0

        for chunk_idx in range(num_chunks):
            # Last chunk takes us all the way to the original end,
            # so it can be slightly > max_seconds if needed.
            if chunk_idx == num_chunks - 1:
                chunk_end = end
            else:
                chunk_end = min(end, chunk_start + max_td)

            if chunk_end <= chunk_start:
                break

            chunk_words = words[word_idx:word_idx + words_per_chunk]
            word_idx += words_per_chunk
            if not chunk_words:
                break

            new_subs.append(
                srt.Subtitle(
                    index=new_index,
                    start=chunk_start,
                    end=chunk_end,
                    content=" ".join(chunk_words),
                )
            )
            new_index += 1
            chunk_start = chunk_end

    return new_subs

def split_text_into_chunks(text, max_chars=400):
    """
    Rough splitter: breaks text into chunks <= max_chars,
    preferring to split at sentence boundaries, then spaces.
    """
    text = text.strip()
    chunks = []

    while len(text) > max_chars:
        # Try to split at the last sentence end before max_chars
        split_at = max(
            text.rfind(". ", 0, max_chars),
            text.rfind("! ", 0, max_chars),
            text.rfind("? ", 0, max_chars),
        )
        # If there was no sentence boundary, fall back to last space
        if split_at == -1:
            split_at = text.rfind(" ", 0, max_chars)
        # If still nothing, just hard cut
        if split_at == -1:
            split_at = max_chars

        chunk = text[:split_at + 1].strip()
        chunks.append(chunk)
        text = text[split_at + 1:].strip()

    if text:
        chunks.append(text)

    return chunks

def sh(cmd): subprocess.check_call(cmd, shell=True)

# sh("find / -name \"libcudnn*\" 2>/dev/null")

# --------------------
# CONFIG
# --------------------
MODEL_SIZE = "medium"        # e.g. "small", "medium", "large-v2"
MIN_SEGMENT_SECONDS = 0.5    # only transcribe segments longer than this

# If your pyannote pipeline needs a HF token, set it here or via env var:
# HUGGINGFACE_TOKEN = "hf_..."
HF_TOKEN = os.getenv("HF_TOKEN", None)
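
# NOTE: pyannote/speaker-diarization-3.1 below is a gated checkpoint; huggingface_hub
# reads the HF_TOKEN environment variable automatically (assumed to be set as a Space
# secret), which is why the token is not passed to from_pretrained() explicitly.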
# --------------------
# LOAD GLOBAL MODELS (ONCE)
# --------------------
device = "cuda" if torch.cuda.is_available() else "cpu"

print("Loading pyannote diarization model...")
diarization_pipeline = Pipeline.from_pretrained(
    "pyannote/speaker-diarization-3.1"
)

# --------------------
# HELPERS
# --------------------
def format_timestamp(ts: float) -> str:
    """Convert seconds to SRT timestamp format."""
    hrs = int(ts // 3600)
    mins = int((ts % 3600) // 60)
    secs = int(ts % 60)
    ms = int((ts - int(ts)) * 1000)
    return f"{hrs:02d}:{mins:02d}:{secs:02d},{ms:03d}"

def extract_audio_to_wav(input_video: str, output_dir: str):
    audio_file = os.path.join(output_dir, "audio_og.wav")
    background_file = os.path.join(output_dir, "background_og.wav")
    vocal_file = os.path.join(output_dir, "vocal_og.wav")
    effect_file = os.path.join(output_dir, "effect_og.wav")
    audio_16k_file = os.path.join(output_dir, "audio_16k.wav")

    video_path = input_video

    separator_dir = Path(os.path.join(output_dir, "separator_directory"))
    os.makedirs(separator_dir, exist_ok=True)

    # Extract raw audio
    cmd = [
        "ffmpeg",
        "-loglevel", "error",
        "-y",
        "-i", video_path,
        "-vn",
        "-acodec", "pcm_s16le",
        "-ar", "44100",
        "-ac", "2",
        audio_file
    ]
    subprocess.run(cmd, check=True)

    # Separate dialog / effect / music stems with the TIGER-DnR model
    audio, sr = torchaudio.load(audio_file)
    audio = audio.to("cuda")
    with torch.no_grad():
        dialog, effect, music = dnr_model(audio[None])
    torchaudio.save(vocal_file, dialog.cpu(), sr)
    torchaudio.save(effect_file, effect.cpu(), sr)
    torchaudio.save(background_file, music.cpu(), sr)

    # Convert vocals to 16k mono
    cmd = [
        "ffmpeg",
        "-loglevel", "error",
        "-y",
        "-i", vocal_file,
        "-ac", "1",
        "-ar", "16000",
        "-acodec", "pcm_s16le",
        audio_16k_file
    ]
    subprocess.run(cmd, check=True)

    return audio_file, effect_file, background_file, audio_16k_file, vocal_file

def diarize_audio(audio_path: str) -> List[Dict]:
    """Run pyannote diarization and return segments."""
    diarization_pipeline.to(torch.device(device))

    with ProgressHook() as hook:
        diarization_result = diarization_pipeline(audio_path, hook=hook)

    segments = []
    for segment, _, speaker in diarization_result.itertracks(yield_label=True):
        duration = segment.end - segment.start
        if duration >= MIN_SEGMENT_SECONDS:
            segments.append(
                {
                    "start": float(segment.start),
                    "end": float(segment.end),
                    "speaker": speaker,
                }
            )

    segments.sort(key=lambda x: x["start"])
    return segments

def chunk_to_float32(chunk: AudioSegment) -> np.ndarray:
    """Convert a pydub chunk to mono 16kHz float32 numpy array in [-1, 1]."""
    chunk = chunk.set_frame_rate(16000).set_channels(1)
    samples = np.array(chunk.get_array_of_samples())

    # Normalize based on sample width
    if chunk.sample_width == 2:      # 16-bit
        samples = samples.astype(np.float32) / 32768.0
    elif chunk.sample_width == 4:    # 32-bit
        samples = samples.astype(np.float32) / 2147483648.0
    else:
        samples = samples.astype(np.float32)

    return samples

def transcribe_segment(whisper_model, samples: np.ndarray) -> str:
    """Transcribe+translate a single segment with faster-whisper."""
    segment_text_parts = []

    segments, info = whisper_model.transcribe(
        samples,
        beam_size=1,
        vad_filter=False,                  # diarization already detected speech
        condition_on_previous_text=True,   # carry decoding context across sub-segments of this chunk
        task="translate",                  # translate to English
        word_timestamps=True,
    )

    for seg in segments:
        if seg.text:
            segment_text_parts.append(seg.text.strip())

    return " ".join(segment_text_parts)

def transcribe_segment_words(
    whisper_model,
    samples: np.ndarray,
    offset_sec: float,
    speaker: str | None = None,
):
    """
    Transcribe+translate a single diarization segment, returning a
    list of word dicts with absolute timestamps.
    """
    words_out = []

    segments, info = whisper_model.transcribe(
        samples,
        beam_size=1,
        vad_filter=False,                   # diarization already detected speech
        condition_on_previous_text=False,   # better for hard cuts / segments
        task="translate",
        word_timestamps=True,
    )

    for seg in segments:
        if not seg.words:
            continue
        for w in seg.words:
            words_out.append(
                {
                    "start": offset_sec + float(w.start),
                    "end": offset_sec + float(w.end),
                    "text": w.word,
                    "speaker": speaker,
                }
            )

    return words_out

def words_to_subtitles(words, max_seconds: float = 10.0):
    """
    Group word-level timings into SRT subtitles, each up to max_seconds long,
    cutting ONLY at word boundaries, AND never mixing speakers in the same subtitle.

    Whenever the speaker changes, we close the current subtitle and start a new one.

    Expects each word dict to have:
      - "start" (float, seconds)
      - "end" (float, seconds)
      - "text" (str)
      - "speaker" (str or None)
    """
    # sort just in case
    words = sorted(words, key=lambda w: w["start"])

    subtitles = []
    current_words = []
    current_start = None
    current_speaker = None
    index = 1

    for w in words:
        w_start = w["start"]
        w_end = w["end"]
        w_speaker = w.get("speaker")

        if current_start is None:
            # start first subtitle
            current_start = w_start
            current_words = [w]
            current_speaker = w_speaker
            continue

        speaker_changed = (w_speaker != current_speaker)
        duration_if_added = w_end - current_start
        exceeds_max = duration_if_added > max_seconds

        # If adding this word would:
        #   - exceed max_seconds, OR
        #   - cross into a different speaker,
        # then we close the current subtitle and start a new one.
        if (speaker_changed or exceeds_max) and current_words:
            text = " ".join(x["text"] for x in current_words).strip()
            sub_start = current_start
            sub_end = current_words[-1]["end"]

            subtitles.append(
                srt.Subtitle(
                    index=index,
                    start=timedelta(seconds=sub_start),
                    end=timedelta(seconds=sub_end),
                    content=text,
                )
            )
            index += 1

            # start new subtitle from this word
            current_start = w_start
            current_words = [w]
            current_speaker = w_speaker
        else:
            current_words.append(w)

    # flush last subtitle
    if current_words:
        text = " ".join(x["text"] for x in current_words).strip()
        sub_start = current_start
        sub_end = current_words[-1]["end"]
        subtitles.append(
            srt.Subtitle(
                index=index,
                start=timedelta(seconds=sub_start),
                end=timedelta(seconds=sub_end),
                content=text,
            )
        )

    return subtitles

def build_srt(segments: List[Dict], audio_wav: str, out_srt_path: str):
    """
    Generate SRT file from diarized segments and audio,
    using word-level timestamps and grouping into ~10s subtitles.
    """
    audio = AudioSegment.from_file(audio_wav)

    print(f"Loading faster-whisper model ({MODEL_SIZE})...")
    whisper_model = WhisperModel(
        MODEL_SIZE,
        device="cuda",
        compute_type="float16",
    )

    all_words = []

    for i, seg in enumerate(segments, start=1):
        start_sec = seg["start"]
        end_sec = seg["end"]
        speaker = seg["speaker"]

        start_ms = int(start_sec * 1000)
        end_ms = int(end_sec * 1000)
        chunk = audio[start_ms:end_ms]
        samples = chunk_to_float32(chunk)

        # get words for this diar segment, with absolute times
        seg_words = transcribe_segment_words(
            whisper_model,
            samples,
            offset_sec=start_sec,
            speaker=speaker,
        )
        all_words.extend(seg_words)

        print(f"Diar segment {i} ({speaker}): {len(seg_words)} words")

    # group words into ≤10s subtitles, word aligned
    subtitles = words_to_subtitles(all_words, max_seconds=10.0)

    # write SRT
    with open(out_srt_path, "w", encoding="utf-8") as f:
        f.write(srt.compose(subtitles))

def translate_video(video_file, duration, session_id=None, progress=gr.Progress(track_tqdm=True)):
    if video_file is None:
        raise gr.Error("Please upload a clip.")
    return process_video(video_file, False, duration, session_id, progress)

def translate_lipsync_video(video_file, duration, session_id=None, progress=gr.Progress(track_tqdm=True)):
    if video_file is None:
        raise gr.Error("Please upload a clip.")
    return process_video(video_file, True, duration, session_id, progress)

def run_example(video_file, allow_lipsync, duration, session_id=None, progress=gr.Progress(track_tqdm=True)):
    with timer("processed"):
        result = process_video(video_file, allow_lipsync, duration, session_id, progress)
    return result
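
# Estimated GPU time (in seconds) for a run, keyed off the requested clip duration;
# lipsync runs get a much larger budget than audio-only translation. Presumably
# intended as the ZeroGPU duration estimator for process_video.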
def get_duration(video_file, allow_lipsync, duration, session_id, progress):
    if allow_lipsync:
        if duration <= 3:
            return 30
        elif duration <= 5:
            return 60
        elif duration <= 10:
            return 90
        elif duration <= 20:
            return 120
        elif duration <= 30:
            return 150
    else:
        return 40
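
# Assumed ZeroGPU wiring: allocate GPU time for process_video via the get_duration
# estimator above (mirrors the commented-out @spaces.GPU() example earlier in this file).
@spaces.GPU(duration=get_duration)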
def process_video(video_file, allow_lipsync, duration, session_id=None, progress=gr.Progress(track_tqdm=True)):
    """
    Gradio callback:
      - video_file: temp file object/path from Gradio
      - returns the output video, the SRT file (for download) and the final 16k audio
    """
    import onnxruntime as ort

    if session_id is None:
        session_id = uuid.uuid4().hex

    output_dir = os.path.join(os.environ["PROCESSED_RESULTS"], session_id)
    os.makedirs(output_dir, exist_ok=True)

    # Gradio's File/Video component gives dict or str depending on version
    if isinstance(video_file, dict):
        video_path = video_file.get("name") or video_file.get("path")
    else:
        video_path = video_file

    if video_path is None or not os.path.exists(video_path):
        raise gr.Error("Could not read uploaded video file.")

    # Create temp directory to hold WAV + SRT
    srt_path = os.path.join(output_dir, "diarized_translated.srt")

    src_video_path = video_path
    cropped_video_path = os.path.join(output_dir, "input_30s.mp4")

    duration_s = int(duration)
    print(f"duration_s:{duration_s}")

    # Trim the clip to the requested duration
    cmd = [
        "ffmpeg",
        "-y",
        "-i", src_video_path,
        "-t", f"{duration_s}",
        "-c", "copy",   # stream copy, no re-encode
        cropped_video_path,
    ]
    subprocess.run(cmd, check=True)
    video_path = cropped_video_path

    # 1. Extract audio
    audio_wav, effect_wav, background_wav, audio_16k_wav, vocal_wav = extract_audio_to_wav(video_path, output_dir)

    # 2. Diarization
    segments = diarize_audio(audio_16k_wav)
    if not segments:
        raise gr.Error("No valid speech segments found for diarization.")

    # 3. Build SRT from diarized segments + whisper
    with timer("Generating srt"):
        build_srt(segments, audio_16k_wav, srt_path)

    # ---- ORIGINAL SRT (used for TTS) ----
    with open(srt_path, "r", encoding="utf-8") as f:
        srt_data = f.read()
    subtitles = list(srt.parse(srt_data))

    # Keep this list as-is for TTS timing
    tts_subtitles = subtitles

    # ---- CREATE 10s-MAX SRT FOR DOWNLOAD ----
    max10_subtitles = tts_subtitles
    # max10_subtitles = split_subtitles_max_duration(subtitles, max_seconds=10.0)
    tts_subtitles = max10_subtitles

    srt_10s_path = os.path.join(output_dir, "diarized_translated_max10s.srt")
    with open(srt_10s_path, "w", encoding="utf-8") as f:
        f.write(srt.compose(max10_subtitles))

    # ---- TTS USING ORIGINAL SRT ----
    last_end_seconds = tts_subtitles[-1].end.total_seconds()
    total_ms = int((last_end_seconds + 1) * 1000)
    timeline = AudioSegment.silent(duration=total_ms)

    original_audio = AudioSegment.from_file(audio_wav)

    MAX_BATCH_MS = 300_000  # ~5 minutes of target subtitle duration per batch

    with timer("Generating speech"):
        num_subs = len(tts_subtitles)
        idx = 0

        while idx < num_subs:
            spk_prompts = []       # paths to src_prompt_*.wav
            texts = []             # subtitle texts for this batch
            out_paths = []         # where IndexTTS2 will save generated wavs
            starts_ms = []         # for overlaying later
            target_ms_list = []    # per-subtitle target durations

            batch_ms_sum = 0
            batch_start = idx

            # ---- fill one batch until we hit ~MAX_BATCH_MS ----
            while idx < num_subs:
                sub = tts_subtitles[idx]
                start_ms = int(sub.start.total_seconds() * 1000)
                end_ms = int(sub.end.total_seconds() * 1000)
                target_ms = max(end_ms - start_ms, 0)

                # If adding this subtitle would exceed the limit and we already
                # have something in the batch, stop and process the current batch.
                if batch_ms_sum + target_ms > MAX_BATCH_MS and len(target_ms_list) > 0:
                    break

                global_idx = idx

                # 1) prompt audio for this subtitle
                src_chunk = original_audio[start_ms:end_ms]
                src_prompt_path = os.path.join(output_dir, f"src_prompt_{global_idx}.wav")
                src_chunk.export(src_prompt_path, format="wav")

                # 2) text + output path
                text = sub.content.replace("\n", " ")
                out_path = os.path.join(output_dir, f"gen_{global_idx}.wav")

                spk_prompts.append(src_prompt_path)
                texts.append(text)
                out_paths.append(out_path)
                starts_ms.append(start_ms)
                target_ms_list.append(target_ms)

                batch_ms_sum += target_ms
                idx += 1

            print(f"batch from {batch_start} to {idx - 1}, batch_ms_sum: {batch_ms_sum}")

            # --- call batched TTS once for this batch ---
            do_sample = True
            top_p = 0.8
            top_k = 30
            temperature = 0.8
            length_penalty = 0.0
            num_beams = 3
            repetition_penalty = 10.0
            max_mel_tokens = 1500

            # target_length_ms gets the per-subtitle durations below; compute an
            # aggregate here instead (e.g. avg or max) if your API expects a single value.
            tts_outputs = tts.infer_batch(
                spk_audio_prompts=spk_prompts,
                texts=texts,
                output_paths=out_paths,
                emo_audio_prompts=None,
                emo_alpha=1.0,
                emo_vectors=None,
                use_emo_text=False,
                emo_texts=None,
                use_random=False,
                interval_silence=200,
                verbose=False,
                max_text_tokens_per_segment=120,
                speed=1.0,
                target_length_ms=target_ms_list,
                do_sample=do_sample,
                top_p=top_p,
                top_k=top_k,
                temperature=temperature,
                length_penalty=length_penalty,
                num_beams=num_beams,
                repetition_penalty=repetition_penalty,
                max_mel_tokens=max_mel_tokens,
            )

            # --- read generated wavs and overlay them ---
            for local_idx, out_path in enumerate(tts_outputs):
                start_ms = starts_ms[local_idx]
                seg = AudioSegment.from_file(out_path, format="wav")
                seg = seg - 2
                timeline = timeline.overlay(seg, position=start_ms)

                # cleanup
                os.remove(out_path)
                os.remove(spk_prompts[local_idx])

    # -------------------------------------------------------
    # Bring back original dialog in the *gaps* (grunts, etc.)
    # -------------------------------------------------------

    # Load separated dialog track
    dialog = AudioSegment.from_file(vocal_wav)

    # Make sure it matches the TTS timeline parameters
    dialog = dialog.set_frame_rate(timeline.frame_rate).set_channels(timeline.channels)

    total_len_ms = len(timeline)

    # Collect speech regions from subtitles (approximate "where TTS will speak")
    speech_regions = []
    for sub in tts_subtitles:
        start_ms = int(sub.start.total_seconds() * 1000)
        end_ms = int(sub.end.total_seconds() * 1000)

        # clamp to track length
        start_ms = max(0, min(start_ms, total_len_ms))
        end_ms = max(0, min(end_ms, total_len_ms))

        if end_ms > start_ms:
            speech_regions.append((start_ms, end_ms))

    # Merge overlapping/adjacent regions
    speech_regions.sort()
    merged = []
    for s, e in speech_regions:
        if not merged:
            merged.append([s, e])
        else:
            last_s, last_e = merged[-1]
            if s <= last_e:  # overlap or touch
                merged[-1][1] = max(last_e, e)
            else:
                merged.append([s, e])

    # Compute the complement: regions where there's NO subtitle (gaps)
    gaps = []
    cursor = 0
    for s, e in merged:
        if cursor < s:
            gaps.append((cursor, s))
        cursor = max(cursor, e)
    if cursor < total_len_ms:
        gaps.append((cursor, total_len_ms))

    # Overlay original dialog only in those gaps
    MIN_GAP_MS = 10  # ignore ultra-tiny gaps
    for g_start, g_end in gaps:
        if g_end - g_start < MIN_GAP_MS:
            continue

        # Extract that piece of the original dialog
        original_chunk = dialog[g_start:g_end]
        original_chunk = original_chunk + 6

        timeline = timeline.overlay(original_chunk, position=g_start)

    video_in = video_path
    audio_in = output_dir + "/final_output.wav"
    audio_16k_in = output_dir + "/final_16k_output.wav"

    # ---------- 5. Mix background + new TTS vocal ----------
    if background_wav is not None:
        eff = AudioSegment.from_file(effect_wav)
        bg = AudioSegment.from_file(background_wav)

        # If background is shorter than the TTS timeline, loop it
        if len(eff) < len(timeline):
            loops = math.ceil(len(timeline) / len(eff))
            eff = eff * loops
        if len(bg) < len(timeline):
            loops = math.ceil(len(timeline) / len(bg))
            bg = bg * loops

        # Cut or match to TTS length
        eff = eff[:len(timeline)]
        bg = bg[:len(timeline)]

        bg = bg + 6
        eff = eff + 6

        eff_timeline = eff.overlay(timeline)
        final_audio = bg.overlay(eff_timeline)
        final_16k_audio = timeline.set_frame_rate(16000).set_channels(1)
    else:
        # Fallback: no background found, just use TTS
        final_audio = timeline
        final_16k_audio = timeline

    final_audio.export(audio_in, format="wav")
    final_16k_audio.export(audio_16k_in, format="wav")
    print(f"Done! Saved to {audio_in}")

    lipsynced_video = output_dir + "/output_with_lipsync_16k.mp4"

    if allow_lipsync:
        apply_lipsync(video_in, audio_16k_in, lipsynced_video)
    else:
        lipsynced_video = video_in

    video_out = output_dir + "/output_with_lipsync.mp4"

    cmd = [
        "ffmpeg",
        "-loglevel", "error",
        "-y",                    # overwrite output file
        "-i", lipsynced_video,   # input video
        "-i", audio_in,          # new audio
        "-c:v", "copy",          # do not re-encode video
        "-map", "0:v:0",         # take video from input 0
        "-map", "1:a:0",         # take audio from input 1
        "-shortest",             # stop when either track ends
        video_out,
    ]
    subprocess.run(cmd, check=True)

    # IMPORTANT: return the 10s-max SRT for download
    return video_out, srt_10s_path, audio_16k_in

css = """
#col-container {
    margin: 0 auto;
    max-width: 1600px;
}

#modal-container {
    width: 100vw;    /* Take full viewport width */
    height: 100vh;   /* Take full viewport height (optional) */
    display: flex;
    justify-content: center;   /* Center content horizontally */
    align-items: center;       /* Center content vertically if desired */
}

#modal-content {
    width: 100%;
    max-width: 700px;   /* Limit content width */
    margin: 0 auto;
    border-radius: 8px;
    padding: 1.5rem;
}

#step-column {
    padding: 10px;
    border-radius: 8px;
    box-shadow: var(--card-shadow);
    margin: 10px;
}

#col-showcase {
    margin: 0 auto;
    max-width: 1100px;
}

.button-gradient {
    background: linear-gradient(45deg, rgb(255, 65, 108), rgb(255, 75, 43), rgb(255, 155, 0), rgb(255, 65, 108)) 0% 0% / 400% 400%;
    border: none;
    padding: 14px 28px;
    font-size: 16px;
    font-weight: bold;
    color: white;
    border-radius: 10px;
    cursor: pointer;
    transition: 0.3s ease-in-out;
    animation: 2s linear 0s infinite normal none running gradientAnimation;
    box-shadow: rgba(255, 65, 108, 0.6) 0px 4px 10px;
}

.toggle-container {
    display: inline-flex;
    background-color: #ffd6ff;   /* light pink background */
    border-radius: 9999px;
    padding: 4px;
    position: relative;
    width: fit-content;
    font-family: sans-serif;
}

.toggle-container input[type="radio"] {
    display: none;
}

.toggle-container label {
    position: relative;
    z-index: 2;
    flex: 1;
    text-align: center;
    font-weight: 700;
    color: #4b2ab5;   /* dark purple text for unselected */
    padding: 6px 22px;
    border-radius: 9999px;
    cursor: pointer;
    transition: color 0.25s ease;
}

/* Moving highlight */
.toggle-highlight {
    position: absolute;
    top: 4px;
    left: 4px;
    width: calc(50% - 4px);
    height: calc(100% - 8px);
    background-color: #4b2ab5;   /* dark purple background */
    border-radius: 9999px;
    transition: transform 0.25s ease;
    z-index: 1;
}

/* When "True" is checked */
#true:checked ~ label[for="true"] {
    color: #ffd6ff;   /* light pink text */
}

/* When "False" is checked */
#false:checked ~ label[for="false"] {
    color: #ffd6ff;   /* light pink text */
}

/* Move highlight to right side when False is checked */
#false:checked ~ .toggle-highlight {
    transform: translateX(100%);
}
"""

def cleanup(request: gr.Request):
    sid = request.session_hash
    if sid:
        print(f"{sid} left")
        d1 = os.path.join(os.environ["PROCESSED_RESULTS"], sid)
        shutil.rmtree(d1, ignore_errors=True)

def start_session(request: gr.Request):
    return request.session_hash

with gr.Blocks(css=css) as demo:
    session_state = gr.State()
    demo.load(start_session, outputs=[session_state])

    with gr.Column(elem_id="col-container"):
        gr.HTML(
            """
            <div style="text-align: center;">
                <p style="font-size:16px; display: inline; margin: 0;">
                    Translate and lipsync your clips from any language to English
                </p>
            </div>
            <div style="text-align: center;">
                <p style="font-size:16px; display: inline; margin: 0;">
                    <strong>OutofLipSync</strong>
                </p>
                <p style="font-size:16px; display: inline; margin: 0;">
                    -- HF Space By:
                </p>
                <a href="https://huggingface.co/alexnasa" style="display: inline-block; vertical-align: middle; margin-left: 0.5em;">
                    <img src="https://img.shields.io/badge/🤗-Follow Me-yellow.svg">
                </a>
                <a href="https://www.buymeacoffee.com/outofai" style="display: inline-block; vertical-align: middle; margin-left: 0.5em;" target="_blank"><img src="https://img.shields.io/badge/-buy_me_a%C2%A0coffee-red?logo=buy-me-a-coffee" alt="Buy Me A Coffee"></a>
            </div>
            """
        )

        with gr.Row():
            with gr.Column(elem_id="step-column"):
                gr.HTML("""
                    <div>
                        <span style="font-size: 24px;">1. Upload or Record a Video</span><br>
                    </div>
                """)
                video_input = gr.Video(
                    label="OG Clip",
                    height=512
                )
                duration = gr.Slider(5, 30, 10, step=1, label="Duration(s)")

                uncached_examples = gr.Examples(
                    examples=[
                        ["assets/popup-2.mp4"],
                        ["assets/sofia-esp.mp4"],
                        ["assets/alba-port.mp4"],
                        ["assets/lena-de.mp4"],
                    ],
                    inputs=video_input,
                )

            with gr.Column(elem_id="step-column"):
                gr.HTML("""
                    <div>
                        <span style="font-size: 24px;">2. Translate + 👄</span><br>
                    </div>
                """)
                video_output = gr.Video(label="Output", height=512)
                lipsync = gr.Checkbox(label="Lipsync", value=False, visible=False)
                translate_btn = gr.Button("🤹‍♀️ Translate")
                translate_lipsync_btn = gr.Button("🤹‍♀️ Translate + 👄 Lipsync", variant='primary', elem_classes="button-gradient")

            with gr.Column(elem_id="step-column"):
                gr.HTML("""
                    <div>
                        <span style="font-size: 24px;">Lipsynced Examples</span><br>
                    </div>
                """)
                vocal_16k_output = gr.File(label="Vocal 16k", visible=False)
                srt_output = gr.File(label="Download translated diarized SRT", visible=False)

                cached_examples = gr.Examples(
                    examples=[
                        ["assets/monica-ita.mp4", True, 5],
                        ["assets/elena-es.mp4", True, 10],
                        ["assets/ana-es.mp4", True, 10],
                        ["assets/spanish-2.mp4", True, 5],
                        ["assets/italian.mp4", True, 5],
                        ["assets/alica-por-2.mp4", True, 10],
                    ],
                    fn=run_example,
                    inputs=[video_input, lipsync, duration],
                    outputs=[video_output, srt_output, vocal_16k_output],
                    cache_examples=True
                )

    translate_btn.click(
        fn=translate_video,
        inputs=[video_input, duration, session_state],
        outputs=[video_output, srt_output, vocal_16k_output],
    )

    translate_lipsync_btn.click(
        fn=translate_lipsync_video,
        inputs=[video_input, duration, session_state],
        outputs=[video_output, srt_output, vocal_16k_output],
    )

if __name__ == "__main__":
    demo.unload(cleanup)
    demo.queue()
    demo.launch(ssr_mode=False)