Source code for atomica.calibration

"""
Implements automatic calibration

This module defines the :func:`calibrate` function, which is the entry-point for
automatic calibration

"""

import numpy as np
import sciris as sc
from .model import BadInitialization
from .system import logger
from .parameters import ParameterSet
import logging
import atomica

__all__ = ["calibrate"]

# TODO: Determine whether this is necessary.
calibration_settings = dict()
calibration_settings["tolerance"] = 1e-6


def _update_parset(parset, y_factors, pars_to_adjust) -> None:
    """

    :param parset: a ParameterSet object
    :param y_factors: Array with as many elements as pars_to_adjust
    :param pars_to_adjust: Array of tuples (par_name,pop_name,...) with special value pop='all' supported to set meta factor
                           Must have as many elements as y_factors. pop=None is not allowed - it must be converted
                           to a full list of pops previously (in perform_autofit)
    :return: None (parset is modified in-place)
    """

    for i, (par_name, pop_name, *_) in enumerate(pars_to_adjust):
        par = parset.get_par(par_name)
        if pop_name.lower() == "all":
            par.meta_y_factor = y_factors[i]
        else:
            par.y_factor[pop_name] = y_factors[i]


def _calculate_objective(y_factors, pars_to_adjust, output_quantities, parset, project, *args, **kwargs) -> float:
    """
    Run the model for a given set of y-factors and return the objective/goodness-of-fit

    Additional extra arguments will be ignored but will not raise an error.

    :param y_factors: array of y-factors to apply to specified output_quantities
    :param pars_to_adjust: list of tuples (par_name,pop_name,...) recognized by parset.update()
    :param output_quantities: a tuple containing (pop, var, weight, metric, start_year, end_year) - start year and end year are inclusive
    :param parset:
    :param project:
    :return: The value of the objective function defined by the output_quantities
    """

    _update_parset(parset, y_factors, pars_to_adjust)

    try:
        result = project.run_sim(parset=parset, store_results=False)
    except BadInitialization:  # If the proposed parameters lead to invalid initial compartment sizes
        return np.inf

    objective = 0.0

    for var_label, pop_name, weight, metric, start_year, end_year in output_quantities:

        target = project.data.get_ts(var_label, pop_name)  # This is the TimeSeries with the data for the requested quantity
        if target is None:
            continue
        if not target.has_time_data:  # Only use this output quantity if the user entered time-specific data
            continue

        if pop_name.lower() == "total":
            var = atomica.PlotData(result, outputs=var_label, pops="total", project=project)
        else:
            var = result.model.get_pop(pop_name).get_variable(var_label)

        data_t, data_v = target.get_arrays()
        inds = (data_t >= start_year) & (data_t <= end_year)
        if np.count_nonzero(inds) == 0:
            # If no time points remain after filtering down to the time points the user requested
            logger.info(f"No data points remaining after filtering down to requested time period. Skipping...")
            continue
        data_t = data_t[inds]
        data_v = data_v[inds]

        # Interpolate the model outputs onto the data times
        # If there is data outside the range when the model was simulated, don't
        # extrapolate the model outputs
        y = data_v
        if pop_name.lower() == "total":
            y2 = np.interp(data_t, var.series[0].tvec, var.series[0].vals, left=np.nan, right=np.nan)
        else:
            y2 = np.interp(data_t, var[0].t, var[0].vals, left=np.nan, right=np.nan)

        idx = ~np.isnan(y) & ~np.isnan(y2)
        objective += weight * sum(_calculate_fitscore(y[idx], y2[idx], metric))

    return objective


def _get_fitscore_func(metric):
    availfns = globals().copy()
    availfns.update(locals())
    try:
        return availfns.get("_calc_%s" % metric)
    except Exception:
        raise NotImplementedError("No method associated with _calc_%s (calibration.py)" % metric)


def _calculate_fitscore(y_obs, y_fit, metric="meansquare"):
    return _get_fitscore_func(metric)(y_obs, y_fit)


def _calc_meansquare(y_obs, y_fit):
    """
    Calcs the RMS error.

    Note: could also use implementation from sklearn in future ...
    """
    return np.sqrt(((y_fit - y_obs) ** 2).mean())


def _calc_fractional(y_obs, y_fit):
    return np.abs((y_fit - y_obs) / np.clip(y_obs, 1, None))  # Use clipping to handle cases where data has value 0


def _calc_wape(y_obs, y_fit):
    """
    Calculates the weighted absolute percentage error
    """
    return abs(y_fit - y_obs) / (y_obs.mean() + calibration_settings["tolerance"])


[docs] def calibrate(project, parset: ParameterSet, pars_to_adjust, output_quantities, max_time=None, method="asd", time_period=(-np.inf, np.inf), **kwargs) -> ParameterSet: """ Run automated calibration :param project: A project instance to provide data and sim settings :param parset: A :class:`ParameterSet` instance to calibrate :param pars_to_adjust: list of tuples, `(par_name, pop_name, lower_bound, upper_bound, initial_value)` the pop name can be None, which will be expanded to all populations relevant to the parameter independently, or 'all' which will instead operate on the meta y factor. To calibrate a transfer, the parameter name should be set to ``'<tranfer_code_name>_from_<from_pop>'`` and then the destination population can be specified as the ``pop_name``. For example, to automatically calibrate an aging transfer 'age' from 5-14 to 15-64, the tuple would contain ``pars_to_adjust=[('age_from_5-14','15-64',...)]`` :param output_quantities: list of tuples, (var_label,pop_name,weight,metric), for use in the objective function. pop_name=None will expand to all pops. pop_name='all' is not supported. The output can optionally contain `(var_label, pop_name, weight, metric, start_year, end_year)` to select a subset of the data for evaluating the objective. The start year and end year specified here will take precedence over the time_period argument. In some cases, it may be desirable to fit to an aggregated total value across populations. In that case, the databook should have an extra row in the TDVE table for a population called "Total". The measurable can then be given pop_name="Total" which will cause the Atomica model outputs to be aggregated over all populations, and the aggregate value compared to the "Total" data. The aggregation methods will be automatically selected depending on units of the quantity (sum for "number" units, average for others). :param max_time: If using ASD, the maximum run time :param time_period: Tuple of start and end years to use for the objective function. Applies to all outputs unless the output has an explicitly specified start and end year :param method: 'asd' or 'pso'. If using 'pso' all upper and lower limits must be finite :param kwargs: Dictionary of additional arguments to be passed to the optimization function, e.g. stepsize or pinitial :return: A calibrated :class:`ParameterSet` """ # Expand out pop=None in pars_to_adjust p2 = [] for par_tuple in pars_to_adjust: if len(par_tuple) == 5: initial_value = par_tuple[4] else: initial_value = None if par_tuple[1] is None: # If the pop name is None par = parset.get_par(par_tuple[0]) for pop_name in par.pops: p2.append((par_tuple[0], pop_name, par_tuple[2], par_tuple[3], initial_value)) else: p2.append((par_tuple[0], par_tuple[1], par_tuple[2], par_tuple[3], initial_value)) pars_to_adjust = p2 # Expand out pop=None in output_quantities outputs = [] for output_tuple in output_quantities: var_label = output_tuple[0] pop_name = output_tuple[1] weight = output_tuple[2] metric = output_tuple[3] if len(output_tuple) == 6: start_year = output_tuple[4] end_year = output_tuple[5] else: start_year = time_period[0] end_year = time_period[1] if pop_name is None: # If the pop name is None for pop in project.data.pops.keys(): outputs.append((var_label, pop, weight, metric, start_year, end_year)) else: outputs.append((var_label, pop_name, weight, metric, start_year, end_year)) output_quantities = outputs x0 = [] xmin = [] xmax = [] filtered_pars_to_adjust = [] parset = parset.copy() for i, x in enumerate(pars_to_adjust): par_name, pop_name, scale_min, scale_max, initial_value = x par = parset.get_par(par_name) # if initial_value has not been explicitly set in the tuple: use y_factor in parset if initial_value is None: if pop_name == "all": initial_value = par.meta_y_factor else: initial_value = par.y_factor[pop_name] # if this value is outside the min and max bounds, make it equal to min or max (whichever is closest) # if min == max, this should make the initial value equal to min and max initial_value = np.clip(initial_value, scale_min, scale_max) else: assert (initial_value >= scale_min) and (initial_value <= scale_max), "Initial value is not consistent with the lower/upper bounds" # update y_factors in parset if pop_name == "all": par.meta_y_factor = initial_value else: par.y_factor[pop_name] = initial_value if scale_min != scale_max: # Only include the y-factor in the adjustments made in the optimization function if a range # of y-factor values is permitted filtered_pars_to_adjust.append(x) x0.append(initial_value) xmin.append(scale_min) xmax.append(scale_max) args = { "project": project, "parset": parset, "pars_to_adjust": filtered_pars_to_adjust, "output_quantities": output_quantities, } original_sim_end = project.settings.sim_end project.settings.sim_end = min(project.data.tvec[-1], original_sim_end) if len(filtered_pars_to_adjust) > 0: try: if method == "asd": optim_args = { "stepsize": 0.1, "maxiters": 2000, "sinc": 1.5, "sdec": 2.0, "reltol": 1e-3, "abstol": 1e-6, "xmin": xmin, "xmax": xmax, "maxtime": 60 if max_time is None else max_time, "minval": 0, } optim_args.update(kwargs) if "verbose" not in optim_args: log_level = logger.getEffectiveLevel() if log_level < logging.WARNING: optim_args["verbose"] = 2 else: optim_args["verbose"] = 0 opt_result = sc.asd(_calculate_objective, x0, args, **optim_args) x1 = opt_result["x"] elif method == "pso": import pyswarm optim_args = {"maxiter": 3, "lb": xmin, "ub": xmax, "minstep": 1e-3, "debug": True} if np.any(~np.isfinite(xmin)) or np.any(~np.isfinite(xmax)): errormsg = "PSO optimization requires finite upper and lower bounds to specify the search domain (i.e. every parameter being adjusted needs to have finite bounds)" raise Exception(errormsg) x1, _ = pyswarm.pso(_calculate_objective, kwargs=args, **optim_args) else: raise Exception("Unrecognized method") except Exception as e: raise e finally: project.settings.sim_end = original_sim_end # Restore the simulation end year _update_parset(args["parset"], x1, args["pars_to_adjust"]) else: logger.info("No parameters to adjust provided to the optimisation function. Skipping optimisation...") # Log out the commands required for equivalent manual calibration if desired for i, x in enumerate(pars_to_adjust): par_name = x[0] pop_name = x[1] if par_name in parset.pars: par = args["parset"].pars[par_name] if pop_name.lower() == "all": logger.debug("parset.get_par('{}').meta_y_factor={:.2f}".format(par_name, par.meta_y_factor)) else: logger.debug("parset.get_par('{}').y_factor['{}']={:.2f}".format(par_name, pop_name, par.y_factor[pop_name])) else: tokens = par_name.split("_from_") par = args["parset"].transfers[tokens[0]][tokens[1]] logger.debug("parset.transfers['{}']['{}'].y_factor['{}']={:.2f}".format(tokens[0], tokens[1], pop_name, par.y_factor[pop_name])) args["parset"].name = "calibrated_" + args["parset"].name return args["parset"]