Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 58 additions & 18 deletions clarifai/cli/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,12 @@ def upload(path, no_lockfile):


@pipeline.command()
@click.argument('path', type=click.Path(exists=True), required=False, default=None)
@click.option(
'--config',
type=click.Path(exists=True),
required=False,
help='Path to the pipeline run config file.',
help='Path to the pipeline run config file (deprecated, use PATH positional argument).',
)
@click.option('--pipeline_id', required=False, help='Pipeline ID to run.')
@click.option('--pipeline_version_id', required=False, help='Pipeline Version ID to run.')
Expand Down Expand Up @@ -187,9 +188,17 @@ def upload(path, no_lockfile):
type=click.Path(exists=True),
help='Path to JSON/YAML file containing parameter overrides.',
)
@click.option(
'--dev',
is_flag=True,
default=False,
help='Upload local code to an ephemeral dev pipeline before running. '
'Only changed steps are re-uploaded.',
)
@click.pass_context
def run(
ctx,
path,
config,
pipeline_id,
pipeline_version_id,
Expand All @@ -208,20 +217,37 @@ def run(
monitor,
override_params,
overrides_file,
dev,
):
"""Run a pipeline and monitor its progress."""
"""Run a pipeline and monitor its progress.

PATH: Optional path to a pipeline directory or config file. Default '.' (current directory).

\tWhen provided, config precedence is config-lock.yaml > config.yaml.
The --config option is accepted for backwards compatibility but PATH
is preferred.
"""
import json

from clarifai.client.pipeline import Pipeline
from clarifai.utils.cli import from_yaml, validate_context

validate_context(ctx)

# Try to load from config-lock.yaml first if no config is specified
lockfile_path = os.path.join(os.getcwd(), "config-lock.yaml")
if not config and os.path.exists(lockfile_path):
click.echo("Found config-lock.yaml, using it as default config source")
config = lockfile_path
# Resolve the config file from the positional PATH (preferred) or --config
if not config:
run_path = path or os.getcwd()
if os.path.isdir(run_path):
lockfile = os.path.join(run_path, 'config-lock.yaml')
configfile = os.path.join(run_path, 'config.yaml')
if os.path.exists(lockfile):
config = lockfile
logger.info(f"Using lockfile as config source: {config}")
elif os.path.exists(configfile):
config = configfile
logger.info(f"Using config file: {config}")
elif os.path.isfile(run_path):
config = run_path

if config:
config_data = from_yaml(config)
Expand Down Expand Up @@ -274,6 +300,18 @@ def run(
if not compute_cluster_id:
compute_cluster_id = ctx.obj.current.get('compute_cluster_id', '')

# Dev mode: upload local code to an ephemeral dev pipeline before running
if dev:
from clarifai.runners.pipelines.pipeline_builder import upload_dev_pipeline

dev_pipeline_id, dev_version_id, dev_user_id, dev_app_id = upload_dev_pipeline(path)
pipeline_id = dev_pipeline_id
pipeline_version_id = dev_version_id
user_id = dev_user_id
app_id = dev_app_id
pipeline_url = None # dev mode always uses explicit IDs

# compute_cluster_id and nodepool_id are mandatory regardless of whether pipeline_url is provided
# Auto-resolve compute cluster and nodepool from --instance if not explicitly provided
if not compute_cluster_id or not nodepool_id:
if instance:
Expand Down Expand Up @@ -375,17 +413,19 @@ def run(
argo_args_override=resources_pb2.ArgoArgsOverride(parameters=parameters)
)

if monitor:
# Monitor existing pipeline run instead of starting new one
result = pipeline.monitor_only(timeout=timeout, monitor_interval=monitor_interval)
else:
# Start new pipeline run and monitor it
result = pipeline.run(
timeout=timeout,
monitor_interval=monitor_interval,
input_args_override=input_args_override,
)
click.echo(json.dumps(result, indent=2, default=str))
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you remove this output intentionally?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes — in my opinion, the raw JSON dump was cluttering the command's output.

from clarifai.utils.logging import pipeline_logging

with pipeline_logging():
if monitor:
# Monitor existing pipeline run instead of starting new one
result = pipeline.monitor_only(timeout=timeout, monitor_interval=monitor_interval)
else:
# Start new pipeline run and monitor it
result = pipeline.run(
timeout=timeout,
monitor_interval=monitor_interval,
input_args_override=input_args_override,
)


@pipeline.command()
Expand Down
69 changes: 42 additions & 27 deletions clarifai/client/pipeline.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import json
import time
import uuid
from typing import Dict, List, Optional
Expand Down Expand Up @@ -146,8 +147,6 @@ def run(
if self._runner_selector:
run_request.runner_selector.CopyFrom(self._runner_selector)

logger.info(f"Starting pipeline run for pipeline {self.pipeline_id}")

response = self.STUB.PostPipelineVersionRuns(
run_request, metadata=self.auth_helper.metadata
)
Expand All @@ -169,7 +168,7 @@ def run(
pipeline_version_run = response.pipeline_version_runs[0]
run_id = pipeline_version_run.id or self.pipeline_version_run_id

logger.info(f"Pipeline version run created with ID: {run_id}")
logger.info(f"Pipeline run started: {self.pipeline_id} (run: {run_id})")

# Monitor the run
return self._monitor_pipeline_run(run_id, timeout, monitor_interval)
Expand Down Expand Up @@ -206,6 +205,7 @@ def _monitor_pipeline_run(self, run_id: str, timeout: int, monitor_interval: int
start_time = time.time()
seen_logs = set()
current_page = 1 # Track current page for log pagination.
prev_status = None

while time.time() - start_time < timeout:
# Get run status
Expand Down Expand Up @@ -233,9 +233,6 @@ def _monitor_pipeline_run(self, run_id: str, timeout: int, monitor_interval: int
# Display new log entries and update current page
current_page = self._display_new_logs(run_id, seen_logs, current_page)

elapsed_time = time.time() - start_time
logger.info(f"Pipeline run monitoring... (elapsed {elapsed_time:.1f}s)")

# Check if we have orchestration status
if (
hasattr(pipeline_run, 'orchestration_status')
Expand All @@ -245,40 +242,42 @@ def _monitor_pipeline_run(self, run_id: str, timeout: int, monitor_interval: int
if hasattr(orch_status, 'status') and orch_status.status:
status_code = orch_status.status.code
status_name = status_code_pb2.StatusCode.Name(status_code)
logger.info(f"Pipeline run status: {status_code} ({status_name})")

# Display orchestration status details if available
if hasattr(orch_status, 'description') and orch_status.description:
logger.info(f"Orchestration status: {orch_status.description}")
# Only log when status changes
if status_code != prev_status:
prev_status = status_code
elapsed_time = time.time() - start_time
logger.info(
f"Status: {status_name.removeprefix('JOB_')} ({elapsed_time:.1f}s)"
)
if hasattr(orch_status, 'description') and orch_status.description:
logger.info(f" {orch_status.description}")

# Success codes that allow continuation: JOB_RUNNING, JOB_QUEUED
if status_code in [
status_code_pb2.JOB_QUEUED,
status_code_pb2.JOB_RUNNING,
]: # JOB_QUEUED, JOB_RUNNING
logger.info(f"Pipeline run in progress: {status_code} ({status_name})")
# Continue monitoring
]:
pass # Continue monitoring
# Successful terminal state: JOB_COMPLETED
elif status_code == status_code_pb2.JOB_COMPLETED: # JOB_COMPLETED
logger.info("Pipeline run completed successfully!")
elif status_code == status_code_pb2.JOB_COMPLETED:
return {"status": "success", "pipeline_version_run": pipeline_run_dict}
# Failure terminal states: JOB_UNEXPECTED_ERROR, JOB_FAILED
elif status_code in [
status_code_pb2.JOB_FAILED,
status_code_pb2.JOB_UNEXPECTED_ERROR,
]: # JOB_FAILED, JOB_UNEXPECTED_ERROR
]:
logger.error(
f"Pipeline run failed with status: {status_code} ({status_name})"
f"Pipeline run failed: {status_name.removeprefix('JOB_')}"
)
return {"status": "failed", "pipeline_version_run": pipeline_run_dict}
# Handle legacy SUCCESS status for backward compatibility
elif status_code == status_code_pb2.StatusCode.SUCCESS:
logger.info("Pipeline run completed successfully!")
return {"status": "success", "pipeline_version_run": pipeline_run_dict}
elif status_code != status_code_pb2.StatusCode.MIXED_STATUS:
# Log other unexpected statuses but continue monitoring
logger.warning(
f"Unexpected pipeline run status: {status_code} ({status_name}). Continuing to monitor..."
f"Unexpected pipeline run status: {status_name}. Continuing to monitor..."
)

except Exception as e:
Expand Down Expand Up @@ -322,14 +321,30 @@ def _display_new_logs(self, run_id: str, seen_logs: set, current_page: int = 1)
log_id = log_entry.url or f"{log_entry.created_at.seconds}_{log_entry.message}"
if log_id not in seen_logs:
seen_logs.add(log_id)
log_message = f"[LOG] {log_entry.message.strip()}"

# Write to file if log_file is specified, otherwise log to console
if self.log_file:
with open(self.log_file, 'a', encoding='utf-8') as f:
f.write(log_message + '\n')
else:
logger.info(log_message)
raw_entries = log_entry.message.strip().split('\n')

for raw in raw_entries:
# Filter: only show logs from pipeline_step.py
# and extract the "msg" property from JSON log lines.
try:
parsed = json.loads(raw)
except (ValueError, TypeError):
parsed = None

if parsed and isinstance(parsed, dict):
filename = parsed.get('filename', '')
if filename != 'pipeline_step.py':
continue
log_message = parsed.get('msg', raw)
else:
continue # skip non-JSON lines

# Write to file if log_file is specified, otherwise log to console
if self.log_file:
with open(self.log_file, 'a', encoding='utf-8') as f:
f.write(log_message + '\n')
else:
logger.info(log_message)

# If we got a full page (50 entries), there might be more logs on the next page
# If we got fewer than 50 entries, we've reached the end and should stay on current page
Expand Down
48 changes: 48 additions & 0 deletions clarifai/runners/pipelines/pipeline_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -659,3 +659,51 @@ def upload_pipeline(path: str, no_lockfile: bool = False):
except Exception as e:
logger.error(f"Pipeline upload failed: {e}")
sys.exit(1)


def upload_dev_pipeline(path: str) -> tuple[str, str, str, str]:
    """
    Upload a dev variant of a pipeline, reusing unchanged steps.

    Builds (or refreshes) a pipeline named ``{pipeline_id}-dev``, uploading
    only the steps whose local files changed since the previous upload. The
    resulting state is written to ``config-lock-dev.yaml`` so the production
    lockfile stays untouched.

    :param path: Path to the pipeline project directory or config.yaml
    :return: Tuple of (pipeline_id, pipeline_version_id, user_id, app_id)
    :raises FileNotFoundError: if *path* is a directory without a config.yaml
    :raises RuntimeError: if step upload or pipeline creation fails
    """
    # Accept either a project directory or a direct path to the config file.
    config_path = path
    if os.path.isdir(path):
        config_path = os.path.join(path, "config.yaml")
        if not os.path.exists(config_path):
            raise FileNotFoundError(f"config.yaml not found in directory: {path}")

    builder = PipelineBuilder(config_path)
    # Rename to the "-dev" variant before anything is created server-side.
    dev_pipeline_id = f"{builder.pipeline_id}-dev"
    builder.pipeline_id = dev_pipeline_id

    logger.info(f"Starting dev pipeline upload ({dev_pipeline_id}) from config: {config_path}")

    # Upload only steps whose content changed (hash-based skip inside builder).
    if not builder.upload_pipeline_steps():
        raise RuntimeError("Failed to upload pipeline steps for dev pipeline")

    # Capture the uploaded step versions for the lockfile.
    lockfile_data = builder.prepare_lockfile_with_step_versions()

    # Create the dev pipeline (or a new version of an existing one).
    created, pipeline_version_id = builder.create_pipeline()
    if not created:
        raise RuntimeError("Failed to create dev pipeline")

    # Persist a dev-specific lockfile, separate from the production one.
    lockfile_data = builder.update_lockfile_with_pipeline_info(lockfile_data, pipeline_version_id)
    builder.save_lockfile(
        lockfile_data,
        lockfile_path=os.path.join(builder.config_dir, "config-lock-dev.yaml"),
    )

    logger.info(f"Dev pipeline upload complete: {dev_pipeline_id} v{pipeline_version_id}")
    return dev_pipeline_id, pipeline_version_id, builder.user_id, builder.app_id
38 changes: 38 additions & 0 deletions clarifai/utils/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@
f"thread=%(thread)d {COLORS.get('RESET')}"
)

PIPELINE_LOG_FORMAT = (
f"[%(levelname)s] {COLORS.get('TIME')}%(asctime)s{COLORS.get('RESET')} %(message)s"
)

# Create thread local storage that the format() call below uses.
# This is only used by the json_logger in the appropriate CLARIFAI_DEPLOY levels.
thread_log_info = threading.local()
Expand Down Expand Up @@ -456,5 +460,39 @@ def formatTime(self, record, datefmt=None):
return ""


class PipelineFormatter(TerminalFormatter):
    """Compact formatter for pipeline run output: no thread ID, no microseconds."""

    def __init__(self):
        # Reuse the terminal formatter machinery with the compact pipeline format.
        super().__init__(PIPELINE_LOG_FORMAT)

    def formatTime(self, record, datefmt=None):
        # datefmt is ignored on purpose: pipeline output always uses HH:MM:SS.
        try:
            when = datetime.datetime.fromtimestamp(record.created)
            return when.strftime('%H:%M:%S')
        except Exception:
            # A malformed timestamp should never break log emission.
            return ""


def pipeline_logging():
    """Context manager that temporarily applies PipelineFormatter to the SDK logger.

    Handlers whose formatter is a TerminalFormatter are switched to the compact
    PipelineFormatter for the duration of the ``with`` block; every handler's
    original formatter is restored on exit, even if an exception propagates.
    """
    from contextlib import contextmanager

    @contextmanager
    def _pipeline_ctx():
        sdk_logger = logging.getLogger("clarifai")
        # Remember every handler's formatter so we can restore all of them.
        saved = [(h, h.formatter) for h in sdk_logger.handlers]
        for handler, fmt in saved:
            if isinstance(fmt, TerminalFormatter):
                handler.setFormatter(PipelineFormatter())
        try:
            yield
        finally:
            for handler, fmt in saved:
                handler.setFormatter(fmt)

    return _pipeline_ctx()


# the default logger for the SDK.
logger = get_logger(logger_level=os.environ.get("LOG_LEVEL", "INFO"), name="clarifai")
Loading