Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions kernel/src/actions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,9 +242,7 @@ impl Metadata {
/// # Errors
///
/// Returns an error if there are any metadata columns in the schema.
// TODO: remove allow(dead_code) after we use this API in CREATE TABLE, etc.
#[internal_api]
#[allow(dead_code)]
pub(crate) fn try_new(
name: Option<String>,
description: Option<String>,
Expand Down
4 changes: 0 additions & 4 deletions kernel/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,10 +198,6 @@ pub mod engine;
/// Delta table version is 8 byte unsigned int
pub type Version = u64;

/// Sentinel version indicating a pre-commit state (table does not exist yet).
/// Used for create-table transactions before the first commit.
pub const PRE_COMMIT_VERSION: Version = u64::MAX;

pub type FileSize = u64;
pub type FileIndex = u64;

Expand Down
57 changes: 37 additions & 20 deletions kernel/src/log_segment/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use crate::schema::{DataType, SchemaRef, StructField, StructType, ToSchema as _}
use crate::utils::require;
use crate::{
DeltaResult, Engine, Error, Expression, FileMeta, Predicate, PredicateRef, RowVisitor,
StorageHandler, Version, PRE_COMMIT_VERSION,
StorageHandler, Version,
};

mod domain_metadata_replay;
Expand Down Expand Up @@ -153,20 +153,45 @@ fn schema_to_is_not_null_predicate(schema: &StructType) -> Option<PredicateRef>
}

impl LogSegment {
/// Creates a synthetic LogSegment for pre-commit transactions (e.g., create-table).
/// The sentinel version PRE_COMMIT_VERSION indicates no version exists yet on disk.
/// This is used to construct a pre-commit snapshot that provides table configuration
/// (protocol, metadata, schema) for operations like CTAS.
#[allow(dead_code)] // Used by create_table module
pub(crate) fn for_pre_commit(log_root: Url) -> Self {
use crate::PRE_COMMIT_VERSION;
Self {
end_version: PRE_COMMIT_VERSION,
/// Creates a LogSegment for a newly created table at version 0 from a single commit file.
///
/// Normal log segments are built by listing files from storage and replaying them. For CREATE
/// TABLE, the table has no prior log. We construct the segment directly from the just created
/// commit file.
///
/// # Errors
///
/// Returns an `internal_error` if `commit_file` is not version 0 or not a commit file type.
pub(crate) fn new_for_version_zero(
Comment thread
william-ch-databricks marked this conversation as resolved.
log_root: Url,
commit_file: ParsedLogPath,
) -> DeltaResult<Self> {
require!(
commit_file.version == 0,
crate::Error::internal_error(format!(
"new_for_version_zero called with version {}",
commit_file.version
))
);
require!(
commit_file.is_commit(),
crate::Error::internal_error(format!(
"new_for_version_zero called with non-commit file type: {:?}",
commit_file.file_type
))
);
Ok(Self {
end_version: commit_file.version,
checkpoint_version: None,
log_root,
last_checkpoint_metadata: None,
listed: LogSegmentFiles::default(),
}
listed: LogSegmentFiles {
max_published_version: Some(commit_file.version),
latest_commit_file: Some(commit_file.clone()),
ascending_commit_files: vec![commit_file],
..Default::default()
},
})
}

#[internal_api]
Expand Down Expand Up @@ -1104,11 +1129,7 @@ impl LogSegment {
}

/// How many commits since a checkpoint, according to this log segment.
/// Returns 0 for pre-commit snapshots (where end_version is PRE_COMMIT_VERSION).
pub(crate) fn commits_since_checkpoint(&self) -> u64 {
if self.end_version == PRE_COMMIT_VERSION {
return 0;
}
// we can use 0 as the checkpoint version if there is no checkpoint since `end_version - 0`
// is the correct number of commits since a checkpoint if there are no checkpoints
let checkpoint_version = self.checkpoint_version.unwrap_or(0);
Expand All @@ -1117,11 +1138,7 @@ impl LogSegment {
}

/// How many commits since a log-compaction or checkpoint, according to this log segment.
/// Returns 0 for pre-commit snapshots (where end_version is PRE_COMMIT_VERSION).
pub(crate) fn commits_since_log_compaction_or_checkpoint(&self) -> u64 {
if self.end_version == PRE_COMMIT_VERSION {
return 0;
}
// Annoyingly we have to search all the compaction files to determine this, because we only
// sort by start version, so technically the max end version could be anywhere in the vec.
// We can return 0 in the case there is no compaction since end_version - 0 is the correct
Expand Down
26 changes: 26 additions & 0 deletions kernel/src/log_segment/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4485,3 +4485,29 @@ async fn read_actions_with_null_map_values(
}
assert!(found, "Should have found a {action_name} action batch");
}

#[test]
fn new_for_version_zero_creates_valid_log_segment() {
let log_root = Url::parse("memory:///_delta_log/").unwrap();
let commit_path = create_log_path("memory:///_delta_log/00000000000000000000.json");
let segment = super::LogSegment::new_for_version_zero(log_root.clone(), commit_path).unwrap();
assert_eq!(segment.end_version, 0);
assert_eq!(segment.log_root, log_root);
}

#[test]
fn new_for_version_zero_rejects_non_zero_version() {
let log_root = Url::parse("memory:///_delta_log/").unwrap();
let commit_path = create_log_path("memory:///_delta_log/00000000000000000001.json");
let err = super::LogSegment::new_for_version_zero(log_root, commit_path).unwrap_err();
assert!(err.to_string().contains("version"));
}

#[test]
fn new_for_version_zero_rejects_non_commit_file() {
let log_root = Url::parse("memory:///_delta_log/").unwrap();
let checkpoint_path =
create_log_path("memory:///_delta_log/00000000000000000000.checkpoint.parquet");
let err = super::LogSegment::new_for_version_zero(log_root, checkpoint_path).unwrap_err();
assert!(err.to_string().contains("non-commit"));
}
63 changes: 27 additions & 36 deletions kernel/src/snapshot/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,16 +129,6 @@ impl Snapshot {
SnapshotBuilder::new_from(existing_snapshot)
}

/// Create a new [`Snapshot`] from a [`LogSegment`] and [`TableConfiguration`].
#[internal_api]
pub(crate) fn new(log_segment: LogSegment, table_configuration: TableConfiguration) -> Self {
Self::new_with_crc(
log_segment,
table_configuration,
Arc::new(LazyCrc::new(None)),
)
}

/// Internal constructor that accepts an explicit [`LazyCrc`].
pub(crate) fn new_with_crc(
log_segment: LogSegment,
Expand Down Expand Up @@ -400,15 +390,11 @@ impl Snapshot {
/// producing a post-commit snapshot without a full log replay from storage.
///
/// The `crc_delta` captures the CRC-relevant changes from the committed transaction
/// (file stats, domain metadata, ICT, etc.). If the pre-commit snapshot had a loaded CRC
/// at its version, the delta is applied to produce a precomputed in-memory CRC for the new
/// version -- this CRC contains all important table metadata (protocol, metadata, domain
/// metadata, set transactions, ICT) and avoids re-reading them from storage. CREATE TABLE
/// always produces a CRC at v0. If no CRC was available on the pre-commit snapshot, the
/// existing lazy CRC is carried forward unchanged.
///
/// TODO: Handle Protocol changes in CrcDelta (when Kernel-RS supports protocol changes)
/// TODO: Handle Metadata changes in CrcDelta (when Kernel-RS supports metadata changes)
/// (file stats, domain metadata, ICT, etc.). If this snapshot had a loaded CRC at its
/// version, the delta is applied to produce a precomputed in-memory CRC for the new
/// version -- this avoids re-reading metadata from storage. If no CRC was available, the
/// existing lazy CRC is carried forward unchanged. CREATE TABLE handles CRC construction
/// separately in `Transaction::into_committed`.
pub(crate) fn new_post_commit(
&self,
commit: ParsedLogPath,
Expand All @@ -432,8 +418,12 @@ impl Snapshot {
))
);

let new_table_configuration =
TableConfiguration::new_post_commit(self.table_configuration(), new_version);
let new_table_configuration = TableConfiguration::new_post_commit(
self.table_configuration(),
new_version,
crc_delta.metadata.clone(),
crc_delta.protocol.clone(),
)?;

let new_log_segment = self.log_segment.new_with_commit_appended(commit)?;

Expand All @@ -448,21 +438,18 @@ impl Snapshot {

/// Compute the lazy CRC for a post-commit snapshot by applying a [`CrcDelta`].
///
/// For CREATE TABLE, builds a fresh CRC from the `crc_delta`. For existing tables, applies
/// the `crc_delta` to the current CRC if loaded, otherwise carries forward the existing lazy
/// CRC.
/// Applies the `crc_delta` to the current CRC if loaded, otherwise carries forward the
/// existing lazy CRC. Not used by CREATE TABLE, which builds its CRC from scratch via
/// `CrcDelta::into_crc_for_version_zero` in `Transaction::into_committed`.
fn compute_post_commit_crc(&self, new_version: Version, crc_delta: CrcDelta) -> Arc<LazyCrc> {
let crc = if self.version() == crate::PRE_COMMIT_VERSION {
crc_delta.into_crc_for_version_zero()
} else {
self.lazy_crc
.get_if_loaded_at_version(self.version())
.map(|base| {
let mut crc = base.as_ref().clone();
crc.apply(crc_delta);
crc
})
};
let crc = self
.lazy_crc
.get_if_loaded_at_version(self.version())
.map(|base| {
let mut crc = base.as_ref().clone();
crc.apply(crc_delta);
crc
});

match crc {
Some(c) => Arc::new(LazyCrc::new_precomputed(c, new_version)),
Expand Down Expand Up @@ -1297,7 +1284,11 @@ mod tests {
let log_segment =
LogSegment::try_new(listed_files, url.join("_delta_log/")?, Some(0), None)?;

Ok(Snapshot::new(log_segment, table_cfg))
Ok(Snapshot::new_with_crc(
log_segment,
table_cfg,
Arc::new(LazyCrc::new(None)),
))
}

#[test]
Expand Down
28 changes: 18 additions & 10 deletions kernel/src/table_configuration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,17 +202,25 @@ impl TableConfiguration {
/// Creates a new [`TableConfiguration`] representing the table configuration immediately
/// after a commit.
///
/// This method takes a pre-commit table configuration and produces a post-commit
/// configuration at the committed version. This allows immediate use of the new table
/// configuration without re-reading metadata from storage.
/// This method takes the current table configuration and produces a post-commit
/// configuration at the committed version. If the commit included new Protocol or Metadata
/// actions (e.g. ALTER TABLE), those are passed in and the configuration is rebuilt with
/// full validation. Otherwise the existing configuration is cloned with only the version
/// updated.
///
/// TODO: Take in Protocol (when Kernel-RS supports protocol changes)
/// TODO: Take in Metadata (when Kernel-RS supports metadata changes)
pub(crate) fn new_post_commit(table_configuration: &Self, new_version: Version) -> Self {
Self {
version: new_version,
..table_configuration.clone()
}
/// Returns the new [`TableConfiguration`] at `new_version`.
///
/// # Errors
///
/// Returns an error if the new metadata/protocol combination fails
/// [`TableConfiguration::try_new`] validation (e.g., unsupported features, invalid schema).
pub(crate) fn new_post_commit(
table_configuration: &Self,
new_version: Version,
new_metadata: Option<Metadata>,
new_protocol: Option<Protocol>,
) -> DeltaResult<Self> {
Self::try_new_from(table_configuration, new_metadata, new_protocol, new_version)
}

/// Generates the expected schema for file statistics.
Expand Down
15 changes: 5 additions & 10 deletions kernel/src/transaction/builder/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,9 @@ use crate::actions::{DomainMetadata, Metadata, Protocol};
use crate::clustering::{create_clustering_domain_metadata, validate_clustering_columns};
use crate::committer::Committer;
use crate::expressions::ColumnName;
use crate::log_segment::LogSegment;
use crate::schema::validation::validate_schema_for_create;
use crate::schema::variant_utils::schema_contains_variant_type;
use crate::schema::{normalize_column_names_to_schema_casing, DataType, SchemaRef, StructType};
use crate::snapshot::Snapshot;
use crate::table_configuration::TableConfiguration;
use crate::table_features::{
assign_column_mapping_metadata, get_any_level_column_physical_name,
Expand All @@ -40,7 +38,7 @@ use crate::transaction::create_table::CreateTableTransaction;
use crate::transaction::data_layout::DataLayout;
use crate::transaction::Transaction;
use crate::utils::{current_time_ms, try_parse_uri};
use crate::{DeltaResult, Engine, Error, StorageHandler, PRE_COMMIT_VERSION};
use crate::{DeltaResult, Engine, Error, StorageHandler};

/// Table features allowed to be enabled via `delta.feature.*=supported` during CREATE TABLE.
///
Expand Down Expand Up @@ -815,15 +813,12 @@ impl CreateTableTransactionBuilder {
validated.properties,
)?;

// Create pre-commit snapshot from protocol/metadata
let log_root = table_url.join("_delta_log/")?;
let log_segment = LogSegment::for_pre_commit(log_root);
let table_configuration =
TableConfiguration::try_new(metadata, protocol, table_url, PRE_COMMIT_VERSION)?;
// Build TableConfiguration directly for the new table
let table_configuration = TableConfiguration::try_new(metadata, protocol, table_url, 0)?;

// Create Transaction<CreateTable> with pre-commit snapshot
// Create Transaction<CreateTable> with the effective table configuration
Transaction::try_new_create_table(
Arc::new(Snapshot::new(log_segment, table_configuration)),
table_configuration,
self.engine_info,
committer,
data_layout_result.system_domain_metadata,
Expand Down
20 changes: 9 additions & 11 deletions kernel/src/transaction/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use crate::actions::DomainMetadata;
use crate::committer::Committer;
use crate::expressions::ColumnName;
use crate::schema::SchemaRef;
use crate::snapshot::SnapshotRef;
use crate::table_configuration::TableConfiguration;
use crate::transaction::{CreateTable, Transaction};
use crate::utils::current_time_ms;
use crate::DeltaResult;
Expand Down Expand Up @@ -133,31 +133,29 @@ impl CreateTableTransaction {
/// Create a new transaction for creating a new table. This is used when the table doesn't
/// exist yet and we need to create it with Protocol and Metadata actions.
///
/// The `pre_commit_snapshot` is a synthetic snapshot created from the protocol and metadata
/// that will be committed. It uses `PRE_COMMIT_VERSION` as a sentinel to indicate no
/// version exists yet on disk.
/// The `effective_table_config` is the table configuration that will be committed (protocol,
/// metadata, schema).
///
/// This is typically called via `CreateTableTransactionBuilder::build()` rather than directly.
pub(crate) fn try_new_create_table(
pre_commit_snapshot: SnapshotRef,
effective_table_config: TableConfiguration,
engine_info: String,
committer: Box<dyn Committer>,
system_domain_metadata: Vec<DomainMetadata>,
clustering_columns: Option<Vec<ColumnName>>,
) -> DeltaResult<Self> {
// TODO(sanuj) Today transactions expect a read snapshot to be passed in and we pass
// in the pre_commit_snapshot for CREATE. To support other operations such as ALTERs
// there might be cleaner alternatives which can clearly disambiguate b/w a snapshot
// the was read vs the effective snapshot we will use for the commit.
let span = tracing::info_span!(
"txn",
path = %pre_commit_snapshot.table_root(),
path = %effective_table_config.table_root(),
operation = "CREATE",
);

Ok(Transaction {
span,
read_snapshot: pre_commit_snapshot,
read_snapshot_opt: None,
effective_table_config,
should_emit_protocol: true,
should_emit_metadata: true,
committer,
operation: Some("CREATE TABLE".to_string()),
engine_info: Some(engine_info),
Expand Down
7 changes: 3 additions & 4 deletions kernel/src/transaction/domain_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ impl<S> Transaction<S> {
}

if !self
.read_snapshot
.table_configuration()
.effective_table_config
.is_feature_supported(&TableFeature::DomainMetadata)
{
return Err(Error::unsupported(
Expand Down Expand Up @@ -114,7 +113,7 @@ impl<S> Transaction<S> {
/// This prevents arbitrary `delta.*` domains from being added during table creation.
/// Each known system domain must have its corresponding feature enabled in the protocol.
fn validate_system_domain_feature(&self, domain: &str) -> DeltaResult<()> {
let table_config = self.read_snapshot.table_configuration();
let table_config = &self.effective_table_config;

// Map domain to its required feature
let required_feature = match domain {
Expand Down Expand Up @@ -162,7 +161,7 @@ impl<S> Transaction<S> {
.map(String::as_str)
.collect();
let existing_domains = self
.read_snapshot
.read_snapshot()?
.get_domain_metadatas_internal(engine, Some(&domains))?;

// Create removal tombstones with pre-image configurations
Expand Down
Loading
Loading