Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
f929f4d
Governance workflows: scheduleRunId correlation, batch trigger refact…
yan-3005 Apr 6, 2026
95ecb30
Update generated TypeScript types
github-actions[bot] Apr 6, 2026
d5e4d84
Governance workflows: fix updatedBy namespace resolution and address …
yan-3005 Apr 6, 2026
3eeaab5
Fix readScheduleRunIdFromProcess to handle String-typed schedule run IDs
yan-3005 Apr 6, 2026
035d0f9
Route scheduleRunId filter through ListFilter instead of bypassing pa…
yan-3005 Apr 6, 2026
870bc7b
Improve PeriodicBatchEntityTrigger scaling: per-iteration offset comm…
yan-3005 Apr 6, 2026
3ff954b
Merge branch 'ram/inclusive-gateway' into ram/workflow-instance-state…
yan-3005 Apr 7, 2026
3d5c00b
Merge branch 'ram/inclusive-gateway' into ram/workflow-instance-state…
yan-3005 Apr 12, 2026
3b8aed3
feat(workflows): scheduleRunId, per-batch instance recording, stage e…
yan-3005 Apr 14, 2026
6852185
Merge remote-tracking branch 'origin/ram/inclusive-gateway' into ram/…
yan-3005 Apr 14, 2026
23047ac
fix(trigger): remove attachWorkflowInstanceListeners from trigger pro…
yan-3005 Apr 15, 2026
12177fe
fix(trigger): resolve merge conflict — keep scheduleRunId IO parameter
yan-3005 Apr 15, 2026
3383b60
Merge remote-tracking branch 'origin/main' into ram/workflow-instance…
yan-3005 Apr 15, 2026
277e75a
fix(tests): update TriggerFactoryTest for multi-instance arch; restor…
yan-3005 Apr 15, 2026
34cb467
Merge branch 'ram/inclusive-gateway' into ram/workflow-instance-state…
yan-3005 Apr 15, 2026
86dbaa6
Merge remote-tracking branch 'origin/ram/inclusive-gateway' into ram/…
yan-3005 Apr 15, 2026
27cbda2
Merge branch 'ram/workflow-instance-states-impl' of https://github.co…
yan-3005 Apr 15, 2026
8084025
Merge origin/ram/inclusive-gateway into ram/workflow-instance-states-…
yan-3005 Apr 15, 2026
10548d8
perf: fetch WorkflowDefinition once per stage event, eliminating dupl…
yan-3005 Apr 15, 2026
82ddbf9
fix: include resolveWorkflowFqn helper in TriggerFactory
yan-3005 Apr 15, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions bootstrap/sql/migrations/native/1.14.0/mysql/schemaChanges.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
ALTER TABLE workflow_instance_time_series
ADD COLUMN scheduleRunId VARCHAR(36)
GENERATED ALWAYS AS (json ->> '$.scheduleRunId');
ALTER TABLE workflow_instance_time_series
ADD INDEX idx_workflow_instance_schedule_run_id (scheduleRunId);

ALTER TABLE workflow_instance_state_time_series
ADD COLUMN scheduleRunId VARCHAR(36)
GENERATED ALWAYS AS (json ->> '$.scheduleRunId');
ALTER TABLE workflow_instance_state_time_series
ADD INDEX idx_workflow_instance_state_schedule_run_id (scheduleRunId);
11 changes: 11 additions & 0 deletions bootstrap/sql/migrations/native/1.14.0/postgres/schemaChanges.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
ALTER TABLE workflow_instance_time_series
ADD COLUMN scheduleRunId VARCHAR(36)
GENERATED ALWAYS AS ((json ->> 'scheduleRunId')) STORED;
CREATE INDEX idx_workflow_instance_schedule_run_id
ON workflow_instance_time_series (scheduleRunId);

ALTER TABLE workflow_instance_state_time_series
ADD COLUMN scheduleRunId VARCHAR(36)
GENERATED ALWAYS AS ((json ->> 'scheduleRunId')) STORED;
CREATE INDEX idx_workflow_instance_state_schedule_run_id
ON workflow_instance_state_time_series (scheduleRunId);
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public class Workflow {
public static final String STAGE_INSTANCE_STATE_ID_VARIABLE = "stageInstanceStateId";
public static final String WORKFLOW_INSTANCE_EXECUTION_ID_VARIABLE =
"workflowInstanceExecutionId";
public static final String WORKFLOW_SCHEDULE_RUN_ID_VARIABLE = "scheduleRunId";
public static final String WORKFLOW_RUNTIME_EXCEPTION = "workflowRuntimeException";
public static final String EXCEPTION_VARIABLE = "exception";
public static final String FAILURE_VARIABLE = "failure";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import static org.flowable.common.engine.api.delegate.event.FlowableEngineEventType.PROCESS_COMPLETED_WITH_ERROR_END_EVENT;
import static org.openmetadata.service.governance.workflows.Workflow.EXCEPTION_VARIABLE;
import static org.openmetadata.service.governance.workflows.Workflow.GLOBAL_NAMESPACE;
import static org.openmetadata.service.governance.workflows.Workflow.WORKFLOW_SCHEDULE_RUN_ID_VARIABLE;
import static org.openmetadata.service.governance.workflows.WorkflowVariableHandler.getNamespacedVariableName;

import java.util.HashMap;
import java.util.Map;
Expand All @@ -22,6 +24,7 @@
import org.openmetadata.schema.governance.workflows.WorkflowDefinition;
import org.openmetadata.service.Entity;
import org.openmetadata.service.governance.workflows.elements.TriggerFactory;
import org.openmetadata.service.jdbi3.NewStageRequest;
import org.openmetadata.service.jdbi3.WorkflowDefinitionRepository;
import org.openmetadata.service.jdbi3.WorkflowInstanceRepository;
import org.openmetadata.service.jdbi3.WorkflowInstanceStateRepository;
Expand Down Expand Up @@ -210,22 +213,26 @@ private void addFailureStage(
Entity.getEntityTimeSeriesRepository(Entity.WORKFLOW_INSTANCE_STATE);

UUID executionId = UUID.nameUUIDFromBytes(processInstanceId.getBytes());
UUID scheduleRunId = readScheduleRunIdFromProcess(processInstanceId);

UUID stageId =
stateRepository.addNewStageToInstance(
WORKFLOW_FAILURE_LISTENER_STAGE,
executionId,
workflowInstanceId,
workflowName,
System.currentTimeMillis());
new NewStageRequest(
WORKFLOW_FAILURE_LISTENER_STAGE,
executionId,
workflowInstanceId,
workflowName,
System.currentTimeMillis(),
scheduleRunId,
null));

Map<String, Object> stageData = new HashMap<>();
stageData.put("status", "FAILED");
stageData.put("failureType", failureType);
stageData.put("processInstanceId", processInstanceId);
stageData.put("exception", errorMessage);

stateRepository.updateStage(stageId, System.currentTimeMillis(), stageData);
stateRepository.updateStage(stageId, System.currentTimeMillis(), null, stageData);

LOG.info(
"[WorkflowFailure] FAILURE_STAGE_ADDED: workflowInstanceId={}, stageId={}",
Expand All @@ -241,6 +248,30 @@ private void addFailureStage(
}
}

private UUID readScheduleRunIdFromProcess(String processInstanceId) {
try {
RuntimeService runtimeService = ProcessEngines.getDefaultProcessEngine().getRuntimeService();
Object plain =
runtimeService.getVariable(processInstanceId, WORKFLOW_SCHEDULE_RUN_ID_VARIABLE);
if (plain instanceof UUID uuid) {
return uuid;
}
Object namespaced =
runtimeService.getVariable(
processInstanceId,
getNamespacedVariableName(GLOBAL_NAMESPACE, WORKFLOW_SCHEDULE_RUN_ID_VARIABLE));
if (namespaced instanceof UUID uuid) {
return uuid;
}
Comment thread
gitar-bot[bot] marked this conversation as resolved.
Outdated
} catch (Exception e) {
LOG.debug(
"[WorkflowFailure] Could not read scheduleRunId for process {}: {}",
processInstanceId,
e.getMessage());
}
return null;
}

private boolean isStageStatusEnabled(String workflowDefinitionKey) {
try {
WorkflowDefinitionRepository workflowDefRepository =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,12 +101,14 @@ private void addWorkflowInstance(
getMainWorkflowDefinitionNameFromTrigger(
getProcessDefinitionKeyFromId(execution.getProcessDefinitionId()));
UUID workflowInstanceId = UUID.fromString(execution.getProcessInstanceBusinessKey());
UUID scheduleRunId = WorkflowScheduleRunIdReader.readFrom(execution);

workflowInstanceRepository.addNewWorkflowInstance(
workflowDefinitionName,
workflowInstanceId,
System.currentTimeMillis(),
execution.getVariables());
execution.getVariables(),
scheduleRunId);
LOG.debug(
"[WORKFLOW_INSTANCE_CREATED] Workflow: {}, InstanceId: {}, ProcessInstance: {} - Workflow instance record created successfully",
workflowDefinitionName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,20 @@
import static org.openmetadata.service.governance.workflows.Workflow.WORKFLOW_INSTANCE_EXECUTION_ID_VARIABLE;
import static org.openmetadata.service.governance.workflows.WorkflowHandler.getProcessDefinitionKeyFromId;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import lombok.extern.slf4j.Slf4j;
import org.flowable.engine.delegate.DelegateExecution;
import org.flowable.engine.delegate.JavaDelegate;
import org.openmetadata.schema.governance.workflows.WorkflowDefinition;
import org.openmetadata.schema.governance.workflows.elements.WorkflowNodeDefinitionInterface;
import org.openmetadata.schema.utils.JsonUtils;
import org.openmetadata.service.Entity;
import org.openmetadata.service.exception.EntityNotFoundException;
import org.openmetadata.service.jdbi3.NewStageRequest;
import org.openmetadata.service.jdbi3.WorkflowDefinitionRepository;
import org.openmetadata.service.jdbi3.WorkflowInstanceStateRepository;

@Slf4j
Expand Down Expand Up @@ -89,19 +97,22 @@ public void execute(DelegateExecution execution) {
// Create a failed stage record
UUID stageId =
workflowInstanceStateRepository.addNewStageToInstance(
stage + "_failed",
executionId,
workflowInstanceId,
workflowName,
System.currentTimeMillis());
new NewStageRequest(
stage + "_failed",
executionId,
workflowInstanceId,
workflowName,
System.currentTimeMillis(),
WorkflowScheduleRunIdReader.readFrom(execution),
null));

java.util.Map<String, Object> failureData = new java.util.HashMap<>();
failureData.put("status", "FAILED");
failureData.put("error", exc.getMessage());
failureData.put("errorClass", exc.getClass().getSimpleName());

workflowInstanceStateRepository.updateStage(
stageId, System.currentTimeMillis(), failureData);
stageId, System.currentTimeMillis(), null, failureData);

LOG.warn(
"[STAGE_FAILED_RECORDED] Workflow: {}, ProcessInstance: {}, Stage: {}, StageId: {} - Failed stage recorded in database",
Expand Down Expand Up @@ -172,13 +183,17 @@ private void addNewStage(

String stage =
Optional.ofNullable(execution.getCurrentActivityId()).orElse(workflowDefinitionName);
List<String> entityList = resolveStageEntityList(workflowDefinitionName, stage, varHandler);
UUID workflowInstanceStateId =
workflowInstanceStateRepository.addNewStageToInstance(
stage,
workflowInstanceExecutionId,
workflowInstanceId,
workflowDefinitionName,
System.currentTimeMillis());
new NewStageRequest(
stage,
workflowInstanceExecutionId,
workflowInstanceId,
workflowDefinitionName,
System.currentTimeMillis(),
WorkflowScheduleRunIdReader.readFrom(execution),
entityList));
Comment thread
gitar-bot[bot] marked this conversation as resolved.
Outdated
varHandler.setNodeVariable(STAGE_INSTANCE_STATE_ID_VARIABLE, workflowInstanceStateId);
LOG.debug(
"[STAGE_CREATED] Workflow: {}, ProcessInstance: {}, Stage: {}, StageId: {} - Stage record created successfully",
Expand All @@ -188,6 +203,51 @@ private void addNewStage(
workflowInstanceStateId);
}

@SuppressWarnings("unchecked")
private Map<String, String> getStageInputNamespaceMap(
String workflowDefinitionName, String stageName) {
WorkflowDefinitionRepository workflowDefinitionRepository =
(WorkflowDefinitionRepository) Entity.getEntityRepository(Entity.WORKFLOW_DEFINITION);
WorkflowDefinition workflowDefinition =
workflowDefinitionRepository.getByNameForStageProcessing(workflowDefinitionName);
return workflowDefinition.getNodes().stream()
.filter(n -> stageName.equals(n.getName()))
.findFirst()
.map(WorkflowNodeDefinitionInterface::getInputNamespaceMap)
.map(m -> (Map<String, String>) JsonUtils.readOrConvertValue(m, Map.class))
.orElse(null);
}

private List<String> resolveStageEntityList(
String workflowDefinitionName, String stageName, WorkflowVariableHandler varHandler) {
try {
Map<String, String> inputNamespaceMap =
getStageInputNamespaceMap(workflowDefinitionName, stageName);
if (inputNamespaceMap == null) {
return null;
}
return WorkflowVariableHandler.getEntityList(inputNamespaceMap, varHandler);
} catch (EntityNotFoundException e) {
LOG.debug("Could not resolve entityList for stage '{}': {}", stageName, e.getMessage());
return null;
}
}

private String resolveStageUpdatedBy(
String workflowDefinitionName, String stageName, Map<String, Object> variables) {
try {
Map<String, String> inputNamespaceMap =
getStageInputNamespaceMap(workflowDefinitionName, stageName);
if (inputNamespaceMap == null) {
return null;
}
return WorkflowVariableHandler.getUpdatedByFromVariables(inputNamespaceMap, variables);
} catch (EntityNotFoundException e) {
LOG.debug("Could not resolve updatedBy for stage '{}': {}", stageName, e.getMessage());
return null;
}
}

private void updateStage(
WorkflowVariableHandler varHandler,
DelegateExecution execution,
Expand All @@ -210,8 +270,10 @@ private void updateStage(
return;
}

Map<String, Object> variables = execution.getVariables();
String updatedBy = resolveStageUpdatedBy(workflowDefinitionName, stage, variables);
workflowInstanceStateRepository.updateStage(
workflowInstanceStateId, System.currentTimeMillis(), execution.getVariables());
workflowInstanceStateId, System.currentTimeMillis(), updatedBy, variables);

LOG.debug(
"[STAGE_UPDATED] Workflow: {}, ProcessInstance: {}, Stage: {}, StageId: {} - Stage completion recorded",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package org.openmetadata.service.governance.workflows;

import static org.openmetadata.service.governance.workflows.Workflow.GLOBAL_NAMESPACE;
import static org.openmetadata.service.governance.workflows.Workflow.WORKFLOW_SCHEDULE_RUN_ID_VARIABLE;
import static org.openmetadata.service.governance.workflows.WorkflowVariableHandler.getNamespacedVariableName;

import java.util.UUID;
import lombok.extern.slf4j.Slf4j;
import org.flowable.engine.delegate.DelegateExecution;

@Slf4j
public final class WorkflowScheduleRunIdReader {
private WorkflowScheduleRunIdReader() {}

public static UUID readFrom(DelegateExecution execution) {
Object plain = execution.getVariable(WORKFLOW_SCHEDULE_RUN_ID_VARIABLE);
if (plain != null) {
return toUuid(plain);
}
Object namespaced =
execution.getVariable(
getNamespacedVariableName(GLOBAL_NAMESPACE, WORKFLOW_SCHEDULE_RUN_ID_VARIABLE));
return namespaced != null ? toUuid(namespaced) : null;
}

private static UUID toUuid(Object value) {
if (value instanceof UUID uuid) {
return uuid;
}
try {
return UUID.fromString(value.toString());
} catch (IllegalArgumentException e) {
LOG.warn(
"[WorkflowScheduleRunIdReader] Invalid scheduleRunId value '{}': {}",
value,
e.getMessage());
return null;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package org.openmetadata.service.governance.workflows;

import static org.openmetadata.service.governance.workflows.Workflow.WORKFLOW_SCHEDULE_RUN_ID_VARIABLE;

import java.util.UUID;
import lombok.extern.slf4j.Slf4j;
import org.flowable.engine.delegate.DelegateExecution;
import org.flowable.engine.delegate.JavaDelegate;

@Slf4j
public class WorkflowScheduleRunIdSetterListener implements JavaDelegate {
@Override
public void execute(DelegateExecution execution) {
if (execution.getVariable(WORKFLOW_SCHEDULE_RUN_ID_VARIABLE) != null) {
return;
}
UUID scheduleRunId = UUID.randomUUID();
execution.setVariable(WORKFLOW_SCHEDULE_RUN_ID_VARIABLE, scheduleRunId);
LOG.debug(
"[SCHEDULE_RUN_ID_SET] ProcessInstance: {} - scheduleRunId: {}",
execution.getProcessInstanceId(),
scheduleRunId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import static org.openmetadata.service.governance.workflows.Workflow.ENTITY_LIST_VARIABLE;
import static org.openmetadata.service.governance.workflows.Workflow.FAILURE_VARIABLE;
import static org.openmetadata.service.governance.workflows.Workflow.GLOBAL_NAMESPACE;
import static org.openmetadata.service.governance.workflows.Workflow.UPDATED_BY_VARIABLE;

import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -144,6 +145,40 @@ public static List<String> getEntityList(
return List.of();
}

@SuppressWarnings("unchecked")
public static List<String> getEntityListFromVariables(
Map<String, ?> inputNamespaceMap, Map<String, Object> variables) {
for (Map.Entry<String, ?> entry : inputNamespaceMap.entrySet()) {
String key = entry.getKey();
String namespace = (String) entry.getValue();
if (key.endsWith("_" + ENTITY_LIST_VARIABLE) && namespace != null) {
Object obj = variables.get(getNamespacedVariableName(namespace, key));
if (obj instanceof List) {
return (List<String>) obj;
}
}
}
String entityListNamespace = (String) inputNamespaceMap.get(ENTITY_LIST_VARIABLE);
if (entityListNamespace != null) {
Object obj =
variables.get(getNamespacedVariableName(entityListNamespace, ENTITY_LIST_VARIABLE));
if (obj instanceof List) {
return (List<String>) obj;
}
}
return List.of();
}

public static String getUpdatedByFromVariables(
Map<String, ?> inputNamespaceMap, Map<String, Object> variables) {
String namespace = (String) inputNamespaceMap.get(UPDATED_BY_VARIABLE);
if (namespace == null) {
return null;
}
Object value = variables.get(getNamespacedVariableName(namespace, UPDATED_BY_VARIABLE));
return value instanceof String s ? s : null;
}

public void setFailure(boolean failure) {
if (failure) {
varScope.setTransientVariable(FAILURE_VARIABLE, true);
Expand Down
Loading
Loading