Thread and Workflow Memory
Up to this point, every request to our system starts from scratch. The orchestrator receives a question, routes it, gets a specialist response, synthesizes an answer, and then forgets everything. Ask a follow-up question, and the system has no idea what you just discussed. Start a multi-step debugging session, and each step loses the context from the previous one.
Memory fixes this, but memory isn't one thing. It's at least three layers, and mixing them up creates problems. This lesson covers the first two layers, which are thread memory (the current conversation) and workflow state (the current multi-step task). These are the simpler, safer layers. We'll build them first, measure the improvement, and save long-term memory (the riskier layer) for the next lesson.
What you'll learn
- Implement thread memory that maintains conversation context within a session
- Build workflow state that persists intermediate results across steps of a multi-step task
- Manage context window pressure as accumulated memory grows
- Apply summarization and truncation strategies to keep memory useful without flooding the context
- Measure whether memory improves system quality on a benchmark subset
Concepts
Thread memory — the record of the current conversation session. Thread memory includes the messages exchanged so far, the questions asked, the answers given, and any clarifications. When a user asks "what about the error handling?" after discussing a function, thread memory is what connects "the error handling" to the function from three turns ago. Thread memory is scoped to a single session and disappears when the session ends.
Workflow state — the structured record of a multi-step task's progress. Where thread memory is conversational (a sequence of messages), workflow state is operational (a data structure tracking what's been done, what's pending, and what intermediate results exist). A debugging workflow might track: symptom identified, relevant files located, root cause hypothesized, fix proposed, tests run. Workflow state makes multi-step tasks resumable and debuggable.
Context window pressure — the tension between adding memory to the context and keeping room for retrieval, tool results, and generation. Every token of memory is a token that can't be used for evidence or reasoning. As conversations grow, naive thread memory (just appending every message) will eventually fill the context window, crowding out useful content and degrading quality. Managing context window pressure is a core context engineering challenge — one we first encountered in Module 1 and will keep encountering as the system grows.
Memory summarization — the practice of compressing older memory into a shorter summary to reduce context window pressure. Instead of keeping all 40 messages from a long conversation, summarize turns 1-30 into a paragraph and keep turns 31-40 verbatim. The summary preserves the key decisions and facts while freeing tokens for the current work.
Memory truncation — dropping the oldest memory entries when the context window fills up. Truncation is simpler than summarization but lossy — early context that might matter (the original question, key constraints) can be lost. A common pattern is to combine both: summarize old turns and truncate the summaries when even those grow too large.
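These two strategies compose. Here is a minimal, runnable sketch of the combined pattern, with the LLM summarization step replaced by a stub that just records what was folded in (a real implementation would call a model; the class name and numbers are illustrative only):

```python
from dataclasses import dataclass, field

@dataclass
class StubThreadMemory:
    """Illustrative stand-in: summarize old turns, keep recent ones verbatim."""
    messages: list = field(default_factory=list)
    summary: str = ""
    max_recent_messages: int = 20  # verbatim window
    summary_threshold: int = 30    # summarize once total exceeds this

    def add_message(self, role: str, content: str) -> None:
        self.messages.append({"role": role, "content": content})
        if len(self.messages) > self.summary_threshold:
            self._summarize_old_messages()

    def _summarize_old_messages(self) -> None:
        # Stub for the LLM call: record how many turns were folded in.
        old = self.messages[:-self.max_recent_messages]
        self.summary = f"[{len(old)} older messages summarized]"
        self.messages = self.messages[-self.max_recent_messages:]

mem = StubThreadMemory()
for i in range(35):
    mem.add_message("user", f"message {i}")

# Summarization fired once, at message 31: 11 old turns were folded into
# the summary, and the verbatim window has grown back to 24 messages since.
assert mem.summary == "[11 older messages summarized]"
assert len(mem.messages) == 24
```

Truncation is the backstop: if the summary itself grows past its budget, drop its oldest portion the same way.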
Problem-to-Tool Map
| Problem class | Symptom | Cheapest thing to try first | Tool or approach |
|---|---|---|---|
| System forgets the current conversation | Follow-up questions lose context | Include recent messages in the prompt | Thread memory with a message window |
| Multi-step tasks lose progress | Each step starts from scratch | Pass state manually | Explicit workflow state in the graph |
| Long conversations degrade quality | Answers get worse as conversations grow | Limit message count | Summarize older turns, keep recent ones verbatim |
| Memory crowds out retrieval | Less room for evidence in long threads | Shorter history window | Adaptive memory budget based on task type |
Walkthrough
Thread memory: maintaining conversation context
The simplest form of thread memory is including recent messages in the prompt. LangGraph provides checkpointing that makes this straightforward — the graph state persists across invocations within a session:
```python
# orchestration/memory.py
"""Thread and workflow memory for the orchestration pipeline.

Thread memory: conversation history within a session.
Workflow state: structured progress tracking for multi-step tasks.
"""
from __future__ import annotations

from dataclasses import dataclass, field
from datetime import datetime, timezone

from openai import OpenAI

client = OpenAI()


@dataclass
class ThreadMemory:
    """Conversation history for a single session.

    Maintains a window of recent messages and a summary of older ones.
    """
    messages: list[dict] = field(default_factory=list)
    summary: str = ""
    session_id: str = ""
    created_at: str = ""
    max_recent_messages: int = 20  # Keep this many recent messages verbatim
    summary_threshold: int = 30    # Summarize when total exceeds this

    def add_message(self, role: str, content: str) -> None:
        """Add a message to the thread."""
        self.messages.append({
            "role": role,
            "content": content,
            "timestamp": datetime.now(timezone.utc).isoformat(),
        })
        if len(self.messages) > self.summary_threshold:
            self._summarize_old_messages()

    def get_context_messages(self) -> list[dict]:
        """Get messages formatted for the LLM context."""
        context = []
        if self.summary:
            context.append({
                "role": "system",
                "content": f"Summary of earlier conversation:\n{self.summary}",
            })
        recent = self.messages[-self.max_recent_messages:]
        for msg in recent:
            context.append({"role": msg["role"], "content": msg["content"]})
        return context

    def _summarize_old_messages(self) -> None:
        """Compress older messages into a summary."""
        if len(self.messages) <= self.max_recent_messages:
            return
        old_messages = self.messages[:-self.max_recent_messages]
        summary_prompt = "Summarize this conversation history into key facts, decisions, and context. Be concise but preserve important details:\n\n"
        for msg in old_messages:
            summary_prompt += f"{msg['role']}: {msg['content'][:200]}\n"
        if self.summary:
            summary_prompt = f"Previous summary:\n{self.summary}\n\nNew messages to incorporate:\n" + summary_prompt
        response = client.chat.completions.create(
            model="gpt-4.1-nano",
            messages=[{"role": "user", "content": summary_prompt}],
            max_tokens=300,
        )
        self.summary = response.choices[0].message.content
        self.messages = self.messages[-self.max_recent_messages:]

    def token_estimate(self) -> int:
        """Rough estimate of tokens consumed by this memory."""
        total_chars = len(self.summary)
        for msg in self.messages[-self.max_recent_messages:]:
            total_chars += len(msg["content"])
        return total_chars // 4
```

To integrate thread memory with the orchestration graph, pass the memory through the graph state:
```python
# orchestration/graph.py (extend OrchestratorState)
@dataclass
class OrchestratorState:
    """State passed between nodes in the orchestration graph."""
    question: str = ""
    route: str = ""
    specialist_output: str = ""
    evidence: list[str] = field(default_factory=list)
    tools_called: list[str] = field(default_factory=list)
    final_answer: str = ""
    confidence: float = 0.0
    retry_count: int = 0
    # Memory fields
    thread_messages: list[dict] = field(default_factory=list)
    thread_summary: str = ""
    workflow_state: dict = field(default_factory=dict)
```

Then update each specialist to receive conversation context:
```python
# orchestration/specialists.py (update call_specialist)
def call_specialist(
    question: str,
    system_prompt: str,
    tools: list[dict],
    tool_executor: callable,
    thread_context: list[dict] | None = None,
) -> dict:
    """Call a specialist with its scoped prompt, tools, and conversation context."""
    messages = [{"role": "system", "content": system_prompt}]
    # Include thread context if available
    if thread_context:
        messages.extend(thread_context)
    messages.append({"role": "user", "content": question})
    # ... rest of the function unchanged
```

Workflow state: tracking multi-step progress
Workflow state is different from thread memory. Where thread memory is a sequence of messages, workflow state is a structured record of what the system is doing:
```python
# orchestration/memory.py (continued)
@dataclass
class WorkflowState:
    """Structured state for a multi-step workflow.

    Tracks what's been done, what's pending, and intermediate results.
    Each workflow type can have its own state shape.
    """
    workflow_id: str = ""
    workflow_type: str = ""  # "debug", "review", "refactor", etc.
    status: str = "in_progress"  # in_progress, paused, completed, failed
    steps_completed: list[dict] = field(default_factory=list)
    steps_pending: list[str] = field(default_factory=list)
    intermediate_results: dict = field(default_factory=dict)
    created_at: str = ""
    updated_at: str = ""

    def complete_step(self, step_name: str, result: dict) -> None:
        """Mark a step as complete and store its result."""
        self.steps_completed.append({
            "step": step_name,
            "result": result,
            "completed_at": datetime.now(timezone.utc).isoformat(),
        })
        if step_name in self.steps_pending:
            self.steps_pending.remove(step_name)
        self.updated_at = datetime.now(timezone.utc).isoformat()

    def add_intermediate_result(self, key: str, value) -> None:
        """Store an intermediate result for use by later steps."""
        self.intermediate_results[key] = value
        self.updated_at = datetime.now(timezone.utc).isoformat()

    def get_progress_summary(self) -> str:
        """Generate a summary of workflow progress for the LLM context."""
        completed = [s["step"] for s in self.steps_completed]
        summary = f"Workflow: {self.workflow_type} ({self.status})\n"
        summary += f"Completed: {', '.join(completed) if completed else 'none'}\n"
        summary += f"Pending: {', '.join(self.steps_pending) if self.steps_pending else 'none'}\n"
        if self.intermediate_results:
            summary += "Key findings:\n"
            for key, value in self.intermediate_results.items():
                summary += f"  - {key}: {str(value)[:100]}\n"
        return summary


def create_debug_workflow(symptom: str) -> WorkflowState:
    """Create a workflow for a debugging session."""
    return WorkflowState(
        workflow_type="debug",
        steps_pending=[
            "identify_symptom",
            "locate_relevant_code",
            "hypothesize_root_cause",
            "verify_hypothesis",
            "propose_fix",
        ],
        intermediate_results={"initial_symptom": symptom},
        created_at=datetime.now(timezone.utc).isoformat(),
        updated_at=datetime.now(timezone.utc).isoformat(),
    )
```

Workflow state shines in multi-step tasks like debugging. Without it, each step in a debugging session starts from scratch, and the system has to re-discover the error, re-locate the relevant code, and re-derive the context. With workflow state, step three (hypothesize root cause) knows what step two found (the relevant code) and what step one identified (the symptom).
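To make that hand-off between steps concrete, here's the same progression with a trimmed, dict-based stand-in for the workflow state (the step names match the debug workflow above, but the helper functions and sample findings are illustrative; the real class also tracks timestamps and status):

```python
def create_debug_workflow(symptom: str) -> dict:
    # Dict-based stand-in for WorkflowState, for illustration only.
    return {
        "steps_pending": ["identify_symptom", "locate_relevant_code",
                          "hypothesize_root_cause", "verify_hypothesis",
                          "propose_fix"],
        "steps_completed": [],
        "intermediate_results": {"initial_symptom": symptom},
    }

def complete_step(wf: dict, step: str, result: dict) -> None:
    """Record a finished step and make its findings visible to later steps."""
    wf["steps_completed"].append({"step": step, "result": result})
    wf["steps_pending"].remove(step)
    wf["intermediate_results"].update(result)

wf = create_debug_workflow("tests fail with PermissionError on temp paths")
complete_step(wf, "identify_symptom", {"symptom": "PermissionError in path tests"})
complete_step(wf, "locate_relevant_code", {"files": ["fs/validate_path.py"]})

# Step three (hypothesize_root_cause) now sees both earlier findings.
assert wf["steps_pending"][0] == "hypothesize_root_cause"
assert "files" in wf["intermediate_results"]
```

Feeding `intermediate_results` into the next step's context is what makes the workflow resumable: any step can be re-run with everything its predecessors learned.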
Managing context window pressure
As thread memory grows, it competes with retrieval evidence and specialist reasoning for context window space. Here's a practical approach to managing the budget:
```python
# orchestration/context_budget.py
"""Context window budget management.

Allocates context window space across memory, retrieval, and generation
to prevent memory from crowding out useful content.
"""

# Target allocations as a fraction of the available context
BUDGET = {
    "system_prompt": 0.10,       # 10% for system instructions
    "thread_memory": 0.20,       # 20% for conversation history
    "workflow_state": 0.05,      # 5% for workflow progress
    "retrieval_evidence": 0.40,  # 40% for retrieved evidence
    "generation": 0.25,          # 25% reserved for model output
}


def allocate_context_budget(
    total_tokens: int = 128_000,
    task_type: str = "general",
) -> dict:
    """Calculate token budgets for each context section.

    Adjusts based on task type:
    - Conversation-heavy tasks get more thread memory budget
    - Research-heavy tasks get more retrieval budget
    """
    budget = BUDGET.copy()
    if task_type == "follow_up":
        # Follow-ups need more conversation context
        budget["thread_memory"] = 0.30
        budget["retrieval_evidence"] = 0.30
    elif task_type == "research":
        # Research needs more retrieval space
        budget["thread_memory"] = 0.10
        budget["retrieval_evidence"] = 0.50
    return {
        section: int(total_tokens * fraction)
        for section, fraction in budget.items()
    }


def trim_thread_to_budget(
    thread: "ThreadMemory",
    token_budget: int,
) -> list[dict]:
    """Trim thread memory to fit within a token budget.

    Strategy: keep the summary and as many recent messages as fit.
    """
    context = thread.get_context_messages()
    # Estimate tokens
    total_tokens = 0
    trimmed = []
    # Always include the summary if present
    if context and context[0]["role"] == "system" and "Summary" in context[0]["content"]:
        summary_tokens = len(context[0]["content"]) // 4
        if summary_tokens < token_budget:
            trimmed.append(context[0])
            total_tokens += summary_tokens
        context = context[1:]
    # Add recent messages from most recent backwards until the budget is full.
    # We reverse twice so the final message list stays chronological.
    selected_recent = []
    for msg in reversed(context):
        msg_tokens = len(msg["content"]) // 4
        if total_tokens + msg_tokens > token_budget:
            break
        selected_recent.append(msg)
        total_tokens += msg_tokens
    trimmed.extend(reversed(selected_recent))
    return trimmed
```

The key insight here: memory isn't free. Every token of conversation history is a token that can't be used for evidence or reasoning. Setting explicit budgets and trimming to fit them prevents the slow quality degradation that happens when conversations grow long and memory crowds out useful context.
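To sanity-check the allocation arithmetic, here's the budget logic restated inline so it runs standalone (same numbers as the module above). With a 128k window, switching to a follow-up task moves 12,800 tokens from retrieval to thread memory:

```python
# Restated inline from orchestration/context_budget.py for a standalone check.
BUDGET = {"system_prompt": 0.10, "thread_memory": 0.20, "workflow_state": 0.05,
          "retrieval_evidence": 0.40, "generation": 0.25}

def allocate_context_budget(total_tokens: int = 128_000,
                            task_type: str = "general") -> dict:
    budget = BUDGET.copy()
    if task_type == "follow_up":
        budget["thread_memory"], budget["retrieval_evidence"] = 0.30, 0.30
    elif task_type == "research":
        budget["thread_memory"], budget["retrieval_evidence"] = 0.10, 0.50
    return {k: int(total_tokens * v) for k, v in budget.items()}

general = allocate_context_budget()
follow_up = allocate_context_budget(task_type="follow_up")

assert general["thread_memory"] == 25_600    # 20% of 128k
assert follow_up["thread_memory"] == 38_400  # 30% of 128k
assert sum(general.values()) == 128_000      # fractions sum to 1.0
```

Because every fraction shift is zero-sum, raising the memory budget always pays for itself out of another section; the benchmark in the measurement section below is how you find out whether that trade is worth it.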
Integrating memory with the orchestration graph
Here's how thread memory and workflow state flow through the orchestration pipeline:
```python
# orchestration/graph.py (updated pipeline with memory)
from orchestration.memory import ThreadMemory, WorkflowState
from orchestration.context_budget import allocate_context_budget, trim_thread_to_budget

# Session store — in production, use a database
_sessions: dict[str, ThreadMemory] = {}
_workflows: dict[str, WorkflowState] = {}


def get_or_create_session(session_id: str) -> ThreadMemory:
    """Get an existing session or create a new one."""
    if session_id not in _sessions:
        _sessions[session_id] = ThreadMemory(session_id=session_id)
    return _sessions[session_id]


def run_with_memory(
    question: str,
    session_id: str,
    workflow_id: str | None = None,
) -> dict:
    """Run the orchestration pipeline with memory context."""
    # Get session memory
    thread = get_or_create_session(session_id)
    # Get workflow state if a workflow is active
    workflow = _workflows.get(workflow_id) if workflow_id else None
    # Budget allocation
    budget = allocate_context_budget(
        task_type="follow_up" if len(thread.messages) > 0 else "general",
    )
    # Trim memory to budget
    thread_context = trim_thread_to_budget(thread, budget["thread_memory"])
    # Build state for the graph.
    # thread_messages will be passed into each specialist's message list
    # by call_specialist() — see the specialist-design lesson for that wiring.
    state = {
        "question": question,
        "thread_messages": thread_context,
        "thread_summary": thread.summary,
    }
    if workflow:
        # Workflow progress is included in the question context so the
        # specialist knows what steps have already been completed.
        # In a production system, you'd inject this into the system prompt
        # or as a separate message rather than concatenating with the question.
        workflow_context = workflow.get_progress_summary()
        state["question"] = f"{workflow_context}\n\nCurrent question: {question}"
        state["workflow_state"] = {
            "progress": workflow_context,
            "intermediate_results": workflow.intermediate_results,
        }
    # Run the orchestration graph
    result = graph.invoke(state)
    # Update memory with the new exchange
    thread.add_message("user", question)
    thread.add_message("assistant", result.get("final_answer", ""))
    return result
```

Measuring memory's impact
Thread memory and workflow state should improve quality on conversational and multi-step benchmark subsets. Here's how to measure:
```bash
# Create a conversational benchmark subset
# These are multi-turn question sequences where context matters
python harness/run_harness.py \
  --pipeline orchestrated-with-memory \
  --benchmark benchmark-conversational.jsonl

python harness/graders/answer_grader.py harness/runs/latest.jsonl

# Compare against the no-memory orchestrated system
python harness/compare_runs.py \
  harness/runs/orchestrated-no-memory.jsonl \
  harness/runs/orchestrated-with-memory.jsonl
```

The conversational benchmark should include question sequences like:
```jsonl
{"id": "conv-001a", "question": "What does the validate_path function do?", "session": "conv-001", "turn": 1, "gold_answer": "..."}
{"id": "conv-001b", "question": "What about its error handling?", "session": "conv-001", "turn": 2, "gold_answer": "..."}
{"id": "conv-001c", "question": "Could that cause issues with symlinks?", "session": "conv-001", "turn": 3, "gold_answer": "..."}
```

Without thread memory, turn 2 has no idea what "its" refers to. With thread memory, the system connects "its error handling" to `validate_path` from turn 1. The quality difference on these sequences is where memory proves its value.
Exercises
- Implement the `ThreadMemory` class and integrate it with your orchestration pipeline. Run a 5-turn conversation and verify that follow-up questions resolve correctly (e.g., "What does that function do?" after discussing a file).
- Implement the summarization logic. Start a conversation, add 35+ messages, and verify that older messages get summarized while recent ones remain verbatim. Check that the summary preserves key facts.
- Create a debug workflow using `WorkflowState`. Walk through a 4-step debugging session and verify that each step can see the results of the previous steps.
- Implement the context budget system. Run the same 10-question benchmark with budget limits of 20%, 30%, and 40% for thread memory. Does more memory budget always help, or is there a point where it hurts retrieval quality?
- Create a conversational benchmark subset (at least 5 multi-turn sequences). Compare accuracy with and without thread memory. What's the improvement on follow-up questions?
Completion checkpoint
You have:
- Thread memory that maintains conversation context across turns within a session
- Summarization that compresses older messages to manage context window pressure
- Workflow state that tracks multi-step task progress and intermediate results
- A context budget system that allocates tokens across memory, retrieval, and generation
- Measured improvement on a conversational benchmark subset showing that memory helps follow-up questions
Reflection prompts
- At what conversation length does thread memory start degrading quality instead of helping? How would you detect this automatically?
- Workflow state makes multi-step tasks explicit. What workflows in your code assistant would benefit most from structured state tracking?
- The context budget allocations in this lesson are starting points. Based on your benchmark results, what adjustments would improve quality?
What's next
Long-Term Memory and Write Policies. Session memory solves continuity, but some facts need to survive beyond the current run; the next lesson adds persistence without letting memory rot or accumulate garbage.
References
Start here
- LangGraph: Memory and persistence — LangGraph's built-in checkpointing and state persistence, which handles thread memory at the framework level
Build with this
- LangGraph: How to add memory — practical patterns for adding conversation memory to LangGraph agents
- Anthropic: Context window management — strategies for managing context pressure as memory grows
Deep dive
- Letta: Memory management for agents — an agent framework built around memory as a first-class concept, useful for understanding memory architecture patterns
- Anthropic: Building effective agents — the orchestration and state management patterns that inform memory design