@@ -6,11 +6,11 @@ use chrono::NaiveDate;
66use common_domain:: ids:: { AliasOr , CustomerId , PlanId , PlanVersionId , TenantId } ;
77use csv:: ReaderBuilder ;
88use error_stack:: bail;
9- use futures:: StreamExt ;
9+ use meteroid_store:: Services ;
10+ use meteroid_store:: StoreResult ;
1011use meteroid_store:: domain:: enums:: SubscriptionActivationCondition ;
1112use meteroid_store:: errors:: StoreError ;
1213use meteroid_store:: repositories:: { CustomersInterface , PlansInterface } ;
13- use meteroid_store:: { Services , StoreResult } ;
1414use serde:: Deserialize ;
1515use uuid:: Uuid ;
1616
@@ -22,8 +22,9 @@ use super::csv_ingest::{
2222 CsvString , optional_csv_string, optional_naive_date, optional_u16, optional_u32,
2323} ;
2424
25- const MAX_CSV_SIZE : usize = 10 * 1024 * 1024 ; // 10MB limit
26- const CONCURRENCY : usize = 10 ;
25+ const MAX_CSV_SIZE : usize = 10 * 1024 * 1024 ; // 10 MiB (10 * 1024 * 1024 bytes)
26+ const MAX_ROW_COUNT : usize = 5_000 ; // an import whose total row count exceeds this is rejected with StoreError::InvalidArgument
27+ const BATCH_SIZE : usize = 50 ; // rows are grouped into batches of at most this many subscriptions per batch insert call
2728
2829pub struct SubscriptionIngestionOptions {
2930 pub delimiter : char ,
@@ -84,6 +85,12 @@ impl SubscriptionIngestService {
8485
8586 let total_rows = ( raw_rows. len ( ) + failures. len ( ) ) as i32 ;
8687
88+ if total_rows as usize > MAX_ROW_COUNT {
89+ bail ! ( StoreError :: InvalidArgument ( format!(
90+ "Row count ({total_rows}) exceeds maximum allowed ({MAX_ROW_COUNT})"
91+ ) ) ) ;
92+ }
93+
8794 let proceed_on_failures =
8895 |f : & [ SubscriptionIngestionFailure ] | !options. fail_on_error || f. is_empty ( ) ;
8996
@@ -103,71 +110,116 @@ impl SubscriptionIngestService {
103110 validated_rows = rows;
104111 }
105112
106- let parsed: Vec < ( i32 , Option < String > , CreateSubscription ) > =
107- if proceed_on_failures ( & failures) {
108- validated_rows
109- . into_iter ( )
110- . map ( |row| {
111- let idempotency_key = row
112- . csv
113- . idempotency_key
114- . as_ref ( )
115- . map ( |k| format ! ( "{}:{}" , tenant_id, k) ) ;
116- (
117- row. row_number ,
118- idempotency_key,
119- Self :: map_to_domain (
120- actor,
121- row. csv ,
122- row. customer_id ,
123- row. plan_version_id ,
124- ) ,
125- )
126- } )
127- . collect ( )
128- } else {
129- vec ! [ ]
130- } ;
113+ // Map to domain and filter idempotent duplicates before DB work
114+ #[ allow( clippy:: type_complexity) ]
115+ let mut to_insert: Vec < ( Vec < i32 > , Vec < Option < String > > , Vec < CreateSubscription > ) > =
116+ Vec :: new ( ) ;
117+ let mut skipped_rows = 0i32 ;
131118
132- tracing:: info!( "Processing {} subscription records" , parsed. len( ) ) ;
119+ if proceed_on_failures ( & failures) {
120+ let mut current_batch_rows: Vec < i32 > = Vec :: new ( ) ;
121+ let mut current_batch_keys: Vec < Option < String > > = Vec :: new ( ) ;
122+ let mut current_batch_subs: Vec < CreateSubscription > = Vec :: new ( ) ;
123+
124+ for row in validated_rows {
125+ let idempotency_key = row
126+ . csv
127+ . idempotency_key
128+ . as_ref ( )
129+ . map ( |k| format ! ( "{}:{}" , tenant_id, k) ) ;
130+
131+ // Check idempotency before adding to batch
132+ if let Some ( ref key) = idempotency_key
133+ && self
134+ . idempotency
135+ . check_and_set ( key. clone ( ) , Duration :: from_secs ( 24 * 3600 ) )
136+ . await
137+ {
138+ skipped_rows += 1 ;
139+ continue ;
140+ }
133141
134- let mut successful_rows = 0 ;
142+ current_batch_rows. push ( row. row_number ) ;
143+ current_batch_keys. push ( idempotency_key) ;
144+ current_batch_subs. push ( Self :: map_to_domain (
145+ actor,
146+ row. csv ,
147+ row. customer_id ,
148+ row. plan_version_id ,
149+ ) ) ;
150+
151+ if current_batch_subs. len ( ) >= BATCH_SIZE {
152+ to_insert. push ( (
153+ std:: mem:: take ( & mut current_batch_rows) ,
154+ std:: mem:: take ( & mut current_batch_keys) ,
155+ std:: mem:: take ( & mut current_batch_subs) ,
156+ ) ) ;
157+ }
158+ }
135159
136- let mut stream = futures:: stream:: iter ( parsed)
137- . map ( |( row_number, idempotency_key, sub) | {
138- let services = self . services . clone ( ) ;
139- let idempotency = self . idempotency . clone ( ) ;
140- async move {
141- if let Some ( key) = & idempotency_key
142- && idempotency
143- . check_and_set ( key. clone ( ) , Duration :: from_secs ( 24 * 3600 ) )
144- . await
145- {
146- return ( row_number, idempotency_key, Ok ( ( ) ) ) ;
147- }
148- let result = services
149- . insert_subscription ( sub, tenant_id)
150- . await
151- . map ( |_| ( ) ) ;
152- ( row_number, idempotency_key, result)
160+ if !current_batch_subs. is_empty ( ) {
161+ to_insert. push ( ( current_batch_rows, current_batch_keys, current_batch_subs) ) ;
162+ }
163+ }
164+
165+ let insert_count: usize = to_insert. iter ( ) . map ( |( _, _, s) | s. len ( ) ) . sum ( ) ;
166+ tracing:: info!(
167+ "Processing {} subscription records in {} batches ({} skipped as idempotent)" ,
168+ insert_count,
169+ to_insert. len( ) ,
170+ skipped_rows,
171+ ) ;
172+
173+ // Includes idempotent-skipped rows (already processed in a prior import)
174+ let mut successful_rows = skipped_rows;
175+
176+ for ( batch_rows, batch_keys, batch_subs) in to_insert {
177+ let mut conn = self . services . store ( ) . get_conn ( ) . await ?;
178+
179+ match self
180+ . services
181+ . insert_subscription_batch_tx ( & mut conn, batch_subs. clone ( ) , tenant_id)
182+ . await
183+ {
184+ Ok ( results) => {
185+ successful_rows += results. len ( ) as i32 ;
153186 }
154- } )
155- . buffer_unordered ( CONCURRENCY ) ;
156-
157- while let Some ( ( row_number, idempotency_key, result) ) = stream. next ( ) . await {
158- match result {
159- Ok ( ( ) ) => successful_rows += 1 ,
160- Err ( e) => {
161- if let Some ( key) = idempotency_key {
187+ Err ( e) if options. fail_on_error => {
188+ for key in batch_keys. into_iter ( ) . flatten ( ) {
162189 self . idempotency . invalidate ( key) . await ;
163190 }
164- failures. push ( SubscriptionIngestionFailure {
165- row_number,
166- reason : e. to_string ( ) ,
167- } ) ;
168-
169- if options. fail_on_error {
170- break ;
191+ let reason = e. to_string ( ) ;
192+ for row_number in batch_rows {
193+ failures. push ( SubscriptionIngestionFailure {
194+ row_number,
195+ reason : reason. clone ( ) ,
196+ } ) ;
197+ }
198+ break ;
199+ }
200+ Err ( _) => {
201+ // Batch failed in continue mode: retry individually for per-row errors
202+ drop ( conn) ;
203+ let mut retry_conn = self . services . store ( ) . get_conn ( ) . await ?;
204+ for ( i, sub) in batch_subs. into_iter ( ) . enumerate ( ) {
205+ match self
206+ . services
207+ . insert_subscription_batch_tx ( & mut retry_conn, vec ! [ sub] , tenant_id)
208+ . await
209+ {
210+ Ok ( _) => {
211+ successful_rows += 1 ;
212+ }
213+ Err ( e) => {
214+ if let Some ( key) = batch_keys[ i] . as_ref ( ) {
215+ self . idempotency . invalidate ( key. clone ( ) ) . await ;
216+ }
217+ failures. push ( SubscriptionIngestionFailure {
218+ row_number : batch_rows[ i] ,
219+ reason : e. to_string ( ) ,
220+ } ) ;
221+ }
222+ }
171223 }
172224 }
173225 }
0 commit comments