Sign-language / src /main-backup.py
ginipick's picture
Update src/main-backup.py
68397f9 verified
raw
history blame
12.5 kB
import display_gloss as dg
import synonyms_preprocess as sp
from NLP_Spacy_base_translator import NlpSpacyBaseTranslator
from flask import Flask, render_template, Response, request, send_file
import io
import cv2
import numpy as np
import os
import requests
from concurrent.futures import ThreadPoolExecutor
from urllib.parse import quote, unquote
import tempfile
import re
from functools import lru_cache
from typing import List, Dict, Any
import logging
from contextlib import contextmanager
# Logging configuration
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
app = Flask(__name__, static_folder='static')
app.config['TITLE'] = 'Sign Language Translate'
# Initialise the module-level resources once at import time so they are reused
nlp, dict_docs_spacy = sp.load_spacy_values()
dataset, list_2000_tokens = dg.load_data()
# Create the shared thread pool used by the request handlers
executor = ThreadPoolExecutor(max_workers=4)
# Memoise results in a bounded in-memory LRU cache
@lru_cache(maxsize=1000)
def clean_quotes(text: str) -> str:
    """Tidy up single quotes and whitespace in *text*.

    Every run of consecutive apostrophes is collapsed to one apostrophe,
    every run of whitespace becomes a single space, and leading/trailing
    whitespace is stripped.
    """
    single_quoted = re.sub(r"'+", "'", text)
    return re.sub(r'\s+', ' ', single_quoted).strip()
@lru_cache(maxsize=1000)
def is_korean(text: str) -> bool:
    """Return True when *text* contains at least one character in the range 가-힣."""
    hangul_match = re.search('[가-힣]', text)
    return hangul_match is not None
@lru_cache(maxsize=1000)
def is_english(text: str) -> bool:
    """Return True when *text* looks like plain English.

    Single-quoted segments and all whitespace are ignored; what remains may
    contain only ASCII letters and the punctuation ``. , ! ? -``. An empty
    remainder counts as English.
    """
    # Quoted segments may hold any script, so drop them before checking.
    remainder = re.sub(r"'[^']*'|\s", "", text)
    return re.match(r'^[A-Za-z.,!?-]*$', remainder) is not None
@lru_cache(maxsize=1000)
def normalize_quotes(text: str) -> str:
    """Normalise the quoting and spacing of *text*.

    Runs of consecutive single quotes are collapsed to one quote, runs of
    whitespace are collapsed to a single space, and the result is stripped.

    The previous implementation tested for a quoted segment afterwards but
    returned the same value on both paths; that no-op branch is removed.
    """
    text = re.sub(r"'+", "'", text)
    return re.sub(r'\s+', ' ', text).strip()
@lru_cache(maxsize=1000)
def _find_quoted_words_cached(text: str) -> tuple:
    """Return the contents of every single-quoted segment in *text* as a tuple.

    The cached value is an immutable tuple so that no caller can corrupt
    the cache entry.
    """
    return tuple(re.findall(r"'([^']*)'", text))


def find_quoted_words(text: str) -> List[str]:
    """Return the words enclosed in single quotes in *text*, in order.

    A new list is built on every call. Previously the ``lru_cache`` handed
    out the same list object each time, so a caller that mutated the result
    silently changed what later calls with the same text received.
    """
    return list(_find_quoted_words_cached(text))
@lru_cache(maxsize=1000)
def spell_out_word(word: str) -> str:
    """Lower-case *word* and return its characters separated by single spaces."""
    lowered = word.lower()
    # A string is already an iterable of its characters.
    return ' '.join(lowered)
def translate_text_chunk(text: str, source_lang: str, target_lang: str,
                         timeout: float = 10.0) -> str:
    """Translate *text* through the Google Translate ``translate_a/single`` endpoint.

    Args:
        text: Text to translate.
        source_lang: Source language code sent as ``sl`` (e.g. ``"ko"``).
        target_lang: Target language code sent as ``tl`` (e.g. ``"en"``).
        timeout: Seconds to wait for the HTTP request. Without a timeout
            ``requests`` waits indefinitely, which can block a worker for
            as long as the remote host stalls.

    Returns:
        The translated segments joined with single spaces, or the original
        *text* unchanged when the request fails, times out, returns a
        non-200 status, or the payload cannot be parsed.
    """
    try:
        url = "https://translate.googleapis.com/translate_a/single"
        params = {
            "client": "gtx",
            "sl": source_lang,
            "tl": target_lang,
            "dt": "t",
            "q": text
        }
        response = requests.get(url, params=params, timeout=timeout)
        if response.status_code != 200:
            logger.error(f"Translation API error: {response.status_code}")
            return text
        data = response.json()
        # data[0] is iterated as a sequence of segments whose first element
        # is the translated text; empty segments are skipped.
        return ' '.join(item[0] for item in data[0] if item[0])
    except Exception as e:
        # Best-effort: any network or parsing failure falls back to the input.
        logger.error(f"Translation error: {e}")
        return text
def translate_korean_text(text: str) -> str:
    """Translate Korean *text* to English, keeping a single-quoted word intact.

    Without a quoted segment the whole text is translated. Otherwise the
    first quoted word is swapped for the placeholder ``XXXXX`` before the
    sentence is translated, and the placeholder is then replaced by the
    upper-cased quoted word (translated first unless it is purely ASCII
    letters), wrapped in single quotes again.

    Returns the original *text* if anything raises.
    """
    def _tidy(translated: str) -> str:
        # Shared post-processing: lower-case a standalone "NAME" and remove
        # the space before a full stop.
        lowered_name = re.sub(r'\bNAME\b', 'name', translated)
        return lowered_name.replace(" .", ".")

    try:
        match = re.search(r"'([^']*)'", text)
        if match is None:
            # No quoted word: translate the sentence as a whole.
            return _tidy(translate_text_chunk(text, "ko", "en"))

        quoted_word = match.group(1)
        # Hide the quoted word behind a placeholder while translating the body.
        body_with_placeholder = text.replace(f"'{quoted_word}'", "XXXXX")
        translated_body = translate_text_chunk(body_with_placeholder, "ko", "en")

        # A word made only of ASCII letters is kept; anything else is translated.
        if re.match(r'^[A-Za-z]+$', quoted_word):
            proper_noun = quoted_word.upper()
        else:
            proper_noun = translate_text_chunk(quoted_word, "ko", "en").upper()

        return _tidy(translated_body.replace("XXXXX", f"'{proper_noun}'"))
    except Exception as e:
        logger.error(f"Korean translation error: {e}")
        return text
def translate_korean_to_english(text: str) -> str:
    """Return an English version of *text* for the gloss pipeline.

    The text is first passed through ``normalize_quotes``. Then:

    * if ``is_english`` accepts it, the first single-quoted word is
      upper-cased and the text is otherwise returned as is;
    * else if ``is_korean`` accepts it, it is translated with
      ``translate_korean_text``;
    * otherwise it is returned unchanged.

    Returns the text as it stood when the failure occurred if anything raises.
    """
    try:
        text = normalize_quotes(text)
        if is_english(text):
            # Upper-case only the first quoted word. A callable replacement
            # is used so that backslashes in the user's quoted text are
            # never interpreted as regex replacement escapes, and ``count``
            # is passed by keyword (positional use is deprecated).
            return re.sub(
                r"'([^']*)'",
                lambda match: f"'{match.group(1).upper()}'",
                text,
                count=1,
            )
        if is_korean(text):
            return translate_korean_text(text)
        return text
    except Exception as e:
        logger.error(f"Translation error: {e}")
        return text
def process_frame(frame_data: bytes) -> np.ndarray:
    """Decode one multipart frame chunk into a colour image array.

    Args:
        frame_data: Raw chunk consisting of headers, a blank line
            (``\\r\\n\\r\\n``), then the encoded image bytes.

    Returns:
        The image decoded by ``cv2.imdecode`` with ``cv2.IMREAD_COLOR``.

    Raises:
        ValueError: If the chunk has no header/body separator or the
            payload cannot be decoded as an image.
        Exception: Any other error is logged and re-raised unchanged.
    """
    try:
        # Split on the FIRST blank line only: encoded image bytes may
        # themselves contain CRLFCRLF, and an unbounded split would
        # truncate the payload there.
        _headers, separator, payload = frame_data.partition(b'\r\n\r\n')
        if not separator:
            raise ValueError("Frame has no header/body separator")
        nparr = np.frombuffer(payload, np.uint8)
        image = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
        # imdecode reports failure by returning None rather than raising.
        if image is None:
            raise ValueError("Frame payload could not be decoded as an image")
        return image
    except Exception as e:
        logger.error(f"Frame processing error: {e}")
        raise
@contextmanager
def video_writer(path: str, frame_size: tuple, fps: int = 25):
    """Context manager yielding a ``cv2.VideoWriter`` for *path*.

    The writer uses the ``mp4v`` codec with the given *fps* and
    *frame_size*, and is released on exit even if the body raises.
    """
    mp4_writer = cv2.VideoWriter(
        path,
        cv2.VideoWriter_fourcc(*'mp4v'),
        fps,
        frame_size,
    )
    try:
        yield mp4_writer
    finally:
        mp4_writer.release()
def generate_complete_video(gloss_list: List[str], dataset: Dict[str, Any], list_2000_tokens: List[str]) -> bytes:
    """Render the glosses in *gloss_list* into one MP4 and return its bytes.

    Args:
        gloss_list: Gloss tokens in display order. The markers
            ``FINGERSPELL-START`` and ``FINGERSPELL-END`` are skipped.
        dataset: Passed through to ``dg.generate_video``.
        list_2000_tokens: Passed through to ``dg.generate_video``.

    Returns:
        The encoded video file contents.

    Raises:
        Exception: If no frames were produced, or any error raised while
            decoding or writing (logged, then re-raised).
    """
    try:
        frames = []
        # Decode the frames of each gloss in parallel. The local name is
        # `pool` so the module-level `executor` is not shadowed.
        with ThreadPoolExecutor() as pool:
            for gloss in gloss_list:
                # Fingerspelling markers carry no frames of their own.
                if gloss in ('FINGERSPELL-START', 'FINGERSPELL-END'):
                    continue
                frame_futures = [
                    pool.submit(process_frame, frame)
                    for frame in dg.generate_video([gloss], dataset, list_2000_tokens)
                ]
                frames.extend(future.result() for future in frame_futures)
        if not frames:
            raise Exception("No frames generated")
        height, width = frames[0].shape[:2]
        # Reserve a path and close the handle before OpenCV writes to it.
        with tempfile.NamedTemporaryFile(suffix='.mp4', delete=False) as temp_file:
            temp_path = temp_file.name
        try:
            with video_writer(temp_path, (width, height)) as out:
                for frame in frames:
                    out.write(frame)
            with open(temp_path, 'rb') as f:
                return f.read()
        finally:
            # Remove the temp file even when writing or reading fails.
            os.remove(temp_path)
    except Exception as e:
        logger.error(f"Video generation error: {str(e)}")
        raise
@app.route('/')
def index():
    """Render the landing page with the configured title."""
    page_title = app.config['TITLE']
    return render_template('index.html', title=page_title)
def _apply_fingerspelling(gloss_words: List[str], quoted_words: List[str]) -> List[str]:
    """Lower-case gloss words and expand quoted ones into fingerspelled letters.

    A gloss word that matches a quoted word (case-insensitively) becomes
    ``FINGERSPELL-START``, its letters, then ``FINGERSPELL-END``.
    """
    # Built once instead of once per gloss word.
    quoted_upper = {quoted.upper() for quoted in quoted_words}
    processed = []
    for word in gloss_words:
        if word.upper() in quoted_upper:
            processed.append('FINGERSPELL-START')
            processed.extend(spell_out_word(word).split())
            processed.append('FINGERSPELL-END')
        else:
            processed.append(word.lower())
    return processed


def _replace_with_synonyms(processed_gloss: List[str]) -> List[str]:
    """Run each non-fingerspelled token through ``sp.find_synonyms``.

    Tokens between ``FINGERSPELL-START`` and ``FINGERSPELL-END`` (markers
    included) are copied verbatim.
    """
    final_gloss = []
    in_spelling = False
    for token in processed_gloss:
        if token == 'FINGERSPELL-START':
            in_spelling = True
            final_gloss.append(token)
        elif token == 'FINGERSPELL-END':
            in_spelling = False
            final_gloss.append(token)
        elif in_spelling:
            # Letters of a fingerspelled word are kept as they are.
            final_gloss.append(token)
        else:
            # The lookup runs on the shared thread pool; wait for its result.
            synonym = executor.submit(
                sp.find_synonyms,
                token,
                nlp,
                dict_docs_spacy,
                list_2000_tokens
            ).result()
            final_gloss.append(synonym)
    return final_gloss


@app.route('/translate/', methods=['POST'])
def result():
    """Translate the submitted sentence to gloss and render the result page.

    Reads ``inputSentence`` from the form, normalises and translates it,
    converts the English text to gloss, fingerspells single-quoted words,
    and applies the synonym lookup to the remaining tokens. Renders
    ``error.html`` when the input is empty or missing, or when any step
    raises.
    """
    # The route only accepts POST, so no method check is needed. A missing
    # field is treated like empty input rather than raising.
    input_text = request.form.get('inputSentence', '').strip()
    if not input_text:
        # Send the user to the error page
        return render_template('error.html', error="Please enter text to translate")
    try:
        input_text = normalize_quotes(input_text)
        english_text = translate_korean_to_english(input_text)
        if not english_text:
            raise Exception("Translation failed")
        quoted_words = find_quoted_words(english_text)

        # Run the NLP step on the shared thread pool
        def process_nlp():
            # Strip the quote characters but keep the quoted words.
            clean_english = re.sub(r"'([^']*)'", r"\1", english_text)
            eng_to_asl_translator = NlpSpacyBaseTranslator(sentence=clean_english)
            return eng_to_asl_translator.translate_to_gloss()

        generated_gloss = executor.submit(process_nlp).result()

        # Quoted words are converted to fingerspelling
        processed_gloss = _apply_fingerspelling(generated_gloss.split(), quoted_words)
        gloss_sentence_before_synonym = " ".join(processed_gloss)

        # Synonym handling
        final_gloss = _replace_with_synonyms(processed_gloss)
        gloss_sentence_after_synonym = " ".join(final_gloss)

        return render_template(
            'result.html',
            title=app.config['TITLE'],
            original_sentence=input_text,
            english_translation=english_text,
            gloss_sentence_before_synonym=gloss_sentence_before_synonym,
            gloss_sentence_after_synonym=gloss_sentence_after_synonym
        )
    except Exception as e:
        logger.error(f"Translation processing error: {str(e)}")
        return render_template('error.html', error=f"Translation error: {str(e)}")
@app.route('/video_feed')
def video_feed():
    """Stream the frames for the gloss sentence given in the query string.

    The sentence is read from ``gloss_sentence_to_display`` (empty by
    default), split on whitespace and handed to ``dg.generate_video``; the
    resulting iterable is returned as a ``multipart/x-mixed-replace`` response.
    """
    gloss_sentence = request.args.get('gloss_sentence_to_display', '')
    frame_stream = dg.generate_video(gloss_sentence.split(), dataset, list_2000_tokens)
    return Response(
        frame_stream,
        mimetype='multipart/x-mixed-replace; boundary=frame'
    )
@app.route('/download_video/<path:gloss_sentence>')
def download_video(gloss_sentence: str):
    """Build the video for *gloss_sentence* and return it as an MP4 download.

    Responds with 400 when the sentence has no tokens, and with 500 when
    generation yields no bytes or raises.
    """
    try:
        # NOTE(review): the framework may already have URL-decoded the path
        # segment, in which case this decodes it a second time. Confirm.
        gloss_list = unquote(gloss_sentence).split()
        if not gloss_list:
            return "No gloss provided", 400

        video_bytes = generate_complete_video(gloss_list, dataset, list_2000_tokens)
        if not video_bytes:
            return "Failed to generate video", 500

        video_buffer = io.BytesIO(video_bytes)
        return send_file(
            video_buffer,
            mimetype='video/mp4',
            as_attachment=True,
            download_name='sign_language.mp4'
        )
    except Exception as e:
        logger.error(f"Video download error: {str(e)}")
        return f"Error downloading video: {str(e)}", 500
# Runs only when main.py is executed directly (development environment)
if __name__ == "__main__":
    # The server listens on all interfaces, and the interactive debugger
    # lets anyone who can reach it execute arbitrary code. Debug mode is
    # therefore off unless explicitly requested with FLASK_DEBUG=1.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").lower() in ("1", "true", "yes")
    app.run(host="0.0.0.0", port=7860, debug=debug_enabled)
# ---------------------------------------------------------------------
# Gunicorn settings kept inside main.py (used when the file is passed to Gunicorn with -c)
# The variables below take effect when Gunicorn uses main.py as its config file.
bind = "0.0.0.0:7860"
workers = 4
timeout = 120 # worker timeout set to 120 seconds