import gradio as gr import whisper from transformers import pipeline import requests import cv2 import string import numpy as np import tensorflow as tf import edge_tts import asyncio import tempfile # Load models whisper_model = whisper.load_model("base") sentiment_analysis = pipeline( "sentiment-analysis", framework="pt", model="SamLowe/roberta-base-go_emotions" ) def load_sign_language_model(): return tf.keras.models.load_model("best_model.h5") sign_language_model = load_sign_language_model() # Get available voices asynchronously async def get_voices(): voices = await edge_tts.list_voices() return { f"{v['ShortName']} - {v['Locale']} ({v['Gender']})": v["ShortName"] for v in voices } # Audio-based functions def analyze_sentiment(text): results = sentiment_analysis(text) return {result["label"]: result["score"] for result in results} def display_sentiment_results(sentiment_results, option): return "\n".join( f"{sentiment}: {score:.2f}" if option == "Sentiment + Score" else sentiment for sentiment, score in sentiment_results.items() ) def search_text(text, api_key): api_endpoint = "https://generativelanguage.googleapis.com/v1beta/models/gemini-1.5-flash-latest:generateContent" headers = {"Content-Type": "application/json"} payload = {"contents": [{"parts": [{"text": text}]}]} try: response = requests.post(api_endpoint, headers=headers, json=payload, params={"key": api_key}) response.raise_for_status() response_json = response.json() if "candidates" in response_json and response_json["candidates"]: content_parts = response_json["candidates"][0]["content"]["parts"] return content_parts[0]["text"].strip() if content_parts else "No relevant content found." except requests.exceptions.RequestException as e: return f"Error: {str(e)}" return "No relevant content found." async def text_to_speech(text, voice, rate, pitch): if not isinstance(text, str) or not text.strip(): return None, gr.Warning("Please enter valid text to convert.") if not voice: return None, gr.Warning("Please select a voice.") voice_short_name = voice.split(" - ")[0] communicate = edge_tts.Communicate(text, voice_short_name, rate=f"{rate:+d}%", pitch=f"{pitch:+d}Hz") with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as tmp_file: await communicate.save(tmp_file.name) return tmp_file.name, None async def tts_interface(text, voice, rate, pitch): return await text_to_speech(text, voice, rate, pitch) async def inference_audio(audio, sentiment_option, api_key, tts_voice, tts_rate, tts_pitch): if audio is None: return "No audio file provided.", "", "", "", None audio = whisper.load_audio(audio) audio = whisper.pad_or_trim(audio) mel = whisper.log_mel_spectrogram(audio).to(whisper_model.device) _, probs = whisper_model.detect_language(mel) lang = max(probs, key=probs.get) result = whisper.decode(whisper_model, mel, whisper.DecodingOptions(fp16=False)) sentiment_results = analyze_sentiment(result.text) sentiment_output = display_sentiment_results(sentiment_results, sentiment_option) search_results = search_text(result.text, api_key) if not isinstance(search_results, str): search_results = "Error processing text." explanation_audio, _ = await tts_interface(search_results, tts_voice, tts_rate, tts_pitch) return lang.upper(), result.text, sentiment_output, search_results, explanation_audio async def classify_sign_language(image, api_key): img = np.array(image) gray_img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) gray_img = cv2.resize(gray_img, (28, 28)) input_img = np.expand_dims(gray_img / 255.0, axis=0) output = np.argmax(sign_language_model.predict(input_img), axis=1).item() output = output + 1 if output > 7 else output pred = string.ascii_uppercase[output] explanation = search_text(f"Explain the American Sign Language letter '{pred}'.", api_key) if not isinstance(explanation, str): explanation = "Error processing explanation." explanation_audio, _ = await tts_interface(explanation, None, 0, 0) return pred, explanation, explanation_audio async def process_input(input_type, audio=None, image=None, sentiment_option=None, api_key=None, tts_voice=None, tts_rate=0, tts_pitch=0): return await inference_audio(audio, sentiment_option, api_key, tts_voice, tts_rate, tts_pitch) if input_type == "Audio" else await classify_sign_language(image, api_key) async def main(): voices = await get_voices() with gr.Blocks() as demo: gr.Markdown("# Speak & Sign AI Assistant") input_type = gr.Radio(label="Choose Input Type", choices=["Audio", "Image"], value="Audio") api_key_input = gr.Textbox(label="API Key", type="password") audio_input = gr.Audio(label="Upload Audio", type="filepath") sentiment_option = gr.Radio(choices=["Sentiment Only", "Sentiment + Score"], label="Sentiment Output", value="Sentiment Only") image_input = gr.Image(label="Upload Image", type="pil", visible=False) tts_voice = gr.Dropdown(label="Select Voice", choices=[""] + list(voices.keys()), value="") tts_rate = gr.Slider(-50, 50, value=0, label="Speech Rate (%)") tts_pitch = gr.Slider(-20, 20, value=0, label="Pitch (Hz)") submit_btn = gr.Button("Submit") lang_str, text, sentiment_output, search_results, audio_output = [gr.Textbox(interactive=False) for _ in range(5)] submit_btn.click(process_input, [input_type, audio_input, image_input, sentiment_option, api_key_input, tts_voice, tts_rate, tts_pitch], [lang_str, text, sentiment_output, search_results, audio_output]) demo.launch(share=True) asyncio.run(main())