Skip to content

Commit 2e55a78

Browse files
committed
Simplification of the dual sequencer/no sequencer logic
1 parent f7f9995 commit 2e55a78

File tree

8 files changed

+100
-139
lines changed

8 files changed

+100
-139
lines changed

quickwit/quickwit-indexing/src/actors/metrics_pipeline/mod.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,3 +50,14 @@ pub use parquet_uploader::ParquetUploader;
5050
pub use pipeline::{MetricsPipeline, MetricsPipelineParams};
5151
pub use processed_parquet_batch::ProcessedParquetBatch;
5252
pub(crate) use publisher_impl::METRICS_PUBLISHER_NAME;
53+
54+
#[cfg(test)]
55+
/// Spawn a `Sequencer<Publisher>` in front of the given publisher mailbox.
56+
pub(crate) fn spawn_sequencer_for_test(
57+
universe: &quickwit_actors::Universe,
58+
publisher_mailbox: quickwit_actors::Mailbox<crate::actors::Publisher>,
59+
) -> quickwit_actors::Mailbox<crate::actors::Sequencer<crate::actors::Publisher>> {
60+
let sequencer = crate::actors::Sequencer::new(publisher_mailbox);
61+
let (sequencer_mailbox, _sequencer_handle) = universe.spawn_builder().spawn(sequencer);
62+
sequencer_mailbox
63+
}

quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_doc_processor.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -524,7 +524,7 @@ mod tests {
524524
use quickwit_storage::RamStorage;
525525

526526
use crate::actors::metrics_pipeline::{ParquetIndexer, ParquetPackager, ParquetUploader};
527-
use crate::actors::{Publisher, SplitsUpdateMailbox, UploaderType};
527+
use crate::actors::{Publisher, UploaderType};
528528

529529
let universe = Universe::with_accelerated_time();
530530
let temp_dir = tempfile::tempdir().unwrap();
@@ -533,11 +533,12 @@ mod tests {
533533
let mock_metastore = MockMetastoreService::new();
534534
let ram_storage = StdArc::new(RamStorage::default());
535535
let (publisher_mailbox, _publisher_inbox) = universe.create_test_mailbox::<Publisher>();
536+
let sequencer_mailbox = super::super::spawn_sequencer_for_test(&universe, publisher_mailbox);
536537
let uploader = ParquetUploader::new(
537538
UploaderType::IndexUploader,
538539
quickwit_proto::metastore::MetastoreServiceClient::from_mock(mock_metastore),
539540
ram_storage,
540-
SplitsUpdateMailbox::Publisher(publisher_mailbox),
541+
sequencer_mailbox,
541542
4,
542543
);
543544
let (uploader_mailbox, _uploader_handle) = universe.spawn_builder().spawn(uploader);

quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_e2e_test.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,7 @@ use quickwit_proto::types::IndexUid;
3737
use quickwit_storage::RamStorage;
3838

3939
use crate::actors::{
40-
ParquetDocProcessor, ParquetIndexer, ParquetPackager, ParquetUploader, Publisher,
41-
SplitsUpdateMailbox, UploaderType,
40+
ParquetDocProcessor, ParquetIndexer, ParquetPackager, ParquetUploader, Publisher, UploaderType,
4241
};
4342
use crate::models::RawDocBatch;
4443

@@ -162,11 +161,13 @@ async fn test_metrics_pipeline_e2e() {
162161
);
163162
let (publisher_mailbox, publisher_handle) = universe.spawn_builder().spawn(publisher);
164163

164+
let sequencer_mailbox = super::spawn_sequencer_for_test(&universe, publisher_mailbox);
165+
165166
let uploader = ParquetUploader::new(
166167
UploaderType::IndexUploader,
167168
metastore_client.clone(),
168169
ram_storage,
169-
SplitsUpdateMailbox::Publisher(publisher_mailbox),
170+
sequencer_mailbox,
170171
4,
171172
);
172173
let (uploader_mailbox, _uploader_handle) = universe.spawn_builder().spawn(uploader);

quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_indexer.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -554,7 +554,7 @@ mod tests {
554554

555555
use super::*;
556556
use crate::actors::metrics_pipeline::{ParquetPackager, ParquetUploader};
557-
use crate::actors::{Publisher, SplitsUpdateMailbox, UploaderType};
557+
use crate::actors::{Publisher, UploaderType};
558558

559559
/// Create a test ParquetUploader and return its mailbox.
560560
fn create_test_uploader(
@@ -563,12 +563,13 @@ mod tests {
563563
let mock_metastore = MockMetastoreService::new();
564564
let ram_storage = Arc::new(RamStorage::default());
565565
let (publisher_mailbox, _publisher_inbox) = universe.create_test_mailbox::<Publisher>();
566+
let sequencer_mailbox = super::super::spawn_sequencer_for_test(universe, publisher_mailbox);
566567

567568
let uploader = ParquetUploader::new(
568569
UploaderType::IndexUploader,
569570
quickwit_proto::metastore::MetastoreServiceClient::from_mock(mock_metastore),
570571
ram_storage,
571-
SplitsUpdateMailbox::Publisher(publisher_mailbox),
572+
sequencer_mailbox,
572573
4,
573574
);
574575
universe.spawn_builder().spawn(uploader)
@@ -587,12 +588,13 @@ mod tests {
587588

588589
let ram_storage = Arc::new(RamStorage::default());
589590
let (publisher_mailbox, _publisher_inbox) = universe.create_test_mailbox::<Publisher>();
591+
let sequencer_mailbox = super::super::spawn_sequencer_for_test(universe, publisher_mailbox);
590592

591593
let uploader = ParquetUploader::new(
592594
UploaderType::IndexUploader,
593595
quickwit_proto::metastore::MetastoreServiceClient::from_mock(mock_metastore),
594596
ram_storage,
595-
SplitsUpdateMailbox::Publisher(publisher_mailbox),
597+
sequencer_mailbox,
596598
4,
597599
);
598600
universe.spawn_builder().spawn(uploader)
@@ -915,11 +917,12 @@ mod tests {
915917

916918
let ram_storage = Arc::new(RamStorage::default());
917919
let (publisher_mailbox, _publisher_inbox) = universe.create_test_mailbox::<Publisher>();
920+
let sequencer_mailbox = super::super::spawn_sequencer_for_test(&universe, publisher_mailbox);
918921
let uploader = ParquetUploader::new(
919922
UploaderType::IndexUploader,
920923
quickwit_proto::metastore::MetastoreServiceClient::from_mock(mock_metastore),
921924
ram_storage,
922-
SplitsUpdateMailbox::Publisher(publisher_mailbox),
925+
sequencer_mailbox,
923926
4,
924927
);
925928
let (uploader_mailbox, uploader_handle) = universe.spawn_builder().spawn(uploader);

quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_packager.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -244,7 +244,7 @@ mod tests {
244244
use quickwit_storage::RamStorage;
245245

246246
use super::*;
247-
use crate::actors::{Publisher, SplitsUpdateMailbox, UploaderType};
247+
use crate::actors::{Publisher, UploaderType};
248248

249249
fn create_test_uploader(
250250
universe: &Universe,
@@ -256,12 +256,13 @@ mod tests {
256256

257257
let ram_storage = Arc::new(RamStorage::default());
258258
let (publisher_mailbox, _publisher_inbox) = universe.create_test_mailbox::<Publisher>();
259+
let sequencer_mailbox = super::super::spawn_sequencer_for_test(universe, publisher_mailbox);
259260

260261
let uploader = ParquetUploader::new(
261262
UploaderType::IndexUploader,
262263
quickwit_proto::metastore::MetastoreServiceClient::from_mock(mock_metastore),
263264
ram_storage,
264-
SplitsUpdateMailbox::Publisher(publisher_mailbox),
265+
sequencer_mailbox,
265266
4,
266267
);
267268
universe.spawn_builder().spawn(uploader)

0 commit comments

Comments
 (0)