# Copyright 2021 John Harwell, All rights reserved.
#
# SPDX-License-Identifier: MIT
#
"""
Command line parsing and validation for the :term:`ROS1+robot` engine.
"""
# Core packages
import typing as tp
import argparse
# 3rd party packages
# Project packages
from sierra.core import types, ros1, config
from sierra.plugins import PluginCmdline
from sierra.core.ros1 import cmdline
class EngineCmdline(cmdline.ROSCmdline):
    """Defines :term:`ROS1+robot` cmdline options on top of
    :class:`~sierra.core.ros1.cmdline.ROSCmdline`.

    Each ``init_*()`` hook first runs the parent implementation and then
    registers the options specific to this engine on the matching argument
    group (``self.multistage``, ``self.stage1``, ``self.stage2``).
    """

    def __init__(
        self,
        parents: tp.Optional[list[argparse.ArgumentParser]],
        stages: list[int],
    ) -> None:
        """Forward ``parents`` and ``stages`` unchanged to the parent class.

        Args:
            parents: Parent parsers to build on, or ``None``.
            stages: The pipeline stages passed through to the parent class.
        """
        super().__init__(parents, stages)

    def init_multistage(self) -> None:
        """Register the options attached to the ``multistage`` group."""
        super().init_multistage()

        self.multistage.add_argument(
            "--skip-online-check",
            help=(
                "\n"
                "If passed, then the usual 'is this robot online' checks will be\n"
                "skipped.\n"
                + self.stage_usage_doc([1, 2])
            ),
            action="store_true",
        )

        self.multistage.add_argument(
            "--online-check-method",
            choices=["ping+ssh", "nc+ssh"],
            help=(
                "\n"
                "How SIERRA should check if a given robot is online. Valid\n"
                "values:\n"
                "- ``ping+ssh`` - First, verify that you can ping the\n"
                "hostname/IP associated with each robot. Second, verify\n"
                "that passwordless ssh to the hostname/IP works. This is\n"
                "the most common option.\n"
                "- ``nc+ssh`` - First, verify that an ssh connection exists\n"
                "from the SIERRA host machine to the robot on the\n"
                "specified port using netcat. Second, verify that\n"
                "passwordless ssh to the robot on the specified port\n"
                "works. This is useful when connecting to the robots\n"
                "through a reverse SSH tunnel, which can be necessary if\n"
                "the robots don't have a fixed IP address and cannot be\n"
                "addressed by FQDN (looking at you eduroam...).\n"
            ),
            default="ping+ssh",
        )

    def init_stage1(self) -> None:
        """Register the options attached to the ``stage1`` group."""
        super().init_stage1()

        self.stage1.add_argument(
            "--skip-sync",
            help=(
                "\n"
                "If passed, then the generated experiment will not be synced to\n"
                "robots. This is useful when:\n"
                "- You are developing your :term:`Project` and just want to\n"
                "check locally if the experiment is being generated\n"
                "properly.\n"
                "- You have a lot of robots and/or the network connection\n"
                "from the SIERRA host machine to the robots is slow, and\n"
                "copying the experiment multiple times as you tweak\n"
                "parameters takes a long time.\n"
                + self.stage_usage_doc([1])
            ),
            action="store_true",
        )

    def init_stage2(self) -> None:
        """Register the options attached to the ``stage2`` group."""
        super().init_stage2()

        # action="store_true" already implies a default of False, so no
        # explicit default is given (matches the other boolean flags here).
        self.stage2.add_argument(
            "--exec-resume",
            help=(
                "\n"
                "Resume a batch experiment that was killed/stopped/etc last time\n"
                "SIERRA was run.\n"
                + self.stage_usage_doc([2])
            ),
            action="store_true",
        )

        # The pause is parsed as an integer number of seconds; when omitted
        # it falls back to the value configured in config.ROS.
        self.stage2.add_argument(
            "--exec-inter-run-pause",
            metavar="SECONDS",
            help=(
                "\n"
                "How long to pause between :term:`Experimental Runs\n"
                "<Experimental Run>`, giving you time to reset the environment,\n"
                "move robots, etc.\n"
                + self.stage_usage_doc([2])
            ),
            type=int,
            default=config.ROS["inter_run_pause"],
        )
def build(parents: list[argparse.ArgumentParser], stages: list[int]) -> PluginCmdline:
    """
    Create the cmdline object for the :term:`ROS1+robot` engine.

    The returned object layers the following on the built-in cmdline parser:

    - :class:`~sierra.core.ros1.cmdline.ROSCmdline` (ROS1 common)

    - :class:`~sierra.plugins.engine.ros1robot.cmdline.EngineCmdline`
      (ROS1+robot specifics)

    Args:
        parents: Parent parsers handed to :class:`EngineCmdline`.
        stages: Pipeline stages handed to :class:`EngineCmdline`.
    """
    engine_cmdline = EngineCmdline(parents, stages)
    return engine_cmdline
def to_cmdopts(args: argparse.Namespace) -> types.Cmdopts:
    """Build the cmdopts for the ROS1+robot engine from parsed arguments.

    Starts from the result of ``ros1.cmdline.to_cmdopts(args)`` and merges
    the engine-specific entries into it; on a key collision the entries
    defined here win.

    Args:
        args: The parsed cmdline arguments.

    Returns:
        The cmdopts returned by ``ros1.cmdline.to_cmdopts()``, updated in
        place with the options defined by this module.
    """
    cmdopts = ros1.cmdline.to_cmdopts(args)

    multistage = {
        "exec_jobs_per_node": 1,  # (1 job/robot)
        "skip_online_check": args.skip_online_check,
        "online_check_method": args.online_check_method,
    }
    stage1 = {
        "skip_sync": args.skip_sync,
    }
    stage2 = {
        "exec_resume": args.exec_resume,
        "exec_inter_run_pause": args.exec_inter_run_pause,
    }

    # Merge the per-stage groups in declaration order, then fold them into
    # the common options in place.
    cmdopts |= multistage | stage1 | stage2
    return cmdopts
def sphinx_cmdline_stage1():
    """Return the ``parser`` of an :class:`EngineCmdline` built for stage 1.

    The cmdline is constructed with an empty list of parent parsers.

    NOTE(review): judging by the name, this is presumably a hook for the
    Sphinx-generated cmdline documentation -- confirm against the docs setup.
    """
    stage1_cmdline = EngineCmdline([], [1])
    return stage1_cmdline.parser
def sphinx_cmdline_stage2():
    """Return the ``parser`` of an :class:`EngineCmdline` built for stage 2.

    The cmdline is constructed with an empty list of parent parsers.

    NOTE(review): judging by the name, this is presumably a hook for the
    Sphinx-generated cmdline documentation -- confirm against the docs setup.
    """
    stage2_cmdline = EngineCmdline([], [2])
    return stage2_cmdline.parser
# Names exported by a wildcard import of this module; only the cmdline class
# is listed, so the module-level functions are not exported that way.
__all__ = ["EngineCmdline"]