# Copyright 2022 John Harwell, All rights reserved.
#
# SPDX-License-Identifier: MIT
"""Helper classes for XML experiment definitions.
Adding/removing tags, modifying attributes, configuration for how to write XML
files.
"""
# Core packages
import typing as tp
import logging
import pickle
import xml.etree.ElementTree as ET
import sys
import pathlib
# 3rd party packages
# Project packages
from sierra.core import types
class AttrChange():
    """
    Describe one modification to an XML attribute that already exists.

    Attributes:

        path: Path to the element which owns the attribute.

        attr: Name of the attribute to modify.

        value: The new value for the attribute; always stored as a string.
    """

    def __init__(self,
                 path: str,
                 attr: str,
                 value: tp.Union[str, int, float]) -> None:
        # Numeric values are stringified once here so that every consumer sees
        # text, regardless of what the caller passed.
        self.path, self.attr, self.value = path, attr, str(value)

    def __iter__(self):
        # Supports ``path, attr, value = change`` style unpacking.
        yield from (self.path, self.attr, self.value)

    def __repr__(self) -> str:
        location = '/'.join((self.path, self.attr))
        return location + ': ' + str(self.value)
class TagRm():
    """
    Describe the removal of one XML tag that already exists.
    """

    def __init__(self, path: str, tag: str):
        """
        Init the object.

        Arguments:

            path: XPath to the **parent** of the tag which is to be removed.

            tag: Name of the tag which is to be removed.
        """
        self.path, self.tag = path, tag

    def __iter__(self):
        # Supports ``path, tag = rm`` style unpacking.
        yield from (self.path, self.tag)

    def __repr__(self) -> str:
        return '/'.join((self.path, self.tag))
class TagAdd():
    """
    Describe the creation of one new XML tag.

    Whether a duplicate of an already-present tag may be created is carried
    along in ``allow_dup``.
    """

    @staticmethod
    def as_root(tag: str,
                attr: types.StrDict) -> 'TagAdd':
        """Build a spec with an empty parent path and duplicates disallowed."""
        return TagAdd(path='', tag=tag, attr=attr, allow_dup=False)

    def __init__(self,
                 path: str,
                 tag: str,
                 attr: types.StrDict,
                 allow_dup: bool):
        """
        Init the object.

        Arguments:

            path: XPath to the **parent** tag under which the new tag should
                  be created. ``None`` is documented to mean "add as the root
                  XML tag"; :meth:`as_root` itself passes the empty string.
                  NOTE(review): confirm which sentinel consumers expect.

            tag: Name of the tag to create.

            attr: Dictionary of (attribute, value) pairs to set on the new
                  tag when it is created.

            allow_dup: Stored as-is; indicates whether adding a duplicate of
                       the tag is permitted.
        """
        self.path, self.tag = path, tag
        self.attr = attr
        self.allow_dup = allow_dup

    def __iter__(self):
        # Only three fields are produced; ``allow_dup`` is not part of the
        # unpacked form.
        yield from (self.path, self.tag, self.attr)

    def __repr__(self) -> str:
        location = '/'.join((self.path, self.tag))
        return location + ': ' + str(self.attr)
class AttrChangeSet():
    """
    Unordered collection of :class:`AttrChange` objects.

    A set is used because applying attribute changes in a different order
    does not affect correctness (i.e., it cannot cause crashes), so there is
    no ordering to preserve.
    """

    @staticmethod
    def unpickle(fpath: pathlib.Path) -> 'AttrChangeSet':
        """Rebuild a change set from every pickled record found in ``fpath``.

        The number of back-to-back pickles in the file is not known ahead of
        time, so records are read until :class:`EOFError` marks the end.
        """
        merged = AttrChangeSet()

        # NOTE: unpickling can execute arbitrary code; only trusted files
        # should ever be handed to this function.
        with open(fpath, 'rb') as f:
            try:
                while True:
                    record = pickle.load(f)
                    merged |= AttrChangeSet(*record)
            except EOFError:
                pass

        return merged

    def __init__(self, *args: AttrChange) -> None:
        self.changes = set(args)
        self.logger = logging.getLogger(__name__)

    def __len__(self) -> int:
        return len(self.changes)

    def __iter__(self) -> tp.Iterator[AttrChange]:
        return iter(self.changes)

    def __ior__(self, other: 'AttrChangeSet') -> 'AttrChangeSet':
        # In-place union; ``self`` must be returned for ``|=`` to rebind
        # correctly.
        self.changes |= other.changes
        return self

    def __or__(self, other: 'AttrChangeSet') -> 'AttrChangeSet':
        # Neither operand is modified: work on a fresh copy of our changes.
        union = AttrChangeSet(*self.changes)
        union |= other
        return union

    def __repr__(self) -> str:
        return str(self.changes)

    def add(self, chg: AttrChange) -> None:
        """Insert a single change into the set."""
        self.changes.add(chg)

    def pickle(self, fpath: pathlib.Path, delete: bool = False) -> None:
        """Append the current changes to ``fpath`` as one pickled record.

        Arguments:

            fpath: File to append to (opened in binary append mode).

            delete: If True and the file already exists, remove it first so
                    it ends up containing only this record.
        """
        from sierra.core import utils

        if delete and utils.path_exists(fpath):
            fpath.unlink()

        with open(fpath, 'ab') as out:
            utils.pickle_dump(self.changes, out)
class TagRmList():
    """
    Ordered collection of :class:`TagRm` objects.

    A list is used because removal order matters: removing dependent tags in
    the wrong order results in an exception.
    """

    def __init__(self, *args: TagRm) -> None:
        self.rms = list(args)

    def __len__(self) -> int:
        return len(self.rms)

    def __iter__(self) -> tp.Iterator[TagRm]:
        return iter(self.rms)

    def __repr__(self) -> str:
        return str(self.rms)

    def extend(self, other: 'TagRmList') -> None:
        """Append every removal in ``other`` after our own, keeping order."""
        self.rms += other.rms

    def append(self, other: TagRm) -> None:
        """Add a single removal at the end of the list."""
        self.rms.append(other)

    def pickle(self, fpath: pathlib.Path, delete: bool = False) -> None:
        """Append the current removals to ``fpath`` as one pickled record.

        Arguments:

            fpath: File to append to (opened in binary append mode).

            delete: If True and the file already exists, remove it first so
                    it ends up containing only this record.
        """
        from sierra.core import utils

        if delete and utils.path_exists(fpath):
            fpath.unlink()

        with open(fpath, 'ab') as out:
            utils.pickle_dump(self.rms, out)
class TagAddList():
    """
    Data structure for :class:`TagAdd` objects.

    The order in which tags are added matters (i.e., if you add dependent tags
    in the wrong order you will get an exception), hence the list
    representation.
    """

    @staticmethod
    def unpickle(fpath: pathlib.Path) -> tp.Optional['TagAddList']:
        """Unpickle XML modifications.

        Each record in the file is one pickled list of additions, as written
        by :meth:`pickle`. You don't know how many records there are, so go
        until you get an exception. A record may hold any number of additions
        (including zero); all of them are appended in file order.

        Arguments:

            fpath: File containing zero or more back-to-back pickled records.

        Returns:

            A new list holding every addition found in the file.
        """
        exp_def = TagAddList()

        # NOTE: unpickling can execute arbitrary code; only trusted files
        # should ever be handed to this function.
        with open(fpath, 'rb') as f:
            try:
                while True:
                    # Each record is a whole list, so it must be merged
                    # element-wise; append(*record) only works for records of
                    # exactly one element.
                    exp_def.adds.extend(pickle.load(f))
            except EOFError:
                pass

        return exp_def

    def __init__(self, *args: TagAdd) -> None:
        self.adds = list(args)

    def __len__(self) -> int:
        return len(self.adds)

    def __iter__(self) -> tp.Iterator[TagAdd]:
        return iter(self.adds)

    def __repr__(self) -> str:
        return str(self.adds)

    def extend(self, other: 'TagAddList') -> None:
        """Append every addition in ``other`` after our own, keeping order."""
        self.adds.extend(other.adds)

    def append(self, other: TagAdd) -> None:
        """Add a single addition at the end of the list."""
        self.adds.append(other)

    def prepend(self, other: TagAdd) -> None:
        """Add a single addition at the front of the list."""
        self.adds.insert(0, other)

    def pickle(self, fpath: pathlib.Path, delete: bool = False) -> None:
        """Append the current additions to ``fpath`` as one pickled record.

        Arguments:

            fpath: File to append to (opened in binary append mode).

            delete: If True and the file already exists, remove it first so
                    it ends up containing only this record.
        """
        from sierra.core import utils

        if delete and utils.path_exists(fpath):
            fpath.unlink()

        with open(fpath, 'ab') as f:
            utils.pickle_dump(self.adds, f)
class WriterConfig():
    """Config for writing :class:`~sierra.core.experiment.definition.XMLExpDef`.

    Different parts of the XML tree can be written to multiple XML files; each
    dict in ``values`` describes one output file.

    Attributes:

        values: List of dicts, each with the following possible key, value
                pairs (as consumed by :class:`Writer`):

                ``src_parent`` - XPath of the parent of the sub-tree to write
                                 out, or None if ``src_tag`` should be looked
                                 up directly under the root of the XML
                                 tree. This key is required.

                ``src_tag`` - The name of the tag within ``src_parent`` which
                              is the root of the sub-tree to write out. This
                              key is required.

                ``rename_to`` - The new tag name for the root of the
                                sub-tree when writing it out. This key is
                                optional.

                ``dest_parent`` - XPath (within the tags built from
                                  ``create_tags``) of the tag under which the
                                  sub-tree is appended in the output
                                  file. This key is optional; if omitted the
                                  sub-tree itself is the root of the output
                                  file.

                ``create_tags`` - Additional tags to create in the output
                                  file to hold the sub-tree. Must form a tree
                                  with a single root; the first entry becomes
                                  that root. This key is optional.

                ``opath_leaf`` - Additional bits appended to the name of the
                                 output path given to the writer. This key is
                                 optional. Can be used to add an extension.

                ``child_grafts`` - Additional bits of the XML tree to add
                                   under ``dest_parent/src_tag`` in the output
                                   file, specified as a list of XPath
                                   strings. This key is optional.
    """

    def __init__(self, values: tp.List[dict]) -> None:
        # The caller's list is stored by reference (not copied), so later
        # add() calls are visible through the original list too.
        self.values = values

    def add(self, value: dict) -> None:
        """Register the configuration for one more output file."""
        self.values.append(value)
class Writer():
    """Write the XML experiment to the filesystem according to configuration.

    More than one file may be written, as specified.

    Attributes:

        tree: The full XML tree that sub-trees are extracted from.

        root: The root element of ``tree``.
    """

    def __init__(self, tree: ET.ElementTree) -> None:
        self.tree = tree
        self.root = tree.getroot()
        self.logger = logging.getLogger(__name__)

    def __call__(self,
                 write_config: 'WriterConfig',
                 base_opath: pathlib.Path) -> None:
        """Write one output file per entry in ``write_config.values``.

        Arguments:

            write_config: The per-file configuration dicts.

            base_opath: Output path which each entry may refine via its
                        ``opath_leaf`` key.
        """
        for config in write_config.values:
            self._write_with_config(base_opath, config)

    def _write_with_config(self,
                           base_opath: pathlib.Path,
                           config: dict) -> None:
        """Extract, optionally rename/wrap/graft, and write a single sub-tree.

        If the requested sub-tree does not exist a warning is logged and
        nothing is written.
        """
        tree, src_root, opath = self._write_prepare_tree(base_opath, config)

        if tree is None:
            self.logger.warning("Cannot write non-existent tree@%s to %s",
                                src_root,
                                opath)
            return

        self.logger.trace("Write tree@%s to %s",  # type: ignore
                          src_root,
                          opath)

        # Renaming tree root is not required. Note that the element is
        # renamed in place, i.e., the rename is also visible in self.tree.
        rename_to = config.get('rename_to')
        if rename_to is not None:
            tree.tag = rename_to
            self.logger.trace("Rename tree root -> %s",  # type: ignore
                              rename_to)

        # Adding tags not required
        dest_parent = config.get('dest_parent')
        if dest_parent is not None:
            # Create a new tree to add the specified tags in. After adding the
            # tags, append the tree of newly created tags back into the parent.
            to_write = ET.ElementTree()

            if config.get('create_tags') is not None:
                self._write_add_tags(config, to_write)

            # Without any created tags the new tree has no root to search for
            # the parent in; fail with a clear message rather than on None.
            new_root = to_write.getroot()
            assert new_root is not None,\
                ("Cannot append under parent '{0}': no tags were created "
                 "via 'create_tags'").format(dest_parent)

            parent = new_root.find(dest_parent)
            assert parent is not None,\
                "Could not find parent '{0}' of tags for appending".format(
                    dest_parent)
            parent.append(tree)
        else:
            to_write = ET.ElementTree(tree)

        # Grafts are not required
        if config.get('child_grafts') is not None:
            self._write_add_grafts(config, to_write)

        self._write_pretty(to_write, opath)

    @staticmethod
    def _write_pretty(to_write: ET.ElementTree, opath: pathlib.Path) -> None:
        """Serialize ``to_write`` to ``opath`` as indented XML.

        Pretty XML makes it easier to read the output to see if things have
        been generated correctly.
        """
        if sys.version_info < (3, 9):
            # ET.indent() does not exist before 3.9; go through minidom.
            from xml.dom import minidom

            raw = ET.tostring(to_write.getroot())
            pretty = minidom.parseString(raw).toprettyxml(indent=" ")
            with open(opath, "w", encoding='utf-8') as f:
                f.write(pretty)
        else:
            ET.indent(to_write, space="\t", level=0)
            to_write.write(opath, encoding='utf-8')

    def _write_add_grafts(self,
                          config: dict,
                          to_write: ET.ElementTree) -> None:
        """Append each ``child_grafts`` element under ``dest_parent/src_tag``.

        Graft XPaths are resolved against the root of the full source tree;
        the graft parent is resolved against the root of ``to_write``.
        """
        # NOTE(review): the lookup uses the original 'src_tag', so it will not
        # match if 'rename_to' already changed the tag of the written
        # sub-tree; confirm that combination is not expected to work.
        dest_root = "{0}/{1}".format(config['dest_parent'],
                                     config['src_tag'])
        graft_parent = to_write.getroot().find(dest_root)
        assert graft_parent is not None, \
            f"Bad parent {dest_root} for grafting"

        for g in config['child_grafts']:
            self.logger.trace("Graft tree@%s as child under %s",  # type: ignore
                              g,
                              dest_root)
            elt = self.root.find(g)
            # Element.append(None) fails with an opaque TypeError; name the
            # offending graft instead.
            assert elt is not None, \
                f"Could not find graft '{g}' in source tree"
            graft_parent.append(elt)

    def _write_add_tags(self,
                        config: dict,
                        to_write: ET.ElementTree) -> None:
        """Create the ``create_tags`` specs inside ``to_write``.

        The first spec becomes the root of ``to_write`` if it has none; every
        other spec is created as a child of the element found at its path.
        """
        for spec in config['create_tags']:
            # Tree has no root set--set root to specified tag
            if to_write.getroot() is None:
                to_write._setroot(ET.Element(spec.tag, spec.attr))
            else:
                elt = to_write.find(spec.path)

                assert elt is not None,\
                    (f"Could not find parent '{spec.path}' of tag '{spec.tag}' "
                     "to add")

                ET.SubElement(elt, spec.tag, spec.attr)

                self.logger.trace("Create tag@%s as child under %s",  # type: ignore
                                  spec.tag,
                                  spec.path)

    def _write_prepare_tree(self,
                            base_opath: pathlib.Path,
                            config: dict) -> tp.Tuple[tp.Optional[ET.Element],
                                                      str,
                                                      pathlib.Path]:
        """Locate the sub-tree to write and compute its output path.

        Returns:

            Tuple of (sub-tree element or None if it was not found, the XPath
            which was searched for, the output path).
        """
        if config['src_parent'] is None:
            src_root = config['src_tag']
        else:
            src_root = "{0}/{1}".format(config['src_parent'],
                                        config['src_tag'])

        tree_out = self.tree.getroot().find(src_root)

        # Customizing the output write path is not required
        opath_leaf = config.get('opath_leaf')
        if opath_leaf is not None:
            opath = base_opath.with_name(base_opath.name + opath_leaf)
        else:
            opath = base_opath

        return (tree_out, src_root, opath)
# Names which make up this module's documented API. ``Writer`` is not listed.
__api__ = [
    'AttrChange', 'AttrChangeSet',
    'TagAdd', 'TagAddList',
    'TagRm', 'TagRmList',
    'WriterConfig',
]