#!/usr/bin/env python
"""
Extract AMBER log file metadata into a structured dictionary.
This script parses AMBER log files and outputs structured metadata as JSON.
It can be used as a standalone CLI tool or imported as a module.
"""
import argparse
import json
import re
from biosim_extractor.helpers.log_utils import add_value, normalize_name, parse_value
# -------------------------
# PARSER
# -------------------------
class AmberLogParser:
    """
    Parser for AMBER log files.

    The file is read into ``self.lines`` and scanned several times; each
    scan fills one part of ``self.data`` (``"SimulationSettings"`` or one of
    the ``"Results"`` sub-dictionaries).
    """

    # ``NAME = number`` pairs as they are matched inside the results blocks.
    _RESULT_PAIR_RE = re.compile(r"([A-Za-z\(\)\-]+)\s*=\s*([-\d\.E+]+)")
    # ``name = number`` pairs in the settings part; names may also contain
    # digits, underscores, hyphens and whitespace.
    _SETTING_PAIR_RE = re.compile(r"([A-Za-z0-9_\-\s]+?)\s*=\s*([-\d\.E+]+)")
    # ``| KEY: value`` lines of the "File Assignments:" block.
    _FILE_ASSIGNMENT_RE = re.compile(r"\|\s*([A-Z0-9_]+):\s*(.+)")
    # ``| <label>: <number> seconds`` timing lines.
    _TIMING_RE = re.compile(r"\|\s*(.*?)\s*:\s*([-\d\.E+]+)\s*seconds")

    def __init__(self, filepath):
        """
        Args:
            filepath (str): Path to the AMBER log file.
        """
        self.filepath = filepath
        self.lines = []
        self.data = {
            # "Header": {},
            "SimulationSettings": {},
            "Results": {
                "TimeSeries": [],
                "Averages": {},
                "RMSFluctuations": {},
                "Timings": {},
            },
        }

    # -------------------------
    # PUBLIC API
    # -------------------------
    def parse(self):
        """
        Parse the AMBER log file.

        Returns:
            dict: Parsed metadata (the same object as ``self.data``).
        """
        # Decode leniently: a single undecodable byte (e.g. in a title or a
        # path) must not abort the whole extraction.
        with open(self.filepath, encoding="utf-8", errors="replace") as f:
            self.lines = f.readlines()
        # self._parse_header()
        self._parse_simulation_settings()
        self._parse_results()
        return self.data

    # # -------------------------
    # # HEADER
    # # -------------------------
    # def _parse_header(self):
    #     for line in self.lines[:200]:
    #         if "=" in line:
    #             parts = line.split(",")[0].split("=")
    #             if len(parts) == 2:
    #                 key, val = parts
    #                 add_value(self.data["Header"], key.strip(), parse_value(val))

    # -------------------------
    # SIMULATION SETTINGS
    # -------------------------
    def _parse_simulation_settings(self):
        """
        Parse simulation settings from the log file.

        Scans every line up to the first line containing both ``NSTEP`` and
        ``TIME`` and collects:

        * the ``&cntrl`` namelist into ``settings["cntrl"]``,
        * lines ending in ``:`` (without ``=``) as named sub-sections,
        * ``name = number`` pairs into the current sub-section, or into the
          top level when no sub-section is open.

        Finally the "File Assignments" block is added.
        """
        settings = self.data["SimulationSettings"]
        current_section = None
        capture_cntrl = False

        for line in self.lines:
            stripped = line.strip()

            # Stop at time series
            if "NSTEP" in line and "TIME" in line:
                break

            # -------------------------
            # &cntrl block
            # -------------------------
            if "&cntrl" in stripped:
                capture_cntrl = True
                current_section = "cntrl"
                settings[current_section] = {}
                continue

            if capture_cntrl:
                # A namelist is closed by "/" or by the older "&end" form;
                # without the latter the capture would run on into the
                # following settings lines.
                if "/" in stripped or stripped.lower().startswith("&end"):
                    capture_cntrl = False
                    current_section = None
                    continue
                for part in stripped.split(","):
                    if "=" in part:
                        # partition() splits on the first "=" only, so a
                        # value that itself contains "=" cannot raise.
                        k, _, v = part.partition("=")
                        add_value(settings["cntrl"], k.strip(), parse_value(v))
                continue

            # -------------------------
            # Colon sections
            # -------------------------
            if stripped.endswith(":") and "=" not in stripped:
                current_section = normalize_name(stripped[:-1])
                settings[current_section] = {}
                continue

            # -------------------------
            # Key-value pairs
            # -------------------------
            if "=" in line:
                target = settings[current_section] if current_section else settings
                for k, v in self._SETTING_PAIR_RE.findall(line):
                    add_value(target, normalize_name(k), parse_value(v))

            # Reset section on blank line
            if not stripped:
                current_section = None

        self._parse_file_assignments(settings)

    # -------------------------
    # SETTINGS: FILE ASSIGNMENTS
    # -------------------------
    def _parse_file_assignments(self, settings):
        """
        Parse file assignments from the log file.

        The block starts after the first line containing
        ``"File Assignments:"`` and ends at the first following line that is
        blank or does not start with ``|``. ``settings["File_Assignments"]``
        is only set when at least one assignment was found.

        Args:
            settings (dict): Simulation settings dictionary to update.
        """
        capture = False
        files = {}

        for line in self.lines:
            # Start block
            if "File Assignments:" in line:
                capture = True
                continue
            if not capture:
                continue

            # Stop if block ends (a blank line never starts with "|")
            if not line.strip().startswith("|"):
                break

            match = self._FILE_ASSIGNMENT_RE.search(line)
            if match:
                files[match.group(1).strip()] = match.group(2).strip()

        if files:
            settings["File_Assignments"] = files

    # -------------------------
    # RESULTS (ALL OUTPUT DATA)
    # -------------------------
    def _parse_results(self):
        """
        Parse results blocks from the log file.

        Currently fills ``"Averages"`` and ``"Timings"``; the time-series and
        RMS-fluctuation parsers are disabled.
        """
        # self._parse_time_series()
        self._parse_block(
            "A V E R A G E S", "R M S F L U C T U A T I O N S", "Averages"
        )
        # self._parse_block("R M S F L U C T U A T I O N S", "TIMINGS", "RMSFluctuations")
        self._parse_timings()

    # -------------------------
    # TIME SERIES
    # -------------------------
    def _parse_time_series(self):
        """
        Parse time series data from the log file.

        Every line containing both ``NSTEP`` and ``TIME`` opens a new step
        record; ``NAME = number`` pairs on that line and on the following
        lines are stored in it. Scanning stops at the first line containing
        ``"A V E R A G E S"``. The list of records replaces
        ``self.data["Results"]["TimeSeries"]``.
        """
        steps = []
        current = {}
        in_series = False

        for line in self.lines:
            if "NSTEP" in line and "TIME" in line:
                in_series = True
                # Flush the previous record before starting a new one.
                if current:
                    steps.append(current)
                current = {}
                for k, v in self._RESULT_PAIR_RE.findall(line):
                    current[k] = parse_value(v)
                continue

            if in_series and "=" in line:
                for k, v in self._RESULT_PAIR_RE.findall(line):
                    current[k] = parse_value(v)

            if "A V E R A G E S" in line:
                break

        if current:
            steps.append(current)
        self.data["Results"]["TimeSeries"] = steps

    # -------------------------
    # GENERIC BLOCK PARSER
    # -------------------------
    def _parse_block(self, start_marker, end_marker, target_key):
        """
        Parse a generic results block.

        Collects ``NAME = number`` pairs from the lines after the first line
        containing ``start_marker`` up to and including the line containing
        ``end_marker``.

        Args:
            start_marker (str): Line indicating the start of the block.
            end_marker (str): Line indicating the end of the block.
            target_key (str): Key in the results dictionary to populate.
        """
        capture = False
        target = self.data["Results"][target_key]

        for line in self.lines:
            if start_marker in line:
                capture = True
                continue
            if not capture:
                # An end marker seen before the block has started must not
                # end the scan.
                continue
            if "=" in line:
                for k, v in self._RESULT_PAIR_RE.findall(line):
                    add_value(target, k, parse_value(v))
            if end_marker in line:
                break

    # -------------------------
    # TIMINGS
    # -------------------------
    def _parse_timings(self):
        """
        Parse timing information from the log file.

        Only lines containing ``"CPU time"`` or ``"wall time"`` are examined;
        the label before the colon becomes the (normalized) key and the
        number before ``seconds`` the value.
        """
        timings = self.data["Results"]["Timings"]

        for line in self.lines:
            if "CPU time" not in line and "wall time" not in line:
                continue
            match = self._TIMING_RE.search(line)
            if match:
                key = normalize_name(match.group(1))
                val = parse_value(match.group(2))
                add_value(timings, key, val)
# =========================
# ENTRY POINT
# =========================
def parse_args(argv=None):
    """Parse command-line arguments.

    Args:
        argv (list[str] | None): Argument strings to parse. With the default
            ``None`` argparse reads the process command line, so existing
            ``parse_args()`` calls behave as before.

    Returns:
        Parsed ``argparse.Namespace`` object with the attributes ``logfile``
        and ``output`` (``None`` when no output path was given).
    """
    parser = argparse.ArgumentParser(
        description="Extract Amber log file metadata to JSON"
    )
    parser.add_argument("logfile", help="Path to Amber log file")
    parser.add_argument("--output", "-o", help="Output file path (default: stdout)")
    return parser.parse_args(argv)
def main():
    """Entry point: parse args, run extraction, and write output.

    The parsed metadata is serialized as JSON with a two-space indent and
    goes to the file named by ``--output`` when given, otherwise to stdout.
    """
    args = parse_args()
    metadata = AmberLogParser(args.logfile).parse()

    # No output path: print to stdout and finish.
    if not args.output:
        print(json.dumps(metadata, indent=2))
        return

    with open(args.output, "w") as out_file:
        json.dump(metadata, out_file, indent=2)
# Run the CLI only when the file is executed as a script, not on import.
if __name__ == "__main__":
    main()