import os

import gradio as gr
import imageio
import spaces
import torch
from subprocess import getoutput

from diffusers.schedulers import EulerAncestralDiscreteScheduler
from transformers import T5EncoderModel, T5Tokenizer
from huggingface_hub import snapshot_download

from allegro.pipelines.pipeline_allegro import AllegroPipeline
from allegro.models.vae.vae_allegro import AllegroAutoencoderKL3D
from allegro.models.transformers.transformer_3d_allegro import AllegroTransformer3DModel

# Ensure the weights directory exists
weights_dir = './allegro_weights'
os.makedirs(weights_dir, exist_ok=True)

print(f"Downloading models to {weights_dir}...")
snapshot_download(
    repo_id='rhymes-ai/Allegro',
    local_dir=weights_dir,
    allow_patterns=['**'],
)
print(f"Downloaded models to {weights_dir}.")

# Verify the snapshot contains every component the pipeline needs
required_dirs = ['vae', 'text_encoder', 'tokenizer', 'scheduler', 'transformer']
missing_dirs = [d for d in required_dirs if not os.path.exists(os.path.join(weights_dir, d))]
if missing_dirs:
    print(f"Missing directories: {missing_dirs}")
    raise FileNotFoundError(f"The following directories are missing in '{weights_dir}': {missing_dirs}")
print(f"All required directories are present in {weights_dir}.")


@spaces.GPU(duration=120)  # Request a GPU for the entire generation process
def process_pipeline(user_prompt, guidance_scale, num_sampling_steps, seed, enable_cpu_offload):
    # Optional GPU diagnostics, currently disabled:
    # if torch.cuda.is_available():
    #     gpu_info = getoutput('nvidia-smi')
    #     print(f"GPU Info: {gpu_info}")

    # Text encoder and transformer weights run in bfloat16;
    # the VAE stays in float32 for decoding stability.
    dtype = torch.bfloat16

    # Load models
    vae = AllegroAutoencoderKL3D.from_pretrained(
        os.path.join(weights_dir, 'vae/'),
        torch_dtype=torch.float32,
    ).cuda()
    vae.eval()

    text_encoder = T5EncoderModel.from_pretrained(
        os.path.join(weights_dir, 'text_encoder/'),
        torch_dtype=dtype,
    ).eval()

    tokenizer = T5Tokenizer.from_pretrained(os.path.join(weights_dir, 'tokenizer/'))

    # The scheduler is instantiated with its default settings rather than
    # loaded from the downloaded scheduler config.
    scheduler = EulerAncestralDiscreteScheduler()

    transformer = AllegroTransformer3DModel.from_pretrained(
        os.path.join(weights_dir, 'transformer/'),
        torch_dtype=dtype,
    ).cuda()
    transformer.eval()

    allegro_pipeline = AllegroPipeline(
        vae=vae,
        text_encoder=text_encoder,
        tokenizer=tokenizer,
        scheduler=scheduler,
        transformer=transformer,
    ).to("cuda:0")

    # Prompt templates: the user prompt is spliced into a quality-boosting
    # positive template; the negative prompt lists artifacts to steer away from.
    positive_prompt = """
(masterpiece), (best quality), (ultra-detailed), (unwatermarked),
{}
emotional, harmonious, vignette, 4k epic detailed, shot on kodak, 35mm photo,
sharp focus, high budget, cinemascope, moody, epic, gorgeous
"""

    negative_prompt = """
nsfw, lowres, bad anatomy, bad hands, text, error, missing fingers, extra digit,
fewer digits, cropped, worst quality, low quality, normal quality, jpeg artifacts,
signature, watermark, username, blurry.
"""

    # Format the user prompt into the positive template
    user_prompt = positive_prompt.format(user_prompt.lower().strip())
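    # Note: `enable_sequential_cpu_offload` (from diffusers) keeps submodules on
    # the CPU and moves each one to the GPU only while it executes, trading
    # inference speed for a much smaller peak VRAM footprint.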
""" # Format the user prompt user_prompt = positive_prompt.format(user_prompt.lower().strip()) if enable_cpu_offload: allegro_pipeline.enable_sequential_cpu_offload() # Generate the video out_video = allegro_pipeline( user_prompt, negative_prompt=negative_prompt, num_frames=30, height=360, width=640, num_inference_steps=num_sampling_steps, guidance_scale=guidance_scale, max_sequence_length=512, generator=torch.Generator(device="cuda:0").manual_seed(seed) ).video[0] # Save the video save_path = "./output_videos/generated_video.mp4" os.makedirs(os.path.dirname(save_path), exist_ok=True) imageio.mimwrite(save_path, out_video, fps=15, quality=8) return save_path # Gradio interface function def run_inference(user_prompt, guidance_scale, num_sampling_steps, seed, enable_cpu_offload, progress=gr.Progress(track_tqdm=True)): result_path = process_pipeline(user_prompt, guidance_scale, num_sampling_steps, seed, enable_cpu_offload) return result_path css = """ div#col-container { margin: 0 auto; max-width: 800px; } """ # Create Gradio interface with gr.Blocks(css=css) as demo: with gr.Column(elem_id="col-container"): gr.Markdown("# Allegro Video Generation") user_prompt = gr.Textbox(label="User Prompt") with gr.Row(): guidance_scale = gr.Slider(minimum=0, maximum=20, step=0.1, label="Guidance Scale", value=7.5) num_sampling_steps = gr.Slider(minimum=10, maximum=100, step=1, label="Number of Sampling Steps", value=20) with gr.Row(): seed = gr.Slider(minimum=0, maximum=10000, step=1, label="Random Seed", value=42) enable_cpu_offload = gr.Checkbox(label="Enable CPU Offload", value=False) submit_btn = gr.Button("Generate Video") video_output = gr.Video(label="Generated Video") submit_btn.click( fn=run_inference, inputs=[user_prompt, guidance_scale, num_sampling_steps, seed, enable_cpu_offload], outputs=video_output ) # Launch the interface demo.launch(show_error=True, show_api=False)