Martim-Ramos-Neural's picture
nada
a7c2aac
raw
history blame
5.48 kB
import spaces
import imageio
import os
import gradio as gr
from subprocess import getoutput
from diffusers.schedulers import EulerAncestralDiscreteScheduler
from transformers import T5EncoderModel, T5Tokenizer
from allegro.pipelines.pipeline_allegro import AllegroPipeline
from allegro.models.vae.vae_allegro import AllegroAutoencoderKL3D
from allegro.models.transformers.transformer_3d_allegro import AllegroTransformer3DModel
from huggingface_hub import snapshot_download
# Ensure the weights directory exists
weights_dir = './allegro_weights'
os.makedirs(weights_dir, exist_ok=True)
print(f"Downloading models to {weights_dir}...")
snapshot_download(
repo_id='rhymes-ai/Allegro',
local_dir=weights_dir,
allow_patterns=['**']
)
print(f"Downloaded models to {weights_dir}.")
# Check if the directories exist
required_dirs = ['vae', 'text_encoder', 'tokenizer', 'scheduler', 'transformer']
missing_dirs = [d for d in required_dirs if not os.path.exists(os.path.join(weights_dir, d))]
if missing_dirs:
print(f"Missing directories: {missing_dirs}")
raise FileNotFoundError(f"The following directories are missing in '{weights_dir}': {missing_dirs}")
else:
print(f"All required directories are present in {weights_dir}.")
# Check directory structure
required_dirs = ['vae', 'text_encoder', 'tokenizer', 'scheduler', 'transformer']
missing_dirs = [d for d in required_dirs if not os.path.exists(os.path.join(weights_dir, d))]
if missing_dirs:
raise FileNotFoundError(f"The following directories are missing in '{weights_dir}': {missing_dirs}")
@spaces.GPU(duration=120) # Request GPU for the entire process
def process_pipeline(user_prompt, guidance_scale, num_sampling_steps, seed, enable_cpu_offload):
# is_gpu_associated = torch.cuda.is_available()
# if is_gpu_associated:
# gpu_info = getoutput('nvidia-smi')
# print(f"GPU Info: {gpu_info}")
# Define dtype
dtype = torch.bfloat16
# Load models
vae = AllegroAutoencoderKL3D.from_pretrained(
os.path.join(weights_dir, 'vae/'),
torch_dtype=torch.float32
).cuda()
vae.eval()
text_encoder = T5EncoderModel.from_pretrained(
os.path.join(weights_dir, 'text_encoder/'),
torch_dtype=dtype
).eval()
tokenizer = T5Tokenizer.from_pretrained(os.path.join(weights_dir, 'tokenizer/'))
scheduler = EulerAncestralDiscreteScheduler()
transformer = AllegroTransformer3DModel.from_pretrained(
os.path.join(weights_dir, 'transformer/'),
torch_dtype=dtype
).cuda()
transformer.eval()
allegro_pipeline = AllegroPipeline(
vae=vae,
text_encoder=text_encoder,
tokenizer=tokenizer,
scheduler=scheduler,
transformer=transformer
).to("cuda:0")
# Prompts
positive_prompt = """
(masterpiece), (best quality), (ultra-detailed), (unwatermarked),
{}
emotional, harmonious, vignette, 4k epic detailed, shot on kodak, 35mm photo,
sharp focus, high budget, cinemascope, moody, epic, gorgeous
"""
negative_prompt = """
nsfw, lowres, bad anatomy, bad hands, text, error, missing fingers, extra digit, fewer digits, cropped, worst quality,
low quality, normal quality, jpeg artifacts, signature, watermark, username, blurry.
"""
# Format the user prompt
user_prompt = positive_prompt.format(user_prompt.lower().strip())
if enable_cpu_offload:
allegro_pipeline.enable_sequential_cpu_offload()
# Generate the video
out_video = allegro_pipeline(
user_prompt,
negative_prompt=negative_prompt,
num_frames=30,
height=360,
width=640,
num_inference_steps=num_sampling_steps,
guidance_scale=guidance_scale,
max_sequence_length=512,
generator=torch.Generator(device="cuda:0").manual_seed(seed)
).video[0]
# Save the video
save_path = "./output_videos/generated_video.mp4"
os.makedirs(os.path.dirname(save_path), exist_ok=True)
imageio.mimwrite(save_path, out_video, fps=15, quality=8)
return save_path
# Gradio interface function
def run_inference(user_prompt, guidance_scale, num_sampling_steps, seed, enable_cpu_offload, progress=gr.Progress(track_tqdm=True)):
result_path = process_pipeline(user_prompt, guidance_scale, num_sampling_steps, seed, enable_cpu_offload)
return result_path
css = """
div#col-container {
margin: 0 auto;
max-width: 800px;
}
"""
# Create Gradio interface
with gr.Blocks(css=css) as demo:
with gr.Column(elem_id="col-container"):
gr.Markdown("# Allegro Video Generation")
user_prompt = gr.Textbox(label="User Prompt")
with gr.Row():
guidance_scale = gr.Slider(minimum=0, maximum=20, step=0.1, label="Guidance Scale", value=7.5)
num_sampling_steps = gr.Slider(minimum=10, maximum=100, step=1, label="Number of Sampling Steps", value=20)
with gr.Row():
seed = gr.Slider(minimum=0, maximum=10000, step=1, label="Random Seed", value=42)
enable_cpu_offload = gr.Checkbox(label="Enable CPU Offload", value=False)
submit_btn = gr.Button("Generate Video")
video_output = gr.Video(label="Generated Video")
submit_btn.click(
fn=run_inference,
inputs=[user_prompt, guidance_scale, num_sampling_steps, seed, enable_cpu_offload],
outputs=video_output
)
# Launch the interface
demo.launch(show_error=True, show_api=False)