import os
import optuna
from optuna.trial import TrialState
import torch
import torch.nn as nn
import torch.optim as optim
from configs import *
import data_loader
from torch.utils.tensorboard import SummaryWriter
import numpy as np
import pygad
import pygad.torchga

torch.cuda.empty_cache()
model = MODEL.to(DEVICE)

EPOCHS = 10
N_TRIALS = 20
TIMEOUT = 1800
EARLY_STOPPING_PATIENCE = 4  # Number of epochs with no improvement to trigger early stopping
NUM_GENERATIONS = 10
SOL_PER_POP = 10  # Number of solutions in the population
NUM_GENES = 2
NUM_PARENTS_MATING = 4

# Create a TensorBoard writer
writer = SummaryWriter(log_dir="output/tensorboard/tuning")
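# Logged metrics and hyperparameters can be inspected with:
#   tensorboard --logdir output/tensorboard/tuning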

# Function to create data loaders with the specified batch size
def create_data_loaders(batch_size):
    train_loader, valid_loader = data_loader.load_data(
        COMBINED_DATA_DIR + "1",
        preprocess,
        batch_size=batch_size,
    )
    return train_loader, valid_loader

# Objective function for optimization
def objective(trial):
    batch_size = trial.suggest_categorical("batch_size", [16, 32, 64])
    train_loader, valid_loader = create_data_loaders(batch_size)
    lr = trial.suggest_float("lr", 1e-5, 1e-3, log=True)
    optimizer = optim.Adam(model.parameters(), lr=lr)
    criterion = nn.CrossEntropyLoss()
    # gamma is tuned and logged below; note that CosineAnnealingLR itself does not use it
    gamma = trial.suggest_float("gamma", 0.1, 0.9, step=0.1)
    scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=EPOCHS)

    # Print the best hyperparameters once at least one trial has completed
    completed_trials = [t for t in study.trials if t.state == TrialState.COMPLETE]
    if completed_trials:
        print("\nBest Hyperparameters:")
        print(f"{study.best_trial.params}")

    print(f"\n[INFO] Trial: {trial.number}")
    print(f"Batch Size: {batch_size}")
    print(f"Learning Rate: {lr}")
    print(f"Gamma: {gamma}\n")

    early_stopping_counter = 0
    best_accuracy = 0.0
    for epoch in range(EPOCHS):
        model.train()
        for batch_idx, (data, target) in enumerate(train_loader, 0):
            data, target = data.to(DEVICE), target.to(DEVICE)
            optimizer.zero_grad()
            output = model(data)
            loss = criterion(output, target)
            loss.backward()
            optimizer.step()
        scheduler.step()

        model.eval()
        correct = 0
        with torch.no_grad():
            for batch_idx, (data, target) in enumerate(valid_loader, 0):
                data, target = data.to(DEVICE), target.to(DEVICE)
                output = model(data)
                pred = output.argmax(dim=1, keepdim=True)
                correct += pred.eq(target.view_as(pred)).sum().item()
        accuracy = correct / len(valid_loader.dataset)
        print(f"[EPOCH {epoch + 1}] Accuracy: {accuracy:.4f}")
        trial.report(accuracy, epoch)
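        # The study below uses a HyperbandPruner; the usual Optuna pattern is to
        # check for pruning right after each report (added here as an assumed step)
        if trial.should_prune():
            raise optuna.TrialPruned()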
        if accuracy > best_accuracy:
            best_accuracy = accuracy
            early_stopping_counter = 0
        else:
            early_stopping_counter += 1

        # Early stopping check
        if early_stopping_counter >= EARLY_STOPPING_PATIENCE:
            print(f"\nEarly stopping at epoch {epoch + 1}")
            break

    # Log hyperparameters and the best accuracy for this trial to TensorBoard
    writer.add_scalar("Accuracy", best_accuracy, trial.number)
    writer.add_hparams(
        {"batch_size": batch_size, "lr": lr, "gamma": gamma},
        {"accuracy": best_accuracy},
    )

    # Assign a poor score to later trials that never reach 70% validation accuracy
    if trial.number > 10 and trial.params["lr"] < 1e-3 and best_accuracy < 0.7:
        return 0.0

    return best_accuracy

# Custom genetic algorithm
def run_genetic_algorithm(fitness_func):
    # Initial population
    population = np.random.rand(SOL_PER_POP, NUM_GENES)  # Random initialization

    # Run for a fixed number of generations
    for generation in range(NUM_GENERATIONS):
        # Calculate fitness for each solution in the population
        fitness = np.array(
            [fitness_func(solution, idx) for idx, solution in enumerate(population)]
        )

        # Get the index of the best solution
        best_idx = np.argmax(fitness)
        best_solution = population[best_idx]
        best_fitness = fitness[best_idx]

        # Print the best solution and fitness for this generation
        print(f"Generation {generation + 1}:")
        print("Best Solution:")
        print("Learning Rate = {lr}".format(lr=best_solution[0]))
        print("Gamma = {gamma}".format(gamma=best_solution[1]))
        print("Best Fitness = {fitness}".format(fitness=best_fitness))

        # Perform selection and crossover to create the next generation
        population = selection_and_crossover(population, fitness)

# Selection and crossover logic
def selection_and_crossover(population, fitness):
    # Perform tournament selection
    parents = []
    for _ in range(SOL_PER_POP):
        tournament_idxs = np.random.choice(range(SOL_PER_POP), NUM_PARENTS_MATING)
        tournament_fitness = [fitness[idx] for idx in tournament_idxs]
        selected_parent_idx = tournament_idxs[np.argmax(tournament_fitness)]
        parents.append(population[selected_parent_idx])

    # Perform single-point crossover
    offspring = []
    for i in range(0, SOL_PER_POP, 2):
        if i + 1 < SOL_PER_POP:
            crossover_point = np.random.randint(0, NUM_GENES)
            offspring.extend(
                [
                    np.concatenate(
                        (parents[i][:crossover_point], parents[i + 1][crossover_point:])
                    )
                ]
            )
            offspring.extend(
                [
                    np.concatenate(
                        (parents[i + 1][:crossover_point], parents[i][crossover_point:])
                    )
                ]
            )
    return np.array(offspring)
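

# A mutation step is often added after crossover to keep diversity in the
# population; a minimal sketch for the same 2-gene layout (illustrative only,
# not wired into run_genetic_algorithm above):
def mutate(population, mutation_rate=0.1, scale=0.05):
    # With probability `mutation_rate`, perturb each gene with Gaussian noise
    mask = np.random.rand(*population.shape) < mutation_rate
    noise = np.random.normal(0.0, scale, size=population.shape)
    return np.clip(population + mask * noise, 0.0, 1.0)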

# Callback to log the best solution of each generation
def callback_generation(ga_instance):
    global study
    # Fetch the parameters of the best solution
    solution, solution_fitness, _ = ga_instance.best_solution()
    best_learning_rate, best_gamma = solution

    # Record the best fitness on the Optuna study
    study.set_user_attr("best_accuracy", solution_fitness)

    # Print generation number and best fitness
    print(
        "Generation = {generation}".format(generation=ga_instance.generations_completed)
    )
    print("Best Fitness = {fitness}".format(fitness=solution_fitness))
    print("Best Learning Rate = {lr}".format(lr=best_learning_rate))
    print("Best Gamma = {gamma}".format(gamma=best_gamma))

if __name__ == "__main__":
    pruner = optuna.pruners.HyperbandPruner()
    study = optuna.create_study(
        direction="maximize",
        pruner=pruner,
        study_name="hyperparameter_tuning",
    )
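    # Assumed usage: run the Optuna study with the trial/time budget defined above
    study.optimize(objective, n_trials=N_TRIALS, timeout=TIMEOUT)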
    # Define data_inputs and data_outputs
    # You need to populate these with your own data
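    # One possible way to populate them (illustrative assumption): reuse a single
    # validation batch from the loaders defined above
    _, valid_loader = create_data_loaders(batch_size=32)
    data_inputs, data_outputs = next(iter(valid_loader))
    data_inputs, data_outputs = data_inputs.to(DEVICE), data_outputs.to(DEVICE)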
    # Define the loss function
    loss_function = nn.CrossEntropyLoss()

    def fitness_func(solution, sol_idx):
        global data_inputs, data_outputs, model, loss_function
        # Each solution carries two genes: a learning rate and an SGD momentum
        # (the GA loop above prints the second gene as "Gamma")
        learning_rate, momentum = solution
        # Update optimizer with the current learning rate and momentum
        optimizer = torch.optim.SGD(
            model.parameters(), lr=learning_rate, momentum=momentum
        )
        # Snapshot the weights so every candidate starts from the same model state
        initial_state = {k: v.clone() for k, v in model.state_dict().items()}
        # Take a single training step with the candidate hyperparameters
        model.train()
        optimizer.zero_grad()
        predictions = model(data_inputs)
        loss = loss_function(predictions, data_outputs)
        loss.backward()
        optimizer.step()
        # Re-evaluate the loss after the step, then restore the original weights
        model.eval()
        with torch.no_grad():
            loss = loss_function(model(data_inputs), data_outputs)
        model.load_state_dict(initial_state)
        # Higher fitness for lower loss
        solution_fitness = 1.0 / (loss.item() + 1e-8)
        return solution_fitness

    # Run the custom genetic algorithm
    run_genetic_algorithm(fitness_func)
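
    # Flush and close the TensorBoard writer once tuning is finished
    writer.close()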