
Implement and Train LSTM-based Models and Transformer-based Models



In this task, you are asked to implement and train LSTM-based models and Transformer-based models to solve a time-series data classification problem.


All changes should be made in models.py and train_inference.py, and you should modify ONLY the code parts that carry the TODO flag. Run the code in task5.ipynb to complete the tasks. Use the LaTeX template provided in report/report.tex; the report must contain ONLY TWO pages, and anything that exceeds two pages will be ignored.


Setup

It is recommended to use GPUs. If you want to use the free GPUs provided by Google Colab, first download task5.zip, unzip it, and upload the Assignment5 directory to your Google Drive. If you are using Colab, run the first “Setup” block in task5.ipynb; otherwise, skip it. You can use Overleaf to edit report.tex.


Task 1

In this task, you need to train LSTM-based models to solve a time-series data classification problem. First, we introduce the datasets; then, we provide the detailed requirements.


Datasets.

The dataset is the Electrocardiogram (ECG) Heartbeat Categorization Dataset, which contains heartbeat signals for exploring the categorization of heartbeats. It is composed of two benchmark datasets for heartbeat classification: the MIT-BIH Arrhythmia dataset and the PTB Diagnostic ECG dataset. The MIT-BIH dataset contains 109,446 samples in 5 categories, and the PTB dataset contains 14,552 samples in 2 categories. Each sample is an ECG signal of size [1, 187]. You can use the utils.vis_ECG function to visualize the ECG data. The dataset can be downloaded via this link; put the “dataset” directory into the “Assignment5” directory. Please refer to this paper if you want to know more about the datasets.
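
As a quick sanity check, one way to plot a single sample (a sketch that assumes the loader and vis_ECG function defined in utils.py at the end of this post, with the CSVs under ./dataset):

import utils

loader = utils.load_mit_datasets(data_path='./dataset/mitbih_train.csv', batch_size=1)
X, y = next(iter(loader))  # first sample and its label
utils.vis_ECG(X[0].numpy(), 'Class {}'.format(y[0].item()))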


Implementation details.

Design your own LSTM-based model. The LSTM-based model must contain an LSTM module. Hint: you can use the function nn.LSTM().
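
A minimal shape check for nn.LSTM (a sketch, not a prescribed design: it assumes each [1, 187] sample is treated as a length-187 sequence of 1-dimensional features, and the layer sizes are illustrative):

import torch
import torch.nn as nn

lstm = nn.LSTM(input_size=1, hidden_size=64, num_layers=2, batch_first=True)
x = torch.randn(32, 187, 1)  # [batch, seq_len, input_size]
out, (h_n, c_n) = lstm(x)
print(out.shape)  # torch.Size([32, 187, 64]), per-step hidden states
print(h_n.shape)  # torch.Size([2, 32, 64]), final hidden state per layer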


Objective. You need to train LSTM-based models on the full training set and try to obtain at least 97.5% accuracy on the MIT-BIH dataset and 96% accuracy on the PTB dataset. Report the best test accuracy evaluated on the test sets of both MIT-BIH and PTB, and describe the methods you used to improve performance in the report.


Task 2

In this task, you need to train Transformer-based models for heartbeat classification.


Implementation details. The Transformer-based model must contain a Transformer-based encoder module. Hint: you can use the PyTorch package to implement the Transformer-based encoder.
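
For reference, a minimal sketch with nn.TransformerEncoderLayer and nn.TransformerEncoder (the d_model and nhead values are illustrative assumptions; batch_first requires PyTorch 1.9 or later):

import torch
import torch.nn as nn

layer = nn.TransformerEncoderLayer(d_model=64, nhead=4, batch_first=True)
encoder = nn.TransformerEncoder(layer, num_layers=2)
x = torch.randn(32, 187, 64)  # [batch, seq_len, d_model]
print(encoder(x).shape)  # torch.Size([32, 187, 64])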


Objective.

  • You need to train Transformer-based models on the full training set and try to obtain at least 97.5% accuracy on the MIT-BIH dataset and 96.5% accuracy on the PTB dataset. Report the best test accuracy evaluated on the test sets of both MIT-BIH and PTB, and describe the methods you used to improve performance in the report.

  • Report the best test accuracy of both Transformer-based models and LSTM-based models trained on MIT-BIH training sets of different sizes, evaluated on the full MIT-BIH test set for each subset size, and provide your observations and analyses in the report. Note that the variable “subset_percent” in the code means that the current training set is a subset_percent fraction of the full training set (e.g., subset_percent=0.1 keeps the first 10% of each class).


Implementation

task5.ipynb

import numpy as np
import random
import torch
import torch.nn as nn
import torch.nn.functional as F
import pandas as pd
import matplotlib.pyplot as plt
import os

import utils
from train_inference import train_lstm, train_transformer
from models import LSTM_based_Classifier, Transformer_based_classifier
torch.manual_seed(0)
random.seed(0)
np.random.seed(0)
torch.backends.cudnn.deterministic = True
os.environ["CUDA_VISIBLE_DEVICES"] = '0'
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
TRAIN = True

model_dir = './checkpoint'
if not os.path.exists(model_dir):
    os.makedirs(model_dir)

Task 1 Train LSTM-based models on the MIT-BIH and PTB datasets.

net = LSTM_based_Classifier(class_num=5).double().to(device)
LSTM_mit_full = train_lstm(net, subset_percent=1, dataset_type='MIT_BIH', device=device, verbose=1, TRAIN=TRAIN)
net = LSTM_based_Classifier(class_num=2).double().to(device)
LSTM_ptb_full = train_lstm(net, subset_percent=1, dataset_type='PTB', device=device, verbose=1, TRAIN=TRAIN)
subset_percent = [0.01, 0.05, 0.1, 0.2, 0.5, 1.0]
LSTM_best_acc_subset = []

for sp in subset_percent:
    net = LSTM_based_Classifier(class_num=5).double().to(device)
    print('##################################### Subset ratio: {} #####################################'.format(sp))
    LSTM_best_acc_subset.append(train_lstm(net=net, subset_percent=sp, dataset_type='MIT_BIH', device=device, verbose=1, TRAIN=TRAIN))


Task 2 Train Transformer-based models on the MIT-BIH and PTB datasets.

net = Transformer_based_classifier(class_num=5).double().to(device)
Transformer_mit_full = train_transformer(net, subset_percent=1, dataset_type='MIT_BIH', device=device, verbose=1, TRAIN=TRAIN)
net = Transformer_based_classifier(class_num=2).double().to(device)
Transformer_ptb_full = train_transformer(net, subset_percent=1, dataset_type='PTB', device=device, verbose=1, TRAIN=TRAIN)

Train Transformer-based models on the MIT-BIH datasets with different sizes.

subset_percent = [0.01, 0.05, 0.1, 0.2, 0.5, 1.0]
Transformer_best_acc_subset = []

for sp in subset_percent:
    net = Transformer_based_classifier(class_num=5).double().to(device)
    print('##################################### Subset ratio: {} #####################################'.format(sp))
    Transformer_best_acc_subset.append(train_transformer(net=net, subset_percent=sp, dataset_type='MIT_BIH', device=device, verbose=1, TRAIN=TRAIN))
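
To compare the two architectures for the report, one could plot the collected accuracies against the subset sizes (a sketch assuming both subset loops above have been run):

plt.figure()
plt.plot(subset_percent, LSTM_best_acc_subset, marker='o', label='LSTM')
plt.plot(subset_percent, Transformer_best_acc_subset, marker='s', label='Transformer')
plt.xlabel('Fraction of MIT-BIH training set')
plt.ylabel('Best test accuracy')
plt.legend()
plt.show()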

RUN the following blocks to save results for auto-grading

net = LSTM_based_Classifier(class_num=5).double().to(device)
LSTM_mit_full_test = train_lstm(net, subset_percent=1, dataset_type='MIT_BIH', device=device, verbose=1, TRAIN=False)
net = LSTM_based_Classifier(class_num=2).double().to(device)
LSTM_ptb_full_test = train_lstm(net, subset_percent=1, dataset_type='PTB', device=device, verbose=1, TRAIN=False)
subset_percent = [0.01, 0.05, 0.1, 0.2, 0.5, 1.0]
LSTM_best_acc_subset_test = []

for sp in subset_percent:
    net = LSTM_based_Classifier(class_num=5).double().to(device)
    print('##################################### Subset ratio: {} #####################################'.format(sp))
    LSTM_best_acc_subset_test.append(train_lstm(net=net, subset_percent=sp, dataset_type='MIT_BIH', device=device, verbose=1, TRAIN=False))
net = Transformer_based_classifier(class_num=5).double().to(device)
Transformer_mit_full_test = train_transformer(net, subset_percent=1, dataset_type='MIT_BIH', device=device, verbose=1, TRAIN=False)
net = Transformer_based_classifier(class_num=2).double().to(device)
Transformer_ptb_full_test = train_transformer(net, subset_percent=1, dataset_type='PTB', device=device, verbose=1, TRAIN=False)
subset_percent = [0.01, 0.05, 0.1, 0.2, 0.5, 1.0]
Transformer_best_acc_subset_test = []

for sp in subset_percent:
    net = Transformer_based_classifier(class_num=5).double().to(device)
    print('##################################### Subset ratio: {} #####################################'.format(sp))
    Transformer_best_acc_subset_test.append(train_transformer(net=net, subset_percent=sp, dataset_type='MIT_BIH', device=device, verbose=1, TRAIN=False))
import pickle
result_pkl = 'result.pkl'
result = dict()
result.update({'LSTM_MIT_Full': LSTM_mit_full_test})
result.update({'LSTM_PTB_Full': LSTM_ptb_full_test})
result.update({'Transformer_MIT_Full': Transformer_mit_full_test})
result.update({'Transformer_PTB_Full': Transformer_ptb_full_test})
result.update({'LSTM_best_acc_subset': LSTM_best_acc_subset_test})
result.update({'Transformer_best_acc_subset': Transformer_best_acc_subset_test})
with open(result_pkl, 'wb') as f:
    pickle.dump(result, f)  # the with-block closes the file automatically

# NOTE: The TA will check the test accuracy of your submitted models using the scripts above.
# There will be a penalty if the results obtained by the TA differ significantly from those in your submitted pickle file.
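
Before submitting, a quick optional sanity check that the pickle contains the keys the auto-grader expects:

with open(result_pkl, 'rb') as f:
    saved = pickle.load(f)
print(sorted(saved.keys()))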


models.py

import numpy as np
import random
import torch
import torch.nn as nn
import torch.nn.functional as F
import pandas as pd
import time
import matplotlib.pyplot as plt
import os

# Hyper-parameter configuration
#############
# TODO
hidden_size = ...
num_layers = ...
# Add hyper-parameters you need to define.
#############

class LSTM_based_Classifier(nn.Module):
    #############
    # TODO
    def __init__(self, class_num):
        super(LSTM_based_Classifier, self).__init__()
        self.lstm = ...

    def forward(self, x):

        return 0
    #############


class Transformer_based_classifier(nn.Module):
    ############
    #TODO: design Transformer-based model
    def __init__(self, class_num):
        super(Transformer_based_classifier, self).__init__()
        
        self.transformer_encoder_layer = ...
        self.transformer_encoder = ...

        
    def forward(self, x):
        return 0
    ###############
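
For orientation only, one plausible completion of the two skeletons (a hedged sketch, not the required solution: it assumes batch-first inputs of shape [batch, 187], treats each time step as a 1-dimensional feature, and picks illustrative layer sizes; batch_first on nn.TransformerEncoderLayer requires PyTorch 1.9 or later):

import torch
import torch.nn as nn

class LSTM_based_Classifier(nn.Module):
    def __init__(self, class_num, hidden_size=128, num_layers=2):
        super(LSTM_based_Classifier, self).__init__()
        self.lstm = nn.LSTM(input_size=1, hidden_size=hidden_size,
                            num_layers=num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, class_num)

    def forward(self, x):
        out, _ = self.lstm(x.unsqueeze(-1))  # [batch, 187, hidden_size]
        return self.fc(out[:, -1, :])  # classify from the last time step

class Transformer_based_classifier(nn.Module):
    def __init__(self, class_num, d_model=64, nhead=4, num_layers=2):
        super(Transformer_based_classifier, self).__init__()
        self.embed = nn.Linear(1, d_model)  # project each scalar step to d_model
        self.transformer_encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model, nhead=nhead, batch_first=True)
        self.transformer_encoder = nn.TransformerEncoder(
            self.transformer_encoder_layer, num_layers=num_layers)
        self.fc = nn.Linear(d_model, class_num)

    def forward(self, x):
        h = self.transformer_encoder(self.embed(x.unsqueeze(-1)))  # [batch, 187, d_model]
        return self.fc(h.mean(dim=1))  # mean-pool over time steps

Mean-pooling is only one aggregation choice; a learned classification token or the last position would also work, and adding positional encodings (omitted here for brevity) would likely help the Transformer.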

train_inference.py

import numpy as np
import random
import torch
import torch.nn as nn
import torch.nn.functional as F
import pandas as pd
import time
import matplotlib.pyplot as plt
import os
import utils


def train_lstm(net, subset_percent, dataset_type, device, verbose=0, TRAIN=True):
    # Hyperparameter configuration
    #############
    # TODO
    n_epochs = ...
    learning_rate = ...
    batch_size = ...
    # Add hyperparameters
    #############

    if dataset_type == 'MIT_BIH':
        train_loader = utils.load_mit_datasets(data_path='./dataset/mitbih_train.csv', batch_size=batch_size, subset_percent=subset_percent)
        test_loader = utils.load_mit_datasets(data_path='./dataset/mitbih_test.csv', batch_size=batch_size, shuffle=False)
    elif dataset_type == 'PTB':
        train_loader, test_loader = utils.load_ptb_datasets(data_dir='./dataset', batch_size=batch_size)

    #############
    #TODO: define the loss function
    criterion = ...
    #############

    def eval_on_set(net, loader):
        running_loss=0
        num_batches=0 
        correct = 0
        net.eval()
        
        for batch_idx, (data, target) in enumerate(loader):
            data, target = data.to(device), target.to(device)

            #############
            #TODO: implement the test function
            #
            scores = ...
            loss = ...
            #
            #############

            num_batches += 1
            running_loss += loss.item() 
            pred = scores.max(1, keepdim=True)[1]
            correct += pred.eq(target.view_as(pred)).sum().item()
        total_loss = running_loss/num_batches 
        acc = correct / len(loader.dataset)
        return acc, total_loss

    best_acc = 0

    #############
    #TODO: define the optimizer
    optimizer = ...
    #############
    
    if TRAIN:
        for epoch in range(n_epochs):
            start=time.time()
            running_loss=0
            num_batches=0    
            net.train()
            
            for batch_idx, (data, target) in enumerate(train_loader):
                data, target = data.to(device), target.to(device)

                #############
                #TODO: implement the training procedure
                #
                loss = ...
                #
                #############

                running_loss += loss.item()
                num_batches += 1
            total_loss = running_loss/num_batches
            elapsed = time.time()-start
            train_acc, train_loss = eval_on_set(net, train_loader)
            test_acc, test_loss = eval_on_set(net, test_loader)
            if test_acc > best_acc:
                best_acc = test_acc
                torch.save(net.state_dict(), './checkpoint/LSTM_{}_{}.pt'.format(dataset_type, subset_percent))
            
            if verbose:
                print('################################################################################')
                print('Epoch={}\tTime={}\tLoss={}'.format(epoch+1, elapsed, total_loss))
                print('Training Acc={}\tLoss={}'.format(train_acc, train_loss))
                print('Test Acc={}\tLoss={}'.format(test_acc, test_loss))
                print('Best Test Acc: {}'.format(best_acc))
    else:
        model_name = './checkpoint/LSTM_{}_{}.pt'.format(dataset_type, subset_percent)
        net.load_state_dict(torch.load(model_name))
        test_acc, test_loss = eval_on_set(net, test_loader)
        print('Test Acc={}\tLoss={}'.format(test_acc, test_loss))
        best_acc = test_acc  # report the loaded checkpoint's accuracy when not training
    return best_acc
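
For reference, a self-contained sketch of how the TODO pieces above (loss, optimizer, training step, evaluation step) typically fit together; the stand-in model and hyper-parameter values are illustrative assumptions, and the same pattern applies verbatim to train_transformer below:

import torch
import torch.nn as nn

net = nn.Linear(187, 5).double()  # stand-in for the real classifier
criterion = nn.CrossEntropyLoss()  # a natural loss for multi-class classification
optimizer = torch.optim.Adam(net.parameters(), lr=1e-3)

data = torch.randn(8, 187, dtype=torch.double)  # fake batch of 8 ECG-sized samples
target = torch.randint(0, 5, (8,))

# Training step (the body of the inner loop over train_loader):
optimizer.zero_grad()
scores = net(data)
loss = criterion(scores, target)
loss.backward()
optimizer.step()

# Evaluation step (the body of eval_on_set), without gradient tracking:
with torch.no_grad():
    scores = net(data)
    loss = criterion(scores, target)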

        

def train_transformer(net, subset_percent, dataset_type, device, verbose=0, TRAIN=True):
    # Hyperparameter configuration
    #############
    # TODO
    n_epochs = ...
    learning_rate = ...
    batch_size = ...
    # Add hyperparameters
    #############

    if dataset_type == 'MIT_BIH':
        train_loader = utils.load_mit_datasets(data_path='./dataset/mitbih_train.csv', batch_size=batch_size, subset_percent=subset_percent)
        test_loader = utils.load_mit_datasets(data_path='./dataset/mitbih_test.csv', batch_size=batch_size, shuffle=False)
    elif dataset_type == 'PTB':
        train_loader, test_loader = utils.load_ptb_datasets(data_dir='./dataset', batch_size=batch_size)

    #############
    #TODO: define the loss function
    criterion = ...
    #############

    def eval_on_set(net, loader):
        running_loss=0
        num_batches=0 
        correct = 0
        net.eval()
        
        for batch_idx, (data, target) in enumerate(loader):
            data, target = data.to(device), target.to(device)

            #############
            #TODO: implement the test function
            #
            scores = ...
            loss = ...
            #
            #############
            
            num_batches += 1
            running_loss += loss.item() 
            pred = scores.max(1, keepdim=True)[1]
            correct += pred.eq(target.view_as(pred)).sum().item()
        total_loss = running_loss/num_batches 
        acc = correct / len(loader.dataset)
        return acc, total_loss

    best_acc = 0

    #############
    #TODO: define the optimizer
    optimizer = ...
    #############
    
    if TRAIN:
        for epoch in range(n_epochs):
            start=time.time()
            running_loss=0
            num_batches=0    
            net.train()
            
            for batch_idx, (data, target) in enumerate(train_loader):
                data, target = data.to(device), target.to(device)
                #############
                #TODO: implement the training procedure
                #
                loss = ...
                #
                #############

                running_loss += loss.item()
                num_batches += 1
            total_loss = running_loss/num_batches
            elapsed = time.time()-start
            train_acc, train_loss = eval_on_set(net, train_loader)
            test_acc, test_loss = eval_on_set(net, test_loader)
            if test_acc > best_acc:
                best_acc = test_acc
                torch.save(net.state_dict(), './checkpoint/Transformer_{}_{}.pt'.format(dataset_type, subset_percent))
            
            if verbose:
                print('################################################################################')
                print('Epoch={}\tTime={}\tLoss={}'.format(epoch+1, elapsed, total_loss))
                print('Training Acc={}\tLoss={}'.format(train_acc, train_loss))
                print('Test Acc={}\tLoss={}'.format(test_acc, test_loss))
                print('Best Test Acc: {}'.format(best_acc))
    else:
        model_name = './checkpoint/Transformer_{}_{}.pt'.format(dataset_type, subset_percent)
        net.load_state_dict(torch.load(model_name))
        test_acc, test_loss = eval_on_set(net, test_loader)
        print('Test Acc={}\tLoss={}'.format(test_acc, test_loss))
        best_acc = test_acc  # report the loaded checkpoint's accuracy when not training

    return best_acc

utils.py

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import pandas as pd
import matplotlib.pyplot as plt
import os

def load_mit_datasets(data_path, batch_size, subset_percent=1, shuffle=True):
    data = pd.read_csv(data_path,header=None,index_col=False)
    X = torch.DoubleTensor(data.iloc[:,:-1].values)
    y = torch.LongTensor(data.iloc[:,-1].values)
    category = torch.unique(y)
    subset_index = []
    for _ in range(len(category)):
        subset_index.append([])
    for i in range(len(y)):
        subset_index[y[i].item()].append(i)
    num_class = [0]*len(category)
    for c in category:
        subset_index[c] = subset_index[c][:int(len(subset_index[c]) * subset_percent)]
        num_class[c] = len(subset_index[c])
    subset_index = [i for i_list in subset_index for i in i_list]
    dataset = torch.utils.data.TensorDataset(X[subset_index], y[subset_index])
    dataloader = torch.utils.data.DataLoader(dataset, batch_size=batch_size, shuffle=shuffle)
    print('MIT BIH Dataset Info. Number of data:{}. Number of data in each class:{}'.format(len(subset_index), num_class))
    return dataloader

def load_ptb_datasets(data_dir, batch_size, split_percent=0.7):
    on_data = pd.read_csv(os.path.join(data_dir, 'ptbdb_abnormal.csv'),header=None,index_col=False)
    n_data = pd.read_csv(os.path.join(data_dir, 'ptbdb_normal.csv'),header=None,index_col=False)
    on_X = torch.DoubleTensor(on_data.iloc[:,:-1].values)
    on_y = torch.zeros(size=(len(on_X), 1 )).long().squeeze()
    n_X = torch.DoubleTensor(n_data.iloc[:,:-1].values)
    n_y = torch.ones(size=(len(n_X), 1 )).long().squeeze()

    X_train = torch.cat((on_X[:int(len(on_X) * split_percent)], n_X[:int(len(n_X) * split_percent)]), dim=0)
    Y_train = torch.cat((on_y[:int(len(on_X) * split_percent)], n_y[:int(len(n_X) * split_percent)]), dim=0)
    X_test = torch.cat((on_X[int(len(on_X) * split_percent):], n_X[int(len(n_X) * split_percent):]), dim=0)
    Y_test = torch.cat((on_y[int(len(on_X) * split_percent):], n_y[int(len(n_X) * split_percent):]), dim=0)

    train_dataset = torch.utils.data.TensorDataset(X_train, Y_train)
    test_dataset = torch.utils.data.TensorDataset(X_test, Y_test)
    train_dataloader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    test_dataloader = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

    print('PTB Dataset Info. Number of training data:{}. Number of test data:{}.'.format(len(train_dataset), len(test_dataset)))
    return train_dataloader, test_dataloader
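
Example usage of the PTB loader (assuming the CSVs are under ./dataset): it builds a 70/30 per-class split, labeling abnormal beats 0 and normal beats 1:

train_loader, test_loader = load_ptb_datasets(data_dir='./dataset', batch_size=128)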

def vis_ECG(X,y):
    plt.figure()
    plt.plot(np.arange(0,187),X)
    plt.title(y)
    plt.show()
