diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/ETaggedLoadTableResponse.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/ETaggedLoadTableResponse.java new file mode 100644 index 0000000000..1b10d2b555 --- /dev/null +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/ETaggedLoadTableResponse.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.catalog.iceberg; + +import java.util.Optional; +import org.apache.iceberg.rest.responses.LoadTableResponse; + +/** + * Pairs a {@link LoadTableResponse} with an optional entity tag derived from the response's + * metadata location. The handler is responsible for computing the etag (when possible); transport + * layers such as the REST adapter translate the etag into an HTTP {@code ETag} header. + */ +public record ETaggedLoadTableResponse(LoadTableResponse response, Optional etag) {} diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java index 780a85054f..1b53188f54 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java @@ -43,7 +43,6 @@ import org.apache.iceberg.rest.requests.ReportMetricsRequest; import org.apache.iceberg.rest.requests.UpdateNamespacePropertiesRequest; import org.apache.iceberg.rest.requests.UpdateTableRequest; -import org.apache.iceberg.rest.responses.LoadTableResponse; import org.apache.polaris.core.auth.PolarisPrincipal; import org.apache.polaris.core.config.RealmConfig; import org.apache.polaris.core.context.CallContext; @@ -56,7 +55,6 @@ import org.apache.polaris.service.catalog.api.IcebergRestConfigurationApiService; import org.apache.polaris.service.catalog.common.CatalogAdapter; import org.apache.polaris.service.config.ReservedProperties; -import org.apache.polaris.service.http.IcebergHttpUtil; import org.apache.polaris.service.http.IfNoneMatch; import org.apache.polaris.service.types.CommitTableRequest; import org.apache.polaris.service.types.CommitViewRequest; @@ -171,27 +169,12 @@ public Response loadNamespaceMetadata( } /** - * For situations where we typically expect a metadataLocation to be present in the response and - * so expect to insert an etag header, this helper gracefully falls back to omitting the header if - * unable to get metadata location and logs a warning. + * Translate an {@link ETaggedLoadTableResponse} produced by the handler into a JAX-RS response + * builder, attaching the handler-computed etag as an {@code ETag} HTTP header when present. */ - private Response.ResponseBuilder tryInsertETagHeader( - Response.ResponseBuilder builder, - LoadTableResponse response, - String namespace, - String tableName) { - if (response.metadataLocation() != null) { - builder = - builder.header( - HttpHeaders.ETAG, - IcebergHttpUtil.generateETagForMetadataFileLocation(response.metadataLocation())); - } else { - LOGGER - .atWarn() - .addKeyValue("namespace", namespace) - .addKeyValue("tableName", tableName) - .log("Response has null metadataLocation; omitting etag"); - } + private Response.ResponseBuilder toResponseBuilder(ETaggedLoadTableResponse tagged) { + Response.ResponseBuilder builder = Response.ok(tagged.response()); + tagged.etag().ifPresent(etag -> builder.header(HttpHeaders.ETAG, etag)); return builder; } @@ -282,11 +265,9 @@ public Response createTable( ns, createTableRequest, delegationModes, refreshCredentialsEndpoint)) .build(); } else { - LoadTableResponse response = - catalog.createTableDirect( - ns, createTableRequest, delegationModes, refreshCredentialsEndpoint); - return tryInsertETagHeader( - Response.ok(response), response, namespace, createTableRequest.name()) + return toResponseBuilder( + catalog.createTableDirect( + ns, createTableRequest, delegationModes, refreshCredentialsEndpoint)) .build(); } }); @@ -334,7 +315,7 @@ public Response loadTable( securityContext, prefix, catalog -> { - Optional response = + Optional response = catalog.loadTable( tableIdentifier, snapshots, @@ -346,8 +327,7 @@ public Response loadTable( return Response.notModified().build(); } - return tryInsertETagHeader(Response.ok(response.get()), response.get(), namespace, table) - .build(); + return toResponseBuilder(response.get()).build(); }); } @@ -415,12 +395,7 @@ public Response registerTable( return withCatalog( securityContext, prefix, - catalog -> { - LoadTableResponse response = catalog.registerTable(ns, registerTableRequest); - return tryInsertETagHeader( - Response.ok(response), response, namespace, registerTableRequest.name()) - .build(); - }); + catalog -> toResponseBuilder(catalog.registerTable(ns, registerTableRequest)).build()); } @Override diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java index 1a556eb5cc..9bf04efc9e 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java @@ -420,7 +420,8 @@ public ListTablesResponse listTables(Namespace namespace) { */ public LoadTableResponse createTableDirect(Namespace namespace, CreateTableRequest request) { return createTableDirect( - namespace, request, EnumSet.noneOf(AccessDelegationMode.class), Optional.empty()); + namespace, request, EnumSet.noneOf(AccessDelegationMode.class), Optional.empty()) + .response(); } /** @@ -435,7 +436,8 @@ public LoadTableResponse createTableDirectWithWriteDelegation( CreateTableRequest request, Optional refreshCredentialsEndpoint) { return createTableDirect( - namespace, request, EnumSet.of(VENDED_CREDENTIALS), refreshCredentialsEndpoint); + namespace, request, EnumSet.of(VENDED_CREDENTIALS), refreshCredentialsEndpoint) + .response(); } public void authorizeCreateTableDirect( @@ -456,7 +458,7 @@ public void authorizeCreateTableDirect( } } - public LoadTableResponse createTableDirect( + public ETaggedLoadTableResponse createTableDirect( Namespace namespace, CreateTableRequest request, EnumSet delegationModes, @@ -488,16 +490,18 @@ public LoadTableResponse createTableDirect( if (table instanceof BaseTable baseTable) { TableMetadata tableMetadata = baseTable.operations().current(); - return buildLoadTableResponseWithDelegationCredentials( - tableIdentifier, - tableMetadata, - resolvedMode, - Set.of( - PolarisStorageActions.READ, - PolarisStorageActions.WRITE, - PolarisStorageActions.LIST), - refreshCredentialsEndpoint) - .build(); + return withETag( + buildLoadTableResponseWithDelegationCredentials( + tableIdentifier, + tableMetadata, + resolvedMode, + Set.of( + PolarisStorageActions.READ, + PolarisStorageActions.WRITE, + PolarisStorageActions.LIST), + refreshCredentialsEndpoint) + .build(), + tableIdentifier); } else if (table instanceof BaseMetadataTable) { // metadata tables are loaded on the client side, return NoSuchTableException for now throw notFoundExceptionForTableLikeEntity( @@ -610,12 +614,13 @@ public LoadTableResponse createTableStaged( * @param request the register table request * @return ETagged {@link LoadTableResponse} to uniquely identify the table metadata */ - public LoadTableResponse registerTable(Namespace namespace, RegisterTableRequest request) { + public ETaggedLoadTableResponse registerTable(Namespace namespace, RegisterTableRequest request) { PolarisAuthorizableOperation op = PolarisAuthorizableOperation.REGISTER_TABLE; - authorizeCreateTableLikeUnderNamespaceOperationOrThrow( - op, TableIdentifier.of(namespace, request.name())); + TableIdentifier tableIdentifier = TableIdentifier.of(namespace, request.name()); + authorizeCreateTableLikeUnderNamespaceOperationOrThrow(op, tableIdentifier); - return catalogHandlerUtils().registerTable(baseCatalog, namespace, request); + return withETag( + catalogHandlerUtils().registerTable(baseCatalog, namespace, request), tableIdentifier); } public boolean sendNotification(TableIdentifier identifier, NotificationRequest request) { @@ -708,11 +713,12 @@ public LoadTableResponse loadTable(TableIdentifier tableIdentifier, String snaps public Optional loadTableIfStale( TableIdentifier tableIdentifier, IfNoneMatch ifNoneMatch, String snapshots) { return loadTable( - tableIdentifier, - snapshots, - ifNoneMatch, - EnumSet.noneOf(AccessDelegationMode.class), - Optional.empty()); + tableIdentifier, + snapshots, + ifNoneMatch, + EnumSet.noneOf(AccessDelegationMode.class), + Optional.empty()) + .map(ETaggedLoadTableResponse::response); } public LoadTableResponse loadTableWithAccessDelegation( @@ -740,11 +746,12 @@ public Optional loadTableWithAccessDelegationIfStale( String snapshots, Optional refreshCredentialsEndpoint) { return loadTable( - tableIdentifier, - snapshots, - ifNoneMatch, - EnumSet.of(VENDED_CREDENTIALS), - refreshCredentialsEndpoint); + tableIdentifier, + snapshots, + ifNoneMatch, + EnumSet.of(VENDED_CREDENTIALS), + refreshCredentialsEndpoint) + .map(ETaggedLoadTableResponse::response); } /** @@ -852,7 +859,7 @@ private Set authorizeLoadTable( return actionsRequested; } - public Optional loadTable( + public Optional loadTable( TableIdentifier tableIdentifier, String snapshots, IfNoneMatch ifNoneMatch, @@ -897,7 +904,7 @@ public Optional loadTable( actionsRequested, refreshCredentialsEndpoint) .build(); - return Optional.of(filterResponseToSnapshots(response, snapshots)); + return Optional.of(withETag(filterResponseToSnapshots(response, snapshots), tableIdentifier)); } else if (table instanceof BaseMetadataTable) { // metadata tables are loaded on the client side, return NoSuchTableException for now throw notFoundExceptionForTableLikeEntity( @@ -907,6 +914,27 @@ public Optional loadTable( throw new IllegalStateException("Cannot wrap catalog that does not produce BaseTable"); } + /** + * Pair a {@link LoadTableResponse} with an entity tag derived from its metadata location. When + * the response has no metadata location (e.g. staged create or external catalogs that don't + * expose one) the etag is omitted and a warning is logged. + */ + private ETaggedLoadTableResponse withETag( + LoadTableResponse response, TableIdentifier tableIdentifier) { + if (response.metadataLocation() != null) { + return new ETaggedLoadTableResponse( + response, + Optional.of( + IcebergHttpUtil.generateETagForMetadataFileLocation(response.metadataLocation()))); + } + LOGGER + .atWarn() + .addKeyValue("namespace", tableIdentifier.namespace()) + .addKeyValue("tableName", tableIdentifier.name()) + .log("Response has null metadataLocation; omitting etag"); + return new ETaggedLoadTableResponse(response, Optional.empty()); + } + private LoadTableResponse.Builder buildLoadTableResponseWithDelegationCredentials( TableIdentifier tableIdentifier, TableMetadata tableMetadata, diff --git a/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapterETagTest.java b/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapterETagTest.java new file mode 100644 index 0000000000..790b969ef5 --- /dev/null +++ b/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapterETagTest.java @@ -0,0 +1,316 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.service.catalog.iceberg; + +import static org.apache.polaris.service.admin.PolarisAuthzTestBase.SCHEMA; +import static org.assertj.core.api.Assertions.assertThat; + +import jakarta.ws.rs.core.HttpHeaders; +import jakarta.ws.rs.core.Response; +import java.nio.file.Path; +import java.util.List; +import java.util.Map; +import org.apache.iceberg.rest.requests.CreateTableRequest; +import org.apache.iceberg.rest.requests.RegisterTableRequest; +import org.apache.iceberg.rest.responses.LoadTableResponse; +import org.apache.polaris.core.admin.model.Catalog; +import org.apache.polaris.core.admin.model.CatalogProperties; +import org.apache.polaris.core.admin.model.CreateCatalogRequest; +import org.apache.polaris.core.admin.model.FileStorageConfigInfo; +import org.apache.polaris.core.admin.model.StorageConfigInfo; +import org.apache.polaris.service.TestServices; +import org.apache.polaris.service.http.IcebergHttpUtil; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +/** + * Verifies the end-to-end ETag behavior after {@link IcebergCatalogHandler} became responsible for + * computing the etag (packaged in {@link ETaggedLoadTableResponse}). The adapter only copies the + * etag onto the HTTP response, so any regression in the handler surfaces here as a missing or + * incorrect {@code ETag} header. + */ +public class IcebergCatalogAdapterETagTest { + + private static final String CATALOG = "test-catalog"; + private static final String NAMESPACE = "ns"; + + private String catalogLocation; + + @BeforeEach + public void setUp(@TempDir Path tempDir) { + catalogLocation = tempDir.toAbsolutePath().toUri().toString(); + if (catalogLocation.endsWith("/")) { + catalogLocation = catalogLocation.substring(0, catalogLocation.length() - 1); + } + } + + @Test + void createTableDirectResponseContainsExpectedETag() { + TestServices services = createTestServices(); + createCatalogAndNamespace(services); + + try (Response response = createTable(services, "etag_direct_table")) { + assertThat(response.getStatus()).isEqualTo(Response.Status.OK.getStatusCode()); + assertThat(response.getHeaders()).containsKey(HttpHeaders.ETAG); + + LoadTableResponse body = (LoadTableResponse) response.getEntity(); + String expected = + IcebergHttpUtil.generateETagForMetadataFileLocation(body.metadataLocation()); + assertThat(response.getHeaders().getFirst(HttpHeaders.ETAG)).hasToString(expected); + } + } + + @Test + void stagedCreateResponseOmitsETag() { + TestServices services = createTestServices(); + createCatalogAndNamespace(services); + + CreateTableRequest stagedRequest = + CreateTableRequest.builder() + .withName("staged_table") + .withLocation(tableLocation("staged_table")) + .withSchema(SCHEMA) + .stageCreate() + .build(); + + try (Response response = + services + .restApi() + .createTable( + CATALOG, + NAMESPACE, + stagedRequest, + null, + services.realmContext(), + services.securityContext())) { + assertThat(response.getStatus()).isEqualTo(Response.Status.OK.getStatusCode()); + // Staged creates have no metadata location yet, so the handler must omit the etag. + assertThat(response.getHeaders()).doesNotContainKey(HttpHeaders.ETAG); + } + } + + @Test + void loadTableResponseContainsETagMatchingCreate() { + TestServices services = createTestServices(); + createCatalogAndNamespace(services); + + String createETag; + try (Response createResponse = createTable(services, "etag_load_table")) { + createETag = createResponse.getHeaders().getFirst(HttpHeaders.ETAG).toString(); + } + + try (Response loadResponse = + services + .restApi() + .loadTable( + CATALOG, + NAMESPACE, + "etag_load_table", + null, + null, + null, + services.realmContext(), + services.securityContext())) { + assertThat(loadResponse.getStatus()).isEqualTo(Response.Status.OK.getStatusCode()); + assertThat(loadResponse.getHeaders()).containsKey(HttpHeaders.ETAG); + assertThat(loadResponse.getHeaders().getFirst(HttpHeaders.ETAG)).hasToString(createETag); + } + } + + @Test + void loadTableWithMatchingIfNoneMatchReturnsNotModified() { + TestServices services = createTestServices(); + createCatalogAndNamespace(services); + + String etag; + try (Response createResponse = createTable(services, "etag_ifnm_match_table")) { + etag = createResponse.getHeaders().getFirst(HttpHeaders.ETAG).toString(); + } + + try (Response loadResponse = + services + .restApi() + .loadTable( + CATALOG, + NAMESPACE, + "etag_ifnm_match_table", + null, + etag, + null, + services.realmContext(), + services.securityContext())) { + assertThat(loadResponse.getStatus()) + .isEqualTo(Response.Status.NOT_MODIFIED.getStatusCode()); + assertThat(loadResponse.hasEntity()).isFalse(); + } + } + + @Test + void loadTableWithMismatchedIfNoneMatchReturnsOkWithCurrentETag() { + TestServices services = createTestServices(); + createCatalogAndNamespace(services); + + String currentETag; + try (Response createResponse = createTable(services, "etag_ifnm_miss_table")) { + currentETag = createResponse.getHeaders().getFirst(HttpHeaders.ETAG).toString(); + } + + String staleETag = "W/\"stale-etag-value\""; + try (Response loadResponse = + services + .restApi() + .loadTable( + CATALOG, + NAMESPACE, + "etag_ifnm_miss_table", + null, + staleETag, + null, + services.realmContext(), + services.securityContext())) { + assertThat(loadResponse.getStatus()).isEqualTo(Response.Status.OK.getStatusCode()); + assertThat(loadResponse.getHeaders().getFirst(HttpHeaders.ETAG)).hasToString(currentETag); + } + } + + @Test + void registerTableResponseContainsExpectedETag() { + TestServices services = createTestServices(); + createCatalogAndNamespace(services); + + // Create a table so that a metadata file exists on disk, then drop it (keeping the files) so + // we can re-register a new table at the same metadata location without a location conflict. + String tableName = "registered_table"; + String metadataLocation; + try (Response createResponse = createTable(services, tableName)) { + metadataLocation = ((LoadTableResponse) createResponse.getEntity()).metadataLocation(); + } + try (Response dropResponse = + services + .restApi() + .dropTable( + CATALOG, + NAMESPACE, + tableName, + false, + services.realmContext(), + services.securityContext())) { + assertThat(dropResponse.getStatus()).isEqualTo(Response.Status.NO_CONTENT.getStatusCode()); + } + + final String finalMetadataLocation = metadataLocation; + RegisterTableRequest registerRequest = + new RegisterTableRequest() { + @Override + public String name() { + return tableName; + } + + @Override + public String metadataLocation() { + return finalMetadataLocation; + } + }; + + try (Response registerResponse = + services + .restApi() + .registerTable( + CATALOG, + NAMESPACE, + registerRequest, + services.realmContext(), + services.securityContext())) { + assertThat(registerResponse.getStatus()).isEqualTo(Response.Status.OK.getStatusCode()); + assertThat(registerResponse.getHeaders()).containsKey(HttpHeaders.ETAG); + + String expected = IcebergHttpUtil.generateETagForMetadataFileLocation(metadataLocation); + assertThat(registerResponse.getHeaders().getFirst(HttpHeaders.ETAG)).hasToString(expected); + } + } + + private Response createTable(TestServices services, String tableName) { + CreateTableRequest createTableRequest = + CreateTableRequest.builder() + .withName(tableName) + .withLocation(tableLocation(tableName)) + .withSchema(SCHEMA) + .build(); + return services + .restApi() + .createTable( + CATALOG, + NAMESPACE, + createTableRequest, + null, + services.realmContext(), + services.securityContext()); + } + + private String tableLocation(String tableName) { + return String.format("%s/%s/%s/%s", catalogLocation, CATALOG, NAMESPACE, tableName); + } + + private void createCatalogAndNamespace(TestServices services) { + CatalogProperties props = + CatalogProperties.builder() + .setDefaultBaseLocation(String.format("%s/%s", catalogLocation, CATALOG)) + .build(); + StorageConfigInfo storage = + FileStorageConfigInfo.builder() + .setStorageType(StorageConfigInfo.StorageTypeEnum.FILE) + .build(); + Catalog catalog = + new Catalog(Catalog.TypeEnum.INTERNAL, CATALOG, props, 0L, 0L, 1, storage); + try (Response response = + services + .catalogsApi() + .createCatalog( + new CreateCatalogRequest(catalog), + services.realmContext(), + services.securityContext())) { + assertThat(response.getStatus()).isEqualTo(Response.Status.CREATED.getStatusCode()); + } + + try (Response response = + services + .restApi() + .createNamespace( + CATALOG, + org.apache.iceberg.rest.requests.CreateNamespaceRequest.builder() + .withNamespace(org.apache.iceberg.catalog.Namespace.of(NAMESPACE)) + .build(), + services.realmContext(), + services.securityContext())) { + assertThat(response.getStatus()).isEqualTo(Response.Status.OK.getStatusCode()); + } + } + + private TestServices createTestServices() { + Map config = + Map.of( + "ALLOW_INSECURE_STORAGE_TYPES", + "true", + "SUPPORTED_CATALOG_STORAGE_TYPES", + List.of("FILE")); + return TestServices.builder().config(config).build(); + } +}