Core: update manifest delete file size after rewrite table action #15470
mbutrovich wants to merge 14 commits into …
Conversation
| "1.10.0": | ||
| org.apache.iceberg:iceberg-api: | ||
| - code: "java.class.defaultSerializationChanged" | ||
| old: "class org.apache.iceberg.encryption.EncryptingFileIO" | ||
| new: "class org.apache.iceberg.encryption.EncryptingFileIO" | ||
| justification: "New method for Manifest List reading" | ||
| org.apache.iceberg:iceberg-core: | ||
| - code: "java.class.noLongerInheritsFromClass" |
Was this moved unintentionally?
It's just what `./gradlew revapi` generated for me. As best as I can tell, this is what it did:
- The existing 1.10.0 block moved earlier in the file (alphabetical sort?)
- The YAML indent for the `- code:` entries changed from 4-space to 2-space
- Our four new entries were added (one `java.element.noLongerDeprecated`, two `java.method.numberOfParametersChanged`, and one `java.method.removed` for `rewriteDeleteManifest`)
Could you keep only the four new entries and revert the reordering/re-indentation of the existing 1.10.0 block? Manually adding the new entries into the existing block (keeping the original 4-space indent and ordering) would make the diff much smaller and easier to review.
I just want to mention that we must not break the API here. Please follow https://iceberg.apache.org/contribute/#minor-version-deprecations-required
Force-pushed from 277c0e2 to 8922c70.
hmmm. In #15586 I've been working on skipping the HEAD call within the s3afs and azure abfs client openFile() calls, and had been thinking that the manifest data would be best, as it'd save another ~100 ms/file on what is often a critical path. We need accurate manifests for that.
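For context on why the recorded length matters here: with Hadoop 3.3.5+ a reader can declare a known length to `openFile()` and the client can skip the HEAD/getFileStatus round trip entirely. A minimal sketch, assuming the standard `fs.option.openfile.*` option keys; `knownLength` would come from the manifest's `file_size_in_bytes`, and the helper name is invented for illustration:

```java
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public final class OpenWithKnownLength {
  public static FSDataInputStream open(FileSystem fs, Path path, long knownLength)
      throws Exception {
    CompletableFuture<FSDataInputStream> future =
        fs.openFile(path)
            // Declaring the length up front lets S3A/ABFS elide the HEAD request;
            // a wrong length causes exactly the "end of stream not reached" class
            // of failure this PR fixes.
            .opt("fs.option.openfile.length", Long.toString(knownLength))
            .opt("fs.option.openfile.read.policy", "random")
            .build();
    return future.get();
  }
}
```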
// manifest was not updated. Readers that use file_size_in_bytes to elide a stat() call may
// fail.
@TestTemplate
public void testDeleteFileSizeInBytesAfterRewrite() throws Exception {
Are there equivalents of this for the other operations which add files to the manifests? If so, they'd be handy to add for regression testing there too, though not in this PR.
steveloughran left a comment:
reviewed again. I think changes within RewriteTablePathUtil can eliminate the need for a HEAD call with a v3 puffin file.
String stagingPath = stagingPath(file.location(), sourcePrefix, stagingLocation);
OutputFile outputFile = io.newOutputFile(stagingPath);
try {
  rewritePositionDeleteFile(
If this could actually return the length of the file written (which would need changes in rewriteDVFile() to do the same), there'd be no need for the HEAD call when rewriting puffin files.
It'd probably still be needed in v2 rewrites, but there the getLength() call should be pushed into rewritePositionDeleteFile(), which would only invoke it on the v2 codepath.
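A minimal sketch of what returning the written length could look like, using Iceberg's `PositionOutputStream.getPos()`. `copyRecords()` is a hypothetical stand-in for the per-format rewrite logic; in the real code the format writer's own `length()` after close would be the authoritative value, since a Puffin or Parquet footer is written at close:

```java
import java.io.IOException;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.io.PositionOutputStream;

final class RewriteLengthSketch {
  // Returns the number of bytes written, so the caller can record
  // file_size_in_bytes without a follow-up HEAD/getLength() call.
  static long rewriteWithLength(InputFile source, OutputFile target) throws IOException {
    try (PositionOutputStream out = target.createOrOverwrite()) {
      copyRecords(source, out); // hypothetical: rewrite embedded paths while copying
      return out.getPos(); // bytes written so far, captured before close
    }
  }

  private static void copyRecords(InputFile source, PositionOutputStream out) {
    // placeholder for the actual position-delete rewrite
  }
}
```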
…ask, eliminating the separate `rewritePositionDeletes()` Spark job. Each manifest-writing task now also rewrites the delete files that manifest references, measures the actual size via `getLength()`, and records it in the manifest entry.
…ustification "rewriteDeleteManifest signature changed to accept PositionDeleteReaderWriter for inline position delete rewriting; rewritePositionDeleteFile now returns file size to avoid getLength HEAD call".
Force-pushed from 58725c3 to 78107a1.
steveloughran left a comment:
I like the change.
@mbutrovich as you're changing the signature of …
Sounds good, thanks @wypoon! @anuragmantri suggested I do the change in one version first to make it easier for reviewers (reduce the diff), get consensus on the approach, and then once folks are good with it I can generate the changes for the other versions. I'm happy to do that at any time.
@mbutrovich it is true that having changes for multiple versions of Spark and Flink clutters up a PR and makes reviewing more difficult. However, you can at least put up the changes for one version of Flink and for Delta conversion, and then later for all the versions.
@mbutrovich |
This is my first upstream contribution to Iceberg Java, so happy to do so if the community requires. Is there a process to propose that besides the mailing list?
…nifest tasks to avoid FileAlreadyExistsException on the shared staging path.
I took another look. The PR looks good to me.
The process involves merging the PR into the main branch and then creating a PR against the 1.10.x branch. I don't believe the 1.10.2 maintenance release is out yet; if we can make it before that, we could include this one in the release. You can also reply to the thread if you'd like, @mbutrovich: https://lists.apache.org/thread/5442q5bddyoss9mw27r0zbnobysfpn0t
// Another task in this Spark job already staged this file. Rewriting is deterministic, so
// its content (and therefore length) matches what this task would have produced.
if (e.getCause() instanceof FileAlreadyExistsException) {
  return io.newInputFile(stagingPath).getLength();
@steveloughran flagged the HDFS in-progress-write issue earlier:
There's actually something bad with hdfs here where the length of in-progress-writes can underreport length...it's only after close() that everything syncs up.
I think the concern applies here too: if Task A is still writing when Task B reaches getLength(), B reads an in-progress length on HDFS and gets the same class of bug this PR is fixing.
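If that race is real, one conceivable fence, sketched here with raw Hadoop `FileSystem` calls rather than Iceberg's `FileIO` (which exposes no rename): each task writes to a task-unique temporary path and renames into place, so any file visible at the shared staging path is complete. This assumes atomic rename semantics (HDFS); object stores would need a different mechanism. `writeRewrittenFile` is a placeholder for the actual rewrite:

```java
import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

final class StagingRaceSketch {
  static long stageDeterministic(FileSystem fs, Path staging, String taskId)
      throws IOException {
    Path tmp = new Path(staging.getParent(), staging.getName() + ".tmp-" + taskId);
    long length = writeRewrittenFile(fs, tmp); // returns bytes written after close
    if (!fs.rename(tmp, staging)) {
      // Another task won the race. Its output is byte-identical because the
      // rewrite is deterministic, and the visible file is complete (it was
      // renamed into place after close), so its status length is safe to trust.
      fs.delete(tmp, false);
      length = fs.getFileStatus(staging).getLen();
    }
    return length;
  }

  private static long writeRewrittenFile(FileSystem fs, Path path) throws IOException {
    return 0L; // placeholder for the actual position-delete rewrite
  }
}
```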
Which issue does this PR close?
Closes #12554.
Rationale for this change
`rewriteTablePath` rewrites position delete files (updating embedded data file paths), which changes their size. The manifest was written before the delete files were rewritten, so `file_size_in_bytes` in the manifest reflected the original size. Readers that trust this field (Trino, Impala, Comet, iceberg-rust) fail with errors like "end of stream not reached."

What changes are included in this PR?
Moves position delete file rewriting into the manifest-writing Spark task, eliminating the separate `rewritePositionDeletes()` Spark job. Each manifest-writing task now also rewrites the delete files that manifest references, measures the actual size via `writer.length()` (or `getLength()` on the staged file when another task got there first), and records it in the manifest entry. This means:

- Correct `file_size_in_bytes` at manifest write time, no reconciliation needed

Trade-off: if the same delete file appears in multiple manifests, it gets rewritten redundantly. The staging path is deterministic, so the output is identical: wasted I/O, not incorrect results. In practice this is rare. A rough sketch of the per-task recording step follows.
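The sketch below is not the PR's literal code; it illustrates the recording step under the assumption that `FileMetadata`'s delete-file builder is used to copy and patch the entry, where `measuredLength` is whatever the inline rewrite returned:

```java
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileMetadata;
import org.apache.iceberg.ManifestWriter;
import org.apache.iceberg.PartitionSpec;

final class ManifestEntrySketch {
  // measuredLength is writer.length() from the inline rewrite, or
  // getLength() on the staged file when another task staged it first.
  static void addRewrittenDelete(
      ManifestWriter<DeleteFile> writer,
      PartitionSpec spec,
      DeleteFile original,
      String rewrittenPath,
      long measuredLength) {
    DeleteFile updated =
        FileMetadata.deleteFileBuilder(spec)
            .copy(original)
            .withPath(rewrittenPath)
            .withFileSizeInBytes(measuredLength) // the field this PR fixes
            .build();
    writer.add(updated);
  }
}
```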
How are these changes tested?
New test `testDeleteFileSizeInBytesAfterRewrite` creates a table with position deletes using a deeply nested path (so the rewritten path differs in length), runs `rewriteTablePath`, copies the result, and asserts that `file_size_in_bytes` in the rewritten manifest matches the actual file size on disk. The core assertion is sketched below.
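A condensed sketch of that check, not the test's literal code; it assumes AssertJ's `assertThat` is statically imported and `table` is a handle for the copied table:

```java
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.io.CloseableIterable;

// inside the test, after rewriteTablePath has run and files were copied
for (ManifestFile manifest : table.currentSnapshot().deleteManifests(table.io())) {
  try (CloseableIterable<DeleteFile> deletes =
      ManifestFiles.readDeleteManifest(manifest, table.io(), table.specs())) {
    for (DeleteFile delete : deletes) {
      long actualSize = table.io().newInputFile(delete.location()).getLength();
      // the manifest's recorded size must match the bytes actually on disk
      assertThat(delete.fileSizeInBytes()).isEqualTo(actualSize);
    }
  }
}
```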
AI Usage
I am more familiar with the iceberg-rust codebase, so Claude helped me navigate the code, prototype a design, and draft the PR description (in DataFusion Comet's PR template). Claude also helped me with the API change failures in CI.