# Copyright 2021 John Harwell, All rights reserved.
#
# SPDX-License-Identifier: MIT
"""
Robot plugin for running SIERRA with a set of Turtlebot3 robots.
"""
# Core packages
import os
import logging
import typing as tp
import argparse
import shutil
import pathlib
# 3rd party packages
import implements
# Project packages
from sierra.core import types, platform, utils
from sierra.core.experiment import bindings
@implements.implements(bindings.IExpShellCmdsGenerator)
class ExpShellCmdsGenerator():
    """Generate the cmds to invoke GNU Parallel to launch ROS on the turtlebots.

    Attributes:
        cmdopts: Dictionary of parsed cmdline options. This class reads the
                 ``n_runs``, ``no_master_node`` and ``exec_inter_run_pause``
                 keys from it.

        exp_num: Experiment number passed at construction; stored, but not
                 otherwise used by this class.
    """

    def __init__(self,
                 cmdopts: types.Cmdopts,
                 exp_num: int) -> None:
        self.cmdopts = cmdopts
        self.exp_num = exp_num

    def pre_exp_cmds(self) -> tp.List[types.ShellCmdSpec]:
        """Return the cmds to run before an experiment (none)."""
        return []

    def post_exp_cmds(self) -> tp.List[types.ShellCmdSpec]:
        """Return the cmds to run after an experiment (none)."""
        return []

    def exec_exp_cmds(self, exec_opts: types.StrDict) -> tp.List[types.ShellCmdSpec]:
        """Build the shell cmds which execute all runs of an experiment.

        The returned list contains, in order:

        #. An ``export PARALLEL_SHELL=...`` cmd so GNU parallel uses bash.

        #. A ``sort -u`` cmd which writes a de-duplicated private copy of the
           nodefile.

        #. For each of the ``cmdopts['n_runs']`` runs: a GNU parallel cmd for
           the robots, a GNU parallel cmd for the ROS master (unless
           ``cmdopts['no_master_node']`` is set), and an echo+sleep cmd which
           pauses ``cmdopts['exec_inter_run_pause']`` seconds.

        Args:
            exec_opts: Execution options. The keys read are
                       ``exp_input_root``, ``exec_resume``, ``nodefile``,
                       ``n_jobs``, ``scratch_dir``, ``cmdfile_stem_path`` and
                       ``cmdfile_ext``.

        Returns:
            The ordered list of shell cmd specifications.
        """
        jobid = os.getpid()

        # Even if we are passed --nodelist, we still make our own copy of it, so
        # that the user can safely modify it (if they want to) after running
        # stage 1.
        nodelist = pathlib.Path(exec_opts['exp_input_root'],
                                f"{jobid}-nodelist.txt")

        # This can't be --resume, because then GNU parallel looks at the results
        # directory, and if there is stuff in it, (apparently) assumes that the
        # job finished...
        resume = '--resume-failed' if exec_opts['exec_resume'] else ''

        # Make sure there are no duplicate nodes
        nodefile = exec_opts['nodefile']
        unique_nodes = types.ShellCmdSpec(
            cmd=f'sort -u {nodefile} > {nodelist}',
            shell=True,
            wait=True)

        # Make sure GNU parallel uses the right shell, because it seems to
        # default to /bin/sh since all cmds are run in a python shell which
        # does not have $SHELL set.
        #
        # shutil.which() returns None if bash is not on PATH; fall back to the
        # conventional location rather than exporting the literal string
        # "None" as the shell.
        bash = shutil.which('bash') or '/bin/bash'
        use_bash = types.ShellCmdSpec(
            cmd=f'export PARALLEL_SHELL={bash}',
            shell=True,
            env=True,
            wait=True)

        ret = [use_bash, unique_nodes]

        # Loop-invariant inputs for the per-run cmds.
        no_master = self.cmdopts['no_master_node']
        pause = self.cmdopts['exec_inter_run_pause']
        n_jobs = exec_opts['n_jobs']
        scratch_dir = exec_opts['scratch_dir']
        stem = exec_opts['cmdfile_stem_path']
        ext = exec_opts['cmdfile_ext']

        # 1 GNU parallel command to launch each experimental run, because each
        # run might use all available nodes/robots.
        for i in range(self.cmdopts['n_runs']):
            # GNU parallel cmd for robots (slaves)
            robots_ipath = f"{stem}_run{i}_slave{ext}"
            robot_log = pathlib.Path(scratch_dir,
                                     f"parallel-slaves-run{i}.log")
            robots = (f'parallel {resume} '
                      f'--jobs {n_jobs} '
                      f'--results {scratch_dir} '
                      f'--joblog {robot_log} '
                      f'--sshloginfile {nodelist} '
                      f'--workdir {scratch_dir} < "{robots_ipath}"')

            # If no master is spawned, then we need to wait for this GNU
            # parallel cmd. If the master is spawned, then we wait for THAT
            # command; waiting for both results in the master never starting
            # because that cmd is never run.
            ret.append(types.ShellCmdSpec(cmd=robots,
                                          shell=True,
                                          wait=no_master))

            if not no_master:
                ros_master_ipath = f"{stem}_run{i}_master{ext}"
                master_log = pathlib.Path(scratch_dir,
                                          f"parallel-master-run{i}.log")
                ros_master = (f'parallel {resume} '
                              f'--results {scratch_dir} '
                              f'--joblog {master_log} '
                              f'--workdir {scratch_dir} < "{ros_master_ipath}"')

                # This branch only runs when a master is spawned, so this is
                # always the cmd that gets waited on.
                ret.append(types.ShellCmdSpec(cmd=ros_master,
                                              shell=True,
                                              wait=True))

            # Pause between runs.
            inter_run_pause = (f'echo "{pause} seconds until launching next run!"; '
                               f'sleep {pause}s ;')
            ret.append(types.ShellCmdSpec(cmd=inter_run_pause,
                                          shell=True,
                                          wait=True))

        return ret
class ExecEnvChecker(platform.ExecEnvChecker):
    """Check the hosts listed in the nodefile before running on turtlebots.

    Attributes:
        cmdopts: Dictionary of parsed cmdline options; the ``nodefile`` and
                 ``skip_online_check`` keys are read when the checker is
                 called.

        logger: Logger named ``robot.turtlebot3``.
    """

    def __init__(self, cmdopts: types.Cmdopts) -> None:
        super().__init__(cmdopts)
        self.cmdopts = cmdopts
        self.logger = logging.getLogger('robot.turtlebot3')

    def __call__(self) -> None:
        """Inspect every host parsed from ``cmdopts['nodefile']``.

        A warning is logged for any host whose core count is not 1. Unless
        ``cmdopts['skip_online_check']`` is set, connectivity to each host is
        then checked via ``check_connectivity()``.
        """
        nodefile = self.cmdopts['nodefile']

        for host in self.parse_nodefile(nodefile):
            if int(host.n_cores) != 1:
                self.logger.warning(("Nodefile %s, host %s has multiple "
                                     "cores; turtlebots are single core"),
                                    nodefile,
                                    host.hostname)

            # Nothing more to do for this host if the online check is disabled.
            if self.cmdopts['skip_online_check']:
                continue

            self.check_connectivity(host.login,
                                    host.hostname,
                                    host.port,
                                    'turtlebot3')
# Names of the classes this plugin module lists as its API (presumably consumed
# by SIERRA's plugin/documentation tooling -- verify against the consumer).
# NOTE(review): 'ParsedCmdlineConfigurer' is listed but no definition of it is
# visible in this file -- confirm it still exists, or drop the entry.
__api__ = [
'ParsedCmdlineConfigurer',
'ExpShellCmdsGenerator',
'ExecEnvChecker'
]