import pandas as pd
import numpy as np
import os
import pickle
from tqdm.notebook import tqdm
- Given a dataset, artificially introduce gaps to represent missing data.
- Using a specific imputation method, impute those gaps.
- Measure the error between the original dataset and the imputed dataset (scored with the metrics below).
- Repeat for a range of different gap sizes and combinations.
- Compare the different imputation methods.
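Step 3 will be scored with the mean absolute error and root-mean-square error over the set $G$ of artificially removed cells:

$$\mathrm{MAE} = \frac{1}{|G|} \sum_{(i,j) \in G} \bigl|\hat{x}_{ij} - x_{ij}\bigr|, \qquad \mathrm{RMSE} = \sqrt{\frac{1}{|G|} \sum_{(i,j) \in G} \bigl(\hat{x}_{ij} - x_{ij}\bigr)^2},$$

where $x_{ij}$ is the original value of a gapped cell and $\hat{x}_{ij}$ its imputed value.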
Abstract out what an ‘imputer’ needs to accomplish.
class BaseImputer():
def __init__(self, **params):
# Store hyperparameters and parameters for this imputer
self.name = 'BaseImputer'
def __str__(self):
return f"{self.name}"
def __repr__(self):
return "BaseImputer()"
def impute(self, df):
# impute with this imputation algorithm
df_imputed = df.copy()
return df_imputed
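A quick sanity check of the contract on a tiny made-up frame: impute() accepts a DataFrame containing NaNs and returns a DataFrame with the same shape and labels (the base class simply returns a copy).

toy = pd.DataFrame({'a': [1.0, np.nan, 3.0], 'b': [4.0, 5.0, np.nan]})
out = BaseImputer().impute(toy)
assert out.shape == toy.shape and list(out.columns) == list(toy.columns)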
Implementations of the different imputation methods.
from sklearn.impute import SimpleImputer
class MeanImputer(BaseImputer):
def __init__(self, **params):
        # No hyperparameters are needed for mean imputation
self.name = 'Mean'
def __repr__(self):
return "MeanImputer()"
def impute(self, df):
imputer = SimpleImputer(missing_values=np.nan, strategy='mean')
df_imputed = pd.DataFrame(imputer.fit_transform(df), columns=df.columns, index=df.index)
return df_imputed
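A minimal illustration on a toy column (values made up for the example):

# Each NaN is replaced by its column mean; here the gap becomes (1 + 3) / 2 = 2
toy = pd.DataFrame({'a': [1.0, np.nan, 3.0]})
print(MeanImputer().impute(toy))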
class LinearImputer(BaseImputer):
def __init__(self):
self.name = 'Linear'
def __repr__(self):
return "LinearImputer()"
def impute(self, df):
df_imputed = df.interpolate(method='linear')
return df_imputed
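One caveat: DataFrame.interpolate with the defaults only fills gaps that have an observed value on their left, so NaNs at the very start of a series remain. If that matters for a given dataset, a hypothetical variant (not used below) could pass pandas' limit_direction option to also fill the edges:

class LinearBothImputer(LinearImputer):
    def __init__(self):
        self.name = 'LinearBoth'
    def __repr__(self):
        return "LinearBothImputer()"
    def impute(self, df):
        # limit_direction='both' also back-fills leading NaNs
        return df.interpolate(method='linear', limit_direction='both')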
from sklearn.ensemble import RandomForestRegressor
class MissForestImputer(BaseImputer):
def __init__(self, max_iter=10):
self.name = 'MissForest'
self.max_iter = max_iter
def __repr__(self):
return f"MissForestImputer(max_iter={self.max_iter})"
def impute(self, df):
df_imputed = pd.DataFrame(self.imputeMF(df.values), columns=df.columns, index=df.index)
return df_imputed
def imputeMF(self, X):
mask = np.isnan(X)
# Count missing per column
col_missing_count = mask.sum(axis=0)
# Get col and row indices for missing
missing_rows, missing_cols = np.where(mask)
Ximp = X.copy()
# 1. Make initial guess for missing values
col_means = np.nanmean(X, axis=0)
Ximp[missing_rows, missing_cols] = np.take(col_means, missing_cols)
# 2. misscount_idx: sorted indices of cols in X based on missing count
misscount_idx = np.argsort(col_missing_count)
iter_count = 0
gamma_new = 0
gamma_old = np.inf
col_index = np.arange(Ximp.shape[1])
rf_regressor = RandomForestRegressor(n_jobs=-1)
        while gamma_new < gamma_old and iter_count < self.max_iter:
#4. store previously imputed matrix
Ximp_old = np.copy(Ximp)
if iter_count != 0:
gamma_old = gamma_new
#5. loop for s in k
for s in tqdm(misscount_idx,
desc=f'MF iteration {iter_count}/{self.max_iter}, γ = {gamma_new:.3e}',
leave=False):
# Column indices other than the one being imputed
s_prime = np.delete(col_index, s)
# Get indices of rows where 's' is observed and missing
obs_rows = np.where(~mask[:, s])[0]
mis_rows = np.where(mask[:, s])[0]
# If no missing, then skip
if len(mis_rows) == 0:
continue
# Get observed values of 's'
yobs = Ximp[obs_rows, s]
# Get 'X' for both observed and missing 's' column
xobs = Ximp[np.ix_(obs_rows, s_prime)]
xmis = Ximp[np.ix_(mis_rows, s_prime)]
# 6. Fit a random forest over observed and predict the missing
rf_regressor.fit(X=xobs, y=yobs)
# 7. predict ymis(s) using xmis(x)
ymis = rf_regressor.predict(xmis)
# 8. update imputed matrix using predicted matrix ymis(s)
Ximp[mis_rows, s] = ymis
# 9. Update gamma (stopping criterion)
            gamma_new = np.sum((Ximp - Ximp_old) ** 2) / np.sum(Ximp ** 2)
            iter_count += 1
        # If the final sweep increased the criterion, the previous matrix was the
        # better imputation, so return it instead (as in the missForest algorithm)
        if gamma_new >= gamma_old:
            return Ximp_old
        return Ximp
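The stopping criterion $\gamma$ tracked above is the normalised squared change between successive imputed matrices,

$$\gamma = \frac{\sum_{ij} \bigl(X^{\text{new}}_{ij} - X^{\text{old}}_{ij}\bigr)^2}{\sum_{ij} \bigl(X^{\text{new}}_{ij}\bigr)^2},$$

and the loop stops as soon as $\gamma$ increases from one sweep to the next (the fit has stopped improving) or max_iter is reached, following Stekhoven and Bühlmann's missForest algorithm.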
from sklearn.impute import KNNImputer as sklearnKNNImputer
class KNNImputer(BaseImputer):
def __init__(self, n_neighbors=7):
self.name = 'KNN'
        self.n_neighbors = n_neighbors
def __repr__(self):
return f"KNNImputer(n_neighbors={self.n_neighbors})"
def impute(self, df):
imputer = sklearnKNNImputer(n_neighbors=self.n_neighbors)
df_imputed = pd.DataFrame(imputer.fit_transform(df), columns=df.columns, index=df.index)
return df_imputed
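Under the hood, scikit-learn's KNNImputer locates each incomplete row's nearest neighbours with a NaN-aware Euclidean distance and averages their observed values. A minimal sketch on a made-up frame:

# With one neighbour, the NaN in 'a' is filled from the closest row on the
# observed coordinates, here the row [2.0, 2.1], so it becomes 2.0
toy = pd.DataFrame({'a': [1.0, 2.0, np.nan], 'b': [1.1, 2.1, 2.0]})
print(KNNImputer(n_neighbors=1).impute(toy))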
from statsmodels.imputation.mice import MICEData
class MICEImputer(BaseImputer):
def __init__(self, k_pmm=7, n_iterations=10):
self.name = 'MICE'
self.k_pmm = k_pmm
self.n_iterations = n_iterations
def __repr__(self):
return f"MICEImputer(k_pmm={self.k_pmm},n_iterations={self.n_iterations})"
def impute(self, df):
mice_data = MICEData(df, k_pmm=self.k_pmm)
mice_data.update_all(self.n_iterations)
df_imputed = pd.DataFrame(mice_data.data.values, columns=df.columns, index=df.index)
return df_imputed
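statsmodels' MICEData imputes each column from the others using predictive mean matching, drawing each filled value from the k_pmm closest observed candidates, and update_all(n) runs n full cycles over the columns. A short sketch on synthetic data (illustrative only):

# Hypothetical quick run: 3 PMM candidates, 5 chained-equation cycles
rng = np.random.default_rng(0)
toy = pd.DataFrame(rng.normal(size=(50, 3)), columns=['x0', 'x1', 'x2'])
toy.iloc[5:10, 0] = np.nan
print(MICEImputer(k_pmm=3, n_iterations=5).impute(toy).iloc[5:10])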
Routine to add artificial gaps to the datasets. Note that the random seed is held constant so the gaps are identical for every method.
We could also replicate each experiment multiple times.
def add_artificial_gaps(df, num_sites=24, gap_length=56):
"""
This function takes a dataframe and create contiguous gaps in place
Returns dictionary to indicate where these gaps have been added.
"""
np.random.seed(4152)
gaps = {}
# randomly set a n-day contiguous segment as missing for each column
random_columns = np.random.choice(df.columns, size=num_sites, replace=False)
N = len(df.values.flatten())
m = df.isnull().values.flatten().sum()
missing_data = m / N * 100
for col in random_columns:
# Randomly select the start of the n-day segment
start_idx = np.random.randint(0, len(df) - gap_length)
end_idx = start_idx + gap_length
gaps[col] = [start_idx, end_idx]
# Set the values in this range to NaN
df.iloc[start_idx:end_idx, df.columns.get_loc(col)] = np.nan
m = df.isnull().values.flatten().sum()
missing_fraction = float(m / N * 100)
return {'gaps': gaps, 'num_sites': num_sites, 'gap_length': gap_length, 'missing_fraction': missing_fraction}
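A quick illustration of the bookkeeping this returns, on a hypothetical 100-day, 4-site frame:

# Punch 2 one-week gaps into a toy frame
toy = pd.DataFrame(np.random.rand(100, 4), columns=['S_a', 'S_b', 'S_c', 'S_d'])
info = add_artificial_gaps(toy, num_sites=2, gap_length=7)
print(info['gaps'])               # {column: [start_idx, end_idx], ...}
print(info['missing_fraction'])   # 2 * 7 / (100 * 4) * 100 = 3.5 here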
Driver routine to run a single experiment.
def run_experiment(imputer, minimum_missing_data=0, num_sites=24, gap_length=56, rerun_experiment=False):
"""
This routine implements the steps
1. Given a dataset, artificially introduce gaps to represent missing data.
    2. Using a specific imputation method, impute those gaps.
    3. Measure the error between the original dataset and the imputed dataset.
The results are cached in a pickle file and passed back as a dictionary.
"""
experiment_name = f"{repr(imputer)}_missing{minimum_missing_data}_sites{num_sites}_gaplen{gap_length}"
results_filename = f'results/{experiment_name}.pkl'
if not rerun_experiment and os.path.exists(results_filename):
with open(results_filename, 'rb') as f:
results = pickle.load(f)
return results
# load the data
df = pd.read_csv('dataset.csv', parse_dates=True, index_col=0)
df = df.rename(columns = lambda x: 'S_'+x)
# Calculate the percentage of non-missing data for each study site
non_missing_percentage = df.notna().mean() * 100
    # Keep study sites with at least minimum_missing_data percent non-missing data
selected_sites = non_missing_percentage[non_missing_percentage >= minimum_missing_data].index
df_true = df[selected_sites].copy()
    # add artificial gaps
df = df_true.copy()
results = add_artificial_gaps(df, num_sites, gap_length)
# impute the missing data using the supplied method
df_imputed = imputer.impute(df)
    # calculate error metrics over the artificially gapped cells only, so that
    # observed cells (where the error is zero by construction) do not dilute the scores
    gap_mask = df.isna() & df_true.notna()
    error = (df_imputed - df_true).values[gap_mask.values]
    MAE = float(np.mean(np.abs(error)))
    RMSE = float(np.sqrt(np.mean(error ** 2)))
# capture metadata and data from the experiment
results.update( { 'experiment_name': experiment_name,
'imputer_name': str(imputer),
'imputer_details': repr(imputer),
'df_true': df_true,
'df' : df,
'df_imputed': df_imputed,
'RMSE' : RMSE,
'MAE' : MAE } )
os.makedirs('results', exist_ok=True)
    with open(results_filename, 'wb') as f:
pickle.dump(results, f)
return results
Example of running one experiment
imputer = MeanImputer()
results = run_experiment(imputer, minimum_missing_data=90, num_sites=8, gap_length=56)
Run a suite of experiments and save the summary to the file filtered_results.csv.
imputers = [LinearImputer(),
KNNImputer(),
MICEImputer(),
MissForestImputer()]
results_list = []
filtered_results = []
for imputer in tqdm(imputers, desc="Imputer Methods", position=0, leave=True):
for num_sites in tqdm([6, 12, 18], desc="num_sites", position=1, leave=False):
        for gap_length in tqdm([7, 14, 21, 35, 56, 91], desc="gap_length", position=2, leave=False):
result = run_experiment(imputer, minimum_missing_data=90, num_sites=num_sites, gap_length=gap_length)
results_list.append(result)
filtered_result = {k: v for k, v in result.items() if isinstance(v, (int, float, str))}
filtered_results.append(filtered_result)
filtered_results = pd.DataFrame(filtered_results)
filtered_results.to_csv('filtered_results.csv', index=False)
import holoviews as hv
hv.extension('bokeh')
df = pd.read_csv('filtered_results.csv')
plots = []
for metric in ['MAE', 'RMSE']:
scatter = hv.NdOverlay({
imputer: hv.Scatter(df[df['imputer_name'] == imputer], 'missing_fraction', metric, label=imputer).opts(size=8)
for imputer in df['imputer_name'].unique()
})
scatter.opts(
title=f'{metric} vs Missing Fraction by Imputation Strategy',
xlabel='Missing Fraction (%)',
ylabel=metric,
width=800,
height=400,
legend_position='right'
)
plots.append(scatter)
(plots[0] + plots[1]).cols(1)