# Assignement with allowed groups¶

## Problem description¶

In this example, we want to assign T tasks to W workers in order to minimize the total cost of the tasks. A worker cannot be assigned more than one task. Workers are divided into two teams and each team has a maximum number of tasks that can be assigned to it.

## Immplemntation with a GAC constraint¶

This is a non-convex problem and the constraint ‘only some pair of workers are allowed to work together’ can be difficult to implement. In the first version, the
Kalis class `KGeneralizedArcConsistencyConstraint`

is used to create a `ValidTeamPattern`

class by overloading the `testIfSatisfied`

method to make it
check wether the worker assigned to some tasks are actually allowed.

This implementation can be done as follows:

class ValidTeamPattern(KGeneralizedArcConsistencyConstraint): """This class ineherits from the KACBinConstraint and allow to define a custom binary arc-consistency constraint.""" def __init__(self, vars, valid_combination): """Initialize the StartingTimeCombinationGAC constraint.""" KGeneralizedArcConsistencyConstraint.__init__(self, vars, KGeneralizedArcConsistencyConstraint.GENERALIZED_ARC_CONSISTENCY, "StartingTimeCombinationGAC") self.vars = vars self.valid_combination = valid_combination def testIfSatisfied(self, values): """Overload the 'testIfSatisfied' method in 'KGeneralizedArcConsistencyConstraint' class by using a user defined function. """ return list(values) in self.valid_combination def allowed_pattern(allowed_groups_team: list, teams_worker: list) -> list: """ Creation of an array summing up the allowed groups from a team. The output has the following format : [[0, 1, 0, 1], [1, 0, 0, 1]] In this example, the first line allows the second and fourth workers to work together and the second line allows the first and fourth workers to work together. """ res = [] for group in allowed_groups_team: combination = [] for worker in teams_worker: combination += [1] if worker in group else [0] res += [combination] return res def solve(cost_matrix: list, allowed_groups: list): # Get the number of tasks and workers from the cost matrix nb_worker = len(cost_matrix) nb_task = len(cost_matrix[1]) # Get the number of groups from the allowed_groups array nb_teams = len(allowed_groups) # Get the composition of the teams from the allowed_groups array teams_worker = [] for team in range(nb_teams): teams_worker += [[]] for pair in allowed_groups[team]: for worker in pair: if worker not in teams_worker[team]: teams_worker[team] += [worker] try: # Declaration of the problem session = KSession() problem = KProblem(session, "allowed_groups") # Variable declaration # Declaration of a binary variable for each task/worker combination, stating whether the task has been assigned # to the worker worker_has_task = {} for worker in range(nb_worker): worker_has_task[worker] = KIntVarArray() for task in range(nb_task): worker_has_task[worker] += KIntVar(problem, "Worker_{}_Task_{}".format(worker, task), 0, 1) # Constraint declaration # Ensure that each task is assigned once for task in range(nb_task): problem.post(sum(worker_has_task[worker][task] for worker in range(nb_worker)) == 1) # Ensure that the selected workers from each team are allowed to work together # First a variable counting the number of tasks allocated to each worker is created. Those variables are binary, # thus ensuring that every worker has one tasks at most nb_task_worker = KIntVarArray() for worker in range(nb_worker): nb_task_worker += KIntVar(problem, "Nb_task_worker_{}".format(worker), 0, 1) problem.post(nb_task_worker[worker] == sum(worker_has_task[worker][task] for task in range(nb_task))) # Post the allowed group constraint patterns = {} allowed = {} for team in range(nb_teams): patterns[team] = KIntVarArray() for worker in teams_worker[team]: patterns[team] += nb_task_worker[worker] allowed_pattern_array = allowed_pattern(allowed_groups[team], teams_worker[team]) allowed[team] = ValidTeamPattern(patterns[team], allowed_pattern_array) problem.post(allowed[team]) # Objective declaration # The objective will be greater than nb_task * the minimum of cost_matrix and lower than the sum of the # coefficients in cost_matrix objective_sup = sum(cost_matrix[worker][task] for task in range(nb_task) for worker in range(nb_worker)) objective_inf = nb_task * min( cost_matrix[worker][task] for task in range(nb_task) for worker in range(nb_worker)) # The objective is the sum of the costs objective = KIntVar(problem, "cost", objective_inf, objective_sup) problem.post(objective == sum(cost_matrix[worker][task] * worker_has_task[worker][task] for task in range(nb_task) for worker in range(nb_worker))) # Set the objective problem.setSense(KProblem.Minimize) problem.setObjective(objective) # Set the solver solver = KSolver(problem) # If the problem is infeasible if problem.propagate(): # Display of various details to help find the source of the problem problem.display() problem.printMinimalConflictSet() print("Problem is infeasible") exit() # Launch the optimization result = solver.optimize() if result: solution = problem.getSolution() # Display the solution for task in range(nb_task): for worker in range(nb_worker): if solution.getValue(worker_has_task[worker][task]) == 1: print( f'Task {task} has been affected to worker {worker} 'f'with cost {cost_matrix[worker][task]}') print(f'Total cost : {solution.getValue(objective)}') # Return the objective problem.getSolution().display() return solution.getValue(objective) except ArtelysException as e: print(e) if __name__ == '__main__': COST = [[90, 76, 75, 70, 50, 74], [35, 85, 55, 65, 48, 101], [125, 95, 90, 105, 59, 120], [45, 110, 95, 115, 104, 83], [60, 105, 80, 75, 59, 62], [45, 65, 110, 95, 47, 31], [38, 51, 107, 41, 69, 99], [47, 85, 57, 71, 92, 77], [39, 63, 97, 49, 118, 56], [47, 101, 71, 60, 88, 109], [17, 39, 103, 64, 61, 92], [101, 45, 83, 59, 92, 27]] ALLOWED_GROUPS = [[[2, 3], [1, 3], [1, 2], [0, 1], [0, 2]], [[6, 7], [5, 7], [5, 6], [4, 5], [4, 7]], [[10, 11], [9, 11], [9, 10], [8, 10], [8, 11]]] solve(COST, ALLOWED_GROUPS)

## Immplemntation with a GAC table constraint¶

The GAC method implemented above shows how any constraint can be implemented simply by overloading the `testIfSatisfied`

method. However,
the `KGeneralizedArcConsistencyTableConstraint`

class already implemented in Kalis is precisely made to ensure that the value taken by a set
of variable is equal to one of the sets of variables present in a tuple.

A way to implement this method is as follows :

def allowed_pattern(allowed_groups_team: list, teams_worker: list) -> KTupleArray: """ Creation of an array summing up the allowed groups from a team. The output has the following format : [[0, 1, 0, 1], [1, 0, 0, 1]] In this example, the first line allows the second and fourth workers to work together and the second line allows the first and fourth workers to work together. """ res = KTupleArray() for group in allowed_groups_team: combination = KIntArray() for worker in teams_worker: combination += 1 if worker in group else 0 res += combination res.display() return res def solve(cost_matrix: list, allowed_groups: list): # Get the number of tasks and workers from the cost matrix nb_worker = len(cost_matrix) nb_task = len(cost_matrix[1]) # Get the number of groups from the allowed_groups array nb_teams = len(allowed_groups) # Get the composition of the teams from the allowed_groups array teams_worker = [] for team in range(nb_teams): teams_worker += [[]] for pair in allowed_groups[team]: for worker in pair: if worker not in teams_worker[team]: teams_worker[team] += [worker] try: # Declaration of the problem session = KSession() problem = KProblem(session, "allowed_groups") # Variable declaration # Declaration of a binary variable for each task/worker combination, stating wether the task has been assigned # to the worker worker_has_task = {} for worker in range(nb_worker): worker_has_task[worker] = KIntVarArray() for task in range(nb_task): worker_has_task[worker] += KIntVar(problem, "Worker_{}_Task_{}".format(worker, task), 0, 1) # Constraint declaration # Ensure that each task is assigned once for task in range(nb_task): problem.post(sum(worker_has_task[worker][task] for worker in range(nb_worker)) == 1) # Ensure that the selected workers from each team are allowed to work together # First a variable counting the number of tasks allocated to each worker is created. Those variables are binary, # thus ensuring that every worker has one tasks at most nb_task_worker = KIntVarArray() for worker in range(nb_worker): nb_task_worker += KIntVar(problem, "Nb_task_worker_{}".format(worker), 0, 1) problem.post(nb_task_worker[worker] == sum(worker_has_task[worker][task] for task in range(nb_task))) # Post the allowed group constraint patterns = {} allowed = {} for team in range(nb_teams): patterns[team] = KIntVarArray() for worker in teams_worker[team]: patterns[team] += nb_task_worker[worker] allowed_pattern_array = allowed_pattern(allowed_groups[team], teams_worker[team]) allowed[team] = KGeneralizedArcConsistencyTableConstraint(patterns[team], allowed_pattern_array, "Pair_team_{}".format(team)) problem.post(allowed[team]) # Objective declaration # The objective will be greater than nb_task * the minimum of cost_matrix and lower than the sum of the # coefficients in cost_matrix objective_sup = sum(cost_matrix[worker][task] for task in range(nb_task) for worker in range(nb_worker)) objective_inf = nb_task * min( cost_matrix[worker][task] for task in range(nb_task) for worker in range(nb_worker)) # The objective is the sum of the costs objective = KIntVar(problem, "cost", objective_inf, objective_sup) problem.post(objective == sum(cost_matrix[worker][task] * worker_has_task[worker][task] for task in range(nb_task) for worker in range(nb_worker))) # Set the objective problem.setSense(KProblem.Minimize) problem.setObjective(objective) # Set the solver solver = KSolver(problem) # If the problem is infeasible if problem.propagate(): # Display of various details to help find the source of the problem problem.display() problem.printMinimalConflictSet() print("Problem is infeasible") exit() # Launch the optimization result = solver.optimize() print("end result") if result: solution = problem.getSolution() # Display the solution for task in range(nb_task): for worker in range(nb_worker): if solution.getValue(worker_has_task[worker][task]) == 1: print( f'Task {task} has been affected to worker {worker} 'f'with cost {cost_matrix[worker][task]}') print(f'Total cost : {solution.getValue(objective)}') # Return the objective problem.getSolution().display() return solution.getValue(objective) except ArtelysException as e: print(e) if __name__ == '__main__': COST = [[90, 76, 75, 70, 50, 74], [35, 85, 55, 65, 48, 101], [125, 95, 90, 105, 59, 120], [45, 110, 95, 115, 104, 83], [60, 105, 80, 75, 59, 62], [45, 65, 110, 95, 47, 31], [38, 51, 107, 41, 69, 99], [47, 85, 57, 71, 92, 77], [39, 63, 97, 49, 118, 56], [47, 101, 71, 60, 88, 109], [17, 39, 103, 64, 61, 92], [101, 45, 83, 59, 92, 27]] ALLOWED_GROUPS = [[[2, 3], [1, 3], [1, 2], [0, 1], [0, 2]], [[6, 7], [5, 7], [5, 6], [4, 5], [4, 7]], [[10, 11], [9, 11], [9, 10], [8, 10], [8, 11]]] solve(COST, ALLOWED_GROUPS)