@@ -46,8 +46,8 @@ use tracing::{debug, error, info, warn};
4646use ulid:: Ulid ;
4747
4848use super :: {
49- BATCH_NUM_BYTES_LIMIT , BatchBuilder , EMIT_BATCHES_TIMEOUT , ProcessorMailbox , Source ,
50- SourceContext , SourceRuntime , TypedSourceFactory ,
49+ BATCH_NUM_BYTES_LIMIT , BatchBuilder , EMIT_BATCHES_TIMEOUT , Source , SourceContext , SourceRuntime ,
50+ SourceSink , TypedSourceFactory ,
5151} ;
5252use crate :: models:: { LocalShardPositionsUpdate , NewPublishLock , NewPublishToken , PublishLock } ;
5353
@@ -393,7 +393,7 @@ impl IngestSource {
393393 async fn reset_if_needed (
394394 & mut self ,
395395 new_assigned_shard_ids : & BTreeSet < ShardId > ,
396- doc_processor_mailbox : & ProcessorMailbox ,
396+ source_sink : & SourceSink ,
397397 ctx : & SourceContext ,
398398 ) -> anyhow:: Result < ( ) > {
399399 // No need to do anything if the list of shards before and after are empty.
@@ -440,10 +440,10 @@ impl IngestSource {
440440 self . publish_lock . kill ( ) . await ;
441441 self . publish_lock = PublishLock :: default ( ) ;
442442 self . publish_token = self . client_id . new_publish_token ( ) ;
443- doc_processor_mailbox
443+ source_sink
444444 . send_publish_lock ( NewPublishLock ( self . publish_lock . clone ( ) ) , ctx)
445445 . await ?;
446- doc_processor_mailbox
446+ source_sink
447447 . send_publish_token ( NewPublishToken ( self . publish_token . clone ( ) ) , ctx)
448448 . await ?;
449449 Ok ( ( ) )
@@ -454,7 +454,7 @@ impl IngestSource {
454454impl Source for IngestSource {
455455 async fn emit_batches (
456456 & mut self ,
457- processor_mailbox : & ProcessorMailbox ,
457+ source_sink : & SourceSink ,
458458 ctx : & SourceContext ,
459459 ) -> Result < Duration , ActorExitStatus > {
460460 let mut batch_builder = BatchBuilder :: new ( SourceType :: IngestV2 ) ;
@@ -497,18 +497,18 @@ impl Source for IngestSource {
497497 "Sending doc batch to indexer."
498498 ) ;
499499 let message = batch_builder. build ( ) ;
500- processor_mailbox . send_raw_doc_batch ( message, ctx) . await ?;
500+ source_sink . send_raw_doc_batch ( message, ctx) . await ?;
501501 }
502502 Ok ( Duration :: default ( ) )
503503 }
504504
505505 async fn assign_shards (
506506 & mut self ,
507507 new_assigned_shard_ids : BTreeSet < ShardId > ,
508- processor_mailbox : & ProcessorMailbox ,
508+ source_sink : & SourceSink ,
509509 ctx : & SourceContext ,
510510 ) -> anyhow:: Result < ( ) > {
511- self . reset_if_needed ( & new_assigned_shard_ids, processor_mailbox , ctx)
511+ self . reset_if_needed ( & new_assigned_shard_ids, source_sink , ctx)
512512 . await ?;
513513
514514 // As enforced by `reset_if_needed`, at this point, all currently assigned shards should be
@@ -952,7 +952,7 @@ mod tests {
952952 let ( source_mailbox, _source_inbox) = universe. create_test_mailbox :: < SourceActor > ( ) ;
953953 let ( doc_processor_mailbox, doc_processor_inbox) =
954954 universe. create_test_mailbox :: < DocProcessor > ( ) ;
955- let processor_mailbox = ProcessorMailbox :: new ( doc_processor_mailbox. clone ( ) ) ;
955+ let source_sink = SourceSink :: new ( doc_processor_mailbox. clone ( ) ) ;
956956 let ( observable_state_tx, _observable_state_rx) = watch:: channel ( serde_json:: Value :: Null ) ;
957957 let ctx: SourceContext =
958958 ActorContext :: for_test ( & universe, source_mailbox, observable_state_tx) ;
@@ -962,7 +962,7 @@ mod tests {
962962 let shard_ids: BTreeSet < ShardId > = once ( 0 ) . map ( ShardId :: from) . collect ( ) ;
963963 let publish_lock = source. publish_lock . clone ( ) ;
964964 source
965- . assign_shards ( shard_ids, & processor_mailbox , & ctx)
965+ . assign_shards ( shard_ids, & source_sink , & ctx)
966966 . await
967967 . unwrap ( ) ;
968968 assert_eq ! ( sequence_rx. recv( ) . await . unwrap( ) , 1 ) ;
@@ -976,7 +976,7 @@ mod tests {
976976 let shard_ids: BTreeSet < ShardId > = ( 0 ..2 ) . map ( ShardId :: from) . collect ( ) ;
977977 let publish_lock = source. publish_lock . clone ( ) ;
978978 source
979- . assign_shards ( shard_ids, & processor_mailbox , & ctx)
979+ . assign_shards ( shard_ids, & source_sink , & ctx)
980980 . await
981981 . unwrap ( ) ;
982982 assert_eq ! ( sequence_rx. recv( ) . await . unwrap( ) , 2 ) ;
@@ -989,7 +989,7 @@ mod tests {
989989 let shard_ids: BTreeSet < ShardId > = ( 1 ..3 ) . map ( ShardId :: from) . collect ( ) ;
990990 let publish_lock = source. publish_lock . clone ( ) ;
991991 source
992- . assign_shards ( shard_ids, & processor_mailbox , & ctx)
992+ . assign_shards ( shard_ids, & source_sink , & ctx)
993993 . await
994994 . unwrap ( ) ;
995995
@@ -1158,7 +1158,7 @@ mod tests {
11581158 let ( source_mailbox, _source_inbox) = universe. create_test_mailbox :: < SourceActor > ( ) ;
11591159 let ( doc_processor_mailbox, _doc_processor_inbox) =
11601160 universe. create_test_mailbox :: < DocProcessor > ( ) ;
1161- let processor_mailbox = ProcessorMailbox :: new ( doc_processor_mailbox. clone ( ) ) ;
1161+ let source_sink = SourceSink :: new ( doc_processor_mailbox. clone ( ) ) ;
11621162 let ( observable_state_tx, _observable_state_rx) = watch:: channel ( serde_json:: Value :: Null ) ;
11631163 let ctx: SourceContext =
11641164 ActorContext :: for_test ( & universe, source_mailbox, observable_state_tx) ;
@@ -1168,7 +1168,7 @@ mod tests {
11681168 BTreeSet :: from_iter ( [ ShardId :: from ( 1 ) , ShardId :: from ( 2 ) ] ) ;
11691169
11701170 source
1171- . assign_shards ( shard_ids, & processor_mailbox , & ctx)
1171+ . assign_shards ( shard_ids, & source_sink , & ctx)
11721172 . await
11731173 . unwrap ( ) ;
11741174
@@ -1325,7 +1325,7 @@ mod tests {
13251325 let ( source_mailbox, _source_inbox) = universe. create_test_mailbox :: < SourceActor > ( ) ;
13261326 let ( doc_processor_mailbox, _doc_processor_inbox) =
13271327 universe. create_test_mailbox :: < DocProcessor > ( ) ;
1328- let processor_mailbox = ProcessorMailbox :: new ( doc_processor_mailbox. clone ( ) ) ;
1328+ let source_sink = SourceSink :: new ( doc_processor_mailbox. clone ( ) ) ;
13291329 let ( observable_state_tx, _observable_state_rx) = watch:: channel ( serde_json:: Value :: Null ) ;
13301330 let ctx: SourceContext =
13311331 ActorContext :: for_test ( & universe, source_mailbox, observable_state_tx) ;
@@ -1339,7 +1339,7 @@ mod tests {
13391339
13401340 // In this scenario, the indexer will only be able to acquire shard 1.
13411341 source
1342- . assign_shards ( shard_ids, & processor_mailbox , & ctx)
1342+ . assign_shards ( shard_ids, & source_sink , & ctx)
13431343 . await
13441344 . unwrap ( ) ;
13451345
@@ -1392,7 +1392,7 @@ mod tests {
13921392 let ( source_mailbox, _source_inbox) = universe. create_test_mailbox :: < SourceActor > ( ) ;
13931393 let ( doc_processor_mailbox, doc_processor_inbox) =
13941394 universe. create_test_mailbox :: < DocProcessor > ( ) ;
1395- let processor_mailbox = ProcessorMailbox :: new ( doc_processor_mailbox. clone ( ) ) ;
1395+ let source_sink = SourceSink :: new ( doc_processor_mailbox. clone ( ) ) ;
13961396 let ( observable_state_tx, _observable_state_rx) = watch:: channel ( serde_json:: Value :: Null ) ;
13971397 let ctx: SourceContext =
13981398 ActorContext :: for_test ( & universe, source_mailbox, observable_state_tx) ;
@@ -1472,7 +1472,7 @@ mod tests {
14721472 ) ;
14731473 fetch_message_tx. send ( Ok ( in_flight_value) ) . await . unwrap ( ) ;
14741474
1475- source. emit_batches ( & processor_mailbox , & ctx) . await . unwrap ( ) ;
1475+ source. emit_batches ( & source_sink , & ctx) . await . unwrap ( ) ;
14761476 let doc_batch = doc_processor_inbox
14771477 . recv_typed_message :: < RawDocBatch > ( )
14781478 . await
@@ -1498,7 +1498,7 @@ mod tests {
14981498 assert_eq ! ( partition_deltas[ 1 ] . 1 . from, Position :: offset( 22u64 ) ) ;
14991499 assert_eq ! ( partition_deltas[ 1 ] . 1 . to, Position :: eof( 23u64 ) ) ;
15001500
1501- source. emit_batches ( & processor_mailbox , & ctx) . await . unwrap ( ) ;
1501+ source. emit_batches ( & source_sink , & ctx) . await . unwrap ( ) ;
15021502 let shard = source. assigned_shards . get ( & ShardId :: from ( 2 ) ) . unwrap ( ) ;
15031503 assert_eq ! ( shard. status, IndexingStatus :: ReachedEof ) ;
15041504
@@ -1512,7 +1512,7 @@ mod tests {
15121512 . await
15131513 . unwrap ( ) ;
15141514
1515- source. emit_batches ( & processor_mailbox , & ctx) . await . unwrap ( ) ;
1515+ source. emit_batches ( & source_sink , & ctx) . await . unwrap ( ) ;
15161516 let shard = source. assigned_shards . get ( & ShardId :: from ( 1 ) ) . unwrap ( ) ;
15171517 assert_eq ! ( shard. status, IndexingStatus :: Error ) ;
15181518
@@ -1533,7 +1533,7 @@ mod tests {
15331533 ) ;
15341534 fetch_message_tx. send ( Ok ( in_flight_value) ) . await . unwrap ( ) ;
15351535
1536- source. emit_batches ( & processor_mailbox , & ctx) . await . unwrap ( ) ;
1536+ source. emit_batches ( & source_sink , & ctx) . await . unwrap ( ) ;
15371537 let shard = source. assigned_shards . get ( & ShardId :: from ( 1 ) ) . unwrap ( ) ;
15381538 assert_eq ! ( shard. status, IndexingStatus :: Active ) ;
15391539 }
@@ -1616,19 +1616,19 @@ mod tests {
16161616 let ( source_mailbox, _source_inbox) = universe. create_test_mailbox :: < SourceActor > ( ) ;
16171617 let ( doc_processor_mailbox, doc_processor_inbox) =
16181618 universe. create_test_mailbox :: < DocProcessor > ( ) ;
1619- let processor_mailbox = ProcessorMailbox :: new ( doc_processor_mailbox. clone ( ) ) ;
1619+ let source_sink = SourceSink :: new ( doc_processor_mailbox. clone ( ) ) ;
16201620 let ( observable_state_tx, _observable_state_rx) = watch:: channel ( serde_json:: Value :: Null ) ;
16211621 let ctx: SourceContext =
16221622 ActorContext :: for_test ( & universe, source_mailbox, observable_state_tx) ;
16231623
16241624 let shard_ids: BTreeSet < ShardId > = BTreeSet :: from_iter ( [ ShardId :: from ( 1 ) ] ) ;
16251625
16261626 source
1627- . assign_shards ( shard_ids, & processor_mailbox , & ctx)
1627+ . assign_shards ( shard_ids, & source_sink , & ctx)
16281628 . await
16291629 . unwrap ( ) ;
16301630
1631- source. emit_batches ( & processor_mailbox , & ctx) . await . unwrap ( ) ;
1631+ source. emit_batches ( & source_sink , & ctx) . await . unwrap ( ) ;
16321632
16331633 let shard = source. assigned_shards . get ( & ShardId :: from ( 1 ) ) . unwrap ( ) ;
16341634 assert_eq ! ( shard. status, IndexingStatus :: NotFound ) ;
@@ -1904,7 +1904,7 @@ mod tests {
19041904 let ( source_mailbox, _source_inbox) = universe. create_test_mailbox :: < SourceActor > ( ) ;
19051905 let ( doc_processor_mailbox, _doc_processor_inbox) =
19061906 universe. create_test_mailbox :: < DocProcessor > ( ) ;
1907- let processor_mailbox = ProcessorMailbox :: new ( doc_processor_mailbox. clone ( ) ) ;
1907+ let source_sink = SourceSink :: new ( doc_processor_mailbox. clone ( ) ) ;
19081908 let ( observable_state_tx, _observable_state_rx) = watch:: channel ( serde_json:: Value :: Null ) ;
19091909 let ctx: SourceContext =
19101910 ActorContext :: for_test ( & universe, source_mailbox, observable_state_tx) ;
@@ -1919,7 +1919,7 @@ mod tests {
19191919 } ) ;
19201920
19211921 source
1922- . assign_shards ( shard_ids, & processor_mailbox , & ctx)
1922+ . assign_shards ( shard_ids, & source_sink , & ctx)
19231923 . await
19241924 . unwrap ( ) ;
19251925
0 commit comments