# Allegro text-to-video demo — Hugging Face Space running on ZeroGPU.
# (Removed non-code page residue: Space status banner, commit-hash row, and
#  a flattened line-number table left over from the web scrape.)
# Standard library
import os
from subprocess import getoutput  # kept: referenced by (commented-out) GPU diagnostics

# Third-party
import gradio as gr
import imageio
import spaces
import torch  # required: torch.bfloat16 / torch.float32 / torch.Generator are used below
from diffusers.schedulers import EulerAncestralDiscreteScheduler
from huggingface_hub import snapshot_download
from transformers import T5EncoderModel, T5Tokenizer

# Local (Allegro)
from allegro.models.transformers.transformer_3d_allegro import AllegroTransformer3DModel
from allegro.models.vae.vae_allegro import AllegroAutoencoderKL3D
from allegro.pipelines.pipeline_allegro import AllegroPipeline
# --- One-time setup: download model weights and validate their layout ---
weights_dir = './allegro_weights'
os.makedirs(weights_dir, exist_ok=True)

print(f"Downloading models to {weights_dir}...")
snapshot_download(
    repo_id='rhymes-ai/Allegro',
    local_dir=weights_dir,
    allow_patterns=['**'],
)
print(f"Downloaded models to {weights_dir}.")

# Verify every component directory the pipeline needs is present.
# (This check previously appeared twice verbatim; one pass is sufficient.)
required_dirs = ['vae', 'text_encoder', 'tokenizer', 'scheduler', 'transformer']
missing_dirs = [d for d in required_dirs if not os.path.exists(os.path.join(weights_dir, d))]
if missing_dirs:
    print(f"Missing directories: {missing_dirs}")
    raise FileNotFoundError(
        f"The following directories are missing in '{weights_dir}': {missing_dirs}"
    )
print(f"All required directories are present in {weights_dir}.")
@spaces.GPU(duration=120)  # request a ZeroGPU slot for the full load + generate cycle
def process_pipeline(user_prompt, guidance_scale, num_sampling_steps, seed, enable_cpu_offload):
    """Load the Allegro pipeline and generate a short video from *user_prompt*.

    Args:
        user_prompt: free-form text description of the desired video.
        guidance_scale: classifier-free guidance strength passed to the pipeline.
        num_sampling_steps: number of diffusion inference steps.
        seed: integer seed for reproducible generation.
        enable_cpu_offload: if True, sequentially offload pipeline modules to
            CPU to reduce peak GPU memory (slower per step).

    Returns:
        Filesystem path of the saved MP4.

    NOTE(review): all models are re-loaded from disk on every call. That is
    the expected pattern under ZeroGPU (the GPU is only attached for the
    duration of this call) but is slow for repeated requests.
    """
    # Transformer and text encoder run in bfloat16; the VAE is kept in
    # float32 (as originally written — presumably for decode stability).
    dtype = torch.bfloat16

    vae = AllegroAutoencoderKL3D.from_pretrained(
        os.path.join(weights_dir, 'vae/'),
        torch_dtype=torch.float32,
    ).cuda()
    vae.eval()

    text_encoder = T5EncoderModel.from_pretrained(
        os.path.join(weights_dir, 'text_encoder/'),
        torch_dtype=dtype,
    ).eval()

    tokenizer = T5Tokenizer.from_pretrained(os.path.join(weights_dir, 'tokenizer/'))
    scheduler = EulerAncestralDiscreteScheduler()

    transformer = AllegroTransformer3DModel.from_pretrained(
        os.path.join(weights_dir, 'transformer/'),
        torch_dtype=dtype,
    ).cuda()
    transformer.eval()

    allegro_pipeline = AllegroPipeline(
        vae=vae,
        text_encoder=text_encoder,
        tokenizer=tokenizer,
        scheduler=scheduler,
        transformer=transformer,
    ).to("cuda:0")

    # Quality boilerplate wrapped around the user's text; {} is filled below.
    positive_prompt = """
(masterpiece), (best quality), (ultra-detailed), (unwatermarked),
{}
emotional, harmonious, vignette, 4k epic detailed, shot on kodak, 35mm photo,
sharp focus, high budget, cinemascope, moody, epic, gorgeous
"""
    negative_prompt = """
nsfw, lowres, bad anatomy, bad hands, text, error, missing fingers, extra digit, fewer digits, cropped, worst quality,
low quality, normal quality, jpeg artifacts, signature, watermark, username, blurry.
"""

    # Lower-case and trim the user's text, then splice it into the template.
    user_prompt = positive_prompt.format(user_prompt.lower().strip())

    if enable_cpu_offload:
        allegro_pipeline.enable_sequential_cpu_offload()

    # Generate the video frames (seeded CUDA generator for reproducibility).
    out_video = allegro_pipeline(
        user_prompt,
        negative_prompt=negative_prompt,
        num_frames=30,
        height=360,
        width=640,
        num_inference_steps=num_sampling_steps,
        guidance_scale=guidance_scale,
        max_sequence_length=512,
        generator=torch.Generator(device="cuda:0").manual_seed(seed),
    ).video[0]

    # Encode and persist the result as MP4.
    save_path = "./output_videos/generated_video.mp4"
    os.makedirs(os.path.dirname(save_path), exist_ok=True)
    imageio.mimwrite(save_path, out_video, fps=15, quality=8)
    return save_path
# Gradio-facing entry point for the Generate button.
def run_inference(user_prompt, guidance_scale, num_sampling_steps, seed, enable_cpu_offload, progress=gr.Progress(track_tqdm=True)):
    """Forward the UI inputs to the generation pipeline and return the video path.

    The *progress* parameter is never read directly; it exists so Gradio
    tracks tqdm progress bars emitted while the pipeline runs.
    """
    return process_pipeline(user_prompt, guidance_scale, num_sampling_steps, seed, enable_cpu_offload)
# Center the app content and cap its width.
css = """
div#col-container {
    margin: 0 auto;
    max-width: 800px;
}
"""

# Build the Gradio interface.
with gr.Blocks(css=css) as demo:
    with gr.Column(elem_id="col-container"):
        gr.Markdown("# Allegro Video Generation")
        user_prompt = gr.Textbox(label="User Prompt")
        with gr.Row():
            guidance_scale = gr.Slider(minimum=0, maximum=20, step=0.1, label="Guidance Scale", value=7.5)
            num_sampling_steps = gr.Slider(minimum=10, maximum=100, step=1, label="Number of Sampling Steps", value=20)
        with gr.Row():
            seed = gr.Slider(minimum=0, maximum=10000, step=1, label="Random Seed", value=42)
            enable_cpu_offload = gr.Checkbox(label="Enable CPU Offload", value=False)
        submit_btn = gr.Button("Generate Video")
        video_output = gr.Video(label="Generated Video")

        # Wire the button to the inference wrapper.
        submit_btn.click(
            fn=run_inference,
            inputs=[user_prompt, guidance_scale, num_sampling_steps, seed, enable_cpu_offload],
            outputs=video_output,
        )

# Launch the interface. (Removed the stray trailing "|" from the original
# last line — it was table-scrape residue and a SyntaxError.)
demo.launch(show_error=True, show_api=False)