491 lines
16 KiB
Python
491 lines
16 KiB
Python
from flask import Flask, request, jsonify
|
|
import numpy as np
|
|
import faiss
|
|
from typing import List, Dict, Optional
|
|
|
|
class FAISSSearch:
|
|
"""
|
|
HNSW-basierte Vektorsuche mit FAISS
|
|
Schnelle approximate nearest neighbor search
|
|
"""
|
|
|
|
def __init__(self, dim: int, metric: str = 'cosine', max_elements: int = 10000):
|
|
"""
|
|
Initialisiert FAISS Index
|
|
|
|
Args:
|
|
dim: Vektor-Dimension
|
|
metric: 'cosine', 'l2' (euclidean), oder 'ip' (inner product)
|
|
max_elements: Maximale Anzahl von Vektoren (für HNSW)
|
|
"""
|
|
self.dim = dim
|
|
self.metric = metric
|
|
self.max_elements = max_elements
|
|
self.index = None
|
|
self.metadata_store = {} # Speichert Text und Metadata
|
|
self.current_id = 0
|
|
self.is_initialized = False
|
|
self.id_map = [] # Mapping von FAISS Index Position zu Custom ID
|
|
|
|
def init_index(self, M: int = 32, ef_construction: int = 200, ef_search: int = 50):
|
|
"""
|
|
Erstellt neuen FAISS HNSW Index
|
|
|
|
Args:
|
|
M: Anzahl der Verbindungen pro Knoten (32 = standard, höher = besser)
|
|
ef_construction: Suchbereich beim Index-Aufbau (200 = standard)
|
|
ef_search: Standard-Suchbereich für Queries (50 = standard)
|
|
"""
|
|
# Erstelle HNSW Index basierend auf Metrik
|
|
if self.metric == 'cosine':
|
|
# Für Cosine: verwende IP (Inner Product) mit normalisierten Vektoren
|
|
self.index = faiss.IndexHNSWFlat(self.dim, M, faiss.METRIC_INNER_PRODUCT)
|
|
self.normalize = True
|
|
elif self.metric == 'l2':
|
|
# Für L2 (Euclidean)
|
|
self.index = faiss.IndexHNSWFlat(self.dim, M, faiss.METRIC_L2)
|
|
self.normalize = False
|
|
elif self.metric == 'ip':
|
|
# Inner Product ohne Normalisierung
|
|
self.index = faiss.IndexHNSWFlat(self.dim, M, faiss.METRIC_INNER_PRODUCT)
|
|
self.normalize = False
|
|
else:
|
|
raise ValueError(f"Unbekannte Metrik: {self.metric}")
|
|
|
|
# Setze HNSW Parameter
|
|
self.index.hnsw.efConstruction = ef_construction
|
|
self.index.hnsw.efSearch = ef_search
|
|
|
|
self.is_initialized = True
|
|
|
|
def add_items(self, vectors: List[List[float]], texts: List[str] = None,
|
|
metadata: List[Dict] = None, ids: List[int] = None):
|
|
"""
|
|
Fügt Vektoren zum Index hinzu
|
|
|
|
Args:
|
|
vectors: Liste von Vektoren
|
|
texts: Optionale Texte zu den Vektoren
|
|
metadata: Optionale Metadata zu den Vektoren
|
|
ids: Optionale IDs (werden automatisch vergeben wenn None)
|
|
"""
|
|
if not self.is_initialized:
|
|
raise RuntimeError("Index muss zuerst initialisiert werden (init_index)")
|
|
|
|
vectors_np = np.array(vectors, dtype=np.float32)
|
|
|
|
# Prüfe Dimensionen
|
|
if vectors_np.shape[1] != self.dim:
|
|
raise ValueError(
|
|
f"Vektor-Dimension {vectors_np.shape[1]} stimmt nicht mit Index-Dimension {self.dim} überein"
|
|
)
|
|
|
|
# Normalisiere Vektoren für Cosine Similarity
|
|
if self.normalize:
|
|
faiss.normalize_L2(vectors_np)
|
|
|
|
# Generiere IDs falls nicht vorhanden
|
|
if ids is None:
|
|
ids = list(range(self.current_id, self.current_id + len(vectors)))
|
|
self.current_id += len(vectors)
|
|
|
|
# Speichere ID-Mapping (FAISS Position -> Custom ID)
|
|
start_idx = self.index.ntotal
|
|
for i, custom_id in enumerate(ids):
|
|
self.id_map.append(custom_id)
|
|
|
|
# Speichere Metadata
|
|
self.metadata_store[custom_id] = {
|
|
'id': custom_id,
|
|
'text': texts[i] if texts and i < len(texts) else '',
|
|
'metadata': metadata[i] if metadata and i < len(metadata) else {}
|
|
}
|
|
|
|
# Füge Vektoren zum FAISS Index hinzu
|
|
self.index.add(vectors_np)
|
|
|
|
def search(self, query_vector: List[float], k: int = 5, ef_search: int = None) -> List[Dict]:
|
|
"""
|
|
Sucht die k ähnlichsten Vektoren
|
|
|
|
Args:
|
|
query_vector: Suchvektor
|
|
k: Anzahl der Ergebnisse
|
|
ef_search: Suchbereich (höher = genauer aber langsamer, None = use default)
|
|
|
|
Returns:
|
|
Liste von Ergebnis-Dictionaries mit id, score, text, metadata
|
|
"""
|
|
if not self.is_initialized:
|
|
raise RuntimeError("Index muss zuerst initialisiert werden")
|
|
|
|
if self.index.ntotal == 0:
|
|
return []
|
|
|
|
# Setze ef_search falls angegeben
|
|
if ef_search is not None:
|
|
self.index.hnsw.efSearch = max(ef_search, k)
|
|
|
|
query_np = np.array([query_vector], dtype=np.float32)
|
|
|
|
# Normalisiere Query-Vektor für Cosine
|
|
if self.normalize:
|
|
faiss.normalize_L2(query_np)
|
|
|
|
# Suche durchführen
|
|
k_actual = min(k, self.index.ntotal) # Nicht mehr als verfügbar
|
|
distances, indices = self.index.search(query_np, k_actual)
|
|
|
|
# Ergebnisse zusammenstellen
|
|
results = []
|
|
for idx, distance in zip(indices[0], distances[0]):
|
|
if idx == -1: # FAISS gibt -1 zurück wenn keine weiteren Ergebnisse
|
|
continue
|
|
|
|
# Hole Custom ID aus Mapping
|
|
custom_id = self.id_map[idx]
|
|
|
|
# Konvertiere Distance zu Score
|
|
if self.metric == 'cosine' or self.metric == 'ip':
|
|
# Bei Inner Product/Cosine: höher = ähnlicher
|
|
score = float(distance)
|
|
else: # l2
|
|
# Bei L2: kleinere Distance = ähnlicher
|
|
score = float(1.0 / (1.0 + distance))
|
|
|
|
result = {
|
|
'id': int(custom_id),
|
|
'score': score,
|
|
'distance': float(distance),
|
|
**self.metadata_store.get(custom_id, {})
|
|
}
|
|
results.append(result)
|
|
|
|
return results
|
|
|
|
def get_stats(self) -> Dict:
|
|
"""Gibt Statistiken über den Index zurück mit umfangreichen Prüfungen"""
|
|
|
|
# Basis-Validierung: Objekt vollständig initialisiert?
|
|
if not hasattr(self, 'dim') or not hasattr(self, 'metric'):
|
|
return {
|
|
'initialized': False,
|
|
'error': 'Objekt nicht vollständig initialisiert'
|
|
}
|
|
|
|
# Prüfe ob Index initialisiert wurde
|
|
if not self.is_initialized or self.index is None:
|
|
return {
|
|
'initialized': False,
|
|
'dimension': self.dim,
|
|
'metric': self.metric
|
|
}
|
|
|
|
# Basis-Stats sammeln
|
|
stats = {
|
|
'initialized': True,
|
|
'dimension': self.dim,
|
|
'metric': self.metric,
|
|
'max_elements': self.max_elements
|
|
}
|
|
|
|
# Sichere Abfrage: Anzahl Vektoren in FAISS
|
|
try:
|
|
stats['current_count'] = self.index.ntotal
|
|
except (AttributeError, RuntimeError) as e:
|
|
stats['current_count'] = 0
|
|
stats['warning'] = f'Konnte current_count nicht abrufen: {str(e)}'
|
|
|
|
# Sichere Abfrage: Metadata Count
|
|
try:
|
|
stats['metadata_count'] = len(self.metadata_store)
|
|
except (AttributeError, TypeError):
|
|
stats['metadata_count'] = 0
|
|
|
|
# Konsistenz-Check: FAISS vs Metadata
|
|
if 'current_count' in stats and 'metadata_count' in stats:
|
|
if stats['current_count'] != stats['metadata_count']:
|
|
stats['consistency_warning'] = (
|
|
f"Inkonsistenz erkannt: {stats['current_count']} Vektoren in FAISS, "
|
|
f"aber {stats['metadata_count']} Einträge in Metadata-Store"
|
|
)
|
|
|
|
# HNSW-spezifische Parameter (mit Fehlerbehandlung)
|
|
try:
|
|
if hasattr(self.index, 'hnsw') and self.index.hnsw is not None:
|
|
stats['M'] = self.index.hnsw.M
|
|
stats['ef_construction'] = self.index.hnsw.efConstruction
|
|
stats['ef_search'] = self.index.hnsw.efSearch
|
|
else:
|
|
stats['M'] = 'N/A'
|
|
stats['ef_construction'] = 'N/A'
|
|
stats['ef_search'] = 'N/A'
|
|
except (AttributeError, RuntimeError) as e:
|
|
stats['M'] = 'N/A'
|
|
stats['ef_construction'] = 'N/A'
|
|
stats['ef_search'] = 'N/A'
|
|
stats['hnsw_warning'] = f'HNSW-Parameter nicht verfügbar: {str(e)}'
|
|
|
|
# Speicher-Schätzung (Vektoren im RAM)
|
|
try:
|
|
if stats['current_count'] > 0:
|
|
# float32 = 4 bytes pro Dimension
|
|
vector_memory_mb = (stats['current_count'] * self.dim * 4) / (1024 * 1024)
|
|
stats['estimated_memory_mb'] = round(vector_memory_mb, 2)
|
|
except (KeyError, ZeroDivisionError, TypeError):
|
|
stats['estimated_memory_mb'] = 'N/A'
|
|
|
|
# Kapazitäts-Warnung
|
|
try:
|
|
if stats['current_count'] >= self.max_elements * 0.9:
|
|
stats['capacity_warning'] = (
|
|
f"Index ist zu {(stats['current_count'] / self.max_elements * 100):.1f}% gefüllt. "
|
|
f"Bald Kapazitätsgrenze erreicht!"
|
|
)
|
|
except (KeyError, ZeroDivisionError, TypeError):
|
|
pass
|
|
|
|
return stats
|
|
|
|
|
|
# Flask App
|
|
app = Flask(__name__)
|
|
|
|
# Globaler FAISS Index
|
|
faiss_index: Optional[FAISSSearch] = None
|
|
|
|
|
|
@app.route('/health', methods=['GET'])
|
|
def health():
|
|
"""Health Check Endpoint"""
|
|
return jsonify({
|
|
'status': 'ok',
|
|
'service': 'FAISS HNSW Search API',
|
|
'version': '3.0',
|
|
'library': 'FAISS',
|
|
'index_initialized': faiss_index is not None and faiss_index.is_initialized
|
|
})
|
|
|
|
|
|
@app.route('/init', methods=['POST'])
|
|
def init_index():
|
|
"""
|
|
Initialisiert einen neuen FAISS HNSW Index
|
|
|
|
Body:
|
|
{
|
|
"dim": 128,
|
|
"metric": "cosine", // optional: "cosine", "l2", "ip"
|
|
"max_elements": 10000, // optional (für Info)
|
|
"M": 32, // optional (Standard: 32)
|
|
"ef_construction": 200, // optional
|
|
"ef_search": 50 // optional
|
|
}
|
|
"""
|
|
global faiss_index
|
|
|
|
try:
|
|
data = request.get_json()
|
|
|
|
if not data or 'dim' not in data:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Feld "dim" (Dimension) ist erforderlich'
|
|
}), 400
|
|
|
|
dim = data['dim']
|
|
metric = data.get('metric', 'cosine')
|
|
max_elements = data.get('max_elements', 10000)
|
|
M = data.get('M', 32)
|
|
ef_construction = data.get('ef_construction', 200)
|
|
ef_search = data.get('ef_search', 50)
|
|
|
|
# Erstelle neuen Index
|
|
faiss_index = FAISSSearch(dim=dim, metric=metric, max_elements=max_elements)
|
|
faiss_index.init_index(M=M, ef_construction=ef_construction, ef_search=ef_search)
|
|
|
|
return jsonify({
|
|
'success': True,
|
|
'message': 'FAISS HNSW Index erfolgreich initialisiert',
|
|
'stats': faiss_index.get_stats()
|
|
}), 200
|
|
|
|
except Exception as e:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': f'Fehler beim Initialisieren: {str(e)}'
|
|
}), 500
|
|
|
|
|
|
@app.route('/add', methods=['POST'])
|
|
def add_vectors():
|
|
"""
|
|
Fügt Vektoren zum Index hinzu
|
|
|
|
Body:
|
|
{
|
|
"vectors": [[0.1, 0.2, ...], [0.3, 0.4, ...]],
|
|
"texts": ["Text 1", "Text 2"], // optional
|
|
"metadata": [{"key": "value"}, {}], // optional
|
|
"ids": [1, 2] // optional
|
|
}
|
|
"""
|
|
global faiss_index
|
|
|
|
try:
|
|
if faiss_index is None or not faiss_index.is_initialized:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Index muss zuerst initialisiert werden (/init)'
|
|
}), 400
|
|
|
|
data = request.get_json()
|
|
|
|
if not data or 'vectors' not in data:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Feld "vectors" ist erforderlich'
|
|
}), 400
|
|
|
|
vectors = data['vectors']
|
|
texts = data.get('texts', None)
|
|
metadata = data.get('metadata', None)
|
|
ids = data.get('ids', None)
|
|
|
|
if not isinstance(vectors, list) or len(vectors) == 0:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'vectors muss eine nicht-leere Liste sein'
|
|
}), 400
|
|
|
|
# Füge Vektoren hinzu
|
|
faiss_index.add_items(vectors=vectors, texts=texts, metadata=metadata, ids=ids)
|
|
|
|
return jsonify({
|
|
'success': True,
|
|
'message': f'{len(vectors)} Vektoren erfolgreich hinzugefügt',
|
|
'stats': faiss_index.get_stats()
|
|
}), 200
|
|
|
|
except Exception as e:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': f'Fehler beim Hinzufügen: {str(e)}'
|
|
}), 500
|
|
|
|
|
|
@app.route('/search', methods=['POST'])
|
|
def search_vectors():
|
|
"""
|
|
Sucht ähnliche Vektoren im Index
|
|
|
|
Body:
|
|
{
|
|
"query_vector": [0.1, 0.2, ...],
|
|
"k": 5, // optional, default: 5
|
|
"ef_search": 50 // optional (Suchgenauigkeit)
|
|
}
|
|
"""
|
|
global faiss_index
|
|
|
|
try:
|
|
if faiss_index is None or not faiss_index.is_initialized:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Index muss zuerst initialisiert werden (/init)'
|
|
}), 400
|
|
|
|
data = request.get_json()
|
|
|
|
if not data or 'query_vector' not in data:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Feld "query_vector" ist erforderlich'
|
|
}), 400
|
|
|
|
query_vector = data['query_vector']
|
|
k = data.get('k', 5)
|
|
ef_search = data.get('ef_search', None)
|
|
|
|
if not isinstance(query_vector, list):
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'query_vector muss eine Liste sein'
|
|
}), 400
|
|
|
|
# Suche durchführen
|
|
results = faiss_index.search(query_vector=query_vector, k=k, ef_search=ef_search)
|
|
|
|
return jsonify({
|
|
'success': True,
|
|
'query_vector_dim': len(query_vector),
|
|
'k': k,
|
|
'results_count': len(results),
|
|
'results': results
|
|
}), 200
|
|
|
|
except Exception as e:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': f'Fehler bei der Suche: {str(e)}'
|
|
}), 500
|
|
|
|
|
|
@app.route('/stats', methods=['GET'])
|
|
def get_stats():
|
|
"""
|
|
Gibt Statistiken über den Index zurück (mit umfassender Fehlerbehandlung)
|
|
"""
|
|
try:
|
|
global faiss_index
|
|
|
|
# Prüfe ob Index-Objekt existiert
|
|
if faiss_index is None:
|
|
return jsonify({
|
|
'success': True,
|
|
'stats': {
|
|
'initialized': False,
|
|
'message': 'Index wurde noch nicht erstellt. Bitte /init aufrufen.'
|
|
}
|
|
}), 200
|
|
|
|
# Hole Stats mit integrierter Fehlerbehandlung
|
|
stats = faiss_index.get_stats()
|
|
|
|
# Prüfe ob Stats-Abruf Fehler enthält
|
|
if 'error' in stats:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': stats['error'],
|
|
'stats': stats
|
|
}), 500
|
|
|
|
return jsonify({
|
|
'success': True,
|
|
'stats': stats
|
|
}), 200
|
|
|
|
except AttributeError as e:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': f'Attributfehler beim Abrufen der Statistiken: {str(e)}',
|
|
'stats': {'initialized': False}
|
|
}), 500
|
|
|
|
except RuntimeError as e:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': f'Laufzeitfehler beim Abrufen der Statistiken: {str(e)}',
|
|
'stats': {'initialized': False}
|
|
}), 500
|
|
|
|
except Exception as e:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': f'Unerwarteter Fehler beim Abrufen der Statistiken: {str(e)}',
|
|
'stats': {'initialized': False}
|
|
}), 500
|
|
|
|
|
|
if __name__ == '__main__':
|
|
app.run(debug=True, host='0.0.0.0', port=5000) |