import torch import torch.nn as nn import torch.optim as optim import pandas as pd import numpy as np from sklearn.model_selection import train_test_split from nltk.tokenize import word_tokenize import gensim from torch.utils.data import DataLoader, Dataset from sklearn.metrics import accuracy_score import matplotlib.pyplot as plt # NLTK downloads import nltk nltk.download('punkt_tab') nltk.download('punkt') # Check if GPU is available (CUDA for NVIDIA or MPS for Apple devices) if torch.cuda.is_available(): DEVICE = torch.device('cuda') # Use CUDA if available elif torch.backends.mps.is_available(): DEVICE = torch.device('mps') # Use MPS if available else: DEVICE = torch.device('cpu') # Default to CPU if no GPU support is available print('Using device:', DEVICE) # Maximum sequence length MAX_LEN = 100 # Data processing helpers def get_embedding(model, word): if word in model.wv: return model.wv.key_to_index[word] else: return unk_index def encode_tokens(tokens): return [get_embedding(model_embedding, token) for token in tokens] def pad_sequences(sequences, MAX_LEN): return np.array([ np.pad(seq, (0, MAX_LEN - len(seq)), mode='constant', constant_values=unk_index) if len(seq) < MAX_LEN else seq[:MAX_LEN] for seq in sequences ]) # Dataset class class HumorDataset(Dataset): def __init__(self, encodings, labels): self.encodings = encodings self.labels = labels def __getitem__(self, idx): item = {'input_ids': torch.tensor(self.encodings[idx], dtype=torch.long)} item['labels'] = torch.tensor(self.labels[idx], dtype=torch.float) return item def __len__(self): return len(self.labels) # CNN Model class CNNBinaryClassifier(nn.Module): def __init__(self, vocab_size, embed_dim, num_filters, kernel_sizes, hidden_dim, dropout=0.1): super(CNNBinaryClassifier, self).__init__() self.embedding = nn.Embedding(vocab_size, embed_dim) self.conv_layers = nn.ModuleList([ nn.Conv1d(in_channels=embed_dim, out_channels=num_filters, kernel_size=k) for k in kernel_sizes ]) self.fc = nn.Linear(num_filters * len(kernel_sizes), hidden_dim) self.out = nn.Linear(hidden_dim, 1) self.relu = nn.ReLU() self.dropout = nn.Dropout(dropout) self.sigmoid = nn.Sigmoid() def forward(self, input_ids): embedded = self.embedding(input_ids).permute(0, 2, 1) conv_outs = [self.relu(conv(embedded)) for conv in self.conv_layers] pooled_outs = [torch.max(out, dim=2)[0] for out in conv_outs] concatenated = torch.cat(pooled_outs, dim=1) fc_out = self.relu(self.fc(self.dropout(concatenated))) logits = self.out(fc_out) return self.sigmoid(logits) # Main if __name__ == "__main__": # Load and process data df = pd.read_csv('ANLP_WS24_CA2/data/hack.csv') # Ensure this file exists print(f"Loaded dataset: {df.shape}") X = df['text'] y = df['is_humor'] X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42) # Tokenize the datapp train_tokens = [word_tokenize(text.lower()) for text in X_train] test_tokens = [word_tokenize(text.lower()) for text in X_test] # Train Word2Vec model model_embedding = gensim.models.Word2Vec(train_tokens, window=5, min_count=5, workers=4) # Add unknown token model_embedding.wv.add_vector('', np.zeros(model_embedding.vector_size)) unk_index = model_embedding.wv.key_to_index[''] # Encode tokens train_encodings = [encode_tokens(tokens) for tokens in train_tokens] test_encodings = [encode_tokens(tokens) for tokens in test_tokens] # Pad sequences train_encodings = pad_sequences(train_encodings, MAX_LEN) test_encodings = pad_sequences(test_encodings, MAX_LEN) # Create datasets train_dataset = HumorDataset(train_encodings, y_train.reset_index(drop=True)) test_dataset = HumorDataset(test_encodings, y_test.reset_index(drop=True)) # Model parameters vocab_size = len(model_embedding.wv.key_to_index) embed_dim = model_embedding.vector_size num_filters = 200 kernel_sizes = [3, 4, 5] hidden_dim = 128 dropout = 0.5 model = CNNBinaryClassifier(vocab_size, embed_dim, num_filters, kernel_sizes, hidden_dim, dropout) # Training parameters epochs = 10 batch_size = 8 learning_rate = 2e-5 optimizer = optim.Adam(model.parameters(), lr=learning_rate) criterion = nn.BCELoss() train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True) test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False) # Move model to device model.to(DEVICE) # Training loop with loss visualization print("Starting training...") train_losses = [] val_losses = [] for epoch in range(epochs): epoch_loss = 0 model.train() for batch in train_loader: optimizer.zero_grad() input_ids = batch['input_ids'].to(DEVICE) labels = batch['labels'].unsqueeze(1).to(DEVICE) outputs = model(input_ids) loss = criterion(outputs, labels) loss.backward() optimizer.step() epoch_loss += loss.item() train_loss = epoch_loss / len(train_loader) train_losses.append(train_loss) # Evaluation during training model.eval() val_loss = 0 with torch.no_grad(): for batch in test_loader: input_ids = batch['input_ids'].to(DEVICE) labels = batch['labels'].unsqueeze(1).to(DEVICE) outputs = model(input_ids) loss = criterion(outputs, labels) val_loss += loss.item() val_loss /= len(test_loader) val_losses.append(val_loss) print(f"Epoch {epoch + 1}/{epochs}, Train Loss: {train_loss:.4f}, Validation Loss: {val_loss:.4f}") # Final evaluation print("Starting evaluation...") model.eval() predictions, true_labels = [], [] with torch.no_grad(): for batch in test_loader: input_ids = batch['input_ids'].to(DEVICE) labels = batch['labels'].unsqueeze(1).to(DEVICE) outputs = model(input_ids) preds = (outputs > 0.5).float() predictions.extend(preds.cpu().numpy()) true_labels.extend(labels.cpu().numpy()) accuracy = accuracy_score(true_labels, predictions) print(f"Final Accuracy: {accuracy:.4f}") # Visualize Losses plt.plot(train_losses, label="Train Loss") plt.plot(val_losses, label="Validation Loss") plt.xlabel("Epochs") plt.ylabel("Loss") plt.title("Loss Curve") plt.legend() plt.show()