Skip to content

Commit b0b4b83

Browse files
He-PinCopilot
andcommitted
Validate replica sequence number before updating seenPerReplica
Add defensive validation in handleExternalReplicatedEventPersist to warn when receiving events with unexpected sequence numbers from replicas. This catches bugs in external replication transport implementations. Resolves FIXME referencing akka/akka-core#29259. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent f1a36ba commit b0b4b83

1 file changed

Lines changed: 20 additions & 1 deletion

File tree

  • persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal

persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/Running.scala

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -508,7 +508,26 @@ private[pekko] object Running {
508508
OptionVal.Some(
509509
ReplicatedEventMetadata(event.originReplica, event.originSequenceNr, updatedVersion, isConcurrent)))
510510
val shouldSnapshotAfterPersist = setup.shouldSnapshot(newState2.state, event.event, newState2.seqNr)
511-
// FIXME validate this is the correct sequence nr from that replica https://github.com/akka/akka/issues/29259
511+
512+
// Validate that the sequence number from the replica is the expected next one
513+
val expectedSeqNr = newState2.seenPerReplica.getOrElse(event.originReplica, 0L) + 1
514+
if (event.originSequenceNr < expectedSeqNr) {
515+
setup.internalLogger.warnN(
516+
"Received replicated event from [{}] with sequence number [{}] that has already been seen, expected [{}]. " +
517+
"This may indicate a bug in the replication transport.",
518+
event.originReplica,
519+
event.originSequenceNr,
520+
expectedSeqNr)
521+
} else if (event.originSequenceNr > expectedSeqNr) {
522+
setup.internalLogger.warnN(
523+
"Received replicated event from [{}] with unexpected sequence number [{}], expected [{}]. " +
524+
"Events may have been lost or arrived out of order. " +
525+
"This may indicate a bug in the replication transport.",
526+
event.originReplica,
527+
event.originSequenceNr,
528+
expectedSeqNr)
529+
}
530+
512531
val updatedSeen = newState2.seenPerReplica.updated(event.originReplica, event.originSequenceNr)
513532
persistingEvents(
514533
newState2.copy(seenPerReplica = updatedSeen, version = updatedVersion),

0 commit comments

Comments
 (0)