Source code for sierra.plugins.engine.ros1gazebo.plugin

# Copyright 2021 John Harwell, All rights reserved.
#
#  SPDX-License-Identifier: MIT
"""
Provides engine-specific callbacks for the :term:`ROS1+Gazebo` engine.
"""
# Core packages
import argparse
import logging
import os
import re
import typing as tp
import sys
import pathlib
import psutil

# 3rd party packages
import packaging.version

# Project packages
from sierra.core import config, ros1, types, batchroot, execenv
from sierra.core.experiment import bindings, definition
import sierra.core.variables.batch_criteria as bc

_logger = logging.getLogger("ros1gazebo.plugin")


class ExpRunShellCmdsGenerator(bindings.IExpRunShellCmdsGenerator):
    def __init__(
        self,
        cmdopts: types.Cmdopts,
        criteria: bc.XVarBatchCriteria,
        exp_num: int,
        n_agents: tp.Optional[int],
    ) -> None:
        self.cmdopts = cmdopts
        self.gazebo_port = -1
        self.roscore_port = -1

    def pre_run_cmds(
        self, host: str, input_fpath: pathlib.Path, run_num: int
    ) -> list[types.ShellCmdSpec]:
        if host == "master":
            return []

        # First, the cmd to start roscore. We need to be on a unique port so
        # that multiple ROS instances corresponding to multiple Gazebo
        # instances with the same topic names are considered distinct/not
        # accessible between instances of Gazebo.
        self.roscore_port = config.ROS["port_base"] + run_num * 2

        # roscore will run on each slave node used during stage 2, so we have to
        # use 'localhost' for binding.
        roscore_uri = types.ShellCmdSpec(
            cmd=f"export ROS_MASTER_URI=http://localhost:{self.roscore_port};",
            shell=True,
            wait=True,
            env=True,
        )
        # ROS/Gazebo don't provide options to not print stuff, so we have to use
        # the nuclear option.
        roscore_cmd = f"roscore -p {self.roscore_port} "
        if self.cmdopts["exec_devnull"]:
            roscore_cmd += "> /dev/null 2>&1 "

        roscore_cmd += "& "

        # Each experiment gets their own roscore. Because each roscore has a
        # different port, this prevents any robots from pre-emptively starting
        # the next experiment before the rest of the robots have finished the
        # current one.
        roscore_process = types.ShellCmdSpec(
            cmd=roscore_cmd,
            shell=True,
            wait=False,
        )

        # Second, the command to give Gazebo a unique port on the host during
        # stage 2. We need to be on a unique port so that multiple Gazebo
        # instances can be run in parallel.
        self.gazebo_port = config.ROS["port_base"] + run_num * 2 + 1

        # 2021/12/13: You can't use HTTPS for some reason or gazebo won't
        # start...
        gazebo_uri = types.ShellCmdSpec(
            cmd=f"export GAZEBO_MASTER_URI=http://localhost:{self.gazebo_port};",
            shell=True,
            env=True,
            wait=True,
        )

        return [roscore_uri, roscore_process, gazebo_uri]

    def exec_run_cmds(
        self, host: str, input_fpath: pathlib.Path, run_num: int
    ) -> list[types.ShellCmdSpec]:
        if host == "master":
            return []

        # For ROS+gazebo, we have two files per experimental run:
        #
        # - One for the stuff that is run on the ROS master (can be
        #   nothing/empty file).
        #
        # - One for the stuff that describes what to run on each robot/how many
        #   robots to spawn.
        #
        # We COULD do it all in a single file, but in order to share base code
        # with the ROS+robot engine, we need to do it this way.
        #
        # 2022/02/28: I don't use the -u argument here to set ROS_MASTER_URI,
        # because ROS works well enough when only running on the localhost, in
        # terms of respecting whatever the envvar is set to.
        master = str(input_fpath) + "_master" + config.ROS["launch_file_ext"]
        robots = str(input_fpath) + "_robots" + config.ROS["launch_file_ext"]

        cmd = "{} --wait {} {} ".format(config.ROS["launch_cmd"], master, robots)

        # ROS/Gazebo don't provide options to not print stuff, so we have to use
        # the nuclear option.
        if self.cmdopts["exec_devnull"]:
            cmd += "> /dev/null 2>&1"

        cmd += ";"
        return [types.ShellCmdSpec(cmd=cmd, shell=True, wait=True)]

    def post_run_cmds(
        self, host: str, run_output_root: pathlib.Path
    ) -> list[types.ShellCmdSpec]:
        return []


class ExpShellCmdsGenerator(bindings.IExpShellCmdsGenerator):
    def __init__(self, cmdopts: types.Cmdopts, exp_num: int) -> None:
        self.cmdopts = cmdopts
        self.exp_num = exp_num

    def pre_exp_cmds(self) -> list[types.ShellCmdSpec]:
        # 2025-09-11 [JRH]: This was a NASTY bug which got triggered when
        # running SIERRA in a venv. ROS is installed system-wide, but mixing
        # venv+system packages via a --system venv caused all sorts of problems
        # because the venv versions were often mixing newer with older packages
        # at load time. So, we have to allow the venv interpreter to find the
        # system-wide packages AFTER everything is loaded by modifying the
        # PYTHONPATH for the sub-shell that SIERRA runs everything in.
        #
        # Hopefully this is better in ROS2.

        return [
            types.ShellCmdSpec(
                cmd="export PYTHONPATH=$PYTHONPATH:/usr/lib/python3/dist-packages/",
                shell=True,
                wait=True,
                env=True,
            ),
        ]

    def exec_exp_cmds(self, exec_opts: types.StrDict) -> list[types.ShellCmdSpec]:
        return []

    def post_exp_cmds(self) -> list[types.ShellCmdSpec]:
        # Cleanup roscore and gazebo processes which are still active because
        # they don't know how to clean up after themselves.
        #
        # This is OK to do even when we have multiple Gazebo+ROS instances on
        # the same machine, because we only run these commands after an
        # experiment finishes, so there isn't anything currently running we
        # might accidentally kill.
        #
        # Can't use killall, because that returns non-zero if things
        # are cleaned up nicely.
        return [
            types.ShellCmdSpec(
                cmd="if pgrep gzserver; then pkill gzserver; fi;", shell=True, wait=True
            ),
            types.ShellCmdSpec(
                cmd="if pgrep rosmaster; then pkill rosmaster; fi;",
                shell=True,
                wait=True,
            ),
            types.ShellCmdSpec(
                cmd="if pgrep roscore; then pkill roscore; fi;", shell=True, wait=True
            ),
            types.ShellCmdSpec(
                cmd="if pgrep rosout; then pkill rosout; fi;", shell=True, wait=True
            ),
        ]


class ExpConfigurer(bindings.IExpConfigurer):
    def __init__(self, cmdopts: types.Cmdopts) -> None:
        self.cmdopts = cmdopts

    def for_exp_run(
        self, exp_input_root: pathlib.Path, run_output_root: pathlib.Path
    ) -> None:
        pass

    def for_exp(self, exp_input_root: pathlib.Path) -> None:
        pass

    def parallelism_paradigm(self) -> str:
        return "per-exp"


[docs] def cmdline_postparse_configure( env: str, args: argparse.Namespace ) -> argparse.Namespace: """ Configure cmdline args after parsing for the :term:`ROS1+Gazebo` engine. This sets arguments appropriately depending on what HPC environment is selected with ``--execenv``: - hpc.local - hpc.adhoc - hpc.slurm - hpc.pbs """ # No configuration needed for stages 3-5 if not any(stage in args.pipeline for stage in [1, 2]): return args if env == "hpc.local": return _configure_hpc_local(args) if env == "hpc.adhoc": return _configure_hpc_adhoc(args) if env == "hpc.slurm": return _configure_hpc_slurm(args) if env == "hpc.pbs": return _configure_hpc_pbs(args) raise RuntimeError(f"'{env}' unsupported on ROS1+Gazebo")
def _configure_hpc_pbs(args: argparse.Namespace) -> argparse.Namespace: # For now, nothing to do. If more stuff with physics engine # configuration is implemented, this may change. _logger.debug( "Allocated %s physics threads/run, %s parallel runs/node", args.physics_n_threads, args.exec_jobs_per_node, ) return args def _configure_hpc_slurm(args: argparse.Namespace) -> argparse.Namespace: # We rely on the user to request their job intelligently so that # SLURM_TASKS_PER_NODE is appropriate. if args.exec_jobs_per_node is None: res = re.search(r"^[^\(]+", os.environ["SLURM_TASKS_PER_NODE"]) assert ( res is not None ), "Unexpected format in SLURM_TASKS_PER_NODE: '{}'".format( os.environ["SLURM_TASKS_PER_NODE"] ) args.exec_jobs_per_node = int(res.group(0)) _logger.debug( "Allocated %s physics threads/run, %s parallel runs/node", args.physics_n_threads, args.exec_jobs_per_node, ) return args def _configure_hpc_adhoc(args: argparse.Namespace) -> argparse.Namespace: nodes = execenv.parse_nodefile(args.nodefile) ppn = sys.maxsize for node in nodes: ppn = min(ppn, node.n_cores) if args.exec_jobs_per_node is None: args.exec_jobs_per_node = int(float(args.n_runs) / len(nodes)) _logger.debug( "Allocated %s physics threads/run, %s parallel runs/node", args.physics_n_threads, args.exec_jobs_per_node, ) return args def _configure_hpc_local(args: argparse.Namespace) -> argparse.Namespace: # For now. If more physics engine configuration is added, this will # change. ppn_per_run_req = 1 if args.exec_jobs_per_node is None: parallel_jobs = int(psutil.cpu_count() / float(ppn_per_run_req)) if parallel_jobs == 0: _logger.warning( ( "Local machine has %s logical cores, but " "%s threads/run requested; " "allocating anyway" ), psutil.cpu_count(), ppn_per_run_req, ) parallel_jobs = 1 # Make sure we don't oversubscribe cores--each simulation needs at # least 1 core. args.exec_jobs_per_node = min(args.n_runs, parallel_jobs) _logger.debug( "Allocated %s physics threads/run, %s parallel runs/node", args.physics_n_threads, args.exec_jobs_per_node, ) return args
[docs] def execenv_check(cmdopts: types.Cmdopts) -> None: """ Verify execution environment in stage 2 for :term:`ROS1+Gazebo`. Check for: - :envvar:`ROS_VERSION` is ROS1. - :envvar:`ROS_DISTRO` is {kinetic/noetic}. - :program:`gazebo` can be found and the version is supported. """ keys = ["ROS_DISTRO", "ROS_VERSION"] for k in keys: assert k in os.environ, f"Non-ROS+Gazebo environment detected: '{k}' not found" # Check ROS distro assert os.environ["ROS_DISTRO"] in [ "kinetic", "noetic", ], "SIERRA only supports ROS1 kinetic,noetic" # Check ROS version assert ( os.environ["ROS_VERSION"] == "1" ), "Wrong ROS version: this plugin is for ROS1" # Check we can find Gazebo version = execenv.check_for_simulator( cmdopts["engine"], cmdopts["execenv"], config.GAZEBO["launch_cmd"] ) # Check Gazebo version stdout = version.stdout.decode("utf-8") stderr = version.stderr.decode("utf-8") res = re.search(r"[0-9]+.[0-9]+.[0-9]+", stdout) assert ( res is not None ), f"Gazebo version not in std: have stdout='{stdout}',stderr='{stderr}'" version = packaging.version.parse(res.group(0)) min_version = packaging.version.parse(config.GAZEBO["min_version"]) assert ( version >= min_version ), f"Gazebo version {version} < min required {min_version}"
def population_size_from_pickle( adds_def: tp.Union[definition.AttrChangeSet, definition.ElementAddList], main_config: types.YAMLDict, cmdopts: types.Cmdopts, ) -> int: return ros1.callbacks.population_size_from_pickle(adds_def, main_config, cmdopts) def population_size_from_def( exp_def: definition.BaseExpDef, main_config: types.YAMLDict, cmdopts: types.Cmdopts ) -> int: return ros1.callbacks.population_size_from_def(exp_def, main_config, cmdopts) def agent_prefix_extract(main_config: types.YAMLDict, cmdopts: types.Cmdopts) -> str: return ros1.callbacks.robot_prefix_extract(main_config, cmdopts) def pre_exp_diagnostics( cmdopts: types.Cmdopts, pathset: batchroot.PathSet, logger: logging.Logger ) -> None: s = "batch_exp_root='%s',runs/exp=%s,threads/job=%s,n_jobs=%s" logger.info( s, pathset.root, cmdopts["n_runs"], cmdopts["physics_n_threads"], cmdopts["exec_jobs_per_node"], ) __all__ = ["cmdline_postparse_configure", "execenv_check"]