from flask import Flask, request, jsonify import numpy as np import faiss from typing import List, Dict, Optional class FAISSSearch: """ HNSW-basierte Vektorsuche mit FAISS Schnelle approximate nearest neighbor search """ def __init__(self, dim: int, metric: str = 'cosine', max_elements: int = 10000): """ Initialisiert FAISS Index Args: dim: Vektor-Dimension metric: 'cosine', 'l2' (euclidean), oder 'ip' (inner product) max_elements: Maximale Anzahl von Vektoren (für HNSW) """ self.dim = dim self.metric = metric self.max_elements = max_elements self.index = None self.metadata_store = {} # Speichert Text und Metadata self.current_id = 0 self.is_initialized = False self.id_map = [] # Mapping von FAISS Index Position zu Custom ID def init_index(self, M: int = 32, ef_construction: int = 200, ef_search: int = 50): """ Erstellt neuen FAISS HNSW Index Args: M: Anzahl der Verbindungen pro Knoten (32 = standard, höher = besser) ef_construction: Suchbereich beim Index-Aufbau (200 = standard) ef_search: Standard-Suchbereich für Queries (50 = standard) """ # Erstelle HNSW Index basierend auf Metrik if self.metric == 'cosine': # Für Cosine: verwende IP (Inner Product) mit normalisierten Vektoren self.index = faiss.IndexHNSWFlat(self.dim, M, faiss.METRIC_INNER_PRODUCT) self.normalize = True elif self.metric == 'l2': # Für L2 (Euclidean) self.index = faiss.IndexHNSWFlat(self.dim, M, faiss.METRIC_L2) self.normalize = False elif self.metric == 'ip': # Inner Product ohne Normalisierung self.index = faiss.IndexHNSWFlat(self.dim, M, faiss.METRIC_INNER_PRODUCT) self.normalize = False else: raise ValueError(f"Unbekannte Metrik: {self.metric}") # Setze HNSW Parameter self.index.hnsw.efConstruction = ef_construction self.index.hnsw.efSearch = ef_search self.is_initialized = True def add_items(self, vectors: List[List[float]], texts: List[str] = None, metadata: List[Dict] = None, ids: List[int] = None): """ Fügt Vektoren zum Index hinzu Args: vectors: Liste von Vektoren texts: Optionale Texte zu den Vektoren metadata: Optionale Metadata zu den Vektoren ids: Optionale IDs (werden automatisch vergeben wenn None) """ if not self.is_initialized: raise RuntimeError("Index muss zuerst initialisiert werden (init_index)") vectors_np = np.array(vectors, dtype=np.float32) # Prüfe Dimensionen if vectors_np.shape[1] != self.dim: raise ValueError( f"Vektor-Dimension {vectors_np.shape[1]} stimmt nicht mit Index-Dimension {self.dim} überein" ) # Normalisiere Vektoren für Cosine Similarity if self.normalize: faiss.normalize_L2(vectors_np) # Generiere IDs falls nicht vorhanden if ids is None: ids = list(range(self.current_id, self.current_id + len(vectors))) self.current_id += len(vectors) # Speichere ID-Mapping (FAISS Position -> Custom ID) start_idx = self.index.ntotal for i, custom_id in enumerate(ids): self.id_map.append(custom_id) # Speichere Metadata self.metadata_store[custom_id] = { 'id': custom_id, 'text': texts[i] if texts and i < len(texts) else '', 'metadata': metadata[i] if metadata and i < len(metadata) else {} } # Füge Vektoren zum FAISS Index hinzu self.index.add(vectors_np) def search(self, query_vector: List[float], k: int = 5, ef_search: int = None) -> List[Dict]: """ Sucht die k ähnlichsten Vektoren Args: query_vector: Suchvektor k: Anzahl der Ergebnisse ef_search: Suchbereich (höher = genauer aber langsamer, None = use default) Returns: Liste von Ergebnis-Dictionaries mit id, score, text, metadata """ if not self.is_initialized: raise RuntimeError("Index muss zuerst initialisiert werden") if self.index.ntotal == 0: return [] # Setze ef_search falls angegeben if ef_search is not None: self.index.hnsw.efSearch = max(ef_search, k) query_np = np.array([query_vector], dtype=np.float32) # Normalisiere Query-Vektor für Cosine if self.normalize: faiss.normalize_L2(query_np) # Suche durchführen k_actual = min(k, self.index.ntotal) # Nicht mehr als verfügbar distances, indices = self.index.search(query_np, k_actual) # Ergebnisse zusammenstellen results = [] for idx, distance in zip(indices[0], distances[0]): if idx == -1: # FAISS gibt -1 zurück wenn keine weiteren Ergebnisse continue # Hole Custom ID aus Mapping custom_id = self.id_map[idx] # Konvertiere Distance zu Score if self.metric == 'cosine' or self.metric == 'ip': # Bei Inner Product/Cosine: höher = ähnlicher score = float(distance) else: # l2 # Bei L2: kleinere Distance = ähnlicher score = float(1.0 / (1.0 + distance)) result = { 'id': int(custom_id), 'score': score, 'distance': float(distance), **self.metadata_store.get(custom_id, {}) } results.append(result) return results def get_stats(self) -> Dict: """Gibt Statistiken über den Index zurück mit umfangreichen Prüfungen""" # Basis-Validierung: Objekt vollständig initialisiert? if not hasattr(self, 'dim') or not hasattr(self, 'metric'): return { 'initialized': False, 'error': 'Objekt nicht vollständig initialisiert' } # Prüfe ob Index initialisiert wurde if not self.is_initialized or self.index is None: return { 'initialized': False, 'dimension': self.dim, 'metric': self.metric } # Basis-Stats sammeln stats = { 'initialized': True, 'dimension': self.dim, 'metric': self.metric, 'max_elements': self.max_elements } # Sichere Abfrage: Anzahl Vektoren in FAISS try: stats['current_count'] = self.index.ntotal except (AttributeError, RuntimeError) as e: stats['current_count'] = 0 stats['warning'] = f'Konnte current_count nicht abrufen: {str(e)}' # Sichere Abfrage: Metadata Count try: stats['metadata_count'] = len(self.metadata_store) except (AttributeError, TypeError): stats['metadata_count'] = 0 # Konsistenz-Check: FAISS vs Metadata if 'current_count' in stats and 'metadata_count' in stats: if stats['current_count'] != stats['metadata_count']: stats['consistency_warning'] = ( f"Inkonsistenz erkannt: {stats['current_count']} Vektoren in FAISS, " f"aber {stats['metadata_count']} Einträge in Metadata-Store" ) # HNSW-spezifische Parameter (mit Fehlerbehandlung) try: if hasattr(self.index, 'hnsw') and self.index.hnsw is not None: stats['M'] = self.index.hnsw.M stats['ef_construction'] = self.index.hnsw.efConstruction stats['ef_search'] = self.index.hnsw.efSearch else: stats['M'] = 'N/A' stats['ef_construction'] = 'N/A' stats['ef_search'] = 'N/A' except (AttributeError, RuntimeError) as e: stats['M'] = 'N/A' stats['ef_construction'] = 'N/A' stats['ef_search'] = 'N/A' stats['hnsw_warning'] = f'HNSW-Parameter nicht verfügbar: {str(e)}' # Speicher-Schätzung (Vektoren im RAM) try: if stats['current_count'] > 0: # float32 = 4 bytes pro Dimension vector_memory_mb = (stats['current_count'] * self.dim * 4) / (1024 * 1024) stats['estimated_memory_mb'] = round(vector_memory_mb, 2) except (KeyError, ZeroDivisionError, TypeError): stats['estimated_memory_mb'] = 'N/A' # Kapazitäts-Warnung try: if stats['current_count'] >= self.max_elements * 0.9: stats['capacity_warning'] = ( f"Index ist zu {(stats['current_count'] / self.max_elements * 100):.1f}% gefüllt. " f"Bald Kapazitätsgrenze erreicht!" ) except (KeyError, ZeroDivisionError, TypeError): pass return stats # Flask App app = Flask(__name__) # Globaler FAISS Index faiss_index: Optional[FAISSSearch] = None @app.route('/health', methods=['GET']) def health(): """Health Check Endpoint""" return jsonify({ 'status': 'ok', 'service': 'FAISS HNSW Search API', 'version': '3.0', 'library': 'FAISS', 'index_initialized': faiss_index is not None and faiss_index.is_initialized }) @app.route('/init', methods=['POST']) def init_index(): """ Initialisiert einen neuen FAISS HNSW Index Body: { "dim": 128, "metric": "cosine", // optional: "cosine", "l2", "ip" "max_elements": 10000, // optional (für Info) "M": 32, // optional (Standard: 32) "ef_construction": 200, // optional "ef_search": 50 // optional } """ global faiss_index try: data = request.get_json() if not data or 'dim' not in data: return jsonify({ 'success': False, 'error': 'Feld "dim" (Dimension) ist erforderlich' }), 400 dim = data['dim'] metric = data.get('metric', 'cosine') max_elements = data.get('max_elements', 10000) M = data.get('M', 32) ef_construction = data.get('ef_construction', 200) ef_search = data.get('ef_search', 50) # Erstelle neuen Index faiss_index = FAISSSearch(dim=dim, metric=metric, max_elements=max_elements) faiss_index.init_index(M=M, ef_construction=ef_construction, ef_search=ef_search) return jsonify({ 'success': True, 'message': 'FAISS HNSW Index erfolgreich initialisiert', 'stats': faiss_index.get_stats() }), 200 except Exception as e: return jsonify({ 'success': False, 'error': f'Fehler beim Initialisieren: {str(e)}' }), 500 @app.route('/add', methods=['POST']) def add_vectors(): """ Fügt Vektoren zum Index hinzu Body: { "vectors": [[0.1, 0.2, ...], [0.3, 0.4, ...]], "texts": ["Text 1", "Text 2"], // optional "metadata": [{"key": "value"}, {}], // optional "ids": [1, 2] // optional } """ global faiss_index try: if faiss_index is None or not faiss_index.is_initialized: return jsonify({ 'success': False, 'error': 'Index muss zuerst initialisiert werden (/init)' }), 400 data = request.get_json() if not data or 'vectors' not in data: return jsonify({ 'success': False, 'error': 'Feld "vectors" ist erforderlich' }), 400 vectors = data['vectors'] texts = data.get('texts', None) metadata = data.get('metadata', None) ids = data.get('ids', None) if not isinstance(vectors, list) or len(vectors) == 0: return jsonify({ 'success': False, 'error': 'vectors muss eine nicht-leere Liste sein' }), 400 # Füge Vektoren hinzu faiss_index.add_items(vectors=vectors, texts=texts, metadata=metadata, ids=ids) return jsonify({ 'success': True, 'message': f'{len(vectors)} Vektoren erfolgreich hinzugefügt', 'stats': faiss_index.get_stats() }), 200 except Exception as e: return jsonify({ 'success': False, 'error': f'Fehler beim Hinzufügen: {str(e)}' }), 500 @app.route('/search', methods=['POST']) def search_vectors(): """ Sucht ähnliche Vektoren im Index Body: { "query_vector": [0.1, 0.2, ...], "k": 5, // optional, default: 5 "ef_search": 50 // optional (Suchgenauigkeit) } """ global faiss_index try: if faiss_index is None or not faiss_index.is_initialized: return jsonify({ 'success': False, 'error': 'Index muss zuerst initialisiert werden (/init)' }), 400 data = request.get_json() if not data or 'query_vector' not in data: return jsonify({ 'success': False, 'error': 'Feld "query_vector" ist erforderlich' }), 400 query_vector = data['query_vector'] k = data.get('k', 5) ef_search = data.get('ef_search', None) if not isinstance(query_vector, list): return jsonify({ 'success': False, 'error': 'query_vector muss eine Liste sein' }), 400 # Suche durchführen results = faiss_index.search(query_vector=query_vector, k=k, ef_search=ef_search) return jsonify({ 'success': True, 'query_vector_dim': len(query_vector), 'k': k, 'results_count': len(results), 'results': results }), 200 except Exception as e: return jsonify({ 'success': False, 'error': f'Fehler bei der Suche: {str(e)}' }), 500 @app.route('/stats', methods=['GET']) def get_stats(): """ Gibt Statistiken über den Index zurück (mit umfassender Fehlerbehandlung) """ try: global faiss_index # Prüfe ob Index-Objekt existiert if faiss_index is None: return jsonify({ 'success': True, 'stats': { 'initialized': False, 'message': 'Index wurde noch nicht erstellt. Bitte /init aufrufen.' } }), 200 # Hole Stats mit integrierter Fehlerbehandlung stats = faiss_index.get_stats() # Prüfe ob Stats-Abruf Fehler enthält if 'error' in stats: return jsonify({ 'success': False, 'error': stats['error'], 'stats': stats }), 500 return jsonify({ 'success': True, 'stats': stats }), 200 except AttributeError as e: return jsonify({ 'success': False, 'error': f'Attributfehler beim Abrufen der Statistiken: {str(e)}', 'stats': {'initialized': False} }), 500 except RuntimeError as e: return jsonify({ 'success': False, 'error': f'Laufzeitfehler beim Abrufen der Statistiken: {str(e)}', 'stats': {'initialized': False} }), 500 except Exception as e: return jsonify({ 'success': False, 'error': f'Unerwarteter Fehler beim Abrufen der Statistiken: {str(e)}', 'stats': {'initialized': False} }), 500 if __name__ == '__main__': app.run(debug=True, host='0.0.0.0', port=5000)