Skip to content

Commit e8171e5

Browse files
committed
AI slop
1 parent 84a5b84 commit e8171e5

File tree

4 files changed

+12
-168
lines changed

4 files changed

+12
-168
lines changed

quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs

Lines changed: 1 addition & 133 deletions
Original file line numberDiff line numberDiff line change
@@ -441,139 +441,7 @@ impl IndexingPipeline {
441441
sequencer: sequencer_handle,
442442
publisher: publisher_handle,
443443
next_check_for_progress: Instant::now() + *HEARTBEAT,
444-
}));
445-
Ok(())
446-
}
447-
448-
/// Spawn the parquet pipeline using ParquetDocProcessor and ParquetIndexer.
449-
///
450-
/// This pipeline routes metrics data through the Parquet/DataFusion path instead of
451-
/// the Tantivy path. The output is ParquetSplit files written to the indexing directory.
452-
#[instrument(
453-
name="spawn_parquet_pipeline",
454-
level="info",
455-
skip_all,
456-
fields(
457-
index=%self.params.pipeline_id.index_uid.index_id,
458-
r#gen=self.generation()
459-
))]
460-
async fn spawn_parquet_pipeline(&mut self, ctx: &ActorContext<Self>) -> anyhow::Result<()> {
461-
let index_id = &self.params.pipeline_id.index_uid.index_id;
462-
let source_id = &self.params.pipeline_id.source_id;
463-
464-
info!(
465-
index_id,
466-
source_id,
467-
pipeline_uid=%self.params.pipeline_id.pipeline_uid,
468-
root_dir=%self.params.indexing_directory.path().display(),
469-
"spawning parquet indexing pipeline for metrics",
470-
);
471-
472-
let (source_mailbox, source_inbox) = ctx
473-
.spawn_ctx()
474-
.create_mailbox::<SourceActor<ParquetDocProcessor>>(
475-
"SourceActor",
476-
QueueCapacity::Unbounded,
477-
);
478-
479-
// ParquetPublisher
480-
let parquet_publisher = ParquetPublisher::new(
481-
PublisherType::ParquetPublisher,
482-
self.params.metastore.clone(),
483-
None,
484-
Some(source_mailbox.clone()),
485-
);
486-
let (parquet_publisher_mailbox, parquet_publisher_handle) = ctx
487-
.spawn_actor()
488-
.set_kill_switch(self.kill_switch.clone())
489-
.spawn(parquet_publisher);
490-
491-
// Sequencer for ordered delivery
492-
let parquet_sequencer = Sequencer::new(parquet_publisher_mailbox);
493-
let (parquet_sequencer_mailbox, parquet_sequencer_handle) = ctx
494-
.spawn_actor()
495-
.set_kill_switch(self.kill_switch.clone())
496-
.spawn(parquet_sequencer);
497-
498-
// ParquetUploader
499-
let parquet_uploader = ParquetUploader::new(
500-
super::UploaderType::IndexUploader,
501-
self.params.metastore.clone(),
502-
self.params.storage.clone(),
503-
SplitsUpdateMailbox::Sequencer(parquet_sequencer_mailbox),
504-
self.params.max_concurrent_split_uploads_index,
505-
);
506-
let (parquet_uploader_mailbox, parquet_uploader_handle) = ctx
507-
.spawn_actor()
508-
.set_kill_switch(self.kill_switch.clone())
509-
.spawn(parquet_uploader);
510-
511-
// ParquetPackager
512-
let writer_config = quickwit_parquet_engine::storage::ParquetWriterConfig::default();
513-
let table_config = quickwit_parquet_engine::table_config::TableConfig::default();
514-
let split_writer = quickwit_parquet_engine::storage::ParquetSplitWriter::new(
515-
writer_config,
516-
self.params.indexing_directory.path(),
517-
&table_config,
518-
);
519-
let parquet_packager = ParquetPackager::new(split_writer, parquet_uploader_mailbox);
520-
let (parquet_packager_mailbox, parquet_packager_handle) = ctx
521-
.spawn_actor()
522-
.set_kill_switch(self.kill_switch.clone())
523-
.spawn(parquet_packager);
524-
525-
// ParquetIndexer
526-
let commit_timeout =
527-
Duration::from_secs(self.params.indexing_settings.commit_timeout_secs as u64);
528-
let parquet_indexer = ParquetIndexer::new(
529-
self.params.pipeline_id.index_uid.clone(),
530-
source_id.to_string(),
531-
None,
532-
parquet_packager_mailbox,
533-
Some(commit_timeout),
534-
);
535-
let (parquet_indexer_mailbox, parquet_indexer_handle) = ctx
536-
.spawn_actor()
537-
.set_kill_switch(self.kill_switch.clone())
538-
.spawn(parquet_indexer);
539-
540-
// ParquetDocProcessor
541-
let parquet_doc_processor = ParquetDocProcessor::new(
542-
index_id.to_string(),
543-
source_id.to_string(),
544-
parquet_indexer_mailbox,
545-
);
546-
let (parquet_doc_processor_mailbox, parquet_doc_processor_handle) = ctx
547-
.spawn_actor()
548-
.set_kill_switch(self.kill_switch.clone())
549-
.spawn(parquet_doc_processor);
550-
551-
// Source (using parquet source loader)
552-
let source_runtime = SourceRuntime {
553-
pipeline_id: self.params.pipeline_id.clone(),
554-
source_config: self.params.source_config.clone(),
555-
metastore: self.params.metastore.clone(),
556-
ingester_pool: self.params.ingester_pool.clone(),
557-
queues_dir_path: self.params.queues_dir_path.clone(),
558-
storage_resolver: self.params.source_storage_resolver.clone(),
559-
event_broker: self.params.event_broker.clone(),
560-
indexing_setting: self.params.indexing_settings.clone(),
561-
};
562-
let source = ctx
563-
.protect_future(quickwit_supported_parquet_sources().load_source(source_runtime))
564-
.await?;
565-
let actor_source = SourceActor {
566-
source,
567-
processor_mailbox: parquet_doc_processor_mailbox,
568-
};
569-
let (source_mailbox, source_handle) = ctx
570-
.spawn_actor()
571-
.set_mailboxes(source_mailbox, source_inbox)
572-
.set_kill_switch(self.kill_switch.clone())
573-
.spawn(actor_source);
574-
let assign_shards_message = AssignShards(Assignment {
575-
shard_ids: self.shard_ids.clone(),
576-
}));
444+
});
577445
Ok(())
578446
}
579447

quickwit/quickwit-indexing/src/actors/metrics_pipeline/pipeline.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -343,9 +343,11 @@ impl MetricsPipeline {
343343

344344
// ParquetPackager
345345
let writer_config = quickwit_parquet_engine::storage::ParquetWriterConfig::default();
346+
let table_config = quickwit_parquet_engine::table_config::TableConfig::default();
346347
let split_writer = quickwit_parquet_engine::storage::ParquetSplitWriter::new(
347348
writer_config,
348349
self.params.indexing_directory.path(),
350+
&table_config,
349351
);
350352
let packager = ParquetPackager::new(split_writer, uploader_mailbox);
351353
let (packager_mailbox, packager_handle) = ctx

quickwit/quickwit-indexing/src/source/mod.rs

Lines changed: 4 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ pub use source_sink::SourceSink;
9292
pub use pulsar_source::{PulsarSource, PulsarSourceFactory};
9393
#[cfg(feature = "sqs")]
9494
pub use queue_sources::sqs_queue;
95-
use quickwit_actors::{Actor, ActorContext, ActorExitStatus, DeferableReplyHandler, Handler, Mailbox};
95+
use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler};
9696
use quickwit_common::metrics::{GaugeGuard, MEMORY_METRICS};
9797
use quickwit_common::pubsub::EventBroker;
9898
use quickwit_common::runtimes::RuntimeType;
@@ -118,7 +118,7 @@ pub use void_source::{VoidSource, VoidSourceFactory};
118118

119119
use self::doc_file_reader::dir_and_filename;
120120
use self::stdin_source::StdinSourceFactory;
121-
use crate::models::{NewPublishLock, NewPublishToken, RawDocBatch};
121+
use crate::models::RawDocBatch;
122122
use crate::source::ingest::IngestSourceFactory;
123123
use crate::source::ingest_api_source::IngestApiSourceFactory;
124124

@@ -318,15 +318,10 @@ pub struct SourceActor {
318318
}
319319

320320
impl SourceActor {
321-
pub fn new<A>(source: Box<dyn Source>, source_sink: Mailbox<A>) -> Self
322-
where A: Actor
323-
+ DeferableReplyHandler<RawDocBatch>
324-
+ DeferableReplyHandler<NewPublishLock>
325-
+ DeferableReplyHandler<NewPublishToken>
326-
{
321+
pub fn new(source: Box<dyn Source>, source_sink: impl Into<SourceSink>) -> Self {
327322
SourceActor {
328323
source,
329-
source_sink: SourceSink::new(source_sink),
324+
source_sink: source_sink.into(),
330325
}
331326
}
332327
}
@@ -436,23 +431,6 @@ pub fn quickwit_supported_sources() -> &'static SourceLoader {
436431
&SOURCE_LOADER
437432
}
438433

439-
/// Returns the source loader for parquet pipelines (ParquetDocProcessor).
440-
///
441-
/// Metrics pipelines currently only support IngestV2 sources, which is the
442-
/// production source type for metrics ingestion.
443-
pub fn quickwit_supported_parquet_sources() -> &'static ParquetSourceLoader {
444-
static PARQUET_SOURCE_LOADER: LazyLock<ParquetSourceLoader> = LazyLock::new(|| {
445-
let mut source_factory = ParquetSourceLoader::default();
446-
// Only IngestV2 is currently used for metrics ingestion
447-
source_factory.add_source(SourceType::IngestV2, IngestSourceFactory);
448-
// Add other sources for testing/development
449-
source_factory.add_source(SourceType::File, FileSourceFactory);
450-
source_factory.add_source(SourceType::Vec, VecSourceFactory);
451-
source_factory.add_source(SourceType::Void, VoidSourceFactory);
452-
source_factory
453-
});
454-
&PARQUET_SOURCE_LOADER
455-
}
456434

457435
pub async fn check_source_connectivity(
458436
storage_resolver: &StorageResolver,

quickwit/quickwit-indexing/src/source/source_sink.rs

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -71,19 +71,15 @@ pub struct SourceSink {
7171
inner: Arc<dyn SourceSinkTrait>,
7272
}
7373

74-
impl SourceSink {
75-
/// Create a `SourceSink` from any actor mailbox whose actor implements
76-
/// the required message handlers.
77-
pub fn new<A>(mailbox: Mailbox<A>) -> Self
78-
where A: Actor
79-
+ DeferableReplyHandler<RawDocBatch>
80-
+ DeferableReplyHandler<NewPublishLock>
81-
+ DeferableReplyHandler<NewPublishToken> {
74+
impl<T: SourceSinkTrait> From<T> for SourceSink {
75+
fn from(source_sink: T) -> Self {
8276
Self {
83-
inner: Arc::new(mailbox),
77+
inner: Arc::new(source_sink),
8478
}
8579
}
80+
}
8681

82+
impl SourceSink {
8783
/// Send a `RawDocBatch` to the processor.
8884
///
8985
/// The source context's protect zone is held while the send is in flight,

0 commit comments

Comments
 (0)