diff --git a/project/.env.template b/project/.env.template index 7be0ac1..e7fe96c 100644 --- a/project/.env.template +++ b/project/.env.template @@ -2,3 +2,4 @@ API_KEY= DATABASE_URL=postgresql://admin:admin@db:5432 POSTGRES_PASSWORD=admin POSTGRES_USER=admin +COORDINATOR_URL="coordinator:5000" diff --git a/project/Dockerfile b/project/Dockerfile index 979c701..a1af038 100644 --- a/project/Dockerfile +++ b/project/Dockerfile @@ -18,4 +18,4 @@ COPY . . ENV PYTHONUNBUFFERED=1 EXPOSE 5000 -CMD ["gunicorn", "--bind", "0.0.0.0:5000", "app:app"] +CMD ["gunicorn","--timeout", "10","--workers", "2", "--bind", "0.0.0.0:5000", "app:app"] diff --git a/project/backend/validate-service/.env.template b/project/backend/validate-service/.env.template new file mode 100644 index 0000000..94dd768 --- /dev/null +++ b/project/backend/validate-service/.env.template @@ -0,0 +1 @@ +COORDINATOR_URL="" diff --git a/project/backend/validate-service/README.md b/project/backend/validate-service/README.md new file mode 100644 index 0000000..f4adf62 --- /dev/null +++ b/project/backend/validate-service/README.md @@ -0,0 +1,97 @@ +# Validate Service + +Ein Flask-basierter Microservice zur asynchronen Verarbeitung und Validierung von Entitäten aus zwei verschiedenen NLP-Services (SpaCy und Exxeta). + +## Funktionsweise + +Das Service empfängt für jede eindeutige ID zwei POST-Requests von verschiedenen Services: +1. **SpaCy Service** - sendet extrahierte Entitäten +2. **Exxeta Service** - sendet extrahierte Entitäten + +Beim ersten Request werden die Daten zwischengespeichert. Beim zweiten Request startet die asynchrone Verarbeitung. Nach der Verarbeitung werden die Ergebnisse an einen nachgelagerten Service weitergeleitet. + +## API Endpoints + +### POST /validate + +Empfängt Entitätsdaten von SpaCy oder Exxeta Services. + +**Request Body:** +```json +{ + "id": "pitch_book_id", + "service": "spacy|exxeta", + "entities": [ + { + "label": "PERSON", + "entity": "Max Mustermann", + "page": 1 + } + ] +} +``` + +**Response:** +- **200**: Daten erfolgreich verarbeitet +- **400**: Fehlende oder ungültige Parameter +- **500**: Serverfehler + +## Installation und Start + +1. **Abhängigkeiten installieren:** +```bash +pip install -r requirements.txt +``` + +2. **Service starten:** +```bash +python app.py +``` + +Das Service läuft standardmäßig auf `http://localhost:5050` + +## Konfiguration + +Umgebungsvariablen in `config.py`: + +- `COORDINATOR_URL`: URL des Koordinators + +## Verarbeitungslogik + +1. **Zwischenspeicherung**: Beim ersten Request wird das JSON in einem Thread-sicheren Dictionary gespeichert +2. **Trigger**: Beim zweiten Request wird die asynchrone Verarbeitung gestartet +3. **Merge & Validate**: Die `merge_and_validate_entities` Funktion führt die Validierung durch: + - Normalisiert Entitäten (entfernt Zeilenumbrüche, konvertiert zu lowercase) + - Matched Entitäten basierend auf Label, normalisiertem Text und Seitenzahl + - Kennzeichnet Entitäten als "validated" (beide Services) oder "single-source" +4. **Weiterleitung**: Ergebnisse werden an den nächsten Service gesendet +5. **Cleanup**: Verarbeitete Daten werden aus dem Speicher entfernt + +## Architektur + +``` +┌─────────────────┐ ┌─────────────────┐ +│ SpaCy Service │ │ Exxeta Service │ +└─────────┬───────┘ └─────────┬───────┘ + │ │ + │ POST /validate │ POST /validate + │ (service_type:spacy) │ (service_type:exxeta) + ▼ ▼ + ┌─────────────────────────────────────┐ + │ Validate Service │ + │ ┌─────────────────────────────┐ │ + │ │ Zwischenspeicher │ │ + │ │ (Thread-safe Dictionary) │ │ + │ └─────────────────────────────┘ │ + │ ┌─────────────────────────────┐ │ + │ │ Asynchrone Verarbeitung │ │ + │ │ (merge_and_validate_entities)│ │ + │ └─────────────────────────────┘ │ + └─────────────┬───────────────────────┘ + │ + │ POST (processed data) + ▼ + ┌─────────────────────────────┐ + │ Nachgelagerter Service │ + └─────────────────────────────┘ +``` diff --git a/project/backend/validate-service/app.py b/project/backend/validate-service/app.py new file mode 100644 index 0000000..6bd5e45 --- /dev/null +++ b/project/backend/validate-service/app.py @@ -0,0 +1,130 @@ +from flask import Flask, request, jsonify +import threading +from merge_logic import merge_entities +from validate_logic import validate_entities +from dotenv import load_dotenv +import os +import requests +import json + +app = Flask(__name__) + +load_dotenv() +coordinator_url = os.getenv("COORDINATOR_URL") or "" + +# todo add persistence layer +data_storage = {} # {id: {spacy_data: [], exxeta_data: []}} + +storage_lock = threading.Lock() + + +def send_to_coordinator_service(processed_data, request_id): + if not coordinator_url: + print("Not processed, missing url", processed_data) + return + + try: + payload = { + "kpi": json.dumps(processed_data), + } + requests.put( + "http://" + coordinator_url + "/api/pitch_book/" + str(request_id), + data=payload, + ) + print(f"Result PitchBook {request_id} sent to coordinator") + + except Exception as e: + print(f"Error sending ID {request_id}: {e}") + + +def process_data_async(request_id, spacy_data, exxeta_data): + try: + print(f"Start asynchronous processing for PitchBook: {request_id}") + + # Perform merge + merged_entities = merge_entities(spacy_data, exxeta_data) + valid_entities = validate_entities(merged_entities) + + # Send result to next service + send_to_coordinator_service(valid_entities, request_id) + + # Remove processed data from storage + with storage_lock: + if request_id in data_storage: + del data_storage[request_id] + + except Exception as e: + print(f"Error during asynchronous processing for ID {request_id}: {e}") + + +@app.route("/validate", methods=["POST"]) +def validate(): + try: + json_data = request.get_json() + + if not json_data: + return jsonify({"error": "Missing JSON data"}), 400 + + # extract ID and service_type from the data + request_id = json_data.get("id") + service_type = json_data.get("service") # 'spacy' or 'exxeta' + entities = json_data.get("entities", []) + + if not request_id or not service_type: + return jsonify({"error": "ID and service_type are required"}), 400 + + if service_type not in ["spacy", "exxeta"]: + return jsonify({"error": "service_type has to be 'spacy' or 'exxeta'"}), 400 + + with storage_lock: + # Initialize entry if not already present + if request_id not in data_storage: + data_storage[request_id] = { + "spacy_data": None, + "exxeta_data": None, + } + + # Store the data based on the service type + data_storage[request_id][f"{service_type}_data"] = entities + + # Check if both datasets are present + stored_data = data_storage[request_id] + spacy_data = stored_data["spacy_data"] + exxeta_data = stored_data["exxeta_data"] + + # If both datasets are present, start asynchronous processing + if spacy_data is not None and exxeta_data is not None: + + # Start asynchronous processing in a separate thread + processing_thread = threading.Thread( + target=process_data_async, + args=(request_id, spacy_data, exxeta_data), + daemon=True, + ) + processing_thread.start() + + return ( + jsonify( + { + "message": f"Second dataset for ID {request_id} received. Processing started.", + } + ), + 200, + ) + else: + return ( + jsonify( + { + "message": f"First dataset for ID {request_id} from {service_type} stored. Waiting for second dataset.", + } + ), + 200, + ) + + except Exception as e: + print(f"Error occurred: {str(e)}") + return jsonify({"error": f"Fehler: {str(e)}"}), 500 + + +if __name__ == "__main__": + app.run(debug=True, host="0.0.0.0", port=5050) diff --git a/project/backend/validate-service/merge_logic.py b/project/backend/validate-service/merge_logic.py new file mode 100644 index 0000000..1bc404c --- /dev/null +++ b/project/backend/validate-service/merge_logic.py @@ -0,0 +1,68 @@ +def normalize_entity(entity_str): + if not entity_str: + return "" + normalized = entity_str.replace("\n", " ") + normalized = "".join(normalized.lower().split()) + return normalized + + +def merge_entities(spacy_data, exxeta_data): + merged = [] + seen = set() + + # Process SpaCy entities first + for s in spacy_data: + s_entity_norm = normalize_entity(s["entity"]) + s_page = s["page"] + + # Look for matching Exxeta entities + found = False + for e in exxeta_data: + e_entity_norm = normalize_entity(e["entity"]) + e_page = e["page"] + + # Match if normalized entity and page match + if ( + s["label"] == e["label"] + and s_entity_norm == e_entity_norm + and s_page == e_page + ): + + merged.append( + { + "label": s["label"], + "entity": s["entity"], + "page": s_page, + "status": "validated", + } + ) + seen.add((e["entity"], e_page)) + found = True + break + + # If no match found, add as single-source + if not found: + merged.append( + { + "label": s["label"], + "entity": s["entity"], + "page": s_page, + "status": "single-source", + "source": "spacy", + } + ) + + # Add remaining Exxeta entities not already processed + for e in exxeta_data: + if (e["entity"], e["page"]) not in seen: + merged.append( + { + "label": e["label"], + "entity": e["entity"], + "page": e["page"], + "status": "single-source", + "source": "exxeta", + } + ) + + return merged diff --git a/project/backend/validate-service/requirements.txt b/project/backend/validate-service/requirements.txt new file mode 100644 index 0000000..c637b6e --- /dev/null +++ b/project/backend/validate-service/requirements.txt @@ -0,0 +1,14 @@ +blinker==1.9.0 +certifi==2025.4.26 +charset-normalizer==3.4.2 +click==8.2.1 +dotenv==0.9.9 +Flask==3.1.1 +idna==3.10 +itsdangerous==2.2.0 +Jinja2==3.1.6 +MarkupSafe==3.0.2 +python-dotenv==1.1.0 +requests==2.32.3 +urllib3==2.4.0 +Werkzeug==3.1.3 diff --git a/project/backend/validate-service/validate_logic.py b/project/backend/validate-service/validate_logic.py new file mode 100644 index 0000000..4685790 --- /dev/null +++ b/project/backend/validate-service/validate_logic.py @@ -0,0 +1,12 @@ +def validate_entities(entities): + return entities + #todo + valid = [] + for entity in entities: + if entity["type"] == "PERSON": + if entity["name"] == "John Doe": + valid.append(entity) + elif entity["type"] == "ORG": + if entity["name"] == "Exxeta": + valid.append(entity) + return valid diff --git a/project/docker-compose.yml b/project/docker-compose.yml index e4877ba..5615026 100644 --- a/project/docker-compose.yml +++ b/project/docker-compose.yml @@ -30,7 +30,7 @@ services: timeout: 5s retries: 5 ports: - - 5000:5000 + - 5050:5000 spacy: build: @@ -42,3 +42,11 @@ services: dockerfile: ../../Dockerfile env_file: - .env + validate: + build: + context: backend/validate-service + dockerfile: ../../Dockerfile + env_file: + - .env + ports: + - 5051:5000