@@ -0,0 +1,29 @@
#!/bin/bash
set -e

echo "Running custom partition name test..."

SCENARIO_DIR="{{SCENARIO_DIR}}"
INPUT_PATH="${SCENARIO_DIR}/../insert-scan/input.parquet"

{{ICE_CLI}} --config {{CLI_CONFIG}} create-namespace ${NAMESPACE_NAME}
echo "OK Created namespace"

{{ICE_CLI}} --config {{CLI_CONFIG}} insert --create-table ${TABLE_NAME} "file://${INPUT_PATH}" \
  --partition="${PARTITION_SPEC}"
echo "OK Inserted data with custom partition name"

{{ICE_CLI}} --config {{CLI_CONFIG}} describe -s ${TABLE_NAME} > /tmp/custom_part_describe.txt

if ! grep -q "var_trunc" /tmp/custom_part_describe.txt; then
echo "FAIL describe -s output missing custom partition name 'var_trunc'"
cat /tmp/custom_part_describe.txt
exit 1
fi
echo "OK Custom partition name 'var_trunc' found in describe output"

{{ICE_CLI}} --config {{CLI_CONFIG}} delete-table ${TABLE_NAME}
{{ICE_CLI}} --config {{CLI_CONFIG}} delete-namespace ${NAMESPACE_NAME}
echo "OK Cleanup done"

echo "Custom partition name test completed successfully"
@@ -0,0 +1,11 @@
name: "Insert with custom partition name"
description: "Tests that custom partition field names are applied via the name JSON field"

catalogConfig:
warehouse: "s3://test-bucket/warehouse"

env:
NAMESPACE_NAME: "test_custom_part"
TABLE_NAME: "test_custom_part.iris_custom"
INPUT_FILE: "input.parquet"
PARTITION_SPEC: '[{"column":"variety","transform":"truncate[3]","name":"var_trunc"}]'
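
Note: a minimal sketch (not part of the PR; assumes a plain Jackson ObjectMapper, with Jackson 2.12+ for record support) of how a PARTITION_SPEC string like the one above deserializes into the extended IcePartition record, with "name" optional:

// Illustration only. Requires com.fasterxml.jackson.databind.ObjectMapper
// and com.fasterxml.jackson.core.type.TypeReference.
ObjectMapper mapper = new ObjectMapper();
List<Main.IcePartition> spec =
    mapper.readValue(
        "[{\"column\":\"variety\",\"transform\":\"truncate[3]\",\"name\":\"var_trunc\"}]",
        new TypeReference<List<Main.IcePartition>>() {});
// spec.get(0).name() -> "var_trunc"; omitting "name" falls back to the
// default "<column>_trunc" partition field name.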
ice/src/main/java/com/altinity/ice/cli/Main.java (8 changes: 5 additions & 3 deletions)
@@ -211,7 +211,9 @@ public record IceSortOrder(
      @JsonProperty("nullFirst") boolean nullFirst) {}

  public record IcePartition(
-     @JsonProperty("column") String column, @JsonProperty("transform") String transform) {}
+     @JsonProperty("column") String column,
+     @JsonProperty("transform") String transform,
+     @JsonProperty("name") String name) {}

  @CommandLine.Command(name = "create-table", description = "Create table.")
  void createTable(
@@ -247,7 +249,7 @@ void createTable(
      @CommandLine.Option(
          names = {"--partition"},
          description =
-             "Partition spec, e.g. [{\"column\":\"name\", \"transform\":\"identity\"}],"
+             "Partition spec, e.g. [{\"column\":\"name\", \"transform\":\"identity\", \"name\":\"custom_name\"}]. "
                  + "Supported transformations: \"hour\", \"day\", \"month\", \"year\", \"identity\" (default)")
          String partitionJson,
      @CommandLine.Option(
@@ -397,7 +399,7 @@ void insert(
      @CommandLine.Option(
          names = {"--partition"},
          description =
-             "Partition spec, e.g. [{\"column\":\"name\", \"transform\":\"identity\"}],"
+             "Partition spec, e.g. [{\"column\":\"name\", \"transform\":\"identity\", \"name\":\"custom_name\"}]. "
                  + "Supported transformations: \"hour\", \"day\", \"month\", \"year\", \"identity\" (default)")
          String partitionJson,
      @CommandLine.Option(
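Note: the custom name maps onto Iceberg's existing PartitionSpec.Builder overloads that take an explicit target field name (the same holds for year/month/day/hour/bucket). A hedged sketch of the equivalent direct API call; the schema here is made up for illustration:

// Illustration only; Schema, Types, and PartitionSpec are org.apache.iceberg classes.
Schema schema =
    new Schema(Types.NestedField.required(1, "variety", Types.StringType.get()));
PartitionSpec spec =
    PartitionSpec.builderFor(schema).truncate("variety", 3, "var_trunc").build();
// spec.fields().get(0).name() -> "var_trunc" (the default would be "variety_trunc")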
@@ -10,6 +10,7 @@
package com.altinity.ice.cli.internal.iceberg;

import com.altinity.ice.cli.Main;
+import com.altinity.ice.internal.strings.Strings;
import java.io.IOException;
import java.time.Instant;
import java.time.LocalDate;
@@ -61,25 +62,50 @@ public static PartitionSpec newPartitionSpec(Schema schema, List<Main.IcePartition> columns)
    if (!columns.isEmpty()) {
      for (Main.IcePartition partition : columns) {
        String transform = Objects.requireNonNullElse(partition.transform(), "").toLowerCase();
+       String name = Strings.isNullOrEmpty(partition.name()) ? null : partition.name();
        if (transform.startsWith("bucket[")) {
          int numBuckets = Integer.parseInt(transform.substring(7, transform.length() - 1));
-         builder.bucket(partition.column(), numBuckets);
+         if (name != null) {
+           builder.bucket(partition.column(), numBuckets, name);
+         } else {
+           builder.bucket(partition.column(), numBuckets);
+         }
        } else if (transform.startsWith("truncate[")) {
          int width = Integer.parseInt(transform.substring(9, transform.length() - 1));
-         builder.truncate(partition.column(), width);
+         if (name != null) {
+           builder.truncate(partition.column(), width, name);
+         } else {
+           builder.truncate(partition.column(), width);
+         }
        } else {
          switch (transform) {
            case "year":
-             builder.year(partition.column());
+             if (name != null) {
+               builder.year(partition.column(), name);
+             } else {
+               builder.year(partition.column());
+             }
              break;
            case "month":
-             builder.month(partition.column());
+             if (name != null) {
+               builder.month(partition.column(), name);
+             } else {
+               builder.month(partition.column());
+             }
              break;
            case "day":
-             builder.day(partition.column());
+             if (name != null) {
+               builder.day(partition.column(), name);
+             } else {
+               builder.day(partition.column());
+             }
              break;
            case "hour":
-             builder.hour(partition.column());
+             if (name != null) {
+               builder.hour(partition.column(), name);
+             } else {
+               builder.hour(partition.column());
+             }
              break;
            case "identity":
            case "":
@@ -97,26 +123,32 @@ public static void apply(UpdatePartitionSpec op, List<Main.IcePartition> columns)
  public static void apply(UpdatePartitionSpec op, List<Main.IcePartition> columns) {
    for (Main.IcePartition partition : columns) {
      String transform = Objects.requireNonNullElse(partition.transform(), "").toLowerCase();
+     String name = Strings.isNullOrEmpty(partition.name()) ? null : partition.name();
      if (transform.startsWith("bucket[")) {
        int numBuckets = Integer.parseInt(transform.substring(7, transform.length() - 1));
-       op.addField(
-           partition.column() + "_bucket", Expressions.bucket(partition.column(), numBuckets));
+       String fieldName = name != null ? name : partition.column() + "_bucket";
+       op.addField(fieldName, Expressions.bucket(partition.column(), numBuckets));
      } else if (transform.startsWith("truncate[")) {
        int width = Integer.parseInt(transform.substring(9, transform.length() - 1));
-       op.addField(partition.column() + "_trunc", Expressions.truncate(partition.column(), width));
+       String fieldName = name != null ? name : partition.column() + "_trunc";
+       op.addField(fieldName, Expressions.truncate(partition.column(), width));
      } else {
        switch (transform) {
          case "year":
-           op.addField(partition.column() + "_year", Expressions.year(partition.column()));
+           String yearField = name != null ? name : partition.column() + "_year";
+           op.addField(yearField, Expressions.year(partition.column()));
            break;
          case "month":
-           op.addField(partition.column() + "_month", Expressions.month(partition.column()));
+           String monthField = name != null ? name : partition.column() + "_month";
+           op.addField(monthField, Expressions.month(partition.column()));
            break;
          case "day":
-           op.addField(partition.column() + "_day", Expressions.day(partition.column()));
+           String dayField = name != null ? name : partition.column() + "_day";
+           op.addField(dayField, Expressions.day(partition.column()));
            break;
          case "hour":
-           op.addField(partition.column() + "_hour", Expressions.hour(partition.column()));
+           String hourField = name != null ? name : partition.column() + "_hour";
+           op.addField(hourField, Expressions.hour(partition.column()));
            break;
          case "identity":
          case "":
@@ -217,6 +249,11 @@ public static InferPartitionKeyResult inferPartitionKey(
  // Copied from org.apache.iceberg.parquet.ParquetConversions.
  private static Object fromParquetPrimitive(Type type, PrimitiveType parquetType, Object value) {
    switch (type.typeId()) {
+     case STRING:
+       if (value instanceof org.apache.parquet.io.api.Binary b) {
+         return b.toStringUsingUTF8();
+       }
+       return value;
      case TIME:
      case TIMESTAMP:
        // time & timestamp/timestamptz are stored in microseconds
@@ -245,7 +282,10 @@ private static Object decodeStatValue(Object parquetStatValue, Type icebergType)
      case TIME, TIMESTAMP ->
          // Parquet timestamp might come as INT64 (micros) or Binary; assuming long micros for now
          ((Number) parquetStatValue).longValue();
-     case STRING -> ((org.apache.parquet.io.api.Binary) parquetStatValue).toStringUsingUTF8();
+     case STRING ->
+         parquetStatValue instanceof org.apache.parquet.io.api.Binary b
+             ? b.toStringUsingUTF8()
+             : parquetStatValue.toString();
      default ->
          throw new UnsupportedOperationException("unsupported type: " + icebergType.typeId());
    };
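
Note: both STRING branches above guard the same way because, depending on the read path, a Parquet string value may surface as org.apache.parquet.io.api.Binary or as an already-decoded object. A small sketch of the pattern, with a fabricated value:

// Illustration only; Binary is org.apache.parquet.io.api.Binary.
Object raw = Binary.fromString("Setosa"); // typical form of a Parquet string stat
String decoded = raw instanceof Binary b ? b.toStringUsingUTF8() : raw.toString();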
@@ -290,8 +330,13 @@ public static Map<PartitionKey, List<org.apache.iceberg.data.Record>> partition(
                sourceFieldName, toGenericRecordFieldValue(value, fieldSpec.type()));
            break;
          default:
-           throw new UnsupportedOperationException(
-               "Unsupported transformation: " + transformName);
+           if (transformName.startsWith("truncate[") || transformName.startsWith("bucket[")) {
+             partitionRecord.setField(
+                 sourceFieldName, toGenericRecordFieldValue(value, fieldSpec.type()));
+           } else {
+             throw new UnsupportedOperationException(
+                 "Unsupported transformation: " + transformName);
+           }
        }
      }

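Note: the widened default branch keys off the transform's string form; for parameterized transforms, Iceberg's Transform#toString() is expected to include the parameter (e.g. "truncate[3]", "bucket[16]"), which is what the startsWith checks match. A brief sketch under that assumption, using org.apache.iceberg.transforms.Transforms:

// Illustration only.
String transformName = Transforms.truncate(3).toString(); // "truncate[3]"
boolean handled =
    transformName.startsWith("truncate[") || transformName.startsWith("bucket[");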