Build a document processing workflow in 30 minutes (or less!)

# workflows
# ai studio
# ocr

Learn how to develop with Mistral Workflows

April 28, 2026
Jen Person
Mistral Workflows is an orchestration platform for building, executing, and monitoring complex AI-driven workflows. It provides durable, fault-tolerant workflow execution backed by battle-tested distributed systems infrastructure, combined with a developer-friendly SDK.
In this tutorial, you’ll build an end-to-end medical document processing pipeline using three Mistral capabilities: OCR to read PDFs, Agents to classify documents and extract structured data, and Workflows to orchestrate the entire process reliably.
The pipeline takes any scanned medical PDF, such as a prescription, a hospital bill, or an imaging report, and runs it through three steps: optical character recognition (OCR) to extract raw text, an AI agent to classify the document type with a confidence score, and a second agent to extract patient information and document-specific fields as structured JSON. Because it’s built on Mistral Workflows, each step is durable and fault-tolerant: if a worker restarts mid-execution, the workflow resumes where it left off instead of starting over.
You’ll also include a human-in-the-loop step: when the classifier’s confidence falls below a configurable threshold, the pipeline pauses and waits for a user to review and confirm the category before extraction continues. Mistral Workflows enables these long-running processes that can pause and resume based on external input, which is difficult to build reliably with a simple async queue or a chain of API calls.

What you’ll build:

  • A workflow that:
      • Takes medical files as input
      • Categorizes the content type and formats the data using Mistral Agents
  • A frontend for uploading the invoices, built using Streamlit



What you’ll need:

  • Python 3.12 installed on your machine
  • uv installed on your machine

Steps

  1. Set up the environment
  2. Define fields to extract
  3. Build the workflow
  4. Create the app frontend
  5. Add load testing
  6. Update Makefile
  7. Run the app and workflow
  8. Success!

Set up the environment

Initialize the project

Use the following command to scaffold a ready-to-run Python project with the Workflows SDK configured, along with helper commands:
uvx mistralai-workflows-cli setup
When prompted, choose the default project name, and follow steps to generate a Mistral API key.
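The setup flow stores the key where both the worker and the frontend can read it. Assuming the scaffold uses a standard `.env` file (the code later loads one with python-dotenv), it looks something like this, with a placeholder value:

```bash
# .env — loaded via python-dotenv; keep this file out of version control
MISTRAL_API_KEY=your-api-key-here
```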

Examine the starter code

Open the my-workflow project in the IDE of your choice. Notice the structure of the project:
```
.agents/skills/workflows
src/
  workflows/
    __init__.py
    hello.py
    start.py
    dev_worker.py
    discover.py
.gitignore
Makefile
pyproject.toml
README.md
```
Here are some key components of this scaffold project:
  • Agent Skills for Mistral Workflows are included in the .agents/skills/workflows directory for easy development with coding agents.
  • Workflows are stored in separate files in the src/workflows directory:
      • The start.py file contains code that enables you to trigger a workflow execution from the command line.
      • The hello.py file gives a minimal example workflow, showing how to use the @workflows.activity(), @workflows.workflow.define(), and @workflows.workflow.entrypoint decorators.
      • The dev_worker.py file contains a worker for local development that watches src/ for .py changes and auto-restarts the workflow worker as needed.
  • The Makefile contains helper commands to streamline the development and testing process.
Through the tutorial steps, we will add four Python files to the scaffold project:
  • extraction_fields.py: Useful strings and constants that will be consumed by the workflow’s agents.
  • medical_doc_workflow.py: The actual functionality at the heart of the workflow.
  • app.py: A Streamlit frontend for uploading PDFs to run through the workflow.
  • load_test.py: Functionality to launch multiple workflows in parallel to see how they load balance.

Install the dependencies

Let’s install the dependencies required for the project. We’ll first make some changes to the dependency requirements to reflect what is needed for the project.

Update the dependency requirements

Open pyproject.toml and update your dependencies to the following:
```toml
dependencies = [
    "mistralai-workflows[mistralai]>=3.0.0,<4",
    "pydantic",
    "python-dotenv",
    "streamlit==1.55.0",
    "pymupdf>=1.23.0",
]
```
These updates make the following changes:
  • Replaces the default mistralai-workflows in the file with mistralai-workflows[mistralai], adding the Mistral plugin, which provides native integration with Mistral's AI models and services, including durable agents, tool calling, and multi-agent handoffs.
  • Adds Streamlit as a dependency to create a basic frontend to upload the invoices.
  • Adds PyMuPDF for PDF analysis.
Use the included Makefile command to install the required dependencies:
make installdeps

Define fields to extract

Documents of the same type can come in a variety of formats. In the medical field, invoices, medical results, referrals, and more can use different field names and different ways of organizing the same information.
To process information extracted from the documents, we define how the fields should be named.
To start, create a new directory under src called shared , and add a file called extraction_fields.py:
mkdir src/shared
touch src/shared/extraction_fields.py

Define constants for fields

Add the following code to src/shared/extraction_fields.py:

```python
"""
Fields to extract by document type. Imported by workflow.py and app.py.
"""

COMMON_FIELDS: list[tuple[str, str]] = [
    ("full_name", "Full name of the patient (last name + first name)"),
    ("patient_address", "Full address of the patient"),
    ("social_security_number", "Social security number of the patient"),
]

SPECIFIC_FIELDS: dict[str, list[tuple[str, str]]] = {
    "prescription": [
        ("doctor_name", "Name of the doctor who made the prescription"),
        ("doctor_address", "Address of the doctor who made the prescription"),
        ("doctor_rpps", "RPPS number of the doctor"),
        ("medications", "List of prescribed medications (separated by commas)"),
        ("prescription_date", "Date of the prescription"),
    ],
    "medical_bill": [
        ("healthcare_professional_name", "Name of the healthcare professional"),
        ("healthcare_professional_address", "Address of the healthcare professional"),
        ("bill_amount", "Total billed amount (with currency)"),
        ("services", "Billed services or procedures"),
        ("care_date", "Date of care"),
    ],
    "hospitalization_report": [
        ("hospital_name", "Name of the hospital"),
        ("department", "Department or care unit"),
        ("responsible_doctor", "Responsible doctor"),
        ("admission_date", "Date of admission"),
        ("discharge_date", "Date of discharge"),
        ("primary_diagnosis", "Primary diagnosis"),
    ],
    "biological_analysis": [
        ("laboratory", "Name of the laboratory"),
        ("prescribing_doctor", "Prescribing doctor"),
        ("sample_date", "Date of the sample"),
        ("analyses", "Performed analyses"),
        ("abnormal_results", "Reported abnormal results"),
    ],
    "medical_imaging": [
        ("facility", "Name of the imaging facility"),
        ("radiologist_name", "Name of the radiologist"),
        ("exam_type", "Type of exam (MRI, CT, X-ray, etc.)"),
        ("anatomical_region", "Anatomical region examined"),
        ("exam_date", "Date of the exam"),
        ("conclusion", "Conclusion or main finding"),
    ],
    "medical_certificate": [
        ("doctor_name", "Name of the signing doctor"),
        ("doctor_address", "Address of the doctor"),
        ("certificate_subject", "Subject of the certificate"),
        ("certificate_date", "Date of the certificate"),
        ("sick_leave_duration", "Duration of sick leave if mentioned"),
    ],
    "mutual_reimbursement": [
        ("mutual_name", "Name of the mutual insurance"),
        ("member_number", "Member number"),
        ("reimbursement_amount", "Amount reimbursed by the mutual insurance"),
        ("reimbursement_date", "Reimbursement date"),
        ("reimbursed_acts", "Reimbursed acts"),
    ],
    "social_security_reimbursement": [
        ("fund_name", "Name of the social security fund"),
        ("reimbursement_amount", "Amount reimbursed by social security"),
        ("reimbursement_rate", "Applied reimbursement rate"),
        ("reimbursement_date", "Reimbursement date"),
        ("reimbursed_acts", "Reimbursed acts"),
    ],
    "consultation_report": [
        ("doctor_name", "Name of the consulting doctor"),
        ("doctor_specialty", "Specialty of the doctor"),
        ("consultation_date", "Date of the consultation"),
        ("reason", "Reason for the consultation"),
        ("diagnosis", "Diagnosis"),
        ("prescribed_treatment", "Prescribed treatment or follow-up"),
    ],
    "informed_consent": [
        ("doctor_name", "Name of the doctor"),
        ("facility", "Facility"),
        ("medical_procedure", "Medical procedure concerned"),
        ("signature_date", "Date of signature"),
    ],
    "other": [],
}

DOCUMENT_CATEGORIES: list[str] = list(SPECIFIC_FIELDS.keys())

CATEGORY_LABELS: dict[str, str] = {
    "prescription": "📋 Prescription",
    "medical_bill": "🧾 Medical Bill",
    "hospitalization_report": "🏥 Hospitalization Report",
    "biological_analysis": "🔬 Biological Analysis",
    "medical_imaging": "🩻 Medical Imaging",
    "medical_certificate": "📄 Medical Certificate",
    "mutual_reimbursement": "💳 Mutual Reimbursement",
    "social_security_reimbursement": "🏛️ Social Security Reimbursement",
    "consultation_report": "👨‍⚕️ Consultation Report",
    "informed_consent": "✍️ Informed Consent",
    "other": "❓ Other",
}
```
These constants specify the field names to use during extraction, standardizing output from inputs that may use a variety of labels and layouts for the same types of data. The workflow’s agents will later consume the categories and field lists to build their classification and extraction prompts.
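To see how these (key, description) tuples end up in a prompt, here is a minimal plain-Python sketch. It mirrors the join pattern the extraction activity will use later in the tutorial (the helper name `fields_to_prompt_lines` is made up for illustration):

```python
# Field definitions as (key, description) tuples, as in extraction_fields.py.
COMMON_FIELDS = [
    ("full_name", "Full name of the patient (last name + first name)"),
    ("patient_address", "Full address of the patient"),
    ("social_security_number", "Social security number of the patient"),
]

def fields_to_prompt_lines(fields):
    """Render field keys as bullet lines suitable for an extraction prompt."""
    return "\n".join(f"- {key}" for key, _ in fields)

print(fields_to_prompt_lines(COMMON_FIELDS))
# - full_name
# - patient_address
# - social_security_number
```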

Build the workflow

With our desired fields configured, we can now build out the workflow using the Mistral Workflows SDK. Mistral Workflows uses decorators to define the components of the workflow. As we build out the workflow, we’ll discuss some of the available decorators.
Create a new file in the workflows folder called medical_doc_workflow.py:
touch src/workflows/medical_doc_workflow.py
Add the required imports and setup to medical_doc_workflow.py:
```python
import asyncio
import logging
import os
from functools import lru_cache
from datetime import timedelta
from typing import Optional

import mistralai.workflows as workflows
import mistralai.workflows.plugins.mistralai as workflows_mistralai
from dotenv import load_dotenv
from pydantic import BaseModel, ConfigDict, Field, create_model

from shared.extraction_fields import COMMON_FIELDS, DOCUMENT_CATEGORIES, SPECIFIC_FIELDS

load_dotenv(override=True)

for name in ("mistralai_workflows", "httpx", "httpcore"):
    logging.getLogger(name).setLevel(logging.WARNING)


class ManualCategorySignal(BaseModel):
    category: str


class DocumentClassification(BaseModel):
    category: str = Field(description=f"One of: {', '.join(DOCUMENT_CATEGORIES)}")
    confidence: float = Field(ge=0.0, le=1.0)
    explanation: str


@lru_cache(maxsize=None)
def get_extraction_output_model(category: str) -> type[BaseModel]:
    common_model = create_model(
        "CommonExtractionFields",
        __config__=ConfigDict(extra="forbid"),
        **{key: (Optional[str], None) for key, _ in COMMON_FIELDS},
    )
    specific_model = create_model(
        f"SpecificExtractionFields_{category}",
        __config__=ConfigDict(extra="forbid"),
        **{key: (Optional[str], None) for key, _ in SPECIFIC_FIELDS.get(category, [])},
    )
    return create_model(
        f"ExtractionOutput_{category}",
        __config__=ConfigDict(extra="forbid"),
        common=(common_model, ...),
        specific=(specific_model, ...),
    )
```

Create the workflow activities

In Mistral Workflows, an activity is a unit of work that performs actual computations, API calls, or other operations. They are designated using the @workflows.activity decorator. By default, an activity is registered under its Python function name. We will pass two parameters with our activities:
start_to_close_timeout: sets the maximum time an activity can run from the moment it starts executing to the moment it must return a result. If the activity exceeds this duration, it is terminated and treated as a failure (which may trigger a retry). Without a timeout, a hung activity blocks indefinitely.
retry_policy_max_attempts: specifies how many times an activity is retried when it fails.
To see all the available parameters, view the documentation on activity basics.
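Conceptually, the combination of a per-attempt timeout and a retry limit behaves like the plain-Python sketch below. This is only an illustration of the semantics, not the SDK's actual implementation, and the helper name `run_with_retries` is made up:

```python
import asyncio

async def run_with_retries(activity, *, start_to_close_timeout: float, max_attempts: int):
    """Illustrative only: run an async activity with a per-attempt timeout,
    retrying on failure up to max_attempts times."""
    last_error = None
    for attempt in range(1, max_attempts + 1):
        try:
            # Each attempt gets its own start-to-close window.
            return await asyncio.wait_for(activity(), timeout=start_to_close_timeout)
        except Exception as exc:  # a timeout or activity failure triggers a retry
            last_error = exc
    raise RuntimeError(f"Activity failed after {max_attempts} attempts") from last_error

async def demo():
    calls = {"n": 0}

    async def flaky():
        calls["n"] += 1
        if calls["n"] < 2:
            raise ValueError("transient failure")
        return "ok"

    # First attempt fails, second succeeds within the retry budget.
    result = await run_with_retries(flaky, start_to_close_timeout=1.0, max_attempts=2)
    return result, calls["n"]

print(asyncio.run(demo()))  # → ('ok', 2)
```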
Our workflow includes three activities:
  • get_document_signed_url: Gets a signed URL for the file that can be passed to the next activity.
  • classify_document: Uses Mistral chat completions to categorize the type of document, passing the document categories and file info.
  • extract_patient_info: Uses Mistral chat completions to pull the desired information from the document based on the given category and return it as JSON.
After the imports and setup in medical_doc_workflow.py, add the following code to create the activities:
```python
# ── Activities ────────────────────────────────────────────────────────────────

# Resolve the uploaded file ID into a temporary signed URL for Document QnA.
@workflows.activity(start_to_close_timeout=timedelta(minutes=5), retry_policy_max_attempts=2)
async def get_document_signed_url(file_id: str) -> str:
    client = workflows_mistralai.get_mistral_client()
    signed_url = await client.files.get_signed_url_async(file_id=file_id)
    return signed_url.url


# Classify the document category directly from the document URL using structured output.
@workflows.activity(start_to_close_timeout=timedelta(minutes=2), retry_policy_max_attempts=2)
async def classify_document(document_url: str, filename: str) -> dict:
    client = workflows_mistralai.get_mistral_client()
    model = os.environ.get("MISTRAL_CLASSIFIER_MODEL", "mistral-medium-latest")
    response = await client.chat.parse_async(
        response_format=DocumentClassification,
        model=model,
        temperature=0.0,
        messages=[
            {
                "role": "system",
                "content": (
                    "You are an expert in medical document classification. "
                    "Return only valid JSON that matches the schema."
                ),
            },
            {
                "role": "user",
                "content": [
                    {
                        "type": "text",
                        "text": (
                            f"Classify the medical document '{filename}' into exactly one category from:\n"
                            + "\n".join(f"- {c}" for c in DOCUMENT_CATEGORIES)
                            + "\n\n"
                            "Return confidence between 0 and 1 and a short explanation."
                        ),
                    },
                    {
                        "type": "document_url",
                        "document_url": document_url,
                        "document_name": filename,
                    },
                ],
            },
        ],
    )
    parsed = response.choices[0].message.parsed if response.choices and response.choices[0].message else None
    if parsed is None:
        raise RuntimeError("Classification response could not be parsed.")
    return parsed.model_dump(mode="json")


# Extract common and category-specific fields from the document with schema-constrained JSON.
@workflows.activity(start_to_close_timeout=timedelta(minutes=2), retry_policy_max_attempts=2)
async def extract_patient_info(document_url: str, filename: str, category: str) -> dict:
    client = workflows_mistralai.get_mistral_client()
    model = os.environ.get("MISTRAL_EXTRACTOR_MODEL", "mistral-medium-latest")
    extraction_model = get_extraction_output_model(category)
    common_fields_text = "\n".join(f"- {key}" for key, _ in COMMON_FIELDS)
    specific_fields_text = "\n".join(f"- {key}" for key, _ in SPECIFIC_FIELDS.get(category, []))
    response = await client.chat.parse_async(
        response_format=extraction_model,
        model=model,
        temperature=0.0,
        messages=[
            {
                "role": "system",
                "content": (
                    "You extract administrative and medical information from a medical document. "
                    "Return only valid JSON that matches the schema. "
                    "If a field is missing, set it to null."
                ),
            },
            {
                "role": "user",
                "content": [
                    {
                        "type": "text",
                        "text": (
                            f"Extract fields from '{filename}' for category '{category}'.\n\n"
                            "Populate these common fields:\n"
                            f"{common_fields_text}\n\n"
                            "Populate these category-specific fields:\n"
                            f"{specific_fields_text if specific_fields_text else '- (none)'}\n\n"
                            "Return null for missing values."
                        ),
                    },
                    {
                        "type": "document_url",
                        "document_url": document_url,
                        "document_name": filename,
                    },
                ],
            },
        ],
    )
    parsed = response.choices[0].message.parsed if response.choices and response.choices[0].message else None
    if parsed is None:
        raise RuntimeError("Extraction response could not be parsed.")
    return parsed.model_dump(mode="json")
```

Define the workflow

To define our workflow, we use the @workflows.workflow.define decorator, passing a name that will be used to identify the workflow.
We use three decorators provided by the SDK to run and track the workflow:
  • @workflows.workflow.query: Mistral Workflows’ query functionality is used to get the status of the workflow steps. Queries use synchronous communication and are read-only.
  • @workflows.workflow.signal: Mistral Workflows’ signal decorator is used to indicate that the user selected a document category. Signals use asynchronous communication and can be sent at any time during workflow execution. This is used when the workflow pauses for human input because the category confidence threshold isn't met, allowing the user to select a category when the LLM isn't confident.
  • @workflows.workflow.entrypoint: Mistral Workflows’ entrypoint decorator is used to define the run of the workflow. It walks through each activity, updating the steps along the way until the workflow is complete.
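The query/signal/entrypoint trio follows a pattern you can reproduce with plain asyncio. The sketch below illustrates the pause-until-signal idea only; it is not the SDK's durable implementation, and the class and method names are made up:

```python
import asyncio

class MiniWorkflow:
    """Illustrative only: a signal mutates state while run() is paused,
    and a query reads state at any time."""
    def __init__(self):
        self.steps = {"classify": {"status": "pending"}}
        self._manual_category = None
        self._signal_received = asyncio.Event()

    def get_steps(self):  # "query": synchronous, read-only
        return self.steps

    def manual_category(self, category: str):  # "signal": asynchronous, mutates state
        self._manual_category = category
        self._signal_received.set()

    async def run(self):  # "entrypoint": pauses until the signal arrives
        self.steps["classify"]["status"] = "waiting_human"
        await self._signal_received.wait()
        self.steps["classify"] = {"status": "done", "category": self._manual_category}
        return self._manual_category

async def demo():
    wf = MiniWorkflow()
    task = asyncio.create_task(wf.run())
    await asyncio.sleep(0)  # let run() reach its wait point
    assert wf.get_steps()["classify"]["status"] == "waiting_human"
    wf.manual_category("prescription")  # external signal resumes the workflow
    return await task

print(asyncio.run(demo()))  # → prescription
```

The real SDK adds durability on top of this pattern: the paused state survives worker restarts instead of living only in process memory.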
Add the following code after the activities to complete medical_doc_workflow.py:
```python
# ── Workflow ──────────────────────────────────────────────────────────────────
@workflows.workflow.define(name="pdf_ocr_workflow")
class PdfOcrWorkflow(workflows.InteractiveWorkflow):
    def __init__(self):
        # UI-facing step state exposed through the `get_steps` query.
        self.steps = {
            "ocr": {"status": "pending", "result": None},
            "classify": {"status": "pending", "result": None},
            "extract": {"status": "pending", "result": None},
        }
        # Set by the `manual_category` signal when a user overrides classification.
        self._manual_category = None

    @workflows.workflow.query(name="get_steps")
    def get_steps(self) -> dict:
        # Queries are synchronous/read-only: frontend polls this for progress.
        return self.steps

    @workflows.workflow.signal(name="manual_category")
    async def manual_category_signal(self, payload: ManualCategorySignal) -> None:
        # Signals are async/write: this mutates workflow state while run() is active.
        self._manual_category = payload.category

    @workflows.workflow.entrypoint
    async def run(
        self,
        file_id: str,
        filename: str,
        confidence_threshold: float = 0.9,
        manual_review_timeout_seconds: Optional[float] = None,
    ) -> workflows_mistralai.ChatAssistantWorkflowOutput:
        # TodoList items let clients display coarse workflow progress in addition to step data.
        ocr_item = workflows_mistralai.TodoListItem(
            title="Prepare document for Document QnA",
            description="Generate a signed URL so Mistral Document AI can read the document.",
        )
        classify_item = workflows_mistralai.TodoListItem(
            title="Classify document type",
            description="Predict the medical document category with confidence.",
        )
        extract_item = workflows_mistralai.TodoListItem(
            title="Extract structured fields",
            description="Extract patient and document-specific fields.",
        )
        async with workflows_mistralai.TodoList(items=[ocr_item, classify_item, extract_item]):
            self.steps["ocr"]["status"] = "running"
            async with ocr_item:
                signed_document_url = await get_document_signed_url(file_id)
            self.steps["ocr"] = {
                "status": "done",
                "result": "Document prepared for Document QnA (OCR handled by Mistral Document AI).",
            }

            self.steps["classify"]["status"] = "running"
            async with classify_item:
                classification = await classify_document(signed_document_url, filename)
                if classification.get("confidence", 0.0) < confidence_threshold:
                    # Low confidence enters a human-in-the-loop branch.
                    self.steps["classify"] = {"status": "waiting_human", "result": classification}
                    try:
                        # wait_condition pauses deterministically until signal or timeout.
                        await workflows.workflow.wait_condition(
                            lambda: self._manual_category is not None,
                            timeout=manual_review_timeout_seconds,
                            timeout_summary="manual_category_review",
                        )
                    except asyncio.TimeoutError:
                        # Non-interactive callers can set a timeout to avoid waiting forever.
                        classification["explanation"] = (
                            f"{classification.get('explanation', '').strip()} "
                            "Manual review timed out; using model-predicted category."
                        ).strip()
                    else:
                        # Manual override takes precedence when a signal arrives in time.
                        classification["category"] = self._manual_category
                        classification["confidence"] = 1.0
                        classification["explanation"] = f"Manually selected category: {self._manual_category}"
            self.steps["classify"] = {"status": "done", "result": classification}

            self.steps["extract"]["status"] = "running"
            async with extract_item:
                patient_info = await extract_patient_info(signed_document_url, filename, classification["category"])
            self.steps["extract"] = {"status": "done", "result": patient_info}

        return workflows_mistralai.ChatAssistantWorkflowOutput(
            # Return both human-readable text and structured payload for the UI.
            content=[workflows_mistralai.TextOutput(text=f"Processing complete for {filename}.")],
            structuredContent={
                "filename": filename,
                "ocr_text": "Document processed with Document QnA (no standalone OCR text payload).",
                "classification": classification,
                "patient_info": patient_info,
            },
        )


async def main() -> None:
    print("Worker ready — waiting for tasks...\n")
    await workflows.run_worker([PdfOcrWorkflow])


if __name__ == "__main__":
    asyncio.run(main())
```
This workflow is defined as an InteractiveWorkflow and orchestrates the three Mistral-powered stages from our activities: prepare document access, classify document type, and extract structured fields.

Create the app frontend

Workflows do not require a frontend, but we’ll include one for demonstration purposes.
In the entrypoints folder, create a new file called app.py:
touch src/entrypoints/app.py
The Streamlit app we’re building is a human-in-the-loop pipeline for processing medical PDFs end to end: it uploads a document, runs OCR, classifies the document type, and extracts structured patient information. It also displays workflow progress in real time and lets a user manually confirm the category when model confidence is too low, so the extraction remains reliable for downstream use.
```python
"""
Streamlit UI for PDF OCR + Classification + Extraction Workflow.
"""

import asyncio
import io
import os
import time
import uuid

import streamlit as st
from dotenv import load_dotenv
from pydantic import BaseModel

try:
    import fitz  # PyMuPDF
except ImportError:
    fitz = None

load_dotenv(override=True)

from mistralai.client import Mistral
from mistralai.workflows.client import get_mistral_client
from shared.extraction_fields import CATEGORY_LABELS

API_KEY = os.environ["MISTRAL_API_KEY"]
BASE_URL = os.environ.get("SERVER_URL", "https://api.mistral.ai")

COMMON_FIELD_LABELS = {
    "full_name": "Full Name",
    "patient_address": "Address",
    "social_security_number": "Social Security Number",
}

STEPS_CONFIG = [
    ("ocr", "✅ Document Preparation"),
    ("classify", "🏷️ Classification"),
    ("extract", "👤 Patient Extraction"),
]


class PdfOcrInput(BaseModel):
    file_id: str
    filename: str
    confidence_threshold: float = 0.9
```
Let’s add the app’s integration layer between Streamlit and Mistral Workflows.
After the imports and setup in app.py, add the following code:
```python
# ── Workflow Activities ─────────────────────────────────────────────────────
def get_workflows_client():
    """Create a fresh Mistral Workflows client for this operation.

    Do not cache across run_async() calls: each call creates/closes its own
    event loop, and reusing an async client across loops triggers
    "Event loop is closed" errors.
    """
    return get_mistral_client(
        server_url=BASE_URL,
        api_key=API_KEY,
    )


def run_async(coro):
    """Run an async coroutine in a new event loop, cleaning up properly afterward."""
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(coro)
    finally:
        # Drain pending async generators before closing the loop to avoid:
        # "Task was destroyed but it is pending! ... async_generator_athrow"
        loop.run_until_complete(loop.shutdown_asyncgens())
        loop.close()


async def upload_pdf(pdf_bytes: bytes, filename: str) -> str:
    """Upload a PDF file to Mistral and return its file ID."""
    async with Mistral(api_key=API_KEY) as client:
        resp = await client.files.upload_async(
            file={"file_name": filename, "content": pdf_bytes, "content_type": "application/pdf"},
            purpose="ocr",
        )
        return resp.id


async def trigger_workflow(file_id: str, filename: str, confidence_threshold: float) -> str:
    """Start a new workflow execution and return its execution ID."""
    execution_id = f"pdf-ocr-{uuid.uuid4().hex[:12]}"
    async with get_workflows_client() as client:
        resp = await client.workflows.execute_workflow_async(
            workflow_identifier="pdf_ocr_workflow",
            input=PdfOcrInput(
                file_id=file_id,
                filename=filename,
                confidence_threshold=confidence_threshold,
            ).model_dump(mode="json"),
            execution_id=execution_id,
        )
        return resp.execution_id


async def poll_steps(execution_id: str) -> dict:
    """Fetch the workflow's step progress from the get_steps query."""
    async with get_workflows_client() as client:
        resp = await client.workflows.executions.query_workflow_execution_async(
            execution_id=execution_id,
            name="get_steps",
        )
        return resp.result or {}


async def get_execution_details(execution_id: str):
    """Fetch the full workflow execution record."""
    async with get_workflows_client() as client:
        return await client.workflows.executions.get_workflow_execution_async(execution_id=execution_id)


class ManualCategorySignal(BaseModel):
    category: str


async def send_signal(execution_id: str, category: str):
    """Send a manual_category signal to the running workflow."""
    async with get_workflows_client() as client:
        await client.workflows.executions.signal_workflow_execution_async(
            execution_id=execution_id,
            name="manual_category",
            input=ManualCategorySignal(category=category).model_dump(mode="json"),
        )


def backfill_steps_from_execution_result(steps: dict, result: object) -> dict:
    """Extract step data from the final execution result when normal polling is incomplete."""
    if not isinstance(result, dict):
        return steps
    structured = result.get("structuredContent")
    if not isinstance(structured, dict):
        structured = result.get("structured_content")
    if not isinstance(structured, dict):
        structured = result
    ocr_text = structured.get("ocr_text")
    classification = structured.get("classification")
    patient_info = structured.get("patient_info")
    updated = dict(steps)
    if ocr_text is not None:
        updated["ocr"] = {"status": "done", "result": ocr_text}
    if isinstance(classification, dict):
        updated["classify"] = {"status": "done", "result": classification}
    if isinstance(patient_info, dict):
        updated["extract"] = {"status": "done", "result": patient_info}
    return updated
```
This code creates a fresh workflows client per operation, provides a helper to run async API calls from Streamlit’s synchronous execution model, uploads the PDF file, starts a new workflow execution, polls step-by-step progress, checks overall execution status, and sends the manual category signal when a user overrides a low-confidence classification.
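The fresh-event-loop pattern behind run_async can be exercised in isolation. This sketch is a simplified, stdlib-only copy of that helper, showing that back-to-back synchronous calls each get their own loop (the `fetch` coroutine stands in for a real API call):

```python
import asyncio

def run_async(coro):
    """Simplified copy of the app's helper: run a coroutine on a fresh
    event loop, draining async generators before closing it."""
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(coro)
    finally:
        loop.run_until_complete(loop.shutdown_asyncgens())
        loop.close()

async def fetch(value):
    await asyncio.sleep(0)  # stand-in for an awaited API call
    return value

# Each call completes on its own loop, so successive calls from
# Streamlit's synchronous script never touch a closed loop.
print(run_async(fetch(1)), run_async(fetch(2)))  # → 1 2
```

This is why the app avoids caching the async client: a client bound to a loop that has since been closed would fail on the next call.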

Complete the Streamlit app

Let’s complete the application by adding the UI elements. This code block is lengthy, but it isn’t essential to understanding Mistral Workflows.
After the workflow activities section you just added to app.py, add this code:
```python
# ── PDF rendering ─────────────────────────────────────────────────────────────
def get_pdf_first_page(pdf_bytes: bytes):
    """Convert first page of PDF to image."""
    if not fitz:
        return None
    try:
        doc = fitz.open(stream=pdf_bytes, filetype="pdf")
        page = doc[0]
        pix = page.get_pixmap(matrix=fitz.Matrix(1.5, 1.5))  # 1.5x zoom for clarity
        img_bytes = pix.tobytes("ppm")
        return io.BytesIO(img_bytes)
    except Exception:
        return None


# ── Step renderers ──────────────────────────────────────────────────────────
def render_step(key: str, step: dict):
    """Render a single workflow step in the UI based on its status."""
    status = step.get("status", "pending")
    result = step.get("result")
    if status == "pending":
        st.markdown("⏳ Pending…")
    elif status == "running":
        # spinner widget needs a context manager — use a visual substitute
        st.markdown("⚙️ In Progress…")
    elif status == "waiting_human":
        result = step.get("result", {})
        confidence = result.get("confidence", 0.0) if result else 0.0
        st.warning(
            f"⚠️ Insufficient confidence ({confidence * 100:.0f}%). "
            "Please choose the category manually."
        )
        selected = st.selectbox(
            "Category",
            options=list(CATEGORY_LABELS.keys()),
            format_func=lambda k: CATEGORY_LABELS[k],
            key="manual_category_select",
        )
        if st.button("Validate", key="manual_category_submit"):
            run_async(send_signal(st.session_state.execution_id, selected))
            st.session_state.signal_sent = True
            st.rerun()
    elif status == "done" and result is not None:
        if key == "ocr":
            st.markdown("✅ Prepared for Document QnA")
            if isinstance(result, str) and result.strip():
                st.caption(result)
        elif key == "classify":
            category = result.get("category", "other")
            confidence = result.get("confidence", 0.0)
            explanation = result.get("explanation", "")
            label = CATEGORY_LABELS.get(category, f"❓ {category}")
            col1, col2 = st.columns([3, 1])
            col1.markdown(f"**{label}**")
            col1.caption(explanation)
            col2.metric("Confidence", f"{confidence * 100:.0f}%")
            col2.progress(confidence)
        elif key == "extract":
            common = result.get("common", {})
            specific = result.get("specific", {})
            st.markdown("**🧍 Patient Information**")
            common_rows = [
                {"Field": COMMON_FIELD_LABELS.get(k, k), "Value": ", ".join(v) if isinstance(v, list) else v}
                for k, v in common.items()
                if v is not None
            ]
            if common_rows:
                st.table(common_rows)
            else:
                st.info("No common information found.")
            if specific:
                st.markdown("**📋 Specific Information**")
                specific_rows = [
                    {"Field": k.replace("_", " ").capitalize(), "Value": ", ".join(v) if isinstance(v, list) else v}
                    for k, v in specific.items()
                    if v is not None
                ]
                if specific_rows:
                    st.table(specific_rows)
                else:
                    st.info("No specific information found.")


# ── UI ──────────────────────────────────────────────────────────────────────
st.set_page_config(page_title="PDF OCR & Classification", page_icon="📄", layout="wide")
st.title("📄 PDF OCR & Classification")
st.caption("Upload a PDF → OCR → Classification → Patient Extraction")

with st.sidebar:
    st.header("⚙️ Parameters")
    confidence_threshold = st.slider(
        "Confidence Threshold",
        min_value=0.0,
        max_value=1.0,
        value=0.9,
        step=0.05,
        help="Below this threshold, classification requires manual validation.",
    )
    st.caption(f"Current threshold: **{confidence_threshold * 100:.0f}%**")
    if confidence_threshold >= 1.0:
        st.info("☝️ Manual validation always required")
    elif confidence_threshold == 0.0:
        st.info("✅ Manual validation never required")

# Init session state
if "execution_id" not in st.session_state:
    st.session_state.execution_id = None
if "done" not in st.session_state:
    st.session_state.done = False
if "steps" not in st.session_state:
    st.session_state.steps = {}
if "poll_error" not in st.session_state:
    st.session_state.poll_error = None
if "signal_sent" not in st.session_state:
    st.session_state.signal_sent = False

uploaded = st.file_uploader("Choose a PDF file", type=["pdf"])

if uploaded is not None:
    st.info(f"**{uploaded.name}** — {uploaded.size / 1024:.1f} KB")
    if st.button("Start Workflow", type="primary"):
        st.session_state.execution_id = None
        st.session_state.done = False
        st.session_state.steps = {}
        st.session_state.poll_error = None
        st.session_state.signal_sent = False
        pdf_bytes = uploaded.read()
        filename = uploaded.name
        with st.status("Uploading PDF…", expanded=False) as s:
            file_id = run_async(upload_pdf(pdf_bytes, filename))
            s.update(label="Upload ✓", state="complete")
        execution_id = run_async(trigger_workflow(file_id, filename, confidence_threshold))
        st.session_state.execution_id = execution_id
        st.rerun()

if st.session_state.execution_id and not st.session_state.done:
    execution_id = st.session_state.execution_id
    try:
        steps = run_async(poll_steps(execution_id))
        st.session_state.steps = steps
        st.session_state.poll_error = None
    except Exception as exc:
        steps = st.session_state.steps
        st.session_state.poll_error = str(exc)

    col_pdf, col_steps = st.columns([1, 1.2])
    with col_pdf:
        st.markdown("### 📄 Document")
        if uploaded:
            pdf_bytes = uploaded.read()
            uploaded.seek(0)
            img = get_pdf_first_page(pdf_bytes)
            if img:
                st.image(img, width='stretch')
            else:
                st.info("PyMuPDF not available for preview")
    with col_steps:
        if st.session_state.poll_error:
            st.warning(f"Progress polling failed: {st.session_state.poll_error}")
        for key, title in STEPS_CONFIG:
            st.markdown(f"### {title}")
            step = steps.get(key, {"status": "pending", "result": None})
            render_step(key, step)

    # Check if all done
    all_done = all(steps.get(k, {}).get("status") == "done" for k, _ in STEPS_CONFIG)
    waiting_human = any(steps.get(k, {}).get("status") == "waiting_human" for k, _ in STEPS_CONFIG)
    if all_done:
        st.session_state.done = True
        st.success("✅ Completed!")
    elif waiting_human and not st.session_state.signal_sent:
        pass
    elif waiting_human and st.session_state.signal_sent:
        time.sleep(0.5)
        st.rerun()
    else:
        try:
            execution = run_async(get_execution_details(execution_id))
            wf_status = execution.status
            if wf_status == "COMPLETED":
                st.session_state.steps = backfill_steps_from_execution_result(
                    st.session_state.steps,
                    execution.result,
                )
                st.session_state.done = True
                st.success("✅ Completed!")
            elif wf_status in ("FAILED", "CANCELED", "TERMINATED"):
                st.error(f"Workflow ended with status: {wf_status}")
                st.session_state.done = True
            else:
                time.sleep(0.5)
                st.rerun()
        except Exception as exc:
            st.session_state.poll_error = str(exc)
            time.sleep(0.5)
            st.rerun()
elif st.session_state.execution_id and st.session_state.done:
    steps = st.session_state.steps
    for key, title in STEPS_CONFIG:
        st.markdown(f"### {title}")
        step = steps.get(key, {"status": "pending", "result": None})
        render_step(key, step)
    st.success("✅ Completed!")
```
This code implements the front-end behavior of the app: it renders the PDF preview, displays each workflow step in the UI, and updates what the user sees based on step status (pending, running, waiting_human, done). It also handles manual review when classification confidence is low, shows extracted common/specific fields in tables, and manages Streamlit session state plus reruns so the interface continuously reflects live workflow progress until completion or failure.
With our workflow and frontend ready to go, we have everything we need to run the document processing. But to show the power of Mistral Workflows, we’ll add load testing next.

Add load testing

We’re creating a load testing mechanism to demonstrate one of the key advantages of Mistral Workflows: the ability to run many long-running, AI-heavy tasks concurrently without managing queues or threading yourself. It reads one or more PDF files from the input_doc folder, uploads them to the Mistral Files API, and fires off a specified number of workflow executions in parallel using only a few lines of async code.
In the entrypoints directory, make a new file called load_test.py:
touch src/entrypoints/load_test.py
In this file, we’ll use the workflows.execute_workflow_async() function to launch a specified number of workflows in parallel, with a default of 10.
Add the following code to load_test.py:
""" Load test script: Launch 10 workflows in parallel to demonstrate load balancing. Each workflow will be distributed across available workers. """ import asyncio import os import sys import uuid from pathlib import Path from dotenv import load_dotenv from pydantic import BaseModel load_dotenv(override=True) from mistralai.client import Mistral from mistralai.workflows.client import get_mistral_client API_KEY = os.environ["MISTRAL_API_KEY"] BASE_URL = os.environ.get("SERVER_URL", "<https://api.mistral.ai>") WORKFLOWS_CLIENT = None # Number of workflows to launch (from CLI arg, default to 10) NUM_WORKFLOWS = int(sys.argv[1]) if len(sys.argv) > 1 else 10 class PdfOcrInput(BaseModel): file_id: str filename: str confidence_threshold: float = 0.9 manual_review_timeout_seconds: float | None = 5.0 def get_workflows_sdk_client(): global WORKFLOWS_CLIENT if WORKFLOWS_CLIENT is None: WORKFLOWS_CLIENT = get_mistral_client( server_url=BASE_URL, api_key=API_KEY, ) return WORKFLOWS_CLIENT async def upload_pdf(pdf_bytes: bytes, filename: str) -> str: """Upload PDF and return file ID.""" async with Mistral(api_key=API_KEY) as client: resp = await client.files.upload_async( file={"file_name": filename, "content": pdf_bytes, "content_type": "application/pdf"}, purpose="ocr", ) return resp.id async def trigger_workflow(file_id: str, filename: str) -> str: """Trigger workflow and return execution ID.""" execution_id = f"load-test-{uuid.uuid4().hex[:12]}" client = get_workflows_sdk_client() resp = await client.workflows.execute_workflow_async( workflow_identifier="pdf_ocr_workflow", input=PdfOcrInput( file_id=file_id, filename=filename, confidence_threshold=0.9, manual_review_timeout_seconds=5.0, ).model_dump(mode="json"), execution_id=execution_id, ) return resp.execution_id async def launch_workflow(pdf_path: Path, index: int) -> tuple[int, str]: """Launch a single workflow and return (index, execution_id).""" pdf_bytes = pdf_path.read_bytes() file_id = await upload_pdf(pdf_bytes, 
pdf_path.name) execution_id = await trigger_workflow(file_id, pdf_path.name) return index, execution_id async def main(): project_root = Path(__file__).resolve().parent.parent.parent input_dir = project_root / "input_doc" pdf_files = list(input_dir.glob("*.pdf")) if not pdf_files: print("❌ No PDF files found in input_doc/") sys.exit(1) print(f"\\n🚀 Launching {NUM_WORKFLOWS} workflows (load balancing on available workers)") print(f"📁 Using PDFs from: {input_dir}\\n") # Launch all workflows in parallel tasks = [ launch_workflow(pdf_files[i % len(pdf_files)], i + 1) for i in range(NUM_WORKFLOWS) ] execution_ids = [] for index, execution_id in await asyncio.gather(*tasks): execution_ids.append(execution_id) print(f" ✓ Workflow {index:2d} → {execution_id}") print(f"\\n✅ All {NUM_WORKFLOWS} workflows launched!") print(f"\\nExecution IDs (for monitoring):") for i, eid in enumerate(execution_ids, 1): print(f" {i:2d}. {eid}") # Small delay to allow async clients to close gracefully await asyncio.sleep(0.1) if __name__ == "__main__": asyncio.run(main())
Each execution runs independently on the worker pool, so this is a practical way to verify that your worker scales under load and that the workflow engine correctly distributes work. You can control how many workflows to launch by passing a number as a CLI argument (e.g. make load-test n=10), and the script prints every execution ID so you can monitor or inspect individual runs afterward.
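If you ask for more executions than you have sample PDFs, the launcher reuses the files round-robin via the `pdf_files[i % len(pdf_files)]` expression in `main()`. A quick sketch of that indexing (the filenames are illustrative):

```python
# Round-robin assignment of N workflow launches over a smaller pool of files,
# mirroring pdf_files[i % len(pdf_files)] in load_test.py.
pdf_files = ["rx.pdf", "bill.pdf", "scan.pdf"]  # illustrative filenames
num_workflows = 7

assignments = [pdf_files[i % len(pdf_files)] for i in range(num_workflows)]
print(assignments)
# → ['rx.pdf', 'bill.pdf', 'scan.pdf', 'rx.pdf', 'bill.pdf', 'scan.pdf', 'rx.pdf']
```

Each of the seven launches still gets its own unique execution ID; only the input file is shared.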

Update Makefile

The contents of the Makefile are optimized for the starter app. Since we’ve made some code changes, it’s useful to add some new commands. Open Makefile and replace all the contents with the following:
.PHONY: start-worker execute installdeps

## Install dependencies
installdeps:
	uv sync

## Auto-discover all workflows and start the worker (with file-watch auto-reload)
start-worker:
	uv run python src/dev_worker.py

## Trigger a workflow execution
## Usage: make execute workflow=hello-world input='{"name": "World"}'
execute:
	uv run python src/workflows/start.py $(if $(workflow),--workflow $(workflow),) $(if $(input),--input '$(input)',)

## Start the Streamlit app
streamlit:
	PYTHONPATH=src uv run streamlit run src/entrypoints/app.py

## Run load test
load-test:
	uv run python src/entrypoints/load_test.py $(if $(n),$(n),)
This gives you one-line commands for starting the worker, running the application, and load testing, which simplifies development and testing.

Run the app and workflow

We’re ready to run our workflow! In testing the process, we’ll take these steps:
  1. Download the sample data
  2. Start the worker
  3. Start the Streamlit app
  4. Upload and process a file
  5. View the workflow in Mistral AI Studio
  6. Run load testing

Download the sample data

You can of course test out the process with any documents of your own, but since not everyone has medical documents lying around their desktop, we provide some examples for you. Click here to download the PDF invoices for testing.
Create a folder called input_doc at the root of the project, and add the files to the folder:
mkdir input_doc
Be sure to move the example files to this folder, as that is where the load tester will look for them.
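Getting the samples into place might look like this (the source path is an example; point it at wherever your downloads landed):

```shell
# Create the folder the load tester scans for PDFs (safe to re-run).
mkdir -p input_doc
# Move your downloaded samples into place (source path is illustrative):
# mv ~/Downloads/medical-samples/*.pdf input_doc/
ls input_doc
```

The load tester globs `input_doc/*.pdf` and exits with an error if the folder is empty.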

Start the worker

To start the worker, use the included Makefile command:
make start-worker
This helper auto-discovers all workflows and starts the worker, with file-watch auto-reload so the worker picks up your code changes automatically.

Start the Streamlit app

In a separate terminal window, start the Streamlit app:
make streamlit
The app will automatically open in your browser. If it doesn’t open, follow the local URL in the terminal logs:
PYTHONPATH=src uv run streamlit run src/entrypoints/app.py

  You can now view your Streamlit app in your browser.

  Local URL: http://localhost:8501

Upload and process a file

Click Browse Files and choose one of the provided samples. Click Start Workflow to process the file. Each step of the workflow displays after completion, showing a preview of the document, the raw OCR text, a classification, and the extracted info.

View workflow in Mistral AI Studio

Navigate to Mistral AI Studio in your browser and select the pdf_ocr_workflow. Click on the execution to see details of the workflow output. For each activity, you can inspect the input, output, and metadata.

Run load testing

The frontend gives us a good way to see our workflow in action, but a load test demonstrates the benefit of Mistral Workflows by showing how it handles executions at scale.
Stop the Streamlit app using CTRL+C, then run the following command:
make load-test
All the work in our workflow is async, so a single worker can already handle many executions concurrently; it doesn't block waiting for one to finish before starting the next. For a modest load test (10–20 workflows), one worker is enough.
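The non-blocking behavior is the standard asyncio pattern: awaiting an I/O-bound call yields control so other coroutines can progress. A minimal sketch, with a hypothetical `fake_workflow` and `asyncio.sleep` standing in for an OCR or agent API call:

```python
import asyncio
import time

async def fake_workflow(i: int) -> int:
    # Stand-in for an I/O-bound step such as an OCR or agent API call.
    await asyncio.sleep(0.1)
    return i

async def main() -> float:
    start = time.perf_counter()
    # Ten "workflows" run concurrently on a single event loop (one worker).
    results = await asyncio.gather(*(fake_workflow(i) for i in range(10)))
    assert results == list(range(10))
    return time.perf_counter() - start

elapsed = asyncio.run(main())
# Roughly 0.1s in total rather than 1s: the ten sleeps overlap instead of queueing.
print(f"{elapsed:.2f}s")
```

This is why the load tester can fire off dozens of executions from one process: each `await` on an upload or trigger call frees the loop to advance the others.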
To run a larger load test, you can open additional terminal windows and run make start-worker in each.
To test at higher or lower loads, you can provide a specific number of runs:
make load-test n=50

Success! You created a workflow

You just created a document processing workflow that:
  • Uses Mistral OCR to identify text in files
  • Uses Mistral's Chat API to categorize and format the text
You also created a basic Streamlit frontend and a load testing app to run the workflow.
You now have the foundation you need to build, execute, and monitor complex AI-driven workflows using Mistral Workflows. You can view the complete code example on GitHub.

Next steps
