import os
import threading
import warnings
from copy import deepcopy

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

# Scikit-learn
from sklearn.base import BaseEstimator, ClassifierMixin, clone
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import balanced_accuracy_score
from sklearn.exceptions import ConvergenceWarning
from sklearn.utils.validation import check_is_fitted, check_X_y
from imblearn.over_sampling import RandomOverSampler

warnings.filterwarnings("ignore", category=ConvergenceWarning)

# Bound the number of concurrently running trial threads.
MAX_THREADS = 50
thread_sem = threading.Semaphore(MAX_THREADS)

# DROP: drop the one-hot columns entirely instead of swapping in a
# label-encoded copy. ZOOM: restrict the sampled fractions to a narrower
# (zoomed-in) range for the adult dataset.
DROP = False
ZOOM = True
class Unbuffered(object):
    """Wrap a stream so every write is flushed immediately."""

    def __init__(self, stream):
        self.stream = stream

    def write(self, data):
        self.stream.write(data)
        self.stream.flush()

    def writelines(self, datas):
        self.stream.writelines(datas)
        self.stream.flush()

    def __getattr__(self, attr):
        return getattr(self.stream, attr)
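
# Illustrative note: Unbuffered is used below to wrap the log files so that
# progress messages appear as soon as they are written, e.g.
#   sys.stdout = Unbuffered(open("out.log", "w"))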
models = list(zip(
    [
        "Logistic Regression",
        "SVC (Linear)",
        "SVC (RBF)",
        "Random Forest",
    ],
    [
        LogisticRegression(solver='saga', fit_intercept=True, random_state=42, n_jobs=1),
        SVC(kernel="linear", random_state=42),
        SVC(kernel="rbf", random_state=42),
        RandomForestClassifier(n_estimators=100, random_state=42, n_jobs=1),
    ],
))
model_mapping = dict(models)
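
# Illustrative note: model_mapping maps a display name to a prototype
# estimator; ScaledAvailability.fit() clones the prototype so every trial
# gets a fresh, unfitted model, e.g. clone(model_mapping["SVC (RBF)"]).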
class ScaledAvailability(BaseEstimator, ClassifierMixin):
    # We want to test whether applying a scaling factor to selected input
    # columns can alter the availability and thereby 'fix' the issue of the
    # encoding. Assumes a higher metric value is better (maximizes the metric).
    def __init__(self, oversample=False, base_model=models[0][0],
                 metric=balanced_accuracy_score, scaling_range=(1,), col_nums=(0,)):
        self.oversample = oversample
        self.base_model = base_model
        self.metric = metric
        self.scaling_range = scaling_range
        self.col_nums = col_nums

    def scale(self, X, scale):
        # Multiply the selected columns by the candidate scaling factor.
        X = np.array(X)
        X[:, self.col_nums] *= scale
        return X

    def fit(self, X, y):
        X, y = check_X_y(X, y)
        if self.oversample:
            ros = RandomOverSampler(sampling_strategy='minority')
        self.model_ = clone(model_mapping[self.base_model])
        self.n_features_in_ = X.shape[1]
        if not self.col_nums:
            # No columns to scale: fit the base model directly.
            if self.oversample:
                self.model_.fit(*ros.fit_resample(X, y))
            else:
                self.model_.fit(X, y)
            self.classes_ = self.model_.classes_
            return self
        # Grid-search the scaling range, keeping the scale with the best
        # training-set metric.
        self.best_scale_ = None
        best_performance = None
        best_model = None
        for scale in self.scaling_range:
            X_scaled = self.scale(X, scale)
            if self.oversample:
                self.model_.fit(*ros.fit_resample(X_scaled, y))
            else:
                self.model_.fit(X_scaled, y)
            y_pred = self.model_.predict(X_scaled)
            performance = self.metric(y, y_pred)
            if self.best_scale_ is None or performance > best_performance:
                self.best_scale_ = scale
                best_performance = performance
                best_model = deepcopy(self.model_)
        self.model_ = best_model
        check_is_fitted(self.model_)
        self.classes_ = self.model_.classes_
        return self

    def predict(self, X):
        check_is_fitted(self)
        if not self.col_nums:
            return self.model_.predict(X)
        assert self.best_scale_ is not None
        check_is_fitted(self.model_)
        return self.model_.predict(self.scale(X, self.best_scale_))
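
# Minimal usage sketch (illustrative; X_train, y_train, X_test are
# placeholders, not defined in this script):
#   clf = ScaledAvailability(oversample=False, base_model="Logistic Regression",
#                            scaling_range=(0.5, 1.0, 2.0), col_nums=[0])
#   clf.fit(X_train, y_train)      # grid-searches the scale applied to column 0
#   y_pred = clf.predict(X_test)   # rescales X_test by clf.best_scale_ first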
def run_test(model_description, preprocess_dataset, dataset, N_trials,
             dataset_divisions, scaling_range, root_folder, y_value, oversample):
    preprocessed_dataset, one_hot_mapping, label_encoded_vars = preprocess_dataset(dataset)

    def convert_dataset(dataset: pd.DataFrame):
        X = dataset.drop([y_value], axis=1).values
        Y = dataset[[y_value]].values.flatten()
        return X, Y

    def evaluate_subset(
            train_data: pd.DataFrame,
            test_data: pd.DataFrame,
            sample_fraction: float,
            model: str,
            metric,
            label_encoded_var: str = None):
        distribution = np.zeros(N_trials)
        scale_distribution = np.zeros(N_trials)
        # Swap the one-hot variable for its label-encoded version.
        if label_encoded_var:
            if not DROP:
                train_data = train_data.drop(one_hot_mapping[label_encoded_var], axis=1)
                test_data = test_data.drop(one_hot_mapping[label_encoded_var], axis=1)
                train_data[label_encoded_var] = label_encoded_vars[label_encoded_var][train_data.index]
                test_data[label_encoded_var] = label_encoded_vars[label_encoded_var][test_data.index]
                col_nums = train_data.columns.get_loc(label_encoded_var)
                # Account for the position of the target column, which
                # convert_dataset drops before this index is used.
                y_var_loc = train_data.columns.get_loc(y_value)
                if y_var_loc < col_nums:
                    col_nums -= 1
                col_nums = [col_nums]
            else:
                # DROP mode: remove the one-hot columns without replacement.
                train_data = train_data.drop(one_hot_mapping[label_encoded_var], axis=1)
                test_data = test_data.drop(one_hot_mapping[label_encoded_var], axis=1)
                col_nums = None
            # assert col_nums[0] == test_data.columns.get_loc(label_encoded_var)
        else:
            col_nums = None
        X_test, Y_test = convert_dataset(test_data)
        lock = threading.Lock()

        def run_trial(i):
            try:
                # Guard access to the shared DataFrame; pandas sampling is
                # not thread-safe.
                with lock:
                    dataset_sample = train_data.sample(frac=sample_fraction)
                    X, Y = convert_dataset(dataset_sample)
                trial_model = ScaledAvailability(oversample, model, metric, scaling_range, col_nums)
                # Fit, then evaluate on the held-out test set.
                trial_model.fit(X, Y)
                Y_pred = trial_model.predict(X_test)
                distribution[i] = metric(Y_test, Y_pred)
                if trial_model.col_nums is None:
                    scale_distribution[i] = 1
                else:
                    scale_distribution[i] = trial_model.best_scale_
            finally:
                # Always release the semaphore slot, even if the trial fails.
                thread_sem.release()

        threads = []
        for i in range(N_trials):
            # Each trial runs in its own thread, bounded by the semaphore.
            thread_sem.acquire()
            threads.append(threading.Thread(target=run_trial, args=[i]))
            threads[-1].start()
        for thread in threads:
            thread.join()
        return distribution, scale_distribution
    tested_variables = list(one_hot_mapping.keys()) + [None]
    total_runs = len(tested_variables) * len(dataset_divisions)
    counter = 0
    print_interval = len(tested_variables)
    results = []
    train, test = train_test_split(preprocessed_dataset, test_size=0.3, random_state=42)
    model_type, _ = model_description

    def test_fraction(fraction, variable):
        distribution, scale_distribution = evaluate_subset(
            train.copy(),
            test.copy(),
            fraction,
            model_type,
            balanced_accuracy_score,
            variable,
        )
        results.append([
            variable if variable else "None",
            model_type,
            fraction,
            np.mean(distribution),
            np.std(distribution),
            np.mean(scale_distribution),
            np.std(scale_distribution),
        ])

    threads = []
    for fraction in dataset_divisions:
        for variable in tested_variables:
            threads.append(
                threading.Thread(target=test_fraction, args=[fraction, variable])
            )
            threads[-1].start()
    for thread in threads:
        thread.join()
        counter += 1
        if counter % print_interval == 0:
            print(f"{counter}/{total_runs}: {int(100 * counter / total_runs)}%", flush=True)
    assert len(results) == total_runs
    total_results = pd.DataFrame(results, columns=["label_encoded_var", "model", "fraction",
                                                   "metric_mean", "metric_std",
                                                   "scale_mean", "scale_std"])
    total_results.to_csv(os.path.join(root_folder, "final.csv"))
    # Plot the metric mean vs. sample fraction for each tested variable.
    plt.figure(figsize=(15, 10))
    sns.set_style("whitegrid")
    palette = sns.color_palette("tab10", len(tested_variables))
    for i, variable in enumerate(tested_variables):
        if variable is None:
            variable = "None"
        variable_data = total_results[total_results['label_encoded_var'] == variable]
        variable_data = variable_data.sort_values(by="fraction")
        plt.errorbar(variable_data['fraction'], variable_data['metric_mean'],
                     yerr=variable_data['metric_std'] / np.sqrt(N_trials),
                     label=variable, marker='o', linestyle='-', color=palette[i], capsize=5)
    plt.xlabel('Fraction')
    plt.ylabel('Mean')
    plt.legend()
    plt.xscale('log')
    plt.savefig(os.path.join(root_folder, "figure.png"))
if __name__ == "__main__":
import sys
if len(sys.argv) < 5:
print("python testing.py <model_num> <N_trials> <divisions> <dataset_num> [scale1, scale2, ... ]")
print("models:\t model_num\t model_name ")
for i, (model_name, _) in enumerate(models):
print(f"\t {i} \t\t {model_name}")
exit()
model_num = int(sys.argv[1])
assert model_num < len(models) and model_num >= 0
model = models[model_num]
N_trials = int(sys.argv[2])
assert N_trials >= 1
dataset_num = int(sys.argv[4])
assert dataset_num in [0,1]
# adult start
if dataset_num == 0:
if not ZOOM:
start = -3
else:
start = -1.5
else:
# mimic cannot start less than
start = -2
divisions = int(sys.argv[3])
assert divisions >= 1
dataset_divisions = np.logspace(start, 0, divisions)
if len(sys.argv) > 5:
scaled = True
scaling_range = []
for i in range(len(sys.argv) - 5):
scale = float(sys.argv[i + 5])
assert scale > 0
scaling_range.append(scale)
else:
scaled = False
scaling_range = [1]
dataset_name = "adult" if dataset_num == 0 else "mimic"
name = "{}_{}_{}_{}_{}{}{}".format(
dataset_name,
N_trials,
divisions,
"scaled" if scaled else "not",
model[0].replace(" ", "-").replace("(","").replace(")",""),
"_dropped" if DROP else "",
"_zoomed-in" if ZOOM else ""
)
if not os.path.exists(name):
os.mkdir(name)
print("changing out to", os.path.join(name, "out.log"))
sys.stderr = open(os.path.join(name, "err.log"), 'w')
sys.stdout = open(os.path.join(name, "out.log"), 'w')
sys.stdout = Unbuffered(sys.stdout)
sys.stderr = Unbuffered(sys.stderr)
if dataset_num == 0:
from load_adult import EXPORTED_DATASET
oversample = False
else:
from load_mimic import EXPORTED_DATASET
oversample = True
preprocess_dataset, dataset, y_var = EXPORTED_DATASET
run_test(model, preprocess_dataset, dataset, N_trials, dataset_divisions, scaling_range, name, y_var, oversample)
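
# Example invocation (assumes load_adult.py exports EXPORTED_DATASET):
#   python testing.py 0 20 10 0 0.5 1.0 2.0
# runs Logistic Regression for 20 trials over 10 log-spaced fractions of the
# adult dataset, grid-searching the scaling factors {0.5, 1.0, 2.0}.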