Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 107 additions & 8 deletions kernel/src/engine/arrow_expression/evaluate_expression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -902,10 +902,22 @@ fn evaluate_map_to_struct(
.map(|f| ArrowField::try_from_kernel(*f))
.try_collect()?;

// Propagate the input map's null bitmap to the output struct. This is critical:
// when a map row is null, the loop above appends null to every child builder
// (since no keys match). Without this null bitmap, the output struct row appears
// valid (non-null) to Arrow, but its children contain nulls. If any child field
// is non-nullable, Arrow rejects this as "Found unmasked nulls for non-nullable
// StructArray field". With the bitmap, the struct row is marked null, which masks
// the child nulls and satisfies Arrow's validation.
//
// This matters during checkpoint creation: the COALESCE expression evaluates
// MAP_TO_STRUCT for all rows including non-add actions (remove, metadata, protocol)
// where the partition values map is null. Partition columns declared NOT NULL would
// cause the checkpoint to fail without this propagation.
Ok(StructArray::try_new(
arrow_fields.into(),
output_columns,
None,
map_array.nulls().cloned(),
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a comment.

  • Before: This code was None and it was so obviously correct that it wasn't worth having a comment about why it is None.
  • Now: The code is map_array.nulls().cloned() and, since I don't see any comment, apparently it is obvious why this should be map_array.nulls().cloned().

Yet -- this was a bug. Clearly it is not obvious.

Please add a detailed comment explaining why, what happens when it is present, what happens if it was NOT present, etc.

We need to remind ourselves in the future (and perhaps in other similar areas of the code) why this value MUST be set.

@dengsh12 and @DrakeLin -- Remember to raise the bar on PR reviews and aim for excellent code clariity

Copy link
Copy Markdown
Collaborator

@dengsh12 dengsh12 Apr 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point! But in this special case ... I feel like reserving map_array.nulls().cloned() is the straightforward way(extract things from the MapArray and retain the nulls), perhaps we should add comments if we set it to None(uncommon since dropping the nullabilities)? Let's add a comment stating the corner case, and in the future claude would know this corner case exists

Copy link
Copy Markdown
Contributor Author

@momcilomrk-db momcilomrk-db Apr 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree I think it makes sense to add detailed comments now more than before since Claude can pick it up and not create the same bug in the future.

For example I asked the claude to look at the previous PRs (to find when the bug was introduced), and it also found this related issue #1645 which seems to be the same None pattern. If there was a comment maybe it would pick it up in this case as well.

)?)
}

Expand All @@ -924,7 +936,8 @@ mod tests {

use super::*;
use crate::arrow::array::{
ArrayRef, BooleanArray, Int32Array, Int64Array, StringArray, StructArray,
ArrayRef, BooleanArray, Int32Array, Int64Array, MapBuilder, StringArray, StringBuilder,
StructArray,
};
use crate::arrow::datatypes::{
DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema,
Expand Down Expand Up @@ -1971,8 +1984,6 @@ mod tests {

/// Helper: creates a RecordBatch with a `pv` column of type Map<String, String>.
fn create_partition_map_batch() -> RecordBatch {
use crate::arrow::array::{MapBuilder, StringBuilder};

let mut builder = MapBuilder::new(None, StringBuilder::new(), StringBuilder::new());

// Row 0: {"date": "2024-01-15", "region": "us", "id": "42"}
Expand Down Expand Up @@ -2073,8 +2084,6 @@ mod tests {

#[test]
fn test_map_to_struct_parse_error() {
use crate::arrow::array::{MapBuilder, StringBuilder};

let mut builder = MapBuilder::new(None, StringBuilder::new(), StringBuilder::new());
builder.keys().append_value("count");
builder.values().append_value("not_a_number");
Expand All @@ -2098,8 +2107,6 @@ mod tests {

#[test]
fn test_map_to_struct_duplicate_keys() {
use crate::arrow::array::{MapBuilder, StringBuilder};

let mut builder = MapBuilder::new(None, StringBuilder::new(), StringBuilder::new());
builder.keys().append_value("x");
builder.values().append_value("first");
Expand Down Expand Up @@ -2130,6 +2137,98 @@ mod tests {
assert_eq!(col.value(0), "last");
}

#[rstest]
#[case::mixed_nulls(
vec![
Some(vec![("region", "us"), ("id", "42")]),
None,
Some(vec![("region", "eu"), ("id", "7")]),
],
vec![true, false, true],
)]
#[case::all_nulls(vec![None, None], vec![false, false])]
fn test_map_to_struct_null_propagation_with_non_nullable_fields(
#[case] rows: Vec<Option<Vec<(&str, &str)>>>,
#[case] expected_validity: Vec<bool>,
) {
let mut builder = MapBuilder::new(None, StringBuilder::new(), StringBuilder::new());
for row in &rows {
match row {
Some(entries) => {
for (k, v) in entries {
builder.keys().append_value(k);
builder.values().append_value(v);
}
builder.append(true).unwrap();
}
None => {
builder.append(false).unwrap();
}
}
}
let map_array = builder.finish();
let schema = ArrowSchema::new(vec![ArrowField::new(
"pv",
map_array.data_type().clone(),
true,
)]);
let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(map_array)]).unwrap();

let output_schema = StructType::new_unchecked(vec![
StructField::new("region", DataType::STRING, false),
StructField::new("id", DataType::INTEGER, false),
]);
let result_type = DataType::Struct(Box::new(output_schema));
let expr = Expr::map_to_struct(column_expr!("pv"));
let result = evaluate_expression(&expr, &batch, Some(&result_type)).unwrap();
let structs = result.as_any().downcast_ref::<StructArray>().unwrap();

assert_eq!(structs.len(), expected_validity.len());
for (i, &valid) in expected_validity.iter().enumerate() {
assert_eq!(structs.is_valid(i), valid, "row {i} validity mismatch");
}
}

#[test]
fn test_coalesce_map_to_struct_with_null_map_non_nullable_fields() {
let mut builder = MapBuilder::new(None, StringBuilder::new(), StringBuilder::new());
builder.keys().append_value("date");
builder.values().append_value("2024-01-15");
builder.append(true).unwrap();
builder.append(false).unwrap();

let map_array = builder.finish();
let map_type = map_array.data_type().clone();
let schema = Arc::new(ArrowSchema::new(vec![
ArrowField::new(
"pv_parsed",
ArrowDataType::Struct(
vec![ArrowField::new("date", ArrowDataType::Date32, false)].into(),
),
true,
),
ArrowField::new("pv", map_type, true),
]));

let pv_parsed = new_null_array(schema.field(0).data_type(), 2);
let batch = RecordBatch::try_new(schema, vec![pv_parsed, Arc::new(map_array)]).unwrap();

let output_schema =
StructType::new_unchecked(vec![StructField::new("date", DataType::DATE, false)]);
let result_type = DataType::Struct(Box::new(output_schema));
let expr = Expr::coalesce([
Expr::column(["pv_parsed"]),
Expr::map_to_struct(column_expr!("pv")),
]);
let result = evaluate_expression(&expr, &batch, Some(&result_type)).unwrap();
let structs = result.as_any().downcast_ref::<StructArray>().unwrap();

// Row 0: pv_parsed null, MAP_TO_STRUCT succeeds
assert!(structs.is_valid(0));
// Row 1: pv_parsed null, map null → null struct
assert!(structs.is_null(1));
}

#[test]
fn test_map_to_struct_non_map_input() {
let schema = ArrowSchema::new(vec![ArrowField::new("s", ArrowDataType::Utf8, true)]);
Expand Down
Loading