Skip to content

Commit 9108d15

Browse files
authored
IGNITE-26295 Integrate Raft Log GC into the new storage (#7880)
1 parent dda8567 commit 9108d15

17 files changed

+912
-77
lines changed
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.ignite.internal.raft.storage.segstore;
19+
20+
/** Callback executed by the {@link RaftLogCheckpointer} before an index file is created. */
21+
@FunctionalInterface
22+
interface BeforeIndexFileCreatedCallback {
23+
void beforeIndexFileCreated(long indexFileSize);
24+
}

modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/FileProperties.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
/**
2323
* Represents properties common for some file types (namely Segment and Index files) used by the log storage.
2424
*/
25-
class FileProperties {
25+
class FileProperties implements Comparable<FileProperties> {
2626
/** File ordinal. Incremented each time a new file is created. */
2727
private final int ordinal;
2828

@@ -54,6 +54,17 @@ int generation() {
5454
return generation;
5555
}
5656

57+
@Override
58+
public int compareTo(FileProperties o) {
59+
int cmp = Integer.compare(ordinal, o.ordinal);
60+
61+
if (cmp != 0) {
62+
return cmp;
63+
}
64+
65+
return Integer.compare(generation, o.generation);
66+
}
67+
5768
@Override
5869
public boolean equals(Object o) {
5970
if (o == null || getClass() != o.getClass()) {

modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManager.java

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import org.apache.ignite.internal.logger.IgniteLogger;
5252
import org.apache.ignite.internal.logger.Loggers;
5353
import org.jetbrains.annotations.Nullable;
54+
import org.jetbrains.annotations.VisibleForTesting;
5455

5556
/**
5657
* File manager responsible for persisting {@link ReadModeIndexMemTable}s to index files.
@@ -215,10 +216,8 @@ private List<IndexMetaSpec> saveIndexMemtable(
215216
try (var os = new BufferedOutputStream(Files.newOutputStream(tmpFilePath, CREATE_NEW, WRITE))) {
216217
os.write(fileHeaderWithIndexMetas.header());
217218

218-
Iterator<Entry<Long, SegmentInfo>> it = indexMemTable.iterator();
219-
220-
while (it.hasNext()) {
221-
SegmentInfo segmentInfo = it.next().getValue();
219+
for (Entry<Long, SegmentInfo> longSegmentInfoEntry : indexMemTable) {
220+
SegmentInfo segmentInfo = longSegmentInfoEntry.getValue();
222221

223222
// Segment Info may not contain payload in case of suffix truncation, see "IndexMemTable#truncateSuffix".
224223
if (segmentInfo.size() > 0) {
@@ -385,11 +384,7 @@ private static FileHeaderWithIndexMetas serializeHeaderAndFillMetadata(
385384

386385
var metaSpecs = new ArrayList<IndexMetaSpec>(numGroups);
387386

388-
Iterator<Entry<Long, SegmentInfo>> it = indexMemTable.iterator();
389-
390-
while (it.hasNext()) {
391-
Entry<Long, SegmentInfo> entry = it.next();
392-
387+
for (Entry<Long, SegmentInfo> entry : indexMemTable) {
393388
// Using the boxed value to avoid unnecessary autoboxing later.
394389
Long groupId = entry.getKey();
395390

@@ -504,6 +499,19 @@ private static byte[] payload(SegmentInfo segmentInfo) {
504499
return payloadBuffer.array();
505500
}
506501

502+
/**
503+
* Computes the size in bytes that the index file for the given {@code indexMemTable} will occupy on disk.
504+
*/
505+
static long computeIndexFileSize(ReadModeIndexMemTable indexMemTable) {
506+
long total = headerSize(indexMemTable.numGroups());
507+
508+
for (Entry<Long, SegmentInfo> longSegmentInfoEntry : indexMemTable) {
509+
total += payloadSize(longSegmentInfoEntry.getValue());
510+
}
511+
512+
return total;
513+
}
514+
507515
private static int headerSize(int numGroups) {
508516
return COMMON_META_SIZE + numGroups * GROUP_META_SIZE;
509517
}
@@ -512,7 +520,8 @@ private static int payloadSize(SegmentInfo segmentInfo) {
512520
return segmentInfo.size() * Integer.BYTES;
513521
}
514522

515-
private static String indexFileName(FileProperties fileProperties) {
523+
@VisibleForTesting
524+
static String indexFileName(FileProperties fileProperties) {
516525
return String.format(INDEX_FILE_NAME_FORMAT, fileProperties.ordinal(), fileProperties.generation());
517526
}
518527

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.ignite.internal.raft.storage.segstore;
19+
20+
import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
21+
import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
22+
import java.io.IOException;
23+
import java.nio.file.Files;
24+
import java.nio.file.Path;
25+
import java.util.Comparator;
26+
import java.util.stream.Stream;
27+
28+
/**
29+
* Compaction strategy that prioritizes files with the most dead entries, fully-deletable files first.
30+
*/
31+
class MostGarbageFirstCompactionStrategy implements SegmentFileCompactionStrategy {
32+
private final Path segmentFilesDir;
33+
34+
private final IndexFileManager indexFileManager;
35+
36+
MostGarbageFirstCompactionStrategy(Path segmentFilesDir, IndexFileManager indexFileManager) {
37+
this.segmentFilesDir = segmentFilesDir;
38+
this.indexFileManager = indexFileManager;
39+
}
40+
41+
@Override
42+
public Stream<FileProperties> selectCandidates() throws IOException {
43+
var scores = new Object2LongOpenHashMap<FileProperties>();
44+
45+
Comparator<FileProperties> comparator =
46+
Comparator.<FileProperties>comparingLong(props -> scores.computeIfAbsent(props, this::score))
47+
.thenComparing(Comparator.naturalOrder());
48+
49+
//noinspection resource
50+
return Files.list(segmentFilesDir)
51+
.filter(p -> !p.getFileName().toString().endsWith(".tmp"))
52+
.map(SegmentFile::fileProperties)
53+
.filter(props -> Files.exists(indexFileManager.indexFilePath(props)))
54+
.sorted(comparator);
55+
}
56+
57+
private long score(FileProperties props) {
58+
Long2ObjectMap<IndexFileMeta> description = indexFileManager.describeSegmentFile(props.ordinal());
59+
60+
if (description.isEmpty()) {
61+
return -1; // Fully deletable — highest priority.
62+
}
63+
64+
long liveCount = 0;
65+
66+
for (IndexFileMeta meta : description.values()) {
67+
liveCount += meta.lastLogIndexExclusive() - meta.firstLogIndexInclusive();
68+
}
69+
70+
return liveCount;
71+
}
72+
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.ignite.internal.raft.storage.segstore;
19+
20+
import java.io.IOException;
21+
import java.nio.file.Files;
22+
import java.nio.file.Path;
23+
import java.util.stream.Stream;
24+
25+
/**
26+
* Compaction strategy that yields fully-checkpointed segment files in write order (oldest first).
27+
*/
28+
class OldestFirstCompactionStrategy implements SegmentFileCompactionStrategy {
29+
private final Path segmentFilesDir;
30+
31+
private final IndexFileManager indexFileManager;
32+
33+
OldestFirstCompactionStrategy(Path segmentFilesDir, IndexFileManager indexFileManager) {
34+
this.segmentFilesDir = segmentFilesDir;
35+
this.indexFileManager = indexFileManager;
36+
}
37+
38+
@Override
39+
public Stream<FileProperties> selectCandidates() throws IOException {
40+
return Files.list(segmentFilesDir)
41+
.filter(p -> !p.getFileName().toString().endsWith(".tmp"))
42+
.map(SegmentFile::fileProperties)
43+
.sorted()
44+
.filter(props -> Files.exists(indexFileManager.indexFilePath(props)));
45+
}
46+
}

modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogCheckpointer.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import static org.apache.ignite.internal.raft.storage.segstore.SegmentFileManager.SWITCH_SEGMENT_RECORD;
2222
import static org.apache.ignite.internal.raft.storage.segstore.SegmentInfo.MISSING_SEGMENT_FILE_OFFSET;
2323
import static org.apache.ignite.internal.util.IgniteUtils.closeAllManually;
24-
import static org.apache.ignite.lang.ErrorGroups.Marshalling.COMMON_ERR;
24+
import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
2525

2626
import java.io.IOException;
2727
import java.nio.ByteBuffer;
@@ -62,14 +62,18 @@ class RaftLogCheckpointer {
6262

6363
private final FailureProcessor failureProcessor;
6464

65+
private final BeforeIndexFileCreatedCallback beforeIndexFileCreated;
66+
6567
RaftLogCheckpointer(
6668
String nodeName,
6769
IndexFileManager indexFileManager,
6870
FailureProcessor failureProcessor,
69-
int maxQueueSize
71+
int maxQueueSize,
72+
BeforeIndexFileCreatedCallback beforeIndexFileCreated
7073
) {
7174
this.indexFileManager = indexFileManager;
7275
this.failureProcessor = failureProcessor;
76+
this.beforeIndexFileCreated = beforeIndexFileCreated;
7377

7478
queue = new CheckpointQueue(maxQueueSize);
7579
checkpointThread = new IgniteThread(nodeName, "segstore-checkpoint", new CheckpointTask());
@@ -91,7 +95,7 @@ private void stopCheckpointThread() {
9195
} catch (InterruptedException e) {
9296
Thread.currentThread().interrupt();
9397

94-
throw new IgniteInternalException(COMMON_ERR, "Interrupted while waiting for the checkpoint thread to finish.", e);
98+
throw new IgniteInternalException(INTERNAL_ERR, "Interrupted while waiting for the checkpoint thread to finish.", e);
9599
}
96100
}
97101

@@ -101,7 +105,7 @@ void onRollover(SegmentFile segmentFile, ReadModeIndexMemTable indexMemTable) {
101105
} catch (InterruptedException e) {
102106
Thread.currentThread().interrupt();
103107

104-
throw new IgniteInternalException(COMMON_ERR, "Interrupted while adding an entry to the checkpoint queue.", e);
108+
throw new IgniteInternalException(INTERNAL_ERR, "Interrupted while adding an entry to the checkpoint queue.", e);
105109
}
106110
}
107111

@@ -155,6 +159,11 @@ public void run() {
155159

156160
segmentFile.sync();
157161

162+
long indexFileSize = IndexFileManager.computeIndexFileSize(entry.memTable());
163+
164+
// Notify about the upcoming log size increase.
165+
beforeIndexFileCreated.beforeIndexFileCreated(indexFileSize);
166+
158167
indexFileManager.saveNewIndexMemtable(entry.memTable());
159168

160169
queue.removeHead();

0 commit comments

Comments
 (0)