What would you like to be improved?
JobManager.runJob submits the job to jobExecutor before persisting the JobEntity in entityStore. If entityStore.put(...) throws an IOException, runJob fails, but the already-submitted job stays queued or running. Because lifecycle reconciliation and cleanup operate on persisted JobEntity records, this leaves an orphaned execution that cannot be tracked or managed through the normal APIs.
How should we improve?
Add compensation logic in runJob: if submitJob succeeds but entityStore.put fails, attempt jobExecutor.cancelJob(jobExecutionId) before rethrowing. Preserve original failure semantics (throw the persistence failure) while logging rollback success/failure. Optionally attempt staging-directory cleanup in this failure path as best effort. This makes submit+persist effectively atomic from the caller’s perspective and prevents untracked running jobs.
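A minimal sketch of the proposed compensation follows. It assumes simplified, hypothetical `JobExecutor` and `JobStore` interfaces: the real executor and entity store take richer arguments, such as a `JobEntity` and an overwrite flag. It also assumes the persistence failure reaches callers as a `RuntimeException` wrapping the original `IOException`, which matches what the test asserts. Staging-directory cleanup is only marked with a comment, because its API is not specified here.

```java
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical, simplified stand-in for the executor that runs jobs. */
interface JobExecutor {
  /** Submits a job and returns its execution ID. */
  String submitJob(String jobTemplateName);

  /** Cancels a previously submitted job. */
  void cancelJob(String jobExecutionId);
}

/** Hypothetical, simplified stand-in for the entity store that tracks jobs. */
interface JobStore {
  void put(String jobExecutionId) throws IOException;
}

/** Sketch of runJob with compensation when the persist step fails. */
class JobManagerSketch {
  private static final Logger LOG = Logger.getLogger(JobManagerSketch.class.getName());

  private final JobExecutor jobExecutor;
  private final JobStore entityStore;

  JobManagerSketch(JobExecutor jobExecutor, JobStore entityStore) {
    this.jobExecutor = jobExecutor;
    this.entityStore = entityStore;
  }

  String runJob(String jobTemplateName) {
    // Step 1: submit. If this throws, nothing was started, so there is nothing to undo.
    String jobExecutionId = jobExecutor.submitJob(jobTemplateName);

    try {
      // Step 2: persist, so lifecycle reconciliation and cleanup can see the job.
      entityStore.put(jobExecutionId);
    } catch (IOException persistError) {
      // Compensation: the job is queued/running but untracked, so try to cancel it.
      try {
        jobExecutor.cancelJob(jobExecutionId);
        LOG.info("Cancelled job " + jobExecutionId + " after failing to persist it");
      } catch (RuntimeException cancelError) {
        // Rollback failed: log it and attach it to the original error without replacing it.
        LOG.log(Level.WARNING, "Failed to cancel unpersisted job " + jobExecutionId, cancelError);
        persistError.addSuppressed(cancelError);
      }
      // A best-effort staging-directory cleanup could also be attempted here.
      // Rethrow the persistence failure as the primary error.
      throw new RuntimeException("Failed to persist job " + jobExecutionId, persistError);
    }
    return jobExecutionId;
  }
}
```

Attaching a failed cancellation as a suppressed exception keeps the persistence failure as the error callers see, while the rollback outcome remains available both in the logs and on the exception itself.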
Here is a test to help:
@Test
public void testRunJobShouldCancelSubmittedJobWhenStorePutFails() throws IOException {
  mockedMetalake
      .when(() -> MetalakeManager.checkMetalake(metalakeIdent, entityStore))
      .thenAnswer(a -> null);
  JobTemplateEntity shellJobTemplate =
      newShellJobTemplateEntity("shell_job", "A shell job template");
  when(jobManager.getJobTemplate(metalake, shellJobTemplate.name())).thenReturn(shellJobTemplate);
  // Submission succeeds and returns a known execution ID...
  String jobExecutionId = "job_execution_id_for_test";
  when(jobExecutor.submitJob(any())).thenReturn(jobExecutionId);
  // ...but persisting the JobEntity fails.
  doThrow(new IOException("Entity store error"))
      .when(entityStore)
      .put(any(JobEntity.class), anyBoolean());
  Assertions.assertThrows(
      RuntimeException.class,
      () -> jobManager.runJob(metalake, "shell_job", Collections.emptyMap()));
  // The already-submitted job must be cancelled so that it is not orphaned.
  verify(jobExecutor, times(1)).cancelJob(jobExecutionId);
}