ptone/adk-pubsub-demo

Pub/Sub ADK Agent Demo

This project demonstrates an architecture where user queries are submitted via Google Cloud Pub/Sub, processed by an ADK-based agent, and the responses are published back to another Pub/Sub topic. The agent interaction is handled by a FastAPI application designed to be deployed as a Google Cloud Run service.

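Architectural Flow

publish_query.py → user-queries topic → push subscription (user-queries-cloud-run-sub) → pubsub_handler (Cloud Run) → ADK agent (subber/agent.py) → agent_response topic → agent_response-sub subscription → pubsub_watcher.py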

Project Structure

/Users/ptone/dev/adk/projects/pubsub-demo/
├───.env                  # Environment variables (GOOGLE_CLOUD_PROJECT, GOOGLE_CLOUD_LOCATION, etc)
├───publish_query.py      # Utility to send a query to the 'user-queries' topic
├───pubsub_watcher.py     # Utility to watch for messages on the 'agent_response-sub' subscription
├───requirements.txt      # Python dependencies for utility scripts
├───README.md             # This file
├───Dockerfile            # Dockerfile for building the handler service
├───pubsub_handler/       # FastAPI application for handling Pub/Sub messages
│   └───main.py             # FastAPI application logic
└───subber/               # ADK Agent implementation
    ├───__init__.py
    └───agent.py            # ADK agent logic

Prerequisites

  • Python 3.9+
  • Google Cloud SDK (gcloud CLI) installed and authenticated.
  • A Google Cloud Project.

Setup

  1. Clone the repository (if applicable) or ensure you are in the project root directory.

  2. Create and activate a Python virtual environment:

    python3 -m venv .venv
    source .venv/bin/activate
  3. Install Python dependencies:

    pip install -r requirements.txt
  4. Configure Environment Variables: Create a .env file in the project root with your Google Cloud Project details:

    GOOGLE_CLOUD_PROJECT=your-gcp-project-id
    GOOGLE_CLOUD_LOCATION=your-gcp-region # e.g., us-central1
    # GOOGLE_GENAI_USE_VERTEXAI=1 # If using Vertex AI for GenAI models

    Replace your-gcp-project-id and your-gcp-region with your actual project ID and preferred region.
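As a quick sanity check on the .env file, the sketch below parses KEY=VALUE lines and reports which required variables are missing. It is a simplified illustration, not the loader the project's scripts use (they may rely on a library such as python-dotenv); the helper names parse_env_lines and missing_keys are hypothetical.

```python
from typing import Dict, Iterable, List

# Variables the README asks you to define in .env.
REQUIRED_KEYS = ("GOOGLE_CLOUD_PROJECT", "GOOGLE_CLOUD_LOCATION")


def parse_env_lines(lines: Iterable[str]) -> Dict[str, str]:
    """Parse KEY=VALUE lines, skipping blank lines and '#' comment lines."""
    values: Dict[str, str] = {}
    for raw in lines:
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        # Drop a trailing inline comment such as "us-central1 # region".
        value = value.split(" #", 1)[0].strip()
        values[key.strip()] = value
    return values


def missing_keys(values: Dict[str, str]) -> List[str]:
    """Return the required keys that are absent or empty."""
    return [key for key in REQUIRED_KEYS if not values.get(key)]


# Example input mirroring the .env layout shown above (values are made up).
sample = [
    "GOOGLE_CLOUD_PROJECT=my-project",
    "GOOGLE_CLOUD_LOCATION=us-central1 # region",
    "# GOOGLE_GENAI_USE_VERTEXAI=1",
]
config = parse_env_lines(sample)
print(config)
print(missing_keys(config))
```

To check a real file, pass an open file handle instead of the sample list, for example parse_env_lines(open(".env")).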

Google Cloud Pub/Sub Setup

This project uses two Pub/Sub topics:

  • user-queries: For publishing incoming user queries.
  • agent_response: For publishing the agent's final responses.

And two subscriptions:

  • agent_response-sub: A pull subscription that the pubsub_watcher.py script uses to listen for agent responses.
  • user-queries-cloud-run-sub: A push subscription on the user-queries topic that triggers the pubsub_handler service. You create it in the deployment steps below, once the Cloud Run service exists.

Create the Pub/Sub Topics: Use the following gcloud commands, replacing the example project ID ptone-misc with your own GOOGLE_CLOUD_PROJECT value (the same applies to the subscription command further down).

gcloud pubsub topics create user-queries --project=ptone-misc
gcloud pubsub topics create agent_response --project=ptone-misc

Create the Subscription for pubsub_watcher.py: This subscription allows the pubsub_watcher.py script to receive messages from the agent_response topic.

gcloud pubsub subscriptions create agent_response-sub --topic=agent_response --project=ptone-misc

Using the Utility Scripts

1. publish_query.py

This script sends a message (your query) to the user-queries Pub/Sub topic.

Usage:

python publish_query.py "Your query string here"

Example:

python publish_query.py "How do I bake a cake?"
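For orientation, a publisher of this kind might look roughly like the sketch below. It is not the contents of publish_query.py: it assumes the query is sent as plain UTF-8 text with no attributes, and the function names are hypothetical. It uses the google-cloud-pubsub client library, imported inside the function so the path helper works without it.

```python
TOPIC_ID = "user-queries"  # topic name from the Pub/Sub setup section


def topic_path(project_id: str, topic_id: str = TOPIC_ID) -> str:
    """Build the fully qualified topic name that Pub/Sub expects."""
    return f"projects/{project_id}/topics/{topic_id}"


def publish_query(query: str, project_id: str) -> str:
    """Publish one query and return the message ID assigned by the server."""
    # Imported lazily so topic_path() is usable without the client library.
    from google.cloud import pubsub_v1

    publisher = pubsub_v1.PublisherClient()
    # Assumption: the payload is the raw query text encoded as UTF-8.
    future = publisher.publish(topic_path(project_id), data=query.encode("utf-8"))
    return future.result()  # blocks until the publish is acknowledged


print(topic_path("your-gcp-project-id"))
```

Calling publish_query("How do I bake a cake?", "your-gcp-project-id") would need valid Google Cloud credentials and an existing user-queries topic.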

2. pubsub_watcher.py

This script listens to the agent_response-sub subscription and prints any messages received from the agent_response topic.

Usage:

python pubsub_watcher.py

It will keep running and display messages as they arrive. Press Ctrl+C to stop.
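The listening loop can be sketched as follows, using the streaming pull API of the google-cloud-pubsub client library. This is an illustration under assumptions rather than the actual pubsub_watcher.py: it assumes response payloads are UTF-8 text, and the names format_message and watch are hypothetical.

```python
from concurrent.futures import TimeoutError
from typing import Optional

SUBSCRIPTION_ID = "agent_response-sub"  # subscription from the setup section


def format_message(data: bytes) -> str:
    """Decode a message payload, replacing any bytes that are not valid UTF-8."""
    return data.decode("utf-8", errors="replace")


def watch(project_id: str, timeout: Optional[float] = None) -> None:
    """Print messages from the subscription until timeout or Ctrl+C."""
    # Imported lazily so format_message() is usable without the client library.
    from google.cloud import pubsub_v1

    subscriber = pubsub_v1.SubscriberClient()
    path = subscriber.subscription_path(project_id, SUBSCRIPTION_ID)

    def callback(message) -> None:
        print(f"Received: {format_message(message.data)}")
        message.ack()  # acknowledge so Pub/Sub does not redeliver

    streaming_pull = subscriber.subscribe(path, callback=callback)
    with subscriber:
        try:
            streaming_pull.result(timeout=timeout)  # blocks while listening
        except (TimeoutError, KeyboardInterrupt):
            streaming_pull.cancel()  # trigger shutdown
            streaming_pull.result()  # wait for shutdown to finish


print(format_message(b"Why did the chicken cross the road?"))
```

Running watch("your-gcp-project-id") requires credentials and the agent_response-sub subscription created earlier.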

Deploying the pubsub_handler to Google Cloud Run from Source

The pubsub_handler is a FastAPI application that:

  1. Is triggered by messages on the user-queries Pub/Sub topic.
  2. Processes the message using the ADK agent (subber/agent.py).
  3. Publishes the agent's response to the agent_response Pub/Sub topic.
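The three steps above can be sketched with the standard library alone. Pub/Sub push delivery wraps each message in a JSON envelope whose message.data field is base64-encoded; the rest is an assumption for illustration: run_agent is a hypothetical stand-in for the ADK agent call, and publish is any callable that sends the reply to the agent_response topic. In the real service this logic would sit behind the FastAPI route in pubsub_handler/main.py, which should return a success status such as 200 or 204 so that Pub/Sub treats the message as acknowledged.

```python
import base64
from typing import Any, Callable, Dict


def extract_query(envelope: Dict[str, Any]) -> str:
    """Step 1: pull the base64-encoded payload out of the push envelope."""
    message = envelope.get("message")
    if not isinstance(message, dict) or "data" not in message:
        raise ValueError("request body is not a Pub/Sub push envelope")
    return base64.b64decode(message["data"]).decode("utf-8")


def run_agent(query: str) -> str:
    """Step 2: hypothetical placeholder for the ADK agent in subber/agent.py."""
    return f"(agent reply to: {query})"


def handle_push(envelope: Dict[str, Any], publish: Callable[[str], None]) -> str:
    """Run all three steps and return the reply that was published."""
    query = extract_query(envelope)
    reply = run_agent(query)
    publish(reply)  # Step 3: send the reply to the agent_response topic
    return reply


# Local demonstration: collect "published" replies in a list instead of Pub/Sub.
published = []
envelope = {"message": {"data": base64.b64encode(b"Tell me a joke").decode("ascii")}}
print(handle_push(envelope, published.append))
```

Passing the publisher as a callable keeps the decoding and agent logic testable without any Google Cloud access.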

Deployment Steps:

  1. Enable Google Cloud APIs: Ensure you have the Cloud Run API, Cloud Build API, and Pub/Sub API enabled in your Google Cloud project.

    gcloud services enable run.googleapis.com cloudbuild.googleapis.com pubsub.googleapis.com
  2. Deploy to Cloud Run: This command builds and deploys the service from the source code in the current directory. Make sure GOOGLE_CLOUD_PROJECT and GOOGLE_CLOUD_LOCATION are set in your shell, or replace them with your project ID and region. To run the service under a specific identity, add the --service-account flag with a service account that has the Pub/Sub Publisher role for the agent_response topic, plus permissions for any other Google Cloud services the ADK agent calls (for example, Vertex AI). The service itself does not need a Pub/Sub Subscriber role, because messages reach it through the push subscription created in the next step.

    gcloud run deploy pubsub-demo \
        --source . \
        --platform=managed \
        --region=${GOOGLE_CLOUD_LOCATION} \
        --set-env-vars="GOOGLE_CLOUD_PROJECT=${GOOGLE_CLOUD_PROJECT}" \
        --set-env-vars="GOOGLE_CLOUD_LOCATION=${GOOGLE_CLOUD_LOCATION}" \
        --set-env-vars="GOOGLE_GENAI_USE_VERTEXAI=true" \
        --project=${GOOGLE_CLOUD_PROJECT}
    # Optional: to run as a specific service account, also pass
    #   --service-account=your-service-account-email@your-gcp-project-id.iam.gserviceaccount.com
  3. Create a Pub/Sub Push Subscription to Trigger Cloud Run: This command creates a new Pub/Sub subscription (user-queries-cloud-run-sub) to the user-queries topic. Messages sent to user-queries will be pushed to your deployed Cloud Run service.

    • Get the URL of your deployed Cloud Run service:
      SERVICE_URL=$(gcloud run services describe pubsub-demo --platform managed --region ${GOOGLE_CLOUD_LOCATION} --format 'value(status.url)')
      echo $SERVICE_URL
    • Create a service account for Pub/Sub to use when invoking your Cloud Run service (if you don't have one already with run.invoker role):
      gcloud iam service-accounts create pubsub-cloud-run-invoker --display-name "Pub/Sub Cloud Run Invoker" --project=${GOOGLE_CLOUD_PROJECT}
      
      gcloud run services add-iam-policy-binding pubsub-demo \
          --member="serviceAccount:pubsub-cloud-run-invoker@${GOOGLE_CLOUD_PROJECT}.iam.gserviceaccount.com" \
          --role="roles/run.invoker" \
          --region=${GOOGLE_CLOUD_LOCATION} \
          --platform=managed \
          --project=${GOOGLE_CLOUD_PROJECT}
    • Create the push subscription:
      gcloud pubsub subscriptions create user-queries-cloud-run-sub \
          --topic=user-queries \
          --push-endpoint=${SERVICE_URL}/pubsub/push \
          --push-auth-service-account=pubsub-cloud-run-invoker@${GOOGLE_CLOUD_PROJECT}.iam.gserviceaccount.com \
          --project=${GOOGLE_CLOUD_PROJECT}

    Ensure the /pubsub/push path matches the endpoint defined in your pubsub_handler/main.py FastAPI application.

  4. Testing the Deployed Service:

    • Use publish_query.py to send a message to the user-queries topic:
      python publish_query.py "Tell me a joke"
    • Use pubsub_watcher.py (running in a separate terminal) to see the agent's response from the agent_response topic.
    • You can also check the logs for the pubsub-demo service in the Google Cloud Console (Cloud Run section).

Local Development of pubsub_handler (Optional)

You can run the pubsub_handler locally for testing:

  1. Ensure GOOGLE_CLOUD_PROJECT is set in your environment or .env file.
  2. Navigate to the pubsub_handler directory:
    cd pubsub_handler
  3. Install dependencies:
    pip install -r requirements.txt
  4. Run with Uvicorn:
    uvicorn main:app --host 0.0.0.0 --port 8080 --reload
    The server then listens on port 8080. To exercise it, send a POST request to http://localhost:8080/pubsub/push with a JSON body that mimics a Pub/Sub push message, for example with curl or Postman.
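One way to mimic a push message without extra tools is a short standard-library script like the sketch below. It builds the JSON envelope that Pub/Sub push delivery uses (base64-encoded message.data plus a subscription name) and posts it to the local endpoint. The messageId and subscription values are made-up placeholders, the helper names are hypothetical, and the handler in pubsub_handler/main.py may expect additional fields.

```python
import base64
import json
import urllib.request
from typing import Any, Dict

LOCAL_URL = "http://localhost:8080/pubsub/push"  # endpoint from the step above


def build_push_envelope(query: str) -> Dict[str, Any]:
    """Wrap a query in the JSON structure used by Pub/Sub push delivery."""
    return {
        "message": {
            "data": base64.b64encode(query.encode("utf-8")).decode("ascii"),
            "messageId": "local-test-1",  # placeholder value
            "attributes": {},
        },
        # Placeholder subscription name for local testing only.
        "subscription": "projects/local/subscriptions/local-test",
    }


def post_envelope(envelope: Dict[str, Any], url: str = LOCAL_URL) -> int:
    """POST the envelope as JSON and return the HTTP status code."""
    request = urllib.request.Request(
        url,
        data=json.dumps(envelope).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request) as response:
        return response.status


envelope = build_push_envelope("Tell me a joke")
print(json.dumps(envelope, indent=2))
```

With Uvicorn running, call post_envelope(envelope) to deliver the message; it is not called here so the snippet can run without a server.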

Cleaning Up

To avoid ongoing charges, delete the created resources:

# Delete Cloud Run Service
gcloud run services delete pubsub-demo --platform=managed --region=${GOOGLE_CLOUD_LOCATION} --project=${GOOGLE_CLOUD_PROJECT} --quiet

# Delete Pub/Sub Subscriptions
gcloud pubsub subscriptions delete user-queries-cloud-run-sub --project=${GOOGLE_CLOUD_PROJECT} --quiet
gcloud pubsub subscriptions delete agent_response-sub --project=${GOOGLE_CLOUD_PROJECT} --quiet

# Delete Pub/Sub Topics
gcloud pubsub topics delete user-queries --project=${GOOGLE_CLOUD_PROJECT} --quiet
gcloud pubsub topics delete agent_response --project=${GOOGLE_CLOUD_PROJECT} --quiet

# Delete Service Account (if created)
gcloud iam service-accounts delete pubsub-cloud-run-invoker@${GOOGLE_CLOUD_PROJECT}.iam.gserviceaccount.com --project=${GOOGLE_CLOUD_PROJECT} --quiet

Remember to replace ${GOOGLE_CLOUD_PROJECT} and ${GOOGLE_CLOUD_LOCATION} with your actual values if they are not set as environment variables in your shell.

About

A demonstration of using ADK with pubsub