diff --git a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/DataCompaction.java b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/DataCompaction.java
index df7c4c41..68bfa420 100644
--- a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/DataCompaction.java
+++ b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/DataCompaction.java
@@ -177,6 +177,7 @@ private void merge(
     Parquet.WriteBuilder writeBuilder =
         Parquet.write(outputFile)
+            .setAll(table.properties())
             .overwrite(false)
             .createWriterFunc(GenericParquetWriter::buildWriter)
             .metricsConfig(metricsConfig)
diff --git a/ice-rest-catalog/src/test/resources/scenarios/insert-compression/run.sh.tmpl b/ice-rest-catalog/src/test/resources/scenarios/insert-compression/run.sh.tmpl
new file mode 100644
index 00000000..f293a788
--- /dev/null
+++ b/ice-rest-catalog/src/test/resources/scenarios/insert-compression/run.sh.tmpl
@@ -0,0 +1,52 @@
+#!/bin/bash
+set -e
+
+echo "Running insert with compression test..."
+
+SCENARIO_DIR="{{SCENARIO_DIR}}"
+INPUT_PATH="${SCENARIO_DIR}/../insert-scan/input.parquet"
+
+# Create namespace (the table is created below via --create-table)
+{{ICE_CLI}} --config {{CLI_CONFIG}} create-namespace ${NAMESPACE_NAME}
+echo "OK Created namespace"
+
+# Verify that an invalid compression codec is rejected
+if {{ICE_CLI}} --config {{CLI_CONFIG}} insert --create-table ${TABLE_NAME} "file://${INPUT_PATH}" --compression=invalid 2>/dev/null; then
+  echo "FAIL insert should have rejected invalid compression codec"
+  exit 1
+fi
+echo "OK Invalid compression codec rejected"
+
+# Insert with zstd compression
+{{ICE_CLI}} --config {{CLI_CONFIG}} insert --create-table ${TABLE_NAME} "file://${INPUT_PATH}" --compression=zstd
+echo "OK Inserted data with zstd compression"
+
+# Scan to verify that data written with zstd is readable
+{{ICE_CLI}} --config {{CLI_CONFIG}} scan ${TABLE_NAME} > /tmp/compression_scan1.txt
+if ! grep -q "sepal.length" /tmp/compression_scan1.txt; then
+  echo "FAIL Scan output missing expected column after zstd insert"
+  cat /tmp/compression_scan1.txt
+  exit 1
+fi
+FIRST_LINES=$(wc -l < /tmp/compression_scan1.txt)
+echo "OK Scan after zstd insert: ${FIRST_LINES} lines"
+
+# Insert again with snappy compression
+{{ICE_CLI}} --config {{CLI_CONFIG}} insert ${TABLE_NAME} "file://${INPUT_PATH}" --compression=snappy
+echo "OK Inserted data with snappy compression"
+
+# Scan to verify that the zstd and snappy data files are readable together
+{{ICE_CLI}} --config {{CLI_CONFIG}} scan ${TABLE_NAME} --limit 500 > /tmp/compression_scan2.txt
+SECOND_LINES=$(wc -l < /tmp/compression_scan2.txt)
+if [ "${SECOND_LINES}" -le "${FIRST_LINES}" ]; then
+  echo "FAIL Second scan should have more lines (got ${SECOND_LINES}, first had ${FIRST_LINES})"
+  exit 1
+fi
+echo "OK Scan after snappy insert: ${SECOND_LINES} lines"
+
+# Cleanup
+{{ICE_CLI}} --config {{CLI_CONFIG}} delete-table ${TABLE_NAME}
+{{ICE_CLI}} --config {{CLI_CONFIG}} delete-namespace ${NAMESPACE_NAME}
+echo "OK Cleanup done"
+
+echo "Insert with compression test completed successfully"
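Review note: the scenario checks that zstd- and snappy-written files remain scannable, but it never asserts which codec actually landed in the data files. If the harness ever needs that assertion, something along these lines could work, reading the Parquet footer with parquet-java (a sketch only; `CodecCheck`, `firstColumnCodec`, and the local-path argument are hypothetical helpers, not part of this PR):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.hadoop.util.HadoopInputFile;

final class CodecCheck {
  // Returns the codec of the first column chunk of the first row group,
  // mirroring how this PR's "as-source" mode samples a file's codec.
  static CompressionCodecName firstColumnCodec(String localPath) throws java.io.IOException {
    try (ParquetFileReader reader =
        ParquetFileReader.open(
            HadoopInputFile.fromPath(new Path(localPath), new Configuration()))) {
      return reader.getFooter().getBlocks().get(0).getColumns().get(0).getCodec();
    }
  }
}
```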
diff --git a/ice-rest-catalog/src/test/resources/scenarios/insert-compression/scenario.yaml b/ice-rest-catalog/src/test/resources/scenarios/insert-compression/scenario.yaml
new file mode 100644
index 00000000..7be5d0d9
--- /dev/null
+++ b/ice-rest-catalog/src/test/resources/scenarios/insert-compression/scenario.yaml
@@ -0,0 +1,10 @@
+name: "Insert with compression codec"
+description: "Tests inserting data with the --compression flag (zstd, then snappy; invalid codecs rejected) and verifying the data is readable"
+
+catalogConfig:
+  warehouse: "s3://test-bucket/warehouse"
+
+env:
+  NAMESPACE_NAME: "test_compression"
+  TABLE_NAME: "test_compression.iris_zstd"
+  INPUT_FILE: "input.parquet"
diff --git a/ice/src/main/java/com/altinity/ice/cli/Main.java b/ice/src/main/java/com/altinity/ice/cli/Main.java
index ce2fd048..79e7680e 100644
--- a/ice/src/main/java/com/altinity/ice/cli/Main.java
+++ b/ice/src/main/java/com/altinity/ice/cli/Main.java
@@ -417,6 +417,11 @@ void insert(
               description = "Number of threads to use for inserting data",
               defaultValue = "-1")
           int threadCount,
+      @CommandLine.Option(
+          names = {"--compression"},
+          description =
+              "Parquet compression codec: gzip (default), zstd, snappy, lz4, brotli, uncompressed, or as-source")
+          String compression,
       @CommandLine.Option(
           names = {"--watch"},
           description = "Event queue. Supported: AWS SQS")
@@ -513,6 +518,7 @@ void insert(
             .sortOrderList(sortOrders)
             .threadCount(
                 threadCount < 1 ? Runtime.getRuntime().availableProcessors() : threadCount)
+            .compression(compression)
             .build();

     if (!watchMode) {
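For reference, the new flag travels from the picocli option above into `Insert.Options` through the builder. A minimal sketch of that wiring, using only names this diff introduces (the standalone call site is hypothetical):

```java
// null compression (flag omitted) preserves the previous behavior:
// table properties / Iceberg defaults decide the codec.
Insert.Options options =
    Insert.Options.builder()
        .compression("zstd") // validated later in Insert.run()
        .build();
```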
diff --git a/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java b/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java
index 1e510680..d8139aee 100644
--- a/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java
+++ b/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java
@@ -28,8 +28,10 @@
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
+import java.util.TreeSet;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -76,6 +78,7 @@
 import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.parquet.ParquetUtil;
 import org.apache.iceberg.rest.RESTCatalog;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.schema.MessageType;
 import org.slf4j.Logger;
@@ -108,6 +111,20 @@ public static Result run(
       return new Result(0, 0);
     }

+    if (options.compression() != null) {
+      // Validate against the set of Parquet compression codecs known to parquet-java.
+      Set<String> validCompressionCodecs =
+          Arrays.stream(CompressionCodecName.values())
+              .map(c -> c.name().toLowerCase(Locale.ENGLISH))
+              .collect(Collectors.toCollection(HashSet::new));
+      validCompressionCodecs.add("as-source");
+      if (!validCompressionCodecs.contains(options.compression().toLowerCase(Locale.ENGLISH))) {
+        String accepted = String.join(", ", new TreeSet<>(validCompressionCodecs));
+        throw new IllegalArgumentException(
+            "Unknown --compression value: " + options.compression() + ". Accepted: " + accepted);
+      }
+    }
+
     Table table = catalog.loadTable(nsTable);

     // Create transaction and pass it to updatePartitionAndSortOrderMetadata
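Worth noting: `CompressionCodecName` includes codecs the `--compression` help text does not list (`lzo`, `lz4_raw`), so those values pass validation too; whether they can actually be written depends on the codecs available at runtime (Brotli, for one, typically needs an extra codec dependency). A sketch of the accepted set this check produces, assuming the enum values in current parquet-java releases:

```java
import java.util.Arrays;
import java.util.Locale;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

class AcceptedCodecs {
  public static void main(String[] args) {
    // Same construction as the validation above, but sorted for display.
    Set<String> accepted =
        Arrays.stream(CompressionCodecName.values())
            .map(c -> c.name().toLowerCase(Locale.ENGLISH))
            .collect(Collectors.toCollection(TreeSet::new));
    accepted.add("as-source");
    // With current parquet-java this prints:
    // [as-source, brotli, gzip, lz4, lz4_raw, lzo, snappy, uncompressed, zstd]
    System.out.println(accepted);
  }
}
```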
@@ -501,66 +518,92 @@ private static List<DataFile> processFile(
                   .build());
       dataFileSizeInBytes = inputFile.getLength();
       dataFile = dstDataFile;
-    } else if (partitionSpec.isPartitioned() && partitionKey == null) {
-      return copyPartitionedAndSorted(
-          file,
-          tableSchema,
-          partitionSpec,
-          sortOrder,
-          metricsConfig,
-          tableIO,
-          inputFile,
-          dstDataFileSource);
-    } else if (sortOrder.isSorted() && !sorted) {
-      return Collections.singletonList(
-          copySorted(
-              file,
-              dstDataFileSource.get(file),
-              tableSchema,
-              partitionSpec,
-              sortOrder,
-              metricsConfig,
-              tableIO,
-              inputFile,
-              dataFileNamingStrategy,
-              partitionKey));
     } else {
-      // Table isn't partitioned or sorted. Copy as is.
-      String dstDataFile = dstDataFileSource.get(file);
-      if (checkNotExists.apply(dstDataFile)) {
-        return Collections.emptyList();
+      // Copy path: compute the compression override from the CLI value or, for
+      // "as-source", from the source file's metadata.
+      String compressionCodecOverride = null;
+      if (options.compression() != null) {
+        if ("as-source".equalsIgnoreCase(options.compression())) {
+          var blocks = metadata.getBlocks();
+          if (!blocks.isEmpty()) {
+            compressionCodecOverride =
+                blocks.get(0).getColumns().get(0).getCodec().name().toLowerCase(Locale.ENGLISH);
+          }
+        } else {
+          compressionCodecOverride = options.compression().toLowerCase(Locale.ENGLISH);
+        }
       }
-      OutputFile outputFile =
-          tableIO.newOutputFile(Strings.replacePrefix(dstDataFile, "s3://", "s3a://"));
-      // TODO: support transferTo below (note that compression, etc. might be different)
-      // try (var d = outputFile.create()) {
-      //   try (var s = inputFile.newStream()) { s.transferTo(d); }
-      // }
-      Parquet.ReadBuilder readBuilder =
-          Parquet.read(inputFile)
-              .createReaderFunc(s -> GenericParquetReaders.buildReader(tableSchema, s))
-              .project(tableSchema)
-              .reuseContainers();
-      Parquet.WriteBuilder writeBuilder =
-          Parquet.write(outputFile)
-              .overwrite(dataFileNamingStrategy == DataFileNamingStrategy.Name.PRESERVE_ORIGINAL)
-              .createWriterFunc(GenericParquetWriter::buildWriter)
-              .metricsConfig(metricsConfig)
-              .schema(tableSchema);
+      if (partitionSpec.isPartitioned() && partitionKey == null) {
+        return copyPartitionedAndSorted(
+            file,
+            tableSchema,
+            partitionSpec,
+            sortOrder,
+            metricsConfig,
+            tableIO,
+            inputFile,
+            dstDataFileSource,
+            table.properties(),
+            compressionCodecOverride);
+      } else if (sortOrder.isSorted() && !sorted) {
+        return Collections.singletonList(
+            copySorted(
+                file,
+                dstDataFileSource.get(file),
+                tableSchema,
+                partitionSpec,
+                sortOrder,
+                metricsConfig,
+                tableIO,
+                inputFile,
+                dataFileNamingStrategy,
+                partitionKey,
+                table.properties(),
+                compressionCodecOverride));
+      } else {
+        // Table isn't partitioned or sorted. Copy as is.
+        String dstDataFile = dstDataFileSource.get(file);
+        if (checkNotExists.apply(dstDataFile)) {
+          return Collections.emptyList();
+        }
+        OutputFile outputFile =
+            tableIO.newOutputFile(Strings.replacePrefix(dstDataFile, "s3://", "s3a://"));
+        // TODO: support transferTo below (note that compression, etc. might be different)
+        // try (var d = outputFile.create()) {
+        //   try (var s = inputFile.newStream()) { s.transferTo(d); }
+        // }
+        Parquet.ReadBuilder readBuilder =
+            Parquet.read(inputFile)
+                .createReaderFunc(s -> GenericParquetReaders.buildReader(tableSchema, s))
+                .project(tableSchema)
+                .reuseContainers();
+
+        Parquet.WriteBuilder writeBuilder =
+            Parquet.write(outputFile)
+                .setAll(table.properties())
+                .overwrite(dataFileNamingStrategy == DataFileNamingStrategy.Name.PRESERVE_ORIGINAL)
+                .createWriterFunc(GenericParquetWriter::buildWriter)
+                .metricsConfig(metricsConfig)
+                .schema(tableSchema);
+        if (compressionCodecOverride != null) {
+          writeBuilder =
+              writeBuilder.set(TableProperties.PARQUET_COMPRESSION, compressionCodecOverride);
+        }

-      logger.info("{}: copying to {}", file, dstDataFile);
+        logger.info("{}: copying to {}", file, dstDataFile);

-      try (CloseableIterable<Record> parquetReader = readBuilder.build()) {
-        try (FileAppender<Record> writer = writeBuilder.build()) {
-          writer.addAll(parquetReader);
-          writer.close(); // for write.length()
-          dataFileSizeInBytes = writer.length();
-          metrics = writer.metrics();
+        try (CloseableIterable<Record> parquetReader = readBuilder.build()) {
+          try (FileAppender<Record> writer = writeBuilder.build()) {
+            writer.addAll(parquetReader);
+            writer.close(); // for write.length()
+            dataFileSizeInBytes = writer.length();
+            metrics = writer.metrics();
+          }
         }
-      }
-      dataFile = dstDataFile;
+        dataFile = dstDataFile;
+      }
     }
     logger.info(
         "{}: adding data file (copy took {}s)", file, (System.currentTimeMillis() - start) / 1000);
@@ -588,7 +631,9 @@ private static List<DataFile> copyPartitionedAndSorted(
       MetricsConfig metricsConfig,
       FileIO tableIO,
       InputFile inputFile,
-      DataFileNamingStrategy dstDataFileSource)
+      DataFileNamingStrategy dstDataFileSource,
+      Map<String, String> tableProperties,
+      @Nullable String compressionCodecOverride)
       throws IOException {
     logger.info("{}: partitioning{}", file, sortOrder.isSorted() ? "+sorting" : "");
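Two observations on the copy path above. First, `as-source` samples only the first column chunk of the first row group, so a source file whose columns use mixed codecs is re-encoded with whichever codec that chunk used. Second, the ordering is what lets the flag win: `setAll(table.properties())` seeds the writer config, and a later `set(TableProperties.PARQUET_COMPRESSION, ...)` replaces that single key. A condensed sketch of just that precedence, assuming Iceberg's `Parquet.WriteBuilder` keeps the last value written for a key (which is how its config map behaves):

```java
// Later set() calls win over setAll() for the same key, so the per-run
// --compression override takes precedence over the table-level setting.
Parquet.WriteBuilder writeBuilder =
    Parquet.write(outputFile)
        .setAll(table.properties()) // may carry write.parquet.compression-codec
        .createWriterFunc(GenericParquetWriter::buildWriter)
        .schema(tableSchema);
if (compressionCodecOverride != null) {
  writeBuilder = writeBuilder.set(TableProperties.PARQUET_COMPRESSION, compressionCodecOverride);
}
```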
"+sorting" : ""); @@ -622,10 +667,15 @@ private static List copyPartitionedAndSorted( Parquet.WriteBuilder writeBuilder = Parquet.write(outputFile) + .setAll(tableProperties) .overwrite(true) // FIXME .createWriterFunc(GenericParquetWriter::buildWriter) .metricsConfig(metricsConfig) .schema(tableSchema); + if (compressionCodecOverride != null) { + writeBuilder = + writeBuilder.set(TableProperties.PARQUET_COMPRESSION, compressionCodecOverride); + } try (FileAppender writer = writeBuilder.build()) { for (Record record : records) { @@ -668,7 +718,9 @@ private static DataFile copySorted( FileIO tableIO, InputFile inputFile, DataFileNamingStrategy.Name dataFileNamingStrategy, - PartitionKey partitionKey) + PartitionKey partitionKey, + Map tableProperties, + @Nullable String compressionCodecOverride) throws IOException { logger.info("{}: copying (sorted) to {}", file, dstDataFile); @@ -698,11 +750,16 @@ private static DataFile copySorted( // Write sorted records to outputFile Parquet.WriteBuilder writeBuilder = Parquet.write(outputFile) + .setAll(tableProperties) .overwrite( dataFileNamingStrategy == DataFileNamingStrategy.Name.PRESERVE_ORIGINAL) // FIXME .createWriterFunc(GenericParquetWriter::buildWriter) .metricsConfig(metricsConfig) .schema(tableSchema); + if (compressionCodecOverride != null) { + writeBuilder = + writeBuilder.set(TableProperties.PARQUET_COMPRESSION, compressionCodecOverride); + } long fileSizeInBytes; Metrics metrics; @@ -793,7 +850,8 @@ public record Options( @Nullable String retryListFile, @Nullable List partitionList, @Nullable List sortOrderList, - int threadCount) { + int threadCount, + @Nullable String compression) { public static Builder builder() { return new Builder(); @@ -816,6 +874,7 @@ public static final class Builder { private List partitionList = List.of(); private List sortOrderList = List.of(); private int threadCount = Runtime.getRuntime().availableProcessors(); + private String compression; private Builder() {} @@ -899,6 +958,11 @@ public Builder threadCount(int threadCount) { return this; } + public Builder compression(String compression) { + this.compression = compression; + return this; + } + public Options build() { return new Options( dataFileNamingStrategy, @@ -916,7 +980,8 @@ public Options build() { retryListFile, partitionList, sortOrderList, - threadCount); + threadCount, + compression); } } }