"""Train and evaluate a CNN regressor that predicts humor ratings from text.

Recovered from the patch "new CNN Reg" (commit 6356cd29bbd7, Michelle
Goeppinger, 2025-02-14), which adds this script as ``TEST_CNN_2.py``.

Pipeline: build an embedding matrix from the GloVe file, read the rating CSV,
split it into train/validation/test sets, train ``EnhancedCNNRegressor`` with
Adam, MSE loss and early stopping, plot the learning curves, restore the best
checkpoint and report RMSE, MAE and R² on the test set.
"""

import os

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from torch.utils.data import DataLoader
from tqdm import tqdm  # progress-bar library

from dataset_generator import create_embedding_matrix, split_data
from HumorDataset import TextDataset

# Hyperparameters and configuration
params = {
    "embedding_dim": 100,
    "filter_sizes": [2, 3, 4, 5],  # additional filter size
    "num_filters": 150,  # increased number of filters
    "batch_size": 32,
    "learning_rate": 0.001,
    "epochs": 25,
    "glove_path": 'data/glove.6B.100d.txt',  # path to the GloVe vectors
    "max_len": 50,
    "test_size": 0.1,
    "val_size": 0.1,
    "patience": 5,
    "data_path": 'data/hack.csv',  # path to the data
    "dropout": 0.6,  # increased dropout
    "weight_decay": 5e-4  # L2 regularisation
}


class EarlyStopping:
    """Stop training once the monitored validation metric stops improving.

    Call the instance once per epoch with the current metric (lower is
    better). Whenever the metric is at least as good as the best value seen so
    far, the model weights are written to ``checkpoints/<filename>`` and the
    patience counter is reset. After ``patience`` consecutive calls without
    improvement, ``early_stop`` is set to ``True``.

    Args:
        patience: Number of consecutive non-improving calls tolerated.
        verbose: If true, print a message every time a checkpoint is saved.
    """

    def __init__(self, patience=5, verbose=False):
        self.patience = patience
        self.verbose = verbose
        self.counter = 0
        self.best_score = None
        self.early_stop = False
        # Lowest metric value checkpointed so far; used for the log message.
        self.val_loss_min = float("inf")
        # Path of the most recent checkpoint, or None if nothing was saved.
        self.checkpoint_path = None

    def __call__(self, val_loss, model):
        """Record this epoch's metric and checkpoint ``model`` on improvement."""
        score = -val_loss  # negate so that "higher is better"
        if self.best_score is not None and score < self.best_score:
            self.counter += 1
            if self.counter >= self.patience:
                self.early_stop = True
            return

        self.best_score = score
        self.save_checkpoint(val_loss, model)
        self.counter = 0

    def save_checkpoint(self, val_loss, model, filename='checkpoint.pt'):
        """Write ``model.state_dict()`` to ``checkpoints/<filename>``.

        The directory is created if it does not exist yet.
        """
        directory = "checkpoints"
        os.makedirs(directory, exist_ok=True)
        if self.verbose:
            # Report the previous minimum, not the already-updated best_score
            # (which is the negated new value).
            print(f'Validation loss decreased ({self.val_loss_min:.6f} --> {val_loss:.6f}). Saving model ...')
        self.checkpoint_path = os.path.join(directory, filename)
        torch.save(model.state_dict(), self.checkpoint_path)
        self.val_loss_min = val_loss


def plot_learning_curves(history):
    """Plot loss and RMSE per epoch for the training and validation sets.

    Args:
        history: Mapping with the equally long lists ``train_loss``,
            ``val_loss``, ``train_rmse`` and ``val_rmse``.
    """
    epochs = range(1, len(history['train_loss']) + 1)

    plt.figure(figsize=(14, 6))

    # Loss plot
    plt.subplot(1, 2, 1)
    plt.plot(epochs, history['train_loss'], label='Train Loss')
    plt.plot(epochs, history['val_loss'], label='Val Loss')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.title('Training and Validation Loss')
    plt.legend()

    # RMSE plot
    plt.subplot(1, 2, 2)
    plt.plot(epochs, history['train_rmse'], label='Train RMSE')
    plt.plot(epochs, history['val_rmse'], label='Val RMSE')
    plt.xlabel('Epochs')
    plt.ylabel('RMSE')
    plt.title('Training and Validation RMSE')
    plt.legend()

    plt.tight_layout()
    plt.show()


def visualize_data_distribution(y):
    """Print summary statistics of the target ``y`` and show its histogram."""
    print("\n--- Zielvariable: Statistik ---")
    print(f"Min: {np.min(y)}, Max: {np.max(y)}")
    print(f"Mittelwert: {np.mean(y):.4f}, Standardabweichung: {np.std(y):.4f}")

    # Histogram
    plt.figure(figsize=(10, 6))
    plt.hist(y, bins=20, color='skyblue', edgecolor='black')
    plt.title('Verteilung der Zielvariable (Scores)')
    plt.xlabel('Score')
    plt.ylabel('Häufigkeit')
    plt.grid(axis='y', linestyle='--', alpha=0.7)
    plt.show()


def load_preprocess_data(path_data='data/hack.csv'):
    """Read the CSV at ``path_data`` and return texts and numeric targets.

    Rows without a ``humor_rating`` are dropped; the rating is cast to float
    and stored in a new column ``y``.

    Returns:
        Tuple ``(X, y)``: the ``text`` column and the float ``y`` column.
    """
    df = pd.read_csv(path_data)

    # Remove rows with a missing target value
    df = df.dropna(subset=['humor_rating'])

    # Make sure the target variable is numeric
    df['y'] = df['humor_rating'].astype(float)

    X = df['text']
    y = df['y']

    # Debug output for a quick sanity check
    print(f"Erste Zielwerte: {y.head(10)}")
    print(f"Datentyp der Zielvariable: {y.dtype}")
    print(f"Anzahl der Beispiele: {len(X)}")

    return X, y


def visualize_predictions(true_values, predicted_values, tolerance=0.3):
    """Scatter-plot predictions against true values, coloured by correctness.

    A prediction counts as correct when its absolute difference from the true
    value is at most ``tolerance``. The dashed diagonal marks perfect
    predictions.

    Args:
        true_values: Array-like of ground-truth targets.
        predicted_values: Array-like of model outputs, same length.
        tolerance: Maximum absolute error still regarded as correct.
    """
    true_values = np.asarray(true_values)
    predicted_values = np.asarray(predicted_values)

    plt.figure(figsize=(10, 6))

    # rtol=0 so that only the absolute tolerance decides (np.isclose would
    # otherwise add a relative term on top of atol).
    correct_indices = np.isclose(true_values, predicted_values, rtol=0.0, atol=tolerance)

    plt.scatter(true_values[correct_indices], predicted_values[correct_indices], color='green', label='Richtig vorhergesagt')
    plt.scatter(true_values[~correct_indices], predicted_values[~correct_indices], color='red', label='Falsch vorhergesagt')
    lowest, highest = true_values.min(), true_values.max()
    plt.plot([lowest, highest], [lowest, highest], color='blue', linestyle='--', label='Ideal-Linie')

    plt.xlabel('Wahre Werte')
    plt.ylabel('Vorhergesagte Werte')
    plt.title('Richtige vs Falsche Vorhersagen')
    plt.legend()
    plt.grid(True)
    plt.show()


class EnhancedCNNRegressor(nn.Module):
    """Text CNN for regression with batch normalisation and dropout.

    One convolution branch per filter size runs over the embedded sequence;
    each branch is max-pooled over the whole sequence, the pooled features are
    concatenated and passed through two fully connected layers that produce a
    single value per example.

    Args:
        vocab_size: Not used by the model; kept for interface compatibility.
        embedding_dim: Width of the embedding vectors (convolution kernel width).
        filter_sizes: Kernel heights, i.e. number of tokens per filter.
        num_filters: Output channels per convolution branch.
        embedding_matrix: Pretrained embedding weights; fine-tuned in training
            (``freeze=False``).
        dropout: Dropout probability used after every branch and after ``fc1``.
    """

    def __init__(self, vocab_size, embedding_dim, filter_sizes, num_filters, embedding_matrix, dropout):
        super().__init__()
        self.embedding = nn.Embedding.from_pretrained(embedding_matrix, freeze=False)

        # Convolution branches with batch normalisation
        self.convs = nn.ModuleList([
            nn.Sequential(
                nn.Conv2d(1, num_filters, (fs, embedding_dim)),
                nn.BatchNorm2d(num_filters),
                nn.ReLU(),
                # Global max pooling over the sequence axis. For inputs of
                # length max_len this equals MaxPool2d((max_len - fs + 1, 1)),
                # but it needs no global configuration and accepts any
                # sequence length >= fs.
                nn.AdaptiveMaxPool2d((1, 1)),
                nn.Dropout(dropout)
            )
            for fs in filter_sizes
        ])

        # Fully connected head
        self.fc1 = nn.Linear(len(filter_sizes) * num_filters, 128)
        self.fc2 = nn.Linear(128, 1)  # output layer (regression)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """Map a batch of token-index sequences to one prediction per example."""
        x = self.embedding(x).unsqueeze(1)  # [batch, 1, seq, embedding]
        # Each branch yields [batch, num_filters, 1, 1]; drop the unit axes.
        conv_outputs = [conv(x).flatten(1) for conv in self.convs]
        x = torch.cat(conv_outputs, 1)  # combine the features of all branches
        x = torch.relu(self.fc1(x))
        x = self.dropout(x)
        return self.fc2(x).squeeze(1)


def _evaluate(model, loader, criterion, device):
    """Run ``model`` over ``loader`` in eval mode without gradient tracking.

    Returns:
        Tuple ``(mean_loss, predictions, labels)`` where ``mean_loss`` is the
        average of the per-batch losses (NaN for an empty loader) and the
        other two are lists with one entry per example.
    """
    model.eval()
    total_loss = 0.0
    preds, labels = [], []
    with torch.no_grad():
        for X_batch, y_batch in loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device).float()
            predictions = model(X_batch).float()
            total_loss += criterion(predictions, y_batch).item()
            preds.extend(predictions.cpu().numpy())
            labels.extend(y_batch.cpu().numpy())
    num_batches = len(loader)
    mean_loss = total_loss / num_batches if num_batches else float("nan")
    return mean_loss, preds, labels


# Run on the CPU
device = torch.device("cpu")
print(f"Using device: {device}")

# Load and prepare the data
embedding_matrix, word_index, vocab_size, d_model = create_embedding_matrix(
    gloVe_path=params["glove_path"], emb_len=params["embedding_dim"]
)
X, y = load_preprocess_data(path_data=params["data_path"])

# Visualise the target distribution
visualize_data_distribution(y)

# Split the data
data_split = split_data(X, y, test_size=params["test_size"], val_size=params["val_size"])

# Datasets and data loaders
train_dataset = TextDataset(data_split['train']['X'], data_split['train']['y'], word_index, max_len=params["max_len"])
val_dataset = TextDataset(data_split['val']['X'], data_split['val']['y'], word_index, max_len=params["max_len"])
test_dataset = TextDataset(data_split['test']['X'], data_split['test']['y'], word_index, max_len=params["max_len"])

train_loader = DataLoader(train_dataset, batch_size=params["batch_size"], shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=params["batch_size"], shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=params["batch_size"], shuffle=False)

# Initialise the model
model = EnhancedCNNRegressor(
    vocab_size=vocab_size,
    embedding_dim=params["embedding_dim"],
    filter_sizes=params["filter_sizes"],
    num_filters=params["num_filters"],
    embedding_matrix=embedding_matrix,
    dropout=params["dropout"]
).to(device)

criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=params["learning_rate"], weight_decay=params["weight_decay"])
early_stopping = EarlyStopping(patience=params["patience"], verbose=True)

# Storage for the training metrics
history = {
    "train_loss": [],
    "val_loss": [],
    "train_rmse": [],
    "val_rmse": [],
}

# Training and validation
for epoch in range(params["epochs"]):
    model.train()
    train_loss = 0.0
    train_preds, train_labels = [], []

    # Progress bar for the batches of one epoch
    with tqdm(train_loader, desc=f"Epoch {epoch + 1}/{params['epochs']}") as pbar:
        for X_batch, y_batch in pbar:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device).float()
            optimizer.zero_grad()
            predictions = model(X_batch).float()
            loss = criterion(predictions, y_batch)
            loss.backward()
            optimizer.step()
            batch_loss = loss.item()
            train_loss += batch_loss

            # Keep true and predicted values for the epoch metrics
            train_preds.extend(predictions.cpu().detach().numpy())
            train_labels.extend(y_batch.cpu().detach().numpy())

            pbar.set_postfix({"Train Loss": batch_loss})

    # Report the mean batch loss so the printed value matches `history`.
    train_loss /= len(train_loader)
    train_rmse = np.sqrt(mean_squared_error(train_labels, train_preds))
    history["train_loss"].append(train_loss)
    history["train_rmse"].append(train_rmse)

    # Validation
    val_loss, val_preds, val_labels = _evaluate(model, val_loader, criterion, device)
    val_rmse = np.sqrt(mean_squared_error(val_labels, val_preds))
    history["val_loss"].append(val_loss)
    history["val_rmse"].append(val_rmse)

    print(f"\nEpoch {epoch + 1}, Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}")
    print(f"Train RMSE: {train_rmse:.4f}, Val RMSE: {val_rmse:.4f}")

    early_stopping(val_rmse, model)
    if early_stopping.early_stop:
        print("Early stopping triggered.")
        break

# Plot the learning curves
plot_learning_curves(history)

# Evaluate the best weights seen during training rather than whatever the
# last epoch left behind; the checkpoint only exists if an epoch was run.
if early_stopping.checkpoint_path is not None:
    model.load_state_dict(torch.load(early_stopping.checkpoint_path, map_location=device))

# Test evaluation
_, test_preds, test_labels = _evaluate(model, test_loader, criterion, device)

# Convert to NumPy arrays
true_values = np.array(test_labels)
predicted_values = np.array(test_preds)

# Visualise the results
visualize_predictions(true_values, predicted_values)

# RMSE, MAE and R² score on the test set
test_rmse = np.sqrt(mean_squared_error(test_labels, test_preds))
test_mae = mean_absolute_error(test_labels, test_preds)
test_r2 = r2_score(test_labels, test_preds)
print(f"Test RMSE: {test_rmse:.4f}, Test MAE: {test_mae:.4f}, Test R²: {test_r2:.4f}")