A random forest regression model is built to predict the heat capacity ($C_p$) of solid inorganic materials at different temperatures. The dataset is collected from the NIST-JANAF Thermochemical Tables.
This project is adapted from a recent publication examining best practices for setting up materials informatics tasks.
import os
import pandas as pd
import numpy as np
np.random.seed(42)
import matplotlib.pyplot as plt
from matplotlib.pyplot import cm
# High DPI rendering for mac
%config InlineBackend.figure_format = 'retina'
# Plot matplotlib plots with white background:
%config InlineBackend.print_figure_kwargs={'facecolor' : "w"}
# Global matplotlib styling: 15 pt text with bold axis labels, 12 pt tick labels.
_base_fontsize = 15
_tick_fontsize = 12
plot_params = {
    'font.size': _base_fontsize,
    'axes.titlesize': _base_fontsize,
    'axes.labelsize': _base_fontsize,
    'axes.labelweight': 'bold',
    'xtick.labelsize': _tick_fontsize,
    'ytick.labelsize': _tick_fontsize,
}
plt.rcParams.update(plot_params)
# Load the heat-capacity dataset from a CSV located in the current working directory.
root_dir = os.getcwd()
csv_file_path = os.path.join(root_dir, 'material_cp.csv')
df = pd.read_csv(csv_file_path)
# Peek at a few random rows to sanity-check the load.
df.sample(5)
print(df.shape)
# Summary statistics of the numeric columns, rounded for readability.
df.describe().round(2)
Rename columns for better data handling
# Shorten the verbose NIST column headers to compact names for easier handling.
df = df.rename(columns={
    'FORMULA': 'formula',
    'CONDITION: Temperature (K)': 'T',
    'PROPERTY: Heat Capacity (J/mol K)': 'Cp',
})
df
Check for null entries in the dataset
# Identify the columns containing at least one NaN, then count missing values per column.
nan_columns = df.columns[df.isnull().any()]
df[nan_columns].isnull().sum()
Show the null entries in the dataframe
# Display every row that has at least one missing value.
df[df.isnull().any(axis=1)]
# Drop rows missing any of the three essential fields; verify nothing is left.
df_remove_NaN = df.dropna(subset=['formula', 'Cp', 'T'])
df_remove_NaN.isnull().sum()
Remove unrealistic values from the dataset
# Summary statistics to spot physically impossible (negative) values.
df_remove_NaN.describe()
# Keep only rows with non-negative temperature and heat capacity.
# (No NaNs remain here, so `>= 0` is exactly the complement of `< 0`.)
keep_mask = (df_remove_NaN['T'] >= 0) & (df_remove_NaN['Cp'] >= 0)
df_remove_NaN_neg_values = df_remove_NaN.loc[keep_mask]
print(df_remove_NaN_neg_values.shape)
Splitting data
The dataset in this exercise contains different formulae, with Cp and T recorded for each entry as a function of T. There are a lot of repeated formulae, and randomly splitting the dataset into train/val/test sets could leak material entries between the three sets.
To avoid this, the idea is to generate the train/val/test sets such that all entries belonging to a particular material are included in only one set. E.g., B2O3 entries appear in only one of the train/val/test sets. To do so, let's first find the unique material entries in the dataset and sample them without replacement when building the new train/val/test sets.
# Continue from the cleaned dataset; first try a naive row-wise random split.
df = df_remove_NaN_neg_values.copy()
df
from sklearn.model_selection import train_test_split
# 60/40 split of ROWS — ignores that the same formula appears at many temperatures.
train_df, test_df = train_test_split(df, test_size=0.4, random_state=42)
With this naive split, a number of materials are going to be present in both the training and the test set.
# How many formulae leak across the naive row-wise split?
train_set = set(train_df['formula'])
test_set = set(test_df['formula'])
# Any non-zero intersection means the same material appears in both sets.
len(train_set & test_set)
Start with unique splitting task
# Count distinct material formulae in the cleaned dataset.
df['formula'].nunique()
Out of 244 unique materials, 233 are present in both the training and test sets. This is problematic for model building, especially since we're going to featurize the materials using solely composition-based features.
# Frequency of the 50 most common formulae — shows how heavily each material is repeated.
f_entries = df['formula'].value_counts()[:50]
fig, ax = plt.subplots(1,1, figsize=(5,20))
# Horizontal bars: one per formula, length = number of (T, Cp) rows for it.
ax.barh(f_entries.index, f_entries.values)
ax.tick_params(axis='x', rotation=90);
# Sample of the unique formula strings.
df['formula'].unique()[:10]
Creating train/val/test manually
# --- Group-aware split: every formula lands in exactly one of train/val/test ---
unique_entries = df['formula'].unique()

# Split fractions. Renamed from train_set/val_set/test_set: those names shadowed
# the set objects used for the overlap checks, which was confusing and bug-prone.
train_frac = 0.7
val_frac = 0.2
test_frac = 1 - train_frac - val_frac
num_entries_train = int( train_frac * len(unique_entries) )
num_entries_val = int( val_frac * len(unique_entries) )
num_entries_test = int( test_frac * len(unique_entries) )
print(num_entries_train, num_entries_val, num_entries_test)

# Sample train formulae without replacement (np.random.seed(42) set at the top).
train_formulae = np.random.choice(unique_entries, num_entries_train, replace=False)
# Set lookup gives O(1) membership instead of an O(n) scan per formula.
_train_lookup = set(train_formulae)
unique_entries_minus_train = [f for f in unique_entries if f not in _train_lookup]
# Val formulae come from the remainder.
val_formulae = np.random.choice(unique_entries_minus_train, num_entries_val, replace=False)
_val_lookup = set(val_formulae)
unique_entries_minus_train_val = [f for f in unique_entries_minus_train if f not in _val_lookup]
# Whatever is left over becomes the test set (may differ slightly from num_entries_test
# because the per-split counts are truncated with int()).
test_formulae = unique_entries_minus_train_val.copy()
print(len(train_formulae), len(val_formulae), len(test_formulae))

# Materialize the row-level splits from the formula assignments.
train_points = df.loc[ df['formula'].isin(train_formulae) ]
val_points = df.loc[ df['formula'].isin(val_formulae) ]
test_points = df.loc[ df['formula'].isin(test_formulae) ]
print(train_points.shape, val_points.shape, test_points.shape)

# Verify there is no formula overlap between the splits (both counts must be 0).
train_set = set(train_points['formula'].unique())
val_set = set(val_points['formula'].unique())
test_set = set(test_points['formula'].unique())
print(len(train_set.intersection(val_set)), len(train_set.intersection(test_set)))
from cbfv.composition import generate_features
# CBFV expects the target column to be named 'target'.
rename_columns = {'Cp':'target'}
# Tag each split before concatenation so the rows can be separated again later.
# .copy() avoids pandas' SettingWithCopyWarning: the *_points frames are slices
# of df, and assigning a new column directly to a slice is ambiguous (and does
# not propagate under pandas copy-on-write).
train_points = train_points.copy()
val_points = val_points.copy()
test_points = test_points.copy()
train_points['Type'] = 'Train'
val_points['Type'] = 'Val'
test_points['Type'] = 'Test'
total_data = pd.concat([train_points, val_points, test_points], ignore_index=True)
total_data = total_data.rename(columns=rename_columns)
total_data.sample(5)
# Recover the three splits from the tagged frame, dropping the helper column.
_splits = {}
for _kind in ('Train', 'Val', 'Test'):
    _splits[_kind] = (total_data.loc[total_data['Type'] == _kind]
                      .drop(columns=['Type'])
                      .reset_index(drop=True))
train_df = _splits['Train']
val_df = _splits['Val']
test_df = _splits['Test']
train_df.shape
# Subsample the training set to keep featurization and fitting fast.
train_df = train_df.sample(n=1000, random_state=42)
train_df.shape
# Featurize each split with CBFV composition-based feature vectors.
# elem_prop='oliynyk' selects the element-property table used to build per-formula
# statistics; sum_feat=True adds summed element-property features.
# drop_duplicates=False: repeated formulae at different temperatures must all be kept.
# NOTE(review): extend_features=True presumably carries the extra input column (T)
# through as a feature — confirm against the cbfv documentation.
X_unscaled_train, y_train, formulae_entry_train, skipped_entry = generate_features(train_df, elem_prop='oliynyk', drop_duplicates=False, extend_features=True, sum_feat=True)
X_unscaled_val, y_val, formulae_entry_val, skipped_entry = generate_features(val_df, elem_prop='oliynyk', drop_duplicates=False, extend_features=True, sum_feat=True)
X_unscaled_test, y_test, formulae_entry_test, skipped_entry = generate_features(test_df, elem_prop='oliynyk', drop_duplicates=False, extend_features=True, sum_feat=True)
# Inspect the generated feature matrix.
X_unscaled_train.head(5)
formulae_entry_train.head(5)
X_unscaled_train.shape
X_unscaled_train.columns
X_unscaled_train.describe().round(2)
# Distribution of one raw (unscaled) feature for reference.
X_unscaled_train['range_heat_of_vaporization_(kJ/mol)_'].hist();
from sklearn.preprocessing import StandardScaler, normalize
# Standardize features: fit on train only, then apply to val/test (avoids leakage).
stdscaler = StandardScaler()
X_train = stdscaler.fit_transform(X_unscaled_train)
X_val = stdscaler.transform(X_unscaled_val)
X_test = stdscaler.transform(X_unscaled_test)
# After scaling, each train column should have mean ~0 and std ~1.
pd.DataFrame(X_train, columns=X_unscaled_train.columns).describe().round(2)
pd.DataFrame(X_train, columns=X_unscaled_train.columns)['range_heat_of_vaporization_(kJ/mol)_'].hist()
# L2-normalize each ROW (sample) on top of the column-wise standardization.
# NOTE(review): row-wise normalization after standardization is unusual, and tree
# ensembles are scale-invariant anyway — confirm this step is intentional.
X_train = normalize(X_train)
X_val = normalize(X_val)
X_test = normalize(X_test)
pd.DataFrame(X_train, columns=X_unscaled_train.columns).describe().round(2)
pd.DataFrame(X_train, columns=X_unscaled_train.columns)['range_heat_of_vaporization_(kJ/mol)_'].hist()
from time import time
from sklearn.ensemble import RandomForestRegressor
from sklearn.linear_model import LinearRegression
from sklearn.metrics import r2_score, mean_absolute_error, mean_squared_error
# Baseline forest with default hyperparameters; fixed seed for reproducibility.
model = RandomForestRegressor(random_state=42)
%%time
# Fit on the 1000-row training subsample.
model.fit(X_train, y_train)
def display_performance(y_true, y_pred):
    """Print and return regression metrics for a set of predictions.

    Parameters
    ----------
    y_true : array-like
        Ground-truth target values.
    y_pred : array-like
        Model predictions, same length as ``y_true``.

    Returns
    -------
    tuple of float
        ``(r2, mae, rmse)``.
    """
    r2 = r2_score(y_true, y_pred)
    mae = mean_absolute_error(y_true, y_pred)
    rmse = np.sqrt(mean_squared_error(y_true, y_pred))
    # f-string replaces the positional .format() template; output is identical.
    print(f'R2: {r2:0.2f}\n'
          f'MAE: {mae:0.2f}\n'
          f'RMSE: {rmse:0.2f}')
    return r2, mae, rmse
# Validation-set predictions from the baseline forest.
y_pred = model.predict(X_val)
display_performance(y_val,y_pred);
# Parity plot: model-predicted vs true Cp on the validation set.
fig, ax = plt.subplots(1,1, figsize=(5,5))
ax.scatter(y_val, y_pred, alpha=0.6, label='Random Forest')
lims = [np.min([ax.get_xlim(), ax.get_ylim()]), # min of both axes
np.max([ax.get_xlim(), ax.get_ylim()]), # max of both axes
]
# Linear fit
reg = np.polyfit(y_val, y_pred, deg=1)
ax.plot(lims, reg[0] * np.array(lims) + reg[1], 'r--', linewidth=1.5, label='Linear Fit')
# y = x reference: points on this line are perfect predictions.
ax.plot(lims, lims, 'k--', alpha=0.75, zorder=0, label='Parity Line')
ax.set_aspect('equal')
ax.set_xlabel('True value')
ax.set_ylabel('Model predicted')
ax.legend(loc='best')
# Feature-importance analysis of the fitted forest.
feature_name = [i for i in X_unscaled_train.columns]
len(feature_name)
X_train.shape
len(model.estimators_)
# Mean impurity-based importance, with std dev across the individual trees.
mean_feature_importance = model.feature_importances_
std_feature_importance = np.std([ tree.feature_importances_ for tree in model.estimators_ ], axis=0)
feat_imp_df = pd.DataFrame({'name':feature_name, 'mean_imp':mean_feature_importance, 'std_dev':std_feature_importance})
# Top 20 features for the bar plot below.
feat_imp_df_top = feat_imp_df.sort_values('mean_imp', ascending=False)[:20]
feat_imp_df_top[:5]
fig, ax = plt.subplots(1,1, figsize=(30,3))
ax.bar(feat_imp_df_top['name'], feat_imp_df_top['mean_imp'], yerr=feat_imp_df_top['std_dev'])
ax.tick_params(axis='x', rotation=90)
ax.set_title('Feature Importance');
# Keep every feature whose mean importance clears a small threshold.
top_feature_list = feat_imp_df.loc[ feat_imp_df['mean_imp'] > 0.001 ]['name']
len(top_feature_list)
# Rebuild DataFrames so columns can be selected by name, then subset the features.
X_train_df = pd.DataFrame(X_train, columns=feature_name)
X_val_df = pd.DataFrame(X_val, columns=feature_name)
X_train_short = X_train_df[list(top_feature_list)]
X_val_short = X_val_df[list(top_feature_list)]
print(X_train_short.shape, X_train.shape)
# Retrain on the reduced feature set to check whether pruning features hurts accuracy.
model_small = RandomForestRegressor(random_state=42)
%%time
model_small.fit(X_train_short, y_train)
y_pred = model_small.predict(X_val_short)
display_performance(y_val, y_pred);
# Parity plot for the reduced-feature model.
fig, ax = plt.subplots(1,1, figsize=(5,5))
ax.scatter(y_val, y_pred, alpha=0.6, label='Random Forest')
lims = [np.min([ax.get_xlim(), ax.get_ylim()]), # min of both axes
np.max([ax.get_xlim(), ax.get_ylim()]), # max of both axes
]
# Linear fit
reg = np.polyfit(y_val, y_pred, deg=1)
ax.plot(lims, reg[0] * np.array(lims) + reg[1], 'r--', linewidth=1.5, label='Linear Fit')
# y = x reference line for perfect predictions.
ax.plot(lims, lims, 'k--', alpha=0.75, zorder=0, label='Parity Line')
ax.set_aspect('equal')
ax.set_xlabel('True value')
ax.set_ylabel('Model predicted')
ax.legend(loc='best')
Combine the train and validation sets to generate a single training set for cross-validation.
# Stack features and targets column-wise so rows stay aligned when merging splits.
X_y_train = np.column_stack((X_train_short, y_train))
X_y_train.shape
# Sanity check: the last column must reproduce y_train exactly (all zeros expected).
np.unique(X_y_train[:,-1] - y_train)
X_y_val = np.column_stack((X_val_short, y_val))
# Concatenate train and val row-wise, then peel features and target back apart.
X_Y_TRAIN = np.concatenate((X_y_train, X_y_val), axis=0)
X_TRAIN = X_Y_TRAIN[:, :-1].copy()
Y_TRAIN = X_Y_TRAIN[:, -1].copy()
print(X_TRAIN.shape, Y_TRAIN.shape)
from sklearn.model_selection import cross_validate
def display_score(scores, metric):
    # Report the mean and spread of one cross-validation metric.
    # `scores` is the dict returned by sklearn's cross_validate, whose test
    # results live under keys prefixed with 'test_'.
    key = 'test_{}'.format(metric)
    values = scores[key]
    print(metric)
    print('Mean: {}'.format(values.mean()))
    print('Std dev: {}'.format(values.std()))
%%time
# 5-fold cross-validation of the baseline forest on the merged train+val data.
# Metrics use the 'neg_*' convention because sklearn always maximizes scores.
_scoring = ['neg_root_mean_squared_error', 'neg_mean_absolute_error']
forest_scores = cross_validate(model, X_TRAIN, Y_TRAIN,
scoring = _scoring, cv=5)
display_score(forest_scores, _scoring[0])
display_score(forest_scores, _scoring[1])
import joblib
from sklearn.model_selection import RandomizedSearchCV
random_forest_base_model = RandomForestRegressor(random_state=42)
param_grid = {
'bootstrap':[True],
'min_samples_leaf':[5,10,100,200,500],
'min_samples_split':[5,10,100,200,500],
'n_estimators':[100,200,400,500],
'max_features':['auto','sqrt','log2'],
'max_depth':[5,10,15,20]
}
CV_rf = RandomizedSearchCV(estimator=random_forest_base_model,
n_iter=50,
param_distributions=param_grid,
scoring='neg_root_mean_squared_error',
cv = 5, verbose = 1, n_jobs=-1, refit=True)
%%time
with joblib.parallel_backend('multiprocessing'):
CV_rf.fit(X_TRAIN, Y_TRAIN)
print(CV_rf.best_params_, CV_rf.best_score_)
pd.DataFrame(CV_rf.cv_results_).sort_values('rank_test_score')[:5]
best_model = CV_rf.best_estimator_
best_model