"""Train and evaluate a Transformer-encoder regressor on GloVe-embedded text.

Running the module as a script trains ``n_models`` models (an ensemble when
more than one), evaluates each on the test split and, for ensembles, also
evaluates the averaged prediction.
"""
import json
import math
import random
from datetime import datetime

import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from torch.utils.data import DataLoader, Subset

import Datasets
import dataset_helper
import EarlyStopping
import ml_helper
import ml_history
import ml_train

# Seed every RNG in use so that runs are repeatable.
SEED = 501
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
torch.cuda.manual_seed_all(SEED)
torch.backends.cudnn.deterministic = True


class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position information to batch-first embeddings.

    Adapted from
    https://pytorch.org/tutorials/beginner/transformer_tutorial.html

    Args:
        d_model: Size of the embedding dimension.
        vocab_size: Number of positions in the precomputed table, i.e. the
            longest sequence that can be encoded. The name is kept for
            backward compatibility with existing callers.
        dropout: Dropout probability applied after adding the encoding.
    """

    def __init__(self, d_model, vocab_size=5000, dropout=0.1):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)

        pe = torch.zeros(vocab_size, d_model)
        position = torch.arange(0, vocab_size, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(
            torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model)
        )
        pe[:, 0::2] = torch.sin(position * div_term)
        # For odd d_model there is one cosine column fewer than sine columns,
        # so trim div_term; for even d_model the slice is a no-op.
        pe[:, 1::2] = torch.cos(position * div_term[: d_model // 2])
        # Shape (1, vocab_size, d_model) so it broadcasts over the batch axis.
        pe = pe.unsqueeze(0)
        # A buffer moves with the module across devices but is not trained.
        self.register_buffer("pe", pe)

    def forward(self, x):
        """Return ``x`` plus positional encodings, with dropout applied.

        ``x`` is indexed as (batch, seq_len, d_model); seq_len must not exceed
        the table size chosen at construction.
        """
        x = x + self.pe[:, : x.size(1), :]
        return self.dropout(x)


class TransformerBinaryClassifier(nn.Module):
    """Text model based on a PyTorch ``TransformerEncoder``.

    Despite its name, the head is a single linear unit without an output
    activation; the training script in this file optimises it with MSE loss,
    i.e. it is used as a regressor.

    Args:
        embeddings: Pretrained embedding matrix of shape (vocab_size, d_model).
            The embeddings are fine-tuned during training (``freeze=False``).
        nhead: Number of attention heads; must divide ``d_model``.
        dim_feedforward: Hidden size of each encoder layer's feed-forward net.
        num_layers: Number of stacked encoder layers.
        positional_dropout: Dropout used by the positional encoding.
        classifier_dropout: Dropout used inside the encoder layers.
    """

    def __init__(
        self,
        embeddings,
        nhead=8,
        dim_feedforward=2048,
        num_layers=6,
        positional_dropout=0.1,
        classifier_dropout=0.1,
    ):
        super().__init__()

        vocab_size, d_model = embeddings.size()
        assert d_model % nhead == 0, "nheads must divide evenly into d_model"

        self.emb = nn.Embedding.from_pretrained(embeddings, freeze=False)

        self.pos_encoder = PositionalEncoding(
            d_model=d_model,
            dropout=positional_dropout,
            vocab_size=vocab_size,
        )

        # forward() works on (batch, seq, d_model) tensors: the positional
        # encoding is sliced along dim 1 and pooling averages over dim 1.
        # Without batch_first=True the encoder would treat dim 0 (the batch)
        # as the sequence axis and let samples attend to one another.
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model,
            nhead=nhead,
            dim_feedforward=dim_feedforward,
            dropout=classifier_dropout,
            batch_first=True,
        )
        self.transformer_encoder = nn.TransformerEncoder(
            encoder_layer,
            num_layers=num_layers,
        )
        # Currently not applied in forward(); kept so that the module's
        # state-dict keys stay unchanged.
        self.batch_norm = nn.BatchNorm1d(d_model)
        self.classifier = nn.Linear(d_model, 1)
        self.d_model = d_model

    def forward(self, x):
        """Map a batch of token-index sequences to one scalar per sequence.

        ``x`` is indexed as (batch, seq_len); the result has shape (batch,).
        """
        # Scale embeddings by sqrt(d_model) before adding positions.
        x = self.emb(x) * math.sqrt(self.d_model)
        x = self.pos_encoder(x)
        x = self.transformer_encoder(x)
        # Mean-pool over the sequence axis.
        x = x.mean(dim=1)
        # NOTE: no activation function for regression
        x = self.classifier(x)
        x = x.squeeze(1)
        return x


def main():
    """Train, evaluate and persist one model or an ensemble of models."""
    # Hyperparameters and configuration
    params = {
        # Config
        "max_len": 280,
        # Training
        "epochs": 1,
        "patience": 7,
        "batch_size": 32,
        "learning_rate": 1e-4,  # 1e-4
        "weight_decay": 5e-4,
        # Model
        'nhead': 2,  # 5
        "dropout": 0.2,
        'hiden_dim': 2048,
        'num_layers': 6,
    }

    # Paths and experiment settings
    glove_path = 'data/glove.6B.100d.txt'
    data_path = 'data/hack.csv'
    embedding_dim = 100
    test_size = 0.1
    val_size = 0.1

    n_models = 2
    models = []
    timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")

    # Load and prepare the data
    embedding_matrix, word_index, _vocab_size, _d_model = (
        dataset_helper.get_embedding_matrix(
            gloVe_path=glove_path, emb_len=embedding_dim
        )
    )
    X, y = dataset_helper.load_preprocess_data(path_data=data_path, verbose=True)

    # Split the data into train / validation / test
    data_split = dataset_helper.split_data(
        X, y, test_size=test_size, val_size=val_size
    )

    # Datasets and DataLoaders
    train_dataset = Datasets.GloveDataset(
        data_split['train']['X'], data_split['train']['y'],
        word_index, max_len=params["max_len"],
    )
    val_dataset = Datasets.GloveDataset(
        data_split['val']['X'], data_split['val']['y'],
        word_index, max_len=params["max_len"],
    )
    test_dataset = Datasets.GloveDataset(
        data_split['test']['X'], data_split['test']['y'],
        word_index, max_len=params["max_len"],
    )

    train_loader = DataLoader(
        train_dataset, batch_size=params["batch_size"], shuffle=True
    )
    val_loader = DataLoader(
        val_dataset, batch_size=params["batch_size"], shuffle=False
    )
    test_loader = DataLoader(
        test_dataset, batch_size=params["batch_size"], shuffle=False
    )

    device = ml_helper.get_device(verbose=True, include_mps=False)

    for i in range(n_models):
        model_name = 'Transformer.pt'
        hist_name = 'Transformer_history'

        if n_models > 1:
            model_name = f'Transformer_{i}_ensemble.pt'
            hist_name = f'Transformer_{i}_ensemble_history'

            # Each ensemble member trains on its own subset of the training
            # data (index selection is delegated to dataset_helper).
            subset_indices = dataset_helper.ensemble_data_idx(
                train_dataset.labels, n_models, i, methods='bootstrap'
            )
            train_dataset_sub = Subset(train_dataset, subset_indices)
            train_loader = DataLoader(
                train_dataset_sub, batch_size=params["batch_size"], shuffle=True
            )

        # Initialise the model
        model = TransformerBinaryClassifier(
            embeddings=embedding_matrix,
            nhead=params['nhead'],
            dim_feedforward=params['hiden_dim'],
            num_layers=params['num_layers'],
            positional_dropout=params["dropout"],
            classifier_dropout=params["dropout"],
        )
        model = model.to(device)

        criterion = nn.MSELoss()
        # weight_decay=params["weight_decay"] is intentionally left disabled.
        optimizer = optim.Adam(model.parameters(), lr=params["learning_rate"])
        early_stopping = EarlyStopping.EarlyStoppingCallback(
            patience=params["patience"], verbose=True, model_name=model_name
        )

        hist = ml_history.History()

        # Training and validation
        for epoch in range(params["epochs"]):
            ml_train.train_epoch(
                model, train_loader, criterion, optimizer, device,
                hist, epoch, params["epochs"],
            )

            val_rmse = ml_train.validate_epoch(
                model, val_loader, epoch, criterion, device, hist
            )

            early_stopping(val_rmse, model)
            if early_stopping.early_stop:
                print("Early stopping triggered.")
                break

        # Load best model
        # NOTE(review): weights_only=False unpickles arbitrary objects. That
        # is acceptable only because this checkpoint is presumably written by
        # the EarlyStoppingCallback during this very run -- confirm, and
        # switch to weights_only=True if the file holds a plain state dict.
        model.load_state_dict(
            torch.load('models/checkpoints/' + model_name, weights_only=False)
        )
        models.append(model)

        # Test evaluation
        test_labels, test_preds = ml_train.test_loop(model, test_loader, device)
        hist.add_test_results(test_labels, test_preds)

        # Save training history
        hist.save_history(hist_name, timestamp)

        # RMSE, MAE and R² score on the test set
        test_mae = mean_absolute_error(test_labels, test_preds)
        test_rmse = np.sqrt(mean_squared_error(test_labels, test_preds))
        test_r2 = r2_score(test_labels, test_preds)
        print(f"Test RMSE: {test_rmse:.4f}, Test MAE: {test_mae:.4f}, Test R²: {test_r2:.4f}")

    if n_models > 1:
        # Ensemble prediction: average the member predictions per sample.
        ensemble_test_preds = ml_train.ensemble_predict(models, test_loader, device)
        ensemble_avg_preds = np.mean(ensemble_test_preds, axis=0)

        # Save ensemble predictions as JSON
        ensemble_preds_path = f'histories/ensemble_preds_Transformer_{timestamp}.json'
        with open(ensemble_preds_path, 'w') as f:
            json.dump(ensemble_avg_preds.tolist(), f)

        # Test evaluation
        test_labels = test_dataset.labels.to_numpy()
        # Take the square root so the value matches its "RMSE" label and is
        # comparable with the per-model RMSE printed above.
        test_rmse = np.sqrt(mean_squared_error(test_labels, ensemble_avg_preds))
        test_mae = mean_absolute_error(test_labels, ensemble_avg_preds)
        test_r2 = r2_score(test_labels, ensemble_avg_preds)
        print(f"Ensemble Test RMSE: {test_rmse:.4f}, Test MAE: {test_mae:.4f}, Test R²: {test_r2:.4f}")


if __name__ == '__main__':
    main()