diff --git a/CLAUDE.md b/CLAUDE.md
index 3bd59e2..21ef568 100644
--- a/CLAUDE.md
+++ b/CLAUDE.md
@@ -172,6 +172,75 @@ Alternatively, clean up components individually:
 - Ensure the PROJECT file contains entries for all resources before running `make manifests`
 - Follow the detailed guidance in the [Kubebuilder Guide](/kubechain/docs/kubebuilder-guide.md)

+### Kubernetes Resource Design
+
+#### Don't use "config" in field names:
+
+Bad:
+
+```
+spec:
+  slackConfig:
+    #...
+  emailConfig:
+    #...
+```
+
+Good:
+
+```
+spec:
+  slack:
+    # ...
+  email:
+    # ...
+```
+
+#### Prefer nil-able sub-objects over "type" fields:
+
+This is more guidelines than rules, just consider it in cases when you have a Resource that is a union type. There's
+no great answer here because of how Go handles unions. (maybe the state-of-the-art has progressed since the last time I checked) -- dex
+
+Bad:
+
+```
+spec:
+  type: slack
+  slack:
+    channelOrUserID: C1234567890
+```
+
+Good:
+
+```
+spec:
+  slack:
+    channelOrUserID: C1234567890
+```
+
+In code, instead of
+
+```
+switch (resource.Spec.Type) {
+  case "slack":
+    // ...
+  case "email":
+    // ...
+}
+```
+
+check which object is non-nil and use that:
+
+```
+if resource.Spec.Slack != nil {
+  // ...
+} else if resource.Spec.Email != nil {
+  // ...
+} else {
+  // ...
+}
+```
+
 ### Markdown

 - When writing markdown code blocks, do not indent the block, just use backticks to offset the code
diff --git a/README.md b/README.md
index 108b443..0fe8d43 100644
--- a/README.md
+++ b/README.md
@@ -7,7 +7,7 @@ KubeChain is a cloud-native orchestrator for AI Agents built on Kubernetes. It supports [long-lived outer-loop agents](https://theouterloop.substack.com/p/openais-realtime-api-is-a-step-towards) that can process asynchronous execution of both LLM inference and long-running tool calls.
It's designed for simplicity and gives strong durability and reliability guarantees for agents that make asynchronous tool calls like contacting humans or delegating work to other agents. :warning: **Note** - KubeChain is experimental and some known issues and race conditions. Use at your own risk. - +

@@ -66,15 +66,11 @@ To run KubeChain locally on macos, you'll also need: ### Setting Up a Local Cluster - - -1. **Create a Kind cluster** - ```bash kind create cluster ``` -2. **Add your OpenAI API key as a Kubernetes secret** +### Add your OpenAI API key as a Kubernetes secret ```bash kubectl create secret generic openai \ @@ -86,10 +82,10 @@ kubectl create secret generic openai \ > [!TIP] -> For better visibility when running tutorial, we recommend starting +> For better visibility when running tutorial, we recommend starting > a stream to watch all the events as they're happening, > for example: -> +> > ```bash > kubectl get events --watch > ``` @@ -155,24 +151,24 @@ graph RL ``` Check the created LLM: - + ```bash kubectl get llm ``` - + Output: ``` NAME PROVIDER READY STATUS gpt-4o openai true Ready ``` - +
Using `-o wide` and `describe` - + ```bash kubectl get llm -o wide ``` - + Output: ``` NAME PROVIDER READY STATUS DETAIL @@ -254,24 +250,24 @@ graph RL ``` Check the created Agent: - + ```bash kubectl get agent ``` - + Output: ``` NAME READY STATUS my-assistant true Ready ``` - +
Using `-o wide` and `describe` - + ```bash kubectl get agent -o wide ``` - + Output: ``` NAME READY STATUS DETAIL @@ -359,21 +355,21 @@ graph RL end ``` Check the created Task: - + ```bash kubectl get task ``` - + Output: ``` NAME READY STATUS AGENT MESSAGE hello-world-task true Ready my-assistant What is the capital of the moon? ``` - +
Using `-o wide` and `describe` - + ```bash kubectl get task -o wide ``` @@ -464,7 +460,7 @@ graph RL For now, our task run should complete quickly and return a FinalAnswer. ```bash -kubectl get taskrun +kubectl get taskrun ``` Output: @@ -480,7 +476,7 @@ To get just the output, run kubectl get taskrun -o jsonpath='{.items[*].status.output}' ``` -and you'll see +and you'll see > The Moon does not have a capital. It is a natural satellite of Earth and lacks any governmental structure or human habitation that would necessitate a capital city. @@ -536,7 +532,7 @@ We saw above how you can get the status of a taskrun with `kubectl get taskrun`. For more detailed information, like to see the full context window, you can use: ```bash -kubectl describe taskrun +kubectl describe taskrun ``` ``` @@ -659,7 +655,7 @@ fetch true Ready ```bash kubectl describe mcpserver ``` -Output: +Output: ``` Name: fetch @@ -784,7 +780,7 @@ spec: EOF ``` -You should see some events in the output of +You should see some events in the output of ``` kubectl get events --watch @@ -807,7 +803,7 @@ kubectl get taskrun fetch-task-1 -o jsonpath='{.status.output}' ``` > The URL [https://swapi.dev/api/people/1](https://swapi.dev/api/people/1) contains the following data about a Star Wars character: -> +> > - **Name**: Luke Skywalker > - **Height**: 172 cm > - **Mass**: 77 kg @@ -836,7 +832,7 @@ kubectl get taskrun fetch-task-1 -o jsonpath='{.status.output}' and you can describe the taskrun to see the full context window and tool-calling turns ``` -kubectl describe taskrun fetch-task-1 +kubectl describe taskrun fetch-task-1 ``` A simplified view of the taskrun: @@ -866,11 +862,11 @@ flowchart TD Task2[Task] Agent2[Agent] - UserMessage --> Task --> Agent --> LLM + UserMessage --> Task --> Agent --> LLM Provider --> OpenAI Secret --> Credentials Credentials --> OpenAI - OpenAI --> ToolCall-1 + OpenAI --> ToolCall-1 ToolCall-1 --> Task2 --> Agent2 --> fetch fetch --> ToolResponse-1 ToolResponse-1 --> 
OpenAI2[OpenAI]
@@ -1052,6 +1048,253 @@ Events:

 That's it! Go add your favorite MCPs and start running durable agents in Kubernetes!

+### Incorporating Human Approval
+
+For certain classes of MCP tools, you may want to incorporate human approval into an agent's workflow.
+
+KubeChain supports this via [HumanLayer's](https://github.com/humanlayer/humanlayer) [contact channels](https://www.humanlayer.dev/docs/channels/introduction)
+to request human approval and input across Slack, email, and more.
+
+**Note**: Directly approving tool calls with `kubectl` or a `kubechain` CLI is planned but not yet supported.
+
+**Note**: We recommend running through the above examples first prior to exploring this section. Several Kubernetes resources created in that section will be assumed to exist. If you'd like to see a full running version of the Kubernetes configuration used in this section, check out [kubechain_v1alpha_contactchannel_with_approval.yaml](./config/samples/kubechain_v1alpha_contactchannel_with_approval.yaml)
+
+You'll need a HumanLayer API key to get started:
+
+```bash
+kubectl create secret generic humanlayer --from-literal=humanlayer-api-key=$HUMANLAYER_API_KEY
+```
+
+Next, create a ContactChannel resource.
In this example, we'll use an email contact channel (be sure to swap the `approver@example.com` address for a real target email address): + +```bash +cat < +API Version: kubechain.humanlayer.dev/v1alpha1 +Kind: TaskRunToolCall +Metadata: + Creation Timestamp: 2025-04-01T16:09:02Z + Generation: 1 + Owner References: + API Version: kubechain.humanlayer.dev/v1alpha1 + Controller: true + Kind: TaskRun + Name: approved-fetch-task-1 + UID: 52893dec-c5a5-424d-983f-13a89215b084 + Resource Version: 91939 + UID: 3f8c4eaf-0e46-44f6-9741-f32809747099 +Spec: + Arguments: {"url":"https://swapi.dev/api/people/2"} + Task Run Ref: + Name: approved-fetch-task-1 + Tool Call Id: call_7PCkM1y2v8wFOC2vKtDrweor + Tool Ref: + Name: fetch__fetch +Status: + External Call ID: ec-3257d3e + Phase: Pending + Start Time: 2025-04-01T16:09:02Z + Status: AwaitingHumanApproval + Status Detail: Waiting for human approval via contact channel approval-channel +Events: + Type Reason Age From Message + ---- ------ ---- ---- ------- + Normal AwaitingHumanApproval 2m42s taskruntoolcall-controller Tool execution requires approval via contact channel approval-channel + Normal HumanLayerRequestSent 2m41s taskruntoolcall-controller HumanLayer request sent +``` + +Note as well, at this point our `taskrun` has not completed. If we run `kubectl get taskrun approved-fetch-task-1` no `OUTPUT` has yet been returned. + +Go ahead and approve the email you should have received via HumanLayer requesting approval to run our `fetch` tool. After a few seconds, running `kubectl get taskruntoolcall approved-fetch-task-1-tc-01` should show our tool has been called. 
Additionally, if we run `kubectl describe taskrun approved-fetch-task-1`, we should see the following (truncated a bit for brevity): + +``` +$ kubectl describe taskrun approved-fetch-task-1 +Name: approved-fetch-task-1 +Kind: TaskRun +Metadata: + Creation Timestamp: 2025-04-01T16:16:13Z + UID: 58c9d760-a160-4386-9d8d-ae9da0286125 +Spec: + Task Ref: + Name: approved-fetch-task +Status: + Context Window: + Content: You are a helpful assistant. Your job is to help the user with their tasks. + + Role: system + Content: Write me a haiku about the character found at https://swapi.dev/api/people/2? + Role: user + Content: + Role: assistant + Tool Calls: + Function: + Arguments: {"url":"https://swapi.dev/api/people/2"} + Name: fetch__fetch + Id: call_FZaXJq1FKuBVLYr9HHJwcnOb + Type: function + Content: Content type application/json cannot be simplified to markdown, but here is the raw content: +Contents of https://swapi.dev/api/people/2: +{"name":"C-3PO","height":"167","mass":"75","hair_color":"n/a","skin_color":"gold","eye_color":"yellow","birth_year":"112BBY","gender":"n/a","homeworld":"https://swapi.dev/api/planets/1/","films":["https://swapi.dev/api/films/1/","https://swapi.dev/api/films/2/","https://swapi.dev/api/films/3/","https://swapi.dev/api/films/4/","https://swapi.dev/api/films/5/","https://swapi.dev/api/films/6/"],"species":["https://swapi.dev/api/species/2/"],"vehicles":[],"starships":[],"created":"2014-12-10T15:10:51.357000Z","edited":"2014-12-20T21:17:50.309000Z","url":"https://swapi.dev/api/people/2/"} + Role: tool + Tool Call Id: call_FZaXJq1FKuBVLYr9HHJwcnOb + Content: Golden C-3PO, +Speaks in many languages, +Droid with gentle heart. + Role: assistant + Output: Golden C-3PO, +Speaks in many languages, +Droid with gentle heart. 
+ Phase: FinalAnswer + Ready: true + Span Context: + Span ID: 3fd054c970f50fc1 + Trace ID: 21e2b0e7457ae78cce4abaf1b1c02819 + Status: Ready + Status Detail: LLM final response received +Events: + Type Reason Age From Message + ---- ------ ---- ---- ------- + Normal ValidationSucceeded 48s taskrun-controller Task validated successfully + Normal ToolCallsPending 47s taskrun-controller LLM response received, tool calls pending + Normal ToolCallCreated 47s taskrun-controller Created TaskRunToolCall approved-fetch-task-1-tc-01 + Normal SendingContextWindowToLLM 7s (x2 over 48s) taskrun-controller Sending context window to LLM + Normal AllToolCallsCompleted 7s taskrun-controller All tool calls completed, ready to send tool results to LLM + Normal LLMFinalAnswer 6s taskrun-controller LLM response received successfully +``` + +### Using other Language Models + +So far we've been using the `gpt-4o` model from OpenAI. KubeChain also supports using other language models from OpenAI, Anthropic, Vertex, and more. + + +Let's create a new LLM and Agent that uses the `claude-3-5-sonnet` model from Anthropic. + +#### Create a secret + +``` +cat < 0 { logger.Info("Child TaskRun already exists", "childTaskRun", taskRunList.Items[0].Name) // Optionally, sync status from child to parent. 
- return true, nil + return true, nil, true } - return false, nil + return false, nil, false } // parseArguments parses the tool call arguments -func (r *TaskRunToolCallReconciler) parseArguments(ctx context.Context, trtc *kubechainv1alpha1.TaskRunToolCall) (map[string]interface{}, error) { +func (r *TaskRunToolCallReconciler) parseArguments(ctx context.Context, trtc *kubechainv1alpha1.TaskRunToolCall) (args map[string]interface{}, err error) { logger := log.FromContext(ctx) // Parse the arguments string as JSON (needed for both MCP and traditional tools) - var args map[string]interface{} if err := json.Unmarshal([]byte(trtc.Spec.Arguments), &args); err != nil { logger.Error(err, "Failed to parse arguments") trtc.Status.Status = StatusError @@ -576,8 +576,8 @@ func (r *TaskRunToolCallReconciler) processExternalAPI(ctx context.Context, trtc kwargs, _ = kwargsVal.(map[string]interface{}) } - // Generate call ID - callID := "call-" + uuid.New().String() + // Generate call ID - using a shorter format while maintaining uniqueness + callID := "c" + uuid.New().String()[:7] // Prepare function call spec functionSpec := map[string]interface{}{ @@ -681,9 +681,10 @@ func (r *TaskRunToolCallReconciler) getHumanLayerAPIKey(ctx context.Context, sec } //nolint:unparam -func (r *TaskRunToolCallReconciler) setStatusError(ctx context.Context, trtcStatus kubechainv1alpha1.TaskRunToolCallStatusType, eventType string, trtc *kubechainv1alpha1.TaskRunToolCall, err error) (ctrl.Result, error) { +func (r *TaskRunToolCallReconciler) setStatusError(ctx context.Context, trtcStatus kubechainv1alpha1.TaskRunToolCallStatusType, eventType string, trtc *kubechainv1alpha1.TaskRunToolCall, err error) (ctrl.Result, error, bool) { + trtcDeepCopy := trtc.DeepCopy() logger := log.FromContext(ctx) - trtc.Status.Status = trtcStatus + trtcDeepCopy.Status.Status = trtcStatus // Handle nil error case errorMessage := "Unknown error occurred" @@ -691,27 +692,43 @@ func (r *TaskRunToolCallReconciler) 
setStatusError(ctx context.Context, trtcStat errorMessage = err.Error() } - trtc.Status.StatusDetail = errorMessage - trtc.Status.Error = errorMessage - r.recorder.Event(trtc, corev1.EventTypeWarning, eventType, errorMessage) + trtcDeepCopy.Status.StatusDetail = errorMessage + trtcDeepCopy.Status.Error = errorMessage + r.recorder.Event(trtcDeepCopy, corev1.EventTypeWarning, eventType, errorMessage) - if err := r.Status().Update(ctx, trtc); err != nil { + if err := r.Status().Update(ctx, trtcDeepCopy); err != nil { logger.Error(err, "Failed to update status") - return ctrl.Result{}, err + return ctrl.Result{}, err, true } - return ctrl.Result{}, nil + return ctrl.Result{}, nil, true } -func (r *TaskRunToolCallReconciler) postToHumanLayer(ctx context.Context, trtc *kubechainv1alpha1.TaskRunToolCall, contactChannel *kubechainv1alpha1.ContactChannel, apiKey string) (int, error) { - client := r.HLClient.NewHumanLayerClient() +func (r *TaskRunToolCallReconciler) updateTRTCStatus(ctx context.Context, trtc *kubechainv1alpha1.TaskRunToolCall, trtcStatusType kubechainv1alpha1.TaskRunToolCallStatusType, trtcStatusPhase kubechainv1alpha1.TaskRunToolCallPhase, statusDetail string) (ctrl.Result, error, bool) { + logger := log.FromContext(ctx) + + trtcDeepCopy := trtc.DeepCopy() - switch contactChannel.Spec.ChannelType { - case "slack": - client.SetSlackConfig(contactChannel.Spec.SlackConfig) - case "email": - client.SetEmailConfig(contactChannel.Spec.EmailConfig) + trtcDeepCopy.Status.Status = trtcStatusType + trtcDeepCopy.Status.StatusDetail = statusDetail + trtcDeepCopy.Status.Phase = trtcStatusPhase + + if err := r.Status().Update(ctx, trtcDeepCopy); err != nil { + logger.Error(err, "Failed to update status") + return ctrl.Result{}, err, true + } + return ctrl.Result{}, nil, true +} + +func (r *TaskRunToolCallReconciler) postToHumanLayer(ctx context.Context, trtc *kubechainv1alpha1.TaskRunToolCall, contactChannel *kubechainv1alpha1.ContactChannel, apiKey string) 
(*humanlayerapi.FunctionCallOutput, int, error) { + client := r.HLClientFactory.NewHumanLayerClient() + + switch contactChannel.Spec.Type { + case kubechainv1alpha1.ContactChannelTypeSlack: + client.SetSlackConfig(contactChannel.Spec.Slack) + case kubechainv1alpha1.ContactChannelTypeEmail: + client.SetEmailConfig(contactChannel.Spec.Email) default: - return 0, fmt.Errorf("unsupported channel type: %s", contactChannel.Spec.ChannelType) + return nil, 0, fmt.Errorf("unsupported channel type: %s", contactChannel.Spec.Type) } toolName := trtc.Spec.ToolRef.Name @@ -724,133 +741,250 @@ func (r *TaskRunToolCallReconciler) postToHumanLayer(ctx context.Context, trtc * } client.SetFunctionCallSpec(toolName, args) - client.SetCallID("call-" + uuid.New().String()) + client.SetCallID("ec-" + uuid.New().String()[:7]) client.SetRunID(trtc.Name) client.SetAPIKey(apiKey) - _, statusCode, err := client.RequestApproval(ctx) + functionCall, statusCode, err := client.RequestApproval(ctx) - return statusCode, err + if err == nil { + r.recorder.Event(trtc, corev1.EventTypeNormal, "HumanLayerRequestSent", "HumanLayer request sent") + } + + return functionCall, statusCode, err } -// Reconcile processes TaskRunToolCall objects. 
-func (r *TaskRunToolCallReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { +// handlePendingApproval checks if an existing human approval is completed and updates status accordingly +func (r *TaskRunToolCallReconciler) handlePendingApproval(ctx context.Context, trtc *kubechainv1alpha1.TaskRunToolCall, apiKey string) (ctrl.Result, error, bool) { logger := log.FromContext(ctx) - var trtc kubechainv1alpha1.TaskRunToolCall - if err := r.Get(ctx, req.NamespacedName, &trtc); err != nil { - return ctrl.Result{}, client.IgnoreNotFound(err) + // Only process if in the pending approval state + if trtc.Status.Status != kubechainv1alpha1.TaskRunToolCallStatusTypeAwaitingHumanApproval { + return ctrl.Result{}, nil, false } - logger.Info("Reconciling TaskRunToolCall", "name", trtc.Name) - // Step 1: Initialize status if not set - if initialized, err := r.initializeTRTC(ctx, &trtc); initialized || err != nil { - if err != nil { - return ctrl.Result{}, err - } - return ctrl.Result{}, nil + // Verify we have a call ID + if trtc.Status.ExternalCallID == "" { + logger.Info("Missing ExternalCallID in pending approval state") + return ctrl.Result{}, nil, false } - // Do not pass GO, do not collect $200, if we've errored - if trtc.Status.Status == kubechainv1alpha1.TaskRunToolCallStatusTypeErrorRequestingHumanApproval { - return ctrl.Result{}, nil + client := r.HLClientFactory.NewHumanLayerClient() + client.SetCallID(trtc.Status.ExternalCallID) + client.SetAPIKey(apiKey) + functionCall, _, err := client.GetFunctionCallStatus(ctx) + if err != nil { + return ctrl.Result{}, err, true } - // Step 2: Check if already completed or has child TaskRun - if done, err := r.checkCompletedOrExisting(ctx, &trtc); done || err != nil { - if err != nil { - return ctrl.Result{}, err - } + status := functionCall.GetStatus() + + approved, ok := status.GetApprovedOk() + + if !ok || approved == nil { + return ctrl.Result{RequeueAfter: 5 * time.Second}, nil, true + } + + if 
*approved { + return r.updateTRTCStatus(ctx, trtc, + kubechainv1alpha1.TaskRunToolCallStatusTypeReadyToExecuteApprovedTool, + kubechainv1alpha1.TaskRunToolCallPhasePending, + "Ready to execute approved tool") + } else { + return r.updateTRTCStatus(ctx, trtc, + kubechainv1alpha1.TaskRunToolCallStatusTypeToolCallRejected, + kubechainv1alpha1.TaskRunToolCallPhaseFailed, + "Tool execution rejected") + } +} + +// requestHumanApproval handles setting up a new human approval request +func (r *TaskRunToolCallReconciler) requestHumanApproval(ctx context.Context, trtc *kubechainv1alpha1.TaskRunToolCall, + contactChannel *kubechainv1alpha1.ContactChannel, apiKey string, mcpServer *kubechainv1alpha1.MCPServer, +) (ctrl.Result, error) { + logger := log.FromContext(ctx) + + // Skip if already in progress or approved + if trtc.Status.Status == kubechainv1alpha1.TaskRunToolCallStatusTypeReadyToExecuteApprovedTool { return ctrl.Result{}, nil } - serverName, mcpToolName, isMCP := isMCPTool(trtc.Spec.ToolRef.Name) + // Update status to awaiting approval + trtc.Status.Status = "AwaitingHumanApproval" + trtc.Status.StatusDetail = fmt.Sprintf("Waiting for human approval via contact channel %s", mcpServer.Spec.ApprovalContactChannel.Name) + r.recorder.Event(trtc, corev1.EventTypeNormal, "AwaitingHumanApproval", + fmt.Sprintf("Tool execution requires approval via contact channel %s", mcpServer.Spec.ApprovalContactChannel.Name)) - if isMCP { - // Step 3: Check if this is an MCP tool and needs approval - mcpServer, needsApproval, err := r.getMCPServer(ctx, &trtc) - if err != nil { - return ctrl.Result{}, err - } + if err := r.Status().Update(ctx, trtc); err != nil { + logger.Error(err, "Failed to update TaskRunToolCall status") + return ctrl.Result{}, err + } - // No approval yet? Get back in the loop. 
- if needsApproval && trtc.Status.Status == kubechainv1alpha1.TaskRunToolCallStatusTypeAwaitingHumanApproval { - return ctrl.Result{RequeueAfter: 5 * time.Second}, nil - } + // Verify HLClient is initialized + if r.HLClientFactory == nil { + err := fmt.Errorf("HLClient not initialized") + result, errStatus, _ := r.setStatusError(ctx, kubechainv1alpha1.TaskRunToolCallStatusTypeErrorRequestingHumanApproval, + "NoHumanLayerClient", trtc, err) + return result, errStatus + } - if needsApproval { + // Post to HumanLayer to request approval + functionCall, statusCode, err := r.postToHumanLayer(ctx, trtc, contactChannel, apiKey) + if err != nil { + errorMsg := fmt.Errorf("HumanLayer request failed with status code: %d", statusCode) + if err != nil { + errorMsg = fmt.Errorf("HumanLayer request failed with status code %d: %v", statusCode, err) + } + result, errStatus, _ := r.setStatusError(ctx, kubechainv1alpha1.TaskRunToolCallStatusTypeErrorRequestingHumanApproval, + "HumanLayerRequestFailed", trtc, errorMsg) + return result, errStatus + } - trtcNamespace := trtc.Namespace - contactChannel, err := r.getContactChannel(ctx, mcpServer, trtcNamespace) - if err != nil { - return r.setStatusError(ctx, kubechainv1alpha1.TaskRunToolCallStatusTypeErrorRequestingHumanApproval, "NoContactChannel", &trtc, err) - } + // Update with call ID and requeue + callId := functionCall.GetCallId() + trtc.Status.ExternalCallID = callId + if err := r.Status().Update(ctx, trtc); err != nil { + logger.Error(err, "Failed to update TaskRunToolCall status") + return ctrl.Result{}, err + } - trtc.Status.Status = "AwaitingHumanApproval" - trtc.Status.StatusDetail = fmt.Sprintf("Waiting for human approval via contact channel %s", mcpServer.Spec.ApprovalContactChannel.Name) - r.recorder.Event(&trtc, corev1.EventTypeNormal, "AwaitingHumanApproval", - fmt.Sprintf("Tool execution requires approval via contact channel %s", mcpServer.Spec.ApprovalContactChannel.Name)) + return ctrl.Result{RequeueAfter: 5 * 
time.Second}, nil +} - if err := r.Status().Update(ctx, &trtc); err != nil { - logger.Error(err, "Failed to update TaskRunToolCall status") - return ctrl.Result{}, err - } +// handleMCPApprovalFlow encapsulates the MCP approval flow logic +func (r *TaskRunToolCallReconciler) handleMCPApprovalFlow(ctx context.Context, trtc *kubechainv1alpha1.TaskRunToolCall) (result ctrl.Result, err error, handled bool) { + // We've already been through the approval flow and are ready to execute the tool + if trtc.Status.Status == kubechainv1alpha1.TaskRunToolCallStatusTypeReadyToExecuteApprovedTool { + return ctrl.Result{}, nil, false + } - apiKey, err := r.getHumanLayerAPIKey(ctx, contactChannel.Spec.APIKeyFrom.SecretKeyRef.Name, contactChannel.Spec.APIKeyFrom.SecretKeyRef.Key, trtcNamespace) + // Check if this is an MCP tool and needs approval + mcpServer, needsApproval, err := r.getMCPServer(ctx, trtc) + if err != nil { + return ctrl.Result{}, err, true + } - if err != nil || apiKey == "" { - return r.setStatusError(ctx, kubechainv1alpha1.TaskRunToolCallStatusTypeErrorRequestingHumanApproval, "NoAPIKey", &trtc, err) - } + // If not an MCP tool or no approval needed, continue with normal processing + if mcpServer == nil || !needsApproval { + return ctrl.Result{}, nil, false + } - if r.HLClient == nil { - err := fmt.Errorf("HLClient not initialized") - return r.setStatusError(ctx, kubechainv1alpha1.TaskRunToolCallStatusTypeErrorRequestingHumanApproval, "NoHumanLayerClient", &trtc, err) - } + // Get contact channel and API key information + trtcNamespace := trtc.Namespace + contactChannel, err := r.getContactChannel(ctx, mcpServer, trtcNamespace) + if err != nil { + result, errStatus, _ := r.setStatusError(ctx, kubechainv1alpha1.TaskRunToolCallStatusTypeErrorRequestingHumanApproval, + "NoContactChannel", trtc, err) + return result, errStatus, true + } - statusCode, err := r.postToHumanLayer(ctx, &trtc, contactChannel, apiKey) + apiKey, err := r.getHumanLayerAPIKey(ctx, + 
contactChannel.Spec.APIKeyFrom.SecretKeyRef.Name, + contactChannel.Spec.APIKeyFrom.SecretKeyRef.Key, + trtcNamespace) - if statusCode != 200 { - errorMsg := fmt.Errorf("HumanLayer request failed with status code: %d", statusCode) - if err != nil { - errorMsg = fmt.Errorf("HumanLayer request failed with status code %d: %v", statusCode, err) - } - return r.setStatusError(ctx, kubechainv1alpha1.TaskRunToolCallStatusTypeErrorRequestingHumanApproval, "HumanLayerRequestFailed", &trtc, errorMsg) - } + if err != nil || apiKey == "" { + result, errStatus, _ := r.setStatusError(ctx, kubechainv1alpha1.TaskRunToolCallStatusTypeErrorRequestingHumanApproval, + "NoAPIKey", trtc, err) + return result, errStatus, true + } - return ctrl.Result{RequeueAfter: 5 * time.Second}, nil + // Handle pending approval check first + if trtc.Status.Status == kubechainv1alpha1.TaskRunToolCallStatusTypeAwaitingHumanApproval { + result, err, handled := r.handlePendingApproval(ctx, trtc, apiKey) + if handled { + return result, err, true } } - // Step 4: Parse arguments - args, err := r.parseArguments(ctx, &trtc) - if err != nil { - return ctrl.Result{}, err - } + // Request human approval if not already done + result, err = r.requestHumanApproval(ctx, trtc, contactChannel, apiKey, mcpServer) + return result, err, true +} - // Step 5: Handle MCP tool execution if applicable +// dispatchToolExecution routes tool execution to the appropriate handler based on tool type +func (r *TaskRunToolCallReconciler) dispatchToolExecution(ctx context.Context, trtc *kubechainv1alpha1.TaskRunToolCall, + args map[string]interface{}, +) (ctrl.Result, error) { + // Check for MCP tool first + serverName, mcpToolName, isMCP := isMCPTool(trtc.Spec.ToolRef.Name) if isMCP && r.MCPManager != nil { - return r.processMCPTool(ctx, &trtc, serverName, mcpToolName, args) + return r.processMCPTool(ctx, trtc, serverName, mcpToolName, args) } - // Step 6: Get traditional Tool resource - tool, toolType, err := r.getTraditionalTool(ctx, 
&trtc) + // Get traditional Tool resource + tool, toolType, err := r.getTraditionalTool(ctx, trtc) if err != nil { return ctrl.Result{}, err } - // Step 7: Process based on tool type + // Dispatch based on tool type switch toolType { case "delegateToAgent": - return r.processDelegateToAgent(ctx, &trtc) + return r.processDelegateToAgent(ctx, trtc) case "function": - return r.processBuiltinFunction(ctx, &trtc, tool, args) + return r.processBuiltinFunction(ctx, trtc, tool, args) case "externalAPI": - return r.processExternalAPI(ctx, &trtc, tool) + return r.processExternalAPI(ctx, trtc, tool) default: - return r.handleUnsupportedToolType(ctx, &trtc) + return r.handleUnsupportedToolType(ctx, trtc) } } +// Reconcile processes TaskRunToolCall objects. +func (r *TaskRunToolCallReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + logger := log.FromContext(ctx) + + // Get the TaskRunToolCall resource + var trtc kubechainv1alpha1.TaskRunToolCall + if err := r.Get(ctx, req.NamespacedName, &trtc); err != nil { + return ctrl.Result{}, client.IgnoreNotFound(err) + } + logger.Info("Reconciling TaskRunToolCall", "name", trtc.Name) + + // 1. Initialize status if not set + initialized, err, handled := r.initializeTRTC(ctx, &trtc) + if handled { + if err != nil { + return ctrl.Result{}, err + } + if initialized { + return ctrl.Result{}, nil + } + } + + // 2. Check for terminal error states - early return + if trtc.Status.Status == kubechainv1alpha1.TaskRunToolCallStatusTypeErrorRequestingHumanApproval { + return ctrl.Result{}, nil + } + + // 3. Check if already completed or has child TaskRun + done, err, handled := r.checkCompletedOrExisting(ctx, &trtc) + if handled { + if err != nil { + return ctrl.Result{}, err + } + if done { + return ctrl.Result{}, nil + } + } + + // 4. Handle MCP approval flow + result, err, handled := r.handleMCPApprovalFlow(ctx, &trtc) + if handled { + return result, err + } + + // 5. 
Parse arguments for execution + args, err := r.parseArguments(ctx, &trtc) + if err != nil { + return ctrl.Result{}, err + } + + // 6. Execute the appropriate tool type + return r.dispatchToolExecution(ctx, &trtc, args) +} + func (r *TaskRunToolCallReconciler) SetupWithManager(mgr ctrl.Manager) error { r.recorder = mgr.GetEventRecorderFor("taskruntoolcall-controller") r.server = &http.Server{Addr: ":8080"} // Choose a port @@ -861,13 +995,13 @@ func (r *TaskRunToolCallReconciler) SetupWithManager(mgr ctrl.Manager) error { r.MCPManager = mcpmanager.NewMCPServerManager() } - if r.HLClient == nil { - client, err := humanlayer.NewHumanLayerClient("") + if r.HLClientFactory == nil { + client, err := humanlayer.NewHumanLayerClientFactory("") if err != nil { return err } - r.HLClient = client + r.HLClientFactory = client } go func() { diff --git a/kubechain/internal/controller/taskruntoolcall/taskruntoolcall_controller_test.go b/kubechain/internal/controller/taskruntoolcall/taskruntoolcall_controller_test.go index 9533c4b..f2e89f2 100644 --- a/kubechain/internal/controller/taskruntoolcall/taskruntoolcall_controller_test.go +++ b/kubechain/internal/controller/taskruntoolcall/taskruntoolcall_controller_test.go @@ -223,10 +223,10 @@ var _ = Describe("TaskRunToolCall Controller", func() { }) // Tests for approval workflow - Context("Pending -> AwaitingHumanApproval (MCP Tool)", func() { + Context("Pending -> AwaitingHumanApproval (MCP Tool, Slack Contact Channel)", func() { It("transitions to AwaitingHumanApproval when MCPServer has approval channel", func() { // Note setupTestApprovalResources sets up the MCP server, MCP tool, and TaskRunToolCall - trtc, teardown := setupTestApprovalResources(ctx) + trtc, teardown := setupTestApprovalResources(ctx, nil) defer teardown() By("reconciling the taskruntoolcall that uses MCP tool with approval") @@ -236,7 +236,7 @@ var _ = Describe("TaskRunToolCall Controller", func() { NeedsApproval: true, } - reconciler.HLClient = 
&humanlayer.MockHumanLayerClient{ + reconciler.HLClientFactory = &humanlayer.MockHumanLayerClientFactory{ ShouldFail: false, StatusCode: 200, ReturnError: nil, @@ -275,20 +275,223 @@ var _ = Describe("TaskRunToolCall Controller", func() { }) }) + Context("Pending -> AwaitingHumanApproval (MCP Tool, Email Contact Channel)", func() { + It("transitions to AwaitingHumanApproval when MCPServer has email approval channel", func() { + // Set up resources with email contact channel + trtc, teardown := setupTestApprovalResources(ctx, &SetupTestApprovalConfig{ + ContactChannelType: "email", + }) + defer teardown() + + By("reconciling the taskruntoolcall that uses MCP tool with email approval") + reconciler, recorder := reconciler() + + reconciler.MCPManager = &MockMCPManager{ + NeedsApproval: true, + } + + reconciler.HLClientFactory = &humanlayer.MockHumanLayerClientFactory{ + ShouldFail: false, + StatusCode: 200, + ReturnError: nil, + } + + result, err := reconciler.Reconcile(ctx, reconcile.Request{ + NamespacedName: types.NamespacedName{ + Name: trtc.Name, + Namespace: trtc.Namespace, + }, + }) + + Expect(err).NotTo(HaveOccurred()) + Expect(result.RequeueAfter).To(Equal(5 * time.Second)) // Should requeue after 5 seconds + + By("checking the taskruntoolcall status is set to AwaitingHumanApproval") + updatedTRTC := &kubechainv1alpha1.TaskRunToolCall{} + err = k8sClient.Get(ctx, types.NamespacedName{ + Name: trtc.Name, + Namespace: trtc.Namespace, + }, updatedTRTC) + + Expect(err).NotTo(HaveOccurred()) + Expect(updatedTRTC.Status.Phase).To(Equal(kubechainv1alpha1.TaskRunToolCallPhasePending)) + Expect(updatedTRTC.Status.Status).To(Equal(kubechainv1alpha1.TaskRunToolCallStatusTypeAwaitingHumanApproval)) + Expect(updatedTRTC.Status.StatusDetail).To(ContainSubstring("Waiting for human approval via contact channel")) + + By("checking that appropriate events were emitted") + utils.ExpectRecorder(recorder).ToEmitEventContaining("AwaitingHumanApproval") + 
Expect(updatedTRTC.Status.Status).To(Equal(kubechainv1alpha1.TaskRunToolCallStatusTypeAwaitingHumanApproval)) + + By("verifying the contact channel type is email") + var contactChannel kubechainv1alpha1.ContactChannel + err = k8sClient.Get(ctx, types.NamespacedName{ + Name: testContactChannel.name, + Namespace: "default", + }, &contactChannel) + Expect(err).NotTo(HaveOccurred()) + Expect(contactChannel.Spec.Type).To(Equal(kubechainv1alpha1.ContactChannelTypeEmail)) + }) + }) + + Context("AwaitingHumanApproval -> ReadyToExecuteApprovedTool", func() { + It("transitions from AwaitingHumanApproval to ReadyToExecuteApprovedTool when MCP tool is approved", func() { + trtc, teardown := setupTestApprovalResources(ctx, &SetupTestApprovalConfig{ + TaskRunToolCallStatus: &kubechainv1alpha1.TaskRunToolCallStatus{ + ExternalCallID: "call-ready-to-execute-test", + Phase: kubechainv1alpha1.TaskRunToolCallPhasePending, + Status: kubechainv1alpha1.TaskRunToolCallStatusTypeAwaitingHumanApproval, + StatusDetail: "Waiting for human approval via contact channel", + StartTime: &metav1.Time{Time: time.Now().Add(-1 * time.Minute)}, + }, + }) + defer teardown() + + By("reconciling the trtc against an approval-granting HumanLayer client") + + reconciler, _ := reconciler() + + reconciler.MCPManager = &MockMCPManager{ + NeedsApproval: true, + } + + reconciler.HLClientFactory = &humanlayer.MockHumanLayerClientFactory{ + ShouldFail: false, + StatusCode: 200, + ReturnError: nil, + ShouldReturnApproval: true, + } + + result, err := reconciler.Reconcile(ctx, reconcile.Request{ + NamespacedName: types.NamespacedName{ + Name: trtc.Name, + Namespace: trtc.Namespace, + }, + }) + + Expect(err).NotTo(HaveOccurred()) + Expect(result.Requeue).To(BeFalse()) + + By("checking the taskruntoolcall status is set to ReadyToExecuteApprovedTool") + updatedTRTC := &kubechainv1alpha1.TaskRunToolCall{} + err = k8sClient.Get(ctx, types.NamespacedName{ + Name: trtc.Name, + Namespace: trtc.Namespace, + }, updatedTRTC) + 
+ Expect(err).NotTo(HaveOccurred()) + Expect(updatedTRTC.Status.Phase).To(Equal(kubechainv1alpha1.TaskRunToolCallPhasePending)) + Expect(updatedTRTC.Status.Status).To(Equal(kubechainv1alpha1.TaskRunToolCallStatusTypeReadyToExecuteApprovedTool)) + Expect(updatedTRTC.Status.StatusDetail).To(ContainSubstring("Ready to execute approved tool")) + }) + }) + + Context("AwaitingHumanApproval -> ToolCallRejected", func() { + It("transitions from AwaitingHumanApproval to ToolCallRejected when MCP tool is rejected", func() { + trtc, teardown := setupTestApprovalResources(ctx, &SetupTestApprovalConfig{ + TaskRunToolCallStatus: &kubechainv1alpha1.TaskRunToolCallStatus{ + ExternalCallID: "call-tool-call-rejected-test", + Phase: kubechainv1alpha1.TaskRunToolCallPhasePending, + Status: kubechainv1alpha1.TaskRunToolCallStatusTypeAwaitingHumanApproval, + StatusDetail: "Waiting for human approval via contact channel", + StartTime: &metav1.Time{Time: time.Now().Add(-1 * time.Minute)}, + }, + }) + defer teardown() + + By("reconciling the trtc against an approval-rejecting HumanLayer client") + + reconciler, _ := reconciler() + + reconciler.MCPManager = &MockMCPManager{ + NeedsApproval: true, + } + + reconciler.HLClientFactory = &humanlayer.MockHumanLayerClientFactory{ + ShouldFail: false, + StatusCode: 200, + ReturnError: nil, + ShouldReturnRejection: true, + } + + result, err := reconciler.Reconcile(ctx, reconcile.Request{ + NamespacedName: types.NamespacedName{ + Name: trtc.Name, + Namespace: trtc.Namespace, + }, + }) + + Expect(err).NotTo(HaveOccurred()) + Expect(result.Requeue).To(BeFalse()) + + By("checking the taskruntoolcall status is set to ToolCallRejected") + updatedTRTC := &kubechainv1alpha1.TaskRunToolCall{} + err = k8sClient.Get(ctx, types.NamespacedName{ + Name: trtc.Name, + Namespace: trtc.Namespace, + }, updatedTRTC) + + Expect(err).NotTo(HaveOccurred()) + Expect(updatedTRTC.Status.Phase).To(Equal(kubechainv1alpha1.TaskRunToolCallPhaseFailed)) + 
Expect(updatedTRTC.Status.Status).To(Equal(kubechainv1alpha1.TaskRunToolCallStatusTypeToolCallRejected)) + Expect(updatedTRTC.Status.StatusDetail).To(ContainSubstring("Tool execution rejected")) + }) + }) + + Context("ReadyToExecuteApprovedTool -> Succeeded", func() { + It("transitions from ReadyToExecuteApprovedTool to Succeeded when a tool is executed", func() { + trtc, teardown := setupTestApprovalResources(ctx, &SetupTestApprovalConfig{ + TaskRunToolCallStatus: &kubechainv1alpha1.TaskRunToolCallStatus{ + ExternalCallID: "call-ready-to-execute-test", + Phase: kubechainv1alpha1.TaskRunToolCallPhasePending, + Status: kubechainv1alpha1.TaskRunToolCallStatusTypeReadyToExecuteApprovedTool, + StatusDetail: "Ready to execute tool, with great vigor", + StartTime: &metav1.Time{Time: time.Now().Add(-1 * time.Minute)}, + }, + }) + defer teardown() + + By("reconciling the trtc against an approval-granting HumanLayer client") + + reconciler, _ := reconciler() + + result, err := reconciler.Reconcile(ctx, reconcile.Request{ + NamespacedName: types.NamespacedName{ + Name: trtc.Name, + Namespace: trtc.Namespace, + }, + }) + + Expect(err).NotTo(HaveOccurred()) + Expect(result.Requeue).To(BeFalse()) + + By("checking the taskruntoolcall status is set to Ready:Succeeded") + updatedTRTC := &kubechainv1alpha1.TaskRunToolCall{} + err = k8sClient.Get(ctx, types.NamespacedName{ + Name: trtc.Name, + Namespace: trtc.Namespace, + }, updatedTRTC) + + Expect(err).NotTo(HaveOccurred()) + Expect(updatedTRTC.Status.Phase).To(Equal(kubechainv1alpha1.TaskRunToolCallPhaseSucceeded)) + Expect(updatedTRTC.Status.Status).To(Equal(kubechainv1alpha1.TaskRunToolCallStatusTypeReady)) + Expect(updatedTRTC.Status.Result).To(Equal("5")) // From our mock implementation + }) + }) + Context("Pending -> ErrorRequestingHumanApproval (MCP Tool)", func() { It("transitions to ErrorRequestingHumanApproval when request to HumanLayer fails", func() { // Note setupTestApprovalResources sets up the MCP server, MCP tool, 
and TaskRunToolCall - trtc, teardown := setupTestApprovalResources(ctx) + trtc, teardown := setupTestApprovalResources(ctx, nil) defer teardown() By("reconciling the taskruntoolcall that uses MCP tool with approval") reconciler, _ := reconciler() reconciler.MCPManager = &MockMCPManager{ - NeedsApproval: true, + NeedsApproval: false, } - reconciler.HLClient = &humanlayer.MockHumanLayerClient{ + reconciler.HLClientFactory = &humanlayer.MockHumanLayerClientFactory{ ShouldFail: true, StatusCode: 500, ReturnError: fmt.Errorf("While taking pizzas from the kitchen to the lobby, Pete passed through the server room where he tripped over a network cable and now there's pizza all over the place. Also this request failed. No more pizza in the server room Pete."), diff --git a/kubechain/internal/controller/taskruntoolcall/utils_test.go b/kubechain/internal/controller/taskruntoolcall/utils_test.go index 875a182..be3ef2c 100644 --- a/kubechain/internal/controller/taskruntoolcall/utils_test.go +++ b/kubechain/internal/controller/taskruntoolcall/utils_test.go @@ -22,7 +22,7 @@ var addTool = &TestTool{ var testContactChannel = &TestContactChannel{ name: "test-contact-channel", - channelType: "slack", + channelType: kubechainv1alpha1.ContactChannelTypeSlack, secretName: testSecret.name, } @@ -64,7 +64,7 @@ type TestSecret struct { // TestContactChannel represents a test ContactChannel resource type TestContactChannel struct { name string - channelType string + channelType kubechainv1alpha1.ContactChannelType secretName string contactChannel *kubechainv1alpha1.ContactChannel } @@ -77,7 +77,7 @@ func (t *TestContactChannel) Setup(ctx context.Context) *kubechainv1alpha1.Conta Namespace: "default", }, Spec: kubechainv1alpha1.ContactChannelSpec{ - ChannelType: t.channelType, + Type: t.channelType, APIKeyFrom: kubechainv1alpha1.APIKeySource{ SecretKeyRef: kubechainv1alpha1.SecretKeyRef{ Name: t.secretName, @@ -89,11 +89,11 @@ func (t *TestContactChannel) Setup(ctx context.Context) 
*kubechainv1alpha1.Conta // Add specific config based on channel type if t.channelType == "slack" { - contactChannel.Spec.SlackConfig = &kubechainv1alpha1.SlackChannelConfig{ + contactChannel.Spec.Slack = &kubechainv1alpha1.SlackChannelConfig{ ChannelOrUserID: "C12345678", } } else if t.channelType == "email" { - contactChannel.Spec.EmailConfig = &kubechainv1alpha1.EmailChannelConfig{ + contactChannel.Spec.Email = &kubechainv1alpha1.EmailChannelConfig{ Address: "test@example.com", } } @@ -375,11 +375,32 @@ func reconciler() (*TaskRunToolCallReconciler, *record.FakeRecorder) { return reconciler, recorder } +// SetupTestApprovalConfig contains optional configuration for setupTestApprovalResources +type SetupTestApprovalConfig struct { + TaskRunToolCallStatus *kubechainv1alpha1.TaskRunToolCallStatus + TaskRunToolCallName string + TaskRunToolCallArgs string + ContactChannelType kubechainv1alpha1.ContactChannelType +} + // setupTestApprovalResources sets up all resources needed for testing approval -func setupTestApprovalResources(ctx context.Context) (*kubechainv1alpha1.TaskRunToolCall, func()) { +func setupTestApprovalResources(ctx context.Context, config *SetupTestApprovalConfig) (*kubechainv1alpha1.TaskRunToolCall, func()) { By("creating the secret") testSecret.Setup(ctx) By("creating the contact channel") + + // Set contact channel type based on config or default to ContactChannelTypeSlack + channelType := kubechainv1alpha1.ContactChannelTypeSlack + if config != nil && config.ContactChannelType != "" { + switch config.ContactChannelType { + case "email": + channelType = kubechainv1alpha1.ContactChannelTypeEmail + default: + channelType = kubechainv1alpha1.ContactChannelTypeSlack + } + } + + testContactChannel.channelType = channelType testContactChannel.SetupWithStatus(ctx, kubechainv1alpha1.ContactChannelStatus{ Ready: true, Status: "Ready", @@ -395,18 +416,35 @@ func setupTestApprovalResources(ctx context.Context) (*kubechainv1alpha1.TaskRun Status: "Ready", }) + 
name := "test-mcp-with-approval-trtc" + args := `{"a": 2, "b": 3}` + if config != nil { + if config.TaskRunToolCallName != "" { + name = config.TaskRunToolCallName + } + if config.TaskRunToolCallArgs != "" { + args = config.TaskRunToolCallArgs + } + } + taskRunToolCall := &TestTaskRunToolCall{ - name: "test-mcp-with-approval-trtc", + name: name, toolName: mcpTool.Spec.Name, - arguments: `{"a": 2, "b": 3}`, + arguments: args, } - trtc := taskRunToolCall.SetupWithStatus(ctx, kubechainv1alpha1.TaskRunToolCallStatus{ + status := kubechainv1alpha1.TaskRunToolCallStatus{ Phase: kubechainv1alpha1.TaskRunToolCallPhasePending, Status: "Pending", StatusDetail: "Ready for execution", StartTime: &metav1.Time{Time: time.Now().Add(-1 * time.Minute)}, - }) + } + + if config != nil && config.TaskRunToolCallStatus != nil { + status = *config.TaskRunToolCallStatus + } + + trtc := taskRunToolCall.SetupWithStatus(ctx, status) return trtc, func() { testMCPTool.Teardown(ctx) diff --git a/kubechain/internal/humanlayer/hlclient.go b/kubechain/internal/humanlayer/hlclient.go index ab750d7..10f8ed6 100644 --- a/kubechain/internal/humanlayer/hlclient.go +++ b/kubechain/internal/humanlayer/hlclient.go @@ -12,10 +12,10 @@ import ( humanlayerapi "github.com/humanlayer/smallchain/kubechain/internal/humanlayerapi" ) -// NewHumanLayerClient creates a new API client using either the provided API key +// NewHumanLayerClientFactory creates a new API client using either the provided API key // or falling back to the HUMANLAYER_API_KEY environment variable. Similarly, // it uses the provided API base URL or falls back to HUMANLAYER_API_BASE. 
-func NewHumanLayerClient(optionalApiBase string) (HumanLayerClientInterface, error) { +func NewHumanLayerClientFactory(optionalApiBase string) (HumanLayerClientFactory, error) { config := humanlayerapi.NewConfiguration() // Get API base from parameter or environment variable @@ -42,27 +42,32 @@ func NewHumanLayerClient(optionalApiBase string) (HumanLayerClientInterface, err }, } + // Enable debug mode to log request/response + // config.Debug = true + // Create the API client with the configuration client := humanlayerapi.NewAPIClient(config) - return &HumanLayerClient{client: client}, nil + return &RealHumanLayerClientFactory{client: client}, nil } -type HumanLayerClientWrapperInterface interface { +type HumanLayerClientWrapper interface { SetSlackConfig(slackConfig *kubechainv1alpha1.SlackChannelConfig) SetEmailConfig(emailConfig *kubechainv1alpha1.EmailChannelConfig) SetFunctionCallSpec(functionName string, args map[string]interface{}) SetCallID(callID string) SetRunID(runID string) SetAPIKey(apiKey string) + RequestApproval(ctx context.Context) (functionCall *humanlayerapi.FunctionCallOutput, statusCode int, err error) + GetFunctionCallStatus(ctx context.Context) (functionCall *humanlayerapi.FunctionCallOutput, statusCode int, err error) } -type HumanLayerClientInterface interface { - NewHumanLayerClient() HumanLayerClientWrapperInterface +type HumanLayerClientFactory interface { + NewHumanLayerClient() HumanLayerClientWrapper } -type HumanLayerClientWrapper struct { +type RealHumanLayerClientWrapper struct { client *humanlayerapi.APIClient slackChannelInput *humanlayerapi.SlackContactChannelInput emailContactChannel *humanlayerapi.EmailContactChannel @@ -72,17 +77,17 @@ type HumanLayerClientWrapper struct { apiKey string } -type HumanLayerClient struct { +type RealHumanLayerClientFactory struct { client *humanlayerapi.APIClient } -func (h *HumanLayerClient) NewHumanLayerClient() HumanLayerClientWrapperInterface { - return &HumanLayerClientWrapper{ +func (h 
*RealHumanLayerClientFactory) NewHumanLayerClient() HumanLayerClientWrapper { + return &RealHumanLayerClientWrapper{ client: h.client, } } -func (h *HumanLayerClientWrapper) SetSlackConfig(slackConfig *kubechainv1alpha1.SlackChannelConfig) { +func (h *RealHumanLayerClientWrapper) SetSlackConfig(slackConfig *kubechainv1alpha1.SlackChannelConfig) { slackChannelInput := humanlayerapi.NewSlackContactChannelInput(slackConfig.ChannelOrUserID) if slackConfig.ContextAboutChannelOrUser != "" { @@ -92,7 +97,7 @@ func (h *HumanLayerClientWrapper) SetSlackConfig(slackConfig *kubechainv1alpha1. h.slackChannelInput = slackChannelInput } -func (h *HumanLayerClientWrapper) SetEmailConfig(emailConfig *kubechainv1alpha1.EmailChannelConfig) { +func (h *RealHumanLayerClientWrapper) SetEmailConfig(emailConfig *kubechainv1alpha1.EmailChannelConfig) { emailContactChannel := humanlayerapi.NewEmailContactChannel(emailConfig.Address) if emailConfig.ContextAboutUser != "" { @@ -102,26 +107,26 @@ func (h *HumanLayerClientWrapper) SetEmailConfig(emailConfig *kubechainv1alpha1. 
h.emailContactChannel = emailContactChannel } -func (h *HumanLayerClientWrapper) SetFunctionCallSpec(functionName string, args map[string]interface{}) { +func (h *RealHumanLayerClientWrapper) SetFunctionCallSpec(functionName string, args map[string]interface{}) { // Create the function call input with required parameters functionCallSpecInput := humanlayerapi.NewFunctionCallSpecInput(functionName, args) h.functionCallSpecInput = functionCallSpecInput } -func (h *HumanLayerClientWrapper) SetCallID(callID string) { +func (h *RealHumanLayerClientWrapper) SetCallID(callID string) { h.callID = callID } -func (h *HumanLayerClientWrapper) SetRunID(runID string) { +func (h *RealHumanLayerClientWrapper) SetRunID(runID string) { h.runID = runID } -func (h *HumanLayerClientWrapper) SetAPIKey(apiKey string) { +func (h *RealHumanLayerClientWrapper) SetAPIKey(apiKey string) { h.apiKey = apiKey } -func (h *HumanLayerClientWrapper) RequestApproval(ctx context.Context) (functionCall *humanlayerapi.FunctionCallOutput, statusCode int, err error) { +func (h *RealHumanLayerClientWrapper) RequestApproval(ctx context.Context) (functionCall *humanlayerapi.FunctionCallOutput, statusCode int, err error) { channel := humanlayerapi.NewContactChannelInput() if h.slackChannelInput != nil { @@ -142,3 +147,11 @@ func (h *HumanLayerClientWrapper) RequestApproval(ctx context.Context) (function return functionCall, resp.StatusCode, err } + +func (h *RealHumanLayerClientWrapper) GetFunctionCallStatus(ctx context.Context) (functionCall *humanlayerapi.FunctionCallOutput, statusCode int, err error) { + functionCall, resp, err := h.client.DefaultAPI.GetFunctionCallStatus(ctx, h.callID). + Authorization("Bearer " + h.apiKey). 
+ Execute() + + return functionCall, resp.StatusCode, err +} diff --git a/kubechain/internal/humanlayer/mock_hlclient.go b/kubechain/internal/humanlayer/mock_hlclient.go index b8ac475..66841c2 100644 --- a/kubechain/internal/humanlayer/mock_hlclient.go +++ b/kubechain/internal/humanlayer/mock_hlclient.go @@ -2,26 +2,29 @@ package humanlayer import ( "context" + "time" kubechainv1alpha1 "github.com/humanlayer/smallchain/kubechain/api/v1alpha1" humanlayerapi "github.com/humanlayer/smallchain/kubechain/internal/humanlayerapi" ) -// MockHumanLayerClient implements HumanLayerClientInterface for testing -type MockHumanLayerClient struct { - ShouldFail bool - StatusCode int - ReturnError error - LastAPIKey string - LastCallID string - LastRunID string - LastFunction string - LastArguments map[string]interface{} +// MockHumanLayerClientFactory implements HumanLayerClientFactory for testing +type MockHumanLayerClientFactory struct { + ShouldFail bool + StatusCode int + ReturnError error + ShouldReturnApproval bool + ShouldReturnRejection bool + LastAPIKey string + LastCallID string + LastRunID string + LastFunction string + LastArguments map[string]interface{} } -// MockHumanLayerClientWrapper implements HumanLayerClientWrapperInterface for testing +// MockHumanLayerClientWrapper implements HumanLayerClientWrapper for testing type MockHumanLayerClientWrapper struct { - parent *MockHumanLayerClient + parent *MockHumanLayerClientFactory slackConfig *kubechainv1alpha1.SlackChannelConfig emailConfig *kubechainv1alpha1.EmailChannelConfig functionName string @@ -32,53 +35,85 @@ type MockHumanLayerClientWrapper struct { } // NewHumanLayerClient creates a new mock client -func NewMockHumanLayerClient(shouldFail bool, statusCode int, returnError error) *MockHumanLayerClient { - return &MockHumanLayerClient{ +func NewMockHumanLayerClient(shouldFail bool, statusCode int, returnError error) *MockHumanLayerClientFactory { + return &MockHumanLayerClientFactory{ ShouldFail: shouldFail, 
StatusCode: statusCode, ReturnError: returnError, } } -// NewHumanLayerClient implements HumanLayerClientInterface -func (m *MockHumanLayerClient) NewHumanLayerClient() HumanLayerClientWrapperInterface { +// NewHumanLayerClient implements HumanLayerClientFactory +func (m *MockHumanLayerClientFactory) NewHumanLayerClient() HumanLayerClientWrapper { return &MockHumanLayerClientWrapper{ parent: m, } } -// SetSlackConfig implements HumanLayerClientWrapperInterface +// SetSlackConfig implements HumanLayerClientWrapper func (m *MockHumanLayerClientWrapper) SetSlackConfig(slackConfig *kubechainv1alpha1.SlackChannelConfig) { m.slackConfig = slackConfig } -// SetEmailConfig implements HumanLayerClientWrapperInterface +// SetEmailConfig implements HumanLayerClientWrapper func (m *MockHumanLayerClientWrapper) SetEmailConfig(emailConfig *kubechainv1alpha1.EmailChannelConfig) { m.emailConfig = emailConfig } -// SetFunctionCallSpec implements HumanLayerClientWrapperInterface +// SetFunctionCallSpec implements HumanLayerClientWrapper func (m *MockHumanLayerClientWrapper) SetFunctionCallSpec(functionName string, args map[string]interface{}) { m.functionName = functionName m.functionArgs = args } -// SetCallID implements HumanLayerClientWrapperInterface +// SetCallID implements HumanLayerClientWrapper func (m *MockHumanLayerClientWrapper) SetCallID(callID string) { m.callID = callID } -// SetRunID implements HumanLayerClientWrapperInterface +// SetRunID implements HumanLayerClientWrapper func (m *MockHumanLayerClientWrapper) SetRunID(runID string) { m.runID = runID } -// SetAPIKey implements HumanLayerClientWrapperInterface +// SetAPIKey implements HumanLayerClientWrapper func (m *MockHumanLayerClientWrapper) SetAPIKey(apiKey string) { m.apiKey = apiKey } -// RequestApproval implements HumanLayerClientWrapperInterface +// GetFunctionCallStatus implements HumanLayerClientWrapper +func (m *MockHumanLayerClientWrapper) GetFunctionCallStatus(ctx context.Context) 
(*humanlayerapi.FunctionCallOutput, int, error) { + + if m.parent.ShouldReturnApproval { + now := time.Now() + approved := true + status := humanlayerapi.NewNullableFunctionCallStatus(&humanlayerapi.FunctionCallStatus{ + RequestedAt: *humanlayerapi.NewNullableTime(&now), + RespondedAt: *humanlayerapi.NewNullableTime(&now), + Approved: *humanlayerapi.NewNullableBool(&approved), + }) + return &humanlayerapi.FunctionCallOutput{ + Status: *status, + }, 200, nil + } + + if m.parent.ShouldReturnRejection { + now := time.Now() + approved := false + status := humanlayerapi.NewNullableFunctionCallStatus(&humanlayerapi.FunctionCallStatus{ + RequestedAt: *humanlayerapi.NewNullableTime(&now), + RespondedAt: *humanlayerapi.NewNullableTime(&now), + Approved: *humanlayerapi.NewNullableBool(&approved), + }) + return &humanlayerapi.FunctionCallOutput{ + Status: *status, + }, 200, nil + } + + return nil, m.parent.StatusCode, m.parent.ReturnError +} + +// RequestApproval implements HumanLayerClientWrapper func (m *MockHumanLayerClientWrapper) RequestApproval(ctx context.Context) (*humanlayerapi.FunctionCallOutput, int, error) { // Store the values in the parent for test verification m.parent.LastAPIKey = m.apiKey diff --git a/kubechain/internal/humanlayerapi/utils.go b/kubechain/internal/humanlayerapi/utils.go index 584f71e..c7df67a 100644 --- a/kubechain/internal/humanlayerapi/utils.go +++ b/kubechain/internal/humanlayerapi/utils.go @@ -327,7 +327,44 @@ func (v NullableTime) MarshalJSON() ([]byte, error) { func (v *NullableTime) UnmarshalJSON(src []byte) error { v.isSet = true - return json.Unmarshal(src, &v.value) + + // Handle null case + if string(src) == "null" { + v.value = nil + return nil + } + + // Try standard time parsing first + var stdTime time.Time + err := json.Unmarshal(src, &stdTime) + if err == nil { + v.value = &stdTime + return nil + } + + // If standard parsing fails, try to parse as RFC3339Nano without timezone + s := string(src) + if len(s) >= 2 { + // Remove 
quotes from the string + s = s[1 : len(s)-1] + + // Try various time formats + formats := []string{ + "2006-01-02T15:04:05.999999", // Missing timezone + "2006-01-02T15:04:05", // No fractional seconds or timezone + time.RFC3339, // Standard format with timezone + time.RFC3339Nano, // Standard format with nanoseconds + } + + for _, format := range formats { + if t, err := time.Parse(format, s); err == nil { + v.value = &t + return nil + } + } + } + + return fmt.Errorf("cannot parse %s as time.Time", string(src)) } // IsNil checks if an input is nil
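The fallback parsing added to `NullableTime.UnmarshalJSON` above can be sketched in isolation as follows. This is a minimal sketch, not the generated client's code: `parseLenientTime` and `fallbackLayouts` are hypothetical names introduced here for illustration, and the sketch assumes that the timestamps missing a timezone should be read as UTC, which is what `time.Parse` does when the layout carries no zone.

```go
package main

import (
	"fmt"
	"strings"
	"time"
)

// fallbackLayouts is a hypothetical list of layouts, tried in order.
// The timezone-aware layout comes first so well-formed values win.
var fallbackLayouts = []string{
	time.RFC3339Nano,             // standard format, optional fractional seconds
	"2006-01-02T15:04:05.999999", // fractional seconds, missing timezone
	"2006-01-02T15:04:05",        // no fractional seconds, missing timezone
}

// parseLenientTime is a hypothetical helper that mirrors the idea in the diff:
// accept JSON null, then try each layout until one parses.
// It returns (nil, nil) for JSON null.
func parseLenientTime(src []byte) (*time.Time, error) {
	s := strings.TrimSpace(string(src))
	if s == "null" {
		return nil, nil
	}

	// Only strip the quotes when they are actually present, so malformed
	// input such as a bare number is rejected instead of being sliced.
	if len(s) < 2 || s[0] != '"' || s[len(s)-1] != '"' {
		return nil, fmt.Errorf("cannot parse %s as time.Time: not a JSON string", s)
	}
	s = s[1 : len(s)-1]

	for _, layout := range fallbackLayouts {
		if t, err := time.Parse(layout, s); err == nil {
			return &t, nil
		}
	}
	return nil, fmt.Errorf("cannot parse %q as time.Time", s)
}
```

Checking for the surrounding quotes before slicing is a small design choice in this sketch: it keeps a non-string payload from being trimmed into something that happens to match a layout.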