
Building APIs Around Agent Chains: REST, Webhooks, and Streaming

Mentiko Team

Your agent chain works. It takes input, processes it through multiple agents, and produces useful output. Now someone else wants to use it. A frontend app, a partner integration, an internal tool. They don't want to learn your orchestration platform. They want an API endpoint they can call.

Wrapping agent chains in APIs is how chains go from internal tools to platform capabilities. Here's how to build REST endpoints, webhook callbacks, and streaming interfaces around your agent chains.

The challenge: chains are slow

Before we get into patterns, understand the fundamental constraint: agent chains are slow. A chain that calls three LLM-powered agents takes 15-60 seconds to complete. That's an eternity for an API caller expecting sub-second responses.

This constraint shapes every design decision:

  • Synchronous APIs only work for fast chains (< 10 seconds)
  • Most chains need async patterns (webhooks, polling, streaming)
  • Callers need progress information while they wait
  • Timeouts need to be chain-aware, not HTTP-aware

Keep this in mind as we go through each pattern.

Pattern 1: Synchronous REST

The simplest approach. Caller sends a request, waits for the chain to complete, gets the result.

POST /api/chains/summarize
Content-Type: application/json

{
  "input": {
    "document_url": "https://example.com/report.pdf",
    "format": "executive_summary"
  }
}

---

200 OK
{
  "run_id": "run-abc123",
  "status": "completed",
  "output": {
    "summary": "The Q1 report shows...",
    "key_points": ["Revenue grew 12%", "..."],
    "word_count": 245
  },
  "cost": 0.08,
  "duration_ms": 8400
}

When to use it

  • Chain completes in < 10 seconds consistently
  • Caller can tolerate blocking requests
  • Simple integration requirements (no progress tracking needed)

Implementation

The API handler triggers the chain, waits for completion, and returns the result:

from fastapi import FastAPI, HTTPException

app = FastAPI()

@app.post("/api/chains/{chain_name}")
async def run_chain(chain_name: str, request: ChainRequest):
    run = await orchestrator.run(
        chain=chain_name,
        input=request.input,
        timeout=30
    )

    if run.status == "completed":
        return {
            "run_id": run.id,
            "status": "completed",
            "output": run.output,
            "cost": run.cost,
            "duration_ms": run.duration_ms
        }
    elif run.status == "timeout":
        raise HTTPException(504, "Chain execution timed out")
    else:
        raise HTTPException(500, f"Chain failed: {run.error}")

Pitfalls

HTTP timeouts. Most HTTP clients, load balancers, and API gateways have default timeouts of 30-60 seconds. If your chain takes longer, the request dies before the chain completes. You've burned tokens and compute for nothing.

Retry storms. When a synchronous request times out, callers retry. Now you have two chain runs for one request. If the retry also times out, you get three. Set appropriate retry limits and use idempotency keys.
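A minimal sketch of the idempotency-key side of that fix, assuming callers send a key with each request (the in-memory cache and function names are illustrative; production would store keys in Redis or a database with a TTL):

```python
# Illustrative idempotency handling: a repeated key returns the cached
# result instead of starting a second chain run.
_idempotency_cache: dict[str, dict] = {}

def run_with_idempotency(key: str, start_chain) -> dict:
    """start_chain is whatever kicks off the actual chain run."""
    if key in _idempotency_cache:
        return _idempotency_cache[key]   # duplicate request: reuse the result
    result = start_chain()               # first time: actually run the chain
    _idempotency_cache[key] = result
    return result
```

The key point is that a client retry after a timeout maps onto the original run rather than burning tokens on a second one.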

Resource starvation. Each waiting request holds an HTTP connection. If 100 callers send requests simultaneously and each chain takes 30 seconds, you need 100 concurrent connections for 30 seconds. Your API server might not handle that.

Synchronous REST is fine for lightweight chains with fast turnaround. For anything else, go async.

Pattern 2: Async with polling

Caller starts the chain, gets a run ID immediately, and polls for the result.

POST /api/chains/research/runs
Content-Type: application/json

{
  "input": { "topic": "AI agent market size 2026" }
}

---

202 Accepted
{
  "run_id": "run-def456",
  "status": "running",
  "poll_url": "/api/runs/run-def456",
  "estimated_duration_s": 45
}

The caller polls the status endpoint:

GET /api/runs/run-def456

---

200 OK
{
  "run_id": "run-def456",
  "status": "running",
  "progress": {
    "current_agent": "synthesizer",
    "agents_completed": 2,
    "agents_total": 4
  }
}

When complete:

GET /api/runs/run-def456

---

200 OK
{
  "run_id": "run-def456",
  "status": "completed",
  "output": { ... },
  "cost": 0.34,
  "duration_ms": 42000
}

When to use it

  • Chain takes > 10 seconds
  • Caller is a frontend app that can show a loading state
  • You want to track progress across agents
  • Simple to implement on both sides

Implementation

Two endpoints: one to start the chain, one to check status.

from fastapi.responses import JSONResponse

@app.post("/api/chains/{chain_name}/runs")
async def start_chain(chain_name: str, request: ChainRequest):
    run_id = await orchestrator.start(
        chain=chain_name,
        input=request.input
    )
    return JSONResponse(
        status_code=202,
        content={
            "run_id": run_id,
            "status": "running",
            "poll_url": f"/api/runs/{run_id}"
        }
    )

@app.get("/api/runs/{run_id}")
async def get_run(run_id: str):
    run = await orchestrator.get_run(run_id)
    response = {
        "run_id": run.id,
        "status": run.status,
    }
    if run.status == "running":
        response["progress"] = run.progress
    elif run.status == "completed":
        response["output"] = run.output
        response["cost"] = run.cost
        response["duration_ms"] = run.duration_ms
    elif run.status == "failed":
        response["error"] = run.error
    return response

Pitfalls

Polling interval. Too fast and you're wasting bandwidth. Too slow and the user waits longer than necessary after completion. Start with 2-second intervals and back off to 5 seconds after 30 seconds. Include estimated_duration_s in the initial response so the caller can set a reasonable first poll delay.
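A client-side polling loop with that backoff might look like this (a sketch; `get_run` stands in for whatever HTTP call hits GET /api/runs/{run_id}, and the thresholds are the ones suggested above):

```python
import time

def poll_delay(elapsed_s: float) -> float:
    """2-second intervals for the first 30 seconds, then back off to 5."""
    return 2.0 if elapsed_s < 30 else 5.0

def poll_until_done(get_run, run_id: str, max_wait_s: float = 300) -> dict:
    """Poll until the run leaves 'running' or max_wait_s passes."""
    start = time.monotonic()
    while True:
        run = get_run(run_id)
        if run["status"] != "running":
            return run                   # completed or failed: stop polling
        elapsed = time.monotonic() - start
        if elapsed > max_wait_s:
            raise TimeoutError(f"run {run_id} still running after {max_wait_s}s")
        time.sleep(poll_delay(elapsed))
```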

Orphaned runs. Caller starts a chain, then disconnects (closes the browser, crashes). The chain keeps running, consuming resources. Set a maximum lifetime on runs and auto-cancel chains that nobody has polled in the last 5 minutes.

Storage for results. Completed run results need to be stored somewhere the poll endpoint can access. In-memory works for single-server setups. For distributed systems, use Redis or a database. Set a TTL on results -- you don't need to keep them forever.
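As a sketch of the single-server case, an in-memory store with per-result TTLs (the `ResultStore` name and one-hour default are illustrative; a distributed setup would use Redis with an expiry instead):

```python
import time

class ResultStore:
    """In-memory run-result store with a TTL, standing in for Redis
    in a single-server setup."""
    def __init__(self, ttl_s: float = 3600):
        self.ttl_s = ttl_s
        self._data: dict[str, tuple[float, dict]] = {}

    def put(self, run_id: str, result: dict) -> None:
        self._data[run_id] = (time.monotonic() + self.ttl_s, result)

    def get(self, run_id: str):
        entry = self._data.get(run_id)
        if entry is None:
            return None
        expires_at, result = entry
        if time.monotonic() > expires_at:
            del self._data[run_id]       # expired: behave like a TTL'd key
            return None
        return result
```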

Pattern 3: Webhooks

Caller starts the chain and provides a callback URL. When the chain completes, the platform sends the result to the callback.

POST /api/chains/data-enrichment/runs
Content-Type: application/json

{
  "input": { "customer_id": "cust-789" },
  "webhook": {
    "url": "https://yourapp.com/hooks/enrichment-complete",
    "secret": "whsec_abc123",
    "events": ["completed", "failed"]
  }
}

---

202 Accepted
{
  "run_id": "run-ghi789",
  "status": "running"
}

When the chain completes:

POST https://yourapp.com/hooks/enrichment-complete
Content-Type: application/json
X-Webhook-Signature: sha256=...

{
  "event": "chain.completed",
  "run_id": "run-ghi789",
  "chain": "data-enrichment",
  "output": { ... },
  "cost": 0.22,
  "duration_ms": 31000,
  "timestamp": "2026-03-19T14:30:00Z"
}

When to use it

  • Server-to-server integrations (no browser in the loop)
  • Caller doesn't want to poll
  • Chain results trigger downstream processing
  • Event-driven architectures

Implementation

The webhook sender runs after the chain completes:

import asyncio
import hashlib
import hmac
import json
from datetime import datetime, timezone

async def send_webhook(run: Run, webhook_config: WebhookConfig):
    payload = {
        "event": f"chain.{run.status}",
        "run_id": run.id,
        "chain": run.chain_name,
        "output": run.output,
        "cost": run.cost,
        "duration_ms": run.duration_ms,
        "timestamp": datetime.now(timezone.utc).isoformat()
    }

    # Serialize once and sign the exact bytes we send. Signing one
    # serialization and sending another breaks the receiver's check.
    body = json.dumps(payload).encode()
    signature = hmac.new(
        webhook_config.secret.encode(),
        body,
        hashlib.sha256
    ).hexdigest()

    for attempt in range(3):
        try:
            response = await http.post(
                webhook_config.url,
                content=body,
                headers={
                    "Content-Type": "application/json",
                    "X-Webhook-Signature": f"sha256={signature}",
                },
                timeout=10
            )
            if response.status_code < 300:
                return
        except Exception:
            pass
        await asyncio.sleep(2 ** attempt)  # back off on errors and non-2xx alike

    # All retries failed: log and store for manual retry
    await store_failed_webhook(run.id, payload)

Pitfalls

Webhook reliability. The receiver's server might be down. Always implement retries with exponential backoff. Store failed webhooks for manual retry. Provide a "resend webhook" endpoint so callers can request redelivery.

Security. Sign every webhook payload with a shared secret. The receiver verifies the signature before processing. Without this, anyone can POST fake results to the callback URL.
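On the receiver side, verification is a few lines, assuming the sender signs the raw request body as in the implementation above (the helper name is illustrative):

```python
import hashlib
import hmac

def verify_webhook(secret: str, body: bytes, signature_header: str) -> bool:
    """Recompute the HMAC over the raw request body and compare it to the
    X-Webhook-Signature header using a constant-time comparison."""
    expected = hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()
    return hmac.compare_digest(f"sha256={expected}", signature_header)
```

Verify against the raw bytes before any JSON parsing; re-serializing the parsed payload can produce different bytes and a spurious mismatch.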

Ordering. If a chain fails and is retried, the webhook for the retry might arrive before the webhook for the failure. Include timestamps and run IDs so the receiver can handle out-of-order delivery.

Receiver timeouts. Your webhook POST has a timeout too. If the receiver takes 30 seconds to respond, you're holding a connection. Keep webhook delivery fast (< 5 second timeout) and let the receiver process asynchronously.

Pattern 4: Server-Sent Events (streaming)

The caller opens a persistent connection and receives events as each agent completes. This is the best UX for real-time interfaces.

POST /api/chains/writing-pipeline/stream
Content-Type: application/json

{
  "input": { "topic": "agent chain APIs", "tone": "technical" }
}

---

200 OK
Content-Type: text/event-stream

event: run.started
data: {"run_id":"run-jkl012","agents_total":3}

event: agent.started
data: {"agent":"outliner","index":0}

event: agent.output
data: {"agent":"outliner","chunk":"## Introduction\n"}

event: agent.output
data: {"agent":"outliner","chunk":"Agent chains are..."}

event: agent.completed
data: {"agent":"outliner","index":0,"duration_ms":4200}

event: agent.started
data: {"agent":"writer","index":1}

event: agent.output
data: {"agent":"writer","chunk":"Your agent chain works..."}

...

event: run.completed
data: {"run_id":"run-jkl012","cost":0.18,"duration_ms":28000}

When to use it

  • Frontend apps showing real-time progress
  • Agent output that should be displayed as it's generated
  • Long-running chains where users need feedback
  • Chat-like interfaces wrapping agent chains

Implementation

Use Server-Sent Events (SSE). The API handler creates a streaming response and pushes events as the chain executes:

import json

from fastapi.responses import StreamingResponse

@app.post("/api/chains/{chain_name}/stream")
async def stream_chain(chain_name: str, request: ChainRequest):
    async def event_generator():
        run = orchestrator.start_with_events(
            chain=chain_name,
            input=request.input
        )

        async for event in run.events():
            yield f"event: {event.type}\ndata: {json.dumps(event.data)}\n\n"

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream"
    )

Pitfalls

Connection drops. SSE connections can drop due to network issues, proxy timeouts, or client disconnects. Include a run_id in the first event so the client can reconnect and poll for the remaining results if the stream drops.

Buffering. Proxies and load balancers often buffer responses. Disable buffering for SSE endpoints. In nginx: proxy_buffering off. In Cloudflare: SSE is supported but verify it's not being cached.

Token streaming. If you want to stream individual tokens from the LLM (character by character), the agent needs to forward the LLM's streaming response through the event stream. This adds complexity but provides the best UX for content-generation chains.
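On the client side, the event/data frames shown above parse with a few lines (a minimal sketch; a production client would use an SSE library and also handle id: fields, comment lines, and multi-line data):

```python
def parse_sse(stream_text: str) -> list[tuple[str, str]]:
    """Parse raw text/event-stream content into (event, data) pairs.
    Frames are separated by blank lines, per the SSE wire format."""
    events = []
    for frame in stream_text.strip().split("\n\n"):
        event, data = None, []
        for line in frame.split("\n"):
            if line.startswith("event:"):
                event = line[len("event:"):].strip()
            elif line.startswith("data:"):
                data.append(line[len("data:"):].strip())
        if event or data:
            events.append((event, "\n".join(data)))
    return events
```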

Choosing the right pattern

| Scenario | Pattern |
|---|---|
| Chain < 10s, simple integration | Synchronous REST |
| Chain > 10s, frontend app | Polling or SSE |
| Server-to-server, event-driven | Webhooks |
| Real-time UI, content generation | SSE streaming |
| Multiple consumers need results | Webhooks + polling fallback |

Most production deployments combine patterns. Offer polling as the default, webhooks for server-to-server, and streaming for frontend integrations. They all share the same chain execution engine -- only the delivery mechanism differs.

API design principles

Regardless of pattern, follow these principles:

Include run metadata. Every response should include run_id, status, cost, and duration_ms. Callers need this for debugging, cost tracking, and SLA monitoring.

Use consistent error formats. Agent failures, timeout failures, and validation failures should all use the same error response structure. The caller shouldn't need different error handling per failure type.

Version your API. Chain behavior changes when you update prompts or swap models. Version your API so callers can pin to a known behavior.

Rate limit by caller. Each API caller should have its own rate limit. A runaway integration shouldn't be able to exhaust your chain capacity for everyone.
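A per-caller token bucket is one simple way to implement this (a sketch; the rates are illustrative, and a multi-server deployment would keep bucket state in Redis rather than in-process):

```python
import time

class TokenBucket:
    """Per-caller limiter: `rate` chain starts per second, bursting
    up to `capacity`. Keep one bucket per API key."""
    def __init__(self, rate: float, capacity: float):
        self.rate, self.capacity = rate, capacity
        self.tokens = capacity
        self.last = time.monotonic()

    def allow(self) -> bool:
        now = time.monotonic()
        # Refill based on elapsed time, capped at capacity.
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False
```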

Authenticate everything. API keys, OAuth tokens, or JWTs. No unauthenticated access to chain execution, even internally. You're spending real money on every chain run.

Wrapping chains in APIs transforms them from scripts you run manually into services that power products. The investment in a proper API layer pays back every time a new consumer wants to use your chain without learning your orchestration platform.


Want to understand the events that power these APIs? Read the complete guide to agent events or learn about event-driven vs DAG orchestration.
