"""Gradio front end for YuE full-song generation with multi-language lyrics.

The script analyses the submitted lyrics, picks a stage-1 YuE checkpoint
that matches the lyric language, derives generation parameters from the
lyric structure, and shells out to ``infer.py`` to produce an MP3.
"""

import logging
import os
import re
import shutil
import subprocess
import sys
import tempfile
from concurrent.futures import ThreadPoolExecutor
from functools import lru_cache

import gradio as gr
import numpy as np
import torch

# Logging configuration: mirror every record to a log file and the console.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('yue_generation.log'),
        logging.StreamHandler()
    ]
)

# Section tags recognised in lyrics, in match-priority order.
_SECTION_NAMES = ('verse', 'chorus', 'bridge')

# Token budget heuristics used by calculate_generation_params().
_BASE_TOKENS_PER_LINE = 200
_CHORUS_TOKEN_FACTOR = 1.5  # chorus lines get 50% more tokens
_MIN_TOKENS, _MAX_TOKENS = 3000, 32000
_MIN_SEGMENTS, _MAX_SEGMENTS = 2, 4

# Script-detection patterns used by detect_and_select_model().
_HANGUL_RE = re.compile(r'[\u3131-\u318E\uAC00-\uD7A3]')
_KANA_RE = re.compile(r'[\u3040-\u309F\u30A0-\u30FF]')
_HAN_RE = re.compile(r'[\u4e00-\u9fff]')

_MODEL_EN = "m-a-p/YuE-s1-7B-anneal-en-cot"
_MODEL_JP_KR = "m-a-p/YuE-s1-7B-anneal-jp-kr-cot"
_MODEL_ZH = "m-a-p/YuE-s1-7B-anneal-zh-cot"

# Sampling temperature per stage-1 checkpoint.
_MODEL_TEMPERATURES = {
    _MODEL_EN: 0.8,
    _MODEL_JP_KR: 0.7,
    _MODEL_ZH: 0.7,
}


# Lyrics analysis
def analyze_lyrics(lyrics):
    """Count section tags and lyric lines per section.

    Args:
        lyrics: Raw lyrics text; sections are introduced by lines containing
            ``[verse]``, ``[chorus]`` or ``[bridge]`` (case-insensitive).

    Returns:
        A 4-tuple ``(sections, total_sections, total_lines, section_lines)``:
        ``sections`` maps each section name to how many times its tag
        appears and also carries ``'total_lines'``; ``total_sections`` is
        the sum of the three tag counts; ``total_lines`` is the number of
        non-blank lines (tag lines included); ``section_lines`` maps each
        section name to the number of non-tag lines that follow its tags.
    """
    # Split into lines, dropping blank ones.
    lines = [line.strip() for line in lyrics.split('\n') if line.strip()]

    sections = {name: 0 for name in _SECTION_NAMES}
    sections['total_lines'] = len(lines)
    section_lines = {name: 0 for name in _SECTION_NAMES}

    current_section = None
    for line in lines:
        lowered = line.lower()
        tag = next(
            (name for name in _SECTION_NAMES if f'[{name}]' in lowered),
            None,
        )
        if tag is not None:
            current_section = tag
            sections[tag] += 1
        elif current_section is not None:
            # Lines before the first tag belong to no section and are skipped.
            section_lines[current_section] += 1

    total_sections = sum(sections[name] for name in _SECTION_NAMES)
    return sections, total_sections, len(lines), section_lines


def calculate_generation_params(lyrics):
    """Derive the token budget and segment count from the lyric structure.

    Args:
        lyrics: Raw lyrics text (see :func:`analyze_lyrics`).

    Returns:
        Dict with ``'max_tokens'`` (clamped to 3000..32000),
        ``'num_segments'`` (clamped to 2..4), ``'sections'`` and
        ``'section_lines'`` (both as returned by :func:`analyze_lyrics`).
    """
    sections, total_sections, _total_lines, section_lines = analyze_lyrics(lyrics)

    verse_tokens = section_lines['verse'] * _BASE_TOKENS_PER_LINE
    chorus_tokens = section_lines['chorus'] * (
        _BASE_TOKENS_PER_LINE * _CHORUS_TOKEN_FACTOR
    )
    bridge_tokens = section_lines['bridge'] * _BASE_TOKENS_PER_LINE
    total_tokens = int(verse_tokens + chorus_tokens + bridge_tokens)

    return {
        'max_tokens': min(_MAX_TOKENS, max(_MIN_TOKENS, total_tokens)),
        'num_segments': max(_MIN_SEGMENTS, min(_MAX_SEGMENTS, total_sections)),
        'sections': sections,
        'section_lines': section_lines
    }


# Language detection and model selection
def detect_and_select_model(text):
    """Pick the stage-1 checkpoint that matches the script used in *text*.

    Hangul or kana selects the Japanese/Korean model, Han ideographs without
    either select the Chinese model, anything else the English model.

    Kana is tested *before* Han ideographs: Japanese text normally mixes
    kanji (which fall in the Han range) with kana, so testing Han first
    would route Japanese lyrics to the Chinese model.

    Args:
        text: Lyrics text to inspect.

    Returns:
        The model repository id as a string.
    """
    if _HANGUL_RE.search(text):  # Korean
        return _MODEL_JP_KR
    if _KANA_RE.search(text):  # Japanese (possibly mixed with kanji)
        return _MODEL_JP_KR
    if _HAN_RE.search(text):  # Chinese
        return _MODEL_ZH
    return _MODEL_EN  # English / other


def optimize_model_selection(lyrics, genre):
    """Choose a model and build its generation config for the given lyrics.

    Args:
        lyrics: Raw lyrics text.
        genre: Genre description; currently not used in the selection.

    Returns:
        A 3-tuple ``(model_path, config, params)`` where ``config`` holds
        ``max_tokens``, ``temperature``, ``batch_size``, ``num_segments`` and
        ``chorus_strength``, and ``params`` is the output of
        :func:`calculate_generation_params`.
    """
    model_path = detect_and_select_model(lyrics)
    params = calculate_generation_params(lyrics)

    has_chorus = params['sections']['chorus'] > 0
    config = {
        "max_tokens": params['max_tokens'],
        "temperature": _MODEL_TEMPERATURES[model_path],
        "batch_size": 8,
        "num_segments": params['num_segments'],
        "chorus_strength": 1.2 if has_chorus else 1.0
    }
    return model_path, config, params


# GPU settings
def optimize_gpu_settings():
    """Enable TF32/cuDNN autotuning on GPU 0, or warn when CUDA is missing."""
    if not torch.cuda.is_available():
        logging.warning("GPU not available!")
        return

    torch.backends.cuda.matmul.allow_tf32 = True
    torch.backends.cudnn.benchmark = True
    torch.backends.cudnn.deterministic = False
    torch.backends.cudnn.enabled = True
    torch.cuda.empty_cache()
    torch.cuda.set_device(0)

    logging.info(f"Using GPU: {torch.cuda.get_device_name(0)}")
    logging.info(f"Available GPU memory: {torch.cuda.get_device_properties(0).total_memory / 1024**3:.2f} GB")


def install_flash_attn():
    """Install ``flash-attn`` with pip into the running interpreter.

    Raises:
        subprocess.CalledProcessError: If the pip invocation fails.
    """
    try:
        logging.info("Installing flash-attn...")
        # `sys.executable -m pip` guarantees the package lands in the same
        # environment this script runs in, unlike a bare `pip` from PATH.
        subprocess.run(
            [sys.executable, "-m", "pip", "install", "flash-attn", "--no-build-isolation"],
            check=True,
            capture_output=True
        )
        logging.info("flash-attn installed successfully!")
    except subprocess.CalledProcessError as e:
        logging.error(f"Failed to install flash-attn: {e}")
        if e.stderr:
            # Output was captured, so surface it; otherwise it is lost.
            logging.error(e.stderr.decode(errors="replace"))
        raise


def initialize_system():
    """Prepare GPU, dependencies and codec weights, then enter ./inference.

    Side effects: tunes torch backends, installs flash-attn, downloads the
    ``m-a-p/xcodec_mini_infer`` snapshot and changes the process working
    directory to ``./inference``.

    Raises:
        subprocess.CalledProcessError: If installing flash-attn fails.
        FileNotFoundError: If ``./inference`` cannot be entered.
    """
    optimize_gpu_settings()
    install_flash_attn()

    from huggingface_hub import snapshot_download

    folder_path = './inference/xcodec_mini_infer'
    os.makedirs(folder_path, exist_ok=True)
    logging.info(f"Created folder at: {folder_path}")

    snapshot_download(
        repo_id="m-a-p/xcodec_mini_infer",
        local_dir=folder_path,
        resume_download=True
    )

    try:
        os.chdir("./inference")
        logging.info(f"Working directory changed to: {os.getcwd()}")
    except FileNotFoundError as e:
        logging.error(f"Directory error: {e}")
        raise


@lru_cache(maxsize=100)
def get_cached_file_path(content_hash, prefix):
    """Return a temp-file path for the given content, memoised per argument pair.

    ``content_hash`` is passed straight to :func:`create_temp_file` as the
    file content.

    NOTE(review): a cached path is returned even if the file has since been
    deleted; callers should confirm the file still exists.
    """
    return create_temp_file(content_hash, prefix)


def empty_output_folder(output_dir):
    """Delete *output_dir* with everything in it and recreate it empty.

    Raises:
        Exception: Any error from removing or recreating the directory is
            logged and re-raised.
    """
    try:
        shutil.rmtree(output_dir)
        os.makedirs(output_dir)
        logging.info(f"Output folder cleaned: {output_dir}")
    except Exception as e:
        logging.error(f"Error cleaning output folder: {e}")
        raise


def create_temp_file(content, prefix, suffix=".txt"):
    """Write *content* to a new persistent temporary file and return its path.

    The text is stripped, terminated with a blank line and has its line
    endings normalised to ``\\n``. The file is not auto-deleted; the caller
    owns cleanup.

    Args:
        content: Text to write.
        prefix: File-name prefix.
        suffix: File-name suffix (default ``".txt"``).

    Returns:
        Path of the created file.
    """
    normalized = content.strip() + "\n\n"
    normalized = normalized.replace("\r\n", "\n").replace("\r", "\n")

    with tempfile.NamedTemporaryFile(
        delete=False, mode="w", prefix=prefix, suffix=suffix
    ) as temp_file:
        temp_file.write(normalized)
        path = temp_file.name

    logging.debug(f"Temporary file created: {path}")
    return path


def get_last_mp3_file(output_dir):
    """Return the most recently modified ``.mp3`` in *output_dir*, or None.

    Only the top level of *output_dir* is inspected.
    """
    candidates = [
        os.path.join(output_dir, name)
        for name in os.listdir(output_dir)
        if name.endswith('.mp3')
    ]
    if not candidates:
        logging.warning("No MP3 files found")
        return None
    return max(candidates, key=os.path.getmtime)


def infer(genre_txt_content, lyrics_txt_content, num_segments, max_new_tokens):
    """Generate a song for the given genre and lyrics by running ``infer.py``.

    Args:
        genre_txt_content: Genre/style description text.
        lyrics_txt_content: Lyrics with section tags.
        num_segments: Accepted for the UI binding; the segment count that is
            actually used is derived from the lyrics.
        max_new_tokens: Accepted for the UI binding; the token budget that is
            actually used is derived from the lyrics.

    Returns:
        Path of the newest MP3 in ``./output``, or ``None`` if none exists.

    Raises:
        Exception: Any failure (including a non-zero exit of ``infer.py``)
            is logged and re-raised.
    """
    # Track created temp files so cleanup never touches an unbound name when
    # an earlier step fails.
    temp_paths = []
    try:
        # Model selection and configuration.
        model_path, config, params = optimize_model_selection(lyrics_txt_content, genre_txt_content)
        logging.info(f"Selected model: {model_path}")
        logging.info(f"Lyrics analysis: {params}")

        # Parameters actually used for generation.
        actual_num_segments = config['num_segments']
        actual_max_tokens = config['max_tokens']
        logging.info(f"Using segments: {actual_num_segments}, tokens: {actual_max_tokens}")

        # Temporary input files.
        genre_txt_path = create_temp_file(genre_txt_content, prefix="genre_")
        temp_paths.append(genre_txt_path)
        lyrics_txt_path = create_temp_file(lyrics_txt_content, prefix="lyrics_")
        temp_paths.append(lyrics_txt_path)

        output_dir = "./output"
        os.makedirs(output_dir, exist_ok=True)
        empty_output_folder(output_dir)

        # Command line for the inference script.
        # NOTE(review): confirm that the local infer.py accepts every flag
        # below (e.g. --temperature, --bf16, --chorus_strength).
        command = [
            sys.executable, "infer.py",
            "--stage1_model", model_path,
            "--stage2_model", "m-a-p/YuE-s2-1B-general",
            "--genre_txt", genre_txt_path,
            "--lyrics_txt", lyrics_txt_path,
            "--run_n_segments", str(actual_num_segments),
            "--stage2_batch_size", str(config['batch_size']),
            "--output_dir", output_dir,
            "--cuda_idx", "0",
            "--max_new_tokens", str(actual_max_tokens),
            "--temperature", str(config['temperature']),
            "--disable_offload_model",
            "--use_flash_attention_2",
            "--bf16",
            "--chorus_strength", str(config['chorus_strength'])
        ]

        # CUDA environment variables for the child process.
        env = os.environ.copy()
        env.update({
            "CUDA_VISIBLE_DEVICES": "0",
            "CUDA_HOME": "/usr/local/cuda",
            "PATH": f"/usr/local/cuda/bin:{env.get('PATH', '')}",
            "LD_LIBRARY_PATH": f"/usr/local/cuda/lib64:{env.get('LD_LIBRARY_PATH', '')}",
            "PYTORCH_CUDA_ALLOC_CONF": "max_split_size_mb:512"
        })

        # Run the inference script.
        subprocess.run(command, env=env, check=True, capture_output=True)
        logging.info("Inference completed successfully")

        # Collect the result.
        last_mp3 = get_last_mp3_file(output_dir)
        if last_mp3:
            logging.info(f"Generated audio file: {last_mp3}")
            return last_mp3
        logging.warning("No output audio file generated")
        return None

    except Exception as e:
        logging.error(f"Inference error: {e}")
        stderr = getattr(e, "stderr", None)
        if isinstance(stderr, bytes) and stderr:
            # The child's output was captured; log it so failures are diagnosable.
            logging.error(stderr.decode(errors="replace"))
        raise
    finally:
        # Remove temporary input files (best effort).
        for file in temp_paths:
            try:
                os.remove(file)
                logging.debug(f"Removed temporary file: {file}")
            except Exception as e:
                logging.warning(f"Failed to remove temporary file {file}: {e}")


# Gradio interface
# NOTE(review): the original indentation was lost; the layout nesting below
# and the line breaks inside the example lyrics are reconstructed - confirm
# against the intended UI.
with gr.Blocks() as demo:
    with gr.Column():
        gr.Markdown("# YuE: Open Music Foundation Models for Full-Song Generation (Multi-Language Support)")
        gr.HTML("""
        """)

        with gr.Row():
            with gr.Column():
                genre_txt = gr.Textbox(
                    label="Genre",
                    placeholder="Enter music genre and style descriptions..."
                )
                lyrics_txt = gr.Textbox(
                    label="Lyrics (Supports English, Korean, Japanese, Chinese)",
                    placeholder="Enter song lyrics with [verse], [chorus], [bridge] tags...",
                    lines=10
                )

            with gr.Column():
                num_segments = gr.Number(
                    label="Number of Song Segments (Auto-adjusted based on lyrics)",
                    value=2,
                    minimum=1,
                    maximum=4,
                    step=1,
                    interactive=False
                )
                max_new_tokens = gr.Slider(
                    label="Max New Tokens (Auto-adjusted based on lyrics)",
                    minimum=500,
                    maximum=32000,
                    step=500,
                    value=4000,
                    interactive=False
                )
                submit_btn = gr.Button("Generate Music", variant="primary")
                music_out = gr.Audio(label="Generated Audio")

        # Multi-language examples
        gr.Examples(
            examples=[
                # English example
                [
                    "female blues airy vocal bright vocal piano sad romantic guitar jazz",
                    """[verse]
In the quiet of the evening, shadows start to fall
Whispers of the night wind echo through the hall
Lost within the silence, I hear your gentle voice
Guiding me back homeward, making my heart rejoice

[chorus]
Don't let this moment fade, hold me close tonight
With you here beside me, everything's alright
Can't imagine life alone, don't want to let you go
Stay with me forever, let our love just flow

[verse]
Morning light is breaking, through the window pane
Memories of yesterday, like soft summer rain
In your arms I'm finding, all I'm dreaming of
Every day beside you, fills my heart with love

[chorus]
Don't let this moment fade, hold me close tonight
With you here beside me, everything's alright
Can't imagine life alone, don't want to let you go
Stay with me forever, let our love just flow
"""
                ],
                # Korean example
                [
                    "K-pop bright energetic synth dance electronic",
                    """[verse]
빛나는 별들처럼 우리의 꿈이
저 하늘을 수놓아 반짝이네
함께라면 어디든 갈 수 있어
우리의 이야기가 시작되네

[chorus]
달려가자 더 높이 더 멀리
두려움은 없어 너와 함께라면
영원히 계속될 우리의 노래
이 순간을 기억해 forever

[verse]
새로운 내일을 향해 나아가
우리만의 길을 만들어가
믿음으로 가득한 우리의 맘
절대 멈추지 않아 계속해서

[chorus]
달려가자 더 높이 더 멀리
두려움은 없어 너와 함께라면
영원히 계속될 우리의 노래
이 순간을 기억해 forever
"""
                ]
            ],
            inputs=[genre_txt, lyrics_txt]
        )

    # System initialisation (runs once at start-up, before the server launches).
    initialize_system()

    # Event handler
    submit_btn.click(
        fn=infer,
        inputs=[genre_txt, lyrics_txt, num_segments, max_new_tokens],
        outputs=[music_out]
    )

# Launch with server settings.
# NOTE(review): `concurrency_count` and `enable_queue` are only accepted by
# older Gradio releases - confirm against the pinned Gradio version.
demo.queue(concurrency_count=2).launch(
    server_name="0.0.0.0",
    server_port=7860,
    share=True,
    enable_queue=True,
    show_api=True,
    show_error=True
)