Merge pull request 'Add progress updates to ocr & exxeta processes' (#72) from #23-Progress into main
Reviewed-on: #72

commit a947de31bf
@@ -17,48 +17,6 @@ OCR_SERVICE_URL = os.getenv("OCR_SERVICE_URL", "http://localhost:5051")
 progress_per_id = {}  # {id: {kpi: 0, pdf: 0}}
 storage_lock = threading.Lock()
 
-def process_pdf_async(app, file_id, file_data, filename):
-    with app.app_context():
-        try:
-            file_obj = BytesIO(file_data)
-            file_obj.name = filename
-
-            files = {"file": (filename, file_obj, "application/pdf")}
-            data = {"id": file_id}
-
-            response = requests.post(
-                f"{OCR_SERVICE_URL}/ocr", files=files, data=data, timeout=600
-            )
-
-            if response.status_code == 200:
-                response_data = response.json()
-                if "ocr_pdf" in response_data:
-                    import base64
-
-                    ocr_pdf_data = base64.b64decode(response_data["ocr_pdf"])
-
-                    file_record = PitchBookModel.query.get(file_id)
-                    if file_record:
-                        file_record.file = ocr_pdf_data
-                        db.session.commit()
-
-                        print("[DEBUG] PDF updated in database:")
-                        print("[DEBUG] - Successfully saved to database")
-
-                socketio.emit("progress", {"id": file_id, "progress": 50})
-            else:
-                socketio.emit(
-                    "error", {"id": file_id, "message": "OCR processing failed"}
-                )
-
-        except Exception as e:
-            import traceback
-
-            traceback.print_exc()
-            socketio.emit(
-                "error", {"id": file_id, "message": f"Processing failed: {str(e)}"}
-            )
-
-
 @pitch_book_controller.route("/", methods=["POST"])
 def upload_file():
@@ -88,6 +46,7 @@ def upload_file():
     files = {"file": (uploaded_file.filename, file_data, "application/pdf")}
     data = {"id": new_file.id}
 
+    socketio.emit("progress", {"id": new_file.id, "progress": 5})
     response = requests.post(
         f"{OCR_SERVICE_URL}/ocr", files=files, data=data, timeout=600
     )
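Note: the upload endpoint emits the initial 5% directly over Socket.IO, while the worker services changed below report progress back over HTTP via POST /api/progress. The coordinator-side handler for that route is not part of the hunks shown here; a minimal sketch of what such a relay could look like, assuming Flask-SocketIO and the payload shape {"id": ..., "progress": ...} used throughout this PR:

    from flask import Blueprint, jsonify, request

    # Hypothetical relay endpoint (not shown in this PR): accepts progress
    # posted by the worker services and re-emits it to the browser over the
    # existing socket. `socketio` is assumed to be the app's Flask-SocketIO
    # instance, imported from wherever it lives in the coordinator.
    progress_controller = Blueprint("progress_controller", __name__)

    @progress_controller.route("/api/progress", methods=["POST"])
    def post_progress():
        payload = request.get_json()  # {"id": <pitchbook id>, "progress": <0-100>}
        socketio.emit("progress", {"id": payload["id"], "progress": payload["progress"]})
        return jsonify({"status": "ok"})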
@@ -15,7 +15,7 @@ def extract_text_from_ocr_json():
     pitchbook_id = json_data["id"]
     pages_data = json_data["extracted_text_per_page"]
 
-    entities_json = extract_with_exxeta(pages_data)
+    entities_json = extract_with_exxeta(pages_data, pitchbook_id)
     entities = json.loads(entities_json) if isinstance(entities_json, str) else entities_json
 
     validate_payload = {
@@ -39,4 +39,4 @@ def extract_text_from_ocr_json():
 
 if __name__ == "__main__":
     app.run(host="0.0.0.0", port=5053, debug=True)
@@ -9,6 +9,7 @@ MODEL = "gpt-4o-mini"
 EXXETA_BASE_URL = "https://ai.exxeta.com/api/v2/azure/openai"
 load_dotenv()
 EXXETA_API_KEY = os.getenv("API_KEY")
+COORDINATOR_URL = os.getenv("COORDINATOR_URL", "http://localhost:5050")
 
 MAX_RETRIES = 3
 TIMEOUT = 180
@@ -16,14 +17,20 @@ TIMEOUT = 180
 logging.basicConfig(level=logging.INFO)
 logger = logging.getLogger(__name__)
 
-def extract_with_exxeta(pages_json):
+def extract_with_exxeta(pages_json, pitchbook_id):
     results = []
 
     if not EXXETA_API_KEY:
         logger.warning("EXXETA_API_KEY nicht gesetzt. Rückgabe eines leeren JSON.")
         return json.dumps(results, indent=2, ensure_ascii=False)
 
+    i = 0
     for page_data in pages_json:
+        i += 1
+        if i % 8 == 0:
+            requests.post(COORDINATOR_URL + "/api/progress", json={"id": pitchbook_id, "progress": 35 + 60 / len(pages_json) * i})
+
         page_num = page_data.get("page")
         text = page_data.get("text", "")
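The throttled post inside the page loop maps extraction progress linearly onto the 35-95 band: with i running from 1 to len(pages_json), the expression 35 + 60/len(pages_json)*i reaches exactly 95 on the last page, and posting only every 8th page keeps the HTTP chatter down. A quick check of the arithmetic (illustrative only):

    # 40-page document, posting every 8th page:
    pages = 40
    for i in range(1, pages + 1):
        if i % 8 == 0:
            print(i, 35 + 60 / pages * i)
    # page  8 -> 47.0
    # page 16 -> 59.0
    # page 24 -> 71.0
    # page 32 -> 83.0
    # page 40 -> 95.0

Two edge cases worth noting: a document shorter than eight pages never posts from the loop at all, and a page count not divisible by 8 stops short of 95; both are covered by the unconditional 95% post before the return in the last hunk of this file.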
@@ -57,7 +64,7 @@ def extract_with_exxeta(pages_json):
         prompt = (
             "Bitte extrahiere relevante Fondskennzahlen aus dem folgenden Pitchbook-Text. "
             "Analysiere den Text sorgfältig, um **nur exakt benannte und relevante Werte** zu extrahieren.\n\n"
 
             "ZU EXTRAHIERENDE KENNZAHLEN (immer exakt wie unten angegeben):\n"
             "- FONDSNAME\n"
             "- FONDSMANAGER\n"
@@ -74,14 +81,14 @@ def extract_with_exxeta(pages_json):
             "- MANAGEMENTGEBÜHREN (ggf. mit Staffelung und Bezug auf NAV/GAV)\n"
             "- SEKTORENALLOKATION (z. B. BÜRO, LOGISTIK, WOHNEN... inkl. %-Angaben)\n"
             "- LÄNDERALLOKATION (z. B. DEUTSCHLAND, FRANKREICH, etc. inkl. %-Angaben)\n\n"
 
             "WICHTIG:\n"
             "- Gib **nur eine Entität pro Kennzahl** an - keine Listen oder Interpretationen.\n"
             "- Wenn mehrere Varianten genannt werden (z. B. \"Core und Core+\"), gib sie im Originalformat als **eine entity** an.\n"
             "- **Keine Vermutungen oder Ergänzungen**. Wenn keine Information enthalten ist, gib die Kennzahl **nicht aus**.\n"
             "- Extrahiere **nur wörtlich vorkommende Inhalte** (keine Berechnungen, keine Zusammenfassungen).\n"
             "- Jeder gefundene Wert muss einem der obigen Label **eindeutig zuordenbar** sein.\n\n"
 
             "FORMAT:\n"
             "Antworte als **reines JSON-Array** mit folgendem Format:\n"
             "[\n"
@@ -92,7 +99,7 @@ def extract_with_exxeta(pages_json):
             " },\n"
             " ...\n"
             "]\n\n"
 
             f"Falls keine Kennzahl enthalten ist, gib ein leeres Array [] zurück.\n\n"
             f"Nur JSON-Antwort - keine Kommentare, keine Erklärungen, kein Text außerhalb des JSON.\n\n"
             f"TEXT:\n{text}"
@@ -144,4 +151,6 @@ def extract_with_exxeta(pages_json):
             if attempt == MAX_RETRIES:
                 results.extend([])
 
-    return json.dumps(results, indent=2, ensure_ascii=False)
+    requests.post(COORDINATOR_URL + "/api/progress", json={"id": pitchbook_id, "progress": 95})
+
+    return json.dumps(results, indent=2, ensure_ascii=False)
@@ -41,6 +41,7 @@ def convert_pdf_async(temp_path, pitchbook_id):
 
     logger.info("Sending payload to EXXETA and SPACY services")
 
+    requests.post(COORDINATOR_URL + "/api/progress", json={"id": pitchbook_id, "progress": 35})
    try:
         exxeta_response = requests.post(EXXETA_URL, json=payload, timeout=600)
         logger.info(f"EXXETA response: {exxeta_response.status_code}")
@@ -59,9 +60,8 @@ def convert_pdf_async(temp_path, pitchbook_id):
         headers = {}
 
         try:
             requests.put(f"{COORDINATOR_URL}/api/pitch_book/{pitchbook_id}", files=files, timeout=600, headers=headers)
 
-            requests.post(COORDINATOR_URL + "/api/progress", json={"id": pitchbook_id, "progress": 50}, timeout=600)
             logger.info("COORDINATOR response: Progress + File updated")
         except Exception as e:
             logger.error(f"Error calling COORDINATOR: {e}")
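Most of the /api/progress posts added in this PR are bare requests.post calls without their own error handling or timeout (the 35% post above even sits outside the try block), so a briefly unreachable coordinator would raise into the processing path. A defensive helper, offered as a suggestion rather than anything in this PR:

    import requests

    def report_progress(base_url, pitchbook_id, progress):
        """Best-effort progress report: a network error must never
        abort the OCR/extraction pipeline itself."""
        try:
            requests.post(
                base_url + "/api/progress",
                json={"id": pitchbook_id, "progress": progress},
                timeout=5,
            )
        except requests.RequestException as e:
            # Progress is cosmetic; log and keep processing.
            print(f"Progress report failed for {pitchbook_id}: {e}")

    # usage: report_progress(COORDINATOR_URL, pitchbook_id, 35)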
@@ -10,7 +10,7 @@ import json
 app = Flask(__name__)
 
 load_dotenv()
-coordinator_url = os.getenv("COORDINATOR_URL", "http://localhost:5000")
+COORDINATOR_URL = os.getenv("COORDINATOR_URL", "http://localhost:5000")
 
 # todo add persistence layer
 data_storage = {}  # {id: {spacy_data: [], exxeta_data: []}}
@@ -19,7 +19,7 @@ storage_lock = threading.Lock()
 
 
 def send_to_coordinator_service(processed_data, request_id):
-    if not coordinator_url:
+    if not COORDINATOR_URL:
         print("Not processed, missing url", processed_data)
         return
@@ -28,7 +28,7 @@ def send_to_coordinator_service(processed_data, request_id):
         "kpi": json.dumps(processed_data),
     }
     requests.put(
-        coordinator_url + "/api/pitch_book/" + str(request_id),
+        COORDINATOR_URL + "/api/pitch_book/" + str(request_id),
         data=payload,
     )
     print(f"Result PitchBook {request_id} sent to coordinator")
@@ -40,6 +40,7 @@ def send_to_coordinator_service(processed_data, request_id):
 
 def process_data_async(request_id, spacy_data, exxeta_data):
     try:
+        requests.post(COORDINATOR_URL + "/api/progress", json={"id": request_id, "progress": 95})
         print(f"Start asynchronous processing for PitchBook: {request_id}")
 
         # Perform merge
@@ -47,6 +47,7 @@ services:
     environment:
       - EXXETA_SERVICE_URL=http://exxeta:5000/extract
       - SPACY_SERVICE_URL=http://spacy:5052/extract
+      - COORDINATOR_URL=http://coordinator:5000
     ports:
       - 5051:5000
@@ -66,6 +67,7 @@ services:
       - .env
     environment:
       - VALIDATE_SERVICE_URL=http://validate:5000/validate
+      - COORDINATOR_URL=http://coordinator:5000
     ports:
       - 5053:5000
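With COORDINATOR_URL now injected into the ocr and exxeta containers, the progress milestones implied by this PR line up as follows (values taken from the hunks above; the final 100 is an assumption, since the frontend keeps its overlay open while loadingState < 100, something downstream presumably emits it once the merged KPIs are stored):

    # Illustrative milestone map; not actual code in this PR
    PROGRESS_MILESTONES = {
        "5": "upload accepted (coordinator emits over Socket.IO)",
        "35": "OCR finished, payload handed to EXXETA/SPACY",
        "35-95": "per-page entity extraction (posted every 8th page)",
        "95": "extraction done / merge started",
        "100": "assumed: merged KPIs stored, overlay closes",
    }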
@@ -7,8 +7,6 @@ import { socket } from "../socket";
 import { CircularProgressWithLabel } from "./CircularProgressWithLabel";
 import { API_HOST } from "../util/api";
 
-const PROGRESS = true;
-
 export default function UploadPage() {
   const [files, setFiles] = useState<File[]>([]);
   const [pageId, setPageId] = useState<string | null>(null);
@@ -28,17 +26,11 @@ export default function UploadPage() {
       console.log("File uploaded successfully");
       const data = await response.json();
       setPageId(data.id.toString());
-      setLoadingState(0);
-
-      !PROGRESS &&
-        navigate({
-          to: "/extractedResult/$pitchBook",
-          params: { pitchBook: data.id.toString() },
-        });
+      setLoadingState(5);
     } else {
       console.error("Failed to upload file");
     }
-  }, [files, navigate]);
+  }, [files]);
 
   const onConnection = useCallback(() => {
     console.log("connected");
@@ -80,18 +72,16 @@ export default function UploadPage() {
 
   return (
     <>
-      {PROGRESS && (
-        <Backdrop
-          sx={(theme) => ({ color: "#fff", zIndex: theme.zIndex.drawer + 1 })}
-          open={pageId !== null && loadingState !== null && loadingState < 100}
-        >
-          <CircularProgressWithLabel
-            color="inherit"
-            value={loadingState || 0}
-            size={60}
-          />
-        </Backdrop>
-      )}
+      <Backdrop
+        sx={(theme) => ({ color: "#fff", zIndex: theme.zIndex.drawer + 1 })}
+        open={pageId !== null && loadingState !== null && loadingState < 100}
+      >
+        <CircularProgressWithLabel
+          color="inherit"
+          value={loadingState || 0}
+          size={60}
+        />
+      </Backdrop>
       <Box
         display="flex"
         flexDirection="column"
@@ -195,4 +185,4 @@ export default function UploadPage() {
       </Box>
     </>
   );
 }
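Since the browser is driven entirely by the progress and error Socket.IO events, the same stream can also be watched from a terminal while testing. A small sketch using the python-socketio client package; the coordinator address is an assumption based on the defaults in this PR:

    import socketio  # pip install "python-socketio[client]"

    sio = socketio.Client()

    @sio.on("progress")
    def on_progress(data):
        # Same payload shape the services post: {"id": ..., "progress": ...}
        print(f"PitchBook {data['id']}: {data['progress']}%")

    @sio.on("error")
    def on_error(data):
        print(f"PitchBook {data['id']} failed: {data['message']}")

    sio.connect("http://localhost:5050")  # assumed coordinator host/port
    sio.wait()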