"""
Implements various Optimizations in Atomica
This module implements the :class:`Optimization` class, which contains the
information required to perform an optimization in Atomica. An Optimization
effectively serves as a mapping from one set of program instructions to another.
"""
import logging
import pickle
from collections import defaultdict
import atomica
import numpy as np
import scipy.optimize
import sciris as sc
from .cascade import get_cascade_vals
from .model import Model, Link
from .parameters import ParameterSet
from .programs import ProgramSet, ProgramInstructions
from .results import Result
from .system import logger, NotFoundError
from .utils import NamedItem
from .utils import TimeSeries
__all__ = ["InvalidInitialConditions", "UnresolvableConstraint", "FailedConstraint", "Adjustable", "Adjustment", "SpendingAdjustment", "StartTimeAdjustment", "ExponentialSpendingAdjustment", "SpendingPackageAdjustment", "PairedLinearSpendingAdjustment", "Measurable", "MinimizeMeasurable", "MaximizeMeasurable", "AtMostMeasurable", "AtLeastMeasurable", "IncreaseByMeasurable", "DecreaseByMeasurable", "MaximizeCascadeStage", "MaximizeCascadeConversionRate", "Constraint", "TotalSpendConstraint", "Optimization", "optimize"]
[docs]
class InvalidInitialConditions(Exception):
"""
Invalid initial parameter values
This error gets thrown if the initial conditions yield an objective value
that is not finite
"""
pass
[docs]
class UnresolvableConstraint(Exception):
"""
Unresolvable (ill-posed) constraint
This error gets thrown if it is _impossible_ to satisfy the constraints. There are
two modes of constraint failure
- The constraint might not be satisfied on this iteration, but could be satisfied by other
parameter values
- The constraint is impossible to satisfy because it is inconsistent (for example, if the
total spend is greater than the sum of the upper bounds on all the individual programs)
in which case the algorithm cannot continue
This error gets raised in the latter case, while the former should result in the iteration
being skipped
"""
pass
[docs]
class FailedConstraint(Exception):
"""
Not possible to apply constraint
This error gets raised if a ``Constraint`` is unable to transform the instructions
given the supplied parameter values (but other values may be acceptable). It signals
that the algorithm should proceed immediately to the next iteration.
"""
pass
[docs]
class Adjustable:
"""
Class to store single optimizable parameter
An ``Adjustable`` represents a single entry in the ASD matrix. An ``Adjustment`` uses one or more
``Adjustables`` to make changes to the program instructions.
:param name: The name of the adjustable
:param limit_type: If bounds are provided, are they relative or absolute (``'abs'`` or ``'rel'``)
:param lower_bound: Optionally specify minimum value
:param upper_bound: Optionally specify maximum value
:param initial_value: Optionally specify initial value. Most commonly, the initial value would be set by the ``Adjustment`` containing the ``Adjustable``
"""
def __init__(self, name, limit_type="abs", lower_bound=-np.inf, upper_bound=np.inf, initial_value=None):
self.name = name
self.limit_type = limit_type
self.lower_bound = lower_bound
self.upper_bound = upper_bound
self.initial_value = initial_value
[docs]
def get_hard_bounds(self, x0: float = None) -> tuple:
"""
Return hard bounds for the adjustable
The hard bounds could be relative or absolute. If they are relative, then they are treated
as being relative to the initial value for this adjustable. This is because not all adjustables
can be directly drawn from the program instructions (e.g. parametric overwrites) in which case
it would not be possible to extract initial values from the instructions alone. For consistency,
all adjustables behave the same way with constraints relative to their initialization.
:param x0: The reference value for relative constraints - only required if the limit type is not absolute
:return: A tuple with ``(min,max)`` limits
"""
self.lower_bound = self.lower_bound if self.lower_bound is not None else -np.inf
self.upper_bound = self.upper_bound if self.upper_bound is not None else np.inf
xmin = self.lower_bound if self.limit_type == "abs" else x0 * self.lower_bound
xmax = self.upper_bound if self.limit_type == "abs" else x0 * self.upper_bound
return xmin, xmax
[docs]
class Adjustment:
"""
Class to represent changes to instructions
An ``Adjustment`` represents a change to program instructions, governed by one or more ``Adjustables``.
This base class specifies the interface for all ``Adjustments``
:param name: The name of the adjustment
"""
def __init__(self, name):
self.name = name
self.adjustables = list() #: A list of ``Adjustables``
[docs]
def get_initialization(self, progset: ProgramSet, instructions: ProgramInstructions) -> list:
"""
Return initial values for ASD
:param progset: The ``ProgramSet`` being used for the optimization
:param instructions: The initial instructions
:return: A list of initial values, one for each adjustable
"""
return [x.initial_value for x in self.adjustables]
def update_instructions(self, adjustable_values, instructions: ProgramInstructions):
# adjustable_values contains the values for each adjustable in self.adjustables
# at the current ASD iteration. This function updates the provided instructions in place
return
[docs]
class SpendingAdjustment(Adjustment):
"""
Adjust program spending
This adjustment class represents making a spending quantity adjustable. By default, the
base class simply overwrites the spending value at a particular point in time
A SpendingAdjustment has a separate Adjustable for each time reached (independently)
:param prog_name: The code name of a program
:param t: A single time, or list/array of times at which to make adjustments
:param limit_type: Interpret ``lower`` and ``upper`` as absolute or relative limits (should be ``'abs'`` or ``'rel'``)
:param lower: Lower bound (0 by default). A single value (used for all times) or a list/array the same length as ``t``
:param upper: Upper bound (``np.inf`` by default). A single value (used for all times) or a list/array the same length as ``t``
:param initial: Optionally specify the initial value, either as a scalar or list/array the same length as ``t``. If not specified,
the initial spend will be drawn from the program instructions, or the progset.
"""
def __init__(self, prog_name, t, limit_type="abs", lower=0.0, upper=np.inf, initial=None):
Adjustment.__init__(self, name=prog_name)
self.prog_name = prog_name
self.t = sc.promotetoarray(t) # Time at which to apply the adjustment
lower = sc.promotetolist(lower, keepnone=True)
if len(lower) == 1:
lower = lower * len(self.t)
else:
assert len(lower) == len(self.t), "If supplying lower bounds, you must either specify one, or one for every time point"
upper = sc.promotetolist(upper, keepnone=True)
if len(upper) == 1:
upper = upper * len(self.t)
else:
assert len(upper) == len(self.t), "If supplying upper bounds, you must either specify one, or one for every time point"
initial = sc.promotetolist(initial, keepnone=True)
if len(initial) == 1:
initial = initial * len(self.t)
else:
assert len(initial) == len(self.t), "If supplying initial values, you must either specify one, or one for every time point"
self.adjustables = [Adjustable(prog_name, limit_type, lower_bound=lb, upper_bound=ub, initial_value=init) for lb, ub, init in zip(lower, upper, initial)]
def update_instructions(self, adjustable_values, instructions: ProgramInstructions):
# There is one Adjustable for each time point, so the adjustable_values
# are a list of this same length, one value for each time point
for i, t in enumerate(self.t):
if self.prog_name not in instructions.alloc:
instructions.alloc[self.prog_name] = TimeSeries(t=t, vals=adjustable_values[i])
else:
instructions.alloc[self.prog_name].insert(t, adjustable_values[i])
[docs]
def get_initialization(self, progset: ProgramSet, instructions: ProgramInstructions) -> list:
"""
Return initial values for ASD
The initial values correspond to either
- The explicitly specified initial spend
- The initial spend from the program set/instructions
Note that the initial spend is NOT clipped to any bounds. This is because the initial spend is in turn used to compute
relative spending constraints. If the initial spend is not consistent then an error will be subsequently raised
at that point.
:param progset: The ``ProgramSet`` being used for the optimization
:param instructions: The initial instructions
:return: A list of initial values, one for each adjustable
"""
initialization = []
for adjustable, t in zip(self.adjustables, self.t):
if adjustable.initial_value is not None:
initialization.append(adjustable.initial_value)
else:
alloc = progset.get_alloc(t, instructions)
initialization.append(alloc[self.prog_name][0]) # The Adjustable's name corresponds to the name of the program being overwritten.
return initialization
[docs]
class StartTimeAdjustment(Adjustment):
"""
Optimize program start year
This is an example of an Adjustment that does not target a spending value
"""
def __init__(self, name, lower, upper, initial):
Adjustment.__init__(self, name=name)
self.adjustables = [Adjustable("start_year", limit_type="abs", lower=lower, upper=upper, initial=initial)]
def update_instructions(self, adjustable_values, instructions: ProgramInstructions):
instructions.start_year = adjustable_values[0]
[docs]
def get_initialization(self, progset, instructions: ProgramInstructions):
if self.initial_value:
return self.initial_value
else:
return instructions.start_year
[docs]
class ExponentialSpendingAdjustment(Adjustment):
"""
Parametric overwrite example
This is an example of an Adjustment that uses a function of several variables to
compute time-dependent spending.
"""
# Example of a parametric time-varying budget where multiple parameters
# are mapped to spending via a function
def __init__(self, prog_name, t, t_0, t_end, p1, a1, a2):
Adjustment.__init__(self, name=prog_name)
self.prog_name = prog_name
self.t = t # Vector of time values instructions are updated at
self.t_0 = t_0 # non-adjustable parameter
self.t_end = t_end # non_adjustable parameter
self.adjustables = [Adjustable("p1", initial_value=p1), Adjustable("a1", initial_value=a1), Adjustable("a2", initial_value=a2)] # Would need to specify initial values and limits for these parameters
# Note - we can use get_initialization from the base class because all of the Adjustables
# have explicit initial values. Note that it's essential to provide explicit initial values
# for any Adjustable that does not explicitly appear in the instructions. Although in theory,
# it would be possible to write a `get_initialization` function that fits the Adjustables
# to the initial spending...
def update_instructions(self, adjustable_values, instructions: ProgramInstructions):
# p1*exp(a1*(t-t_0))*exp(b1*(t-t_end))
instructions.alloc[self.prog_name][self.t] = adjustable_values[0] * np.exp(adjustable_values[1] * (self.t - self.t_0)) * np.exp(adjustable_values[2] * (self.t - self.t_end))
[docs]
class SpendingPackageAdjustment(Adjustment):
"""
Adjustment to set total spending on several programs
This adjustment can be used when it is important to constrain the sum of spending on multiple programs. For example,
a package could be defined for 'MDR short' and 'MDR standard' standard programs, where the adjustments would be the
total spend on MDR, and the fraction spent on MDR short.
"""
def __init__(self, package_name: str, t: float, prog_names: list, initial_spends: np.array, min_props: list = None, max_props: list = None, min_total_spend: float = None, max_total_spend: float = None, fix_props: bool = False):
"""
By default, the total spend on the package is constrained. In practice, it would usual to either
- Fix the proportions of the programs within the package
OR
- Fix the total spend on the package
Although a less common usage would be to constrain the ratio of the programs within the
package (e.g., program B must have no more than half the funding of program A).
Essentially, if none of `min_props`, `max_props`, `min_total_spend` or `max_total_spend` are
specified, the SpendingPackageAdjustment should function the same as simply optimizing each
intervention independently.
:param package_name: The name of this spending package
:param t: Year in which to apply spending adjustments
:param prog_names: List of program names to include in this package
:param initial_spends: Array of spending values for each program, same length as `prog_names`
:param min_props: (optional) List of minimum proportion to spend on each program, same length as ``prog_names``. Default: 0 for all programs
:param max_props: (optional) List of maximum proportion to spend on each program, same length as ``prog_names``. Default: 1 for all programs
:param min_total_spend: (optional) Minimum total spend. Default: equal to the sum of the initial spends
:param max_total_spend: (optional) Maximum total spend. Default: equal to the sum of the initial spends
:param fix_props: (optional) If True, do not allow initial spending proportion to change
"""
self.name = package_name
self.prog_name = prog_names
self.t = t
self.min_props = np.zeros(len(prog_names)) if min_props is None else sc.promotetoarray(min_props)
self.max_props = np.ones(len(prog_names)) if max_props is None else sc.promotetoarray(max_props)
self.initial_spends = sc.promotetoarray(initial_spends)
initial_total = self.initial_spends.sum()
assert self.min_props.sum() <= 1, "Constraining fractions of an intervention package where the minimum total is >1 is impossible"
assert self.max_props.sum() >= 1, "Constraining fractions of an intervention package where the maximum total is <1 is impossible"
self.adjustables = []
if initial_total == 0.0:
initial_props = np.array([1.0 / len(initial_spends) for _ in initial_spends]) # if there was no spending on the package initially, assume equal proportions for all components
else:
initial_props = self.initial_spends / initial_total
if not fix_props:
for program, initial_prop, lb, ub in zip(prog_names, initial_props, self.min_props, self.max_props):
assert initial_prop >= lb, f"SpendingProgramAdjustment: Initial spend on {program} is a smaller proportion ({initial_prop}) of the package than requested minimum proportion ({lb})"
assert initial_prop <= ub, f"SpendingProgramAdjustment: Initial spend on {program} is a larger proportion ({initial_prop}) of the package than requested maximum proportion ({ub})"
# set to the current proportion - note that the overall fractions won't end up being constrained so will have to be rescaled
# Similarly, the upper and lower bounds here will still need to be enforced later after scaling when updating instructions
self.adjustables.append(Adjustable("frac_" + program, initial_value=initial_prop, lower_bound=lb, upper_bound=ub))
else:
# If the initial proportions are not to be changed, then the min_props and max_props arguments have no effect. Warn the
# user if they have been provided and are going to be ignored (but not with logger.warning since this might be expected)
if self.min_props is not None:
logger.info("Minimum proportions are being ignored because the `fix_props` flag has been set")
if self.max_props is not None:
logger.info("Maximum proportions are being ignored because the `fix_props` flag has been set")
if min_total_spend is not None:
assert initial_total >= min_total_spend, "SpendingProgramAdjustment: Initial total spend is less than minimum constraint"
else:
min_total_spend = initial_total
if max_total_spend is not None:
assert initial_total <= max_total_spend, "SpendingProgramAdjustment: Initial total spend is greater than maximum constraint"
else:
max_total_spend = initial_total
# total spending can be adjusted as a single adjustable
if not (min_total_spend == initial_total and min_total_spend == max_total_spend):
self.adjustables.append(Adjustable("package_spend", initial_value=initial_total, lower_bound=min_total_spend, upper_bound=max_total_spend))
@property
def adjust_total_spend(self) -> bool:
"""
Returns True if the total spend on this package can be optimized
:return:
"""
if self.adjustables:
return self.adjustables[-1].name == "package_spend"
else:
return False
@property
def fix_props(self) -> bool:
if len(self.adjustables) == 0 or self.adjustables[0].name == "package_spend":
return True
else:
return False
def update_instructions(self, adjustable_values, instructions):
adjustable_values = sc.promotetoarray(adjustable_values)
if self.fix_props:
if self.initial_spends.sum() == 0:
fracs = np.array([1.0 / len(self.initial_spends) for _ in self.initial_spends]) # evenly distributed fractions though it seems unlikely that there would be constrained fractions of zero spending
else:
fracs = self.initial_spends / self.initial_spends.sum()
elif self.adjust_total_spend:
fracs = adjustable_values[:-1]
else:
fracs = adjustable_values
# Scale the fractions such that they add to 1
fracs = constrain_sum_bounded(fracs, 1, self.min_props, self.max_props)
# now that the fractions sum to 1 and don't violate any constraints, update the spending on each program
if self.adjust_total_spend:
total_spending = adjustable_values[-1] # last on the list
else:
total_spending = self.initial_spends.sum()
for pn, prog_name in enumerate(self.prog_name):
this_prog_spend = fracs[pn] * total_spending
if prog_name not in instructions.alloc:
instructions.alloc[prog_name] = TimeSeries(t=self.t, vals=this_prog_spend)
else:
instructions.alloc[prog_name].insert(t=self.t, v=this_prog_spend)
def get_total_spend(self, instructions):
return sum([instructions.alloc[prog_name].get(self.t) for prog_name in self.prog_name])
def set_total_spend(self, instructions, total_spend):
if self.get_total_spend(instructions) > 0:
spend_factor = total_spend / self.get_total_spend(instructions)
else:
spend_factor = 0.0 # if total spending is zero, spending on each program must be zero?
for prog in self.prog_name:
ts = instructions.alloc[prog]
ts.insert(t=self.t, v=ts.get(self.t) * spend_factor)
[docs]
class PairedLinearSpendingAdjustment(Adjustment):
"""
Parametric overwrite with multiple programs
This example Adjustment demonstrates a parametric time-varying budget reaching more than one
program. A single adjustable corresponding to the rate of change simultaneously acts on
two programs in opposite directions
"""
def __init__(self, prog_names, t):
Adjustment.__init__(self, name=None)
assert len(prog_names) == 2, "PairedLinearSpendingAdjustment needs exactly two program names"
self.prog_name = prog_names # Store names in the attribute 'prog_name' so that TotalSpendConstraint picks this up
self.t = t # [t_start,t_stop] for when to start/stop ramping
self.adjustables = [Adjustable("ramp", initial_value=0.0)]
def update_instructions(self, adjustable_values, instructions: ProgramInstructions):
gradient = adjustable_values[0]
tspan = self.t[1] - self.t[0]
if gradient < 0: # We are transferring money from program 2 to program 1
available = -instructions.alloc[self.prog_name[1]].get(self.t[0])
max_gradient = float(available) / tspan
if gradient < max_gradient:
gradient = max_gradient
else:
available = instructions.alloc[self.prog_name[0]].get(self.t[0])
max_gradient = float(available) / tspan
if gradient > max_gradient:
gradient = max_gradient
funding_change = gradient * tspan # Amount transferred from program 1 to program 2
# This does not change the amount of funds allocated in the initial year
for prog in self.prog_name:
instructions.alloc[prog].insert(self.t[0], instructions.alloc[prog].interpolate(self.t[0])[0])
instructions.alloc[prog].remove_between(self.t)
instructions.alloc[self.prog_name[0]].insert(self.t[1], instructions.alloc[self.prog_name[0]].get(self.t[0]) - funding_change)
instructions.alloc[self.prog_name[1]].insert(self.t[1], instructions.alloc[self.prog_name[1]].get(self.t[0]) + funding_change)
[docs]
class Measurable:
"""
Optimization objective
A ``Measurable`` is a class that returns an objective value based on a simulated
``Model`` object. It takes in a ``Model`` and returns a scalar value. Often, an optimization
may contain multiple ``Measurable`` objects, and the objective value returned by each of them
is summed together.
:param measurable_name: The base measurable class accepts the name of a program (for spending) or a quantity supported by ``Population.get_variable()``
:param t: Single year, or a list of two start/stop years. If specifying a single year, that year must appear in the simulation output.
The quantity will be summed over all simulation time points
:param pop_names: The base `Measurable` class takes in the names of the populations to use. If multiple populations are provided, the objective will be added across the named populations
:param weight: The weight factor multiplies the quantity
"""
def __init__(self, measurable_name, t, pop_names=None, weight=1.0):
self.measurable_name = measurable_name
self.t = sc.promotetoarray(t)
assert len(self.t) <= 2, "Measurable time must either be a year, or the `[low,high)` values defining a period of time"
self.weight = weight
self.pop_names = pop_names
def eval(self, model, baseline):
# This is the main interface with the optimization code - this function gets called
# to return the transformed, weighted objective value from this Measurable for use in ASD
# Only overload this if you want to customize the transformation and weighting
return self.weight * self.get_objective_val(model, baseline)
[docs]
def get_baseline(self, model):
"""
Return cached baseline values
Similar to ``get_hard_constraint``, sometimes a relative ``Measurable`` might be desired e.g. 'Reduce deaths by at least 50%'.
In that case, we need to perform a procedure similar to getting a hard constraint, where the ``Measurable`` receives an initial
``Model`` object and extracts baseline data for subsequent use in ``get_objective_val``.
Thus, the output of this function is paired to its usage in ``get_objective_val``.
:param model:
:return: The value to pass back to the ``Measurable`` during optimization
"""
return None
[docs]
def get_objective_val(self, model: Model, baseline) -> float:
"""
Return objective value
This method should return the _unweighted_ objective value. Note that further transformation may occur
:param model: A ``Model`` object after integration
:param baseline: The baseline variable returned by this ``Measurable`` at the start of optimization
:return: A scalar objective value
"""
# This returns the base objective value, prior to any function transformation
# or weighting. The function transformation and weighting are handled by this base
# class - derived classes may wish to not expose things like the function mapping
# in their constructors, if that wouldn't be appropriate. But otherwise, they can
# inherit this behaviour for free. So derived classes should overload this method
#
# The base class has the default behaviour that the 'measurable name' is a model variable
if len(self.t) == 1:
t_filter = model.t == self.t # boolean vector for whether to use the time point or not. This could be relaxed using interpolation if needed, but safer not to unless essential
else:
t_filter = (model.t >= self.t[0]) & (model.t < self.t[1]) # Don't include upper bound, so [2018,2019] will include exactly one year
if self.measurable_name in model.progset.programs:
alloc = model.progset.get_alloc(model.t, model.program_instructions)
val = np.sum(alloc[self.measurable_name][t_filter])
else: # If the measurable is a model output...
val = 0.0
matched = False # Flag whether any variables were found
for pop in model.pops:
if not self.pop_names:
# If no pops were provided, then iterate over all pops but skip those where the measureable is not defined
# Use this approach rather than checking the pop type in the framework because user could be optimizating
# flow rates or transitions that don't appear in the framework
try:
vars = pop.get_variable(self.measurable_name)
matched = True
except NotFoundError:
continue
elif pop not in self.pop_names:
continue
else:
vars = pop.get_variable(self.measurable_name) # If variable is missing and the pop was explicitly defined, raise the error
matched = True
for var in vars:
if isinstance(var, Link):
val += np.sum(var.vals[t_filter] / var.dt) # Annualize link values - usually this won't make a difference, but it matters if the user mixes Links with something else in the objective
else:
val += np.sum(var.vals[t_filter])
if not matched:
# Raise an error if the measureable was not found in any populations
raise Exception('"%s" not found in any populations' % (self.measurable_name))
return val
[docs]
class MinimizeMeasurable(Measurable):
# Syntactic sugar for a measurable that minimizes its quantity
def __init__(self, measurable_name, t, pop_names=None):
Measurable.__init__(self, measurable_name, t=t, weight=1, pop_names=pop_names)
[docs]
class MaximizeMeasurable(Measurable):
# Syntactic sugar for a measurable that maximizes its quantity
def __init__(self, measurable_name, t, pop_names=None):
Measurable.__init__(self, measurable_name, t=t, weight=-1, pop_names=pop_names)
[docs]
class AtMostMeasurable(Measurable):
"""
Enforce quantity is below a value
This Measurable imposes a penalty if the quantity is larger than some threshold
The initial points should be 'valid' in the sense that the quantity starts out
below the threshold (and during optimization it will never be allowed to cross
the threshold).
Typically, this Measurable would be used in conjunction with other measurables -
for example, optimizing one quantity while ensuring another quantity does not
cross a threshold.
The measurable returns ``np.inf`` if the condition is violated, and ``0.0`` otherwise.
"""
def __init__(self, measurable_name, t, threshold, pop_names=None):
Measurable.__init__(self, measurable_name, t=t, weight=np.inf, pop_names=pop_names)
self.weight = 1.0
self.threshold = threshold
[docs]
def get_objective_val(self, model, baseline):
val = Measurable.get_objective_val(self, model, baseline)
return np.inf if val > self.threshold else 0.0
def __repr__(self):
return "AtMostMeasurable(%s < %f)" % (self.measurable_name, self.threshold)
[docs]
class AtLeastMeasurable(Measurable):
"""
Enforce quantity exceeds a value
This Measurable imposes a penalty if the quantity is smaller than some threshold
The initial points should be 'valid' in the sense that the quantity starts out
above the threshold (and during optimization it will never be allowed to cross
the threshold)
Typically, this Measurable would be used in money minimization in conjunction
with measurables that aim to minimize spending.
The measurable returns ``np.inf`` if the condition is violated, and ``0.0`` otherwise.
"""
def __init__(self, measurable_name, t, threshold, pop_names=None):
Measurable.__init__(self, measurable_name, t=t, weight=np.inf, pop_names=pop_names)
self.weight = 1.0
self.threshold = threshold
[docs]
def get_objective_val(self, model, baseline):
val = Measurable.get_objective_val(self, model, baseline)
return np.inf if val < self.threshold else 0.0
def __repr__(self):
return "AtLeastMeasurable(%s > %f)" % (self.measurable_name, self.threshold)
[docs]
class IncreaseByMeasurable(Measurable):
"""
Increase quantity by percentage
This Measurable stores the value of a quantity using the original instructions.
It then requires that there is a minimum increase in the value of the quantity
during optimization. For example
>>> IncreaseByMeasurable('alive',2030,0.05)
This Measurable would correspond to an increase of 5% in the number of people
alive in 2030.
The measurable returns ``np.inf`` if the condition is violated, and ``0.0`` otherwise.
:param measurable_name: The base measurable class accepts the name of a program (for spending) or a quantity supported by ``Population.get_variable()``
:param t: Single year, or a list of two start/stop years. If specifying a single year, that year must appear in the simulation output.
The quantity will be summed over all simulation time points
:param increase: The amount by which to increase the measurable (e.g. 0.05 for a 5% increase). Use ``target_type='abs'`` to specify an absolute increase
:param pop_names: The base `Measurable` class takes in the names of the populations to use. If multiple populations are provided, the objective will be added across the named populations
:param target_type: Specify fractional 'frac' or absolute 'abs' increase (default is fractional)
"""
def __init__(self, measurable_name, t, increase, pop_names=None, target_type="frac"):
Measurable.__init__(self, measurable_name, t=t, weight=np.inf, pop_names=pop_names)
assert increase >= 0, "Cannot set negative increase"
self.weight = 1.0
self.increase = increase # Required increase
self.target_type = target_type
[docs]
def get_baseline(self, model) -> float:
return Measurable.get_objective_val(self, model, None) # Get the baseline value using the underlying Measurable
[docs]
def get_objective_val(self, model: Model, baseline: float) -> float:
val = Measurable.get_objective_val(self, model, None)
if self.target_type == "frac":
return np.inf if (val / baseline) < (1 + self.increase) else 0.0
elif self.target_type == "abs":
return np.inf if val < (baseline + self.increase) else 0.0
else:
raise Exception("Unknown target type")
[docs]
class DecreaseByMeasurable(Measurable):
"""
Decrease quantity by percentage
This Measurable stores the value of a quantity using the original instructions.
It then requires that there is a minimum increase in the value of the quantity
during optimization. For example
>>> DecreaseByMeasurable('deaths',2030,0.05)
This Measurable would correspond to an decrease of 5% in the number of deaths in 2030.
The measurable returns ``np.inf`` if the condition is violated, and ``0.0`` otherwise.
:param measurable_name: The base measurable class accepts the name of a program (for spending) or a quantity supported by ``Population.get_variable()``
:param t: Single year, or a list of two start/stop years. If specifying a single year, that year must appear in the simulation output.
The quantity will be summed over all simulation time points
:param decrease: The amount by which to decrease the measurable (e.g. 0.05 for a 5% decrease). Use ``target_type='abs'`` to specify an absolute decrease
:param pop_names: The base `Measurable` class takes in the names of the populations to use. If multiple populations are provided, the objective will be added across the named populations
:param target_type: Specify fractional 'frac' or absolute 'abs' decrease (default is fractional)
"""
def __init__(self, measurable_name, t, decrease, pop_names=None, target_type="frac"):
Measurable.__init__(self, measurable_name, t=t, weight=np.inf, pop_names=pop_names)
self.weight = 1.0
assert decrease >= 0, "Set positive value for magnitude of decrease e.g. 0.05 for a 5%% decrease"
assert target_type == "abs" or decrease <= 1, "Cannot decrease by more than 100%% - fractional decrease should be a value between 0 and 1"
self.decrease = decrease
self.target_type = target_type
[docs]
def get_baseline(self, model) -> float:
return Measurable.get_objective_val(self, model, None) # Get the baseline value using the underlying Measurable
[docs]
def get_objective_val(self, model: Model, baseline: float) -> float:
val = Measurable.get_objective_val(self, model, None)
if self.target_type == "frac":
return np.inf if (val / baseline) > (1 - self.decrease) else 0.0
elif self.target_type == "abs":
return np.inf if val > (baseline - self.decrease) else 0.0
else:
raise Exception("Unknown target type")
[docs]
class MaximizeCascadeStage(Measurable):
# This Measurable will maximize the number of people in a cascade stage, whatever that stage is
# e.g. `measurable = MaximizeCascadeStage('main',2020)
# If multiple years are provided, they will be summed over
# Could add option to weight specific years later on, if desired
def __init__(self, cascade_name, t, pop_names="all", weight=1.0, cascade_stage=-1):
# Instantiate thpop_names can be a single pop name (including all), or a list of pop names
# aggregations are supported by setting pop_names to a dict e.g.
# pop_names = {'foo':['0-4','5-14']}
#
# The cascade name is passed to `get_cascade_vals` so any cascade can be passed in
# Usually this would be 'None' (fallback cascade, or first cascade) or the name of a
# cascade
#
# - t : Can be a single time or array of times (as supposed by get_cascade_vals)
# - cascade_stage specifies which stage(s) to include. It can be
# - An integer indexing the `cascade_vals` dict e.g. -1 is the last stage
# - The name of a cascade stage
# - A list of integers and/or names to include multiple stages
Measurable.__init__(self, cascade_name, t=t, weight=-weight, pop_names=pop_names)
if not isinstance(cascade_stage, list):
cascade_stage = [cascade_stage]
self.cascade_stage = cascade_stage
if not isinstance(self.pop_names, list):
self.pop_names = [self.pop_names]
[docs]
def get_objective_val(self, model, baseline):
result = Result(model=model)
val = 0
for pop_name in self.pop_names:
cascade_vals = get_cascade_vals(result, self.measurable_name, pop_name, self.t)
for stage in self.cascade_stage: # Loop over included stages
val += np.sum(cascade_vals[0][stage]) # Add the values from the stage, summed over time
return val
[docs]
class MaximizeCascadeConversionRate(Measurable):
"""
Maximize overall conversion rate
Maximize conversion summed over all cascade stages
:param cascade_name: The name of one of the cascades in the Framework
:param t: A single time value e.g. 2020
:param pop_names: A single pop name (including 'all'), a list of populations,
or a dict/list of dicts, each with a single aggregation e.g. ``{'foo':['0-4','5-14']}``
:param weight: Weighting factor for this Measurable in the overall objective function
"""
def __init__(self, cascade_name, t: float, pop_names="all", weight=1.0):
Measurable.__init__(self, cascade_name, t=t, weight=-weight, pop_names=pop_names)
if not isinstance(self.pop_names, list):
self.pop_names = [self.pop_names]
[docs]
def get_objective_val(self, model, baseline):
if self.t < model.t[0] or self.t > model.t[-1]:
raise Exception("Measurable year for optimization (%d) is outside the simulation range (%d-%d)" % (self.t, model.t[0], model.t[-1]))
result = Result(model=model)
val = 0
for pop_name in self.pop_names:
cascade_vals = get_cascade_vals(result, self.measurable_name, pop_name, self.t)[0]
cascade_array = np.hstack(cascade_vals.values())
conversion = cascade_array[1:] / cascade_array[0:-1]
val += np.sum(conversion)
return val
[docs]
class Constraint:
"""
Store conditions to satisfy during optimization
A Constraint represents a condition that must be satisfied by the Instructions
after the cumulative effect of all adjustments. The Instructions are rescaled to
satisfy the constraint directly (rather than changing the value of the Adjustables)
although this distinction really only matters in the context of parametric spending.
"""
[docs]
def get_hard_constraint(self, optimization, instructions: ProgramInstructions):
"""
Return hard constraint from initial instructions
Often constraints can be specified relative to the initial conditions. For example,
fixing total spend regardless of what the total spend is in the initial instructions.
Therefore, during ``constrain_instructions``, it is necessary to examine properties
from the initial instructions in order to perform the constraining.
This method is called at the very start of optimization, passing in the initial instructions.
It then returns an arbitrary value that is passed back to the instance's ``constrain_instructions``
during optimization. For example, consider the total spending constraint
- ``get_hard_constraint`` would extract the total spend from the initial instructions
- This value is passed to ``constrain_instructions`` where it is used to rescale spending
Because subclasses implement both ``get_hard_constraint`` and ``constrain_instructions``
no assumptions need to be made about the value returned by this method - it simply needs
to be paired to ``constrain_instructions``.
:param optimization: An ``Optimization``
:param instructions: A set of initial instructions to extract absolute constraints from
:return: Arbitrary variable that will be passed back during ``constrain_instructions``
"""
return
[docs]
def constrain_instructions(self, instructions: ProgramInstructions, hard_constraints, optimization) -> float:
"""
Apply constraint to instructions
Constrains the instructions, returns a metric penalizing the constraint
If there is no penalty associated with adjusting (perhaps if all of the Adjustments are
parametric?) then this would be 0.0. The penalty represents in some sense the quality
of the constraint. For example, the default ``TotalSpendConstraint`` rescales spending
such that the total spend matches a target value. The penalty reflects the distance between
the requested spend and the constrained spend, so it is desirable to minimize it.
If it is not possible to constrain the instructions, raise ``FailedConstraint``.
:param instructions: The ``ProgramInstructions`` instance to constrain (in place)
:param hard_constraints: The hard constraint returned by ``get_hard_constraint``
:param optimization: The parent optimization, in case it is needed
:return: A numeric penalty value. Return `np.inf` if constraint penalty could not be computed
:raises: :class:`FailedConstraint` if the instructions could not be constrained
"""
return 0.0
[docs]
class TotalSpendConstraint(Constraint):
"""
Fix total spending
This class implements a constraint on the total spend at every time point when a program
is optimizable. A program is considered optimizable if an Adjustment reaches that program
at the specified time. Spending is constrained independently at all times when any program
is adjustable.
The ``initial_total_spend`` argument allows the total spending in a particular year to be explicitly specified
rather than drawn from the initial allocation. This can be useful when using parametric programs where
the adjustables do not directly correspond to spending value. If the total spend is not provided, it will
automatically be computed from the first ASD step. Note that it is computed based on the initial instructions
*after* the initial ASD values have been applied. This is because relative constraints on all adjustables are
interpreted relative to the initial value of the adjustable, since not all adjustables map directly to values
in the instructions. Since the adjustable hard upper and lower bounds are used as part of the constraint, for
consistency, the total spend constraint itself is drawn from the same set of instructions (i.e. after the initial
value has been applied). In cases where this is not the desired behaviour, the cause would likely be that the
default value does not agree with a known desired total spend value. In that case, the desired total spend should
simply be specified here explicitly as an absolute value.
This constraint can also be set to only apply in certain years.
The ``budget_factor`` multiplies the total spend at the time the ``hard_constraint`` is assigned
Typically this is to scale up the available spending when that spending is being drawn from
the instructions/progset (otherwise the budget_factor could already be part of the specified total spend)
Note that if no times are specified, the budget factor should be a scalar but no explicit
spending values can be specified. This is because in the case where different programs are
optimized in different years, an explicit total spending constraint applying to all
times is unlikely to be a sensible choice (so we just ask the user to specify the time as well).
:param total_spend: A list of spending amounts the same size as ``t`` (can contain Nones), or ``None``.
For times in which the total spend is ``None``, it will be automatically set to the sum of
spending on optimizable programs in the corresponding year
:param t: A time, or list of times, at which to apply the total spending constraint. If None, it will automatically be set to all years in which spending adjustments are being made
:param budget_factor: The budget factor multiplies whatever the ``initial_total_spend`` is. This can either be a single value, or a year specific value
"""
def __init__(self, total_spend=None, t=None, budget_factor=1.0):
self.total_spend = sc.promotetoarray(total_spend) if total_spend is not None else ()
self.t = sc.promotetoarray(t) if t is not None else ()
self.budget_factor = sc.promotetoarray(budget_factor)
if t is None:
assert total_spend is None, "If no times are specified, no total spend values can be specified either"
assert len(self.budget_factor) == 1, "If no times are specified, the budget factor must be scalar"
if t is not None and total_spend is not None:
assert len(self.total_spend) == len(self.t), "If specifying both the times and values for the total spending constraint, their lengths must be the same"
if len(self.budget_factor) > 1:
assert len(self.budget_factor) == len(self.t), "If specifying multiple budget factors, you must also specify the years in which they are used"
[docs]
def get_hard_constraint(self, optimization, instructions: ProgramInstructions) -> dict:
"""
Return hard constraint dictionary
:param optimization: ``Optimization`` instance
:param instructions: Initial ``ProgramInstructions``
:return:
"""
# First, at each time point where a program overwrite exists, we need to store
# the names of all of the programs being overwritten
# e.g.
# hard_constraints['programs'][2020] = ['Program 1','Program 2']
# hard_constraints['programs'][2030] = ['Program 2','Program 3']
# By convention, an Adjustable affecting a program should store the program's name in the
# `prog_name` attribute. If a program reaches multiple programs, then `prog_name` will be a list
# First, we make a dictionary storing the years in which any adjustments are made
# as well as the upper and lower bounds in that year. This is only done for spending
# adjustments (i.e. those where `adjustment.prog_name` is defined)
_package_adjustments = set() # Check for duplicate names
hard_constraints = {}
hard_constraints["programs"] = defaultdict(set) # It's a set so that it will work properly if multiple Adjustments reach the same parameter at the same time. However, this would a bad idea and nobody should do this!
for adjustment in optimization.adjustments:
if hasattr(adjustment, "prog_name"):
for t in sc.promotetoarray(adjustment.t):
if isinstance(adjustment, SpendingPackageAdjustment):
if adjustment.adjust_total_spend and adjustment.t == t:
hard_constraints["programs"][t].add(adjustment.name)
if adjustment.name in _package_adjustments:
raise Exception("More than one `SpendingPackageAdjustment` in the optimization has the same name - not compatible with total spend constraint")
else:
_package_adjustments.add(adjustment.name)
elif isinstance(adjustment.prog_name, list):
hard_constraints["programs"][t].update(adjustment.prog_name)
else:
hard_constraints["programs"][t].add(adjustment.prog_name)
if len(self.t):
# Check that every explictly specified time has
# corresponding spending adjustments available for use
missing_times = set(self.t) - set(hard_constraints["programs"].keys())
if missing_times:
raise Exception("Total spending constraint was specified in %s but the optimization does not have any adjustments at those times" % (missing_times))
# Now we have a set of times and programs for which we need to get total spend, and
# also which programs should be included in the total for that year
#
# hard_constraints['initial_total_spend'][2020] = 300
# hard_constraints['initial_total_spend'][2030] = 400
hard_constraints["initial_total_spend"] = {}
for t, progs in hard_constraints["programs"].items():
# For every time at which programs are optimizable...
if len(self.t) and t not in self.t:
# If we are not wanting to constrain spending in this year, then continue
continue
elif len(self.t):
idx = np.where(self.t == t)[0][0] # This is the index for the constraint year
else:
idx = None
if not len(self.total_spend) or self.total_spend[idx] is None:
# Get the total spend from the allocation in this year
total_spend = 0.0
for prog in progs:
if prog in instructions.alloc:
total_spend += instructions.alloc[prog].get(t)
else:
# It's a SpendingPackageAdjustment name
adj = optimization.get_adjustment(prog)
if adj.adjust_total_spend:
total_spend += adj.initial_spends.sum()
else:
total_spend = self.total_spend[idx]
# Lastly, apply the budget factor
if len(self.budget_factor) == 1:
hard_constraints["initial_total_spend"][t] = total_spend * self.budget_factor
else:
hard_constraints["initial_total_spend"][t] = total_spend * self.budget_factor[idx]
# Finally, for each adjustable, we need to store its upper and lower bounds
# _in the year that the adjustment is being made_
#
# This is because in the case where we have an multiple adjustables targeting the same Program at different
# time points, the upper/lower bounds might be different. For example, suppose we have spending on
# Program 1 specified somehow as 2020=$20 and 2030=$30. Then suppose we add an Adjustment for Program 1 in
# 2020 *and* 2030 with relative bounds of [0.5,2.0]. Then, in 2020 spending on Program 1 is limited to
# $10-40 while in 2030 it's limited to $15-$60. Thus the spending constraint could also be time-varying.
# In the case of a parametric overwrite, there will be no direct spending bounds based on the Adjustable. Then,
# the spending should only be restricted to a positive value.
hard_constraints["bounds"] = dict()
for t, progs in hard_constraints["programs"].items(): # For each time point being constrained, and for each program
if t not in hard_constraints["initial_total_spend"]:
# If the time is not one where the total spending constraint is being applied, then
# just skip it
continue
hard_constraints["bounds"][t] = dict()
# If there is an Adjustable that reaches this Program in the appropriate year:
# Keep track of the absolute lower and upper bounds on spending permitted by the program constraints
minimum_spend = 0.0
maximum_spend = 0.0
for adjustment in optimization.adjustments:
if isinstance(adjustment, SpendingPackageAdjustment):
if adjustment.adjust_total_spend and adjustment.t == t:
hard_constraints["bounds"][t][adjustment.name] = (adjustment.adjustables[-1].lower_bound, adjustment.adjustables[-1].upper_bound)
minimum_spend += adjustment.adjustables[-1].lower_bound
maximum_spend += adjustment.adjustables[-1].upper_bound
elif hasattr(adjustment, "prog_name") and np.array([prog in progs for prog in sc.promotetolist(adjustment.prog_name)]).all() and t in adjustment.t:
if isinstance(adjustment, SpendingAdjustment):
idx = np.where(adjustment.t == t)[0][0] # If it is a SpendingAdjustment then set bounds from the appropriate Adjustable
adjustable = adjustment.adjustables[idx]
hard_constraints["bounds"][t][adjustment.prog_name] = adjustable.get_hard_bounds(instructions.alloc[adjustment.prog_name].get(t)) # The instructions should already have the initial spend on this program inserted. This may be inconsistent if multiple Adjustments reach the same program...!
else:
for prog in sc.promotetolist(adjustment.prog_name):
hard_constraints["bounds"][t][prog] = (0.0, np.inf) # If the Adjustment reaches spending but is not a SpendingAdjustment then do not constrain the alloc
for prog in sc.promotetolist(adjustment.prog_name):
minimum_spend += hard_constraints["bounds"][t][prog][0]
maximum_spend += hard_constraints["bounds"][t][prog][1]
if minimum_spend > hard_constraints["initial_total_spend"][t]:
raise UnresolvableConstraint("The total spend in %.2f is constrained to %.2f but the individual programs have a total minimum spend of %.2f which is impossible to satisfy. Please either raise the total spending, or lower the minimum spend on one or more programs" % (t, hard_constraints["initial_total_spend"][t], minimum_spend))
if maximum_spend < hard_constraints["initial_total_spend"][t]:
raise UnresolvableConstraint("The total spend in %.2f is constrained to %.2f but the individual programs have a total maximum spend of %.2f which is impossible to satisfy. Please either lower the total spending, or raise the maximum spend on one or more programs" % (t, hard_constraints["initial_total_spend"][t], maximum_spend))
return hard_constraints
[docs]
def constrain_instructions(self, instructions: ProgramInstructions, hard_constraints: dict, optimization) -> float:
"""
Apply total spend constraint
:param instructions: The ``ProgramInstructions`` instance to constrain
:param hard_constraints: Dictionary of hard constraints
:param optimization: The parent optimization, in case it's needed
:return: Distance-like difference between initial spending and constrained spending, `np.inf` if constraint failed
"""
penalty = 0.0
for t, total_spend in hard_constraints["initial_total_spend"].items():
total_spend = sc.promotetoarray(total_spend).ravel()[0] # Make sure total spend is a scalar
x0 = sc.odict() # Order matters here
lb = []
ub = []
progs = hard_constraints["programs"][t] # Programs eligible for constraining at this time
for prog in progs:
if prog not in instructions.alloc:
# The program name corresponds to the name of a SpendingPackageAdjustment
adj = optimization.get_adjustment(prog) # Find the adjustment. Note that we find the adjustment by name, therefore SpendingPackageAdjustment names must be unique
x0[prog] = adj.get_total_spend(instructions)
else:
x0[prog] = instructions.alloc[prog].get(t)
low, high = hard_constraints["bounds"][t][prog]
lb.append(low)
ub.append(high)
x0_array = np.array(x0.values()).ravel()
lb = np.array(lb)
ub = np.array(ub)
x1_array = constrain_sum_bounded(x0_array, total_spend, lb, ub) # scaled spending that optimally satisfies constraints
penalty += total_spend * np.linalg.norm(x1_array - x0_array) # Penalty is the distance between the unconstrained budget and the constrained budget
for name, val in zip(x0.keys(), x1_array):
if name in instructions.alloc:
instructions.alloc[name].insert(t, val)
else:
adj = optimization.get_adjustment(name)
adj.set_total_spend(instructions, val)
return penalty
[docs]
class Optimization(NamedItem):
"""
Instructions on how to perform an optimization
The Optimization object stores the information that defines an optimization operation.
Optimization can be thought of as a function mapping one set of program instructions
to another set of program instructions. The parameters of that function are stored in the
Optimization object, and amount to
- A definition of optimality
- A specification of allowed changes to the program instructions
- Any additional information required by a particular optimization algorithm e.g. ASD
:param name:
:param adjustments: An `Adjustment` or list of `Adjustment` objects
:param measurables: A `Measurable` or list of `Measurable` objects
:param constraints: Optionally provide a `Constraint` or list of `Constraint` objects
:param maxtime: Optionally specify maximum ASD time
:param maxiters: Optionally specify maximum number of ASD iterations or hyperopt evaluations
:param method: One of ['asd','pso','hyperopt'] to use
- asd (to use normal ASD)
- pso (to use particle swarm optimization from pyswarm)
- hyperopt (to use hyperopt's Bayesian optimization function)
"""
def __init__(self, name=None, adjustments=None, measurables=None, constraints=None, maxtime=None, maxiters=None, method="asd"):
# Get the name
if name is None:
name = "default"
NamedItem.__init__(self, name)
self.maxiters = maxiters #: Maximum number of ASD iterations or hyperopt evaluations
self.maxtime = maxtime #: Maximum ASD time
self.method = method #: Optimization method name
assert adjustments is not None, "Must specify some adjustments to carry out an optimization"
assert measurables is not None, "Must specify some measurables to carry out an optimization"
self.adjustments = sc.promotetolist(adjustments)
self.measurables = sc.promotetolist(measurables)
if constraints:
self.constraints = [constraints] if not isinstance(constraints, list) else constraints
else:
self.constraints = None
if adjustments is None or measurables is None:
raise Exception("Must supply either a json or an adjustments+measurables")
return
def __repr__(self):
return sc.prepr(self)
[docs]
def get_initialization(self, progset: ProgramSet, instructions: ProgramInstructions) -> tuple:
"""
Get initial values for each adjustment
The initial conditions depend nontrivially on both the progset and the instructions. Spending is
present in the progset and optionally overwritten in the instructions. Therefore, it is necessary
to check both when determining initial spending. Extraction of the initial values for each
``Adjustment`` is delegated to the ``Adjustment`` itself.
Note also that the return arrays have length equal to the number of ``Adjustables``
(since an ``Adjustment`` may contain several ``Adjustables``).
:param progset: The program set to extract initial conditions from
:param instructions: Instructions to extract initial conditions from
:return: Tuple containing ``(initial,low,high)`` with arrays for
- The initial value of each adjustable
- The lower limit for each adjustable
- The upper limit for each adjustable
"""
# Return arrays of lower and upper bounds for each adjustable
x0 = []
for adjustment in self.adjustments:
x0 += adjustment.get_initialization(progset, instructions)
x0 = np.array(x0)
xmin = np.zeros(x0.shape)
xmax = np.zeros(x0.shape)
ptr = 0
for adjustment in self.adjustments:
for adjustable in adjustment.adjustables:
bounds = adjustable.get_hard_bounds(x0[ptr])
xmin[ptr] = bounds[0]
xmax[ptr] = bounds[1]
if x0[ptr] > xmax[ptr]:
raise InvalidInitialConditions('Adjustment "%s" has an adjustable with initial value of %.2f but an upper bound of %.2f' % (adjustment.name, x0[ptr], xmax[ptr]))
elif x0[ptr] < xmin[ptr]:
raise InvalidInitialConditions('Adjustment "%s" has an adjustable with initial value of %.2f but a lower bound of %.2f' % (adjustment.name, x0[ptr], xmax[ptr]))
ptr += 1
return x0, xmin, xmax
[docs]
def update_instructions(self, asd_values, instructions: ProgramInstructions) -> None:
"""
Apply all Adjustments
This method takes in a list of values (same length as number of adjustables) and iteratively
calls each ``Adjustment`` in the optimization to update the instructions (in place)
:param asd_values: A list of values
:param instructions: The ``ProgramInstructions`` instance to update
"""
idx = 0
for adjustment in self.adjustments:
adjustment.update_instructions(asd_values[idx : idx + len(adjustment.adjustables)], instructions)
idx += len(adjustment.adjustables)
[docs]
def get_hard_constraints(self, x0, instructions: ProgramInstructions) -> list:
"""
Get hard constraints
This method calls ``get_hard_constraint`` on each ``Constraint`` in the ``Optimization``
iteratively, and returns them as a list.
Note that the initial optimization values ``x0`` are applied _before_ the hard constraint is computed.
This ensures that the hard constraints are relative to the initial conditions in the optimization, not
the initial instructions. For example, if a parametric overwrite is present, the hard constraint will
be relative to whatever spending is produced by the initial values of the parametric overwrite.
:param x0: The initial values for optimization - these are applied to the instructions prior to extracting hard constraints
:param instructions: The initial instructions
:return: A list of hard constraints, as many items as there are constraints
"""
# Return hard constraints based on the starting initialization
instructions = sc.dcp(instructions)
self.update_instructions(x0, instructions)
if not self.constraints:
return list()
else:
return [x.get_hard_constraint(self, instructions) for x in self.constraints]
[docs]
def get_baselines(self, pickled_model) -> list:
"""
Return Measurable baseline values
This method is run at the start of the `optimize` script, and is used to
retrieve the baseline values for the Measurable. Note that the baseline values are obtained
based on the original instructions (stored in the pickled model), independent of the initial
parameters used for optimization. The logic is that the initial parameters for the optimization
are a choice dictated by the numerics of optimization (e.g. needing to start from a particular
part of the parameter space) rather than anything intrinsic to the problem, whereas the
initial instructions reflect the actual baseline conditions.
:param pickled_model:
:param x0: The initial parameter values
:param hard_constraints: List of hard constraint values
:return: A list of Measurable baseline values
"""
model = pickle.loads(pickled_model)
model.process()
baselines = [m.get_baseline(model) for m in self.measurables]
return baselines
[docs]
def get_adjustment(self, name: str) -> Adjustment:
"""
Retrieve adjustment by name
:param name:
:return:
"""
for adjustment in self.adjustments:
if adjustment.name == name:
return adjustment
else:
sc.suggest(name, [x.name for x in self.adjustments], die=True)
raise NotFoundError(f'Adjustment "{name}" could not be found')
[docs]
def constrain_instructions(self, instructions: ProgramInstructions, hard_constraints: list) -> float:
"""
Apply all constraints in-place, return penalty
This method takes in the proposed instructions, and a list of hard constraints. Each constraint is
applied to the instructions iteratively, passing in that constraint's own hard constraint, and the
penalty is accumulated and returned.
:param instructions: The current proposed ``ProgramInstructions``
:param hard_constraints: A list of hard constraints the same length as the number of constraints
:return: The total penalty value (if not finite, model integration will be skipped and the parameters will be rejected)
"""
constraint_penalty = 0.0
if self.constraints:
for constraint, hard_constraint in zip(self.constraints, hard_constraints):
constraint_penalty += constraint.constrain_instructions(instructions, hard_constraint, self)
return constraint_penalty
[docs]
def compute_objective(self, model, baselines: list) -> float:
"""
Return total objective function
This method accumulates the objective values returned by each ``Measurable``, passing in
the corresponding baseline values where required.
:param model: A simulated ``Model`` object
:param baselines: List of baseline values the same length as the number of ``Measurables``
:return: The total/net objective value
"""
# TODO - This method just adds the objective from each Measurable
# It might be desirable in future to have more complex functions of the Measurable e.g.
# sqrt of sum of squares. It's not clear yet whether this would be better as a transformation
# applied here, or as a kind of meta-measurable. The former is probably simpler
# to implement. But since there is no immediate need, this can be implemented later
objective = 0.0
for measurable, baseline in zip(self.measurables, baselines):
objective += measurable.eval(model, baseline)
return objective
def _objective_fcn(x, pickled_model, optimization, hard_constraints: list, baselines: list):
"""
Return objective value
This wrapper function takes in a vector of proposed parameters and returns the objective value.
It is typically not called directly - instead, it is partialled to bind all arguments except ``x``
and then passed to whichever optimization algorithm is used to optimize ``x``
:param x: Vector of proposed parameter values
:param pickled_model: A pickled ``Model`` - should contain a set of instructions
:param optimization: An ``Optimization``
:param hard_constraints: A list of hard constraints (should be the same length as ``optimization.constraints``)
:param baselines: A list of measurable baselines (should be the same length as ``optimization.measurables``)
:return:
"""
try:
model = pickle.loads(pickled_model)
optimization.update_instructions(x, model.program_instructions)
optimization.constrain_instructions(model.program_instructions, hard_constraints)
model.process()
except FailedConstraint:
return np.inf # Return an objective of `np.inf` if the constraints could not be satisfied by ``x``
obj_val = optimization.compute_objective(model, baselines)
# TODO - use constraint penalty somehow
# The idea is to keep the optimization in a parameter regime where large corrections to the instructions
# are not required. However, using ths constraint penalty directly can lead to it dominating the objective,
# and with ASD's single-parameter stepping this prevents convergence. So needs some further work
#
# constaint_penalty = optimization.constrain_instructions(...)
# obj_val += 0.0 * constraint_penalty
return obj_val
[docs]
def optimize(project, optimization, parset: ParameterSet, progset: ProgramSet, instructions: ProgramInstructions, x0=None, xmin=None, xmax=None, hard_constraints=None, baselines=None, optim_args: dict = None):
"""
Main user entry point for optimization
The optional inputs `x0`, `xmin`, `xmax` and `hard_constraints` are used when
performing parallel optimization (implementation not complete yet), in which case
they are computed by the parallel wrapper to `optimize()`. Normally these variables
would not be specified by users, because they are computed from the `Optimization`
together with the instructions (because relative constraints in the Optimization are
interpreted as being relative to the allocation in the instructions).
:param project: A :class:`Project` instance
:param optimization: An :class:`Optimization` instance
:param parset: A :class:`ParameterSet` instance
:param progset: A :class:`ProgramSet` instance
:param instructions: A :class:`ProgramInstructions` instance
:param x0: Not for manual use - override initial values
:param xmin: Not for manual use - override lower bounds
:param xmax: Not for manual use - override upper bounds
:param hard_constraints: Not for manual use - override hard constraints
:param baselines: Not for manual use - override Measurable baseline values (for relative Measurables)
:param optim_args: Pass a dictionary of keyword arguments to pass to the optimization algorithm (set in ``optimization.method``)
:return: A :class:`ProgramInstructions` instance representing optimal instructions
"""
assert optimization.method in ["asd", "pso", "hyperopt"]
model = Model(project.settings, project.framework, parset, progset, instructions)
pickled_model = pickle.dumps(model) # Unpickling effectively makes a deep copy, so this _should_ be faster
initialization = optimization.get_initialization(progset, model.program_instructions)
x0 = x0 if x0 is not None else initialization[0]
xmin = xmin if xmin is not None else initialization[1]
xmax = xmax if xmax is not None else initialization[2]
if not hard_constraints:
hard_constraints = optimization.get_hard_constraints(x0, model.program_instructions) # The optimization passed in here knows how to calculate the hard constraints based on the program instructions
if not baselines:
baselines = optimization.get_baselines(pickled_model) # The optimization passed in here knows how to calculate the hard constraints based on the program instructions
# Prepare additional arguments for the objective function
args = {
"pickled_model": pickled_model,
"optimization": optimization,
"hard_constraints": hard_constraints,
"baselines": baselines,
}
# Check that the initial conditions are OK
# Note that this cannot be done by `optimization.get_baselines` because the baselines need to be computed against the
# initial instructions which might be different to the initial conditions (e.g. baseline spending vs the scaled-up
# initialization used when minimizing spending)
initial_objective = _objective_fcn(x0, **args)
if not np.isfinite(initial_objective):
raise InvalidInitialConditions("Optimization cannot begin because the objective function was %s for the specified initialization" % (initial_objective))
if optimization.method == "asd":
default_args = {
"maxiters": optimization.maxiters,
"maxtime": optimization.maxtime,
"xmin": xmin,
"xmax": xmax,
}
# Set ASD verbosity based on Atomica logging level
log_level = logger.getEffectiveLevel()
if log_level < logging.WARNING:
default_args["verbose"] = 2
else:
default_args["verbose"] = 0
optim_args = sc.mergedicts(default_args, optim_args)
opt_result = sc.asd(_objective_fcn, x0, args, **optim_args)
x_opt = opt_result["x"]
elif optimization.method == "pso":
import pyswarm
default_args = {"maxiter": 3, "lb": xmin, "ub": xmax, "minstep": 1e-3, "debug": True}
optim_args = sc.mergedicts(default_args, optim_args)
if np.any(~np.isfinite(xmin)) or np.any(~np.isfinite(xmax)):
errormsg = "PSO optimization requires finite upper and lower bounds to specify the search domain (i.e. every Adjustable needs to have finite bounds)"
raise Exception(errormsg)
x_opt, _ = pyswarm.pso(_objective_fcn, kwargs=args, **optim_args)
elif optimization.method == "hyperopt":
import hyperopt
import functools
if np.any(~np.isfinite(xmin)) or np.any(~np.isfinite(xmax)):
errormsg = "hyperopt optimization requires finite upper and lower bounds to specify the search domain (i.e. every Adjustable needs to have finite bounds)"
raise Exception(errormsg)
space = []
for i, (lower, upper) in enumerate(zip(xmin, xmax)):
space.append(hyperopt.hp.uniform(str(i), lower, upper))
fcn = functools.partial(_objective_fcn, **args) # Partial out the extra arguments to the objective
default_args = {"max_evals": optimization.maxiters if optimization.maxiters is not None else 100, "algo": hyperopt.tpe.suggest}
optim_args = sc.mergedicts(default_args, optim_args)
x_opt = hyperopt.fmin(fcn, space, **optim_args)
x_opt = np.array([x_opt[str(n)] for n in range(len(x_opt.keys()))])
elif callable(optimization.method):
# Use custom optimization function
# Placeholder functionality - not tested!
optim_args = {} if optim_args is None else optim_args
x_opt = optimization.method(_objective_fcn, **optim_args)
else:
raise Exception("Unrecognized optimization method")
# Use the optimal parameter values to generate new instructions
optimization.update_instructions(x_opt, model.program_instructions)
optimization.constrain_instructions(model.program_instructions, hard_constraints)
return model.program_instructions # Return the modified instructions
# Note that we do not return the value of the objective here because *in general* the objective isn't required
# or expected to have a meaningful interpretation because it may arbitrarily combine quantities (e.g. spending
# and epi outcomes) or is otherwise subject to the choice of weighting (e.g. impact vs equity). Therefore,
# it is recommended that an _absolute_ metric be computed from the returned optimized instructions rather than
# capturing the optimization objective. For example, in an optimization trading off equity against impact,
# the deaths averted could be computed using the optimized instructions and returned as an absolute measure of quality.
# def parallel_optimize(project,optimization,parset,progset,program_instructions):
#
# raise NotImplementedError
#
# initialization = optimization.get_initialization(progset, program_instructions)
# xmin = initialization[1]
# xmax = initialization[2]
#
# initial_instructions = program_instructions
# for i in range(0,n_rounds):
# for i in range(0,n_threads):
# optimized_instructions[i] = optimize(optimization, parset, progset, initial_instructions, x0=None, xmin=xmin, xmax=xmax)
# initial_instructions = pick_best(optimized_instructions)
# return initial_instructions # Best one from the last round
[docs]
def constrain_sum_bounded(x: np.array, s: float, lb: np.array, ub: np.array) -> np.array:
"""
Bounded nearest constraint sum
:param x: Array of proposed values to constrain
:param s: Target value for ``sum(x)``
:param lb: Array of lower bounds, same size as x
:param ub: Array of upper bounds, same size as x
:param tolerance: Absolute tolerance for the constrained sum ``s``
:return: Array same size as ``x``, such that ``sum(x)==s`` and ``x[i]>=lb`` and ``x[i]<=ub`` for ``i<len(x)``
:raises: :class:`FailedConstraint` if it was not possible to constrain
"""
tolerance = 1e-6
# Normalize values
x0_scaled = x / (x.sum() or 1) # Normalize the initial values, unless they sum to 0 (i.e., they are all zero)
lb_scaled = lb / s
ub_scaled = ub / s
# First, check if the constraint is already satisfied just by multiplicative rescaling
# The final check for x0_scaled.sum()==1 catches the case where all of the input values are 0
if np.all((x0_scaled >= lb_scaled) & (x0_scaled <= ub_scaled)) and np.isclose(x0_scaled.sum(), 1):
return x0_scaled * s
# If not, we need to actually run the constrained optimization
bounds = [(lower, upper) for lower, upper in zip(lb_scaled, ub_scaled)]
def jacfcn(x):
# Explicitly specify the Jacobian associated with changes in the allocation
dist = np.linalg.norm(x - x0_scaled)
if dist == 0:
return np.zeros(x.shape)
else:
return (x - x0_scaled) / dist
LinearConstraint = [{"type": "eq", "fun": lambda x: np.sum(x) - 1, "jac": lambda x: np.ones(x.shape)}]
res = scipy.optimize.minimize(lambda x: np.linalg.norm(x - x0_scaled), x0_scaled, jac=jacfcn, bounds=bounds, constraints=LinearConstraint, method="SLSQP", options={"ftol": 1e-5, "maxiter": 1000})
if not res["success"]:
logger.warning("constrain_sum_bounded() failed - rejecting proposed parameters")
raise FailedConstraint()
# Enforce upper/lower bound constraints to prevent numerically exceeding them
sol = np.minimum(np.maximum(res["x"], lb_scaled), ub_scaled) * s
assert np.isclose(sol.sum(), s), f"FAILED as {sol} has a total of {sol.sum()} which is not sufficiently close to the target value {s}"
return sol