# Copyright 2020 John Harwell, All rights reserved.
#
# SPDX-License-Identifier: MIT
"""Classes for the population size batch criteria.
See :ref:`ln-sierra-platform-ros1gazebo-bc-population-size` for usage
documentation.
"""
# Core packages
import typing as tp
import random
import logging
import pathlib
# 3rd party packages
import implements
# Project packages
from sierra.core.variables import batch_criteria as bc
from sierra.core.experiment import xml
from sierra.core import types, utils
from sierra.core.vector import Vector3D
from sierra.core.variables import population_size
[docs]@implements.implements(bc.IConcreteBatchCriteria)
@implements.implements(bc.IQueryableBatchCriteria)
class PopulationSize(population_size.BasePopulationSize):
"""A univariate range of system sizes used to define batch experiments.
This class is a base class which should (almost) never be used on its
own. Instead, the ``factory()`` function should be used to dynamically
create derived classes expressing the user's desired size distribution.
Note: Usage of this class assumes homogeneous systems.
Attributes:
size_list: List of integer system sizes defining the range of the
variable for the batch experiment.
"""
[docs] def __init__(self,
cli_arg: str,
main_config: types.YAMLDict,
batch_input_root: pathlib.Path,
robot: str,
sizes: tp.List[int],
positions: tp.List[Vector3D]) -> None:
population_size.BasePopulationSize.__init__(self,
cli_arg,
main_config,
batch_input_root)
self.sizes = sizes
self.robot = robot
self.positions = positions
self.logger = logging.getLogger(__name__)
if len(positions) < self.sizes[-1]:
self.logger.warning("# possible positions < # robots: %s < %s",
len(positions),
self.sizes[-1])
self.tag_adds = [] # type: tp.List[xml.TagAddList]
[docs] def gen_tag_addlist(self) -> tp.List[xml.TagAddList]:
"""Generate XML modifications to set system sizes.
"""
if not self.tag_adds:
robot_config = self.main_config['ros']['robots'][self.robot]
prefix = robot_config['prefix']
model_base = robot_config['model']
model_variant = robot_config.get('model_variant', '')
if model_variant != '':
model = f"{model_base}_{model_variant}"
else:
model = model_base
desc_cmd = f"$(find xacro)/xacro $(find {model_base}_description)/urdf/{model}.urdf.xacro"
for s in self.sizes:
exp_adds = xml.TagAddList()
pos_i = random.randint(0, len(self.positions) - 1)
exp_adds.append(xml.TagAdd(".",
"master",
{},
True))
exp_adds.append(xml.TagAdd("./master",
"group",
{
'ns': 'sierra'
},
False))
exp_adds.append(xml.TagAdd("./master/group/[@ns='sierra']",
"param",
{
'name': 'experiment/n_robots',
'value': str(s)
},
False))
for i in range(0, s):
ns = f'{prefix}{i}'
pos = self.positions[pos_i]
pos_i = (pos_i + 1) % len(self.positions)
spawn_cmd_args = f"-urdf -model {model}_{ns} -x {pos.x} -y {pos.y} -z {pos.z} -param robot_description"
exp_adds.append(xml.TagAdd("./robot",
"group",
{
'ns': ns
},
True))
exp_adds.append(xml.TagAdd(f"./robot/group/[@ns='{ns}']",
"param",
{
"name": "tf_prefix",
"value": ns
},
True))
# These two tag adds are OK to use because:
#
# - All robots in Gazebo are created using spawn_model
# initially.
#
# - All robots in Gazebo will provide a robot description
# .urdf.xacro per ROS naming conventions
exp_adds.append(xml.TagAdd(f"./robot/group/[@ns='{ns}']",
"param",
{
"name": "robot_description",
"command": desc_cmd
},
True))
exp_adds.append(xml.TagAdd(f"./robot/group/[@ns='{ns}']",
"node",
{
"name": "spawn_urdf",
"pkg": "gazebo_ros",
"type": "spawn_model",
"args": spawn_cmd_args
},
True))
self.tag_adds.append(exp_adds)
return self.tag_adds
[docs] def gen_exp_names(self, cmdopts: types.Cmdopts) -> tp.List[str]:
adds = self.gen_tag_addlist()
return ['exp' + str(x) for x in range(0, len(adds))]
[docs] def n_robots(self, exp_num: int) -> int:
return int(len(self.tag_adds[exp_num]) / len(self.tag_adds[0]))
def factory(cli_arg: str,
main_config: types.YAMLDict,
cmdopts: types.Cmdopts,
**kwargs) -> PopulationSize:
"""Create a :class:`PopulationSize` derived class from the cmdline definition.
"""
parser = population_size.Parser()
attr = parser(cli_arg)
max_sizes = parser.to_sizes(attr)
if cmdopts['robot_positions']:
positions = [Vector3D.from_str(s,
astype=float) for s in cmdopts['robot_positions']]
else:
# Get the dimensions of the effective arena from the scenario so we can
# place robots randomly within it.
kw = utils.gen_scenario_spec(cmdopts, **kwargs)
xs = random.choices(range(0, kw['arena_x']), k=max_sizes[-1]) # type: ignore
ys = random.choices(range(0, kw['arena_y']), k=max_sizes[-1]) # type: ignore
zs = random.choices(range(0, kw['arena_z']), k=max_sizes[-1]) # type: ignore
positions = [Vector3D(x, y, z) for x, y, z in zip(xs, ys, zs)]
def __init__(self) -> None:
PopulationSize.__init__(self,
cli_arg,
main_config,
cmdopts['batch_input_root'],
cmdopts['robot'],
max_sizes,
positions)
return type(cli_arg, # type: ignore
(PopulationSize,),
{"__init__": __init__})
__api__ = [
'PopulationSize'
]