Init validate service #45

@@ -2,3 +2,4 @@ API_KEY=
DATABASE_URL=postgresql://admin:admin@db:5432
POSTGRES_PASSWORD=admin
POSTGRES_USER=admin
+COORDINATOR_URL="coordinator:5000"

@@ -18,4 +18,4 @@ COPY . .
ENV PYTHONUNBUFFERED=1
EXPOSE 5000

-CMD ["gunicorn", "--bind", "0.0.0.0:5000", "app:app"]
+CMD ["gunicorn", "--timeout", "10", "--workers", "2", "--bind", "0.0.0.0:5000", "app:app"]

@@ -0,0 +1 @@
+COORDINATOR_URL=""

@@ -0,0 +1,97 @@
# Validate Service

A Flask-based microservice for asynchronously processing and validating entities from two different NLP services (SpaCy and Exxeta).

## How It Works

For each unique ID, the service receives two POST requests from different services:
1. **SpaCy Service** - sends extracted entities
2. **Exxeta Service** - sends extracted entities

The data from the first request is cached; the second request triggers asynchronous processing. Once processing completes, the results are forwarded to a downstream service.

## API Endpoints

### POST /validate

Receives entity data from the SpaCy or Exxeta service (see the client sketch below).

**Request Body:**
```json
{
  "id": "pitch_book_id",
  "service": "spacy|exxeta",
  "entities": [
    {
      "label": "PERSON",
      "entity": "Max Mustermann",
      "page": 1
    }
  ]
}
```

**Response:**
- **200**: Data processed successfully
- **400**: Missing or invalid parameters
- **500**: Server error
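
A minimal client sketch of the two-request flow (assumes the service is running locally on port 5050, as in `app.py`; the ID `pb-1` is a made-up example):

```python
import requests

BASE = "http://localhost:5050"
entity = {"label": "PERSON", "entity": "Max Mustermann", "page": 1}

# First request: the data is cached and the service waits for the second source
r1 = requests.post(
    f"{BASE}/validate",
    json={"id": "pb-1", "service": "spacy", "entities": [entity]},
    timeout=10,
)
print(r1.json()["message"])  # "First dataset ... stored. Waiting for second dataset."

# Second request: triggers the asynchronous merge-and-validate run
r2 = requests.post(
    f"{BASE}/validate",
    json={"id": "pb-1", "service": "exxeta", "entities": [entity]},
    timeout=10,
)
print(r2.json()["message"])  # "Second dataset ... received. Processing started."
```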

## Installation and Startup

1. **Install dependencies:**
```bash
pip install -r requirements.txt
```

2. **Start the service:**
```bash
python app.py
```

The service listens on `http://localhost:5050` by default.

## Configuration

Environment variables (see `.env.example`):

- `COORDINATOR_URL`: host and port of the coordinator service
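
Results are pushed to the coordinator via a PUT request whose URL is built from this variable (see `send_to_coordinator_service` in `app.py`):

```python
coordinator_url = "coordinator:5000"  # from .env; compose-internal hostname
request_id = "pb-1"                   # made-up example ID

# Mirrors the URL construction in send_to_coordinator_service (app.py):
url = "http://" + coordinator_url + "/api/pitch_book/" + str(request_id)
print(url)  # http://coordinator:5000/api/pitch_book/pb-1
```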
|
||||
|
||||
## Verarbeitungslogik
|
||||
|
||||
1. **Zwischenspeicherung**: Beim ersten Request wird das JSON in einem Thread-sicheren Dictionary gespeichert
|
||||
2. **Trigger**: Beim zweiten Request wird die asynchrone Verarbeitung gestartet
|
||||
3. **Merge & Validate**: Die `merge_and_validate_entities` Funktion führt die Validierung durch:
|
||||
- Normalisiert Entitäten (entfernt Zeilenumbrüche, konvertiert zu lowercase)
|
||||
- Matched Entitäten basierend auf Label, normalisiertem Text und Seitenzahl
|
||||
- Kennzeichnet Entitäten als "validated" (beide Services) oder "single-source"
|
||||
4. **Weiterleitung**: Ergebnisse werden an den nächsten Service gesendet
|
||||
5. **Cleanup**: Verarbeitete Daten werden aus dem Speicher entfernt
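
A sketch of the matching behavior, using `merge_entities` from `merge_logic.py` (the sample entities are invented for illustration):

```python
from merge_logic import merge_entities

spacy_data = [{"label": "PERSON", "entity": "Max\nMustermann", "page": 1}]
exxeta_data = [
    {"label": "PERSON", "entity": "max mustermann", "page": 1},
    {"label": "ORG", "entity": "Exxeta", "page": 2},
]

# Both PERSON strings normalize to "maxmustermann", so the pair is
# matched and marked "validated"; the unmatched ORG entity is kept
# as "single-source".
for item in merge_entities(spacy_data, exxeta_data):
    print(item["label"], item["status"])
# PERSON validated
# ORG single-source
```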

## Architecture

```
┌─────────────────┐      ┌─────────────────┐
│  SpaCy Service  │      │ Exxeta Service  │
└─────────┬───────┘      └─────────┬───────┘
          │                        │
          │ POST /validate         │ POST /validate
          │ (service: spacy)       │ (service: exxeta)
          ▼                        ▼
┌─────────────────────────────────────┐
│          Validate Service           │
│   ┌─────────────────────────────┐   │
│   │ Cache                       │   │
│   │ (thread-safe dictionary)    │   │
│   └─────────────────────────────┘   │
│   ┌─────────────────────────────┐   │
│   │ Asynchronous processing     │   │
│   │ (merge + validate entities) │   │
│   └─────────────────────────────┘   │
└─────────────┬───────────────────────┘
              │
              │ POST (processed data)
              ▼
┌─────────────────────────────┐
│    Downstream Service       │
└─────────────────────────────┘
```

@@ -0,0 +1,130 @@
from flask import Flask, request, jsonify
import threading
from merge_logic import merge_entities
from validate_logic import validate_entities
from dotenv import load_dotenv
import os
import requests
import json

app = Flask(__name__)

load_dotenv()
coordinator_url = os.getenv("COORDINATOR_URL") or ""

# TODO: add persistence layer
data_storage = {}  # {id: {spacy_data: [], exxeta_data: []}}

storage_lock = threading.Lock()


def send_to_coordinator_service(processed_data, request_id):
    if not coordinator_url:
        print("Not processed, COORDINATOR_URL is not set", processed_data)
        return

    try:
        payload = {
            "kpi": json.dumps(processed_data),
        }
        requests.put(
            "http://" + coordinator_url + "/api/pitch_book/" + str(request_id),
            data=payload,
            timeout=10,  # don't let the worker thread hang on a dead coordinator
        )
        print(f"Result PitchBook {request_id} sent to coordinator")

    except Exception as e:
        print(f"Error sending ID {request_id}: {e}")


def process_data_async(request_id, spacy_data, exxeta_data):
    try:
        print(f"Start asynchronous processing for PitchBook: {request_id}")

        # Merge the two entity lists, then validate the result
        merged_entities = merge_entities(spacy_data, exxeta_data)
        valid_entities = validate_entities(merged_entities)

        # Send result to the next service
        send_to_coordinator_service(valid_entities, request_id)

        # Remove processed data from storage
        with storage_lock:
            if request_id in data_storage:
                del data_storage[request_id]

    except Exception as e:
        print(f"Error during asynchronous processing for ID {request_id}: {e}")


@app.route("/validate", methods=["POST"])
def validate():
    try:
        json_data = request.get_json()

        if not json_data:
            return jsonify({"error": "Missing JSON data"}), 400

        # Extract ID and service type from the data
        request_id = json_data.get("id")
        service_type = json_data.get("service")  # 'spacy' or 'exxeta'
        entities = json_data.get("entities", [])

        if not request_id or not service_type:
            return jsonify({"error": "'id' and 'service' are required"}), 400

        if service_type not in ["spacy", "exxeta"]:
            return jsonify({"error": "'service' must be 'spacy' or 'exxeta'"}), 400

        with storage_lock:
            # Initialize entry if not already present
            if request_id not in data_storage:
                data_storage[request_id] = {
                    "spacy_data": None,
                    "exxeta_data": None,
                }

            # Store the data based on the service type
            data_storage[request_id][f"{service_type}_data"] = entities

            # Check if both datasets are present
            stored_data = data_storage[request_id]
            spacy_data = stored_data["spacy_data"]
            exxeta_data = stored_data["exxeta_data"]

        # If both datasets are present, start asynchronous processing
        if spacy_data is not None and exxeta_data is not None:
            # Run the processing in a separate daemon thread
            processing_thread = threading.Thread(
                target=process_data_async,
                args=(request_id, spacy_data, exxeta_data),
                daemon=True,
            )
            processing_thread.start()

            return (
                jsonify(
                    {
                        "message": f"Second dataset for ID {request_id} received. Processing started.",
                    }
                ),
                200,
            )
        else:
            return (
                jsonify(
                    {
                        "message": f"First dataset for ID {request_id} from {service_type} stored. Waiting for second dataset.",
                    }
                ),
                200,
            )

    except Exception as e:
        print(f"Error occurred: {str(e)}")
        return jsonify({"error": f"Error: {str(e)}"}), 500


if __name__ == "__main__":
    app.run(debug=True, host="0.0.0.0", port=5050)

@@ -0,0 +1,68 @@
def normalize_entity(entity_str):
    # Lowercase and strip all whitespace (including line breaks), e.g.
    # "Max\nMustermann" and "max mustermann" both become "maxmustermann".
    if not entity_str:
        return ""
    normalized = entity_str.replace("\n", " ")
    normalized = "".join(normalized.lower().split())
    return normalized


def merge_entities(spacy_data, exxeta_data):
    merged = []
    seen = set()

    # Process SpaCy entities first
    for s in spacy_data:
        s_entity_norm = normalize_entity(s["entity"])
        s_page = s["page"]

        # Look for a matching Exxeta entity
        found = False
        for e in exxeta_data:
            e_entity_norm = normalize_entity(e["entity"])
            e_page = e["page"]

            # Match if label, normalized entity, and page all agree
            if (
                s["label"] == e["label"]
                and s_entity_norm == e_entity_norm
                and s_page == e_page
            ):
                merged.append(
                    {
                        "label": s["label"],
                        "entity": s["entity"],
                        "page": s_page,
                        "status": "validated",
                    }
                )
                # Key includes the label so an Exxeta entity with the same
                # text and page but a different label is not skipped below
                seen.add((e["label"], e["entity"], e_page))
                found = True
                break

        # If no match was found, add as single-source
        if not found:
            merged.append(
                {
                    "label": s["label"],
                    "entity": s["entity"],
                    "page": s_page,
                    "status": "single-source",
                    "source": "spacy",
                }
            )

    # Add the remaining Exxeta entities that were not matched above
    for e in exxeta_data:
        if (e["label"], e["entity"], e["page"]) not in seen:
            merged.append(
                {
                    "label": e["label"],
                    "entity": e["entity"],
                    "page": e["page"],
                    "status": "single-source",
                    "source": "exxeta",
                }
            )

    return merged

@@ -0,0 +1,14 @@
blinker==1.9.0
certifi==2025.4.26
charset-normalizer==3.4.2
click==8.2.1
Flask==3.1.1
gunicorn==23.0.0  # assumed pin; the Dockerfile's CMD starts the app via gunicorn
idna==3.10
itsdangerous==2.2.0
Jinja2==3.1.6
MarkupSafe==3.0.2
python-dotenv==1.1.0
requests==2.32.3
urllib3==2.4.0
Werkzeug==3.1.3

@@ -0,0 +1,12 @@
def validate_entities(entities):
    # TODO: real validation rules; pass-through for now. The draft below is
    # intentionally unreachable and uses the actual "label"/"entity" keys of
    # the merged entity dicts.
    return entities
    valid = []
    for entity in entities:
        if entity["label"] == "PERSON":
            if entity["entity"] == "John Doe":
                valid.append(entity)
        elif entity["label"] == "ORG":
            if entity["entity"] == "Exxeta":
                valid.append(entity)
    return valid

@@ -30,7 +30,7 @@ services:
      timeout: 5s
      retries: 5
    ports:
-      - 5000:5000
+      - 5050:5000

  spacy:
    build:

@@ -42,3 +42,11 @@ services:
      dockerfile: ../../Dockerfile
    env_file:
      - .env
+  validate:
+    build:
+      context: backend/validate-service
+      dockerfile: ../../Dockerfile
+    env_file:
+      - .env
+    ports:
+      - 5051:5000