ccolas's picture
Upload 174 files
93c029f
from src.cocktails.utilities.cocktail_generation_utilities.individual import *
from sklearn.neighbors import NearestNeighbors
import time
import pickle
from src.cocktails.config import COCKTAIL_NN_PATH, COCKTAILS_CSV_DATA
class Population:
def __init__(self, target, pop_params, target_affective_cluster=None, known_target_dict=None):
self.pop_params = pop_params
self.pop_size = pop_params['pop_size']
self.nb_elite = pop_params['nb_elites']
self.nb_generations = pop_params['nb_generations']
self.target = target
self.mutation_params = pop_params['mutation_params']
self.dist = pop_params['dist']
self.n_neighbors = pop_params['n_neighbors']
self.known_target_dict = known_target_dict
with open(COCKTAIL_NN_PATH, 'rb') as f:
data = pickle.load(f)
self.nn_model_cocktail = data['nn_model']
self.dim_rep_cocktail = data['dim_rep_cocktail']
self.n_cocktails = data['n_cocktails']
self.cocktail_data = pd.read_csv(COCKTAILS_CSV_DATA)
if target_affective_cluster is None:
cocktail_rep_affective = get_normalized_affective_cocktail_rep_from_normalized_cocktail_rep(target)
self.target_affective_cluster = cocktail2affective_cluster(cocktail_rep_affective)[0]
else:
self.target_affective_cluster = target_affective_cluster
self.pop_elite = []
self.pop = []
self.add_target_individual() # create a target individual (not in pop)
self.add_nearest_neighbors_in_pop() # add nearest neighbor from dataset into the population
# fill population
while self.get_pop_size() < self.pop_size:
self.add_individual()
while len(self.pop_elite) < self.nb_elite:
self.pop_elite.append(IndividualCocktail(pop_params=self.pop_params,
target=self.target.copy(),
target_affective_cluster=self.target_affective_cluster))
self.update_elite_and_get_next_pop()
def add_target_individual(self):
if self.known_target_dict is not None:
genes_presence, genes_quantity = self.get_q_rep(*extract_ingredients(self.known_target_dict['ing_str']))
self.target_individual = IndividualCocktail(pop_params=self.pop_params,
target=self.target.copy(),
known_target_dict=self.known_target_dict,
target_affective_cluster=self.target_affective_cluster,
genes_presence=genes_presence,
genes_quantity=genes_quantity
)
else:
self.target_individual = None
def add_nearest_neighbors_in_pop(self):
# add nearest neighbor from dataset into the population
if self.n_neighbors > 0:
dists, indexes = self.nn_model_cocktail.kneighbors(self.target.reshape(1, -1))
dists, indexes = dists.flatten(), indexes.flatten()
first = 1 if dists[0] == 0 else 0 # avoid taking the target when testing with known targets from the dataset
indexes = indexes[first:first + self.n_neighbors]
self.ing_strs = np.array(self.cocktail_data['ingredients_str'])[indexes]
recipes = [extract_ingredients(ing_str) for ing_str in self.ing_strs]
for r in recipes:
genes_presence, genes_quantity = self.get_q_rep(r[0], r[1])
genes_presence[-1] = 0 # remove water ingredient
self.add_individual(genes_presence=genes_presence.copy(), genes_quantity=genes_quantity.copy())
self.nn_recipes = [ind.get_recipe()[3] for ind in self.pop]
self.nn_scores = [ind.perf for ind in self.pop]
else:
self.ing_strs = None
def add_individual(self, genes_presence=None, genes_quantity=None):
self.pop.append(IndividualCocktail(pop_params=self.pop_params,
target=self.target.copy(),
target_affective_cluster=self.target_affective_cluster,
genes_presence=genes_presence,
genes_quantity=genes_quantity))
def get_elite_perf(self):
return np.array([e.perf for e in self.pop_elite])
def get_pop_perf(self):
return np.array([ind.perf for ind in self.pop])
def update_elite_and_get_next_pop(self):
time_dict = dict()
init_time = time.time()
elite_perfs = self.get_elite_perf()
pop_perfs = self.get_pop_perf()
all_perfs = np.concatenate([elite_perfs, pop_perfs])
temp_list = self.pop_elite + self.pop
time_dict[' get pop perfs'] = [time.time() - init_time]
init_time = time.time()
# update elite population with new bests
indexes_sorted = np.flip(np.argsort(all_perfs))
new_pop_elite = [IndividualCocktail(pop_params=self.pop_params,
target=self.target.copy(),
target_affective_cluster=self.target_affective_cluster,
genes_presence=temp_list[i_new_e].genes_presence.copy(),
genes_quantity=temp_list[i_new_e].genes_quantity.copy()) for i_new_e in indexes_sorted[:self.nb_elite]]
time_dict[' recreate elite individuals'] = [time.time() - init_time]
init_time = time.time()
# select parents
rank_perfs = np.flip(np.arange(len(temp_list)))
sampling_probs = rank_perfs / np.sum(rank_perfs)
if self.mutation_params['asexual_rep'] and not self.mutation_params['crossover']:
new_pop_indexes = np.random.choice(indexes_sorted, p=sampling_probs, size=self.pop_size)
self.pop = [temp_list[i].get_child() for i in new_pop_indexes]
elif self.mutation_params['crossover'] and not self.mutation_params['asexual_rep']:
self.pop = []
while len(self.pop) < self.pop_size:
parents = np.random.choice(indexes_sorted, p=sampling_probs, size=2, replace=False)
self.pop.append(temp_list[parents[0]].get_child_with(temp_list[parents[1]]))
elif self.mutation_params['crossover'] and self.mutation_params['asexual_rep']:
new_pop_indexes = np.random.choice(indexes_sorted, p=sampling_probs, size=self.pop_size//2)
time_dict[' choose asexual parent indexes'] = [time.time() - init_time]
init_time = time.time()
self.pop = []
for i in new_pop_indexes:
child, this_time_dict = temp_list[i].get_child()
self.pop.append(child)
time_dict = self.update_time_dict(time_dict, this_time_dict)
time_dict[' get asexual children'] = [time.time() - init_time]
init_time = time.time()
while len(self.pop) < self.pop_size:
parents = np.random.choice(indexes_sorted, p=sampling_probs, size=2, replace=False)
child, this_time_dict = temp_list[parents[0]].get_child_with(temp_list[parents[1]])
self.pop.append(child)
time_dict = self.update_time_dict(time_dict, this_time_dict)
time_dict[' get sexual children'] = [time.time() - init_time]
self.pop_elite = new_pop_elite
return time_dict
def get_pop_size(self):
return len(self.pop)
def get_q_rep(self, ingredients, quantities):
ingredient_q_rep = np.zeros([len(ingredient_list)])
genes_presence = np.zeros([len(ingredient_list)])
for ing, q in zip(ingredients, quantities):
ingredient_q_rep[ingredient_list.index(ing)] = q
genes_presence[ingredient_list.index(ing)] = 1
return genes_presence.copy(), normalize_ingredient_q_rep(ingredient_q_rep)
def get_best_score(self, affective_cluster_check=False):
elite_perfs = self.get_elite_perf()
pop_perfs = self.get_pop_perf()
all_perfs = np.concatenate([elite_perfs, pop_perfs])
temp_list = self.pop_elite + self.pop
if affective_cluster_check:
indexes = np.array([i for i in range(len(temp_list)) if temp_list[i].does_affective_cluster_match()])
if indexes.size > 0:
temp_list = np.array(temp_list)[indexes]
all_perfs = all_perfs[indexes]
indexes_best = np.flip(np.argsort(all_perfs))
return np.array(all_perfs)[indexes_best], np.array(temp_list)[indexes_best]
def update_time_dict(self, main_dict, new_dict):
for k in new_dict.keys():
if k in main_dict.keys():
main_dict[k].append(np.sum(new_dict[k]))
else:
main_dict[k] = [np.sum(new_dict[k])]
return main_dict
def run_one_generation(self, verbose=True, affective_cluster_check=False):
time_dict = dict()
init_time = time.time()
this_time_dict = self.update_elite_and_get_next_pop()
time_dict['update_elite_and_pop'] = [time.time() - init_time]
time_dict = self.update_time_dict(time_dict, this_time_dict)
init_time = time.time()
best_perfs, best_individuals = self.get_best_score(affective_cluster_check)
time_dict['get best scores'] = [time.time() - init_time]
return best_perfs[0], time_dict
def run_evolution(self, verbose=False, print_every=10, affective_cluster_check=False, level=0):
best_score = -np.inf
time_dict = dict()
init_time = time.time()
for i in range(self.nb_generations):
best_score, this_time_dict = self.run_one_generation(verbose, affective_cluster_check=affective_cluster_check)
time_dict = self.update_time_dict(time_dict, this_time_dict)
if verbose and (i+1) % print_every == 0:
print(' ' * level + f'Gen #{i+1} - Current best perf: {best_score:.2f}, time: {time.time() - init_time:.4f}')
init_time = time.time()
#
# to_print = time_dict.copy()
# keys = sorted(to_print.keys())
# values = []
# for k in keys:
# to_print[k] = np.sum(to_print[k])
# values.append(to_print[k])
# sorted_inds = np.flip(np.argsort(values))
# for i in sorted_inds:
# print(f'{keys[i]}: {values[i]:.4f}')
if verbose: print(' ' * level + f'Evolution over, best perf: {best_score:.2f}')
return self.get_best_score()
def print_results(self, n=3):
best_scores, best_ind = self.get_best_score()
for i in range(n):
best_ind[i].print_recipe(f'Candidate #{i+1}, Score: {best_scores[i]:.2f}')