📁 Project Structure
Each agent is a self-contained service with its own agent.py (the ADK agent definition) and adk_app.py (a thin wrapper that invokes the shared CLI). The web app does not import ADK; it talks to the Orchestrator service over HTTP/SSE.
projects/bookshelf/
├── pyproject.toml ← deps pinned to ADK 1.27.x + a2a-sdk 0.3.x
├── start.bat ← spawns 5 services on Windows
├── stop.bat ← kills processes on ports 8000-8004
├── README.md
├── .env.example
├── shared/ ← Course Creator's shared infra verbatim
│   ├── adk_app.py ← click CLI wrapping ADK's fast_api.get_fast_api_app
│   ├── a2a_utils.py ← agent-card URL rewriting middleware
│   └── authenticated_httpx.py ← Cloud Run identity-token client
├── agents/
│   ├── researcher/
│   │   ├── agent.py ← Agent + FunctionTool calling data_tool
│   │   ├── data_tool.py ← pandas: per-SKU + seasonal + aging metrics
│   │   ├── adk_app.py ← thin wrapper to shared.adk_app.main
│   │   └── Dockerfile
│   ├── judge/
│   │   ├── agent.py ← Pydantic JudgeVerdict structured output
│   │   ├── adk_app.py
│   │   └── Dockerfile
│   ├── content_builder/
│   │   ├── agent.py ← writes the final Markdown brief
│   │   └── adk_app.py
│   └── orchestrator/
│       ├── agent.py ← LoopAgent + SequentialAgent + EscalationChecker
│       └── adk_app.py
├── app/ ← web app (no ADK imports)
│   ├── main.py ← FastAPI thin client over /run_sse
│   └── frontend/
│       ├── index.html ← single ask bar
│       ├── app.js ← progress strip + Markdown render
│       └── style.css
├── scripts/ ← per-agent runner .bat files
└── sample_briefs/ ← 5 captured briefs (the static demo data)
🧠 Code Walkthrough — The Five Important Files
Skip the boilerplate. These are the files that show how the system actually works.
1️⃣ Researcher — calls a deterministic pandas tool
The Researcher's job is mostly to tell Gemini "call get_sales_metrics and pass the result through verbatim." All the analytical work happens in data_tool.py — exact aggregates, exact rankings, exact aging classification.
import os

from google.adk.agents import Agent
from google.adk.tools import FunctionTool

from data_tool import analyse_sales_data_summary

MODEL = os.environ.get("BOOKSHELF_MODEL", "gemini-2.5-flash")


def get_sales_metrics() -> str:
    """Read the bookshelf sales dataset and return structured metrics as a JSON string."""
    return analyse_sales_data_summary(top_n=30)


# Wrap the plain Python function so the model can call it as a tool.
sales_metrics_tool = FunctionTool(func=get_sales_metrics)

researcher = Agent(
    name="researcher",
    model=MODEL,
    description=("Reads the bookshelf sales dataset and produces structured metrics."),
    instruction=(
        "You are the Data Researcher for Bookshelf...\n"
        "1. Call the `get_sales_metrics` tool to load the latest sales analytics.\n"
        "2. Return the JSON string from the tool VERBATIM as your response."
    ),
    tools=[sales_metrics_tool],
)

root_agent = researcher
The data_tool is what makes the numbers exact
The agent prompt does the routing; the pandas code does the math. This is the difference between Bookshelf and a chat-on-document tool — aggregates are computed before the LLM ever sees them.
# Most recent order date per product.
last_sale = df.groupby("Product")["OrderDate"].max().reset_index()
last_sale.columns = ["Product", "_last_sale_dt"]
sku_agg = sku_agg.merge(last_sale, on="Product", how="left")

# Age is measured against the newest date in the dataset, not against today's date.
data_max_date = df["OrderDate"].max()
sku_agg["days_since_last_sale"] = (data_max_date - sku_agg["_last_sale_dt"]).dt.days.astype(int)


def _aging_class(days: int) -> str:
    if days <= 30:
        return "fresh"
    if days <= 90:
        return "slowing"
    if days <= 180:
        return "stale"
    return "stuck"


sku_agg["aging_class"] = sku_agg["days_since_last_sale"].apply(_aging_class)
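To make the thresholds concrete, here is a small standard-library sketch of the same classification. The SKU names and dates are invented for illustration, and a plain dict stands in for the pandas DataFrame. Only the 30/90/180-day boundaries and the choice of the dataset's newest date as the reference point come from the snippet above.

```python
from datetime import date


def aging_class(days: int) -> str:
    """Same thresholds as _aging_class in data_tool.py."""
    if days <= 30:
        return "fresh"
    if days <= 90:
        return "slowing"
    if days <= 180:
        return "stale"
    return "stuck"


# Hypothetical last-sale dates per SKU (illustrative, not from the real dataset).
last_sale = {
    "SKU-A": date(2024, 12, 20),
    "SKU-B": date(2024, 10, 15),
    "SKU-C": date(2024, 8, 1),
    "SKU-D": date(2024, 3, 1),
}

# The reference point is the newest date in the data, so results do not
# drift depending on the day the analysis happens to run.
data_max_date = max(last_sale.values())

aging = {
    sku: aging_class((data_max_date - sold_on).days)
    for sku, sold_on in last_sale.items()
}
```

Because the boundaries are inclusive, a SKU that last sold exactly 90 days before the newest order is still "slowing"; it becomes "stale" on day 91.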
2️⃣ Judge — Pydantic structured output as a quality gate
The Judge does not free-text its verdict. It emits a typed JudgeVerdict with status / issues / confidence / feedback. The orchestrator's EscalationChecker reads the status field of the stored verdict directly, with no string parsing and no regex.
from typing import Literal

from google.adk.agents import Agent
from pydantic import BaseModel, Field

# MODEL is assumed to be defined as in the Researcher snippet (not shown here).


class JudgeVerdict(BaseModel):
    status: Literal["pass", "fail"] = Field(...)
    issues: list[str] = Field(default_factory=list)
    confidence: float = Field(ge=0.0, le=1.0)
    feedback: str = Field(...)


judge = Agent(
    name="judge",
    model=MODEL,
    description=("Validates the Researcher's metrics for completeness and red flags."),
    instruction=(
        "You are a strict, independent forensic auditor...\n"
        "Check completeness, data quality, plausibility, coverage, duplicate risk.\n"
        "Return JudgeVerdict. Set status='pass' unless serious problems."
    ),
    # Constrain the model's reply to the JudgeVerdict schema.
    output_schema=JudgeVerdict,
    disallow_transfer_to_parent=True,
    disallow_transfer_to_peers=True,
)

root_agent = judge
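The value of the schema is easiest to see outside ADK. The sketch below, assuming Pydantic v2, validates a few hand-written payloads against the same JudgeVerdict model. The parse_verdict helper and the sample payloads are hypothetical and exist only for this illustration.

```python
from typing import Literal, Optional

from pydantic import BaseModel, Field, ValidationError


class JudgeVerdict(BaseModel):
    status: Literal["pass", "fail"] = Field(...)
    issues: list[str] = Field(default_factory=list)
    confidence: float = Field(ge=0.0, le=1.0)
    feedback: str = Field(...)


def parse_verdict(raw: dict) -> Optional[JudgeVerdict]:
    """Hypothetical helper: return a validated verdict, or None if the payload is invalid."""
    try:
        return JudgeVerdict.model_validate(raw)
    except ValidationError:
        return None


# A well-formed verdict; `issues` falls back to an empty list.
good = parse_verdict({"status": "pass", "confidence": 0.9, "feedback": "Looks complete."})

# Rejected: status must be exactly "pass" or "fail".
bad_status = parse_verdict({"status": "maybe", "confidence": 0.9, "feedback": "Unsure."})

# Rejected: confidence must lie in [0.0, 1.0].
bad_confidence = parse_verdict({"status": "fail", "confidence": 1.7, "feedback": "Too sure."})
```

Anything that is not a clean "pass" or "fail" never reaches the loop logic as a valid verdict, which is what lets the downstream check stay a simple equality test.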
3️⃣ Orchestrator — the LoopAgent + EscalationChecker pattern
This is the workshop's core "specialised agents + quality loop" pattern. LoopAgent retries the Researcher → Judge → EscalationChecker chain up to 3 times. The EscalationChecker is a short, deterministic Python class that decides whether to break the loop. Not every component needs an LLM.
import os

from google.adk.agents import BaseAgent, LoopAgent, SequentialAgent
from google.adk.agents.remote_a2a_agent import RemoteA2aAgent
from google.adk.events import Event, EventActions

# create_save_output_callback, create_authenticated_client, researcher_url, judge and
# content_builder are defined elsewhere in the project (not shown in this excerpt).

researcher = RemoteA2aAgent(
    name="researcher",
    agent_card=os.environ["RESEARCHER_AGENT_CARD_URL"],
    after_agent_callback=create_save_output_callback("research_findings"),
    httpx_client=create_authenticated_client(researcher_url),
)


class EscalationChecker(BaseAgent):
    async def _run_async_impl(self, ctx):
        feedback = ctx.session.state.get("judge_feedback") or {}
        is_pass = isinstance(feedback, dict) and feedback.get("status") == "pass"
        if is_pass:
            # escalate=True tells the enclosing LoopAgent to stop iterating.
            yield Event(author=self.name, actions=EventActions(escalate=True))
        else:
            # A plain event lets the loop run another iteration.
            yield Event(author=self.name)


research_loop = LoopAgent(
    name="research_loop",
    sub_agents=[researcher, judge, EscalationChecker(name="escalation_checker")],
    max_iterations=3,
)

root_agent = SequentialAgent(
    name="bookshelf_pipeline",
    sub_agents=[research_loop, content_builder],
)
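The control flow can be illustrated without ADK. What follows is a simplified standard-library simulation, not ADK's actual implementation: run_loop and the fake judge callables are hypothetical stand-ins, and only the "escalate on pass, otherwise retry up to max_iterations" behaviour mirrors the snippet above.

```python
def should_escalate(state: dict) -> bool:
    """Same decision as EscalationChecker: break the loop only on an explicit pass."""
    feedback = state.get("judge_feedback") or {}
    return isinstance(feedback, dict) and feedback.get("status") == "pass"


def run_loop(judge, max_iterations: int = 3):
    """Hypothetical stand-in for LoopAgent. Returns (iterations_used, passed)."""
    state = {}
    for iteration in range(1, max_iterations + 1):
        state["research_findings"] = f"metrics, attempt {iteration}"  # Researcher step
        state["judge_feedback"] = judge(iteration)                    # Judge step
        if should_escalate(state):                                    # EscalationChecker step
            return iteration, True
    return max_iterations, False


# The fake judge fails the first attempt and passes the second.
passes_on_second = run_loop(lambda i: {"status": "pass" if i >= 2 else "fail"})

# The fake judge never passes, so the loop stops at max_iterations.
never_passes = run_loop(lambda i: {"status": "fail"})

# A bare string is not a verdict dict, so it is treated as a failure.
malformed = run_loop(lambda i: "pass")
```

The last case shows why the isinstance check matters: a malformed verdict counts as a failure rather than silently ending the loop.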
4️⃣ Web App — thin HTTP/SSE client (no ADK imports)
This is the most important architectural decision. The web app does not instantiate a Runner in-process; it speaks HTTP/SSE to the orchestrator service. As a result, the web app and the agents can scale independently, and the web app stays small.
import json

from httpx_sse import aconnect_sse
from google.genai import types as genai_types


async def query_agent(origin, agent, user_id, message, session_id):
    # get_client is the web app's own helper (not shown in this excerpt).
    client = await get_client(origin)
    request = {
        "appName": agent,
        "userId": user_id,
        "sessionId": session_id,
        "newMessage": {"role": "user", "parts": [{"text": message}]},
        "streaming": False,
    }
    # POST to the orchestrator's /run_sse endpoint and yield each event as a dict.
    async with aconnect_sse(client, "POST", f"{origin}/run_sse", json=request) as es:
        async for server_event in es.aiter_sse():
            yield server_event.json()


# Excerpt from the request handler. `events` is assumed to be the async iterator
# returned by query_agent(...); the rest of the handler is not shown.
async def event_generator():
    final_text = ""
    async for event in events:
        if event.get("author") == "researcher":
            yield json.dumps({"type": "progress", "text": "🔍 Researcher reading data..."}) + "\n"
        if event.get("author") == "content_builder" and event.get("content"):
            content = genai_types.Content.model_validate(event["content"])
            final_text += content.parts[0].text or ""
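The event-to-progress mapping can be exercised without a running orchestrator. The sketch below is a standard-library approximation: fake_events stands in for the dicts yielded by query_agent, and the closing "result" message type is a hypothetical addition, since the excerpt above does not show how final_text is sent to the browser.

```python
import asyncio
import json


async def fake_events():
    """Hypothetical stand-in for the event dicts yielded by query_agent."""
    yield {"author": "researcher", "content": {"parts": [{"text": "{}"}]}}
    yield {"author": "judge"}
    yield {"author": "content_builder", "content": {"parts": [{"text": "# Brief\n"}]}}
    yield {"author": "content_builder", "content": {"parts": [{"text": "Push SKU-A."}]}}


async def to_ndjson(events):
    """Turn agent events into newline-delimited JSON lines for the browser."""
    final_text = ""
    async for event in events:
        author = event.get("author")
        if author == "researcher":
            yield json.dumps({"type": "progress", "text": "🔍 Researcher reading data..."}) + "\n"
        if author == "content_builder" and event.get("content"):
            # Concatenate every text part; parts without text are skipped.
            for part in event["content"].get("parts", []):
                final_text += part.get("text") or ""
    # Assumed final message carrying the assembled Markdown brief.
    yield json.dumps({"type": "result", "text": final_text}) + "\n"


async def collect():
    return [json.loads(line) async for line in to_ndjson(fake_events())]


lines = asyncio.run(collect())
```

Each yielded string is one complete JSON document terminated by a newline, so the frontend can split the stream on "\n" and parse each line independently.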
5️⃣ Content Builder — reasons over the metrics + question
Receives research_findings (the metrics JSON), judge_feedback (the verdict), and the user's question via session state. Classifies SKUs into the 6-bucket taxonomy and writes a focused 600-word brief.
instruction=(
"You are the Bookshelf business advisor for a Malaysian SME book shop owner.\n\n"
"INPUTS (in session state):\n"
"- 'research_findings' — JSON with sku_metrics, seasonal_indices, channel_breakdown, etc.\n"
"- 'judge_feedback' — the Judge's verdict\n"
"- The user's original message — what the shop owner asked.\n\n"
"Classify relevant SKUs into one of 6 actions:\n"
"- push: top-quartile revenue, healthy margin (>30%), velocity > 50 u/mo\n"
"- hold: steady, no concerning signals\n"
"- drop: declining velocity OR margin <15%\n"
"- restock-seasonal: clear seasonal_indices peak in next 90 days\n"
"- discontinue: velocity <3 u/mo AND no future demand (e.g. discontinued exam syllabus,\n"
" or aging_class='stale'/'stuck' meaning >90 days since last sale)\n"
"- source-similar: category gap visible\n\n"
"AGING SIGNAL: each SKU has aging_class (fresh/slowing/stale/stuck)...\n"
"STYLE: Plain English. Every recommendation cites a specific RM/%/units/month.\n"
"Return ONLY the Markdown brief — no preamble, no closing remarks."
)
🔧 Implementation Notes Worth Mentioning
Pinning ADK to a known-good version
Initial build used google-adk >= 0.1.0 with no upper bound. ADK 1.28+ removed APIs the code depended on. Fix: pin to >= 1.27.4, < 2.0.0 — same as the Course Creator reference build.
UTF-8 mode on Windows
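ADK's CLI calls write_text() without an explicit encoding, so Python falls back to the platform default, which on Windows was cp1252. Em-dashes in agent instructions were written as the single byte 0x97, and reading the file back as UTF-8 then crashed with a decode error. Fix: set PYTHONUTF8=1 in start.bat before any Python call.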
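The failure is easy to reproduce in isolation. This short sketch encodes an em-dash with cp1252 and then tries to decode the result as UTF-8; the decodes_as_utf8 helper is a name invented for this illustration.

```python
EM_DASH = "\u2014"

# cp1252 stores an em-dash as the single byte 0x97.
cp1252_bytes = EM_DASH.encode("cp1252")

# UTF-8 needs three bytes for the same character.
utf8_bytes = EM_DASH.encode("utf-8")


def decodes_as_utf8(data: bytes) -> bool:
    """Return True if the bytes are valid UTF-8."""
    try:
        data.decode("utf-8")
        return True
    except UnicodeDecodeError:
        return False


# 0x97 is a continuation byte in UTF-8 and cannot start a character,
# so the cp1252 output is rejected by a UTF-8 reader.
cp1252_is_valid_utf8 = decodes_as_utf8(cp1252_bytes)
utf8_is_valid_utf8 = decodes_as_utf8(utf8_bytes)
```

With PYTHONUTF8=1, Python's UTF-8 mode makes UTF-8 the default for text file I/O, so the write and the later read agree on the encoding.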
Localhost auth bypass
Course Creator's authenticated_httpx requires Google Cloud identity tokens for inter-agent calls. That is ideal on Cloud Run, but local dev fails because fetch_id_token_credentials cannot authenticate without Application Default Credentials (ADC). Fix: wrap the client factory so that when the URL host is localhost or 127.0.0.1 it returns a plain httpx.AsyncClient, and otherwise it calls create_authenticated_client.
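One way to structure such a wrapper is sketched below using only the standard library. The names is_local_url and make_client are hypothetical, and the two factories are passed in as arguments so the example runs without httpx or Google credentials. In the project they would correspond to httpx.AsyncClient and create_authenticated_client.

```python
from urllib.parse import urlparse

LOCAL_HOSTS = {"localhost", "127.0.0.1"}


def is_local_url(url: str) -> bool:
    """True when the URL's host is exactly localhost or 127.0.0.1."""
    return urlparse(url).hostname in LOCAL_HOSTS


def make_client(url, plain_factory, authenticated_factory):
    """Pick the unauthenticated client for local dev, the authenticated one otherwise."""
    if is_local_url(url):
        return plain_factory()
    return authenticated_factory(url)


# Strings stand in for real client objects in this illustration.
local = make_client(
    "http://localhost:8001",
    plain_factory=lambda: "plain",
    authenticated_factory=lambda u: f"authenticated:{u}",
)
remote = make_client(
    "https://researcher.example.com",
    plain_factory=lambda: "plain",
    authenticated_factory=lambda u: f"authenticated:{u}",
)
```

Comparing the parsed hostname, rather than searching the URL for the substring "localhost", avoids bypassing auth for a host such as localhost.evil.example.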
Aging slice in the trim window
Sending all 385 SKUs to the LLM blows context. Default trim is top 30 + bottom 30 by revenue. But aging-stale SKUs (e.g. Pakej UPSR Lengkap at RM 6,560) sit mid-revenue and got dropped. Fix: analyse_sales_data_summary also keeps every SKU with aging_class in {slowing, stale, stuck}, deduplicated.
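The trimming rule can be sketched in plain Python as follows. This is an illustration under assumptions, not the project's analyse_sales_data_summary: the trim_skus function, the record field names, and the seven sample SKUs are all invented, and lists of dicts replace the pandas DataFrame.

```python
KEEP_AGING = {"slowing", "stale", "stuck"}


def trim_skus(skus, top_n=30, bottom_n=30):
    """Keep top and bottom SKUs by revenue, plus every aging SKU, without duplicates."""
    ranked = sorted(skus, key=lambda s: s["revenue"], reverse=True)
    top = ranked[:top_n]
    bottom = ranked[-bottom_n:] if bottom_n else []
    aging = [s for s in ranked if s["aging_class"] in KEEP_AGING]

    kept, seen = [], set()
    for sku in top + bottom + aging:
        if sku["product"] not in seen:  # a SKU may qualify under several rules
            seen.add(sku["product"])
            kept.append(sku)
    return kept


# Hypothetical data: P4 is a mid-revenue stale SKU, P7 is a low-revenue stuck SKU.
sample = [
    {"product": f"P{i}", "revenue": 800 - 100 * i, "aging_class": "fresh"}
    for i in range(1, 8)
]
sample[3]["aging_class"] = "stale"
sample[6]["aging_class"] = "stuck"

kept_names = [s["product"] for s in trim_skus(sample, top_n=2, bottom_n=2)]
```

With a window of two at each end, P4 would be lost by revenue ranking alone; the aging rule brings it back, while P7 appears only once even though it matches both the bottom slice and the aging slice.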