# ANLP_WS24_CA2/CNN_CLASS.py
#
# 228 lines
# 7.9 KiB
# Python

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from sklearn.metrics import accuracy_score
from tqdm import tqdm
from dataset_generator import create_embedding_matrix, split_data, load_preprocess_data
from HumorDataset import TextDataset
from BalancedCELoss import BalancedCELoss
import matplotlib.pyplot as plt
import numpy as np
# Hyperparameters and configuration shared by the whole script.
params = {
    "embedding_dim": 100,  # passed as emb_len to create_embedding_matrix and as the conv kernel width
    "filter_sizes": [2, 3, 4, 5],  # one convolution branch per kernel height
    "num_filters": 150,  # output channels of every convolution branch
    "batch_size": 32,
    "learning_rate": 0.001,
    "epochs": 25,
    "glove_path": "data/glove.6B.100d.txt",
    "max_len": 280,  # forwarded to TextDataset as max_len
    "test_size": 0.1,  # forwarded to split_data
    "val_size": 0.1,  # forwarded to split_data
    "patience": 5,  # NOTE(review): not read anywhere in the visible code - confirm
    "data_path": "data/hack.csv",
    "dropout": 0.6,
    "weight_decay": 5e-4,  # weight decay handed to the Adam optimizer
    "alpha": 0.1,  # alpha passed to BalancedCELoss (balance term of the loss)
}
# CNN model for binary classification
class EnhancedCNNBinaryClassifier(nn.Module):
    """Multi-branch text CNN that emits two logits per input sequence.

    Each branch applies a convolution spanning ``fs`` tokens and the full
    embedding width, followed by batch norm, ReLU, max-over-time pooling and
    dropout. The pooled branch outputs are concatenated and fed through two
    linear layers.
    """

    def __init__(self, vocab_size, embedding_dim, filter_sizes, num_filters, embedding_matrix, dropout):
        """Build the network.

        Args:
            vocab_size: Accepted for interface compatibility; not used, since
                the embedding table size is taken from ``embedding_matrix``.
            embedding_dim: Width of each embedding vector; used as the width
                of every convolution kernel.
            filter_sizes: Iterable of kernel heights (tokens per window); one
                convolution branch is created per entry.
            num_filters: Number of output channels per branch.
            embedding_matrix: Float tensor of pretrained embeddings; it stays
                trainable (``freeze=False``).
            dropout: Dropout probability used after pooling and after ``fc1``.
        """
        super().__init__()
        self.embedding = nn.Embedding.from_pretrained(embedding_matrix, freeze=False)
        self.convs = nn.ModuleList([
            nn.Sequential(
                nn.Conv2d(1, num_filters, (fs, embedding_dim)),
                nn.BatchNorm2d(num_filters),
                nn.ReLU(),
                # Max over all window positions. Adaptive pooling gives the
                # same result as a fixed (seq_len - fs + 1, 1) window but does
                # not depend on a module-level max_len, so any sequence length
                # >= fs works.
                nn.AdaptiveMaxPool2d((1, 1)),
                nn.Dropout(dropout),
            )
            for fs in filter_sizes
        ])
        self.fc1 = nn.Linear(len(filter_sizes) * num_filters, 128)
        self.fc2 = nn.Linear(128, 2)  # two classes -> two logits for a cross-entropy style loss
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """Return raw class logits of shape ``(batch, 2)``.

        Args:
            x: Integer token indices, expected as ``(batch, seq_len)``.
        """
        # Add a channel dimension: (batch, 1, seq_len, embedding_dim).
        x = self.embedding(x).unsqueeze(1)
        # Every branch yields (batch, num_filters, 1, 1); flatten to (batch, num_filters).
        conv_outputs = [conv(x).flatten(1) for conv in self.convs]
        x = torch.cat(conv_outputs, 1)
        x = torch.relu(self.fc1(x))
        x = self.dropout(x)
        return self.fc2(x)  # raw logits; softmax is left to the loss function
# Visualization helpers
def visualize_predictions(true_values, predicted_values):
    """Scatter-plot every sample's true class, colored by prediction correctness.

    Args:
        true_values: Sequence of ground-truth class labels.
        predicted_values: Sequence of predicted class labels, aligned with
            ``true_values``.
    """
    plt.figure(figsize=(10, 6))

    # Element-wise comparison between ground truth and predictions.
    truth = np.array(true_values)
    guesses = np.array(predicted_values)
    hit_mask = truth == guesses
    sample_positions = np.arange(len(truth))

    # One scatter series for the hits, one for the misses.
    series = (
        (hit_mask, 'green', 'Richtig vorhergesagt'),
        (~hit_mask, 'red', 'Falsch vorhergesagt'),
    )
    for mask, color, legend_label in series:
        plt.scatter(
            sample_positions[mask],
            truth[mask],
            color=color,
            label=legend_label,
        )

    plt.axhline(0.5, linestyle='--', color='blue', label='Schwelle (0.5)')
    plt.ylim(-0.5, 1.5)
    plt.yticks([0, 1], labels=['Klasse 0', 'Klasse 1'])
    plt.xlabel('Datenindex')
    plt.ylabel('Klassifikation')
    plt.title('Richtige vs. Falsche Vorhersagen')
    plt.legend()
    plt.grid(True, linestyle='--', alpha=0.6)
    plt.tight_layout()
    plt.show()
def visualize_distribution(true_values, predicted_values):
    """Draw side-by-side bars of class frequencies for truth and predictions.

    Args:
        true_values: Sequence of non-negative integer ground-truth labels.
        predicted_values: Sequence of non-negative integer predicted labels.
    """
    plt.figure(figsize=(10, 6))

    # Per-class counts; minlength=2 guarantees a slot for both classes.
    actual_counts = np.bincount(true_values, minlength=2)
    guessed_counts = np.bincount(predicted_values, minlength=2)

    class_names = ['Klasse 0', 'Klasse 1']
    positions = np.arange(len(class_names))

    # Two bar groups, shifted left and right of each class tick.
    bar_groups = (
        (positions - 0.2, actual_counts, 'skyblue', 'Wahre Werte'),
        (positions + 0.2, guessed_counts, 'salmon', 'Vorhergesagte Werte'),
    )
    for bar_x, heights, fill, legend_label in bar_groups:
        plt.bar(bar_x, heights, width=0.4, color=fill, label=legend_label, edgecolor='black')

    plt.title('Verteilung der wahren Werte und Vorhersagen')
    plt.xticks(positions, class_names)
    plt.ylabel('Häufigkeit')
    plt.xlabel('Klassen')
    plt.legend()
    plt.grid(axis='y', linestyle='--', alpha=0.7)
    plt.tight_layout()
    plt.show()
# Select the compute device (GPU when available, otherwise CPU).
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# Load the pretrained embeddings and the raw data set.
embedding_matrix, word_index, vocab_size, d_model = create_embedding_matrix(
    gloVe_path=params["glove_path"],
    emb_len=params["embedding_dim"],
)
X, y = load_preprocess_data(path_data=params["data_path"])

# Split into train / validation / test partitions.
data_split = split_data(
    X,
    y,
    test_size=params["test_size"],
    val_size=params["val_size"],
)

# One TextDataset per partition, built in train, val, test order.
train_dataset, val_dataset, test_dataset = (
    TextDataset(
        data_split[split_name]['X'],
        data_split[split_name]['y'],
        word_index,
        max_len=params["max_len"],
    )
    for split_name in ("train", "val", "test")
)

# Only the training loader shuffles its samples.
train_loader, val_loader, test_loader = (
    DataLoader(dataset, batch_size=params["batch_size"], shuffle=shuffle)
    for dataset, shuffle in (
        (train_dataset, True),
        (val_dataset, False),
        (test_dataset, False),
    )
)

# Build the model and move it to the selected device.
model = EnhancedCNNBinaryClassifier(
    vocab_size=vocab_size,
    embedding_dim=params["embedding_dim"],
    filter_sizes=params["filter_sizes"],
    num_filters=params["num_filters"],
    embedding_matrix=embedding_matrix,
    dropout=params["dropout"],
).to(device)

# Loss (BalancedCELoss) and optimizer.
criterion = BalancedCELoss(alpha=params["alpha"])
optimizer = optim.Adam(
    model.parameters(),
    lr=params["learning_rate"],
    weight_decay=params["weight_decay"],
)
# Training
# Per-epoch metrics: losses are averaged over batches, accuracies over samples.
history = {
    "train_loss": [],
    "val_loss": [],
    "train_acc": [],
    "val_acc": [],
}

# NOTE(review): params["patience"] is defined but no early stopping is
# implemented in this loop - confirm whether that is intended.
for epoch in range(params["epochs"]):
    # ---- training pass ----
    model.train()
    train_loss, correct, total = 0.0, 0, 0
    with tqdm(train_loader, desc=f"Epoch {epoch + 1}/{params['epochs']}") as pbar:
        for X_batch, y_batch in pbar:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)
            optimizer.zero_grad()
            outputs = model(X_batch)
            loss = criterion(outputs, y_batch)
            loss.backward()
            optimizer.step()

            train_loss += loss.item()
            predicted = torch.argmax(outputs, dim=1)
            correct += (predicted == y_batch).sum().item()
            total += y_batch.size(0)
            pbar.set_postfix({"Train Loss": loss.item()})

    train_acc = correct / total
    avg_train_loss = train_loss / len(train_loader)
    history["train_loss"].append(avg_train_loss)
    history["train_acc"].append(train_acc)

    # ---- validation pass (no gradient tracking) ----
    model.eval()
    val_loss, correct, total = 0.0, 0, 0
    with torch.no_grad():
        for X_batch, y_batch in val_loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)
            outputs = model(X_batch)
            loss = criterion(outputs, y_batch)

            val_loss += loss.item()
            predicted = torch.argmax(outputs, dim=1)
            correct += (predicted == y_batch).sum().item()
            total += y_batch.size(0)

    val_acc = correct / total
    avg_val_loss = val_loss / len(val_loader)
    history["val_loss"].append(avg_val_loss)
    history["val_acc"].append(val_acc)

    # Report the per-batch averages (the same values stored in history), not
    # the raw sums, which scale with the number of batches and are not
    # comparable between the train and validation loaders.
    print(f"\nEpoch {epoch + 1}, Train Loss: {avg_train_loss:.4f}, Val Loss: {avg_val_loss:.4f}")
    print(f"Train Accuracy: {train_acc:.4f}, Val Accuracy: {val_acc:.4f}")
# Evaluate on the test set and visualize the results
def _collect_test_predictions():
    """Run the model over test_loader; return (correct, total, true, predicted)."""
    hits, seen = 0, 0
    targets, guesses = [], []
    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs = inputs.to(device)
            labels = labels.to(device)
            batch_pred = torch.argmax(model(inputs), dim=1)
            targets.extend(labels.cpu().numpy())
            guesses.extend(batch_pred.cpu().numpy())
            hits += (batch_pred == labels).sum().item()
            seen += labels.size(0)
    return hits, seen, targets, guesses


model.eval()
test_correct, test_total, true_labels, predicted_labels = _collect_test_predictions()
test_accuracy = test_correct / test_total
print(f"Test Accuracy: {test_accuracy:.4f}")

# Scatter plot of correct vs. incorrect predictions
visualize_predictions(true_labels, predicted_labels)

# Bar plot of the class distributions
visualize_distribution(true_labels, predicted_labels)