Skip to content

Commit 075f3e2

Browse files
committed
fix error handling, linter
1 parent a9979ce commit 075f3e2

File tree

2 files changed

+22
-36
lines changed

2 files changed

+22
-36
lines changed

quickwit/quickwit-index-management/src/parquet_garbage_collection.rs

Lines changed: 20 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@ use quickwit_common::Progress;
2222
use quickwit_metastore::{
2323
ListMetricsSplitsQuery, ListMetricsSplitsRequestExt, ListMetricsSplitsResponseExt,
2424
};
25-
use quickwit_parquet_engine::split::{MetricsSplitRecord, MetricsSplitState};
25+
use quickwit_metastore::SplitState;
26+
use quickwit_parquet_engine::split::MetricsSplitRecord;
2627
use quickwit_proto::metastore::{
2728
DeleteMetricsSplitsRequest, ListMetricsSplitsRequest, MarkMetricsSplitsForDeletionRequest,
2829
MetastoreService, MetastoreServiceClient,
@@ -90,12 +91,9 @@ pub async fn run_parquet_garbage_collect(
9091

9192
let mut deletable_staged_splits = Vec::new();
9293
for index_uid in indexes.keys() {
93-
match list_stale_staged_splits(&metastore, index_uid, staged_cutoff, progress_opt).await {
94-
Ok(splits) => deletable_staged_splits.extend(splits),
95-
Err(err) => {
96-
error!(index_uid=%index_uid, error=?err, "failed to list stale staged parquet splits");
97-
}
98-
}
94+
let splits =
95+
list_stale_staged_splits(&metastore, index_uid, staged_cutoff, progress_opt).await?;
96+
deletable_staged_splits.extend(splits);
9997
}
10098

10199
if dry_run {
@@ -104,12 +102,9 @@ pub async fn run_parquet_garbage_collect(
104102

105103
let mut splits_marked_for_deletion = Vec::new();
106104
for index_uid in indexes.keys() {
107-
match list_marked_splits(&metastore, index_uid, deletion_cutoff, progress_opt).await {
108-
Ok(splits) => splits_marked_for_deletion.extend(splits),
109-
Err(err) => {
110-
error!(index_uid=%index_uid, error=?err, "failed to list marked parquet splits");
111-
}
112-
}
105+
let splits =
106+
list_marked_splits(&metastore, index_uid, deletion_cutoff, progress_opt).await?;
107+
splits_marked_for_deletion.extend(splits);
113108
}
114109
splits_marked_for_deletion.extend(deletable_staged_splits);
115110

@@ -127,38 +122,27 @@ pub async fn run_parquet_garbage_collect(
127122
}
128123

129124
// Schedule all eligible staged splits for delete
130-
if let Err(err) =
131-
mark_splits_for_deletion(&metastore, &deletable_staged_splits, progress_opt).await
132-
{
133-
error!(error=?err, "failed to mark stale staged parquet splits");
134-
}
125+
mark_splits_for_deletion(&metastore, &deletable_staged_splits, progress_opt).await?;
135126

136127
// Phase 2: Delete splits marked for deletion past the grace period
137128
let deletion_cutoff =
138129
OffsetDateTime::now_utc().unix_timestamp() - deletion_grace_period.as_secs() as i64;
139130

140131
for (index_uid, storage) in &indexes {
141-
match delete_marked_parquet_splits(
132+
let batch_info = delete_marked_parquet_splits(
142133
&metastore,
143134
index_uid,
144135
storage.clone(),
145136
deletion_cutoff,
146137
progress_opt,
147138
)
148-
.await
149-
{
150-
Ok(batch_info) => {
151-
removal_info
152-
.removed_parquet_splits_entries
153-
.extend(batch_info.removed_parquet_splits_entries);
154-
removal_info
155-
.failed_parquet_splits
156-
.extend(batch_info.failed_parquet_splits);
157-
}
158-
Err(err) => {
159-
error!(index_uid=%index_uid, error=?err, "failed to delete marked parquet splits");
160-
}
161-
}
139+
.await?;
140+
removal_info
141+
.removed_parquet_splits_entries
142+
.extend(batch_info.removed_parquet_splits_entries);
143+
removal_info
144+
.failed_parquet_splits
145+
.extend(batch_info.failed_parquet_splits);
162146
}
163147

164148
metrics.record(
@@ -178,7 +162,7 @@ async fn list_stale_staged_splits(
178162
progress_opt: Option<&Progress>,
179163
) -> anyhow::Result<Vec<MetricsSplitRecord>> {
180164
let query = ListMetricsSplitsQuery::for_index(index_uid.clone())
181-
.with_split_states(vec![MetricsSplitState::Staged.to_string()])
165+
.with_split_states(vec![SplitState::Staged])
182166
.with_update_timestamp_lte(staged_cutoff);
183167

184168
let request = ListMetricsSplitsRequest::try_from_query(index_uid.clone(), &query)
@@ -198,7 +182,7 @@ async fn list_marked_splits(
198182
progress_opt: Option<&Progress>,
199183
) -> anyhow::Result<Vec<MetricsSplitRecord>> {
200184
let query = ListMetricsSplitsQuery::for_index(index_uid.clone())
201-
.with_split_states(vec![MetricsSplitState::MarkedForDeletion.to_string()])
185+
.with_split_states(vec![SplitState::MarkedForDeletion])
202186
.with_update_timestamp_lte(deletion_cutoff);
203187

204188
let request = ListMetricsSplitsRequest::try_from_query(index_uid.clone(), &query)
@@ -258,7 +242,7 @@ async fn delete_marked_parquet_splits(
258242
let mut removal_info = ParquetSplitRemovalInfo::default();
259243

260244
let mut query = ListMetricsSplitsQuery::for_index(index_uid.clone())
261-
.with_split_states(vec![MetricsSplitState::MarkedForDeletion.to_string()])
245+
.with_split_states(vec![SplitState::MarkedForDeletion])
262246
.with_update_timestamp_lte(deletion_cutoff)
263247
.with_limit(DELETE_PARQUET_SPLITS_BATCH_SIZE);
264248

quickwit/quickwit-metastore/src/metastore/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,8 @@ impl Default for ListMetricsSplitsQuery {
113113
create_timestamp: Default::default(),
114114
mature: Bound::Unbounded,
115115
limit: None,
116+
after_split_id: None,
117+
max_time_range_end: None,
116118
}
117119
}
118120
}

0 commit comments

Comments
 (0)