import spaces  # ZeroGPU shim; keep this import ahead of torch

import os
from subprocess import getoutput

import gradio as gr
import imageio
import torch
from diffusers.schedulers import EulerAncestralDiscreteScheduler
from huggingface_hub import snapshot_download
from transformers import T5EncoderModel, T5Tokenizer

from allegro.pipelines.pipeline_allegro import AllegroPipeline
from allegro.models.vae.vae_allegro import AllegroAutoencoderKL3D
from allegro.models.transformers.transformer_3d_allegro import AllegroTransformer3DModel

# Ensure the weights directory exists
weights_dir = './allegro_weights'
os.makedirs(weights_dir, exist_ok=True)

print(f"Downloading models to {weights_dir}...")
snapshot_download(
    repo_id='rhymes-ai/Allegro',
    local_dir=weights_dir,
    allow_patterns=['**']
)
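# allow_patterns=['**'] matches every file, so this mirrors the complete
# rhymes-ai/Allegro repository (all weight shards and configs) into weights_dir.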

print(f"Downloaded models to {weights_dir}.")

# Check if the directories exist
required_dirs = ['vae', 'text_encoder', 'tokenizer', 'scheduler', 'transformer']
missing_dirs = [d for d in required_dirs if not os.path.exists(os.path.join(weights_dir, d))]
if missing_dirs:
    print(f"Missing directories: {missing_dirs}")
    raise FileNotFoundError(f"The following directories are missing in '{weights_dir}': {missing_dirs}")
else:
    print(f"All required directories are present in {weights_dir}.")



# Check directory structure
required_dirs = ['vae', 'text_encoder', 'tokenizer', 'scheduler', 'transformer']
missing_dirs = [d for d in required_dirs if not os.path.exists(os.path.join(weights_dir, d))]
if missing_dirs:
    raise FileNotFoundError(f"The following directories are missing in '{weights_dir}': {missing_dirs}")

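# On ZeroGPU Spaces, `@spaces.GPU` attaches a GPU only for the duration of the
# decorated call, which is why the models are loaded inside the function rather
# than at module scope: loading and inference must both fit in the 120 s window.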
@spaces.GPU(duration=120)  # Request GPU for the entire process
def process_pipeline(user_prompt, guidance_scale, num_sampling_steps, seed, enable_cpu_offload):
    # Optional debug: log which GPU this run was allocated.
    if torch.cuda.is_available():
        print(f"GPU Info: {getoutput('nvidia-smi')}")

    # bfloat16 for the text encoder and transformer; the VAE stays in float32
    dtype = torch.bfloat16

    # Load models
    vae = AllegroAutoencoderKL3D.from_pretrained(
        os.path.join(weights_dir, 'vae/'), 
        torch_dtype=torch.float32
    ).cuda()
    vae.eval()
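    # Allegro's reference inference script also loads the VAE in float32 while
    # everything else runs in bfloat16, presumably because the 3D VAE decode is
    # sensitive to reduced precision.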

    text_encoder = T5EncoderModel.from_pretrained(
        os.path.join(weights_dir, 'text_encoder/'), 
        torch_dtype=dtype
    ).eval()

    tokenizer = T5Tokenizer.from_pretrained(os.path.join(weights_dir, 'tokenizer/'))

    scheduler = EulerAncestralDiscreteScheduler()
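    # Note: this uses the scheduler's library defaults; the config shipped in
    # weights_dir/scheduler is not read here.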

    transformer = AllegroTransformer3DModel.from_pretrained(
        os.path.join(weights_dir, 'transformer/'), 
        torch_dtype=dtype
    ).cuda()
    transformer.eval()

    allegro_pipeline = AllegroPipeline(
        vae=vae,
        text_encoder=text_encoder,
        tokenizer=tokenizer,
        scheduler=scheduler,
        transformer=transformer
    ).to("cuda:0")

    # Prompts
    positive_prompt = """
    (masterpiece), (best quality), (ultra-detailed), (unwatermarked), 
    {} 
    emotional, harmonious, vignette, 4k epic detailed, shot on kodak, 35mm photo, 
    sharp focus, high budget, cinemascope, moody, epic, gorgeous
    """

    negative_prompt = """
    nsfw, lowres, bad anatomy, bad hands, text, error, missing fingers, extra digit, fewer digits, cropped, worst quality, 
    low quality, normal quality, jpeg artifacts, signature, watermark, username, blurry.
    """

    # Format the user prompt
    user_prompt = positive_prompt.format(user_prompt.lower().strip())

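    # Sequential offload keeps only the currently-executing submodule on the
    # GPU, trading generation speed for a much lower peak-VRAM footprint.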
    if enable_cpu_offload:
        allegro_pipeline.enable_sequential_cpu_offload()

    # Generate the video
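    # 30 frames at 360x640 and 15 fps (below) yields a roughly two-second clip;
    # these modest settings are presumably chosen so that generation finishes
    # within the 120 s GPU window requested above.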
    out_video = allegro_pipeline(
        user_prompt, 
        negative_prompt=negative_prompt, 
        num_frames=30,
        height=360,
        width=640,
        num_inference_steps=num_sampling_steps,
        guidance_scale=guidance_scale,
        max_sequence_length=512,
        generator=torch.Generator(device="cuda:0").manual_seed(seed)
    ).video[0]

    # Save the video
    save_path = "./output_videos/generated_video.mp4"
    os.makedirs(os.path.dirname(save_path), exist_ok=True)
    imageio.mimwrite(save_path, out_video, fps=15, quality=8)
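    # imageio needs an ffmpeg backend (e.g. the imageio-ffmpeg package) to
    # encode MP4; quality=8 is near the top of its 0-10 scale.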

    return save_path

# Gradio interface function
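# `gr.Progress(track_tqdm=True)` mirrors the pipeline's internal tqdm bar
# (the denoising loop) as a progress bar in the Gradio UI.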
def run_inference(user_prompt, guidance_scale, num_sampling_steps, seed, enable_cpu_offload, progress=gr.Progress(track_tqdm=True)):
    result_path = process_pipeline(user_prompt, guidance_scale, num_sampling_steps, seed, enable_cpu_offload)
    return result_path

css = """
div#col-container {
    margin: 0 auto;
    max-width: 800px;
}
"""

# Create Gradio interface
with gr.Blocks(css=css) as demo:
    with gr.Column(elem_id="col-container"):
        gr.Markdown("# Allegro Video Generation")
        user_prompt = gr.Textbox(label="User Prompt")
        with gr.Row():
            guidance_scale = gr.Slider(minimum=0, maximum=20, step=0.1, label="Guidance Scale", value=7.5)
            num_sampling_steps = gr.Slider(minimum=10, maximum=100, step=1, label="Number of Sampling Steps", value=20)
        with gr.Row():
            seed = gr.Slider(minimum=0, maximum=10000, step=1, label="Random Seed", value=42)
            enable_cpu_offload = gr.Checkbox(label="Enable CPU Offload", value=False)
        submit_btn = gr.Button("Generate Video")
        video_output = gr.Video(label="Generated Video")

    submit_btn.click(
        fn=run_inference,
        inputs=[user_prompt, guidance_scale, num_sampling_steps, seed, enable_cpu_offload],
        outputs=video_output
    )

# Launch the interface
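# show_error=True surfaces Python tracebacks in the UI, which helps when
# debugging the Space; show_api=False hides the auto-generated API docs.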
demo.launch(show_error=True, show_api=False)