# PyTorch Imports import torch import torch.nn as nn import torch.optim as optim from torch.utils.data import Dataset, DataLoader # scikit-learn Imports from sklearn.metrics import accuracy_score, confusion_matrix, f1_score from sklearn.model_selection import train_test_split # Bert imports from transformers import BertForSequenceClassification, AutoTokenizer #Default imports (pandas, numpy, matplotlib, etc.) import pandas as pd import numpy as np import matplotlib.pyplot as plt ## Select Device if torch.cuda.is_available(): DEVICE = torch.device("cuda") else: DEVICE = torch.device("cpu") class SimpleHumorDataset(Dataset): def __init__(self,tokenizer:AutoTokenizer,dataframe:pd.DataFrame,max_length:int=128): super(SimpleHumorDataset,self).__init__() self.tokenizer = tokenizer self.max_length = max_length self.text = dataframe['text'].to_numpy() self.labels = dataframe['is_humor'].to_numpy() def __getitem__(self,idx:int): text = self.text[idx] labels = self.labels[idx] encoding = self.tokenizer( text, padding="max_length", return_attention_mask = True, max_length=self.max_length, truncation = True, return_tensors = 'pt' ) input_ids = encoding['input_ids'].flatten() attention_mask = encoding['attention_mask'].flatten() return { 'input_ids': torch.as_tensor(input_ids,dtype=torch.long), 'attention_mask':torch.as_tensor(attention_mask,dtype=torch.long), 'labels':torch.tensor(labels,dtype=torch.long) } def __len__(self): return len(self.labels) class CustomBert(nn.Module): def __init__(self,dropout): super().__init__() #Bert + Custom Layers (Not a tuple any longer -- idk why) self.bfsc = BertForSequenceClassification.from_pretrained("bert-base-uncased") self.dropout = nn.Dropout(dropout) self.classifier = nn.Linear(2,2) # self.sm = nn.Softmax(dim=1) def forward(self, input_ids, attention_mask): x = self.bfsc(input_ids, attention_mask = attention_mask) x = self.dropout(x[0]) x = self.classifier(x) return x def freeze_bert_params(self): for param in self.bfsc.named_parameters(): param[1].requires_grad_(False) def unfreeze_bert_params(self): for param in self.bfsc.named_parameters(): param[1].requires_grad_(True) def training_loop(model:CustomBert,criterion:nn.CrossEntropyLoss,optimizer:optim.AdamW,train_loader:DataLoader,freeze_bert:bool=False): model.train() if freeze_bert: model.freeze_bert_params() total_loss = 0 len_train_loader = len(train_loader) for train_batch in train_loader: # Set Gradient to Zero optimizer.zero_grad() # Unpack batch values and "push" it to GPU input_ids, att_mask, labels = train_batch.values() input_ids, att_mask, labels = input_ids.to(DEVICE),att_mask.to(DEVICE),labels.to(DEVICE) # Feed Model with Data outputs = model(input_ids, attention_mask=att_mask) # print(f"{model.bfsc.}") # print(f"{outputs.shape}") loss = criterion(outputs,labels) loss.backward() optimizer.step() total_loss+=loss.item() print(f"Training Loss is {(total_loss/len(train_loader)):.4f}") return (total_loss/len(train_loader)) def eval_loop(model:CustomBert,criterion:nn.CrossEntropyLoss,validation_loader:DataLoader): model.eval() total, correct = 0.0, 0.0 total_loss = 0.0 best_loss = float("Inf") with torch.no_grad(): for val_batch in validation_loader: input_ids, att_mask ,labels = val_batch.values() input_ids, att_mask, labels = input_ids.to(DEVICE),att_mask.to(DEVICE), labels.to(DEVICE) outputs = model(input_ids,attention_mask=att_mask) loss = criterion(outputs,labels) total_loss += loss.item() predictions = torch.argmax(outputs,1) total += labels.size(0) correct += (predictions == labels).sum().item() if total_loss/len(validation_loader) < best_loss: best_loss = total_loss/len(validation_loader) torch.save(model,"best_bert_model.pt") print(f"Validation Loss: {total_loss/len(validation_loader):.4f} ### Validation Accuracy {correct/total*100:.4f}%") return total_loss/len(validation_loader) def test_loop(model:CustomBert, test_loader:DataLoader): for batch in test_loader: input_ids, att_mask, labels = batch.values() input_ids, att_mask, labels = input_ids.to(DEVICE), att_mask.to(DEVICE), labels.to(DEVICE) with torch.no_grad(): model = torch.load("best_bert_model") model.to(DEVICE) output = model(input_ids,att_mask) output.detach().cpu().numpy() labels.detach().cpu().numpy() pred_flat = np.argmax(output,1).flatten() print(accuracy_score(labels,pred_flat)) def plot_metrics_loss_n_acc(train_loss,validation_loss,train_acc,validation_acc): """ Method that plots Loss and Accuracy of Training and Validation Data used in given modelinstance """ # Visualize Training Loss # plt.plot(loss_values) # plt.plot(eval_values) # plt.hlines(np.mean(loss_values),xmin=0,xmax=EPOCH,colors='red',linestyles="dotted",label="Average Loss") # plt.hlines(np.mean(eval_values),xmin=0,xmax=EPOCH,colors='green',linestyles="dashed",label="Average Val Loss") # plt.title("Test Loss") # plt.xlabel("Num Epochs") # plt.ylabel("Total Loss of Epoch") # plt.show() pass def plot_test_metrics(accuracy): """ Plot Test Metrics of Model (Confiuson Matrix, Accuracy) """ plt.plot(accuracy) plt.hlines(np.mean(accuracy),0,len(accuracy),'red','dotted','Mean Accuracy %d'.format(np.mean(accuracy))) plt.title("Accuracy of Test") plt.xlabel("Num Epochs") plt.ylabel("Accurcy 0.0 - 1.0") plt.grid(True) plt.legend() plt.show() # def performance_metrics(true_labels,predictions): # confusion_matrix(true_labels,predictions) # accuracy_score(true_labels,predictions) # f1_score(true_labels,predictions) # pass def create_datasets(tokenizer:AutoTokenizer,dataframe:pd.DataFrame,train_split_ratio:float,val:bool=False)->tuple[SimpleHumorDataset,SimpleHumorDataset,SimpleHumorDataset]|tuple[SimpleHumorDataset,SimpleHumorDataset]: if train_split_ratio > 1.0: raise AssertionError("Trainsplit sollte kleiner(-gleich) 1.0 sein") train,test = train_test_split(dataframe,train_size=train_split_ratio,random_state=501) if val: test,validation = train_test_split(test,train_size=.5,random_state=501) return SimpleHumorDataset(tokenizer,train), SimpleHumorDataset(tokenizer,test), SimpleHumorDataset(tokenizer,validation) return SimpleHumorDataset(tokenizer,train), SimpleHumorDataset(tokenizer,test) def create_dataloaders(datasets:tuple|list,batchsize:int,shufflelist:list): train_loader = DataLoader(datasets[0],batchsize,shuffle=shufflelist[0]) test_loader = DataLoader(datasets[1],batchsize,shuffle=shufflelist[1]) if len(datasets) == 3: return train_loader, test_loader, DataLoader(datasets[2],batchsize,shuffle=shufflelist[2]) return train_loader, test_loader # if __name__ == "__main__": # # HYPERPARAMETERS # # Set Max Epoch Amount # EPOCH = 10 # # DROPOUT-PROBABILITY # DROPOUT = 0.1 # # BATCHSIZE # BATCH_SIZE = 16 # #LEARNING RATE # LEARNING_RATE = 1e-5 # # RANDOM SEED # RNDM_SEED = 501 # # FREEZE Bert Layers # FREEZE = True # torch.manual_seed(RNDM_SEED) # np.random.seed(RNDM_SEED) # torch.cuda.manual_seed_all(RNDM_SEED) # Initialize Bert Model with dropout probability and port to DEVICE # mybert = CustomBert(DROPOUT) # print("Bert Initialized") # mybert.to(DEVICE) # Read Raw Data from csv and save as DataFrame # df = pd.read_csv("./data/hack.csv",encoding="latin1") # print("Raw Data read") # Initialize BertTokenizer from Pretrained # tokenizer = AutoTokenizer.from_pretrained("google-bert/bert-base-uncased",do_lower_case=True) # print("Tokenizer Initialized") # Split DataFrame into Train and Test Sets # Create Custom Datasets for Train and Test # train_data,test_data,validation_data = create_datasets(tokenizer,df,.7,True) # print("Splitted Data in Train and Test Sets") # print("Custom Datasets created") # Initialize Dataloader with Train and Test Sets # train_loader, test_loader, validation_loader = create_dataloaders([train_data,test_data,validation_data],batchsize=BATCH_SIZE,shufflelist=[True,True,False]) # print("DataLoaders created") # Set criterion to Cross Entropy and define Adam Optimizer with model parameters and learning rate # criterion_cross_entropy = nn.CrossEntropyLoss() # optimizer_adamW = optim.Adam(mybert.parameters(), lr = LEARNING_RATE) # import time # Set Scheduler for dynamically Learning Rate adjustment loss_values, eval_values = np.zeros(EPOCH), np.zeros(EPOCH) # for epoch in range(EPOCH): # start = time.time() # print(f"For {epoch+1} the Scores are: ") # loss_values[epoch] = training_loop(mybert,optimizer=optimizer_adamW,criterion=criterion_cross_entropy,train_loader=train_loader,freeze_bert=FREEZE) # eval_values[epoch] = eval_loop(mybert,criterion=criterion_cross_entropy,validation_loader=test_loader) # end = time.time() # print((end-start),"seconds per epoch needed") # plot_metrics_loss_n_acc("x","x","x","x") # for epoch in range(EPOCH): # test_loop(mybert,validation_loader)