Spaces:
Runtime error
Runtime error
from src.cocktails.utilities.cocktail_generation_utilities.individual import * | |
from sklearn.neighbors import NearestNeighbors | |
import time | |
import pickle | |
from src.cocktails.config import COCKTAIL_NN_PATH, COCKTAILS_CSV_DATA | |
class Population:
    """Evolving population of candidate cocktails aimed at a target representation.

    Holds a working population (``self.pop``) and an elite set (``self.pop_elite``).
    Each generation, the best individuals among both become the new elite and a
    new working population is bred from rank-weighted parents.
    """

    def __init__(self, target, pop_params, target_affective_cluster=None, known_target_dict=None):
        """Load the reference data, seed the population and run a first selection step.

        Args:
            target: target cocktail representation; it is copied for every
                individual and reshaped to ``(1, -1)`` for the neighbor query.
            pop_params: dict providing at least the keys 'pop_size', 'nb_elites',
                'nb_generations', 'mutation_params', 'dist' and 'n_neighbors'.
            target_affective_cluster: precomputed affective cluster of the target;
                derived from ``target`` when None.
            known_target_dict: optional dict describing a known target recipe
                (its 'ing_str' entry is parsed when present).
        """
        self.pop_params = pop_params
        self.pop_size = pop_params['pop_size']
        self.nb_elite = pop_params['nb_elites']
        self.nb_generations = pop_params['nb_generations']
        self.target = target
        self.mutation_params = pop_params['mutation_params']
        self.dist = pop_params['dist']
        self.n_neighbors = pop_params['n_neighbors']
        self.known_target_dict = known_target_dict

        # NOTE: pickle.load can execute arbitrary code; COCKTAIL_NN_PATH must
        # point to a trusted, project-generated artifact.
        with open(COCKTAIL_NN_PATH, 'rb') as f:
            data = pickle.load(f)
        self.nn_model_cocktail = data['nn_model']
        self.dim_rep_cocktail = data['dim_rep_cocktail']
        self.n_cocktails = data['n_cocktails']
        self.cocktail_data = pd.read_csv(COCKTAILS_CSV_DATA)

        if target_affective_cluster is None:
            cocktail_rep_affective = get_normalized_affective_cocktail_rep_from_normalized_cocktail_rep(target)
            self.target_affective_cluster = cocktail2affective_cluster(cocktail_rep_affective)[0]
        else:
            self.target_affective_cluster = target_affective_cluster

        self.pop_elite = []
        self.pop = []
        self.add_target_individual()  # create a target individual (not in pop)
        self.add_nearest_neighbors_in_pop()  # add nearest neighbor from dataset into the population

        # fill population
        while self.get_pop_size() < self.pop_size:
            self.add_individual()
        while len(self.pop_elite) < self.nb_elite:
            self.pop_elite.append(IndividualCocktail(pop_params=self.pop_params,
                                                     target=self.target.copy(),
                                                     target_affective_cluster=self.target_affective_cluster))
        self.update_elite_and_get_next_pop()

    def add_target_individual(self):
        """Build ``self.target_individual`` from the known target recipe, or set it to None."""
        if self.known_target_dict is not None:
            genes_presence, genes_quantity = self.get_q_rep(*extract_ingredients(self.known_target_dict['ing_str']))
            self.target_individual = IndividualCocktail(pop_params=self.pop_params,
                                                        target=self.target.copy(),
                                                        known_target_dict=self.known_target_dict,
                                                        target_affective_cluster=self.target_affective_cluster,
                                                        genes_presence=genes_presence,
                                                        genes_quantity=genes_quantity)
        else:
            self.target_individual = None

    def add_nearest_neighbors_in_pop(self):
        """Seed the population with the dataset cocktails closest to the target.

        Sets ``self.ing_strs``, ``self.nn_recipes`` and ``self.nn_scores``. When
        ``n_neighbors`` is not positive, nothing is added, ``ing_strs`` is None
        and the two lists are empty.
        """
        # add nearest neighbor from dataset into the population
        if self.n_neighbors > 0:
            dists, indexes = self.nn_model_cocktail.kneighbors(self.target.reshape(1, -1))
            dists, indexes = dists.flatten(), indexes.flatten()
            first = 1 if dists[0] == 0 else 0  # avoid taking the target when testing with known targets from the dataset
            indexes = indexes[first:first + self.n_neighbors]
            self.ing_strs = np.array(self.cocktail_data['ingredients_str'])[indexes]
            recipes = [extract_ingredients(ing_str) for ing_str in self.ing_strs]
            for r in recipes:
                genes_presence, genes_quantity = self.get_q_rep(r[0], r[1])
                genes_presence[-1] = 0  # remove water ingredient
                self.add_individual(genes_presence=genes_presence.copy(), genes_quantity=genes_quantity.copy())
            self.nn_recipes = [ind.get_recipe()[3] for ind in self.pop]
            self.nn_scores = [ind.perf for ind in self.pop]
        else:
            self.ing_strs = None
            # Always define these so later reads do not raise AttributeError.
            self.nn_recipes = []
            self.nn_scores = []

    def add_individual(self, genes_presence=None, genes_quantity=None):
        """Append one individual to the population, optionally with given genes."""
        self.pop.append(IndividualCocktail(pop_params=self.pop_params,
                                           target=self.target.copy(),
                                           target_affective_cluster=self.target_affective_cluster,
                                           genes_presence=genes_presence,
                                           genes_quantity=genes_quantity))

    def get_elite_perf(self):
        """Return the ``perf`` of every elite individual as an array."""
        return np.array([e.perf for e in self.pop_elite])

    def get_pop_perf(self):
        """Return the ``perf`` of every individual of the population as an array."""
        return np.array([ind.perf for ind in self.pop])

    def _append_asexual_children(self, candidates, parent_indexes, time_dict):
        """Append one mutated child per selected parent and merge its timings."""
        for i in parent_indexes:
            child, this_time_dict = candidates[i].get_child()
            self.pop.append(child)
            self.update_time_dict(time_dict, this_time_dict)

    def _fill_with_crossover_children(self, candidates, indexes_sorted, sampling_probs, time_dict):
        """Append crossover children of two distinct parents until the population is full."""
        while len(self.pop) < self.pop_size:
            parents = np.random.choice(indexes_sorted, p=sampling_probs, size=2, replace=False)
            child, this_time_dict = candidates[parents[0]].get_child_with(candidates[parents[1]])
            self.pop.append(child)
            self.update_time_dict(time_dict, this_time_dict)

    def update_elite_and_get_next_pop(self):
        """Select the new elite and breed the next population.

        Candidates are the current elite plus the current population, ranked by
        decreasing ``perf``. The top ``nb_elite`` are re-created as fresh
        individuals. Parents are then sampled with a probability proportional
        to their rank (the worst candidate gets probability zero).

        The breeding mode follows ``mutation_params['asexual_rep']`` and
        ``mutation_params['crossover']``: asexual only, crossover only, or half
        of each. If both flags are false the population is left unchanged.

        Returns:
            dict mapping a timing label to a list of durations in seconds.
        """
        time_dict = dict()
        init_time = time.time()
        elite_perfs = self.get_elite_perf()
        pop_perfs = self.get_pop_perf()
        all_perfs = np.concatenate([elite_perfs, pop_perfs])
        temp_list = self.pop_elite + self.pop
        time_dict[' get pop perfs'] = [time.time() - init_time]
        init_time = time.time()

        # update elite population with new bests
        indexes_sorted = np.flip(np.argsort(all_perfs))
        new_pop_elite = [IndividualCocktail(pop_params=self.pop_params,
                                            target=self.target.copy(),
                                            target_affective_cluster=self.target_affective_cluster,
                                            genes_presence=temp_list[i_new_e].genes_presence.copy(),
                                            genes_quantity=temp_list[i_new_e].genes_quantity.copy())
                         for i_new_e in indexes_sorted[:self.nb_elite]]
        time_dict[' recreate elite individuals'] = [time.time() - init_time]
        init_time = time.time()

        # select parents
        rank_perfs = np.flip(np.arange(len(temp_list)))
        sampling_probs = rank_perfs / np.sum(rank_perfs)
        asexual_rep = self.mutation_params['asexual_rep']
        crossover = self.mutation_params['crossover']

        # get_child / get_child_with return (child, time_dict); every branch
        # unpacks that pair so self.pop only ever holds individuals.
        if asexual_rep and not crossover:
            new_pop_indexes = np.random.choice(indexes_sorted, p=sampling_probs, size=self.pop_size)
            self.pop = []
            self._append_asexual_children(temp_list, new_pop_indexes, time_dict)
        elif crossover and not asexual_rep:
            self.pop = []
            self._fill_with_crossover_children(temp_list, indexes_sorted, sampling_probs, time_dict)
        elif crossover and asexual_rep:
            new_pop_indexes = np.random.choice(indexes_sorted, p=sampling_probs, size=self.pop_size // 2)
            time_dict[' choose asexual parent indexes'] = [time.time() - init_time]
            init_time = time.time()
            self.pop = []
            self._append_asexual_children(temp_list, new_pop_indexes, time_dict)
            time_dict[' get asexual children'] = [time.time() - init_time]
            init_time = time.time()
            self._fill_with_crossover_children(temp_list, indexes_sorted, sampling_probs, time_dict)
            time_dict[' get sexual children'] = [time.time() - init_time]

        self.pop_elite = new_pop_elite
        return time_dict

    def get_pop_size(self):
        """Return the number of individuals currently in the population."""
        return len(self.pop)

    def get_q_rep(self, ingredients, quantities):
        """Convert a recipe into presence genes and normalized quantity genes.

        Args:
            ingredients: ingredient names, each of which must be in ``ingredient_list``.
            quantities: quantities aligned with ``ingredients``.

        Returns:
            Tuple ``(genes_presence, genes_quantity)``: a 0/1 array over
            ``ingredient_list`` and the output of ``normalize_ingredient_q_rep``
            applied to the raw quantity vector.

        Raises:
            ValueError: if an ingredient is not in ``ingredient_list``.
        """
        n_ingredients = len(ingredient_list)
        ingredient_q_rep = np.zeros([n_ingredients])
        genes_presence = np.zeros([n_ingredients])
        for ing, q in zip(ingredients, quantities):
            ing_index = ingredient_list.index(ing)  # look the position up once
            ingredient_q_rep[ing_index] = q
            genes_presence[ing_index] = 1
        return genes_presence, normalize_ingredient_q_rep(ingredient_q_rep)

    def get_best_score(self, affective_cluster_check=False):
        """Rank elite and population individuals by decreasing ``perf``.

        Args:
            affective_cluster_check: when True, keep only individuals whose
                ``does_affective_cluster_match()`` is true; if none match, the
                full set is ranked instead.

        Returns:
            Tuple ``(perfs, individuals)`` of arrays sorted from best to worst.
        """
        elite_perfs = self.get_elite_perf()
        pop_perfs = self.get_pop_perf()
        all_perfs = np.concatenate([elite_perfs, pop_perfs])
        temp_list = self.pop_elite + self.pop
        if affective_cluster_check:
            indexes = np.array([i for i, ind in enumerate(temp_list) if ind.does_affective_cluster_match()])
            if indexes.size > 0:
                temp_list = np.array(temp_list)[indexes]
                all_perfs = all_perfs[indexes]
        indexes_best = np.flip(np.argsort(all_perfs))
        return np.array(all_perfs)[indexes_best], np.array(temp_list)[indexes_best]

    def update_time_dict(self, main_dict, new_dict):
        """Append the summed durations of ``new_dict`` to ``main_dict``.

        ``main_dict`` is modified in place and also returned.
        """
        for key, durations in new_dict.items():
            main_dict.setdefault(key, []).append(np.sum(durations))
        return main_dict

    def run_one_generation(self, verbose=True, affective_cluster_check=False):
        """Run one selection and breeding step.

        Args:
            verbose: accepted for interface compatibility; not used here.
            affective_cluster_check: forwarded to ``get_best_score``.

        Returns:
            Tuple ``(best_perf, time_dict)``.
        """
        time_dict = dict()
        init_time = time.time()
        this_time_dict = self.update_elite_and_get_next_pop()
        time_dict['update_elite_and_pop'] = [time.time() - init_time]
        time_dict = self.update_time_dict(time_dict, this_time_dict)
        init_time = time.time()
        best_perfs, best_individuals = self.get_best_score(affective_cluster_check)
        time_dict['get best scores'] = [time.time() - init_time]
        return best_perfs[0], time_dict

    def run_evolution(self, verbose=False, print_every=10, affective_cluster_check=False, level=0):
        """Run ``nb_generations`` generations and return the final ranking.

        Args:
            verbose: print progress every ``print_every`` generations and at the end.
            print_every: interval, in generations, between progress messages.
            affective_cluster_check: forwarded to each generation's scoring.
            level: number of leading-space repetitions prefixed to each message.

        Returns:
            The result of ``get_best_score()``.
        """
        best_score = -np.inf
        time_dict = dict()
        init_time = time.time()
        for i in range(self.nb_generations):
            best_score, this_time_dict = self.run_one_generation(verbose, affective_cluster_check=affective_cluster_check)
            time_dict = self.update_time_dict(time_dict, this_time_dict)
            if verbose and (i + 1) % print_every == 0:
                print(' ' * level + f'Gen #{i+1} - Current best perf: {best_score:.2f}, time: {time.time() - init_time:.4f}')
                init_time = time.time()
        if verbose:
            print(' ' * level + f'Evolution over, best perf: {best_score:.2f}')
        # NOTE(review): the final ranking ignores affective_cluster_check, unlike
        # the per-generation score above -- confirm this is intended.
        return self.get_best_score()

    def print_results(self, n=3):
        """Print the recipes of the ``n`` best individuals (fewer if not enough exist)."""
        best_scores, best_ind = self.get_best_score()
        for i in range(min(n, len(best_ind))):
            best_ind[i].print_recipe(f'Candidate #{i+1}, Score: {best_scores[i]:.2f}')