""" Embedding-Modul für Textdokumente mit Datenbank-Integration Unterstützt PDF, TXT, DOCX und andere Textformate Speichert Embeddings in Datenbank oder als Fallback in Datei Kommuniziert mit HNSW-Modul über localhost """ import os import pickle import numpy as np import requests import json from typing import List, Dict, Tuple, Optional from pathlib import Path from dataclasses import dataclass, asdict from datetime import datetime # Text-Extraktion import PyPDF2 from docx import Document # Embedding-Modelle from sentence_transformers import SentenceTransformer @dataclass class DocumentChunk: """Repräsentiert einen Text-Chunk mit Metadaten""" text: str embedding: np.ndarray source_file: str chunk_index: int metadata: Dict = None def to_dict(self) -> Dict: """Konvertiert zu Dictionary für Speicherung/API""" return { 'text': self.text, 'embedding': self.embedding.tolist(), # NumPy → Liste für JSON 'source_file': self.source_file, 'chunk_index': self.chunk_index, 'metadata': self.metadata } class StorageManager: """Verwaltet Speicherung von Embeddings (Datenbank oder Datei)""" def __init__(self, storage_dir: str = "./embeddings_storage"): """ Initialisiert Storage Manager Args: storage_dir: Verzeichnis für Datei-Fallback """ self.storage_dir = Path(storage_dir) self.storage_dir.mkdir(exist_ok=True) self.db_available = self._check_database_connection() def _check_database_connection(self) -> bool: """ Prüft, ob Datenbank-Verbindung verfügbar ist Returns: True wenn verbunden, False sonst """ # TODO: Hier Datenbank-Verbindung prüfen # Beispiel für PostgreSQL/MySQL: # try: # import psycopg2 # conn = psycopg2.connect(...) # return True # except: # return False print("⚠️ Keine Datenbank konfiguriert - verwende Datei-Speicherung") return False def save_to_database(self, chunks: List[DocumentChunk]) -> bool: """ Speichert Chunks in Datenbank Args: chunks: Liste von DocumentChunk-Objekten Returns: True bei Erfolg, False bei Fehler """ if not self.db_available: return False try: # TODO: Implementiere Datenbank-Speicherung # Beispiel: # for chunk in chunks: # cursor.execute( # "INSERT INTO embeddings (text, vector, source, chunk_index) VALUES (%s, %s, %s, %s)", # (chunk.text, chunk.embedding.tolist(), chunk.source_file, chunk.chunk_index) # ) # conn.commit() print(f"✓ {len(chunks)} Chunks in Datenbank gespeichert") return True except Exception as e: print(f"✗ Datenbank-Fehler: {e}") return False def save_to_file(self, chunks: List[DocumentChunk], filename: Optional[str] = None) -> str: """ Speichert Chunks in Datei (Fallback) Args: chunks: Liste von DocumentChunk-Objekten filename: Optional - Dateiname, sonst timestamp-basiert Returns: Pfad zur gespeicherten Datei """ if filename is None: timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") source = Path(chunks[0].source_file).stem filename = f"embeddings_{source}_{timestamp}.pkl" filepath = self.storage_dir / filename with open(filepath, 'wb') as f: pickle.dump(chunks, f) print(f"✓ {len(chunks)} Chunks gespeichert in: {filepath}") return str(filepath) def save(self, chunks: List[DocumentChunk]) -> Dict[str, any]: """ Speichert Chunks - versucht erst Datenbank, dann Datei Args: chunks: Liste von DocumentChunk-Objekten Returns: Dictionary mit Status und Speicherort """ result = { 'success': False, 'storage_type': None, 'location': None, 'chunks_count': len(chunks) } # Versuch 1: Datenbank if self.db_available: if self.save_to_database(chunks): result['success'] = True result['storage_type'] = 'database' result['location'] = 'database' return result # Versuch 2: Datei (Fallback) filepath = self.save_to_file(chunks) result['success'] = True result['storage_type'] = 'file' result['location'] = filepath return result def load_from_file(self, filepath: str) -> List[DocumentChunk]: """ Lädt Chunks aus Datei Args: filepath: Pfad zur Datei Returns: Liste von DocumentChunk-Objekten """ with open(filepath, 'rb') as f: chunks = pickle.load(f) print(f"✓ {len(chunks)} Chunks geladen aus: {filepath}") return chunks class HNSWConnector: """Verbindung zum HNSW-Modul über localhost""" def __init__(self, host: str = "localhost", port: int = 8000): """ Initialisiert HNSW-Connector Args: host: Hostname des HNSW-Servers port: Port des HNSW-Servers """ self.base_url = f"http://{host}:{port}" self.connected = self._check_connection() def _check_connection(self) -> bool: """ Prüft Verbindung zum HNSW-Modul Returns: True wenn verbunden, False sonst """ try: response = requests.get(f"{self.base_url}/health", timeout=2) if response.status_code == 200: print(f"✓ HNSW-Modul erreichbar: {self.base_url}") return True except requests.exceptions.RequestException: pass print(f"✗ HNSW-Modul nicht erreichbar: {self.base_url}") return False def send_embeddings(self, chunks: List[DocumentChunk]) -> Dict[str, any]: """ Sendet Embeddings an HNSW-Modul Args: chunks: Liste von DocumentChunk-Objekten Returns: Dictionary mit Status und Antwort """ if not self.connected: return { 'success': False, 'error': 'HNSW-Modul nicht erreichbar', 'message': f'Keine Verbindung zu {self.base_url}' } try: # Konvertiere Chunks zu JSON-Format payload = { 'chunks': [chunk.to_dict() for chunk in chunks], 'timestamp': datetime.now().isoformat(), 'source_file': chunks[0].source_file } # Sende POST-Request response = requests.post( f"{self.base_url}/api/embeddings/add", json=payload, timeout=30 ) if response.status_code == 200: result = response.json() print(f"✓ {len(chunks)} Chunks an HNSW-Modul gesendet") return { 'success': True, 'response': result } else: return { 'success': False, 'error': f'HTTP {response.status_code}', 'message': response.text } except requests.exceptions.Timeout: return { 'success': False, 'error': 'Timeout', 'message': 'HNSW-Modul antwortet nicht (Timeout)' } except Exception as e: return { 'success': False, 'error': 'Exception', 'message': str(e) } def retry_connection(self, max_retries: int = 3) -> bool: """ Versucht erneut, Verbindung herzustellen Args: max_retries: Maximale Anzahl Versuche Returns: True wenn verbunden, False sonst """ print(f"Versuche Verbindung zu HNSW-Modul ({max_retries} Versuche)...") for i in range(max_retries): print(f" Versuch {i+1}/{max_retries}...") if self._check_connection(): self.connected = True return True if i < max_retries - 1: import time time.sleep(2) self.connected = False return False class TextExtractor: """Extrahiert Text aus verschiedenen Dateiformaten""" @staticmethod def extract_from_pdf(filepath: str) -> str: text = "" try: with open(filepath, 'rb') as file: pdf_reader = PyPDF2.PdfReader(file) for page in pdf_reader.pages: text += page.extract_text() + "\n" except Exception as e: raise Exception(f"Fehler beim Lesen der PDF: {e}") return text.strip() @staticmethod def extract_from_txt(filepath: str, encoding: str = 'utf-8') -> str: try: with open(filepath, 'r', encoding=encoding) as file: return file.read() except UnicodeDecodeError: for enc in ['latin-1', 'cp1252', 'iso-8859-1']: try: with open(filepath, 'r', encoding=enc) as file: return file.read() except: continue raise Exception("Konnte Datei mit keinem Encoding lesen") @staticmethod def extract_from_docx(filepath: str) -> str: try: doc = Document(filepath) text = "\n".join([paragraph.text for paragraph in doc.paragraphs]) return text.strip() except Exception as e: raise Exception(f"Fehler beim Lesen der DOCX: {e}") @staticmethod def extract_text(filepath: str) -> str: file_ext = Path(filepath).suffix.lower() if file_ext == '.pdf': return TextExtractor.extract_from_pdf(filepath) elif file_ext == '.txt': return TextExtractor.extract_from_txt(filepath) elif file_ext in ['.docx', '.doc']: return TextExtractor.extract_from_docx(filepath) else: raise ValueError(f"Nicht unterstütztes Dateiformat: {file_ext}") class TextChunker: """Teilt Text in sinnvolle Chunks auf""" @staticmethod def chunk_by_tokens(text: str, chunk_size: int = 500, overlap: int = 50) -> List[str]: words = text.split() chunks = [] for i in range(0, len(words), chunk_size - overlap): chunk = ' '.join(words[i:i + chunk_size]) if chunk.strip(): chunks.append(chunk) return chunks @staticmethod def chunk_by_sentences(text: str, sentences_per_chunk: int = 5) -> List[str]: sentences = text.replace('!', '.').replace('?', '.').split('.') sentences = [s.strip() for s in sentences if s.strip()] chunks = [] for i in range(0, len(sentences), sentences_per_chunk): chunk = '. '.join(sentences[i:i + sentences_per_chunk]) if chunk.strip(): chunks.append(chunk + '.') return chunks class EmbeddingGenerator: """Erzeugt Embeddings aus Text-Chunks""" def __init__(self, model_name: str = 'all-MiniLM-L6-v2'): print(f"Lade Embedding-Modell: {model_name}") self.model = SentenceTransformer(model_name) self.embedding_dimension = self.model.get_sentence_embedding_dimension() print(f"Modell geladen. Embedding-Dimension: {self.embedding_dimension}") def generate_embeddings(self, texts: List[str], batch_size: int = 32) -> np.ndarray: print(f"Erzeuge Embeddings für {len(texts)} Text-Chunks...") embeddings = self.model.encode( texts, batch_size=batch_size, show_progress_bar=True, convert_to_numpy=True ) return embeddings class DocumentEmbedder: """Hauptklasse für das Embedding von Dokumenten mit Storage & HNSW-Integration""" def __init__( self, model_name: str = 'all-MiniLM-L6-v2', storage_dir: str = "./embeddings_storage", hnsw_host: str = "localhost", hnsw_port: int = 8000 ): """ Initialisiert den Document Embedder mit Storage und HNSW-Anbindung Args: model_name: Name des Embedding-Modells storage_dir: Verzeichnis für Datei-Speicherung hnsw_host: Hostname des HNSW-Servers hnsw_port: Port des HNSW-Servers """ self.text_extractor = TextExtractor() self.chunker = TextChunker() self.embedding_generator = EmbeddingGenerator(model_name) self.storage_manager = StorageManager(storage_dir) self.hnsw_connector = HNSWConnector(hnsw_host, hnsw_port) def process_file( self, filepath: str, chunk_method: str = 'tokens', chunk_size: int = 500, overlap: int = 50, auto_store: bool = True, send_to_hnsw: bool = True ) -> Dict[str, any]: """ Verarbeitet eine Datei komplett: Embedding → Storage → HNSW Args: filepath: Pfad zur Datei chunk_method: 'tokens' oder 'sentences' chunk_size: Größe der Chunks overlap: Überlappung zwischen Chunks auto_store: Automatisch speichern nach Embedding send_to_hnsw: An HNSW-Modul senden nach Speicherung Returns: Dictionary mit Status aller Schritte """ print(f"\n{'='*60}") print(f"Verarbeite Datei: {filepath}") print(f"{'='*60}") result = { 'file': filepath, 'embedding': {'success': False}, 'storage': {'success': False}, 'hnsw': {'success': False} } try: # 1. Text extrahieren print("1. Extrahiere Text...") text = self.text_extractor.extract_text(filepath) print(f" → {len(text)} Zeichen extrahiert") # 2. Text in Chunks aufteilen print("2. Teile Text in Chunks...") if chunk_method == 'tokens': chunks_text = self.chunker.chunk_by_tokens(text, chunk_size, overlap) else: chunks_text = self.chunker.chunk_by_sentences(text, chunk_size) print(f" → {len(chunks_text)} Chunks erstellt") # 3. Embeddings erzeugen print("3. Erzeuge Embeddings...") embeddings = self.embedding_generator.generate_embeddings(chunks_text) # 4. DocumentChunk-Objekte erstellen document_chunks = [] for i, (chunk_text, embedding) in enumerate(zip(chunks_text, embeddings)): doc_chunk = DocumentChunk( text=chunk_text, embedding=embedding, source_file=filepath, chunk_index=i, metadata={ 'total_chunks': len(chunks_text), 'chunk_method': chunk_method, 'file_size': os.path.getsize(filepath) } ) document_chunks.append(doc_chunk) result['embedding'] = { 'success': True, 'chunks_count': len(document_chunks) } print(f"✓ Embedding abgeschlossen: {len(document_chunks)} Chunks") # 5. Speichern (wenn gewünscht) if auto_store: print("\n4. Speichere Embeddings...") storage_result = self.storage_manager.save(document_chunks) result['storage'] = storage_result if not storage_result['success']: print("✗ Speicherung fehlgeschlagen") return result # 6. An HNSW senden (wenn gewünscht) if send_to_hnsw: print("\n5. Sende an HNSW-Modul...") hnsw_result = self.hnsw_connector.send_embeddings(document_chunks) result['hnsw'] = hnsw_result if not hnsw_result['success']: print(f"✗ HNSW-Fehler: {hnsw_result.get('message', 'Unbekannt')}") print("\nℹ️ Tipp: Starte das HNSW-Modul mit:") print(f" python hnsw_server.py --port {self.hnsw_connector.base_url.split(':')[-1]}") print(f"\n{'='*60}") print("✓ VERARBEITUNG ABGESCHLOSSEN") print(f"{'='*60}") self._print_summary(result) return result except Exception as e: print(f"\n✗ FEHLER: {e}") result['error'] = str(e) return result def _print_summary(self, result: Dict): """Gibt Zusammenfassung der Verarbeitung aus""" print("\n📊 ZUSAMMENFASSUNG:") print(f" • Embedding: {'✓' if result['embedding']['success'] else '✗'}") if result['storage']['success']: print(f" • Storage: ✓ ({result['storage']['storage_type']})") if result['storage']['storage_type'] == 'file': print(f" → {result['storage']['location']}") else: print(f" • Storage: ✗") if result['hnsw']['success']: print(f" • HNSW: ✓ Gesendet") else: print(f" • HNSW: ✗ {result['hnsw'].get('error', 'Fehler')}") # Beispiel-Verwendung if __name__ == "__main__": # Embedder initialisieren embedder = DocumentEmbedder( model_name='all-MiniLM-L6-v2', storage_dir='./embeddings_storage', hnsw_host='localhost', hnsw_port=8000 ) # Datei verarbeiten filepath = "beispiel_dokument.pdf" if os.path.exists(filepath): result = embedder.process_file( filepath=filepath, chunk_size=500, overlap=50, auto_store=True, # Automatisch speichern send_to_hnsw=True # An HNSW senden ) else: print(f"Datei nicht gefunden: {filepath}") print("\nErstelle Test-Datei...") with open("test.txt", "w", encoding="utf-8") as f: f.write("Das ist ein Beispieltext für das Embedding-Modul. " * 100) result = embedder.process_file( filepath="test.txt", auto_store=True, send_to_hnsw=True )