这是indexloc提供的服务,不要输入任何密码
Skip to content

[Feat]: Support for Google ADK A2aAgentExecutor #41

@matiasanaya

Description

@matiasanaya

Is your feature request related to a problem? Please describe.

The current executor doesn't work with A2aAgentExecutor. This is more an issue on the A2aAgentExecutor side of things, but would like to bring some visibility to the issue here as well in the hopes that it will help move the issue along in the adk-python repository. In particular the issue is google/adk-python#3009

Describe the solution you'd like

This is what I ended up doing to hack it into working (google-adk==1.14.1):

  1. Monkey patch A2aAgentExecutor.convert_a2a_request_to_adk_run_args so as to allow for task metadata to be passed to the ADK agent as state.
from typing import Any

import google.adk.a2a.executor.a2a_agent_executor
from google.adk.a2a.converters.part_converter import A2APartToGenAIPartConverter
from google.adk.a2a.converters.part_converter import convert_a2a_part_to_genai_part
from google.adk.a2a.converters.request_converter import (
    convert_a2a_request_to_adk_run_args,
)
from x402_a2a.types import RequestContext


def override_convert_a2a_request_to_adk_run_args(
    request: RequestContext,
    part_converter: A2APartToGenAIPartConverter = convert_a2a_part_to_genai_part,
) -> dict[str, Any]:
    og = convert_a2a_request_to_adk_run_args(request, part_converter)
    if request.current_task and request.current_task.metadata:
        og["state_delta"] = {}
        for key, value in request.current_task.metadata.items():
            og["state_delta"][key] = value
    return og


# Monkey patch
google.adk.a2a.executor.a2a_agent_executor.convert_a2a_request_to_adk_run_args = (  # type: ignore
    override_convert_a2a_request_to_adk_run_args
)

# IMPORTANT: Apply monkey patch BEFORE importing A2aAgentExecutor
from google.adk.a2a.executor.a2a_agent_executor import A2aAgentExecutor  # noqa: E402

MonkeyA2aAgentExecutor = A2aAgentExecutor

And also, split the A2aAgentExecutor into an "outer" and "inner" executor to allow for the x402ServerExecutor to be in between those two, to effectively catch the x402PaymentRequiredException(...) raised by the ADK Agent.

import uuid

from datetime import datetime
from datetime import timezone
from typing import override
from venv import logger

from a2a.types import Message
from a2a.types import Part
from a2a.types import Role
from a2a.types import TaskStatusUpdateEvent
from a2a.types import TextPart
from x402_a2a.types import AgentExecutor
from x402_a2a.types import EventQueue
from x402_a2a.types import TaskState
from x402_a2a.types import TaskStatus
from x402_a2a.types import RequestContext

from .a2a_monkey import MonkeyA2aAgentExecutor


class OuterA2aAgentExecutor(AgentExecutor):
    def __init__(
        self,
        delegate: AgentExecutor,
    ):
        super().__init__()
        self._delegate = delegate

    async def cancel(self, context: RequestContext, event_queue: EventQueue):
        return await self._delegate.cancel(context, event_queue)

    async def execute(
        self,
        context: RequestContext,
        event_queue: EventQueue,
    ):
        if not context.message:
            raise ValueError("A2A request must have a message")

        assert context.task_id, "A2A request must have a task ID"
        assert context.context_id, "A2A request must have a context ID"

        # for new task, create a task submitted event
        if not context.current_task:
            await event_queue.enqueue_event(
                TaskStatusUpdateEvent(
                    task_id=context.task_id,
                    status=TaskStatus(
                        state=TaskState.submitted,
                        message=context.message,
                        timestamp=datetime.now(timezone.utc).isoformat(),
                    ),
                    context_id=context.context_id,
                    final=False,
                )
            )
        try:
            await self._delegate.execute(context, event_queue)
        except Exception as e:
            logger.error("Error handling A2A request: %s", e, exc_info=True)
            # Publish failure event
            try:
                await event_queue.enqueue_event(
                    TaskStatusUpdateEvent(
                        task_id=context.task_id,
                        status=TaskStatus(
                            state=TaskState.failed,
                            timestamp=datetime.now(timezone.utc).isoformat(),
                            message=Message(
                                message_id=str(uuid.uuid4()),
                                role=Role.agent,
                                parts=[Part(TextPart(text=str(e)))],
                            ),
                        ),
                        context_id=context.context_id,
                        final=True,
                    )
                )
            except Exception as enqueue_error:
                logger.error(
                    "Failed to publish failure event: %s", enqueue_error, exc_info=True
                )


class InnerA2aAgentExecutor(MonkeyA2aAgentExecutor):
    @override
    async def execute(
        self,
        context: RequestContext,
        event_queue: EventQueue,
    ):
        await self._handle_request(context, event_queue)

Now, this is not perfect, nor production ready, but hopefully paints the picture of what's required.

Code of Conduct

  • I agree to follow this project's Code of Conduct

Metadata

Metadata

Assignees

Labels

No labels
No labels

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions