from src.cocktails.utilities.cocktail_generation_utilities.individual import * from sklearn.neighbors import NearestNeighbors import time import pickle from src.cocktails.config import COCKTAIL_NN_PATH, COCKTAILS_CSV_DATA class Population: def __init__(self, target, pop_params, target_affective_cluster=None, known_target_dict=None): self.pop_params = pop_params self.pop_size = pop_params['pop_size'] self.nb_elite = pop_params['nb_elites'] self.nb_generations = pop_params['nb_generations'] self.target = target self.mutation_params = pop_params['mutation_params'] self.dist = pop_params['dist'] self.n_neighbors = pop_params['n_neighbors'] self.known_target_dict = known_target_dict with open(COCKTAIL_NN_PATH, 'rb') as f: data = pickle.load(f) self.nn_model_cocktail = data['nn_model'] self.dim_rep_cocktail = data['dim_rep_cocktail'] self.n_cocktails = data['n_cocktails'] self.cocktail_data = pd.read_csv(COCKTAILS_CSV_DATA) if target_affective_cluster is None: cocktail_rep_affective = get_normalized_affective_cocktail_rep_from_normalized_cocktail_rep(target) self.target_affective_cluster = cocktail2affective_cluster(cocktail_rep_affective)[0] else: self.target_affective_cluster = target_affective_cluster self.pop_elite = [] self.pop = [] self.add_target_individual() # create a target individual (not in pop) self.add_nearest_neighbors_in_pop() # add nearest neighbor from dataset into the population # fill population while self.get_pop_size() < self.pop_size: self.add_individual() while len(self.pop_elite) < self.nb_elite: self.pop_elite.append(IndividualCocktail(pop_params=self.pop_params, target=self.target.copy(), target_affective_cluster=self.target_affective_cluster)) self.update_elite_and_get_next_pop() def add_target_individual(self): if self.known_target_dict is not None: genes_presence, genes_quantity = self.get_q_rep(*extract_ingredients(self.known_target_dict['ing_str'])) self.target_individual = IndividualCocktail(pop_params=self.pop_params, target=self.target.copy(), known_target_dict=self.known_target_dict, target_affective_cluster=self.target_affective_cluster, genes_presence=genes_presence, genes_quantity=genes_quantity ) else: self.target_individual = None def add_nearest_neighbors_in_pop(self): # add nearest neighbor from dataset into the population if self.n_neighbors > 0: dists, indexes = self.nn_model_cocktail.kneighbors(self.target.reshape(1, -1)) dists, indexes = dists.flatten(), indexes.flatten() first = 1 if dists[0] == 0 else 0 # avoid taking the target when testing with known targets from the dataset indexes = indexes[first:first + self.n_neighbors] self.ing_strs = np.array(self.cocktail_data['ingredients_str'])[indexes] recipes = [extract_ingredients(ing_str) for ing_str in self.ing_strs] for r in recipes: genes_presence, genes_quantity = self.get_q_rep(r[0], r[1]) genes_presence[-1] = 0 # remove water ingredient self.add_individual(genes_presence=genes_presence.copy(), genes_quantity=genes_quantity.copy()) self.nn_recipes = [ind.get_recipe()[3] for ind in self.pop] self.nn_scores = [ind.perf for ind in self.pop] else: self.ing_strs = None def add_individual(self, genes_presence=None, genes_quantity=None): self.pop.append(IndividualCocktail(pop_params=self.pop_params, target=self.target.copy(), target_affective_cluster=self.target_affective_cluster, genes_presence=genes_presence, genes_quantity=genes_quantity)) def get_elite_perf(self): return np.array([e.perf for e in self.pop_elite]) def get_pop_perf(self): return np.array([ind.perf for ind in self.pop]) def update_elite_and_get_next_pop(self): time_dict = dict() init_time = time.time() elite_perfs = self.get_elite_perf() pop_perfs = self.get_pop_perf() all_perfs = np.concatenate([elite_perfs, pop_perfs]) temp_list = self.pop_elite + self.pop time_dict[' get pop perfs'] = [time.time() - init_time] init_time = time.time() # update elite population with new bests indexes_sorted = np.flip(np.argsort(all_perfs)) new_pop_elite = [IndividualCocktail(pop_params=self.pop_params, target=self.target.copy(), target_affective_cluster=self.target_affective_cluster, genes_presence=temp_list[i_new_e].genes_presence.copy(), genes_quantity=temp_list[i_new_e].genes_quantity.copy()) for i_new_e in indexes_sorted[:self.nb_elite]] time_dict[' recreate elite individuals'] = [time.time() - init_time] init_time = time.time() # select parents rank_perfs = np.flip(np.arange(len(temp_list))) sampling_probs = rank_perfs / np.sum(rank_perfs) if self.mutation_params['asexual_rep'] and not self.mutation_params['crossover']: new_pop_indexes = np.random.choice(indexes_sorted, p=sampling_probs, size=self.pop_size) self.pop = [temp_list[i].get_child() for i in new_pop_indexes] elif self.mutation_params['crossover'] and not self.mutation_params['asexual_rep']: self.pop = [] while len(self.pop) < self.pop_size: parents = np.random.choice(indexes_sorted, p=sampling_probs, size=2, replace=False) self.pop.append(temp_list[parents[0]].get_child_with(temp_list[parents[1]])) elif self.mutation_params['crossover'] and self.mutation_params['asexual_rep']: new_pop_indexes = np.random.choice(indexes_sorted, p=sampling_probs, size=self.pop_size//2) time_dict[' choose asexual parent indexes'] = [time.time() - init_time] init_time = time.time() self.pop = [] for i in new_pop_indexes: child, this_time_dict = temp_list[i].get_child() self.pop.append(child) time_dict = self.update_time_dict(time_dict, this_time_dict) time_dict[' get asexual children'] = [time.time() - init_time] init_time = time.time() while len(self.pop) < self.pop_size: parents = np.random.choice(indexes_sorted, p=sampling_probs, size=2, replace=False) child, this_time_dict = temp_list[parents[0]].get_child_with(temp_list[parents[1]]) self.pop.append(child) time_dict = self.update_time_dict(time_dict, this_time_dict) time_dict[' get sexual children'] = [time.time() - init_time] self.pop_elite = new_pop_elite return time_dict def get_pop_size(self): return len(self.pop) def get_q_rep(self, ingredients, quantities): ingredient_q_rep = np.zeros([len(ingredient_list)]) genes_presence = np.zeros([len(ingredient_list)]) for ing, q in zip(ingredients, quantities): ingredient_q_rep[ingredient_list.index(ing)] = q genes_presence[ingredient_list.index(ing)] = 1 return genes_presence.copy(), normalize_ingredient_q_rep(ingredient_q_rep) def get_best_score(self, affective_cluster_check=False): elite_perfs = self.get_elite_perf() pop_perfs = self.get_pop_perf() all_perfs = np.concatenate([elite_perfs, pop_perfs]) temp_list = self.pop_elite + self.pop if affective_cluster_check: indexes = np.array([i for i in range(len(temp_list)) if temp_list[i].does_affective_cluster_match()]) if indexes.size > 0: temp_list = np.array(temp_list)[indexes] all_perfs = all_perfs[indexes] indexes_best = np.flip(np.argsort(all_perfs)) return np.array(all_perfs)[indexes_best], np.array(temp_list)[indexes_best] def update_time_dict(self, main_dict, new_dict): for k in new_dict.keys(): if k in main_dict.keys(): main_dict[k].append(np.sum(new_dict[k])) else: main_dict[k] = [np.sum(new_dict[k])] return main_dict def run_one_generation(self, verbose=True, affective_cluster_check=False): time_dict = dict() init_time = time.time() this_time_dict = self.update_elite_and_get_next_pop() time_dict['update_elite_and_pop'] = [time.time() - init_time] time_dict = self.update_time_dict(time_dict, this_time_dict) init_time = time.time() best_perfs, best_individuals = self.get_best_score(affective_cluster_check) time_dict['get best scores'] = [time.time() - init_time] return best_perfs[0], time_dict def run_evolution(self, verbose=False, print_every=10, affective_cluster_check=False, level=0): best_score = -np.inf time_dict = dict() init_time = time.time() for i in range(self.nb_generations): best_score, this_time_dict = self.run_one_generation(verbose, affective_cluster_check=affective_cluster_check) time_dict = self.update_time_dict(time_dict, this_time_dict) if verbose and (i+1) % print_every == 0: print(' ' * level + f'Gen #{i+1} - Current best perf: {best_score:.2f}, time: {time.time() - init_time:.4f}') init_time = time.time() # # to_print = time_dict.copy() # keys = sorted(to_print.keys()) # values = [] # for k in keys: # to_print[k] = np.sum(to_print[k]) # values.append(to_print[k]) # sorted_inds = np.flip(np.argsort(values)) # for i in sorted_inds: # print(f'{keys[i]}: {values[i]:.4f}') if verbose: print(' ' * level + f'Evolution over, best perf: {best_score:.2f}') return self.get_best_score() def print_results(self, n=3): best_scores, best_ind = self.get_best_score() for i in range(n): best_ind[i].print_recipe(f'Candidate #{i+1}, Score: {best_scores[i]:.2f}')