From 594eb4ea48a3ec5f8217953220f789b3f3f3c91b Mon Sep 17 00:00:00 2001 From: Jaronim Pracht Date: Sat, 7 Jun 2025 12:40:32 +0200 Subject: [PATCH] Add: OCR sends pdf async to coordinator Add progress tracking and storage lock for PDF processing Refactor OCR service to handle PDF processing asynchronously --- project/.env.template | 2 +- project/backend/coordinator/app.py | 2 + .../controller/pitch_book_controller.py | 100 +++++++++------ .../controller/progress_controller.py | 9 +- .../coordinator/controller/socketIO.py | 6 +- .../coordinator/model/kpi_setting_model.py | 2 +- project/backend/ocr-service/app.py | 121 ++++++++++-------- project/backend/validate-service/app.py | 6 +- project/docker-compose.yml | 4 +- 9 files changed, 151 insertions(+), 101 deletions(-) diff --git a/project/.env.template b/project/.env.template index e7fe96c..6fbee81 100644 --- a/project/.env.template +++ b/project/.env.template @@ -2,4 +2,4 @@ API_KEY= DATABASE_URL=postgresql://admin:admin@db:5432 POSTGRES_PASSWORD=admin POSTGRES_USER=admin -COORDINATOR_URL="coordinator:5000" +COORDINATOR_URL="http://coordinator:5000" diff --git a/project/backend/coordinator/app.py b/project/backend/coordinator/app.py index 4b734ff..2603f42 100644 --- a/project/backend/coordinator/app.py +++ b/project/backend/coordinator/app.py @@ -20,10 +20,12 @@ app.config["MAX_CONTENT_LENGTH"] = 100 * 1024 * 1024 # 100 MB init_db(app) register_routes(app) + @app.route("/health") def health_check(): return "OK" + # für Docker wichtig: host='0.0.0.0' if __name__ == "__main__": socketio.run(app, debug=True, host="0.0.0.0", port=5050) diff --git a/project/backend/coordinator/controller/pitch_book_controller.py b/project/backend/coordinator/controller/pitch_book_controller.py index e04e31e..8dc6776 100644 --- a/project/backend/coordinator/controller/pitch_book_controller.py +++ b/project/backend/coordinator/controller/pitch_book_controller.py @@ -13,44 +13,51 @@ from controller.socketIO import socketio 
pitch_book_controller = Blueprint("pitch_books", __name__, url_prefix="/api/pitch_book") OCR_SERVICE_URL = os.getenv("OCR_SERVICE_URL", "http://localhost:5051") + +progress_per_id = {} # {id: {kpi: 0, pdf: 0}} +storage_lock = threading.Lock() + def process_pdf_async(app, file_id, file_data, filename): with app.app_context(): try: file_obj = BytesIO(file_data) file_obj.name = filename - files = {'file': (filename, file_obj, 'application/pdf')} - data = {'id': file_id} + files = {"file": (filename, file_obj, "application/pdf")} + data = {"id": file_id} response = requests.post( - f"{OCR_SERVICE_URL}/ocr", - files=files, - data=data, - timeout=600 + f"{OCR_SERVICE_URL}/ocr", files=files, data=data, timeout=600 ) if response.status_code == 200: response_data = response.json() - if 'ocr_pdf' in response_data: + if "ocr_pdf" in response_data: import base64 - ocr_pdf_data = base64.b64decode(response_data['ocr_pdf']) + + ocr_pdf_data = base64.b64decode(response_data["ocr_pdf"]) file_record = PitchBookModel.query.get(file_id) if file_record: file_record.file = ocr_pdf_data db.session.commit() - print(f"[DEBUG] PDF updated in database:") - print(f"[DEBUG] - Successfully saved to database") + print("[DEBUG] PDF updated in database:") + print("[DEBUG] - Successfully saved to database") socketio.emit("progress", {"id": file_id, "progress": 50}) else: - socketio.emit("error", {"id": file_id, "message": "OCR processing failed"}) + socketio.emit( + "error", {"id": file_id, "message": "OCR processing failed"} + ) except Exception as e: import traceback + traceback.print_exc() - socketio.emit("error", {"id": file_id, "message": f"Processing failed: {str(e)}"}) + socketio.emit( + "error", {"id": file_id, "message": f"Processing failed: {str(e)}"} + ) @pitch_book_controller.route("/", methods=["POST"]) @@ -78,27 +85,33 @@ def upload_file(): db.session.add(new_file) db.session.commit() - app = current_app._get_current_object() + files = {"file": (uploaded_file.filename, file_data, 
"application/pdf")} + data = {"id": new_file.id} - socketio.emit("progress", {"id": new_file.id, "progress": 10}) - - processing_thread = threading.Thread( - target=process_pdf_async, - args=(app, new_file.id, file_data, fileName), - daemon=True + response = requests.post( + f"{OCR_SERVICE_URL}/ocr", files=files, data=data, timeout=600 ) - processing_thread.start() + if response.status_code == 200: + socketio.emit("progress", {"id": new_file.id, "progress": 10}) + else: + print("Failed to process file") - return jsonify({ - **new_file.to_dict(), - "status": "processing", - "message": "File uploaded successfully. Processing started." - }), 202 + return ( + jsonify( + { + **new_file.to_dict(), + "status": "processing", + "message": "File uploaded successfully. Processing started.", + } + ), + 202, + ) except Exception as e: print(e) return jsonify({"error": "Invalid file format. Only PDF files are accepted"}), 400 + @pitch_book_controller.route("/", methods=["GET"]) def get_all_files(): files = PitchBookModel.query.all() @@ -128,21 +141,32 @@ def update_file(id): if uploaded_file.filename != "": file.filename = uploaded_file.filename - # Read file data once - file_data = uploaded_file.read() - try: - if ( - uploaded_file - and puremagic.from_string(file_data, mime=True) == "application/pdf" - ): - file.file = file_data - except Exception as e: - print(e) + # Read file data once + file_data = uploaded_file.read() + try: + if ( + file_data + and puremagic.from_string(file_data, mime=True) == "application/pdf" + ): + file.file = file_data + with storage_lock: + if id in progress_per_id and "kpi" in progress_per_id[id]: + del progress_per_id[id]["kpi"] + socketio.emit("progress", {"id": id, "progress": 100}) + else: + progress_per_id[id] = {"pdf": 0} + print(f"[DEBUG] PDF updated in database {id}") + except Exception as e: + print(e) if "kpi" in request.form: - socketio.emit("progress", {"id": id, "progress": 100}) file.kpi = request.form.get("kpi") - + with 
storage_lock: + if id in progress_per_id and "pdf" in progress_per_id[id]: + del progress_per_id[id]["pdf"] + socketio.emit("progress", {"id": id, "progress": 100}) + else: + progress_per_id[id] = {"kpi": 0} db.session.commit() return jsonify(file.to_dict()), 200 @@ -154,4 +178,4 @@ def delete_file(id): db.session.delete(file) db.session.commit() - return jsonify({"message": f"File {id} deleted successfully"}), 200 \ No newline at end of file + return jsonify({"message": f"File {id} deleted successfully"}), 200 diff --git a/project/backend/coordinator/controller/progress_controller.py b/project/backend/coordinator/controller/progress_controller.py index 90db4df..4d523ce 100644 --- a/project/backend/coordinator/controller/progress_controller.py +++ b/project/backend/coordinator/controller/progress_controller.py @@ -1,6 +1,7 @@ from flask import Blueprint, request, jsonify from controller.socketIO import socketio + progress_controller = Blueprint("progress", __name__, url_prefix="/api/progress") @@ -8,10 +9,14 @@ progress_controller = Blueprint("progress", __name__, url_prefix="/api/progress" def progress(): data = request.get_json() - if 'id' not in data or 'progress' not in data: + if "id" not in data or "progress" not in data: return jsonify({"error": "Missing required fields. 
[id, progress]"}), 400 - if not isinstance(data['progress'], (int, float)) or data['progress'] < 0 or data['progress'] >= 100: + if ( + not isinstance(data["progress"], (int, float)) + or data["progress"] < 0 + or data["progress"] >= 100 + ): return jsonify({"error": "Invalid progress value"}), 400 socketio.emit("progress", {"id": data["id"], "progress": data["progress"]}) diff --git a/project/backend/coordinator/controller/socketIO.py b/project/backend/coordinator/controller/socketIO.py index 81ee5d4..287b6ee 100644 --- a/project/backend/coordinator/controller/socketIO.py +++ b/project/backend/coordinator/controller/socketIO.py @@ -1,4 +1,6 @@ from flask_socketio import SocketIO -socketio = SocketIO(cors_allowed_origins=["http://localhost:8080", "http://localhost:3000"], - transports=['polling', 'websocket'] ) +socketio = SocketIO( + cors_allowed_origins=["http://localhost:8080", "http://localhost:3000"], + transports=["polling", "websocket"], +) diff --git a/project/backend/coordinator/model/kpi_setting_model.py b/project/backend/coordinator/model/kpi_setting_model.py index f3775a0..a64f75b 100644 --- a/project/backend/coordinator/model/kpi_setting_model.py +++ b/project/backend/coordinator/model/kpi_setting_model.py @@ -13,7 +13,7 @@ class KPISettingType(Enum): class KPISettingModel(db.Model): - __tablename__ = 'kpi_setting_model' + __tablename__ = "kpi_setting_model" id: Mapped[int] = mapped_column(primary_key=True) name: Mapped[str] = mapped_column(unique=True) diff --git a/project/backend/ocr-service/app.py b/project/backend/ocr-service/app.py index 96bc4f8..06a6f14 100644 --- a/project/backend/ocr-service/app.py +++ b/project/backend/ocr-service/app.py @@ -1,11 +1,12 @@ -from flask import Flask, request, jsonify +from flask import Flask, request from ocr_runner import pdf_to_json, ocr_pdf import requests import os import tempfile -import base64 from pathlib import Path import logging +import threading + # Set up logging 
logging.basicConfig(level=logging.INFO) @@ -15,6 +16,61 @@ app = Flask(__name__) EXXETA_URL = os.getenv("EXXETA_SERVICE_URL", "http://localhost:5053/extract") SPACY_URL = os.getenv("SPACY_SERVICE_URL", "http://localhost:5052/extract") +COORDINATOR_URL = os.getenv("COORDINATOR_URL", "http://localhost:5050") + + +def convert_pdf_async(temp_path, pitchbook_id): + try: + logger.info("Starting OCR process...") + + ocr_path = ocr_pdf(temp_path) + + if not ocr_path or not ocr_path.exists(): + temp_path.unlink() # cleanup + return {"error": "OCR processing failed - all PDFs must be OCR'd"}, 500 + + with open(ocr_path, 'rb') as ocr_file: + ocr_file.seek(0) + result = pdf_to_json(ocr_file) + + + payload = { + "id": int(pitchbook_id), + "extracted_text_per_page": result["pages"] + } + + logger.info("Sending payload to EXXETA and SPACY services") + + try: + exxeta_response = requests.post(EXXETA_URL, json=payload, timeout=600) + logger.info(f"EXXETA response: {exxeta_response.status_code}") + except Exception as e: + logger.error(f"Error calling EXXETA: {e}") + + try: + spacy_response = requests.post(SPACY_URL, json=payload, timeout=600) + logger.info(f"SPACY response: {spacy_response.status_code}") + except Exception as e: + logger.error(f"Error calling SPACY: {e}") + + files=[ + ('file',('',open(ocr_path,'rb'),'application/pdf')) + ] + headers = {} + + try: + requests.put(f"{COORDINATOR_URL}/api/pitch_book/{pitchbook_id}", files=files, timeout=600, headers=headers) + + requests.post(COORDINATOR_URL + "/api/progress", json={"id": pitchbook_id, "progress": 50}, timeout=600) + logger.info("COORDINATOR response: Progress + File updated") + except Exception as e: + logger.error(f"Error calling COORDINATOR: {e}") + + ocr_path.unlink() + temp_path.unlink() + except Exception as e: + logger.error(f"Exception in OCR processing: {str(e)}", exc_info=True) + @app.route('/ocr', methods=['POST']) def convert_extract_text_from_pdf(): @@ -29,59 +85,20 @@ def convert_extract_text_from_pdf(): 
if not pitchbook_id: return {"error": "No ID"}, 400 - try: - with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as temp_file: - file.seek(0) - temp_file.write(file.read()) - temp_path = Path(temp_file.name) + with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as temp_file: + file.seek(0) + temp_file.write(file.read()) + temp_path = Path(temp_file.name) - logger.info("Starting OCR process...") + thread = threading.Thread(target=convert_pdf_async, args=(temp_path, pitchbook_id)) + thread.start() - ocr_path = ocr_pdf(temp_path) + return { + "status": "sent", + "message": "PDF successfully OCR'd and processed" + }, 200 - if not ocr_path or not ocr_path.exists(): - temp_path.unlink() # cleanup - return {"error": "OCR processing failed - all PDFs must be OCR'd"}, 500 - - with open(ocr_path, 'rb') as ocr_file: - ocr_pdf_data = ocr_file.read() - ocr_pdf_base64 = base64.b64encode(ocr_pdf_data).decode('utf-8') - - ocr_file.seek(0) - result = pdf_to_json(ocr_file) - - ocr_path.unlink() - temp_path.unlink() - - payload = { - "id": int(pitchbook_id), - "extracted_text_per_page": result["pages"] - } - - logger.info(f"Sending payload to EXXETA and SPACY services") - - try: - exxeta_response = requests.post(EXXETA_URL, json=payload, timeout=600) - logger.info(f"EXXETA response: {exxeta_response.status_code}") - except Exception as e: - logger.error(f"Error calling EXXETA: {e}") - - try: - spacy_response = requests.post(SPACY_URL, json=payload, timeout=600) - logger.info(f"SPACY response: {spacy_response.status_code}") - except Exception as e: - logger.error(f"Error calling SPACY: {e}") - - return { - "status": "sent", - "ocr_pdf": ocr_pdf_base64, - "message": "PDF successfully OCR'd and processed" - }, 200 - - except Exception as e: - logger.error(f"Exception in OCR processing: {str(e)}", exc_info=True) - return {"error": f"Processing failed: {str(e)}"}, 500 if __name__ == "__main__": logger.info("Starting OCR service on port 5000") - app.run(host="0.0.0.0", 
port=5000, debug=True) \ No newline at end of file + app.run(host="0.0.0.0", port=5051, debug=True) diff --git a/project/backend/validate-service/app.py b/project/backend/validate-service/app.py index 2c821d5..2dfb9cb 100644 --- a/project/backend/validate-service/app.py +++ b/project/backend/validate-service/app.py @@ -10,7 +10,7 @@ import json app = Flask(__name__) load_dotenv() -coordinator_url = os.getenv("COORDINATOR_URL", "localhost:5000") +coordinator_url = os.getenv("COORDINATOR_URL", "http://localhost:5000") # todo add persistence layer data_storage = {} # {id: {spacy_data: [], exxeta_data: []}} @@ -28,7 +28,7 @@ def send_to_coordinator_service(processed_data, request_id): "kpi": json.dumps(processed_data), } requests.put( - "http://" + coordinator_url + "/api/pitch_book/" + str(request_id), + coordinator_url + "/api/pitch_book/" + str(request_id), data=payload, ) print(f"Result PitchBook {request_id} sent to coordinator") @@ -128,4 +128,4 @@ def validate(): if __name__ == "__main__": - app.run(debug=True, host="0.0.0.0", port=5054) \ No newline at end of file + app.run(debug=True, host="0.0.0.0", port=5054) diff --git a/project/docker-compose.yml b/project/docker-compose.yml index df08628..d15ab12 100644 --- a/project/docker-compose.yml +++ b/project/docker-compose.yml @@ -73,6 +73,6 @@ services: env_file: - .env environment: - - COORDINATOR_URL=coordinator:5000 + - COORDINATOR_URL=http://coordinator:5000 ports: - - 5054:5000 \ No newline at end of file + - 5054:5000