Martim-Ramos-Neural's picture
nada
e74157f
import spaces
import imageio
import os
import gradio as gr
from subprocess import getoutput
from diffusers.schedulers import EulerAncestralDiscreteScheduler
from transformers import T5EncoderModel, T5Tokenizer
from allegro.pipelines.pipeline_allegro import AllegroPipeline
from allegro.models.vae.vae_allegro import AllegroAutoencoderKL3D
from allegro.models.transformers.transformer_3d_allegro import AllegroTransformer3DModel
from huggingface_hub import snapshot_download
# Ensure the weights directory exists
weights_dir = './allegro_weights'
os.makedirs(weights_dir, exist_ok=True)
print(f"Downloading models to {weights_dir}...")
snapshot_download(
repo_id='rhymes-ai/Allegro',
local_dir=weights_dir,
allow_patterns=['**']
)
print(f"Downloaded models to {weights_dir}.")
# Check if the directories exist
required_dirs = ['vae', 'text_encoder', 'tokenizer', 'scheduler', 'transformer']
missing_dirs = [d for d in required_dirs if not os.path.exists(os.path.join(weights_dir, d))]
if missing_dirs:
print(f"Missing directories: {missing_dirs}")
raise FileNotFoundError(f"The following directories are missing in '{weights_dir}': {missing_dirs}")
else:
print(f"All required directories are present in {weights_dir}.")
# Check directory structure
required_dirs = ['vae', 'text_encoder', 'tokenizer', 'scheduler', 'transformer']
missing_dirs = [d for d in required_dirs if not os.path.exists(os.path.join(weights_dir, d))]
if missing_dirs:
raise FileNotFoundError(f"The following directories are missing in '{weights_dir}': {missing_dirs}")
@spaces.GPU(duration=120) # Request GPU for the entire process
def process_pipeline(user_prompt, guidance_scale, num_sampling_steps, seed, enable_cpu_offload):
# is_gpu_associated = torch.cuda.is_available()
# if is_gpu_associated:
# gpu_info = getoutput('nvidia-smi')
# print(f"GPU Info: {gpu_info}")
# Define dtype
dtype = torch.bfloat16
# Load models
vae = AllegroAutoencoderKL3D.from_pretrained(
os.path.join(weights_dir, 'vae/'),
torch_dtype=torch.float32
).cuda()
vae.eval()
text_encoder = T5EncoderModel.from_pretrained(
os.path.join(weights_dir, 'text_encoder/'),
torch_dtype=dtype
).eval()
tokenizer = T5Tokenizer.from_pretrained(os.path.join(weights_dir, 'tokenizer/'))
scheduler = EulerAncestralDiscreteScheduler()
transformer = AllegroTransformer3DModel.from_pretrained(
os.path.join(weights_dir, 'transformer/'),
torch_dtype=dtype
).cuda()
transformer.eval()
allegro_pipeline = AllegroPipeline(
vae=vae,
text_encoder=text_encoder,
tokenizer=tokenizer,
scheduler=scheduler,
transformer=transformer
).to("cuda:0")
# Prompts
positive_prompt = """
(masterpiece), (best quality), (ultra-detailed), (unwatermarked),
{}
emotional, harmonious, vignette, 4k epic detailed, shot on kodak, 35mm photo,
sharp focus, high budget, cinemascope, moody, epic, gorgeous
"""
negative_prompt = """
nsfw, lowres, bad anatomy, bad hands, text, error, missing fingers, extra digit, fewer digits, cropped, worst quality,
low quality, normal quality, jpeg artifacts, signature, watermark, username, blurry.
"""
# Format the user prompt
user_prompt = positive_prompt.format(user_prompt.lower().strip())
if enable_cpu_offload:
allegro_pipeline.enable_sequential_cpu_offload()
# Generate the video
out_video = allegro_pipeline(
user_prompt,
negative_prompt=negative_prompt,
num_frames=30,
height=360,
width=640,
num_inference_steps=num_sampling_steps,
guidance_scale=guidance_scale,
max_sequence_length=512,
generator=torch.Generator(device="cuda:0").manual_seed(seed)
).video[0]
# Save the video
save_path = "./output_videos/generated_video.mp4"
os.makedirs(os.path.dirname(save_path), exist_ok=True)
imageio.mimwrite(save_path, out_video, fps=15, quality=8)
return save_path
# Gradio interface function
def run_inference(user_prompt, guidance_scale, num_sampling_steps, seed, enable_cpu_offload, progress=gr.Progress(track_tqdm=True)):
result_path = process_pipeline(user_prompt, guidance_scale, num_sampling_steps, seed, enable_cpu_offload)
return result_path
css = """
div#col-container {
margin: 0 auto;
max-width: 800px;
}
"""
# Create Gradio interface
with gr.Blocks(css=css) as demo:
with gr.Column(elem_id="col-container"):
gr.Markdown("# Allegro Video Generation")
user_prompt = gr.Textbox(label="User Prompt")
with gr.Row():
guidance_scale = gr.Slider(minimum=0, maximum=20, step=0.1, label="Guidance Scale", value=7.5)
num_sampling_steps = gr.Slider(minimum=10, maximum=100, step=1, label="Number of Sampling Steps", value=20)
with gr.Row():
seed = gr.Slider(minimum=0, maximum=10000, step=1, label="Random Seed", value=42)
enable_cpu_offload = gr.Checkbox(label="Enable CPU Offload", value=False)
submit_btn = gr.Button("Generate Video")
video_output = gr.Video(label="Generated Video")
submit_btn.click(
fn=run_inference,
inputs=[user_prompt, guidance_scale, num_sampling_steps, seed, enable_cpu_offload],
outputs=video_output
)
# Launch the interface
demo.launch(show_error=True, show_api=False)