Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyiceberg/table/inspect.py
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ def _update_partitions_map_from_manifest_entry(
partition_row = partitions_map[partition_record_key]

if snapshot is not None:
if partition_row["last_updated_at"] is None or partition_row["last_updated_snapshot_id"] < snapshot.timestamp_ms:
if partition_row["last_updated_at"] is None or partition_row["last_updated_at"] < snapshot.timestamp_ms:
partition_row["last_updated_at"] = snapshot.timestamp_ms
partition_row["last_updated_snapshot_id"] = snapshot.snapshot_id

Expand Down
29 changes: 28 additions & 1 deletion tests/table/test_inspect.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,17 @@
# under the License.

from pathlib import PosixPath
from typing import Any

import pyarrow as pa
import pytest

from pyiceberg.conversions import to_bytes
from pyiceberg.manifest import DataFile, DataFileContent
from pyiceberg.schema import Schema
from pyiceberg.table.inspect import _readable_bound
from pyiceberg.table.inspect import InspectTable, _readable_bound
from pyiceberg.table.snapshots import Snapshot
from pyiceberg.typedef import Record
from pyiceberg.types import NestedField, StringType
from tests.catalog.test_base import InMemoryCatalog

Expand Down Expand Up @@ -68,3 +72,26 @@ def test_inspect_entries_and_files_render_null_bound(catalog: InMemoryCatalog) -
files_metrics = tbl.inspect.files().to_pydict()["readable_metrics"][0]["s"]
assert files_metrics["lower_bound"] is None
assert files_metrics["upper_bound"] is None


@pytest.mark.parametrize("newest_first", [False, True])
def test_partitions_last_updated_uses_latest_snapshot_regardless_of_order(newest_first: bool) -> None:
# Manifest entries are visited in manifest order, which is not chronological, so the
# `partitions` metadata table must keep the snapshot with the highest commit timestamp
# per partition regardless of the order in which the entries are aggregated.
older = Snapshot(snapshot_id=6446744073709551000, timestamp_ms=1000, manifest_list="file:///dev/null")
newer = Snapshot(snapshot_id=8446744073709551111, timestamp_ms=5000, manifest_list="file:///dev/null")

def _data_file() -> DataFile:
file = DataFile.from_args(content=DataFileContent.DATA, record_count=1, file_size_in_bytes=1, partition=Record("a"))
file.spec_id = 0
return file
Comment on lines +85 to +88

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: A single shared instance works fine here:

    data_file = DataFile.from_args(content=DataFileContent.DATA, record_count=1, file_size_in_bytes=1, partition=Record("a"))
    data_file.spec_id = 0


inspect = InspectTable.__new__(InspectTable)
partitions_map: dict[tuple[str, Any], Any] = {}
for snapshot in [newer, older] if newest_first else [older, newer]:
inspect._update_partitions_map_from_manifest_entry(partitions_map, _data_file(), {"part": "a"}, snapshot)

(partition_row,) = partitions_map.values()
assert partition_row["last_updated_at"] == newer.timestamp_ms
assert partition_row["last_updated_snapshot_id"] == newer.snapshot_id