Add validate service with entity merging and validation

Implements a Flask microservice that receives entities from the SpaCy and
Exxeta services, merges them based on normalized text matching, and
forwards the validated results to the coordinator. Also updates the
gunicorn configuration with timeout and worker settings.
pull/45/head
Jaronim Pracht 2025-05-30 13:44:13 +02:00
parent ba191dd0a6
commit df5ac605c2
9 changed files with 333 additions and 2 deletions

View File

@ -2,3 +2,4 @@ API_KEY=
 DATABASE_URL=postgresql://admin:admin@db:5432
 POSTGRES_PASSWORD=admin
 POSTGRES_USER=admin
+COORDINATOR_URL="coordinator"

View File

@ -18,4 +18,4 @@ COPY . .
 ENV PYTHONUNBUFFERED=1
 EXPOSE 5000
-CMD ["gunicorn", "--bind", "0.0.0.0:5000", "app:app"]
+CMD ["gunicorn", "--timeout", "10", "--workers", "2", "--bind", "0.0.0.0:5000", "app:app"]

View File

@ -0,0 +1 @@
COORDINATOR_URL=""

View File

@ -0,0 +1,97 @@
# Validate Service

A Flask-based microservice for asynchronously processing and validating entities from two different NLP services (SpaCy and Exxeta).

## How it works

For each unique ID, the service receives two POST requests from different services:

1. **SpaCy service** - sends extracted entities
2. **Exxeta service** - sends extracted entities

On the first request, the data is cached. On the second request, the asynchronous processing starts. After processing, the results are forwarded to a downstream service.

## API Endpoints

### POST /validate

Receives entity data from the SpaCy or Exxeta service.
**Request Body:**
```json
{
  "id": "pitch_book_id",
  "service": "spacy|exxeta",
  "entities": [
    {
      "label": "PERSON",
      "entity": "Max Mustermann",
      "page": 1
    }
  ]
}
```
**Response:**

- **200**: Data accepted (first dataset stored, or second dataset received and processing started)
- **400**: Missing or invalid parameters
- **500**: Server error
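
For illustration, a minimal client-side sketch (assuming the service is reachable locally on port 5050 and the `requests` library is available):

```python
import requests

BASE_URL = "http://localhost:5050"  # assumed local address of the validate service

entities = [{"label": "PERSON", "entity": "Max Mustermann", "page": 1}]

# First request: the SpaCy dataset is cached, the service waits for the second one
requests.post(f"{BASE_URL}/validate",
              json={"id": "pitch_book_id", "service": "spacy", "entities": entities})

# Second request: the Exxeta dataset arrives and asynchronous processing starts
requests.post(f"{BASE_URL}/validate",
              json={"id": "pitch_book_id", "service": "exxeta", "entities": entities})
```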
## Installation and startup

1. **Install dependencies:**

   ```bash
   pip install -r requirements.txt
   ```

2. **Start the service:**

   ```bash
   python app.py
   ```

By default, the service runs on `http://localhost:5050`.

## Configuration

Environment variables (loaded from `.env` via `python-dotenv`):

- `COORDINATOR_URL`: URL of the coordinator
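
For reference, `COORDINATOR_URL` is only used to build the target of the final PUT request; a small sketch (the value `coordinator` and the id `42` are examples, not fixed defaults):

```python
coordinator_url = "coordinator"  # example value, as set in the repository .env
request_id = 42                  # hypothetical pitch book id

# app.py builds the target like this and PUTs the merged entities,
# JSON-encoded, in the "kpi" form field:
target = "http://" + coordinator_url + "/api/pitch_book/" + str(request_id)
# -> "http://coordinator/api/pitch_book/42"
```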
## Processing logic

1. **Caching**: On the first request, the JSON is stored in a thread-safe dictionary
2. **Trigger**: On the second request, the asynchronous processing is started
3. **Merge & validate**: The `merge_entities` and `validate_entities` functions perform the merging and validation (see the sketch after this list):
   - Normalizes entities (removes line breaks, converts to lowercase)
   - Matches entities based on label, normalized text, and page number
   - Marks entities as "validated" (both services) or "single-source"
4. **Forwarding**: Results are sent to the next service (the coordinator)
5. **Cleanup**: Processed data is removed from memory
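
A small sketch of the normalization step (mirroring `normalize_entity` from `merge_logic.py`):

```python
def normalize_entity(entity_str):
    # remove line breaks, lowercase, and strip all whitespace
    if not entity_str:
        return ""
    return "".join(entity_str.replace("\n", " ").lower().split())

# Two different spellings of the same entity become identical after normalization:
assert normalize_entity("Max\nMustermann") == normalize_entity("max Mustermann")  # "maxmustermann"
```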
## Architecture

```
┌─────────────────┐        ┌─────────────────┐
│  SpaCy Service  │        │ Exxeta Service  │
└────────┬────────┘        └────────┬────────┘
         │                          │
         │ POST /validate           │ POST /validate
         │ (service: spacy)         │ (service: exxeta)
         ▼                          ▼
┌─────────────────────────────────────────────┐
│               Validate Service              │
│  ┌───────────────────────────────────────┐  │
│  │            In-memory cache            │  │
│  │       (thread-safe dictionary)        │  │
│  └───────────────────────────────────────┘  │
│  ┌───────────────────────────────────────┐  │
│  │        Asynchronous processing        │  │
│  │ (merge_entities / validate_entities)  │  │
│  └───────────────────────────────────────┘  │
└──────────────────────┬──────────────────────┘
                       │ PUT (processed data)
                       ▼
        ┌─────────────────────────────┐
        │     Coordinator Service     │
        └─────────────────────────────┘
```

View File

@ -0,0 +1,130 @@
from flask import Flask, request, jsonify
import threading
from merge_logic import merge_entities
from validate_logic import validate_entities
from dotenv import load_dotenv
import os
import requests
import json

app = Flask(__name__)
load_dotenv()

coordinator_url = os.getenv("COORDINATOR_URL") or ""

# todo add persistence layer
data_storage = {}  # {id: {spacy_data: [], exxeta_data: []}}
storage_lock = threading.Lock()


def send_to_coordinator_service(processed_data, request_id):
    if not coordinator_url:
        print("Not processed, missing url", processed_data)
        return
    try:
        payload = {
            "kpi": json.dumps(processed_data),
        }
        requests.put(
            "http://" + coordinator_url + "/api/pitch_book/" + str(request_id),
            data=payload,
        )
        print(f"Result PitchBook {request_id} sent to coordinator")
    except Exception as e:
        print(f"Error sending ID {request_id}: {e}")


def process_data_async(request_id, spacy_data, exxeta_data):
    try:
        print(f"Start asynchronous processing for PitchBook: {request_id}")
        # Perform merge
        merged_entities = merge_entities(spacy_data, exxeta_data)
        valid_entities = validate_entities(merged_entities)
        # Send result to next service
        send_to_coordinator_service(valid_entities, request_id)
        # Remove processed data from storage
        with storage_lock:
            if request_id in data_storage:
                del data_storage[request_id]
    except Exception as e:
        print(f"Error during asynchronous processing for ID {request_id}: {e}")


@app.route("/validate", methods=["POST"])
def validate():
    try:
        json_data = request.get_json()
        if not json_data:
            return jsonify({"error": "Missing JSON data"}), 400
        # extract ID and service_type from the data
        request_id = json_data.get("id")
        service_type = json_data.get("service")  # 'spacy' or 'exxeta'
        entities = json_data.get("entities", [])
        if not request_id or not service_type:
            return jsonify({"error": "'id' and 'service' are required"}), 400
        if service_type not in ["spacy", "exxeta"]:
            return jsonify({"error": "'service' has to be 'spacy' or 'exxeta'"}), 400
        with storage_lock:
            # Initialize entry if not already present
            if request_id not in data_storage:
                data_storage[request_id] = {
                    "spacy_data": None,
                    "exxeta_data": None,
                }
            # Store the data based on the service type
            data_storage[request_id][f"{service_type}_data"] = entities
            # Check if both datasets are present
            stored_data = data_storage[request_id]
            spacy_data = stored_data["spacy_data"]
            exxeta_data = stored_data["exxeta_data"]
            # If both datasets are present, start asynchronous processing
            if spacy_data is not None and exxeta_data is not None:
                # Start asynchronous processing in a separate thread
                processing_thread = threading.Thread(
                    target=process_data_async,
                    args=(request_id, spacy_data, exxeta_data),
                    daemon=True,
                )
                processing_thread.start()
                return (
                    jsonify(
                        {
                            "message": f"Second dataset for ID {request_id} received. Processing started.",
                        }
                    ),
                    200,
                )
            else:
                return (
                    jsonify(
                        {
                            "message": f"First dataset for ID {request_id} from {service_type} stored. Waiting for second dataset.",
                        }
                    ),
                    200,
                )
    except Exception as e:
        print(f"Error occurred: {str(e)}")
        return jsonify({"error": f"Error: {str(e)}"}), 500


if __name__ == "__main__":
    app.run(debug=True, host="0.0.0.0", port=5050)
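
A quick way to exercise the endpoint without the full stack is Flask's test client; a rough smoke-test sketch (hypothetical `test_validate.py`, not part of this commit):

```python
from app import app


def test_two_datasets_trigger_processing():
    client = app.test_client()
    payload = {
        "id": "1",
        "service": "spacy",
        "entities": [{"label": "ORG", "entity": "Exxeta", "page": 3}],
    }
    first = client.post("/validate", json=payload)
    assert first.status_code == 200  # first dataset stored

    payload["service"] = "exxeta"
    second = client.post("/validate", json=payload)
    assert second.status_code == 200  # second dataset received, processing starts
```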

View File

@ -0,0 +1,68 @@
def normalize_entity(entity_str):
    if not entity_str:
        return ""
    normalized = entity_str.replace("\n", " ")
    normalized = "".join(normalized.lower().split())
    return normalized


def merge_entities(spacy_data, exxeta_data):
    merged = []
    seen = set()
    # Process SpaCy entities first
    for s in spacy_data:
        s_entity_norm = normalize_entity(s["entity"])
        s_page = s["page"]
        # Look for matching Exxeta entities
        found = False
        for e in exxeta_data:
            e_entity_norm = normalize_entity(e["entity"])
            e_page = e["page"]
            # Match if normalized entity and page match
            if (
                s["label"] == e["label"]
                and s_entity_norm == e_entity_norm
                and s_page == e_page
            ):
                merged.append(
                    {
                        "label": s["label"],
                        "entity": s["entity"],
                        "page": s_page,
                        "status": "validated",
                    }
                )
                seen.add((e["entity"], e_page))
                found = True
                break
        # If no match found, add as single-source
        if not found:
            merged.append(
                {
                    "label": s["label"],
                    "entity": s["entity"],
                    "page": s_page,
                    "status": "single-source",
                    "source": "spacy",
                }
            )
    # Add remaining Exxeta entities not already processed
    for e in exxeta_data:
        if (e["entity"], e["page"]) not in seen:
            merged.append(
                {
                    "label": e["label"],
                    "entity": e["entity"],
                    "page": e["page"],
                    "status": "single-source",
                    "source": "exxeta",
                }
            )
    return merged
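
A small usage sketch with made-up data, showing the three possible outcomes:

```python
spacy_data = [
    {"label": "PERSON", "entity": "Max Mustermann", "page": 1},
    {"label": "ORG", "entity": "Acme GmbH", "page": 2},
]
exxeta_data = [
    {"label": "PERSON", "entity": "max\nmustermann", "page": 1},
    {"label": "ORG", "entity": "Exxeta", "page": 3},
]

merged = merge_entities(spacy_data, exxeta_data)
# -> Max Mustermann: "validated" (found by both services on page 1)
# -> Acme GmbH:      "single-source", source "spacy"
# -> Exxeta:         "single-source", source "exxeta"
```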

View File

@ -0,0 +1,14 @@
blinker==1.9.0
certifi==2025.4.26
charset-normalizer==3.4.2
click==8.2.1
dotenv==0.9.9
Flask==3.1.1
idna==3.10
itsdangerous==2.2.0
Jinja2==3.1.6
MarkupSafe==3.0.2
python-dotenv==1.1.0
requests==2.32.3
urllib3==2.4.0
Werkzeug==3.1.3

View File

@ -0,0 +1,12 @@
def validate_entities(entities):
    # todo: implement real validation rules; for now, pass everything through
    return entities

    # Unreachable placeholder for future rule-based validation.
    # Keys follow the merged-entity schema ("label"/"entity").
    valid = []
    for entity in entities:
        if entity["label"] == "PERSON":
            if entity["entity"] == "John Doe":
                valid.append(entity)
        elif entity["label"] == "ORG":
            if entity["entity"] == "Exxeta":
                valid.append(entity)
    return valid

View File

@ -30,7 +30,7 @@ services:
       timeout: 5s
       retries: 5
     ports:
-      - 5000:5000
+      - 5050:5000
   spacy:
     build:
@ -42,3 +42,11 @@
       dockerfile: ../../Dockerfile
     env_file:
       - .env
+  validate:
+    build:
+      context: backend/validate-service
+      dockerfile: ../../Dockerfile
+    env_file:
+      - .env
+    ports:
+      - 5051:5000
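
With this port mapping, the validate service should be reachable from the host at port 5051 and from other containers in the compose network at `validate:5000`; for example (hostnames as defined in the compose file, payload as in the README):

```python
import requests

payload = {
    "id": "pitch_book_id",
    "service": "spacy",
    "entities": [{"label": "PERSON", "entity": "Max Mustermann", "page": 1}],
}

# From the host machine (published port):
requests.post("http://localhost:5051/validate", json=payload)

# From another container on the same compose network (container port):
requests.post("http://validate:5000/validate", json=payload)
```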