from __future__ import annotations
import math
from dataclasses import dataclass
from functools import partial
from typing import TYPE_CHECKING, Any, Literal
import numpy as np
from numpy.typing import NDArray
from optimagic import mark
from optimagic.config import IS_GRADIENT_FREE_OPTIMIZERS_INSTALLED
from optimagic.optimization.algo_options import (
CONVERGENCE_FTOL_ABS,
STOPPING_MAXFUN_GLOBAL,
STOPPING_MAXITER,
get_population_size,
)
from optimagic.optimization.algorithm import Algorithm, InternalOptimizeResult
from optimagic.optimization.internal_optimization_problem import (
InternalBounds,
InternalOptimizationProblem,
)
from optimagic.parameters.conversion import Converter
from optimagic.typing import (
AggregationLevel,
NonNegativeFloat,
PositiveInt,
PyTree,
)
from optimagic.typing import UnitIntervalFloat as ProbabilityFloat
if TYPE_CHECKING:
import pandas as pd
from gradient_free_optimizers.optimizers.base_optimizer import BaseOptimizer
@dataclass(frozen=True)
class GFOCommonOptions:
"""Common options for all optimizers from GFO."""
n_grid_points: PositiveInt | PyTree = 201
"""Number of grid points per dimension.
If an integer is provided, it will be used for all dimensions.
"""
n_init: PositiveInt = 20
"""Number of initialization steps to run.
Of these, ``n_init // 2`` positions are initialized in a grid-like pattern; the
remaining positions are initialized at the vertices of the search space and at
random points.
"""
stopping_maxiter: PositiveInt = STOPPING_MAXITER
"""Maximum number of iterations."""
stopping_maxtime: NonNegativeFloat | None = None
"""Maximum time in seconds before termination."""
convergence_target_value: float | None = None
""""Stop the optimization if the objective function is less than this value."""
convergence_iter_noimprove: PositiveInt = 1000000  # high default; effectively disabled
"""Number of iterations without improvement before termination."""
convergence_ftol_abs: NonNegativeFloat | None = (
CONVERGENCE_FTOL_ABS # set to zero, so disabled
)
"""Converge if the absolute change in the objective function is less than this
value."""
convergence_ftol_rel: NonNegativeFloat | None = None
"""Converge if the relative change in the objective function is less than this
value."""
caching: bool = True
"""Whether to cache evaluated param and function values in a dictionary for
lookup."""
extra_start_params: list[PyTree] | None = None
"""List of additional start points for the optimization run.
For population-based optimizers, the initial population can be provided via
`extra_start_params`.
"""
warm_start: pd.DataFrame | None = None
"""Pandas dataframe that contains score and paramter information that will be
automatically loaded into the memory.
example:
score x1 x2 x...
0.756 0.1 0.2 ...
0.823 0.3 0.1 ...
... ... ... ...
... ... ... ...
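A minimal sketch of building such a frame with pandas (the column names here are
an assumption and should match the parameter naming of the search space, plus a
"score" column):
.. code-block:: python

    import pandas as pd

    warm_start = pd.DataFrame(
        {"score": [0.756, 0.823], "x1": [0.1, 0.3], "x2": [0.2, 0.1]}
    )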
"""
verbosity: Literal["progress_bar", "print_results", "print_times"] | bool = False
"""Determines what part of the optimization information will be printed."""
seed: int | None = None
"""Random seed for reproducibility."""
# ==================================================================================
# Population Based
# ==================================================================================
@mark.minimizer(
name="gfo_pso",
solver_type=AggregationLevel.SCALAR,
is_available=IS_GRADIENT_FREE_OPTIMIZERS_INSTALLED,
is_global=True,
needs_jac=False,
needs_hess=False,
needs_bounds=True,
supports_parallelism=False,
supports_bounds=True,
supports_infinite_bounds=False,
supports_linear_constraints=False,
supports_nonlinear_constraints=False,
disable_history=False,
)
@dataclass(frozen=True)
class GFOParticleSwarmOptimization(Algorithm, GFOCommonOptions):
"""Minimize a scalar function using the Particle Swarm Optimization algorithm.
This algorithm is a Python implementation of the Particle Swarm Optimization
algorithm through the gradient_free_optimizers package.
Particle Swarm Optimization is a global population based algorithm.
The algorithm simulates a swarm of particles which move according to their own
inertia across the search space.
Each particle adjusts its position based on its own experience (cognitive weight)
and the experiences of its neighbors or the swarm (social weight), using
velocity updates.
The algorithm iteratively guides the swarm toward promising regions of the
search space.
The velocity of a particle is calculated by the following
equation:
.. math::
v_{n+1} = \\omega \\cdot v_n + c_k \\cdot r_1 \\cdot (p_{best}-p_n)
+ c_s \\cdot r_2 \\cdot (g_{best} - p_n)
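As an illustration only (not the package's internal implementation), one velocity
and position update can be sketched in NumPy as follows, with `r_1` and `r_2`
drawn uniformly from [0, 1]:
.. code-block:: python

    import numpy as np

    rng = np.random.default_rng(0)
    inertia = 0.5 / np.log(2.0)
    cognitive_weight = social_weight = 0.5 + np.log(2.0)
    position = np.array([0.5, -0.3])
    velocity = np.zeros(2)
    personal_best = np.array([0.2, 0.1])  # best position found by this particle
    global_best = np.array([0.0, 0.0])  # best position found by the swarm
    r1, r2 = rng.random(2)
    velocity = (
        inertia * velocity
        + cognitive_weight * r1 * (personal_best - position)
        + social_weight * r2 * (global_best - position)
    )
    position = position + velocity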
"""
population_size: PositiveInt | None = None
"""Size of the population."""
inertia: NonNegativeFloat = 0.5 / math.log(2.0)
"""The inertia of the movement of the individual particles in the population."""
cognitive_weight: NonNegativeFloat = 0.5 + math.log(2.0)
"""Weight of the movement of each particle towards its personal best position."""
social_weight: NonNegativeFloat = 0.5 + math.log(2.0)
"""Weight of the movement of each particle towards the global best position of the
swarm."""
rand_rest_p: NonNegativeFloat = 0.01
"""Probability for the optimization algorithm to jump to a random position in an
iteration step."""
def _solve_internal_problem(
self, problem: InternalOptimizationProblem, x0: NDArray[np.float64]
) -> InternalOptimizeResult:
import gradient_free_optimizers as gfo
population_size = get_population_size(
population_size=self.population_size, x=x0, lower_bound=10
)
opt = gfo.ParticleSwarmOptimizer
optimizer = partial(
opt,
population=population_size,
inertia=self.inertia,
cognitive_weight=self.cognitive_weight,
social_weight=self.social_weight,
rand_rest_p=self.rand_rest_p,
)
res = _gfo_internal(
common_options=self,
problem=problem,
x0=x0,
optimizer=optimizer,
)
return res
@mark.minimizer(
name="gfo_parallel_tempering",
solver_type=AggregationLevel.SCALAR,
is_available=IS_GRADIENT_FREE_OPTIMIZERS_INSTALLED,
is_global=True,
needs_jac=False,
needs_hess=False,
needs_bounds=True,
supports_parallelism=False,
supports_bounds=True,
supports_infinite_bounds=False,
supports_linear_constraints=False,
supports_nonlinear_constraints=False,
disable_history=False,
)
@dataclass(frozen=True)
class GFOParallelTempering(Algorithm, GFOCommonOptions):
"""Minimize a scalar function using the Parallel Tempering algorithm.
This algorithm is a Python implementation of the Parallel Tempering
algorithm through the gradient_free_optimizers package.
Parallel Tempering is a global optimization algorithm that is inspired by
metallurgical annealing.
It runs multiple optimizer instances at different
"starting temperatures" in parallel. Periodically, swaps between these runs are
attempted. Swaps between optimization runs at different temperatures allow the
optimizer to overcome local optima.
The probability of swapping temperatures for any combination of optimizer instances
is given by:
.. math::
p = \\min \\left( 1, \\exp\\left[{(\\text{score}_i-
\\text{score}_j)\\left(\\frac{1}{T_i}-\\frac{1}{T_j}\\right)}\\right] \\right)
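A small illustrative sketch of this acceptance rule (the scores and temperatures
below are made up):
.. code-block:: python

    import numpy as np

    def swap_probability(score_i, score_j, temp_i, temp_j):
        # Metropolis-style probability of swapping two replicas.
        ratio = np.exp((score_i - score_j) * (1 / temp_i - 1 / temp_j))
        return min(1.0, float(ratio))

    p = swap_probability(score_i=-1.2, score_j=-0.8, temp_i=0.1, temp_j=1.0)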
"""
population_size: PositiveInt | None = None
"""Size of the population."""
n_iter_swap: PositiveInt = 10
"""The number of iterations the algorithm performs before switching temperatures of
the individual optimizers in the population."""
rand_rest_p: NonNegativeFloat = 0
"""Probability for the optimization algorithm to jump to a random position in an
iteration step."""
def _solve_internal_problem(
self, problem: InternalOptimizationProblem, x0: NDArray[np.float64]
) -> InternalOptimizeResult:
import gradient_free_optimizers as gfo
population_size = get_population_size(
population_size=self.population_size, x=x0, lower_bound=10
)
opt = gfo.ParallelTemperingOptimizer
optimizer = partial(
opt,
population=population_size,
n_iter_swap=self.n_iter_swap,
rand_rest_p=self.rand_rest_p,
)
res = _gfo_internal(
common_options=self,
problem=problem,
x0=x0,
optimizer=optimizer,
)
return res
@mark.minimizer(
name="gfo_spiral_optimization",
solver_type=AggregationLevel.SCALAR,
is_available=IS_GRADIENT_FREE_OPTIMIZERS_INSTALLED,
is_global=True,
needs_jac=False,
needs_hess=False,
needs_bounds=True,
supports_parallelism=False,
supports_bounds=True,
supports_infinite_bounds=False,
supports_linear_constraints=False,
supports_nonlinear_constraints=False,
disable_history=False,
)
@dataclass(frozen=True)
class GFOSpiralOptimization(Algorithm, GFOCommonOptions):
"""Minimize a scalar function using the Spiral Optimization algorithm.
This algorithm is a Python implementation of the Spiral Optimization
algorithm through the gradient_free_optimizers package.
Spiral Optimization is a population-based algorithm, in which a number of particles
move in a spiral-like pattern to explore the search space and converge to the
best known position as the spiral decays.
The position of each particle is updated according to the following equation:
.. math::
x_i (k+1) = x^* (k) + r(k) \\cdot R(\\theta) \\cdot (x_i(k)- x^*(k))
where:
- `k` = the k-th iteration,
- `x_i(k)` = current position,
- `x^*(k)` = center position (best known position of all particles),
- `r(k)` = decay rate,
- `R` = rotation matrix,
and the rotation matrix R is given by
.. math::
R(\\theta) = \\begin{bmatrix}
0^{\\top}_{n-1} & -1 \\\\
I_{n-1} & 0_{n-1}
\\end{bmatrix}
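For illustration, the rotation matrix above can be assembled in NumPy as follows
(a sketch, not the package's internal code):
.. code-block:: python

    import numpy as np

    def rotation_matrix(n):
        # Block form: first row is (0, ..., 0, -1); below it sits (I_{n-1} | 0).
        r = np.zeros((n, n))
        r[0, -1] = -1.0
        r[1:, :-1] = np.eye(n - 1)
        return r

    r = rotation_matrix(3)  # [[0, 0, -1], [1, 0, 0], [0, 1, 0]]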
"""
population_size: PositiveInt | None = None
"""Size of the population."""
decay_rate: NonNegativeFloat = 0.99
"""The decay rate `r` is a factor, by which the radius of the spiral movement of the
particles decays during their spiral movement.
Lower values accelerate the convergence of the particles to the best known position,
while values above 1 eventually lead to a movement where the particles spiral away
from each other. Typical range: 0.85 to 1.15.
"""
rand_rest_p: NonNegativeFloat = 0
"""Probability for the optimization algorithm to jump to a random position in an
iteration step."""
def _solve_internal_problem(
self, problem: InternalOptimizationProblem, x0: NDArray[np.float64]
) -> InternalOptimizeResult:
import gradient_free_optimizers as gfo
population_size = get_population_size(
population_size=self.population_size, x=x0, lower_bound=10
)
opt = gfo.SpiralOptimization
optimizer = partial(
opt,
population=population_size,
decay_rate=self.decay_rate,
)
res = _gfo_internal(
common_options=self,
problem=problem,
x0=x0,
optimizer=optimizer,
)
return res
@mark.minimizer(
name="gfo_genetic_algorithm",
solver_type=AggregationLevel.SCALAR,
is_available=IS_GRADIENT_FREE_OPTIMIZERS_INSTALLED,
is_global=True,
needs_jac=False,
needs_hess=False,
needs_bounds=True,
supports_parallelism=False,
supports_bounds=True,
supports_infinite_bounds=False,
supports_linear_constraints=False,
supports_nonlinear_constraints=False,
disable_history=False,
)
@dataclass(frozen=True)
class GFOGeneticAlgorithm(Algorithm, GFOCommonOptions):
"""Minimize a scalar function using the Genetic Algorithm.
This algorithm is a Python implementation of the Genetic Algorithm through the
gradient_free_optimizers package.
The Genetic Algorithm is an evolutionary algorithm inspired by the process of
natural selection. It evolves a population of candidate solutions over generations
using mechanisms like selection, crossover, and mutation of genes (bits) to find the
best solution.
"""
population_size: PositiveInt | None = None
"""Size of the population."""
mutation_rate: ProbabilityFloat = 0.5
"""Probability of a mutation event occurring in an individual of the population.
Mutation helps in maintaining genetic diversity within the population and prevents
the algorithm from getting stuck in local optima. Bits are randomly altered according
to:
.. math::
x'_i =
\\begin{cases}
x_i & \\text{if } \\text{rand} > p_m \\\\
1 - x_i & \\text{if } \\text{rand} \\leq p_m
\\end{cases}
where `p_m` is the `mutation_rate`.
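A toy sketch of this bit-flip mutation on a binary-encoded individual
(illustrative only, not the package's internal code):
.. code-block:: python

    import numpy as np

    rng = np.random.default_rng(0)
    individual = np.array([1, 0, 1, 1, 0])
    mutation_rate = 0.5
    flip = rng.random(individual.size) <= mutation_rate
    mutated = np.where(flip, 1 - individual, individual)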
"""
crossover_rate: ProbabilityFloat = 0.5
"""Probability of a crossover event occurring between two parents. A higher
crossover rate increases the diversity of the offspring, which can help in exploring
the search space more effectively. Crossover happens according to:
.. math::
u_{i,j}^{(g)} =
\\begin{cases}
v_{i,j}^{(g)} & \\text{if } \\text{rand}_j \\leq C_r \\text{ or } j =
j_{\\text{rand}} \\\\
x_{i,j}^{(g)} & \\text{otherwise}
\\end{cases}
where `C_r` is the `crossover_rate`.
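A toy sketch of this crossover rule (illustrative only; `j_rand` guarantees that
at least one gene is taken from the mutant):
.. code-block:: python

    import numpy as np

    rng = np.random.default_rng(0)
    parent = np.array([0.1, 0.4, 0.7, 0.2])  # x_i
    mutant = np.array([0.3, 0.5, 0.1, 0.9])  # v_i
    crossover_rate = 0.5
    take_mutant = rng.random(parent.size) <= crossover_rate
    take_mutant[rng.integers(parent.size)] = True  # j_rand
    offspring = np.where(take_mutant, mutant, parent)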
"""
n_parents: PositiveInt = 2
"""The number of parents selected from the current population to participate in the
crossover process to produce offspring.
By default, pairs of parents are selected to generate new offspring.
"""
n_offsprings: PositiveInt = 10
"""The number of offsprings generated in each generation through the processes of
crossover and mutation.
Typically, the number of offspring is equal to the population size, ensuring that
the population size remains constant over generations.
"""
def _solve_internal_problem(
self, problem: InternalOptimizationProblem, x0: NDArray[np.float64]
) -> InternalOptimizeResult:
import gradient_free_optimizers as gfo
population_size = get_population_size(
population_size=self.population_size, x=x0, lower_bound=10
)
opt = gfo.GeneticAlgorithmOptimizer
optimizer = partial(
opt,
population=population_size,
mutation_rate=self.mutation_rate,
crossover_rate=self.crossover_rate,
n_parents=self.n_parents,
offspring=self.n_offsprings,
)
res = _gfo_internal(
common_options=self,
problem=problem,
x0=x0,
optimizer=optimizer,
)
return res
@mark.minimizer(
name="gfo_evolution_strategy",
solver_type=AggregationLevel.SCALAR,
is_available=IS_GRADIENT_FREE_OPTIMIZERS_INSTALLED,
is_global=True,
needs_jac=False,
needs_hess=False,
needs_bounds=True,
supports_parallelism=False,
supports_bounds=True,
supports_infinite_bounds=False,
supports_linear_constraints=False,
supports_nonlinear_constraints=False,
disable_history=False,
)
@dataclass(frozen=True)
class GFOEvolutionStrategy(Algorithm, GFOCommonOptions):
"""Minimize a scalar function using the Evolution Strategy algorithm.
This algorithm is a Python implementation of the Evolution Strategy algorithm
through the gradient_free_optimizers package.
Evolution Strategy is an evolutionary algorithm inspired by natural evolution. It
works by iteratively improving a population of candidate solutions through mutation,
crossover, and selection.
A population of parents generates offspring, and only the fittest individuals
from both parents and offspring are selected to form the next generation.
The algorithm uses both mutation and crossover to create new candidate solutions.
The choice between mutation and crossover is determined probabilistically based on
their respective rates in the following way.
.. math::
\\text{total_rate} = \\text{mutation_rate} + \\text{crossover_rate}
.. math::
R = \\text{random_float} (0 ... \\text{total_rate})
.. code-block::
if R <= mutation_rate:
do mutation
else:
do crossover
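Equivalently, as a small self-contained Python sketch (illustrative only):
.. code-block:: python

    import numpy as np

    rng = np.random.default_rng(0)
    mutation_rate, crossover_rate = 0.7, 0.3
    total_rate = mutation_rate + crossover_rate
    r = rng.uniform(0.0, total_rate)
    operation = "mutation" if r <= mutation_rate else "crossover"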
"""
population_size: PositiveInt | None = None
"""Size of the population."""
stopping_maxiter: PositiveInt = STOPPING_MAXFUN_GLOBAL
"""Maximum number of iterations."""
mutation_rate: ProbabilityFloat = 0.7
"""Probability of a mutation event occurring in an individual."""
crossover_rate: ProbabilityFloat = 0.3
"""Probability of an individual to perform a crossover with the best individual in
the population."""
rand_rest_p: NonNegativeFloat = 0
"""Probability for the optimization algorithm to jump to a random position in an
iteration step."""
def _solve_internal_problem(
self, problem: InternalOptimizationProblem, x0: NDArray[np.float64]
) -> InternalOptimizeResult:
import gradient_free_optimizers as gfo
population_size = get_population_size(
population_size=self.population_size, x=x0, lower_bound=10
)
opt = gfo.EvolutionStrategyOptimizer
optimizer = partial(
opt,
population=population_size,
mutation_rate=self.mutation_rate,
crossover_rate=self.crossover_rate,
rand_rest_p=self.rand_rest_p,
)
res = _gfo_internal(
common_options=self,
problem=problem,
x0=x0,
optimizer=optimizer,
)
return res
@mark.minimizer(
name="gfo_differential_evolution",
solver_type=AggregationLevel.SCALAR,
is_available=IS_GRADIENT_FREE_OPTIMIZERS_INSTALLED,
is_global=True,
needs_jac=False,
needs_hess=False,
needs_bounds=True,
supports_parallelism=False,
supports_bounds=True,
supports_infinite_bounds=False,
supports_linear_constraints=False,
supports_nonlinear_constraints=False,
disable_history=False,
)
@dataclass(frozen=True)
class GFODifferentialEvolution(Algorithm, GFOCommonOptions):
"""Minimize a scalar function using the Differential Evolution algorithm.
This algorithm is a Python implementation of the Differential Evolution
algorithm through the gradient_free_optimizers package.
Differential Evolution is a population-based optimization algorithm that iteratively
improves a population of candidate solutions by combining and perturbing them based
on their differences.
It creates new positions in the search space by adding the weighted difference
between two individuals in the population to a third individual. The resulting trial
solutions are evaluated for their fitness, and a trial solution replaces the target
individual only if it is better, which ensures continual improvement.
A new trial solution is generated according to:
.. math::
x_{trial} = x_{r1} + F \\cdot (x_{r2} - x_{r3})
where :math:`r1, r2, r3` are random individuals from the population, and
:math:`F` is the differential weight or mutation_rate.
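For illustration, one mutation and selection step could be sketched as follows
(the toy objective and population are assumptions, not the package's
implementation):
.. code-block:: python

    import numpy as np

    def sphere(x):
        return float(np.sum(x**2))  # toy objective to minimize

    rng = np.random.default_rng(0)
    population = rng.uniform(-5, 5, size=(10, 2))
    F = 0.9  # differential weight (mutation_rate)
    target = 0
    candidates = np.delete(np.arange(len(population)), target)
    r1, r2, r3 = rng.choice(candidates, size=3, replace=False)
    trial = population[r1] + F * (population[r2] - population[r3])
    if sphere(trial) < sphere(population[target]):
        population[target] = trial  # the better candidate survives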
"""
population_size: PositiveInt | None = None
"""Size of the population."""
mutation_rate: ProbabilityFloat = 0.9
r"""Probability of a mutation event occurring in an individual.
The mutation rate influences the algorithm's ability to explore the search space.
A higher value of `mutation_rate`, also called the differential weight `F`, increases
the diversity of the mutant individuals, leading to broader exploration,
while a lower value encourages convergence by making smaller adjustments.
.. math::
\mathbf{v}_{i,G+1} = \mathbf{x}_{r1,G} + F \cdot (\mathbf{x}_{r2,G} -
\mathbf{x}_{r3,G})
"""
crossover_rate: ProbabilityFloat = 0.9
"""Probability of a crossover event occurring between two parents. It determines how
much of the trial vector inherits its components from the mutant individual versus
the target individual. A high crossover rate means that more components will come
from the mutant individual, promoting exploration of new solutions. Conversely, a
low crossover rate results in more components being taken from the target
individual, which can help maintain existing solutions and refine them.
.. math::
u_{i,j,G+1} =
\\begin{cases}
v_{i,j,G+1} & \\text{if } \\text{rand}_j(0,1) \\leq CR \\text{ or } j =
j_{\\text{rand}} \\\\
x_{i,j,G} & \\text{otherwise}
\\end{cases}
"""
def _solve_internal_problem(
self, problem: InternalOptimizationProblem, x0: NDArray[np.float64]
) -> InternalOptimizeResult:
import gradient_free_optimizers as gfo
population_size = get_population_size(
population_size=self.population_size, x=x0, lower_bound=10
)
opt = gfo.DifferentialEvolutionOptimizer
optimizer = partial(
opt,
population=population_size,
mutation_rate=self.mutation_rate,
crossover_rate=self.crossover_rate,
)
res = _gfo_internal(
common_options=self,
problem=problem,
x0=x0,
optimizer=optimizer,
)
return res
# ==================================================================================
# Helper functions
# ==================================================================================
def _gfo_internal(
common_options: GFOCommonOptions,
problem: InternalOptimizationProblem,
x0: NDArray[np.float64],
optimizer: BaseOptimizer,
) -> InternalOptimizeResult:
"""Internal helper function.
Define the search space and initial params, define the objective function, and run
the optimization.
"""
# Use common options from GFOCommonOptions
common = common_options
# set early stopping criterion
early_stopping = {
"n_iter_no_change": common.convergence_iter_noimprove,
"tol_abs": common.convergence_ftol_abs,
"tol_rel": common.convergence_ftol_rel,
}
# define search space, initial params, initial_population and constraints
opt = optimizer(
search_space=_get_search_space_gfo(
problem.bounds,
common.n_grid_points,
problem.converter,
),
initialize=_get_initialize_gfo(
x0, common.n_init, common.extra_start_params, problem.converter
),
constraints=_get_gfo_constraints(),
random_state=common.seed,
)
# define objective function; negate because GFO maximizes the score
def objective_function(para: dict[str, float]) -> float | NDArray[np.float64]:
x = np.array(opt.conv.para2value(para))
return -problem.fun(x)
# negate the target value for the same reason
convergence_target_value = (
-1 * common.convergence_target_value
if common.convergence_target_value is not None
else None
)
# run optimization
opt.search(
objective_function=objective_function,
n_iter=common.stopping_maxiter,
max_time=common.stopping_maxtime,
max_score=convergence_target_value,
early_stopping=early_stopping,
memory=common.caching,
memory_warm_start=common.warm_start,
verbosity=common.verbosity,
)
return _process_result_gfo(opt)
def _get_search_space_gfo(
bounds: InternalBounds, n_grid_points: PositiveInt | PyTree, converter: Converter
) -> dict[str, NDArray[np.float64]]:
"""Create search space.
Args:
bounds: Internal bounds.
n_grid_points: Number of grid points in each dimension, as an integer or a PyTree.
converter: Converter used to map a PyTree of grid point counts to the internal
representation.
Returns:
dict: search_space dictionary
"""
search_space = {}
if bounds.lower is not None and bounds.upper is not None:
dim = len(bounds.lower)
upper = bounds.upper
lower = bounds.lower
if isinstance(n_grid_points, int):
n_grid_points = [n_grid_points] * dim
else:
n_grid_points = list(map(int, converter.params_to_internal(n_grid_points)))
for i in range(dim):
search_space[f"x{i}"] = np.linspace(lower[i], upper[i], n_grid_points[i])
return search_space
def _get_gfo_constraints() -> list[Any]:
"""Process constraints."""
return []
def _get_initialize_gfo(
x0: NDArray[np.float64],
n_init: PositiveInt,
extra_start_points: list[PyTree] | None,
converter: Converter,
) -> dict[str, Any]:
"""Set initial params x0, additional start params for the optimization run or the
initial_population. Here, warm_start is actually extra_start_params.
Args:
x0: initial param
Returns:
dict: initialize dictionary with initial parameters set
"""
init = _value2para(x0)
x_list = [init]
if extra_start_points is not None:
internal_values = [converter.params_to_internal(x) for x in extra_start_points]
extra_start_points = [_value2para(x) for x in internal_values]
x_list += extra_start_points
initialize = {
"warm_start": x_list,
"vertices": n_init // 2,
"grid": n_init // 2,
}
return initialize
def _process_result_gfo(opt: "BaseOptimizer") -> InternalOptimizeResult:
"""Process result.
Args:
opt: Optimizer instance after optimization run is complete
Returns:
InternalOptimizeResult: Internal optimization result.
"""
res = InternalOptimizeResult(
x=np.array(opt.best_value),
fun=-opt.best_score,  # negate back: the score was maximized internally
success=True,
n_fun_evals=len(opt.eval_times),
n_jac_evals=0,
n_hess_evals=0,
n_iterations=opt.n_iter_search,
)
return res
def _value2para(x: NDArray[np.float64]) -> dict[str, float]:
"""Convert values to dict.
Args:
x: Array of parameter values
Returns:
dict: Dictionary of parameter values with key-value pair as { x{i} : x[i]}
"""
return {f"x{i}": value for i, value in enumerate(x)}