Skip to content

Commit fa5eee3

Browse files
committed
Fix rustfmt formatting
1 parent bc3e7af commit fa5eee3

File tree

9 files changed

+119
-74
lines changed

9 files changed

+119
-74
lines changed

quickwit/quickwit-cli/src/tool.rs

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -696,18 +696,27 @@ fn print_tantivy_gc_result(
696696
for split in &removal_info.failed_splits {
697697
println!(" - {}", split.split_id);
698698
}
699-
println!("{} Splits were unable to be removed.", removal_info.failed_splits.len());
699+
println!(
700+
"{} Splits were unable to be removed.",
701+
removal_info.failed_splits.len()
702+
);
700703
}
701704

702705
let deleted_bytes: u64 = removal_info
703706
.removed_split_entries
704707
.iter()
705708
.map(|s| s.file_size_bytes.as_u64())
706709
.sum();
707-
println!("{}MB of storage garbage collected.", deleted_bytes / 1_000_000);
710+
println!(
711+
"{}MB of storage garbage collected.",
712+
deleted_bytes / 1_000_000
713+
);
708714

709715
if removal_info.failed_splits.is_empty() {
710-
println!("{} Index successfully garbage collected.", "✔".color(GREEN_COLOR));
716+
println!(
717+
"{} Index successfully garbage collected.",
718+
"✔".color(GREEN_COLOR)
719+
);
711720
} else if removal_info.removed_split_entries.is_empty() {
712721
println!("{} Failed to garbage collect index.", "✘".color(RED_COLOR));
713722
} else {
@@ -750,10 +759,16 @@ fn print_parquet_gc_result(
750759
);
751760
}
752761

753-
println!("{}MB of storage garbage collected.", removal_info.removed_bytes() / 1_000_000);
762+
println!(
763+
"{}MB of storage garbage collected.",
764+
removal_info.removed_bytes() / 1_000_000
765+
);
754766

755767
if removal_info.failed_parquet_splits.is_empty() {
756-
println!("{} Index successfully garbage collected.", "✔".color(GREEN_COLOR));
768+
println!(
769+
"{} Index successfully garbage collected.",
770+
"✔".color(GREEN_COLOR)
771+
);
757772
} else if removal_info.removed_parquet_splits_entries.is_empty() {
758773
println!("{} Failed to garbage collect index.", "✘".color(RED_COLOR));
759774
} else {

quickwit/quickwit-index-management/src/garbage_collection.rs

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -424,7 +424,11 @@ pub(crate) async fn delete_split_files(
424424
storage: &dyn Storage,
425425
splits: Vec<SplitToDelete>,
426426
progress_opt: Option<&Progress>,
427-
) -> (Vec<SplitToDelete>, Vec<SplitToDelete>, Option<BulkDeleteError>) {
427+
) -> (
428+
Vec<SplitToDelete>,
429+
Vec<SplitToDelete>,
430+
Option<BulkDeleteError>,
431+
) {
428432
if splits.is_empty() {
429433
return (Vec::new(), Vec::new(), None);
430434
}
@@ -438,8 +442,9 @@ pub(crate) async fn delete_split_files(
438442
Ok(()) => (splits, Vec::new(), None),
439443
Err(bulk_err) => {
440444
let success_paths: HashSet<&PathBuf> = bulk_err.successes.iter().collect();
441-
let (succeeded, failed) =
442-
splits.into_iter().partition(|s| success_paths.contains(&s.path));
445+
let (succeeded, failed) = splits
446+
.into_iter()
447+
.partition(|s| success_paths.contains(&s.path));
443448
(succeeded, failed, Some(bulk_err))
444449
}
445450
}
@@ -478,8 +483,14 @@ pub async fn delete_splits_from_storage_and_metastore(
478483
let (succeeded_stds, failed_stds, storage_err) =
479484
delete_split_files(&*storage, splits_to_delete, progress_opt).await;
480485

481-
let successes: Vec<SplitInfo> = succeeded_stds.iter().map(|s| split_infos[&s.path].clone()).collect();
482-
let storage_failures: Vec<SplitInfo> = failed_stds.iter().map(|s| split_infos[&s.path].clone()).collect();
486+
let successes: Vec<SplitInfo> = succeeded_stds
487+
.iter()
488+
.map(|s| split_infos[&s.path].clone())
489+
.collect();
490+
let storage_failures: Vec<SplitInfo> = failed_stds
491+
.iter()
492+
.map(|s| split_infos[&s.path].clone())
493+
.collect();
483494

484495
let mut storage_error: Option<BulkDeleteError> = None;
485496
if let Some(bulk_delete_error) = storage_err {

quickwit/quickwit-index-management/src/parquet_garbage_collection.rs

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -301,8 +301,7 @@ async fn delete_marked_parquet_splits(
301301
// To detect if this is the last page, we check if the number of splits is less than the
302302
// limit.
303303
assert!(splits.len() <= DELETE_PARQUET_SPLITS_BATCH_SIZE);
304-
let splits_to_delete_possibly_remaining =
305-
splits.len() == DELETE_PARQUET_SPLITS_BATCH_SIZE;
304+
let splits_to_delete_possibly_remaining = splits.len() == DELETE_PARQUET_SPLITS_BATCH_SIZE;
306305

307306
// Set split after which to search for the next loop.
308307
let Some(last_split) = splits.last() else {
@@ -368,7 +367,10 @@ async fn delete_parquet_splits_from_storage_and_metastore(
368367

369368
let storage_failed: Vec<ParquetSplitInfo> = failed_stds
370369
.into_iter()
371-
.map(|s| ParquetSplitInfo { split_id: s.split_id, file_size_bytes: s.size_bytes })
370+
.map(|s| ParquetSplitInfo {
371+
split_id: s.split_id,
372+
file_size_bytes: s.size_bytes,
373+
})
372374
.collect();
373375

374376
if succeeded_stds.is_empty() {
@@ -381,16 +383,22 @@ async fn delete_parquet_splits_from_storage_and_metastore(
381383
index_uid: Some(index_uid.clone()),
382384
split_ids: ids_to_delete,
383385
};
384-
let metastore_result =
385-
protect_future(progress_opt, metastore.delete_metrics_splits(delete_request)).await;
386+
let metastore_result = protect_future(
387+
progress_opt,
388+
metastore.delete_metrics_splits(delete_request),
389+
)
390+
.await;
386391

387392
if let Some(progress) = progress_opt {
388393
progress.record_progress();
389394
}
390395

391396
let succeeded: Vec<ParquetSplitInfo> = succeeded_stds
392397
.into_iter()
393-
.map(|s| ParquetSplitInfo { split_id: s.split_id, file_size_bytes: s.size_bytes })
398+
.map(|s| ParquetSplitInfo {
399+
split_id: s.split_id,
400+
file_size_bytes: s.size_bytes,
401+
})
394402
.collect();
395403

396404
match metastore_result {

quickwit/quickwit-janitor/src/actors/garbage_collector.rs

Lines changed: 51 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -113,32 +113,13 @@ impl GarbageCollector {
113113
counters: GarbageCollectorCounters::default(),
114114
}
115115
}
116-
// if !deleted_file_entries.is_empty() {
117-
// let num_deleted_splits = deleted_file_entries.len();
118-
// let num_deleted_bytes = deleted_file_entries
119-
// .iter()
120-
// .map(|entry| entry.file_size_bytes.as_u64() as usize)
121-
// .sum::<usize>();
122-
// let deleted_files: HashSet<&Path> = deleted_file_entries
123-
// .iter()
124-
// .map(|deleted_entry| deleted_entry.file_name.as_path())
125-
// .take(5)
126-
// .collect();
127-
// info!(
128-
// num_deleted_splits = num_deleted_splits,
129-
// "Janitor deleted {:?} and {} other splits.", deleted_files, num_deleted_splits,
130-
// );
131-
// self.counters.num_deleted_files += num_deleted_splits;
132-
// self.counters.num_deleted_bytes += num_deleted_bytes;
133116

134117
fn record_gc_result(&mut self, result: &GcRunResult, split_type: &str) {
135118
self.counters.num_failed_splits += result.num_failed;
136119
if result.num_deleted_splits > 0 {
137120
info!(
138121
"Janitor deleted {:?} and {} other {} splits.",
139-
result.sample_deleted_files,
140-
result.num_deleted_splits,
141-
split_type,
122+
result.sample_deleted_files, result.num_deleted_splits, split_type,
142123
);
143124
self.counters.num_deleted_files += result.num_deleted_splits;
144125
self.counters.num_deleted_bytes += result.num_deleted_bytes;
@@ -222,22 +203,40 @@ impl GarbageCollector {
222203
.await;
223204

224205
let tantivy_run_duration = tantivy_start.elapsed().as_secs();
225-
JANITOR_METRICS.gc_seconds_total.with_label_values(["tantivy"]).inc_by(tantivy_run_duration);
206+
JANITOR_METRICS
207+
.gc_seconds_total
208+
.with_label_values(["tantivy"])
209+
.inc_by(tantivy_run_duration);
226210

227211
let result = match gc_res {
228212
Ok(removal_info) => {
229213
self.counters.num_successful_gc_run += 1;
230-
JANITOR_METRICS.gc_runs.with_label_values(["success", "tantivy"]).inc();
214+
JANITOR_METRICS
215+
.gc_runs
216+
.with_label_values(["success", "tantivy"])
217+
.inc();
231218
GcRunResult {
232219
num_deleted_splits: removal_info.removed_split_entries.len(),
233-
num_deleted_bytes: removal_info.removed_split_entries.iter().map(|e| e.file_size_bytes.as_u64() as usize).sum(),
220+
num_deleted_bytes: removal_info
221+
.removed_split_entries
222+
.iter()
223+
.map(|e| e.file_size_bytes.as_u64() as usize)
224+
.sum(),
234225
num_failed: removal_info.failed_splits.len(),
235-
sample_deleted_files: removal_info.removed_split_entries.iter().take(5).map(|e| e.file_name.display().to_string()).collect(),
226+
sample_deleted_files: removal_info
227+
.removed_split_entries
228+
.iter()
229+
.take(5)
230+
.map(|e| e.file_name.display().to_string())
231+
.collect(),
236232
}
237233
}
238234
Err(error) => {
239235
self.counters.num_failed_gc_run += 1;
240-
JANITOR_METRICS.gc_runs.with_label_values(["error", "tantivy"]).inc();
236+
JANITOR_METRICS
237+
.gc_runs
238+
.with_label_values(["error", "tantivy"])
239+
.inc();
241240
error!(error=?error, "failed to run garbage collection");
242241
GcRunResult::failed()
243242
}
@@ -260,22 +259,36 @@ impl GarbageCollector {
260259
.await;
261260

262261
let parquet_run_duration = parquet_start.elapsed().as_secs();
263-
JANITOR_METRICS.gc_seconds_total.with_label_values(["parquet"]).inc_by(parquet_run_duration);
262+
JANITOR_METRICS
263+
.gc_seconds_total
264+
.with_label_values(["parquet"])
265+
.inc_by(parquet_run_duration);
264266

265267
let result = match gc_res {
266268
Ok(removal_info) => {
267269
self.counters.num_successful_gc_run += 1;
268-
JANITOR_METRICS.gc_runs.with_label_values(["success", "parquet"]).inc();
270+
JANITOR_METRICS
271+
.gc_runs
272+
.with_label_values(["success", "parquet"])
273+
.inc();
269274
GcRunResult {
270275
num_deleted_splits: removal_info.removed_split_count(),
271276
num_deleted_bytes: removal_info.removed_bytes() as usize,
272277
num_failed: removal_info.failed_split_count(),
273-
sample_deleted_files: removal_info.removed_parquet_splits_entries.iter().take(5).map(|e| format!("{}.parquet", e.split_id)).collect(),
278+
sample_deleted_files: removal_info
279+
.removed_parquet_splits_entries
280+
.iter()
281+
.take(5)
282+
.map(|e| format!("{}.parquet", e.split_id))
283+
.collect(),
274284
}
275285
}
276286
Err(error) => {
277287
self.counters.num_failed_gc_run += 1;
278-
JANITOR_METRICS.gc_runs.with_label_values(["error", "parquet"]).inc();
288+
JANITOR_METRICS
289+
.gc_runs
290+
.with_label_values(["error", "parquet"])
291+
.inc();
279292
error!(error=?error, "failed to run parquet garbage collection");
280293
GcRunResult::failed()
281294
}
@@ -865,15 +878,13 @@ mod tests {
865878
let storage_resolver = StorageResolver::unconfigured();
866879
let mut mock = MockMetastoreService::new();
867880

868-
mock.expect_list_indexes_metadata()
869-
.times(1)
870-
.returning(|_| {
871-
let indexes = vec![IndexMetadata::for_test(
872-
"otel-metrics-v0_1",
873-
"ram://indexes/otel-metrics-v0_1",
874-
)];
875-
Ok(ListIndexesMetadataResponse::for_test(indexes))
876-
});
881+
mock.expect_list_indexes_metadata().times(1).returning(|_| {
882+
let indexes = vec![IndexMetadata::for_test(
883+
"otel-metrics-v0_1",
884+
"ram://indexes/otel-metrics-v0_1",
885+
)];
886+
Ok(ListIndexesMetadataResponse::for_test(indexes))
887+
});
877888

878889
let marked_split = MetricsSplitRecord {
879890
state: MetricsSplitState::MarkedForDeletion,
@@ -903,10 +914,8 @@ mod tests {
903914
Ok(EmptyResponse {})
904915
});
905916

906-
let garbage_collect_actor = GarbageCollector::new(
907-
MetastoreServiceClient::from_mock(mock),
908-
storage_resolver,
909-
);
917+
let garbage_collect_actor =
918+
GarbageCollector::new(MetastoreServiceClient::from_mock(mock), storage_resolver);
910919
let universe = Universe::with_accelerated_time();
911920
let (_mailbox, handle) = universe.spawn_builder().spawn(garbage_collect_actor);
912921

quickwit/quickwit-janitor/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use quickwit_actors::{Mailbox, Universe};
1818
use quickwit_common::pubsub::EventBroker;
1919
use quickwit_config::NodeConfig;
2020
use quickwit_indexing::actors::MergeSchedulerService;
21-
use quickwit_metastore::{SplitInfo};
21+
use quickwit_metastore::SplitInfo;
2222
use quickwit_proto::metastore::MetastoreServiceClient;
2323
use quickwit_search::SearchJobPlacer;
2424
use quickwit_storage::StorageResolver;

quickwit/quickwit-janitor/src/metrics.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,7 @@
1313
// limitations under the License.
1414

1515
use once_cell::sync::Lazy;
16-
use quickwit_common::metrics::{
17-
IntCounterVec, IntGaugeVec, new_counter_vec, new_gauge_vec,
18-
};
16+
use quickwit_common::metrics::{IntCounterVec, IntGaugeVec, new_counter_vec, new_gauge_vec};
1917

2018
pub struct JanitorMetrics {
2119
pub ongoing_num_delete_operations_total: IntGaugeVec<1>,

quickwit/quickwit-janitor/src/retention_policy_execution.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ pub async fn run_execute_retention_policy(
9595
}
9696

9797
/// Detect all expired parquet splits based on a retention policy and
98-
/// mark them as `MarkedForDeletion`.
98+
/// mark them as `MarkedForDeletion`.
9999
pub async fn run_execute_parquet_retention_policy(
100100
index_uid: &IndexUid,
101101
metastore: MetastoreServiceClient,
@@ -133,12 +133,12 @@ pub async fn run_execute_parquet_retention_policy(
133133
expired_split_ids.len()
134134
);
135135

136-
ctx.protect_future(
137-
metastore.mark_metrics_splits_for_deletion(MarkMetricsSplitsForDeletionRequest {
136+
ctx.protect_future(metastore.mark_metrics_splits_for_deletion(
137+
MarkMetricsSplitsForDeletionRequest {
138138
index_uid: Some(index_uid.clone()),
139139
split_ids: expired_split_ids,
140-
}),
141-
)
140+
},
141+
))
142142
.await?;
143143

144144
Ok(expired_splits.len())

quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -862,13 +862,15 @@ impl FileBackedIndex {
862862

863863
// Sort by split_id for stable pagination (mirrors Postgres ORDER BY split_id ASC).
864864
splits.sort_unstable_by(|a, b| {
865-
a.metadata.split_id.as_str().cmp(b.metadata.split_id.as_str())
865+
a.metadata
866+
.split_id
867+
.as_str()
868+
.cmp(b.metadata.split_id.as_str())
866869
});
867870

868871
// Apply cursor: skip splits up to and including after_split_id.
869872
let splits = if let Some(ref after) = query.after_split_id {
870-
let pos =
871-
splits.partition_point(|s| s.metadata.split_id.as_str() <= after.as_str());
873+
let pos = splits.partition_point(|s| s.metadata.split_id.as_str() <= after.as_str());
872874
&splits[pos..]
873875
} else {
874876
&splits[..]

quickwit/quickwit-opentelemetry/src/otlp/otel_metrics.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,9 +47,9 @@ pub const OTEL_METRICS_INDEX_ID: &str = "otel-metrics-v0_9";
4747
/// directly and queries via DataFusion. The doc mapping is unused; this config
4848
/// exists only so the index can be registered in the metastore for source
4949
/// assignment and lifecycle management.
50-
///
51-
/// TODO: As a temporary hack, we are including a timestamp_field, so that
52-
/// we can pass the retention policy validation.
50+
///
51+
/// TODO: As a temporary hack, we are including a timestamp_field, so that
52+
/// we can pass the retention policy validation.
5353
const OTEL_METRICS_INDEX_CONFIG: &str = r#"
5454
version: 0.8
5555
@@ -554,7 +554,9 @@ mod tests {
554554
let index_config =
555555
OtlpGrpcMetricsService::index_config(&Uri::for_test("ram:///indexes")).unwrap();
556556
assert_eq!(index_config.index_id, OTEL_METRICS_INDEX_ID);
557-
let retention = index_config.retention_policy_opt.expect("retention policy should be set");
557+
let retention = index_config
558+
.retention_policy_opt
559+
.expect("retention policy should be set");
558560
assert_eq!(
559561
retention.retention_period().unwrap(),
560562
std::time::Duration::from_secs(30 * 24 * 3600)

0 commit comments

Comments
 (0)