Telemetry: Making Your System Visible
Your RAG pipeline should now be working. Questions come in, the router picks a retrieval strategy, evidence gets compiled, and a grounded answer comes back with citations. But right now, you can only see the inputs and outputs. What happened in between (which route was chosen, how long retrieval took, how many tokens the evidence consumed, whether the model used the evidence or ignored it) is invisible unless you go digging through print statements.
In this lesson we'll add structured telemetry to the pipeline you built in Module 5. By the end, you'll be able to trace any request from start to finish and see exactly what the system did, how long each step took, and what it cost. That visibility will become the foundation for every evaluation and optimization we build in the rest of this module.
What you'll learn
- Distinguish between three layers of AI telemetry: agent traces, application observability, and product outcome events
- Instrument five key events in the RAG pipeline: run start, route chosen, tool call, retrieval return, and response completion
- Set up Langfuse as a trace backend and understand OpenTelemetry as the portable concept underneath
- Trace a full benchmark run end to end and read the resulting trace
- Identify where time and tokens are spent within a single request
Concepts
Agent trace: a structured record of everything that happened during one request to your AI system. A trace captures the sequence of steps (route decision, retrieval, generation), their timing, their inputs and outputs, and any metadata you attach. Traces are hierarchical: a top-level trace contains spans, and spans can contain child spans. Think of a trace as a timeline for a single request, with enough detail to reconstruct what happened and why.
Span: one step within a trace. In our pipeline, each span represents a discrete operation: classifying the question, retrieving evidence, calling the model, or checking grounding. Spans have a start time, an end time, inputs, outputs, and metadata. The nesting of spans within a trace shows the causal structure of the request: which steps triggered which other steps.
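To make the trace/span hierarchy concrete before we touch any SDK, here is a minimal, vendor-neutral sketch. The `Span` dataclass and its field names are illustrative, not any particular SDK's API, and the timings are made up:

```python
from dataclasses import dataclass, field

@dataclass
class Span:
    """One step in a trace: timing, payloads, and nested child spans."""
    name: str
    start_ms: float
    end_ms: float
    output: dict = field(default_factory=dict)
    children: list["Span"] = field(default_factory=list)

    def duration_ms(self) -> float:
        return self.end_ms - self.start_ms

# A trace is just the root span; nesting encodes the causal structure.
trace = Span(
    name="rag-pipeline", start_ms=0.0, end_ms=1560.0,
    children=[
        Span("routing", 0.0, 12.0, output={"mode": "hybrid"}),
        Span("retrieval", 12.0, 352.0, output={"snippet_count": 8}),
        Span("generation", 352.0, 1552.0, output={"abstained": False}),
    ],
)
print(trace.children[1].duration_ms())  # → 340.0
```

Everything Langfuse (or any other backend) adds on top of this skeleton is storage, a UI, and conventions for what goes in `output`.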
The three layers of telemetry: AI systems need telemetry at three distinct levels, and confusing them leads to blind spots:
- Agent traces: what the AI system did for a single request. Route chosen, tools called, evidence retrieved, model response. This is what we'll instrument in this lesson.
- Application observability: how the service is performing across many requests. Latency percentiles, error rates, throughput, resource utilization. This is traditional software observability applied to AI services.
- Product outcome events: whether the system is actually helping users. Task completion, answer acceptance, escalation to a human, user feedback. These events connect technical performance to business value.
Many teams start with layer 1 (traces), bolt on layer 2 (application metrics), and only add layer 3 (outcomes) when they realize they can't tell whether "fast and cheap" actually means "useful." We'll build all three across this lesson and the next.
Generation: in Langfuse's model, a generation is a special type of span that represents an LLM call. It captures the model name, prompt, completion, token counts, and cost. Separating generations from other spans makes it easy to aggregate model usage across traces.
Problem-to-Tool Map
| Problem class | Symptom | Cheapest thing to try first | Tool or approach |
|---|---|---|---|
| Opaque agent behavior | You can't explain why the system chose a route or tool | Print statements | Structured traces with spans |
| No cost visibility | Quality improves but you don't know what it costs | Manual token estimates | Per-generation usage telemetry |
| Can't reproduce failures | A request failed but you can't recreate the conditions | Re-run and hope | Trace replay with captured inputs |
| Cross-step debugging | The answer is wrong but you don't know which step failed | Read the full output | Span-level inspection of route, retrieval, generation |
| No product signal | System is fast but you don't know if answers are useful | Ask users informally | Product outcome events attached to traces |
Default: Langfuse
Why this is our default: Langfuse is open-source, has a generous free tier for development, provides a trace UI out of the box, and has a Python SDK that integrates cleanly with our existing code. It gives you trace visualization, cost tracking, and eval hooks without requiring infrastructure setup.
Portable concept underneath: Every trace should be reconstructable as a sequence of: request received, route chosen, tool calls made, retrieval results returned, model response generated, outcome recorded. This structure is the same whether you use Langfuse, LangSmith, Arize Phoenix, or raw OpenTelemetry. The concepts (traces, spans, generations, metadata) are portable. The SDK calls are not.
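One way to keep your instrumentation honest about portability is to think of a trace as this flat event sequence first. The field names and values below are illustrative, not any backend's schema:

```python
# The portable skeleton of one request, independent of any SDK.
events = [
    {"event": "run_start", "question": "What does validate_path return?"},
    {"event": "route_chosen", "mode": "hybrid", "confidence": 0.91},
    {"event": "tool_call", "tool": "hybrid_retrieve"},
    {"event": "retrieval_return", "snippet_count": 8, "total_tokens": 2400},
    {"event": "response_completion", "abstained": False, "citations": 3},
]

# Any backend (Langfuse, LangSmith, Phoenix, raw OTel) is, at bottom,
# a different serialization of this same sequence.
assert [e["event"] for e in events] == [
    "run_start", "route_chosen", "tool_call",
    "retrieval_return", "response_completion",
]
```

If you can reconstruct this list from whatever your backend stores, you can migrate; if you can't, some of your telemetry is trapped in vendor-specific fields.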
Closest alternatives and when to switch:
- LangSmith: use if LangChain or LangGraph is already your orchestration layer and you want tighter first-party integration
- Arize Phoenix: use if your organization already standardizes on it, or if you want a self-hosted open-source option with strong eval integration
- Pure OpenTelemetry + Jaeger/Grafana: use when you want maximal control over your telemetry pipeline and can tolerate more infrastructure setup. This is the most portable option but requires the most work. I would personally choose this in production, but it's a bit distracting for this course.
Walkthrough
Setting up Langfuse
First, install the SDK and set up credentials:
pip install langfuse

You'll need a Langfuse account for this. In development, the cloud tier at langfuse.com should be sufficient. After creating a project, you'll get a public key and a secret key:
export LANGFUSE_PUBLIC_KEY="pk-lf-..."
export LANGFUSE_SECRET_KEY="sk-lf-..."
export LANGFUSE_HOST="https://cloud.langfuse.com"  # or your self-hosted URL

Langfuse can run locally via Docker if you prefer not to send traces to the cloud. See the Langfuse self-hosting docs for setup. The code in this lesson works the same either way.
The five events we'll instrument
We'll add telemetry to five key points in our RAG pipeline. Each one then becomes a span in the trace:
- Run start: a request arrives, we create a trace and record the question and metadata
- Route chosen: the retrieval router classifies the question and picks a mode
- Tool call: any tool invocation (retrieval, graph traversal, etc.)
- Retrieval return: the evidence bundle comes back with chunk count, token count, and scores
- Response completion: the model generates an answer, and we record tokens, latency, and the response
The code below is the instrumented pipeline, which builds directly on top of rag/rag_with_routing.py from Module 5's Retrieval Modes and Routing lesson:
# observability/traced_pipeline.py
"""RAG pipeline with Langfuse tracing."""
import os
import sys
import time
from datetime import datetime, timezone

sys.path.insert(0, ".")

from langfuse import Langfuse
from rag.retrieval_service import (
    retrieve, RetrievalPolicy, RetrievalMode, RetrievalResult,
)
from rag.grounded_answer import (
    GroundedAnswer, check_grounding, generate_answer,
    GROUNDING_SYSTEM_PROMPT,
)
from retrieval.context_compiler import ContextPack, EvidenceChunk
from openai import OpenAI

langfuse = Langfuse()
client = OpenAI()
MODEL = "gpt-4o-mini"
REPO_SHA = os.popen("git rev-parse --short HEAD").read().strip()


def traced_rag_pipeline(
    question: str,
    hybrid_retrieve_fn,
    graph_traverse_fn=None,
    policy: RetrievalPolicy | None = None,
    mode: RetrievalMode | None = None,
    model: str = MODEL,
    run_id: str | None = None,
    user_id: str | None = None,
) -> GroundedAnswer:
    """RAG pipeline with full Langfuse tracing."""
    # --- 1. Run start: create the trace ---
    trace = langfuse.trace(
        name="rag-pipeline",
        input={"question": question, "mode": mode.value if mode else "auto"},
        metadata={
            "run_id": run_id or "interactive",
            "repo_sha": REPO_SHA,
            "model": model,
        },
        user_id=user_id,
    )
    pipeline_start = time.perf_counter()

    # --- 2. Route chosen ---
    routing_span = trace.span(name="routing", input={"question": question})
    routing_start = time.perf_counter()
    result: RetrievalResult = retrieve(
        question, hybrid_retrieve_fn,
        graph_traverse_fn=graph_traverse_fn, policy=policy, mode=mode,
    )
    routing_ms = (time.perf_counter() - routing_start) * 1000
    routing_span.end(
        output={
            "mode": result.mode_used.value,
            "confidence": result.classification.confidence,
            "reasoning": result.classification.reasoning,
            "skipped": result.skipped,
        },
        metadata={"duration_ms": round(routing_ms, 1)},
    )

    # --- Handle skip (no retrieval needed) ---
    if result.skipped:
        gen_span = trace.generation(
            name="generate-no-retrieval", model=model,
            input=[
                {"role": "system", "content": "Answer from general knowledge."},
                {"role": "user", "content": question},
            ],
        )
        gen_start = time.perf_counter()
        response = client.chat.completions.create(
            model=model,
            messages=[
                {"role": "system", "content": (
                    "You are a helpful assistant. Answer the question from "
                    "your general knowledge. Be concise and accurate."
                )},
                {"role": "user", "content": question},
            ],
            temperature=0,
        )
        answer_text = response.choices[0].message.content
        usage = response.usage
        gen_ms = (time.perf_counter() - gen_start) * 1000
        gen_span.end(
            output=answer_text,
            usage={
                "input": usage.prompt_tokens,
                "output": usage.completion_tokens,
                "total": usage.total_tokens,
            },
            metadata={"duration_ms": round(gen_ms, 1)},
        )
        answer = GroundedAnswer(
            question=question, answer=answer_text,
            abstained=False, model=model,
        )
        total_ms = (time.perf_counter() - pipeline_start) * 1000
        trace.update(output={
            "answer": answer_text[:200],
            "skipped_retrieval": True,
            "total_ms": round(total_ms, 1),
        })
        langfuse.flush()
        return answer

    # --- 3. Tool call / 4. Retrieval return ---
    # retrieve() above performed both classification and retrieval, so this
    # span records what came back rather than re-timing the fetch.
    retrieval_span = trace.span(
        name="retrieval",
        input={"question": question, "mode": result.mode_used.value},
    )
    snippet_count = len(result.bundle.snippets) if result.bundle else 0
    token_count = result.bundle.total_tokens if result.bundle else 0
    retrieval_span.end(output={
        "snippet_count": snippet_count, "total_tokens": token_count,
        "files": [s.file_path for s in (result.bundle.snippets if result.bundle else [])][:10],
    })

    # --- Grounding check ---
    grounding_span = trace.span(
        name="grounding-check",
        input={"snippet_count": snippet_count, "total_tokens": token_count},
    )
    pack = ContextPack(
        question=question,
        chunks=[
            EvidenceChunk(
                chunk_id=s.chunk_id, file_path=s.file_path,
                symbol_name=s.symbol_name or "", text=s.text,
                start_line=s.start_line, end_line=s.end_line,
                retrieval_method=s.retrieval_method,
                retrieval_score=s.relevance_score,
            )
            for s in result.bundle.snippets
        ],
        total_tokens=result.bundle.total_tokens,
        token_budget=result.bundle.token_budget,
    )
    sufficient, reason = check_grounding(pack)
    grounding_span.end(output={"sufficient": sufficient, "reason": reason})

    if not sufficient:
        answer = GroundedAnswer(
            question=question,
            answer=f"I don't have enough evidence to answer this question reliably. {reason}",
            abstained=True, abstention_reason=reason,
            model=model, context_tokens=pack.total_tokens,
        )
        total_ms = (time.perf_counter() - pipeline_start) * 1000
        trace.update(output={"answer": answer.answer[:200], "abstained": True, "total_ms": round(total_ms, 1)})
        langfuse.flush()
        return answer

    # --- 5. Response completion ---
    gen_span = trace.generation(
        name="generate-grounded-answer", model=model,
        input=[
            {"role": "system", "content": GROUNDING_SYSTEM_PROMPT[:200] + "..."},
            {"role": "user", "content": f"[evidence: {snippet_count} snippets] {question}"},
        ],
    )
    gen_start = time.perf_counter()
    answer = generate_answer(question, pack, model=model)
    gen_ms = (time.perf_counter() - gen_start) * 1000
    gen_span.end(
        output=answer.answer[:500],
        usage={
            "input": answer.context_tokens or 0,
            # Approximate: Langfuse captures exact counts when using
            # its OpenAI wrapper; this is a fallback.
            "output": 0,
            "total": answer.context_tokens or 0,
        },
        metadata={
            "duration_ms": round(gen_ms, 1),
            "citation_count": len(answer.citations) if answer.citations else 0,
        },
    )
    total_ms = (time.perf_counter() - pipeline_start) * 1000
    trace.update(output={
        "answer": answer.answer[:200], "abstained": answer.abstained,
        "citations": len(answer.citations) if answer.citations else 0,
        "total_ms": round(total_ms, 1), "context_tokens": answer.context_tokens,
    })
    langfuse.flush()
    return answer


if __name__ == "__main__":
    from retrieval.hybrid_retrieve import hybrid_retrieve

    test_questions = [
        "What does validate_path return?",
        "What is the architecture of the retrieval system?",
        "What is a Python decorator?",
    ]
    print("Traced RAG pipeline demo\n")
    for q in test_questions:
        print(f"Q: {q}")
        ans = traced_rag_pipeline(q, hybrid_retrieve, run_id="telemetry-demo")
        if ans.abstained:
            print(f" ABSTAINED: {ans.abstention_reason}")
        else:
            print(f" Answer: {ans.answer[:120]}...")
        print()
    print("Traces flushed to Langfuse. Open the Langfuse UI to inspect them.")

Run it:

mkdir -p observability
python observability/traced_pipeline.py

Expected output:
Traced RAG pipeline demo
Q: What does validate_path return?
Answer: The `validate_path` function returns a `Path` object if the path is valid and within...
Q: What is the architecture of the retrieval system?
Answer: The retrieval system uses a hybrid architecture combining vector search and lexical...
Q: What is a Python decorator?
Answer: A Python decorator is a function that takes another function as an argument and extends...
Traces flushed to Langfuse. Open the Langfuse UI to inspect them.
After running this, open the Langfuse UI. You'll see three traces, each with nested spans showing the routing decision, retrieval results, grounding check, and generation. The "What is a Python decorator?" trace will show the skip path: routing decided no retrieval was needed, so there's a single generation span with no retrieval span.
Reading a trace
When you open a trace in the Langfuse UI, you'll see a waterfall view showing:
- Trace metadata: the question, run ID, repo SHA, and total duration
- Span timeline: each span as a horizontal bar, showing when it started, how long it ran, and what it produced
- Generation details: for LLM calls, the model, input/output tokens, and the full prompt and completion
- Nesting: child spans appear indented under their parent, showing the causal structure
The most useful thing to look for in early traces is where time is spent. In a typical RAG request, retrieval and generation dominate. If routing takes more than a few milliseconds, something is wrong. If retrieval takes longer than generation, your index may need optimization.
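Once you export span timings (from the Langfuse API or any other backend), the same check can be done programmatically. A minimal sketch; the helper name and the example durations are illustrative:

```python
def bottleneck(span_durations_ms: dict[str, float]) -> tuple[str, float]:
    """Return the slowest span and its share of total request time."""
    total = sum(span_durations_ms.values())
    name = max(span_durations_ms, key=span_durations_ms.get)
    return name, span_durations_ms[name] / total

# Durations for one hypothetical request, in milliseconds.
spans = {"routing": 8.0, "retrieval": 340.0,
         "grounding-check": 2.0, "generation": 1200.0}
name, share = bottleneck(spans)
print(f"{name}: {share:.0%} of request time")  # → generation: 77% of request time
```

Running this over many traces, rather than eyeballing one waterfall, is how you tell a one-off slow request from a systematic bottleneck.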
Understanding OpenTelemetry as the portable concept
Langfuse uses its own SDK, but the concepts map directly to OpenTelemetry (OTel):
| Langfuse concept | OTel equivalent | What it represents |
|---|---|---|
| Trace | Trace | One end-to-end request |
| Span | Span | One step in the request |
| Generation | Span with gen_ai.* attributes | An LLM call specifically |
| Metadata | Span attributes | Key-value data attached to a span |
| Score | — (custom) | Eval result attached to a trace |
If you later migrate to a pure OTel setup, you'll carry the same mental model. Traces contain spans, spans have timing and metadata, and generations are spans with model-specific attributes. The OpenTelemetry Semantic Conventions for GenAI define a standard attribute set for LLM calls that most observability tools understand.
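As a concrete illustration of the mapping, here is a sketch of converting a Langfuse-style generation record into OTel span attributes. The attribute names follow the GenAI semantic conventions as published at the time of writing; they have changed between versions, so verify against the current spec before relying on them:

```python
def generation_to_otel_attrs(model: str, input_tokens: int,
                             output_tokens: int) -> dict[str, object]:
    """Map a generation record onto gen_ai.* span attributes."""
    return {
        "gen_ai.system": "openai",
        "gen_ai.request.model": model,
        "gen_ai.usage.input_tokens": input_tokens,
        "gen_ai.usage.output_tokens": output_tokens,
    }

attrs = generation_to_otel_attrs("gpt-4o-mini", 2400, 180)
print(attrs["gen_ai.request.model"])  # → gpt-4o-mini
```

The point is not the four lines of dict-building but that nothing in the record is Langfuse-specific: model name, token counts, and provider all have standard homes in OTel.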
Tracing a benchmark run
Now we'll connect telemetry to the benchmark harness from Module 2. This lets you trace an entire benchmark pass and inspect any individual question's trace:
# observability/traced_benchmark.py
"""Run the benchmark with tracing enabled."""
import json
import os
import sys
from datetime import datetime, timezone

sys.path.insert(0, ".")

from observability.traced_pipeline import traced_rag_pipeline, langfuse
from retrieval.hybrid_retrieve import hybrid_retrieve

RUN_ID = "traced-" + datetime.now(timezone.utc).strftime("%Y-%m-%d-%H%M%S")
MODEL = "gpt-4o-mini"
PROVIDER = "openai"
BENCHMARK_FILE = "benchmark-questions.jsonl"
OUTPUT_FILE = f"harness/runs/{RUN_ID}.jsonl"
REPO_SHA = os.popen("git rev-parse --short HEAD").read().strip()

questions = []
with open(BENCHMARK_FILE) as f:
    for line in f:
        if line.strip():
            questions.append(json.loads(line))

print(f"Running {len(questions)} benchmark questions with tracing")
print(f"Run ID: {RUN_ID}\n")
os.makedirs("harness/runs", exist_ok=True)

results = []
for i, q in enumerate(questions):
    print(f"[{i+1}/{len(questions)}] {q['category']}: {q['question'][:60]}...")
    answer = traced_rag_pipeline(
        question=q["question"], hybrid_retrieve_fn=hybrid_retrieve,
        model=MODEL, run_id=RUN_ID,
    )
    entry = {
        "run_id": RUN_ID, "question_id": q["id"],
        "question": q["question"], "category": q["category"],
        "answer": answer.answer, "model": MODEL, "provider": PROVIDER,
        "evidence_files": list(set(
            c.get("file_path", "") for c in answer.citations
        )) if answer.citations else [],
        "tools_called": [],
        "retrieval_method": "routed",
        "grade": None, "failure_label": None, "grading_notes": "",
        "repo_sha": REPO_SHA,
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "harness_version": "v0.2",
    }
    results.append(entry)

with open(OUTPUT_FILE, "w") as f:
    for entry in results:
        f.write(json.dumps(entry) + "\n")

langfuse.flush()
print(f"\nDone. {len(results)} results saved to {OUTPUT_FILE}")
print(f"Traces available in Langfuse. Filter by run_id: {RUN_ID}")

python observability/traced_benchmark.py

After this runs, you'll have two things: a JSONL run log (same format as Module 2) and a full set of traces in Langfuse. You can filter traces by the run ID and inspect any individual question to see exactly what the pipeline did.
What traces reveal that logs don't
Print-based logging tells you what happened. Traces tell you how it happened:
- Timing relationships: you can see that retrieval took 340ms but generation took 1200ms, so generation is the bottleneck
- Causal structure: the routing span shows why hybrid mode was chosen, and the retrieval span shows what evidence came back
- Token accounting: the generation span shows exactly how many input tokens were evidence vs. system prompt
- Failure isolation: when an answer is wrong, you can trace backward from the generation to see if the evidence was bad (retrieval problem) or the evidence was good but the model ignored it (generation problem)
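That backward walk can even be partially automated once each span's verdict is extracted from the trace. A rough sketch; the function name and the three boolean inputs are illustrative stand-ins for judgments you'd make (or grade) per span, not a real grader:

```python
def locate_failure(route_ok: bool, evidence_relevant: bool,
                   answer_correct: bool) -> str:
    """Walk backward through a failed request's spans to find the origin."""
    if answer_correct:
        return "no failure"
    if not route_ok:
        return "routing"      # wrong retrieval mode chosen
    if not evidence_relevant:
        return "retrieval"    # the evidence that came back was bad
    return "generation"       # evidence was good; the model ignored it

print(locate_failure(route_ok=True, evidence_relevant=True,
                     answer_correct=False))  # → generation
```

Even this crude triage is only possible because the trace preserves per-step outputs; a print log that interleaves all three steps gives you nothing to walk backward through.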
We'll build on this visibility in the next lesson, where we'll add cost tracking, latency budgets, and product outcome events to the traces.
Exercises
- Set up Langfuse (cloud or self-hosted) and run observability/traced_pipeline.py with the three test questions. Open the Langfuse UI and inspect each trace. Identify which spans took the longest.
- Run observability/traced_benchmark.py against your full benchmark set. Filter traces by run ID in Langfuse and find the trace with the highest total duration. What made it slow?
- Find a trace where the answer was wrong (you can cross-reference with your graded run logs from Module 2). Walk through the trace spans and identify where the failure originated: routing, retrieval, or generation.
- Add a custom span to the pipeline for a step that isn't currently instrumented. For example, add a span around the context_pack_to_bundle call to see how long evidence bundle assembly takes.
- Write a short note (3-4 sentences) explaining what one trace taught you that print-statement logs alone wouldn't have shown.
Completion checkpoint
You should now have:
- Langfuse set up and receiving traces from your pipeline
- An instrumented RAG pipeline (observability/traced_pipeline.py) that produces traces with spans for routing, retrieval, grounding, and generation
- At least one full benchmark run traced end to end, with results saved to both JSONL and Langfuse
- The ability to open any trace in the UI and identify where time and tokens were spent
- A written observation about what traces revealed that logs alone wouldn't have
Reflection prompts
- Which step in the pipeline consistently takes the most time? Is that the step you expected?
- For a question that was answered incorrectly, could you identify the failure point from the trace alone? What additional information would have helped?
- How would you explain the value of structured traces to someone who says "print statements are fine"?
Connecting to the project
We can now see inside the pipeline as it runs. Every request produces a trace showing the route chosen, the evidence retrieved, the tokens consumed, and the answer generated. This visibility is the prerequisite for everything else in this module. You can't optimize what you can't see, and you can't evaluate what you can't measure.
But visibility alone doesn't answer the operational questions: how much does each answer cost? Where are we wasting tokens? How do we set budgets and catch runaway costs before they become a problem? The next lesson adds the cost, caching, and rate-limiting layers that turn raw telemetry into actionable operational metrics.
What's next
Cost, Caching and Rate Limits. Visibility is not enough if you cannot tell what the system costs or where tokens are being wasted; the next lesson makes the pipeline operationally legible.
References
Start here
- Langfuse documentation — setup guides, SDK reference, and trace visualization for the observability layer we're using as our default
Build with this
- Langfuse Python SDK — the SDK methods used in this lesson for creating traces, spans, and generations
- OpenTelemetry Semantic Conventions for GenAI — the portable attribute model for LLM telemetry, useful as a reference for what to capture
Deep dive
- OpenTelemetry documentation — the full OTel specification; most relevant if you want to build a pure OTel pipeline without Langfuse
- Anthropic: Building effective agents — discusses observability as a prerequisite for reliable agent systems