import os
import copy
import numpy as np
import torch
import torch.nn as nn
import tqdm.notebook as tqdm
import torch.nn.functional as F
torch.manual_seed(42); np.random.seed(42);
An autoencoder is a type of unsupervised learning method. More than that, it is a type of 'generative' model which, once trained, can help generate synthetic data. In essence, it is a non-linear dimensionality reduction technique that learns the internal low-dimensional structure of the data.
An autoencoder model contains two components:
1. An encoder, which takes an image as input and outputs a low-dimensional embedding (representation) of the image.
2. A decoder, which takes the low-dimensional embedding and reconstructs the image.
Viewed this way, an autoencoder is a dimensionality-reduction tool that embeds the data in a non-linear, low-dimensional latent space.
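For concreteness, here is a minimal fully-connected sketch of the two components (the layer sizes are illustrative assumptions; the CNN version built below is the one actually trained):

# Minimal fully-connected autoencoder sketch (layer sizes are illustrative)
sketch_autoencoder = nn.Sequential(
    nn.Linear(28*28, 64), nn.ReLU(), nn.Linear(64, 10),   # encoder: flattened image -> 10-d embedding
    nn.Linear(10, 64), nn.ReLU(), nn.Linear(64, 28*28),   # decoder: embedding -> reconstructed image
    nn.Sigmoid()
)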
Relationship to PCA
An autoencoder can be seen as a generalisation of principal component analysis (PCA) to nonlinear manifolds. If we remove the nonlinearity (brought about by the activation functions), the result of the autoencoder will be, in some sense, equivalent to PCA. Unlike PCA, however, the component vectors encoded by the weights will not be orthogonal.
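A quick numerical sketch of this claim: a linear autoencoder (no activations) trained with MSE should approach the reconstruction error of PCA with the same number of components, even though its weights need not be orthogonal. The toy data and dimensions below are illustrative assumptions:

# Linear autoencoder vs. PCA on correlated toy data (illustrative sketch)
X = torch.randn(500, 10) @ torch.randn(10, 10)   # correlated toy data
X = X - X.mean(0)                                # center the data, as PCA does

linear_ae = nn.Sequential(nn.Linear(10, 2, bias=False),   # linear encoder
                          nn.Linear(2, 10, bias=False))   # linear decoder
opt = torch.optim.Adam(linear_ae.parameters(), lr=1e-2)
for _ in range(2000):
    opt.zero_grad()
    loss = F.mse_loss(linear_ae(X), X)
    loss.backward()
    opt.step()

# PCA reconstruction error using the top-2 right singular vectors
U, S, Vt = torch.linalg.svd(X, full_matrices=False)
X_pca = X @ Vt[:2].T @ Vt[:2]
print(loss.item(), F.mse_loss(X_pca, X).item())  # the two errors should be close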
Anomaly Detection
Besides being used as a generative model, an autoencoder can be used as an anomaly detection method by considering the reconstruction loss between the input and the decoded output. By setting a threshold on acceptable loss values, we can flag any instance whose loss exceeds that limit as a potentially anomalous digit.
Such anomaly detection could be used in processes where images, sound, or signals are scanned and flagged for being out of spec. Google I/O 2021 had a nice workshop introducing autoencoders and their utility in anomaly detection for abnormal heart rhythms. Video
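The mechanics of the flagging rule are simple; a hedged sketch (the function name is illustrative, assuming a model that returns a (reconstruction, latent) pair, as the one built below does):

# Illustrative flagging rule: an input is anomalous when its
# reconstruction loss exceeds a threshold estimated on normal data
def is_anomalous(model, criterion, x, threshold):
    reconstruction, _ = model(x)
    return criterion(reconstruction, x).item() > threshold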
A simple autoencoder based on a CNN architecture will be built to encode and embed MNIST hand-written digit data.
Model Development
For the CNN, the stride and filter sizes are chosen to ensure the feature map at the bottleneck collapses to a single vector. To understand more about how the stride and filter sizes are chosen, or the effect of those parameters on the convolution, there is a helpful visualization and documentation here: https://github.com/vdumoulin/conv_arithmetic
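As a quick check of that arithmetic, the standard convolution output-size formula shows the encoder below collapsing a 28x28 input to a single spatial position:

# Conv output size: out = floor((in + 2*padding - kernel) / stride) + 1
def conv_out(n, kernel, stride, padding):
    return (n + 2*padding - kernel) // stride + 1

n = conv_out(28, kernel=3, stride=2, padding=1)  # 28 -> 14
n = conv_out(n, kernel=3, stride=2, padding=1)   # 14 -> 7
n = conv_out(n, kernel=7, stride=1, padding=0)   # 7  -> 1 (the bottleneck)
print(n)  # 1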
import matplotlib.pyplot as plt
from matplotlib.pyplot import cm
%config InlineBackend.figure_format = 'retina'
%config InlineBackend.print_figure_kwargs={'facecolor' : "w"}
plot_params = {
    'image.cmap': 'binary',
    'image.interpolation': 'nearest'
}
plt.rcParams.update(plot_params)
from torchvision import datasets, transforms
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device
device(type='cuda')
transform_data = transforms.ToTensor()
train_dataset = datasets.MNIST(root='input/',
                               train=True,
                               transform=transform_data,
                               download=True)
val_dataset = datasets.MNIST(root='input/',
                             train=False,
                             transform=transform_data,
                             download=True)
input_tensor, label = train_dataset[0]

print('MNIST dataset with {} train data and {} validation data'.format(len(train_dataset), len(val_dataset)))
print('Type of data in dataset: {} AND {}'.format(type(input_tensor), type(label)))
print('Input tensor image dimensions: {}'.format(input_tensor.shape))
MNIST dataset with 60000 train data and 10000 validation data
Type of data in dataset: <class 'torch.Tensor'> AND <class 'int'>
Input tensor image dimensions: torch.Size([1, 28, 28])
class AutoEncoder(nn.Module):
    def __init__(self, latent_dimensions=10):
        super(AutoEncoder, self).__init__()

        self.encoder_module = nn.Sequential(
            nn.Conv2d(1, 16, 3, stride=2, padding=1),
            nn.ReLU(),
            nn.Conv2d(16, 32, 3, stride=2, padding=1),
            nn.ReLU(),
            nn.Conv2d(32, 64, 7, stride=1, padding=0)
        )

        self.decoder_module = nn.Sequential(
            nn.ConvTranspose2d(64, 32, 7, stride=1, padding=0),
            nn.ReLU(),
            nn.ConvTranspose2d(32, 16, 3, stride=2, padding=1, output_padding=1),
            nn.ReLU(),
            nn.ConvTranspose2d(16, 1, 3, stride=2, padding=1, output_padding=1),
            nn.Sigmoid()
        )

        self.NN_encode_to_latent = nn.Linear(64, latent_dimensions)
        self.NN_latent_to_decode = nn.Linear(latent_dimensions, 64)

    def encoder(self, x):
        # Encode the points
        encode_x = self.encoder_module(x)
        batch_size, _, _, _ = encode_x.shape

        # Bottleneck layer - not strictly needed here, but useful when
        # converting the model to a variational autoencoder
        encode_x = encode_x.view(batch_size, -1)
        x_encode_to_latent = self.NN_encode_to_latent(encode_x)

        return(x_encode_to_latent, batch_size)

    def decoder(self, x_encode_to_latent, batch_size):
        x_latent_to_decode = self.NN_latent_to_decode(x_encode_to_latent)

        # Decode the points
        latent_x_reshape = x_latent_to_decode.view(batch_size, -1, 1, 1)
        reconst = self.decoder_module(latent_x_reshape)

        return(reconst)

    def forward(self, x):
        latent_vector, batch_size = self.encoder(x)
        reconst = self.decoder(latent_vector, batch_size)
        return(reconst, latent_vector)
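As a quick sanity check (a sketch, not part of the training pipeline), a dummy batch should round-trip to its own shape, with a latent vector of the requested size:

# Round-trip a dummy batch through an untrained model to verify shapes
_dummy = torch.zeros(4, 1, 28, 28)
_reconst, _latent = AutoEncoder(latent_dimensions=10)(_dummy)
print(_reconst.shape, _latent.shape)  # torch.Size([4, 1, 28, 28]) torch.Size([4, 10])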
def train(model, data_loader, epoch, criterion, optimizer):
    counter = 0
    epoch_loss_list = []

    model.train()
    for i, data_batch in enumerate(data_loader):
        data, label = data_batch
        data = data.to(device)

        reconstruction, latent_x = model(data)

        loss = criterion(reconstruction, data)

        loss.backward()
        optimizer.step()
        optimizer.zero_grad()

        counter = counter + 1
        epoch_loss_list.append(loss.item())
        if i == 0: # Only keep the first batch
            outputs = (epoch, label, data.cpu().detach(), latent_x, reconstruction.cpu().detach())

    mean_train_loss = sum(epoch_loss_list) / counter

    if epoch % 5 == 0:
        print('Training: Epoch: {0}, Loss: {1:0.3f}'.format(epoch+1, mean_train_loss))
    return outputs, epoch_loss_list, mean_train_loss
def validation(model, data_loader, epoch, criterion):
    counter = 0
    epoch_loss_list = []

    model.eval()
    for i, data_batch in enumerate(data_loader):
        with torch.no_grad():
            data, label = data_batch
            data = data.to(device)

            reconstruction, latent_x = model(data)

            loss = criterion(reconstruction, data)

            counter = counter + 1
            epoch_loss_list.append(loss.item())
            if i == 0: # Only keep the first batch
                outputs = (epoch, label, data.cpu().detach(), latent_x, reconstruction.cpu().detach())

    mean_val_loss = sum(epoch_loss_list) / counter

    if epoch % 5 == 0:
        print('** Validation: Epoch: {0}, Loss: {1:0.3f}'.format(epoch+1, mean_val_loss))
        print('-'*10)
    return outputs, epoch_loss_list, mean_val_loss
model = AutoEncoder(latent_dimensions=20)
model = model.to(device)
model
AutoEncoder(
(encoder_module): Sequential(
(0): Conv2d(1, 16, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
(1): ReLU()
(2): Conv2d(16, 32, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
(3): ReLU()
(4): Conv2d(32, 64, kernel_size=(7, 7), stride=(1, 1))
)
(decoder_module): Sequential(
(0): ConvTranspose2d(64, 32, kernel_size=(7, 7), stride=(1, 1))
(1): ReLU()
(2): ConvTranspose2d(32, 16, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), output_padding=(1, 1))
(3): ReLU()
(4): ConvTranspose2d(16, 1, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), output_padding=(1, 1))
(5): Sigmoid()
)
(NN_encode_to_latent): Linear(in_features=64, out_features=20, bias=True)
(NN_latent_to_decode): Linear(in_features=20, out_features=64, bias=True)
)
num_epochs = 20
batch_size = 64
learning_rate = 1e-3

criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(),
                             lr=learning_rate,
                             weight_decay=1e-5)

train_loader = torch.utils.data.DataLoader(train_dataset,
                                           batch_size=batch_size,
                                           shuffle=True)

val_loader = torch.utils.data.DataLoader(val_dataset,
                                         batch_size=batch_size,
                                         shuffle=True)
%%time
train_output_array, val_output_array = [], []
train_loss_array, val_loss_array = [], []

for epoch in tqdm.tqdm(range(num_epochs)):
    train_out, train_loss_list, epoch_train_loss = train(model, train_loader, epoch, criterion, optimizer)
    val_out, val_loss_list, epoch_val_loss = validation(model, val_loader, epoch, criterion)

    train_loss_array.append(epoch_train_loss)
    val_loss_array.append(epoch_val_loss)
    train_output_array.append(train_out)
    val_output_array.append(val_out)

    # Keep the per-batch loss values for train and validation from the final epoch
    # Another option is to take the loss values from the epoch with the lowest validation loss
    if epoch == num_epochs - 1:
        final_train_loss = train_loss_list
        final_val_loss = val_loss_list
Training: Epoch: 1, Loss: 0.047
** Validation: Epoch: 1, Loss: 0.022
----------
Training: Epoch: 6, Loss: 0.009
** Validation: Epoch: 6, Loss: 0.008
----------
Training: Epoch: 11, Loss: 0.008
** Validation: Epoch: 11, Loss: 0.008
----------
Training: Epoch: 16, Loss: 0.008
** Validation: Epoch: 16, Loss: 0.008
----------
CPU times: user 4min 14s, sys: 3.15 s, total: 4min 17s
Wall time: 4min 17s
# Plot loss curve
plt.plot(train_loss_array, label='Train loss')
plt.plot(val_loss_array, label='Validation loss')
plt.xlabel('Epoch')
plt.ylabel('MSE Loss')
plt.legend(loc='best');
len(train_output_array)
20
for k in range(0, num_epochs, 4):
    digit_out = val_output_array[k][2].detach().numpy()
    label_out = val_output_array[k][1].detach().numpy()
    reconst_out = val_output_array[k][4].detach().numpy()

    fig, ax = plt.subplots(2, 9, figsize=(9, 2))
    ax = ax.flatten()

    print('Epoch: {}'.format(k))
    for i, item in enumerate(digit_out):
        if i < 9:
            ax[i].imshow(item[0], cmap=cm.binary, interpolation='nearest')
            ax[i].set_title('Digit:{}'.format(label_out[i]))
            ax[i].set_xticks([])
            ax[i].set_yticks([])
    for i, item in enumerate(reconst_out):
        if i < 9:
            ax[i+9].imshow(item[0], cmap=cm.binary, interpolation='nearest')
            ax[i+9].set_xticks([])
            ax[i+9].set_yticks([])
    plt.show()
Epoch: 0
Epoch: 4
Epoch: 8
Epoch: 12
Epoch: 16
With subsequent epochs, the reconstruction of the images becomes better.
img_temp = val_output_array[num_epochs-1][2].detach().numpy()
print(img_temp.shape)

model = model.cpu()

ecode_img, vector_img = model(torch.tensor(img_temp))
print(criterion(ecode_img, torch.tensor(img_temp)).item())
print(ecode_img.shape, vector_img.shape)

vector_img = vector_img.detach().numpy()
print(vector_img.shape)
ecode_img = ecode_img.detach().numpy()
print(ecode_img.shape)
(64, 1, 28, 28)
0.007447564974427223
torch.Size([64, 1, 28, 28]) torch.Size([64, 20])
(64, 20)
(64, 1, 28, 28)
Visualizing the reconstruction of a random validation data digit
fig, ax = plt.subplots(1, 2, figsize=(5, 5))
ax[0].imshow(img_temp[0][0], cmap=cm.binary, interpolation='nearest')
ax[0].set_title('Digit Input')
ax[1].imshow(ecode_img[0][0], cmap=cm.binary, interpolation='nearest')
ax[1].set_title('Reconstructed Input')

ax[0].set_xticks([])
ax[0].set_yticks([])
ax[1].set_xticks([])
ax[1].set_yticks([]);
# Final loss values for each batch of train and validation set
print(len(final_train_loss), len(final_val_loss))
938 157
Visualize the distribution of per-batch losses for the train and validation sets from the last epoch. The statistics of this distribution will be used to set the threshold for the anomaly detection in a later section.
fig, ax = plt.subplots(2, 1, figsize=(10, 5), sharex=True)
ax[0].hist(final_train_loss, bins=100)
ax[0].axvline(x=np.median(final_train_loss), color='red', linestyle='--', linewidth=2.0, label='Median')
ax[0].axvline(x=np.mean(final_train_loss), color='cyan', linestyle='--', linewidth=2.0, label='Mean')

ax[1].hist(final_val_loss, bins=100)
ax[1].axvline(x=np.median(final_val_loss), color='red', linestyle='--', linewidth=2.0, label='Median')
ax[1].axvline(x=np.mean(final_val_loss), color='cyan', linestyle='--', linewidth=2.0, label='Mean')

plt.legend();
print('Mean Train Loss: {:0.6f}'.format(np.mean(final_train_loss)))
print('Mean Val Loss: {:0.6f}'.format(np.mean(final_val_loss)))
Mean Train Loss: 0.007515
Mean Val Loss: 0.007516
threshold_loss = np.mean(final_val_loss) + np.std(final_val_loss)
print('Threshold loss is set at: {:0.6f}'.format(threshold_loss))
Threshold loss is set at: 0.008046
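The mean-plus-one-standard-deviation rule is just one choice; a high percentile of the validation losses is a common alternative (a sketch using the same final_val_loss; the 99th percentile is an illustrative choice):

# Alternative threshold: a high percentile of the validation losses
threshold_loss_p99 = np.percentile(final_val_loss, 99)
print('99th-percentile threshold: {:0.6f}'.format(threshold_loss_p99))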
Visualize latent space
label_collection = np.array([])
latent_space_collection = np.zeros((0, 20))
for i, data_batch in enumerate(val_loader):
    with torch.no_grad():
        data, label = data_batch
        reconstruction, latent_x = model(data)
        label_collection = np.concatenate((label.numpy(), label_collection))
        latent_space_collection = np.vstack((latent_x.numpy(), latent_space_collection))
print(label_collection.shape, latent_space_collection.shape)
(10000,) (10000, 20)
tSNE plots
import openTSNE
print('openTSNE', openTSNE.__version__)
openTSNE 0.6.0
%%time
# BH gradients and PCA initialization by default
Z1 = openTSNE.TSNE(n_jobs=-1, negative_gradient_method='bh').fit(latent_space_collection)
CPU times: user 2min 46s, sys: 2min 11s, total: 4min 57s
Wall time: 39 s
from matplotlib.colors import ListedColormap
import seaborn as sns

cmap = ListedColormap(sns.husl_palette(len(np.unique(label_collection))))

fig, ax = plt.subplots(1, 1, figsize=(10, 10))
im = ax.scatter(Z1[:,0], Z1[:,1], s=10, c=label_collection, cmap=cmap, edgecolor='none')

ax.set_xticks([])
ax.set_yticks([])
ax.set_xlabel('tSNE component 1')
ax.set_ylabel('tSNE component 2')
ax.set_title('t-SNE on Latent Space (MNIST data)')
fig.subplots_adjust(right=0.8)
cbar_ax = fig.add_axes([0.85, 0.25, 0.01, 0.5], label='digit')
cbar = fig.colorbar(im, cax=cbar_ax, label='Digit')
Embedding the 20 latent dimensions in a 2-dimensional tSNE space, the clusters for each digit become quite clear. It is interesting to see that the clusters for 1 and 7 are quite close to one another, and similarly those for 5 and 3, digit pairs that have traditionally been challenging for classifiers to distinguish.
Linear interpolation in the latent space
print(latent_space_collection.shape)

# Select random points for the start and end of the trajectory
start_point_index = 4121
end_point_index = 9832

start_point_latent_vectors = latent_space_collection[start_point_index]
end_point_latent_vectors = latent_space_collection[end_point_index]
(10000, 20)
num_steps = 10
trajectory_points = np.zeros((0, 20))
for i in range(num_steps):
    z = start_point_latent_vectors * i/num_steps + end_point_latent_vectors * (num_steps - i)/num_steps
    trajectory_points = np.vstack((z, trajectory_points))
print(trajectory_points.shape)
(10, 20)
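The same trajectory can be built in one shot with np.linspace (a sketch; it matches the loop above up to row ordering and exact endpoint inclusion):

# One-shot interpolation: linspace weights all 20 latent dimensions at once
alphas = np.linspace(0, 1, num_steps)[:, None]  # shape (10, 1)
trajectory_alt = (1 - alphas) * start_point_latent_vectors + alphas * end_point_latent_vectors
print(trajectory_alt.shape)  # (10, 20)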
trajectory_latent_tensor = torch.tensor(trajectory_points)

reconstruction_images = model.decoder(trajectory_latent_tensor.float(), trajectory_latent_tensor.shape[0])
reconstruction_images.shape
torch.Size([10, 1, 28, 28])
reconstruction_images = reconstruction_images.detach()
fig, ax = plt.subplots(1, num_steps, figsize=(num_steps+10, 4))
ax = ax.flatten()

for i in range(0, reconstruction_images.shape[0]):
    ax[i].imshow(reconstruction_images[i][0], cmap=cm.binary, interpolation='nearest')
    ax[i].set_title('Step: {}'.format(i+1))
    ax[i].set_xticks([]); ax[i].set_yticks([])
Visualizing the image generated from each latent vector along the linear path shows a smooth transition between the two end points.
Denoising images
An autoencoder trained on clean images can be used to remove noise from inputs outside the training set.
# Add artificial noise to a random image
rand_idx = 420
digit_image, label = val_dataset[rand_idx]
digit_image = digit_image[0]

plt.imshow(digit_image)
plt.title('Image of {}'.format(label))
plt.axis('off');
Add Gaussian noise to the image. Code taken from GitHub.
random_noise = np.random.randn(digit_image.shape[0], digit_image.shape[1])
digit_image_noise = np.clip(digit_image + random_noise * 0.2, 0, 1)

plt.imshow(digit_image_noise)
plt.title('Image of {}'.format(label))
plt.axis('off');

digit_input_tensor = torch.tensor(digit_image_noise[np.newaxis, np.newaxis, ...]).float();
/depot/jgreeley/apps/envs/gpu_env1/lib/python3.7/site-packages/ipykernel_launcher.py:1: UserWarning: To copy construct from a tensor, it is recommended to use sourceTensor.clone().detach() or sourceTensor.clone().detach().requires_grad_(True), rather than torch.tensor(sourceTensor).
"""Entry point for launching an IPython kernel.
digit_input_tensor.shape
torch.Size([1, 1, 28, 28])
predicted_image, _ = model(digit_input_tensor)
predicted_image = predicted_image.detach().numpy()

plt.imshow(predicted_image[0][0])
plt.title('Image of {}'.format(label))
plt.axis('off');
The results are much better with a variational autoencoder, which is more robust to noise in the input data since its latent space is less sparse thanks to the variational part.
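For reference, the 'variational part' replaces the deterministic bottleneck with a sampled one; a minimal reparameterization sketch, where the mean and log-variance tensors are stand-ins for what a VAE encoder would output:

# Reparameterization sketch (illustrative stand-ins for encoder outputs)
mu, logvar = torch.zeros(1, 20), torch.zeros(1, 20)
z = mu + torch.exp(0.5 * logvar) * torch.randn_like(mu)  # sample around the mean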
Anomaly Detection
def reconstruction_loss(input_image, _model=model, _criterion=criterion, plot=True):
    model = _model.cpu()
    input_image_tensor = torch.tensor(input_image)

    encoded_image, _ = model(input_image_tensor)

    loss_value = _criterion(encoded_image, input_image_tensor).item()

    encoded_image = encoded_image.detach().numpy()

    if plot:
        fig, ax = plt.subplots(1, 3, figsize=(10, 5))
        ax[0].imshow(input_image[0][0], cmap=cm.binary, interpolation='nearest')
        ax[0].set_title('Input Image')

        ax[1].imshow(encoded_image[0][0], cmap=cm.binary, interpolation='nearest')
        ax[1].set_title('Reconstructed Input')

        ax[2].imshow(input_image[0][0] - encoded_image[0][0], cmap=cm.binary, interpolation='nearest')
        ax[2].set_title('Image Difference')

        ax[0].set_xticks([]); ax[0].set_yticks([])
        ax[1].set_xticks([]); ax[1].set_yticks([])
        ax[2].set_xticks([]); ax[2].set_yticks([])

    return(loss_value)
random_entry_from_val_set = val_output_array[num_epochs-4][2].detach().numpy()[0]
print(random_entry_from_val_set.shape)
(1, 28, 28)
input_image = random_entry_from_val_set[np.newaxis, ...]
print(input_image.shape)
(1, 1, 28, 28)
loss_value = reconstruction_loss(input_image)
compare_value = ('Higher' if loss_value > threshold_loss else 'lower')
anomaly_tag = ('anomaly' if compare_value == 'Higher' else 'not an Anomaly')
print('Loss value is {0:6f} which is {1} than set threshold, so this image is {2}'.format(loss_value, compare_value, anomaly_tag))
Loss value is 0.001904 which is lower than set threshold, so this image is not an Anomaly
Example
plt.imshow(input_image[0][0])
plt.axis('off');
# Rotate by 90 degrees:
temp_image_rotate = np.rot90(input_image, k=1, axes=(2, 3)).copy()  # .copy() gets rid of the negative-stride error
plt.imshow(temp_image_rotate[0][0])
plt.axis('off');
loss_value = reconstruction_loss(temp_image_rotate)
compare_value = ('Higher' if loss_value > threshold_loss else 'lower')
anomaly_tag = ('anomaly' if compare_value == 'Higher' else 'not an Anomaly')
print('Loss value is {0:6f} which is {1} than set threshold, so this image is {2}'.format(loss_value, compare_value, anomaly_tag))
Loss value is 0.028646 which is Higher than set threshold, so this image is anomaly