Merge branch 'main' of https://gitty.informatik.hs-mannheim.de/3016498/ANLP_WS24_CA2
commit
80e6f7849f
168
LSTM.py
168
LSTM.py
|
|
@ -1,171 +1,139 @@
|
|||
import time
|
||||
import json
|
||||
import numpy as np
|
||||
import torch
|
||||
import torch.nn as nn
|
||||
import torch.optim as optim
|
||||
from torch.utils.data import DataLoader
|
||||
from sklearn.metrics import accuracy_score, f1_score
|
||||
from sklearn.metrics import mean_squared_error, r2_score
|
||||
from torch.optim.lr_scheduler import ReduceLROnPlateau
|
||||
import matplotlib.pyplot as plt
|
||||
import time
|
||||
from tqdm import tqdm
|
||||
|
||||
# Automatische Geräteauswahl (Apple MPS, CUDA, CPU)
|
||||
if torch.backends.mps.is_available():
|
||||
device = torch.device("mps")
|
||||
elif torch.cuda.is_available():
|
||||
device = torch.device("cuda")
|
||||
else:
|
||||
device = torch.device("cpu")
|
||||
print('Using device:', device)
|
||||
|
||||
class ImprovedLSTMBinaryClassifier(nn.Module):
|
||||
def __init__(self, input_dim, hidden_dim, num_layers, dropout=0.1):
|
||||
super(ImprovedLSTMBinaryClassifier, self).__init__()
|
||||
self.lstm = nn.LSTM(input_dim,
|
||||
hidden_dim,
|
||||
num_layers,
|
||||
batch_first=True,
|
||||
dropout=dropout,
|
||||
bidirectional=False)
|
||||
self.layer_norm = nn.LayerNorm(hidden_dim)
|
||||
self.fc = nn.Linear(hidden_dim, 1)
|
||||
self.sigmoid = nn.Sigmoid()
|
||||
class LSTMNetwork(nn.Module):
|
||||
def __init__(self, input_dim, hidden_dim, num_layers, output_dim, dropout=0.3):
|
||||
super(LSTMNetwork, self).__init__()
|
||||
self.lstm = nn.LSTM(input_dim, hidden_dim, num_layers, dropout=dropout, batch_first=True)
|
||||
self.fc = nn.Linear(hidden_dim, output_dim)
|
||||
self.dropout = nn.Dropout(dropout)
|
||||
|
||||
def forward(self, input_ids):
|
||||
lstm_out, _ = self.lstm(input_ids)
|
||||
lstm_out = self.dropout(lstm_out)
|
||||
pooled = lstm_out[:, -1, :] # Letztes verstecktes Zustand
|
||||
normalized = self.layer_norm(pooled)
|
||||
logits = self.fc(normalized)
|
||||
return self.sigmoid(logits)
|
||||
def forward(self, x):
|
||||
lstm_out, _ = self.lstm(x)
|
||||
return self.fc(self.dropout(lstm_out[:, -1, :]))
|
||||
|
||||
# Training und Evaluation
|
||||
def train_model(model, train_loader, val_loader, test_loader, epochs=10):
|
||||
criterion = nn.BCELoss()
|
||||
optimizer = optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-5)
|
||||
scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=2, verbose=True)
|
||||
|
||||
def compute_metrics(predictions, labels):
|
||||
mse = mean_squared_error(labels, predictions)
|
||||
r2 = r2_score(labels, predictions)
|
||||
return mse, r2
|
||||
|
||||
|
||||
def train_model(model, train_loader, val_loader, test_loader, epochs=10, device='cuda'):
|
||||
criterion = nn.MSELoss()
|
||||
optimizer = optim.Adam(model.parameters(), lr=0.001)
|
||||
scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=3, verbose=True)
|
||||
|
||||
best_val_loss = float('inf')
|
||||
best_test_accuracy = 0
|
||||
best_test_r2 = -float('inf')
|
||||
patience = 3
|
||||
counter = 0
|
||||
history = {'train_loss': [], 'val_loss': [], 'test_acc': [], 'test_f1': []}
|
||||
|
||||
history = {'train_loss': [], 'val_loss': [], 'test_r2': [], 'test_mse': []}
|
||||
|
||||
for epoch in range(epochs):
|
||||
# Training
|
||||
model.train()
|
||||
total_loss = 0
|
||||
start_time = time.time()
|
||||
|
||||
for batch in train_loader:
|
||||
train_preds, train_labels = [], []
|
||||
|
||||
for batch in tqdm(train_loader, desc=f"Epoch {epoch+1}/{epochs}", ncols=100):
|
||||
optimizer.zero_grad()
|
||||
input_ids = batch['input_ids'].to(device)
|
||||
labels = batch['labels'].unsqueeze(1).to(device)
|
||||
|
||||
outputs = model(input_ids)
|
||||
loss = criterion(outputs, labels)
|
||||
|
||||
inputs = batch['input_ids'].to(device)
|
||||
labels = batch['labels'].to(device)
|
||||
outputs = model(inputs)
|
||||
loss = criterion(outputs.squeeze(), labels)
|
||||
loss.backward()
|
||||
nn.utils.clip_grad_norm_(model.parameters(), 5) # Gradient Clipping
|
||||
optimizer.step()
|
||||
|
||||
total_loss += loss.item()
|
||||
train_preds.extend(outputs.squeeze().detach().cpu().numpy())
|
||||
train_labels.extend(labels.cpu().numpy())
|
||||
|
||||
avg_train_loss = total_loss / len(train_loader)
|
||||
|
||||
# Validierung
|
||||
model.eval()
|
||||
val_loss = 0
|
||||
val_preds, val_labels = [], []
|
||||
|
||||
with torch.no_grad():
|
||||
for batch in val_loader:
|
||||
input_ids = batch['input_ids'].to(device)
|
||||
labels = batch['labels'].unsqueeze(1).to(device)
|
||||
outputs = model(input_ids)
|
||||
val_loss += criterion(outputs, labels).item()
|
||||
inputs = batch['input_ids'].to(device)
|
||||
labels = batch['labels'].to(device)
|
||||
outputs = model(inputs)
|
||||
val_loss += criterion(outputs.squeeze(), labels).item()
|
||||
val_preds.extend(outputs.squeeze().cpu().numpy())
|
||||
val_labels.extend(labels.cpu().numpy())
|
||||
|
||||
avg_val_loss = val_loss / len(val_loader)
|
||||
|
||||
# Test Evaluation
|
||||
test_preds = []
|
||||
test_labels = []
|
||||
test_preds, test_labels = [], []
|
||||
|
||||
with torch.no_grad():
|
||||
for batch in test_loader:
|
||||
input_ids = batch['input_ids'].to(device)
|
||||
labels = batch['labels'].unsqueeze(1).to(device)
|
||||
outputs = model(input_ids)
|
||||
preds = (outputs > 0.5).float()
|
||||
test_preds.extend(preds.cpu().numpy())
|
||||
inputs = batch['input_ids'].to(device)
|
||||
labels = batch['labels'].to(device)
|
||||
outputs = model(inputs)
|
||||
test_preds.extend(outputs.squeeze().cpu().numpy())
|
||||
test_labels.extend(labels.cpu().numpy())
|
||||
|
||||
test_accuracy = accuracy_score(test_labels, test_preds)
|
||||
test_f1 = f1_score(test_labels, test_preds)
|
||||
test_mse, test_r2 = compute_metrics(test_preds, test_labels)
|
||||
|
||||
# History aktualisieren
|
||||
history['train_loss'].append(avg_train_loss)
|
||||
history['val_loss'].append(avg_val_loss)
|
||||
history['test_acc'].append(test_accuracy)
|
||||
history['test_f1'].append(test_f1)
|
||||
history['test_r2'].append(test_r2)
|
||||
history['test_mse'].append(test_mse)
|
||||
|
||||
# Lernrate anpassen
|
||||
scheduler.step(avg_val_loss)
|
||||
|
||||
# Ausgabe
|
||||
epoch_time = time.time() - start_time
|
||||
print(f'Epoch {epoch+1}/{epochs} | Time: {epoch_time:.2f}s')
|
||||
print(f'Train Loss: {avg_train_loss:.4f} | Val Loss: {avg_val_loss:.4f}')
|
||||
print(f'Test Acc: {test_accuracy:.4f} | Test F1: {test_f1:.4f}\n')
|
||||
print(f'Test MSE: {test_mse:.4f} | Test R2: {test_r2:.4f}\n')
|
||||
|
||||
# Bestes Modell speichern
|
||||
if test_accuracy > best_test_accuracy:
|
||||
best_test_accuracy = test_accuracy
|
||||
if test_r2 > best_test_r2:
|
||||
best_test_r2 = test_r2
|
||||
torch.save(model.state_dict(), "best_lstm_model.pth")
|
||||
print(f"🚀 Neues bestes Modell gespeichert (Acc: {test_accuracy:.4f})")
|
||||
print(f"🚀 New best model saved (R2: {test_r2:.4f})")
|
||||
|
||||
# Early Stopping
|
||||
if avg_val_loss < best_val_loss:
|
||||
best_val_loss = avg_val_loss
|
||||
counter = 0
|
||||
else:
|
||||
counter += 1
|
||||
if counter >= patience:
|
||||
print("⛔ Early Stopping ausgelöst!")
|
||||
print("⛔ Early stopping triggered!")
|
||||
break
|
||||
|
||||
return history
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
# Daten laden (Annahme: Eingebettete Daten sind bereits vorbereitet)
|
||||
data_path = '/content/drive/MyDrive/Colab Notebooks/ANLP_WS24_CA2/data/embedded_padded'
|
||||
train_dataset = torch.load(data_path + '/train.pt')
|
||||
test_dataset = torch.load(data_path + '/test.pt')
|
||||
val_dataset = torch.load(data_path + '/val.pt')
|
||||
train_dataset = torch.load(f'{data_path}/train.pt')
|
||||
test_dataset = torch.load(f'{data_path}/test.pt')
|
||||
val_dataset = torch.load(f'{data_path}/val.pt')
|
||||
|
||||
# Hyperparameter
|
||||
input_dim = 100
|
||||
hidden_dim = 256
|
||||
hidden_dim = 1024
|
||||
num_layers = 2
|
||||
dropout = 0.3
|
||||
batch_size = 64
|
||||
output_dim = 1
|
||||
dropout = 0.2
|
||||
batch_size = 256
|
||||
epochs = 5
|
||||
|
||||
# DataLoader
|
||||
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
|
||||
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
|
||||
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
|
||||
|
||||
# Modell initialisieren
|
||||
model = ImprovedLSTMBinaryClassifier(
|
||||
input_dim=input_dim,
|
||||
hidden_dim=hidden_dim,
|
||||
num_layers=num_layers,
|
||||
dropout=dropout
|
||||
).to(device)
|
||||
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
|
||||
model = LSTMNetwork(input_dim=input_dim, hidden_dim=hidden_dim, num_layers=num_layers, output_dim=output_dim, dropout=dropout).to(device)
|
||||
|
||||
# Training starten
|
||||
history = train_model(
|
||||
model,
|
||||
train_loader,
|
||||
val_loader,
|
||||
test_loader,
|
||||
epochs=5
|
||||
)
|
||||
history = train_model(model, train_loader, val_loader, test_loader, epochs=epochs, device=device)
|
||||
|
|
|
|||
Loading…
Reference in New Issue