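# DSA_SS24/skripts/generate_data.py
#
# Reads WFDB ECG records, groups them into rhythm categories by their
# diagnosis codes and writes one pickle file per category to ./data.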

import wfdb
import os
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import numpy as np
import pickle
# Directories and file paths
# --------------------------------------------------------------------------------
# Root directory of the local copy of the data set. This path is machine
# specific; change it here and every other path below follows automatically.
# Location used on another development machine:
#project_dir = "C:/Users/Nils/Documents/0000MASTER/IM1/DSA/a-large-scale-12-lead-electrocardiogram-database-for-arrhythmia-study-1.0.0/a-large-scale-12-lead-electrocardiogram-database-for-arrhythmia-study-1.0.0"
project_dir = 'C:/Users/felix/OneDrive/Studium/Master MDS/1 Semester/DSA/physionet/large_12_ecg_data/a-large-scale-12-lead-electrocardiogram-database-for-arrhythmia-study-1.0.0'
# Directory where the WFDB records are stored
data_dir = project_dir + '/WFDBRecords'
# CSV lookup table for the diagnosis codes; it lives in the data set root, so
# it is derived from project_dir instead of repeating the absolute path.
path_diag_lookup = project_dir + '/ConditionNames_SNOMED-CT.csv'
# --------------------------------------------------------------------------------
def get_diagnosis_ids(record):
    """Return the diagnosis codes stored in a record's header comments.

    The codes are taken from the comment that starts with ``Dx:`` and holds a
    comma separated list of integers. The comment is located by its prefix
    rather than by a fixed position, so additional or reordered header
    comments do not break the lookup.

    Args:
        record: Object with a ``comments`` attribute holding a list of
            strings (in this script: the result of ``wfdb.rdrecord``).

    Returns:
        List of diagnosis codes as ``int``. Empty entries (for example after
        a trailing comma) are ignored.

    Raises:
        ValueError: If no ``Dx:`` comment exists or an entry is not an integer.
    """
    prefix = 'Dx:'
    for comment in record.comments:
        text = comment.strip()
        if text.startswith(prefix):
            raw_codes = text[len(prefix):]
            break
    else:
        raise ValueError("record header contains no 'Dx:' comment")
    # int() tolerates surrounding whitespace; blank entries are skipped
    return [int(code) for code in raw_codes.split(',') if code.strip()]
def get_diagnosis_name(diagnosis):
    """Translate diagnosis codes into their names using the lookup table.

    Args:
        diagnosis: Iterable of diagnosis codes that are compared against the
            'Snomed_CT' column of the module-level ``diagnosis_lookup`` table.

    Returns:
        List with one string per code: the 'Full Name' column of the matching
        rows, rendered with ``to_string(index=False)``.
    """
    names = []
    for code in diagnosis:
        # Rows of the lookup table that belong to this code
        matching_rows = diagnosis_lookup[diagnosis_lookup['Snomed_CT'] == code]
        names.append(matching_rows['Full Name'].to_string(index=False))
    return names
def filter_signal_df_on_diag(df_dict, diagnosis_dict, filter_codes_df):
    """Keep only the rows whose record carries nothing but allowed diagnoses.

    Args:
        df_dict: Mapping of a name to a DataFrame whose index holds record ids.
        diagnosis_dict: Mapping of a record id to an iterable of diagnosis
            codes of that record.
        filter_codes_df: DataFrame whose 'Snomed_CT' column lists the allowed
            diagnosis codes.

    Returns:
        Dict with the same keys as ``df_dict``. Every DataFrame is reduced to
        the rows whose index value is a record id for which all diagnosis
        codes are allowed. A record with no codes at all passes the filter.
    """
    # Build the allowed codes once as a set so every membership test is O(1).
    # 0 is accepted as well because it is used as a padding value.
    allowed_codes = set(filter_codes_df['Snomed_CT'])
    allowed_codes.add(0)
    # Record ids whose diagnosis codes are all allowed
    kept_ids = [
        record_id
        for record_id, codes in diagnosis_dict.items()
        if all(code in allowed_codes for code in codes)
    ]
    # Reduce every DataFrame to the rows of the kept records
    return {name: df.loc[df.index.isin(kept_ids)] for name, df in df_dict.items()}
# --------------------------------------------------------------------------------
# Explore the data
# --------------------------------------------------------------------------------
# Read the diagnosis lookup table (the CSV file at path_diag_lookup) into a
# DataFrame. get_diagnosis_name() reads its 'Snomed_CT' and 'Full Name'
# columns, so it has to be loaded before that function is called.
diagnosis_lookup = pd.read_csv(path_diag_lookup)
#print(diagnosis_lookup.head())
# Filter data based on the diagnosis
# ----------------------------------------------
# Categories and the diagnoses that are merged into each of them:
#   SB   - sinus bradycardia
#   AFIB - atrial fibrillation and atrial flutter (AFL)
#   GSVT - supraventricular tachycardia, atrial tachycardia, AV node reentrant
#          tachycardia, AV reentrant tachycardia, atrial pacemaker
#   SR   - sinus rhythm and sinus irregularity
# (atrial pacemaker = 713422000)
categories = {
    'SB': [426177001],
    'AFIB': [164889003, 164890007],
    # NOTE(review): 713422000 used to be listed twice in this list. The
    # duplicate had no effect on the membership test and was removed;
    # confirm whether a different code was intended in its place.
    'GSVT': [426761007, 713422000, 233896004, 233897008],
    'SR': [426783006, 427393009],
}
# Records collected per category (one initially empty list per category)
diag_dict = {category_name: [] for category_name in categories}
# Number of records read so far
counter = 0
# Stop reading after this many records (use 100_000 to cover the whole data set)
max_counter = 100
def _iter_record_paths(root):
    """Yield the path (without the '.hea' suffix) of every header file that
    lies two directory levels below *root*.

    Entries that are not directories are skipped so that stray files next to
    the record folders do not abort the scan.
    """
    for dir_th in os.listdir(root):
        path_to_1000_records = root + '/' + dir_th
        if not os.path.isdir(path_to_1000_records):
            continue
        for dir_hd in os.listdir(path_to_1000_records):
            path_to_100_records = path_to_1000_records + '/' + dir_hd
            if not os.path.isdir(path_to_100_records):
                continue
            for file_name in os.listdir(path_to_100_records):
                # Only header files identify a record
                if file_name.endswith('.hea'):
                    yield path_to_100_records + '/' + file_name[:-len('.hea')]


# Loop through the records and sort them into the categories.
# counter_bool is initialised up front so that it is always defined, even if
# no record could be read at all.
counter_bool = False
for record_path in _iter_record_paths(data_dir):
    record_name = os.path.basename(record_path)
    try:
        # Read the record and extract its diagnosis codes
        record = wfdb.rdrecord(record_path)
        diagnosis = get_diagnosis_ids(record)
    except Exception as e:
        # Best effort: report the real cause and continue with the next record
        print(f"Failed to read record {record_name}: {type(e).__name__}: {e}")
        continue
    # Add the record to the first category that shares at least one code
    # with the record's diagnosis
    for category_name, category_codes in categories.items():
        if any(i in category_codes for i in diagnosis):
            diag_dict[category_name].append(record)
            break
    # Every successfully read record counts, whether or not it matched a category
    counter += 1
    if counter % 100 == 0:
        print(f"Read {counter} records")
    # Stop once max_counter records have been read
    counter_bool = counter >= max_counter
    if counter_bool:
        break
"""
Record counts per category for the two matching rules that were tried.

Rule 1: if any(i in category_codes for i in diagnosis): ... break
A record is counted for a category as soon as at least one of its diagnosis
codes belongs to that category.
ID: SB, Count: 16559
ID: AFIB, Count: 9839
ID: GSVT, Count: 948
ID: SR, Count: 9720
---------------------------------------------------------------------------------------------------------------------
Rule 2: if set(diagnosis).issubset(set(category_codes)): ... break
A record is counted for a category only if all of its diagnosis codes belong
to that category.
ID: SB, Count: 8909
ID: AFIB, Count: 1905
ID: GSVT, Count: 431
ID: SR, Count: 7299
"""
# Write the collected records to disk, one pickle file per category
output_dir = './data'
# Create the output directory once; exist_ok makes this a no-op if it is
# already there and avoids the separate exists-then-create check.
os.makedirs(output_dir, exist_ok=True)
for cat_name, records in diag_dict.items():
    print(f"Writing {cat_name} to pickle with {len(records)} records")
    with open(f'{output_dir}/{cat_name}.pkl', 'wb') as f:
        pickle.dump(records, f)