Add reshard support for JSON Lines and CSV file formats #8084

muyihao wants to merge 4 commits into huggingface:main
Conversation
@lhoestq Hi, could you please help review this PR? Thanks!

The docs for this PR live here. All of your documentation changes will be reflected on that endpoint. The docs are available until 30 days after the last update.
lhoestq left a comment

Cool ! I left a few comments. Mostly on CSV but this also applies to JSON since the logic is almost the same. Let me know what you think :)
```python
with open(
    file,
    "r",
    encoding=self.config.encoding or "utf-8",
    errors=self.config.encoding_errors or "strict",
) as f:
    # Read the file to count lines
    line_count = 0
    while f.readline():
        line_count += 1
```
this can be expensive on big files, maybe limit to the first 20MB to estimate the total number of lines ?
we could also assume that the files contain the same kind of data, so maybe there is no need to check every single file
That's a great point. Iterating through massive files just for a line count is indeed a bottleneck.
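One way to implement the reviewer's suggestion, sketched here for illustration (the function name, the 20MB default, and the sampling logic are assumptions, not part of the PR):

```python
import os


def estimate_line_count(path, sample_bytes=20 * 1024 * 1024):
    # Estimate the total number of lines by sampling only the first
    # `sample_bytes` of the file instead of reading it end to end.
    file_size = os.path.getsize(path)
    with open(path, "rb") as f:
        sample = f.read(sample_bytes)
    if len(sample) >= file_size:
        # Small file: we read all of it, so the count is exact.
        return sample.count(b"\n")
    lines_in_sample = sample.count(b"\n")
    # Extrapolate from the average line length seen in the sample.
    avg_line_bytes = len(sample) / max(lines_in_sample, 1)
    return int(file_size / avg_line_bytes)
```

For files larger than the sample window this returns an estimate, which is fine here since the line count only drives shard sizing.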
```python
# No resharding - return the original gen_kwargs
for base_file, files_iterable in zip(base_files, files_iterables):
    for file in files_iterable:
        yield {
            "base_files": [base_file],
            "files_iterables": [[file]],
            "shard_start_line": [0],
            "shard_end_line": [None],
        }
return
```
IIUC when num_shards is None then it should shard maximally.
In the case of the CSV loader, the minimum shard size is defined by self.config.chunksize which is 10_000 lines by default. It could make sense to define maximum sharding as aiming for self.config.chunksize lines per shard, WDYT ?
Makes sense. I'll refactor this part accordingly.
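The suggested behavior can be sketched as follows (the helper name is hypothetical; the `10_000` default mirrors the CSV loader's `self.config.chunksize` mentioned above):

```python
import math


def maximal_num_shards(total_lines, chunksize=10_000):
    # When num_shards is None, shard maximally: aim for roughly
    # `chunksize` lines per shard, but always produce at least one shard.
    return max(1, math.ceil(total_lines / chunksize))
```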
```python
if shard_start_line > 0:
    read_csv_kwargs["skiprows"] = shard_start_line
```
skiprows may lead some shard to require downloading a lot of unnecessary data to skip the rows, maybe you could skip bytes instead. Something along those lines maybe ?

```python
if skip_bytes is not None:
    with open(file, "rb") as f:
        header = f.readline()
        f.seek(skip_bytes)
        lines = (f.read(n_bytes) + f.readline()).splitlines()
        lines[0] = header
        file = io.BytesIO(b"\n".join(lines))
csv_file_reader = pd.read_csv(file, iterator=True, dtype=dtype, **read_csv_kwargs)
```

e.g. this dataset has multiple CSV files of 4GB each: https://huggingface.co/datasets/Koala-36M/Koala-36M-v1
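For context, the byte offsets this approach needs can be computed once per file, aligned to line boundaries so no shard starts mid-record. A sketch (the helper `shard_byte_ranges` is illustrative, not from the PR):

```python
import os


def shard_byte_ranges(path, num_shards):
    # Split a file into `num_shards` (start, end) byte ranges, nudging each
    # boundary forward to the next newline so shards begin on line boundaries.
    size = os.path.getsize(path)
    offsets = [0]
    with open(path, "rb") as f:
        for i in range(1, num_shards):
            f.seek(i * size // num_shards)
            f.readline()  # advance past the partially-read line
            offsets.append(f.tell())
    offsets.append(size)
    return [(offsets[i], offsets[i + 1]) for i in range(num_shards)]
```

Each worker can then `seek` straight to its start offset instead of skipping rows one by one.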
```python
"base_files": [base_file],
"files_iterables": [[file]],
"shard_start_line": [0],
"shard_end_line": [None],
```
it's probably more practical to have them together like this:
```diff
- "base_files": [base_file],
- "files_iterables": [[file]],
- "shard_start_line": [0],
- "shard_end_line": [None],
+ "base_files": [base_file],
+ "files_iterables": [[(file, 0, None)]],
```
this way later you can do
```python
for shard_idx, files_iterable in enumerate(files_iterables):
    for file in files_iterable:
        shard_start_line, shard_end_line = None, None
        if isinstance(file, tuple):
            file, shard_start_line, shard_end_line = file
```
alternatively you could keep it this way, but in that case you need to zip() on files_iterables:
```diff
- "base_files": [base_file],
- "files_iterables": [[file]],
- "shard_start_line": [0],
- "shard_end_line": [None],
+ "base_files": [base_file],
+ "files_iterables": [[file]],
+ "shard_start_lines": [0],
+ "shard_end_lines": [None],
```
```python
shard_start_lines = shard_start_lines or [None] * len(base_files)
shard_end_lines = shard_end_lines or [None] * len(base_files)
for base_file, files_iterable, shard_start_line, shard_end_line in zip(
    base_files, files_iterables, shard_start_lines, shard_end_lines
):
    for file in files_iterable:
```

```python
if num_shards is None:
    # No resharding - return the original gen_kwargs
    for base_file, files_iterable in zip(base_files, files_iterables):
        for file in files_iterable:
            yield {
                "base_files": [base_file],
                "files_iterables": [[file]],
                "shard_start_line": [0],
                "shard_end_line": [None],
            }
    return
```
when sharding maximally we could use the fact that self.config.chunksize is 10MB by default and aim for this maybe
(yes it has the same name as CSV but is in bytes, not lines. This collision is not ideal and I guess comes from the fact that pandas uses chunksize in lines for CSV and pyarrow uses chunksize in bytes for json)
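To illustrate the byte-based interpretation of chunksize for JSON Lines, here is a minimal sketch of chunked reading that keeps every record intact (illustrative only; the actual loader delegates chunking to pyarrow):

```python
def iter_jsonl_batches(path, chunksize=10 * 1024 * 1024):
    # Yield byte chunks of roughly `chunksize` bytes from a JSON Lines file,
    # extending each chunk to the next newline so no record is split.
    with open(path, "rb") as f:
        while True:
            chunk = f.read(chunksize)
            if not chunk:
                break
            chunk += f.readline()  # complete the last partial line
            yield chunk
```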
@lhoestq Hi, I've made the updates accordingly, could you please take another look? 😊

Hi @lhoestq, just a gentle ping on this. I've re-verified the byte-offset logic for CSV/JSONL and confirmed it handles large files efficiently. Let me know if you have any concerns or would like me to adjust the implementation further. Looking forward to your feedback!
This PR extends the IterableDataset.reshard() functionality to support JSON Lines and CSV file formats, building on the foundation laid in #7992.
Summary
Usage