main
arman 2025-02-14 17:22:38 +01:00
parent 1807985736
commit ffc959e884
1 changed files with 206 additions and 0 deletions

View File

@ -0,0 +1,206 @@
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
from tqdm import tqdm
from dataset_generator import create_embedding_matrix
from EarlyStopping import EarlyStopping
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset, Subset # Import Subset
#from utils import tokenize_and_pad, HumorDataset, evaluate_model, bootstrap_aggregation
def train_model(model, train_dataset, criterion, optimizer, epochs, batch_size):
dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
model.to(device)
for epoch in range(epochs):
model.train()
total_loss = 0
all_preds, all_targets = [], []
for inputs, targets in dataloader:
inputs, targets = inputs.to(device), targets.to(device)
optimizer.zero_grad()
outputs = model(inputs).squeeze()
loss = criterion(outputs, targets)
loss.backward()
optimizer.step()
total_loss += loss.item()
all_preds.extend(outputs.detach().cpu().numpy())
all_targets.extend(targets.detach().cpu().numpy())
r2 = r2_score(all_targets, all_preds)
print(f"Epoch {epoch+1}/{epochs}, Loss: {total_loss/len(dataloader):.4f}, R^2: {r2:.4f}")
def bootstrap_aggregation(ModelClass, train_dataset, num_models=5, epochs=10, batch_size=32, learning_rate=0.001):
models = []
all_r2_scores, all_mse_scores, all_mae_scores = [], [], []
for i in range(num_models):
print(f"Training Model {i+1}/{num_models}...")
subset_indices = np.random.choice(len(train_dataset), len(train_dataset), replace=True)
subset = Subset(train_dataset, subset_indices)
model = ModelClass()
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
train_model(model, subset, criterion, optimizer, epochs, batch_size)
models.append(model)
# Performance evaluieren
predictions = ensemble_predict([model], HumorDataset(test_input_ids, test_labels))
mse = mean_squared_error(test_labels.numpy(), predictions)
mae = mean_absolute_error(test_labels.numpy(), predictions)
r2 = r2_score(test_labels.numpy(), predictions)
all_mse_scores.append(mse)
all_mae_scores.append(mae)
all_r2_scores.append(r2)
print(f"Model {i+1}: MSE = {mse:.4f}, MAE = {mae:.4f}, Test-R² = {r2:.4f}\n")
return models
def ensemble_predict(models, test_dataset):
dataloader = DataLoader(test_dataset, batch_size=32, shuffle=False)
all_predictions = []
with torch.no_grad():
for inputs, _ in dataloader:
inputs = inputs.to(device)
predictions = torch.stack([model(inputs).squeeze() for model in models])
avg_predictions = predictions.mean(dim=0) # Mittelwert über alle Modelle
all_predictions.extend(avg_predictions.cpu().numpy())
return np.array(all_predictions)
# 1. Gerät automatisch erkennen
device = torch.device('mps' if torch.backends.mps.is_available()
else 'cuda' if torch.cuda.is_available()
else 'cpu')
print(f"Using device: {device}")
# 2. Daten laden
data = pd.read_csv('data/hack.csv')
# 3. Filtern humorvoller Texte
humor_data = data[data['is_humor'] == 1].dropna(subset=['humor_rating']).copy()
# 4. Einbettungsmatrix erstellen
embedding_matrix, word_index, vocab_size, d_model = create_embedding_matrix(
gloVe_path='data/glove.6B.100d.txt', emb_len=100
)
print(f"vocab_size: {vocab_size}, d_model: {d_model}")
# 5. Tokenisierung und Padding
def tokenize_and_pad(texts, word_index, max_len=50):
sequences = []
for text in texts:
tokens = [word_index.get(word, 0) for word in text.split()]
if len(tokens) < max_len:
tokens += [0] * (max_len - len(tokens))
else:
tokens = tokens[:max_len]
sequences.append(tokens)
return torch.tensor(sequences, dtype=torch.long)
max_len = 50
train_texts, test_texts, train_labels, test_labels = train_test_split(
humor_data['text'], humor_data['humor_rating'], test_size=0.2, random_state=42
)
train_input_ids = tokenize_and_pad(train_texts, word_index, max_len=max_len)
test_input_ids = tokenize_and_pad(test_texts, word_index, max_len=max_len)
# Labels in Tensor konvertieren
train_labels = torch.tensor(train_labels.values, dtype=torch.float)
test_labels = torch.tensor(test_labels.values, dtype=torch.float)
# 6. Dataset und DataLoader
class HumorDataset(Dataset):
def __init__(self, input_ids, labels):
self.input_ids = input_ids
self.labels = labels
def __len__(self):
return len(self.input_ids)
def __getitem__(self, idx):
return self.input_ids[idx], self.labels[idx]
dataset = HumorDataset(train_input_ids, train_labels)
# 7. CNN-Regression-Modell
def create_cnn(vocab_size, embed_dim, embedding_matrix):
class CNNRegressor(nn.Module):
def __init__(self, vocab_size, embed_dim, embedding_matrix):
super(CNNRegressor, self).__init__()
self.embedding = nn.Embedding(vocab_size, embed_dim)
self.embedding.weight.data.copy_(embedding_matrix.clone().detach())
self.embedding.weight.requires_grad = False
self.conv1 = nn.Conv1d(embed_dim, 128, kernel_size=3)
self.conv2 = nn.Conv1d(128, 64, kernel_size=3)
self.dropout = nn.Dropout(0.5)
self.fc = nn.Linear(64, 1)
def forward(self, x):
x = self.embedding(x).permute(0, 2, 1)
x = torch.relu(self.conv1(x))
x = torch.relu(self.conv2(x))
x = self.dropout(x)
x = torch.max(x, dim=2).values
x = self.fc(x)
return torch.sigmoid(x) * 5
return CNNRegressor(vocab_size, embed_dim, embedding_matrix)
# 8. Bootstrap Aggregation mit CNN
models = bootstrap_aggregation(
lambda: create_cnn(vocab_size, d_model, embedding_matrix),
dataset,
num_models=5,
epochs=10,
batch_size=32,
learning_rate=0.001
)
# Vorhersagen mit Ensemble
predictions = ensemble_predict(models, HumorDataset(test_input_ids, test_labels))
actuals = test_labels.numpy()
# 9. Metriken berechnen
mse = mean_squared_error(actuals, predictions)
mae = mean_absolute_error(actuals, predictions)
r2 = r2_score(actuals, predictions)
print(f"MSE: {mse:.4f}, MAE: {mae:.4f}, R²: {r2:.4f}")
# 10. Visualisierung
tolerance = 0.5 # Toleranz für korrekte Vorhersagen
predictions = np.array(predictions)
actuals = np.array(actuals)
correct = np.abs(predictions - actuals) <= tolerance
colors = np.where(correct, 'green', 'red')
plt.figure(figsize=(8, 6))
plt.scatter(actuals, predictions, c=colors, alpha=0.6, edgecolor='k', s=50)
plt.plot([0, 5], [0, 5], color='red', linestyle='--')
green_patch = mpatches.Patch(color='green', label='Correct Predictions')
red_patch = mpatches.Patch(color='red', label='Incorrect Predictions')
plt.legend(handles=[green_patch, red_patch])
plt.xlabel("True Humor Ratings")
plt.ylabel("Predicted Humor Ratings")
plt.title("True vs Predicted Humor Ratings (Correct vs Incorrect)")
plt.show()