Commit 14f3ab7

He-Pin and Copilot committed
Add defensive sequence number validation in replicated event persistence
Replace the FIXME (akka#29259) comment in handleExternalReplicatedEventPersist with a proper defensive validation of the replica sequence number before updating seenPerReplica.

The validation logs a warning when the incoming event's originSequenceNr does not match the expected next sequence number for that replica. This covers the gap scenario where events from a replica may arrive out of order via the replication stream (onReplicatedEvent path). The event is still persisted for backward compatibility; rejecting it could stall the replication stream.

Key design decisions (confirmed by cross-review from GPT-5.4 and Sonnet 4.6):
- Only gap detection (seqNr > expected) can fire from current callers; onPublishedEvent filters both duplicates and gaps before calling. onReplicatedEvent filters duplicates via alreadySeen() but allows gaps.
- Uses != check (not separate < and > branches) to avoid dead code.
- Warning message includes the advancing seqNr to help operators diagnose potential event loss.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
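The first design decision can be illustrated with a small standalone model. This is only a sketch, not the Pekko implementation: `Incoming`, `Seen`, and the three predicates are hypothetical names, and the filter logic is inferred from the description above (the published path accepts only an exact match; the replication path drops only duplicates).

```scala
object CallerFilters {
  // Hypothetical stand-in for a replicated event; only the two fields used here.
  final case class Incoming(originReplica: String, originSequenceNr: Long)

  // Highest sequence number seen so far, per origin replica.
  type Seen = Map[String, Long]

  private def expected(seen: Seen, e: Incoming): Long =
    seen.getOrElse(e.originReplica, 0L) + 1

  // Published-event path as described: duplicates and gaps are both rejected.
  def passesPublishedFilter(seen: Seen, e: Incoming): Boolean =
    e.originSequenceNr == expected(seen, e)

  // Replication-stream path as described: only duplicates are rejected
  // (assumed to mean seqNr <= last seen), so gaps pass through.
  def passesReplicatedFilter(seen: Seen, e: Incoming): Boolean =
    !(e.originSequenceNr <= seen.getOrElse(e.originReplica, 0L))

  // The check added by this commit, reduced to a predicate.
  def wouldWarn(seen: Seen, e: Incoming): Boolean =
    e.originSequenceNr != expected(seen, e)
}
```

Under these assumptions, an event that passes the published filter never triggers the warning, and an event that passes the replicated filter triggers it only when seqNr > expected. That is why a single != comparison is enough and a separate < branch would be unreachable from the current callers.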
1 parent 5addd96 commit 14f3ab7

1 file changed: +20 -1 lines changed
persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/Running.scala

Lines changed: 20 additions & 1 deletion
@@ -523,7 +523,26 @@ private[pekko] object Running {
               OptionVal.Some(
                 ReplicatedEventMetadata(event.originReplica, event.originSequenceNr, updatedVersion, isConcurrent)))
           val shouldSnapshotAfterPersist = setup.shouldSnapshot(newState2.state, event.event, newState2.seqNr)
-          // FIXME validate this is the correct sequence nr from that replica https://github.com/akka/akka/issues/29259
+
+          // Sequence number validation (resolves https://github.com/akka/akka/issues/29259):
+          // Callers are responsible for filtering duplicates and handling gaps:
+          // - onPublishedEvent: rejects duplicates and gaps, only exact-match seqNr passes through
+          // - onReplicatedEvent: filters duplicates via alreadySeen(), but gaps may pass through
+          //   if the replication source delivers events out of order
+          // We add a defensive warning for unexpected sequence numbers to aid diagnosis.
+          // The event is still persisted for backward compatibility; rejecting it could stall replication.
+          val expectedSeqNr = newState2.seenPerReplica.getOrElse(event.originReplica, 0L) + 1
+          if (event.originSequenceNr != expectedSeqNr) {
+            setup.internalLogger.warnN(
+              "Unexpected replication sequence number [{}] from replica [{}], expected [{}]. " +
+              "This may indicate event loss or out-of-order delivery in the replication stream. " +
+              "Advancing seenPerReplica to [{}]; earlier events from this replica may be skipped.",
+              event.originSequenceNr,
+              event.originReplica,
+              expectedSeqNr,
+              event.originSequenceNr)
+          }
+
           val updatedSeen = newState2.seenPerReplica.updated(event.originReplica, event.originSequenceNr)
           persistingEvents(
             newState2.copy(seenPerReplica = updatedSeen, version = updatedVersion),
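
The phrase "earlier events from this replica may be skipped" in the warning follows from seenPerReplica being advanced even when a gap is detected. The sketch below models that with a plain Map. It is a simplified illustration under stated assumptions, not the code in Running.scala: `record` returns the warning text instead of logging it, and `alreadySeen` is assumed to compare against the stored sequence number with <=.

```scala
object SeenPerReplicaModel {
  // Highest sequence number recorded so far, per origin replica.
  type Seen = Map[String, Long]

  // Mirrors the check in the diff: warn on any mismatch, then advance regardless.
  // Returning Option[String] replaces the logger call for the sake of the sketch.
  def record(seen: Seen, replica: String, seqNr: Long): (Option[String], Seen) = {
    val expectedSeqNr = seen.getOrElse(replica, 0L) + 1
    val warning =
      if (seqNr != expectedSeqNr)
        Some(s"Unexpected replication sequence number [$seqNr] from replica [$replica], expected [$expectedSeqNr].")
      else None
    (warning, seen.updated(replica, seqNr))
  }

  // Assumed duplicate filter on the replication path (hypothetical signature).
  def alreadySeen(seen: Seen, replica: String, seqNr: Long): Boolean =
    seqNr <= seen.getOrElse(replica, 0L)
}
```

In this model, recording sequence number 4 right after 1 yields a warning and moves the stored value to 4. If events 2 and 3 arrive later, `alreadySeen` returns true for both, so a duplicate filter of this shape would drop them. This is the event-loss scenario the warning is meant to surface to operators.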
