More unit tests
Neeratyoy committed Feb 5, 2021
commit 43ae42da964f518c2e066f4f46b843307042cf7c
13 changes: 0 additions & 13 deletions openml/config.py
@@ -89,7 +89,6 @@ def set_file_log_level(file_output_level: int):
     "avoid_duplicate_runs": "True",
     "connection_n_retries": 10,
     "max_retries": 20,
-    "n_jobs": 4,
 }

 config_file = os.path.expanduser(os.path.join("~", ".openml", "config"))
@@ -119,7 +118,6 @@ def get_server_base_url() -> str:
 # Number of retries if the connection breaks
 connection_n_retries = _defaults["connection_n_retries"]
 max_retries = _defaults["max_retries"]
-n_jobs = _defaults["n_jobs"]


 class ConfigurationForExamples:
@@ -172,12 +170,6 @@ def stop_using_configuration_for_example(cls):
         apikey = cls._last_used_key
         cls._start_last_called = False

-    @classmethod
-    def set_n_jobs_for_parallel_runs(cls, n=4):
-        """ Set the number of workers to be used while running a flow/model on a task. """
-        global n_jobs
-        n_jobs = n
-

 def _setup(config=None):
     """Setup openml package. Called on first import.
@@ -194,7 +186,6 @@ def _setup(config=None):
     global avoid_duplicate_runs
     global connection_n_retries
     global max_retries
-    global n_jobs

     # read config file, create cache directory
     try:
@@ -222,7 +213,6 @@ def _get(config, key):
     short_cache_dir = _get(config, "cachedir")
     connection_n_retries = _get(config, "connection_n_retries")
     max_retries = _get(config, "max_retries")
-    n_jobs = _get(config, "n_jobs")

     cache_directory = os.path.expanduser(short_cache_dir)
     # create the cache subdirectory
@@ -275,7 +265,6 @@ def get_config_as_dict():
     config["avoid_duplicate_runs"] = avoid_duplicate_runs
     config["connection_n_retries"] = connection_n_retries
     config["max_retries"] = max_retries
-    config["n_jobs"] = n_jobs
     return config


@@ -321,15 +310,13 @@ def set_cache_directory(cachedir):
     ConfigurationForExamples.start_using_configuration_for_example
 )
 stop_using_configuration_for_example = ConfigurationForExamples.stop_using_configuration_for_example
-set_n_jobs_for_parallel_runs = ConfigurationForExamples.set_n_jobs_for_parallel_runs


 __all__ = [
     "get_cache_directory",
     "set_cache_directory",
     "start_using_configuration_for_example",
     "stop_using_configuration_for_example",
-    "set_n_jobs_for_parallel_runs",
     "get_config_as_dict",
 ]

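With set_n_jobs_for_parallel_runs removed from openml/config.py, the worker count becomes a per-call argument of the run functions (see openml/runs/functions.py below) instead of a global setting. A minimal usage sketch under that assumption; the classifier and task ID are illustrative:

import openml
from sklearn.tree import DecisionTreeClassifier

# Previously: openml.config.set_n_jobs_for_parallel_runs(4)  # global, now removed
# Now: pass n_jobs per call; that many joblib workers evaluate the folds.
run = openml.runs.run_model_on_task(
    model=DecisionTreeClassifier(),
    task=openml.tasks.get_task(7),  # illustrative task ID
    avoid_duplicate_runs=False,
    n_jobs=2,
)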
7 changes: 6 additions & 1 deletion openml/runs/functions.py
@@ -55,6 +55,7 @@ def run_model_on_task(
     upload_flow: bool = False,
     return_flow: bool = False,
     dataset_format: str = "dataframe",
+    n_jobs: int = None,
 ) -> Union[OpenMLRun, Tuple[OpenMLRun, OpenMLFlow]]:
     """Run the model on the dataset defined by the task.

@@ -132,6 +133,7 @@ def get_task_and_type_conversion(task: Union[int, str, OpenMLTask]) -> OpenMLTask:
         add_local_measures=add_local_measures,
         upload_flow=upload_flow,
         dataset_format=dataset_format,
+        n_jobs=n_jobs,
     )
     if return_flow:
         return run, flow
@@ -147,6 +149,7 @@ def run_flow_on_task(
     add_local_measures: bool = True,
     upload_flow: bool = False,
     dataset_format: str = "dataframe",
+    n_jobs: int = None,
 ) -> OpenMLRun:

     """Run the model provided by the flow on the dataset defined by task.
@@ -266,6 +269,7 @@ def run_flow_on_task(
         extension=flow.extension,
         add_local_measures=add_local_measures,
         dataset_format=dataset_format,
+        n_jobs=n_jobs,
     )

     data_content, trace, fold_evaluations, sample_evaluations = res
@@ -426,6 +430,7 @@ def _run_task_get_arffcontent(
     extension: "Extension",
     add_local_measures: bool,
     dataset_format: str,
+    n_jobs: int = None,
 ) -> Tuple[
     List[List],
     Optional[OpenMLRunTrace],
@@ -460,7 +465,7 @@ def _run_task_get_arffcontent(
     # Execute runs in parallel
     # assuming the same number of tasks as workers (n_jobs), the total compute time for this
     # statement will be similar to the slowest run
-    job_rvals = Parallel(verbose=0, n_jobs=config.n_jobs)(
+    job_rvals = Parallel(verbose=0, n_jobs=n_jobs)(
         delayed(_run_task_get_arffcontent_parallel_helper)(
             extension=extension,
             flow=flow,
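The comment above the Parallel call captures the scaling intuition: with as many workers as folds, wall-clock time approaches that of the slowest fold rather than the sum. A self-contained sketch of the same joblib pattern, where evaluate_fold is a hypothetical stand-in for _run_task_get_arffcontent_parallel_helper:

from joblib import Parallel, delayed

def evaluate_fold(fold: int) -> float:
    # Each call is independent, so folds can be dispatched concurrently.
    return fold * 0.1

# n_jobs=2 dispatches the ten fold evaluations to two workers.
results = Parallel(verbose=0, n_jobs=2)(
    delayed(evaluate_fold)(fold=i) for i in range(10)
)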
2 changes: 2 additions & 0 deletions openml/testing.py
@@ -110,6 +110,8 @@ def setUp(self, n_levels: int = 1):
         # Increase the number of retries to avoid spurious server failures
         self.connection_n_retries = openml.config.connection_n_retries
         openml.config.connection_n_retries = 10
+        # Number of processes to parallelize any evaluation made by a unit test
+        self.n_jobs = 2

     def tearDown(self):
         os.chdir(self.cwd)
4 changes: 1 addition & 3 deletions tests/test_openml/test_config.py
@@ -15,13 +15,12 @@ def test_get_config_as_dict(self):
         """ Checks if the current configuration is returned accurately as a dict. """
         config = openml.config.get_config_as_dict()
         self.assertIsInstance(config, dict)
-        self.assertEqual(len(config), 7)
+        self.assertEqual(len(config), 6)
         self.assertEqual(config.get("server"), "https://test.openml.org/api/v1/xml")
         self.assertEqual(config.get("apikey"), "610344db6388d9ba34f6db45a3cf71de")
         self.assertEqual(config.get("cachedir"), self.workdir)
         self.assertEqual(config.get("avoid_duplicate_runs"), False)
         self.assertEqual(config.get("max_retries"), 20)
-        self.assertEqual(config.get("n_jobs"), 4)

     def test_setup_with_config(self):
         """ Checks if the OpenML configuration can be updated using _setup(). """
@@ -32,7 +31,6 @@ def test_setup_with_config(self):
         _config["avoid_duplicate_runs"] = True
         _config["connection_n_retries"] = 100
         _config["max_retries"] = 1000
-        _config["n_jobs"] = 64
         orig_config = openml.config.get_config_as_dict()
         openml.config._setup(_config)
         updated_config = openml.config.get_config_as_dict()
63 changes: 61 additions & 2 deletions tests/test_runs/test_run_functions.py
@@ -1586,17 +1586,76 @@ def test__run_task_get_arffcontent_2(self):
         flow = unittest.mock.Mock()
         flow.name = "dummy"
         clf = SGDClassifier(loss="log", random_state=1)
-        with parallel_backend("loky", n_jobs=2):
+        with parallel_backend("loky", n_jobs=self.n_jobs):
             res = openml.runs.functions._run_task_get_arffcontent(
                 flow=flow,
                 extension=self.extension,
                 model=clf,
                 task=task,
                 add_local_measures=True,
-                dataset_format="array",
+                dataset_format="array",  # "dataframe" would require handling of categoricals
+                n_jobs=self.n_jobs,
             )
         self.assertEqual(type(res[0]), list)
         self.assertEqual(len(res[0]), num_instances)
         self.assertEqual(len(res[0][0]), line_length)
         self.assertEqual(len(res[2]), 7)
         self.assertEqual(len(res[3]), 7)
+
+    @unittest.skipIf(
+        LooseVersion(sklearn.__version__) < "0.21",
+        reason="couldn't perform local tests successfully w/o bloating RAM",
+    )
+    def test_joblib_backends(self):
+        """ Tests evaluation of a run using various joblib backends and n_jobs. """
+        task = openml.tasks.get_task(7)  # Supervised Classification on kr-vs-kp
+        x, y = task.get_X_and_y(dataset_format="dataframe")
+        num_instances = x.shape[0]
+        line_length = 6 + len(task.class_labels)
+        flow = unittest.mock.Mock()
+        flow.name = "dummy"
+
+        for n_jobs, backend, len_time_stats in [
+            (1, "loky", 7),
+            (2, "loky", 4),
+            (-1, "loky", 1),
+            (1, "threading", 7),
+            (-1, "threading", 1),
+            (1, "sequential", 7),
+        ]:
+            clf = sklearn.model_selection.RandomizedSearchCV(
+                estimator=sklearn.ensemble.RandomForestClassifier(n_estimators=5),
+                param_distributions={
+                    "max_depth": [3, None],
+                    "max_features": [1, 2, 3, 4],
+                    "min_samples_split": [2, 3, 4, 5, 6, 7, 8, 9, 10],
+                    "min_samples_leaf": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
+                    "bootstrap": [True, False],
+                    "criterion": ["gini", "entropy"],
+                },
+                random_state=1,
+                cv=sklearn.model_selection.StratifiedKFold(
+                    n_splits=2, shuffle=True, random_state=1
+                ),
+                n_iter=5,
+                n_jobs=n_jobs,
+            )
+            with parallel_backend(backend, n_jobs=n_jobs):
+                res = openml.runs.functions._run_task_get_arffcontent(
+                    flow=flow,
+                    extension=self.extension,
+                    model=clf,
+                    task=task,
+                    add_local_measures=True,
+                    dataset_format="array",  # "dataframe" would require handling of categoricals
+                    n_jobs=n_jobs,
+                )
+            self.assertEqual(type(res[0]), list)
+            self.assertEqual(len(res[0]), num_instances)
+            self.assertEqual(len(res[0][0]), line_length)
+            # usercpu_time_millis_* not recorded when n_jobs > 1
+            # *_time_millis_* not recorded when n_jobs = -1
+            self.assertEqual(len(res[2]), len_time_stats)
+            self.assertEqual(len(res[3]), len_time_stats)
+            self.assertEqual(len(res[2]["predictive_accuracy"][0]), 10)
+            self.assertEqual(len(res[3]["predictive_accuracy"][0]), 10)
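For reference, parallel_backend used throughout the new test is joblib's context manager for forcing a backend; a minimal standalone sketch showing how Parallel inherits the active backend and its n_jobs:

from joblib import Parallel, delayed, parallel_backend

def square(v: int) -> int:
    return v * v

# "threading" avoids process start-up and pickling; "loky" (the default)
# uses separate processes; "sequential" runs everything in the caller.
with parallel_backend("threading", n_jobs=2):
    squares = Parallel()(delayed(square)(i) for i in range(5))
print(squares)  # [0, 1, 4, 9, 16]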