
feat: Add metadata-only replace API to Table for REPLACE snapshot operations#3131

Open
qzyu999 wants to merge 23 commits into apache:main from qzyu999:feature/core-rewrite-api

Conversation

@qzyu999

@qzyu999 qzyu999 commented Mar 9, 2026

Closes #3130

Rationale for this change

In the current PR (#3124, part of #1092), the proposed replace() API accepts a PyArrow table (pa.Table), forcing the table engine to physically serialize data during a metadata transaction commit. This couples execution with the catalog, diverges from the behavior of Java Iceberg's RewriteFiles builder, and does not register the snapshot under Operation.REPLACE.

This PR redesigns table.replace() and transaction.replace() to accept Iterable[DataFile] inputs. Physical data writing is externalized (e.g., compaction via Ray), and a new metadata-only _RewriteFiles SnapshotProducer swaps the file pointers in the manifests, carrying over the original sequence numbers for DELETED entries so that time travel remains correct.

Are these changes tested?

Yes.

Test coverage has been added to tests/table/test_replace.py. The suite validates:

  1. Context-manager execution, verifying that the table history grows as expected (len(table.history())).
  2. Snapshot summaries carrying the Operation.REPLACE tag.
  3. Delta metrics (added/deleted file and record counts) matching the inputs.
  4. Low-level serialization: reading raw entries via manifest.fetch_manifest_entry(discard_deleted=False) to assert that status=DELETED entries preserve their original Avro sequence numbers.
  5. The idempotent edge case where replace([], []) short-circuits the commit without mutating history.

Are there any user-facing changes?

Yes.

The method signature for Table.replace() and Transaction.replace() has been updated from the original PR #3124.
It no longer accepts a PyArrow table (df: pa.Table). Instead, it takes two arguments:
files_to_delete: Iterable[DataFile] and files_to_add: Iterable[DataFile], following the convention seen in the Java implementation.
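
A minimal sketch of the new call shape; the DataFile below is a pared-down stand-in for pyiceberg.manifest.DataFile, and the returned summary dict is purely illustrative (the real method commits a snapshot rather than returning a dict):

```python
from dataclasses import dataclass
from typing import Iterable

@dataclass
class DataFile:
    """Stand-in for pyiceberg.manifest.DataFile; only what this sketch needs."""
    file_path: str
    record_count: int

def replace(files_to_delete: Iterable[DataFile], files_to_add: Iterable[DataFile]) -> dict:
    """Mirrors the new Table.replace() contract: both sides are already-written
    DataFiles, so the operation itself is metadata-only."""
    to_delete, to_add = list(files_to_delete), list(files_to_add)
    return {
        "deleted-data-files": len(to_delete),
        "added-data-files": len(to_add),
        "deleted-records": sum(f.record_count for f in to_delete),
        "added-records": sum(f.record_count for f in to_add),
    }

# Compact two small files (written externally, e.g. by Ray) into one.
old = [DataFile("s3://bucket/data/old-1.parquet", 50), DataFile("s3://bucket/data/old-2.parquet", 50)]
new = [DataFile("s3://bucket/data/compacted.parquet", 100)]
summary = replace(files_to_delete=old, files_to_add=new)
```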

(Please add the changelog label)

AI Disclosure

AI was used to help understand the code base and draft code changes. All code changes have been thoroughly reviewed to ensure they are in line with a broader understanding of the codebase.

  • Worth deeper review after AI assistance:
  • The test_invalid_operation() in tests/table/test_snapshots.py previously used Operation.REPLACE to exercise invalid operations, but with this change Operation.REPLACE becomes valid. In its place I put a dummy operation.
  • The _RewriteFiles in pyiceberg/table/update/snapshot.py overrides the _deleted_entries and _existing_manifests functions. I have tried to test thoroughly that this was done correctly, but I think the test suite could be made more rigorous and am open to suggestions on how to do that.

qzyu999 added 4 commits March 9, 2026 15:40
- Fixed positional argument type mismatch for `snapshot_properties` in [_RewriteFiles](iceberg-python/pyiceberg/table/update/snapshot.py)
- Added missing `Catalog` type annotations to pytest fixtures in [test_replace.py](iceberg-python/tests/table/test_replace.py)
- Added strict `is not None` assertions for `table.current_snapshot()` to satisfy mypy Optional checking
- Auto-formatted tests with ruff
…ass enum validation (Operation.REPLACE is valid so we can no longer use it in test_invalid_operation)
Contributor

@kevinjqliu kevinjqliu left a comment


Thanks for the PR.

There are a couple of things we should check (and add as test cases):

Scanning Manifests

  • Every file marked for replacement was found in at least one manifest — if any are missing, abort
  • Matched entries are rewritten as DELETED with the new snapshot ID
  • Unmatched entries are carried over as EXISTING
  • Manifests with no affected files are reused unchanged
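
These scan rules can be sketched as a small model; Entry and the status strings here are simplified stand-ins for the PyIceberg manifest classes:

```python
from dataclasses import dataclass

ADDED, EXISTING, DELETED = "ADDED", "EXISTING", "DELETED"

@dataclass
class Entry:
    """Stand-in for a manifest entry: file path, status, owning snapshot."""
    file_path: str
    status: str
    snapshot_id: int

def rewrite_manifest(entries, files_to_replace, new_snapshot_id):
    """Apply the scan rules above: matched entries become DELETED under the
    new snapshot ID, unmatched entries are carried over as EXISTING, and any
    file marked for replacement that is never found aborts the operation."""
    out, found = [], set()
    for e in entries:
        if e.file_path in files_to_replace:
            found.add(e.file_path)
            out.append(Entry(e.file_path, DELETED, new_snapshot_id))
        else:
            out.append(Entry(e.file_path, EXISTING, e.snapshot_id))
    missing = set(files_to_replace) - found
    if missing:
        raise ValueError(f"Files to replace not found in any manifest: {missing}")
    return out

entries = [Entry("a.parquet", EXISTING, 1), Entry("b.parquet", EXISTING, 1)]
rewritten = rewrite_manifest(entries, {"a.parquet"}, new_snapshot_id=2)
```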

New Manifest

  • All incoming replacement files are present with status ADDED
  • The new snapshot ID is applied to every entry

Manifest List

  • Includes the new ADDED manifest
  • Includes all rewritten manifests with DELETED entries
  • Includes all unchanged manifests
  • Includes any pre-existing delete manifests, passed through as-is

Invariant Check

  • Records added must never exceed records removed — if they do, the operation is invalid
  • If the difference is due to prior soft-deletes, confirm those delete files account for it

Snapshot

  • Has a unique snapshot ID
  • Parent points to the previous snapshot
  • Sequence number is exactly previous + 1
  • Operation type is set to "replace"
  • Manifest list path is correct
  • Summary counts (files and records) are accurate
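
The snapshot-level rules can be expressed as a small checker; the dict field names here are illustrative, not PyIceberg's actual snapshot model:

```python
def check_snapshot(prev, new, manifest_list_path):
    """Assert the snapshot-level rules from the checklist above."""
    assert new["snapshot-id"] != prev["snapshot-id"]            # unique snapshot ID
    assert new["parent-snapshot-id"] == prev["snapshot-id"]     # parent pointer
    assert new["sequence-number"] == prev["sequence-number"] + 1
    assert new["summary"]["operation"] == "replace"
    assert new["manifest-list"] == manifest_list_path

prev = {"snapshot-id": 100, "sequence-number": 1}
new = {
    "snapshot-id": 101,
    "parent-snapshot-id": 100,
    "sequence-number": 2,
    "summary": {"operation": "replace"},
    "manifest-list": "s3://bucket/metadata/snap-101.avro",
}
check_snapshot(prev, new, "s3://bucket/metadata/snap-101.avro")
```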

Comment thread pyiceberg/table/__init__.py Outdated
"""
return UpdateStatistics(transaction=self)

def replace(
Contributor

we should not expose replace as a public function, as we cannot guarantee that files_to_delete and files_to_add contain the same records.

I think we should start at _RewriteFiles

Author

Hi @kevinjqliu, I think that logic makes sense; it would be dangerous for users to call these without the underlying expectations on the inputs being enforced. Removed them in 94bd87e

Comment thread tests/table/test_replace.py Outdated
Comment on lines +84 to +93
assert len(manifest_files) == 2  # One for ADDED, one for DELETED

# Check that sequence numbers were handled properly by verifying the manifest contents
entries = []
for manifest in manifest_files:
    for entry in manifest.fetch_manifest_entry(table.io, discard_deleted=False):
        entries.append(entry)

# One entry for ADDED (new file), one for DELETED (old file)
assert len(entries) == 2
Contributor

we need to test a bit more on the status of each file.

https://github.com/apache/iceberg/blob/main/core/src/test/java/org/apache/iceberg/TestRewriteFiles.java is a good reference

Author

Hi @kevinjqliu, I've addressed this in the latest 33aaef0, where the status of each file is tested.

@qzyu999
Author

qzyu999 commented Mar 28, 2026

Hi @kevinjqliu, apologies for the delay, and thank you for taking the time to review the PR again; I understand you are quite busy. I've addressed all your points in the latest set of tests in 33aaef0, thoroughly expanding the suite to integrate those requirements across a broad set of tests.

There are two minor issues I noticed however:

  • Requirement: If the difference is due to prior soft-deletes, confirm those delete files account for it
    • This would require _RewriteFiles to be scoped to handle delete manifests, but currently it only handles data files. Handling delete manifests would let us run REPLACE operations on delete files as well. For example, the purpose of this PR is to allow compaction of data files, but in theory we could also compact delete files for the case where someone has run many delete operations producing many small files.
    • I think this is definitely worth working on, but perhaps not in this PR. The Java code seems to handle it well. My thinking is that after we merge this REPLACE, we can work on the data-compaction issue next, then come back to extend _RewriteFiles for delete manifests and work on metadata compaction afterwards.
  • Another, more minor issue: when running the tests and calling fast_append() on files with DataFileContent.POSITION_DELETES, the resulting manifests are not labeled as ManifestContent.DELETES. IIUC this is because _SnapshotProducer._manifests() (which fast_append relies on under the hood) currently defaults to creating ManifestContent.DATA writers; it does not yet inspect the incoming file's content type to route POSITION_DELETES into a dedicated ManifestContent.DELETES writer. I worked around this in my test by scanning the manifest entries directly rather than relying on the manifest's label, but I wanted to flag it for the roadmap for when we build out full merge-on-read write support.

Member

@geruh geruh left a comment

Hey @qzyu999, thanks for raising this. I think the direction is right here. I just have a few design questions before going more in depth on the implementation details.

Comment thread pyiceberg/table/update/snapshot.py Outdated
return []

def _existing_manifests(self) -> list[ManifestFile]:
"""To determine if there are any existing manifests."""
Member

This logic looks nearly identical to OverwriteFiles._existing_manifests(), and could probably use some clean Python OOP across the different snapshot classes. So let's create a helper in the _SnapshotProducer class; then Overwrite can add its additional logic on top and rewrite can just use it.

Author

Hi @geruh, thank you for the feedback! I agree that the two pieces of code are highly similar; in b0a770c I created a _get_existing_manifests function in _SnapshotProducer that is reused by both _OverwriteFiles and _RewriteFiles.


def _commit(self) -> UpdatesAndRequirements:
    # Only produce a commit when there is something to rewrite
    if self._deleted_data_files or self._added_data_files:
Member

I think we can replicate the _DeleteFiles logic here by using @cached_property on the _compute_deletes function, especially since _commit() calls self._deleted_entries() for validation and then calls the super commit to write and get delete entries.

def _compute_deletes(self) -> tuple[list[ManifestFile], list[ManifestEntry], bool]:
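
A sketch of the suggested pattern using functools.cached_property, so the expensive computation runs only once even though both _deleted_entries() and _commit() consume it (the class body and return values are simplified stand-ins):

```python
from functools import cached_property

class RewriteSketch:
    def __init__(self):
        self.scan_count = 0  # track how many times the scan actually runs

    @cached_property
    def _compute_deletes(self):
        # Expensive manifest scan; cached_property runs it at most once per instance.
        self.scan_count += 1
        return (["manifest-1"], ["entry-1"], True)  # (manifests, entries, files_deleted)

    def _deleted_entries(self):
        _, entries, _ = self._compute_deletes
        return entries

    def _commit(self):
        manifests, _, _ = self._compute_deletes
        return manifests

r = RewriteSketch()
r._deleted_entries()
r._commit()
```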

Author

Hi @geruh, great suggestion, I've applied the changes in c8162a8.

)

def replace(self) -> _RewriteFiles:
    return _RewriteFiles(
Member

@geruh geruh Apr 15, 2026

I'm sort of confused by the naming, since we are introducing a user-facing API replace but the underlying snapshot operation is a rewrite. Should we rename to rewrite() for consistency? Unless I'm missing something?

Author

Hi @geruh, you bring up a good point, and it's something that seemed off to me along the way. The reason for the discrepancy is that we're mirroring the Java code itself.

I named the Python API replace() to accurately reflect the Operation.REPLACE snapshot string it generates, while keeping the internal class named _RewriteFiles to match the Java builder logic.

That said, if you feel strongly about matching the Java API's user-facing method (rewrite()) rather than the snapshot operation (replace()), I'm happy to rename the public method to rewrite() for consistency. Let me know what you prefer!

Member

Yeah, there is a bit of a distinction here, since rewrite is basically the rewrite of data files and replace is the logical change to your snapshot metadata. My thinking is that users in Java today are used to interacting with this API through:

table.newRewrite()
    .deleteFile(old)
    .addFile(new)
    .commit();

So someone coming from Java Iceberg will look for rewrite, not replace. But ultimately maybe there is more history behind this naming convention that I'm missing.

WDYT @kevinjqliu?

manifests_after = snapshot_after.manifests(table.io)
manifest_paths_after = [m.manifest_path for m in manifests_after]

assert delete_manifest_path in manifest_paths_after
Member

We should also test to ensure that original sequence numbers are carried over from before the rewrite.

Author

@qzyu999 qzyu999 Apr 16, 2026

Hi @geruh, great idea, I've added a test to check the sequence numbers in d939b67.

from pyiceberg.typedef import Record


def test_replace_internally(catalog: Catalog) -> None:
Member

Tests are on the right track. Can we add a few more, like:

  • test with multiple files per set with matching rows
  • test against a partitioned table
  • a no-op replace, to ensure we end in the correct state matching the Java impl

Author

Hi @geruh, these are all great test ideas, I've added them in d939b67.

added_records = sum(f.record_count for f in self._added_data_files)
deleted_records = sum(entry.data_file.record_count for entry in deleted_entries)

if added_records > deleted_records:
Member

Where are you seeing this invariant? It seems correct, since the spec says a rewrite must be "logically equivalent". This check could be reasonable as a safety guard, but what happens when delete-file rewriting is added? Then these numbers could be incorrect.

Author

Hi @geruh, thanks for flagging this. You're right that this is a safety guard and that it doesn't yet account for future delete-file rewriting. Should we add something like this?

# Note: This physical record count invariant is a sanity guard for data file 
# compaction to ensure no data is accidentally duplicated or invented. 
# TODO: This will need to be evolved into a logical record count validation 
# once PyIceberg supports rewriting delete files (Merge-on-Read).
added_records = sum(f.record_count for f in self._added_data_files)
deleted_records = sum(entry.data_file.record_count for entry in deleted_entries)

if added_records > deleted_records:
    raise ValueError(f"Invalid replace: records added ({added_records}) exceeds records removed ({deleted_records})")

This logical record count validation would involve something like having the _commit method to do the following, which the codebase currently cannot do:

  • Identify associated delete files: for every DataFile being deleted, find every position-delete or equality-delete file that points to it.
  • Calculate the subtraction: subtract those delete row counts from the physical record_count of the old files to find the old logical count.
  • Compare: verify that old logical count == new logical count.

The current _RewriteFiles implementation is "blind" to deletes; it only tracks _added_data_files and _deleted_data_files. I believe this can be part of a full MoR implementation, something I would love to work on after finishing these maintenance tasks.

Otherwise, I can also remove it from _RewriteFiles and move forward. WDYT?
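
The logical-count validation described above could be sketched like this; the dict-based file records and the pairing of position deletes to data files are hypothetical, since PyIceberg does not wire this up today:

```python
def logical_record_count(data_files, position_deletes):
    """Physical rows minus the position-delete rows that point at each file."""
    deleted_by_path = {}
    for d in position_deletes:
        path = d["referenced_file"]
        deleted_by_path[path] = deleted_by_path.get(path, 0) + d["record_count"]
    return sum(
        f["record_count"] - deleted_by_path.get(f["file_path"], 0)
        for f in data_files
    )

# Old file has 100 physical rows, 10 of which were soft-deleted; the
# compacted replacement materializes the remaining 90.
old_files = [{"file_path": "old.parquet", "record_count": 100}]
deletes = [{"referenced_file": "old.parquet", "record_count": 10}]
new_files = [{"file_path": "compacted.parquet", "record_count": 90}]

old_logical = logical_record_count(old_files, deletes)
new_logical = logical_record_count(new_files, [])
```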

Member

Okay, let's keep the check. I took a deeper look into the snapshot producer on the Java side, so let's align closer to that:

https://github.com/apache/iceberg/blob/dde712ec9ed6c9d28183ee4615d50f97b246af5d/core/src/main/java/org/apache/iceberg/SnapshotProducer.java#L322-L334

@qzyu999
Author

qzyu999 commented Apr 16, 2026

Hi @geruh, thanks for the awesome feedback, I've responded to each of your replies, PTAL.

def replace(self) -> _RewriteFiles:
    return _RewriteFiles(
        operation=Operation.REPLACE,
        transaction=self._transaction,
Member

I noticed that branch is missing here; is there a reason for that?

Author

Hi @geruh, thanks for noticing, this was an oversight. The changes have been made in c3570d8 and a test has been added for this.

Comment thread pyiceberg/table/update/snapshot.py Outdated
"""A snapshot producer that rewrites data files."""

def __init__(self, operation: Operation, transaction: Transaction, io: FileIO, snapshot_properties: dict[str, str]):
    super().__init__(operation, transaction, io, snapshot_properties=snapshot_properties)
Member

nit: we can remove the constructor here

Author

Hi @geruh, the changes have been made in c3570d8 as part of adding branch as an arg for _RewriteFiles.

Comment thread tests/table/test_replace.py Outdated
)

# 1. File we will delete
file_to_delete = DataFile.from_args(
Member

nit: extract this into a helper or see if one exists already.

Author

Hi @geruh, great idea, this has been implemented in 9681ec3.

with pytest.raises(ValueError) as e:
    update_snapshot_summaries(summary=Summary(Operation.REPLACE))
assert "Operation not implemented: Operation.REPLACE" in str(e.value)
update_snapshot_summaries(summary=Summary.model_construct(operation="unknown_operation"))
Member

Makes sense if we have support :)

Comment thread pyiceberg/table/update/snapshot.py Outdated
if snapshot := self._transaction.table_metadata.snapshot_by_name(name=self._target_branch):
    for manifest_file in snapshot.manifests(io=self._io):
        # Skip pruning for rewrite operations unless we want to optimize later
        if self._operation == Operation.OVERWRITE and not manifest_evaluators[manifest_file.partition_spec_id](
Member

I'm not too keen on child-class information leaking into the base here. How about we use an argument like should_use_manifest_pruning, or something along those lines?

Author

Hi @geruh, great catch, I agree that the base class shouldn't be coupled to specific child operations. I've refactored _get_existing_manifests to accept a should_use_manifest_pruning flag so the child classes can explicitly opt-in. The updates are in 8f1f9b9.

Comment on lines +727 to +733
def _deleted_entries(self) -> list[ManifestEntry]:
    """Check if we need to mark the files as deleted."""
    return self._cached_deleted_entries

def _existing_manifests(self) -> list[ManifestFile]:
    """To determine if there are any existing manifests."""
    return self._get_existing_manifests()
Member

nit: Looks like these docstrings were copy-pasted from the other classes and don't fit how they are used here. Either remove them or change them to fit their usage.

Author

Hi @geruh, thanks for pointing this out, these doc strings in _RewriteFiles have been removed in c60d5ad.


@qzyu999
Author

qzyu999 commented Apr 16, 2026

Hi @geruh, thanks again for the helpful feedback. I've responded to each of your review comments and updated the code accordingly, with the exception of the note about the rewrite/replace naming convention, which is pending a response from @kevinjqliu. PTAL.



Development

Successfully merging this pull request may close these issues.

Feature: Add metadata-only replace API to Table for REPLACE snapshot operations
