Source code for sierra.plugins.proc.imagize.plugin

# Copyright 2019 John Harwell, All rights reserved.
#
#  SPDX-License-Identifier: MIT

"""Classes for creating image files from ``.mean`` files for experiments.

See :ref:`plugins/proc/imagize` for usage documentation.

"""

# Core packages
import multiprocessing as mp
import typing as tp
import logging
import pathlib

# 3rd party packages
import yaml

# Project packages
import sierra.core.variables.batch_criteria as bc
from sierra.core import types, utils, batchroot, graphs, config
from sierra.core.pipeline.stage3 import gather
from sierra.plugins.proc.statistics import plugin as statistics
import sierra.core.plugin as pm

_logger = logging.getLogger(__name__)


[docs] def proc_batch_exp( main_config: types.YAMLDict, cmdopts: types.Cmdopts, pathset: batchroot.PathSet, criteria: bc.XVarBatchCriteria, ) -> None: """ Generate images for each :term:`Experiment` in the :term:`Batch Experiment`. Ideally this is done in parallel across experiments, but this can be changed to serial if memory on the SIERRA host machine is limited via ``--processing-parallelism``. """ config_path = pathlib.Path(cmdopts["project_config_root"]) / pathlib.Path( config.PROJECT_YAML.graphs ) if utils.path_exists(config_path): _logger.info("Loading imagizing config for project=%s", cmdopts["project"]) imagize_config = yaml.load(utils.utf8open(config_path), yaml.FullLoader)[ "imagize" ] else: _logger.warning("%s does not exist--cannot imagize", config_path) return if not cmdopts["imagize_no_stats"]: statistics.proc_batch_exp( main_config, cmdopts, pathset, criteria, ImagizeInputGatherer ) exp_to_imagize = utils.exp_range_calc( cmdopts["exp_range"], pathset.output_root, criteria.gen_exp_names() ) parallelism = cmdopts["processing_parallelism"] tasks = [] for exp in exp_to_imagize: exp_stat_root = pathset.stat_root / exp.name exp_imagize_root = pathset.imagize_root / exp.name exp_output_root = pathset.output_root / exp.name tasks.extend( _build_tasklist_for_exp( exp_stat_root, exp_imagize_root, exp_output_root, imagize_config, cmdopts["storage"], ) ) # 2025-06-06 [JRH]: This works around what is apparently a nasty memory leak # in hv caused by the hv.save() function for which the usual methods of # clearing memory/figures do not work. The maxtasksperchild argument + # chunksize kills all the child threads periodically, which fixes the leak # problem, for now. _logger.debug("Starting %s workers, method=%s", parallelism, mp.get_start_method()) with mp.Pool(processes=parallelism, maxtasksperchild=1) as pool: processed = pool.starmap_async(_worker, tasks, chunksize=10) _logger.debug("Waiting for workers to finish") processed.get() _logger.debug("All workers finished")
def _build_tasklist_for_exp( exp_stat_root: pathlib.Path, exp_imagize_root: pathlib.Path, exp_output_root: pathlib.Path, imagize_config: types.YAMLDict, storage: str, ) -> list[tuple[types.YAMLDict, dict]]: """Add all files from experiment to multiprocessing queue for processing. Enqueueing for processing is done at the file-level rather than per-experiment, so that for systems with more CPUs than experiments you still get maximum throughput. """ res = [] # There are two types of graphs currently supported: # # - Heatmaps, built from statistical data. # # - Network graphs, built from per-run data. We COULD average GraphML files, # but doing so in a general way is tricky at best, and brittle at worst. for graph in imagize_config: if dict(graph)["type"] == "heatmap": res.extend( _build_task_for_heatmap( graph, imagize_config, storage, exp_stat_root, exp_imagize_root ) ) elif dict(graph)["type"] == "network": res.extend( _build_task_for_network( graph, imagize_config, storage, exp_output_root, exp_imagize_root, ) ) else: raise ValueError("Only {heatmap,network} output graphs supported.") return res def _build_task_for_heatmap( graph: types.YAMLDict, imagize_config: types.YAMLDict, storage: str, exp_stat_root: pathlib.Path, exp_imagize_root: pathlib.Path, ) -> list[tuple[types.YAMLDict, dict]]: candidate = exp_stat_root / dict(graph)["src_stem"] res = [] # type: list[tuple[types.YAMLDict, dict]] if not candidate.is_dir(): _logger.debug( "Configured imagize source <batch stat root>/%s does not exist", (candidate.relative_to(exp_stat_root)), ) return res imagize_output_root = exp_imagize_root / candidate.relative_to(exp_stat_root) utils.dir_create_checked(imagize_output_root, exist_ok=True) for fpath in candidate.iterdir(): assert ( fpath.is_file() ), f"Imagize directory {candidate} must only contain files!" res.append( ( imagize_config, { "input_path": fpath, "graph_stem": candidate.relative_to(exp_stat_root), "imagize_output_root": imagize_output_root, "batch_root": exp_stat_root.parent.parent, "storage": storage, }, ) ) return res def _build_task_for_network( graph: types.YAMLDict, imagize_config: types.YAMLDict, storage: str, exp_output_root: pathlib.Path, exp_imagize_root: pathlib.Path, ) -> list[tuple[types.YAMLDict, dict]]: res = [] for run_output_root in exp_output_root.iterdir(): candidate = run_output_root / dict(graph)["src_stem"] if not candidate.is_dir(): _logger.debug( "Configured imagize source <output root>/%s does not exist", (candidate.relative_to(exp_output_root)), ) continue imagize_output_root = exp_imagize_root / candidate.relative_to(exp_output_root) utils.dir_create_checked(imagize_output_root, exist_ok=True) for fpath in candidate.iterdir(): assert ( fpath.is_file() ), f"Imagize directory {candidate} must only contain files!" res.append( ( imagize_config, { "input_path": fpath, "graph_stem": dict(graph)["src_stem"], "imagize_output_root": imagize_output_root, "batch_root": exp_output_root.parent.parent, "storage": storage, }, ) ) return res def _worker(imagize_config: types.YAMLDict, imagize_opts: dict) -> None: _proc_single_exp(imagize_config, imagize_opts) def _proc_single_exp(imagize_config: types.YAMLDict, imagize_opts: dict) -> None: """Create images from the averaged ``.mean`` files from a single experiment. If no ``.mean`` files suitable for averaging are found, nothing is done. See :ref:`plugins/proc/imagize` for per-engine descriptions of what "suitable" means. Arguments: imagize_config: Parsed YAML configuration for heatmaps. imagize_opts: Dictionary of imagizing options. """ match = None for graph in imagize_config: if dict(graph)["src_stem"] == str(imagize_opts["graph_stem"]): match = graph if match is not None: graph_pathset = graphs.PathSet( input_root=imagize_opts["input_path"].parent, output_root=imagize_opts["imagize_output_root"], model_root=None, batchroot=imagize_opts["batch_root"], ) # All input paths are of the form <dir>/<dir>_<NUMBER>.{extension} if dict(graph)["type"] == "heatmap": graphs.heatmap( pathset=graph_pathset, input_stem=imagize_opts["input_path"].stem, output_stem=imagize_opts["input_path"].stem, title=dict(match)["title"], medium=imagize_opts["storage"], xlabel="X", ylabel="Y", colnames=( match.get("x", "x"), match.get("y", "y"), match.get("z", "z"), ), backend="matplotlib", ) elif dict(graph)["type"] == "network": graphs.network( pathset=graph_pathset, layout=graph.get("layout", "spring"), input_stem=imagize_opts["input_path"].stem, output_stem=imagize_opts["input_path"].stem, title=dict(match)["title"], medium=imagize_opts["storage"], node_color_attr=graph.get("node_color_attr", None), node_size_attr=graph.get("node_size_attr", None), edge_color_attr=graph.get("edge_color_attr", None), edge_weight_attr=graph.get("edge_weight_attr", None), edge_label_attr=graph.get("edge_label_attr", None), backend="matplotlib", ) else: _logger.warning( "No match for graph with src_stem='%s' found in configuration", imagize_opts["graph_stem"], ) class ImagizeInputGatherer(gather.BaseGatherer): """Gather :term:`Raw Output Data` files from all runs for imagizing. The configured output directory for each run is searched recursively for directories containing files to gather. To be eligible for gathering and later processing, files must: - Be in a directory with the same name as the file, sans extension. - Be non-empty - Have a suffix which supported by the selected ``--storage`` plugin. Recursive nesting of files *within* a directory containing files to imagize is not supported--why would you do this anyway? """ def __init__( self, main_config: types.YAMLDict, gather_opts: types.SimpleDict, processq: mp.Queue, ) -> None: super().__init__(main_config, gather_opts, processq) self.logger = logging.getLogger(__name__) self.config_path = ( pathlib.Path(gather_opts["project_config_root"]) / config.PROJECT_YAML.graphs ) self.imagize_config = yaml.load( utils.utf8open(self.config_path), yaml.FullLoader )["imagize"] def calc_gather_items( self, run_output_root: pathlib.Path, exp_name: str ) -> list[gather.GatherSpec]: to_gather = [] proj_output_root = run_output_root / str(self.run_metrics_leaf) plugin = pm.pipeline.get_plugin_module(self.gather_opts["storage"]) for item in proj_output_root.rglob("*"): if not item.is_dir(): continue for imagizable in item.iterdir(): if ( not any(plugin.supports_input(s) for s in imagizable.suffixes) and imagizable.stat().st_size > 0 ): continue if not any( g["src_stem"] in str(imagizable) for g in self.imagize_config ): continue if item.name in imagizable.name: to_gather.append( gather.GatherSpec( exp_name=exp_name, item_stem_path=imagizable.relative_to(proj_output_root), collate_col=None, ) ) return to_gather __all__ = [ "proc_batch_exp", ]