Source code for sierra.core.utils

# Copyright 2018 John Harwell, All rights reserved.
#
#  SPDX-License-Identifier: MIT

"""Miscellaneous bits used in mutiple places but that don't fit anywhere else."""

# Core packages
import typing as tp
import time
import logging
import pickle
import functools
import pathlib

# 3rd party packages
import numpy as np
import polars as pl
from retry import retry

# Project packages
from sierra.core.vector import Vector3D
from sierra.core.experiment import definition
from sierra.core import types, config
from sierra.core import plugin as pm


class ArenaExtent:
    """Region of the arena described by an origin and a set of dimensions.

    Instances expose the following attributes, all computed once in the
    constructor:

    - ``dims``: the dimensions passed in.
    - ``ll``: the lower-left corner (the origin).
    - ``ur``: the upper-right corner (``origin + dims``).
    - ``center``: ``origin + dims / 2.0``.
    """

    @staticmethod
    def from_corners(ll: Vector3D, ur: Vector3D) -> "ArenaExtent":
        """Build an extent from its lower-left and upper-right corners.

        This is the alternative to the constructor, which takes dimensions and
        an origin instead.
        """
        dims = ur - ll
        return ArenaExtent(dims, ll)

    def __init__(self, dims: Vector3D, origin: Vector3D = Vector3D.ZERO) -> None:
        self._origin = origin
        self.dims = dims

        # Corners and center are derived eagerly; they are not recomputed if
        # ``dims`` is reassigned afterwards.
        self.ll = origin
        self.ur = origin + dims
        half_dims = dims / 2.0
        self.center = origin + half_dims

    def contains(self, pt: Vector3D) -> bool:
        """Check ``pt`` against both corners using the vector comparisons."""
        above_lower = pt >= self.ll
        return above_lower and pt <= self.ur

    def area(self) -> float:
        """Return the product of the X and Y dimensions."""
        extent = self.dims
        return extent.x * extent.y

    def xsize(self) -> int:
        """Return the X component of the dimensions."""
        return self.dims.x

    def ysize(self) -> int:
        """Return the Y component of the dimensions."""
        return self.dims.y

    def zsize(self) -> int:
        """Return the Z component of the dimensions."""
        return self.dims.z

    def origin(self) -> Vector3D:
        """Return the origin the extent was constructed with."""
        return self._origin

    def __str__(self) -> str:
        return f"{self.dims!s}@{self._origin!s}"
class Sigmoid:
    r"""
    Sigmoid activation function.

    .. math::
       f(x) = \frac{1}{1+e^{-x}}

    The input is captured at construction time; calling the instance evaluates
    the function at that input.
    """

    def __init__(self, x: float) -> None:
        self.x = x

    def __call__(self) -> float:
        """Evaluate the sigmoid at ``self.x``."""
        if self.x < 0:
            # For negative inputs, use the algebraically equivalent form
            # e^x / (1 + e^x). If you don't special-case the sigmoid like
            # this, np.exp(-x) overflows at runtime for large negative x.
            #
            # This form is preferred over 1 - 1 / (1 + e^x): that subtraction
            # cancels catastrophically and yields exactly 0.0 once e^x drops
            # below machine epsilon, throwing away the (tiny but non-zero)
            # result.
            exp_x = np.exp(self.x)
            return exp_x / (1 + exp_x)

        return 1.0 / (1 + np.exp(-self.x))
class ReLu:
    r"""
    Rectified Linear Unit (ReLU) activation function.

    .. math::

       \begin{aligned}
            f(x) = max(0,x) &= x \textit{if} x > 0
                            &= 0 \textit{else}
       \end{aligned}

    The input is stored by the constructor and the function is evaluated when
    the instance is called.
    """

    def __init__(self, x: float):
        self.x = x

    def __call__(self):
        # Same selection max(0, x) makes: keep x only when it is strictly
        # greater than 0, otherwise fall back to the integer 0.
        value = self.x
        if value > 0:
            return value
        return 0
def scale_minmax(minval: float, maxval: float, val: float) -> float:
    r"""
    Scale a value from the range [minval, maxval] to the range [-1, 1].

    .. math::
       -1 + (val - minval) \cdot \frac{1 - (-1)}{maxval - minval}

    ``minval`` maps to -1 and ``maxval`` maps to 1. Values outside
    [minval, maxval] are not clamped. There is no guard against
    ``minval == maxval``; that case divides by zero.
    """
    # The docstring must stay a raw string: in a normal string literal the
    # "\f" of "\frac" is the form-feed escape, which corrupts the rendered
    # formula.
    return -1.0 + (val - minval) * (1 - (-1)) / (maxval - minval)
def dir_create_checked(path: tp.Union[pathlib.Path, str], exist_ok: bool) -> None:
    """Create a directory (and any missing parents) idempotently.

    If the directory already exists and ``exist_ok`` is False, a fatal message
    is logged and the :class:`FileExistsError` is re-raised to the caller.
    """
    target = path if isinstance(path, pathlib.Path) else pathlib.Path(path)

    try:
        target.mkdir(parents=True, exist_ok=exist_ok)
    except FileExistsError:
        logging.fatal("%s already exists! Not overwriting", str(target))
        raise
def path_exists(path: tp.Union[pathlib.Path, str]) -> bool:
    """
    Check if a path exists, polling several times and taking a majority vote.

    This is necessary for working on HPC systems where if a given
    directory/filesystem is under heavy pressure the first check or two might
    time out as the FS goes and executes the query over the network.

    The path is queried 10 times with a 1 ms pause after each query, and the
    most frequently observed answer is returned.
    """
    target = path if isinstance(path, pathlib.Path) else pathlib.Path(path)

    observations = []
    for _ in range(10):
        observations.append(bool(target.exists()))
        time.sleep(0.001)

    # Pick whichever of the observed answers occurred most often.
    return max(set(observations), key=observations.count)
def get_primary_axis(criteria, primary_axis_bc: list, cmdopts: types.Cmdopts) -> int:
    """
    Determine which axis of a bivariate batch criteria is the primary axis.

    The cmdline setting ``cmdopts["plot_primary_axis"]`` wins when it is 0 or
    1. Otherwise the answer depends on the query context: if the second
    criteria (``criteria.criterias[1]``) is an instance of any of the types in
    ``primary_axis_bc`` the primary axis is 0, else it is 1.
    """
    forced_axis = cmdopts["plot_primary_axis"]
    if forced_axis == 0:
        return 0
    if forced_axis == 1:
        return 1

    for bc_type in primary_axis_bc:
        if isinstance(criteria.criterias[1], bc_type):
            return 0

    return 1
def exp_range_calc(
    exp_range: tp.Optional[str], root_dir: pathlib.Path, dirnames: list[str]
) -> types.PathList:
    """
    Get the range of experiments to run/do stuff with. SUPER USEFUL.

    Every name in ``dirnames`` is joined onto ``root_dir``. If ``exp_range``
    is None, all of the resulting paths are returned. Otherwise ``exp_range``
    must have the form ``"min:max"`` (two integers), and the paths at indices
    ``min`` through ``max`` *inclusive* are returned.

    Raises:
        AssertionError: If ``min`` is greater than ``max``.
    """
    exp_all = [root_dir / d for d in dirnames]

    if exp_range is None:
        return exp_all

    min_exp, max_exp = (int(x) for x in exp_range.split(":"))
    assert (
        min_exp <= max_exp
    ), f"Min batch exp > max batch exp({min_exp} vs. {max_exp})"

    # The range is inclusive on both ends, hence the +1.
    return exp_all[min_exp : max_exp + 1]
def exp_include_filter(
    inc_spec: tp.Optional[str], target: list, n_exps: int
) -> list:
    """Calculate which experiments to include in a calculation for something.

    Take an input list of experiment numbers to include, and return the
    sublist specified by ``inc_spec`` (of the form ``x:y``; ``y`` may be
    omitted to mean "through the end of ``target``"). If ``inc_spec`` is None,
    a copy of the whole of ``target`` is returned.

    ``inc_spec`` is an `absolute` specification; if a given performance measure
    excludes exp0 (detected by ``target`` being shorter than ``n_exps``) then
    that case is handled internally so that array/list shapes work out when
    generating graphs if this function is used consistently everywhere.
    """
    if inc_spec is None:
        # Nothing to filter. In particular there is no start index to shift
        # when target excludes exp0.
        return target[slice(None, None, None)]

    r = inc_spec.split(":")
    start = int(r[0])
    end = len(target) if r[1] == "" else int(r[1])

    if len(target) < n_exps and start != 0:
        # Handle perf measures which exclude exp0 by default: every index in
        # target is one less than the experiment number it holds. A start of 0
        # refers to exp0 itself, which has no slot in target, so it must stay
        # at index 0 rather than wrap around to -1 (the end of the list).
        start -= 1

    return target[slice(start, end, None)]
def bivar_exp_labels_calc(
    exp_dirs: list[pathlib.Path],
) -> tuple[list[str], list[str]]:
    """
    Calculate the labels for bivariate experiment graphs.

    The final component of each experiment directory is expected to have the
    form ``<x>+<y>``. The unique ``<x>`` parts and the unique ``<y>`` parts
    are each returned as a sorted list, in that order.

    Raises:
        IndexError: If a directory name does not contain a ``+``.
    """
    # Because sets are used, if a sub-range of experiments are selected for
    # collation, the selected range has to be an even multiple of the # of
    # experiments in the second batch criteria, or inter-experiment graph
    # generation won't work (the final CSV is always an MxN grid).
    xlabels_set = set()
    ylabels_set = set()

    for e in exp_dirs:
        # .name is the last path component, hence the path (not str) elements.
        pair = e.name.split("+")
        xlabels_set.add(pair[0])
        ylabels_set.add(pair[1])

    return (sorted(xlabels_set), sorted(ylabels_set))
def apply_to_expdef(var, exp_def: definition.BaseExpDef) -> tuple[
    tp.Optional[definition.ElementRmList],
    tp.Optional[definition.ElementAddList],
    tp.Optional[definition.AttrChangeSet],
]:
    """
    Apply generated expdef modifications to an experiment definition.

    In this order:

    #. Remove existing expdef elements
    #. Add new expdef elements
    #. Change existing expdef element attributes

    All three sets of modifications are generated from ``var`` up front, and
    only the first entry of each generated list is applied.

    Returns:
        The applied (removals, additions, changes). An entry is None when
        ``var`` generated nothing for that category.

    Raises:
        AssertionError: If an element to add has no parent path.
    """
    rmsl = var.gen_tag_rmlist()  # type: tp.List[definition.ElementRmList]
    addsl = var.gen_element_addlist()  # type: tp.List[definition.ElementAddList]
    chgsl = var.gen_attr_changelist()  # type: tp.List[definition.AttrChangeSet]

    if rmsl:
        rms = rmsl[0]
        for r in rms:
            exp_def.element_remove(r.path, r.tag)
    else:
        rms = None

    if addsl:
        adds = addsl[0]
        for a in adds:
            # Needs the f prefix so the offending tag actually shows up in the
            # failure message.
            assert a.path is not None, f"Can't add tag {a.tag} with no parent"
            exp_def.element_add(a.path, a.tag, a.attr, a.allow_dup)
    else:
        adds = None

    if chgsl:
        chgs = chgsl[0]
        for c in chgs:
            # Null modifications are placeholders; there is nothing to apply.
            if not isinstance(c, definition.NullMod):
                exp_def.attr_change(c.path, c.attr, c.value)
    else:
        chgs = None

    return rms, adds, chgs
def pickle_modifications(
    adds: tp.Optional[definition.ElementAddList],
    chgs: tp.Optional[definition.AttrChangeSet],
    path: pathlib.Path,
) -> None:
    """
    After applying expdef modifications, pickle changes for later retrieval.

    Additions are pickled first, then attribute changes; both go to ``path``.
    Either may be None, in which case it is skipped.
    """
    for mods in (adds, chgs):
        if mods is None:
            continue
        mods.pickle(path)
def exp_template_path(
    cmdopts: types.Cmdopts, batch_input_root: pathlib.Path, dirname: str
) -> pathlib.Path:
    """Calculate the path to the template input file in the batch experiment root.

    The file at this path will be used as the de-facto template for generating
    per-run input files. The result is
    ``batch_input_root / dirname / <stem of cmdopts["expdef_template"]>``,
    i.e., the template's file name with its final extension dropped.
    """
    template_stem = pathlib.Path(cmdopts["expdef_template"]).stem
    exp_input_root = batch_input_root / dirname
    return exp_input_root / template_stem
def get_n_agents(
    main_config: types.YAMLDict,
    cmdopts: types.Cmdopts,
    exp_input_root: pathlib.Path,
    exp_def: definition.BaseExpDef,
) -> tp.Optional[int]:
    """
    Get the # agents used for a specific :term:`Experiment`.

    The plugin module for ``cmdopts["engine"]`` is asked for the population
    size; the plugin module for ``cmdopts["expdef"]`` is used to unpickle the
    saved experiment definition if a fallback is needed.

    Returns:
        The number of agents, or None if the engine plugin module does not
        provide ``population_size_from_def()``.

    Raises:
        RuntimeError: If no positive number of agents could be determined.
    """
    module1 = pm.pipeline.get_plugin_module(cmdopts["engine"])

    # Get the number of agents to send to shell cmds generator. We try:
    #
    # 1. Getting it from the current experiment definition, which contains all
    #    changes to the template input file EXCEPT those from batch criteria,
    #    which have already been written and pickled at this point.
    #
    # 2. Getting it from the pickled experiment definition (i.e., from the
    #    batch criteria which was used for this experiment).
    if hasattr(module1, "population_size_from_def"):
        n_agents = module1.population_size_from_def(exp_def, main_config, cmdopts)
        module2 = pm.pipeline.get_plugin_module(cmdopts["expdef"])

        # A non-positive size means the definition did not yield a usable
        # answer, so fall back to the pickled definition (if the engine
        # supports reading the size from it).
        if n_agents <= 0:
            pkl_def = module2.unpickle(exp_input_root / config.PICKLE_LEAF)
            if hasattr(module1, "population_size_from_pickle"):
                n_agents = module1.population_size_from_pickle(
                    pkl_def, main_config, cmdopts
                )

        # Still nothing usable after the fallback: give up loudly.
        if n_agents <= 0:
            raise RuntimeError("n_agents must be > 0")

        return n_agents

    return None
def df_fill(df: pl.DataFrame, policy: str) -> pl.DataFrame:
    """
    Fill missing cells in a dataframe according to the specified fill policy.

    Supported policies:

    - ``none``: return the dataframe as-is.
    - ``pad``: apply ``forward_fill()`` to every column.
    - ``zero``: replace nulls with 0 via ``fill_null()``.

    Raises:
        RuntimeError: If ``policy`` is not one of the above.
    """
    if policy == "zero":
        return df.fill_null(0)

    if policy == "pad":
        padded_cols = [pl.col(name).forward_fill() for name in df.columns]
        return df.select(padded_cols)

    if policy != "none":
        raise RuntimeError(f"Bad fill policy {policy}")

    return df
@retry(OSError, tries=10, delay=0.100, backoff=1.1)
def pickle_dump(obj: object, f) -> None:
    """Pickle ``obj`` into the open file ``f``.

    Wrapped in ``retry`` for :class:`OSError` (tries=10, delay=0.100,
    backoff=1.1).
    """
    pickle.dump(obj, f)


def gen_scenario_spec(cmdopts: types.Cmdopts, **kwargs) -> dict[str, tp.Any]:
    """Convert the selected scenario to a dict via the project's generator.

    The scenario comes from ``kwargs["scenario"]`` if present, and from
    ``cmdopts["scenario"]`` otherwise. The conversion itself is delegated to
    ``to_dict()`` in the ``generators.scenario`` module loaded for
    ``cmdopts["project"]``.
    """
    # scenario is passed in kwargs during stage 5 (can't be passed via
    # --scenario in general )
    cmdline_scenario = cmdopts["scenario"]
    scenario = kwargs.get("scenario", cmdline_scenario)

    generator = pm.module_load_tiered(
        project=cmdopts["project"], path="generators.scenario"
    )
    return generator.to_dict(scenario)


def sphinx_ref(ref: str) -> str:
    """Return ``ref``, shortened when ``__sphinx_build_man__`` is set.

    If the name ``__sphinx_build_man__`` is defined and truthy, only the part
    of ``ref`` after its last ``.`` is kept, minus its final character. In
    every other case ``ref`` is returned unchanged.
    """
    try:
        # This is kind of a hack...
        building_man = __sphinx_build_man__  # type: ignore[name-defined]
    except NameError:
        return ref

    if not building_man:
        return ref

    last_part = ref.split(".")[-1]
    return last_part[:-1]


utf8open = functools.partial(open, encoding="UTF-8")
"""
Explicitly specify that the type of file being opened is UTF-8, which it should
be for almost everything in SIERRA.
"""

__all__ = [
    "ArenaExtent",
    "ReLu",
    "Sigmoid",
    "apply_to_expdef",
    "df_fill",
    "dir_create_checked",
    "exp_include_filter",
    "exp_range_calc",
    "exp_template_path",
    "get_n_agents",
    "get_primary_axis",
    "path_exists",
    "pickle_modifications",
    "utf8open",
]