Skip to content

Commit d562cc4

Browse files
committed
Fix per-run log fetching
1 parent c877d69 commit d562cc4

File tree

8 files changed

+108
-23
lines changed

8 files changed

+108
-23
lines changed

openmetadata-service/src/main/java/org/openmetadata/service/clients/pipeline/MeteredPipelineServiceClient.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,13 @@ public Map<String, String> getLastIngestionLogs(
136136
GET_LOGS, () -> this.decoratedClient.getLastIngestionLogs(ingestionPipeline, after));
137137
}
138138

139+
@Override
140+
public Map<String, String> getIngestionLogs(
141+
IngestionPipeline ingestionPipeline, String after, String runId) {
142+
return executeWithMetering(
143+
GET_LOGS, () -> this.decoratedClient.getIngestionLogs(ingestionPipeline, after, runId));
144+
}
145+
139146
public PipelineServiceClientResponse killIngestion(IngestionPipeline ingestionPipeline) {
140147
return this.respondWithMetering(
141148
KILL, () -> this.decoratedClient.killIngestion(ingestionPipeline));

openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/IngestionPipelineRepository.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -718,6 +718,41 @@ public PipelineStatus getPipelineStatus(String ingestionPipelineFQN, String runI
718718
PipelineStatus.class);
719719
}
720720

721+
/**
722+
* Upsert only the time-series record for a specific run without overwriting the pipeline-level
723+
* current status. Use this when stopping a specific run while other runs may still be active.
724+
* Inserts a new record if none exists for the runId, otherwise updates the existing one.
725+
*/
726+
public void updatePipelineStatusByRunId(String fqn, PipelineStatus pipelineStatus) {
727+
IngestionPipeline ingestionPipeline = findByName(fqn, Include.NON_DELETED);
728+
String pipelineFqn = ingestionPipeline.getFullyQualifiedName();
729+
String json = JsonUtils.pojoToJson(pipelineStatus);
730+
PipelineStatus storedPipelineStatus =
731+
JsonUtils.readValue(
732+
daoCollection
733+
.entityExtensionTimeSeriesDao()
734+
.getLatestExtensionByKey(
735+
RUN_ID_EXTENSION_KEY,
736+
pipelineStatus.getRunId(),
737+
pipelineFqn,
738+
PIPELINE_STATUS_EXTENSION),
739+
PipelineStatus.class);
740+
if (storedPipelineStatus != null) {
741+
daoCollection
742+
.entityExtensionTimeSeriesDao()
743+
.updateExtensionByKey(
744+
RUN_ID_EXTENSION_KEY,
745+
pipelineStatus.getRunId(),
746+
pipelineFqn,
747+
PIPELINE_STATUS_EXTENSION,
748+
json);
749+
} else {
750+
daoCollection
751+
.entityExtensionTimeSeriesDao()
752+
.insert(pipelineFqn, PIPELINE_STATUS_EXTENSION, PIPELINE_STATUS_JSON_SCHEMA, json);
753+
}
754+
}
755+
721756
@Transaction
722757
public IngestionPipeline deletePipelineStatusByRunId(UUID ingestionPipelineId, UUID runId) {
723758
IngestionPipeline ingestionPipeline = find(ingestionPipelineId, Include.NON_DELETED);

openmetadata-service/src/main/java/org/openmetadata/service/resources/apps/AppResource.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -535,7 +535,14 @@ public Response getLastLogs(
535535
schema = @Schema(type = "string"))
536536
@QueryParam("after")
537537
@DefaultValue("")
538-
String after) {
538+
String after,
539+
@Parameter(
540+
description =
541+
"Pipeline run ID (Argo workflow UID) to fetch logs for a specific run. "
542+
+ "If not provided, returns logs for the latest run.",
543+
schema = @Schema(type = "string"))
544+
@QueryParam("runId")
545+
String runId) {
539546
App installation = repository.getByName(uriInfo, name, repository.getFields("id,pipelines"));
540547
if (installation.getAppType().equals(AppType.Internal)) {
541548
AppRunRecord latestRun = repository.getLatestAppRunsOptional(installation).orElse(null);
@@ -552,7 +559,7 @@ public Response getLastLogs(
552559
ingestionPipelineRepository.get(
553560
uriInfo, pipelineRef.getId(), ingestionPipelineRepository.getFields(FIELD_OWNERS));
554561
return Response.ok(
555-
pipelineServiceClient.getLastIngestionLogs(ingestionPipeline, after),
562+
pipelineServiceClient.getIngestionLogs(ingestionPipeline, after, runId),
556563
MediaType.APPLICATION_JSON_TYPE)
557564
.build();
558565
}
@@ -1338,8 +1345,11 @@ private void markPipelineStatusAsStopped(
13381345
if (!PipelineStatusUtils.isTerminalState(status.getPipelineState())) {
13391346
status.setPipelineState(PipelineStatusType.STOPPED);
13401347
status.setEndDate(System.currentTimeMillis());
1341-
ingestionPipelineRepository.addPipelineStatus(
1342-
uriInfo, ingestionPipeline.getFullyQualifiedName(), status);
1348+
// Use updatePipelineStatusByRunId instead of addPipelineStatus to avoid overwriting
1349+
// the pipeline-level current status. When stopping a specific run, other runs may still
1350+
// be active and their status should not be affected.
1351+
ingestionPipelineRepository.updatePipelineStatusByRunId(
1352+
ingestionPipeline.getFullyQualifiedName(), status);
13431353
}
13441354
} catch (Exception e) {
13451355
LOG.error(

openmetadata-spec/src/main/java/org/openmetadata/sdk/PipelineServiceClientInterface.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,13 @@ default PipelineServiceClientResponse runPipeline(
127127
/* Get the all last run logs of a deployed pipeline */
128128
Map<String, String> getLastIngestionLogs(IngestionPipeline ingestionPipeline, String after);
129129

130+
/* Get logs for a specific pipeline run identified by runId (e.g. Argo workflow UID).
131+
* When runId is null or blank, falls back to getLastIngestionLogs (latest run). */
132+
default Map<String, String> getIngestionLogs(
133+
IngestionPipeline ingestionPipeline, String after, String runId) {
134+
return getLastIngestionLogs(ingestionPipeline, after);
135+
}
136+
130137
/* Get the all last run logs of a deployed pipeline */
131138
PipelineServiceClientResponse killIngestion(IngestionPipeline ingestionPipeline);
132139

openmetadata-ui/src/main/resources/ui/src/components/Settings/Applications/AppRunsHistory/AppRunsHistory.component.tsx

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -138,16 +138,21 @@ const AppRunsHistory = forwardRef(
138138
}, [appData, appRunsHistoryData, isExternalApp]);
139139

140140
const handleRowExpandable = useCallback(
141-
(key?: string) => {
141+
(key?: string, record?: AppRunRecordWithId) => {
142142
if (key) {
143143
if (isExternalApp && appData) {
144-
return navigate(
145-
getLogsViewerPath(
146-
GlobalSettingOptions.APPLICATIONS,
147-
appData.name ?? '',
148-
appData.name ?? ''
149-
)
144+
const basePath = getLogsViewerPath(
145+
GlobalSettingOptions.APPLICATIONS,
146+
appData.name ?? '',
147+
appData.name ?? ''
150148
);
149+
const rawRunId = record?.properties?.pipelineRunId;
150+
const runId = typeof rawRunId === 'string' ? rawRunId : undefined;
151+
const path = runId
152+
? `${basePath}?runId=${encodeURIComponent(runId)}`
153+
: basePath;
154+
155+
return navigate(path);
151156
}
152157
if (expandedRowKeys.includes(key)) {
153158
setExpandedRowKeys((prev) => prev.filter((item) => item !== key));
@@ -190,7 +195,7 @@ const AppRunsHistory = forwardRef(
190195
disabled={showLogAction(record)}
191196
size="small"
192197
type="link"
193-
onClick={() => handleRowExpandable(record.id)}>
198+
onClick={() => handleRowExpandable(record.id, record)}>
194199
{t('label.log-plural')}
195200
</Button>
196201
<Button

openmetadata-ui/src/main/resources/ui/src/pages/LogsViewerPage/LogsViewerPage.tsx

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import {
3636
useState,
3737
} from 'react';
3838
import { useTranslation } from 'react-i18next';
39+
import { useSearchParams } from 'react-router-dom';
3940
import { ReactComponent as TimeDateIcon } from '../../assets/svg/time-date.svg';
4041
import { CopyToClipboardButton } from '../../components/common/CopyToClipboardButton/CopyToClipboardButton';
4142
import ErrorPlaceHolder from '../../components/common/ErrorWithPlaceholder/ErrorPlaceHolder';
@@ -84,6 +85,8 @@ import { LogViewerParams } from './LogsViewerPage.interfaces';
8485
const LogsViewerPage = () => {
8586
const { logEntityType } = useRequiredParams<LogViewerParams>();
8687
const { fqn: ingestionName } = useFqn();
88+
const [searchParams] = useSearchParams();
89+
const runId = searchParams.get('runId') ?? undefined;
8790

8891
const { t } = useTranslation();
8992
const theme = useTheme();
@@ -116,7 +119,7 @@ const LogsViewerPage = () => {
116119
endTs: currentTime,
117120
});
118121

119-
const logs = await getLatestApplicationRuns(ingestionName);
122+
const logs = await getLatestApplicationRuns(ingestionName, runId);
120123
setAppRuns(data);
121124
setLogs(logs.data_insight_task || logs.application_task);
122125

@@ -314,7 +317,7 @@ const LogsViewerPage = () => {
314317
}.log`;
315318

316319
if (isApplicationType) {
317-
const logs = await downloadAppLogs(ingestionName);
320+
const logs = await downloadAppLogs(ingestionName, runId);
318321
fileName = `${ingestionName}.log`;
319322
const element = document.createElement('a');
320323
const file = new Blob([logs || ''], { type: 'text/plain' });
@@ -344,6 +347,7 @@ const LogsViewerPage = () => {
344347
ingestionName,
345348
isApplicationType,
346349
reset,
350+
runId,
347351
updateProgress,
348352
]);
349353

@@ -520,7 +524,7 @@ const LogsViewerPage = () => {
520524
} else {
521525
fetchIngestionDetailsByName();
522526
}
523-
}, []);
527+
}, [runId]);
524528

525529
return (
526530
<PageLayoutV1 pageTitle={t('label.log-viewer')}>

openmetadata-ui/src/main/resources/ui/src/rest/applicationAPI.ts

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -102,9 +102,15 @@ export const getExternalApplicationRuns = async (
102102
return response.data;
103103
};
104104

105-
export const getLatestApplicationRuns = async (appName: string) => {
105+
export const getLatestApplicationRuns = async (
106+
appName: string,
107+
runId?: string
108+
) => {
106109
const response = await APIClient.get<DataInsightLatestRun>(
107-
`${BASE_URL}/name/${getEncodedFqn(appName)}/logs`
110+
`${BASE_URL}/name/${getEncodedFqn(appName)}/logs`,
111+
{
112+
params: runId ? { runId } : undefined,
113+
}
108114
);
109115

110116
return response.data;
@@ -165,10 +171,15 @@ export const stopApp = async (name: string, runId?: string) => {
165171
);
166172
};
167173

168-
export const getApplicationLogs = (appName: string, after?: string) => {
174+
export const getApplicationLogs = (
175+
appName: string,
176+
after?: string,
177+
runId?: string
178+
) => {
169179
return APIClient.get(`${BASE_URL}/name/${appName}/logs`, {
170180
params: {
171181
after,
182+
...(runId && { runId }),
172183
},
173184
});
174185
};

openmetadata-ui/src/main/resources/ui/src/utils/IngestionLogs/LogsUtils.ts

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -62,23 +62,24 @@ export const getLogsFromResponse = (
6262
export const fetchLogsRecursively = async (
6363
ingestionId: string,
6464
pipelineType: string,
65-
after?: string
65+
after?: string,
66+
runId?: string
6667
) => {
6768
let logs = '';
6869

6970
const {
7071
data: { total, after: afterCursor, ...rest },
7172
} =
7273
pipelineType === PipelineType.Application
73-
? await getApplicationLogs(ingestionId, after)
74+
? await getApplicationLogs(ingestionId, after, runId)
7475
: await getIngestionPipelineLogById(ingestionId, after);
7576
logs = logs.concat(getLogsFromResponse(rest, pipelineType));
7677
if (afterCursor && total) {
7778
const progress = round((Number(afterCursor) * 100) / Number(total));
7879
useDownloadProgressStore.getState().updateProgress(progress);
7980

8081
logs = logs.concat(
81-
await fetchLogsRecursively(ingestionId, pipelineType, afterCursor)
82+
await fetchLogsRecursively(ingestionId, pipelineType, afterCursor, runId)
8283
);
8384
}
8485

@@ -101,13 +102,18 @@ export const downloadIngestionLog = async (ingestionId?: string) => {
101102
}
102103
};
103104

104-
export const downloadAppLogs = async (appName?: string) => {
105+
export const downloadAppLogs = async (appName?: string, runId?: string) => {
105106
if (!appName) {
106107
return '';
107108
}
108109

109110
try {
110-
return await fetchLogsRecursively(appName, PipelineType.Application);
111+
return await fetchLogsRecursively(
112+
appName,
113+
PipelineType.Application,
114+
undefined,
115+
runId
116+
);
111117
} catch (err) {
112118
showErrorToast(err as AxiosError);
113119

0 commit comments

Comments
 (0)