Source code for atomica.framework

"""
Implements Framework functionality

A Framework contains all of the information defining a model that can be
run using Atomica. This module implements the :class:`ProjectFramework`
class, which provides a Python representation of a Framework file.

"""

import numpy as np
import pandas as pd
import sciris as sc
import io
from collections import defaultdict

from .cascade import validate_cascade
from .excel import read_tables, validate_category, standard_formats, read_dataframes
from .function_parser import parse_function
from .system import NotFoundError, FrameworkSettings as FS
from .system import logger
from .utils import format_duration, evaluate_plot_string
from .plotting import _extract_labels
from .version import version, gitinfo

__all__ = ["InvalidFramework", "ProjectFramework", "generate_framework_doc"]


[docs]class InvalidFramework(Exception): pass
[docs]class ProjectFramework: """ Base Framework class The object that defines the transition-network structure of models generated by a project. :param inputs: A string (which will load an Excel file from disk) or an sc.Spreadsheet. If not provided, the internal sheets will be set to an empty dict ready for content :param name: Optionally explicitly set the framework name. Otherwise, it will be assigned from the 'about' sheet, if present :param validate: If True (default) then the framework will be validated upon loading. This can be overridden if operating on invalid/partial frameworks is required. However, the framework should still be validated before it is used. """ def __init__(self, inputs=None, name: str = None, validate: bool = True): import openpyxl # Define metadata self.uid = sc.uuid() #: Unique identifier self.version = version #: Current Atomica version self.gitinfo = sc.dcp(gitinfo) #: Atomica Git version information, if being run in a Git repository self.created = sc.now(utc=True) #: Creation time self.modified = sc.now(utc=True) #: Last modified time # Load Framework from disk self.sheets = sc.odict() #: Stores a dict of Pandas dataframes organized by sheet from the Excel file self.spreadsheet = None #: A ``sc.Spreadsheet`` object containing the original Excel file if inputs is None: return elif isinstance(inputs, sc.Spreadsheet): self.spreadsheet = inputs else: self.spreadsheet = sc.Spreadsheet(inputs) workbook = openpyxl.load_workbook(self.spreadsheet.tofile(), read_only=True, data_only=True) # Load in read-write mode so that we can correctly dump the file validate_category(workbook, "atomica:framework") # For some pages, we only ever want to read in one DataFrame, and we want empty lines to be ignored. For example, on the # 'compartments' sheet, we want to ignore blank lines, while on the 'cascades' sheet we want the blank line to delimit the # start of a new cascade. So, for the sheet names below, multiple tables will be compressed to one table. We could have the # logic reversed, so tables are flattened by default. However, this would make it impossible for users to add their own # sheets with meaningful whitespace - whereas the other way round, users can always handle flattening their custom # sheets in their own code at the point where they use their sheet. merge_tables = {"databook pages", "compartments", "parameters", "characteristics", "interactions", "plots", "population types"} for worksheet in workbook.worksheets: sheet_title = worksheet.title.lower() self.sheets[sheet_title] = read_dataframes(worksheet, sheet_title in merge_tables) for df in self.sheets[sheet_title]: if sheet_title == "cascades": # On the cascades sheet, the user-entered name appears in the header row. We must preserve case for this # name so that things like 'TB cascade' don't become 'tb cascade'. Same for compartment names so that # any capitals in the compartment name are preserved df.columns = [df.columns[0]] + list(df.columns[1:].str.lower()) elif sheet_title == "transitions": # On the transitions sheet, don't make the compartment code names lowercase pass else: if len(df.columns): df.columns = df.columns.str.lower() if validate: self._validate() if name is not None: self.name = name def __repr__(self): """ Print object """ output = sc.prepr(self) return output def __setstate__(self, d): from .migration import migrate self.__dict__ = d framework = migrate(self) self.__dict__ = framework.__dict__ @property def name(self) -> str: """ Return framework name The framework name is returned from the 'about' sheet in the framework file rather than being stored separately. :return: The name of the framework """ return self.sheets["about"][0]["name"].iloc[0] @name.setter def name(self, value: str) -> None: """ Set framework name The framework name is assigned on the 'about' sheet in the framework file. :param value: The name to assign to the framework """ assert sc.isstring(value) self.sheets["about"][0]["name"].iloc[0] = value # The primary data storage in the Framework are DataFrames with the contents of the Excel file # The convenience methods below enable easy access of frequency used contents without having # to store them separately or going via the DataFrames every time @property def comps(self) -> pd.DataFrame: """ Return compartments dataframe :return: Dataframe containing all compartments """ return self.sheets["compartments"][0] @comps.setter def comps(self, value): assert isinstance(value, pd.DataFrame) self.sheets["compartments"] = [value]
[docs] def get_comp(self, comp_name: str) -> pd.Series: """ Retrieve compartment by code name :param comp_name: Code name of an compartment :return: The row of the compartments dataframe corresponding to the requested compartment """ return self.comps.loc[comp_name]
@property def characs(self) -> pd.DataFrame: """ Return characteristics dataframe :return: Dataframe containing all characteristics """ return self.sheets["characteristics"][0] @characs.setter def characs(self, value: pd.DataFrame) -> None: assert isinstance(value, pd.DataFrame) self.sheets["characteristics"] = [value]
[docs] def get_charac(self, charac_name: str) -> pd.Series: """ Retrieve characteristic by code name :param charac_name: Code name of an characteristic :return: The row of the characteristics dataframe corresponding to the requested characteristic """ return self.characs.loc[charac_name]
[docs] def get_charac_includes(self, includes) -> list: """ Expand characteristics into compartments To initialize the model, it is necessary to map characteristics to the compartments they contain. Characteristics may contain other characteristics, but eventually all characteristics can be represented as a collection of compartments. This method takes in either a single code name, or a list of code names. It then goes through all of the code names, and recursively replaces any characteristic code names with the compartments contained in the characteristic. The final return value is then a list of all of the compartment names that are included in the original ``includes``. Typical usage of this function is to pass in a single characteristic name e.g. >>> Framework.get_charac_includes('alive') :param includes: A single code name, or list of codenames, containing only compartments and/or characteristics :return: A list of compartment names """ if not isinstance(includes, list): includes = [includes] expanded = [] for include in includes: if include in self.characs.index: components = [x.strip() for x in self.characs.at[include, "components"].split(",")] expanded += self.get_charac_includes(components) else: expanded.append(str(include)) # Use 'str()' to get `'sus'` in the error message instead of `u'sus'` return expanded
@property def pars(self) -> pd.DataFrame: """ Return parameters dataframe :return: Dataframe containing all parameters """ return self.sheets["parameters"][0] @pars.setter def pars(self, value: pd.DataFrame) -> None: assert isinstance(value, pd.DataFrame) self.sheets["parameters"] = [value]
[docs] def get_par(self, par_name: str) -> pd.Series: """ Retrieve parameter by code name :param par_name: Code name of an parameter :return: The row of the parameters dataframe corresponding to the requested parameter """ return self.pars.loc[par_name]
@property def interactions(self) -> pd.DataFrame: """ Return interactions dataframe :return: Dataframe containing all interactions """ return self.sheets["interactions"][0] @interactions.setter def interactions(self, value: pd.DataFrame) -> None: assert isinstance(value, pd.DataFrame) self.sheets["interactions"] = [value] @property def cascades(self) -> sc.odict: """ Return dict with all cascades If the Cascades sheet is present, return an odict where the key is the name of the cascade and the value is the corresponding dataframe. Note that the fallback cascade (automatically produced if no cascades are present in the original Excel file) is also included here. Therefore, downstream code does not need to differentiate between explicitly defined cascades or automatically generated cascades. :return: A dict keyed by cascade name, containing a ``pd.DataFrame`` with the definition of each cascade from the framework """ if "cascades" not in self.sheets: return sc.odict() # Return an empty dict will let code downstream iterate over d.keys() and fail gracefully (no iterations) if no cascades were present cascade_list = self.sheets["cascades"] data_present = False # If there is a cascade sheet but only has headings, then treat it like it wasn't defined d = sc.odict() for df in cascade_list: cascade_name = df.columns[0].strip() if cascade_name is None or len(cascade_name) == 0: raise Exception("A cascade was found without a name") if cascade_name in d: raise InvalidFramework('A cascade with name "%s" was already read in' % (cascade_name)) d[cascade_name] = df if df.shape[0]: data_present = True if data_present: return d else: return sc.odict() @property def pop_types(self) -> sc.odict: """ Return available population types This returns a dictionary representation of the dataframe on the 'Population types' sheet of the framework, where the key is the population type code name, and the value is a dictionary with all of the attributes (columns) in the dataframe. :return: A dictionary with the population types defined in the framework """ return self.sheets["population types"][0].set_index("code name").to_dict(orient="index", into=sc.odict)
[docs] def get_interaction(self, interaction_name: str) -> pd.Series: """ Retrieve interaction by code name :param interaction_name: Code name of an interaction :return: The row of the interactions dataframe corresponding to the requested interaction """ return self.interactions.loc[interaction_name]
[docs] def get_variable(self, name: str) -> tuple: """ Retrieve variable from framework This method retrieves compartments, characteristics, parameters, and interactions based on code names or full names. The entire row from the framework file is returned. This method can be used when accessing attributes of quantities defined in the framework. In general, the more specialized methods `Framework.get_comp()`, `Framework.get_charac()`, `Framework.get_par()` and `Framework.get_interaction()` should be used if you know in advance which of these variables is desired, and are happy to look up by code name only. These methods are faster and will return errors if a variable of the incorrect type is requested. :param name: Code name or full name :return: A tuple containing (``Series``,type) where - ``Series`` is a Pandas Series corresponding to the row in the framework defining the requested variable - ``type`` is a string identifying the type of variable that was returned e.g. 'comp' for compartment. The strings match ``FS.KEY_COMPARTMENT`` etc. """ for df, item_type in zip([self.comps, self.characs, self.pars, self.interactions], [FS.KEY_COMPARTMENT, FS.KEY_CHARACTERISTIC, FS.KEY_PARAMETER, FS.KEY_INTERACTION]): if name in df.index: return df.loc[name], item_type elif name in set(df["display name"]): return df.loc[df["display name"] == name].iloc[0], item_type raise NotFoundError('Variable "%s" not found in Framework' % (name))
[docs] def get_label(self, name): """ Get full name/label from code name This method converts a code name into a full name. If the name is already a full name, then it will be returned unchanged. Thus this function can be used to normalize inputs and guarantee full names. Example usage: >>> Framework.get_label('b_rate') 'Number of births' >>> Framework.get_label('Number of births') 'Number of births' :param name: A code name or full name :return: The corresponding full name/label """ return self.get_variable(name)[0]["display name"]
def __contains__(self, item: str) -> bool: """ Check if code name is defined in framework An item is contained in this Framework if ``get_variable`` would return something. :param item: A code name :return: ``True`` if ``item`` is a compartment, characteristic, parameter, or interaction """ for df in [self.comps, self.characs, self.pars, self.interactions]: if item in df.index: return True return False def _process_transitions(self) -> None: """ Parse transition sheet This method parses the dataframes associated with the transition sheet into an edge-list representation, stored in ``ProjectFramework.transitions``. The dictionary has the form ``{par_name:(from_comp,to_comp)}``. The dictionary is flat, so there is a single transition structure for the entire model regardless of population type. This method expects a sheet called 'Transitions' to be present and correctly filled out """ # First, assign the transition matrices to population types self.transitions = defaultdict(list) comps = self.comps["population type"].to_dict() # Look up which pop type is associated with each compartment pars = self.pars["population type"].to_dict() # Look up which pop type is associated with each parameter for i, df in enumerate(self.sheets["transitions"]): # If the population type isn't set (in the top left of the matrix) then assign the matrix to the first pop type cols = list(df.columns) if pd.isna(cols[0]) or cols[0].lower().strip() == "transition matrix": # The template for single population type frameworks has 'Transition matrix' where the population type would normally go cols[0] = self.pop_types.keys()[0] df.columns = cols df = df.set_index(df.columns[0]) if df.index.name not in self.pop_types: raise InvalidFramework('Transition matrix has population type "%s" but this is not a recognized population type (available population types are %s)' % (df.index.name, list(self.pop_types.keys()))) # Check all compartments are within the specified population type for comp in set(list(df.index) + list(df.columns)): if comp not in comps: raise InvalidFramework('A compartment "%s" appears in the matrix on the Transitions sheet, but it was not defined on the Compartments sheet' % (comp)) elif comps[comp] != df.index.name: raise InvalidFramework('Compartment "%s" belongs to pop type "%s" but it appears in the transition matrix for "%s"' % (comp, comps[comp], df.index.name)) # Next, import each dataframe for _, from_row in df.iterrows(): # For each row in the transition matrix from_row.dropna(inplace=True) from_comp = from_row.name for to_comp, par_names in from_row.iteritems(): if par_names.strip() == ">": self.transitions[">"].append((from_comp, to_comp)) # Add a transition entry for parameter-less junction residual links. This is consistent in that `self.transitions` is a representation of links, not parameters continue for par_name in par_names.split(","): par_name = par_name.strip() if par_name not in self.pars.index: raise InvalidFramework('Parameter "%s" appears in the transition matrix but not on the Parameters page' % (par_name)) if pars[par_name] != df.index.name: raise InvalidFramework('Parameter "%s" belongs to pop type "%s" but it appears in the transition matrix for "%s"' % (par_name, pars[par_name], df.index.name)) if self.pars.at[par_name, "timed"] == "y": if self.comps.at[from_comp, "duration group"]: existing = {self.comps.at[from_comp, "duration group"]} raise InvalidFramework(f'A compartment can only have one timed outflow - e.g., Parameters "{existing}" and "{par_name}" are both timed parameters, so they cannot both be associated with Compartment "{from_comp}"') else: self.comps.at[from_comp, "duration group"] = par_name if self.comps.at[from_comp, "is source"] == "y" or self.comps.at[from_comp, "is sink"] == "y" or self.comps.at[from_comp, "is junction"] == "y": raise InvalidFramework(f'Parameter "{par_name}" defines a timed outflow from Compartment "{from_comp}" but timed outflows cannot be applied to source, sink, or junction compartments') elif self.comps.at[to_comp, "duration group"] == par_name: raise InvalidFramework(f'Compartment "{from_comp}" belongs to the duration group "{par_name}" but it flushes into "{to_comp}" which is a member of the same group. Flushing into the same duration group is not permitted') self.transitions[par_name].append((from_comp, to_comp)) def _validate(self) -> None: """ Check contents of the framework This function validates the content of Framework. There are two aspects to this - Adding in any missing values using appropriate defaults - Checking that the provided information is internally consistent This method is called automatically during construction, and therefore any changes made during validation will occur prior to users interacting with the ProjectFramework If the framework contains invalid content, this function call will result in an error being raised. """ import networkx as nx # Check for required sheets for page in ["databook pages", "parameters"]: if page not in self.sheets: raise InvalidFramework('The Framework file is missing a required sheet: "%s"' % (page)) # VALIDATE METADATA # Validate 'About' sheet - it must have a name if "about" not in self.sheets: self.sheets["about"] = [pd.DataFrame.from_records([("Unnamed", "No description available")], columns=["name", "description"])] # Get the dataframe which has the name in it - the first one on the page, if there were multiple pages name_df = self.sheets["about"][0] required_columns = ["name"] defaults = dict() valid_content = { "name": None, # Valid content being `None` means that it just cannot be empty } try: name_df = _sanitize_dataframe(name_df, required_columns, defaults, valid_content) except Exception as e: message = 'An error was detected on the "About" sheet in the Framework file -> ' raise Exception("%s -> %s" % (message, e)) from e name_df["name"] = name_df["name"].astype(str) self.name = name_df["name"].iloc[0] if "cascade" in self.sheets and "cascades" not in self.sheets: logger.warning('A sheet called "Cascade" was found, but it probably should be called "Cascades"') if "plot" in self.sheets and "plots" not in self.sheets: logger.warning('A sheet called "Plot" was found, but it probably should be called "Plots"') # VALIDATE POPULATION TYPES # Default to having 'Default' if "population types" not in self.sheets: self.sheets["population types"] = [pd.DataFrame.from_records([(FS.DEFAULT_POP_TYPE, "Default")], columns=["code name", "description"])] available_pop_types = list(self.pop_types.keys()) # Get available pop types # VALIDATE COMPARTMENTS if "compartments" not in self.sheets: self.sheets["compartments"] = [pd.DataFrame(columns=["code name", "display name"])] required_columns = ["display name"] defaults = {"is sink": "n", "is source": "n", "is junction": "n", "databook page": None, "default value": None, "databook order": None, "guidance": None, "population type": None} # Default is for it to be randomly ordered if the databook page is not None valid_content = { "display name": None, # Valid content being `None` means that it just cannot be empty "is sink": {"y", "n"}, "is source": {"y", "n"}, "is junction": {"y", "n"}, } numeric_columns = ["databook order", "default value"] try: self.comps = _sanitize_dataframe(self.comps, required_columns, defaults, valid_content, set_index="code name", numeric_columns=numeric_columns) except Exception as e: message = 'An error was detected on the "Compartments" sheet in the Framework file' raise Exception("%s -> %s" % (message, e)) from e # Assign first population type to any empty population types # In general, if the user has specified any pop types, then the first population type will be # selected as the default in downstream functions e.g. `ProjectData.add_pop` self.comps["population type"] = self.comps["population type"].fillna(available_pop_types[0]) self.comps["duration group"] = None # Store the duration group (the name of the outgoing timed parameter) if there is one # Default setup weight is 1 if in databook or 0 otherwise # This is a separate check because the default value depends on other columns if "setup weight" not in self.comps: self.comps["setup weight"] = (~self.comps["databook page"].isnull()).astype(int) else: fill_ones = self.comps["setup weight"].isnull() & self.comps["databook page"] self.comps["setup weight"][fill_ones] = 1 self.comps["setup weight"] = self.comps["setup weight"].fillna(0) if "calibrate" not in self.comps: # If calibration column is not present, then it calibrate if in the databook default_calibrate = ~self.comps["databook page"].isnull() self.comps["calibrate"] = None self.comps["calibrate"][default_calibrate] = "y" # VALIDATE COMPARTMENTS for comp_name, row in zip(self.comps.index, self.comps.to_dict(orient="records")): if [row["is sink"], row["is source"], row["is junction"]].count("y") > 1: raise InvalidFramework('Compartment "%s" can only be one of Sink, Source, or Junction' % comp_name) if (row["setup weight"] > 0) & (row["is source"] == "y" or row["is sink"] == "y"): raise InvalidFramework('Compartment "%s" is a source or a sink, but has a nonzero setup weight' % comp_name) if (row["setup weight"] > 0) & (pd.isna(row["databook page"])): raise InvalidFramework('Compartment "%s" has a nonzero setup weight, but does not appear in the databook' % comp_name) if (not pd.isna(row["databook page"])) & (row["is source"] == "y" or row["is sink"] == "y"): raise InvalidFramework('Compartment "%s" is a source or a sink, but has a databook page' % comp_name) # It only makes sense to calibrate comps and characs that appear in the databook, because these are the only ones that # will appear in the parset if (pd.isna(row["databook page"])) & (not pd.isna(row["calibrate"])): raise InvalidFramework('Compartment "%s" is marked as being eligible for calibration, but it does not appear in the databook' % comp_name) if pd.isna(row["databook page"]) and not pd.isna(row["databook order"]): logger.warning('Compartment "%s" has a databook order (%s), but no databook page', comp_name, row["databook order"]) if (not pd.isna(row["databook page"])) and not (row["databook page"] in self.sheets["databook pages"][0]["datasheet code name"].values): raise InvalidFramework('Compartment "%s" has databook page "%s" but that page does not appear on the "databook pages" sheet' % (comp_name, row["databook page"])) if row["population type"] not in available_pop_types: raise InvalidFramework('Compartment "%s" has population type "%s" but that population type does not appear on the "population types" sheet - must be one of %s' % (comp_name, row["population type"], available_pop_types)) # VALIDATE CHARACTERISTICS if "characteristics" not in self.sheets: self.sheets["characteristics"] = [pd.DataFrame(columns=["code name", "display name"])] required_columns = ["display name"] defaults = {"components": None, "denominator": None, "default value": None, "databook page": None, "databook order": None, "guidance": None, "population type": None} valid_content = { "display name": None, "components": None, } numeric_columns = ["databook order", "default value"] try: self.characs = _sanitize_dataframe(self.characs, required_columns, defaults, valid_content, set_index="code name", numeric_columns=numeric_columns) except Exception as e: message = 'An error was detected on the "Characteristics" sheet in the Framework file' raise Exception("%s -> %s" % (message, e)) from e # Assign first population type to any empty population types self.characs["population type"] = self.characs["population type"].fillna(available_pop_types[0]) if "setup weight" not in self.characs: self.characs["setup weight"] = (~self.characs["databook page"].isnull()).astype(int) else: fill_ones = self.characs["setup weight"].isnull() & self.characs["databook page"] self.characs["setup weight"][fill_ones] = 1 self.characs["setup weight"] = self.characs["setup weight"].fillna(0) if "calibrate" not in self.characs: # If calibration column is not present, then it calibrate if in the databook default_calibrate = ~self.characs["databook page"].isnull() self.characs["calibrate"] = None self.characs["calibrate"][default_calibrate] = "y" for charac_name, row in zip(self.characs.index, self.characs.to_dict(orient="records")): # Block this out because that way, can validate that there are some nonzero setup weights. Otherwise, user could set setup weights but # not put them in the databook, causing an error when actually trying to run the simulation if (row["setup weight"] > 0) and (row["databook page"] is None): raise InvalidFramework('Characteristic "%s" has a nonzero setup weight, but does not appear in the databook' % charac_name) if row["denominator"] is not None: if row["denominator"] in self.comps.index: if row["population type"] != self.comps.at[row["denominator"], "population type"]: raise InvalidFramework('In Characteristic "%s", included compartment "%s" does not have a matching population type' % (charac_name, row["denominator"])) if (row["setup weight"] > 0) and self.comps.at[row["denominator"], "databook page"] is None: # Check that denominators appear in the databook as described in https://github.com/atomicateam/atomica/issues/433 raise InvalidFramework('Characteristic "%s" is being used for initialization, but the denominator compartment "%s" does not appear in the databook. Denominators that are used in initialization must appear in the databook ' % (charac_name, row["denominator"])) elif row["denominator"] in self.characs.index: if row["population type"] != self.characs.at[row["denominator"], "population type"]: raise InvalidFramework('In Characteristic "%s", included characteristic "%s" does not have a matching population type' % (charac_name, row["denominator"])) if not (self.characs.loc[row["denominator"]]["denominator"] is None): raise InvalidFramework('Characteristic "%s" uses the characteristic "%s" as a denominator. However, "%s" also has a denominator, which means that it cannot be used as a denominator for "%s"' % (charac_name, row["denominator"], row["denominator"], charac_name)) if (row["setup weight"] > 0) and self.characs.at[row["denominator"], "databook page"] is None: # Check that denominators appear in the databook as described in https://github.com/atomicateam/atomica/issues/433 raise InvalidFramework('Characteristic "%s" is being used for initialization, but the denominator characteristic "%s" does not appear in the databook. Denominators that are used in initialization must appear in the databook ' % (charac_name, row["denominator"])) else: raise InvalidFramework('In Characteristic "%s", denominator "%s" was not recognized as a Compartment or Characteristic' % (charac_name, row["denominator"])) if (row["databook page"] is None) and (row["calibrate"] is not None): raise InvalidFramework('Compartment "%s" is marked as being eligible for calibration, but it does not appear in the databook' % charac_name) if (row["databook page"] is not None) and not (row["databook page"] in self.sheets["databook pages"][0]["datasheet code name"].values): raise InvalidFramework('Characteristic "%s" has databook page "%s" but that page does not appear on the "databook pages" sheet' % (charac_name, row["databook page"])) if row["population type"] not in available_pop_types: raise InvalidFramework('Characteristic "%s" has population type "%s" but that population type does not appear on the "population types" sheet - must be one of %s' % (charac_name, row["population type"], available_pop_types)) for component in row["components"].split(","): component = component.strip() if component in self.comps.index: if row["population type"] != self.comps.at[component, "population type"]: raise InvalidFramework('In Characteristic "%s", included compartment "%s" does not have a matching population type' % (charac_name, component)) elif component in self.characs.index: if row["population type"] != self.characs.at[component, "population type"]: raise InvalidFramework('In Characteristic "%s", included characteristic "%s" does not have a matching population type' % (charac_name, component)) else: raise InvalidFramework('In Characteristic "%s", included component "%s" was not recognized as a Compartment or Characteristic' % (charac_name, component)) # VALIDATE INTERACTIONS if "interactions" not in self.sheets: self.sheets["interactions"] = [pd.DataFrame(columns=["code name", "display name", "to population type", "from population type"])] required_columns = ["display name"] defaults = {"default value": None, "from population type": None, "to population type": None} valid_content = { "display name": None, } try: self.interactions = _sanitize_dataframe(self.interactions, required_columns, defaults, valid_content, set_index="code name") except Exception as e: message = 'An error was detected on the "Interactions" sheet in the Framework file' raise Exception("%s -> %s" % (message, e)) from e # Assign first population type to any empty population types self.interactions["from population type"] = self.interactions["from population type"].fillna(available_pop_types[0]) self.interactions["to population type"] = self.interactions["to population type"].fillna(available_pop_types[0]) for interaction_name, row in zip(self.interactions.index, self.interactions.to_dict(orient="records")): if row["from population type"] not in available_pop_types: raise InvalidFramework('Interaction "%s" has population type "%s" but that population type does not appear on the "population types" sheet - must be one of %s' % (interaction_name, row["from population type"], available_pop_types)) if row["to population type"] not in available_pop_types: raise InvalidFramework('Interaction "%s" has population type "%s" but that population type does not appear on the "population types" sheet - must be one of %s' % (interaction_name, row["to population type"], available_pop_types)) # VALIDATE PARAMETERS # This is done last, because validating parameter dependencies requires checking compartments and characteristics required_columns = ["display name", "format"] defaults = { "default value": None, "minimum value": None, "maximum value": None, "function": None, "databook page": None, "databook order": None, "targetable": "n", "guidance": None, "timescale": None, "population type": None, "is derivative": "n", "timed": "n", } valid_content = { "display name": None, "targetable": {"y", "n"}, "is derivative": {"y", "n"}, "timed": {"y", "n"}, } numeric_columns = ["databook order", "default value", "minimum value", "maximum value", "timescale"] try: self.pars = _sanitize_dataframe(self.pars, required_columns, defaults, valid_content, set_index="code name", numeric_columns=numeric_columns) except Exception as e: message = 'An error was detected on the "Parameters" sheet in the Framework file' raise Exception("%s -> %s" % (message, e)) from e # Assign first population type to any empty population types self.pars["population type"] = self.pars["population type"].fillna(available_pop_types[0]) self.pars["format"] = self.pars["format"].map(lambda x: x.strip() if sc.isstring(x) else x) if "calibrate" not in self.pars: default_calibrate = self.pars["targetable"] == "y" self.pars["calibrate"] = None self.pars["calibrate"][default_calibrate] = "y" # Parse the transitions matrix if "transitions" not in self.sheets: self.sheets["transitions"] = [] self._process_transitions() # Now validate each parameter G = nx.DiGraph() # Generate a dependency graph def cross_pop_message(par, quantity_type, quantity_name): spec = self.get_variable(quantity_name)[0] message = f"The function for parameter '{par.name}' in the '{par['population type']}' population type refers to {quantity_type} '{quantity_name}' in the '{spec['population type']}' population type. All cross-population interactions must take place within either the SRC_POP_SUM or SRC_POP_AVG population aggregations" return message # If framework has units that case-insensitively match the standard units, then correct the case lower_idx = self.pars["format"].str.lower().isin(FS.STANDARD_UNITS) self.pars["format"][lower_idx] = self.pars["format"][lower_idx].str.lower() for par_name, par in zip(self.pars.index, self.pars.to_dict(orient="records")): if (par["databook page"] is not None) and not (par["databook page"] in self.sheets["databook pages"][0]["datasheet code name"].values): raise InvalidFramework('Parameter "%s" has databook page "%s" but that page does not appear on the "databook pages" sheet' % (par_name, par["databook page"])) if par["population type"] not in available_pop_types: raise InvalidFramework('Parameter "%s" has population type "%s" but that population type does not appear on the "population types" sheet - must be one of %s' % (par_name, par["population type"], available_pop_types)) if par["is derivative"] == "y" and par["function"] is None: raise InvalidFramework('Parameter "%s" is marked "is derivative" but it does not have a parameter function' % (par_name)) if par["is derivative"] == "y" and par["databook page"] is None: raise InvalidFramework('Parameter "%s" is marked "is derivative" but it does not have a databook page - it needs to appear in the databook so that an initial value can be provided.' % (par_name)) if not np.isfinite(par["timescale"]): if par["format"] in {FS.QUANTITY_TYPE_DURATION}: # Durations should always have a timescale, assumed to be annual by default # This ensures the time units get printed in the databook, even for non-transition parameters self.pars.at[par_name, "timescale"] = 1.0 # Assign default timescale for probability and duration (but not number, since a number might be absolute rather than annual for non-transition parameters) elif self.transitions[par_name] and not np.isfinite(par["timescale"]) and par["format"] in {FS.QUANTITY_TYPE_NUMBER, FS.QUANTITY_TYPE_PROBABILITY, FS.QUANTITY_TYPE_RATE}: # For number and probability, non-transition parameters might not be be in units per time period, therefore having a timescale # is optional *unless* it is a transition parameter self.pars.at[par_name, "timescale"] = 1.0 elif par["format"] == FS.QUANTITY_TYPE_PROPORTION: # Proportion units should never have a timescale since they are time-invariant raise InvalidFramework("Parameter %s is in proportion units, therefore it cannot have a timescale entered for it" % par_name) if par["timed"] == "y": if par["format"] != FS.QUANTITY_TYPE_DURATION: raise InvalidFramework(f'Parameter "{par_name}" is marked as driving a timed transition so the format needs to be "{FS.QUANTITY_TYPE_DURATION}"') if par["is derivative"] == "y": raise InvalidFramework(f'Parameter "{par_name}" is marked as driving a timed transition so it cannot be a derivative parameter') if par["targetable"] == "y": raise InvalidFramework(f'Parameter "{par_name}" is marked as driving a timed transition so it cannot be targeted by programs') if par["format"] in {FS.QUANTITY_TYPE_PROBABILITY, FS.QUANTITY_TYPE_RATE} and par["maximum value"] == 1: # With a timestep of 0.25, the maximum timestep probability of 1 corresponds to an annual probability of 4. Allowing the annual probability to exceed 1 for the purpose of # flow rate calculations impacts dynamics, because capping the value is a non-invertible process. If the annual probability is capped during calculations, then a timestep # probability of 0.3 and 1.0 will both have an annual probability of 1. logger.warning(f'Parameter "{par_name}" is in rate units and a maximum value of "1" has been entered. Rates in the framework should generally not be limited to "1"') if par["function"] is None: # In order to have a value, a transition parameter must either be # - have a function # - appear in the databook # - TODO: be targetable, in which case, the simulation must be run with programs active AND the progbook must have an outcome defined by at least one program in each population if not par["databook page"]: message = 'Parameter "%s" does not have a function OR a databook page. It must have at least one of these entries.' % (par_name) raise InvalidFramework(message) else: if not sc.isstring(par["function"]): message = 'The function for parameter "%s" has not been specified as a string. This can happen if the formula consists only of a number. In that case, you need to put a single quote character at the start of the cell in Excel, to convert the number to a string' % (par_name) raise InvalidFramework(message) _, deps = parse_function(par["function"]) # Parse the function to get dependencies is_aggregation = par["function"].startswith("SRC_POP_AVG") or par["function"].startswith("TGT_POP_AVG") or par["function"].startswith("SRC_POP_SUM") or par["function"].startswith("TGT_POP_SUM") is_cross_aggregation = par["function"].startswith("SRC_POP_AVG") or par["function"].startswith("SRC_POP_SUM") for dep in deps: if dep in ["t", "dt"]: # These are special variables passed in by model.py continue elif "___" in dep: # Note that the parser replaces ':' with '___' if is_aggregation: message = 'The function for parameter "%s" depends on a flow rate ("%s") but the function contains a population aggregation. Population aggregations can only operate on a single variable and cannot operate on flow rate' % (par_name, dep.replace("___", ":")) raise InvalidFramework(message) if self.transitions[par_name]: message = 'The function for parameter "%s" depends on a flow rate ("%s"). However, "%s" also governs a flow rate, because it appears in the transition matrix. Transition parameters cannot depend on flow rates, so no flow rates can appear in the function for "%s"' % (par_name, dep.replace("___", ":"), par_name, par_name) raise InvalidFramework(message) if dep.endswith("___flow"): # If the user requested the flow associated with a parameter dep_name = dep.replace("___flow", "") if dep_name not in self.pars.index: message = 'The function for parameter "%s" depends on the flow rate "%s:flow". This requires a parameter called "%s" to be defined in the Framework, but no parameter with that name was found' % (par_name, dep_name, dep_name) raise InvalidFramework(message) elif not is_cross_aggregation and self.pars.at[dep_name, "population type"] != par["population type"]: raise InvalidFramework(cross_pop_message(par, "compartment", dep_name)) if not self.transitions[dep_name]: # If the user is trying to get the flow rate for a non-transition parameter message = 'The function for parameter "%s" depends on the flow rate "%s:flow". Flow rates are only associated with transition parameters, but "%s" does not appear in the transition matrix, and there is therefore no flow rate associated with it' % (par_name, dep_name, dep_name) raise InvalidFramework(message) else: # If the user requested the flow between compartments deps = dep.split("___") if deps[0]: if deps[0] not in self.comps.index: message = 'The function for parameter "%s" depends on the flow rate "%s". This requires a source compartment called "%s" to be defined in the Framework, but no compartment with that name was found' % (par_name, dep.replace("___", ":"), deps[0]) raise InvalidFramework(message) elif not is_cross_aggregation and self.comps.at[deps[0], "population type"] != par["population type"]: raise InvalidFramework(cross_pop_message(par, "compartment", deps[0])) if deps[1]: if deps[1] not in self.comps.index: message = 'The function for parameter "%s" depends on the flow rate "%s". This requires a destination compartment called "%s" to be defined in the Framework, but no compartment with that name was found' % (par_name, dep.replace("___", ":"), deps[1]) raise InvalidFramework(message) elif not is_cross_aggregation and self.comps.at[deps[1], "population type"] != par["population type"]: raise InvalidFramework(cross_pop_message(par, "compartment", deps[1])) elif dep in self.comps.index: if not is_cross_aggregation and self.comps.at[dep, "population type"] != par["population type"]: raise InvalidFramework(cross_pop_message(par, "compartment", dep)) elif dep in self.characs.index: if not is_cross_aggregation and self.characs.at[dep, "population type"] != par["population type"]: raise InvalidFramework(cross_pop_message(par, "characteristic", dep)) elif dep in self.interactions.index: if not is_aggregation: message = 'The function for parameter "%s" includes the Interaction "%s", which means that the parameter function can only be one of: "SRC_POP_AVG", "TGT_POP_AVG", "SRC_POP_SUM" or "TGT_POP_SUM"' % (par_name, dep) raise InvalidFramework(message) if (len(dep) > 2) and (par["function"].startswith("SRC_POP_SUM") or par["function"].startswith("TGT_POP_SUM")): logger.warning(f"Parameter '{par_name}' has a weighting variable but uses a summation aggregation. It should very likely use SRC_POP_AVG or TGT_POP_AVG instead") # If a population aggregation includes a weighting interaction, then the 'to' population must match this parameter if self.interactions.at[dep, "to population type"] != par["population type"]: message = f""" The parameter '{par_name}' has population type '{par['population type']}' and weights the interaction using '{dep}', which is defined as applying from type '{self.interactions.at[dep,'from population type']}' to type '{self.interactions.at[dep,'to population type']}'. If weighting a cross-type interaction, the 'to' population type in the interaction must match the parameter """ raise InvalidFramework(" ".join(message.split())) # If population aggregation includes a weighting interaction, then the 'from' population must match all other variables in the aggregation for dep2 in deps: if dep2 != dep: var = self.get_variable(dep2)[0] if var["population type"] != self.interactions.at[dep, "from population type"]: message = f""" The parameter '{par_name}' has uses interaction weighting '{dep}', which is defined as applying from type '{self.interactions.at[dep,'from population type']}' to type '{self.interactions.at[dep,'to population type']}'. If weighting a cross-type interaction, the quantity being averaged and the optional weighting quantity must belong to the 'from' population type. However, the parameter contains the quantity '{dep2}' which has population type '{var['population type']}' """ raise InvalidFramework(" ".join(message.split())) if self.interactions.at[dep, "to population type"] != self.interactions.at[dep, "from population type"]: if par["function"].startswith("TGT_"): raise InvalidFramework(f"Parameter '{par_name}' uses interaction {dep} which crosses population types. Because this interaction is directed, only SRC_POP_SUM and SRC_POP_AVG can be used") elif dep in self.pars.index: # If any cycles are present, an informative error will be raised later. However, the common case of a user entering the parameter's name in its own # formula can be caught immediately, and an even more specific error message displayed. We don't include self connections in the graph for this # purpose, if a derivative refers to itself it can be computed any time and we can then simply check if the graph is acyclic to probe for indirect # circular dependencies if self.pars.at[dep, "is derivative"] != "y": if dep == par_name: raise InvalidFramework(f"Parameter '{par_name}' has a parameter function that refers to itself, but it is not marked as a derivative parameter. Circular references are only permitted if the parameter function is providing a derivative") else: G.add_edge(dep, par_name) # Note directionality - we use par_name->dep for the dependency so that the topological ordering of the graph corresponds to the execution order else: # If it's a derivative parameter, then the forward Euler step always updates the parameter's value at the next time index (ti+1). Therefore, # the parameter is not actually a dependency per-se, because there is no requirement to update it first (because it will have already been updated) # Thus, we can simply ignore it here pass if self.pars.at[dep, "population type"] != par["population type"] and not is_aggregation: # Population types for the dependency and the parameter can only differ if it's an aggregation raise InvalidFramework(cross_pop_message(par, "parameter", dep)) else: message = 'The function for parameter "%s" depends on a quantity "%s", but no Compartment, Characteristic, or Parameter with this name was found' % (par_name, dep) raise InvalidFramework(message) # Check that aggregations do not use functions in the first argument if is_aggregation: temp_list = par["function"].split("(")[1] quantity = temp_list.rstrip(")").split(",")[0].strip() if quantity not in set(self.pars.index).union(self.comps.index).union(self.characs.index): message = 'The function for parameter "%s" aggregates the quantity "%s". However, this quantity does not directly correspond to a Compartment, Characteristic, or Parameter. This can occur if you are attempting to aggregate a function of several variables. Population aggregation only supports aggregating a single quantity. As an alternative, you can make a separate parameter with your function prior to performing the aggregation.' % (par.name, quantity) raise InvalidFramework(message) if self.transitions[par_name]: # If this parameter is associated with transitions # Transition parameters must have units defined in the framework if not par["format"]: raise InvalidFramework("Parameter %s is a transition parameter, so it needs to have a format specified in the Framework" % par_name) allowed_formats = {FS.QUANTITY_TYPE_NUMBER, FS.QUANTITY_TYPE_PROBABILITY, FS.QUANTITY_TYPE_RATE, FS.QUANTITY_TYPE_DURATION, FS.QUANTITY_TYPE_PROPORTION} if par["format"] not in allowed_formats: raise InvalidFramework('Parameter %s is a transition parameter so format "%s" is not allowed - it must be one of %s' % (par_name, par["format"], allowed_formats)) from_comps = [x[0] for x in self.transitions[par_name]] to_comps = [x[1] for x in self.transitions[par_name]] # Avoid discussions about how to disaggregate parameters with multiple links from the same compartment. # Note that Parameter.source_popsize() sums over source compartments from all links associated with the parameter. # Therefore, if this check wasn't in place here, the compartments would otherwise get double counted if len(from_comps) != len(set(from_comps)): raise InvalidFramework('Parameter "%s" cannot be associated with more than one transition from the same compartment' % par_name) n_source_outflow = 0 for comp in from_comps: if self.comps.at[comp, "is sink"] == "y": raise InvalidFramework('Parameter "%s" has an outflow from Compartment "%s" which is a sink' % par_name, comp) elif self.comps.at[comp, "is source"] == "y": n_source_outflow += 1 if par["format"] != FS.QUANTITY_TYPE_NUMBER: raise InvalidFramework('Parameter "%s" has an outflow from a source compartment, so it needs to be in "number" units' % par_name) elif self.comps.at[comp, "is junction"] == "y": if par["format"] != FS.QUANTITY_TYPE_PROPORTION: raise InvalidFramework('Parameter "%s" has an outflow from a junction, so it must be in "proportion" units' % par_name) if (par["format"] == FS.QUANTITY_TYPE_PROPORTION) and (self.comps.at[comp, "is junction"] != "y"): raise InvalidFramework('Parameter "%s" has units of "proportion" which means all of its outflows must be from junction compartments, which Compartment "%s" is not' % (par_name, comp)) if n_source_outflow > 1: raise InvalidFramework('Parameter "%s" has an outflow from more than one source compartment, which prevents disaggregation from working correctly' % par_name) if n_source_outflow > 0 and len(from_comps) > 1: raise InvalidFramework(f'Parameter "{par_name}" has an outflow from a source compartment, therefore it cannot be associated with any other transitions') for comp in to_comps: if self.comps.at[comp, "is source"] == "y": raise InvalidFramework('Parameter "%s" has an inflow to Compartment "%s" which is a source' % par_name, comp) else: # If this is not a transition parameter if par["format"] == FS.QUANTITY_TYPE_NUMBER and par["targetable"] == "y": raise InvalidFramework('Parameter "%s" is targetable and in number units, but is not a transition parameter. To target a parameter with programs in number units, the parameter must appear in the transition matrix.' % par_name) # NB. If the user specifies a timescale for a non-transition parameter, it won't have any effect, but it will result in appropriately # labelled units in the databook. So for now, don't throw an error, just proceed # if par['timescale'] is not None: # raise InvalidFramework('Parameter "%s" is not a transition parameter, but has a timescale associated with it. To avoid ambiguity in the parameter value used in functions, non-transition parameters cannot have timescales provided. Please remove the timescale value from the framework.' % par_name) # Check for cycles in the parameter graph if not nx.dag.is_directed_acyclic_graph(G): # Do we have any cycles with length greater than 1? If length = 1, then this has already been checked for message = "Circular dependencies in parameters were found. These are not allowed, apart from derivative parameters being allowed to refer to themselves. The following circular dependencies are present:" for cycle in nx.simple_cycles(G): message += "\n - " + " -> ".join(cycle) raise InvalidFramework(message) # VALIDATE NAMES - No collisions, no keywords code_names = list(self.comps.index) + list(self.characs.index) + list(self.pars.index) + list(self.interactions.index) + list(available_pop_types) tmp = set() for name in code_names: if len(name) == 1: raise InvalidFramework('Code name "%s" is not valid: code names must be at least two characters long' % (name)) if FS.RESERVED_SYMBOLS.intersection(name): raise InvalidFramework('Code name "%s" is not valid: it cannot contain any of these reserved symbols %s' % (name, FS.RESERVED_SYMBOLS)) if name in FS.RESERVED_KEYWORDS: raise InvalidFramework('Requested code name "%s" is a reserved keyword' % name) if name not in tmp: tmp.add(name) else: raise InvalidFramework('Duplicate code name "%s"' % name) display_names = list(self.comps["display name"]) + list(self.characs["display name"]) + list(self.pars["display name"]) + list(self.interactions["display name"]) tmp = set() for name in display_names: if name not in tmp: tmp.add(name) else: raise InvalidFramework('Duplicate display name "%s"' % name) # VALIDATE CASCADES if "cascades" not in self.sheets or not self.cascades: # Make the fallback cascade with name 'Default' used_fallback_cascade = True records = [] for _, spec in self.characs.iterrows(): if not spec["denominator"]: records.append((spec["display name"], spec.name)) self.sheets["cascades"] = sc.promotetolist(pd.DataFrame.from_records(records, columns=["Cascade", "constituents"])) else: used_fallback_cascade = False cascade_names = self.cascades.keys() for name in cascade_names: if name in FS.RESERVED_KEYWORDS: raise InvalidFramework('Requested cascade name "%s" is a reserved keyword' % name) if name in code_names: raise InvalidFramework('Cascade "%s" cannot have the same name as a compartment, characteristic, or parameter' % (name)) if name in display_names: raise InvalidFramework('Cascade "%s" cannot have the same display name as a compartment, characteristic, or parameter' % (name)) for stage_name in self.cascades[name].iloc[:, 0]: if stage_name in FS.RESERVED_KEYWORDS: raise InvalidFramework('Requested cascade stage name "%s" is a reserved keyword' % stage_name) # Check that all cascade constituents match a characteristic or compartment for cascade_name, df in self.cascades.items(): for spec in df.to_dict(orient="records"): if not spec["constituents"]: raise InvalidFramework('In cascade "%s", stage "%s" - no constituents were provided in the spreadsheet' % (cascade_name, spec[cascade_name])) for component in spec["constituents"].split(","): if not (component.strip() in self.comps.index or component.strip() in self.characs.index): raise InvalidFramework('In cascade "%s", stage "%s" - the included component "%s" was not recognized as a Compartment or Characteristic' % (cascade_name, spec[cascade_name], component)) # Check that the cascades are validly nested # This will also check the fallback cascade for cascade_name in self.cascades.keys(): validate_cascade(self, cascade_name, fallback_used=used_fallback_cascade) # VALIDATE PLOTS if "plots" not in self.sheets or not self.sheets["plots"] or self.sheets["plots"][0].empty: self.sheets["plots"] = [pd.DataFrame(columns=["name", "type", "quantities", "plot group"])] required_columns = ["name", "quantities"] defaults = { "type": "series", "plot group": None, } try: self.sheets["plots"][0] = _sanitize_dataframe(self.sheets["plots"][0], required_columns, defaults=defaults, valid_content={}) except Exception as e: message = 'An error was detected on the "Plots" sheet in the Framework file' raise Exception("%s -> %s" % (message, e)) from e if not self.sheets["plots"][0]["name"].is_unique: duplicates = list(self.sheets["plots"][0]["name"][self.sheets["plots"][0]["name"].duplicated()]) raise InvalidFramework(f'Error on "Plots" sheet -> the "Name" column contains duplicate items: {duplicates}') for quantity in self.sheets["plots"][0]["quantities"]: for variables in _extract_labels(sc.promotetolist(evaluate_plot_string(quantity))): if variables.endswith(":flow"): variables = [variables.replace(":flow", "")] elif ":" in variables: variables = variables.split(":") else: variables = [variables] for variable in variables: if variable and variable not in self: raise InvalidFramework(f'Error on "Plots" sheet -> quantity "{variable}" is referenced on the plots sheet but is not defined in the framework') # VALIDATE INITIALIZATION for pop_type in available_pop_types: characs = [] df = self.characs for charac_name in df.index: if df.at[charac_name, "population type"] == pop_type and df.at[charac_name, "databook page"] is not None and df.at[charac_name, "setup weight"]: characs.append(charac_name) comps = [] df = self.comps for comp_name in df.index: if df.at[comp_name, "population type"] == pop_type and df.at[comp_name, "is source"] == "n" and df.at[comp_name, "is sink"] == "n": comps.append(comp_name) if df.at[comp_name, "population type"] == pop_type and df.at[comp_name, "databook page"] is not None and df.at[comp_name, "setup weight"]: characs.append(comp_name) if not comps: # If this population type has no compartments, then no need to initialize anything continue elif len(characs) == 0: # If there are compartments but no characs (i.e. characteristics OR compartments that appear in the databook) then initialize with zeros logger.debug("No compartments or characteristics appear in the databook. All compartments will be assigned an initial value of 0") else: A = np.zeros((len(characs), len(comps))) for i, charac in enumerate(characs): for include in self.get_charac_includes(charac): A[i, comps.index(include)] = 1.0 if np.linalg.matrix_rank(A) < len(comps): logger.debug("Initialization characteristics are underdetermined - this may be intentional, but check the initial compartment sizes carefully") # ASSIGN DURATION GROUPS # This needs to be done for compartments, as well as junctions # We can also validate that junctions do not have cycles at this point if any(self.comps["duration group"]): # For each junction, work out if there are any upstream or downstream timed compartments for junc_name in self.comps.index[self.comps["is junction"] == "y"]: G = nx.MultiDiGraph() for par, edges in self.transitions.items(): G.add_edges_from(edges, par=par) # We are a member of the duration group if there is a path to a timed compartment # that does not go via a timed parameter def get_attached_comps(G, comp_name, direction, comps=None, groups=None, attachments=None): """ Return attached compartments, groups, and attachments This function searches junctions forwards and backwards to find compartments indirectly connected to the junction, and then returns the duration groups of those compartments. This allows intermediate junctions to be assigned to duration groups even if their inflows and outflows are both exclusively to other junctions. - The 'groups' associated with a junction are the set of duration groups for all compartments directly connected to the junction, or indirectly connected via a junction. A group may be associated with the junction even if the link is a flush link - for example, if a compartment in group A flushes into a junction, that junction would be associated with the group A and group A would appear in `groups` - A junction is 'attached' to a group if it is associated with a group via a parameter that isn't a flush link. For example, if a compartment in group A flushes into a junction, the junction is NOT attached to group A. Qualitatively, attachment corresponds to durations being preserved - if a compartment has a flush outflow into a junction, duration is not preserved, whereas if the compartment has a normal flow, duration would be preserved. The return variable `attachments` corresponds to all of the attached groups A junction is assigned a duration group only if - The set of groups and attachments are equal group i.e. if there are any timed inflows, - they must all be from the same duration group - there cannot be any other untimed inflows - all outflows must be into the same duration group Otherwise, the links are untimed, and the only constraint is that upstream groups can't flow into the same downstream groups (which would require TimedLinks). This latter requirement means that it would not be possible to have A -> J1 -> J2 -> A, since A -> J1 -> A would not be allowed unless J1 was attached to group A. :param comp_name: The name of the root compartment :param direction: Traverse the transition graph 'upstream' or 'downstream' :param comps: Compartments at the end of the flow graph :param groups: Groups encountered in the flow graph :param attachments: Attachments encountered in the flow graph :return: Tuple of `(comps, groups, attachments)` """ if comps is None: comps = set() groups = set() attachments = set() if direction == "upstream": edges = G.in_edges(comp_name, data=True) items = [(x[0], x[2]["par"]) for x in edges] elif direction == "downstream": edges = G.out_edges(comp_name, data=True) items = [(x[1], x[2]["par"]) for x in edges] for comp, par in items: if par is None: continue elif self.comps.at[comp, "is junction"] == "y": get_attached_comps(G, comp, direction, comps, groups, attachments) else: comps.add(comp) # Add the inflow comp regardless of how it's connected group = self.comps.at[comp, "duration group"] if group is not None: groups.add(group) if par == ">" or self.pars.at[par, "timed"] != "y": attachments.add(group) return comps, groups, attachments upstream_comps, upstream_groups, upstream_attachments = get_attached_comps(G, junc_name, "upstream") downstream_comps, downstream_groups, downstream_attachments = get_attached_comps(G, junc_name, "downstream") # The logic is # - If there is more than one upstream or downstream group, the junction doesn't belong to the duration group # In that case, there can be no overlap between the duration groups in the upstream and downstream compartments # noting that it's the compartment groups that matter regardless of attachment status # - If there is exactly one upstream or downstream group, then the junction belongs to the duration group # - Similarly, if there are upstream or downstream groups and no groups in the opposite direction, this is also allowed if len(upstream_attachments) == 1 and len(downstream_attachments) == 1 and upstream_attachments == upstream_groups and downstream_attachments == downstream_groups and upstream_attachments == downstream_attachments: # If the self.comps.at[junc_name, "duration group"] = list(upstream_attachments)[0] elif len(upstream_attachments) or len(downstream_attachments): # If there is at least one upstream or downstream attachment, then there can be no overlap between the upstream and downstream groups. if downstream_groups.intersection(upstream_groups): raise InvalidFramework(f'Junction "{junc_name}" has inputs and outputs to multiple duration groups. As these are untimed flows, there can be no overlap in the upstream and downstream groups. Upstream compartments are {upstream_comps} with groups {upstream_groups}. Downstream compartments are {downstream_comps} with groups {downstream_groups}')
[docs] def get_databook_units(self, code_name: str) -> str: """ Return the user-facing units for a quantity given a code name This function returns the units specified in the Framework for quantities defined in the Framework. The units for a quantity are: - For compartments, number - For characteristics, number or fraction depending on whether a denominator is present - For parameters, return either the explicitly specified units plus a timescale, or an empty string - Otherwise, return the inapplicable string (e.g. 'N.A.') This function computes the units dynamically based on the content of the DataFrames. This ensures that it stays in sync with the actual content - for example, if a denominator is programatically added to a characteristic, the units don't also need to be manually updated. Note that at this stage in computation, the units are mainly for managing presentation in the databook. For example, a characteristic with a denominator is technically dimensionless, but we need it to be reported in the databook as a fraction for data entry. Similarly, while the Framework stores the internal units and timescale for a parameter (e.g. 'probability' and '1/365') this function will return 'probability (per day)' for use in the databook. :param code_name: Code name of a quantity supported by ``ProjectFramework.get_variable()`` :return: String containing the units of the quantity """ item_spec, item_type = self.get_variable(code_name) # State variables are in number amounts unless normalized. if item_type in [FS.KEY_COMPARTMENT, FS.KEY_CHARACTERISTIC]: if "denominator" in item_spec.index and item_spec["denominator"] is not None: return FS.QUANTITY_TYPE_FRACTION.title() else: return FS.QUANTITY_TYPE_NUMBER.title() elif item_type == FS.KEY_PARAMETER: units = item_spec["format"].strip() if item_spec["format"] is not None else None if np.isfinite(item_spec["timescale"]): if units is None: raise InvalidFramework(f"A timescale was provided for Framework quantity {code_name} but no units were provided") elif units.lower() == FS.QUANTITY_TYPE_DURATION: return "%s (%s)" % (FS.QUANTITY_TYPE_DURATION.title(), format_duration(item_spec["timescale"], pluralize=True)) elif units.lower() in {FS.QUANTITY_TYPE_NUMBER, FS.QUANTITY_TYPE_PROBABILITY, FS.QUANTITY_TYPE_RATE}: return "%s (per %s)" % (units.title(), format_duration(item_spec["timescale"], pluralize=False)) else: if units is None: raise InvalidFramework(f"A timescale was provided for Framework quantity {code_name} but the units were not one of duration, number, or probability. It is therefore not possible to perform any automatic conversions, simply enter the relevant data entry timescale in the units directly instead.") elif units: return units return FS.DEFAULT_SYMBOL_INAPPLICABLE
[docs] def to_spreadsheet(self) -> sc.Spreadsheet: """ Return content as a Sciris Spreadsheet This :return: A :class:`sciris.Spreadsheet` instance """ # Initialize the bytestream f = io.BytesIO() writer = pd.ExcelWriter(f, engine="xlsxwriter") writer.book.set_properties({"category": "atomica:framework"}) standard_formats(writer.book) # Apply formatting for sheet_name, dfs in self.sheets.items(): if sheet_name == "transitions": # Need to regenerate the transitions based on the actual transitions present dfs = [] for pop_type in self.comps["population type"].unique(): matching_comps = self.comps.index[self.comps["population type"] == pop_type] df = pd.DataFrame(index=matching_comps, columns=matching_comps) df.fillna("", inplace=True) df.index.name = pop_type for par, pairs in self.transitions.items(): for pair in pairs: if pair[0] in matching_comps: if not df.at[pair[0], pair[1]]: df.at[pair[0], pair[1]] = par else: df.at[pair[0], pair[1]] += ", %s" % (par) dfs.append(df) worksheet = writer.book.add_worksheet(sheet_name) writer.sheets[sheet_name] = worksheet # Need to add it to the ExcelWriter for it to behave properly row = 0 for df in dfs: if df.index.name: df.to_excel(writer, sheet_name, startcol=0, startrow=row, index=True) # Write index if present else: df.to_excel(writer, sheet_name, startcol=0, startrow=row, index=False) # Write index if present row += df.shape[0] + 2 # Close the workbook writer.close() # Dump the file content into a ScirisSpreadsheet spreadsheet = sc.Spreadsheet(f) # Return the spreadsheet return spreadsheet
[docs] def save(self, fname) -> None: """ Save framework to disk This function writes a spreadsheet based on the actual dataframes present. This allows programmatic modifications to frameworks to be viewed in Excel. To save the original framework file (if one was loaded in) use `ProjectFramework.original.save()` :param fname: File name to write on disk """ ss = self.to_spreadsheet() ss.save(fname)
def _sanitize_dataframe(df: pd.DataFrame, required_columns: list, defaults: dict, valid_content: dict, set_index: str = None, numeric_columns: list = None) -> pd.DataFrame: """ Take in a DataFrame and sanitize it This function gets called for most/all of the dataframes in the framework. It checks that required columns are present, adds default values, and checks that content is valid. :param df: The DataFrame to sanitize :param required_columns: A list of column names that must be present :param defaults: A dict/odict mapping column names to default values. If a column is not present, it will be initialized with this value. If entries in this column are None, they will be assigned this value The default value can be a lambda function :param valid_content: A dict mapping column names to valid content. If specified, all elements of the column must be members of the iterable (normally this would be a set) If 'valid_content' is None, then instead it will be checked that all of the values are NOT null i.e. use valid_content=None to specify it cannot be empty :param set_index: Optional, if provided, sets the dataframe's index to this column. This column should *not* be in the list of ``required_columns`` as it will not be present after the index is changed :param numeric_columns: A list of column names that should be cast to a numeric type :return: Sanitized dataframe """ if set_index: # If a dataframe has been sanitized and an index column was set, then the index needs to be # restored first before re-sanitizing. If no index was assigned, then attempting to reset # the index will keep creating new columns, so we do not want to reset the index in that case df.reset_index(inplace=True) # First check if there are any duplicate columns in the heading if len(set(df.columns)) < len(df.columns): duplicates = [x for i, x in enumerate(df.columns.values) if x in df.columns[:i]] raise InvalidFramework(f"Duplicate headings present: {duplicates}") # Next check required columns are present if set_index is not None: if set_index not in df.columns: raise InvalidFramework(f'Mandatory index column "{set_index}" is missing') df.set_index(set_index, inplace=True) if any(df.index.isnull()): raise InvalidFramework('The first column contained an empty cell (this probably indicates that a "code name" was left empty)') if not df.index.is_unique: raise InvalidFramework(f"Row indices are not unique. The duplicate items are {set(df.index[df.index.duplicated()])}") for col in required_columns: if col not in df: raise InvalidFramework('A required column "%s" is missing' % col) # Then fill in default values for col, default_value in defaults.items(): if col not in df: df[col] = default_value elif default_value is not None: df[col] = df[col].fillna(default_value) # Finally check content for col, validation in valid_content.items(): if col not in df: raise InvalidFramework('While validating, a required column "%s" was missing' % col) # NB. This error is likely to be the result of a developer adding validation for a column without setting a default for it if validation is None: if df[col].isnull().any(): raise InvalidFramework('The column "%s" cannot contain any empty cells' % (col)) else: validation = set(validation) if not set(df[col]).issubset(validation): raise InvalidFramework('The column "%s" can only contain the following values: %s' % (col, validation)) # Strip all strings if df.columns.isnull().any(): raise InvalidFramework("There cannot be any empty cells in the header row") df.columns = [x.strip() for x in df.columns] if numeric_columns is not None: for col in numeric_columns: try: df[col] = pd.to_numeric(df[col]) except ValueError as E: raise InvalidFramework(f'Column "{col}" must contain numeric values. Invalid values are {[x for x in df[col] if x is not None and not sc.isnumber(x)]}') from E except Exception as E: raise InvalidFramework(f'Error validating column "{col}" when checking that it only contains numeric values') from E return df
[docs]def generate_framework_doc(framework, fname, databook_only=False): """ Generate a framework documentation template file This function takes in a Framework and a file name, and writes a Markdown template file for the framework :param F: A :class:`ProjectFramework` instance :param fname: The filename to write :param databook_only: If True, only quantities appearing in the databook will be shown :return: None """ with open(fname, "w") as f: # Write the heading f.write("# Framework overview\n\n") f.write("**Name**: %s\n\n" % framework.name) f.write("**Description**: %s\n\n" % framework.sheets["about"][0]["description"].iloc[0]) f.write("## Contents\n") f.write("- [Compartments](#compartments)\n") f.write("- [Characteristics](#characteristics)\n") f.write("- [Parameters](#parameters)\n") f.write("- [Interactions](#interactions)\n\n") if "plots" in framework.sheets: f.write("- [Plots](#plots)\n\n") if "cascades" in framework.sheets: f.write("- [Cascades](#cascades)\n\n") f.write("## Compartments\n\n") for _, spec in framework.comps.iterrows(): if databook_only and not spec["databook page"]: continue f.write("### Compartment: %s\n\n" % (spec["display name"])) f.write("- Code name: `%s`\n" % (spec.name)) if spec["is source"] == "y": f.write("- Is source\n") if spec["is sink"] == "y": f.write("- Is sink\n") if spec["is junction"] == "y": f.write("- Is junction\n") if spec["calibrate"] == "y": f.write("- Value can be used for calibration\n") if spec["databook page"]: f.write("- Appears in the databook\n") else: f.write("- Does not appear in the databook\n") if spec["setup weight"] > 0: f.write("- Databook values will be used for model initialization\n") f.write("\n") f.write("- Description: <ENTER DESCRIPTION>\n") f.write("- Data entry guidance: %s\n" % (spec["guidance"] if spec["guidance"] else "<ENTER GUIDANCE>")) f.write("\n") f.write("## Characteristics\n\n") for _, spec in framework.characs.iterrows(): if databook_only and not spec["databook page"]: continue f.write("### Characteristic: %s\n\n" % (spec["display name"])) f.write("- Code name: `%s`\n" % (spec.name)) if spec["calibrate"] == "y": f.write("- Value can be used for calibration\n") f.write("- Includes:\n") for inc_name in spec["components"].split(","): f.write("\t- %s\n" % (framework.get_label(inc_name.strip()))) if spec["denominator"]: f.write("- Denominator: %s\n" % (framework.get_label(spec["denominator"]))) if spec["databook page"]: f.write("- Appears in the databook\n") else: f.write("- Does not appear in the databook\n") if spec["setup weight"] > 0: f.write("- Databook values will be used for model initialization\n") f.write("\n") f.write("- Description: <ENTER DESCRIPTION>\n") f.write("- Data entry guidance: %s\n" % (spec["guidance"] if spec["guidance"] else "<ENTER GUIDANCE>")) f.write("\n") # Work out functional dependencies fcn_deps = {x: set() for x in framework.pars.index.values} fcn_used_in = {x: set() for x in framework.pars.index.values} for _, spec in framework.pars.iterrows(): if spec["function"]: _, deps = parse_function(spec["function"]) # Parse the function to get dependencies for dep in deps: if dep.endswith("___flow"): fcn_deps[spec.name].add(framework.get_label(dep.replace("___flow", "")) + " flow rate") elif "___" in dep: from_comp, to_comp = dep.split("___") label = "Flow" if from_comp: label += " from %s" % (framework.get_label(from_comp)) if to_comp: label += " to %s" % (framework.get_label(to_comp)) fcn_deps[spec.name].add(label) elif dep == "t": fcn_deps[spec.name].add("Time") elif dep == "dt": fcn_deps[spec.name].add("Step size") else: fcn_deps[spec.name].add(framework.get_label(dep)) if dep in fcn_deps: fcn_used_in[dep].add(spec["display name"]) f.write("## Parameters\n\n") for _, spec in framework.pars.iterrows(): if databook_only and not spec["databook page"]: continue f.write("### Parameter: %s\n\n" % (spec["display name"])) f.write("- Code name: `%s`\n" % (spec.name)) if spec["calibrate"] == "y": f.write("- Value can be used for calibration\n") f.write("- Units/format: %s\n" % (spec["format"])) if spec["minimum value"] is not None and spec["maximum value"] is not None: f.write("- Value restrictions: %s-%s\n" % (sc.sigfig(spec["minimum value"], keepints=True), sc.sigfig(spec["maximum value"], keepints=True))) elif spec["minimum value"] is not None: f.write("- Value restrictions: At least %s\n" % (sc.sigfig(spec["minimum value"], keepints=True))) elif spec["maximum value"] is not None: f.write("- Value restrictions: At most %s\n" % (sc.sigfig(spec["maximum value"], keepints=True))) if framework.transitions[spec.name]: f.write("- Contributes to transitions from:\n") for transition in framework.transitions[spec.name]: f.write('\t- "%s" to "%s"\n' % (framework.get_label(transition[0]), framework.get_label(transition[1]))) f.write("- Default value: %s\n" % (spec["default value"])) if spec["databook page"]: f.write("- Appears in the databook\n") else: f.write("- Does not appear in the databook\n") if spec["function"]: f.write("- This parameter's value is computed by a function: `%s`\n" % (spec["function"])) if fcn_deps[spec.name]: f.write("- Depends on:\n") for dep in fcn_deps[spec.name]: f.write('\t- "%s"\n' % (dep)) if fcn_used_in[spec.name]: f.write("- Used to compute:\n") for dep in fcn_used_in[spec.name]: f.write('\t- "%s"\n' % (dep)) f.write("\n") f.write("- Description: <ENTER DESCRIPTION>\n") f.write("- Data entry guidance: %s\n" % (spec["guidance"] if spec["guidance"] else "<ENTER GUIDANCE>")) f.write("\n") f.write("## Interactions\n\n") for _, spec in framework.interactions.iterrows(): f.write("### Interaction: %s\n\n" % (spec["display name"])) f.write("- Code name: `%s`\n" % (spec.name)) used_to_compute = [] for x, deps in fcn_deps.items(): if spec["display name"] in deps: used_to_compute.append(framework.get_label(x)) if used_to_compute: f.write("- Used to compute:\n") for x in used_to_compute: f.write('\t- "%s"\n' % (x)) f.write("\n") f.write("- Description: <ENTER DESCRIPTION>\n") f.write("- Data entry guidance: <ENTER GUIDANCE>\n") f.write("\n") if "plots" in framework.sheets: f.write("## Plots\n\n") for _, spec in framework.sheets["plots"][0].iterrows(): f.write("### Plot: %s\n\n" % (spec["name"])) f.write("- Definition: `%s`\n" % (spec["quantities"])) f.write("- Description: <ENTER DESCRIPTION>\n\n") if framework.cascades: f.write("## Cascades\n\n") for name, df in framework.cascades.items(): f.write("### Cascade: %s\n\n" % (name)) f.write("- Description: <ENTER DESCRIPTION>\n") f.write("- Stages:\n") for _, stage in df.iterrows(): f.write("\t- %s\n" % (stage[0])) for inc_name in stage[1].split(","): f.write("\t\t- %s\n" % (framework.get_label(inc_name.strip()))) f.write("\n")