import os
import torch
import gradio as gr
from abc import ABC, abstractmethod
from typing import List
from datetime import datetime
from modules.whisper_parameter import *
from modules.subtitle_manager import *


class TranslationBase(ABC):
    def __init__(self,
                 model_dir: str,
                 output_dir: str):
        super().__init__()
        self.model = None
        self.model_dir = model_dir
        self.output_dir = output_dir
        os.makedirs(self.model_dir, exist_ok=True)
        os.makedirs(self.output_dir, exist_ok=True)
        self.current_model_size = None
        self.device = self.get_device()

    @abstractmethod
    def translate(self,
                  text: str
                  ):
        pass

    @abstractmethod
    def update_model(self,
                     model_size: str,
                     src_lang: str,
                     tgt_lang: str,
                     progress: gr.Progress
                     ):
        pass

    def translate_file(self,
                       fileobjs: list,
                       model_size: str,
                       src_lang: str,
                       tgt_lang: str,
                       add_timestamp: bool,
                       progress=gr.Progress()) -> list:
        """
        Translate subtitle files from the source language to the target language

        Parameters
        ----------
        fileobjs: list
            List of files to translate from gr.Files()
        model_size: str
            Whisper model size from gr.Dropdown()
        src_lang: str
            Source language of the files to translate from gr.Dropdown()
        tgt_lang: str
            Target language of the files to translate from gr.Dropdown()
        add_timestamp: bool
            Boolean value from gr.Checkbox() that determines whether to add a timestamp to the end of the filename.
        progress: gr.Progress
            Indicator to show progress directly in Gradio. This relies on a forked version of whisper;
            for more info see https://github.com/jhj0517/jhj0517-whisper/tree/add-progress-callback
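
        Notes
        ----------
        Illustrative wiring from a Gradio app; the component and variable
        names below are hypothetical, not taken from this project:
            translate_btn.click(fn=translator.translate_file,
                                inputs=[input_files, model_dropdown, src_dropdown,
                                        tgt_dropdown, timestamp_checkbox],
                                outputs=[result_textbox, result_files])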

        Returns
        ----------
        A List of
            String to return to gr.Textbox()
            Files to return to gr.Files()
        """
        try:
            self.update_model(model_size=model_size,
                              src_lang=src_lang,
                              tgt_lang=tgt_lang,
                              progress=progress)

            files_info = {}
            output_paths = []
            for fileobj in fileobjs:
                file_path = fileobj.name
                file_name, file_ext = os.path.splitext(os.path.basename(fileobj.name))

                if file_ext == ".srt":
                    parsed_dicts = parse_srt(file_path=file_path)
                    total_progress = len(parsed_dicts)
                    for index, dic in enumerate(parsed_dicts):
                        progress(index / total_progress, desc="Translating..")
                        translated_text = self.translate(dic["sentence"])
                        dic["sentence"] = translated_text
                    subtitle = get_serialized_srt(parsed_dicts)

                elif file_ext == ".vtt":
                    parsed_dicts = parse_vtt(file_path=file_path)
                    total_progress = len(parsed_dicts)
                    for index, dic in enumerate(parsed_dicts):
                        progress(index / total_progress, desc="Translating..")
                        translated_text = self.translate(dic["sentence"])
                        dic["sentence"] = translated_text
                    subtitle = get_serialized_vtt(parsed_dicts)

                else:
                    # Skip unsupported formats instead of failing on an
                    # unbound `subtitle` below.
                    continue

                # Both formats share the same output naming scheme under
                # self.output_dir/translations.
                timestamp = datetime.now().strftime("%m%d%H%M%S")
                if add_timestamp:
                    output_path = os.path.join(self.output_dir, "translations", f"{file_name}-{timestamp}{file_ext}")
                else:
                    output_path = os.path.join(self.output_dir, "translations", f"{file_name}{file_ext}")
                os.makedirs(os.path.dirname(output_path), exist_ok=True)

                write_file(subtitle, output_path)
                files_info[file_name] = subtitle
                output_paths.append(output_path)

            total_result = ''
            for file_name, subtitle in files_info.items():
                total_result += '------------------------------------\n'
                total_result += f'{file_name}\n\n'
                total_result += f'{subtitle}'

            gr_str = f"Done! Subtitles are in the {os.path.join(self.output_dir, 'translations')} folder.\n\n{total_result}"
            # Return every output path so gr.Files() gets all translated
            # files, not just the last one.
            return [gr_str, output_paths]
        except Exception as e:
            print(f"Error: {str(e)}")
        finally:
            self.release_cuda_memory()
            self.remove_input_files([fileobj.name for fileobj in fileobjs])

    @staticmethod
    def get_device():
        if torch.cuda.is_available():
            return "cuda"
        elif torch.backends.mps.is_available():
            return "mps"
        else:
            return "cpu"

    @staticmethod
    def release_cuda_memory():
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
            torch.cuda.reset_max_memory_allocated()

    @staticmethod
    def remove_input_files(file_paths: List[str]):
        if not file_paths:
            return

        for file_path in file_paths:
            if file_path and os.path.exists(file_path):
                os.remove(file_path)
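

# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the original module): a minimal concrete
# subclass showing how the abstract hooks above are meant to be filled in.
# `EchoTranslation` and its pass-through "model" are hypothetical stand-ins;
# a real implementation would load an actual translation model in
# update_model() and run inference on self.device in translate().
# ---------------------------------------------------------------------------
class EchoTranslation(TranslationBase):
    def translate(self,
                  text: str
                  ):
        # A real subclass would run model inference here; this stand-in
        # returns the input unchanged.
        return text

    def update_model(self,
                     model_size: str,
                     src_lang: str,
                     tgt_lang: str,
                     progress: gr.Progress
                     ):
        # A real subclass would download/load the requested checkpoint and
        # move it to self.device; this stand-in only records the request.
        self.current_model_size = model_size


if __name__ == "__main__":
    # Quick smoke test of the base-class plumbing (directory creation,
    # device detection) without any real model.
    translator = EchoTranslation(model_dir="models", output_dir="outputs")
    print(translator.device, translator.translate("hello"))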