Skip to content

Commit 107341e

Browse files
bxfjbRongtongJin
authored andcommitted
[ISSUE #9271] Enhance tiered storage getQueueOffsetByTimeAsync (#9272)
1 parent 0f75753 commit 107341e

2 files changed

Lines changed: 48 additions & 8 deletions

File tree

tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.apache.rocketmq.tieredstore.metadata.MetadataStore;
3939
import org.apache.rocketmq.tieredstore.metadata.entity.QueueMetadata;
4040
import org.apache.rocketmq.tieredstore.metadata.entity.TopicMetadata;
41+
import org.apache.rocketmq.tieredstore.provider.FileSegment;
4142
import org.apache.rocketmq.tieredstore.util.MessageFormatUtil;
4243
import org.apache.rocketmq.tieredstore.util.MessageStoreUtil;
4344
import org.slf4j.Logger;
@@ -302,9 +303,26 @@ public CompletableFuture<Long> getQueueOffsetByTimeAsync(long timestamp, Boundar
302303
return CompletableFuture.completedFuture(cqMin);
303304
}
304305

306+
// get correct consume queue file by binary search
307+
List<FileSegment> consumeQueueFileList = this.consumeQueue.getFileSegmentList();
308+
int low = 0, high = consumeQueueFileList.size() - 1;
309+
int mid = low + (high - low) / 2;
310+
while (low <= high) {
311+
FileSegment fileSegment = consumeQueueFileList.get(mid);
312+
if (fileSegment.getMinTimestamp() <= timestamp && timestamp <= fileSegment.getMaxTimestamp()) {
313+
break;
314+
} else if (timestamp < fileSegment.getMinTimestamp()) {
315+
high = mid - 1;
316+
} else {
317+
low = mid + 1;
318+
}
319+
mid = low + (high - low) / 2;
320+
}
321+
FileSegment target = consumeQueueFileList.get(mid);
322+
305323
// binary search lower bound index in a sorted array
306-
long minOffset = cqMin;
307-
long maxOffset = cqMax;
324+
long minOffset = target.getBaseOffset() / MessageFormatUtil.CONSUME_QUEUE_UNIT_SIZE;
325+
long maxOffset = target.getCommitOffset() / MessageFormatUtil.CONSUME_QUEUE_UNIT_SIZE - 1;
308326
List<String> queryLog = new ArrayList<>();
309327
while (minOffset < maxOffset) {
310328
long middle = minOffset + (maxOffset - minOffset) / 2;

tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatMessageFileTest.java

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -177,13 +177,35 @@ public void testBinarySearchInQueueByTime() {
177177
// append message to consume queue
178178
flatFile.consumeQueue.initOffset(50 * ConsumeQueue.CQ_STORE_UNIT_SIZE);
179179

180-
for (int i = 0; i < 5; i++) {
181-
AppendResult appendResult = flatFile.appendConsumeQueue(new DispatchRequest(
182-
mq.getTopic(), mq.getQueueId(), MessageFormatUtilTest.MSG_LEN * i,
183-
MessageFormatUtilTest.MSG_LEN, 0, timestamp1, 50 + i,
180+
AppendResult appendResult = flatFile.appendConsumeQueue(new DispatchRequest(
181+
mq.getTopic(), mq.getQueueId(), 0,
182+
MessageFormatUtilTest.MSG_LEN, 0, timestamp1, 50,
184183
"", "", 0, 0, null));
185-
Assert.assertEquals(AppendResult.SUCCESS, appendResult);
186-
}
184+
Assert.assertEquals(AppendResult.SUCCESS, appendResult);
185+
186+
appendResult = flatFile.appendConsumeQueue(new DispatchRequest(
187+
mq.getTopic(), mq.getQueueId(), MessageFormatUtilTest.MSG_LEN,
188+
MessageFormatUtilTest.MSG_LEN, 0, timestamp2, 51,
189+
"", "", 0, 0, null));
190+
Assert.assertEquals(AppendResult.SUCCESS, appendResult);
191+
192+
appendResult = flatFile.appendConsumeQueue(new DispatchRequest(
193+
mq.getTopic(), mq.getQueueId(), MessageFormatUtilTest.MSG_LEN * 2,
194+
MessageFormatUtilTest.MSG_LEN, 0, timestamp2, 52,
195+
"", "", 0, 0, null));
196+
Assert.assertEquals(AppendResult.SUCCESS, appendResult);
197+
198+
appendResult = flatFile.appendConsumeQueue(new DispatchRequest(
199+
mq.getTopic(), mq.getQueueId(), MessageFormatUtilTest.MSG_LEN * 3,
200+
MessageFormatUtilTest.MSG_LEN, 0, timestamp2, 53,
201+
"", "", 0, 0, null));
202+
Assert.assertEquals(AppendResult.SUCCESS, appendResult);
203+
204+
appendResult = flatFile.appendConsumeQueue(new DispatchRequest(
205+
mq.getTopic(), mq.getQueueId(), MessageFormatUtilTest.MSG_LEN * 4,
206+
MessageFormatUtilTest.MSG_LEN, 0, timestamp3, 54,
207+
"", "", 0, 0, null));
208+
Assert.assertEquals(AppendResult.SUCCESS, appendResult);
187209

188210
// commit message will increase max consume queue offset
189211
Assert.assertTrue(flatFile.commitAsync().join());

0 commit comments

Comments
 (0)