Source code for sierra.plugins.proc.pseudostats.plugin
# Copyright 2025 John Harwell, All rights reserved.
#
# SPDX-License-Identifier: MIT
"""
Plugin for copying non-deterministic experiment data.
"""
# Core packages
import multiprocessing as mp
import logging
import pathlib
import shutil
import os
# 3rd party packages
# Project packages
import sierra.core.variables.batch_criteria as bc
from sierra.core import types, utils, batchroot, config
from sierra.core import plugin as pm
# Module-level logger, named after this module.
_logger = logging.getLogger(name=__name__)
[docs]
def proc_batch_exp(
    main_config: types.YAMLDict,
    cmdopts: types.Cmdopts,
    pathset: batchroot.PathSet,
    criteria: bc.XVarBatchCriteria,
) -> None:
    """
    Transfer run data for each :term:`Experiment` in the :term:`Batch Experiment`.

    One task is built for every entry found directly under each selected
    experiment's output directory (one per :term:`Experimental Run`), and the
    tasks are executed by a process pool. The pool size comes from
    ``--processing-parallelism``; lower it (down to serial processing) if
    memory on the SIERRA host machine is limited.
    """
    selected = utils.exp_range_calc(
        cmdopts["exp_range"], pathset.output_root, criteria.gen_exp_names()
    )
    n_workers = cmdopts["processing_parallelism"]
    metrics_leaf = main_config["sierra"]["run"]["run_metrics_leaf"]

    tasks = []
    for exp in selected:
        exp_stat_root = pathset.stat_root / exp.name
        for run_root in (pathset.output_root / exp.name).iterdir():
            tasks.append(
                (
                    run_root,
                    exp_stat_root,
                    metrics_leaf,
                    cmdopts["storage"],
                    cmdopts["dataop"],
                )
            )

    _logger.debug("Starting %s workers, method=%s", n_workers, mp.get_start_method())
    with mp.Pool(processes=n_workers) as pool:
        pool.starmap(_worker, tasks)

    _logger.debug("All workers finished")
def _worker(
    run_output_root: pathlib.Path,
    exp_stat_root: pathlib.Path,
    run_metrics_leaf: str,
    storage: str,
    dataop: str,
) -> None:
    """Copy or move the data files of one run into the experiment's statistics root.

    Every non-empty regular file found (recursively) under
    ``run_output_root / run_metrics_leaf`` that has at least one suffix
    accepted by the ``storage`` plugin's ``supports_input()`` is placed
    directly in ``exp_stat_root`` (subdirectory structure is flattened). Its
    final suffix is replaced by the extension configured for the ``mean``
    statistic.

    Arguments:
        run_output_root: Output root for the :term:`Experimental Run`.
        exp_stat_root: Path to the statistics root for the :term:`Experiment`.
            ``utils.dir_create_checked()`` is called on it before each
            qualifying file is placed, so it is only created if some file
            qualifies.
        run_metrics_leaf: Relative prefix in the run output root for data.
        storage: Storage medium; selects the plugin whose ``supports_input()``
            decides which file suffixes are eligible.
        dataop: ``"copy"`` to copy files (originals kept) or ``"move"`` to
            move them out of the run output root.

    Raises:
        ValueError: If ``dataop`` is neither ``"copy"`` nor ``"move"``.
    """
    # Previously an unrecognized dataop silently transferred nothing; fail
    # loudly instead so the misconfiguration is visible.
    if dataop not in ("copy", "move"):
        raise ValueError(f"Unsupported dataop '{dataop}': expected 'copy' or 'move'")

    plugin = pm.pipeline.get_plugin_module(storage)
    mean_ext = config.STATS["mean"].exts["mean"]

    for item in (run_output_root / run_metrics_leaf).rglob("*"):
        if (
            item.is_dir()
            or item.stat().st_size == 0
            or not any(plugin.supports_input(s) for s in item.suffixes)
        ):
            continue

        utils.dir_create_checked(exp_stat_root, exist_ok=True)

        # NOTE(review): the destination is keyed only on the file name, so
        # same-named files from different subdirectories -- or from other runs
        # of the same experiment, which other pool workers may be handling
        # concurrently -- map to the same destination and the last writer
        # wins. Presumably this plugin targets single-run experiments; confirm.
        dest = (exp_stat_root / item.name).with_suffix(mean_ext)

        if dataop == "move":
            # Unlike Path.rename(), shutil.move() falls back to copy+unlink
            # when source and destination are on different filesystems.
            shutil.move(item, dest)
        else:
            # shutil.copyfile() copies the full contents (using os.sendfile()
            # internally on Linux). A single bare os.sendfile() call may send
            # fewer bytes than requested and is not available on every
            # platform (e.g. Windows).
            shutil.copyfile(item, dest)
# Public API of this plugin module; everything else is an internal helper.
__all__ = [
    "proc_batch_exp",
]