#
# Copyright 2025 John Harwell, All rights reserved.
#
# SPDX-License-Identifier: MIT
#
"""Functionality for runner different types of models.
See :ref:`plugins/proc/modelrunner` for usage documentation.
"""
# Core packages
import pathlib
import time
import datetime
import typing as tp
import logging
# 3rd party packages
import yaml
# Project packages
from sierra.core import config, utils, types, batchroot, storage, exproot
from sierra.core.variables import batch_criteria as bc
from sierra.core import plugin as pm
from sierra.core.models import interface
_logger = logging.getLogger(__name__)
[docs]
def proc_batch_exp(
main_config: types.YAMLDict,
cmdopts: types.Cmdopts,
pathset: batchroot.PathSet,
criteria: bc.XVarBatchCriteria,
) -> None:
"""
Run all intra- and inter-exp models.
"""
models = _load_models(main_config, cmdopts, "intra")
_logger.info("Running %d intra-experiment models...", len(models))
start = time.time()
_run_intra_exp(cmdopts, pathset, models, criteria)
elapsed = int(time.time() - start)
sec = datetime.timedelta(seconds=elapsed)
_logger.info("Intra-experiment models finished in %s", str(sec))
models = _load_models(main_config, cmdopts, "inter")
_logger.info("Running %d inter-experiment models...", len(models))
start = time.time()
_run_inter_exp(cmdopts, pathset, models, criteria)
elapsed = int(time.time() - start)
sec = datetime.timedelta(seconds=elapsed)
_logger.info("Inter-experiment models finished in %s", str(sec))
def _load_models(
main_config: types.YAMLDict, cmdopts: types.Cmdopts, model_type: str
) -> dict[str, tp.Union[dict[str, tp.Any]]]:
project_models = (
pathlib.Path(cmdopts["project_config_root"]) / config.PROJECT_YAML.models
)
loaded = {}
_logger.info("Loading %s-exp models for project %s", model_type, cmdopts["project"])
models_config = yaml.load(utils.utf8open(project_models), yaml.FullLoader)
# This is ALL model plugins found by SIERRA.
loaded_model_plugins = [
p
for p in pm.pipeline.loaded_plugins()
if pm.pipeline.loaded_plugins()[p]["type"] == "model"
]
_logger.debug(
"%d loaded model plugins on SIERRA_PLUGIN_PATH", len(loaded_model_plugins)
)
for plugin_name in loaded_model_plugins:
# This is all available models for a specific plugin
module = pm.module_load(plugin_name)
available_models = module.sierra_models(model_type)
_logger.debug(
"Loaded model plugin %s has %d %s-exp models: %s",
plugin_name,
len(available_models) if available_models else 0,
model_type,
available_models,
)
if models_config.get(f"{model_type}-exp"):
for conf in models_config[f"{model_type}-exp"]:
# Class name of the model is the last part of the dot-separated
# path, and plays no part in module lookup in the python
# interpreter.
model_name = conf["name"].split(".")[-1]
# The part of the model name which is relative to the plugin
# directory in which it should be found.
model_module_relpath = ".".join(conf["name"].split(".")[:-1])
model_path = "{}.{}".format(plugin_name, model_module_relpath)
model_fullpath = f"{model_module_relpath}.{model_name}"
if pm.module_exists(model_path):
_logger.debug(
"Loading %s-exp model %s using path %s: YAML configuration match",
model_type,
model_name,
model_module_relpath,
)
model_params = {
k: v for k, v in conf.items() if k not in ["name", "targets"]
}
loaded[model_fullpath] = {
"targets": conf["targets"],
"legend": (
conf["legend"]
if "legend" in conf
else ["Model Prediction" for t in conf["targets"]]
),
"model": getattr(pm.module_load(model_path), model_name)(
model_params
),
}
else:
_logger.debug("All %s-exp models disabled: no configuration", model_type)
if len(loaded) > 0:
_logger.info(
"Loaded %s-exp %s models for project %s",
len(loaded),
model_type,
cmdopts["project"],
)
return loaded
def _run_intra_exp(
cmdopts: types.Cmdopts,
pathset: batchroot.PathSet,
to_run: dict[str, dict[str, tp.Any]],
criteria: bc.XVarBatchCriteria,
) -> None:
"""
Run all enabled intra-experiment models for all experiments in a batch.
"""
exp_dirnames = criteria.gen_exp_names()
exp_to_run = utils.exp_range_calc(
cmdopts["exp_range"], pathset.output_root, exp_dirnames
)
for exp in exp_to_run:
exp_index = exp_dirnames.index(exp.name)
exproots = exproot.PathSet(pathset, exp.name, exp_dirnames[0])
utils.dir_create_checked(exproots.model_root, exist_ok=True)
for blob in to_run.values():
_run_intra_single_in_exp(criteria, cmdopts, exproots, exp_index, blob)
def _run_intra_single_in_exp(
criteria: bc.XVarBatchCriteria,
cmdopts: types.Cmdopts,
pathset: exproot.PathSet,
exp_index: int,
blob: dict[str, tp.Union[interface.IIntraExpModel1D, list[str]]],
) -> None:
model = blob["model"]
targets = blob["targets"]
if not model.should_run(criteria, cmdopts, exp_index):
_logger.debug(
"Skip running intra-experiment model from '%s' for exp%s",
str(model),
exp_index,
)
return
# Run the model
_logger.debug(
"Run intra-experiment model %s for %s",
str(model),
pathset.input_root.name,
)
dfs = model.run(criteria, exp_index, cmdopts, pathset)
legend = blob["legend"]
for df, target in zip(dfs, targets):
path_stem = pathset.model_root / target
# Write model legend file so the generated graph can find it
idx = dfs.index(df)
with utils.utf8open(
path_stem.with_suffix(config.MODELS_EXT["legend"]), "w"
) as f:
f.write(legend[idx])
# Write model .csv file
storage.df_write(
df,
path_stem.with_suffix(config.MODELS_EXT["model"]),
"storage.csv",
)
def _run_inter_exp(
cmdopts: types.Cmdopts,
pathset: batchroot.PathSet,
to_run: dict[str, tp.Any],
criteria: bc.XVarBatchCriteria,
) -> None:
utils.dir_create_checked(pathset.model_interexp_root, exist_ok=True)
for blob in to_run.values():
model = blob["model"]
legend = blob["legend"]
targets = blob["targets"]
if not model.should_run(criteria, cmdopts):
_logger.debug("Skip running inter-experiment model '%s'", str(model))
continue
# Run the model
_logger.debug("Run inter-experiment model '%s'", str(model))
dfs = model.run(criteria, cmdopts, pathset)
for df, csv_stem in zip(dfs, targets):
path_stem = pathset.model_interexp_root / csv_stem
utils.dir_create_checked(path_stem.parent, exist_ok=True)
# Write model .csv file
storage.df_write(
df,
path_stem.with_suffix(config.MODELS_EXT["model"]),
"storage.csv",
)
idx = dfs.index(df)
with utils.utf8open(
path_stem.with_suffix(config.MODELS_EXT["legend"]), "w"
) as f:
f.write(legend[idx])
__all__ = ["proc_batch_exp"]