# -*- coding: utf-8 -*-
"""
Miscellaneous utility functions for Excel files
This module implements utility functions for Excel functionality that is common
to different kinds of spreadsheets used in Atomica (e.g. Databooks and Program Books).
For example, Excel formatting, and time-varying data entry tables, are implemented here.
"""
from xlsxwriter.utility import xl_rowcol_to_cell as xlrc
import sciris as sc
import io
import numpy as np
from .system import FrameworkSettings as FS
import pandas as pd
from .utils import format_duration, datetime_to_year
import xlsxwriter
from typing import Tuple
from openpyxl.utils import get_column_letter
__all__ = ["standard_formats", "apply_widths", "update_widths", "transfer_comments", "copy_sheet", "read_tables", "read_dataframes", "TimeDependentConnections", "TimeDependentValuesEntry", "cell_get_string", "cell_get_number", "validate_category"]
# Suppress known warning in Openpyxl
# Warnings are:
# - C:\ProgramData\Miniconda3\envs\atomica37\lib\site-packages\openpyxl\worksheet\_reader.py:300: UserWarning: Conditional Formatting extension is not supported and will be removed
#   warn(msg)
# - C:\ProgramData\Miniconda3\envs\atomica37\lib\site-packages\openpyxl\worksheet\_reader.py:300: UserWarning: Data Validation extension is not supported and will be removed
#   warn(msg)
# This means that conditional formatting and data validation rules aren't being loaded, but since `data_only=True` these don't matter and can be safely ignored
import warnings
warnings.filterwarnings(action="ignore", category=UserWarning, module="openpyxl.worksheet", lineno=300)
def apply_widths(worksheet: xlsxwriter.workbook.Worksheet, width_dict: dict) -> None:
    """
    Set column widths
    Given a dictionary of precomputed character counts (typically the maximum number of characters in
    any cell written to each column), set the width of the column in Excel accordingly. The Excel
    column width is slightly larger than the character count.
    :param worksheet: A Worksheet instance
    :param width_dict: A dictionary of character widths to use for each column (by index)
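
    Example (a minimal sketch; the file name and cell contents are hypothetical)::

        import xlsxwriter
        workbook = xlsxwriter.Workbook("demo.xlsx")  # Hypothetical file name
        worksheet = workbook.add_worksheet()
        widths = {}
        worksheet.write(0, 0, "Some long cell content")
        update_widths(widths, 0, "Some long cell content")  # Track the longest content per column
        apply_widths(worksheet, widths)  # Column 0 is sized to fit its content
        workbook.close()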
    """
    for idx, width in width_dict.items():
        worksheet.set_column(idx, idx, width * 1.1 + 1) 
def update_widths(width_dict: dict, column_index: int, contents: str) -> None:
    """
    Keep track of required width for a column
    ``width_dict`` is a dict that is keyed by column index e.g. 0,1,2
    and the value is the length of the longest contents seen for that column
    :param width_dict: Storage dictionary
    :param column_index: Index of the column value has been inserted in
    :param contents: Content, length of which is used to set width
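
    Example (a sketch; the contents are illustrative)::

        widths = {}
        update_widths(widths, 0, "Adults 15-64")  # Illustrative cell contents
        update_widths(widths, 0, "Kids")
        # widths is now {0: 12} - the length of the longest contents seen for column 0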
    """
    if width_dict is None or contents is None or not sc.isstring(contents):
        return
    if len(contents) == 0:
        return
    if column_index not in width_dict:
        width_dict[column_index] = len(contents)
    else:
        width_dict[column_index] = max(width_dict[column_index], len(contents)) 
def copy_sheet(source: str, sheet_name: str, workbook: xlsxwriter.Workbook) -> None:
    """
    Copy a sheet into a Workbook
    This function allows values to be copied from a file into a Workbook. The
    main use case is to support custom sheets in databooks that are not otherwise parsed
    but which might need to be retained. In particular, the ``ProjectData`` class does
    not parse ignored sheets at all, because no guarantees can be made about the quantity and
    type of the content, and whether there are formulas etc. that would be lost. In some cases
    though, it may be necessary to edit the databook and preserve specific sheets. In general,
    this can be done by using the ``to_workbook()`` method and then manually performing whatever
    operations are necessary to preserve the content on the extra sheets. However, when the
    extra sheet content is extremely simple e.g. just a table of values, then this helper
    function can be used to facilitate copying the content.
    Warning - note that Excel functions, formatting, and comments will NOT be preserved.
    :param source: File name of the spreadsheet to read the source sheet from or an ``sc.Spreadsheet`` instance
    :param sheet_name: Name of the sheet to write to
    :param workbook: A Workbook instance to add the sheet to
    :return: None - the sheet will be added to the Workbook in-place
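
    Example (a minimal sketch; the file and sheet names are hypothetical)::

        import xlsxwriter
        workbook = xlsxwriter.Workbook("new_databook.xlsx")  # Hypothetical file name
        copy_sheet("old_databook.xlsx", "Extra data", workbook)  # Values only - no formulas or formatting
        workbook.close()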
    """
    import openpyxl
    if sc.isstring(source):
        source = sc.Spreadsheet(source)
    src_workbook = openpyxl.load_workbook(source.tofile(), read_only=True, data_only=True)  # Load in read-only mode for performance, since we don't parse comments etc.
    src_worksheet = src_workbook[sheet_name]
    dst_worksheet = workbook.add_worksheet(sheet_name)
    for i, row in enumerate(src_worksheet.rows):
        for j, cell in enumerate(row):
            dst_worksheet.write(i, j, cell.value)
    src_workbook.close() 
def read_tables(worksheet) -> Tuple[list, list]:
    """
    Read tables from sheet
    :param worksheet: An openpyxl worksheet
    :return: A tuple containing a list of tables (where each table is a list of rows, with a table terminated
             by an empty row in the original spreadsheet) and a list of start row indices for each table read in
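
    Example (a sketch; the file and sheet names are hypothetical)::

        import openpyxl
        workbook = openpyxl.load_workbook("databook.xlsx", data_only=True)  # Hypothetical file
        tables, start_rows = read_tables(workbook["Population size"])  # Hypothetical sheet name
        first_cell = tables[0][0][0]  # First cell of the first row of the first table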
    """
    # This function takes in a openpyxl worksheet, and returns tables
    # A table consists of a block of rows with any #ignore rows skipped
    # This function will start at the top of the worksheet, read rows into a buffer
    # until it gets to the first entirely empty row
    # And then returns the contents of that buffer as a table. So a table is a list of openpyxl rows
    # This function continues until it has exhausted all of the rows in the sheet
    buffer = []
    tables = []
    start_rows = []
    start = None
    for i, row in enumerate(worksheet.rows):
        # Determine whether to skip the row, add it to the buffer, or flush buffer into a table
        flush = False
        for j, cell in enumerate(row):
            if cell.value:
                if cell.data_type == "s" and cell.value.startswith("#ignore"):
                    if j == 0:
                        break  # If #ignore is encountered in the first column, skip the row and continue parsing the table
                    else:
                        flush = True
                        break  # If #ignore is encountered after preceding blank cells, flush the buffer to end the current table
                else:
                    # Read the row into the buffer and continue parsing the table
                    if not buffer:
                        start = i + 1  # Excel rows are indexed starting at 1
                    buffer.append(row)
                    break  # If the cell has a value in it, continue parsing the table
        else:
            if buffer:
                flush = True
        if flush:
            tables.append(buffer)  # Flush the buffer into a completed table
            start_rows.append(start)
            buffer = []
    # After the last row, if the buffer has some un-flushed contents, then include it in the last table
    if buffer:
        tables.append(buffer)
        start_rows.append(start)
    return tables, start_rows 
def read_dataframes(worksheet, merge=False) -> list:
    """
    Read dataframes from sheet
    This function operates similarly to ``read_tables`` except it returns Dataframes instead of
    cells. This enables the dataframes to be constructed more quickly, at the expense of being
    able to track the cell references and row numbers. These are shown for databooks (via ``read_tables``)
    but not for frameworks (which go via ``read_dataframes``)
    :param worksheet: An openpyxl worksheet
    :param merge: If False (default) then blank rows will be used to split the dataframes. If True, only one
                  DataFrame will be returned
    :return: A list of DataFrames
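
    Example (a sketch; the file and sheet names are hypothetical)::

        import openpyxl
        workbook = openpyxl.load_workbook("framework.xlsx", data_only=True)  # Hypothetical file
        dfs = read_dataframes(workbook["Parameters"])  # One DataFrame per blank-row-delimited block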
    """
    if worksheet.max_row is None or worksheet.max_column is None:
        worksheet.calculate_dimension(force=True)
    content = np.empty((worksheet.max_row, worksheet.max_column), dtype="object")
    ignore = np.zeros((worksheet.max_row), dtype=bool)
    empty = np.zeros((worksheet.max_row), dtype=bool)  # True for rows that contain no content (blank rows delimit tables)
    for i, row in enumerate(worksheet.rows):
        any_values = False  # Set True if this row contains any values
        for j, cell in enumerate(row):
            v = cell.value
            if cell.data_type == "s":
                if not any_values and v.startswith("#ignore"):
                    # If we encounter a #ignore and it's the first content in the row
                    if j == 0:
                        # If it's the first cell, ignore the row (i.e., do NOT treat it as a blank row)
                        ignore[i] = True  # Ignore the row
                    break
                elif v.startswith("#ignore"):
                    # Skip the rest of the row
                    content[i, j:] = None
                    break
                else:
                    v = v.strip()
                    any_values = any_values or bool(v)  # If it's a string type, call strip() before checking truthiness
            elif v is not None:
                any_values = True
            content[i, j] = v
        if not any_values:
            empty[i] = True
    tables = []
    if merge:
        ignore[empty] = True
        if all(ignore):
            return []
        tables.append(content[~ignore, :])
    else:
        # A change from False to True means that we need to start a new table
        # A True followed by a True doesn't start a new table but instead gets ignored
        content = content[~ignore, :]
        empty = empty[~ignore]
        # If there is no content at all, return immediately
        if all(empty):
            return []
        idx = []
        start = None
        for i in range(len(empty)):
            # Check for the start of a table
            if not empty[i] and start is None:
                start = i
            # Check for the end of a table
            if empty[i] and start is not None:
                # If we were reading a table and have reached an empty row
                idx.append((start, i))
                start = None
            elif i + 1 == len(empty) and start is not None:
                # If we were reading a table and have reached the end of the data
                idx.append((start, i + 1))
                start = None
        tables = []
        for start, stop in idx:
            tables.append(content[start:stop].copy())  # Use .copy() so that we don't retain any references to the original full array outside this function
    dfs = []
    for table in tables:
        df = pd.DataFrame(table)
        df.dropna(axis=1, how="all", inplace=True)
        df.columns = df.iloc[0]
        df = df[1:]
        dfs.append(df)
    return dfs 
class TimeDependentConnections:
    """
    Structure for reading/writing interactions
    A :class:`TimeDependentConnections` object is suitable when there are time-dependent interactions between two quantities.
    This class is used for both transfers and interactions. The content that it writes consists of

    - A connection matrix table that has Y/N selection of which interactions are present between two things
    - A set of pairwise connections specifying to, from, units, assumption, and time values

    Interactions can have a diagonal, whereas transfers cannot (e.g. a population can infect itself but cannot transfer to itself).

    In Excel, a :class:`TimeDependentConnections` maps to three tables:

    1. A table to enter the code name and full name
    2. An interactions matrix with Y/N indicating whether an interaction between two populations exists
    3. A set of rows for entering time varying data for each pair of populations
    :param code_name: the code name of this quantity e.g. 'aging'
    :param full_name: the full name of this quantity e.g. 'Aging'
    :param tvec: time values for the time-dependent rows
    :param from_pops: list of strings to use as the rows - these are typically population code names
    :param to_pops: list of strings to use as the columns - these are typically population code names
    :param interpop_type: 'transfer' or 'interaction'. A transfer cannot have diagonal entries, and can have Number or Probability formats. An interaction can have
                 diagonal entries and only has N.A. formats
    :param ts: Optionally specify a dict containing all of the non-empty TimeSeries objects used. The format is ``{(from_pop, to_pop):TimeSeries}``.
               An interaction can only be Y/N for clarity; if it is Y, then a row is displayed for the TimeSeries. In practice, the Y/N can be
               decided in the first instance based on the provided TimeSeries i.e. if a TimeSeries is provided for an interaction, then the
               interaction must have been marked with Y
    :param from_pop_type: Population type of the 'from' populations, which is used by :meth:`ProjectData.add_pop` to determine which TDCs to add new populations to
    :param to_pop_type: Population type of the 'to' populations
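
    Example (a minimal sketch; the code name and populations are illustrative)::

        import numpy as np
        tdc = TimeDependentConnections("aging", "Aging", tvec=np.arange(2000, 2005),
                                       from_pops=["0-4", "5-14"], to_pops=["0-4", "5-14"],
                                       interpop_type="transfer")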
    """
    def __init__(self, code_name: str, full_name: str, tvec: np.array, from_pops: list, to_pops: list, interpop_type: str, ts: dict = None, from_pop_type: str = None, to_pop_type: str = None):
        self.code_name = code_name
        self.full_name = full_name
        self.type = interpop_type
        self.from_pop_type = from_pop_type
        self.from_pops = from_pops
        self.to_pop_type = to_pop_type
        self.to_pops = to_pops
        self.tvec = tvec
        self.ts = ts if ts is not None else sc.odict()
        self.attributes = {}  #: Attributes associated with the table
        self.ts_attributes = {}  #: Attributes associated with each TimeSeries row. This is a dict-of-dicts with `ts_attributes[attribute]={(from_pop, to_pop):value}` e.g., `ts_attributes["Provenance"] = {('0-4', '5-14'): 'value'}`
        self.ts_attributes["Provenance"] = {}  # Include provenance attribute by default
        self.assumption_heading = "Constant"  #: Heading to use for assumption column
        self.write_units = None  #: Write a column for units (if None, units will be written if any of the TimeSeries have units)
        self.write_uncertainty = None  #: Write a column for uncertainty (if None, uncertainty will be written if any of the TimeSeries have uncertainty)
        self.write_assumption = None  #: Write a column for the assumption/constant (if None, the assumption will be written if any of the TimeSeries have an assumption)
        if self.type == "transfer":
            self.enable_diagonal = False
            self.allowed_units = []
            self.allowed_units.append("%s (%s)" % (FS.QUANTITY_TYPE_NUMBER.title(), format_duration(1, pluralize=True)))
            self.allowed_units.append("%s (per %s)" % (FS.QUANTITY_TYPE_RATE.title(), format_duration(1, pluralize=False)))
            self.allowed_units.append("%s (%s)" % (FS.QUANTITY_TYPE_DURATION.title(), format_duration(1, pluralize=True)))
        elif self.type == "interaction":
            self.enable_diagonal = True
            self.allowed_units = [FS.DEFAULT_SYMBOL_INAPPLICABLE]
        else:
            raise Exception('Unknown TimeDependentConnections type - must be "transfer" or "interaction"')
    def __repr__(self):
        return '<TDC %s "%s">' % (self.type.title(), self.code_name)
    @classmethod
    def from_tables(cls, tables: list, interaction_type):
        """
        Instantiate based on list of tables
        This method instantiates and initializes a new :class:`TimeDependentConnections` object from
        tables that have been read in using :func:`read_tables`. Note that the parent object
        such as :class:`ProjectData` is responsible for extracting the tables and passing them
        to this function. For instance, the transfers sheet might contain more than one set of
        tables, so it is the calling function's responsibility to split those tables up into
        the groups of three expected by this method.
        :param tables: A list of tables. A table here is a list of rows, and a row is a list of cells.
        :param interaction_type: A string identifying the interaction type - either 'transfer' or 'interaction'
        :return: A new :class:`TimeDependentConnections` instance
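
        Example (a sketch; assumes ``tables`` contains the three tables for one transfer, as returned by :func:`read_tables`)::

            tdc = TimeDependentConnections.from_tables(tables[0:3], "transfer")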
        """
        from .utils import TimeSeries  # Import here to avoid circular reference
        assert interaction_type in {"transfer", "interaction"}, "Unknown interaction type"
        # Read the TDC definition table (including attributes)
        code_name = None
        full_name = None
        from_pop_type = None
        to_pop_type = None
        # Start by reading the TDC definition header row, which specifies the name and population types
        # This table contains a single row of values beneath the headers; any subsequent rows will automatically be ignored
        attributes = {}
        for header_cell, value_cell in zip(tables[0][0], tables[0][1]):
            header = cell_get_string(header_cell, allow_empty=True)
            if header is None:
                continue
            elif header.startswith("#ignore"):
                break
            lowered_header = header.lower()
            if lowered_header == "abbreviation":
                code_name = cell_get_string(value_cell)
            elif lowered_header == "full name":
                full_name = cell_get_string(value_cell)
            elif lowered_header == "from population type":
                from_pop_type = cell_get_string(value_cell, True)
            elif lowered_header == "to population type":
                to_pop_type = cell_get_string(value_cell, True)
            else:
                attributes[header] = value_cell.value
        if interaction_type == "transfer":
            assert from_pop_type == to_pop_type, "Transfers can only occur between populations of the same type"
        if code_name is None:
            raise Exception("Code name/abbreviation missing")
        if full_name is None:
            raise Exception("Full name missing")
        # Read the pops from the Y/N table. The Y/N content of the table depends on the timeseries objects that
        # are present. That is, if the Y/N matrix contains a Y then a TimeSeries must be read in, and vice versa.
        # Therefore, we don't actually parse the matrix, and instead just read in all the TimeSeries instances
        # that are defined and infer the matrix that way. However, we do still need to parse the to_pops and from_pops
        # These both get parsed because for interactions across pop types they could be different. Blank cells in either
        # the row or the column signal the end of parsing those population lists
        # Read the to_pops from the first row of the table, until a blank cell is encountered
        to_pops = []
        for cell in tables[1][0][1:]:
            if cell.value is None or (cell.data_type == "s" and cell.value.startswith("#ignore")):
                break
            else:
                to_pops.append(cell.value)
        # Read the from_pops from the first column, until a blank cell is encountered. Note that a #ignore
        # can never be present in the first column, because this would cause the row to be skipped prior to this point
        from_pops = []
        for row in tables[1][1:]:
            if row[0].value is None:
                break
            else:
                from_pops.append(row[0].value)
        # Instantiate the TDC object based on the metadata from the first two tables
        tdc = cls(code_name, full_name, None, from_pops=from_pops, to_pops=to_pops, interpop_type=interaction_type, from_pop_type=from_pop_type, to_pop_type=to_pop_type)
        tdc.attributes = attributes
        known_headings = {"from population", "to population", "units", "uncertainty", "constant", "assumption"}
        headings, times, tdc.tvec = _parse_ts_header(tables[2][0], known_headings, skip_first=False)
        # Validate and process headings
        if not times and "constant" not in headings:
            raise Exception("Could not find an assumption or time-specific value - all tables must contain at least one of these values")
        tdc.write_units = True if "units" in headings else None
        tdc.write_uncertainty = True if "uncertainty" in headings else None
        tdc.write_assumption = True if "constant" in headings else None
        if "assumption" in headings:
            tdc.write_assumption = True
            tdc.assumption_heading = "Assumption"
        for heading in headings:
            if heading not in known_headings:
                tdc.ts_attributes[heading] = {}
        tdc.ts = sc.odict()
        for row in tables[2][1:]:
            if row[0].value != "...":
                assert row[0].value in from_pops, 'Population "%s" not found - should be contained in %s' % (row[0].value, from_pops)
                assert row[2].value in to_pops, 'Population "%s" not found - should be contained in %s' % (row[2].value, to_pops)
                vals = [x.value for x in row]
                from_pop = vals[0]
                to_pop = vals[2]
                if "units" in headings:
                    units = cell_get_string(row[headings["units"]], allow_empty=True)
                    if units is not None and units.lower().strip() in FS.STANDARD_UNITS:
                        units = units.lower().strip()  # Only lower and strip units if they are standard units
                else:
                    units = None
                ts = TimeSeries(units=units)
                if "uncertainty" in headings:
                    ts.sigma = cell_get_number(row[headings["uncertainty"]])
                else:
                    ts.sigma = None
                if "constant" in headings:
                    ts.assumption = cell_get_number(row[headings["constant"]])
                elif "assumption" in headings:
                    ts.assumption = cell_get_number(row[headings["assumption"]])
                else:
                    ts.assumption = None
                for attribute in tdc.ts_attributes:
                    if attribute in headings:
                        tdc.ts_attributes[attribute][(from_pop, to_pop)] = row[headings[attribute]].value
                for t, idx in times.items():
                    ts.insert(t, cell_get_number(row[idx]))  # If cell_get_number returns None, this gets handled accordingly by ts.insert()
                tdc.ts[(from_pop, to_pop)] = ts
        return tdc 
    def write(self, worksheet, start_row, formats, references: dict = None, widths: dict = None) -> int:
        """
        Write to cells in a worksheet
        :param worksheet: An xlsxwriter worksheet instance
        :param start_row: The first row in which to write values
        :param formats: Format dict for the opened workbook - typically the return value of :func:`standard_formats` when the workbook was opened
        :param references: References dict containing cell references for strings in the current workbook
        :param widths: ``dict`` storing column widths
        :return: The row index for the next available row for writing in the spreadsheet
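
        Example (a minimal sketch; assumes ``tdc`` is an existing instance, and the file and sheet names are hypothetical)::

            import xlsxwriter
            workbook = xlsxwriter.Workbook("databook.xlsx")  # Hypothetical file name
            formats = standard_formats(workbook)
            worksheet = workbook.add_worksheet("Transfers")
            next_row = tdc.write(worksheet, 0, formats, widths={})
            workbook.close()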
        """
        assert self.assumption_heading in {"Constant", "Assumption"}, "Unsupported assumption heading"
        write_units = self.write_units if self.write_units is not None else any((ts.units is not None for ts in self.ts.values()))
        write_uncertainty = self.write_uncertainty if self.write_uncertainty is not None else any((ts.sigma is not None for ts in self.ts.values()))
        write_assumption = self.write_assumption if self.write_assumption is not None else any((ts.assumption is not None for ts in self.ts.values()))
        if not references:
            references = {x: x for x in self.from_pops + self.to_pops}  # Default null mapping for populations
        # First, write the name entry table
        current_row = start_row
        column = 0
        worksheet.write(current_row, column, "Abbreviation", formats["center_bold"])
        update_widths(widths, column, "Abbreviation")
        worksheet.write(current_row + 1, column, self.code_name)
        update_widths(widths, column, self.code_name)
        column += 1
        worksheet.write(current_row, column, "Full Name", formats["center_bold"])
        update_widths(widths, column, "Full Name")
        worksheet.write(current_row + 1, column, self.full_name)
        update_widths(widths, column, self.full_name)
        column += 1
        worksheet.write(current_row, column, "From population type", formats["center_bold"])
        update_widths(widths, column, "From population type")
        worksheet.write(current_row + 1, column, self.from_pop_type)
        update_widths(widths, column, self.from_pop_type)
        column += 1
        worksheet.write(current_row, column, "To population type", formats["center_bold"])
        update_widths(widths, column, "To population type")
        worksheet.write(current_row + 1, column, self.to_pop_type)
        update_widths(widths, column, self.to_pop_type)
        for attribute, value in self.attributes.items():
            column += 1
            worksheet.write(current_row, column, attribute, formats["center_bold"])
            update_widths(widths, column, attribute)
            worksheet.write(current_row + 1, column, value)
            update_widths(widths, column, value)
        # Then, write the Y/N matrix
        current_row += 3  # Leave a blank row between the name table and the Y/N matrix
        # Note - table_references are local to this TimeDependentConnections instance
        # For example, there could be two transfers, and each of them could potentially transfer between 0-4 and 5-14
        # so the worksheet might contain two references from 0-4 to 5-14 but they would be for different transfers and thus
        # the time-dependent rows would depend on different boolean table cells
        current_row, table_references, values_written = self._write_pop_matrix(worksheet, current_row, formats, references, boolean_choice=True, widths=widths)
        # Finally, write the time dependent part
        headings = []
        headings.append("From population")
        headings.append("")  # --->
        headings.append("To population")
        offset = len(headings)
        attribute_index = {}
        for attribute in self.ts_attributes:
            attribute_index[attribute] = offset
            headings.append(attribute)
            offset += 1
        if write_units:
            headings.append("Units")
            units_index = offset  # Column to write the units in
            offset += 1
        if write_uncertainty:
            headings.append("Uncertainty")
            uncertainty_index = offset  # Column to write the uncertainty in
            offset += 1
        if write_assumption:
            headings.append(self.assumption_heading)
            headings.append("")
            constant_index = offset
            offset += 1
            if len(self.tvec):
                offset += 1  # Additional offset for the 'OR' column
        headings += [float(x) for x in self.tvec]
        for i, entry in enumerate(headings):
            worksheet.write(current_row, i, entry, formats["center_bold"])
            update_widths(widths, i, entry)
        # Now, we will write a wrapper that gates the content
        # If the gating cell is 'Y', then the content will be displayed, otherwise not
        def gate_content(content, gating_cell):
            if content.startswith("="):  # If this is itself a reference
                return '=IF(%s="Y",%s,"...")' % (gating_cell, content[1:])
            else:
                return '=IF(%s="Y","%s","...")' % (gating_cell, content)
        for from_idx in range(0, len(self.from_pops)):
            for to_idx in range(0, len(self.to_pops)):
                current_row += 1
                from_pop = self.from_pops[from_idx]
                to_pop = self.to_pops[to_idx]
                entry_tuple = (from_pop, to_pop)
                entry_cell = table_references[entry_tuple]
                # Write hyperlink
                if values_written[entry_cell] != FS.DEFAULT_SYMBOL_INAPPLICABLE:
                    worksheet.write_url(entry_cell, "internal:%s!%s" % (worksheet.name, xlrc(current_row, 1)), cell_format=formats["center_unlocked"], string=values_written[entry_cell])
                    worksheet.write_url(xlrc(current_row, 1), "internal:%s!%s" % (worksheet.name, entry_cell), cell_format=formats["center_unlocked"])
                if entry_tuple in self.ts:
                    ts = self.ts[entry_tuple]
                    format = formats["not_required"]
                else:
                    ts = None
                    format = formats["unlocked"]
                if ts:
                    worksheet.write_formula(current_row, 0, gate_content(references[from_pop], entry_cell), formats["center_bold"], value=from_pop)
                    update_widths(widths, 0, from_pop)
                    worksheet.write_formula(current_row, 1, gate_content("--->", entry_cell), formats["center"], value="--->")
                    worksheet.write_formula(current_row, 2, gate_content(references[to_pop], entry_cell), formats["center_bold"], value=to_pop)
                    update_widths(widths, 2, to_pop)
                    # Write the attributes
                    for attribute in self.ts_attributes:
                        if isinstance(self.ts_attributes[attribute], dict):
                            if entry_tuple in self.ts_attributes[attribute]:
                                val = self.ts_attributes[attribute][entry_tuple]
                            else:
                                val = None
                        else:
                            val = self.ts_attributes[attribute]
                        if val is not None:
                            worksheet.write(current_row, attribute_index[attribute], val)
                            update_widths(widths, attribute_index[attribute], val)
                    if write_units:  # Use the resolved flag computed above, so the column is populated even when self.write_units is None
                        worksheet.write(current_row, units_index, ts.units, format)
                        update_widths(widths, units_index, ts.units)
                        if self.allowed_units:
                            worksheet.data_validation(xlrc(current_row, units_index), {"validate": "list", "source": [x for x in self.allowed_units]})
                    if write_uncertainty:
                        worksheet.write(current_row, uncertainty_index, ts.sigma, formats["not_required"])
                    if write_assumption:
                        worksheet.write(current_row, constant_index, ts.assumption, format)
                        if len(self.tvec):
                            worksheet.write_formula(current_row, constant_index + 1, gate_content("OR", entry_cell), formats["center"], value="OR")
                            update_widths(widths, constant_index + 1, "OR")
                else:
                    worksheet.write_formula(current_row, 0, gate_content(references[from_pop], entry_cell), formats["center_bold"], value="...")
                    worksheet.write_formula(current_row, 1, gate_content("--->", entry_cell), formats["center"], value="...")
                    worksheet.write_formula(current_row, 2, gate_content(references[to_pop], entry_cell), formats["center_bold"], value="...")
                    if write_units:
                        worksheet.write_blank(current_row, units_index, "", format)
                        if self.allowed_units:
                            worksheet.data_validation(xlrc(current_row, units_index), {"validate": "list", "source": [x for x in self.allowed_units]})
                    if write_uncertainty:
                        worksheet.write_blank(current_row, uncertainty_index, "", formats["not_required"])
                    if write_assumption:
                        worksheet.write_blank(current_row, constant_index, "", format)
                        if len(self.tvec):
                            worksheet.write_formula(current_row, constant_index + 1, gate_content("OR", entry_cell), formats["center"], value="...")
                            update_widths(widths, constant_index + 1, "...")
                content = [None] * len(self.tvec)
                if ts:
                    for t, v in zip(ts.t, ts.vals):
                        idx = np.where(self.tvec == t)[0][0]
                        content[idx] = v
                for idx, v in enumerate(content):
                    if v is None:
                        worksheet.write_blank(current_row, offset + idx, v, format)
                    else:
                        worksheet.write(current_row, offset + idx, v, format)
                    widths[offset + idx] = max(widths[offset + idx], 7) if offset + idx in widths else 7
                if not content:
                    idx = 0  # No time columns were written - define idx so the conditional formatting below has a valid column range
                if self.write_assumption and len(self.tvec):
                    # Conditional formatting for the assumption, depending on whether time-values were entered
                    fcn_empty_times = 'COUNTIF(%s:%s,"<>" & "")>0' % (xlrc(current_row, offset), xlrc(current_row, offset + idx))
                    worksheet.conditional_format(xlrc(current_row, constant_index), {"type": "formula", "criteria": "=" + fcn_empty_times, "format": formats["ignored"]})
                    worksheet.conditional_format(xlrc(current_row, constant_index), {"type": "formula", "criteria": "=AND(%s,NOT(ISBLANK(%s)))" % (fcn_empty_times, xlrc(current_row, constant_index)), "format": formats["ignored_warning"]})
                # Conditional formatting for the row - it has a white background if the gating cell is 'N'
                worksheet.conditional_format("%s:%s" % (xlrc(current_row, 3), xlrc(current_row, offset + idx)), {"type": "formula", "criteria": '=%s<>"Y"' % (entry_cell), "format": formats["white_bg"]})
        current_row += 2
        return current_row 
    def _write_pop_matrix(self, worksheet, start_row, formats, references: dict = None, boolean_choice=False, widths: dict = None):
        """
        Write a square matrix to Excel

        This function writes the Y/N matrix used for both

        - The transfer matrix
        - The interactions matrix

        If ``self.enable_diagonal`` is ``False`` then the diagonal will be forced to be ``'N.A.'``. If a value
        is specified for an entry on the diagonal while ``enable_diagonal=False``, an error will be raised
        :param worksheet: An xlsxwriter worksheet instance
        :param start_row: The first row in which to write values
        :param formats: Format dict for the opened workbook - typically the return value of :func:`standard_formats` when the workbook was opened
        :param references: Optionally supply dict with references, used to link population names in Excel
        :param boolean_choice: If True, values will be coerced to Y/N and an Excel validation will be added
        :param widths: ``dict`` storing column widths
        :return: Tuple with ``(next_row, table_references, values_written)``. The references are used for hyperlinking to the Excel matrix
        """
        entries = self.ts
        if not references:
            references = {x: x for x in self.from_pops + self.to_pops}  # This is a null-mapping that takes say 'adults'->'adults' thus simplifying the workflow. Otherwise, it's assumed a reference exists for every node
        table_references = {}
        values_written = {}
        # Write the headers
        for i, node in enumerate(self.to_pops):
            worksheet.write_formula(start_row, i + 1, references[node], formats["center_bold"], value=node)
            update_widths(widths, i + 1, node)
        for i, node in enumerate(self.from_pops):
            worksheet.write_formula(start_row + i + 1, 0, references[node], formats["center_bold"], value=node)
            update_widths(widths, 0, node)
        # Prepare the content - first replace the dict with one keyed by index. This is because we cannot apply formatting
        # after writing content, so have to do the writing in a single pass over the entire matrix
        if boolean_choice:
            content = np.full((len(self.from_pops), len(self.to_pops)), "N", dtype=object)  # This will also coerce the value to string in preparation for writing
        else:
            content = np.full((len(self.from_pops), len(self.to_pops)), "", dtype=object)  # This will also coerce the value to string in preparation for writing
        for interaction, value in entries.items():
            from_pop, to_pop = interaction
            if not self.enable_diagonal and from_pop == to_pop:
                raise Exception("Trying to write a diagonal entry to a table that is not allowed to contain diagonal terms")  # This is because data loss will occur if the user adds entries on the diagonal, then writes the table, and then reads it back in
            from_idx = self.from_pops.index(from_pop)
            to_idx = self.to_pops.index(to_pop)
            if boolean_choice:
                value = "Y" if value else "N"
            content[from_idx, to_idx] = value
        # Write the content
        for from_idx in range(0, len(self.from_pops)):
            for to_idx in range(0, len(self.to_pops)):
                row = start_row + 1 + from_idx
                col = to_idx + 1
                if not self.enable_diagonal and self.to_pops[to_idx] == self.from_pops[from_idx]:  # Disable the diagonal if it's linking the same two quantities and that's desired
                    val = FS.DEFAULT_SYMBOL_INAPPLICABLE
                    worksheet.write(row, col, val, formats["center"])
                    worksheet.data_validation(xlrc(row, col), {"validate": "list", "source": ["N.A."]})
                else:
                    val = content[from_idx, to_idx]
                    worksheet.write(row, col, content[from_idx, to_idx], formats["center_unlocked"])
                    if boolean_choice:
                        worksheet.data_validation(xlrc(row, col), {"validate": "list", "source": ["Y", "N"]})
                        worksheet.conditional_format(xlrc(row, col), {"type": "cell", "criteria": "equal to", "value": '"Y"', "format": formats["unlocked_boolean_true"]})
                        worksheet.conditional_format(xlrc(row, col), {"type": "cell", "criteria": "equal to", "value": '"N"', "format": formats["unlocked_boolean_false"]})
                table_references[(self.from_pops[from_idx], self.to_pops[to_idx])] = xlrc(row, col, True, True)  # Store reference to this interaction
                values_written[table_references[(self.from_pops[from_idx], self.to_pops[to_idx])]] = val
        next_row = start_row + 1 + len(self.from_pops) + 1
        return next_row, table_references, values_written 
class TimeDependentValuesEntry:
    """Table for time-dependent data entry
    This class is used in Databooks and Program Books to enter potentially time-varying data.
    Conceptually, it maps a set of TimeSeries objects to a single name and table in the
    spreadsheet. For example, a Characteristic might contain a TimeSeries for each population,
    and the resulting TimeDependentValuesEntry (TDVE) table would have a `name` matching the
    Characteristic, and a TimeSeries for each population.

    The TDVE class optionally allows the specification of units, assumptions, and uncertainty,
    which each map to properties on the underlying TimeSeries objects. It also contains a
    time vector corresponding to the time values that appear or will appear in the spreadsheet.

    Note that the units are stored within the TimeSeries objects, which means that they
    are able to differ across rows.
    :param name: The name/title for this table
    :param tvec: Specify the time values for this table. All TimeSeries in the ts dict should have corresponding time values
    :param ts: Optionally specify an odict() of TimeSeries objects populating the rows. Could be populated after
    :param allowed_units: Optionally specify a list of allowed units that will appear as a dropdown
    :param comment: Optionally specify descriptive text that will be added as a comment to the name cell
    :param pop_type: Optionally specify the population type associated with this table (used when adding new populations)
    :param default_all: If True, the framework specifies that this table should default to a single 'All' row instead of population-specific rows
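
    Example (a minimal sketch; names are illustrative, with ``TimeSeries`` imported as in this module)::

        import numpy as np
        from .utils import TimeSeries
        tdve = TimeDependentValuesEntry("Prevalence", tvec=np.arange(2000, 2005))  # Illustrative name
        tdve.ts["adults"] = TimeSeries(units="Probability")  # One row, keyed by an illustrative population name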
    """
    def __init__(self, name, tvec: np.array = None, ts=None, allowed_units: list = None, comment: str = None, pop_type: str = None, default_all: bool = False):
        if ts is None:
            ts = sc.odict()
        self.name = name  #: Name for the quantity printed in Excel
        self.comment = comment  #: A comment that will be added in Excel
        self.tvec = [] if tvec is None else tvec  #: time axis (e.g. np.arange(2000,2019)) - all TimeSeries time values must exactly match one of the values here
        self.ts = ts  #: dict of :class:`TimeSeries` objects
        self.allowed_units = [x.title() if x in FS.STANDARD_UNITS else x for x in allowed_units] if allowed_units is not None else None  # Otherwise, can be an odict with keys corresponding to ts - leave as None for no restriction
        self.pop_type = pop_type
        self.ts_attributes = {}  #: Dictionary containing extra attributes to write along with each TimeSeries object.
        self.ts_attributes["Provenance"] = {}  # Include provenance attribute by default
        #  Keys are attribute names, values can be either a scalar or a dict keyed by the same keys as self.ts. Compared to units, uncertainty etc.,
        #  attributes are stored in the TDVE rather than in the TimeSeries
        self.assumption_heading = "Constant"  #: Heading to use for assumption column
        self.write_units = None  #: Write a column for units (if None, units will be written if any of the TimeSeries have units)
        self.write_uncertainty = None  #: Write a column for uncertainty (if None, uncertainty will be written if any of the TimeSeries have uncertainty)
        self.write_assumption = None  #: Write a column for assumption/constant (if None, assumption will be written if any of the TimeSeries have an assumption)
        self.default_all = default_all  #: Record whether the framework specifies that this TDVE should default to having an 'All' row instead of population-specific rows (the user can manually modify further)
    def __repr__(self):
        output = sc.prepr(self)
        return output
    @property
    def has_data(self) -> bool:
        """
        Check whether all time series have data entered
        :return: True if all of the TimeSeries objects stored in the TDVE have data
        """
        return all([x.has_data for x in self.ts.values()])
    @classmethod
    def from_rows(cls, rows: list):
        """
        Create new instance from Excel rows
        Given a set of openpyxl rows, instantiate a :class:`TimeDependentValuesEntry` object
        That is, the parent object e.g. :class:`ProjectData` is responsible for finding where the TDVE table is,
        and reading all of the rows associated with it (skipping ``#ignore`` rows) and then passing those rows,
        unparsed, to this function
        Headings for 'units', 'uncertainty', and 'assumption'/'constant' are optional and will be read in
        if they are present in the spreadsheet.
        :param rows: A list of rows
        :return: A new :class:`TimeDependentValuesEntry` instance
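
        Example (a sketch; assumes ``tables`` was produced by :func:`read_tables`)::

            tdve = TimeDependentValuesEntry.from_rows(tables[0])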
        """
        from .utils import TimeSeries  # Import here to avoid circular reference
        # Retrieve the name
        name = rows[0][0].value
        if name is None:
            raise Exception('The name of the table is missing. This can also happen if extra rows have been added without a "#ignore" entry in the first column')
        elif not sc.isstring(name):
            raise Exception("In cell %s of the spreadsheet, the name of the quantity assigned to this table needs to be a string" % rows[0][0].coordinate)
        name = name.strip()  # The name needs to be written back in a case sensitive form
        tdve = cls(name)
        known_headings = {"units", "uncertainty", "constant", "assumption"}
        headings, times, tdve.tvec = _parse_ts_header(rows[0], known_headings, skip_first=True)
        # Validate and process headings
        if not times and "constant" not in headings:
            raise Exception("Could not find an assumption or time-specific value - all tables must contain at least one of these values")
        tdve.write_units = True if "units" in headings else None
        tdve.write_uncertainty = True if "uncertainty" in headings else None
        tdve.write_assumption = True if "constant" in headings else None
        if "assumption" in headings:
            tdve.write_assumption = True
            tdve.assumption_heading = "Assumption"
        for heading in headings:
            if heading not in known_headings:
                # If it's not a known heading and it's a string, then it must be an attribute
                # Note that the way `headings` is populated by skipping i=0 ensures that the table name
                # is not interpreted as a heading
                tdve.ts_attributes[heading] = {}
        ts_entries = sc.odict()
        for row in rows[1:]:
            if row[0].value is None:
                continue
            if not row[0].data_type in {"s", "str"}:
                raise Exception("In cell %s of the spreadsheet, the name of the entry was expected to be a string, but it was not. The left-most column is expected to be a name. If you are certain the value is correct, add an single quote character at the start of the cell to ensure it remains as text" % row[0].coordinate)
            series_name = row[0].value.strip()
            if "units" in headings:
                units = cell_get_string(row[headings["units"]], allow_empty=True)
                if units is not None and units.lower().strip() in FS.STANDARD_UNITS:
                    units = units.lower().strip()  # Only lower and strip units if they are standard units
            else:
                units = None
            ts = TimeSeries(units=units)
            if "uncertainty" in headings:
                ts.sigma = cell_get_number(row[headings["uncertainty"]])
            else:
                ts.sigma = None
            if "constant" in headings:
                ts.assumption = cell_get_number(row[headings["constant"]])
            elif "assumption" in headings:
                ts.assumption = cell_get_number(row[headings["assumption"]])
            else:
                ts.assumption = None
            for attribute in tdve.ts_attributes:
                if attribute in headings:
                    # If it's a default attribute e.g. provenance, and it is missing from the databook, then don't populate it
                    tdve.ts_attributes[attribute][series_name] = row[headings[attribute]].value
            for t, idx in times.items():
                ts.insert(t, cell_get_number(row[idx]))  # If cell_get_number returns None, this gets handled accordingly by ts.insert()
            ts_entries[series_name] = ts
        tdve.ts = ts_entries
        return tdve 
    def write(self, worksheet, start_row, formats, references: dict = None, widths: dict = None) -> int:
        """
        Write to cells in a worksheet
        Note that the year columns are drawn from the ``tvec`` attribute. To suppress the year columns (e.g. for the user to enter only an assumption)
        then set ``tvec`` to an empty array/list.
        :param worksheet: An xlsxwriter worksheet instance
        :param start_row: The first row in which to write values
        :param formats: Format dict for the opened workbook - typically the return value of :func:`standard_formats` when the workbook was opened
        :param references: References dict containing cell references for strings in the current workbook
        :param widths: ``dict`` storing column widths
        :return: The row index for the next available row for writing in the spreadsheet
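
        Example (a minimal sketch; assumes ``tdve`` is an existing instance, and the file and sheet names are hypothetical)::

            import xlsxwriter
            workbook = xlsxwriter.Workbook("databook.xlsx")  # Hypothetical file name
            formats = standard_formats(workbook)
            worksheet = workbook.add_worksheet("Prevalence")
            widths = {}
            next_row = tdve.write(worksheet, 0, formats, widths=widths)
            apply_widths(worksheet, widths)
            workbook.close()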
        """
        assert self.assumption_heading in {"Constant", "Assumption"}, "Unsupported assumption heading"
        write_units = self.write_units if self.write_units is not None else any((ts.units is not None for ts in self.ts.values()))
        write_uncertainty = self.write_uncertainty if self.write_uncertainty is not None else any((ts.sigma is not None for ts in self.ts.values()))
        write_assumption = self.write_assumption if self.write_assumption is not None else any((ts.assumption is not None for ts in self.ts.values()))
        if not references:
            references = dict()
        current_row = start_row
        # First, assemble and write the headings
        headings = []
        headings.append(self.name)
        offset = 1  # Tracks the next available column; after the optional columns below are allocated, this is the column where the time values start
        # Next allocate attributes
        attribute_index = {}
        for attribute in self.ts_attributes:
            attribute_index[attribute] = offset
            headings.append(attribute)
            offset += 1
        if write_units:
            headings.append("Units")
            units_index = offset  # Column to write the units in
            offset += 1
        if write_uncertainty:
            headings.append("Uncertainty")
            uncertainty_index = offset  # Column to write the uncertainty in
            offset += 1
        if write_assumption:
            headings.append(self.assumption_heading)
            headings.append("")
            constant_index = offset
            offset += 2
        headings += [float(x) for x in self.tvec]
        for i, entry in enumerate(headings):
            worksheet.write(current_row, i, entry, formats["center_bold"])
            update_widths(widths, i, entry)
        if not pd.isna(self.comment):
            worksheet.write_comment(xlrc(current_row, 0), self.comment)
        # Now, write the TimeSeries objects - self.ts is an odict and whatever pops are present will be written in whatever order they are in
        for row_name, row_ts in self.ts.items():
            current_row += 1
            # Write the name
            if row_name in references:
                worksheet.write_formula(current_row, 0, references[row_name], formats["center_bold"], value=row_name)
                update_widths(widths, 0, row_name)
            else:
                worksheet.write_string(current_row, 0, row_name, formats["center_bold"])
                update_widths(widths, 0, row_name)
            # Write the attributes
            for attribute in self.ts_attributes:
                if isinstance(self.ts_attributes[attribute], dict):
                    if row_name in self.ts_attributes[attribute]:
                        val = self.ts_attributes[attribute][row_name]
                    else:
                        val = None
                else:
                    val = self.ts_attributes[attribute]
                if val is not None:
                    worksheet.write(current_row, attribute_index[attribute], val)
                    update_widths(widths, attribute_index[attribute], val)
            # Write the units
            if write_units:
                if row_ts.units:
                    if row_ts.units.lower().strip() in FS.STANDARD_UNITS:
                        unit = row_ts.units.title().strip()  # Standard units are written in title case
                    else:
                        unit = row_ts.units.strip()  # Preserve case for nonstandard units
                    worksheet.write(current_row, units_index, unit)
                    update_widths(widths, units_index, unit)
                else:
                    worksheet.write(current_row, units_index, FS.DEFAULT_SYMBOL_INAPPLICABLE)
                if self.allowed_units and isinstance(self.allowed_units, dict) and row_name in self.allowed_units:  # Add dropdown selection if there is more than one valid choice for the units
                    allowed = self.allowed_units[row_name]
                elif self.allowed_units and not isinstance(self.allowed_units, dict):
                    allowed = self.allowed_units
                else:
                    allowed = None
                if allowed:
                    worksheet.data_validation(xlrc(current_row, units_index), {"validate": "list", "source": allowed})
            if write_uncertainty:
                # NB. For now, uncertainty is always optional, so the 'not_required' format is used whether or not a value is present
                worksheet.write(current_row, uncertainty_index, row_ts.sigma, formats["not_required"])
            if row_ts.has_data:
                format = formats["not_required"]
            else:
                format = formats["unlocked"]
            if write_assumption:
                worksheet.write(current_row, constant_index, row_ts.assumption, format)
                if len(self.tvec):
                    worksheet.write(current_row, constant_index + 1, "OR", formats["center"])
                    update_widths(widths, constant_index + 1, "OR")
            # Write the time values if they are present
            if len(self.tvec):
                content = [None] * len(self.tvec)  # Initialize an empty entry for every time in the TDVE's tvec
                for t, v in zip(row_ts.t, row_ts.vals):
                    # If the TimeSeries contains data for that time point, then insert it now
                    idx = np.where(self.tvec == t)[0]
                    if len(idx):
                        content[idx[0]] = v
                for idx, v in enumerate(content):
                    if v is None:
                        worksheet.write_blank(current_row, offset + idx, v, format)
                    else:
                        worksheet.write(current_row, offset + idx, v, format)
                    widths[offset + idx] = max(widths[offset + idx], 7) if offset + idx in widths else 7
                if write_assumption:
                    # Conditional formatting for the assumption
                    # Do this here, because after the loop above, we have easy and clear access to the range of cells to include in the formula
                    fcn_empty_times = 'COUNTIF(%s:%s,"<>" & "")>0' % (xlrc(current_row, offset), xlrc(current_row, offset + len(content) - 1))
                    # Hatched out if the cell will be ignored
                    worksheet.conditional_format(xlrc(current_row, constant_index), {"type": "formula", "criteria": "=" + fcn_empty_times, "format": formats["ignored"]})
                    worksheet.conditional_format(xlrc(current_row, constant_index), {"type": "formula", "criteria": "=AND(%s,NOT(ISBLANK(%s)))" % (fcn_empty_times, xlrc(current_row, constant_index)), "format": formats["ignored_warning"]})
        return current_row + 2  # Add two so there is a blank line after this table 
 
def _parse_ts_header(row: list, known_headings: list, skip_first=False) -> Tuple[dict, dict, np.array]:
    """
    Internal function to read TDVE/TDC timeseries tables
    :param row: A list of openpyxl Cell instances corresponding to the heading row (the one containing the
                 headers for 'constant', 'uncertainty', 'provenance' etc., and the optional year values)
    :param known_headings: A list of known headings that should be lowered for further processing. These
                           would correspond to the standard/default headings
    :param skip_first: If True, the first heading cell will be skipped. This is required for TDVE tables
                       where the top left cell is the name of the variable and not a column heading
    :return: A tuple containing
                - A dictionary of heading values (from string cell types) with {heading: column_index}.
                  Any known headings will have had `str.lower()` called on them.
                - A dictionary of time values and the columns they appear in. Datetimes will be converted
                  into floats at this step (so, for example, a spreadsheet cell containing 1/1/2022 will be
                  converted to 2022.0)
                - A numpy array with sorted time values, that can be used as a time vector
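
    Example (a sketch; assumes ``rows`` holds the rows of a TDVE table, as produced by :func:`read_tables`)::

        known = {"units", "uncertainty", "constant", "assumption"}
        headings, times, tvec = _parse_ts_header(rows[0], known, skip_first=True)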
    """
    headings = {}
    times = {}
    for i, cell in enumerate(row):
        v = cell.value
        if v is None or (i == 0 and skip_first):
            continue
        elif cell.data_type in {"s", "str"}:
            v = v.strip()
            if v.startswith("#ignore"):
                break
            elif v.lower() in known_headings:
                if v.lower() in headings:
                    raise Exception(f"Duplicate heading in cell {cell.coordinate} - another cell in the same row already has the same header '{v}' (possibly in cell {get_column_letter(1+headings[v.lower()])}{cell.row})")
                headings[v.lower()] = i
            else:
                if v in headings:
                    raise Exception(f"Duplicate heading in cell {cell.coordinate} - another cell in the same row already has the same header '{v}' (possibly in cell {get_column_letter(1+headings[v])}{cell.row})")
                headings[v] = i
        elif cell.is_date:
            year = datetime_to_year(v)
            if year in times:
                raise Exception(f"Duplicate year in cell {cell.coordinate} - another cell in the same row already contains the same year ({year}) (possibly in cell {get_column_letter(1+times[year])}{cell.row})")
            times[year] = i  # Reuse the year computed above
        elif cell.data_type == "n":
            if v in times:
                raise Exception(f"Duplicate year in cell {cell.coordinate} - another cell in the same row already contains the same year ({v}) (possibly in cell {get_column_letter(1+times[v])}{cell.row})")
            times[v] = i
        else:
            raise Exception("Unknown data type in cell %s of the spreadsheet - quantity must be a string or a number" % cell.coordinate)
    return headings, times, np.array(sorted(times), dtype=float)
def cell_get_string(cell, allow_empty=False) -> str:
    """
    Return string value from cell
    This function checks if a cell contains a string. If it does, the stripped value
    will be returned. Otherwise, an informative error will be raised
    Note that the string type is determined from the cell's value rather than
    the openpyxl cell data type.
    :param cell: An openpyxl cell
    :param allow_empty: If True, an empty cell returns None instead of raising an error
    :return: A string with the contents of the cell (or None if the cell was empty and ``allow_empty=True``)
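
    Example (a sketch; assumes ``ws`` is an openpyxl worksheet)::

        name = cell_get_string(ws["A1"])  # Raises if the cell does not contain a string
        maybe = cell_get_string(ws["A2"], allow_empty=True)  # None if the cell is empty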
    """
    if cell.value is None and allow_empty:
        return None
    elif not sc.isstring(cell.value):
        raise Exception("Cell %s needs to contain a string (i.e. not a number, date, or other cell type)" % cell.coordinate)
    else:
        return cell.value.strip() 
def cell_get_number(cell, dtype=float):
    """
    Return numeric value from cell
    This function is to guard against accidentally having the Excel cell contain a string
    instead of a number. If a string has been entered, an error will be raised. The added value
    from this function is that if the Excel cell is a string type but the value is
    empty or ``N.A.`` then the value will be treated as though the cell was correctly set to a
    numeric type but had been left empty.
    The output is cast to ``dtype`` which means that code that requires numeric input from Excel
    can use this input to guarantee that the resulting number is of the correct type, or ``None``.
    :param cell: An openpyxl cell
    :param dtype: If the cell is numeric, cast to this type (default is `float` but could be `int` for example)
    :return: A scalar instance of ``dtype`` (e.g. ``float``) or ``None`` if cell is empty or being treated as empty
    :raises: :class:`Exception` if the cell contains a string
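
    Example (a sketch; assumes ``ws`` is an openpyxl worksheet)::

        val = cell_get_number(ws["B2"])  # float, or None if the cell is empty or 'N.A.'
        count = cell_get_number(ws["B3"], dtype=int)  # Cast to int instead of the default float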
    """
    if cell.value is None:
        return None
    elif cell.data_type == "n":  # Numeric type
        return dtype(cell.value)
    elif cell.data_type == "s":  # Only do relatively expensive string processing if it's actually a string type
        s = cell.value.lower().strip()
        if s == FS.DEFAULT_SYMBOL_INAPPLICABLE:
            return None
        elif not s.replace("-", ""):
            return None
    raise Exception("Cell %s needs to contain a number" % cell.coordinate) 
def validate_category(workbook, expected_category) -> None:
    """
    Check Atomica workbook type
    This function makes sure that a workbook has a particular category property
    stored within it, and raises an informative error if not. If the
    category isn't present or doesn't start with 'atomica', it is simply ignored for
    robustness (if the file is genuinely the wrong type, a parsing error will likely be raised later instead)
    :param workbook: An openpyxl workbook
    :param expected_category: The expected string category
    :raises: :class:`Exception` if the workbook category is not valid
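
    Example (a sketch; the file name and category string are illustrative)::

        import openpyxl
        workbook = openpyxl.load_workbook("databook.xlsx")  # Hypothetical file
        validate_category(workbook, "atomica:databook")  # Illustrative category string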
    """
    category = workbook.properties.category
    if category and sc.isstring(category) and category.startswith("atomica:"):
        if category.strip() != expected_category.strip():
            expected_type = expected_category.split(":")[1].title()
            actual_type = category.split(":")[1].title()
            message = "Error loading %s - the provided file was a %s file" % (expected_type, actual_type)
            raise Exception(message)