import time
import json

import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torch.optim.lr_scheduler import ReduceLROnPlateau
from sklearn.metrics import accuracy_score, f1_score
import matplotlib.pyplot as plt

# Automatic device selection (Apple MPS, CUDA, CPU)
if torch.backends.mps.is_available():
    device = torch.device("mps")
elif torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print('Using device:', device)


class ImprovedLSTMBinaryClassifier(nn.Module):
    def __init__(self, input_dim, hidden_dim, num_layers, dropout=0.1):
        super(ImprovedLSTMBinaryClassifier, self).__init__()
        self.lstm = nn.LSTM(input_dim, hidden_dim, num_layers,
                            batch_first=True, dropout=dropout,
                            bidirectional=False)
        self.layer_norm = nn.LayerNorm(hidden_dim)
        self.fc = nn.Linear(hidden_dim, 1)
        self.sigmoid = nn.Sigmoid()
        self.dropout = nn.Dropout(dropout)

    def forward(self, input_ids):
        lstm_out, _ = self.lstm(input_ids)
        lstm_out = self.dropout(lstm_out)
        pooled = lstm_out[:, -1, :]  # output at the last time step
        normalized = self.layer_norm(pooled)
        logits = self.fc(normalized)
        return self.sigmoid(logits)


# Training and evaluation
def train_model(model, train_loader, val_loader, test_loader, epochs=10):
    criterion = nn.BCELoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-5)
    scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.5,
                                  patience=2, verbose=True)

    best_val_loss = float('inf')
    best_test_accuracy = 0
    patience = 3
    counter = 0
    history = {'train_loss': [], 'val_loss': [], 'test_acc': [], 'test_f1': []}

    for epoch in range(epochs):
        # Training
        model.train()
        total_loss = 0
        start_time = time.time()

        for batch in train_loader:
            optimizer.zero_grad()
            input_ids = batch['input_ids'].to(device)
            # BCELoss expects float targets of shape (batch, 1)
            labels = batch['labels'].unsqueeze(1).float().to(device)

            outputs = model(input_ids)
            loss = criterion(outputs, labels)
            loss.backward()
            nn.utils.clip_grad_norm_(model.parameters(), 5)  # gradient clipping
            optimizer.step()
            total_loss += loss.item()

        avg_train_loss = total_loss / len(train_loader)

        # Validation
        model.eval()
        val_loss = 0
        with torch.no_grad():
            for batch in val_loader:
                input_ids = batch['input_ids'].to(device)
                labels = batch['labels'].unsqueeze(1).float().to(device)
                outputs = model(input_ids)
                val_loss += criterion(outputs, labels).item()
        avg_val_loss = val_loss / len(val_loader)

        # Test evaluation
        test_preds = []
        test_labels = []
        with torch.no_grad():
            for batch in test_loader:
                input_ids = batch['input_ids'].to(device)
                labels = batch['labels'].unsqueeze(1).float().to(device)
                outputs = model(input_ids)
                preds = (outputs > 0.5).float()
                test_preds.extend(preds.cpu().numpy())
                test_labels.extend(labels.cpu().numpy())
        test_accuracy = accuracy_score(test_labels, test_preds)
        test_f1 = f1_score(test_labels, test_preds)

        # Update history
        history['train_loss'].append(avg_train_loss)
        history['val_loss'].append(avg_val_loss)
        history['test_acc'].append(test_accuracy)
        history['test_f1'].append(test_f1)

        # Adjust learning rate
        scheduler.step(avg_val_loss)

        # Logging
        epoch_time = time.time() - start_time
        print(f'Epoch {epoch+1}/{epochs} | Time: {epoch_time:.2f}s')
        print(f'Train Loss: {avg_train_loss:.4f} | Val Loss: {avg_val_loss:.4f}')
        print(f'Test Acc: {test_accuracy:.4f} | Test F1: {test_f1:.4f}\n')

        # Save best model
        if test_accuracy > best_test_accuracy:
            best_test_accuracy = test_accuracy
            torch.save(model.state_dict(), "best_lstm_model.pth")
            print(f"🚀 New best model saved (Acc: {test_accuracy:.4f})")

        # Early stopping
        if avg_val_loss < best_val_loss:
            best_val_loss = avg_val_loss
            counter = 0
        else:
            counter += 1
            if counter >= patience:
                print("⛔ Early stopping triggered!")
                break

    return history


if __name__ == "__main__":
    # Load data (assumption: embedded, padded datasets have already been prepared)
    data_path = '/content/drive/MyDrive/Colab Notebooks/ANLP_WS24_CA2/data/embedded_padded'
    # The datasets were saved as full Dataset objects, so weights_only=False is
    # required on newer PyTorch versions where torch.load defaults to weights_only=True.
    train_dataset = torch.load(data_path + '/train.pt', weights_only=False)
    test_dataset = torch.load(data_path + '/test.pt', weights_only=False)
    val_dataset = torch.load(data_path + '/val.pt', weights_only=False)

    # Hyperparameters
    input_dim = 100
    hidden_dim = 256
    num_layers = 2
    dropout = 0.3
    batch_size = 64

    # DataLoaders
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

    # Initialize model
    model = ImprovedLSTMBinaryClassifier(
        input_dim=input_dim,
        hidden_dim=hidden_dim,
        num_layers=num_layers,
        dropout=dropout
    ).to(device)

    # Start training
    history = train_model(
        model, train_loader, val_loader, test_loader, epochs=5
    )
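
    # --- Optional post-training steps (sketch, not part of the original script) ---
    # This block only relies on the `json` and `matplotlib.pyplot` imports above
    # and on the checkpoint written by train_model(). The output file names
    # "training_history.json" and "training_curves.png" are assumptions.

    # Reload the best checkpoint for a final evaluation pass;
    # map_location keeps the weights loadable on CPU-only machines.
    best_model = ImprovedLSTMBinaryClassifier(
        input_dim=input_dim,
        hidden_dim=hidden_dim,
        num_layers=num_layers,
        dropout=dropout
    ).to(device)
    best_model.load_state_dict(
        torch.load("best_lstm_model.pth", map_location=device)
    )
    best_model.eval()

    # Persist the training history; sklearn metrics are numpy scalars,
    # so cast to plain floats before JSON serialization.
    with open("training_history.json", "w") as f:
        json.dump({k: [float(v) for v in vals] for k, vals in history.items()},
                  f, indent=2)

    # Plot loss and metric curves per epoch.
    fig, (ax_loss, ax_metric) = plt.subplots(1, 2, figsize=(10, 4))
    ax_loss.plot(history['train_loss'], label='train loss')
    ax_loss.plot(history['val_loss'], label='val loss')
    ax_loss.set_xlabel('epoch')
    ax_loss.set_ylabel('BCE loss')
    ax_loss.legend()
    ax_metric.plot(history['test_acc'], label='test accuracy')
    ax_metric.plot(history['test_f1'], label='test F1')
    ax_metric.set_xlabel('epoch')
    ax_metric.legend()
    fig.tight_layout()
    fig.savefig("training_curves.png")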