# Copyright 2021 John Harwell, All rights reserved.
#
# SPDX-License-Identifier: MIT
"""
The plugin for the :term:`ARGoS` :term:`Engine`.
"""
# Core packages
import argparse
import os
import random
import typing as tp
import re
import shutil
import logging
import sys
import pathlib
import psutil
# 3rd party packages
import packaging.version
# Project packages
from sierra.core import config, types, utils, batchroot, execenv
from sierra.core.experiment import bindings, definition
import sierra.core.variables.batch_criteria as bc
_logger = logging.getLogger("engine.argos")
[docs]
class ExpRunShellCmdsGenerator(bindings.IExpRunShellCmdsGenerator):
def __init__(
self,
cmdopts: types.Cmdopts,
criteria: bc.XVarBatchCriteria,
exp_num: int,
n_agents: tp.Optional[int],
) -> None:
self.cmdopts = cmdopts
self.display_port = -1
[docs]
def pre_run_cmds(
self, host: str, input_fpath: pathlib.Path, run_num: int
) -> list[types.ShellCmdSpec]:
# When running ARGoS under Xvfb in order to headlessly render frames, we
# need to start a per-instance Xvfb server that we tell ARGoS to use via
# the DISPLAY environment variable, which will then be killed when the
# shell GNU parallel spawns to run each line in the commands file exits.
if host == "slave" and self.cmdopts["engine_vc"]:
self.display_port = random.randint(0, 1000000)
cmd1 = f"Xvfb :{self.display_port} -screen 0, 1600x1200x24 &"
cmd2 = f"export DISPLAY=:{self.display_port};"
spec1 = types.ShellCmdSpec(cmd=cmd1, shell=True, wait=True)
spec2 = types.ShellCmdSpec(cmd=cmd2, shell=True, wait=True, env=True)
return [spec1, spec2]
return []
[docs]
def exec_run_cmds(
self, host: str, input_fpath: pathlib.Path, run_num: int
) -> list[types.ShellCmdSpec]:
shellname = execenv.get_executable_arch_aware(config.ARGOS["launch_cmd"])
cmd = "{} -c {}{}".format(
shellname, str(input_fpath), config.ARGOS["launch_file_ext"]
)
# ARGoS is pretty good about not printing stuff if we pass these
# arguments. We don't want to pass > /dev/null so that we get the
# text of any exceptions that cause ARGoS to crash.
if self.cmdopts["exec_devnull"]:
cmd += " --log-file /dev/null --logerr-file /dev/null"
cmd += ";"
return [types.ShellCmdSpec(cmd=cmd, shell=True, wait=True)]
[docs]
def post_run_cmds(
self, host: str, run_output_root: pathlib.Path
) -> list[types.ShellCmdSpec]:
return []
[docs]
class ExpShellCmdsGenerator(bindings.IExpShellCmdsGenerator):
def __init__(self, cmdopts: types.Cmdopts, exp_num: int) -> None:
self.cmdopts = cmdopts
[docs]
def pre_exp_cmds(self) -> list[types.ShellCmdSpec]:
return []
[docs]
def exec_exp_cmds(self, exec_opts: types.StrDict) -> list[types.ShellCmdSpec]:
return []
[docs]
def post_exp_cmds(self) -> list[types.ShellCmdSpec]:
# Cleanup Xvfb processes which were started in the background. If SIERRA
# was run with --exec-resume, then there may be no Xvfb processes to
# kill, so we can't (in general) check the return code.
if self.cmdopts["engine_vc"]:
return [types.ShellCmdSpec(cmd="killall Xvfb", shell=True, wait=True)]
return []
[docs]
def execenv_check(cmdopts: types.Cmdopts) -> None:
"""
Perform stage2 execution environment checks for the :term:`ARgoS` engine.
Checks:
- ARGoS can be found
- ARGoS version supported
- :envvar:`ARGOS_PLUGIN_PATH` is set
- for :program:`Xvfb` if ``--engine-vc`` was passed.
"""
keys = ["ARGOS_PLUGIN_PATH"]
for k in keys:
assert k in os.environ, f"Non-ARGoS environment detected: '{k}' not found"
# Check we can find ARGoS
proc = execenv.check_for_simulator(
cmdopts["engine"], cmdopts["execenv"], config.ARGOS["launch_cmd"]
)
# Check ARGoS version
stdout = proc.stdout.decode("utf-8")
stderr = proc.stderr.decode("utf-8")
res = re.search(r"[0-9]+.[0-9]+.[0-9]+-beta[0-9]+", stdout)
assert (
res is not None
), f"ARGOS_VERSION not in stdout: stdout='{stdout}',stderr='{stderr}'"
_logger.trace("Parsed ARGOS_VERSION: %s", res.group(0))
version = packaging.version.parse(res.group(0))
min_version = config.ARGOS["min_version"]
assert (
version >= min_version
), f"ARGoS version {version} < min required {min_version}"
if cmdopts["engine_vc"]:
assert shutil.which("Xvfb") is not None, "Xvfb not found"
[docs]
def cmdline_postparse_configure(
env: str, args: argparse.Namespace
) -> argparse.Namespace:
"""
Configure cmdline args after parsing for the :term:`ARGoS` engine.
This sets arguments appropriately depending on what HPC environment is
selected with ``--execenv``.
- hpc.local
- hpc.adhoc
- hpc.slurm
- hpc.pbs
"""
# No configuration needed for stages 3-5
if not any(stage in args.pipeline for stage in [1, 2]):
return args
if env == "hpc.local":
return _configure_hpc_local(args)
if env == "hpc.adhoc":
return _configure_hpc_adhoc(args)
if env == "hpc.slurm":
return _configure_hpc_slurm(args)
if env == "hpc.pbs":
return _configure_hpc_pbs(args)
_logger.warning("Execution environment %s untested", env)
return args
def _configure_hpc_pbs(args: argparse.Namespace) -> argparse.Namespace:
_logger.debug("Configuring ARGoS for PBS execution")
# For HPC, we want to use the the maximum # of simultaneous jobs per
# node such that there is no thread oversubscription. We also always
# want to allocate each physics engine its own thread for maximum
# performance, per the original ARGoS paper.
#
# However, PBS does not have an environment variable for # jobs/node, so
# we have to rely on the user to set this appropriately.
args.physics_n_engines = int(
float(os.environ["PBS_NUM_PPN"]) / args.exec_jobs_per_node
)
_logger.debug(
"Allocated %s physics engines/run, %s parallel runs/node",
args.physics_n_engines,
args.exec_jobs_per_node,
)
return args
def _configure_hpc_slurm(args: argparse.Namespace) -> argparse.Namespace:
_logger.debug("Configuring ARGoS for SLURM execution")
# For HPC, we want to use the the maximum # of simultaneous jobs per
# node such that there is no thread oversubscription. We also always
# want to allocate each physics engine its own thread for maximum
# performance, per the original ARGoS paper.
#
# We rely on the user to request their job intelligently so that
# SLURM_TASKS_PER_NODE is appropriate.
if args.exec_jobs_per_node is None:
res = re.search(r"^[^\(]+", os.environ["SLURM_TASKS_PER_NODE"])
assert (
res is not None
), "Unexpected format in SLURM_TASKS_PER_NODE: '{}'".format(
os.environ["SLURM_TASKS_PER_NODE"]
)
args.exec_jobs_per_node = int(res.group(0))
args.physics_n_engines = int(os.environ["SLURM_CPUS_PER_TASK"])
_logger.debug(
"Allocated %s physics engines/run, %s parallel runs/node",
args.physics_n_engines,
args.exec_jobs_per_node,
)
return args
def _configure_hpc_local(args: argparse.Namespace) -> argparse.Namespace:
_logger.debug("Configuring ARGoS for LOCAL execution")
if any(stage in args.pipeline for stage in [1, 2]):
assert (
args.physics_n_engines is not None
), "--physics-n-engines is required for --execenv=hpc.local when running stage{1,2}"
ppn_per_run_req = args.physics_n_engines
if args.exec_jobs_per_node is None:
# Every physics engine gets at least 1 core
parallel_jobs = int(psutil.cpu_count() / float(ppn_per_run_req))
if parallel_jobs == 0:
_logger.warning(
(
"Local machine has %s logical cores, but "
"%s physics engines/run requested; "
"allocating anyway"
),
psutil.cpu_count(),
ppn_per_run_req,
)
parallel_jobs = 1
# Make sure we don't oversubscribe cores--each simulation needs at
# least 1 core.
args.exec_jobs_per_node = min(args.n_runs, parallel_jobs)
_logger.debug(
"Allocated %s physics engines/run, %s parallel runs/node",
args.physics_n_engines,
args.exec_jobs_per_node,
)
return args
def _configure_hpc_adhoc(args: argparse.Namespace) -> argparse.Namespace:
_logger.debug("Configuring ARGoS for ADHOC execution")
nodes = execenv.parse_nodefile(args.nodefile)
ppn = sys.maxsize
for node in nodes:
ppn = min(ppn, node.n_cores)
# For HPC, we want to use the the maximum # of simultaneous jobs per
# node such that there is no thread oversubscription. We also always
# want to allocate each physics engine its own thread for maximum
# performance, per the original ARGoS paper.
if args.exec_jobs_per_node is None:
args.exec_jobs_per_node = int(float(args.n_runs) / len(nodes))
args.physics_n_engines = int(ppn / args.exec_jobs_per_node)
_logger.debug(
"Allocated %s physics engines/run, %s parallel runs/node",
args.physics_n_engines,
args.exec_jobs_per_node,
)
return args
def population_size_from_pickle(
chgs: tp.Union[definition.AttrChangeSet, definition.ElementAddList],
main_config: types.YAMLDict,
cmdopts: types.Cmdopts,
) -> int:
for path, attr, value in chgs:
if path == ".//arena/distribute/entity" and attr == "quantity":
return int(value)
return -1
def arena_dims_from_criteria(
criteria: bc.XVarBatchCriteria,
) -> list[utils.ArenaExtent]:
dims = []
for exp in criteria.gen_attr_changelist():
for c in exp:
if c.path == ".//arena" and c.attr == "size":
d = utils.Vector3D.from_str(c.value)
dims.append(utils.ArenaExtent(d))
assert len(dims) > 0, "Scenario dimensions not contained in batch criteria"
return dims
[docs]
def robot_type_from_def(exp_def: definition.BaseExpDef) -> tp.Optional[str]:
"""
Get the entity type of the robots managed by ARGoS.
.. NOTE:: Assumes homgeneous systems.
"""
for robot in config.ARGOS["spatial_hash2D"]:
if exp_def.has_element(f".//arena/distribute/entity/{robot}"):
return robot
return None
def population_size_from_def(
exp_def: definition.BaseExpDef, main_config: types.YAMLDict, cmdopts: types.Cmdopts
) -> int:
return population_size_from_pickle(exp_def.attr_chgs, main_config, cmdopts)
def pre_exp_diagnostics(
cmdopts: types.Cmdopts, pathset: batchroot.PathSet, logger: logging.Logger
) -> None:
s = "batch_exp_root='%s',runs/exp=%s,threads/job=%s,n_jobs=%s"
logger.info(
s,
pathset.root,
cmdopts["n_runs"],
cmdopts["physics_n_threads"],
cmdopts["exec_jobs_per_node"],
)
def expsetup_from_def(exp_def: definition.BaseExpDef) -> types.SimpleDict:
ret = {} # type: types.SimpleDict
for path, attr, value in exp_def:
if "experiment" in path:
if "length" in attr:
ret["duration"] = int(value)
if "ticks_per_second" in attr:
ret["n_ticks_per_sec"] = int(value)
if any(k not in ret for k in ["duration", "n_ticks_per_sec"]):
raise RuntimeError("Malformed experiment definition")
return ret