arman 2025-02-15 14:23:23 +01:00
commit 556ed1c292
7 changed files with 1471 additions and 1613 deletions

CNN_CLASS.py 100644

@@ -0,0 +1,227 @@
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from sklearn.metrics import accuracy_score
from tqdm import tqdm
from dataset_generator import create_embedding_matrix, split_data, load_preprocess_data
from HumorDataset import TextDataset
from BalancedCELoss import BalancedCELoss
import matplotlib.pyplot as plt
import numpy as np
# Hyperparameters and configuration
params = {
"embedding_dim": 100,
"filter_sizes": [2, 3, 4, 5],
"num_filters": 150,
"batch_size": 32,
"learning_rate": 0.001,
"epochs": 25,
"glove_path": 'data/glove.6B.100d.txt',
"max_len": 280,
"test_size": 0.1,
"val_size": 0.1,
"patience": 5,
"data_path": 'data/hack.csv',
"dropout": 0.6,
"weight_decay": 5e-4,
"alpha": 0.1 # Alpha für die Balance in der Loss-Funktion
}
# CNN model for binary classification
class EnhancedCNNBinaryClassifier(nn.Module):
def __init__(self, vocab_size, embedding_dim, filter_sizes, num_filters, embedding_matrix, dropout):
super(EnhancedCNNBinaryClassifier, self).__init__()
self.embedding = nn.Embedding.from_pretrained(embedding_matrix, freeze=False)
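        # One parallel branch per filter size: a Conv2d spanning the full embedding width,
        # batch norm, ReLU, then max pooling over the remaining (max_len - fs + 1) positions,
        # so each branch contributes num_filters features per sample.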
self.convs = nn.ModuleList([
nn.Sequential(
nn.Conv2d(1, num_filters, (fs, embedding_dim)),
nn.BatchNorm2d(num_filters),
nn.ReLU(),
nn.MaxPool2d((params["max_len"] - fs + 1, 1)),
nn.Dropout(dropout)
)
for fs in filter_sizes
])
self.fc1 = nn.Linear(len(filter_sizes) * num_filters, 128)
        self.fc2 = nn.Linear(128, 2)  # 2 classes, hence 2 outputs for CrossEntropyLoss
self.dropout = nn.Dropout(dropout)
def forward(self, x):
x = self.embedding(x).unsqueeze(1)
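        # x: (batch, 1, max_len, embedding_dim); each conv branch yields (batch, num_filters, 1, 1),
        # which the squeezes below reduce to (batch, num_filters).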
conv_outputs = [conv(x).squeeze(3).squeeze(2) for conv in self.convs]
x = torch.cat(conv_outputs, 1)
x = torch.relu(self.fc1(x))
x = self.dropout(x)
        return self.fc2(x)  # 2 logits; CrossEntropyLoss applies the softmax internally
# Visualization functions
def visualize_predictions(true_values, predicted_values):
plt.figure(figsize=(10, 6))
    # Compare predicted and true values
true_values = np.array(true_values)
predicted_values = np.array(predicted_values)
correct_indices = true_values == predicted_values
incorrect_indices = ~correct_indices
# Scatterplot
    plt.scatter(
        np.arange(len(true_values))[correct_indices],
        true_values[correct_indices],
        color='green',
        label='Correctly predicted'
    )
    plt.scatter(
        np.arange(len(true_values))[incorrect_indices],
        true_values[incorrect_indices],
        color='red',
        label='Incorrectly predicted'
    )
    plt.axhline(0.5, linestyle='--', color='blue', label='Threshold (0.5)')
    plt.ylim(-0.5, 1.5)
    plt.yticks([0, 1], labels=['Class 0', 'Class 1'])
    plt.xlabel('Sample index')
    plt.ylabel('Classification')
    plt.title('Correct vs. incorrect predictions')
plt.legend()
plt.grid(True, linestyle='--', alpha=0.6)
plt.tight_layout()
plt.show()
def visualize_distribution(true_values, predicted_values):
plt.figure(figsize=(10, 6))
    # Count class frequencies
true_counts = np.bincount(true_values, minlength=2)
predicted_counts = np.bincount(predicted_values, minlength=2)
    # Create the bar plot
    labels = ['Class 0', 'Class 1']
    x = np.arange(len(labels))
    plt.bar(x - 0.2, true_counts, width=0.4, color='skyblue', label='True values', edgecolor='black')
    plt.bar(x + 0.2, predicted_counts, width=0.4, color='salmon', label='Predicted values', edgecolor='black')
    plt.title('Distribution of true values and predictions')
    plt.xticks(x, labels)
    plt.ylabel('Frequency')
    plt.xlabel('Classes')
plt.legend()
plt.grid(axis='y', linestyle='--', alpha=0.7)
plt.tight_layout()
plt.show()
# Initialize device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")
# Load data
embedding_matrix, word_index, vocab_size, d_model = create_embedding_matrix(
gloVe_path=params["glove_path"], emb_len=params["embedding_dim"]
)
X, y = load_preprocess_data(path_data=params["data_path"])
# Split data
data_split = split_data(X, y, test_size=params["test_size"], val_size=params["val_size"])
train_dataset = TextDataset(data_split['train']['X'], data_split['train']['y'], word_index, max_len=params["max_len"])
val_dataset = TextDataset(data_split['val']['X'], data_split['val']['y'], word_index, max_len=params["max_len"])
test_dataset = TextDataset(data_split['test']['X'], data_split['test']['y'], word_index, max_len=params["max_len"])
train_loader = DataLoader(train_dataset, batch_size=params["batch_size"], shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=params["batch_size"], shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=params["batch_size"], shuffle=False)
# Initialize model
model = EnhancedCNNBinaryClassifier(
vocab_size=vocab_size,
embedding_dim=params["embedding_dim"],
filter_sizes=params["filter_sizes"],
num_filters=params["num_filters"],
embedding_matrix=embedding_matrix,
dropout=params["dropout"]
)
model = model.to(device)
# Use BalancedCELoss
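# Note: BalancedCELoss is a project-local criterion (imported from BalancedCELoss.py);
# alpha presumably controls how strongly the two classes are re-weighted.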
criterion = BalancedCELoss(alpha=params["alpha"])
optimizer = optim.Adam(model.parameters(), lr=params["learning_rate"], weight_decay=params["weight_decay"])
# Training
history = {
"train_loss": [],
"val_loss": [],
"train_acc": [],
"val_acc": [],
}
for epoch in range(params["epochs"]):
model.train()
train_loss, correct, total = 0.0, 0, 0
with tqdm(train_loader, desc=f"Epoch {epoch + 1}/{params['epochs']}") as pbar:
for X_batch, y_batch in pbar:
X_batch, y_batch = X_batch.to(device), y_batch.to(device)
optimizer.zero_grad()
outputs = model(X_batch)
loss = criterion(outputs, y_batch)
loss.backward()
optimizer.step()
train_loss += loss.item()
predicted = torch.argmax(outputs, dim=1)
correct += (predicted == y_batch).sum().item()
total += y_batch.size(0)
pbar.set_postfix({"Train Loss": loss.item()})
train_acc = correct / total
history["train_loss"].append(train_loss / len(train_loader))
history["train_acc"].append(train_acc)
# Validation
model.eval()
val_loss, correct, total = 0.0, 0, 0
with torch.no_grad():
for X_batch, y_batch in val_loader:
X_batch, y_batch = X_batch.to(device), y_batch.to(device)
outputs = model(X_batch)
loss = criterion(outputs, y_batch)
val_loss += loss.item()
predicted = torch.argmax(outputs, dim=1)
correct += (predicted == y_batch).sum().item()
total += y_batch.size(0)
val_acc = correct / total
history["val_loss"].append(val_loss / len(val_loader))
history["val_acc"].append(val_acc)
print(f"\nEpoch {epoch + 1}, Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}")
print(f"Train Accuracy: {train_acc:.4f}, Val Accuracy: {val_acc:.4f}")
# Test and visualize
model.eval()
test_correct, test_total = 0, 0
true_labels, predicted_labels = [], []
with torch.no_grad():
for X_batch, y_batch in test_loader:
X_batch, y_batch = X_batch.to(device), y_batch.to(device)
outputs = model(X_batch)
predicted = torch.argmax(outputs, dim=1)
true_labels.extend(y_batch.cpu().numpy())
predicted_labels.extend(predicted.cpu().numpy())
test_correct += (predicted == y_batch).sum().item()
test_total += y_batch.size(0)
test_accuracy = test_correct / test_total
print(f"Test Accuracy: {test_accuracy:.4f}")
# Visualize the predictions (scatter plot)
visualize_predictions(true_labels, predicted_labels)
# Visualize the class distribution (bar plot)
visualize_distribution(true_labels, predicted_labels)


@@ -302,47 +302,6 @@ test_mae = mean_absolute_error(test_labels, test_preds)
test_r2 = r2_score(test_labels, test_preds)
print(f"Test RMSE: {test_rmse:.4f}, Test MAE: {test_mae:.4f}, Test R²: {test_r2:.4f}")
# Function to visualize correct and incorrect predictions
def visualize_predictions(true_values, predicted_values):
plt.figure(figsize=(10, 6))
    # Difference between predicted and true values
    correct_indices = np.isclose(true_values, predicted_values, atol=0.3)  # treated as correct if the difference is <= 0.3
# Plot
    plt.scatter(true_values[correct_indices], predicted_values[correct_indices], color='green', label='Correctly predicted')
    plt.scatter(true_values[~correct_indices], predicted_values[~correct_indices], color='red', label='Incorrectly predicted')
    plt.plot([min(true_values), max(true_values)], [min(true_values), max(true_values)], color='blue', linestyle='--', label='Ideal line')
    plt.xlabel('True values')
    plt.ylabel('Predicted values')
    plt.title('Correct vs. incorrect predictions')
plt.legend()
plt.grid(True)
plt.show()
# Test Evaluation
model.eval()
test_preds, test_labels = [], []
with torch.no_grad():
for X_batch, y_batch in test_loader:
X_batch, y_batch = X_batch.to(device), y_batch.to(device).float()
predictions = model(X_batch).float()
test_preds.extend(predictions.cpu().detach().numpy())
test_labels.extend(y_batch.cpu().detach().numpy())
# Convert to NumPy arrays
true_values = np.array(test_labels)
predicted_values = np.array(test_preds)
# Visualize the results
visualize_predictions(true_values, predicted_values)
# RMSE, MAE, and R² score for the test set
test_rmse = np.sqrt(mean_squared_error(test_labels, test_preds))
test_mae = mean_absolute_error(test_labels, test_preds)
test_r2 = r2_score(test_labels, test_preds)
print(f"Test RMSE: {test_rmse:.4f}, Test MAE: {test_mae:.4f}, Test R²: {test_r2:.4f}")
# plot distribution of predicted values and true values

cnn.py

@@ -1,203 +0,0 @@
import torch
import torch.nn as nn
import torch.optim as optim
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from nltk.tokenize import word_tokenize
from torch.utils.data import DataLoader, Dataset
from sklearn.metrics import accuracy_score
import gensim
import nltk
import time
import matplotlib.pyplot as plt
# NLTK Downloads
nltk.download('punkt')  # removed punkt_tab, since it does not exist
# Check if GPU is available
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print('Using device:', DEVICE)
# Maximum sequence length
MAX_LEN = 100
# Data helpers
def get_embedding(model, word):
if word in model.wv:
return model.wv.key_to_index[word]
else:
return unk_index
def encode_tokens(tokens):
return [get_embedding(model_embedding, token) for token in tokens]
def pad_sequences(sequences, MAX_LEN):
return np.array([np.pad(seq, (0, MAX_LEN - len(seq)), mode='constant', constant_values=unk_index)
if len(seq) < MAX_LEN else seq[:MAX_LEN] for seq in sequences])
# Dataset class
class HumorDataset(Dataset):
def __init__(self, encodings, labels):
self.encodings = encodings
self.labels = labels
def __getitem__(self, idx):
item = {'input_ids': torch.tensor(self.encodings[idx], dtype=torch.long)}
item['labels'] = torch.tensor(self.labels[idx], dtype=torch.float)
return item
def __len__(self):
return len(self.labels)
# CNN Model
class CNNBinaryClassifier(nn.Module):
def __init__(self, vocab_size, embed_dim, num_filters, kernel_sizes, hidden_dim, dropout=0.1):
super(CNNBinaryClassifier, self).__init__()
self.embedding = nn.Embedding(vocab_size, embed_dim)
self.conv_layers = nn.ModuleList([
nn.Conv1d(in_channels=embed_dim, out_channels=num_filters, kernel_size=k)
for k in kernel_sizes
])
self.fc = nn.Linear(num_filters * len(kernel_sizes), hidden_dim)
self.out = nn.Linear(hidden_dim, 1)
self.relu = nn.ReLU()
self.dropout = nn.Dropout(dropout)
self.sigmoid = nn.Sigmoid()
def forward(self, input_ids):
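        # Embed the tokens, then move the embedding dimension into the channel axis
        # for Conv1d: (batch, seq_len, embed_dim) -> (batch, embed_dim, seq_len).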
embedded = self.embedding(input_ids).permute(0, 2, 1)
conv_outs = [self.relu(conv(embedded)) for conv in self.conv_layers]
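        # Global max pooling over the sequence dimension keeps the strongest activation per filter.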
pooled_outs = [torch.max(out, dim=2)[0] for out in conv_outs]
concatenated = torch.cat(pooled_outs, dim=1)
fc_out = self.relu(self.fc(self.dropout(concatenated)))
logits = self.out(fc_out)
return self.sigmoid(logits)
# Main script
if __name__ == "__main__":
# Load and process data
df = pd.read_csv('/content/hack.csv')
print(f"Loaded dataset: {df.shape}")
X = df['text'].fillna("unknown").astype(str)
y = df['is_humor']
# Train-test split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# Tokenization with error handling
train_tokens = []
test_tokens = []
for text in X_train:
try:
train_tokens.append(word_tokenize(text.lower()))
except Exception as e:
print(f"Error tokenizing: {text}. Error: {e}")
train_tokens.append(["unknown"])
for text in X_test:
try:
test_tokens.append(word_tokenize(text.lower()))
except Exception as e:
print(f"Error tokenizing: {text}. Error: {e}")
test_tokens.append(["unknown"])
print("Sample tokenization (Train):", train_tokens[:2])
print("Sample tokenization (Test):", test_tokens[:2])
# Train Word2Vec model
model_embedding = gensim.models.Word2Vec(train_tokens, vector_size=100, window=5, min_count=1, workers=4)
# Add unknown token
model_embedding.wv.add_vector('<UNK>', np.zeros(model_embedding.vector_size))
unk_index = model_embedding.wv.key_to_index['<UNK>']
# Encode tokens
train_encodings = [encode_tokens(tokens) for tokens in train_tokens]
test_encodings = [encode_tokens(tokens) for tokens in test_tokens]
# Pad sequences with validation
train_encodings = pad_sequences(train_encodings, MAX_LEN)
test_encodings = pad_sequences(test_encodings, MAX_LEN)
if len(train_encodings) == 0 or len(test_encodings) == 0:
raise ValueError("Tokenization or padding failed. Please check your input data.")
# Create datasets
train_dataset = HumorDataset(train_encodings, y_train.reset_index(drop=True))
test_dataset = HumorDataset(test_encodings, y_test.reset_index(drop=True))
# Model parameters
vocab_size = len(model_embedding.wv.key_to_index)
embed_dim = model_embedding.vector_size
num_filters = 200
kernel_sizes = [3, 4, 5]
hidden_dim = 128
dropout = 0.5
model = CNNBinaryClassifier(vocab_size, embed_dim, num_filters, kernel_sizes, hidden_dim, dropout)
# Training parameters
epochs = 10
batch_size = 8
learning_rate = 2e-5
# Optimizer and loss function
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
criterion = nn.BCELoss()
# Data loaders
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
# Move model to device
model.to(DEVICE)
print("Starting training...")
train_losses = []
# Training loop
for epoch in range(epochs):
epoch_loss = 0
model.train()
for batch in train_loader:
optimizer.zero_grad()
input_ids = batch['input_ids'].to(DEVICE)
labels = batch['labels'].unsqueeze(1).to(DEVICE)
outputs = model(input_ids)
loss = criterion(outputs, labels)
loss.backward()
optimizer.step()
epoch_loss += loss.item()
train_loss = epoch_loss / len(train_loader)
train_losses.append(train_loss)
print(f"Epoch {epoch + 1}/{epochs}, Train Loss: {train_loss:.4f}")
# Visualize training loss
plt.figure(figsize=(10, 6))
plt.plot(range(1, epochs + 1), train_losses, marker='o', linestyle='-', label='Train Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Training Loss Over Epochs')
plt.legend()
plt.grid(True)
plt.show()
print("Starting evaluation...")
# Evaluation
model.eval()
predictions, true_labels = [], []
with torch.no_grad():
for batch in test_loader:
input_ids = batch['input_ids'].to(DEVICE)
labels = batch['labels'].unsqueeze(1).to(DEVICE)
outputs = model(input_ids)
preds = (outputs > 0.5).float()
predictions.extend(preds.cpu().numpy())
true_labels.extend(labels.cpu().numpy())
accuracy = accuracy_score(true_labels, predictions)
print(f"Final Accuracy: {accuracy:.4f}")

File diff suppressed because one or more lines are too long (3 files)


@@ -1,186 +0,0 @@
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
from tqdm import tqdm
from dataset_generator import create_embedding_matrix
from EarlyStopping import EarlyStopping
# 1. Automatically detect the device (MPS, CUDA, or CPU)
device = torch.device('mps' if torch.backends.mps.is_available()
                      else 'cuda' if torch.cuda.is_available()
                      else 'cpu')
print(f"Using device: {device}")
# 2. Load data
data = pd.read_csv('data/hack.csv')
# 3. Filter humorous texts
humor_data = data[data['is_humor'] == 1].dropna(subset=['humor_rating']).copy()
# 4. Create the embedding matrix
embedding_matrix, word_index, vocab_size, d_model = create_embedding_matrix(
gloVe_path='data/glove.6B.100d.txt', emb_len=100
)
print(f"vocab_size: {vocab_size}, d_model: {d_model}")
# 5. Tokenization and padding with PyTorch
def tokenize_and_pad(texts, word_index, max_len=50):
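    # Map each word to its vocabulary index (0 for unknown words), then zero-pad or truncate to max_len.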
sequences = []
for text in texts:
tokens = [word_index.get(word, 0) for word in text.split()]
if len(tokens) < max_len:
tokens += [0] * (max_len - len(tokens))
else:
tokens = tokens[:max_len]
sequences.append(tokens)
return torch.tensor(sequences, dtype=torch.long)
# Split into training and test data
train_texts, test_texts, train_labels, test_labels = train_test_split(
    humor_data['text'], humor_data['humor_rating'], test_size=0.2, random_state=42
)
# Tokenization and padding
max_len = 50
train_input_ids = tokenize_and_pad(train_texts, word_index, max_len=max_len)
test_input_ids = tokenize_and_pad(test_texts, word_index, max_len=max_len)
# Convert labels to tensors
train_labels = torch.tensor(train_labels.values, dtype=torch.float)
test_labels = torch.tensor(test_labels.values, dtype=torch.float)
# 6. Dataset class for PyTorch
class HumorDataset(Dataset):
def __init__(self, input_ids, labels):
self.input_ids = input_ids
self.labels = labels
def __len__(self):
return len(self.input_ids)
def __getitem__(self, idx):
return self.input_ids[idx], self.labels[idx]
# Create datasets and DataLoaders
train_dataset = HumorDataset(train_input_ids, train_labels)
test_dataset = HumorDataset(test_input_ids, test_labels)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)
# 7. Define the CNN regression model
class CNNRegressor(nn.Module):
def __init__(self, vocab_size, embed_dim, embedding_matrix):
super(CNNRegressor, self).__init__()
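        # Initialize the embedding layer with the pretrained GloVe matrix and freeze it during training.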
self.embedding = nn.Embedding(vocab_size, embed_dim)
self.embedding.weight.data.copy_(embedding_matrix.clone().detach())
self.embedding.weight.requires_grad = False
self.conv1 = nn.Conv1d(embed_dim, 128, kernel_size=3)
self.conv2 = nn.Conv1d(128, 64, kernel_size=3)
self.dropout = nn.Dropout(0.5)
self.fc = nn.Linear(64, 1)
def forward(self, x):
x = self.embedding(x).permute(0, 2, 1)
x = torch.relu(self.conv1(x))
x = torch.relu(self.conv2(x))
x = self.dropout(x)
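        # Global max pooling over the sequence dimension.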
x = torch.max(x, dim=2).values
x = self.fc(x)
        x = torch.sigmoid(x) * 5  # scale outputs to the range [0, 5]
return x
# Initialize the model
model = CNNRegressor(vocab_size, d_model, embedding_matrix).to(device)
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
# Early Stopping
#early_stopping = EarlyStopping(patience=5)
# 8. Training with validation
for epoch in range(20):  # at most 20 epochs
model.train()
train_loss = 0
for inputs, labels in tqdm(train_loader, desc=f"Epoch {epoch+1}"):
inputs, labels = inputs.to(device), labels.to(device)
optimizer.zero_grad()
outputs = model(inputs).squeeze()
loss = criterion(outputs, labels)
loss.backward()
optimizer.step()
train_loss += loss.item()
train_loss /= len(train_loader)
    # Compute the validation loss
model.eval()
val_loss = 0
with torch.no_grad():
for inputs, labels in test_loader:
inputs, labels = inputs.to(device), labels.to(device)
outputs = model(inputs).squeeze()
loss = criterion(outputs, labels)
val_loss += loss.item()
val_loss /= len(test_loader)
print(f"Epoch {epoch+1}, Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}")
# Early Stopping
'''early_stopping(val_loss, model)
if early_stopping.early_stop:
print("Early stopping triggered")
break'''
# 9. Evaluate the model
def evaluate_model(model, data_loader):
model.eval()
predictions = []
actuals = []
with torch.no_grad():
for inputs, labels in data_loader:
inputs, labels = inputs.to(device), labels.to(device)
outputs = model(inputs).squeeze()
predictions.extend(outputs.cpu().numpy())
actuals.extend(labels.cpu().numpy())
return predictions, actuals
predictions, actuals = evaluate_model(model, test_loader)
# Compute metrics
mse = mean_squared_error(actuals, predictions)
mae = mean_absolute_error(actuals, predictions)
r2 = r2_score(actuals, predictions)
print(f"MSE: {mse:.4f}, MAE: {mae:.4f}, R²: {r2:.4f}")
# 10. Visualization (color correct and incorrect predictions)
tolerance = 0.5  # tolerance for counting a prediction as correct
predictions = np.array(predictions)
actuals = np.array(actuals)
# Classification: green (correct), red (incorrect)
correct = np.abs(predictions - actuals) <= tolerance
colors = np.where(correct, 'green', 'red')
# Scatter plot
plt.figure(figsize=(8, 6))
plt.scatter(actuals, predictions, c=colors, alpha=0.6, edgecolor='k', s=50)
plt.plot([0, 5], [0, 5], color='red', linestyle='--')  # perfect-prediction line
# Legend
green_patch = mpatches.Patch(color='green', label='Correct Predictions')
red_patch = mpatches.Patch(color='red', label='Incorrect Predictions')
plt.legend(handles=[green_patch, red_patch])
# Axes and title
plt.xlabel("True Humor Ratings")
plt.ylabel("Predicted Humor Ratings")
plt.title("True vs Predicted Humor Ratings (Correct vs Incorrect)")
plt.show()