Add local pHash deduplication utility#462
Conversation
|
@greptile-apps review this pr |
|
@greptileai review |
There was a problem hiding this comment.
(meant to click approve)
Wowza what an optimization. Took me a while to understand the logic behind it. Only left some small nit comments.
I'd suggest adding a comment like my explanation below within the code to help future readers.
Let me know if my understanding is correct (had ai format the wording):
We can safely skip most images without comparing against them, thanks to the pigeonhole principle. Chunks are defined by bit position, so we always compare a new pHash against a kept one chunk-by-chunk at the same positions — chunk 0 against chunk 0, chunk 1 against chunk 1, and so on. The principle applies to the positions where two hashes disagree: if two images are true duplicates, they differ in at most 10 of the 64 bits. Because the 6 chunks don't overlap and together cover all 64 bits, the total difference is just the sum of the per-chunk differences. So if every chunk differed by 2 or more bits, the total would be at least 2 × 6 = 12 — but a duplicate has at most 10 differences, and you can't have ≤10 and ≥12 at once. That contradiction forces at least one chunk down to at most floor(10/6) = 1 differing bit. In other words, for any real duplicate, there is guaranteed to be at least one chunk where the two hashes are within 1 bit of each other.
So we index every kept image by all 6 of its chunk values, and for each new pHash we split it into 6 chunks and probe all of them — each at its exact value plus every value within chunk_radius = 1 bit (the variant masks). We check all 6 because pigeonhole guarantees some chunk is close but not which one. Any kept image found in those buckets is a candidate, which we then confirm with an exact Hamming-distance check on the full hash (sharing one near-chunk is necessary but not sufficient, since the other chunks could still differ a lot). If none of the 6 chunks produces a match, the image is guaranteed not to be a duplicate — because a real duplicate would have been forced to leave at least one near-matching chunk behind.
The chunk lookups are dictionary (hash-map) lookups, which jump directly to the drawer for a given chunk value without scanning the other drawers. So for each new pHash we do a fixed number of cheap lookups (6 chunks × the variant masks, a constant that doesn't grow with the dataset), and those lookups hand us only the small set of kept items that share a near-chunk — the candidates. We run the expensive full-hash Hamming check only on those candidates, never on the rest. Every kept item filed under unrelated chunk values is never touched at all. Naive dedup is O(n) comparisons per new item (compare against all n kept items), so the whole run is O(n²); the index replaces that per-item scan with a handful of constant-time lookups plus a few exact checks on candidates, so in practice each item costs roughly O(1) instead of O(n) — turning an O(n²) algorithm into something close to O(n). The pigeonhole guarantee is what makes this safe: since a real duplicate is forced to share a near-chunk, the items we skip are provably not duplicates, so ignoring them costs us no correctness.
| ) | ||
|
|
||
|
|
||
| def _validate_threshold(threshold: int) -> None: |
There was a problem hiding this comment.
is this validation not also happening in the original dedup sdk path? either way, can it be shared between both?
There was a problem hiding this comment.
I left this local-only for now. The existing async Dataset.deduplicate() path relies on backend/job validation today, and the existing integration tests expect NucleusAPIError/JobError for invalid thresholds. Adding shared client-side validation there would change that public error behavior. This local helper needs the upfront check because it does not make an API call and should fail before doing CPU-heavy local work.
| phash = dataset_item.phash | ||
| if phash is None or not PHASH_REGEX.fullmatch(phash): | ||
| ref_id = dataset_item.reference_id | ||
| ref_msg = f" with reference_id {ref_id!r}" if ref_id else "" |
There was a problem hiding this comment.
nit: would it be useful to include the dataset_item_id as well?
There was a problem hiding this comment.
I did not add this because the supported local input shape only exposes a DatasetItem, and the SDK currently discards the backend item ID when it builds DatasetItem objects from export/API payloads. reference_id is the only reliable identifier available here; adding a best-effort dataset_item_id field would usually be absent for the exact generator rows this utility is intended to consume.
| if not records: | ||
| return LocalDeduplicationResult( | ||
| unique=[], | ||
| unique_dataset_items=[], | ||
| unique_reference_ids=[], | ||
| stats=DeduplicationStats( | ||
| threshold=threshold, | ||
| original_count=0, | ||
| deduplicated_count=0, | ||
| ), | ||
| ) |
There was a problem hiding this comment.
nbd but might be slightly more efficient to place this above the sort on 179? if i understand correctly the sort won't drop any records
There was a problem hiding this comment.
Done, the empty-input return now happens immediately after normalization and before the sort.
| for _ in range(PARTITION_COUNT) | ||
| ] | ||
|
|
||
| def add(self, phash_value: int) -> None: |
There was a problem hiding this comment.
nit: should this also be _add
| index = self._indexes[partition_index][chunk_index] | ||
| index.setdefault(chunk_value, []).append(kept_index) | ||
|
|
||
| def find_duplicate_index(self, phash_value: int) -> Optional[int]: |
|
@edwinpav I think your explanation is pretty good. To be pedantic, it's not necessarily most and the runtime stuff is handwavy and really can only be validated empirically. Some other key points: |
My understanding was that the bit rotation is because since the floor(10/n) = 1, we need to rotate each candidate chunk by 1 bit in each position to check if its 1 bit difference but any stores ones in that chunk. Or is it also/just for something else? |
|
also @vinay553 if u follow this and run |
|
@edwinpav ah I wouldn't call that rotating, that's just applying a mask. rotation is when you rotate the bits like 1234 -> 2341. The reason is that any potential duplicate candidate would need to pass the chunk test both times before we do the full 64-bit check, reducing then number of candidates. |
I see, why is the rotation safe? ie. we guaranteed that it wont be a duplicate if they dont match? |
We can talk abt the algo in our 1:1 |
Summary
Adds a local pHash-based deduplication utility for Nucleus SDK users who already have a large list of
DatasetItemobjects or rows fromitems_and_annotation_generator().nucleus.deduplicate_by_phash(...)andLocalDeduplicationResult.DatasetItemobjects or generator rows with an"item"DatasetItem.0..64.threshold == 0, a single-item fast path forthreshold == 64, an exact chunked Hamming-neighbor index for1 <= threshold <= 10, and a linear exact fallback for thresholds above 10.DeduplicationStats.Writeup: Local pHash Deduplication
Baseline Problem
The logical algorithm is greedy deduplication. Sort rows deterministically, then keep a candidate only if no previously kept pHash is within the requested Hamming distance threshold.
This is exact but effectively quadratic for mostly distinct inputs, because each new candidate may scan every pHash kept so far.
Technique 1: Fail Fast and Normalize Once
Before the dedup pass starts, each input is normalized into an internal record with the original object, its
DatasetItem, a stable tie-break ID, and the pHash parsed as a 64-bit integer. Threshold validation and pHash validation happen up front, so a long local run fails before doing expensive work if any pHash is missing or malformed.Technique 2: Integer pHashes and Integer Hamming Distance
Backend pHashes arrive as 64-character binary strings. The utility parses each pHash once with
int(phash, 2), then computes Hamming distance with:This avoids per-character string comparison and enables sorting by
(phash_value, stable_id).Technique 3: Threshold-Specific Fast Paths
threshold == 0threshold == 641 <= threshold <= 10threshold > 10Technique 4: Chunking With the Pigeonhole Principle
The accelerated path splits each 64-bit pHash into six chunks:
For threshold
t, the per-chunk radius isfloor(t / 6). If two pHashes have total Hamming distance at mostt, at least one of the six chunks must be within that per-chunk radius. Otherwise every chunk would exceed the radius, forcing the total Hamming distance abovet.For
threshold=10, the per-chunk radius is1. One partition performs4 * 12 + 2 * 11 = 70dictionary lookups: each 11-bit chunk checks the exact value plus 11 one-bit variants, and each 10-bit chunk checks the exact value plus 10 one-bit variants.Technique 5: Two Partitions With Cheap Rotation
A single chunk partition can produce many false candidates. The utility uses two exact partitions:
The rotation is cheap:
The pigeonhole argument applies independently to both partitions, so true duplicates cannot be missed. A candidate must be found by both partitions before the exact 64-bit Hamming check runs.
Technique 6: Reusable Candidate Marks Instead of Set Intersections
The permanent state remains simple:
kept_hashesstores pHashes that survived deduplication, and chunk dictionaries point chunk values back to positions inkept_hashes. Those positions are called kept indexes.Conceptually, each query intersects two candidate sets:
The first implementation literally allocated Python
set()objects and intersected them per candidate. The current implementation uses a reusablebytearraywith one mark byte per kept pHash:After each query, only touched indexes are cleared. This computes the same intersection while avoiding large temporary set allocations.
Real Dataset Profile
I profiled this on an internal real production dataset with
278,587rows. Because this repository is public, the dataset identifier and name are intentionally omitted from the PR body. The API key was not included in the report or the PR.The profiler exported rows with
items_and_annotation_generator(), held the exported rows in memory, and then timed onlydeduplicate_by_phash(...)so export/API time and local dedup CPU time are separated.Local dedup timing only:
The practical read: the expensive exact local dedup case took about
6m 36.6sfor278,587real rows. If a user exports the dataset and runs threshold 10, the observed end-to-end time for this dataset would be roughly127.04s + 396.58s = 523.62s, or about8m 43.6s.That is a reasonable runtime for an offline local utility, especially because the implementation is deterministic, exact, requires no server-side job, and validates pHashes before starting the expensive pass.
Validation
NUCLEUS_PYTEST_API_KEY=dummy .venv/bin/python -m pytest tests/test_local_deduplication.py -q.venv.Focused tests cover row preservation,
DatasetIteminputs, threshold 0, threshold 64, invalid thresholds, malformed pHashes, unsupported input shapes, custom sort keys, and parity against a simple linear baseline for thresholds 1, 5, and 10.Greptile Summary
This PR adds a fully offline
deduplicate_by_phash()utility andLocalDeduplicationResultto the Nucleus SDK. It requires no API key, acceptsDatasetItemobjects or generator rows, and dispatches to an exact-match path (threshold 0), a two-partition chunked Hamming index (thresholds 1–10), a linear greedy fallback (thresholds 11–63), or a keep-one path (threshold 64).nucleus/local_deduplication.py: New module implementing the full pipeline — pHash validation, stable-id tie-breaking via_tie_break_key, the_HammingIndexwith pigeonhole-based candidate filtering and reusablebytearraymarks, and the three dedup paths.tests/test_local_deduplication.py: Covers all dispatch paths, custom/integer/None sort keys, ordinal fallback, missing reference IDs, and parity against a linear baseline for thresholds 1, 5, 10, 11, and 20.nucleus/__init__.py: Re-exportsdeduplicate_by_phashandLocalDeduplicationResultfrom the public namespace.Confidence Score: 5/5
Safe to merge — the new utility is fully offline and additive, with no changes to existing API paths or data models.
The algorithm is provably correct: the pigeonhole argument holds for both partitions independently, the mark-based candidate intersection correctly resets state after every query, and all three dedup paths produce results consistent with the linear baseline. The previous issues around string-encoded sort keys and empty-string reference IDs are both resolved. The type annotation for sort_key's return type is the only gap.
No files require special attention.
Important Files Changed
Flowchart
%%{init: {'theme': 'neutral'}}%% flowchart TD A[deduplicate_by_phash] --> B[_validate_threshold] B --> C[_normalize_records] C --> D{empty?} D -- yes --> E[return empty result] D -- no --> F[sort by phash_value, stable_id] F --> G{threshold?} G -- 0 --> H[_deduplicate_exact] G -- 64 --> I[keep records 0] G -- 1..10 --> J[_deduplicate_with_index] G -- 11..63 --> K[_deduplicate_with_linear_scan] H --> L[LocalDeduplicationResult] I --> L J --> L K --> LPrompt To Fix All With AI
Reviews (8): Last reviewed commit: "Run pre-commit formatting" | Re-trigger Greptile