# Copyright 2021 John Harwell, All rights reserved.
#
# SPDX-License-Identifier: MIT
"""Plugin for :term:`ROS1`-based robot :term:`Engines <Engine>`."""
# Core packages
import argparse
import logging
import typing as tp
import os
import subprocess
import pwd
import pathlib
# 3rd party packages
import yaml
# Project packages
from sierra.core import engine, config, ros1, types, utils, batchroot, execenv
from sierra.core.experiment import bindings, definition
import sierra.core.variables.batch_criteria as bc
# Module-level logger used by the classes and functions in this plugin. The
# logger name is hard-coded rather than derived from ``__name__``.
_logger = logging.getLogger("engine.ros1robot")
[docs]
class ExpRunShellCmdsGenerator(bindings.IExpRunShellCmdsGenerator):
    """Generate the commands to run a single :term:`Experimental Run`.

    Every public method takes a ``host`` argument. The value ``"master"``
    selects the commands for the master side of a run; any other value selects
    the commands for the robots (called "slaves" in the log messages).

    Attributes:
        cmdopts: Parsed cmdline options. The keys read by this class are
            ``no_master_node``, ``project_config_root``, ``robot`` and
            ``nodefile``.
        criteria: Batch criteria, used to look up the experiment directory
            name and the batch input root.
        exp_num: Index of the experiment in the batch; also the offset added
            to ``config.ROS["port_base"]`` to get the ROS master port.
        n_agents: Number of robots in the run. Must be an ``int`` by the time
            the robot commands are generated, because it is used as a loop
            bound there.
    """

    def __init__(
        self,
        cmdopts: types.Cmdopts,
        criteria: bc.XVarBatchCriteria,
        exp_num: int,
        n_agents: tp.Optional[int],
    ) -> None:
        self.cmdopts = cmdopts
        self.n_agents = n_agents
        self.exp_num = exp_num
        self.criteria = criteria

    def pre_run_cmds(
        self, host: str, input_fpath: pathlib.Path, run_num: int
    ) -> list[types.ShellCmdSpec]:
        """Generate the commands to execute before a run starts.

        Args:
            host: ``"master"`` or anything else (robots).
            input_fpath: Unused here; part of the interface.
            run_num: Index of the run; only used for logging.

        Returns:
            For ``"master"``: an empty list if ``no_master_node`` was passed,
            otherwise the ``ROS_MASTER_URI`` export. For every other host: the
            command sourcing the robot setup script followed by the
            ``ROS_MASTER_URI`` export.
        """
        master_ip = engine.get_local_ip()
        port = config.ROS["port_base"] + self.exp_num
        master_uri = f"http://{master_ip}:{port}"

        ros_master = types.ShellCmdSpec(
            cmd=f"export ROS_MASTER_URI={master_uri};", shell=True, env=True, wait=True
        )

        if host == "master":
            if self.cmdopts["no_master_node"]:
                return []
            return [ros_master]

        main_path = (
            pathlib.Path(self.cmdopts["project_config_root"]) / config.PROJECT_YAML.main
        )

        # Close the config file as soon as it has been parsed instead of
        # leaking the handle until garbage collection.
        #
        # NOTE(review): FullLoader is kept from the original; this assumes the
        # project config is trusted input -- confirm.
        with utils.utf8open(main_path) as main_file:
            main_config = yaml.load(main_file, yaml.FullLoader)

        _logger.debug(
            "Generating pre-exec cmds for run%s slaves: %d robots",
            run_num,
            self.n_agents,
        )

        # Each robot type may name its own setup script in the project config;
        # fall back to the user's .bashrc if it does not.
        script_yaml = main_config["ros"]["robots"][self.cmdopts["robot"]]
        script_file = script_yaml.get("setup_script", "$HOME/.bashrc")

        ros_setup = types.ShellCmdSpec(
            cmd=f". {script_file};", shell=True, wait=True, env=True
        )

        return [ros_setup, ros_master]

    def exec_run_cmds(
        self, host: str, input_fpath: pathlib.Path, run_num: int
    ) -> list[types.ShellCmdSpec]:
        """Generate the commands which execute a run, dispatching on ``host``.

        Args:
            host: ``"master"`` or anything else (robots).
            input_fpath: Path stem of the launch files for the run.
            run_num: Index of the run.

        Returns:
            The launch commands for the master or for the robots.
        """
        if host == "master":
            return self._exec_run_cmds_master(host, input_fpath, run_num)

        return self._exec_run_cmds_slave(host, input_fpath, run_num)

    def _exec_run_cmds_master(
        self, host: str, input_fpath: pathlib.Path, run_num: int
    ) -> list[types.ShellCmdSpec]:
        """Generate the launch command for the master node of a run.

        Returns:
            An empty list if ``no_master_node`` was passed, otherwise a single
            launch command of the form
            ``<launch_cmd> --wait <exp_template_path>_run<run_num>_master<ext>;``.
        """
        if self.cmdopts["no_master_node"]:
            return []

        _logger.debug("Generating exec cmds for run%s master", run_num)

        # ROS master node
        exp_dirname = self.criteria.gen_exp_names()[self.exp_num]
        exp_template_path = utils.exp_template_path(
            self.cmdopts, self.criteria.batch_input_root, exp_dirname
        )
        launch_cmd = config.ROS["launch_cmd"]
        launch_ext = config.ROS["launch_file_ext"]

        # --wait tells roslaunch to wait for the configured master to come up
        # before launching the "master" code.
        #
        # 2022/02/28: -p (apparently) tells roslaunch not to CONNECT to a master
        # at the specified port, but to LAUNCH a new master at the specified
        # port. This is not really documented well.
        cmd = f"{launch_cmd} --wait {exp_template_path}_run{run_num}_master{launch_ext};"

        master_node = types.ShellCmdSpec(cmd=cmd, shell=True, wait=True)

        return [master_node]

    def _exec_run_cmds_slave(
        self, host: str, input_fpath: pathlib.Path, run_num: int
    ) -> list[types.ShellCmdSpec]:
        """Generate one launch command per robot in the run.

        Returns:
            ``n_agents`` commands of the form
            ``<launch_cmd> --wait <input_fpath>_robot<i><ext>;``.
        """
        _logger.debug(
            "Generating exec cmds for run%s slaves: %d robots", run_num, self.n_agents
        )

        nodes = execenv.parse_nodefile(self.cmdopts["nodefile"])

        # Too few hosts is only reported; the commands are still generated for
        # all robots.
        if len(nodes) < self.n_agents:
            _logger.critical(
                (
                    "Need %d hosts to correctly generate launch "
                    "cmds for run%s with %d robots; %d available"
                ),
                self.n_agents,
                run_num,
                self.n_agents,
                len(nodes),
            )

        launch_cmd = config.ROS["launch_cmd"]
        launch_ext = config.ROS["launch_file_ext"]

        # --wait tells roslaunch to wait for the configured master to come up
        # before launching the robot code.
        return [
            types.ShellCmdSpec(
                cmd=f"{launch_cmd} --wait {input_fpath}_robot{i}{launch_ext};",
                shell=True,
                wait=True,
            )
            for i in range(self.n_agents)
        ]

    def post_run_cmds(
        self, host: str, run_output_root: pathlib.Path
    ) -> list[types.ShellCmdSpec]:
        """Generate the commands to execute after a run finishes.

        Args:
            host: ``"master"`` or anything else (robots).
            run_output_root: Unused here; part of the interface.

        Returns:
            Nothing for ``"master"``; for every other host a single command
            which kills any remaining ``roslaunch`` processes.
        """
        if host == "master":
            return []

        return [
            types.ShellCmdSpec(
                # Can't use killall, because that returns non-zero if things
                # are cleaned up nicely.
                cmd="if pgrep roslaunch; then pkill roslaunch; fi;",
                shell=True,
                wait=True,
            )
        ]
[docs]
class ExpShellCmdsGenerator(bindings.IExpShellCmdsGenerator):
    """Generate the commands to run the :term:`Experiment` in stage 2.

    Attributes:
        cmdopts: Parsed cmdline options.
        exp_num: Index of the experiment in the batch; added to
            ``config.ROS["port_base"]`` to pick the roscore port.
    """

    def __init__(self, cmdopts: types.Cmdopts, exp_num: int) -> None:
        self.cmdopts = cmdopts
        self.exp_num = exp_num

    def pre_exp_cmds(self) -> list[types.ShellCmdSpec]:
        """Generate the commands which set up the ROS master for an experiment.

        Returns:
            The ``ROS_MASTER_URI`` export followed by a backgrounded
            ``roscore`` on the experiment's port.
        """
        host_ip = engine.get_local_ip()
        port = config.ROS["port_base"] + self.exp_num
        master_uri = f"http://{host_ip}:{port}"

        _logger.info("Using ROS_MASTER_URI=%s", master_uri)

        # roscore will run on the SIERRA host machine.
        export_uri = types.ShellCmdSpec(
            cmd=f"export ROS_MASTER_URI={master_uri}",
            shell=True,
            env=True,
            wait=True,
        )

        # Each experiment gets its own roscore. Because each roscore has a
        # different port, this prevents any robots from pre-emptively starting
        # the next experiment before the rest of the robots have finished the
        # current one.
        start_roscore = types.ShellCmdSpec(
            cmd=f"roscore -p {port} & ",
            shell=True,
            wait=False,
        )

        return [export_uri, start_roscore]

    def exec_exp_cmds(self, exec_opts: types.StrDict) -> list[types.ShellCmdSpec]:
        """Return no commands: nothing is generated at the experiment level."""
        return []

    def post_exp_cmds(self) -> list[types.ShellCmdSpec]:
        """Generate the commands which clean up after an experiment.

        Returns:
            One ``killall`` command each for ``rosmaster``, ``roscore`` and
            ``rosout``, in that order.
        """
        # Cleanup roscore processes on the SIERRA host machine which are still
        # active because they don't know how to clean up after themselves.
        leftovers = ("rosmaster", "roscore", "rosout")

        return [
            types.ShellCmdSpec(cmd=f"killall {proc};", shell=True, wait=True)
            for proc in leftovers
        ]
[docs]
def cmdline_postparse_configure(
    env: str, args: argparse.Namespace
) -> argparse.Namespace:
    """
    Configure cmdline args after parsing for the :term:`ROS1+Robot` engine.

    Checks for:

    - List of robot IPs via ``--nodefile`` or :envvar:`SIERRA_NODEFILE`; the
      environment variable is only consulted when ``--nodefile`` was not
      passed, and the resulting path must exist.

    - Engine visual capture not being requested.

    Returns:
        ``args``, with ``nodefile`` filled in from the environment if needed.
    """
    if args.nodefile is None:
        # No --nodefile on the cmdline: fall back to the environment.
        from_env = os.environ.get("SIERRA_NODEFILE")
        assert from_env is not None, (
            "Non-ros1robot environment detected: --nodefile not "
            "passed and 'SIERRA_NODEFILE' not found"
        )
        args.nodefile = from_env

    nodefile_found = utils.path_exists(args.nodefile)
    assert nodefile_found, f"SIERRA_NODEFILE '{args.nodefile}' does not exist"

    _logger.info("Using '%s' as robot hostnames file", args.nodefile)

    vc_requested = args.engine_vc
    assert not vc_requested, "Engine visual capture not supported on ros1robot"

    return args
[docs]
def execenv_check(cmdopts: types.Cmdopts) -> None:
    """
    Verify execution environment in stage 2 for :term:`ROS1+Robot`.

    Check for:

    - :envvar:`ROS_DISTRO` and :envvar:`ROS_VERSION` being set.

    - :envvar:`ROS_DISTRO` is {kinetic/noetic}.

    - :envvar:`ROS_VERSION` is ROS1.

    ``cmdopts`` is not consulted. Any failed check raises
    :class:`AssertionError`.
    """
    environ = os.environ

    # Both variables must be present before their values are inspected.
    for required in ("ROS_DISTRO", "ROS_VERSION"):
        assert (
            required in environ
        ), f"Non-ROS1+robot environment detected: '{required}' not found"

    # Check ROS distro
    distro = environ["ROS_DISTRO"]
    assert distro in ("kinetic", "noetic"), "SIERRA only supports ROS1 kinetic,noetic"

    # Check ROS version
    version = environ["ROS_VERSION"]
    assert version == "1", "Wrong ROS version: This plugin is for ROS1"
def population_size_from_pickle(
    adds_def: tp.Union[definition.AttrChangeSet, definition.ElementAddList],
    main_config: types.YAMLDict,
    cmdopts: types.Cmdopts,
) -> int:
    """Return the population size reported by the shared ROS1 callback.

    Thin wrapper: all arguments are forwarded unchanged to
    ``ros1.callbacks.population_size_from_pickle`` and its result is returned
    as-is.
    """
    delegate = ros1.callbacks.population_size_from_pickle
    n_agents = delegate(adds_def, main_config, cmdopts)
    return n_agents
def population_size_from_def(
    exp_def: definition.BaseExpDef, main_config: types.YAMLDict, cmdopts: types.Cmdopts
) -> int:
    """Return the population size reported by the shared ROS1 callback.

    Thin wrapper: all arguments are forwarded unchanged to
    ``ros1.callbacks.population_size_from_def`` and its result is returned
    as-is.
    """
    delegate = ros1.callbacks.population_size_from_def
    n_agents = delegate(exp_def, main_config, cmdopts)
    return n_agents
def agent_prefix_extract(main_config: types.YAMLDict, cmdopts: types.Cmdopts) -> str:
    """Return the agent prefix reported by the shared ROS1 callback.

    Thin wrapper: both arguments are forwarded unchanged to
    ``ros1.callbacks.robot_prefix_extract`` and its result is returned as-is.
    """
    delegate = ros1.callbacks.robot_prefix_extract
    prefix = delegate(main_config, cmdopts)
    return prefix
def pre_exp_diagnostics(
    cmdopts: types.Cmdopts, pathset: batchroot.PathSet, logger: logging.Logger
) -> None:
    """Log a one-line summary of the batch before experiments are run.

    The message is emitted at INFO level on ``logger`` and contains the batch
    root (``pathset.root``) and the number of runs per experiment
    (``cmdopts["n_runs"]``).
    """
    batch_root = pathset.root
    runs_per_exp = cmdopts["n_runs"]
    logger.info("batch_exp_root='%s',runs/exp=%s", batch_root, runs_per_exp)
# Public API of this plugin module. Every entry must name something that
# actually exists in the module: ``from <module> import *`` raises
# AttributeError for an ``__all__`` entry which cannot be found. The previous
# "ExpConfigurer" entry was dropped for that reason, since no such name is
# defined or imported in this module.
__all__ = [
    "ExpRunShellCmdsGenerator",
    "ExpShellCmdsGenerator",
    "cmdline_postparse_configure",
    "execenv_check",
]