ANLP_WS24_CA2/transformer_1a.py

240 lines
7.3 KiB
Python

"""
This file contains the transformer model.
"""
# TODO refactor the code
# TODO create ml helper script
# TODO create ml evaluation script
# TODO track overfitting better
# TODO validate model in training (accuracy, loss, etc)
# TODO set length to a constant value which is the max length of the sentences or nearly
# TODO user gloVe embeddings
#TODO: add attention mask
# TODO: add positional encoding
#TODO: add dropout (if needed)
import torch
import torch.nn as nn
import torch.optim as optim
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from nltk.tokenize import word_tokenize
from transformers import BertTokenizer, BertModel
from torch.utils.data import DataLoader
from transformers import AdamW
from sklearn.metrics import accuracy_score
import gensim
import time
# Disable the warning for beta transformers
import torchvision
torchvision.disable_beta_transforms_warning()
def get_device(verbose=False):
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
if verbose:
print('Using device:', device)
return device
# Test if GPU is available
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print('Using device:', DEVICE)
# Input maximum length
MAX_LEN = 100
# download nltk data
import nltk
nltk.download('punkt')
nltk.download('punkt_tab')
def get_embedding(model, word):
if word in model.wv:
return model.wv.key_to_index[word]
else:
return unk_index
def encode_tokens(tokens):
return [get_embedding(model_embedding, token) for token in tokens]
def pad_sequences(sequences, MAX_LEN):
return np.array([np.pad(seq, (0, MAX_LEN - len(seq)), mode='constant', constant_values=unk_index) if len(seq) < MAX_LEN else seq[:MAX_LEN] for seq in sequences])
class HumorDataset(torch.utils.data.Dataset):
def __init__(self, encodings, labels):
self.encodings = encodings
self.labels = labels.reset_index(drop=True)
def __getitem__(self, idx):
item = {'input_ids': torch.tensor(self.encodings[idx], dtype=torch.float)}
item['labels'] = torch.tensor(self.labels[idx], dtype=torch.float)
return item
def __len__(self):
return len(self.labels)
class TransformerBinaryClassifier(nn.Module):
def __init__(self, vocab_size, embed_dim, num_heads, num_layers, hidden_dim, dropout=0.1):
super(TransformerBinaryClassifier, self).__init__()
self.embedding = nn.Embedding(vocab_size, embed_dim)
self.transformer = nn.Transformer(embed_dim, num_heads, num_layers, num_layers, hidden_dim, dropout)
self.fc = nn.Linear(embed_dim, 1)
self.sigmoid = nn.Sigmoid()
def forward(self, input_ids):
input_ids = input_ids.long()
embedded = self.embedding(input_ids)
transformer_output = self.transformer(embedded, embedded)
pooled_output = transformer_output.mean(dim=1)
logits = self.fc(pooled_output)
return self.sigmoid(logits)
if __name__ == "__main__":
# Load the data from csv
df = pd.read_csv('data/hack.csv')
print(df.shape)
# transfrom data into dataset
X = df['text']
y = df['is_humor']
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# Tokenize the data with nltk
train_tokens = [word_tokenize(text.lower()) for text in X_train]
test_tokens = [word_tokenize(text.lower()) for text in X_test]
# Embed the data with word2vec
model_embedding = gensim.models.Word2Vec(train_tokens, window=5, min_count=1, workers=4)
# Add a special token for out-of-vocabulary words
model_embedding.wv.add_vector('<UNK>', np.zeros(model_embedding.vector_size))
unk_index = model_embedding.wv.key_to_index['<UNK>']
# Encode the tokens
train_encodings = [encode_tokens(tokens) for tokens in train_tokens]
test_encodings = [encode_tokens(tokens) for tokens in test_tokens]
# Define the maximum sequence length
train_encodings = pad_sequences(train_encodings, MAX_LEN)
test_encodings = pad_sequences(test_encodings, MAX_LEN)
train_dataset = HumorDataset(train_encodings, y_train.reset_index(drop=True))
test_dataset = HumorDataset(test_encodings, y_test.reset_index(drop=True))
vocab_size = len(model_embedding.wv.key_to_index)
embed_dim = model_embedding.vector_size
num_heads = 2
num_layers = 2
hidden_dim = 256
print(f"Vocabulary size: {vocab_size}")
print(f"Embedding dimension: {embed_dim}")
model = TransformerBinaryClassifier(vocab_size, embed_dim, num_heads, num_layers, hidden_dim)
# Training parameters
epochs = 30 #3
batch_size = 8
learning_rate = 2e-5
# Optimizer and loss function
optimizer = AdamW(model.parameters(), lr=learning_rate)
criterion = nn.BCEWithLogitsLoss()
# Data loaders
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
for td in train_dataset:
print(td['input_ids'].shape)
print(td['labels'])
break
for batch in train_loader:
print(batch['input_ids'].shape)
print(batch['labels'])
break
# Model to device
model.to(DEVICE)
print("Starting training...")
start_training_time = time.time()
losses = []
# Training loop
model.train()
for epoch in range(epochs):
epoch_start_time = time.time()
batch_losses = []
for batch in train_loader:
optimizer.zero_grad()
input_ids = batch['input_ids'].to(DEVICE)
labels = batch['labels'].unsqueeze(1).to(DEVICE)
outputs = model(input_ids)
loss = criterion(outputs, labels)
loss.backward()
optimizer.step()
batch_losses.append(loss.item())
losses.append(np.mean(batch_losses))
epoch_end_time = time.time()
print(f"Epoch {epoch + 1}/{epochs}, Time: {epoch_end_time - epoch_start_time:.2f} sec, Loss: {losses[-1]:.5f}")
end_training_time = time.time()
print(f"Training finished in {end_training_time - start_training_time:.2f} seconds")
print("Starting evaluation...")
# Evaluation
model.eval()
predictions, true_labels = [], []
with torch.no_grad():
for batch in test_loader:
input_ids = batch['input_ids'].to(DEVICE)
labels = batch['labels'].unsqueeze(1).to(DEVICE)
outputs = model(input_ids)
preds = outputs.round()
predictions.extend(preds.cpu().numpy())
true_labels.extend(labels.cpu().numpy())
accuracy = accuracy_score(true_labels, predictions)
print(f"Accuracy: {accuracy}")
# Save the model
timestamp = time.strftime("%Y%m%d-%H%M%S")
torch.save(model.state_dict(), f'models/transformer_acc_{accuracy}_{timestamp}.pth')
print("Model saved.")
# Save model hyperparameters as json
hyperparameters = {
'max_len': MAX_LEN,
'vocab_size': vocab_size,
'embed_dim': embed_dim,
'num_heads': num_heads,
'num_layers': num_layers,
'hidden_dim': hidden_dim,
'epochs': epochs,
'batch_size': batch_size,
'learning_rate': learning_rate,
'accuracy': accuracy
}
pd.DataFrame(hyperparameters, index=[0]).to_json(f'models/transformer_acc_{accuracy}_{timestamp}.json')