# ANLP_WS24_CA2/transformer_bootstrap_agg.py
#
# 281 lines
# 9.8 KiB
# Python

import math
import random

import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from torch.utils.data import DataLoader, Subset

import Datasets
import dataset_helper
import EarlyStopping
import ml_helper
import ml_history
import ml_train
# Single seed shared by every random number generator this script touches.
SEED = 501

# Seed Python's RNG, NumPy's RNG and PyTorch (CPU plus every CUDA device).
for _seed_fn in (
    random.seed,
    np.random.seed,
    torch.manual_seed,
    torch.cuda.manual_seed_all,
):
    _seed_fn(SEED)
del _seed_fn  # keep the module namespace identical to the explicit version

# Restrict cuDNN to deterministic algorithms.
torch.backends.cudnn.deterministic = True
class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position information to a batch-first input.

    Adapted from
    https://pytorch.org/tutorials/beginner/transformer_tutorial.html

    Args:
        d_model: Size of the last (feature) dimension of the input. Odd
            values are supported.
        vocab_size: Number of positions that are precomputed, i.e. the
            longest sequence ``forward`` can handle. The parameter name is
            kept for backward compatibility with existing callers.
        dropout: Dropout probability applied after adding the encoding.
    """

    def __init__(self, d_model, vocab_size=5000, dropout=0.1):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)

        pe = torch.zeros(vocab_size, d_model)
        position = torch.arange(0, vocab_size, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(
            torch.arange(0, d_model, 2).float()
            * (-math.log(10000.0) / d_model)
        )
        angles = position * div_term

        # Even feature columns carry the sine, odd columns the cosine.
        pe[:, 0::2] = torch.sin(angles)
        # For an odd d_model there is one cosine column fewer than sine
        # columns, so the surplus frequency is dropped instead of failing
        # with a shape mismatch.
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])

        # Leading singleton dimension lets the table broadcast over the batch.
        pe = pe.unsqueeze(0)
        # A buffer moves with the module across devices but is not trained.
        self.register_buffer("pe", pe)

    def forward(self, x):
        """Return ``x`` plus the encoding of its positions, after dropout.

        Args:
            x: Tensor of shape ``(batch, seq_len, d_model)``; ``seq_len`` must
                not exceed the number of precomputed positions.
        """
        x = x + self.pe[:, : x.size(1), :]
        return self.dropout(x)
class TransformerBinaryClassifier(nn.Module):
    """Transformer encoder over pretrained embeddings, one output per text.

    Despite the class name, ``forward`` applies no activation after the final
    linear layer, so the model emits one unbounded value per sequence
    (regression).

    Args:
        embeddings: 2-D float tensor of shape ``(vocab_size, d_model)`` used
            to initialise the trainable embedding layer.
        nhead: Number of attention heads; must divide ``d_model``.
        dim_feedforward: Hidden size of each encoder layer's feed-forward net.
        num_layers: Number of stacked encoder layers.
        positional_dropout: Dropout applied after the positional encoding.
        classifier_dropout: Dropout used inside the encoder layers.
    """

    def __init__(
        self,
        embeddings,
        nhead=8,
        dim_feedforward=2048,
        num_layers=6,
        positional_dropout=0.1,
        classifier_dropout=0.1,
    ):
        super().__init__()
        vocab_size, d_model = embeddings.size()
        assert d_model % nhead == 0, "nheads must divide evenly into d_model"

        # from_pretrained wraps the given tensor without copying it. Cloning
        # gives every model its own weights, so fine-tuning one model cannot
        # silently alter the caller's matrix or another model built from it.
        self.emb = nn.Embedding.from_pretrained(
            embeddings.detach().clone(), freeze=False
        )

        # NOTE(review): the positional table is sized by vocab_size, although
        # it only needs to cover the longest sequence. Kept as is so existing
        # checkpoints (the table is a registered buffer) still load.
        self.pos_encoder = PositionalEncoding(
            d_model=d_model,
            dropout=positional_dropout,
            vocab_size=vocab_size,
        )

        # batch_first=True is required: forward() feeds (batch, seq, d_model)
        # tensors. With the default layout the encoder would treat the batch
        # axis as the sequence and attend across unrelated samples.
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model,
            nhead=nhead,
            dim_feedforward=dim_feedforward,
            dropout=classifier_dropout,
            batch_first=True,
        )
        self.transformer_encoder = nn.TransformerEncoder(
            encoder_layer,
            num_layers=num_layers,
        )

        # Currently unused by forward(); kept so the module's state_dict keys
        # stay unchanged.
        self.batch_norm = nn.BatchNorm1d(d_model)
        self.classifier = nn.Linear(d_model, 1)
        self.d_model = d_model

    def forward(self, x):
        """Map token indices of shape ``(batch, seq_len)`` to ``(batch,)``.

        NOTE(review): no padding mask is applied, so every position
        (including any padding tokens the dataset may add) takes part in
        attention and in the mean pooling.
        """
        # Scale the embeddings by sqrt(d_model) before adding positions.
        x = self.emb(x) * math.sqrt(self.d_model)
        x = self.pos_encoder(x)
        x = self.transformer_encoder(x)
        # Mean-pool over the sequence dimension: (batch, seq, d) -> (batch, d).
        x = x.mean(dim=1)
        # No activation function: the output is a regression value.
        x = self.classifier(x)
        # (batch, 1) -> (batch,)
        x = x.squeeze(1)
        return x
def _run_epoch(model, loader, criterion, device, optimizer=None):
    """Run one pass over ``loader`` and return ``(mean_batch_loss, r2)``.

    With an ``optimizer`` the model is put in training mode and updated after
    every batch; without one the pass is a gradient-free evaluation.
    """
    training = optimizer is not None
    model.train(training)

    loss_sum = 0.0
    predictions, targets = [], []
    with torch.set_grad_enabled(training):
        for inputs, labels in loader:
            inputs = inputs.to(device)
            labels = labels.to(device)
            if training:
                optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            if training:
                loss.backward()
                optimizer.step()
            loss_sum += loss.item()
            predictions.append(outputs.detach().cpu().numpy())
            targets.append(labels.detach().cpu().numpy())

    if not predictions:
        # Empty loader: nothing to average and nothing to score.
        return float("nan"), float("nan")

    # R² is computed once over the whole epoch. Averaging per-batch scores is
    # not the dataset R² and turns into NaN as soon as one batch holds a
    # single sample.
    r2 = r2_score(np.concatenate(targets), np.concatenate(predictions))
    return loss_sum / len(predictions), r2


def train_model(model, train_dataset, test_dataset, criterion, optimizer, epochs, batch_size):
    """Train ``model`` for ``epochs`` epochs and evaluate it after each one.

    Args:
        model: Module to train; batches are moved to the device its
            parameters live on.
        train_dataset: Dataset yielding ``(inputs, labels)``; reshuffled every
            epoch.
        test_dataset: Dataset yielding ``(inputs, labels)``; evaluated in
            order after every epoch.
        criterion: Loss called as ``criterion(outputs, labels)``.
        optimizer: Optimizer that updates ``model``'s parameters.
        epochs: Number of passes over ``train_dataset``.
        batch_size: Batch size for both loaders.

    Returns:
        ``(train_losses, test_losses, train_r2_scores, test_r2_scores)``, each
        a list with one entry per epoch. Losses are the mean of the per-batch
        loss values; R² scores are computed over all samples of the epoch.
    """
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

    # Use the model's own device rather than a module-level global, so the
    # function also works when this file is imported.
    device = next(model.parameters()).device

    test_losses, train_losses = [], []
    train_r2_scores, test_r2_scores = [], []

    for epoch in range(epochs):
        # Training pass
        train_loss, train_r2 = _run_epoch(model, train_loader, criterion, device, optimizer)
        train_losses.append(train_loss)
        train_r2_scores.append(train_r2)

        # Evaluation pass (eval mode, no gradients)
        test_loss, test_r2 = _run_epoch(model, test_loader, criterion, device)
        test_losses.append(test_loss)
        test_r2_scores.append(test_r2)

        print(f'Epoch {epoch + 1}/{epochs}, Train Loss: {train_losses[-1]:.4f}, Train R²: {train_r2_scores[-1]:.4f}, Test Loss: {test_losses[-1]:.4f}, Test R²: {test_r2_scores[-1]:.4f}')

    return train_losses, test_losses, train_r2_scores, test_r2_scores
# Bootstrap Aggregation (Bagging) Update
def _plot_model_curves(train_curves, test_curves, metric, title, ylabel):
    """Plot one train (solid) and one test (dashed) curve per ensemble member."""
    plt.figure(figsize=(12, 6))
    for number, (train_curve, test_curve) in enumerate(zip(train_curves, test_curves), start=1):
        plt.plot(train_curve, label=f'Model {number} Train {metric}')
        plt.plot(test_curve, label=f'Model {number} Test {metric}', linestyle='dashed')
    plt.title(title)
    plt.xlabel('Epochs')
    plt.ylabel(ylabel)
    plt.legend()
    plt.show()


def bootstrap_aggregation(ModelClass, train_dataset, test_dataset, num_models=5, epochs=10, batch_size=32, learning_rate=0.001, model_kwargs=None):
    """Train ``num_models`` models on different subsets and plot their curves.

    NOTE(review): the subsets are not bootstrap samples drawn with
    replacement. The training set is cut into ``num_models`` contiguous
    folds and model ``i`` is trained on everything except fold ``i``.

    Args:
        ModelClass: Callable that builds a model from keyword arguments.
        train_dataset: Indexable dataset the training subsets are taken from.
        test_dataset: Dataset every model is evaluated on after each epoch.
        num_models: Number of ensemble members (at least 1).
        epochs: Training epochs per model.
        batch_size: Batch size used for training and evaluation.
        learning_rate: Adam learning rate.
        model_kwargs: Keyword arguments for ``ModelClass``. When omitted they
            are built from the module-level names ``embedding_matrix`` and
            ``params``, which this file defines only in its ``__main__``
            block.

    Returns:
        ``(models, all_train_losses, all_test_losses, all_train_r2_scores,
        all_test_r2_scores)``; each of the last four holds one per-epoch list
        per model.

    Raises:
        ValueError: If ``num_models`` is smaller than 1.
    """
    if num_models < 1:
        raise ValueError("num_models must be at least 1")

    # Same device choice as the script's __main__ block.
    run_device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    models = []
    all_train_losses, all_test_losses = [], []
    all_train_r2_scores, all_test_r2_scores = [], []

    dataset_len = len(train_dataset)
    subset_size = dataset_len // num_models

    for i in range(num_models):
        print(f"Training Model {i + 1}/{num_models}...")

        if num_models == 1:
            # A single model has no fold to leave out; excluding "its" fold
            # would leave it with an empty training set.
            subset_indices = list(range(dataset_len))
        else:
            start_idx = i * subset_size
            end_idx = start_idx + subset_size
            subset_indices = list(range(0, start_idx)) + list(range(end_idx, dataset_len))
        subset = Subset(train_dataset, subset_indices)

        if model_kwargs is None:
            # Build the transformer arguments from the script configuration.
            # Every model gets its own copy of the embedding matrix so the
            # ensemble members never share trainable weights.
            kwargs = {
                "embeddings": torch.as_tensor(embedding_matrix, dtype=torch.float).clone(),
                "nhead": params["nhead"],
                "dim_feedforward": params["hiden_dim"],
                "num_layers": params["num_layers"],
                "positional_dropout": params["dropout"],
                "classifier_dropout": params["dropout"],
            }
        else:
            kwargs = model_kwargs

        model = ModelClass(**kwargs)
        model.to(run_device)

        criterion = nn.MSELoss()
        optimizer = optim.Adam(model.parameters(), lr=learning_rate)

        train_losses, test_losses, train_r2_scores, test_r2_scores = train_model(model, subset, test_dataset, criterion, optimizer, epochs, batch_size)

        models.append(model)
        all_train_losses.append(train_losses)
        all_test_losses.append(test_losses)
        all_train_r2_scores.append(train_r2_scores)
        all_test_r2_scores.append(test_r2_scores)

    # Plots for all models
    _plot_model_curves(all_train_losses, all_test_losses, 'Loss', "Training and Test Loss for all Models", 'Loss')
    _plot_model_curves(all_train_r2_scores, all_test_r2_scores, 'R²', "Training and Test R² for all Models", 'R²')

    return models, all_train_losses, all_test_losses, all_train_r2_scores, all_test_r2_scores
# Ensemble Prediction
def ensemble_predict(models, test_dataset, batch_size=32):
    """Return the per-sample mean prediction of ``models`` on ``test_dataset``.

    Args:
        models: Non-empty collection of modules that each return one value
            per input row. All of them are expected on the same device as the
            first model, because inputs are moved there once per batch.
        test_dataset: Dataset yielding ``(inputs, label)`` pairs; labels are
            ignored.
        batch_size: Number of samples predicted per step.

    Returns:
        1-D NumPy array with one averaged prediction per sample, in dataset
        order.

    Raises:
        ValueError: If ``models`` is empty.
    """
    models = list(models)
    if not models:
        raise ValueError("ensemble_predict needs at least one model")

    dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
    run_device = next(models[0].parameters()).device

    # Predict in eval mode (dropout off) and restore each model's previous
    # mode afterwards, so the call leaves the models as it found them.
    previous_modes = [model.training for model in models]
    for model in models:
        model.eval()

    batches = []
    try:
        with torch.no_grad():
            for inputs, _ in dataloader:
                inputs = inputs.to(run_device)
                # reshape(-1) keeps a batch of one sample 1-D; a bare
                # squeeze() would collapse it to a scalar and break the
                # concatenation below.
                predictions = torch.stack([model(inputs).reshape(-1) for model in models])
                batches.append(predictions.mean(dim=0).cpu().numpy())
    finally:
        for model, was_training in zip(models, previous_modes):
            model.train(was_training)

    if not batches:
        return np.array([])
    return np.concatenate(batches)
if __name__ == '__main__':
    # Hyperparameters and configuration.
    # NOTE: other functions in this file read `params`, `embedding_matrix`
    # and `device` as module-level names, so they must stay at this level.
    params = {
        # Config
        "max_len": 280,
        # Training
        "epochs": 25,
        "patience": 7,
        "batch_size": 32,
        "learning_rate": 1e-4,  # 1e-4
        "weight_decay": 5e-4,
        # Model
        'nhead': 2,  # 5
        "dropout": 0.2,
        'hiden_dim': 2048,
        'num_layers': 6
    }

    # Configs
    MODEL_NAME = 'transfomrer.pt'
    HIST_NAME = 'transformer_history'
    GLOVE_PATH = 'data/glove.6B.100d.txt'
    DATA_PATH = 'data/hack.csv'
    EMBEDDING_DIM = 100
    TEST_SIZE = 0.1
    VAL_SIZE = 0.1

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    # Load and prepare the data
    embedding_matrix, word_index, vocab_size, d_model = dataset_helper.get_embedding_matrix(
        gloVe_path=GLOVE_PATH, emb_len=EMBEDDING_DIM)
    X, y = dataset_helper.load_preprocess_data(path_data=DATA_PATH, verbose=True)

    # Split the data
    data_split = dataset_helper.split_data(X, y, test_size=TEST_SIZE, val_size=VAL_SIZE)

    # Datasets
    train_dataset = Datasets.GloveDataset(data_split['train']['X'], data_split['train']['y'], word_index, max_len=params["max_len"])
    # NOTE(review): the validation split is built but not used below; the
    # per-epoch monitoring runs on the test set.
    val_dataset = Datasets.GloveDataset(data_split['val']['X'], data_split['val']['y'], word_index, max_len=params["max_len"])
    test_dataset = Datasets.GloveDataset(data_split['test']['X'], data_split['test']['y'], word_index, max_len=params["max_len"])

    # Bootstrap Aggregation (Bagging) training
    models, all_train_losses, all_test_losses, all_train_r2_scores, all_test_r2_scores = bootstrap_aggregation(
        TransformerBinaryClassifier, train_dataset, test_dataset, num_models=2, epochs=params["epochs"], batch_size=params["batch_size"], learning_rate=params["learning_rate"])

    # Ensemble prediction
    test_predictions = ensemble_predict(models, test_dataset)

    # Test evaluation
    test_labels = test_dataset.labels.to_numpy()
    test_mse = mean_squared_error(test_labels, test_predictions)
    # The report is labelled RMSE, so print the root of the MSE rather than
    # the MSE itself.
    test_rmse = np.sqrt(test_mse)
    test_mae = mean_absolute_error(test_labels, test_predictions)
    test_r2 = r2_score(test_labels, test_predictions)

    print(f"Test RMSE: {test_rmse:.4f}, Test MAE: {test_mae:.4f}, Test R²: {test_r2:.4f}")