Omarrran committed on
Commit
0b8958e
·
verified ·
1 Parent(s): d7e5954

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +277 -143
app.py CHANGED
@@ -1,17 +1,19 @@
1
  """
2
- TTS Dataset Collection Tool with Font Support and Enhanced Error Handling
3
  """
4
 
5
  import os
6
  import json
7
  import nltk
8
  import gradio as gr
 
9
  from datetime import datetime
10
  from pathlib import Path
11
- import shutil
12
  import logging
13
- from typing import Dict, List, Tuple, Optional
14
  import traceback
 
 
15
 
16
  # Download NLTK data during initialization
17
  try:
@@ -43,12 +45,12 @@ logger = logging.getLogger(__name__)
43
  FONT_STYLES = {
44
  "english_serif": {
45
  "name": "Times New Roman",
46
- "family": "serif",
47
  "css": "font-family: 'Times New Roman', serif;"
48
  },
49
  "english_sans": {
50
  "name": "Arial",
51
- "family": "sans-serif",
52
  "css": "font-family: Arial, sans-serif;"
53
  },
54
  "nastaliq": {
@@ -66,80 +68,82 @@ FONT_STYLES = {
66
 
67
  class TTSDatasetCollector:
68
  """Manages TTS dataset collection and organization with enhanced features"""
69
-
70
  def __init__(self):
71
  """Initialize the TTS Dataset Collector"""
72
  self.root_path = Path(os.path.dirname(os.path.abspath(__file__))) / "dataset"
 
73
  self.sentences = []
74
  self.current_index = 0
75
  self.current_font = "english_serif"
 
76
  self.setup_directories()
77
-
78
  # Ensure NLTK data is downloaded
79
  try:
80
  nltk.data.find('tokenizers/punkt')
81
  except LookupError:
82
  nltk.download('punkt', quiet=True)
83
-
84
  logger.info("TTS Dataset Collector initialized")
85
-
86
  def setup_directories(self) -> None:
87
  """Create necessary directory structure with logging"""
88
  try:
89
  # Create main dataset directory
90
- self.root_path.mkdir(exist_ok=True)
91
-
92
  # Create subdirectories
93
  for subdir in ['audio', 'transcriptions', 'metadata', 'fonts']:
94
- (self.root_path / subdir).mkdir(exist_ok=True)
95
-
96
  # Initialize log file
97
  log_file = self.root_path / 'dataset_log.txt'
98
  if not log_file.exists():
99
  with open(log_file, 'w', encoding='utf-8') as f:
100
  f.write(f"Dataset collection initialized on {datetime.now().isoformat()}\n")
101
-
102
  logger.info("Directory structure created successfully")
103
-
104
  except Exception as e:
105
  logger.error(f"Failed to create directory structure: {str(e)}")
106
  logger.error(traceback.format_exc())
107
  raise RuntimeError("Failed to initialize directory structure")
108
-
109
  def log_operation(self, message: str, level: str = "info") -> None:
110
  """Log operations with timestamp and level"""
111
  try:
112
  log_file = self.root_path / 'dataset_log.txt'
113
  timestamp = datetime.now().isoformat()
114
-
115
  with open(log_file, 'a', encoding='utf-8') as f:
116
  f.write(f"[{timestamp}] [{level.upper()}] {message}\n")
117
-
118
  if level.lower() == "error":
119
  logger.error(message)
120
  else:
121
  logger.info(message)
122
-
123
  except Exception as e:
124
  logger.error(f"Failed to log operation: {str(e)}")
125
-
126
  def process_text(self, text: str) -> Tuple[bool, str]:
127
  """Process pasted or loaded text with error handling"""
128
  try:
129
  if not text.strip():
130
  return False, "Text is empty"
131
-
132
  # Simple sentence splitting as fallback
133
  def simple_split_sentences(text):
134
  # Split on common sentence endings
135
  sentences = []
136
  current = []
137
-
138
  for line in text.split('\n'):
139
  line = line.strip()
140
  if not line:
141
  continue
142
-
143
  # Split on common sentence endings
144
  parts = line.replace('!', '.').replace('?', '.').split('.')
145
  for part in parts:
@@ -148,12 +152,12 @@ class TTSDatasetCollector:
148
  current.append(part)
149
  sentences.append(' '.join(current))
150
  current = []
151
-
152
  if current:
153
  sentences.append(' '.join(current))
154
-
155
  return [s.strip() for s in sentences if s.strip()]
156
-
157
  try:
158
  # Try NLTK first
159
  self.sentences = nltk.sent_tokenize(text.strip())
@@ -161,16 +165,16 @@ class TTSDatasetCollector:
161
  logger.warning(f"NLTK tokenization failed, falling back to simple splitting: {str(e)}")
162
  # Fallback to simple splitting
163
  self.sentences = simple_split_sentences(text.strip())
164
-
165
  if not self.sentences:
166
  return False, "No valid sentences found in text"
167
-
168
  self.current_index = 0
169
-
170
  # Log success
171
  self.log_operation(f"Processed text with {len(self.sentences)} sentences")
172
  return True, f"Successfully loaded {len(self.sentences)} sentences"
173
-
174
  except Exception as e:
175
  error_msg = f"Error processing text: {str(e)}"
176
  self.log_operation(error_msg, "error")
@@ -181,17 +185,17 @@ class TTSDatasetCollector:
181
  """Process and load text file with enhanced error handling"""
182
  if not file:
183
  return False, "No file provided"
184
-
185
  try:
186
  # Validate file extension
187
  if not file.name.endswith('.txt'):
188
  return False, "Only .txt files are supported"
189
-
190
  with open(file.name, 'r', encoding='utf-8') as f:
191
  text = f.read()
192
-
193
  return self.process_text(text)
194
-
195
  except UnicodeDecodeError:
196
  error_msg = "File encoding error. Please ensure the file is UTF-8 encoded"
197
  self.log_operation(error_msg, "error")
@@ -209,55 +213,111 @@ class TTSDatasetCollector:
209
 
210
  def set_font(self, font_style: str) -> Tuple[bool, str]:
211
  """Set the current font style"""
212
- if font_style not in FONT_STYLES:
213
- return False, f"Invalid font style. Available styles: {', '.join(FONT_STYLES.keys())}"
214
  self.current_font = font_style
215
  return True, f"Font style set to {font_style}"
216
-
217
-
218
-
219
- def generate_filenames(self, dataset_name: str, speaker_id: str) -> Tuple[str, str]:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
220
  """Generate unique filenames for audio and text files"""
 
221
  timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
222
- sentence_id = f"{self.current_index+1:04d}"
223
- base_name = f"{dataset_name}_{speaker_id}_{sentence_id}_{timestamp}"
 
 
 
 
 
 
224
  return f"{base_name}.wav", f"{base_name}.txt"
225
 
226
  def save_recording(self, audio_file, speaker_id: str, dataset_name: str) -> Tuple[bool, str]:
227
  """Save recording with enhanced error handling and logging"""
228
  if not all([audio_file, speaker_id, dataset_name]):
229
  missing = []
230
- if not audio_file: missing.append("audio recording")
231
- if not speaker_id: missing.append("speaker ID")
232
- if not dataset_name: missing.append("dataset name")
 
 
 
233
  return False, f"Missing required information: {', '.join(missing)}"
234
-
 
 
 
 
 
 
235
  try:
236
  # Validate inputs
237
  if not speaker_id.strip().isalnum():
238
  return False, "Speaker ID must contain only letters and numbers"
239
-
240
  if not dataset_name.strip().isalnum():
241
  return False, "Dataset name must contain only letters and numbers"
242
-
 
 
 
243
  # Generate filenames
244
- audio_name, text_name = self.generate_filenames(dataset_name, speaker_id)
245
-
246
  # Create speaker directories
247
  audio_dir = self.root_path / 'audio' / speaker_id
248
  text_dir = self.root_path / 'transcriptions' / speaker_id
249
- audio_dir.mkdir(exist_ok=True)
250
- text_dir.mkdir(exist_ok=True)
251
-
252
  # Save audio file
253
  audio_path = audio_dir / audio_name
254
- shutil.copy2(audio_file, audio_path)
255
-
 
 
 
 
 
256
  # Save transcription
257
  text_path = text_dir / text_name
258
  self.save_transcription(
259
  text_path,
260
- self.sentences[self.current_index],
261
  {
262
  'speaker_id': speaker_id,
263
  'dataset_name': dataset_name,
@@ -266,24 +326,24 @@ class TTSDatasetCollector:
266
  'font_style': self.current_font
267
  }
268
  )
269
-
270
  # Update metadata
271
  self.update_metadata(speaker_id, dataset_name)
272
-
273
  # Log success
274
  self.log_operation(
275
  f"Saved recording: Speaker={speaker_id}, Dataset={dataset_name}, "
276
  f"Audio={audio_name}, Text={text_name}"
277
  )
278
-
279
  return True, f"Recording saved successfully as {audio_name}"
280
-
281
  except Exception as e:
282
  error_msg = f"Error saving recording: {str(e)}"
283
  self.log_operation(error_msg, "error")
284
  logger.error(traceback.format_exc())
285
  return False, error_msg
286
-
287
  def save_transcription(self, file_path: Path, text: str, metadata: Dict) -> None:
288
  """Save transcription with metadata"""
289
  content = f"""[METADATA]
@@ -298,60 +358,64 @@ Font_Style: {metadata['font_style']}
298
  """
299
  with open(file_path, 'w', encoding='utf-8') as f:
300
  f.write(content)
301
-
302
  def update_metadata(self, speaker_id: str, dataset_name: str) -> None:
303
  """Update dataset metadata with error handling"""
304
  metadata_file = self.root_path / 'metadata' / 'dataset_info.json'
305
-
306
  try:
307
  if metadata_file.exists():
308
  with open(metadata_file, 'r') as f:
309
  metadata = json.load(f)
310
  else:
311
  metadata = {'speakers': {}, 'last_updated': None}
312
-
313
  # Update speaker data
314
  if speaker_id not in metadata['speakers']:
315
  metadata['speakers'][speaker_id] = {
316
  'total_recordings': 0,
317
  'datasets': {}
318
  }
319
-
320
  if dataset_name not in metadata['speakers'][speaker_id]['datasets']:
321
  metadata['speakers'][speaker_id]['datasets'][dataset_name] = {
322
  'recordings': 0,
323
  'sentences': len(self.sentences),
 
324
  'first_recording': datetime.now().isoformat(),
325
  'last_recording': None,
326
  'font_styles_used': []
327
  }
328
-
329
  # Update counts and timestamps
330
  metadata['speakers'][speaker_id]['total_recordings'] += 1
331
  metadata['speakers'][speaker_id]['datasets'][dataset_name]['recordings'] += 1
332
  metadata['speakers'][speaker_id]['datasets'][dataset_name]['last_recording'] = \
333
  datetime.now().isoformat()
334
-
 
 
 
 
335
  # Update font styles
336
  if self.current_font not in metadata['speakers'][speaker_id]['datasets'][dataset_name]['font_styles_used']:
337
  metadata['speakers'][speaker_id]['datasets'][dataset_name]['font_styles_used'].append(
338
  self.current_font
339
  )
340
-
341
  metadata['last_updated'] = datetime.now().isoformat()
342
-
343
  # Save updated metadata
344
  with open(metadata_file, 'w') as f:
345
  json.dump(metadata, f, indent=2)
346
-
347
  self.log_operation(f"Updated metadata for {speaker_id} in {dataset_name}")
348
-
349
  except Exception as e:
350
  error_msg = f"Error updating metadata: {str(e)}"
351
  self.log_operation(error_msg, "error")
352
  logger.error(traceback.format_exc())
353
-
354
- # Add these methods to the TTSDatasetCollector class
355
  def get_navigation_info(self) -> Dict[str, Optional[str]]:
356
  """Get current and next sentence information"""
357
  if not self.sentences:
@@ -360,15 +424,15 @@ Font_Style: {metadata['font_style']}
360
  'next': None,
361
  'progress': "No text loaded"
362
  }
363
-
364
  current = self.get_styled_text(self.sentences[self.current_index])
365
  next_text = None
366
-
367
  if self.current_index < len(self.sentences) - 1:
368
  next_text = self.get_styled_text(self.sentences[self.current_index + 1])
369
-
370
  progress = f"Sentence {self.current_index + 1} of {len(self.sentences)}"
371
-
372
  return {
373
  'current': current,
374
  'next': next_text,
@@ -384,15 +448,15 @@ Font_Style: {metadata['font_style']}
384
  'progress': "No text loaded",
385
  'status': "⚠️ Please load a text file first"
386
  }
387
-
388
  if direction == "next" and self.current_index < len(self.sentences) - 1:
389
  self.current_index += 1
390
  elif direction == "prev" and self.current_index > 0:
391
  self.current_index -= 1
392
-
393
  nav_info = self.get_navigation_info()
394
  nav_info['status'] = "✅ Navigation successful"
395
-
396
  return nav_info
397
 
398
  def get_dataset_statistics(self) -> Dict:
@@ -401,18 +465,47 @@ Font_Style: {metadata['font_style']}
401
  metadata_file = self.root_path / 'metadata' / 'dataset_info.json'
402
  if not metadata_file.exists():
403
  return {}
404
-
405
  with open(metadata_file, 'r') as f:
406
- return json.load(f)
 
 
 
 
 
 
 
 
 
 
 
407
  except Exception as e:
408
  logger.error(f"Error reading dataset statistics: {str(e)}")
409
  return {}
410
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
411
 
412
- # Then create the interface function outside the class
413
  def create_interface():
414
  """Create Gradio interface with enhanced features"""
415
-
 
 
416
  # Create custom CSS for fonts
417
  custom_css = """
418
  .gradio-container {
@@ -431,22 +524,24 @@ def create_interface():
431
  min-height: 100px !important;
432
  }
433
  """
434
-
435
  # Add font-face declarations
 
436
  for font_style, font_info in FONT_STYLES.items():
437
- if font_style in ['nastaliq', 'naskh']:
438
- custom_css += f"""
 
439
  @font-face {{
440
  font-family: '{font_info["family"]}';
441
- src: url('fonts/{font_info["family"]}.ttf') format('truetype');
442
  }}
443
  """
444
-
445
- collector = TTSDatasetCollector()
446
-
447
  with gr.Blocks(title="TTS Dataset Collection Tool", css=custom_css) as interface:
448
  gr.Markdown("# TTS Dataset Collection Tool")
449
-
450
  with gr.Row():
451
  # Left column - Configuration and Input
452
  with gr.Column():
@@ -472,48 +567,54 @@ def create_interface():
472
  value="english_serif",
473
  label="Select Font Style"
474
  )
475
-
 
 
 
 
 
 
476
  # Right column - Recording
477
  with gr.Column():
478
  current_text = gr.HTML(
479
  label="Current Sentence",
480
  elem_classes=["sentence-display"]
481
  )
 
 
 
 
 
 
482
  audio_recorder = gr.Audio(
483
  label="Record Audio",
484
  type="filepath",
485
  elem_classes=["record-button"]
486
  )
487
- next_text = gr.HTML(
488
- label="Next Sentence",
489
- elem_classes=["sentence-display"]
490
- )
491
-
492
- # Controls
493
- with gr.Row():
494
- prev_btn = gr.Button("Previous", variant="secondary")
495
- next_btn = gr.Button("Next", variant="primary")
496
- save_btn = gr.Button("Save Recording", variant="primary", elem_classes=["record-button"])
497
-
498
- # Status and Progress
499
- with gr.Row():
500
- progress = gr.Textbox(
501
- label="Progress",
502
- interactive=False
503
- )
504
  status = gr.Textbox(
505
  label="Status",
506
  interactive=False,
507
  max_lines=3
508
  )
509
-
510
- # Dataset Info
511
  with gr.Row():
512
  dataset_info = gr.JSON(
513
  label="Dataset Statistics",
514
  value={}
515
  )
516
-
 
 
 
 
517
  def process_pasted_text(text):
518
  """Handle pasted text input"""
519
  if not text:
@@ -534,29 +635,30 @@ def create_interface():
534
  status: f"❌ {msg}",
535
  dataset_info: collector.get_dataset_statistics()
536
  }
537
-
538
  nav_info = collector.get_navigation_info()
 
539
  return {
540
  current_text: nav_info['current'],
541
  next_text: nav_info['next'],
542
- progress: nav_info['progress'],
543
  status: f"✅ {msg}",
544
  dataset_info: collector.get_dataset_statistics()
545
  }
546
-
547
  def update_font(font_style):
548
  """Update font and refresh display"""
549
  success, msg = collector.set_font(font_style)
550
  if not success:
551
  return {status: msg}
552
-
553
  nav_info = collector.get_navigation_info()
554
  return {
555
  current_text: nav_info['current'],
556
  next_text: nav_info['next'],
557
  status: f"Font updated to {font_style}"
558
  }
559
-
560
  def load_file(file):
561
  """Handle file loading with enhanced error reporting"""
562
  if not file:
@@ -577,98 +679,130 @@ def create_interface():
577
  status: f"❌ {msg}",
578
  dataset_info: collector.get_dataset_statistics()
579
  }
580
-
581
  nav_info = collector.get_navigation_info()
 
582
  return {
583
  current_text: nav_info['current'],
584
  next_text: nav_info['next'],
585
- progress: nav_info['progress'],
586
  status: f"✅ {msg}",
587
  dataset_info: collector.get_dataset_statistics()
588
  }
589
-
590
  def save_current_recording(audio_file, speaker_id_value, dataset_name_value):
591
  """Handle saving the current recording"""
592
  if not audio_file:
593
- return {status: "⚠️ Please record audio first"}
594
-
 
 
 
 
595
  success, msg = collector.save_recording(
596
  audio_file, speaker_id_value, dataset_name_value
597
  )
598
-
599
  if not success:
600
  return {
601
  status: f"❌ {msg}",
602
- dataset_info: collector.get_dataset_statistics()
 
 
603
  }
604
-
 
 
 
 
605
  # Auto-advance to next sentence after successful save
606
  nav_info = collector.navigate("next")
607
-
608
  return {
609
  current_text: nav_info['current'],
610
  next_text: nav_info['next'],
611
- progress: nav_info['progress'],
612
  status: f"✅ {msg}",
613
- dataset_info: collector.get_dataset_statistics()
 
 
614
  }
615
-
616
  def navigate_sentences(direction):
617
  """Handle navigation between sentences"""
618
  nav_info = collector.navigate(direction)
 
619
  return {
620
  current_text: nav_info['current'],
621
  next_text: nav_info['next'],
622
- progress: nav_info['progress'],
623
  status: nav_info['status']
624
  }
625
-
 
 
 
 
 
 
 
 
 
 
 
 
626
  # Event handlers
627
  text_input.change(
628
  process_pasted_text,
629
  inputs=[text_input],
630
  outputs=[current_text, next_text, progress, status, dataset_info]
631
  )
632
-
633
  file_input.upload(
634
  load_file,
635
  inputs=[file_input],
636
  outputs=[current_text, next_text, progress, status, dataset_info]
637
  )
638
-
639
  font_select.change(
640
  update_font,
641
  inputs=[font_select],
642
  outputs=[current_text, next_text, status]
643
  )
644
-
 
 
 
 
 
 
645
  save_btn.click(
646
  save_current_recording,
647
  inputs=[audio_recorder, speaker_id, dataset_name],
648
- outputs=[current_text, next_text, progress, status, dataset_info]
649
  )
650
-
651
  prev_btn.click(
652
  lambda: navigate_sentences("prev"),
653
  outputs=[current_text, next_text, progress, status]
654
  )
655
-
656
  next_btn.click(
657
  lambda: navigate_sentences("next"),
658
  outputs=[current_text, next_text, progress, status]
659
  )
660
-
661
  # Initialize dataset info
662
  dataset_info.value = collector.get_dataset_statistics()
663
-
664
- return interface
665
-
666
  if __name__ == "__main__":
667
  try:
668
  # Set up any required environment variables
669
  os.environ["GRADIO_SERVER_NAME"] = "0.0.0.0"
670
  os.environ["GRADIO_SERVER_PORT"] = "7860"
671
-
672
  # Create and launch the interface
673
  interface = create_interface()
674
  interface.queue() # Enable queuing for better handling of concurrent users
 
1
  """
2
+ TTS Dataset Collection Tool with Custom Fonts and Enhanced Features
3
  """
4
 
5
  import os
6
  import json
7
  import nltk
8
  import gradio as gr
9
+ import uuid
10
  from datetime import datetime
11
  from pathlib import Path
 
12
  import logging
13
+ from typing import Dict, Tuple, Optional
14
  import traceback
15
+ import soundfile as sf
16
+ import re
17
 
18
  # Download NLTK data during initialization
19
  try:
 
45
  FONT_STYLES = {
46
  "english_serif": {
47
  "name": "Times New Roman",
48
+ "family": "Times New Roman",
49
  "css": "font-family: 'Times New Roman', serif;"
50
  },
51
  "english_sans": {
52
  "name": "Arial",
53
+ "family": "Arial",
54
  "css": "font-family: Arial, sans-serif;"
55
  },
56
  "nastaliq": {
 
68
 
69
  class TTSDatasetCollector:
70
  """Manages TTS dataset collection and organization with enhanced features"""
71
+
72
    def __init__(self):
        """Initialize the TTS Dataset Collector.

        Creates the on-disk dataset layout under ./dataset next to this
        source file, resets the sentence cursor, and makes sure the NLTK
        'punkt' tokenizer is available for later sentence splitting.
        """
        # Dataset root lives beside this source file.
        self.root_path = Path(os.path.dirname(os.path.abspath(__file__))) / "dataset"
        self.fonts_path = self.root_path / "fonts"
        self.sentences = []  # sentences loaded from pasted or uploaded text
        self.current_index = 0  # cursor into self.sentences
        self.current_font = "english_serif"
        self.custom_fonts = {}  # user-uploaded fonts, keyed by generated family name
        self.setup_directories()

        # Ensure NLTK data is downloaded (quiet: don't spam stdout on startup)
        try:
            nltk.data.find('tokenizers/punkt')
        except LookupError:
            nltk.download('punkt', quiet=True)

        logger.info("TTS Dataset Collector initialized")
89
+
90
  def setup_directories(self) -> None:
91
  """Create necessary directory structure with logging"""
92
  try:
93
  # Create main dataset directory
94
+ self.root_path.mkdir(parents=True, exist_ok=True)
95
+
96
  # Create subdirectories
97
  for subdir in ['audio', 'transcriptions', 'metadata', 'fonts']:
98
+ (self.root_path / subdir).mkdir(parents=True, exist_ok=True)
99
+
100
  # Initialize log file
101
  log_file = self.root_path / 'dataset_log.txt'
102
  if not log_file.exists():
103
  with open(log_file, 'w', encoding='utf-8') as f:
104
  f.write(f"Dataset collection initialized on {datetime.now().isoformat()}\n")
105
+
106
  logger.info("Directory structure created successfully")
107
+
108
  except Exception as e:
109
  logger.error(f"Failed to create directory structure: {str(e)}")
110
  logger.error(traceback.format_exc())
111
  raise RuntimeError("Failed to initialize directory structure")
112
+
113
  def log_operation(self, message: str, level: str = "info") -> None:
114
  """Log operations with timestamp and level"""
115
  try:
116
  log_file = self.root_path / 'dataset_log.txt'
117
  timestamp = datetime.now().isoformat()
118
+
119
  with open(log_file, 'a', encoding='utf-8') as f:
120
  f.write(f"[{timestamp}] [{level.upper()}] {message}\n")
121
+
122
  if level.lower() == "error":
123
  logger.error(message)
124
  else:
125
  logger.info(message)
126
+
127
  except Exception as e:
128
  logger.error(f"Failed to log operation: {str(e)}")
129
+
130
  def process_text(self, text: str) -> Tuple[bool, str]:
131
  """Process pasted or loaded text with error handling"""
132
  try:
133
  if not text.strip():
134
  return False, "Text is empty"
135
+
136
  # Simple sentence splitting as fallback
137
  def simple_split_sentences(text):
138
  # Split on common sentence endings
139
  sentences = []
140
  current = []
141
+
142
  for line in text.split('\n'):
143
  line = line.strip()
144
  if not line:
145
  continue
146
+
147
  # Split on common sentence endings
148
  parts = line.replace('!', '.').replace('?', '.').split('.')
149
  for part in parts:
 
152
  current.append(part)
153
  sentences.append(' '.join(current))
154
  current = []
155
+
156
  if current:
157
  sentences.append(' '.join(current))
158
+
159
  return [s.strip() for s in sentences if s.strip()]
160
+
161
  try:
162
  # Try NLTK first
163
  self.sentences = nltk.sent_tokenize(text.strip())
 
165
  logger.warning(f"NLTK tokenization failed, falling back to simple splitting: {str(e)}")
166
  # Fallback to simple splitting
167
  self.sentences = simple_split_sentences(text.strip())
168
+
169
  if not self.sentences:
170
  return False, "No valid sentences found in text"
171
+
172
  self.current_index = 0
173
+
174
  # Log success
175
  self.log_operation(f"Processed text with {len(self.sentences)} sentences")
176
  return True, f"Successfully loaded {len(self.sentences)} sentences"
177
+
178
  except Exception as e:
179
  error_msg = f"Error processing text: {str(e)}"
180
  self.log_operation(error_msg, "error")
 
185
  """Process and load text file with enhanced error handling"""
186
  if not file:
187
  return False, "No file provided"
188
+
189
  try:
190
  # Validate file extension
191
  if not file.name.endswith('.txt'):
192
  return False, "Only .txt files are supported"
193
+
194
  with open(file.name, 'r', encoding='utf-8') as f:
195
  text = f.read()
196
+
197
  return self.process_text(text)
198
+
199
  except UnicodeDecodeError:
200
  error_msg = "File encoding error. Please ensure the file is UTF-8 encoded"
201
  self.log_operation(error_msg, "error")
 
213
 
214
  def set_font(self, font_style: str) -> Tuple[bool, str]:
215
  """Set the current font style"""
216
+ if font_style not in FONT_STYLES and font_style not in self.custom_fonts:
217
+ return False, f"Invalid font style. Available styles: {', '.join(FONT_STYLES.keys()) + ', ' + ', '.join(self.custom_fonts.keys())}"
218
  self.current_font = font_style
219
  return True, f"Font style set to {font_style}"
220
+
221
+ def add_custom_font(self, font_file) -> Tuple[bool, str]:
222
+ """Add a custom font from the uploaded TTF file"""
223
+ try:
224
+ if not font_file.name.endswith('.ttf'):
225
+ return False, "Only .ttf font files are supported"
226
+
227
+ # Generate a unique font family name
228
+ font_family = f"font_{uuid.uuid4().hex[:8]}"
229
+ font_filename = font_family + '.ttf'
230
+ font_dest = self.fonts_path / font_filename
231
+
232
+ # Save the font file
233
+ with open(font_dest, 'wb') as f:
234
+ f.write(font_file.read())
235
+
236
+ # Add to custom fonts
237
+ self.custom_fonts[font_family] = {
238
+ 'name': font_file.name,
239
+ 'family': font_family,
240
+ 'css': f"font-family: '{font_family}', serif;"
241
+ }
242
+
243
+ # Update the FONT_STYLES with the custom font
244
+ FONT_STYLES[font_family] = self.custom_fonts[font_family]
245
+
246
+ # Log success
247
+ self.log_operation(f"Added custom font: {font_file.name} as {font_family}")
248
+ return True, f"Custom font '{font_file.name}' added successfully"
249
+
250
+ except Exception as e:
251
+ error_msg = f"Error adding custom font: {str(e)}"
252
+ self.log_operation(error_msg, "error")
253
+ logger.error(traceback.format_exc())
254
+ return False, error_msg
255
+
256
+ def generate_filenames(self, dataset_name: str, speaker_id: str, sentence_text: str) -> Tuple[str, str]:
257
  """Generate unique filenames for audio and text files"""
258
+ line_number = self.current_index + 1
259
  timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
260
+ # Sanitize strings for filenames
261
+ def sanitize_filename(s):
262
+ return re.sub(r'[^a-zA-Z0-9_-]', '_', s)[:50]
263
+
264
+ dataset_name_safe = sanitize_filename(dataset_name)
265
+ speaker_id_safe = sanitize_filename(speaker_id)
266
+ sentence_excerpt = sanitize_filename(sentence_text[:20])
267
+ base_name = f"{dataset_name_safe}_{speaker_id_safe}_line{line_number}_{sentence_excerpt}_{timestamp}"
268
  return f"{base_name}.wav", f"{base_name}.txt"
269
 
270
  def save_recording(self, audio_file, speaker_id: str, dataset_name: str) -> Tuple[bool, str]:
271
  """Save recording with enhanced error handling and logging"""
272
  if not all([audio_file, speaker_id, dataset_name]):
273
  missing = []
274
+ if not audio_file:
275
+ missing.append("audio recording")
276
+ if not speaker_id:
277
+ missing.append("speaker ID")
278
+ if not dataset_name:
279
+ missing.append("dataset name")
280
  return False, f"Missing required information: {', '.join(missing)}"
281
+
282
+ # Check if sentences have been loaded
283
+ if not self.sentences:
284
+ return False, "No sentences have been loaded. Please load text before saving recordings."
285
+ if self.current_index >= len(self.sentences):
286
+ return False, "Current sentence index is out of range."
287
+
288
  try:
289
  # Validate inputs
290
  if not speaker_id.strip().isalnum():
291
  return False, "Speaker ID must contain only letters and numbers"
 
292
  if not dataset_name.strip().isalnum():
293
  return False, "Dataset name must contain only letters and numbers"
294
+
295
+ # Get current sentence text
296
+ sentence_text = self.sentences[self.current_index]
297
+
298
  # Generate filenames
299
+ audio_name, text_name = self.generate_filenames(dataset_name, speaker_id, sentence_text)
300
+
301
  # Create speaker directories
302
  audio_dir = self.root_path / 'audio' / speaker_id
303
  text_dir = self.root_path / 'transcriptions' / speaker_id
304
+ audio_dir.mkdir(parents=True, exist_ok=True)
305
+ text_dir.mkdir(parents=True, exist_ok=True)
306
+
307
  # Save audio file
308
  audio_path = audio_dir / audio_name
309
+
310
+ # Read the audio file using soundfile
311
+ audio_data, sampling_rate = sf.read(audio_file)
312
+
313
+ # Save audio file
314
+ sf.write(str(audio_path), audio_data, sampling_rate)
315
+
316
  # Save transcription
317
  text_path = text_dir / text_name
318
  self.save_transcription(
319
  text_path,
320
+ sentence_text,
321
  {
322
  'speaker_id': speaker_id,
323
  'dataset_name': dataset_name,
 
326
  'font_style': self.current_font
327
  }
328
  )
329
+
330
  # Update metadata
331
  self.update_metadata(speaker_id, dataset_name)
332
+
333
  # Log success
334
  self.log_operation(
335
  f"Saved recording: Speaker={speaker_id}, Dataset={dataset_name}, "
336
  f"Audio={audio_name}, Text={text_name}"
337
  )
338
+
339
  return True, f"Recording saved successfully as {audio_name}"
340
+
341
  except Exception as e:
342
  error_msg = f"Error saving recording: {str(e)}"
343
  self.log_operation(error_msg, "error")
344
  logger.error(traceback.format_exc())
345
  return False, error_msg
346
+
347
  def save_transcription(self, file_path: Path, text: str, metadata: Dict) -> None:
348
  """Save transcription with metadata"""
349
  content = f"""[METADATA]
 
358
  """
359
  with open(file_path, 'w', encoding='utf-8') as f:
360
  f.write(content)
361
+
362
    def update_metadata(self, speaker_id: str, dataset_name: str) -> None:
        """Update dataset metadata with error handling.

        Maintains metadata/dataset_info.json: per-speaker recording
        totals, per-dataset stats (including which sentence indices have
        been recorded), font styles used, and timestamps. Errors are
        logged but never raised, so a metadata failure cannot undo an
        already-saved recording.
        """
        metadata_file = self.root_path / 'metadata' / 'dataset_info.json'

        try:
            if metadata_file.exists():
                with open(metadata_file, 'r') as f:
                    metadata = json.load(f)
            else:
                # First recording ever: start a fresh metadata document.
                metadata = {'speakers': {}, 'last_updated': None}

            # Update speaker data
            if speaker_id not in metadata['speakers']:
                metadata['speakers'][speaker_id] = {
                    'total_recordings': 0,
                    'datasets': {}
                }

            if dataset_name not in metadata['speakers'][speaker_id]['datasets']:
                # NOTE: 'sentences' snapshots the currently loaded text
                # length at first recording; it is not updated if a
                # different text is loaded later.
                metadata['speakers'][speaker_id]['datasets'][dataset_name] = {
                    'recordings': 0,
                    'sentences': len(self.sentences),
                    'recorded_sentences': [],
                    'first_recording': datetime.now().isoformat(),
                    'last_recording': None,
                    'font_styles_used': []
                }

            # Update counts and timestamps
            metadata['speakers'][speaker_id]['total_recordings'] += 1
            metadata['speakers'][speaker_id]['datasets'][dataset_name]['recordings'] += 1
            metadata['speakers'][speaker_id]['datasets'][dataset_name]['last_recording'] = \
                datetime.now().isoformat()

            # Add current index to recorded sentences (deduplicated)
            if self.current_index not in metadata['speakers'][speaker_id]['datasets'][dataset_name]['recorded_sentences']:
                metadata['speakers'][speaker_id]['datasets'][dataset_name]['recorded_sentences'].append(self.current_index)

            # Update font styles
            if self.current_font not in metadata['speakers'][speaker_id]['datasets'][dataset_name]['font_styles_used']:
                metadata['speakers'][speaker_id]['datasets'][dataset_name]['font_styles_used'].append(
                    self.current_font
                )

            metadata['last_updated'] = datetime.now().isoformat()

            # Save updated metadata
            with open(metadata_file, 'w') as f:
                json.dump(metadata, f, indent=2)

            self.log_operation(f"Updated metadata for {speaker_id} in {dataset_name}")

        except Exception as e:
            error_msg = f"Error updating metadata: {str(e)}"
            self.log_operation(error_msg, "error")
            logger.error(traceback.format_exc())
418
+
 
419
  def get_navigation_info(self) -> Dict[str, Optional[str]]:
420
  """Get current and next sentence information"""
421
  if not self.sentences:
 
424
  'next': None,
425
  'progress': "No text loaded"
426
  }
427
+
428
  current = self.get_styled_text(self.sentences[self.current_index])
429
  next_text = None
430
+
431
  if self.current_index < len(self.sentences) - 1:
432
  next_text = self.get_styled_text(self.sentences[self.current_index + 1])
433
+
434
  progress = f"Sentence {self.current_index + 1} of {len(self.sentences)}"
435
+
436
  return {
437
  'current': current,
438
  'next': next_text,
 
448
  'progress': "No text loaded",
449
  'status': "⚠️ Please load a text file first"
450
  }
451
+
452
  if direction == "next" and self.current_index < len(self.sentences) - 1:
453
  self.current_index += 1
454
  elif direction == "prev" and self.current_index > 0:
455
  self.current_index -= 1
456
+
457
  nav_info = self.get_navigation_info()
458
  nav_info['status'] = "✅ Navigation successful"
459
+
460
  return nav_info
461
 
462
  def get_dataset_statistics(self) -> Dict:
 
465
  metadata_file = self.root_path / 'metadata' / 'dataset_info.json'
466
  if not metadata_file.exists():
467
  return {}
 
468
  with open(metadata_file, 'r') as f:
469
+ metadata = json.load(f)
470
+ # Flatten statistics for display
471
+ total_sentences = len(self.sentences)
472
+ recorded = len(set(metadata['speakers'][list(metadata['speakers'].keys())[0]]['datasets'][list(metadata['speakers'][list(metadata['speakers'].keys())[0]]['datasets'].keys())[0]]['recorded_sentences'])) if metadata['speakers'] else 0
473
+ remaining = total_sentences - recorded
474
+ stats = {
475
+ "Total Sentences": total_sentences,
476
+ "Recorded Sentences": recorded,
477
+ "Remaining Sentences": remaining,
478
+ "Last Updated": metadata.get('last_updated', 'N/A')
479
+ }
480
+ return stats
481
  except Exception as e:
482
  logger.error(f"Error reading dataset statistics: {str(e)}")
483
  return {}
484
 
485
+ def get_last_audio_path(self, speaker_id: str) -> Optional[str]:
486
+ """Get the path to the last saved audio file for downloading"""
487
+ audio_dir = self.root_path / 'audio' / speaker_id
488
+ audio_files = sorted(audio_dir.glob('*.wav'), key=lambda f: f.stat().st_mtime, reverse=True)
489
+ if audio_files:
490
+ return str(audio_files[0])
491
+ else:
492
+ return None
493
+
494
+ def get_last_transcript_path(self, speaker_id: str) -> Optional[str]:
495
+ """Get the path to the last saved transcription file for downloading"""
496
+ text_dir = self.root_path / 'transcriptions' / speaker_id
497
+ text_files = sorted(text_dir.glob('*.txt'), key=lambda f: f.stat().st_mtime, reverse=True)
498
+ if text_files:
499
+ return str(text_files[0])
500
+ else:
501
+ return None
502
+
503
 
 
504
  def create_interface():
505
  """Create Gradio interface with enhanced features"""
506
+
507
+ collector = TTSDatasetCollector()
508
+
509
  # Create custom CSS for fonts
510
  custom_css = """
511
  .gradio-container {
 
524
  min-height: 100px !important;
525
  }
526
  """
527
+
528
  # Add font-face declarations
529
+ font_face_css = ""
530
  for font_style, font_info in FONT_STYLES.items():
531
+ if font_style in ['nastaliq', 'naskh'] or font_style in collector.custom_fonts:
532
+ font_file_name = font_info['family'] + '.ttf' if font_style not in collector.custom_fonts else font_info['family'] + '.ttf'
533
+ font_face_css += f"""
534
  @font-face {{
535
  font-family: '{font_info["family"]}';
536
+ src: url('fonts/{font_file_name}') format('truetype');
537
  }}
538
  """
539
+
540
+ custom_css += font_face_css
541
+
542
  with gr.Blocks(title="TTS Dataset Collection Tool", css=custom_css) as interface:
543
  gr.Markdown("# TTS Dataset Collection Tool")
544
+
545
  with gr.Row():
546
  # Left column - Configuration and Input
547
  with gr.Column():
 
567
  value="english_serif",
568
  label="Select Font Style"
569
  )
570
+ # Custom font upload
571
+ font_file_input = gr.File(
572
+ label="Upload Custom Font (.ttf)",
573
+ file_types=[".ttf"]
574
+ )
575
+ add_font_btn = gr.Button("Add Custom Font")
576
+
577
  # Right column - Recording
578
  with gr.Column():
579
  current_text = gr.HTML(
580
  label="Current Sentence",
581
  elem_classes=["sentence-display"]
582
  )
583
+ next_text = gr.HTML(
584
+ label="Next Sentence",
585
+ elem_classes=["sentence-display"]
586
+ )
587
+ progress = gr.Markdown("")
588
+
589
  audio_recorder = gr.Audio(
590
  label="Record Audio",
591
  type="filepath",
592
  elem_classes=["record-button"]
593
  )
594
+ # Controls
595
+ with gr.Row():
596
+ prev_btn = gr.Button("Previous", variant="secondary")
597
+ save_btn = gr.Button("Save Recording", variant="primary", elem_classes=["record-button"])
598
+ next_btn = gr.Button("Next", variant="primary")
599
+
600
+ # Status and Progress
 
 
 
 
 
 
 
 
 
 
601
  status = gr.Textbox(
602
  label="Status",
603
  interactive=False,
604
  max_lines=3
605
  )
606
+
607
+ # Dataset Info and Download Links
608
  with gr.Row():
609
  dataset_info = gr.JSON(
610
  label="Dataset Statistics",
611
  value={}
612
  )
613
+
614
+ with gr.Row():
615
+ download_audio = gr.File(label="Download Audio", interactive=False)
616
+ download_transcript = gr.File(label="Download Transcript", interactive=False)
617
+
618
  def process_pasted_text(text):
619
  """Handle pasted text input"""
620
  if not text:
 
635
  status: f"❌ {msg}",
636
  dataset_info: collector.get_dataset_statistics()
637
  }
638
+
639
  nav_info = collector.get_navigation_info()
640
+ progress_bar = gr.HTML.update(value=f"<progress value='{collector.current_index}' max='{len(collector.sentences)}'></progress>")
641
  return {
642
  current_text: nav_info['current'],
643
  next_text: nav_info['next'],
644
+ progress: progress_bar,
645
  status: f"✅ {msg}",
646
  dataset_info: collector.get_dataset_statistics()
647
  }
648
+
649
  def update_font(font_style):
650
  """Update font and refresh display"""
651
  success, msg = collector.set_font(font_style)
652
  if not success:
653
  return {status: msg}
654
+
655
  nav_info = collector.get_navigation_info()
656
  return {
657
  current_text: nav_info['current'],
658
  next_text: nav_info['next'],
659
  status: f"Font updated to {font_style}"
660
  }
661
+
662
  def load_file(file):
663
  """Handle file loading with enhanced error reporting"""
664
  if not file:
 
679
  status: f"❌ {msg}",
680
  dataset_info: collector.get_dataset_statistics()
681
  }
682
+
683
  nav_info = collector.get_navigation_info()
684
+ progress_bar = gr.HTML.update(value=f"<progress value='{collector.current_index}' max='{len(collector.sentences)}'></progress>")
685
  return {
686
  current_text: nav_info['current'],
687
  next_text: nav_info['next'],
688
+ progress: progress_bar,
689
  status: f"✅ {msg}",
690
  dataset_info: collector.get_dataset_statistics()
691
  }
692
+
693
  def save_current_recording(audio_file, speaker_id_value, dataset_name_value):
694
  """Handle saving the current recording"""
695
  if not audio_file:
696
+ return {
697
+ status: "⚠️ Please record audio first",
698
+ download_audio: None,
699
+ download_transcript: None
700
+ }
701
+
702
  success, msg = collector.save_recording(
703
  audio_file, speaker_id_value, dataset_name_value
704
  )
705
+
706
  if not success:
707
  return {
708
  status: f"❌ {msg}",
709
+ dataset_info: collector.get_dataset_statistics(),
710
+ download_audio: None,
711
+ download_transcript: None
712
  }
713
+
714
+ # Get paths to the saved files
715
+ audio_path = collector.get_last_audio_path(speaker_id_value)
716
+ transcript_path = collector.get_last_transcript_path(speaker_id_value)
717
+
718
  # Auto-advance to next sentence after successful save
719
  nav_info = collector.navigate("next")
720
+ progress_bar = gr.HTML.update(value=f"<progress value='{collector.current_index}' max='{len(collector.sentences)}'></progress>")
721
  return {
722
  current_text: nav_info['current'],
723
  next_text: nav_info['next'],
724
+ progress: progress_bar,
725
  status: f"✅ {msg}",
726
+ dataset_info: collector.get_dataset_statistics(),
727
+ download_audio: audio_path,
728
+ download_transcript: transcript_path
729
  }
730
+
731
  def navigate_sentences(direction):
732
  """Handle navigation between sentences"""
733
  nav_info = collector.navigate(direction)
734
+ progress_bar = gr.HTML.update(value=f"<progress value='{collector.current_index}' max='{len(collector.sentences)}'></progress>")
735
  return {
736
  current_text: nav_info['current'],
737
  next_text: nav_info['next'],
738
+ progress: progress_bar,
739
  status: nav_info['status']
740
  }
741
+
742
+ def add_custom_font(font_file):
743
+ """Handle adding a custom font"""
744
+ if not font_file:
745
+ return {status: "⚠️ No font file selected"}
746
+ success, msg = collector.add_custom_font(font_file)
747
+ if not success:
748
+ return {status: f"❌ {msg}"}
749
+ # Update font dropdown
750
+ font_choices = list(FONT_STYLES.keys())
751
+ font_select.update(choices=font_choices)
752
+ return {status: f"✅ {msg}"}
753
+
754
  # Event handlers
755
  text_input.change(
756
  process_pasted_text,
757
  inputs=[text_input],
758
  outputs=[current_text, next_text, progress, status, dataset_info]
759
  )
760
+
761
  file_input.upload(
762
  load_file,
763
  inputs=[file_input],
764
  outputs=[current_text, next_text, progress, status, dataset_info]
765
  )
766
+
767
  font_select.change(
768
  update_font,
769
  inputs=[font_select],
770
  outputs=[current_text, next_text, status]
771
  )
772
+
773
+ add_font_btn.click(
774
+ add_custom_font,
775
+ inputs=[font_file_input],
776
+ outputs=[status]
777
+ )
778
+
779
  save_btn.click(
780
  save_current_recording,
781
  inputs=[audio_recorder, speaker_id, dataset_name],
782
+ outputs=[current_text, next_text, progress, status, dataset_info, download_audio, download_transcript]
783
  )
784
+
785
  prev_btn.click(
786
  lambda: navigate_sentences("prev"),
787
  outputs=[current_text, next_text, progress, status]
788
  )
789
+
790
  next_btn.click(
791
  lambda: navigate_sentences("next"),
792
  outputs=[current_text, next_text, progress, status]
793
  )
794
+
795
  # Initialize dataset info
796
  dataset_info.value = collector.get_dataset_statistics()
797
+
798
+ return interface
799
+
800
  if __name__ == "__main__":
801
  try:
802
  # Set up any required environment variables
803
  os.environ["GRADIO_SERVER_NAME"] = "0.0.0.0"
804
  os.environ["GRADIO_SERVER_PORT"] = "7860"
805
+
806
  # Create and launch the interface
807
  interface = create_interface()
808
  interface.queue() # Enable queuing for better handling of concurrent users