import display_gloss as dg import synonyms_preprocess as sp from NLP_Spacy_base_translator import NlpSpacyBaseTranslator from flask import Flask, render_template, Response, request, send_file import io import cv2 import numpy as np import os import requests from concurrent.futures import ThreadPoolExecutor from urllib.parse import quote, unquote import tempfile import re from functools import lru_cache from typing import List, Dict, Any import logging from contextlib import contextmanager # 로깅 설정 logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) app = Flask(__name__, static_folder='static') app.config['TITLE'] = 'Sign Language Translate' # 전역 변수를 초기화하고 캐싱 nlp, dict_docs_spacy = sp.load_spacy_values() dataset, list_2000_tokens = dg.load_data() # 스레드 풀 생성 executor = ThreadPoolExecutor(max_workers=4) # 메모리 캐시 데코레이터 @lru_cache(maxsize=1000) def clean_quotes(text: str) -> str: """따옴표 정리 함수""" text = re.sub(r"'+", "'", text) text = re.sub(r'\s+', ' ', text).strip() return text @lru_cache(maxsize=1000) def is_korean(text: str) -> bool: """한글이 포함되어 있는지 확인""" return bool(re.search('[가-힣]', text)) @lru_cache(maxsize=1000) def is_english(text: str) -> bool: """텍스트가 영어인지 확인하는 함수""" text_without_quotes = re.sub(r"'[^']*'|\s", "", text) return bool(re.match(r'^[A-Za-z.,!?-]*$', text_without_quotes)) @lru_cache(maxsize=1000) def normalize_quotes(text: str) -> str: """따옴표 형식을 정규화하는 함수""" text = re.sub(r"'+", "'", text) text = re.sub(r'\s+', ' ', text).strip() if re.search(r"'[^']*'", text): return text return text @lru_cache(maxsize=1000) def find_quoted_words(text: str) -> List[str]: """작은따옴표로 묶인 단어들을 찾는 함수""" return re.findall(r"'([^']*)'", text) @lru_cache(maxsize=1000) def spell_out_word(word: str) -> str: """단어를 개별 알파벳으로 분리하는 함수""" return ' '.join(list(word.lower())) def translate_text_chunk(text: str, source_lang: str, target_lang: str) -> str: """텍스트 번역 함수""" try: url = "https://translate.googleapis.com/translate_a/single" params = { "client": "gtx", "sl": source_lang, "tl": target_lang, "dt": "t", "q": text } response = requests.get(url, params=params) if response.status_code != 200: logger.error(f"Translation API error: {response.status_code}") return text data = response.json() return ' '.join(item[0] for item in data[0] if item[0]) except Exception as e: logger.error(f"Translation error: {e}") return text def translate_korean_text(text: str) -> str: """한글 전용 번역 함수""" try: quoted_match = re.search(r"'([^']*)'", text) if not quoted_match: # 인용부호가 없는 경우 전체 텍스트를 번역하도록 함 translated_text = translate_text_chunk(text, "ko", "en") # 번역 후 추가 전처리 (필요한 경우) translated_text = re.sub(r'\bNAME\b', 'name', translated_text) translated_text = translated_text.replace(" .", ".") return translated_text quoted_word = quoted_match.group(1) # 본문 번역: 인용부호 단어를 placeholder로 대체 main_text = text.replace(f"'{quoted_word}'", "XXXXX") translated_main = translate_text_chunk(main_text, "ko", "en") # 인용된 단어 처리 if re.match(r'^[A-Za-z]+$', quoted_word): proper_noun = quoted_word.upper() else: proper_noun = translate_text_chunk(quoted_word, "ko", "en").upper() final_text = translated_main.replace("XXXXX", f"'{proper_noun}'") final_text = re.sub(r'\bNAME\b', 'name', final_text) final_text = final_text.replace(" .", ".") return final_text except Exception as e: logger.error(f"Korean translation error: {e}") return text def translate_korean_to_english(text: str) -> str: """전체 텍스트 번역 함수""" try: text = normalize_quotes(text) if is_english(text): # 영어 문장 안의 작은따옴표 단어를 대문자로 바꿔주는 처리 quoted_match = re.search(r"'([^']*)'", text) if quoted_match: quoted_word = quoted_match.group(1).upper() text = re.sub(r"'[^']*'", f"'{quoted_word}'", text, 1) return text if is_korean(text): return translate_korean_text(text) return text except Exception as e: logger.error(f"Translation error: {e}") return text def process_frame(frame_data: bytes) -> np.ndarray: """프레임 처리 함수""" try: frame_content = frame_data.split(b'\r\n\r\n')[1] nparr = np.frombuffer(frame_content, np.uint8) return cv2.imdecode(nparr, cv2.IMREAD_COLOR) except Exception as e: logger.error(f"Frame processing error: {e}") raise @contextmanager def video_writer(path: str, frame_size: tuple, fps: int = 25): """비디오 작성을 위한 컨텍스트 매니저""" fourcc = cv2.VideoWriter_fourcc(*'mp4v') writer = cv2.VideoWriter(path, fourcc, fps, frame_size) try: yield writer finally: writer.release() def generate_complete_video(gloss_list: List[str], dataset: Dict[str, Any], list_2000_tokens: List[str]) -> bytes: """최적화된 비디오 생성 함수""" try: frames = [] is_spelling = False # 프레임 생성을 병렬로 처리 with ThreadPoolExecutor() as executor: for gloss in gloss_list: if gloss == 'FINGERSPELL-START': is_spelling = True continue elif gloss == 'FINGERSPELL-END': is_spelling = False continue frame_futures = [ executor.submit(process_frame, frame) for frame in dg.generate_video([gloss], dataset, list_2000_tokens) ] frames.extend([future.result() for future in frame_futures]) if not frames: raise Exception("No frames generated") height, width = frames[0].shape[:2] with tempfile.NamedTemporaryFile(suffix='.mp4', delete=False) as temp_file: temp_path = temp_file.name with video_writer(temp_path, (width, height)) as out: for frame in frames: out.write(frame) with open(temp_path, 'rb') as f: video_bytes = f.read() os.remove(temp_path) return video_bytes except Exception as e: logger.error(f"Video generation error: {str(e)}") raise @app.route('/') def index(): return render_template('index.html', title=app.config['TITLE']) @app.route('/translate/', methods=['POST']) def result(): if request.method == 'POST': input_text = request.form['inputSentence'].strip() if not input_text: # 에러 페이지로 이동 return render_template('error.html', error="Please enter text to translate") try: input_text = normalize_quotes(input_text) english_text = translate_korean_to_english(input_text) if not english_text: raise Exception("Translation failed") quoted_words = find_quoted_words(english_text) # NLP 처리를 스레드 풀에서 실행 def process_nlp(): clean_english = re.sub(r"'([^']*)'", r"\1", english_text) eng_to_asl_translator = NlpSpacyBaseTranslator(sentence=clean_english) return eng_to_asl_translator.translate_to_gloss() generated_gloss = executor.submit(process_nlp).result() # 작은따옴표 처리(지정된 영단어는 fingerspell로 변환) processed_gloss = [] words = generated_gloss.split() for word in words: word_upper = word.upper() if quoted_words and word_upper in [w.upper() for w in quoted_words]: spelled_word = spell_out_word(word) processed_gloss.extend(['FINGERSPELL-START'] + spelled_word.split() + ['FINGERSPELL-END']) else: processed_gloss.append(word.lower()) gloss_sentence_before_synonym = " ".join(processed_gloss) # 동의어 처리 final_gloss = [] i = 0 while i < len(processed_gloss): if processed_gloss[i] == 'FINGERSPELL-START': # fingerspell 구간은 그대로 넣는다 final_gloss.extend(processed_gloss[i:i+2]) # FINGERSPELL-START + 첫 글자 i += 2 while i < len(processed_gloss) and processed_gloss[i] != 'FINGERSPELL-END': final_gloss.append(processed_gloss[i]) i += 1 if i < len(processed_gloss): final_gloss.append(processed_gloss[i]) # FINGERSPELL-END i += 1 else: word = processed_gloss[i] # 동의어 찾기를 스레드 풀에서 비동기로 실행하여 결과 얻기 synonym = executor.submit( sp.find_synonyms, word, nlp, dict_docs_spacy, list_2000_tokens ).result() final_gloss.append(synonym) i += 1 gloss_sentence_after_synonym = " ".join(final_gloss) return render_template( 'result.html', title=app.config['TITLE'], original_sentence=input_text, english_translation=english_text, gloss_sentence_before_synonym=gloss_sentence_before_synonym, gloss_sentence_after_synonym=gloss_sentence_after_synonym ) except Exception as e: logger.error(f"Translation processing error: {str(e)}") return render_template('error.html', error=f"Translation error: {str(e)}") @app.route('/video_feed') def video_feed(): sentence = request.args.get('gloss_sentence_to_display', '') gloss_list = sentence.split() return Response( dg.generate_video(gloss_list, dataset, list_2000_tokens), mimetype='multipart/x-mixed-replace; boundary=frame' ) @app.route('/download_video/') def download_video(gloss_sentence: str): try: decoded_sentence = unquote(gloss_sentence) gloss_list = decoded_sentence.split() if not gloss_list: return "No gloss provided", 400 video_bytes = generate_complete_video(gloss_list, dataset, list_2000_tokens) if not video_bytes: return "Failed to generate video", 500 return send_file( io.BytesIO(video_bytes), mimetype='video/mp4', as_attachment=True, download_name='sign_language.mp4' ) except Exception as e: logger.error(f"Video download error: {str(e)}") return f"Error downloading video: {str(e)}", 500 # 만약 main.py를 직접 실행하는 경우(개발 환경) if __name__ == "__main__": app.run(host="0.0.0.0", port=7860, debug=True) # --------------------------------------------------------------------- # Gunicorn 설정을 main.py 파일 내에 포함 (Gunicorn 실행 시 -c 옵션으로 사용) # 아래 변수들은 Gunicorn이 main.py를 설정 파일로 사용할 경우 적용됩니다. bind = "0.0.0.0:7860" workers = 4 timeout = 120 # worker timeout을 120초로 설정