Add: OCR sends PDF async to coordinator (branch: pull/57/head)

Add progress tracking and storage lock for PDF processing.
Refactor OCR service to handle PDF processing asynchronously.

parent 26d945e7eb
commit 594eb4ea48
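From a client's point of view, the refactored upload path now returns immediately and the heavy work happens in the background. A minimal sketch of the new flow, assuming the coordinator is reachable at http://localhost:5050 (the port used by socketio.run below) and using a placeholder example.pdf:

    import requests

    with open("example.pdf", "rb") as pdf:  # placeholder file name
        resp = requests.post(
            "http://localhost:5050/api/pitch_book/",
            files={"file": ("example.pdf", pdf, "application/pdf")},
        )

    # 202 Accepted with "status": "processing"; OCR and KPI extraction continue
    # in the background and progress updates arrive over Socket.IO.
    print(resp.status_code, resp.json().get("status"))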
@@ -2,4 +2,4 @@ API_KEY=
 DATABASE_URL=postgresql://admin:admin@db:5432
 POSTGRES_PASSWORD=admin
 POSTGRES_USER=admin
-COORDINATOR_URL="coordinator:5000"
+COORDINATOR_URL="http://coordinator:5000"

@@ -20,10 +20,12 @@ app.config["MAX_CONTENT_LENGTH"] = 100 * 1024 * 1024  # 100 MB
 init_db(app)
 register_routes(app)
 
 
 @app.route("/health")
 def health_check():
     return "OK"
 
 
 # important for Docker: host='0.0.0.0'
 if __name__ == "__main__":
     socketio.run(app, debug=True, host="0.0.0.0", port=5050)

@@ -13,44 +13,51 @@ from controller.socketIO import socketio
 pitch_book_controller = Blueprint("pitch_books", __name__, url_prefix="/api/pitch_book")
 OCR_SERVICE_URL = os.getenv("OCR_SERVICE_URL", "http://localhost:5051")
 
 
+progress_per_id = {}  # {id: {kpi: 0, pdf: 0}}
+storage_lock = threading.Lock()
+
+
 def process_pdf_async(app, file_id, file_data, filename):
     with app.app_context():
         try:
             file_obj = BytesIO(file_data)
             file_obj.name = filename
 
-            files = {'file': (filename, file_obj, 'application/pdf')}
-            data = {'id': file_id}
+            files = {"file": (filename, file_obj, "application/pdf")}
+            data = {"id": file_id}
 
             response = requests.post(
-                f"{OCR_SERVICE_URL}/ocr",
-                files=files,
-                data=data,
-                timeout=600
+                f"{OCR_SERVICE_URL}/ocr", files=files, data=data, timeout=600
             )
 
             if response.status_code == 200:
                 response_data = response.json()
-                if 'ocr_pdf' in response_data:
+                if "ocr_pdf" in response_data:
                     import base64
-                    ocr_pdf_data = base64.b64decode(response_data['ocr_pdf'])
+
+                    ocr_pdf_data = base64.b64decode(response_data["ocr_pdf"])
 
                     file_record = PitchBookModel.query.get(file_id)
                     if file_record:
                         file_record.file = ocr_pdf_data
                         db.session.commit()
 
-                        print(f"[DEBUG] PDF updated in database:")
-                        print(f"[DEBUG] - Successfully saved to database")
+                        print("[DEBUG] PDF updated in database:")
+                        print("[DEBUG] - Successfully saved to database")
 
                         socketio.emit("progress", {"id": file_id, "progress": 50})
             else:
-                socketio.emit("error", {"id": file_id, "message": "OCR processing failed"})
+                socketio.emit(
+                    "error", {"id": file_id, "message": "OCR processing failed"}
+                )
 
         except Exception as e:
             import traceback
+
             traceback.print_exc()
-            socketio.emit("error", {"id": file_id, "message": f"Processing failed: {str(e)}"})
+            socketio.emit(
+                "error", {"id": file_id, "message": f"Processing failed: {str(e)}"}
+            )
 
 
 @pitch_book_controller.route("/", methods=["POST"])

@@ -78,27 +85,33 @@ def upload_file():
         db.session.add(new_file)
         db.session.commit()
 
-        app = current_app._get_current_object()
+        files = {"file": (uploaded_file.filename, file_data, "application/pdf")}
+        data = {"id": new_file.id}
 
-        socketio.emit("progress", {"id": new_file.id, "progress": 10})
-        processing_thread = threading.Thread(
-            target=process_pdf_async,
-            args=(app, new_file.id, file_data, fileName),
-            daemon=True
+        response = requests.post(
+            f"{OCR_SERVICE_URL}/ocr", files=files, data=data, timeout=600
         )
-        processing_thread.start()
+        if response.status_code == 200:
+            socketio.emit("progress", {"id": new_file.id, "progress": 10})
+        else:
+            print("Failed to process file")
 
-        return jsonify({
-            **new_file.to_dict(),
-            "status": "processing",
-            "message": "File uploaded successfully. Processing started."
-        }), 202
+        return (
+            jsonify(
+                {
+                    **new_file.to_dict(),
+                    "status": "processing",
+                    "message": "File uploaded successfully. Processing started.",
+                }
+            ),
+            202,
+        )
 
     except Exception as e:
         print(e)
         return jsonify({"error": "Invalid file format. Only PDF files are accepted"}), 400
 
 
 @pitch_book_controller.route("/", methods=["GET"])
 def get_all_files():
     files = PitchBookModel.query.all()

@@ -128,21 +141,32 @@ def update_file(id):
     if uploaded_file.filename != "":
         file.filename = uploaded_file.filename
 
     # Read file data once
     file_data = uploaded_file.read()
     try:
         if (
-            uploaded_file
+            file_data
             and puremagic.from_string(file_data, mime=True) == "application/pdf"
         ):
             file.file = file_data
-    except Exception as e:
-        print(e)
+            with storage_lock:
+                if id in progress_per_id and "kpi" in progress_per_id[id]:
+                    del progress_per_id[id]["kpi"]
+                    socketio.emit("progress", {"id": id, "progress": 100})
+                else:
+                    progress_per_id[id] = {"pdf": 0}
+            print(f"[DEBUG] PDF updated in database {id}")
+    except Exception as e:
+        print(e)
 
     if "kpi" in request.form:
-        socketio.emit("progress", {"id": id, "progress": 100})
         file.kpi = request.form.get("kpi")
+        with storage_lock:
+            if id in progress_per_id and "pdf" in progress_per_id[id]:
+                del progress_per_id[id]["pdf"]
+                socketio.emit("progress", {"id": id, "progress": 100})
+            else:
+                progress_per_id[id] = {"kpi": 0}
     db.session.commit()
 
     return jsonify(file.to_dict()), 200

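The update handler above is where the two asynchronous results meet: the OCR'd PDF and the extracted KPIs each arrive via their own PUT, in either order, and whichever lands second clears the other's marker and fires the 100% progress event. A standalone sketch of that lock-guarded join, with socketio.emit swapped for a plain callable so it runs on its own:

    # Illustration only, not coordinator code.
    import threading

    progress_per_id = {}              # {id: {"pdf": 0} or {"kpi": 0}}
    storage_lock = threading.Lock()

    def mark_done(item_id, kind, emit):
        other = "kpi" if kind == "pdf" else "pdf"
        with storage_lock:
            if item_id in progress_per_id and other in progress_per_id[item_id]:
                # The other half already finished: this item is complete.
                del progress_per_id[item_id][other]
                emit("progress", {"id": item_id, "progress": 100})
            else:
                # We are first: leave a marker for the other half to find.
                progress_per_id[item_id] = {kind: 0}

    # Whichever call comes second fires the 100% event exactly once.
    mark_done(1, "pdf", lambda *args: None)
    mark_done(1, "kpi", print)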
@@ -154,4 +178,4 @@ def delete_file(id):
     db.session.delete(file)
     db.session.commit()
 
     return jsonify({"message": f"File {id} deleted successfully"}), 200

@@ -1,6 +1,7 @@
 from flask import Blueprint, request, jsonify
 
 from controller.socketIO import socketio
 
 progress_controller = Blueprint("progress", __name__, url_prefix="/api/progress")
 
 
@@ -8,10 +9,14 @@ progress_controller = Blueprint("progress", __name__, url_prefix="/api/progress")
 def progress():
     data = request.get_json()
 
-    if 'id' not in data or 'progress' not in data:
+    if "id" not in data or "progress" not in data:
         return jsonify({"error": "Missing required fields. [id, progress]"}), 400
 
-    if not isinstance(data['progress'], (int, float)) or data['progress'] < 0 or data['progress'] >= 100:
+    if (
+        not isinstance(data["progress"], (int, float))
+        or data["progress"] < 0
+        or data["progress"] >= 100
+    ):
         return jsonify({"error": "Invalid progress value"}), 400
 
     socketio.emit("progress", {"id": data["id"], "progress": data["progress"]})

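Worker services report intermediate progress through this endpoint rather than touching Socket.IO directly; the OCR service below does exactly this after its callback PUT. A minimal sketch, assuming the in-cluster COORDINATOR_URL from the .env change above and a placeholder pitch book id:

    import requests

    requests.post(
        "http://coordinator:5000/api/progress",   # COORDINATOR_URL inside docker-compose
        json={"id": 42, "progress": 50},          # progress must be numeric and in [0, 100)
        timeout=600,
    )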
@@ -1,4 +1,6 @@
 from flask_socketio import SocketIO
 
-socketio = SocketIO(cors_allowed_origins=["http://localhost:8080", "http://localhost:3000"],
-                    transports=['polling', 'websocket'] )
+socketio = SocketIO(
+    cors_allowed_origins=["http://localhost:8080", "http://localhost:3000"],
+    transports=["polling", "websocket"],
+)

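Anything that wants the progress and error events only needs a Socket.IO client pointed at the coordinator. A sketch using the python-socketio client, chosen here purely for illustration (the real frontends on :8080/:3000 would subscribe to the same events from JavaScript); the URL assumes the coordinator's dev port:

    import socketio

    sio = socketio.Client()

    @sio.on("progress")
    def on_progress(data):
        print(f"pitch book {data['id']}: {data['progress']}%")

    @sio.on("error")
    def on_error(data):
        print(f"pitch book {data['id']} failed: {data['message']}")

    sio.connect("http://localhost:5050")  # coordinator dev port from app.py
    sio.wait()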
@@ -13,7 +13,7 @@ class KPISettingType(Enum):
 
 
 class KPISettingModel(db.Model):
-    __tablename__ = 'kpi_setting_model'
+    __tablename__ = "kpi_setting_model"
 
     id: Mapped[int] = mapped_column(primary_key=True)
     name: Mapped[str] = mapped_column(unique=True)

@@ -1,11 +1,12 @@
-from flask import Flask, request, jsonify
+from flask import Flask, request
 from ocr_runner import pdf_to_json, ocr_pdf
 import requests
 import os
 import tempfile
-import base64
 from pathlib import Path
 import logging
+import threading
+
 
 # Set up logging
 logging.basicConfig(level=logging.INFO)

@@ -15,6 +16,61 @@ app = Flask(__name__)
 
 EXXETA_URL = os.getenv("EXXETA_SERVICE_URL", "http://localhost:5053/extract")
 SPACY_URL = os.getenv("SPACY_SERVICE_URL", "http://localhost:5052/extract")
+COORDINATOR_URL = os.getenv("COORDINATOR_URL", "http://localhost:5050")
 
 
+def convert_pdf_async(temp_path, pitchbook_id):
+    try:
+        logger.info("Starting OCR process...")
+
+        ocr_path = ocr_pdf(temp_path)
+
+        if not ocr_path or not ocr_path.exists():
+            temp_path.unlink()  # cleanup
+            return {"error": "OCR processing failed - all PDFs must be OCR'd"}, 500
+
+        with open(ocr_path, 'rb') as ocr_file:
+            ocr_file.seek(0)
+            result = pdf_to_json(ocr_file)
+
+        payload = {
+            "id": int(pitchbook_id),
+            "extracted_text_per_page": result["pages"]
+        }
+
+        logger.info("Sending payload to EXXETA and SPACY services")
+
+        try:
+            exxeta_response = requests.post(EXXETA_URL, json=payload, timeout=600)
+            logger.info(f"EXXETA response: {exxeta_response.status_code}")
+        except Exception as e:
+            logger.error(f"Error calling EXXETA: {e}")
+
+        try:
+            spacy_response = requests.post(SPACY_URL, json=payload, timeout=600)
+            logger.info(f"SPACY response: {spacy_response.status_code}")
+        except Exception as e:
+            logger.error(f"Error calling SPACY: {e}")
+
+        files = [
+            ('file', ('', open(ocr_path, 'rb'), 'application/pdf'))
+        ]
+        headers = {}
+
+        try:
+            requests.put(f"{COORDINATOR_URL}/api/pitch_book/{pitchbook_id}", files=files, timeout=600, headers=headers)
+
+            requests.post(COORDINATOR_URL + "/api/progress", json={"id": pitchbook_id, "progress": 50}, timeout=600)
+            logger.info("COORDINATOR response: Progress + File updated")
+        except Exception as e:
+            logger.error(f"Error calling COORDINATOR: {e}")
+
+        ocr_path.unlink()
+        temp_path.unlink()
+    except Exception as e:
+        logger.error(f"Exception in OCR processing: {str(e)}", exc_info=True)
+
+
 @app.route('/ocr', methods=['POST'])
 def convert_extract_text_from_pdf():

@@ -29,59 +85,20 @@ def convert_extract_text_from_pdf():
     if not pitchbook_id:
         return {"error": "No ID"}, 400
 
-    try:
-        with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as temp_file:
-            file.seek(0)
-            temp_file.write(file.read())
-            temp_path = Path(temp_file.name)
+    with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as temp_file:
+        file.seek(0)
+        temp_file.write(file.read())
+        temp_path = Path(temp_file.name)
 
-        logger.info("Starting OCR process...")
-
-        ocr_path = ocr_pdf(temp_path)
-
-        if not ocr_path or not ocr_path.exists():
-            temp_path.unlink()  # cleanup
-            return {"error": "OCR processing failed - all PDFs must be OCR'd"}, 500
-
-        with open(ocr_path, 'rb') as ocr_file:
-            ocr_pdf_data = ocr_file.read()
-            ocr_pdf_base64 = base64.b64encode(ocr_pdf_data).decode('utf-8')
-
-            ocr_file.seek(0)
-            result = pdf_to_json(ocr_file)
-
-        ocr_path.unlink()
-        temp_path.unlink()
-
-        payload = {
-            "id": int(pitchbook_id),
-            "extracted_text_per_page": result["pages"]
-        }
-
-        logger.info(f"Sending payload to EXXETA and SPACY services")
-
-        try:
-            exxeta_response = requests.post(EXXETA_URL, json=payload, timeout=600)
-            logger.info(f"EXXETA response: {exxeta_response.status_code}")
-        except Exception as e:
-            logger.error(f"Error calling EXXETA: {e}")
-
-        try:
-            spacy_response = requests.post(SPACY_URL, json=payload, timeout=600)
-            logger.info(f"SPACY response: {spacy_response.status_code}")
-        except Exception as e:
-            logger.error(f"Error calling SPACY: {e}")
-
-        return {
-            "status": "sent",
-            "ocr_pdf": ocr_pdf_base64,
-            "message": "PDF successfully OCR'd and processed"
-        }, 200
-
-    except Exception as e:
-        logger.error(f"Exception in OCR processing: {str(e)}", exc_info=True)
-        return {"error": f"Processing failed: {str(e)}"}, 500
+    thread = threading.Thread(target=convert_pdf_async, args=(temp_path, pitchbook_id))
+    thread.start()
+
+    return {
+        "status": "sent",
+        "message": "PDF successfully OCR'd and processed"
+    }, 200
 
 
 if __name__ == "__main__":
     logger.info("Starting OCR service on port 5000")
-    app.run(host="0.0.0.0", port=5000, debug=True)
+    app.run(host="0.0.0.0", port=5051, debug=True)

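The /ocr endpoint now acknowledges the request as soon as the worker thread is started; the OCR'd PDF and the 50% progress update flow back to the coordinator through the PUT and POST callbacks in convert_pdf_async. A minimal sketch of calling it directly, assuming the service's default port 5051 and a placeholder file:

    import requests

    with open("example.pdf", "rb") as pdf:  # placeholder file name
        resp = requests.post(
            "http://localhost:5051/ocr",
            files={"file": ("example.pdf", pdf, "application/pdf")},
            data={"id": 1},
            timeout=600,
        )
    # Returns immediately: {"status": "sent", "message": "PDF successfully OCR'd and processed"}
    print(resp.json())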
@@ -10,7 +10,7 @@ import json
 app = Flask(__name__)
 
 load_dotenv()
-coordinator_url = os.getenv("COORDINATOR_URL", "localhost:5000")
+coordinator_url = os.getenv("COORDINATOR_URL", "http://localhost:5000")
 
 # todo add persistence layer
 data_storage = {}  # {id: {spacy_data: [], exxeta_data: []}}

@@ -28,7 +28,7 @@ def send_to_coordinator_service(processed_data, request_id):
         "kpi": json.dumps(processed_data),
     }
     requests.put(
-        "http://" + coordinator_url + "/api/pitch_book/" + str(request_id),
+        coordinator_url + "/api/pitch_book/" + str(request_id),
         data=payload,
     )
     print(f"Result PitchBook {request_id} sent to coordinator")

@@ -128,4 +128,4 @@ def validate():
 
 
 if __name__ == "__main__":
     app.run(debug=True, host="0.0.0.0", port=5054)

@@ -73,6 +73,6 @@ services:
     env_file:
       - .env
     environment:
-      - COORDINATOR_URL=coordinator:5000
+      - COORDINATOR_URL=http://coordinator:5000
     ports:
      - 5054:5000