Source code for sierra.plugins.platform.argos.generators.platform_generators
# Copyright 2021 John Harwell, All rights reserved.
#
# SPDX-License-Identifier: MIT
"""Classes for generating common XML modifications for :term:`ARGoS`.
I.e., changes which are platform-specific, but applicable to all projects using
the platform.
"""
# Core packages
import typing as tp
import logging
import sys
import pathlib
# 3rd party packages
import psutil
# Project packages
from sierra.core.utils import ArenaExtent
from sierra.core.experiment import spec, definition, xml
from sierra.core import types, config, utils
import sierra.core.plugin_manager as pm
from sierra.plugins.platform.argos.variables import arena_shape
from sierra.plugins.platform.argos.variables import population_size
from sierra.plugins.platform.argos.variables import physics_engines
from sierra.plugins.platform.argos.variables import cameras
from sierra.plugins.platform.argos.variables import rendering
import sierra.plugins.platform.argos.variables.exp_setup as exp
[docs]class PlatformExpDefGenerator():
"""
Init the object.
Attributes:
controller: The controller used for the experiment.
cmdopts: Dictionary of parsed cmdline parameters.
"""
[docs] def __init__(self,
exp_spec: spec.ExperimentSpec,
controller: str,
cmdopts: types.Cmdopts,
**kwargs) -> None:
self.controller = controller
self.spec = exp_spec
self.cmdopts = cmdopts
self.template_input_file = kwargs['template_input_file']
self.kwargs = kwargs
self.logger = logging.getLogger(__name__)
[docs] def generate(self) -> definition.XMLExpDef:
"""Generate XML modifications common to all ARGoS experiments.
"""
# ARGoS uses a single input file
wr_config = xml.WriterConfig([{'src_parent': None,
'src_tag': '.',
'opath_leaf': config.kARGoS['launch_file_ext'],
'create_tags': None,
'dest_parent': None,
'rename_to': None
}])
exp_def = definition.XMLExpDef(input_fpath=self.template_input_file,
write_config=wr_config)
# Generate # robots
self._generate_n_robots(exp_def)
# Setup library
self._generate_library(exp_def)
# Setup simulation visualizations
self._generate_visualization(exp_def)
# Setup threading
self._generate_threading(exp_def)
# Setup robot sensors/actuators
self._generate_saa(exp_def)
# Setup simulation time parameters
self._generate_time(exp_def)
return exp_def
[docs] def generate_physics(self,
exp_def: definition.XMLExpDef,
cmdopts: types.Cmdopts,
engine_type: str,
n_engines: int,
extents: tp.List[ArenaExtent],
remove_defs: bool = True) -> None:
"""
Generate XML changes for the specified physics engines configuration.
Physics engine definition removal is optional, because when mixing 2D/3D
engine definitions, you only want to remove the existing definitions
BEFORE you have adding first of the mixed definitions. Doing so every
time results in only the LAST of the mixed definitions being present in
the input file.
Does not write generated changes to the simulation definition pickle
file.
"""
# Valid to have 0 engines here if 2D/3D were mixed but only 1 engine was
# specified for the whole simulation.
if n_engines == 0:
self.logger.warning("0 engines of type %s specified", engine_type)
return
self.logger.trace(("Generating changes for %d '%s' " # type: ignore
"physics engines (all runs)"),
n_engines,
engine_type)
if cmdopts['physics_spatial_hash2D']:
assert hasattr(self.spec.criteria, 'n_robots'),\
("When using the 2D spatial hash, the batch "
"criteria must implement bc.IQueryableBatchCriteria")
n_robots = self.spec.criteria.n_robots(self.spec.exp_num)
else:
n_robots = None
module = pm.pipeline.get_plugin_module(cmdopts['platform'])
robot_type = module.robot_type_from_def(exp_def)
pe = physics_engines.factory(engine_type,
n_engines,
n_robots,
robot_type,
cmdopts,
extents)
utils.apply_to_expdef(pe, exp_def)
[docs] def generate_arena_shape(self,
exp_def: definition.XMLExpDef,
shape: arena_shape.ArenaShape) -> None:
"""
Generate XML changes for the specified arena shape.
Writes generated changes to the simulation definition pickle file.
"""
self.logger.trace(("Generating changes for arena " # type: ignore
"share (all runs)"))
_, adds, chgs = utils.apply_to_expdef(shape, exp_def)
utils.pickle_modifications(adds, chgs, self.spec.exp_def_fpath)
[docs] def _generate_n_robots(self, exp_def: definition.XMLExpDef) -> None:
"""
Generate XML changes to setup # robots (if specified on cmdline).
Writes generated changes to the simulation definition pickle file.
"""
if self.cmdopts['n_robots'] is None:
return
self.logger.trace(("Generating changes for # robots " # type: ignore
"(all runs)"))
chgs = population_size.PopulationSize.gen_attr_changelist_from_list(
[self.cmdopts['n_robots']])
for a in chgs[0]:
exp_def.attr_change(a.path, a.attr, a.value, True)
# Write # robots info to file for later retrieval
chgs[0].pickle(self.spec.exp_def_fpath)
[docs] def _generate_saa(self, exp_def: definition.XMLExpDef) -> None:
"""Generate XML changes to disable selected sensors/actuators.
Some sensors and actuators are computationally expensive in large
populations, but not that costly if the # robots is small.
Does not write generated changes to the simulation definition pickle
file.
"""
self.logger.trace(("Generating changes for SAA " # type: ignore
"(all runs)"))
if not self.cmdopts["with_robot_rab"]:
exp_def.tag_remove(".//media", "range_and_bearing", noprint=True)
exp_def.tag_remove(".//actuators",
"range_and_bearing",
noprint=True)
exp_def.tag_remove(".//sensors", "range_and_bearing", noprint=True)
if not self.cmdopts["with_robot_leds"]:
exp_def.tag_remove(".//actuators", "leds", noprint=True)
exp_def.tag_remove(".//sensors",
"colored_blob_omnidirectional_camera",
noprint=True)
exp_def.tag_remove(".//media", "led", noprint=True)
if not self.cmdopts["with_robot_battery"]:
exp_def.tag_remove(".//sensors", "battery", noprint=True)
exp_def.tag_remove(".//entity/*", "battery", noprint=True)
[docs] def _generate_time(self, exp_def: definition.XMLExpDef) -> None:
"""
Generate XML changes to setup simulation time parameters.
Writes generated changes to the simulation definition pickle file.
"""
self.logger.debug("Using exp_setup=%s", self.cmdopts['exp_setup'])
setup = exp.factory(self.cmdopts["exp_setup"])()
rms, adds, chgs = utils.apply_to_expdef(setup, exp_def)
# Write time setup info to file for later retrieval
utils.pickle_modifications(adds, chgs, self.spec.exp_def_fpath)
[docs] def _generate_threading(self, exp_def: definition.XMLExpDef) -> None:
"""Generate XML changes to set the # of cores for a simulation to use.
This may be less than the total # available on the system, depending on
the experiment definition and user preferences.
Does not write generated changes to the simulation definition pickle
file.
"""
self.logger.trace( # type: ignore
"Generating changes for threading (all runs)")
exp_def.attr_change(".//system",
"threads",
str(self.cmdopts["physics_n_engines"]))
# Only valid on linux, per ARGoS, so we ely on the user to add this
# attribute to the input file if it is applicable.
if not exp_def.attr_get(".//system", "pin_threads_to_cores"):
return
if sys.platform != "linux":
self.logger.critical((".//system/pin_threads_to_cores only "
"valid on linux--configuration error?"))
return
# If you don't do this, you will get runtime errors in ARGoS when you
# attempt to set thread affinity to a core that does not exist. This is
# better than modifying ARGoS source to only pin threads to cores that
# exist, because it implies a configuration error by the user, and
# SIERRA should fail as a result (correctness by construction).
if self.cmdopts['physics_n_engines'] > psutil.cpu_count():
self.logger.warning(("Disabling pinning threads to cores: "
"mores threads than cores! %s > %s"),
self.cmdopts['physics_n_engines'],
psutil.cpu_count())
exp_def.attr_change(".//system",
"pin_threads_to_cores",
"false")
else:
exp_def.attr_change(".//system",
"pin_threads_to_cores",
"true")
[docs] def _generate_library(self, exp_def: definition.XMLExpDef) -> None:
"""Generate XML changes for ARGoS search paths for controller,loop functions.
Set to the name of the plugin passed on the cmdline, unless overriden in
configuration. The ``__CONTROLLER__`` tag is changed during stage 1, but
since this function is called as part of common def generation, it
happens BEFORE that, and so this is OK. If, for some reason that
assumption becomes invalid, a warning will be issued about a
non-existent XML path, so it won't be a silent error.
Does not write generated changes to the simulation definition pickle
file.
"""
self.logger.trace( # type: ignore
"Generating changes for library (all runs)")
run_config = self.spec.criteria.main_config['sierra']['run']
lib_name = run_config.get('library_name',
'lib' + self.cmdopts['project'])
exp_def.attr_change(".//loop_functions",
"library",
lib_name)
exp_def.attr_change(".//__CONTROLLER__",
"library",
lib_name)
[docs] def _generate_visualization(self, exp_def: definition.XMLExpDef) -> None:
"""Generate XML changes to remove visualization elements from input file.
This depends on cmdline parameters, as visualization definitions should
be left in if ARGoS should output simulation frames for video creation.
Does not write generated changes to the simulation definition pickle
file.
"""
self.logger.trace(("Generating changes for " # type: ignore
"visualization (all runs)"))
if not self.cmdopts["platform_vc"]:
# ARGoS visualizations
exp_def.tag_remove(".", "./visualization", noprint=True)
else:
self.logger.debug('Frame grabbing enabled')
# Rendering must be processing before cameras, because it deletes
# the <qt_opengl> tag if it exists, and then re-adds it.
render = rendering.factory(self.cmdopts)
utils.apply_to_expdef(render, exp_def)
cams = cameras.factory(self.cmdopts, [self.spec.arena_dim])
utils.apply_to_expdef(cams, exp_def)
[docs]class PlatformExpRunDefUniqueGenerator:
"""Generate XML changes unique to each experimental run.
These include:
- Random seeds for each simulation.
Attributes:
run_num: The runulation # in the experiment.
run_output_path: Path to simulation output directory within experiment
root.
cmdopts: Dictionary containing parsed cmdline options.
"""
[docs] def __init__(self,
run_num: int,
run_output_path: pathlib.Path,
launch_stem_path: pathlib.Path,
random_seed: int,
cmdopts: types.Cmdopts) -> None:
self.run_output_path = run_output_path
self.launch_stem_path = launch_stem_path
self.cmdopts = cmdopts
self.run_num = run_num
self.random_seed = random_seed
self.logger = logging.getLogger(__name__)
def __generate_random(self, exp_def) -> None:
"""
Generate XML changes for random seeding for a specific simulation.
"""
self.logger.trace("Generating random seed changes for run%s", # type: ignore
self.run_num)
# Set the random seed in the input file
exp_def.attr_change(".//experiment",
"random_seed",
str(self.random_seed))
[docs] def generate(self, exp_def: definition.XMLExpDef):
# Setup simulation random seed
self.__generate_random(exp_def)
# Setup simulation visualization output
self.__generate_visualization(exp_def)
def __generate_visualization(self, exp_def: definition.XMLExpDef):
"""
Generate XML changes for setting up rendering for a specific simulation.
"""
self.logger.trace("Generating visualization changes for run%s", # type: ignore
self.run_num)
if self.cmdopts['platform_vc']:
argos = config.kRendering['argos']
frames_fpath = self.run_output_path / argos['frames_leaf']
exp_def.attr_change(".//qt-opengl/frame_grabbing",
"directory",
str(frames_fpath)) # probably will not be present
__api__ = [
'PlatformExpDefGenerator',
'PlatformExpRunDefUniqueGenerator'
]