pse2_ff/project/backend/validate-service/app.py

131 lines
4.2 KiB
Python

from flask import Flask, request, jsonify
import threading
from merge_logic import merge_entities
from validate_logic import validate_entities
from dotenv import load_dotenv
import os
import requests
import json
app = Flask(__name__)
load_dotenv()
coordinator_url = os.getenv("COORDINATOR_URL", "localhost:5000")
# todo add persistence layer
data_storage = {} # {id: {spacy_data: [], exxeta_data: []}}
storage_lock = threading.Lock()
def send_to_coordinator_service(processed_data, request_id):
if not coordinator_url:
print("Not processed, missing url", processed_data)
return
try:
payload = {
"kpi": json.dumps(processed_data),
}
requests.put(
"http://" + coordinator_url + "/api/pitch_book/" + str(request_id),
data=payload,
)
print(f"Result PitchBook {request_id} sent to coordinator")
print(f"Result Payload {payload}")
except Exception as e:
print(f"Error sending ID {request_id}: {e}")
def process_data_async(request_id, spacy_data, exxeta_data):
try:
print(f"Start asynchronous processing for PitchBook: {request_id}")
# Perform merge
merged_entities = merge_entities(spacy_data, exxeta_data)
valid_entities = validate_entities(merged_entities)
# Send result to next service
send_to_coordinator_service(valid_entities, request_id)
# Remove processed data from storage
with storage_lock:
if request_id in data_storage:
del data_storage[request_id]
except Exception as e:
print(f"Error during asynchronous processing for ID {request_id}: {e}")
@app.route("/validate", methods=["POST"])
def validate():
try:
json_data = request.get_json()
if not json_data:
return jsonify({"error": "Missing JSON data"}), 400
# extract ID and service_type from the data
request_id = json_data.get("id")
service_type = json_data.get("service") # 'spacy' or 'exxeta'
entities = json_data.get("entities", [])
if not request_id or not service_type:
return jsonify({"error": "ID and service_type are required"}), 400
if service_type not in ["spacy", "exxeta"]:
return jsonify({"error": "service_type has to be 'spacy' or 'exxeta'"}), 400
with storage_lock:
# Initialize entry if not already present
if request_id not in data_storage:
data_storage[request_id] = {
"spacy_data": None,
"exxeta_data": None,
}
# Store the data based on the service type
data_storage[request_id][f"{service_type}_data"] = entities
# Check if both datasets are present
stored_data = data_storage[request_id]
spacy_data = stored_data["spacy_data"]
exxeta_data = stored_data["exxeta_data"]
# If both datasets are present, start asynchronous processing
if spacy_data is not None and exxeta_data is not None:
# Start asynchronous processing in a separate thread
processing_thread = threading.Thread(
target=process_data_async,
args=(request_id, spacy_data, exxeta_data),
daemon=True,
)
processing_thread.start()
return (
jsonify(
{
"message": f"Second dataset for ID {request_id} received. Processing started.",
}
),
200,
)
else:
return (
jsonify(
{
"message": f"First dataset for ID {request_id} from {service_type} stored. Waiting for second dataset.",
}
),
200,
)
except Exception as e:
print(f"Error occurred: {str(e)}")
return jsonify({"error": f"Fehler: {str(e)}"}), 500
if __name__ == "__main__":
app.run(debug=True, host="0.0.0.0", port=5054)