"""LSTM regression model with a train / validate / test loop.

Post-image of ``LSTM.py`` from commit e03b884 ("lstm reg", arman,
2025-02-16), which replaced the earlier binary classifier with a regressor
trained on mean squared error.
"""

import time

import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.metrics import mean_squared_error, r2_score
from torch.optim.lr_scheduler import ReduceLROnPlateau
from torch.utils.data import DataLoader
from tqdm import tqdm


class LSTMNetwork(nn.Module):
    """Stacked LSTM followed by a linear head on the last time step.

    Args:
        input_dim: Size of each input feature vector.
        hidden_dim: Size of the LSTM hidden state.
        num_layers: Number of stacked LSTM layers.
        output_dim: Number of values predicted per sequence.
        dropout: Dropout probability used between LSTM layers and in front
            of the linear head.
    """

    def __init__(self, input_dim, hidden_dim, num_layers, output_dim, dropout=0.3):
        super().__init__()
        # nn.LSTM only applies dropout *between* stacked layers and warns if
        # a non-zero value is passed for a single layer, so pass 0 there.
        self.lstm = nn.LSTM(
            input_dim,
            hidden_dim,
            num_layers,
            dropout=dropout if num_layers > 1 else 0.0,
            batch_first=True,
        )
        self.fc = nn.Linear(hidden_dim, output_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """Map ``x`` of shape (batch, seq_len, input_dim) to (batch, output_dim)."""
        lstm_out, _ = self.lstm(x)
        # NOTE(review): the data path below suggests padded sequences; if the
        # padding is on the right, the last time step is a padding position.
        # Confirm against the preprocessing code.
        return self.fc(self.dropout(lstm_out[:, -1, :]))


def compute_metrics(predictions, labels):
    """Return ``(mse, r2)`` of ``predictions`` measured against ``labels``."""
    mse = mean_squared_error(labels, predictions)
    r2 = r2_score(labels, predictions)
    return mse, r2


def _forward_batch(model, batch, device):
    """Move one batch to ``device`` and return ``(predictions, labels)``.

    Only the trailing feature axis is squeezed. A bare ``squeeze()`` would
    also drop the batch axis for a batch of one sample, yielding a 0-d
    tensor that cannot be iterated and that broadcasts against the labels.
    """
    inputs = batch['input_ids'].to(device)
    labels = batch['labels'].to(device)
    return model(inputs).squeeze(-1), labels


def train_model(model, train_loader, val_loader, test_loader, epochs=10, device='cuda'):
    """Train ``model`` with MSE loss, evaluating on val and test every epoch.

    Each loader must yield dict batches with the keys ``'input_ids'`` and
    ``'labels'``.  Assumes labels have shape (batch,) when the model has a
    single output -- TODO confirm against the dataset.

    Side effects: prints progress and writes ``best_lstm_model.pth`` to the
    working directory whenever the test R2 improves.

    Args:
        model: Network to optimise; must already live on ``device``.
        train_loader: Batches used for gradient updates.
        val_loader: Batches driving the LR scheduler and early stopping.
        test_loader: Batches used for the reported MSE / R2.
        epochs: Maximum number of epochs.
        device: Device that every batch is moved to.

    Returns:
        dict with the per-epoch lists ``'train_loss'``, ``'val_loss'``,
        ``'test_r2'`` and ``'test_mse'``.
    """
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    # The deprecated ``verbose`` flag is not used; LR drops are reported
    # explicitly after ``scheduler.step`` instead.
    # NOTE(review): the scheduler waits for more than 3 stagnant epochs,
    # while early stopping below fires after 3, so the LR reduction will
    # rarely if ever trigger. Confirm the intended patience values.
    scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=3)
    best_val_loss = float('inf')
    best_test_r2 = -float('inf')
    patience = 3
    counter = 0

    history = {'train_loss': [], 'val_loss': [], 'test_r2': [], 'test_mse': []}

    for epoch in range(epochs):
        model.train()
        total_loss = 0.0
        start_time = time.time()

        for batch in tqdm(train_loader, desc=f"Epoch {epoch+1}/{epochs}", ncols=100):
            optimizer.zero_grad()
            outputs, labels = _forward_batch(model, batch, device)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        avg_train_loss = total_loss / len(train_loader)

        model.eval()
        val_loss = 0.0
        test_preds, test_labels = [], []

        with torch.no_grad():
            for batch in val_loader:
                outputs, labels = _forward_batch(model, batch, device)
                val_loss += criterion(outputs, labels).item()

            for batch in test_loader:
                outputs, labels = _forward_batch(model, batch, device)
                test_preds.extend(outputs.cpu().numpy())
                test_labels.extend(labels.cpu().numpy())
        avg_val_loss = val_loss / len(val_loader)

        test_mse, test_r2 = compute_metrics(test_preds, test_labels)

        history['train_loss'].append(avg_train_loss)
        history['val_loss'].append(avg_val_loss)
        history['test_r2'].append(test_r2)
        history['test_mse'].append(test_mse)

        lr_before = optimizer.param_groups[0]['lr']
        scheduler.step(avg_val_loss)
        lr_after = optimizer.param_groups[0]['lr']
        if lr_after < lr_before:
            print(f'Learning rate reduced: {lr_before:.2e} -> {lr_after:.2e}')

        epoch_time = time.time() - start_time
        print(f'Epoch {epoch+1}/{epochs} | Time: {epoch_time:.2f}s')
        print(f'Train Loss: {avg_train_loss:.4f} | Val Loss: {avg_val_loss:.4f}')
        print(f'Test MSE: {test_mse:.4f} | Test R2: {test_r2:.4f}\n')

        # NOTE(review): the checkpoint is chosen by *test* R2, so the
        # reported test score is optimistically biased. Selecting on the
        # validation loss would avoid that; left as-is pending confirmation.
        if test_r2 > best_test_r2:
            best_test_r2 = test_r2
            torch.save(model.state_dict(), "best_lstm_model.pth")
            print(f"🚀 New best model saved (R2: {test_r2:.4f})")

        # Early stopping on the validation loss.
        if avg_val_loss < best_val_loss:
            best_val_loss = avg_val_loss
            counter = 0
        else:
            counter += 1
            if counter >= patience:
                print("⛔ Early stopping triggered!")
                break

    return history


def main():
    """Load the pre-embedded datasets, build the model and train it."""
    data_path = '/content/drive/MyDrive/Colab Notebooks/ANLP_WS24_CA2/data/embedded_padded'
    # NOTE(review): torch.load unpickles its input, so only load trusted
    # files. The default of ``weights_only`` differs between PyTorch
    # releases; if these files hold pickled Dataset objects they may need
    # ``weights_only=False`` on newer versions. Confirm against the
    # installed version.
    train_dataset = torch.load(f'{data_path}/train.pt')
    test_dataset = torch.load(f'{data_path}/test.pt')
    val_dataset = torch.load(f'{data_path}/val.pt')

    input_dim = 100
    hidden_dim = 1024
    num_layers = 2
    output_dim = 1
    dropout = 0.2
    batch_size = 256
    epochs = 5

    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = LSTMNetwork(
        input_dim=input_dim,
        hidden_dim=hidden_dim,
        num_layers=num_layers,
        output_dim=output_dim,
        dropout=dropout,
    ).to(device)

    return train_model(model, train_loader, val_loader, test_loader, epochs=epochs, device=device)


if __name__ == "__main__":
    main()