🤝 Course Creator

Quality content built by a coordinated AI agent team

Google ADK · A2A Protocol · Gemini 2.5 Pro · Python · Cloud Run

🔍 About This Code Showcase

This page walks through the five files that define the multi-agent pipeline: the four agents (Researcher, Judge, Content Builder, Orchestrator) and the shared authenticated HTTP client that lets them call each other securely on Cloud Run.

The pattern is reusable — swap the agent prompts and you have a generic gather → validate → build → deliver system. Deployment scripts and Dockerfiles are in the GitHub repo.

📁 Project Structure

```
course-creator/
├── agents/
│   ├── researcher/            # Researcher agent — Google Search
│   │   ├── agent.py
│   │   ├── adk_app.py
│   │   └── Dockerfile
│   ├── judge/                 # Judge agent — Pydantic-structured verdict
│   │   ├── agent.py
│   │   └── Dockerfile
│   ├── content_builder/       # Content Builder — synthesises the course
│   │   ├── agent.py
│   │   └── Dockerfile
│   └── orchestrator/          # Orchestrator — wires the others together
│       ├── agent.py
│       └── Dockerfile
├── app/                       # FastAPI web frontend (5th Cloud Run service)
│   ├── main.py
│   ├── frontend/
│   └── Dockerfile
├── shared/                    # Files symlinked into every agent
│   ├── a2a_utils.py
│   ├── adk_app.py
│   └── authenticated_httpx.py
├── deploy.sh                  # One-shot deploy of all 5 Cloud Run services
├── run_local.sh               # Start all agents + app locally on ports 8000–8004
└── pyproject.toml
```

🔍 Researcher Agent — Google Search Tool

The Researcher is the simplest of the four. It is a standard ADK Agent with one tool: google_search. Its only job is to gather web information and produce a clear summary that the Judge will then evaluate.

📄 agents/researcher/agent.py
```python
from google.adk.agents import Agent
from google.adk.tools.google_search_tool import google_search

MODEL = "gemini-2.5-pro"

researcher = Agent(
    name="researcher",
    model=MODEL,
    description="Gathers information on a topic using Google Search.",
    instruction="""
    You are an expert researcher. Your goal is to find comprehensive and
    accurate information on the user's topic.

    Use the `google_search` tool to find relevant information.
    Summarize your findings clearly.

    If you receive feedback that your research is insufficient, use the
    feedback to refine your next search.
    """,
    tools=[google_search],
)

root_agent = researcher
```

⚖️ Judge Agent — Pydantic-Structured Verdict

The Judge is the quality gate. Instead of returning free-form prose, it returns a JudgeFeedback Pydantic schema with a status field set to either pass or fail. This deterministic shape is what lets the Orchestrator branch the loop without any text parsing.
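The payoff is easy to see in miniature: with a structured verdict, the Orchestrator's branch is a plain dictionary check rather than prose parsing. A small sketch with hand-written verdicts (not real Judge output):

```python
# Two example verdicts in the JudgeFeedback shape (hand-written, not model output).
passing = {"status": "pass", "feedback": "Findings are comprehensive."}
failing = {"status": "fail", "feedback": "Missing coverage of error handling."}

def should_stop_looping(judge_feedback) -> bool:
    # The same deterministic check the Orchestrator's EscalationChecker performs:
    # no regexes, no "does this prose sound approving?" heuristics.
    return isinstance(judge_feedback, dict) and judge_feedback.get("status") == "pass"

print(should_stop_looping(passing))   # True
print(should_stop_looping(failing))   # False
print(should_stop_looping(None))      # False (missing/malformed state never passes)
```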

📄 agents/judge/agent.py
```python
from typing import Literal

from google.adk.agents import Agent
from pydantic import BaseModel, Field

MODEL = "gemini-2.5-pro"


# 1. Structured output schema — the Orchestrator depends on this exact shape
class JudgeFeedback(BaseModel):
    """Structured feedback from the Judge agent."""

    status: Literal["pass", "fail"] = Field(
        description="Whether the research is sufficient ('pass') or needs more work ('fail')."
    )
    feedback: str = Field(
        description="Detailed feedback on what is missing. If 'pass', a brief confirmation."
    )


# 2. The agent itself — note the strict editor instruction and disallow_transfer flags
judge = Agent(
    name="judge",
    model=MODEL,
    description="Evaluates research findings for completeness and accuracy.",
    instruction="""
    You are a strict editor. Evaluate the 'research_findings' against the
    user's original request.

    If the findings are missing key info, return status='fail'.
    If they are comprehensive, return status='pass'.
    """,
    output_schema=JudgeFeedback,
    # Disallow delegation — this agent only emits the schema
    disallow_transfer_to_parent=True,
    disallow_transfer_to_peers=True,
)

root_agent = judge
```

🎯 Orchestrator — A2A Composition with LoopAgent

This is the heart of the system. The Orchestrator does not run the other agents in-process — it calls them as remote services using RemoteA2aAgent. It then composes them with LoopAgent (research + judge cycle) and SequentialAgent (loop → content builder).

📄 agents/orchestrator/agent.py — Remote agent connections
```python
import os

from google.adk.agents import BaseAgent, LoopAgent, SequentialAgent
from google.adk.agents.remote_a2a_agent import RemoteA2aAgent

# Shared helpers (symlinked from shared/): create_save_output_callback writes
# each agent's output into session state under the given key.
from a2a_utils import create_save_output_callback
from authenticated_httpx import create_authenticated_client

# Each remote agent is just a URL — A2A handles the rest
researcher_url = os.environ.get(
    "RESEARCHER_AGENT_CARD_URL",
    "http://localhost:8001/a2a/agent/.well-known/agent-card.json",
)
researcher = RemoteA2aAgent(
    name="researcher",
    agent_card=researcher_url,
    description="Gathers information using Google Search.",
    after_agent_callback=create_save_output_callback("research_findings"),
    httpx_client=create_authenticated_client(researcher_url),
)

judge_url = os.environ.get(
    "JUDGE_AGENT_CARD_URL",
    "http://localhost:8002/a2a/agent/.well-known/agent-card.json",
)
judge = RemoteA2aAgent(
    name="judge",
    agent_card=judge_url,
    description="Evaluates research.",
    after_agent_callback=create_save_output_callback("judge_feedback"),
    httpx_client=create_authenticated_client(judge_url),
)

content_builder_url = os.environ.get(
    "CONTENT_BUILDER_AGENT_CARD_URL",
    "http://localhost:8003/a2a/agent/.well-known/agent-card.json",
)
content_builder = RemoteA2aAgent(
    name="content_builder",
    agent_card=content_builder_url,
    description="Builds the course.",
    httpx_client=create_authenticated_client(content_builder_url),
)
```
📄 agents/orchestrator/agent.py — Custom EscalationChecker
```python
from typing import AsyncGenerator

from google.adk.agents import BaseAgent
from google.adk.agents.invocation_context import InvocationContext
from google.adk.events import Event, EventActions


# Custom BaseAgent — reads the Judge's verdict, decides whether to break the loop
class EscalationChecker(BaseAgent):
    """Checks the judge's feedback and escalates (breaks the loop) if it passed."""

    async def _run_async_impl(
        self, ctx: InvocationContext
    ) -> AsyncGenerator[Event, None]:
        feedback = ctx.session.state.get("judge_feedback")
        is_pass = isinstance(feedback, dict) and feedback.get("status") == "pass"

        if is_pass:
            # escalate=True tells the parent LoopAgent to STOP looping
            yield Event(author=self.name, actions=EventActions(escalate=True))
        else:
            # Continue — LoopAgent will run another iteration
            yield Event(author=self.name)


escalation_checker = EscalationChecker(name="escalation_checker")
```
📄 agents/orchestrator/agent.py — Pipeline composition
```python
# LoopAgent runs Researcher → Judge → EscalationChecker repeatedly
# until the EscalationChecker yields escalate=True, OR max_iterations is hit
research_loop = LoopAgent(
    name="research_loop",
    description="Iteratively researches and judges until quality standards are met.",
    sub_agents=[researcher, judge, escalation_checker],
    max_iterations=3,
)

# SequentialAgent runs the research loop, then hands off to the Content Builder
root_agent = SequentialAgent(
    name="course_creation_pipeline",
    description="A pipeline that researches a topic and then builds a course from it.",
    sub_agents=[research_loop, content_builder],
)
```

🔐 Service-to-Service Auth — authenticated_httpx

This is what makes the A2A architecture safe in production. Every HTTP call between agents on Cloud Run carries a Google identity token. The receiving service rejects calls that aren't signed by an authorised service account.
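One detail worth isolating before reading the client code: the identity token's audience must be the root URL of the receiving Cloud Run service, not the full agent-card path. A stdlib-only sketch of that derivation, using a hypothetical service URL:

```python
from urllib.parse import urlparse

# Hypothetical Cloud Run agent-card URL for the Judge service.
agent_card_url = "https://judge-abc123-uc.a.run.app/a2a/agent/.well-known/agent-card.json"

# The audience is scheme + host only; the path to the agent card is dropped.
parsed = urlparse(agent_card_url)
audience = f"{parsed.scheme}://{parsed.netloc}"

print(audience)  # https://judge-abc123-uc.a.run.app
```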

📄 shared/authenticated_httpx.py
```python
import httpx
from urllib.parse import urlparse

from google.auth.transport.requests import AuthorizedSession, Request
from google.oauth2.id_token import fetch_id_token_credentials


def create_authenticated_client(
    remote_service_url: str, timeout: float = 600.0
) -> httpx.AsyncClient:
    """httpx.AsyncClient that signs each request with a Google identity token.

    In Cloud Run: token comes from the Compute Metadata server.
    Locally: token comes from the gcloud CLI.
    """

    class _IdentityTokenAuth(httpx.Auth):
        def __init__(self, remote_service_url: str):
            # The token audience is the root URL of the target service
            parsed_url = urlparse(remote_service_url)
            self.root_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
            self.session = None

        def auth_flow(self, request):
            if self.session:
                id_token = self.session.credentials.token
            else:
                # First call — fetch ID-token credentials (metadata server in Cloud Run)
                credentials = fetch_id_token_credentials(audience=self.root_url)
                credentials.refresh(Request())
                self.session = AuthorizedSession(credentials)
                id_token = self.session.credentials.token
            if id_token:
                request.headers["Authorization"] = f"Bearer {id_token}"
            yield request

    return httpx.AsyncClient(
        auth=_IdentityTokenAuth(remote_service_url),
        follow_redirects=True,
        timeout=timeout,
    )
```

⚙️ Technical Implementation Notes

Why a LoopAgent + EscalationChecker? A LoopAgent has no built-in exit condition other than max_iterations; the only way a sub-agent can end the loop early is by yielding an event with escalate=True. The EscalationChecker exists solely to translate the Judge's structured pass verdict into that signal.

Why RemoteA2aAgent instead of in-process imports? Each agent runs as its own Cloud Run service, so it can be deployed, scaled, and debugged independently. The Orchestrator needs only each agent's card URL; the A2A protocol handles discovery and invocation.

Why Vertex AI instead of API keys? On Cloud Run the agents authenticate to Vertex AI with the service account attached to the service, so there are no long-lived API keys to store, rotate, or leak; access is governed by IAM instead.