base struct, bug needs to be found
parent
c444b0d451
commit
5a9cd6efad
|
|
@ -0,0 +1,144 @@
|
|||
# PyTorch Imports
|
||||
import torch
|
||||
import torch.nn as nn
|
||||
import torch.optim as optim
|
||||
from torch.utils.data import Dataset, DataLoader
|
||||
# scikit-learn Imports
|
||||
from sklearn.metrics import accuracy_score, confusion_matrix
|
||||
from sklearn.model_selection import train_test_split
|
||||
#Gensim Imports
|
||||
import gensim
|
||||
# Bert improts
|
||||
from transformers import BertForSequenceClassification, BertTokenizer, BertPreTrainedModel, AdamW
|
||||
#Default imports (pandas, numpy, matplotlib, etc.)
|
||||
import pandas as pd
|
||||
import numpy as np
|
||||
import matplotlib.pyplot as plt
|
||||
|
||||
## Select Device
|
||||
if torch.cuda.is_available():
|
||||
DEVICE = torch.device("cuda")
|
||||
else:
|
||||
DEVICE = torch.device("cpu")
|
||||
|
||||
|
||||
class SimpleHumorDataset(Dataset):
|
||||
def __init__(self,tokenizer,dataframe,max_length=256):
|
||||
super().__init__()
|
||||
self.tokenizer = tokenizer
|
||||
self.max_length = max_length
|
||||
self.text = dataframe['text'].tolist()
|
||||
self.labels = dataframe['is_humor'].tolist()
|
||||
def __getitem__(self,idx):
|
||||
text = self.text
|
||||
labels = self.labels
|
||||
encoding = self.tokenizer.encode_plus(
|
||||
text,
|
||||
add_special_tokens=True,
|
||||
padding="max_length",
|
||||
trunction = True,
|
||||
return_attention_mask = True,
|
||||
return_token_type_ids = False,
|
||||
max_length=self.max_length,
|
||||
truncation = True,
|
||||
return_tensors = 'pt'
|
||||
)
|
||||
input_ids = encoding['input_ids'].flatten()
|
||||
attention_mask = encoding['attention_mask'].flatten()
|
||||
|
||||
return {
|
||||
'input_ids': torch.as_tensor(input_ids,dtype=torch.long),
|
||||
'attention_mask':torch.as_tensor(attention_mask,dtype=torch.long),
|
||||
'labels':torch.as_tensor(self.labels,dtype=torch.long),
|
||||
'text':text
|
||||
}
|
||||
|
||||
def __len__(self):
|
||||
return len(self.labels)
|
||||
|
||||
|
||||
class BERT(nn.Module):
|
||||
def __init__(self,dropout,num_layers=2):
|
||||
super().__init__()
|
||||
self.model = BertForSequenceClassification.from_pretrained("google-bert/bert-base-uncased"),
|
||||
self.ln1 = nn.Linear(768,num_layers),
|
||||
self.dropout = nn.Dropout(dropout),
|
||||
self.sm1 = nn.Sigmoid()
|
||||
|
||||
def forward(self,input_ids,attention_mask):
|
||||
return self.sm1(self.dropout(self.ln1(self.model[0](input_ids,attention_mask))))
|
||||
# return self.model(input_ids)
|
||||
|
||||
def train(self,criterion,optimizer,train_loader):
|
||||
self.model[0].train()
|
||||
total_loss = 0
|
||||
for train_batch in train_loader:
|
||||
optimizer.zero_grad()
|
||||
input_ids, att_mask, labels,_ = train_batch
|
||||
outputs = self.forward(input_ids,att_mask)
|
||||
loss = criterion(outputs,labels)
|
||||
loss.backward()
|
||||
optimizer.step()
|
||||
total_loss+=loss.item()
|
||||
print(f"Total Loss is {(total_loss/len(train_loader)):.4f}")
|
||||
|
||||
def eval(model,criterion,validation_loader):
|
||||
model.model[0].eval()
|
||||
total_loss = 0.0
|
||||
total_acc = 0.0
|
||||
with torch.no_grad():
|
||||
for val_batch in validation_loader:
|
||||
input_ids, att_mask ,labels,_ = val_batch
|
||||
input_ids, labels = input_ids.to(DEVICE),att_mask.to(DEVICE), labels.to(DEVICE)
|
||||
outputs = model(input_ids,att_mask)
|
||||
loss = criterion(outputs,labels)
|
||||
total_loss += loss.item()
|
||||
predictions = torch.argmax(outputs,dim=1)
|
||||
total_acc += (predictions == labels).sum().item()
|
||||
print(f"Total Loss: {total_loss/len(validation_loader)} ### Test Accuracy {total_acc/len(validation_loader)*100}%")
|
||||
|
||||
def generate_tokens(tokenizer,raw_data):
|
||||
return tokenizer.encode_plus(
|
||||
raw_data,
|
||||
add_special_tokens=True,
|
||||
padding="max_length",
|
||||
return_attention_mask = True,
|
||||
return_token_type_ids = False,
|
||||
max_length=128,
|
||||
truncation = True,
|
||||
return_tensors = 'pt'
|
||||
)
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
# Initialize Bert Model with dropout probability and Num End Layers
|
||||
bert = BERT(0.1)
|
||||
|
||||
# Set Max Epoch Amount
|
||||
EPOCH = 50
|
||||
|
||||
# Read Raw Data from csv and save as DataFrame
|
||||
df = pd.read_csv("./data/hack.csv",encoding="latin1")
|
||||
|
||||
# Initialize BertTokenizer from Pretrained
|
||||
tokenizer = BertTokenizer.from_pretrained("bert-base-uncased",do_lower_case=True)
|
||||
|
||||
#Split DataFrame into Train and Test Sets
|
||||
train,test = train_test_split(df,random_state=501,test_size=.2)
|
||||
|
||||
# Create Custom Datasets for Train and Test
|
||||
train_data = SimpleHumorDataset(tokenizer,train)
|
||||
test_data = SimpleHumorDataset(tokenizer,test)
|
||||
|
||||
# Initialize Dataloader with Train and Test Sets
|
||||
train_loader = DataLoader(dataset=train_data,batch_size=16,shuffle=True)
|
||||
test_loader = DataLoader(dataset=test_data,batch_size=len(test_data))
|
||||
|
||||
# Set criterion to BCELoss (Binary Cross Entropy) and define Adam Optimizer with model parameters and learning rate
|
||||
criterion_bce = nn.BCELoss()
|
||||
optimizer_adam = optim.Adam(bert.model[0].parameters(), lr = 1e-5)
|
||||
|
||||
for epoch in range(EPOCH):
|
||||
print(f"For {epoch} the Scores are: ")
|
||||
bert.train(optimizer=optimizer_adam,criterion=criterion_bce,train_loader=train_loader)
|
||||
bert.eval(criterion=criterion_bce,validation_loader=test_loader)
|
||||
Loading…
Reference in New Issue