Skip to content

Commit b293bf7

Browse files
refactor: separate read state from effective state in Transaction
Split the Transaction struct's read_snapshot field into two concerns: - read_snapshot: Option<SnapshotRef> (None for CREATE TABLE) - effective_table_config: TableConfiguration (always the config this commit produces) - should_emit_protocol/should_emit_metadata flags for P&M action emission This cleanly separates "what version did I read?" (for conflict detection, post-commit snapshots, ICT monotonicity) from "what schema/protocol/metadata will this commit produce?" (for write context, stats, feature checks). Key changes: - 17 write-path call sites migrated to effective_table_config - 9 read-path call sites use read_snapshot() helper - Protocol/Metadata emission is now flag-based instead of is_create_table() - CREATE TABLE no longer creates a synthetic pre-commit snapshot - LogSegment::new_for_version_zero replaces LogSegment::for_pre_commit - TableConfiguration::new_post_commit now accepts optional new P&M - CRC delta uses emit flags instead of is_create check This is a pure refactor with no behavior change. All existing tests pass.
1 parent ed6b22f commit b293bf7

11 files changed

Lines changed: 246 additions & 191 deletions

File tree

kernel/src/actions/mod.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -243,9 +243,7 @@ impl Metadata {
243243
/// # Errors
244244
///
245245
/// Returns an error if there are any metadata columns in the schema.
246-
// TODO: remove allow(dead_code) after we use this API in CREATE TABLE, etc.
247246
#[internal_api]
248-
#[allow(dead_code)]
249247
pub(crate) fn try_new(
250248
name: Option<String>,
251249
description: Option<String>,

kernel/src/lib.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -194,10 +194,6 @@ pub mod engine;
194194
/// Delta table version is 8 byte unsigned int
195195
pub type Version = u64;
196196

197-
/// Sentinel version indicating a pre-commit state (table does not exist yet).
198-
/// Used for create-table transactions before the first commit.
199-
pub const PRE_COMMIT_VERSION: Version = u64::MAX;
200-
201197
pub type FileSize = u64;
202198
pub type FileIndex = u64;
203199

kernel/src/log_segment.rs

Lines changed: 30 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use crate::schema::{DataType, SchemaRef, StructField, StructType, ToSchema as _}
2222
use crate::utils::require;
2323
use crate::{
2424
DeltaResult, Engine, Error, Expression, FileMeta, Predicate, PredicateRef, RowVisitor,
25-
StorageHandler, Version, PRE_COMMIT_VERSION,
25+
StorageHandler, Version,
2626
};
2727
use delta_kernel_derive::internal_api;
2828

@@ -155,20 +155,38 @@ fn schema_to_is_not_null_predicate(schema: &StructType) -> Option<PredicateRef>
155155
}
156156

157157
impl LogSegment {
158-
/// Creates a synthetic LogSegment for pre-commit transactions (e.g., create-table).
159-
/// The sentinel version PRE_COMMIT_VERSION indicates no version exists yet on disk.
160-
/// This is used to construct a pre-commit snapshot that provides table configuration
161-
/// (protocol, metadata, schema) for operations like CTAS.
162-
#[allow(dead_code)] // Used by create_table module
163-
pub(crate) fn for_pre_commit(log_root: Url) -> Self {
164-
use crate::PRE_COMMIT_VERSION;
165-
Self {
166-
end_version: PRE_COMMIT_VERSION,
158+
/// Creates a LogSegment for a newly created table at version 0.
159+
/// The `commit_file` is the parsed commit file for version 0.
160+
pub(crate) fn new_for_version_zero(
161+
log_root: Url,
162+
commit_file: ParsedLogPath,
163+
) -> DeltaResult<Self> {
164+
require!(
165+
commit_file.version == 0,
166+
crate::Error::internal_error(format!(
167+
"new_for_version_zero called with version {}",
168+
commit_file.version
169+
))
170+
);
171+
require!(
172+
commit_file.is_commit(),
173+
crate::Error::internal_error(format!(
174+
"new_for_version_zero called with non-commit file type: {:?}",
175+
commit_file.file_type
176+
))
177+
);
178+
Ok(Self {
179+
end_version: commit_file.version,
167180
checkpoint_version: None,
168181
log_root,
169182
last_checkpoint_metadata: None,
170-
listed: LogSegmentFiles::default(),
171-
}
183+
listed: LogSegmentFiles {
184+
max_published_version: Some(commit_file.version),
185+
latest_commit_file: Some(commit_file.clone()),
186+
ascending_commit_files: vec![commit_file],
187+
..Default::default()
188+
},
189+
})
172190
}
173191

174192
#[internal_api]
@@ -1100,11 +1118,7 @@ impl LogSegment {
11001118
}
11011119

11021120
/// How many commits since a checkpoint, according to this log segment.
1103-
/// Returns 0 for pre-commit snapshots (where end_version is PRE_COMMIT_VERSION).
11041121
pub(crate) fn commits_since_checkpoint(&self) -> u64 {
1105-
if self.end_version == PRE_COMMIT_VERSION {
1106-
return 0;
1107-
}
11081122
// we can use 0 as the checkpoint version if there is no checkpoint since `end_version - 0`
11091123
// is the correct number of commits since a checkpoint if there are no checkpoints
11101124
let checkpoint_version = self.checkpoint_version.unwrap_or(0);
@@ -1113,11 +1127,7 @@ impl LogSegment {
11131127
}
11141128

11151129
/// How many commits since a log-compaction or checkpoint, according to this log segment.
1116-
/// Returns 0 for pre-commit snapshots (where end_version is PRE_COMMIT_VERSION).
11171130
pub(crate) fn commits_since_log_compaction_or_checkpoint(&self) -> u64 {
1118-
if self.end_version == PRE_COMMIT_VERSION {
1119-
return 0;
1120-
}
11211131
// Annoyingly we have to search all the compaction files to determine this, because we only
11221132
// sort by start version, so technically the max end version could be anywhere in the vec.
11231133
// We can return 0 in the case there is no compaction since end_version - 0 is the correct

kernel/src/log_segment/tests.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4484,3 +4484,29 @@ async fn read_actions_with_null_map_values(
44844484
}
44854485
assert!(found, "Should have found a {action_name} action batch");
44864486
}
4487+
4488+
#[test]
4489+
fn new_for_version_zero_creates_valid_log_segment() {
4490+
let log_root = Url::parse("memory:///_delta_log/").unwrap();
4491+
let commit_path = create_log_path("memory:///_delta_log/00000000000000000000.json");
4492+
let segment = super::LogSegment::new_for_version_zero(log_root.clone(), commit_path).unwrap();
4493+
assert_eq!(segment.end_version, 0);
4494+
assert_eq!(segment.log_root, log_root);
4495+
}
4496+
4497+
#[test]
4498+
fn new_for_version_zero_rejects_non_zero_version() {
4499+
let log_root = Url::parse("memory:///_delta_log/").unwrap();
4500+
let commit_path = create_log_path("memory:///_delta_log/00000000000000000001.json");
4501+
let err = super::LogSegment::new_for_version_zero(log_root, commit_path).unwrap_err();
4502+
assert!(err.to_string().contains("version"));
4503+
}
4504+
4505+
#[test]
4506+
fn new_for_version_zero_rejects_non_commit_file() {
4507+
let log_root = Url::parse("memory:///_delta_log/").unwrap();
4508+
let checkpoint_path =
4509+
create_log_path("memory:///_delta_log/00000000000000000000.checkpoint.parquet");
4510+
let err = super::LogSegment::new_for_version_zero(log_root, checkpoint_path).unwrap_err();
4511+
assert!(err.to_string().contains("non-commit"));
4512+
}

kernel/src/snapshot.rs

Lines changed: 22 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -408,15 +408,12 @@ impl Snapshot {
408408
/// producing a post-commit snapshot without a full log replay from storage.
409409
///
410410
/// The `crc_delta` captures the CRC-relevant changes from the committed transaction
411-
/// (file stats, domain metadata, ICT, etc.). If the pre-commit snapshot had a loaded CRC
412-
/// at its version, the delta is applied to produce a precomputed in-memory CRC for the new
413-
/// version -- this CRC contains all important table metadata (protocol, metadata, domain
414-
/// metadata, set transactions, ICT) and avoids re-reading them from storage. CREATE TABLE
415-
/// always produces a CRC at v0. If no CRC was available on the pre-commit snapshot, the
416-
/// existing lazy CRC is carried forward unchanged.
411+
/// (file stats, domain metadata, ICT, etc.). If this snapshot had a loaded CRC at its
412+
/// version, the delta is applied to produce a precomputed in-memory CRC for the new
413+
/// version -- this avoids re-reading metadata from storage. If no CRC was available, the
414+
/// existing lazy CRC is carried forward unchanged. CREATE TABLE handles CRC construction
415+
/// separately in `Transaction::into_committed`.
417416
///
418-
/// TODO: Handle Protocol changes in CrcDelta (when Kernel-RS supports protocol changes)
419-
/// TODO: Handle Metadata changes in CrcDelta (when Kernel-RS supports metadata changes)
420417
pub(crate) fn new_post_commit(
421418
&self,
422419
commit: ParsedLogPath,
@@ -440,8 +437,12 @@ impl Snapshot {
440437
))
441438
);
442439

443-
let new_table_configuration =
444-
TableConfiguration::new_post_commit(self.table_configuration(), new_version);
440+
let new_table_configuration = TableConfiguration::new_post_commit(
441+
self.table_configuration(),
442+
new_version,
443+
crc_delta.metadata.clone(),
444+
crc_delta.protocol.clone(),
445+
)?;
445446

446447
let new_log_segment = self.log_segment.new_with_commit_appended(commit)?;
447448

@@ -456,20 +457,18 @@ impl Snapshot {
456457

457458
/// Compute the lazy CRC for a post-commit snapshot by applying a [`CrcDelta`].
458459
///
459-
/// For CREATE TABLE, builds a fresh CRC from the `crc_delta`. For existing tables, applies
460-
/// the `crc_delta` to the current CRC if loaded, otherwise carries forward the existing lazy CRC.
460+
/// Applies the `crc_delta` to the current CRC if loaded, otherwise carries forward the
461+
/// existing lazy CRC. Not used by CREATE TABLE, which builds its CRC from scratch via
462+
/// `CrcDelta::into_crc_for_version_zero` in `Transaction::into_committed`.
461463
fn compute_post_commit_crc(&self, new_version: Version, crc_delta: CrcDelta) -> Arc<LazyCrc> {
462-
let crc = if self.version() == crate::PRE_COMMIT_VERSION {
463-
crc_delta.into_crc_for_version_zero()
464-
} else {
465-
self.lazy_crc
466-
.get_if_loaded_at_version(self.version())
467-
.map(|base| {
468-
let mut crc = base.as_ref().clone();
469-
crc.apply(crc_delta);
470-
crc
471-
})
472-
};
464+
let crc = self
465+
.lazy_crc
466+
.get_if_loaded_at_version(self.version())
467+
.map(|base| {
468+
let mut crc = base.as_ref().clone();
469+
crc.apply(crc_delta);
470+
crc
471+
});
473472

474473
match crc {
475474
Some(c) => Arc::new(LazyCrc::new_precomputed(c, new_version)),

kernel/src/table_configuration.rs

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -202,17 +202,18 @@ impl TableConfiguration {
202202
/// Creates a new [`TableConfiguration`] representing the table configuration immediately
203203
/// after a commit.
204204
///
205-
/// This method takes a pre-commit table configuration and produces a post-commit
206-
/// configuration at the committed version. This allows immediate use of the new table
207-
/// configuration without re-reading metadata from storage.
208-
///
209-
/// TODO: Take in Protocol (when Kernel-RS supports protocol changes)
210-
/// TODO: Take in Metadata (when Kernel-RS supports metadata changes)
211-
pub(crate) fn new_post_commit(table_configuration: &Self, new_version: Version) -> Self {
212-
Self {
213-
version: new_version,
214-
..table_configuration.clone()
215-
}
205+
/// This method takes the current table configuration and produces a post-commit
206+
/// configuration at the committed version. If the commit included new Protocol or Metadata
207+
/// actions (e.g. CREATE TABLE or ALTER TABLE), those are passed in and the configuration
208+
/// is rebuilt with full validation. Otherwise the existing configuration is cloned with
209+
/// only the version updated.
210+
pub(crate) fn new_post_commit(
211+
table_configuration: &Self,
212+
new_version: Version,
213+
new_metadata: Option<Metadata>,
214+
new_protocol: Option<Protocol>,
215+
) -> DeltaResult<Self> {
216+
Self::try_new_from(table_configuration, new_metadata, new_protocol, new_version)
216217
}
217218

218219
/// Generates the expected schema for file statistics.

kernel/src/transaction/builder/create_table.rs

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,8 @@ use crate::actions::{DomainMetadata, Metadata, Protocol};
1616
use crate::clustering::{create_clustering_domain_metadata, validate_clustering_columns};
1717
use crate::committer::Committer;
1818
use crate::expressions::ColumnName;
19-
use crate::log_segment::LogSegment;
2019
use crate::schema::variant_utils::schema_contains_variant_type;
2120
use crate::schema::{normalize_column_names_to_schema_casing, DataType, SchemaRef, StructType};
22-
use crate::snapshot::Snapshot;
2321
use crate::table_configuration::TableConfiguration;
2422
use crate::table_features::{
2523
assign_column_mapping_metadata, get_any_level_column_physical_name,
@@ -37,7 +35,7 @@ use crate::transaction::create_table::CreateTableTransaction;
3735
use crate::transaction::data_layout::DataLayout;
3836
use crate::transaction::Transaction;
3937
use crate::utils::{current_time_ms, try_parse_uri};
40-
use crate::{DeltaResult, Engine, Error, StorageHandler, PRE_COMMIT_VERSION};
38+
use crate::{DeltaResult, Engine, Error, StorageHandler};
4139

4240
/// Table features allowed to be enabled via `delta.feature.*=supported` during CREATE TABLE.
4341
///
@@ -752,15 +750,12 @@ impl CreateTableTransactionBuilder {
752750
validated.properties,
753751
)?;
754752

755-
// Create pre-commit snapshot from protocol/metadata
756-
let log_root = table_url.join("_delta_log/")?;
757-
let log_segment = LogSegment::for_pre_commit(log_root);
758-
let table_configuration =
759-
TableConfiguration::try_new(metadata, protocol, table_url, PRE_COMMIT_VERSION)?;
753+
// Build TableConfiguration directly for the new table
754+
let table_configuration = TableConfiguration::try_new(metadata, protocol, table_url, 0)?;
760755

761-
// Create Transaction<CreateTable> with pre-commit snapshot
756+
// Create Transaction<CreateTable> with the effective table configuration
762757
Transaction::try_new_create_table(
763-
Arc::new(Snapshot::new(log_segment, table_configuration)),
758+
table_configuration,
764759
self.engine_info,
765760
committer,
766761
data_layout_result.system_domain_metadata,

kernel/src/transaction/create_table.rs

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ use crate::actions::DomainMetadata;
3838
use crate::committer::Committer;
3939
use crate::expressions::ColumnName;
4040
use crate::schema::SchemaRef;
41-
use crate::snapshot::SnapshotRef;
41+
use crate::table_configuration::TableConfiguration;
4242
use crate::transaction::{CreateTable, Transaction};
4343
use crate::utils::current_time_ms;
4444
use crate::DeltaResult;
@@ -134,31 +134,29 @@ impl CreateTableTransaction {
134134
/// Create a new transaction for creating a new table. This is used when the table doesn't
135135
/// exist yet and we need to create it with Protocol and Metadata actions.
136136
///
137-
/// The `pre_commit_snapshot` is a synthetic snapshot created from the protocol and metadata
138-
/// that will be committed. It uses `PRE_COMMIT_VERSION` as a sentinel to indicate no
139-
/// version exists yet on disk.
137+
/// The `effective_table_config` is the table configuration that will be committed (protocol,
138+
/// metadata, schema).
140139
///
141140
/// This is typically called via `CreateTableTransactionBuilder::build()` rather than directly.
142141
pub(crate) fn try_new_create_table(
143-
pre_commit_snapshot: SnapshotRef,
142+
effective_table_config: TableConfiguration,
144143
engine_info: String,
145144
committer: Box<dyn Committer>,
146145
system_domain_metadata: Vec<DomainMetadata>,
147146
clustering_columns: Option<Vec<ColumnName>>,
148147
) -> DeltaResult<Self> {
149-
// TODO(sanuj) Today transactions expect a read snapshot to be passed in and we pass
150-
// in the pre_commit_snapshot for CREATE. To support other operations such as ALTERs
151-
// there might be cleaner alternatives which can clearly disambiguate b/w a snapshot
152-
// the was read vs the effective snapshot we will use for the commit.
153148
let span = tracing::info_span!(
154149
"txn",
155-
path = %pre_commit_snapshot.table_root(),
150+
path = %effective_table_config.table_root(),
156151
operation = "CREATE",
157152
);
158153

159154
Ok(Transaction {
160155
span,
161-
read_snapshot: pre_commit_snapshot,
156+
read_snapshot_opt: None,
157+
effective_table_config,
158+
should_emit_protocol: true,
159+
should_emit_metadata: true,
162160
committer,
163161
operation: Some("CREATE TABLE".to_string()),
164162
engine_info: Some(engine_info),

kernel/src/transaction/domain_metadata.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,7 @@ impl<S> Transaction<S> {
2929
}
3030

3131
if !self
32-
.read_snapshot
33-
.table_configuration()
32+
.effective_table_config
3433
.is_feature_supported(&TableFeature::DomainMetadata)
3534
{
3635
return Err(Error::unsupported(
@@ -114,7 +113,7 @@ impl<S> Transaction<S> {
114113
/// This prevents arbitrary `delta.*` domains from being added during table creation.
115114
/// Each known system domain must have its corresponding feature enabled in the protocol.
116115
fn validate_system_domain_feature(&self, domain: &str) -> DeltaResult<()> {
117-
let table_config = self.read_snapshot.table_configuration();
116+
let table_config = &self.effective_table_config;
118117

119118
// Map domain to its required feature
120119
let required_feature = match domain {
@@ -162,7 +161,7 @@ impl<S> Transaction<S> {
162161
.map(String::as_str)
163162
.collect();
164163
let existing_domains = self
165-
.read_snapshot
164+
.read_snapshot()?
166165
.get_domain_metadatas_internal(engine, Some(&domains))?;
167166

168167
// Create removal tombstones with pre-image configurations

0 commit comments

Comments
 (0)