# Copyright 2018 London Lowmanstone, John Harwell, All rights reserved.
#
# SPDX-License-Identifier: MIT
"""Experiment generation classes.

Experiment generation modifies the
:class:`~sierra.core.experiment.definition.BaseExpDef` object built from
the specified batch criteria as follows:

- Engine-specific modifications common to all batch experiments
- Project-specific modifications common to all batch experiments
- Modifications generated by the selected controller+scenario

.. NOTE:: Generated definitions from batch criteria are not handled here; they
   are already generated to scaffold the batch experiment when experiment
   generation is run.
"""
# Core packages
import typing as tp
import logging
import pathlib
import time
import random
import pickle
import copy
import os
# 3rd party packages
# Project packages
import sierra.core.generators.generator_factory as gf
from sierra.core.experiment import spec, definition, bindings
from sierra.core import types, batchroot, exproot, utils, config, engine
import sierra.core.variables.batch_criteria as bc
import sierra.core.plugin as pm
class BatchExpDefGenerator:
    """Generate experiment definitions for a :term:`Batch Experiment`.

    Does not create the batch experiment after generation.
    """

    def __init__(
        self,
        criteria: bc.XVarBatchCriteria,
        pathset: batchroot.PathSet,
        controller_name: str,
        scenario_basename: str,
        cmdopts: types.Cmdopts,
    ) -> None:
        """Store the generation inputs and validate the template file.

        Args:
            criteria: Batch criteria object created from the cmdline
                definition.

            pathset: Batch experiment path set; its ``input_root`` is used
                when building per-experiment specs.

            controller_name: Name of controller generator to use.

            scenario_basename: Name of scenario generator to use.

            cmdopts: Parsed cmdline options. ``cmdopts["expdef_template"]``
                must name an existing file.

        Raises:
            AssertionError: If ``cmdopts["expdef_template"]`` is not a file.
        """
        #: batch_config_template: Absolute path to the root template expdef
        #  configuration file.
        self.batch_config_template = pathlib.Path(cmdopts["expdef_template"])

        assert (
            self.batch_config_template.is_file()
        ), f"'{self.batch_config_template}' is not a valid file"

        # Template filename without its extension; used to locate the
        # per-experiment template inside each experiment's input root.
        self.exp_template_stem = self.batch_config_template.stem
        self.batch_config_extension = None

        self.pathset = pathset

        #: controller_name: Name of controller generator to use.
        self.controller_name = controller_name

        #: scenario_basename: Name of scenario generator to use.
        self.scenario_basename = scenario_basename

        #: criteria: :class:`~sierra.core.variables.batch_criteria.BatchCriteria`
        #  derived object instance created from cmdline definition.
        self.criteria = criteria

        self.cmdopts = cmdopts
        self.logger = logging.getLogger(__name__)

    def generate_defs(self) -> list[definition.BaseExpDef]:
        """Generate and return the batch experiment definition.

        Returns:
            A list of experiment definitions (one for each experiment in the
            batch).
        """
        scaffold_spec = spec.scaffold_spec_factory(self.criteria)

        # Create and run generators
        defs = []
        for exp_num in range(scaffold_spec.n_exps):
            # Creating the generator also stores its name in
            # cmdopts["joint_generator"], which the log message below reads.
            generator = self._create_exp_generator(exp_num)

            self.logger.debug(
                (
                    "Generating scenario+controller changes from "
                    "generator '%s' for exp%s"
                ),
                self.cmdopts["joint_generator"],
                exp_num,
            )
            defs.append(generator.generate())

        return defs

    def _create_exp_generator(self, exp_num: int):
        """
        Create the joint scenario+controller generator from command line definitions.

        As a side effect, the name of the created generator is written to
        ``self.cmdopts["joint_generator"]``.

        Arguments:
            exp_num: Experiment number in the batch

        Returns:
            The joint generator built from the scenario and controller
            generators for experiment ``exp_num``.
        """
        exp_spec = spec.ExperimentSpec(
            self.criteria, self.pathset.input_root, exp_num, self.cmdopts
        )

        # Per-experiment template (sans extension) inside the experiment's
        # own input root.
        template_fpath = exp_spec.exp_input_root / self.exp_template_stem
        config_root = pathlib.Path(self.cmdopts["project_config_root"])

        scenario = gf.ScenarioGenerator(
            controller=self.controller_name,
            scenario=self.scenario_basename,
            spec=exp_spec,
            expdef_template_fpath=template_fpath,
            cmdopts=self.cmdopts,
        )
        controller = gf.ControllerGenerator(
            controller=self.controller_name,
            config_root=config_root,
            cmdopts=self.cmdopts,
            spec=exp_spec,
        )

        generator = gf.JointGenerator(scenario=scenario, controller=controller)
        self.cmdopts["joint_generator"] = generator.name

        return generator
class BatchExpCreator:
    """Instantiate a :term:`Batch Experiment`.

    Calls :class:`~sierra.core.generators.experiment.ExpCreator` on each
    experimental definition in the batch.
    """

    def __init__(
        self,
        criteria: bc.XVarBatchCriteria,
        cmdopts: types.Cmdopts,
        pathset: batchroot.PathSet,
    ) -> None:
        """Store the batch criteria, cmdline options and batch paths.

        Args:
            criteria: Batch criteria object created from the cmdline
                definition.

            cmdopts: Parsed cmdline options; ``cmdopts["expdef_template"]``
                names the root template expdef file.

            pathset: Batch experiment path set.
        """
        #: Absolute path to the root template expdef configuration file.
        self.batch_config_template = pathlib.Path(cmdopts["expdef_template"])

        self.pathset = pathset

        #: :class:`~sierra.core.variables.batch_criteria.BatchCriteria` derived
        #  object instance created from cmdline definition.
        self.criteria = criteria

        self.cmdopts = cmdopts
        self.logger = logging.getLogger(__name__)

    def create(self, generator: BatchExpDefGenerator) -> None:
        """Scaffold the batch experiment and write every experiment to disk.

        Steps, in order:

        #. Create the batch input root.
        #. Load the template expdef (flattening it if the engine plugin
           defines ``expdef_flatten``), scaffold the experiments from the
           batch criteria and pickle the resulting definitions.
        #. Run ``generator`` to get the scenario+controller changes for each
           experiment.
        #. Select the parallelism paradigm and reset the per-batch commands
           file if applicable.
        #. Instantiate each experiment via :class:`ExpCreator`.

        Args:
            generator: Generator producing one experiment definition per
                experiment in the batch.

        Raises:
            AssertionError: If ``generator`` produced no definitions.
        """
        utils.dir_create_checked(self.pathset.input_root, self.cmdopts["exp_overwrite"])

        # Scaffold the batch experiment, creating experiment directories and
        # writing template expdef input files for each experiment in the batch
        # with changes from the batch criteria added.
        expdef_module = pm.pipeline.get_plugin_module(self.cmdopts["expdef"])
        exp_def = expdef_module.ExpDef(
            input_fpath=self.batch_config_template, write_config=None
        )

        engine_module = pm.pipeline.get_plugin_module(self.cmdopts["engine"])
        if hasattr(engine_module, "expdef_flatten"):
            self.logger.debug(
                "Flattening --expdef-template definition before scaffolding"
            )
            # Flatten the expdef here if the engine defines the hook, so that
            # the full flattened file contents are available for scaffolding.
            exp_def = engine_module.expdef_flatten(exp_def)

        self.criteria.scaffold_exps(exp_def, self.cmdopts)

        # Pickle experiment definitions in the actual batch experiment
        # directory for later retrieval.
        self.criteria.pickle_exp_defs(self.cmdopts)

        # Run batch experiment generator (must be after scaffolding so the
        # per-experiment template files are in place).
        defs = generator.generate_defs()
        assert len(defs) > 0, "No expdef modifications generated?"

        self.logger.info(
            "Applying generated scenario+controller changes/mods to all experiments"
        )

        # A cmdline-specified paradigm takes precedence over whatever the
        # engine would otherwise select.
        paradigm_override = self.cmdopts["exec_parallelism_paradigm"]
        if paradigm_override is not None:
            self.logger.warning(
                "Overriding engine=%s parallelism paradigm with %s",
                self.cmdopts["engine"],
                paradigm_override,
            )
            parallelism_paradigm = paradigm_override
        else:
            configurer = engine.ExpConfigurer(self.cmdopts)
            parallelism_paradigm = configurer.parallelism_paradigm()

        self._init_cmdfile(parallelism_paradigm)

        # Loop-invariant: compute the experiment names once rather than once
        # per experiment.
        exp_names = self.criteria.gen_exp_names()

        for i, defi in enumerate(defs):
            n_mods = defi.n_mods()
            self.logger.trace(
                "Applying %s+%s generated scenario+controller changes/mods to exp%s",
                n_mods[0],
                n_mods[1],
                i,
            )
            exp_pathset = exproot.PathSet(self.pathset, exp_names[i])

            ExpCreator(
                self.cmdopts,
                self.criteria,
                self.batch_config_template,
                exp_pathset,
                i,
            ).from_def(defi, parallelism_paradigm)

    def _init_cmdfile(self, paradigm: str) -> None:
        """Delete an existing per-batch commands file.

        Does nothing unless ``paradigm`` is ``"per-batch"``.

        Args:
            paradigm: The selected parallelism paradigm.
        """
        if paradigm != "per-batch":
            return

        # Commands file lives directly under ``self.pathset.root``.
        # NOTE(review): ExpCreator appends per-batch commands to a file under
        # its experiment path set's ``parent``; presumably that is the same
        # directory as ``self.pathset.root`` -- confirm.
        cmdfile = (
            self.pathset.root / config.GNU_PARALLEL["cmdfile_stem"]
        ).with_suffix(config.GNU_PARALLEL["cmdfile_ext"])

        if utils.path_exists(cmdfile):
            cmdfile.unlink()
class ExpCreator:
    """Instantiate a generated experiment from an experiment definition.

    Takes generated :term:`Experiment` definitions and writes them to the
    filesystem.

    Args:
        template_ipath: Absolute path to the template expdef configuration file.
    """

    def __init__(
        self,
        cmdopts: types.Cmdopts,
        criteria: bc.XVarBatchCriteria,
        template_ipath: pathlib.Path,
        pathset: exproot.PathSet,
        exp_num: int,
    ) -> None:
        """Store experiment parameters and select the per-run random seeds.

        Previously saved seeds are reused only if ``cmdopts["preserve_seeds"]``
        is set, the seeds file exists, and it holds exactly ``--n-runs``
        seeds; otherwise new seeds are generated.

        Args:
            cmdopts: Parsed cmdline options.

            criteria: Batch criteria for the batch experiment.

            template_ipath: Path to the template expdef configuration file.

            pathset: Path set for this experiment.

            exp_num: Index of this experiment in the batch.
        """
        # filename of template file, sans extension and parent directory path
        self.template_stem = template_ipath.resolve().stem

        #: Dictionary containing parsed cmdline options.
        self.cmdopts = cmdopts
        self.criteria = criteria
        self.exp_num = exp_num
        self.pathset = pathset
        self.logger = logging.getLogger(__name__)

        # If random seeds were previously generated, use them if configured
        self.seeds_fpath = self.pathset.input_root / config.RANDOM_SEEDS_LEAF
        self.preserve_seeds = self.cmdopts["preserve_seeds"]
        self.random_seeds = None

        # Normalize once so every comparison below uses the same type.
        n_runs = int(self.cmdopts["n_runs"])

        if self.preserve_seeds:
            if utils.path_exists(self.seeds_fpath):
                # NOTE: unpickling is only safe for trusted data; this file
                # is written by from_def() in this class.
                with self.seeds_fpath.open("rb") as f:
                    self.random_seeds = pickle.load(f)

            if self.random_seeds is not None:
                if len(self.random_seeds) == n_runs:
                    self.logger.trace(
                        "Using existing random seeds for experiment%s", self.exp_num
                    )
                else:
                    # OK to overwrite the saved random seeds--they changed the
                    # experiment definition.
                    self.logger.warning(
                        (
                            "Experiment%s definition changed: # random "
                            "seeds (%s) != --n-runs (%s): create new "
                            "seeds"
                        ),
                        self.exp_num,
                        len(self.random_seeds),
                        self.cmdopts["n_runs"],
                    )
                    self.preserve_seeds = False

        if not self.preserve_seeds or self.random_seeds is None:
            self.logger.trace(
                "Generating new random seeds for experiment%s", self.exp_num
            )
            # Distinct seeds drawn without replacement from [0, now).
            self.random_seeds = random.sample(range(0, int(time.time())), n_runs)

    def from_def(
        self, exp_def: definition.BaseExpDef, parallelism_paradigm: str
    ) -> None:
        """Create all experimental runs by writing input files to filesystem.

        The passed :class:`~sierra.core.experiment.definition.BaseExpDef` object
        contains all changes that should be made to all runs in the
        experiment. Additional changes to create a set of unique runs from which
        distributions of system behavior can be meaningfully computed post-hoc
        are added.

        Args:
            exp_def: Definition shared by all runs in the experiment. It is
                deep-copied for each run, so it is not modified here.

            parallelism_paradigm: One of ``"per-batch"``, ``"per-exp"`` or
                ``"per-run"``.

        Raises:
            ValueError: If ``parallelism_paradigm`` is not a known value.
        """
        cmdfile_path = self._init_cmdfile(parallelism_paradigm)

        n_agents = utils.get_n_agents(
            self.criteria.main_config, self.cmdopts, self.pathset.input_root, exp_def
        )
        generator = engine.ExpRunShellCmdsGenerator(
            self.cmdopts, self.criteria, self.exp_num, n_agents
        )

        # Create all experimental runs
        self.logger.debug(
            "Creating %s runs in exp%s", self.cmdopts["n_runs"], self.exp_num
        )
        for run_num in range(int(self.cmdopts["n_runs"])):
            # Each run gets its own copy so per-run changes do not leak
            # between runs.
            per_run = copy.deepcopy(exp_def)
            self._create_exp_run(
                per_run, generator, run_num, cmdfile_path, parallelism_paradigm
            )

        # Perform experiment level configuration AFTER all runs have been
        # generated in the experiment, in case the configuration depends on the
        # generated launch files.
        engine.ExpConfigurer(self.cmdopts).for_exp(self.pathset.input_root)

        # Save seeds: always when they were (re)generated, or when no seeds
        # file exists yet. "wb" replaces any previous file contents.
        if not self.preserve_seeds or not utils.path_exists(self.seeds_fpath):
            with self.seeds_fpath.open("wb") as f:
                utils.pickle_dump(self.random_seeds, f)

    def _create_exp_run(
        self,
        run_exp_def: definition.BaseExpDef,
        cmds_generator: bindings.IExpRunShellCmdsGenerator,
        run_num: int,
        cmdfile_path: pathlib.Path,
        parallelism_paradigm: str,
    ) -> None:
        """Write the launch file and command file entries for a single run.

        Args:
            run_exp_def: Definition for this run; per-run changes are applied
                to it before it is written out.

            cmds_generator: Generator for the shell commands of a run.

            run_num: Index of the run within the experiment.

            cmdfile_path: Commands file path *without* extension, as returned
                by :meth:`_init_cmdfile`.

            parallelism_paradigm: One of ``"per-batch"``, ``"per-exp"`` or
                ``"per-run"``; any other value writes no commands.
        """
        run_output_dir = f"{self.template_stem}_run{run_num}_output"

        # If the project defined per-run configuration, apply
        # it. Otherwise, the already-applied configuration for the engine is
        # all that will be used per-run.
        per_run = pm.module_load_tiered(
            project=self.cmdopts["project"], path="generators.experiment"
        )

        run_output_root = self.pathset.output_root / run_output_dir
        stem_path = self._get_launch_file_stempath(run_num)

        # Generate per-run exp changes.
        per_run.for_single_exp_run(
            run_exp_def,
            run_num,
            run_output_root,
            stem_path,
            self.random_seeds[run_num],
            self.cmdopts,
        )

        # Write out the experimental run launch file
        run_exp_def.write(stem_path)

        # Perform any necessary programmatic (i.e., stuff you can do in python
        # and don't need a shell for) per-run configuration.
        configurer = engine.ExpConfigurer(self.cmdopts)
        configurer.for_exp_run(self.pathset.input_root, run_output_root)

        ext = config.GNU_PARALLEL["cmdfile_ext"]

        if parallelism_paradigm in ["per-exp", "per-batch"]:
            # Update commands file with the command for the configured
            # experimental run. The file is shared between runs, hence append.
            with utils.utf8open(cmdfile_path.with_suffix(ext), "a") as cmdfile:
                self._update_cmdfile(
                    cmdfile,
                    cmds_generator,
                    parallelism_paradigm,
                    run_num,
                    run_output_root,
                    stem_path,
                    "slave",
                )
        elif parallelism_paradigm == "per-run":
            # Write new GNU Parallel commands file with the commands for the
            # experimental run. Each run has its own files, hence "w".
            master_fpath = f"{cmdfile_path}_run{run_num}_master{ext}"
            slave_fpath = f"{cmdfile_path}_run{run_num}_slave{ext}"

            self.logger.trace("Updating slave cmdfile %s", slave_fpath)
            with utils.utf8open(slave_fpath, "w") as cmds_file:
                self._update_cmdfile(
                    cmds_file,
                    cmds_generator,
                    "per-run",
                    run_num,
                    run_output_root,
                    stem_path,
                    "slave",
                )

            self.logger.trace("Updating master cmdfile %s", master_fpath)
            with utils.utf8open(master_fpath, "w") as cmdfile:
                self._update_cmdfile(
                    cmdfile,
                    cmds_generator,
                    "per-run",
                    run_num,
                    run_output_root,
                    stem_path,
                    "master",
                )

    def _get_launch_file_stempath(self, run_num: int) -> pathlib.Path:
        """File is named as ``<template input file stem>_run<run_num>``."""
        leaf = f"{self.template_stem}_run{run_num}"
        return self.pathset.input_root / leaf

    def _init_cmdfile(self, paradigm: str) -> pathlib.Path:
        """Compute the commands file path and clear a stale per-exp file.

        Args:
            paradigm: One of ``"per-batch"``, ``"per-exp"`` or ``"per-run"``.

        Returns:
            Path of the commands file *without* extension.

        Raises:
            ValueError: If ``paradigm`` is not a known value.
        """
        stem = config.GNU_PARALLEL["cmdfile_stem"]

        if paradigm == "per-batch":
            # Commands file shared by the batch, stored in the parent of the
            # experiment.
            path = self.pathset.parent / stem
        elif paradigm in {"per-exp", "per-run"}:
            # Commands file stored in input root for each experiment.
            path = self.pathset.input_root / stem
        else:
            raise ValueError(f"Bad value for parallelism paradigm: {paradigm}")

        # Only the per-exp file is cleared here, because it is appended to as
        # each run in this experiment is generated. The per-batch file is
        # also appended to, but by every experiment in the batch, so clearing
        # it here would drop earlier experiments' commands;
        # BatchExpCreator._init_cmdfile() removes it before any experiment is
        # created. Per-run files are opened for writing, not appending, so
        # they need no clearing.
        if paradigm == "per-exp":
            stale = path.with_suffix(config.GNU_PARALLEL["cmdfile_ext"])
            if utils.path_exists(stale):
                stale.unlink()

        return path

    def _update_cmdfile(
        self,
        cmdfile,
        cmds_generator: bindings.IExpRunShellCmdsGenerator,
        paradigm: str,
        run_num: int,
        run_output_root: pathlib.Path,
        launch_stem_path: pathlib.Path,
        for_host: str,
    ) -> None:
        """Add command to launch a given experimental run to the command file.

        Args:
            cmdfile: Open, writable text file object to add lines to.

            cmds_generator: Source of the pre-run, exec and post-run command
                specs.

            paradigm: One of ``"per-batch"``, ``"per-exp"`` or ``"per-run"``.

            run_num: Index of the run within the experiment.

            run_output_root: Output directory of the run.

            launch_stem_path: Launch file path for the run, sans extension.

            for_host: Host role the commands are generated for (``"slave"``
                or ``"master"``).

        Raises:
            AssertionError: If any generated command spec is not marked as a
                shell command.

            ValueError: If there are commands to write and ``paradigm`` is
                not a known value.
        """
        pre_specs = cmds_generator.pre_run_cmds(for_host, launch_stem_path, run_num)
        assert all(
            s.shell for s in pre_specs
        ), "All pre-exp commands are run in a shell"
        pre_cmds = [s.cmd for s in pre_specs]
        self.logger.trace("Pre-experiment cmds: %s", pre_cmds)

        exec_specs = cmds_generator.exec_run_cmds(for_host, launch_stem_path, run_num)
        assert all(
            s.shell for s in exec_specs
        ), "All exec-exp commands are run in a shell"
        exec_cmds = [s.cmd for s in exec_specs]
        self.logger.trace("Exec-experiment cmds: %s", exec_cmds)

        post_specs = cmds_generator.post_run_cmds(for_host, run_output_root)
        assert all(
            s.shell for s in post_specs
        ), "All post-exp commands are run in a shell"
        post_cmds = [s.cmd for s in post_specs]
        self.logger.trace("Post-experiment cmds: %s", post_cmds)

        if not (pre_cmds or exec_cmds or post_cmds):
            self.logger.debug("Skipping writing %s cmds file: no cmds", for_host)
            return

        #
        # This is one of those crucial parts of SIERRA where the "magic"
        # happens.
        #
        # If there is 1 cmdfile per experiment, then the pre- and post-exec cmds
        # need to be prepended and appended to the exec cmds on a per-line
        # basis, because each line needs to contain the total set of commands to
        # run each experimental run within the experiment. Each line needs to be
        # capable of being executed independently of the others, so that e.g.,
        # GNU parallel can process them concurrently if directed to do so. Same
        # for 1 cmdfile per batch.
        #
        # If there is 1 cmdfile per experimental run, then its the same thing,
        # BUT we need to break the exec cmds over multiple lines in the
        # cmdfile. All commands in the command file WILL be run in parallel;
        # this is the paradigm that maps to real hw, and obviously you want all
        # of your configured agents doing things simultaneously in an
        # experiment.
        if paradigm in ["per-exp", "per-batch"]:
            cmdfile.write(" ".join(pre_cmds + exec_cmds + post_cmds) + "\n")
        elif paradigm == "per-run":
            for exec_cmd in exec_cmds:
                cmdfile.write(" ".join([*pre_cmds, exec_cmd, *post_cmds]) + "\n")
        else:
            raise ValueError(f"Bad paradigm {paradigm}")
# Names exported by ``from <this module> import *``.
__all__ = ["BatchExpCreator", "BatchExpDefGenerator", "ExpCreator"]