-
Notifications
You must be signed in to change notification settings - Fork 1.9k
Add stopped state support for pipeline status and stop/cancel AI agents #27098
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 15 commits
df32d44
1e4b73c
227b8fc
c9914e2
454f455
a2b2aec
a8029ea
dd4bcf7
c877d69
d562cc4
2683981
d484be8
da79897
0e16b6b
ae1715b
9c1cd2a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -47,6 +47,7 @@ | |
| import java.util.Map; | ||
| import java.util.UUID; | ||
| import java.util.concurrent.ExecutorService; | ||
| import java.util.concurrent.TimeUnit; | ||
| import lombok.extern.slf4j.Slf4j; | ||
| import org.openmetadata.schema.ServiceEntityInterface; | ||
| import org.openmetadata.schema.api.data.RestoreEntity; | ||
|
|
@@ -59,6 +60,7 @@ | |
| import org.openmetadata.schema.entity.services.ingestionPipelines.IngestionPipeline; | ||
| import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineServiceClientResponse; | ||
| import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineStatus; | ||
| import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineStatusType; | ||
| import org.openmetadata.schema.services.connections.metadata.OpenMetadataConnection; | ||
| import org.openmetadata.schema.type.EntityHistory; | ||
| import org.openmetadata.schema.type.EntityReference; | ||
|
|
@@ -94,6 +96,7 @@ | |
| import org.openmetadata.service.util.DeleteEntityResponse; | ||
| import org.openmetadata.service.util.EntityUtil; | ||
| import org.openmetadata.service.util.OpenMetadataConnectionBuilder; | ||
| import org.openmetadata.service.util.PipelineStatusUtils; | ||
| import org.openmetadata.service.util.RestUtil; | ||
| import org.openmetadata.service.util.WebsocketNotificationHandler; | ||
| import org.quartz.SchedulerException; | ||
|
|
@@ -374,8 +377,13 @@ protected static AppRunRecord convertPipelineStatus(App app, PipelineStatus pipe | |
| case SUCCESS -> AppRunRecord.Status.SUCCESS; | ||
| case FAILED, PARTIAL_SUCCESS -> AppRunRecord.Status.FAILED; | ||
| case RUNNING -> AppRunRecord.Status.RUNNING; | ||
| case STOPPED -> AppRunRecord.Status.STOPPED; | ||
| }) | ||
| .withConfig(pipelineStatus.getConfig()); | ||
| .withConfig(pipelineStatus.getConfig()) | ||
| .withProperties( | ||
| pipelineStatus.getRunId() != null | ||
| ? Map.of("pipelineRunId", pipelineStatus.getRunId()) | ||
| : null); | ||
| } | ||
|
|
||
| private ResultList<AppRunRecord> sortRunsByStartTime(ResultList<AppRunRecord> runs) { | ||
|
|
@@ -527,7 +535,14 @@ public Response getLastLogs( | |
| schema = @Schema(type = "string")) | ||
| @QueryParam("after") | ||
| @DefaultValue("") | ||
| String after) { | ||
| String after, | ||
| @Parameter( | ||
| description = | ||
| "Pipeline run ID to fetch logs for a specific run. " | ||
| + "If not provided, returns logs for the latest run.", | ||
| schema = @Schema(type = "string")) | ||
| @QueryParam("runId") | ||
| String runId) { | ||
| App installation = repository.getByName(uriInfo, name, repository.getFields("id,pipelines")); | ||
| if (installation.getAppType().equals(AppType.Internal)) { | ||
| AppRunRecord latestRun = repository.getLatestAppRunsOptional(installation).orElse(null); | ||
|
|
@@ -544,7 +559,7 @@ public Response getLastLogs( | |
| ingestionPipelineRepository.get( | ||
| uriInfo, pipelineRef.getId(), ingestionPipelineRepository.getFields(FIELD_OWNERS)); | ||
| return Response.ok( | ||
| pipelineServiceClient.getLastIngestionLogs(ingestionPipeline, after), | ||
| pipelineServiceClient.getIngestionLogs(ingestionPipeline, after, runId), | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is this backward compatible? would it pick the last logs if runId is null?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. yes, it's backward compatible |
||
| MediaType.APPLICATION_JSON_TYPE) | ||
| .build(); | ||
| } | ||
|
|
@@ -1227,7 +1242,12 @@ public Response stopApplicationRun( | |
| @Context SecurityContext securityContext, | ||
| @Parameter(description = "Name of the App", schema = @Schema(type = "string")) | ||
| @PathParam("name") | ||
| String name) { | ||
| String name, | ||
| @Parameter( | ||
| description = "Pipeline run ID to stop a specific run", | ||
| schema = @Schema(type = "string")) | ||
| @QueryParam("runId") | ||
| String runId) { | ||
| EntityUtil.Fields fields = getFields(String.format("%s,bot,pipelines", FIELD_OWNERS)); | ||
| App app = repository.getByName(uriInfo, name, fields); | ||
| OperationContext operationContext = new OperationContext(entityType, MetadataOperation.TRIGGER); | ||
|
|
@@ -1241,17 +1261,148 @@ public Response stopApplicationRun( | |
| .entity("Application stop in progress. Please check status via.") | ||
| .build(); | ||
| } else { | ||
| if (!app.getPipelines().isEmpty()) { | ||
| IngestionPipeline ingestionPipeline = getIngestionPipeline(uriInfo, securityContext, app); | ||
| PipelineServiceClientResponse response = | ||
| pipelineServiceClient.killIngestion(ingestionPipeline); | ||
| return Response.status(response.getCode()).entity(response).build(); | ||
| if (nullOrEmpty(app.getPipelines())) { | ||
| throw new BadRequestException( | ||
| String.format( | ||
| "Application [%s] supports interrupts but has no associated pipeline configured.", | ||
| name)); | ||
| } | ||
| IngestionPipeline ingestionPipeline = getIngestionPipeline(uriInfo, securityContext, app); | ||
| if (runId != null && !runId.isBlank()) { | ||
| return stopSpecificRun(uriInfo, ingestionPipeline, runId); | ||
| } else { | ||
| return stopAllRuns(app, ingestionPipeline); | ||
| } | ||
| } | ||
| } | ||
| throw new BadRequestException("Application does not support Interrupts."); | ||
| } | ||
|
|
||
| private Response stopSpecificRun( | ||
| UriInfo uriInfo, IngestionPipeline ingestionPipeline, String runId) { | ||
| markPipelineStatusAsStopped(ingestionPipeline, runId); | ||
| PipelineServiceClientResponse killResponse; | ||
| try { | ||
| killResponse = pipelineServiceClient.killIngestionRun(ingestionPipeline, runId); | ||
| } catch (Exception e) { | ||
| LOG.error( | ||
| "Kill request for run [{}] on pipeline [{}] failed after DB update. Workflow may still be running.", | ||
| runId, | ||
| ingestionPipeline.getFullyQualifiedName(), | ||
| e); | ||
| return Response.status(Response.Status.BAD_GATEWAY) | ||
| .entity( | ||
| new PipelineServiceClientResponse() | ||
| .withCode(Response.Status.BAD_GATEWAY.getStatusCode()) | ||
| .withReason(e.getMessage()) | ||
| .withPlatform(pipelineServiceClient.getPlatform())) | ||
| .build(); | ||
|
gitar-bot[bot] marked this conversation as resolved.
|
||
| } | ||
| return toStopResponse(killResponse); | ||
| } | ||
|
|
||
| private Response stopAllRuns(App app, IngestionPipeline ingestionPipeline) { | ||
| Long runStartTime = | ||
| repository | ||
| .getLatestAppRunsOptional(app, ingestionPipeline.getService().getId()) | ||
| .map(AppRunRecord::getStartTime) | ||
| .orElse(null); | ||
| markLatestPipelineStatusAsStopped(ingestionPipeline, runStartTime); | ||
| PipelineServiceClientResponse killResponse; | ||
| try { | ||
| killResponse = pipelineServiceClient.killIngestion(ingestionPipeline); | ||
| } catch (Exception e) { | ||
| LOG.error( | ||
| "Kill request for pipeline [{}] failed after DB update. Workflows may still be running.", | ||
| ingestionPipeline.getFullyQualifiedName(), | ||
| e); | ||
| return Response.status(Response.Status.BAD_GATEWAY) | ||
| .entity( | ||
| new PipelineServiceClientResponse() | ||
| .withCode(Response.Status.BAD_GATEWAY.getStatusCode()) | ||
| .withReason(e.getMessage()) | ||
| .withPlatform(pipelineServiceClient.getPlatform())) | ||
| .build(); | ||
| } | ||
| return toStopResponse(killResponse); | ||
| } | ||
|
|
||
| private void markPipelineStatusAsStopped(IngestionPipeline ingestionPipeline, String runId) { | ||
| IngestionPipelineRepository ingestionPipelineRepository = | ||
| (IngestionPipelineRepository) Entity.getEntityRepository(Entity.INGESTION_PIPELINE); | ||
| try { | ||
| PipelineStatus status = | ||
| ingestionPipelineRepository.getPipelineStatus( | ||
| ingestionPipeline.getFullyQualifiedName(), runId); | ||
| if (status == null) { | ||
| LOG.warn( | ||
| "Pipeline status not found in DB for run [{}] on pipeline [{}]. Proceeding with kill but DB state will remain inconsistent.", | ||
| runId, | ||
| ingestionPipeline.getFullyQualifiedName()); | ||
| return; | ||
| } | ||
| if (!PipelineStatusUtils.isTerminalState(status.getPipelineState())) { | ||
| status.setPipelineState(PipelineStatusType.STOPPED); | ||
| status.setEndDate(System.currentTimeMillis()); | ||
| // Use updatePipelineStatusByRunId instead of addPipelineStatus to avoid overwriting | ||
| // the pipeline-level current status. When stopping a specific run, other runs may still | ||
| // be active and their status should not be affected. | ||
| ingestionPipelineRepository.updatePipelineStatusByRunId( | ||
| ingestionPipeline.getFullyQualifiedName(), status); | ||
| } | ||
| } catch (Exception e) { | ||
| LOG.error( | ||
| "Failed to mark run [{}] as STOPPED in DB for pipeline [{}]. Kill will proceed but DB status may remain inconsistent.", | ||
| runId, | ||
| ingestionPipeline.getFullyQualifiedName(), | ||
| e); | ||
| } | ||
| } | ||
|
|
||
| private void markLatestPipelineStatusAsStopped( | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Shouldn't this implementation be based on markPipelineStatusAsStopped, passing the last runId? Otherwise it seems we're duplicating logic.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. refactored it, thanks |
||
| IngestionPipeline ingestionPipeline, Long runStartTime) { | ||
| IngestionPipelineRepository ingestionPipelineRepository = | ||
| (IngestionPipelineRepository) Entity.getEntityRepository(Entity.INGESTION_PIPELINE); | ||
| long now = System.currentTimeMillis(); | ||
| long startTs = runStartTime != null ? runStartTime : now - TimeUnit.HOURS.toMillis(1); | ||
| ResultList<PipelineStatus> statuses; | ||
| try { | ||
| statuses = | ||
| ingestionPipelineRepository.listPipelineStatus( | ||
| ingestionPipeline.getFullyQualifiedName(), startTs, now); | ||
| } catch (Exception e) { | ||
| LOG.error( | ||
| "Failed to list pipeline statuses for [{}]. Kill will proceed but DB statuses may remain inconsistent.", | ||
| ingestionPipeline.getFullyQualifiedName(), | ||
| e); | ||
| return; | ||
| } | ||
| for (PipelineStatus status : statuses.getData()) { | ||
| if (status.getRunId() == null || status.getRunId().isBlank()) { | ||
| continue; | ||
| } | ||
| if (!PipelineStatusUtils.isTerminalState(status.getPipelineState())) { | ||
| markPipelineStatusAsStopped(ingestionPipeline, status.getRunId()); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| private Response toStopResponse(PipelineServiceClientResponse killResponse) { | ||
| int code = killResponse.getCode(); | ||
| if (code >= 200 && code < 300) { | ||
| return Response.status(code).entity(killResponse).build(); | ||
| } | ||
| if (code == 404) { | ||
| LOG.warn( | ||
| "Kill request returned 404 — workflow already completed. DB status already marked STOPPED."); | ||
| return Response.ok(killResponse).build(); | ||
| } | ||
| LOG.error( | ||
| "Kill request returned unexpected code [{}]. DB status already marked STOPPED but workflow may still be running.", | ||
| code); | ||
| return Response.status(Response.Status.BAD_GATEWAY).entity(killResponse).build(); | ||
| } | ||
|
|
||
| @POST | ||
| @Path("/deploy/{name}") | ||
| @Operation( | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,33 @@ | ||
| /* | ||
| * Copyright 2021 Collate | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.openmetadata.service.util; | ||
|
|
||
| import java.util.Set; | ||
| import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineStatusType; | ||
|
|
||
| public final class PipelineStatusUtils { | ||
|
|
||
| private static final Set<PipelineStatusType> TERMINAL_STATES = | ||
| Set.of( | ||
| PipelineStatusType.SUCCESS, | ||
| PipelineStatusType.FAILED, | ||
| PipelineStatusType.STOPPED, | ||
| PipelineStatusType.PARTIAL_SUCCESS); | ||
|
|
||
| private PipelineStatusUtils() {} | ||
|
|
||
| public static boolean isTerminalState(PipelineStatusType state) { | ||
| return state != null && TERMINAL_STATES.contains(state); | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -127,8 +127,23 @@ default PipelineServiceClientResponse runPipeline( | |
| /* Get all the logs of the last run of a deployed pipeline */ | ||
| Map<String, String> getLastIngestionLogs(IngestionPipeline ingestionPipeline, String after); | ||
|
|
||
| /* Get logs for a specific pipeline run identified by runId. | ||
| * When runId is null or blank, falls back to getLastIngestionLogs (latest run). */ | ||
| default Map<String, String> getIngestionLogs( | ||
| IngestionPipeline ingestionPipeline, String after, String runId) { | ||
| return getLastIngestionLogs(ingestionPipeline, after); | ||
| } | ||
|
Vishnuujain marked this conversation as resolved.
|
||
|
|
||
| /* Kill the running ingestion workflow(s) of a deployed pipeline */ | ||
| PipelineServiceClientResponse killIngestion(IngestionPipeline ingestionPipeline); | ||
|
|
||
| /* Stop a specific run of a deployed pipeline identified by its run ID. | ||
| * Default is a no-op: clients that do not support per-run stopping return success without | ||
| * taking any action. The DB status is already marked STOPPED before this is called. */ | ||
| default PipelineServiceClientResponse killIngestionRun( | ||
| IngestionPipeline ingestionPipeline, String runId) { | ||
| return new PipelineServiceClientResponse().withCode(200).withPlatform(getPlatform()); | ||
| } | ||
|
Comment on lines
+143
to
+146
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
|
|
||
| String getPlatform(); | ||
| } | ||
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -26,7 +26,7 @@ | |||||
| "description": "Pipeline status denotes if its failed or succeeded.", | ||||||
| "type": "string", | ||||||
| "javaType": "org.openmetadata.schema.entity.services.ingestionPipelines.PipelineStatusType", | ||||||
| "enum": ["queued","success","failed","running","partialSuccess"] | ||||||
| "enum": ["queued","success","failed","running","partialSuccess","stopped"] | ||||||
|
||||||
| "enum": ["queued","success","failed","running","partialSuccess","stopped"] | |
| "enum": ["queued","success","failed","running","partialSuccess"] |
Uh oh!
There was an error while loading. Please reload this page.