Skip to content

Commit aef848f

Browse files
committed
Integrate parquet GC and retention policy into janitor actors
1 parent 63f8743 commit aef848f

File tree

6 files changed

+383
-86
lines changed

6 files changed

+383
-86
lines changed

quickwit/quickwit-janitor/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ quickwit-doc-mapper = { workspace = true }
3232
quickwit-index-management = { workspace = true }
3333
quickwit-indexing = { workspace = true }
3434
quickwit-metastore = { workspace = true }
35+
quickwit-parquet-engine = { workspace = true }
3536
quickwit-proto = { workspace = true }
3637
quickwit-query = { workspace = true }
3738
quickwit-search = { workspace = true }

quickwit/quickwit-janitor/src/actors/garbage_collector.rs

Lines changed: 223 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,9 @@ use std::time::{Duration, Instant};
2020
use async_trait::async_trait;
2121
use futures::{StreamExt, stream};
2222
use quickwit_actors::{Actor, ActorContext, Handler};
23+
use quickwit_common::is_metrics_index;
2324
use quickwit_common::shared_consts::split_deletion_grace_period;
24-
use quickwit_index_management::{GcMetrics, run_garbage_collect};
25+
use quickwit_index_management::{GcMetrics, run_garbage_collect, run_parquet_garbage_collect};
2526
use quickwit_metastore::ListIndexesMetadataResponseExt;
2627
use quickwit_proto::metastore::{
2728
ListIndexesMetadataRequest, MetastoreService, MetastoreServiceClient,
@@ -35,6 +36,42 @@ use crate::metrics::JANITOR_METRICS;
3536

3637
const RUN_INTERVAL: Duration = Duration::from_secs(10 * 60); // 10 minutes
3738

39+
/// Outcome summary of a single GC pass (tantivy or parquet).
#[derive(Default)]
struct GcRunResult {
    num_deleted_splits: usize,
    num_deleted_bytes: usize,
    num_failed: usize,
    sample_deleted_files: Vec<String>,
}

impl GcRunResult {
    /// Zeroed-out result used when the GC run itself errored before
    /// producing any removal info.
    fn failed() -> Self {
        Self::default()
    }
}
57+
58+
fn gc_metrics(split_type: &str) -> GcMetrics {
59+
GcMetrics {
60+
deleted_splits: JANITOR_METRICS
61+
.gc_deleted_splits
62+
.with_label_values(["success", split_type])
63+
.clone(),
64+
deleted_bytes: JANITOR_METRICS
65+
.gc_deleted_bytes
66+
.with_label_values([split_type])
67+
.clone(),
68+
failed_splits: JANITOR_METRICS
69+
.gc_deleted_splits
70+
.with_label_values(["error", split_type])
71+
.clone(),
72+
}
73+
}
74+
3875
/// Staged files needs to be deleted if there was a failure.
3976
/// TODO ideally we want clean up all staged splits every time we restart the indexing pipeline, but
4077
/// the grace period strategy should do the job for the moment.
@@ -76,15 +113,44 @@ impl GarbageCollector {
76113
counters: GarbageCollectorCounters::default(),
77114
}
78115
}
116+
// if !deleted_file_entries.is_empty() {
117+
// let num_deleted_splits = deleted_file_entries.len();
118+
// let num_deleted_bytes = deleted_file_entries
119+
// .iter()
120+
// .map(|entry| entry.file_size_bytes.as_u64() as usize)
121+
// .sum::<usize>();
122+
// let deleted_files: HashSet<&Path> = deleted_file_entries
123+
// .iter()
124+
// .map(|deleted_entry| deleted_entry.file_name.as_path())
125+
// .take(5)
126+
// .collect();
127+
// info!(
128+
// num_deleted_splits = num_deleted_splits,
129+
// "Janitor deleted {:?} and {} other splits.", deleted_files, num_deleted_splits,
130+
// );
131+
// self.counters.num_deleted_files += num_deleted_splits;
132+
// self.counters.num_deleted_bytes += num_deleted_bytes;
133+
134+
fn record_gc_result(&mut self, result: &GcRunResult, split_type: &str) {
135+
self.counters.num_failed_splits += result.num_failed;
136+
if result.num_deleted_splits > 0 {
137+
info!(
138+
"Janitor deleted {:?} and {} other {} splits.",
139+
result.sample_deleted_files,
140+
result.num_deleted_splits,
141+
split_type,
142+
);
143+
self.counters.num_deleted_files += result.num_deleted_splits;
144+
self.counters.num_deleted_bytes += result.num_deleted_bytes;
145+
}
146+
}
79147

80148
/// Gc Loop handler logic.
81149
/// Should not return an error to prevent the actor from crashing.
82150
async fn handle_inner(&mut self, ctx: &ActorContext<Self>) {
83151
debug!("loading indexes from the metastore");
84152
self.counters.num_passes += 1;
85153

86-
let start = Instant::now();
87-
88154
let response = match self
89155
.metastore
90156
.list_indexes_metadata(ListIndexesMetadataRequest::all())
@@ -106,84 +172,115 @@ impl GarbageCollector {
106172
info!("loaded {} indexes from the metastore", indexes.len());
107173

108174
let expected_count = indexes.len();
109-
let index_storages: HashMap<IndexUid, Arc<dyn Storage>> = stream::iter(indexes).filter_map(|index| {
175+
176+
// Resolve storages and split into tantivy vs parquet indexes.
177+
let mut tantivy_storages: HashMap<IndexUid, Arc<dyn Storage>> = HashMap::new();
178+
let mut parquet_storages: HashMap<IndexUid, Arc<dyn Storage>> = HashMap::new();
179+
180+
let resolved: Vec<_> = stream::iter(indexes).filter_map(|index| {
110181
let storage_resolver = self.storage_resolver.clone();
111182
async move {
112183
let index_uid = index.index_uid.clone();
113184
let index_uri = index.index_uri();
114185
let storage = match storage_resolver.resolve(index_uri).await {
115186
Ok(storage) => storage,
116187
Err(error) => {
117-
error!(index=%index.index_id(), error=?error, "failed to resolve the index storage Uri");
188+
error!(index=%index_uid.index_id, error=?error, "failed to resolve the index storage Uri");
118189
return None;
119190
}
120191
};
121192
Some((index_uid, storage))
122193
}}).collect()
123194
.await;
124195

125-
let storage_got_count = index_storages.len();
126-
self.counters.num_failed_storage_resolution += expected_count - storage_got_count;
196+
self.counters.num_failed_storage_resolution += expected_count - resolved.len();
127197

128-
if index_storages.is_empty() {
198+
for (index_uid, storage) in resolved {
199+
if is_metrics_index(&index_uid.index_id) {
200+
parquet_storages.insert(index_uid, storage);
201+
} else {
202+
tantivy_storages.insert(index_uid, storage);
203+
}
204+
}
205+
206+
if tantivy_storages.is_empty() && parquet_storages.is_empty() {
129207
return;
130208
}
131209

132-
let gc_res = run_garbage_collect(
133-
index_storages,
134-
self.metastore.clone(),
135-
STAGED_GRACE_PERIOD,
136-
split_deletion_grace_period(),
137-
false,
138-
Some(ctx.progress()),
139-
Some(GcMetrics {
140-
deleted_splits: JANITOR_METRICS
141-
.gc_deleted_splits
142-
.with_label_values(["success"])
143-
.clone(),
144-
deleted_bytes: JANITOR_METRICS.gc_deleted_bytes.clone(),
145-
failed_splits: JANITOR_METRICS
146-
.gc_deleted_splits
147-
.with_label_values(["error"])
148-
.clone(),
149-
}),
150-
)
151-
.await;
210+
// Run Tantivy GC
211+
if !tantivy_storages.is_empty() {
212+
let tantivy_start = Instant::now();
213+
let gc_res = run_garbage_collect(
214+
tantivy_storages,
215+
self.metastore.clone(),
216+
STAGED_GRACE_PERIOD,
217+
split_deletion_grace_period(),
218+
false,
219+
Some(ctx.progress()),
220+
Some(gc_metrics("tantivy")),
221+
)
222+
.await;
152223

153-
let run_duration = start.elapsed().as_secs();
154-
JANITOR_METRICS.gc_seconds_total.inc_by(run_duration);
224+
let tantivy_run_duration = tantivy_start.elapsed().as_secs();
225+
JANITOR_METRICS.gc_seconds_total.with_label_values(["tantivy"]).inc_by(tantivy_run_duration);
226+
227+
let result = match gc_res {
228+
Ok(removal_info) => {
229+
self.counters.num_successful_gc_run += 1;
230+
JANITOR_METRICS.gc_runs.with_label_values(["success", "tantivy"]).inc();
231+
GcRunResult {
232+
num_deleted_splits: removal_info.removed_split_entries.len(),
233+
num_deleted_bytes: removal_info.removed_split_entries.iter().map(|e| e.file_size_bytes.as_u64() as usize).sum(),
234+
num_failed: removal_info.failed_splits.len(),
235+
sample_deleted_files: removal_info.removed_split_entries.iter().take(5).map(|e| e.file_name.display().to_string()).collect(),
236+
}
237+
}
238+
Err(error) => {
239+
self.counters.num_failed_gc_run += 1;
240+
JANITOR_METRICS.gc_runs.with_label_values(["error", "tantivy"]).inc();
241+
error!(error=?error, "failed to run garbage collection");
242+
GcRunResult::failed()
243+
}
244+
};
245+
self.record_gc_result(&result, "tantivy");
246+
}
155247

156-
let deleted_file_entries = match gc_res {
157-
Ok(removal_info) => {
158-
self.counters.num_successful_gc_run += 1;
159-
JANITOR_METRICS.gc_runs.with_label_values(["success"]).inc();
160-
self.counters.num_failed_splits += removal_info.failed_splits.len();
161-
removal_info.removed_split_entries
162-
}
163-
Err(error) => {
164-
self.counters.num_failed_gc_run += 1;
165-
JANITOR_METRICS.gc_runs.with_label_values(["error"]).inc();
166-
error!(error=?error, "failed to run garbage collection");
167-
return;
168-
}
169-
};
170-
if !deleted_file_entries.is_empty() {
171-
let num_deleted_splits = deleted_file_entries.len();
172-
let num_deleted_bytes = deleted_file_entries
173-
.iter()
174-
.map(|entry| entry.file_size_bytes.as_u64() as usize)
175-
.sum::<usize>();
176-
let deleted_files: HashSet<&Path> = deleted_file_entries
177-
.iter()
178-
.map(|deleted_entry| deleted_entry.file_name.as_path())
179-
.take(5)
180-
.collect();
181-
info!(
182-
num_deleted_splits = num_deleted_splits,
183-
"Janitor deleted {:?} and {} other splits.", deleted_files, num_deleted_splits,
184-
);
185-
self.counters.num_deleted_files += num_deleted_splits;
186-
self.counters.num_deleted_bytes += num_deleted_bytes;
248+
// Run Parquet GC
249+
if !parquet_storages.is_empty() {
250+
let parquet_start = Instant::now();
251+
let gc_res = run_parquet_garbage_collect(
252+
parquet_storages,
253+
self.metastore.clone(),
254+
STAGED_GRACE_PERIOD,
255+
split_deletion_grace_period(),
256+
false,
257+
Some(ctx.progress()),
258+
Some(gc_metrics("parquet")),
259+
)
260+
.await;
261+
262+
let parquet_run_duration = parquet_start.elapsed().as_secs();
263+
JANITOR_METRICS.gc_seconds_total.with_label_values(["parquet"]).inc_by(parquet_run_duration);
264+
265+
let result = match gc_res {
266+
Ok(removal_info) => {
267+
self.counters.num_successful_gc_run += 1;
268+
JANITOR_METRICS.gc_runs.with_label_values(["success", "parquet"]).inc();
269+
GcRunResult {
270+
num_deleted_splits: removal_info.removed_split_count(),
271+
num_deleted_bytes: removal_info.removed_bytes() as usize,
272+
num_failed: removal_info.failed_split_count(),
273+
sample_deleted_files: removal_info.removed_parquet_splits_entries.iter().take(5).map(|e| format!("{}.parquet", e.split_id)).collect(),
274+
}
275+
}
276+
Err(error) => {
277+
self.counters.num_failed_gc_run += 1;
278+
JANITOR_METRICS.gc_runs.with_label_values(["error", "parquet"]).inc();
279+
error!(error=?error, "failed to run parquet garbage collection");
280+
GcRunResult::failed()
281+
}
282+
};
283+
self.record_gc_result(&result, "parquet");
187284
}
188285
}
189286
}
@@ -756,4 +853,69 @@ mod tests {
756853
assert_eq!(counters.num_failed_splits, 2000);
757854
universe.assert_quit().await;
758855
}
856+
857+
/// End-to-end check that the janitor routes a metrics index to the parquet
/// GC path (index id matched by `is_metrics_index`) and that the actor
/// counters reflect the single deleted split.
#[tokio::test]
async fn test_garbage_collect_parquet_index() {
    use quickwit_metastore::ListMetricsSplitsResponseExt;
    use quickwit_parquet_engine::split::{
        MetricsSplitMetadata, MetricsSplitRecord, MetricsSplitState, SplitId, TimeRange,
    };
    use quickwit_proto::metastore::ListMetricsSplitsResponse;

    let storage_resolver = StorageResolver::unconfigured();
    let mut mock = MockMetastoreService::new();

    // Single index whose id must look like a metrics index so the GC loop
    // places it in `parquet_storages` rather than the tantivy path.
    mock.expect_list_indexes_metadata()
        .times(1)
        .returning(|_| {
            let indexes = vec![IndexMetadata::for_test(
                "otel-metrics-v0_1",
                "ram://indexes/otel-metrics-v0_1",
            )];
            Ok(ListIndexesMetadataResponse::for_test(indexes))
        });

    // A split already marked for deletion: this is what the parquet GC
    // should hand to `delete_metrics_splits`.
    let marked_split = MetricsSplitRecord {
        state: MetricsSplitState::MarkedForDeletion,
        update_timestamp: 0,
        metadata: MetricsSplitMetadata::builder()
            .split_id(SplitId::new("metrics_aaa"))
            .index_uid("otel-metrics-v0_1:00000000000000000000000000")
            .time_range(TimeRange::new(1000, 2000))
            .num_rows(10)
            .size_bytes(512)
            .build(),
    };

    // NOTE(review): the two `expect_list_metrics_splits` expectations below
    // assume mockall satisfies same-method expectations in declaration order
    // (staged phase first, then marked phase) — keep this order intact.
    // Phase 1 (staged): empty
    mock.expect_list_metrics_splits()
        .times(1)
        .returning(|_| Ok(ListMetricsSplitsResponse::empty()));
    // Phase 2 (marked): one split to delete
    let marked_resp = ListMetricsSplitsResponse::try_from_splits(&[marked_split]).unwrap();
    mock.expect_list_metrics_splits()
        .times(1)
        .returning(move |_| Ok(marked_resp.clone()));
    mock.expect_delete_metrics_splits()
        .times(1)
        .returning(|req| {
            assert_eq!(req.split_ids, ["metrics_aaa"]);
            Ok(EmptyResponse {})
        });

    let garbage_collect_actor = GarbageCollector::new(
        MetastoreServiceClient::from_mock(mock),
        storage_resolver,
    );
    let universe = Universe::with_accelerated_time();
    let (_mailbox, handle) = universe.spawn_builder().spawn(garbage_collect_actor);

    // One full GC pass: exactly one successful run, one deleted split,
    // no failures recorded.
    let counters = handle.process_pending_and_observe().await.state;
    assert_eq!(counters.num_passes, 1);
    assert_eq!(counters.num_successful_gc_run, 1);
    assert_eq!(counters.num_failed_gc_run, 0);
    assert_eq!(counters.num_deleted_files, 1);
    assert_eq!(counters.num_failed_splits, 0);
    universe.assert_quit().await;
}
759921
}

0 commit comments

Comments
 (0)