""" This file contains the dataset generation and preprocessing. """ import pandas as pd import numpy as np from sklearn.model_selection import train_test_split import torch import regex as re def load_glove_embeddings(glove_file_path, emb_len=100): print('Loading GloVe embeddings...') embeddings_index = {} with open(glove_file_path, 'r', encoding='utf-8') as f: for line in f: try: # Use regex to split the line into word and coefficients match = re.match(r"(.+?)\s+([\d\s\.\-e]+)", line) # regex explanation: Match word followed by one or more spaces and then the coefficients if match: word = match.group(1) coefs = np.fromstring(match.group(2), sep=' ', dtype='float32') #check list length if len(coefs) != emb_len: print('Skip: Length mismatch with line:', line[:100]) else: embeddings_index[word] = coefs else: print('Error with line:', line[:100]) except ValueError: print('Error with line:', line[:100]) return embeddings_index def create_embbedings_matrix(embeddings_glove, max_len=100): embeddings_glove[''] = np.random.rand(max_len) embeddings_glove[''] = np.zeros(max_len) # Create a word index (vocabulary) word_index = {word: idx for idx, word in enumerate(embeddings_glove.keys())} # Special tokens are in the word index word_index[''] = len(word_index) - 2 word_index[''] = len(word_index) - 1 # print len of word_index print(len(word_index)) # Create an embedding matrix embedding_dim = len(next(iter(embeddings_glove.values()))) embedding_matrix = np.zeros((len(word_index), embedding_dim)) for word, idx in word_index.items(): embedding_vector = embeddings_glove.get(word) if embedding_vector is not None: embedding_matrix[idx] = embedding_vector # Convert the embedding matrix to a tensor embedding_matrix = torch.tensor(embedding_matrix, dtype=torch.float32) return embedding_matrix, word_index def get_embedding_matrix(gloVe_path='glove.6B/glove.6B.100d.txt', emb_len=100): embeddings_glove = load_glove_embeddings(gloVe_path, emb_len=emb_len) embedding_matrix, word_index = create_embbedings_matrix(embeddings_glove) vocab_size = len(embedding_matrix) d_model = len(embedding_matrix[0]) vocab_size, d_model = embedding_matrix.size() print(f"vocab_size: {vocab_size}, d_model: {d_model}") return embedding_matrix, word_index, vocab_size, d_model def load_preprocess_data(path_data='data/hack.csv', verbose=False): # Daten laden df = pd.read_csv(path_data) # Fehlende Werte in der Zielspalte entfernen df = df.dropna(subset=['humor_rating']) # Zielvariable aus der Spalte 'humor_rating' extrahieren df['y'] = df['humor_rating'].astype(float) # Sicherstellen, dass Zielvariable numerisch ist # Eingabetexte und Zielvariable zuweisen X = df['text'] y = df['y'] if verbose: print(f"Erste Zielwerte: {y.head(10)}") print(f"Datentyp der Zielvariable: {y.dtype}") print(f"Anzahl der Beispiele: {len(X)}") return X, y def split_data(X, y, test_size=0.1, val_size=0.1): X_train, X_temp, y_train, y_temp = train_test_split(X, y, test_size=test_size + val_size, random_state=42) val_split_ratio = val_size / (test_size + val_size) X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=1 - val_split_ratio, random_state=42) ret_dict = { 'train': {'X': X_train, 'y': y_train}, 'test': {'X': X_test, 'y': y_test}, 'val': {'X': X_val, 'y': y_val} } # for each print len for key in ret_dict.keys(): print(key, len(ret_dict[key]['X']), len(ret_dict[key]['y'])) return ret_dict def ensemble_data_idx(labels, n_models, cur_models_idx, methods='bootstrap'): if methods == 'bootstrap': # Calculate the size of the subset 
        subset_size = len(labels) // n_models
        start_idx = cur_models_idx * subset_size
        end_idx = start_idx + subset_size
        subset_indices = list(range(0, start_idx)) + list(range(end_idx, len(labels)))
        return subset_indices
    elif methods == 'shuffle':
        subset_indices = np.random.permutation(len(labels))
        return subset_indices
    elif methods == 'random':
        # Draw all indices without replacement (equivalent to a permutation)
        subset_indices = np.random.choice(len(labels), len(labels), replace=False)
        return subset_indices
    elif methods == 'flatten_normal_dist':
        # TODO: test this and plot if it works
        subset_size = len(labels) // n_models
        std_range = 1
        mean = np.mean(labels)
        std = np.std(labels)
        # Randomly select samples whose label lies within std_range standard
        # deviations of the mean and drop them, flattening the label distribution
        del_subset_indices = np.random.choice(
            np.where((labels >= mean - std_range * std) & (labels <= mean + std_range * std))[0],
            size=subset_size, replace=False)
        # Keep every index that was not dropped (the earlier np.isin-based
        # lookup could mismatch duplicate label values)
        subset_indices = np.setdiff1d(np.arange(len(labels)), del_subset_indices)
        return subset_indices
    else:
        raise ValueError(f"Unknown method: {methods}")
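

# Minimal usage sketch of the full pipeline (assumes the default GloVe and
# CSV paths above exist locally; adjust them to your setup).
if __name__ == '__main__':
    embedding_matrix, word_index, vocab_size, d_model = get_embedding_matrix()
    X, y = load_preprocess_data(verbose=True)
    splits = split_data(X, y)
    # Example: subset indices for the first of three ensemble members
    idx = ensemble_data_idx(np.asarray(splits['train']['y']), n_models=3,
                            cur_models_idx=0, methods='bootstrap')
    print(f"Ensemble subset size: {len(idx)}")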