Source code for atomica.parameters

"""
Implements data-based model parameters (:class:`ParameterSet`)

A :class:`ParameterSet` (or 'parset') is an intermediate representation of
model parameters. The main role of the parset is to store the calibration
values that are used to scale model parameters. Therefore, every parameter
in the model appears in the parset, not just the parameters in the databook.

"""

import io
from collections import defaultdict

import atomica.excel
import numpy as np
import pandas as pd

import sciris as sc
from .utils import NamedItem, TimeSeries
from .system import logger
import itertools
import json
import hashlib
from .version import version, gitinfo

__all__ = ["Parameter", "ParameterSet"]


class Parameter(NamedItem):
    """
    Class to hold one set of parameter values disaggregated by populations

    :param name: The name of the parameter (should match the framework code name)
    :param ts: A dict where the key is the population name and the value is a TimeSeries instance
    """

    def __init__(self, name: str, ts: dict):
        NamedItem.__init__(self, name)
        self.ts = ts  #: Population-specific data is stored in TimeSeries within this dict, keyed by population name
        self.y_factor = sc.odict.fromkeys(self.ts, 1.0)  #: Calibration scale factors for the parameter in each population
        self.meta_y_factor = 1.0  #: Calibration scale factor applied across all populations
        self.skip_function = sc.odict.fromkeys(self.ts, None)  #: This can be a range of years [start, stop] between which the parameter function will not be evaluated
        self._interpolation_method = "linear"  #: Fallback interpolation method. It is _strongly_ recommended not to change this, but to call ``Parameter.smooth()`` instead

    @property
    def pops(self):
        """
        Get populations contained in the Parameter

        :return: Generator/list of available populations
        """

        return self.ts.keys()

    def has_values(self, pop_name: str) -> bool:
        """
        Check if any values are present

        Returns True if this Parameter has values specified for the given population.
        If the Parameter has an assumption, then the time value will be nan but a y-value
        will be present. If the Parameter normally has a function, then the y-value will
        be None. If a function Parameter has a scenario overwrite applied, then actual
        values will be present. Essentially, if this function returns True, then the
        ``interpolate()`` method will return usable values.

        :param pop_name: The code name of a population
        :return: ``True`` if any values are present for the specified population, otherwise ``False``
        """

        if pop_name not in self.ts:
            return False
        elif self.ts[pop_name].has_data:
            return True
        else:
            return False

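    # Illustrative usage sketch (not part of the module): ``P`` is an assumed Project
    # instance, with 'b_rate' an assumed parameter name and 'adults' an assumed population
    #
    # >>> par = P.parsets[0].pars['b_rate']
    # >>> par.has_values('adults')  # True if interpolate() would return usable values
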
    def interpolate(self, tvec, pop_name: str) -> np.array:
        """
        Return interpolated parameter values for a given population

        The Parameter internally stores the interpolation method. The default is linear,
        although it is possible to set it to 'pchip', 'previous', or some other method.
        However, this would also be applied to any parameter scenarios that have modified
        the parameter and require interpolation. Therefore, it is STRONGLY recommended not
        to modify the fallback interpolation method, but to instead call
        ``Parameter.smooth()`` in advance with the appropriate options, if the
        interpolation matters.

        :param tvec: A scalar, list, or array of time values
        :param pop_name: The population to interpolate data for
        :return: An array with the interpolated values
        """

        return self.ts[pop_name].interpolate(tvec, method=self._interpolation_method)

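    # Illustrative sketch, continuing the assumed names above: interpolate the sparse
    # databook values onto the full simulation time vector
    #
    # >>> vals = par.interpolate(P.settings.tvec, 'adults')
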
    def sample(self, constant: bool) -> None:
        """
        Perturb parameter based on uncertainties

        This function modifies the parameter in-place. It would normally be called via
        :meth:`ParameterSet.sample()`, which is responsible for copying this instance first.

        :param constant: If True, time series will be perturbed by a single constant offset.
                         If False, a different perturbation will be applied to each
                         time-specific value independently.
        """

        for k, ts in self.ts.items():
            self.ts[k] = ts.sample(constant)

    def smooth(self, tvec, method="smoothinterp", pop_names=None, **kwargs):
        """
        Smooth the parameter's time values

        Normally, Parameter instances contain temporally-sparse values from the databook.
        These are then interpolated to get the input parameter values at all model time
        points, with linear interpolation by default. However, sometimes it may be desired
        to make specific assumptions about the parameter value at intermediate times.
        These could be added directly to the Parameter's underlying ``TimeSeries``. This
        method applies a smoothing method to the Parameter, modifying the underlying
        TimeSeries in-place. The operation is

        - Interpolated or smoothed values are generated for the requested times
        - All existing time points between and including the minimum and maximum values of ``tvec`` are removed
        - The time values generated in this function are inserted

        For example, to apply pchip interpolation to generate intermediate values

        >>> Parameter.smooth(P.settings.tvec, method='pchip', **kwargs)

        As this goes through ``TimeSeries.interpolate``, the same rules apply for the
        conditions under which the interpolation function gets used - specifically,
        interpolation is used if there are at least two finite time values present;
        otherwise, the assumption or single value will be used.

        :param tvec: New time points to add to the TimeSeries
        :param method: Method for generating smoothed/interpolated values; the default uses sciris ``smoothinterp``
        :param pop_names: Optionally specify a list of populations to modify
        :param kwargs: Optionally pass arguments to the generating function/class constructor
        """

        pop_names = sc.promotetolist(pop_names)
        if not pop_names:
            pop_names = self.ts.keys()

        if sc.isstring(method):
            if method == "smoothinterp":
                # Generating function for the smoothinterp interpolator
                def smoothinterp(x1, y1, **kwargs):
                    def fcn(x2):
                        return sc.smoothinterp(x2, x1, y1, **kwargs)

                    return fcn

                method = smoothinterp
            elif method in ["pchip", "linear", "previous"]:
                pass
            else:
                raise Exception("Unknown smoothing method")

        for pop in pop_names:
            ts = self.ts[pop]
            v2 = ts.interpolate(tvec, method=method, **kwargs)
            ts.remove_between((min(tvec), max(tvec)))
            for t, v in zip(tvec, v2):
                ts.insert(t, v)


class Initialization:
    """
    This class stores initial compartment sizes

    In some cases it may be desirable to explicitly set initial compartment sizes rather
    than having them calculated from the databook values/characteristics. An example of
    this could be wanting to initialize the model using steady-state compartment sizes
    computed from a prior model run. This class facilitates storing and applying the
    initial compartment sizes, as well as capturing metadata for validation purposes.
    """

    def __init__(self, values=None, year=None, init_y_factor_hash=None, dt=None):
        """
        Construct an Initialization with explicit initial values

        This function can be used to explicitly set initial compartment sizes. More
        typically, the initial compartment sizes would be drawn from a previous model run.
        In that case, construct the ``Initialization`` using the
        ``Initialization.from_result()`` method, passing in the parset and result. For
        users, this is typically handled via ``ParameterSet.set_initialization(result)``,
        which allows easily passing a result into the ``ParameterSet`` to set the
        initialization in one step.

        :param values: A dictionary of compartment sizes. Keys should be ``(comp_name, pop_name)``
                       tuples and values should be either scalars (for normal compartments) or
                       arrays (for timed compartments). The size of the arrays for timed
                       compartments reflects both the duration of the timed compartment and the
                       simulation step size (noting that if timed compartments are being used,
                       the ``Initialization`` instance will not be reusable if the simulation
                       step size is subsequently changed, and will need to be re-created using
                       the new step size)
        :param year: Optionally specify the year used to generate the initialization (for provenance)
        :param init_y_factor_hash: Optionally specify the y-factor hash. If supplied, this will be
                                   checked against the parset when the initialization is applied
        :param dt: Optionally specify the timestep used to generate the initialization (for provenance)
        """

        self.year = year
        self.init_y_factor_hash = init_y_factor_hash
        self.dt = dt
        self.values = values

    @classmethod
    def from_result(cls, res, parset=None, year=None):
        """
        Construct an initialization based on a Result

        This method is used to create an ``Initialization`` instance when the initial
        compartment sizes are drawn from the state of a previously-run model. This
        facilitates initializing the model after numerically converging to a steady state,
        or after an initial transient has passed.

        :param res: An Atomica ``Result`` instance
        :param parset: Optionally specify a ``ParameterSet`` instance containing y-factor values.
                       If provided, subsequent use of the initialization will check whether the
                       y-factors have changed since the initialization was saved, and display a
                       warning if so.
        :param year: Optionally specify the year to draw compartment sizes from in the result.
                     If not provided, the last time point will be used. The year must exactly
                     match a year contained in the result.
        :return: A new ``Initialization`` instance
        """

        from atomica.model import TimedCompartment  # Avoid circular import

        if year is None:
            year = res.model.t[-1]
        elif year not in res.model.t:
            raise Exception(f"Year {year} was not present in the result")

        idx = np.nonzero(res.model.t == year)[0][0]

        values = dict()  #: Maps (comp_name, pop_name) to a scalar, or to an array of values for timed compartments
        for pop in res.model.pops:
            for comp in pop.comps:
                if isinstance(comp, TimedCompartment):
                    values[(comp.name, pop.name)] = comp._vals[:, idx]
                else:
                    values[(comp.name, pop.name)] = comp.vals[idx]

        self = cls(values)
        self.year = year  # Record the year from which the initialization was originally computed
        self.init_y_factor_hash = None if parset is None else self.hash_y_factors(res.model.framework, parset)  # Record a hash of the y-factors used for initialization
        self.dt = res.dt
        return self

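    # Illustrative sketch (assumed names): capture the final-year compartment sizes of a
    # completed run ``res`` that was generated from parset ``ps``
    #
    # >>> init = Initialization.from_result(res, parset=ps)
    # >>> ps.initialization = init  # Equivalent to ps.set_initialization(res)
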
    @staticmethod
    def hash_y_factors(framework, parset) -> str:
        """
        Hash y-factors used for initialization

        This method calculates a hash of the y-factors for the purpose of identifying
        whether they have changed since the Initialization was originally created.

        :param framework: An ``at.Framework`` instance containing setup weights for compartments/characteristics
        :param parset: An ``at.ParameterSet`` instance containing y-factors
        :return: A hash computed from the y-factors used for normal compartment initialization
        """

        init_quantities = {k for k, v in framework.comps["setup weight"].to_dict().items() if v > 0}
        init_quantities.update(k for k, v in framework.characs["setup weight"].to_dict().items() if v > 0)

        d = {}
        for quantity in init_quantities:
            d[quantity] = dict(parset.pars[quantity].y_factor)
            d[quantity]["_meta_y_factor"] = parset.pars[quantity].meta_y_factor  # This might fail if there actually was a population called '_meta_y_factor', but that seems unlikely

        return hashlib.sha256(json.dumps(d, sort_keys=True).encode("utf-8")).hexdigest()

    def apply(self, pop, framework=None, parset=None) -> None:
        """
        Insert saved values into compartments

        If the Initialization was generated based on a previous Result, it may contain a
        hash of the y-factors for the initialization compartments/characteristics that
        were in effect when the Initialization was generated. When applying the
        initialization, if a framework and parset are provided, the y-factors will be
        checked against the saved initialization y-factors, and a warning will be
        displayed if they differ.

        :param pop: An ``at.model.Population`` instance
        :param framework: Optionally specify a framework object containing the specification of which comps/characs are used for initialization
        :param parset: Optionally specify the requested parset for initialization, containing y-factors to compare to the saved y-factors
        """

        from atomica.model import TimedCompartment  # Avoid circular import

        # Check the y-factors
        if self.init_y_factor_hash is not None and framework is not None and parset is not None:
            init_y_factor_hash = self.hash_y_factors(framework, parset)
            if init_y_factor_hash != self.init_y_factor_hash:
                logger.warning(
                    "Y-factors used for initialization have changed since the saved initialization was generated. "
                    "These Y-factors will have no effect because a saved initialization is being used. "
                    "Remove the initialization by setting `Parset.initialization=None` to return to using the "
                    "Y-factors to adjust the initialization."
                )

        for comp in pop.comps:
            if isinstance(comp, TimedCompartment):
                if (comp.name, pop.name) not in self.values:
                    comp._vals[:, 0] = 0
                else:
                    comp._vals[:, 0] = self.values[(comp.name, pop.name)]
            else:
                if (comp.name, pop.name) not in self.values:
                    comp.vals[0] = 0
                else:
                    comp.vals[0] = self.values[(comp.name, pop.name)]

    def to_excel(self, writer):
        """
        Write the initialization to an 'Initialization' worksheet

        :param writer: A ``pd.ExcelWriter`` instance
        """

        max_len = max(len(v) if not np.isscalar(v) else 1 for v in self.values.values())
        d = {}
        for k, v in self.values.items():
            d[k] = np.full(max_len, fill_value=np.nan)
            if np.isscalar(v):
                d[k][0] = v
            else:
                d[k][: len(v)] = v
        values = pd.DataFrame(d).T

        metadata = {
            "year": self.year,
            "init_y_factor_hash": self.init_y_factor_hash,
            "dt": self.dt,
        }
        metadata = pd.DataFrame.from_dict(metadata, orient="index")

        metadata.to_excel(writer, sheet_name="Initialization", startcol=0, startrow=0, index=True, header=False)
        values.to_excel(writer, sheet_name="Initialization", startcol=0, startrow=len(metadata) + 1, index=True, header=False)

    def __repr__(self):
        return sc.prepr(self)

    @classmethod
    def from_excel(cls, excelfile: pd.ExcelFile):
        """
        Construct an initialization from a saved spreadsheet

        :param excelfile: A ``pd.ExcelFile`` containing an 'Initialization' sheet
        :return: A new ``Initialization`` instance
        """

        metadata, value_df = atomica.excel.read_dataframes(excelfile.book["Initialization"])

        values = {}
        for k, s in value_df.T.reset_index().T.set_index([0, 1]).iterrows():
            v = s.dropna().values
            values[k] = v[0] if len(v) == 1 else v

        self = cls(values)
        for k, v in metadata.T.reset_index().T.set_index(0)[1].to_dict().items():
            setattr(self, k, v)
        return self


class ParameterSet(NamedItem):
    """
    Collection of model parameters to run a simulation

    A ParameterSet contains a collection of Parameters required to run the simulation.
    The parameters contain scale factors used for calibration, so often a project will
    contain multiple ParameterSets corresponding to different calibrations.

    Although parameters are constructed from ProjectData, there are two key differences

    - The ParameterSet contains calibration scale factors
    - The ParameterSet expands transfers and interactions into per-population parameters
      so they are stored on an equal basis (whereas ProjectData segregates them in
      :class:`TimeDependentValuesEntry` and :class:`TimeDependentConnections` due to the
      difference in how they are formatted in the databook)

    :param framework: A :class:`ProjectFramework` instance
    :param data: A :class:`ProjectData` instance
    :param name: Optionally specify the name of the parset
    """

    def __init__(self, framework, data, name="default"):
        NamedItem.__init__(self, name)

        self.version = version  # Track versioning information
        self.gitinfo = gitinfo

        self.pop_names = data.pops.keys()  #: List of all population code names contained in the ``ParameterSet``
        self.pop_labels = [pop["label"] for pop in data.pops.values()]  #: List of corresponding full names for populations
        self.pop_types = [pop["type"] for pop in data.pops.values()]  #: List of corresponding population types
        self.pars = sc.odict()  #: Stores the Parameter instances associated with framework comps, characs, and parameters
        self.transfers = sc.odict()  #: Stores the Parameter instances associated with databook transfers, keyed by source population
        self.interactions = sc.odict()  #: Stores the Parameter instances associated with framework interactions, keyed by source population
        self.initialization = None  #: Optionally store an ``Initialization`` instance to explicitly set initial compartment sizes

        # Instantiate all quantities that appear in the databook (compartments, characteristics, parameters)
        for name, tdve in data.tdve.items():
            ts = sc.odict()
            units = framework.get_databook_units(name).strip().lower()

            # First check units for all quantities
            for k, v in tdve.ts.items():
                if units != v.units.strip().lower():
                    message = f'The units for quantity "{framework.get_label(name)}" in the databook do not match the units in the framework. Expecting "{units}" but the databook contained "{v.units.strip().lower()}"'
                    raise Exception(message)

            # Then populate the parameter time series, keeping only valid populations (any extra ones are discarded here)
            for k in self.pop_names:
                if k in tdve.ts:
                    ts[k] = tdve.ts[k].copy()
                elif "all" in tdve.ts:
                    ts[k] = tdve.ts["all"].copy()
                elif "All" in tdve.ts:
                    ts[k] = tdve.ts["All"].copy()
            self.pars[name] = Parameter(name, ts)

        # Instantiate compartments and characteristics that have a default value and do not appear in the databook.
        # Note that the default value will be used in all populations of the appropriate type.
        # For the moment, framework validation ensures the default value is 0 - to avoid the confusion of default
        # values being invisibly inserted. This requirement could be dropped in future if the use cases end up being unambiguous.
        for _, spec in itertools.chain(framework.comps.iterrows(), framework.characs.iterrows()):
            if pd.isna(spec["databook page"]) and not pd.isna(spec["default value"]):
                assert spec.name not in self.pars, f"Quantity '{spec.name}' is not marked in the framework as having a databook page and it has a default value, but it has been loaded into the ParameterSet as data. If the quantity needs to be populated from the databook, either provide a databook page or remove the default value"
                ts = dict()
                units = framework.get_databook_units(spec.name).strip().lower()
                for pop_name in self.pop_names:
                    if data.pops[pop_name]["type"] == spec["population type"]:
                        ts[pop_name] = TimeSeries(units=units, assumption=spec["default value"])
                self.pars[spec.name] = Parameter(spec.name, ts)

        # Instantiate parameters not in the databook
        for _, spec in framework.pars.iterrows():
            if spec.name not in self.pars:
                ts = dict()
                units = framework.get_databook_units(spec.name).strip().lower()
                for pop_name in self.pop_names:
                    if data.pops[pop_name]["type"] == spec["population type"]:
                        ts[pop_name] = TimeSeries(units=units)
                self.pars[spec.name] = Parameter(spec.name, ts)

        # Instantiate parameters for transfers and interactions.
        # As with TDVE quantities, these must be initialized from the databook.
        for tdc in data.transfers + data.interpops:
            if tdc.type == "transfer":
                item_storage = self.transfers
            elif tdc.type == "interaction":
                item_storage = self.interactions
            else:
                raise Exception("Unknown time-dependent connection type")

            name = tdc.code_name  # The name of this interaction e.g. 'age'
            item_storage[name] = sc.odict()

            ts_dict = defaultdict(dict)
            for pop_link, ts in tdc.ts.items():
                ts_dict[pop_link[0]][pop_link[1]] = ts.copy()

            for source_pop, ts in ts_dict.items():
                item_storage[name][source_pop] = Parameter(name + "_from_" + source_pop, sc.dcp(ts))

    def __setstate__(self, d):
        from .migration import migrate

        self.__dict__ = d
        parset = migrate(self)
        self.__dict__ = parset.__dict__

    def all_pars(self):
        """
        Return an iterator over all Parameters

        This is useful because transfer and interaction :class:`Parameter` instances are
        stored in nested dictionaries, so it's not trivial to iterate over all of them.

        :return: Generator over all :class:`Parameter` instances contained in the ``ParameterSet``
        """

        for par in self.pars.values():
            yield par
        for obj in self.transfers.values() + self.interactions.values():
            for par in obj.values():
                yield par

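    # Illustrative sketch: reset every calibration scale factor in the parset, including
    # those for transfers and interactions
    #
    # >>> for par in parset.all_pars():
    # ...     par.meta_y_factor = 1.0
    # ...     for pop in par.y_factor:
    # ...         par.y_factor[pop] = 1.0
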
    def get_par(self, name: str, pop: str = None) -> Parameter:
        """
        Retrieve parameter instance

        The parameter values for interactions and transfers are stored keyed by the
        source/from population. Thus, if the quantity name is an interaction or transfer,
        it is also necessary to specify the source population in order to return a
        :class:`Parameter` instance.

        :param name: The code name of a parameter, interaction, or transfer
        :param pop: For interactions and transfers, the code name of the source population
        :return: A :class:`Parameter` instance
        """

        if name in self.pars:
            assert pd.isna(pop), f'"{name}" is a parameter, so ``pop`` should not be specified'
            return self.pars[name]
        elif name in self.transfers:
            assert not pd.isna(pop), f'"{name}" is a transfer, so ``pop`` must be specified'
            return self.transfers[name][pop]
        elif name in self.interactions:
            assert not pd.isna(pop), f'"{name}" is an interaction, so ``pop`` must be specified'
            return self.interactions[name][pop]
        else:
            raise KeyError(f'Parameter "{name}" not found')

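    # Illustrative sketch (assumed names): ordinary parameters are retrieved by name alone,
    # whereas transfers and interactions also require the source population
    #
    # >>> par = parset.get_par('b_rate')
    # >>> aging = parset.get_par('age', pop='children')  # 'age' assumed to be a transfer
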
    def sample(self, constant=True):
        """
        Return a sampled copy of the ParameterSet

        :param constant: If True, time series will be perturbed by a single constant offset.
                         If False, a different perturbation will be applied to each
                         time-specific value independently.
        :return: A new :class:`ParameterSet` with perturbed values
        """

        new = sc.dcp(self)
        for par in new.all_pars():
            par.sample(constant)
        return new

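    # Illustrative sketch: draw several perturbed parsets for an uncertainty analysis
    # (``P.run_sim`` usage shown under the same assumed names as above)
    #
    # >>> samples = [parset.sample(constant=True) for _ in range(10)]
    # >>> results = [P.run_sim(parset=s) for s in samples]
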
    def make_constant(self, year: float):
        """
        Return a constant copy of the ParameterSet

        This function will return a copy of the ParameterSet in which all parameter time
        series have been replaced with temporally static versions.

        :param year: Year to use for interpolation
        :return: A copy of the ParameterSet with constant parameters
        """

        ps = self.copy(f"{self.name} (constant)")
        for par in ps.all_pars():
            for ts in par.ts.values():
                ts.insert(None, ts.interpolate(year))
                ts.t = []
                ts.vals = []
        return ps

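    # Illustrative sketch: freeze all parameter values at their 2020 levels before running
    # a counterfactual simulation
    #
    # >>> constant_ps = parset.make_constant(2020)
    # >>> res = P.run_sim(parset=constant_ps)
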
    # SAVE AND LOAD CALIBRATION (Y-FACTORS)

    @property
    def y_factors(self) -> dict:
        """
        Return y-factors in a dictionary

        Note that any missing populations reflect population types. For example, the
        dictionary for a parameter in one population type will not contain any entries
        for populations that belong to another type.

        :return: Dictionary keyed by ``(par, pop)`` containing a dict of y-factors
        """

        y_factors = {}
        for par_name, par in self.pars.items():
            y_factors[(par_name, None)] = sc.mergedicts({"meta_y_factor": par.meta_y_factor}, par.y_factor)
        for par_name, d in self.interactions.items() + self.transfers.items():
            for pop_name, par in d.items():
                y_factors[(par_name, pop_name)] = sc.mergedicts({"meta_y_factor": par.meta_y_factor}, par.y_factor)
        return y_factors

    def calibration_spreadsheet(self) -> sc.Spreadsheet:
        """
        Return y-factors in a spreadsheet

        Note that the tabular structure contains missing entries for any interactions that
        don't exist (e.g. due to missing population types) - these can be identified with
        ``pd.isna``.

        :return: Spreadsheet containing y-factors in tabular form
        """

        # Initialize the bytestream
        f = io.BytesIO()
        writer = pd.ExcelWriter(f, engine="xlsxwriter")

        df = pd.DataFrame(self.y_factors).T
        df.index.set_names(["par", "pop"], inplace=True)
        df.to_excel(writer, sheet_name="Y-factors", merge_cells=False)  # Write index if present

        if self.initialization:
            self.initialization.to_excel(writer)

        # Close the workbook
        writer.close()

        return sc.Spreadsheet(f)

    def set_initialization(self, res, year=None) -> None:
        """
        Set explicit initial compartment sizes from a Result

        See ``Initialization.from_result()`` for details.

        :param res: An Atomica ``Result`` instance to draw compartment sizes from
        :param year: Optionally specify the year to use (defaults to the final time point of the result)
        """

        self.initialization = Initialization.from_result(res, parset=self, year=year)

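    # Illustrative sketch: run to an (approximate) steady state, then store the end-of-run
    # compartment sizes so that subsequent simulations start from that state
    #
    # >>> res = P.run_sim(parset=parset)
    # >>> parset.set_initialization(res)  # Uses the final year of the run by default
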
    def apply_initialization(self, pop, framework=None) -> None:
        """
        Apply the stored initialization to a population

        :param pop: An ``at.Model.Population`` instance containing compartment objects that require initialization
        :param framework: A Framework containing a specification of which compartments/characteristics are
                          ordinarily used for initialization. If a framework is not provided, then the
                          y-factors will not be validated.
        """

        if self.initialization is None:
            raise Exception("Attempted to apply an explicit compartment initialization but the parset does not contain an initialization")
        self.initialization.apply(pop, framework, self)

    def save_calibration(self, fname) -> None:
        """
        Save calibration y-factors to file

        :param fname: ``str`` or ``Path`` specifying the location of the file to save
        """

        ss = self.calibration_spreadsheet()
        ss.save(fname)

    def load_calibration(self, spreadsheet) -> None:
        """
        Load calibration y-factors

        This function reads a spreadsheet created by ``ParameterSet.save_calibration()``
        and inserts the y-factors. It is permissive in that

        - If y-factors are present in the spreadsheet but not in the ``ParameterSet``, they will be skipped
        - If y-factors are missing from the spreadsheet, the existing values will be maintained

        :param spreadsheet: A ``sc.Spreadsheet`` instance, or a file name/path that one can be constructed from
        """

        if not isinstance(spreadsheet, sc.Spreadsheet):
            spreadsheet = sc.Spreadsheet(spreadsheet)

        excelfile = spreadsheet.pandas()
        df = pd.read_excel(excelfile, "Y-factors" if "Y-factors" in excelfile.sheet_names else 0)
        df.set_index(["par", "pop"], inplace=True)

        if df.index.duplicated().any():
            msg = "The calibration file contained duplicate entries:"
            for par, pop in df.index[df.index.duplicated()].unique():
                msg += f'\n\t- {par} ({"meta/all populations" if pd.isna(pop) else pop})'
            raise Exception(msg)

        for (par_name, pop_name), values in df.to_dict(orient="index").items():
            try:
                par = self.get_par(par_name, pop_name)
            except KeyError:
                if pop_name:
                    logger.debug(f"{par_name} in {pop_name} was not found, ignoring y-factors for this quantity")
                else:
                    logger.debug(f"{par_name} was not found, ignoring y-factors for this quantity")
                continue

            for k, v in values.items():
                if pd.isna(v):
                    continue
                if k == "meta_y_factor":
                    par.meta_y_factor = v
                elif k in par.y_factor:
                    par.y_factor[k] = v
                else:
                    logger.debug(f"The ParameterSet does not define a y-factor for {par.name} in the {k} population (e.g., because the population type does not match the parameter) - skipping")

        if "Initialization" in excelfile.sheet_names:
            self.initialization = Initialization.from_excel(excelfile)

        logger.debug("Loaded calibration from file")

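    # Illustrative sketch of a save/load round trip for calibrations (the file name and
    # the ``P.make_parset`` helper are assumptions)
    #
    # >>> parset.save_calibration('calibration.xlsx')
    # >>> new_parset = P.make_parset('recalibrated')
    # >>> new_parset.load_calibration('calibration.xlsx')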