Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
f929f4d
Governance workflows: scheduleRunId correlation, batch trigger refact…
yan-3005 Apr 6, 2026
95ecb30
Update generated TypeScript types
github-actions[bot] Apr 6, 2026
d5e4d84
Governance workflows: fix updatedBy namespace resolution and address …
yan-3005 Apr 6, 2026
3eeaab5
Fix readScheduleRunIdFromProcess to handle String-typed schedule run IDs
yan-3005 Apr 6, 2026
035d0f9
Route scheduleRunId filter through ListFilter instead of bypassing pa…
yan-3005 Apr 6, 2026
870bc7b
Improve PeriodicBatchEntityTrigger scaling: per-iteration offset comm…
yan-3005 Apr 6, 2026
3ff954b
Merge branch 'ram/inclusive-gateway' into ram/workflow-instance-state…
yan-3005 Apr 7, 2026
3d5c00b
Merge branch 'ram/inclusive-gateway' into ram/workflow-instance-state…
yan-3005 Apr 12, 2026
3b8aed3
feat(workflows): scheduleRunId, per-batch instance recording, stage e…
yan-3005 Apr 14, 2026
6852185
Merge remote-tracking branch 'origin/ram/inclusive-gateway' into ram/…
yan-3005 Apr 14, 2026
23047ac
fix(trigger): remove attachWorkflowInstanceListeners from trigger pro…
yan-3005 Apr 15, 2026
12177fe
fix(trigger): resolve merge conflict — keep scheduleRunId IO parameter
yan-3005 Apr 15, 2026
3383b60
Merge remote-tracking branch 'origin/main' into ram/workflow-instance…
yan-3005 Apr 15, 2026
277e75a
fix(tests): update TriggerFactoryTest for multi-instance arch; restor…
yan-3005 Apr 15, 2026
34cb467
Merge branch 'ram/inclusive-gateway' into ram/workflow-instance-state…
yan-3005 Apr 15, 2026
86dbaa6
Merge remote-tracking branch 'origin/ram/inclusive-gateway' into ram/…
yan-3005 Apr 15, 2026
27cbda2
Merge branch 'ram/workflow-instance-states-impl' of https://github.co…
yan-3005 Apr 15, 2026
8084025
Merge origin/ram/inclusive-gateway into ram/workflow-instance-states-…
yan-3005 Apr 15, 2026
10548d8
perf: fetch WorkflowDefinition once per stage event, eliminating dupl…
yan-3005 Apr 15, 2026
82ddbf9
fix: include resolveWorkflowFqn helper in TriggerFactory
yan-3005 Apr 15, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions bootstrap/sql/migrations/native/1.14.0/mysql/schemaChanges.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
ALTER TABLE workflow_instance_time_series
ADD COLUMN scheduleRunId VARCHAR(36)
GENERATED ALWAYS AS (json ->> '$.scheduleRunId');
ALTER TABLE workflow_instance_time_series
ADD INDEX idx_workflow_instance_schedule_run_id (scheduleRunId);

ALTER TABLE workflow_instance_state_time_series
ADD COLUMN scheduleRunId VARCHAR(36)
GENERATED ALWAYS AS (json ->> '$.scheduleRunId');
ALTER TABLE workflow_instance_state_time_series
ADD INDEX idx_workflow_instance_state_schedule_run_id (scheduleRunId);

-- Marker: forces v1140 Java runDataMigration() to re-run via the reprocessing path on existing
-- DBs after fixing migrateWorkflowJson trigger-output bug in MigrationUtil.java. Idempotent.
DELETE FROM SERVER_MIGRATION_SQL_LOGS WHERE 1=0;

-- Update entityLink generated column to read from global_entityList[0] instead of global_relatedEntity
ALTER TABLE workflow_instance_time_series
MODIFY COLUMN entityLink TEXT GENERATED ALWAYS AS (json ->> '$.variables.global_entityList[0]');
20 changes: 20 additions & 0 deletions bootstrap/sql/migrations/native/1.14.0/postgres/schemaChanges.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
ALTER TABLE workflow_instance_time_series
ADD COLUMN scheduleRunId VARCHAR(36)
GENERATED ALWAYS AS ((json ->> 'scheduleRunId')) STORED;
CREATE INDEX idx_workflow_instance_schedule_run_id
ON workflow_instance_time_series (scheduleRunId);

ALTER TABLE workflow_instance_state_time_series
ADD COLUMN scheduleRunId VARCHAR(36)
GENERATED ALWAYS AS ((json ->> 'scheduleRunId')) STORED;
CREATE INDEX idx_workflow_instance_state_schedule_run_id
ON workflow_instance_state_time_series (scheduleRunId);

-- Marker: forces v1140 Java runDataMigration() to re-run via the reprocessing path on existing
-- DBs after fixing migrateWorkflowJson trigger-output bug in MigrationUtil.java. Idempotent.
DELETE FROM SERVER_MIGRATION_SQL_LOGS WHERE 1=0;

-- Update entityLink generated column to read from global_entityList[0] instead of global_relatedEntity
ALTER TABLE workflow_instance_time_series DROP COLUMN entityLink;
ALTER TABLE workflow_instance_time_series
ADD COLUMN entityLink TEXT GENERATED ALWAYS AS ((json -> 'variables' -> 'global_entityList' ->> 0)) STORED;
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ public class Workflow {
public static final String FALSE_ENTITY_LIST_VARIABLE = "false_entityList";
public static final String HAS_TRUE_ENTITIES_VARIABLE = "hasTrueEntities";
public static final String HAS_FALSE_ENTITIES_VARIABLE = "hasFalseEntities";
public static final String PROCESSED_FQNS_VARIABLE = "processedFqnsInRun";
public static final String BATCH_SINK_PROCESSED_VARIABLE = "batchSinkProcessed";
public static final String TRIGGERING_OBJECT_ID_VARIABLE = "triggeringObjectId";
public static final String RECOGNIZER_FEEDBACK = "recognizerFeedback";
Expand All @@ -26,6 +27,7 @@ public class Workflow {
public static final String STAGE_INSTANCE_STATE_ID_VARIABLE = "stageInstanceStateId";
public static final String WORKFLOW_INSTANCE_EXECUTION_ID_VARIABLE =
"workflowInstanceExecutionId";
public static final String WORKFLOW_SCHEDULE_RUN_ID_VARIABLE = "scheduleRunId";
public static final String WORKFLOW_RUNTIME_EXCEPTION = "workflowRuntimeException";
public static final String EXCEPTION_VARIABLE = "exception";
public static final String FAILURE_VARIABLE = "failure";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import static org.flowable.common.engine.api.delegate.event.FlowableEngineEventType.PROCESS_COMPLETED_WITH_ERROR_END_EVENT;
import static org.openmetadata.service.governance.workflows.Workflow.EXCEPTION_VARIABLE;
import static org.openmetadata.service.governance.workflows.Workflow.GLOBAL_NAMESPACE;
import static org.openmetadata.service.governance.workflows.Workflow.WORKFLOW_SCHEDULE_RUN_ID_VARIABLE;
import static org.openmetadata.service.governance.workflows.WorkflowVariableHandler.getNamespacedVariableName;

import java.util.HashMap;
import java.util.Map;
Expand All @@ -22,6 +24,7 @@
import org.openmetadata.schema.governance.workflows.WorkflowDefinition;
import org.openmetadata.service.Entity;
import org.openmetadata.service.governance.workflows.elements.TriggerFactory;
import org.openmetadata.service.jdbi3.NewStageRequest;
import org.openmetadata.service.jdbi3.WorkflowDefinitionRepository;
import org.openmetadata.service.jdbi3.WorkflowInstanceRepository;
import org.openmetadata.service.jdbi3.WorkflowInstanceStateRepository;
Expand Down Expand Up @@ -210,22 +213,26 @@ private void addFailureStage(
Entity.getEntityTimeSeriesRepository(Entity.WORKFLOW_INSTANCE_STATE);

UUID executionId = UUID.nameUUIDFromBytes(processInstanceId.getBytes());
UUID scheduleRunId = readScheduleRunIdFromProcess(processInstanceId);

UUID stageId =
stateRepository.addNewStageToInstance(
WORKFLOW_FAILURE_LISTENER_STAGE,
executionId,
workflowInstanceId,
workflowName,
System.currentTimeMillis());
new NewStageRequest(
WORKFLOW_FAILURE_LISTENER_STAGE,
executionId,
workflowInstanceId,
workflowName,
System.currentTimeMillis(),
scheduleRunId,
null));

Map<String, Object> stageData = new HashMap<>();
stageData.put("status", "FAILED");
stageData.put("failureType", failureType);
stageData.put("processInstanceId", processInstanceId);
stageData.put("exception", errorMessage);

stateRepository.updateStage(stageId, System.currentTimeMillis(), stageData);
stateRepository.updateStage(stageId, System.currentTimeMillis(), null, stageData);

LOG.info(
"[WorkflowFailure] FAILURE_STAGE_ADDED: workflowInstanceId={}, stageId={}",
Expand All @@ -241,6 +248,28 @@ private void addFailureStage(
}
}

private UUID readScheduleRunIdFromProcess(String processInstanceId) {
try {
RuntimeService runtimeService = ProcessEngines.getDefaultProcessEngine().getRuntimeService();
Object plain =
runtimeService.getVariable(processInstanceId, WORKFLOW_SCHEDULE_RUN_ID_VARIABLE);
if (plain != null) {
return WorkflowScheduleRunIdReader.toUuid(plain);
}
Object namespaced =
runtimeService.getVariable(
processInstanceId,
getNamespacedVariableName(GLOBAL_NAMESPACE, WORKFLOW_SCHEDULE_RUN_ID_VARIABLE));
return namespaced != null ? WorkflowScheduleRunIdReader.toUuid(namespaced) : null;
} catch (Exception e) {
LOG.debug(
"[WorkflowFailure] Could not read scheduleRunId for process {}: {}",
processInstanceId,
e.getMessage());
return null;
}
}

private boolean isStageStatusEnabled(String workflowDefinitionKey) {
try {
WorkflowDefinitionRepository workflowDefRepository =
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package org.openmetadata.service.governance.workflows;

import static org.openmetadata.service.governance.workflows.Workflow.GLOBAL_NAMESPACE;
import static org.openmetadata.service.governance.workflows.Workflow.UPDATED_BY_VARIABLE;
import static org.openmetadata.service.governance.workflows.WorkflowVariableHandler.getNamespacedVariableName;
import static org.openmetadata.service.governance.workflows.elements.TriggerFactory.getTriggerWorkflowId;

Expand Down Expand Up @@ -1109,17 +1111,22 @@ public boolean isActivityWithVariableExecuting(
}

public boolean triggerWorkflow(String workflowName) {
return triggerWorkflow(workflowName, null);
}

public boolean triggerWorkflow(String workflowName, String triggeredBy) {
RuntimeService runtimeService = processEngine.getRuntimeService();
RepositoryService repositoryService = processEngine.getRepositoryService();

String baseProcessKey = getTriggerWorkflowId(workflowName);
Map<String, Object> initialVars = buildInitialVariables(triggeredBy);

// Prefer the current workflow definition config to avoid triggering stale process keys left
// behind by older deployments.
List<String> configuredTriggerKeys =
getConfiguredPeriodicTriggerProcessKeys(workflowName, baseProcessKey);
if (!configuredTriggerKeys.isEmpty()) {
return triggerProcessDefinitions(runtimeService, configuredTriggerKeys);
return triggerProcessDefinitions(runtimeService, configuredTriggerKeys, initialVars);
}

// Legacy fallback: trigger all latest process definitions matching the workflow prefix.
Expand All @@ -1131,26 +1138,36 @@ public boolean triggerWorkflow(String workflowName) {
.list();
if (!processDefinitions.isEmpty()) {
return triggerProcessDefinitions(
runtimeService, processDefinitions.stream().map(ProcessDefinition::getKey).toList());
runtimeService,
processDefinitions.stream().map(ProcessDefinition::getKey).toList(),
initialVars);
}

// Fallback to original behavior for non-periodic trigger types.
try {
runtimeService.startProcessInstanceByKey(baseProcessKey);
runtimeService.startProcessInstanceByKey(baseProcessKey, initialVars);
return true;
} catch (FlowableObjectNotFoundException ex) {
LOG.error("No process definition found for key: {}", baseProcessKey);
return false;
}
}

private Map<String, Object> buildInitialVariables(String triggeredBy) {
Map<String, Object> vars = new HashMap<>();
if (triggeredBy != null && !triggeredBy.isEmpty()) {
vars.put(getNamespacedVariableName(GLOBAL_NAMESPACE, UPDATED_BY_VARIABLE), triggeredBy);
}
return vars;
}

private boolean triggerProcessDefinitions(
RuntimeService runtimeService, List<String> processKeys) {
RuntimeService runtimeService, List<String> processKeys, Map<String, Object> initialVars) {
boolean anyStarted = false;
for (String processKey : processKeys) {
try {
LOG.info("Triggering process with key: {}", processKey);
runtimeService.startProcessInstanceByKey(processKey);
runtimeService.startProcessInstanceByKey(processKey, initialVars);
anyStarted = true;
} catch (Exception e) {
LOG.error("Failed to start process: {}", processKey, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,21 +97,23 @@ private void addWorkflowInstance(
DelegateExecution execution, WorkflowInstanceRepository workflowInstanceRepository) {
String processKey = getProcessDefinitionKeyFromId(execution.getProcessDefinitionId());
String workflowDefinitionName = getMainWorkflowDefinitionNameFromTrigger(processKey);
if (workflowDefinitionName.equals(processKey)) {
String existingKey = execution.getProcessInstanceBusinessKey();
if (existingKey != null && !existingKey.isEmpty()) {
LOG.debug(
"[WORKFLOW_INSTANCE_SKIP] ProcessInstance: {} - process key '{}' is not an OM trigger workflow, skipping",
execution.getProcessInstanceId(),
processKey);
"[WORKFLOW_INSTANCE_SKIP] ProcessInstance: {} - business key already set by trigger, skipping",
execution.getProcessInstanceId());
return;
}
updateBusinessKey(execution.getProcessInstanceId());
UUID workflowInstanceId = UUID.fromString(execution.getProcessInstanceBusinessKey());
UUID scheduleRunId = WorkflowScheduleRunIdReader.readFrom(execution);

workflowInstanceRepository.addNewWorkflowInstance(
workflowDefinitionName,
workflowInstanceId,
System.currentTimeMillis(),
execution.getVariables());
execution.getVariables(),
scheduleRunId);
LOG.debug(
"[WORKFLOW_INSTANCE_CREATED] Workflow: {}, InstanceId: {}, ProcessInstance: {} - Workflow instance record created successfully",
workflowDefinitionName,
Expand All @@ -123,14 +125,14 @@ private void updateWorkflowInstance(
DelegateExecution execution, WorkflowInstanceRepository workflowInstanceRepository) {
String processKey = getProcessDefinitionKeyFromId(execution.getProcessDefinitionId());
String workflowDefinitionName = getMainWorkflowDefinitionNameFromTrigger(processKey);
if (workflowDefinitionName.equals(processKey)) {
String businessKeyForUpdate = execution.getProcessInstanceBusinessKey();
if (businessKeyForUpdate == null || businessKeyForUpdate.isEmpty()) {
LOG.debug(
"[WORKFLOW_INSTANCE_SKIP] ProcessInstance: {} - process key '{}' is not an OM trigger workflow, skipping",
execution.getProcessInstanceId(),
processKey);
"[WORKFLOW_INSTANCE_SKIP] ProcessInstance: {} - no business key, skipping update",
execution.getProcessInstanceId());
return;
}
UUID workflowInstanceId = UUID.fromString(execution.getProcessInstanceBusinessKey());
UUID workflowInstanceId = UUID.fromString(businessKeyForUpdate);

// Capture all variables including any failure indicators
java.util.Map<String, Object> variables = new java.util.HashMap<>(execution.getVariables());
Expand Down
Loading
Loading