# Copyright 2018 John Harwell, All rights reserved.
#
# SPDX-License-Identifier: MIT
"""Miscellaneous bits used in mutiple places but that don't fit anywhere else."""
# Core packages
import typing as tp
import time
import logging
import pickle
import functools
import pathlib
# 3rd party packages
import numpy as np
import polars as pl
from retry import retry
# Project packages
from sierra.core.vector import Vector3D
from sierra.core.experiment import definition
from sierra.core import types, config
from sierra.core import plugin as pm
[docs]
class ArenaExtent:
    """Representation of a 2D or 3D section/chunk/volume of the arena.

    An extent is described by an origin and a set of dimensions. The following
    attributes are derived from those at construction time:

    - ``dims``: the dimensions passed in.
    - ``ll``: the lower-left corner (identical to the origin).
    - ``ur``: the upper-right corner (``origin + dims``).
    - ``center``: the midpoint (``origin + dims / 2.0``).
    """

    @staticmethod
    def from_corners(ll: Vector3D, ur: Vector3D) -> "ArenaExtent":
        """Build an extent from its lower-left and upper-right corners.

        This is the alternative to the constructor, which takes the dimensions
        and an origin instead.
        """
        dims = ur - ll
        return ArenaExtent(dims, ll)

    def __init__(self, dims: Vector3D, origin: Vector3D = Vector3D.ZERO) -> None:
        self.dims = dims
        self._origin = origin

        # Corners and midpoint are computed once, up front.
        self.ll = origin
        self.ur = origin + dims
        self.center = origin + dims / 2.0

    def contains(self, pt: Vector3D) -> bool:
        """Check if ``pt`` lies between the LL and UR corners (inclusive)."""
        at_or_above_ll = pt >= self.ll
        return at_or_above_ll and pt <= self.ur

    def area(self) -> float:
        """Return the X-Y area of the extent (Z is not considered)."""
        dims = self.dims
        return dims.x * dims.y

    def xsize(self) -> int:
        """Return the size of the extent in X."""
        return self.dims.x

    def ysize(self) -> int:
        """Return the size of the extent in Y."""
        return self.dims.y

    def zsize(self) -> int:
        """Return the size of the extent in Z."""
        return self.dims.z

    def origin(self) -> Vector3D:
        """Return the origin the extent was created with."""
        return self._origin

    def __str__(self) -> str:
        # Rendered as "<dims>@<origin>".
        return "@".join((str(self.dims), str(self._origin)))
[docs]
class Sigmoid:
    r"""
    Sigmoid activation function.

    .. math::
       f(x) = \frac{1}{1+e^{-x}}

    The input is captured at construction time; calling the object evaluates
    the function at that input.
    """

    def __init__(self, x: float) -> None:
        self.x = x

    def __call__(self) -> float:
        """Evaluate the sigmoid at the stored input.

        The two branches are algebraically identical. They are split by sign so
        that the exponential is only ever taken of a non-positive number, which
        cannot overflow.
        """
        if self.x < 0:
            # For negative inputs use e^x / (1 + e^x). This avoids the overflow
            # that e^{-x} would hit for large negative x, and (unlike
            # 1 - 1 / (1 + e^x)) does not subtract two nearly-equal numbers, so
            # small results are not rounded to exactly 0.
            exp_x = np.exp(self.x)
            return exp_x / (1 + exp_x)

        return 1.0 / (1 + np.exp(-self.x))
[docs]
class ReLu:
    r"""
    Rectified Linear Unit (ReLU) activation function.

    .. math::
       f(x) = \max(0, x) =
       \begin{cases}
       x & \text{if } x > 0 \\
       0 & \text{otherwise}
       \end{cases}

    The input is captured at construction time; calling the object evaluates
    the function at that input.
    """

    def __init__(self, x: float):
        self.x = x

    def __call__(self):
        """Return the stored input if it is positive, and 0 otherwise."""
        value = self.x
        return value if value > 0 else 0
def scale_minmax(minval: float, maxval: float, val: float) -> float:
    r"""
    Scale a value from the range [minval, maxval] -> [-1,1].

    .. math::
       -1 + (val - minval) \cdot \frac{1 - (-1)}{maxval - minval}

    ``minval`` maps to -1 and ``maxval`` maps to 1. Values outside
    [minval, maxval] are not clamped, so they map outside [-1, 1].

    Raises:
        ZeroDivisionError: If ``minval`` and ``maxval`` are equal plain
            Python numbers (the input range has zero width).
    """
    # Width of the output range [-1, 1], written out to mirror the formula.
    out_width = 1 - (-1)
    return -1.0 + (val - minval) * out_width / (maxval - minval)
[docs]
def dir_create_checked(path: tp.Union[pathlib.Path, str], exist_ok: bool) -> None:
    """Create a directory (and any missing parents) idempotently.

    Args:
        path: The directory to create; strings are converted to
            :class:`pathlib.Path`.
        exist_ok: Whether it is acceptable for the directory to already exist.

    Raises:
        FileExistsError: If the path exists and it shouldn't. The failure is
            logged before the exception is re-raised.
    """
    target = path if isinstance(path, pathlib.Path) else pathlib.Path(path)

    try:
        target.mkdir(parents=True, exist_ok=exist_ok)
    except FileExistsError:
        logging.fatal("%s already exists! Not overwriting", str(target))
        raise
[docs]
def path_exists(path: tp.Union[pathlib.Path, str]) -> bool:
    """
    Check if a path exists, trying multiple times.

    The path is queried 10 times with a 1ms pause after each query, and the
    answer seen most often is returned.

    This is necessary for working on HPC systems where if a given
    directory/filesystem is under heavy pressure the first check or two might
    time out as the FS goes and executes the query over the network.
    """
    target = path if isinstance(path, pathlib.Path) else pathlib.Path(path)

    observations = []
    for _ in range(10):
        observations.append(target.exists())
        time.sleep(0.001)

    # Majority vote over everything we observed.
    return max(set(observations), key=observations.count)
[docs]
def get_primary_axis(criteria, primary_axis_bc: list, cmdopts: types.Cmdopts) -> int:
    """
    Determine which axis in a bivariate batch criteria is the primary axis.

    This is obtained on a per-query basis depending on the query context, or
    can be overridden on the cmdline.

    Args:
        criteria: The bivariate batch criteria; its ``criterias[1]`` member is
            inspected.
        primary_axis_bc: Classes to test ``criteria.criterias[1]`` against.
        cmdopts: Parsed cmdline options; ``plot_primary_axis`` set to 0 or 1
            takes precedence over the criteria-based decision.

    Returns:
        0 if overridden to 0, or (absent an override) if the second criteria is
        an instance of any class in ``primary_axis_bc``; 1 otherwise.
    """
    # An explicit cmdline choice always wins.
    for axis in (0, 1):
        if cmdopts["plot_primary_axis"] == axis:
            return axis

    second_is_listed = any(
        isinstance(criteria.criterias[1], bc_type) for bc_type in primary_axis_bc
    )
    return 0 if second_is_listed else 1
[docs]
def exp_range_calc(
    exp_range: str, root_dir: pathlib.Path, dirnames: list[str]
) -> types.PathList:
    """
    Get the range of experiments to run/do stuff with. SUPER USEFUL.

    Args:
        exp_range: Either ``None`` (select everything) or a ``"min:max"``
            string of integer indices into ``dirnames``; ``max`` is inclusive.
        root_dir: Directory each name in ``dirnames`` is joined onto.
        dirnames: Names of the experiment directories.

    Returns:
        The selected experiment paths, in ``dirnames`` order.
    """
    candidates = [root_dir / name for name in dirnames]

    # No range given -> everything is selected.
    if exp_range is None:
        return candidates

    min_exp, max_exp = (int(bound) for bound in exp_range.split(":"))
    assert (
        min_exp <= max_exp
    ), f"Min batch exp >= max batch exp({min_exp} vs. {max_exp})"

    # The upper bound is inclusive.
    return candidates[min_exp : max_exp + 1]
[docs]
def exp_include_filter(inc_spec: tp.Optional[str], target: list, n_exps: int):
    """Calculate which experiments to include in a calculation for something.

    Take a input list of experiment numbers to include, and returns the sublist
    specified by the inc_spec (of the form [x:y]). inc_spec is an `absolute`
    specification; if a given performance measure excludes exp0 then that case
    is handled internally so that array/list shapes work out when generating
    graphs if this function is used consistently everywhere.

    Args:
        inc_spec: ``"start:end"`` with integer bounds, where an empty ``end``
            means "through the end of ``target``". ``None`` selects all of
            ``target``.
        target: The list to take the sublist from.
        n_exps: Total number of experiments. If ``target`` is shorter than
            this, it is taken to exclude exp0 and the start index is shifted
            down by one to compensate.

    Returns:
        The selected slice of ``target``.
    """
    if inc_spec is None:
        # Nothing to filter (and nothing to shift): hand back everything.
        return target[slice(None, None, None)]

    bounds = inc_spec.split(":")
    start = int(bounds[0])
    end = len(target) if bounds[1] == "" else int(bounds[1])

    # Handle perf measures which exclude exp0 by default. A start of 0 already
    # refers to the first available element, so it must not be shifted to -1,
    # which would wrap around to the END of the list.
    if len(target) < n_exps and start != 0:
        start -= 1

    return target[slice(start, end, None)]
def bivar_exp_labels_calc(
    exp_dirs: list[tp.Union[str, pathlib.Path]],
) -> tuple[list[str], list[str]]:
    """
    Calculate the labels for bivariate experiment graphs.

    Each experiment directory name is expected to have the form ``<x>+<y>``.
    The part before the first ``+`` becomes an X label and the part after it a
    Y label.

    Args:
        exp_dirs: Experiment directories, as path objects or as strings. Only
            the final path component (the directory name) is used.

    Returns:
        A ``(xlabels, ylabels)`` tuple of sorted, de-duplicated labels.

    Raises:
        IndexError: If a directory name does not contain ``+``.
    """
    # Because sets are used, if a sub-range of experiments are selected for
    # collation, the selected range has to be an even multiple of the # of
    # experiments in the second batch criteria, or inter-experiment graph
    # generation won't work (the final CSV is always an MxN grid).
    xlabels_set = set()
    ylabels_set = set()

    for exp in exp_dirs:
        # Plain strings have no .name, so extract the leaf via pathlib;
        # anything else is used as-is.
        leaf = pathlib.PurePath(exp).name if isinstance(exp, str) else exp.name
        pair = leaf.split("+")
        xlabels_set.add(pair[0])
        ylabels_set.add(pair[1])

    return (sorted(xlabels_set), sorted(ylabels_set))
[docs]
def apply_to_expdef(var, exp_def: definition.BaseExpDef) -> tuple[
    tp.Optional[definition.ElementRmList],
    tp.Optional[definition.ElementAddList],
    tp.Optional[definition.AttrChangeSet],
]:
    """
    Apply generated expdef modifications to an experiment definition.

    In this order:

    #. Remove existing expdef elements
    #. Add new expdef elements
    #. Change existing expdef element attributes

    Only the FIRST entry of each list generated by ``var`` is applied.

    Args:
        var: Object providing ``gen_tag_rmlist()``, ``gen_element_addlist()``
            and ``gen_attr_changelist()``.
        exp_def: The experiment definition to modify in place.

    Returns:
        A ``(removals, additions, changes)`` tuple containing what was applied;
        a member is ``None`` if ``var`` generated nothing of that kind.

    Raises:
        AssertionError: If an element to add has no parent path.
    """
    rmsl: list[definition.ElementRmList] = var.gen_tag_rmlist()
    addsl: list[definition.ElementAddList] = var.gen_element_addlist()
    chgsl: list[definition.AttrChangeSet] = var.gen_attr_changelist()

    if rmsl:
        rms = rmsl[0]
        for r in rms:
            exp_def.element_remove(r.path, r.tag)
    else:
        rms = None

    if addsl:
        adds = addsl[0]
        for a in adds:
            # f-string so the offending tag actually appears in the message.
            assert a.path is not None, f"Can't add tag {a.tag} with no parent"
            exp_def.element_add(a.path, a.tag, a.attr, a.allow_dup)
    else:
        adds = None

    if chgsl:
        chgs = chgsl[0]
        for c in chgs:
            # Null modifications are placeholders; there is nothing to apply.
            if not isinstance(c, definition.NullMod):
                exp_def.attr_change(c.path, c.attr, c.value)
    else:
        chgs = None

    return rms, adds, chgs
[docs]
def pickle_modifications(
    adds: tp.Optional[definition.ElementAddList],
    chgs: tp.Optional[definition.AttrChangeSet],
    path: pathlib.Path,
) -> None:
    """
    After applying expdef modifications, pickle changes for later retrieval.

    Args:
        adds: Element additions to pickle, or ``None`` to skip.
        chgs: Attribute changes to pickle, or ``None`` to skip.
        path: Passed to each object's ``pickle()`` method.
    """
    # Additions first, then changes; anything which is None is skipped.
    for mods in (adds, chgs):
        if mods is not None:
            mods.pickle(path)
[docs]
def exp_template_path(
    cmdopts: types.Cmdopts, batch_input_root: pathlib.Path, dirname: str
) -> pathlib.Path:
    """Calculate the path to the template input file in the batch experiment root.

    The file at this path will be used as the de-facto template for generating
    per-run input files.

    Returns:
        ``batch_input_root/dirname/<stem>``, where ``<stem>`` is the filename
        of ``cmdopts["expdef_template"]`` with its final extension removed.
    """
    template_stem = pathlib.Path(cmdopts["expdef_template"]).stem
    exp_root = batch_input_root / dirname
    return exp_root / template_stem
[docs]
def get_n_agents(
    main_config: types.YAMLDict,
    cmdopts: types.Cmdopts,
    exp_input_root: pathlib.Path,
    exp_def: definition.BaseExpDef,
) -> tp.Optional[int]:
    """
    Get the # agents used for a specific :term:`Experiment`.

    Returns:
        The number of agents, or ``None`` if the selected engine plugin does
        not provide ``population_size_from_def()``.

    Raises:
        RuntimeError: If no positive number of agents could be determined.
    """
    engine = pm.pipeline.get_plugin_module(cmdopts["engine"])

    # Engines which can't tell us the population size: nothing to report.
    if not hasattr(engine, "population_size_from_def"):
        return None

    # Get the number of agents to send to shell cmds generator. We try:
    #
    # 1. Getting it from the current experiment definition, which contains all
    #    changes to the template input file EXCEPT those from batch criteria,
    #    which have already been written and pickled at this point.
    #
    # 2. Getting it from the pickled experiment definition (i.e., from the
    #    batch criteria which was used for this experiment).
    n_agents = engine.population_size_from_def(exp_def, main_config, cmdopts)
    expdef_plugin = pm.pipeline.get_plugin_module(cmdopts["expdef"])

    if n_agents <= 0:
        pkl_def = expdef_plugin.unpickle(exp_input_root / config.PICKLE_LEAF)

        if hasattr(engine, "population_size_from_pickle"):
            n_agents = engine.population_size_from_pickle(
                pkl_def, main_config, cmdopts
            )

    if n_agents <= 0:
        raise RuntimeError("n_agents must be > 0")

    return n_agents
[docs]
def df_fill(df: pl.DataFrame, policy: str) -> pl.DataFrame:
    """
    Fill missing cells in a dataframe according to the specified fill policy.

    Args:
        df: The dataframe to fill.
        policy: One of:

            - ``"none"``: return ``df`` unchanged.
            - ``"pad"``: forward-fill every column.
            - ``"zero"``: fill nulls with 0.

    Raises:
        RuntimeError: If ``policy`` is anything else.
    """
    if policy == "zero":
        return df.fill_null(0)

    if policy == "pad":
        padded_cols = [pl.col(name).forward_fill() for name in df.columns]
        return df.select(padded_cols)

    if policy == "none":
        return df

    raise RuntimeError(f"Bad fill policy {policy}")
@retry(OSError, tries=10, delay=0.100, backoff=1.1)
def pickle_dump(obj: object, f) -> None:
    """Pickle ``obj`` into the open binary file ``f``.

    The call is retried on :class:`OSError`, as configured in the ``@retry``
    decorator.
    """
    pickle.dump(obj, f)
def gen_scenario_spec(cmdopts: types.Cmdopts, **kwargs) -> dict[str, tp.Any]:
    """Generate the scenario specification as a dictionary.

    The project's ``generators.scenario`` module is loaded and its
    ``to_dict()`` is called on the scenario.

    Args:
        cmdopts: Parsed cmdline options; ``project`` and ``scenario`` are read.
        **kwargs: If ``scenario`` is present, it is used instead of
            ``cmdopts["scenario"]``.
    """
    # scenario is passed in kwargs during stage 5 (can't be passed via
    # --scenario in general )
    scenario = kwargs.get("scenario", cmdopts["scenario"])

    scenario_generator = pm.module_load_tiered(
        project=cmdopts["project"], path="generators.scenario"
    )
    spec = scenario_generator.to_dict(scenario)
    return spec
def sphinx_ref(ref: str) -> str:
    """Adapt a reference string depending on whether man pages are being built.

    If the name ``__sphinx_build_man__`` is defined and truthy, only the text
    after the last ``.`` in ``ref`` is kept, minus its final character.
    Otherwise ``ref`` is returned unchanged.
    """
    try:
        # This is kind of a hack...
        building_man = bool(__sphinx_build_man__)  # type: ignore[name-defined]
    except NameError:
        building_man = False

    if not building_man:
        return ref

    leaf = ref.split(".")[-1]
    return leaf[:-1]
utf8open = functools.partial(open, encoding="UTF-8")
"""
Drop-in replacement for :func:`open` which explicitly specifies that the file
being opened is UTF-8 encoded, which it should be for almost everything in
SIERRA.
"""
# Names exported by a star-import of this module. Helpers defined in this file
# but not listed here (e.g. ``scale_minmax``) are still importable by name.
__all__ = [
    "ArenaExtent",
    "ReLu",
    "Sigmoid",
    "apply_to_expdef",
    "df_fill",
    "dir_create_checked",
    "exp_include_filter",
    "exp_range_calc",
    "exp_template_path",
    "get_n_agents",
    "get_primary_axis",
    "path_exists",
    "pickle_modifications",
    "utf8open",
]