Skip to content

Commit 0fe95dd

Browse files
committed
conf
1 parent 6639d4e commit 0fe95dd

File tree

2 files changed

+93
-2
lines changed

2 files changed

+93
-2
lines changed

kernel/src/table_configuration.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -295,7 +295,9 @@ impl TableConfiguration {
295295
/// Returns the physical partition schema for `partitionValues_parsed`.
296296
///
297297
/// Field names are physical column names (respecting column mapping mode),
298-
/// and field types are the actual partition column data types with their original nullability.
298+
/// and field types are the actual partition column data types. All fields are forced nullable
299+
/// because partition values are derived from map lookups (`MAP_TO_STRUCT` over the
300+
/// string-valued `partitionValues` map), which can return null for any missing key.
299301
/// Returns `None` if the table has no partition columns.
300302
pub(crate) fn build_partition_values_parsed_schema(&self) -> Option<SchemaRef> {
301303
let partition_columns = self.metadata().partition_columns();
@@ -317,7 +319,7 @@ impl TableConfiguration {
317319
StructField::new(
318320
field.physical_name(column_mapping_mode).to_owned(),
319321
field.data_type().clone(),
320-
field.is_nullable(),
322+
true, // Always nullable: values come from map lookups
321323
)
322324
})
323325
.collect();

kernel/tests/checkpoint_transform.rs

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -532,6 +532,9 @@ async fn test_checkpoint_partition_values_parsed_with_column_mapping(
532532
Ok(())
533533
}
534534

535+
/// Table schema JSON with a non-nullable partition column (`category`) and two nullable data
/// columns (`id`: long, `name`: string). Used to exercise checkpoint writing when the declared
/// partition field is `nullable: false`.
const NON_NULLABLE_PARTITION_SCHEMA: &str = r#"{"type":"struct","fields":[{"name":"id","type":"long","nullable":true,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"category","type":"string","nullable":false,"metadata":{}}]}"#;
537+
535538
const WIDE_SCHEMA: &str = r#"{"type":"struct","fields":[{"name":"id","type":"long","nullable":true,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"age","type":"long","nullable":true,"metadata":{}}]}"#;
536539

537540
/// Regression test for https://github.com/delta-io/delta-kernel-rs/issues/2165
@@ -624,3 +627,89 @@ async fn test_scan_schema_evolved_table_with_checkpoint_predicate_on_new_column(
624627

625628
Ok(())
626629
}
630+
631+
/// Checkpoint creation succeeds for tables with non-nullable partition columns and
632+
/// writeStatsAsStruct=true. The `partitionValues_parsed` schema must force all partition fields
633+
/// nullable (values come from map lookups which can return null), regardless of what the table
634+
/// schema declares.
635+
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
636+
async fn test_checkpoint_non_nullable_partition_column() -> Result<(), Box<dyn std::error::Error>> {
637+
let (store, table_root) = new_in_memory_store();
638+
let executor = Arc::new(TokioMultiThreadExecutor::new(
639+
tokio::runtime::Handle::current(),
640+
));
641+
let engine = Arc::new(
642+
DefaultEngineBuilder::new(store.clone())
643+
.with_task_executor(executor)
644+
.build(),
645+
);
646+
647+
// Version 0: protocol + metadata with non-nullable partition column, stats as struct
648+
write_commit(
649+
&store,
650+
&build_commit(
651+
NON_NULLABLE_PARTITION_SCHEMA,
652+
&["category"],
653+
true,
654+
true,
655+
true,
656+
),
657+
0,
658+
)
659+
.await?;
660+
661+
// Version 1: write data for partition category=books
662+
let snapshot = Snapshot::builder_for(table_root.clone()).build(engine.as_ref())?;
663+
let batch = RecordBatch::try_new(
664+
Arc::new(ArrowSchema::new(vec![
665+
Field::new("id", ArrowDataType::Int64, true),
666+
Field::new("name", ArrowDataType::Utf8, true),
667+
])),
668+
vec![
669+
Arc::new(Int64Array::from(vec![1, 2])) as ArrayRef,
670+
Arc::new(StringArray::from(vec!["Alice", "Bob"])),
671+
],
672+
)?;
673+
write_batch_to_table(
674+
&snapshot,
675+
engine.as_ref(),
676+
batch,
677+
HashMap::from([("category".to_string(), Scalar::String("books".into()))]),
678+
)
679+
.await?;
680+
681+
// Checkpoint with writeStatsAsStruct=true -- this would fail with
682+
// Arrow(InvalidArgumentError("Found unmasked nulls for non-nullable StructArray field"))
683+
// if partitionValues_parsed copied the non-nullable constraint from the table schema.
684+
let snapshot = Snapshot::builder_for(table_root.clone()).build(engine.as_ref())?;
685+
snapshot.checkpoint(engine.as_ref())?;
686+
687+
// Verify data round-trips correctly through a scan after the checkpoint
688+
let snapshot = Snapshot::builder_for(table_root).build(engine.as_ref())?;
689+
let scan = snapshot.scan_builder().build()?;
690+
let batches = read_scan(&scan, engine.clone())?;
691+
let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
692+
assert_eq!(total_rows, 2);
693+
694+
let schema = batches[0].schema();
695+
let merged = concat_batches(&schema, &batches)?;
696+
let names: Vec<&str> = merged
697+
.column_by_name("name")
698+
.unwrap()
699+
.as_string::<i32>()
700+
.iter()
701+
.map(|v| v.unwrap())
702+
.collect();
703+
assert_eq!(names, vec!["Alice", "Bob"]);
704+
705+
let categories: Vec<&str> = merged
706+
.column_by_name("category")
707+
.unwrap()
708+
.as_string::<i32>()
709+
.iter()
710+
.map(|v| v.unwrap())
711+
.collect();
712+
assert_eq!(categories, vec!["books", "books"]);
713+
714+
Ok(())
715+
}

0 commit comments

Comments (0)