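# Source: Hugging Face file view of app.py (uploader: awacke1).
# Commit 3874223 (verified): "Create app.py" - 13 kB.
# Page navigation links in the original view: raw, history, blame.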
import streamlit as st
import pandas as pd
import numpy as np
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
import torch
import json
import os
import glob
from pathlib import Path
from datetime import datetime, timedelta
import edge_tts
import asyncio
import requests
from collections import defaultdict
from audio_recorder_streamlit import audio_recorder
import streamlit.components.v1 as components
from urllib.parse import quote
from xml.etree import ElementTree as ET
from datasets import load_dataset
# 🧠 Default value for every session-state key this module seeds
SESSION_VARS = {
    'search_history': [],                        # Track search history
    'last_voice_input': "",                      # Last voice input
    'transcript_history': [],                    # Conversation history
    'should_rerun': False,                       # Trigger for UI updates
    'search_columns': [],                        # Available search columns
    'initial_search_done': False,                # First search flag
    'tts_voice': "en-US-AriaNeural",             # Default voice
    'arxiv_last_query': "",                      # Last ArXiv search
    'dataset_loaded': False,                     # Dataset load status
    'current_page': 0,                           # Current data page
    'data_cache': None,                          # Data cache
    'dataset_info': None,                        # Dataset metadata
    'nps_submitted': False,                      # Track if user submitted NPS
    'nps_last_shown': None,                      # When NPS was last shown
    'voice_recorder_key': str(datetime.now()),   # Unique key for voice recorder
}

# 📊 Constants
ROWS_PER_PAGE = 100
MIN_SEARCH_SCORE = 0.3
EXACT_MATCH_BOOST = 2.0

# Seed session state: keys that already hold a value are left alone,
# so existing values are never overwritten by the defaults above.
for state_key, initial_value in SESSION_VARS.items():
    if state_key in st.session_state:
        continue
    st.session_state[state_key] = initial_value
class NPSTracker:
    """🎯 Net Promoter Score tracker backed by a CSV log file.

    Each response is stored as one CSV row with the columns
    ``timestamp`` (ISO-8601 string), ``score`` and ``feedback``.
    """

    # Column order used for both the header row and every appended row.
    _COLUMNS = ['timestamp', 'score', 'feedback']

    def __init__(self, log_file="nps_logs.csv"):
        """Remember the log path and make sure the log file exists.

        Args:
            log_file: Path (str or Path) of the CSV file holding responses.
        """
        self.log_file = Path(log_file)
        self.initialize_log()

    def initialize_log(self):
        """📝 Create the log file (header row only) if it doesn't exist."""
        if not self.log_file.exists():
            pd.DataFrame(columns=self._COLUMNS).to_csv(self.log_file, index=False)

    def log_response(self, score, feedback=""):
        """🖊️ Append one NPS response to the log.

        The row is appended instead of re-reading and rewriting the whole
        file, so logging stays cheap as the log grows and one session cannot
        overwrite rows another session wrote in between.

        Args:
            score: The 0-10 rating given by the user.
            feedback: Optional free-text comment.
        """
        new_entry = pd.DataFrame(
            [{
                # Fixed timespec keeps every stored timestamp in one format,
                # even when the microsecond part happens to be zero.
                'timestamp': datetime.now().isoformat(timespec='microseconds'),
                'score': score,
                'feedback': feedback,
            }],
            columns=self._COLUMNS,
        )
        # A missing or zero-byte file has no header row yet.
        needs_header = (
            not self.log_file.exists() or self.log_file.stat().st_size == 0
        )
        new_entry.to_csv(self.log_file, mode='a', header=needs_header, index=False)

    @staticmethod
    def _empty_stats():
        """Return the stats payload used when there is nothing to report."""
        return {
            'nps_score': 0,
            'promoters': 0,
            'passives': 0,
            'detractors': 0,
            'total_responses': 0,
            'recent_feedback': [],
        }

    def get_nps_stats(self, days=30):
        """📊 Calculate NPS stats for the most recent ``days`` days.

        Args:
            days: Size of the look-back window, in days.

        Returns:
            dict with ``nps_score`` (percentage of promoters minus percentage
            of detractors, rounded to one decimal), the ``promoters`` /
            ``passives`` / ``detractors`` counts, ``total_responses`` and
            ``recent_feedback`` (up to five newest non-blank comments, newest
            first, always as strings).
        """
        try:
            # Force feedback to text: a column holding only numeric-looking
            # comments would otherwise be parsed as numbers.
            df = pd.read_csv(self.log_file, dtype={'feedback': str})
        except (FileNotFoundError, pd.errors.EmptyDataError):
            return self._empty_stats()

        # Unparseable cells become NaT/NaN and are filtered out below
        # instead of raising.
        df['timestamp'] = pd.to_datetime(df['timestamp'], errors='coerce')
        df['score'] = pd.to_numeric(df['score'], errors='coerce')

        cutoff = datetime.now() - timedelta(days=days)
        recent_df = df[(df['timestamp'] > cutoff) & df['score'].notna()]
        if recent_df.empty:
            return self._empty_stats()

        scores = recent_df['score']
        total = len(recent_df)
        promoters = int((scores >= 9).sum())
        passives = int(scores.between(7, 8).sum())
        detractors = int((scores <= 6).sum())
        nps = (promoters - detractors) / total * 100

        newest_first = recent_df.sort_values('timestamp', ascending=False)
        comments = newest_first['feedback'].dropna().tolist()
        recent_feedback = [text for text in comments if text.strip()][:5]

        return {
            'nps_score': round(nps, 1),
            'promoters': promoters,
            'passives': passives,
            'detractors': detractors,
            'total_responses': total,
            'recent_feedback': recent_feedback,
        }
def setup_voice_recorder():
    """🎤 Create an in-browser voice recorder component.

    Renders Start/Stop buttons, a status line and an audio player via
    ``components.html``. When a recording stops, the audio is loaded into
    the player and posted to the parent window as a data URL in a
    ``{type: 'voiceData', data: ...}`` message.

    NOTE(review): nothing visible in this file receives that message on the
    Python side - confirm how it is expected to reach ``st.session_state``.

    Returns:
        Whatever ``components.html`` returns.
    """
    return components.html(
        """
        <div style="display: flex; flex-direction: column; align-items: center; gap: 10px;">
            <button id="startButton"
                style="padding: 10px 20px; background: #ff4b4b; color: white; border: none; border-radius: 5px; cursor: pointer">
                Start Recording
            </button>
            <button id="stopButton"
                style="padding: 10px 20px; background: #4b4bff; color: white; border: none; border-radius: 5px; cursor: pointer"
                disabled>
                Stop Recording
            </button>
            <audio id="audioPlayback" controls style="display: none;"></audio>
            <div id="statusText" style="color: #666;">Ready to record...</div>
        </div>
        <script>
            let mediaRecorder;
            let audioChunks = [];
            document.getElementById('startButton').onclick = async () => {
                try {
                    const stream = await navigator.mediaDevices.getUserMedia({ audio: true });
                    // Start every recording from a clean buffer so earlier
                    // takes are not concatenated into the new one.
                    audioChunks = [];
                    mediaRecorder = new MediaRecorder(stream);
                    mediaRecorder.ondataavailable = (e) => {
                        audioChunks.push(e.data);
                    };
                    mediaRecorder.onstop = () => {
                        // Release the microphone once the recording is done.
                        stream.getTracks().forEach((track) => track.stop());
                        // NOTE(review): MediaRecorder usually emits webm/ogg,
                        // not WAV; label kept because the consumer is not visible here.
                        const audioBlob = new Blob(audioChunks, { type: 'audio/wav' });
                        const audioUrl = URL.createObjectURL(audioBlob);
                        document.getElementById('audioPlayback').src = audioUrl;
                        document.getElementById('audioPlayback').style.display = 'block';
                        // Send to Python
                        const reader = new FileReader();
                        reader.readAsDataURL(audioBlob);
                        reader.onloadend = () => {
                            window.parent.postMessage({
                                type: 'voiceData',
                                data: reader.result
                            }, '*');
                        };
                    };
                    mediaRecorder.start();
                    document.getElementById('startButton').disabled = true;
                    document.getElementById('stopButton').disabled = false;
                    document.getElementById('statusText').textContent = 'Recording...';
                } catch (err) {
                    console.error('Error:', err);
                    document.getElementById('statusText').textContent = 'Error: ' + err.message;
                }
            };
            document.getElementById('stopButton').onclick = () => {
                mediaRecorder.stop();
                document.getElementById('startButton').disabled = false;
                document.getElementById('stopButton').disabled = true;
                document.getElementById('statusText').textContent = 'Recording complete!';
            };
        </script>
        """,
        height=200,
    )
def render_nps_sidebar():
    """🎨 Show NPS metrics in the sidebar.

    Reads the stats from a fresh ``NPSTracker`` and renders the overall
    score, the promoter/passive/detractor counts and any recent feedback
    (each comment truncated to 100 characters).
    """
    stats = NPSTracker().get_nps_stats()
    nps_score = stats['nps_score']

    st.sidebar.markdown("### 📊 User Satisfaction Metrics")

    # Traffic-light indicator: green from 50 up, yellow from 0 up, red below 0.
    if nps_score >= 50:
        score_color = "🟢"
    elif nps_score >= 0:
        score_color = "🟡"
    else:
        score_color = "🔴"
    st.sidebar.metric("Net Promoter Score", f"{score_color} {nps_score}")

    st.sidebar.markdown("#### Response Breakdown")
    col1, col2, col3 = st.sidebar.columns(3)
    with col1:
        st.metric("😃", stats['promoters'])
    with col2:
        st.metric("😐", stats['passives'])
    with col3:
        st.metric("😕", stats['detractors'])

    if stats['recent_feedback']:
        st.sidebar.markdown("#### Recent Feedback")
        for feedback in stats['recent_feedback']:
            # Feedback read back from CSV may not be a str (e.g. a purely
            # numeric comment); coerce before measuring and slicing.
            text = str(feedback)
            st.sidebar.info(text[:100] + "..." if len(text) > 100 else text)
def render_nps_survey():
    """🎯 Show the NPS survey form.

    Renders a 0-10 slider and an optional feedback box. On submit the
    response is logged through ``NPSTracker``, ``nps_submitted`` is set in
    session state and the script is rerun.
    """
    tracker = NPSTracker()
    st.markdown("### 📝 Your Feedback Matters!")
    score = st.slider(
        "How likely are you to recommend this search tool to others?",
        0, 10,
        help="0 = Not likely at all, 10 = Extremely likely",
    )
    feedback = st.text_area("Additional feedback (optional)")
    if st.button("Submit Feedback", key="nps_submit"):
        tracker.log_response(score, feedback)
        st.session_state['nps_submitted'] = True
        st.success("Thank you for your feedback! 🙏")
        # st.experimental_rerun was deprecated in favour of st.rerun and has
        # been removed from recent Streamlit releases; prefer the new name
        # and fall back to the old one on older installs.
        rerun = getattr(st, "rerun", None)
        if rerun is None:
            rerun = st.experimental_rerun
        rerun()
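# TODO: Insert the existing search functionality here. main() below expects
# VideoSearch, render_result, transcribe_audio, perform_arxiv_lookup and
# show_file_manager to be defined before it runs.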
def _render_result_list(results):
    """Render each result in its own expander; only the first starts open."""
    for position, result in enumerate(results, 1):
        with st.expander(f"Result {position}", expanded=(position == 1)):
            render_result(result)


def main():
    """Build the page: NPS sidebar, feedback survey and the four main tabs."""
    st.title("🎥 Smart Video Search with Voice & Feedback")

    # Initialize search
    search = VideoSearch()

    # Add NPS metrics to sidebar
    with st.sidebar:
        render_nps_sidebar()

    # Show survey periodically (at most one new prompt every 24 hours).
    # The survey is marked "open" when it becomes due and stays open until it
    # is submitted. Stamping nps_last_shown alone would hide the form on the
    # very next rerun - including the rerun triggered by the Submit click -
    # so the submit handler would never execute.
    if not st.session_state.get('nps_submitted'):
        current_time = datetime.now()
        last_shown = st.session_state.get('nps_last_shown')
        if not last_shown or current_time - last_shown > timedelta(hours=24):
            st.session_state['nps_last_shown'] = current_time
            st.session_state['nps_survey_open'] = True
        if st.session_state.get('nps_survey_open'):
            with st.expander("📝 Quick Feedback", expanded=True):
                render_nps_survey()

    # Create main tabs
    tab1, tab2, tab3, tab4 = st.tabs([
        "🔍 Search", "🎙️ Voice Input", "📚 ArXiv", "📂 Files"
    ])

    # Search Tab
    with tab1:
        st.subheader("Search Videos")
        col1, col2 = st.columns([3, 1])
        with col1:
            # "aliens" pre-fills the box only until the first search has run.
            query = st.text_input(
                "Enter search query:",
                value="" if st.session_state['initial_search_done'] else "aliens",
            )
        with col2:
            search_column = st.selectbox(
                "Search in:",
                ["All Fields"] + st.session_state['search_columns'],
            )
        col3, col4 = st.columns(2)
        with col3:
            num_results = st.slider("Max results:", 1, 100, 20)
        with col4:
            search_button = st.button("🔍 Search")

        # Search on button press, and once automatically on first load.
        if (search_button or not st.session_state['initial_search_done']) and query:
            st.session_state['initial_search_done'] = True
            selected_column = None if search_column == "All Fields" else search_column
            with st.spinner("Searching..."):
                results = search.search(query, selected_column, num_results)
            if results:
                st.session_state['search_history'].append({
                    'query': query,
                    'timestamp': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                    'results': results[:5],
                })
                st.write(f"Found {len(results)} results:")
                _render_result_list(results)
            else:
                st.warning("No matching results found.")

    # Voice Input Tab
    with tab2:
        st.subheader("Voice Search")
        st.write("🎙️ Record your query:")
        setup_voice_recorder()
        # NOTE(review): nothing visible in this file stores 'voice_data' in
        # session state - confirm where the recording is handed to Python.
        if 'voice_data' in st.session_state:
            with st.spinner("Processing voice..."):
                voice_query = transcribe_audio(st.session_state['voice_data'])
            st.markdown("**Transcribed Text:**")
            st.write(voice_query)
            if st.button("🔍 Search with Voice"):
                # Skip the search for an empty transcription and report empty
                # results the same way the Search tab does.
                results = search.search(voice_query, None, 20) if voice_query else None
                if results:
                    _render_result_list(results)
                else:
                    st.warning("No matching results found.")

    # ArXiv Tab
    with tab3:
        st.subheader("ArXiv Search")
        arxiv_query = st.text_input(
            "Search ArXiv:", value=st.session_state['arxiv_last_query']
        )
        vocal_summary = st.checkbox("🎙 Quick Audio Summary", value=True)
        titles_summary = st.checkbox("🔖 Titles Only", value=True)
        full_audio = st.checkbox("📚 Full Audio Summary", value=False)
        if st.button("🔍 Search ArXiv"):
            st.session_state['arxiv_last_query'] = arxiv_query
            perform_arxiv_lookup(arxiv_query, vocal_summary, titles_summary, full_audio)

    # File Manager Tab
    with tab4:
        show_file_manager()
# Script entry point: build the page only when this file is run directly.
if __name__ == "__main__":
    main()