diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/clients/pipeline/MeteredPipelineServiceClient.java b/openmetadata-service/src/main/java/org/openmetadata/service/clients/pipeline/MeteredPipelineServiceClient.java index efc9e3ebd09f..3e83521e04ba 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/clients/pipeline/MeteredPipelineServiceClient.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/clients/pipeline/MeteredPipelineServiceClient.java @@ -136,11 +136,25 @@ public Map getLastIngestionLogs( GET_LOGS, () -> this.decoratedClient.getLastIngestionLogs(ingestionPipeline, after)); } + @Override + public Map getIngestionLogs( + IngestionPipeline ingestionPipeline, String after, String runId) { + return executeWithMetering( + GET_LOGS, () -> this.decoratedClient.getIngestionLogs(ingestionPipeline, after, runId)); + } + public PipelineServiceClientResponse killIngestion(IngestionPipeline ingestionPipeline) { return this.respondWithMetering( KILL, () -> this.decoratedClient.killIngestion(ingestionPipeline)); } + @Override + public PipelineServiceClientResponse killIngestionRun( + IngestionPipeline ingestionPipeline, String runId) { + return this.respondWithMetering( + KILL, () -> this.decoratedClient.killIngestionRun(ingestionPipeline, runId)); + } + @Override public String getPlatform() { return this.decoratedClient.getPlatform(); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DataInsightSystemChartRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DataInsightSystemChartRepository.java index f63fc06bdd41..ee99e1dddee0 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DataInsightSystemChartRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DataInsightSystemChartRepository.java @@ -549,8 +549,13 @@ private AppRunRecord convertPipelineStatusToAppRun(App app, PipelineStatus pipel case SUCCESS -> AppRunRecord.Status.SUCCESS; case FAILED, PARTIAL_SUCCESS -> AppRunRecord.Status.FAILED; case RUNNING -> AppRunRecord.Status.RUNNING; + case STOPPED -> AppRunRecord.Status.STOPPED; }) - .withConfig(pipelineStatus.getConfig()); + .withConfig(pipelineStatus.getConfig()) + .withProperties( + pipelineStatus.getRunId() != null + ? Map.of("pipelineRunId", pipelineStatus.getRunId()) + : null); } /** diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/IngestionPipelineRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/IngestionPipelineRepository.java index a6ad876c7eab..9e7f2aa3acdd 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/IngestionPipelineRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/IngestionPipelineRepository.java @@ -702,18 +702,58 @@ public PipelineStatus getLatestPipelineStatus(IngestionPipeline ingestionPipelin } public PipelineStatus getPipelineStatus(String ingestionPipelineFQN, UUID pipelineStatusRunId) { + return getPipelineStatus(ingestionPipelineFQN, pipelineStatusRunId.toString()); + } + + public PipelineStatus getPipelineStatus(String ingestionPipelineFQN, String runId) { IngestionPipeline ingestionPipeline = findByName(ingestionPipelineFQN, Include.NON_DELETED); return JsonUtils.readValue( daoCollection .entityExtensionTimeSeriesDao() .getExtensionByKey( RUN_ID_EXTENSION_KEY, - pipelineStatusRunId.toString(), + runId, ingestionPipeline.getFullyQualifiedName(), PIPELINE_STATUS_EXTENSION), PipelineStatus.class); } + /** + * Upsert only the time-series record for a specific run without overwriting the pipeline-level + * current status. Use this when stopping a specific run while other runs may still be active. + * Inserts a new record if none exists for the runId, otherwise updates the existing one. + */ + @Transaction + public void updatePipelineStatusByRunId(String fqn, PipelineStatus pipelineStatus) { + IngestionPipeline ingestionPipeline = findByName(fqn, Include.NON_DELETED); + String pipelineFqn = ingestionPipeline.getFullyQualifiedName(); + String json = JsonUtils.pojoToJson(pipelineStatus); + PipelineStatus storedPipelineStatus = + JsonUtils.readValue( + daoCollection + .entityExtensionTimeSeriesDao() + .getLatestExtensionByKey( + RUN_ID_EXTENSION_KEY, + pipelineStatus.getRunId(), + pipelineFqn, + PIPELINE_STATUS_EXTENSION), + PipelineStatus.class); + if (storedPipelineStatus != null) { + daoCollection + .entityExtensionTimeSeriesDao() + .updateExtensionByKey( + RUN_ID_EXTENSION_KEY, + pipelineStatus.getRunId(), + pipelineFqn, + PIPELINE_STATUS_EXTENSION, + json); + } else { + daoCollection + .entityExtensionTimeSeriesDao() + .insert(pipelineFqn, PIPELINE_STATUS_EXTENSION, PIPELINE_STATUS_JSON_SCHEMA, json); + } + } + @Transaction public IngestionPipeline deletePipelineStatusByRunId(UUID ingestionPipelineId, UUID runId) { IngestionPipeline ingestionPipeline = find(ingestionPipelineId, Include.NON_DELETED); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/resources/apps/AppResource.java b/openmetadata-service/src/main/java/org/openmetadata/service/resources/apps/AppResource.java index 5ab4f8ac5e5e..cea274db8270 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/resources/apps/AppResource.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/resources/apps/AppResource.java @@ -47,6 +47,7 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; import org.openmetadata.schema.ServiceEntityInterface; import org.openmetadata.schema.api.data.RestoreEntity; @@ -59,6 +60,7 @@ import org.openmetadata.schema.entity.services.ingestionPipelines.IngestionPipeline; import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineServiceClientResponse; import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineStatus; +import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineStatusType; import org.openmetadata.schema.services.connections.metadata.OpenMetadataConnection; import org.openmetadata.schema.type.EntityHistory; import org.openmetadata.schema.type.EntityReference; @@ -94,6 +96,7 @@ import org.openmetadata.service.util.DeleteEntityResponse; import org.openmetadata.service.util.EntityUtil; import org.openmetadata.service.util.OpenMetadataConnectionBuilder; +import org.openmetadata.service.util.PipelineStatusUtils; import org.openmetadata.service.util.RestUtil; import org.openmetadata.service.util.WebsocketNotificationHandler; import org.quartz.SchedulerException; @@ -374,8 +377,13 @@ protected static AppRunRecord convertPipelineStatus(App app, PipelineStatus pipe case SUCCESS -> AppRunRecord.Status.SUCCESS; case FAILED, PARTIAL_SUCCESS -> AppRunRecord.Status.FAILED; case RUNNING -> AppRunRecord.Status.RUNNING; + case STOPPED -> AppRunRecord.Status.STOPPED; }) - .withConfig(pipelineStatus.getConfig()); + .withConfig(pipelineStatus.getConfig()) + .withProperties( + pipelineStatus.getRunId() != null + ? Map.of("pipelineRunId", pipelineStatus.getRunId()) + : null); } private ResultList sortRunsByStartTime(ResultList runs) { @@ -527,7 +535,14 @@ public Response getLastLogs( schema = @Schema(type = "string")) @QueryParam("after") @DefaultValue("") - String after) { + String after, + @Parameter( + description = + "Pipeline run ID to fetch logs for a specific run. " + + "If not provided, returns logs for the latest run.", + schema = @Schema(type = "string")) + @QueryParam("runId") + String runId) { App installation = repository.getByName(uriInfo, name, repository.getFields("id,pipelines")); if (installation.getAppType().equals(AppType.Internal)) { AppRunRecord latestRun = repository.getLatestAppRunsOptional(installation).orElse(null); @@ -544,7 +559,7 @@ public Response getLastLogs( ingestionPipelineRepository.get( uriInfo, pipelineRef.getId(), ingestionPipelineRepository.getFields(FIELD_OWNERS)); return Response.ok( - pipelineServiceClient.getLastIngestionLogs(ingestionPipeline, after), + pipelineServiceClient.getIngestionLogs(ingestionPipeline, after, runId), MediaType.APPLICATION_JSON_TYPE) .build(); } @@ -1227,7 +1242,12 @@ public Response stopApplicationRun( @Context SecurityContext securityContext, @Parameter(description = "Name of the App", schema = @Schema(type = "string")) @PathParam("name") - String name) { + String name, + @Parameter( + description = "Pipeline run ID to stop a specific run", + schema = @Schema(type = "string")) + @QueryParam("runId") + String runId) { EntityUtil.Fields fields = getFields(String.format("%s,bot,pipelines", FIELD_OWNERS)); App app = repository.getByName(uriInfo, name, fields); OperationContext operationContext = new OperationContext(entityType, MetadataOperation.TRIGGER); @@ -1241,17 +1261,148 @@ public Response stopApplicationRun( .entity("Application stop in progress. Please check status via.") .build(); } else { - if (!app.getPipelines().isEmpty()) { - IngestionPipeline ingestionPipeline = getIngestionPipeline(uriInfo, securityContext, app); - PipelineServiceClientResponse response = - pipelineServiceClient.killIngestion(ingestionPipeline); - return Response.status(response.getCode()).entity(response).build(); + if (nullOrEmpty(app.getPipelines())) { + throw new BadRequestException( + String.format( + "Application [%s] supports interrupts but has no associated pipeline configured.", + name)); + } + IngestionPipeline ingestionPipeline = getIngestionPipeline(uriInfo, securityContext, app); + if (runId != null && !runId.isBlank()) { + return stopSpecificRun(uriInfo, ingestionPipeline, runId); + } else { + return stopAllRuns(app, ingestionPipeline); } } } throw new BadRequestException("Application does not support Interrupts."); } + private Response stopSpecificRun( + UriInfo uriInfo, IngestionPipeline ingestionPipeline, String runId) { + markPipelineStatusAsStopped(ingestionPipeline, runId); + PipelineServiceClientResponse killResponse; + try { + killResponse = pipelineServiceClient.killIngestionRun(ingestionPipeline, runId); + } catch (Exception e) { + LOG.error( + "Kill request for run [{}] on pipeline [{}] failed after DB update. Workflow may still be running.", + runId, + ingestionPipeline.getFullyQualifiedName(), + e); + return Response.status(Response.Status.BAD_GATEWAY) + .entity( + new PipelineServiceClientResponse() + .withCode(Response.Status.BAD_GATEWAY.getStatusCode()) + .withReason(e.getMessage()) + .withPlatform(pipelineServiceClient.getPlatform())) + .build(); + } + return toStopResponse(killResponse); + } + + private Response stopAllRuns(App app, IngestionPipeline ingestionPipeline) { + Long runStartTime = + repository + .getLatestAppRunsOptional(app, ingestionPipeline.getService().getId()) + .map(AppRunRecord::getStartTime) + .orElse(null); + markLatestPipelineStatusAsStopped(ingestionPipeline, runStartTime); + PipelineServiceClientResponse killResponse; + try { + killResponse = pipelineServiceClient.killIngestion(ingestionPipeline); + } catch (Exception e) { + LOG.error( + "Kill request for pipeline [{}] failed after DB update. Workflows may still be running.", + ingestionPipeline.getFullyQualifiedName(), + e); + return Response.status(Response.Status.BAD_GATEWAY) + .entity( + new PipelineServiceClientResponse() + .withCode(Response.Status.BAD_GATEWAY.getStatusCode()) + .withReason(e.getMessage()) + .withPlatform(pipelineServiceClient.getPlatform())) + .build(); + } + return toStopResponse(killResponse); + } + + private void markPipelineStatusAsStopped(IngestionPipeline ingestionPipeline, String runId) { + IngestionPipelineRepository ingestionPipelineRepository = + (IngestionPipelineRepository) Entity.getEntityRepository(Entity.INGESTION_PIPELINE); + try { + PipelineStatus status = + ingestionPipelineRepository.getPipelineStatus( + ingestionPipeline.getFullyQualifiedName(), runId); + if (status == null) { + LOG.warn( + "Pipeline status not found in DB for run [{}] on pipeline [{}]. Proceeding with kill but DB state will remain inconsistent.", + runId, + ingestionPipeline.getFullyQualifiedName()); + return; + } + if (!PipelineStatusUtils.isTerminalState(status.getPipelineState())) { + status.setPipelineState(PipelineStatusType.STOPPED); + status.setEndDate(System.currentTimeMillis()); + // Use updatePipelineStatusByRunId instead of addPipelineStatus to avoid overwriting + // the pipeline-level current status. When stopping a specific run, other runs may still + // be active and their status should not be affected. + ingestionPipelineRepository.updatePipelineStatusByRunId( + ingestionPipeline.getFullyQualifiedName(), status); + } + } catch (Exception e) { + LOG.error( + "Failed to mark run [{}] as STOPPED in DB for pipeline [{}]. Kill will proceed but DB status may remain inconsistent.", + runId, + ingestionPipeline.getFullyQualifiedName(), + e); + } + } + + private void markLatestPipelineStatusAsStopped( + IngestionPipeline ingestionPipeline, Long runStartTime) { + IngestionPipelineRepository ingestionPipelineRepository = + (IngestionPipelineRepository) Entity.getEntityRepository(Entity.INGESTION_PIPELINE); + long now = System.currentTimeMillis(); + long startTs = runStartTime != null ? runStartTime : now - TimeUnit.HOURS.toMillis(1); + ResultList statuses; + try { + statuses = + ingestionPipelineRepository.listPipelineStatus( + ingestionPipeline.getFullyQualifiedName(), startTs, now); + } catch (Exception e) { + LOG.error( + "Failed to list pipeline statuses for [{}]. Kill will proceed but DB statuses may remain inconsistent.", + ingestionPipeline.getFullyQualifiedName(), + e); + return; + } + for (PipelineStatus status : statuses.getData()) { + if (status.getRunId() == null || status.getRunId().isBlank()) { + continue; + } + if (!PipelineStatusUtils.isTerminalState(status.getPipelineState())) { + markPipelineStatusAsStopped(ingestionPipeline, status.getRunId()); + } + } + } + + private Response toStopResponse(PipelineServiceClientResponse killResponse) { + int code = killResponse.getCode(); + if (code >= 200 && code < 300) { + return Response.status(code).entity(killResponse).build(); + } + if (code == 404) { + LOG.warn( + "Kill request returned 404 — workflow already completed. DB status already marked STOPPED."); + return Response.ok(killResponse).build(); + } + LOG.error( + "Kill request returned unexpected code [{}]. DB status already marked STOPPED but workflow may still be running.", + code); + return Response.status(Response.Status.BAD_GATEWAY).entity(killResponse).build(); + } + @POST @Path("/deploy/{name}") @Operation( diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/util/PipelineStatusUtils.java b/openmetadata-service/src/main/java/org/openmetadata/service/util/PipelineStatusUtils.java new file mode 100644 index 000000000000..eb41a2c5367a --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/util/PipelineStatusUtils.java @@ -0,0 +1,33 @@ +/* + * Copyright 2021 Collate + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.openmetadata.service.util; + +import java.util.Set; +import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineStatusType; + +public final class PipelineStatusUtils { + + private static final Set TERMINAL_STATES = + Set.of( + PipelineStatusType.SUCCESS, + PipelineStatusType.FAILED, + PipelineStatusType.STOPPED, + PipelineStatusType.PARTIAL_SUCCESS); + + private PipelineStatusUtils() {} + + public static boolean isTerminalState(PipelineStatusType state) { + return state != null && TERMINAL_STATES.contains(state); + } +} diff --git a/openmetadata-spec/src/main/java/org/openmetadata/sdk/PipelineServiceClientInterface.java b/openmetadata-spec/src/main/java/org/openmetadata/sdk/PipelineServiceClientInterface.java index f56475286148..70c1ff0c3f86 100644 --- a/openmetadata-spec/src/main/java/org/openmetadata/sdk/PipelineServiceClientInterface.java +++ b/openmetadata-spec/src/main/java/org/openmetadata/sdk/PipelineServiceClientInterface.java @@ -127,8 +127,23 @@ default PipelineServiceClientResponse runPipeline( /* Get the all last run logs of a deployed pipeline */ Map getLastIngestionLogs(IngestionPipeline ingestionPipeline, String after); + /* Get logs for a specific pipeline run identified by runId. + * When runId is null or blank, falls back to getLastIngestionLogs (latest run). */ + default Map getIngestionLogs( + IngestionPipeline ingestionPipeline, String after, String runId) { + return getLastIngestionLogs(ingestionPipeline, after); + } + /* Get the all last run logs of a deployed pipeline */ PipelineServiceClientResponse killIngestion(IngestionPipeline ingestionPipeline); + /* Stop a specific run of a deployed pipeline identified by its run ID. + * Default is a no-op: clients that do not support per-run stopping return success without + * taking any action. The DB status is already marked STOPPED before this is called. */ + default PipelineServiceClientResponse killIngestionRun( + IngestionPipeline ingestionPipeline, String runId) { + return new PipelineServiceClientResponse().withCode(200).withPlatform(getPlatform()); + } + String getPlatform(); } diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/services/ingestionPipelines/ingestionPipeline.json b/openmetadata-spec/src/main/resources/json/schema/entity/services/ingestionPipelines/ingestionPipeline.json index 835bebffa633..6008e2856ca2 100644 --- a/openmetadata-spec/src/main/resources/json/schema/entity/services/ingestionPipelines/ingestionPipeline.json +++ b/openmetadata-spec/src/main/resources/json/schema/entity/services/ingestionPipelines/ingestionPipeline.json @@ -26,7 +26,7 @@ "description": "Pipeline status denotes if its failed or succeeded.", "type": "string", "javaType": "org.openmetadata.schema.entity.services.ingestionPipelines.PipelineStatusType", - "enum": ["queued","success","failed","running","partialSuccess"] + "enum": ["queued","success","failed","running","partialSuccess","stopped"] }, "startDate": { "description": "startDate of the pipeline run for this particular execution.", diff --git a/openmetadata-ui/src/main/resources/ui/playwright/e2e/Pages/AppStopRunModal.spec.ts b/openmetadata-ui/src/main/resources/ui/playwright/e2e/Pages/AppStopRunModal.spec.ts new file mode 100644 index 000000000000..eccbe539b17f --- /dev/null +++ b/openmetadata-ui/src/main/resources/ui/playwright/e2e/Pages/AppStopRunModal.spec.ts @@ -0,0 +1,217 @@ +/* + * Copyright 2025 Collate. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import { expect, test } from '@playwright/test'; +import { GlobalSettingOptions } from '../../constant/settings'; +import { redirectToHomePage } from '../../utils/common'; +import { settingClick } from '../../utils/sidebar'; + +test.use({ storageState: 'playwright/.auth/admin.json' }); + +const MOCK_APP_NAME = 'DataInsightsApplication'; +const MOCK_RUN_ID = 'mock-pipeline-run-id-123'; + +const MOCK_APP = { + id: 'app-id-123', + name: MOCK_APP_NAME, + displayName: 'Data Insights Application', + fullyQualifiedName: MOCK_APP_NAME, + appType: 'Internal', + supportsInterrupt: true, + deleted: false, +}; + +const MOCK_RUNNING_RUN = { + appId: 'app-id-123', + appName: MOCK_APP_NAME, + runType: 'Scheduled', + status: 'running', + timestamp: Date.now() - 60000, + startTime: Date.now() - 60000, + properties: { + pipelineRunId: MOCK_RUN_ID, + }, +}; + +test.describe('App Stop Run Modal', () => { + test.beforeEach(async ({ page }) => { + await redirectToHomePage(page); + }); + + test('should show stop button for running app runs with supportsInterrupt=true', async ({ + page, + }) => { + await page.route(`**/api/v1/apps/name/${MOCK_APP_NAME}*`, async (route) => { + await route.fulfill({ json: MOCK_APP }); + }); + + await page.route( + `**/api/v1/apps/name/${MOCK_APP_NAME}/status*`, + async (route) => { + await route.fulfill({ + json: { + data: [ + { + ...MOCK_RUNNING_RUN, + id: `${MOCK_RUNNING_RUN.appId}-${MOCK_RUNNING_RUN.runType}-${MOCK_RUNNING_RUN.timestamp}`, + }, + ], + paging: { total: 1 }, + }, + }); + } + ); + + await page.route(/\/api\/v1\/apps\?/, async (route) => { + await route.fulfill({ + json: { + data: [MOCK_APP], + paging: { total: 1 }, + }, + }); + }); + + const appsResponse = page.waitForResponse( + `**/api/v1/apps?limit=15&include=non-deleted` + ); + await settingClick(page, GlobalSettingOptions.APPLICATIONS); + await appsResponse; + + await page.click( + `[data-testid="data-insights-application-card"] [data-testid="config-btn"]` + ); + + await page.getByRole('tab', { name: 'Recent Runs' }).click(); + + await expect(page.getByTestId('stop-button')).toBeVisible(); + }); + + test('should open stop modal when stop button is clicked and call stop API with runId', async ({ + page, + }) => { + let stopApiCalled = false; + let stopApiUrl = ''; + + await page.route(`**/api/v1/apps/name/${MOCK_APP_NAME}*`, async (route) => { + await route.fulfill({ json: MOCK_APP }); + }); + + await page.route( + `**/api/v1/apps/stop/${MOCK_APP_NAME}**`, + async (route) => { + stopApiCalled = true; + stopApiUrl = route.request().url(); + await route.fulfill({ status: 200, json: {} }); + } + ); + + await page.route( + `**/api/v1/apps/name/${MOCK_APP_NAME}/status*`, + async (route) => { + await route.fulfill({ + json: { + data: [ + { + ...MOCK_RUNNING_RUN, + id: `${MOCK_RUNNING_RUN.appId}-${MOCK_RUNNING_RUN.runType}-${MOCK_RUNNING_RUN.timestamp}`, + }, + ], + paging: { total: 1 }, + }, + }); + } + ); + + await page.route(/\/api\/v1\/apps\?/, async (route) => { + await route.fulfill({ + json: { + data: [MOCK_APP], + paging: { total: 1 }, + }, + }); + }); + + const appsResponse = page.waitForResponse( + `**/api/v1/apps?limit=15&include=non-deleted` + ); + await settingClick(page, GlobalSettingOptions.APPLICATIONS); + await appsResponse; + + await page.click( + `[data-testid="data-insights-application-card"] [data-testid="config-btn"]` + ); + + await page.getByRole('tab', { name: 'Recent Runs' }).click(); + + await page.click('[data-testid="stop-button"]'); + + await expect(page.getByTestId('stop-modal')).toBeVisible(); + + await page.getByRole('button', { name: 'Confirm' }).click(); + + await expect.poll(() => stopApiCalled, { timeout: 5000 }).toBe(true); + + expect(stopApiUrl).toContain(`runId=${MOCK_RUN_ID}`); + }); + + test('should close stop modal when cancel is clicked', async ({ page }) => { + await page.route(`**/api/v1/apps/name/${MOCK_APP_NAME}*`, async (route) => { + await route.fulfill({ json: MOCK_APP }); + }); + + await page.route( + `**/api/v1/apps/name/${MOCK_APP_NAME}/status*`, + async (route) => { + await route.fulfill({ + json: { + data: [ + { + ...MOCK_RUNNING_RUN, + id: `${MOCK_RUNNING_RUN.appId}-${MOCK_RUNNING_RUN.runType}-${MOCK_RUNNING_RUN.timestamp}`, + }, + ], + paging: { total: 1 }, + }, + }); + } + ); + + await page.route(/\/api\/v1\/apps\?/, async (route) => { + await route.fulfill({ + json: { + data: [MOCK_APP], + paging: { total: 1 }, + }, + }); + }); + + const appsResponse = page.waitForResponse( + `**/api/v1/apps?limit=15&include=non-deleted` + ); + await settingClick(page, GlobalSettingOptions.APPLICATIONS); + await appsResponse; + + await page.click( + `[data-testid="data-insights-application-card"] [data-testid="config-btn"]` + ); + + await page.getByRole('tab', { name: 'Recent Runs' }).click(); + + await page.click('[data-testid="stop-button"]'); + + await expect(page.getByTestId('stop-modal')).toBeVisible(); + + await page.getByRole('button', { name: 'Cancel' }).click(); + + await expect(page.getByTestId('stop-modal')).not.toBeVisible(); + }); +}); diff --git a/openmetadata-ui/src/main/resources/ui/src/components/Modals/StopScheduleRun/StopScheduleRunModal.interface.ts b/openmetadata-ui/src/main/resources/ui/src/components/Modals/StopScheduleRun/StopScheduleRunModal.interface.ts index fc7a0caf7b3c..e253c2b0a756 100644 --- a/openmetadata-ui/src/main/resources/ui/src/components/Modals/StopScheduleRun/StopScheduleRunModal.interface.ts +++ b/openmetadata-ui/src/main/resources/ui/src/components/Modals/StopScheduleRun/StopScheduleRunModal.interface.ts @@ -15,6 +15,7 @@ export interface StopScheduleRunModalProps { appName: string; isModalOpen: boolean; displayName: string; + runId?: string; onClose: () => void; onStopWorkflowsUpdate?: () => void; } diff --git a/openmetadata-ui/src/main/resources/ui/src/components/Modals/StopScheduleRun/StopScheduleRunModal.test.tsx b/openmetadata-ui/src/main/resources/ui/src/components/Modals/StopScheduleRun/StopScheduleRunModal.test.tsx index 660a7d2bca85..b5556ba89f5a 100644 --- a/openmetadata-ui/src/main/resources/ui/src/components/Modals/StopScheduleRun/StopScheduleRunModal.test.tsx +++ b/openmetadata-ui/src/main/resources/ui/src/components/Modals/StopScheduleRun/StopScheduleRunModal.test.tsx @@ -10,7 +10,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -import { fireEvent, render, screen, waitFor } from '@testing-library/react'; +import { + act, + fireEvent, + render, + screen, + waitFor, +} from '@testing-library/react'; import { stopApp } from '../../../rest/applicationAPI'; import { showErrorToast } from '../../../utils/ToastUtils'; import StopScheduleModal from './StopScheduleRunModal'; @@ -51,7 +57,7 @@ describe('StopScheduleModal', () => { const confirmButton = screen.getByText('label.confirm'); fireEvent.click(confirmButton); - expect(stopApp).toHaveBeenCalledWith('test-app'); + expect(stopApp).toHaveBeenCalledWith('test-app', undefined); await waitFor(() => { expect(mockProps.onStopWorkflowsUpdate).toHaveBeenCalled(); @@ -65,9 +71,11 @@ describe('StopScheduleModal', () => { render(); const confirmButton = screen.getByText('label.confirm'); - fireEvent.click(confirmButton); + await act(async () => { + fireEvent.click(confirmButton); + }); - expect(stopApp).toHaveBeenCalledWith('test-app'); + expect(stopApp).toHaveBeenCalledWith('test-app', undefined); await waitFor(() => { expect(showErrorToast).toHaveBeenCalledWith(new Error('API Error')); @@ -83,4 +91,30 @@ describe('StopScheduleModal', () => { expect(mockProps.onClose).toHaveBeenCalled(); }); + + it('should pass runId to stopApp when provided', async () => { + (stopApp as jest.Mock).mockResolvedValueOnce({ status: 200 }); + + render(); + + const confirmButton = screen.getByText('label.confirm'); + fireEvent.click(confirmButton); + + expect(stopApp).toHaveBeenCalledWith('test-app', 'run-123'); + + await waitFor(() => { + expect(mockProps.onClose).toHaveBeenCalled(); + }); + }); + + it('should call stopApp without runId when not provided', async () => { + (stopApp as jest.Mock).mockResolvedValueOnce({ status: 200 }); + + render(); + + const confirmButton = screen.getByText('label.confirm'); + fireEvent.click(confirmButton); + + expect(stopApp).toHaveBeenCalledWith('test-app', undefined); + }); }); diff --git a/openmetadata-ui/src/main/resources/ui/src/components/Modals/StopScheduleRun/StopScheduleRunModal.tsx b/openmetadata-ui/src/main/resources/ui/src/components/Modals/StopScheduleRun/StopScheduleRunModal.tsx index 3340a1370a8d..77566394aa0d 100644 --- a/openmetadata-ui/src/main/resources/ui/src/components/Modals/StopScheduleRun/StopScheduleRunModal.tsx +++ b/openmetadata-ui/src/main/resources/ui/src/components/Modals/StopScheduleRun/StopScheduleRunModal.tsx @@ -23,6 +23,7 @@ const StopScheduleModal: FC = ({ appName, isModalOpen, displayName, + runId, onClose, onStopWorkflowsUpdate, }) => { @@ -32,7 +33,7 @@ const StopScheduleModal: FC = ({ const handleConfirm = async () => { setIsLoading(true); try { - const { status } = await stopApp(appName); + const { status } = await stopApp(appName, runId); if (status === 200) { showSuccessToast( t('message.application-stop', { @@ -56,19 +57,20 @@ const StopScheduleModal: FC = ({ cancelText={t('label.cancel')} closable={false} confirmLoading={isLoading} - data-testid="stop-modal" maskClosable={false} okText={t('label.confirm')} open={isModalOpen} title={`${t('label.stop')} ${displayName} ?`} onCancel={onClose} onOk={handleConfirm}> - - {t('message.are-you-sure-action-property', { - action: 'Stop', - propertyName: displayName, - })} - +
+ + {t('message.are-you-sure-action-property', { + action: 'Stop', + propertyName: displayName, + })} + +
); }; diff --git a/openmetadata-ui/src/main/resources/ui/src/components/Settings/Applications/AppRunsHistory/AppRunsHistory.component.tsx b/openmetadata-ui/src/main/resources/ui/src/components/Settings/Applications/AppRunsHistory/AppRunsHistory.component.tsx index 0523e5b3dad2..34eb42f346a2 100644 --- a/openmetadata-ui/src/main/resources/ui/src/components/Settings/Applications/AppRunsHistory/AppRunsHistory.component.tsx +++ b/openmetadata-ui/src/main/resources/ui/src/components/Settings/Applications/AppRunsHistory/AppRunsHistory.component.tsx @@ -86,6 +86,9 @@ const AppRunsHistory = forwardRef( >([]); const [expandedRowKeys, setExpandedRowKeys] = useState([]); const [isStopModalOpen, setIsStopModalOpen] = useState(false); + const [selectedRunId, setSelectedRunId] = useState( + undefined + ); const [showConfigModal, setShowConfigModal] = useState(false); const [appRunRecordConfig, setAppRunRecordConfig] = useState< AppRunRecord['config'] @@ -135,16 +138,21 @@ const AppRunsHistory = forwardRef( }, [appData, appRunsHistoryData, isExternalApp]); const handleRowExpandable = useCallback( - (key?: string) => { + (key?: string, record?: AppRunRecordWithId) => { if (key) { if (isExternalApp && appData) { - return navigate( - getLogsViewerPath( - GlobalSettingOptions.APPLICATIONS, - appData.name ?? '', - appData.name ?? '' - ) + const basePath = getLogsViewerPath( + GlobalSettingOptions.APPLICATIONS, + appData.name ?? '', + appData.name ?? '' ); + const rawRunId = record?.properties?.pipelineRunId; + const runId = typeof rawRunId === 'string' ? rawRunId : undefined; + const path = runId + ? `${basePath}?runId=${encodeURIComponent(runId)}` + : basePath; + + return navigate(path); } if (expandedRowKeys.includes(key)) { setExpandedRowKeys((prev) => prev.filter((item) => item !== key)); @@ -187,7 +195,7 @@ const AppRunsHistory = forwardRef( disabled={showLogAction(record)} size="small" type="link" - onClick={() => handleRowExpandable(record.id)}> + onClick={() => handleRowExpandable(record.id, record)}> {t('label.log-plural')} - {/* For status running or activewitherror and supportsInterrupt is true, show stop button */} - {(record.status === Status.Running || - record.status === Status.ActiveError) && + {record.status !== Status.Success && + record.status !== Status.Failed && + record.status !== Status.Stopped && + record.status !== Status.Completed && + record.status !== Status.StopInProgress && Boolean(appData?.supportsInterrupt) && ( )} @@ -452,8 +468,10 @@ const AppRunsHistory = forwardRef( appName={fqn} displayName={appData?.displayName ?? ''} isModalOpen={isStopModalOpen} + runId={selectedRunId} onClose={() => { setIsStopModalOpen(false); + setSelectedRunId(undefined); }} onStopWorkflowsUpdate={() => { fetchAppHistory(); diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/ingestionPipelines/ingestionPipeline.ts b/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/ingestionPipelines/ingestionPipeline.ts index 13f3b76e03d4..4fe64029ee42 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/ingestionPipelines/ingestionPipeline.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/ingestionPipelines/ingestionPipeline.ts @@ -764,6 +764,7 @@ export enum PipelineState { PartialSuccess = "partialSuccess", Queued = "queued", Running = "running", + Stopped = "stopped", Success = "success", } diff --git a/openmetadata-ui/src/main/resources/ui/src/pages/LogsViewerPage/LogsViewerPage.component.test.tsx b/openmetadata-ui/src/main/resources/ui/src/pages/LogsViewerPage/LogsViewerPage.component.test.tsx index 5848c9bec960..eb892f4138f6 100644 --- a/openmetadata-ui/src/main/resources/ui/src/pages/LogsViewerPage/LogsViewerPage.component.test.tsx +++ b/openmetadata-ui/src/main/resources/ui/src/pages/LogsViewerPage/LogsViewerPage.component.test.tsx @@ -43,6 +43,7 @@ jest.mock('react-router-dom', () => ({ ingestionName: 'ingestion_123456', }), useNavigate: jest.fn(), + useSearchParams: jest.fn(() => [new URLSearchParams(), jest.fn()]), })); jest.mock('../../utils/EntityUtils', () => ({ @@ -679,6 +680,48 @@ describe('LogsViewerPage.component', () => { }); }); + it('should pass runId from search params to getLatestApplicationRuns', async () => { + const mockUseSearchParams = + jest.requireMock('react-router-dom').useSearchParams; + mockUseSearchParams.mockReturnValue([ + new URLSearchParams({ runId: 'run-abc-123' }), + jest.fn(), + ]); + + (useParams as jest.Mock).mockReturnValue({ + logEntityType: 'apps', + fqn: 'DataInsightsApplication', + }); + + render(); + + await waitFor(() => { + expect(getLatestApplicationRuns).toHaveBeenCalledWith( + 'DataInsightsApplication', + 'run-abc-123' + ); + }); + + // Reset to default mock + mockUseSearchParams.mockReturnValue([new URLSearchParams(), jest.fn()]); + }); + + it('should pass undefined runId to getLatestApplicationRuns when not in search params', async () => { + (useParams as jest.Mock).mockReturnValue({ + logEntityType: 'apps', + fqn: 'DataInsightsApplication', + }); + + render(); + + await waitFor(() => { + expect(getLatestApplicationRuns).toHaveBeenCalledWith( + 'DataInsightsApplication', + undefined + ); + }); + }); + it('should handle error when fetching application details', async () => { (useParams as jest.Mock).mockReturnValue({ logEntityType: 'apps', diff --git a/openmetadata-ui/src/main/resources/ui/src/pages/LogsViewerPage/LogsViewerPage.tsx b/openmetadata-ui/src/main/resources/ui/src/pages/LogsViewerPage/LogsViewerPage.tsx index 412201a600d7..95895b93b37f 100644 --- a/openmetadata-ui/src/main/resources/ui/src/pages/LogsViewerPage/LogsViewerPage.tsx +++ b/openmetadata-ui/src/main/resources/ui/src/pages/LogsViewerPage/LogsViewerPage.tsx @@ -36,6 +36,7 @@ import { useState, } from 'react'; import { useTranslation } from 'react-i18next'; +import { useSearchParams } from 'react-router-dom'; import { ReactComponent as TimeDateIcon } from '../../assets/svg/time-date.svg'; import { CopyToClipboardButton } from '../../components/common/CopyToClipboardButton/CopyToClipboardButton'; import ErrorPlaceHolder from '../../components/common/ErrorWithPlaceholder/ErrorPlaceHolder'; @@ -84,6 +85,8 @@ import { LogViewerParams } from './LogsViewerPage.interfaces'; const LogsViewerPage = () => { const { logEntityType } = useRequiredParams(); const { fqn: ingestionName } = useFqn(); + const [searchParams] = useSearchParams(); + const runId = searchParams.get('runId') ?? undefined; const { t } = useTranslation(); const theme = useTheme(); @@ -116,7 +119,7 @@ const LogsViewerPage = () => { endTs: currentTime, }); - const logs = await getLatestApplicationRuns(ingestionName); + const logs = await getLatestApplicationRuns(ingestionName, runId); setAppRuns(data); setLogs(logs.data_insight_task || logs.application_task); @@ -314,7 +317,7 @@ const LogsViewerPage = () => { }.log`; if (isApplicationType) { - const logs = await downloadAppLogs(ingestionName); + const logs = await downloadAppLogs(ingestionName, runId); fileName = `${ingestionName}.log`; const element = document.createElement('a'); const file = new Blob([logs || ''], { type: 'text/plain' }); @@ -344,6 +347,7 @@ const LogsViewerPage = () => { ingestionName, isApplicationType, reset, + runId, updateProgress, ]); @@ -520,7 +524,7 @@ const LogsViewerPage = () => { } else { fetchIngestionDetailsByName(); } - }, []); + }, [runId]); return ( diff --git a/openmetadata-ui/src/main/resources/ui/src/rest/applicationAPI.ts b/openmetadata-ui/src/main/resources/ui/src/rest/applicationAPI.ts index 5b373d77e8a9..2a7a268814ea 100644 --- a/openmetadata-ui/src/main/resources/ui/src/rest/applicationAPI.ts +++ b/openmetadata-ui/src/main/resources/ui/src/rest/applicationAPI.ts @@ -102,9 +102,15 @@ export const getExternalApplicationRuns = async ( return response.data; }; -export const getLatestApplicationRuns = async (appName: string) => { +export const getLatestApplicationRuns = async ( + appName: string, + runId?: string +) => { const response = await APIClient.get( - `${BASE_URL}/name/${getEncodedFqn(appName)}/logs` + `${BASE_URL}/name/${getEncodedFqn(appName)}/logs`, + { + params: runId ? { runId } : undefined, + } ); return response.data; @@ -155,14 +161,25 @@ export const restoreApp = async (id: string) => { return response.data; }; -export const stopApp = async (name: string) => { - return await APIClient.post(`${BASE_URL}/stop/${getEncodedFqn(name)}`); +export const stopApp = async (name: string, runId?: string) => { + return await APIClient.post( + `${BASE_URL}/stop/${getEncodedFqn(name)}`, + undefined, + { + params: runId ? { runId } : undefined, + } + ); }; -export const getApplicationLogs = (appName: string, after?: string) => { +export const getApplicationLogs = ( + appName: string, + after?: string, + runId?: string +) => { return APIClient.get(`${BASE_URL}/name/${appName}/logs`, { params: { after, + ...(runId && { runId }), }, }); }; diff --git a/openmetadata-ui/src/main/resources/ui/src/utils/IngestionLogs/LogsUtils.ts b/openmetadata-ui/src/main/resources/ui/src/utils/IngestionLogs/LogsUtils.ts index 5e22a9e955e2..9f72bf049d69 100644 --- a/openmetadata-ui/src/main/resources/ui/src/utils/IngestionLogs/LogsUtils.ts +++ b/openmetadata-ui/src/main/resources/ui/src/utils/IngestionLogs/LogsUtils.ts @@ -62,7 +62,8 @@ export const getLogsFromResponse = ( export const fetchLogsRecursively = async ( ingestionId: string, pipelineType: string, - after?: string + after?: string, + runId?: string ) => { let logs = ''; @@ -70,7 +71,7 @@ export const fetchLogsRecursively = async ( data: { total, after: afterCursor, ...rest }, } = pipelineType === PipelineType.Application - ? await getApplicationLogs(ingestionId, after) + ? await getApplicationLogs(ingestionId, after, runId) : await getIngestionPipelineLogById(ingestionId, after); logs = logs.concat(getLogsFromResponse(rest, pipelineType)); if (afterCursor && total) { @@ -78,7 +79,7 @@ export const fetchLogsRecursively = async ( useDownloadProgressStore.getState().updateProgress(progress); logs = logs.concat( - await fetchLogsRecursively(ingestionId, pipelineType, afterCursor) + await fetchLogsRecursively(ingestionId, pipelineType, afterCursor, runId) ); } @@ -101,13 +102,18 @@ export const downloadIngestionLog = async (ingestionId?: string) => { } }; -export const downloadAppLogs = async (appName?: string) => { +export const downloadAppLogs = async (appName?: string, runId?: string) => { if (!appName) { return ''; } try { - return await fetchLogsRecursively(appName, PipelineType.Application); + return await fetchLogsRecursively( + appName, + PipelineType.Application, + undefined, + runId + ); } catch (err) { showErrorToast(err as AxiosError);