Source code for sierra.core.generators.experiment

# Copyright 2018 London Lowmanstone, John Harwell, All rights reserved.
#
#  SPDX-License-Identifier: MIT
"""Experiment generation classes.

Experiment generation modifies the
:class:`~sierra.core.experiment.definition.BaseExpDef` object built from
the specified batch criteria as follows:

- Engine-specific modifications common to all batch experiments
- Project-specific modifications common to all batch experiments
- Modifications generated by the selected controller+scenario

NOTE:: Generated definitions from batch criteria are not handled here; they are
       already generated to scaffold the batch experiment when experiment
       generation is run.

"""

# Core packages
import typing as tp
import logging
import pathlib
import time
import random
import pickle
import copy
import os

# 3rd party packages

# Project packages
import sierra.core.generators.generator_factory as gf
from sierra.core.experiment import spec, definition, bindings
from sierra.core import types, batchroot, exproot, utils, config, engine
import sierra.core.variables.batch_criteria as bc
import sierra.core.plugin as pm


[docs] class BatchExpDefGenerator: """Generate experiment definitions for a :term:`Batch Experiment`. Does not create the batch experiment after generation. """ def __init__( self, criteria: bc.XVarBatchCriteria, pathset: batchroot.PathSet, controller_name: str, scenario_basename: str, cmdopts: types.Cmdopts, ) -> None: #: batch_config_template: Absolute path to the root template expdef # configuration file. self.batch_config_template = pathlib.Path(cmdopts["expdef_template"]) assert self.batch_config_template.is_file(), "'{}' is not a valid file".format( self.batch_config_template ) self.exp_template_stem = self.batch_config_template.stem self.batch_config_extension = None self.pathset = pathset #: controller_name: Name of controller generator to use. self.controller_name = controller_name #: scenario_basename: Name of scenario generator to use. self.scenario_basename = scenario_basename #: criteria: :class:`~sierra.core.variables.batch_criteria.BatchCriteria` # derived object instance created from cmdline definition. self.criteria = criteria self.cmdopts = cmdopts self.logger = logging.getLogger(__name__)
[docs] def generate_defs(self) -> list[definition.BaseExpDef]: """Generate and return the batch experiment definition. Returns: A list of experiment definitions (one for each experiment in the batch). """ scaffold_spec = spec.scaffold_spec_factory(self.criteria) # Create and run generators defs = [] for i in range(0, scaffold_spec.n_exps): generator = self._create_exp_generator(i) self.logger.debug( ( "Generating scenario+controller changes from " "generator '%s' for exp%s" ), self.cmdopts["joint_generator"], i, ) defs.append(generator.generate()) return defs
def _create_exp_generator(self, exp_num: int): """ Create the joint scenario+controller generator from command line definitions. Arguments: exp_num: Experiment number in the batch """ exp_spec = spec.ExperimentSpec( self.criteria, self.pathset.input_root, exp_num, self.cmdopts ) template_fpath = exp_spec.exp_input_root / self.exp_template_stem config_root = pathlib.Path(self.cmdopts["project_config_root"]) scenario = gf.ScenarioGenerator( controller=self.controller_name, scenario=self.scenario_basename, spec=exp_spec, expdef_template_fpath=template_fpath, cmdopts=self.cmdopts, ) controller = gf.ControllerGenerator( controller=self.controller_name, config_root=config_root, cmdopts=self.cmdopts, spec=exp_spec, ) generator = gf.JointGenerator(scenario=scenario, controller=controller) self.cmdopts["joint_generator"] = generator.name return generator
[docs] class BatchExpCreator: """Instantiate a :term:`Batch Experiment`. Calls :class:`~sierra.core.generators.experiment.ExpCreator` on each experimental definition in the batch """ def __init__( self, criteria: bc.XVarBatchCriteria, cmdopts: types.Cmdopts, pathset: batchroot.PathSet, ) -> None: #: Absolute path to the root template expdef configuration file. self.batch_config_template = pathlib.Path(cmdopts["expdef_template"]) self.pathset = pathset #: :class:`~sierra.core.variables.batch_criteria.BatchCriteria` derived # object instance created from cmdline definition. self.criteria = criteria self.cmdopts = cmdopts self.logger = logging.getLogger(__name__) def create(self, generator: BatchExpDefGenerator) -> None: utils.dir_create_checked(self.pathset.input_root, self.cmdopts["exp_overwrite"]) # Scaffold the batch experiment, creating experiment directories and # writing template expdef input files for each experiment in the batch # with changes from the batch criteria added. module = pm.pipeline.get_plugin_module(self.cmdopts["expdef"]) exp_def = module.ExpDef( input_fpath=self.batch_config_template, write_config=None ) module = pm.pipeline.get_plugin_module(self.cmdopts["engine"]) if hasattr(module, "expdef_flatten"): self.logger.debug( "Flattening --expdef-template definition before scaffolding" ) # Flatten the expdef here if the engine defines the hook, so that # the full flattened file contents are available for scaffolding. exp_def = module.expdef_flatten(exp_def) self.criteria.scaffold_exps(exp_def, self.cmdopts) # Pickle experiment definitions in the actual batch experiment # directory for later retrieval. self.criteria.pickle_exp_defs(self.cmdopts) # Run batch experiment generator (must be after scaffolding so the # per-experiment template files are in place). defs = generator.generate_defs() assert len(defs) > 0, "No expdef modifications generated?" self.logger.info( "Applying generated scenario+controller changes/mods to all experiments" ) if self.cmdopts["exec_parallelism_paradigm"] is not None: self.logger.warning( "Overriding engine=%s parallelism paradigm with %s", self.cmdopts["engine"], self.cmdopts["exec_parallelism_paradigm"], ) parallelism_paradigm = self.cmdopts["exec_parallelism_paradigm"] else: configurer = engine.ExpConfigurer(self.cmdopts) parallelism_paradigm = configurer.parallelism_paradigm() self._init_cmdfile(parallelism_paradigm) for i, defi in enumerate(defs): self.logger.trace( "Applying %s+%s generated scenario+controller changes/mods to exp%s", defi.n_mods()[0], defi.n_mods()[1], i, ) exp_pathset = exproot.PathSet( self.pathset, self.criteria.gen_exp_names()[i] ) ExpCreator( self.cmdopts, self.criteria, self.batch_config_template, exp_pathset, i, ).from_def(defi, parallelism_paradigm) def _init_cmdfile(self, paradigm: str) -> None: # Commands file stored in batch input root if paradigm == "per-batch": path = self.pathset.root / config.GNU_PARALLEL["cmdfile_stem"] if utils.path_exists(path.with_suffix(config.GNU_PARALLEL["cmdfile_ext"])): path.with_suffix(config.GNU_PARALLEL["cmdfile_ext"]).unlink()
[docs] class ExpCreator: """Instantiate a generated experiment from an experiment definition. Takes generated :term:`Experiment` definitions and writes them to the filesystem. Args: template_ipath: Absolute path to the template expdef configuration file. """ def __init__( self, cmdopts: types.Cmdopts, criteria: bc.XVarBatchCriteria, template_ipath: pathlib.Path, pathset: exproot.PathSet, exp_num: int, ) -> None: # filename of template file, sans extension and parent directory path self.template_stem = template_ipath.resolve().stem #: Dictionary containing parsed cmdline options. self.cmdopts = cmdopts self.criteria = criteria self.exp_num = exp_num self.pathset = pathset self.logger = logging.getLogger(__name__) # If random seeds where previously generated, use them if configured self.seeds_fpath = self.pathset.input_root / config.RANDOM_SEEDS_LEAF self.preserve_seeds = self.cmdopts["preserve_seeds"] self.random_seeds = None if self.preserve_seeds: if utils.path_exists(self.seeds_fpath): with self.seeds_fpath.open("rb") as f: self.random_seeds = pickle.load(f) if self.random_seeds is not None: if len(self.random_seeds) == self.cmdopts["n_runs"]: self.logger.trace( "Using existing random seeds for experiment%s", self.exp_num ) elif len(self.random_seeds) != int(self.cmdopts["n_runs"]): # OK to overwrite the saved random seeds--they changed the # experiment definition. self.logger.warning( ( "Experiment%s definition changed: # random " "seeds (%s) != --n-runs (%s): create new " "seeds" ), self.exp_num, len(self.random_seeds), self.cmdopts["n_runs"], ) self.preserve_seeds = False if not self.preserve_seeds or self.random_seeds is None: self.logger.trace( "Generating new random seeds for experiment%s", self.exp_num ) self.random_seeds = random.sample( range(0, int(time.time())), int(self.cmdopts["n_runs"]) )
[docs] def from_def( self, exp_def: definition.BaseExpDef, parallelism_paradigm: str ) -> None: """Create all experimental runs by writing input files to filesystem. The passed :class:`~sierra.core.experiment.definition.BaseExpDef` object contains all changes that should be made to all runs in the experiment. Additional changes to create a set of unique runs from which distributions of system behavior can be meaningfully computed post-hoc are added. """ cmdfile_path = self._init_cmdfile(parallelism_paradigm) n_agents = utils.get_n_agents( self.criteria.main_config, self.cmdopts, self.pathset.input_root, exp_def ) generator = engine.ExpRunShellCmdsGenerator( self.cmdopts, self.criteria, self.exp_num, n_agents ) # Create all experimental runs self.logger.debug( "Creating %s runs in exp%s", self.cmdopts["n_runs"], self.exp_num ) for run_num in range(int(self.cmdopts["n_runs"])): per_run = copy.deepcopy(exp_def) self._create_exp_run( per_run, generator, run_num, cmdfile_path, parallelism_paradigm ) # Perform experiment level configuration AFTER all runs have been # generated in the experiment, in case the configuration depends on the # generated launch files. engine.ExpConfigurer(self.cmdopts).for_exp(self.pathset.input_root) # Save seeds if not utils.path_exists(self.seeds_fpath) or not self.preserve_seeds: if utils.path_exists(self.seeds_fpath): self.seeds_fpath.unlink() with self.seeds_fpath.open("ab") as f: utils.pickle_dump(self.random_seeds, f)
def _create_exp_run( self, run_exp_def: definition.BaseExpDef, cmds_generator, run_num: int, cmdfile_path: pathlib.Path, parallelism_paradigm: str, ) -> None: run_output_dir = f"{self.template_stem}_run{run_num}_output" # If the project defined per-run configuration, apply # it. Otherwise, the already-applied configuration for the engine is # all that will be used per-run. per_run = pm.module_load_tiered( project=self.cmdopts["project"], path="generators.experiment" ) run_output_root = self.pathset.output_root / run_output_dir stem_path = self._get_launch_file_stempath(run_num) # Generate per-run exp changes. per_run.for_single_exp_run( run_exp_def, run_num, run_output_root, stem_path, self.random_seeds[run_num], self.cmdopts, ) # Write out the experimental run launch file run_exp_def.write(stem_path) # Perform any necessary programmatic (i.e., stuff you can do in python # and don't need a shell for) per-run configuration. configurer = engine.ExpConfigurer(self.cmdopts) configurer.for_exp_run(self.pathset.input_root, run_output_root) ext = config.GNU_PARALLEL["cmdfile_ext"] if parallelism_paradigm in ["per-exp", "per-batch"]: # Update commands file with the command for the configured # experimental run. with utils.utf8open(cmdfile_path.with_suffix(ext), "a") as cmdfile: self._update_cmdfile( cmdfile, cmds_generator, parallelism_paradigm, run_num, run_output_root, self._get_launch_file_stempath(run_num), "slave", ) elif parallelism_paradigm == "per-run": # Write new GNU Parallel commands file with the commands for the # experimental run. master_fpath = f"{cmdfile_path}_run{run_num}_master{ext}" slave_fpath = f"{cmdfile_path}_run{run_num}_slave{ext}" self.logger.trace("Updating slave cmdfile %s", slave_fpath) with utils.utf8open(slave_fpath, "w") as cmds_file: self._update_cmdfile( cmds_file, cmds_generator, "per-run", run_num, run_output_root, self._get_launch_file_stempath(run_num), "slave", ) self.logger.trace("Updating master cmdfile %s", master_fpath) with utils.utf8open(master_fpath, "w") as cmdfile: self._update_cmdfile( cmdfile, cmds_generator, "per-run", run_num, run_output_root, self._get_launch_file_stempath(run_num), "master", ) def _get_launch_file_stempath(self, run_num: int) -> pathlib.Path: """File is named as ``<template input file stem>_run<run_num>``.""" leaf = f"{self.template_stem}_run{run_num}" return self.pathset.input_root / leaf def _init_cmdfile(self, paradigm: str) -> pathlib.Path: # Commands file stored in batch input root if paradigm == "per-batch": path = self.pathset.parent / config.GNU_PARALLEL["cmdfile_stem"] # Commands file stored in input root for each experiment. elif paradigm in {"per-exp", "per-run"}: path = self.pathset.input_root / config.GNU_PARALLEL["cmdfile_stem"] else: raise ValueError(f"Bad value for parallelism paradigm: {paradigm}") # Clear out commands file if it exists and is per-batch/per-exp, because # those files are appended to as we generate each experiment. We # don't need to do that for per-run parallelism, because those files are # not. if paradigm == "per-exp" and utils.path_exists( path.with_suffix(config.GNU_PARALLEL["cmdfile_ext"]) ): path.with_suffix(config.GNU_PARALLEL["cmdfile_ext"]).unlink() return path def _update_cmdfile( self, cmdfile, cmds_generator: bindings.IExpRunShellCmdsGenerator, paradigm: str, run_num: int, run_output_root: pathlib.Path, launch_stem_path: pathlib.Path, for_host: str, ) -> None: """Add command to launch a given experimental run to the command file.""" pre_specs = cmds_generator.pre_run_cmds(for_host, launch_stem_path, run_num) assert all( spec.shell for spec in pre_specs ), "All pre-exp commands are run in a shell" pre_cmds = [spec.cmd for spec in pre_specs] self.logger.trace("Pre-experiment cmds: %s", pre_cmds) exec_specs = cmds_generator.exec_run_cmds(for_host, launch_stem_path, run_num) assert all( spec.shell for spec in exec_specs ), "All exec-exp commands are run in a shell" exec_cmds = [spec.cmd for spec in exec_specs] self.logger.trace("Exec-experiment cmds: %s", exec_cmds) post_specs = cmds_generator.post_run_cmds(for_host, run_output_root) assert all( spec.shell for spec in post_specs ), "All post-exp commands are run in a shell" post_cmds = [spec.cmd for spec in post_specs] self.logger.trace("Post-experiment cmds: %s", post_cmds) if len(pre_cmds + exec_cmds + post_cmds) == 0: self.logger.debug("Skipping writing %s cmds file: no cmds", for_host) return # # This is one of those crucial parts of SIERRA where the "magic" # happens. # # If there is 1 cmdfile per experiment, then the pre- and post-exec cmds # need to be prepended and appended to the exec cmds on a per-line # basis, because each line needs to contain the total set of commands to # run each experimental run within the experiment. Each line needs to be # capable of being executed independently of the others, so that e.g., # GNU parallel can process them concurrently if directed to do so. Same # for 1 cmdfile per batch. # # If there is 1 cmdfile per experimental run, then its the same thing, # BUT we need to break the exec cmds over multiple lines in the # cmdfile. All commands in the command file WILL be run in parallel; # this is the paradigm that maps to real hw, and obviously you want all # of your configured agents doing things simultaneously in an # experiment. if paradigm in ["per-exp", "per-batch"]: cmdfile.write(" ".join(pre_cmds + exec_cmds + post_cmds) + "\n") elif paradigm == "per-run": for e in exec_cmds: cmdfile.write(" ".join([*pre_cmds, e, *post_cmds]) + "\n") else: raise ValueError(f"Bad paradigm {paradigm}")
__all__ = ["BatchExpCreator", "BatchExpDefGenerator", "ExpCreator"]