266 lines
9.8 KiB
Python
266 lines
9.8 KiB
Python
# PyTorch Imports
|
|
import torch
|
|
import torch.nn as nn
|
|
import torch.optim as optim
|
|
from torch.utils.data import Dataset, DataLoader
|
|
# scikit-learn Imports
|
|
from sklearn.metrics import accuracy_score, confusion_matrix, f1_score
|
|
from sklearn.model_selection import train_test_split
|
|
# Bert imports
|
|
from transformers import BertForSequenceClassification, AutoTokenizer
|
|
#Default imports (pandas, numpy, matplotlib, etc.)
|
|
import pandas as pd
|
|
import numpy as np
|
|
import matplotlib.pyplot as plt
|
|
|
|
## Select Device
# Prefer the CUDA device when torch reports one is available; fall back to CPU otherwise.
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
|
|
|
|
|
|
class SimpleHumorDataset(Dataset):
    """Map-style dataset that tokenizes one text row per lookup.

    The texts are read from the ``'text'`` column and the targets from the
    ``'is_humor'`` column of the given dataframe.
    """

    def __init__(self, tokenizer: AutoTokenizer, dataframe: pd.DataFrame, max_length: int = 128):
        """Store the tokenizer and pull the text/label columns out as arrays.

        Args:
            tokenizer: Callable used to encode a single text in ``__getitem__``.
            dataframe: Frame providing the ``'text'`` and ``'is_humor'`` columns.
            max_length: Length every sample is padded/truncated to.
        """
        super().__init__()
        self.tokenizer = tokenizer
        self.max_length = max_length
        self.text = dataframe['text'].to_numpy()
        self.labels = dataframe['is_humor'].to_numpy()

    def __len__(self):
        """Return the number of samples (one per label)."""
        return len(self.labels)

    def __getitem__(self, idx: int):
        """Tokenize sample ``idx`` and return it as a dict of long tensors.

        The dict keys are inserted in the order ``input_ids``,
        ``attention_mask``, ``labels``; the loops in this file unpack
        ``batch.values()`` positionally and rely on that order.
        """
        sample_text = self.text[idx]
        sample_label = self.labels[idx]

        tokenizer_kwargs = {
            "padding": "max_length",
            "return_attention_mask": True,
            "max_length": self.max_length,
            "truncation": True,
            "return_tensors": 'pt',
        }
        encoded = self.tokenizer(sample_text, **tokenizer_kwargs)

        # The tokenizer output carries a leading batch axis; flatten it away.
        flat_ids = encoded['input_ids'].flatten()
        flat_mask = encoded['attention_mask'].flatten()

        item = {}
        item['input_ids'] = torch.as_tensor(flat_ids, dtype=torch.long)
        item['attention_mask'] = torch.as_tensor(flat_mask, dtype=torch.long)
        item['labels'] = torch.tensor(sample_label, dtype=torch.long)
        return item
|
|
|
|
class CustomBert(nn.Module):
    """Pretrained ``bert-base-uncased`` sequence classifier with an extra head.

    The first element of the wrapped model's output is passed through dropout
    and a 2-in/2-out linear layer (so the wrapped model is expected to emit two
    values per sample).
    """

    def __init__(self, dropout):
        """Load the pretrained model and build the additional layers.

        Args:
            dropout: Dropout probability applied before the final linear layer.
        """
        super().__init__()
        # Bert + custom layers
        self.bfsc = BertForSequenceClassification.from_pretrained("bert-base-uncased")
        self.dropout = nn.Dropout(dropout)
        self.classifier = nn.Linear(2, 2)
        # self.sm = nn.Softmax(dim=1)

    def forward(self, input_ids, attention_mask):
        """Return the output of the custom head for a batch of token ids."""
        bert_output = self.bfsc(input_ids, attention_mask=attention_mask)
        # Element 0 of the wrapped model's output is what feeds the custom head.
        head_input = bert_output[0]
        return self.classifier(self.dropout(head_input))

    def _set_bert_requires_grad(self, flag):
        """Switch gradient tracking for every parameter of the wrapped model."""
        for _, parameter in self.bfsc.named_parameters():
            parameter.requires_grad_(flag)

    def freeze_bert_params(self):
        """Disable gradients for all parameters of the wrapped BERT model."""
        self._set_bert_requires_grad(False)

    def unfreeze_bert_params(self):
        """Re-enable gradients for all parameters of the wrapped BERT model."""
        self._set_bert_requires_grad(True)
|
|
|
|
def training_loop(model:CustomBert,criterion:nn.CrossEntropyLoss,optimizer:optim.AdamW,train_loader:DataLoader,freeze_bert:bool=False):
    """Run one training epoch and return the mean batch loss.

    Args:
        model: Model to train; called as ``model(input_ids, attention_mask=...)``.
        criterion: Loss applied to the model output and the batch labels.
        optimizer: Optimizer stepped once per batch.
        train_loader: Loader yielding dict batches with the keys
            ``'input_ids'``, ``'attention_mask'`` and ``'labels'``.
        freeze_bert: When true, ``model.freeze_bert_params()`` is called before
            training. Passing false does NOT unfreeze previously frozen
            parameters.

    Returns:
        The summed batch loss divided by the number of batches, or ``nan`` when
        the loader yields no batches.
    """
    model.train()
    if freeze_bert:
        model.freeze_bert_params()

    num_batches = len(train_loader)
    if num_batches == 0:
        # Nothing to train on: report nan instead of dividing by zero.
        print("Training Loss is nan")
        return float("nan")

    total_loss = 0.0
    for train_batch in train_loader:
        # Set gradients to zero
        optimizer.zero_grad()

        # Look the tensors up by key (not by dict order) and move them to DEVICE
        input_ids = train_batch['input_ids'].to(DEVICE)
        att_mask = train_batch['attention_mask'].to(DEVICE)
        labels = train_batch['labels'].to(DEVICE)

        # Feed the model, back-propagate and update the weights
        outputs = model(input_ids, attention_mask=att_mask)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()

    mean_loss = total_loss / num_batches
    print(f"Training Loss is {mean_loss:.4f}")
    return mean_loss
|
|
|
|
def eval_loop(model:CustomBert,criterion:nn.CrossEntropyLoss,validation_loader:DataLoader,best_loss:float=float("inf"),checkpoint_path:str="best_bert_model.pt"):
    """Evaluate the model on a loader and checkpoint it when the loss improves.

    Args:
        model: Model to evaluate; called as ``model(input_ids, attention_mask=...)``.
        criterion: Loss applied to the model output and the batch labels.
        validation_loader: Loader yielding dict batches with the keys
            ``'input_ids'``, ``'attention_mask'`` and ``'labels'``.
        best_loss: Lowest validation loss seen so far. The model is saved only
            when this call's mean loss is strictly lower. With the default
            (infinity) every call saves, which is what happened before this
            parameter existed. Typical use:
            ``best = min(best, eval_loop(..., best_loss=best))``.
        checkpoint_path: File the whole model object is written to with
            ``torch.save``.

    Returns:
        The summed batch loss divided by the number of batches, or ``nan`` when
        the loader yields no samples (nothing is saved in that case).
    """
    model.eval()
    num_batches = len(validation_loader)
    total, correct = 0, 0
    total_loss = 0.0

    with torch.no_grad():
        for val_batch in validation_loader:
            # Look the tensors up by key (not by dict order) and move them to DEVICE
            input_ids = val_batch['input_ids'].to(DEVICE)
            att_mask = val_batch['attention_mask'].to(DEVICE)
            labels = val_batch['labels'].to(DEVICE)

            outputs = model(input_ids, attention_mask=att_mask)
            total_loss += criterion(outputs, labels).item()

            predictions = torch.argmax(outputs, 1)
            total += labels.size(0)
            correct += (predictions == labels).sum().item()

    if num_batches == 0 or total == 0:
        # No data: report nan instead of dividing by zero, and keep the old checkpoint.
        print("Validation Loss: nan ### Validation Accuracy nan%")
        return float("nan")

    mean_loss = total_loss / num_batches
    if mean_loss < best_loss:
        torch.save(model, checkpoint_path)

    print(f"Validation Loss: {mean_loss:.4f} ### Validation Accuracy {correct/total*100:.4f}%")
    return mean_loss
|
|
|
|
def test_loop(model:CustomBert, test_loader:DataLoader, checkpoint_path:str="best_bert_model.pt"):
    """Score the saved checkpoint on a loader, printing accuracy per batch.

    The ``model`` argument is replaced by the object loaded from
    ``checkpoint_path`` (the file ``eval_loop`` writes by default), as it was
    in the previous implementation.

    Args:
        model: Unused once the checkpoint has been loaded; kept so existing
            callers keep working.
        test_loader: Loader yielding dict batches with the keys
            ``'input_ids'``, ``'attention_mask'`` and ``'labels'``.
        checkpoint_path: File to load the pickled model object from.

    Returns:
        A list with one accuracy value per batch.
    """
    # NOTE(security): torch.load with weights_only=False unpickles arbitrary
    # objects -- only point this at checkpoints you created yourself.
    # The checkpoint is loaded once, not once per batch.
    model = torch.load(checkpoint_path, map_location=DEVICE, weights_only=False)
    model.to(DEVICE)
    # Switch dropout off for inference.
    model.eval()

    accuracies = []
    with torch.no_grad():
        for batch in test_loader:
            input_ids = batch['input_ids'].to(DEVICE)
            att_mask = batch['attention_mask'].to(DEVICE)

            output = model(input_ids, att_mask)

            # Bring predictions and labels to CPU/NumPy before handing them to scikit-learn.
            pred_flat = torch.argmax(output, dim=1).flatten().cpu().numpy()
            true_labels = batch['labels'].cpu().numpy()

            batch_accuracy = accuracy_score(true_labels, pred_flat)
            print(batch_accuracy)
            accuracies.append(batch_accuracy)
    return accuracies
|
|
|
|
def plot_metrics_loss_n_acc(train_loss,validation_loss,train_acc,validation_acc):
    """Placeholder for plotting training/validation loss and accuracy.

    Not implemented yet: all four arguments are ignored and nothing is drawn.
    """
    # Draft of the intended loss visualisation (refers to script-level names):
    # plt.plot(loss_values)
    # plt.plot(eval_values)
    # plt.hlines(np.mean(loss_values),xmin=0,xmax=EPOCH,colors='red',linestyles="dotted",label="Average Loss")
    # plt.hlines(np.mean(eval_values),xmin=0,xmax=EPOCH,colors='green',linestyles="dashed",label="Average Val Loss")
    # plt.title("Test Loss")
    # plt.xlabel("Num Epochs")
    # plt.ylabel("Total Loss of Epoch")
    # plt.show()
    return None
|
|
|
|
def plot_test_metrics(accuracy):
    """Plot a sequence of accuracy values together with their mean.

    Args:
        accuracy: Sequence of accuracy values, drawn in order along the x axis.
    """
    mean_accuracy = np.mean(accuracy)

    plt.plot(accuracy)
    # Keyword arguments keep colour, line style and label unambiguous; the label
    # now interpolates the mean (the old '%d'.format(...) never did).
    plt.hlines(
        mean_accuracy,
        xmin=0,
        xmax=len(accuracy),
        colors='red',
        linestyles='dotted',
        label=f'Mean Accuracy {mean_accuracy:.4f}',
    )
    plt.title("Accuracy of Test")
    plt.xlabel("Num Epochs")
    plt.ylabel("Accuracy 0.0 - 1.0")
    plt.grid(True)
    plt.legend()
    plt.show()
|
|
|
|
# def performance_metrics(true_labels,predictions):
|
|
# confusion_matrix(true_labels,predictions)
|
|
# accuracy_score(true_labels,predictions)
|
|
# f1_score(true_labels,predictions)
|
|
# pass
|
|
|
|
def create_datasets(tokenizer:AutoTokenizer,dataframe:pd.DataFrame,train_split_ratio:float,val:bool=False)->tuple[SimpleHumorDataset,SimpleHumorDataset,SimpleHumorDataset]|tuple[SimpleHumorDataset,SimpleHumorDataset]:
    """Split a dataframe and wrap every part in a ``SimpleHumorDataset``.

    Args:
        tokenizer: Tokenizer handed to each dataset.
        dataframe: Source frame to split.
        train_split_ratio: Share of rows that goes into the training part.
        val: When true, the non-training rows are split in half again into a
            test part and a validation part.

    Returns:
        ``(train, test)`` or, when ``val`` is true, ``(train, test, validation)``.

    Raises:
        AssertionError: If ``train_split_ratio`` is greater than 1.0.
    """
    if train_split_ratio > 1.0:
        raise AssertionError("Trainsplit sollte kleiner(-gleich) 1.0 sein")

    # Fixed seed so the split is reproducible between runs.
    seed = 501
    train_part, holdout_part = train_test_split(dataframe, train_size=train_split_ratio, random_state=seed)

    if not val:
        return SimpleHumorDataset(tokenizer, train_part), SimpleHumorDataset(tokenizer, holdout_part)

    test_part, validation_part = train_test_split(holdout_part, train_size=.5, random_state=seed)
    train_set = SimpleHumorDataset(tokenizer, train_part)
    test_set = SimpleHumorDataset(tokenizer, test_part)
    validation_set = SimpleHumorDataset(tokenizer, validation_part)
    return train_set, test_set, validation_set
|
|
|
|
def create_dataloaders(datasets:tuple|list,batchsize:int,shufflelist:list):
|
|
train_loader = DataLoader(datasets[0],batchsize,shuffle=shufflelist[0])
|
|
test_loader = DataLoader(datasets[1],batchsize,shuffle=shufflelist[1])
|
|
if len(datasets) == 3:
|
|
return train_loader, test_loader, DataLoader(datasets[2],batchsize,shuffle=shufflelist[2])
|
|
return train_loader, test_loader
|
|
|
|
|
|
# if __name__ == "__main__":
|
|
|
|
# # HYPERPARAMETERS
|
|
# # Set Max Epoch Amount
|
|
# EPOCH = 10
|
|
# # DROPOUT-PROBABILITY
|
|
# DROPOUT = 0.1
|
|
# # BATCHSIZE
|
|
# BATCH_SIZE = 16
|
|
# #LEARNING RATE
|
|
# LEARNING_RATE = 1e-5
|
|
# # RANDOM SEED
|
|
# RNDM_SEED = 501
|
|
# # FREEZE Bert Layers
|
|
# FREEZE = True
|
|
|
|
# torch.manual_seed(RNDM_SEED)
|
|
# np.random.seed(RNDM_SEED)
|
|
# torch.cuda.manual_seed_all(RNDM_SEED)
|
|
|
|
|
|
# Initialize Bert Model with dropout probability and port to DEVICE
|
|
# mybert = CustomBert(DROPOUT)
|
|
# print("Bert Initialized")
|
|
# mybert.to(DEVICE)
|
|
|
|
|
|
# Read Raw Data from csv and save as DataFrame
|
|
# df = pd.read_csv("./data/hack.csv",encoding="latin1")
|
|
# print("Raw Data read")
|
|
|
|
|
|
# Initialize BertTokenizer from Pretrained
|
|
# tokenizer = AutoTokenizer.from_pretrained("google-bert/bert-base-uncased",do_lower_case=True)
|
|
# print("Tokenizer Initialized")
|
|
|
|
|
|
# Split DataFrame into Train and Test Sets
|
|
# Create Custom Datasets for Train and Test
|
|
# train_data,test_data,validation_data = create_datasets(tokenizer,df,.7,True)
|
|
# print("Splitted Data in Train and Test Sets")
|
|
# print("Custom Datasets created")
|
|
|
|
|
|
# Initialize Dataloader with Train and Test Sets
|
|
# train_loader, test_loader, validation_loader = create_dataloaders([train_data,test_data,validation_data],batchsize=BATCH_SIZE,shufflelist=[True,True,False])
|
|
# print("DataLoaders created")
|
|
|
|
|
|
# Set criterion to Cross Entropy and define Adam Optimizer with model parameters and learning rate
|
|
# criterion_cross_entropy = nn.CrossEntropyLoss()
|
|
# optimizer_adamW = optim.Adam(mybert.parameters(), lr = LEARNING_RATE)
|
|
# import time
|
|
|
|
|
|
# TODO: set a scheduler for dynamic learning-rate adjustment

# Pre-allocate one slot per epoch for the training and validation losses.
# Kept commented out like the rest of this script section: EPOCH is only
# defined inside the disabled block above, so leaving this line active made
# importing the module fail with a NameError.
# loss_values, eval_values = np.zeros(EPOCH), np.zeros(EPOCH)
|
|
|
|
# for epoch in range(EPOCH):
|
|
# start = time.time()
|
|
# print(f"For {epoch+1} the Scores are: ")
|
|
# loss_values[epoch] = training_loop(mybert,optimizer=optimizer_adamW,criterion=criterion_cross_entropy,train_loader=train_loader,freeze_bert=FREEZE)
|
|
# eval_values[epoch] = eval_loop(mybert,criterion=criterion_cross_entropy,validation_loader=test_loader)
|
|
# end = time.time()
|
|
# print((end-start),"seconds per epoch needed")
|
|
|
|
# plot_metrics_loss_n_acc("x","x","x","x")
|
|
|
|
# for epoch in range(EPOCH):
|
|
# test_loop(mybert,validation_loader) |