arman 2025-02-14 14:28:42 +01:00
commit 1807985736
2 changed files with 374 additions and 36 deletions

342
TEST_CNN_2.py 100644
View File

@ -0,0 +1,342 @@
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from tqdm import tqdm # Fortschrittsbalken-Bibliothek
from dataset_generator import create_embedding_matrix, split_data
from HumorDataset import TextDataset
import numpy as np
import pandas as pd
import os
import matplotlib.pyplot as plt
# Hyperparameter und Konfigurationen
params = {
"embedding_dim": 100,
"filter_sizes": [2, 3, 4, 5], # Zusätzliche Filtergröße
"num_filters": 150, # Erhöhte Anzahl von Filtern
"batch_size": 32,
"learning_rate": 0.001,
"epochs": 25,
"glove_path": 'data/glove.6B.100d.txt', # Pfad zu GloVe
"max_len": 50,
"test_size": 0.1,
"val_size": 0.1,
"patience": 5,
"data_path": 'data/hack.csv', # Pfad zu den Daten
"dropout": 0.6, # Erhöhtes Dropout
"weight_decay": 5e-4 # L2-Regularisierung
}
# EarlyStopping-Klasse mit Ordnerprüfung
class EarlyStopping:
def __init__(self, patience=5, verbose=False):
self.patience = patience
self.verbose = verbose
self.counter = 0
self.best_score = None
self.early_stop = False
def __call__(self, val_loss, model):
score = -val_loss
if self.best_score is None:
self.best_score = score
self.save_checkpoint(val_loss, model)
elif score < self.best_score:
self.counter += 1
if self.counter >= self.patience:
self.early_stop = True
else:
self.best_score = score
self.save_checkpoint(val_loss, model)
self.counter = 0
def save_checkpoint(self, val_loss, model, filename='checkpoint.pt'):
directory = "checkpoints"
if not os.path.exists(directory):
os.makedirs(directory) # Erstelle den Ordner, falls er nicht existiert
if self.verbose:
print(f'Validation loss decreased ({self.best_score:.6f} --> {val_loss:.6f}). Saving model ...')
torch.save(model.state_dict(), os.path.join(directory, filename))
# Plot-Funktion für Training
def plot_learning_curves(history):
epochs = range(1, len(history['train_loss']) + 1)
# Loss-Plot
plt.figure(figsize=(14, 6))
plt.subplot(1, 2, 1)
plt.plot(epochs, history['train_loss'], label='Train Loss')
plt.plot(epochs, history['val_loss'], label='Val Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.title('Training and Validation Loss')
plt.legend()
# RMSE-Plot
plt.subplot(1, 2, 2)
plt.plot(epochs, history['train_rmse'], label='Train RMSE')
plt.plot(epochs, history['val_rmse'], label='Val RMSE')
plt.xlabel('Epochs')
plt.ylabel('RMSE')
plt.title('Training and Validation RMSE')
plt.legend()
plt.tight_layout()
plt.show()
# Visualisierung der Zielvariablen (Scores)
def visualize_data_distribution(y):
print("\n--- Zielvariable: Statistik ---")
print(f"Min: {np.min(y)}, Max: {np.max(y)}")
print(f"Mittelwert: {np.mean(y):.4f}, Standardabweichung: {np.std(y):.4f}")
# Histogramm plotten
plt.figure(figsize=(10, 6))
plt.hist(y, bins=20, color='skyblue', edgecolor='black')
plt.title('Verteilung der Zielvariable (Scores)')
plt.xlabel('Score')
plt.ylabel('Häufigkeit')
plt.grid(axis='y', linestyle='--', alpha=0.7)
plt.show()
# Funktion zum Laden und Vorverarbeiten der Daten
def load_preprocess_data(path_data='data/hack.csv'):
# Daten laden
df = pd.read_csv(path_data)
# Fehlende Werte in der Zielspalte entfernen
df = df.dropna(subset=['humor_rating'])
# Zielvariable aus der Spalte 'humor_rating' extrahieren
df['y'] = df['humor_rating'].astype(float) # Sicherstellen, dass Zielvariable numerisch ist
# Eingabetexte und Zielvariable zuweisen
X = df['text']
y = df['y']
# Debug-Ausgabe zur Überprüfung
print(f"Erste Zielwerte: {y.head(10)}")
print(f"Datentyp der Zielvariable: {y.dtype}")
print(f"Anzahl der Beispiele: {len(X)}")
return X, y
# CNN-Modell für Regression mit erweiterten Regularisierungen
class EnhancedCNNRegressor(nn.Module):
def __init__(self, vocab_size, embedding_dim, filter_sizes, num_filters, embedding_matrix, dropout):
super(EnhancedCNNRegressor, self).__init__()
self.embedding = nn.Embedding.from_pretrained(embedding_matrix, freeze=False)
# Convolutional Schichten mit Batch-Normalisierung
self.convs = nn.ModuleList([
nn.Sequential(
nn.Conv2d(1, num_filters, (fs, embedding_dim)),
nn.BatchNorm2d(num_filters), # Batch-Normalisierung
nn.ReLU(),
nn.MaxPool2d((params["max_len"] - fs + 1, 1)),
nn.Dropout(dropout) # Dropout nach jeder Schicht
)
for fs in filter_sizes
])
# Fully-Connected Layer
self.fc1 = nn.Linear(len(filter_sizes) * num_filters, 128) # Erweiterte Dense-Schicht
self.fc2 = nn.Linear(128, 1) # Ausgangsschicht (Regression)
self.dropout = nn.Dropout(dropout)
def forward(self, x):
x = self.embedding(x).unsqueeze(1) # [Batch, 1, Seq, Embedding]
conv_outputs = [conv(x).squeeze(3).squeeze(2) for conv in self.convs] # Pooling reduziert Dim
x = torch.cat(conv_outputs, 1) # Kombiniere Features von allen Filtern
x = torch.relu(self.fc1(x)) # Zusätzliche Dense-Schicht
x = self.dropout(x)
return self.fc2(x).squeeze(1)
# Device auf CPU setzen
device = torch.device("cpu")
print(f"Using device: {device}")
# Daten laden und vorbereiten
embedding_matrix, word_index, vocab_size, d_model = create_embedding_matrix(
gloVe_path=params["glove_path"], emb_len=params["embedding_dim"]
)
X, y = load_preprocess_data(path_data=params["data_path"])
# Visualisierung der Daten
visualize_data_distribution(y)
# Aufteilen der Daten
data_split = split_data(X, y, test_size=params["test_size"], val_size=params["val_size"])
# Dataset und DataLoader
train_dataset = TextDataset(data_split['train']['X'], data_split['train']['y'], word_index, max_len=params["max_len"])
val_dataset = TextDataset(data_split['val']['X'], data_split['val']['y'], word_index, max_len=params["max_len"])
test_dataset = TextDataset(data_split['test']['X'], data_split['test']['y'], word_index, max_len=params["max_len"])
train_loader = DataLoader(train_dataset, batch_size=params["batch_size"], shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=params["batch_size"], shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=params["batch_size"], shuffle=False)
# Modell initialisieren
model = EnhancedCNNRegressor(
vocab_size=vocab_size,
embedding_dim=params["embedding_dim"],
filter_sizes=params["filter_sizes"],
num_filters=params["num_filters"],
embedding_matrix=embedding_matrix,
dropout=params["dropout"]
).to(device)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=params["learning_rate"], weight_decay=params["weight_decay"])
early_stopping = EarlyStopping(patience=params["patience"], verbose=True)
# Speicher für Trainingsmetriken
history = {
"train_loss": [],
"val_loss": [],
"train_rmse": [],
"val_rmse": [],
}
# Training und Validierung
for epoch in range(params["epochs"]):
model.train()
train_loss = 0.0
train_preds, train_labels = [], []
# Fortschrittsbalken für Training innerhalb einer Epoche
with tqdm(train_loader, desc=f"Epoch {epoch + 1}/{params['epochs']}") as pbar:
for X_batch, y_batch in pbar:
X_batch, y_batch = X_batch.to(device), y_batch.to(device).float()
optimizer.zero_grad()
predictions = model(X_batch).float()
loss = criterion(predictions, y_batch)
loss.backward()
optimizer.step()
train_loss += loss.item()
# Speichere echte und vorhergesagte Werte für Metriken
train_preds.extend(predictions.cpu().detach().numpy())
train_labels.extend(y_batch.cpu().detach().numpy())
# Update der Fortschrittsanzeige
pbar.set_postfix({"Train Loss": loss.item()})
train_rmse = np.sqrt(mean_squared_error(train_labels, train_preds)) # RMSE
history["train_loss"].append(train_loss / len(train_loader))
history["train_rmse"].append(train_rmse)
# Validation
model.eval()
val_loss = 0.0
val_preds, val_labels = [], []
with torch.no_grad():
for X_batch, y_batch in val_loader:
X_batch, y_batch = X_batch.to(device), y_batch.to(device).float()
predictions = model(X_batch).float()
loss = criterion(predictions, y_batch)
val_loss += loss.item()
val_preds.extend(predictions.cpu().detach().numpy())
val_labels.extend(y_batch.cpu().detach().numpy())
val_rmse = np.sqrt(mean_squared_error(val_labels, val_preds)) # RMSE
history["val_loss"].append(val_loss / len(val_loader))
history["val_rmse"].append(val_rmse)
print(f"\nEpoch {epoch + 1}, Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}")
print(f"Train RMSE: {train_rmse:.4f}, Val RMSE: {val_rmse:.4f}")
early_stopping(val_rmse, model)
if early_stopping.early_stop:
print("Early stopping triggered.")
break
# Plot der Lernkurven
plot_learning_curves(history)
# Funktion zur Visualisierung der richtigen und falschen Vorhersagen
def visualize_predictions(true_values, predicted_values):
plt.figure(figsize=(10, 6))
# Unterschied zwischen vorhergesagten und wahren Werten
correct_indices = np.isclose(true_values, predicted_values, atol=0.3) # Als korrekt angenommen, wenn Differenz <= 0.3
# Plot
plt.scatter(true_values[correct_indices], predicted_values[correct_indices], color='green', label='Richtig vorhergesagt')
plt.scatter(true_values[~correct_indices], predicted_values[~correct_indices], color='red', label='Falsch vorhergesagt')
plt.plot([min(true_values), max(true_values)], [min(true_values), max(true_values)], color='blue', linestyle='--', label='Ideal-Linie')
plt.xlabel('Wahre Werte')
plt.ylabel('Vorhergesagte Werte')
plt.title('Richtige vs Falsche Vorhersagen')
plt.legend()
plt.grid(True)
plt.show()
# Test Evaluation
model.eval()
test_preds, test_labels = [], []
with torch.no_grad():
for X_batch, y_batch in test_loader:
X_batch, y_batch = X_batch.to(device), y_batch.to(device).float()
predictions = model(X_batch).float()
test_preds.extend(predictions.cpu().detach().numpy())
test_labels.extend(y_batch.cpu().detach().numpy())
# Konvertierung zu NumPy-Arrays
true_values = np.array(test_labels)
predicted_values = np.array(test_preds)
# Visualisierung der Ergebnisse
visualize_predictions(true_values, predicted_values)
# RMSE, MAE und R²-Score für das Test-Set
test_rmse = np.sqrt(mean_squared_error(test_labels, test_preds))
test_mae = mean_absolute_error(test_labels, test_preds)
test_r2 = r2_score(test_labels, test_preds)
print(f"Test RMSE: {test_rmse:.4f}, Test MAE: {test_mae:.4f}, Test R²: {test_r2:.4f}")
# Funktion zur Visualisierung der richtigen und falschen Vorhersagen
def visualize_predictions(true_values, predicted_values):
plt.figure(figsize=(10, 6))
# Unterschied zwischen vorhergesagten und wahren Werten
correct_indices = np.isclose(true_values, predicted_values, atol=0.3) # Als korrekt angenommen, wenn Differenz <= 0.3
# Plot
plt.scatter(true_values[correct_indices], predicted_values[correct_indices], color='green', label='Richtig vorhergesagt')
plt.scatter(true_values[~correct_indices], predicted_values[~correct_indices], color='red', label='Falsch vorhergesagt')
plt.plot([min(true_values), max(true_values)], [min(true_values), max(true_values)], color='blue', linestyle='--', label='Ideal-Linie')
plt.xlabel('Wahre Werte')
plt.ylabel('Vorhergesagte Werte')
plt.title('Richtige vs Falsche Vorhersagen')
plt.legend()
plt.grid(True)
plt.show()
# Test Evaluation
model.eval()
test_preds, test_labels = [], []
with torch.no_grad():
for X_batch, y_batch in test_loader:
X_batch, y_batch = X_batch.to(device), y_batch.to(device).float()
predictions = model(X_batch).float()
test_preds.extend(predictions.cpu().detach().numpy())
test_labels.extend(y_batch.cpu().detach().numpy())
# Konvertierung zu NumPy-Arrays
true_values = np.array(test_labels)
predicted_values = np.array(test_preds)
# Visualisierung der Ergebnisse
visualize_predictions(true_values, predicted_values)
# RMSE, MAE und R²-Score für das Test-Set
test_rmse = np.sqrt(mean_squared_error(test_labels, test_preds))
test_mae = mean_absolute_error(test_labels, test_preds)
test_r2 = r2_score(test_labels, test_preds)
print(f"Test RMSE: {test_rmse:.4f}, Test MAE: {test_mae:.4f}, Test R²: {test_r2:.4f}")

View File

@ -4,10 +4,10 @@ import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
# scikit-learn Imports
from sklearn.metrics import accuracy_score, confusion_matrix
# from sklearn.metrics import accuracy_score, confusion_matrix
from sklearn.model_selection import train_test_split
# Bert imports
from transformers import BertForSequenceClassification, BertTokenizer
from transformers import BertForSequenceClassification, AutoTokenizer
#Default imports (pandas, numpy, matplotlib, etc.)
import pandas as pd
import numpy as np
@ -21,23 +21,20 @@ else:
class SimpleHumorDataset(Dataset):
def __init__(self,tokenizer,dataframe,max_length=280):
super().__init__()
def __init__(self,tokenizer:AutoTokenizer,dataframe:pd.DataFrame,max_length:int=128):
super(SimpleHumorDataset,self).__init__()
self.tokenizer = tokenizer
self.max_length = max_length
self.text = dataframe['text'].tolist()
self.labels = dataframe['is_humor'].tolist()
self.text = dataframe['text'].to_list()
self.labels = dataframe['is_humor'].to_list()
def __getitem__(self,idx):
def __getitem__(self,idx:int):
text = self.text[idx]
labels = self.labels[idx]
encoding = self.tokenizer.encode_plus(
encoding = self.tokenizer(
text,
add_special_tokens=True,
padding="max_length",
# trunction = True,
return_attention_mask = True,
return_token_type_ids = False,
max_length=self.max_length,
truncation = True,
return_tensors = 'pt'
@ -48,17 +45,15 @@ class SimpleHumorDataset(Dataset):
return {
'input_ids': torch.as_tensor(input_ids,dtype=torch.long),
'attention_mask':torch.as_tensor(attention_mask,dtype=torch.long),
'labels':torch.as_tensor(labels,dtype=torch.long),
'text':text
}
'labels':torch.tensor(labels,dtype=torch.long)
}
def __len__(self):
return len(self.labels)
class CustomBert(nn.Module):
def __init__(self):
super(CustomBert,self).__init__()
super().__init__()
#Bert + Custom Layers (Not a tuple any longer -- idk why)
self.bfsc = BertForSequenceClassification.from_pretrained("bert-base-uncased")
self.classifier = nn.Linear(2,2)
@ -69,8 +64,7 @@ class CustomBert(nn.Module):
x = self.classifier(seq_out.logits)
return self.sm(x)
def training_loop(model,criterion,optimizer,train_loader):
torch.cuda.empty_cache()
def training_loop(model:CustomBert,criterion:nn.CrossEntropyLoss,optimizer:optim.AdamW,train_loader:DataLoader):
model.train()
total_loss = 0
@ -78,7 +72,7 @@ def training_loop(model,criterion,optimizer,train_loader):
# Set Gradient to Zero
optimizer.zero_grad()
# Unpack batch values and "push" it to GPU
input_ids, att_mask, labels,_ = train_batch.values()
input_ids, att_mask, labels = train_batch.values()
input_ids, att_mask, labels = input_ids.to(DEVICE),att_mask.to(DEVICE),labels.to(DEVICE)
# Feed Model with Data
outputs = model(input_ids, attention_mask=att_mask)
@ -89,14 +83,13 @@ def training_loop(model,criterion,optimizer,train_loader):
print(f"Total Loss is {(total_loss/len(train_loader)):.4f}")
return (total_loss/len(train_loader))
def eval_loop(model,criterion,validation_loader):
torch.cuda.empty_cache()
def eval_loop(model:CustomBert,criterion:nn.CrossEntropyLoss,validation_loader:DataLoader):
model.eval()
total, correct = 0.0, 0.0
total_loss = 0.0
with torch.no_grad():
for val_batch in validation_loader:
input_ids, att_mask ,labels,_ = val_batch.values()
input_ids, att_mask ,labels = val_batch.values()
input_ids, att_mask, labels = input_ids.to(DEVICE),att_mask.to(DEVICE), labels.to(DEVICE)
outputs = model(input_ids,attention_mask=att_mask)
loss = criterion(outputs,labels)
@ -104,7 +97,7 @@ def eval_loop(model,criterion,validation_loader):
predictions = torch.argmax(outputs,1)
total += labels.size(0)
correct += (predictions == labels).sum().item()
print(f"Total Loss: {total_loss/len(validation_loader)} ### Test Accuracy {correct/total}%")
print(f"Total Loss: {total_loss/len(validation_loader)} ### Test Accuracy {correct/total*100}%")
return total_loss/len(validation_loader)
@ -112,11 +105,11 @@ if __name__ == "__main__":
torch.manual_seed(501)
# HYPERPARAMETERS
# Set Max Epoch Amount
EPOCH = 5
EPOCH = 1
# DROPOUT-PROBABILITY
DROPOUT = 0.1
# BATCHSIZE
BATCH_SIZE = 32
BATCH_SIZE = 8
#LEARNING RATE
LEARNING_RATE = 1e-5
# Initialize Bert Model with dropout probability and Num End Layers
@ -131,19 +124,20 @@ if __name__ == "__main__":
# Initialize BertTokenizer from Pretrained
tokenizer = BertTokenizer.from_pretrained("bert-base-uncased",do_lower_case=True)
tokenizer = AutoTokenizer.from_pretrained("google-bert/bert-base-uncased",do_lower_case=True)
print("Tokenizer Initialized")
# print(tokenizer(df['text'][0],padding=True,truncation=True,max_length=256))
#Split DataFrame into Train and Test Sets
train,test = train_test_split(df,random_state=501,test_size=.2)
print("Splitted Data in Train and Test Sets")
# val = []
# Create Custom Datasets for Train and Test
train_data = SimpleHumorDataset(tokenizer,train)
# val_data = SimpleHumorDataset(tokenizer,val)
test_data = SimpleHumorDataset(tokenizer,test)
print("Custom Datasets created")
print("Custom Datasets created")
# Initialize Dataloader with Train and Test Sets
@ -152,21 +146,23 @@ if __name__ == "__main__":
test_loader = DataLoader(dataset=test_data,batch_size=BATCH_SIZE,shuffle=False)
print("DataLoaders created")
# Set criterion to BCELoss (Binary Cross Entropy) and define Adam Optimizer with model parameters and learning rate
criterion_bce = nn.CrossEntropyLoss()
# Set criterion to Cross Entropy and define Adam Optimizer with model parameters and learning rate
criterion_cross_entropy = nn.CrossEntropyLoss()
optimizer_adamW = optim.Adam(mybert.parameters(), lr = LEARNING_RATE)
import time
# Set Scheduler for dynamically Learning Rate adjustment
# scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer_adam)
loss_values = np.zeros(EPOCH)
eval_values = np.zeros(EPOCH)
start = time.time()
for epoch in range(EPOCH):
print(f"For {epoch+1} the Scores are: ")
loss_values[epoch] = training_loop(mybert,optimizer=optimizer_adamW,criterion=criterion_bce,train_loader=train_loader)
eval_values[epoch] = eval_loop(mybert,criterion=criterion_bce,validation_loader=test_loader)
loss_values[epoch] = training_loop(mybert,optimizer=optimizer_adamW,criterion=criterion_cross_entropy,train_loader=train_loader)
eval_values[epoch] = eval_loop(mybert,criterion=criterion_cross_entropy,validation_loader=test_loader)
end = time.time()
print((end-start),"seconds per epoch needed")
# Visualize Training Loss
# plt.plot(loss_values)
plt.plot(loss_values)
plt.plot(eval_values)
plt.hlines(np.mean(loss_values),xmin=0,xmax=EPOCH,colors='red',linestyles="dotted",label="Average Loss")
plt.hlines(np.mean(eval_values),xmin=0,xmax=EPOCH,colors='green',linestyles="dashed",label="Average Val Loss")