Source code for sierra.plugins.prod.render.plugin

# Copyright 2019 John Harwell, All rights reserved.
#
#  SPDX-License-Identifier: MIT

"""Classes for rendering frames (images) into videos.

Frames can be:

- Captured by by the ``--engine`` during stage 2.

- Generated during stage 3 of SIERRA via imagizing.

- Generated inter-experiment heatmaps from bivariate experiments.

"""

# Core packages
import subprocess
import typing as tp
import multiprocessing as mp
import queue
import copy
import shutil
import logging
import pathlib
import time
import datetime

# 3rd party packages

# Project packages
import sierra.core.variables.batch_criteria as bc
from sierra.core import types, config, utils, batchroot
from sierra.core import plugin as pm

_logger = logging.getLogger(__name__)


[docs] def proc_batch_exp( main_config: types.YAMLDict, cmdopts: types.Cmdopts, pathset: batchroot.PathSet, criteria: bc.XVarBatchCriteria, ) -> None: """ Render videos. #. From :term:`Engine` if ``--engine-vc`` was passed. #. From imagized images if ``proc.imagize`` was run previously to generate frames, and ``--project-rendering`` is passed. """ if (not cmdopts["engine_vc"]) and (not cmdopts["project_rendering"]): _logger.warning( "Rendering plugin active without --engine-vc or --project-rendering" ) return _logger.info("Rendering videos...") start = time.time() graphs_path = pathlib.Path(cmdopts["project_config_root"]) / pathlib.Path( config.PROJECT_YAML.graphs ) if utils.path_exists(graphs_path): _logger.info("Loading render config for project=%s", cmdopts["project"]) loader = pm.module_load_tiered(project=cmdopts["project"], path="pipeline.yaml") render_config = loader.load_config(cmdopts, config.PROJECT_YAML.graphs) else: _logger.warning("%s does not exist--cannot generate render", graphs_path) return if cmdopts["engine_vc"]: _from_engine(render_config["intra-exp"], cmdopts, pathset, criteria) else: _logger.debug( "--engine-vc not passed--(possibly) skipping " "rendering frames captured by the engine" ) if cmdopts["project_rendering"] and "imagize" in render_config: _from_project_imagized(render_config["imagize"], cmdopts, pathset, criteria) else: _logger.debug( "--project-rendering not passed--(possibly) " "skipping rendering frames captured by the " "project" ) elapsed = int(time.time() - start) sec = datetime.timedelta(seconds=elapsed) _logger.info("Rendering complete in %s", str(sec))
def _from_engine( render_config: types.YAMLDict, cmdopts: types.Cmdopts, pathset: batchroot.PathSet, criteria: bc.XVarBatchCriteria, ) -> None: """Render frames (images) captured in by a engine into videos. Frames are stitched together to make a video using :program:`ffmpeg`. Output format controlled via configuration. Targets to render are found in:: <batch_root>/<exp_name>/<run_name>/<frames_leaf> Videos are output in:: <batch_root>/videos/<exp_name> ``<frames_leaf>`` is controlled via configuration. For more details, see :ref:`plugins/prod/render`. .. note:: This currently only works with PNG images. """ exp_to_render = utils.exp_range_calc( cmdopts["exp_range"], pathset.output_root, criteria.gen_exp_names() ) inputs = [] for exp in exp_to_render: output_dir = pathset.video_root / exp.name for run in exp.iterdir(): engine = cmdopts["engine"].split(".")[1] frames_leaf = config.RENDERING[engine]["frames_leaf"] output_path = output_dir / (run.name + config.RENDERING["format"]) opts = { "exp_root": pathset.imagize_root / exp.name, "output_path": output_path, "input_dir": str(exp / run / frames_leaf), "ffmpeg_opts": cmdopts["render_cmd_opts"], } inputs.append(copy.deepcopy(opts)) _parallel(render_config, cmdopts, inputs) def _from_project_imagized( render_config: types.YAMLDict, cmdopts: types.Cmdopts, pathset: batchroot.PathSet, criteria: bc.XVarBatchCriteria, ) -> None: """Render THINGS previously imagized in a project in stage 3 into videos. Frames (images) in the imagize root (see :ref:`concepts/run-time-tree`) are stitched together to make a video using :program:`ffmpeg`. Output format controlled via configuration. Targets to render are found in:: <batch_root>/imagize/<subdir_path> Videos are output in:: <batch_root>/videos/<exp>/<subdir_path> For more details, see :ref:`plugins/prod/render`. .. NOTE:: This currently only works with PNG images. """ exp_to_render = utils.exp_range_calc( cmdopts["exp_range"], pathset.output_root, criteria.gen_exp_names() ) inputs = [] # For each graph in each category for graph in render_config: # Across all experiments for exp in exp_to_render: exp_imagize_root = pathset.imagize_root / exp.name if not exp_imagize_root.exists(): continue # Check all directories recursively for candidate in exp_imagize_root.rglob("*"): path = pathlib.Path(dict(graph)["src_stem"]) fragment = path.parent / path.name if candidate.is_file(): continue if str(fragment) not in str(candidate): continue output_path = ( pathset.video_root / exp.name / candidate.relative_to(exp_imagize_root) ) / (candidate.name + config.RENDERING["format"]) inputs.append( { "input_dir": candidate, "exp_root": pathset.imagize_root / exp.name, "output_path": output_path, "ffmpeg_opts": cmdopts["render_cmd_opts"], } ) _parallel(render_config, cmdopts, inputs) def _parallel( render_config: types.YAMLDict, cmdopts: types.Cmdopts, inputs: list[types.SimpleDict], ) -> None: """Perform the requested rendering in parallel.""" q = mp.JoinableQueue() # type: mp.JoinableQueue for spec in inputs: q.put(spec) # Render videos in parallel--waaayyyy faster parallelism = cmdopts["processing_parallelism"] for _ in range(0, parallelism): p = mp.Process(target=_worker, args=(q, render_config)) p.start() q.join() def _worker(q: mp.Queue, render_config: types.YAMLDict) -> None: assert shutil.which("ffmpeg") is not None, "ffmpeg not found" while True: # Wait for 3 seconds after the queue is empty before bailing try: render_opts = q.get(True, 3) _logger.info("Rendering images in %s...", render_opts["exp_root"].name) opts = render_opts["ffmpeg_opts"].split(" ") ipaths = "'{}/*.{}'".format( render_opts["input_dir"], config.GRAPHS["static_type"] ) cmd = ["ffmpeg", "-y", "-pattern_type", "glob", "-i", ipaths] cmd.extend(opts) cmd.extend([str(render_opts["output_path"])]) to_run = " ".join(cmd) _logger.trace("Run cmd: %s", to_run) utils.dir_create_checked(render_opts["output_path"].parent, exist_ok=True) with subprocess.Popen( to_run, shell=True, stderr=subprocess.PIPE, stdout=subprocess.PIPE ) as proc: proc.wait() # We use communicate(), not wait() to avoid issues with IO buffers # becoming full (e.g., you get deadlocks with wait() regularly). stdout_raw, stderr_raw = proc.communicate() # Only show output if the process failed (i.e., did not return 0) if proc.returncode != 0: _logger.error("Cmd '%s' failed!", to_run) stdout_str = stdout_raw.decode("ascii") stderr_str = stderr_raw.decode("ascii") _logger.error("Return code=%d", proc.returncode) _logger.error("stdout: %s", stdout_str) _logger.error("stderr: %s", stderr_str) q.task_done() except queue.Empty: break __all__ = ["proc_batch_exp"]