Add progress updates to ocr & exxeta processes #72
|
|
@ -17,48 +17,6 @@ OCR_SERVICE_URL = os.getenv("OCR_SERVICE_URL", "http://localhost:5051")
|
|||
progress_per_id = {} # {id: {kpi: 0, pdf: 0}}
|
||||
storage_lock = threading.Lock()
|
||||
|
||||
def process_pdf_async(app, file_id, file_data, filename):
|
||||
with app.app_context():
|
||||
try:
|
||||
file_obj = BytesIO(file_data)
|
||||
file_obj.name = filename
|
||||
|
||||
files = {"file": (filename, file_obj, "application/pdf")}
|
||||
data = {"id": file_id}
|
||||
|
||||
response = requests.post(
|
||||
f"{OCR_SERVICE_URL}/ocr", files=files, data=data, timeout=600
|
||||
)
|
||||
|
||||
if response.status_code == 200:
|
||||
response_data = response.json()
|
||||
if "ocr_pdf" in response_data:
|
||||
import base64
|
||||
|
||||
ocr_pdf_data = base64.b64decode(response_data["ocr_pdf"])
|
||||
|
||||
file_record = PitchBookModel.query.get(file_id)
|
||||
if file_record:
|
||||
file_record.file = ocr_pdf_data
|
||||
db.session.commit()
|
||||
|
||||
print("[DEBUG] PDF updated in database:")
|
||||
print("[DEBUG] - Successfully saved to database")
|
||||
|
||||
socketio.emit("progress", {"id": file_id, "progress": 50})
|
||||
else:
|
||||
socketio.emit(
|
||||
"error", {"id": file_id, "message": "OCR processing failed"}
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
import traceback
|
||||
|
||||
traceback.print_exc()
|
||||
socketio.emit(
|
||||
"error", {"id": file_id, "message": f"Processing failed: {str(e)}"}
|
||||
)
|
||||
|
||||
|
||||
@pitch_book_controller.route("/", methods=["POST"])
|
||||
def upload_file():
|
||||
|
|
@ -88,6 +46,7 @@ def upload_file():
|
|||
files = {"file": (uploaded_file.filename, file_data, "application/pdf")}
|
||||
data = {"id": new_file.id}
|
||||
|
||||
socketio.emit("progress", {"id": new_file.id, "progress": 5})
|
||||
response = requests.post(
|
||||
f"{OCR_SERVICE_URL}/ocr", files=files, data=data, timeout=600
|
||||
)
|
||||
|
|
|
|||
|
|
@ -15,7 +15,7 @@ def extract_text_from_ocr_json():
|
|||
pitchbook_id = json_data["id"]
|
||||
pages_data = json_data["extracted_text_per_page"]
|
||||
|
||||
entities_json = extract_with_exxeta(pages_data)
|
||||
entities_json = extract_with_exxeta(pages_data, pitchbook_id)
|
||||
entities = json.loads(entities_json) if isinstance(entities_json, str) else entities_json
|
||||
|
||||
validate_payload = {
|
||||
|
|
|
|||
|
|
@ -9,6 +9,7 @@ MODEL = "gpt-4o-mini"
|
|||
EXXETA_BASE_URL = "https://ai.exxeta.com/api/v2/azure/openai"
|
||||
load_dotenv()
|
||||
EXXETA_API_KEY = os.getenv("API_KEY")
|
||||
COORDINATOR_URL = os.getenv("COORDINATOR_URL", "http://localhost:5050")
|
||||
|
||||
MAX_RETRIES = 3
|
||||
TIMEOUT = 180
|
||||
|
|
@ -16,14 +17,20 @@ TIMEOUT = 180
|
|||
logging.basicConfig(level=logging.INFO)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
def extract_with_exxeta(pages_json):
|
||||
def extract_with_exxeta(pages_json, pitchbook_id):
|
||||
results = []
|
||||
|
||||
if not EXXETA_API_KEY:
|
||||
logger.warning("EXXETA_API_KEY nicht gesetzt. Rückgabe eines leeren JSON.")
|
||||
return json.dumps(results, indent=2, ensure_ascii=False)
|
||||
|
||||
i = 0
|
||||
for page_data in pages_json:
|
||||
i += 1
|
||||
if i % 8 == 0:
|
||||
requests.post(COORDINATOR_URL + "/api/progress", json={"id": pitchbook_id, "progress": 35 + 60/len(pages_json)*i})
|
||||
|
||||
|
||||
page_num = page_data.get("page")
|
||||
page_data.get("page")
|
||||
text = page_data.get("text", "")
|
||||
|
|
@ -144,4 +151,6 @@ def extract_with_exxeta(pages_json):
|
|||
if attempt == MAX_RETRIES:
|
||||
results.extend([])
|
||||
|
||||
|
||||
requests.post(COORDINATOR_URL + "/api/progress", json={"id": pitchbook_id, "progress": 95})
|
||||
return json.dumps(results, indent=2, ensure_ascii=False)
|
||||
|
|
@ -41,6 +41,7 @@ def convert_pdf_async(temp_path, pitchbook_id):
|
|||
|
||||
logger.info("Sending payload to EXXETA and SPACY services")
|
||||
|
||||
requests.post(COORDINATOR_URL + "/api/progress", json={"id": pitchbook_id, "progress": 35})
|
||||
try:
|
||||
exxeta_response = requests.post(EXXETA_URL, json=payload, timeout=600)
|
||||
logger.info(f"EXXETA response: {exxeta_response.status_code}")
|
||||
|
|
@ -59,9 +60,8 @@ def convert_pdf_async(temp_path, pitchbook_id):
|
|||
headers = {}
|
||||
|
||||
try:
|
||||
requests.put(f"{COORDINATOR_URL}/api/pitch_book/{pitchbook_id}", files=files, timeout=600, headers=headers)
|
||||
|
||||
requests.post(COORDINATOR_URL + "/api/progress", json={"id": pitchbook_id, "progress": 50}, timeout=600)
|
||||
requests.put(f"{COORDINATOR_URL}/api/pitch_book/{pitchbook_id}", files=files, timeout=600, headers=headers)
|
||||
logger.info("COORDINATOR response: Progress + File updated")
|
||||
except Exception as e:
|
||||
logger.error(f"Error calling COORDINATOR: {e}")
|
||||
|
|
|
|||
|
|
@ -10,7 +10,7 @@ import json
|
|||
app = Flask(__name__)
|
||||
|
||||
load_dotenv()
|
||||
coordinator_url = os.getenv("COORDINATOR_URL", "http://localhost:5000")
|
||||
COORDINATOR_URL = os.getenv("COORDINATOR_URL", "http://localhost:5000")
|
||||
|
||||
# todo add persistence layer
|
||||
data_storage = {} # {id: {spacy_data: [], exxeta_data: []}}
|
||||
|
|
@ -19,7 +19,7 @@ storage_lock = threading.Lock()
|
|||
|
||||
|
||||
def send_to_coordinator_service(processed_data, request_id):
|
||||
if not coordinator_url:
|
||||
if not COORDINATOR_URL:
|
||||
print("Not processed, missing url", processed_data)
|
||||
return
|
||||
|
||||
|
|
@ -28,7 +28,7 @@ def send_to_coordinator_service(processed_data, request_id):
|
|||
"kpi": json.dumps(processed_data),
|
||||
}
|
||||
requests.put(
|
||||
coordinator_url + "/api/pitch_book/" + str(request_id),
|
||||
COORDINATOR_URL + "/api/pitch_book/" + str(request_id),
|
||||
data=payload,
|
||||
)
|
||||
print(f"Result PitchBook {request_id} sent to coordinator")
|
||||
|
|
@ -40,6 +40,7 @@ def send_to_coordinator_service(processed_data, request_id):
|
|||
|
||||
def process_data_async(request_id, spacy_data, exxeta_data):
|
||||
try:
|
||||
requests.post(COORDINATOR_URL + "/api/progress", json={"id": request_id, "progress": 95})
|
||||
print(f"Start asynchronous processing for PitchBook: {request_id}")
|
||||
|
||||
# Perform merge
|
||||
|
|
|
|||
|
|
@ -44,6 +44,7 @@ services:
|
|||
environment:
|
||||
- EXXETA_SERVICE_URL=http://exxeta:5000/extract
|
||||
- SPACY_SERVICE_URL=http://spacy:5052/extract
|
||||
- COORDINATOR_URL=http://coordinator:5000
|
||||
ports:
|
||||
- 5051:5000
|
||||
|
||||
|
|
@ -63,6 +64,7 @@ services:
|
|||
- .env
|
||||
environment:
|
||||
- VALIDATE_SERVICE_URL=http://validate:5000/validate
|
||||
- COORDINATOR_URL=http://coordinator:5000
|
||||
ports:
|
||||
- 5053:5000
|
||||
|
||||
|
|
|
|||
|
|
@ -6,8 +6,6 @@ import FileUpload from "react-material-file-upload";
|
|||
import { socket } from "../socket";
|
||||
import { CircularProgressWithLabel } from "./CircularProgressWithLabel";
|
||||
|
||||
const PROGRESS = true;
|
||||
|
||||
export default function UploadPage() {
|
||||
const [files, setFiles] = useState<File[]>([]);
|
||||
const [pageId, setPageId] = useState<string | null>(null);
|
||||
|
|
@ -27,17 +25,11 @@ export default function UploadPage() {
|
|||
console.log("File uploaded successfully");
|
||||
const data = await response.json();
|
||||
setPageId(data.id.toString());
|
||||
setLoadingState(0);
|
||||
|
||||
!PROGRESS &&
|
||||
navigate({
|
||||
to: "/extractedResult/$pitchBook",
|
||||
params: { pitchBook: data.id.toString() },
|
||||
});
|
||||
setLoadingState(5);
|
||||
} else {
|
||||
console.error("Failed to upload file");
|
||||
}
|
||||
}, [files, navigate]);
|
||||
}, [files]);
|
||||
|
||||
const onConnection = useCallback(() => {
|
||||
console.log("connected");
|
||||
|
|
@ -79,18 +71,16 @@ export default function UploadPage() {
|
|||
|
||||
return (
|
||||
<>
|
||||
{PROGRESS && (
|
||||
<Backdrop
|
||||
sx={(theme) => ({ color: "#fff", zIndex: theme.zIndex.drawer + 1 })}
|
||||
open={pageId !== null && loadingState !== null && loadingState < 100}
|
||||
>
|
||||
<CircularProgressWithLabel
|
||||
color="inherit"
|
||||
value={loadingState || 0}
|
||||
size={60}
|
||||
/>
|
||||
</Backdrop>
|
||||
)}
|
||||
<Backdrop
|
||||
sx={(theme) => ({ color: "#fff", zIndex: theme.zIndex.drawer + 1 })}
|
||||
open={pageId !== null && loadingState !== null && loadingState < 100}
|
||||
>
|
||||
<CircularProgressWithLabel
|
||||
color="inherit"
|
||||
value={loadingState || 0}
|
||||
size={60}
|
||||
/>
|
||||
</Backdrop>
|
||||
<Box
|
||||
display="flex"
|
||||
flexDirection="column"
|
||||
|
|
|
|||
Loading…
Reference in New Issue