Week 10 - Advanced

MCP, Context Engineering, Multi-Agent Systems

Master the art of context engineering, build MCP servers and clients, understand the A2A protocol, and orchestrate multi-agent systems for complex real-world tasks.

10+ hands-on examples · ~7 hours of content · Last updated: March 2026

1. Context Engineering

What is Context Engineering?

Context engineering is the discipline of designing, assembling, and optimizing what goes into an LLM's context window for each request. It is arguably the most important skill in AI engineering -- a model is only as good as the context it receives.

Think of it this way: if prompt engineering is about writing the right question, context engineering is about providing all the right information alongside that question. It encompasses:

  • What information to include (system prompts, examples, retrieved documents, conversation history, tool results)
  • How to format it (structure, ordering, emphasis)
  • How to fit it (token budgeting, compression, summarization)
  • When to update it (dynamic context assembly based on the current request)

Poor context engineering leads to hallucinations, irrelevant responses, and wasted tokens. Good context engineering leads to accurate, helpful, and cost-efficient AI systems.

Context Engineering Pipeline
graph TD
    A[User Query] --> B[Context Assembler]
    C[System Prompt] --> B
    D[Conversation History] --> B
    E[Retrieved Documents] --> B
    F[Tool Results] --> B
    B --> G[Token Budget Check]
    G -->|Fits| H[Send to LLM]
    G -->|Too Large| I[Compress / Summarize]
    I --> H
    style A fill:#1a1a2e,stroke:#e94560,color:#fff
    style B fill:#1a1a2e,stroke:#f5a623,color:#fff
    style H fill:#1a1a2e,stroke:#00d4aa,color:#fff
    style G fill:#1a1a2e,stroke:#7c4dff,color:#fff

Context Window Sizes (March 2026)

Model | Context Window | Approx Pages | Notes
GPT-4o | 128K tokens | ~300 pages | Good balance of quality and context
GPT-4o-mini | 128K tokens | ~300 pages | Cost-effective for large contexts
Claude 3.5 Sonnet / Claude 4 | 200K tokens | ~500 pages | Excellent long-context performance
Gemini 2.0 Flash/Pro | 1M-2M tokens | ~2,500-5,000 pages | Largest available context windows
o3 | 200K tokens | ~500 pages | Uses many tokens for internal reasoning

Important: A larger context window does not mean you should always fill it. Longer contexts cost more, increase latency, and can actually reduce quality due to the "lost in the middle" phenomenon (models tend to pay less attention to information in the middle of very long contexts).
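To make the cost argument concrete, here is a back-of-the-envelope estimate. The per-token prices below are placeholder assumptions (check your provider's current pricing), but the ratio illustrates why a lean context matters:

```python
# Rough cost of an LLM request. Prices are placeholder assumptions,
# not current pricing for any specific model.
PRICE_PER_1M_INPUT = 2.50    # assumed $ per 1M input tokens
PRICE_PER_1M_OUTPUT = 10.00  # assumed $ per 1M output tokens

def request_cost(input_tokens: int, output_tokens: int) -> float:
    """Estimate the dollar cost of a single request."""
    return (input_tokens / 1_000_000) * PRICE_PER_1M_INPUT + \
           (output_tokens / 1_000_000) * PRICE_PER_1M_OUTPUT

# A lean 8K-token context vs. a maxed-out 128K-token context:
lean = request_cost(8_000, 1_000)
full = request_cost(128_000, 1_000)
print(f"Lean request: ${lean:.4f}")
print(f"Full request: ${full:.4f} ({full / lean:.1f}x more expensive)")
```

At these assumed prices, filling the window makes every single request roughly an order of magnitude more expensive, before accounting for the added latency.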

Strategies for Context Management

1. Prompt Design and System Prompts

The system prompt is the most persistent context -- it is included in every request. Design it carefully:

System Prompt Design Patterns

# Pattern 1: Role + Guidelines + Constraints
system_prompt_v1 = """
You are a senior financial analyst at a Fortune 500 company.

## Your Role
- Analyze financial data and provide actionable insights
- Create clear, executive-friendly summaries
- Flag risks and opportunities

## Guidelines
- Always cite specific numbers from the provided data
- Use bullet points for clarity
- Include confidence levels for projections
- Compare against industry benchmarks when possible

## Constraints
- Never provide specific investment advice
- Always include caveats for projections
- If data is insufficient, say so rather than guessing
"""

# Pattern 2: Structured with XML-like tags (works well with Claude)
system_prompt_v2 = """
<role>
Senior customer support agent for TechCo
</role>

<personality>
- Empathetic and patient
- Professional but warm
- Solution-oriented
</personality>

<available_tools>
- search_kb: Search knowledge base
- lookup_order: Check order status
- process_refund: Handle refund requests
</available_tools>

<rules>
1. Always search the knowledge base before answering policy questions
2. Verify order IDs before looking them up
3. Confirm refund details before processing
4. Escalate if the customer asks for a human agent
5. Never share internal system details with customers
</rules>

<response_style>
- Keep responses under 200 words unless detailed explanation is needed
- Use bullet points for multi-step instructions
- End with "Is there anything else I can help with?"
</response_style>
"""

# Pattern 3: Dynamic system prompt assembled at runtime
def build_system_prompt(user_profile: dict, current_context: dict) -> str:
    """Dynamically build a system prompt based on runtime context."""
    base = "You are a helpful AI assistant for our e-commerce platform.\n\n"

    # Add user-specific context
    if user_profile.get("is_premium"):
        base += "This is a PREMIUM customer. Provide priority service and mention exclusive benefits.\n"
    if user_profile.get("previous_issues"):
        base += f"Note: Customer has had {user_profile['previous_issues']} previous support tickets. Be extra attentive.\n"

    # Add time-based context
    if current_context.get("is_sale_period"):
        base += "CURRENT PROMOTION: 20% off all electronics. Mention this if relevant.\n"

    # Add relevant policies
    base += "\nKey policies to remember:\n"
    base += "- 30-day return policy\n"
    base += "- Free shipping over $50\n"

    return base
                        

2. Few-Shot Example Selection

Instead of including all examples, dynamically select the most relevant ones based on the current query:

Dynamic Few-Shot Selection

import numpy as np
from openai import OpenAI

client = OpenAI()

class FewShotSelector:
    """Select the most relevant few-shot examples using embeddings."""

    def __init__(self):
        self.examples = []
        self.embeddings = []

    def add_example(self, query: str, response: str, category: str = ""):
        """Add an example to the pool."""
        embedding = self._get_embedding(query)
        self.examples.append({
            "query": query,
            "response": response,
            "category": category,
            "embedding": embedding
        })
        self.embeddings.append(embedding)

    def _get_embedding(self, text: str) -> list[float]:
        """Get embedding for a text."""
        response = client.embeddings.create(
            model="text-embedding-3-small",
            input=text
        )
        return response.data[0].embedding

    def _cosine_similarity(self, a: list[float], b: list[float]) -> float:
        """Calculate cosine similarity between two vectors."""
        a, b = np.array(a), np.array(b)
        return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))

    def select(self, query: str, k: int = 3) -> list[dict]:
        """Select the k most relevant examples for a query."""
        query_embedding = self._get_embedding(query)

        similarities = [
            (i, self._cosine_similarity(query_embedding, emb))
            for i, emb in enumerate(self.embeddings)
        ]
        similarities.sort(key=lambda x: x[1], reverse=True)

        return [self.examples[i] for i, _ in similarities[:k]]

    def build_prompt(self, query: str, k: int = 3) -> list[dict]:
        """Build a few-shot prompt with selected examples."""
        examples = self.select(query, k)

        messages = []
        for ex in examples:
            messages.append({"role": "user", "content": ex["query"]})
            messages.append({"role": "assistant", "content": ex["response"]})

        messages.append({"role": "user", "content": query})
        return messages


# Usage
selector = FewShotSelector()

# Add a pool of examples
selector.add_example(
    "What is your return policy?",
    "We offer a 30-day return policy for unused items in original packaging...",
    category="policy"
)
selector.add_example(
    "How do I track my order?",
    "You can track your order by going to Orders > Track Order and entering your order ID...",
    category="orders"
)
selector.add_example(
    "The app keeps crashing on my phone",
    "I'm sorry to hear about the crashes. Let's try these steps: 1) Clear the app cache...",
    category="technical"
)
# Add more examples...

# At runtime, select the most relevant examples
query = "I need to return a defective product"
few_shot_messages = selector.build_prompt(query, k=2)
# This will select the return policy and possibly order tracking examples
                        

3. Dynamic Context Assembly

Context Assembly Pipeline

import tiktoken

class ContextAssembler:
    """
    Assemble the optimal context for each LLM call,
    staying within token budget while maximizing relevance.
    """

    def __init__(self, model: str = "gpt-4o", max_tokens: int = 128000):
        self.model = model
        self.max_tokens = max_tokens
        self.encoding = tiktoken.encoding_for_model(model)

        # Reserve tokens for different components
        self.token_budget = {
            "system_prompt": 2000,
            "few_shot_examples": 3000,
            "retrieved_documents": 8000,
            "conversation_history": 4000,
            "user_query": 1000,
            "output_reserve": 4000,  # Reserve for model's response
        }

    def count_tokens(self, text: str) -> int:
        """Count tokens in a text string."""
        return len(self.encoding.encode(text))

    def truncate_to_budget(self, text: str, budget: int) -> str:
        """Truncate text to fit within token budget."""
        tokens = self.encoding.encode(text)
        if len(tokens) <= budget:
            return text
        truncated_tokens = tokens[:budget]
        return self.encoding.decode(truncated_tokens) + "\n[...truncated]"

    def summarize_old_messages(self, messages: list[dict], keep_recent: int = 5) -> list[dict]:
        """Summarize older messages to save tokens."""
        if len(messages) <= keep_recent:
            return messages

        old_messages = messages[:-keep_recent]
        recent_messages = messages[-keep_recent:]

        # Create a summary of old messages
        old_text = "\n".join([f"{m['role']}: {m['content']}" for m in old_messages])

        from openai import OpenAI
        client = OpenAI()
        summary_response = client.chat.completions.create(
            model="gpt-4o-mini",  # Cheap model for summarization
            messages=[
                {"role": "system", "content": "Summarize this conversation concisely, preserving key facts and decisions."},
                {"role": "user", "content": old_text}
            ],
            max_tokens=500
        )
        summary = summary_response.choices[0].message.content

        return [
            {"role": "system", "content": f"[Conversation Summary] {summary}"}
        ] + recent_messages

    def assemble(
        self,
        system_prompt: str,
        user_query: str,
        conversation_history: list[dict] = None,
        retrieved_docs: list[str] = None,
        few_shot_examples: list[dict] = None,
    ) -> list[dict]:
        """Assemble the complete context within token budget."""
        messages = []

        # 1. System prompt (highest priority - always included)
        sys_text = self.truncate_to_budget(system_prompt, self.token_budget["system_prompt"])
        messages.append({"role": "system", "content": sys_text})

        # 2. Retrieved documents (injected into system or as context)
        if retrieved_docs:
            docs_text = "\n\n---\n\n".join(retrieved_docs)
            docs_text = self.truncate_to_budget(docs_text, self.token_budget["retrieved_documents"])
            messages.append({
                "role": "system",
                "content": f"Relevant context from knowledge base:\n\n{docs_text}"
            })

        # 3. Few-shot examples
        if few_shot_examples:
            remaining_budget = self.token_budget["few_shot_examples"]
            for example in few_shot_examples:
                example_tokens = self.count_tokens(example["query"] + example["response"])
                if example_tokens <= remaining_budget:
                    messages.append({"role": "user", "content": example["query"]})
                    messages.append({"role": "assistant", "content": example["response"]})
                    remaining_budget -= example_tokens
                else:
                    break

        # 4. Conversation history (summarize if too long)
        if conversation_history:
            history_tokens = sum(self.count_tokens(m["content"]) for m in conversation_history)
            if history_tokens > self.token_budget["conversation_history"]:
                conversation_history = self.summarize_old_messages(conversation_history)
            messages.extend(conversation_history)

        # 5. Current user query (always included)
        messages.append({"role": "user", "content": user_query})

        # Verify total tokens
        total_tokens = sum(self.count_tokens(m["content"]) for m in messages)
        print(f"[Context] Total tokens: {total_tokens} / {self.max_tokens}")
        print(f"[Context] Available for response: {self.max_tokens - total_tokens}")

        return messages


# Usage
assembler = ContextAssembler(model="gpt-4o")

messages = assembler.assemble(
    system_prompt="You are a helpful customer support agent...",
    user_query="What's the status of my order ORD-1001?",
    conversation_history=[
        {"role": "user", "content": "Hi, I need help with my order"},
        {"role": "assistant", "content": "Sure! I'd be happy to help. What's your order ID?"},
    ],
    retrieved_docs=[
        "Order ORD-1001: Status=shipped, Tracking=1Z999AA1, ETA=March 10",
        "Shipping Policy: Standard 5-7 days, Express 2-3 days",
    ]
)
                        

4. Context Compression

Context Compression Techniques

from openai import OpenAI

client = OpenAI()

def compress_context(long_text: str, target_ratio: float = 0.3) -> str:
    """
    Compress a long context while preserving key information.
    target_ratio: e.g., 0.3 means reduce to ~30% of original length.
    """
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {
                "role": "system",
                "content": f"""Compress the following text to approximately {int(target_ratio*100)}% of its length.
Rules:
- Preserve ALL key facts, numbers, names, and decisions
- Remove filler words, redundancy, and examples
- Use abbreviations where clear
- Maintain the logical structure
- Output ONLY the compressed text"""
            },
            {"role": "user", "content": long_text}
        ]
    )
    return response.choices[0].message.content


def hierarchical_summarization(documents: list[str], final_length: int = 500) -> str:
    """
    Summarize a large collection of documents hierarchically:
    1. Summarize each document individually
    2. Combine summaries
    3. Create a final meta-summary
    """
    # Step 1: Individual summaries
    individual_summaries = []
    for i, doc in enumerate(documents):
        summary = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {"role": "system", "content": "Summarize this document in 2-3 sentences, preserving key facts."},
                {"role": "user", "content": doc}
            ]
        ).choices[0].message.content
        individual_summaries.append(f"Document {i+1}: {summary}")
        print(f"  Summarized document {i+1}: {len(doc)} chars -> {len(summary)} chars")

    # Step 2: Combine and create meta-summary
    combined = "\n\n".join(individual_summaries)
    final_summary = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {
                "role": "system",
                "content": f"Create a comprehensive summary (max {final_length} words) from these document summaries. "
                          "Synthesize common themes, highlight key differences, and note important details."
            },
            {"role": "user", "content": combined}
        ]
    ).choices[0].message.content

    return final_summary
                        

Token Budget Planning

Every LLM request has a finite context window. Planning how to allocate tokens across components is essential:

Token Budget Planner

from dataclasses import dataclass

@dataclass
class TokenBudget:
    """Plan token allocation for an LLM request."""
    model: str
    max_context: int
    max_output: int

    # Allocations (in order of priority)
    system_prompt: int = 0
    retrieved_context: int = 0
    conversation_history: int = 0
    few_shot_examples: int = 0
    user_input: int = 0

    @property
    def total_input(self) -> int:
        return (self.system_prompt + self.retrieved_context +
                self.conversation_history + self.few_shot_examples + self.user_input)

    @property
    def remaining(self) -> int:
        return self.max_context - self.total_input - self.max_output

    def report(self) -> str:
        return f"""
Token Budget Report for {self.model}
{'='*50}
Max Context Window:    {self.max_context:>8,} tokens
Max Output:            {self.max_output:>8,} tokens
Available for Input:   {self.max_context - self.max_output:>8,} tokens
{'='*50}
System Prompt:         {self.system_prompt:>8,} tokens ({self.system_prompt/(self.max_context)*100:.1f}%)
Retrieved Context:     {self.retrieved_context:>8,} tokens ({self.retrieved_context/(self.max_context)*100:.1f}%)
Conversation History:  {self.conversation_history:>8,} tokens ({self.conversation_history/(self.max_context)*100:.1f}%)
Few-Shot Examples:     {self.few_shot_examples:>8,} tokens ({self.few_shot_examples/(self.max_context)*100:.1f}%)
User Input:            {self.user_input:>8,} tokens ({self.user_input/(self.max_context)*100:.1f}%)
{'='*50}
Total Input:           {self.total_input:>8,} tokens
Output Reserve:        {self.max_output:>8,} tokens
Remaining Buffer:      {self.remaining:>8,} tokens
{'='*50}
"""

# Plan for a RAG-powered support agent
budget = TokenBudget(
    model="gpt-4o",
    max_context=128000,
    max_output=4096,
    system_prompt=1500,
    retrieved_context=6000,
    conversation_history=3000,
    few_shot_examples=2000,
    user_input=500,
)
print(budget.report())
                        

2. Memory in Agents

Types of Memory

Inspired by human cognitive science, agent memory can be categorized into five types:

  • Short-term Memory (Working Memory): The current conversation context. What's in the context window right now. Limited by the context window size. Examples: current conversation messages, tool results from this session.
  • Long-term Memory (Persistent Storage): Information stored across sessions in a database or vector store. Persists indefinitely. Examples: user preferences, past interactions, learned facts.
  • Episodic Memory (Past Experiences): Memories of specific past interactions or events. "Last time this user had a billing issue, we resolved it by..." Enables learning from past experiences.
  • Semantic Memory (Facts and Knowledge): General knowledge and facts about the world or domain. The knowledge base, FAQs, documentation. Typically implemented as RAG.
  • Procedural Memory (Learned Skills): How to do things. Standard operating procedures, workflow patterns, learned strategies. Can be implemented as stored prompts or fine-tuned behaviors.
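Procedural memory is often the simplest of the five to implement: store "how-to" procedures as text and inject the relevant one into the system prompt at runtime. A minimal sketch (the task keys and procedure texts here are hypothetical examples):

```python
# Procedural memory: stored "how-to" procedures keyed by task type.
# The task keys and procedure texts are hypothetical examples.
PROCEDURES = {
    "refund": "1) Verify order ID  2) Confirm amount with customer  "
              "3) Process refund  4) Send confirmation email",
    "escalation": "1) Apologize  2) Summarize the issue  "
                  "3) Create a ticket for a human agent",
}

def inject_procedure(task_type: str, system_prompt: str) -> str:
    """Append the stored procedure for this task type, if one exists."""
    procedure = PROCEDURES.get(task_type)
    if procedure:
        return f"{system_prompt}\n\nStandard procedure for {task_type}:\n{procedure}"
    return system_prompt

print(inject_procedure("refund", "You are a support agent."))
```

The same idea scales up: procedures can themselves be retrieved with embeddings, or refined over time based on which ones led to successful outcomes.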

PRACTICAL: Implementing Memory Types

Complete Agent Memory System

"""
Agent Memory System
===================
Implements multiple memory types for a persistent, learning agent.
"""

import json
import sqlite3
from datetime import datetime
from typing import Optional
from openai import OpenAI

client = OpenAI()


class ConversationBufferMemory:
    """
    Short-term memory: Keep the full conversation in a buffer.
    Simple but grows linearly with conversation length.
    """
    def __init__(self, max_messages: int = 50):
        self.messages: list[dict] = []
        self.max_messages = max_messages

    def add(self, role: str, content: str):
        self.messages.append({"role": role, "content": content})
        if len(self.messages) > self.max_messages:
            self.messages = self.messages[-self.max_messages:]

    def get_messages(self) -> list[dict]:
        return self.messages.copy()

    def clear(self):
        self.messages = []


class SummaryMemory:
    """
    Compressed memory: Maintains a running summary of the conversation.
    Uses fewer tokens but loses some detail.
    """
    def __init__(self):
        self.summary: str = ""
        self.recent_messages: list[dict] = []
        self.max_recent = 5

    def add(self, role: str, content: str):
        self.recent_messages.append({"role": role, "content": content})
        if len(self.recent_messages) > self.max_recent * 2:
            self._compress()

    def _compress(self):
        """Compress old messages into the summary."""
        old_messages = self.recent_messages[:-self.max_recent]
        self.recent_messages = self.recent_messages[-self.max_recent:]

        old_text = "\n".join([f"{m['role']}: {m['content']}" for m in old_messages])
        prompt = f"Current summary: {self.summary}\n\nNew messages:\n{old_text}\n\nUpdate the summary to include the new information. Be concise but preserve key facts."

        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {"role": "system", "content": "You are a conversation summarizer. Update the running summary."},
                {"role": "user", "content": prompt}
            ],
            max_tokens=300
        )
        self.summary = response.choices[0].message.content

    def get_context(self) -> list[dict]:
        messages = []
        if self.summary:
            messages.append({
                "role": "system",
                "content": f"Conversation summary so far: {self.summary}"
            })
        messages.extend(self.recent_messages)
        return messages


class VectorMemory:
    """
    Long-term memory using embeddings for semantic search.
    Stores memories as vectors for efficient retrieval.
    """
    def __init__(self, db_path: str = ":memory:"):
        self.conn = sqlite3.connect(db_path)
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS memories (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                content TEXT NOT NULL,
                embedding TEXT NOT NULL,
                memory_type TEXT DEFAULT 'general',
                metadata TEXT,
                created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
                access_count INTEGER DEFAULT 0,
                last_accessed DATETIME
            )
        """)
        self.conn.commit()

    def _get_embedding(self, text: str) -> list[float]:
        response = client.embeddings.create(
            model="text-embedding-3-small",
            input=text
        )
        return response.data[0].embedding

    def _cosine_similarity(self, a: list[float], b: list[float]) -> float:
        import numpy as np
        a, b = np.array(a), np.array(b)
        return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

    def store(self, content: str, memory_type: str = "general", metadata: dict = None):
        """Store a memory with its embedding."""
        embedding = self._get_embedding(content)
        self.conn.execute(
            "INSERT INTO memories (content, embedding, memory_type, metadata) VALUES (?, ?, ?, ?)",
            (content, json.dumps(embedding), memory_type, json.dumps(metadata or {}))
        )
        self.conn.commit()

    def recall(self, query: str, k: int = 5, memory_type: str = None) -> list[dict]:
        """Retrieve the k most relevant memories."""
        query_embedding = self._get_embedding(query)

        # Fetch all memories (in production, use a vector DB like Pinecone/Chroma)
        cursor = self.conn.execute("SELECT * FROM memories")
        rows = cursor.fetchall()

        scored = []
        for row in rows:
            memory_embedding = json.loads(row[2])
            if memory_type and row[3] != memory_type:
                continue
            similarity = self._cosine_similarity(query_embedding, memory_embedding)
            scored.append({
                "id": row[0],
                "content": row[1],
                "memory_type": row[3],
                "similarity": similarity,
                "created_at": row[5]
            })

        scored.sort(key=lambda x: x["similarity"], reverse=True)

        # Update access counts
        for mem in scored[:k]:
            self.conn.execute(
                "UPDATE memories SET access_count = access_count + 1, last_accessed = ? WHERE id = ?",
                (datetime.now().isoformat(), mem["id"])
            )
        self.conn.commit()

        return scored[:k]


class EntityMemory:
    """
    Entity-based memory: Extract and track entities (people, things, concepts)
    across conversations. The agent remembers facts about specific entities.
    """
    def __init__(self):
        self.entities: dict[str, dict] = {}

    def update_from_conversation(self, messages: list[dict]):
        """Extract entities and facts from conversation messages."""
        conversation_text = "\n".join([f"{m['role']}: {m['content']}" for m in messages])

        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {
                    "role": "system",
                    "content": """Extract entities and their associated facts from this conversation.
Return JSON with format: {"entities": {"entity_name": {"type": "person/company/product/etc", "facts": ["fact1", "fact2"]}}}"""
                },
                {"role": "user", "content": conversation_text}
            ],
            response_format={"type": "json_object"}
        )

        try:
            extracted = json.loads(response.choices[0].message.content)
            for name, info in extracted.get("entities", {}).items():
                name_lower = name.lower()
                if name_lower not in self.entities:
                    self.entities[name_lower] = {"type": info.get("type", "unknown"), "facts": []}
                # Add new facts without duplicates
                existing_facts = set(self.entities[name_lower]["facts"])
                for fact in info.get("facts", []):
                    if fact not in existing_facts:
                        self.entities[name_lower]["facts"].append(fact)
        except json.JSONDecodeError:
            pass

    def get_context(self, relevant_entities: list[str] = None) -> str:
        """Get entity context as a string for the prompt."""
        if relevant_entities:
            entities = {k: v for k, v in self.entities.items() if k in [e.lower() for e in relevant_entities]}
        else:
            entities = self.entities

        if not entities:
            return ""

        lines = ["Known information about relevant entities:"]
        for name, info in entities.items():
            lines.append(f"\n{name.title()} ({info['type']}):")
            for fact in info["facts"]:
                lines.append(f"  - {fact}")

        return "\n".join(lines)


# =============================================================================
# Putting it all together: Agent with Multiple Memory Types
# =============================================================================

class MemoryAgent:
    """An agent with comprehensive memory capabilities."""

    def __init__(self, user_id: str):
        self.user_id = user_id
        self.buffer = ConversationBufferMemory(max_messages=20)
        self.summary = SummaryMemory()
        self.vector = VectorMemory()
        self.entities = EntityMemory()

    def chat(self, user_message: str) -> str:
        """Process a message with full memory support."""
        # 1. Store in short-term memory
        self.buffer.add("user", user_message)
        self.summary.add("user", user_message)

        # 2. Recall relevant long-term memories
        relevant_memories = self.vector.recall(user_message, k=3)
        memory_context = ""
        if relevant_memories:
            memory_context = "Relevant memories from past interactions:\n"
            for mem in relevant_memories:
                memory_context += f"- {mem['content']} (relevance: {mem['similarity']:.2f})\n"

        # 3. Get entity context
        entity_context = self.entities.get_context()

        # 4. Build the prompt
        system = f"""You are a helpful assistant with memory capabilities.
You remember past interactions and use them to provide better assistance.

{memory_context}
{entity_context}
"""
        messages = [{"role": "system", "content": system}]
        messages.extend(self.summary.get_context())
        messages.append({"role": "user", "content": user_message})

        # 5. Get response
        response = client.chat.completions.create(
            model="gpt-4o",
            messages=messages
        )
        assistant_message = response.choices[0].message.content

        # 6. Update memories
        self.buffer.add("assistant", assistant_message)
        self.summary.add("assistant", assistant_message)

        # Store important information in long-term memory
        self.vector.store(
            f"User said: {user_message} | Assistant replied about: {assistant_message[:100]}",
            memory_type="conversation"
        )

        # Update entity memory periodically
        if len(self.buffer.messages) % 6 == 0:  # Every 3 exchanges
            self.entities.update_from_conversation(self.buffer.messages[-6:])

        return assistant_message


# Demo
if __name__ == "__main__":
    agent = MemoryAgent(user_id="user-001")

    # First conversation
    print(agent.chat("Hi! My name is Alice and I work at TechCorp."))
    print(agent.chat("I'm interested in your Enterprise plan for our team of 50 engineers."))
    print(agent.chat("Our budget is around $10,000 per month."))

    # Later conversation (agent should remember Alice and her needs)
    print("\n--- Later session ---")
    print(agent.chat("Hi again! Any updates on pricing for my team?"))
                        

3. Model Context Protocol (MCP)

What is MCP?

The Model Context Protocol (MCP) is an open standard created by Anthropic that defines how LLM applications communicate with external tools, data sources, and services. Think of it as a universal adapter -- instead of building custom integrations for every tool, you build one MCP server and any MCP-compatible client can use it.

The Problem MCP Solves: Before MCP, every AI application had to build custom integrations for each tool. Want your AI to access GitHub? Write a custom integration. Slack? Another custom integration. Database? Yet another. This led to fragmented, duplicated effort across the ecosystem. MCP standardizes this with a single protocol.

MCP Protocol Flow
graph LR
    A[AI Host App] --> B[MCP Client]
    B -->|JSON-RPC| C[MCP Server]
    C --> D[Tools]
    C --> E[Resources]
    C --> F[Prompts]
    D -->|Results| B
    E -->|Data| B
    F -->|Templates| B
    style A fill:#1a1a2e,stroke:#e94560,color:#fff
    style B fill:#1a1a2e,stroke:#f5a623,color:#fff
    style C fill:#1a1a2e,stroke:#00d4aa,color:#fff
    style D fill:#1a1a2e,stroke:#7c4dff,color:#fff
    style E fill:#1a1a2e,stroke:#7c4dff,color:#fff
    style F fill:#1a1a2e,stroke:#7c4dff,color:#fff

MCP Architecture

MCP follows a client-server architecture:

  • MCP Host: The AI application (e.g., Claude Desktop, a custom chatbot). Contains the LLM and orchestrates everything.
  • MCP Client: A protocol client inside the host that connects to MCP servers. One client per server connection.
  • MCP Server: A lightweight program that exposes specific capabilities (tools, resources, prompts) via the MCP protocol. Each server typically wraps one service (e.g., a GitHub MCP server, a database MCP server).

MCP Concepts

  • Tools: Functions the LLM can call (similar to function calling). Example: create_github_issue, query_database. Tools are model-controlled -- the LLM decides when to use them.
  • Resources: Data the LLM can read (like GET endpoints). Example: file contents, database records, API data. Resources are application-controlled -- the host app decides when to fetch them.
  • Prompts: Pre-built prompt templates that servers can expose. Example: a "code review" prompt template. User-controlled -- the user selects which prompt to use.
  • Sampling: Allows servers to request the LLM to generate text (server-initiated LLM calls). Enables complex agentic behaviors where the server needs LLM reasoning.

How MCP Differs from Function Calling

Aspect | Function Calling | MCP
Scope | Single API call | Full protocol with lifecycle
Discovery | Tools defined in code | Dynamic tool discovery
Transport | HTTP API | stdio, HTTP/SSE
Standardization | Provider-specific | Open standard
Features | Tools only | Tools + Resources + Prompts + Sampling
Reusability | App-specific | Any MCP client can use any MCP server

PRACTICAL: Build an MCP Server in Python

MCP Server - Weather and Notes Service

# pip install mcp

"""
MCP Server: Weather & Notes
============================
An MCP server that provides:
- Tools: get_weather, add_note, search_notes
- Resources: notes://list (list all notes)
- Prompts: daily_briefing (a prompt template)
"""

from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import (
    Tool, TextContent, Resource, ResourceTemplate,
    Prompt, PromptMessage, PromptArgument,
    GetPromptResult, ReadResourceResult
)
import json
from datetime import datetime

# Create the MCP server
server = Server("weather-notes-server")

# In-memory storage for notes
notes_db: list[dict] = []


# =============================================================================
# TOOLS: Functions the LLM can call
# =============================================================================

@server.list_tools()
async def list_tools() -> list[Tool]:
    """List available tools."""
    return [
        Tool(
            name="get_weather",
            description="Get current weather for a location. Returns temperature, conditions, and humidity.",
            inputSchema={
                "type": "object",
                "properties": {
                    "location": {
                        "type": "string",
                        "description": "City name, e.g., 'San Francisco' or 'London'"
                    }
                },
                "required": ["location"]
            }
        ),
        Tool(
            name="add_note",
            description="Add a note to the notebook. Notes are persisted for the session.",
            inputSchema={
                "type": "object",
                "properties": {
                    "title": {
                        "type": "string",
                        "description": "Title of the note"
                    },
                    "content": {
                        "type": "string",
                        "description": "Content of the note"
                    },
                    "tags": {
                        "type": "array",
                        "items": {"type": "string"},
                        "description": "Optional tags for categorization"
                    }
                },
                "required": ["title", "content"]
            }
        ),
        Tool(
            name="search_notes",
            description="Search notes by keyword in title or content.",
            inputSchema={
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": "Search keyword"
                    }
                },
                "required": ["query"]
            }
        )
    ]


@server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    """Handle tool calls."""
    if name == "get_weather":
        location = arguments["location"]
        # Simulated weather data
        weather = {
            "location": location,
            "temperature": 22,
            "unit": "celsius",
            "conditions": "partly cloudy",
            "humidity": 65,
            "wind": "12 km/h NW",
            "forecast": "Clear skies expected this afternoon",
            "timestamp": datetime.now().isoformat()
        }
        return [TextContent(type="text", text=json.dumps(weather, indent=2))]

    elif name == "add_note":
        note = {
            "id": len(notes_db) + 1,
            "title": arguments["title"],
            "content": arguments["content"],
            "tags": arguments.get("tags", []),
            "created_at": datetime.now().isoformat()
        }
        notes_db.append(note)
        return [TextContent(type="text", text=f"Note added successfully: #{note['id']} - {note['title']}")]

    elif name == "search_notes":
        query = arguments["query"].lower()
        results = [
            n for n in notes_db
            if query in n["title"].lower() or query in n["content"].lower()
        ]
        return [TextContent(type="text", text=json.dumps(results, indent=2))]

    return [TextContent(type="text", text=f"Unknown tool: {name}")]


# =============================================================================
# RESOURCES: Data the application can read
# =============================================================================

@server.list_resources()
async def list_resources() -> list[Resource]:
    """List available resources."""
    return [
        Resource(
            uri="notes://list",
            name="All Notes",
            description="List all notes in the notebook",
            mimeType="application/json"
        )
    ]

@server.read_resource()
async def read_resource(uri) -> str:
    """Read a resource. The SDK wraps the returned string into the protocol result."""
    # Compare as a string: the SDK passes the URI as a pydantic AnyUrl.
    if str(uri).rstrip("/") == "notes://list":
        return json.dumps(notes_db, indent=2)
    raise ValueError(f"Unknown resource: {uri}")


# =============================================================================
# PROMPTS: Pre-built prompt templates
# =============================================================================

@server.list_prompts()
async def list_prompts() -> list[Prompt]:
    """List available prompts."""
    return [
        Prompt(
            name="daily_briefing",
            description="Generate a daily briefing with weather and notes summary",
            arguments=[
                PromptArgument(
                    name="location",
                    description="Location for weather",
                    required=True
                )
            ]
        )
    ]

@server.get_prompt()
async def get_prompt(name: str, arguments: dict) -> GetPromptResult:
    """Get a prompt template."""
    if name == "daily_briefing":
        location = arguments.get("location", "San Francisco")
        notes_summary = f"{len(notes_db)} notes in notebook" if notes_db else "No notes yet"
        return GetPromptResult(
            description=f"Daily briefing for {location}",
            messages=[
                PromptMessage(
                    role="user",
                    content=TextContent(
                        type="text",
                        text=f"Please give me a daily briefing. Check the weather in {location} "
                             f"and summarize my notes. Current notes: {notes_summary}"
                    )
                )
            ]
        )
    raise ValueError(f"Unknown prompt: {name}")


# =============================================================================
# Run the server
# =============================================================================

async def main():
    """Run the MCP server over stdio."""
    async with stdio_server() as (read_stream, write_stream):
        await server.run(read_stream, write_stream, server.create_initialization_options())

if __name__ == "__main__":
    import asyncio
    asyncio.run(main())
                        

PRACTICAL: Build an MCP Client

MCP Client - Connecting to MCP Servers

# pip install mcp anthropic

"""
MCP Client
==========
Connects to MCP servers, discovers their tools, and uses them with Claude.
"""

import asyncio
import json
from contextlib import AsyncExitStack

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from anthropic import Anthropic

anthropic_client = Anthropic()


class MCPAgent:
    """An agent that connects to MCP servers and uses their tools with Claude."""

    def __init__(self):
        self.sessions: dict[str, ClientSession] = {}
        self.all_tools: list[dict] = []
        self.tool_to_session: dict[str, str] = {}
        # AsyncExitStack keeps the transport and session contexts alive for
        # the agent's lifetime and unwinds them in LIFO order on cleanup().
        self.exit_stack = AsyncExitStack()

    async def connect_server(self, name: str, command: str, args: list[str] | None = None):
        """Connect to an MCP server over stdio."""
        server_params = StdioServerParameters(
            command=command,
            args=args or []
        )

        # Enter the transport and session contexts via the exit stack.
        # Calling __aenter__ by hand and discarding the context manager
        # would leak the stdio transport and break cleanup.
        read, write = await self.exit_stack.enter_async_context(stdio_client(server_params))
        session = await self.exit_stack.enter_async_context(ClientSession(read, write))
        await session.initialize()

        self.sessions[name] = session

        # Discover tools
        tools_result = await session.list_tools()
        for tool in tools_result.tools:
            tool_dict = {
                "name": tool.name,
                "description": tool.description,
                "input_schema": tool.inputSchema
            }
            self.all_tools.append(tool_dict)
            self.tool_to_session[tool.name] = name

        print(f"Connected to '{name}': {len(tools_result.tools)} tools available")

    async def call_tool(self, tool_name: str, arguments: dict) -> str:
        """Call a tool on the appropriate MCP server."""
        server_name = self.tool_to_session.get(tool_name)
        if not server_name:
            return f"Error: Unknown tool '{tool_name}'"

        session = self.sessions[server_name]
        result = await session.call_tool(tool_name, arguments)

        # Extract text from the result
        texts = [c.text for c in result.content if hasattr(c, 'text')]
        return "\n".join(texts)

    async def chat(self, user_message: str) -> str:
        """Chat with Claude using MCP tools."""
        # Format tools for Anthropic API
        anthropic_tools = [
            {
                "name": tool["name"],
                "description": tool["description"],
                "input_schema": tool["input_schema"]
            }
            for tool in self.all_tools
        ]

        messages = [{"role": "user", "content": user_message}]

        # Agent loop
        for _ in range(10):
            response = anthropic_client.messages.create(
                model="claude-sonnet-4-20250514",
                max_tokens=4096,
                system="You are a helpful assistant with access to various tools via MCP servers.",
                tools=anthropic_tools,
                messages=messages
            )

            if response.stop_reason == "tool_use":
                messages.append({"role": "assistant", "content": response.content})

                tool_results = []
                for block in response.content:
                    if block.type == "tool_use":
                        print(f"  [MCP Tool Call] {block.name}({json.dumps(block.input)})")
                        result = await self.call_tool(block.name, block.input)
                        print(f"  [MCP Result] {result[:200]}")

                        tool_results.append({
                            "type": "tool_result",
                            "tool_use_id": block.id,
                            "content": result
                        })

                messages.append({"role": "user", "content": tool_results})
            else:
                # Final response
                return "".join(b.text for b in response.content if hasattr(b, "text"))

        return "Max iterations reached."

    async def cleanup(self):
        """Close all server connections."""
        # Prefer unwinding an AsyncExitStack if one was set up;
        # otherwise close each session directly.
        stack = getattr(self, "exit_stack", None)
        if stack is not None:
            await stack.aclose()
        else:
            for session in self.sessions.values():
                await session.__aexit__(None, None, None)


# Usage
async def main():
    agent = MCPAgent()

    # Connect to MCP servers
    await agent.connect_server(
        "weather-notes",
        "python",
        ["weather_notes_server.py"]
    )

    # Chat with Claude using MCP tools
    response = await agent.chat("What's the weather in Tokyo? Also, save a note about it.")
    print(f"\nClaude: {response}")

    response = await agent.chat("Search my notes for weather.")
    print(f"\nClaude: {response}")

    await agent.cleanup()

if __name__ == "__main__":
    asyncio.run(main())
                        

Available MCP Servers (Community Ecosystem)

The MCP ecosystem has grown rapidly. Here are some notable servers available as of March 2026:

  • Filesystem: Read, write, and manage local files
  • GitHub: Create issues, PRs, search repos, manage workflows
  • Slack: Send messages, search conversations, manage channels
  • PostgreSQL/SQLite: Query databases, explore schemas
  • Brave Search: Web search with the Brave Search API
  • Google Drive: Search, read, create documents
  • Puppeteer/Playwright: Browser automation and web scraping
  • Memory: Persistent knowledge graph memory
  • Linear: Project management and issue tracking
  • Sentry: Error tracking and monitoring

4. A2A (Agent-to-Agent Protocol)

What is A2A?

The Agent-to-Agent (A2A) Protocol is an open standard introduced by Google (and since contributed to the Linux Foundation) that enables AI agents to communicate and collaborate with each other, regardless of their underlying framework or vendor. While MCP connects agents to tools, A2A connects agents to other agents.

Key Concepts

  • Agent Cards: JSON metadata files that describe an agent's capabilities, endpoint, and supported skills. Like a business card for agents. Published at a well-known URL (e.g., /.well-known/agent.json).
  • Tasks: Units of work that one agent sends to another. Tasks have lifecycles: submitted, working, input-required, completed, failed, canceled.
  • Messages: Communication between agents within a task. Each message can contain multiple parts (text, files, structured data).
  • Artifacts: Output produced by an agent during task execution (documents, images, data).
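The task lifecycle can be sketched as a small state machine. This is an illustrative sketch, not the spec's authoritative definition: the transition table below is an assumption inferred from the state names (spelled as in the A2A spec, e.g. `input-required`):

```python
from enum import Enum

class TaskState(Enum):
    SUBMITTED = "submitted"
    WORKING = "working"
    INPUT_REQUIRED = "input-required"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELED = "canceled"

# Assumed transition table -- the A2A spec defines the authoritative state machine.
TRANSITIONS: dict[TaskState, set[TaskState]] = {
    TaskState.SUBMITTED: {TaskState.WORKING, TaskState.CANCELED},
    TaskState.WORKING: {TaskState.INPUT_REQUIRED, TaskState.COMPLETED,
                        TaskState.FAILED, TaskState.CANCELED},
    TaskState.INPUT_REQUIRED: {TaskState.WORKING, TaskState.CANCELED},
    TaskState.COMPLETED: set(),   # terminal
    TaskState.FAILED: set(),      # terminal
    TaskState.CANCELED: set(),    # terminal
}

def can_transition(current: TaskState, target: TaskState) -> bool:
    """Check whether a task may move from `current` to `target`."""
    return target in TRANSITIONS[current]
```

A client polling for task status can use such a table to decide whether to keep waiting (working, input-required) or stop (any terminal state).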

A2A vs MCP

A2A and MCP are complementary, not competing protocols:

  • MCP: Connects an agent to tools and data (agent-to-tool). Like USB connecting a computer to peripherals.
  • A2A: Connects an agent to other agents (agent-to-agent). Like networking connecting computers to each other.

In a production system, an agent might use MCP to access its tools and A2A to collaborate with other agents.

A2A Agent Card Example

{
  "name": "Research Agent",
  "description": "An agent that researches topics and provides comprehensive summaries",
  "url": "https://research-agent.example.com",
  "version": "1.0.0",
  "capabilities": {
    "streaming": true,
    "pushNotifications": false
  },
  "skills": [
    {
      "id": "web_research",
      "name": "Web Research",
      "description": "Research a topic by searching the web and summarizing findings",
      "inputModes": ["text"],
      "outputModes": ["text"]
    },
    {
      "id": "fact_check",
      "name": "Fact Checking",
      "description": "Verify claims against reliable sources",
      "inputModes": ["text"],
      "outputModes": ["text"]
    }
  ],
  "authentication": {
    "schemes": ["bearer"]
  }
}
                        
A2A Task Flow (Conceptual Python)

"""
Conceptual A2A interaction between a Coordinator Agent and a Research Agent.
This shows the protocol flow, not a runnable implementation.
"""

import httpx
import json

class A2AClient:
    """Simple A2A client for sending tasks to other agents."""

    def __init__(self, agent_url: str, auth_token: str = None):
        self.agent_url = agent_url
        self.headers = {"Content-Type": "application/json"}
        if auth_token:
            self.headers["Authorization"] = f"Bearer {auth_token}"

    async def discover(self) -> dict:
        """Discover agent capabilities via Agent Card."""
        async with httpx.AsyncClient() as client:
            response = await client.get(
                f"{self.agent_url}/.well-known/agent.json",
                headers=self.headers
            )
            return response.json()

    async def send_task(self, skill_id: str, message: str) -> dict:
        """Send a task to the agent."""
        task = {
            "jsonrpc": "2.0",
            "method": "tasks/send",
            "params": {
                "id": f"task-{id(message)}",
                "message": {
                    "role": "user",
                    "parts": [{"type": "text", "text": message}]
                },
                "skillId": skill_id
            }
        }
        async with httpx.AsyncClient() as client:
            response = await client.post(
                f"{self.agent_url}/a2a",
                json=task,
                headers=self.headers
            )
            return response.json()

    async def get_task_status(self, task_id: str) -> dict:
        """Check task status."""
        request = {
            "jsonrpc": "2.0",
            "method": "tasks/get",
            "params": {"id": task_id}
        }
        async with httpx.AsyncClient() as client:
            response = await client.post(
                f"{self.agent_url}/a2a",
                json=request,
                headers=self.headers
            )
            return response.json()


# Coordinator agent using A2A to delegate to specialized agents
async def coordinator_workflow(topic: str):
    """A coordinator that uses A2A to delegate research tasks."""
    research_agent = A2AClient("https://research-agent.example.com")
    writer_agent = A2AClient("https://writer-agent.example.com")

    # 1. Discover capabilities
    research_card = await research_agent.discover()
    print(f"Research Agent: {research_card['name']} - {len(research_card['skills'])} skills")

    # 2. Send research task
    research_result = await research_agent.send_task(
        "web_research",
        f"Research the following topic thoroughly: {topic}"
    )

    # 3. Send writing task with research results
    writer_result = await writer_agent.send_task(
        "write_article",
        f"Write an article based on this research: {research_result}"
    )

    return writer_result
                        

5. Multi-Agent Systems

Why Multi-Agent?

Multi-agent systems use multiple specialized AI agents that collaborate to solve complex tasks. The advantages include:

  • Specialization: Each agent can have a focused system prompt, specific tools, and domain expertise. A "researcher" agent is better at research than a general-purpose agent.
  • Divide and Conquer: Complex tasks can be decomposed across agents working in parallel.
  • Separation of Concerns: Each agent has a clear responsibility, making the system easier to debug, test, and improve.
  • Scalability: Add new agents for new capabilities without modifying existing ones.
  • Model Mixing: Use expensive models (GPT-4o, Claude 4) for complex reasoning agents and cheap models (GPT-4o-mini) for simple task agents.
Multi-Agent System Architecture
graph TD A[User Task] --> B[Supervisor Agent] B --> C[Researcher Agent] B --> D[Writer Agent] B --> E[Reviewer Agent] C -->|Findings| B D -->|Draft| B E -->|Feedback| B B --> F[Final Output] C --- G[Search Tools] D --- H[Writing Tools] E --- I[Eval Tools] style A fill:#1a1a2e,stroke:#e94560,color:#fff style B fill:#1a1a2e,stroke:#f5a623,color:#fff style C fill:#1a1a2e,stroke:#00d4aa,color:#fff style D fill:#1a1a2e,stroke:#00d4aa,color:#fff style E fill:#1a1a2e,stroke:#00d4aa,color:#fff style F fill:#1a1a2e,stroke:#7c4dff,color:#fff

Multi-Agent Architectures

1. Supervisor Pattern

One agent (the supervisor) coordinates all other agents. The supervisor receives the task, decides which agents to invoke and in what order, and assembles the final result.

2. Peer-to-Peer Pattern

Agents communicate directly with each other without a central coordinator. Each agent can request help from any other agent. More flexible but harder to control.

3. Hierarchical Pattern

Multiple layers of supervisors. A top-level supervisor delegates to mid-level supervisors, who coordinate their own teams of worker agents. Useful for very complex tasks.

4. Debate/Consensus Pattern

Multiple agents independently tackle the same task, then discuss their approaches and reach consensus. Improves quality by leveraging diverse "perspectives."
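To make the supervisor pattern concrete, here is a minimal routing sketch. The worker "agents" are stubbed with plain functions, and the keyword heuristic stands in for what would normally be an LLM call by the supervisor; all names here are illustrative:

```python
from typing import Callable

# Stub workers -- in a real system each would be an LLM-backed agent
# with its own system prompt and tools.
WORKERS: dict[str, Callable[[str], str]] = {
    "research": lambda task: f"[research] findings for: {task}",
    "write":    lambda task: f"[write] draft for: {task}",
    "review":   lambda task: f"[review] feedback on: {task}",
}

def supervisor(task: str) -> str:
    """Route a subtask to the right worker and return its output."""
    # In production the supervisor is itself an LLM call that decides routing;
    # a keyword heuristic stands in for that decision here.
    task_lower = task.lower()
    if "research" in task_lower:
        worker = "research"
    elif "review" in task_lower or "edit" in task_lower:
        worker = "review"
    else:
        worker = "write"
    return WORKERS[worker](task)

print(supervisor("Research GPU market trends"))
```

The supervisor is the single point of control: it sees every result and decides the next step, which is what makes this pattern easy to debug relative to peer-to-peer designs.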

PRACTICAL: Multi-Agent Content Creation System

Multi-Agent Content Pipeline with CrewAI-Style Architecture

"""
Multi-Agent Content Creation System
====================================
4 specialized agents collaborate to create high-quality content:
1. Researcher: Gathers information
2. Writer: Creates the content
3. Editor: Reviews and improves
4. Coordinator: Manages the workflow
"""

import openai
import json
from dataclasses import dataclass, field
from typing import Optional
from enum import Enum

client = openai.OpenAI()


class AgentRole(Enum):
    COORDINATOR = "coordinator"
    RESEARCHER = "researcher"
    WRITER = "writer"
    EDITOR = "editor"


@dataclass
class AgentConfig:
    role: AgentRole
    model: str
    system_prompt: str
    temperature: float = 0.7


@dataclass
class TaskResult:
    agent: str
    output: str
    metadata: dict = field(default_factory=dict)


class Agent:
    """A specialized agent with a specific role and capabilities."""

    def __init__(self, config: AgentConfig):
        self.config = config
        self.history: list[dict] = []

    def run(self, task: str, context: str = "") -> TaskResult:
        """Execute a task with optional context from other agents."""
        prompt = task
        if context:
            prompt = f"Context from other agents:\n{context}\n\n---\n\nYour task:\n{task}"

        messages = [
            {"role": "system", "content": self.config.system_prompt},
            {"role": "user", "content": prompt}
        ]

        response = client.chat.completions.create(
            model=self.config.model,
            messages=messages,
            temperature=self.config.temperature,
            max_tokens=4000
        )

        output = response.choices[0].message.content
        self.history.append({"task": task, "output": output})

        return TaskResult(
            agent=self.config.role.value,
            output=output,
            metadata={
                "model": self.config.model,
                "tokens": response.usage.total_tokens
            }
        )


class ContentCreationCrew:
    """A multi-agent crew for content creation."""

    def __init__(self):
        self.agents = self._create_agents()
        self.task_log: list[TaskResult] = []

    def _create_agents(self) -> dict[str, Agent]:
        return {
            "coordinator": Agent(AgentConfig(
                role=AgentRole.COORDINATOR,
                model="gpt-4o",
                system_prompt="""You are a content production coordinator. Your job is to:
1. Break down content requests into specific tasks
2. Review agent outputs and provide feedback
3. Ensure quality standards are met
4. Assemble the final deliverable

Be specific in your task assignments. Include format, length, and quality requirements.""",
                temperature=0.3
            )),
            "researcher": Agent(AgentConfig(
                role=AgentRole.RESEARCHER,
                model="gpt-4o",
                system_prompt="""You are an expert researcher. Your job is to:
1. Gather comprehensive information on any topic
2. Identify key facts, statistics, and trends
3. Find multiple perspectives and sources
4. Organize findings in a clear, structured format

Always include specific data points, dates, and verifiable facts.
Cite sources where possible. Separate facts from opinions.""",
                temperature=0.5
            )),
            "writer": Agent(AgentConfig(
                role=AgentRole.WRITER,
                model="gpt-4o",
                system_prompt="""You are a skilled content writer. Your job is to:
1. Transform research into engaging, well-structured content
2. Write in a clear, compelling style
3. Use appropriate tone for the target audience
4. Include relevant examples and analogies

Focus on readability, flow, and engagement.
Use short paragraphs, subheadings, and bullet points where appropriate.""",
                temperature=0.8
            )),
            "editor": Agent(AgentConfig(
                role=AgentRole.EDITOR,
                model="gpt-4o",
                system_prompt="""You are a meticulous editor. Your job is to:
1. Review content for clarity, accuracy, and engagement
2. Fix grammar, spelling, and punctuation errors
3. Improve sentence structure and flow
4. Ensure consistency in tone and style
5. Fact-check claims against the provided research

Provide your edited version AND a list of changes made.
Rate the content on a 1-10 scale for: clarity, accuracy, engagement, completeness.""",
                temperature=0.3
            ))
        }

    def create_content(self, topic: str, content_type: str = "blog post",
                       target_audience: str = "general", word_count: int = 1000) -> str:
        """
        Run the full content creation pipeline.
        """
        print(f"\n{'='*70}")
        print(f"CONTENT CREATION PIPELINE")
        print(f"Topic: {topic}")
        print(f"Type: {content_type} | Audience: {target_audience} | Words: {word_count}")
        print(f"{'='*70}")

        # Phase 1: Coordinator creates the plan
        print("\n[Phase 1] Coordinator creating plan...")
        plan_result = self.agents["coordinator"].run(
            f"Create a detailed content plan for:\n"
            f"- Topic: {topic}\n"
            f"- Type: {content_type}\n"
            f"- Target audience: {target_audience}\n"
            f"- Target length: {word_count} words\n\n"
            f"Define specific tasks for the Researcher, Writer, and Editor agents."
        )
        self.task_log.append(plan_result)
        print(f"  Plan created ({plan_result.metadata['tokens']} tokens)")

        # Phase 2: Researcher gathers information
        print("\n[Phase 2] Researcher gathering information...")
        research_result = self.agents["researcher"].run(
            f"Research the following topic thoroughly for a {content_type}:\n\n"
            f"Topic: {topic}\n"
            f"Target audience: {target_audience}\n\n"
            f"Coordinator's research brief:\n{plan_result.output}\n\n"
            f"Provide comprehensive, well-organized research with specific facts and data."
        )
        self.task_log.append(research_result)
        print(f"  Research complete ({research_result.metadata['tokens']} tokens)")

        # Phase 3: Writer creates the content
        print("\n[Phase 3] Writer creating content...")
        writing_result = self.agents["writer"].run(
            f"Write a {content_type} about '{topic}' for {target_audience}.\n"
            f"Target length: {word_count} words.\n\n"
            f"Ground the piece in the research provided in the context, citing specific facts.",
            context=f"Research findings:\n{research_result.output}\n\n"
                    f"Content plan:\n{plan_result.output}"
        )
        self.task_log.append(writing_result)
        print(f"  Draft complete ({writing_result.metadata['tokens']} tokens)")

        # Phase 4: Editor reviews and improves
        print("\n[Phase 4] Editor reviewing and improving...")
        editing_result = self.agents["editor"].run(
            f"Review and edit this {content_type}. Check against the research for accuracy.",
            context=f"Original research:\n{research_result.output}\n\n"
                    f"Draft to edit:\n{writing_result.output}"
        )
        self.task_log.append(editing_result)
        print(f"  Editing complete ({editing_result.metadata['tokens']} tokens)")

        # Phase 5: Coordinator final review
        print("\n[Phase 5] Coordinator final review...")
        final_result = self.agents["coordinator"].run(
            f"Review the final edited content. If it meets quality standards, "
            f"output the final version. If not, note what needs improvement.\n\n"
            f"Quality criteria:\n"
            f"- Accuracy (verified against research)\n"
            f"- Completeness (covers all key points)\n"
            f"- Engagement (interesting and readable)\n"
            f"- Appropriate length (~{word_count} words)",
            context=f"Edited content:\n{editing_result.output}\n\n"
                    f"Original research:\n{research_result.output}"
        )
        self.task_log.append(final_result)
        print(f"  Final review complete ({final_result.metadata['tokens']} tokens)")

        # Summary
        total_tokens = sum(r.metadata.get("tokens", 0) for r in self.task_log)
        print(f"\n{'='*70}")
        print(f"PIPELINE COMPLETE")
        print(f"Total agents used: 4")
        print(f"Total tasks: {len(self.task_log)}")
        print(f"Total tokens: {total_tokens:,}")
        print(f"{'='*70}")

        return final_result.output


# Usage
if __name__ == "__main__":
    crew = ContentCreationCrew()
    result = crew.create_content(
        topic="The Rise of AI Agents in Enterprise Software (2026)",
        content_type="blog post",
        target_audience="tech-savvy business leaders",
        word_count=1200
    )
    print("\n\nFINAL CONTENT:")
    print("=" * 70)
    print(result)
                        

6. Optimizing Agentic Flows

Performance and Cost Optimization

Reducing Latency

  • Parallel Tool Execution: When multiple tools are called in a single turn, execute them concurrently rather than sequentially. OpenAI and Anthropic both support parallel tool calls.
  • Model Selection by Task: Use fast, cheap models (GPT-4o-mini, Claude 3.5 Haiku) for simple tasks (classification, extraction) and powerful models for complex reasoning.
  • Streaming: Stream agent responses to reduce perceived latency. Show the user partial results as they arrive.
  • Speculative Execution: Start likely next steps before confirming the current step. If 90% of queries need a KB search, start searching while classifying.
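The first point is straightforward with asyncio: independent tool calls issued in the same turn should be awaited together, not one after another. A minimal sketch with stand-in tools (function names and latencies are hypothetical):

```python
import asyncio

# Hypothetical async tools -- stand-ins for real I/O-bound tool calls.
async def search_kb(query: str) -> str:
    await asyncio.sleep(0.5)          # simulate network latency
    return f"KB results for {query!r}"

async def get_weather(city: str) -> str:
    await asyncio.sleep(0.5)
    return f"Weather for {city}"

async def run_tools_parallel() -> list[str]:
    # Sequential awaits would take ~1.0s; gather runs both concurrently (~0.5s).
    return list(await asyncio.gather(
        search_kb("refund policy"),
        get_weather("Tokyo"),
    ))

results = asyncio.run(run_tools_parallel())
print(results)
```

The same shape applies inside an agent loop: collect all `tool_use` blocks from one model turn, launch them with `asyncio.gather`, then append all results before the next model call.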

Cost Optimization

Cost-Aware Model Selection

from enum import Enum

class TaskComplexity(Enum):
    SIMPLE = "simple"       # Classification, extraction, formatting
    MEDIUM = "medium"       # Summarization, Q&A, simple analysis
    COMPLEX = "complex"     # Multi-step reasoning, creative writing, coding
    CRITICAL = "critical"   # High-stakes decisions, complex analysis

# Model selection based on task complexity
MODEL_MAP = {
    TaskComplexity.SIMPLE: "gpt-4o-mini",       # ~$0.15/M input, $0.60/M output
    TaskComplexity.MEDIUM: "gpt-4o-mini",        # Same, still very capable
    TaskComplexity.COMPLEX: "gpt-4o",            # ~$2.50/M input, $10/M output
    TaskComplexity.CRITICAL: "gpt-4o",           # Use the best for critical tasks
}

# Approximate costs per 1M tokens (March 2026 pricing)
COST_TABLE = {
    "gpt-4o-mini": {"input": 0.15, "output": 0.60},
    "gpt-4o": {"input": 2.50, "output": 10.00},
    "claude-3-5-haiku": {"input": 0.80, "output": 4.00},
    "claude-sonnet-4": {"input": 3.00, "output": 15.00},
}

def estimate_cost(model: str, input_tokens: int, output_tokens: int) -> float:
    """Estimate cost for an API call."""
    costs = COST_TABLE.get(model, {"input": 5.0, "output": 15.0})
    return (input_tokens * costs["input"] + output_tokens * costs["output"]) / 1_000_000

def select_model(task_description: str) -> str:
    """Select the appropriate model based on task complexity."""
    # In production, use a classifier to determine complexity.
    # Simple keyword heuristic for demonstration:
    complex_keywords = ["analyze", "reason", "compare", "evaluate", "create", "design", "plan"]
    simple_keywords = ["classify", "extract", "format", "convert", "list"]

    desc_lower = task_description.lower()
    if any(k in desc_lower for k in complex_keywords):
        return MODEL_MAP[TaskComplexity.COMPLEX]
    if any(k in desc_lower for k in simple_keywords):
        return MODEL_MAP[TaskComplexity.SIMPLE]
    return MODEL_MAP[TaskComplexity.MEDIUM]
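To make the savings concrete, here is a back-of-the-envelope comparison using the pricing table above. The workload numbers (requests per day, tokens per call) are hypothetical, chosen only to illustrate the scale of the difference:

```python
# Approximate per-1M-token pricing from the cost table above
COST_PER_M = {
    "gpt-4o-mini": {"input": 0.15, "output": 0.60},
    "gpt-4o": {"input": 2.50, "output": 10.00},
}

def daily_cost(model: str, requests: int = 10_000,
               input_tok: int = 2_000, output_tok: int = 500) -> float:
    """Daily cost for a hypothetical workload routed entirely to one model."""
    c = COST_PER_M[model]
    per_call = (input_tok * c["input"] + output_tok * c["output"]) / 1_000_000
    return per_call * requests

print(f"All gpt-4o:      ${daily_cost('gpt-4o'):.2f}/day")       # $100.00/day
print(f"All gpt-4o-mini: ${daily_cost('gpt-4o-mini'):.2f}/day")  # $6.00/day
```

At these (assumed) volumes, routing simple tasks to the smaller model cuts the bill by more than 90%, which is why complexity-based routing is usually the first cost lever to pull.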
                        

Caching Strategies

LLM Response Caching

import hashlib
import json
import time

class LLMCache:
    """
    Simple LLM response cache.
    In production, use Redis or a dedicated caching layer.
    """

    def __init__(self, ttl_seconds: int = 3600):
        self.cache: dict[str, dict] = {}
        self.ttl = ttl_seconds
        self.hits = 0
        self.misses = 0

    def _make_key(self, model: str, messages: list, **kwargs) -> str:
        """Create a deterministic cache key from the request."""
        key_data = json.dumps({
            "model": model,
            "messages": messages,
            **{k: v for k, v in kwargs.items() if k != "stream"}
        }, sort_keys=True)
        return hashlib.sha256(key_data.encode()).hexdigest()

    def get(self, model: str, messages: list, **kwargs) -> str | None:
        """Get a cached response if available and not expired."""
        key = self._make_key(model, messages, **kwargs)
        entry = self.cache.get(key)
        if entry and (time.time() - entry["timestamp"]) < self.ttl:
            self.hits += 1
            return entry["response"]
        if entry:
            del self.cache[key]  # evict expired entry so the dict doesn't grow unbounded
        self.misses += 1
        return None

    def set(self, model: str, messages: list, response: str, **kwargs):
        """Cache a response."""
        key = self._make_key(model, messages, **kwargs)
        self.cache[key] = {
            "response": response,
            "timestamp": time.time()
        }

    @property
    def hit_rate(self) -> float:
        total = self.hits + self.misses
        return self.hits / total if total > 0 else 0.0


# Usage with OpenAI
cache = LLMCache(ttl_seconds=3600)

def cached_llm_call(model: str, messages: list, **kwargs) -> str:
    """LLM call with caching."""
    # Only cache deterministic calls (temperature=0)
    if kwargs.get("temperature", 1.0) == 0:
        cached = cache.get(model, messages, **kwargs)
        if cached:
            print("[CACHE HIT]")
            return cached

    # Make the actual API call
    from openai import OpenAI
    client = OpenAI()
    response = client.chat.completions.create(
        model=model, messages=messages, **kwargs
    )
    result = response.choices[0].message.content

    # Cache if deterministic
    if kwargs.get("temperature", 1.0) == 0:
        cache.set(model, messages, result, **kwargs)

    return result
                        

Observability and Debugging

Agent systems are notoriously hard to debug. Essential observability tools include:

  • LangSmith: Tracing platform from LangChain. Records every LLM call, tool call, and intermediate step. Essential for debugging LangGraph agents.
  • Arize Phoenix: Open-source LLM observability. Tracks traces, evaluates quality, and identifies issues.
  • Braintrust: Evaluation and logging platform. Combines tracing with automated evals.
  • Custom Logging: At minimum, log every agent iteration: the input, the model's decision, tool calls and results, and the final output. Include timestamps and cost data.
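The custom-logging bullet can be sketched as one structured JSON record per agent iteration. This is a minimal sketch, not a standard schema; the field names are illustrative, and in production you would forward these records to your log aggregator or tracing backend:

```python
import json
import logging
import time

logging.basicConfig(level=logging.INFO, format="%(message)s")
logger = logging.getLogger("agent")

def log_iteration(step: int, user_input: str, decision: str,
                  tool_calls: list[dict], output: str, cost_usd: float) -> dict:
    """Build and emit one structured log record for a single agent iteration."""
    record = {
        "ts": time.time(),            # timestamp for latency analysis
        "step": step,                 # iteration number within the agent run
        "input": user_input,
        "decision": decision,         # what the model chose to do
        "tool_calls": tool_calls,     # e.g. [{"name": ..., "args": ..., "result": ...}]
        "output": output,
        "cost_usd": round(cost_usd, 6),
    }
    logger.info(json.dumps(record))
    return record

rec = log_iteration(
    step=1,
    user_input="What's our refund policy?",
    decision="call_tool",
    tool_calls=[{"name": "kb_search", "args": {"q": "refund policy"},
                 "result": "30-day returns"}],
    output="Our policy allows 30-day returns.",
    cost_usd=0.00042,
)
```

Because every record is a single JSON line, these logs are trivially queryable later, e.g. summing `cost_usd` per run or finding the slowest steps.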

Week 10 Summary

Key Takeaways

  • Context engineering is the art of assembling the optimal information for each LLM request. It includes system prompts, few-shot examples, retrieved context, and conversation history.
  • Agents need multiple types of memory: short-term (buffer), long-term (vector store), episodic (past interactions), semantic (knowledge), and procedural (skills).
  • MCP (Model Context Protocol) is an open standard for connecting LLMs to tools and data sources. Build once, connect to any MCP-compatible client.
  • A2A (Agent-to-Agent Protocol) enables agents to discover and communicate with each other. Complementary to MCP.
  • Multi-agent systems enable specialization and parallel work. Key patterns: supervisor, peer-to-peer, hierarchical, and debate/consensus.
  • Optimize agents through model selection, caching, parallel execution, and comprehensive observability.

Next Week Preview

In Week 11, we dive into evaluations (evals) and production AI. You will learn how to measure AI quality, detect hallucinations, and build robust production systems.