Skip to content

Commit 029d667

Browse files
refactor: separate read state from effective state in Transaction
1 parent 76eebdb commit 029d667

12 files changed

Lines changed: 263 additions & 197 deletions

File tree

kernel/src/actions/mod.rs

Lines changed: 0 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -242,9 +242,7 @@ impl Metadata {
242242
/// # Errors
243243
///
244244
/// Returns an error if there are any metadata columns in the schema.
245-
// TODO: remove allow(dead_code) after we use this API in CREATE TABLE, etc.
246245
#[internal_api]
247-
#[allow(dead_code)]
248246
pub(crate) fn try_new(
249247
name: Option<String>,
250248
description: Option<String>,

kernel/src/lib.rs

Lines changed: 0 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -198,10 +198,6 @@ pub mod engine;
198198
/// Delta table version is 8 byte unsigned int
199199
pub type Version = u64;
200200

201-
/// Sentinel version indicating a pre-commit state (table does not exist yet).
202-
/// Used for create-table transactions before the first commit.
203-
pub const PRE_COMMIT_VERSION: Version = u64::MAX;
204-
205201
pub type FileSize = u64;
206202
pub type FileIndex = u64;
207203

kernel/src/log_segment/mod.rs

Lines changed: 37 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ use crate::schema::{DataType, SchemaRef, StructField, StructType, ToSchema as _}
2929
use crate::utils::require;
3030
use crate::{
3131
DeltaResult, Engine, Error, Expression, FileMeta, Predicate, PredicateRef, RowVisitor,
32-
StorageHandler, Version, PRE_COMMIT_VERSION,
32+
StorageHandler, Version,
3333
};
3434

3535
mod domain_metadata_replay;
@@ -153,20 +153,45 @@ fn schema_to_is_not_null_predicate(schema: &StructType) -> Option<PredicateRef>
153153
}
154154

155155
impl LogSegment {
156-
/// Creates a synthetic LogSegment for pre-commit transactions (e.g., create-table).
157-
/// The sentinel version PRE_COMMIT_VERSION indicates no version exists yet on disk.
158-
/// This is used to construct a pre-commit snapshot that provides table configuration
159-
/// (protocol, metadata, schema) for operations like CTAS.
160-
#[allow(dead_code)] // Used by create_table module
161-
pub(crate) fn for_pre_commit(log_root: Url) -> Self {
162-
use crate::PRE_COMMIT_VERSION;
163-
Self {
164-
end_version: PRE_COMMIT_VERSION,
156+
/// Creates a LogSegment for a newly created table at version 0 from a single commit file.
157+
///
158+
/// Normal log segments are built by listing files from storage and replaying them. For CREATE
159+
/// TABLE, the table has no prior log. We construct the segment directly from the just-created
160+
/// commit file.
161+
///
162+
/// # Errors
163+
///
164+
/// Returns an `internal_error` if `commit_file` is not version 0 or not a commit file type.
165+
pub(crate) fn new_for_version_zero(
166+
log_root: Url,
167+
commit_file: ParsedLogPath,
168+
) -> DeltaResult<Self> {
169+
require!(
170+
commit_file.version == 0,
171+
crate::Error::internal_error(format!(
172+
"new_for_version_zero called with version {}",
173+
commit_file.version
174+
))
175+
);
176+
require!(
177+
commit_file.is_commit(),
178+
crate::Error::internal_error(format!(
179+
"new_for_version_zero called with non-commit file type: {:?}",
180+
commit_file.file_type
181+
))
182+
);
183+
Ok(Self {
184+
end_version: commit_file.version,
165185
checkpoint_version: None,
166186
log_root,
167187
last_checkpoint_metadata: None,
168-
listed: LogSegmentFiles::default(),
169-
}
188+
listed: LogSegmentFiles {
189+
max_published_version: Some(commit_file.version),
190+
latest_commit_file: Some(commit_file.clone()),
191+
ascending_commit_files: vec![commit_file],
192+
..Default::default()
193+
},
194+
})
170195
}
171196

172197
#[internal_api]
@@ -1104,11 +1129,7 @@ impl LogSegment {
11041129
}
11051130

11061131
/// How many commits since a checkpoint, according to this log segment.
1107-
/// Returns 0 for pre-commit snapshots (where end_version is PRE_COMMIT_VERSION).
11081132
pub(crate) fn commits_since_checkpoint(&self) -> u64 {
1109-
if self.end_version == PRE_COMMIT_VERSION {
1110-
return 0;
1111-
}
11121133
// we can use 0 as the checkpoint version if there is no checkpoint since `end_version - 0`
11131134
// is the correct number of commits since a checkpoint if there are no checkpoints
11141135
let checkpoint_version = self.checkpoint_version.unwrap_or(0);
@@ -1117,11 +1138,7 @@ impl LogSegment {
11171138
}
11181139

11191140
/// How many commits since a log-compaction or checkpoint, according to this log segment.
1120-
/// Returns 0 for pre-commit snapshots (where end_version is PRE_COMMIT_VERSION).
11211141
pub(crate) fn commits_since_log_compaction_or_checkpoint(&self) -> u64 {
1122-
if self.end_version == PRE_COMMIT_VERSION {
1123-
return 0;
1124-
}
11251142
// Annoyingly we have to search all the compaction files to determine this, because we only
11261143
// sort by start version, so technically the max end version could be anywhere in the vec.
11271144
// We can return 0 in the case there is no compaction since end_version - 0 is the correct

kernel/src/log_segment/tests.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4485,3 +4485,29 @@ async fn read_actions_with_null_map_values(
44854485
}
44864486
assert!(found, "Should have found a {action_name} action batch");
44874487
}
4488+
4489+
#[test]
4490+
fn new_for_version_zero_creates_valid_log_segment() {
4491+
let log_root = Url::parse("memory:///_delta_log/").unwrap();
4492+
let commit_path = create_log_path("memory:///_delta_log/00000000000000000000.json");
4493+
let segment = super::LogSegment::new_for_version_zero(log_root.clone(), commit_path).unwrap();
4494+
assert_eq!(segment.end_version, 0);
4495+
assert_eq!(segment.log_root, log_root);
4496+
}
4497+
4498+
#[test]
4499+
fn new_for_version_zero_rejects_non_zero_version() {
4500+
let log_root = Url::parse("memory:///_delta_log/").unwrap();
4501+
let commit_path = create_log_path("memory:///_delta_log/00000000000000000001.json");
4502+
let err = super::LogSegment::new_for_version_zero(log_root, commit_path).unwrap_err();
4503+
assert!(err.to_string().contains("version"));
4504+
}
4505+
4506+
#[test]
4507+
fn new_for_version_zero_rejects_non_commit_file() {
4508+
let log_root = Url::parse("memory:///_delta_log/").unwrap();
4509+
let checkpoint_path =
4510+
create_log_path("memory:///_delta_log/00000000000000000000.checkpoint.parquet");
4511+
let err = super::LogSegment::new_for_version_zero(log_root, checkpoint_path).unwrap_err();
4512+
assert!(err.to_string().contains("non-commit"));
4513+
}

kernel/src/snapshot/mod.rs

Lines changed: 27 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -129,16 +129,6 @@ impl Snapshot {
129129
SnapshotBuilder::new_from(existing_snapshot)
130130
}
131131

132-
/// Create a new [`Snapshot`] from a [`LogSegment`] and [`TableConfiguration`].
133-
#[internal_api]
134-
pub(crate) fn new(log_segment: LogSegment, table_configuration: TableConfiguration) -> Self {
135-
Self::new_with_crc(
136-
log_segment,
137-
table_configuration,
138-
Arc::new(LazyCrc::new(None)),
139-
)
140-
}
141-
142132
/// Internal constructor that accepts an explicit [`LazyCrc`].
143133
pub(crate) fn new_with_crc(
144134
log_segment: LogSegment,
@@ -400,15 +390,11 @@ impl Snapshot {
400390
/// producing a post-commit snapshot without a full log replay from storage.
401391
///
402392
/// The `crc_delta` captures the CRC-relevant changes from the committed transaction
403-
/// (file stats, domain metadata, ICT, etc.). If the pre-commit snapshot had a loaded CRC
404-
/// at its version, the delta is applied to produce a precomputed in-memory CRC for the new
405-
/// version -- this CRC contains all important table metadata (protocol, metadata, domain
406-
/// metadata, set transactions, ICT) and avoids re-reading them from storage. CREATE TABLE
407-
/// always produces a CRC at v0. If no CRC was available on the pre-commit snapshot, the
408-
/// existing lazy CRC is carried forward unchanged.
409-
///
410-
/// TODO: Handle Protocol changes in CrcDelta (when Kernel-RS supports protocol changes)
411-
/// TODO: Handle Metadata changes in CrcDelta (when Kernel-RS supports metadata changes)
393+
/// (file stats, domain metadata, ICT, etc.). If this snapshot had a loaded CRC at its
394+
/// version, the delta is applied to produce a precomputed in-memory CRC for the new
395+
/// version -- this avoids re-reading metadata from storage. If no CRC was available, the
396+
/// existing lazy CRC is carried forward unchanged. CREATE TABLE handles CRC construction
397+
/// separately in `Transaction::into_committed`.
412398
pub(crate) fn new_post_commit(
413399
&self,
414400
commit: ParsedLogPath,
@@ -432,8 +418,12 @@ impl Snapshot {
432418
))
433419
);
434420

435-
let new_table_configuration =
436-
TableConfiguration::new_post_commit(self.table_configuration(), new_version);
421+
let new_table_configuration = TableConfiguration::new_post_commit(
422+
self.table_configuration(),
423+
new_version,
424+
crc_delta.metadata.clone(),
425+
crc_delta.protocol.clone(),
426+
)?;
437427

438428
let new_log_segment = self.log_segment.new_with_commit_appended(commit)?;
439429

@@ -448,21 +438,18 @@ impl Snapshot {
448438

449439
/// Compute the lazy CRC for a post-commit snapshot by applying a [`CrcDelta`].
450440
///
451-
/// For CREATE TABLE, builds a fresh CRC from the `crc_delta`. For existing tables, applies
452-
/// the `crc_delta` to the current CRC if loaded, otherwise carries forward the existing lazy
453-
/// CRC.
441+
/// Applies the `crc_delta` to the current CRC if loaded, otherwise carries forward the
442+
/// existing lazy CRC. Not used by CREATE TABLE, which builds its CRC from scratch via
443+
/// `CrcDelta::into_crc_for_version_zero` in `Transaction::into_committed`.
454444
fn compute_post_commit_crc(&self, new_version: Version, crc_delta: CrcDelta) -> Arc<LazyCrc> {
455-
let crc = if self.version() == crate::PRE_COMMIT_VERSION {
456-
crc_delta.into_crc_for_version_zero()
457-
} else {
458-
self.lazy_crc
459-
.get_if_loaded_at_version(self.version())
460-
.map(|base| {
461-
let mut crc = base.as_ref().clone();
462-
crc.apply(crc_delta);
463-
crc
464-
})
465-
};
445+
let crc = self
446+
.lazy_crc
447+
.get_if_loaded_at_version(self.version())
448+
.map(|base| {
449+
let mut crc = base.as_ref().clone();
450+
crc.apply(crc_delta);
451+
crc
452+
});
466453

467454
match crc {
468455
Some(c) => Arc::new(LazyCrc::new_precomputed(c, new_version)),
@@ -1297,7 +1284,11 @@ mod tests {
12971284
let log_segment =
12981285
LogSegment::try_new(listed_files, url.join("_delta_log/")?, Some(0), None)?;
12991286

1300-
Ok(Snapshot::new(log_segment, table_cfg))
1287+
Ok(Snapshot::new_with_crc(
1288+
log_segment,
1289+
table_cfg,
1290+
Arc::new(LazyCrc::new(None)),
1291+
))
13011292
}
13021293

13031294
#[test]

kernel/src/table_configuration.rs

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -202,17 +202,25 @@ impl TableConfiguration {
202202
/// Creates a new [`TableConfiguration`] representing the table configuration immediately
203203
/// after a commit.
204204
///
205-
/// This method takes a pre-commit table configuration and produces a post-commit
206-
/// configuration at the committed version. This allows immediate use of the new table
207-
/// configuration without re-reading metadata from storage.
205+
/// This method takes the current table configuration and produces a post-commit
206+
/// configuration at the committed version. If the commit included new Protocol or Metadata
207+
/// actions (e.g. ALTER TABLE), those are passed in and the configuration is rebuilt with
208+
/// full validation. Otherwise the existing configuration is cloned with only the version
209+
/// updated.
208210
///
209-
/// TODO: Take in Protocol (when Kernel-RS supports protocol changes)
210-
/// TODO: Take in Metadata (when Kernel-RS supports metadata changes)
211-
pub(crate) fn new_post_commit(table_configuration: &Self, new_version: Version) -> Self {
212-
Self {
213-
version: new_version,
214-
..table_configuration.clone()
215-
}
211+
/// Returns the new [`TableConfiguration`] at `new_version`.
212+
///
213+
/// # Errors
214+
///
215+
/// Returns an error if the new metadata/protocol combination fails
216+
/// [`TableConfiguration::try_new`] validation (e.g., unsupported features, invalid schema).
217+
pub(crate) fn new_post_commit(
218+
table_configuration: &Self,
219+
new_version: Version,
220+
new_metadata: Option<Metadata>,
221+
new_protocol: Option<Protocol>,
222+
) -> DeltaResult<Self> {
223+
Self::try_new_from(table_configuration, new_metadata, new_protocol, new_version)
216224
}
217225

218226
/// Generates the expected schema for file statistics.

kernel/src/transaction/builder/create_table.rs

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,9 @@ use crate::actions::{DomainMetadata, Metadata, Protocol};
1717
use crate::clustering::{create_clustering_domain_metadata, validate_clustering_columns};
1818
use crate::committer::Committer;
1919
use crate::expressions::ColumnName;
20-
use crate::log_segment::LogSegment;
2120
use crate::schema::validation::validate_schema_for_create;
2221
use crate::schema::variant_utils::schema_contains_variant_type;
2322
use crate::schema::{normalize_column_names_to_schema_casing, DataType, SchemaRef, StructType};
24-
use crate::snapshot::Snapshot;
2523
use crate::table_configuration::TableConfiguration;
2624
use crate::table_features::{
2725
assign_column_mapping_metadata, get_any_level_column_physical_name,
@@ -40,7 +38,7 @@ use crate::transaction::create_table::CreateTableTransaction;
4038
use crate::transaction::data_layout::DataLayout;
4139
use crate::transaction::Transaction;
4240
use crate::utils::{current_time_ms, try_parse_uri};
43-
use crate::{DeltaResult, Engine, Error, StorageHandler, PRE_COMMIT_VERSION};
41+
use crate::{DeltaResult, Engine, Error, StorageHandler};
4442

4543
/// Table features allowed to be enabled via `delta.feature.*=supported` during CREATE TABLE.
4644
///
@@ -815,15 +813,12 @@ impl CreateTableTransactionBuilder {
815813
validated.properties,
816814
)?;
817815

818-
// Create pre-commit snapshot from protocol/metadata
819-
let log_root = table_url.join("_delta_log/")?;
820-
let log_segment = LogSegment::for_pre_commit(log_root);
821-
let table_configuration =
822-
TableConfiguration::try_new(metadata, protocol, table_url, PRE_COMMIT_VERSION)?;
816+
// Build TableConfiguration directly for the new table
817+
let table_configuration = TableConfiguration::try_new(metadata, protocol, table_url, 0)?;
823818

824-
// Create Transaction<CreateTable> with pre-commit snapshot
819+
// Create Transaction<CreateTable> with the effective table configuration
825820
Transaction::try_new_create_table(
826-
Arc::new(Snapshot::new(log_segment, table_configuration)),
821+
table_configuration,
827822
self.engine_info,
828823
committer,
829824
data_layout_result.system_domain_metadata,

kernel/src/transaction/create_table.rs

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ use crate::actions::DomainMetadata;
4040
use crate::committer::Committer;
4141
use crate::expressions::ColumnName;
4242
use crate::schema::SchemaRef;
43-
use crate::snapshot::SnapshotRef;
43+
use crate::table_configuration::TableConfiguration;
4444
use crate::transaction::{CreateTable, Transaction};
4545
use crate::utils::current_time_ms;
4646
use crate::DeltaResult;
@@ -133,31 +133,29 @@ impl CreateTableTransaction {
133133
/// Create a new transaction for creating a new table. This is used when the table doesn't
134134
/// exist yet and we need to create it with Protocol and Metadata actions.
135135
///
136-
/// The `pre_commit_snapshot` is a synthetic snapshot created from the protocol and metadata
137-
/// that will be committed. It uses `PRE_COMMIT_VERSION` as a sentinel to indicate no
138-
/// version exists yet on disk.
136+
/// The `effective_table_config` is the table configuration that will be committed (protocol,
137+
/// metadata, schema).
139138
///
140139
/// This is typically called via `CreateTableTransactionBuilder::build()` rather than directly.
141140
pub(crate) fn try_new_create_table(
142-
pre_commit_snapshot: SnapshotRef,
141+
effective_table_config: TableConfiguration,
143142
engine_info: String,
144143
committer: Box<dyn Committer>,
145144
system_domain_metadata: Vec<DomainMetadata>,
146145
clustering_columns: Option<Vec<ColumnName>>,
147146
) -> DeltaResult<Self> {
148-
// TODO(sanuj) Today transactions expect a read snapshot to be passed in and we pass
149-
// in the pre_commit_snapshot for CREATE. To support other operations such as ALTERs
150-
// there might be cleaner alternatives which can clearly disambiguate b/w a snapshot
151-
// that was read vs the effective snapshot we will use for the commit.
152147
let span = tracing::info_span!(
153148
"txn",
154-
path = %pre_commit_snapshot.table_root(),
149+
path = %effective_table_config.table_root(),
155150
operation = "CREATE",
156151
);
157152

158153
Ok(Transaction {
159154
span,
160-
read_snapshot: pre_commit_snapshot,
155+
read_snapshot_opt: None,
156+
effective_table_config,
157+
should_emit_protocol: true,
158+
should_emit_metadata: true,
161159
committer,
162160
operation: Some("CREATE TABLE".to_string()),
163161
engine_info: Some(engine_info),

kernel/src/transaction/domain_metadata.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,7 @@ impl<S> Transaction<S> {
2828
}
2929

3030
if !self
31-
.read_snapshot
32-
.table_configuration()
31+
.effective_table_config
3332
.is_feature_supported(&TableFeature::DomainMetadata)
3433
{
3534
return Err(Error::unsupported(
@@ -114,7 +113,7 @@ impl<S> Transaction<S> {
114113
/// This prevents arbitrary `delta.*` domains from being added during table creation.
115114
/// Each known system domain must have its corresponding feature enabled in the protocol.
116115
fn validate_system_domain_feature(&self, domain: &str) -> DeltaResult<()> {
117-
let table_config = self.read_snapshot.table_configuration();
116+
let table_config = &self.effective_table_config;
118117

119118
// Map domain to its required feature
120119
let required_feature = match domain {
@@ -162,7 +161,7 @@ impl<S> Transaction<S> {
162161
.map(String::as_str)
163162
.collect();
164163
let existing_domains = self
165-
.read_snapshot
164+
.read_snapshot()?
166165
.get_domain_metadatas_internal(engine, Some(&domains))?;
167166

168167
// Create removal tombstones with pre-image configurations

0 commit comments

Comments
 (0)