Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,10 @@ build/*

# AI Agent files
AGENTS.md
CLAUDE.md

# Provenance related
provenance.yml
provenance_graph.yml
provenance.svg
*.dot
21 changes: 21 additions & 0 deletions arc/job/pipe/pipe_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,13 +274,16 @@ def ingest_pipe_results(self, pipe: PipeRun) -> None:
if state.status == TaskState.COMPLETED.value:
ingest_completed_task(pipe.run_id, pipe.pipe_root, spec, state,
self.sched.species_dict, self.sched.output)
self._update_graph_for_pipe_task(spec, status='done')
elif state.status == TaskState.FAILED_ESS.value:
self._eject_to_scheduler(pipe, spec, state)
self._update_graph_for_pipe_task(spec, status='errored')
ejected_count += 1
elif state.status == TaskState.FAILED_TERMINAL.value:
logger.error(f'Pipe run {pipe.run_id}, task {spec.task_id}: '
f'failed terminally (failure_class={state.failure_class}). '
f'Manual troubleshooting required.')
self._update_graph_for_pipe_task(spec, status='errored')
elif state.status == TaskState.CANCELLED.value:
logger.warning(f'Pipe run {pipe.run_id}, task {spec.task_id}: '
f'was cancelled.')
Expand All @@ -290,6 +293,24 @@ def ingest_pipe_results(self, pipe: PipeRun) -> None:
else:
self._post_ingest_pipe_run(pipe)

def _update_graph_for_pipe_task(self, spec: TaskSpec, status: str) -> None:
    """
    Record the outcome of a pipe task on its provenance-graph calculation node.

    This is a no-op when the scheduler exposes no ``graph`` attribute (or it is
    ``None``), and also when no calculation node matches the task.

    Args:
        spec (TaskSpec): The pipe task. Its ``owner_key`` is used as the node label.
        status (str): The status to store on the matching calculation node.
    """
    provenance = getattr(self.sched, 'graph', None)
    if provenance is None:
        return
    owner = spec.owner_key
    metadata = spec.ingestion_metadata or {}
    job_type = TASK_FAMILY_TO_JOB_TYPE.get(spec.task_family, spec.task_family)
    # Tasks carrying a conformer index are looked up as '<job_type>_<index>';
    # every other task is looked up by its pipe task_id.
    conformer_index = metadata.get('conformer_index')
    job_name = spec.task_id if conformer_index is None else f'{job_type}_{conformer_index}'
    node_id = provenance.find_calc_node(owner, job_name)
    if node_id is None:
        return
    provenance.update_node(node_id, status=status)

def _post_ingest_pipe_run(self, pipe: PipeRun) -> None:
"""
Trigger family-specific post-processing after all tasks in a pipe run
Expand Down
65 changes: 65 additions & 0 deletions arc/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
from arc.job.ssh import SSHClient
from arc.output import write_output_yml
from arc.processor import process_arc_project, resolve_neb_level
from arc.provenance import DecisionKind, EdgeType
from arc.reaction import ARCReaction
from arc.scheduler import Scheduler
from arc.species.converter import str_to_xyz
Expand Down Expand Up @@ -671,6 +672,70 @@ def execute(self) -> dict:
log_footer(execution_time=self.execution_time)
return status_dict

def _add_arkane_provenance_nodes(self) -> None:
    """
    Add Arkane computation and result nodes to the provenance graph, then save it.

    For each non-TS species that has an ``H298`` value and a species node in the graph, creates:
        convergence_confirmed → calc(statmech_thermo) → data(thermo)

    For each reaction that has kinetics and whose TS has a species node in the graph, creates:
        convergence_confirmed → calc(statmech_kinetics) → data(kinetics)

    The ``convergence_confirmed`` link is only added for decision nodes that
    already exist in the graph for the respective label.

    This is a no-op when the scheduler has no provenance graph
    (the ``graph`` attribute is missing or ``None``).
    """
    graph = getattr(self.scheduler, 'graph', None)
    if graph is None:
        # Provenance tracking is optional; without a graph there is nothing to extend or save.
        return
    for spc in self.scheduler.species_dict.values():
        h298 = getattr(spc.thermo, 'H298', None)
        if spc.is_ts or h298 is None:
            continue
        spc_nid = graph.find_species_node(spc.label)
        if spc_nid is None:
            continue
        # Build the summary before touching the graph, so that a formatting
        # problem cannot leave a calculation node without its data node.
        thermo_value = f'H298={h298:.1f} kJ/mol'
        s298 = getattr(spc.thermo, 'S298', None)
        if s298 is not None:
            thermo_value += f', S298={s298:.1f} J/mol/K'
        self._add_arkane_result_nodes(
            graph=graph,
            owner_nid=spc_nid,
            label=spc.label,
            job_name='arkane_thermo',
            job_type='statmech_thermo',
            job_adapter=self.thermo_adapter,
            data_kind='thermo',
            value=thermo_value,
        )
    for rxn in self.scheduler.rxn_list:
        if rxn.kinetics is None:
            continue
        ts_nid = graph.find_species_node(rxn.ts_label)
        if ts_nid is None:
            continue
        # NOTE(review): assumes 'Ea' is a (value, units) pair - confirm against the kinetics schema.
        ea = rxn.kinetics.get('Ea')
        ea_str = f', Ea={ea[0]:.1f} {ea[1]}' if ea else ''
        self._add_arkane_result_nodes(
            graph=graph,
            owner_nid=ts_nid,
            label=rxn.ts_label,
            job_name='arkane_kinetics',
            job_type='statmech_kinetics',
            job_adapter=self.kinetics_adapter,
            data_kind='kinetics',
            value=f'{rxn.label}{ea_str}',
        )
    graph.save(self.scheduler.graph_path)

@staticmethod
def _add_arkane_result_nodes(graph, owner_nid, label, job_name, job_type, job_adapter, data_kind, value):
    """
    Insert one Arkane calculation node and its result data node, and wire their edges.

    Args:
        graph: The provenance graph to extend.
        owner_nid: Node ID of the species (or TS) node the calculation is attached to.
        label (str): The species / TS label used for the new nodes and the gate lookup.
        job_name (str): The job name to store on the calculation node.
        job_type (str): The job type to store on the calculation node.
        job_adapter: The adapter to store on the calculation node.
        data_kind (str): The kind of the result data node.
        value (str): The human-readable summary to store on the data node.
    """
    calc_nid = graph.add_calculation_node(
        label=label,
        job_name=job_name,
        job_type=job_type,
        job_adapter=job_adapter,
        status='done',
    )
    graph.add_edge(owner_nid, calc_nid, EdgeType.belongs_to)
    # Link from every existing convergence gate for this label, if any.
    for conv_node in graph.query(decision_kind=DecisionKind.convergence_confirmed, label=label):
        graph.add_edge(conv_node.node_id, calc_nid, EdgeType.triggered_by)
    data_nid = graph.add_data_node(
        label=label,
        data_kind=data_kind,
        value=value,
    )
    graph.add_edge(calc_nid, data_nid, EdgeType.output_of)

def save_project_info_file(self):
"""
Save a project info file.
Expand Down
Loading