From 029d6672081d32caea314dca61f3688e841f3036 Mon Sep 17 00:00:00 2001 From: William Chen Date: Fri, 17 Apr 2026 21:10:48 +0000 Subject: [PATCH] refactor: separate read state from effective state in Transaction --- kernel/src/actions/mod.rs | 2 - kernel/src/lib.rs | 4 - kernel/src/log_segment/mod.rs | 57 +++-- kernel/src/log_segment/tests.rs | 26 ++ kernel/src/snapshot/mod.rs | 63 +++-- kernel/src/table_configuration.rs | 28 ++- .../src/transaction/builder/create_table.rs | 15 +- kernel/src/transaction/create_table.rs | 20 +- kernel/src/transaction/domain_metadata.rs | 7 +- kernel/src/transaction/mod.rs | 222 ++++++++++-------- kernel/src/transaction/update.rs | 10 +- kernel/tests/write.rs | 6 + 12 files changed, 263 insertions(+), 197 deletions(-) diff --git a/kernel/src/actions/mod.rs b/kernel/src/actions/mod.rs index d0b4ad9279..a7d76921ee 100644 --- a/kernel/src/actions/mod.rs +++ b/kernel/src/actions/mod.rs @@ -242,9 +242,7 @@ impl Metadata { /// # Errors /// /// Returns an error if there are any metadata columns in the schema. - // TODO: remove allow(dead_code) after we use this API in CREATE TABLE, etc. #[internal_api] - #[allow(dead_code)] pub(crate) fn try_new( name: Option, description: Option, diff --git a/kernel/src/lib.rs b/kernel/src/lib.rs index 92777197ce..01f5f43c1c 100644 --- a/kernel/src/lib.rs +++ b/kernel/src/lib.rs @@ -198,10 +198,6 @@ pub mod engine; /// Delta table version is 8 byte unsigned int pub type Version = u64; -/// Sentinel version indicating a pre-commit state (table does not exist yet). -/// Used for create-table transactions before the first commit. -pub const PRE_COMMIT_VERSION: Version = u64::MAX; - pub type FileSize = u64; pub type FileIndex = u64; diff --git a/kernel/src/log_segment/mod.rs b/kernel/src/log_segment/mod.rs index df653bd4f0..c02bc0e762 100644 --- a/kernel/src/log_segment/mod.rs +++ b/kernel/src/log_segment/mod.rs @@ -29,7 +29,7 @@ use crate::schema::{DataType, SchemaRef, StructField, StructType, ToSchema as _} use crate::utils::require; use crate::{ DeltaResult, Engine, Error, Expression, FileMeta, Predicate, PredicateRef, RowVisitor, - StorageHandler, Version, PRE_COMMIT_VERSION, + StorageHandler, Version, }; mod domain_metadata_replay; @@ -153,20 +153,45 @@ fn schema_to_is_not_null_predicate(schema: &StructType) -> Option } impl LogSegment { - /// Creates a synthetic LogSegment for pre-commit transactions (e.g., create-table). - /// The sentinel version PRE_COMMIT_VERSION indicates no version exists yet on disk. - /// This is used to construct a pre-commit snapshot that provides table configuration - /// (protocol, metadata, schema) for operations like CTAS. - #[allow(dead_code)] // Used by create_table module - pub(crate) fn for_pre_commit(log_root: Url) -> Self { - use crate::PRE_COMMIT_VERSION; - Self { - end_version: PRE_COMMIT_VERSION, + /// Creates a LogSegment for a newly created table at version 0 from a single commit file. + /// + /// Normal log segments are built by listing files from storage and replaying them. For CREATE + /// TABLE, the table has no prior log. We construct the segment directly from the just created + /// commit file. + /// + /// # Errors + /// + /// Returns an `internal_error` if `commit_file` is not version 0 or not a commit file type. + pub(crate) fn new_for_version_zero( + log_root: Url, + commit_file: ParsedLogPath, + ) -> DeltaResult { + require!( + commit_file.version == 0, + crate::Error::internal_error(format!( + "new_for_version_zero called with version {}", + commit_file.version + )) + ); + require!( + commit_file.is_commit(), + crate::Error::internal_error(format!( + "new_for_version_zero called with non-commit file type: {:?}", + commit_file.file_type + )) + ); + Ok(Self { + end_version: commit_file.version, checkpoint_version: None, log_root, last_checkpoint_metadata: None, - listed: LogSegmentFiles::default(), - } + listed: LogSegmentFiles { + max_published_version: Some(commit_file.version), + latest_commit_file: Some(commit_file.clone()), + ascending_commit_files: vec![commit_file], + ..Default::default() + }, + }) } #[internal_api] @@ -1104,11 +1129,7 @@ impl LogSegment { } /// How many commits since a checkpoint, according to this log segment. - /// Returns 0 for pre-commit snapshots (where end_version is PRE_COMMIT_VERSION). pub(crate) fn commits_since_checkpoint(&self) -> u64 { - if self.end_version == PRE_COMMIT_VERSION { - return 0; - } // we can use 0 as the checkpoint version if there is no checkpoint since `end_version - 0` // is the correct number of commits since a checkpoint if there are no checkpoints let checkpoint_version = self.checkpoint_version.unwrap_or(0); @@ -1117,11 +1138,7 @@ impl LogSegment { } /// How many commits since a log-compaction or checkpoint, according to this log segment. - /// Returns 0 for pre-commit snapshots (where end_version is PRE_COMMIT_VERSION). pub(crate) fn commits_since_log_compaction_or_checkpoint(&self) -> u64 { - if self.end_version == PRE_COMMIT_VERSION { - return 0; - } // Annoyingly we have to search all the compaction files to determine this, because we only // sort by start version, so technically the max end version could be anywhere in the vec. // We can return 0 in the case there is no compaction since end_version - 0 is the correct diff --git a/kernel/src/log_segment/tests.rs b/kernel/src/log_segment/tests.rs index a8c31ca0a4..d963623cf8 100644 --- a/kernel/src/log_segment/tests.rs +++ b/kernel/src/log_segment/tests.rs @@ -4485,3 +4485,29 @@ async fn read_actions_with_null_map_values( } assert!(found, "Should have found a {action_name} action batch"); } + +#[test] +fn new_for_version_zero_creates_valid_log_segment() { + let log_root = Url::parse("memory:///_delta_log/").unwrap(); + let commit_path = create_log_path("memory:///_delta_log/00000000000000000000.json"); + let segment = super::LogSegment::new_for_version_zero(log_root.clone(), commit_path).unwrap(); + assert_eq!(segment.end_version, 0); + assert_eq!(segment.log_root, log_root); +} + +#[test] +fn new_for_version_zero_rejects_non_zero_version() { + let log_root = Url::parse("memory:///_delta_log/").unwrap(); + let commit_path = create_log_path("memory:///_delta_log/00000000000000000001.json"); + let err = super::LogSegment::new_for_version_zero(log_root, commit_path).unwrap_err(); + assert!(err.to_string().contains("version")); +} + +#[test] +fn new_for_version_zero_rejects_non_commit_file() { + let log_root = Url::parse("memory:///_delta_log/").unwrap(); + let checkpoint_path = + create_log_path("memory:///_delta_log/00000000000000000000.checkpoint.parquet"); + let err = super::LogSegment::new_for_version_zero(log_root, checkpoint_path).unwrap_err(); + assert!(err.to_string().contains("non-commit")); +} diff --git a/kernel/src/snapshot/mod.rs b/kernel/src/snapshot/mod.rs index ab356f95fe..fc26c92acb 100644 --- a/kernel/src/snapshot/mod.rs +++ b/kernel/src/snapshot/mod.rs @@ -129,16 +129,6 @@ impl Snapshot { SnapshotBuilder::new_from(existing_snapshot) } - /// Create a new [`Snapshot`] from a [`LogSegment`] and [`TableConfiguration`]. - #[internal_api] - pub(crate) fn new(log_segment: LogSegment, table_configuration: TableConfiguration) -> Self { - Self::new_with_crc( - log_segment, - table_configuration, - Arc::new(LazyCrc::new(None)), - ) - } - /// Internal constructor that accepts an explicit [`LazyCrc`]. pub(crate) fn new_with_crc( log_segment: LogSegment, @@ -400,15 +390,11 @@ impl Snapshot { /// producing a post-commit snapshot without a full log replay from storage. /// /// The `crc_delta` captures the CRC-relevant changes from the committed transaction - /// (file stats, domain metadata, ICT, etc.). If the pre-commit snapshot had a loaded CRC - /// at its version, the delta is applied to produce a precomputed in-memory CRC for the new - /// version -- this CRC contains all important table metadata (protocol, metadata, domain - /// metadata, set transactions, ICT) and avoids re-reading them from storage. CREATE TABLE - /// always produces a CRC at v0. If no CRC was available on the pre-commit snapshot, the - /// existing lazy CRC is carried forward unchanged. - /// - /// TODO: Handle Protocol changes in CrcDelta (when Kernel-RS supports protocol changes) - /// TODO: Handle Metadata changes in CrcDelta (when Kernel-RS supports metadata changes) + /// (file stats, domain metadata, ICT, etc.). If this snapshot had a loaded CRC at its + /// version, the delta is applied to produce a precomputed in-memory CRC for the new + /// version -- this avoids re-reading metadata from storage. If no CRC was available, the + /// existing lazy CRC is carried forward unchanged. CREATE TABLE handles CRC construction + /// separately in `Transaction::into_committed`. pub(crate) fn new_post_commit( &self, commit: ParsedLogPath, @@ -432,8 +418,12 @@ impl Snapshot { )) ); - let new_table_configuration = - TableConfiguration::new_post_commit(self.table_configuration(), new_version); + let new_table_configuration = TableConfiguration::new_post_commit( + self.table_configuration(), + new_version, + crc_delta.metadata.clone(), + crc_delta.protocol.clone(), + )?; let new_log_segment = self.log_segment.new_with_commit_appended(commit)?; @@ -448,21 +438,18 @@ impl Snapshot { /// Compute the lazy CRC for a post-commit snapshot by applying a [`CrcDelta`]. /// - /// For CREATE TABLE, builds a fresh CRC from the `crc_delta`. For existing tables, applies - /// the `crc_delta` to the current CRC if loaded, otherwise carries forward the existing lazy - /// CRC. + /// Applies the `crc_delta` to the current CRC if loaded, otherwise carries forward the + /// existing lazy CRC. Not used by CREATE TABLE, which builds its CRC from scratch via + /// `CrcDelta::into_crc_for_version_zero` in `Transaction::into_committed`. fn compute_post_commit_crc(&self, new_version: Version, crc_delta: CrcDelta) -> Arc { - let crc = if self.version() == crate::PRE_COMMIT_VERSION { - crc_delta.into_crc_for_version_zero() - } else { - self.lazy_crc - .get_if_loaded_at_version(self.version()) - .map(|base| { - let mut crc = base.as_ref().clone(); - crc.apply(crc_delta); - crc - }) - }; + let crc = self + .lazy_crc + .get_if_loaded_at_version(self.version()) + .map(|base| { + let mut crc = base.as_ref().clone(); + crc.apply(crc_delta); + crc + }); match crc { Some(c) => Arc::new(LazyCrc::new_precomputed(c, new_version)), @@ -1297,7 +1284,11 @@ mod tests { let log_segment = LogSegment::try_new(listed_files, url.join("_delta_log/")?, Some(0), None)?; - Ok(Snapshot::new(log_segment, table_cfg)) + Ok(Snapshot::new_with_crc( + log_segment, + table_cfg, + Arc::new(LazyCrc::new(None)), + )) } #[test] diff --git a/kernel/src/table_configuration.rs b/kernel/src/table_configuration.rs index 6bd2e2eeea..eaf1a85ed5 100644 --- a/kernel/src/table_configuration.rs +++ b/kernel/src/table_configuration.rs @@ -202,17 +202,25 @@ impl TableConfiguration { /// Creates a new [`TableConfiguration`] representing the table configuration immediately /// after a commit. /// - /// This method takes a pre-commit table configuration and produces a post-commit - /// configuration at the committed version. This allows immediate use of the new table - /// configuration without re-reading metadata from storage. + /// This method takes the current table configuration and produces a post-commit + /// configuration at the committed version. If the commit included new Protocol or Metadata + /// actions (e.g. ALTER TABLE), those are passed in and the configuration is rebuilt with + /// full validation. Otherwise the existing configuration is cloned with only the version + /// updated. /// - /// TODO: Take in Protocol (when Kernel-RS supports protocol changes) - /// TODO: Take in Metadata (when Kernel-RS supports metadata changes) - pub(crate) fn new_post_commit(table_configuration: &Self, new_version: Version) -> Self { - Self { - version: new_version, - ..table_configuration.clone() - } + /// Returns the new [`TableConfiguration`] at `new_version`. + /// + /// # Errors + /// + /// Returns an error if the new metadata/protocol combination fails + /// [`TableConfiguration::try_new`] validation (e.g., unsupported features, invalid schema). + pub(crate) fn new_post_commit( + table_configuration: &Self, + new_version: Version, + new_metadata: Option, + new_protocol: Option, + ) -> DeltaResult { + Self::try_new_from(table_configuration, new_metadata, new_protocol, new_version) } /// Generates the expected schema for file statistics. diff --git a/kernel/src/transaction/builder/create_table.rs b/kernel/src/transaction/builder/create_table.rs index 034801da8b..b4088c2a0a 100644 --- a/kernel/src/transaction/builder/create_table.rs +++ b/kernel/src/transaction/builder/create_table.rs @@ -17,11 +17,9 @@ use crate::actions::{DomainMetadata, Metadata, Protocol}; use crate::clustering::{create_clustering_domain_metadata, validate_clustering_columns}; use crate::committer::Committer; use crate::expressions::ColumnName; -use crate::log_segment::LogSegment; use crate::schema::validation::validate_schema_for_create; use crate::schema::variant_utils::schema_contains_variant_type; use crate::schema::{normalize_column_names_to_schema_casing, DataType, SchemaRef, StructType}; -use crate::snapshot::Snapshot; use crate::table_configuration::TableConfiguration; use crate::table_features::{ assign_column_mapping_metadata, get_any_level_column_physical_name, @@ -40,7 +38,7 @@ use crate::transaction::create_table::CreateTableTransaction; use crate::transaction::data_layout::DataLayout; use crate::transaction::Transaction; use crate::utils::{current_time_ms, try_parse_uri}; -use crate::{DeltaResult, Engine, Error, StorageHandler, PRE_COMMIT_VERSION}; +use crate::{DeltaResult, Engine, Error, StorageHandler}; /// Table features allowed to be enabled via `delta.feature.*=supported` during CREATE TABLE. /// @@ -815,15 +813,12 @@ impl CreateTableTransactionBuilder { validated.properties, )?; - // Create pre-commit snapshot from protocol/metadata - let log_root = table_url.join("_delta_log/")?; - let log_segment = LogSegment::for_pre_commit(log_root); - let table_configuration = - TableConfiguration::try_new(metadata, protocol, table_url, PRE_COMMIT_VERSION)?; + // Build TableConfiguration directly for the new table + let table_configuration = TableConfiguration::try_new(metadata, protocol, table_url, 0)?; - // Create Transaction with pre-commit snapshot + // Create Transaction with the effective table configuration Transaction::try_new_create_table( - Arc::new(Snapshot::new(log_segment, table_configuration)), + table_configuration, self.engine_info, committer, data_layout_result.system_domain_metadata, diff --git a/kernel/src/transaction/create_table.rs b/kernel/src/transaction/create_table.rs index e725bab09e..42ab363b2d 100644 --- a/kernel/src/transaction/create_table.rs +++ b/kernel/src/transaction/create_table.rs @@ -40,7 +40,7 @@ use crate::actions::DomainMetadata; use crate::committer::Committer; use crate::expressions::ColumnName; use crate::schema::SchemaRef; -use crate::snapshot::SnapshotRef; +use crate::table_configuration::TableConfiguration; use crate::transaction::{CreateTable, Transaction}; use crate::utils::current_time_ms; use crate::DeltaResult; @@ -133,31 +133,29 @@ impl CreateTableTransaction { /// Create a new transaction for creating a new table. This is used when the table doesn't /// exist yet and we need to create it with Protocol and Metadata actions. /// - /// The `pre_commit_snapshot` is a synthetic snapshot created from the protocol and metadata - /// that will be committed. It uses `PRE_COMMIT_VERSION` as a sentinel to indicate no - /// version exists yet on disk. + /// The `effective_table_config` is the table configuration that will be committed (protocol, + /// metadata, schema). /// /// This is typically called via `CreateTableTransactionBuilder::build()` rather than directly. pub(crate) fn try_new_create_table( - pre_commit_snapshot: SnapshotRef, + effective_table_config: TableConfiguration, engine_info: String, committer: Box, system_domain_metadata: Vec, clustering_columns: Option>, ) -> DeltaResult { - // TODO(sanuj) Today transactions expect a read snapshot to be passed in and we pass - // in the pre_commit_snapshot for CREATE. To support other operations such as ALTERs - // there might be cleaner alternatives which can clearly disambiguate b/w a snapshot - // the was read vs the effective snapshot we will use for the commit. let span = tracing::info_span!( "txn", - path = %pre_commit_snapshot.table_root(), + path = %effective_table_config.table_root(), operation = "CREATE", ); Ok(Transaction { span, - read_snapshot: pre_commit_snapshot, + read_snapshot_opt: None, + effective_table_config, + should_emit_protocol: true, + should_emit_metadata: true, committer, operation: Some("CREATE TABLE".to_string()), engine_info: Some(engine_info), diff --git a/kernel/src/transaction/domain_metadata.rs b/kernel/src/transaction/domain_metadata.rs index 870fea86ed..a6a20352c6 100644 --- a/kernel/src/transaction/domain_metadata.rs +++ b/kernel/src/transaction/domain_metadata.rs @@ -28,8 +28,7 @@ impl Transaction { } if !self - .read_snapshot - .table_configuration() + .effective_table_config .is_feature_supported(&TableFeature::DomainMetadata) { return Err(Error::unsupported( @@ -114,7 +113,7 @@ impl Transaction { /// This prevents arbitrary `delta.*` domains from being added during table creation. /// Each known system domain must have its corresponding feature enabled in the protocol. fn validate_system_domain_feature(&self, domain: &str) -> DeltaResult<()> { - let table_config = self.read_snapshot.table_configuration(); + let table_config = &self.effective_table_config; // Map domain to its required feature let required_feature = match domain { @@ -162,7 +161,7 @@ impl Transaction { .map(String::as_str) .collect(); let existing_domains = self - .read_snapshot + .read_snapshot()? .get_domain_metadatas_internal(engine, Some(&domains))?; // Create removal tombstones with pre-image configurations diff --git a/kernel/src/transaction/mod.rs b/kernel/src/transaction/mod.rs index fd3a8bab23..a8c582a80e 100644 --- a/kernel/src/transaction/mod.rs +++ b/kernel/src/transaction/mod.rs @@ -14,11 +14,12 @@ use crate::actions::{ use crate::committer::{ CommitMetadata, CommitProtocolMetadata, CommitResponse, CommitType, Committer, }; -use crate::crc::{CrcDelta, FileStatsDelta}; +use crate::crc::{CrcDelta, FileStatsDelta, LazyCrc}; use crate::engine_data::FilteredEngineData; use crate::error::Error; use crate::expressions::UnaryExpressionOp::ToJson; use crate::expressions::{ArrayData, ColumnName, Scalar, Transform}; +use crate::log_segment::LogSegment; use crate::partition::serialization::serialize_partition_value; use crate::partition::validation::validate_partition_values; use crate::path::{LogRoot, ParsedLogPath}; @@ -30,12 +31,13 @@ use crate::scan::log_replay::{ }; use crate::scan::scan_row_schema; use crate::schema::{ArrayType, MapType, SchemaRef, StructField, StructType, StructTypeBuilder}; -use crate::snapshot::SnapshotRef; +use crate::snapshot::{Snapshot, SnapshotRef}; +use crate::table_configuration::TableConfiguration; use crate::table_features::TableFeature; use crate::utils::require; use crate::{ DataType, DeltaResult, Engine, EngineData, Expression, FileMeta, IntoEngineData, RowVisitor, - Version, PRE_COMMIT_VERSION, + Version, }; #[cfg(feature = "internal-api")] @@ -197,9 +199,17 @@ pub struct CreateTable; /// ``` pub struct Transaction { span: tracing::Span, - // The snapshot this transaction is based on. For create-table transactions, - // this is a pre-commit snapshot with PRE_COMMIT_VERSION. - read_snapshot: SnapshotRef, + // The snapshot this transaction is based on. None for CREATE TABLE (no pre-existing table). + // Use `read_snapshot()` to access; it returns an error if None. + read_snapshot_opt: Option, + // The table configuration that this commit will produce. For writes that don't change the + // config, this is cloned from the read snapshot; when the config changes (e.g. schema + // evolution), it is constructed separately with the new schema/protocol. + effective_table_config: TableConfiguration, + // Whether to emit a Protocol action. True for CREATE TABLE and ALTER TABLE, false otherwise. + should_emit_protocol: bool, + // Whether to emit a Metadata action. True for CREATE TABLE and ALTER TABLE, false otherwise. + should_emit_metadata: bool, committer: Box, operation: Option, engine_info: Option, @@ -245,10 +255,9 @@ pub struct Transaction { impl std::fmt::Debug for Transaction { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let version_info = if self.is_create_table() { - "create_table".to_string() - } else { - format!("{}", self.read_snapshot.version()) + let version_info = match &self.read_snapshot_opt { + Some(snap) => format!("{}", snap.version()), + None => "create_table".to_string(), }; f.write_str(&format!( "Transaction {{ read_snapshot version: {}, engine_info: {} }}", @@ -348,8 +357,7 @@ impl Transaction { && self.data_change { let cdf_enabled = self - .read_snapshot - .table_configuration() + .effective_table_config .table_properties() .enable_change_data_feed .unwrap_or(false); @@ -384,26 +392,22 @@ impl Transaction { ); let commit_info_action = self.generate_commit_info(engine, kernel_commit_info); - // Step 3: Generate Protocol and Metadata actions for create-table - let (protocol_action, metadata_action, protocol, metadata) = if self.is_create_table() { - let table_config = self.read_snapshot.table_configuration(); - let protocol = table_config.protocol().clone(); - let metadata = table_config.metadata().clone(); - - let protocol_schema = get_commit_schema().project(&[PROTOCOL_NAME])?; - let metadata_schema = get_commit_schema().project(&[METADATA_NAME])?; - - let protocol_data = protocol.clone().into_engine_data(protocol_schema, engine)?; - let metadata_data = metadata.clone().into_engine_data(metadata_schema, engine)?; - - ( - Some(protocol_data), - Some(metadata_data), - Some(protocol), - Some(metadata), - ) + // Step 3: Generate Protocol and Metadata actions based on emit flags + let (protocol_action, protocol) = if self.should_emit_protocol { + let protocol = self.effective_table_config.protocol().clone(); + let schema = get_commit_schema().project(&[PROTOCOL_NAME])?; + let action = protocol.clone().into_engine_data(schema, engine)?; + (Some(action), Some(protocol)) } else { - (None, None, None, None) + (None, None) + }; + let (metadata_action, metadata) = if self.should_emit_metadata { + let metadata = self.effective_table_config.metadata().clone(); + let schema = get_commit_schema().project(&[METADATA_NAME])?; + let action = metadata.clone().into_engine_data(schema, engine)?; + (Some(action), Some(metadata)) + } else { + (None, None) }; // Step 4: Generate add actions and get data for domain metadata actions (e.g. row tracking @@ -453,8 +457,9 @@ impl Transaction { { Ok(CommitResponse::Committed { file_meta }) => { let bin_boundaries = self - .read_snapshot - .get_file_stats_if_loaded() + .read_snapshot_opt + .as_ref() + .and_then(|snap| snap.get_file_stats_if_loaded()) .and_then(|s| s.file_size_histogram) .map(|h| h.sorted_bin_boundaries); let crc_delta = self.build_crc_delta( @@ -600,19 +605,23 @@ impl Transaction { new_metadata: Option, domain_metadata_changes: Vec, ) -> DeltaResult { - let log_root = LogRoot::new(self.read_snapshot.table_root().clone())?; - let table_config = self.read_snapshot.table_configuration(); + let log_root = LogRoot::new(self.effective_table_config.table_root().clone())?; let is_create = self.is_create_table(); - let commit_type = Self::determine_commit_type(is_create, table_config); + let commit_type = Self::determine_commit_type(is_create, &self.effective_table_config); Self::validate_commit_type(self.committer.is_catalog_committer(), &commit_type)?; - // For create-table: read P&M is None (no previous table), new P&M is set. - // For existing table: read P&M is from the snapshot, new P&M is None. - let (read_protocol, read_metadata) = if is_create { - (None, None) + // For create-table: previous P&M is None (no prior table), new P&M is set. + // For existing table with metadata change: previous P&M is from snapshot, new P&M + // is from effective config. + // For existing table without metadata change: previous P&M is from snapshot, new is None. + let (read_protocol, read_metadata, max_published_version) = if is_create { + (None, None, None) } else { + let snap = self.read_snapshot()?; + let read_config = snap.table_configuration(); ( - Some(table_config.protocol().clone()), - Some(table_config.metadata().clone()), + Some(read_config.protocol().clone()), + Some(read_config.metadata().clone()), + snap.log_segment().listed.max_published_version, ) }; let protocol_metadata = CommitProtocolMetadata::try_new( @@ -626,10 +635,7 @@ impl Transaction { commit_version, commit_type, in_commit_timestamp.unwrap_or(self.commit_timestamp), - self.read_snapshot - .log_segment() - .listed - .max_published_version, + max_published_version, protocol_metadata, domain_metadata_changes, )) @@ -671,15 +677,21 @@ impl Transaction { } /// Returns true if this is a create-table transaction. - /// A create-table transaction has operation "CREATE TABLE" and a pre-commit snapshot - /// with PRE_COMMIT_VERSION. + /// A create-table transaction has no read snapshot (no pre-existing table). fn is_create_table(&self) -> bool { - let is_create = self.operation.as_deref() == Some("CREATE TABLE"); debug_assert!( - !is_create || self.read_snapshot.version() == PRE_COMMIT_VERSION, - "CREATE TABLE transaction must have PRE_COMMIT_VERSION snapshot" + self.operation.as_deref() != Some("CREATE TABLE") || self.read_snapshot_opt.is_none(), + "CREATE TABLE operation should not have a read snapshot" ); - is_create + self.read_snapshot_opt.is_none() + } + + // Returns the read snapshot. Returns an error if this is a create-table transaction. + // To get the `Option` directly, use the `read_snapshot_opt` field. + fn read_snapshot(&self) -> DeltaResult<&Snapshot> { + self.read_snapshot_opt.as_deref().ok_or_else(|| { + Error::internal_error("read_snapshot() called on create-table transaction") + }) } /// Computes the in-commit timestamp for this transaction if ICT is enabled. @@ -688,8 +700,7 @@ impl Transaction { /// property must also be `true` (`is_feature_enabled`). fn get_in_commit_timestamp(&self, engine: &dyn Engine) -> DeltaResult> { let has_ict = self - .read_snapshot - .table_configuration() + .effective_table_config .is_feature_enabled(&TableFeature::InCommitTimestamp); if !has_ict { @@ -706,17 +717,19 @@ impl Transaction { // - The time at which the writer attempted the commit // - One millisecond later than the previous commit's inCommitTimestamp Ok(self - .read_snapshot + .read_snapshot()? .get_in_commit_timestamp(engine)? .map(|prev_ict| self.commit_timestamp.max(prev_ict + 1))) } /// Returns the commit version for this transaction. /// For existing table transactions, this is snapshot.version() + 1. - /// For create-table transactions (PRE_COMMIT_VERSION + 1 wraps to 0), this is 0. + /// For create-table transactions, this is 0. fn get_commit_version(&self) -> Version { - // PRE_COMMIT_VERSION (u64::MAX) + 1 wraps to 0, which is the correct first version - self.read_snapshot.version().wrapping_add(1) + match &self.read_snapshot_opt { + Some(snap) => snap.version() + 1, + None => 0, + } } /// The schema that the [`Engine`]'s [`ParquetHandler`] is expected to use when reporting @@ -769,9 +782,9 @@ impl Transaction { /// settings. #[allow(unused)] pub fn stats_schema(&self) -> DeltaResult { - let tc = self.read_snapshot.table_configuration(); - let stats_schemas = - tc.build_expected_stats_schemas(self.physical_clustering_columns.as_deref(), None)?; + let stats_schemas = self + .effective_table_config + .build_expected_stats_schemas(self.physical_clustering_columns.as_deref(), None)?; Ok(stats_schemas.physical) } @@ -788,23 +801,17 @@ impl Transaction { /// regardless of `dataSkippingStatsColumns` or `dataSkippingNumIndexedCols` settings. #[allow(unused)] pub fn stats_columns(&self) -> Vec { - self.read_snapshot - .table_configuration() + self.effective_table_config .physical_stats_column_names(self.physical_clustering_columns.as_deref()) } // Generate the logical-to-physical transform expression which must be evaluated on every data // chunk before writing. At the moment, this is a transaction-wide expression. fn generate_logical_to_physical(&self) -> Expression { - let partition_cols = self - .read_snapshot - .table_configuration() - .partition_columns() - .to_vec(); + let partition_cols = self.effective_table_config.partition_columns().to_vec(); // Check if materializePartitionColumns feature is enabled let materialize_partition_columns = self - .read_snapshot - .table_configuration() + .effective_table_config .is_feature_enabled(&TableFeature::MaterializePartitionColumns); // Build a Transform expression that drops partition columns from the input // (unless materializePartitionColumns is enabled). @@ -819,16 +826,16 @@ impl Transaction { /// Returns the logical partition column names for this table. pub fn logical_partition_columns(&self) -> &[String] { - self.read_snapshot.table_configuration().partition_columns() + self.effective_table_config.partition_columns() } /// Lazily builds and caches the [`SharedWriteState`] for this transaction. fn shared_write_state(&self) -> &Arc { self.shared_write_state.get_or_init(|| { - let table_config = self.read_snapshot.table_configuration(); + let table_config = &self.effective_table_config; Arc::new(SharedWriteState { - table_root: self.read_snapshot.table_root().clone(), - logical_schema: self.read_snapshot.schema(), + table_root: table_config.table_root().clone(), + logical_schema: table_config.logical_schema(), physical_schema: table_config.physical_write_schema(), logical_to_physical: Arc::new(self.generate_logical_to_physical()), column_mapping_mode: table_config.column_mapping_mode(), @@ -955,7 +962,7 @@ impl Transaction { } if let Some(ref clustering_cols) = self.physical_clustering_columns { if !clustering_cols.is_empty() { - let physical_schema = self.read_snapshot.table_configuration().physical_schema(); + let physical_schema = self.effective_table_config.physical_schema(); let columns_with_types: Vec<(ColumnName, DataType)> = clustering_cols .iter() .map(|col| { @@ -990,10 +997,7 @@ impl Transaction { )> { // Note: this does not require delta.enableRowTracking=true. "supported" is sufficient // for writers to assign row IDs. - let row_tracking_supported = self - .read_snapshot - .table_configuration() - .should_write_row_tracking(); + let row_tracking_supported = self.effective_table_config.should_write_row_tracking(); if self.add_files_metadata.is_empty() { // No files to add. For an empty CREATE TABLE with row tracking, emit the initial @@ -1040,7 +1044,7 @@ impl Transaction { let row_id_high_water_mark = if self.is_create_table() { None } else { - RowTrackingDomainMetadata::get_high_water_mark(&self.read_snapshot, engine)? + RowTrackingDomainMetadata::get_high_water_mark(self.read_snapshot()?, engine)? }; // Create a row tracking visitor and visit all files to collect row tracking information @@ -1101,23 +1105,46 @@ impl Transaction { let commit_version = parsed_commit.version; - let post_commit_stats = PostCommitStats { - commits_since_checkpoint: self.read_snapshot.log_segment().commits_since_checkpoint() - + 1, - commits_since_log_compaction: self - .read_snapshot - .log_segment() - .commits_since_log_compaction_or_checkpoint() - + 1, + let (post_commit_stats, post_commit_snapshot) = match &self.read_snapshot_opt { + Some(snap) => { + // Existing table path: use the read snapshot to compute post-commit state. + let stats = PostCommitStats { + commits_since_checkpoint: snap.log_segment().commits_since_checkpoint() + 1, + commits_since_log_compaction: snap + .log_segment() + .commits_since_log_compaction_or_checkpoint() + + 1, + }; + let snapshot = snap.new_post_commit(parsed_commit, crc_delta)?; + (stats, Arc::new(snapshot)) + } + None => { + // CREATE TABLE path: build a fresh Snapshot at version 0. + let log_root = self + .effective_table_config + .table_root() + .join("_delta_log/")?; + let log_segment = LogSegment::new_for_version_zero(log_root, parsed_commit)?; + let crc = crc_delta.into_crc_for_version_zero().ok_or_else(|| { + Error::internal_error("CREATE TABLE CRC delta is missing protocol or metadata") + })?; + let stats = PostCommitStats { + commits_since_checkpoint: 1, + commits_since_log_compaction: 1, + }; + let snapshot = Snapshot::new_with_crc( + log_segment, + self.effective_table_config, + Arc::new(LazyCrc::new_precomputed(crc, 0)), + ); + (stats, Arc::new(snapshot)) + } }; Ok(CommittedTransaction { commit_version, post_commit_stats, - post_commit_snapshot: Some(Arc::new( - self.read_snapshot - .new_post_commit(parsed_commit, crc_delta)?, - )), + post_commit_snapshot: Some(post_commit_snapshot), }) } @@ -1133,13 +1160,14 @@ impl Transaction { &self.remove_files_metadata, bin_boundaries, )?; - let is_create = self.is_create_table(); Ok(CrcDelta { file_stats, - protocol: is_create - .then(|| self.read_snapshot.table_configuration().protocol().clone()), - metadata: is_create - .then(|| self.read_snapshot.table_configuration().metadata().clone()), + protocol: self + .should_emit_protocol + .then(|| self.effective_table_config.protocol().clone()), + metadata: self + .should_emit_metadata + .then(|| self.effective_table_config.metadata().clone()), domain_metadata_changes: dm_changes, set_transaction_changes: self.set_transactions.clone(), in_commit_timestamp, diff --git a/kernel/src/transaction/update.rs b/kernel/src/transaction/update.rs index 4354a08ec2..7cd14ba674 100644 --- a/kernel/src/transaction/update.rs +++ b/kernel/src/transaction/update.rs @@ -71,9 +71,14 @@ impl Transaction { read_version = read_snapshot.version(), ); + let effective_table_config = read_snapshot.table_configuration().clone(); + Ok(Transaction { span, - read_snapshot, + read_snapshot_opt: Some(read_snapshot), + effective_table_config, + should_emit_protocol: false, + should_emit_metadata: false, committer, operation: None, engine_info: None, @@ -256,8 +261,7 @@ impl Transaction { )); } if !self - .read_snapshot - .table_configuration() + .effective_table_config .is_feature_supported(&TableFeature::DeletionVectors) { return Err(Error::unsupported( diff --git a/kernel/tests/write.rs b/kernel/tests/write.rs index 9a0773340c..39a98ab6c4 100644 --- a/kernel/tests/write.rs +++ b/kernel/tests/write.rs @@ -3340,6 +3340,12 @@ async fn test_post_commit_snapshot_create_then_insert() -> DeltaResult<()> { let mut current_snapshot = match create_result { CommitResult::CommittedTransaction(committed) => { assert_eq!(committed.commit_version(), 0); + // CREATE TABLE is the first commit: 1 commit since last checkpoint/compaction + assert_eq!(committed.post_commit_stats().commits_since_checkpoint, 1); + assert_eq!( + committed.post_commit_stats().commits_since_log_compaction, + 1 + ); let post_snapshot = committed .post_commit_snapshot() .expect("should have post_commit_snapshot");