-
Notifications
You must be signed in to change notification settings - Fork 79
Open
Description
What happened?
I might be looking at this the wrong way but, I don't know how one would use a "client" executor. Aren't A2A executors meant to be "server" side?
It would make more sense to me for this repo to provide middleware a consumer for the A2A client. Though to be fair, the A2A client might need some upgrades since right now consumers are not meant to control the flow. This is how I hacked it together with google-adk==1.14.1:
from typing import override, Dict, Any, Protocol
from a2a.client import ClientEvent as A2AClientEvent
from a2a.types import Message as A2AMessage, TaskState
from google.adk.agents.invocation_context import InvocationContext
from google.adk.agents.remote_a2a_agent import RemoteA2aAgent as BaseRemoteA2aAgent
from google.adk.events.event import Event
from x402_a2a.core.utils import create_payment_submission_message
from x402_a2a.core.utils import x402Utils
from x402_a2a.types import PaymentStatus, x402PaymentRequiredResponse
from x402.types import PaymentPayload
class Authorizer(Protocol):
async def __call__(
self,
payment_required: x402PaymentRequiredResponse,
context: Dict[str, Any] | None = None,
) -> PaymentPayload | None: ...
class RemoteA2aAgent(BaseRemoteA2aAgent):
@override
def __init__(self, authorizer: Authorizer, **kwargs):
super().__init__(**kwargs)
self._authorizer = authorizer
self._x402Utils = x402Utils()
async def _x402(
self, initial_a2a_response: A2AClientEvent | A2AMessage, ctx: InvocationContext
) -> A2AClientEvent | A2AMessage:
if isinstance(initial_a2a_response, A2AMessage):
return initial_a2a_response
task = initial_a2a_response[0]
if task.status.state != TaskState.input_required:
return initial_a2a_response
status = self._x402Utils.get_payment_status(task)
if status != PaymentStatus.PAYMENT_REQUIRED:
return initial_a2a_response
payment_required = self._x402Utils.get_payment_requirements(task)
if payment_required is None:
# TODO: maybe report an error here?
return initial_a2a_response
try:
payment_payload = await self._authorizer(payment_required=payment_required)
except Exception:
# TODO: maybe report an error here?
return initial_a2a_response
if payment_payload is None:
# TODO: maybe report that authorizer declined to pay?
return initial_a2a_response
message = create_payment_submission_message(
task_id=task.id, payment_payload=payment_payload
)
# FIX-ME: this is required by the server, there is a bug in
# A2aAgentExecutor most probably because context_id is optional in theory
message.context_id = task.context_id
assert self._a2a_client is not None
responses = []
async for a2a_response in self._a2a_client.send_message(request=message):
responses.append(a2a_response)
# TODO: this doesn't work for streaming responses
return responses[-1]
@override
async def _handle_a2a_response(
self, a2a_response: A2AClientEvent | A2AMessage, ctx: InvocationContext
) -> Event:
return await super()._handle_a2a_response(
await self._x402(a2a_response, ctx), ctx
)Also, this seems to be wrong:
# Submit payment authorization
task = self.utils.record_payment_submission(task, payment_payload)
since there is no record_payment_submission
Code of Conduct
- I agree to follow this project's Code of Conduct