Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
24faeb7
bump to 0.11.1dev to continue developing (#971)
mfeurer Oct 25, 2020
e84cdf9
update home page example to numerical dataset (pendigits) (#976)
a-moadel Oct 26, 2020
07e87ad
Speed up tests (#977)
PGijsbers Oct 29, 2020
4923e5b
Additional fixes to PR 777 (#967)
Neeratyoy Oct 29, 2020
f2af798
Improving the performance of check_datasets_active (#980)
ArlindKadra Oct 29, 2020
756e747
Add CI through Github Actions (#975)
PGijsbers Oct 29, 2020
3132dac
add validation for ignore_attributes and default_target_attribute at …
a-moadel Oct 29, 2020
6afc880
Updated the way 'image features' are stored, updated old unit tests, …
ArlindKadra Oct 29, 2020
5b6de8a
Retry on database error to reduce number of test failures (#984)
mfeurer Oct 30, 2020
63ec0ae
Transition other Travis jobs to Github Actions (#988)
PGijsbers Nov 2, 2020
9a3a6dd
update progress file (#991)
a-moadel Nov 2, 2020
81cc423
docs: add a-moadel as a contributor (#992)
allcontributors[bot] Nov 2, 2020
51eaff6
docs: add Neeratyoy as a contributor (#998)
allcontributors[bot] Nov 2, 2020
a629562
Improve unit tests (#985)
mfeurer Nov 3, 2020
accde88
Warning if fitted sklearn model being used (#989)
Neeratyoy Nov 3, 2020
560e952
Cache dataset features and qualities as pickle (#979)
mfeurer Nov 3, 2020
5d5a48e
Update string formatting (#1001)
PGijsbers Nov 17, 2020
16799ad
Specify encoding for README file (#1004)
PGijsbers Nov 18, 2020
fba6aab
Making some unit tests work (#1000)
Neeratyoy Dec 24, 2020
e074c14
Refactor data loading/storing (#1018)
PGijsbers Jan 19, 2021
ab793a6
Adding helper functions to support ColumnTransformer (#982)
Neeratyoy Jan 28, 2021
47cda65
Rework local openml directory (#987)
mfeurer Feb 10, 2021
80ae046
Feature/give possibility to not download the dataset qualities (#1017)
a-moadel Feb 11, 2021
d2945ba
Adding sklearn 0.24 support (#1016)
Neeratyoy Feb 11, 2021
3c680c1
improve path detection (#1021)
mfeurer Feb 12, 2021
7553281
Removing flaky decorator for study unit test (#1024)
Neeratyoy Feb 16, 2021
ff7a251
Adding sklearn min. dependencies for all versions (#1022)
Neeratyoy Feb 18, 2021
4ff66ed
Parallel evaluation of tasks (#1020)
Neeratyoy Feb 18, 2021
38f9bf0
Parquet Support (#1029)
PGijsbers Mar 4, 2021
6c609b8
API for topics (#1023)
sahithyaravi Mar 9, 2021
4aec00a
Remove nan-likes from category header (#1037)
PGijsbers Mar 12, 2021
f94672e
Measuring runtimes (#1031)
Neeratyoy Mar 12, 2021
bd8ae14
Fix 1013: Store run `setup_string` (#1015)
PGijsbers Mar 25, 2021
11e6235
Fix #1033: skip two unit tests on Windows (#1040)
mfeurer Mar 26, 2021
d9037e7
bump version for new release (#1041)
mfeurer Mar 29, 2021
5511fa0
fix loky/concurrency issue (#1042)
mfeurer Mar 30, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Parquet Support (#1029)
* Store the minio_url from description xml

* Add minio dependency

* Add call for downloading file from minio bucket

* Allow objects to be located in directories

* add parquet equivalent of _get_dataset_arff

* Store parquet alongside arff, if available

* Deal with unknown buckets, fix path expectation

* Update test to reflect parquet file is downloaded

* Download parquet file through lazy loading

i.e. if the dataset was initially retrieved with download_data=False,
make sure to download the dataset on first get_data call.

* Load data from parquet if available

* Update (doc) strings

* Cast to signify url is str

* Make cache file path generation extension agnostic

Fixes a bug where the parquet files would simply be overwritten.
Also now only save the local files to members only if they actually
exist.

* Remove return argument

* Add clear test messages, update minio urls

* Debugging on CI with print

* Add pyarrow dependency for loading parquet

* Remove print
  • Loading branch information
PGijsbers authored Mar 4, 2021
commit 38f9bf001d22cbd3e79a990c069c8a6a9b7af4f5
45 changes: 44 additions & 1 deletion openml/_api_calls.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,14 @@
import time
import hashlib
import logging
import pathlib
import requests
import urllib.parse
import xml
import xmltodict
from typing import Dict, Optional
from typing import Dict, Optional, Union

import minio

from . import config
from .exceptions import (
Expand Down Expand Up @@ -68,6 +72,45 @@ def _perform_api_call(call, request_method, data=None, file_elements=None):
return response.text


def _download_minio_file(
    source: str, destination: Union[str, pathlib.Path], exists_ok: bool = True,
) -> None:
    """ Download file ``source`` from a MinIO Bucket and store it at ``destination``.

    Parameters
    ----------
    source : str
        URL to a file in a MinIO bucket.
    destination : Union[str, pathlib.Path]
        Path to store the file to, if a directory is provided the original filename is used.
    exists_ok : bool, optional (default=True)
        If False, raise FileExistsError if a file already exists in ``destination``.

    Raises
    ------
    FileExistsError
        If ``exists_ok`` is False and a file already exists at ``destination``.
    FileNotFoundError
        If the object does not exist, or if the bucket does not exist or is private.
    """
    destination = pathlib.Path(destination)
    parsed_url = urllib.parse.urlparse(source)

    # expect path format: /BUCKET/path/to/file.ext
    bucket, object_name = parsed_url.path[1:].split("/", maxsplit=1)
    if destination.is_dir():
        # Only a target directory was given: keep the object's (relative) name.
        destination = pathlib.Path(destination, object_name)
    if destination.is_file() and not exists_ok:
        raise FileExistsError(f"File already exists in {destination}.")

    # NOTE(review): the connection is made over plain HTTP (secure=False) — confirm intended.
    client = minio.Minio(endpoint=parsed_url.netloc, secure=False)

    try:
        client.fget_object(
            bucket_name=bucket, object_name=object_name, file_path=str(destination),
        )
    except minio.error.S3Error as e:
        if e.message.startswith("Object does not exist"):
            raise FileNotFoundError(f"Object at '{source}' does not exist.") from e
        # e.g. permission error, or a bucket does not exist (which is also interpreted as a
        # permission error on minio level).
        raise FileNotFoundError("Bucket does not exist or is private.") from e


def _download_text_file(
source: str,
output_path: Optional[str] = None,
Expand Down
58 changes: 44 additions & 14 deletions openml/datasets/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ class OpenMLDataset(OpenMLBase):
which maps a quality name to a quality value.
dataset: string, optional
Serialized arff dataset string.
minio_url: string, optional
URL to the MinIO bucket with dataset files
parquet_file: string, optional
Path to the local parquet file.
"""

def __init__(
Expand Down Expand Up @@ -128,6 +132,8 @@ def __init__(
features_file: Optional[str] = None,
qualities_file: Optional[str] = None,
dataset=None,
minio_url: Optional[str] = None,
parquet_file: Optional[str] = None,
):
def find_invalid_characters(string, pattern):
invalid_chars = set()
Expand Down Expand Up @@ -202,7 +208,9 @@ def find_invalid_characters(string, pattern):
self.update_comment = update_comment
self.md5_checksum = md5_checksum
self.data_file = data_file
self.parquet_file = parquet_file
self._dataset = dataset
self._minio_url = minio_url

if features_file is not None:
self.features = _read_features(
Expand Down Expand Up @@ -291,9 +299,11 @@ def __eq__(self, other):
def _download_data(self) -> None:
    """ Download the dataset files to the standard cache directory.

    Always downloads the ARFF file and sets ``self.data_file``.
    If a MinIO URL is known for this dataset, additionally downloads the parquet
    file and sets ``self.parquet_file``.
    """
    # import required here to avoid circular import.
    from .functions import _get_dataset_arff, _get_dataset_parquet

    self.data_file = _get_dataset_arff(self)
    if self._minio_url is not None:
        self.parquet_file = _get_dataset_parquet(self)

def _get_arff(self, format: str) -> Dict:
"""Read ARFF file and return decoded arff.
Expand Down Expand Up @@ -454,22 +464,38 @@ def _parse_data_from_arff(
return X, categorical, attribute_names

def _compressed_cache_file_paths(self, data_file: str) -> Tuple[str, str, str]:
data_pickle_file = data_file.replace(".arff", ".pkl.py3")
data_feather_file = data_file.replace(".arff", ".feather")
feather_attribute_file = data_file.replace(".arff", ".feather.attributes.pkl.py3")
ext = f".{data_file.split('.')[-1]}"
data_pickle_file = data_file.replace(ext, ".pkl.py3")
data_feather_file = data_file.replace(ext, ".feather")
feather_attribute_file = data_file.replace(ext, ".feather.attributes.pkl.py3")
return data_pickle_file, data_feather_file, feather_attribute_file

def _cache_compressed_file_from_arff(
self, arff_file: str
def _cache_compressed_file_from_file(
self, data_file: str
) -> Tuple[Union[pd.DataFrame, scipy.sparse.csr_matrix], List[bool], List[str]]:
""" Store data from the arff file in compressed format. Sets cache_format to 'pickle' if data is sparse. """ # noqa: 501
""" Store data from the local file in compressed format.

If a local parquet file is present it will be used instead of the arff file.
Sets cache_format to 'pickle' if data is sparse.
"""
(
data_pickle_file,
data_feather_file,
feather_attribute_file,
) = self._compressed_cache_file_paths(arff_file)
) = self._compressed_cache_file_paths(data_file)

data, categorical, attribute_names = self._parse_data_from_arff(arff_file)
if data_file.endswith(".arff"):
data, categorical, attribute_names = self._parse_data_from_arff(data_file)
elif data_file.endswith(".pq"):
try:
data = pd.read_parquet(data_file)
except Exception as e:
raise Exception(f"File: {data_file}") from e

categorical = [data[c].dtype.name == "category" for c in data.columns]
attribute_names = list(data.columns)
else:
raise ValueError(f"Unknown file type for file '{data_file}'.")

# Feather format does not work for sparse datasets, so we use pickle for sparse datasets
if scipy.sparse.issparse(data):
Expand All @@ -480,12 +506,16 @@ def _cache_compressed_file_from_arff(
data.to_feather(data_feather_file)
with open(feather_attribute_file, "wb") as fh:
pickle.dump((categorical, attribute_names), fh, pickle.HIGHEST_PROTOCOL)
self.data_feather_file = data_feather_file
self.feather_attribute_file = feather_attribute_file
else:
with open(data_pickle_file, "wb") as fh:
pickle.dump((data, categorical, attribute_names), fh, pickle.HIGHEST_PROTOCOL)
self.data_pickle_file = data_pickle_file

data_file = data_pickle_file if self.cache_format == "pickle" else data_feather_file
logger.debug(f"Saved dataset {int(self.dataset_id or -1)}: {self.name} to file {data_file}")

return data, categorical, attribute_names

def _load_data(self):
Expand All @@ -496,10 +526,9 @@ def _load_data(self):
if need_to_create_pickle or need_to_create_feather:
if self.data_file is None:
self._download_data()
res = self._compressed_cache_file_paths(self.data_file)
self.data_pickle_file, self.data_feather_file, self.feather_attribute_file = res
# Since our recently stored data exists in memory, there is no need to load from disk
return self._cache_compressed_file_from_arff(self.data_file)

file_to_load = self.data_file if self.parquet_file is None else self.parquet_file
return self._cache_compressed_file_from_file(file_to_load)

# helper variable to help identify where errors occur
fpath = self.data_feather_file if self.cache_format == "feather" else self.data_pickle_file
Expand Down Expand Up @@ -543,7 +572,8 @@ def _load_data(self):
data_up_to_date = isinstance(data, pd.DataFrame) or scipy.sparse.issparse(data)
if self.cache_format == "pickle" and not data_up_to_date:
logger.info("Updating outdated pickle file.")
return self._cache_compressed_file_from_arff(self.data_file)
file_to_load = self.data_file if self.parquet_file is None else self.parquet_file
return self._cache_compressed_file_from_file(file_to_load)
return data, categorical, attribute_names

@staticmethod
Expand Down
62 changes: 60 additions & 2 deletions openml/datasets/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import io
import logging
import os
from typing import List, Dict, Union, Optional
from typing import List, Dict, Union, Optional, cast

import numpy as np
import arff
Expand Down Expand Up @@ -424,6 +424,10 @@ def get_dataset(
raise

arff_file = _get_dataset_arff(description) if download_data else None
if "oml:minio_url" in description and download_data:
parquet_file = _get_dataset_parquet(description)
else:
parquet_file = None
remove_dataset_cache = False
except OpenMLServerException as e:
# if there was an exception,
Expand All @@ -437,7 +441,7 @@ def get_dataset(
_remove_cache_dir_for_id(DATASETS_CACHE_DIR_NAME, did_cache_dir)

dataset = _create_dataset_from_description(
description, features_file, qualities_file, arff_file, cache_format
description, features_file, qualities_file, arff_file, parquet_file, cache_format
)
return dataset

Expand Down Expand Up @@ -908,6 +912,55 @@ def _get_dataset_description(did_cache_dir, dataset_id):
return description


def _get_dataset_parquet(
    description: Union[Dict, OpenMLDataset], cache_directory: Optional[str] = None
) -> Optional[str]:
    """ Return the path to the local parquet file of the dataset. If it is not cached, it is downloaded.

    Checks if the file is in the cache, if yes, return the path to the file.
    If not, downloads the file and caches it, then returns the file path.
    The cache directory is generated based on dataset information, but can also be specified.

    This function is NOT thread/multiprocessing safe.
    Unlike the ARFF equivalent, checksums are not available/used (for now).

    Parameters
    ----------
    description : dictionary or OpenMLDataset
        Either a dataset description as dict or OpenMLDataset.

    cache_directory: str, optional (default=None)
        Folder to store the parquet file in.
        If None, use the default cache directory for the dataset.

    Returns
    -------
    output_filename : string, optional
        Location of the Parquet file if successfully downloaded, None otherwise.
    """
    if isinstance(description, dict):
        url = description.get("oml:minio_url")
        did = description.get("oml:id")
    elif isinstance(description, OpenMLDataset):
        url = description._minio_url
        did = description.dataset_id
    else:
        raise TypeError("`description` should be either OpenMLDataset or Dict.")

    if cache_directory is None:
        cache_directory = _create_cache_directory_for_id(DATASETS_CACHE_DIR_NAME, did)
    output_file_path = os.path.join(cache_directory, "dataset.pq")

    if not os.path.isfile(output_file_path):
        try:
            # Best effort: a missing object or inaccessible bucket is not fatal,
            # the caller falls back to the ARFF file (None is returned).
            openml._api_calls._download_minio_file(
                source=cast(str, url), destination=output_file_path
            )
        except FileNotFoundError:
            return None
    return output_file_path


def _get_dataset_arff(description: Union[Dict, OpenMLDataset], cache_directory: str = None) -> str:
""" Return the path to the local arff file of the dataset. If is not cached, it is downloaded.

Expand Down Expand Up @@ -1031,6 +1084,7 @@ def _create_dataset_from_description(
features_file: str,
qualities_file: str,
arff_file: str = None,
parquet_file: str = None,
cache_format: str = "pickle",
) -> OpenMLDataset:
"""Create a dataset object from a description dict.
Expand All @@ -1045,6 +1099,8 @@ def _create_dataset_from_description(
Path of the dataset qualities as xml file.
arff_file : string, optional
Path of dataset ARFF file.
parquet_file : string, optional
Path of dataset Parquet file.
cache_format: string, optional
Caching option for datasets (feather/pickle)

Expand Down Expand Up @@ -1081,6 +1137,8 @@ def _create_dataset_from_description(
cache_format=cache_format,
features_file=features_file,
qualities_file=qualities_file,
minio_url=description.get("oml:minio_url"),
parquet_file=parquet_file,
)


Expand Down
3 changes: 2 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@
"pandas>=1.0.0",
"scipy>=0.13.3",
"numpy>=1.6.2",
"minio",
"pyarrow",
],
extras_require={
"test": [
Expand All @@ -65,7 +67,6 @@
"nbformat",
"oslo.concurrency",
"flaky",
"pyarrow",
"pre-commit",
"pytest-cov",
"mypy",
Expand Down
Binary file not shown.
Loading