Spaces:
Sleeping
Sleeping
import gradio as gr | |
import whisper | |
from transformers import pipeline | |
import requests | |
import cv2 | |
import string | |
import numpy as np | |
import tensorflow as tf | |
import edge_tts | |
import asyncio | |
import tempfile | |
# Load models | |
whisper_model = whisper.load_model("base") | |
sentiment_analysis = pipeline( | |
"sentiment-analysis", framework="pt", model="SamLowe/roberta-base-go_emotions" | |
) | |
def load_sign_language_model(): | |
return tf.keras.models.load_model("best_model.h5") | |
sign_language_model = load_sign_language_model() | |
# Get available voices asynchronously | |
async def get_voices(): | |
voices = await edge_tts.list_voices() | |
return { | |
f"{v['ShortName']} - {v['Locale']} ({v['Gender']})": v["ShortName"] | |
for v in voices | |
} | |
# Audio-based functions | |
def analyze_sentiment(text): | |
results = sentiment_analysis(text) | |
return {result["label"]: result["score"] for result in results} | |
def display_sentiment_results(sentiment_results, option): | |
return "\n".join( | |
f"{sentiment}: {score:.2f}" if option == "Sentiment + Score" else sentiment | |
for sentiment, score in sentiment_results.items() | |
) | |
def search_text(text, api_key): | |
api_endpoint = "https://generativelanguage.googleapis.com/v1beta/models/gemini-1.5-flash-latest:generateContent" | |
headers = {"Content-Type": "application/json"} | |
payload = {"contents": [{"parts": [{"text": text}]}]} | |
try: | |
response = requests.post(api_endpoint, headers=headers, json=payload, params={"key": api_key}) | |
response.raise_for_status() | |
response_json = response.json() | |
if "candidates" in response_json and response_json["candidates"]: | |
content_parts = response_json["candidates"][0]["content"]["parts"] | |
return content_parts[0]["text"].strip() if content_parts else "No relevant content found." | |
except requests.exceptions.RequestException as e: | |
return f"Error: {str(e)}" | |
return "No relevant content found." | |
async def text_to_speech(text, voice, rate, pitch): | |
if not isinstance(text, str) or not text.strip(): | |
return None, gr.Warning("Please enter valid text to convert.") | |
if not voice: | |
return None, gr.Warning("Please select a voice.") | |
voice_short_name = voice.split(" - ")[0] | |
communicate = edge_tts.Communicate(text, voice_short_name, rate=f"{rate:+d}%", pitch=f"{pitch:+d}Hz") | |
with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as tmp_file: | |
await communicate.save(tmp_file.name) | |
return tmp_file.name, None | |
async def tts_interface(text, voice, rate, pitch): | |
return await text_to_speech(text, voice, rate, pitch) | |
async def inference_audio(audio, sentiment_option, api_key, tts_voice, tts_rate, tts_pitch): | |
if audio is None: | |
return "No audio file provided.", "", "", "", None | |
audio = whisper.load_audio(audio) | |
audio = whisper.pad_or_trim(audio) | |
mel = whisper.log_mel_spectrogram(audio).to(whisper_model.device) | |
_, probs = whisper_model.detect_language(mel) | |
lang = max(probs, key=probs.get) | |
result = whisper.decode(whisper_model, mel, whisper.DecodingOptions(fp16=False)) | |
sentiment_results = analyze_sentiment(result.text) | |
sentiment_output = display_sentiment_results(sentiment_results, sentiment_option) | |
search_results = search_text(result.text, api_key) | |
if not isinstance(search_results, str): | |
search_results = "Error processing text." | |
explanation_audio, _ = await tts_interface(search_results, tts_voice, tts_rate, tts_pitch) | |
return lang.upper(), result.text, sentiment_output, search_results, explanation_audio | |
async def classify_sign_language(image, api_key): | |
img = np.array(image) | |
gray_img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) | |
gray_img = cv2.resize(gray_img, (28, 28)) | |
input_img = np.expand_dims(gray_img / 255.0, axis=0) | |
output = np.argmax(sign_language_model.predict(input_img), axis=1).item() | |
output = output + 1 if output > 7 else output | |
pred = string.ascii_uppercase[output] | |
explanation = search_text(f"Explain the American Sign Language letter '{pred}'.", api_key) | |
if not isinstance(explanation, str): | |
explanation = "Error processing explanation." | |
explanation_audio, _ = await tts_interface(explanation, None, 0, 0) | |
return pred, explanation, explanation_audio | |
async def process_input(input_type, audio=None, image=None, sentiment_option=None, api_key=None, tts_voice=None, tts_rate=0, tts_pitch=0): | |
return await inference_audio(audio, sentiment_option, api_key, tts_voice, tts_rate, tts_pitch) if input_type == "Audio" else await classify_sign_language(image, api_key) | |
async def main(): | |
voices = await get_voices() | |
with gr.Blocks() as demo: | |
gr.Markdown("# Speak & Sign AI Assistant") | |
input_type = gr.Radio(label="Choose Input Type", choices=["Audio", "Image"], value="Audio") | |
api_key_input = gr.Textbox(label="API Key", type="password") | |
audio_input = gr.Audio(label="Upload Audio", type="filepath") | |
sentiment_option = gr.Radio(choices=["Sentiment Only", "Sentiment + Score"], label="Sentiment Output", value="Sentiment Only") | |
image_input = gr.Image(label="Upload Image", type="pil", visible=False) | |
tts_voice = gr.Dropdown(label="Select Voice", choices=[""] + list(voices.keys()), value="") | |
tts_rate = gr.Slider(-50, 50, value=0, label="Speech Rate (%)") | |
tts_pitch = gr.Slider(-20, 20, value=0, label="Pitch (Hz)") | |
submit_btn = gr.Button("Submit") | |
lang_str, text, sentiment_output, search_results, audio_output = [gr.Textbox(interactive=False) for _ in range(5)] | |
submit_btn.click(process_input, [input_type, audio_input, image_input, sentiment_option, api_key_input, tts_voice, tts_rate, tts_pitch], [lang_str, text, sentiment_output, search_results, audio_output]) | |
demo.launch(share=True) | |
asyncio.run(main()) | |