Source code for biosim_extractor.metadata.populatemetadata
#!/usr/bin/env python3
"""
Extract and populate metadata from a single MD engine log file, validated against the biosim-schema.
Preserves canonical casing from schema mappings.
"""
import argparse
import json
from typing import Any, Dict
from biosim_extractor.amber.amberlog import AmberLogParser
from biosim_extractor.gromacs.gromacslog import GromacsLogParser
from biosim_extractor.helpers.metadata_utils import round_floats
from biosim_extractor.mdanalysis.toptraj import TopTrajParser
from biosim_extractor.metadata.fetchschema import get_schema, update_schema
from biosim_extractor.metadata.filemetadata import files_metadata, group_files
from biosim_extractor.metadata.validatemetadata import validate_metadata
from biosim_extractor.units.unitconversion import UnitConverter
# -----------------------------
# Utility functions
# -----------------------------
[docs]
def flatten_dict(d: Dict) -> Dict:
    """Recursively flatten a nested dict, keeping the first occurrence of duplicate keys.

    Entries are visited in insertion order, descending into nested dicts as
    they are encountered. Once a leaf key has been recorded, any later leaf
    with the same name (at any depth) is ignored.

    Args:
        d: Nested dictionary to flatten.

    Returns:
        Single-level dictionary with all leaf key-value pairs.
    """
    items = {}
    for k, v in d.items():
        if isinstance(v, dict):
            # Merge with setdefault rather than update(): update() would let
            # nested leaves overwrite keys that were already recorded, which
            # contradicts the "first occurrence wins" contract.
            for nested_key, nested_value in flatten_dict(v).items():
                items.setdefault(nested_key, nested_value)
        else:
            items.setdefault(k, v)
    return items
[docs]
def get_by_path(d: Dict, path: str):
    """Look up a nested value by following a dot-separated key path.

    Args:
        d: Dictionary to traverse.
        path: Dot-separated key path, e.g. ``"SimulationMetadata.timestep"``.

    Returns:
        The value found at ``path``, or ``None`` when a segment is missing or
        an intermediate value is not a dict.
    """
    node = d
    for segment in path.split("."):
        # Stop as soon as we cannot descend any further.
        if not (isinstance(node, dict) and segment in node):
            return None
        node = node[segment]
    return node
[docs]
def assign_by_path(d: Dict, path: str, value: Any):
    """Store ``value`` at a dot-separated path inside a nested dict.

    Intermediate dictionaries are created on demand; an intermediate entry
    that exists but is not a dict is replaced by a fresh empty dict.

    Args:
        d: Dictionary to modify in place.
        path: Dot-separated key path.
        value: Value to assign at the final key.
    """
    *parents, leaf = path.split(".")
    node = d
    for segment in parents:
        child = node[segment] if segment in node else None
        if not isinstance(child, dict):
            child = {}
            node[segment] = child
        node = child
    node[leaf] = value
[docs]
def add_to_path(d: Dict, path: str, value: Any):
    """Append ``value`` to the list stored at a dot-separated path.

    Missing (or non-dict) intermediate entries are replaced by empty dicts
    while walking the path, exactly as for assignment.

    Args:
        d: Dictionary to modify in place.
        path: Dot-separated key path pointing to an existing list.
        value: Value to append.

    Raises:
        KeyError: If the final key does not exist.
        AttributeError: If the object at the final key has no ``append``.
    """
    segments = path.split(".")
    leaf = segments[-1]
    container = d
    for segment in segments[:-1]:
        usable = segment in container and isinstance(container[segment], dict)
        if not usable:
            container[segment] = {}
        container = container[segment]
    target = container[leaf]
    target.append(value)
[docs]
def is_numeric(value):
    """Report whether ``value`` is acceptable to ``float()``.

    Args:
        value: Value to test.

    Returns:
        ``True`` if ``float(value)`` succeeds, ``False`` if it raises
        ``ValueError`` or ``TypeError``.
    """
    try:
        float(value)
    except (ValueError, TypeError):
        return False
    else:
        return True
[docs]
def remove_null_parents(d):
    """Drop every dict that directly holds a ``None`` value.

    Non-dict inputs are returned untouched (lists are not descended into).
    A dict with a ``None`` among its own values collapses to ``None``; its
    parent then omits the corresponding key.

    Args:
        d: Dictionary to clean.

    Returns:
        A new cleaned dictionary, or ``None`` if the top-level dict itself
        contains a ``None`` value.
    """
    if not isinstance(d, dict):
        return d
    for child in d.values():
        if child is None:
            return None
    # Clean each child lazily, then keep only the survivors.
    survivors = ((key, remove_null_parents(child)) for key, child in d.items())
    return {key: kept for key, kept in survivors if kept is not None}
# -----------------------------
# VALUE NORMALISATION
# -----------------------------
[docs]
def normalize_key(value: Any) -> str:
    """Produce the case-insensitive comparison form of ``value``.

    Args:
        value: Value to normalise; it is converted with ``str()`` first.

    Returns:
        The string form with surrounding whitespace stripped, in lowercase.
    """
    text = str(value)
    trimmed = text.strip()
    return trimmed.lower()
[docs]
def transform_value(value: Any, rules: Dict):
    """Map a raw engine value to its canonical schema equivalent using a rules dict.

    Matching between ``value`` and the rule keys is case-insensitive and
    ignores surrounding whitespace (both sides go through ``normalize_key``).

    Args:
        value: Raw value from the engine data.
        rules: Mapping of raw keys to canonical values (empty dict skips mapping).

    Returns:
        ``value`` unchanged when ``rules`` is empty; otherwise the canonical
        value of the first matching rule (the first element when that rule
        maps to a non-empty list), or ``None`` if no rule matches.
    """
    if not rules:
        return value
    norm_value = normalize_key(value)
    for key, mapped in rules.items():
        if normalize_key(key) != norm_value:
            continue
        if isinstance(mapped, list) and mapped:
            # Return the canonical casing exactly as defined in the schema.
            return mapped[0]
        return mapped
    # Any match returns inside the loop, so reaching this point means the
    # value has no rule.
    return None
# -----------------------------
# Main class
# -----------------------------
[docs]
class MetadataPopulator:
    """Orchestrates extraction of MD engine metadata and population of metadata validated against the biosim schema.

    Supports log-file-based engines (Amber, GROMACS) and topology/trajectory parsing via MDAnalysis.

    What the class does:
    - Reads logs through the engine-specific parsers (``AmberLogParser``,
      ``GromacsLogParser``) and flattens the parsed structure.
    - Applies the ``reverse``/``forward`` mapping tables of the mapping schema.
    - Converts units through ``UnitConverter`` where the forward mapping
      declares a unit for a parameter.
    - Stores metadata about the input files if ``store_file_metadata`` is true.
    """

    def __init__(
        self,
        schema_path=None,
        log_file=None,
        engine=None,
        top_file=None,
        traj_file=None,
        store_file_metadata=True,
    ):
        """Store the inputs and create empty working state.

        Args:
            schema_path: Path to the engine mapping JSON file.
            log_file: Optional MD engine log file path.
            engine: MD engine name, such as "amber" or "gromacs" (lower-cased on storage).
            top_file: Optional topology file path.
            traj_file: Optional trajectory file path or list of trajectory paths.
            store_file_metadata: If True, include input file metadata in the output.
        """
        self.schema_path = schema_path
        self.log_file = log_file
        self.top_file = top_file
        self.traj_file = traj_file
        self.store_file_metadata = store_file_metadata
        self.engine = engine.lower() if engine else engine
        self.converter = UnitConverter()
        self.schema = {}
        self.engine_data = {}
        self.data = {}

    def populate(self):
        """Run the full extraction and mapping pipeline.

        Returns:
            Populated ``SimulationMetadata`` dictionary with ``None``-containing
            entries removed and floats rounded to two decimals. Empty (apart
            from optional file metadata) when nothing could be mapped.

        Raises:
            ValueError: If the engine is unsupported or has no reverse/forward
                mapping in the loaded schema.
        """
        self.load_schema()
        if self.engine:
            self.engine_data = self.parse_log()
            if self.engine not in self.schema.get("reverse", {}):
                raise ValueError(f"No reverse mapping found for engine: {self.engine}")
            if self.engine not in self.schema.get("forward", {}):
                raise ValueError(f"No forward mapping found for engine: {self.engine}")
            self.data = self.apply_mapping()

        has_toptraj = bool(self.top_file and self.traj_file)
        if has_toptraj:
            self.data = self.populate_toptraj()

        # setdefault instead of plain indexing: when no parameter was mapped
        # there is no "SimulationMetadata" entry yet, and an empty result is
        # preferable to a bare KeyError.
        result = self.data.setdefault("SimulationMetadata", {})

        # Save file metadata in the result.
        if self.store_file_metadata:
            saved_files = {}
            if self.log_file:
                saved_files = group_files([self.log_file], saved_files, role="log")
            if has_toptraj:
                saved_files = group_files([self.top_file], saved_files, role="topology")
                # traj_file may be a single path; pass a list like for the
                # other roles.
                traj_files = (
                    self.traj_file
                    if isinstance(self.traj_file, (list, tuple))
                    else [self.traj_file]
                )
                saved_files = group_files(traj_files, saved_files, role="trajectory")
            result["files"] = files_metadata(saved_files)

        # Remove any dict that contains a None field.
        result = remove_null_parents(result) or {}
        # Round all floats.
        return round_floats(result, decimals=2)

    def validate(self, result, biosimschema_path=None, strict=False):
        """Validate populated metadata against the biosim schema.

        Args:
            result: Populated metadata dictionary to validate.
            biosimschema_path: Optional path to the biosim schema YAML.
            strict: Forwarded unchanged to ``validate_metadata``.
        """
        validate_metadata(result, biosimschema_path, strict)

    def load_schema(self):
        """Load and parse the extraction schema JSON from ``self.schema_path``."""
        # JSON is UTF-8 by specification; do not depend on the locale default.
        with open(self.schema_path, encoding="utf-8") as f:
            self.schema = json.load(f)

    def parse_log(self):
        """Parse the MD engine log file and return a flattened parameter dictionary.

        Returns:
            Flat dictionary of parameter names to raw values.

        Raises:
            ValueError: If ``self.engine`` is not a supported engine.
        """
        if self.engine == "amber":
            parser = AmberLogParser(self.log_file)
        elif self.engine == "gromacs":
            parser = GromacsLogParser(self.log_file)
        else:
            raise ValueError(f"Unsupported engine: {self.engine}")
        return flatten_dict(parser.parse())

    def populate_toptraj(self):
        """Parse topology and trajectory files and apply schema mapping.

        Side effects: replaces ``self.engine_data`` with the parser output and
        sets ``self.engine`` to ``"toptrajparser"`` so that mapping table is
        used by ``apply_mapping``.

        Returns:
            Schema-mapped result dictionary, or ``None`` if topology/trajectory
            files are not set.
        """
        if not (self.top_file and self.traj_file):
            return None
        parser = TopTrajParser(self.top_file, self.traj_file)
        self.engine_data = parser.parse()
        self.engine = "toptrajparser"
        return self.apply_mapping()

    # -----------------------------
    # Mapping logic
    # -----------------------------
    def apply_mapping(self) -> Dict:
        """Apply mapping rules to engine data to produce schema-compliant output.

        ``self.data`` is updated in place and returned, so successive calls
        (e.g. log file first, then topology/trajectory) accumulate.

        Returns:
            Result dictionary with mapped schema values applied.
        """
        engine_data = self.engine_data
        reverse_mapping = self.schema["reverse"][self.engine]
        forward_mapping = self.schema["forward"][self.engine]
        result = self.data

        for param, config in reverse_mapping.items():
            if param not in engine_data:
                continue
            raw_value = engine_data[param]
            for path, rules in config.get("by_path", {}).items():
                mapped_value = self._map_value(
                    param,
                    raw_value,
                    path,
                    rules,
                    config,
                    forward_mapping,
                    detect_vector=True,
                )
                self._merge_into_result(result, path, mapped_value)

        # Special handling for molecule_ids: every molecule property goes
        # through the same transformation pipeline.
        if "molecule_ids" in engine_data:
            molecules_list = []
            for mol_data in engine_data["molecule_ids"].values():
                transformed_molecule = {}
                for prop_name, prop_value in mol_data.items():
                    if prop_name not in reverse_mapping:
                        # No mapping found in schema, skip.
                        continue
                    config = reverse_mapping[prop_name]
                    for path, rules in config.get("by_path", {}).items():
                        # Use the final path segment as the key.
                        final_key = path.split(".")[-1]
                        transformed_molecule[final_key] = self._map_value(
                            prop_name,
                            prop_value,
                            path,
                            rules,
                            config,
                            forward_mapping,
                        )
                molecules_list.append(transformed_molecule)
            # Assign the transformed molecules list to the schema path.
            assign_by_path(
                result, "SimulationMetadata.composition.molecule_ID", molecules_list
            )
        return result

    def _map_value(
        self, key, raw_value, path, rules, config, forward_mapping, detect_vector=False
    ):
        """Translate one raw value for one schema path (rules, multivalued wrap, units).

        Args:
            key: Engine parameter (or molecule property) name being mapped.
            raw_value: Raw value read from the engine data.
            path: Dot-separated target schema path.
            rules: Value-mapping rules for this path (empty means "pass through").
            config: Reverse-mapping entry for ``key`` (read for ``path_metadata``).
            forward_mapping: Forward-mapping table of the current engine.
            detect_vector: If ``True``, decide from the path name and value
                shape whether the converted value is stored as a vector.

        Returns:
            The mapped value, wrapped in a unit dictionary when a unit term applies.
        """
        mapped_value = transform_value(raw_value, rules)
        is_multivalued = (
            config.get("path_metadata", {}).get(path, {}).get("multivalued", False)
        )
        if (
            is_multivalued
            and mapped_value is not None
            and not isinstance(mapped_value, list)
        ):
            mapped_value = [mapped_value]

        if rules:
            return mapped_value

        # No value rules: check the forward mapping for a unit conversion.
        for term in forward_mapping[path]:
            if "unit" in term and term["key"] == key:
                is_vector = detect_vector and (
                    "box_dimensions" in path
                    or "box_angles" in path
                    or "vector" in path.lower()
                    or (isinstance(mapped_value, list) and len(mapped_value) > 1)
                )
                mapped_value = self.convert_values(mapped_value, term, is_vector)
                # Stop at the first matching term; converting again would wrap
                # the already unit-annotated dictionary a second time.
                break
        return mapped_value

    @staticmethod
    def _merge_into_result(result, path, mapped_value):
        """Write ``mapped_value`` at ``path``, combining it with a value already there."""
        existing = get_by_path(result, path)
        if existing is None:
            assign_by_path(result, path, mapped_value)
            return

        scalar_update = isinstance(mapped_value, dict) and "value" in mapped_value
        if scalar_update and isinstance(existing, dict):
            if "value" in existing:
                # Second key hit same path: promote scalar to vector.
                assign_by_path(
                    result,
                    path,
                    {
                        "vector_value": [existing["value"], mapped_value["value"]],
                        "value_unit": existing["value_unit"],
                    },
                )
                return
            if "vector_value" in existing:
                # Third+ key: append to existing vector.
                existing["vector_value"].append(mapped_value["value"])
                return

        # Best effort: append when the existing value is a list, otherwise
        # keep the value that is already stored.
        # NOTE(review): a list-valued mapped_value is appended as one element,
        # which nests lists on multivalued paths; confirm this is intended.
        try:
            add_to_path(result, path, mapped_value)
        except (KeyError, AttributeError):
            pass

    def convert_values(self, value, term, is_vector=False):
        """Convert a raw value (or list) to a unit-annotated schema dictionary.

        Args:
            value: Numeric value or list of values to convert.
            term: Forward-mapping entry containing ``"unit"`` and ``"key"``.
            is_vector: If ``True``, a list result is stored under
                ``"vector_value"`` instead of ``"value"``. Ignored for
                non-list values.

        Returns:
            Dictionary with ``"value"`` (or ``"vector_value"``) and ``"value_unit"`` keys.
        """
        source_unit = term["unit"]
        needs_conversion = self.converter.needs_conversion(source_unit)
        unit = (
            self.converter.get_target_unit(source_unit)
            if needs_conversion
            else source_unit
        )

        if isinstance(value, list):
            # Handle list of values (vectors).
            converted_values = (
                [self.converter.convert(v, source_unit) for v in value]
                if needs_conversion
                else value
            )
            value_key = "vector_value" if is_vector else "value"
            return {value_key: converted_values, "value_unit": unit}

        # Single value: only numeric values can be converted; anything else
        # is passed through with the unit annotation.
        if needs_conversion and is_numeric(value):
            value = self.converter.convert(value, source_unit)
        return {"value": value, "value_unit": unit}
[docs]
def resolve_schema_inputs(args):
    """Work out which mapping and biosim schema files to use.

    Paths given in ``args`` (``mappingschema``, ``biosimschema``) take
    precedence. When at least one of them is missing, a schema bundle is
    obtained via ``update_schema`` (if ``args.update_schema`` is set) or
    ``get_schema`` otherwise, and the bundle supplies the missing path(s).

    Args:
        args: Parsed command-line namespace.

    Returns:
        Tuple ``(mapping_path, biosim_path)``.
    """
    mapping_path, biosim_path = args.mappingschema, args.biosimschema
    if mapping_path and biosim_path:
        # Nothing to fetch: both sources were given explicitly.
        return mapping_path, biosim_path

    fetch = update_schema if args.update_schema else get_schema
    bundle = fetch(version=args.schema_version, cache_dir=args.schema_cache_dir)
    if not mapping_path:
        mapping_path = str(bundle.mapping_json)
    if not biosim_path:
        biosim_path = str(bundle.schema_yaml)
    return mapping_path, biosim_path
# -----------------------------
# Entry point
# -----------------------------
[docs]
def parse_args(argv=None):
    """Parse command-line arguments.

    Args:
        argv: Optional list of argument strings. When ``None`` (the default),
            ``argparse`` reads ``sys.argv[1:]``.

    Returns:
        Parsed ``argparse.Namespace`` object.
    """
    parser = argparse.ArgumentParser(
        description="Populate biosim-schema with MD engine data"
    )
    parser.add_argument(
        "mappingschema",
        nargs="?",
        help="Path to engine mapping schema JSON (optional if schema bundle is fetched).",
    )
    parser.add_argument(
        "--biosimschema",
        help="Path to biosim schema YAML (optional if schema bundle is fetched).",
    )
    parser.add_argument(
        "--schema-version",
        default="latest",
        help="biosim-schema release version/tag to fetch (default: latest).",
    )
    parser.add_argument(
        "--schema-cache-dir",
        default=None,
        help="Directory for cached biosim-schema bundles (default: BIOSIM_SCHEMA_CACHE_DIR or /tmp/biosim-schema-cache).",
    )
    parser.add_argument(
        "--update-schema",
        action="store_true",
        help="Force refresh of cached schema bundle before use.",
    )
    parser.add_argument("--engine", help="MD engine (amber, gromacs, etc.)")
    parser.add_argument("--logfile", help="Path to MD log file")
    parser.add_argument("--top", help="Topology file path")
    parser.add_argument("--traj", nargs="+", help="Trajectory file path")
    parser.add_argument("--config", help="Configuration file path")
    parser.add_argument(
        "--exclude-file-metadata",
        action="store_false",
        help="Exclude file metadata from the output (same effect as --no-file-metadata).",
    )
    parser.add_argument(
        "--file-metadata",
        dest="store_file_metadata",
        action=argparse.BooleanOptionalAction,
        default=True,
        help="Include file metadata in output (use --no-file-metadata to disable).",
    )
    parser.add_argument("--output", "-o", help="Output file path")
    namespace = parser.parse_args(argv)
    # --exclude-file-metadata stores False in ``exclude_file_metadata``, which
    # nothing reads; propagate it to ``store_file_metadata`` so the flag
    # actually takes effect. The original attribute is kept for compatibility.
    if not namespace.exclude_file_metadata:
        namespace.store_file_metadata = False
    return namespace
[docs]
def main():
    """Command-line entry point.

    Parses the arguments, resolves the schema sources, runs the population
    pipeline, validates the result, and finally writes it as indented JSON
    to ``--output`` or, without that option, to standard output.
    """
    cli = parse_args()
    mapping_path, biosim_path = resolve_schema_inputs(cli)

    populator = MetadataPopulator(
        schema_path=mapping_path,
        log_file=cli.logfile,
        engine=cli.engine,
        top_file=cli.top,
        traj_file=cli.traj,
        store_file_metadata=cli.store_file_metadata,
    )
    metadata = populator.populate()
    populator.validate(metadata, biosimschema_path=biosim_path)

    if not cli.output:
        print(json.dumps(metadata, indent=2))
        return
    with open(cli.output, "w") as out_file:
        json.dump(metadata, out_file, indent=2)
# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()