Governance Workflows: scheduleRunId correlation, batch trigger refactor, stage entity enrichment#27081
Governance Workflows: scheduleRunId correlation, batch trigger refactor, stage entity enrichment#27081yan-3005 wants to merge 20 commits intoram/inclusive-gatewayfrom
Conversation
…or, stage entity enrichment - Remove MultiInstanceLoopCharacteristics from PeriodicBatchEntityTrigger; one CallActivity per fetch iteration passes the full entityList to the main workflow - Add scheduleRunId UUID generated once per trigger fire, propagated into every spawned WorkflowInstance and WorkflowInstanceState row for batch grouping - Add WorkflowScheduleRunIdSetterListener (idempotent, fires before WorkflowInstanceListener) and WorkflowScheduleRunIdReader shared utility - Add entityList and updatedBy to WorkflowInstance and WorkflowInstanceState schemas; stage.entityList is resolved at stage START using the node's inputNamespaceMap so each stage records exactly the entity list it received via routing (global, gold, true, etc.) - Add getEntityListFromVariables static overload to WorkflowVariableHandler for map-based resolution without a live DelegateExecution - Add NewStageRequest record to carry stage creation params (avoids 5-param cap) - Add scheduleRunId generated columns + indexes to 1.14.0 MySQL and Postgres migrations - Add listByScheduleRunId to WorkflowInstance and WorkflowInstanceState DAOs - Add scheduleRunId query param to WorkflowInstanceResource and WorkflowInstanceStateResource - Add unit tests: WorkflowVariableHandlerTest, WorkflowScheduleRunIdReaderTest, WorkflowScheduleRunIdSetterListenerTest, PeriodicBatchEntityTriggerTest, TriggerFactoryTest
|
Hi there 👋 Thanks for your contribution! The OpenMetadata team will review the PR shortly! Once it has been labeled as Let us know if you need any help! |
✅ TypeScript Types Auto-UpdatedThe generated TypeScript types have been automatically updated based on JSON schema changes in this PR. |
…PR comments - WorkflowVariableHandler: add getUpdatedByFromVariables() to resolve updatedBy via inputNamespaceMap (mirrors getEntityListFromVariables) - WorkflowInstanceStageListener: factor out getStageInputNamespaceMap(), add resolveStageUpdatedBy(), pass resolved value to updateStage(); narrow catch(Exception) to EntityNotFoundException - WorkflowInstanceStateRepository.updateStage: accept resolved updatedBy as parameter, drop broken extractUpdatedBy() that used stage.getName() as namespace (never matched real values) - WorkflowInstanceRepository: replace non-deterministic extractUpdatedBy() (HashMap scan) with extractUpdatedByFromStates() that picks the latest stage's correctly-resolved updatedBy - WorkflowFailureListener: read scheduleRunId from RuntimeService before building NewStageRequest so failure stages carry the correlation id; import NewStageRequest directly instead of FQN - WorkflowScheduleRunIdReader.toUuid: catch IllegalArgumentException, log warning, return null instead of throwing on malformed input - WorkflowInstanceResource/StateResource: route scheduleRunId queries directly through listByScheduleRunId() DAO method instead of silently ignoring the filter parameter via ListFilter
Delegate to WorkflowScheduleRunIdReader.toUuid() (which handles both UUID and String) instead of instanceof UUID checks that silently drop String values Flowable may have serialized during persistence.
…gination Add getScheduleRunIdCondition() to ListFilter following the getServerIdCondition() pattern so scheduleRunId composes correctly with startTs/endTs/limit/latest. Remove the special-case bypass branch in both resource list handlers and delete the now-dead listByScheduleRunId helpers from the repositories and CollectionDAO.
…e table-scoped serverId in ListFilter
4c26498 to
277e75a
Compare
…workflow-instance-states-impl # Conflicts: # openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/triggers/impl/FetchChangeEventsImpl.java
…m/open-metadata/OpenMetadata into ram/workflow-instance-states-impl
|
The Java checkstyle failed. Please run You can install the pre-commit hooks with |
|
|
||
| String stage = | ||
| Optional.ofNullable(execution.getCurrentActivityId()).orElse(workflowDefinitionName); | ||
| WorkflowDefinition workflowDefinition = fetchWorkflowDefinition(workflowDefinitionName); |
There was a problem hiding this comment.
⚠️ Performance: fetchWorkflowDefinition called twice per stage event (start + end)
In WorkflowInstanceStageListener, fetchWorkflowDefinition is called in both addNewStage (stage START) and updateStage (stage END), resulting in two DB lookups per stage lifecycle. The WorkflowDefinition doesn't change between start and end, so the result from the START call could be cached on the execution (e.g., as a transient variable or instance field) and reused at END.
This was flagged as resolved in previous findings for the duplicate lookup within a single event, but a new duplicate was introduced across start/end events for the same stage.
Suggested fix:
Store the WorkflowDefinition as a process-scoped variable or
cache it in a local variable keyed by workflowDefinitionName,
so the END event reuses the definition fetched at START.
For example, store it in varHandler at START:
varHandler.setNodeVariable("cachedWorkflowDef", workflowDefinition);
and retrieve it at END before falling back to DB.
Was this helpful? React with 👍 / 👎 | Reply gitar fix to apply this suggestion
Code Review
|
| Compact |
|
Was this helpful? React with 👍 / 👎 | Gitar
|
|



Summary
MultiInstanceLoopCharacteristicsfromPeriodicBatchEntityTrigger; oneCallActivityper fetch iteration passes the fullentityListto the main workflow. Inclusive gateways insideMainWorkflowhandle per-entity fan-out within a single Flowable execution.scheduleRunIdcorrelation: generates one UUID per trigger fire, propagated into every spawnedWorkflowInstanceandWorkflowInstanceStaterow — allows grouping all rows from one periodic trigger fire into a single scheduled run.stage.entityListresolved at stage START using the node'sinputNamespaceMap(same logic nodes use at runtime), so each stage records exactly the entity list it received via routing (entityList,gold_entityList,true_entityList, etc.).stage.updatedByset at stage END from node-namespaced variable.WorkflowInstancegains top-levelentityList(fromglobal_entityListat start) andupdatedBy(scanned from variables at end).1.14.0): generated columns + indexes forscheduleRunIdon bothworkflow_instance_time_seriesandworkflow_instance_state_time_seriesfor MySQL and Postgres.scheduleRunIdquery param onWorkflowInstanceResourceandWorkflowInstanceStateResourcefor batch grouping queries.Key design decisions
WorkflowScheduleRunIdSetterListenerfires beforeWorkflowInstanceListener(ordering enforced inattachScheduleRunIdListener) so the trigger's ownWorkflowInstancerow also carries thescheduleRunId.stage.entityListis fetched at stage start (not end) because the routing has already resolved which branch this execution is on — usinggetEntityList(inputNamespaceMap, varHandler)with the liveDelegateExecution.NewStageRequestrecord replaces the 5-param method signature cap.WorkflowVariableHandler.getEntityListFromVariablesstatic overload allows map-based resolution without a liveDelegateExecution.Test plan
WorkflowVariableHandlerTest— 6 cases: plainentityList, banded lists (gold_,silver_,true_), key-not-found, empty mapWorkflowScheduleRunIdReaderTest— 5 cases: UUID object, UUID-as-string, namespaced fallback, null when absent, plain takes precedenceWorkflowScheduleRunIdSetterListenerTest— sets on first call, idempotent, unique IDs per processPeriodicBatchEntityTriggerTest— no multi-instance loop, no inherited business key,entityList+scheduleRunIdIOParameters, process structureTriggerFactoryTest— factory creates correct trigger type