Build a document processing workflow in 30 minutes (or less!)

# workflows
# ai studio
# ocr
Learn how to develop with Mistral Workflows
April 28, 2026
Jen Person

Mistral Workflows is an orchestration platform for building, executing, and monitoring complex AI-driven workflows. It provides durable, fault-tolerant workflow execution backed by battle-tested distributed systems infrastructure, combined with a developer-friendly SDK.
In this tutorial, you’ll build an end-to-end medical document processing pipeline using three Mistral capabilities: OCR to read PDFs, Agents to classify documents and extract structured data, and Workflows to orchestrate the entire process reliably.
The pipeline takes any scanned medical PDF, such as a prescription, a hospital bill, or an imaging report, and runs it through three steps: optical character recognition (OCR) to extract raw text, an AI agent to classify the document type with a confidence score, and a second agent to extract patient information and document-specific fields as structured JSON. Because it’s built on Mistral Workflows, each step is durable and fault-tolerant: if a worker restarts mid-execution, the workflow resumes where it left off instead of starting over.
You’ll also include a human-in-the-loop step: when the classifier’s confidence falls below a configurable threshold, the pipeline pauses and waits for a user to review and confirm the category before extraction continues. Mistral Workflows enables these long-running processes that can pause and resume based on external input, which is difficult to build reliably with a simple async queue or a chain of API calls.
What you’ll build:
- A workflow that:
  - Takes medical files as input
  - Uses the Mistral OCR Processor to recognize the content of the document
  - Categorizes the content type and formats the data using Mistral Agents
- A frontend for uploading documents, built using Streamlit
What you’ll need:
- Python 3.12 installed on your machine
- uv installed on your machine
Steps
- Set up the environment
- Define fields to extract
- Build the workflow
- Create the app frontend
- Add load testing
- Update Makefile
- Run the app and workflow
- Success!
Set up the environment
Initialize the project
Use the following command to scaffold a ready-to-run Python project with the Workflows SDK configured, along with helper commands:
```shell
uvx mistralai-workflows-cli setup
```

When prompted, choose the default project name, and follow the steps to generate a Mistral API key.
Examine the starter code
Open the `my-workflow` project in the IDE of your choice. Notice the structure of the project:

```
my-workflow/
  .agents/skills/workflows/
  src/
    workflows/
      __init__.py
      hello.py
      start.py
      dev_worker.py
      discover.py
  .gitignore
  Makefile
  pyproject.toml
  README.md
```

Here are some key components of this scaffold project:
- Agent Skills for Mistral Workflows are included in the `.agents/skills/workflows` directory for easy development with coding agents.
- Workflows are stored in separate files in the `src/workflows` directory:
  - The `start.py` file contains code that enables you to trigger a workflow execution from the command line.
  - The `hello.py` file gives a minimal example workflow, showing how to use the `@workflows.activity()`, `@workflows.workflow.define()`, and `@workflows.workflow.entrypoint` decorators.
  - The `dev_worker.py` file contains a worker for local development that watches `src/` for `.py` changes and auto-restarts the workflow worker as needed.
- The `Makefile` contains helper commands to streamline the development and testing process.
Through the tutorial steps, we will add four Python files to the scaffold project:
- `extraction_fields.py`: Useful strings and constants that will be consumed by the workflow’s agents.
- `medical_doc_workflow.py`: The actual functionality at the heart of the workflow.
- `app.py`: A Streamlit frontend for submitting PDFs to the workflow.
- `load_test.py`: Launches multiple workflows in parallel to see how they load-balance.
Install the dependencies
Let’s install the dependencies required for the project, first updating the dependency requirements to reflect what the project needs.
Update the dependency requirements
Open `pyproject.toml` and update your dependencies to the following:

```toml
dependencies = [
    "mistralai-workflows[mistralai]>=3.0.0,<4",
    "pydantic",
    "python-dotenv",
    "streamlit==1.55.0",
    "pymupdf>=1.23.0",
]
```

These updates make the following changes:
- Replaces the default `mistralai-workflows` in the file with `mistralai-workflows[mistralai]`, adding the Mistral plugin, which provides native integration with Mistral's AI models and services, including durable agents, tool calling, and multi-agent handoffs.
- Adds Streamlit as a dependency to create a basic frontend for uploading documents.
- Adds PyMuPDF for PDF analysis.

Use the included Makefile command to install the required dependencies:

```shell
make installdeps
```

Define fields to extract
Documents of the same type can come in a variety of formats. In the medical field, invoices, medical results, referrals, and more can have different field names and different ways of organizing the same information.
To process information extracted from the documents, we define how the fields should be named.
To start, create a new directory under `src` called `shared`, and add a file called `extraction_fields.py`:

```shell
mkdir src/shared
touch src/shared/extraction_fields.py
```

Define constants for fields
Add the following code to `src/shared/extraction_fields.py`:

```python
"""
Fields to extract by document type.
Imported by medical_doc_workflow.py and app.py.
"""

COMMON_FIELDS: list[tuple[str, str]] = [
    ("full_name", "Full name of the patient (last name + first name)"),
    ("patient_address", "Full address of the patient"),
    ("social_security_number", "Social security number of the patient"),
]

SPECIFIC_FIELDS: dict[str, list[tuple[str, str]]] = {
    "prescription": [
        ("doctor_name", "Name of the doctor who made the prescription"),
        ("doctor_address", "Address of the doctor who made the prescription"),
        ("doctor_rpps", "RPPS number of the doctor"),
        ("medications", "List of prescribed medications (separated by commas)"),
        ("prescription_date", "Date of the prescription"),
    ],
    "medical_bill": [
        ("healthcare_professional_name", "Name of the healthcare professional"),
        ("healthcare_professional_address", "Address of the healthcare professional"),
        ("bill_amount", "Total billed amount (with currency)"),
        ("services", "Billed services or procedures"),
        ("care_date", "Date of care"),
    ],
    "hospitalization_report": [
        ("hospital_name", "Name of the hospital"),
        ("department", "Department or care unit"),
        ("responsible_doctor", "Responsible doctor"),
        ("admission_date", "Date of admission"),
        ("discharge_date", "Date of discharge"),
        ("primary_diagnosis", "Primary diagnosis"),
    ],
    "biological_analysis": [
        ("laboratory", "Name of the laboratory"),
        ("prescribing_doctor", "Prescribing doctor"),
        ("sample_date", "Date of the sample"),
        ("analyses", "Performed analyses"),
        ("abnormal_results", "Reported abnormal results"),
    ],
    "medical_imaging": [
        ("facility", "Name of the imaging facility"),
        ("radiologist_name", "Name of the radiologist"),
        ("exam_type", "Type of exam (MRI, CT, X-ray, etc.)"),
        ("anatomical_region", "Anatomical region examined"),
        ("exam_date", "Date of the exam"),
        ("conclusion", "Conclusion or main finding"),
    ],
    "medical_certificate": [
        ("doctor_name", "Name of the signing doctor"),
        ("doctor_address", "Address of the doctor"),
        ("certificate_subject", "Subject of the certificate"),
        ("certificate_date", "Date of the certificate"),
        ("sick_leave_duration", "Duration of sick leave if mentioned"),
    ],
    "mutual_reimbursement": [
        ("mutual_name", "Name of the mutual insurance"),
        ("member_number", "Member number"),
        ("reimbursement_amount", "Amount reimbursed by the mutual insurance"),
        ("reimbursement_date", "Reimbursement date"),
        ("reimbursed_acts", "Reimbursed acts"),
    ],
    "social_security_reimbursement": [
        ("fund_name", "Name of the social security fund"),
        ("reimbursement_amount", "Amount reimbursed by social security"),
        ("reimbursement_rate", "Applied reimbursement rate"),
        ("reimbursement_date", "Reimbursement date"),
        ("reimbursed_acts", "Reimbursed acts"),
    ],
    "consultation_report": [
        ("doctor_name", "Name of the consulting doctor"),
        ("doctor_specialty", "Specialty of the doctor"),
        ("consultation_date", "Date of the consultation"),
        ("reason", "Reason for the consultation"),
        ("diagnosis", "Diagnosis"),
        ("prescribed_treatment", "Prescribed treatment or follow-up"),
    ],
    "informed_consent": [
        ("doctor_name", "Name of the doctor"),
        ("facility", "Facility"),
        ("medical_procedure", "Medical procedure concerned"),
        ("signature_date", "Date of signature"),
    ],
    "other": [],
}

DOCUMENT_CATEGORIES: list[str] = list(SPECIFIC_FIELDS.keys())

CATEGORY_LABELS: dict[str, str] = {
    "prescription": "📋 Prescription",
    "medical_bill": "🧾 Medical Bill",
    "hospitalization_report": "🏥 Hospitalization Report",
    "biological_analysis": "🔬 Biological Analysis",
    "medical_imaging": "🩻 Medical Imaging",
    "medical_certificate": "📄 Medical Certificate",
    "mutual_reimbursement": "💳 Mutual Reimbursement",
    "social_security_reimbursement": "🏛️ Social Security Reimbursement",
    "consultation_report": "👨‍⚕️ Consultation Report",
    "informed_consent": "✍️ Informed Consent",
    "other": "❓ Other",
}
```

These constants specify the field names and descriptions to use for extraction, standardizing output from inputs that may use a variety of names and layouts for the same types of data. The workflow’s extractor agent will combine these field lists with the document and its category to build its prompt.
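Because `DOCUMENT_CATEGORIES` is derived from the `SPECIFIC_FIELDS` keys, adding a new document type is a one-place change. A tiny stand-alone illustration (using a shortened two-category field map, not the full one above):

```python
# Shortened stand-in for the SPECIFIC_FIELDS mapping defined above.
SPECIFIC_FIELDS = {
    "prescription": [
        ("doctor_name", "Name of the doctor who made the prescription"),
        ("medications", "List of prescribed medications (separated by commas)"),
    ],
    "other": [],
}

# The category list falls out of the mapping keys, so the classifier
# and extractor always agree on the allowed document types.
DOCUMENT_CATEGORIES = list(SPECIFIC_FIELDS.keys())
print(DOCUMENT_CATEGORIES)  # ['prescription', 'other']

# Field descriptions double as prompt lines for the extractor agent.
prompt_lines = "\n".join(f"- {key}: {desc}" for key, desc in SPECIFIC_FIELDS["prescription"])
print(prompt_lines.splitlines()[0])  # - doctor_name: Name of the doctor who made the prescription
```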
Build the workflow
With our desired fields configured, we can now build the workflow using the Mistral Workflows SDK. Mistral Workflows uses decorators to define the components of a workflow; as we build it out, we’ll discuss some of the available decorators.
Create a new file in the `workflows` folder called `medical_doc_workflow.py`:

```shell
touch src/workflows/medical_doc_workflow.py
```

Add the required imports and setup to `medical_doc_workflow.py`:

```python
import asyncio
import logging
import os
from functools import lru_cache
from datetime import timedelta
from typing import Optional

import mistralai.workflows as workflows
import mistralai.workflows.plugins.mistralai as workflows_mistralai
from dotenv import load_dotenv
from pydantic import BaseModel, ConfigDict, Field, create_model

from shared.extraction_fields import COMMON_FIELDS, DOCUMENT_CATEGORIES, SPECIFIC_FIELDS

load_dotenv(override=True)

# Quiet noisy library loggers during local development.
for name in ("mistralai_workflows", "httpx", "httpcore"):
    logging.getLogger(name).setLevel(logging.WARNING)


class ManualCategorySignal(BaseModel):
    category: str


class DocumentClassification(BaseModel):
    category: str = Field(description=f"One of: {', '.join(DOCUMENT_CATEGORIES)}")
    confidence: float = Field(ge=0.0, le=1.0)
    explanation: str


@lru_cache(maxsize=None)
def get_extraction_output_model(category: str) -> type[BaseModel]:
    # Build a pydantic model per category: shared patient fields plus
    # the category-specific fields, all optional strings defaulting to None.
    common_model = create_model(
        "CommonExtractionFields",
        __config__=ConfigDict(extra="forbid"),
        **{key: (Optional[str], None) for key, _ in COMMON_FIELDS},
    )
    specific_model = create_model(
        f"SpecificExtractionFields_{category}",
        __config__=ConfigDict(extra="forbid"),
        **{key: (Optional[str], None) for key, _ in SPECIFIC_FIELDS.get(category, [])},
    )
    return create_model(
        f"ExtractionOutput_{category}",
        __config__=ConfigDict(extra="forbid"),
        common=(common_model, ...),
        specific=(specific_model, ...),
    )
```

Create the workflow activities
In Mistral Workflows, an activity is a unit of work that performs actual computations, API calls, or other operations. Activities are designated using the `@workflows.activity` decorator. By default, an activity is registered using its Python function name. We will pass two parameters with our activities:

- `start_to_close_timeout`: Sets the maximum time an activity can run from the moment it starts executing to the moment it must return a result. If the activity exceeds this duration, it is terminated and treated as a failure (which may trigger a retry). Without a timeout, a hung activity blocks indefinitely.
- `retry_policy_max_attempts`: Specifies how many times an activity retries when it fails.

Our workflow includes three activities:

- `get_document_signed_url`: Gets a signed URL for the file that can be passed to the next activity.
- `classify_document`: Uses Mistral chat completions to categorize the type of document, passing the document categories and file info.
- `extract_patient_info`: Uses Mistral chat completions to pull the desired information from the document based on the given category and return it as JSON.
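The two settings interact: each attempt gets its own start-to-close window, and the retry policy caps how many windows are granted. A minimal stand-alone sketch of that semantics in plain asyncio (not the Workflows SDK; the names `run_activity` and `flaky` are illustrative only):

```python
import asyncio

async def run_activity(fn, *, start_to_close_timeout: float, retry_policy_max_attempts: int):
    """Run fn with a per-attempt timeout, retrying up to the attempt cap."""
    last_error = None
    for attempt in range(1, retry_policy_max_attempts + 1):
        try:
            # Each attempt gets a fresh start-to-close window.
            return await asyncio.wait_for(fn(), timeout=start_to_close_timeout)
        except (asyncio.TimeoutError, RuntimeError) as exc:
            last_error = exc  # a timed-out or failed attempt may be retried
    raise last_error

calls = {"n": 0}

async def flaky():
    # Fails on the first attempt, succeeds on the second.
    calls["n"] += 1
    if calls["n"] < 2:
        raise RuntimeError("transient failure")
    return "ok"

result = asyncio.run(run_activity(flaky, start_to_close_timeout=1.0, retry_policy_max_attempts=2))
print(result)  # ok
```

The real SDK adds durability on top of this: a retried attempt survives worker restarts, which a plain asyncio loop cannot offer.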
After the imports and setup in `medical_doc_workflow.py`, add the following code to create the activities:

```python
# ── Activities ────────────────────────────────────────────────────────────────

# Resolve the uploaded file ID into a temporary signed URL for Document QnA.
@workflows.activity(start_to_close_timeout=timedelta(minutes=5), retry_policy_max_attempts=2)
async def get_document_signed_url(file_id: str) -> str:
    client = workflows_mistralai.get_mistral_client()
    signed_url = await client.files.get_signed_url_async(file_id=file_id)
    return signed_url.url


# Classify the document category directly from the document URL using structured output.
@workflows.activity(start_to_close_timeout=timedelta(minutes=2), retry_policy_max_attempts=2)
async def classify_document(document_url: str, filename: str) -> dict:
    client = workflows_mistralai.get_mistral_client()
    model = os.environ.get("MISTRAL_CLASSIFIER_MODEL", "mistral-medium-latest")
    response = await client.chat.parse_async(
        response_format=DocumentClassification,
        model=model,
        temperature=0.0,
        messages=[
            {
                "role": "system",
                "content": (
                    "You are an expert in medical document classification. "
                    "Return only valid JSON that matches the schema."
                ),
            },
            {
                "role": "user",
                "content": [
                    {
                        "type": "text",
                        "text": (
                            f"Classify the medical document '{filename}' into exactly one category from:\n"
                            + "\n".join(f"- {c}" for c in DOCUMENT_CATEGORIES)
                            + "\n\n"
                            "Return confidence between 0 and 1 and a short explanation."
                        ),
                    },
                    {
                        "type": "document_url",
                        "document_url": document_url,
                        "document_name": filename,
                    },
                ],
            },
        ],
    )
    parsed = response.choices[0].message.parsed if response.choices and response.choices[0].message else None
    if parsed is None:
        raise RuntimeError("Classification response could not be parsed.")
    return parsed.model_dump(mode="json")


# Extract common and category-specific fields from the document with schema-constrained JSON.
@workflows.activity(start_to_close_timeout=timedelta(minutes=2), retry_policy_max_attempts=2)
async def extract_patient_info(document_url: str, filename: str, category: str) -> dict:
    client = workflows_mistralai.get_mistral_client()
    model = os.environ.get("MISTRAL_EXTRACTOR_MODEL", "mistral-medium-latest")
    extraction_model = get_extraction_output_model(category)
    common_fields_text = "\n".join(f"- {key}" for key, _ in COMMON_FIELDS)
    specific_fields_text = "\n".join(f"- {key}" for key, _ in SPECIFIC_FIELDS.get(category, []))
    response = await client.chat.parse_async(
        response_format=extraction_model,
        model=model,
        temperature=0.0,
        messages=[
            {
                "role": "system",
                "content": (
                    "You extract administrative and medical information from a medical document. "
                    "Return only valid JSON that matches the schema. "
                    "If a field is missing, set it to null."
                ),
            },
            {
                "role": "user",
                "content": [
                    {
                        "type": "text",
                        "text": (
                            f"Extract fields from '{filename}' for category '{category}'.\n\n"
                            "Populate these common fields:\n"
                            f"{common_fields_text}\n\n"
                            "Populate these category-specific fields:\n"
                            f"{specific_fields_text if specific_fields_text else '- (none)'}\n\n"
                            "Return null for missing values."
                        ),
                    },
                    {
                        "type": "document_url",
                        "document_url": document_url,
                        "document_name": filename,
                    },
                ],
            },
        ],
    )
    parsed = response.choices[0].message.parsed if response.choices and response.choices[0].message else None
    if parsed is None:
        raise RuntimeError("Extraction response could not be parsed.")
    return parsed.model_dump(mode="json")
```

Define the workflow
To define our workflow, we use the `@workflows.workflow.define` decorator, passing a name that will be used to identify the workflow. We use three more decorators provided by the SDK to run and track the workflow:

- `@workflows.workflow.query`: Mistral Workflows’ query functionality is used to get the status of the workflow steps. Queries use synchronous communication and are read-only.
- `@workflows.workflow.signal`: Mistral Workflows’ signal decorator is used to indicate that the user selected a document category. Signals use asynchronous communication and can be sent at any time during workflow execution. We use a signal when the workflow pauses for human input because the classification confidence threshold isn’t met, allowing the user to select a category when the LLM isn’t confident.
- `@workflows.workflow.entrypoint`: Mistral Workflows’ entrypoint decorator is used to define the run of the workflow. It walks through each activity, updating the steps along the way until the workflow is complete.
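The pause-for-a-signal pattern is the interesting part. Outside the SDK, the same idea can be sketched with plain asyncio: the run coroutine blocks on a condition that a signal handler later satisfies (the class and method names here are illustrative, not the Workflows API):

```python
import asyncio

class MiniWorkflow:
    """Toy stand-in for a workflow that waits for a manual-category signal."""

    def __init__(self):
        self._manual_category = None
        self._signal_received = asyncio.Event()

    def manual_category_signal(self, category: str) -> None:
        # Analogous to a @workflows.workflow.signal handler: mutate state, wake the run.
        self._manual_category = category
        self._signal_received.set()

    async def run(self, predicted: str, confidence: float, threshold: float = 0.9) -> str:
        if confidence >= threshold:
            return predicted
        # Analogous to wait_condition: pause until the signal arrives (or time out).
        await asyncio.wait_for(self._signal_received.wait(), timeout=5)
        return self._manual_category

async def demo() -> str:
    wf = MiniWorkflow()
    task = asyncio.create_task(wf.run("other", confidence=0.4))
    await asyncio.sleep(0)                     # let run() reach its wait point
    wf.manual_category_signal("prescription")  # human review arrives
    return await task

print(asyncio.run(demo()))  # prescription
```

The crucial difference is durability: an `asyncio.Event` lives in one process, while a Mistral Workflows signal reaches a workflow that may have been waiting for hours and can survive worker restarts.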
Add the following code after the activities to complete `medical_doc_workflow.py`:

```python
# ── Workflow ──────────────────────────────────────────────────────────────────
@workflows.workflow.define(name="pdf_ocr_workflow")
class PdfOcrWorkflow(workflows.InteractiveWorkflow):
    def __init__(self):
        # UI-facing step state exposed through the `get_steps` query.
        self.steps = {
            "ocr": {"status": "pending", "result": None},
            "classify": {"status": "pending", "result": None},
            "extract": {"status": "pending", "result": None},
        }
        # Set by the `manual_category` signal when a user overrides classification.
        self._manual_category = None

    @workflows.workflow.query(name="get_steps")
    def get_steps(self) -> dict:
        # Queries are synchronous/read-only: the frontend polls this for progress.
        return self.steps

    @workflows.workflow.signal(name="manual_category")
    async def manual_category_signal(self, payload: ManualCategorySignal) -> None:
        # Signals are async/write: this mutates workflow state while run() is active.
        self._manual_category = payload.category

    @workflows.workflow.entrypoint
    async def run(
        self,
        file_id: str,
        filename: str,
        confidence_threshold: float = 0.9,
        manual_review_timeout_seconds: Optional[float] = None,
    ) -> workflows_mistralai.ChatAssistantWorkflowOutput:
        # TodoList items let clients display coarse workflow progress in addition to step data.
        ocr_item = workflows_mistralai.TodoListItem(
            title="Prepare document for Document QnA",
            description="Generate a signed URL so Mistral Document AI can read the document.",
        )
        classify_item = workflows_mistralai.TodoListItem(
            title="Classify document type",
            description="Predict the medical document category with confidence.",
        )
        extract_item = workflows_mistralai.TodoListItem(
            title="Extract structured fields",
            description="Extract patient and document-specific fields.",
        )

        async with workflows_mistralai.TodoList(items=[ocr_item, classify_item, extract_item]):
            self.steps["ocr"]["status"] = "running"
            async with ocr_item:
                signed_document_url = await get_document_signed_url(file_id)
                self.steps["ocr"] = {
                    "status": "done",
                    "result": "Document prepared for Document QnA (OCR handled by Mistral Document AI).",
                }

            self.steps["classify"]["status"] = "running"
            async with classify_item:
                classification = await classify_document(signed_document_url, filename)
                if classification.get("confidence", 0.0) < confidence_threshold:
                    # Low confidence enters a human-in-the-loop branch.
                    self.steps["classify"] = {"status": "waiting_human", "result": classification}
                    try:
                        # wait_condition pauses deterministically until signal or timeout.
                        await workflows.workflow.wait_condition(
                            lambda: self._manual_category is not None,
                            timeout=manual_review_timeout_seconds,
                            timeout_summary="manual_category_review",
                        )
                    except asyncio.TimeoutError:
                        # Non-interactive callers can set a timeout to avoid waiting forever.
                        classification["explanation"] = (
                            f"{classification.get('explanation', '').strip()} "
                            "Manual review timed out; using model-predicted category."
                        ).strip()
                    else:
                        # Manual override takes precedence when a signal arrives in time.
                        classification["category"] = self._manual_category
                        classification["confidence"] = 1.0
                        classification["explanation"] = f"Manually selected category: {self._manual_category}"
                self.steps["classify"] = {"status": "done", "result": classification}

            self.steps["extract"]["status"] = "running"
            async with extract_item:
                patient_info = await extract_patient_info(signed_document_url, filename, classification["category"])
                self.steps["extract"] = {"status": "done", "result": patient_info}

        return workflows_mistralai.ChatAssistantWorkflowOutput(
            # Return both human-readable text and a structured payload for the UI.
            content=[workflows_mistralai.TextOutput(text=f"Processing complete for {filename}.")],
            structuredContent={
                "filename": filename,
                "ocr_text": "Document processed with Document QnA (no standalone OCR text payload).",
                "classification": classification,
                "patient_info": patient_info,
            },
        )


async def main() -> None:
    print("Worker ready — waiting for tasks...\n")
    await workflows.run_worker([PdfOcrWorkflow])


if __name__ == "__main__":
    asyncio.run(main())
```

This workflow is defined as an `InteractiveWorkflow` and orchestrates the three Mistral-powered stages from our activities: prepare document access, classify the document type, and extract structured fields.

Create the app frontend
Workflows do not require a frontend, but we’ll include one for demonstration purposes.
In the `entrypoints` folder, create a new file called `app.py`:

```shell
touch src/entrypoints/app.py
```

The Streamlit app we’re building is a human-in-the-loop pipeline for processing medical PDFs end to end: it uploads a document, runs OCR, classifies the document type, and extracts structured patient information. It also displays workflow progress in real time and lets a user manually confirm the category when model confidence is too low, so the extraction remains reliable for downstream use.

Add the following imports and setup to `app.py`:

```python
"""
Streamlit UI for PDF OCR + Classification + Extraction Workflow.
"""
import asyncio
import io
import os
import time
import uuid

import streamlit as st
from dotenv import load_dotenv
from pydantic import BaseModel

try:
    import fitz  # PyMuPDF
except ImportError:
    fitz = None

load_dotenv(override=True)

from mistralai.client import Mistral
from mistralai.workflows.client import get_mistral_client
from shared.extraction_fields import CATEGORY_LABELS

API_KEY = os.environ["MISTRAL_API_KEY"]
BASE_URL = os.environ.get("SERVER_URL", "https://api.mistral.ai")

COMMON_FIELD_LABELS = {
    "full_name": "Full Name",
    "patient_address": "Address",
    "social_security_number": "Social Security Number",
}

STEPS_CONFIG = [
    ("ocr", "✅ Document Preparation"),
    ("classify", "🏷️ Classification"),
    ("extract", "👤 Patient Extraction"),
]


class PdfOcrInput(BaseModel):
    file_id: str
    filename: str
    confidence_threshold: float = 0.9
```

Let’s add the app’s integration layer between Streamlit and Mistral Workflows.
After the imports and setup in `app.py`, add the following code:

```python
# ── Workflow Activities ─────────────────────────────────────────────────────
def get_workflows_client():
    """Create a fresh Mistral Workflows client for this operation.

    Do not cache across run_async() calls: each call creates/closes its own
    event loop, and reusing an async client across loops triggers
    "Event loop is closed" errors.
    """
    return get_mistral_client(
        server_url=BASE_URL,
        api_key=API_KEY,
    )


def run_async(coro):
    """Run an async coroutine in a new event loop, cleaning up properly afterward."""
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(coro)
    finally:
        # Drain pending async generators before closing the loop to avoid:
        # "Task was destroyed but it is pending! ... async_generator_athrow"
        loop.run_until_complete(loop.shutdown_asyncgens())
        loop.close()


async def upload_pdf(pdf_bytes: bytes, filename: str) -> str:
    """Upload a PDF file to Mistral and return its file ID."""
    async with Mistral(api_key=API_KEY) as client:
        resp = await client.files.upload_async(
            file={"file_name": filename, "content": pdf_bytes, "content_type": "application/pdf"},
            purpose="ocr",
        )
    return resp.id


async def trigger_workflow(file_id: str, filename: str, confidence_threshold: float) -> str:
    """Start a new workflow execution and return its execution ID."""
    execution_id = f"pdf-ocr-{uuid.uuid4().hex[:12]}"
    async with get_workflows_client() as client:
        resp = await client.workflows.execute_workflow_async(
            workflow_identifier="pdf_ocr_workflow",
            input=PdfOcrInput(
                file_id=file_id,
                filename=filename,
                confidence_threshold=confidence_threshold,
            ).model_dump(mode="json"),
            execution_id=execution_id,
        )
    return resp.execution_id


async def poll_steps(execution_id: str) -> dict:
    """Fetch the workflow's step progress from the get_steps query."""
    async with get_workflows_client() as client:
        resp = await client.workflows.executions.query_workflow_execution_async(
            execution_id=execution_id,
            name="get_steps",
        )
    return resp.result or {}


async def get_execution_details(execution_id: str):
    """Fetch the full workflow execution record."""
    async with get_workflows_client() as client:
        return await client.workflows.executions.get_workflow_execution_async(execution_id=execution_id)


class ManualCategorySignal(BaseModel):
    category: str


async def send_signal(execution_id: str, category: str):
    """Send a manual_category signal to the running workflow."""
    async with get_workflows_client() as client:
        await client.workflows.executions.signal_workflow_execution_async(
            execution_id=execution_id,
            name="manual_category",
            input=ManualCategorySignal(category=category).model_dump(mode="json"),
        )


def backfill_steps_from_execution_result(steps: dict, result: object) -> dict:
    """Extract step data from the final execution result when normal polling is incomplete."""
    if not isinstance(result, dict):
        return steps
    structured = result.get("structuredContent")
    if not isinstance(structured, dict):
        structured = result.get("structured_content")
    if not isinstance(structured, dict):
        structured = result
    ocr_text = structured.get("ocr_text")
    classification = structured.get("classification")
    patient_info = structured.get("patient_info")
    updated = dict(steps)
    if ocr_text is not None:
        updated["ocr"] = {"status": "done", "result": ocr_text}
    if isinstance(classification, dict):
        updated["classify"] = {"status": "done", "result": classification}
    if isinstance(patient_info, dict):
        updated["extract"] = {"status": "done", "result": patient_info}
    return updated
```

This code creates a fresh workflows client for each operation, provides a helper to run async API calls from Streamlit’s synchronous execution model, uploads the PDF file, starts a new workflow execution, polls step-by-step progress, checks overall execution status, and sends the manual category signal when a user overrides a low-confidence classification.
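The `run_async` helper is plain asyncio, so it can be exercised outside Streamlit to see the fresh-loop-per-call behavior (the `add` coroutine below is a stand-in for a real API call):

```python
import asyncio

def run_async(coro):
    """Run a coroutine in a brand-new event loop, closing it afterward."""
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(coro)
    finally:
        # Drain pending async generators before closing the loop.
        loop.run_until_complete(loop.shutdown_asyncgens())
        loop.close()

async def add(a: int, b: int) -> int:
    await asyncio.sleep(0)  # stand-in for awaiting a real API call
    return a + b

# Each call gets its own loop, so repeated calls from sync code are safe,
# which is exactly what Streamlit's rerun-driven execution model requires.
first = run_async(add(1, 2))
second = run_async(add(3, 4))
print(first, second)  # 3 7
```

Had the client or loop been cached instead, the second call would hit the closed loop from the first, which is the failure mode the docstring in `get_workflows_client` warns about.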
Complete the Streamlit app
Let’s complete the application by adding the UI elements. This code block is lengthy, but it is mostly standard Streamlit UI code and not essential to understanding Mistral Workflows.
After the workflow activities section you just added to
app.py , add this code:# ── PDF rendering ─────────────────────────────────────────────────────────────
def get_pdf_first_page(pdf_bytes: bytes):
"""Convert first page of PDF to image."""
if not fitz:
return None
try:
doc = fitz.open(stream=pdf_bytes, filetype="pdf")
page = doc[0]
pix = page.get_pixmap(matrix=fitz.Matrix(1.5, 1.5)) # 1.5x zoom for clarity
img_bytes = pix.tobytes("ppm")
return io.BytesIO(img_bytes)
except Exception:
return None
# ── Step renderers ────────────────────────────────────────────────────────────
def render_step(key: str, step: dict):
"""Render a single workflow step in the UI based on its status."""
status = step.get("status", "pending")
result = step.get("result")
if status == "pending":
st.markdown("⏳ Pending…")
elif status == "running":
st.spinner("⚙️ In Progress…")
# spinner widget needs a context manager — use a visual substitute
st.markdown("⚙️ In Progress…")
elif status == "waiting_human":
result = step.get("result", {})
confidence = result.get("confidence", 0.0) if result else 0.0
st.warning(
f"⚠️ Insufficient confidence ({confidence * 100:.0f}%). "
"Please choose the category manually."
)
selected = st.selectbox(
"Category",
options=list(CATEGORY_LABELS.keys()),
format_func=lambda k: CATEGORY_LABELS[k],
key="manual_category_select",
)
if st.button("Validate", key="manual_category_submit"):
run_async(send_signal(st.session_state.execution_id, selected))
st.session_state.signal_sent = True
st.rerun()
elif status == "done" and result is not None:
if key == "ocr":
st.markdown("✅ Prepared for Document QnA")
if isinstance(result, str) and result.strip():
st.caption(result)
elif key == "classify":
category = result.get("category", "other")
confidence = result.get("confidence", 0.0)
explanation = result.get("explanation", "")
label = CATEGORY_LABELS.get(category, f"❓ {category}")
col1, col2 = st.columns([3, 1])
col1.markdown(f"**{label}**")
col1.caption(explanation)
col2.metric("Confidence", f"{confidence * 100:.0f}%")
col2.progress(confidence)
elif key == "extract":
common = result.get("common", {})
specific = result.get("specific", {})
st.markdown("**🧍 Patient Information**")
common_rows = [
{"Field": COMMON_FIELD_LABELS.get(k, k), "Value": ", ".join(v) if isinstance(v, list) else v}
for k, v in common.items() if v is not None
]
if common_rows:
st.table(common_rows)
else:
st.info("No common information found.")
if specific:
st.markdown("**📋 Specific Information**")
specific_rows = [
{"Field": k.replace("_", " ").capitalize(), "Value": ", ".join(v) if isinstance(v, list) else v}
for k, v in specific.items() if v is not None
]
if specific_rows:
st.table(specific_rows)
else:
st.info("No specific information found.")
# ── UI ────────────────────────────────────────────────────────────────────────
st.set_page_config(page_title="PDF OCR & Classification", page_icon="📄", layout="wide")
st.title("📄 PDF OCR & Classification")
st.caption("Upload a PDF → OCR → Classification → Patient Extraction")
with st.sidebar:
st.header("⚙️ Parameters")
confidence_threshold = st.slider(
"Confidence Threshold",
min_value=0.0,
max_value=1.0,
value=0.9,
step=0.05,
help="Below this threshold, classification requires manual validation.",
)
st.caption(f"Current threshold: **{confidence_threshold * 100:.0f}%**")
if confidence_threshold >= 1.0:
st.info("☝️ Manual validation always required")
elif confidence_threshold == 0.0:
st.info("✅ Manual validation never required")
# Init session state
if "execution_id" not in st.session_state:
st.session_state.execution_id = None
if "done" not in st.session_state:
st.session_state.done = False
if "steps" not in st.session_state:
st.session_state.steps = {}
if "poll_error" not in st.session_state:
st.session_state.poll_error = None
if "signal_sent" not in st.session_state:
st.session_state.signal_sent = False
uploaded = st.file_uploader("Choose a PDF file", type=["pdf"])
if uploaded is not None:
st.info(f"**{uploaded.name}** — {uploaded.size / 1024:.1f} KB")
if st.button("Start Workflow", type="primary"):
st.session_state.execution_id = None
st.session_state.done = False
st.session_state.steps = {}
st.session_state.poll_error = None
st.session_state.signal_sent = False
pdf_bytes = uploaded.read()
filename = uploaded.name
with st.status("Uploading PDF…", expanded=False) as s:
file_id = run_async(upload_pdf(pdf_bytes, filename))
s.update(label="Upload ✓", state="complete")
execution_id = run_async(trigger_workflow(file_id, filename, confidence_threshold))
st.session_state.execution_id = execution_id
st.rerun()
if st.session_state.execution_id and not st.session_state.done:
execution_id = st.session_state.execution_id
try:
steps = run_async(poll_steps(execution_id))
st.session_state.steps = steps
st.session_state.poll_error = None
except Exception as exc:
steps = st.session_state.steps
st.session_state.poll_error = str(exc)
col_pdf, col_steps = st.columns([1, 1.2])
with col_pdf:
st.markdown("### 📄 Document")
if uploaded:
pdf_bytes = uploaded.read()
uploaded.seek(0)
img = get_pdf_first_page(pdf_bytes)
if img:
st.image(img, width='stretch')
else:
st.info("PyMuPDF not available for preview")
with col_steps:
if st.session_state.poll_error:
st.warning(f"Progress polling failed: {st.session_state.poll_error}")
for key, title in STEPS_CONFIG:
st.markdown(f"### {title}")
step = steps.get(key, {"status": "pending", "result": None})
render_step(key, step)
# Check if all done
all_done = all(
steps.get(k, {}).get("status") == "done"
for k, _ in STEPS_CONFIG
)
waiting_human = any(
steps.get(k, {}).get("status") == "waiting_human"
for k, _ in STEPS_CONFIG
)
if all_done:
st.session_state.done = True
st.success("✅ Completed!")
elif waiting_human and not st.session_state.signal_sent:
pass
elif waiting_human and st.session_state.signal_sent:
time.sleep(0.5)
st.rerun()
else:
try:
execution = run_async(get_execution_details(execution_id))
wf_status = execution.status
if wf_status == "COMPLETED":
st.session_state.steps = backfill_steps_from_execution_result(
st.session_state.steps,
execution.result,
)
st.session_state.done = True
st.success("✅ Completed!")
elif wf_status in ("FAILED", "CANCELED", "TERMINATED"):
st.error(f"Workflow ended with status: {wf_status}")
st.session_state.done = True
else:
time.sleep(0.5)
st.rerun()
except Exception as exc:
st.session_state.poll_error = str(exc)
time.sleep(0.5)
st.rerun()
elif st.session_state.execution_id and st.session_state.done:
steps = st.session_state.steps
for key, title in STEPS_CONFIG:
st.markdown(f"### {title}")
step = steps.get(key, {"status": "pending", "result": None})
render_step(key, step)
st.success("✅ Completed!")This code implements the front-end behavior of the app: it renders the PDF preview, displays each workflow step in the UI, and updates what the user sees based on step status (
pending, running, waiting_human, done). It also handles manual review when classification confidence is low, shows extracted common/specific fields in tables, and manages Streamlit session state plus reruns so the interface continuously reflects live workflow progress until completion or failure.With our workflow and frontend ready to go, we have everything we need to run the document processing. But to show the power or Mistral Workflows, we’ll add load testing next.
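The completion logic in this block (the run is complete only when every step is done, and any step in waiting_human pauses the UI for review) can be distilled into a small pure function. A minimal sketch, where the STEPS_CONFIG entries and the summarize helper are illustrative rather than taken from the app:

```python
# Illustrative step keys; the real app defines its own STEPS_CONFIG.
STEPS_CONFIG = [
    ("ocr", "1. OCR"),
    ("classification", "2. Classification"),
    ("extraction", "3. Extraction"),
]

def summarize(steps: dict) -> str:
    """Collapse per-step statuses into a single UI state."""
    statuses = [steps.get(k, {}).get("status", "pending") for k, _ in STEPS_CONFIG]
    if all(s == "done" for s in statuses):
        return "done"          # every step finished: show the final results
    if any(s == "waiting_human" for s in statuses):
        return "waiting_human" # pause and ask the user to confirm the category
    return "running"           # otherwise keep polling and rerunning

print(summarize({k: {"status": "done"} for k, _ in STEPS_CONFIG}))  # done
print(summarize({"ocr": {"status": "done"},
                 "classification": {"status": "waiting_human"}}))   # waiting_human
```

Keeping this decision pure (no Streamlit calls inside) makes the rerun-driven UI easier to reason about and to test.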
Add load testing
We’re creating a load testing mechanism to demonstrate one of the key advantages of Mistral Workflows: the ability to run many long-running, AI-heavy tasks concurrently without managing queues or threading yourself. It reads one or more PDF files from the input_doc folder, uploads them to the Mistral Files API, and fires off a specified number of workflow executions in parallel using only a few lines of async code.
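The fan-out pattern the script relies on is plain asyncio: build one coroutine per workflow and await them all together with asyncio.gather. A toy sketch with the Mistral upload and trigger calls replaced by a stub:

```python
import asyncio

async def launch(index: int) -> str:
    # Stand-in for upload_pdf + trigger_workflow; returns a fake execution ID.
    await asyncio.sleep(0.01)
    return f"exec-{index}"

async def fan_out(n: int) -> list[str]:
    # One coroutine per workflow; gather runs them concurrently
    # and returns results in launch order.
    return await asyncio.gather(*(launch(i) for i in range(1, n + 1)))

ids = asyncio.run(fan_out(3))
print(ids)  # ['exec-1', 'exec-2', 'exec-3']
```

The real script does exactly this, except each coroutine performs two awaited API calls before returning its execution ID.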
In the entrypoints directory, make a new file called load_test.py:

touch src/entrypoints/load_test.py

In this file, we’ll use the workflows.execute_workflow_async() function to launch a specified number of workflows in parallel, with a default of 10. Add the following code to load_test.py:

"""
Load test script: Launch 10 workflows in parallel to demonstrate load balancing.
Each workflow will be distributed across available workers.
"""
import asyncio
import os
import sys
import uuid
from pathlib import Path

from dotenv import load_dotenv
from pydantic import BaseModel

load_dotenv(override=True)

from mistralai.client import Mistral
from mistralai.workflows.client import get_mistral_client

API_KEY = os.environ["MISTRAL_API_KEY"]
BASE_URL = os.environ.get("SERVER_URL", "https://api.mistral.ai")
WORKFLOWS_CLIENT = None

# Number of workflows to launch (from CLI arg, default to 10)
NUM_WORKFLOWS = int(sys.argv[1]) if len(sys.argv) > 1 else 10


class PdfOcrInput(BaseModel):
    file_id: str
    filename: str
    confidence_threshold: float = 0.9
    manual_review_timeout_seconds: float | None = 5.0


def get_workflows_sdk_client():
    global WORKFLOWS_CLIENT
    if WORKFLOWS_CLIENT is None:
        WORKFLOWS_CLIENT = get_mistral_client(
            server_url=BASE_URL,
            api_key=API_KEY,
        )
    return WORKFLOWS_CLIENT


async def upload_pdf(pdf_bytes: bytes, filename: str) -> str:
    """Upload PDF and return file ID."""
    async with Mistral(api_key=API_KEY) as client:
        resp = await client.files.upload_async(
            file={"file_name": filename, "content": pdf_bytes, "content_type": "application/pdf"},
            purpose="ocr",
        )
        return resp.id


async def trigger_workflow(file_id: str, filename: str) -> str:
    """Trigger workflow and return execution ID."""
    execution_id = f"load-test-{uuid.uuid4().hex[:12]}"
    client = get_workflows_sdk_client()
    resp = await client.workflows.execute_workflow_async(
        workflow_identifier="pdf_ocr_workflow",
        input=PdfOcrInput(
            file_id=file_id,
            filename=filename,
            confidence_threshold=0.9,
            manual_review_timeout_seconds=5.0,
        ).model_dump(mode="json"),
        execution_id=execution_id,
    )
    return resp.execution_id


async def launch_workflow(pdf_path: Path, index: int) -> tuple[int, str]:
    """Launch a single workflow and return (index, execution_id)."""
    pdf_bytes = pdf_path.read_bytes()
    file_id = await upload_pdf(pdf_bytes, pdf_path.name)
    execution_id = await trigger_workflow(file_id, pdf_path.name)
    return index, execution_id


async def main():
    project_root = Path(__file__).resolve().parent.parent.parent
    input_dir = project_root / "input_doc"
    pdf_files = list(input_dir.glob("*.pdf"))
    if not pdf_files:
        print("❌ No PDF files found in input_doc/")
        sys.exit(1)

    print(f"\n🚀 Launching {NUM_WORKFLOWS} workflows (load balancing on available workers)")
    print(f"📁 Using PDFs from: {input_dir}\n")

    # Launch all workflows in parallel
    tasks = [
        launch_workflow(pdf_files[i % len(pdf_files)], i + 1)
        for i in range(NUM_WORKFLOWS)
    ]
    execution_ids = []
    for index, execution_id in await asyncio.gather(*tasks):
        execution_ids.append(execution_id)
        print(f"  ✓ Workflow {index:2d} → {execution_id}")

    print(f"\n✅ All {NUM_WORKFLOWS} workflows launched!")
    print("\nExecution IDs (for monitoring):")
    for i, eid in enumerate(execution_ids, 1):
        print(f"  {i:2d}. {eid}")

    # Small delay to allow async clients to close gracefully
    await asyncio.sleep(0.1)
if __name__ == "__main__":
    asyncio.run(main())

Each execution runs independently on the worker pool, so this is a practical way to verify that your worker scales under load and that the workflow engine correctly distributes work. You can control how many workflows to launch by passing a number as a CLI argument (e.g. make load-test n=10), and the script prints every execution ID so you can monitor or inspect individual runs afterward.

Update Makefile
The contents of the Makefile are optimized for the starter app. Since we’ve made some code changes, it’s useful to add some new commands. Open Makefile and replace all the contents with the following:

.PHONY: start-worker execute installdeps streamlit load-test

## Install dependencies
installdeps:
	uv sync

## Auto-discover all workflows and start the worker (with file-watch auto-reload)
start-worker:
	uv run python src/dev_worker.py

## Trigger a workflow execution
## Usage: make execute workflow=hello-world input='{"name": "World"}'
execute:
	uv run python src/workflows/start.py $(if $(workflow),--workflow $(workflow),) $(if $(input),--input '$(input)',)

## Start the Streamlit app
streamlit:
	PYTHONPATH=src uv run streamlit run src/entrypoints/app.py

## Run load test
load-test:
	uv run python src/entrypoints/load_test.py $(if $(n),$(n),)

This streamlines creating the agents, running the application, and load testing, simplifying development and testing.
Run the app and workflow
We’re ready to run our workflow! In testing the process, we’ll take these steps:
- Download the sample data
- Start the worker
- Start the Streamlit app
- Upload and process a file
- View workflow in Mistral AI Studio
- Run load testing
Download the sample data
You can of course test out the process with any documents of your own, but since not everyone has medical documents lying around their desktop, we provide some examples for you. Click here to download the PDF invoices for testing.
Create a folder called input_doc at the root of the project:

mkdir input_doc

Be sure to move the example files into this folder, as that is where the load tester will look for them.
Start the worker
To start the worker, use the included Makefile command:
make start-worker

This helper auto-discovers all workflows and starts the worker, with file-watch auto-reload so your changes are picked up as you make them.
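The internals of dev_worker.py aren't shown in this tutorial, but a common way to implement this kind of auto-reload is to poll source-file modification times and restart the worker process when anything changes. A minimal, hypothetical sketch of the change-detection half:

```python
from pathlib import Path

def snapshot(src: Path) -> dict[str, float]:
    """Map each .py file under src to its last-modified time."""
    return {str(p): p.stat().st_mtime for p in src.rglob("*.py")}

def changed(before: dict[str, float], after: dict[str, float]) -> bool:
    """True if any file was added, removed, or modified."""
    return before != after

# A watcher loop would call snapshot() periodically and restart the
# worker subprocess whenever changed() reports True.
```

Libraries like watchfiles offer the same behavior with OS-level file-system events instead of polling; either way, the worker restart is what re-registers your updated workflow code.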
Start the Streamlit app
In a separate terminal window, start the Streamlit app:
make streamlit

The app will automatically open in your browser. If it doesn’t open, follow the local URL in the terminal logs:

PYTHONPATH=src uv run streamlit run src/entrypoints/app.py

You can now view your Streamlit app in your browser.

Local URL: http://localhost:8501

Upload and process a file
Click Browse Files and choose one of the provided samples. Click Start Workflow to process the file. Each step of the workflow displays after completion, showing a preview of the document, the raw OCR text, a classification, and the extracted info.
View workflow in Mistral AI Studio
Navigate to Mistral AI Studio in your browser and select the pdf_ocr_workflow. Click on the execution to see details of the workflow output. For each activity, you can inspect the input, output, and metadata.

Run load testing
The frontend gives us a good way to see our workflow in action, but a load test demonstrates the real benefit of Mistral Workflows: handling executions at scale.
Stop the Streamlit app using CTRL+C, then run the following command:
make load-test

All the work in our workflow is async, so a single worker can already handle many concurrent executions at once; it doesn't block waiting for one execution to finish before starting the next. For a modest load test (10–20 workflows), one worker is enough.
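You can see why async matters with a toy timing check: ten tasks that each wait 50 ms for "I/O" finish together in roughly 50 ms rather than 500 ms, because awaiting one doesn't block the others. This sketch uses asyncio.sleep as a stand-in for the OCR and agent calls:

```python
import asyncio
import time

async def fake_io(i: int) -> int:
    await asyncio.sleep(0.05)  # stands in for an awaited OCR/agent API call
    return i

async def run_all() -> float:
    # Run ten "I/O-bound" tasks concurrently and measure wall-clock time.
    start = time.perf_counter()
    await asyncio.gather(*(fake_io(i) for i in range(10)))
    return time.perf_counter() - start

elapsed = asyncio.run(run_all())
# Ten concurrent 50 ms waits finish in well under the 0.5 s a
# sequential loop would take.
print(f"{elapsed:.2f}s")
```

The same reasoning applies to the real worker: while one execution waits on the Mistral API, the event loop makes progress on the others.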
To run a larger load test, you can open additional terminal windows and run make start-worker in each.

To test at higher or lower loads, you can provide a specific number of runs:

make load-test n=50

Success! You created a workflow
You just created a document processing workflow that:
- Uses Mistral OCR to identify text in files
- Uses Mistral's Chat API to categorize and format the text
You also created a basic Streamlit frontend and a load testing app to run the workflow.
You now have the foundation you need to build, execute, and monitor complex AI-driven workflows using Mistral Workflows. You can view the complete code example on GitHub.
Next steps
- Check out core Mistral Workflows concepts.