#!/usr/bin/env python
"""
Extract topology and trajectory metadata using MDAnalysis.
"""
import argparse
import json
from collections import Counter
import numpy as np
from MDAnalysis import Universe
from rdkit import Chem
alternative_residue_names = {
"ALAD": "ALA",
"ASH": "ASP",
"ASPH": "ASP",
"GLH": "GLU",
"GLUH": "GLU",
"ARGN": "ARG",
"HIP": "HIS",
"HIE": "HIS",
"HID": "HIS",
"CYM": "CYS",
"CYX": "CYS",
"LYN": "LYS",
}
[docs]
def get_protein_sequence(fragment):
"""
Extract the protein sequence from a molecule fragment.
Args:
fragment (MDAnalysis.AtomGroup): Molecule fragment to analyze.
Returns:
str or None: Protein sequence as a string, or None if not found.
"""
# Check if fragment contains protein residues
try:
protein_atoms = fragment.select_atoms(
"protein and not (resname ACE or resname NME)"
)
if len(protein_atoms) > 0:
protein_residues = protein_atoms.residues
# Handle alternative residue names
for old_name, new_name in alternative_residue_names.items():
for residue in protein_residues:
if residue.resname == old_name:
residue.resname = new_name
sequence = protein_residues.sequence(id="sequence1", name="protein1")
return str(sequence.seq)
except Exception:
return None
[docs]
def get_nucleic_sequence(fragment):
"""
Extract the nucleic acid sequence from a molecule fragment.
Args:
fragment (MDAnalysis.AtomGroup): Molecule fragment to analyze.
Returns:
str or None: Nucleic acid sequence as a string, or None if not found.
"""
# Check if fragment contains nucleic acid residues
try:
dna_atoms = fragment.select_atoms("nucleic")
if len(dna_atoms) > 0:
dna_residues = dna_atoms.residues
sequence = []
for res in dna_residues.resnames:
sequence.append(res[-1] if len(res) == 2 else res)
return "".join(sequence)
except Exception:
return None
[docs]
def classify_box(dim, tolerance=1e-3):
"""
Classify the simulation box type based on dimensions and angles.
Args:
dim (list or tuple): Box dimensions [lx, ly, lz, a, b, g].
tolerance (float): Tolerance for angle/length comparison.
Returns:
str: Box type (e.g., "cubic", "tetragonal", "orthorhombic", etc.).
"""
lx, ly, lz, a, b, g = dim
def close(x, y):
return abs(x - y) < tolerance
if all(close(x, 90) for x in (a, b, g)):
if close(lx, ly) and close(ly, lz):
return "cubic"
elif close(lx, ly) or close(ly, lz) or close(lx, lz):
return "tetragonal"
else:
return "orthorhombic"
if close(a, 90) and close(b, 90) and close(g, 120):
return "orthorhombic"
if close(a, 90) and close(b, 120) and close(g, 120):
return "truncated octahedron"
return "triclinic"
TOPTRAJ_AUTO_EXTRACT = {
"total_atom_count": lambda u: safe_extract(lambda: u.atoms.n_atoms),
"total_molecule_count": lambda u: safe_extract(lambda: len(u.atoms.fragments)),
"frame_count": lambda u: safe_extract(lambda: len(u.trajectory)),
"system_charge": lambda u: safe_extract(lambda: sum(list(u.atoms.charges))),
"box_dimensions": lambda u: safe_extract(
lambda: np.mean([f.dimensions[:3] for f in u.trajectory], axis=0)
),
"box_angles": lambda u: safe_extract(
lambda: list(u.trajectory[0].dimensions[3:].flatten())
),
"box_type": lambda u: safe_extract(
lambda: classify_box(u.trajectory[0].dimensions)
),
"water": lambda u: safe_extract(lambda: len(u.select_atoms("water")) > 0),
"positions": lambda u: safe_extract(
lambda: getattr(u.trajectory.ts, "positions", None) is not None
),
"forces": lambda u: safe_extract(
lambda: getattr(u.trajectory.ts, "forces", None) is not None
),
"velocities": lambda u: safe_extract(
lambda: getattr(u.trajectory.ts, "velocities", None) is not None
),
"masses": lambda u: safe_extract(
lambda: getattr(u.atoms, "masses", None) is not None
),
"bonds": lambda u: safe_extract(lambda: getattr(u, "bonds", None) is not None),
"dihedrals": lambda u: safe_extract(
lambda: getattr(u, "dihedrals", None) is not None
),
"fixed_charges": lambda u: safe_extract(
lambda: getattr(u.atoms, "charges", None) is not None
),
}
MOLID_AUTO_EXTRACT = {
"atom_count": lambda fragment: len(fragment.atoms),
"monomer_count": lambda fragment: len(fragment.residues),
"molecule_charge": lambda fragment: safe_extract(
lambda: sum(list(fragment.atoms.charges))
),
"molecular_weight": lambda fragment: float(sum(fragment.masses))
if hasattr(fragment, "masses")
else None,
}
RDKIT_AUTO_EXTRACT = {
"InChIKey": lambda rdkit_mol: Chem.MolToInchiKey(rdkit_mol),
"SMILES": lambda rdkit_mol: Chem.MolToSmiles(rdkit_mol),
"InChI": lambda rdkit_mol: Chem.MolToInchi(rdkit_mol),
"molecular_formula": lambda rdkit_mol: Chem.rdMolDescriptors.CalcMolFormula(
rdkit_mol
),
"molecular_weight": lambda rdkit_mol: Chem.rdMolDescriptors.CalcExactMolWt(
rdkit_mol
),
}
SEQUENCE_AUTO_EXTRACT = {
"protein_sequence": lambda fragment: get_protein_sequence(fragment),
"nucleic_sequence": lambda fragment: get_nucleic_sequence(fragment),
}
[docs]
class TopTrajParser:
"""
Parse topology and trajectory files to extract system and molecule metadata.
"""
def __init__(self, toppath, trajpath):
"""
Initialize the parser with topology and trajectory file paths.
Args:
toppath (str): Path to topology file.
trajpath (str): Path to trajectory file.
"""
self.u = Universe(toppath, trajpath)
self.lines = []
self.data = {}
self.molecule_types = {}
# =========================
# MAIN ENTRY
# =========================
[docs]
def parse(self):
"""
Extract system-level metadata and molecule information.
Returns:
dict: Extracted metadata.
"""
for field, func in TOPTRAJ_AUTO_EXTRACT.items():
key = field
if callable(func):
self.data[key] = func(self.u)
else:
self.data[key] = func
self._extract_molecules()
# print(json.dumps(self.data, indent=2))
# print()
return self.data
def _extract_molecules(self):
"""
Identify unique molecule types and their representative fragments.
Returns:
None
"""
# Group by molecule signature
molecule_types = Counter(
(tuple(fragment.resnames), tuple(fragment.atoms.names))
for fragment in self.u.atoms.fragments
)
# Get first representative fragment of each type
representatives = {}
for fragment in self.u.atoms.fragments:
signature = (tuple(fragment.resnames), tuple(fragment.atoms.names))
if signature not in representatives:
representatives[signature] = {
"count": molecule_types[signature],
"fragment": fragment, # Store the actual AtomGroup
}
self.data["unique_molecule_count"] = len(representatives)
self.molecule_types = representatives
self._find_molecule_IDs()
def _find_molecule_IDs(self):
"""
Assign external IDs and extract properties for each unique molecule.
Returns:
dict: Updated data with molecule IDs.
"""
molecule_ids = {}
for i, (signature, info) in enumerate(self.molecule_types.items()):
mol_residues, mol_atoms = signature[0], signature[1]
fragment = info["fragment"]
fragment_count = info["count"]
molecule_ids[i] = {"molecule_count": fragment_count}
for field, func in MOLID_AUTO_EXTRACT.items():
key = field
if callable(func):
result = func(fragment)
if result is not None:
molecule_ids[i][key] = result
else:
molecule_ids[i][key] = func
if len(mol_atoms) == 1:
atom = fragment.atoms[0]
molecule_ids[i]["molecular_formula"] = getattr(
atom, "element", atom.name.strip("0123456789")
)
elif len(set(mol_residues)) == 1 and len(mol_atoms) > 1:
try:
# Attempt RDKit conversion first
rdkit_mol = fragment.convert_to("RDKIT")
for field, func in RDKIT_AUTO_EXTRACT.items():
key = field
if callable(func):
result = func(rdkit_mol)
if result is not None:
molecule_ids[i][key] = result
else:
molecule_ids[i][key] = func
except Exception:
continue
else:
for field, func in SEQUENCE_AUTO_EXTRACT.items():
key = field
if callable(func):
result = func(fragment)
if result is not None:
molecule_ids[i][key] = result
else:
molecule_ids[i][key] = func
self.data["molecule_ids"] = molecule_ids
return self.data
# # =========================
# # USAGE
# # =========================
# if __name__ == "__main__":
# topology = sys.argv[1]
# trajectory = sys.argv[2]
# parser = TopTrajParser(topology, trajectory)
# result = parser.parse()
# print(json.dumps(result, indent=2))
# =========================
# ENTRY POINT
# =========================
[docs]
def parse_args():
"""Parse command-line arguments.
Returns:
Parsed ``argparse.Namespace`` object.
"""
parser = argparse.ArgumentParser(
description="Extract topology and trajectory files metadata to JSON"
)
parser.add_argument("topology", help="Path to topology file")
parser.add_argument("trajectory", nargs="+", help="Path to trajectory file/s")
parser.add_argument("--output", "-o", help="Output file path (default: stdout)")
return parser.parse_args()
[docs]
def main():
"""Entry point: parse args, run extraction, and write output."""
args = parse_args()
parser = TopTrajParser(args.topology, args.trajectory)
result = parser.parse()
if args.output:
with open(args.output, "w") as f:
json.dump(result, f, indent=2)
else:
print(json.dumps(result, indent=2))
if __name__ == "__main__":
main()