# Copyright 2019 John Harwell, All rights reserved.
#
# SPDX-License-Identifier: MIT
"""Classes for comparing deliverables within the same scenario.
Supports both univariate and bivariate batch criteria.
"""
# Core packages
import os
import copy
import glob
import re
import typing as tp
import argparse
import logging
import pathlib
# 3rd party packages
import pandas as pd
# Project packages
from sierra.core.graphs.summary_line_graph import SummaryLineGraph
from sierra.core.graphs.stacked_surface_graph import StackedSurfaceGraph
from sierra.core.graphs.heatmap import Heatmap, DualHeatmap
from sierra.core.variables import batch_criteria as bc
import sierra.core.root_dirpath_generator as rdg
from sierra.core import types, utils, config, storage
class UnivarIntraScenarioComparator:
    """Compares a set of controllers within each of a set of scenarios.

    Graph generation is controlled via a config file parsed in
    :class:`~sierra.core.pipeline.stage5.pipeline_stage5.PipelineStage5`.

    Univariate batch criteria only.

    Attributes:

        controllers: List of controller names to compare.

        cc_csv_root: Absolute directory path to the location controller CSV
                     files should be output to.

        cc_graph_root: Absolute directory path to the location the generated
                       graphs should be output to.

        cmdopts: Dictionary of parsed cmdline parameters.

        cli_args: :class:`argparse` object containing the cmdline
                  parameters. Needed for
                  :class:`~sierra.core.variables.batch_criteria.BatchCriteria`
                  generation for each scenario controllers are compared within,
                  as batch criteria is dependent on controller+scenario
                  definition, and needs to be re-generated for each scenario in
                  order to get graph labels/axis ticks to come out right in all
                  cases.

        main_config: Parsed main YAML configuration; handed to the batch
                     criteria factory when criteria are re-generated.

        project_root: ``<sierra_root>/<project>``, built from ``cmdopts``; the
                      per-controller directories are listed beneath it.
    """

    def __init__(self,
                 controllers: tp.List[str],
                 cc_csv_root: pathlib.Path,
                 cc_graph_root: pathlib.Path,
                 cmdopts: types.Cmdopts,
                 cli_args: argparse.Namespace,
                 main_config: types.YAMLDict) -> None:
        self.controllers = controllers
        self.cc_graph_root = cc_graph_root
        self.cc_csv_root = cc_csv_root

        self.cmdopts = cmdopts
        self.cli_args = cli_args
        self.main_config = main_config
        self.project_root = pathlib.Path(self.cmdopts['sierra_root'],
                                         self.cmdopts['project'])
        self.logger = logging.getLogger(__name__)

    def __call__(self,
                 graphs: tp.List[types.YAMLDict],
                 legend: tp.List[str],
                 comp_type: str) -> None:
        """Generate every configured comparison graph.

        Args:
            graphs: Graph specifications; each must provide ``src_stem`` and
                    ``dest_stem`` and may provide ``title``, ``label`` and
                    ``include_exp``.

            legend: Legend entries passed through to the generated graphs.

            comp_type: Not used by the univariate comparator; accepted so that
                       it is called the same way as
                       :class:`BivarIntraScenarioComparator`.
        """
        # Obtain the list of scenarios to use. We can just take the scenario
        # list of the first controllers, because we have already checked that
        # all controllers executed the same set scenarios.
        batch_leaves = os.listdir(self.project_root / self.controllers[0])

        # For each controller comparison graph we are interested in, generate it
        # using data from all scenarios. A private copy of the cmdopts is used
        # because it gets updated with per-controller paths along the way.
        cmdopts = copy.deepcopy(self.cmdopts)

        for graph in graphs:
            for leaf in batch_leaves:
                if not self._leaf_select(leaf):
                    continue

                self.logger.debug("Generating graph %s from scenario '%s'",
                                  graph,
                                  leaf)
                self._compare_in_scenario(cmdopts=cmdopts,
                                          graph=graph,
                                          batch_leaf=leaf,
                                          legend=legend)
                # Only the first matching scenario is compared in.
                break
            else:
                self.logger.warning("Did not find scenario to compare in for criteria %s",
                                    self.cli_args.batch_criteria)

    def _leaf_select(self, candidate: str) -> bool:
        """Determine if a controller can be included in the comparison for a scenario.

        You can only compare controllers within the scenario directly generated
        from the value of ``--batch-criteria``; other scenarios will (probably)
        cause file not found errors.

        Args:
            candidate: Name of a batch experiment directory.

        Returns:
            ``True`` if the leaf re-generated from the candidate's template
            stem + scenario and the ``--batch-criteria`` value is a substring
            of ``candidate``.
        """
        template_stem, scenario, _ = rdg.parse_batch_leaf(candidate)
        leaf = rdg.gen_batch_leaf(criteria=self.cli_args.batch_criteria,
                                  scenario=scenario,
                                  template_stem=template_stem)
        return leaf in candidate

    def _compare_in_scenario(self,
                             cmdopts: types.Cmdopts,
                             graph: types.YAMLDict,
                             batch_leaf: str,
                             legend: tp.List[str]) -> None:
        """Compare all controllers within the specified scenario.

        Gathers one CSV per controller into :attr:`cc_csv_root` and then
        generates a single graph from them.

        Args:
            cmdopts: Working copy of the cmdline options; updated in place with
                     the regenerated paths of each controller.

            graph: Specification of the graph to generate.

            batch_leaf: Name of the batch experiment directory to compare in.

            legend: Legend entries for the generated graph.
        """
        # Stays None if no controller was run in this scenario, in which case
        # there is nothing to graph.
        criteria = None

        for controller in self.controllers:
            dirs = [d for d in os.listdir(self.project_root / controller)
                    if batch_leaf in d]

            if not dirs:
                self.logger.warning("Controller %s was not run on experiment %s",
                                    controller,
                                    batch_leaf)
                continue

            # NOTE(review): os.listdir() order is arbitrary, so with multiple
            # matches which directory is used is filesystem-dependent.
            batch_leaf = dirs[0]
            _, scenario, _ = rdg.parse_batch_leaf(batch_leaf)

            # We need to generate the root directory paths for each batch
            # experiment (which lives inside of the scenario dir), because
            # they are all different. We need generate these paths for EACH
            # controller, because the controller is part of the batch root path.
            paths = rdg.regen_from_exp(sierra_rpath=self.cli_args.sierra_root,
                                       project=self.cli_args.project,
                                       batch_leaf=batch_leaf,
                                       controller=controller)
            cmdopts.update(paths)

            # For each scenario, we have to create the batch criteria for it,
            # because they are all different.
            criteria = bc.factory(self.main_config,
                                  cmdopts,
                                  self.cli_args,
                                  scenario)

            self._gen_csv(batch_leaf=batch_leaf,
                          criteria=criteria,
                          cmdopts=cmdopts,
                          controller=controller,
                          src_stem=graph['src_stem'],
                          dest_stem=graph['dest_stem'],
                          inc_exps=graph.get('include_exp', None))

        if criteria is None:
            self.logger.warning(("No controller was run on experiment %s--"
                                 "skipping graph generation"),
                                batch_leaf)
            return

        self._gen_graph(batch_leaf=batch_leaf,
                        criteria=criteria,
                        cmdopts=cmdopts,
                        dest_stem=graph['dest_stem'],
                        title=graph.get('title', ''),
                        label=graph.get('label', ''),
                        inc_exps=graph.get('include_exp', None),
                        legend=legend)

    def _gen_csv(self,
                 batch_leaf: str,
                 criteria: bc.IConcreteBatchCriteria,
                 cmdopts: types.Cmdopts,
                 controller: str,
                 src_stem: str,
                 dest_stem: str,
                 inc_exps: tp.Optional[str]) -> None:
        """Generate a set of CSV files for use in intra-scenario graph generation.

        1 CSV per controller. Row 0 of the controller's collated statistics is
        appended to the CSVs accumulated under :attr:`cc_csv_root`.

        Args:
            batch_leaf: Name of the batch experiment directory.

            criteria: Batch criteria for the scenario; supplies the number of
                      experiments.

            cmdopts: Cmdline options containing ``batch_stat_collate_root``
                     for ``controller``.

            controller: Name of the controller being gathered (logging only).

            src_stem: Stem of the collated input CSV.

            dest_stem: Stem of the output CSV.

            inc_exps: Optional experiment inclusion specification.
        """
        self.logger.debug("Gathering data for '%s' from %s -> %s",
                          controller, src_stem, dest_stem)
        ipath = pathlib.Path(cmdopts['batch_stat_collate_root'],
                             src_stem + config.kStats['mean'].exts['mean'])

        # Some experiments might not generate the necessary performance measure
        # .csvs for graph generation, which is OK.
        if not utils.path_exists(ipath):
            self.logger.warning("%s missing for controller %s",
                                ipath,
                                controller)
            return

        preparer = StatsPreparer(ipath_stem=cmdopts['batch_stat_collate_root'],
                                 ipath_leaf=src_stem,
                                 opath_stem=self.cc_csv_root,
                                 n_exp=criteria.n_exp())
        opath_leaf = LeafGenerator.from_batch_leaf(batch_leaf, dest_stem, None)
        preparer.across_rows(opath_leaf=opath_leaf, index=0, inc_exps=inc_exps)

    def _gen_graph(self,
                   batch_leaf: str,
                   criteria: bc.IConcreteBatchCriteria,
                   cmdopts: types.Cmdopts,
                   dest_stem: str,
                   title: str,
                   label: str,
                   inc_exps: tp.Optional[str],
                   legend: tp.List[str]) -> None:
        """Generate a graph comparing the specified controllers within a scenario.

        Args:
            batch_leaf: Name of the batch experiment directory.

            criteria: Batch criteria supplying the X axis label and ticks.

            cmdopts: Cmdline options (``dist_stats``, ``plot_log_yscale``).

            dest_stem: Stem of the gathered CSV and of the output image.

            title: Graph title.

            label: Y axis label.

            inc_exps: Optional experiment inclusion specification, applied to
                      the X ticks and their labels.

            legend: Legend entries.
        """
        opath_leaf = LeafGenerator.from_batch_leaf(batch_leaf, dest_stem, None)

        xticks = criteria.graph_xticks(cmdopts)
        xtick_labels = criteria.graph_xticklabels(cmdopts)

        if inc_exps is not None:
            xtick_labels = utils.exp_include_filter(inc_exps,
                                                    xtick_labels,
                                                    criteria.n_exp())
            xticks = utils.exp_include_filter(inc_exps,
                                              xticks,
                                              criteria.n_exp())

        opath = self.cc_graph_root / (opath_leaf + config.kImageExt)

        SummaryLineGraph(stats_root=self.cc_csv_root,
                         input_stem=opath_leaf,
                         output_fpath=opath,
                         stats=cmdopts['dist_stats'],
                         title=title,
                         xlabel=criteria.graph_xlabel(cmdopts),
                         ylabel=label,
                         xtick_labels=xtick_labels,
                         xticks=xticks,
                         logyscale=cmdopts['plot_log_yscale'],
                         large_text=self.cmdopts['plot_large_text'],
                         legend=legend).generate()
class BivarIntraScenarioComparator:
    """Compares a set of controllers within each of a set of scenarios.

    Graph generation is controlled via a config file parsed in
    :class:`~sierra.core.pipeline.stage5.pipeline_stage5.PipelineStage5`.

    Bivariate batch criteria only.

    Attributes:

        controllers: List of controller names to compare.

        cc_csv_root: Absolute directory path to the location controller CSV
                     files should be output to.

        cc_graph_root: Absolute directory path to the location the generated
                       graphs should be output to.

        cmdopts: Dictionary of parsed cmdline parameters.

        cli_args: :class:`argparse` object containing the cmdline
                  parameters. Needed for
                  :class:`~sierra.core.variables.batch_criteria.BatchCriteria`
                  generation for each scenario controllers are compared within,
                  as batch criteria is dependent on controller+scenario
                  definition, and needs to be re-generated for each scenario in
                  order to get graph labels/axis ticks to come out right in all
                  cases.

        main_config: Parsed main YAML configuration; handed to the batch
                     criteria factory when criteria are re-generated.

        project_root: ``<sierra_root>/<project>``, built from ``cmdopts``; the
                      per-controller directories are listed beneath it.
    """

    def __init__(self,
                 controllers: tp.List[str],
                 cc_csv_root: pathlib.Path,
                 cc_graph_root: pathlib.Path,
                 cmdopts: types.Cmdopts,
                 cli_args: argparse.Namespace,
                 main_config: types.YAMLDict) -> None:
        self.controllers = controllers
        self.cc_csv_root = cc_csv_root
        self.cc_graph_root = cc_graph_root
        self.cmdopts = cmdopts
        self.cli_args = cli_args
        self.main_config = main_config
        self.logger = logging.getLogger(__name__)

        self.logger.debug("csv_root=%s", str(self.cc_csv_root))
        self.logger.debug("graph_root=%s", str(self.cc_graph_root))

        self.project_root = pathlib.Path(self.cmdopts['sierra_root'],
                                         self.cmdopts['project'])

    def __call__(self,
                 graphs: tp.List[types.YAMLDict],
                 legend: tp.List[str],
                 comp_type: str) -> None:
        """Generate every configured comparison graph.

        Args:
            graphs: Graph specifications; each must provide ``src_stem`` and
                    ``dest_stem`` and may provide ``title``, ``label``,
                    ``primary_axis`` and ``include_exp``.

            legend: Legend entries passed through to the generated graphs.

            comp_type: Comparison type. ``LNraw`` produces linegraphs, types
                       containing ``HM`` produce heatmaps, and types containing
                       ``SU`` produce stacked surface graphs.
        """
        # Obtain the list of scenarios to use. We can just take the scenario
        # list of the first controllers, because we have already checked that
        # all controllers executed the same set scenarios.
        batch_leaves = os.listdir(self.project_root / self.controllers[0])

        # A private copy of the cmdopts is used because it gets updated with
        # per-controller paths along the way.
        cmdopts = copy.deepcopy(self.cmdopts)

        for graph in graphs:
            for leaf in batch_leaves:
                if not self._leaf_select(leaf):
                    continue

                self.logger.debug("Generating graph %s from scenario '%s'",
                                  graph,
                                  leaf)
                self._compare_in_scenario(cmdopts=cmdopts,
                                          graph=graph,
                                          batch_leaf=leaf,
                                          legend=legend,
                                          comp_type=comp_type)
                # Only the first matching scenario is compared in.
                break
            else:
                self.logger.warning("Did not find scenario to compare in for criteria '%s'",
                                    self.cli_args.batch_criteria)

    def _leaf_select(self, candidate: str) -> bool:
        """Determine if a controller can be included in the comparison for a scenario.

        You can only compare controllers within the scenario directly generated
        from the value of ``--batch-criteria``; other scenarios will (probably)
        cause file not found errors.

        Args:
            candidate: Name of a batch experiment directory.

        Returns:
            ``True`` if the leaf re-generated from the candidate's template
            stem + scenario and the ``--batch-criteria`` value is a substring
            of ``candidate``.
        """
        template_stem, scenario, _ = rdg.parse_batch_leaf(candidate)
        leaf = rdg.gen_batch_leaf(criteria=self.cli_args.batch_criteria,
                                  scenario=scenario,
                                  template_stem=template_stem)
        return leaf in candidate

    @staticmethod
    def _sort_by_index(paths: tp.List[pathlib.Path]) -> tp.List[pathlib.Path]:
        """Order gathered CSV paths by the numeric index before the extension.

        :func:`glob.glob` returns matches in filesystem-dependent order, but
        callers rely on position (first path is the reference controller; path
        order lines up with the legend). Files without a trailing ``_<N>``
        index sort last, by name.
        """
        def sort_key(path: pathlib.Path) -> tp.Tuple[float, str]:
            match = re.search(r'_([0-9]+)\.[^.]*$', path.name)
            index = int(match.group(1)) if match else float('inf')
            return (index, path.name)

        return sorted(paths, key=sort_key)

    def _compare_in_scenario(self,
                             cmdopts: types.Cmdopts,
                             graph: types.YAMLDict,
                             batch_leaf: str,
                             legend: tp.List[str],
                             comp_type: str) -> None:
        """Compare all controllers within the specified scenario.

        Generates CSV files and graphs according to configuration.

        Args:
            cmdopts: Working copy of the cmdline options; updated in place with
                     the regenerated paths of each controller.

            graph: Specification of the graph to generate.

            batch_leaf: Name of the batch experiment directory to compare in.

            legend: Legend entries for the generated graph(s).

            comp_type: Comparison type; selects both how CSVs are gathered and
                       which kind of graph is generated.
        """
        # Stays None if no controller was run in this scenario, in which case
        # there is nothing to graph.
        criteria = None

        for controller in self.controllers:
            dirs = [d for d in os.listdir(self.project_root / controller)
                    if batch_leaf in d]

            if not dirs:
                self.logger.warning("Controller '%s' was not run on scenario '%s'",
                                    controller,
                                    batch_leaf)
                continue

            # NOTE(review): os.listdir() order is arbitrary, so with multiple
            # matches which directory is used is filesystem-dependent.
            batch_leaf = dirs[0]
            _, scenario, _ = rdg.parse_batch_leaf(batch_leaf)

            # We need to generate the root directory paths for each batch
            # experiment (which lives inside of the scenario dir), because
            # they are all different. We need generate these paths for EACH
            # controller, because the controller is part of the batch root path.
            paths = rdg.regen_from_exp(sierra_rpath=self.cli_args.sierra_root,
                                       project=self.cli_args.project,
                                       batch_leaf=batch_leaf,
                                       controller=controller)
            cmdopts.update(paths)

            # For each scenario, we have to create the batch criteria for it,
            # because they are all different.
            criteria = bc.factory(self.main_config,
                                  cmdopts,
                                  self.cli_args,
                                  scenario)

            if comp_type == 'LNraw':
                self._gen_csvs_for_1D(cmdopts=cmdopts,
                                      criteria=criteria,
                                      controller=controller,
                                      batch_leaf=batch_leaf,
                                      src_stem=graph['src_stem'],
                                      dest_stem=graph['dest_stem'],
                                      primary_axis=graph.get('primary_axis', 0),
                                      inc_exps=graph.get('include_exp', None))
            elif 'HM' in comp_type or 'SU' in comp_type:
                self._gen_csvs_for_2D_or_3D(cmdopts=cmdopts,
                                            controller=controller,
                                            batch_leaf=batch_leaf,
                                            src_stem=graph['src_stem'],
                                            dest_stem=graph['dest_stem'])

        if criteria is None:
            self.logger.warning(("No controller was run on scenario '%s'--"
                                 "skipping graph generation"),
                                batch_leaf)
            return

        if comp_type == 'LNraw':
            self._gen_graphs1D(batch_leaf=batch_leaf,
                               criteria=criteria,
                               cmdopts=cmdopts,
                               dest_stem=graph['dest_stem'],
                               title=graph.get('title', ''),
                               label=graph.get('label', ''),
                               primary_axis=graph.get('primary_axis', 0),
                               inc_exps=graph.get('include_exp', None),
                               legend=legend)
        elif 'HM' in comp_type:
            self._gen_graphs2D(batch_leaf=batch_leaf,
                               criteria=criteria,
                               cmdopts=cmdopts,
                               dest_stem=graph['dest_stem'],
                               title=graph.get('title', ''),
                               label=graph.get('label', ''),
                               legend=legend,
                               comp_type=comp_type)
        elif 'SU' in comp_type:
            self._gen_graph3D(batch_leaf=batch_leaf,
                              criteria=criteria,
                              cmdopts=cmdopts,
                              dest_stem=graph['dest_stem'],
                              title=graph.get('title', ''),
                              zlabel=graph.get('label', ''),
                              legend=legend,
                              comp_type=comp_type)

    def _gen_csvs_for_2D_or_3D(self,
                               cmdopts: types.Cmdopts,
                               batch_leaf: str,
                               controller: str,
                               src_stem: str,
                               dest_stem: str) -> None:
        """Generate a set of CSV files for use in intra-scenario graph generation.

        1 CSV per controller, for 2D/3D comparison types only. Because each CSV
        file corresponding to performance measures are 2D arrays, we actually
        just copy and rename the performance measure CSV files for each
        controllers into :attr:`cc_csv_root`.

        :class:`~sierra.core.graphs.stacked_surface_graph.StackedSurfaceGraph`
        expects an ``_[0-9]+.csv`` pattern for each 2D surfaces to graph in
        order to disambiguate which files belong to which controller without
        having the controller name in the filepath (contains dots), so we do
        that here. :class:`~sierra.core.graphs.heatmap.Heatmap` does not require
        that, but for the heatmap set we generate it IS helpful to have an easy
        way to differentiate primary vs. other controllers, so we do it
        unconditionally here to handle both cases.

        Args:
            cmdopts: Cmdline options containing ``batch_stat_collate_root``
                     for ``controller``.

            batch_leaf: Name of the batch experiment directory.

            controller: Name of the controller; its position in
                        :attr:`controllers` becomes the output file index.

            src_stem: Stem of the collated input CSV.

            dest_stem: Stem of the output CSV.
        """
        self.logger.debug("Gathering data for '%s' from %s -> %s",
                          controller, src_stem, dest_stem)

        mean_ext = config.kStats['mean'].exts['mean']
        csv_ipath = pathlib.Path(cmdopts['batch_stat_collate_root'],
                                 src_stem + mean_ext)

        # Some experiments might not generate the necessary performance measure
        # .csvs for graph generation, which is OK.
        if not utils.path_exists(csv_ipath):
            self.logger.warning("%s missing for controller '%s'",
                                csv_ipath,
                                controller)
            return

        df = storage.DataFrameReader('storage.csv')(csv_ipath)

        opath_leaf = LeafGenerator.from_batch_leaf(batch_leaf,
                                                   dest_stem,
                                                   [self.controllers.index(controller)])
        opath_stem = self.cc_csv_root / opath_leaf
        # with_name() rather than with_suffix(): the leaf itself may contain
        # dots, which must be kept.
        opath = opath_stem.with_name(opath_stem.name + mean_ext)

        writer = storage.DataFrameWriter('storage.csv')
        writer(df, opath, index=False)

    def _gen_csvs_for_1D(self,
                         cmdopts: types.Cmdopts,
                         criteria: bc.BivarBatchCriteria,
                         batch_leaf: str,
                         controller: str,
                         src_stem: str,
                         dest_stem: str,
                         primary_axis: int,
                         inc_exps: tp.Optional[str]) -> None:
        """Generate a set of CSV files for use in intra-scenario graph generation.

        Because we are targeting linegraphs, we draw the i-th row/col (as
        configured) from the performance results of each controller .csv, and
        concatenate them into a new .csv file which can be given to
        :class:`~sierra.core.graphs.summary_line_graph.SummaryLineGraph`.

        Args:
            cmdopts: Cmdline options containing ``batch_stat_collate_root``
                     for ``controller`` and ``dist_stats``.

            criteria: Bivariate batch criteria for the scenario.

            batch_leaf: Name of the batch experiment directory.

            controller: Name of the controller being gathered (logging only).

            src_stem: Stem of the collated input CSV.

            dest_stem: Stem of the output CSVs.

            primary_axis: 0 to slice the input by rows (one output CSV per
                          row); anything else to slice by columns.

            inc_exps: Optional experiment inclusion specification.
        """
        self.logger.debug("Gathering data for '%s' from %s -> %s",
                          controller, src_stem, dest_stem)

        collate_root = cmdopts['batch_stat_collate_root']
        csv_ipath = pathlib.Path(collate_root,
                                 src_stem + config.kStats['mean'].exts['mean'])

        # Some experiments might not generate the necessary performance measure
        # .csvs for graph generation, which is OK.
        if not utils.path_exists(csv_ipath):
            self.logger.warning("%s missing for controller '%s'",
                                csv_ipath,
                                controller)
            return

        if cmdopts['dist_stats'] != 'none':
            self.logger.warning(("--dist-stats is not supported with "
                                 "1D CSVs sliced from 2D CSV for linegraph "
                                 "generation: no stats will be included"))

        if primary_axis == 0:
            preparer = StatsPreparer(ipath_stem=collate_root,
                                     ipath_leaf=src_stem,
                                     opath_stem=self.cc_csv_root,
                                     n_exp=criteria.criteria2.n_exp())

            reader = storage.DataFrameReader('storage.csv')
            n_rows = len(reader(csv_ipath).index)

            # One output CSV per row of the 2D input.
            for i in range(n_rows):
                opath_leaf = LeafGenerator.from_batch_leaf(batch_leaf,
                                                           dest_stem,
                                                           [i])
                preparer.across_rows(opath_leaf=opath_leaf,
                                     index=i,
                                     inc_exps=inc_exps)
        else:
            preparer = StatsPreparer(ipath_stem=collate_root,
                                     ipath_leaf=src_stem,
                                     opath_stem=self.cc_csv_root,
                                     n_exp=criteria.criteria1.n_exp())

            exp_dirs = criteria.gen_exp_names(cmdopts)
            xlabels, ylabels = utils.bivar_exp_labels_calc(exp_dirs)
            xlabels = utils.exp_include_filter(inc_exps,
                                               xlabels,
                                               criteria.criteria1.n_exp())

            # One output CSV per column of the 2D input. The position is used
            # rather than looking the label up again, so that repeated labels
            # still map to distinct columns.
            for col_index, _ in enumerate(ylabels):
                opath_leaf = LeafGenerator.from_batch_leaf(batch_leaf,
                                                           dest_stem,
                                                           [col_index])
                preparer.across_cols(opath_leaf=opath_leaf,
                                     col_index=col_index,
                                     all_cols=xlabels,
                                     inc_exps=inc_exps)

    def _gen_graphs1D(self,
                      batch_leaf: str,
                      criteria: bc.BivarBatchCriteria,
                      cmdopts: types.Cmdopts,
                      dest_stem: str,
                      title: str,
                      label: str,
                      primary_axis: int,
                      inc_exps: tp.Optional[str],
                      legend: tp.List[str]) -> None:
        """Generate one linegraph per 1D CSV gathered for the scenario.

        The number of graphs equals the number of gathered mean CSVs found
        under :attr:`cc_csv_root` for ``dest_stem``; graph ``i`` is generated
        from the CSV with index ``i``.

        Args:
            batch_leaf: Name of the batch experiment directory.

            criteria: Bivariate batch criteria supplying axis label and ticks.

            cmdopts: Cmdline options (``batch_model_root``,
                     ``plot_log_yscale``, ``plot_large_text``).

            dest_stem: Stem of the gathered CSVs and of the output images.

            title: Graph title.

            label: Y axis label.

            primary_axis: 0 to take the X axis from the criteria's Y axis
                          ticks/label; anything else to take it from the
                          criteria's X axis.

            inc_exps: Optional experiment inclusion specification.

            legend: Legend entries.
        """
        oleaf = LeafGenerator.from_batch_leaf(batch_leaf, dest_stem, None)
        csv_stem_root = self.cc_csv_root / oleaf
        pattern = str(csv_stem_root) + '*' + config.kStats['mean'].exts['mean']

        # NOTE(review): this regex is searched against the whole path, so any
        # path component containing '_<digits>' satisfies it -- confirm that is
        # intended.
        paths = [f for f in glob.glob(pattern) if re.search('_[0-9]+', f)]
        if not paths:
            return

        # The axis configuration does not depend on which CSV is being graphed,
        # so compute it once.
        #
        # NOTE(review): _gen_csvs_for_1D() uses criteria2.n_exp() when
        # primary_axis == 0, while criteria1.n_exp() is used here (and vice
        # versa) -- confirm which is intended.
        if primary_axis == 0:
            n_exp = criteria.criteria1.n_exp()
            ticks = criteria.graph_yticks(cmdopts)
            tick_labels = criteria.graph_yticklabels(cmdopts)
            xlabel = criteria.graph_ylabel(cmdopts)
        else:
            n_exp = criteria.criteria2.n_exp()
            ticks = criteria.graph_xticks(cmdopts)
            tick_labels = criteria.graph_xticklabels(cmdopts)
            xlabel = criteria.graph_xlabel(cmdopts)

        xticks = utils.exp_include_filter(inc_exps, ticks, n_exp)
        xtick_labels = utils.exp_include_filter(inc_exps, tick_labels, n_exp)

        for i in range(len(paths)):
            opath_leaf = LeafGenerator.from_batch_leaf(batch_leaf,
                                                       dest_stem,
                                                       [i])
            img_opath = self.cc_graph_root / (opath_leaf + config.kImageExt)

            # TODO: Fix no statistics support for these graphs
            SummaryLineGraph(stats_root=self.cc_csv_root,
                             input_stem=opath_leaf,
                             stats='none',
                             output_fpath=img_opath,
                             model_root=cmdopts['batch_model_root'],
                             title=title,
                             xlabel=xlabel,
                             ylabel=label,
                             xticks=xticks,
                             xtick_labels=xtick_labels,
                             legend=legend,
                             logyscale=cmdopts['plot_log_yscale'],
                             large_text=cmdopts['plot_large_text']).generate()

    def _gen_graphs2D(self,
                      batch_leaf: str,
                      criteria: bc.BivarBatchCriteria,
                      cmdopts: types.Cmdopts,
                      dest_stem: str,
                      title: str,
                      label: str,
                      legend: tp.List[str],
                      comp_type: str) -> None:
        """Dispatch heatmap generation according to the comparison type.

        ``HMscale``/``HMdiff`` produce paired heatmaps and ``HMraw`` produces
        dual heatmaps; any other value generates nothing.
        """
        if comp_type in ['HMscale', 'HMdiff']:
            self._gen_paired_heatmaps(batch_leaf,
                                      criteria,
                                      cmdopts,
                                      dest_stem,
                                      title,
                                      label,
                                      comp_type)
        elif comp_type == 'HMraw':
            self._gen_dual_heatmaps(batch_leaf,
                                    criteria,
                                    cmdopts,
                                    dest_stem,
                                    title,
                                    label,
                                    legend,
                                    comp_type)

    def _gen_paired_heatmaps(self,
                             batch_leaf: str,
                             criteria: bc.BivarBatchCriteria,
                             cmdopts: types.Cmdopts,
                             dest_stem: str,
                             title: str,
                             label: str,
                             comp_type: str) -> None:
        """Generate a set of :class:`~sierra.core.graphs.heatmap.Heatmap` graphs.

        Uses a configured controller of primary interest against all other
        controllers (one graph per pairing), after input files have been
        gathered from each controller into :attr:`cc_csv_root`.

        The gathered CSV with the lowest index is the reference; every other
        CSV is divided by it (``HMscale``) or has it subtracted (``HMdiff``).

        Raises:
            ValueError: If ``comp_type`` is neither ``HMscale`` nor ``HMdiff``
                        and there is at least one pairing to generate.
        """
        mean_ext = config.kStats['mean'].exts['mean']
        opath_leaf = LeafGenerator.from_batch_leaf(batch_leaf, dest_stem, None)
        pattern = self.cc_csv_root / (opath_leaf + '*' + mean_ext)

        # Sorted so that index 0 (the primary controller) is the reference
        # regardless of the order the filesystem lists the files in.
        paths = self._sort_by_index([pathlib.Path(f)
                                     for f in glob.glob(str(pattern))
                                     if re.search(r'_[0-9]+\.', f)])

        self.logger.debug("Generating paired heatmaps in %s -> %s",
                          pattern,
                          [str(f.relative_to(self.cc_csv_root)) for f in paths])

        if len(paths) < 2:
            self.logger.warning(("Not enough matches from pattern='%s'--"
                                 "skipping paired heatmap generation"),
                                pattern)
            return

        reader = storage.DataFrameReader('storage.csv')
        writer = storage.DataFrameWriter('storage.csv')
        ref_df = reader(paths[0])

        for i in range(1, len(paths)):
            df = reader(paths[i])

            if comp_type == 'HMscale':
                plot_df = df / ref_df
            elif comp_type == 'HMdiff':
                plot_df = df - ref_df
            else:
                raise ValueError(f"Bad paired heatmap comparison type '{comp_type}'")

            # Have to add something before the .mean to ensure that the diff CSV
            # does not get picked up by the regex above as each controller is
            # treated in turn as the primary.
            leaf = LeafGenerator.from_batch_leaf(batch_leaf,
                                                 dest_stem,
                                                 [0, i]) + '_paired'

            ipath = self.cc_csv_root / (leaf + mean_ext)
            opath = self.cc_graph_root / (leaf + config.kImageExt)

            writer(plot_df, ipath, index=False)

            Heatmap(input_fpath=ipath,
                    output_fpath=opath,
                    title=title,
                    transpose=self.cmdopts['plot_transpose_graphs'],
                    zlabel=self._gen_zaxis_label(label, comp_type),
                    xlabel=criteria.graph_xlabel(cmdopts),
                    ylabel=criteria.graph_ylabel(cmdopts),
                    xtick_labels=criteria.graph_xticklabels(cmdopts),
                    ytick_labels=criteria.graph_yticklabels(cmdopts)).generate()

    def _gen_dual_heatmaps(self,
                           batch_leaf: str,
                           criteria: bc.BivarBatchCriteria,
                           cmdopts: types.Cmdopts,
                           dest_stem: str,
                           title: str,
                           label: str,
                           legend: tp.List[str],
                           comp_type: str) -> None:
        """Generate a set of :class:`~sierra.core.graphs.heatmap.DualHeatmap` graphs.

        Graphs contain all pairings of (primary controller, other), one per
        graph, within the specified scenario after input files have been
        gathered from each controller into :attr:`cc_csv_root`. Only valid if
        the comparison type is ``HMraw``.
        """
        opath_leaf = LeafGenerator.from_batch_leaf(batch_leaf, dest_stem, None)
        opath = self.cc_graph_root / (opath_leaf + config.kImageExt)
        pattern = self.cc_csv_root / (opath_leaf + '*' +
                                      config.kStats['mean'].exts['mean'])

        # Sorted by controller index so the inputs have a deterministic order
        # no matter how the filesystem lists them.
        paths = self._sort_by_index([pathlib.Path(f)
                                     for f in glob.glob(str(pattern))
                                     if re.search('_[0-9]+', f)])

        self.logger.debug("Generating dual heatmaps in %s -> %s",
                          pattern,
                          [str(f.relative_to(self.cc_csv_root)) for f in paths])

        DualHeatmap(ipaths=paths,
                    output_fpath=opath,
                    title=title,
                    large_text=cmdopts['plot_large_text'],
                    zlabel=self._gen_zaxis_label(label, comp_type),
                    xlabel=criteria.graph_xlabel(cmdopts),
                    ylabel=criteria.graph_ylabel(cmdopts),
                    legend=legend,
                    xtick_labels=criteria.graph_xticklabels(cmdopts),
                    ytick_labels=criteria.graph_yticklabels(cmdopts)).generate()

    def _gen_graph3D(self,
                     batch_leaf: str,
                     criteria: bc.BivarBatchCriteria,
                     cmdopts: types.Cmdopts,
                     dest_stem: str,
                     title: str,
                     zlabel: str,
                     legend: tp.List[str],
                     comp_type: str) -> None:
        """Generate a graph comparing the specified controllers within a scenario.

        Graph contains the specified controllers within the specified scenario
        after input files have been gathered from each controllers into
        :attr:`cc_csv_root`.
        """
        opath_leaf = LeafGenerator.from_batch_leaf(batch_leaf, dest_stem, None)
        opath = self.cc_graph_root / (opath_leaf + config.kImageExt)
        pattern = self.cc_csv_root / (opath_leaf + '*' +
                                      config.kStats['mean'].exts['mean'])

        # Sorted by controller index so the inputs have a deterministic order
        # no matter how the filesystem lists them.
        paths = self._sort_by_index([pathlib.Path(f)
                                     for f in glob.glob(str(pattern))
                                     if re.search('_[0-9]+', f)])

        self.logger.debug("Generating stacked surface graphs in %s -> %s",
                          pattern,
                          [str(f.relative_to(self.cc_csv_root)) for f in paths])

        # The criteria's X/Y labels and tick labels are deliberately passed to
        # the opposite axes of the surface graph, as in the original code.
        StackedSurfaceGraph(ipaths=paths,
                            output_fpath=opath,
                            title=title,
                            ylabel=criteria.graph_xlabel(cmdopts),
                            xlabel=criteria.graph_ylabel(cmdopts),
                            zlabel=self._gen_zaxis_label(zlabel, comp_type),
                            xtick_labels=criteria.graph_yticklabels(cmdopts),
                            ytick_labels=criteria.graph_xticklabels(cmdopts),
                            legend=legend,
                            comp_type=comp_type).generate()

    def _gen_zaxis_label(self, label: str, comp_type: str) -> str:
        """If the comparison type is not "raw", put it on the graph as Z axis title.

        Args:
            label: Base Z axis label.

            comp_type: Comparison type.

        Returns:
            ``label`` with `` (Scaled)`` appended if ``comp_type`` contains
            ``scale``, with `` (Difference Comparison)`` appended if it
            contains ``diff``, and unchanged otherwise.
        """
        if 'scale' in comp_type:
            return label + ' (Scaled)'

        if 'diff' in comp_type:
            return label + ' (Difference Comparison)'

        return label
class StatsPreparer:
    """Prepare statistics generated from controllers for graph generation.

    If the batch criteria is univariate, then only :meth:`across_rows` is valid;
    for bivariate batch criteria, either :meth:`across_rows` or
    :meth:`across_cols` is valid, depending on what the primary axis is.

    Attributes:

        ipath_stem: Directory containing the input statistics files.

        ipath_leaf: Name of the input files, without statistic extension.

        opath_stem: Directory the accumulated output files are written to.

        n_exp: Number of experiments; passed to the experiment inclusion
               filter.
    """

    def __init__(self,
                 ipath_stem: pathlib.Path,
                 ipath_leaf: str,
                 opath_stem: pathlib.Path,
                 n_exp: int):
        self.ipath_stem = ipath_stem
        self.ipath_leaf = ipath_leaf
        self.opath_stem = opath_stem
        self.n_exp = n_exp

    @staticmethod
    def _all_stat_exts() -> tp.Dict[str, str]:
        """Merge the mean, conf95 and bw extension tables into a NEW mapping.

        A copy is made first so that the shared table in ``config.kStats`` is
        not modified by the merge.
        """
        exts = dict(config.kStats['mean'].exts)
        exts.update(config.kStats['conf95'].exts)
        exts.update(config.kStats['bw'].exts)
        return exts

    def across_cols(self,
                    opath_leaf: str,
                    all_cols: tp.List[str],
                    col_index: int,
                    inc_exps: tp.Optional[str]) -> None:
        """Prepare statistics in column-major batch criteria.

        For each statistic extension, column ``col_index`` of the input file
        is turned into a row (restricted to the experiments selected by
        ``inc_exps``), appended to whatever has already been accumulated in
        the output file, and written back to the file system. Statistics
        whose input file does not exist are skipped.

        Args:
            opath_leaf: Name of the output files, without statistic extension.

            all_cols: Column names of the accumulated output dataframe.

            col_index: Index of the input column to extract.

            inc_exps: Optional experiment inclusion specification.
        """
        for ext in self._all_stat_exts().values():
            stat_ipath = pathlib.Path(self.ipath_stem, self.ipath_leaf + ext)
            stat_opath = pathlib.Path(self.opath_stem, opath_leaf + ext)

            df = self._accum_df_by_col(stat_ipath,
                                       stat_opath,
                                       all_cols,
                                       col_index,
                                       inc_exps)
            if df is not None:
                writer = storage.DataFrameWriter('storage.csv')
                writer(df, stat_opath, index=False)

    def across_rows(self,
                    opath_leaf: str,
                    index: int,
                    inc_exps: tp.Optional[str]) -> None:
        """Prepare statistics in row-major batch criteria.

        For each statistic extension, row ``index`` of the input file
        (restricted to the columns selected by ``inc_exps``) is appended to
        whatever has already been accumulated in the output file, and written
        back to the file system. Statistics whose input file does not exist
        are skipped.

        Args:
            opath_leaf: Name of the output files, without statistic extension.

            index: Index of the input row to extract.

            inc_exps: Optional experiment inclusion specification.
        """
        for ext in self._all_stat_exts().values():
            stat_ipath = pathlib.Path(self.ipath_stem, self.ipath_leaf + ext)
            stat_opath = pathlib.Path(self.opath_stem, opath_leaf + ext)

            df = self._accum_df_by_row(stat_ipath, stat_opath, index, inc_exps)

            if df is not None:
                writer = storage.DataFrameWriter('storage.csv')
                writer(df, stat_opath, index=False)

    def _accum_df_by_col(self,
                         ipath: pathlib.Path,
                         opath: pathlib.Path,
                         all_cols: tp.List[str],
                         col_index: int,
                         inc_exps: tp.Optional[str]) -> tp.Optional[pd.DataFrame]:
        """Append one input column, as a row, to the accumulated dataframe.

        Returns:
            The accumulated dataframe with the new row appended, or ``None``
            if ``ipath`` does not exist.
        """
        reader = storage.DataFrameReader('storage.csv')

        # Pick up where a previous controller left off, if anywhere.
        cum_df = reader(opath) if utils.path_exists(opath) else None

        if not utils.path_exists(ipath):
            return None

        t = reader(ipath)

        if inc_exps is not None:
            cols_from_index = utils.exp_include_filter(inc_exps,
                                                       list(t.index),
                                                       self.n_exp)
        else:
            cols_from_index = slice(None, None, None)

        if cum_df is None:
            cum_df = pd.DataFrame(columns=all_cols)

        # We need to turn each column of the .csv on the filesystem into a
        # row in the .csv which we want to write out, so we transpose, fix
        # the index, and then set the columns of the new transposed
        # dataframe.
        tp_df = t.transpose()
        tp_df = tp_df.reset_index(drop=True)
        tp_df = tp_df[cols_from_index]
        tp_df.columns = all_cols

        # Series are columns, so we have to transpose before concatenating
        return pd.concat([cum_df, tp_df.loc[col_index, :].to_frame().T])

    def _accum_df_by_row(self,
                         ipath: pathlib.Path,
                         opath: pathlib.Path,
                         index: int,
                         inc_exps: tp.Optional[str]) -> tp.Optional[pd.DataFrame]:
        """Append one input row to the accumulated dataframe.

        Returns:
            The accumulated dataframe with the new row appended, or ``None``
            if ``ipath`` does not exist.
        """
        reader = storage.DataFrameReader('storage.csv')

        # Pick up where a previous controller left off, if anywhere.
        cum_df = reader(opath) if utils.path_exists(opath) else None

        if not utils.path_exists(ipath):
            return None

        t = reader(ipath)

        if inc_exps is not None:
            cols = utils.exp_include_filter(inc_exps,
                                            list(t.columns),
                                            self.n_exp)
        else:
            cols = t.columns

        if cum_df is None:
            cum_df = pd.DataFrame(columns=cols)

        # Series are columns, so we have to transpose before concatenating
        return pd.concat([cum_df, t.loc[index, cols].to_frame().T])
class LeafGenerator:
    """Build the file name stems ("leaves") used for gathered CSVs and graphs.

    All leaves have the form ``<graph_stem>-<name>[_<index>]``.
    """

    @staticmethod
    def from_controller(batch_root: pathlib.Path,
                        graph_stem: str,
                        controllers: tp.List[str],
                        controller: str) -> str:
        """Build a leaf suffixed with the position of a controller.

        Args:
            batch_root: Batch root path; parsed to obtain the name part of the
                        leaf.

            graph_stem: Stem of the graph/CSV.

            controllers: All controllers being compared.

            controller: The controller whose position in ``controllers`` is
                        used as the suffix.

        Raises:
            ValueError: If ``controller`` is not in ``controllers``.
        """
        _, batch_leaf, _ = rdg.parse_batch_leaf(str(batch_root))
        position = controllers.index(controller)
        return graph_stem + "-" + batch_leaf + '_' + str(position)

    @staticmethod
    def from_batch_root(batch_root: pathlib.Path,
                        graph_stem: str,
                        index: tp.Union[int, None]) -> str:
        """Build a leaf from a batch root path, optionally suffixed by an index.

        Args:
            batch_root: Batch root path; parsed to obtain the scenario part of
                        the leaf.

            graph_stem: Stem of the graph/CSV.

            index: Suffix to append as ``_<index>``; nothing is appended if
                   ``None``.
        """
        _, scenario, _ = rdg.parse_batch_leaf(str(batch_root))
        leaf = graph_stem + "-" + scenario

        if index is not None:
            leaf += '_' + str(index)

        return leaf

    @staticmethod
    def from_batch_leaf(batch_leaf: str,
                        graph_stem: str,
                        indices: tp.Union[tp.List[int], None]) -> str:
        """Build a leaf from a batch leaf name, optionally suffixed by indices.

        Args:
            batch_leaf: Name of the batch experiment directory.

            graph_stem: Stem of the graph/CSV.

            indices: Indices to append; they are concatenated without a
                     separator after a single ``_`` (``[0, 3]`` -> ``_03``).
                     Nothing is appended if ``None``.
        """
        leaf = graph_stem + "-" + batch_leaf

        if indices is not None:
            leaf += '_' + ''.join(str(i) for i in indices)

        return leaf
# Names of the two comparator classes this module exposes. Presumably consumed
# by the project's API/documentation tooling -- TODO confirm.
__api__ = ['UnivarIntraScenarioComparator', 'BivarIntraScenarioComparator']