7 changes: 7 additions & 0 deletions docs/en/engines/table-engines/mergetree-family/part_export.md
@@ -84,6 +84,13 @@ In case a table function is used as the destination, the schema can be omitted a
- **Default**: `true`
- **Description**: If set to true, throws if pending patch parts exist for a given part. Note that by default mutations are applied to all parts, which means that if a mutation would in practice only affect part/partition X, all the other parts/partitions will throw upon export. The exception is when the `IN PARTITION` clause was used in the mutation command. Note that the `IN PARTITION` clause is not properly implemented for plain MergeTree tables.

### export_merge_tree_part_filename_pattern

- **Type**: `String`
- **Default**: `{part_name}_{checksum}`
- **Description**: Pattern for the filename of the exported merge tree part. The `{part_name}` and `{checksum}` placeholders are computed from the part being exported and substituted on the fly. Additional macros (such as `{shard}` and `{replica}`) are supported.
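
As an illustration, here is a minimal Python sketch of how such a pattern expands (a hypothetical helper, not the actual C++ implementation; the real macro resolution lives in `Macros::expand`):

```python
def expand_filename_pattern(pattern: str, part_name: str, checksum_hex: str,
                            macros: dict[str, str]) -> str:
    """Substitute part-derived placeholders first, then generic macros."""
    # {part_name} and {checksum} come from the part being exported,
    # so they are replaced before the generic macro pass.
    filename = pattern.replace("{part_name}", part_name)
    filename = filename.replace("{checksum}", checksum_hex)
    # Any remaining {key} placeholders are resolved from configured macros.
    for key, value in macros.items():
        filename = filename.replace("{" + key + "}", value)
    return filename

print(expand_filename_pattern(
    "{part_name}_{shard}_{replica}_{checksum}",
    part_name="2020_1_1_0",
    checksum_hex="a1b2c3",
    macros={"shard": "shard1", "replica": "replica1"}))
# 2020_1_1_0_shard1_replica1_a1b2c3
```

With the default pattern `{part_name}_{checksum}`, two shards exporting an identical part produce identical filenames; adding `{shard}` to the pattern is what keeps them apart.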


## Examples

### Basic Export to S3
@@ -82,6 +82,12 @@ TO TABLE [destination_database.]destination_table
- **Default**: `true`
- **Description**: If set to true, throws if pending patch parts exist for a given part. Note that by default mutations are applied to all parts, which means that if a mutation would in practice only affect part/partition X, all the other parts/partitions will throw upon export. The exception is when the `IN PARTITION` clause was used in the mutation command. Note that the `IN PARTITION` clause is not properly implemented for plain MergeTree tables.

### export_merge_tree_part_filename_pattern

- **Type**: `String`
- **Default**: `{part_name}_{checksum}`
- **Description**: Pattern for the filename of the exported merge tree part. The `{part_name}` and `{checksum}` placeholders are computed from the part being exported and substituted on the fly. Additional macros (such as `{shard}` and `{replica}`) are supported.

**Collaborator:** Why do we duplicate part_export.md content here?

**Collaborator (author):** Well, this is export partition (a slightly different feature), and at some point there might be settings that are supported only by export part and not by export partition.

I don't have a good answer, to be honest.

## Examples

### Basic Export to S3
3 changes: 3 additions & 0 deletions src/Core/Settings.cpp
@@ -7388,6 +7388,9 @@ Possible values:
- `` (empty value) - use server or session timezone
Default value is empty.
)", 0) \
DECLARE(String, export_merge_tree_part_filename_pattern, "{part_name}_{checksum}", R"(
Pattern for the filename of the exported merge tree part. The `{part_name}` and `{checksum}` placeholders are computed from the part being exported and substituted on the fly. Additional macros are supported.
)", 0) \
\
/* ####################################################### */ \
1 change: 1 addition & 0 deletions src/Core/SettingsChangesHistory.cpp
@@ -44,6 +44,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
{"iceberg_partition_timezone", "", "", "New setting."},
// {"object_storage_max_nodes", 0, 0, "Antalya: New setting"},
{"s3_propagate_credentials_to_other_storages", false, false, "New setting"},
{"export_merge_tree_part_filename_pattern", "", "{part_name}_{checksum}", "New setting"},
});
addSettingsChanges(settings_changes_history, "26.1",
{
3 changes: 3 additions & 0 deletions src/Storages/ExportReplicatedMergeTreePartitionManifest.h
@@ -116,6 +116,7 @@ struct ExportReplicatedMergeTreePartitionManifest
size_t max_bytes_per_file;
size_t max_rows_per_file;
MergeTreePartExportManifest::FileAlreadyExistsPolicy file_already_exists_policy;
String filename_pattern;
bool lock_inside_the_task; /// todo temporary

std::string toJsonString() const
@@ -139,6 +140,7 @@ struct ExportReplicatedMergeTreePartitionManifest
json.set("max_bytes_per_file", max_bytes_per_file);
json.set("max_rows_per_file", max_rows_per_file);
json.set("file_already_exists_policy", String(magic_enum::enum_name(file_already_exists_policy)));
json.set("filename_pattern", filename_pattern);
json.set("create_time", create_time);
json.set("max_retries", max_retries);
json.set("ttl_seconds", ttl_seconds);
@@ -175,6 +177,7 @@ struct ExportReplicatedMergeTreePartitionManifest
manifest.parquet_parallel_encoding = json->getValue<bool>("parquet_parallel_encoding");
manifest.max_bytes_per_file = json->getValue<size_t>("max_bytes_per_file");
manifest.max_rows_per_file = json->getValue<size_t>("max_rows_per_file");
manifest.filename_pattern = json->getValue<String>("filename_pattern");

**Reviewer bot (P1):** Preserve manifest backward compatibility for `filename_pattern`.

Deserialization now requires `filename_pattern` unconditionally, but metadata written by earlier versions does not include this key. Any node that reads an older `exports/.../metadata.json` (for example while checking existing exports or canceling an export in `StorageReplicatedMergeTree`) will throw during `fromJsonString`, breaking in-flight export management after upgrade. Make this field optional and fall back to the default pattern when absent.
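The suggested fix can be sketched in Python (illustrative only; the real code is C++ reading a Poco JSON object, and the helper name here is hypothetical):

```python
import json

DEFAULT_FILENAME_PATTERN = "{part_name}_{checksum}"

def read_filename_pattern(manifest_json: str) -> str:
    """Fall back to the default pattern when the key is absent,
    so metadata written by older versions still deserializes."""
    doc = json.loads(manifest_json)
    return doc.get("filename_pattern", DEFAULT_FILENAME_PATTERN)

# Old manifest without the key -> default pattern, no exception.
assert read_filename_pattern('{"max_retries": 3}') == DEFAULT_FILENAME_PATTERN
# New manifest with the key -> stored value.
assert read_filename_pattern('{"filename_pattern": "{part_name}"}') == "{part_name}"
```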

**Collaborator:** @arthurpassos, what do you think on ^^?

**Collaborator (author):** Nobody is using this feature yet; it is OK to introduce backwards-incompatible changes like this. We literally have 0 users so far.


if (json->has("file_already_exists_policy"))
{
37 changes: 35 additions & 2 deletions src/Storages/MergeTree/ExportPartTask.cpp
@@ -3,11 +3,13 @@
#include <Storages/MergeTree/MergeTreeSequentialSource.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Interpreters/Context.h>
#include <Interpreters/DatabaseCatalog.h>
#include <Interpreters/inplaceBlockConversions.h>
#include <Core/Settings.h>
#include <Common/Macros.h>
#include <boost/algorithm/string/replace.hpp>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/ActionsDAG.h>
#include <Interpreters/DatabaseCatalog.h>
#include <Processors/Executors/CompletedPipelineExecutor.h>
#include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
@@ -17,6 +19,7 @@
#include "Common/setThreadName.h"
#include <Common/Exception.h>
#include <Common/ProfileEventsScope.h>
#include <Databases/DatabaseReplicated.h>
#include <Storages/MergeTree/ExportList.h>
#include <Formats/FormatFactory.h>
#include <Databases/enableAllExperimentalSettings.h>
@@ -47,6 +50,7 @@ namespace Setting
extern const SettingsUInt64 export_merge_tree_part_max_bytes_per_file;
extern const SettingsUInt64 export_merge_tree_part_max_rows_per_file;
extern const SettingsBool allow_experimental_analyzer;
extern const SettingsString export_merge_tree_part_filename_pattern;
}

namespace
@@ -93,6 +97,33 @@ namespace
plan_for_part.addStep(std::move(expression_step));
}
}

String buildDestinationFilename(
const MergeTreePartExportManifest & manifest,
const StorageID & storage_id,
const ContextPtr & local_context)
{
auto filename = manifest.settings[Setting::export_merge_tree_part_filename_pattern].value;

boost::replace_all(filename, "{part_name}", manifest.data_part->name);
boost::replace_all(filename, "{checksum}", manifest.data_part->checksums.getTotalChecksumHex());

Macros::MacroExpansionInfo macro_info;
macro_info.table_id = storage_id;

if (auto database = DatabaseCatalog::instance().tryGetDatabase(storage_id.database_name))
{
if (const auto replicated = dynamic_cast<const DatabaseReplicated *>(database.get()))
{
macro_info.shard = replicated->getShardName();
macro_info.replica = replicated->getReplicaName();
}
}

filename = local_context->getMacros()->expand(filename, macro_info);

**Collaborator:** Why do we need special logic for `{part_name}` and `{checksum}`? In other words, why do we not put it inside `expand()`?

**Collaborator (author):** Because `part_name` and `checksum` are calculated on the fly from the data part being exported. They are not meant to be extracted from macros; it would not even work, to be honest.

return filename;
}
}
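
The resolution order discussed above — part-derived placeholders substituted first, then macros taken from `macro_info` before falling back to server-config `<macros>` — can be sketched in Python (hypothetical function; the assumed lookup order mirrors how `buildDestinationFilename` injects the DatabaseReplicated identity before calling `expand`):

```python
def resolve_macro(name: str, macro_info: dict, config_macros: dict) -> str:
    # Fields injected from the DatabaseReplicated identity take
    # precedence over server-config <macros>.
    if macro_info.get(name) is not None:
        return macro_info[name]
    if name in config_macros:
        return config_macros[name]
    # Roughly corresponds to the NO_ELEMENTS_IN_CONFIG error.
    raise KeyError(f"No macro '{name}' in config")

# A Replicated database provides shard/replica even when the server
# config defines no <shard>/<replica> macros at all.
macro_info = {"shard": "db_shard_x", "replica": "db_replica_y"}
assert resolve_macro("shard", macro_info, {}) == "db_shard_x"
assert resolve_macro("replica", macro_info, {}) == "db_replica_y"
```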

ExportPartTask::ExportPartTask(MergeTreeData & storage_, const MergeTreePartExportManifest & manifest_)
@@ -154,8 +185,10 @@ bool ExportPartTask::executeStep()

try
{
const auto filename = buildDestinationFilename(manifest, storage.getStorageID(), local_context);

sink = destination_storage->import(
manifest.data_part->name + "_" + manifest.data_part->checksums.getTotalChecksumHex(),
filename,
block_with_partition_values,
new_file_path_callback,
manifest.file_already_exists_policy == MergeTreePartExportManifest::FileAlreadyExistsPolicy::overwrite,
4 changes: 3 additions & 1 deletion src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp
@@ -56,7 +56,9 @@
context_copy->setSetting("export_merge_tree_part_throw_on_pending_mutations", false);
context_copy->setSetting("export_merge_tree_part_throw_on_pending_patch_parts", false);

context_copy->setSetting("export_merge_tree_part_filename_pattern", manifest.filename_pattern);

return context_copy;
}
}

2 changes: 2 additions & 0 deletions src/Storages/StorageReplicatedMergeTree.cpp
@@ -217,6 +217,7 @@ namespace Setting
extern const SettingsBool export_merge_tree_part_throw_on_pending_mutations;
extern const SettingsBool export_merge_tree_part_throw_on_pending_patch_parts;
extern const SettingsBool export_merge_tree_partition_lock_inside_the_task;
extern const SettingsString export_merge_tree_part_filename_pattern;
}

namespace MergeTreeSetting
@@ -8209,6 +8210,7 @@ void StorageReplicatedMergeTree::exportPartitionToTable(const PartitionCommand &


manifest.file_already_exists_policy = query_context->getSettingsRef()[Setting::export_merge_tree_part_file_already_exists_policy].value;
manifest.filename_pattern = query_context->getSettingsRef()[Setting::export_merge_tree_part_filename_pattern].value;

ops.emplace_back(zkutil::makeCreateRequest(
fs::path(partition_exports_path) / "metadata.json",
@@ -0,0 +1,6 @@
<clickhouse>
<macros>
<shard>shard1</shard>
<replica>replica1</replica>
</macros>
</clickhouse>
@@ -0,0 +1,6 @@
<clickhouse>
<macros>
<shard>shard2</shard>
<replica>replica1</replica>
</macros>
</clickhouse>
@@ -118,6 +118,26 @@ def cluster():
with_zookeeper=True,
keeper_required_feature_flags=["multi_read"],
)
# Sharded instances for filename pattern tests
cluster.add_instance(
"shard1_replica1",
main_configs=["configs/named_collections.xml", "configs/allow_experimental_export_partition.xml", "configs/macros_shard1_replica1.xml"],
user_configs=["configs/users.d/profile.xml"],
with_minio=True,
stay_alive=True,
with_zookeeper=True,
keeper_required_feature_flags=["multi_read"],
)

cluster.add_instance(
"shard2_replica1",
main_configs=["configs/named_collections.xml", "configs/allow_experimental_export_partition.xml", "configs/macros_shard2_replica1.xml"],
user_configs=["configs/users.d/profile.xml"],
with_minio=True,
stay_alive=True,
with_zookeeper=True,
keeper_required_feature_flags=["multi_read"],
)
logging.info("Starting cluster...")
cluster.start()
yield cluster
@@ -161,6 +181,14 @@ def create_tables_and_insert_data(node, mt_table, s3_table, replica_name):
create_s3_table(node, s3_table)


def create_sharded_tables_and_insert_data(node, mt_table, s3_table, replica_name):
"""Create sharded ReplicatedMergeTree table with {shard} macro in ZooKeeper path."""
node.query(f"CREATE TABLE {mt_table} (id UInt64, year UInt16) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{{shard}}/{mt_table}', '{replica_name}') PARTITION BY year ORDER BY tuple()")
node.query(f"INSERT INTO {mt_table} VALUES (1, 2020), (2, 2020), (3, 2020), (4, 2021)")

create_s3_table(node, s3_table)


def test_restart_nodes_during_export(cluster):
skip_if_remote_database_disk_enabled(cluster)
node = cluster.instances["replica1"]
@@ -1148,3 +1176,162 @@ def test_export_partition_with_mixed_computed_columns(cluster):
AND partition_id = '1'
""")
assert status.strip() == "COMPLETED", f"Expected COMPLETED status, got: {status}"


def test_sharded_export_partition_with_filename_pattern(cluster):
"""Test that export partition with filename pattern prevents collisions in sharded setup."""
shard1_r1 = cluster.instances["shard1_replica1"]
shard2_r1 = cluster.instances["shard2_replica1"]
watcher_node = cluster.instances["watcher_node"]

mt_table = "sharded_mt_table"
s3_table = "sharded_s3_table"

# Create sharded tables on all shards with same partition data (same part names)
# Each shard uses different ZooKeeper path via {shard} macro
create_sharded_tables_and_insert_data(shard1_r1, mt_table, s3_table, "replica1")
create_sharded_tables_and_insert_data(shard2_r1, mt_table, s3_table, "replica1")
create_s3_table(watcher_node, s3_table)

# Export partition from both shards with filename pattern including shard
# This should prevent filename collisions
shard1_r1.query(
f"ALTER TABLE {mt_table} EXPORT PARTITION ID '2020' TO TABLE {s3_table} "
f"SETTINGS export_merge_tree_part_filename_pattern = '{{part_name}}_{{shard}}_{{replica}}_{{checksum}}'"
)
shard2_r1.query(
f"ALTER TABLE {mt_table} EXPORT PARTITION ID '2020' TO TABLE {s3_table} "
f"SETTINGS export_merge_tree_part_filename_pattern = '{{part_name}}_{{shard}}_{{replica}}_{{checksum}}'"
)

# Wait for exports to complete
wait_for_export_status(shard1_r1, mt_table, s3_table, "2020", "COMPLETED")
wait_for_export_status(shard2_r1, mt_table, s3_table, "2020", "COMPLETED")

total_count = watcher_node.query(f"SELECT count() FROM {s3_table} WHERE year = 2020").strip()
assert total_count == "6", f"Expected 6 total rows (3 from each shard), got {total_count}"

# Verify filenames contain shard information (check via S3 directly)
# Get all files from S3 - query from watcher_node since S3 is shared
files_shard1 = watcher_node.query(
f"SELECT _file FROM s3(s3_conn, filename='{s3_table}/**', format='One') WHERE _file LIKE '%shard1%' LIMIT 1"
).strip()
files_shard2 = watcher_node.query(
f"SELECT _file FROM s3(s3_conn, filename='{s3_table}/**', format='One') WHERE _file LIKE '%shard2%' LIMIT 1"
).strip()

    # Both shards must have produced at least one file carrying their shard name;
    # an empty result from the LIKE-filtered query means no such file exists.
    assert "shard1" in files_shard1, f"Expected a file with 'shard1' in its name, got: {files_shard1!r}"
    assert "shard2" in files_shard2, f"Expected a file with 'shard2' in its name, got: {files_shard2!r}"


def test_export_partition_from_replicated_database_uses_db_shard_replica_macros(cluster):
"""Test that {shard} and {replica} in the filename pattern are expanded from the
DatabaseReplicated identity, NOT from server config macros.

replica1 has no <shard>/<replica> entries in its server config <macros> section.
Without the fix buildDestinationFilename() leaves macro_info.shard/replica unset, so
Macros::expand() falls through to the config-macros lookup and throws NO_ELEMENTS_IN_CONFIG.
With the fix the DatabaseReplicated shard_name / replica_name are injected into macro_info
before the expand call, and the pattern resolves correctly.
"""
node = cluster.instances["replica1"]
watcher_node = cluster.instances["watcher_node"]

postfix = str(uuid.uuid4()).replace("-", "_")
db_name = f"repdb_{postfix}"
table_name = "mt_table"
s3_table = f"s3_dbreplicated_{postfix}"

# These values exist only in the DatabaseReplicated definition – they are NOT
# present anywhere in replica1's server config <macros>.
db_shard = "db_shard_x"
db_replica = "db_replica_y"

node.query(
f"CREATE DATABASE {db_name} "
f"ENGINE = Replicated('/clickhouse/databases/{db_name}', '{db_shard}', '{db_replica}')")

node.query(f"""
CREATE TABLE {db_name}.{table_name}
(id UInt64, year UInt16)
ENGINE = ReplicatedMergeTree()
PARTITION BY year ORDER BY tuple()""")

node.query(f"INSERT INTO {db_name}.{table_name} VALUES (1, 2020), (2, 2020), (3, 2020)")
# Stop merges so part names stay stable during the test.
node.query(f"SYSTEM STOP MERGES {db_name}.{table_name}")

node.query(
f"CREATE TABLE {s3_table} (id UInt64, year UInt16) "
f"ENGINE = S3(s3_conn, filename='{s3_table}', format=Parquet, partition_strategy='hive') "
f"PARTITION BY year")

watcher_node.query(
f"CREATE TABLE {s3_table} (id UInt64, year UInt16) "
f"ENGINE = S3(s3_conn, filename='{s3_table}', format=Parquet, partition_strategy='hive') "
f"PARTITION BY year")

# Export with {shard} and {replica} in the pattern.
# Before the fix: Macros::expand throws NO_ELEMENTS_IN_CONFIG because replica1 has
# no <shard>/<replica> server config macros.
# After the fix: DatabaseReplicated's shard_name/replica_name are wired into
# macro_info before the expand call, so this succeeds and produces the right names.
node.query(
f"ALTER TABLE {db_name}.{table_name} EXPORT PARTITION ID '2020' TO TABLE {s3_table} "
f"SETTINGS export_merge_tree_part_filename_pattern = "
f"'{{part_name}}_{{shard}}_{{replica}}_{{checksum}}'")

# A FAILED status here almost certainly means the macro expansion threw
# NO_ELEMENTS_IN_CONFIG (i.e. the fix is missing or broken).
wait_for_export_status(node, table_name, s3_table, "2020", "COMPLETED")

# Data should have landed in S3.
count = watcher_node.query(f"SELECT count() FROM {s3_table} WHERE year = 2020").strip()
assert count == "3", f"Expected 3 exported rows, got {count}"

# The exported filename must contain the exact shard and replica names from the
# DatabaseReplicated definition, proving the fix injected them (not server config macros).
filename = watcher_node.query(
f"SELECT _file FROM s3(s3_conn, filename='{s3_table}/**/*.parquet', format='One') LIMIT 1"
).strip()

assert db_shard in filename, (
f"Expected filename to contain DatabaseReplicated shard '{db_shard}', got: {filename!r}. "
"Suggests {shard} was not expanded from the DatabaseReplicated identity.")

assert db_replica in filename, (
f"Expected filename to contain DatabaseReplicated replica '{db_replica}', got: {filename!r}. "
"Suggests {replica} was not expanded from the DatabaseReplicated identity.")


def test_sharded_export_partition_default_pattern(cluster):
shard1_r1 = cluster.instances["shard1_replica1"]
shard2_r1 = cluster.instances["shard2_replica1"]
watcher_node = cluster.instances["watcher_node"]

mt_table = "sharded_mt_table_default"
s3_table = "sharded_s3_table_default"

# Create sharded tables with different ZooKeeper paths per shard
create_sharded_tables_and_insert_data(shard1_r1, mt_table, s3_table, "replica1")
create_sharded_tables_and_insert_data(shard2_r1, mt_table, s3_table, "replica1")
create_s3_table(watcher_node, s3_table)

# Export with default pattern ({part_name}_{checksum}) - may cause collisions if parts have same name and the same checksum
shard1_r1.query(
f"ALTER TABLE {mt_table} EXPORT PARTITION ID '2020' TO TABLE {s3_table}"
)
shard2_r1.query(
f"ALTER TABLE {mt_table} EXPORT PARTITION ID '2020' TO TABLE {s3_table}"
)

wait_for_export_status(shard1_r1, mt_table, s3_table, "2020", "COMPLETED")
wait_for_export_status(shard2_r1, mt_table, s3_table, "2020", "COMPLETED")

# Both exports should complete (even if there are collisions, the overwrite policy handles it)
# S3 tables are shared, so query from watcher_node
total_count = watcher_node.query(f"SELECT count() FROM {s3_table} WHERE year = 2020").strip()

# only one file with 3 rows should be present
assert int(total_count) == 3, f"Expected 3 rows, got {total_count}"