Source code for atomica.model

"""
Implements the Atomica computational graph

Fundamentally, models in Atomica can be represented as a graph, with
nodes corresponding to compartments, and edges corresponding to transitions/links.
This module implements the graph representation of the Framework in a form that can
be numerically integrated. It also implements the methods to actually perform the integration.

"""

from .system import NotFoundError
from .system import logger
from .system import FrameworkSettings as FS
from .results import Result
from .function_parser import parse_function
from .version import version, gitinfo
from collections import defaultdict
import sciris as sc
import numpy as np
import matplotlib.pyplot as plt
from .programs import ProgramSet, ProgramInstructions
from .parameters import Parameter as ParsetParameter
from .parameters import ParameterSet as ParameterSet
import math
import pandas as pd

model_settings = dict()
model_settings["tolerance"] = 1e-6

__all__ = [
    "BadInitialization",
    "ModelError",
    "Variable",
    "Compartment",
    "JunctionCompartment",
    "ResidualJunctionCompartment",
    "SourceCompartment",
    "SinkCompartment",
    "TimedCompartment",
    "Characteristic",
    "Parameter",
    "Link",
    "TimedLink",
    "Population",
    "Model",
    "run_model",
]


class BadInitialization(Exception):
    """
    Error for invalid conditions

    This error gets raised if the simulation exited due to a bad initialization, specifically due to negative
    initial popsizes or an excessive residual. This commonly happens if the initial conditions are being
    programmatically varied, in which case this may be an expected error. The error can then be caught and
    dealt with appropriately. For example:

    - Calibration will catch this error and instruct ASD to reject the proposed parameters
    - ``Ensemble.run_sims`` catches this error and tries another sample

    """

    pass
class ModelError(Exception):
    """
    Exception type for general model errors

    This error gets raised for generic errors arising during integration
    """

    pass
[docs] class Variable: """ Integration object to manage compartments, characteristics, parameters, and links This is a lightweight abstract class to store arrays of values at each simulation time step. It includes functionality that is common to all integration objects, and defines the interface to be implemented by derived classes. :param pop: A :class:`Population` instance. This allows references back to the population containing an object (which facilitates a number of operations such as those that require the population's size) :param id: ID is a tuple that uniquely identifies the Variable within a model. By convention, this is a ``population:code_name`` tuple (but in the case of links, there are additional terms) """ def __init__(self, pop, id): self.id = id #: Unique identifier for the integration object self.t = None # : Array of time values. This should be a reference to the base array stored in a :class:`model` object self.dt = None # : Time step size if "vals" not in dir(self): # If a derived class implements a @property method for `vals`, then don't instantiate it as an attribute self.vals = None # : The fundamental values stored by this object self.units = "unknown" #: The units for the quantity, used for plotting and for validation. Note that the default ``'unknown'`` units are distinct to dimensionless units, which have value ``''`` self.pop = pop #: Reference back to the Population containing this object @property def name(self) -> str: """ Variable code name This is implemented as a property method because the ``id`` of the ``Variable`` is a tuple containing the population name and the variable code name, so this property method returns just the variable code name portion. That way, storage does not need to be duplicated. :return: A code name """ return self.id[-1]
[docs] def preallocate(self, tvec: np.array, dt: float) -> None: """ Preallocate data storage This method gets called just before integration, once the final sizes of the arrays are known. Performance is improved by preallocating the arrays. The ``tvec`` attribute is assigned as-is, so it is typically a reference to the array stored in the parent ``Model`` object. Thus, there is no duplication of storage there, and ``Variable.tvec`` is mainly for convenience when interpolating or plotting. This method may be overloaded in derived classes to preallocate other variables specific to those classes. :param tvec: An array of time values :param dt: Time step size """ self.t = tvec self.dt = dt self.vals = np.empty(tvec.shape) self.vals.fill(np.nan)
[docs] def plot(self) -> None: """ Produce a time series plot This is a quick function to make a basic line plot of this ``Variable``. Mainly intended for debugging. Production-ready plots should be generated using the plotting library functions instead """ plt.figure() plt.plot(self.t, self.vals, label=self.name) plt.legend() plt.xlabel("Year") plt.ylabel("%s (%s)" % (self.name, self.units))
[docs] def update(self, ti: int) -> None: """ Update the value at a given time index This method performs any required computations to update the value of the variable at a given time index. For example, Parameters may require their update function to be called, while characteristics need their source compartments to be added up. This method generally must be overloaded in derived classes e.g. :meth:`Parameter.update` :param ti: Time index to update """ return
[docs] def set_dynamic(self, **kwargs) -> None: """ Make the variable a dependency For Compartments and Links, this does nothing. For Characteristics and Parameters, it will set the dynamic flag, but in addition, any validation constraints e.g. a Parameter that depends on Links cannot itself be dynamic, will be enforced. This method generally must be overloaded in derived classes e.g. :meth:`Parameter.set_dynamic` """ return
def unlink(self): self.pop = self.pop.name def relink(self, objs): self.pop = objs[self.pop] def __repr__(self): return '%s "%s" %s' % (self.__class__.__name__, self.name, self.id) def __getitem__(self, key): """ Shortcut to access .vals attribute This simplifies syntax for the most common operations on Variable objects, but most importantly, enables derived classes that implement a ``@property`` method for ``vals`` to also overload the indexing. e.g. ``alive[ti]`` or ``b_rate[ti]`` :param key: Indexer passed directly to the numpy array ``self.vals[key]`` :return: A float or array of floats the same size as ``key`` """ return self.vals[key] def __setitem__(self, key, value) -> None: """ Shortcut to set .vals attribute This method assigns values to the `.vals` attribute e.g., ``sus[ti] = 0`` instead of ``sus.vals[ti] = 0``. Importantly, this allows the derived class to overload ``__getitem__`` if ``vals`` is a property method instead of an actual array. :param key: Indexer passed directly to the numpy array ``self.vals[item]`` :param value: Value to assign to the result of the indexing operation """ self.vals[key] = value
[docs] class Compartment(Variable): """A class to wrap up data for one compartment within a cascade network.""" def __init__(self, pop, name): Variable.__init__(self, pop=pop, id=(pop.name, name)) self.units = "Number of people" self.outlinks = [] self.inlinks = [] self._cached_outflow = None def unlink(self): Variable.unlink(self) self.outlinks = [x.id for x in self.outlinks] self.inlinks = [x.id for x in self.inlinks] def relink(self, objs): Variable.relink(self, objs) self.outlinks = [objs[x] for x in self.outlinks] self.inlinks = [objs[x] for x in self.inlinks] @property def outflow(self) -> np.array: """ Return the outflow at all times :return: The sum of outgoing links at all time indices """ # Return the outflow at each timestep - for a junction, this is equal to the number # of people that were in the junction x = np.zeros(self.t.shape) if self.outlinks: for link in self.outlinks: x += link.vals return x # Function below is a reference implementation but isn't used or needed anywhere yet # Keeping it here just in case it's useful later # # def inflow(self, ti:int=None): # """ # Return the inflow at specified times # # :param ti: Optionally specify a time index. Default is all times # :return: If `ti` is None, an array the same size as `self.t`. Otherwise, returns a float # # """ # # if ti is None: # ti = np.arange(0, len(self.t)) # x = np.zeros(self.t.shape) # else: # x = 0 # # for link in self.inlinks: # x += link[ti] # # return x def expected_duration(self, ti=None): # Returns the expected number of years that an individual is expected to remain # in this compartment for, if the outgoing flow rates are maintained if ti is None: ti = np.arange(0, len(self.t)) outflow_probability = 0 # Outflow probability at this timestep for link in self.outlinks: par = link.parameter transition = par[ti] if par.units == FS.QUANTITY_TYPE_DURATION: outflow_probability += min(1, self.dt / (transition * par.timescale)) elif par.units in {FS.QUANTITY_TYPE_PROBABILITY, FS.QUANTITY_TYPE_RATE}: outflow_probability += min(1, transition * (self.dt / par.timescale)) elif par.units == FS.QUANTITY_TYPE_NUMBER: outflow_probability += par[ti] * self.dt / self[ti] elif par.units == FS.QUANTITY_TYPE_PROPORTION: outflow_probability += par[ti] else: raise ModelError("Unknown parameter units") remain_probability = 1 - outflow_probability # This is the algorithm - we calculate the probability of leaving after a specific number of time steps, and then # take the expectation value. The example below shows a discrete numerical calculation as well as a closed-form # approximation which is what actually gets used # remain_probability = 0.99 # x = np.arange(0,1000) # y = (remain_probability**x)*(1-remain_probability) # numerical = np.sum((x+1) * y) * self.dt # x+1 because if we leave at the first timestep, we are considered to have stayed for dt units of time # analytic = (1 - (1. / np.log(remain_probability) ** 2) * (remain_probability - 1)) * self.dt # print('numerical=%.2f, analytic=%.2f' % (numerical,analytic)) dur = (1 - (1.0 / np.log(remain_probability) ** 2) * (remain_probability - 1)) * self.dt return dur def expected_outflow(self, ti): # After 1 year, where are people expected to be? 
        # If people would leave in less than a year, then the numbers correspond to when the compartment is empty
        popsize = self[ti]  # This is the number of people who are leaving
        outflow = {link.dest.name: popsize * link[ti] / self.dt for link in self.outlinks}
        rescale = popsize / sum([y for _, y in outflow.items()])
        for x in outflow.keys():
            outflow[x] *= rescale
        outflow[self.name] = popsize - sum([y for _, y in outflow.items()])
        return outflow
[docs] def resolve_outflows(self, ti: int) -> None: """ Resolve outgoing links and convert to number For the base class, rescale outgoing links so that the compartment won't go negative :param ti: Time index at which to update link outflows """ outflow = 0.0 for link in self.outlinks: outflow += link._cache if outflow > 1: rescale = 1 / outflow else: rescale = 1 n = rescale * self.vals[ti] self._cached_outflow = 0 for link in self.outlinks: link.vals[ti] = link._cache * n self._cached_outflow += link.vals[ti]
[docs] def update(self, ti: int) -> None: """ Update compartment value A compartment update passes in the time to assign (ti). We have x(ti) = x(ti-1) + inflow(ti-1) - outflow(ti-1) Thus, need to initialize the value and then resolve all inflows and outflows Junctions have already been flushed and thus do not need to be updated in this step :param ti: :return: """ tr = ti - 1 v = self.vals[tr] v -= self._cached_outflow for link in self.inlinks: v += link.vals[tr] # Guard against populations becoming negative due to numerical artifacts if v > 0: self.vals[ti] = v else: self.vals[ti] = 0.0
[docs] def connect(self, dest, par) -> None: """ Construct link out of this compartment :param dest: A ``Compartment`` instance :param par: The parameter that the Link will be associated with """ Link.create(pop=self.pop, parameter=par, source=self, dest=dest)
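# A minimal sketch (not part of Atomica) reproducing the worked example given in the comments of
# Compartment.expected_duration() above: the discrete expectation over timesteps should agree with
# the closed-form approximation that is actually used. The function name and defaults are illustrative only.
def _expected_duration_check(remain_probability: float = 0.99, dt: float = 1.0) -> None:
    import numpy as np

    x = np.arange(0, 10000)
    y = (remain_probability ** x) * (1 - remain_probability)  # P(leaving after exactly x+1 timesteps)
    numerical = np.sum((x + 1) * y) * dt  # +1 because leaving at the first timestep still counts as dt units of time
    analytic = (1 - (1.0 / np.log(remain_probability) ** 2) * (remain_probability - 1)) * dt
    print("numerical=%.2f, analytic=%.2f" % (numerical, analytic))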
[docs] class JunctionCompartment(Compartment): def __init__(self, pop, name: str, duration_group: str = None): """ A TimedCompartment has a duration group by virtue of having a `.parameter` attribute and a flush link. A junction might belong to a duration group by having inflows and outflows exclusively from that group. However, it might not be directly attached to any timed compartments, if the connections are entirely via upstream and downstream junctions. Junctions also do not have flush links. Therefore, we record membership in the duration group by specifying the name of the parameter in the (indirect) upstream and downstream compartments. Note that having connections to timed compartments does not itself indicate membership in the duration group - the junction is only a member if all of the upstream and downstream compartments belong to the same duration group. The duration group is determined in framework validation and stored in the framework. :param pop: :param name: :param duration_group: Optionally specify a duration group """ super().__init__(pop, name) self.duration_group = duration_group #: Store the name of the duration group, if the junction belongs to one
[docs] def connect(self, dest, par) -> None: """ Construct link out of this compartment For junctions, outgoing links are normal links unless the junction belongs to a duration group. If the junction belongs to a duration group, then the output links should all be :class:`TimedLink` instances. :param dest: A ``Compartment`` instance :param par: The parameter that the Link will be associated with """ if self.duration_group: if (isinstance(dest, TimedCompartment) and dest.duration_group != self.duration_group) or (isinstance(dest, JunctionCompartment) and dest.duration_group != self.duration_group): raise ModelError("Mismatched junction duration groups - the framework has not been validated correctly") TimedLink.create(pop=self.pop, parameter=par, source=self, dest=dest) else: Link.create(pop=self.pop, parameter=par, source=self, dest=dest)
[docs] def resolve_outflows(self, ti: int) -> None: """ Resolve outgoing links and convert to number For junctions, links are updated in a separate step because the logic is different - instead of the value of links coming from parameters, they come from the inflow into the junction. We cannot update the junctions in the same step as compartments because while the junction subgraphs must be acyclic, the entire system may contain cycles. Therefore it is not possible to determine a topological ordering for the entire system. The system is solvable because for normal compartments, only one transition per timestep is allowed. But regardless, this prevents using graph analysis to determine an execution order for the entire system that would allow junction computation in the same step. :param ti: Time index to resolve outflows """ pass
[docs] def update(self, ti: int) -> None: """ Update the junction at given time index For compartments the ``update`` methods steps them forward in time. However, for junctions the value never needs to be stepped forward, because it's always zero. Thus, this function does nothing. :param ti: Time index to update """ pass
[docs] def preallocate(self, tvec: np.array, dt: float) -> None: """ Preallocate junction values Junction preallocation pre-fills the values with zeros, since the junction must be empty at all times. :param tvec: An array of time values :param dt: Time step size """ super().preallocate(tvec, dt) self.vals.fill(0.0) #: Junction compartments are always empty (note the first entry may be given a nonzero value after compartment initialization and before the initial flush)
[docs] def balance(self, ti: int) -> None: """ Balance junction inflows and outflows This is the primary update for a junction - where outflows are adjusted so that they equal the inflows. It's analogous to `resolve_outflows` as a primary update method, but it takes place at a separate integration stage (once the outflows for normal compartments have been finalized, as these outputs serve as the junction inputs in that same timestep) After this method is called, all outflows should equal inflows The parameter workflow is simplified because all links flowing out of junctions must be in 'proportion' units, so they can be looked up and used directly. :param ti: Time index to update (scalar only, as this function is only called during integration) """ # First, work out the total inflow that needs to pass through the junction net_inflow = 0 if self.duration_group: for link in self.inlinks: net_inflow += link._vals[:, ti] # If part of a duration group, get the flow from TimedLink._vals else: for link in self.inlinks: net_inflow += link.vals[ti] # If not part of a duration group, get scalar flow from Link.vals # Next, get the total outflow. Note that the parameters are guaranteed to be in proportion units here outflow_fractions = [link.parameter.vals[ti] for link in self.outlinks] total_outflow = sum(outflow_fractions) # Finally, assign the inflow to the outflow proportionately accounting for the total outflow downscaling for frac, link in zip(outflow_fractions, self.outlinks): if self.duration_group: link._vals[:, ti] = net_inflow * frac / total_outflow else: link.vals[ti] = net_inflow * frac / total_outflow
[docs] def initial_flush(self) -> None: """ Perform an initial junction flush If the junction was initialized with a nonzero value, then the initial people need to be flushed. This is distinct from balancing, because the source of the people is the junction itself, rather than the incoming links. In this step, we need to actually update the value of the downstream compartments, rather than updating the link values - the links get rebalanced again based on the parameters after the initial flush has been completed """ if self.vals[0] > 0: # Work out the outflow fractions outflow_fractions = np.array([link.parameter.vals[0] for link in self.outlinks]) outflow_fractions /= np.sum(outflow_fractions) # Assign the inflow directly to the outflow compartments # This is done using the [] indexing on the downstream compartment so it is agnostic # to the downstream compartment type for frac, link in zip(outflow_fractions, self.outlinks): link.dest[0] += self.vals[0] * frac self.vals[0] = 0.0
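# A minimal sketch (not part of Atomica) of the proportional allocation performed by
# JunctionCompartment.balance() above: the junction's net inflow is split across its outgoing links in
# proportion to their 'proportion'-unit parameter values, normalised by the total. All values are hypothetical.
def _junction_balance_sketch() -> None:
    import numpy as np

    net_inflow = 120.0  # people arriving into the junction this timestep
    outflow_fractions = np.array([0.2, 0.3])  # parameter values on the two outgoing links
    total_outflow = outflow_fractions.sum()
    link_flows = net_inflow * outflow_fractions / total_outflow  # -> [48., 72.]
    assert np.isclose(link_flows.sum(), net_inflow)  # after balancing, outflows equal inflows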
class ResidualJunctionCompartment(JunctionCompartment):
    """
    Junction with a residual outflow

    A residual junction has a single additional outflow link (with no associated parameter) that
    receives any inflow not allocated to the other, parameter-driven outflows
    """
[docs] def balance(self, ti: int) -> None: """ Balance junction inflows and outflows For a ResidualJunctionCompartment, if the outflows sum to less than 1, rather than scaling them up proportionately, the residual is assigned to the residual link. :param ti: Time index to update (scalar only, as this function is only called during integration) """ # First, work out the total inflow that needs to pass through the junction net_inflow = np.array([0], dtype=float) if self.duration_group: for link in self.inlinks: net_inflow = net_inflow + link._vals[:, ti] # If part of a duration group, get the flow from TimedLink._vals. nb. using += doesn't work with some array size combinations else: for link in self.inlinks: net_inflow += link.vals[ti] # If not part of a duration group, get scalar flow from Link.vals outflow_fractions = np.zeros(len(self.outlinks)) for i, link in enumerate(self.outlinks): if link.parameter is not None: outflow_fractions[i] = link.parameter.vals[ti] else: outflow_fractions[i] = 0 total_outflow = outflow_fractions.sum() if total_outflow > 1: outflow_fractions /= total_outflow # Calculate outflows outflow = net_inflow.reshape(-1, 1) * outflow_fractions.reshape(1, -1) # Finally, assign the inflow to the outflow proportionately accounting for the total outflow downscaling for frac, link in zip(outflow_fractions, self.outlinks): if link.parameter is None and total_outflow < 1: flow = net_inflow - np.sum(outflow, axis=1) # Sum after multiplying by outflow fractions to reduce numerical precision errors and enforce conserved quantities more accurately else: flow = net_inflow * frac if self.duration_group: link._vals[:, ti] = flow else: link.vals[ti] = flow
[docs] def initial_flush(self) -> None: """ Perform an initial junction flush """ if self.vals[0] > 0: # Work out the outflow fractions outflow_fractions = np.zeros(len(self.outlinks)) for i, link in enumerate(self.outlinks): if link.parameter is not None: outflow_fractions[i] = link.parameter.vals[0] else: outflow_fractions[i] = 0 total_outflow = sum(outflow_fractions) if total_outflow < 1: has_residual = True else: outflow_fractions /= total_outflow has_residual = False # Assign the inflow directly to the outflow compartments # This is done using the [] indexing on the downstream compartment so it is agnostic # to the downstream compartment type # Finally, assign the inflow to the outflow proportionately accounting for the total outflow downscaling for frac, link in zip(outflow_fractions, self.outlinks): if link.parameter is None: if has_residual: link.dest[0] += self.vals[0] - sum(self.vals[0] * outflow_fractions) else: link.dest[0] += self.vals[0] * frac self.vals[0] = 0.0
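# A minimal sketch (not part of Atomica) of the residual logic in ResidualJunctionCompartment.balance()
# above: when the explicit outflow fractions sum to less than 1, the remainder is assigned to the residual
# link rather than scaling the explicit outflows up. Values are hypothetical.
def _residual_junction_sketch() -> None:
    import numpy as np

    net_inflow = 100.0
    outflow_fractions = np.array([0.25, 0.35])  # explicit (parameter-driven) links, summing to less than 1
    explicit_flows = net_inflow * outflow_fractions  # -> [25., 35.]
    residual_flow = net_inflow - explicit_flows.sum()  # -> 40., assigned to the residual link
    assert np.isclose(explicit_flows.sum() + residual_flow, net_inflow)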
class SourceCompartment(Compartment):
    """
    Derived class for source compartments

    Source compartments are unlimited reservoirs with the following special properties:

    - Unlimited size
    - No inflows
    - Only one outflow

    Therefore, the methods to update these compartments can take some shortcuts not available to normal
    compartments. These shortcuts are implemented in the overloaded methods here.

    """

    def __init__(self, pop, name):
        super().__init__(pop, name)
    def preallocate(self, tvec: np.array, dt: float) -> None:
        super().preallocate(tvec, dt)
        self.vals.fill(0.0)  #: Source compartments conceptually contain an unlimited number of people; the stored size is arbitrarily fixed at 0.0
[docs] def resolve_outflows(self, ti: int) -> None: for link in self.outlinks: link.vals[ti] = link._cache
[docs] def update(self, ti: int) -> None: pass
[docs] class SinkCompartment(Compartment): def __init__(self, pop, name): super().__init__(pop, name)
[docs] def preallocate(self, tvec: np.array, dt: float) -> None: """ Preallocate sink compartment A sink compartment is initialized with 0.0 value at the initial timestep :param tvec: Simulation time vector :param dt: Simulation time step """ super().preallocate(tvec, dt) self.vals[0] = 0.0
[docs] def resolve_outflows(self, ti: int) -> None: """ Resolve sink outflows There should not be any outflows, so this function immediately returns """ pass
[docs] def connect(self, dest, par) -> None: """ Construct link out of sink compartment This method raises an error because sinks cannot have outflows :param dest: A ``Compartment`` instance :param par: The parameter that the Link will be associated with """ raise ModelError("Sink compartments cannot have outflows")
[docs] def update(self, ti: int) -> None: """ Update sink compartment value A sink compartment has inputs only :param ti: Time index to update """ tr = ti - 1 v = self.vals[tr] for link in self.inlinks: v += link.vals[tr] self.vals[ti] = v
[docs] class TimedCompartment(Compartment): def __init__(self, pop, name: str, parameter: ParsetParameter): """ Instantiate the TimedCompartment Preallocation takes place before the compartment values are initialized. Therefore, we need to calculate the duration here, hence there is a need to take in the parset here. :param pop: A ``Population`` instance (the instance that will store this compartment) :param name: The name of the compartment :param parameter: The parameter that will supply the duration (to be read after the parameter function is evaluated, if applicable) """ Compartment.__init__(self, pop=pop, name=name) self._vals = None #: Primary storage, a matrix of size (duration x timesteps). The first row is the one that people leave from, the last row is the one that people arrive in self.parameter = parameter #: The parameter to read the duration from - this needs to be done after parameters are precomputed though, in case the duration is coming from a (constant) function self.flush_link = None #: Reference to the timed outflow link that flushes the compartment. Note that this needs to be set externally because Links are instantiated after Compartments @property def duration_group(self): return self.parameter.name def unlink(self): Compartment.unlink(self) if self.flush_link: self.flush_link = self.flush_link.id def relink(self, objs): # Given a dictionary of objects, restore the internal references Compartment.relink(self, objs) if self.flush_link: self.flush_link = objs[self.flush_link] @property def vals(self) -> np.array: """ Compartment size This returns the compartment size at all times, obtained by summing over the people in each time bin :return: A numpy array with the compartment size """ return self._vals.sum(axis=0) def __getitem__(self, ti): """ Retrieve compartment size at given time index By only adding the requested timesteps, this approach is faster than accessing ``vals`` first i.e. >>> self.vals[ti] >>> self[ti] # <-- Faster :param ti: Time index/indices :return: Total compartment size at given time index/indices """ return self._vals[:, ti].sum(axis=0) def __setitem__(self, ti, value) -> None: """ Assign to values assuming constant arrival rate This method takes in the total population size at time(s) ``ti``, and assigns them equally to all of the time bins in the compartment. Currently, this is only expected to be needed during initialization, so for maximum safety, this method will raise an error if any other times are used. This is because setting the value will destroy any information about the distribution of times. This constraint could be safely relaxed if it's needed for a specific purpose, but otherwise it's safer not to permit it. :param ti: Time indices to assign. Currently, only `0` is allowed :param value: The value to assign to times `ti` """ ti = sc.promotetoarray(ti) value = sc.promotetoarray(value) if ti.size > 1 or ti[0] != 0: raise ModelError("For safety, explicitly setting the compartment size for a TimedCompartment can currently only be done for the initial conditions. This requirement can be likely be relaxed if needed for a particular use case") self._vals[:, ti] = value.reshape((1, -1)) / (self._vals.shape[0] * np.ones((self._vals.shape[0], 1)))
[docs] def preallocate(self, tvec: np.array, dt: float) -> None: """ Preallocate data storage :param tvec: An array of time values :param dt: Time step size """ # Preallocating the keyring matrix rounds the duration down # That way, if the duration is less than the step size, the compartment will # be emptied every timestep # Note that the effective duration is _entirely_ driven by the number of rows in `self._vals` # because the logic is simply - flush the topmost row, shift the array by 1, inflow goes in the bottom row self.t = tvec self.dt = dt assert np.all(self.parameter.vals == self.parameter.vals[0]), "Duration parameter value cannot vary over time" duration = self.parameter.vals[0] * self.parameter.timescale * self.parameter.scale_factor self._vals = np.empty((max(1, math.ceil(duration / dt)), tvec.size), order="F") # Fortran/column-major order should be faster for summing over lags to get `vals` self._vals.fill(np.nan)
[docs] def resolve_outflows(self, ti: int) -> None: """ Resolve outgoing links At this point, all links going out of the compartment would have had their ``._cache`` value set to the fraction to transfer. This method converts them to numbers, taking into account the fact that only certain subcompartments are eligible for timed links within the same duration group :param ti: Time index at which to update link outflows """ # First, work out the scale factors as usual self.flush_link._cache = 0.0 # At this stage, no outflow goes via the flush link total_outflow = np.zeros(self._vals.shape[0]) for link in self.outlinks: if isinstance(link, TimedLink): total_outflow[1:] += link._cache # Timed link outflows do not act on the final subcompartment else: total_outflow[:] += link._cache # Normal link outflows do act on the final subcompartment # Rescaling factors for each subcompartment rescale = np.divide(1, total_outflow, out=np.ones_like(total_outflow), where=total_outflow > 1) n = rescale * self._vals[:, ti] # Rescaled number of people - to multiply by the cache value on each link self._cached_outflow = np.zeros(self._vals.shape[0]) # Cache the outflow, because for Links, we accumulate the subcompartment outflow but we need to record the subcompartment outflow separately for link in self.outlinks: if isinstance(link, TimedLink): link._vals[:, ti] = n * link._cache link._vals[0, ti] = 0.0 # No flow out of final subcompartment self._cached_outflow += link._vals[:, ti] else: link.vals[ti] = sum(n * link._cache) self._cached_outflow += n * link._cache self.flush_link.vals[ti] = max(0, self._vals[0, ti] - self._cached_outflow[0]) self._cached_outflow[0] += self.flush_link.vals[ti]
[docs] def update(self, ti: int) -> None: """ Update compartment value This compartment update goes from ``ti-1`` to ``ti``. As part of this, the counts for each duration are rolled over. :param ti: Time index to step to """ tr = ti - 1 # First, apply all of the outflows (computed by `resolve_outflows()` at the last timestep) self._vals[:, ti] = self._vals[:, tr] - self._cached_outflow # Then, add in TimedLink inputs (prior to advancing the keyring) for link in self.inlinks: if isinstance(link, TimedLink): if self._vals.shape[0] == link._vals.shape[0]: # The sizes match exactly, no need to index rows at all self._vals[:, ti] += link._vals[:, tr] elif self._vals.shape[0] > link._vals.shape[0]: # This compartment has a longer duration, so only insert the rows we've got self._vals[: link._vals.shape[0], ti] += link._vals[:, tr] else: # This compartment has a shorter duration, so first insert the values we've got # then sum up and add the extra values to the initial subcompartment self._vals[:, ti] += link._vals[: self._vals.shape[0], tr] self._vals[-1, ti] += sum(link._vals[self._vals.shape[0] :, tr].tolist()) # Advance the keyring # If this TimedCompartment has only one row, then anyone coming in via TimedLinks will be placed directly # in the final subcompartment (which is also the initial subcompartment). We are assuming that the cached # outflow correctly emptied everyone in the flush compartment so don't check that the final subcompartment # is empty because people could have been added to it in this timestep if self._vals.shape[0] > 1: self._vals[0:-1, ti] = self._vals[1:, ti] self._vals[-1, ti] = 0.0 # Zero out the inflow (otherwise, it just replicates previous value) # Now, resolve other inputs for which durations are not preserved # Regardless of whether they are TimedLinks or not, they should go into the initial subcompartment for link in self.inlinks: if not isinstance(link, TimedLink): self._vals[-1, ti] += link[tr] self._vals[self._vals[:, ti] < 0, ti] = 0
    def connect(self, dest, par) -> None:
        """
        Construct link out of this compartment

        For TimedCompartments, outgoing links are TimedLinks if the downstream compartment belongs to the same
        duration group. For this purpose, a junction can be part of a duration group if all of the downstream
        junction outputs are in this compartment's duration group.

        :param dest: A ``Compartment`` instance
        :param par: The parameter that the Link will be associated with

        """

        if (isinstance(dest, TimedCompartment) and dest.parameter.name == self.parameter.name) or (isinstance(dest, JunctionCompartment) and dest.duration_group == self.parameter.name):
            # Note that comparing the name rather than the instance ID allows duration-preserving transfers where a different instance of the same parameter is present in the dest population
            new_link = TimedLink.create(pop=self.pop, parameter=par, source=self, dest=dest)
        else:
            # Note that if the duration group doesn't match, then a Link rather than a TimedLink will be instantiated
            new_link = Link.create(pop=self.pop, parameter=par, source=self, dest=dest)

        if par is self.parameter:
            # If we are connecting up the timed parameter, also assign it to the flush link.
            # We also need to break the connection between the parameter and the link
            # so that the parameter doesn't try to update the link during `update_pars()`
            assert not isinstance(new_link, TimedLink), "Cannot flush into the same duration group"
            self.flush_link = new_link
            new_link.parameter = None
            par.links = []
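# A minimal sketch (not part of Atomica) of the 'keyring' storage used by TimedCompartment above: row 0
# holds the people due to be flushed, each timestep the array is shifted so everyone moves one
# subcompartment closer to flushing, and new arrivals enter the final row. Values are hypothetical.
def _keyring_sketch() -> None:
    import numpy as np

    bins = np.array([5.0, 8.0, 2.0])  # people with 0, 1, 2 timesteps remaining in the duration group
    flushed = bins[0]  # the 5 people in row 0 leave via the flush link
    bins[:-1] = bins[1:]  # advance the keyring by one timestep
    bins[-1] = 3.0  # new arrivals enter the last subcompartment
    assert np.allclose(bins, [8.0, 2.0, 3.0]) and flushed == 5.0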
[docs] class Characteristic(Variable): """A characteristic represents a grouping of compartments.""" def __init__(self, pop, name): # includes is a list of Compartments, whose values are summed # the denominator is another Characteristic that normalizes this one # All passed by reference so minimal performance impact Variable.__init__(self, pop=pop, id=(pop.name, name)) self.units = "Number of people" self.includes = [] self.denominator = None # The following flag indicates if another variable depends on this one. # This indicates a value needs computation during integration. self._is_dynamic = False self._vals = None
[docs] def preallocate(self, tvec: np.array, dt: float) -> None: """ Preallocate data storage :param tvec: An array of time values :param dt: Time step size """ self.t = tvec self.dt = dt if self._is_dynamic: self._vals = np.empty(tvec.shape) self._vals.fill(np.nan)
def get_included_comps(self): includes = [] for inc in self.includes: if isinstance(inc, Characteristic): includes += inc.get_included_comps() else: includes.append(inc) return includes @property def vals(self): if self._vals is None: vals = np.zeros(self.t.shape) for comp in self.includes: vals += comp.vals if self.denominator is not None: denom = self.denominator.vals vals_zero = vals < model_settings["tolerance"] vals[denom > 0] /= denom[denom > 0] vals[vals_zero] = 0.0 vals[(denom <= 0) & (~vals_zero)] = np.inf return vals else: return self._vals
[docs] def set_dynamic(self, **kwargs): self._is_dynamic = True for inc in self.includes: inc.set_dynamic() if self.denominator is not None: self.denominator.set_dynamic()
def unlink(self): Variable.unlink(self) self.includes = [x.id for x in self.includes] self.denominator = self.denominator.id if self.denominator is not None else None def relink(self, objs): # Given a dictionary of objects, restore the internal references Variable.relink(self, objs) self.includes = [objs[x] for x in self.includes] self.denominator = objs[self.denominator] if self.denominator is not None else None def add_include(self, x): assert isinstance(x, Compartment) or isinstance(x, Characteristic) self.includes.append(x) def add_denom(self, x): assert isinstance(x, Compartment) or isinstance(x, Characteristic) self.denominator = x self.units = ""
[docs] def update(self, ti): self._vals[ti] = 0 for comp in self.includes: self._vals[ti] += comp[ti] if self.denominator is not None: denom = self.denominator[ti] if denom > 0: self._vals[ti] /= denom elif self._vals[ti] < model_settings["tolerance"]: self._vals[ti] = 0 # Given a zero/zero case, make the answer zero. else: self._vals[ti] = np.inf # Given a non-zero/zero case, keep the answer infinite.
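# A minimal sketch (not part of Atomica) of the convention used by Characteristic when a denominator is
# present, mirroring Characteristic.update() above: 0/0 is reported as 0, while a nonzero numerator over a
# zero denominator is reported as inf. Arrays and the tolerance below are illustrative only.
def _characteristic_ratio_sketch() -> None:
    import numpy as np

    numerator = np.array([10.0, 0.0, 5.0])
    denominator = np.array([100.0, 0.0, 0.0])
    vals = numerator.copy()
    vals[denominator > 0] /= denominator[denominator > 0]
    vals[(denominator <= 0) & (numerator < 1e-6)] = 0.0  # zero/zero case -> 0
    vals[(denominator <= 0) & (numerator >= 1e-6)] = np.inf  # nonzero/zero case -> inf
    assert np.allclose(vals[:2], [0.1, 0.0]) and np.isinf(vals[2])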
[docs] class Parameter(Variable): """ Integration object to represent Parameters A parameter is a Variable that can have a value computed via an fcn_str and a list of dependent Variables. This is a Parameter in the cascade.xlsx sense - there is one Parameter object for every item in the Parameters sheet. A parameter that maps to multiple transitions (e.g. ``doth_rate``) will have one parameter and multiple Link instances that depend on the same Parameter instance :param pop: A :class:`Population` instance corresponding to the population that will contain this parameter :param name: The code name for this parameter """ def __init__(self, pop, name): Variable.__init__(self, pop=pop, id=(pop.name, name)) self.limits = None #: Can be a two element vector [min,max] self.pop_aggregation = None #: If not None, stores list of population aggregation information (special function, which weighting comp/charac and which interaction term to use) self.scale_factor = 1.0 #: This should be set to the product of the population-specific ``y_factor`` and the ``meta_y_factor`` from the ParameterSet self.links = [] #: References to links that derive from this parameter self._source_popsize_cache_time = None # : Internal cache for the time at which the source popsize was previously computed self._source_popsize_cache_val = None # : Internal cache for the last previously computed source popsize self.fcn_str = None #: String representation of parameter function self.deps = dict() #: Dict of dependencies containing lists of integration objects self._fcn = None #: Internal cache for parsed parameter function (this will be dropped when pickled) self._precompute = False #: If True, the parameter function will be computed in a vector operation prior to integration self._is_dynamic = False #: If True, this parameter has values that need to be updated or assigned during integration. Note that `precompute` and `dynamic` are mutually exclusive self.derivative = False #: If True, the parameter function will be treated as a derivative and the value added on to the end self._dx = None #: Internal cache for the value of the derivative at the current timestep self.skip_function = None #: Can optionally be set to a (start,stop) tuple of times. Between these times, the parameter will not be updated so the parset value will be left unchanged #: For transition parameters, the ``vals`` stored by the parameter is effectively a rate. The ``timescale`` #: attribute informs the time period corresponding to the units in which the rate has been provided. #: If the units are ``number`` or ``probability`` then the ``timescale`` corresponds to the denominator of the units #: e.g. probability/day (with ``timescale=1/365``). #: If the units are ``duration`` then the timescale stores the units in which the duration has been specified #: e.g. ``duration=1`` with ``timescale=1/52`` is the same as ``duration=7`` with ``timescale=1/365`` #: The effective duration used in the simulation is ``duration*timescale`` with units of years (so it will behave correctly if #: one wanted to use 1/365.25 instead of 1/365) self.timescale = 1.0
[docs] def set_fcn(self, fcn_str) -> None: """ Add a function to this parameter This method adds a function to the parameter. The following steps are carried out - The function is parsed and stored in the ``._fcn`` attribute (until the Parameter is pickled) - The dependencies are extracted, and are stored as references to the actual integration objects to increase performance during integration - If this is a transition parameter, then dependencies will have their `dynamic` flag updated :param framework: A py:class:`ProjectFramework` instance, used to identify and retrieve interaction terms :param fcn_str: The string containing the function to add """ assert sc.isstring(fcn_str), "Parameter function must be supplied as a string" self.fcn_str = fcn_str self._fcn, dep_list = parse_function(self.fcn_str) if fcn_str.startswith("SRC_POP_AVG") or fcn_str.startswith("TGT_POP_AVG") or fcn_str.startswith("SRC_POP_SUM") or fcn_str.startswith("TGT_POP_SUM"): # The function is like 'SRC_POP_AVG(par_name,interaction_name,charac_name)' # self.pop_aggregation will be ['SRC_POP_AVG',par_name,interaction_name,charac_object] special_function, temp_list = self.fcn_str.split("(") function_args = temp_list.rstrip(")").split(",") function_args = [x.strip() for x in function_args] self.pop_aggregation = [special_function] + function_args # Aggregation dependencies are set externally because they may cross populations - see `Model.build()` # Note that aggregations are computed externally rather than via `Parameter.update` else: for dep_name in dep_list: if not (dep_name in ["t", "dt"]): # There are no integration variables associated with the interactions, as they are treated as a special matrix self.deps[dep_name] = self.pop.get_variable(dep_name) # nb. this lookup will fail if the user has a function that depends on a quantity outside this population
[docs] def set_dynamic(self, progset=None) -> None: """ Mark this parameter as needing evaluation for integration If a parameter has a function and `set_dynamic()` has been called, that means the result of the function is required to drive a transition in the model. In this context, 'dynamic' means that the parameter depends on values that are only known during integration (i.e. compartment sizes). It could also depend on compartment sizes indirectly, if it depends on characteristics, or if it depends on parameters that depend on compartments or characteristics. If none of these are true, then the parameter function can be evaluated before integration starts, taking advantage of vector operations. If a parameter is overwritten by a program in this simulation, then its value may change during integration, and we again need to make this parameter dynamic. Note that this doesn't apply to the parameter *being* overwritten - if it gets overwritten, that has no bearing on when its function gets evaluated. But if a parameter depends on a value that has changed, then the function must be re-evaluated. Technically this only needs to be done after the time index where programs turn on, but it's simpler just to mark it as dynamic for the entire simulation, the gains aren't large enough to justify the extra complexity otherwise. :param progset: A ``ProgramSet`` instance (the one contained in the Model containing this Parameter) """ if self.fcn_str is None or self._is_dynamic or self._precompute: # Only parameters with functions need to be made dynamic # If the parameter is marked dynamic or precompute, that means this parameter and any of its # dependencies have already been checked return if self.pop_aggregation or self.derivative: # Pop aggregations and derivatives are handled in `update_pars()` so we set the dynamic flag # because the values are modified during integration self._is_dynamic = True if self.deps: # If there are no dependencies, then we know that this is precompute-only for deps in self.deps.values(): # deps is {'dep_name':[dep_objects]} for dep in deps: if isinstance(dep, Link): raise ModelError(f"Parameter '{self.name}' depends on transition flow '{dep.name}' thus it cannot be a dependency, it must be output only.") elif isinstance(dep, Compartment) or isinstance(dep, Characteristic): dep.set_dynamic(progset=progset) self._is_dynamic = True elif isinstance(dep, Parameter): dep.set_dynamic(progset=progset) # Run `set_dynamic()` on the parameter which will descend further to see if the Parameter depends on comps/characs or on overwritten parameters if dep._is_dynamic or (progset and dep.name in progset.pars): self._is_dynamic = True else: raise ModelError("Unexpected dependency type") # If not dynamic, then we need to precompute the function because the value is required for a transition if not self._is_dynamic: self._precompute = True
def unlink(self): Variable.unlink(self) self.links = [x.id for x in self.links] if self.deps is not None: for dep_name in self.deps: self.deps[dep_name] = [x.id for x in self.deps[dep_name]] if self._fcn is not None: self._fcn = None def relink(self, objs): # Given a dictionary of objects, restore the internal references Variable.relink(self, objs) self.links = [objs[x] for x in self.links] if self.deps is not None: for dep_name in self.deps: self.deps[dep_name] = [objs[x] for x in self.deps[dep_name]] if self.fcn_str: self._fcn = parse_function(self.fcn_str)[0]
[docs] def constrain(self, ti=None) -> None: """ Constrain the parameter value to allowed range If ``Parameter.limits`` is not None, then the parameter values at the specified time will be clipped to the limits. If no time index is provided, clipping will be performed on all values in a vectorized operation. Vector clipping is used for data parameters that are not evaluated during integration, while index-specific clipping is used for dependencies that are calculated during integration. :param ti: An integer index, or None (to operate on all time points) """ if self.limits is not None: if ti is None: self.vals = np.clip(self.vals, self.limits[0], self.limits[1]) else: if self.vals[ti] < self.limits[0]: self.vals[ti] = self.limits[0] if self.vals[ti] > self.limits[1]: self.vals[ti] = self.limits[1]
[docs] def update(self, ti=None) -> None: """ Update the value of this Parameter If the parameter contains a function, this entails evaluating the function. Typically this is done at a single time point during integration, or over all time points for precomputing and postcomputing :param ti: An ``int``, or a numpy array with index values. If ``None``, all time values will be used """ if not self._fcn or self.pop_aggregation: return if ti is None: if self.derivative: raise ModelError("Cannot perform a vector update of a derivative parameter (these parameters intrinsically require integration to be computed)") ti = np.arange(0, self.vals.size) # This corresponds to every time point if self.skip_function: # If we don't want to overwrite the parameter in certain years, those years need to be excluded if hasattr(ti, "__len__"): # Filter out years outside the excluded range ti = ti[np.where((self.t[ti] < self.skip_function[0]) | (self.t[ti] > self.skip_function[1]))] if ti.size == 0: # If all times were removed return else: # Dealing with a scalar ti if (self.t[ti] >= self.skip_function[0]) and (self.t[ti] <= self.skip_function[1]): return dep_vals = dict.fromkeys(self.deps, 0.0) for dep_name, deps in self.deps.items(): for dep in deps: if isinstance(dep, Parameter) or isinstance(dep, Characteristic): dep_vals[dep_name] += dep.vals[ti] elif isinstance(dep, Compartment): dep_vals[dep_name] += dep[ti] elif isinstance(dep, Link): dep_vals[dep_name] += dep[ti] / dep.dt else: raise ModelError("Unhandled case") dep_vals["t"] = self.t[ti] dep_vals["dt"] = self.dt try: v = self.scale_factor * self._fcn(**dep_vals) except Exception as e: raise ModelError(f"Error when calculating the value for parameter: {self}") from e if self.derivative: self._dx = v else: self[ti] = v
def source_popsize(self, ti): # Get the total number of people covered by this program # i.e. the sum of the source compartments of all links that # derive from this program. If the parameter has no links, return NaN # which disambiguates between a parameter whose source compartments are all # empty, vs a parameter that has no source compartments if ti == self._source_popsize_cache_time: return self._source_popsize_cache_val else: if self.links: n = 0 for link in self.links: n += link.source[ti] # Use the direct indexing because we don't know what type of compartment we are operating on else: raise ModelError("Cannot retrieve source popsize for a non-transition parameter") self._source_popsize_cache_time = ti self._source_popsize_cache_val = n return n
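# A minimal sketch (not part of Atomica) of how Parameter.update() above applies skip_function: time
# indices whose year falls inside the (start, stop) window are excluded, so the parset value at those times
# is left unchanged. The years and window are hypothetical.
def _skip_function_sketch() -> None:
    import numpy as np

    t = np.arange(2000, 2010)  # simulation years
    skip = (2003, 2006)  # (start, stop) window in which the function should not overwrite values
    ti = np.arange(t.size)
    ti = ti[np.where((t[ti] < skip[0]) | (t[ti] > skip[1]))]
    assert np.array_equal(t[ti], [2000, 2001, 2002, 2007, 2008, 2009])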
[docs] class Population: """ A class to wrap up data for one population within model. Each model population must contain a set of compartments with equivalent names. """ def __init__(self, framework, name: str, label: str, progset: ProgramSet, pop_type: str): """ Construct a Population This function constructs a population instance. Aside from creating the Population, it also instantiates all of the integration objects defined in the Framework. Note that transfers and interactions aren't added at this point - because they transcend populations, they are instantiated one level higher up, in ``model.build()`` :param framework: A ProjectFramework :param name: The code name of the population :param label: The full name of the population :param progset: A ProgramSet instance """ self.name = name #: The code name of the population self.label = label #: The full name/label of the population self.type = pop_type #: The population's type self.comps = list() #: List of Compartment objects self.characs = list() #: List of Characteristic objects self.links = list() #: List of Link objects self.pars = list() #: List of Parameter objects self.comp_lookup = dict() #: Maps name of a compartment to a Compartment self.charac_lookup = dict() #: Maps name of a compartment to a Characteristic self.par_lookup = dict() #: Maps name of a parameter to a Parameter self.link_lookup = dict() #: Maps name of link to a list of Links with that name self.build(framework=framework, progset=progset) # Convert compartmental cascade into lists of compartment and link objects. self.popsize_cache_time = None self.popsize_cache_val = None self.is_linked = True # Flag to manage double unlinking/relinking def __repr__(self): return '%s "%s"' % (self.__class__.__name__, self.name) def __contains__(self, item: str) -> bool: """ Check whether variables can be resolved This function returns True if ``Population.get_variable(item)`` would result in some variables being returned. This facilitates checking whether a population contains particular variables - for example, transfer links may only exist in a subset of the model populations, or if population types are being used, not all variables will be defined in any one population. :param item: The name of a parameter or a link specification (supported by ``get_variable``) :return: True if ``Population.get_variable`` would return at least one variable, otherwise False """ try: self.get_variable(item) return True except NotFoundError: return False def unlink(self): if not self.is_linked: return for obj in self.comps + self.characs + self.pars + self.links: obj.unlink() self.comp_lookup = None self.charac_lookup = None self.par_lookup = None self.link_lookup = None self.is_linked = False def relink(self, objs): if self.is_linked: return for obj in self.comps + self.characs + self.pars + self.links: obj.relink(objs) self.comp_lookup = {comp.name: comp for comp in self.comps} self.charac_lookup = {charac.name: charac for charac in self.characs} self.par_lookup = {par.name: par for par in self.pars} link_names = set([link.name for link in self.links]) self.link_lookup = {name: [link for link in self.links if link.name == name] for name in link_names} self.is_linked = True
[docs] def popsize(self, ti: int = None): """ Return population size A population's size is the sum of all of the people in its compartments, excluding birth and death compartments :param ti: Optionally specify a scalar time index to retrieve popsize for. ```None`` corresponds to all times :return: If ``ti`` is specified, returns a scalar population size. If ``ti`` is ``None``, return a numpy array the same size as the simulation time vector """ if ti is None: return np.sum([comp.vals for comp in self.comps if (not isinstance(comp, SourceCompartment) and not isinstance(comp, SinkCompartment))], axis=0) if ti == self.popsize_cache_time: return self.popsize_cache_val else: n = 0 for comp in self.comps: if not isinstance(comp, SourceCompartment) and not isinstance(comp, SinkCompartment): n += comp[ti] return n
[docs] def get_variable(self, name: str) -> list: """ Return list of variables given code name At the moment, names are unique across object types and within object types except for links, but if that logic changes, simple modifications can be made here Link names in Atomica end in 'par_name:flow' so passing in a code name of this form will return links. Links can also be identified by the compartments that they connect. Allowed syntax is: - 'source_name:' - All links going out from source - ':dest_name' - All links going into destination - 'source_name:dest_name' - All links from Source to Dest - 'source_name:dest_name:par_name' - All links from Source to Dest belonging to a given Parameter - ':dest:par_name' - 'source::par_name' - As per above - '::par_name' - All links with specified par_name (less efficient than 'par_name:flow') :param name: Code name to search for :return: A list of Variables """ name = name.replace("___", ":") # Parameter functions will convert ':' to '___' for use in variable names if name in self.comp_lookup: return [self.comp_lookup[name]] elif name in self.charac_lookup: return [self.charac_lookup[name]] elif name in self.par_lookup: return [self.par_lookup[name]] elif name in self.link_lookup: return self.link_lookup[name] elif ":" in name and not name.endswith(":flow"): name_tokens = name.split(":") if len(name_tokens) == 2: name_tokens.append("") src, dest, par = name_tokens if src and dest: links = [link for link in self.get_comp(src).outlinks if link.dest.name == dest] elif src: links = self.get_comp(src).outlinks elif dest: links = self.get_comp(dest).inlinks else: links = self.links if par: links = [link for link in links if (link.parameter and link.parameter.name == par)] return links else: raise NotFoundError(f"Object '{name}' not found in population '{self.name}'")
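    # Illustrative queries for the syntax documented above (the compartment/parameter names are
    # hypothetical, not part of Atomica):
    #
    # >>> pop.get_variable("sus")             # a compartment, characteristic, or parameter by code name
    # >>> pop.get_variable("sus:")            # all links out of compartment 'sus'
    # >>> pop.get_variable(":vac")            # all links into compartment 'vac'
    # >>> pop.get_variable("sus:vac")         # all links from 'sus' to 'vac'
    # >>> pop.get_variable("sus:vac:v_rate")  # links from 'sus' to 'vac' driven by parameter 'v_rate'
    # >>> pop.get_variable("v_rate:flow")     # all links supplied by parameter 'v_rate'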
[docs] def get_comp(self, comp_name): """Allow compartments to be retrieved by name rather than index. Returns a Compartment.""" try: return self.comp_lookup[comp_name] except KeyError: raise NotFoundError(f"Compartment {comp_name} not found")
[docs] def get_charac(self, charac_name): """Allow dependencies to be retrieved by name rather than index. Returns a Variable.""" try: return self.charac_lookup[charac_name] except KeyError: raise NotFoundError(f"Characteristic {charac_name} not found")
[docs] def get_par(self, par_name): """Allow dependencies to be retrieved by name rather than index. Returns a Variable.""" try: return self.par_lookup[par_name] except KeyError: raise NotFoundError(f"Parameter {par_name} not found")
[docs] def build(self, framework, progset): """ Generate a compartmental cascade as defined in a settings object. Fill out the compartment, transition and dependency lists within the model population object. Maintaining order as defined in a cascade workbook is crucial due to cross-referencing. The progset is required so that Parameter instances with functions can correctly identify dynamic quantities if they are only dynamic due to program overwrites. """ comps = framework.comps characs = framework.characs pars = framework.pars # Parameters first pass # Instantiate all parameters first. That way, we know which compartments need to be TimedCompartments # We make a `timed_compartments` dict mapping {timed_compartment_name:parameter} so that we can identify # the timed compartments and quickly retrieve their parameters. This is done here partly because we need # to instantiate the parameters first, and also because `framework.transitions` is keyed by parameter # rather than compartment so it's straightforward to include here for par_name in list(pars.index): if pars.at[par_name, "population type"] == self.type: par = Parameter(pop=self, name=par_name) par.units = pars.at[par_name, "format"] par.timescale = pars.at[par_name, "timescale"] par.derivative = pars.at[par_name, "is derivative"] == "y" self.pars.append(par) self.par_lookup = {par.name: par for par in self.pars} # Instantiate compartments residual_junctions = {x[0] for x in framework.transitions.get(">", [])} for comp_name in list(comps.index): if comps.at[comp_name, "population type"] == self.type: if comp_name in residual_junctions: self.comps.append(ResidualJunctionCompartment(pop=self, name=comp_name, duration_group=comps.at[comp_name, "duration group"])) elif comps.at[comp_name, "is junction"] == "y": self.comps.append(JunctionCompartment(pop=self, name=comp_name, duration_group=comps.at[comp_name, "duration group"])) elif comps.at[comp_name, "duration group"]: self.comps.append(TimedCompartment(pop=self, name=comp_name, parameter=self.par_lookup[comps.at[comp_name, "duration group"]])) elif comps.at[comp_name, "is source"] == "y": self.comps.append(SourceCompartment(pop=self, name=comp_name)) elif comps.at[comp_name, "is sink"] == "y": self.comps.append(SinkCompartment(pop=self, name=comp_name)) else: self.comps.append(Compartment(pop=self, name=comp_name)) self.comp_lookup = {comp.name: comp for comp in self.comps} # Characteristics first pass, instantiate objects for charac_name in list(characs.index): if characs.at[charac_name, "population type"] == self.type: self.characs.append(Characteristic(pop=self, name=charac_name)) self.charac_lookup = {charac.name: charac for charac in self.characs} # Characteristics second pass, add includes and denominator # This is a separate pass because characteristics can depend on each other for charac in self.characs: includes = [x.strip() for x in characs.at[charac.name, "components"].split(",")] for inc_name in includes: charac.add_include(self.get_variable(inc_name)[0]) # nb. We expect to only get one match for the name, so use index 0 denominator = characs.at[charac.name, "denominator"] if not pd.isna(denominator): charac.add_denom(self.get_variable(denominator)[0]) # nb. 
framework import strips whitespace from the overall field # Parameters second pass, create parameter objects and links # This is a separate pass because we can't create the links before the compartments are instantiated for par in self.pars: if framework.transitions[par.name]: for pair in framework.transitions[par.name]: src = self.get_comp(pair[0]) dst = self.get_comp(pair[1]) src.connect(dst, par) # If the parameter is a timed parameter, the TimedCompartment will also assign the FlushLink in this step if ">" in framework.transitions: for pair in framework.transitions[">"]: try: src = self.get_comp(pair[0]) dst = self.get_comp(pair[1]) src.connect(dst, None) except NotFoundError: continue # Parameters third pass, process f_stacks, deps, and limits # This is a separate pass because output parameters can depend on Links for par in self.pars: min_value = pars.at[par.name, "minimum value"] max_value = pars.at[par.name, "maximum value"] if np.isfinite(min_value) or np.isfinite(max_value): par.limits = [max(-np.inf, min_value), min(np.inf, max_value)] fcn_str = pars.at[par.name, "function"] if not pd.isna(fcn_str): par.set_fcn(fcn_str) # If this Parameter has links and a function, it must be updated before it is needed during integration. # If the function depends on any compartment sizes, it must be updated element-wise during integration. # Similarly, if it is a derivative parameter, it needs to be updated element-wise. # Otherwise, it can be pre-computed in a fast vector operation # A timed parameter doesn't _directly_ have links associated with it (because it does not supply values # for the links) but it does need to be precomputed for par in self.pars: if par.fcn_str and (par.links or par.derivative or framework.pars.at[par.name, "timed"] == "y" or (progset is not None and (par.name, self.name) in progset.covouts)): par.set_dynamic(progset)
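The passes above rely on ``framework.transitions`` being keyed by parameter code name, with each entry holding (source, destination) compartment name pairs. The sketch below is illustrative only and not part of the module; the dict contents and names are assumptions used to show how links are derived from that structure.

# Illustrative sketch only - a simplified stand-in for framework.transitions,
# keyed by parameter code name with (source, destination) compartment pairs
transitions = {
    "infection": [("sus", "inf")],
    "recovery": [("inf", "rec")],
}

links = []
for par_name, pairs in transitions.items():
    for src_name, dst_name in pairs:
        # In build() above this step is src.connect(dst, par), which instantiates a Link
        links.append((src_name, dst_name, par_name))

print(links)  # [('sus', 'inf', 'infection'), ('inf', 'rec', 'recovery')]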
[docs] def initialize_compartments(self, parset: ParameterSet, framework, t_init: float) -> None: """ Set compartment sizes This method takes the databook values for compartments and characteristics, and uses them to compute the initial compartment sizes at ``ti=0``. The computation is carried out by solving the linear matrix equation mapping characteristics to compartments. For this purpose, both compartments and characteristics in the databook are treated in the same way i.e. a databook compartment is treated like a characteristic containing only that compartment. :param parset: A :py:class:`ParameterSet` instance with initial values for compartments/characteristics :param framework: A :py:class:`ProjectFramework` instance, used to identify and retrieve interaction terms :param t_init: The year to use for initialization. This should generally be set to the sim start year """ if not self.comps: # If this population has no compartments, then nothing needs to be done return if parset.initialization is not None: # If the parset contains explicit parameter values, try to apply them parset.apply_initialization(self, framework) return # Given a set of characteristics and their initial values, compute the initial # values for the compartments by solving the set of characteristics simultaneously # Build up the comps and characs containing the setup values in the databook - the `b` in `A*x=b` characs_to_use = framework.characs.index[(framework.characs["setup weight"] > 0) & (framework.characs["population type"] == self.type)] comps_to_use = framework.comps.index[(framework.comps["setup weight"] > 0) & (framework.comps["population type"] == self.type)] b_objs = [self.charac_lookup[x] for x in characs_to_use] + [self.comp_lookup[x] for x in comps_to_use] # Build up the comps corresponding to the `x` values in `A*x=b` i.e. the compartments being solved for comps = [c for c in self.comps if not (isinstance(c, SourceCompartment) or isinstance(c, SinkCompartment))] charac_indices = {c.name: i for i, c in enumerate(b_objs)} # Make lookup dict for characteristic indices comp_indices = {c.name: i for i, c in enumerate(comps)} # Make lookup dict for compartment indices b = np.zeros((len(b_objs), 1)) A = np.zeros((len(b_objs), len(comps))) # Construct the characteristic value vector (b) and the includes matrix (A) for i, obj in enumerate(b_objs): # Look up the characteristic value par = parset.pars[obj.name] b[i] = par.interpolate(t_init, pop_name=self.name)[0] * par.y_factor[self.name] * par.meta_y_factor if isinstance(obj, Characteristic): if obj.denominator is not None: denom_par = parset.pars[obj.denominator.name] b[i] *= denom_par.interpolate(t_init, pop_name=self.name)[0] * denom_par.y_factor[self.name] * denom_par.meta_y_factor for inc in obj.get_included_comps(): A[i, comp_indices[inc.name]] = 1.0 else: A[i, comp_indices[obj.name]] = 1.0 # Solve the linear system (nb. lstsq returns the minimum norm solution) x = np.linalg.lstsq(A, b.ravel(), rcond=None)[0].reshape(-1, 1) proposed = np.matmul(A, x) residual = np.sum((proposed.ravel() - b.ravel()) ** 2) # Accumulate any errors here. The errors could occur either at the system level or at the level # of individual comps/characs. 
To avoid stopping at the first problem, messages are collected in `error_msg` and raised together below. error_msg = "" characteristic_tolerance_failed = False # Print warning for characteristics that are not well matched by the compartment size solution for i in range(0, len(b_objs)): if abs(proposed[i] - b[i]) > model_settings["tolerance"]: characteristic_tolerance_failed = True error_msg += "Characteristic '{0}' '{1}' - Requested {2}, Calculated {3}\n".format(self.name, b_objs[i].name, b[i], proposed[i]) # Print expanded diagnostic for negative compartments showing parent characteristics def report_characteristic(charac, n_indent=0): """ Recursively diagnose characteristic If a compartment has been assigned a negative value, it is usually because that negative value is required to match a target characteristic value. This method takes the root characteristic, and prints out all of the compartments relating to it, as well as descending further into any included characteristics. This brings together all of the affected quantities to help diagnose where the negative compartment size originates. :param charac: A :class:`Characteristic` instance :param n_indent: Indent level used to prefix the log message :return: A string containing the debug output """ msg = "" if charac.name in charac_indices: msg += n_indent * "\t" + "Characteristic '{0}': Target value = {1}\n".format(charac.name, b[charac_indices[charac.name]]) else: msg += n_indent * "\t" + "Characteristic '{0}' not in databook: Target value = N/A (0.0)\n".format(charac.name) n_indent += 1 if isinstance(charac, Characteristic): for inc in charac.includes: if isinstance(inc, Characteristic): msg += report_characteristic(inc, n_indent) else: msg += n_indent * "\t" + "Compartment %s: Computed value = %f\n" % (inc.name, x[comp_indices[inc.name]]) return msg for i in range(0, len(comps)): if x[i] < -model_settings["tolerance"]: error_msg += "Compartment %s %s - Calculated %f\n" % (self.name, comps[i].name, x[i]) for charac in b_objs: try: if comps[i] in charac.get_included_comps(): error_msg += report_characteristic(charac) except Exception: if comps[i] == charac: error_msg += report_characteristic(charac) if residual > model_settings["tolerance"]: # Halt for an unsatisfactory overall solution raise BadInitialization("Global residual was %g which is unacceptably large (should be < %g)\n%s" % (residual, model_settings["tolerance"], error_msg)) elif np.any(np.less(x, -model_settings["tolerance"])): # Halt for any negative popsizes raise BadInitialization(f"Negative initial popsizes:\n{error_msg}") elif characteristic_tolerance_failed: raise BadInitialization(f"Characteristics failed to meet tolerances\n{error_msg}") elif error_msg: # Generic error message if any of the warning messages were encountered - not entirely sure when # this would happen so if this *does* occur, it should be written as an explicit branch above # (but it exists as a fallback to ensure that any inconsistencies result in the error being raised) raise BadInitialization(f"Initialization error\n{error_msg}") # Otherwise, insert the values for i, c in enumerate(comps): c[0] = max(0.0, x[i])
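As a worked illustration of the least-squares initialization above, the sketch below uses a toy system with two compartments and two databook quantities (a "total population" characteristic plus one directly-entered compartment); the names and numbers are hypothetical and not part of the module.

# Illustrative sketch only - solving for compartment sizes from characteristic targets
import numpy as np

# Rows of A = databook quantities (b), columns = compartments being solved for
A = np.array([[1.0, 1.0],    # characteristic "alive" = comp1 + comp2
              [1.0, 0.0]])   # compartment "comp1" entered directly in the databook
b = np.array([100.0, 30.0])  # databook values for "alive" and "comp1"

x = np.linalg.lstsq(A, b, rcond=None)[0]  # minimum-norm least-squares solution
residual = np.sum((A @ x - b) ** 2)       # same residual check as in the method above

print(x)         # [30. 70.]
print(residual)  # ~0, so this toy system would pass the tolerance check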
[docs] class Model: """A class to wrap up multiple populations within model and handle cross-population transitions.""" def __init__(self, settings, framework, parset, progset=None, program_instructions=None): # Note that if a progset is provided and program instructions are not, then programs will not be # turned on. However, the progset is still available so that the coverage can still be probed # (in particular, the coverage denominator from Result.get_coverage('denominator') is used # for reconciliation # Record version info for the model run. These are generally NOT updated in migration. Thus, they serve # as a record of which specific version of the code was used to generate the results self.version = version self.gitinfo = sc.dcp(gitinfo) self.created = sc.now(utc=True) self.pops = list() # List of population groups that this model subdivides into. self.interactions = sc.odict() self.programs_active = None # True or False depending on whether Programs will be used or not self.progset = sc.dcp(progset) self.program_instructions = sc.dcp(program_instructions) # program instructions self.t = settings.tvec #: Simulation time vector (this is a brand new instance from the `settings.tvec` property method) self.dt = settings.sim_dt #: Simulation time step self._t_index = 0 # Keeps track of array index for current timepoint data within all compartments. self._vars_by_pop = None # Cache to look up lists of variables by name across populations self._pop_ids = sc.odict() # Maps name of a population to its position index within populations list. self._program_cache = None #: Cache program capacities and coverage for coverage scenarios self._exec_order = None #: Cache the dependency order of various quantities self.framework = sc.dcp(framework) # Store a copy of the Framework used to generate this model self.framework.spreadsheet = None # No need to keep the spreadsheet self.build(parset) def _update_program_cache(self): # Finally, prepare programs if self.progset and self.program_instructions: self.programs_active = True self._program_cache = dict() self._program_cache["comps"] = dict() for prog in self.progset.programs.values(): self._program_cache["comps"][prog.name] = [] for pop_name in prog.target_pops: for comp_name in prog.target_comps: self._program_cache["comps"][prog.name].append(self.get_pop(pop_name).get_comp(comp_name)) self._program_cache["capacities"] = self.progset.get_capacities(tvec=self.t, dt=self.dt, instructions=self.program_instructions) # Cache the proportion coverage for coverage scenarios so that we don't call interpolate() every timestep coverage = self.progset.get_prop_coverage(tvec=self.t, dt=self.dt, capacities=self._program_cache["capacities"], num_eligible={k: np.nan for k in self.progset.programs}, instructions=self.program_instructions) self._program_cache["prop_coverage"] = {k: coverage[k] for k in self.program_instructions.coverage} # Check that any programs with no coverage denominator have been given coverage overwrites # Otherwise, the coverage denominator will be treated as 0 and will result in 100% coverage # but that would just be a side effect of not targeting anyone (division by 0 is treated as 100%) for prog in self.progset.programs.values(): if not self._program_cache["comps"][prog.name] and prog.name not in self._program_cache["prop_coverage"]: raise ModelError(f'Program "{prog.name}" does not target any compartments, but the program instructions did not specify coverage for this program. 
Programs without target compartments require their coverage to be explicitly specified in the instructions') else: self.programs_active = False def _set_vars_by_pop(self) -> None: """ Update cache dicts and lists During integration, the model needs to iterate over parameters with the same name across populations. Therefore, we build a cache dict where we map code names to a list of references to the required Parameter objects. We also construct a cache list of the parameter names from the framework so we can quickly iterate over it. """ self._vars_by_pop = defaultdict(list) for pop in self.pops: for var in pop.comps + pop.characs + pop.pars + pop.links: self._vars_by_pop[var.name].append(var) self._vars_by_pop = dict(self._vars_by_pop) # Stop new entries from appearing in here by accident def __getstate__(self): self.unlink() d = sc.dcp(self.__dict__) # Pickling to string results in a copy self.relink() # Relink, otherwise the original object gets unlinked return d def __setstate__(self, d): self.__dict__ = d self.relink() def __deepcopy__(self, memodict={}): # Using dcp(self.__dict__) is faster than pickle getstate/setstate # when this is called via copy.deepcopy() self.unlink() d = sc.dcp(self.__dict__) self.relink() new = Model.__new__(Model) new.__dict__.update(d) new.relink() return new
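Returning to ``_set_vars_by_pop`` above, the sketch below illustrates the name-to-variables cache it builds; the population and parameter names are hypothetical stand-ins for real Variable objects.

# Illustrative sketch only - map each code name to the variables sharing that name across populations
from collections import defaultdict

pops = {
    "adults": ["b_rate", "d_rate"],  # stand-ins for the Variable code names in each population
    "children": ["b_rate"],
}

vars_by_pop = defaultdict(list)
for pop_name, var_names in pops.items():
    for name in var_names:
        vars_by_pop[name].append((pop_name, name))  # the real code appends the Variable object itself

vars_by_pop = dict(vars_by_pop)  # freeze the dict so missing names raise KeyError rather than silently appearing
print(vars_by_pop["b_rate"])     # [('adults', 'b_rate'), ('children', 'b_rate')]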
[docs] def get_pop(self, pop_name): """Allow model populations to be retrieved by name rather than index.""" pop_index = self._pop_ids[pop_name] return self.pops[pop_index]
[docs] def build(self, parset): """Build the full model.""" # First construct populations for k, (pop_name, pop_label, pop_type) in enumerate(zip(parset.pop_names, parset.pop_labels, parset.pop_types)): self.pops.append(Population(framework=self.framework, name=pop_name, label=pop_label, progset=self.progset, pop_type=pop_type)) self._pop_ids[pop_name] = k # Expand interactions into matrix form self.interactions = dict() for name, weights in parset.interactions.items(): from_pops = [x.name for x in self.pops if x.type == self.framework.interactions.at[name, "from population type"]] to_pops = [x.name for x in self.pops if x.type == self.framework.interactions.at[name, "to population type"]] self.interactions[name] = np.zeros((len(from_pops), len(to_pops), len(self.t))) for from_pop, par in weights.items(): for to_pop in par.pops: self.interactions[name][from_pops.index(from_pop), to_pops.index(to_pop), :] = par.interpolate(self.t, to_pop) * par.y_factor[to_pop] * par.meta_y_factor # Instantiate transfer parameters # Note transfer parameters can currently only be data parameters (i.e. they don't have any functions) so # no need to worry about setting functions and flagging dependencies for them for transfer_name in parset.transfers: if parset.transfers[transfer_name]: for pop_source in parset.transfers[transfer_name]: # This contains the data for all of the destination pops. transfer_parameter = parset.transfers[transfer_name][pop_source] pop = self.get_pop(pop_source) for pop_target in transfer_parameter.ts: # Create the parameter object for this link (shared across all compartments) par_name = "%s_%s_to_%s" % (transfer_name, pop_source, pop_target) # e.g. 'aging_0-4_to_15-64' par = Parameter(pop=pop, name=par_name) par.preallocate(self.t, self.dt) # Preallocate now, because these parameters are not present in the framework so they won't get preallocated later par.scale_factor = transfer_parameter.y_factor[pop_target] * transfer_parameter.meta_y_factor par.vals = transfer_parameter.interpolate(tvec=self.t, pop_name=pop_target) * par.scale_factor par.units = transfer_parameter.ts[pop_target].units.strip().split()[0].strip().lower() # Sampling might result in the parameter value going out of bounds, so make sure the transfer parameter values are constrained if par.units in {FS.QUANTITY_TYPE_RATE, FS.QUANTITY_TYPE_PROBABILITY, FS.QUANTITY_TYPE_NUMBER}: par.limits = [0, np.inf] elif par.units == FS.QUANTITY_TYPE_DURATION: par.limits = [model_settings["tolerance"], np.inf] else: raise Exception("Unknown transfer parameter units") par.constrain() pop.pars.append(par) pop.par_lookup[par_name] = par target_pop_obj = self.get_pop(pop_target) for src in pop.comps: if not (isinstance(src, SourceCompartment) or isinstance(src, SinkCompartment) or isinstance(src, JunctionCompartment)): # Instantiate a link between corresponding compartments dest = target_pop_obj.get_comp(src.name) # Get the corresponding compartment src.connect(dest, par) # Now that all object have been created, update _vars_by_pop() accordingly self._set_vars_by_pop() # Flag dependencies for aggregated parameters prior to precomputing # Note that all parameters have been instantiated and we can set_dynamic # in any order, as long as we examine all parameters for par_name in self.framework.pars.index: if par_name not in self._vars_by_pop: continue pars = self._vars_by_pop[par_name] if pars[0].pop_aggregation: # Make the parameter dynamic, because it needs to be computed during integration for par in pars: par.set_dynamic() # Also make 
the variable being aggregated dynamic for var in self._vars_by_pop[pars[0].pop_aggregation[1]]: var.set_dynamic(progset=self.progset) # If there is a weighting variable, if len(pars[0].pop_aggregation) > 3: for var in self._vars_by_pop[pars[0].pop_aggregation[3]]: var.set_dynamic(progset=self.progset) # Set execution order - needs to be done _after_ pop aggregations have been flagged as dynamic self._set_exec_order() # Insert parameter initial values and do any required precomputation for par_name in self._exec_order["all_pars"]: if par_name not in parset.pars: # This happens for transfer parameters that don't appear in parset.pars, but they have been updated already above continue cascade_par = parset.pars[par_name] pars = self._vars_by_pop[cascade_par.name] for par in pars: par.preallocate(self.t, self.dt) par.scale_factor = cascade_par.meta_y_factor # Set meta scale factor regardless of whether a population-specific y-factor is also provided if par.pop.name in cascade_par.y_factor: par.scale_factor *= cascade_par.y_factor[par.pop.name] # Add in population-specific scale factor if par.pop.name in cascade_par.skip_function: par.skip_function = cascade_par.skip_function[par.pop.name] # Copy in any skipped evaluations if par.skip_function: assert cascade_par.has_values(par.pop.name), "Parameter function was marked as being skipped for some of the simulation, but the ParameterSet has no values to use instead. If skipping, the ParameterSet must contain some values" if par.fcn_str and par._precompute: # If the parameter is marked for precomputation, then insert it now par.update() elif cascade_par.has_values(par.pop.name): # If the databook contains values, then insert them now par.vals = cascade_par.interpolate(tvec=self.t, pop_name=par.pop.name) * par.scale_factor par.constrain() # Sampling might result in the parameter value going out of bounds (or user might have entered bad values in the databook) so ensure they are clipped here # Finally, preallocate remaining quantities and initialize the compartments # Note that TimedLink preallocation depends on TimedCompartment preallocation # which is setting the duration based on the evaluated parameters above. # Therefore, in the loop below, compartments must be preallocated before links for pop in self.pops: for obj in pop.comps + pop.characs + pop.links: obj.preallocate(self.t, self.dt) pop.initialize_compartments(parset, self.framework, self.t[0])
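The interaction expansion near the top of ``build`` stores one (from population, to population, time) array per interaction. The sketch below is illustrative only, using two hypothetical populations and a constant weight in place of the interpolated :class:`Parameter` values and y-factors used by the real code.

# Illustrative sketch only - expand pairwise interaction weights into a (from_pop, to_pop, time) array
import numpy as np

t = np.arange(2020, 2025)                # hypothetical simulation years
from_pops = ["adults", "children"]
to_pops = ["adults", "children"]
weights = {("adults", "children"): 0.5}  # hypothetical constant interaction weight

interaction = np.zeros((len(from_pops), len(to_pops), len(t)))
for (frm, to), w in weights.items():
    # The real code fills each slice with par.interpolate(self.t, to_pop) * y-factors
    interaction[from_pops.index(frm), to_pops.index(to), :] = w

print(interaction[0, 1, :])              # [0.5 0.5 0.5 0.5 0.5]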
def _set_exec_order(self) -> None: """ Get the execution order Some quantities, like parameters, characteristics, and junctions, have dependencies that require them to be updated in a specific order. This method constructs directed graphs of the dependencies and computes the topological ordering, which specifies the order in which to update each quantity. For parameters, we need to update them in dependency order in three places - when precomputing and when postcomputing, in which case we need all parameters, and during integration, in which case we only work with the dynamic parameters. We also need to iterate over transition parameters when updating links, but actually this can be done in any order. However, we cache the transition parameters into a flat list here so that they can be efficiently iterated over. Note that the parameter update order is calculated from the framework, which may be relevant if the model structure is changed after building the model but before processing. The result is stored in ``self._exec_order``, a dict containing execution orders for ``'all_pars'``, ``'dynamic_pars'``, ``'transition_pars'``, ``'characs'`` and ``'junctions'`` """ import networkx as nx exec_order = dict() # Set the parameter update order - this is a list of parameter names, in dependency order # The parameters may or may not exist in each population, but they are updated across populations # Since parameters are implemented one name at a time, here we set the parameter order by # making a graph using the names rather than actual parameter objects. Note that par_update_order # contains all parameters, which are used during initialization as well as in update_pars. However, we # could have update_pars only operate on the subset of the graph contributing to transitions, or to # dynamic programs or to program overwrites. 
par_derivative = self.framework.pars["is derivative"].to_dict() # Store all parameter names in framework, as well as whether they are a derivative or not G = nx.DiGraph() G.add_nodes_from(par_derivative, keep=False) for pop in self.pops: for par in pop.pars: for dep, dep_var in par.deps.items(): if dep in par_derivative and par_derivative[dep] != "y": # Derivative parameters are allowed to refer to themselves directly, # and derivative parameters are not considered dependencies - so we do # not need to add a dependency edge to the graph G.add_edge(dep, par.name) if par.pop_aggregation and par.pop_aggregation[1] in par_derivative and par_derivative[par.pop_aggregation[1]] != "y": G.add_edge(par.pop_aggregation[1], par.name) if par._is_dynamic or (self.progset and par.name in self.progset.pars): # If the parameter is dynamic or appears in the progset, then we need to # include it in the list of parameters to iterate over in `update_pars` G.nodes[par.name]["keep"] = True if not nx.dag.is_directed_acyclic_graph(G): message = "Circular dependencies in parameters:" for cycle in nx.simple_cycles(G): message += "\n - " + " -> ".join(cycle) raise Exception(message) exec_order["all_pars"] = [x for x in nx.dag.topological_sort(G) if x in self._vars_by_pop] # Not all parameters may exist depending on populations, so filter out only the ones that are actually instantiated in this Model exec_order["dynamic_pars"] = [x for x in exec_order["all_pars"] if G.nodes[x]["keep"]] # Set the parameter execution order - this is a list of only transition parameters, used when updating links # This is a flat list of parameters, but the order actually should not matter since all parameters should be # resolved at this point exec_order["transition_pars"] = [] for pop in self.pops: for par in pop.pars: if par.links and par.units != FS.QUANTITY_TYPE_PROPORTION: exec_order["transition_pars"].append(par) # Set characteristic execution order - in cases where characteristics depend on each other G = nx.DiGraph() for pop in self.pops: for charac in pop.characs: G.add_node(charac) for include in charac.includes: if isinstance(include, Characteristic): G.add_edge(include, charac) # Note directionality - the included characteristic needs to be added first if isinstance(charac.denominator, Characteristic): G.add_edge(charac.denominator, charac) # Note directionality - the included characteristic needs to be added first assert nx.dag.is_directed_acyclic_graph(G), "There is a circular dependency in characteristics, which is not permitted" exec_order["characs"] = [c for c in nx.dag.topological_sort(G) if c._is_dynamic] # Topological sorting of the junction graph, which is a valid execution order # TODO - Move normal compartments into here as well # Set the junction execution order G = nx.DiGraph() for pop in self.pops: for comp in pop.comps: if isinstance(comp, JunctionCompartment): G.add_node(comp) for link in comp.outlinks: if isinstance(link.dest, JunctionCompartment): G.add_edge(link.source, link.dest) assert nx.dag.is_directed_acyclic_graph(G), "There is a cycle present where one junction has flows into another, which results in an infinite loop and is not permitted" exec_order["junctions"] = list(nx.dag.topological_sort(G)) # Topological sorting of the junction graph, which is a valid execution order self._exec_order = exec_order
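A minimal sketch of the ordering logic used in ``_set_exec_order``: build a dependency DiGraph, verify it is acyclic, and take a topological sort, keeping only flagged nodes for the dynamic subset. The node names below are hypothetical and the sketch is not part of the module.

# Illustrative sketch only - dependency ordering via a directed graph and topological sort
import networkx as nx

G = nx.DiGraph()
G.add_nodes_from(["a", "b", "c"], keep=False)
G.add_edge("a", "b")  # b depends on a
G.add_edge("b", "c")  # c depends on b
G.nodes["b"]["keep"] = True
G.nodes["c"]["keep"] = True

assert nx.is_directed_acyclic_graph(G), "Circular dependencies are not permitted"

all_order = list(nx.topological_sort(G))                      # ['a', 'b', 'c']
dynamic_order = [n for n in all_order if G.nodes[n]["keep"]]  # ['b', 'c']
print(all_order, dynamic_order)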
[docs] def process(self) -> None: """Run the full model.""" assert self._t_index == 0 # Only makes sense to process a simulation once, starting at ti=0 - this might be relaxed later on self._set_exec_order() # Set the execution order again in case the user has updated the parameters etc. It is critically important that this is correct during integration self._update_program_cache() # Initial flush of people in junctions if self._t_index == 0: self.update_pars() # Update transition parameters in case junction outflows are function parameters self.flush_junctions() # Flush the current contents of the junction without including any inflows self.update_pars() # Update the transition parameters in case junction outflows are functions _and_ they depend on compartment sizes that just changed in the line above self.update_links() # Update all of the links # Main integration loop while self._t_index < (self.t.size - 1): self._t_index += 1 # Step the simulation forward self.update_comps() self.update_pars() self.update_links() # Update postcompute parameters - note that it needs to be done in execution order for par_name in self._exec_order["all_pars"]: for par in self._vars_by_pop[par_name]: if par.fcn_str and not (par._is_dynamic or par._precompute): par.update() par.constrain() # Clear characteristic internal storage and switch to dynamic computation to save space for pop in self.pops: for charac in pop.characs: charac._vals = None self._program_cache = None # Drop the program cache afterwards to save space
[docs] def update_comps(self) -> None: """ Set the compartment values at self._t_index+1 based on the current values at self._t_index and the link values at self._t_index. Values are updated by iterating over all outgoing links """ ti = self._t_index # Pre-populate the current value - need to iterate over pops here because transfers # will cross population boundaries for pop in self.pops: for comp in pop.comps: comp.update(ti)
[docs] def flush_junctions(self) -> None: """ Flush initialization values from junctions If junctions have been initialized with nonzero values as a mechanism for initializing the downstream compartments, then the junctions need to be flushed into the downstream compartments at the start of the simulation. This is done using the ``.initial_flush()`` method of the ``JunctionCompartment``. The order of the loop is important if junctions flow into other junctions - this needs to be computed from the graph prior to calling this method (so that the junction execution order stored in ``Model._exec_order`` is correct) """ for j in self._exec_order["junctions"]: try: j.initial_flush() except Exception as e: raise ModelError(f"Error when initially flushing junction: {j}") from e
[docs] def update_pars(self) -> None: """ Update parameter values Run through all parameters and characteristics, updating as required. This takes place in stages 1. Characteristics that are dependencies of parameters are updated 2. Parameters are updated in dependency order a. The parameter function is evaluated b. The parameter is overwritten by programs c. The parameter is updated with the population aggregation calculation d. The parameter value is constrained The parameters are updated parameter-at-a-time (across populations) so that the population aggregation can be carried out. Only parameters that are required for transitions or that are overwritten by programs are modified here - otherwise, parameter values are computed in vector calculations either before integration (if the value is purely from the databook) or after integration (if the parameter depends on any flow rates or compartment sizes). """ ti = self._t_index # First, compute dependent characteristics, as parameters might depend on them for charac in self._exec_order["characs"]: charac.update(ti) do_program_overwrite = self.programs_active and self.program_instructions.start_year <= self.t[ti] <= self.program_instructions.stop_year if do_program_overwrite: prop_coverage = sc.odict.fromkeys(self._program_cache["comps"], 0.0) for k, comp_list in self._program_cache["comps"].items(): if k in self._program_cache["prop_coverage"]: # If the coverage was precomputed in a coverage scenario prop_coverage[k] = self._program_cache["prop_coverage"][k][[ti]] else: n = 0.0 for comp in comp_list: n += comp[ti] prop_coverage[k] = self.progset.programs[k].get_prop_covered(self.t[ti], self._program_cache["capacities"][k][ti], n) prog_vals = self.progset.get_outcomes(prop_coverage) for par_name in self._exec_order["dynamic_pars"]: # All of the parameters with this name, across populations. # There should be one for each population (these are Parameters, not Links). pars = self._vars_by_pop[par_name] # First - update parameters that are dependencies, evaluating f_stack if required for par in pars: if par._is_dynamic: par.update(ti) # Then overwrite with program values if do_program_overwrite: for par in pars: if (par.name, par.pop.name) in prog_vals: if par.derivative: par._dx = prog_vals[(par.name, par.pop.name)] # For derivative parameters, overwrite the derivative rather than the value else: par[ti] = prog_vals[(par.name, par.pop.name)] if par.units == FS.QUANTITY_TYPE_NUMBER: par[ti] *= par.source_popsize(ti) / self.dt # The outcome in the progbook is per person reached, which is a timestep specific value. Thus, need to annualize here elif par.units == FS.QUANTITY_TYPE_RATE or par.units == FS.QUANTITY_TYPE_PROBABILITY: # Continuous programs generally should not target number or probability parameters # We apply a factor of dt here regardless of the parameter's timescale. This is because the dt factor here # matches the factor of dt used to divide the annual spending into timestep spending par[ti] /= self.dt # Handle parameters that aggregate over populations and use interactions in these functions. if pars[0].pop_aggregation: # NB. 
`par.pop_aggregation` is (agg_fcn,par_name,interaction_name,charac_name) where the last item is optional par_vals = [x[ti] for x in self._vars_by_pop[pars[0].pop_aggregation[1]]] # Value of variable being averaged par_vals = np.array(par_vals).reshape(-1, 1) # NOTE - When doing cross-population interactions, 'pars' is from the 'to' pop # and 'par_vals' is from the 'from' pop if len(pars[0].pop_aggregation) < 3: weights = np.ones((len(par_vals), len(pars))) else: weights = self.interactions[pars[0].pop_aggregation[2]][:, :, ti].copy() if pars[0].pop_aggregation[0] in {"SRC_POP_AVG", "SRC_POP_SUM"}: weights = weights.T elif pars[0].pop_aggregation[0] in {"TGT_POP_AVG", "TGT_POP_SUM"}: pass else: raise ModelError("Unknown aggregation function '{0}'".format(pars[0].pop_aggregation[0])) # This should never happen, an error should be raised earlier # If we are weighting by a variable, multiply the weights matrix accordingly if len(pars[0].pop_aggregation) == 4: vals = [par[ti] for par in self._vars_by_pop[pars[0].pop_aggregation[3]]] # Value of weighting variable vals = np.array(vals).reshape(-1, 1) weights *= vals.T if pars[0].pop_aggregation[0] in {"SRC_POP_AVG", "TGT_POP_AVG"}: norm = np.sum(weights, axis=1, keepdims=True) norm[norm == 0] = 1 weights /= norm par_vals = np.matmul(weights, par_vals) for par, val in zip(pars, par_vals): if par.skip_function is None or (self.t[ti] < par.skip_function[0]) or (self.t[ti] > par.skip_function[1]): # Careful - note how the < here matches >= in Parameter.update() par[ti] = par.scale_factor * val # Restrict the parameter's value if a limiting range was defined for par in pars: if par.derivative and ti < len(self.t) - 1: # If derivative parameter, then perform an Euler forward step before constraining par[ti + 1] = par[ti] + par._dx * self.dt par.constrain(ti + 1) else: par.constrain(ti)
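A minimal numerical sketch of the ``SRC_POP_AVG``-style aggregation above, with two hypothetical populations: the interaction weights are row-normalised and matrix-multiplied against the source-population values to give one aggregated value per target population. The numbers are illustrative only.

# Illustrative sketch only - weighted population aggregation as in update_pars
import numpy as np

par_vals = np.array([[2.0], [4.0]])  # value being averaged, one row per source population
weights = np.array([[1.0, 0.0],      # interaction weights, already oriented so that
                    [0.5, 0.5]])     # rows = target pops and columns = source pops

norm = np.sum(weights, axis=1, keepdims=True)
norm[norm == 0] = 1                  # avoid division by zero for untargeted populations
weights = weights / norm

aggregated = weights @ par_vals      # one aggregated value per target population
print(aggregated.ravel())            # [2. 3.]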
[docs] def run_model(settings, framework, parset: ParameterSet, progset: ProgramSet = None, program_instructions: ProgramInstructions = None, name: str = None): """ Build and process model Running simulations is accomplished via the :class:`Model` object in two steps 1. The :class:`Model` object is built, and all variables are initialized 2. The integration is carried out ``run_model`` serves as a wrapper for both of these steps, which are commonly performed together. However, in some cases it may be desired to split the operations. For example - In optimization, the same ``Model`` is used at each iteration, but with different instructions. To save time, the model is built just once, and deep-copied after building but before processing - Sometimes it may be required to make changes to the model's structure e.g. to implement exotic cross-population interactions that cannot be expressed via the Excel inputs. In that case, it may again be necessary to build the model first, modify it, and then process it separately :param settings: Project settings defining simulation time span and time step :param framework: A :class:`ProjectFramework` instance :param parset: A :class:`ParameterSet` instance :param progset: Optionally provide a :class:`ProgramSet` instance to use programs :param program_instructions: Optional :class:`ProgramInstructions` instance. If ``progset`` is specified, instructions must also be provided in order for programs to be active :param name: Optionally specify the name to assign to the output result :return: A :class:`Result` object containing the processed model """ m = Model(settings, framework, parset, progset, program_instructions) m.process() return Result(model=m, parset=parset, name=name)
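A hedged usage sketch of splitting the two steps along the lines described above for optimization: build once, then deep-copy and process per candidate instruction set. It assumes ``settings``, ``framework``, ``parset``, ``progset`` and a list ``candidate_instructions`` of :class:`ProgramInstructions` already exist, and that swapping ``program_instructions`` on the copy before ``process()`` is acceptable for the use case.

# Illustrative sketch only - build the model once, then copy and process with different instructions
import sciris as sc

base = Model(settings, framework, parset, progset=progset, program_instructions=None)

results = []
for instructions in candidate_instructions:
    m = sc.dcp(base)                           # deep-copy the built (but unprocessed) model
    m.program_instructions = sc.dcp(instructions)
    m.process()
    results.append(Result(model=m, parset=parset, name=None))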