# -*- coding: utf-8 -*-
"""
Miscellaneous utility functions for Excel files
This module implements utility functions for Excel functionality that is common
to different kinds of spreadsheets used in Atomica (e.g. Databooks and Program Books).
For example, Excel formatting, and time-varying data entry tables, are implemented here.
"""
from xlsxwriter.utility import xl_rowcol_to_cell as xlrc
import sciris as sc
import io
import numpy as np
from .system import FrameworkSettings as FS
import pandas as pd
from .utils import format_duration, datetime_to_year
import xlsxwriter
from typing import Tuple
from openpyxl.utils import get_column_letter
__all__ = ["standard_formats", "apply_widths", "update_widths", "transfer_comments", "copy_sheet", "read_tables", "read_dataframes", "TimeDependentConnections", "TimeDependentValuesEntry", "cell_get_string", "cell_get_number", "validate_category"]
# Suppress known warning in Openpyxl
# Warnings are:
# - C:\ProgramData\Miniconda3\envs\atomica37\lib\site-packages\openpyxl\worksheet\_reader.py:300: UserWarning: Conditional Formatting extension is not supported and will be removed
# warn(msg)
# - C:\ProgramData\Miniconda3\envs\atomica37\lib\site-packages\openpyxl\worksheet\_reader.py:300: UserWarning: Data Validation extension is not supported and will be removed
# warn(msg)
# This means that conditional formatting and data validation rules aren't being loaded, but since `data_only=True` these don't matter and can be safely ignored
import warnings
warnings.filterwarnings(action="ignore", category=UserWarning, module="openpyxl.worksheet", lineno=300)
[docs]
def apply_widths(worksheet: xlsxwriter.workbook.Worksheet, width_dict: dict) -> None:
    """
    Set column widths

    Given a dictionary of precomputed character counts (typically the maximum number of characters in
    any cell written to each column), set the width of the column in Excel accordingly. The Excel
    column width is slightly larger than the character count.

    :param worksheet: A Worksheet instance
    :param width_dict: A dictionary of character widths to use for each column (by index)
    """

    for col_idx, char_count in width_dict.items():
        # Excel column widths need to be a little wider than the raw character count
        worksheet.set_column(col_idx, col_idx, char_count * 1.1 + 1)
[docs]
def update_widths(width_dict: dict, column_index: int, contents: str) -> None:
    """
    Keep track of required width for a column

    ``width_dict`` is a dict that is keyed by column index e.g. 0,1,2
    and the value is the length of the longest contents seen for that column

    :param width_dict: Storage dictionary
    :param column_index: Index of the column value has been inserted in
    :param contents: Content, length of which is used to set width
    """

    # Nothing to record if there is no storage dict or no usable string content
    if width_dict is None or contents is None or not sc.isstring(contents):
        return

    n_chars = len(contents)
    if n_chars == 0:
        return

    # Record the longest content seen so far for this column
    width_dict[column_index] = max(width_dict.get(column_index, 0), n_chars)
[docs]
def copy_sheet(source: str, sheet_name: str, workbook: xlsxwriter.Workbook) -> None:
    """
    Copy a sheet into a Workbook

    This function allows values to be copied from a file into a Workbook. The
    main use case is to support custom sheets in databooks that are not otherwise parsed
    but which might need to be retained. In particular, the ``ProjectData`` class does
    not parse ignored sheets at all, because no guarantees can be made about the quantity and
    type of the content, and whether there are formulas etc. that would be lost. In some cases
    though, it may be necessary to edit the databook and preserve specific sheets. In general,
    this can be done by using the ``to_workbook()`` method and then manually performing whatever
    operations are necessary to preserve the content on the extra sheets. However, when the
    extra sheet content is extremely simple e.g. just a table of values, then this helper
    function can be used to facilitate copying the content.

    Warning - note that Excel functions, formatting, and comments will NOT be preserved.

    :param source: File name of the spreadsheet to read the source sheet from or an ``sc.Spreadsheet`` instance
    :param sheet_name: Name of the sheet to write to
    :param workbook: A Workbook instance to add the sheet to
    :return: None - the sheet will be added to the Workbook in-place
    """

    import openpyxl

    if sc.isstring(source):
        source = sc.Spreadsheet(source)

    # Load in read-only mode for performance, since we don't parse comments etc.
    src_workbook = openpyxl.load_workbook(source.tofile(), read_only=True, data_only=True)
    try:
        src_worksheet = src_workbook[sheet_name]  # Raises KeyError if the sheet is missing
        dst_worksheet = workbook.add_worksheet(sheet_name)
        # Copy cell values only - formulas were already collapsed to values by data_only=True
        for i, row in enumerate(src_worksheet.rows):
            for j, cell in enumerate(row):
                dst_worksheet.write(i, j, cell.value)
    finally:
        # Ensure the source workbook is closed even if the sheet lookup or copy fails.
        # Read-only openpyxl workbooks hold the file handle open until closed.
        src_workbook.close()
[docs]
def read_tables(worksheet) -> Tuple[list, list]:
    """
    Read tables from sheet

    :param worksheet: An openpyxl worksheet
    :return: A tuple containing - A list of tables (which is a list of rows, terminated by an empty row in the original spreadsheet),
             and a list of start row indices for each table read in
    """

    # This function takes in a openpyxl worksheet, and returns tables
    # A table consists of a block of rows with any #ignore rows skipped
    # This function will start at the top of the worksheet, read rows into a buffer
    # until it gets to the first entirely empty row
    # And then returns the contents of that buffer as a table. So a table is a list of openpyxl rows
    # This function continues until it has exhausted all of the rows in the sheet

    buffer = []  # Rows accumulated for the table currently being read
    tables = []
    start_rows = []
    start = None  # 1-based Excel row number where the current buffer began

    for i, row in enumerate(worksheet.rows):
        # Determine whether to skip the row, add it to the buffer, or flush the buffer into a table
        flush = False
        for j, cell in enumerate(row):
            if cell.value:  # Scan for the first truthy cell in the row
                if cell.data_type == "s" and cell.value.startswith("#ignore"):
                    if j == 0:
                        break  # If #ignore is encountered in the first column, skip the row and continue parsing the table
                    else:
                        flush = True
                        break  # If #ignore is encountered after preceding blank cells, treat the row as a table terminator
                else:
                    # Read the row into the buffer and continue parsing the table
                    if not buffer:
                        start = i + 1  # Excel rows are indexed starting at 1
                    buffer.append(row)
                    break  # If the cell has a value in it, continue parsing the table
        else:
            # The inner loop found no truthy cell, so this is a blank row - flush any buffered rows
            if buffer:
                flush = True

        if flush:
            # NOTE(review): flush can also be set via a '#ignore' following blank cells even when the
            # buffer is empty, which would append an empty table with start=None - confirm inputs never do this
            tables.append(buffer)
            start_rows.append(start)
            buffer = []

    # After the last row, if the buffer has some un-flushed contents, then include it in the last table
    if buffer:
        tables.append(buffer)
        start_rows.append(start)

    return tables, start_rows
[docs]
def read_dataframes(worksheet, merge=False) -> list:
    """
    Read dataframes from sheet

    This function operates similarly to ``read_tables`` except it returns Dataframes instead of
    cells. This enables the dataframes to be constructed more quickly, at the expense of being
    able to track the cell references and row numbers. These are shown for databooks (via ``read_tables``)
    but not for frameworks (which go via ``read_dataframes``)

    :param worksheet: An openpyxl worksheet
    :param merge: If False (default) then blank rows will be used to split the dataframes. If True, only one
                  DataFrame will be returned
    :return: A list of DataFrames
    """

    # Read-only worksheets may not know their dimensions up front, so force a calculation if needed
    if worksheet.max_row is None or worksheet.max_column is None:
        worksheet.calculate_dimension(force=True)

    content = np.empty((worksheet.max_row, worksheet.max_column), dtype="object")  # Dense array of cell values
    ignore = np.zeros((worksheet.max_row), dtype=bool)  # True for rows starting with '#ignore' (dropped entirely)
    empty = np.zeros((worksheet.max_row), dtype=bool)  # True for index where a new table begins

    for i, row in enumerate(worksheet.rows):
        any_values = False  # Set True if this row contains any values
        for j, cell in enumerate(row):
            v = cell.value
            if cell.data_type == "s":
                if not any_values and v.startswith("#ignore"):
                    # If we encounter a #ignore and it's the first content in the row
                    if j == 0:
                        # If it's the first cell, ignore the row (i.e., do NOT treat it as a blank row)
                        ignore[i] = True  # Ignore the row
                    break  # Otherwise, the row stays 'empty' and acts as a table separator
                elif v.startswith("#ignore"):
                    # A #ignore after real content: skip the rest of the row, keeping earlier cells
                    content[i, j:] = None
                    break
                else:
                    v = v.strip()
                    any_values = any_values or bool(v)  # If it's a string type, call strip() before checking truthiness
            elif v is not None:
                any_values = True
            content[i, j] = v
        if not any_values:
            empty[i] = True

    tables = []
    if merge:
        # Blank rows are dropped along with ignored rows, and all remaining rows form one table
        ignore[empty] = True
        if all(ignore):
            return []
        tables.append(content[~ignore, :])
    else:
        # A change from False to True means that we need to start a new table
        # A True followed by a True doesn't start a new table but instead gets ignored
        content = content[~ignore, :]
        empty = empty[~ignore]

        # If there is no content at all, return immediately
        if all(empty):
            return []

        idx = []  # (start, stop) half-open row ranges, one per table
        start = None
        for i in range(len(empty)):
            # Check for the start of a table
            if not empty[i] and start is None:
                start = i
            # Check for the end of a table
            if empty[i] and start is not None:
                # If we were reading a table and have reached an empty row
                idx.append((start, i))
                start = None
            elif i + 1 == len(empty) and start is not None:
                # If we were reading a table and have reached the end of the data
                idx.append((start, i + 1))
                start = None

        tables = []
        for start, stop in idx:
            tables.append(content[start:stop].copy())  # Use .copy() so that we don't retain any references to the original full array outside this function

    dfs = []
    for table in tables:
        df = pd.DataFrame(table)
        df.dropna(axis=1, how="all", inplace=True)  # Drop columns that are entirely empty
        df.columns = df.iloc[0]  # First row of each table provides the column headings
        df = df[1:]
        dfs.append(df)
    return dfs
[docs]
class TimeDependentConnections:
    """
    Structure for reading/writing interactions

    A :class:`TimeDependentConnections` object is suitable when there are time dependent interactions between two quantities
    This class is used for both transfers and interactions. The content that it writes consists of

    - A connection matrix table that has Y/N selection of which interactions are present between two things
    - A set of pairwise connections specifying to, from, units, assumption, and time

    Interactions can have a diagonal, whereas transfers cannot (e.g. a population can infect itself but cannot transfer to itself).

    In Excel, a :class:`TimeDependentConnections` maps to three tables

    1. A table to enter the code name and full name
    2. An interactions matrix with Y/N indicating whether an interaction between two populations exists
    3. A set of rows for entering time varying data for each pair of populations

    :param code_name: the code name of this quantity e.g. 'aging'
    :param full_name: the full name of this quantity e.g. 'Aging'
    :param tvec: time values for the time-dependent rows
    :param from_pops: list of population code names to use as the matrix rows (source populations)
    :param to_pops: list of population code names to use as the matrix columns (destination populations)
    :param interpop_type: 'transfer' or 'interaction'. A transfer cannot have diagonal entries, and can have Number or Probability formats. An Interaction can have
                          diagonal entries and only has N.A. formats
    :param ts: Optionally specify a dict containing all of the non-empty TimeSeries objects used. The format is ``{(from_pop, to_pop):TimeSeries}``.
               An interaction can only be Y/N for clarity, if it is Y then a row is displayed for the TimeSeries. Actually, the Y/N can be
               decided in the first instance based on the provided TimeSeries i.e. if a TimeSeries is provided for an interaction, then the
               interaction must have been marked with Y
    :param from_pop_type: Optionally specify the population type for the source populations
    :param to_pop_type: Optionally specify the population type for the destination populations
    """

    def __init__(self, code_name: str, full_name: str, tvec: np.array, from_pops: list, to_pops: list, interpop_type: str, ts: dict = None, from_pop_type: str = None, to_pop_type: str = None):
        self.code_name = code_name  #: Code name e.g. 'aging'
        self.full_name = full_name  #: Full name e.g. 'Aging'
        self.type = interpop_type  #: Either 'transfer' or 'interaction'
        self.from_pop_type = from_pop_type  #: Population type for the source populations
        self.from_pops = from_pops  #: Source populations (matrix rows)
        self.to_pop_type = to_pop_type  #: Population type for the destination populations
        self.to_pops = to_pops  #: Destination populations (matrix columns)
        self.tvec = tvec  #: Time values for the time-dependent rows
        self.ts = ts if ts is not None else sc.odict()  #: ``{(from_pop, to_pop): TimeSeries}`` for connections marked 'Y'
        self.attributes = {}  #: Attributes associated with the table
        self.ts_attributes = {}  #: Attributes associated with each TimeSeries row
        self.ts_attributes["Provenance"] = {}  # Include provenance attribute by default
        self.assumption_heading = "Constant"  #: Heading to use for assumption column
        self.write_units = None  #: Write a column for units (if None, units will be written if any of the TimeSeries have units)
        self.write_uncertainty = None  #: Write a column for uncertainty (if None, uncertainty will be written if any of the TimeSeries have uncertainty)
        self.write_assumption = None  #: Write a column for the assumption (if None, assumptions will be written if any of the TimeSeries have an assumption)

        if self.type == "transfer":
            # Transfers move people, so the diagonal is disallowed and quantity-type units apply
            self.enable_diagonal = False
            self.allowed_units = []
            self.allowed_units.append("%s (%s)" % (FS.QUANTITY_TYPE_NUMBER.title(), format_duration(1, pluralize=True)))
            self.allowed_units.append("%s (per %s)" % (FS.QUANTITY_TYPE_RATE.title(), format_duration(1, pluralize=False)))
            self.allowed_units.append("%s (%s)" % (FS.QUANTITY_TYPE_DURATION.title(), format_duration(1, pluralize=True)))
        elif self.type == "interaction":
            self.enable_diagonal = True
            self.allowed_units = [FS.DEFAULT_SYMBOL_INAPPLICABLE]
        else:
            raise Exception('Unknown TimeDependentConnections type - must be "transfer" or "interaction"')
def __repr__(self):
return '<TDC %s "%s">' % (self.type.title(), self.code_name)
[docs]
    @classmethod
    def from_tables(cls, tables: list, interaction_type):
        """
        Instantiate based on list of tables

        This method instantiates and initializes a new :class:`TimeDependentConnections` object from
        tables that have been read in using :func:`read_tables`. Note that the parent object
        such as :class:`ProjectData` is responsible for extracting the tables and passing them
        to this function. For instance, the transfers sheet might contain more than one set of
        tables, so it is the calling function's responsibility to split those tables up into
        the groups of three expected by this method.

        :param tables: A list of tables. A table here is a list of rows, and a row is a list of cells.
        :param interaction_type: A string identifying the interaction type - either 'transfer' or 'interaction'
        :return: A new :class:`TimeDependentConnections` instance
        """

        from .utils import TimeSeries  # Import here to avoid circular reference

        assert interaction_type in {"transfer", "interaction"}, "Unknown interaction type"

        # Read the TDC definition table (including attributes)
        code_name = None
        full_name = None
        from_pop_type = None
        to_pop_type = None

        # Start by reading the TDC header row specifying the name and pop types
        # This table also contains only a single row. Any subsequent rows will automatically be ignored
        attributes = {}
        for header_cell, value_cell in zip(tables[0][0], tables[0][1]):
            header = cell_get_string(header_cell, allow_empty=True)
            if header is None:
                continue
            elif header.startswith("#ignore"):
                break

            lowered_header = header.lower()
            if lowered_header == "abbreviation":
                code_name = cell_get_string(value_cell)
            elif lowered_header == "full name":
                full_name = cell_get_string(value_cell)
            elif lowered_header == "from population type":
                from_pop_type = cell_get_string(value_cell, True)
            elif lowered_header == "to population type":
                to_pop_type = cell_get_string(value_cell, True)
            else:
                # Any unrecognized header/value pair is preserved as a table-level attribute
                attributes[header] = value_cell.value

        if interaction_type == "transfer":
            assert from_pop_type == to_pop_type, "Transfers can only occur between populations of the same type"

        if code_name is None:
            raise Exception("Code name/abbreviation missing")
        if full_name is None:
            raise Exception("Full name missing")

        # Read the pops from the Y/N table. The Y/N content of the table depends on the timeseries objects that
        # are present. That is, if the Y/N matrix contains a Y then a TimeSeries must be read in, and vice versa.
        # Therefore, we don't actually parse the matrix, and instead just read in all the TimeSeries instances
        # that are defined and infer the matrix that way. However, we do still need to parse the to_pops and from_pops
        # These both get parsed because for interactions across pop types they could be different. Blank cells in either
        # the row or the column signal the end of parsing those population lists

        # Read the to_pops from the first row of the table, until a blank cell is encountered
        to_pops = []
        for cell in tables[1][0][1:]:
            if cell.value is None or (cell.data_type == "s" and cell.value.startswith("#ignore")):
                break
            else:
                to_pops.append(cell.value)

        # Read the from_pops from the first column, until a blank cell is encountered. Note that a #ignore
        # can never be present in the first column, because this would cause the row to be skipped prior to this point
        from_pops = []
        for row in tables[1][1:]:
            if row[0].value is None:
                break
            else:
                from_pops.append(row[0].value)

        # Instantiate the TDC object based on the metadata from the first two tables
        tdc = cls(code_name, full_name, None, from_pops=from_pops, to_pops=to_pops, interpop_type=interaction_type, from_pop_type=from_pop_type, to_pop_type=to_pop_type)
        tdc.attributes = attributes

        known_headings = {"from population", "to population", "units", "uncertainty", "constant", "assumption"}
        headings, times, tdc.tvec = _parse_ts_header(tables[2][0], known_headings, skip_first=False)

        # Validate and process headings
        if not times and "constant" not in headings:
            raise Exception("Could not find an assumption or time-specific value - all tables must contain at least one of these values")

        tdc.write_units = True if "units" in headings else None
        tdc.write_uncertainty = True if "uncertainty" in headings else None
        tdc.write_assumption = True if "constant" in headings else None
        if "assumption" in headings:
            # An 'Assumption' heading is also accepted (presumably from older books) - preserve it on write
            tdc.write_assumption = True
            tdc.assumption_heading = "Assumption"

        # Any unrecognized heading becomes a per-row TimeSeries attribute
        for heading in headings:
            if heading not in known_headings:
                tdc.ts_attributes[heading] = {}

        tdc.ts = sc.odict()
        for row in tables[2][1:]:
            if row[0].value != "...":  # Rows gated off in the matrix display '...' and carry no data
                assert row[0].value in from_pops, 'Population "%s" not found - should be contained in %s' % (row[0].value, from_pops)
                assert row[2].value in to_pops, 'Population "%s" not found - should be contained in %s' % (row[2].value, to_pops)

                vals = [x.value for x in row]
                from_pop = vals[0]
                to_pop = vals[2]  # Column 1 holds the '--->' arrow

                if "units" in headings:
                    units = cell_get_string(row[headings["units"]], allow_empty=True)
                    if units is not None and units.lower().strip() in FS.STANDARD_UNITS:
                        units = units.lower().strip()  # Only lower and strip units if they are standard units
                else:
                    units = None

                ts = TimeSeries(units=units)

                if "uncertainty" in headings:
                    ts.sigma = cell_get_number(row[headings["uncertainty"]])
                else:
                    ts.sigma = None

                if "constant" in headings:
                    ts.assumption = cell_get_number(row[headings["constant"]])
                elif "assumption" in headings:
                    ts.assumption = cell_get_number(row[headings["assumption"]])
                else:
                    ts.assumption = None

                for attribute in tdc.ts_attributes:
                    if attribute in headings:
                        tdc.ts_attributes[attribute][(from_pop, to_pop)] = row[headings[attribute]].value

                for t, idx in times.items():
                    ts.insert(t, cell_get_number(row[idx]))  # If cell_get_number returns None, this gets handled accordingly by ts.insert()

                tdc.ts[(from_pop, to_pop)] = ts

        return tdc
[docs]
    def write(self, worksheet, start_row, formats, references: dict = None, widths: dict = None) -> int:
        """
        Write to cells in a worksheet

        :param worksheet: An xlsxwriter worksheet instance
        :param start_row: The first row in which to write values
        :param formats: Format dict for the opened workbook - typically the return value of :func:`standard_formats` when the workbook was opened
        :param references: References dict containing cell references for strings in the current workbook
        :param widths: ``dict`` storing column widths
        :return: The row index for the next available row for writing in the spreadsheet
        """

        assert self.assumption_heading in {"Constant", "Assumption"}, "Unsupported assumption heading"

        # Resolve the tri-state write_* flags: an explicit True/False wins, otherwise the
        # column is included only if at least one TimeSeries carries that attribute
        write_units = self.write_units if self.write_units is not None else any((ts.units is not None for ts in self.ts.values()))
        write_uncertainty = self.write_uncertainty if self.write_uncertainty is not None else any((ts.sigma is not None for ts in self.ts.values()))
        write_assumption = self.write_assumption if self.write_assumption is not None else any((ts.assumption is not None for ts in self.ts.values()))

        if not references:
            references = {x: x for x in self.from_pops + self.to_pops}  # Default null mapping for populations

        # First, write the name entry table
        current_row = start_row
        column = 0
        worksheet.write(current_row, column, "Abbreviation", formats["center_bold"])
        update_widths(widths, column, "Abbreviation")
        worksheet.write(current_row + 1, column, self.code_name)
        update_widths(widths, column, self.code_name)

        column += 1
        worksheet.write(current_row, column, "Full Name", formats["center_bold"])
        update_widths(widths, column, "Full Name")
        worksheet.write(current_row + 1, column, self.full_name)
        update_widths(widths, column, self.full_name)

        column += 1
        worksheet.write(current_row, column, "From population type", formats["center_bold"])
        update_widths(widths, column, "From population type")
        worksheet.write(current_row + 1, column, self.from_pop_type)
        update_widths(widths, column, self.from_pop_type)

        column += 1
        worksheet.write(current_row, column, "To population type", formats["center_bold"])
        update_widths(widths, column, "To population type")
        worksheet.write(current_row + 1, column, self.to_pop_type)
        update_widths(widths, column, self.to_pop_type)

        # Any custom table-level attributes get one extra header/value column each
        for attribute, value in self.attributes.items():
            column += 1
            worksheet.write(current_row, column, attribute, formats["center_bold"])
            update_widths(widths, column, attribute)
            worksheet.write(current_row + 1, column, value)
            update_widths(widths, column, value)

        # Then, write the Y/N matrix
        current_row += 3  # Leave a blank row below the matrix
        # Note - table_references are local to this TimeDependentConnections instance
        # For example, there could be two transfers, and each of them could potentially transfer between 0-4 and 5-14
        # so the worksheet might contain two references from 0-4 to 5-14 but they would be for different transfers and thus
        # the time-dependent rows would depend on different boolean table cells
        current_row, table_references, values_written = self._write_pop_matrix(worksheet, current_row, formats, references, boolean_choice=True, widths=widths)

        # Finally, write the time dependent part
        headings = []
        headings.append("From population")
        headings.append("")  # --->
        headings.append("To population")
        offset = len(headings)

        attribute_index = {}  # Column index assigned to each TimeSeries attribute
        for attribute in self.ts_attributes:
            attribute_index[attribute] = offset
            headings.append(attribute)
            offset += 1

        if write_units:
            headings.append("Units")
            units_index = offset  # Column to write the units in
            offset += 1

        if write_uncertainty:
            headings.append("Uncertainty")
            uncertainty_index = offset  # Column to write the uncertainty in
            offset += 1

        if write_assumption:
            headings.append(self.assumption_heading)
            headings.append("")  # Blank heading above the 'OR' column
            constant_index = offset  # Column to write the assumption in
            offset += 1
            if len(self.tvec):
                offset += 1  # Additional offset for the 'OR' column

        headings += [float(x) for x in self.tvec]

        for i, entry in enumerate(headings):
            worksheet.write(current_row, i, entry, formats["center_bold"])
            update_widths(widths, i, entry)

        # Now, we will write a wrapper that gates the content
        # If the gating cell is 'Y', then the content will be displayed, otherwise not
        def gate_content(content, gating_cell):
            if content.startswith("="):  # If this is itself a reference
                return '=IF(%s="Y",%s,"...")' % (gating_cell, content[1:])
            else:
                return '=IF(%s="Y","%s","...")' % (gating_cell, content)

        for from_idx in range(0, len(self.from_pops)):
            for to_idx in range(0, len(self.to_pops)):
                current_row += 1
                from_pop = self.from_pops[from_idx]
                to_pop = self.to_pops[to_idx]
                entry_tuple = (from_pop, to_pop)
                entry_cell = table_references[entry_tuple]

                # Write hyperlinks between the matrix cell and this time-dependent row (in both directions)
                if values_written[entry_cell] != FS.DEFAULT_SYMBOL_INAPPLICABLE:
                    worksheet.write_url(entry_cell, "internal:%s!%s" % (worksheet.name, xlrc(current_row, 1)), cell_format=formats["center_unlocked"], string=values_written[entry_cell])
                    worksheet.write_url(xlrc(current_row, 1), "internal:%s!%s" % (worksheet.name, entry_cell), cell_format=formats["center_unlocked"])

                if entry_tuple in self.ts:
                    ts = self.ts[entry_tuple]
                    format = formats["not_required"]
                else:
                    ts = None
                    format = formats["unlocked"]

                # NOTE(review): the branches below test self.write_units / self.write_uncertainty /
                # self.write_assumption rather than the locally-resolved write_* flags computed above.
                # If the instance attribute is None but the column was auto-enabled, the heading is
                # written without the corresponding row content - confirm this asymmetry is intended
                if ts:
                    worksheet.write_formula(current_row, 0, gate_content(references[from_pop], entry_cell), formats["center_bold"], value=from_pop)
                    update_widths(widths, 0, from_pop)
                    worksheet.write_formula(current_row, 1, gate_content("--->", entry_cell), formats["center"], value="--->")
                    worksheet.write_formula(current_row, 2, gate_content(references[to_pop], entry_cell), formats["center_bold"], value=to_pop)
                    update_widths(widths, 2, to_pop)

                    # Write the attributes
                    for attribute in self.ts_attributes:
                        if isinstance(self.ts_attributes[attribute], dict):
                            if entry_tuple in self.ts_attributes[attribute]:
                                val = self.ts_attributes[attribute][entry_tuple]
                            else:
                                val = None
                        else:
                            val = self.ts_attributes[attribute]
                        if val is not None:
                            worksheet.write(current_row, attribute_index[attribute], val)
                            update_widths(widths, attribute_index[attribute], val)

                    if self.write_units:
                        worksheet.write(current_row, units_index, ts.units, format)
                        update_widths(widths, units_index, ts.units)
                        if self.allowed_units:
                            worksheet.data_validation(xlrc(current_row, units_index), {"validate": "list", "source": [x for x in self.allowed_units]})

                    if self.write_uncertainty:
                        worksheet.write(current_row, uncertainty_index, ts.sigma, formats["not_required"])

                    if self.write_assumption:
                        worksheet.write(current_row, constant_index, ts.assumption, format)
                        if len(self.tvec):
                            worksheet.write_formula(current_row, constant_index + 1, gate_content("OR", entry_cell), formats["center"], value="OR")
                            update_widths(widths, constant_index + 1, "OR")
                else:
                    worksheet.write_formula(current_row, 0, gate_content(references[from_pop], entry_cell), formats["center_bold"], value="...")
                    worksheet.write_formula(current_row, 1, gate_content("--->", entry_cell), formats["center"], value="...")
                    worksheet.write_formula(current_row, 2, gate_content(references[to_pop], entry_cell), formats["center_bold"], value="...")

                    if self.write_units:
                        worksheet.write_blank(current_row, units_index, "", format)
                        if self.allowed_units:
                            worksheet.data_validation(xlrc(current_row, units_index), {"validate": "list", "source": [x for x in self.allowed_units]})

                    if self.write_uncertainty:
                        worksheet.write_blank(current_row, uncertainty_index, "", formats["not_required"])

                    if self.write_assumption:
                        worksheet.write_blank(current_row, constant_index, "", format)
                        if len(self.tvec):
                            worksheet.write_formula(current_row, constant_index + 1, gate_content("OR", entry_cell), formats["center"], value="...")
                            update_widths(widths, constant_index + 1, "...")

                # Write the time-specific values, leaving cells blank where no data exists
                content = [None] * len(self.tvec)
                if ts:
                    for t, v in zip(ts.t, ts.vals):
                        idx = np.where(self.tvec == t)[0][0]  # TimeSeries times must exactly match entries in self.tvec
                        content[idx] = v

                for idx, v in enumerate(content):
                    if v is None:
                        worksheet.write_blank(current_row, offset + idx, v, format)
                    else:
                        worksheet.write(current_row, offset + idx, v, format)
                    # assumes widths is a dict here (unlike update_widths, no None guard) - TODO confirm callers always pass one
                    widths[offset + idx] = max(widths[offset + idx], 7) if offset + idx in widths else 7

                if not content:
                    idx = 0  # With no time columns, idx would otherwise be undefined below

                if self.write_assumption and len(self.tvec):
                    # Conditional formatting for the assumption, depending on whether time-values were entered
                    fcn_empty_times = 'COUNTIF(%s:%s,"<>" & "")>0' % (xlrc(current_row, offset), xlrc(current_row, offset + idx))
                    worksheet.conditional_format(xlrc(current_row, constant_index), {"type": "formula", "criteria": "=" + fcn_empty_times, "format": formats["ignored"]})
                    worksheet.conditional_format(xlrc(current_row, constant_index), {"type": "formula", "criteria": "=AND(%s,NOT(ISBLANK(%s)))" % (fcn_empty_times, xlrc(current_row, constant_index)), "format": formats["ignored_warning"]})

                # Conditional formatting for the row - it has a white background if the gating cell is 'N'
                worksheet.conditional_format("%s:%s" % (xlrc(current_row, 3), xlrc(current_row, offset + idx)), {"type": "formula", "criteria": '=%s<>"Y"' % (entry_cell), "format": formats["white_bg"]})

        current_row += 2  # Leave a blank row after the last table row
        return current_row
    def _write_pop_matrix(self, worksheet, start_row, formats, references: dict = None, boolean_choice=False, widths: dict = None):
        """
        Write a square matrix to Excel

        This function writes the Y/N matrix

        - Transfer matrix
        - Interactions matrix

        If ``self.enable_diagonal`` is ``False`` then the diagonal will be forced to be ``'N.A.'``. If an entry
        is specified for an entry on the diagonal and ``enable_diagonal=False``, an error will be thrown

        :param worksheet: An xlsxwriter worksheet instance
        :param start_row: The first row in which to write values
        :param formats: Format dict for the opened workbook - typically the return value of :func:`standard_formats` when the workbook was opened
        :param references: Optionally supply dict with references, used to link population names in Excel
        :param boolean_choice: If True, values will be coerced to Y/N and an Excel validation will be added
        :param widths: ``dict`` storing column widths
        :return: Tuple with ``(next_row, table_references, values_written)``. The references are used for hyperlinking to the Excel matrix
        """

        entries = self.ts

        if not references:
            references = {x: x for x in self.from_pops + self.to_pops}  # This is a null-mapping that takes say 'adults'->'adults' thus simplifying the workflow. Otherwise, it's assumed a reference exists for every node

        table_references = {}
        values_written = {}

        # Write the headers - to_pops across the top row, from_pops down the first column
        for i, node in enumerate(self.to_pops):
            worksheet.write_formula(start_row, i + 1, references[node], formats["center_bold"], value=node)
            update_widths(widths, i + 1, node)
        for i, node in enumerate(self.from_pops):
            worksheet.write_formula(start_row + i + 1, 0, references[node], formats["center_bold"], value=node)
            update_widths(widths, 0, node)

        # Prepare the content - first replace the dict with one keyed by index. This is because we cannot apply formatting
        # after writing content, so have to do the writing in a single pass over the entire matrix
        if boolean_choice:
            content = np.full((len(self.from_pops), len(self.to_pops)), "N", dtype=object)  # This will also coerce the value to string in preparation for writing
        else:
            content = np.full((len(self.from_pops), len(self.to_pops)), "", dtype=object)  # This will also coerce the value to string in preparation for writing

        for interaction, value in entries.items():
            from_pop, to_pop = interaction
            if not self.enable_diagonal and from_pop == to_pop:
                raise Exception("Trying to write a diagonal entry to a table that is not allowed to contain diagonal terms")  # This is because data loss will occur if the user adds entries on the diagonal, then writes the table, and then reads it back in
            from_idx = self.from_pops.index(from_pop)
            to_idx = self.to_pops.index(to_pop)
            if boolean_choice:
                value = "Y" if value else "N"  # Any truthy entry (e.g. a TimeSeries) maps to 'Y'
            content[from_idx, to_idx] = value

        # Write the content
        for from_idx in range(0, len(self.from_pops)):
            for to_idx in range(0, len(self.to_pops)):
                row = start_row + 1 + from_idx
                col = to_idx + 1
                if not self.enable_diagonal and self.to_pops[to_idx] == self.from_pops[from_idx]:  # Disable the diagonal if it's linking the same two quantities and that's desired
                    val = FS.DEFAULT_SYMBOL_INAPPLICABLE
                    worksheet.write(row, col, val, formats["center"])
                    worksheet.data_validation(xlrc(row, col), {"validate": "list", "source": ["N.A."]})
                else:
                    val = content[from_idx, to_idx]
                    worksheet.write(row, col, content[from_idx, to_idx], formats["center_unlocked"])
                    if boolean_choice:
                        # Restrict entry to Y/N and colour the cell according to the current choice
                        worksheet.data_validation(xlrc(row, col), {"validate": "list", "source": ["Y", "N"]})
                        worksheet.conditional_format(xlrc(row, col), {"type": "cell", "criteria": "equal to", "value": '"Y"', "format": formats["unlocked_boolean_true"]})
                        worksheet.conditional_format(xlrc(row, col), {"type": "cell", "criteria": "equal to", "value": '"N"', "format": formats["unlocked_boolean_false"]})
                table_references[(self.from_pops[from_idx], self.to_pops[to_idx])] = xlrc(row, col, True, True)  # Store reference to this interaction (absolute cell address)
                values_written[table_references[(self.from_pops[from_idx], self.to_pops[to_idx])]] = val

        next_row = start_row + 1 + len(self.from_pops) + 1  # One header row, the matrix rows, and one blank separator row
        return next_row, table_references, values_written
[docs]
class TimeDependentValuesEntry:
    """Table for time-dependent data entry

    This class is used in Databooks and Program books to enter potentially time-varying data.
    Conceptually, it maps a set of TimeSeries objects to a single name and table in the
    spreadsheet. For example, a Characteristic might contain a TimeSeries for each population,
    and the resulting TimeDependentValuesEntry (TDVE) table would have a `name` matching the
    population, and TimeSeries for each population.

    The TDVE class optionally allows the specification of units, assumptions, and uncertainty,
    which each map to properties on the underlying TimeSeries objects. It also contains a
    time vector corresponding to the time values that appear or will appear in the spreadsheet.

    Note that the units are stored within the TimeSeries objects, which means that they
    are able to differ across rows.

    :param name: The name/title for this table
    :param tvec: Specify the time values for this table. All TimeSeries in the ts dict should have corresponding time values
    :param ts: Optionally specify an odict() of TimeSeries objects populating the rows. Could be populated after
    :param allowed_units: Optionally specify a list of allowed units that will appear as a dropdown
    :param comment: Optionally specify descriptive text that will be added as a comment to the name cell
    :param pop_type: Optionally specify a population type associated with this table
    """

    def __init__(self, name, tvec: np.ndarray = None, ts=None, allowed_units: list = None, comment: str = None, pop_type: str = None):
        if ts is None:
            ts = sc.odict()
        self.name = name  #: Name for the quantity printed in Excel
        self.comment = comment  #: A comment that will be added in Excel
        self.tvec = [] if tvec is None else tvec  #: time axis (e.g. np.arange(2000,2019)) - all TimeSeries time values must exactly match one of the values here
        self.ts = ts  #: dict of :class:`TimeSeries` objects
        self.allowed_units = [x.title() if x in FS.STANDARD_UNITS else x for x in allowed_units] if allowed_units is not None else None  # Otherwise, can be an odict with keys corresponding to ts - leave as None for no restriction
        self.pop_type = pop_type  #: Population type associated with this table
        self.ts_attributes = {}  #: Dictionary containing extra attributes to write along with each TimeSeries object.
        self.ts_attributes["Provenance"] = {}  # Include provenance attribute by default
        # Keys are attribute name, values can be either a scalar or a dict keyed by the same keys as self.ts. Compared to units, uncertainty etc.
        # attributes are stored in the TDVE rather than in the TimeSeries
        self.assumption_heading = "Constant"  #: Heading to use for assumption column
        self.write_units = None  #: Write a column for units (if None, units will be written if any of the TimeSeries have units)
        self.write_uncertainty = None  #: Write a column for uncertainty (if None, uncertainty will be written if any of the TimeSeries have uncertainty)
        self.write_assumption = None  #: Write a column for assumption/constant (if None, assumption will be written if any of the TimeSeries have an assumption)

    def __repr__(self):
        output = sc.prepr(self)
        return output

    @property
    def has_data(self) -> bool:
        """
        Check whether all time series have data entered

        :return: True if all of the TimeSeries objects stored in the TDVE have data
        """
        return all([x.has_data for x in self.ts.values()])

    @classmethod
    def from_rows(cls, rows: list):
        """
        Create new instance from Excel rows

        Given a set of openpyxl rows, instantiate a :class:`TimeDependentValuesEntry` object.
        That is, the parent object e.g. :class:`ProjectData` is responsible for finding where the TDVE table is,
        and reading all of the rows associated with it (skipping ``#ignored`` rows) and then passing those rows,
        unparsed, to this function.

        Headings for 'units', 'uncertainty', and 'assumption'/'constant' are optional and will be read in
        if they are present in the spreadsheet.

        :param rows: A list of rows
        :return: A new :class:`TimeDependentValuesEntry` instance
        """
        from .utils import TimeSeries  # Import here to avoid circular reference

        # Retrieve the name
        name = rows[0][0].value
        if name is None:
            raise Exception('The name of the table is missing. This can also happen if extra rows have been added without a "#ignore" entry in the first column')
        elif not sc.isstring(name):
            raise Exception("In cell %s of the spreadsheet, the name of the quantity assigned to this table needs to be a string" % rows[0][0].coordinate)
        name = name.strip()  # The name needs to be written back in a case sensitive form

        tdve = cls(name)

        known_headings = {"units", "uncertainty", "constant", "assumption"}
        headings, times, tdve.tvec = _parse_ts_header(rows[0], known_headings, skip_first=True)

        # Validate and process headings
        if not times and "constant" not in headings:
            raise Exception("Could not find an assumption or time-specific value - all tables must contain at least one of these values")

        tdve.write_units = True if "units" in headings else None
        tdve.write_uncertainty = True if "uncertainty" in headings else None
        tdve.write_assumption = True if "constant" in headings else None
        if "assumption" in headings:
            tdve.write_assumption = True
            tdve.assumption_heading = "Assumption"

        for heading in headings:
            if heading not in known_headings:
                # If it's not a known heading and it's a string, then it must be an attribute
                # Note that the way `headings` is populated by skipping i=0 ensures that the table name
                # is not interpreted as a heading
                tdve.ts_attributes[heading] = {}

        ts_entries = sc.odict()
        for row in rows[1:]:
            if row[0].value is None:
                continue
            if not row[0].data_type in {"s", "str"}:
                raise Exception("In cell %s of the spreadsheet, the name of the entry was expected to be a string, but it was not. The left-most column is expected to be a name. If you are certain the value is correct, add a single quote character at the start of the cell to ensure it remains as text" % row[0].coordinate)
            series_name = row[0].value.strip()

            if "units" in headings:
                units = cell_get_string(row[headings["units"]], allow_empty=True)
                if units is not None and units.lower().strip() in FS.STANDARD_UNITS:
                    units = units.lower().strip()  # Only lower and strip units if they are standard units
            else:
                units = None

            ts = TimeSeries(units=units)

            if "uncertainty" in headings:
                ts.sigma = cell_get_number(row[headings["uncertainty"]])
            else:
                ts.sigma = None

            if "constant" in headings:
                ts.assumption = cell_get_number(row[headings["constant"]])
            elif "assumption" in headings:
                ts.assumption = cell_get_number(row[headings["assumption"]])
            else:
                ts.assumption = None

            for attribute in tdve.ts_attributes:
                if attribute in headings:
                    # If it's a default attribute e.g. provenance, and it is missing from the databook, then don't populate it
                    tdve.ts_attributes[attribute][series_name] = row[headings[attribute]].value

            for t, idx in times.items():
                ts.insert(t, cell_get_number(row[idx]))  # If cell_get_number returns None, this gets handled accordingly by ts.insert()

            ts_entries[series_name] = ts

        tdve.ts = ts_entries
        return tdve

    def write(self, worksheet, start_row, formats, references: dict = None, widths: dict = None) -> int:
        """
        Write to cells in a worksheet

        Note that the year columns are drawn from the ``tvec`` attribute. To suppress the year columns (e.g. for the user to enter only an assumption)
        then set ``tvec`` to an empty array/list.

        :param worksheet: An xlsxwriter worksheet instance
        :param start_row: The first row in which to write values
        :param formats: Format dict for the opened workbook - typically the return value of :func:`standard_formats` when the workbook was opened
        :param references: References dict containing cell references for strings in the current workbook
        :param widths: ``dict`` storing column widths
        :return: The row index for the next available row for writing in the spreadsheet
        """
        assert self.assumption_heading in {"Constant", "Assumption"}, "Unsupported assumption heading"

        # Explicit write_* flags take precedence; otherwise write a column if any TimeSeries has the corresponding value
        write_units = self.write_units if self.write_units is not None else any((ts.units is not None for ts in self.ts.values()))
        write_uncertainty = self.write_uncertainty if self.write_uncertainty is not None else any((ts.sigma is not None for ts in self.ts.values()))
        write_assumption = self.write_assumption if self.write_assumption is not None else any((ts.assumption is not None for ts in self.ts.values()))

        if not references:
            references = dict()

        current_row = start_row

        # First, assemble and write the headings
        headings = []
        headings.append(self.name)
        offset = 1  # This is the column where the time values start (after the 'or')

        # Next allocate attributes
        attribute_index = {}
        for attribute in self.ts_attributes:
            attribute_index[attribute] = offset
            headings.append(attribute)
            offset += 1

        if write_units:
            headings.append("Units")
            units_index = offset  # Column to write the units in
            offset += 1

        if write_uncertainty:
            headings.append("Uncertainty")
            uncertainty_index = offset  # Column to write the uncertainty in
            offset += 1

        if write_assumption:
            headings.append(self.assumption_heading)
            headings.append("")
            constant_index = offset
            offset += 2

        headings += [float(x) for x in self.tvec]
        for i, entry in enumerate(headings):
            worksheet.write(current_row, i, entry, formats["center_bold"])
            update_widths(widths, i, entry)

        if not pd.isna(self.comment):
            worksheet.write_comment(xlrc(current_row, 0), self.comment)

        # Now, write the TimeSeries objects - self.ts is an odict and whatever pops are present will be written in whatever order they are in
        for row_name, row_ts in self.ts.items():
            current_row += 1

            # Write the name
            if row_name in references:
                worksheet.write_formula(current_row, 0, references[row_name], formats["center_bold"], value=row_name)
                update_widths(widths, 0, row_name)
            else:
                worksheet.write_string(current_row, 0, row_name, formats["center_bold"])
                update_widths(widths, 0, row_name)

            # Write the attributes
            for attribute in self.ts_attributes:
                if isinstance(self.ts_attributes[attribute], dict):
                    if row_name in self.ts_attributes[attribute]:
                        val = self.ts_attributes[attribute][row_name]
                    else:
                        val = None
                else:
                    val = self.ts_attributes[attribute]
                if val is not None:
                    worksheet.write(current_row, attribute_index[attribute], val)
                    update_widths(widths, attribute_index[attribute], val)

            # Write the units
            if write_units:
                if row_ts.units:
                    if row_ts.units.lower().strip() in FS.STANDARD_UNITS:  # Preserve case if nonstandard unit
                        unit = row_ts.units.title().strip()
                    else:
                        unit = row_ts.units.strip()
                    worksheet.write(current_row, units_index, unit)
                    update_widths(widths, units_index, unit)
                else:
                    worksheet.write(current_row, units_index, FS.DEFAULT_SYMBOL_INAPPLICABLE)

                if self.allowed_units and isinstance(self.allowed_units, dict) and row_name in self.allowed_units:  # Add dropdown selection if there is more than one valid choice for the units
                    allowed = self.allowed_units[row_name]
                elif self.allowed_units and not isinstance(self.allowed_units, dict):
                    allowed = self.allowed_units
                else:
                    allowed = None
                if allowed:
                    worksheet.data_validation(xlrc(current_row, units_index), {"validate": "list", "source": allowed})

            if write_uncertainty:
                # NB. For now, uncertainty is always optional regardless of whether sigma is present
                # (previously an if/else on `row_ts.sigma is None` with two identical branches - collapsed to one call)
                worksheet.write(current_row, uncertainty_index, row_ts.sigma, formats["not_required"])

            # Cells are only unlocked when the row has no data yet
            if row_ts.has_data:
                cell_format = formats["not_required"]
            else:
                cell_format = formats["unlocked"]

            if write_assumption:
                worksheet.write(current_row, constant_index, row_ts.assumption, cell_format)
                if len(self.tvec):
                    worksheet.write(current_row, constant_index + 1, "OR", formats["center"])
                    update_widths(widths, constant_index + 1, "OR")

            # Write the time values if they are present
            if len(self.tvec):
                content = [None] * len(self.tvec)  # Initialize an empty entry for every time in the TDVE's tvec

                for t, v in zip(row_ts.t, row_ts.vals):
                    # If the TimeSeries contains data for that time point, then insert it now
                    idx = np.where(self.tvec == t)[0]
                    if len(idx):
                        content[idx[0]] = v

                for idx, v in enumerate(content):
                    if v is None:
                        worksheet.write_blank(current_row, offset + idx, v, cell_format)
                    else:
                        worksheet.write(current_row, offset + idx, v, cell_format)
                    widths[offset + idx] = max(widths[offset + idx], 7) if offset + idx in widths else 7

                if write_assumption:
                    # Conditional formatting for the assumption
                    # Do this here, because after the loop above, we have easy and clear access to the range of cells to include in the formula
                    fcn_empty_times = 'COUNTIF(%s:%s,"<>" & "")>0' % (xlrc(current_row, offset), xlrc(current_row, offset + len(content) - 1))
                    # Hatched out if the cell will be ignored
                    worksheet.conditional_format(xlrc(current_row, constant_index), {"type": "formula", "criteria": "=" + fcn_empty_times, "format": formats["ignored"]})
                    worksheet.conditional_format(xlrc(current_row, constant_index), {"type": "formula", "criteria": "=AND(%s,NOT(ISBLANK(%s)))" % (fcn_empty_times, xlrc(current_row, constant_index)), "format": formats["ignored_warning"]})

        return current_row + 2  # Add two so there is a blank line after this table
def _parse_ts_header(row: list, known_headings, skip_first=False) -> Tuple[dict, dict, np.ndarray]:
    """
    Internal function to read TDVE/TDC timeseries tables

    :param row: A list of openpyxl Cell instances corresponding to the heading row (the one containing the
                headers for 'constant', 'uncertainty', 'provenance' etc., and the optional year values)
    :param known_headings: A collection of known headings that should be lowered for further processing. These
                           would correspond to the standard/default headings
    :param skip_first: If True, the first heading cell will be skipped. This is required for TDVE tables
                       where the top left cell is the name of the variable and not a column heading
    :return: A tuple containing

             - A dictionary of heading values (from string cell types) with {heading: column_index}.
               Any known headings will have had `str.lower()` called on them.
             - A dictionary of time values and the columns they appear in. Datetimes will be converted
               into floats at this step (so for example a spreadsheet containing 1/1/2022 will have
               that converted to 2022.0 at this step)
             - A numpy array with sorted time values, that can be used as a time vector
    :raises: :class:`Exception` for duplicate headings/years or unsupported cell types
    """
    headings = {}
    times = {}

    for i, cell in enumerate(row):
        v = cell.value

        if v is None or (i == 0 and skip_first):
            continue
        elif cell.data_type in {"s", "str"}:
            v = v.strip()
            if v.startswith("#ignore"):
                break  # An '#ignore' marker terminates the header row
            elif v.lower() in known_headings:
                if v.lower() in headings:
                    raise Exception(f"Duplicate heading in cell {cell.coordinate} - another cell in the same row already has the same header '{v}' (possibly in cell {get_column_letter(1+headings[v.lower()])}{cell.row})")
                headings[v.lower()] = i
            else:
                if v in headings:
                    # NB. unknown headings are stored case-sensitively, so the duplicate must be looked up by `v`
                    # (previously this indexed `headings[v.lower()]`, which raised a KeyError for mixed-case duplicates)
                    raise Exception(f"Duplicate heading in cell {cell.coordinate} - another cell in the same row already has the same header '{v}' (possibly in cell {get_column_letter(1+headings[v])}{cell.row})")
                headings[v] = i
        elif cell.is_date:
            year = datetime_to_year(v)
            if year in times:
                raise Exception(f"Duplicate year in cell {cell.coordinate} - another cell in the same row already contains the same year ({year}) (possibly in cell {get_column_letter(1+times[year])}{cell.row})")
            times[year] = i  # Reuse the already-converted year rather than converting twice
        elif cell.data_type == "n":
            if v in times:
                raise Exception(f"Duplicate year in cell {cell.coordinate} - another cell in the same row already contains the same year ({v}) (possibly in cell {get_column_letter(1+times[v])}{cell.row})")
            times[v] = i
        else:
            raise Exception("Unknown data type in cell %s of the spreadsheet - quantity must be a string or a number" % cell.coordinate)

    return headings, times, np.array(sorted(times), dtype=float)
[docs]
def cell_get_string(cell, allow_empty=False) -> str:
    """
    Return string value from cell

    Checks that a cell contains a string and, if so, returns the stripped value;
    otherwise an informative error is raised.

    Note that the string type is determined from the cell's value rather than
    the openpyxl cell data type.

    :param cell: An openpyxl cell
    :param allow_empty: If True, an empty cell returns ``None`` instead of raising
    :return: A string with the contents of the cell (or ``None`` for an empty cell with ``allow_empty=True``)
    :raises: :class:`Exception` if the cell does not contain a string
    """
    value = cell.value
    if value is None and allow_empty:
        return None
    if sc.isstring(value):
        return value.strip()
    raise Exception("Cell %s needs to contain a string (i.e. not a number, date, or other cell type)" % cell.coordinate)
[docs]
def cell_get_number(cell, dtype=float):
    """
    Return numeric value from cell

    This function is to guard against accidentally having the Excel cell contain a string
    instead of a number. If a string has been entered, an error will be raised - unless the
    string is empty, consists only of dashes, or matches the 'not applicable' symbol, in
    which case the value is treated as though the cell had been left empty.

    The output is cast to ``dtype`` which means that code that requires numeric input from Excel
    can use this input to guarantee that the resulting number is of the correct type, or ``None``.

    :param cell: An openpyxl cell
    :param dtype: If the cell is numeric, cast to this type (default is `float` but could be `int` for example)
    :return: A scalar instance of ``dtype`` (e.g. ``float``) or ``None`` if cell is empty or being treated as empty
    :raises: :class:`Exception` if the cell contains a string
    """
    if cell.value is None:
        return None
    elif cell.data_type == "n":  # Numeric type
        return dtype(cell.value)
    elif cell.data_type in {"s", "str"}:  # Only do relatively expensive string processing if it's actually a string type (match the {"s","str"} check used by the other readers in this module)
        s = cell.value.lower().strip()
        # Compare against the lowered sentinel - `s` has already been lowercased, so comparing
        # against the raw constant would never match a mixed-case symbol such as "N.A."
        if s == FS.DEFAULT_SYMBOL_INAPPLICABLE.lower():
            return None
        elif not s.replace("-", ""):  # Empty string, or a string of only dashes, is treated as empty
            return None
    raise Exception("Cell %s needs to contain a number" % cell.coordinate)
[docs]
def validate_category(workbook, expected_category) -> None:
    """
    Check Atomica workbook type

    This function makes sure that a workbook has a particular category property
    stored within it, and displays an appropriate error message if not. If the
    category isn't present or doesn't start with 'atomica', just ignore it for
    robustness (instead, a parsing error will likely be raised)

    :param workbook: An openpyxl workbook
    :param expected_category: The expected string category (e.g. ``'atomica:databook'``)
    :raises: :class:`Exception` if the workbook category is not valid
    """
    category = workbook.properties.category
    # Only validate categories that were written by Atomica - anything else is ignored
    if category and sc.isstring(category) and category.startswith("atomica:"):
        if category.strip() != expected_category.strip():
            expected_type = expected_category.split(":")[1].title()
            actual_type = category.split(":")[1].title()
            message = "Error loading %s - the provided file was a %s file" % (expected_type, actual_type)
            raise Exception(message)