Anupam251272 committed on
Commit
74d2202
·
verified ·
1 Parent(s): 7e0bb4a

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +39 -161
app.py CHANGED
@@ -1,3 +1,4 @@
 
1
  import gradio as gr
2
  import yt_dlp
3
  import torch
@@ -6,196 +7,73 @@ import whisper
6
  import threading
7
  from queue import Queue
8
  import time
9
- import os
 
 
 
10
 
# Pick the compute device: CUDA when torch reports it available, else CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
print(f"Using device: {device}")

# Whisper processor + speech-to-text model.
# A load failure is only reported here; startup continues regardless.
try:
    processor = AutoProcessor.from_pretrained("openai/whisper-large-v3-turbo")
    model = AutoModelForSpeechSeq2Seq.from_pretrained("openai/whisper-large-v3-turbo")
    if device == "cuda":
        model.to(device)
    print("Whisper model loaded successfully")
except Exception as load_error:
    print(f"Error loading Whisper model: {load_error}")

# Summarization pipeline (BART); created with device=-1 regardless of `device`.
summarizer = pipeline("summarization", model="facebook/bart-large-cnn", device=-1)
27
 
class YouTubeHandler:
    """Search YouTube, download a video's audio and transcribe it in chunks.

    Transcription runs on a background thread started by
    ``start_processing``; every non-empty chunk transcription is put on
    ``transcript_queue`` for a consumer to drain.
    """

    def __init__(self):
        self.current_video_id = None     # ID passed to the latest start_processing
        self.transcript_queue = Queue()  # transcribed text chunks, oldest first
        self.processing = False          # set to False to ask the worker to stop
        self.download_thread = None      # thread running process_audio

    def search_youtube(self, query):
        """Search YouTube and return details of the first hit.

        Args:
            query: Free-text search phrase.

        Returns:
            ``(video_id, title)`` for the first result, or ``(None, None)``
            when there are no results or the lookup raises.
        """
        ydl_opts = {
            "format": "bestaudio/best",
            "quiet": True,
            "noplaylist": True,
            "default_search": "ytsearch1",
        }

        try:
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                info = ydl.extract_info(f"ytsearch:{query}", download=False)
                if "entries" not in info or not info["entries"]:
                    return None, None

                video = info["entries"][0]
                return video['id'], video['title']
        except Exception as e:
            print(f"Error searching YouTube: {e}")
            return None, None

    def process_audio(self, video_id):
        """Download a video's audio and transcribe it in 10-second chunks.

        Each non-empty chunk transcription is pushed onto
        ``transcript_queue``. The loop exits early once ``self.processing``
        is cleared. Errors are printed, not raised, because this method is
        the target of a worker thread.

        Args:
            video_id: YouTube video ID to download.
        """
        ydl_opts = {
            "format": "bestaudio/best",
            "quiet": True,
            "outtmpl": "temp_audio",
            "postprocessors": [{
                "key": "FFmpegExtractAudio",
                "preferredcodec": "wav",
            }],
        }

        try:
            # Download audio
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                url = f"https://www.youtube.com/watch?v={video_id}"
                ydl.download([url])

            # Process audio file
            audio = whisper.load_audio("temp_audio.wav")

            chunk_duration = 10  # seconds
            sample_rate = 16000
            chunk_size = chunk_duration * sample_rate

            for i in range(0, len(audio), chunk_size):
                if not self.processing:
                    break

                chunk = audio[i:i + chunk_size]
                if len(chunk) < chunk_size / 2:  # Skip very short chunks
                    continue

                input_features = processor(
                    chunk,
                    sampling_rate=sample_rate,
                    return_tensors="pt"
                ).input_features

                if device == "cuda":
                    input_features = input_features.to(device)

                predicted_ids = model.generate(input_features)
                transcription = processor.batch_decode(
                    predicted_ids,
                    skip_special_tokens=True
                )[0]

                if transcription.strip():
                    self.transcript_queue.put(transcription)

                time.sleep(0.1)  # Prevent overwhelming the system

        except Exception as e:
            print(f"Error processing audio: {e}")
        finally:
            # Cleanup: report (rather than silently swallow) a failed delete.
            if os.path.exists("temp_audio.wav"):
                try:
                    os.remove("temp_audio.wav")
                except OSError as e:
                    print(f"Error removing temp_audio.wav: {e}")

    def start_processing(self, video_id):
        """Stop any running job and start transcribing ``video_id``.

        Returns:
            A status string naming the video being processed.
        """
        self.stop_processing()

        # The previous worker has been joined, so it is safe to swap in a
        # fresh queue; otherwise leftover chunks from the previous video
        # would be reported as part of this one.
        self.transcript_queue = Queue()

        self.current_video_id = video_id
        self.processing = True
        self.download_thread = threading.Thread(
            target=self.process_audio,
            args=(video_id,)
        )
        self.download_thread.start()
        return f"Processing video {video_id}"

    def stop_processing(self):
        """Ask the worker to stop and wait for it to finish.

        Returns:
            ``"Processing stopped"`` if a job was marked as running,
            otherwise ``"No video processing"``.
        """
        if self.processing:
            self.processing = False
            if self.download_thread:
                self.download_thread.join()
            return "Processing stopped"
        return "No video processing"
142
 
def summarize_text(text):
    """Summarize transcript into a short version.

    Args:
        text: Transcript text; ``None`` or empty is treated as too short.

    Returns:
        The summary, ``"Transcript too short for summarization."`` when the
        input has fewer than 10 whitespace-separated words, or
        ``"Summarization failed"`` if the summarizer raises.
    """
    if not text or len(text.split()) < 10:
        return "Transcript too short for summarization."

    try:
        # truncation=True clips inputs longer than the model's maximum
        # length instead of letting long transcripts fail every time.
        summary = summarizer(
            text,
            max_length=50,
            min_length=10,
            do_sample=False,
            truncation=True,
        )
        return summary[0]['summary_text']
    except Exception as e:
        print(f"Error summarizing text: {e}")
        return "Summarization failed"
154
 
# Gradio UI
with gr.Blocks() as demo:
    gr.Markdown("# 🎥 YouTube Real-Time Transcriber")

    youtube_handler = YouTubeHandler()

    with gr.Row():
        word_input = gr.Textbox(label="Enter a Word or Phrase")
        search_button = gr.Button("Search Video")

    video_embed = gr.HTML()
    status_output = gr.Textbox(label="Status", interactive=False)
    transcript_output = gr.Textbox(label="Live Transcript", interactive=False)
    summary_output = gr.Textbox(label="Summary", interactive=False)

    stop_button = gr.Button("Stop Processing")

    def handle_video(word):
        """Search for ``word``, embed the first hit and start transcribing it.

        Returns values for (video_embed, status_output, transcript_output,
        summary_output).
        """
        video_id, title = youtube_handler.search_youtube(word)
        if not video_id:
            return "<p>No video found.</p>", "No video found.", "", ""

        embed_html = f'''<iframe width="560" height="315" src="https://www.youtube.com/embed/{video_id}" frameborder="0" allowfullscreen></iframe>'''
        status = youtube_handler.start_processing(video_id)
        return embed_html, status, "", ""

    def update_transcript():
        """Drain newly transcribed chunks and return (transcript, summary).

        Returns no-op updates when nothing new has arrived so the boxes keep
        their current contents.
        """
        from queue import Empty  # local import: the file only imports Queue

        chunks = []
        try:
            # get_nowait() never blocks, even if another poller drained the
            # queue between an emptiness check and the read.
            while True:
                chunks.append(youtube_handler.transcript_queue.get_nowait())
        except Empty:
            pass

        transcript = " ".join(chunks)
        if not transcript.strip():
            return gr.update(), gr.update()
        return transcript, summarize_text(transcript)

    # Calling component.update() from a plain background thread never reaches
    # the browser; values only propagate as the outputs of a Gradio event.
    # Poll once per second through Gradio instead.
    if hasattr(gr, "Timer"):
        gr.Timer(1).tick(update_transcript, outputs=[transcript_output, summary_output])
    else:
        # Older Gradio releases without gr.Timer: periodic load event.
        demo.load(update_transcript, outputs=[transcript_output, summary_output], every=1)

    search_button.click(handle_video, inputs=word_input, outputs=[video_embed, status_output, transcript_output, summary_output])
    stop_button.click(youtube_handler.stop_processing, outputs=status_output)

# Periodic events need the request queue on older Gradio releases.
demo.queue()
demo.launch(debug=True)
 
1
+ import os
2
  import gradio as gr
3
  import yt_dlp
4
  import torch
 
7
  import threading
8
  from queue import Queue
9
  import time
10
+
# Local directory used as cache_dir for the Whisper downloads; create it up front.
MODEL_CACHE = "./models"
os.makedirs(MODEL_CACHE, exist_ok=True)

# Pick the compute device: CUDA when torch reports it available, else CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
print(f"Using device: {device}")

# Whisper processor and speech-to-text model, both cached under MODEL_CACHE.
processor = AutoProcessor.from_pretrained(
    "openai/whisper-large-v3-turbo",
    cache_dir=MODEL_CACHE,
)
model = AutoModelForSpeechSeq2Seq.from_pretrained(
    "openai/whisper-large-v3-turbo",
    cache_dir=MODEL_CACHE,
)
model = model.to(device)

# Summarization pipeline (BART); created with device=-1 regardless of `device`.
summarizer = pipeline("summarization", model="facebook/bart-large-cnn", device=-1)

# Location of the downloaded WAV; kept under /tmp for temporary files.
audio_path = "/tmp/temp_audio.wav"
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
27
 
def process_audio(video_id):
    """Download and transcribe YouTube audio.

    Args:
        video_id: YouTube video ID, appended to the watch URL.

    Returns:
        The transcription of the full audio track, or a string starting
        with ``"Error processing audio:"`` if any step raises.
    """
    # Derive the download template from audio_path so the file yt-dlp
    # produces and the file read and removed below cannot drift apart.
    ydl_opts = {
        "format": "bestaudio/best",
        "quiet": True,
        "outtmpl": os.path.splitext(audio_path)[0] + ".%(ext)s",
        "postprocessors": [{"key": "FFmpegExtractAudio", "preferredcodec": "wav"}],
    }
    sample_rate = 16000
    # The Whisper processor pads/truncates each input to a 30-second window,
    # so feeding the whole waveform at once transcribes only the first 30 s.
    window = 30 * sample_rate

    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            ydl.download([f"https://www.youtube.com/watch?v={video_id}"])

        audio = whisper.load_audio(audio_path)

        pieces = []
        for start in range(0, len(audio), window):
            segment = audio[start:start + window]
            input_features = processor(
                segment,
                sampling_rate=sample_rate,
                return_tensors="pt",
            ).input_features.to(device)
            predicted_ids = model.generate(input_features)
            text = processor.batch_decode(predicted_ids, skip_special_tokens=True)[0].strip()
            if text:
                pieces.append(text)

        return " ".join(pieces)
    except Exception as e:
        return f"Error processing audio: {e}"
    finally:
        if os.path.exists(audio_path):
            os.remove(audio_path)  # Clean up
 
 
51
 
def summarize_text(text):
    """Summarize transcript into a short version.

    Args:
        text: Transcript text; ``None`` or empty is treated as too short.

    Returns:
        The summary, ``"Transcript too short for summarization."`` when the
        input has fewer than 10 whitespace-separated words, or
        ``"Summarization failed"`` if the summarizer raises.
    """
    if not text or len(text.split()) < 10:
        return "Transcript too short for summarization."

    try:
        # truncation=True clips inputs longer than the model's maximum
        # length instead of letting long transcripts fail every time.
        summary = summarizer(
            text,
            max_length=50,
            min_length=10,
            do_sample=False,
            truncation=True,
        )
        return summary[0]['summary_text']
    except Exception as e:
        # Report the cause instead of discarding it; the caller still gets
        # the same fallback string.
        print(f"Error summarizing text: {e}")
        return "Summarization failed"
61
 
 
with gr.Blocks() as demo:
    gr.Markdown("# 🎥 YouTube Real-Time Transcriber")

    video_input = gr.Textbox(label="Enter YouTube Video ID")
    process_button = gr.Button("Transcribe")
    video_embed = gr.HTML()
    transcript_output = gr.Textbox(label="Live Transcript", interactive=False)
    summary_output = gr.Textbox(label="Summary", interactive=False)

    def handle_video(video_id):
        """Embed the video, transcribe its audio and summarize the transcript.

        Args:
            video_id: Text typed by the user; expected to be a YouTube video ID.

        Returns:
            Values for (video_embed, transcript_output, summary_output).
        """
        from urllib.parse import quote  # local import: only needed here

        video_id = (video_id or "").strip()
        if not video_id:
            return "<p>No video ID provided.</p>", "", ""

        # The ID is user input: percent-encode it before placing it in the
        # iframe markup so quotes or angle brackets cannot inject HTML.
        safe_id = quote(video_id, safe="")
        embed_html = f"""<iframe width='560' height='315' src='https://www.youtube.com/embed/{safe_id}' frameborder='0' allowfullscreen></iframe>"""

        transcript = process_audio(video_id)
        # process_audio reports failures as text with this prefix; do not
        # run the summarizer over an error message.
        if not transcript or transcript.startswith("Error processing audio:"):
            summary = ""
        else:
            summary = summarize_text(transcript)
        return embed_html, transcript, summary

    process_button.click(handle_video, inputs=[video_input], outputs=[video_embed, transcript_output, summary_output])

demo.launch(debug=True)