diff --git a/src/protify/probes/lazy_predict.py b/src/protify/probes/lazy_predict.py
index 602fe2b..d3a4527 100644
--- a/src/protify/probes/lazy_predict.py
+++ b/src/protify/probes/lazy_predict.py
@@ -6,6 +6,7 @@
 import xgboost
 import lightgbm
 from tqdm import tqdm
+from joblib import Parallel, delayed
 from sklearn.pipeline import Pipeline
 from sklearn.impute import SimpleImputer
 from sklearn.preprocessing import StandardScaler, OneHotEncoder, OrdinalEncoder
@@ -248,6 +249,7 @@ def __init__(
         predictions=False,
         random_state=None,
         classifiers="all",
+        n_jobs=1,
     ):
         self.verbose = verbose
         self.ignore_warnings = ignore_warnings
@@ -256,6 +258,7 @@
         self.models = {}
         self.random_state = random_state or get_global_seed()
         self.classifiers = classifiers
+        self.n_jobs = n_jobs
 
     def fit(self, X_train, X_test, y_train, y_test):
         """Fit Classification algorithms to X_train and y_train, predict and score on X_test, y_test.
@@ -323,9 +326,8 @@ def fit(self, X_train, X_test, y_train, y_test):
                 print_message(exception)
                 print_message("Invalid Classifier(s)")
 
-        pbar = tqdm(self.classifiers)
-        for name, model in pbar:
-            pbar.set_description(f"Training {name}")
+        def _fit_classifier(name, model):
+            """Train a single classifier and return results."""
             start = time.time()
             try:
                 if "random_state" in model().get_params().keys():
@@ -341,63 +343,57 @@
                     )
 
                 pipe.fit(X_train, y_train)
-                self.models[name] = pipe
                 y_pred = pipe.predict(X_test)
                 accuracy = accuracy_score(y_test, y_pred, normalize=True)
                 b_accuracy = balanced_accuracy_score(y_test, y_pred)
                 f1 = f1_score(y_test, y_pred, average="weighted")
                 try:
                     roc_auc = roc_auc_score(y_test, y_pred)
-                except Exception as exception:
+                except:
                     roc_auc = None
-                    if self.ignore_warnings is False:
-                        print_message("ROC AUC couldn't be calculated for " + name)
-                        print_message(exception)
-                names.append(name)
-                Accuracy.append(accuracy)
-                B_Accuracy.append(b_accuracy)
-                ROC_AUC.append(roc_auc)
-                F1.append(f1)
-                TIME.append(time.time() - start)
+                fit_time = time.time() - start
+                custom = self.custom_metric(y_test, y_pred) if self.custom_metric else None
+                return {"name": name, "pipe": pipe, "y_pred": y_pred, "accuracy": accuracy,
+                        "b_accuracy": b_accuracy, "roc_auc": roc_auc, "f1": f1,
+                        "time": fit_time, "custom": custom, "failed": False}
+            except Exception as e:
+                return {"name": name, "failed": True, "error": e}
+
+        # Parallel or sequential execution
+        if self.n_jobs != 1:
+            results = Parallel(n_jobs=self.n_jobs, prefer="threads")(
+                delayed(_fit_classifier)(name, model) for name, model in self.classifiers
+            )
+        else:
+            results = [_fit_classifier(name, model) for name, model in tqdm(self.classifiers, desc="Training classifiers")]
+        # Collect results
+        for r in results:
+            if r["failed"]:
+                if self.ignore_warnings is False:
+                    print_message(f'\n{r["name"]} model failed to execute')
+                    print_message(r["error"])
+            else:
+                self.models[r["name"]] = r["pipe"]
+                names.append(r["name"])
+                Accuracy.append(r["accuracy"])
+                B_Accuracy.append(r["b_accuracy"])
+                ROC_AUC.append(r["roc_auc"])
+                F1.append(r["f1"])
+                TIME.append(r["time"])
                 if self.custom_metric is not None:
-                    custom_metric = self.custom_metric(y_test, y_pred)
-                    CUSTOM_METRIC.append(custom_metric)
-
+                    CUSTOM_METRIC.append(r["custom"])
                 if self.verbose > 0:
                     if self.custom_metric is not None:
-                        print_message(
-                            {
-                                "Model": name,
-                                "Accuracy": accuracy,
-                                "Balanced Accuracy": b_accuracy,
-                                "ROC AUC": roc_auc,
-                                "F1 Score": f1,
-                                "Custom Metric": custom_metric,
-                                "Time taken": time.time() - start,
-                            }
-                        )
+                        print_message({"Model": r["name"], "Accuracy": r["accuracy"],
+                                       "Balanced Accuracy": r["b_accuracy"], "ROC AUC": r["roc_auc"],
+                                       "F1 Score": r["f1"], "Custom Metric": r["custom"], "Time taken": r["time"]})
                     else:
-                        print_message(
-                            {
-                                "Model": name,
-                                "Accuracy": accuracy,
-                                "Balanced Accuracy": b_accuracy,
-                                "ROC AUC": roc_auc,
-                                "F1 Score": f1,
-                                "Time taken": time.time() - start,
-                            }
-                        )
+                        print_message({"Model": r["name"], "Accuracy": r["accuracy"],
+                                       "Balanced Accuracy": r["b_accuracy"], "ROC AUC": r["roc_auc"],
+                                       "F1 Score": r["f1"], "Time taken": r["time"]})
                 if self.predictions:
-                    predictions[name] = y_pred
-
-
-            except Exception as exception:
-                if self.ignore_warnings is False:
-                    print_message(f'\n{name} model failed to execute')
-                    print_message(exception)
-            pbar.update(1)
-        pbar.close()
+                    predictions[r["name"]] = r["y_pred"]
 
         if self.custom_metric is None:
             scores = pd.DataFrame(
@@ -492,6 +488,7 @@ def __init__(
         predictions=False,
         random_state=None,
         regressors="all",
+        n_jobs=1,
     ):
         self.verbose = verbose
         self.ignore_warnings = ignore_warnings
@@ -500,6 +497,7 @@
         self.models = {}
         self.random_state = random_state or get_global_seed()
         self.regressors = regressors
+        self.n_jobs = n_jobs
 
     def fit(self, X_train, X_test, y_train, y_test):
         """Fit Regression algorithms to X_train and y_train, predict and score on X_test, y_test.
@@ -567,9 +565,10 @@
                 print_message(exception)
                 print_message("Invalid Regressor(s)")
 
-        pbar = tqdm(self.regressors)
-        for name, model in pbar:
-            pbar.set_description(f"Training {name}")
+        n_test, n_features = X_test.shape[0], X_test.shape[1]
+
+        def _fit_regressor(name, model):
+            """Train a single regressor and return results."""
             start = time.time()
             try:
                 if "random_state" in model().get_params().keys():
@@ -585,48 +584,51 @@
                     )
 
                 pipe.fit(X_train, y_train)
-                self.models[name] = pipe
                 y_pred = pipe.predict(X_test)
 
                 r_squared = r2_score(y_test, y_pred)
-                adj_rsquared = adjusted_rsquared(
-                    r_squared, X_test.shape[0], X_test.shape[1]
-                )
+                adj_rsquared = adjusted_rsquared(r_squared, n_test, n_features)
                 rmse = np.sqrt(mean_squared_error(y_test, y_pred))
 
-                names.append(name)
-                R2.append(r_squared)
-                ADJR2.append(adj_rsquared)
-                RMSE.append(rmse)
-                TIME.append(time.time() - start)
+                fit_time = time.time() - start
+                custom = self.custom_metric(y_test, y_pred) if self.custom_metric else None
+                return {"name": name, "pipe": pipe, "y_pred": y_pred, "r2": r_squared,
+                        "adj_r2": adj_rsquared, "rmse": rmse, "time": fit_time,
+                        "custom": custom, "failed": False}
+            except Exception as e:
+                return {"name": name, "failed": True, "error": e}
+
+        # Parallel or sequential execution
+        if self.n_jobs != 1:
+            results = Parallel(n_jobs=self.n_jobs, prefer="threads")(
+                delayed(_fit_regressor)(name, model) for name, model in self.regressors
+            )
+        else:
+            results = [_fit_regressor(name, model) for name, model in tqdm(self.regressors, desc="Training regressors")]
+        # Collect results
+        for r in results:
+            if r["failed"]:
+                if self.ignore_warnings is False:
+                    print_message(f'\n{r["name"]} model failed to execute')
+                    print_message(r["error"])
+            else:
+                self.models[r["name"]] = r["pipe"]
+                names.append(r["name"])
+                R2.append(r["r2"])
+                ADJR2.append(r["adj_r2"])
+                RMSE.append(r["rmse"])
+                TIME.append(r["time"])
                 if self.custom_metric:
-                    custom_metric = self.custom_metric(y_test, y_pred)
-                    CUSTOM_METRIC.append(custom_metric)
-
+                    CUSTOM_METRIC.append(r["custom"])
                 if self.verbose > 0:
-                    scores_verbose = {
-                        "Model": name,
-                        "R-Squared": r_squared,
-                        "Adjusted R-Squared": adj_rsquared,
-                        "RMSE": rmse,
-                        "Time taken": time.time() - start,
-                    }
-
+                    scores_verbose = {"Model": r["name"], "R-Squared": r["r2"],
+                                      "Adjusted R-Squared": r["adj_r2"], "RMSE": r["rmse"], "Time taken": r["time"]}
                     if self.custom_metric:
-                        scores_verbose[self.custom_metric.__name__] = custom_metric
-
+                        scores_verbose[self.custom_metric.__name__] = r["custom"]
                     print_message(scores_verbose)
                 if self.predictions:
-                    predictions[name] = y_pred
-
-            except Exception as exception:
-                if self.ignore_warnings is False:
-                    print_message(f'\n{name} model failed to execute')
-                    print_message(exception)
-
-            pbar.update(1)
-        pbar.close()
+                    predictions[r["name"]] = r["y_pred"]
 
         scores = {
             "Model": names,
diff --git a/src/protify/testing_suite/test_lazy_predict_sped_up.py b/src/protify/testing_suite/test_lazy_predict_sped_up.py
new file mode 100644
index 0000000..475f31b
--- /dev/null
+++ b/src/protify/testing_suite/test_lazy_predict_sped_up.py
@@ -0,0 +1,48 @@
+# Example usage:
+# python -m src.protify.testing_suite.test_lazy_predict_sped_up
+
+import time
+import numpy as np
+from src.protify.probes.lazy_predict import LazyClassifier, LazyRegressor
+
+# Larger synthetic data for more reliable benchmarking
+X = np.random.randn(1000, 256)  # 1000 samples, 256 features
+y_cls = np.random.randint(0, 2, 1000)
+y_reg = np.random.randn(1000)
+
+NUM_RUNS = 3
+times = []
+
+for run in range(NUM_RUNS):
+    print(f"\n=== Run {run + 1}/{NUM_RUNS} ===")
+    run_start = time.time()
+
+    # Classifier (n_jobs=-1 uses all cores)
+    clf = LazyClassifier(classifiers="all", verbose=0, n_jobs=-1)
+    clf_scores = clf.fit(X[:800], X[800:], y_cls[:800], y_cls[800:])
+
+    # Regressor (n_jobs=-1 uses all cores)
+    reg = LazyRegressor(regressors="all", verbose=0, n_jobs=-1)
+    reg_scores = reg.fit(X[:800], X[800:], y_reg[:800], y_reg[800:])
+
+    run_time = time.time() - run_start
+    times.append(run_time)
+    print(f"Run {run + 1} time: {run_time:.2f}s")
+
+print(f"\n=== Results ===")
+print(f"Times: {[f'{t:.2f}s' for t in times]}")
+print(f"Average: {np.mean(times):.2f}s")
+print(f"Std: {np.std(times):.2f}s")
+
+# BASELINE:
+# Times: ['26.06s', '26.31s', '26.94s']
+# Average: 26.43s
+# Std: 0.37s
+# PARALLELIZED VERSION:
+# === Results ===
+# Times: ['23.54s', '11.55s', '8.83s']
+# Average: 14.64s
+# Std: 6.39s
+# Times: ['14.22s', '14.22s', '10.64s']
+# Average: 13.03s
+# Std: 1.69s
\ No newline at end of file
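
A note on the backend choice in the diff above: joblib's Parallel(..., prefer="threads") runs the delayed calls in a thread pool inside the current process, so the training arrays and fitted pipelines are shared in memory rather than pickled to worker processes; the speedup then comes from estimators that release the GIL in compiled code (NumPy/BLAS, scikit-learn's Cython routines, xgboost, lightgbm). The sketch below only illustrates that dispatch-and-collect pattern in isolation; the helper name and toy "constructors" are hypothetical and not part of protify.

from joblib import Parallel, delayed

def _fit_one(name, make_model):
    # Return a result dict instead of mutating shared state, so the call is
    # safe to run from worker threads and a failure stays isolated per model.
    try:
        model = make_model()  # stand-in for building and fitting a pipeline
        return {"name": name, "model": model, "failed": False}
    except Exception as exc:
        return {"name": name, "failed": True, "error": exc}

candidates = [("dict_model", dict), ("list_model", list)]  # hypothetical (name, constructor) pairs

# prefer="threads" keeps everything in one process: no pickling of data or models.
results = Parallel(n_jobs=-1, prefer="threads")(
    delayed(_fit_one)(name, ctor) for name, ctor in candidates
)
fitted = {r["name"]: r["model"] for r in results if not r["failed"]}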