Source code for sierra.core.variables.batch_criteria

# Copyright 2019 John Harwell, All rights reserved.
#
#  SPDX-License-Identifier: MIT
"""
Base classes used to define :term:`Batch Experiments <Batch Experiment>`.
"""
# Core packages
import typing as tp
import logging
import argparse
import copy
import pathlib
import itertools

# 3rd party packages

# Project packages
from sierra.core.variables import base_variable
from sierra.core import utils
from sierra.core.experiment import definition
import sierra.core.plugin as pm
from sierra.core import types, config
from sierra.core.graphs import bcbridge


[docs] class IQueryableBatchCriteria: """Mixin interface for criteria which can be queried during stage {1,2}. Used to extract additional information needed for configuring some :term:`Engines <Engine>` and execution environments. """
[docs] def n_agents(self, exp_num: int) -> int: """ Return the # of agents used for a given :term:`Experiment`. """ raise NotImplementedError
[docs] class BaseBatchCriteria(base_variable.IBaseVariable): """Defines experiments via lists of sets of changes to make to an expdef. Attributes: cli_arg: Unparsed batch criteria string from command line. main_config: Parsed dictionary of main YAML configuration. batch_input_root: Absolute path to the directory where batch experiment directories should be created. """ def __init__( self, cli_arg: str, main_config: types.YAMLDict, batch_input_root: pathlib.Path ) -> None: # 2025-09-21 [JRH]: The "name" of the batch criteria is just whatever is # passed on the cmdline. self.name = cli_arg self.main_config = main_config self.batch_input_root = batch_input_root self.cat_str = cli_arg.split(".", maxsplit=1)[0] self.def_str = ".".join(cli_arg.split(".")[1:]) self.logger = logging.getLogger(__name__) # Stub out IBaseVariable because all concrete batch criteria only implement # a subset of them.
[docs] def gen_attr_changelist(self) -> list[definition.AttrChangeSet]: return []
[docs] def gen_tag_rmlist(self) -> list[definition.ElementRmList]: return []
[docs] def gen_element_addlist(self) -> list[definition.ElementAddList]: return []
[docs] def gen_files(self) -> None: pass
def cardinality(self) -> int: return -1 def gen_exp_names(self) -> list[str]: raise NotImplementedError def computable_exp_scenario_name(self) -> bool: return False
[docs] def arena_dims(self, cmdopts: types.Cmdopts) -> list[utils.ArenaExtent]: """Get the arena dimensions used for each experiment in the batch. Not applicable to all criteria. Must be implemented on a per-engine basis, as different engines have different means of computing the size of the arena. """ module = pm.pipeline.get_plugin_module(cmdopts["engine"]) assert hasattr( module, "arena_dims_from_criteria" ), f"Engine plugin {module.__name__} does not implement arena_dims_from_criteria()" return module.arena_dims_from_criteria(self)
def n_exp(self) -> int: from sierra.core.experiment import spec # noqa: PLC0415 scaffold_spec = spec.scaffold_spec_factory(tp.cast(XVarBatchCriteria, self)) return scaffold_spec.n_exps def pickle_exp_defs(self, cmdopts: types.Cmdopts) -> None: from sierra.core.experiment import spec # noqa: PLC0415 scaffold_spec = spec.scaffold_spec_factory(tp.cast(XVarBatchCriteria, self)) for exp in range(0, scaffold_spec.n_exps): exp_dirname = self.gen_exp_names()[exp] # Pickling of batch criteria experiment definitions is the FIRST set # of changes to be pickled--all other changes come after. We append # to the pickle file by default, which allows any number of # additional sets of changes to be written, BUT that can also lead # to errors if stage 1 is run multiple times before stage 4. So, we # DELETE the pickle file for each experiment here to make stage 1 # idempotent. pkl_path = self.batch_input_root / exp_dirname / config.PICKLE_LEAF exp_defi = scaffold_spec.mods[exp] if not scaffold_spec.is_compound: exp_defi.pickle(pkl_path, delete=True) else: exp_defi[0].pickle(pkl_path, delete=True) exp_defi[1].pickle(pkl_path, delete=False)
[docs] def scaffold_exps( self, batch_def: definition.BaseExpDef, cmdopts: types.Cmdopts ) -> None: """Scaffold a batch experiment. Takes the raw template input file and apply expdef modifications from the batch criteria for all experiments, and save the result in each experiment's input directory. """ from sierra.core.experiment import spec # noqa: PLC0415 scaffold_spec = spec.scaffold_spec_factory( tp.cast(XVarBatchCriteria, self), log=True ) for i in range(0, scaffold_spec.n_exps): modsi = scaffold_spec.mods[i] expi_def = copy.deepcopy(batch_def) self._scaffold_expi(expi_def, modsi, scaffold_spec.is_compound, i, cmdopts) n_exp_dirs = len(list(self.batch_input_root.iterdir())) if scaffold_spec.n_exps != n_exp_dirs: msg1 = ( f"Size of batch experiment ({scaffold_spec.n_exps}) != " f"# exp dirs ({n_exp_dirs}): possibly caused by:" ) msg2 = ( f"(1) Changing bc w/o changing the generation root " f"({self.batch_input_root})" ) msg3 = ( f"(2) Sharing {self.batch_input_root} between different " f"batch criteria" ) self.logger.fatal(msg1) self.logger.fatal(msg2) self.logger.fatal(msg3) raise RuntimeError("Batch experiment size/# exp dir mismatch")
def _scaffold_expi( self, expi_def: definition.BaseExpDef, modsi, is_compound: bool, i: int, cmdopts: types.Cmdopts, ) -> None: exp_dirname = self.gen_exp_names()[i] exp_input_root = self.batch_input_root / exp_dirname utils.dir_create_checked(exp_input_root, exist_ok=cmdopts["exp_overwrite"]) if not is_compound: self.logger.debug( ("Applying %s expdef mods from '%s' for exp%s in %s"), len(modsi), self.name, i, exp_dirname, ) for mod in modsi: if isinstance(mod, definition.AttrChange): expi_def.attr_change(mod.path, mod.attr, mod.value) elif isinstance(mod, definition.ElementAdd): assert ( mod.path is not None ), "Cannot add root {mode.tag} during scaffolding" expi_def.element_add(mod.path, mod.tag, mod.attr, mod.allow_dup) elif isinstance(mod, definition.ElementRm): expi_def.element_remove(mod.path, mod.tag) else: self.logger.debug( ("Applying %s expdef modifications from '%s' for exp%s in %s"), len(modsi[0]) + len(modsi[1]), self.name, i, exp_dirname, ) # Mods are a tuple for compound specs: adds, changes. We do adds # first, in case some insane person wants to use the second batch # criteria to modify something they just added. for add in modsi[0]: expi_def.element_add(add.path, add.tag, add.attr, add.allow_dup) for chg in modsi[1]: expi_def.attr_change(chg.path, chg.attr, chg.value) # This will be the "template" input file used to generate the input # files for each experimental run in the experiment fmt = pm.pipeline.get_plugin_module(cmdopts["expdef"]) wr_config = definition.WriterConfig( [ { "src_parent": None, "src_tag": fmt.root_querypath(), "opath_leaf": None, "new_children": None, "new_children_parent": None, } ] ) expi_def.write_config_set(wr_config) opath = utils.exp_template_path(cmdopts, self.batch_input_root, exp_dirname) expi_def.write(opath)
[docs] class UnivarBatchCriteria(BaseBatchCriteria): """ Base class for a univariate batch criteria. """ def cardinality(self) -> int: return 1 def gen_exp_names(self) -> list[str]: return [f"c1-exp{i}" for i in range(0, self.n_exp())]
[docs] def populations( self, cmdopts: types.Cmdopts, exp_names: tp.Optional[list[str]] = None ) -> list[int]: """ Calculate system sizes used the batch experiment, sorted. Arguments: cmdopts: Dictionary of parsed command line options. exp_names: If is not `None`, then these directories will be used to calculate the system sizes, rather than the results of ``gen_exp_names()``. """ sizes = [] names = exp_names if exp_names is not None else self.gen_exp_names() module1 = pm.pipeline.get_plugin_module(cmdopts["engine"]) module2 = pm.pipeline.get_plugin_module(cmdopts["expdef"]) for d in names: path = self.batch_input_root / d / config.PICKLE_LEAF exp_def = module2.unpickle(path) sizes.append( module1.population_size_from_pickle(exp_def, self.main_config, cmdopts) ) return sizes
[docs] class XVarBatchCriteria( BaseBatchCriteria, bcbridge.IGraphable, IQueryableBatchCriteria ): """ N-dimensional multiple :class:`sierra.core.variables.batch_criteria.UnivarBatchCriteria`. .. versionchanged:: 1.2.20 Batch criteria can be compound: one criteria can create and the other modify expdef elements to create an experiment definition. """ def __init__(self, criterias: list[BaseBatchCriteria]) -> None: BaseBatchCriteria.__init__( self, "+".join([c.name for c in criterias]), criterias[0].main_config, criterias[0].batch_input_root, ) self.criterias = criterias def cardinality(self) -> int: return len(self.criterias) def computable_exp_scenario_name(self) -> bool: return any(c.computable_exp_scenario_name() for c in self.criterias)
[docs] def gen_attr_changelist(self) -> list[definition.AttrChangeSet]: changes = [c.gen_attr_changelist() for c in self.criterias] # Flatten each list of sets into a single list of items flattened_lists = [] for list_of_sets in changes: flattened_list = list(list_of_sets) flattened_lists.append(flattened_list) # Use itertools.product to get all combinations result = [] for combination in itertools.product(*flattened_lists): combined = definition.AttrChangeSet() # Add all changes from each AttrChangeSet in the combination for change_set in combination: for change in change_set: combined.add(change) result.append(combined) return result
[docs] def gen_element_addlist(self) -> list[definition.ElementAddList]: adds = [c.gen_element_addlist() for c in self.criterias] # Create combinations and combine ElementAddList objects result = [] for combo in itertools.product(*adds): combined = definition.ElementAddList() # Add all ElementAdd objects from each ElementAddList in the combo for elem_add_list in combo: for elem_add in elem_add_list: combined.append(elem_add) result.append(combined) return result
[docs] def gen_tag_rmlist(self) -> list[definition.ElementRmList]: rms = [c.gen_tag_rmlist() for c in self.criterias] # Create combinations and combine ElementRmList objects result = [] for combo in itertools.product(*rms): combined = definition.ElementRmList() # Add all ElementRm objects from each ElementRmList in the combo for elem_rm_list in combo: for elem_rm in elem_rm_list: combined.append(elem_rm) result.append(combined) return result
[docs] def gen_exp_names(self) -> list[str]: """ Generate a SORTED list of strings for all experiment names. These will be used as directory LEAF names, and don't include the parents. Basically, this is a flattened list of permutations of all ``gen_exp_names()`` for each batch criteria. """ # Collect all criteria lists with their prefixes criteria_lists = [] for i, criteria in enumerate(self.criterias, 1): prefixed_names = [f"c{i}-exp{j}" for j in range(0, criteria.n_exp())] criteria_lists.append(prefixed_names) # Generate all combinations using itertools.product return [ "+".join(combination) for combination in itertools.product(*criteria_lists) ]
[docs] def populations(self, cmdopts: types.Cmdopts) -> list: """Generate a N-D array of system sizes used the batch experiment. Sizes are in the same order as the directories returned from ``gen_exp_names()`` for each criteria along each axis. """ names = self.gen_exp_names() criteria_dims = [] criteria_counts = [] for criteria in self.criterias: exp_names = criteria.gen_exp_names() n_chgs = len(criteria.gen_attr_changelist()) n_adds = len(criteria.gen_element_addlist()) criteria_dims.append(len(exp_names)) criteria_counts.append(n_chgs + n_adds) # Create multi-dimensional nested list initialized with zeros def create_nested_list(dimensions: list[int]) -> list: if len(dimensions) == 1: return [0] * dimensions[0] return [create_nested_list(dimensions[1:]) for _ in range(dimensions[0])] sizes = create_nested_list(criteria_dims) # Get plugin modules module1 = pm.pipeline.get_plugin_module(cmdopts["engine"]) module2 = pm.pipeline.get_plugin_module(cmdopts["expdef"]) # Calculate total combinations for index conversion total_combinations = 1 for count in criteria_counts: total_combinations *= count for d in names: pkl_path = self.batch_input_root / d / config.PICKLE_LEAF exp_def = module2.unpickle(pkl_path) # Convert linear index to multi-dimensional indices index = names.index(d) indices = [] remaining_index = index for i in range(len(criteria_counts)): # Calculate stride for this dimension stride = 1 for j in range(i + 1, len(criteria_counts)): stride *= criteria_counts[j] # Calculate index for this dimension dim_index = remaining_index // stride indices.append(dim_index) remaining_index = remaining_index % stride # Set the population size at the calculated indices current_level = sizes for _, idx in enumerate(indices[:-1]): current_level = current_level[idx] current_level[indices[-1]] = module1.population_size_from_pickle( exp_def, self.main_config, cmdopts ) return sizes
[docs] def exp_scenario_name(self, exp_num: int) -> str: """Given the experiment number, compute a parsable scenario name. It is necessary to query this function after generating the changelist in order to create generator classes for each experiment in the batch with the correct name and definition in some cases. Can only be called if constant density is one of the sub-criteria. """ for criteria in self.criterias: if hasattr(criteria, "exp_scenario_name"): return criteria.exp_scenario_name( int(exp_num / len(criteria.gen_attr_changelist())) ) raise RuntimeError( "Batch criteria does not define 'exp_scenario_name()' required for constant density scenarios" )
[docs] def graph_info( self, cmdopts: types.Cmdopts, batch_output_root: tp.Optional[pathlib.Path] = None, exp_names: tp.Optional[list[str]] = None, ) -> bcbridge.GraphInfo: info = bcbridge.GraphInfo( cmdopts, batch_output_root, self.gen_exp_names(), ) # 2025-07-08 [JRH]: Eventually, this will be replaced with axes # selection, but for now, limiting to bivariate is the simpler way to # go. assert ( len(self.criterias) <= 2 ), "Only {univar,bivar} batch criteria graph generation currently supported" exp_names = self.gen_exp_names() if self.cardinality() == 1: info1 = self.criterias[0].graph_info( cmdopts, exp_names=exp_names, batch_output_root=batch_output_root ) info.xticks = info1.xticks info.xlabel = info1.xlabel info.xticklabels = info1.xticklabels elif self.cardinality() == 2: c1_xnames = [f"c1-exp{i}" for i in range(0, self.criterias[0].n_exp())] xnames = [d for d in self.gen_exp_names() if any(x in d for x in c1_xnames)] c2_ynames = [f"c2-exp{i}" for i in range(0, self.criterias[1].n_exp())] ynames = [d for d in self.gen_exp_names() if any(y in d for y in c2_ynames)] info1 = self.criterias[0].graph_info( cmdopts, exp_names=xnames, batch_output_root=batch_output_root ) info2 = self.criterias[1].graph_info( cmdopts, exp_names=ynames, batch_output_root=batch_output_root ) info.xticks = info1.xticks info.xticklabels = info1.xticklabels info.yticks = info2.xticks info.xlabel = info1.xlabel info.ylabel = info2.xlabel info.yticklabels = info2.xticklabels return info
def set_batch_input_root(self, root: pathlib.Path) -> None: self.batch_input_root = root for criteria in self.criterias: criteria.batch_input_root = root
[docs] def n_agents(self, exp_num: int) -> int: # Calculate dimensions and counts for each criteria criteria_counts = [] for criteria in self.criterias: n_chgs = len(criteria.gen_attr_changelist()) n_adds = len(criteria.gen_element_addlist()) criteria_counts.append(n_chgs + n_adds) # Convert linear experiment number to multi-dimensional indices indices = [] remaining_exp_num = exp_num for i in range(len(criteria_counts)): # Calculate stride for this dimension stride = 1 for j in range(i + 1, len(criteria_counts)): stride *= criteria_counts[j] # Calculate index for this dimension dim_index = remaining_exp_num // stride indices.append(dim_index) remaining_exp_num = remaining_exp_num % stride # Find the first criteria that has an n_agents method and use it for i, criteria in enumerate(self.criterias): if hasattr(criteria, "n_agents"): return criteria.n_agents(indices[i]) # If no criteria has n_agents method, raise an error raise AttributeError("No criteria has an 'n_agents' method")
def univar_factory( main_config: types.YAMLDict, cmdopts: types.Cmdopts, batch_input_root: pathlib.Path, cli_arg: str, scenario, ) -> BaseBatchCriteria: """ Construct a univariate batch criteria object from a single cmdline argument. """ category = cli_arg.split(".", maxsplit=1)[0] path = f"variables.{category}" module = pm.bc_load(cmdopts, category) bcfactory = module.factory if 5 in cmdopts["pipeline"]: ret = bcfactory( cli_arg, main_config, cmdopts, batch_input_root, scenario=scenario ) else: ret = bcfactory(cli_arg, main_config, cmdopts, batch_input_root) logging.info("Create univariate batch criteria %s from %s", ret.name, path) return ret def factory( main_config: types.YAMLDict, cmdopts: types.Cmdopts, batch_input_root: pathlib.Path, args: argparse.Namespace, scenario: tp.Optional[str] = None, ) -> XVarBatchCriteria: """ Construct a multivariate batch criteria object from cmdline input. """ criterias = [ univar_factory(main_config, cmdopts, batch_input_root, arg, scenario) for arg in args.batch_criteria ] # Project hook bc = pm.module_load_tiered( project=cmdopts["project"], path="variables.batch_criteria" ) ret = bc.XVarBatchCriteria(criterias) logging.info( "Created %s-D batch criteria from %s", len(criterias), ",".join([c.name for c in criterias]), ) return ret __all__ = [ "BaseBatchCriteria", "IQueryableBatchCriteria", "UnivarBatchCriteria", "XVarBatchCriteria", ]