import os
import threading
import warnings
from copy import deepcopy

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

# Scikit-learn
from sklearn.base import BaseEstimator, ClassifierMixin, clone
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, balanced_accuracy_score, roc_curve, auc
from sklearn.exceptions import ConvergenceWarning
from sklearn.utils.estimator_checks import check_estimator
from sklearn.utils.validation import check_is_fitted, check_X_y

from imblearn.over_sampling import RandomOverSampler

warnings.filterwarnings("ignore", category=ConvergenceWarning)
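# MAX_THREADS caps the number of concurrently running trial threads.
# DROP drops the one-hot columns outright instead of substituting the
# label-encoded column; ZOOM narrows the sampled-fraction sweep for adult.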
MAX_THREADS = 50
thread_sem = threading.Semaphore(MAX_THREADS)
DROP = False
ZOOM = True
class Unbuffered(object):
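    """Wrap a stream so every write/writelines is flushed immediately."""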
    def __init__(self, stream):
        self.stream = stream

    def write(self, data):
        self.stream.write(data)
        self.stream.flush()

    def writelines(self, datas):
        self.stream.writelines(datas)
        self.stream.flush()

    def __getattr__(self, attr):
        return getattr(self.stream, attr)
models = list(zip(
    [
        "Logistic Regression",
        "SVC (Linear)",
        "SVC (RBF)",
        "Random Forest"
    ],
    [
        LogisticRegression(solver='saga', fit_intercept=True, random_state=42, n_jobs=1),
        SVC(kernel="linear", random_state=42),
        SVC(kernel="rbf", random_state=42),
        RandomForestClassifier(n_estimators=100, random_state=42, n_jobs=1)
    ]
))
model_mapping = {x: y for (x, y) in models}
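# model_mapping: model name -> unfitted estimator template;
# ScaledAvailability clones its base model from this mapping by name.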
class ScaledAvailability(BaseEstimator, ClassifierMixin):
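    """Classifier that grid-searches a multiplicative scale for the feature
    columns in `col_nums`, keeps the scale maximizing `metric` on the training
    data, and applies that same scale again at predict time."""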
    def __init__(self, oversample=False, base_model=models[0][0], metric=balanced_accuracy_score,
                 scaling_range=(1,), col_nums=(0,)):
        self.oversample = oversample
        self.base_model = base_model
        self.metric = metric
        self.scaling_range = scaling_range
        self.col_nums = col_nums

    # we want to test whether applying a scaling factor to the input can
    # alter the availability and thereby 'fix' the issue of the encoding;
    # assumes a higher metric value is better (maximizes the metric)
    def scale(self, X, scale):
        X = np.array(X)
        X[:, self.col_nums] *= scale
        return X
    def fit(self, X, y):
        X, y = check_X_y(X, y)
        if self.oversample:
            ros = RandomOverSampler(sampling_strategy='minority')
        self.model_ = clone(model_mapping[self.base_model])
        self.n_features_in_ = X.shape[1]
        if not self.col_nums:
            # no columns to rescale: just fit the base model directly
            if self.oversample:
                self.model_.fit(*ros.fit_resample(X, y))
            else:
                self.model_.fit(X, y)
            self.classes_ = self.model_.classes_
            return self
        # grid search the scaling range
        self.best_scale_ = None
        best_performance = None
        best_model = None
        for scale in self.scaling_range:
            X_scaled = self.scale(X, scale)
            if self.oversample:
                self.model_.fit(*ros.fit_resample(X_scaled, y))
            else:
                self.model_.fit(X_scaled, y)
            y_pred = self.model_.predict(X_scaled)
            performance = self.metric(y, y_pred)
            if self.best_scale_ is None or performance > best_performance:
                self.best_scale_ = scale
                best_performance = performance
                best_model = deepcopy(self.model_)
        self.model_ = best_model
        check_is_fitted(self.model_)
        self.classes_ = self.model_.classes_
        return self
    def predict(self, X):
        check_is_fitted(self)
        if not self.col_nums:
            return self.model_.predict(X)
        assert self.best_scale_ is not None
        return self.model_.predict(self.scale(X, self.best_scale_))
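# Illustrative usage (the column index and scales here are hypothetical):
#   clf = ScaledAvailability(base_model="Random Forest", scaling_range=(0.5, 1, 2), col_nums=[3])
#   clf.fit(X_train, y_train)      # tries each scale, keeps the best-scoring model
#   y_pred = clf.predict(X_test)   # rescales column 3 by the chosen scale first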
def run_test(model_description, preprocess_dataset, dataset, N_trials, dataset_divisions,
             scaling_range, root_folder, y_value, oversample):
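    """Sweep training-set fractions and label-encoded variables for one model,
    writing per-configuration results to final.csv and an error-bar plot to
    figure.png inside `root_folder`."""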
    preprocessed_dataset, one_hot_mapping, label_encoded_vars = preprocess_dataset(dataset)

    def convert_dataset(dataset: pd.DataFrame):
        X = dataset.drop([y_value], axis=1).values
        Y = dataset[[y_value]].values.flatten()
        return X, Y
    def evaluate_subset(
            train_data: pd.DataFrame,
            test_data: pd.DataFrame,
            sample_fraction: float,
            model: str,
            metric,
            label_encoded_var: str = None):
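        """Run N_trials trials: each samples `sample_fraction` of `train_data`,
        fits a ScaledAvailability model, and scores it on `test_data`.
        Returns the metric distribution and the chosen-scale distribution."""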
        distribution = np.zeros(N_trials)
        scale_distribution = np.zeros(N_trials)
        # swap the one-hot variable for its label encoding
        if label_encoded_var:
            if not DROP:
                train_data = train_data.drop(one_hot_mapping[label_encoded_var], axis=1)
                test_data = test_data.drop(one_hot_mapping[label_encoded_var], axis=1)
                train_data[label_encoded_var] = label_encoded_vars[label_encoded_var][train_data.index]
                test_data[label_encoded_var] = label_encoded_vars[label_encoded_var][test_data.index]
                # account for the position of the y column, which is dropped
                # before X is handed to the model
                col_nums = train_data.columns.get_loc(label_encoded_var)
                y_var_loc = train_data.columns.get_loc(y_value)
                if y_var_loc < col_nums:
                    col_nums -= 1
                col_nums = [col_nums]
            else:
                train_data = train_data.drop(one_hot_mapping[label_encoded_var], axis=1)
                test_data = test_data.drop(one_hot_mapping[label_encoded_var], axis=1)
                col_nums = None
            # assert col_nums[0] == test_data.columns.get_loc(label_encoded_var)
        else:
            col_nums = None
        X_test, Y_test = convert_dataset(test_data)
        lock = threading.Lock()

        def run_trial(i):
            try:
                # serialize sampling of the shared training frame
                with lock:
                    dataset_sample = train_data.sample(frac=sample_fraction)
                    X, Y = convert_dataset(dataset_sample)
                trial_model = ScaledAvailability(oversample, model, metric, scaling_range, col_nums)
                # fit model
                trial_model.fit(X, Y)
                # evaluate
                Y_pred = trial_model.predict(X_test)
                distribution[i] = metric(Y_test, Y_pred)
                if trial_model.col_nums is None:
                    scale_distribution[i] = 1
                else:
                    scale_distribution[i] = trial_model.best_scale_
            finally:
                # always release so an exception cannot deadlock the pool
                thread_sem.release()

        threads = []
        for i in range(N_trials):
            # get sample
            thread_sem.acquire()
            threads.append(threading.Thread(target=run_trial, args=[i]))
            threads[-1].start()
        for thread in threads:
            thread.join()
        return distribution, scale_distribution
    tested_variables = list(one_hot_mapping.keys()) + [None]
    total_runs = len(tested_variables) * len(dataset_divisions)
    counter = 0
    print_interval = len(tested_variables)
    results = []
    train, test = train_test_split(preprocessed_dataset, test_size=0.3, random_state=42)
    model_type, _ = model_description
    def test_fraction(fraction, variable):
        distribution, scale_distribution = evaluate_subset(
            train.copy(),
            test.copy(),
            fraction,
            model_type,
            balanced_accuracy_score,
            variable
        )
        results.append([
            variable if variable else "None",
            model_type,
            fraction,
            np.mean(distribution),
            np.std(distribution),
            np.mean(scale_distribution),
            np.std(scale_distribution)
        ])
    threads = []
    for fraction in dataset_divisions:
        for variable in tested_variables:
            threads.append(
                threading.Thread(target=test_fraction, args=[fraction, variable])
            )
            threads[-1].start()
    for thread in threads:
        thread.join()
        counter += 1
        if counter % print_interval == 0:
            print(f"{counter}/{total_runs}: {int(100 * counter / total_runs)}%", flush=True)
    assert len(results) == total_runs
    total_results = pd.DataFrame(results, columns=["label_encoded_var", "model", "fraction",
                                                   "metric_mean", "metric_std", "scale_mean", "scale_std"])
    total_results.to_csv(os.path.join(root_folder, "final.csv"))
    plt.figure(figsize=(15, 10))
    sns.set_style("whitegrid")
    palette = sns.color_palette("tab10", len(tested_variables))
    for i, variable in enumerate(tested_variables):
        if variable is None:
            variable = "None"
        variable_data = total_results[total_results['label_encoded_var'] == variable]
        variable_data = variable_data.sort_values(by="fraction")
        plt.errorbar(variable_data['fraction'], variable_data['metric_mean'],
                     yerr=variable_data['metric_std'] / np.sqrt(N_trials),
                     label=variable, marker='o', linestyle='-', color=palette[i], capsize=5)
    plt.xlabel('Fraction')
    plt.ylabel('Mean')
    plt.xscale('log')
    plt.legend()
    plt.savefig(os.path.join(root_folder, "figure.png"))
if __name__ == "__main__":
    import sys
    if len(sys.argv) < 5:
        print("python testing.py <model_num> <N_trials> <divisions> <dataset_num> [scale1 scale2 ...]")
        print("models:\t model_num\t model_name")
        for i, (model_name, _) in enumerate(models):
            print(f"\t {i} \t\t {model_name}")
        exit()
    model_num = int(sys.argv[1])
    assert 0 <= model_num < len(models)
    model = models[model_num]
    N_trials = int(sys.argv[2])
    assert N_trials >= 1
    dataset_num = int(sys.argv[4])
    assert dataset_num in [0, 1]
    # adult start
    if dataset_num == 0:
        if not ZOOM:
            start = -3
        else:
            start = -1.5
    else:
        # mimic fractions cannot start below 10**-2
        start = -2
    divisions = int(sys.argv[3])
    assert divisions >= 1
    dataset_divisions = np.logspace(start, 0, divisions)
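    # e.g. start = -2 with divisions = 5 gives fractions [0.01, 0.0316, 0.1, 0.3162, 1.0]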
    if len(sys.argv) > 5:
        scaled = True
        scaling_range = []
        for arg in sys.argv[5:]:
            scale = float(arg)
            assert scale > 0
            scaling_range.append(scale)
    else:
        scaled = False
        scaling_range = [1]
    dataset_name = "adult" if dataset_num == 0 else "mimic"
    name = "{}_{}_{}_{}_{}{}{}".format(
        dataset_name,
        N_trials,
        divisions,
        "scaled" if scaled else "not",
        model[0].replace(" ", "-").replace("(", "").replace(")", ""),
        "_dropped" if DROP else "",
        "_zoomed-in" if ZOOM else ""
    )
    if not os.path.exists(name):
        os.mkdir(name)
    print("changing out to", os.path.join(name, "out.log"))
    sys.stderr = Unbuffered(open(os.path.join(name, "err.log"), 'w'))
    sys.stdout = Unbuffered(open(os.path.join(name, "out.log"), 'w'))
    if dataset_num == 0:
        from load_adult import EXPORTED_DATASET
        oversample = False
    else:
        from load_mimic import EXPORTED_DATASET
        oversample = True
    preprocess_dataset, dataset, y_var = EXPORTED_DATASET
    run_test(model, preprocess_dataset, dataset, N_trials, dataset_divisions,
             scaling_range, name, y_var, oversample)