forked from delta-io/delta-kernel-rs
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtable_configuration.rs
More file actions
2184 lines (2007 loc) · 85.6 KB
/
table_configuration.rs
File metadata and controls
2184 lines (2007 loc) · 85.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
//! This module defines [`TableConfiguration`], a high level api to check feature support and
//! feature enablement for a table at a given version. This encapsulates [`Protocol`], [`Metadata`],
//! [`Schema`], [`TableProperties`], and [`ColumnMappingMode`]. These structs in isolation should
//! be considered raw and unvalidated if they are not a part of [`TableConfiguration`]. We unify
//! these fields because they are deeply intertwined when dealing with table features. For example:
//! To check that deletion vector writes are enabled, you must check both the protocol's
//! reader/writer features, and ensure that the deletion vector table property is enabled in the
//! [`TableProperties`].
//!
//! [`Schema`]: crate::schema::Schema
use std::borrow::Cow;
use std::collections::HashSet;
use std::sync::{Arc, OnceLock};
use delta_kernel_derive::internal_api;
use tracing::warn;
use url::Url;
use crate::actions::{Metadata, Protocol};
use crate::expressions::ColumnName;
use crate::scan::data_skipping::stats_schema::{
expected_stats_schema, stats_column_names, StatsConfig, StripFieldMetadataTransform,
};
pub(crate) use crate::schema::variant_utils::validate_variant_type_feature_support;
use crate::schema::{schema_has_invariants, SchemaRef, StructField, StructType};
use crate::table_features::{
column_mapping_mode, get_any_level_column_physical_name,
validate_timestamp_ntz_feature_support, ColumnMappingMode, EnablementCheck, FeatureRequirement,
FeatureType, KernelSupport, Operation, TableFeature, LEGACY_READER_FEATURES,
LEGACY_WRITER_FEATURES, MAX_VALID_READER_VERSION, MAX_VALID_WRITER_VERSION,
MIN_VALID_RW_VERSION, TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION,
};
use crate::table_properties::TableProperties;
use crate::transforms::SchemaTransform as _;
use crate::utils::require;
use crate::{DeltaResult, Error, Version};
/// Expected schema for file statistics, using physical column names.
///
/// Wrapped in a struct so it can be extended with a logical-name variant if needed.
#[allow(unused)]
#[derive(Debug, Clone)]
#[internal_api]
pub(crate) struct ExpectedStatsSchemas {
    /// Stats schema using physical column names (for storage).
    ///
    /// Field names respect the table's column mapping mode; constructed by
    /// [`TableConfiguration::build_expected_stats_schemas`] with field metadata stripped.
    pub physical: SchemaRef,
}
/// Information about in-commit timestamp enablement state.
///
/// Produced by `TableConfiguration::in_commit_timestamp_enablement`, which validates that
/// the two enablement table properties (version and timestamp) are either both present or
/// both absent before constructing a value of this type.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) enum InCommitTimestampEnablement {
    /// In-commit timestamps is not enabled
    NotEnabled,
    /// In-commit timestamps is enabled
    Enabled {
        /// Enablement information, if available. `None` indicates the table was created
        /// with ICT enabled from the beginning (no enablement properties needed).
        /// When present, the tuple is `(enablement_version, enablement_timestamp)`.
        enablement: Option<(Version, i64)>,
    },
}
/// Utility function to strip field metadata from stats schemas. This metadata describes logical
/// table columns, not the stats. Keeping it can cause schema mismatches when combining the parsed
/// stats from a checkpoint written before logical metadata was added.
fn strip_metadata(schema: SchemaRef) -> SchemaRef {
    // Only allocate a new Arc when the transform actually produced a modified copy;
    // otherwise hand the original schema straight back.
    if let Some(Cow::Owned(stripped)) = StripFieldMetadataTransform.transform_struct(&schema) {
        Arc::new(stripped)
    } else {
        schema
    }
}
/// Physical schema variants for a table.
///
/// - `full`: physical representations of all columns from [`TableConfiguration::logical_schema`].
/// - `without_partition`: lazily computed variant that excludes partition columns.
#[derive(Debug, Clone, Eq)]
struct PhysicalSchemas {
    // Physical schema covering every column, partition columns included.
    full: SchemaRef,
    // Lazily-initialized cache, populated on first call to
    // `TableConfiguration::physical_data_schema_without_partition_columns`.
    without_partition: OnceLock<SchemaRef>,
}
impl PhysicalSchemas {
    /// Wraps a full physical schema; the partition-free variant is computed on demand.
    fn new(full: SchemaRef) -> Self {
        let without_partition = OnceLock::new();
        Self {
            full,
            without_partition,
        }
    }
}
impl PartialEq for PhysicalSchemas {
    /// Equality considers only `full`: `without_partition` is deterministically derived from
    /// `full` plus the partition columns (which TableConfiguration's PartialEq compares via
    /// `metadata`), so comparing it would be redundant. Two values with the same `full` are
    /// equal regardless of whether the lazy variant has been initialized yet.
    fn eq(&self, other: &Self) -> bool {
        self.full.eq(&other.full)
    }
}
/// Holds all the configuration for a table at a specific version. This includes the supported
/// reader and writer features, table properties, schema, version, and table root. This can be used
/// to check whether a table supports a feature or has it enabled. For example, deletion vector
/// support can be checked with [`TableConfiguration::is_feature_supported`] and deletion
/// vector write enablement can be checked with [`TableConfiguration::is_feature_enabled`].
///
/// [`TableConfiguration`] performs checks upon construction with `TableConfiguration::try_new`
/// to validate that Metadata and Protocol are correctly formatted and mutually compatible.
/// After construction, call `ensure_operation_supported` to verify that the kernel supports the
/// required operations for the table's protocol features.
#[internal_api]
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct TableConfiguration {
    /// Raw `Metadata` action this configuration was built from (validated in `try_new`).
    metadata: Metadata,
    /// Raw `Protocol` action this configuration was built from (validated in `try_new`).
    protocol: Protocol,
    /// Logical schema: field names are the user-facing (logical) column names.
    logical_schema: SchemaRef,
    /// Physical schema(s) derived from `logical_schema` via `column_mapping_mode`.
    physical_schemas: PhysicalSchemas,
    /// Table properties parsed from `metadata`.
    table_properties: TableProperties,
    /// Column mapping mode derived from `protocol` and `table_properties`.
    column_mapping_mode: ColumnMappingMode,
    /// Root URL of the table this configuration describes.
    table_root: Url,
    /// Table version this configuration corresponds to.
    version: Version,
}
impl TableConfiguration {
/// Constructs a [`TableConfiguration`] for a table located in `table_root` at `version`.
/// This validates that the [`Metadata`] and [`Protocol`] are compatible with one another
/// and that the kernel supports reading from this table.
///
/// Note: This only returns successfully if kernel supports reading the table. It's important
/// to do this validation in `try_new` because all table accesses must first construct
/// the [`TableConfiguration`]. This ensures that developers never forget to check that kernel
/// supports reading the table, and that all table accesses are legal.
///
/// Note: In the future, we will perform stricter checks on the set of reader and writer
/// features. In particular, we will check that:
/// - Non-legacy features must appear in both reader features and writer features lists. If
///   such a feature is present, the reader version and writer version must be 3, and 5
///   respectively.
/// - Legacy reader features occur when the reader version is 3, but the writer version is
///   either 5 or 6. In this case, the writer feature list must be empty.
/// - Column mapping is the only legacy feature present in kernel. No future delta versions
///   will introduce new legacy features.
/// See: <https://github.com/delta-io/delta-kernel-rs/issues/650>
#[internal_api]
pub(crate) fn try_new(
    metadata: Metadata,
    protocol: Protocol,
    table_root: Url,
    version: Version,
) -> DeltaResult<Self> {
    // Parse the logical schema and table properties out of the raw Metadata action; schema
    // parsing can fail if the metadata is malformed.
    let logical_schema = Arc::new(metadata.parse_schema()?);
    let table_properties = metadata.parse_table_properties();
    // Column mapping mode depends on both the protocol (feature support) and the table
    // properties (mode setting), so it is computed only after both are available.
    let column_mapping_mode = column_mapping_mode(&protocol, &table_properties);
    // Derive the physical (storage-name) schema from the logical schema under that mode.
    let physical_schema = Arc::new(logical_schema.make_physical(column_mapping_mode)?);
    let physical_schemas = PhysicalSchemas::new(physical_schema);
    let table_config = Self {
        logical_schema,
        physical_schemas,
        metadata,
        protocol,
        table_properties,
        column_mapping_mode,
        table_root,
        version,
    };
    // Validate schema against protocol features now that we have a TC instance.
    validate_timestamp_ntz_feature_support(&table_config)?;
    validate_variant_type_feature_support(&table_config)?;
    Ok(table_config)
}
/// Builds a configuration derived from `table_configuration`, substituting the metadata
/// and/or protocol when new actions are provided and stamping the result with `new_version`.
pub(crate) fn try_new_from(
    table_configuration: &Self,
    new_metadata: Option<Metadata>,
    new_protocol: Option<Protocol>,
    new_version: Version,
) -> DeltaResult<Self> {
    match (new_metadata, new_protocol) {
        // Fast path: nothing changed besides the version, so clone in place.
        (None, None) => Ok(Self {
            version: new_version,
            ..table_configuration.clone()
        }),
        // Something changed. Rather than picking apart and validating the P/M delta
        // piecemeal, re-run the full `try_new` validation on the merged state so none of
        // those checks are duplicated here.
        (metadata, protocol) => Self::try_new(
            metadata.unwrap_or_else(|| table_configuration.metadata.clone()),
            protocol.unwrap_or_else(|| table_configuration.protocol.clone()),
            table_configuration.table_root.clone(),
            new_version,
        ),
    }
}
/// Creates a new [`TableConfiguration`] representing the table configuration immediately
/// after a commit.
///
/// This method takes the current table configuration and produces a post-commit
/// configuration at the committed version. If the commit included new Protocol or Metadata
/// actions (e.g. ALTER TABLE), those are passed in and the configuration is rebuilt with
/// full validation. Otherwise the existing configuration is cloned with only the version
/// updated.
///
/// Returns the new [`TableConfiguration`] at `new_version`.
///
/// # Errors
///
/// Returns an error if the new metadata/protocol combination fails
/// [`TableConfiguration::try_new`] validation (e.g., unsupported features, invalid schema).
pub(crate) fn new_post_commit(
    table_configuration: &Self,
    new_version: Version,
    new_metadata: Option<Metadata>,
    new_protocol: Option<Protocol>,
) -> DeltaResult<Self> {
    // Thin wrapper: `try_new_from` already implements exactly these semantics (note the
    // different parameter order between the two signatures).
    Self::try_new_from(table_configuration, new_metadata, new_protocol, new_version)
}
/// Generates the expected schema for file statistics.
///
/// Engines can provide statistics for files written to the delta table, enabling
/// data skipping and other optimizations. Returns the physical stats schema wrapped in
/// an `ExpectedStatsSchemas`.
///
/// The schema is structured as:
/// ```text
/// {
///     numRecords: long,
///     nullCount: { <columns with LONG type> },
///     minValues: { <columns with original types> },
///     maxValues: { <columns with original types> },
/// }
/// ```
///
/// The schemas are affected by:
/// - **Column mapping mode**: Physical schema field names use physical names from column
///   mapping metadata.
/// - **`delta.dataSkippingStatsColumns`**: If set, only specified columns are included.
/// - **`delta.dataSkippingNumIndexedCols`**: Otherwise, includes the first N leaf columns
///   (default 32).
/// - **Required columns** (e.g. clustering columns): Per the Delta protocol, always included in
///   statistics, regardless of the above settings.
/// - **Requested columns**: Optional output filter that limits which columns appear in the
///   schema without affecting column counting.
///
/// See the Delta protocol for more details on per-file statistics:
/// <https://github.com/delta-io/delta/blob/master/PROTOCOL.md#per-file-statistics>
#[allow(unused)]
#[internal_api]
pub(crate) fn build_expected_stats_schemas(
    &self,
    required_physical_columns: Option<&[ColumnName]>,
    requested_physical_columns: Option<&[ColumnName]>,
) -> DeltaResult<ExpectedStatsSchemas> {
    let stats_columns = self.required_physical_stats_columns();
    let config = StatsConfig {
        data_skipping_stats_columns: stats_columns.as_deref(),
        data_skipping_num_indexed_cols: self.table_properties().data_skipping_num_indexed_cols,
    };
    let raw_schema = expected_stats_schema(
        &self.physical_data_schema_without_partition_columns(),
        &config,
        required_physical_columns,
        requested_physical_columns,
    )?;
    // Field metadata describes logical table columns, not stats; strip it to avoid schema
    // mismatches against stats parsed from older checkpoints.
    Ok(ExpectedStatsSchemas {
        physical: strip_metadata(Arc::new(raw_schema)),
    })
}
/// Returns the list of physical column names that should have statistics collected.
///
/// Partition columns are excluded up front (their values already live in the Add action's
/// `partitionValues` field). Among the remaining columns, entries in `required_columns`
/// (e.g. clustering columns) are always included, overriding both the
/// `dataSkippingNumIndexedCols` and `dataSkippingStatsColumns` settings.
pub(crate) fn physical_stats_column_names(
    &self,
    required_columns: Option<&[ColumnName]>,
) -> Vec<ColumnName> {
    let stats_columns = self.required_physical_stats_columns();
    let config = StatsConfig {
        data_skipping_stats_columns: stats_columns.as_deref(),
        data_skipping_num_indexed_cols: self.table_properties().data_skipping_num_indexed_cols,
    };
    let data_schema = self.physical_data_schema_without_partition_columns();
    stats_column_names(&data_schema, &config, required_columns)
}
/// Returns the physical partition schema for `partitionValues_parsed`.
///
/// Field names are physical column names (respecting column mapping mode), and field types
/// are the actual partition column data types with their original nullability. Returns
/// `None` if the table has no partition columns. Partition columns missing from the table
/// schema are skipped with a warning.
pub(crate) fn build_partition_values_parsed_schema(&self) -> Option<SchemaRef> {
    let partition_columns = self.metadata().partition_columns();
    if partition_columns.is_empty() {
        return None;
    }
    let logical_schema = self.logical_schema();
    let column_mapping_mode = self.column_mapping_mode();
    let mut partition_fields = Vec::with_capacity(partition_columns.len());
    for col_name in partition_columns {
        match logical_schema.field(col_name) {
            Some(field) => partition_fields.push(StructField::new(
                field.physical_name(column_mapping_mode).to_owned(),
                field.data_type().clone(),
                field.is_nullable(),
            )),
            None => warn!("Partition column '{col_name}' not found in table schema"),
        }
    }
    Some(Arc::new(StructType::new_unchecked(partition_fields)))
}
/// Returns the logical schema for data columns (excludes partition columns).
///
/// Statistics are only collected for data columns that are physically stored in the parquet
/// files. Partition values live in the file path rather than the file content, so they carry
/// no file-level statistics.
fn logical_data_schema(&self) -> SchemaRef {
    let partition_columns = self.partition_columns();
    let logical_schema = self.logical_schema();
    let data_fields = logical_schema
        .fields()
        .filter(|field| !partition_columns.contains(field.name()))
        .cloned();
    Arc::new(StructType::new_unchecked(data_fields))
}
/// Returns the physical data schema excluding partition columns.
///
/// Computed once and cached in `physical_schemas.without_partition`; later calls return a
/// clone of the cached `SchemaRef`.
pub(crate) fn physical_data_schema_without_partition_columns(&self) -> SchemaRef {
    self.physical_schemas
        .without_partition
        .get_or_init(|| {
            let partition_columns: HashSet<&str> = self
                .partition_columns()
                .iter()
                .map(|s| s.as_str())
                .collect();
            // Safety: subset of an already-valid schema.
            // Zip the logical and physical schemas field-by-field: partition membership is
            // keyed on logical names, but the fields retained are the physical ones. This
            // relies on the two schemas having identical field order (guaranteed because
            // `make_physical` in `try_new` derives one from the other).
            Arc::new(StructType::new_unchecked(
                self.logical_schema()
                    .fields()
                    .zip(self.physical_schemas.full.fields())
                    .filter(|(logical_field, _)| {
                        !partition_columns.contains(logical_field.name().as_str())
                    })
                    .map(|(_, physical_field)| physical_field.clone()),
            ))
        })
        .clone()
}
/// Translates `delta.dataSkippingStatsColumns` entries to physical column names.
///
/// Returns `None` if the table property is not set. Entries that cannot be resolved
/// (e.g. non-existent columns) are silently skipped with a warning.
fn required_physical_stats_columns(&self) -> Option<Vec<ColumnName>> {
    self.table_properties()
        .data_skipping_stats_columns
        .as_ref()
        .map(|cols| {
            // Resolve against the data schema (partition columns excluded) since stats are
            // only collected for data columns.
            let logical_schema = self.logical_data_schema();
            let mode = self.column_mapping_mode();
            cols.iter()
                .filter_map(|col| {
                    get_any_level_column_physical_name(&logical_schema, col, mode)
                        // Theoretically this should always resolve — if it doesn't,
                        // the user specified a non-existent column in
                        // delta.dataSkippingStatsColumns, which is safe to ignore.
                        .inspect_err(|e| {
                            warn!(
                                "Couldn't translate dataSkippingStatsColumns entry '{col}' \
                                to physical name: {e}; skipping"
                            );
                        })
                        .ok()
                })
                .collect()
        })
}
/// The [`Metadata`] for this table at this version.
#[internal_api]
pub(crate) fn metadata(&self) -> &Metadata {
&self.metadata
}
/// The [`Protocol`] of this table at this version.
#[allow(unused)]
#[internal_api]
pub(crate) fn protocol(&self) -> &Protocol {
&self.protocol
}
/// The logical schema ([`SchemaRef`]) of this table at this version.
#[internal_api]
pub(crate) fn logical_schema(&self) -> SchemaRef {
self.logical_schema.clone()
}
/// The physical schema ([`SchemaRef`]) of this table at this version.
///
/// When column mapping is disabled, this is identical to
/// [`logical_schema`](Self::logical_schema). Otherwise, field names are replaced with
/// physical column names derived from column mapping metadata.
#[internal_api]
pub(crate) fn physical_schema(&self) -> SchemaRef {
self.physical_schemas.full.clone()
}
/// The physical schema for writing data files.
///
/// When [`MaterializePartitionColumns`] is enabled, returns the full physical schema
/// (partition columns are materialized in data files). Otherwise, returns the physical
/// schema with partition columns excluded.
///
/// [`MaterializePartitionColumns`]: crate::table_features::TableFeature::MaterializePartitionColumns
pub(crate) fn physical_write_schema(&self) -> SchemaRef {
if self.is_feature_enabled(&TableFeature::MaterializePartitionColumns) {
self.physical_schema()
} else {
self.physical_data_schema_without_partition_columns()
}
}
/// The [`TableProperties`] of this table at this version.
#[internal_api]
pub(crate) fn table_properties(&self) -> &TableProperties {
&self.table_properties
}
/// Whether this table is catalog-managed (has the CatalogManaged or CatalogOwnedPreview
/// table feature).
#[internal_api]
pub(crate) fn is_catalog_managed(&self) -> bool {
self.is_feature_supported(&TableFeature::CatalogManaged)
|| self.is_feature_supported(&TableFeature::CatalogOwnedPreview)
}
/// The [`ColumnMappingMode`] for this table at this version.
#[internal_api]
pub(crate) fn column_mapping_mode(&self) -> ColumnMappingMode {
self.column_mapping_mode
}
/// The partition columns of this table (empty if non-partitioned)
#[internal_api]
pub(crate) fn partition_columns(&self) -> &[String] {
self.metadata().partition_columns()
}
/// The [`Url`] of the table this [`TableConfiguration`] belongs to
#[internal_api]
pub(crate) fn table_root(&self) -> &Url {
&self.table_root
}
/// The [`Version`] which this [`TableConfiguration`] belongs to.
#[internal_api]
pub(crate) fn version(&self) -> Version {
self.version
}
/// Validates that all feature requirements for a given feature are satisfied.
///
/// Each `TableFeature` declares a list of [`FeatureRequirement`]s in its feature info; this
/// walks that list and returns an invalid-protocol error for the first one violated.
fn validate_feature_requirements(&self, feature: &TableFeature) -> DeltaResult<()> {
    for req in feature.info().feature_requirements {
        match req {
            // Dependency must be present in the protocol (support only, not enablement).
            FeatureRequirement::Supported(dep) => {
                require!(
                    self.is_feature_supported(dep),
                    Error::invalid_protocol(format!(
                        "Feature '{feature}' requires '{dep}' to be supported"
                    ))
                );
            }
            // Dependency must be supported AND switched on via table properties.
            FeatureRequirement::Enabled(dep) => {
                require!(
                    self.is_feature_enabled(dep),
                    Error::invalid_protocol(format!(
                        "Feature '{feature}' requires '{dep}' to be enabled"
                    ))
                );
            }
            // Mutually-exclusive feature: must NOT be present in the protocol.
            FeatureRequirement::NotSupported(dep) => {
                require!(
                    !self.is_feature_supported(dep),
                    Error::invalid_protocol(format!(
                        "Feature '{feature}' requires '{dep}' to not be supported"
                    ))
                );
            }
            // Dependency may be supported, but must not be enabled.
            FeatureRequirement::NotEnabled(dep) => {
                require!(
                    !self.is_feature_enabled(dep),
                    Error::invalid_protocol(format!(
                        "Feature '{feature}' requires '{dep}' to not be enabled"
                    ))
                );
            }
            // Arbitrary feature-specific validation against protocol + table properties.
            FeatureRequirement::Custom(check) => {
                check(&self.protocol, &self.table_properties)?;
            }
        }
    }
    Ok(())
}
/// Checks that kernel supports a feature for the given operation.
/// Returns an error if the feature is unknown, not supported, or fails validation.
fn check_feature_support(
    &self,
    feature: &TableFeature,
    operation: Operation,
) -> DeltaResult<()> {
    let info = feature.info();
    match &info.kernel_support {
        // Kernel fully supports this feature; only requirement validation remains.
        KernelSupport::Supported => {}
        KernelSupport::NotSupported => {
            return Err(Error::unsupported(format!(
                "Feature '{feature}' is not supported"
            )))
        }
        // Support depends on the operation and/or table state; delegate to the feature's
        // custom check.
        KernelSupport::Custom(check) => {
            check(&self.protocol, &self.table_properties, operation)?;
        }
    };
    // Even a kernel-supported feature may impose requirements on other features.
    self.validate_feature_requirements(feature)
}
/// Returns all reader features enabled for this table based on protocol version.
/// For table features protocol (v3), returns the explicit reader_features list.
/// For legacy protocol (v1-2), infers features from the version number.
fn get_enabled_reader_features(&self) -> Vec<TableFeature> {
match self.protocol.min_reader_version() {
TABLE_FEATURES_MIN_READER_VERSION => {
// Table features reader: use explicit reader_features list
self.protocol
.reader_features()
.map(|f| f.to_vec())
.unwrap_or_default()
}
v if (1..=2).contains(&v) => {
// Legacy reader: infer features from version
LEGACY_READER_FEATURES
.iter()
.filter(|f| f.is_valid_for_legacy_reader(v))
.cloned()
.collect()
}
_ => Vec::new(),
}
}
/// Returns all writer features enabled for this table based on protocol version.
/// For table features protocol (v7), returns the explicit writer_features list.
/// For legacy protocol (v1-6), infers features from the version number.
fn get_enabled_writer_features(&self) -> Vec<TableFeature> {
match self.protocol.min_writer_version() {
TABLE_FEATURES_MIN_WRITER_VERSION => {
// Table features writer: use explicit writer_features list
self.protocol
.writer_features()
.map(|f| f.to_vec())
.unwrap_or_default()
}
v if (1..=6).contains(&v) => {
// Legacy writer: infer features from version
LEGACY_WRITER_FEATURES
.iter()
.filter(|f| f.is_valid_for_legacy_writer(v))
.cloned()
.collect()
}
_ => Vec::new(),
}
}
/// Returns `Ok` if the kernel supports the given operation on this table. This checks that
/// the protocol's features are all supported for the requested operation type.
///
/// - For `Scan` and `Cdf` operations: checks reader version and reader features
/// - For `Write` operations: checks writer version and writer features
#[internal_api]
pub(crate) fn ensure_operation_supported(&self, operation: Operation) -> DeltaResult<()> {
    match operation {
        Operation::Scan | Operation::Cdf => self.ensure_read_supported(operation),
        Operation::Write => self.ensure_write_supported(),
    }
}
/// Internal helper for read operations (Scan, Cdf)
fn ensure_read_supported(&self, operation: Operation) -> DeltaResult<()> {
    // A reader version below the minimum is a malformed protocol, not merely unsupported.
    require!(
        self.protocol.min_reader_version() >= MIN_VALID_RW_VERSION,
        Error::InvalidProtocol(format!(
            "min_reader_version must be >= {MIN_VALID_RW_VERSION}, got {}",
            self.protocol.min_reader_version()
        ))
    );
    // Version check: kernel supports reader versions 1..=MAX_VALID_READER_VERSION
    if self.protocol.min_reader_version() > MAX_VALID_READER_VERSION {
        return Err(Error::unsupported(format!(
            "Unsupported minimum reader version {}",
            self.protocol.min_reader_version()
        )));
    }
    // Check all enabled reader features have kernel support
    for feature in self.get_enabled_reader_features() {
        self.check_feature_support(&feature, operation)?;
    }
    Ok(())
}
/// Internal helper for write operations
fn ensure_write_supported(&self) -> DeltaResult<()> {
    // Version check: kernel supports writer versions
    // MIN_VALID_RW_VERSION..=MAX_VALID_WRITER_VERSION
    require!(
        self.protocol.min_writer_version() >= MIN_VALID_RW_VERSION,
        Error::InvalidProtocol(format!(
            "min_writer_version must be >= {MIN_VALID_RW_VERSION}, got {}",
            self.protocol.min_writer_version()
        ))
    );
    // Version check: kernel supports writer versions 1..=MAX_VALID_WRITER_VERSION
    if self.protocol.min_writer_version() > MAX_VALID_WRITER_VERSION {
        return Err(Error::unsupported(format!(
            "Unsupported minimum writer version {}",
            self.protocol.min_writer_version()
        )));
    }
    // Check all enabled writer features have kernel support
    for feature in self.get_enabled_writer_features() {
        self.check_feature_support(&feature, Operation::Write)?;
    }
    // Schema-dependent validation for Invariants (can't be in FeatureInfo)
    // TODO: Better story for schema validation for Invariants and other features
    // A table may declare Invariants support without the schema actually containing any;
    // writes are only rejected when an invariant is actually present in the schema.
    if self.is_feature_supported(&TableFeature::Invariants)
        && schema_has_invariants(self.logical_schema.as_ref())
    {
        return Err(Error::unsupported(
            "Column invariants are not yet supported",
        ));
    }
    Ok(())
}
/// Returns information about in-commit timestamp enablement state.
///
/// Returns an error if only one of the enablement properties is present, as this indicates
/// an inconsistent state.
#[allow(unused)]
pub(crate) fn in_commit_timestamp_enablement(
    &self,
) -> DeltaResult<InCommitTimestampEnablement> {
    if !self.is_feature_enabled(&TableFeature::InCommitTimestamp) {
        return Ok(InCommitTimestampEnablement::NotEnabled);
    }
    let enablement_version = self
        .table_properties()
        .in_commit_timestamp_enablement_version;
    let enablement_timestamp = self
        .table_properties()
        .in_commit_timestamp_enablement_timestamp;
    // The two enablement properties must be set together — or not at all.
    match (enablement_version, enablement_timestamp) {
        (Some(version), Some(timestamp)) => Ok(InCommitTimestampEnablement::Enabled {
            enablement: Some((version, timestamp)),
        }),
        (Some(_), None) => Err(Error::generic(
            "In-commit timestamp enabled, but enablement timestamp is missing",
        )),
        (None, Some(_)) => Err(Error::generic(
            "In-commit timestamp enabled, but enablement version is missing",
        )),
        // If InCommitTimestamps was enabled at the beginning of the table's history,
        // it may have an empty enablement version and timestamp
        (None, None) => Ok(InCommitTimestampEnablement::Enabled { enablement: None }),
    }
}
/// Returns `true` if row tracking is suspended for this table.
///
/// Suspension is controlled by the `delta.rowTrackingSuspended` table property. Note that:
/// - Row tracking can be _supported_ and _suspended_ at the same time.
/// - Row tracking cannot be _enabled_ while _suspended_.
pub(crate) fn is_row_tracking_suspended(&self) -> bool {
    // An absent property means "not suspended".
    matches!(self.table_properties().row_tracking_suspended, Some(true))
}
/// Returns `true` if row tracking information should be written for this table, i.e. row
/// tracking is supported AND not suspended.
///
/// Note: We ignore [`is_row_tracking_enabled`] at this point because Kernel does not
/// preserve row IDs and row commit versions yet.
pub(crate) fn should_write_row_tracking(&self) -> bool {
    !self.is_row_tracking_suspended() && self.is_feature_supported(&TableFeature::RowTracking)
}
/// Returns true if the protocol uses legacy reader version (< 3)
#[allow(dead_code)]
fn is_legacy_reader_version(&self) -> bool {
    self.protocol.min_reader_version() < TABLE_FEATURES_MIN_READER_VERSION
}
/// Returns true if the protocol uses legacy writer version (< 7)
#[allow(dead_code)]
fn is_legacy_writer_version(&self) -> bool {
    self.protocol.min_writer_version() < TABLE_FEATURES_MIN_WRITER_VERSION
}
/// Helper to check if a feature is present in a feature list; an absent list counts as
/// "not present".
fn has_feature(features: Option<&[TableFeature]>, feature: &TableFeature) -> bool {
    features.map_or(false, |list| list.contains(feature))
}
/// Helper method to check if a feature is supported.
/// This checks protocol versions and feature lists but does NOT check enablement properties.
#[internal_api]
pub(crate) fn is_feature_supported(&self, feature: &TableFeature) -> bool {
    let info = feature.info();
    // Legacy minimum protocol versions for this feature. Features with no legacy version
    // default to the table-features thresholds (reader v3 / writer v7), so on legacy
    // protocols they can never satisfy the version comparison below.
    let min_legacy_version = info.min_legacy_version.as_ref();
    let min_reader_version =
        min_legacy_version.map_or(TABLE_FEATURES_MIN_READER_VERSION, |v| v.reader);
    let min_writer_version =
        min_legacy_version.map_or(TABLE_FEATURES_MIN_WRITER_VERSION, |v| v.writer);
    match info.feature_type {
        FeatureType::WriterOnly => {
            if self.is_legacy_writer_version() {
                // Legacy writer: protocol writer version meets minimum requirement
                self.protocol.min_writer_version() >= min_writer_version
            } else {
                // Table features writer: feature is in writer_features list
                Self::has_feature(self.protocol.writer_features(), feature)
            }
        }
        FeatureType::ReaderWriter => {
            // Reader-writer features must be supported on BOTH sides of the protocol.
            let reader_supported = if self.is_legacy_reader_version() {
                // Legacy reader: protocol reader version meets minimum requirement
                self.protocol.min_reader_version() >= min_reader_version
            } else {
                // Table features reader: feature is in reader_features list
                Self::has_feature(self.protocol.reader_features(), feature)
            };
            let writer_supported = if self.is_legacy_writer_version() {
                // Legacy writer: protocol writer version meets minimum requirement
                self.protocol.min_writer_version() >= min_writer_version
            } else {
                // Table features writer: feature is in writer_features list
                Self::has_feature(self.protocol.writer_features(), feature)
            };
            reader_supported && writer_supported
        }
        // NOTE(review): unknown features are looked up in the writer features list only —
        // presumably because every feature must appear there; confirm against the
        // table-features spec.
        FeatureType::Unknown => Self::has_feature(self.protocol.writer_features(), feature),
    }
}
/// Generic method to check if a feature is enabled.
///
/// A feature is enabled if:
/// 1. It is supported in the protocol
/// 2. The enablement check passes
#[internal_api]
pub(crate) fn is_feature_enabled(&self, feature: &TableFeature) -> bool {
    // Protocol support is a precondition: an unsupported feature is never enabled.
    self.is_feature_supported(feature)
        && match feature.info().enablement_check {
            // Some features need no table property; protocol support alone enables them.
            EnablementCheck::AlwaysIfSupported => true,
            // Others are gated on a predicate over the table properties.
            EnablementCheck::EnabledIf(check_fn) => check_fn(&self.table_properties),
        }
}
}
#[cfg(test)]
mod test {
use std::collections::HashMap;
use std::sync::Arc;
use rstest::rstest;
use url::Url;
use super::{InCommitTimestampEnablement, TableConfiguration};
use crate::actions::{Metadata, Protocol};
use crate::schema::{ColumnName, DataType, SchemaRef, StructField, StructType};
use crate::table_features::{
ColumnMappingMode, FeatureType, Operation, TableFeature, TABLE_FEATURES_MIN_READER_VERSION,
TABLE_FEATURES_MIN_WRITER_VERSION,
};
use crate::table_properties::{
TableProperties, COLUMN_MAPPING_MODE, ENABLE_IN_COMMIT_TIMESTAMPS,
};
use crate::utils::test_utils::{
assert_result_error_with_message, test_schema_flat, test_schema_flat_with_column_mapping,
test_schema_nested, test_schema_nested_with_column_mapping, test_schema_with_array,
test_schema_with_array_and_column_mapping, test_schema_with_map,
test_schema_with_map_and_column_mapping,
};
use crate::Error;
// Builds a TableConfiguration for tests at the table-features protocol versions
// (TABLE_FEATURES_MIN_READER_VERSION / TABLE_FEATURES_MIN_WRITER_VERSION), with
// the given table properties enabled and the given features on the protocol.
// Delegates to create_mock_table_config_with_version for the actual construction.
fn create_mock_table_config(
    props_to_enable: &[(&str, &str)],
    features: &[TableFeature],
) -> TableConfiguration {
    create_mock_table_config_with_version(
        props_to_enable,
        Some(features),
        TABLE_FEATURES_MIN_READER_VERSION,
        TABLE_FEATURES_MIN_WRITER_VERSION,
    )
}
// Builds a TableConfiguration for tests at arbitrary protocol versions.
//
// Feature lists are attached only when the corresponding version is in
// table-features mode (reader >= 3 / writer >= 7); legacy versions get `None`.
fn create_mock_table_config_with_version(
    props_to_enable: &[(&str, &str)],
    features_opt: Option<&[TableFeature]>,
    min_reader_version: i32,
    min_writer_version: i32,
) -> TableConfiguration {
    // Minimal single-column schema; these tests only exercise protocol/properties.
    let schema = Arc::new(StructType::new_unchecked([StructField::nullable(
        "value",
        DataType::INTEGER,
    )]));
    let properties: HashMap<String, String> = props_to_enable
        .iter()
        .map(|(key, value)| (key.to_string(), value.to_string()))
        .collect();
    let metadata = Metadata::try_new(None, None, schema, vec![], 0, properties).unwrap();
    let (reader_features_opt, writer_features_opt) = match features_opt {
        Some(features) => {
            // This helper only handles known features. Unknown features would need
            // explicit placement on reader vs writer lists.
            assert!(
                features
                    .iter()
                    .all(|f| f.feature_type() != FeatureType::Unknown),
                "Test helper does not support unknown features"
            );
            // The reader list carries only reader-writer features; the writer
            // list carries everything.
            let reader_features = features
                .iter()
                .filter(|f| f.feature_type() == FeatureType::ReaderWriter);
            // Only add reader_features if reader >= 3 (non-legacy reader mode).
            let readers = (min_reader_version >= TABLE_FEATURES_MIN_READER_VERSION)
                .then_some(reader_features);
            // Only add writer_features if writer >= 7 (non-legacy writer mode).
            let writers =
                (min_writer_version >= TABLE_FEATURES_MIN_WRITER_VERSION).then_some(features);
            (readers, writers)
        }
        None => (None, None),
    };
    let protocol = Protocol::try_new(
        min_reader_version,
        min_writer_version,
        reader_features_opt,
        writer_features_opt,
    )
    .unwrap();
    let table_root = Url::parse("file:///").unwrap();
    TableConfiguration::try_new(metadata, protocol, table_root, 0).unwrap()
}
#[test]
fn dv_supported_not_enabled() {
    use crate::table_properties::ENABLE_CHANGE_DATA_FEED;
    // Schema contents are irrelevant here; one nullable column suffices.
    let schema = Arc::new(StructType::new_unchecked([StructField::nullable(
        "value",
        DataType::INTEGER,
    )]));
    // CDF is enabled via a table property, but deletion vectors are only
    // *supported* by the protocol — delta.enableDeletionVectors is never set.
    let config = HashMap::from_iter([(ENABLE_CHANGE_DATA_FEED.to_string(), "true".to_string())]);
    let metadata = Metadata::try_new(None, None, schema, vec![], 0, config).unwrap();
    let protocol = Protocol::try_new_modern(
        [TableFeature::DeletionVectors],
        [TableFeature::DeletionVectors, TableFeature::ChangeDataFeed],
    )
    .unwrap();
    let table_root = Url::parse("file:///").unwrap();
    let table_config = TableConfiguration::try_new(metadata, protocol, table_root, 0).unwrap();
    // Supported in the protocol, but not enabled without the enabling property.
    assert!(table_config.is_feature_supported(&TableFeature::DeletionVectors));
    assert!(!table_config.is_feature_enabled(&TableFeature::DeletionVectors));
}
#[test]
fn dv_enabled() {
    use crate::table_properties::{ENABLE_CHANGE_DATA_FEED, ENABLE_DELETION_VECTORS};
    let schema = Arc::new(StructType::new_unchecked([StructField::nullable(
        "value",
        DataType::INTEGER,
    )]));
    // Both enabling properties are set AND the protocol lists deletion vectors
    // on the reader and writer feature lists — fully enabled.
    let config = HashMap::from_iter([
        (ENABLE_CHANGE_DATA_FEED.to_string(), "true".to_string()),
        (ENABLE_DELETION_VECTORS.to_string(), "true".to_string()),
    ]);
    let metadata = Metadata::try_new(None, None, schema, vec![], 0, config).unwrap();
    let protocol = Protocol::try_new_modern(
        [TableFeature::DeletionVectors],
        [TableFeature::DeletionVectors, TableFeature::ChangeDataFeed],
    )
    .unwrap();
    let table_root = Url::parse("file:///").unwrap();
    let table_config = TableConfiguration::try_new(metadata, protocol, table_root, 0).unwrap();
    // Supported AND enabled: protocol support plus the enabling property.
    assert!(table_config.is_feature_supported(&TableFeature::DeletionVectors));
    assert!(table_config.is_feature_enabled(&TableFeature::DeletionVectors));
}
#[rstest]
#[case(-1, 2, Operation::Scan)]
#[case(1, -1, Operation::Write)]
fn reject_protocol_version_below_minimum(
    #[case] rv: i32,
    #[case] wv: i32,
    #[case] op: Operation,
) {
    // Any valid schema works; the protocol versions are what's under test.
    let schema = Arc::new(StructType::new_unchecked([StructField::nullable(
        "value",
        DataType::INTEGER,
    )]));
    let metadata = Metadata::try_new(None, None, schema, vec![], 0, HashMap::new()).unwrap();
    // new_unchecked bypasses construction-time validation so the invalid versions
    // reach ensure_operation_supported, where rejection must happen.
    let protocol =
        Protocol::new_unchecked(rv, wv, TableFeature::NO_LIST, TableFeature::NO_LIST);
    let table_root = Url::parse("file:///").unwrap();
    let table_config = TableConfiguration::try_new(metadata, protocol, table_root, 0).unwrap();
    // Exactly one of the two versions is invalid per case; pick the matching message.
    let expected = if rv >= 1 {
        format!("Invalid protocol action in the delta log: min_writer_version must be >= 1, got {wv}")
    } else {
        format!("Invalid protocol action in the delta log: min_reader_version must be >= 1, got {rv}")
    };
    assert_result_error_with_message(table_config.ensure_operation_supported(op), &expected);
}
#[test]
fn write_with_cdf() {
use TableFeature::*;
use crate::table_properties::{APPEND_ONLY, ENABLE_CHANGE_DATA_FEED};
let cases = [