diff --git a/bootstrap/sql/migrations/native/1.14.0/mysql/schemaChanges.sql b/bootstrap/sql/migrations/native/1.14.0/mysql/schemaChanges.sql new file mode 100644 index 000000000000..13e3691aae30 --- /dev/null +++ b/bootstrap/sql/migrations/native/1.14.0/mysql/schemaChanges.sql @@ -0,0 +1,19 @@ +ALTER TABLE workflow_instance_time_series + ADD COLUMN scheduleRunId VARCHAR(36) + GENERATED ALWAYS AS (json ->> '$.scheduleRunId'); +ALTER TABLE workflow_instance_time_series + ADD INDEX idx_workflow_instance_schedule_run_id (scheduleRunId); + +ALTER TABLE workflow_instance_state_time_series + ADD COLUMN scheduleRunId VARCHAR(36) + GENERATED ALWAYS AS (json ->> '$.scheduleRunId'); +ALTER TABLE workflow_instance_state_time_series + ADD INDEX idx_workflow_instance_state_schedule_run_id (scheduleRunId); + +-- Marker: forces v1140 Java runDataMigration() to re-run via the reprocessing path on existing +-- DBs after fixing migrateWorkflowJson trigger-output bug in MigrationUtil.java. Idempotent. +DELETE FROM SERVER_MIGRATION_SQL_LOGS WHERE 1=0; + +-- Update entityLink generated column to read from global_entityList[0] instead of global_relatedEntity +ALTER TABLE workflow_instance_time_series +MODIFY COLUMN entityLink TEXT GENERATED ALWAYS AS (json ->> '$.variables.global_entityList[0]'); diff --git a/bootstrap/sql/migrations/native/1.14.0/postgres/schemaChanges.sql b/bootstrap/sql/migrations/native/1.14.0/postgres/schemaChanges.sql new file mode 100644 index 000000000000..7b889aff3b8a --- /dev/null +++ b/bootstrap/sql/migrations/native/1.14.0/postgres/schemaChanges.sql @@ -0,0 +1,20 @@ +ALTER TABLE workflow_instance_time_series + ADD COLUMN scheduleRunId VARCHAR(36) + GENERATED ALWAYS AS ((json ->> 'scheduleRunId')) STORED; +CREATE INDEX idx_workflow_instance_schedule_run_id + ON workflow_instance_time_series (scheduleRunId); + +ALTER TABLE workflow_instance_state_time_series + ADD COLUMN scheduleRunId VARCHAR(36) + GENERATED ALWAYS AS ((json ->> 'scheduleRunId')) STORED; +CREATE INDEX idx_workflow_instance_state_schedule_run_id + ON workflow_instance_state_time_series (scheduleRunId); + +-- Marker: forces v1140 Java runDataMigration() to re-run via the reprocessing path on existing +-- DBs after fixing migrateWorkflowJson trigger-output bug in MigrationUtil.java. Idempotent. +DELETE FROM SERVER_MIGRATION_SQL_LOGS WHERE 1=0; + +-- Update entityLink generated column to read from global_entityList[0] instead of global_relatedEntity +ALTER TABLE workflow_instance_time_series DROP COLUMN entityLink; +ALTER TABLE workflow_instance_time_series +ADD COLUMN entityLink TEXT GENERATED ALWAYS AS ((json -> 'variables' -> 'global_entityList' ->> 0)) STORED; diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/Workflow.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/Workflow.java index 09ad9274dd51..0911ad44a7c7 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/Workflow.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/Workflow.java @@ -18,6 +18,7 @@ public class Workflow { public static final String FALSE_ENTITY_LIST_VARIABLE = "false_entityList"; public static final String HAS_TRUE_ENTITIES_VARIABLE = "hasTrueEntities"; public static final String HAS_FALSE_ENTITIES_VARIABLE = "hasFalseEntities"; + public static final String PROCESSED_FQNS_VARIABLE = "processedFqnsInRun"; public static final String BATCH_SINK_PROCESSED_VARIABLE = "batchSinkProcessed"; public static final String TRIGGERING_OBJECT_ID_VARIABLE = "triggeringObjectId"; public static final String RECOGNIZER_FEEDBACK = "recognizerFeedback"; @@ -26,6 +27,7 @@ public class Workflow { public static final String STAGE_INSTANCE_STATE_ID_VARIABLE = "stageInstanceStateId"; public static final String WORKFLOW_INSTANCE_EXECUTION_ID_VARIABLE = "workflowInstanceExecutionId"; + public static final String WORKFLOW_SCHEDULE_RUN_ID_VARIABLE = "scheduleRunId"; public static final String WORKFLOW_RUNTIME_EXCEPTION = "workflowRuntimeException"; public static final String EXCEPTION_VARIABLE = "exception"; public static final String FAILURE_VARIABLE = "failure"; diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowFailureListener.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowFailureListener.java index 073f05f2be5d..50a21d2e1563 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowFailureListener.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowFailureListener.java @@ -5,6 +5,8 @@ import static org.flowable.common.engine.api.delegate.event.FlowableEngineEventType.PROCESS_COMPLETED_WITH_ERROR_END_EVENT; import static org.openmetadata.service.governance.workflows.Workflow.EXCEPTION_VARIABLE; import static org.openmetadata.service.governance.workflows.Workflow.GLOBAL_NAMESPACE; +import static org.openmetadata.service.governance.workflows.Workflow.WORKFLOW_SCHEDULE_RUN_ID_VARIABLE; +import static org.openmetadata.service.governance.workflows.WorkflowVariableHandler.getNamespacedVariableName; import java.util.HashMap; import java.util.Map; @@ -22,6 +24,7 @@ import org.openmetadata.schema.governance.workflows.WorkflowDefinition; import org.openmetadata.service.Entity; import org.openmetadata.service.governance.workflows.elements.TriggerFactory; +import org.openmetadata.service.jdbi3.NewStageRequest; import org.openmetadata.service.jdbi3.WorkflowDefinitionRepository; import org.openmetadata.service.jdbi3.WorkflowInstanceRepository; import org.openmetadata.service.jdbi3.WorkflowInstanceStateRepository; @@ -210,14 +213,18 @@ private void addFailureStage( Entity.getEntityTimeSeriesRepository(Entity.WORKFLOW_INSTANCE_STATE); UUID executionId = UUID.nameUUIDFromBytes(processInstanceId.getBytes()); + UUID scheduleRunId = readScheduleRunIdFromProcess(processInstanceId); UUID stageId = stateRepository.addNewStageToInstance( - WORKFLOW_FAILURE_LISTENER_STAGE, - executionId, - workflowInstanceId, - workflowName, - System.currentTimeMillis()); + new NewStageRequest( + WORKFLOW_FAILURE_LISTENER_STAGE, + executionId, + workflowInstanceId, + workflowName, + System.currentTimeMillis(), + scheduleRunId, + null)); Map stageData = new HashMap<>(); stageData.put("status", "FAILED"); @@ -225,7 +232,7 @@ private void addFailureStage( stageData.put("processInstanceId", processInstanceId); stageData.put("exception", errorMessage); - stateRepository.updateStage(stageId, System.currentTimeMillis(), stageData); + stateRepository.updateStage(stageId, System.currentTimeMillis(), null, stageData); LOG.info( "[WorkflowFailure] FAILURE_STAGE_ADDED: workflowInstanceId={}, stageId={}", @@ -241,6 +248,28 @@ private void addFailureStage( } } + private UUID readScheduleRunIdFromProcess(String processInstanceId) { + try { + RuntimeService runtimeService = ProcessEngines.getDefaultProcessEngine().getRuntimeService(); + Object plain = + runtimeService.getVariable(processInstanceId, WORKFLOW_SCHEDULE_RUN_ID_VARIABLE); + if (plain != null) { + return WorkflowScheduleRunIdReader.toUuid(plain); + } + Object namespaced = + runtimeService.getVariable( + processInstanceId, + getNamespacedVariableName(GLOBAL_NAMESPACE, WORKFLOW_SCHEDULE_RUN_ID_VARIABLE)); + return namespaced != null ? WorkflowScheduleRunIdReader.toUuid(namespaced) : null; + } catch (Exception e) { + LOG.debug( + "[WorkflowFailure] Could not read scheduleRunId for process {}: {}", + processInstanceId, + e.getMessage()); + return null; + } + } + private boolean isStageStatusEnabled(String workflowDefinitionKey) { try { WorkflowDefinitionRepository workflowDefRepository = diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowHandler.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowHandler.java index 49137c437668..6dff9a2b618b 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowHandler.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowHandler.java @@ -1,5 +1,7 @@ package org.openmetadata.service.governance.workflows; +import static org.openmetadata.service.governance.workflows.Workflow.GLOBAL_NAMESPACE; +import static org.openmetadata.service.governance.workflows.Workflow.UPDATED_BY_VARIABLE; import static org.openmetadata.service.governance.workflows.WorkflowVariableHandler.getNamespacedVariableName; import static org.openmetadata.service.governance.workflows.elements.TriggerFactory.getTriggerWorkflowId; @@ -1109,17 +1111,22 @@ public boolean isActivityWithVariableExecuting( } public boolean triggerWorkflow(String workflowName) { + return triggerWorkflow(workflowName, null); + } + + public boolean triggerWorkflow(String workflowName, String triggeredBy) { RuntimeService runtimeService = processEngine.getRuntimeService(); RepositoryService repositoryService = processEngine.getRepositoryService(); String baseProcessKey = getTriggerWorkflowId(workflowName); + Map initialVars = buildInitialVariables(triggeredBy); // Prefer the current workflow definition config to avoid triggering stale process keys left // behind by older deployments. List configuredTriggerKeys = getConfiguredPeriodicTriggerProcessKeys(workflowName, baseProcessKey); if (!configuredTriggerKeys.isEmpty()) { - return triggerProcessDefinitions(runtimeService, configuredTriggerKeys); + return triggerProcessDefinitions(runtimeService, configuredTriggerKeys, initialVars); } // Legacy fallback: trigger all latest process definitions matching the workflow prefix. @@ -1131,12 +1138,14 @@ public boolean triggerWorkflow(String workflowName) { .list(); if (!processDefinitions.isEmpty()) { return triggerProcessDefinitions( - runtimeService, processDefinitions.stream().map(ProcessDefinition::getKey).toList()); + runtimeService, + processDefinitions.stream().map(ProcessDefinition::getKey).toList(), + initialVars); } // Fallback to original behavior for non-periodic trigger types. try { - runtimeService.startProcessInstanceByKey(baseProcessKey); + runtimeService.startProcessInstanceByKey(baseProcessKey, initialVars); return true; } catch (FlowableObjectNotFoundException ex) { LOG.error("No process definition found for key: {}", baseProcessKey); @@ -1144,13 +1153,21 @@ public boolean triggerWorkflow(String workflowName) { } } + private Map buildInitialVariables(String triggeredBy) { + Map vars = new HashMap<>(); + if (triggeredBy != null && !triggeredBy.isEmpty()) { + vars.put(getNamespacedVariableName(GLOBAL_NAMESPACE, UPDATED_BY_VARIABLE), triggeredBy); + } + return vars; + } + private boolean triggerProcessDefinitions( - RuntimeService runtimeService, List processKeys) { + RuntimeService runtimeService, List processKeys, Map initialVars) { boolean anyStarted = false; for (String processKey : processKeys) { try { LOG.info("Triggering process with key: {}", processKey); - runtimeService.startProcessInstanceByKey(processKey); + runtimeService.startProcessInstanceByKey(processKey, initialVars); anyStarted = true; } catch (Exception e) { LOG.error("Failed to start process: {}", processKey, e); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowInstanceListener.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowInstanceListener.java index 158215867995..baedc28b6215 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowInstanceListener.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowInstanceListener.java @@ -97,21 +97,23 @@ private void addWorkflowInstance( DelegateExecution execution, WorkflowInstanceRepository workflowInstanceRepository) { String processKey = getProcessDefinitionKeyFromId(execution.getProcessDefinitionId()); String workflowDefinitionName = getMainWorkflowDefinitionNameFromTrigger(processKey); - if (workflowDefinitionName.equals(processKey)) { + String existingKey = execution.getProcessInstanceBusinessKey(); + if (existingKey != null && !existingKey.isEmpty()) { LOG.debug( - "[WORKFLOW_INSTANCE_SKIP] ProcessInstance: {} - process key '{}' is not an OM trigger workflow, skipping", - execution.getProcessInstanceId(), - processKey); + "[WORKFLOW_INSTANCE_SKIP] ProcessInstance: {} - business key already set by trigger, skipping", + execution.getProcessInstanceId()); return; } updateBusinessKey(execution.getProcessInstanceId()); UUID workflowInstanceId = UUID.fromString(execution.getProcessInstanceBusinessKey()); + UUID scheduleRunId = WorkflowScheduleRunIdReader.readFrom(execution); workflowInstanceRepository.addNewWorkflowInstance( workflowDefinitionName, workflowInstanceId, System.currentTimeMillis(), - execution.getVariables()); + execution.getVariables(), + scheduleRunId); LOG.debug( "[WORKFLOW_INSTANCE_CREATED] Workflow: {}, InstanceId: {}, ProcessInstance: {} - Workflow instance record created successfully", workflowDefinitionName, @@ -123,14 +125,14 @@ private void updateWorkflowInstance( DelegateExecution execution, WorkflowInstanceRepository workflowInstanceRepository) { String processKey = getProcessDefinitionKeyFromId(execution.getProcessDefinitionId()); String workflowDefinitionName = getMainWorkflowDefinitionNameFromTrigger(processKey); - if (workflowDefinitionName.equals(processKey)) { + String businessKeyForUpdate = execution.getProcessInstanceBusinessKey(); + if (businessKeyForUpdate == null || businessKeyForUpdate.isEmpty()) { LOG.debug( - "[WORKFLOW_INSTANCE_SKIP] ProcessInstance: {} - process key '{}' is not an OM trigger workflow, skipping", - execution.getProcessInstanceId(), - processKey); + "[WORKFLOW_INSTANCE_SKIP] ProcessInstance: {} - no business key, skipping update", + execution.getProcessInstanceId()); return; } - UUID workflowInstanceId = UUID.fromString(execution.getProcessInstanceBusinessKey()); + UUID workflowInstanceId = UUID.fromString(businessKeyForUpdate); // Capture all variables including any failure indicators java.util.Map variables = new java.util.HashMap<>(execution.getVariables()); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowInstanceStageListener.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowInstanceStageListener.java index 2d80b6430a18..bf9ab904eb9b 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowInstanceStageListener.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowInstanceStageListener.java @@ -5,12 +5,20 @@ import static org.openmetadata.service.governance.workflows.Workflow.WORKFLOW_INSTANCE_EXECUTION_ID_VARIABLE; import static org.openmetadata.service.governance.workflows.WorkflowHandler.getProcessDefinitionKeyFromId; +import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.UUID; import lombok.extern.slf4j.Slf4j; import org.flowable.engine.delegate.DelegateExecution; import org.flowable.engine.delegate.JavaDelegate; +import org.openmetadata.schema.governance.workflows.WorkflowDefinition; +import org.openmetadata.schema.governance.workflows.elements.WorkflowNodeDefinitionInterface; +import org.openmetadata.schema.utils.JsonUtils; import org.openmetadata.service.Entity; +import org.openmetadata.service.exception.EntityNotFoundException; +import org.openmetadata.service.jdbi3.NewStageRequest; +import org.openmetadata.service.jdbi3.WorkflowDefinitionRepository; import org.openmetadata.service.jdbi3.WorkflowInstanceStateRepository; @Slf4j @@ -89,11 +97,14 @@ public void execute(DelegateExecution execution) { // Create a failed stage record UUID stageId = workflowInstanceStateRepository.addNewStageToInstance( - stage + "_failed", - executionId, - workflowInstanceId, - workflowName, - System.currentTimeMillis()); + new NewStageRequest( + stage + "_failed", + executionId, + workflowInstanceId, + workflowName, + System.currentTimeMillis(), + WorkflowScheduleRunIdReader.readFrom(execution), + null)); java.util.Map failureData = new java.util.HashMap<>(); failureData.put("status", "FAILED"); @@ -101,7 +112,7 @@ public void execute(DelegateExecution execution) { failureData.put("errorClass", exc.getClass().getSimpleName()); workflowInstanceStateRepository.updateStage( - stageId, System.currentTimeMillis(), failureData); + stageId, System.currentTimeMillis(), null, failureData); LOG.warn( "[STAGE_FAILED_RECORDED] Workflow: {}, ProcessInstance: {}, Stage: {}, StageId: {} - Failed stage recorded in database", @@ -167,13 +178,19 @@ private void addNewStage( String stage = Optional.ofNullable(execution.getCurrentActivityId()).orElse(workflowDefinitionName); + WorkflowDefinition workflowDefinition = fetchWorkflowDefinition(workflowDefinitionName); + List entityList = resolveStageEntityList(workflowDefinition, stage, varHandler); UUID workflowInstanceStateId = workflowInstanceStateRepository.addNewStageToInstance( - stage, - workflowInstanceExecutionId, - workflowInstanceId, - workflowDefinitionName, - System.currentTimeMillis()); + new NewStageRequest( + stage, + workflowInstanceExecutionId, + workflowInstanceId, + workflowDefinitionName, + System.currentTimeMillis(), + WorkflowScheduleRunIdReader.readFrom(execution), + entityList), + workflowDefinition); varHandler.setNodeVariable(STAGE_INSTANCE_STATE_ID_VARIABLE, workflowInstanceStateId); LOG.debug( "[STAGE_CREATED] Workflow: {}, ProcessInstance: {}, Stage: {}, StageId: {} - Stage record created successfully", @@ -183,6 +200,56 @@ private void addNewStage( workflowInstanceStateId); } + private WorkflowDefinition fetchWorkflowDefinition(String workflowDefinitionName) { + WorkflowDefinitionRepository workflowDefinitionRepository = + (WorkflowDefinitionRepository) Entity.getEntityRepository(Entity.WORKFLOW_DEFINITION); + return workflowDefinitionRepository.getByNameForStageProcessing(workflowDefinitionName); + } + + @SuppressWarnings("unchecked") + private Map extractInputNamespaceMap( + WorkflowDefinition workflowDefinition, String stageName) { + if (workflowDefinition.getNodes() == null) { + return null; + } + return workflowDefinition.getNodes().stream() + .filter(n -> stageName.equals(n.getName())) + .findFirst() + .map(WorkflowNodeDefinitionInterface::getInputNamespaceMap) + .map(m -> (Map) JsonUtils.readOrConvertValue(m, Map.class)) + .orElse(null); + } + + private List resolveStageEntityList( + WorkflowDefinition workflowDefinition, String stageName, WorkflowVariableHandler varHandler) { + try { + Map inputNamespaceMap = + extractInputNamespaceMap(workflowDefinition, stageName); + if (inputNamespaceMap == null) { + return null; + } + return WorkflowVariableHandler.getEntityList(inputNamespaceMap, varHandler); + } catch (EntityNotFoundException e) { + LOG.debug("Could not resolve entityList for stage '{}': {}", stageName, e.getMessage()); + return null; + } + } + + private String resolveStageUpdatedBy( + WorkflowDefinition workflowDefinition, String stageName, Map variables) { + try { + Map inputNamespaceMap = + extractInputNamespaceMap(workflowDefinition, stageName); + if (inputNamespaceMap == null) { + return null; + } + return WorkflowVariableHandler.getUpdatedByFromVariables(inputNamespaceMap, variables); + } catch (EntityNotFoundException e) { + LOG.debug("Could not resolve updatedBy for stage '{}': {}", stageName, e.getMessage()); + return null; + } + } + private void updateStage( WorkflowVariableHandler varHandler, DelegateExecution execution, @@ -205,8 +272,11 @@ private void updateStage( return; } + Map variables = execution.getVariables(); + WorkflowDefinition workflowDefinition = fetchWorkflowDefinition(workflowDefinitionName); + String updatedBy = resolveStageUpdatedBy(workflowDefinition, stage, variables); workflowInstanceStateRepository.updateStage( - workflowInstanceStateId, System.currentTimeMillis(), execution.getVariables()); + workflowInstanceStateId, System.currentTimeMillis(), updatedBy, variables); LOG.debug( "[STAGE_UPDATED] Workflow: {}, ProcessInstance: {}, Stage: {}, StageId: {} - Stage completion recorded", diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowScheduleRunIdReader.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowScheduleRunIdReader.java new file mode 100644 index 000000000000..66e9d7c2c83e --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowScheduleRunIdReader.java @@ -0,0 +1,40 @@ +package org.openmetadata.service.governance.workflows; + +import static org.openmetadata.service.governance.workflows.Workflow.GLOBAL_NAMESPACE; +import static org.openmetadata.service.governance.workflows.Workflow.WORKFLOW_SCHEDULE_RUN_ID_VARIABLE; +import static org.openmetadata.service.governance.workflows.WorkflowVariableHandler.getNamespacedVariableName; + +import java.util.UUID; +import lombok.extern.slf4j.Slf4j; +import org.flowable.engine.delegate.DelegateExecution; + +@Slf4j +public final class WorkflowScheduleRunIdReader { + private WorkflowScheduleRunIdReader() {} + + public static UUID readFrom(DelegateExecution execution) { + Object plain = execution.getVariable(WORKFLOW_SCHEDULE_RUN_ID_VARIABLE); + if (plain != null) { + return toUuid(plain); + } + Object namespaced = + execution.getVariable( + getNamespacedVariableName(GLOBAL_NAMESPACE, WORKFLOW_SCHEDULE_RUN_ID_VARIABLE)); + return namespaced != null ? toUuid(namespaced) : null; + } + + static UUID toUuid(Object value) { + if (value instanceof UUID uuid) { + return uuid; + } + try { + return UUID.fromString(value.toString()); + } catch (IllegalArgumentException e) { + LOG.warn( + "[WorkflowScheduleRunIdReader] Invalid scheduleRunId value '{}': {}", + value, + e.getMessage()); + return null; + } + } +} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowScheduleRunIdSetterListener.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowScheduleRunIdSetterListener.java new file mode 100644 index 000000000000..5dbb01b81484 --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowScheduleRunIdSetterListener.java @@ -0,0 +1,24 @@ +package org.openmetadata.service.governance.workflows; + +import static org.openmetadata.service.governance.workflows.Workflow.WORKFLOW_SCHEDULE_RUN_ID_VARIABLE; + +import java.util.UUID; +import lombok.extern.slf4j.Slf4j; +import org.flowable.engine.delegate.DelegateExecution; +import org.flowable.engine.delegate.JavaDelegate; + +@Slf4j +public class WorkflowScheduleRunIdSetterListener implements JavaDelegate { + @Override + public void execute(DelegateExecution execution) { + if (execution.getVariable(WORKFLOW_SCHEDULE_RUN_ID_VARIABLE) != null) { + return; + } + UUID scheduleRunId = UUID.randomUUID(); + execution.setVariable(WORKFLOW_SCHEDULE_RUN_ID_VARIABLE, scheduleRunId); + LOG.debug( + "[SCHEDULE_RUN_ID_SET] ProcessInstance: {} - scheduleRunId: {}", + execution.getProcessInstanceId(), + scheduleRunId); + } +} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/triggers/PeriodicBatchEntityTrigger.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/triggers/PeriodicBatchEntityTrigger.java index 16d0c117b035..622afd8d506e 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/triggers/PeriodicBatchEntityTrigger.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/triggers/PeriodicBatchEntityTrigger.java @@ -5,6 +5,7 @@ import static org.openmetadata.service.governance.workflows.Workflow.GLOBAL_NAMESPACE; import static org.openmetadata.service.governance.workflows.Workflow.RELATED_ENTITY_VARIABLE; import static org.openmetadata.service.governance.workflows.Workflow.UPDATED_BY_VARIABLE; +import static org.openmetadata.service.governance.workflows.Workflow.WORKFLOW_SCHEDULE_RUN_ID_VARIABLE; import static org.openmetadata.service.governance.workflows.Workflow.getFlowableElementId; import static org.openmetadata.service.governance.workflows.WorkflowVariableHandler.getNamespacedVariableName; @@ -18,6 +19,7 @@ import org.flowable.bpmn.model.CallActivity; import org.flowable.bpmn.model.EndEvent; import org.flowable.bpmn.model.FieldExtension; +import org.flowable.bpmn.model.FlowableListener; import org.flowable.bpmn.model.IOParameter; import org.flowable.bpmn.model.MultiInstanceLoopCharacteristics; import org.flowable.bpmn.model.Process; @@ -30,12 +32,14 @@ import org.openmetadata.schema.governance.workflows.elements.triggers.PeriodicBatchEntityTriggerDefinition; import org.openmetadata.schema.utils.JsonUtils; import org.openmetadata.service.apps.scheduler.AppScheduler; +import org.openmetadata.service.governance.workflows.WorkflowScheduleRunIdSetterListener; import org.openmetadata.service.governance.workflows.elements.TriggerInterface; import org.openmetadata.service.governance.workflows.elements.triggers.impl.CommitChangeEventOffsetImpl; import org.openmetadata.service.governance.workflows.elements.triggers.impl.FetchChangeEventsImpl; import org.openmetadata.service.governance.workflows.flowable.builders.CallActivityBuilder; import org.openmetadata.service.governance.workflows.flowable.builders.EndEventBuilder; import org.openmetadata.service.governance.workflows.flowable.builders.FieldExtensionBuilder; +import org.openmetadata.service.governance.workflows.flowable.builders.FlowableListenerBuilder; import org.openmetadata.service.governance.workflows.flowable.builders.MultiInstanceLoopCharacteristicsBuilder; import org.openmetadata.service.governance.workflows.flowable.builders.ServiceTaskBuilder; import org.openmetadata.service.governance.workflows.flowable.builders.StartEventBuilder; @@ -68,7 +72,7 @@ public PeriodicBatchEntityTrigger( Process process = new Process(); process.setId(processId); process.setName(processId); - attachWorkflowInstanceListeners(process); + attachScheduleRunIdListener(process); Optional oTimerDefinition = Optional.ofNullable(getTimerEventDefinition(triggerDefinition.getConfig().getSchedule())); @@ -110,13 +114,20 @@ public PeriodicBatchEntityTrigger( } } + private void attachScheduleRunIdListener(Process process) { + FlowableListener listener = + new FlowableListenerBuilder() + .event("start") + .implementation(WorkflowScheduleRunIdSetterListener.class.getName()) + .build(); + process.getExecutionListeners().add(listener); + } + private TimerEventDefinition getTimerEventDefinition(AppSchedule schedule) { if (schedule.getScheduleTimeline().equals(ScheduleTimeline.NONE)) { return null; } - // TODO: Using the AppScheduler logic to craft a Flowable compatible Cron Expression. Eventually - // we should probably avoid this to be dependent that code. CronTrigger cronTrigger = (CronTrigger) AppScheduler.getCronSchedule(schedule).build(); TimerEventDefinition timerDefinition = new TimerEventDefinition(); @@ -156,20 +167,35 @@ private CallActivity getWorkflowTriggerCallActivity( updatedByParameter.setSource(getNamespacedVariableName(GLOBAL_NAMESPACE, UPDATED_BY_VARIABLE)); updatedByParameter.setTarget(getNamespacedVariableName(GLOBAL_NAMESPACE, UPDATED_BY_VARIABLE)); + IOParameter scheduleRunIdParameter = new IOParameter(); + scheduleRunIdParameter.setSource(WORKFLOW_SCHEDULE_RUN_ID_VARIABLE); + scheduleRunIdParameter.setTarget( + getNamespacedVariableName(GLOBAL_NAMESPACE, WORKFLOW_SCHEDULE_RUN_ID_VARIABLE)); + List inParameters; if (singleExecution) { IOParameter entityListParameter = new IOParameter(); entityListParameter.setSource(ENTITY_LIST_VARIABLE); entityListParameter.setTarget( getNamespacedVariableName(GLOBAL_NAMESPACE, ENTITY_LIST_VARIABLE)); - inParameters = List.of(relatedEntityParameter, entityListParameter, updatedByParameter); + inParameters = + List.of( + relatedEntityParameter, + entityListParameter, + updatedByParameter, + scheduleRunIdParameter); } else { IOParameter entityListParameter = new IOParameter(); entityListParameter.setSourceExpression( String.format("${entityToListMap[%s]}", RELATED_ENTITY_VARIABLE)); entityListParameter.setTarget( getNamespacedVariableName(GLOBAL_NAMESPACE, ENTITY_LIST_VARIABLE)); - inParameters = List.of(relatedEntityParameter, entityListParameter, updatedByParameter); + inParameters = + List.of( + relatedEntityParameter, + entityListParameter, + updatedByParameter, + scheduleRunIdParameter); } workflowTrigger.setInParameters(inParameters); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/triggers/impl/CommitChangeEventOffsetImpl.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/triggers/impl/CommitChangeEventOffsetImpl.java index 992730ddb3e9..da5dd06065f8 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/triggers/impl/CommitChangeEventOffsetImpl.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/triggers/impl/CommitChangeEventOffsetImpl.java @@ -24,9 +24,11 @@ public class CommitChangeEventOffsetImpl implements JavaDelegate { public void execute(DelegateExecution execution) { String workflowFqn = (String) workflowFqnExpr.getValue(execution); String entityType = (String) entityTypeExpr.getValue(execution); - String consumerId = buildConsumerId(workflowFqn, entityType); - Long maxProcessedOffset = (Long) execution.getVariable(MAX_PROCESSED_OFFSET_VARIABLE); + commitOffset(workflowFqn, entityType, maxProcessedOffset); + } + + static void commitOffset(String workflowFqn, String entityType, Long maxProcessedOffset) { if (maxProcessedOffset == null) { LOG.debug( "No events processed for workflow '{}' entity type '{}'. Offset not updated.", @@ -34,12 +36,11 @@ public void execute(DelegateExecution execution) { entityType); return; } - + String consumerId = buildConsumerId(workflowFqn, entityType); String existingJson = Entity.getCollectionDAO() .eventSubscriptionDAO() .getSubscriberExtension(consumerId, OFFSET_EXTENSION); - if (existingJson != null) { EventSubscriptionOffset existing = JsonUtils.readValue(existingJson, EventSubscriptionOffset.class); @@ -52,19 +53,16 @@ public void execute(DelegateExecution execution) { return; } } - EventSubscriptionOffset newOffset = new EventSubscriptionOffset() .withStartingOffset(maxProcessedOffset) .withCurrentOffset(maxProcessedOffset) .withTimestamp(System.currentTimeMillis()); - Entity.getCollectionDAO() .eventSubscriptionDAO() .upsertSubscriberExtension( consumerId, OFFSET_EXTENSION, OFFSET_JSON_SCHEMA, JsonUtils.pojoToJson(newOffset)); - - LOG.info( + LOG.debug( "Committed offset {} for workflow '{}' entity type '{}'.", maxProcessedOffset, workflowFqn, diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/triggers/impl/FetchChangeEventsImpl.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/triggers/impl/FetchChangeEventsImpl.java index 3e3e7e1d094d..88bba87a2b90 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/triggers/impl/FetchChangeEventsImpl.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/triggers/impl/FetchChangeEventsImpl.java @@ -3,6 +3,7 @@ import static org.openmetadata.service.apps.bundles.changeEvent.AbstractEventConsumer.OFFSET_EXTENSION; import static org.openmetadata.service.governance.workflows.Workflow.ENTITY_LIST_VARIABLE; import static org.openmetadata.service.governance.workflows.Workflow.GLOBAL_NAMESPACE; +import static org.openmetadata.service.governance.workflows.Workflow.PROCESSED_FQNS_VARIABLE; import static org.openmetadata.service.governance.workflows.Workflow.TASK_RETRY_CONFIG; import static org.openmetadata.service.governance.workflows.Workflow.UPDATED_BY_VARIABLE; import static org.openmetadata.service.governance.workflows.WorkflowVariableHandler.getNamespacedVariableName; @@ -16,6 +17,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -42,6 +44,7 @@ public class FetchChangeEventsImpl implements JavaDelegate { static final String CURRENT_BATCH_OFFSET_VARIABLE = "currentBatchOffset"; static final String MAX_PROCESSED_OFFSET_VARIABLE = "maxProcessedOffset"; + static final int PROCESSED_FQNS_MAX_SIZE = 10_000; private static final String CARDINALITY_VARIABLE = "numberOfEntities"; private static final Set ENTITIES_NEEDING_KEYWORD_FQN = @@ -112,6 +115,9 @@ public void execute(DelegateExecution execution) { Map fqnToMaxOffset = deduplicateByFqn(records); + LinkedHashMap processedFqns = loadProcessedFqns(execution); + fqnToMaxOffset.keySet().removeAll(processedFqns.keySet()); + String searchFilter = Optional.ofNullable(searchFilterExpr) .map(expr -> (String) expr.getValue(execution)) @@ -124,27 +130,26 @@ public void execute(DelegateExecution execution) { fqnToMaxOffset.entrySet().removeIf(e -> !matchingFqns.contains(e.getKey())); } + for (String fqn : fqnToMaxOffset.keySet()) { + processedFqns.put(fqn, Boolean.TRUE); + } + evictOverflow(processedFqns); + List entityList = new ArrayList<>(); Map> entityToListMap = new HashMap<>(); - - for (Map.Entry entry : fqnToMaxOffset.entrySet()) { - String fqn = entry.getKey(); + for (String fqn : fqnToMaxOffset.keySet()) { String entityLink = new MessageParser.EntityLink(entityType, fqn).getLinkString(); entityList.add(entityLink); entityToListMap.put(entityLink, List.of(entityLink)); } - // hasFinished is true only when no more change events exist (records fetch returned empty). - // A batch where all entities were filtered out is NOT finished — the cursor still advances - // via batchMaxOffset, and the loop runs the workflow trigger with zero iterations before - // fetching the next batch. - boolean hasFinished = records.isEmpty(); - + execution.setVariable(PROCESSED_FQNS_VARIABLE, processedFqns); execution.setVariable(CURRENT_BATCH_OFFSET_VARIABLE, batchMaxOffset); Long existingMax = (Long) execution.getVariable(MAX_PROCESSED_OFFSET_VARIABLE); if (existingMax == null || batchMaxOffset > existingMax) { execution.setVariable(MAX_PROCESSED_OFFSET_VARIABLE, batchMaxOffset); + CommitChangeEventOffsetImpl.commitOffset(workflowFqn, entityType, batchMaxOffset); } String updatedByVar = getNamespacedVariableName(GLOBAL_NAMESPACE, UPDATED_BY_VARIABLE); @@ -152,7 +157,7 @@ public void execute(DelegateExecution execution) { execution.setVariable(updatedByVar, "governance-bot"); } execution.setVariable(CARDINALITY_VARIABLE, entityList.size()); - execution.setVariable(HAS_FINISHED_VARIABLE, hasFinished); + execution.setVariable(HAS_FINISHED_VARIABLE, records.isEmpty()); execution.setVariable(ENTITY_LIST_VARIABLE, entityList); execution.setVariable("entityToListMap", entityToListMap); } @@ -316,4 +321,24 @@ static String buildConsumerId(String workflowFqn, String entityType) { } return workflowFqn + "Trigger-" + entityType; } + + @SuppressWarnings("unchecked") + private LinkedHashMap loadProcessedFqns(DelegateExecution execution) { + Object stored = execution.getVariable(PROCESSED_FQNS_VARIABLE); + return stored instanceof LinkedHashMap + ? (LinkedHashMap) stored + : new LinkedHashMap<>(); + } + + private static void evictOverflow(LinkedHashMap cache) { + int overflow = cache.size() - PROCESSED_FQNS_MAX_SIZE; + if (overflow <= 0) { + return; + } + Iterator it = cache.keySet().iterator(); + for (int i = 0; i < overflow; i++) { + it.next(); + it.remove(); + } + } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/flowable/MainWorkflow.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/flowable/MainWorkflow.java index cd1133611264..8776c95d5493 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/flowable/MainWorkflow.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/flowable/MainWorkflow.java @@ -25,11 +25,13 @@ import org.openmetadata.schema.governance.workflows.elements.EdgeDefinition; import org.openmetadata.schema.governance.workflows.elements.WorkflowNodeDefinitionInterface; import org.openmetadata.schema.utils.JsonUtils; +import org.openmetadata.service.governance.workflows.WorkflowInstanceListener; import org.openmetadata.service.governance.workflows.elements.Edge; import org.openmetadata.service.governance.workflows.elements.NodeFactory; import org.openmetadata.service.governance.workflows.elements.NodeInterface; import org.openmetadata.service.governance.workflows.elements.nodes.automatedTask.impl.DataCompletenessImpl; import org.openmetadata.service.governance.workflows.elements.nodes.endEvent.EndEvent; +import org.openmetadata.service.governance.workflows.flowable.builders.FlowableListenerBuilder; import org.openmetadata.service.governance.workflows.flowable.builders.InclusiveGatewayBuilder; @Getter @@ -51,6 +53,16 @@ public MainWorkflow(WorkflowDefinition workflowDefinition) { process.setName( Optional.ofNullable(workflowDefinition.getDisplayName()) .orElse(workflowDefinition.getFullyQualifiedName())); + + for (String event : List.of("start", "end")) { + org.flowable.bpmn.model.FlowableListener listener = + new FlowableListenerBuilder() + .event(event) + .implementation(WorkflowInstanceListener.class.getName()) + .build(); + process.getExecutionListeners().add(listener); + } + model.addProcess(process); List edges = workflowDefinition.getEdges(); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/ListFilter.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/ListFilter.java index 2ff1d3fbf18e..13e2be6cf130 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/ListFilter.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/ListFilter.java @@ -73,6 +73,7 @@ public String getCondition(String tableName) { conditions.add(getProviderCondition(tableName)); conditions.add(getEntityStatusCondition(tableName)); conditions.add(getServerIdCondition(tableName)); + conditions.add(getScheduleRunIdCondition()); String condition = addCondition(conditions); return condition.isEmpty() ? "WHERE TRUE" : "WHERE " + condition; } @@ -407,6 +408,11 @@ private String getServerIdCondition(String tableName) { : "serverId = :serverId"; } + private String getScheduleRunIdCondition() { + String scheduleRunId = queryParams.get("scheduleRunId"); + return scheduleRunId == null ? "" : "scheduleRunId = :scheduleRunId"; + } + private String getEntityFQNHashCondition() { String entityFQN = getQueryParam("entityFQNHash"); return entityFQN == null ? "" : "entityFQNHash = :entityFQNHash"; diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/NewStageRequest.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/NewStageRequest.java new file mode 100644 index 000000000000..6606be8fd393 --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/NewStageRequest.java @@ -0,0 +1,13 @@ +package org.openmetadata.service.jdbi3; + +import java.util.List; +import java.util.UUID; + +public record NewStageRequest( + String workflowInstanceStage, + UUID workflowInstanceExecutionId, + UUID workflowInstanceId, + String workflowDefinitionName, + Long startedAt, + UUID scheduleRunId, + List entityList) {} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/WorkflowInstanceRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/WorkflowInstanceRepository.java index 1da86af80e00..e7765dbc8174 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/WorkflowInstanceRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/WorkflowInstanceRepository.java @@ -1,9 +1,13 @@ package org.openmetadata.service.jdbi3; +import static org.openmetadata.service.governance.workflows.Workflow.ENTITY_LIST_VARIABLE; import static org.openmetadata.service.governance.workflows.Workflow.EXCEPTION_VARIABLE; import static org.openmetadata.service.governance.workflows.Workflow.GLOBAL_NAMESPACE; +import static org.openmetadata.service.governance.workflows.Workflow.PROCESSED_FQNS_VARIABLE; import static org.openmetadata.service.governance.workflows.WorkflowVariableHandler.getNamespacedVariableName; +import java.util.ArrayList; +import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.Optional; @@ -33,18 +37,23 @@ public void addNewWorkflowInstance( String workflowDefinitionName, UUID workflowInstanceId, Long startedAt, - Map variables) { + Map variables, + UUID scheduleRunId) { WorkflowDefinitionRepository workflowDefinitionRepository = (WorkflowDefinitionRepository) Entity.getEntityRepository(Entity.WORKFLOW_DEFINITION); UUID workflowDefinitionId = workflowDefinitionRepository.getIdFromName(workflowDefinitionName); + List entityList = extractEntityList(variables); + createNewRecord( new WorkflowInstance() .withId(workflowInstanceId) .withWorkflowDefinitionId(workflowDefinitionId) + .withScheduleRunId(scheduleRunId) .withStartedAt(startedAt) .withStatus(WorkflowInstance.WorkflowStatus.RUNNING) .withVariables(variables) + .withEntityList(entityList) .withTimestamp(System.currentTimeMillis()), workflowDefinitionName); } @@ -55,6 +64,7 @@ public void updateWorkflowInstance( JsonUtils.readValue(timeSeriesDao.getById(workflowInstanceId), WorkflowInstance.class); workflowInstance.setEndedAt(endedAt); + workflowInstance.setEntityList(extractFinalEntityList(variables)); WorkflowInstanceStateRepository workflowInstanceStateRepository = (WorkflowInstanceStateRepository) @@ -70,6 +80,7 @@ public void updateWorkflowInstance( } workflowInstance.setStatus(workflowStatus); + workflowInstance.setUpdatedBy(WorkflowInstanceRepository.extractUpdatedByFromStates(states)); Optional oException = Optional.ofNullable( @@ -84,6 +95,39 @@ public void updateWorkflowInstance( getTimeSeriesDao().update(JsonUtils.pojoToJson(workflowInstance), workflowInstanceId); } + private List extractFinalEntityList(Map variables) { + List fromProcessedFqns = extractEntityListFromProcessedFqns(variables); + if (fromProcessedFqns != null && !fromProcessedFqns.isEmpty()) { + return fromProcessedFqns; + } + return extractEntityList(variables); + } + + @SuppressWarnings("unchecked") + private List extractEntityListFromProcessedFqns(Map variables) { + Object obj = variables.get(PROCESSED_FQNS_VARIABLE); + if (obj instanceof Map map && !map.isEmpty()) { + return new ArrayList<>(((Map) map).keySet()); + } + return null; + } + + @SuppressWarnings("unchecked") + private List extractEntityList(Map variables) { + Object obj = variables.get(getNamespacedVariableName(GLOBAL_NAMESPACE, ENTITY_LIST_VARIABLE)); + return obj instanceof List ? (List) obj : null; + } + + static String extractUpdatedByFromStates(List states) { + return states.stream() + .filter(s -> s.getStage() != null && s.getStage().getUpdatedBy() != null) + .max( + Comparator.comparingLong( + s -> Optional.ofNullable(s.getStage().getEndedAt()).orElse(0L))) + .map(s -> s.getStage().getUpdatedBy()) + .orElse(null); + } + /** * Marks a workflow instance as FAILED with the given reason. * Preserves audit trail instead of deleting the instance. diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/WorkflowInstanceStateRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/WorkflowInstanceStateRepository.java index f59b9cdd86a8..d828837711fe 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/WorkflowInstanceStateRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/WorkflowInstanceStateRepository.java @@ -93,52 +93,50 @@ private UUID getStateId(UUID workflowInstanceId, String workflowInstanceStage) { return id; } - public UUID addNewStageToInstance( - String workflowInstanceStage, - UUID workflowInstanceExecutionId, - UUID workflowInstanceId, - String workflowDefinitionName, - Long startedAt) { - + public UUID addNewStageToInstance(NewStageRequest request) { WorkflowDefinitionRepository workflowDefinitionRepository = (WorkflowDefinitionRepository) Entity.getEntityRepository(Entity.WORKFLOW_DEFINITION); - // Efficiently get the workflow definition in a single DB call and extract both ID and - // displayName WorkflowDefinition workflowDefinition = - workflowDefinitionRepository.getByNameForStageProcessing(workflowDefinitionName); - String displayName = getStageDisplayName(workflowDefinition, workflowInstanceStage); - - Stage stage = - new Stage() - .withName(workflowInstanceStage) - .withDisplayName(displayName) - .withStartedAt(startedAt); - - WorkflowInstanceState entityRecord = - new WorkflowInstanceState() - .withStage(stage) - .withWorkflowInstanceExecutionId(workflowInstanceExecutionId) - .withWorkflowInstanceId(workflowInstanceId) - .withTimestamp(System.currentTimeMillis()) - .withStatus(WorkflowInstance.WorkflowStatus.RUNNING) - .withWorkflowDefinitionId(workflowDefinition.getId()); - - UUID stateId = getStateId(workflowInstanceId, workflowInstanceStage); + workflowDefinitionRepository.getByNameForStageProcessing(request.workflowDefinitionName()); + return addNewStageToInstance(request, workflowDefinition); + } + public UUID addNewStageToInstance( + NewStageRequest request, WorkflowDefinition workflowDefinition) { + String displayName = getStageDisplayName(workflowDefinition, request.workflowInstanceStage()); + WorkflowInstanceState entityRecord = buildStateRecord(request, workflowDefinition, displayName); + UUID stateId = getStateId(request.workflowInstanceId(), request.workflowInstanceStage()); if (stateId != null) { entityRecord.withId(stateId); } - entityRecord = createOrUpdateRecord( entityRecord, - buildWorkflowInstanceFqn(workflowDefinitionName, workflowInstanceId.toString())); - + buildWorkflowInstanceFqn( + request.workflowDefinitionName(), request.workflowInstanceId().toString())); return entityRecord.getId(); } + private WorkflowInstanceState buildStateRecord( + NewStageRequest request, WorkflowDefinition workflowDefinition, String displayName) { + Stage stage = + new Stage() + .withName(request.workflowInstanceStage()) + .withDisplayName(displayName) + .withStartedAt(request.startedAt()) + .withEntityList(request.entityList()); + return new WorkflowInstanceState() + .withStage(stage) + .withWorkflowInstanceExecutionId(request.workflowInstanceExecutionId()) + .withWorkflowInstanceId(request.workflowInstanceId()) + .withScheduleRunId(request.scheduleRunId()) + .withTimestamp(System.currentTimeMillis()) + .withStatus(WorkflowInstance.WorkflowStatus.RUNNING) + .withWorkflowDefinitionId(workflowDefinition.getId()); + } + public void updateStage( - UUID workflowInstanceStateId, Long endedAt, Map variables) { + UUID workflowInstanceStateId, Long endedAt, String updatedBy, Map variables) { WorkflowInstanceState workflowInstanceState = JsonUtils.readValue( timeSeriesDao.getById(workflowInstanceStateId), WorkflowInstanceState.class); @@ -146,6 +144,9 @@ public void updateStage( Stage stage = workflowInstanceState.getStage(); stage.setEndedAt(endedAt); stage.setVariables(variables); + if (updatedBy != null) { + stage.setUpdatedBy(updatedBy); + } workflowInstanceState.setStage(stage); workflowInstanceState.setStatus(WorkflowInstance.WorkflowStatus.FINISHED); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/resources/governance/WorkflowDefinitionResource.java b/openmetadata-service/src/main/java/org/openmetadata/service/resources/governance/WorkflowDefinitionResource.java index e52a68bd1c84..a5e9b4b5c636 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/resources/governance/WorkflowDefinitionResource.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/resources/governance/WorkflowDefinitionResource.java @@ -622,7 +622,8 @@ public Response trigger( .build(); } - boolean triggerResponse = WorkflowHandler.getInstance().triggerWorkflow(fqn); + String triggeredBy = securityContext.getUserPrincipal().getName(); + boolean triggerResponse = WorkflowHandler.getInstance().triggerWorkflow(fqn, triggeredBy); if (triggerResponse) { return Response.status(Response.Status.OK) .entity( diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/resources/governance/WorkflowInstanceResource.java b/openmetadata-service/src/main/java/org/openmetadata/service/resources/governance/WorkflowInstanceResource.java index 0de3f9628b72..f377c813157b 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/resources/governance/WorkflowInstanceResource.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/resources/governance/WorkflowInstanceResource.java @@ -113,7 +113,13 @@ public ResultList list( String workflowDefinitionName, @Parameter(description = "Entity Link", schema = @Schema(type = "String")) @QueryParam("entityLink") - String entityLink) { + String entityLink, + @Parameter( + description = + "Filter by scheduleRunId to retrieve all instances from one periodic trigger fire", + schema = @Schema(type = "String")) + @QueryParam("scheduleRunId") + String scheduleRunId) { OperationContext operationContext = new OperationContext(Entity.WORKFLOW_DEFINITION, MetadataOperation.VIEW_ALL); ResourceContextInterface resourceContext = ReportDataContext.builder().build(); @@ -124,6 +130,9 @@ public ResultList list( if (entityLink != null) { filter.addQueryParam("entityLink", entityLink); } + if (scheduleRunId != null) { + filter.addQueryParam("scheduleRunId", scheduleRunId); + } return repository.list(offset, startTs, endTs, limitParam, filter, latest); } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/resources/governance/WorkflowInstanceStateResource.java b/openmetadata-service/src/main/java/org/openmetadata/service/resources/governance/WorkflowInstanceStateResource.java index 336aa0b85bac..89186967acf4 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/resources/governance/WorkflowInstanceStateResource.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/resources/governance/WorkflowInstanceStateResource.java @@ -103,14 +103,22 @@ public ResultList list( schema = @Schema(type = "Boolean")) @DefaultValue("false") @QueryParam("latest") - Boolean latest) { + Boolean latest, + @Parameter( + description = + "Filter by scheduleRunId to retrieve all states from one periodic trigger fire", + schema = @Schema(type = "String")) + @QueryParam("scheduleRunId") + String scheduleRunId) { OperationContext operationContext = new OperationContext(Entity.WORKFLOW_DEFINITION, MetadataOperation.VIEW_ALL); ResourceContextInterface resourceContext = ReportDataContext.builder().build(); authorizer.authorize(securityContext, operationContext, resourceContext); ListFilter filter = new ListFilter(null); - + if (scheduleRunId != null) { + filter.addQueryParam("scheduleRunId", scheduleRunId); + } return repository.list(offset, startTs, endTs, limitParam, filter, latest); } diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/WorkflowScheduleRunIdReaderTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/WorkflowScheduleRunIdReaderTest.java new file mode 100644 index 000000000000..d2c668a0ce58 --- /dev/null +++ b/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/WorkflowScheduleRunIdReaderTest.java @@ -0,0 +1,98 @@ +/* + * Copyright 2024 Collate + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.openmetadata.service.governance.workflows; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.mockito.Mockito.when; +import static org.openmetadata.service.governance.workflows.Workflow.GLOBAL_NAMESPACE; +import static org.openmetadata.service.governance.workflows.Workflow.WORKFLOW_SCHEDULE_RUN_ID_VARIABLE; +import static org.openmetadata.service.governance.workflows.WorkflowVariableHandler.getNamespacedVariableName; + +import java.util.UUID; +import org.flowable.engine.delegate.DelegateExecution; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +class WorkflowScheduleRunIdReaderTest { + + @Mock private DelegateExecution execution; + + @BeforeEach + void setUp() { + MockitoAnnotations.openMocks(this); + } + + @Test + void readFrom_plainUuidVariable_returnsUuid() { + UUID id = UUID.randomUUID(); + when(execution.getVariable(WORKFLOW_SCHEDULE_RUN_ID_VARIABLE)).thenReturn(id); + + assertEquals(id, WorkflowScheduleRunIdReader.readFrom(execution)); + } + + @Test + void readFrom_uuidStoredAsString_parsesAndReturnsUuid() { + UUID id = UUID.randomUUID(); + when(execution.getVariable(WORKFLOW_SCHEDULE_RUN_ID_VARIABLE)).thenReturn(id.toString()); + + assertEquals(id, WorkflowScheduleRunIdReader.readFrom(execution)); + } + + @Test + void readFrom_namespacedVariable_returnsUuid() { + UUID id = UUID.randomUUID(); + String namespacedKey = + getNamespacedVariableName(GLOBAL_NAMESPACE, WORKFLOW_SCHEDULE_RUN_ID_VARIABLE); + when(execution.getVariable(WORKFLOW_SCHEDULE_RUN_ID_VARIABLE)).thenReturn(null); + when(execution.getVariable(namespacedKey)).thenReturn(id); + + assertEquals(id, WorkflowScheduleRunIdReader.readFrom(execution)); + } + + @Test + void readFrom_neitherVariablePresent_returnsNull() { + String namespacedKey = + getNamespacedVariableName(GLOBAL_NAMESPACE, WORKFLOW_SCHEDULE_RUN_ID_VARIABLE); + when(execution.getVariable(WORKFLOW_SCHEDULE_RUN_ID_VARIABLE)).thenReturn(null); + when(execution.getVariable(namespacedKey)).thenReturn(null); + + assertNull(WorkflowScheduleRunIdReader.readFrom(execution)); + } + + @Test + void readFrom_malformedStringVariable_returnsNullWithoutThrowing() { + when(execution.getVariable(WORKFLOW_SCHEDULE_RUN_ID_VARIABLE)).thenReturn("not-a-uuid"); + + assertDoesNotThrow(() -> assertNull(WorkflowScheduleRunIdReader.readFrom(execution))); + } + + @Test + void readFrom_plainVariableTakesPrecedenceOverNamespaced() { + UUID plainId = UUID.randomUUID(); + UUID namespacedId = UUID.randomUUID(); + String namespacedKey = + getNamespacedVariableName(GLOBAL_NAMESPACE, WORKFLOW_SCHEDULE_RUN_ID_VARIABLE); + when(execution.getVariable(WORKFLOW_SCHEDULE_RUN_ID_VARIABLE)).thenReturn(plainId); + when(execution.getVariable(namespacedKey)).thenReturn(namespacedId); + + UUID result = WorkflowScheduleRunIdReader.readFrom(execution); + assertEquals(plainId, result); + assertNotEquals(namespacedId, result); + } +} diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/WorkflowScheduleRunIdSetterListenerTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/WorkflowScheduleRunIdSetterListenerTest.java new file mode 100644 index 000000000000..8043406f8942 --- /dev/null +++ b/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/WorkflowScheduleRunIdSetterListenerTest.java @@ -0,0 +1,91 @@ +/* + * Copyright 2024 Collate + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.openmetadata.service.governance.workflows; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.openmetadata.service.governance.workflows.Workflow.WORKFLOW_SCHEDULE_RUN_ID_VARIABLE; + +import java.util.UUID; +import org.flowable.engine.delegate.DelegateExecution; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +class WorkflowScheduleRunIdSetterListenerTest { + + @Mock private DelegateExecution execution; + + private WorkflowScheduleRunIdSetterListener listener; + + @BeforeEach + void setUp() { + MockitoAnnotations.openMocks(this); + listener = new WorkflowScheduleRunIdSetterListener(); + } + + @Test + void testSetsScheduleRunIdWhenAbsent() { + when(execution.getVariable(WORKFLOW_SCHEDULE_RUN_ID_VARIABLE)).thenReturn(null); + when(execution.getProcessInstanceId()).thenReturn("proc-123"); + + listener.execute(execution); + + ArgumentCaptor captor = ArgumentCaptor.forClass(UUID.class); + verify(execution).setVariable(eq(WORKFLOW_SCHEDULE_RUN_ID_VARIABLE), captor.capture()); + assertNotNull(captor.getValue(), "scheduleRunId should be set to a non-null UUID"); + } + + @Test + void testIsIdempotent_DoesNotOverwriteExistingId() { + UUID existingId = UUID.randomUUID(); + when(execution.getVariable(WORKFLOW_SCHEDULE_RUN_ID_VARIABLE)).thenReturn(existingId); + + listener.execute(execution); + + verify(execution, never()).setVariable(eq(WORKFLOW_SCHEDULE_RUN_ID_VARIABLE), any()); + } + + @Test + void testGeneratesUniqueIdPerCall() { + when(execution.getVariable(WORKFLOW_SCHEDULE_RUN_ID_VARIABLE)).thenReturn(null); + when(execution.getProcessInstanceId()).thenReturn("proc-1"); + + ArgumentCaptor captor1 = ArgumentCaptor.forClass(UUID.class); + listener.execute(execution); + verify(execution).setVariable(eq(WORKFLOW_SCHEDULE_RUN_ID_VARIABLE), captor1.capture()); + + DelegateExecution execution2 = org.mockito.Mockito.mock(DelegateExecution.class); + when(execution2.getVariable(WORKFLOW_SCHEDULE_RUN_ID_VARIABLE)).thenReturn(null); + when(execution2.getProcessInstanceId()).thenReturn("proc-2"); + + ArgumentCaptor captor2 = ArgumentCaptor.forClass(UUID.class); + listener.execute(execution2); + verify(execution2).setVariable(eq(WORKFLOW_SCHEDULE_RUN_ID_VARIABLE), captor2.capture()); + + assertNotNull(captor1.getValue()); + assertNotNull(captor2.getValue()); + assertEquals( + false, + captor1.getValue().equals(captor2.getValue()), + "Different trigger fires should get different scheduleRunIds"); + } +} diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/WorkflowVariableHandlerTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/WorkflowVariableHandlerTest.java new file mode 100644 index 000000000000..4d252bb4338e --- /dev/null +++ b/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/WorkflowVariableHandlerTest.java @@ -0,0 +1,153 @@ +/* + * Copyright 2024 Collate + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.openmetadata.service.governance.workflows; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.List; +import java.util.Map; +import org.junit.jupiter.api.Test; + +class WorkflowVariableHandlerTest { + + @Test + void getEntityListFromVariables_plainEntityList_returnsCorrectList() { + Map inputNamespaceMap = Map.of("entityList", "global"); + Map variables = + Map.of("global_entityList", List.of("entity1", "entity2", "entity3")); + + List result = + WorkflowVariableHandler.getEntityListFromVariables(inputNamespaceMap, variables); + + assertEquals(List.of("entity1", "entity2", "entity3"), result); + } + + @Test + void getEntityListFromVariables_bandedEntityList_returnsCorrectList() { + Map inputNamespaceMap = Map.of("gold_entityList", "dataCompleteness1"); + Map variables = + Map.of( + "global_entityList", List.of("entity1", "entity2", "entity3"), + "dataCompleteness1_gold_entityList", List.of("entity1", "entity3")); + + List result = + WorkflowVariableHandler.getEntityListFromVariables(inputNamespaceMap, variables); + + assertEquals( + List.of("entity1", "entity3"), result, "Should return gold band subset, not global"); + } + + @Test + void getEntityListFromVariables_checkNodeTrueList_returnsPassedEntities() { + Map inputNamespaceMap = Map.of("true_entityList", "checkNode1"); + Map variables = + Map.of( + "global_entityList", List.of("entity1", "entity2"), + "checkNode1_true_entityList", List.of("entity1"), + "checkNode1_false_entityList", List.of("entity2")); + + List result = + WorkflowVariableHandler.getEntityListFromVariables(inputNamespaceMap, variables); + + assertEquals(List.of("entity1"), result); + } + + @Test + void getEntityListFromVariables_variableNotFound_returnsEmptyList() { + Map inputNamespaceMap = Map.of("entityList", "global"); + Map variables = Map.of("someOtherVar", "value"); + + List result = + WorkflowVariableHandler.getEntityListFromVariables(inputNamespaceMap, variables); + + assertTrue(result.isEmpty()); + } + + @Test + void getEntityListFromVariables_emptyInputNamespaceMap_returnsEmptyList() { + Map inputNamespaceMap = Map.of(); + Map variables = Map.of("global_entityList", List.of("entity1")); + + List result = + WorkflowVariableHandler.getEntityListFromVariables(inputNamespaceMap, variables); + + assertTrue(result.isEmpty()); + } + + @Test + void getEntityListFromVariables_bandedKeyTakesPriorityOverPlain() { + Map inputNamespaceMap = Map.of("silver_entityList", "dataCompleteness1"); + Map variables = + Map.of( + "global_entityList", List.of("e1", "e2", "e3"), + "dataCompleteness1_silver_entityList", List.of("e2")); + + List result = + WorkflowVariableHandler.getEntityListFromVariables(inputNamespaceMap, variables); + + assertEquals(List.of("e2"), result); + } + + @Test + void getUpdatedByFromVariables_resolvesFromNamespace() { + Map inputNamespaceMap = Map.of("updatedBy", "userApproval"); + Map variables = + Map.of("userApproval_updatedBy", "alice", "global_updatedBy", "bob"); + + assertEquals( + "alice", WorkflowVariableHandler.getUpdatedByFromVariables(inputNamespaceMap, variables)); + } + + @Test + void getUpdatedByFromVariables_ignoresOtherNamespacesWhenMapPointsElsewhere() { + Map inputNamespaceMap = Map.of("updatedBy", "userApproval"); + Map variables = + Map.of( + "global_updatedBy", "trigger-user", + "userApproval_updatedBy", "approver", + "otherNode_updatedBy", "bot"); + + assertEquals( + "approver", + WorkflowVariableHandler.getUpdatedByFromVariables(inputNamespaceMap, variables)); + } + + @Test + void getUpdatedByFromVariables_namespaceMissingFromMap_returnsNull() { + Map inputNamespaceMap = Map.of("entityList", "global"); + Map variables = Map.of("global_updatedBy", "alice"); + + assertNull(WorkflowVariableHandler.getUpdatedByFromVariables(inputNamespaceMap, variables)); + } + + @Test + void getUpdatedByFromVariables_variableNotSet_returnsNull() { + Map inputNamespaceMap = Map.of("updatedBy", "userApproval"); + Map variables = Map.of("global_entityList", List.of("e1")); + + assertNull(WorkflowVariableHandler.getUpdatedByFromVariables(inputNamespaceMap, variables)); + } + + @Test + void getUpdatedByFromVariables_globalNamespace_resolvesGlobalKey() { + Map inputNamespaceMap = Map.of("updatedBy", "global"); + Map variables = Map.of("global_updatedBy", "trigger-user"); + + assertEquals( + "trigger-user", + WorkflowVariableHandler.getUpdatedByFromVariables(inputNamespaceMap, variables)); + } +} diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/elements/TriggerFactoryTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/elements/TriggerFactoryTest.java index 876e5f02b491..c7246303656e 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/elements/TriggerFactoryTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/elements/TriggerFactoryTest.java @@ -27,7 +27,6 @@ import org.junit.jupiter.api.Test; import org.openmetadata.schema.governance.workflows.WorkflowDefinition; import org.openmetadata.schema.governance.workflows.elements.WorkflowNodeDefinitionInterface; -import org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.SinkTaskDefinition; import org.openmetadata.schema.governance.workflows.elements.nodes.endEvent.EndEventDefinition; import org.openmetadata.schema.governance.workflows.elements.nodes.startEvent.StartEventDefinition; import org.openmetadata.schema.governance.workflows.elements.triggers.PeriodicBatchEntityTriggerDefinition; @@ -51,167 +50,51 @@ void testGetMainWorkflowDefinitionNameFromTrigger() { } @Test - void testPeriodicBatchTrigger_WithBatchModeNode_UsesSingleExecution() { - WorkflowDefinition workflow = createWorkflowWithBatchSink(true); + void testPeriodicBatchTrigger_CreatesCorrectTriggerType() { + WorkflowDefinition workflow = createWorkflow(); TriggerInterface trigger = TriggerFactory.createTrigger(workflow); assertNotNull(trigger); assertInstanceOf(PeriodicBatchEntityTrigger.class, trigger); - - // Verify single execution mode by checking the BPMN model - BpmnModel model = new BpmnModel(); - trigger.addToWorkflow(model); - - // Find the CallActivity and verify cardinality is "1" - CallActivity callActivity = findCallActivity(model); - assertNotNull(callActivity, "CallActivity should exist"); - - MultiInstanceLoopCharacteristics loopChars = callActivity.getLoopCharacteristics(); - assertNotNull(loopChars, "Loop characteristics should exist"); - assertEquals("1", loopChars.getLoopCardinality(), "Cardinality should be 1 for batch mode"); - } - - @Test - void testPeriodicBatchTrigger_WithoutBatchModeNode_UsesMultipleExecutions() { - WorkflowDefinition workflow = createWorkflowWithBatchSink(false); - - TriggerInterface trigger = TriggerFactory.createTrigger(workflow); - - assertNotNull(trigger); - assertInstanceOf(PeriodicBatchEntityTrigger.class, trigger); - - BpmnModel model = new BpmnModel(); - trigger.addToWorkflow(model); - - CallActivity callActivity = findCallActivity(model); - assertNotNull(callActivity, "CallActivity should exist"); - - MultiInstanceLoopCharacteristics loopChars = callActivity.getLoopCharacteristics(); - assertNotNull(loopChars, "Loop characteristics should exist"); - assertEquals( - "${numberOfEntities}", - loopChars.getLoopCardinality(), - "Cardinality should be ${numberOfEntities} for non-batch mode"); } @Test - void testPeriodicBatchTrigger_WithNoSinkNodes_UsesMultipleExecutions() { - WorkflowDefinition workflow = createWorkflowWithoutSink(); + void testPeriodicBatchTrigger_MultiInstanceLoop_WhenNoSinkTask() { + WorkflowDefinition workflow = createWorkflow(); TriggerInterface trigger = TriggerFactory.createTrigger(workflow); - assertNotNull(trigger); - BpmnModel model = new BpmnModel(); trigger.addToWorkflow(model); CallActivity callActivity = findCallActivity(model); assertNotNull(callActivity, "CallActivity should exist"); - - MultiInstanceLoopCharacteristics loopChars = callActivity.getLoopCharacteristics(); + MultiInstanceLoopCharacteristics loopChars = + (MultiInstanceLoopCharacteristics) callActivity.getLoopCharacteristics(); + assertNotNull(loopChars, "CallActivity should have multi-instance loop when no SinkTask"); assertEquals( "${numberOfEntities}", loopChars.getLoopCardinality(), - "Cardinality should be ${numberOfEntities} when no sink nodes"); - } - - @Test - void testPeriodicBatchTrigger_WithNullNodes_UsesMultipleExecutions() { - WorkflowDefinition workflow = createWorkflowWithNullNodes(); - - TriggerInterface trigger = TriggerFactory.createTrigger(workflow); - - assertNotNull(trigger); - - BpmnModel model = new BpmnModel(); - trigger.addToWorkflow(model); - - CallActivity callActivity = findCallActivity(model); - assertNotNull(callActivity, "CallActivity should exist"); - - MultiInstanceLoopCharacteristics loopChars = callActivity.getLoopCharacteristics(); - assertEquals( - "${numberOfEntities}", - loopChars.getLoopCardinality(), - "Cardinality should be ${numberOfEntities} when nodes is null"); - } - - @Test - void testPeriodicBatchTrigger_WithMultipleSinkNodes_OneBatchMode_UsesSingleExecution() { - WorkflowDefinition workflow = createWorkflowWithMultipleSinks(); - - TriggerInterface trigger = TriggerFactory.createTrigger(workflow); - - BpmnModel model = new BpmnModel(); - trigger.addToWorkflow(model); - - CallActivity callActivity = findCallActivity(model); - MultiInstanceLoopCharacteristics loopChars = callActivity.getLoopCharacteristics(); - - // If ANY sink has batchMode=true, should use single execution - assertEquals( - "1", - loopChars.getLoopCardinality(), - "Cardinality should be 1 if any sink has batchMode=true"); - } - - @Test - void testPeriodicBatchTrigger_WithSinkBatchModeNotSet_DefaultsToSingleExecution() { - // When batchMode is not explicitly set, it should default to true per schema - WorkflowDefinition workflow = createWorkflowWithSinkBatchModeNotSet(); - - TriggerInterface trigger = TriggerFactory.createTrigger(workflow); - - BpmnModel model = new BpmnModel(); - trigger.addToWorkflow(model); - - CallActivity callActivity = findCallActivity(model); - MultiInstanceLoopCharacteristics loopChars = callActivity.getLoopCharacteristics(); - - assertEquals( - "1", - loopChars.getLoopCardinality(), - "Cardinality should be 1 when batchMode not set (defaults to true)"); + "Cardinality should be ${numberOfEntities} when no SinkTask batch mode"); } private CallActivity findCallActivity(BpmnModel model) { for (Process process : model.getProcesses()) { for (FlowElement element : process.getFlowElements()) { - if (element instanceof CallActivity) { - return (CallActivity) element; + if (element instanceof CallActivity callActivity) { + return callActivity; } } } return null; } - private WorkflowDefinition createWorkflowWithBatchSink(boolean batchMode) { + private WorkflowDefinition createWorkflow() { WorkflowDefinition workflow = new WorkflowDefinition(); workflow.setName("TestBatchWorkflow"); workflow.setFullyQualifiedName("TestBatchWorkflow"); - - // Create periodic batch trigger using JSON - PeriodicBatchEntityTriggerDefinition trigger = createPeriodicBatchTrigger(); - workflow.setTrigger(trigger); - - // Create nodes with sink task - List nodes = new ArrayList<>(); - nodes.add(createStartEvent()); - nodes.add(createSinkTask("gitSink", batchMode)); - nodes.add(createEndEvent()); - workflow.setNodes(nodes); - - return workflow; - } - - private WorkflowDefinition createWorkflowWithoutSink() { - WorkflowDefinition workflow = new WorkflowDefinition(); - workflow.setName("TestNoSinkWorkflow"); - workflow.setFullyQualifiedName("TestNoSinkWorkflow"); - - PeriodicBatchEntityTriggerDefinition trigger = createPeriodicBatchTrigger(); - workflow.setTrigger(trigger); + workflow.setTrigger(createPeriodicBatchTrigger()); List nodes = new ArrayList<>(); nodes.add(createStartEvent()); @@ -221,54 +104,6 @@ private WorkflowDefinition createWorkflowWithoutSink() { return workflow; } - private WorkflowDefinition createWorkflowWithNullNodes() { - WorkflowDefinition workflow = new WorkflowDefinition(); - workflow.setName("TestNullNodesWorkflow"); - workflow.setFullyQualifiedName("TestNullNodesWorkflow"); - - PeriodicBatchEntityTriggerDefinition trigger = createPeriodicBatchTrigger(); - workflow.setTrigger(trigger); - - workflow.setNodes(null); - - return workflow; - } - - private WorkflowDefinition createWorkflowWithMultipleSinks() { - WorkflowDefinition workflow = new WorkflowDefinition(); - workflow.setName("TestMultiSinkWorkflow"); - workflow.setFullyQualifiedName("TestMultiSinkWorkflow"); - - PeriodicBatchEntityTriggerDefinition trigger = createPeriodicBatchTrigger(); - workflow.setTrigger(trigger); - - List nodes = new ArrayList<>(); - nodes.add(createStartEvent()); - nodes.add(createSinkTask("webhookSink", false)); // batchMode=false - nodes.add(createSinkTask("gitSink", true)); // batchMode=true - nodes.add(createEndEvent()); - workflow.setNodes(nodes); - - return workflow; - } - - private WorkflowDefinition createWorkflowWithSinkBatchModeNotSet() { - WorkflowDefinition workflow = new WorkflowDefinition(); - workflow.setName("TestDefaultBatchModeWorkflow"); - workflow.setFullyQualifiedName("TestDefaultBatchModeWorkflow"); - - PeriodicBatchEntityTriggerDefinition trigger = createPeriodicBatchTrigger(); - workflow.setTrigger(trigger); - - List nodes = new ArrayList<>(); - nodes.add(createStartEvent()); - nodes.add(createSinkTaskWithoutBatchMode("gitSink")); // batchMode not set - nodes.add(createEndEvent()); - workflow.setNodes(nodes); - - return workflow; - } - private PeriodicBatchEntityTriggerDefinition createPeriodicBatchTrigger() { String triggerJson = """ @@ -298,49 +133,4 @@ private EndEventDefinition createEndEvent() { endEvent.setName("end"); return endEvent; } - - private SinkTaskDefinition createSinkTask(String name, boolean batchMode) { - String sinkTaskJson = - """ - { - "name": "%s", - "type": "automatedTask", - "subType": "sinkTask", - "config": { - "sinkType": "git", - "syncMode": "overwrite", - "outputFormat": "yaml", - "batchMode": %s, - "timeoutSeconds": 300, - "sinkConfig": { - "repositoryUrl": "https://github.com/org/repo.git" - } - } - } - """ - .formatted(name, batchMode); - return JsonUtils.readValue(sinkTaskJson, SinkTaskDefinition.class); - } - - private SinkTaskDefinition createSinkTaskWithoutBatchMode(String name) { - String sinkTaskJson = - """ - { - "name": "%s", - "type": "automatedTask", - "subType": "sinkTask", - "config": { - "sinkType": "git", - "syncMode": "overwrite", - "outputFormat": "yaml", - "timeoutSeconds": 300, - "sinkConfig": { - "repositoryUrl": "https://github.com/org/repo.git" - } - } - } - """ - .formatted(name); - return JsonUtils.readValue(sinkTaskJson, SinkTaskDefinition.class); - } } diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/elements/triggers/impl/CommitChangeEventOffsetImplTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/elements/triggers/impl/CommitChangeEventOffsetImplTest.java index a5272e3ba245..4f5a4c32247f 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/elements/triggers/impl/CommitChangeEventOffsetImplTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/elements/triggers/impl/CommitChangeEventOffsetImplTest.java @@ -140,6 +140,39 @@ void testExecute_existingOffsetEqual_skipsCommit() { .upsertSubscriberExtension(anyString(), anyString(), anyString(), anyString()); } + // ------------------------------------------------------------------------- + // commitOffset — direct static call, null offset → no-op + // ------------------------------------------------------------------------- + + @Test + void testCommitOffset_nullOffset_isNoOp() { + CommitChangeEventOffsetImpl.commitOffset("wf", "table", null); + + verify(eventSubscriptionDAO, never()).getSubscriberExtension(anyString(), anyString()); + verify(eventSubscriptionDAO, never()) + .upsertSubscriberExtension(anyString(), anyString(), anyString(), anyString()); + } + + // ------------------------------------------------------------------------- + // commitOffset — direct static call, stored offset >= processed → skip + // ------------------------------------------------------------------------- + + @Test + void testCommitOffset_storedOffsetHigher_skipsUpsert() { + EventSubscriptionOffset existing = + new EventSubscriptionOffset().withCurrentOffset(100L).withStartingOffset(100L); + when(eventSubscriptionDAO.getSubscriberExtension(anyString(), anyString())) + .thenReturn(JsonUtils.pojoToJson(existing)); + + try (MockedStatic entityMock = mockStatic(Entity.class)) { + entityMock.when(Entity::getCollectionDAO).thenReturn(collectionDAO); + CommitChangeEventOffsetImpl.commitOffset("certificationWorkflow", "table", 50L); + } + + verify(eventSubscriptionDAO, never()) + .upsertSubscriberExtension(anyString(), anyString(), anyString(), anyString()); + } + private void injectField(Object target, String fieldName, Object value) throws Exception { Field field = CommitChangeEventOffsetImpl.class.getDeclaredField(fieldName); field.setAccessible(true); diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/elements/triggers/impl/FetchChangeEventsImplTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/elements/triggers/impl/FetchChangeEventsImplTest.java index e196a9437476..b6cad6f578c8 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/elements/triggers/impl/FetchChangeEventsImplTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/elements/triggers/impl/FetchChangeEventsImplTest.java @@ -11,17 +11,21 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.openmetadata.service.governance.workflows.Workflow.ENTITY_LIST_VARIABLE; +import static org.openmetadata.service.governance.workflows.Workflow.PROCESSED_FQNS_VARIABLE; import static org.openmetadata.service.governance.workflows.elements.triggers.PeriodicBatchEntityTrigger.HAS_FINISHED_VARIABLE; import static org.openmetadata.service.governance.workflows.elements.triggers.impl.FetchChangeEventsImpl.CURRENT_BATCH_OFFSET_VARIABLE; import static org.openmetadata.service.governance.workflows.elements.triggers.impl.FetchChangeEventsImpl.MAX_PROCESSED_OFFSET_VARIABLE; +import static org.openmetadata.service.governance.workflows.elements.triggers.impl.FetchChangeEventsImpl.PROCESSED_FQNS_MAX_SIZE; import java.lang.reflect.Field; +import java.util.LinkedHashMap; import java.util.List; import org.flowable.common.engine.api.delegate.Expression; import org.flowable.engine.delegate.DelegateExecution; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.MockedStatic; import org.mockito.junit.jupiter.MockitoExtension; @@ -187,7 +191,8 @@ void testExecute_withEvents_setsHasFinishedFalse() { @Test void testExecute_usesStoredOffsetFromExecution() { when(execution.getVariable(CURRENT_BATCH_OFFSET_VARIABLE)).thenReturn(100L); - when(execution.getVariable(MAX_PROCESSED_OFFSET_VARIABLE)).thenReturn(null); + // existingMax == batchMaxOffset (100L == 100L) → inline commit is skipped + when(execution.getVariable(MAX_PROCESSED_OFFSET_VARIABLE)).thenReturn(100L); when(changeEventDAO.listByEntityTypesWithOffset(anyList(), eq(100L), any(int.class))) .thenReturn(List.of()); @@ -196,9 +201,7 @@ void testExecute_usesStoredOffsetFromExecution() { impl.execute(execution); } - // DAO called with the stored offset verify(changeEventDAO).listByEntityTypesWithOffset(anyList(), eq(100L), any(int.class)); - // eventSubscriptionDAO never queried when offset already in execution verify(eventSubscriptionDAO, never()).getSubscriberExtension(anyString(), anyString()); } @@ -237,6 +240,125 @@ void testExecute_deduplicatesByFqn() { verify(execution).setVariable(eq(MAX_PROCESSED_OFFSET_VARIABLE), eq(20L)); } + // ------------------------------------------------------------------------- + // execute — cross-batch FQN dedup: FQN already in processedFqns is skipped + // ------------------------------------------------------------------------- + + @Test + void testExecute_skipsFqnAlreadyDispatchedInPriorBatch() { + LinkedHashMap priorBatchCache = new LinkedHashMap<>(); + priorBatchCache.put("schema.myTable", Boolean.TRUE); + when(execution.getVariable(PROCESSED_FQNS_VARIABLE)).thenReturn(priorBatchCache); + when(execution.getVariable(CURRENT_BATCH_OFFSET_VARIABLE)).thenReturn(null); + when(execution.getVariable(MAX_PROCESSED_OFFSET_VARIABLE)).thenReturn(null); + + ChangeEvent changeEvent = new ChangeEvent(); + changeEvent.setEntityFullyQualifiedName("schema.myTable"); + changeEvent.setEntityType("table"); + ChangeEventRecord record = new ChangeEventRecord(30L, JsonUtils.pojoToJson(changeEvent)); + when(changeEventDAO.listByEntityTypesWithOffset(anyList(), anyLong(), any(int.class))) + .thenReturn(List.of(record)); + + try (MockedStatic entityMock = mockStatic(Entity.class)) { + entityMock.when(Entity::getCollectionDAO).thenReturn(collectionDAO); + impl.execute(execution); + } + + verify(execution).setVariable(eq("numberOfEntities"), eq(0)); + verify(execution).setVariable(eq(ENTITY_LIST_VARIABLE), eq(List.of())); + } + + // ------------------------------------------------------------------------- + // execute — bounded LRU: oldest entry evicted when cache exceeds cap + // ------------------------------------------------------------------------- + + @Test + void testExecute_evictsOldestFqnWhenCacheExceedsCap() { + LinkedHashMap bigCache = new LinkedHashMap<>(); + for (int i = 0; i < PROCESSED_FQNS_MAX_SIZE; i++) { + bigCache.put("fqn-" + i, Boolean.TRUE); + } + when(execution.getVariable(PROCESSED_FQNS_VARIABLE)).thenReturn(bigCache); + when(execution.getVariable(CURRENT_BATCH_OFFSET_VARIABLE)).thenReturn(0L); + when(execution.getVariable(MAX_PROCESSED_OFFSET_VARIABLE)).thenReturn(null); + + ChangeEvent changeEvent = new ChangeEvent(); + changeEvent.setEntityFullyQualifiedName("fqn-new"); + changeEvent.setEntityType("table"); + ChangeEventRecord record = new ChangeEventRecord(50L, JsonUtils.pojoToJson(changeEvent)); + when(changeEventDAO.listByEntityTypesWithOffset(anyList(), anyLong(), any(int.class))) + .thenReturn(List.of(record)); + + try (MockedStatic entityMock = mockStatic(Entity.class)) { + entityMock.when(Entity::getCollectionDAO).thenReturn(collectionDAO); + impl.execute(execution); + } + + ArgumentCaptor captor = ArgumentCaptor.forClass(Object.class); + verify(execution).setVariable(eq(PROCESSED_FQNS_VARIABLE), captor.capture()); + + @SuppressWarnings("unchecked") + LinkedHashMap capturedCache = + (LinkedHashMap) captor.getValue(); + assertEquals(PROCESSED_FQNS_MAX_SIZE, capturedCache.size()); + assertFalse(capturedCache.containsKey("fqn-0")); + assertTrue(capturedCache.containsKey("fqn-new")); + } + + // ------------------------------------------------------------------------- + // execute — Fix 1: offset is committed inline when max advances + // ------------------------------------------------------------------------- + + @Test + void testExecute_commitsOffsetInlineWhenBatchAdvancesMax() { + when(execution.getVariable(CURRENT_BATCH_OFFSET_VARIABLE)).thenReturn(null); + when(execution.getVariable(MAX_PROCESSED_OFFSET_VARIABLE)).thenReturn(null); + + ChangeEvent changeEvent = new ChangeEvent(); + changeEvent.setEntityFullyQualifiedName("schema.myTable"); + changeEvent.setEntityType("table"); + ChangeEventRecord record = new ChangeEventRecord(55L, JsonUtils.pojoToJson(changeEvent)); + when(changeEventDAO.listByEntityTypesWithOffset(anyList(), anyLong(), any(int.class))) + .thenReturn(List.of(record)); + + try (MockedStatic entityMock = mockStatic(Entity.class)) { + entityMock.when(Entity::getCollectionDAO).thenReturn(collectionDAO); + impl.execute(execution); + } + + verify(eventSubscriptionDAO) + .upsertSubscriberExtension( + eq("certificationWorkflowTrigger-table"), + anyString(), + eq("eventSubscriptionOffset"), + anyString()); + } + + // ------------------------------------------------------------------------- + // execute — Fix 1: inline commit skipped when offset has not advanced + // ------------------------------------------------------------------------- + + @Test + void testExecute_skipsInlineCommitWhenBatchDoesNotAdvanceMax() { + when(execution.getVariable(CURRENT_BATCH_OFFSET_VARIABLE)).thenReturn(42L); + when(execution.getVariable(MAX_PROCESSED_OFFSET_VARIABLE)).thenReturn(42L); + + ChangeEvent changeEvent = new ChangeEvent(); + changeEvent.setEntityFullyQualifiedName("schema.myTable"); + changeEvent.setEntityType("table"); + ChangeEventRecord record = new ChangeEventRecord(42L, JsonUtils.pojoToJson(changeEvent)); + when(changeEventDAO.listByEntityTypesWithOffset(anyList(), anyLong(), any(int.class))) + .thenReturn(List.of(record)); + + try (MockedStatic entityMock = mockStatic(Entity.class)) { + entityMock.when(Entity::getCollectionDAO).thenReturn(collectionDAO); + impl.execute(execution); + } + + verify(eventSubscriptionDAO, never()) + .upsertSubscriberExtension(anyString(), anyString(), anyString(), anyString()); + } + private void injectField(Object target, String fieldName, Object value) throws Exception { Field field = FetchChangeEventsImpl.class.getDeclaredField(fieldName); field.setAccessible(true); diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/jdbi3/WorkflowInstanceRepositoryUpdatedByTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/jdbi3/WorkflowInstanceRepositoryUpdatedByTest.java new file mode 100644 index 000000000000..53f48cfe964c --- /dev/null +++ b/openmetadata-service/src/test/java/org/openmetadata/service/jdbi3/WorkflowInstanceRepositoryUpdatedByTest.java @@ -0,0 +1,74 @@ +/* + * Copyright 2024 Collate + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.openmetadata.service.jdbi3; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; + +import java.util.List; +import org.junit.jupiter.api.Test; +import org.openmetadata.schema.governance.workflows.Stage; +import org.openmetadata.schema.governance.workflows.WorkflowInstanceState; + +class WorkflowInstanceRepositoryUpdatedByTest { + + private static WorkflowInstanceState stateWithUpdatedBy(String updatedBy, Long endedAt) { + Stage stage = new Stage().withUpdatedBy(updatedBy).withEndedAt(endedAt); + return new WorkflowInstanceState().withStage(stage); + } + + @Test + void extractUpdatedByFromStates_returnsLatestNonNullUpdatedBy() { + WorkflowInstanceState old = stateWithUpdatedBy("first-approver", 1000L); + WorkflowInstanceState latest = stateWithUpdatedBy("latest-approver", 3000L); + WorkflowInstanceState middle = stateWithUpdatedBy("middle-approver", 2000L); + + assertEquals( + "latest-approver", + WorkflowInstanceRepository.extractUpdatedByFromStates(List.of(old, latest, middle))); + } + + @Test + void extractUpdatedByFromStates_skipsNullUpdatedBy() { + WorkflowInstanceState withNull = stateWithUpdatedBy(null, 5000L); + WorkflowInstanceState withValue = stateWithUpdatedBy("approver", 1000L); + + assertEquals( + "approver", + WorkflowInstanceRepository.extractUpdatedByFromStates(List.of(withNull, withValue))); + } + + @Test + void extractUpdatedByFromStates_allNull_returnsNull() { + assertNull( + WorkflowInstanceRepository.extractUpdatedByFromStates( + List.of(stateWithUpdatedBy(null, 1000L), stateWithUpdatedBy(null, 2000L)))); + } + + @Test + void extractUpdatedByFromStates_emptyList_returnsNull() { + assertNull(WorkflowInstanceRepository.extractUpdatedByFromStates(List.of())); + } + + @Test + void extractUpdatedByFromStates_nullEndedAt_treatedAsZero() { + WorkflowInstanceState withNullEndedAt = stateWithUpdatedBy("approver-a", null); + WorkflowInstanceState withEndedAt = stateWithUpdatedBy("approver-b", 1L); + + assertEquals( + "approver-b", + WorkflowInstanceRepository.extractUpdatedByFromStates( + List.of(withNullEndedAt, withEndedAt))); + } +} diff --git a/openmetadata-spec/src/main/resources/json/schema/governance/workflows/workflowInstance.json b/openmetadata-spec/src/main/resources/json/schema/governance/workflows/workflowInstance.json index 3b09a7267543..515494af0871 100644 --- a/openmetadata-spec/src/main/resources/json/schema/governance/workflows/workflowInstance.json +++ b/openmetadata-spec/src/main/resources/json/schema/governance/workflows/workflowInstance.json @@ -26,6 +26,10 @@ "description": "Workflow Definition Id.", "$ref": "../../type/basic.json#/definitions/uuid" }, + "scheduleRunId": { + "description": "Identifier shared by all WorkflowInstances spawned from one fire of a periodic trigger. Allows grouping batches into a single scheduled run. Null for event-based triggers.", + "$ref": "../../type/basic.json#/definitions/uuid" + }, "startedAt": { "description": "Timestamp on which the workflow instance started.", "$ref": "../../type/basic.json#/definitions/timestamp" @@ -34,6 +38,17 @@ "description": "Timestamp on which the workflow instance ended.", "$ref": "../../type/basic.json#/definitions/timestamp" }, + "entityList": { + "description": "List of entity links processed by this workflow instance.", + "type": "array", + "items": { + "type": "string" + } + }, + "updatedBy": { + "description": "User who last performed an action in this workflow instance.", + "type": "string" + }, "variables": { "type": "object", "existingJavaType": "java.util.Map" diff --git a/openmetadata-spec/src/main/resources/json/schema/governance/workflows/workflowInstanceState.json b/openmetadata-spec/src/main/resources/json/schema/governance/workflows/workflowInstanceState.json index f831747feeb4..0191f618ec1d 100644 --- a/openmetadata-spec/src/main/resources/json/schema/governance/workflows/workflowInstanceState.json +++ b/openmetadata-spec/src/main/resources/json/schema/governance/workflows/workflowInstanceState.json @@ -23,6 +23,10 @@ "description": "Workflow Definition Reference.", "$ref": "../../type/basic.json#/definitions/uuid" }, + "scheduleRunId": { + "description": "Identifier shared by all WorkflowInstanceStates spawned from one fire of a periodic trigger. Allows grouping batches into a single scheduled run. Null for event-based triggers.", + "$ref": "../../type/basic.json#/definitions/uuid" + }, "stage": { "type": "object", "properties": { @@ -48,6 +52,17 @@ }, "default": [] }, + "entityList": { + "description": "List of entity links that were the input to this stage.", + "type": "array", + "items": { + "type": "string" + } + }, + "updatedBy": { + "description": "User who performed the action in this stage (set by approval or update nodes).", + "type": "string" + }, "variables": { "type": "object", "existingJavaType": "java.util.Map" diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/governance/workflows/workflowInstance.ts b/openmetadata-ui/src/main/resources/ui/src/generated/governance/workflows/workflowInstance.ts index 9f51710d7e18..c49dc6e3b40d 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/governance/workflows/workflowInstance.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/governance/workflows/workflowInstance.ts @@ -17,12 +17,21 @@ export interface WorkflowInstance { /** * Timestamp on which the workflow instance ended. */ - endedAt?: number; - exception?: string; + endedAt?: number; + /** + * List of entity links processed by this workflow instance. + */ + entityList?: string[]; + exception?: string; /** * Unique identifier of this workflow instance state. */ id?: string; + /** + * Identifier shared by all WorkflowInstances spawned from one fire of a periodic trigger. + * Allows grouping batches into a single scheduled run. Null for event-based triggers. + */ + scheduleRunId?: string; /** * Timestamp on which the workflow instance started. */ @@ -32,6 +41,10 @@ export interface WorkflowInstance { * Timestamp on which the workflow instance state was created. */ timestamp?: number; + /** + * User who last performed an action in this workflow instance. + */ + updatedBy?: string; variables?: { [key: string]: any }; /** * Workflow Definition Id. diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/governance/workflows/workflowInstanceState.ts b/openmetadata-ui/src/main/resources/ui/src/generated/governance/workflows/workflowInstanceState.ts index a0680734de6a..f63867902a56 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/governance/workflows/workflowInstanceState.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/governance/workflows/workflowInstanceState.ts @@ -18,9 +18,15 @@ export interface WorkflowInstanceState { /** * Unique identifier of this workflow instance state. */ - id?: string; - stage?: Stage; - status?: WorkflowStatus; + id?: string; + /** + * Identifier shared by all WorkflowInstanceStates spawned from one fire of a periodic + * trigger. Allows grouping batches into a single scheduled run. Null for event-based + * triggers. + */ + scheduleRunId?: string; + stage?: Stage; + status?: WorkflowStatus; /** * Timestamp on which the workflow instance state was created. */ @@ -49,12 +55,20 @@ export interface Stage { * Timestamp on which the workflow instance stage ended. */ endedAt?: number; - name?: string; + /** + * List of entity links that were the input to this stage. + */ + entityList?: string[]; + name?: string; /** * Timestamp on which the workflow instance stage started. */ startedAt?: number; tasks?: string[]; + /** + * User who performed the action in this stage (set by approval or update nodes). + */ + updatedBy?: string; variables?: { [key: string]: any }; } diff --git a/openmetadata-ui/src/main/resources/ui/src/rest/workflowAPI.interface.ts b/openmetadata-ui/src/main/resources/ui/src/rest/workflowAPI.interface.ts index 46c402072b7b..4421f8ef1064 100644 --- a/openmetadata-ui/src/main/resources/ui/src/rest/workflowAPI.interface.ts +++ b/openmetadata-ui/src/main/resources/ui/src/rest/workflowAPI.interface.ts @@ -15,9 +15,11 @@ export interface WorkflowInstanceFromApplicationParams { endTs: number; workflowDefinitionName: string; entityLink: string; + scheduleRunId?: string; } export interface WorkflowInstanceStateParams { startTs: number; endTs: number; + scheduleRunId?: string; }