Source code for sierra.plugins.proc.compress.plugin
# Copyright 2025 John Harwell, All rights reserved.
#
# SPDX-License-Identifier: MIT
"""
Plugin for compressing experiment data. Currently only supports producing .tar.gz archives.
"""
# Core packages
import multiprocessing as mp
import typing as tp
import logging
import pathlib
import shutil
# Core packages (cont.; tarfile is part of the standard library)
import tarfile
# Project packages
import sierra.core.variables.batch_criteria as bc
from sierra.core import types, utils, batchroot
# Module-level logger, used both by proc_batch_exp() and by _worker() (which
# runs inside the multiprocessing pool).
_logger = logging.getLogger(__name__)
[docs]
def proc_batch_exp(
    main_config: types.YAMLDict,
    cmdopts: types.Cmdopts,
    pathset: batchroot.PathSet,
    criteria: bc.XVarBatchCriteria,
) -> None:
    """
    Compress data for each :term:`Experiment` in the :term:`Batch Experiment`.

    Ideally this is done in parallel across experiments, but this can be changed
    to serial if memory on the SIERRA host machine is limited via
    ``--processing-parallelism``.

    Arguments:
        main_config: Main SIERRA configuration; the name of the per-run
            directory to compress is read from
            ``main_config["sierra"]["run"]["run_metrics_leaf"]``.

        cmdopts: Parsed command line options. Reads ``exp_range``,
            ``processing_parallelism`` and ``compress_remove_after``.

        pathset: Paths for the batch; experiments are located under
            ``pathset.output_root``.

        criteria: Batch criteria used to generate the experiment names.
    """
    exp_to_proc = utils.exp_range_calc(
        cmdopts["exp_range"], pathset.output_root, criteria.gen_exp_names()
    )
    parallelism = cmdopts["processing_parallelism"]
    run_metrics_leaf = main_config["sierra"]["run"]["run_metrics_leaf"]

    # Tasks are built per experimental run (not per experiment) so that the
    # pool stays busy even when there are fewer experiments than workers.
    tasks = []
    for exp in exp_to_proc:
        tasks.extend(
            _build_tasklist_for_exp(
                pathset.output_root / exp.name,
                run_metrics_leaf,
                cmdopts["compress_remove_after"],
            )
        )

    # Nothing to do: don't spin up a process pool just to tear it down again.
    if not tasks:
        _logger.debug("No runs to compress")
        return

    _logger.debug("Starting %s workers, method=%s", parallelism, mp.get_start_method())
    # NOTE(review): maxtasksperchild=1 gives every task a fresh process,
    # presumably to return worker memory to the OS between tasks -- confirm.
    with mp.Pool(processes=parallelism, maxtasksperchild=1) as pool:
        _logger.debug("Waiting for workers to finish")
        # Blocks until all tasks are done and re-raises the first exception
        # raised by any worker.
        pool.starmap(_worker, tasks)
        pool.close()
        pool.join()
    _logger.debug("All workers finished")
def _build_tasklist_for_exp(
    exp_output_root: pathlib.Path,
    run_metrics_leaf: tp.Union[str, pathlib.Path],
    remove_after: bool,
) -> list[tuple[pathlib.Path, pathlib.Path, bool]]:
    """Build one compression task for each experimental run in an experiment.

    Enqueueing for processing is done per run rather than per experiment, so
    that systems with more CPUs than experiments still get maximum throughput.

    Arguments:
        exp_output_root: Output root for the :term:`Experiment`; each
            subdirectory of it is treated as an experimental run.

        run_metrics_leaf: Name of the path inside each run to compress.

        remove_after: Passed through unchanged to every task.

    Returns:
        A list of ``(exp_output_root, relpath, remove_after)`` tuples, sorted
        by run name, where ``relpath`` is ``<run>/<run_metrics_leaf>``
        relative to ``exp_output_root``.
    """
    return [
        (
            exp_output_root,
            run.relative_to(exp_output_root) / run_metrics_leaf,
            remove_after,
        )
        for run in sorted(exp_output_root.iterdir())
        # A plain file can never contain run_metrics_leaf; skip it rather than
        # generating a task whose only effect is a "does not exist" warning.
        if run.is_dir()
    ]
def _worker(
    exp_output_root: pathlib.Path, relpath: pathlib.Path, remove_after: bool
) -> None:
    """Compress one path inside an experiment's output root into a tarball.

    The archive is written next to the compressed path as
    ``<exp_output_root>/<relpath>.tar.gz`` and contains a single top-level
    entry named after the last component of ``relpath``. If the path does not
    exist, a warning is logged and nothing is done.

    Arguments:
        exp_output_root: Output root for the :term:`Experiment`.

        relpath: Path to compress, relative to ``exp_output_root`` (e.g.
            ``<run>/<run_metrics_leaf>``).

        remove_after: If ``True``, delete the compressed path once the
            tarball has been written successfully.
    """
    target = exp_output_root / relpath
    if not target.exists():
        _logger.warning("Cannot compress: %s does not exist", target)
        return

    # Append the suffix instead of using with_suffix(), which would *replace*
    # an existing suffix (e.g. "metrics.v1" -> "metrics.tar.gz").
    tarball = target.with_name(target.name + ".tar.gz")
    try:
        with tarfile.open(tarball, "w:gz") as tar:
            tar.add(str(target), arcname=target.name)
    except BaseException:
        # Don't leave a truncated archive behind that looks like a finished one;
        # the source data is left untouched in this case.
        tarball.unlink(missing_ok=True)
        raise

    if remove_after:
        # rmtree() only handles real directories; tarfile archives symlinks
        # as links, so removing just the link/file is the matching cleanup.
        if target.is_dir() and not target.is_symlink():
            shutil.rmtree(target)
        else:
            target.unlink()
# Only the batch-level entry point is public; the task builder and worker
# are private helpers.
__all__ = ["proc_batch_exp"]