import logging
from collections import namedtuple
import numpy as np
from l2l import dict_to_list
from l2l import list_to_dict
from l2l.optimizers.optimizer import Optimizer
logger = logging.getLogger("optimizers.gradientdescent")
ClassicGDParameters = namedtuple(
'ClassicGDParameters',
['learning_rate', 'exploration_step_size', 'n_random_steps', 'n_iteration', 'stop_criterion', 'seed'])
ClassicGDParameters.__doc__ = """
:param learning_rate: The rate of learning per step of gradient descent
:param exploration_step_size: The standard deviation of the random steps used for the finite-difference gradient estimate
:param n_random_steps: The number of random steps used to estimate the gradient
:param n_iteration: Number of iterations to perform
:param stop_criterion: Stop once the fitness reaches or exceeds this value
:param seed: The random seed used for random number generation in the optimizer
"""
StochasticGDParameters = namedtuple(
'StochasticGDParameters',
['learning_rate', 'stochastic_deviation', 'stochastic_decay', 'exploration_step_size', 'n_random_steps', 'n_iteration',
'stop_criterion', 'seed'])
StochasticGDParameters.__doc__ = """
:param learning_rate: The rate of learning per step of gradient descent
:param stochastic_deviation: The standard deviation of the random vector used to perturb the gradient
:param stochastic_decay: The decay of the influence of the random vector that is added to the gradient
    (set to 0 to disable stochastic perturbation)
:param exploration_step_size: The standard deviation of the random steps used for the finite-difference gradient estimate
:param n_random_steps: The number of random steps used to estimate the gradient
:param n_iteration: Number of iterations to perform
:param stop_criterion: Stop once the fitness reaches or exceeds this value
:param seed: The random seed used for random number generation in the optimizer
"""
AdamParameters = namedtuple(
'AdamParameters',
['learning_rate', 'exploration_step_size', 'n_random_steps', 'first_order_decay', 'second_order_decay', 'n_iteration',
'stop_criterion', 'seed'])
AdamParameters.__doc__ = """
:param learning_rate: The rate of learning per step of gradient descent
:param exploration_step_size: The standard deviation of the random steps used for the finite-difference gradient estimate
:param n_random_steps: The number of random steps used to estimate the gradient
:param first_order_decay: Decay of the historic first-order moment per gradient descent step
:param second_order_decay: Decay of the historic second-order moment per gradient descent step
:param n_iteration: Number of iterations to perform
:param stop_criterion: Stop once the fitness reaches or exceeds this value
:param seed: The random seed used for random number generation in the optimizer
"""
RMSPropParameters = namedtuple(
'RMSPropParameters',
['learning_rate', 'exploration_step_size', 'n_random_steps', 'momentum_decay', 'n_iteration', 'stop_criterion', 'seed'])
RMSPropParameters.__doc__ = """
:param learning_rate: The rate of learning per step of gradient descent
:param exploration_step_size: The standard deviation of the random steps used for the finite-difference gradient estimate
:param n_random_steps: The number of random steps used to estimate the gradient
:param momentum_decay: Decay of the historic squared-gradient moment at each gradient descent step
:param n_iteration: Number of iterations to perform
:param stop_criterion: Stop once the fitness reaches or exceeds this value
:param seed: The random seed used for random number generation in the optimizer
"""
class GradientDescentOptimizer(Optimizer):
"""
Class for a generic gradient descent solver.
    In pseudocode, the algorithm does the following.

    For n iterations do:
        - Explore the fitness of individuals in the close vicinity of the current one
        - Estimate the gradient of the fitness landscape from these evaluations
        - Create the new 'current individual' by taking a step in parameter space along
          the direction of steepest fitness ascent

    NOTE: This optimizer expects all parameters of the system to be floating-point values.
:param ~l2l.utils.trajectory.Trajectory traj:
Use this trajectory to store the parameters of the specific runs. The parameters should be
initialized based on the values in `parameters`
:param optimizee_create_individual:
Function that creates a new individual
:param optimizee_fitness_weights:
Fitness weights. The fitness returned by the Optimizee is multiplied by these values (one for each
element of the fitness vector)
:param parameters:
Instance of :func:`~collections.namedtuple` :class:`.ClassicGDParameters`,
:func:`~collections.namedtuple` :class:`.StochasticGDParameters`,
:func:`~collections.namedtuple` :class:`.RMSPropParameters` or
:func:`~collections.namedtuple` :class:`.AdamParameters` containing the
parameters needed by the Optimizer. The type of this parameter is used to select one of the GD variants.
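
    A minimal usage sketch (``traj`` and ``optimizee`` are assumed to come from the
    surrounding L2L experiment setup; the field values are placeholders)::

        parameters = RMSPropParameters(learning_rate=0.01, exploration_step_size=0.1,
                                       n_random_steps=5, momentum_decay=0.9,
                                       n_iteration=100, stop_criterion=np.inf, seed=99)
        optimizer = GradientDescentOptimizer(traj,
                                             optimizee_create_individual=optimizee.create_individual,
                                             optimizee_fitness_weights=(1.0,),
                                             parameters=parameters)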
"""
def __init__(self, traj,
optimizee_create_individual,
optimizee_fitness_weights,
parameters,
optimizee_bounding_func=None):
super().__init__(traj,
optimizee_create_individual=optimizee_create_individual,
optimizee_fitness_weights=optimizee_fitness_weights,
parameters=parameters, optimizee_bounding_func=optimizee_bounding_func)
self.optimizee_bounding_func = optimizee_bounding_func
traj.f_add_parameter('learning_rate', parameters.learning_rate, comment='Value of learning rate')
traj.f_add_parameter('exploration_step_size', parameters.exploration_step_size,
comment='Standard deviation of the random steps')
traj.f_add_parameter('n_random_steps', parameters.n_random_steps,
comment='Amount of random steps taken for calculating the gradient')
        traj.f_add_parameter('n_iteration', parameters.n_iteration, comment='Number of iterations to perform')
traj.f_add_parameter('stop_criterion', parameters.stop_criterion, comment='Stopping criterion parameter')
traj.f_add_parameter('seed', np.uint32(parameters.seed), comment='Optimizer random seed')
_, self.optimizee_individual_dict_spec = dict_to_list(self.optimizee_create_individual(), get_dict_spec=True)
self.random_state = np.random.RandomState(seed=traj.par.seed)
        # Note that this array stores individuals as an np.array of floats, as opposed to
        # individual-dicts. This is because the array is used within the gradient descent
        # algorithm itself, which treats optimizee individuals as vectors.
self.current_individual = np.array(dict_to_list(self.optimizee_create_individual()))
# Depending on the algorithm used, initialize the necessary variables
        self.update_function = None
if type(parameters) is ClassicGDParameters:
self.init_classic_gd(parameters, traj)
elif type(parameters) is StochasticGDParameters:
self.init_stochastic_gd(parameters, traj)
elif type(parameters) is AdamParameters:
self.init_adam(parameters, traj)
elif type(parameters) is RMSPropParameters:
self.init_rmsprop(parameters, traj)
else:
            raise ValueError('Class of the provided "parameters" argument is not among the supported types')
        # Add a result group for generation-wise parameter logging
traj.results.f_add_result_group('generation_params',
comment='This contains the optimizer parameters that are'
' common across a generation')
# Explore the neighbourhood in the parameter space of current individual
new_individual_list = [
list_to_dict(self.current_individual +
self.random_state.normal(0.0, parameters.exploration_step_size, self.current_individual.size),
self.optimizee_individual_dict_spec)
for i in range(parameters.n_random_steps)
]
        # Also add the current individual to determine its fitness
new_individual_list.append(list_to_dict(self.current_individual, self.optimizee_individual_dict_spec))
if optimizee_bounding_func is not None:
new_individual_list = [self.optimizee_bounding_func(ind) for ind in new_individual_list]
# Storing the fitness of the current individual
        self.current_fitness = -np.inf
self.g = 0
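        # Individuals placed in self.eval_pop are dispatched for evaluation by the
        # L2L runner; their fitnesses are handed back to post_process().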
self.eval_pop = new_individual_list
self._expand_trajectory(traj)
    def post_process(self, traj, fitnesses_results):
"""
See :meth:`~l2l.optimizers.optimizer.Optimizer.post_process`
"""
old_eval_pop = self.eval_pop.copy()
self.eval_pop.clear()
logger.info(" Evaluating %i individuals" % len(fitnesses_results))
assert len(fitnesses_results) - 1 == traj.n_random_steps
# We need to collect the directions of the random steps along with the fitness evaluated there
        fitnesses = np.zeros(traj.n_random_steps)
dx = np.zeros((traj.n_random_steps, len(self.current_individual)))
weighted_fitness_list = []
for i, (run_index, fitness) in enumerate(fitnesses_results):
            # We need to convert the current run index into an ind_idx
            # (index of individual within one generation)
traj.v_idx = run_index
ind_index = traj.par.ind_idx
individual = old_eval_pop[ind_index]
traj.f_add_result('$set.$.individual', individual)
traj.f_add_result('$set.$.fitness', fitness)
weighted_fitness = np.dot(fitness, self.optimizee_fitness_weights)
weighted_fitness_list.append(weighted_fitness)
# The last element of the list is the evaluation of the individual obtained via gradient descent
if i == len(fitnesses_results) - 1:
self.current_fitness = weighted_fitness
else:
fitnesses[i] = weighted_fitness
dx[i] = np.array(dict_to_list(individual)) - self.current_individual
traj.v_idx = -1 # set the trajectory back to default
# Performs descending arg-sort of weighted fitness
fitness_sorting_indices = list(reversed(np.argsort(weighted_fitness_list)))
old_eval_pop_as_array = np.array([dict_to_list(x) for x in old_eval_pop])
# Sorting the data according to fitness
sorted_population = old_eval_pop_as_array[fitness_sorting_indices]
sorted_fitness = np.asarray(weighted_fitness_list)[fitness_sorting_indices]
logger.info("-- End of generation %d --", self.g)
logger.info(" Evaluated %d individuals", len(fitnesses_results))
logger.info(' Average Fitness: %.4f', np.mean(sorted_fitness))
logger.info(" Current fitness is %.2f", self.current_fitness)
logger.info(' Best Fitness: %.4f', sorted_fitness[0])
logger.info(" Best individual is %s", sorted_population[0])
generation_result_dict = {
'generation': self.g,
'current_fitness': self.current_fitness,
'best_fitness_in_run': sorted_fitness[0],
'average_fitness_in_run': np.mean(sorted_fitness),
}
generation_name = 'generation_{}'.format(self.g)
traj.results.generation_params.f_add_result_group(generation_name)
traj.results.generation_params.f_add_result(
generation_name + '.algorithm_params', generation_result_dict)
logger.info("-- End of iteration {}, current fitness is {} --".format(self.g, self.current_fitness))
if self.g < traj.n_iteration - 1 and traj.stop_criterion > self.current_fitness:
# Create new individual using the appropriate gradient descent
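            # The argument is a finite-difference gradient estimate: with dx the matrix of
            # random steps and df = fitnesses - current_fitness the observed fitness changes,
            # np.linalg.pinv(dx) @ df is the least-squares solution of dx @ g = df.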
self.update_function(traj, np.dot(np.linalg.pinv(dx), fitnesses - self.current_fitness))
current_individual_dict = list_to_dict(self.current_individual, self.optimizee_individual_dict_spec)
if self.optimizee_bounding_func is not None:
current_individual_dict = self.optimizee_bounding_func(current_individual_dict)
self.current_individual = np.array(dict_to_list(current_individual_dict))
# Explore the neighbourhood in the parameter space of the current individual
new_individual_list = [
list_to_dict(self.current_individual +
self.random_state.normal(0.0, traj.exploration_step_size, self.current_individual.size),
self.optimizee_individual_dict_spec)
for _ in range(traj.n_random_steps)
]
if self.optimizee_bounding_func is not None:
new_individual_list = [self.optimizee_bounding_func(ind) for ind in new_individual_list]
new_individual_list.append(current_individual_dict)
fitnesses_results.clear()
self.eval_pop = new_individual_list
self.g += 1 # Update generation counter
self._expand_trajectory(traj)
    def end(self, traj):
"""
See :meth:`~l2l.optimizers.optimizer.Optimizer.end`
"""
best_last_indiv_dict = list_to_dict(self.current_individual.tolist(),
self.optimizee_individual_dict_spec)
traj.f_add_result('final_individual', best_last_indiv_dict)
traj.f_add_result('final_fitness', self.current_fitness)
traj.f_add_result('n_iteration', self.g + 1)
logger.info("The last individual was %s with fitness %s", self.current_individual, self.current_fitness)
logger.info("-- End of (successful) gradient descent --")
    def init_classic_gd(self, parameters, traj):
        """
        Classic gradient descent specific initialization.
        :param ~l2l.utils.trajectory.Trajectory traj: The trajectory on which the parameters are stored.
:return:
"""
self.update_function = self.classic_gd_update
    def init_rmsprop(self, parameters, traj):
        """
        RMSProp specific initialization.
        :param ~l2l.utils.trajectory.Trajectory traj: The trajectory on which the parameters are stored.
:return:
"""
self.update_function = self.rmsprop_update
traj.f_add_parameter('momentum_decay', parameters.momentum_decay,
comment='Decay of the historic momentum at each gradient descent step')
        self.delta = 1e-6  # small constant used for numerical stabilization
self.so_moment = np.zeros(len(self.current_individual)) # second order moment
    def init_adam(self, parameters, traj):
        """
        ADAM specific initialization.
        :param ~l2l.utils.trajectory.Trajectory traj: The trajectory on which the parameters are stored.
:return:
"""
self.update_function = self.adam_update
traj.f_add_parameter('first_order_decay', parameters.first_order_decay,
comment='Decay of the first order momentum')
traj.f_add_parameter('second_order_decay', parameters.second_order_decay,
comment='Decay of the second order momentum')
        self.delta = 1e-8  # small constant used for numerical stabilization
self.fo_moment = np.zeros(len(self.current_individual)) # first order moment
self.so_moment = np.zeros(len(self.current_individual)) # second order moment
    def init_stochastic_gd(self, parameters, traj):
        """
        Stochastic gradient descent specific initialization.
        :param ~l2l.utils.trajectory.Trajectory traj: The trajectory on which the parameters are stored.
:return:
"""
self.update_function = self.stochastic_gd_update
traj.f_add_parameter('stochastic_deviation', parameters.stochastic_deviation,
comment='Standard deviation of the random vector added to the gradient')
traj.f_add_parameter('stochastic_decay', parameters.stochastic_decay, comment='Decay of the random vector')
    def classic_gd_update(self, traj, gradient):
"""
Updates the current individual using the classic Gradient Descent algorithm.
:param ~l2l.utils.trajectory.Trajectory traj: The trajectory which contains the parameters
required by the update algorithm
:param ~numpy.ndarray gradient: The gradient of the fitness curve, evaluated at the current individual
:return:
"""
self.current_individual += traj.learning_rate * gradient
    def rmsprop_update(self, traj, gradient):
"""
Updates the current individual using the RMSProp algorithm.
:param ~l2l.utils.trajectory.Trajectory traj: The trajectory which contains the parameters
required by the update algorithm
:param ~numpy.ndarray gradient: The gradient of the fitness curve, evaluated at the current individual
:return:
"""
self.so_moment = (traj.momentum_decay * self.so_moment +
(1 - traj.momentum_decay) * np.multiply(gradient, gradient))
self.current_individual += np.multiply(traj.learning_rate / (np.sqrt(self.so_moment + self.delta)),
gradient)
    def adam_update(self, traj, gradient):
"""
Updates the current individual using the ADAM algorithm.
:param ~l2l.utils.trajectory.Trajectory traj: The trajectory which contains the parameters
required by the update algorithm
:param ~numpy.ndarray gradient: The gradient of the fitness curve, evaluated at the current individual
:return:
"""
self.fo_moment = (traj.first_order_decay * self.fo_moment +
(1 - traj.first_order_decay) * gradient)
self.so_moment = (traj.second_order_decay * self.so_moment +
(1 - traj.second_order_decay) * np.multiply(gradient, gradient))
fo_moment_corrected = self.fo_moment / (1 - traj.first_order_decay ** (self.g + 1))
so_moment_corrected = self.so_moment / (1 - traj.second_order_decay ** (self.g + 1))
self.current_individual += traj.learning_rate * fo_moment_corrected / \
(np.sqrt(so_moment_corrected) + self.delta)
    def stochastic_gd_update(self, traj, gradient):
"""
Updates the current individual using a stochastic version of the gradient descent algorithm.
:param ~l2l.utils.trajectory.Trajectory traj: The trajectory which contains the parameters
required by the update algorithm
:param ~numpy.ndarray gradient: The gradient of the fitness curve, evaluated at the current individual
:return:
"""
gradient += (self.random_state.normal(0.0, traj.stochastic_deviation, self.current_individual.size) *
traj.stochastic_decay**(self.g + 1))
self.current_individual += traj.learning_rate * gradient