Source code for sierra.core.pipeline.stage4.rendering

# Copyright 2019 John Harwell, All rights reserved.
#
#  SPDX-License-Identifier: MIT

"""Classes for rendering frames (images) into videos.

Frames can be:

- Captured by the ``--platform`` during stage 2.

- Generated during stage 3 of SIERRA via imagizing.

- Generated inter-experiment heatmaps from bivariate experiments.

"""

# Core packages
import copy
import logging
import multiprocessing as mp
import pathlib
import queue
import shlex
import shutil
import subprocess
import typing as tp

# 3rd party packages
import psutil

# Project packages
import sierra.core.variables.batch_criteria as bc
from sierra.core import types, config, utils


class ParallelRenderer:
    """Base class for performing the requested rendering in parallel.

    Unless disabled with ``--processing-serial``, in which case rendering is
    done serially (a single worker process).

    Attributes:
        main_config: Main configuration dictionary; handed unchanged to each
                     worker process.

        cmdopts: Parsed cmdline options. ``processing_serial`` is read here
                 to select the amount of parallelism.
    """

    def __init__(self,
                 main_config: types.YAMLDict,
                 cmdopts: types.Cmdopts) -> None:
        self.main_config = main_config
        self.cmdopts = cmdopts

    def do_rendering(self, inputs: tp.List[types.SimpleDict]) -> None:
        """Render each of the specified inputs to a video.

        All specs are placed on a joinable queue which is drained by one or
        more worker processes; this call returns once every queued spec has
        been marked as done.

        Arguments:
            inputs: Rendering specs, one per video to produce. Each is
                    passed as the ``render_opts`` argument of
                    :class:`ExpRenderer`.
        """
        q = mp.JoinableQueue()  # type: mp.JoinableQueue

        for spec in inputs:
            q.put(spec)

        # Render videos in parallel--waaayyyy faster
        if self.cmdopts['processing_serial']:
            parallelism = 1
        else:
            # psutil.cpu_count() returns None if the count cannot be
            # determined; fall back to a single worker rather than crashing
            # in range().
            parallelism = psutil.cpu_count() or 1

        for _ in range(0, parallelism):
            p = mp.Process(target=ParallelRenderer._thread_worker,
                           args=(q, self.main_config))
            p.start()

        q.join()

    @staticmethod
    def _thread_worker(q: mp.Queue, main_config: types.YAMLDict) -> None:
        """Pull rendering specs off the queue and render them until it is empty.

        Arguments:
            q: Queue of rendering specs shared with the parent process.

            main_config: Main configuration dictionary, forwarded to
                         :class:`ExpRenderer`.
        """
        while True:
            # Wait for 3 seconds after the queue is empty before bailing
            try:
                render_opts = q.get(True, 3)
            except queue.Empty:
                break

            try:
                ExpRenderer()(main_config, render_opts)
            except Exception:  # pylint: disable=broad-except
                # A failed render must not kill the worker: remaining specs
                # still need processing, and the parent is blocked in
                # q.join() waiting for every spec to be acknowledged.
                logging.getLogger(__name__).exception(
                    "Rendering failed for %s", render_opts)
            finally:
                # Always acknowledge the item, otherwise q.join() in the
                # parent never returns.
                q.task_done()
class PlatformFramesRenderer(ParallelRenderer):
    """Renders frames (images) captured in each experimental run by a platform.

    One video is rendered per run directory found in each selected
    experiment.
    """

    def __init__(self,
                 main_config: types.YAMLDict,
                 cmdopts: types.Cmdopts) -> None:
        # The base class stores main_config and cmdopts.
        super().__init__(main_config, cmdopts)
        self.logger = logging.getLogger(__name__)

    def __call__(self, criteria: bc.IConcreteBatchCriteria) -> None:
        """Render videos for all runs in the selected experiments.

        Arguments:
            criteria: Batch criteria, passed to ``utils.exp_range_calc()``
                      to select the experiments under
                      ``batch_output_root`` to render.
        """
        exp_to_render = utils.exp_range_calc(self.cmdopts,
                                             self.cmdopts['batch_output_root'],
                                             criteria)

        inputs = []
        for exp in exp_to_render:
            inputs.extend(self._calc_rendering_inputs(exp))

        self.do_rendering(inputs)

    def _calc_rendering_inputs(self,
                               exp: pathlib.Path) -> tp.List[types.SimpleDict]:
        """Build one rendering spec per run directory in the experiment.

        Arguments:
            exp: Path to the experiment's output directory.

        Returns:
            A list of dicts with the ``ofile_name``, ``input_dir``,
            ``output_dir`` and ``ffmpeg_opts`` keys.
        """
        # Render targets are in
        # <batch_output_root>/<exp>/<sim>/<frames_leaf>, for all
        # runs in a given experiment (which can be a lot!).
        output_dir = pathlib.Path(self.cmdopts['batch_video_root'], exp.name)

        # Same for every run, so look it up once.
        platform = self.cmdopts['platform'].split('.')[1]
        frames_leaf = config.kRendering[platform]['frames_leaf']

        inputs = []
        for run in exp.iterdir():
            # Only directories can contain a frames directory; stray files
            # in the experiment root would just produce failing render jobs.
            if not run.is_dir():
                continue

            # iterdir() already yields paths prefixed with ``exp``, so the
            # frames directory is joined onto ``run`` directly; joining onto
            # ``exp / run`` would duplicate the prefix for relative paths.
            inputs.append({
                'ofile_name': run.name + config.kRenderFormat,
                'input_dir': str(run / frames_leaf),
                'output_dir': str(output_dir),
                'ffmpeg_opts': self.cmdopts['render_cmd_opts']
            })

        return inputs
class ProjectFramesRenderer(ParallelRenderer):
    """Render the video for each experimental run in each experiment.

    Frames are read from the per-experiment directories under
    ``batch_imagize_root``; one video is produced per sub-directory found
    there.
    """

    def __init__(self,
                 main_config: types.YAMLDict,
                 cmdopts: types.Cmdopts) -> None:
        super().__init__(main_config, cmdopts)
        self.logger = logging.getLogger(__name__)
        self.cmdopts = cmdopts
        self.main_config = main_config

    def __call__(self, criteria: bc.IConcreteBatchCriteria) -> None:
        """Collect the rendering specs for the selected experiments and run them.

        Arguments:
            criteria: Batch criteria, passed to ``utils.exp_range_calc()``
                      to select the experiments to render.
        """
        experiments = utils.exp_range_calc(self.cmdopts,
                                           self.cmdopts['batch_output_root'],
                                           criteria)

        specs = [spec
                 for exp_path in experiments
                 for spec in self._calc_rendering_inputs(exp_path)]

        self.do_rendering(specs)

    def _calc_rendering_inputs(self,
                               exp: pathlib.Path) -> tp.List[types.SimpleDict]:
        """Build one rendering spec per directory of imagized frames.

        Arguments:
            exp: Path to the experiment; only its name is used to locate the
                 imagize and video directories.

        Returns:
            A list of rendering specs; empty if the experiment has no
            imagize directory.
        """
        imagize_dir = pathlib.Path(self.cmdopts['batch_imagize_root'],
                                   exp.name)
        if not imagize_dir.exists():
            return []

        # Videos for this experiment go in <batch_video_root>/<exp_name>,
        # one per directory found in the experiment's imagize root.
        video_dir = str(pathlib.Path(self.cmdopts['batch_video_root'],
                                     exp.name))

        return [
            copy.deepcopy({
                'input_dir': str(entry),
                'output_dir': video_dir,
                'ofile_name': entry.name + config.kRenderFormat,
                'ffmpeg_opts': self.cmdopts['render_cmd_opts']
            })
            for entry in imagize_dir.iterdir()
            if entry.is_dir()
        ]
class BivarHeatmapRenderer(ParallelRenderer):
    """Render videos from generated inter-experiment heatmaps.

    .. versionadded:: 1.2.20
    """

    def __init__(self,
                 main_config: types.YAMLDict,
                 cmdopts: types.Cmdopts) -> None:
        super().__init__(main_config, cmdopts)
        self.logger = logging.getLogger(__name__)
        self.cmdopts = cmdopts
        self.main_config = main_config

    def __call__(self, criteria: bc.IConcreteBatchCriteria) -> None:
        """Render one video per heatmap directory.

        Arguments:
            criteria: Batch criteria. Not used by this renderer.
        """
        self.do_rendering(self._calc_rendering_inputs())

    def _calc_rendering_inputs(self) -> tp.List[types.SimpleDict]:
        """Build one rendering spec per heatmap directory.

        Directories directly under ``batch_graph_collate_root`` whose name
        contains ``HM-`` are selected.

        Returns:
            A list of dicts with the ``input_dir``, ``output_dir``,
            ``ofile_name`` and ``ffmpeg_opts`` keys.
        """
        collate_root = pathlib.Path(self.cmdopts['batch_graph_collate_root'])
        specs = []

        for entry in collate_root.iterdir():
            # Cheap name test first; only stat entries that look like heatmaps.
            if "HM-" not in entry.name or not entry.is_dir():
                continue

            # Each video goes in <batch_video_root>/<graph name>.
            video_dir = pathlib.Path(self.cmdopts['batch_video_root'],
                                     entry.name)
            spec = {
                'input_dir': str(entry),
                'output_dir': str(video_dir),
                'ofile_name': entry.name + config.kRenderFormat,
                'ffmpeg_opts': self.cmdopts['render_cmd_opts']
            }
            specs.append(copy.deepcopy(spec))

        return specs
class ExpRenderer:
    """Render all images in the input directory to a video via :program:`ffmpeg`.
    """

    def __init__(self) -> None:
        self.logger = logging.getLogger(__name__)
        assert shutil.which('ffmpeg') is not None, "ffmpeg not found"

    def __call__(self,
                 main_config: types.YAMLDict,
                 render_opts: tp.Dict[str, str]) -> None:
        """Run :program:`ffmpeg` on the images in a single input directory.

        Arguments:
            main_config: Main configuration dictionary. Not used here.

            render_opts: Rendering spec with the following keys:

                         - ``input_dir`` - Directory containing the images.

                         - ``output_dir`` - Directory for the video; created
                           if needed.

                         - ``ofile_name`` - File name of the video.

                         - ``ffmpeg_opts`` - Space-separated extra options,
                           inserted verbatim into the shell command.

        A non-zero exit status is not raised as an exception; the command
        and its captured stdout/stderr are logged as errors instead.
        """
        output_dir = pathlib.Path(render_opts['output_dir'])
        self.logger.info("Rendering images in %s...", output_dir.name)

        opts = render_opts['ffmpeg_opts'].split(' ')

        # The glob must reach ffmpeg unexpanded, and both paths may contain
        # spaces or other shell metacharacters, so quote them for the shell.
        ipaths = shlex.quote("{0}/*{1}".format(render_opts['input_dir'],
                                               config.kImageExt))
        opath = shlex.quote(str(output_dir / render_opts['ofile_name']))

        cmd = ["ffmpeg", "-y", "-pattern_type", "glob", "-i", ipaths]
        cmd.extend(opts)
        cmd.append(opath)

        to_run = ' '.join(cmd)
        self.logger.trace('Run cmd: %s', to_run)  # type: ignore

        utils.dir_create_checked(render_opts['output_dir'], exist_ok=True)

        with subprocess.Popen(to_run,
                              shell=True,
                              stderr=subprocess.PIPE,
                              stdout=subprocess.PIPE) as proc:
            # We use communicate(), not wait() to avoid issues with IO buffers
            # becoming full (i.e., you get deadlocks with wait() regularly).
            stdout_raw, stderr_raw = proc.communicate()

        # Only show output if the process failed (i.e., did not return 0)
        if proc.returncode != 0:
            self.logger.error("Cmd '%s' failed!", to_run)

            # ffmpeg output is not guaranteed to be ASCII (e.g., file names
            # in messages); never let decoding errors mask the real failure.
            stdout_str = stdout_raw.decode("utf-8", errors="replace")
            stderr_str = stderr_raw.decode("utf-8", errors="replace")

            self.logger.error(stdout_str)
            self.logger.error(stderr_str)
# Class names this module lists in ``__api__``.
__api__ = [
    'ParallelRenderer',
    'PlatformFramesRenderer',
    'ProjectFramesRenderer',
    'BivarHeatmapRenderer',
    'ExpRenderer',
]