"""Count ECG records per diagnosis category in a WFDB record tree.

The script walks the two-level directory tree below ``data_dir``, reads every
WFDB record it finds, extracts the SNOMED-CT codes from the record's ``Dx``
header comment and counts how many records fall completely into one of the
hand-defined diagnosis categories below.
"""
import os

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import wfdb

# Directories and file paths
# --------------------------------------------------------------------------------
# Specify the directory where the WFDB records are stored
project_dir = 'C:/Users/felix/OneDrive/Studium/Master MDS/1 Semester/DSA/physionet/large_12_ecg_data/a-large-scale-12-lead-electrocardiogram-database-for-arrhythmia-study-1.0.0'
data_dir = project_dir + '/WFDBRecords'
# Derived from project_dir so the location only has to be changed in one place.
path_diag_lookup = project_dir + '/ConditionNames_SNOMED-CT.csv'
# --------------------------------------------------------------------------------

# Warn early if the diagnosis lookup table is missing (reading it below will
# still fail, but this message names the offending path explicitly).
if not os.path.exists(path_diag_lookup):
    print(f"Lookup file {path_diag_lookup} does not exist")


def get_diagnosis_ids(record):
    """Return the diagnosis codes stored in a record's ``Dx`` header comment.

    Args:
        record: Object with a ``comments`` list of strings (e.g. the result of
            ``wfdb.rdrecord``).

    Returns:
        list[int]: The comma-separated codes of the ``Dx`` line as integers.

    Raises:
        IndexError: If no comment starts with ``Dx:`` and there are fewer
            than three comments.
        ValueError: If an entry of the diagnosis line is not an integer.
    """
    for comment in record.comments:
        stripped = comment.strip()
        if stripped.startswith('Dx:'):
            codes = stripped[len('Dx:'):]
            break
    else:
        # No explicitly labelled line: fall back to the historical assumption
        # that the diagnosis is the third header comment.
        codes = record.comments[2].replace('Dx: ', '')
    return [int(code.strip()) for code in codes.split(',')]


def get_diagnosis_name(diagnosis):
    """Translate diagnosis codes into their full names.

    Uses the module-level ``diagnosis_lookup`` table (columns ``Snomed_CT``
    and ``Full Name``).

    Args:
        diagnosis: Iterable of codes to look up.

    Returns:
        list[str]: One string per code, produced by ``Series.to_string`` on
        the matching ``Full Name`` entries (so a code without a match yields
        pandas' text rendering of an empty Series rather than an error).
    """
    names = []
    for code in diagnosis:
        matches = diagnosis_lookup[diagnosis_lookup['Snomed_CT'] == code]
        names.append(matches['Full Name'].to_string(index=False))
    return names


def filter_signal_df_on_diag(df_dict, diagnosis_dict, filter_codes_df):
    """Keep only the rows whose diagnoses all belong to the allowed codes.

    Args:
        df_dict: Mapping of arbitrary keys to DataFrames; each DataFrame's
            index is matched against the keys of ``diagnosis_dict``.
        diagnosis_dict: Mapping of index value -> iterable of diagnosis codes.
        filter_codes_df: DataFrame with a ``Snomed_CT`` column listing the
            allowed codes.

    Returns:
        dict: Same keys as ``df_dict``; every DataFrame is restricted to the
        rows whose index is a key of ``diagnosis_dict`` with only allowed
        codes.
    """
    # 0 is accepted as well because it is used as a padding value.
    allowed_codes = set(filter_codes_df['Snomed_CT']) | {0}
    kept_ids = [
        key for key, codes in diagnosis_dict.items()
        if all(code in allowed_codes for code in codes)
    ]
    return {
        name: df.loc[df.index.isin(kept_ids)]
        for name, df in df_dict.items()
    }


def count_records_per_category(records_dir, category_codes, max_records):
    """Count the records whose diagnoses lie entirely within one category.

    Walks ``records_dir/<dir>/<dir>/*.hea``. A record is assigned to the
    first category (in dict order) whose code set contains all of its
    diagnosis codes; records that fit no category are read but not assigned.

    Args:
        records_dir: Root directory of the WFDB record tree.
        category_codes: Mapping of category name -> list of codes.
        max_records: Stop after this many records were read successfully.

    Returns:
        tuple[dict, int]: Per-category counts and the number of records read.
    """
    # Build the code sets once instead of once per record and category.
    code_sets = {name: set(codes) for name, codes in category_codes.items()}
    counts = {name: 0 for name in category_codes}
    n_read = 0

    for dir_th in os.listdir(records_dir):
        path_to_1000_records = records_dir + '/' + dir_th
        # Skip stray files (index files etc.) instead of crashing in listdir.
        if not os.path.isdir(path_to_1000_records):
            continue
        for dir_hd in os.listdir(path_to_1000_records):
            path_to_100_records = path_to_1000_records + '/' + dir_hd
            if not os.path.isdir(path_to_100_records):
                continue
            for file_name in os.listdir(path_to_100_records):
                # Only header files identify a record; the record name is the
                # file name without the .hea extension.
                record_name, extension = os.path.splitext(file_name)
                if extension != '.hea':
                    continue
                try:
                    # NOTE(review): only the header comments are used here;
                    # wfdb.rdheader might be sufficient and faster - confirm.
                    record = wfdb.rdrecord(path_to_100_records + '/' + record_name)
                    record_codes = set(get_diagnosis_ids(record))
                except Exception as e:
                    # Best-effort scan: report the real cause and move on.
                    # Unreadable records are not counted.
                    print(f"Failed to read record {record_name}: "
                          f"{type(e).__name__}: {e}")
                    continue

                for name, codes in code_sets.items():
                    if record_codes.issubset(codes):
                        counts[name] += 1
                        break

                n_read += 1
                if n_read % 100 == 0:
                    print(f"Read {n_read} records")
                if n_read >= max_records:
                    return counts, n_read
    return counts, n_read


# --------------------------------------------------------------------------------
# Explore the data
# --------------------------------------------------------------------------------
# Read the diagnosis lookup table
diagnosis_lookup = pd.read_csv(path_diag_lookup)
#print(diagnosis_lookup.head())

# Filter data based on the diagnosis
# ----------------------------------------------
healthy_codes = [426177001, 426783006]

categories = {
    'Gesund': [426177001, 426783006],  # '426177001', '426783006'
    'Herzrhythmusstörungen': [164890007, 427084000, 164889003, 426761007,
                              713422000, 427393009, 284470004, 17338001],
    'Leitungsstörungen': [270492004, 233917008, 59118001, 164909002, 698252002],
    'EKG-Welle': [164934002, 59931005, 428750005, 164917005, 429622005,
                  164930006, 164931005, 164912004, 164937009],
    'Spannungsänderungen': [39732003, 47665007, 251146004, 251199005],
    'Hypertrophien': [164873001, 89792004],
    'QT': [111975006],
    'Repolarisation': [428417006],
    'Myokardinfarkt': [164865005]
}

# Upper bound for the number of records to read
max_counter = 100_000  # 100_000

# Loop through the records; diag_dict holds the per-category counts and
# counter the number of records that were read successfully.
diag_dict, counter = count_records_per_category(data_dir, categories, max_counter)

# Counts noted down from an earlier run (kept for reference):
"""
ID: Herzrhythmusstörungen, Count: 22571
ID: Leitungsstörungen, Count: 505
ID: EKG-Welle, Count: 2067
ID: Spannungsänderungen, Count: 613
ID: Hypertrophien, Count: 5
ID: QT, Count: 43
ID: Repolarisation, Count: 73
ID: Myokardinfarkt, Count: 1
"""

# # get the data
# dict_healthy, dict_afib, dict_mi = get_diag_filtered_data_dict()
# # get unique diagnosis codes
# unique_health_codes = np.unique(np.array([np.array(get_diagnosis_ids(d)) for d in dict_healthy.values()]).flatten())
# unique_afib_codes = np.unique(np.array([np.array(get_diagnosis_ids(d)) for d in dict_afib.values()]).flatten())
# unique_mi_codes = np.unique(np.array([np.array(get_diagnosis_ids(d)) for d in dict_mi.values()]).flatten())
# print(unique_health_codes)
# print(unique_afib_codes)
# print(unique_mi_codes)
# print(dict_healthy['JS00004'].__dict__)

#print(diag_dict)
for category_name, count in diag_dict.items():
    print(f"ID: {category_name}, Count: {count}")

print(f'Number of counter diagnoses: {len(diag_dict)}')
print(f'Number of diagnoses in the lookup table: {len(diagnosis_lookup)}')
# NOTE(review): this compares the number of categories with the number of rows
# in the lookup table, not the individual codes - confirm this is intended.
print('found in the lookup table: ', len(diag_dict) == len(diagnosis_lookup))

# flatten the counters and count the unique values
# healthy_counter = np.array(healthy_counter).flatten()
# afib_counter = np.array(afib_counter).flatten()
# mi_counter = np.array(mi_counter).flatten()
# unique_health_codes, counts_health = np.unique(healthy_counter, return_counts=True)
# unique_afib_codes, counts_afib = np.unique(afib_counter, return_counts=True)
# unique_mi_codes, counts_mi = np.unique(mi_counter, return_counts=True)
# print(unique_health_codes)
# print(counts_health)
# print(unique_afib_codes)
# print(counts_afib)
# print(unique_mi_codes)
# print(counts_mi)
# # get the names of the diagnosis
# names_health = get_diagnosis_name(unique_health_codes)
# names_afib = get_diagnosis_name(unique_afib_codes)
# names_mi = get_diagnosis_name(unique_mi_codes)