main
arman 2025-02-16 13:25:37 +01:00
parent 544f16d316
commit e03b884092
1 changed file with 68 additions and 100 deletions

LSTM.py

@@ -1,171 +1,139 @@
-import time
-import json
-import numpy as np
 import torch
 import torch.nn as nn
 import torch.optim as optim
 from torch.utils.data import DataLoader
-from sklearn.metrics import accuracy_score, f1_score
+from sklearn.metrics import mean_squared_error, r2_score
 from torch.optim.lr_scheduler import ReduceLROnPlateau
-import matplotlib.pyplot as plt
+import time
+from tqdm import tqdm

-# Automatic device selection (Apple MPS, CUDA, CPU)
-if torch.backends.mps.is_available():
-    device = torch.device("mps")
-elif torch.cuda.is_available():
-    device = torch.device("cuda")
-else:
-    device = torch.device("cpu")
-print('Using device:', device)
-class ImprovedLSTMBinaryClassifier(nn.Module):
-    def __init__(self, input_dim, hidden_dim, num_layers, dropout=0.1):
-        super(ImprovedLSTMBinaryClassifier, self).__init__()
-        self.lstm = nn.LSTM(input_dim,
-                            hidden_dim,
-                            num_layers,
-                            batch_first=True,
-                            dropout=dropout,
-                            bidirectional=False)
-        self.layer_norm = nn.LayerNorm(hidden_dim)
-        self.fc = nn.Linear(hidden_dim, 1)
-        self.sigmoid = nn.Sigmoid()
+class LSTMNetwork(nn.Module):
+    def __init__(self, input_dim, hidden_dim, num_layers, output_dim, dropout=0.3):
+        super(LSTMNetwork, self).__init__()
+        self.lstm = nn.LSTM(input_dim, hidden_dim, num_layers, dropout=dropout, batch_first=True)
+        self.fc = nn.Linear(hidden_dim, output_dim)
         self.dropout = nn.Dropout(dropout)

-    def forward(self, input_ids):
-        lstm_out, _ = self.lstm(input_ids)
-        lstm_out = self.dropout(lstm_out)
-        pooled = lstm_out[:, -1, :]  # Last hidden state
-        normalized = self.layer_norm(pooled)
-        logits = self.fc(normalized)
-        return self.sigmoid(logits)
+    def forward(self, x):
+        lstm_out, _ = self.lstm(x)
+        return self.fc(self.dropout(lstm_out[:, -1, :]))
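As a quick sanity check on the new architecture: LSTMNetwork consumes pre-embedded, padded sequences and regresses from the last timestep only. A minimal shape-check sketch (batch size and sequence length here are illustrative; only input_dim=100 comes from the hyperparameters further down):

    # Hypothetical shape check, not part of the commit.
    model = LSTMNetwork(input_dim=100, hidden_dim=1024, num_layers=2, output_dim=1)
    x = torch.randn(8, 50, 100)   # (batch, seq_len, input_dim); batch_first=True
    y = model(x)                  # last hidden state -> dropout -> linear head
    print(y.shape)                # torch.Size([8, 1])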
-# Training and evaluation
-def train_model(model, train_loader, val_loader, test_loader, epochs=10):
-    criterion = nn.BCELoss()
-    optimizer = optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-5)
-    scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=2, verbose=True)
+def compute_metrics(predictions, labels):
+    mse = mean_squared_error(labels, predictions)
+    r2 = r2_score(labels, predictions)
+    return mse, r2
+
+def train_model(model, train_loader, val_loader, test_loader, epochs=10, device='cuda'):
+    criterion = nn.MSELoss()
+    optimizer = optim.Adam(model.parameters(), lr=0.001)
+    scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=3, verbose=True)
     best_val_loss = float('inf')
-    best_test_accuracy = 0
+    best_test_r2 = -float('inf')
     patience = 3
     counter = 0
-    history = {'train_loss': [], 'val_loss': [], 'test_acc': [], 'test_f1': []}
+    history = {'train_loss': [], 'val_loss': [], 'test_r2': [], 'test_mse': []}
     for epoch in range(epochs):
-        # Training
         model.train()
         total_loss = 0
         start_time = time.time()
-        for batch in train_loader:
+        train_preds, train_labels = [], []
+        for batch in tqdm(train_loader, desc=f"Epoch {epoch+1}/{epochs}", ncols=100):
             optimizer.zero_grad()
-            input_ids = batch['input_ids'].to(device)
-            labels = batch['labels'].unsqueeze(1).to(device)
-            outputs = model(input_ids)
-            loss = criterion(outputs, labels)
+            inputs = batch['input_ids'].to(device)
+            labels = batch['labels'].to(device)
+            outputs = model(inputs)
+            loss = criterion(outputs.squeeze(), labels)
             loss.backward()
-            nn.utils.clip_grad_norm_(model.parameters(), 5)  # Gradient clipping
             optimizer.step()
             total_loss += loss.item()
+            train_preds.extend(outputs.squeeze().detach().cpu().numpy())
+            train_labels.extend(labels.cpu().numpy())
         avg_train_loss = total_loss / len(train_loader)
-        # Validation
         model.eval()
         val_loss = 0
+        val_preds, val_labels = [], []
         with torch.no_grad():
             for batch in val_loader:
-                input_ids = batch['input_ids'].to(device)
-                labels = batch['labels'].unsqueeze(1).to(device)
-                outputs = model(input_ids)
-                val_loss += criterion(outputs, labels).item()
+                inputs = batch['input_ids'].to(device)
+                labels = batch['labels'].to(device)
+                outputs = model(inputs)
+                val_loss += criterion(outputs.squeeze(), labels).item()
+                val_preds.extend(outputs.squeeze().cpu().numpy())
+                val_labels.extend(labels.cpu().numpy())
         avg_val_loss = val_loss / len(val_loader)
-        # Test evaluation
-        test_preds = []
-        test_labels = []
+        test_preds, test_labels = [], []
         with torch.no_grad():
             for batch in test_loader:
-                input_ids = batch['input_ids'].to(device)
-                labels = batch['labels'].unsqueeze(1).to(device)
-                outputs = model(input_ids)
-                preds = (outputs > 0.5).float()
-                test_preds.extend(preds.cpu().numpy())
+                inputs = batch['input_ids'].to(device)
+                labels = batch['labels'].to(device)
+                outputs = model(inputs)
+                test_preds.extend(outputs.squeeze().cpu().numpy())
                 test_labels.extend(labels.cpu().numpy())
-        test_accuracy = accuracy_score(test_labels, test_preds)
-        test_f1 = f1_score(test_labels, test_preds)
+        test_mse, test_r2 = compute_metrics(test_preds, test_labels)
-        # Update history
         history['train_loss'].append(avg_train_loss)
         history['val_loss'].append(avg_val_loss)
-        history['test_acc'].append(test_accuracy)
-        history['test_f1'].append(test_f1)
+        history['test_r2'].append(test_r2)
+        history['test_mse'].append(test_mse)

-        # Adjust learning rate
         scheduler.step(avg_val_loss)

-        # Console output
         epoch_time = time.time() - start_time
         print(f'Epoch {epoch+1}/{epochs} | Time: {epoch_time:.2f}s')
         print(f'Train Loss: {avg_train_loss:.4f} | Val Loss: {avg_val_loss:.4f}')
-        print(f'Test Acc: {test_accuracy:.4f} | Test F1: {test_f1:.4f}\n')
+        print(f'Test MSE: {test_mse:.4f} | Test R2: {test_r2:.4f}\n')

-        # Save best model
-        if test_accuracy > best_test_accuracy:
-            best_test_accuracy = test_accuracy
+        if test_r2 > best_test_r2:
+            best_test_r2 = test_r2
             torch.save(model.state_dict(), "best_lstm_model.pth")
-            print(f"🚀 New best model saved (Acc: {test_accuracy:.4f})")
+            print(f"🚀 New best model saved (R2: {test_r2:.4f})")

-        # Early stopping
         if avg_val_loss < best_val_loss:
             best_val_loss = avg_val_loss
             counter = 0
         else:
             counter += 1
             if counter >= patience:
                 print("⛔ Early stopping triggered!")
                 break

     return history
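Because the loop checkpoints on test R2, the best weights end up in best_lstm_model.pth. Restoring them later might look like this (a minimal sketch; it assumes the same hyperparameters as the __main__ block below):

    model = LSTMNetwork(input_dim=100, hidden_dim=1024, num_layers=2, output_dim=1, dropout=0.2)
    model.load_state_dict(torch.load("best_lstm_model.pth", map_location="cpu"))
    model.eval()  # disable dropout for inference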
 if __name__ == "__main__":
-    # Load data (assumes the embedded data has already been prepared)
     data_path = '/content/drive/MyDrive/Colab Notebooks/ANLP_WS24_CA2/data/embedded_padded'
-    train_dataset = torch.load(data_path + '/train.pt')
-    test_dataset = torch.load(data_path + '/test.pt')
-    val_dataset = torch.load(data_path + '/val.pt')
+    train_dataset = torch.load(f'{data_path}/train.pt')
+    test_dataset = torch.load(f'{data_path}/test.pt')
+    val_dataset = torch.load(f'{data_path}/val.pt')

-    # Hyperparameters
     input_dim = 100
-    hidden_dim = 256
+    hidden_dim = 1024
     num_layers = 2
-    dropout = 0.3
-    batch_size = 64
+    output_dim = 1
+    dropout = 0.2
+    batch_size = 256
+    epochs = 5

-    # DataLoader
     train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
     val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
     test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

-    # Initialize model
-    model = ImprovedLSTMBinaryClassifier(
-        input_dim=input_dim,
-        hidden_dim=hidden_dim,
-        num_layers=num_layers,
-        dropout=dropout
-    ).to(device)
+    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
+    model = LSTMNetwork(input_dim=input_dim, hidden_dim=hidden_dim, num_layers=num_layers, output_dim=output_dim, dropout=dropout).to(device)

-    # Start training
-    history = train_model(
-        model,
-        train_loader,
-        val_loader,
-        test_loader,
-        epochs=5
-    )
+    history = train_model(model, train_loader, val_loader, test_loader, epochs=epochs, device=device)
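Both the old and the new training loop read batch['input_ids'] and batch['labels'], so the saved .pt files must hold datasets whose samples are dicts of an already-embedded, padded sequence plus a scalar target. For running the script without the Google Drive files, a stand-in dataset with that interface could look like this (a sketch; the class name, sizes, and random data are assumptions, not part of the commit):

    import torch
    from torch.utils.data import Dataset

    class DummyEmbeddedDataset(Dataset):
        """Yields dict samples shaped like the pre-embedded, padded data."""
        def __init__(self, n_samples=512, seq_len=50, input_dim=100):
            self.x = torch.randn(n_samples, seq_len, input_dim)
            self.y = torch.randn(n_samples)

        def __len__(self):
            return len(self.x)

        def __getitem__(self, idx):
            # PyTorch's default collate turns these dict samples into dict
            # batches, matching batch['input_ids'] / batch['labels'] above.
            return {'input_ids': self.x[idx], 'labels': self.y[idx]}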