diff --git a/cnn.py b/cnn.py
new file mode 100644
index 0000000..25c0442
--- /dev/null
+++ b/cnn.py
@@ -0,0 +1,183 @@
+import torch
+import torch.nn as nn
+import torch.optim as optim
+import pandas as pd
+import numpy as np
+import gensim
+import nltk
+from nltk.tokenize import word_tokenize
+from sklearn.model_selection import train_test_split
+from sklearn.metrics import accuracy_score
+from torch.utils.data import DataLoader, Dataset
+
+# Tokenizer models required by word_tokenize
+nltk.download('punkt')
+
+# Check if GPU is available
+DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
+print('Using device:', DEVICE)
+
+# Maximum sequence length
+MAX_LEN = 100
+
+# Data processing helpers
+def token_to_index(model, word):
+    """Map a word to its vocabulary index, falling back to the unknown token."""
+    if word in model.wv:
+        return model.wv.key_to_index[word]
+    return unk_index  # set in main once the Word2Vec model is trained
+
+def encode_tokens(tokens):
+    return [token_to_index(model_embedding, token) for token in tokens]
+
+def pad_sequences(sequences, max_len):
+    # Pad short sequences with the unknown-token index; truncate long ones
+    return np.array([
+        np.pad(seq, (0, max_len - len(seq)), mode='constant', constant_values=unk_index)
+        if len(seq) < max_len else seq[:max_len]
+        for seq in sequences
+    ])
+
+# Dataset class
+class HumorDataset(Dataset):
+    def __init__(self, encodings, labels):
+        self.encodings = encodings
+        self.labels = labels
+
+    def __getitem__(self, idx):
+        item = {'input_ids': torch.tensor(self.encodings[idx], dtype=torch.long)}
+        item['labels'] = torch.tensor(self.labels[idx], dtype=torch.float)
+        return item
+
+    def __len__(self):
+        return len(self.labels)
+
+# CNN Model
+class CNNBinaryClassifier(nn.Module):
+    def __init__(self, vocab_size, embed_dim, num_filters, kernel_sizes, hidden_dim, dropout=0.1):
+        super(CNNBinaryClassifier, self).__init__()
+        self.embedding = nn.Embedding(vocab_size, embed_dim)
+        self.conv_layers = nn.ModuleList([
+            nn.Conv1d(in_channels=embed_dim, out_channels=num_filters, kernel_size=k)
+            for k in kernel_sizes
+        ])
+        self.fc = nn.Linear(num_filters * len(kernel_sizes), hidden_dim)
+        self.out = nn.Linear(hidden_dim, 1)
+        self.relu = nn.ReLU()
+        self.dropout = nn.Dropout(dropout)
+        self.sigmoid = nn.Sigmoid()
+
+    def forward(self, input_ids):
+        # (batch, seq_len) -> (batch, embed_dim, seq_len) as expected by Conv1d
+        embedded = self.embedding(input_ids).permute(0, 2, 1)
+        conv_outs = [self.relu(conv(embedded)) for conv in self.conv_layers]
+        # Global max pooling over the time dimension
+        pooled_outs = [torch.max(out, dim=2)[0] for out in conv_outs]
+        concatenated = torch.cat(pooled_outs, dim=1)
+        fc_out = self.relu(self.fc(self.dropout(concatenated)))
+        logits = self.out(fc_out)
+        return self.sigmoid(logits)
+
+# Main
+if __name__ == "__main__":
+    # Load and process data
+    df = pd.read_csv('data/hack.csv')  # must exist, with 'text' and 'is_humor' columns
+    print(f"Loaded dataset: {df.shape}")
+
+    X = df['text']
+    y = df['is_humor']
+
+    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
+
+    # Tokenize the data
+    train_tokens = [word_tokenize(text.lower()) for text in X_train]
+    test_tokens = [word_tokenize(text.lower()) for text in X_test]
+
+    # Train Word2Vec model on the training split only
+    model_embedding = gensim.models.Word2Vec(train_tokens, window=5, min_count=5, workers=4)
+
+    # Add an unknown token with a zero vector for out-of-vocabulary words
+    model_embedding.wv.add_vector('<unk>', np.zeros(model_embedding.vector_size, dtype=np.float32))
+    unk_index = model_embedding.wv.key_to_index['<unk>']
+
+    # Encode tokens as vocabulary indices
+    train_encodings = [encode_tokens(tokens) for tokens in train_tokens]
+    test_encodings = [encode_tokens(tokens) for tokens in test_tokens]
+
+    # Pad/truncate all sequences to MAX_LEN
+    train_encodings = pad_sequences(train_encodings, MAX_LEN)
+    test_encodings = pad_sequences(test_encodings, MAX_LEN)
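+
+    # Sanity check (illustrative addition): both splits should now be (num_examples, MAX_LEN)
+    print(f"Encoded shapes: train {train_encodings.shape}, test {test_encodings.shape}")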
+
+    # Create datasets
+    train_dataset = HumorDataset(train_encodings, y_train.reset_index(drop=True))
+    test_dataset = HumorDataset(test_encodings, y_test.reset_index(drop=True))
+
+    # Model parameters
+    vocab_size = len(model_embedding.wv.key_to_index)
+    embed_dim = model_embedding.vector_size
+    num_filters = 100
+    kernel_sizes = [3, 4, 5]
+    hidden_dim = 128
+    dropout = 0.5
+
+    model = CNNBinaryClassifier(vocab_size, embed_dim, num_filters, kernel_sizes, hidden_dim, dropout)
+
+    # Initialize the embedding layer with the trained Word2Vec vectors
+    model.embedding.weight.data.copy_(torch.from_numpy(model_embedding.wv.vectors))
+
+    # Training parameters
+    epochs = 10
+    batch_size = 8
+    learning_rate = 2e-5
+
+    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
+    criterion = nn.BCELoss()  # model outputs sigmoid probabilities
+
+    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
+    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
+
+    # Move model to device
+    model.to(DEVICE)
+
+    # Training loop
+    print("Starting training...")
+    model.train()
+    for epoch in range(epochs):
+        epoch_loss = 0
+        for batch in train_loader:
+            optimizer.zero_grad()
+            input_ids = batch['input_ids'].to(DEVICE)
+            labels = batch['labels'].unsqueeze(1).to(DEVICE)
+            outputs = model(input_ids)
+            loss = criterion(outputs, labels)
+            loss.backward()
+            optimizer.step()
+            epoch_loss += loss.item()
+        print(f"Epoch {epoch + 1}, Loss: {epoch_loss / len(train_loader):.4f}")
+
+    # Evaluation loop
+    print("Starting evaluation...")
+    model.eval()
+    predictions, true_labels = [], []
+    with torch.no_grad():
+        for batch in test_loader:
+            input_ids = batch['input_ids'].to(DEVICE)
+            labels = batch['labels'].unsqueeze(1).to(DEVICE)
+            outputs = model(input_ids)
+            preds = outputs.round()  # threshold probabilities at 0.5
+            predictions.extend(preds.cpu().numpy().flatten())
+            true_labels.extend(labels.cpu().numpy().flatten())
+
+    accuracy = accuracy_score(true_labels, predictions)
+    print(f"Accuracy: {accuracy:.4f}")
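+
+    # Illustrative usage sketch (assumption, not part of the original pipeline):
+    # score one made-up string via the same tokenize -> index -> pad steps.
+    sample = "why did the chicken cross the road"
+    sample_ids = pad_sequences([encode_tokens(word_tokenize(sample.lower()))], MAX_LEN)
+    with torch.no_grad():
+        prob = model(torch.tensor(sample_ids, dtype=torch.long).to(DEVICE)).item()
+    print(f"P(is_humor) for sample: {prob:.3f}")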