Init validate service #45

Merged
3023730 merged 2 commits from #12-init-validate-service into main 2025-06-01 12:51:30 +02:00
9 changed files with 333 additions and 2 deletions

View File

@ -2,3 +2,4 @@ API_KEY=
DATABASE_URL=postgresql://admin:admin@db:5432
POSTGRES_PASSWORD=admin
POSTGRES_USER=admin
COORDINATOR_URL="coordinator:5000"

Dockerfile View File

@ -18,4 +18,4 @@ COPY . .
ENV PYTHONUNBUFFERED=1
EXPOSE 5000
CMD ["gunicorn", "--bind", "0.0.0.0:5000", "app:app"]
CMD ["gunicorn","--timeout", "10","--workers", "2", "--bind", "0.0.0.0:5000", "app:app"]

View File

@ -0,0 +1 @@
COORDINATOR_URL=""

backend/validate-service/README.md View File

@ -0,0 +1,97 @@
# Validate Service

A Flask-based microservice for asynchronously processing and validating entities coming from two different NLP services (SpaCy and Exxeta).

## How It Works

For each unique ID, the service receives two POST requests from different services:

1. **SpaCy service** - sends extracted entities
2. **Exxeta service** - sends extracted entities

The data from the first request is cached. The second request triggers the asynchronous processing. Once processing has finished, the results are forwarded to a downstream service.
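A minimal sketch of this two-step exchange, assuming the service runs locally on port 5050 (the ID and the entities below are made-up examples):

```python
import requests

BASE_URL = "http://localhost:5050"  # assumed local address of the validate service

spacy_payload = {
    "id": "pb-123",  # hypothetical pitch book ID
    "service": "spacy",
    "entities": [{"label": "PERSON", "entity": "Max Mustermann", "page": 1}],
}
exxeta_payload = {
    "id": "pb-123",
    "service": "exxeta",
    "entities": [{"label": "PERSON", "entity": "Max\nMustermann", "page": 1}],
}

# The first request is cached, the second one triggers the asynchronous merge.
print(requests.post(f"{BASE_URL}/validate", json=spacy_payload).json())
print(requests.post(f"{BASE_URL}/validate", json=exxeta_payload).json())
```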
## API Endpoints

### POST /validate

Receives entity data from the SpaCy or Exxeta service.

**Request Body:**

```json
{
  "id": "pitch_book_id",
  "service": "spacy|exxeta",
  "entities": [
    {
      "label": "PERSON",
      "entity": "Max Mustermann",
      "page": 1
    }
  ]
}
```

**Response:**

- **200**: Data processed successfully
- **400**: Missing or invalid parameters (see the example below)
- **500**: Server error
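For illustration, a short sketch of the 400 cases as checked in `app.py` (the local address is an assumption):

```python
import requests

BASE_URL = "http://localhost:5050"  # assumed local address

# Missing "service" field -> 400, {"error": "ID and service_type are required"}
r = requests.post(f"{BASE_URL}/validate", json={"id": "pb-123", "entities": []})
print(r.status_code, r.json())

# Unknown service name -> 400, {"error": "service_type has to be 'spacy' or 'exxeta'"}
r = requests.post(f"{BASE_URL}/validate", json={"id": "pb-123", "service": "bert", "entities": []})
print(r.status_code, r.json())
```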
## Installation and Startup

1. **Install the dependencies:**
```bash
pip install -r requirements.txt
```
2. **Start the service:**
```bash
python app.py
```

The service listens on `http://localhost:5050` by default.
## Configuration

Environment variables (loaded from `.env` via `python-dotenv`):

- `COORDINATOR_URL`: URL of the coordinator service
## Processing Logic

1. **Caching**: On the first request, the JSON payload is stored in a thread-safe dictionary
2. **Trigger**: The second request starts the asynchronous processing
3. **Merge & Validate**: The `merge_entities` and `validate_entities` functions perform the validation (see the sketch after this list):
   - Normalizes entities (removes line breaks and whitespace, converts to lowercase)
   - Matches entities by label, normalized text and page number
   - Marks entities as "validated" (found by both services) or "single-source"
4. **Forwarding**: The results are sent to the next service
5. **Cleanup**: Processed data is removed from the in-memory storage
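A minimal sketch of the matching behaviour using `merge_entities` from `merge_logic.py` (the sample entities are made up):

```python
from merge_logic import merge_entities

spacy_entities = [
    {"label": "PERSON", "entity": "Max Mustermann", "page": 1},
    {"label": "ORG", "entity": "ACME GmbH", "page": 2},
]
exxeta_entities = [
    # Same person, different casing and a line break -> still matches after normalization
    {"label": "PERSON", "entity": "max\nmustermann", "page": 1},
]

for item in merge_entities(spacy_entities, exxeta_entities):
    print(item)
# {'label': 'PERSON', 'entity': 'Max Mustermann', 'page': 1, 'status': 'validated'}
# {'label': 'ORG', 'entity': 'ACME GmbH', 'page': 2, 'status': 'single-source', 'source': 'spacy'}
```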
## Architecture

```
┌─────────────────┐     ┌─────────────────┐
│  SpaCy Service  │     │ Exxeta Service  │
└────────┬────────┘     └────────┬────────┘
         │                       │
         │ POST /validate        │ POST /validate
         │ (service: spacy)      │ (service: exxeta)
         ▼                       ▼
┌───────────────────────────────────────────┐
│             Validate Service              │
│  ┌─────────────────────────────────────┐  │
│  │ Cache                               │  │
│  │ (thread-safe dictionary)            │  │
│  └─────────────────────────────────────┘  │
│  ┌─────────────────────────────────────┐  │
│  │ Asynchronous processing             │  │
│  │ (merge_entities, validate_entities) │  │
│  └─────────────────────────────────────┘  │
└────────────────────┬──────────────────────┘
                     │ PUT (processed data)
                     ▼
       ┌───────────────────────────┐
       │    Coordinator Service    │
       └───────────────────────────┘
```

backend/validate-service/app.py View File

@ -0,0 +1,130 @@
from flask import Flask, request, jsonify
import threading
from merge_logic import merge_entities
from validate_logic import validate_entities
from dotenv import load_dotenv
import os
import requests
import json

app = Flask(__name__)
load_dotenv()

coordinator_url = os.getenv("COORDINATOR_URL") or ""

# todo add persistence layer
data_storage = {}  # {id: {spacy_data: [], exxeta_data: []}}
storage_lock = threading.Lock()


def send_to_coordinator_service(processed_data, request_id):
    if not coordinator_url:
        print("Not processed, missing url", processed_data)
        return
    try:
        payload = {
            "kpi": json.dumps(processed_data),
        }
        requests.put(
            "http://" + coordinator_url + "/api/pitch_book/" + str(request_id),
            data=payload,
        )
        print(f"Result PitchBook {request_id} sent to coordinator")
    except Exception as e:
        print(f"Error sending ID {request_id}: {e}")


def process_data_async(request_id, spacy_data, exxeta_data):
    try:
        print(f"Start asynchronous processing for PitchBook: {request_id}")
        # Perform merge
        merged_entities = merge_entities(spacy_data, exxeta_data)
        valid_entities = validate_entities(merged_entities)
        # Send result to next service
        send_to_coordinator_service(valid_entities, request_id)
        # Remove processed data from storage
        with storage_lock:
            if request_id in data_storage:
                del data_storage[request_id]
    except Exception as e:
        print(f"Error during asynchronous processing for ID {request_id}: {e}")


@app.route("/validate", methods=["POST"])
def validate():
    try:
        json_data = request.get_json()
        if not json_data:
            return jsonify({"error": "Missing JSON data"}), 400

        # Extract ID and service type from the data
        request_id = json_data.get("id")
        service_type = json_data.get("service")  # 'spacy' or 'exxeta'
        entities = json_data.get("entities", [])

        if not request_id or not service_type:
            return jsonify({"error": "ID and service_type are required"}), 400
        if service_type not in ["spacy", "exxeta"]:
            return jsonify({"error": "service_type has to be 'spacy' or 'exxeta'"}), 400

        with storage_lock:
            # Initialize entry if not already present
            if request_id not in data_storage:
                data_storage[request_id] = {
                    "spacy_data": None,
                    "exxeta_data": None,
                }
            # Store the data based on the service type
            data_storage[request_id][f"{service_type}_data"] = entities

            # Check if both datasets are present
            stored_data = data_storage[request_id]
            spacy_data = stored_data["spacy_data"]
            exxeta_data = stored_data["exxeta_data"]

            # If both datasets are present, start asynchronous processing
            if spacy_data is not None and exxeta_data is not None:
                # Start asynchronous processing in a separate thread
                processing_thread = threading.Thread(
                    target=process_data_async,
                    args=(request_id, spacy_data, exxeta_data),
                    daemon=True,
                )
                processing_thread.start()
                return (
                    jsonify(
                        {
                            "message": f"Second dataset for ID {request_id} received. Processing started.",
                        }
                    ),
                    200,
                )
            else:
                return (
                    jsonify(
                        {
                            "message": f"First dataset for ID {request_id} from {service_type} stored. Waiting for second dataset.",
                        }
                    ),
                    200,
                )
    except Exception as e:
        print(f"Error occurred: {str(e)}")
        return jsonify({"error": f"Error: {str(e)}"}), 500


if __name__ == "__main__":
    app.run(debug=True, host="0.0.0.0", port=5050)

backend/validate-service/merge_logic.py View File

@ -0,0 +1,68 @@
def normalize_entity(entity_str):
    if not entity_str:
        return ""
    normalized = entity_str.replace("\n", " ")
    normalized = "".join(normalized.lower().split())
    return normalized


def merge_entities(spacy_data, exxeta_data):
    merged = []
    seen = set()
    # Process SpaCy entities first
    for s in spacy_data:
        s_entity_norm = normalize_entity(s["entity"])
        s_page = s["page"]
        # Look for matching Exxeta entities
        found = False
        for e in exxeta_data:
            e_entity_norm = normalize_entity(e["entity"])
            e_page = e["page"]
            # Match if label, normalized entity text and page match
            if (
                s["label"] == e["label"]
                and s_entity_norm == e_entity_norm
                and s_page == e_page
            ):
                merged.append(
                    {
                        "label": s["label"],
                        "entity": s["entity"],
                        "page": s_page,
                        "status": "validated",
                    }
                )
                seen.add((e["entity"], e_page))
                found = True
                break
        # If no match found, add as single-source
        if not found:
            merged.append(
                {
                    "label": s["label"],
                    "entity": s["entity"],
                    "page": s_page,
                    "status": "single-source",
                    "source": "spacy",
                }
            )
    # Add remaining Exxeta entities not already processed
    for e in exxeta_data:
        if (e["entity"], e["page"]) not in seen:
            merged.append(
                {
                    "label": e["label"],
                    "entity": e["entity"],
                    "page": e["page"],
                    "status": "single-source",
                    "source": "exxeta",
                }
            )
    return merged
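For reference, a small sketch of what `normalize_entity` produces for a few made-up inputs:

```python
from merge_logic import normalize_entity

print(normalize_entity("Max\nMustermann"))  # -> "maxmustermann"
print(normalize_entity("  Exxeta  AG "))    # -> "exxetaag"
print(normalize_entity(""))                 # -> ""
```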

backend/validate-service/requirements.txt View File

@ -0,0 +1,14 @@
blinker==1.9.0
certifi==2025.4.26
charset-normalizer==3.4.2
click==8.2.1
dotenv==0.9.9
Flask==3.1.1
idna==3.10
itsdangerous==2.2.0
Jinja2==3.1.6
MarkupSafe==3.0.2
python-dotenv==1.1.0
requests==2.32.3
urllib3==2.4.0
Werkzeug==3.1.3

backend/validate-service/validate_logic.py View File

@ -0,0 +1,12 @@
def validate_entities(entities):
    return entities

    # todo: unreachable placeholder for a future whitelist-style check
    valid = []
    for entity in entities:
        if entity["label"] == "PERSON":
            if entity["entity"] == "John Doe":
                valid.append(entity)
        elif entity["label"] == "ORG":
            if entity["entity"] == "Exxeta":
                valid.append(entity)
    return valid

View File

@ -30,7 +30,7 @@ services:
      timeout: 5s
      retries: 5
    ports:
      - 5000:5000
      - 5050:5000
  spacy:
    build:
@ -42,3 +42,11 @@ services:
      dockerfile: ../../Dockerfile
    env_file:
      - .env
  validate:
    build:
      context: backend/validate-service
      dockerfile: ../../Dockerfile
    env_file:
      - .env
    ports:
      - 5051:5000