"""Validation service: merge and validate entities for a pitch book.

Two services ("spacy" and "exxeta") each POST their entities for a request ID
to ``/validate``. Once both datasets for an ID are present they are merged,
validated and forwarded to the coordinator service from a background thread.
"""

import json
import os
import threading

import requests
from dotenv import load_dotenv
from flask import Flask, jsonify, request

from merge_logic import merge_entities
from validate_logic import validate_entities

app = Flask(__name__)
load_dotenv()

COORDINATOR_URL = os.getenv("COORDINATOR_URL", "http://localhost:5000")

# Upper bound (in seconds) for every HTTP call to the coordinator. Without a
# timeout ``requests`` may wait forever and pin the worker thread.
COORDINATOR_TIMEOUT_SECONDS = 10

# Values accepted in the "service" field of a /validate request.
SERVICE_TYPES = ("spacy", "exxeta")

# TODO: add persistence layer
# Datasets still waiting for their counterpart:
# {id: {"spacy_data": <entities or None>, "exxeta_data": <entities or None>}}
data_storage = {}
storage_lock = threading.Lock()


def send_to_coordinator_service(processed_data, request_id):
    """PUT the processed entities to the coordinator service.

    The data is JSON-encoded into the ``kpi`` form field and sent to
    ``<COORDINATOR_URL>/api/pitch_book/<request_id>``. Failures are printed and
    swallowed; nothing is raised to the caller.

    Args:
        processed_data: JSON-serialisable result to forward.
        request_id: ID of the pitch book the result belongs to.
    """
    if not COORDINATOR_URL:
        print("Not processed, missing url", processed_data)
        return

    try:
        payload = {
            "kpi": json.dumps(processed_data),
        }
        response = requests.put(
            COORDINATOR_URL + "/api/pitch_book/" + str(request_id),
            data=payload,
            timeout=COORDINATOR_TIMEOUT_SECONDS,
        )
        # Treat 4xx/5xx answers as failures instead of reporting success.
        response.raise_for_status()
        print(f"Result PitchBook {request_id} sent to coordinator")
        print(f"Result Payload {payload}")
    except Exception as e:
        print(f"Error sending ID {request_id}: {e}")


def process_data_async(request_id, spacy_data, exxeta_data):
    """Merge, validate and forward both datasets of one request.

    Intended to run in a background thread. The progress notification is
    best-effort: if it fails, processing still continues. Any error in the
    processing itself is printed and swallowed.

    The caller is expected to have removed the request from ``data_storage``
    already (``validate`` does so when dispatching), so no storage cleanup
    happens here.

    Args:
        request_id: ID of the pitch book being processed.
        spacy_data: Entities received from the "spacy" service.
        exxeta_data: Entities received from the "exxeta" service.
    """
    try:
        requests.post(
            COORDINATOR_URL + "/api/progress",
            json={"id": request_id, "progress": 95},
            timeout=COORDINATOR_TIMEOUT_SECONDS,
        )
    except Exception as e:
        # A failed progress ping must not prevent the real work below.
        print(f"Error reporting progress for ID {request_id}: {e}")

    try:
        print(f"Start asynchronous processing for PitchBook: {request_id}")

        # Perform merge
        merged_entities = merge_entities(spacy_data, exxeta_data)
        valid_entities = validate_entities(merged_entities)

        # Send result to next service
        send_to_coordinator_service(valid_entities, request_id)
    except Exception as e:
        print(f"Error during asynchronous processing for ID {request_id}: {e}")


@app.route("/validate", methods=["POST"])
def validate():
    """Store one dataset and start processing once both are present.

    Expects a JSON object with ``id``, ``service`` ("spacy" or "exxeta") and
    optionally ``entities`` (defaults to an empty list).

    Returns:
        A ``(response, status)`` tuple: 200 with a status message, 400 for
        invalid input, 500 for unexpected errors.
    """
    try:
        # silent=True: malformed bodies yield None (-> 400) instead of raising
        # an HTTP error that the handler below would turn into a 500.
        json_data = request.get_json(silent=True)
        if not json_data:
            return jsonify({"error": "Missing JSON data"}), 400
        if not isinstance(json_data, dict):
            return jsonify({"error": "JSON body has to be an object"}), 400

        # extract ID and service_type from the data
        request_id = json_data.get("id")
        service_type = json_data.get("service")  # 'spacy' or 'exxeta'
        entities = json_data.get("entities")
        if entities is None:
            # None marks "not received yet" in storage, so an explicit null
            # must be normalised or the request would wait forever.
            entities = []

        if not request_id or not service_type:
            return jsonify({"error": "ID and service_type are required"}), 400

        if isinstance(request_id, (list, dict)):
            # Unhashable values cannot be used as storage keys.
            return jsonify({"error": "ID has to be a scalar value"}), 400

        if service_type not in SERVICE_TYPES:
            return jsonify({"error": "service_type has to be 'spacy' or 'exxeta'"}), 400

        with storage_lock:
            # Initialize entry if not already present
            stored_data = data_storage.setdefault(
                request_id,
                {"spacy_data": None, "exxeta_data": None},
            )

            # Store the data based on the service type
            stored_data[f"{service_type}_data"] = entities

            spacy_data = stored_data["spacy_data"]
            exxeta_data = stored_data["exxeta_data"]
            both_present = spacy_data is not None and exxeta_data is not None

            if both_present:
                # Claim the entry while still holding the lock: a duplicate
                # submission cannot start a second processing run, and the
                # entry cannot leak if processing fails later.
                del data_storage[request_id]

        if not both_present:
            return (
                jsonify(
                    {
                        "message": f"First dataset for ID {request_id} from {service_type} stored. Waiting for second dataset.",
                    }
                ),
                200,
            )

        # Start asynchronous processing in a separate thread
        processing_thread = threading.Thread(
            target=process_data_async,
            args=(request_id, spacy_data, exxeta_data),
            daemon=True,
        )
        processing_thread.start()

        return (
            jsonify(
                {
                    "message": f"Second dataset for ID {request_id} received. Processing started.",
                }
            ),
            200,
        )

    except Exception as e:
        print(f"Error occurred: {str(e)}")
        return jsonify({"error": f"Fehler: {str(e)}"}), 500


if __name__ == "__main__":
    # NOTE(review): debug=True together with host="0.0.0.0" exposes the
    # Werkzeug debugger on all interfaces - use for local development only.
    app.run(debug=True, host="0.0.0.0", port=5054)