Skip to content

Commit 593fccf

Browse files
authored
[Issue 1446][Consumer] Fix consumer can't consume resent chunked messages (#1464)
Master Issue: #1446 Related issues: apache/pulsar#21070 and apache/pulsar#21101 ### Motivation Currently, when the producer resends the chunks of a chunked message like this: ``` M1: UUID: 0, ChunkID: 0 M2: UUID: 0, ChunkID: 0 // Resend the first chunk M3: UUID: 0, ChunkID: 1 ``` When the consumer receives M2, it finds that it is already tracking the UUID:0 chunked message, and then discards both M1 and M2. This makes it impossible to consume the whole chunked message even though it is already persisted in the Pulsar topic. Here is the code logic: ```Go if ctx == nil || ctx.chunkedMsgBuffer == nil || chunkID != ctx.lastChunkedMsgID+1 { lastChunkedMsgID := -1 totalChunks := -1 if ctx != nil { lastChunkedMsgID = int(ctx.lastChunkedMsgID) totalChunks = int(ctx.totalChunks) ctx.chunkedMsgBuffer.Clear() } pc.log.Warnf(fmt.Sprintf( "Received unexpected chunk messageId %s, last-chunk-id %d, chunkId = %d, total-chunks %d", msgID.String(), lastChunkedMsgID, chunkID, totalChunks)) pc.chunkedMsgCtxMap.remove(uuid) pc.availablePermits.inc() return nil } ``` The bug can be easily reproduced using the test cases `TestChunkWithReconnection` and `TestResendChunkMessages` introduced by this PR. ### Modifications The new chunk processing strategy is consistent with the behavior of the Java client: https://github.com/apache/pulsar/blob/52a4d5ee84fad6af2736376a6fcdd1bc41e7c52f/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L1579 When receiving a duplicated first chunk of a chunked message, the consumer discards the current chunked message context and creates a new context to track the following chunks. For the case described in Motivation, M1 is released and the consumer assembles M2 and M3 into the chunked message.
1 parent e7eb92f commit 593fccf

File tree

2 files changed

+303
-8
lines changed

2 files changed

+303
-8
lines changed

pulsar/consumer_partition.go

Lines changed: 136 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1461,35 +1461,161 @@ func (pc *partitionConsumer) processMessageChunk(compressedPayload internal.Buff
14611461
partitionIdx: pc.partitionIdx,
14621462
}
14631463

1464-
if msgMeta.GetChunkId() == 0 {
1464+
if msgMeta.GetChunkId() != msgMeta.GetNumChunksFromMsg()-1 {
1465+
pc.availablePermits.inc()
1466+
}
1467+
if chunkID == 0 {
1468+
// Handle ack hole case when receive duplicated chunks.
1469+
// There are two situation that receives chunks with the same sequence ID and chunk ID.
1470+
// Situation 1 - Message redeliver:
1471+
// For example:
1472+
// Chunk-1 sequence ID: 0, chunk ID: 0, msgID: 1:1
1473+
// Chunk-2 sequence ID: 0, chunk ID: 1, msgID: 1:2
1474+
// Chunk-3 sequence ID: 0, chunk ID: 0, msgID: 1:1
1475+
// Chunk-4 sequence ID: 0, chunk ID: 1, msgID: 1:2
1476+
// Chunk-5 sequence ID: 0, chunk ID: 2, msgID: 1:3
1477+
// In this case, chunk-3 and chunk-4 have the same msgID with chunk-1 and chunk-2.
1478+
// This may be caused by message redeliver, we can't ack any chunk in this case here.
1479+
// Situation 2 - Corrupted chunk message
1480+
// For example:
1481+
// Chunk-1 sequence ID: 0, chunk ID: 0, msgID: 1:1
1482+
// Chunk-2 sequence ID: 0, chunk ID: 1, msgID: 1:2
1483+
// Chunk-3 sequence ID: 0, chunk ID: 0, msgID: 1:3
1484+
// Chunk-4 sequence ID: 0, chunk ID: 1, msgID: 1:4
1485+
// Chunk-5 sequence ID: 0, chunk ID: 2, msgID: 1:5
1486+
// In this case, all the chunks with different msgIDs and are persistent in the topic.
1487+
// But Chunk-1 and Chunk-2 belong to a corrupted chunk message that must be skipped since
1488+
// they will not be delivered to end users. So we should ack them here to avoid ack hole.
1489+
ctx := pc.chunkedMsgCtxMap.get(uuid)
1490+
if ctx != nil {
1491+
isCorruptedChunkMessageDetected := true
1492+
for _, previousChunkMsgID := range ctx.chunkedMsgIDs {
1493+
if previousChunkMsgID == nil {
1494+
continue
1495+
}
1496+
if previousChunkMsgID.equal(msgID) {
1497+
isCorruptedChunkMessageDetected = false
1498+
break
1499+
}
1500+
}
1501+
if isCorruptedChunkMessageDetected {
1502+
ctx.discard(pc)
1503+
}
1504+
// The first chunk of a new chunked-message received
1505+
// before receiving other chunks of previous chunked-message
1506+
// so, remove previous chunked-message from map and release buffer
1507+
pc.log.Warnf(fmt.Sprintf(
1508+
"[%s] [%s] Receive a duplicated chunk id=0 message with messageId [%s], sequenceId [%d], "+
1509+
"uuid [%s]. Remove previous chunk context with lastChunkedMsgID [%d]",
1510+
pc.name,
1511+
pc.options.subscription,
1512+
msgID.String(),
1513+
msgMeta.GetSequenceId(),
1514+
msgMeta.GetUuid(),
1515+
ctx.lastChunkedMsgID,
1516+
))
1517+
ctx.chunkedMsgBuffer.Clear()
1518+
pc.chunkedMsgCtxMap.remove(uuid)
1519+
}
14651520
pc.chunkedMsgCtxMap.addIfAbsent(uuid,
14661521
numChunks,
14671522
totalChunksSize,
14681523
)
14691524
}
14701525

1526+
// discard message if chunk is out-of-order
14711527
ctx := pc.chunkedMsgCtxMap.get(uuid)
1472-
14731528
if ctx == nil || ctx.chunkedMsgBuffer == nil || chunkID != ctx.lastChunkedMsgID+1 {
1529+
// Filter and ack duplicated chunks instead of discard ctx.
1530+
// For example:
1531+
// Chunk-1 sequence ID: 0, chunk ID: 0, msgID: 1:1
1532+
// Chunk-2 sequence ID: 0, chunk ID: 1, msgID: 1:2
1533+
// Chunk-3 sequence ID: 0, chunk ID: 2, msgID: 1:3
1534+
// Chunk-4 sequence ID: 0, chunk ID: 1, msgID: 1:4
1535+
// Chunk-5 sequence ID: 0, chunk ID: 2, msgID: 1:5
1536+
// Chunk-6 sequence ID: 0, chunk ID: 3, msgID: 1:6
1537+
// We should filter and ack chunk-4 and chunk-5.
1538+
if ctx != nil && chunkID <= ctx.lastChunkedMsgID {
1539+
pc.log.Warnf(fmt.Sprintf(
1540+
"[%s] [%s] Receive a duplicated chunk message with messageId [%s], "+
1541+
"last-chunk-Id [%d], chunkId [%d], sequenceId [%d], uuid [%s]",
1542+
pc.name,
1543+
pc.options.subscription,
1544+
msgID.String(),
1545+
ctx.lastChunkedMsgID,
1546+
chunkID,
1547+
msgMeta.GetSequenceId(),
1548+
msgMeta.GetUuid(),
1549+
))
1550+
// Just like the above logic of receiving the first chunk again.
1551+
// We only ack this chunk in the message duplication case.
1552+
isCorruptedChunkMessageDetected := true
1553+
for _, previousChunkMsgID := range ctx.chunkedMsgIDs {
1554+
if previousChunkMsgID == nil {
1555+
continue
1556+
}
1557+
if previousChunkMsgID.equal(msgID) {
1558+
isCorruptedChunkMessageDetected = false
1559+
break
1560+
}
1561+
}
1562+
if isCorruptedChunkMessageDetected {
1563+
pc.AckID(toTrackingMessageID(msgID))
1564+
}
1565+
return nil
1566+
}
1567+
// Chunked messages rely on TCP to ensure that chunk IDs are strictly increasing within a partition.
1568+
// If the current chunk ID is greater than ctx.lastChunkedMsgID + 1,
1569+
// it indicates that the current chunk is corrupted and may require resource cleanup.
14741570
lastChunkedMsgID := -1
14751571
totalChunks := -1
14761572
if ctx != nil {
14771573
lastChunkedMsgID = int(ctx.lastChunkedMsgID)
14781574
totalChunks = int(ctx.totalChunks)
1479-
ctx.chunkedMsgBuffer.Clear()
14801575
}
14811576
pc.log.Warnf(fmt.Sprintf(
1482-
"Received unexpected chunk messageId %s, last-chunk-id %d, chunkId = %d, total-chunks %d",
1483-
msgID.String(), lastChunkedMsgID, chunkID, totalChunks))
1484-
pc.chunkedMsgCtxMap.remove(uuid)
1485-
pc.availablePermits.inc()
1577+
"[%s] [%s] Received unexpected chunk messageId [%s], last-chunk-id [%d], "+
1578+
"chunkId = [%d], total-chunks [%d], sequenceId [%d], uuid [%s]",
1579+
pc.Topic(),
1580+
pc.options.subscription,
1581+
msgID.String(),
1582+
lastChunkedMsgID,
1583+
chunkID,
1584+
totalChunks,
1585+
msgMeta.GetSequenceId(),
1586+
msgMeta.GetUuid()),
1587+
)
1588+
if ctx != nil {
1589+
ctx.chunkedMsgBuffer.Clear()
1590+
pc.chunkedMsgCtxMap.remove(uuid)
1591+
}
1592+
// Consider a scenario where MaxPendingChunkedMessage is set to 1,
1593+
// and we have two messages (A and B), each consisting of three chunks:
1594+
// A chunks are Chunk-1, Chunk-2, Chunk-6 and B chunks are Chunk-3, Chunk-4, Chunk-5
1595+
// The consumer receives them in the following order:
1596+
// Chunk-1 sequence ID: 0, chunk ID: 0, msgID: 1:1
1597+
// Chunk-2 sequence ID: 0, chunk ID: 1, msgID: 1:2
1598+
// since MaxPendingChunkedMessage is 1, the context for A is removed
1599+
// Chunk-3 sequence ID: 1, chunk ID: 0, msgID: 1:3
1600+
// Chunk-4 sequence ID: 1, chunk ID: 1, msgID: 1:4
1601+
// Chunk-5 sequence ID: 1, chunk ID: 2, msgID: 1:5
1602+
// Chunk-6 sequence ID: 0, chunk ID: 2, msgID: 1:6
1603+
// If we acknowledge Chunk-6 here, message A would be lost.
1604+
// This is unexpected, as the user would expect A to be successfully consumed after redelivery.
1605+
// So the correct logic should be:
1606+
// If AutoAckIncompleteChunk is true, then acknowledge the message.
1607+
// Otherwise, do nothing so that the message can be redelivered in the future.
1608+
if pc.options.autoAckIncompleteChunk {
1609+
pc.AckID(toTrackingMessageID(msgID))
1610+
}
14861611
return nil
14871612
}
14881613

1614+
// The chunk ID meets the expected value,
1615+
// so we add the current chunk to the corresponding chunkedMsgCtx.
14891616
ctx.append(chunkID, msgID, compressedPayload)
14901617

14911618
if msgMeta.GetChunkId() != msgMeta.GetNumChunksFromMsg()-1 {
1492-
pc.availablePermits.inc()
14931619
return nil
14941620
}
14951621

@@ -2518,6 +2644,7 @@ func (c *chunkedMsgCtxMap) discardOldestChunkMessage(autoAck bool) {
25182644
if autoAck {
25192645
ctx.discard(c.pc)
25202646
}
2647+
ctx.chunkedMsgBuffer.Clear()
25212648
delete(c.chunkedMsgCtxs, oldest)
25222649
c.pc.log.Infof("Chunked message [%s] has been removed from chunkedMsgCtxMap", oldest)
25232650
}
@@ -2535,6 +2662,7 @@ func (c *chunkedMsgCtxMap) discardChunkMessage(uuid string, autoAck bool) {
25352662
if autoAck {
25362663
ctx.discard(c.pc)
25372664
}
2665+
ctx.chunkedMsgBuffer.Clear()
25382666
delete(c.chunkedMsgCtxs, uuid)
25392667
e := c.pendingQueue.Front()
25402668
for ; e != nil; e = e.Next() {

pulsar/message_chunking_test.go

Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,17 @@ import (
2121
"context"
2222
"errors"
2323
"fmt"
24+
"log/slog"
2425
"math/rand"
2526
"net/http"
27+
"os"
2628
"strings"
2729
"sync"
2830
"testing"
2931
"time"
3032

3133
"github.com/apache/pulsar-client-go/pulsar/internal"
34+
"github.com/apache/pulsar-client-go/pulsar/log"
3235

3336
"google.golang.org/protobuf/proto"
3437

@@ -578,3 +581,167 @@ func sendSingleChunk(p Producer, uuid string, chunkID int, totalChunks int) {
578581
uint32(internal.MaxMessageSize),
579582
)
580583
}
584+
585+
func TestChunkWithReconnection(t *testing.T) {
586+
sLogger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug}))
587+
client, err := NewClient(ClientOptions{
588+
URL: lookupURL,
589+
Logger: log.NewLoggerWithSlog(sLogger),
590+
})
591+
assert.Nil(t, err)
592+
defer client.Close()
593+
594+
topic := newTopicName()
595+
producer, err := client.CreateProducer(ProducerOptions{
596+
Topic: topic,
597+
DisableBatching: true,
598+
EnableChunking: true,
599+
ChunkMaxMessageSize: 100,
600+
MaxPendingMessages: 200000,
601+
SendTimeout: 60 * time.Second,
602+
})
603+
assert.NoError(t, err)
604+
assert.NotNil(t, producer)
605+
606+
c, err := client.Subscribe(ConsumerOptions{
607+
Topic: topic,
608+
Type: Exclusive,
609+
SubscriptionName: "chunk-subscriber",
610+
})
611+
assert.NoError(t, err)
612+
assert.NotNil(t, c)
613+
defer c.Close()
614+
615+
// Reduce publish rate to prevent the producer sending messages too fast
616+
url := adminURL + "/" + "admin/v2/persistent/public/default/" + topic + "/publishRate"
617+
makeHTTPCall(t, http.MethodPost, url, "{\"publishThrottlingRateInMsg\": 1,\"publishThrottlingRateInByte\": 100}")
618+
// Need to wait some time to let the rate limiter take effect
619+
time.Sleep(2 * time.Second)
620+
621+
// payload/ChunkMaxMessageSize = 1000/100 = 10 msg, and publishThrottlingRateInMsg = 1
622+
// so that this chunk msg will send finish after 10 seconds
623+
producer.SendAsync(context.Background(), &ProducerMessage{
624+
Payload: createTestMessagePayload(1000),
625+
}, func(_ MessageID, _ *ProducerMessage, err error) {
626+
assert.Nil(t, err)
627+
})
628+
assert.NoError(t, err)
629+
630+
time.Sleep(5 * time.Second)
631+
// trigger topic unload to test sending chunk msg with reconnection
632+
url = adminURL + "/" + "admin/v2/persistent/public/default/" + topic + "/unload"
633+
makeHTTPCall(t, http.MethodPut, url, "")
634+
// Need to wait some time to receive all chunk messages
635+
time.Sleep(10 * time.Second)
636+
637+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
638+
msg, err := c.Receive(ctx)
639+
cancel()
640+
assert.NoError(t, err)
641+
assert.NotNil(t, msg.ID())
642+
}
643+
644+
func TestResendChunkMessages(t *testing.T) {
645+
sLogger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug}))
646+
client, err := NewClient(ClientOptions{
647+
URL: lookupURL,
648+
Logger: log.NewLoggerWithSlog(sLogger),
649+
})
650+
assert.Nil(t, err)
651+
defer client.Close()
652+
653+
topic := newTopicName()
654+
producer, err := client.CreateProducer(ProducerOptions{
655+
Topic: topic,
656+
DisableBatching: true,
657+
EnableChunking: true,
658+
ChunkMaxMessageSize: 100,
659+
})
660+
assert.NoError(t, err)
661+
assert.NotNil(t, producer)
662+
663+
c, err := client.Subscribe(ConsumerOptions{
664+
Topic: topic,
665+
Type: Exclusive,
666+
SubscriptionName: "chunk-subscriber",
667+
MaxPendingChunkedMessage: 10,
668+
})
669+
assert.NoError(t, err)
670+
assert.NotNil(t, c)
671+
defer c.Close()
672+
673+
sendSingleChunk(producer, "0", 0, 2)
674+
sendSingleChunk(producer, "0", 0, 2) // Resending the first chunk
675+
sendSingleChunk(producer, "1", 0, 3) // This is for testing the interwoven chunked message
676+
sendSingleChunk(producer, "1", 1, 3)
677+
sendSingleChunk(producer, "1", 0, 3) // Resending the UUID-1 chunked message
678+
sendSingleChunk(producer, "0", 1, 2)
679+
680+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
681+
msg, err := c.Receive(ctx)
682+
cancel()
683+
assert.NoError(t, err)
684+
assert.Equal(t, "chunk-0-0|chunk-0-1|", string(msg.Payload()))
685+
c.Ack(msg)
686+
687+
sendSingleChunk(producer, "1", 1, 3)
688+
sendSingleChunk(producer, "1", 2, 3)
689+
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
690+
msg, err = c.Receive(ctx)
691+
cancel()
692+
assert.NoError(t, err)
693+
assert.Equal(t, "chunk-1-0|chunk-1-1|chunk-1-2|", string(msg.Payload()))
694+
c.Ack(msg)
695+
}
696+
697+
func TestResendChunkWithAckHoleMessages(t *testing.T) {
698+
sLogger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug}))
699+
client, err := NewClient(ClientOptions{
700+
URL: lookupURL,
701+
Logger: log.NewLoggerWithSlog(sLogger),
702+
})
703+
assert.Nil(t, err)
704+
defer client.Close()
705+
706+
topic := newTopicName()
707+
producer, err := client.CreateProducer(ProducerOptions{
708+
Topic: topic,
709+
DisableBatching: true,
710+
EnableChunking: true,
711+
ChunkMaxMessageSize: 100,
712+
})
713+
assert.NoError(t, err)
714+
assert.NotNil(t, producer)
715+
716+
c, err := client.Subscribe(ConsumerOptions{
717+
Topic: topic,
718+
Type: Exclusive,
719+
SubscriptionName: "chunk-subscriber",
720+
MaxPendingChunkedMessage: 10,
721+
})
722+
assert.NoError(t, err)
723+
assert.NotNil(t, c)
724+
defer c.Close()
725+
726+
sendSingleChunk(producer, "0", 0, 4)
727+
sendSingleChunk(producer, "0", 1, 4)
728+
sendSingleChunk(producer, "0", 2, 4)
729+
sendSingleChunk(producer, "0", 1, 4) // Resending previous chunk
730+
sendSingleChunk(producer, "0", 2, 4)
731+
sendSingleChunk(producer, "0", 3, 4)
732+
733+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
734+
msg, err := c.Receive(ctx)
735+
cancel()
736+
assert.NoError(t, err)
737+
assert.Equal(t, "chunk-0-0|chunk-0-1|chunk-0-2|chunk-0-3|", string(msg.Payload()))
738+
c.Ack(msg)
739+
740+
sendSingleChunk(producer, "1", 0, 4)
741+
sendSingleChunk(producer, "1", 1, 4)
742+
sendSingleChunk(producer, "1", 4, 4) // send broken chunk
743+
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
744+
msg, err = c.Receive(ctx)
745+
cancel()
746+
assert.ErrorIs(t, err, context.DeadlineExceeded)
747+
}

0 commit comments

Comments
 (0)