Skip to content
Merged
9 changes: 9 additions & 0 deletions src/Core/Settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7315,6 +7315,15 @@ Allows creation of [QBit](../../sql-reference/data-types/qbit.md) data type.
)", BETA, allow_experimental_qbit_type) \
DECLARE(UInt64, archive_adaptive_buffer_max_size_bytes, 8 * DBMS_DEFAULT_BUFFER_SIZE, R"(
Limits the maximum size of the adaptive buffer used when writing to archive files (for example, tar archives)", 0) \
DECLARE(Timezone, iceberg_partition_timezone, "", R"(
Time zone in which the partitioning of Iceberg tables was performed.
Possible values:

- Any valid time zone, e.g. `Europe/Berlin`, `UTC` or `Zulu`
- `''` (empty string) — use the server or session time zone

Default value is empty.
)", 0) \
\
/* ####################################################### */ \
/* ########### START OF EXPERIMENTAL FEATURES ############ */ \
Expand Down
5 changes: 1 addition & 4 deletions src/Core/SettingsChangesHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,9 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
/// controls new feature and it's 'true' by default, use 'false' as previous_value).
/// It's used to implement `compatibility` setting (see https://github.com/ClickHouse/ClickHouse/issues/35972)
/// Note: please check if the key already exists to prevent duplicate entries.
// addSettingsChanges(settings_changes_history, "26.1.3.20001",
// {
// });
addSettingsChanges(settings_changes_history, "26.1.3.20001.altinityantalya",
{
// {"object_storage_cluster", "", "", "Antalya: New setting"},
{"iceberg_partition_timezone", "", "", "New setting."},
// {"object_storage_max_nodes", 0, 0, "Antalya: New setting"},
});
addSettingsChanges(settings_changes_history, "26.1",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ namespace DB
namespace Setting
{
extern const SettingsUInt64 iceberg_insert_max_partitions;
extern const SettingsTimezone iceberg_partition_timezone;
}

namespace ErrorCodes
Expand Down Expand Up @@ -53,7 +54,7 @@ ChunkPartitioner::ChunkPartitioner(

auto & factory = FunctionFactory::instance();

auto transform_and_argument = Iceberg::parseTransformAndArgument(transform_name);
auto transform_and_argument = Iceberg::parseTransformAndArgument(transform_name, context->getSettingsRef()[Setting::iceberg_partition_timezone]);
if (!transform_and_argument)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown transform {}", transform_name);

Expand All @@ -67,6 +68,7 @@ ChunkPartitioner::ChunkPartitioner(
result_data_types.push_back(function->getReturnType(columns_for_function));
functions.push_back(function);
function_params.push_back(transform_and_argument->argument);
function_time_zones.push_back(transform_and_argument->time_zone);
columns_to_apply.push_back(column_name);
}
}
Expand Down Expand Up @@ -103,6 +105,14 @@ ChunkPartitioner::partitionChunk(const Chunk & chunk)
arguments.push_back(ColumnWithTypeAndName(const_column->clone(), type, "#"));
}
arguments.push_back(name_to_column[columns_to_apply[transform_ind]]);
if (function_time_zones[transform_ind].has_value())
{
auto type = std::make_shared<DataTypeString>();
auto column_value = ColumnString::create();
column_value->insert(*function_time_zones[transform_ind]);
auto const_column = ColumnConst::create(std::move(column_value), chunk.getNumRows());
arguments.push_back(ColumnWithTypeAndName(const_column->clone(), type, "PartitioningTimezone"));
}
auto result
= functions[transform_ind]->build(arguments)->execute(arguments, std::make_shared<DataTypeString>(), chunk.getNumRows(), false);
for (size_t i = 0; i < chunk.getNumRows(); ++i)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ class ChunkPartitioner

std::vector<FunctionOverloadResolverPtr> functions;
std::vector<std::optional<size_t>> function_params;
std::vector<std::optional<String>> function_time_zones;
std::vector<String> columns_to_apply;
std::vector<DataTypePtr> result_data_types;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <compare>
#include <optional>

#include <Interpreters/Context.h>
#include <Interpreters/IcebergMetadataLog.h>

#include <Storages/ObjectStorage/DataLakes/Iceberg/Constant.h>
Expand All @@ -14,6 +15,7 @@
#include <Storages/ObjectStorage/DataLakes/Iceberg/PositionDeleteTransform.h>
#include <Storages/ObjectStorage/DataLakes/Iceberg/Utils.h>

#include <Core/Settings.h>
#include <Core/TypeId.h>
#include <DataTypes/DataTypesDecimal.h>
#include <Poco/JSON/Parser.h>
Expand All @@ -34,6 +36,11 @@ namespace DB::ErrorCodes
extern const int BAD_ARGUMENTS;
}

namespace DB::Setting
{
extern const SettingsTimezone iceberg_partition_timezone;
}

namespace DB::Iceberg
{

Expand Down Expand Up @@ -219,7 +226,7 @@ ManifestFileContent::ManifestFileContent(
auto transform_name = partition_specification_field->getValue<String>(f_partition_transform);
auto partition_name = partition_specification_field->getValue<String>(f_partition_name);
common_partition_specification.emplace_back(source_id, transform_name, partition_name);
auto partition_ast = getASTFromTransform(transform_name, numeric_column_name);
auto partition_ast = getASTFromTransform(transform_name, numeric_column_name, context->getSettingsRef()[Setting::iceberg_partition_timezone]);
/// Unsupported partition key expression
if (partition_ast == nullptr)
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ using namespace DB;
namespace DB::Iceberg
{

DB::ASTPtr getASTFromTransform(const String & transform_name_src, const String & column_name)
DB::ASTPtr getASTFromTransform(const String & transform_name_src, const String & column_name, const String & time_zone)
{
auto transform_and_argument = parseTransformAndArgument(transform_name_src);
auto transform_and_argument = parseTransformAndArgument(transform_name_src, time_zone);
if (!transform_and_argument)
{
LOG_WARNING(&Poco::Logger::get("Iceberg Partition Pruning"), "Cannot parse iceberg transform name: {}.", transform_name_src);
Expand All @@ -47,6 +47,13 @@ DB::ASTPtr getASTFromTransform(const String & transform_name_src, const String &
return makeASTFunction(
transform_and_argument->transform_name, make_intrusive<ASTLiteral>(*transform_and_argument->argument), make_intrusive<ASTIdentifier>(column_name));
}
if (transform_and_argument->time_zone)
{
return makeASTFunction(
transform_and_argument->transform_name,
make_intrusive<ASTIdentifier>(column_name),
make_intrusive<ASTLiteral>(*transform_and_argument->time_zone));
}
return makeASTFunction(transform_and_argument->transform_name, make_intrusive<ASTIdentifier>(column_name));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ namespace DB::Iceberg
struct ManifestFileEntry;
class ManifestFileContent;

DB::ASTPtr getASTFromTransform(const String & transform_name_src, const String & column_name);
DB::ASTPtr getASTFromTransform(const String & transform_name_src, const String & column_name, const String & time_zone);

/// Prune specific data files based on manifest content
class ManifestFilesPruner
Expand Down
31 changes: 20 additions & 11 deletions src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ namespace ProfileEvents
namespace DB::Setting
{
extern const SettingsUInt64 output_format_compression_level;
extern const SettingsTimezone iceberg_partition_timezone;
}

/// Hard to imagine a hint file larger than 10 MB
Expand Down Expand Up @@ -242,27 +243,31 @@ bool writeMetadataFileAndVersionHint(
}


std::optional<TransformAndArgument> parseTransformAndArgument(const String & transform_name_src)
std::optional<TransformAndArgument> parseTransformAndArgument(const String & transform_name_src, const String & time_zone)
{
std::string transform_name = Poco::toLower(transform_name_src);

std::optional<String> time_zone_opt;
if (!time_zone.empty())
time_zone_opt = time_zone;

if (transform_name == "year" || transform_name == "years")
return TransformAndArgument{"toYearNumSinceEpoch", std::nullopt};
return TransformAndArgument{"toYearNumSinceEpoch", std::nullopt, time_zone_opt};

if (transform_name == "month" || transform_name == "months")
return TransformAndArgument{"toMonthNumSinceEpoch", std::nullopt};
return TransformAndArgument{"toMonthNumSinceEpoch", std::nullopt, time_zone_opt};

if (transform_name == "day" || transform_name == "date" || transform_name == "days" || transform_name == "dates")
return TransformAndArgument{"toRelativeDayNum", std::nullopt};
return TransformAndArgument{"toRelativeDayNum", std::nullopt, time_zone_opt};

if (transform_name == "hour" || transform_name == "hours")
return TransformAndArgument{"toRelativeHourNum", std::nullopt};
return TransformAndArgument{"toRelativeHourNum", std::nullopt, time_zone_opt};

if (transform_name == "identity")
return TransformAndArgument{"identity", std::nullopt};
return TransformAndArgument{"identity", std::nullopt, std::nullopt};

if (transform_name == "void")
return TransformAndArgument{"tuple", std::nullopt};
return TransformAndArgument{"tuple", std::nullopt, std::nullopt};

if (transform_name.starts_with("truncate") || transform_name.starts_with("bucket"))
{
Expand All @@ -286,11 +291,11 @@ std::optional<TransformAndArgument> parseTransformAndArgument(const String & tra

if (transform_name.starts_with("truncate"))
{
return TransformAndArgument{"icebergTruncate", argument};
return TransformAndArgument{"icebergTruncate", argument, std::nullopt};
}
else if (transform_name.starts_with("bucket"))
{
return TransformAndArgument{"icebergBucket", argument};
return TransformAndArgument{"icebergBucket", argument, std::nullopt};
}
}
return std::nullopt;
Expand Down Expand Up @@ -1245,7 +1250,8 @@ KeyDescription getSortingKeyDescriptionFromMetadata(Poco::JSON::Object::Ptr meta
auto column_name = source_id_to_column_name[source_id];
int direction = field->getValue<String>(f_direction) == "asc" ? 1 : -1;
auto iceberg_transform_name = field->getValue<String>(f_transform);
auto clickhouse_transform_name = parseTransformAndArgument(iceberg_transform_name);
auto clickhouse_transform_name = parseTransformAndArgument(iceberg_transform_name,
local_context->getSettingsRef()[Setting::iceberg_partition_timezone]);
String full_argument;
if (clickhouse_transform_name->transform_name != "identity")
{
Expand All @@ -1254,7 +1260,10 @@ KeyDescription getSortingKeyDescriptionFromMetadata(Poco::JSON::Object::Ptr meta
{
full_argument += std::to_string(*clickhouse_transform_name->argument) + ", ";
}
full_argument += column_name + ")";
full_argument += column_name;
if (clickhouse_transform_name->time_zone)
full_argument += ", " + *clickhouse_transform_name->time_zone;
full_argument += ")";
}
else
{
Expand Down
7 changes: 6 additions & 1 deletion src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,14 @@ struct TransformAndArgument
{
String transform_name;
std::optional<size_t> argument;
/// When an Iceberg table is partitioned by time, the partitioning may have been performed in a
/// time zone other than the server's (UTC in most cases). That time zone is configured via the
/// `iceberg_partition_timezone` setting and is stored in this member. When an Iceberg partition
/// condition is converted to a ClickHouse function in `parseTransformAndArgument`, `time_zone`
/// is passed as an extra argument to functions like `toRelativeDayNum`, `toYearNumSinceEpoch`, etc.
std::optional<String> time_zone;
};

std::optional<TransformAndArgument> parseTransformAndArgument(const String & transform_name_src);
std::optional<TransformAndArgument> parseTransformAndArgument(const String & transform_name_src, const String & time_zone);

Poco::JSON::Object::Ptr getMetadataJSONObject(
const String & metadata_file_path,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@ services:
- AWS_SECRET_ACCESS_KEY=password
- AWS_REGION=us-east-1
ports:
- 8080:8080
- 10002:10000
- 10003:10001
- ${SPARK_ICEBERG_EXTERNAL_PORT:-8080}:8080
- ${SPARK_ICEBERG_EXTERNAL_PORT_2:-10002}:10000
- ${SPARK_ICEBERG_EXTERNAL_PORT_3:-10003}:10001
stop_grace_period: 5s
cpus: 3
rest:
image: tabulario/iceberg-rest:1.6.0
ports:
- 8182:8181
- ${ICEBERG_REST_EXTERNAL_PORT:-8182}:8181
environment:
- AWS_ACCESS_KEY_ID=minio
- AWS_SECRET_ACCESS_KEY=ClickHouse_Minio_P@ssw0rd
Expand Down
9 changes: 9 additions & 0 deletions tests/integration/helpers/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -674,6 +674,9 @@ def __init__(
self.minio_secret_key = minio_secret_key

self.spark_session = None
self.spark_iceberg_external_port = 8080
self.spark_iceberg_external_port_2 = 10002
self.spark_iceberg_external_port_3 = 10003
self.with_iceberg_catalog = False
self.with_glue_catalog = False
self.with_hms_catalog = False
Expand Down Expand Up @@ -885,6 +888,8 @@ def __init__(
self._letsencrypt_pebble_api_port = 14000
self._letsencrypt_pebble_management_port = 15000

self.iceberg_rest_external_port = 8182

self.docker_client: docker.DockerClient = None
self.is_up = False
self.env = os.environ.copy()
Expand Down Expand Up @@ -1702,6 +1707,10 @@ def setup_hms_catalog_cmd(self, instance, env_variables, docker_compose_yml_dir)
def setup_iceberg_catalog_cmd(
self, instance, env_variables, docker_compose_yml_dir, extra_parameters=None
):
env_variables["ICEBERG_REST_EXTERNAL_PORT"] = str(self.iceberg_rest_external_port)
env_variables["SPARK_ICEBERG_EXTERNAL_PORT"] = str(self.spark_iceberg_external_port)
env_variables["SPARK_ICEBERG_EXTERNAL_PORT_2"] = str(self.spark_iceberg_external_port_2)
env_variables["SPARK_ICEBERG_EXTERNAL_PORT_3"] = str(self.spark_iceberg_external_port_3)
self.with_iceberg_catalog = True
file_name = "docker_compose_iceberg_rest_catalog.yml"
if extra_parameters is not None and extra_parameters["docker_compose_file_name"] != "":
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
<clickhouse>
<profiles>
<default>
<!-- Test profile: evaluate Iceberg partition time transforms in UTC rather than the
     server/session time zone (see the `iceberg_partition_timezone` setting). -->
<iceberg_partition_timezone>UTC</iceberg_partition_timezone>
</default>
</profiles>
</clickhouse>
3 changes: 3 additions & 0 deletions tests/integration/test_database_iceberg/configs/timezone.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
<clickhouse>
<!-- Deliberately non-UTC server time zone for the integration test — presumably so the test
     can verify that `iceberg_partition_timezone` overrides it; confirm against the test code. -->
<timezone>Asia/Istanbul</timezone>
</clickhouse>
Loading
Loading