Skip to content
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -136,11 +136,25 @@ public Map<String, String> getLastIngestionLogs(
GET_LOGS, () -> this.decoratedClient.getLastIngestionLogs(ingestionPipeline, after));
}

/**
 * Fetches ingestion logs for a pipeline by delegating to the decorated client's
 * {@code getIngestionLogs}, with the call routed through {@code executeWithMetering} under the
 * {@code GET_LOGS} key.
 *
 * @param ingestionPipeline pipeline whose logs are requested
 * @param after passed through unchanged to the decorated client
 * @param runId passed through unchanged to the decorated client
 * @return whatever the metered call to the decorated client returns
 */
@Override
public Map<String, String> getIngestionLogs(
IngestionPipeline ingestionPipeline, String after, String runId) {
return executeWithMetering(
GET_LOGS, () -> this.decoratedClient.getIngestionLogs(ingestionPipeline, after, runId));
}

/**
 * Delegates the kill request for a pipeline to the decorated client, with the call routed
 * through {@code respondWithMetering} under the {@code KILL} key.
 *
 * <p>NOTE(review): the sibling methods {@code getIngestionLogs} and {@code killIngestionRun}
 * carry {@code @Override}; this one does not. Consider adding it for consistency.
 *
 * @param ingestionPipeline pipeline to kill
 * @return whatever the metered call to the decorated client returns
 */
public PipelineServiceClientResponse killIngestion(IngestionPipeline ingestionPipeline) {
return this.respondWithMetering(
KILL, () -> this.decoratedClient.killIngestion(ingestionPipeline));
}

/**
 * Delegates the kill request for a single run to the decorated client, with the call routed
 * through {@code respondWithMetering} under the same {@code KILL} key used by
 * {@code killIngestion}.
 *
 * @param ingestionPipeline pipeline that owns the run
 * @param runId passed through unchanged to the decorated client
 * @return whatever the metered call to the decorated client returns
 */
@Override
public PipelineServiceClientResponse killIngestionRun(
IngestionPipeline ingestionPipeline, String runId) {
return this.respondWithMetering(
KILL, () -> this.decoratedClient.killIngestionRun(ingestionPipeline, runId));
}

@Override
public String getPlatform() {
return this.decoratedClient.getPlatform();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -549,8 +549,13 @@ private AppRunRecord convertPipelineStatusToAppRun(App app, PipelineStatus pipel
case SUCCESS -> AppRunRecord.Status.SUCCESS;
case FAILED, PARTIAL_SUCCESS -> AppRunRecord.Status.FAILED;
case RUNNING -> AppRunRecord.Status.RUNNING;
case STOPPED -> AppRunRecord.Status.STOPPED;
})
.withConfig(pipelineStatus.getConfig());
.withConfig(pipelineStatus.getConfig())
.withProperties(
pipelineStatus.getRunId() != null
? Map.of("pipelineRunId", pipelineStatus.getRunId())
: null);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -702,18 +702,58 @@ public PipelineStatus getLatestPipelineStatus(IngestionPipeline ingestionPipelin
}

/**
 * UUID convenience overload: converts the run id to its string form and forwards to the
 * {@code String}-keyed overload.
 *
 * @param ingestionPipelineFQN name used to look up the ingestion pipeline
 * @param pipelineStatusRunId run identifier; must not be null because it is dereferenced here
 * @return the result of the {@code String}-keyed lookup
 */
public PipelineStatus getPipelineStatus(String ingestionPipelineFQN, UUID pipelineStatusRunId) {
  String runIdText = pipelineStatusRunId.toString();
  return getPipelineStatus(ingestionPipelineFQN, runIdText);
}

/**
 * Fetches the stored status record of a single pipeline run.
 *
 * <p>Resolves the non-deleted pipeline by name, reads the pipeline-status time-series extension
 * keyed by the run id, and deserializes it into a {@link PipelineStatus}.
 *
 * @param ingestionPipelineFQN name used to look up the ingestion pipeline
 * @param runId run identifier used as the lookup key
 * @return the deserialized status; the caller in this change null-checks the result, so a
 *     missing record is presumably reported as {@code null}
 */
public PipelineStatus getPipelineStatus(String ingestionPipelineFQN, String runId) {
  IngestionPipeline ingestionPipeline = findByName(ingestionPipelineFQN, Include.NON_DELETED);
  // Only runId is passed as the key value: the UUID parameter of the other overload is not in
  // scope here, so the former pipelineStatusRunId.toString() argument must not remain.
  return JsonUtils.readValue(
      daoCollection
          .entityExtensionTimeSeriesDao()
          .getExtensionByKey(
              RUN_ID_EXTENSION_KEY,
              runId,
              ingestionPipeline.getFullyQualifiedName(),
              PIPELINE_STATUS_EXTENSION),
      PipelineStatus.class);
}

/**
* Upsert only the time-series record for a specific run without overwriting the pipeline-level
* current status. Use this when stopping a specific run while other runs may still be active.
* Inserts a new record if none exists for the runId, otherwise updates the existing one.
*/
@Transaction
public void updatePipelineStatusByRunId(String fqn, PipelineStatus pipelineStatus) {
IngestionPipeline ingestionPipeline = findByName(fqn, Include.NON_DELETED);
String pipelineFqn = ingestionPipeline.getFullyQualifiedName();
String json = JsonUtils.pojoToJson(pipelineStatus);
PipelineStatus storedPipelineStatus =
JsonUtils.readValue(
daoCollection
.entityExtensionTimeSeriesDao()
.getLatestExtensionByKey(
RUN_ID_EXTENSION_KEY,
pipelineStatus.getRunId(),
pipelineFqn,
PIPELINE_STATUS_EXTENSION),
PipelineStatus.class);
if (storedPipelineStatus != null) {
Comment thread
gitar-bot[bot] marked this conversation as resolved.
daoCollection
.entityExtensionTimeSeriesDao()
.updateExtensionByKey(
RUN_ID_EXTENSION_KEY,
pipelineStatus.getRunId(),
pipelineFqn,
PIPELINE_STATUS_EXTENSION,
json);
} else {
daoCollection
.entityExtensionTimeSeriesDao()
.insert(pipelineFqn, PIPELINE_STATUS_EXTENSION, PIPELINE_STATUS_JSON_SCHEMA, json);
}
}

@Transaction
public IngestionPipeline deletePipelineStatusByRunId(UUID ingestionPipelineId, UUID runId) {
IngestionPipeline ingestionPipeline = find(ingestionPipelineId, Include.NON_DELETED);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.openmetadata.schema.ServiceEntityInterface;
import org.openmetadata.schema.api.data.RestoreEntity;
Expand All @@ -59,6 +60,7 @@
import org.openmetadata.schema.entity.services.ingestionPipelines.IngestionPipeline;
import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineServiceClientResponse;
import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineStatus;
import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineStatusType;
import org.openmetadata.schema.services.connections.metadata.OpenMetadataConnection;
import org.openmetadata.schema.type.EntityHistory;
import org.openmetadata.schema.type.EntityReference;
Expand Down Expand Up @@ -94,6 +96,7 @@
import org.openmetadata.service.util.DeleteEntityResponse;
import org.openmetadata.service.util.EntityUtil;
import org.openmetadata.service.util.OpenMetadataConnectionBuilder;
import org.openmetadata.service.util.PipelineStatusUtils;
import org.openmetadata.service.util.RestUtil;
import org.openmetadata.service.util.WebsocketNotificationHandler;
import org.quartz.SchedulerException;
Expand Down Expand Up @@ -374,8 +377,13 @@ protected static AppRunRecord convertPipelineStatus(App app, PipelineStatus pipe
case SUCCESS -> AppRunRecord.Status.SUCCESS;
case FAILED, PARTIAL_SUCCESS -> AppRunRecord.Status.FAILED;
case RUNNING -> AppRunRecord.Status.RUNNING;
case STOPPED -> AppRunRecord.Status.STOPPED;
})
.withConfig(pipelineStatus.getConfig());
.withConfig(pipelineStatus.getConfig())
.withProperties(
pipelineStatus.getRunId() != null
? Map.of("pipelineRunId", pipelineStatus.getRunId())
: null);
}

private ResultList<AppRunRecord> sortRunsByStartTime(ResultList<AppRunRecord> runs) {
Expand Down Expand Up @@ -527,7 +535,14 @@ public Response getLastLogs(
schema = @Schema(type = "string"))
@QueryParam("after")
@DefaultValue("")
String after) {
String after,
@Parameter(
description =
"Pipeline run ID to fetch logs for a specific run. "
+ "If not provided, returns logs for the latest run.",
schema = @Schema(type = "string"))
@QueryParam("runId")
String runId) {
App installation = repository.getByName(uriInfo, name, repository.getFields("id,pipelines"));
if (installation.getAppType().equals(AppType.Internal)) {
AppRunRecord latestRun = repository.getLatestAppRunsOptional(installation).orElse(null);
Expand All @@ -544,7 +559,7 @@ public Response getLastLogs(
ingestionPipelineRepository.get(
uriInfo, pipelineRef.getId(), ingestionPipelineRepository.getFields(FIELD_OWNERS));
return Response.ok(
pipelineServiceClient.getLastIngestionLogs(ingestionPipeline, after),
pipelineServiceClient.getIngestionLogs(ingestionPipeline, after, runId),
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this backward compatible? Would it pick the last logs if runId is null?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it's backward compatible.

MediaType.APPLICATION_JSON_TYPE)
.build();
}
Expand Down Expand Up @@ -1227,7 +1242,12 @@ public Response stopApplicationRun(
@Context SecurityContext securityContext,
@Parameter(description = "Name of the App", schema = @Schema(type = "string"))
@PathParam("name")
String name) {
String name,
@Parameter(
description = "Pipeline run ID to stop a specific run",
schema = @Schema(type = "string"))
@QueryParam("runId")
String runId) {
EntityUtil.Fields fields = getFields(String.format("%s,bot,pipelines", FIELD_OWNERS));
App app = repository.getByName(uriInfo, name, fields);
OperationContext operationContext = new OperationContext(entityType, MetadataOperation.TRIGGER);
Expand All @@ -1241,17 +1261,148 @@ public Response stopApplicationRun(
.entity("Application stop in progress. Please check status via.")
.build();
} else {
if (!app.getPipelines().isEmpty()) {
IngestionPipeline ingestionPipeline = getIngestionPipeline(uriInfo, securityContext, app);
PipelineServiceClientResponse response =
pipelineServiceClient.killIngestion(ingestionPipeline);
return Response.status(response.getCode()).entity(response).build();
if (nullOrEmpty(app.getPipelines())) {
throw new BadRequestException(
String.format(
"Application [%s] supports interrupts but has no associated pipeline configured.",
name));
}
IngestionPipeline ingestionPipeline = getIngestionPipeline(uriInfo, securityContext, app);
if (runId != null && !runId.isBlank()) {
return stopSpecificRun(uriInfo, ingestionPipeline, runId);
} else {
return stopAllRuns(app, ingestionPipeline);
}
}
}
throw new BadRequestException("Application does not support Interrupts.");
}

private Response stopSpecificRun(
UriInfo uriInfo, IngestionPipeline ingestionPipeline, String runId) {
markPipelineStatusAsStopped(ingestionPipeline, runId);
PipelineServiceClientResponse killResponse;
try {
killResponse = pipelineServiceClient.killIngestionRun(ingestionPipeline, runId);
} catch (Exception e) {
LOG.error(
"Kill request for run [{}] on pipeline [{}] failed after DB update. Workflow may still be running.",
runId,
ingestionPipeline.getFullyQualifiedName(),
e);
return Response.status(Response.Status.BAD_GATEWAY)
.entity(
new PipelineServiceClientResponse()
.withCode(Response.Status.BAD_GATEWAY.getStatusCode())
.withReason(e.getMessage())
.withPlatform(pipelineServiceClient.getPlatform()))
.build();
Comment thread
gitar-bot[bot] marked this conversation as resolved.
}
return toStopResponse(killResponse);
}

/**
 * Stops every run of the pipeline: marks the non-terminal statuses since the latest app run's
 * start time as stopped in the DB, then asks the pipeline service to kill the ingestion.
 *
 * <p>If the kill call throws, the failure is logged and a 502 response carrying the exception
 * message is returned; otherwise the client's response is mapped by {@code toStopResponse}.
 *
 * @param app application whose latest run record supplies the start time
 * @param ingestionPipeline pipeline to stop
 * @return HTTP response describing the outcome of the kill request
 */
private Response stopAllRuns(App app, IngestionPipeline ingestionPipeline) {
  // Null when no latest run record is found; the callee then picks its own fallback window.
  Long latestRunStart =
      repository
          .getLatestAppRunsOptional(app, ingestionPipeline.getService().getId())
          .map(runRecord -> runRecord.getStartTime())
          .orElse(null);
  markLatestPipelineStatusAsStopped(ingestionPipeline, latestRunStart);

  PipelineServiceClientResponse clientResponse;
  try {
    clientResponse = pipelineServiceClient.killIngestion(ingestionPipeline);
  } catch (Exception killFailure) {
    LOG.error(
        "Kill request for pipeline [{}] failed after DB update. Workflows may still be running.",
        ingestionPipeline.getFullyQualifiedName(),
        killFailure);
    PipelineServiceClientResponse failureBody =
        new PipelineServiceClientResponse()
            .withCode(Response.Status.BAD_GATEWAY.getStatusCode())
            .withReason(killFailure.getMessage())
            .withPlatform(pipelineServiceClient.getPlatform());
    return Response.status(Response.Status.BAD_GATEWAY).entity(failureBody).build();
  }
  return toStopResponse(clientResponse);
}

/**
 * Best-effort DB update that flips one run's stored status to STOPPED and stamps its end date.
 *
 * <p>Nothing is written when no status record exists for the run (a warning is logged) or when
 * the stored state is already terminal. Any exception is logged and swallowed so that the
 * caller can still proceed with the kill request.
 *
 * @param ingestionPipeline pipeline that owns the run
 * @param runId identifier of the run to mark as stopped
 */
private void markPipelineStatusAsStopped(IngestionPipeline ingestionPipeline, String runId) {
  IngestionPipelineRepository pipelineRepo =
      (IngestionPipelineRepository) Entity.getEntityRepository(Entity.INGESTION_PIPELINE);
  try {
    PipelineStatus runStatus =
        pipelineRepo.getPipelineStatus(ingestionPipeline.getFullyQualifiedName(), runId);
    if (runStatus == null) {
      LOG.warn(
          "Pipeline status not found in DB for run [{}] on pipeline [{}]. Proceeding with kill but DB state will remain inconsistent.",
          runId,
          ingestionPipeline.getFullyQualifiedName());
      return;
    }
    if (PipelineStatusUtils.isTerminalState(runStatus.getPipelineState())) {
      // Already finished or stopped: leave the stored record as it is.
      return;
    }
    runStatus.setPipelineState(PipelineStatusType.STOPPED);
    runStatus.setEndDate(System.currentTimeMillis());
    // updatePipelineStatusByRunId is used rather than addPipelineStatus so the pipeline-level
    // current status is not overwritten: other runs may still be active while this one is
    // being stopped.
    pipelineRepo.updatePipelineStatusByRunId(
        ingestionPipeline.getFullyQualifiedName(), runStatus);
  } catch (Exception dbFailure) {
    LOG.error(
        "Failed to mark run [{}] as STOPPED in DB for pipeline [{}]. Kill will proceed but DB status may remain inconsistent.",
        runId,
        ingestionPipeline.getFullyQualifiedName(),
        dbFailure);
  }
}

private void markLatestPipelineStatusAsStopped(
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this implementation be based on markPipelineStatusAsStopped, passing the last runId? Otherwise it seems we're duplicating logic.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

refactored it, thanks

IngestionPipeline ingestionPipeline, Long runStartTime) {
IngestionPipelineRepository ingestionPipelineRepository =
(IngestionPipelineRepository) Entity.getEntityRepository(Entity.INGESTION_PIPELINE);
long now = System.currentTimeMillis();
long startTs = runStartTime != null ? runStartTime : now - TimeUnit.HOURS.toMillis(1);
ResultList<PipelineStatus> statuses;
try {
statuses =
ingestionPipelineRepository.listPipelineStatus(
ingestionPipeline.getFullyQualifiedName(), startTs, now);
} catch (Exception e) {
LOG.error(
"Failed to list pipeline statuses for [{}]. Kill will proceed but DB statuses may remain inconsistent.",
ingestionPipeline.getFullyQualifiedName(),
e);
return;
}
for (PipelineStatus status : statuses.getData()) {
if (status.getRunId() == null || status.getRunId().isBlank()) {
continue;
}
if (!PipelineStatusUtils.isTerminalState(status.getPipelineState())) {
markPipelineStatusAsStopped(ingestionPipeline, status.getRunId());
}
}
}

/**
 * Maps the pipeline service's kill response onto the HTTP response returned to the caller.
 *
 * <ul>
 *   <li>2xx: passed through with the same status code.
 *   <li>404: logged as a warning and answered with 200.
 *   <li>anything else: logged as an error and answered with 502.
 * </ul>
 *
 * The kill response itself is always used as the response entity.
 *
 * @param killResponse response obtained from the pipeline service client
 * @return HTTP response for the stop request
 */
private Response toStopResponse(PipelineServiceClientResponse killResponse) {
  int statusCode = killResponse.getCode();
  boolean succeeded = statusCode >= 200 && statusCode < 300;
  if (succeeded) {
    return Response.status(statusCode).entity(killResponse).build();
  }
  if (statusCode != 404) {
    LOG.error(
        "Kill request returned unexpected code [{}]. DB status already marked STOPPED but workflow may still be running.",
        statusCode);
    return Response.status(Response.Status.BAD_GATEWAY).entity(killResponse).build();
  }
  LOG.warn(
      "Kill request returned 404 — workflow already completed. DB status already marked STOPPED.");
  return Response.ok(killResponse).build();
}

@POST
@Path("/deploy/{name}")
@Operation(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright 2021 Collate
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.openmetadata.service.util;

import java.util.Set;
import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineStatusType;

/** Static helpers for classifying {@link PipelineStatusType} values. */
public final class PipelineStatusUtils {

  /** States that {@link #isTerminalState} reports as terminal. */
  private static final Set<PipelineStatusType> TERMINAL_STATES =
      Set.of(
          PipelineStatusType.STOPPED,
          PipelineStatusType.FAILED,
          PipelineStatusType.PARTIAL_SUCCESS,
          PipelineStatusType.SUCCESS);

  /** Not instantiable: static helpers only. */
  private PipelineStatusUtils() {}

  /**
   * Tells whether the given state is one of SUCCESS, FAILED, STOPPED or PARTIAL_SUCCESS.
   *
   * @param state state to classify; may be null
   * @return {@code true} for a terminal state, {@code false} otherwise, including for null
   */
  public static boolean isTerminalState(PipelineStatusType state) {
    // Checked up front because an immutable Set.of(...) rejects a null lookup.
    if (state == null) {
      return false;
    }
    return TERMINAL_STATES.contains(state);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,23 @@ default PipelineServiceClientResponse runPipeline(
/* Get the logs of the last run of a deployed pipeline.
 * NOTE(review): the meaning of `after` is not visible here, presumably a cursor into the
 * log output; confirm against the implementations. */
Map<String, String> getLastIngestionLogs(IngestionPipeline ingestionPipeline, String after);

/* Get logs for a specific pipeline run identified by runId.
 * This default implementation ignores runId entirely and always returns the result of
 * getLastIngestionLogs (latest run), whether or not a runId is supplied. Clients that can
 * serve per-run logs must override it. */
default Map<String, String> getIngestionLogs(
IngestionPipeline ingestionPipeline, String after, String runId) {
return getLastIngestionLogs(ingestionPipeline, after);
}
Comment thread
Vishnuujain marked this conversation as resolved.

/* Kill the ingestion of a deployed pipeline (not scoped to a single run; see
 * killIngestionRun for the per-run variant). */
PipelineServiceClientResponse killIngestion(IngestionPipeline ingestionPipeline);

/* Stop a specific run of a deployed pipeline identified by its run ID.
 * Default is a no-op: clients that do not support per-run stopping take no action. The code
 * stays 200 so existing callers keep working, but the response carries a reason stating that
 * nothing was killed, so the outcome is no longer indistinguishable from a real stop.
 * The DB status is already marked STOPPED before this is called. */
default PipelineServiceClientResponse killIngestionRun(
    IngestionPipeline ingestionPipeline, String runId) {
  String platform = getPlatform();
  return new PipelineServiceClientResponse()
      .withCode(200)
      .withReason(
          String.format(
              "Per-run stop is not supported by platform [%s]; run [%s] was not killed by the pipeline service.",
              platform, runId))
      .withPlatform(platform);
}
Comment on lines +143 to +146
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Bug: killIngestionRun default no-op marks DB STOPPED but never kills

The killIngestionRun default method in PipelineServiceClientInterface returns a 200 success response without actually stopping anything. None of the concrete pipeline service clients (AirflowRESTClient, K8sPipelineClient, etc.) override this method. Combined with the fact that stopSpecificRun marks the DB status as STOPPED before calling killIngestionRun, this means:

  1. The pipeline status will show STOPPED in the database
  2. The actual workflow continues running in the pipeline service
  3. The API returns 200 success to the user

This creates a silent inconsistency where the UI shows the run as stopped but it's still executing. The same issue applies to getIngestionLogs which silently ignores the runId parameter and returns latest logs instead of per-run logs.

At minimum, killIngestionRun should throw UnsupportedOperationException or return a non-200 response so callers know the feature isn't supported, rather than silently pretending it succeeded. Alternatively, the concrete clients should implement these methods.

Suggested fix:

// Option A: Make it clear the operation is unsupported
default PipelineServiceClientResponse killIngestionRun(
    IngestionPipeline ingestionPipeline, String runId) {
  throw new UnsupportedOperationException(
      "Per-run stop is not supported by " + getPlatform());
}

// Option B: Return a distinguishable response
default PipelineServiceClientResponse killIngestionRun(
    IngestionPipeline ingestionPipeline, String runId) {
  return new PipelineServiceClientResponse()
      .withCode(501).withPlatform(getPlatform())
      .withReason("Per-run stop not supported");
}

Was this helpful? React with 👍 / 👎 | Reply gitar fix to apply this suggestion


String getPlatform();
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
"description": "Pipeline status denotes if its failed or succeeded.",
"type": "string",
"javaType": "org.openmetadata.schema.entity.services.ingestionPipelines.PipelineStatusType",
"enum": ["queued","success","failed","running","partialSuccess"]
"enum": ["queued","success","failed","running","partialSuccess","stopped"]
Copy link

Copilot AI Apr 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding "stopped" to PipelineStatusType requires updating backend code paths that treat any non-(FAILED/SUCCESS/PARTIAL_SUCCESS) state as "still running". For example, RunAppImpl.waitForCompletion (and similar ingestion workflow wait loops) currently only terminates on FAILED/SUCCESS/PARTIAL_SUCCESS; a STOPPED run would keep polling until timeout. Please audit/update those completion checks to treat STOPPED as a terminal state (likely a failure) to avoid hung workflows after a stop/cancel.

Suggested change
"enum": ["queued","success","failed","running","partialSuccess","stopped"]
"enum": ["queued","success","failed","running","partialSuccess"]

Copilot uses AI. Check for mistakes.
},
"startDate": {
"description": "startDate of the pipeline run for this particular execution.",
Expand Down
Loading
Loading