[Bug]: x402ClientExecutor makes little sense? #38

@matiasanaya

What happened?

I might be looking at this the wrong way, but I don't see how one would use a "client" executor. Aren't A2A executors meant to be server-side?

It would make more sense to me for this repo to provide middleware or a consumer for the A2A client. To be fair, the A2A client might need some upgrades first, since right now consumers are not meant to control the flow. This is how I hacked it together with google-adk==1.14.1:

from typing import override, Dict, Any, Protocol

from a2a.client import ClientEvent as A2AClientEvent
from a2a.types import Message as A2AMessage, TaskState
from google.adk.agents.invocation_context import InvocationContext
from google.adk.agents.remote_a2a_agent import RemoteA2aAgent as BaseRemoteA2aAgent
from google.adk.events.event import Event
from x402_a2a.core.utils import create_payment_submission_message
from x402_a2a.core.utils import x402Utils
from x402_a2a.types import PaymentStatus, x402PaymentRequiredResponse
from x402.types import PaymentPayload


class Authorizer(Protocol):
    async def __call__(
        self,
        payment_required: x402PaymentRequiredResponse,
        context: Dict[str, Any] | None = None,
    ) -> PaymentPayload | None: ...


class RemoteA2aAgent(BaseRemoteA2aAgent):
    @override
    def __init__(self, authorizer: Authorizer, **kwargs):
        super().__init__(**kwargs)
        self._authorizer = authorizer
        self._x402Utils = x402Utils()

    async def _x402(
        self, initial_a2a_response: A2AClientEvent | A2AMessage, ctx: InvocationContext
    ) -> A2AClientEvent | A2AMessage:
        if isinstance(initial_a2a_response, A2AMessage):
            return initial_a2a_response

        task = initial_a2a_response[0]
        if task.status.state != TaskState.input_required:
            return initial_a2a_response

        status = self._x402Utils.get_payment_status(task)
        if status != PaymentStatus.PAYMENT_REQUIRED:
            return initial_a2a_response

        payment_required = self._x402Utils.get_payment_requirements(task)
        if payment_required is None:
            # TODO: maybe report an error here?
            return initial_a2a_response

        try:
            payment_payload = await self._authorizer(payment_required=payment_required)
        except Exception:
            # TODO: maybe report an error here?
            return initial_a2a_response

        if payment_payload is None:
            # TODO: maybe report that authorizer declined to pay?
            return initial_a2a_response

        message = create_payment_submission_message(
            task_id=task.id, payment_payload=payment_payload
        )
        # FIXME: the server requires context_id here. This is most likely a bug
        # in A2aAgentExecutor, since context_id is optional in theory.
        message.context_id = task.context_id

        assert self._a2a_client is not None
        responses = []
        async for a2a_response in self._a2a_client.send_message(request=message):
            responses.append(a2a_response)

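        # TODO: this doesn't work for streaming responses
        # If the server sent nothing back, fall back to the original response.
        return responses[-1] if responses else initial_a2a_response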

    @override
    async def _handle_a2a_response(
        self, a2a_response: A2AClientEvent | A2AMessage, ctx: InvocationContext
    ) -> Event:
        return await super()._handle_a2a_response(
            await self._x402(a2a_response, ctx), ctx
        )
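
To make the "client middleware" idea concrete without depending on ADK internals, here is a minimal, library-free sketch of the same pay-and-retry flow. Everything in it is a hypothetical stand-in: `PaymentRequired`, `PaymentPayload`, `Response`, `send_with_payment`, `capped_authorizer` and `fake_server` are not part of a2a, google-adk or x402, and the single `amount` field is an assumption made only to keep the example short.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, Optional


# Hypothetical stand-ins so the sketch runs without a2a, google-adk or x402.
@dataclass
class PaymentRequired:
    amount: int  # price in the smallest unit of the asset (assumed field)


@dataclass
class PaymentPayload:
    amount: int


@dataclass
class Response:
    text: str
    payment_required: Optional[PaymentRequired] = None


Authorizer = Callable[[PaymentRequired], Awaitable[Optional[PaymentPayload]]]
Send = Callable[[str, Optional[PaymentPayload]], Awaitable[Response]]


def capped_authorizer(limit: int) -> Authorizer:
    """Build an authorizer that approves payments up to `limit` and declines the rest."""

    async def authorize(required: PaymentRequired) -> Optional[PaymentPayload]:
        if required.amount > limit:
            return None  # decline; the caller keeps the original response
        return PaymentPayload(amount=required.amount)

    return authorize


async def send_with_payment(send: Send, text: str, authorizer: Authorizer) -> Response:
    """Send a message; if the server asks for payment, authorize and resubmit once."""
    first = await send(text, None)
    if first.payment_required is None:
        return first
    payload = await authorizer(first.payment_required)
    if payload is None:
        return first
    return await send(text, payload)


async def fake_server(text: str, payment: Optional[PaymentPayload]) -> Response:
    """Toy server that charges 5 units per request."""
    if payment is None or payment.amount < 5:
        return Response("payment required", PaymentRequired(amount=5))
    return Response(f"done: {text}")


def run_demo(limit: int) -> str:
    """Run one request against the toy server with the given spending cap."""
    coro = send_with_payment(fake_server, "hello", capped_authorizer(limit))
    return asyncio.run(coro).text
```

The point of the shape is that the consumer, not an executor, decides whether to pay: the wrapper only needs a `send` callable and an authorizer, so it could sit in front of any client that exposes the payment-required state.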

Also, this seems to be wrong:

# Submit payment authorization
task = self.utils.record_payment_submission(task, payment_payload)

since there is no record_payment_submission method.

Code of Conduct

  • I agree to follow this project's Code of Conduct
