# SpiralSense / genetric_algorithm.py
import numpy as np
import optuna
import pygad
import pygad.torchga
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.tensorboard import SummaryWriter

import data_loader
from configs import *

torch.cuda.empty_cache()

model = MODEL.to(DEVICE)

# Optuna search settings
EPOCHS = 10
N_TRIALS = 20
TIMEOUT = 1800  # seconds
EARLY_STOPPING_PATIENCE = 4  # epochs with no improvement to trigger early stopping

# Genetic algorithm settings
NUM_GENERATIONS = 10
SOL_PER_POP = 10  # number of solutions in the population
NUM_GENES = 2  # one gene each for learning rate and momentum
NUM_PARENTS_MATING = 4  # tournament size used during selection

# Create a TensorBoard writer
writer = SummaryWriter(log_dir="output/tensorboard/tuning")


# Function to create data loaders with the specified batch size
def create_data_loaders(batch_size):
    train_loader, valid_loader = data_loader.load_data(
        COMBINED_DATA_DIR + "1",
        preprocess,
        batch_size=batch_size,
    )
    return train_loader, valid_loader
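

# A minimal, commented-out smoke test for the loader factory. It assumes the
# configs module supplies COMBINED_DATA_DIR and preprocess; the batch size of
# 32 below is an arbitrary choice for illustration, not a value from the
# original code.
# train_loader, valid_loader = create_data_loaders(32)
# print(len(train_loader.dataset), "training samples,",
#       len(valid_loader.dataset), "validation samples")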


# Objective function for Optuna optimization
def objective(trial):
    batch_size = trial.suggest_categorical("batch_size", [16, 32, 64])
    train_loader, valid_loader = create_data_loaders(batch_size)

    lr = trial.suggest_float("lr", 1e-5, 1e-3, log=True)
    optimizer = optim.Adam(model.parameters(), lr=lr)
    criterion = nn.CrossEntropyLoss()

    # gamma is tuned and logged, but CosineAnnealingLR does not use it; it is
    # kept in the search space only so it shows up in the hparams dashboard.
    gamma = trial.suggest_float("gamma", 0.1, 0.9, step=0.1)
    scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=EPOCHS)

    # Print the best hyperparameters found so far (after the first trial)
    if trial.number > 0:
        print("\nBest Hyperparameters:")
        print(f"{study.best_trial.params}")

    print(f"\n[INFO] Trial: {trial.number}")
    print(f"Batch Size: {batch_size}")
    print(f"Learning Rate: {lr}")
    print(f"Gamma: {gamma}\n")

    early_stopping_counter = 0
    best_accuracy = 0.0

    for epoch in range(EPOCHS):
        model.train()
        for data, target in train_loader:
            data, target = data.to(DEVICE), target.to(DEVICE)
            optimizer.zero_grad()
            output = model(data)
            loss = criterion(output, target)
            loss.backward()
            optimizer.step()
        scheduler.step()

        model.eval()
        correct = 0
        with torch.no_grad():
            for data, target in valid_loader:
                data, target = data.to(DEVICE), target.to(DEVICE)
                output = model(data)
                pred = output.argmax(dim=1, keepdim=True)
                correct += pred.eq(target.view_as(pred)).sum().item()
        accuracy = correct / len(valid_loader.dataset)

        # Log per-epoch accuracy to TensorBoard, one curve per trial
        writer.add_scalar(f"trial_{trial.number}/accuracy", accuracy, epoch)

        print(f"[EPOCH {epoch + 1}] Accuracy: {accuracy:.4f}")
        trial.report(accuracy, epoch)

        if accuracy > best_accuracy:
            best_accuracy = accuracy
            early_stopping_counter = 0
        else:
            early_stopping_counter += 1

        # Early stopping check
        if early_stopping_counter >= EARLY_STOPPING_PATIENCE:
            print(f"\nEarly stopping at epoch {epoch + 1}")
            break

        # Let the Hyperband pruner stop unpromising trials
        if trial.should_prune():
            raise optuna.TrialPruned()

    # Log hyperparameters and the trial's best accuracy once per trial
    writer.add_hparams(
        {"batch_size": batch_size, "lr": lr, "gamma": gamma},
        {"accuracy": best_accuracy},
    )

    # Penalize late, low-accuracy trials; the study maximizes accuracy,
    # so the penalty must be -inf, not +inf.
    if trial.number > 10 and trial.params["lr"] < 1e-3 and best_accuracy < 0.7:
        return float("-inf")

    return best_accuracy


# Custom genetic algorithm over (learning rate, momentum) pairs
def run_genetic_algorithm(fitness_func):
    # Initial population, sampled uniformly from [0, 1) for each gene
    population = np.random.rand(SOL_PER_POP, NUM_GENES)

    # Run for a fixed number of generations
    for generation in range(NUM_GENERATIONS):
        # Calculate fitness for each solution in the population
        fitness = np.array(
            [fitness_func(solution, idx) for idx, solution in enumerate(population)]
        )

        # Get the index of the best solution
        best_idx = np.argmax(fitness)
        best_solution = population[best_idx]
        best_fitness = fitness[best_idx]

        # Print the best solution and fitness for this generation
        print(f"Generation {generation + 1}:")
        print("Best Solution:")
        print(f"Learning Rate = {best_solution[0]}")
        print(f"Momentum = {best_solution[1]}")
        print(f"Best Fitness = {best_fitness}")

        # Perform selection and crossover to create the next generation
        population = selection_and_crossover(population, fitness)


# Selection and crossover logic (note: no mutation step is applied)
def selection_and_crossover(population, fitness):
    # Tournament selection: sample NUM_PARENTS_MATING candidates, keep the fittest
    parents = []
    for _ in range(SOL_PER_POP):
        tournament_idxs = np.random.choice(range(SOL_PER_POP), NUM_PARENTS_MATING)
        tournament_fitness = [fitness[idx] for idx in tournament_idxs]
        selected_parent_idx = tournament_idxs[np.argmax(tournament_fitness)]
        parents.append(population[selected_parent_idx])

    # Single-point crossover between consecutive parent pairs
    offspring = []
    for i in range(0, SOL_PER_POP, 2):
        if i + 1 < SOL_PER_POP:
            crossover_point = np.random.randint(0, NUM_GENES)
            offspring.append(
                np.concatenate(
                    (parents[i][:crossover_point], parents[i + 1][crossover_point:])
                )
            )
            offspring.append(
                np.concatenate(
                    (parents[i + 1][:crossover_point], parents[i][crossover_point:])
                )
            )
        else:
            # Odd population size: carry the unpaired parent over unchanged,
            # so the population does not shrink between generations
            offspring.append(parents[i])
    return np.array(offspring)
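

# The crossover step above never perturbs gene values, only recombines them.
# The helper below is a hypothetical sketch of a Gaussian mutation operator
# that could be applied to the offspring; the 10% rate and 0.05 scale are
# illustrative assumptions, not values from the original code, and nothing in
# this file calls it.
def mutate(offspring, mutation_rate=0.1, scale=0.05):
    # Perturb each gene with probability mutation_rate, clipping to [0, 1]
    mask = np.random.rand(*offspring.shape) < mutation_rate
    noise = np.random.normal(0.0, scale, size=offspring.shape)
    return np.clip(offspring + mask * noise, 0.0, 1.0)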


# Per-generation callback for a pygad.GA run (not used by the custom GA above)
def callback_generation(ga_instance):
    global study
    # Fetch the parameters of the best solution
    solution, solution_fitness, _ = ga_instance.best_solution()
    best_learning_rate, best_momentum = solution

    # Record the best fitness on the Optuna study for later inspection
    study.set_user_attr("best_accuracy", solution_fitness)

    # Print generation number and best solution
    print(f"Generation = {ga_instance.generations_completed}")
    print(f"Best Fitness = {solution_fitness}")
    print(f"Best Learning Rate = {best_learning_rate}")
    print(f"Best Momentum = {best_momentum}")


if __name__ == "__main__":
    pruner = optuna.pruners.HyperbandPruner()
    study = optuna.create_study(
        direction="maximize",
        pruner=pruner,
        study_name="hyperparameter_tuning",
    )

    # Run the Optuna search using the N_TRIALS and TIMEOUT settings above
    study.optimize(objective, n_trials=N_TRIALS, timeout=TIMEOUT)

    # Define data_inputs and data_outputs
    # You need to populate these with your own data

    # Define the loss function
    loss_function = nn.CrossEntropyLoss()
    def fitness_func(solution, sol_idx):
        global data_inputs, data_outputs, model, loss_function
        learning_rate, momentum = solution

        # Build an SGD optimizer from the evolved hyperparameters and take one
        # training step so the fitness depends on the genes. The original call
        # to pygad.torchga.model_weights_as_dict expected a full weight vector
        # and would fail on a two-gene solution, so it is dropped here.
        optimizer = torch.optim.SGD(
            model.parameters(), lr=learning_rate, momentum=momentum
        )
        model.train()
        optimizer.zero_grad()
        predictions = model(data_inputs)
        loss = loss_function(predictions, data_outputs)
        loss.backward()
        optimizer.step()

        # Re-evaluate after the update; higher fitness for lower loss
        model.eval()
        with torch.no_grad():
            loss = loss_function(model(data_inputs), data_outputs)
        return 1.0 / (loss.item() + 1e-8)
    # Run the custom genetic algorithm
    run_genetic_algorithm(fitness_func)

    # Flush and close the TensorBoard writer
    writer.close()