import pandas as pd import numpy as np import torch import torch.nn as nn from torch.utils.data import DataLoader, Dataset from sklearn.model_selection import train_test_split from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score import matplotlib.pyplot as plt import matplotlib.patches as mpatches from tqdm import tqdm from dataset_generator import create_embedding_matrix from EarlyStopping import EarlyStopping import torch.optim as optim from torch.utils.data import DataLoader, Dataset, Subset # Import Subset #from utils import tokenize_and_pad, HumorDataset, evaluate_model, bootstrap_aggregation def train_model(model, train_dataset, val_dataset, criterion, optimizer, epochs, batch_size): train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True) val_dataloader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False) model.to(device) history = {'train_loss': [], 'val_loss': [], 'train_r2': [], 'val_r2': []} for epoch in range(epochs): model.train() total_loss = 0 all_train_preds, all_train_targets = [], [] for inputs, targets in train_dataloader: inputs, targets = inputs.to(device), targets.to(device) optimizer.zero_grad() outputs = model(inputs).squeeze() loss = criterion(outputs, targets) loss.backward() optimizer.step() total_loss += loss.item() all_train_preds.extend(outputs.detach().cpu().numpy()) all_train_targets.extend(targets.detach().cpu().numpy()) train_r2 = r2_score(all_train_targets, all_train_preds) train_loss = total_loss / len(train_dataloader) history['train_loss'].append(train_loss) history['train_r2'].append(train_r2) # **Validierung nach jeder Epoche** model.eval() val_loss = 0 all_val_preds, all_val_targets = [], [] with torch.no_grad(): for inputs, targets in val_dataloader: inputs, targets = inputs.to(device), targets.to(device) outputs = model(inputs).squeeze() loss = criterion(outputs, targets) val_loss += loss.item() all_val_preds.extend(outputs.cpu().numpy()) all_val_targets.extend(targets.cpu().numpy()) val_r2 = r2_score(all_val_targets, all_val_preds) val_loss /= len(val_dataloader) history['val_loss'].append(val_loss) history['val_r2'].append(val_r2) print(f"Epoch {epoch+1}/{epochs}, Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}, Train R²: {train_r2:.4f}, Val R²: {val_r2:.4f}") return history # **Gibt die Verlaufsdaten zurück** def bootstrap_aggregation(ModelClass, train_dataset, num_models=3, epochs=5, batch_size=32, learning_rate=0.001): models = [] all_histories = [] # **Speichert Trainingsverlauf aller Modelle** subset_size = len(train_dataset) // num_models for i in range(num_models): print(f"Training Model {i+1}/{num_models}...") start_idx = i * subset_size end_idx = start_idx + subset_size subset_indices = list(range(0, start_idx)) + list(range(end_idx, len(train_dataset))) subset = Subset(train_dataset, subset_indices) # **Validierungsdaten als restliche Daten** val_indices = list(range(start_idx, end_idx)) val_subset = Subset(train_dataset, val_indices) model = ModelClass() criterion = nn.MSELoss() optimizer = optim.Adam(model.parameters(), lr=learning_rate) history = train_model(model, subset, val_subset, criterion, optimizer, epochs, batch_size) all_histories.append(history) # **Speichere Verlaufsdaten** models.append(model) return models, all_histories def ensemble_predict(models, test_dataset): dataloader = DataLoader(test_dataset, batch_size=32, shuffle=False) all_predictions = [] with torch.no_grad(): for inputs, _ in dataloader: inputs = inputs.to(device) predictions = torch.stack([model(inputs).squeeze() for model in models]) avg_predictions = predictions.mean(dim=0) # Mittelwert über alle Modelle all_predictions.extend(avg_predictions.cpu().numpy()) return np.array(all_predictions) import matplotlib.pyplot as plt def plot_training_histories(histories, num_models): epochs = range(1, len(histories[0]['train_loss']) + 1) fig, axes = plt.subplots(1, 2, figsize=(14, 5)) # **Links: Trainings- und Validierungsverlust** for i in range(num_models): axes[0].plot(epochs, histories[i]['train_loss'], label=f"Train Loss Model {i+1}") axes[0].plot(epochs, histories[i]['val_loss'], linestyle='dashed', label=f"Val Loss Model {i+1}") axes[0].set_title("Train & Validation Loss") axes[0].set_xlabel("Epochs") axes[0].set_ylabel("Loss") axes[0].legend() # **Rechts: R²-Werte für Training und Validierung** for i in range(num_models): axes[1].plot(epochs, histories[i]['train_r2'], label=f"Train R² Model {i+1}") axes[1].plot(epochs, histories[i]['val_r2'], linestyle='dashed', label=f"Val R² Model {i+1}") axes[1].set_title("Train & Validation R² Score") axes[1].set_xlabel("Epochs") axes[1].set_ylabel("R² Score") axes[1].legend() plt.show() # 1. Gerät automatisch erkennen device = torch.device('mps' if torch.backends.mps.is_available() else 'cuda' if torch.cuda.is_available() else 'cpu') print(f"Using device: {device}") # 2. Daten laden data = pd.read_csv('data/hack.csv') # 3. Filtern humorvoller Texte humor_data = data[data['is_humor'] == 1].dropna(subset=['humor_rating']).copy() # 4. Einbettungsmatrix erstellen embedding_matrix, word_index, vocab_size, d_model = create_embedding_matrix( gloVe_path='data/glove.6B.100d.txt', emb_len=100 ) print(f"vocab_size: {vocab_size}, d_model: {d_model}") # 5. Tokenisierung und Padding def tokenize_and_pad(texts, word_index, max_len=50): sequences = [] for text in texts: tokens = [word_index.get(word, 0) for word in text.split()] if len(tokens) < max_len: tokens += [0] * (max_len - len(tokens)) else: tokens = tokens[:max_len] sequences.append(tokens) return torch.tensor(sequences, dtype=torch.long) max_len = 50 train_texts, test_texts, train_labels, test_labels = train_test_split( humor_data['text'], humor_data['humor_rating'], test_size=0.2, random_state=42 ) train_input_ids = tokenize_and_pad(train_texts, word_index, max_len=max_len) test_input_ids = tokenize_and_pad(test_texts, word_index, max_len=max_len) # Labels in Tensor konvertieren train_labels = torch.tensor(train_labels.values, dtype=torch.float) test_labels = torch.tensor(test_labels.values, dtype=torch.float) # 6. Dataset und DataLoader class HumorDataset(Dataset): def __init__(self, input_ids, labels): self.input_ids = input_ids self.labels = labels def __len__(self): return len(self.input_ids) def __getitem__(self, idx): return self.input_ids[idx], self.labels[idx] dataset = HumorDataset(train_input_ids, train_labels) # 7. CNN-Regression-Modell def create_cnn(vocab_size, embed_dim, embedding_matrix): class CNNRegressor(nn.Module): def __init__(self, vocab_size, embed_dim, embedding_matrix): super(CNNRegressor, self).__init__() self.embedding = nn.Embedding(vocab_size, embed_dim) self.embedding.weight.data.copy_(embedding_matrix.clone().detach()) self.embedding.weight.requires_grad = False self.conv1 = nn.Conv1d(embed_dim, 128, kernel_size=3) self.conv2 = nn.Conv1d(128, 64, kernel_size=3) self.dropout = nn.Dropout(0.5) self.fc = nn.Linear(64, 1) def forward(self, x): x = self.embedding(x).permute(0, 2, 1) x = torch.relu(self.conv1(x)) x = torch.relu(self.conv2(x)) x = self.dropout(x) x = torch.max(x, dim=2).values x = self.fc(x) return torch.sigmoid(x) * 5 return CNNRegressor(vocab_size, embed_dim, embedding_matrix) # 8. Bootstrap Aggregation mit CNN models, histories = bootstrap_aggregation( lambda: create_cnn(vocab_size, d_model, embedding_matrix), dataset, num_models=5, epochs=10, batch_size=32, learning_rate=0.001 ) # **Plot Training & Validation Loss & R²** plot_training_histories(histories, num_models=5) # Vorhersagen mit Ensemble predictions = ensemble_predict(models, HumorDataset(test_input_ids, test_labels)) actuals = test_labels.numpy() # 9. Metriken berechnen mse = mean_squared_error(actuals, predictions) mae = mean_absolute_error(actuals, predictions) r2 = r2_score(actuals, predictions) print(f"MSE: {mse:.4f}, MAE: {mae:.4f}, R²: {r2:.4f}") # 10. Visualisierung tolerance = 0.5 # Toleranz für korrekte Vorhersagen predictions = np.array(predictions) actuals = np.array(actuals) correct = np.abs(predictions - actuals) <= tolerance colors = np.where(correct, 'green', 'red') plt.figure(figsize=(8, 6)) plt.scatter(actuals, predictions, c=colors, alpha=0.6, edgecolor='k', s=50) plt.plot([0, 5], [0, 5], color='red', linestyle='--') green_patch = mpatches.Patch(color='green', label='Correct Predictions') red_patch = mpatches.Patch(color='red', label='Incorrect Predictions') plt.legend(handles=[green_patch, red_patch]) plt.xlabel("True Humor Ratings") plt.ylabel("Predicted Humor Ratings") plt.title("True vs Predicted Humor Ratings (Correct vs Incorrect)") plt.show()