Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions arc/job/pipe/pipe_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ def submit_pipe_run(self, run_id: str, tasks: List[TaskSpec],
tasks=tasks,
cluster_software=cluster_software,
max_workers=pipe_settings.get('max_workers', 100),
max_concurrent=pipe_settings.get('max_concurrent'),
max_attempts=pipe_settings.get('max_attempts', 3),
pipe_root=pipe_root,
)
Expand Down
60 changes: 55 additions & 5 deletions arc/job/pipe/pipe_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import stat
import sys
import time
from numbers import Integral
from typing import Dict, List, Optional

import arc.parser.parser as parser
Expand Down Expand Up @@ -51,16 +52,30 @@ class PipeRun:
run_id (str): Unique identifier for this pipe run.
tasks (List[TaskSpec]): Task specifications to execute.
cluster_software (str): Cluster scheduler type.
max_workers (int): Maximum number of concurrent array workers.
max_workers (int): Maximum total array worker slots (array size).
max_concurrent (Optional[int]): Max workers running simultaneously
(like PBS ``%N``). ``None`` disables throttling.
max_attempts (int): Maximum retry attempts per task.
"""

@staticmethod
def _validate_max_concurrent(max_concurrent: Optional[int]) -> None:
"""Accept ``None`` or a positive integer for throttle settings."""
if max_concurrent is None:
return
if isinstance(max_concurrent, bool) or not isinstance(max_concurrent, Integral):
raise ValueError('PipeRun max_concurrent must be None or a positive integer.')
if max_concurrent > 0:
return
raise ValueError('PipeRun max_concurrent must be None or a positive integer.')

def __init__(self,
project_directory: str,
run_id: str,
tasks: List[TaskSpec],
cluster_software: str,
max_workers: int = 100,
max_concurrent: Optional[int] = None,
max_attempts: int = 3,
pipe_root: Optional[str] = None,
):
Expand All @@ -69,6 +84,8 @@ def __init__(self,
self.tasks = tasks
self.cluster_software = cluster_software
self.max_workers = max_workers
self._validate_max_concurrent(max_concurrent)
self.max_concurrent = None if max_concurrent is None else int(max_concurrent)
self.max_attempts = max_attempts
self.pipe_root = pipe_root if pipe_root is not None \
else os.path.join(project_directory, 'calcs', 'pipe_' + run_id)
Expand Down Expand Up @@ -103,6 +120,7 @@ def _save_run_metadata(self) -> None:
'status': self.status.value,
'cluster_software': self.cluster_software,
'max_workers': self.max_workers,
'max_concurrent': self.max_concurrent,
'max_attempts': self.max_attempts,
'task_family': task_family,
'engine': engine,
Expand Down Expand Up @@ -146,6 +164,7 @@ def from_dir(cls, pipe_root: str) -> 'PipeRun':
tasks=tasks,
cluster_software=data['cluster_software'],
max_workers=data.get('max_workers', 100),
max_concurrent=data.get('max_concurrent'),
max_attempts=data.get('max_attempts', 3),
pipe_root=pipe_root,
)
Expand Down Expand Up @@ -192,12 +211,41 @@ def _submission_resources(self):
Derive resource settings from the homogeneous task list.

Returns:
Tuple[int, int, int]: ``(cpus, memory_mb, array_size)``
Tuple[int, int, int, Optional[int]]:
``(cpus, memory_mb, array_size, throttle)`` where ``throttle``
caps workers running simultaneously (clamped to ``array_size``),
or ``None`` if unthrottled.
"""
cpus = self.tasks[0].required_cores if self.tasks else 1
memory_mb = self.tasks[0].required_memory_mb if self.tasks else 4096
array_size = min(self.max_workers, len(self.tasks)) if self.tasks else self.max_workers
return cpus, memory_mb, array_size
throttle = None
if self.max_concurrent is not None and self.max_concurrent > 0:
throttle = min(self.max_concurrent, array_size)
return cpus, memory_mb, array_size, throttle
Comment on lines +222 to +225
Copy link

Copilot AI Apr 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

max_concurrent has implicit semantics for non-positive values (e.g., 0 disables throttling) but this isn’t clearly part of the public API contract of PipeRun. Consider normalizing/validating max_concurrent at construction time (e.g., coerce non-positive to None, or raise on negatives) and documenting the supported values in the class docstring to avoid silent configuration mistakes.

Copilot uses AI. Check for mistakes.

def _render_throttle(self, array_size: int, throttle: Optional[int]) -> Dict[str, str]:
"""
Render scheduler-specific array-range and extra-directives strings.

SLURM/PBS encode the throttle as an inline ``%K`` suffix on the range.
SGE uses a separate ``-tc`` directive. HTCondor uses ``max_materialize``
and takes a bare count (not a range) for ``queue``.
"""
cs = 'sge' if self.cluster_software == 'oge' else self.cluster_software
if cs == 'htcondor':
array_range = str(array_size)
extra = f'max_materialize = {throttle}' if throttle else ''
elif cs == 'sge':
array_range = f'1-{array_size}'
extra = f'#$ -tc {throttle}' if throttle else ''
elif cs in ('slurm', 'pbs'):
suffix = f'%{throttle}' if throttle else ''
array_range = f'1-{array_size}{suffix}'
extra = ''
else:
raise NotImplementedError(f'No throttle rendering for {self.cluster_software}')
return {'array_range': array_range, 'extra_directives': extra}

def write_submit_script(self) -> str:
"""
Expand All @@ -215,7 +263,8 @@ def write_submit_script(self) -> str:
raise NotImplementedError(
f'No pipe submit template for cluster software: {self.cluster_software}. '
f'Available templates: {list(pipe_submit.keys())}')
cpus, memory_mb, array_size = self._submission_resources()
cpus, memory_mb, array_size, throttle = self._submission_resources()
throttle_fields = self._render_throttle(array_size, throttle)
server = servers_dict.get('local', {})
queue, _ = next(iter(server.get('queues', {}).items()), ('', None))
engine = self.tasks[0].engine if self.tasks else ''
Expand All @@ -226,7 +275,8 @@ def write_submit_script(self) -> str:
env_setup = f'{env_setup}\n{scratch_export}' if env_setup else scratch_export
content = pipe_submit[template_key].format(
name=f'pipe_{self.run_id}',
max_task_num=array_size,
array_range=throttle_fields['array_range'],
extra_directives=throttle_fields['extra_directives'],
pipe_root=self.pipe_root,
python_exe=sys.executable,
cpus=cpus,
Expand Down
107 changes: 104 additions & 3 deletions arc/job/pipe/pipe_run_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import tempfile
import time
import unittest
import uuid

from arc.job.adapters.mockter import MockAdapter
from arc.job.pipe.pipe_state import TaskState, PipeRunState, TaskSpec, read_task_state, update_task_state
Expand Down Expand Up @@ -134,12 +135,14 @@ def setUp(self):
def tearDown(self):
shutil.rmtree(self.tmpdir, ignore_errors=True)

def _make_run(self, cluster_software, max_workers=10, n_tasks=None):
def _make_run(self, cluster_software, max_workers=10, n_tasks=None,
max_concurrent=None, run_id=None):
n = n_tasks if n_tasks is not None else max_workers
tasks = [_make_spec(f't_{i}') for i in range(n)]
run = PipeRun(project_directory=self.tmpdir, run_id='sub_test',
run = PipeRun(project_directory=self.tmpdir,
run_id=run_id or f'sub_test_{uuid.uuid4().hex[:8]}',
tasks=tasks, cluster_software=cluster_software,
max_workers=max_workers)
max_workers=max_workers, max_concurrent=max_concurrent)
run.stage()
return run

Expand Down Expand Up @@ -170,6 +173,104 @@ def test_htcondor_content(self):
content = f.read()
self.assertIn('queue 12', content)

def test_slurm_throttle(self):
run = self._make_run('slurm', max_workers=100, n_tasks=100, max_concurrent=8)
with open(run.write_submit_script()) as f:
content = f.read()
self.assertIn('#SBATCH --array=1-100%8', content)

def test_pbs_throttle(self):
run = self._make_run('pbs', max_workers=50, n_tasks=50, max_concurrent=4)
with open(run.write_submit_script()) as f:
content = f.read()
self.assertIn('#PBS -J 1-50%4', content)

def test_sge_throttle_uses_tc_directive(self):
run = self._make_run('sge', max_workers=20, n_tasks=20, max_concurrent=5)
with open(run.write_submit_script()) as f:
content = f.read()
self.assertIn('#$ -t 1-20', content)
self.assertIn('#$ -tc 5', content)

def test_htcondor_throttle_uses_max_materialize(self):
run = self._make_run('htcondor', max_workers=12, n_tasks=12, max_concurrent=3)
with open(run.write_submit_script()) as f:
content = f.read()
self.assertIn('queue 12', content)
self.assertIn('max_materialize = 3', content)

def test_throttle_clamped_to_array_size(self):
# max_concurrent > array_size should clamp to array_size (no-op throttle).
run = self._make_run('slurm', max_workers=6, n_tasks=6, max_concurrent=99)
with open(run.write_submit_script()) as f:
content = f.read()
self.assertIn('#SBATCH --array=1-6%6', content)

def test_unthrottled_has_no_throttle_markers(self):
"""Regression guard: unthrottled submit scripts contain no throttle syntax."""
array_line_markers = {
'slurm': '#SBATCH --array=',
'pbs': '#PBS -J ',
'sge': '#$ -t ',
'htcondor': 'queue ',
}
for cs, marker in array_line_markers.items():
with self.subTest(cluster_software=cs):
run = self._make_run(cs, max_workers=10, n_tasks=10, max_concurrent=None)
with open(run.write_submit_script()) as f:
content = f.read()
array_line = next(line for line in content.splitlines() if marker in line)
self.assertNotIn('%', array_line)
self.assertNotIn('-tc ', content)
self.assertNotIn('max_materialize', content)

def test_oge_routes_to_sge_throttle(self):
"""cluster_software='oge' should use the SGE throttle directive."""
run = self._make_run('oge', max_workers=15, n_tasks=15, max_concurrent=4)
with open(run.write_submit_script()) as f:
content = f.read()
self.assertIn('#$ -t 1-15', content)
self.assertIn('#$ -tc 4', content)

def test_invalid_max_concurrent_values_raise(self):
"""Only None and positive integers are accepted."""
for max_concurrent in (-1, 0, -2, 1.5, '3', True, False):
with self.subTest(max_concurrent=max_concurrent):
with self.assertRaisesRegex(ValueError, 'max_concurrent'):
PipeRun(project_directory=self.tmpdir,
run_id=f'invalid_{uuid.uuid4().hex[:8]}',
tasks=[_make_spec('t_0')],
cluster_software='slurm',
max_concurrent=max_concurrent)

def test_render_throttle_branching_matrix(self):
"""Unit-test _render_throttle directly across scheduler × throttle combos."""
run = self._make_run('slurm', max_workers=1, n_tasks=1)
cases = [
('slurm', 100, None, {'array_range': '1-100', 'extra_directives': ''}),
('slurm', 100, 8, {'array_range': '1-100%8', 'extra_directives': ''}),
('pbs', 50, 4, {'array_range': '1-50%4', 'extra_directives': ''}),
('sge', 20, None, {'array_range': '1-20', 'extra_directives': ''}),
('sge', 20, 5, {'array_range': '1-20', 'extra_directives': '#$ -tc 5'}),
('oge', 20, 5, {'array_range': '1-20', 'extra_directives': '#$ -tc 5'}),
('htcondor', 12, None, {'array_range': '12', 'extra_directives': ''}),
('htcondor', 12, 3, {'array_range': '12', 'extra_directives': 'max_materialize = 3'}),
]
for cs, array_size, throttle, expected in cases:
with self.subTest(cluster_software=cs, throttle=throttle):
run.cluster_software = cs
self.assertEqual(run._render_throttle(array_size, throttle), expected)

def test_from_dir_round_trip_preserves_max_concurrent(self):
"""Persist max_concurrent through run.json so crash-recovered runs keep their throttle."""
run = self._make_run('slurm', max_workers=20, n_tasks=20, max_concurrent=7)
run.write_submit_script()
reloaded = PipeRun.from_dir(run.pipe_root)
self.assertEqual(reloaded.max_concurrent, 7)
with open(reloaded.write_submit_script()) as f:
content = f.read()
self.assertIn('#SBATCH --array=1-20%7', content)

def test_overwrite_is_safe(self):
run = self._make_run('slurm')
p1 = run.write_submit_script()
Expand Down
1 change: 1 addition & 0 deletions arc/settings/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@
'enabled': False, # Set to True to enable pipe mode (it is off by default, use it for large compute campaigns).
'min_tasks': 10, # Minimum batch size to trigger pipe mode.
'max_workers': 100, # Upper bound on array worker slots per PipeRun.
'max_concurrent': None, # Max array workers running simultaneously (like PBS `%N`). None = unthrottled.
Copy link

Copilot AI Apr 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new setting comment documents only None as disabling throttling, but the implementation/tests treat max_concurrent=0 as disabled as well. Please update this comment to reflect that 0 (and potentially other non-positive values) disables throttling, or alternatively enforce/validate that max_concurrent must be None or a positive integer.

Suggested change
'max_concurrent': None, # Max array workers running simultaneously (like PBS `%N`). None = unthrottled.
'max_concurrent': None, # Max array workers running simultaneously (like PBS `%N`). None or non-positive values disable throttling.

Copilot uses AI. Check for mistakes.
'max_attempts': 3, # Retry budget per task before terminal failure.
'lease_duration_hrs': 1, # Worker lease duration in hours (default 1h).
'env_setup': {}, # Engine-specific shell setup commands, e.g.,
Expand Down
15 changes: 8 additions & 7 deletions arc/settings/submit.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,10 @@
#SBATCH -N 1
#SBATCH -n {cpus}
#SBATCH --mem={memory}
#SBATCH --array=1-{max_task_num}
#SBATCH --array={array_range}
#SBATCH -o {pipe_root}/out_%a.txt
#SBATCH -e {pipe_root}/err_%a.txt

{extra_directives}
{env_setup}
WORKER_ID=$SLURM_ARRAY_TASK_ID

Expand All @@ -68,8 +68,8 @@
#PBS -q {queue}
#PBS -l ncpus={cpus}
#PBS -l mem={memory}mb
#PBS -J 1-{max_task_num}

#PBS -J {array_range}
{extra_directives}
{env_setup}
WORKER_ID="$PBS_ARRAY_INDEX"

Expand All @@ -80,10 +80,10 @@
#$ -q {queue}
#$ -pe smp {cpus}
#$ -l h_vmem={memory}M
#$ -t 1-{max_task_num}
#$ -t {array_range}
#$ -o {pipe_root}/out_$SGE_TASK_ID.txt
#$ -e {pipe_root}/err_$SGE_TASK_ID.txt

{extra_directives}
{env_setup}
WORKER_ID=$SGE_TASK_ID

Expand All @@ -96,7 +96,8 @@
output = {pipe_root}/out_$(Process).txt
error = {pipe_root}/err_$(Process).txt
log = {pipe_root}/condor.log
queue {max_task_num}
{extra_directives}
queue {array_range}
""",
}

Expand Down
Loading