Source code for sierra.core.ros1.variables.exp_setup

# Copyright 2021 John Harwell, All rights reserved.
#
#  SPDX-License-Identifier: MIT

"""Classes for the ``--exp-setup`` cmdline option for ROS1 platforms.

See :ref:`ln-sierra-vars-expsetup` for usage documentation.

"""

# Core packages
import typing as tp

# 3rd party packages
import implements

# Project packages
from sierra.core.variables.base_variable import IBaseVariable
from sierra.core.experiment import xml
from sierra.core import config
from sierra.core.variables.exp_setup import Parser


[docs]@implements.implements(IBaseVariable) class ExpSetup(): """ Defines the experimental setup for ROS experiments. Attributes: n_secs_per_run: The :term:`Experimental Run` duration in seconds, NOT :term:`Ticks <Tick>` or timesteps. n_datapoints: How many datapoints to capture during the experimental run. n_ticks_per_sec: How many times per second robot controllers will be run. """
[docs] def __init__(self, n_secs_per_run: int, n_datapoints: int, n_ticks_per_sec: int, barrier_start: bool, robots_need_timekeeper: bool) -> None: self.n_secs_per_run = n_secs_per_run self.n_datapoints = n_datapoints self.n_ticks_per_sec = n_ticks_per_sec self.barrier_start = barrier_start self.robots_need_timekeeper = robots_need_timekeeper self.tag_adds = None
[docs] def gen_attr_changelist(self) -> tp.List[xml.AttrChangeSet]: return []
[docs] def gen_tag_rmlist(self) -> tp.List[xml.TagRmList]: return []
[docs] def gen_tag_addlist(self) -> tp.List[xml.TagAddList]: if not self.tag_adds: adds = xml.TagAddList( xml.TagAdd("./master/group/[@ns='sierra']", "param", { "name": "experiment/length", "value": str(self.n_secs_per_run), }, True), xml.TagAdd("./master/group/[@ns='sierra']", "param", { "name": "experiment/ticks_per_sec", "value": str(self.n_ticks_per_sec), }, True), xml.TagAdd("./master/group/[@ns='sierra']", "param", { "name": "experiment/barrier_start", "value": str(self.barrier_start).lower(), }, True), xml.TagAdd("./master/group/[@ns='sierra']", "node", { "name": "sierra_timekeeper", "pkg": "sierra_rosbridge", "type": "sierra_timekeeper.py", "required": "true", # Otherwise rospy prints nothing "output": "screen" }, True), xml.TagAdd("./robot/group/[@ns='sierra']", "param", { "name": "experiment/length", "value": str(self.n_secs_per_run), }, True), xml.TagAdd("./robot/group/[@ns='sierra']", "param", { "name": "experiment/ticks_per_sec", "value": str(self.n_ticks_per_sec), }, True), xml.TagAdd("./robot/group/[@ns='sierra']", "param", { "name": "experiment/barrier_start", "value": str(self.barrier_start).lower() }, True) ) if self.robots_need_timekeeper: # Robots also need the timekeeper when they are real and not # simulated robots. With simulated robots, robots share the root # ROS namespace with the master, so duplicating the timekeeper # in that case causes an error. # # Because real robot nodes are launch in a separate roslaunch # than the master node (potentially), and that node exiting will # not cause all robots to exit. This is needed even if robot # nodes respect the set experiment time, because they might be # using ROS packages which launch nodes which DON'T respect the # set experiment time and will stay active until you kill # them. This can cause issues if a robot node expects that at # most 1 dependent node will be active at a given time. Plus, # it's just sloppy to leave that sort of thing hanging around # after a run exits. adds.append(xml.TagAdd("./robot/group/[@ns='sierra']", "node", { "name": "sierra_timekeeper", "pkg": "sierra_rosbridge", "type": "sierra_timekeeper.py", "required": "true", # Otherwise rospy prints nothing "output": "screen" }, True) ) self.tag_adds = adds return [self.tag_adds]
[docs] def gen_files(self) -> None: pass
def factory(arg: str, barrier_start: bool, robots_need_timekeeper: bool) -> ExpSetup: """Create an :class:`ExpSetup` derived class from the command line definition. Arguments: arg: The value of ``--exp-setup``. """ parser = Parser({'n_secs_per_run': config.kROS['n_secs_per_run'], 'n_ticks_per_sec': config.kROS['n_ticks_per_sec'], 'n_datapoints': config.kExperimentalRunData['n_datapoints_1D']}) attr = parser(arg) def __init__(self: ExpSetup) -> None: ExpSetup.__init__(self, attr["n_secs_per_run"], attr['n_datapoints'], attr['n_ticks_per_sec'], barrier_start, robots_need_timekeeper) return type(attr['pretty_name'], (ExpSetup,), {"__init__": __init__}) # type: ignore __api__ = [ 'ExpSetup', ]