Source code for sierra.plugins.proc.decompress.plugin

# Copyright 2025 John Harwell, All rights reserved.
#
#  SPDX-License-Identifier: MIT
"""
Plugin for decompressing experiment data. Currently only works with .tar.gz files.
"""

# Core packages
import multiprocessing as mp
import typing as tp
import logging
import pathlib

# 3rd party packages
import tarfile

# Project packages
import sierra.core.variables.batch_criteria as bc
from sierra.core import types, utils, batchroot

_logger = logging.getLogger(__name__)


def proc_batch_exp(
    main_config: types.YAMLDict,
    cmdopts: types.Cmdopts,
    pathset: batchroot.PathSet,
    criteria: bc.XVarBatchCriteria,
) -> None:
    """
    Decompress data for each :term:`Experiment` in the :term:`Batch Experiment`.

    Every ``.tar.gz`` file found under the output root of each selected
    experiment becomes one task for a pool of worker processes.  Ideally this
    is done in parallel across experiments, but this can be changed to serial
    if memory on the SIERRA host machine is limited via
    ``--processing-parallelism``.

    Arguments:
        main_config: Not read by this function.

        cmdopts: Parsed cmdline options; ``exp_range`` and
                 ``processing_parallelism`` are read.

        pathset: Batch experiment paths; only ``output_root`` is used.

        criteria: Batch criteria, used to generate the experiment names.
    """
    selected_exps = utils.exp_range_calc(
        cmdopts["exp_range"], pathset.output_root, criteria.gen_exp_names()
    )
    n_workers = cmdopts["processing_parallelism"]

    # One (experiment root, relative tarball path) tuple per tarball, flattened
    # across all selected experiments.
    tasks = [
        task
        for exp in selected_exps
        for task in _build_tasklist_for_exp(pathset.output_root / exp.name)
    ]

    _logger.debug("Starting %s workers, method=%s", n_workers, mp.get_start_method())

    # maxtasksperchild=1 makes the pool replace a worker process after it
    # completes one unit of work.
    with mp.Pool(processes=n_workers, maxtasksperchild=1) as pool:
        pending = pool.starmap_async(_worker, tasks)
        _logger.debug("Waiting for workers to finish")

        # Blocks until every task is done; re-raises any worker exception.
        pending.get()

        pool.close()
        pool.join()

    _logger.debug("All workers finished")
def _build_tasklist_for_exp(
    exp_output_root: pathlib.Path,
) -> list[tuple[pathlib.Path, pathlib.Path]]:
    """Collect a decompression task for every tarball in an experiment.

    Tasks are created per-file rather than per-experiment, so that for systems
    with more CPUs than experiments you still get maximum throughput.

    Arguments:
        exp_output_root: Output root for the :term:`Experiment`; searched
                         recursively for ``*.tar.gz`` files.

    Returns:
        A list of ``(exp_output_root, relative tarball path)`` tuples, one per
        tarball found.
    """
    tarballs = exp_output_root.rglob("*.tar.gz")
    return [
        (exp_output_root, tarball.relative_to(exp_output_root))
        for tarball in tarballs
    ]


def _worker(exp_output_root: pathlib.Path, relpath: pathlib.Path) -> None:
    """Decompress a single tarball from a single experiment.

    The archive is extracted into the directory which contains it.  If the
    tarball does not exist, a warning is logged and nothing is extracted.

    Arguments:
        exp_output_root: Output root for the :term:`Experiment`.

        relpath: Path to the actual tarball relative to the experiment root.
    """
    tarball = exp_output_root / relpath

    if tarball.exists():
        with tarfile.open(tarball, "r:gz") as archive:
            # The "data" filter is tarfile's built-in extraction filter
            # intended for plain data archives.
            archive.extractall(filter="data", path=str(tarball.parent))
        return

    _logger.warning("Cannot decompress: %s does not exist", tarball)


__all__ = ["proc_batch_exp"]