🔍 About This Code Showcase
This page walks through the five files that define the multi-agent pipeline: the four agents (Researcher, Judge, Content Builder, Orchestrator) and the shared authenticated HTTP client that lets them call each other securely on Cloud Run.
The pattern is reusable — swap the agent prompts and you have a generic gather → validate → build → deliver system. Deployment scripts and Dockerfiles are in the GitHub repo.
📁 Project Structure
course-creator/
├── agents/
│   ├── researcher/          # Researcher agent: Google Search
│   │   ├── agent.py
│   │   ├── adk_app.py
│   │   └── Dockerfile
│   ├── judge/               # Judge agent: Pydantic-structured verdict
│   │   ├── agent.py
│   │   └── Dockerfile
│   ├── content_builder/     # Content Builder: synthesises the course
│   │   ├── agent.py
│   │   └── Dockerfile
│   └── orchestrator/        # Orchestrator: wires the others together
│       ├── agent.py
│       └── Dockerfile
├── app/                     # FastAPI web frontend (5th Cloud Run service)
│   ├── main.py
│   ├── frontend/
│   └── Dockerfile
├── shared/                  # Files symlinked into every agent
│   ├── a2a_utils.py
│   ├── adk_app.py
│   └── authenticated_httpx.py
├── deploy.sh                # One-shot deploy of all 5 Cloud Run services
├── run_local.sh             # Start all agents + app locally on ports 8000–8004
└── pyproject.toml
🔍 Researcher Agent — Google Search Tool
The Researcher is the simplest of the four. It is a standard ADK Agent with one tool: google_search. Its only job is to gather web information and produce a clear summary that the Judge will then evaluate.
from google.adk.agents import Agent
from google.adk.tools.google_search_tool import google_search

MODEL = "gemini-2.5-pro"

researcher = Agent(
    name="researcher",
    model=MODEL,
    description="Gathers information on a topic using Google Search.",
    instruction="""
    You are an expert researcher. Your goal is to find comprehensive and
    accurate information on the user's topic.
    Use the `google_search` tool to find relevant information.
    Summarize your findings clearly.
    If you receive feedback that your research is insufficient,
    use the feedback to refine your next search.
    """,
    tools=[google_search],
)

root_agent = researcher
⚖️ Judge Agent — Pydantic-Structured Verdict
The Judge is the quality gate. Instead of returning free-form prose, it returns a JudgeFeedback object (a Pydantic model) whose status field is either pass or fail. This fixed shape is what lets the Orchestrator branch the loop on a single field instead of interpreting free-form text.
from typing import Literal

from google.adk.agents import Agent
from pydantic import BaseModel, Field

MODEL = "gemini-2.5-pro"


class JudgeFeedback(BaseModel):
    """Structured feedback from the Judge agent."""

    status: Literal["pass", "fail"] = Field(
        description="Whether the research is sufficient ('pass') or needs more work ('fail')."
    )
    feedback: str = Field(
        description="Detailed feedback on what is missing. If 'pass', a brief confirmation."
    )


judge = Agent(
    name="judge",
    model=MODEL,
    description="Evaluates research findings for completeness and accuracy.",
    instruction="""
    You are a strict editor.
    Evaluate the 'research_findings' against the user's original request.
    If the findings are missing key info, return status='fail'.
    If they are comprehensive, return status='pass'.
    """,
    output_schema=JudgeFeedback,
    disallow_transfer_to_parent=True,
    disallow_transfer_to_peers=True,
)

root_agent = judge
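To illustrate why the fixed verdict shape matters, here is a minimal sketch of how a caller might decode the Judge's JSON output and branch on it. It uses only the standard library; `parse_verdict` and `research_is_sufficient` are hypothetical helper names, not part of ADK or the repo, and the sample payload is invented for illustration.

```python
import json
from typing import Optional

# The only two values the JudgeFeedback schema allows for `status`.
VALID_STATUSES = {"pass", "fail"}


def parse_verdict(raw_text: str) -> Optional[dict]:
    """Return the verdict as a dict, or None if the text is not a valid verdict."""
    try:
        data = json.loads(raw_text)
    except json.JSONDecodeError:
        return None
    if not isinstance(data, dict) or data.get("status") not in VALID_STATUSES:
        return None
    return data


def research_is_sufficient(raw_text: str) -> bool:
    """True only when the verdict decodes cleanly and its status is 'pass'."""
    verdict = parse_verdict(raw_text)
    return verdict is not None and verdict["status"] == "pass"


# Hypothetical payload in the shape JudgeFeedback describes.
example = '{"status": "fail", "feedback": "No sources after 2023."}'
print(research_is_sufficient(example))  # False
```

Anything that fails to decode is treated as "not a pass", so a malformed reply can never end the research loop early.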
🎯 Orchestrator — A2A Composition with LoopAgent
This is the heart of the system. The Orchestrator does not run the other agents in-process — it calls them as remote services using RemoteA2aAgent. It then composes them with LoopAgent (research + judge cycle) and SequentialAgent (loop → content builder).
import os

from google.adk.agents import BaseAgent, LoopAgent, SequentialAgent
from google.adk.agents.invocation_context import InvocationContext
from google.adk.agents.remote_a2a_agent import RemoteA2aAgent
from google.adk.events import Event, EventActions

from authenticated_httpx import create_authenticated_client

# NOTE: create_save_output_callback(key) is not shown in this excerpt. Judging by its
# use below, it saves the agent's output into session state under `key`.

# Each remote agent is located by its agent card URL; the defaults target local runs.
researcher_url = os.environ.get(
    "RESEARCHER_AGENT_CARD_URL",
    "http://localhost:8001/a2a/agent/.well-known/agent-card.json",
)
researcher = RemoteA2aAgent(
    name="researcher",
    agent_card=researcher_url,
    description="Gathers information using Google Search.",
    after_agent_callback=create_save_output_callback("research_findings"),
    httpx_client=create_authenticated_client(researcher_url),
)

judge_url = os.environ.get(
    "JUDGE_AGENT_CARD_URL",
    "http://localhost:8002/a2a/agent/.well-known/agent-card.json",
)
judge = RemoteA2aAgent(
    name="judge",
    agent_card=judge_url,
    description="Evaluates research.",
    after_agent_callback=create_save_output_callback("judge_feedback"),
    httpx_client=create_authenticated_client(judge_url),
)

content_builder_url = os.environ.get(
    "CONTENT_BUILDER_AGENT_CARD_URL",
    "http://localhost:8003/a2a/agent/.well-known/agent-card.json",
)
content_builder = RemoteA2aAgent(
    name="content_builder",
    agent_card=content_builder_url,
    description="Builds the course.",
    httpx_client=create_authenticated_client(content_builder_url),
)


class EscalationChecker(BaseAgent):
    """Checks the judge's feedback and escalates (breaks the loop) if it passed."""

    async def _run_async_impl(self, ctx: InvocationContext):
        feedback = ctx.session.state.get("judge_feedback")
        is_pass = isinstance(feedback, dict) and feedback.get("status") == "pass"
        if is_pass:
            # escalate=True tells the enclosing LoopAgent to stop iterating.
            yield Event(author=self.name, actions=EventActions(escalate=True))
        else:
            # No escalation: the loop runs again, up to max_iterations.
            yield Event(author=self.name)


escalation_checker = EscalationChecker(name="escalation_checker")

research_loop = LoopAgent(
    name="research_loop",
    description="Iteratively researches and judges until quality standards are met.",
    sub_agents=[researcher, judge, escalation_checker],
    max_iterations=3,
)

root_agent = SequentialAgent(
    name="course_creation_pipeline",
    description="A pipeline that researches a topic and then builds a course from it.",
    sub_agents=[research_loop, content_builder],
)
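The control flow that LoopAgent and SequentialAgent express above can be simulated in plain Python. This is only a sketch of the semantics as this page describes them, not ADK code: `run_pipeline` and the three `fake_*` stubs are hypothetical stand-ins for the remote agents, and the `state` dict plays the role of session state.

```python
from typing import Callable, Optional


def run_pipeline(
    topic: str,
    research: Callable[[str, Optional[str]], str],
    judge: Callable[[str, str], dict],
    build: Callable[[str], str],
    max_iterations: int = 3,
) -> str:
    """Research/judge loop with a bounded retry count, followed by a build step."""
    state: dict = {}
    for _ in range(max_iterations):
        # Pass the previous verdict's feedback (if any) to the next research attempt.
        prior = state.get("judge_feedback")
        hint = prior["feedback"] if prior else None
        state["research_findings"] = research(topic, hint)
        state["judge_feedback"] = judge(topic, state["research_findings"])
        if state["judge_feedback"].get("status") == "pass":
            break  # plays the role of the escalate=True event
    # The build step runs whether the loop passed or simply ran out of iterations.
    return build(state["research_findings"])


# Stub agents: the judge rejects the first attempt and accepts the second.
attempts = []


def fake_research(topic: str, hint: Optional[str]) -> str:
    attempts.append(hint)
    return f"notes on {topic} (attempt {len(attempts)})"


def fake_judge(topic: str, findings: str) -> dict:
    if len(attempts) < 2:
        return {"status": "fail", "feedback": "add recent sources"}
    return {"status": "pass", "feedback": "looks complete"}


def fake_build(findings: str) -> str:
    return f"COURSE built from: {findings}"


result = run_pipeline("solar power", fake_research, fake_judge, fake_build)
print(result)  # COURSE built from: notes on solar power (attempt 2)
```

In this sketch the build step still runs when the judge never passes, so `max_iterations` bounds cost rather than guaranteeing quality.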
🔐 Service-to-Service Auth — authenticated_httpx
This is what makes the A2A architecture safe in production. Every HTTP call between agents on Cloud Run carries a Google identity token in its Authorization header. The receiving service rejects calls whose token was not issued to an authorised service account.
from urllib.parse import urlparse

import httpx
from google.auth.transport.requests import AuthorizedSession, Request
from google.oauth2.id_token import fetch_id_token_credentials


def create_authenticated_client(remote_service_url: str, timeout: float = 600.0):
    """httpx.AsyncClient that signs each request with a Google identity token.

    In Cloud Run: token comes from the Compute Metadata server.
    Locally: token comes from the gcloud CLI.
    """

    class _IdentityTokenAuth(httpx.Auth):
        def __init__(self, remote_service_url: str):
            parsed_url = urlparse(remote_service_url)
            # The token audience is the service root (scheme + host), not the full card URL.
            self.root_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
            self.session = None

        def auth_flow(self, request):
            if self.session is None:
                # First request: fetch credentials for this audience and cache them.
                credentials = fetch_id_token_credentials(audience=self.root_url)
                credentials.refresh(Request())
                self.session = AuthorizedSession(credentials)
            elif not self.session.credentials.valid:
                # Identity tokens expire; refresh the cached one before reusing it.
                self.session.credentials.refresh(Request())
            id_token = self.session.credentials.token
            if id_token:
                request.headers["Authorization"] = f"Bearer {id_token}"
            yield request

    return httpx.AsyncClient(
        auth=_IdentityTokenAuth(remote_service_url),
        follow_redirects=True,
        timeout=timeout,
    )
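Two small steps in the client above are easy to miss: the token audience is derived from the agent card URL by keeping only the scheme and host, and the token travels as a Bearer header. The sketch below isolates both using only the standard library. `audience_for` and `with_bearer` are hypothetical helper names, and the Cloud Run hostname is made up for illustration.

```python
from urllib.parse import urlparse


def audience_for(agent_card_url: str) -> str:
    """Reduce an agent card URL to the service root used as the token audience."""
    parsed = urlparse(agent_card_url)
    return f"{parsed.scheme}://{parsed.netloc}"


def with_bearer(headers: dict, id_token: str) -> dict:
    """Return a copy of the headers with the identity token attached, if there is one."""
    signed = dict(headers)
    if id_token:
        signed["Authorization"] = f"Bearer {id_token}"
    return signed


# Hypothetical Cloud Run URL; real service hostnames will differ.
card_url = "https://researcher-example.a.run.app/a2a/agent/.well-known/agent-card.json"
print(audience_for(card_url))  # https://researcher-example.a.run.app
```

Because the audience ignores the path, one client built from the agent card URL can sign requests to any endpoint on the same service.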