# Spaces: Running on Zero
import os
from subprocess import getoutput

# `spaces` stays ahead of the torch-based libraries, as in the original import
# order; ZeroGPU expects it to be imported before CUDA is initialized.
import spaces
import gradio as gr
import imageio
import torch
from diffusers.schedulers import EulerAncestralDiscreteScheduler
from huggingface_hub import snapshot_download
from transformers import T5EncoderModel, T5Tokenizer

from allegro.models.transformers.transformer_3d_allegro import AllegroTransformer3DModel
from allegro.models.vae.vae_allegro import AllegroAutoencoderKL3D
from allegro.pipelines.pipeline_allegro import AllegroPipeline
# Local directory that receives the Allegro checkpoint. It is created up front
# so the download below always has a destination.
weights_dir = './allegro_weights'
os.makedirs(weights_dir, exist_ok=True)

# Pull the complete 'rhymes-ai/Allegro' repository snapshot into weights_dir,
# announcing the start and the end of the transfer on stdout.
print(f"Downloading models to {weights_dir}...")
snapshot_download(repo_id='rhymes-ai/Allegro', local_dir=weights_dir, allow_patterns=['**'])
print(f"Downloaded models to {weights_dir}.")
# Verify that the downloaded snapshot contains every sub-directory this script
# expects. The check runs once: the original repeated the identical test a
# second time, which could never produce a different result.
required_dirs = ['vae', 'text_encoder', 'tokenizer', 'scheduler', 'transformer']
# isdir (rather than exists) so that a stray regular file with one of these
# names is reported as missing here instead of failing later at load time.
missing_dirs = [d for d in required_dirs if not os.path.isdir(os.path.join(weights_dir, d))]
if missing_dirs:
    print(f"Missing directories: {missing_dirs}")
    raise FileNotFoundError(f"The following directories are missing in '{weights_dir}': {missing_dirs}")
print(f"All required directories are present in {weights_dir}.")
# Request GPU for the entire process
# NOTE(review): `duration=120` (seconds) is an estimate, not a measured value;
# tune it to the real load + generation time of this Space.
@spaces.GPU(duration=120)
def process_pipeline(
    user_prompt: str,
    guidance_scale: float,
    num_sampling_steps: int,
    seed: int,
    enable_cpu_offload: bool,
) -> str:
    """Generate a video for *user_prompt* with the Allegro pipeline.

    Every call loads the VAE, text encoder, tokenizer and transformer from
    ``weights_dir``, wraps the user prompt in a fixed quality template, runs
    the pipeline for 30 frames at 640x360 and writes the frames to an MP4
    file at 15 fps.

    Args:
        user_prompt: Free-form description. It is lower-cased and stripped
            before being inserted into the positive prompt template.
        guidance_scale: Forwarded to the pipeline as ``guidance_scale``.
        num_sampling_steps: Number of inference steps. Coerced to ``int``
            because UI sliders may deliver a float.
        seed: Seed for the CUDA random generator. Coerced to ``int`` because
            ``torch.Generator.manual_seed`` only accepts integers.
        enable_cpu_offload: When truthy, sequential CPU offload is enabled on
            the pipeline before generation.

    Returns:
        The path of the written video file. The same path is used on every
        call, so a previous result is overwritten.
    """
    # Define dtype
    dtype = torch.bfloat16

    # Load models. The VAE is loaded in float32; the text encoder and the
    # transformer use `dtype`.
    vae = AllegroAutoencoderKL3D.from_pretrained(
        os.path.join(weights_dir, 'vae/'),
        torch_dtype=torch.float32,
    ).cuda()
    vae.eval()

    text_encoder = T5EncoderModel.from_pretrained(
        os.path.join(weights_dir, 'text_encoder/'),
        torch_dtype=dtype,
    ).eval()

    tokenizer = T5Tokenizer.from_pretrained(os.path.join(weights_dir, 'tokenizer/'))

    # The scheduler is built with its default configuration; nothing is read
    # from the downloaded 'scheduler' directory here.
    scheduler = EulerAncestralDiscreteScheduler()

    transformer = AllegroTransformer3DModel.from_pretrained(
        os.path.join(weights_dir, 'transformer/'),
        torch_dtype=dtype,
    ).cuda()
    transformer.eval()

    allegro_pipeline = AllegroPipeline(
        vae=vae,
        text_encoder=text_encoder,
        tokenizer=tokenizer,
        scheduler=scheduler,
        transformer=transformer,
    ).to("cuda:0")

    # Prompts
    positive_prompt = """
(masterpiece), (best quality), (ultra-detailed), (unwatermarked),
{}
emotional, harmonious, vignette, 4k epic detailed, shot on kodak, 35mm photo,
sharp focus, high budget, cinemascope, moody, epic, gorgeous
"""
    negative_prompt = """
nsfw, lowres, bad anatomy, bad hands, text, error, missing fingers, extra digit, fewer digits, cropped, worst quality,
low quality, normal quality, jpeg artifacts, signature, watermark, username, blurry.
"""

    # Format the user prompt. `.format` is applied to the template, so braces
    # typed by the user are inserted verbatim and never interpreted.
    user_prompt = positive_prompt.format(user_prompt.lower().strip())

    if enable_cpu_offload:
        allegro_pipeline.enable_sequential_cpu_offload()

    # Generate the video
    out_video = allegro_pipeline(
        user_prompt,
        negative_prompt=negative_prompt,
        num_frames=30,
        height=360,
        width=640,
        num_inference_steps=int(num_sampling_steps),
        guidance_scale=guidance_scale,
        max_sequence_length=512,
        generator=torch.Generator(device="cuda:0").manual_seed(int(seed)),
    ).video[0]

    # Save the video
    save_path = "./output_videos/generated_video.mp4"
    os.makedirs(os.path.dirname(save_path), exist_ok=True)
    imageio.mimwrite(save_path, out_video, fps=15, quality=8)
    return save_path
# Gradio interface function
def run_inference(user_prompt, guidance_scale, num_sampling_steps, seed, enable_cpu_offload, progress=gr.Progress(track_tqdm=True)):
    """Click handler: pass the UI values to ``process_pipeline`` unchanged.

    ``progress`` is never read in the body. Declaring it with
    ``gr.Progress(track_tqdm=True)`` as the default is the Gradio convention
    for surfacing tqdm progress bars in the UI while the handler runs.

    Returns:
        Whatever ``process_pipeline`` returns (the path of the saved video).
    """
    return process_pipeline(user_prompt, guidance_scale, num_sampling_steps, seed, enable_cpu_offload)
# Stylesheet handed to gr.Blocks: centers the main column and caps its width.
css = """
div#col-container {
margin: 0 auto;
max-width: 800px;
}
"""

# Create Gradio interface
with gr.Blocks(css=css) as demo:
    with gr.Column(elem_id="col-container"):
        gr.Markdown("# Allegro Video Generation")
        user_prompt = gr.Textbox(label="User Prompt")

        # Sampling controls.
        with gr.Row():
            guidance_scale = gr.Slider(
                label="Guidance Scale",
                minimum=0,
                maximum=20,
                step=0.1,
                value=7.5,
            )
            num_sampling_steps = gr.Slider(
                label="Number of Sampling Steps",
                minimum=10,
                maximum=100,
                step=1,
                value=20,
            )

        # Seed and offload toggle.
        with gr.Row():
            seed = gr.Slider(
                label="Random Seed",
                minimum=0,
                maximum=10000,
                step=1,
                value=42,
            )
            enable_cpu_offload = gr.Checkbox(label="Enable CPU Offload", value=False)

        submit_btn = gr.Button("Generate Video")
        video_output = gr.Video(label="Generated Video")

    # Clicking the button runs run_inference with the current control values
    # and shows the returned file in the video component.
    submit_btn.click(
        fn=run_inference,
        inputs=[user_prompt, guidance_scale, num_sampling_steps, seed, enable_cpu_offload],
        outputs=video_output,
    )

# Launch the interface
demo.launch(show_error=True, show_api=False)