diff --git a/pygad/pygad.py b/pygad/pygad.py index f17bd99..4d1f7cf 100644 --- a/pygad/pygad.py +++ b/pygad/pygad.py @@ -63,7 +63,8 @@ def __init__(self, stop_criteria=None, parallel_processing=None, random_seed=None, - logger=None): + logger=None, + constraint_func=None): """ The constructor of the GA class accepts all parameters required to create an instance of the GA class. It validates such parameters. @@ -129,6 +130,17 @@ def __init__(self, random_seed: Added in PyGAD 2.18.0. It defines the random seed to be used by the random function generators (we use random functions in the NumPy and random modules). This helps to reproduce the same results by setting the same random seed. logger: Added in PyGAD 2.20.0. It accepts a logger object of the 'logging.Logger' class to log the messages. If no logger is passed, then a default logger is created to log/print the messages to the console exactly like using the 'print()' function. + + constraint_func: Added for constrained multi-objective optimization (NSGA-II). Accepts a function/method that returns the constraint violation values for a solution. + For each constraint, return a value <= 0 if the constraint is satisfied, or > 0 if the constraint is violated. + The function signature must be: constraint_func(solution, solution_idx) + - solution: The solution/chromosome to check constraints for. + - solution_idx: The index of the solution within the population. + Returns a list/tuple/numpy.ndarray of constraint violation values, one for each constraint. + When using NSGA-II with constraints, constraint domination principle is used instead of regular Pareto domination: + 1) A feasible solution (all constraints satisfied) always dominates an infeasible solution. + 2) Between two feasible solutions, regular Pareto domination applies. + 3) Between two infeasible solutions, the one with smaller constraint violation dominates. """ try: self.validate_parameters(num_generations=num_generations, @@ -171,7 +183,8 @@ def __init__(self, stop_criteria=stop_criteria, parallel_processing=parallel_processing, random_seed=random_seed, - logger=logger) + logger=logger, + constraint_func=constraint_func) except Exception as e: self.logger.exception(e) # sys.exit(-1) diff --git a/pygad/utils/nsga2.py b/pygad/utils/nsga2.py index e904fed..99b2762 100644 --- a/pygad/utils/nsga2.py +++ b/pygad/utils/nsga2.py @@ -6,14 +6,20 @@ class NSGA2: def __init__(self): pass - def get_non_dominated_set(self, curr_solutions): + def get_non_dominated_set(self, curr_solutions, constraint_violations=None): """ Get the set of non-dominated solutions from the current set of solutions. + If constraint_violations is provided, constraint domination is used instead of regular Pareto domination. Parameters ---------- curr_solutions : TYPE The set of solutions to find its non-dominated set. + constraint_violations : numpy.ndarray, optional + An array where constraint_violations[i] is the total constraint violation for solution i. + A value of 0 means the solution is feasible (satisfies all constraints). + A value > 0 means the solution is infeasible (violates some constraints). + If None, regular Pareto domination is used. Returns ------- @@ -23,58 +29,78 @@ def get_non_dominated_set(self, curr_solutions): A set of the non-dominated set. """ - # List of the members of the current dominated pareto front/set. dominated_set = [] - # List of the non-members of the current dominated pareto front/set. - # The non-dominated set is the pareto front set. non_dominated_set = [] for idx1, sol1 in enumerate(curr_solutions): - # Flag indicates whether the solution is a member of the current dominated set. is_not_dominated = True for idx2, sol2 in enumerate(curr_solutions): if idx1 == idx2: continue - # Zipping the 2 solutions so the corresponding genes are in the same list. - # The returned array is of size (N, 2) where N is the number of genes. + if constraint_violations is not None: + sol1_cv = constraint_violations[sol1[0]] + sol2_cv = constraint_violations[sol2[0]] + + sol1_feasible = sol1_cv <= 0.0 + sol2_feasible = sol2_cv <= 0.0 + + if sol1_feasible and not sol2_feasible: + continue + elif not sol1_feasible and sol2_feasible: + is_not_dominated = False + dominated_set.append(sol1) + break + elif not sol1_feasible and not sol2_feasible: + if sol2_cv < sol1_cv: + is_not_dominated = False + dominated_set.append(sol1) + break + else: + continue + two_solutions = numpy.array(list(zip(sol1[1], sol2[1]))) - # Use < for minimization problems and > for maximization problems. - # Checking if any solution dominates the current solution by applying the 2 conditions. - # gr_eq (greater than or equal): All elements must be True. - # gr (greater than): Only 1 element must be True. gr_eq = two_solutions[:, 1] >= two_solutions[:, 0] gr = two_solutions[:, 1] > two_solutions[:, 0] - # If the 2 conditions hold, then a solution (sol2) dominates the current solution (sol1). - # The current solution (sol1) is not considered a member of the non-dominated set. if gr_eq.all() and gr.any(): - # Set the is_not_dominated flag to False because another solution dominates the current solution (sol1) is_not_dominated = False - # DO NOT insert the current solution in the current non-dominated set. - # Instead, insert it into the dominated set. dominated_set.append(sol1) break else: - # Reaching here means the solution does not dominate the current solution. pass - # If the flag is True, then no solution dominates the current solution. - # Insert the current solution (sol1) into the non-dominated set. if is_not_dominated: non_dominated_set.append(sol1) - # Return the dominated and non-dominated sets. return dominated_set, non_dominated_set - def non_dominated_sorting(self, fitness): + def _get_constraint_violations(self): + """ + Helper method to get constraint violations if constraint_func is defined. + + Returns + ------- + constraint_violations : numpy.ndarray or None + Total constraint violations for each solution, or None if no constraint_func. + """ + if hasattr(self, 'constraint_func') and self.constraint_func is not None: + if hasattr(self, 'population') and self.population is not None: + return self.calculate_constraint_violations(self.constraint_func, self.population)[0] + return None + + def non_dominated_sorting(self, fitness, constraint_violations=None): """ Apply non-dominant sorting over the fitness to create the pareto fronts based on non-dominated sorting of the solutions. + If constraint_violations is provided or constraint_func is defined, constraint domination is used. Parameters ---------- fitness : TYPE An array of the population fitness across all objective function. + constraint_violations : numpy.ndarray, optional + An array where constraint_violations[i] is the total constraint violation for solution i. + If None and self.constraint_func is defined, constraint violations will be calculated automatically. Returns ------- @@ -83,7 +109,6 @@ def non_dominated_sorting(self, fitness): """ - # Verify that the problem is multi-objective optimization as non-dominated sorting is only applied to multi-objective problems. if type(fitness[0]) in [list, tuple, numpy.ndarray]: pass elif type(fitness[0]) in self.supported_int_float_types: @@ -91,32 +116,24 @@ def non_dominated_sorting(self, fitness): else: raise TypeError(f'Non-dominated sorting is only applied when optimizing multi-objective problems. \n\nTo use multi-objective optimization, consider returning an iterable of any of these data types:\n1)list\n2)tuple\n3)numpy.ndarray\n\nBut the data type {type(fitness[0])} found.') - # A list of all non-dominated sets. + if constraint_violations is None: + constraint_violations = self._get_constraint_violations() + pareto_fronts = [] - # The remaining set to be explored for non-dominance. - # Initially it is set to the entire population. - # The solutions of each non-dominated set are removed after each iteration. remaining_set = fitness.copy() - # Zipping the solution index with the solution's fitness. - # This helps to easily identify the index of each solution. - # Each element has: - # 1) The index of the solution. - # 2) An array of the fitness values of this solution across all objectives. remaining_set = list(zip(range(0, fitness.shape[0]), remaining_set)) - # A list mapping the index of each pareto front to the set of solutions in this front. solutions_fronts_indices = [-1]*len(remaining_set) solutions_fronts_indices = numpy.array(solutions_fronts_indices) - # Index of the current pareto front. front_index = -1 while len(remaining_set) > 0: front_index += 1 - # Get the current non-dominated set of solutions. - remaining_set, pareto_front = self.get_non_dominated_set(curr_solutions=remaining_set) + remaining_set, pareto_front = self.get_non_dominated_set(curr_solutions=remaining_set, + constraint_violations=constraint_violations) pareto_front = numpy.array(pareto_front, dtype=object) pareto_fronts.append(pareto_front) @@ -148,61 +165,37 @@ def crowding_distance(self, pareto_front, fitness): The indices of the solutions (relative to the population) sorted by the crowding distance. """ - # Each solution in the pareto front has 2 elements: - # 1) The index of the solution in the population. - # 2) A list of the fitness values for all objectives of the solution. - # Before proceeding, remove the indices from each solution in the pareto front. pareto_front_no_indices = numpy.array([pareto_front[:, 1][idx] for idx in range(pareto_front.shape[0])]) - # If there is only 1 solution, then return empty arrays for the crowding distance. if pareto_front_no_indices.shape[0] == 1: - # There is only 1 index. return numpy.array([]), numpy.array([]), numpy.array([0]), pareto_front[:, 0].astype(int) - # An empty list holding info about the objectives of each solution. The info includes the objective value and crowding distance. obj_crowding_dist_list = [] - # Loop through the objectives to calculate the crowding distance of each solution across all objectives. for obj_idx in range(pareto_front_no_indices.shape[1]): obj = pareto_front_no_indices[:, obj_idx] - # This variable has a nested list where each child list zip the following together: - # 1) The index of the objective value. - # 2) The objective value. - # 3) Initialize the crowding distance by zero. obj = list(zip(range(len(obj)), obj, [0]*len(obj))) obj = [list(element) for element in obj] - # This variable is the sorted version where sorting is done by the objective value (second element). - # Note that the first element is still the original objective index before sorting. obj_sorted = sorted(obj, key=lambda x: x[1]) - # Get the minimum and maximum values for the current objective. obj_min_val = min(fitness[:, obj_idx]) obj_max_val = max(fitness[:, obj_idx]) denominator = obj_max_val - obj_min_val - # To avoid division by zero, set the denominator to a tiny value. if denominator == 0: denominator = 0.0000001 - # Set the crowding distance to the first and last solutions (after being sorted) to infinity. inf_val = float('inf') - # crowding_distance[0] = inf_val obj_sorted[0][2] = inf_val - # crowding_distance[-1] = inf_val obj_sorted[-1][2] = inf_val - # If there are only 2 solutions in the current pareto front, then do not proceed. - # The crowding distance for such 2 solutions is infinity. if len(obj_sorted) <= 2: obj_crowding_dist_list.append(obj_sorted) break for idx in range(1, len(obj_sorted)-1): - # Calculate the crowding distance. crowding_dist = obj_sorted[idx+1][1] - obj_sorted[idx-1][1] crowding_dist = crowding_dist / denominator - # Insert the crowding distance back into the list to override the initial zero. obj_sorted[idx][2] = crowding_dist - # Sort the objective by the original index at index 0 of each child list. obj_sorted = sorted(obj_sorted, key=lambda x: x[0]) obj_crowding_dist_list.append(obj_sorted) @@ -210,18 +203,11 @@ def crowding_distance(self, pareto_front, fitness): crowding_dist = numpy.array([obj_crowding_dist_list[idx, :, 2] for idx in range(len(obj_crowding_dist_list))]) crowding_dist_sum = numpy.sum(crowding_dist, axis=0) - # An array of the sum of crowding distances across all objectives. - # Each row has 2 elements: - # 1) The index of the solution. - # 2) The sum of all crowding distances for all objective of the solution. crowding_dist_sum = numpy.array(list(zip(obj_crowding_dist_list[0, :, 0], crowding_dist_sum))) crowding_dist_sum = sorted(crowding_dist_sum, key=lambda x: x[1], reverse=True) - # The sorted solutions' indices by the crowding distance. crowding_dist_front_sorted_indices = numpy.array(crowding_dist_sum)[:, 0] crowding_dist_front_sorted_indices = crowding_dist_front_sorted_indices.astype(int) - # Note that such indices are relative to the front, NOT the population. - # It is mandatory to map such front indices to population indices before using them to refer to the population. crowding_dist_pop_sorted_indices = pareto_front[:, 0] crowding_dist_pop_sorted_indices = crowding_dist_pop_sorted_indices[crowding_dist_front_sorted_indices] crowding_dist_pop_sorted_indices = crowding_dist_pop_sorted_indices.astype(int) @@ -230,7 +216,8 @@ def crowding_distance(self, pareto_front, fitness): def sort_solutions_nsga2(self, fitness, - find_best_solution=False): + find_best_solution=False, + constraint_violations=None): """ Sort the solutions based on the fitness. The sorting procedure differs based on whether the problem is single-objective or multi-objective optimization. @@ -243,6 +230,9 @@ def sort_solutions_nsga2(self, ---------- fitness: The fitness of the entire population. find_best_solution: Whether the method is called only to find the best solution or as part of the PyGAD lifecycle. This is to decide whether the pareto_fronts instance attribute is edited or not. + constraint_violations : numpy.ndarray, optional + An array where constraint_violations[i] is the total constraint violation for solution i. + If None and self.constraint_func is defined, constraint violations will be calculated automatically. Returns ------- @@ -251,30 +241,66 @@ def sort_solutions_nsga2(self, """ if type(fitness[0]) in [list, tuple, numpy.ndarray]: - # Multi-objective optimization problem. + if constraint_violations is None: + constraint_violations = self._get_constraint_violations() + solutions_sorted = [] - # Split the solutions into pareto fronts using non-dominated sorting. - pareto_fronts, solutions_fronts_indices = self.non_dominated_sorting(fitness) + pareto_fronts, solutions_fronts_indices = self.non_dominated_sorting(fitness, + constraint_violations=constraint_violations) if find_best_solution: - # Do not edit the pareto_fronts instance attribute when just getting the best solution. pass else: - # The method is called within the regular GA lifecycle. - # We have to edit the pareto_fronts to be assigned the latest pareto front. self.pareto_fronts = pareto_fronts.copy() for pareto_front in pareto_fronts: - # Sort the solutions in the front using crowded distance. _, _, _, crowding_dist_pop_sorted_indices = self.crowding_distance(pareto_front=pareto_front.copy(), fitness=fitness) crowding_dist_pop_sorted_indices = list(crowding_dist_pop_sorted_indices) - # Append the sorted solutions into the list. solutions_sorted.extend(crowding_dist_pop_sorted_indices) elif type(fitness[0]) in pygad.GA.supported_int_float_types: - # Single-objective optimization problem. solutions_sorted = sorted(range(len(fitness)), key=lambda k: fitness[k]) - # Reverse the sorted solutions so that the best solution comes first. solutions_sorted.reverse() else: raise TypeError(f'Each element in the fitness array must be of a number of an iterable (list, tuple, numpy.ndarray). But the type {type(fitness[0])} found') return solutions_sorted + + def calculate_constraint_violations(self, constraint_func, solutions): + """ + Calculate the constraint violations for each solution. + + Parameters + ---------- + constraint_func : callable + A function that takes a solution and returns a list/array of constraint violations. + Each violation value should be: + - <= 0 if the constraint is satisfied + - > 0 if the constraint is violated + The function signature is: constraint_func(solution, solution_idx) + solutions : numpy.ndarray + The population of solutions. + + Returns + ------- + total_violations : numpy.ndarray + An array of total constraint violation for each solution (sum of all violations). + A value of 0 means feasible, > 0 means infeasible. + per_constraint_violations : numpy.ndarray + A 2D array of constraint violations for each constraint of each solution. + """ + num_solutions = solutions.shape[0] + per_constraint_violations = [] + + for idx, sol in enumerate(solutions): + violations = constraint_func(sol, idx) + if violations is None: + violations = [] + if not isinstance(violations, (list, tuple, numpy.ndarray)): + violations = [violations] + per_constraint_violations.append(violations) + + per_constraint_violations = numpy.array(per_constraint_violations) + + positive_violations = numpy.maximum(per_constraint_violations, 0.0) + total_violations = numpy.sum(positive_violations, axis=1) + + return total_violations, per_constraint_violations diff --git a/pygad/utils/validation.py b/pygad/utils/validation.py index 6c61bea..d9936cc 100644 --- a/pygad/utils/validation.py +++ b/pygad/utils/validation.py @@ -46,7 +46,8 @@ def validate_parameters(self, stop_criteria, parallel_processing, random_seed, - logger): + logger, + constraint_func=None): # If no logger is passed, then create a logger that logs only the messages to the console. if logger is None: @@ -1317,6 +1318,35 @@ def validate_parameters(self, self.valid_parameters = False raise ValueError(f"Unexpected value ({parallel_processing}) of type ({type(parallel_processing)}) assigned to the 'parallel_processing' parameter. The accepted values for this parameter are:\n1) None: (Default) It means no parallel processing is used.\n2) A positive integer referring to the number of threads to be used (i.e. threads, not processes, are used.\n3) list/tuple: If a list or a tuple of exactly 2 elements is assigned, then:\n\t*1) The first element can be either 'process' or 'thread' to specify whether processes or threads are used, respectively.\n\t*2) The second element can be:\n\t\t**1) A positive integer to select the maximum number of processes or threads to be used.\n\t\t**2) 0 to indicate that parallel processing is not used. This is identical to setting 'parallel_processing=None'.\n\t\t**3) None to use the default value as calculated by the concurrent.futures module.") + # Validate the constraint_func parameter for constrained multi-objective optimization. + if constraint_func is None: + self.constraint_func = None + elif inspect.ismethod(constraint_func): + if len(inspect.signature(constraint_func).parameters) == 2: + self.constraint_func = constraint_func + else: + self.valid_parameters = False + raise ValueError(f"The constraint_func method must accept 2 parameters:\n1) The solution/chromosome to check constraints for.\n2) The index of the solution within the population.\n\nThe passed constraint_func method named '{constraint_func.__code__.co_name}' accepts {len(inspect.signature(constraint_func).parameters)} parameter(s).") + elif inspect.isfunction(constraint_func): + if len(inspect.signature(constraint_func).parameters) == 2: + self.constraint_func = constraint_func + else: + self.valid_parameters = False + raise ValueError(f"The constraint_func function must accept 2 parameters:\n1) The solution/chromosome to check constraints for.\n2) The index of the solution within the population.\n\nThe passed constraint_func function named '{constraint_func.__code__.co_name}' accepts {len(inspect.signature(constraint_func).parameters)} parameter(s).") + elif callable(constraint_func) and not inspect.isclass(constraint_func): + if hasattr(constraint_func, '__call__'): + if len(inspect.signature(constraint_func).parameters) == 2: + self.constraint_func = constraint_func + else: + self.valid_parameters = False + raise ValueError(f"When 'constraint_func' is assigned a class instance, then its __call__ method must accept 2 parameters:\n1) The solution/chromosome to check constraints for.\n2) The index of the solution within the population.\n\nThe passed instance of the class named '{constraint_func.__class__.__name__}' accepts {len(inspect.signature(constraint_func).parameters)} parameter(s).") + else: + self.valid_parameters = False + raise ValueError("When 'constraint_func' is assigned a class instance, then its __call__ method must be implemented and accept 2 parameters.") + else: + self.valid_parameters = False + raise TypeError(f"The value assigned to the constraint_func parameter is expected to be None, a function, a method, or a callable class instance but {type(constraint_func)} found.") + # Set the `run_completed` property to False. It is set to `True` only after the `run()` method is complete. self.run_completed = False diff --git a/tests/test_constrained_nsga2.py b/tests/test_constrained_nsga2.py new file mode 100644 index 0000000..9364cec --- /dev/null +++ b/tests/test_constrained_nsga2.py @@ -0,0 +1,273 @@ +import pygad +import numpy + +def test_constraint_domination_principle(): + """ + Test the constraint domination principle for NSGA-II. + + Constraint Domination Principle: + 1. A feasible solution (no constraint violations) always dominates an infeasible solution. + 2. Between two feasible solutions, regular Pareto domination applies. + 3. Between two infeasible solutions, the one with smaller constraint violation dominates. + """ + + function_inputs1 = [4, -2, 3.5, 5, -11, -4.7] + function_inputs2 = [-2, 0.7, -9, 1.4, 3, 5] + desired_output1 = 50 + desired_output2 = 30 + + def fitness_func(ga_instance, solution, solution_idx): + output1 = numpy.sum(solution * function_inputs1) + output2 = numpy.sum(solution * function_inputs2) + fitness1 = 1.0 / (numpy.abs(output1 - desired_output1) + 0.000001) + fitness2 = 1.0 / (numpy.abs(output2 - desired_output2) + 0.000001) + return [fitness1, fitness2] + + def constraint_func(solution, solution_idx): + """ + Example constraint: sum of all genes should be <= 0. + Returns: + - <= 0 if constraint is satisfied + - > 0 if constraint is violated + """ + sum_genes = numpy.sum(solution) + return [sum_genes] + + num_generations = 20 + num_parents_mating = 4 + sol_per_pop = 10 + num_genes = len(function_inputs1) + + ga_instance = pygad.GA(num_generations=num_generations, + num_parents_mating=num_parents_mating, + sol_per_pop=sol_per_pop, + num_genes=num_genes, + fitness_func=fitness_func, + parent_selection_type='nsga2', + constraint_func=constraint_func, + random_seed=42) + + initial_population = ga_instance.population.copy() + + print("Testing constraint function validation...") + assert ga_instance.constraint_func is not None, "constraint_func should be set" + print("✓ constraint_func is properly set") + + ga_instance.run() + + print("\nChecking constraint domination behavior...") + constraint_violations = [] + for idx, sol in enumerate(ga_instance.population): + violations = constraint_func(sol, idx) + total_violation = sum(max(0, v) for v in violations) + constraint_violations.append(total_violation) + print(f" Solution {idx}: sum={numpy.sum(sol):.4f}, violation={total_violation:.4f}") + + constraint_violations = numpy.array(constraint_violations) + feasible_count = numpy.sum(constraint_violations <= 0) + infeasible_count = numpy.sum(constraint_violations > 0) + + print(f"\nFeasible solutions: {feasible_count}") + print(f"Infeasible solutions: {infeasible_count}") + + if feasible_count > 0: + print("\n✓ Found feasible solutions") + else: + print("\n⚠ No feasible solutions found (may need more generations)") + + print("\n" + "="*60) + print("Test completed successfully!") + print("="*60) + + return ga_instance + + +def test_feasible_vs_infeasible_domination(): + """ + Test that a feasible solution always dominates an infeasible solution. + """ + print("\n" + "="*60) + print("Testing: Feasible vs Infeasible Domination") + print("="*60) + + nsga2 = pygad.utils.nsga2.NSGA2() + nsga2.supported_int_float_types = [int, float, numpy.float64, numpy.int64] + + fitness = numpy.array([ + [1.0, 1.0], + [0.5, 0.5], + ]) + + constraint_violations = numpy.array([ + 0.0, + 5.0, + ]) + + pareto_fronts, solutions_fronts_indices = nsga2.non_dominated_sorting( + fitness, + constraint_violations=constraint_violations + ) + + print(f"\nFitness values:") + print(f" Solution 0: {fitness[0]} (feasible, violation={constraint_violations[0]})") + print(f" Solution 1: {fitness[1]} (infeasible, violation={constraint_violations[1]})") + + print(f"\nPareto fronts:") + for i, front in enumerate(pareto_fronts): + print(f" Front {i}: {[int(sol[0]) for sol in front]}") + + assert solutions_fronts_indices[0] < solutions_fronts_indices[1], \ + "Feasible solution (0) should be in a better (lower index) front than infeasible solution (1)" + + print("\n✓ Feasible solution correctly dominates infeasible solution") + return True + + +def test_infeasible_vs_infeasible_domination(): + """ + Test that between two infeasible solutions, the one with smaller + constraint violation dominates. + """ + print("\n" + "="*60) + print("Testing: Infeasible vs Infeasible Domination") + print("="*60) + + nsga2 = pygad.utils.nsga2.NSGA2() + nsga2.supported_int_float_types = [int, float, numpy.float64, numpy.int64] + + fitness = numpy.array([ + [0.1, 0.1], + [1.0, 1.0], + ]) + + constraint_violations = numpy.array([ + 1.0, + 5.0, + ]) + + pareto_fronts, solutions_fronts_indices = nsga2.non_dominated_sorting( + fitness, + constraint_violations=constraint_violations + ) + + print(f"\nFitness values:") + print(f" Solution 0: {fitness[0]} (violation={constraint_violations[0]})") + print(f" Solution 1: {fitness[1]} (violation={constraint_violations[1]})") + + print(f"\nPareto fronts:") + for i, front in enumerate(pareto_fronts): + print(f" Front {i}: {[int(sol[0]) for sol in front]}") + + assert solutions_fronts_indices[0] < solutions_fronts_indices[1], \ + "Solution with smaller violation (0) should be in a better front" + + print("\n✓ Solution with smaller constraint violation correctly dominates") + return True + + +def test_feasible_pareto_domination(): + """ + Test that between two feasible solutions, regular Pareto domination applies. + """ + print("\n" + "="*60) + print("Testing: Feasible Pareto Domination (Regular Pareto)") + print("="*60) + + nsga2 = pygad.utils.nsga2.NSGA2() + nsga2.supported_int_float_types = [int, float, numpy.float64, numpy.int64] + + fitness = numpy.array([ + [2.0, 2.0], + [1.0, 1.0], + [1.5, 0.5], + ]) + + constraint_violations = numpy.array([ + 0.0, + 0.0, + 0.0, + ]) + + pareto_fronts, solutions_fronts_indices = nsga2.non_dominated_sorting( + fitness, + constraint_violations=constraint_violations + ) + + print(f"\nFitness values (all feasible):") + for i in range(len(fitness)): + print(f" Solution {i}: {fitness[i]}") + + print(f"\nPareto fronts:") + for i, front in enumerate(pareto_fronts): + print(f" Front {i}: {[int(sol[0]) for sol in front]}") + + assert 0 in [int(sol[0]) for sol in pareto_fronts[0]], \ + "Solution 0 [2.0, 2.0] should be in the first Pareto front" + + print("\n✓ Regular Pareto domination works correctly for feasible solutions") + return True + + +def test_backward_compatibility(): + """ + Test that NSGA-II works without constraint_func (backward compatibility). + """ + print("\n" + "="*60) + print("Testing: Backward Compatibility (No Constraint)") + print("="*60) + + function_inputs1 = [4, -2, 3.5, 5, -11, -4.7] + function_inputs2 = [-2, 0.7, -9, 1.4, 3, 5] + desired_output1 = 50 + desired_output2 = 30 + + def fitness_func(ga_instance, solution, solution_idx): + output1 = numpy.sum(solution * function_inputs1) + output2 = numpy.sum(solution * function_inputs2) + fitness1 = 1.0 / (numpy.abs(output1 - desired_output1) + 0.000001) + fitness2 = 1.0 / (numpy.abs(output2 - desired_output2) + 0.000001) + return [fitness1, fitness2] + + num_generations = 10 + num_parents_mating = 4 + sol_per_pop = 8 + num_genes = len(function_inputs1) + + ga_instance = pygad.GA(num_generations=num_generations, + num_parents_mating=num_parents_mating, + sol_per_pop=sol_per_pop, + num_genes=num_genes, + fitness_func=fitness_func, + parent_selection_type='nsga2', + random_seed=42) + + ga_instance.run() + + assert ga_instance.pareto_fronts is not None, "Pareto fronts should be computed" + + print(f"\nNumber of Pareto fronts: {len(ga_instance.pareto_fronts)}") + for i, front in enumerate(ga_instance.pareto_fronts): + print(f" Front {i}: {len(front)} solutions") + + print("\n✓ Backward compatibility test passed") + return True + + +if __name__ == "__main__": + print("="*60) + print("Constrained NSGA-II Test Suite") + print("="*60) + + test_backward_compatibility() + test_feasible_vs_infeasible_domination() + test_infeasible_vs_infeasible_domination() + test_feasible_pareto_domination() + + print("\n" + "="*60) + print("Running full GA test with constraints...") + print("="*60) + test_constraint_domination_principle() + + print("\n" + "="*60) + print("All tests completed!") + print("="*60)