import torch import torch.nn as nn import torch.optim as optim from torch.utils.data import DataLoader from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score from tqdm import tqdm # Fortschrittsbalken-Bibliothek from dataset_generator import create_embedding_matrix, split_data from HumorDataset import TextDataset import numpy as np import pandas as pd import os import matplotlib.pyplot as plt # Hyperparameter und Konfigurationen params = { "embedding_dim": 100, "filter_sizes": [2, 3, 4, 5], # Zusätzliche Filtergröße "num_filters": 150, # Erhöhte Anzahl von Filtern "batch_size": 32, "learning_rate": 0.001, "epochs": 25, "glove_path": 'data/glove.6B.100d.txt', # Pfad zu GloVe "max_len": 50, "test_size": 0.1, "val_size": 0.1, "patience": 5, "data_path": 'data/hack.csv', # Pfad zu den Daten "dropout": 0.6, # Erhöhtes Dropout "weight_decay": 5e-4 # L2-Regularisierung } # EarlyStopping-Klasse mit Ordnerprüfung class EarlyStopping: def __init__(self, patience=5, verbose=False): self.patience = patience self.verbose = verbose self.counter = 0 self.best_score = None self.early_stop = False def __call__(self, val_loss, model): score = -val_loss if self.best_score is None: self.best_score = score self.save_checkpoint(val_loss, model) elif score < self.best_score: self.counter += 1 if self.counter >= self.patience: self.early_stop = True else: self.best_score = score self.save_checkpoint(val_loss, model) self.counter = 0 def save_checkpoint(self, val_loss, model, filename='checkpoint.pt'): directory = "checkpoints" if not os.path.exists(directory): os.makedirs(directory) # Erstelle den Ordner, falls er nicht existiert if self.verbose: print(f'Validation loss decreased ({self.best_score:.6f} --> {val_loss:.6f}). Saving model ...') torch.save(model.state_dict(), os.path.join(directory, filename)) # Plot-Funktion für Training def plot_learning_curves(history): epochs = range(1, len(history['train_loss']) + 1) # Loss-Plot plt.figure(figsize=(14, 6)) plt.subplot(1, 2, 1) plt.plot(epochs, history['train_loss'], label='Train Loss') plt.plot(epochs, history['val_loss'], label='Val Loss') plt.xlabel('Epochs') plt.ylabel('Loss') plt.title('Training and Validation Loss') plt.legend() # RMSE-Plot plt.subplot(1, 2, 2) plt.plot(epochs, history['train_rmse'], label='Train RMSE') plt.plot(epochs, history['val_rmse'], label='Val RMSE') plt.xlabel('Epochs') plt.ylabel('RMSE') plt.title('Training and Validation RMSE') plt.legend() plt.tight_layout() plt.show() # Visualisierung der Zielvariablen (Scores) def visualize_data_distribution(y): print("\n--- Zielvariable: Statistik ---") print(f"Min: {np.min(y)}, Max: {np.max(y)}") print(f"Mittelwert: {np.mean(y):.4f}, Standardabweichung: {np.std(y):.4f}") # Histogramm plotten plt.figure(figsize=(10, 6)) plt.hist(y, bins=20, color='skyblue', edgecolor='black') plt.title('Verteilung der Zielvariable (Scores)') plt.xlabel('Score') plt.ylabel('Häufigkeit') plt.grid(axis='y', linestyle='--', alpha=0.7) plt.show() # Funktion zum Laden und Vorverarbeiten der Daten def load_preprocess_data(path_data='data/hack.csv'): # Daten laden df = pd.read_csv(path_data) # Fehlende Werte in der Zielspalte entfernen df = df.dropna(subset=['humor_rating']) # Zielvariable aus der Spalte 'humor_rating' extrahieren df['y'] = df['humor_rating'].astype(float) # Sicherstellen, dass Zielvariable numerisch ist # Eingabetexte und Zielvariable zuweisen X = df['text'] y = df['y'] # Debug-Ausgabe zur Überprüfung print(f"Erste Zielwerte: {y.head(10)}") print(f"Datentyp der Zielvariable: {y.dtype}") print(f"Anzahl der Beispiele: {len(X)}") return X, y # CNN-Modell für Regression mit erweiterten Regularisierungen class EnhancedCNNRegressor(nn.Module): def __init__(self, vocab_size, embedding_dim, filter_sizes, num_filters, embedding_matrix, dropout): super(EnhancedCNNRegressor, self).__init__() self.embedding = nn.Embedding.from_pretrained(embedding_matrix, freeze=False) # Convolutional Schichten mit Batch-Normalisierung self.convs = nn.ModuleList([ nn.Sequential( nn.Conv2d(1, num_filters, (fs, embedding_dim)), nn.BatchNorm2d(num_filters), # Batch-Normalisierung nn.ReLU(), nn.MaxPool2d((params["max_len"] - fs + 1, 1)), nn.Dropout(dropout) # Dropout nach jeder Schicht ) for fs in filter_sizes ]) # Fully-Connected Layer self.fc1 = nn.Linear(len(filter_sizes) * num_filters, 128) # Erweiterte Dense-Schicht self.fc2 = nn.Linear(128, 1) # Ausgangsschicht (Regression) self.dropout = nn.Dropout(dropout) def forward(self, x): x = self.embedding(x).unsqueeze(1) # [Batch, 1, Seq, Embedding] conv_outputs = [conv(x).squeeze(3).squeeze(2) for conv in self.convs] # Pooling reduziert Dim x = torch.cat(conv_outputs, 1) # Kombiniere Features von allen Filtern x = torch.relu(self.fc1(x)) # Zusätzliche Dense-Schicht x = self.dropout(x) return self.fc2(x).squeeze(1) # Device auf CPU setzen device = torch.device("cpu") print(f"Using device: {device}") # Daten laden und vorbereiten embedding_matrix, word_index, vocab_size, d_model = create_embedding_matrix( gloVe_path=params["glove_path"], emb_len=params["embedding_dim"] ) X, y = load_preprocess_data(path_data=params["data_path"]) # Visualisierung der Daten visualize_data_distribution(y) # Aufteilen der Daten data_split = split_data(X, y, test_size=params["test_size"], val_size=params["val_size"]) # Dataset und DataLoader train_dataset = TextDataset(data_split['train']['X'], data_split['train']['y'], word_index, max_len=params["max_len"]) val_dataset = TextDataset(data_split['val']['X'], data_split['val']['y'], word_index, max_len=params["max_len"]) test_dataset = TextDataset(data_split['test']['X'], data_split['test']['y'], word_index, max_len=params["max_len"]) train_loader = DataLoader(train_dataset, batch_size=params["batch_size"], shuffle=True) val_loader = DataLoader(val_dataset, batch_size=params["batch_size"], shuffle=False) test_loader = DataLoader(test_dataset, batch_size=params["batch_size"], shuffle=False) # Modell initialisieren model = EnhancedCNNRegressor( vocab_size=vocab_size, embedding_dim=params["embedding_dim"], filter_sizes=params["filter_sizes"], num_filters=params["num_filters"], embedding_matrix=embedding_matrix, dropout=params["dropout"] ).to(device) criterion = nn.MSELoss() optimizer = optim.Adam(model.parameters(), lr=params["learning_rate"], weight_decay=params["weight_decay"]) early_stopping = EarlyStopping(patience=params["patience"], verbose=True) # Speicher für Trainingsmetriken history = { "train_loss": [], "val_loss": [], "train_rmse": [], "val_rmse": [], } # Training und Validierung for epoch in range(params["epochs"]): model.train() train_loss = 0.0 train_preds, train_labels = [], [] # Fortschrittsbalken für Training innerhalb einer Epoche with tqdm(train_loader, desc=f"Epoch {epoch + 1}/{params['epochs']}") as pbar: for X_batch, y_batch in pbar: X_batch, y_batch = X_batch.to(device), y_batch.to(device).float() optimizer.zero_grad() predictions = model(X_batch).float() loss = criterion(predictions, y_batch) loss.backward() optimizer.step() train_loss += loss.item() # Speichere echte und vorhergesagte Werte für Metriken train_preds.extend(predictions.cpu().detach().numpy()) train_labels.extend(y_batch.cpu().detach().numpy()) # Update der Fortschrittsanzeige pbar.set_postfix({"Train Loss": loss.item()}) train_rmse = np.sqrt(mean_squared_error(train_labels, train_preds)) # RMSE history["train_loss"].append(train_loss / len(train_loader)) history["train_rmse"].append(train_rmse) # Validation model.eval() val_loss = 0.0 val_preds, val_labels = [], [] with torch.no_grad(): for X_batch, y_batch in val_loader: X_batch, y_batch = X_batch.to(device), y_batch.to(device).float() predictions = model(X_batch).float() loss = criterion(predictions, y_batch) val_loss += loss.item() val_preds.extend(predictions.cpu().detach().numpy()) val_labels.extend(y_batch.cpu().detach().numpy()) val_rmse = np.sqrt(mean_squared_error(val_labels, val_preds)) # RMSE history["val_loss"].append(val_loss / len(val_loader)) history["val_rmse"].append(val_rmse) print(f"\nEpoch {epoch + 1}, Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}") print(f"Train RMSE: {train_rmse:.4f}, Val RMSE: {val_rmse:.4f}") early_stopping(val_rmse, model) if early_stopping.early_stop: print("Early stopping triggered.") break # Plot der Lernkurven plot_learning_curves(history) # Funktion zur Visualisierung der richtigen und falschen Vorhersagen def visualize_predictions(true_values, predicted_values): plt.figure(figsize=(10, 6)) # Unterschied zwischen vorhergesagten und wahren Werten correct_indices = np.isclose(true_values, predicted_values, atol=0.3) # Als korrekt angenommen, wenn Differenz <= 0.3 # Plot plt.scatter(true_values[correct_indices], predicted_values[correct_indices], color='green', label='Richtig vorhergesagt') plt.scatter(true_values[~correct_indices], predicted_values[~correct_indices], color='red', label='Falsch vorhergesagt') plt.plot([min(true_values), max(true_values)], [min(true_values), max(true_values)], color='blue', linestyle='--', label='Ideal-Linie') plt.xlabel('Wahre Werte') plt.ylabel('Vorhergesagte Werte') plt.title('Richtige vs Falsche Vorhersagen') plt.legend() plt.grid(True) plt.show() # Test Evaluation model.eval() test_preds, test_labels = [], [] with torch.no_grad(): for X_batch, y_batch in test_loader: X_batch, y_batch = X_batch.to(device), y_batch.to(device).float() predictions = model(X_batch).float() test_preds.extend(predictions.cpu().detach().numpy()) test_labels.extend(y_batch.cpu().detach().numpy()) # Konvertierung zu NumPy-Arrays true_values = np.array(test_labels) predicted_values = np.array(test_preds) # Visualisierung der Ergebnisse visualize_predictions(true_values, predicted_values) # RMSE, MAE und R²-Score für das Test-Set test_rmse = np.sqrt(mean_squared_error(test_labels, test_preds)) test_mae = mean_absolute_error(test_labels, test_preds) test_r2 = r2_score(test_labels, test_preds) print(f"Test RMSE: {test_rmse:.4f}, Test MAE: {test_mae:.4f}, Test R²: {test_r2:.4f}") # Funktion zur Visualisierung der richtigen und falschen Vorhersagen def visualize_predictions(true_values, predicted_values): plt.figure(figsize=(10, 6)) # Unterschied zwischen vorhergesagten und wahren Werten correct_indices = np.isclose(true_values, predicted_values, atol=0.3) # Als korrekt angenommen, wenn Differenz <= 0.3 # Plot plt.scatter(true_values[correct_indices], predicted_values[correct_indices], color='green', label='Richtig vorhergesagt') plt.scatter(true_values[~correct_indices], predicted_values[~correct_indices], color='red', label='Falsch vorhergesagt') plt.plot([min(true_values), max(true_values)], [min(true_values), max(true_values)], color='blue', linestyle='--', label='Ideal-Linie') plt.xlabel('Wahre Werte') plt.ylabel('Vorhergesagte Werte') plt.title('Richtige vs Falsche Vorhersagen') plt.legend() plt.grid(True) plt.show() # Test Evaluation model.eval() test_preds, test_labels = [], [] with torch.no_grad(): for X_batch, y_batch in test_loader: X_batch, y_batch = X_batch.to(device), y_batch.to(device).float() predictions = model(X_batch).float() test_preds.extend(predictions.cpu().detach().numpy()) test_labels.extend(y_batch.cpu().detach().numpy()) # Konvertierung zu NumPy-Arrays true_values = np.array(test_labels) predicted_values = np.array(test_preds) # Visualisierung der Ergebnisse visualize_predictions(true_values, predicted_values) # RMSE, MAE und R²-Score für das Test-Set test_rmse = np.sqrt(mean_squared_error(test_labels, test_preds)) test_mae = mean_absolute_error(test_labels, test_preds) test_r2 = r2_score(test_labels, test_preds) print(f"Test RMSE: {test_rmse:.4f}, Test MAE: {test_mae:.4f}, Test R²: {test_r2:.4f}")