Source code for l2l.optimizers.simulatedannealing.optimizer

import logging
from collections import namedtuple

import numpy as np
from enum import Enum

from l2l import dict_to_list
from l2l import list_to_dict
from l2l.optimizers.optimizer import Optimizer

logger = logging.getLogger("optimizers.simulatedannealing")

SimulatedAnnealingParameters = namedtuple('SimulatedAnnealingParameters',
                                          ['n_parallel_runs', 'noisy_step', 'temp_decay', 'n_iteration', 'stop_criterion', 'seed', 'cooling_schedule'])
SimulatedAnnealingParameters.__doc__ = """
:param n_parallel_runs: Number of individuals per simulation / Number of parallel Simulated Annealing runs
:param noisy_step: Size of the random step
:param temp_decay: A function of the form f(t) = temperature at time t
:param n_iteration: number of iteration to perform
:param stop_criterion: Stop if change in fitness is below this value
:param seed: Random seed
:param cooling_schedule: Which of the available schedules to use

"""

AvailableCoolingSchedules = Enum('Schedule', 'DEFAULT LOGARITHMIC EXPONENTIAL LINEAR_MULTIPLICATIVE QUADRATIC_MULTIPLICATIVE LINEAR_ADDAPTIVE QUADRATIC_ADDAPTIVE EXPONENTIAL_ADDAPTIVE TRIGONOMETRIC_ADDAPTIVE')

"""
Multiplicative Monotonic Cooling
This schedule type multiplies the starting temperature by a factor that 
decreases over time (number k of the performed iteration steps). It requires a 
decay parameter (alpha) but not an ending temperature, as the prgression of the 
temperature is well definded by the decay parameter only. The Multiplicative 
Monotonic Cooling schedules are: Exponential multiplicative cooling, 
Logarithmical multiplicative cooling, Linear multiplicative cooling and 
Quadratic multiplicative cooling.
Source: Kirkpatrick, Gelatt and Vecchi (1983)

- Exponential multiplicative cooling
Default cooling schedule for typical applications of simulated annealing. Each 
step, the temperature T_k is multiplied by the factor alpha (which has to be 
between 0 and 1) or in other words it is the starting temperature T_0 
multiplied by the factor alpha by the power of k: T_k = T_0 * alpha^k

- Logarithmical multiplicative cooling
The factor by which the temperature decreases, is indirectly proportional to 
the log of k.  Therefore it slows down the cooling, the further progressed 
the schedule is. Alpha has to be largert than one. 
T_k = T_0 / ( 1 + alpha* log (1 + k) )

- Linear multiplicative cooling
Behaves similar to Logarithmical multiplicative cooling in that the decrease 
gets lower over time, but not as pronounced. The decrease is indirectly 
proportional to alpha times k and alpha has to be larger than zero:
T_k = T_0 / ( 1 + alpha*k)

- Quadratic multiplicative cooling 
This schedule stays at high temperatures longer, than the other schedules and 
has a steeper cooling later in the process. Alpha has to be larger than zero.
T_k = T_0 / ( 1 + alpha*k^2)

Additive Monotonic Cooling
The differences to Multiplicative Monotonic Cooling are, that the final 
temperature T_n and the number of iterations n are needed also. So this 
cannot be used as intended, if the stop criterion is something different, 
than a certain number of iteration steps. A decay parameter is not needed. 
Each temperature is computed, by adding a term to the final temperature. The 
Additive Monotonic Cooling schedules are: Linear additive cooling, Quadratic 
additive cooling, Exponential additive cooling and Trigonometric additive 
cooling.
Source. Additive monotonic cooling B. T. Luke (2005) 

- Linear additive cooling 
This schedule adds a term to the final temperature, which decreases linearily 
with the progression of the schedule.
T_k = T_n + (T_0 -T_n)*((n-k)/n)

- Quadratic additive cooling 
This schedule adds a term to the final temperature, which decreases q
uadratically with the progression of the schedule.
T_k = T_n + (T_0 -T_n)*((n-k)/n)^2

- Exponential additive
Uses a complicated formula, to come up with a schedule, that has a slow start, 
a steep decrease in temperature in the middle and a slow decrease at the end 
of the process.
T_k = T_n + (T_0 - T_n) * (1/(1+exp( 2*ln(T_0 - T_n)/n * (k- n/2) ) ) )

- Trigonometric additive cooling
This schedule has a similar behavior as Exponential additive, but less pronounced. 
T_k = T_n + (T_0 - T_n)/2 * (1+cos(k*pi/n))

"""


[docs]class SimulatedAnnealingOptimizer(Optimizer): """ Class for a generic simulate annealing solver. In the pseudo code the algorithm does: For n iterations do: 1. Take a step of size noisy step in a random direction 2. If it reduces the cost, keep the solution 3. Otherwise keep with probability exp(- (f_new - f) / T) NOTE: This expects all parameters of the system to be of floating point :param ~l2l.utils.trajectory.Trajectory traj: Use this trajectory to store the parameters of the specific runs. The parameters should be initialized based on the values in `parameters` :param optimizee_create_individual: Function that creates a new individual :param optimizee_fitness_weights: Fitness weights. The fitness returned by the Optimizee is multiplied by these values (one for each element of the fitness vector) :param parameters: Instance of :func:`~collections.namedtuple` :class:`.SimulatedAnnealingParameters` containing the parameters needed by the Optimizer """ def __init__(self, traj, optimizee_create_individual, optimizee_fitness_weights, parameters, optimizee_bounding_func=None): super().__init__(traj, optimizee_create_individual=optimizee_create_individual, optimizee_fitness_weights=optimizee_fitness_weights, parameters=parameters, optimizee_bounding_func=optimizee_bounding_func) self.optimizee_bounding_func = optimizee_bounding_func # The following parameters are recorded traj.f_add_parameter('n_parallel_runs', parameters.n_parallel_runs, comment='Number of parallel simulated annealing runs / Size of Population') traj.f_add_parameter('noisy_step', parameters.noisy_step, comment='Size of the random step') traj.f_add_parameter('temp_decay', parameters.temp_decay, comment='A temperature decay parameter (multiplicative)') traj.f_add_parameter('n_iteration', parameters.n_iteration, comment='Number of iteration to perform') traj.f_add_parameter('stop_criterion', parameters.stop_criterion, comment='Stopping criterion parameter') traj.f_add_parameter('seed', np.uint32(parameters.seed), comment='Seed for RNG') _, self.optimizee_individual_dict_spec = dict_to_list(self.optimizee_create_individual(), get_dict_spec=True) # Note that this array stores individuals as an np.array of floats as opposed to Individual-Dicts # This is because this array is used within the context of the simulated annealing algorithm and # Thus needs to handle the optimizee individuals as vectors self.current_individual_list = [np.array(dict_to_list(self.optimizee_create_individual())) for _ in range(parameters.n_parallel_runs)] self.random_state = np.random.RandomState(parameters.seed) # The following parameters are NOT recorded self.T = 1. # Initialize temperature self.g = 0 # the current generation # Keep track of current fitness value to decide whether we want the next individual to be accepted or not self.current_fitness_value_list = [-np.Inf] * parameters.n_parallel_runs new_individual_list = [ list_to_dict( ind_as_list + self.random_state.normal(0.0, parameters.noisy_step, ind_as_list.size) * traj.noisy_step * self.T, self.optimizee_individual_dict_spec) for ind_as_list in self.current_individual_list ] if optimizee_bounding_func is not None: new_individual_list = [self.optimizee_bounding_func(ind) for ind in new_individual_list] self.eval_pop = new_individual_list self._expand_trajectory(traj) self.cooling_schedule = parameters.cooling_schedule
[docs] def cooling(self,temperature, cooling_schedule, temperature_decay, temperature_end, steps_total): # assumes, that the temperature always starts at 1 T0 = 1 k = self.g + 1 if cooling_schedule == AvailableCoolingSchedules.DEFAULT: return temperature * temperature_decay # Simulated Annealing and Boltzmann Machines: # A Stochastic Approach to Combinatorial Optimization and Neural Computing (1989) elif cooling_schedule == AvailableCoolingSchedules.LOGARITHMIC: return T0 / (1 + np.log(1 + k)) # Kirkpatrick, Gelatt and Vecchi (1983) elif cooling_schedule == AvailableCoolingSchedules.EXPONENTIAL: alpha = 0.85 return T0 * (alpha ** (k)) elif cooling_schedule == AvailableCoolingSchedules.LINEAR_MULTIPLICATIVE: alpha = 1 return T0 / (1 + alpha * k) elif cooling_schedule == AvailableCoolingSchedules.QUADRATIC_MULTIPLICATIVE: alpha = 1 return T0 / (1 + alpha * np.square(k)) # Additive monotonic cooling B. T. Luke (2005) elif cooling_schedule == AvailableCoolingSchedules.LINEAR_ADDAPTIVE: return temperature_end + (T0 - temperature) * ((steps_total - k) / steps_total) elif cooling_schedule == AvailableCoolingSchedules.QUADRATIC_ADDAPTIVE: return temperature_end + (T0 - temperature) * np.square((steps_total - k) / steps_total) elif cooling_schedule == AvailableCoolingSchedules.EXPONENTIAL_ADDAPTIVE: return temperature_end + (T0 - temperature) * (1 / (1 + np.exp((2 * np.log(T0 - temperature_end) / steps_total) * (k - steps_total / 2)))) elif cooling_schedule == AvailableCoolingSchedules.TRIGONOMETRIC_ADDAPTIVE: return temperature_end + (T0 - temperature_end) * (1 + np.cos(k * 3.1415 / steps_total)) / 2 return -1
[docs] def post_process(self, traj, fitnesses_results): """ See :meth:`~l2l.optimizers.optimizer.Optimizer.post_process` """ noisy_step, temp_decay, n_iteration, stop_criterion = \ traj.noisy_step, traj.temp_decay, traj.n_iteration, traj.stop_criterion old_eval_pop = self.eval_pop.copy() self.eval_pop.clear() temperature = self.T temperature_end = 0 self.T = self.cooling(temperature, self.cooling_schedule, temp_decay, temperature_end, n_iteration) logger.info(" Evaluating %i individuals" % len(fitnesses_results)) assert len(fitnesses_results) == traj.n_parallel_runs weighted_fitness_list = [] for i, (run_index, fitness) in enumerate(fitnesses_results): weighted_fitness = sum(f * w for f, w in zip(fitness, self.optimizee_fitness_weights)) weighted_fitness_list.append(weighted_fitness) # We need to convert the current run index into an ind_idx # (index of individual within one generation) traj.v_idx = run_index ind_index = traj.par.ind_idx individual = old_eval_pop[ind_index] # Accept or reject the new solution current_fitness_value_i = self.current_fitness_value_list[i] r = self.random_state.rand() p = np.exp((weighted_fitness - current_fitness_value_i) / self.T) # Accept if r < p or weighted_fitness >= current_fitness_value_i: self.current_fitness_value_list[i] = weighted_fitness self.current_individual_list[i] = np.array(dict_to_list(individual)) traj.f_add_result('$set.$.individual', individual) # Watchout! if weighted fitness is a tuple/np array it should be converted to a list first here traj.f_add_result('$set.$.fitness', weighted_fitness) current_individual = self.current_individual_list[i] new_individual = list_to_dict( current_individual + self.random_state.randn(current_individual.size) * noisy_step * self.T, self.optimizee_individual_dict_spec) if self.optimizee_bounding_func is not None: new_individual = self.optimizee_bounding_func(new_individual) logger.debug("Current best fitness for individual %d is %.2f. New individual is %s", i, self.current_fitness_value_list[i], new_individual) self.eval_pop.append(new_individual) logger.debug("Current best fitness within population is %.2f", max(self.current_fitness_value_list)) traj.v_idx = -1 # set the trajectory back to default logger.info("-- End of generation {} --".format(self.g)) # ------- Create the next generation by crossover and mutation -------- # # not necessary for the last generation if self.g < n_iteration - 1 and stop_criterion > max(self.current_fitness_value_list): fitnesses_results.clear() self.g += 1 # Update generation counter self._expand_trajectory(traj)
[docs] def end(self, traj): """ See :meth:`~l2l.optimizers.optimizer.Optimizer.end` """ # ------------ Finished all runs and print result --------------- # best_last_indiv_index = np.argmax(self.current_fitness_value_list) best_last_indiv = self.current_individual_list[best_last_indiv_index] best_last_fitness = self.current_fitness_value_list[best_last_indiv_index] best_last_indiv_dict = list_to_dict(best_last_indiv.tolist(), self.optimizee_individual_dict_spec) traj.f_add_result('final_individual', best_last_indiv_dict) traj.f_add_result('final_fitness', best_last_fitness) traj.f_add_result('n_iteration', self.g + 1) logger.info("The best last individual was %s with fitness %s", best_last_indiv, best_last_fitness) logger.info("-- End of (successful) annealing --")