ANLP_WS24_CA2/dataset_generator.py

207 lines
7.4 KiB
Python

"""
This file contains the dataset generation and preprocessing.
"""
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from nltk.tokenize import word_tokenize
import gensim
import torch
import os
import copy
import regex as re
import HumorDataset
# def load_glove_embeddings(glove_file_path):
# embeddings_index = {}
# with open(glove_file_path, 'r', encoding='utf-8') as f:
# for line in f:
# try:
# values = line.split()
# #print(values)
# word = values[0]
# coefs = np.asarray(values[1:], dtype='float32')
# embeddings_index[word] = coefs
# except ValueError:
# print('Error with line:', line[:100])
# return embeddings_index
def load_glove_embeddings(glove_file_path, emb_len=100):
    """Load GloVe vectors from a text file into a ``{word: vector}`` dict.

    Every line is expected to hold a token followed by ``emb_len``
    whitespace-separated numbers. Lines that do not fit this layout are
    reported on stdout and skipped; they never abort the load.

    Args:
        glove_file_path: Path of the GloVe text file (read as UTF-8).
        emb_len: Number of coefficients a line must carry to be kept.

    Returns:
        dict mapping each accepted token to a float32 NumPy array of
        length ``emb_len``.
    """
    # Compiled once instead of once per line. Group 1 is the token (the
    # shortest prefix that lets the rest match), group 2 is the coefficient
    # text, which may only contain digits, whitespace, '.', '-' and 'e'.
    line_pattern = re.compile(r"(.+?)\s+([\d\s\.\-e]+)")

    embeddings_index = {}
    with open(glove_file_path, 'r', encoding='utf-8') as f:
        for line in f:
            # fullmatch (not a prefix match) so that trailing junk after the
            # numbers makes the line invalid instead of being ignored.
            match = line_pattern.fullmatch(line)
            if match is None:
                print('Error with line:', line[:100])
                continue

            try:
                # Strict conversion: a malformed number raises ValueError
                # rather than silently truncating the vector.
                coefs = np.array(match.group(2).split(), dtype='float32')
            except ValueError:
                print('Error with line:', line[:100])
                continue

            if len(coefs) != emb_len:
                print('Skip: Length mismatch with line:', line[:100])
                continue

            embeddings_index[match.group(1)] = coefs
    return embeddings_index
def create_embbedings_matrix(embeddings_glove, max_len=100):
    """Build the embedding matrix and vocabulary for a GloVe dictionary.

    The special tokens ``<UNK>`` (random vector) and ``<PAD>`` (zero vector)
    are added to ``embeddings_glove`` in place and always occupy the last two
    rows of the matrix, in that order.

    Args:
        embeddings_glove: dict mapping token -> vector. It is modified in
            place: the two special tokens are (re)inserted at the end.
        max_len: Length of the vectors created for ``<UNK>`` and ``<PAD>``.
            Despite the name this is the embedding dimension; it has to equal
            the length of the vectors already stored in ``embeddings_glove``.

    Returns:
        Tuple ``(embedding_matrix, word_index)``: a float32 ``torch`` tensor
        with one row per token, and a dict mapping token -> row index.

    Raises:
        ValueError: raised by NumPy on row assignment when a vector's length
            differs from the matrix width (e.g. ``max_len`` does not match).
    """
    # Remove stale special tokens first. Re-assigning an existing key keeps
    # its old position in the dict, which would leave the index map pointing
    # two tokens at one row and no token at another.
    embeddings_glove.pop('<UNK>', None)
    embeddings_glove.pop('<PAD>', None)
    embeddings_glove['<UNK>'] = np.random.rand(max_len)
    embeddings_glove['<PAD>'] = np.zeros(max_len)

    # Create a word index (vocabulary); insertion order defines the row, so
    # '<UNK>' is at len - 2 and '<PAD>' at len - 1.
    word_index = {word: idx for idx, word in enumerate(embeddings_glove)}
    print(len(word_index))

    # Matrix width is taken from the first stored vector.
    embedding_dim = len(next(iter(embeddings_glove.values())))
    embedding_matrix = np.zeros((len(word_index), embedding_dim))
    for word, idx in word_index.items():
        embedding_vector = embeddings_glove.get(word)
        if embedding_vector is not None:
            embedding_matrix[idx] = embedding_vector

    # Convert the embedding matrix to a tensor
    embedding_matrix = torch.tensor(embedding_matrix, dtype=torch.float32)
    return embedding_matrix, word_index
def create_embedding_matrix(gloVe_path='glove.6B/glove.6B.100d.txt', emb_len=100):
    """Load a GloVe file and turn it into an embedding matrix plus vocabulary.

    Args:
        gloVe_path: Path of the GloVe text file.
        emb_len: Embedding dimension of the vectors in that file.

    Returns:
        Tuple ``(embedding_matrix, word_index, vocab_size, d_model)`` where
        ``vocab_size`` and ``d_model`` are the two dimensions of
        ``embedding_matrix``.
    """
    embeddings_glove = load_glove_embeddings(gloVe_path, emb_len=emb_len)
    # Forward emb_len so the special-token vectors get the same dimension as
    # the loaded vectors; the callee's default of 100 only fits 100d files.
    embedding_matrix, word_index = create_embbedings_matrix(
        embeddings_glove, max_len=emb_len
    )
    vocab_size, d_model = embedding_matrix.size()
    print(f"vocab_size: {vocab_size}, d_model: {d_model}")
    return embedding_matrix, word_index, vocab_size, d_model
def load_preprocess_data(path_data='data/hack.csv'):
    """Read the humor CSV and derive a binary label from ``humor_rating``.

    Rows without a ``humor_rating`` are discarded. A row is labelled ``True``
    when its rating is strictly above the median of the remaining ratings.

    Args:
        path_data: Path of the CSV file; it must provide the columns
            ``text`` and ``humor_rating``.

    Returns:
        Tuple ``(X, y)``: the ``text`` column and the boolean ``y`` column.
    """
    frame = pd.read_csv(path_data).dropna(subset=['humor_rating'])
    # The median of the ratings is the decision threshold.
    threshold = frame['humor_rating'].median()
    frame = frame.assign(y=frame['humor_rating'] > threshold)
    return frame['text'], frame['y']
def encode_tokens(tokens, embedding_index, default_vector_len=100):
    """Replace every token by its embedding vector.

    Args:
        tokens: Iterable of tokens to look up.
        embedding_index: dict mapping token -> vector.
        default_vector_len: Length of the zero vector used for tokens that
            are missing from ``embedding_index``.

    Returns:
        list with one vector per token, in input order.
    """
    encoded = []
    for token in tokens:
        vector = embedding_index.get(token)
        if vector is None:
            # `np.random.zeros` does not exist; unknown tokens get a fresh
            # zero vector, built only when it is actually needed.
            vector = np.zeros(default_vector_len)
        encoded.append(vector)
    return encoded
def pad_sequences(sequences, max_len, pad_index):
    """Pad or truncate every sequence to exactly ``max_len`` entries.

    Args:
        sequences: Iterable of sequences (e.g. lists of indices or tokens).
        max_len: Target length of every row.
        pad_index: Value appended to rows shorter than ``max_len``.

    Returns:
        NumPy array with one row of length ``max_len`` per input sequence.
    """
    padded_rows = []
    for seq in sequences:
        row = list(seq[:max_len])
        # Pad with plain Python values instead of np.pad: np.pad keeps the
        # dtype of the row, so a string pad value such as '<PAD>' would be
        # cut to the width of the row's longest token, and an empty row
        # would be treated as float data.
        row.extend([pad_index] * (max_len - len(row)))
        padded_rows.append(row)
    return np.array(padded_rows)
def split_data(X, y, test_size=0.1, val_size=0.1):
    """Split features and labels into train, test and validation parts.

    The data is first divided into a training part and a hold-out part of
    size ``test_size + val_size``; the hold-out part is then divided into
    validation and test. Both splits use ``random_state=42``.

    Args:
        X: Features.
        y: Labels, aligned with ``X``.
        test_size: Fraction of the data used for testing.
        val_size: Fraction of the data used for validation.

    Returns:
        dict with the keys ``'train'``, ``'test'`` and ``'val'``, each
        mapping to ``{'X': ..., 'y': ...}``.
    """
    holdout_size = test_size + val_size
    X_train, X_holdout, y_train, y_holdout = train_test_split(
        X, y, test_size=holdout_size, random_state=42
    )

    # Share of the hold-out part that goes to validation; the rest is test.
    val_split_ratio = val_size / holdout_size
    X_val, X_test, y_val, y_test = train_test_split(
        X_holdout, y_holdout, test_size=1 - val_split_ratio, random_state=42
    )

    splits = {
        'train': {'X': X_train, 'y': y_train},
        'test': {'X': X_test, 'y': y_test},
        'val': {'X': X_val, 'y': y_val},
    }
    # Report the size of every split.
    for name, part in splits.items():
        print(name, len(part['X']), len(part['y']))
    return splits
def save_data(data_dict, path, prefix, vocab_size=0, emb_dim=None):
    """Wrap every split in a dataset object and save it with ``torch.save``.

    Args:
        data_dict: dict mapping split name -> ``{'X': ..., 'y': ...}``.
        path: Target directory; created when it does not exist.
        prefix: String put in front of the split name in the file name.
        vocab_size: Passed through to the dataset constructor.
        emb_dim: Passed through to the dataset constructor.

    Each split is written to ``<path>/<prefix><split name>.pt``.
    """
    if not os.path.exists(path):
        print('Creating directory:', path)
        os.makedirs(path, exist_ok=True)
    print('saving data into:', path)

    # `import HumorDataset` binds the module, and a module is not callable.
    # NOTE(review): assumes the dataset class inside that module is also
    # named `HumorDataset` - confirm. If the name already refers to a class
    # (e.g. after `from HumorDataset import HumorDataset`), it is used as is.
    dataset_cls = getattr(HumorDataset, 'HumorDataset', HumorDataset)

    for key, value in data_dict.items():
        # transform to Dataset
        dataset = dataset_cls(value['X'], value['y'], vocab_size, emb_dim)
        # os.path.join keeps the file inside `path` even when `path` has no
        # trailing separator.
        torch.save(dataset, os.path.join(path, prefix + key + '.pt'))
if __name__ == "__main__":
    # Script entry point: read the humor CSV, binarise the rating, tokenize
    # and pad the texts, split them, and load the GloVe vectors.
    # Load the data from csv
    df = pd.read_csv('data/hack.csv')
    print(df.shape)
    # Rows without a rating cannot be labelled, so they are dropped.
    df = df.dropna(subset=['humor_rating'])
    # find median of humor_rating
    median_rating = df['humor_rating'].median()
    #print('median and therefore middle of humor_rating:', median_rating)
    # Label is True when the rating is strictly above the median.
    df['y'] = df['humor_rating'] > median_rating
    # transform data into dataset
    X = df['text']
    y = df['y']
    # Tokenize the data with nltk (texts are lower-cased first)
    tokens = [word_tokenize(text.lower()) for text in X]
    # Number of distinct tokens in the corpus.
    vocab_size = len(set([word for sentence in tokens for word in sentence]))
    print('vocab size:', vocab_size)
    # Pad the sequences
    # NOTE: Info comes from data explore notebook: 280 is max length,
    # 139 contains 80% and 192 contains 95% of the data
    max_len = 280
    # NOTE(review): despite the name, `padded_indices` holds token strings
    # padded with the literal '<PAD>', not integer indices.
    padded_indices = pad_sequences(tokens, max_len=max_len, pad_index='<PAD>')
    # split data into train, test, and validation
    data_dict = split_data(padded_indices, y)
    # data_idx_based = copy.deepcopy(data_dict)
    # vector_based = False
    # for key in data_idx_based.keys():
    # data_idx_based[key]['X'] = [encode_tokens(tokens, vector_based) for tokens in data_dict[key]['X']]
    # # print shape of data
    # #print(key, len(data_dict[key]['X']), len(data_dict[key]['y']))
    # # save the data
    # save_data(data_idx_based, 'data/idx_based_padded/', '', vocab_size)
    print('loading GloVe embeddings')
    # Load GloVe embeddings
    glove_file_path = 'glove.6B/glove.6B.100d.txt'
    #glove_file_path = 'glove.840B.300d/glove.840B.300d.txt'
    embeddings_index = load_glove_embeddings(glove_file_path)
    emb_len = 100
    print('starting with embedding the data')
    # Encode the tokens
    #for key in data_dict.keys():
    #data_dict[key]['X'] = [get_embedding_glove_vector(tokens, embeddings_index, default_vector_len=emb_len) for tokens in data_dict[key]['X']]
    # print shape of data
    #print(key, len(data_dict[key]['X']), len(data_dict[key]['y']))
    # Save the data
    #save_data(data_dict, 'data/embedded_padded/', '', vocab_size, emb_dim=model_embedding.vector_size)
    # NOTE(review): from here on `max_len` is rebound and used as the
    # embedding dimension (it is passed as `emb_len`), no longer as the
    # sequence length of 280 set earlier in this block.
    max_len = 100
    gloVe_path = 'glove.6B/glove.6B.100d.txt'
    # NOTE(review): this parses the same GloVe file a second time; the first
    # result is still held in `embeddings_index`. `data_dict`,
    # `embeddings_index` and `emb_len` are not used in the part of the
    # script visible here - confirm whether later code relies on them.
    embeddings_glove = load_glove_embeddings(gloVe_path, emb_len=max_len)
    # Special tokens: random vector for unknown words, zero vector for padding.
    embeddings_glove['<UNK>'] = np.random.rand(max_len)
    embeddings_glove['<PAD>'] = np.zeros(max_len)