# Copyright 2019 John Harwell, All rights reserved.
#
# SPDX-License-Identifier: MIT
"""
Classes for collating data within a :term:`Batch Experiment`.
Collation is the process of "lifting" data from :term:`Experimental Runs
<Experimental Run>` across all :term:`Experiment` for all experiments in a
:term:`Batch Experiment` into a single file (a reduce operation). This is
needed to correctly calculate summary statistics for performance measures in
stage 3: you can't just run the calculated stddev through the calculations
because comparing curves of stddev is not meaningful.
"""
# Core packages
import multiprocessing as mp
import queue
import logging
import pathlib
# 3rd party packages
import polars as pl
import yaml
# Project packages
import sierra.core.variables.batch_criteria as bc
import sierra.core.plugin as pm
from sierra.core import types, storage, utils, config, batchroot
from sierra.core.pipeline.stage3 import gather
_logger = logging.getLogger(__name__)
[docs]
def proc_batch_exp(
main_config: dict,
cmdopts: types.Cmdopts,
pathset: batchroot.PathSet,
criteria: bc.XVarBatchCriteria,
) -> None:
"""Generate :term:`Collated Output Data` files for each experiment.
:term:`Collated Output Data` files generated from :term:`Raw Output Data`
files across :term:`Experimental Runs <Experimental Run>`. Gathered in
parallel for each experiment for speed, unless disabled with
``--processing-parallelism``.
"""
pool_opts = {}
pool_opts["parallelism"] = cmdopts["processing_parallelism"]
worker_opts = {
"project": cmdopts["project"],
"template_input_leaf": pathlib.Path(cmdopts["expdef_template"]).stem,
"df_verify": cmdopts["df_verify"],
"processing_mem_limit": cmdopts["processing_mem_limit"],
"storage": cmdopts["storage"],
"df_homogenize": cmdopts["df_homogenize"],
"project_config_root": cmdopts["project_config_root"],
}
exp_to_proc = utils.exp_range_calc(
cmdopts["exp_range"], pathset.output_root, criteria.gen_exp_names()
)
with mp.Pool(processes=pool_opts["parallelism"]) as pool:
_execute_for_batch(
main_config, pathset, exp_to_proc, worker_opts, pool_opts, pool
)
def _execute_for_batch(
main_config: types.YAMLDict,
pathset: batchroot.PathSet,
exp_to_proc: list[pathlib.Path],
worker_opts: types.SimpleDict,
pool_opts: types.SimpleDict,
pool,
) -> None:
m = mp.Manager()
gatherq = m.Queue()
processq = m.Queue()
for exp in exp_to_proc:
gatherq.put(exp)
_logger.debug(
"Starting %d gatherers, method=%s",
pool_opts["parallelism"],
mp.get_start_method(),
)
gathered = [
pool.apply_async(_gather_worker, (gatherq, processq, main_config, worker_opts))
for _ in range(0, pool_opts["parallelism"])
]
_logger.debug("Waiting for gathering to finish")
for g in gathered:
g.get()
_logger.debug(
"Starting %d processors, method=%s",
pool_opts["parallelism"],
mp.get_start_method(),
)
processed = [
pool.apply_async(
_process_worker,
(processq, main_config, pathset.stat_interexp_root, worker_opts),
)
for _ in range(0, pool_opts["parallelism"])
]
# To capture the otherwise silent crashes when something goes wrong in
# worker threads. Any assertions will show and any exceptions will be
# re-raised.
for p in processed:
p.get()
pool.close()
pool.join()
_logger.debug("Processing finished")
def _gather_worker(
gatherq: mp.Queue,
processq: mp.Queue,
main_config: types.YAMLDict,
gather_opts: types.SimpleDict,
) -> None:
gatherer = ExpDataGatherer(main_config, gather_opts, processq)
while True:
# Wait for 3 seconds after the queue is empty before bailing
try:
exp_output_root = gatherq.get(True, 3)
gatherer(exp_output_root)
gatherq.task_done()
except queue.Empty:
break
def _process_worker(
processq: mp.Queue,
main_config: types.YAMLDict,
batch_stat_interexp_root: pathlib.Path,
process_opts: types.SimpleDict,
) -> None:
while True:
# Wait for 3 seconds after the queue is empty before bailing
try:
spec = processq.get(True, 3)
_proc_single_exp(main_config, batch_stat_interexp_root, process_opts, spec)
processq.task_done()
except queue.Empty:
break
[docs]
class ExpDataGatherer(gather.BaseGatherer):
"""Gather :term:`Raw Output Data` files across all runs for :term:`Data Collation`.
The configured output directory for each run is searched recursively for
files to gather. To be eligible for gathering and later processing, files
must:
- Be non-empty
- Have a suffix which supported by the selected ``--storage`` plugin.
- Have a name (last part of absolute path, including extension) which
matches a configured :term:`Product` in a YAML file. E.g., a graph
from the :ref:`plugins/prod/graphs` plugin
"""
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self.logger = logging.getLogger(__name__)
def calc_gather_items(
self, run_output_root: pathlib.Path, exp_name: str
) -> list[gather.GatherSpec]:
proj_output_root = run_output_root / str(self.run_metrics_leaf)
plugin = pm.pipeline.get_plugin_module(self.gather_opts["storage"])
if not plugin.supports_output(pl.DataFrame):
raise RuntimeError(
"This plugin can only be used with storage plugins which support pl.DataFrame."
)
config_path = pathlib.Path(
self.gather_opts["project_config_root"], config.PROJECT_YAML.collate
)
try:
collate_config = yaml.load(utils.utf8open(config_path), yaml.FullLoader)
except FileNotFoundError:
self.logger.warning("%s does not exist!", config_path)
collate_config = {}
to_gather = []
for item in proj_output_root.rglob("*"):
# Must be a file (duh)
if not item.is_file():
continue
# Has to be a supported suffix for storage plugin
if (
not any(plugin.supports_input(s) for s in item.suffixes)
or item.stat().st_size == 0
):
continue
# Any number of perf metrics can be configured, so look for a match.
files = collate_config["intra-exp"]
perf_confs = [f for f in files if f["file"] in item.name]
if not perf_confs:
continue
# If we get a file match, then all the columns from that file should
# be added to the set of things to collate.
for conf in perf_confs:
to_gather.extend(
[
gather.GatherSpec(
exp_name=exp_name,
item_stem_path=item.relative_to(proj_output_root),
collate_col=col,
)
for col in conf["cols"]
]
)
return to_gather
def _proc_single_exp(
main_config: types.YAMLDict,
batch_stat_collate_root: pathlib.Path,
process_opts: types.SimpleDict,
spec: gather.ProcessSpec,
) -> None:
"""Collate :term:`Raw Output Data` files together (reduce operation).
:term:`Raw Output Data` files gathered from N :term:`Experimental Runs
<Experimental Run>` are combined together into a single :term:`Batch Summary
Data` file per :term:`Experiment` with 1 column per run.
"""
utils.dir_create_checked(batch_stat_collate_root, exist_ok=True)
collated = {}
key = (spec.gather.item_stem_path, spec.gather.collate_col)
# Build dictionary of columns instead of starting with empty DataFrame
columns_dict = {}
for i, df in enumerate(spec.dfs):
assert (
spec.gather.collate_col in df.columns
), f"{spec.gather.collate_col} not in {df.columns}"
collate_df = df[spec.gather.collate_col]
# Add column to dictionary
columns_dict[spec.exp_run_names[i]] = collate_df
# Create DataFrame from the dictionary of columns
collated[key] = pl.DataFrame(columns_dict)
for k, v in collated.items():
file_path, col = k
df = utils.df_fill(v, process_opts["df_homogenize"])
parent = batch_stat_collate_root / spec.gather.exp_name / file_path.parent
utils.dir_create_checked(parent, exist_ok=True)
# This preserves the directory structure of stuff in the per-run output
# run; if something is in a subdir there, it will show up in a subdir in
# the collated outputs too.
fname = f"{file_path.stem}-{col}" + config.STORAGE_EXT["csv"]
storage.df_write(df, parent / fname, "storage.csv")
__all__ = [
"ExpDataGatherer",
"proc_batch_exp",
]