-
Notifications
You must be signed in to change notification settings - Fork 79
Open
Description
Is your feature request related to a problem? Please describe.
The current executor doesn't work with A2aAgentExecutor. This is more of an issue on the A2aAgentExecutor side of things, but I would like to bring some visibility to it here as well, in the hope that it will help move the issue along in the adk-python repository. In particular, the issue is google/adk-python#3009.
Describe the solution you'd like
This is what I ended up doing to hack it into working (google-adk==1.14.1):
- Monkey patch A2aAgentExecutor.convert_a2a_request_to_adk_run_args so as to allow for task metadata to be passed to the ADK agent as state.
from typing import Any
import google.adk.a2a.executor.a2a_agent_executor
from google.adk.a2a.converters.part_converter import A2APartToGenAIPartConverter
from google.adk.a2a.converters.part_converter import convert_a2a_part_to_genai_part
from google.adk.a2a.converters.request_converter import (
convert_a2a_request_to_adk_run_args,
)
from x402_a2a.types import RequestContext
def override_convert_a2a_request_to_adk_run_args(
    request: RequestContext,
    part_converter: A2APartToGenAIPartConverter = convert_a2a_part_to_genai_part,
) -> dict[str, Any]:
    """Build ADK run arguments and forward A2A task metadata as ``state_delta``.

    Delegates to the stock ``convert_a2a_request_to_adk_run_args`` and then,
    when the request carries a current task with non-empty metadata, exposes
    that metadata to the ADK agent under the ``"state_delta"`` key.

    Args:
        request: The incoming A2A request context.
        part_converter: Converter passed through unchanged to the stock
            request converter.

    Returns:
        The run-argument dict produced by the stock converter, with
        ``"state_delta"`` added or extended when task metadata is present.
    """
    run_args = convert_a2a_request_to_adk_run_args(request, part_converter)

    task = request.current_task
    if task and task.metadata:
        # Merge instead of overwriting, so a ``state_delta`` already supplied
        # by the stock converter is not silently discarded. Task metadata
        # wins on key collisions.
        state_delta = dict(run_args.get("state_delta") or {})
        state_delta.update(task.metadata)
        run_args["state_delta"] = state_delta

    return run_args
# Monkey patch: rebind the converter name inside the executor module to the
# metadata-forwarding override defined above. Code in that module that looks
# the name up through its module globals at call time will then use the
# override instead of the stock converter.
google.adk.a2a.executor.a2a_agent_executor.convert_a2a_request_to_adk_run_args = ( # type: ignore
    override_convert_a2a_request_to_adk_run_args
)
# IMPORTANT: Apply monkey patch BEFORE importing A2aAgentExecutor
# NOTE(review): the executor module is already loaded by the plain
# `import google.adk.a2a.executor.a2a_agent_executor` earlier in this file, so
# the ordering presumably only matters if something captures the original
# function at import time -- TODO confirm against the google-adk version in use.
from google.adk.a2a.executor.a2a_agent_executor import A2aAgentExecutor # noqa: E402
MonkeyA2aAgentExecutor = A2aAgentExecutor
- And also, split the A2aAgentExecutor into an "outer" and an "inner" executor so that the x402ServerExecutor can sit between the two and effectively catch the x402PaymentRequiredException(...) raised by the ADK agent.
import uuid
from datetime import datetime
from datetime import timezone
from typing import override
from venv import logger
from a2a.types import Message
from a2a.types import Part
from a2a.types import Role
from a2a.types import TaskStatusUpdateEvent
from a2a.types import TextPart
from x402_a2a.types import AgentExecutor
from x402_a2a.types import EventQueue
from x402_a2a.types import TaskState
from x402_a2a.types import TaskStatus
from x402_a2a.types import RequestContext
from .a2a_monkey import MonkeyA2aAgentExecutor
class OuterA2aAgentExecutor(AgentExecutor):
    """Outer half of the split A2A executor.

    Wraps another ``AgentExecutor`` and handles the bookkeeping around it:
    validating the request, publishing the initial ``submitted`` status for a
    new task, and turning any exception raised by the delegate into a final
    ``failed`` status event.
    """

    def __init__(
        self,
        delegate: AgentExecutor,
    ):
        """Store the executor that will actually process requests.

        Args:
            delegate: Executor that ``execute`` and ``cancel`` forward to.
        """
        super().__init__()
        self._delegate = delegate

    async def cancel(self, context: RequestContext, event_queue: EventQueue):
        """Forward the cancellation to the delegate and return its result."""
        return await self._delegate.cancel(context, event_queue)

    async def execute(
        self,
        context: RequestContext,
        event_queue: EventQueue,
    ) -> None:
        """Run the delegate, publishing lifecycle events around it.

        Args:
            context: The A2A request context; must carry a message, a task ID
                and a context ID.
            event_queue: Queue that status update events are published to.

        Raises:
            ValueError: If the request has no message, task ID or context ID.
        """
        if not context.message:
            raise ValueError("A2A request must have a message")
        # Explicit raises instead of ``assert``: assertions are stripped when
        # Python runs with -O, which would let a missing ID reach the event
        # constructors below. This also matches the message check above.
        if not context.task_id:
            raise ValueError("A2A request must have a task ID")
        if not context.context_id:
            raise ValueError("A2A request must have a context ID")

        # for new task, create a task submitted event
        if not context.current_task:
            await event_queue.enqueue_event(
                TaskStatusUpdateEvent(
                    task_id=context.task_id,
                    status=TaskStatus(
                        state=TaskState.submitted,
                        message=context.message,
                        timestamp=datetime.now(timezone.utc).isoformat(),
                    ),
                    context_id=context.context_id,
                    final=False,
                )
            )

        try:
            await self._delegate.execute(context, event_queue)
        except Exception as e:
            # Deliberate top-level boundary: the error is logged and reported
            # as a failed task, and is not re-raised to the caller.
            # NOTE(review): `logger` is imported from the stdlib `venv` module
            # at the top of this file, so these records are emitted under that
            # module's logger; a module-local `logging.getLogger(__name__)` is
            # presumably what was intended.
            logger.error("Error handling A2A request: %s", e, exc_info=True)
            # Publish failure event
            try:
                await event_queue.enqueue_event(
                    TaskStatusUpdateEvent(
                        task_id=context.task_id,
                        status=TaskStatus(
                            state=TaskState.failed,
                            timestamp=datetime.now(timezone.utc).isoformat(),
                            message=Message(
                                message_id=str(uuid.uuid4()),
                                role=Role.agent,
                                parts=[Part(TextPart(text=str(e)))],
                            ),
                        ),
                        context_id=context.context_id,
                        final=True,
                    )
                )
            except Exception as enqueue_error:
                # Best effort: if the failure cannot be published either,
                # only log it.
                logger.error(
                    "Failed to publish failure event: %s", enqueue_error, exc_info=True
                )
class InnerA2aAgentExecutor(MonkeyA2aAgentExecutor):
    """Inner half of the split A2A executor.

    Replaces the base class's ``execute`` with a bare call to
    ``_handle_request``. The request checks, the ``submitted`` event and the
    failure reporting are done by ``OuterA2aAgentExecutor`` instead, so an
    exception raised while handling the request propagates to whichever
    executor wraps this one.
    """

    @override
    async def execute(
        self,
        context: RequestContext,
        event_queue: EventQueue,
    ):
        """Handle the request directly, with no surrounding error handling.

        Args:
            context: The A2A request context to process.
            event_queue: Queue that the request handler publishes events to.
        """
        # `_handle_request` is expected to be provided by the base class.
        await self._handle_request(context, event_queue)


# NOTE: Now, this is not perfect, nor production ready, but hopefully paints
# the picture of what's required.
Code of Conduct
- I agree to follow this project's Code of Conduct