File size: 5,701 Bytes
c8ead6d
 
 
 
 
 
 
 
 
 
 
 
 
c573a1a
 
c8ead6d
 
 
c573a1a
c8ead6d
c573a1a
c8ead6d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
94904d8
c8ead6d
 
 
 
 
 
 
 
 
 
 
 
 
 
94904d8
c8ead6d
c573a1a
c8ead6d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
import os
import torch
import gradio as gr
from abc import ABC, abstractmethod
from typing import List
from datetime import datetime

from modules.whisper_parameter import *
from modules.subtitle_manager import *


class TranslationBase(ABC):
    def __init__(self,
                 model_dir: str,
                 output_dir: str):
        super().__init__()
        self.model = None
        self.model_dir = model_dir
        self.output_dir = output_dir
        os.makedirs(self.model_dir, exist_ok=True)
        os.makedirs(self.output_dir, exist_ok=True)
        self.current_model_size = None
        self.device = self.get_device()

    @abstractmethod
    def translate(self,
                  text: str
                  ):
        pass

    @abstractmethod
    def update_model(self,
                     model_size: str,
                     src_lang: str,
                     tgt_lang: str,
                     progress: gr.Progress
                     ):
        pass

    def translate_file(self,
                       fileobjs: list,
                       model_size: str,
                       src_lang: str,
                       tgt_lang: str,
                       add_timestamp: bool,
                       progress=gr.Progress()) -> list:
        """
        Translate subtitle file from source language to target language

        Parameters
        ----------
        fileobjs: list
            List of files to transcribe from gr.Files()
        model_size: str
            Whisper model size from gr.Dropdown()
        src_lang: str
            Source language of the file to translate from gr.Dropdown()
        tgt_lang: str
            Target language of the file to translate from gr.Dropdown()
        add_timestamp: bool
            Boolean value from gr.Checkbox() that determines whether to add a timestamp at the end of the filename.
        progress: gr.Progress
            Indicator to show progress directly in gradio.
            I use a forked version of whisper for this. To see more info : https://github.com/jhj0517/jhj0517-whisper/tree/add-progress-callback

        Returns
        ----------
        A List of
        String to return to gr.Textbox()
        Files to return to gr.Files()
        """
        try:
            self.update_model(model_size=model_size,
                              src_lang=src_lang,
                              tgt_lang=tgt_lang,
                              progress=progress)

            files_info = {}
            for fileobj in fileobjs:
                file_path = fileobj.name
                file_name, file_ext = os.path.splitext(os.path.basename(fileobj.name))
                if file_ext == ".srt":
                    parsed_dicts = parse_srt(file_path=file_path)
                    total_progress = len(parsed_dicts)
                    for index, dic in enumerate(parsed_dicts):
                        progress(index / total_progress, desc="Translating..")
                        translated_text = self.translate(dic["sentence"])
                        dic["sentence"] = translated_text
                    subtitle = get_serialized_srt(parsed_dicts)

                    timestamp = datetime.now().strftime("%m%d%H%M%S")
                    if add_timestamp:
                        output_path = os.path.join("outputs", "translations", f"{file_name}-{timestamp}.srt")
                    else:
                        output_path = os.path.join("outputs", "translations", f"{file_name}.srt")

                elif file_ext == ".vtt":
                    parsed_dicts = parse_vtt(file_path=file_path)
                    total_progress = len(parsed_dicts)
                    for index, dic in enumerate(parsed_dicts):
                        progress(index / total_progress, desc="Translating..")
                        translated_text = self.translate(dic["sentence"])
                        dic["sentence"] = translated_text
                    subtitle = get_serialized_vtt(parsed_dicts)

                    timestamp = datetime.now().strftime("%m%d%H%M%S")
                    if add_timestamp:
                        output_path = os.path.join(self.output_dir, "translations", f"{file_name}-{timestamp}.vtt")
                    else:
                        output_path = os.path.join(self.output_dir, "translations", f"{file_name}.vtt")

                write_file(subtitle, output_path)
                files_info[file_name] = subtitle

            total_result = ''
            for file_name, subtitle in files_info.items():
                total_result += '------------------------------------\n'
                total_result += f'{file_name}\n\n'
                total_result += f'{subtitle}'

            gr_str = f"Done! Subtitle is in the outputs/translation folder.\n\n{total_result}"
            return [gr_str, output_path]
        except Exception as e:
            print(f"Error: {str(e)}")
        finally:
            self.release_cuda_memory()
            self.remove_input_files([fileobj.name for fileobj in fileobjs])

    @staticmethod
    def get_device():
        if torch.cuda.is_available():
            return "cuda"
        elif torch.backends.mps.is_available():
            return "mps"
        else:
            return "cpu"

    @staticmethod
    def release_cuda_memory():
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
            torch.cuda.reset_max_memory_allocated()

    @staticmethod
    def remove_input_files(file_paths: List[str]):
        if not file_paths:
            return

        for file_path in file_paths:
            if file_path and os.path.exists(file_path):
                os.remove(file_path)