Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.polaris.core.admin.model.FileStorageConfigInfo;
import org.apache.polaris.core.admin.model.StorageConfigInfo;
import org.apache.polaris.service.it.env.IntegrationTestsHelper;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
Expand All @@ -43,6 +46,24 @@

@QuarkusIntegrationTest
public class SparkDeltaIT extends SparkIntegrationBase {
@Override
protected StorageConfigInfo getStorageConfigInfo() {
return FileStorageConfigInfo.builder()
.setStorageType(StorageConfigInfo.StorageTypeEnum.FILE)
.setAllowedLocations(List.of("file://"))
.build();
}

@Override
protected String getDefaultBaseLocation() {
return warehouseDir.toString();
}

@Override
protected Map<String, String> getExtraCatalogProperties() {
return Map.of();
}

private String defaultNs;
private String tableRootDir;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@
import java.io.File;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.polaris.core.admin.model.FileStorageConfigInfo;
import org.apache.polaris.core.admin.model.StorageConfigInfo;
import org.apache.polaris.service.it.env.IntegrationTestsHelper;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.AfterEach;
Expand All @@ -36,6 +39,24 @@
@QuarkusIntegrationTest
public class SparkHudiIT extends SparkIntegrationBase {

@Override
protected StorageConfigInfo getStorageConfigInfo() {
return FileStorageConfigInfo.builder()
.setStorageType(StorageConfigInfo.StorageTypeEnum.FILE)
.setAllowedLocations(List.of("file://"))
.build();
}

@Override
protected String getDefaultBaseLocation() {
return warehouseDir.toString();
}

@Override
protected Map<String, String> getExtraCatalogProperties() {
return Map.of();
}

@Override
protected SparkSession buildSparkSession() {
return SparkSession.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.net.URI;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
Expand Down Expand Up @@ -89,24 +90,16 @@ public void before(

catalogName = client.newEntityName("spark_catalog");

AwsStorageConfigInfo awsConfigModel =
AwsStorageConfigInfo.builder()
.setRoleArn("arn:aws:iam::123456789012:role/my-role")
.setExternalId("externalId")
.setUserArn("userArn")
.setStorageType(StorageConfigInfo.StorageTypeEnum.S3)
.setAllowedLocations(List.of("s3://my-old-bucket/path/to/data"))
.build();
CatalogProperties props = new CatalogProperties("s3://my-bucket/path/to/data");
props.putAll(s3Container.getS3ConfigProperties());
CatalogProperties props = new CatalogProperties(getDefaultBaseLocation());
props.putAll(getExtraCatalogProperties());
props.put("polaris.config.drop-with-purge.enabled", "true");
props.put("polaris.config.namespace-custom-location.enabled", "true");
Catalog catalog =
PolarisCatalog.builder()
.setType(Catalog.TypeEnum.INTERNAL)
.setName(catalogName)
.setProperties(props)
.setStorageConfigInfo(awsConfigModel)
.setStorageConfigInfo(getStorageConfigInfo())
.build();

managementApi.createCatalog(catalog);
Expand All @@ -127,6 +120,24 @@ protected SparkSession buildSparkSession() {
.getOrCreate();
}

protected StorageConfigInfo getStorageConfigInfo() {
return AwsStorageConfigInfo.builder()
.setRoleArn("arn:aws:iam::123456789012:role/my-role")
.setExternalId("externalId")
.setUserArn("userArn")
.setStorageType(StorageConfigInfo.StorageTypeEnum.S3)
.setAllowedLocations(List.of("s3://my-old-bucket/path/to/data"))
.build();
}

protected String getDefaultBaseLocation() {
return "s3://my-bucket/path/to/data";
}

protected Map<String, String> getExtraCatalogProperties() {
return s3Container.getS3ConfigProperties();
}

@AfterEach
public void after() throws Exception {
cleanupCatalog(catalogName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,31 @@ public static PolarisResolvedPathWrapper findResolvedStorageEntity(
ResolvedPathKey.ofNamespace(tableIdentifier.namespace()));
}

/**
* Validates that the specified {@code location} is valid for whatever storage config is found for
* this TableLike's parent hierarchy. Resolves the storage entity from the given entity view.
*/
public static void validateLocationForTableLike(
PolarisResolutionManifestCatalogView resolvedEntityView,
RealmConfig realmConfig,
TableIdentifier identifier,
String location) {
PolarisResolvedPathWrapper resolvedStorageEntity =
resolvedEntityView.getResolvedPath(
ResolvedPathKey.ofTableLike(identifier), PolarisEntitySubType.ANY_SUBTYPE);
if (resolvedStorageEntity == null) {
resolvedStorageEntity =
resolvedEntityView.getResolvedPath(ResolvedPathKey.ofNamespace(identifier.namespace()));
}
if (resolvedStorageEntity == null) {
resolvedStorageEntity =
resolvedEntityView.getPassthroughResolvedPath(
ResolvedPathKey.ofNamespace(identifier.namespace()));
}

validateLocationsForTableLike(realmConfig, identifier, Set.of(location), resolvedStorageEntity);
}

/**
* Validates that the specified {@code locations} are valid for whatever storage config is found
* for the given entity's parent hierarchy.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.polaris.core.persistence.pagination.PageToken;
import org.apache.polaris.core.persistence.resolver.PolarisResolutionManifestCatalogView;
import org.apache.polaris.core.persistence.resolver.ResolvedPathKey;
import org.apache.polaris.service.catalog.common.CatalogUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -89,6 +90,11 @@ public GenericTableEntity createGenericTable(
"Failed to fetch resolved parent for TableIdentifier '%s'", tableIdentifier));
}

if (baseLocation != null && !baseLocation.isEmpty()) {
CatalogUtils.validateLocationForTableLike(
resolvedEntityView, callContext.getRealmConfig(), tableIdentifier, baseLocation);
}

List<PolarisEntity> catalogPath = resolvedParent.getRawFullPath();

PolarisResolvedPathWrapper resolvedEntities =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1021,39 +1021,6 @@ public String transformTableLikeLocation(TableIdentifier tableIdentifier, String
tableIdentifier, applyReplaceNewLocationWithCatalogDefault(location));
}

/**
* Validates that the specified {@code location} is valid for whatever storage config is found for
* this TableLike's parent hierarchy.
*/
private void validateLocationForTableLike(TableIdentifier identifier, String location) {
PolarisResolvedPathWrapper resolvedStorageEntity =
resolvedEntityView.getResolvedPath(
ResolvedPathKey.ofTableLike(identifier), PolarisEntitySubType.ANY_SUBTYPE);
if (resolvedStorageEntity == null) {
resolvedStorageEntity =
resolvedEntityView.getResolvedPath(ResolvedPathKey.ofNamespace(identifier.namespace()));
}
if (resolvedStorageEntity == null) {
resolvedStorageEntity =
resolvedEntityView.getPassthroughResolvedPath(
ResolvedPathKey.ofNamespace(identifier.namespace()));
}

validateLocationForTableLike(identifier, location, resolvedStorageEntity);
}

/**
* Validates that the specified {@code location} is valid for whatever storage config is found for
* this TableLike's parent hierarchy.
*/
private void validateLocationForTableLike(
TableIdentifier identifier,
String location,
PolarisResolvedPathWrapper resolvedStorageEntity) {
CatalogUtils.validateLocationsForTableLike(
realmConfig, identifier, Set.of(location), resolvedStorageEntity);
}

/**
* Validates the table location has no overlap with other entities after checking the
* configuration of the service
Expand Down Expand Up @@ -1972,7 +1939,8 @@ public void doCommit(ViewMetadata base, ViewMetadata metadata) {
if (base == null || !metadata.location().equals(base.location())) {
// If location is changing then we must validate that the requested location is valid
// for the storage configuration inherited under this entity's path.
validateLocationForTableLike(identifier, metadata.location(), resolvedStorageEntity);
CatalogUtils.validateLocationsForTableLike(
realmConfig, identifier, Set.of(metadata.location()), resolvedStorageEntity);
validateNoLocationOverlap(
catalogEntity,
identifier,
Expand Down Expand Up @@ -2361,7 +2329,8 @@ private void createTableLike(

// Make sure the metadata file is valid for our allowed locations.
String metadataLocation = icebergTableLikeEntity.getMetadataLocation();
validateLocationForTableLike(identifier, metadataLocation, resolvedParent);
CatalogUtils.validateLocationsForTableLike(
realmConfig, identifier, Set.of(metadataLocation), resolvedParent);

List<PolarisEntity> catalogPath = resolvedParent.getRawFullPath();

Expand Down Expand Up @@ -2427,7 +2396,8 @@ private void updateTableLike(TableIdentifier identifier, PolarisEntity entity) {

// Make sure the metadata file is valid for our allowed locations.
String metadataLocation = icebergTableLikeEntity.getMetadataLocation();
validateLocationForTableLike(identifier, metadataLocation, resolvedEntities);
CatalogUtils.validateLocationsForTableLike(
realmConfig, identifier, Set.of(metadataLocation), resolvedEntities);

List<PolarisEntity> catalogPath = resolvedEntities.getRawParentPath();
EntityResult res =
Expand Down Expand Up @@ -2543,7 +2513,8 @@ private boolean sendNotificationForTableLike(
// Validate location against the resolvedStorageEntity
String metadataLocation =
transformTableLikeLocation(tableIdentifier, request.getPayload().getMetadataLocation());
validateLocationForTableLike(tableIdentifier, metadataLocation, resolvedStorageEntity);
CatalogUtils.validateLocationsForTableLike(
realmConfig, tableIdentifier, Set.of(metadataLocation), resolvedStorageEntity);

// Validate that we can construct a FileIO
String locationDir = metadataLocation.substring(0, metadataLocation.lastIndexOf("/"));
Expand Down Expand Up @@ -2601,7 +2572,8 @@ private boolean sendNotificationForTableLike(
.build();
}
// first validate we can read the metadata file
validateLocationForTableLike(tableIdentifier, newLocation);
CatalogUtils.validateLocationForTableLike(
resolvedEntityView, realmConfig, tableIdentifier, newLocation);

String locationDir = newLocation.substring(0, newLocation.lastIndexOf("/"));

Expand All @@ -2618,7 +2590,8 @@ private boolean sendNotificationForTableLike(
TableMetadata tableMetadata = TableMetadataParser.read(fileIO, newLocation);

// then validate that it points to a valid location for this table
validateLocationForTableLike(tableIdentifier, tableMetadata.location());
CatalogUtils.validateLocationForTableLike(
resolvedEntityView, realmConfig, tableIdentifier, tableMetadata.location());

// finally, validate that the metadata file is within the table directory
validateMetadataFileInTableDir(tableIdentifier, tableMetadata);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.iceberg.Schema;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.ForbiddenException;
import org.apache.iceberg.types.Types;
import org.apache.polaris.core.PolarisCallContext;
import org.apache.polaris.core.PolarisDiagnostics;
Expand Down Expand Up @@ -295,7 +296,7 @@ public void testIcebergTableAlreadyExists() {

@ParameterizedTest
@NullSource
@ValueSource(strings = {"", "file://path/to/my/table"})
@ValueSource(strings = {"", "s3://my-bucket/path/to/data/ns/my-table"})
public void testGenericTableRoundTrip(String baseLocation) {
Namespace namespace = Namespace.of("ns");
icebergCatalog.createNamespace(namespace);
Expand All @@ -317,6 +318,35 @@ public void testGenericTableRoundTrip(String baseLocation) {
Assertions.assertThat(resultEntity.getBaseLocation()).isEqualTo(baseLocation);
}

@ParameterizedTest
@ValueSource(strings = {"file://path/to/my/table", "s3://unauthorized-bucket/path"})
public void testCreateGenericTableWithInvalidLocation(String baseLocation) {
Namespace namespace = Namespace.of("ns");
icebergCatalog.createNamespace(namespace);

Assertions.assertThatThrownBy(
() ->
genericTableCatalog.createGenericTable(
TableIdentifier.of("ns", "t1"), "format", baseLocation, "doc", Map.of()))
.isInstanceOf(ForbiddenException.class);
}

@Test
public void testCreateGenericTableWithValidLocation() {
Namespace namespace = Namespace.of("ns");
icebergCatalog.createNamespace(namespace);

Assertions.assertThatCode(
() ->
genericTableCatalog.createGenericTable(
TableIdentifier.of("ns", "t1"),
"format",
"s3://my-bucket/path/to/data/ns/t1",
"doc",
Map.of()))
.doesNotThrowAnyException();
}

@Test
public void testLoadNonExistentTable() {
Namespace namespace = Namespace.of("ns");
Expand Down
Loading