Source code for sierra.core.pipeline.stage3.statistics_calculator

# Copyright 2019 John Harwell, All rights reserved.
#
#  SPDX-License-Identifier: MIT

"""
Classes for generating statistics within and across experiments in a batch.
"""

# Core packages
import re
import multiprocessing as mp
import typing as tp
import queue
import time
import datetime
import logging
import pathlib

# 3rd party packages
import pandas as pd
import psutil

# Project packages
import sierra.core.variables.batch_criteria as bc
from sierra.core import types, utils, stat_kernels, storage, config


class GatherSpec:
    """
    Data class for specifying .csv files to gather from an :term:`Experiment`.
    """

    def __init__(self,
                 exp_name: str,
                 item_stem: str,
                 imagize_csv_stem: tp.Optional[str]):
        self.exp_name = exp_name
        self.item_stem = item_stem
        self.imagize_csv_stem = imagize_csv_stem

    def for_imagizing(self):
        return self.imagize_csv_stem is not None
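# A minimal usage sketch (the experiment/file names below are hypothetical, not
# taken from a real experiment): one spec for a top-level .csv, and one for a
# .csv nested in a directory that will later be imagized.
def _gather_spec_example() -> None:
    plain = GatherSpec(exp_name='exp0',
                       item_stem='block-transport',
                       imagize_csv_stem=None)
    heatmap = GatherSpec(exp_name='exp0',
                         item_stem='swarm-distribution',
                         imagize_csv_stem='swarm-distribution_00012')

    assert not plain.for_imagizing()
    assert heatmap.for_imagizing()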
class BatchExpParallelCalculator:
    """Process :term:`Output .csv` files for each experiment in the batch.

    Experiments are processed in parallel for speed.
    """

    def __init__(self, main_config: dict, cmdopts: types.Cmdopts):
        self.main_config = main_config
        self.cmdopts = cmdopts
        self.logger = logging.getLogger(__name__)
    def __call__(self, criteria: bc.IConcreteBatchCriteria) -> None:
        exp_to_avg = utils.exp_range_calc(self.cmdopts,
                                          self.cmdopts['batch_output_root'],
                                          criteria)

        template_input_leaf = pathlib.Path(self.cmdopts['template_input_file']).stem

        avg_opts = {
            'template_input_leaf': template_input_leaf,
            'df_skip_verify': self.cmdopts['df_skip_verify'],
            'dist_stats': self.cmdopts['dist_stats'],
            'project_imagizing': self.cmdopts['project_imagizing'],
            'processing_mem_limit': self.cmdopts['processing_mem_limit'],
            'storage_medium': self.cmdopts['storage_medium'],
            'df_homogenize': self.cmdopts['df_homogenize']
        }

        if self.cmdopts['processing_serial']:
            n_gatherers = 1
            n_processors = 1
        else:
            # Always need to have at least one of each! If SIERRA is invoked on
            # a machine with 2 or fewer logical cores, the calculation with
            # psutil.cpu_count() would otherwise return 0 gatherers.
            n_gatherers = max(1, int(psutil.cpu_count() * 0.25))
            n_processors = max(1, int(psutil.cpu_count() * 0.75))

        with mp.Pool(processes=n_gatherers + n_processors) as pool:
            self._execute(exp_to_avg, avg_opts, n_gatherers, n_processors, pool)
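    # Worked example of the gatherer/processor split above, assuming a
    # hypothetical machine with 16 logical cores:
    #
    #   psutil.cpu_count()                     -> 16
    #   n_gatherers  = max(1, int(16 * 0.25))  -> 4
    #   n_processors = max(1, int(16 * 0.75))  -> 12
    #
    # On a 2-core machine int(2 * 0.25) == 0, hence the max(1, ...) guard.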
    def _execute(self,
                 exp_to_avg: tp.List[pathlib.Path],
                 avg_opts: types.SimpleDict,
                 n_gatherers: int,
                 n_processors: int,
                 pool) -> None:
        m = mp.Manager()
        gatherq = m.Queue()
        processq = m.Queue()

        for exp in exp_to_avg:
            gatherq.put(exp)

        # Start some threads gathering .csvs first to get things rolling.
        self.logger.debug("Starting %d gatherers, method=%s",
                          n_gatherers,
                          mp.get_start_method())
        gathered = [pool.apply_async(BatchExpParallelCalculator._gather_worker,
                                     (gatherq,
                                      processq,
                                      self.main_config,
                                      avg_opts)) for i in range(0, n_gatherers)]

        self.logger.debug("Starting %d processors, method=%s",
                          n_processors,
                          mp.get_start_method())
        processed = [pool.apply_async(BatchExpParallelCalculator._process_worker,
                                      (processq,
                                       self.main_config,
                                       self.cmdopts['batch_stat_root'],
                                       avg_opts)) for i in range(0, n_processors)]

        # To capture the otherwise silent crashes when something goes wrong in
        # worker threads. Any assertions will show and any exceptions will be
        # re-raised.
        self.logger.debug("Waiting for workers to finish")
        for g in gathered:
            g.get()

        for p in processed:
            p.get()

        pool.close()
        pool.join()
        self.logger.debug("All threads finished")
    @staticmethod
    def _gather_worker(gatherq: mp.Queue,
                       processq: mp.Queue,
                       main_config: types.YAMLDict,
                       avg_opts: tp.Dict[str, str]) -> None:
        gatherer = ExpCSVGatherer(main_config, avg_opts, processq)

        # Wait for 3 seconds after the queue is empty before bailing, at the
        # start. If that is not long enough then exponentially increase from
        # there until you find how long it takes to get the first item in the
        # queue, and use that as the appropriate timeout (plus a little
        # margin).
        timeout = 3
        got_item = False
        n_tries = 0

        while n_tries < 2:
            try:
                exp_output_root = gatherq.get(True, timeout)
                gatherer(exp_output_root)
                gatherq.task_done()
                got_item = True
            except queue.Empty:
                if got_item:
                    break

                timeout *= 2
                n_tries += 1
    @staticmethod
    def _process_worker(processq: mp.Queue,
                        main_config: types.YAMLDict,
                        batch_stat_root: pathlib.Path,
                        avg_opts: tp.Dict[str, str]) -> None:
        calculator = ExpStatisticsCalculator(main_config,
                                             avg_opts,
                                             batch_stat_root)

        # Wait for 3 seconds after the queue is empty before bailing, at the
        # start. If that is not long enough then exponentially increase from
        # there until you find how long it takes to get the first item in the
        # queue, and use that as the appropriate timeout (plus a little
        # margin).
        timeout = 3
        got_item = False
        n_tries = 0

        while n_tries < 2:
            try:
                item = processq.get(True, timeout)

                key = list(item.keys())[0]
                calculator(key, item[key])
                processq.task_done()
                got_item = True
            except queue.Empty:
                if got_item:
                    break

                timeout *= 2
                n_tries += 1
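# A minimal standalone sketch (not part of the SIERRA API) of the
# drain-with-backoff pattern shared by _gather_worker() and _process_worker()
# above: block on the queue with a timeout that doubles after each empty read,
# stop once the queue runs dry after at least one item has been handled, and
# give up after two progressively longer empty waits if nothing ever arrives.
def _drain_with_backoff_example(q: queue.Queue,
                                handle: tp.Callable[[tp.Any], None]) -> None:
    timeout = 3
    got_item = False
    n_tries = 0

    while n_tries < 2:
        try:
            item = q.get(True, timeout)
            handle(item)
            q.task_done()
            got_item = True
        except queue.Empty:
            if got_item:
                break

            timeout *= 2
            n_tries += 1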
class ExpCSVGatherer:
    """Gather all :term:`Output .csv` files from all runs within an experiment.

    "Gathering" in this context means creating a dictionary mapping which .csv
    came from where, so that statistics can be generated both across and within
    experiments in the batch.
    """

    def __init__(self,
                 main_config: types.YAMLDict,
                 gather_opts: dict,
                 processq: mp.Queue) -> None:
        self.processq = processq
        self.gather_opts = gather_opts

        # Will get the main name and extension of the config file (without the
        # full absolute path).
        self.template_input_fname = self.gather_opts['template_input_leaf']

        self.main_config = main_config

        self.run_metrics_leaf = main_config['sierra']['run']['run_metrics_leaf']
        self.videos_leaf = 'videos'
        self.project_imagize = gather_opts['project_imagizing']

        self.logger = logging.getLogger(__name__)
    def __call__(self, exp_output_root: pathlib.Path) -> None:
        """Process the CSV files found in the output save path."""
        if not self.gather_opts['df_skip_verify']:
            self._verify_exp_outputs(exp_output_root)

        self.logger.info('Processing .csvs: %s...', exp_output_root.name)

        pattern = "{}_{}_output".format(re.escape(self.gather_opts['template_input_leaf']),
                                        r'\d+')

        runs = list(exp_output_root.iterdir())
        assert all(re.match(pattern, r.name) for r in runs), \
            f"Extra files/not all dirs in '{exp_output_root}' are exp runs"

        # Maps (unique .csv stem, optional parent dir) to the averaged dataframe
        to_gather = self._calc_gather_items(runs[0], exp_output_root.name)

        for item in to_gather:
            self._wait_for_memory()
            gathered = self._gather_item_from_sims(exp_output_root, item, runs)

            # Put gathered .csv list in the process queue
            self.processq.put(gathered)

        self.logger.debug("Enqueued %s items from %s for processing",
                          len(to_gather),
                          exp_output_root.name)
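    # For example, if the template input file stem is 'template', run
    # directories are expected to look like 'template_0_output',
    # 'template_1_output', and so on. These names are illustrative; the actual
    # leaf comes from the template input file passed on the cmdline.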
    def _calc_gather_items(self,
                           run_output_root: pathlib.Path,
                           exp_name: str) -> tp.List[GatherSpec]:
        to_gather = []

        sim_output_root = run_output_root / self.run_metrics_leaf

        # The metrics folder should contain nothing but .csv files and
        # directories. For all directories it contains, they each should
        # contain nothing but .csv files (these are for video rendering later).
        for item in sim_output_root.iterdir():
            csv_stem = item.stem

            if item.is_file():
                to_gather.append(GatherSpec(exp_name=exp_name,
                                            item_stem=csv_stem,
                                            imagize_csv_stem=None))
            else:
                # This takes FOREVER, so only do it if we absolutely need to
                if not self.project_imagize:
                    continue

                for csv_fname in item.iterdir():
                    to_gather.append(GatherSpec(exp_name=exp_name,
                                                item_stem=csv_stem,
                                                imagize_csv_stem=csv_fname.stem))

        return to_gather
    def _gather_item_from_sims(self,
                               exp_output_root: pathlib.Path,
                               item: GatherSpec,
                               runs: tp.List[pathlib.Path]) -> tp.Dict[GatherSpec,
                                                                       tp.List[pd.DataFrame]]:
        gathered = {}  # type: tp.Dict[GatherSpec, tp.List[pd.DataFrame]]

        for run in runs:
            sim_output_root = run / self.run_metrics_leaf

            if item.for_imagizing():
                item_path = sim_output_root / item.item_stem / \
                    (item.imagize_csv_stem + config.kStorageExt['csv'])
            else:
                item_path = sim_output_root / \
                    (item.item_stem + config.kStorageExt['csv'])

            reader = storage.DataFrameReader(self.gather_opts['storage_medium'])
            df = reader(item_path, index_col=False)

            if df.dtypes[0] == 'object':
                df[df.columns[0]] = df[df.columns[0]].apply(lambda x: float(x))

            if item not in gathered:
                gathered[item] = []

            gathered[item].append(df)

        return gathered
    def _wait_for_memory(self) -> None:
        while True:
            mem = psutil.virtual_memory()
            avail = mem.available / mem.total
            free_percent = avail * 100
            free_limit = 100 - self.gather_opts['processing_mem_limit']

            if free_percent >= free_limit:
                return

            self.logger.info("Waiting for memory: avail=%s,min=%s",
                             free_percent,
                             free_limit)
            time.sleep(1)
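    # Worked example: with a hypothetical processing_mem_limit of 90,
    # free_limit = 100 - 90 = 10, so gathering proceeds only while at least
    # 10% of system memory is available; otherwise we poll once per second.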
    def _verify_exp_outputs(self, exp_output_root: pathlib.Path) -> None:
        """
        Verify the integrity of all runs in an experiment.

        Specifically:

        - All runs produced all CSV files.

        - All runs' CSV files with the same name have the same # rows and
          columns.

        - No CSV files contain NaNs.
        """
        experiments = list(exp_output_root.iterdir())

        self.logger.info('Verifying results in %s...', str(exp_output_root))

        start = time.time()

        for exp1 in experiments:
            csv_root1 = exp1 / self.run_metrics_leaf

            for exp2 in experiments:
                csv_root2 = exp2 / self.run_metrics_leaf

                if not csv_root2.is_dir():
                    continue

                self._verify_exp_outputs_pairwise(csv_root1, csv_root2)

        elapsed = int(time.time() - start)
        sec = datetime.timedelta(seconds=elapsed)
        self.logger.info("Done verifying results in %s: %s",
                         exp_output_root,
                         sec)
    def _verify_exp_outputs_pairwise(self,
                                     csv_root1: pathlib.Path,
                                     csv_root2: pathlib.Path) -> None:
        for csv in csv_root1.iterdir():
            path1 = csv
            path2 = csv_root2 / csv.name

            # .csvs for rendering that we don't verify (for now...)
            if path1.is_dir() or path2.is_dir():
                self.logger.debug("Not verifying '%s': contains rendering data",
                                  str(path1))
                continue

            assert (utils.path_exists(path1) and utils.path_exists(path2)), \
                f"Either {path1} or {path2} does not exist"

            # Verify both dataframes have the same # of columns, and that the
            # column sets are identical.
            reader = storage.DataFrameReader(self.gather_opts['storage_medium'])
            df1 = reader(path1)
            df2 = reader(path2)

            assert (len(df1.columns) == len(df2.columns)), \
                (f"Dataframes from {path1} and {path2} do not have "
                 "the same # columns")
            assert (sorted(df1.columns) == sorted(df2.columns)), \
                f"Columns from {path1} and {path2} not identical"

            # Verify the length of all columns in both dataframes is the same.
            for c1 in df1.columns:
                assert all(len(df1[c1]) == len(df1[c2]) for c2 in df1.columns), \
                    f"Not all columns from {path1} have same length"
                assert all(len(df1[c1]) == len(df2[c2]) for c2 in df1.columns), \
                    (f"Not all columns from {path1} and {path2} have "
                     "the same length")
class ExpStatisticsCalculator:
    """Generate statistics from output files for all runs within an experiment.

    .. IMPORTANT:: You *CANNOT* use logging ANYWHERE while processing .csv
       files. Why? I *think* because of a bug in the logging module itself. If
       you get unlucky enough to spawn the process which enters the __call__()
       method in this class while another logging statement is in progress (and
       is therefore holding an internal logging module lock), then the
       underlying fork() call will copy the lock in the acquired state. Then,
       when this class goes to try to log something, it deadlocks with itself.

       You also can't just create loggers with unique names, as this seems to
       be something like the GIL, but for the logging module. Sometimes python
       sucks.
    """

    def __init__(self,
                 main_config: types.YAMLDict,
                 avg_opts: dict,
                 batch_stat_root: pathlib.Path) -> None:
        self.avg_opts = avg_opts

        # Will get the main name and extension of the config file (without the
        # full absolute path).
        self.template_input_fname = self.avg_opts['template_input_leaf']

        self.main_config = main_config
        self.batch_stat_root = batch_stat_root

        self.intra_perf_csv = main_config['sierra']['perf']['intra_perf_csv']
        self.intra_perf_col = main_config['sierra']['perf']['intra_perf_col']
    def __call__(self,
                 gather_spec: GatherSpec,
                 gathered_dfs: tp.List[pd.DataFrame]) -> None:
        csv_concat = pd.concat(gathered_dfs)

        exp_stat_root = self.batch_stat_root / gather_spec.exp_name
        utils.dir_create_checked(exp_stat_root, exist_ok=True)

        # Create directory for averaged .csv files for imagizing later.
        if gather_spec.for_imagizing():
            utils.dir_create_checked(exp_stat_root / gather_spec.item_stem,
                                     exist_ok=True)

        by_row_index = csv_concat.groupby(csv_concat.index)

        dfs = {}

        if self.avg_opts['dist_stats'] in ['none', 'all']:
            dfs.update(stat_kernels.mean.from_groupby(by_row_index))

        if self.avg_opts['dist_stats'] in ['conf95', 'all']:
            dfs.update(stat_kernels.conf95.from_groupby(by_row_index))

        if self.avg_opts['dist_stats'] in ['bw', 'all']:
            dfs.update(stat_kernels.bw.from_groupby(by_row_index))

        for ext in dfs:
            opath = exp_stat_root / gather_spec.item_stem

            if gather_spec.for_imagizing():
                opath /= (gather_spec.imagize_csv_stem + ext)
            else:
                opath = opath.with_suffix(ext)

            df = utils.df_fill(dfs[ext], self.avg_opts['df_homogenize'])
            writer = storage.DataFrameWriter(self.avg_opts['storage_medium'])
            writer(df, opath, index=False)
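# A minimal sketch of the row-wise averaging scheme used in __call__() above:
# concatenating one dataframe per run and grouping by row index means row i of
# the result summarizes row i across all runs. The data values are made up.
def _mean_across_runs_example() -> pd.DataFrame:
    run1 = pd.DataFrame({'t': [0, 1], 'perf': [1.0, 2.0]})
    run2 = pd.DataFrame({'t': [0, 1], 'perf': [3.0, 4.0]})

    csv_concat = pd.concat([run1, run2])
    by_row_index = csv_concat.groupby(csv_concat.index)

    return by_row_index.mean()  # 'perf' column becomes [2.0, 3.0]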
__api__ = [
    'GatherSpec',
    'BatchExpParallelCalculator',
    'ExpCSVGatherer',
    'ExpStatisticsCalculator'
]