Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,7 @@ private AppRunRecord convertPipelineStatusToAppRun(App app, PipelineStatus pipel
case SUCCESS -> AppRunRecord.Status.SUCCESS;
case FAILED, PARTIAL_SUCCESS -> AppRunRecord.Status.FAILED;
case RUNNING -> AppRunRecord.Status.RUNNING;
case STOPPED -> AppRunRecord.Status.STOPPED;
})
.withConfig(pipelineStatus.getConfig());
Comment thread
gitar-bot[bot] marked this conversation as resolved.
Outdated
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.openmetadata.schema.entity.services.ingestionPipelines.IngestionPipeline;
import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineServiceClientResponse;
import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineStatus;
import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineStatusType;
import org.openmetadata.schema.services.connections.metadata.OpenMetadataConnection;
import org.openmetadata.schema.type.EntityHistory;
import org.openmetadata.schema.type.EntityReference;
Expand Down Expand Up @@ -374,8 +375,13 @@ protected static AppRunRecord convertPipelineStatus(App app, PipelineStatus pipe
case SUCCESS -> AppRunRecord.Status.SUCCESS;
case FAILED, PARTIAL_SUCCESS -> AppRunRecord.Status.FAILED;
case RUNNING -> AppRunRecord.Status.RUNNING;
case STOPPED -> AppRunRecord.Status.STOPPED;
})
.withConfig(pipelineStatus.getConfig());
.withConfig(pipelineStatus.getConfig())
.withProperties(
pipelineStatus.getRunId() != null
? Map.of("pipelineRunId", pipelineStatus.getRunId())
: null);
}

private ResultList<AppRunRecord> sortRunsByStartTime(ResultList<AppRunRecord> runs) {
Expand Down Expand Up @@ -1227,7 +1233,12 @@ public Response stopApplicationRun(
@Context SecurityContext securityContext,
@Parameter(description = "Name of the App", schema = @Schema(type = "string"))
@PathParam("name")
String name) {
String name,
@Parameter(
description = "Pipeline run ID (Argo workflow UID) to stop a specific run",
Comment thread
Vishnuujain marked this conversation as resolved.
Outdated
schema = @Schema(type = "string"))
@QueryParam("runId")
String runId) {
Comment thread
gitar-bot[bot] marked this conversation as resolved.
Outdated
EntityUtil.Fields fields = getFields(String.format("%s,bot,pipelines", FIELD_OWNERS));
App app = repository.getByName(uriInfo, name, fields);
OperationContext operationContext = new OperationContext(entityType, MetadataOperation.TRIGGER);
Expand All @@ -1243,15 +1254,81 @@ public Response stopApplicationRun(
} else {
if (!app.getPipelines().isEmpty()) {
IngestionPipeline ingestionPipeline = getIngestionPipeline(uriInfo, securityContext, app);
PipelineServiceClientResponse response =
pipelineServiceClient.killIngestion(ingestionPipeline);
return Response.status(response.getCode()).entity(response).build();
if (runId != null && !runId.isBlank()) {
markPipelineStatusAsStopped(uriInfo, ingestionPipeline, runId);
PipelineServiceClientResponse response =
pipelineServiceClient.killIngestionRun(ingestionPipeline, runId);
return Response.status(response.getCode()).entity(response).build();
} else {
markLatestPipelineStatusAsStopped(uriInfo, ingestionPipeline);
PipelineServiceClientResponse response =
pipelineServiceClient.killIngestion(ingestionPipeline);
Copy link

Copilot AI Apr 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

stopApplicationRun updates the stored PipelineStatus to STOPPED before calling the pipeline service kill operation and returns whatever status code the kill call produces. If the kill call fails (non-2xx / throws), the DB may still show the run as STOPPED, which is inaccurate. Consider only marking the status as STOPPED after a successful kill response, or recording a distinct intermediate state (if available) when the stop request is issued but not yet confirmed.

Suggested change
markPipelineStatusAsStopped(uriInfo, ingestionPipeline, runId);
PipelineServiceClientResponse response =
pipelineServiceClient.killIngestionRun(ingestionPipeline, runId);
return Response.status(response.getCode()).entity(response).build();
} else {
markLatestPipelineStatusAsStopped(uriInfo, ingestionPipeline);
PipelineServiceClientResponse response =
pipelineServiceClient.killIngestion(ingestionPipeline);
PipelineServiceClientResponse response =
pipelineServiceClient.killIngestionRun(ingestionPipeline, runId);
if (response.getCode() >= 200 && response.getCode() < 300) {
markPipelineStatusAsStopped(uriInfo, ingestionPipeline, runId);
}
return Response.status(response.getCode()).entity(response).build();
} else {
PipelineServiceClientResponse response =
pipelineServiceClient.killIngestion(ingestionPipeline);
if (response.getCode() >= 200 && response.getCode() < 300) {
markLatestPipelineStatusAsStopped(uriInfo, ingestionPipeline);
}

Copilot uses AI. Check for mistakes.
return Response.status(response.getCode()).entity(response).build();
}
Copy link

Copilot AI Apr 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new runId query parameter path and the DB-side status marking logic (markPipelineStatusAsStopped / markLatestPipelineStatusAsStopped) introduce new behavior that isn't covered by existing backend tests. Please add service-level tests for stopApplicationRun covering: stopping with a specific runId (updates only that run when kill succeeds), stopping without runId (fallback behavior), and failure cases (kill failure should not incorrectly mark runs as STOPPED).

Copilot generated this review using guidance from repository custom instructions.
}
}
}
throw new BadRequestException("Application does not support Interrupts.");
}

private void markPipelineStatusAsStopped(
UriInfo uriInfo, IngestionPipeline ingestionPipeline, String runId) {
Comment thread
gitar-bot[bot] marked this conversation as resolved.
Outdated
try {
IngestionPipelineRepository ingestionPipelineRepository =
(IngestionPipelineRepository) Entity.getEntityRepository(Entity.INGESTION_PIPELINE);
PipelineStatus status =
ingestionPipelineRepository.getPipelineStatus(
ingestionPipeline.getFullyQualifiedName(), UUID.fromString(runId));
if (status == null) {
LOG.warn("Pipeline status not found for run {}, skipping DB update", runId);
return;
}
if (!isTerminalState(status.getPipelineState())) {
status.setPipelineState(PipelineStatusType.STOPPED);
status.setEndDate(System.currentTimeMillis());
ingestionPipelineRepository.addPipelineStatus(
uriInfo, ingestionPipeline.getFullyQualifiedName(), status);
}
} catch (Exception e) {
LOG.warn(
"Failed to mark pipeline run {} as stopped, continuing with kill: {}",
runId,
e.getMessage());
}
}

private void markLatestPipelineStatusAsStopped(
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldnt this implementation be based on markPipelineStatusAsStopped but passing the last runId? Otherwise seems we're duplicating logic

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

refactored it, thanks

UriInfo uriInfo, IngestionPipeline ingestionPipeline) {
try {
IngestionPipelineRepository ingestionPipelineRepository =
(IngestionPipelineRepository) Entity.getEntityRepository(Entity.INGESTION_PIPELINE);
ResultList<PipelineStatus> recentStatuses =
ingestionPipelineRepository.listPipelineStatus(
ingestionPipeline.getFullyQualifiedName(), null, null, 20);
Copy link
Copy Markdown
Collaborator

@TeddyCr TeddyCr Apr 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understand we want to change the status of the running pipeline from queue, running, etc. to stopped? Instead of fetching the last 20 status, why not fetch the statuses for the timestamp between start and now to have something more deterministic?

Listing the last 20 statuses feel fragile.

long now = System.currentTimeMillis();
for (PipelineStatus status : recentStatuses.getData()) {
if (!isTerminalState(status.getPipelineState())) {
status.setPipelineState(PipelineStatusType.STOPPED);
status.setEndDate(now);
ingestionPipelineRepository.addPipelineStatus(
uriInfo, ingestionPipeline.getFullyQualifiedName(), status);
Copy link

Copilot AI Apr 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

markLatestPipelineStatusAsStopped calls listPipelineStatus(...), which merges queued statuses from pipelineServiceClient.getQueuedPipelineStatus(...). Those queued entries may not be persisted and/or may not have a non-null runId, but addPipelineStatus(...) requires pipelineStatus.getRunId() as the time-series key. This can cause the whole marking loop to fail (caught and logged) and skip DB updates. Consider only iterating DB-backed statuses (avoid the merged queued list), or at least skip entries with null/blank runId and isolate failures per-status so one bad entry doesn't prevent updating others.

Suggested change
long now = System.currentTimeMillis();
for (PipelineStatus status : recentStatuses.getData()) {
if (!isTerminalState(status.getPipelineState())) {
status.setPipelineState(PipelineStatusType.STOPPED);
status.setEndDate(now);
ingestionPipelineRepository.addPipelineStatus(
uriInfo, ingestionPipeline.getFullyQualifiedName(), status);
if (recentStatuses == null || recentStatuses.getData() == null) {
return;
}
long now = System.currentTimeMillis();
for (PipelineStatus status : recentStatuses.getData()) {
if (status == null || isTerminalState(status.getPipelineState())) {
continue;
}
String runId = status.getRunId();
if (nullOrEmpty(runId)) {
LOG.warn(
"Skipping pipeline status without runId while marking runs as stopped for pipeline {}",
ingestionPipeline.getFullyQualifiedName());
continue;
}
try {
status.setPipelineState(PipelineStatusType.STOPPED);
status.setEndDate(now);
ingestionPipelineRepository.addPipelineStatus(
uriInfo, ingestionPipeline.getFullyQualifiedName(), status);
} catch (RuntimeException e) {
LOG.warn(
"Failed to mark pipeline run {} as stopped, continuing with remaining runs: {}",
runId,
e.getMessage());

Copilot uses AI. Check for mistakes.
}
Comment thread
gitar-bot[bot] marked this conversation as resolved.
Outdated
}
} catch (Exception e) {
LOG.warn("Failed to mark pipeline runs as stopped, continuing with kill: {}", e.getMessage());
}
}

private static boolean isTerminalState(PipelineStatusType state) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: should that be a utility method instead of a method of the AppResource?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It wasn't used much, but we can move it in util

if (state == null) {
return false;
}
return state == PipelineStatusType.SUCCESS
|| state == PipelineStatusType.FAILED
|| state == PipelineStatusType.STOPPED
|| state == PipelineStatusType.PARTIAL_SUCCESS;
}

@POST
@Path("/deploy/{name}")
@Operation(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,5 +130,11 @@ default PipelineServiceClientResponse runPipeline(
/* Get the all last run logs of a deployed pipeline */
PipelineServiceClientResponse killIngestion(IngestionPipeline ingestionPipeline);

/* Stop a specific run of a deployed pipeline identified by its run ID (Argo workflow UID) */
default PipelineServiceClientResponse killIngestionRun(
IngestionPipeline ingestionPipeline, String runId) {
return killIngestion(ingestionPipeline);
}
Copy link
Copy Markdown
Collaborator

@TeddyCr TeddyCr Apr 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are not using the runId here if I am not mistaking. We should make it noop for clarity

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct, runId is unused in the default intentional fallback for non-Argo clients that don't support per-run stopping

Comment on lines +143 to +146
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Bug: killIngestionRun default no-op marks DB STOPPED but never kills

The killIngestionRun default method in PipelineServiceClientInterface returns a 200 success response without actually stopping anything. None of the concrete pipeline service clients (AirflowRESTClient, K8sPipelineClient, etc.) override this method. Combined with the fact that stopSpecificRun marks the DB status as STOPPED before calling killIngestionRun, this means:

  1. The pipeline status will show STOPPED in the database
  2. The actual workflow continues running in the pipeline service
  3. The API returns 200 success to the user

This creates a silent inconsistency where the UI shows the run as stopped but it's still executing. The same issue applies to getIngestionLogs which silently ignores the runId parameter and returns latest logs instead of per-run logs.

At minimum, killIngestionRun should throw UnsupportedOperationException or return a non-200 response so callers know the feature isn't supported, rather than silently pretending it succeeded. Alternatively, the concrete clients should implement these methods.

Suggested fix:

// Option A: Make it clear the operation is unsupported
default PipelineServiceClientResponse killIngestionRun(
    IngestionPipeline ingestionPipeline, String runId) {
  throw new UnsupportedOperationException(
      "Per-run stop is not supported by " + getPlatform());
}

// Option B: Return a distinguishable response
default PipelineServiceClientResponse killIngestionRun(
    IngestionPipeline ingestionPipeline, String runId) {
  return new PipelineServiceClientResponse()
      .withCode(501).withPlatform(getPlatform())
      .withReason("Per-run stop not supported");
}

Was this helpful? React with 👍 / 👎 | Reply gitar fix to apply this suggestion


String getPlatform();
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
"description": "Pipeline status denotes if its failed or succeeded.",
"type": "string",
"javaType": "org.openmetadata.schema.entity.services.ingestionPipelines.PipelineStatusType",
"enum": ["queued","success","failed","running","partialSuccess"]
"enum": ["queued","success","failed","running","partialSuccess","stopped"]
Copy link

Copilot AI Apr 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding "stopped" to PipelineStatusType requires updating backend code paths that treat any non-(FAILED/SUCCESS/PARTIAL_SUCCESS) state as "still running". For example, RunAppImpl.waitForCompletion (and similar ingestion workflow wait loops) currently only terminates on FAILED/SUCCESS/PARTIAL_SUCCESS; a STOPPED run would keep polling until timeout. Please audit/update those completion checks to treat STOPPED as a terminal state (likely a failure) to avoid hung workflows after a stop/cancel.

Suggested change
"enum": ["queued","success","failed","running","partialSuccess","stopped"]
"enum": ["queued","success","failed","running","partialSuccess"]

Copilot uses AI. Check for mistakes.
},
"startDate": {
"description": "startDate of the pipeline run for this particular execution.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ export interface StopScheduleRunModalProps {
appName: string;
isModalOpen: boolean;
displayName: string;
runId?: string;
onClose: () => void;
onStopWorkflowsUpdate?: () => void;
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ const StopScheduleModal: FC<StopScheduleRunModalProps> = ({
appName,
isModalOpen,
displayName,
runId,
onClose,
onStopWorkflowsUpdate,
}) => {
Expand All @@ -32,7 +33,7 @@ const StopScheduleModal: FC<StopScheduleRunModalProps> = ({
const handleConfirm = async () => {
setIsLoading(true);
try {
const { status } = await stopApp(appName);
const { status } = await stopApp(appName, runId);
if (status === 200) {
showSuccessToast(
t('message.application-stop', {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ const AppRunsHistory = forwardRef(
>([]);
const [expandedRowKeys, setExpandedRowKeys] = useState<string[]>([]);
const [isStopModalOpen, setIsStopModalOpen] = useState<boolean>(false);
const [selectedRunId, setSelectedRunId] = useState<string | undefined>(
undefined
);
const [showConfigModal, setShowConfigModal] = useState<boolean>(false);
const [appRunRecordConfig, setAppRunRecordConfig] = useState<
AppRunRecord['config']
Expand Down Expand Up @@ -198,16 +201,23 @@ const AppRunsHistory = forwardRef(
onClick={() => showAppRunConfig(record)}>
{t('label.config')}
</Button>
{/* For status running or activewitherror and supportsInterrupt is true, show stop button */}
{(record.status === Status.Running ||
record.status === Status.ActiveError) &&
{record.status !== Status.Success &&
record.status !== Status.Failed &&
record.status !== Status.Stopped &&
record.status !== Status.Completed &&
Boolean(appData?.supportsInterrupt) && (
Comment thread
gitar-bot[bot] marked this conversation as resolved.
<Button
className="m-l-xs p-0"
data-testid="stop-button"
size="small"
type="link"
onClick={() => setIsStopModalOpen(true)}>
onClick={() => {
const rawRunId = record.properties?.pipelineRunId;
setSelectedRunId(
typeof rawRunId === 'string' ? rawRunId : undefined
);
setIsStopModalOpen(true);
}}>
Comment on lines +209 to +226
Copy link

Copilot AI Apr 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The stop button is now shown whenever status is not one of Success/Failed/Stopped/Completed. This includes synthetic rows (where record.status is undefined) and Status.StopInProgress, which can surface a stop action when there is no specific run to stop (leading to runId being undefined and the backend fallback kill-all path). Consider restricting the stop button to explicit stoppable statuses (e.g., Running/ActiveError/Started/Pending as intended), excluding record.isSynthetic and Status.StopInProgress, and only enabling per-run stop when record.properties.pipelineRunId is present for external pipeline-backed runs.

Copilot uses AI. Check for mistakes.
Comment on lines +209 to +226
Copy link

Copilot AI Apr 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New per-run stop behavior (capturing pipelineRunId from record.properties and passing it into the stop modal) is not covered by the existing AppRunsHistory tests. Please add/adjust unit tests to verify: (1) stop button visibility rules for terminal statuses / synthetic rows / stop-in-progress, and (2) clicking Stop calls the stop API with the expected runId when record.properties.pipelineRunId is present (and does not offer per-run stop when it is absent).

Copilot generated this review using guidance from repository custom instructions.
{t('label.stop')}
</Button>
)}
Expand Down Expand Up @@ -452,8 +462,10 @@ const AppRunsHistory = forwardRef(
appName={fqn}
displayName={appData?.displayName ?? ''}
isModalOpen={isStopModalOpen}
runId={selectedRunId}
onClose={() => {
setIsStopModalOpen(false);
setSelectedRunId(undefined);
}}
onStopWorkflowsUpdate={() => {
fetchAppHistory();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -761,6 +761,7 @@ export enum PipelineState {
PartialSuccess = "partialSuccess",
Queued = "queued",
Running = "running",
Stopped = "stopped",
Success = "success",
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,10 @@ export const restoreApp = async (id: string) => {
return response.data;
};

export const stopApp = async (name: string) => {
return await APIClient.post(`${BASE_URL}/stop/${getEncodedFqn(name)}`);
export const stopApp = async (name: string, runId?: string) => {
const params = runId ? `?runId=${encodeURIComponent(runId)}` : '';

return await APIClient.post(`${BASE_URL}/stop/${getEncodedFqn(name)}${params}`);
};

export const getApplicationLogs = (appName: string, after?: string) => {
Expand Down
Loading