
spark: Don't use table FileIO for checkpointing files #15239

Open
c2zwdjnlcg wants to merge 2 commits into apache:main from c2zwdjnlcg:fix-checkpoint-fs-impl

Conversation

@c2zwdjnlcg

Fixes: #14762

@github-actions github-actions bot added the spark label Feb 5, 2026
@c2zwdjnlcg c2zwdjnlcg force-pushed the fix-checkpoint-fs-impl branch from f994146 to 13f02a1 Compare February 19, 2026 23:44
@c2zwdjnlcg
Author
Author

@nastra could you take a look at this PR and see if you are aligned with separating the checkpoint IO from the table IO?

@nastra
Contributor
Contributor

nastra commented Feb 20, 2026

@c2zwdjnlcg I currently don't have any cycles to review this. Maybe @huaxingao, @RussellSpitzer or @aokolnychyi have some time to review it

@RussellSpitzer
Member
Member

First look: please do larger changes like this on a single module first, then backport to the others in a follow-up. Having duplicated changes makes reviewing a bit more difficult. Taking an in-depth pass now.

@RussellSpitzer RussellSpitzer left a comment (Member)

Rather than changing the IO here to something a user wouldn't expect, I think it's probably better for us to change InitialOffsetStore itself directly.

Since Spark checkpoints are expected to go through HadoopFS, we should probably just use Hadoop FileSystem directly instead of the Iceberg FileIO class. This is of course a breaking change, so we probably also need to gate it, at least initially.

Maybe build two OffsetStores with the same interface and allow users to opt to Hadoop based with a spark read conf property?

interface InitialOffsetStore {
  StreamingOffset initialOffset();

  class TableIOOffsetStore implements InitialOffsetStore {
  }

  class HadoopOffsetStore implements InitialOffsetStore {
  }
}
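A fuller version of that sketch might look like the following. Only the `InitialOffsetStore`, `TableIOOffsetStore`, and `HadoopOffsetStore` names come from the suggestion above; the `create` factory, the boolean flag, and the stubbed return values are illustrative (with `StreamingOffset` stood in by a `String`):

```java
// Illustrative expansion of the two-implementation sketch; the factory and
// the stub bodies are hypothetical, not the PR's actual code.
interface InitialOffsetStore {
  String initialOffset();

  // Checkpoint reads/writes go through the table's FileIO
  class TableIOOffsetStore implements InitialOffsetStore {
    @Override
    public String initialOffset() {
      return "via table FileIO";
    }
  }

  // Checkpoint reads/writes go through Hadoop FileSystem APIs
  class HadoopOffsetStore implements InitialOffsetStore {
    @Override
    public String initialOffset() {
      return "via Hadoop FileSystem";
    }
  }

  // Selection gated by a (hypothetical) Spark read conf flag
  static InitialOffsetStore create(boolean useHadoop) {
    return useHadoop ? new HadoopOffsetStore() : new TableIOOffsetStore();
  }
}
```

The read conf property would decide which implementation is constructed, leaving the existing table-IO behavior as the default.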

@c2zwdjnlcg c2zwdjnlcg force-pushed the fix-checkpoint-fs-impl branch 3 times, most recently from 257b264 to 0f5ead4 Compare February 24, 2026 23:37
@c2zwdjnlcg
Author
Author

@RussellSpitzer Thanks for the review.

please do larger changes like this only on a single module first, then backport to the others in a follow up.

Sorry about that, will keep in mind for next time

Hopefully this is more in line with what you were thinking.

I named the setting streaming-checkpoint-use-table-io. If you are generally ok with this approach and name I'll also add documentation to this PR.

Comment on lines 381 to 384
protected StreamingOffset readOffset() {
  try (FSDataInputStream inputStream = fileSystem.open(initialOffsetPath);
      InputStreamReader reader = new InputStreamReader(inputStream, StandardCharsets.UTF_8)) {
    String json = CharStreams.toString(reader);
Member

Don't we already have a StreamingOffset.fromJson(InputStream in) ?

Author

done

  private final Long fromTimestamp;

-  InitialOffsetStore(Table table, String checkpointLocation, Long fromTimestamp) {
+  BaseOffsetStore(Table table, String checkpointLocation, Long fromTimestamp) {
Member

Could be "long fromTimestamp"

Author

This was preserved from the previous implementation, but I've changed it; I don't think it will do any harm.


// Controls whether streaming checkpoint operations use table FileIO or Hadoop FileSystem
public static final String STREAMING_CHECKPOINT_USE_TABLE_IO =
    "streaming-checkpoint-use-table-io";
Member

I'm not sure if it should be use-table-io or use-hdfs ... I think either is probably fine but maybe I slightly prefer use-hdfs because I know the opposite should be using the table io?

I do think using an enum may be overkill here, but maybe that's just cleaner all around:
streaming-checkpoint = {table-io, hdfs}
streaming-checkpoint_default = table-io
?

Wdyt?

Author

I went with

streaming-checkpoint-storage = {table-io, hadoop-fs}

since I didn't want it to seem like it was just for hdfs. But can revert to your original naming if you prefer.

Member

Sorry I keep waffling on this, I think maybe you are right to keep this as a boolean. I can't think of a third value there. Can we please change this back to just streaming-checkpoint-use-hadoop default false?

Author

done

@RussellSpitzer RussellSpitzer left a comment (Member)

Looking pretty close to me! I have some thoughts about the parameter name and whether it should be a boolean.


@Override
public OutputFile newOutputFile(String path) {
  if (path.contains("/offsets/")) {
Member

nit: could use a constant here and in the tests above

Author

done

@c2zwdjnlcg c2zwdjnlcg force-pushed the fix-checkpoint-fs-impl branch from 0f5ead4 to 1e60a05 Compare February 26, 2026 00:54
@github-actions github-actions bot added the docs label Feb 26, 2026
@c2zwdjnlcg c2zwdjnlcg force-pushed the fix-checkpoint-fs-impl branch from 1e60a05 to bf98b1f Compare February 26, 2026 01:01
}
}

private static class InitialOffsetStore {
Member

I was out running today and it occurred to me that maybe we can simplify this: instead of having two different implementations, could we do a single one that takes an "IO" as an argument and just create a new HadoopFileIO from the Spark HadoopConfiguration?

I think this would actually be a little tighter and removes all the Hadoop code from here again. I'm trying to think if there is a good reason not to use HadoopFileIO here... Please let me know if you can think of one. Originally I just thought this approach would be simpler, but now I'm thinking there probably isn't a significant difference.

Member

So basically everything stays the same, but in the switch statement above you have:

case (tableIO) -> InitialOffsetStore(TableIO, ...)
case (hadoop) -> InitialOffsetStore(HadoopFileIO(spark.hadoopConfiguration), ...)
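Under this suggestion, the two store classes collapse into a single one that is handed whichever IO was selected. A minimal sketch of that shape — where `FileIO` is a reduced stand-in for `org.apache.iceberg.io.FileIO` and all other names are illustrative, not the PR's final API:

```java
// Sketch of the single-implementation idea: one InitialOffsetStore, with
// the backing FileIO chosen once by a switch over a storage setting.
public class OffsetStoreSketch {
  // Stand-in for org.apache.iceberg.io.FileIO, reduced to what the sketch needs
  interface FileIO {
    String name();
  }

  enum CheckpointStorage {
    TABLE_IO,
    HADOOP
  }

  // A single store: the only thing that varies is which FileIO it holds
  static class InitialOffsetStore {
    private final FileIO io;

    InitialOffsetStore(FileIO io) {
      this.io = io;
    }

    String backend() {
      return io.name();
    }
  }

  static InitialOffsetStore create(CheckpointStorage storage, FileIO tableIo, FileIO hadoopIo) {
    switch (storage) {
      case HADOOP:
        return new InitialOffsetStore(hadoopIo);
      case TABLE_IO:
      default:
        return new InitialOffsetStore(tableIo);
    }
  }

  public static void main(String[] args) {
    FileIO tableIo = () -> "table-io";
    FileIO hadoopIo = () -> "hadoop-file-io";
    System.out.println(create(CheckpointStorage.HADOOP, tableIo, hadoopIo).backend());
  }
}
```

In the real PR the `hadoopIo` branch would construct `new HadoopFileIO(sparkContext.hadoopConfiguration())`, so all Hadoop-specific code stays out of the store itself.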

@c2zwdjnlcg c2zwdjnlcg force-pushed the fix-checkpoint-fs-impl branch from bf98b1f to e5d9946 Compare February 26, 2026 19:42
      io = table.io();
      break;
    case HADOOP:
      io = new org.apache.iceberg.hadoop.HadoopFileIO(sparkContext.hadoopConfiguration());
Member

nit: Avoid the fully qualified name here and import o.a.i.h.HadoopFileIO

Author

fixed

@c2zwdjnlcg c2zwdjnlcg force-pushed the fix-checkpoint-fs-impl branch from e5d9946 to 490b60d Compare February 26, 2026 20:45
* Tests to verify that streaming checkpoint configuration correctly controls which I/O
* implementation is used for checkpoint operations.
*/
@ExtendWith(ParameterizedTestExtension.class)
Member

There are a few instances where the comments here don't match the new impl ("hadoop fs" vs "HadoopFileIO").

Author

Went through it again; I think I got them all.

@c2zwdjnlcg c2zwdjnlcg force-pushed the fix-checkpoint-fs-impl branch 3 times, most recently from ee547ad to d69df45 Compare February 26, 2026 23:04
@RussellSpitzer
Member
Member

RussellSpitzer commented Feb 27, 2026

* What went wrong:
Execution failed for task ':iceberg-spark:iceberg-spark-4.1_2.13:spotlessJavaCheck'.
> The following files had format violations:
      src/test/java/org/apache/iceberg/spark/source/TestStreamingCheckpointLocation.java
          @@ -27,14 +27,13 @@

           import·org.apache.iceberg.CatalogProperties;
           import·org.apache.iceberg.ParameterizedTestExtension;
           import·org.apache.iceberg.Parameters;
          +import·org.apache.iceberg.hadoop.HadoopFileIO;
           import·org.apache.iceberg.io.FileIO;
           import·org.apache.iceberg.io.InputFile;
           import·org.apache.iceberg.io.OutputFile;
          -import·org.apache.iceberg.hadoop.HadoopFileIO;
           import·org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
           import·org.apache.iceberg.spark.SparkCatalog;
           import·org.apache.iceberg.spark.SparkReadConf;
          -import·org.apache.iceberg.spark.SparkReadOptions;
           import·org.apache.iceberg.spark.TestBaseWithCatalog;
           import·org.apache.spark.sql.streaming.StreamingQuery;
           import·org.junit.jupiter.api.AfterEach;
**Run './gradlew spotlessApply' to fix all violations.**

We are going to be good to merge after all the tests pass. If you can, just add additional commits in the future rather than squashing and force pushing. It's easier to check the delta that way; the reviewer will always squash on merge, so it's not a big deal.

@c2zwdjnlcg c2zwdjnlcg force-pushed the fix-checkpoint-fs-impl branch 3 times, most recently from 8c1ce4f to 67c725f Compare February 27, 2026 04:48
@c2zwdjnlcg c2zwdjnlcg force-pushed the fix-checkpoint-fs-impl branch from 67c725f to 717917e Compare February 27, 2026 04:58
@c2zwdjnlcg
Author
Author

Tests passing now

If you can, just add additional commits in the future rather than squashing and force pushing. It's easier to check the delta that way; the reviewer will always squash on merge, so it's not a big deal

Sorry didn't see this comment till after I fixed the tests, will keep in mind for future pushes.

    SparkReadConf readConf) {
  FileIO io =
      readConf.streamingCheckpointUseHadoop()
          ? new HadoopFileIO(sparkContext.hadoopConfiguration())
Contributor

I think we should go through CatalogUtil.loadFileIO here so that the FileIO instance is properly initialized. However, we're not really passing any configuration properties, meaning that the IO instance can't really be configured from a user's perspective

Member

My thought here was basically that we shouldn't be going down the Catalog path, since we are essentially just trying to force the Hadoop FS API. We don't actually want any Catalog config here, just things that are in the Spark Hadoop configuration (which is what would be used for every other streaming checkpointer).

We've lost the history here, but I originally requested we just use the pure Hadoop FileSystem for this; I later came back and said we would probably save code by just using HadoopFileIO directly.

Contributor

ok yeah thanks for clarifying and I think that should be fine then

Contributor

I added a comment above, but I'm not sure we're thinking about this the right way. What we really need is to ensure that access is configured correctly for the checkpointing IO. Just forcing HadoopFileIO feels like a bit of a hack, because we're just going down the Hadoop config path to configure the access.

I don't love having properties like ...use-hadoop as a workaround for not being able to configure the checkpointer.

this.initialOffset = initialOffsetStore.initialOffset();
}

private static InitialOffsetStore createInitialOffsetStore(
Contributor

I don't think this method is needed

Author

moved

| stream-from-timestamp | (none) | A timestamp in milliseconds to stream from; if before the oldest known ancestor snapshot, the oldest will be used |
| streaming-max-files-per-micro-batch | INT_MAX | Maximum number of files per microbatch |
| streaming-max-rows-per-micro-batch | INT_MAX | "Soft maximum" number of rows per microbatch; always includes all rows in next unprocessed file, excludes additional files if their inclusion would exceed the soft max limit |
| streaming-checkpoint-use-hadoop | false | Use Hadoop FileSystem for streaming checkpoint operations instead of the table's FileIO implementation |
Contributor

I'm concerned with the way we're approaching this. This PR seems to be focused on whether we're using HadoopFileIO vs one coming from the table or a native FileIO implementation.

However, that's not the issue. The issue is that the IO performing checkpoints doesn't have access to the physical storage (e.g. using table IO and trying to write outside of the table location).

I would rather we focus on how we define the checkpoint location and ensure that the IO is created in such a way that access is provided.

Member

The end user defines the checkpoint with a property like

.option("checkpointLocation", "/path/to/checkpoint")

Which I think folks are comparing to other Spark writers where this is just some HDFS location.

I don't think most folks will consider that this is some "Iceberg"-specific thing that has anything to do with our FileIO at all. As I mentioned to Eduard, I originally thought we should just have either "use FileIO" or "use Hadoop FileSystem", but in order to save code I thought it may be more efficient to just reuse HadoopFileIO.

While we could force users into a specific location or try to reverse-engineer from the given path what we should be using, I think it's better to just allow the fallback.
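To make the end-user view concrete: a streaming read against an Iceberg table wires the checkpoint location up roughly like this. The table name and path are placeholders, and `streaming-checkpoint-use-hadoop` is the read option under discussion in this PR; the `checkpointLocation` write option is standard Spark Structured Streaming.

```java
// Sketch of end-user configuration, not a runnable program: requires a live
// SparkSession and an existing Iceberg table.
Dataset<Row> stream = spark.readStream()
    .format("iceberg")
    .option("streaming-checkpoint-use-hadoop", "true") // option from this PR
    .load("db.table");

StreamingQuery query = stream.writeStream()
    .format("console")
    .option("checkpointLocation", "/path/to/checkpoint") // standard Spark option
    .start();
```

The point of the comment above is that users set `checkpointLocation` exactly as they would for any other Spark sink, without expecting Iceberg's FileIO to be involved.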

Member

More references, since there is also a high-level property I didn't know about:

https://spark.apache.org/docs/latest/configuration.html

Streaming Checkpoint Configuration

| Property | Default | Since |
|---|---|---|
| spark.sql.streaming.checkpointLocation | (none) | 2.0.0 |

But the basic picture here is: this is a path that we assume "Spark", with its configuration, can access via HDFS. The only reason I think we shouldn't always just swap to HadoopIO (or HadoopFS directly) is that we should avoid what may be a behavior change for existing users.

Author

My thinking was that Spark, via the Hadoop filesystem, already provides lots of ways to configure storage implementations and creds for different locations. Plugging into that seemed the most natural and matched my expectation of how things should work for non-table data.

I'm open to other suggestions, but I'm having a hard time coming up with something concrete that doesn't seem super cumbersome. Did you have an API in mind?

Member

I think maybe we should drop the option entirely. Looking here

https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamExecution.scala#L253-L255

There is basically no situation where TableIO is correct and HadoopFS-based access with the Spark Hadoop conf won't work. Spark itself relies on that working, so we can't even write (or read) to this directory unless Spark did so first using its own HadoopFS.

I'm in favor now of just dropping the fallback entirely and always using a HadoopFileIO based on the Spark Hadoop conf (or using pure HadoopFS operations, but I think that's probably still not necessary).

Successfully merging this pull request may close these issues.

Spark Iceberg streaming - checkpoint leverages S3fileIo signer path instead of hadoop's S3AFileSystem
