production-stack production 12 / 17 22 min read · 30 min hands-on

step 12 · ship · production

Observability with Phoenix

Trace every model call, every retrieval, every tool. The waterfall view production AI requires.

observabilityproduction

When something is wrong with your AI service at 2 a.m., the first question is always “what was the model actually shown, and what did it actually return?” Without tracing, you’ll spend hours grepping logs and reproducing requests. With tracing, you click on the failed span, see the exact prompt, the exact response, the tools called, the documents retrieved, and you’re debugging in two minutes.

This is the single highest-leverage thing you can add to a production AI service. Not “nice to have.” Required. This article is short because it should be — Phoenix does the hard part.

Why Phoenix and not Datadog / Honeycomb / Sentry

You can absolutely use general-purpose APM tools. Datadog has LLM Observability, Honeycomb supports OpenTelemetry, Sentry traces requests fine. They work.

But AI workloads have specific shapes that LLM-specific observability tools render natively:

  • Prompt diffing. “Show me what changed between the prompt template last week and this week.” Generic APM treats prompts as opaque strings; Phoenix renders them with templating awareness.
  • Token-level cost views. Per-trace, per-prompt-variant, per-tool. Generic APM measures wall-clock; AI cost is dominated by tokens, not seconds.
  • Eval correlation. Phoenix lets you bind eval scores (from step 13) to the traces that produced them, so “which prompt variant has the worst factuality score” is one query.
  • Retrieval inspection. When a chunk is retrieved, Phoenix shows the chunk text, score, and embedding similarity inline. Datadog shows you a span name.

For step 12, Phoenix (open-source, runs locally, OTLP-compatible). At scale you’d add Datadog or Honeycomb alongside it. Both can ingest the same OpenTelemetry data.

Setup

uv add arize-phoenix opentelemetry-api opentelemetry-sdk \
       opentelemetry-instrumentation-httpx \
       openinference-semantic-conventions

Phoenix runs as a local server on port 6006 (it borrowed TensorBoard’s). Start it:

uv run python -c "import phoenix as px; px.launch_app()"
# then open http://localhost:6006

Leave that running in one terminal. The traces you emit from your stack will appear there in real time.

The tracing module

# stack/tracing.py
from __future__ import annotations
import contextlib
import json
import os
from typing import Any, Iterator

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import (
    BatchSpanProcessor, ConsoleSpanExporter,
)
from opentelemetry.exporter.otlp.proto.http.trace_exporter import (
    OTLPSpanExporter,
)


_INITIALIZED = False
_tracer = None


def init_tracing(
    service_name: str = "stack",
    endpoint: str | None = None,
    console: bool = False,
) -> None:
    """Initialize OpenTelemetry. Call once at process start.

    Args:
        service_name: Shows up as the service in Phoenix / Datadog.
        endpoint: OTLP endpoint URL. Default: PHOENIX_COLLECTOR_ENDPOINT
                  env var, falling back to localhost:6006.
        console: If True, also log spans to stdout. Useful for debugging.
    """
    global _INITIALIZED, _tracer
    if _INITIALIZED:
        return

    endpoint = endpoint or os.environ.get(
        "PHOENIX_COLLECTOR_ENDPOINT", "http://localhost:6006/v1/traces"
    )

    provider = TracerProvider()
    provider.add_span_processor(
        BatchSpanProcessor(OTLPSpanExporter(endpoint=endpoint))
    )
    if console:
        provider.add_span_processor(BatchSpanProcessor(ConsoleSpanExporter()))

    trace.set_tracer_provider(provider)
    _tracer = trace.get_tracer(service_name)
    _INITIALIZED = True


def tracer():
    """Return the global tracer. Initializes on first call."""
    if _tracer is None:
        init_tracing()
    return _tracer

A single init_tracing() at process start, a single tracer() accessor everywhere else. The OTLP processor batches and sends in the background — your hot path doesn’t pay the cost of network I/O on each span.

Instrumenting LLM calls

The most valuable spans capture (a) what was sent, (b) what was returned, (c) tokens used, (d) duration. Five lines around LLM.chat:

# stack/tracing.py (continued)
from openinference.semconv.trace import (
    SpanAttributes, OpenInferenceSpanKindValues,
)


@contextlib.contextmanager
def llm_span(
    model: str,
    messages: list[dict],
    temperature: float = 0.0,
) -> Iterator[Any]:
    """Wrap an LLM call. Caller fills in output via .set_attribute()."""
    with tracer().start_as_current_span("llm.chat") as span:
        span.set_attribute(
            SpanAttributes.OPENINFERENCE_SPAN_KIND,
            OpenInferenceSpanKindValues.LLM.value,
        )
        span.set_attribute(SpanAttributes.LLM_MODEL_NAME, model)
        span.set_attribute("llm.invocation_parameters",
                           json.dumps({"temperature": temperature}))
        span.set_attribute(SpanAttributes.LLM_INPUT_MESSAGES,
                           json.dumps(messages))
        try:
            yield span
        except Exception as exc:
            span.record_exception(exc)
            span.set_status(trace.Status(trace.StatusCode.ERROR))
            raise


def record_llm_output(span, response: dict) -> None:
    """Pull the output back out of an OpenAI-shaped response and tag the span."""
    msg = response["choices"][0]["message"]
    usage = response.get("usage", {})
    span.set_attribute(SpanAttributes.LLM_OUTPUT_MESSAGES,
                       json.dumps([msg]))
    span.set_attribute(SpanAttributes.LLM_TOKEN_COUNT_PROMPT,
                       usage.get("prompt_tokens", 0))
    span.set_attribute(SpanAttributes.LLM_TOKEN_COUNT_COMPLETION,
                       usage.get("completion_tokens", 0))
    span.set_attribute(SpanAttributes.LLM_TOKEN_COUNT_TOTAL,
                       usage.get("total_tokens", 0))

The OpenInference attribute names are the magic. Phoenix knows what llm.input_messages means and renders it as a chat transcript. Datadog and Honeycomb render it generically but you can still filter and search. Resist the urge to invent your own attribute names.

Wiring into stack/llm.py

Edit your existing LLM.chat:

# stack/llm.py — modified chat method
from stack.tracing import llm_span, record_llm_output


def chat(self, messages, model=None, temperature=0.7, max_tokens=None, **kwargs):
    model_name = model or self.config.model
    payload = {
        "model": model_name,
        "messages": messages,
        "stream": False,
        "temperature": temperature,
        **kwargs,
    }
    if max_tokens is not None:
        payload["max_tokens"] = max_tokens

    with llm_span(model_name, messages, temperature) as span:
        with httpx.Client(timeout=self.config.timeout) as client:
            r = client.post(f"{self.config.base_url}/chat/completions", json=payload)
            r.raise_for_status()
            response = r.json()
        record_llm_output(span, response)
        return response

That’s it. Three new lines around the existing call. Every chat completion is now traced.

Tool-call and retrieval spans

Two more wrapper helpers:

# stack/tracing.py (continued)
@contextlib.contextmanager
def tool_span(name: str, arguments: dict) -> Iterator[Any]:
    """Wrap a tool execution."""
    with tracer().start_as_current_span(f"tool.{name}") as span:
        span.set_attribute(
            SpanAttributes.OPENINFERENCE_SPAN_KIND,
            OpenInferenceSpanKindValues.TOOL.value,
        )
        span.set_attribute(SpanAttributes.TOOL_NAME, name)
        span.set_attribute(SpanAttributes.INPUT_VALUE,
                           json.dumps(arguments, default=str))
        try:
            yield span
        except Exception as exc:
            span.record_exception(exc)
            span.set_status(trace.Status(trace.StatusCode.ERROR))
            raise


@contextlib.contextmanager
def retrieval_span(query: str, top_k: int) -> Iterator[Any]:
    """Wrap a retrieval call. Caller fills in retrieved docs."""
    with tracer().start_as_current_span("retrieval") as span:
        span.set_attribute(
            SpanAttributes.OPENINFERENCE_SPAN_KIND,
            OpenInferenceSpanKindValues.RETRIEVER.value,
        )
        span.set_attribute(SpanAttributes.INPUT_VALUE, query)
        span.set_attribute("retrieval.top_k", top_k)
        try:
            yield span
        except Exception as exc:
            span.record_exception(exc)
            span.set_status(trace.Status(trace.StatusCode.ERROR))
            raise


def record_retrieval_output(span, docs: list[dict]) -> None:
    """Tag a retrieval span with the docs that were returned."""
    span.set_attribute(SpanAttributes.RETRIEVAL_DOCUMENTS,
                       json.dumps([
                           {"id": d.get("id"), "score": d.get("score"),
                            "text": d.get("text", "")[:500]}
                           for d in docs
                       ]))

Now wrap the relevant call sites:

# stack/agent.py — inside _dispatch (modified)
from stack.tracing import tool_span

def _dispatch(self, tc, history, steps, iteration, cfg):
    name = tc["function"]["name"]
    raw_args = tc["function"]["arguments"] or "{}"
    args = json.loads(raw_args) if raw_args else {}

    with tool_span(name, args) as span:
        try:
            result = self.registry.call(name, args)
            content = json.dumps(result, default=str)
            span.set_attribute("output.value", content[:1000])
        except Exception as exc:
            content = json.dumps({"error": f"{type(exc).__name__}: {exc}"})
            raise

    history.append({
        "role": "tool", "tool_call_id": tc["id"],
        "name": name, "content": content,
    })
# stack/retrieve.py — inside HybridRetriever.retrieve (modified)
from stack.tracing import retrieval_span, record_retrieval_output

def retrieve(self, query: str, cfg: RetrievalConfig | None = None):
    cfg = cfg or RetrievalConfig()
    with retrieval_span(query, cfg.n_after_rerank) as span:
        # existing 3-stage logic produces `results`
        record_retrieval_output(span, results)
        return results

The agent run as a parent span

Wrap Agent.run so all the LLM and tool spans nest under one root:

# stack/agent.py — modified Agent.run signature
from stack.tracing import tracer
from openinference.semconv.trace import (
    SpanAttributes, OpenInferenceSpanKindValues,
)


def run(self, user_goal: str) -> AgentResult:
    with tracer().start_as_current_span("agent.run") as root:
        root.set_attribute(
            SpanAttributes.OPENINFERENCE_SPAN_KIND,
            OpenInferenceSpanKindValues.AGENT.value,
        )
        root.set_attribute(SpanAttributes.INPUT_VALUE, user_goal)
        # existing loop body produces `result`
        root.set_attribute(SpanAttributes.OUTPUT_VALUE, result.final)
        root.set_attribute("agent.stop_reason", result.stop_reason)
        root.set_attribute("agent.iterations", len(result.steps))
        return result

Now in Phoenix you’ll see:

agent.run                                       18.4s
  llm.chat                                       4.2s
  tool.search_docs                               0.3s
    retrieval                                    0.2s
  llm.chat                                       3.8s
  tool.fetch_chunk                               0.1s
  llm.chat                                       5.4s
  llm.chat                                       4.7s   (final answer)

Click any span and you see the prompt, response, tokens, retrieved docs. That’s the whole point.

The runner script

# stack/run_traced.py
from stack.tracing import init_tracing
from stack.llm import LLM
from stack.tools import (
    ToolRegistry, tool_from_callable, now, search_docs,
)
from stack.agent import Agent, fetch_chunk, SYSTEM_PROMPT


if __name__ == "__main__":
    init_tracing(service_name="stack-dev", console=False)
    llm = LLM()
    registry = ToolRegistry()
    for fn in (now, search_docs, fetch_chunk):
        registry.register(tool_from_callable(fn))

    agent = Agent(llm, registry, SYSTEM_PROMPT)
    result = agent.run(
        "Compare Postgres and SQLite for a small Django app's database."
    )
    print(f"\n{result.final}\n")
    print(f"Trace: open http://localhost:6006 to see the waterfall.")

Run it:

# Terminal 1: Phoenix
uv run python -c "import phoenix as px; px.launch_app(host='0.0.0.0')"

# Terminal 2: traced agent
uv run python -m stack.run_traced

Open http://localhost:6006. You’ll see the agent run as a root span, child LLM calls, tool spans nested correctly, and the latency waterfall.

Production touches

A few small things that turn a dev setup into something you’d run unattended:

Trace IDs in HTTP responses

Add the trace ID to every API response so users (and your support team) can hand you a single string when reporting a bug:

# stack/server.py — modified
from opentelemetry import trace as otel_trace

@app.middleware("http")
async def add_trace_id(request: Request, call_next):
    response = await call_next(request)
    span = otel_trace.get_current_span()
    if span and span.get_span_context().trace_id:
        trace_id = format(span.get_span_context().trace_id, "032x")
        response.headers["X-Trace-Id"] = trace_id
    return response

A user emails you “the bot gave me garbage on request 4f8d...” — paste the ID into Phoenix, get the exact trace.

User attribution

Tag the agent span with the authenticated user so you can filter all traces for one user:

root.set_attribute("user.id", request.state.user_id)
root.set_attribute("session.id", request.state.session_id)

Sampling

When you cross ~10 req/s, full sampling gets expensive. The OTel SDK supports head-based sampling — capture 1% of requests by default, 100% of error requests, 100% of slow (over 10 s) requests:

from opentelemetry.sdk.trace.sampling import ParentBased, TraceIdRatioBased

provider = TracerProvider(
    sampler=ParentBased(root=TraceIdRatioBased(0.01))
)

Errors and slow paths still get full traces (because their parent decision propagates). Fast happy-path requests get sampled. Do this before scaling, not after.

Cross-references

What we did and didn’t do

What we did:

  • OpenTelemetry initialization with Phoenix as the local backend
  • Span helpers for LLM calls, tool calls, retrieval, and agent runs
  • Wired into stack/llm.py, stack/agent.py, and stack/retrieve.py
  • Trace IDs in HTTP responses for end-to-end debugging
  • Head-based sampling for production cost management

What we didn’t:

  • Metrics (the second OTel signal). Histograms of latency, counters for errors, gauges for queue depth. Tracing covers the per-request view; metrics cover the rate view. Add a Counter/Histogram instance per dimension once you’re at scale.
  • Logs (the third OTel signal). Structured logs already exist in stack/server.py from step 05. OTel can ingest them and correlate to traces; nice-to-have, not required.
  • Real-user-monitoring (RUM). Browser-side latency for user-facing apps. A different problem; covered by Sentry / LogRocket / Datadog RUM, not by Phoenix.
  • Trace-based testing. Replay a captured trace as a regression test. Possible, niche; defer until you have a specific reason.

Next

Step 13 is evaluation in production — using the traces from this step to feed live eval pipelines. The eval harness from step 04 was for offline testing; we’ll wire the same eval.py to live request/response pairs, add A/B prompt testing, drift detection, and a feedback collector that turns user thumbs-down into eval data. Once that’s running you’ll know about quality regressions before users do.