import pandas as pd
import numpy as np
import os
import pickle
from rich.progress import track

Given several different imputation algorithms, we need to choose the one that we think is ‘best’ for our dataset. The procedure is:

1. Given a dataset, artificially introduce gaps to represent missing data.
2. Using a specific imputation method, impute those gaps.
3. Measure the error between the original dataset and the imputed dataset (the error metrics are defined below).
4. Repeat for a range of different gap sizes and combinations.
5. Compare between the different imputation methods.
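For step 3, run_experiment below reports the mean absolute error (MAE) and root-mean-square error (RMSE) over all $N$ cells of the dataset, where $x_{ij}$ is the true value and $\hat{x}_{ij}$ the imputed value (cells that were already missing in the original data are excluded):

$$\mathrm{MAE} = \frac{1}{N}\sum_{i,j}\left|\hat{x}_{ij} - x_{ij}\right|, \qquad \mathrm{RMSE} = \sqrt{\frac{1}{N}\sum_{i,j}\left(\hat{x}_{ij} - x_{ij}\right)^{2}}$$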
Abstract out what an ‘imputer’ needs to accomplish.
class BaseImputer:
    def __init__(self, **params):
        # Store hyperparameters and parameters for this imputer
        self.name = 'BaseImputer'

    def __str__(self):
        return f"{self.name}"

    def __repr__(self):
        return "BaseImputer()"

    def impute(self, df):
        # Impute with this imputation algorithm; the base class
        # simply returns an unmodified copy of the dataframe.
        df_imputed = df.copy()
        return df_imputed
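Each concrete imputer subclasses BaseImputer and overrides impute. As a minimal sketch of the pattern, a hypothetical MedianImputer (not one of the methods compared below) would look like this:

class MedianImputer(BaseImputer):
    def __init__(self):
        self.name = 'Median'

    def __repr__(self):
        return "MedianImputer()"

    def impute(self, df):
        # Replace each NaN with its column median
        return df.fillna(df.median())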
Implementations for the different imputation methods.

from sklearn.impute import SimpleImputer

class MeanImputer(BaseImputer):
    def __init__(self, **params):
        # Apply hyperparameters and parameters for this imputer
        self.name = 'Mean'

    def __repr__(self):
        return "MeanImputer()"

    def impute(self, df):
        # Replace each NaN with its column mean via scikit-learn's SimpleImputer
        imputer = SimpleImputer(missing_values=np.nan, strategy='mean')
        df_imputed = pd.DataFrame(imputer.fit_transform(df), columns=df.columns, index=df.index)
        return df_imputed
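A quick sanity check on a toy dataframe (illustrative values only):

toy = pd.DataFrame({'a': [1.0, np.nan, 3.0], 'b': [4.0, 5.0, 6.0]})
print(MeanImputer().impute(toy))  # the NaN in column 'a' becomes 2.0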
class LinearImputer(BaseImputer):
    def __init__(self):
        self.name = 'Linear'

    def __repr__(self):
        return "LinearImputer()"

    def impute(self, df):
        # Linearly interpolate each column along the index
        df_imputed = df.interpolate(method='linear')
        return df_imputed

from imputeMF import imputeMF
class MissForestImputer(BaseImputer):
    def __init__(self, max_iter=10):
        self.name = 'MissForest'
        self.max_iter = max_iter

    def __repr__(self):
        return f"MissForestImputer(max_iter={self.max_iter})"

    def impute(self, df):
        # imputeMF works on a plain array, so rebuild the dataframe afterwards
        df_imputed = pd.DataFrame(imputeMF(df.values), columns=df.columns, index=df.index)
        return df_imputed

from sklearn.impute import KNNImputer as sklearnKNNImputer
class KNNImputer(BaseImputer):
    def __init__(self, n_neighbors=7):
        self.name = 'KNN'
        self.n_neighbors = n_neighbors

    def __repr__(self):
        return f"KNNImputer(n_neighbors={self.n_neighbors})"

    def impute(self, df):
        imputer = sklearnKNNImputer(n_neighbors=self.n_neighbors)
        df_imputed = pd.DataFrame(imputer.fit_transform(df), columns=df.columns, index=df.index)
        return df_imputed

from statsmodels.imputation.mice import MICEData
class MICEImputer(BaseImputer):
    def __init__(self, k_pmm=3, n_iterations=10):
        self.name = 'MICE'
        self.k_pmm = k_pmm
        self.n_iterations = n_iterations

    def __repr__(self):
        return f"MICEImputer(k_pmm={self.k_pmm},n_iterations={self.n_iterations})"

    def impute(self, df):
        # Multiple imputation by chained equations via statsmodels' MICEData
        mice_data = MICEData(df, k_pmm=self.k_pmm)
        mice_data.update_all(self.n_iterations)
        df_imputed = pd.DataFrame(mice_data.data.values, columns=df.columns, index=df.index)
        return df_imputed

Routine to add artificial gaps to the datasets. Note that the random seed is kept constant to ensure the gaps are always the same across the different methods.
We could also replicate each experiment multiple times.
def add_artificial_gaps(df, num_sites=1, gap_length=30):
    """
    This function takes a dataframe and creates contiguous gaps in place.
    It returns a dictionary indicating where these gaps have been added.
    """
    np.random.seed(4152)
    gaps = {}
    # randomly set an n-day contiguous segment as missing in each selected column
    random_columns = np.random.choice(df.columns, size=num_sites, replace=False)
    N = len(df.values.flatten())
    for col in random_columns:
        # Randomly select the start of the n-day segment
        start_idx = np.random.randint(0, len(df) - gap_length)
        end_idx = start_idx + gap_length
        gaps[col] = [start_idx, end_idx]
        # Set the values in this range to NaN
        df.iloc[start_idx:end_idx, df.columns.get_loc(col)] = np.nan
    m = df.isnull().values.flatten().sum()
    missing_fraction = float(m / N * 100)
    return {'gaps': gaps, 'num_sites': num_sites, 'gap_length': gap_length, 'missing_fraction': missing_fraction}
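A quick illustrative run on a small synthetic dataframe (hypothetical column names, not the real dataset). The gaps are written into the dataframe in place, while the returned dictionary records each gap's location and the overall missing fraction:

toy = pd.DataFrame(np.random.rand(100, 3), columns=['site_a', 'site_b', 'site_c'])
meta = add_artificial_gaps(toy, num_sites=2, gap_length=10)
print(meta['gaps'])              # start/end row indices for each gapped column
print(meta['missing_fraction'])  # 2 gaps x 10 rows = 20 of 300 cells, ~6.7%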
Driver program to run a single experiment.

import re

def make_valid_filename(s, replacement='_'):
    # Remove leading/trailing whitespace
    s = s.strip()
    # Replace invalid characters with the replacement
    s = re.sub(r'[\\/*?:"<>|]', replacement, s)
    # Optionally collapse multiple replacements
    s = re.sub(f'{re.escape(replacement)}+', replacement, s)
    return s

def run_experiment(imputer, minimum_missing_data=0, num_sites=5, gap_length=30, rerun_experiment=False):
"""
This routine implements the steps
1. Given a dataset, artificially introduce gaps to represent missing data.
2. Using a specific imputatations method, impute those gaps.
3. Measure the error between the orignal dataset and the imputed dataset.
The results are cached in a pickle file and passed back as a dictionary.
"""
experiment_name = f"{repr(imputer)}_missing{minimum_missing_data}_sites{num_sites}_gaplen{gap_length}"
experiment_name = make_valid_filename(experiment_name)
results_filename = f'results/{experiment_name}.pkl'
if not rerun_experiment and os.path.exists(results_filename):
with open(results_filename, 'rb') as f:
results = pickle.load(f)
return results
# load the data
df = pd.read_csv('dataset.csv', parse_dates=True, index_col=0)
# Calculate the percentage of non-missing data for each study site
non_missing_percentage = df.notna().mean() * 100
# Filter study sites with at least 90% non-missing data
selected_sites = non_missing_percentage[non_missing_percentage >= minimum_missing_data].index
df_true = df[selected_sites].copy()
# add artifical gaps
df = df_true.copy()
results = add_artificial_gaps(df, num_sites, gap_length)
# impute the missing data using the supplied method
df_imputed = imputer.impute(df)
# calculate metrics
error = df_imputed - df_true
MAE = np.mean(abs(error))
RMSE = np.sqrt(np.mean((error)**2))
# capture metadata and data from the experiment
results.update( { 'experiment_name': experiment_name,
'imputer_name': str(imputer),
'imputer_details': repr(imputer),
'df_true': df_true,
'df' : df,
'df_imputed': df_imputed,
'RMSE' : RMSE,
'MAE' : MAE } )
os.makedirs('results', exist_ok=True)
with open(f'results/{experiment_name}.pkl', 'wb') as f:
pickle.dump(results, f)
return results
Example of running one experiment:

imputer = MeanImputer()
results = run_experiment(imputer, minimum_missing_data=90, num_sites=6, gap_length=90)

Run a suite of experiments and save the summary to a file in the results subdirectory.
imputers = [LinearImputer(),
            KNNImputer(),
            MissForestImputer(),
            MICEImputer()
            ]

results_list = []
filtered_results = []
for imputer in track(imputers, description="Imputation Method"):
    for num_sites in track([2, 4, 6], description="NumSites", transient=True):
        for gap_length in track([7, 14, 21, 35, 56, 90], description="GapLength", transient=True):
            result = run_experiment(imputer, minimum_missing_data=90, num_sites=num_sites, gap_length=gap_length)
            results_list.append(result)
            # keep only the scalar metadata and metrics for the summary table
            filtered_result = {k: v for k, v in result.items() if isinstance(v, (int, float, str))}
            filtered_results.append(filtered_result)
filtered_results = pd.DataFrame(filtered_results)
filtered_results.to_csv('results/results.csv', index=False)

Plot the error metrics against the overall missing fraction for each imputation method.

import holoviews as hv
hv.extension('bokeh')

df = pd.read_csv('results/results.csv')

plots = []
for metric in ['MAE', 'RMSE']:
    scatter = hv.NdOverlay({
        imputer: hv.Scatter(df[df['imputer_name'] == imputer], 'missing_fraction', metric, label=imputer).opts(size=8)
        for imputer in df['imputer_name'].unique()
    })
    scatter.opts(
        title=f'{metric} vs Missing Fraction by Imputation Strategy',
        xlabel='Missing Fraction (%)',
        ylabel=metric,
        width=800,
        height=400,
        legend_position='right'
    )
    plots.append(scatter)
(plots[0] + plots[1]).cols(1)

This analysis shows that MissForest has the lowest errors for this kind of dataset.
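As a quick numerical check of this conclusion, the metrics in the summary file can be averaged per method (a sketch; it reuses the df loaded from results/results.csv above):

summary = df.groupby('imputer_name')[['MAE', 'RMSE']].mean()
print(summary.sort_values('RMSE'))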