Compare commits
No commits in common. "7180db773ea800ef0e1069b23efe6f6da6969dc7" and "ba191dd0a647f3e78ef1d034ebd254ace61234e7" have entirely different histories.
7180db773e
...
ba191dd0a6
|
|
@ -2,4 +2,3 @@ API_KEY=
|
||||||
DATABASE_URL=postgresql://admin:admin@db:5432
|
DATABASE_URL=postgresql://admin:admin@db:5432
|
||||||
POSTGRES_PASSWORD=admin
|
POSTGRES_PASSWORD=admin
|
||||||
POSTGRES_USER=admin
|
POSTGRES_USER=admin
|
||||||
COORDINATOR_URL="coordinator:5000"
|
|
||||||
|
|
|
||||||
|
|
@ -18,4 +18,4 @@ COPY . .
|
||||||
ENV PYTHONUNBUFFERED=1
|
ENV PYTHONUNBUFFERED=1
|
||||||
EXPOSE 5000
|
EXPOSE 5000
|
||||||
|
|
||||||
CMD ["gunicorn","--timeout", "10","--workers", "2", "--bind", "0.0.0.0:5000", "app:app"]
|
CMD ["gunicorn", "--bind", "0.0.0.0:5000", "app:app"]
|
||||||
|
|
|
||||||
|
|
@ -1 +0,0 @@
|
||||||
COORDINATOR_URL=""
|
|
||||||
|
|
@ -1,97 +0,0 @@
|
||||||
# Validate Service
|
|
||||||
|
|
||||||
Ein Flask-basierter Microservice zur asynchronen Verarbeitung und Validierung von Entitäten aus zwei verschiedenen NLP-Services (SpaCy und Exxeta).
|
|
||||||
|
|
||||||
## Funktionsweise
|
|
||||||
|
|
||||||
Das Service empfängt für jede eindeutige ID zwei POST-Requests von verschiedenen Services:
|
|
||||||
1. **SpaCy Service** - sendet extrahierte Entitäten
|
|
||||||
2. **Exxeta Service** - sendet extrahierte Entitäten
|
|
||||||
|
|
||||||
Beim ersten Request werden die Daten zwischengespeichert. Beim zweiten Request startet die asynchrone Verarbeitung. Nach der Verarbeitung werden die Ergebnisse an einen nachgelagerten Service weitergeleitet.
|
|
||||||
|
|
||||||
## API Endpoints
|
|
||||||
|
|
||||||
### POST /validate
|
|
||||||
|
|
||||||
Empfängt Entitätsdaten von SpaCy oder Exxeta Services.
|
|
||||||
|
|
||||||
**Request Body:**
|
|
||||||
```json
|
|
||||||
{
|
|
||||||
"id": "pitch_book_id",
|
|
||||||
"service": "spacy|exxeta",
|
|
||||||
"entities": [
|
|
||||||
{
|
|
||||||
"label": "PERSON",
|
|
||||||
"entity": "Max Mustermann",
|
|
||||||
"page": 1
|
|
||||||
}
|
|
||||||
]
|
|
||||||
}
|
|
||||||
```
|
|
||||||
|
|
||||||
**Response:**
|
|
||||||
- **200**: Daten erfolgreich verarbeitet
|
|
||||||
- **400**: Fehlende oder ungültige Parameter
|
|
||||||
- **500**: Serverfehler
|
|
||||||
|
|
||||||
## Installation und Start
|
|
||||||
|
|
||||||
1. **Abhängigkeiten installieren:**
|
|
||||||
```bash
|
|
||||||
pip install -r requirements.txt
|
|
||||||
```
|
|
||||||
|
|
||||||
2. **Service starten:**
|
|
||||||
```bash
|
|
||||||
python app.py
|
|
||||||
```
|
|
||||||
|
|
||||||
Das Service läuft standardmäßig auf `http://localhost:5050`
|
|
||||||
|
|
||||||
## Konfiguration
|
|
||||||
|
|
||||||
Umgebungsvariablen in `config.py`:
|
|
||||||
|
|
||||||
- `COORDINATOR_URL`: URL des Koordinators
|
|
||||||
|
|
||||||
## Verarbeitungslogik
|
|
||||||
|
|
||||||
1. **Zwischenspeicherung**: Beim ersten Request wird das JSON in einem Thread-sicheren Dictionary gespeichert
|
|
||||||
2. **Trigger**: Beim zweiten Request wird die asynchrone Verarbeitung gestartet
|
|
||||||
3. **Merge & Validate**: Die `merge_and_validate_entities` Funktion führt die Validierung durch:
|
|
||||||
- Normalisiert Entitäten (entfernt Zeilenumbrüche, konvertiert zu lowercase)
|
|
||||||
- Matched Entitäten basierend auf Label, normalisiertem Text und Seitenzahl
|
|
||||||
- Kennzeichnet Entitäten als "validated" (beide Services) oder "single-source"
|
|
||||||
4. **Weiterleitung**: Ergebnisse werden an den nächsten Service gesendet
|
|
||||||
5. **Cleanup**: Verarbeitete Daten werden aus dem Speicher entfernt
|
|
||||||
|
|
||||||
## Architektur
|
|
||||||
|
|
||||||
```
|
|
||||||
┌─────────────────┐ ┌─────────────────┐
|
|
||||||
│ SpaCy Service │ │ Exxeta Service │
|
|
||||||
└─────────┬───────┘ └─────────┬───────┘
|
|
||||||
│ │
|
|
||||||
│ POST /validate │ POST /validate
|
|
||||||
│ (service_type:spacy) │ (service_type:exxeta)
|
|
||||||
▼ ▼
|
|
||||||
┌─────────────────────────────────────┐
|
|
||||||
│ Validate Service │
|
|
||||||
│ ┌─────────────────────────────┐ │
|
|
||||||
│ │ Zwischenspeicher │ │
|
|
||||||
│ │ (Thread-safe Dictionary) │ │
|
|
||||||
│ └─────────────────────────────┘ │
|
|
||||||
│ ┌─────────────────────────────┐ │
|
|
||||||
│ │ Asynchrone Verarbeitung │ │
|
|
||||||
│ │ (merge_and_validate_entities)│ │
|
|
||||||
│ └─────────────────────────────┘ │
|
|
||||||
└─────────────┬───────────────────────┘
|
|
||||||
│
|
|
||||||
│ POST (processed data)
|
|
||||||
▼
|
|
||||||
┌─────────────────────────────┐
|
|
||||||
│ Nachgelagerter Service │
|
|
||||||
└─────────────────────────────┘
|
|
||||||
```
|
|
||||||
|
|
@ -1,130 +0,0 @@
|
||||||
from flask import Flask, request, jsonify
|
|
||||||
import threading
|
|
||||||
from merge_logic import merge_entities
|
|
||||||
from validate_logic import validate_entities
|
|
||||||
from dotenv import load_dotenv
|
|
||||||
import os
|
|
||||||
import requests
|
|
||||||
import json
|
|
||||||
|
|
||||||
app = Flask(__name__)
|
|
||||||
|
|
||||||
load_dotenv()
|
|
||||||
coordinator_url = os.getenv("COORDINATOR_URL") or ""
|
|
||||||
|
|
||||||
# todo add persistence layer
|
|
||||||
data_storage = {} # {id: {spacy_data: [], exxeta_data: []}}
|
|
||||||
|
|
||||||
storage_lock = threading.Lock()
|
|
||||||
|
|
||||||
|
|
||||||
def send_to_coordinator_service(processed_data, request_id):
|
|
||||||
if not coordinator_url:
|
|
||||||
print("Not processed, missing url", processed_data)
|
|
||||||
return
|
|
||||||
|
|
||||||
try:
|
|
||||||
payload = {
|
|
||||||
"kpi": json.dumps(processed_data),
|
|
||||||
}
|
|
||||||
requests.put(
|
|
||||||
"http://" + coordinator_url + "/api/pitch_book/" + str(request_id),
|
|
||||||
data=payload,
|
|
||||||
)
|
|
||||||
print(f"Result PitchBook {request_id} sent to coordinator")
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
print(f"Error sending ID {request_id}: {e}")
|
|
||||||
|
|
||||||
|
|
||||||
def process_data_async(request_id, spacy_data, exxeta_data):
|
|
||||||
try:
|
|
||||||
print(f"Start asynchronous processing for PitchBook: {request_id}")
|
|
||||||
|
|
||||||
# Perform merge
|
|
||||||
merged_entities = merge_entities(spacy_data, exxeta_data)
|
|
||||||
valid_entities = validate_entities(merged_entities)
|
|
||||||
|
|
||||||
# Send result to next service
|
|
||||||
send_to_coordinator_service(valid_entities, request_id)
|
|
||||||
|
|
||||||
# Remove processed data from storage
|
|
||||||
with storage_lock:
|
|
||||||
if request_id in data_storage:
|
|
||||||
del data_storage[request_id]
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
print(f"Error during asynchronous processing for ID {request_id}: {e}")
|
|
||||||
|
|
||||||
|
|
||||||
@app.route("/validate", methods=["POST"])
|
|
||||||
def validate():
|
|
||||||
try:
|
|
||||||
json_data = request.get_json()
|
|
||||||
|
|
||||||
if not json_data:
|
|
||||||
return jsonify({"error": "Missing JSON data"}), 400
|
|
||||||
|
|
||||||
# extract ID and service_type from the data
|
|
||||||
request_id = json_data.get("id")
|
|
||||||
service_type = json_data.get("service") # 'spacy' or 'exxeta'
|
|
||||||
entities = json_data.get("entities", [])
|
|
||||||
|
|
||||||
if not request_id or not service_type:
|
|
||||||
return jsonify({"error": "ID and service_type are required"}), 400
|
|
||||||
|
|
||||||
if service_type not in ["spacy", "exxeta"]:
|
|
||||||
return jsonify({"error": "service_type has to be 'spacy' or 'exxeta'"}), 400
|
|
||||||
|
|
||||||
with storage_lock:
|
|
||||||
# Initialize entry if not already present
|
|
||||||
if request_id not in data_storage:
|
|
||||||
data_storage[request_id] = {
|
|
||||||
"spacy_data": None,
|
|
||||||
"exxeta_data": None,
|
|
||||||
}
|
|
||||||
|
|
||||||
# Store the data based on the service type
|
|
||||||
data_storage[request_id][f"{service_type}_data"] = entities
|
|
||||||
|
|
||||||
# Check if both datasets are present
|
|
||||||
stored_data = data_storage[request_id]
|
|
||||||
spacy_data = stored_data["spacy_data"]
|
|
||||||
exxeta_data = stored_data["exxeta_data"]
|
|
||||||
|
|
||||||
# If both datasets are present, start asynchronous processing
|
|
||||||
if spacy_data is not None and exxeta_data is not None:
|
|
||||||
|
|
||||||
# Start asynchronous processing in a separate thread
|
|
||||||
processing_thread = threading.Thread(
|
|
||||||
target=process_data_async,
|
|
||||||
args=(request_id, spacy_data, exxeta_data),
|
|
||||||
daemon=True,
|
|
||||||
)
|
|
||||||
processing_thread.start()
|
|
||||||
|
|
||||||
return (
|
|
||||||
jsonify(
|
|
||||||
{
|
|
||||||
"message": f"Second dataset for ID {request_id} received. Processing started.",
|
|
||||||
}
|
|
||||||
),
|
|
||||||
200,
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
return (
|
|
||||||
jsonify(
|
|
||||||
{
|
|
||||||
"message": f"First dataset for ID {request_id} from {service_type} stored. Waiting for second dataset.",
|
|
||||||
}
|
|
||||||
),
|
|
||||||
200,
|
|
||||||
)
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
print(f"Error occurred: {str(e)}")
|
|
||||||
return jsonify({"error": f"Fehler: {str(e)}"}), 500
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
app.run(debug=True, host="0.0.0.0", port=5050)
|
|
||||||
|
|
@ -1,68 +0,0 @@
|
||||||
def normalize_entity(entity_str):
|
|
||||||
if not entity_str:
|
|
||||||
return ""
|
|
||||||
normalized = entity_str.replace("\n", " ")
|
|
||||||
normalized = "".join(normalized.lower().split())
|
|
||||||
return normalized
|
|
||||||
|
|
||||||
|
|
||||||
def merge_entities(spacy_data, exxeta_data):
|
|
||||||
merged = []
|
|
||||||
seen = set()
|
|
||||||
|
|
||||||
# Process SpaCy entities first
|
|
||||||
for s in spacy_data:
|
|
||||||
s_entity_norm = normalize_entity(s["entity"])
|
|
||||||
s_page = s["page"]
|
|
||||||
|
|
||||||
# Look for matching Exxeta entities
|
|
||||||
found = False
|
|
||||||
for e in exxeta_data:
|
|
||||||
e_entity_norm = normalize_entity(e["entity"])
|
|
||||||
e_page = e["page"]
|
|
||||||
|
|
||||||
# Match if normalized entity and page match
|
|
||||||
if (
|
|
||||||
s["label"] == e["label"]
|
|
||||||
and s_entity_norm == e_entity_norm
|
|
||||||
and s_page == e_page
|
|
||||||
):
|
|
||||||
|
|
||||||
merged.append(
|
|
||||||
{
|
|
||||||
"label": s["label"],
|
|
||||||
"entity": s["entity"],
|
|
||||||
"page": s_page,
|
|
||||||
"status": "validated",
|
|
||||||
}
|
|
||||||
)
|
|
||||||
seen.add((e["entity"], e_page))
|
|
||||||
found = True
|
|
||||||
break
|
|
||||||
|
|
||||||
# If no match found, add as single-source
|
|
||||||
if not found:
|
|
||||||
merged.append(
|
|
||||||
{
|
|
||||||
"label": s["label"],
|
|
||||||
"entity": s["entity"],
|
|
||||||
"page": s_page,
|
|
||||||
"status": "single-source",
|
|
||||||
"source": "spacy",
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
# Add remaining Exxeta entities not already processed
|
|
||||||
for e in exxeta_data:
|
|
||||||
if (e["entity"], e["page"]) not in seen:
|
|
||||||
merged.append(
|
|
||||||
{
|
|
||||||
"label": e["label"],
|
|
||||||
"entity": e["entity"],
|
|
||||||
"page": e["page"],
|
|
||||||
"status": "single-source",
|
|
||||||
"source": "exxeta",
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
return merged
|
|
||||||
|
|
@ -1,14 +0,0 @@
|
||||||
blinker==1.9.0
|
|
||||||
certifi==2025.4.26
|
|
||||||
charset-normalizer==3.4.2
|
|
||||||
click==8.2.1
|
|
||||||
dotenv==0.9.9
|
|
||||||
Flask==3.1.1
|
|
||||||
idna==3.10
|
|
||||||
itsdangerous==2.2.0
|
|
||||||
Jinja2==3.1.6
|
|
||||||
MarkupSafe==3.0.2
|
|
||||||
python-dotenv==1.1.0
|
|
||||||
requests==2.32.3
|
|
||||||
urllib3==2.4.0
|
|
||||||
Werkzeug==3.1.3
|
|
||||||
|
|
@ -1,12 +0,0 @@
|
||||||
def validate_entities(entities):
|
|
||||||
return entities
|
|
||||||
#todo
|
|
||||||
valid = []
|
|
||||||
for entity in entities:
|
|
||||||
if entity["type"] == "PERSON":
|
|
||||||
if entity["name"] == "John Doe":
|
|
||||||
valid.append(entity)
|
|
||||||
elif entity["type"] == "ORG":
|
|
||||||
if entity["name"] == "Exxeta":
|
|
||||||
valid.append(entity)
|
|
||||||
return valid
|
|
||||||
|
|
@ -30,7 +30,7 @@ services:
|
||||||
timeout: 5s
|
timeout: 5s
|
||||||
retries: 5
|
retries: 5
|
||||||
ports:
|
ports:
|
||||||
- 5050:5000
|
- 5000:5000
|
||||||
|
|
||||||
spacy:
|
spacy:
|
||||||
build:
|
build:
|
||||||
|
|
@ -42,11 +42,3 @@ services:
|
||||||
dockerfile: ../../Dockerfile
|
dockerfile: ../../Dockerfile
|
||||||
env_file:
|
env_file:
|
||||||
- .env
|
- .env
|
||||||
validate:
|
|
||||||
build:
|
|
||||||
context: backend/validate-service
|
|
||||||
dockerfile: ../../Dockerfile
|
|
||||||
env_file:
|
|
||||||
- .env
|
|
||||||
ports:
|
|
||||||
- 5051:5000
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue