From df5ac605c23993d79bc9f0401a51485ed282ed9f Mon Sep 17 00:00:00 2001 From: Jaronim Pracht Date: Fri, 30 May 2025 13:44:13 +0200 Subject: [PATCH 1/2] Add validate service with entity merging and validation Implements a Flask microservice that receives entities from SpaCy and Exxeta services, merges them based on normalized text matching, and forwards validated results to coordinator. Also updates gunicorn configuration with timeout and worker settings. --- project/.env.template | 1 + project/Dockerfile | 2 +- .../backend/validate-service/.env.template | 1 + project/backend/validate-service/README.md | 97 +++++++++++++ project/backend/validate-service/app.py | 130 ++++++++++++++++++ .../backend/validate-service/merge_logic.py | 68 +++++++++ .../backend/validate-service/requirements.txt | 14 ++ .../validate-service/validate_logic.py | 12 ++ project/docker-compose.yml | 10 +- 9 files changed, 333 insertions(+), 2 deletions(-) create mode 100644 project/backend/validate-service/.env.template create mode 100644 project/backend/validate-service/README.md create mode 100644 project/backend/validate-service/app.py create mode 100644 project/backend/validate-service/merge_logic.py create mode 100644 project/backend/validate-service/requirements.txt create mode 100644 project/backend/validate-service/validate_logic.py diff --git a/project/.env.template b/project/.env.template index 7be0ac1..390f001 100644 --- a/project/.env.template +++ b/project/.env.template @@ -2,3 +2,4 @@ API_KEY= DATABASE_URL=postgresql://admin:admin@db:5432 POSTGRES_PASSWORD=admin POSTGRES_USER=admin +COORDINATOR_URL="coordinator" diff --git a/project/Dockerfile b/project/Dockerfile index 979c701..a1af038 100644 --- a/project/Dockerfile +++ b/project/Dockerfile @@ -18,4 +18,4 @@ COPY . . ENV PYTHONUNBUFFERED=1 EXPOSE 5000 -CMD ["gunicorn", "--bind", "0.0.0.0:5000", "app:app"] +CMD ["gunicorn","--timeout", "10","--workers", "2", "--bind", "0.0.0.0:5000", "app:app"] diff --git a/project/backend/validate-service/.env.template b/project/backend/validate-service/.env.template new file mode 100644 index 0000000..94dd768 --- /dev/null +++ b/project/backend/validate-service/.env.template @@ -0,0 +1 @@ +COORDINATOR_URL="" diff --git a/project/backend/validate-service/README.md b/project/backend/validate-service/README.md new file mode 100644 index 0000000..f4adf62 --- /dev/null +++ b/project/backend/validate-service/README.md @@ -0,0 +1,97 @@ +# Validate Service + +Ein Flask-basierter Microservice zur asynchronen Verarbeitung und Validierung von Entitäten aus zwei verschiedenen NLP-Services (SpaCy und Exxeta). + +## Funktionsweise + +Das Service empfängt für jede eindeutige ID zwei POST-Requests von verschiedenen Services: +1. **SpaCy Service** - sendet extrahierte Entitäten +2. **Exxeta Service** - sendet extrahierte Entitäten + +Beim ersten Request werden die Daten zwischengespeichert. Beim zweiten Request startet die asynchrone Verarbeitung. Nach der Verarbeitung werden die Ergebnisse an einen nachgelagerten Service weitergeleitet. + +## API Endpoints + +### POST /validate + +Empfängt Entitätsdaten von SpaCy oder Exxeta Services. + +**Request Body:** +```json +{ + "id": "pitch_book_id", + "service": "spacy|exxeta", + "entities": [ + { + "label": "PERSON", + "entity": "Max Mustermann", + "page": 1 + } + ] +} +``` + +**Response:** +- **200**: Daten erfolgreich verarbeitet +- **400**: Fehlende oder ungültige Parameter +- **500**: Serverfehler + +## Installation und Start + +1. **Abhängigkeiten installieren:** +```bash +pip install -r requirements.txt +``` + +2. **Service starten:** +```bash +python app.py +``` + +Das Service läuft standardmäßig auf `http://localhost:5050` + +## Konfiguration + +Umgebungsvariablen in `config.py`: + +- `COORDINATOR_URL`: URL des Koordinators + +## Verarbeitungslogik + +1. **Zwischenspeicherung**: Beim ersten Request wird das JSON in einem Thread-sicheren Dictionary gespeichert +2. **Trigger**: Beim zweiten Request wird die asynchrone Verarbeitung gestartet +3. **Merge & Validate**: Die `merge_and_validate_entities` Funktion führt die Validierung durch: + - Normalisiert Entitäten (entfernt Zeilenumbrüche, konvertiert zu lowercase) + - Matched Entitäten basierend auf Label, normalisiertem Text und Seitenzahl + - Kennzeichnet Entitäten als "validated" (beide Services) oder "single-source" +4. **Weiterleitung**: Ergebnisse werden an den nächsten Service gesendet +5. **Cleanup**: Verarbeitete Daten werden aus dem Speicher entfernt + +## Architektur + +``` +┌─────────────────┐ ┌─────────────────┐ +│ SpaCy Service │ │ Exxeta Service │ +└─────────┬───────┘ └─────────┬───────┘ + │ │ + │ POST /validate │ POST /validate + │ (service_type:spacy) │ (service_type:exxeta) + ▼ ▼ + ┌─────────────────────────────────────┐ + │ Validate Service │ + │ ┌─────────────────────────────┐ │ + │ │ Zwischenspeicher │ │ + │ │ (Thread-safe Dictionary) │ │ + │ └─────────────────────────────┘ │ + │ ┌─────────────────────────────┐ │ + │ │ Asynchrone Verarbeitung │ │ + │ │ (merge_and_validate_entities)│ │ + │ └─────────────────────────────┘ │ + └─────────────┬───────────────────────┘ + │ + │ POST (processed data) + ▼ + ┌─────────────────────────────┐ + │ Nachgelagerter Service │ + └─────────────────────────────┘ +``` diff --git a/project/backend/validate-service/app.py b/project/backend/validate-service/app.py new file mode 100644 index 0000000..6bd5e45 --- /dev/null +++ b/project/backend/validate-service/app.py @@ -0,0 +1,130 @@ +from flask import Flask, request, jsonify +import threading +from merge_logic import merge_entities +from validate_logic import validate_entities +from dotenv import load_dotenv +import os +import requests +import json + +app = Flask(__name__) + +load_dotenv() +coordinator_url = os.getenv("COORDINATOR_URL") or "" + +# todo add persistence layer +data_storage = {} # {id: {spacy_data: [], exxeta_data: []}} + +storage_lock = threading.Lock() + + +def send_to_coordinator_service(processed_data, request_id): + if not coordinator_url: + print("Not processed, missing url", processed_data) + return + + try: + payload = { + "kpi": json.dumps(processed_data), + } + requests.put( + "http://" + coordinator_url + "/api/pitch_book/" + str(request_id), + data=payload, + ) + print(f"Result PitchBook {request_id} sent to coordinator") + + except Exception as e: + print(f"Error sending ID {request_id}: {e}") + + +def process_data_async(request_id, spacy_data, exxeta_data): + try: + print(f"Start asynchronous processing for PitchBook: {request_id}") + + # Perform merge + merged_entities = merge_entities(spacy_data, exxeta_data) + valid_entities = validate_entities(merged_entities) + + # Send result to next service + send_to_coordinator_service(valid_entities, request_id) + + # Remove processed data from storage + with storage_lock: + if request_id in data_storage: + del data_storage[request_id] + + except Exception as e: + print(f"Error during asynchronous processing for ID {request_id}: {e}") + + +@app.route("/validate", methods=["POST"]) +def validate(): + try: + json_data = request.get_json() + + if not json_data: + return jsonify({"error": "Missing JSON data"}), 400 + + # extract ID and service_type from the data + request_id = json_data.get("id") + service_type = json_data.get("service") # 'spacy' or 'exxeta' + entities = json_data.get("entities", []) + + if not request_id or not service_type: + return jsonify({"error": "ID and service_type are required"}), 400 + + if service_type not in ["spacy", "exxeta"]: + return jsonify({"error": "service_type has to be 'spacy' or 'exxeta'"}), 400 + + with storage_lock: + # Initialize entry if not already present + if request_id not in data_storage: + data_storage[request_id] = { + "spacy_data": None, + "exxeta_data": None, + } + + # Store the data based on the service type + data_storage[request_id][f"{service_type}_data"] = entities + + # Check if both datasets are present + stored_data = data_storage[request_id] + spacy_data = stored_data["spacy_data"] + exxeta_data = stored_data["exxeta_data"] + + # If both datasets are present, start asynchronous processing + if spacy_data is not None and exxeta_data is not None: + + # Start asynchronous processing in a separate thread + processing_thread = threading.Thread( + target=process_data_async, + args=(request_id, spacy_data, exxeta_data), + daemon=True, + ) + processing_thread.start() + + return ( + jsonify( + { + "message": f"Second dataset for ID {request_id} received. Processing started.", + } + ), + 200, + ) + else: + return ( + jsonify( + { + "message": f"First dataset for ID {request_id} from {service_type} stored. Waiting for second dataset.", + } + ), + 200, + ) + + except Exception as e: + print(f"Error occurred: {str(e)}") + return jsonify({"error": f"Fehler: {str(e)}"}), 500 + + +if __name__ == "__main__": + app.run(debug=True, host="0.0.0.0", port=5050) diff --git a/project/backend/validate-service/merge_logic.py b/project/backend/validate-service/merge_logic.py new file mode 100644 index 0000000..1bc404c --- /dev/null +++ b/project/backend/validate-service/merge_logic.py @@ -0,0 +1,68 @@ +def normalize_entity(entity_str): + if not entity_str: + return "" + normalized = entity_str.replace("\n", " ") + normalized = "".join(normalized.lower().split()) + return normalized + + +def merge_entities(spacy_data, exxeta_data): + merged = [] + seen = set() + + # Process SpaCy entities first + for s in spacy_data: + s_entity_norm = normalize_entity(s["entity"]) + s_page = s["page"] + + # Look for matching Exxeta entities + found = False + for e in exxeta_data: + e_entity_norm = normalize_entity(e["entity"]) + e_page = e["page"] + + # Match if normalized entity and page match + if ( + s["label"] == e["label"] + and s_entity_norm == e_entity_norm + and s_page == e_page + ): + + merged.append( + { + "label": s["label"], + "entity": s["entity"], + "page": s_page, + "status": "validated", + } + ) + seen.add((e["entity"], e_page)) + found = True + break + + # If no match found, add as single-source + if not found: + merged.append( + { + "label": s["label"], + "entity": s["entity"], + "page": s_page, + "status": "single-source", + "source": "spacy", + } + ) + + # Add remaining Exxeta entities not already processed + for e in exxeta_data: + if (e["entity"], e["page"]) not in seen: + merged.append( + { + "label": e["label"], + "entity": e["entity"], + "page": e["page"], + "status": "single-source", + "source": "exxeta", + } + ) + + return merged diff --git a/project/backend/validate-service/requirements.txt b/project/backend/validate-service/requirements.txt new file mode 100644 index 0000000..c637b6e --- /dev/null +++ b/project/backend/validate-service/requirements.txt @@ -0,0 +1,14 @@ +blinker==1.9.0 +certifi==2025.4.26 +charset-normalizer==3.4.2 +click==8.2.1 +dotenv==0.9.9 +Flask==3.1.1 +idna==3.10 +itsdangerous==2.2.0 +Jinja2==3.1.6 +MarkupSafe==3.0.2 +python-dotenv==1.1.0 +requests==2.32.3 +urllib3==2.4.0 +Werkzeug==3.1.3 diff --git a/project/backend/validate-service/validate_logic.py b/project/backend/validate-service/validate_logic.py new file mode 100644 index 0000000..4685790 --- /dev/null +++ b/project/backend/validate-service/validate_logic.py @@ -0,0 +1,12 @@ +def validate_entities(entities): + return entities + #todo + valid = [] + for entity in entities: + if entity["type"] == "PERSON": + if entity["name"] == "John Doe": + valid.append(entity) + elif entity["type"] == "ORG": + if entity["name"] == "Exxeta": + valid.append(entity) + return valid diff --git a/project/docker-compose.yml b/project/docker-compose.yml index e4877ba..5615026 100644 --- a/project/docker-compose.yml +++ b/project/docker-compose.yml @@ -30,7 +30,7 @@ services: timeout: 5s retries: 5 ports: - - 5000:5000 + - 5050:5000 spacy: build: @@ -42,3 +42,11 @@ services: dockerfile: ../../Dockerfile env_file: - .env + validate: + build: + context: backend/validate-service + dockerfile: ../../Dockerfile + env_file: + - .env + ports: + - 5051:5000 -- 2.43.0 From 420e21e8c4014eb30bb98d6536e3e1ac69194fba Mon Sep 17 00:00:00 2001 From: Zainab2604 Date: Sun, 1 Jun 2025 12:49:23 +0200 Subject: [PATCH 2/2] Add Port to COORNATOR_URL --- project/.env.template | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/project/.env.template b/project/.env.template index 390f001..e7fe96c 100644 --- a/project/.env.template +++ b/project/.env.template @@ -2,4 +2,4 @@ API_KEY= DATABASE_URL=postgresql://admin:admin@db:5432 POSTGRES_PASSWORD=admin POSTGRES_USER=admin -COORDINATOR_URL="coordinator" +COORDINATOR_URL="coordinator:5000" -- 2.43.0