ginipick commited on
Commit
68397f9
·
verified ·
1 Parent(s): c7329e7

Update src/main-backup.py

Browse files
Files changed (1) hide show
  1. src/main-backup.py +176 -90
src/main-backup.py CHANGED
@@ -7,99 +7,134 @@ import cv2
7
  import numpy as np
8
  import os
9
  import requests
 
10
  from urllib.parse import quote, unquote
11
  import tempfile
12
  import re
 
 
 
 
 
 
 
 
13
 
14
  app = Flask(__name__, static_folder='static')
15
  app.config['TITLE'] = 'Sign Language Translate'
16
 
 
17
  nlp, dict_docs_spacy = sp.load_spacy_values()
18
  dataset, list_2000_tokens = dg.load_data()
19
 
20
- def clean_quotes(text):
 
 
 
 
 
21
  """따옴표 정리 함수"""
22
  text = re.sub(r"'+", "'", text)
23
  text = re.sub(r'\s+', ' ', text).strip()
24
  return text
25
 
26
- def is_korean(text):
 
27
  """한글이 포함되어 있는지 확인"""
28
  return bool(re.search('[가-힣]', text))
29
 
30
- def is_english(text):
 
31
  """텍스트가 영어인지 확인하는 함수"""
32
  text_without_quotes = re.sub(r"'[^']*'|\s", "", text)
33
  return bool(re.match(r'^[A-Za-z.,!?-]*$', text_without_quotes))
34
 
35
- def normalize_quotes(text):
 
36
  """따옴표 형식을 정규화하는 함수"""
37
  text = re.sub(r"'+", "'", text)
38
  text = re.sub(r'\s+', ' ', text).strip()
39
 
40
- # 이미 따옴표로 묶인 단어가 있으면 그대로 반환
41
  if re.search(r"'[^']*'", text):
42
  return text
43
-
44
  return text
45
 
46
- def find_quoted_words(text):
 
47
  """작은따옴표로 묶인 단어들을 찾는 함수"""
48
  return re.findall(r"'([^']*)'", text)
49
 
50
- def spell_out_word(word):
 
51
  """단어를 개별 알파벳으로 분리하는 함수"""
52
  return ' '.join(list(word.lower()))
53
 
54
- def translate_korean_text(text):
55
- """한글 전용 번역 함수"""
56
  try:
57
- quoted_match = re.search(r"'([^']*)'", text)
58
- if not quoted_match:
59
- return text
60
-
61
- quoted_word = quoted_match.group(1)
62
-
63
  url = "https://translate.googleapis.com/translate_a/single"
64
  params = {
65
  "client": "gtx",
66
- "sl": "ko",
67
- "tl": "en",
68
  "dt": "t",
69
- "q": text.replace(f"'{quoted_word}'", "XXXXX")
70
  }
 
71
  response = requests.get(url, params=params)
72
  if response.status_code != 200:
 
73
  return text
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
74
 
75
- translated_text = ' '.join(item[0] for item in response.json()[0] if item[0])
 
 
 
 
76
 
 
77
  if re.match(r'^[A-Za-z]+$', quoted_word):
78
  proper_noun = quoted_word.upper()
79
  else:
80
- params["q"] = quoted_word
81
- response = requests.get(url, params=params)
82
- if response.status_code == 200:
83
- proper_noun = response.json()[0][0][0].upper()
84
- else:
85
- proper_noun = quoted_word.upper()
86
 
87
- final_text = translated_text.replace("XXXXX", f"'{proper_noun}'")
88
  final_text = re.sub(r'\bNAME\b', 'name', final_text)
89
  final_text = final_text.replace(" .", ".")
90
 
91
  return final_text
92
 
93
  except Exception as e:
94
- print(f"Korean translation error: {e}")
95
  return text
96
 
97
- def translate_korean_to_english(text):
 
98
  """전체 텍스트 번역 함수"""
99
  try:
100
  text = normalize_quotes(text)
101
 
102
  if is_english(text):
 
103
  quoted_match = re.search(r"'([^']*)'", text)
104
  if quoted_match:
105
  quoted_word = quoted_match.group(1).upper()
@@ -111,9 +146,73 @@ def translate_korean_to_english(text):
111
 
112
  return text
113
  except Exception as e:
114
- print(f"Translation error: {e}")
115
  return text
116
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
117
  @app.route('/')
118
  def index():
119
  return render_template('index.html', title=app.config['TITLE'])
@@ -123,6 +222,7 @@ def result():
123
  if request.method == 'POST':
124
  input_text = request.form['inputSentence'].strip()
125
  if not input_text:
 
126
  return render_template('error.html', error="Please enter text to translate")
127
 
128
  try:
@@ -133,10 +233,15 @@ def result():
133
 
134
  quoted_words = find_quoted_words(english_text)
135
 
136
- clean_english = re.sub(r"'([^']*)'", r"\1", english_text)
137
- eng_to_asl_translator = NlpSpacyBaseTranslator(sentence=clean_english)
138
- generated_gloss = eng_to_asl_translator.translate_to_gloss()
 
 
 
 
139
 
 
140
  processed_gloss = []
141
  words = generated_gloss.split()
142
 
@@ -150,86 +255,59 @@ def result():
150
 
151
  gloss_sentence_before_synonym = " ".join(processed_gloss)
152
 
 
153
  final_gloss = []
154
  i = 0
155
  while i < len(processed_gloss):
156
  if processed_gloss[i] == 'FINGERSPELL-START':
157
- final_gloss.extend(processed_gloss[i:i+2])
 
158
  i += 2
159
  while i < len(processed_gloss) and processed_gloss[i] != 'FINGERSPELL-END':
160
  final_gloss.append(processed_gloss[i])
161
  i += 1
162
  if i < len(processed_gloss):
163
- final_gloss.append(processed_gloss[i])
164
  i += 1
165
  else:
166
  word = processed_gloss[i]
167
- final_gloss.append(sp.find_synonyms(word, nlp, dict_docs_spacy, list_2000_tokens))
 
 
 
 
 
 
 
 
168
  i += 1
169
 
170
  gloss_sentence_after_synonym = " ".join(final_gloss)
171
 
172
- return render_template('result.html',
173
- title=app.config['TITLE'],
174
- original_sentence=input_text,
175
- english_translation=english_text,
176
- gloss_sentence_before_synonym=gloss_sentence_before_synonym,
177
- gloss_sentence_after_synonym=gloss_sentence_after_synonym)
 
 
 
178
  except Exception as e:
 
179
  return render_template('error.html', error=f"Translation error: {str(e)}")
180
 
181
- def generate_complete_video(gloss_list, dataset, list_2000_tokens):
182
- try:
183
- frames = []
184
- is_spelling = False
185
-
186
- for gloss in gloss_list:
187
- if gloss == 'FINGERSPELL-START':
188
- is_spelling = True
189
- continue
190
- elif gloss == 'FINGERSPELL-END':
191
- is_spelling = False
192
- continue
193
-
194
- for frame in dg.generate_video([gloss], dataset, list_2000_tokens):
195
- frame_data = frame.split(b'\r\n\r\n')[1]
196
- nparr = np.frombuffer(frame_data, np.uint8)
197
- img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
198
- frames.append(img)
199
-
200
- if not frames:
201
- raise Exception("No frames generated")
202
-
203
- height, width = frames[0].shape[:2]
204
- fourcc = cv2.VideoWriter_fourcc(*'mp4v')
205
-
206
- with tempfile.NamedTemporaryFile(suffix='.mp4', delete=False) as temp_file:
207
- temp_path = temp_file.name
208
-
209
- out = cv2.VideoWriter(temp_path, fourcc, 25, (width, height))
210
-
211
- for frame in frames:
212
- out.write(frame)
213
- out.release()
214
-
215
- with open(temp_path, 'rb') as f:
216
- video_bytes = f.read()
217
-
218
- os.remove(temp_path)
219
- return video_bytes
220
- except Exception as e:
221
- print(f"Error generating video: {str(e)}")
222
- raise
223
-
224
  @app.route('/video_feed')
225
  def video_feed():
226
  sentence = request.args.get('gloss_sentence_to_display', '')
227
  gloss_list = sentence.split()
228
- return Response(dg.generate_video(gloss_list, dataset, list_2000_tokens),
229
- mimetype='multipart/x-mixed-replace; boundary=frame')
 
 
230
 
231
  @app.route('/download_video/<path:gloss_sentence>')
232
- def download_video(gloss_sentence):
233
  try:
234
  decoded_sentence = unquote(gloss_sentence)
235
  gloss_list = decoded_sentence.split()
@@ -249,8 +327,16 @@ def download_video(gloss_sentence):
249
  download_name='sign_language.mp4'
250
  )
251
  except Exception as e:
252
- print(f"Download error: {str(e)}")
253
  return f"Error downloading video: {str(e)}", 500
254
 
 
255
  if __name__ == "__main__":
256
- app.run(host="0.0.0.0", port=7860, debug=True)
 
 
 
 
 
 
 
 
7
  import numpy as np
8
  import os
9
  import requests
10
+ from concurrent.futures import ThreadPoolExecutor
11
  from urllib.parse import quote, unquote
12
  import tempfile
13
  import re
14
+ from functools import lru_cache
15
+ from typing import List, Dict, Any
16
+ import logging
17
+ from contextlib import contextmanager
18
+
19
+ # 로깅 설정
20
+ logging.basicConfig(level=logging.INFO)
21
+ logger = logging.getLogger(__name__)
22
 
23
  app = Flask(__name__, static_folder='static')
24
  app.config['TITLE'] = 'Sign Language Translate'
25
 
26
+ # 전역 변수를 초기화하고 캐싱
27
  nlp, dict_docs_spacy = sp.load_spacy_values()
28
  dataset, list_2000_tokens = dg.load_data()
29
 
30
+ # 스레드 풀 생성
31
+ executor = ThreadPoolExecutor(max_workers=4)
32
+
33
+ # 메모리 캐시 데코레이터
34
+ @lru_cache(maxsize=1000)
35
+ def clean_quotes(text: str) -> str:
36
  """따옴표 정리 함수"""
37
  text = re.sub(r"'+", "'", text)
38
  text = re.sub(r'\s+', ' ', text).strip()
39
  return text
40
 
41
+ @lru_cache(maxsize=1000)
42
+ def is_korean(text: str) -> bool:
43
  """한글이 포함되어 있는지 확인"""
44
  return bool(re.search('[가-힣]', text))
45
 
46
+ @lru_cache(maxsize=1000)
47
+ def is_english(text: str) -> bool:
48
  """텍스트가 영어인지 확인하는 함수"""
49
  text_without_quotes = re.sub(r"'[^']*'|\s", "", text)
50
  return bool(re.match(r'^[A-Za-z.,!?-]*$', text_without_quotes))
51
 
52
+ @lru_cache(maxsize=1000)
53
+ def normalize_quotes(text: str) -> str:
54
  """따옴표 형식을 정규화하는 함수"""
55
  text = re.sub(r"'+", "'", text)
56
  text = re.sub(r'\s+', ' ', text).strip()
57
 
 
58
  if re.search(r"'[^']*'", text):
59
  return text
 
60
  return text
61
 
62
+ @lru_cache(maxsize=1000)
63
+ def find_quoted_words(text: str) -> List[str]:
64
  """작은따옴표로 묶인 단어들을 찾는 함수"""
65
  return re.findall(r"'([^']*)'", text)
66
 
67
+ @lru_cache(maxsize=1000)
68
+ def spell_out_word(word: str) -> str:
69
  """단어를 개별 알파벳으로 분리하는 함수"""
70
  return ' '.join(list(word.lower()))
71
 
72
+ def translate_text_chunk(text: str, source_lang: str, target_lang: str) -> str:
73
+ """텍스트 번역 함수"""
74
  try:
 
 
 
 
 
 
75
  url = "https://translate.googleapis.com/translate_a/single"
76
  params = {
77
  "client": "gtx",
78
+ "sl": source_lang,
79
+ "tl": target_lang,
80
  "dt": "t",
81
+ "q": text
82
  }
83
+
84
  response = requests.get(url, params=params)
85
  if response.status_code != 200:
86
+ logger.error(f"Translation API error: {response.status_code}")
87
  return text
88
+
89
+ data = response.json()
90
+ return ' '.join(item[0] for item in data[0] if item[0])
91
+
92
+ except Exception as e:
93
+ logger.error(f"Translation error: {e}")
94
+ return text
95
+
96
+ def translate_korean_text(text: str) -> str:
97
+ """한글 전용 번역 함수"""
98
+ try:
99
+ quoted_match = re.search(r"'([^']*)'", text)
100
+ if not quoted_match:
101
+ # 인용부호가 없는 경우 전체 텍스트를 번역하도록 함
102
+ translated_text = translate_text_chunk(text, "ko", "en")
103
+ # 번역 후 추가 전처리 (필요한 경우)
104
+ translated_text = re.sub(r'\bNAME\b', 'name', translated_text)
105
+ translated_text = translated_text.replace(" .", ".")
106
+ return translated_text
107
 
108
+ quoted_word = quoted_match.group(1)
109
+
110
+ # 본문 번역: 인용부호 단어를 placeholder로 대체
111
+ main_text = text.replace(f"'{quoted_word}'", "XXXXX")
112
+ translated_main = translate_text_chunk(main_text, "ko", "en")
113
 
114
+ # 인용된 단어 처리
115
  if re.match(r'^[A-Za-z]+$', quoted_word):
116
  proper_noun = quoted_word.upper()
117
  else:
118
+ proper_noun = translate_text_chunk(quoted_word, "ko", "en").upper()
 
 
 
 
 
119
 
120
+ final_text = translated_main.replace("XXXXX", f"'{proper_noun}'")
121
  final_text = re.sub(r'\bNAME\b', 'name', final_text)
122
  final_text = final_text.replace(" .", ".")
123
 
124
  return final_text
125
 
126
  except Exception as e:
127
+ logger.error(f"Korean translation error: {e}")
128
  return text
129
 
130
+
131
+ def translate_korean_to_english(text: str) -> str:
132
  """전체 텍스트 번역 함수"""
133
  try:
134
  text = normalize_quotes(text)
135
 
136
  if is_english(text):
137
+ # 영어 문장 안의 작은따옴표 단어를 대문자로 바꿔주는 처리
138
  quoted_match = re.search(r"'([^']*)'", text)
139
  if quoted_match:
140
  quoted_word = quoted_match.group(1).upper()
 
146
 
147
  return text
148
  except Exception as e:
149
+ logger.error(f"Translation error: {e}")
150
  return text
151
 
152
+ def process_frame(frame_data: bytes) -> np.ndarray:
153
+ """프레임 처리 함수"""
154
+ try:
155
+ frame_content = frame_data.split(b'\r\n\r\n')[1]
156
+ nparr = np.frombuffer(frame_content, np.uint8)
157
+ return cv2.imdecode(nparr, cv2.IMREAD_COLOR)
158
+ except Exception as e:
159
+ logger.error(f"Frame processing error: {e}")
160
+ raise
161
+
162
+ @contextmanager
163
+ def video_writer(path: str, frame_size: tuple, fps: int = 25):
164
+ """비디오 작성을 위한 컨텍스트 매니저"""
165
+ fourcc = cv2.VideoWriter_fourcc(*'mp4v')
166
+ writer = cv2.VideoWriter(path, fourcc, fps, frame_size)
167
+ try:
168
+ yield writer
169
+ finally:
170
+ writer.release()
171
+
172
+ def generate_complete_video(gloss_list: List[str], dataset: Dict[str, Any], list_2000_tokens: List[str]) -> bytes:
173
+ """최적화된 비디오 생성 함수"""
174
+ try:
175
+ frames = []
176
+ is_spelling = False
177
+
178
+ # 프레임 생성을 병렬로 처리
179
+ with ThreadPoolExecutor() as executor:
180
+ for gloss in gloss_list:
181
+ if gloss == 'FINGERSPELL-START':
182
+ is_spelling = True
183
+ continue
184
+ elif gloss == 'FINGERSPELL-END':
185
+ is_spelling = False
186
+ continue
187
+
188
+ frame_futures = [
189
+ executor.submit(process_frame, frame)
190
+ for frame in dg.generate_video([gloss], dataset, list_2000_tokens)
191
+ ]
192
+ frames.extend([future.result() for future in frame_futures])
193
+
194
+ if not frames:
195
+ raise Exception("No frames generated")
196
+
197
+ height, width = frames[0].shape[:2]
198
+
199
+ with tempfile.NamedTemporaryFile(suffix='.mp4', delete=False) as temp_file:
200
+ temp_path = temp_file.name
201
+
202
+ with video_writer(temp_path, (width, height)) as out:
203
+ for frame in frames:
204
+ out.write(frame)
205
+
206
+ with open(temp_path, 'rb') as f:
207
+ video_bytes = f.read()
208
+
209
+ os.remove(temp_path)
210
+ return video_bytes
211
+
212
+ except Exception as e:
213
+ logger.error(f"Video generation error: {str(e)}")
214
+ raise
215
+
216
  @app.route('/')
217
  def index():
218
  return render_template('index.html', title=app.config['TITLE'])
 
222
  if request.method == 'POST':
223
  input_text = request.form['inputSentence'].strip()
224
  if not input_text:
225
+ # 에러 페이지로 이동
226
  return render_template('error.html', error="Please enter text to translate")
227
 
228
  try:
 
233
 
234
  quoted_words = find_quoted_words(english_text)
235
 
236
+ # NLP 처리를 스레드 풀에서 실행
237
+ def process_nlp():
238
+ clean_english = re.sub(r"'([^']*)'", r"\1", english_text)
239
+ eng_to_asl_translator = NlpSpacyBaseTranslator(sentence=clean_english)
240
+ return eng_to_asl_translator.translate_to_gloss()
241
+
242
+ generated_gloss = executor.submit(process_nlp).result()
243
 
244
+ # 작은따옴표 처리(지정된 영단어는 fingerspell로 변환)
245
  processed_gloss = []
246
  words = generated_gloss.split()
247
 
 
255
 
256
  gloss_sentence_before_synonym = " ".join(processed_gloss)
257
 
258
+ # 동의어 처리
259
  final_gloss = []
260
  i = 0
261
  while i < len(processed_gloss):
262
  if processed_gloss[i] == 'FINGERSPELL-START':
263
+ # fingerspell 구간은 그대로 넣는다
264
+ final_gloss.extend(processed_gloss[i:i+2]) # FINGERSPELL-START + 첫 글자
265
  i += 2
266
  while i < len(processed_gloss) and processed_gloss[i] != 'FINGERSPELL-END':
267
  final_gloss.append(processed_gloss[i])
268
  i += 1
269
  if i < len(processed_gloss):
270
+ final_gloss.append(processed_gloss[i]) # FINGERSPELL-END
271
  i += 1
272
  else:
273
  word = processed_gloss[i]
274
+ # 동의어 찾기를 스레드 풀에서 비동기로 실행하여 결과 얻기
275
+ synonym = executor.submit(
276
+ sp.find_synonyms,
277
+ word,
278
+ nlp,
279
+ dict_docs_spacy,
280
+ list_2000_tokens
281
+ ).result()
282
+ final_gloss.append(synonym)
283
  i += 1
284
 
285
  gloss_sentence_after_synonym = " ".join(final_gloss)
286
 
287
+ return render_template(
288
+ 'result.html',
289
+ title=app.config['TITLE'],
290
+ original_sentence=input_text,
291
+ english_translation=english_text,
292
+ gloss_sentence_before_synonym=gloss_sentence_before_synonym,
293
+ gloss_sentence_after_synonym=gloss_sentence_after_synonym
294
+ )
295
+
296
  except Exception as e:
297
+ logger.error(f"Translation processing error: {str(e)}")
298
  return render_template('error.html', error=f"Translation error: {str(e)}")
299
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
300
  @app.route('/video_feed')
301
  def video_feed():
302
  sentence = request.args.get('gloss_sentence_to_display', '')
303
  gloss_list = sentence.split()
304
+ return Response(
305
+ dg.generate_video(gloss_list, dataset, list_2000_tokens),
306
+ mimetype='multipart/x-mixed-replace; boundary=frame'
307
+ )
308
 
309
  @app.route('/download_video/<path:gloss_sentence>')
310
+ def download_video(gloss_sentence: str):
311
  try:
312
  decoded_sentence = unquote(gloss_sentence)
313
  gloss_list = decoded_sentence.split()
 
327
  download_name='sign_language.mp4'
328
  )
329
  except Exception as e:
330
+ logger.error(f"Video download error: {str(e)}")
331
  return f"Error downloading video: {str(e)}", 500
332
 
333
+ # 만약 main.py를 직접 실행하는 경우(개발 환경)
334
  if __name__ == "__main__":
335
+ app.run(host="0.0.0.0", port=7860, debug=True)
336
+
337
+ # ---------------------------------------------------------------------
338
+ # Gunicorn 설정을 main.py 파일 내에 포함 (Gunicorn 실행 시 -c 옵션으로 사용)
339
+ # 아래 변수들은 Gunicorn이 main.py를 설정 파일로 사용할 경우 적용됩니다.
340
+ bind = "0.0.0.0:7860"
341
+ workers = 4
342
+ timeout = 120 # worker timeout을 120초로 설정