Init validate service #45
|
|
@ -2,3 +2,4 @@ API_KEY=
|
||||||
DATABASE_URL=postgresql://admin:admin@db:5432
|
DATABASE_URL=postgresql://admin:admin@db:5432
|
||||||
POSTGRES_PASSWORD=admin
|
POSTGRES_PASSWORD=admin
|
||||||
POSTGRES_USER=admin
|
POSTGRES_USER=admin
|
||||||
|
COORDINATOR_URL="coordinator:5000"
|
||||||
|
|
|
||||||
|
|
@ -18,4 +18,4 @@ COPY . .
|
||||||
ENV PYTHONUNBUFFERED=1
|
ENV PYTHONUNBUFFERED=1
|
||||||
EXPOSE 5000
|
EXPOSE 5000
|
||||||
|
|
||||||
CMD ["gunicorn", "--bind", "0.0.0.0:5000", "app:app"]
|
CMD ["gunicorn","--timeout", "10","--workers", "2", "--bind", "0.0.0.0:5000", "app:app"]
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1 @@
|
||||||
|
COORDINATOR_URL=""
|
||||||
|
|
@ -0,0 +1,97 @@
|
||||||
|
# Validate Service
|
||||||
|
|
||||||
|
Ein Flask-basierter Microservice zur asynchronen Verarbeitung und Validierung von Entitäten aus zwei verschiedenen NLP-Services (SpaCy und Exxeta).
|
||||||
|
|
||||||
|
## Funktionsweise
|
||||||
|
|
||||||
|
Das Service empfängt für jede eindeutige ID zwei POST-Requests von verschiedenen Services:
|
||||||
|
1. **SpaCy Service** - sendet extrahierte Entitäten
|
||||||
|
2. **Exxeta Service** - sendet extrahierte Entitäten
|
||||||
|
|
||||||
|
Beim ersten Request werden die Daten zwischengespeichert. Beim zweiten Request startet die asynchrone Verarbeitung. Nach der Verarbeitung werden die Ergebnisse an einen nachgelagerten Service weitergeleitet.
|
||||||
|
|
||||||
|
## API Endpoints
|
||||||
|
|
||||||
|
### POST /validate
|
||||||
|
|
||||||
|
Empfängt Entitätsdaten von SpaCy oder Exxeta Services.
|
||||||
|
|
||||||
|
**Request Body:**
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"id": "pitch_book_id",
|
||||||
|
"service": "spacy|exxeta",
|
||||||
|
"entities": [
|
||||||
|
{
|
||||||
|
"label": "PERSON",
|
||||||
|
"entity": "Max Mustermann",
|
||||||
|
"page": 1
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
**Response:**
|
||||||
|
- **200**: Daten erfolgreich verarbeitet
|
||||||
|
- **400**: Fehlende oder ungültige Parameter
|
||||||
|
- **500**: Serverfehler
|
||||||
|
|
||||||
|
## Installation und Start
|
||||||
|
|
||||||
|
1. **Abhängigkeiten installieren:**
|
||||||
|
```bash
|
||||||
|
pip install -r requirements.txt
|
||||||
|
```
|
||||||
|
|
||||||
|
2. **Service starten:**
|
||||||
|
```bash
|
||||||
|
python app.py
|
||||||
|
```
|
||||||
|
|
||||||
|
Das Service läuft standardmäßig auf `http://localhost:5050`
|
||||||
|
|
||||||
|
## Konfiguration
|
||||||
|
|
||||||
|
Umgebungsvariablen in `config.py`:
|
||||||
|
|
||||||
|
- `COORDINATOR_URL`: URL des Koordinators
|
||||||
|
|
||||||
|
## Verarbeitungslogik
|
||||||
|
|
||||||
|
1. **Zwischenspeicherung**: Beim ersten Request wird das JSON in einem Thread-sicheren Dictionary gespeichert
|
||||||
|
2. **Trigger**: Beim zweiten Request wird die asynchrone Verarbeitung gestartet
|
||||||
|
3. **Merge & Validate**: Die `merge_and_validate_entities` Funktion führt die Validierung durch:
|
||||||
|
- Normalisiert Entitäten (entfernt Zeilenumbrüche, konvertiert zu lowercase)
|
||||||
|
- Matched Entitäten basierend auf Label, normalisiertem Text und Seitenzahl
|
||||||
|
- Kennzeichnet Entitäten als "validated" (beide Services) oder "single-source"
|
||||||
|
4. **Weiterleitung**: Ergebnisse werden an den nächsten Service gesendet
|
||||||
|
5. **Cleanup**: Verarbeitete Daten werden aus dem Speicher entfernt
|
||||||
|
|
||||||
|
## Architektur
|
||||||
|
|
||||||
|
```
|
||||||
|
┌─────────────────┐ ┌─────────────────┐
|
||||||
|
│ SpaCy Service │ │ Exxeta Service │
|
||||||
|
└─────────┬───────┘ └─────────┬───────┘
|
||||||
|
│ │
|
||||||
|
│ POST /validate │ POST /validate
|
||||||
|
│ (service_type:spacy) │ (service_type:exxeta)
|
||||||
|
▼ ▼
|
||||||
|
┌─────────────────────────────────────┐
|
||||||
|
│ Validate Service │
|
||||||
|
│ ┌─────────────────────────────┐ │
|
||||||
|
│ │ Zwischenspeicher │ │
|
||||||
|
│ │ (Thread-safe Dictionary) │ │
|
||||||
|
│ └─────────────────────────────┘ │
|
||||||
|
│ ┌─────────────────────────────┐ │
|
||||||
|
│ │ Asynchrone Verarbeitung │ │
|
||||||
|
│ │ (merge_and_validate_entities)│ │
|
||||||
|
│ └─────────────────────────────┘ │
|
||||||
|
└─────────────┬───────────────────────┘
|
||||||
|
│
|
||||||
|
│ POST (processed data)
|
||||||
|
▼
|
||||||
|
┌─────────────────────────────┐
|
||||||
|
│ Nachgelagerter Service │
|
||||||
|
└─────────────────────────────┘
|
||||||
|
```
|
||||||
|
|
@ -0,0 +1,130 @@
|
||||||
|
from flask import Flask, request, jsonify
|
||||||
|
import threading
|
||||||
|
from merge_logic import merge_entities
|
||||||
|
from validate_logic import validate_entities
|
||||||
|
from dotenv import load_dotenv
|
||||||
|
import os
|
||||||
|
import requests
|
||||||
|
import json
|
||||||
|
|
||||||
|
app = Flask(__name__)
|
||||||
|
|
||||||
|
load_dotenv()
|
||||||
|
coordinator_url = os.getenv("COORDINATOR_URL") or ""
|
||||||
|
|
||||||
|
# todo add persistence layer
|
||||||
|
data_storage = {} # {id: {spacy_data: [], exxeta_data: []}}
|
||||||
|
|
||||||
|
storage_lock = threading.Lock()
|
||||||
|
|
||||||
|
|
||||||
|
def send_to_coordinator_service(processed_data, request_id):
|
||||||
|
if not coordinator_url:
|
||||||
|
print("Not processed, missing url", processed_data)
|
||||||
|
return
|
||||||
|
|
||||||
|
try:
|
||||||
|
payload = {
|
||||||
|
"kpi": json.dumps(processed_data),
|
||||||
|
}
|
||||||
|
requests.put(
|
||||||
|
"http://" + coordinator_url + "/api/pitch_book/" + str(request_id),
|
||||||
|
data=payload,
|
||||||
|
)
|
||||||
|
print(f"Result PitchBook {request_id} sent to coordinator")
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Error sending ID {request_id}: {e}")
|
||||||
|
|
||||||
|
|
||||||
|
def process_data_async(request_id, spacy_data, exxeta_data):
|
||||||
|
try:
|
||||||
|
print(f"Start asynchronous processing for PitchBook: {request_id}")
|
||||||
|
|
||||||
|
# Perform merge
|
||||||
|
merged_entities = merge_entities(spacy_data, exxeta_data)
|
||||||
|
valid_entities = validate_entities(merged_entities)
|
||||||
|
|
||||||
|
# Send result to next service
|
||||||
|
send_to_coordinator_service(valid_entities, request_id)
|
||||||
|
|
||||||
|
# Remove processed data from storage
|
||||||
|
with storage_lock:
|
||||||
|
if request_id in data_storage:
|
||||||
|
del data_storage[request_id]
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Error during asynchronous processing for ID {request_id}: {e}")
|
||||||
|
|
||||||
|
|
||||||
|
@app.route("/validate", methods=["POST"])
|
||||||
|
def validate():
|
||||||
|
try:
|
||||||
|
json_data = request.get_json()
|
||||||
|
|
||||||
|
if not json_data:
|
||||||
|
return jsonify({"error": "Missing JSON data"}), 400
|
||||||
|
|
||||||
|
# extract ID and service_type from the data
|
||||||
|
request_id = json_data.get("id")
|
||||||
|
service_type = json_data.get("service") # 'spacy' or 'exxeta'
|
||||||
|
entities = json_data.get("entities", [])
|
||||||
|
|
||||||
|
if not request_id or not service_type:
|
||||||
|
return jsonify({"error": "ID and service_type are required"}), 400
|
||||||
|
|
||||||
|
if service_type not in ["spacy", "exxeta"]:
|
||||||
|
return jsonify({"error": "service_type has to be 'spacy' or 'exxeta'"}), 400
|
||||||
|
|
||||||
|
with storage_lock:
|
||||||
|
# Initialize entry if not already present
|
||||||
|
if request_id not in data_storage:
|
||||||
|
data_storage[request_id] = {
|
||||||
|
"spacy_data": None,
|
||||||
|
"exxeta_data": None,
|
||||||
|
}
|
||||||
|
|
||||||
|
# Store the data based on the service type
|
||||||
|
data_storage[request_id][f"{service_type}_data"] = entities
|
||||||
|
|
||||||
|
# Check if both datasets are present
|
||||||
|
stored_data = data_storage[request_id]
|
||||||
|
spacy_data = stored_data["spacy_data"]
|
||||||
|
exxeta_data = stored_data["exxeta_data"]
|
||||||
|
|
||||||
|
# If both datasets are present, start asynchronous processing
|
||||||
|
if spacy_data is not None and exxeta_data is not None:
|
||||||
|
|
||||||
|
# Start asynchronous processing in a separate thread
|
||||||
|
processing_thread = threading.Thread(
|
||||||
|
target=process_data_async,
|
||||||
|
args=(request_id, spacy_data, exxeta_data),
|
||||||
|
daemon=True,
|
||||||
|
)
|
||||||
|
processing_thread.start()
|
||||||
|
|
||||||
|
return (
|
||||||
|
jsonify(
|
||||||
|
{
|
||||||
|
"message": f"Second dataset for ID {request_id} received. Processing started.",
|
||||||
|
}
|
||||||
|
),
|
||||||
|
200,
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
return (
|
||||||
|
jsonify(
|
||||||
|
{
|
||||||
|
"message": f"First dataset for ID {request_id} from {service_type} stored. Waiting for second dataset.",
|
||||||
|
}
|
||||||
|
),
|
||||||
|
200,
|
||||||
|
)
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Error occurred: {str(e)}")
|
||||||
|
return jsonify({"error": f"Fehler: {str(e)}"}), 500
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
app.run(debug=True, host="0.0.0.0", port=5050)
|
||||||
|
|
@ -0,0 +1,68 @@
|
||||||
|
def normalize_entity(entity_str):
|
||||||
|
if not entity_str:
|
||||||
|
return ""
|
||||||
|
normalized = entity_str.replace("\n", " ")
|
||||||
|
normalized = "".join(normalized.lower().split())
|
||||||
|
return normalized
|
||||||
|
|
||||||
|
|
||||||
|
def merge_entities(spacy_data, exxeta_data):
|
||||||
|
merged = []
|
||||||
|
seen = set()
|
||||||
|
|
||||||
|
# Process SpaCy entities first
|
||||||
|
for s in spacy_data:
|
||||||
|
s_entity_norm = normalize_entity(s["entity"])
|
||||||
|
s_page = s["page"]
|
||||||
|
|
||||||
|
# Look for matching Exxeta entities
|
||||||
|
found = False
|
||||||
|
for e in exxeta_data:
|
||||||
|
e_entity_norm = normalize_entity(e["entity"])
|
||||||
|
e_page = e["page"]
|
||||||
|
|
||||||
|
# Match if normalized entity and page match
|
||||||
|
if (
|
||||||
|
s["label"] == e["label"]
|
||||||
|
and s_entity_norm == e_entity_norm
|
||||||
|
and s_page == e_page
|
||||||
|
):
|
||||||
|
|
||||||
|
merged.append(
|
||||||
|
{
|
||||||
|
"label": s["label"],
|
||||||
|
"entity": s["entity"],
|
||||||
|
"page": s_page,
|
||||||
|
"status": "validated",
|
||||||
|
}
|
||||||
|
)
|
||||||
|
seen.add((e["entity"], e_page))
|
||||||
|
found = True
|
||||||
|
break
|
||||||
|
|
||||||
|
# If no match found, add as single-source
|
||||||
|
if not found:
|
||||||
|
merged.append(
|
||||||
|
{
|
||||||
|
"label": s["label"],
|
||||||
|
"entity": s["entity"],
|
||||||
|
"page": s_page,
|
||||||
|
"status": "single-source",
|
||||||
|
"source": "spacy",
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
# Add remaining Exxeta entities not already processed
|
||||||
|
for e in exxeta_data:
|
||||||
|
if (e["entity"], e["page"]) not in seen:
|
||||||
|
merged.append(
|
||||||
|
{
|
||||||
|
"label": e["label"],
|
||||||
|
"entity": e["entity"],
|
||||||
|
"page": e["page"],
|
||||||
|
"status": "single-source",
|
||||||
|
"source": "exxeta",
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
return merged
|
||||||
|
|
@ -0,0 +1,14 @@
|
||||||
|
blinker==1.9.0
|
||||||
|
certifi==2025.4.26
|
||||||
|
charset-normalizer==3.4.2
|
||||||
|
click==8.2.1
|
||||||
|
dotenv==0.9.9
|
||||||
|
Flask==3.1.1
|
||||||
|
idna==3.10
|
||||||
|
itsdangerous==2.2.0
|
||||||
|
Jinja2==3.1.6
|
||||||
|
MarkupSafe==3.0.2
|
||||||
|
python-dotenv==1.1.0
|
||||||
|
requests==2.32.3
|
||||||
|
urllib3==2.4.0
|
||||||
|
Werkzeug==3.1.3
|
||||||
|
|
@ -0,0 +1,12 @@
|
||||||
|
def validate_entities(entities):
|
||||||
|
return entities
|
||||||
|
#todo
|
||||||
|
valid = []
|
||||||
|
for entity in entities:
|
||||||
|
if entity["type"] == "PERSON":
|
||||||
|
if entity["name"] == "John Doe":
|
||||||
|
valid.append(entity)
|
||||||
|
elif entity["type"] == "ORG":
|
||||||
|
if entity["name"] == "Exxeta":
|
||||||
|
valid.append(entity)
|
||||||
|
return valid
|
||||||
|
|
@ -30,7 +30,7 @@ services:
|
||||||
timeout: 5s
|
timeout: 5s
|
||||||
retries: 5
|
retries: 5
|
||||||
ports:
|
ports:
|
||||||
- 5000:5000
|
- 5050:5000
|
||||||
|
|
||||||
spacy:
|
spacy:
|
||||||
build:
|
build:
|
||||||
|
|
@ -42,3 +42,11 @@ services:
|
||||||
dockerfile: ../../Dockerfile
|
dockerfile: ../../Dockerfile
|
||||||
env_file:
|
env_file:
|
||||||
- .env
|
- .env
|
||||||
|
validate:
|
||||||
|
build:
|
||||||
|
context: backend/validate-service
|
||||||
|
dockerfile: ../../Dockerfile
|
||||||
|
env_file:
|
||||||
|
- .env
|
||||||
|
ports:
|
||||||
|
- 5051:5000
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue