import random import torch import torch.nn as nn import torch.optim as optim import matplotlib.pyplot as plt from torch.utils.data import DataLoader, Subset from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score import numpy as np import Datasets import dataset_helper import EarlyStopping import ml_helper import ml_history import ml_train SEED = 501 random.seed(SEED) np.random.seed(SEED) torch.manual_seed(SEED) torch.cuda.manual_seed_all(SEED) torch.backends.cudnn.deterministic = True class EnhancedCNNRegressor(nn.Module): def __init__(self, vocab_size, embedding_dim, filter_sizes, num_filters, embedding_matrix, dropout): super(EnhancedCNNRegressor, self).__init__() self.embedding = nn.Embedding.from_pretrained(embedding_matrix, freeze=False) # Convolutional Schichten mit Batch-Normalisierung self.convs = nn.ModuleList([ nn.Sequential( nn.Conv2d(1, num_filters, (fs, embedding_dim)), nn.BatchNorm2d(num_filters), # Batch-Normalisierung nn.ReLU(), nn.MaxPool2d((params["max_len"] - fs + 1, 1)), nn.Dropout(dropout) # Dropout nach jeder Schicht ) for fs in filter_sizes ]) # Fully-Connected Layer self.fc1 = nn.Linear(len(filter_sizes) * num_filters, 128) # Erweiterte Dense-Schicht self.fc2 = nn.Linear(128, 1) # Ausgangsschicht (Regression) self.dropout = nn.Dropout(dropout) def forward(self, x): x = self.embedding(x).unsqueeze(1) # [Batch, 1, Seq, Embedding] conv_outputs = [conv(x).squeeze(3).squeeze(2) for conv in self.convs] # Pooling reduziert Dim x = torch.cat(conv_outputs, 1) # Kombiniere Features von allen Filtern x = torch.relu(self.fc1(x)) # Zusätzliche Dense-Schicht x = self.dropout(x) return self.fc2(x).squeeze(1) def train_model(model, train_dataset, test_dataset, criterion, optimizer, epochs, batch_size): train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True) test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False) test_losses, train_losses = [], [] train_r2_scores, test_r2_scores = [], [] for epoch in range(epochs): model.train() running_loss = 0.0 running_r2 = 0.0 # Training for inputs, labels in train_loader: inputs = inputs.to(device) labels = labels.to(device) optimizer.zero_grad() outputs = model(inputs) loss = criterion(outputs, labels) loss.backward() optimizer.step() running_loss += loss.item() running_r2 += r2_score(labels.cpu().numpy(), outputs.cpu().detach().numpy()) train_losses.append(running_loss / len(train_loader)) train_r2_scores.append(running_r2 / len(train_loader)) # Test model.eval() # Set model to evaluation mode test_loss = 0.0 test_r2 = 0.0 with torch.no_grad(): # No gradient calculation for testing for inputs, labels in test_loader: inputs = inputs.to(device) labels = labels.to(device) outputs = model(inputs) loss = criterion(outputs, labels) test_loss += loss.item() test_r2 += r2_score(labels.cpu().numpy(), outputs.cpu().detach().numpy()) test_losses.append(test_loss / len(test_loader)) test_r2_scores.append(test_r2 / len(test_loader)) print(f'Epoch {epoch + 1}/{epochs}, Train Loss: {train_losses[-1]:.4f}, Train R²: {train_r2_scores[-1]:.4f}, Test Loss: {test_losses[-1]:.4f}, Test R²: {test_r2_scores[-1]:.4f}') return train_losses, test_losses, train_r2_scores, test_r2_scores # Bootstrap Aggregation (Bagging) Update def bootstrap_aggregation(ModelClass, train_dataset, test_dataset, num_models=5, epochs=10, batch_size=32, learning_rate=0.001): models = [] all_train_losses, all_test_losses = [], [] all_train_r2_scores, all_test_r2_scores = [], [] subset_size = len(train_dataset) // num_models for i in range(num_models): print(f"Training Model {i + 1}/{num_models}...") start_idx = i * subset_size end_idx = start_idx + subset_size subset_indices = list(range(0, start_idx)) + list(range(end_idx, len(train_dataset))) subset = Subset(train_dataset, subset_indices) model = ModelClass(vocab_size, EMBEDDING_DIM, params["filter_sizes"], params["num_filters"], embedding_matrix, params["dropout"]) model.to(device) criterion = nn.MSELoss() optimizer = optim.Adam(model.parameters(), lr=learning_rate) train_losses, test_losses, train_r2_scores, test_r2_scores = train_model(model, subset, test_dataset, criterion, optimizer, epochs, batch_size) models.append(model) all_train_losses.append(train_losses) all_test_losses.append(test_losses) all_train_r2_scores.append(train_r2_scores) all_test_r2_scores.append(test_r2_scores) # Plot für alle Modelle plt.figure(figsize=(12, 6)) for i in range(num_models): plt.plot(all_train_losses[i], label=f'Model {i + 1} Train Loss') plt.plot(all_test_losses[i], label=f'Model {i + 1} Test Loss', linestyle = 'dashed') plt.title("Training and Test Loss for all Models") plt.xlabel('Epochs') plt.ylabel('Loss') plt.legend() plt.show() plt.figure(figsize=(12, 6)) for i in range(num_models): plt.plot(all_train_r2_scores[i], label=f'Model {i + 1} Train R²') plt.plot(all_test_r2_scores[i], label=f'Model {i + 1} Test R²', linestyle = 'dashed') plt.title("Training and Test R² for all Models") plt.xlabel('Epochs') plt.ylabel('R²') plt.legend() plt.show() return models, all_train_losses, all_test_losses, all_train_r2_scores, all_test_r2_scores # Ensemble Prediction def ensemble_predict(models, test_dataset): dataloader = DataLoader(test_dataset, batch_size=32, shuffle=False) all_predictions = [] with torch.no_grad(): for inputs, _ in dataloader: inputs = inputs.to(device) predictions = torch.stack([model(inputs).squeeze() for model in models]) avg_predictions = predictions.mean(dim=0) all_predictions.extend(avg_predictions.cpu().numpy()) return np.array(all_predictions) if __name__ == '__main__': # Hyperparameter und Konfigurationen params = { # Config "max_len": 280, # Training "epochs": 2, "patience": 7, "batch_size": 16, "learning_rate": 0.001, "weight_decay": 5e-4 , # Model "filter_sizes": [2, 3, 4, 5], "num_filters": 150, "dropout": 0.6 } # Configs MODEL_NAME = 'CNN.pt' HIST_NAME = 'CNN_history' GLOVE_PATH = 'data/glove.6B.100d.txt' DATA_PATH = 'data/hack.csv' EMBEDDING_DIM = 100 TEST_SIZE = 0.1 VAL_SIZE = 0.1 device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') # Daten laden und vorbereiten embedding_matrix, word_index, vocab_size, d_model = dataset_helper.get_embedding_matrix( gloVe_path=GLOVE_PATH, emb_len=EMBEDDING_DIM) X, y = dataset_helper.load_preprocess_data(path_data=DATA_PATH, verbose=True) # Aufteilen der Daten data_split = dataset_helper.split_data(X, y, test_size=TEST_SIZE, val_size=VAL_SIZE) # Dataset und DataLoader train_dataset = Datasets.GloveDataset(data_split['train']['X'], data_split['train']['y'], word_index, max_len=params["max_len"]) val_dataset = Datasets.GloveDataset(data_split['val']['X'], data_split['val']['y'], word_index, max_len=params["max_len"]) test_dataset = Datasets.GloveDataset(data_split['test']['X'], data_split['test']['y'], word_index, max_len=params["max_len"]) # Bootstrap Aggregation (Bagging) Training models, all_train_losses, all_test_losses, all_train_r2_scores, all_test_r2_scores = bootstrap_aggregation( EnhancedCNNRegressor, train_dataset, test_dataset, num_models=2, epochs=params["epochs"], batch_size=params["batch_size"], learning_rate=params["learning_rate"]) # Ensemble Prediction test_predictions = ensemble_predict(models, test_dataset) # Test Evaluation # test_labels = np.array([y for _, y in test_dataset]) test_mse = mean_squared_error(test_dataset.labels.to_numpy(), test_predictions) test_mae = mean_absolute_error(test_dataset.labels.to_numpy(), test_predictions) test_r2 = r2_score(test_dataset.labels.to_numpy(), test_predictions) print(f"Test RMSE: {test_mse:.4f}, Test MAE: {test_mae:.4f}, Test R²: {test_r2:.4f}")