ANLP_WS24_CA2/dataset_helper.py

138 lines
5.4 KiB
Python

"""
Helpers for loading GloVe embeddings and for loading, splitting, and subsampling the dataset.
"""
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
import torch
import regex as re
def load_glove_embeddings(glove_file_path, emb_len=100):
    """Read a GloVe text file into a ``{word: vector}`` dictionary.

    Every line is matched against a ``<word> <numbers>`` pattern and the
    numeric part is parsed into a ``float32`` array.  Lines that do not
    match, that raise ``ValueError`` while being parsed, or whose vector
    length differs from ``emb_len`` are reported on stdout and left out.

    Args:
        glove_file_path: Path of the UTF-8 encoded embedding file.
        emb_len: Number of coefficients a line must provide to be kept.

    Returns:
        dict mapping each accepted word to its ``float32`` numpy vector.
    """
    print('Loading GloVe embeddings...')
    # Lazily matched word, whitespace, then the run of digits, whitespace,
    # '.', '-' and 'e' characters that makes up the coefficients.
    line_pattern = r"(.+?)\s+([\d\s\.\-e]+)"
    vectors = {}
    with open(glove_file_path, 'r', encoding='utf-8') as handle:
        for raw_line in handle:
            try:
                parsed = re.match(line_pattern, raw_line)
                if not parsed:
                    print('Error with line:', raw_line[:100])
                    continue
                token = parsed.group(1)
                values = np.fromstring(parsed.group(2), sep=' ', dtype='float32')
                # Only keep vectors of exactly the requested dimensionality.
                if len(values) == emb_len:
                    vectors[token] = values
                else:
                    print('Skip: Length mismatch with line:', raw_line[:100])
            except ValueError:
                print('Error with line:', raw_line[:100])
    return vectors
def create_embbedings_matrix(embeddings_glove, max_len=100):
    """Build the embedding tensor and the word-to-row index.

    Two special tokens are appended after all regular words: ``'<UNK>'``
    (random vector) and ``'<PAD>'`` (zero vector).  They always occupy the
    last two rows, and every word gets a unique row even when the special
    tokens were already present in ``embeddings_glove``.

    Side effects: ``embeddings_glove`` is modified in place (the two special
    tokens are added or overwritten) and the vocabulary size is printed.

    Args:
        embeddings_glove: dict mapping words to equally sized numeric vectors.
        max_len: Dimension used for the special-token vectors only when the
            dict holds no regular words; otherwise the dimension of the
            stored vectors is used so both always agree.

    Returns:
        tuple ``(embedding_matrix, word_index)`` where ``embedding_matrix``
        is a ``torch.float32`` tensor with one row per word and
        ``word_index`` maps each word to its row number.

    Raises:
        ValueError: If the stored vectors do not all have the same length.
    """
    special_tokens = ('<UNK>', '<PAD>')
    regular_words = [word for word in embeddings_glove if word not in special_tokens]

    # Take the dimension from the real vectors so the special tokens can never
    # disagree with them; max_len is only the fallback for an empty vocabulary.
    if regular_words:
        embedding_dim = len(embeddings_glove[regular_words[0]])
    else:
        embedding_dim = max_len

    embeddings_glove['<UNK>'] = np.random.rand(embedding_dim)
    embeddings_glove['<PAD>'] = np.zeros(embedding_dim)

    # Regular words first, special tokens on the last two rows; building the
    # index from the filtered list keeps the row numbers collision-free.
    word_index = {word: idx for idx, word in enumerate(regular_words)}
    word_index['<UNK>'] = len(regular_words)
    word_index['<PAD>'] = len(regular_words) + 1
    print(len(word_index))

    embedding_matrix = np.zeros((len(word_index), embedding_dim))
    for word, idx in word_index.items():
        embedding_matrix[idx] = embeddings_glove[word]

    # Convert the embedding matrix to a tensor
    embedding_matrix = torch.tensor(embedding_matrix, dtype=torch.float32)
    return embedding_matrix, word_index
def get_embedding_matrix(gloVe_path='glove.6B/glove.6B.100d.txt', emb_len=100):
    """Load GloVe vectors and turn them into an embedding tensor.

    Args:
        gloVe_path: Path of the GloVe text file to read.
        emb_len: Expected vector dimension; lines with another length are
            skipped by the loader.

    Returns:
        tuple ``(embedding_matrix, word_index, vocab_size, d_model)`` where
        ``vocab_size`` and ``d_model`` are the two dimensions of
        ``embedding_matrix``.
    """
    embeddings_glove = load_glove_embeddings(gloVe_path, emb_len=emb_len)
    # Forward emb_len so the special-token vectors match the loaded dimension
    # instead of silently falling back to the 100-d default.
    embedding_matrix, word_index = create_embbedings_matrix(embeddings_glove, max_len=emb_len)
    vocab_size, d_model = embedding_matrix.size()
    print(f"vocab_size: {vocab_size}, d_model: {d_model}")
    return embedding_matrix, word_index, vocab_size, d_model
def load_preprocess_data(path_data='data/hack.csv', verbose=False):
    """Load the CSV file and return the input texts with their humor ratings.

    Rows whose ``humor_rating`` is missing are dropped.  The remaining
    ratings are cast to ``float`` and returned as a series named ``'y'``.

    Args:
        path_data: Path of the CSV file; it must provide the columns
            ``'text'`` and ``'humor_rating'``.
        verbose: When true, print the first targets, the target dtype and
            the number of examples.

    Returns:
        tuple ``(X, y)`` of pandas Series: the texts and the float targets.
    """
    # Rows without a target value are unusable, so drop them right away.
    frame = pd.read_csv(path_data).dropna(subset=['humor_rating'])

    X = frame['text']
    # Make sure the target is numeric and carries the name 'y'.
    y = frame['humor_rating'].astype(float).rename('y')

    if verbose:
        print(f"Erste Zielwerte: {y.head(10)}")
        print(f"Datentyp der Zielvariable: {y.dtype}")
        print(f"Anzahl der Beispiele: {len(X)}")
    return X, y
def split_data(X, y, test_size=0.1, val_size=0.1):
    """Split the data into train, validation and test partitions.

    A first split separates the training data from a hold-out part of size
    ``test_size + val_size``; a second split divides that hold-out part into
    validation and test data.  Both splits use ``random_state=42``.  The
    size of every partition is printed.

    Args:
        X: Input samples.
        y: Targets aligned with ``X``.
        test_size: Fraction of all samples reserved for testing.
        val_size: Fraction of all samples reserved for validation.

    Returns:
        dict with the keys ``'train'``, ``'test'`` and ``'val'``; each value
        is a dict holding the partition under ``'X'`` and ``'y'``.
    """
    holdout_fraction = test_size + val_size
    X_train, X_rest, y_train, y_rest = train_test_split(
        X, y, test_size=holdout_fraction, random_state=42
    )

    # Share of the hold-out part that belongs to validation; the remainder
    # of it becomes the test partition.
    val_share = val_size / holdout_fraction
    X_val, X_test, y_val, y_test = train_test_split(
        X_rest, y_rest, test_size=1 - val_share, random_state=42
    )

    splits = {
        'train': {'X': X_train, 'y': y_train},
        'test': {'X': X_test, 'y': y_test},
        'val': {'X': X_val, 'y': y_val},
    }
    # Report the size of each partition.
    for name, part in splits.items():
        print(name, len(part['X']), len(part['y']))
    return splits
def ensemble_data_idx(labels, n_models, cur_models_idx, methods='bootstrap'):
    """Select the sample indices one ensemble member is trained on.

    Supported ``methods``:

    * ``'bootstrap'``: the data is cut into ``n_models`` contiguous blocks of
      ``len(labels) // n_models`` samples and the block numbered
      ``cur_models_idx`` is left out (no resampling is performed).
    * ``'shuffle'``: a random permutation of all indices.
    * ``'random'``: all indices drawn without replacement, i.e. also a
      random permutation.
    * ``'flatten_normal_dist'``: ``len(labels) // n_models`` randomly chosen
      samples whose label lies within one standard deviation of the mean are
      removed; the positions of all remaining samples are returned.

    Args:
        labels: Sequence of labels; must be numeric for
            ``'flatten_normal_dist'``.
        n_models: Number of models in the ensemble.
        cur_models_idx: Zero-based position of the current model.
        methods: Name of the selection strategy.

    Returns:
        A list of indices for ``'bootstrap'``, a numpy integer array for the
        other strategies.

    Raises:
        ValueError: If ``methods`` is unknown, or (raised by numpy) if fewer
            samples lie near the mean than should be removed.
    """
    n_samples = len(labels)

    if methods == 'bootstrap':
        subset_size = n_samples // n_models
        start_idx = cur_models_idx * subset_size
        end_idx = start_idx + subset_size
        # Everything except the block [start_idx, end_idx).
        return list(range(0, start_idx)) + list(range(end_idx, n_samples))

    if methods == 'shuffle':
        return np.random.permutation(n_samples)

    if methods == 'random':
        return np.random.choice(n_samples, n_samples, replace=False)

    if methods == 'flatten_normal_dist':
        values = np.asarray(labels)
        subset_size = n_samples // n_models
        std_range = 1
        mean = np.mean(values)
        std = np.std(values)
        # Positions of the samples lying within std_range standard deviations
        # of the mean; these are the candidates for removal.
        near_mean = np.where(
            (values >= mean - std_range * std) & (values <= mean + std_range * std)
        )[0]
        del_subset_indices = np.random.choice(near_mean, size=subset_size, replace=False)
        # Remove by position. Matching on label values would re-include every
        # removed sample whose value also occurs among the kept samples.
        return np.delete(np.arange(n_samples), del_subset_indices)

    raise ValueError(f"Unknown method: {methods}")