CoSTA / ST /inference /demo_streamlit.py
bhavanishankarpullela's picture
Upload 360 files
b817ab5 verified
raw
history blame
6.53 kB
import os
import time

import requests
import soundfile as sf
import streamlit as st
def speech_translation(audio_file_path, language):
    """Transcribe an Indian-language audio file and translate it to English.

    Runs ASR on the audio via the IIT Madras decode endpoint, then sends the
    transcript through the ULCA/Bhashini pipeline API for translation to
    English.

    Args:
        audio_file_path: Path to the input audio file, or None.
        language: One of "telugu", "hindi", "marathi", "bengali".

    Returns:
        The translated English text, or a human-readable error message
        string when any stage fails.
    """
    if audio_file_path is None:
        return "No audio input provided!"

    # Re-encode to .wav when needed — the ASR endpoint is fed wav data.
    if not audio_file_path.endswith(".wav"):
        wav_data, samplerate = sf.read(audio_file_path)
        sf.write("temp_audio.wav", wav_data, samplerate)
        audio_file_path = "temp_audio.wav"

    # --- ASR processing ---
    # Use a context manager so the file handle is closed even if the
    # request raises (the original leaked the handle).
    with open(audio_file_path, "rb") as audio_file:
        files = {
            'file': audio_file,
            'language': (None, language),
            'vtt': (None, 'true'),
        }
        response = requests.post('https://asr.iitm.ac.in/ssl_asr/decode', files=files)
    print(response.json())
    try:
        asr_output = response.json()['transcript']
    except (KeyError, ValueError):
        # KeyError: no 'transcript' field; ValueError: body was not JSON.
        return "Error in ASR processing"

    # Strip sentence terminators (devanagari danda and period) before
    # handing the text to the translation pipeline.
    asr_output = asr_output.replace("।", "").replace(".", "")
    time.sleep(1)  # brief pause between the ASR and translation requests

    # Map UI language names to the ISO 639-1 codes the pipeline expects.
    lang = {
        "telugu": "te",
        "hindi": "hi",
        "marathi": "mr",
        "bengali": "bn",
    }.get(language, "")

    # --- Stage 1: resolve the translation pipeline / auth endpoint ---
    payload = {
        "pipelineTasks": [
            {
                "taskType": "translation",
                "config": {
                    "language": {
                        "sourceLanguage": lang,
                        "targetLanguage": "en",
                    },
                },
            }
        ],
        "pipelineRequestConfig": {
            "pipelineId": "64392f96daac500b55c543cd"
        }
    }
    # SECURITY NOTE(review): credentials are hard-coded; move them to
    # environment variables / st.secrets before deploying publicly.
    headers = {
        "Content-Type": "application/json",
        "userID": "2aeef589f4584eb08aa0b9c49761aeb8",
        "ulcaApiKey": "02ed10445a-66b0-4061-9030-9b0b8b37a4f1"
    }
    response = requests.post(
        'https://meity-auth.ulcacontrib.org/ulca/apis/v0/model/getModelsPipeline',
        json=payload,
        headers=headers,
    )
    if response.status_code != 200:
        return f"Error in fetching model pipeline: status code {response.status_code}"

    response_data = response.json()
    print(response_data)

    # --- Stage 2: call the compute endpoint returned by stage 1 ---
    compute_payload = {
        "pipelineTasks": [
            {
                "taskType": "translation",
                "config": {
                    "language": {
                        "sourceLanguage": lang,
                        "targetLanguage": "en",
                    },
                },
            }
        ],
        "inputData": {"input": [{"source": asr_output}]},
    }
    callback_url = response_data["pipelineInferenceAPIEndPoint"]["callbackUrl"]
    # Stage 1's response names the auth header to send with the compute call.
    headers2 = {
        "Content-Type": "application/json",
        response_data["pipelineInferenceAPIEndPoint"]["inferenceApiKey"]["name"]:
            response_data["pipelineInferenceAPIEndPoint"]["inferenceApiKey"]["value"]
    }
    compute_response = requests.post(callback_url, json=compute_payload, headers=headers2)
    if compute_response.status_code != 200:
        return f"Error in translation: status code {compute_response.status_code}"

    compute_response_data = compute_response.json()
    print(compute_response_data)
    return compute_response_data["pipelineResponse"][0]["output"][0]["target"]
# Streamlit UI
# Page header and short usage instruction.
st.title("Speech Translation")
st.write("Record your speech and get the English translation.")
# Audio Recorder HTML
# In-browser recorder injected as raw HTML/JS: MediaRecorder captures the
# microphone, builds a blob, and POSTs it base64-encoded to '/upload-audio'.
# NOTE(review): Streamlit does not serve a '/upload-audio' route by default,
# so this fetch presumably fails — verify whether a custom server route
# exists and how 'recorded_audio' is meant to reach st.session_state.
st.markdown("""
<h3>Record Audio</h3>
<button id="startButton">Start Recording</button>
<button id="stopButton" disabled>Stop Recording</button>
<audio id="recordedAudio" controls></audio>
<script>
let chunks = [];
let recorder;
let audioURL;
let recordedAudio = document.getElementById('recordedAudio');
document.getElementById('startButton').onclick = function() {
navigator.mediaDevices.getUserMedia({ audio: true })
.then(stream => {
recorder = new MediaRecorder(stream);
recorder.ondataavailable = e => chunks.push(e.data);
recorder.onstop = e => {
let blob = new Blob(chunks, { type: 'audio/wav' });
chunks = [];
audioURL = URL.createObjectURL(blob);
recordedAudio.src = audioURL;
// Send the recorded audio blob to Streamlit
var reader = new FileReader();
reader.readAsDataURL(blob);
reader.onloadend = function() {
var base64data = reader.result;
fetch('/upload-audio', {
method: 'POST',
body: JSON.stringify({ audio: base64data }),
headers: { 'Content-Type': 'application/json' }
}).then(response => response.json())
.then(data => console.log(data));
}
};
recorder.start();
document.getElementById('startButton').disabled = true;
document.getElementById('stopButton').disabled = false;
});
};
document.getElementById('stopButton').onclick = function() {
recorder.stop();
document.getElementById('startButton').disabled = false;
document.getElementById('stopButton').disabled = true;
};
</script>
""", unsafe_allow_html=True)
# Upload + language controls and the translate action.
uploaded_file = st.file_uploader("Upload an audio file", type=["wav", "mp3"])
language = st.selectbox("Select Language", ["telugu", "hindi", "marathi", "bengali"])
if st.button("Translate"):
    if uploaded_file is not None:
        # BUG FIX: preserve the upload's real extension. Saving an .mp3 under
        # a hard-coded ".wav" name made speech_translation()'s
        # endswith(".wav") check skip its wav re-encoding step, sending raw
        # mp3 bytes to the ASR endpoint.
        suffix = os.path.splitext(uploaded_file.name)[1].lower() or ".wav"
        saved_path = f"uploaded_audio{suffix}"
        with open(saved_path, "wb") as f:
            f.write(uploaded_file.getbuffer())
        result = speech_translation(saved_path, language)
        st.text_area("Translation", result)
    elif st.session_state.get('recorded_audio'):
        # Fallback: audio captured by the in-page recorder, if present.
        result = speech_translation(st.session_state['recorded_audio'], language)
        st.text_area("Translation", result)
    else:
        st.write("Please upload an audio file or record your speech and select a language.")