From df32d44bdabf8889c09e0cea380f8da0ba249bba Mon Sep 17 00:00:00 2001 From: Vishnu Jain Date: Mon, 6 Apr 2026 19:42:02 +0530 Subject: [PATCH 01/13] Add stopped state support for pipeline status and stop/cancel AI agents --- .../DataInsightSystemChartRepository.java | 1 + .../service/resources/apps/AppResource.java | 35 +++++++++++++++++++ .../ingestionPipelines/ingestionPipeline.json | 2 +- .../AppRunsHistory.component.tsx | 4 ++- 4 files changed, 40 insertions(+), 2 deletions(-) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DataInsightSystemChartRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DataInsightSystemChartRepository.java index f63fc06bdd41..4d71d5324ea3 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DataInsightSystemChartRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DataInsightSystemChartRepository.java @@ -549,6 +549,7 @@ private AppRunRecord convertPipelineStatusToAppRun(App app, PipelineStatus pipel case SUCCESS -> AppRunRecord.Status.SUCCESS; case FAILED, PARTIAL_SUCCESS -> AppRunRecord.Status.FAILED; case RUNNING -> AppRunRecord.Status.RUNNING; + case STOPPED -> AppRunRecord.Status.STOPPED; }) .withConfig(pipelineStatus.getConfig()); } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/resources/apps/AppResource.java b/openmetadata-service/src/main/java/org/openmetadata/service/resources/apps/AppResource.java index 5ab4f8ac5e5e..c46ef5e091b0 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/resources/apps/AppResource.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/resources/apps/AppResource.java @@ -59,6 +59,7 @@ import org.openmetadata.schema.entity.services.ingestionPipelines.IngestionPipeline; import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineServiceClientResponse; import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineStatus; +import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineStatusType; import org.openmetadata.schema.services.connections.metadata.OpenMetadataConnection; import org.openmetadata.schema.type.EntityHistory; import org.openmetadata.schema.type.EntityReference; @@ -374,6 +375,7 @@ protected static AppRunRecord convertPipelineStatus(App app, PipelineStatus pipe case SUCCESS -> AppRunRecord.Status.SUCCESS; case FAILED, PARTIAL_SUCCESS -> AppRunRecord.Status.FAILED; case RUNNING -> AppRunRecord.Status.RUNNING; + case STOPPED -> AppRunRecord.Status.STOPPED; }) .withConfig(pipelineStatus.getConfig()); } @@ -1243,6 +1245,7 @@ public Response stopApplicationRun( } else { if (!app.getPipelines().isEmpty()) { IngestionPipeline ingestionPipeline = getIngestionPipeline(uriInfo, securityContext, app); + markLatestPipelineStatusAsStopped(uriInfo, ingestionPipeline); PipelineServiceClientResponse response = pipelineServiceClient.killIngestion(ingestionPipeline); return Response.status(response.getCode()).entity(response).build(); @@ -1252,6 +1255,38 @@ public Response stopApplicationRun( throw new BadRequestException("Application does not support Interrupts."); } + /** + * Mark the latest non-terminal pipeline status as "stopped" so the exit handler + * (which always receives "Failed" from Argo) will see a terminal state and skip its update. + */ + private void markLatestPipelineStatusAsStopped( + UriInfo uriInfo, IngestionPipeline ingestionPipeline) { + try { + IngestionPipelineRepository ingestionPipelineRepository = + (IngestionPipelineRepository) Entity.getEntityRepository(Entity.INGESTION_PIPELINE); + PipelineStatus latestStatus = + ingestionPipelineRepository.getLatestPipelineStatus(ingestionPipeline); + if (latestStatus != null && !isTerminalState(latestStatus.getPipelineState())) { + latestStatus.setPipelineState(PipelineStatusType.STOPPED); + latestStatus.setEndDate(System.currentTimeMillis()); + ingestionPipelineRepository.addPipelineStatus( + uriInfo, ingestionPipeline.getFullyQualifiedName(), latestStatus); + } + } catch (Exception e) { + LOG.warn("Failed to mark pipeline status as stopped, continuing with kill: {}", e.getMessage()); + } + } + + private static boolean isTerminalState(PipelineStatusType state) { + if (state == null) { + return false; + } + return state == PipelineStatusType.SUCCESS + || state == PipelineStatusType.FAILED + || state == PipelineStatusType.STOPPED + || state == PipelineStatusType.PARTIAL_SUCCESS; + } + @POST @Path("/deploy/{name}") @Operation( diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/services/ingestionPipelines/ingestionPipeline.json b/openmetadata-spec/src/main/resources/json/schema/entity/services/ingestionPipelines/ingestionPipeline.json index 835bebffa633..6008e2856ca2 100644 --- a/openmetadata-spec/src/main/resources/json/schema/entity/services/ingestionPipelines/ingestionPipeline.json +++ b/openmetadata-spec/src/main/resources/json/schema/entity/services/ingestionPipelines/ingestionPipeline.json @@ -26,7 +26,7 @@ "description": "Pipeline status denotes if its failed or succeeded.", "type": "string", "javaType": "org.openmetadata.schema.entity.services.ingestionPipelines.PipelineStatusType", - "enum": ["queued","success","failed","running","partialSuccess"] + "enum": ["queued","success","failed","running","partialSuccess","stopped"] }, "startDate": { "description": "startDate of the pipeline run for this particular execution.", diff --git a/openmetadata-ui/src/main/resources/ui/src/components/Settings/Applications/AppRunsHistory/AppRunsHistory.component.tsx b/openmetadata-ui/src/main/resources/ui/src/components/Settings/Applications/AppRunsHistory/AppRunsHistory.component.tsx index 0523e5b3dad2..a3ecdc65b857 100644 --- a/openmetadata-ui/src/main/resources/ui/src/components/Settings/Applications/AppRunsHistory/AppRunsHistory.component.tsx +++ b/openmetadata-ui/src/main/resources/ui/src/components/Settings/Applications/AppRunsHistory/AppRunsHistory.component.tsx @@ -198,8 +198,10 @@ const AppRunsHistory = forwardRef( onClick={() => showAppRunConfig(record)}> {t('label.config')} - {/* For status running or activewitherror and supportsInterrupt is true, show stop button */} + {/* For status running, pending, started or activewitherror and supportsInterrupt is true, show stop button */} {(record.status === Status.Running || + record.status === Status.Pending || + record.status === Status.Started || record.status === Status.ActiveError) && Boolean(appData?.supportsInterrupt) && ( - {/* For status running, pending, started or activewitherror and supportsInterrupt is true, show stop button */} - {(record.status === Status.Running || - record.status === Status.Pending || - record.status === Status.Started || - record.status === Status.ActiveError) && + {record.status !== Status.Success && + record.status !== Status.Failed && + record.status !== Status.Stopped && + record.status !== Status.Completed && Boolean(appData?.supportsInterrupt) && ( )} @@ -454,8 +462,10 @@ const AppRunsHistory = forwardRef( appName={fqn} displayName={appData?.displayName ?? ''} isModalOpen={isStopModalOpen} + runId={selectedRunId} onClose={() => { setIsStopModalOpen(false); + setSelectedRunId(undefined); }} onStopWorkflowsUpdate={() => { fetchAppHistory(); diff --git a/openmetadata-ui/src/main/resources/ui/src/rest/applicationAPI.ts b/openmetadata-ui/src/main/resources/ui/src/rest/applicationAPI.ts index 5b373d77e8a9..407d4a4e5a24 100644 --- a/openmetadata-ui/src/main/resources/ui/src/rest/applicationAPI.ts +++ b/openmetadata-ui/src/main/resources/ui/src/rest/applicationAPI.ts @@ -155,8 +155,10 @@ export const restoreApp = async (id: string) => { return response.data; }; -export const stopApp = async (name: string) => { - return await APIClient.post(`${BASE_URL}/stop/${getEncodedFqn(name)}`); +export const stopApp = async (name: string, runId?: string) => { + return await APIClient.post(`${BASE_URL}/stop/${getEncodedFqn(name)}`, null, { + params: runId ? { runId } : undefined, + }); }; export const getApplicationLogs = (appName: string, after?: string) => { From c9914e2c27e22fb40a11887a9a7039b19674ea5c Mon Sep 17 00:00:00 2001 From: Vishnu Jain Date: Mon, 6 Apr 2026 20:49:34 +0530 Subject: [PATCH 04/13] Fix HTTP 415 on stop by appending runId as URL param instead of POST body --- .../src/main/resources/ui/src/rest/applicationAPI.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/openmetadata-ui/src/main/resources/ui/src/rest/applicationAPI.ts b/openmetadata-ui/src/main/resources/ui/src/rest/applicationAPI.ts index 407d4a4e5a24..94417c177060 100644 --- a/openmetadata-ui/src/main/resources/ui/src/rest/applicationAPI.ts +++ b/openmetadata-ui/src/main/resources/ui/src/rest/applicationAPI.ts @@ -156,9 +156,9 @@ export const restoreApp = async (id: string) => { }; export const stopApp = async (name: string, runId?: string) => { - return await APIClient.post(`${BASE_URL}/stop/${getEncodedFqn(name)}`, null, { - params: runId ? { runId } : undefined, - }); + const params = runId ? `?runId=${encodeURIComponent(runId)}` : ''; + + return await APIClient.post(`${BASE_URL}/stop/${getEncodedFqn(name)}${params}`); }; export const getApplicationLogs = (appName: string, after?: string) => { From 454f45529fd3d544b809796a9004e558080a9b2f Mon Sep 17 00:00:00 2001 From: Vishnu Jain Date: Mon, 6 Apr 2026 21:33:22 +0530 Subject: [PATCH 05/13] Address gitar review: validate UUID format, document limit, fix HTTP 415 --- .../service/resources/apps/AppResource.java | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/resources/apps/AppResource.java b/openmetadata-service/src/main/java/org/openmetadata/service/resources/apps/AppResource.java index 03c3aa44cccb..ce269e898b00 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/resources/apps/AppResource.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/resources/apps/AppResource.java @@ -1273,12 +1273,19 @@ public Response stopApplicationRun( private void markPipelineStatusAsStopped( UriInfo uriInfo, IngestionPipeline ingestionPipeline, String runId) { + UUID pipelineRunId; + try { + pipelineRunId = UUID.fromString(runId); + } catch (IllegalArgumentException e) { + LOG.warn("runId '{}' is not a valid UUID, skipping DB status update", runId); + return; + } try { IngestionPipelineRepository ingestionPipelineRepository = (IngestionPipelineRepository) Entity.getEntityRepository(Entity.INGESTION_PIPELINE); PipelineStatus status = ingestionPipelineRepository.getPipelineStatus( - ingestionPipeline.getFullyQualifiedName(), UUID.fromString(runId)); + ingestionPipeline.getFullyQualifiedName(), pipelineRunId); if (status == null) { LOG.warn("Pipeline status not found for run {}, skipping DB update", runId); return; @@ -1302,6 +1309,8 @@ private void markLatestPipelineStatusAsStopped( try { IngestionPipelineRepository ingestionPipelineRepository = (IngestionPipelineRepository) Entity.getEntityRepository(Entity.INGESTION_PIPELINE); + // Limit to 20: covers any burst of concurrent runs while avoiding unbounded DB scans. + // This path is only hit when no specific runId is provided (fallback kill-all). ResultList recentStatuses = ingestionPipelineRepository.listPipelineStatus( ingestionPipeline.getFullyQualifiedName(), null, null, 20); From a2b2aecd17833c6191e56f7b31282d711a0cf7d9 Mon Sep 17 00:00:00 2001 From: Vishnu Jain Date: Tue, 7 Apr 2026 00:49:24 +0530 Subject: [PATCH 06/13] Fix killIngestionRun delegation and code review issues --- .../pipeline/MeteredPipelineServiceClient.java | 7 +++++++ .../service/jdbi3/IngestionPipelineRepository.java | 6 +++++- .../service/resources/apps/AppResource.java | 13 +++---------- .../main/resources/ui/src/rest/applicationAPI.ts | 6 +++--- 4 files changed, 18 insertions(+), 14 deletions(-) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/clients/pipeline/MeteredPipelineServiceClient.java b/openmetadata-service/src/main/java/org/openmetadata/service/clients/pipeline/MeteredPipelineServiceClient.java index efc9e3ebd09f..46e570b5b415 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/clients/pipeline/MeteredPipelineServiceClient.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/clients/pipeline/MeteredPipelineServiceClient.java @@ -141,6 +141,13 @@ public PipelineServiceClientResponse killIngestion(IngestionPipeline ingestionPi KILL, () -> this.decoratedClient.killIngestion(ingestionPipeline)); } + @Override + public PipelineServiceClientResponse killIngestionRun( + IngestionPipeline ingestionPipeline, String runId) { + return this.respondWithMetering( + KILL, () -> this.decoratedClient.killIngestionRun(ingestionPipeline, runId)); + } + @Override public String getPlatform() { return this.decoratedClient.getPlatform(); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/IngestionPipelineRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/IngestionPipelineRepository.java index a6ad876c7eab..a1df85356ace 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/IngestionPipelineRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/IngestionPipelineRepository.java @@ -702,13 +702,17 @@ public PipelineStatus getLatestPipelineStatus(IngestionPipeline ingestionPipelin } public PipelineStatus getPipelineStatus(String ingestionPipelineFQN, UUID pipelineStatusRunId) { + return getPipelineStatus(ingestionPipelineFQN, pipelineStatusRunId.toString()); + } + + public PipelineStatus getPipelineStatus(String ingestionPipelineFQN, String runId) { IngestionPipeline ingestionPipeline = findByName(ingestionPipelineFQN, Include.NON_DELETED); return JsonUtils.readValue( daoCollection .entityExtensionTimeSeriesDao() .getExtensionByKey( RUN_ID_EXTENSION_KEY, - pipelineStatusRunId.toString(), + runId, ingestionPipeline.getFullyQualifiedName(), PIPELINE_STATUS_EXTENSION), PipelineStatus.class); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/resources/apps/AppResource.java b/openmetadata-service/src/main/java/org/openmetadata/service/resources/apps/AppResource.java index ce269e898b00..7964fa1e1165 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/resources/apps/AppResource.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/resources/apps/AppResource.java @@ -1273,19 +1273,12 @@ public Response stopApplicationRun( private void markPipelineStatusAsStopped( UriInfo uriInfo, IngestionPipeline ingestionPipeline, String runId) { - UUID pipelineRunId; - try { - pipelineRunId = UUID.fromString(runId); - } catch (IllegalArgumentException e) { - LOG.warn("runId '{}' is not a valid UUID, skipping DB status update", runId); - return; - } try { IngestionPipelineRepository ingestionPipelineRepository = (IngestionPipelineRepository) Entity.getEntityRepository(Entity.INGESTION_PIPELINE); PipelineStatus status = ingestionPipelineRepository.getPipelineStatus( - ingestionPipeline.getFullyQualifiedName(), pipelineRunId); + ingestionPipeline.getFullyQualifiedName(), runId); if (status == null) { LOG.warn("Pipeline status not found for run {}, skipping DB update", runId); return; @@ -1296,7 +1289,7 @@ private void markPipelineStatusAsStopped( ingestionPipelineRepository.addPipelineStatus( uriInfo, ingestionPipeline.getFullyQualifiedName(), status); } - } catch (Exception e) { + } catch (RuntimeException e) { LOG.warn( "Failed to mark pipeline run {} as stopped, continuing with kill: {}", runId, @@ -1323,7 +1316,7 @@ private void markLatestPipelineStatusAsStopped( uriInfo, ingestionPipeline.getFullyQualifiedName(), status); } } - } catch (Exception e) { + } catch (RuntimeException e) { LOG.warn("Failed to mark pipeline runs as stopped, continuing with kill: {}", e.getMessage()); } } diff --git a/openmetadata-ui/src/main/resources/ui/src/rest/applicationAPI.ts b/openmetadata-ui/src/main/resources/ui/src/rest/applicationAPI.ts index 94417c177060..ab5e86e24d34 100644 --- a/openmetadata-ui/src/main/resources/ui/src/rest/applicationAPI.ts +++ b/openmetadata-ui/src/main/resources/ui/src/rest/applicationAPI.ts @@ -156,9 +156,9 @@ export const restoreApp = async (id: string) => { }; export const stopApp = async (name: string, runId?: string) => { - const params = runId ? `?runId=${encodeURIComponent(runId)}` : ''; - - return await APIClient.post(`${BASE_URL}/stop/${getEncodedFqn(name)}${params}`); + return await APIClient.post(`${BASE_URL}/stop/${getEncodedFqn(name)}`, undefined, { + params: runId ? { runId } : undefined, + }); }; export const getApplicationLogs = (appName: string, after?: string) => { From dd4bcf78f9d57641481087f6385de26de7b37d5a Mon Sep 17 00:00:00 2001 From: Vishnu Jain Date: Tue, 7 Apr 2026 01:05:15 +0530 Subject: [PATCH 07/13] fix lint-src --- .../src/main/resources/ui/src/rest/applicationAPI.ts | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/openmetadata-ui/src/main/resources/ui/src/rest/applicationAPI.ts b/openmetadata-ui/src/main/resources/ui/src/rest/applicationAPI.ts index ab5e86e24d34..10010ff693a8 100644 --- a/openmetadata-ui/src/main/resources/ui/src/rest/applicationAPI.ts +++ b/openmetadata-ui/src/main/resources/ui/src/rest/applicationAPI.ts @@ -156,9 +156,13 @@ export const restoreApp = async (id: string) => { }; export const stopApp = async (name: string, runId?: string) => { - return await APIClient.post(`${BASE_URL}/stop/${getEncodedFqn(name)}`, undefined, { - params: runId ? { runId } : undefined, - }); + return await APIClient.post( + `${BASE_URL}/stop/${getEncodedFqn(name)}`, + undefined, + { + params: runId ? { runId } : undefined, + } + ); }; export const getApplicationLogs = (appName: string, after?: string) => { From c877d69516163c063f32f656ad93f14e55965055 Mon Sep 17 00:00:00 2001 From: Vishnu Jain Date: Tue, 7 Apr 2026 15:28:44 +0530 Subject: [PATCH 08/13] Improvements around stop/cancel run error handling and robustness --- .../DataInsightSystemChartRepository.java | 6 +- .../service/resources/apps/AppResource.java | 154 +++++++++++++----- .../service/util/PipelineStatusUtils.java | 33 ++++ .../sdk/PipelineServiceClientInterface.java | 6 +- 4 files changed, 156 insertions(+), 43 deletions(-) create mode 100644 openmetadata-service/src/main/java/org/openmetadata/service/util/PipelineStatusUtils.java diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DataInsightSystemChartRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DataInsightSystemChartRepository.java index 4d71d5324ea3..ee99e1dddee0 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DataInsightSystemChartRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DataInsightSystemChartRepository.java @@ -551,7 +551,11 @@ private AppRunRecord convertPipelineStatusToAppRun(App app, PipelineStatus pipel case RUNNING -> AppRunRecord.Status.RUNNING; case STOPPED -> AppRunRecord.Status.STOPPED; }) - .withConfig(pipelineStatus.getConfig()); + .withConfig(pipelineStatus.getConfig()) + .withProperties( + pipelineStatus.getRunId() != null + ? Map.of("pipelineRunId", pipelineStatus.getRunId()) + : null); } /** diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/resources/apps/AppResource.java b/openmetadata-service/src/main/java/org/openmetadata/service/resources/apps/AppResource.java index 7964fa1e1165..51425569db27 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/resources/apps/AppResource.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/resources/apps/AppResource.java @@ -47,6 +47,7 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; import org.openmetadata.schema.ServiceEntityInterface; import org.openmetadata.schema.api.data.RestoreEntity; @@ -95,6 +96,7 @@ import org.openmetadata.service.util.DeleteEntityResponse; import org.openmetadata.service.util.EntityUtil; import org.openmetadata.service.util.OpenMetadataConnectionBuilder; +import org.openmetadata.service.util.PipelineStatusUtils; import org.openmetadata.service.util.RestUtil; import org.openmetadata.service.util.WebsocketNotificationHandler; import org.quartz.SchedulerException; @@ -1252,83 +1254,155 @@ public Response stopApplicationRun( .entity("Application stop in progress. Please check status via.") .build(); } else { - if (!app.getPipelines().isEmpty()) { - IngestionPipeline ingestionPipeline = getIngestionPipeline(uriInfo, securityContext, app); - if (runId != null && !runId.isBlank()) { - markPipelineStatusAsStopped(uriInfo, ingestionPipeline, runId); - PipelineServiceClientResponse response = - pipelineServiceClient.killIngestionRun(ingestionPipeline, runId); - return Response.status(response.getCode()).entity(response).build(); - } else { - markLatestPipelineStatusAsStopped(uriInfo, ingestionPipeline); - PipelineServiceClientResponse response = - pipelineServiceClient.killIngestion(ingestionPipeline); - return Response.status(response.getCode()).entity(response).build(); - } + if (nullOrEmpty(app.getPipelines())) { + throw new BadRequestException( + String.format( + "Application [%s] supports interrupts but has no associated pipeline configured.", + name)); + } + IngestionPipeline ingestionPipeline = getIngestionPipeline(uriInfo, securityContext, app); + if (runId != null && !runId.isBlank()) { + return stopSpecificRun(uriInfo, ingestionPipeline, runId); + } else { + return stopAllRuns(uriInfo, app, ingestionPipeline); } } } throw new BadRequestException("Application does not support Interrupts."); } + private Response stopSpecificRun( + UriInfo uriInfo, IngestionPipeline ingestionPipeline, String runId) { + markPipelineStatusAsStopped(uriInfo, ingestionPipeline, runId); + PipelineServiceClientResponse killResponse; + try { + killResponse = pipelineServiceClient.killIngestionRun(ingestionPipeline, runId); + } catch (Exception e) { + LOG.error( + "Kill request for run [{}] on pipeline [{}] failed after DB update. Workflow may still be running.", + runId, + ingestionPipeline.getFullyQualifiedName(), + e); + return Response.status(Response.Status.BAD_GATEWAY) + .entity( + new PipelineServiceClientResponse() + .withCode(Response.Status.BAD_GATEWAY.getStatusCode()) + .withReason(e.getMessage()) + .withPlatform(pipelineServiceClient.getPlatform())) + .build(); + } + return toStopResponse(killResponse); + } + + private Response stopAllRuns(UriInfo uriInfo, App app, IngestionPipeline ingestionPipeline) { + Long runStartTime = + repository + .getLatestAppRunsOptional(app, ingestionPipeline.getService().getId()) + .map(AppRunRecord::getStartTime) + .orElse(null); + markLatestPipelineStatusAsStopped(uriInfo, ingestionPipeline, runStartTime); + PipelineServiceClientResponse killResponse; + try { + killResponse = pipelineServiceClient.killIngestion(ingestionPipeline); + } catch (Exception e) { + LOG.error( + "Kill request for pipeline [{}] failed after DB update. Workflows may still be running.", + ingestionPipeline.getFullyQualifiedName(), + e); + return Response.status(Response.Status.BAD_GATEWAY) + .entity( + new PipelineServiceClientResponse() + .withCode(Response.Status.BAD_GATEWAY.getStatusCode()) + .withReason(e.getMessage()) + .withPlatform(pipelineServiceClient.getPlatform())) + .build(); + } + return toStopResponse(killResponse); + } + private void markPipelineStatusAsStopped( UriInfo uriInfo, IngestionPipeline ingestionPipeline, String runId) { + IngestionPipelineRepository ingestionPipelineRepository = + (IngestionPipelineRepository) Entity.getEntityRepository(Entity.INGESTION_PIPELINE); try { - IngestionPipelineRepository ingestionPipelineRepository = - (IngestionPipelineRepository) Entity.getEntityRepository(Entity.INGESTION_PIPELINE); PipelineStatus status = ingestionPipelineRepository.getPipelineStatus( ingestionPipeline.getFullyQualifiedName(), runId); if (status == null) { - LOG.warn("Pipeline status not found for run {}, skipping DB update", runId); + LOG.warn( + "Pipeline status not found in DB for run [{}] on pipeline [{}]. Proceeding with kill but DB state will remain inconsistent.", + runId, + ingestionPipeline.getFullyQualifiedName()); return; } - if (!isTerminalState(status.getPipelineState())) { + if (!PipelineStatusUtils.isTerminalState(status.getPipelineState())) { status.setPipelineState(PipelineStatusType.STOPPED); status.setEndDate(System.currentTimeMillis()); ingestionPipelineRepository.addPipelineStatus( uriInfo, ingestionPipeline.getFullyQualifiedName(), status); } - } catch (RuntimeException e) { - LOG.warn( - "Failed to mark pipeline run {} as stopped, continuing with kill: {}", + } catch (Exception e) { + LOG.error( + "Failed to mark run [{}] as STOPPED in DB for pipeline [{}]. Kill will proceed but DB status may remain inconsistent.", runId, - e.getMessage()); + ingestionPipeline.getFullyQualifiedName(), + e); } } private void markLatestPipelineStatusAsStopped( - UriInfo uriInfo, IngestionPipeline ingestionPipeline) { + UriInfo uriInfo, IngestionPipeline ingestionPipeline, Long runStartTime) { + IngestionPipelineRepository ingestionPipelineRepository = + (IngestionPipelineRepository) Entity.getEntityRepository(Entity.INGESTION_PIPELINE); + long now = System.currentTimeMillis(); + long startTs = runStartTime != null ? runStartTime : now - TimeUnit.HOURS.toMillis(1); + ResultList statuses; try { - IngestionPipelineRepository ingestionPipelineRepository = - (IngestionPipelineRepository) Entity.getEntityRepository(Entity.INGESTION_PIPELINE); - // Limit to 20: covers any burst of concurrent runs while avoiding unbounded DB scans. - // This path is only hit when no specific runId is provided (fallback kill-all). - ResultList recentStatuses = + statuses = ingestionPipelineRepository.listPipelineStatus( - ingestionPipeline.getFullyQualifiedName(), null, null, 20); - long now = System.currentTimeMillis(); - for (PipelineStatus status : recentStatuses.getData()) { - if (!isTerminalState(status.getPipelineState())) { + ingestionPipeline.getFullyQualifiedName(), startTs, now); + } catch (Exception e) { + LOG.error( + "Failed to list pipeline statuses for [{}]. Kill will proceed but DB statuses may remain inconsistent.", + ingestionPipeline.getFullyQualifiedName(), + e); + return; + } + for (PipelineStatus status : statuses.getData()) { + if (status.getRunId() == null || status.getRunId().isBlank()) { + continue; + } + if (!PipelineStatusUtils.isTerminalState(status.getPipelineState())) { + try { status.setPipelineState(PipelineStatusType.STOPPED); status.setEndDate(now); ingestionPipelineRepository.addPipelineStatus( uriInfo, ingestionPipeline.getFullyQualifiedName(), status); + } catch (Exception e) { + LOG.error( + "Failed to mark run [{}] as STOPPED for pipeline [{}]. Kill will proceed but this run's DB status remains inconsistent.", + status.getRunId(), + ingestionPipeline.getFullyQualifiedName(), + e); } } - } catch (RuntimeException e) { - LOG.warn("Failed to mark pipeline runs as stopped, continuing with kill: {}", e.getMessage()); } } - private static boolean isTerminalState(PipelineStatusType state) { - if (state == null) { - return false; + private Response toStopResponse(PipelineServiceClientResponse killResponse) { + int code = killResponse.getCode(); + if (code >= 200 && code < 300) { + return Response.status(code).entity(killResponse).build(); + } + if (code == 404) { + LOG.warn( + "Kill request returned 404 — workflow already completed. DB status already marked STOPPED."); + return Response.ok(killResponse).build(); } - return state == PipelineStatusType.SUCCESS - || state == PipelineStatusType.FAILED - || state == PipelineStatusType.STOPPED - || state == PipelineStatusType.PARTIAL_SUCCESS; + LOG.error( + "Kill request returned unexpected code [{}]. DB status already marked STOPPED but workflow may still be running.", + code); + return Response.status(Response.Status.BAD_GATEWAY).entity(killResponse).build(); } @POST diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/util/PipelineStatusUtils.java b/openmetadata-service/src/main/java/org/openmetadata/service/util/PipelineStatusUtils.java new file mode 100644 index 000000000000..eb41a2c5367a --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/util/PipelineStatusUtils.java @@ -0,0 +1,33 @@ +/* + * Copyright 2021 Collate + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.openmetadata.service.util; + +import java.util.Set; +import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineStatusType; + +public final class PipelineStatusUtils { + + private static final Set TERMINAL_STATES = + Set.of( + PipelineStatusType.SUCCESS, + PipelineStatusType.FAILED, + PipelineStatusType.STOPPED, + PipelineStatusType.PARTIAL_SUCCESS); + + private PipelineStatusUtils() {} + + public static boolean isTerminalState(PipelineStatusType state) { + return state != null && TERMINAL_STATES.contains(state); + } +} diff --git a/openmetadata-spec/src/main/java/org/openmetadata/sdk/PipelineServiceClientInterface.java b/openmetadata-spec/src/main/java/org/openmetadata/sdk/PipelineServiceClientInterface.java index 99547547d7e9..84766cffa8f0 100644 --- a/openmetadata-spec/src/main/java/org/openmetadata/sdk/PipelineServiceClientInterface.java +++ b/openmetadata-spec/src/main/java/org/openmetadata/sdk/PipelineServiceClientInterface.java @@ -130,10 +130,12 @@ default PipelineServiceClientResponse runPipeline( /* Get the all last run logs of a deployed pipeline */ PipelineServiceClientResponse killIngestion(IngestionPipeline ingestionPipeline); - /* Stop a specific run of a deployed pipeline identified by its run ID (Argo workflow UID) */ + /* Stop a specific run of a deployed pipeline identified by its run ID (Argo workflow UID). + * Default is a no-op: clients that do not support per-run stopping return success without + * taking any action. The DB status is already marked STOPPED before this is called. */ default PipelineServiceClientResponse killIngestionRun( IngestionPipeline ingestionPipeline, String runId) { - return killIngestion(ingestionPipeline); + return new PipelineServiceClientResponse().withCode(200).withPlatform(getPlatform()); } String getPlatform(); From d562cc491eaed95207f868b304d04022108e9b68 Mon Sep 17 00:00:00 2001 From: Vishnu Jain Date: Thu, 9 Apr 2026 01:45:57 +0530 Subject: [PATCH 09/13] Fix per-run log fetching --- .../MeteredPipelineServiceClient.java | 7 ++++ .../jdbi3/IngestionPipelineRepository.java | 35 +++++++++++++++++++ .../service/resources/apps/AppResource.java | 18 +++++++--- .../sdk/PipelineServiceClientInterface.java | 7 ++++ .../AppRunsHistory.component.tsx | 21 ++++++----- .../pages/LogsViewerPage/LogsViewerPage.tsx | 10 ++++-- .../resources/ui/src/rest/applicationAPI.ts | 17 +++++++-- .../ui/src/utils/IngestionLogs/LogsUtils.ts | 16 ++++++--- 8 files changed, 108 insertions(+), 23 deletions(-) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/clients/pipeline/MeteredPipelineServiceClient.java b/openmetadata-service/src/main/java/org/openmetadata/service/clients/pipeline/MeteredPipelineServiceClient.java index 46e570b5b415..3e83521e04ba 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/clients/pipeline/MeteredPipelineServiceClient.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/clients/pipeline/MeteredPipelineServiceClient.java @@ -136,6 +136,13 @@ public Map getLastIngestionLogs( GET_LOGS, () -> this.decoratedClient.getLastIngestionLogs(ingestionPipeline, after)); } + @Override + public Map getIngestionLogs( + IngestionPipeline ingestionPipeline, String after, String runId) { + return executeWithMetering( + GET_LOGS, () -> this.decoratedClient.getIngestionLogs(ingestionPipeline, after, runId)); + } + public PipelineServiceClientResponse killIngestion(IngestionPipeline ingestionPipeline) { return this.respondWithMetering( KILL, () -> this.decoratedClient.killIngestion(ingestionPipeline)); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/IngestionPipelineRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/IngestionPipelineRepository.java index a1df85356ace..98ae00ff8a3e 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/IngestionPipelineRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/IngestionPipelineRepository.java @@ -718,6 +718,41 @@ public PipelineStatus getPipelineStatus(String ingestionPipelineFQN, String runI PipelineStatus.class); } + /** + * Upsert only the time-series record for a specific run without overwriting the pipeline-level + * current status. Use this when stopping a specific run while other runs may still be active. + * Inserts a new record if none exists for the runId, otherwise updates the existing one. + */ + public void updatePipelineStatusByRunId(String fqn, PipelineStatus pipelineStatus) { + IngestionPipeline ingestionPipeline = findByName(fqn, Include.NON_DELETED); + String pipelineFqn = ingestionPipeline.getFullyQualifiedName(); + String json = JsonUtils.pojoToJson(pipelineStatus); + PipelineStatus storedPipelineStatus = + JsonUtils.readValue( + daoCollection + .entityExtensionTimeSeriesDao() + .getLatestExtensionByKey( + RUN_ID_EXTENSION_KEY, + pipelineStatus.getRunId(), + pipelineFqn, + PIPELINE_STATUS_EXTENSION), + PipelineStatus.class); + if (storedPipelineStatus != null) { + daoCollection + .entityExtensionTimeSeriesDao() + .updateExtensionByKey( + RUN_ID_EXTENSION_KEY, + pipelineStatus.getRunId(), + pipelineFqn, + PIPELINE_STATUS_EXTENSION, + json); + } else { + daoCollection + .entityExtensionTimeSeriesDao() + .insert(pipelineFqn, PIPELINE_STATUS_EXTENSION, PIPELINE_STATUS_JSON_SCHEMA, json); + } + } + @Transaction public IngestionPipeline deletePipelineStatusByRunId(UUID ingestionPipelineId, UUID runId) { IngestionPipeline ingestionPipeline = find(ingestionPipelineId, Include.NON_DELETED); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/resources/apps/AppResource.java b/openmetadata-service/src/main/java/org/openmetadata/service/resources/apps/AppResource.java index 51425569db27..9866fa31b82f 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/resources/apps/AppResource.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/resources/apps/AppResource.java @@ -535,7 +535,14 @@ public Response getLastLogs( schema = @Schema(type = "string")) @QueryParam("after") @DefaultValue("") - String after) { + String after, + @Parameter( + description = + "Pipeline run ID (Argo workflow UID) to fetch logs for a specific run. " + + "If not provided, returns logs for the latest run.", + schema = @Schema(type = "string")) + @QueryParam("runId") + String runId) { App installation = repository.getByName(uriInfo, name, repository.getFields("id,pipelines")); if (installation.getAppType().equals(AppType.Internal)) { AppRunRecord latestRun = repository.getLatestAppRunsOptional(installation).orElse(null); @@ -552,7 +559,7 @@ public Response getLastLogs( ingestionPipelineRepository.get( uriInfo, pipelineRef.getId(), ingestionPipelineRepository.getFields(FIELD_OWNERS)); return Response.ok( - pipelineServiceClient.getLastIngestionLogs(ingestionPipeline, after), + pipelineServiceClient.getIngestionLogs(ingestionPipeline, after, runId), MediaType.APPLICATION_JSON_TYPE) .build(); } @@ -1338,8 +1345,11 @@ private void markPipelineStatusAsStopped( if (!PipelineStatusUtils.isTerminalState(status.getPipelineState())) { status.setPipelineState(PipelineStatusType.STOPPED); status.setEndDate(System.currentTimeMillis()); - ingestionPipelineRepository.addPipelineStatus( - uriInfo, ingestionPipeline.getFullyQualifiedName(), status); + // Use updatePipelineStatusByRunId instead of addPipelineStatus to avoid overwriting + // the pipeline-level current status. When stopping a specific run, other runs may still + // be active and their status should not be affected. + ingestionPipelineRepository.updatePipelineStatusByRunId( + ingestionPipeline.getFullyQualifiedName(), status); } } catch (Exception e) { LOG.error( diff --git a/openmetadata-spec/src/main/java/org/openmetadata/sdk/PipelineServiceClientInterface.java b/openmetadata-spec/src/main/java/org/openmetadata/sdk/PipelineServiceClientInterface.java index 84766cffa8f0..e7acd5fdea36 100644 --- a/openmetadata-spec/src/main/java/org/openmetadata/sdk/PipelineServiceClientInterface.java +++ b/openmetadata-spec/src/main/java/org/openmetadata/sdk/PipelineServiceClientInterface.java @@ -127,6 +127,13 @@ default PipelineServiceClientResponse runPipeline( /* Get the all last run logs of a deployed pipeline */ Map getLastIngestionLogs(IngestionPipeline ingestionPipeline, String after); + /* Get logs for a specific pipeline run identified by runId (e.g. Argo workflow UID). + * When runId is null or blank, falls back to getLastIngestionLogs (latest run). */ + default Map getIngestionLogs( + IngestionPipeline ingestionPipeline, String after, String runId) { + return getLastIngestionLogs(ingestionPipeline, after); + } + /* Get the all last run logs of a deployed pipeline */ PipelineServiceClientResponse killIngestion(IngestionPipeline ingestionPipeline); diff --git a/openmetadata-ui/src/main/resources/ui/src/components/Settings/Applications/AppRunsHistory/AppRunsHistory.component.tsx b/openmetadata-ui/src/main/resources/ui/src/components/Settings/Applications/AppRunsHistory/AppRunsHistory.component.tsx index ed82766e4b5c..fcba20505c6d 100644 --- a/openmetadata-ui/src/main/resources/ui/src/components/Settings/Applications/AppRunsHistory/AppRunsHistory.component.tsx +++ b/openmetadata-ui/src/main/resources/ui/src/components/Settings/Applications/AppRunsHistory/AppRunsHistory.component.tsx @@ -138,16 +138,21 @@ const AppRunsHistory = forwardRef( }, [appData, appRunsHistoryData, isExternalApp]); const handleRowExpandable = useCallback( - (key?: string) => { + (key?: string, record?: AppRunRecordWithId) => { if (key) { if (isExternalApp && appData) { - return navigate( - getLogsViewerPath( - GlobalSettingOptions.APPLICATIONS, - appData.name ?? '', - appData.name ?? '' - ) + const basePath = getLogsViewerPath( + GlobalSettingOptions.APPLICATIONS, + appData.name ?? '', + appData.name ?? '' ); + const rawRunId = record?.properties?.pipelineRunId; + const runId = typeof rawRunId === 'string' ? rawRunId : undefined; + const path = runId + ? `${basePath}?runId=${encodeURIComponent(runId)}` + : basePath; + + return navigate(path); } if (expandedRowKeys.includes(key)) { setExpandedRowKeys((prev) => prev.filter((item) => item !== key)); @@ -190,7 +195,7 @@ const AppRunsHistory = forwardRef( disabled={showLogAction(record)} size="small" type="link" - onClick={() => handleRowExpandable(record.id)}> + onClick={() => handleRowExpandable(record.id, record)}> {t('label.log-plural')}