This project demonstrates an architecture where user queries are submitted via Google Cloud Pub/Sub, processed by an ADK-based agent, and the responses are published back to another Pub/Sub topic. The agent interaction is handled by a FastAPI application designed to be deployed as a Google Cloud Run service.
/Users/ptone/dev/adk/projects/pubsub-demo/
├───.env                 # Environment variables (GOOGLE_CLOUD_PROJECT, GOOGLE_CLOUD_LOCATION, etc.)
├───publish_query.py     # Utility to send a query to the 'user-queries' topic
├───pubsub_watcher.py    # Utility to watch for messages on the 'agent_response-sub' subscription
├───requirements.txt     # Python dependencies for utility scripts
├───README.md            # This file
├───Dockerfile           # Dockerfile for building the handler service
├───pubsub_handler/      # FastAPI application for handling Pub/Sub messages
│   ├───main.py          # FastAPI application logic
└───subber/              # ADK Agent implementation
    ├───__init__.py
    └───agent.py         # ADK agent logic
- Python 3.9+
- Google Cloud SDK (gcloud CLI) installed and authenticated.
- A Google Cloud Project.
- Clone the repository (if applicable) or ensure you are in the project root directory.
- Create and activate a Python virtual environment:
    python3 -m venv .venv
    source .venv/bin/activate
- Install Python dependencies:
    pip install -r requirements.txt
- Configure Environment Variables: Create a .env file in the project root with your Google Cloud Project details:
    GOOGLE_CLOUD_PROJECT=your-gcp-project-id
    GOOGLE_CLOUD_LOCATION=your-gcp-region # e.g., us-central1
    # GOOGLE_GENAI_USE_VERTEXAI=1 # If using Vertex AI for GenAI models
  Replace your-gcp-project-id and your-gcp-region with your actual project ID and preferred region.
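How the utility scripts actually read the .env file is not shown in this README (they may well use a library such as python-dotenv). As a minimal standard-library sketch of the idea, the hypothetical helpers below parse KEY=VALUE lines, skip comments, and copy the values into the process environment without overriding variables that are already set:

```python
import os
from pathlib import Path


def parse_env_text(text: str) -> dict:
    """Parse KEY=VALUE lines, skipping blank lines and '#' comment lines."""
    values = {}
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        # Drop a trailing inline comment such as "us-central1 # e.g., ...".
        value = value.split(" #", 1)[0].strip()
        values[key.strip()] = value
    return values


def load_env_file(path: str = ".env") -> dict:
    """Load a .env file into os.environ; existing variables are left untouched."""
    env_path = Path(path)
    if not env_path.exists():
        return {}
    values = parse_env_text(env_path.read_text(encoding="utf-8"))
    for key, value in values.items():
        os.environ.setdefault(key, value)
    return values
```

Calling load_env_file() from the project root would then make GOOGLE_CLOUD_PROJECT and GOOGLE_CLOUD_LOCATION available through os.environ.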
This project uses two Pub/Sub topics:
- user-queries: For publishing incoming user queries.
- agent_response: For publishing the agent's final responses.
And one subscription:
- agent_response-sub: For the pubsub_watcher.py script to listen for agent responses.
- (Another subscription, a push subscription on the user-queries topic, is created during deployment to trigger the pubsub_handler service on Cloud Run.)
Create the Pub/Sub Topics:
Use the following gcloud commands, replacing ptone-misc with your own GOOGLE_CLOUD_PROJECT value if different.
gcloud pubsub topics create user-queries --project=ptone-misc
gcloud pubsub topics create agent_response --project=ptone-misc
Create the Subscription for pubsub_watcher.py:
This subscription allows the pubsub_watcher.py script to receive messages from the agent_response topic.
gcloud pubsub subscriptions create agent_response-sub --topic=agent_response --project=ptone-misc
The publish_query.py script sends a message (your query) to the user-queries Pub/Sub topic.
Usage:
python publish_query.py "Your query string here"
Example:
python publish_query.py "How do I bake a cake?"
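The source of publish_query.py is not reproduced here, so the following is only a sketch of what such a publisher could look like, assuming the google-cloud-pubsub client library is installed. The function names topic_path and publish_query are illustrative, not taken from the project:

```python
def topic_path(project_id: str, topic_id: str) -> str:
    """Return the fully qualified Pub/Sub topic name."""
    return f"projects/{project_id}/topics/{topic_id}"


def publish_query(query: str, project_id: str, topic_id: str = "user-queries") -> str:
    """Publish the query as UTF-8 bytes and return the message ID."""
    # Imported lazily so topic_path() works without the client library installed.
    from google.cloud import pubsub_v1

    publisher = pubsub_v1.PublisherClient()
    future = publisher.publish(
        topic_path(project_id, topic_id),
        data=query.encode("utf-8"),  # Pub/Sub payloads must be bytes.
    )
    # Block until Pub/Sub confirms the publish and returns the message ID.
    return future.result()


# Example call (requires valid Google Cloud credentials):
# publish_query("How do I bake a cake?", "your-gcp-project-id")
```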
The pubsub_watcher.py script listens to the agent_response-sub subscription and prints any messages received from the agent_response topic.
Usage:
python pubsub_watcher.py
It will keep running and display messages as they arrive. Press Ctrl+C to stop.
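As a rough illustration of the watcher behaviour described above, here is a sketch of a streaming-pull listener, again assuming the google-cloud-pubsub client library. The helper names are hypothetical and the real pubsub_watcher.py may be structured differently:

```python
def subscription_path(project_id: str, subscription_id: str) -> str:
    """Return the fully qualified Pub/Sub subscription name."""
    return f"projects/{project_id}/subscriptions/{subscription_id}"


def describe_message(data: bytes) -> str:
    """Format a message payload for display, tolerating invalid UTF-8."""
    return f"Received: {data.decode('utf-8', errors='replace')}"


def watch(project_id: str, subscription_id: str = "agent_response-sub") -> None:
    """Print messages as they arrive until interrupted with Ctrl+C."""
    # Imported lazily so the helpers above work without the client library.
    from google.cloud import pubsub_v1

    subscriber = pubsub_v1.SubscriberClient()

    def callback(message) -> None:
        print(describe_message(message.data))
        message.ack()  # Acknowledge so Pub/Sub does not redeliver the message.

    streaming_pull = subscriber.subscribe(
        subscription_path(project_id, subscription_id), callback=callback
    )
    with subscriber:
        try:
            streaming_pull.result()  # Blocks while messages are delivered.
        except KeyboardInterrupt:
            streaming_pull.cancel()  # Stop pulling new messages.
            streaming_pull.result()  # Wait for the shutdown to complete.
```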
The pubsub_handler is a FastAPI application that:
- Is triggered by messages on the user-queries Pub/Sub topic.
- Processes the message using the ADK agent (subber/agent.py).
- Publishes the agent's response to the agent_response Pub/Sub topic.
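Pub/Sub push delivery wraps each message in a JSON envelope whose message.data field is base64-encoded, so the handler has to decode it before passing the query to the agent. The sketch below shows that first step only; extract_query is a hypothetical helper, and it assumes the published payload is plain UTF-8 text, which this README does not confirm:

```python
import base64


def extract_query(envelope: dict) -> str:
    """Return the decoded query text from a Pub/Sub push envelope."""
    message = envelope.get("message")
    if not isinstance(message, dict) or "data" not in message:
        raise ValueError("not a Pub/Sub push envelope: missing message.data")
    # Push delivery base64-encodes the original message bytes.
    return base64.b64decode(message["data"]).decode("utf-8")
```

Rejecting malformed envelopes early lets the endpoint return an error status instead of handing an empty query to the agent.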
Deployment Steps:
- Enable Google Cloud APIs: Ensure you have the Cloud Run API, Cloud Build API, and Pub/Sub API enabled in your Google Cloud project.
    gcloud services enable run.googleapis.com cloudbuild.googleapis.com pubsub.googleapis.com
- Deploy to Cloud Run: This command will build and deploy the service from the source code in the current directory. Replace placeholders like your-gcp-project-id and your-gcp-region as needed. The --service-account flag (commented out below) should specify a service account with permissions for Pub/Sub (e.g., Pub/Sub Publisher for the response topic, and implicitly Pub/Sub Subscriber via the push subscription). The account also needs permissions for any other Google Cloud services the ADK agent calls.
    gcloud run deploy pubsub-demo \
      --source . \
      --platform=managed \
      --region=${GOOGLE_CLOUD_LOCATION} \
      --set-env-vars="GOOGLE_CLOUD_PROJECT=${GOOGLE_CLOUD_PROJECT}" \
      --set-env-vars="GOOGLE_CLOUD_LOCATION=${GOOGLE_CLOUD_LOCATION}" \
      --set-env-vars="GOOGLE_GENAI_USE_VERTEXAI=true" \
      --project=${GOOGLE_CLOUD_PROJECT}
      # --service-account=your-service-account-email@your-gcp-project-id.iam.gserviceaccount.com \
- Create a Pub/Sub Push Subscription to Trigger Cloud Run: These commands create a new Pub/Sub subscription (user-queries-cloud-run-sub) to the user-queries topic. Messages sent to user-queries will be pushed to your deployed Cloud Run service.
  - Get the URL of your deployed Cloud Run service:
      SERVICE_URL=$(gcloud run services describe pubsub-demo --platform managed --region ${GOOGLE_CLOUD_LOCATION} --format 'value(status.url)')
      echo $SERVICE_URL
  - Create a service account for Pub/Sub to use when invoking your Cloud Run service (if you don't already have one with the run.invoker role):
      gcloud iam service-accounts create pubsub-cloud-run-invoker --display-name "Pub/Sub Cloud Run Invoker" --project=${GOOGLE_CLOUD_PROJECT}
      gcloud run services add-iam-policy-binding pubsub-demo \
        --member="serviceAccount:pubsub-cloud-run-invoker@${GOOGLE_CLOUD_PROJECT}.iam.gserviceaccount.com" \
        --role="roles/run.invoker" \
        --region=${GOOGLE_CLOUD_LOCATION} \
        --platform=managed \
        --project=${GOOGLE_CLOUD_PROJECT}
  - Create the push subscription:
      gcloud pubsub subscriptions create user-queries-cloud-run-sub \
        --topic=user-queries \
        --push-endpoint=${SERVICE_URL}/pubsub/push \
        --push-auth-service-account=pubsub-cloud-run-invoker@${GOOGLE_CLOUD_PROJECT}.iam.gserviceaccount.com \
        --project=${GOOGLE_CLOUD_PROJECT}
    Ensure the /pubsub/push path matches the endpoint defined in your pubsub_handler/main.py FastAPI application.
- Testing the Deployed Service:
  - Use publish_query.py to send a message to the user-queries topic:
      python publish_query.py "Tell me a joke"
  - Use pubsub_watcher.py (running in a separate terminal) to see the agent's response from the agent_response topic.
  - You can also check the logs for your pubsub-demo service in the Google Cloud Console (Cloud Run section).
You can run the pubsub_handler locally for testing:
- Ensure GOOGLE_CLOUD_PROJECT is set in your environment or .env file.
- Navigate to the pubsub_handler directory:
    cd pubsub_handler
- Install dependencies:
    pip install -r requirements.txt
- Run with Uvicorn:
    uvicorn main:app --host 0.0.0.0 --port 8080 --reload
You would then need a way to send a POST request to http://localhost:8080/pubsub/push mimicking a Pub/Sub push message. Tools like curl or Postman can be used for this.
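As an alternative to curl or Postman, a short standard-library script can mimic a push delivery against the local server. This is a sketch: the subscription name and messageId are made-up placeholder values, and the fields the handler actually requires depend on pubsub_handler/main.py, which is not shown here.

```python
import base64
import json
import urllib.request


def build_push_envelope(
    query: str,
    subscription: str = "projects/local/subscriptions/local-test",  # placeholder
) -> dict:
    """Build a JSON body shaped like a Pub/Sub push request."""
    return {
        "message": {
            # Pub/Sub base64-encodes the message bytes in push requests.
            "data": base64.b64encode(query.encode("utf-8")).decode("ascii"),
            "messageId": "1",  # placeholder ID for local testing
        },
        "subscription": subscription,
    }


def post_envelope(envelope: dict, url: str = "http://localhost:8080/pubsub/push") -> int:
    """POST the envelope as JSON and return the HTTP status code."""
    request = urllib.request.Request(
        url,
        data=json.dumps(envelope).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request) as response:
        return response.status


# With the Uvicorn server running:
# post_envelope(build_push_envelope("Tell me a joke"))
```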
To avoid ongoing charges, delete the created resources:
# Delete Cloud Run Service
gcloud run services delete pubsub-demo --platform=managed --region=${GOOGLE_CLOUD_LOCATION} --project=${GOOGLE_CLOUD_PROJECT} --quiet
# Delete Pub/Sub Subscriptions
gcloud pubsub subscriptions delete user-queries-cloud-run-sub --project=${GOOGLE_CLOUD_PROJECT} --quiet
gcloud pubsub subscriptions delete agent_response-sub --project=${GOOGLE_CLOUD_PROJECT} --quiet
# Delete Pub/Sub Topics
gcloud pubsub topics delete user-queries --project=${GOOGLE_CLOUD_PROJECT} --quiet
gcloud pubsub topics delete agent_response --project=${GOOGLE_CLOUD_PROJECT} --quiet
# Delete Service Account (if created)
gcloud iam service-accounts delete pubsub-cloud-run-invoker@${GOOGLE_CLOUD_PROJECT}.iam.gserviceaccount.com --project=${GOOGLE_CLOUD_PROJECT} --quiet
Remember to replace ${GOOGLE_CLOUD_PROJECT} and ${GOOGLE_CLOUD_LOCATION} with your actual values if they are not set as environment variables in your shell.