import os import json import shutil import librosa import soundfile import numpy as np import gradio as gr from UVR_interface import root, UVRInterface, VR_MODELS_DIR, MDX_MODELS_DIR, DEMUCS_MODELS_DIR from gui_data.constants import * from typing import List, Dict, Callable, Union import wget class UVRWebUI: def __init__(self, uvr: UVRInterface, online_data_path: str) -> None: self.uvr = uvr self.models_url = self.get_models_url(online_data_path) self.define_layout() self.input_temp_dir = '__temp' self.export_path = 'out' if not os.path.exists(self.input_temp_dir): os.makedirs(self.input_temp_dir, exist_ok=True) if not os.path.exists(self.export_path): os.makedirs(self.export_path, exist_ok=True) def get_models_url(self, models_info_path: str) -> Dict[str, Dict]: with open(models_info_path, 'r') as f: online_data = json.loads(f.read()) models_url = {} for arch, download_list_key in zip([VR_ARCH_TYPE, MDX_ARCH_TYPE], ['vr_download_list', 'mdx_download_list']): models_url[arch] = {model_name: NORMAL_REPO + model_filename_part for model_name, model_filename_part in online_data[download_list_key].items()} models_url[DEMUCS_ARCH_TYPE] = online_data['demucs_download_list'] return models_url def get_local_models(self, arch: str) -> List[str]: model_config = {VR_ARCH_TYPE: (VR_MODELS_DIR, '.pth'), MDX_ARCH_TYPE: (MDX_MODELS_DIR, '.onnx'), DEMUCS_ARCH_TYPE: (DEMUCS_MODELS_DIR, '.yaml')} try: model_dir, suffix = model_config[arch] if not os.path.exists(model_dir): os.makedirs(model_dir, exist_ok=True) return [] except KeyError: print(f'Error: Unknown arch type: {arch} in get_local_models') return [] if not os.path.exists(model_dir): print(f'Warning: Model directory {model_dir} still does not exist for arch {arch}.') return [] return sorted([os.path.splitext(f)[0] for f in os.listdir(model_dir) if f.endswith(suffix) and os.path.isfile(os.path.join(model_dir, f))]) def set_arch_setting_value(self, arch: str, setting1, setting2): if arch == VR_ARCH_TYPE: root.window_size_var.set(setting1) root.aggression_setting_var.set(setting2) elif arch == MDX_ARCH_TYPE: root.mdx_batch_size_var.set(setting1) root.compensate_var.set(setting2) elif arch == DEMUCS_ARCH_TYPE: pass def arch_select_update(self, arch: str) -> List[Dict]: choices = self.get_local_models(arch) if not choices: print(f'Warning: No local models found for {arch}. Dropdown will be empty.') model_update_label = CHOOSE_MODEL if arch == VR_ARCH_TYPE: model_update_label = SELECT_VR_MODEL_MAIN_LABEL setting1_update = self.arch_setting1.update(choices=VR_WINDOW, label=WINDOW_SIZE_MAIN_LABEL, value=root.window_size_var.get()) setting2_update = self.arch_setting2.update(choices=VR_AGGRESSION, label=AGGRESSION_SETTING_MAIN_LABEL, value=root.aggression_setting_var.get()) elif arch == MDX_ARCH_TYPE: model_update_label = CHOOSE_MDX_MODEL_MAIN_LABEL setting1_update = self.arch_setting1.update(choices=BATCH_SIZE, label=BATCHES_MDX_MAIN_LABEL, value=root.mdx_batch_size_var.get()) setting2_update = self.arch_setting2.update(choices=VOL_COMPENSATION, label=VOL_COMP_MDX_MAIN_LABEL, value=root.compensate_var.get()) elif arch == DEMUCS_ARCH_TYPE: model_update_label = CHOOSE_DEMUCS_MODEL_MAIN_LABEL setting1_update = self.arch_setting1.update(choices=[], label='Demucs Setting 1', value=None, visible=False) setting2_update = self.arch_setting2.update(choices=[], label='Demucs Setting 2', value=None, visible=False) else: gr.Error(f'Unknown arch type: {arch}') model_update = self.model_choice.update(choices=[], value=CHOOSE_MODEL, label='Error: Unknown Arch') setting1_update = self.arch_setting1.update(choices=[], value=None, label='Setting 1') setting2_update = self.arch_setting2.update(choices=[], value=None, label='Setting 2') return [model_update, setting1_update, setting2_update] model_update = self.model_choice.update(choices=choices, value=CHOOSE_MODEL, label=model_update_label) return [model_update, setting1_update, setting2_update] def model_select_update(self, arch: str, model_name: str) -> List[Union[str, Dict, None]]: if model_name == CHOOSE_MODEL or model_name is None: return [self.primary_stem_only.update(label=f'{PRIMARY_STEM} only'), self.secondary_stem_only.update(label=f'{SECONDARY_STEM} only'), self.primary_stem_out.update(label=f'Output {PRIMARY_STEM}'), self.secondary_stem_out.update(label=f'Output {SECONDARY_STEM}')] model_data_list = self.uvr.assemble_model_data(model_name, arch) if not model_data_list: gr.Error(f'Cannot get model data for model {model_name}, arch {arch}. Model list empty.') return [None for _ in range(4)] model = model_data_list[0] if not model.model_status: gr.Error(f'Cannot get model data, model hash = {model.model_hash}') return [None for _ in range(4)] stem1_check_update = self.primary_stem_only.update(label=f'{model.primary_stem} Only') stem2_check_update = self.secondary_stem_only.update(label=f'{model.secondary_stem} Only') stem1_out_update = self.primary_stem_out.update(label=f'Output {model.primary_stem}') stem2_out_update = self.secondary_stem_out.update(label=f'Output {model.secondary_stem}') return [stem1_check_update, stem2_check_update, stem1_out_update, stem2_out_update] def checkbox_set_root_value(self, checkbox: gr.Checkbox, root_attr: str): checkbox.change(lambda value: root.__getattribute__(root_attr).set(value), inputs=checkbox) def set_checkboxes_exclusive(self, checkboxes: List[gr.Checkbox], pure_callbacks: List[Callable], exclusive_value=True): def exclusive_onchange(i, callback_i): def new_onchange(*check_values): current_values = [cb.value for cb in checkboxes] if current_values[i] == exclusive_value: return_values = [] for j, value_j in enumerate(current_values): if j != i and value_j == exclusive_value: return_values.append(not exclusive_value) else: return_values.append(current_values[j]) return_values[i] = exclusive_value else: return_values = current_values for cb_idx, final_val in enumerate(return_values): pure_callbacks[cb_idx](final_val) return tuple(return_values) return new_onchange for i, (checkbox, callback) in enumerate(zip(checkboxes, pure_callbacks)): def create_exclusive_handler(changed_idx, all_checkboxes, all_callbacks): def handler(is_checked): outputs = [] all_callbacks[changed_idx](is_checked) for k_idx, cb_k in enumerate(all_checkboxes): if k_idx == changed_idx: outputs.append(is_checked) elif is_checked: all_callbacks[k_idx](False) outputs.append(False) else: outputs.append(gr.update()) return tuple(outputs) return handler checkbox.change(create_exclusive_handler(i, checkboxes, pure_callbacks), inputs=checkbox, outputs=checkboxes) def process(self, input_audio, input_filename, model_name, arch, setting1, setting2, progress=gr.Progress(track_tqdm=True)): if input_audio is None: return (None, None, 'Error: No input audio provided.') if model_name == CHOOSE_MODEL or model_name is None: return (None, None, 'Error: Please select a model.') def set_progress_func(step, inference_iterations=0): pass sampling_rate, audio_data = input_audio if np.issubdtype(audio_data.dtype, np.integer): audio_data = (audio_data / np.iinfo(audio_data.dtype).max).astype(np.float32) elif not np.issubdtype(audio_data.dtype, np.floating): return (None, None, f'Error: Unsupported audio data type {audio_data.dtype}') if len(audio_data.shape) > 1 and audio_data.shape[0] > 5: audio_data = audio_data.T if len(audio_data.shape) > 1: audio_data = librosa.to_mono(audio_data) if not input_filename: input_filename = 'audio_input.wav' elif not input_filename.lower().endswith(('.wav', '.mp3', '.flac')): input_filename += '.wav' input_path = os.path.join(self.input_temp_dir, os.path.basename(input_filename)) try: soundfile.write(input_path, audio_data, sampling_rate, format='wav') except Exception as e: return (None, None, f'Error writing temporary input file: {e}') self.set_arch_setting_value(arch, setting1, setting2) separator = self.uvr.process(model_name=model_name, arch_type=arch, audio_file=input_path, export_path=self.export_path, is_model_sample_mode=root.model_sample_mode_var.get(), set_progress_func=set_progress_func) if separator is None: if os.path.exists(input_path): os.remove(input_path) return (None, None, 'Error during processing. Separator object is None.') primary_audio_out = None secondary_audio_out = None msg = '' if separator.export_path and separator.audio_file_base and separator.primary_stem: if not separator.is_secondary_stem_only: primary_stem_path = os.path.join(separator.export_path, f'{separator.audio_file_base}_({separator.primary_stem}).wav') if os.path.exists(primary_stem_path): audio_p, rate_p = soundfile.read(primary_stem_path) primary_audio_out = (rate_p, audio_p) msg += f'{separator.primary_stem} saved at {primary_stem_path}\n' else: msg += f'Error: Primary stem file not found at {primary_stem_path}\n' else: msg += 'Error: Missing data in separator object for primary stem.\n' if separator.export_path and separator.audio_file_base and separator.secondary_stem: if not separator.is_primary_stem_only: secondary_stem_path = os.path.join(separator.export_path, f'{separator.audio_file_base}_({separator.secondary_stem}).wav') if os.path.exists(secondary_stem_path): audio_s, rate_s = soundfile.read(secondary_stem_path) secondary_audio_out = (rate_s, audio_s) msg += f'{separator.secondary_stem} saved at {secondary_stem_path}\n' else: msg += f'Error: Secondary stem file not found at {secondary_stem_path}\n' else: msg += 'Error: Missing data in separator object for secondary stem.\n' if os.path.exists(input_path): os.remove(input_path) return (primary_audio_out, secondary_audio_out, msg.strip()) def define_layout(self): with gr.Blocks() as app: self.app = app gr.HTML('