📚 Bookshelf — Source Code

Multi-Agent Inventory Planner for Malaysian Bookstores

📁 Project Structure

Each agent is its own self-contained service with its own agent.py (the ADK agent definition) and adk_app.py (a thin wrapper invoking the shared CLI). The web app does NOT import ADK — it talks to the Orchestrator service over HTTP/SSE.

projects/bookshelf/
├── pyproject.toml              ← deps pinned to ADK 1.27.x + a2a-sdk 0.3.x
├── start.bat                   ← spawns 5 services on Windows
├── stop.bat                    ← kills processes on ports 8000-8004
├── README.md
├── .env.example
├── shared/                     ← Course Creator's shared infra verbatim
│   ├── adk_app.py              ← click CLI wrapping ADK's fast_api.get_fast_api_app
│   ├── a2a_utils.py            ← agent-card URL rewriting middleware
│   └── authenticated_httpx.py  ← Cloud Run identity-token client
├── agents/
│   ├── researcher/
│   │   ├── agent.py            ← Agent + FunctionTool calling data_tool
│   │   ├── data_tool.py        ← pandas: per-SKU + seasonal + aging metrics
│   │   ├── adk_app.py          ← thin wrapper to shared.adk_app.main
│   │   └── Dockerfile
│   ├── judge/
│   │   ├── agent.py            ← Pydantic JudgeVerdict structured output
│   │   ├── adk_app.py
│   │   └── Dockerfile
│   ├── content_builder/
│   │   ├── agent.py            ← writes the final Markdown brief
│   │   └── adk_app.py
│   └── orchestrator/
│       ├── agent.py            ← LoopAgent + SequentialAgent + EscalationChecker
│       └── adk_app.py
├── app/                        ← web app (no ADK imports)
│   ├── main.py                 ← FastAPI thin client over /run_sse
│   └── frontend/
│       ├── index.html          ← single ask bar
│       ├── app.js              ← progress strip + Markdown render
│       └── style.css
├── scripts/                    ← per-agent runner .bat files
└── sample_briefs/              ← 5 captured briefs (the static demo data)

🧠 Code Walkthrough — The Five Important Files

Skip the boilerplate. These are the files that show how the system actually works.

1️⃣ Researcher — calls a deterministic pandas tool

The Researcher's job is mostly to tell Gemini "call get_sales_metrics and pass the result through verbatim." All the analytical work happens in data_tool.py — exact aggregates, exact rankings, exact aging classification.

📄 agents/researcher/agent.py
import os

from google.adk.agents import Agent
from google.adk.tools import FunctionTool

from data_tool import analyse_sales_data_summary

# The model name can be overridden through the environment.
MODEL = os.environ.get("BOOKSHELF_MODEL", "gemini-2.5-flash")


def get_sales_metrics() -> str:
    """Read the bookshelf sales dataset and return structured metrics as a JSON string."""
    return analyse_sales_data_summary(top_n=30)


sales_metrics_tool = FunctionTool(func=get_sales_metrics)

researcher = Agent(
    name="researcher",
    model=MODEL,
    description=("Reads the bookshelf sales dataset and produces structured metrics."),
    instruction=(
        "You are the Data Researcher for Bookshelf...\n"
        "1. Call the `get_sales_metrics` tool to load the latest sales analytics.\n"
        "2. Return the JSON string from the tool VERBATIM as your response."
    ),
    tools=[sales_metrics_tool],
)

root_agent = researcher

The data_tool is what makes the numbers exact

The agent prompt does the routing; the pandas code does the math. This is the difference between Bookshelf and a chat-on-document tool — aggregates are computed before the LLM ever sees them.

📄 agents/researcher/data_tool.py — aging classification (excerpt)
# Most recent order date per product.
last_sale = df.groupby("Product")["OrderDate"].max().reset_index()
last_sale.columns = ["Product", "_last_sale_dt"]
sku_agg = sku_agg.merge(last_sale, on="Product", how="left")

# Age is measured against the newest date in the dataset, not today's date.
data_max_date = df["OrderDate"].max()
sku_agg["days_since_last_sale"] = (data_max_date - sku_agg["_last_sale_dt"]).dt.days.astype(int)


def _aging_class(days: int) -> str:
    if days <= 30:
        return "fresh"
    if days <= 90:
        return "slowing"
    if days <= 180:
        return "stale"
    return "stuck"


sku_agg["aging_class"] = sku_agg["days_since_last_sale"].apply(_aging_class)
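To make the bucket boundaries concrete, here is a small standalone sketch that applies the same 30 / 90 / 180-day thresholds to a few made-up dates, without pandas. The names `aging_class` and `classify`, the dataset end date, and the sample dates are illustrative assumptions, not taken from the project.

```python
from datetime import date


def aging_class(days: int) -> str:
    """Same thresholds as the excerpt above: 30 / 90 / 180 days."""
    if days <= 30:
        return "fresh"
    if days <= 90:
        return "slowing"
    if days <= 180:
        return "stale"
    return "stuck"


def classify(last_sale: date, data_max_date: date) -> str:
    # Age is relative to the newest order in the dataset, not to today.
    return aging_class((data_max_date - last_sale).days)


# Hypothetical dataset end date and last-sale dates.
end = date(2024, 12, 31)
samples = {
    "sold yesterday": date(2024, 12, 30),    # 1 day
    "sold 90 days ago": date(2024, 10, 2),   # upper edge of "slowing"
    "sold 91 days ago": date(2024, 10, 1),   # first day of "stale"
    "sold a year ago": date(2023, 12, 31),   # well past 180 days
}
labels = {name: classify(d, end) for name, d in samples.items()}
```

The thresholds are inclusive on the upper edge, so a SKU last sold exactly 90 days before the dataset's end is still "slowing" and only becomes "stale" on day 91.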

2️⃣ Judge — Pydantic structured output as a quality gate

The Judge does not free-text its verdict. It emits a typed JudgeVerdict with status / issues / confidence / feedback. The orchestrator's EscalationChecker reads the verdict's status field directly from session state (judge_feedback): no string parsing, no regex.

📄 agents/judge/agent.py
from typing import Literal

from google.adk.agents import Agent
from pydantic import BaseModel, Field

# NOTE: MODEL is not defined in this excerpt; presumably it is set the same
# way as in agents/researcher/agent.py.


class JudgeVerdict(BaseModel):
    status: Literal["pass", "fail"] = Field(...)
    issues: list[str] = Field(default_factory=list)
    confidence: float = Field(ge=0.0, le=1.0)
    feedback: str = Field(...)


judge = Agent(
    name="judge",
    model=MODEL,
    description=("Validates the Researcher's metrics for completeness and red flags."),
    instruction=(
        "You are a strict, independent forensic auditor...\n"
        "Check completeness, data quality, plausibility, coverage, duplicate risk.\n"
        "Return JudgeVerdict. Set status='pass' unless serious problems."
    ),
    output_schema=JudgeVerdict,  # forces structured decoding
    disallow_transfer_to_parent=True,
    disallow_transfer_to_peers=True,
)

root_agent = judge

3️⃣ Orchestrator — the LoopAgent + EscalationChecker pattern

This is the workshop's core "specialised agents + quality loop" pattern. LoopAgent runs the Researcher → Judge → EscalationChecker chain up to 3 times (max_iterations=3) and stops early once the Judge returns a pass. The EscalationChecker is a 5-line deterministic Python class that decides whether to break the loop. Not every component needs an LLM.

📄 agents/orchestrator/agent.py
import os

from google.adk.agents import BaseAgent, LoopAgent, SequentialAgent
from google.adk.agents.remote_a2a_agent import RemoteA2aAgent
from google.adk.events import Event, EventActions

# NOTE: create_save_output_callback, create_authenticated_client and
# researcher_url are defined elsewhere in the project (not shown in this excerpt).

# --- Three remote agents reachable via A2A ---
researcher = RemoteA2aAgent(
    name="researcher",
    agent_card=os.environ["RESEARCHER_AGENT_CARD_URL"],
    after_agent_callback=create_save_output_callback("research_findings"),
    httpx_client=create_authenticated_client(researcher_url),
)
# judge, content_builder defined the same way...


# --- 5-line deterministic checker that breaks the loop on pass ---
class EscalationChecker(BaseAgent):
    async def _run_async_impl(self, ctx):
        feedback = ctx.session.state.get("judge_feedback") or {}
        is_pass = isinstance(feedback, dict) and feedback.get("status") == "pass"
        if is_pass:
            # escalate=True tells the enclosing LoopAgent to stop iterating.
            yield Event(author=self.name, actions=EventActions(escalate=True))
        else:
            yield Event(author=self.name)


# --- The pipeline ---
research_loop = LoopAgent(
    name="research_loop",
    sub_agents=[researcher, judge, EscalationChecker(name="escalation_checker")],
    max_iterations=3,
)

root_agent = SequentialAgent(
    name="bookshelf_pipeline",
    sub_agents=[research_loop, content_builder],
)
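The control flow can be illustrated without ADK. The sketch below is a plain-Python stand-in for the loop, not ADK's implementation: `run_research_loop`, `should_escalate`, `fake_researcher`, `flaky_judge` and `strict_judge` are hypothetical names, and session state is modeled as an ordinary dict.

```python
from typing import Callable, Tuple

State = dict


def should_escalate(state: State) -> bool:
    """Same test as EscalationChecker: only a dict verdict with status 'pass' breaks the loop."""
    feedback = state.get("judge_feedback") or {}
    return isinstance(feedback, dict) and feedback.get("status") == "pass"


def run_research_loop(
    researcher: Callable[[State], None],
    judge: Callable[[State], None],
    max_iterations: int = 3,
) -> Tuple[int, State]:
    """Run researcher -> judge -> check until the check passes or the budget runs out."""
    state: State = {}
    for iteration in range(1, max_iterations + 1):
        researcher(state)
        judge(state)
        if should_escalate(state):
            return iteration, state
    return max_iterations, state


def fake_researcher(state: State) -> None:
    state["attempts"] = state.get("attempts", 0) + 1
    state["research_findings"] = {"attempt": state["attempts"]}


def flaky_judge(state: State) -> None:
    # Hypothetical judge: rejects the first attempt, accepts the second.
    ok = state["attempts"] >= 2
    state["judge_feedback"] = {"status": "pass" if ok else "fail"}


def strict_judge(state: State) -> None:
    # Hypothetical judge that never passes, to show the iteration cap.
    state["judge_feedback"] = {"status": "fail"}


passed_after, passed_state = run_research_loop(fake_researcher, flaky_judge)
capped_after, capped_state = run_research_loop(fake_researcher, strict_judge)
```

With the flaky judge the loop ends after the second iteration. With the strict judge it stops at the cap, and in this sketch the pipeline would move on to the next stage carrying a failing verdict in state, which is why the Content Builder also receives judge_feedback.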

4️⃣ Web App — thin HTTP/SSE client (no ADK imports)

The most important architectural decision. The web app does NOT instantiate Runner in-process. It speaks HTTP/SSE to the orchestrator service. This means the web app and the agents can scale independently, and the web app stays small.

📄 app/main.py — talking to the orchestrator over SSE
import json

from httpx_sse import aconnect_sse
from google.genai import types as genai_types


async def query_agent(origin, agent, user_id, message, session_id):
    # get_client is a project helper that is not shown in this excerpt.
    client = await get_client(origin)
    request = {
        "appName": agent,
        "userId": user_id,
        "sessionId": session_id,
        "newMessage": {"role": "user", "parts": [{"text": message}]},
        "streaming": False,
    }
    async with aconnect_sse(client, "POST", f"{origin}/run_sse", json=request) as es:
        async for server_event in es.aiter_sse():
            yield server_event.json()


# Per-author progress messages stream to the frontend as ndjson lines.
# Excerpt: `events` and `final_text` come from the enclosing scope in the full source.
async def event_generator():
    async for event in events:
        if event.get("author") == "researcher":
            yield json.dumps({"type": "progress", "text": "🔍 Researcher reading data..."}) + "\n"
        # ...etc for judge, content_builder...
        if event.get("author") == "content_builder" and event.get("content"):
            content = genai_types.Content.model_validate(event["content"])
            final_text += content.parts[0].text
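On the receiving side, each newline-terminated line is an independent JSON document. The following is a minimal sketch of that framing, assuming progress messages shaped like the one in the excerpt. `encode_line` and `decode_stream` are hypothetical helper names, and the project's frontend is JavaScript (app.js), so this Python version is only illustrative.

```python
import json
from typing import Iterable, Iterator


def encode_line(payload: dict) -> str:
    """Serialize one message as a single ndjson line."""
    return json.dumps(payload, ensure_ascii=False) + "\n"


def decode_stream(chunks: Iterable[str]) -> Iterator[dict]:
    """Reassemble ndjson messages from chunks that may split lines arbitrarily."""
    buffer = ""
    for chunk in chunks:
        buffer += chunk
        # Emit every complete line; keep the unfinished tail in the buffer.
        while "\n" in buffer:
            line, buffer = buffer.split("\n", 1)
            if line.strip():
                yield json.loads(line)
    # A final message without a trailing newline is still accepted.
    if buffer.strip():
        yield json.loads(buffer)


messages = [
    {"type": "progress", "text": "Researcher reading data..."},
    {"type": "progress", "text": "Judge reviewing metrics..."},  # hypothetical text
]
wire = "".join(encode_line(m) for m in messages)

# Simulate network chunking that cuts through the middle of a line.
chunks = [wire[:10], wire[10:25], wire[25:]]
decoded = list(decode_stream(chunks))
```

Because json.dumps escapes any newline inside a string value, a raw "\n" on the wire can only be a message boundary, which is what makes line-based splitting safe.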

5️⃣ Content Builder — reasons over the metrics + question

Receives research_findings (the metrics JSON), judge_feedback (the verdict), and the user's question via session state. Classifies SKUs into the 6-bucket taxonomy and writes a focused 600-word brief.

📄 agents/content_builder/agent.py — the 6-class taxonomy in the system prompt
instruction=(
    "You are the Bookshelf business advisor for a Malaysian SME book shop owner.\n\n"
    "INPUTS (in session state):\n"
    "- 'research_findings' — JSON with sku_metrics, seasonal_indices, channel_breakdown, etc.\n"
    "- 'judge_feedback' — the Judge's verdict\n"
    "- The user's original message — what the shop owner asked.\n\n"
    "Classify relevant SKUs into one of 6 actions:\n"
    "- push: top-quartile revenue, healthy margin (>30%), velocity > 50 u/mo\n"
    "- hold: steady, no concerning signals\n"
    "- drop: declining velocity OR margin <15%\n"
    "- restock-seasonal: clear seasonal_indices peak in next 90 days\n"
    "- discontinue: velocity <3 u/mo AND no future demand (e.g. discontinued exam syllabus,\n"
    " or aging_class='stale'/'stuck' meaning >90 days since last sale)\n"
    "- source-similar: category gap visible\n\n"
    "AGING SIGNAL: each SKU has aging_class (fresh/slowing/stale/stuck)...\n"
    "STYLE: Plain English. Every recommendation cites a specific RM/%/units/month.\n"
    "Return ONLY the Markdown brief — no preamble, no closing remarks."
)

🔧 Implementation Notes Worth Mentioning

Pinning ADK to a known-good version

Initial build used google-adk >= 0.1.0 with no upper bound. ADK 1.28+ removed APIs the code depended on. Fix: pin to >= 1.27.4, < 2.0.0 — same as the Course Creator reference build.

UTF-8 mode on Windows

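ADK's CLI calls write_text() without an explicit encoding, so Python falls back to the locale encoding, which on a Western-locale Windows machine is typically cp1252. Em-dashes in agent instructions were written as the single byte 0x97, which is not valid UTF-8, so reading the file back as UTF-8 crashed. Fix: set PYTHONUTF8=1 in start.bat before any Python call.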
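The byte-level mismatch is easy to reproduce with the standard library alone. This is a small demonstration of the encoding behaviour, not ADK's code; `can_decode_as_utf8` is a hypothetical helper.

```python
EM_DASH = "\u2014"  # the character used in the agent instruction strings

# Under cp1252 the em-dash is the single byte 0x97.
cp1252_bytes = EM_DASH.encode("cp1252")

# Under UTF-8 the same character takes three bytes.
utf8_bytes = EM_DASH.encode("utf-8")


def can_decode_as_utf8(data: bytes) -> bool:
    """Return True if the bytes form valid UTF-8."""
    try:
        data.decode("utf-8")
        return True
    except UnicodeDecodeError:
        return False


written_as_cp1252_ok = can_decode_as_utf8(cp1252_bytes)  # the failing round trip
written_as_utf8_ok = can_decode_as_utf8(utf8_bytes)      # the round trip with UTF-8 mode
```

With PYTHONUTF8=1 set, Python's UTF-8 mode makes UTF-8 the default for file I/O that omits an encoding, so both the write and the later read agree.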

Localhost auth bypass

Course Creator's authenticated_httpx requires Google Cloud identity tokens for inter-agent calls. That is ideal on Cloud Run, but local development fails because fetch_id_token_credentials cannot authenticate without Application Default Credentials (ADC). Fix: a wrapper returns a plain httpx.AsyncClient when the URL host is localhost or 127.0.0.1, and otherwise calls create_authenticated_client.
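The host check at the heart of that bypass might look like the sketch below. It uses only the standard library, and the client constructors are passed in as callables so the example runs without httpx or Google credentials. `is_local_url`, `make_client` and both factory parameters are hypothetical names, not the project's actual API.

```python
from typing import Callable, TypeVar
from urllib.parse import urlparse

Client = TypeVar("Client")

LOCAL_HOSTS = {"localhost", "127.0.0.1"}


def is_local_url(url: str) -> bool:
    """True only when the parsed hostname is exactly a known local host."""
    return (urlparse(url).hostname or "") in LOCAL_HOSTS


def make_client(
    url: str,
    plain_factory: Callable[[], Client],
    authenticated_factory: Callable[[str], Client],
) -> Client:
    """Pick an unauthenticated client for local dev, an authenticated one otherwise."""
    if is_local_url(url):
        return plain_factory()
    return authenticated_factory(url)


# Stand-in factories that return labels instead of real HTTP clients.
local_choice = make_client(
    "http://localhost:8001", lambda: "plain", lambda u: "authenticated"
)
remote_choice = make_client(
    "https://researcher.example.com", lambda: "plain", lambda u: "authenticated"
)
```

In the real project the plain factory would presumably be httpx.AsyncClient and the authenticated one the Course Creator helper. Comparing the parsed hostname exactly, rather than searching the URL string for "localhost", keeps a host such as localhost.example.com from skipping authentication.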

Aging slice in the trim window

Sending all 385 SKUs to the LLM overflows the context window. The default trim keeps the top 30 and bottom 30 SKUs by revenue, but aging-stale SKUs (e.g. Pakej UPSR Lengkap at RM 6,560) sit mid-revenue and were dropped. Fix: analyse_sales_data_summary also keeps every SKU with aging_class in {slowing, stale, stuck}, then deduplicates the combined list.
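A rough pure-Python sketch of that selection follows. The real analyse_sales_data_summary works on pandas data and is not shown in this document, so the function name `trim_skus`, the dict keys `product` and `revenue`, and the sample data are all assumptions made for illustration. Only the aging_class values come from the project.

```python
from typing import Iterable

AGING_KEEP = ("slowing", "stale", "stuck")


def trim_skus(
    skus: Iterable[dict],
    top_n: int = 30,
    bottom_n: int = 30,
    keep_aging: tuple = AGING_KEEP,
) -> list:
    """Keep top/bottom SKUs by revenue plus every aging SKU, without duplicates."""
    ranked = sorted(skus, key=lambda s: s["revenue"], reverse=True)
    selected = ranked[:top_n]
    if bottom_n:  # guard: ranked[-0:] would return the whole list
        selected = selected + ranked[-bottom_n:]
    selected = selected + [s for s in ranked if s["aging_class"] in keep_aging]

    seen = set()
    result = []
    for sku in selected:
        if sku["product"] not in seen:  # deduplicate, preserving first occurrence
            seen.add(sku["product"])
            result.append(sku)
    return result


# Hypothetical catalogue: P0 has the highest revenue, P9 the lowest.
catalogue = [
    {"product": f"P{i}", "revenue": 1000 - 100 * i, "aging_class": "fresh"}
    for i in range(10)
]
catalogue[5]["aging_class"] = "stale"  # a mid-revenue SKU that has stopped selling

with_aging = [s["product"] for s in trim_skus(catalogue, top_n=2, bottom_n=2)]
without_aging = [
    s["product"] for s in trim_skus(catalogue, top_n=2, bottom_n=2, keep_aging=())
]
```

In this toy catalogue the revenue-only trim never shows the stale mid-revenue SKU to the model, while the aging slice adds it back at the cost of one extra row.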