-
Notifications
You must be signed in to change notification settings - Fork 1.9k
Add stopped state support for pipeline status and stop/cancel AI agents #27098
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 8 commits
df32d44
1e4b73c
227b8fc
c9914e2
454f455
a2b2aec
a8029ea
dd4bcf7
c877d69
d562cc4
2683981
d484be8
da79897
0e16b6b
ae1715b
9c1cd2a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -59,6 +59,7 @@ | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.openmetadata.schema.entity.services.ingestionPipelines.IngestionPipeline; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineServiceClientResponse; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineStatus; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineStatusType; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.openmetadata.schema.services.connections.metadata.OpenMetadataConnection; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.openmetadata.schema.type.EntityHistory; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.openmetadata.schema.type.EntityReference; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -374,8 +375,13 @@ protected static AppRunRecord convertPipelineStatus(App app, PipelineStatus pipe | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| case SUCCESS -> AppRunRecord.Status.SUCCESS; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| case FAILED, PARTIAL_SUCCESS -> AppRunRecord.Status.FAILED; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| case RUNNING -> AppRunRecord.Status.RUNNING; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| case STOPPED -> AppRunRecord.Status.STOPPED; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| }) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| .withConfig(pipelineStatus.getConfig()); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| .withConfig(pipelineStatus.getConfig()) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| .withProperties( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| pipelineStatus.getRunId() != null | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ? Map.of("pipelineRunId", pipelineStatus.getRunId()) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| : null); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| private ResultList<AppRunRecord> sortRunsByStartTime(ResultList<AppRunRecord> runs) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -1227,7 +1233,12 @@ public Response stopApplicationRun( | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| @Context SecurityContext securityContext, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| @Parameter(description = "Name of the App", schema = @Schema(type = "string")) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| @PathParam("name") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| String name) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| String name, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| @Parameter( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| description = "Pipeline run ID (Argo workflow UID) to stop a specific run", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Vishnuujain marked this conversation as resolved.
Outdated
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| schema = @Schema(type = "string")) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| @QueryParam("runId") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| String runId) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
gitar-bot[bot] marked this conversation as resolved.
Outdated
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| EntityUtil.Fields fields = getFields(String.format("%s,bot,pipelines", FIELD_OWNERS)); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| App app = repository.getByName(uriInfo, name, fields); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| OperationContext operationContext = new OperationContext(entityType, MetadataOperation.TRIGGER); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -1243,15 +1254,83 @@ public Response stopApplicationRun( | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } else { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (!app.getPipelines().isEmpty()) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| IngestionPipeline ingestionPipeline = getIngestionPipeline(uriInfo, securityContext, app); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| PipelineServiceClientResponse response = | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| pipelineServiceClient.killIngestion(ingestionPipeline); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return Response.status(response.getCode()).entity(response).build(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (runId != null && !runId.isBlank()) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| markPipelineStatusAsStopped(uriInfo, ingestionPipeline, runId); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| PipelineServiceClientResponse response = | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| pipelineServiceClient.killIngestionRun(ingestionPipeline, runId); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return Response.status(response.getCode()).entity(response).build(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } else { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| markLatestPipelineStatusAsStopped(uriInfo, ingestionPipeline); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| PipelineServiceClientResponse response = | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| pipelineServiceClient.killIngestion(ingestionPipeline); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| markPipelineStatusAsStopped(uriInfo, ingestionPipeline, runId); | |
| PipelineServiceClientResponse response = | |
| pipelineServiceClient.killIngestionRun(ingestionPipeline, runId); | |
| return Response.status(response.getCode()).entity(response).build(); | |
| } else { | |
| markLatestPipelineStatusAsStopped(uriInfo, ingestionPipeline); | |
| PipelineServiceClientResponse response = | |
| pipelineServiceClient.killIngestion(ingestionPipeline); | |
| PipelineServiceClientResponse response = | |
| pipelineServiceClient.killIngestionRun(ingestionPipeline, runId); | |
| if (response.getCode() >= 200 && response.getCode() < 300) { | |
| markPipelineStatusAsStopped(uriInfo, ingestionPipeline, runId); | |
| } | |
| return Response.status(response.getCode()).entity(response).build(); | |
| } else { | |
| PipelineServiceClientResponse response = | |
| pipelineServiceClient.killIngestion(ingestionPipeline); | |
| if (response.getCode() >= 200 && response.getCode() < 300) { | |
| markLatestPipelineStatusAsStopped(uriInfo, ingestionPipeline); | |
| } |
Copilot
AI
Apr 6, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The new runId query parameter path and the DB-side status marking logic (markPipelineStatusAsStopped / markLatestPipelineStatusAsStopped) introduce new behavior that isn't covered by existing backend tests. Please add service-level tests for stopApplicationRun covering: stopping with a specific runId (updates only that run when kill succeeds), stopping without runId (fallback behavior), and failure cases (kill failure should not incorrectly mark runs as STOPPED).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shouldnt this implementation be based on markPipelineStatusAsStopped but passing the last runId? Otherwise seems we're duplicating logic
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
refactored it, thanks
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If I understand we want to change the status of the running pipeline from queue, running, etc. to stopped? Instead of fetching the last 20 status, why not fetch the statuses for the timestamp between start and now to have something more deterministic?
Listing the last 20 statuses feel fragile.
Copilot
AI
Apr 6, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
markLatestPipelineStatusAsStopped calls listPipelineStatus(...), which merges queued statuses from pipelineServiceClient.getQueuedPipelineStatus(...). Those queued entries may not be persisted and/or may not have a non-null runId, but addPipelineStatus(...) requires pipelineStatus.getRunId() as the time-series key. This can cause the whole marking loop to fail (caught and logged) and skip DB updates. Consider only iterating DB-backed statuses (avoid the merged queued list), or at least skip entries with null/blank runId and isolate failures per-status so one bad entry doesn't prevent updating others.
| long now = System.currentTimeMillis(); | |
| for (PipelineStatus status : recentStatuses.getData()) { | |
| if (!isTerminalState(status.getPipelineState())) { | |
| status.setPipelineState(PipelineStatusType.STOPPED); | |
| status.setEndDate(now); | |
| ingestionPipelineRepository.addPipelineStatus( | |
| uriInfo, ingestionPipeline.getFullyQualifiedName(), status); | |
| if (recentStatuses == null || recentStatuses.getData() == null) { | |
| return; | |
| } | |
| long now = System.currentTimeMillis(); | |
| for (PipelineStatus status : recentStatuses.getData()) { | |
| if (status == null || isTerminalState(status.getPipelineState())) { | |
| continue; | |
| } | |
| String runId = status.getRunId(); | |
| if (nullOrEmpty(runId)) { | |
| LOG.warn( | |
| "Skipping pipeline status without runId while marking runs as stopped for pipeline {}", | |
| ingestionPipeline.getFullyQualifiedName()); | |
| continue; | |
| } | |
| try { | |
| status.setPipelineState(PipelineStatusType.STOPPED); | |
| status.setEndDate(now); | |
| ingestionPipelineRepository.addPipelineStatus( | |
| uriInfo, ingestionPipeline.getFullyQualifiedName(), status); | |
| } catch (RuntimeException e) { | |
| LOG.warn( | |
| "Failed to mark pipeline run {} as stopped, continuing with remaining runs: {}", | |
| runId, | |
| e.getMessage()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: should that be a utility method instead of a method of the AppResource?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It wasn't used much, but we can move it in util
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -130,5 +130,11 @@ default PipelineServiceClientResponse runPipeline( | |
| /* Get the all last run logs of a deployed pipeline */ | ||
| PipelineServiceClientResponse killIngestion(IngestionPipeline ingestionPipeline); | ||
|
|
||
| /* Stop a specific run of a deployed pipeline identified by its run ID (Argo workflow UID) */ | ||
| default PipelineServiceClientResponse killIngestionRun( | ||
| IngestionPipeline ingestionPipeline, String runId) { | ||
| return killIngestion(ingestionPipeline); | ||
| } | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We are not using the runId here if I am not mistaking. We should make it noop for clarity
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Correct, runId is unused in the default intentional fallback for non-Argo clients that don't support per-run stopping
Comment on lines
+143
to
+146
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
|
|
||
| String getPlatform(); | ||
| } | ||
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -26,7 +26,7 @@ | |||||
| "description": "Pipeline status denotes if its failed or succeeded.", | ||||||
| "type": "string", | ||||||
| "javaType": "org.openmetadata.schema.entity.services.ingestionPipelines.PipelineStatusType", | ||||||
| "enum": ["queued","success","failed","running","partialSuccess"] | ||||||
| "enum": ["queued","success","failed","running","partialSuccess","stopped"] | ||||||
|
||||||
| "enum": ["queued","success","failed","running","partialSuccess","stopped"] | |
| "enum": ["queued","success","failed","running","partialSuccess"] |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -86,6 +86,9 @@ const AppRunsHistory = forwardRef( | |
| >([]); | ||
| const [expandedRowKeys, setExpandedRowKeys] = useState<string[]>([]); | ||
| const [isStopModalOpen, setIsStopModalOpen] = useState<boolean>(false); | ||
| const [selectedRunId, setSelectedRunId] = useState<string | undefined>( | ||
| undefined | ||
| ); | ||
| const [showConfigModal, setShowConfigModal] = useState<boolean>(false); | ||
| const [appRunRecordConfig, setAppRunRecordConfig] = useState< | ||
| AppRunRecord['config'] | ||
|
|
@@ -198,16 +201,23 @@ const AppRunsHistory = forwardRef( | |
| onClick={() => showAppRunConfig(record)}> | ||
| {t('label.config')} | ||
| </Button> | ||
| {/* For status running or activewitherror and supportsInterrupt is true, show stop button */} | ||
| {(record.status === Status.Running || | ||
| record.status === Status.ActiveError) && | ||
| {record.status !== Status.Success && | ||
| record.status !== Status.Failed && | ||
| record.status !== Status.Stopped && | ||
| record.status !== Status.Completed && | ||
| Boolean(appData?.supportsInterrupt) && ( | ||
|
gitar-bot[bot] marked this conversation as resolved.
|
||
| <Button | ||
| className="m-l-xs p-0" | ||
| data-testid="stop-button" | ||
| size="small" | ||
| type="link" | ||
| onClick={() => setIsStopModalOpen(true)}> | ||
| onClick={() => { | ||
| const rawRunId = record.properties?.pipelineRunId; | ||
| setSelectedRunId( | ||
| typeof rawRunId === 'string' ? rawRunId : undefined | ||
| ); | ||
| setIsStopModalOpen(true); | ||
| }}> | ||
|
Comment on lines
+209
to
+226
|
||
| {t('label.stop')} | ||
| </Button> | ||
| )} | ||
|
|
@@ -452,8 +462,10 @@ const AppRunsHistory = forwardRef( | |
| appName={fqn} | ||
| displayName={appData?.displayName ?? ''} | ||
| isModalOpen={isStopModalOpen} | ||
| runId={selectedRunId} | ||
| onClose={() => { | ||
| setIsStopModalOpen(false); | ||
| setSelectedRunId(undefined); | ||
| }} | ||
| onStopWorkflowsUpdate={() => { | ||
| fetchAppHistory(); | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.