# Copyright 2018 John Harwell, All rights reserved.
#
# SPDX-License-Identifier: MIT
"""Miscellaneous bits used in multiple places but that don't fit anywhere else.
"""
# Core packages
import typing as tp
import time
import logging
import pickle
import functools
import pathlib
# 3rd party packages
import numpy as np
import pandas as pd
from retry import retry
# Project packages
from sierra.core.vector import Vector3D
from sierra.core.experiment import xml, definition
from sierra.core import types, config
from sierra.core import plugin_manager as pm
class ArenaExtent():
    """Representation of a 2D or 3D section/chunk/volume of the arena.

    Attributes:
        dims: Size of the extent along each axis.

        ll: Lower-left corner; the same object as the origin.

        ur: Upper-right corner, computed as ``origin + dims``.

        center: Midpoint of the extent, computed as ``origin + dims / 2.0``.
    """

    @staticmethod
    def from_corners(ll: Vector3D, ur: Vector3D) -> 'ArenaExtent':
        """Initialize an extent via LL and UR corners.

        As opposed to an origin and a set of dimensions.
        """
        return ArenaExtent(ur - ll, ll)

    def __init__(self,
                 dims: Vector3D,
                 origin: tp.Optional[Vector3D] = None) -> None:
        """Initialize an extent from its dimensions and (optional) origin.

        Args:
            dims: Size of the extent along each axis.

            origin: Lower-left corner of the extent. If omitted, a
                    default-constructed ``Vector3D`` is used.
        """
        # Build the default origin per call instead of in the signature, so
        # that separate extents never alias one shared default object (the
        # origin is also stored as ``ll`` below).
        if origin is None:
            origin = Vector3D()

        self._origin = origin
        self.dims = dims
        self.ll = origin
        self.ur = origin + dims
        self.center = origin + dims / 2.0

    def contains(self, pt: Vector3D) -> bool:
        """Check if ``pt`` lies between the LL and UR corners (inclusive).

        The comparison semantics are whatever ``Vector3D`` defines for ``>=``
        and ``<=``.
        """
        return pt >= self.ll and pt <= self.ur

    def area(self) -> float:
        """Return the area of the extent in the XY plane."""
        return self.dims.x * self.dims.y

    def xsize(self) -> int:
        """Return the size of the extent in X."""
        return self.dims.x

    def ysize(self) -> int:
        """Return the size of the extent in Y."""
        return self.dims.y

    def zsize(self) -> int:
        """Return the size of the extent in Z."""
        return self.dims.z

    def origin(self) -> Vector3D:
        """Return the origin (LL corner) the extent was created with."""
        return self._origin

    def __str__(self) -> str:
        """Format the extent as ``<dims>@<origin>``."""
        return str(self.dims) + '@' + str(self._origin)
class Sigmoid():
    r"""
    Sigmoid activation function.

    .. math::

       f(x) = \frac{1}{1+e^{-x}}

    The input is captured at construction; calling the object evaluates the
    function at that input.
    """

    def __init__(self, x: float) -> None:
        """Store the input ``x`` at which the sigmoid will be evaluated."""
        self.x = x

    def __call__(self) -> float:
        """Evaluate the sigmoid at the stored input."""
        if self.x < 0:
            # Equivalent, and numerically stable for large negative
            # exponents. If you don't case the sigmoid, you get overflow errors
            # at runtime.
            return 1.0 - 1.0 / (1 + np.exp(self.x))  # type: ignore
        else:
            return 1.0 / (1 + np.exp(-self.x))  # type: ignore
class ReLu():
    r"""
    Rectified Linear Unit (ReLU) activation function.

    .. math::

       \begin{aligned}
           f(x) = max(0,x) &= x \textit{if} x > 0
                           &= 0 \textit{else}
       \end{aligned}

    The input is fixed when the object is created; calling the object yields
    the rectified value.
    """

    def __init__(self, x: float):
        self.x = x

    def __call__(self):
        # Strictly positive inputs pass through; everything else maps to 0.
        value = self.x
        if value > 0:
            return value
        return 0
def scale_minmax(minval: float, maxval: float, val: float) -> float:
    r"""
    Scale values from range [minval, maxval] -> [-1,1].

    .. math::

       -1 + (val - minval) \cdot \frac{1 - (-1)}{maxval - minval}

    Args:
        minval: Lower bound of the input range; maps to -1.

        maxval: Upper bound of the input range; maps to 1.

        val: The value to scale.

    Returns:
        The scaled value. Inputs outside [minval, maxval] are not clamped.

    Raises:
        ZeroDivisionError: If ``minval == maxval`` (for plain Python numbers).
    """
    return -1.0 + (val - minval) * (1 - (-1)) / (maxval - minval)
def dir_create_checked(path: tp.Union[pathlib.Path, str],
                       exist_ok: bool) -> None:
    """Create a directory (and any missing parents) idempotently.

    When the directory is already present and ``exist_ok`` is False, the
    problem is logged and the ``FileExistsError`` is re-raised to the caller.
    """
    target = path if isinstance(path, pathlib.Path) else pathlib.Path(path)

    try:
        target.mkdir(parents=True, exist_ok=exist_ok)
    except FileExistsError:
        logging.fatal("%s already exists! Not overwriting", str(target))
        raise
def path_exists(path: tp.Union[pathlib.Path, str]) -> bool:
    """
    Check if a path exists, trying multiple times.

    On HPC systems a directory/filesystem under heavy pressure may time out
    on the first check or two while the query is executed over the network,
    so the path is polled 10 times (1ms apart) and the majority answer is
    returned.
    """
    target = path if isinstance(path, pathlib.Path) else pathlib.Path(path)

    n_checks = 10
    n_found = 0

    for _ in range(n_checks):
        if target.exists():
            n_found += 1
        time.sleep(0.001)

    # Majority vote; an even split counts as "does not exist".
    return n_found > n_checks - n_found
def get_primary_axis(criteria,
                     primary_axis_bc: tp.List,
                     cmdopts: types.Cmdopts) -> int:
    """
    Determine which axis in a bivariate batch criteria is the primary axis.

    A ``plot_primary_axis`` of 0 or 1 in ``cmdopts`` overrides everything
    else. Otherwise, the result is 0 if the first criteria of ``criteria`` is
    an instance of any class in ``primary_axis_bc``, and 1 if not.
    """
    requested = cmdopts['plot_primary_axis']

    # Explicit selection from the cmdline wins.
    for axis in (0, 1):
        if requested == axis:
            return axis

    # No override: decide from the type of the first criteria.
    for candidate in primary_axis_bc:
        if isinstance(criteria.criteria1, candidate):
            return 0

    return 1
def exp_range_calc(cmdopts: types.Cmdopts,
                   root_dir: pathlib.Path,
                   criteria) -> types.PathList:
    """
    Get the range of experiments to run/do stuff with. SUPER USEFUL.

    All experiment paths are ``root_dir`` joined with each name generated by
    ``criteria``. If ``cmdopts['exp_range']`` is set (``MIN:MAX``), only the
    experiments with indices in [MIN, MAX] (inclusive) are returned.
    """
    everything = [root_dir / name for name in criteria.gen_exp_names(cmdopts)]
    spec = cmdopts['exp_range']

    if spec is None:
        return everything

    bounds = spec.split(':')
    min_exp, max_exp = int(bounds[0]), int(bounds[1])

    assert min_exp <= max_exp, \
        f"Min batch exp >= max batch exp({min_exp} vs. {max_exp})"

    return everything[min_exp: max_exp + 1]
def exp_include_filter(inc_spec: tp.Optional[str],
                       target: tp.List,
                       n_exps: int):
    """Calculate which experiments to include in a calculation for something.

    Take a input list of experiment numbers to include, and returns the sublist
    specified by the inc_spec (of the form [x:y]). inc_spec is an `absolute`
    specification; if a given performance measure excludes exp0 then that case
    is handled internally so that array/list shapes work out when generating
    graphs if this function is used consistently everywhere.

    Args:
        inc_spec: ``START:END`` or ``START:`` specification, or None to include
                  everything.

        target: The list to take the sublist of.

        n_exps: Total # of experiments. If ``target`` is shorter than this,
                the start index is shifted down by one.

    Returns:
        The selected slice of ``target``.
    """
    if inc_spec is None:
        return target[slice(None, None, None)]

    bounds = inc_spec.split(':')
    start = int(bounds[0])
    end = len(target) if bounds[1] == '' else int(bounds[1])

    # Handle perf measures which exclude exp0 by default. A start of 0 is left
    # alone: shifting it to -1 would make the slice wrap around to the END of
    # the list instead of starting at its first element.
    if len(target) < n_exps and start != 0:
        start -= 1

    return target[slice(start, end, None)]
def bivar_exp_labels_calc(exp_dirs: types.PathList) -> tp.Tuple[tp.List[str],
                                                                tp.List[str]]:
    """
    Calculate the labels for bivariate experiment graphs.

    Each experiment directory name is split on ``+``; the unique first
    components become the (sorted) X labels and the unique second components
    the (sorted) Y labels.
    """
    # Because sets are used, if a sub-range of experiments are selected for
    # collation, the selected range has to be an even multiple of the # of
    # experiments in the second batch criteria, or inter-experiment graph
    # generation won't work (the final CSV is always an MxN grid).
    components = [exp.name.split('+') for exp in exp_dirs]

    xlabels = sorted({parts[0] for parts in components})
    ylabels = sorted({parts[1] for parts in components})

    return (xlabels, ylabels)
def apply_to_expdef(var,
                    exp_def: definition.XMLExpDef) -> tp.Tuple[tp.Optional[xml.TagRmList],
                                                               tp.Optional[xml.TagAddList],
                                                               tp.Optional[xml.AttrChangeSet]]:
    """
    Apply generated XML modifications to an experiment definition.

    In this order:

    #. Remove existing XML tags

    #. Add new XML tags

    #. Change existing XML attributes

    Only the FIRST element of each list generated by ``var`` is applied.

    Args:
        var: Object providing ``gen_tag_rmlist()``, ``gen_tag_addlist()`` and
             ``gen_attr_changelist()``.

        exp_def: The experiment definition to modify in place.

    Returns:
        The (removals, additions, changes) which were applied; an element is
        None if ``var`` generated nothing of that kind.
    """
    rmsl = var.gen_tag_rmlist()  # type: tp.List[xml.TagRmList]
    addsl = var.gen_tag_addlist()  # type: tp.List[xml.TagAddList]
    chgsl = var.gen_attr_changelist()  # type: tp.List[xml.AttrChangeSet]

    if rmsl:
        rms = rmsl[0]
        for r in rms:
            exp_def.tag_remove(r.path, r.tag)
    else:
        rms = None

    if addsl:
        adds = addsl[0]
        for a in adds:
            # f-string so the offending tag actually shows up in the message.
            assert a.path is not None, f"Can't add tag {a.tag} with no parent"
            exp_def.tag_add(a.path, a.tag, a.attr, a.allow_dup)
    else:
        adds = None

    if chgsl:
        chgs = chgsl[0]
        for c in chgs:
            exp_def.attr_change(c.path, c.attr, c.value)
    else:
        chgs = None

    return rms, adds, chgs
def pickle_modifications(adds: tp.Optional[xml.TagAddList],
                         chgs: tp.Optional[xml.AttrChangeSet],
                         path: pathlib.Path) -> None:
    """
    After applying XML modifications, pickle changes for later retrieval.

    Tag additions are pickled first, then attribute changes; whichever of the
    two is None is skipped.
    """
    for modifications in (adds, chgs):
        if modifications is None:
            continue
        modifications.pickle(path)
def exp_template_path(cmdopts: types.Cmdopts,
                      batch_input_root: pathlib.Path,
                      dirname: str) -> pathlib.Path:
    """Calculate the path to the template input file in the batch experiment root.

    The result is ``batch_input_root/dirname/<stem>``, where ``<stem>`` is the
    name of ``cmdopts['template_input_file']`` without its final extension.
    The file at this path will be used as the de-facto template for generating
    per-run input files.
    """
    stem = pathlib.Path(cmdopts['template_input_file']).stem
    exp_root = batch_input_root / dirname
    return exp_root / stem
def get_n_robots(main_config: types.YAMLDict,
                 cmdopts: types.Cmdopts,
                 exp_input_root: pathlib.Path,
                 exp_def: definition.XMLExpDef) -> int:
    """
    Get the # robots used for a specific :term:`Experiment`.

    The platform plugin named by ``cmdopts['platform']`` is asked for the
    population size, first from ``exp_def`` and then (if that yields nothing
    positive) from the pickled definition under ``exp_input_root``.
    """
    platform = pm.pipeline.get_plugin_module(cmdopts['platform'])

    # First choice: the current experiment definition, which contains all
    # changes to the template input file EXCEPT those from batch criteria,
    # which have already been written and pickled at this point.
    count = platform.population_size_from_def(exp_def, main_config, cmdopts)

    if count <= 0:
        # Second choice: the pickled experiment definition (i.e., from the
        # batch criteria which was used for this experiment).
        pickled = definition.unpickle(exp_input_root / config.kPickleLeaf)
        count = platform.population_size_from_pickle(pickled,
                                                     main_config,
                                                     cmdopts)

    assert count > 0, "n_robots must be > 0"
    return count
def df_fill(df: pd.DataFrame, policy: str) -> pd.DataFrame:
    """
    Fill missing cells in a dataframe according to the specified fill policy.

    Args:
        df: The dataframe to fill.

        policy: One of:

                - ``none`` - Return ``df`` itself, untouched.

                - ``pad`` - Propagate the last valid value forward.

                - ``zero`` - Replace missing cells with 0.

    Returns:
        The filled dataframe (a new object, except for ``none``).

    Raises:
        RuntimeError: If ``policy`` is not one of the above.
    """
    if policy == 'none':
        return df
    elif policy == 'pad':
        # Same result as fillna(method='pad'), whose ``method`` argument is
        # deprecated in pandas 2.x.
        return df.ffill()
    elif policy == 'zero':
        return df.fillna(value=0)
    else:
        raise RuntimeError(f"Bad fill policy {policy}")
@retry(OSError, tries=10, delay=0.100, backoff=1.1) # type:ignore
def pickle_dump(obj: object, f) -> None:
    """Pickle ``obj`` to the already-open file object ``f``.

    The call is wrapped by the ``retry`` decorator, configured for ``OSError``
    with ``tries=10``, ``delay=0.100`` and ``backoff=1.1``.
    """
    pickle.dump(obj, f)
def gen_scenario_spec(cmdopts: types.Cmdopts, **kwargs) -> tp.Dict[str, tp.Any]:
    """Parse the selected scenario into a dictionary using the project's parser.

    The scenario is taken from the ``scenario`` keyword argument if given,
    and from ``cmdopts['scenario']`` otherwise.
    """
    # scenario is passed in kwargs during stage 5 (can't be passed via
    # --scenario in general )
    fallback = cmdopts['scenario']
    chosen = kwargs.get('scenario', fallback)

    parser_module = pm.module_load_tiered(
        project=cmdopts['project'],
        path='generators.scenario_generator_parser')

    return parser_module.ScenarioGeneratorParser().to_dict(chosen)
def sphinx_ref(ref: str) -> str:
    """Return ``ref``, shortened when ``__sphinx_build_man__`` is set.

    If the name ``__sphinx_build_man__`` is defined and truthy, only the part
    of ``ref`` after its last ``.`` is kept, minus its final character. In
    every other case ``ref`` is returned unchanged.
    """
    try:
        # This is kind of a hack...
        building_man = bool(__sphinx_build_man__)  # type: ignore
    except NameError:
        building_man = False

    if not building_man:
        return ref

    last_component = ref.split('.')[-1]
    return last_component[:-1]
utf8open = functools.partial(open, encoding='UTF-8')
"""
Explicitly specify that the type of file being opened is UTF-8, which it should
be for almost everything in SIERRA.
"""
# Names listed as this module's API. NOTE(review): scale_minmax,
# bivar_exp_labels_calc, pickle_dump, gen_scenario_spec and sphinx_ref are
# defined in this module but not listed here -- confirm that is intentional.
__api__ = [
    'ArenaExtent',
    'Sigmoid',
    'ReLu',
    'dir_create_checked',
    'path_exists',
    'get_primary_axis',
    'exp_range_calc',
    'exp_include_filter',
    'apply_to_expdef',
    'pickle_modifications',
    'exp_template_path',
    'get_n_robots',
    'df_fill',
    'utf8open',
]