step 05 · ship · foundations

Wrap as a FastAPI service

/v1/chat/completions of your own — streaming, API-key auth, structured logging, health checks. The endpoint your application code talks to.

We have a model running. We can hit it with curl. We have evals. The last foundations piece: wrap it in a service that we control.

Why: the inference engine (Ollama, vLLM) is one specific concern — fast token generation. Your application has many other concerns: authentication, rate limiting, request logging for observability, prompt templating, A/B testing, retry logic, fallback to a different backend if the primary’s down. None of those belong in vLLM; all of them belong in a thin layer in front of it.

That thin layer is what we build now. By the end you’ll have your own /v1/chat/completions endpoint — same OpenAI schema your application code already speaks (step 02), but with auth, logging, and a clean place to add the rest of the production cross-cutting concerns over the next 12 steps.

Architecture

The shape:

┌─────────────────┐
│  client app     │
│  (your code,    │     POST /v1/chat/completions
│   browser, etc.)│     ↓ Authorization: Bearer <key>
└────────┬────────┘

┌────────────────────────────────┐
│  YOUR FastAPI service          │   ← we build this
│  ─────────────────────────     │
│  • API-key auth                │
│  • request logging (JSON)      │
│  • health check                │
│  • prompt templating later     │
│  • RAG injection later (step 6)│
│  • tool calling later (step 9) │
└────────┬───────────────────────┘
         ↓ POST /v1/chat/completions
         ↓ (no auth here; localhost-only)
┌─────────────────┐
│ vLLM or Ollama  │   ← already running from step 02 / 03
│  (the engine)   │
└─────────────────┘

Two services, two concerns. The engine is fast token generation; the FastAPI layer is everything else. Easy to swap engines (already proved in step 03), easy to add cross-cutting features (next 12 steps).

Refactor the client

We’ve been carrying a slightly awkward ollama_client.py + vllm_client.py split. Time to consolidate to the cleaner LLM class I sketched in step 03.

# stack/llm.py
from __future__ import annotations
import json
import os
from dataclasses import dataclass
from typing import AsyncIterator
import httpx


@dataclass
class LLMConfig:
    base_url: str
    model: str
    timeout: float = 120.0


# Two pre-built configs. Swap by env var at startup.
OLLAMA_CONFIG = LLMConfig(
    base_url="http://localhost:11434/v1",
    model="llama3.1:8b",
)
VLLM_CONFIG = LLMConfig(
    base_url="http://localhost:8000/v1",
    model="meta-llama/Llama-3.1-8B-Instruct",
)


def config_from_env() -> LLMConfig:
    """Pick a backend from LLM_BACKEND env var. Default: Ollama."""
    backend = os.environ.get("LLM_BACKEND", "ollama").lower()
    if backend == "vllm":
        return VLLM_CONFIG
    return OLLAMA_CONFIG


class LLM:
    """Thin client for any OpenAI-compatible /v1/chat/completions endpoint."""

    def __init__(self, config: LLMConfig | None = None) -> None:
        self.config = config or config_from_env()

    # ── Sync API ────────────────────────────────────────────
    def chat(
        self,
        messages: list[dict],
        model: str | None = None,
        temperature: float = 0.7,
        max_tokens: int | None = None,
        **kwargs,
    ) -> dict:
        """Single-shot chat. Returns the full OpenAI-shaped response dict."""
        payload = {
            "model": model or self.config.model,
            "messages": messages,
            "stream": False,
            "temperature": temperature,
            **kwargs,
        }
        if max_tokens is not None:
            payload["max_tokens"] = max_tokens
        with httpx.Client(timeout=self.config.timeout) as client:
            r = client.post(f"{self.config.base_url}/chat/completions", json=payload)
            r.raise_for_status()
            return r.json()

    # ── Async streaming API ─────────────────────────────────
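    async def astream(
        self,
        messages: list[dict],
        model: str | None = None,
        temperature: float = 0.7,
        max_tokens: int | None = None,
        **kwargs,
    ) -> AsyncIterator[dict]:
        """Yield SSE chunks (parsed as dicts) as they arrive from the backend."""
        payload = {
            "model": model or self.config.model,
            "messages": messages,
            "stream": True,
            "temperature": temperature,
            **kwargs,
        }
        if max_tokens is not None:
            payload["max_tokens"] = max_tokens
        # Keep the configured connect/write/pool timeouts, but never time out
        # between chunks: a long generation can legitimately pause.
        timeout = httpx.Timeout(self.config.timeout, read=None)
        async with httpx.AsyncClient(timeout=timeout) as client:
            async with client.stream(
                "POST", f"{self.config.base_url}/chat/completions", json=payload
            ) as r:
                r.raise_for_status()
                async for line in r.aiter_lines():
                    # SSE frames are "data: <json>" lines separated by blanks.
                    if not line or not line.startswith("data: "):
                        continue
                    data = line[6:]
                    if data == "[DONE]":
                        break
                    yield json.loads(data)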

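Two methods: chat (synchronous, blocking, full response) and astream (async, yields parsed SSE chunks). FastAPI favors the async path; we’ll bind astream to streaming responses below. Because chat blocks until the whole completion is back, an async handler should run it in a worker thread rather than call it directly on the event loop.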
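The parsing step inside astream is easier to reason about in isolation. Here is a minimal sketch of it as a pure function over already-received lines; parse_sse_lines is a hypothetical helper name, not part of stack/llm.py, and it assumes the OpenAI-style framing described above ("data: " prefix, "[DONE]" sentinel).

```python
import json
from typing import Iterable, Iterator


def parse_sse_lines(lines: Iterable[str]) -> Iterator[dict]:
    """Yield parsed JSON payloads from OpenAI-style SSE lines (hypothetical helper)."""
    for line in lines:
        # Skip blank separator lines and anything that is not a data field.
        if not line.startswith("data: "):
            continue
        data = line[len("data: "):]
        if data == "[DONE]":
            break  # sentinel that ends an OpenAI-style stream
        yield json.loads(data)


sample = [
    'data: {"choices": [{"delta": {"content": "1"}}]}',
    "",
    'data: {"choices": [{"delta": {"content": ", 2"}}]}',
    "",
    "data: [DONE]",
    'data: {"choices": [{"delta": {"content": "ignored"}}]}',
]
text = "".join(c["choices"][0]["delta"]["content"] for c in parse_sse_lines(sample))
print(text)  # 1, 2
```

Anything after the sentinel is dropped, which mirrors the break in astream.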

Build the FastAPI service

# stack/server.py
from __future__ import annotations
import json
import logging
import os
import time
import uuid
from typing import AsyncIterator
from fastapi import Depends, FastAPI, Header, HTTPException, Request
from fastapi.responses import JSONResponse, StreamingResponse
from pydantic import BaseModel, Field

from stack.llm import LLM, config_from_env


# ─── logging setup ──────────────────────────────────────────
# Structured JSON logs so the observability layer in step 12 can
# ingest them cleanly. Production tip: never log raw prompts/outputs
# at INFO level — they may contain PII. Log them at DEBUG and gate
# by env var.
class JsonFormatter(logging.Formatter):
    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "ts": int(record.created * 1000),
            "level": record.levelname,
            "msg": record.getMessage(),
            **getattr(record, "extra", {}),
        }
        return json.dumps(payload)


handler = logging.StreamHandler()
handler.setFormatter(JsonFormatter())
log = logging.getLogger("stack")
log.handlers[:] = [handler]
log.setLevel(logging.INFO)


# ─── auth ──────────────────────────────────────────────────
# Single-tenant API-key auth. For a real product you'd use OAuth or
# per-user keys with rate limits in a database. Single-key works for
# step 05; we'll layer rate limiting in step 12 alongside observability.
API_KEYS = set(
    k.strip()
    for k in os.environ.get("STACK_API_KEYS", "dev-key").split(",")
    if k.strip()
)


def verify_key(authorization: str | None = Header(None)) -> str:
    """Bearer-token check. Returns the key on success."""
    if not authorization or not authorization.startswith("Bearer "):
        raise HTTPException(status_code=401, detail="Missing Authorization header")
    key = authorization.removeprefix("Bearer ").strip()
    if key not in API_KEYS:
        raise HTTPException(status_code=401, detail="Invalid API key")
    return key


# ─── schemas ───────────────────────────────────────────────
# We mirror the OpenAI Chat Completions schema closely — partly because
# our backend speaks it, partly so client tooling (the OpenAI Python SDK,
# LangChain, etc.) can hit our service unchanged.

class Message(BaseModel):
    role: str
    content: str


class ChatRequest(BaseModel):
    model: str | None = None
    messages: list[Message]
    temperature: float = 0.7
    max_tokens: int | None = None
    stream: bool = False


# ─── app ───────────────────────────────────────────────────
app = FastAPI(title="stack-llm", version="0.1.0")
llm = LLM()  # Backend picked from LLM_BACKEND env var


@app.middleware("http")
async def log_requests(request: Request, call_next):
    """One log line per request with timing + status. Adds a request_id
    you can trace through every downstream call (essential in step 12).
    For streaming responses call_next returns once the response starts,
    so elapsed_ms there is time to response headers, not to last token."""
    request_id = uuid.uuid4().hex[:12]
    request.state.request_id = request_id
    t0 = time.perf_counter()  # monotonic clock: safe for measuring durations
    try:
        response = await call_next(request)
        elapsed_ms = int((time.perf_counter() - t0) * 1000)
        log.info(
            "request",
            extra={"extra": {
                "request_id": request_id,
                "method": request.method,
                "path": request.url.path,
                "status": response.status_code,
                "elapsed_ms": elapsed_ms,
            }},
        )
        response.headers["X-Request-Id"] = request_id
        return response
    except Exception as e:
        elapsed_ms = int((time.perf_counter() - t0) * 1000)
        log.error(
            "request_error",
            extra={"extra": {
                "request_id": request_id,
                "method": request.method,
                "path": request.url.path,
                "error": str(e),
                "elapsed_ms": elapsed_ms,
            }},
        )
        raise


@app.get("/health")
async def health():
    """Liveness check. Real prod also pokes the backend; we keep it cheap."""
    return {"status": "ok", "backend": llm.config.base_url}


@app.post("/v1/chat/completions")
async def chat_completions(
    body: ChatRequest,
    request: Request,
    _: str = Depends(verify_key),
):
    """OpenAI-compatible chat completions. Streams when stream=True, else blocks."""
    messages = [m.model_dump() for m in body.messages]
    request_id = request.state.request_id

    log.info(
        "chat_started",
        extra={"extra": {
            "request_id": request_id,
            "model": body.model or llm.config.model,
            "n_messages": len(messages),
            "stream": body.stream,
        }},
    )

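    if not body.stream:
        # Blocking path: return the full response. llm.chat is synchronous,
        # so run it in a worker thread to keep the event loop free for
        # other requests while the model generates.
        from fastapi.concurrency import run_in_threadpool  # or hoist to the top-level imports

        response = await run_in_threadpool(
            llm.chat,
            messages=messages,
            model=body.model,
            temperature=body.temperature,
            max_tokens=body.max_tokens,
        )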
        usage = response.get("usage", {})
        log.info(
            "chat_finished",
            extra={"extra": {
                "request_id": request_id,
                "tokens_in": usage.get("prompt_tokens"),
                "tokens_out": usage.get("completion_tokens"),
            }},
        )
        return JSONResponse(response)

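    # Streaming path: wrap the async iterator as SSE.
    async def event_stream() -> AsyncIterator[bytes]:
        char_count = 0  # characters, not tokens (hence "approx_chars_out")
        # Forward max_tokens only when the client actually set it.
        extra = {} if body.max_tokens is None else {"max_tokens": body.max_tokens}
        try:
            async for chunk in llm.astream(
                messages=messages,
                model=body.model,
                temperature=body.temperature,
                **extra,
            ):
                # Some chunks carry no choices or a null/missing content
                # (role-only first chunk, final usage chunk); count them as 0.
                choices = chunk.get("choices") or [{}]
                delta = choices[0].get("delta") or {}
                char_count += len(delta.get("content") or "")
                yield f"data: {json.dumps(chunk)}\n\n".encode()
            yield b"data: [DONE]\n\n"
        finally:
            log.info(
                "chat_finished_stream",
                extra={"extra": {
                    "request_id": request_id,
                    "approx_chars_out": char_count,
                }},
            )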

    return StreamingResponse(
        event_stream(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",  # disable nginx buffering for SSE
        },
    )

The structure:

  • /health — liveness check. A load balancer pings this every few seconds. Cheap (no LLM call); reports the backend URL so you can curl /health to know what you’re hitting.
  • /v1/chat/completions — the actual endpoint. Same shape as OpenAI’s. Streaming and blocking modes share the path; "stream": true returns SSE, false (the default) returns JSON.
  • API-key auth via Bearer header. Multiple keys supported via comma-separated env var, so old and new keys can be valid side by side during a rotation. The set is read once at startup, so changing it still means a restart.
  • Structured JSON logging — every request gets a request_id, every downstream call (in later steps) will reference it. The single biggest production debugging accelerator.
  • No business logic yet. That’s the point. The skeleton is clean; we’ll layer features (RAG in step 06, tools in step 09, observability in step 12) on top of it.
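One optional hardening for the Bearer check, shown as a minimal sketch rather than what stack/server.py does: compare keys with hmac.compare_digest so the comparison time does not depend on how many leading characters match. check_bearer is a hypothetical helper; the service above uses a plain set lookup.

```python
from __future__ import annotations

import hmac


def check_bearer(authorization: str | None, valid_keys: set[str]) -> bool:
    """Return True when the header carries one of the valid keys (hypothetical helper)."""
    if not authorization or not authorization.startswith("Bearer "):
        return False
    presented = authorization.removeprefix("Bearer ").strip().encode()
    # compare_digest runs in time independent of where the inputs differ.
    # Check every key (no early exit) so timing does not hint at which matched.
    matched = False
    for key in valid_keys:
        if hmac.compare_digest(presented, key.encode()):
            matched = True
    return matched


keys = {"dev-key", "rotated-key"}
print(check_bearer("Bearer dev-key", keys))   # True
print(check_bearer("Bearer wrong", keys))     # False
print(check_bearer(None, keys))               # False
```

Inside verify_key, a False result would map to the same 401 responses as before.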

Run it

Add the run dependencies (you should already have them from step 00):

uv add "fastapi[standard]" "uvicorn[standard]" pydantic

Boot the service:

LLM_BACKEND=ollama \
STACK_API_KEYS=dev-key \
  uv run uvicorn stack.server:app --reload --port 9000

Uvicorn’s startup lines come from its own loggers, so they appear in its default plain-text format rather than JSON:

INFO:     Uvicorn running on http://127.0.0.1:9000 (Press CTRL+C to quit)
INFO:     Started server process [...]

The structured JSON lines from the stack logger appear once requests start arriving.

--reload watches the file system and restarts on Python changes. Drop it for production.

Sanity check

Three requests cover the surface area.

1. Health

curl -s http://localhost:9000/health | python3 -m json.tool

Expected:

{
  "status": "ok",
  "backend": "http://localhost:11434/v1"
}
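The /health handler deliberately does not touch the engine. If you later want a separate readiness probe that does, one possible sketch follows. It assumes the backend exposes the OpenAI-style GET /models route under the same base URL (treat that as an assumption to verify for your engine), and backend_ready is a hypothetical helper written with the standard library only.

```python
import urllib.error
import urllib.request


def backend_ready(base_url: str, timeout: float = 2.0) -> bool:
    """Return True if GET {base_url}/models answers with a 2xx status (hypothetical helper)."""
    try:
        with urllib.request.urlopen(f"{base_url}/models", timeout=timeout) as resp:
            return 200 <= resp.status < 300
    except (urllib.error.URLError, OSError, ValueError):
        # Connection refused, DNS failure, timeout, HTTP error status,
        # or a malformed URL: all count as "not ready".
        return False
```

A call such as backend_ready("http://localhost:11434/v1") could back a /ready route, leaving /health as the cheap liveness check. Keep the timeout short so a hung engine does not stall the probe.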

2. Auth — wrong key

curl -s -o /dev/null -w "%{http_code}\n" \
  http://localhost:9000/v1/chat/completions \
  -H "Authorization: Bearer wrong" \
  -H "Content-Type: application/json" \
  -d '{"messages":[{"role":"user","content":"hi"}]}'

Expected:

401

3. Streaming with the right key

curl -N http://localhost:9000/v1/chat/completions \
  -H "Authorization: Bearer dev-key" \
  -H "Content-Type: application/json" \
  -d '{
    "messages":[{"role":"user","content":"Count from 1 to 5."}],
    "stream":true
  }'

Expected (real-time stream):

data: {"id":"chatcmpl-...","choices":[{"delta":{"content":"1"}}]}
data: {"id":"chatcmpl-...","choices":[{"delta":{"content":", "}}]}
...
data: [DONE]

The -N flag disables curl’s buffering so you see the stream in real time — same way a browser would.

In your terminal running uvicorn, you’ll see the request’s log lines (values are illustrative):

{"ts": 1714123456789, "level": "INFO", "msg": "chat_started", "request_id": "a3f2c1...", "model": "llama3.1:8b", "n_messages": 1, "stream": true}
{"ts": 1714123456795, "level": "INFO", "msg": "request", "request_id": "a3f2c1...", "method": "POST", "path": "/v1/chat/completions", "status": 200, "elapsed_ms": 9}
{"ts": 1714123461234, "level": "INFO", "msg": "chat_finished_stream", "request_id": "a3f2c1...", "approx_chars_out": 18}

Three log lines per streaming request: chat_started, request, chat_finished_stream. All carry the same request_id, and the formatter flattens the extra fields into the top level of each record. Note the order: the middleware logs request as soon as the response starts, so for streaming calls its elapsed_ms is time to response headers, and chat_finished_stream arrives last. That shared request_id is the foundation observability tracing in step 12 will build on.
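To make the value of the shared request_id concrete, here is a small sketch that stitches log lines back together per request. It assumes each JSON record carries request_id and msg at the top level, which is what the JsonFormatter in stack/server.py emits; group_by_request is a hypothetical helper.

```python
from __future__ import annotations

import json
from collections import defaultdict


def group_by_request(log_lines: list[str]) -> dict[str, list[str]]:
    """Map request_id -> ordered list of log messages (hypothetical helper)."""
    grouped: dict[str, list[str]] = defaultdict(list)
    for line in log_lines:
        try:
            record = json.loads(line)
        except json.JSONDecodeError:
            continue  # e.g. Uvicorn's own plain-text lines
        if not isinstance(record, dict):
            continue
        request_id = record.get("request_id")
        if request_id:
            grouped[request_id].append(record.get("msg", ""))
    return dict(grouped)


lines = [
    "INFO:     Started server process",
    '{"msg": "chat_started", "request_id": "a3f2c1"}',
    '{"msg": "request", "request_id": "b7e9d0"}',
    '{"msg": "request", "request_id": "a3f2c1"}',
    '{"msg": "chat_finished_stream", "request_id": "a3f2c1"}',
]
timeline = group_by_request(lines)
print(timeline["a3f2c1"])  # ['chat_started', 'request', 'chat_finished_stream']
```

A log aggregator does the same join at scale; the point is only that one key is enough to rebuild a request's timeline.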

Test from Python

The OpenAI SDK can talk to our service. Install it (or use httpx from earlier):

uv add openai

# scratch/test_my_service.py
from openai import OpenAI

client = OpenAI(
    api_key="dev-key",
    base_url="http://localhost:9000/v1",
)

# Blocking
resp = client.chat.completions.create(
    model="llama3.1:8b",
    messages=[{"role": "user", "content": "What's 2+2?"}],
)
print("blocking:", resp.choices[0].message.content)

# Streaming
print("stream:  ", end="", flush=True)
for chunk in client.chat.completions.create(
    model="llama3.1:8b",
    messages=[{"role": "user", "content": "Count from 1 to 5."}],
    stream=True,
):
    delta = chunk.choices[0].delta.content
    if delta:
        print(delta, end="", flush=True)
print()

uv run python scratch/test_my_service.py

Expected:

blocking: 2 + 2 equals 4.
stream:   1, 2, 3, 4, 5

The official OpenAI Python SDK works against your service unchanged, just by setting base_url and api_key. That’s the payoff for mirroring the schema.

What latency the layer adds

Your service is a thin layer, but layers cost something. Approximate overhead vs hitting the engine directly:

Path                        Per-request added latency
Blocking JSON response      ~5 ms
First token of stream       ~10 ms
Subsequent stream tokens    under 1 ms each

The 5–10 ms is HTTP framing, JSON parsing, auth check, log writes. Negligible at the scale of LLM inference (which is in the seconds). If you do hot-path profiling later and find the layer matters, the JSON logger and middleware logging are your first targets to optimize.
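The figures above are approximate, so it is worth checking them on your own hardware. A minimal sketch of one way to do that: time the same request sent directly to the engine and through the service, then compare medians. Both callables are hypothetical zero-argument functions you would supply; the sleep-based stand-ins below only demonstrate the arithmetic.

```python
import statistics
import time
from typing import Callable


def median_latency_ms(call: Callable[[], object], runs: int = 20) -> float:
    """Median wall-clock time of call() in milliseconds over several runs."""
    samples = []
    for _ in range(runs):
        t0 = time.perf_counter()
        call()
        samples.append((time.perf_counter() - t0) * 1000)
    return statistics.median(samples)


def added_latency_ms(
    direct: Callable[[], object],
    via_service: Callable[[], object],
    runs: int = 20,
) -> float:
    """Estimated overhead of the service layer: median(via) - median(direct)."""
    return median_latency_ms(via_service, runs) - median_latency_ms(direct, runs)


# Stand-ins: pretend the engine takes 5 ms and the wrapped path takes 20 ms.
added = added_latency_ms(
    direct=lambda: time.sleep(0.005),
    via_service=lambda: time.sleep(0.020),
    runs=5,
)
print(f"added latency: {added:.1f} ms")
```

For real measurements, use a fixed prompt with a small max_tokens and temperature 0 so generation time varies as little as possible between the two paths.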

What we did and didn’t do

What we did:

  • Refactored the client into a clean LLM class with sync + async APIs
  • Built a FastAPI service that mirrors the OpenAI chat schema
  • Added Bearer-token auth, structured JSON logging, a health check, and request IDs for tracing
  • Verified the service works with the official OpenAI Python SDK unchanged
  • Added an env-var switch between Ollama and vLLM backends

What we didn’t:

  • Rate limiting. Production needs per-key request and token-budget limits. slowapi or a Redis token bucket. Step 12 (observability) is the natural place; rate limits often live alongside the metrics they’re enforced from.
  • Per-user API keys + a database. We’ve got one shared key. Multi-tenant deployments use a real auth provider (Clerk, Auth0, Supabase) or a custom user table.
  • Retry / fallback logic. When the backend hiccups, you’d want to retry once or fall back to a secondary model. httpx-retries or a tiny custom wrapper. Add when the first 5xx shows up in logs.
  • Prompt templating / system-prompt injection. Right now the client sends raw messages. Often you want to prepend a system prompt server-side so it can’t be overridden. Step 09 (tools) introduces the templating layer; step 06 (RAG) layers retrieved context onto it.
  • TLS, CORS, graceful shutdown. Deploy-time concerns. Step 15 covers them.
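As an illustration of the retry / fallback item, here is a minimal sketch of the "tiny custom wrapper" option: retry the primary a fixed number of times with exponential backoff, then fall back to a secondary callable. call_with_fallback and its parameters are hypothetical; in the service you would narrow the except clause to the httpx errors that actually signal a backend failure.

```python
from __future__ import annotations

import time
from typing import Callable, TypeVar

T = TypeVar("T")


def call_with_fallback(
    primary: Callable[[], T],
    fallback: Callable[[], T] | None = None,
    retries: int = 1,
    backoff_s: float = 0.5,
) -> T:
    """Try primary up to retries + 1 times, then fallback, else re-raise."""
    last_error: Exception | None = None
    for attempt in range(retries + 1):
        try:
            return primary()
        except Exception as exc:  # narrow this in real code
            last_error = exc
            if attempt < retries:
                time.sleep(backoff_s * (2 ** attempt))  # exponential backoff
    if fallback is not None:
        return fallback()
    assert last_error is not None
    raise last_error


calls = {"n": 0}


def flaky() -> str:
    """Fails on the first call, succeeds afterwards."""
    calls["n"] += 1
    if calls["n"] == 1:
        raise RuntimeError("backend hiccup")
    return "primary ok"


def always_down() -> str:
    raise RuntimeError("backend down")


recovered = call_with_fallback(flaky, retries=1, backoff_s=0)
fell_back = call_with_fallback(always_down, lambda: "secondary ok", retries=1, backoff_s=0)
print(recovered, "|", fell_back)  # primary ok | secondary ok
```

Retrying is only safe here because a chat completion has no side effects on the server; for streaming responses, retry only before the first chunk has been sent to the client.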

Next

Foundations is done. Step 06 starts the Building release: RAG. Pick a corpus, chunk it, embed it, store it, retrieve at query time, fold the retrieved context into your messages before they hit the engine. The endpoint we just built is where that (query, context) pair gets injected.

You now have a self-hosted LLM-as-a-service running on your machine, evaluated against academic benchmarks and a custom task suite, exposing a streaming OpenAI-compatible endpoint with auth and observability hooks. The first release of /ship is shippable on its own.