diff --git a/CLAUDE.md b/CLAUDE.md
index 3bd59e2..21ef568 100644
--- a/CLAUDE.md
+++ b/CLAUDE.md
@@ -172,6 +172,75 @@ Alternatively, clean up components individually:
- Ensure the PROJECT file contains entries for all resources before running `make manifests`
- Follow the detailed guidance in the [Kubebuilder Guide](/kubechain/docs/kubebuilder-guide.md)
+### Kubernetes Resource Design
+
+#### Don't use "config" in field names:
+
+Bad:
+
+```yaml
+spec:
+  slackConfig:
+    # ...
+  emailConfig:
+    # ...
+```
+
+Good:
+
+```yaml
+spec:
+  slack:
+    # ...
+  email:
+    # ...
+```
+
+#### Prefer nil-able sub-objects over "type" fields:
+
+This is more of a guideline than a rule; consider it when you have a Resource that is a union type. There's
+no great answer here because of how Go handles unions. (Maybe the state of the art has progressed since the last time I checked.) -- dex
+
+Bad:
+
+```yaml
+spec:
+  type: slack
+  slack:
+    channelOrUserID: C1234567890
+```
+
+Good:
+
+```yaml
+spec:
+  slack:
+    channelOrUserID: C1234567890
+```
+
+In code, instead of
+
+```go
+switch resource.Spec.Type {
+case "slack":
+	// ...
+case "email":
+	// ...
+}
+```
+
+check which object is non-nil and use that:
+
+```go
+if resource.Spec.Slack != nil {
+	// ...
+} else if resource.Spec.Email != nil {
+	// ...
+} else {
+	// ...
+}
+```
+
### Markdown
- When writing markdown code blocks, do not indent the block, just use backticks to offset the code
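
The nil-able sub-object guideline added to CLAUDE.md in the hunk above can be sketched in Go roughly as follows. This is a minimal illustration, not the project's actual API types: `ChannelSpec`, `SlackChannel`, `EmailChannel`, the `Address` field, and the `Kind` helper are all hypothetical names. It shows one way to decide which branch of the union is active, and to reject a spec that sets none or more than one sub-object.

```go
package main

import (
	"errors"
	"fmt"
)

// SlackChannel and EmailChannel are hypothetical sub-objects used only for illustration.
type SlackChannel struct {
	ChannelOrUserID string
}

type EmailChannel struct {
	Address string
}

// ChannelSpec models a union without a "type" field: exactly one pointer should be non-nil.
type ChannelSpec struct {
	Slack *SlackChannel
	Email *EmailChannel
}

// Kind reports which sub-object is set and returns an error when none or several are set.
func (s ChannelSpec) Kind() (string, error) {
	set := []string{}
	if s.Slack != nil {
		set = append(set, "slack")
	}
	if s.Email != nil {
		set = append(set, "email")
	}
	switch len(set) {
	case 0:
		return "", errors.New("no channel configured")
	case 1:
		return set[0], nil
	default:
		return "", fmt.Errorf("exactly one channel must be set, got %v", set)
	}
}

func main() {
	spec := ChannelSpec{Slack: &SlackChannel{ChannelOrUserID: "C1234567890"}}
	kind, err := spec.Kind()
	fmt.Println(kind, err)
}
```

One trade-off of this shape is that nothing in the type system stops two sub-objects from being set at once, so a check like `Kind` (or an equivalent admission-time validation) is where that invariant would have to be enforced.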
diff --git a/README.md b/README.md
index 108b443..0fe8d43 100644
--- a/README.md
+++ b/README.md
@@ -7,7 +7,7 @@
KubeChain is a cloud-native orchestrator for AI Agents built on Kubernetes. It supports [long-lived outer-loop agents](https://theouterloop.substack.com/p/openais-realtime-api-is-a-step-towards) that can process asynchronous execution of both LLM inference and long-running tool calls. It's designed for simplicity and gives strong durability and reliability guarantees for agents that make asynchronous tool calls like contacting humans or delegating work to other agents.
:warning: **Note** - KubeChain is experimental and some known issues and race conditions. Use at your own risk.
-
+
@@ -66,15 +66,11 @@ To run KubeChain locally on macos, you'll also need:
### Setting Up a Local Cluster
-
-
-1. **Create a Kind cluster**
-
```bash
kind create cluster
```
-2. **Add your OpenAI API key as a Kubernetes secret**
+### Add your OpenAI API key as a Kubernetes secret
```bash
kubectl create secret generic openai \
@@ -86,10 +82,10 @@ kubectl create secret generic openai \
> [!TIP]
-> For better visibility when running tutorial, we recommend starting
+> For better visibility when running tutorial, we recommend starting
> a stream to watch all the events as they're happening,
> for example:
->
+>
> ```bash
> kubectl get events --watch
> ```
@@ -155,24 +151,24 @@ graph RL
```
Check the created LLM:
-
+
```bash
kubectl get llm
```
-
+
Output:
```
NAME PROVIDER READY STATUS
gpt-4o openai true Ready
```
-
+
Using `-o wide` and `describe`
-
+
```bash
kubectl get llm -o wide
```
-
+
Output:
```
NAME PROVIDER READY STATUS DETAIL
@@ -254,24 +250,24 @@ graph RL
```
Check the created Agent:
-
+
```bash
kubectl get agent
```
-
+
Output:
```
NAME READY STATUS
my-assistant true Ready
```
-
+
Using `-o wide` and `describe`
-
+
```bash
kubectl get agent -o wide
```
-
+
Output:
```
NAME READY STATUS DETAIL
@@ -359,21 +355,21 @@ graph RL
end
```
Check the created Task:
-
+
```bash
kubectl get task
```
-
+
Output:
```
NAME READY STATUS AGENT MESSAGE
hello-world-task true Ready my-assistant What is the capital of the moon?
```
-
+
Using `-o wide` and `describe`
-
+
```bash
kubectl get task -o wide
```
@@ -464,7 +460,7 @@ graph RL
For now, our task run should complete quickly and return a FinalAnswer.
```bash
-kubectl get taskrun
+kubectl get taskrun
```
Output:
@@ -480,7 +476,7 @@ To get just the output, run
kubectl get taskrun -o jsonpath='{.items[*].status.output}'
```
-and you'll see
+and you'll see
> The Moon does not have a capital. It is a natural satellite of Earth and lacks any governmental structure or human habitation that would necessitate a capital city.
@@ -536,7 +532,7 @@ We saw above how you can get the status of a taskrun with `kubectl get taskrun`.
For more detailed information, like to see the full context window, you can use:
```bash
-kubectl describe taskrun
+kubectl describe taskrun
```
```
@@ -659,7 +655,7 @@ fetch true Ready
```bash
kubectl describe mcpserver
```
-Output:
+Output:
```
Name: fetch
@@ -784,7 +780,7 @@ spec:
EOF
```
-You should see some events in the output of
+You should see some events in the output of
```
kubectl get events --watch
@@ -807,7 +803,7 @@ kubectl get taskrun fetch-task-1 -o jsonpath='{.status.output}'
```
> The URL [https://swapi.dev/api/people/1](https://swapi.dev/api/people/1) contains the following data about a Star Wars character:
->
+>
> - **Name**: Luke Skywalker
> - **Height**: 172 cm
> - **Mass**: 77 kg
@@ -836,7 +832,7 @@ kubectl get taskrun fetch-task-1 -o jsonpath='{.status.output}'
and you can describe the taskrun to see the full context window and tool-calling turns
```
-kubectl describe taskrun fetch-task-1
+kubectl describe taskrun fetch-task-1
```
A simplified view of the taskrun:
@@ -866,11 +862,11 @@ flowchart TD
Task2[Task]
Agent2[Agent]
- UserMessage --> Task --> Agent --> LLM
+ UserMessage --> Task --> Agent --> LLM
Provider --> OpenAI
Secret --> Credentials
Credentials --> OpenAI
- OpenAI --> ToolCall-1
+ OpenAI --> ToolCall-1
ToolCall-1 --> Task2 --> Agent2 --> fetch
fetch --> ToolResponse-1
ToolResponse-1 --> OpenAI2[OpenAI]
@@ -1052,6 +1048,253 @@ Events:
That's it! Go add your favorite MCPs and start running durable agents in Kubernetes!
+### Incorporating Human Approval
+
+For certain classes of MCP tools, you may want to incorporate human approval into an agent's workflow.
+
+KubeChain supports this via [HumanLayer's](https://github.com/humanlayer/humanlayer) [contact channels](https://www.humanlayer.dev/docs/channels/introduction)
+to request human approval and input across Slack, email, and more.
+
+**Note**: Directly approving tool calls with `kubectl` or a `kubechain` CLI is planned but not yet supported.
+
+**Note**: We recommend running through the above examples before exploring this section, since several Kubernetes resources created there are assumed to exist. If you'd like to see a full running version of the Kubernetes configuration used in this section, check out [kubechain_v1alpha_contactchannel_with_approval.yaml](./config/samples/kubechain_v1alpha_contactchannel_with_approval.yaml).
+
+You'll need a HumanLayer API key to get started:
+
+```bash
+kubectl create secret generic humanlayer --from-literal=humanlayer-api-key=$HUMANLAYER_API_KEY
+```
+
+Next, create a ContactChannel resource. In this example, we'll use an email contact channel (be sure to swap the `approver@example.com` address for a real target email address):
+
+```bash
+cat <
+API Version: kubechain.humanlayer.dev/v1alpha1
+Kind: TaskRunToolCall
+Metadata:
+ Creation Timestamp: 2025-04-01T16:09:02Z
+ Generation: 1
+ Owner References:
+ API Version: kubechain.humanlayer.dev/v1alpha1
+ Controller: true
+ Kind: TaskRun
+ Name: approved-fetch-task-1
+ UID: 52893dec-c5a5-424d-983f-13a89215b084
+ Resource Version: 91939
+ UID: 3f8c4eaf-0e46-44f6-9741-f32809747099
+Spec:
+ Arguments: {"url":"https://swapi.dev/api/people/2"}
+ Task Run Ref:
+ Name: approved-fetch-task-1
+ Tool Call Id: call_7PCkM1y2v8wFOC2vKtDrweor
+ Tool Ref:
+ Name: fetch__fetch
+Status:
+ External Call ID: ec-3257d3e
+ Phase: Pending
+ Start Time: 2025-04-01T16:09:02Z
+ Status: AwaitingHumanApproval
+ Status Detail: Waiting for human approval via contact channel approval-channel
+Events:
+ Type Reason Age From Message
+ ---- ------ ---- ---- -------
+ Normal AwaitingHumanApproval 2m42s taskruntoolcall-controller Tool execution requires approval via contact channel approval-channel
+ Normal HumanLayerRequestSent 2m41s taskruntoolcall-controller HumanLayer request sent
+```
+
+Note that at this point our `taskrun` has not completed: running `kubectl get taskrun approved-fetch-task-1` shows that no `OUTPUT` has been returned yet.
+
+Go ahead and approve the email you should have received via HumanLayer requesting approval to run our `fetch` tool. After a few seconds, running `kubectl get taskruntoolcall approved-fetch-task-1-tc-01` should show our tool has been called. Additionally, if we run `kubectl describe taskrun approved-fetch-task-1`, we should see the following (truncated a bit for brevity):
+
+```
+$ kubectl describe taskrun approved-fetch-task-1
+Name: approved-fetch-task-1
+Kind: TaskRun
+Metadata:
+ Creation Timestamp: 2025-04-01T16:16:13Z
+ UID: 58c9d760-a160-4386-9d8d-ae9da0286125
+Spec:
+ Task Ref:
+ Name: approved-fetch-task
+Status:
+ Context Window:
+ Content: You are a helpful assistant. Your job is to help the user with their tasks.
+
+ Role: system
+ Content: Write me a haiku about the character found at https://swapi.dev/api/people/2?
+ Role: user
+ Content:
+ Role: assistant
+ Tool Calls:
+ Function:
+ Arguments: {"url":"https://swapi.dev/api/people/2"}
+ Name: fetch__fetch
+ Id: call_FZaXJq1FKuBVLYr9HHJwcnOb
+ Type: function
+ Content: Content type application/json cannot be simplified to markdown, but here is the raw content:
+Contents of https://swapi.dev/api/people/2:
+{"name":"C-3PO","height":"167","mass":"75","hair_color":"n/a","skin_color":"gold","eye_color":"yellow","birth_year":"112BBY","gender":"n/a","homeworld":"https://swapi.dev/api/planets/1/","films":["https://swapi.dev/api/films/1/","https://swapi.dev/api/films/2/","https://swapi.dev/api/films/3/","https://swapi.dev/api/films/4/","https://swapi.dev/api/films/5/","https://swapi.dev/api/films/6/"],"species":["https://swapi.dev/api/species/2/"],"vehicles":[],"starships":[],"created":"2014-12-10T15:10:51.357000Z","edited":"2014-12-20T21:17:50.309000Z","url":"https://swapi.dev/api/people/2/"}
+ Role: tool
+ Tool Call Id: call_FZaXJq1FKuBVLYr9HHJwcnOb
+ Content: Golden C-3PO,
+Speaks in many languages,
+Droid with gentle heart.
+ Role: assistant
+ Output: Golden C-3PO,
+Speaks in many languages,
+Droid with gentle heart.
+ Phase: FinalAnswer
+ Ready: true
+ Span Context:
+ Span ID: 3fd054c970f50fc1
+ Trace ID: 21e2b0e7457ae78cce4abaf1b1c02819
+ Status: Ready
+ Status Detail: LLM final response received
+Events:
+ Type Reason Age From Message
+ ---- ------ ---- ---- -------
+ Normal ValidationSucceeded 48s taskrun-controller Task validated successfully
+ Normal ToolCallsPending 47s taskrun-controller LLM response received, tool calls pending
+ Normal ToolCallCreated 47s taskrun-controller Created TaskRunToolCall approved-fetch-task-1-tc-01
+ Normal SendingContextWindowToLLM 7s (x2 over 48s) taskrun-controller Sending context window to LLM
+ Normal AllToolCallsCompleted 7s taskrun-controller All tool calls completed, ready to send tool results to LLM
+ Normal LLMFinalAnswer 6s taskrun-controller LLM response received successfully
+```
+
+### Using other Language Models
+
+So far we've been using the `gpt-4o` model from OpenAI. KubeChain also supports using other language models from OpenAI, Anthropic, Vertex, and more.
+
+
+Let's create a new LLM and Agent that use the `claude-3-5-sonnet` model from Anthropic.
+
+#### Create a secret
+
+```
+cat < 0 {
logger.Info("Child TaskRun already exists", "childTaskRun", taskRunList.Items[0].Name)
// Optionally, sync status from child to parent.
- return true, nil
+ return true, nil, true
}
- return false, nil
+ return false, nil, false
}
// parseArguments parses the tool call arguments
-func (r *TaskRunToolCallReconciler) parseArguments(ctx context.Context, trtc *kubechainv1alpha1.TaskRunToolCall) (map[string]interface{}, error) {
+func (r *TaskRunToolCallReconciler) parseArguments(ctx context.Context, trtc *kubechainv1alpha1.TaskRunToolCall) (args map[string]interface{}, err error) {
logger := log.FromContext(ctx)
// Parse the arguments string as JSON (needed for both MCP and traditional tools)
- var args map[string]interface{}
if err := json.Unmarshal([]byte(trtc.Spec.Arguments), &args); err != nil {
logger.Error(err, "Failed to parse arguments")
trtc.Status.Status = StatusError
@@ -576,8 +576,8 @@ func (r *TaskRunToolCallReconciler) processExternalAPI(ctx context.Context, trtc
kwargs, _ = kwargsVal.(map[string]interface{})
}
- // Generate call ID
- callID := "call-" + uuid.New().String()
+ // Generate call ID - a shorter format (first 7 hex characters of a UUID); uniqueness is probabilistic, not guaranteed
+ callID := "c" + uuid.New().String()[:7]
// Prepare function call spec
functionSpec := map[string]interface{}{
@@ -681,9 +681,10 @@ func (r *TaskRunToolCallReconciler) getHumanLayerAPIKey(ctx context.Context, sec
}
//nolint:unparam
-func (r *TaskRunToolCallReconciler) setStatusError(ctx context.Context, trtcStatus kubechainv1alpha1.TaskRunToolCallStatusType, eventType string, trtc *kubechainv1alpha1.TaskRunToolCall, err error) (ctrl.Result, error) {
+func (r *TaskRunToolCallReconciler) setStatusError(ctx context.Context, trtcStatus kubechainv1alpha1.TaskRunToolCallStatusType, eventType string, trtc *kubechainv1alpha1.TaskRunToolCall, err error) (ctrl.Result, error, bool) {
+ trtcDeepCopy := trtc.DeepCopy()
logger := log.FromContext(ctx)
- trtc.Status.Status = trtcStatus
+ trtcDeepCopy.Status.Status = trtcStatus
// Handle nil error case
errorMessage := "Unknown error occurred"
@@ -691,27 +692,43 @@ func (r *TaskRunToolCallReconciler) setStatusError(ctx context.Context, trtcStat
errorMessage = err.Error()
}
- trtc.Status.StatusDetail = errorMessage
- trtc.Status.Error = errorMessage
- r.recorder.Event(trtc, corev1.EventTypeWarning, eventType, errorMessage)
+ trtcDeepCopy.Status.StatusDetail = errorMessage
+ trtcDeepCopy.Status.Error = errorMessage
+ r.recorder.Event(trtcDeepCopy, corev1.EventTypeWarning, eventType, errorMessage)
- if err := r.Status().Update(ctx, trtc); err != nil {
+ if err := r.Status().Update(ctx, trtcDeepCopy); err != nil {
logger.Error(err, "Failed to update status")
- return ctrl.Result{}, err
+ return ctrl.Result{}, err, true
}
- return ctrl.Result{}, nil
+ return ctrl.Result{}, nil, true
}
-func (r *TaskRunToolCallReconciler) postToHumanLayer(ctx context.Context, trtc *kubechainv1alpha1.TaskRunToolCall, contactChannel *kubechainv1alpha1.ContactChannel, apiKey string) (int, error) {
- client := r.HLClient.NewHumanLayerClient()
+func (r *TaskRunToolCallReconciler) updateTRTCStatus(ctx context.Context, trtc *kubechainv1alpha1.TaskRunToolCall, trtcStatusType kubechainv1alpha1.TaskRunToolCallStatusType, trtcStatusPhase kubechainv1alpha1.TaskRunToolCallPhase, statusDetail string) (ctrl.Result, error, bool) {
+ logger := log.FromContext(ctx)
+
+ trtcDeepCopy := trtc.DeepCopy()
- switch contactChannel.Spec.ChannelType {
- case "slack":
- client.SetSlackConfig(contactChannel.Spec.SlackConfig)
- case "email":
- client.SetEmailConfig(contactChannel.Spec.EmailConfig)
+ trtcDeepCopy.Status.Status = trtcStatusType
+ trtcDeepCopy.Status.StatusDetail = statusDetail
+ trtcDeepCopy.Status.Phase = trtcStatusPhase
+
+ if err := r.Status().Update(ctx, trtcDeepCopy); err != nil {
+ logger.Error(err, "Failed to update status")
+ return ctrl.Result{}, err, true
+ }
+ return ctrl.Result{}, nil, true
+}
+
+func (r *TaskRunToolCallReconciler) postToHumanLayer(ctx context.Context, trtc *kubechainv1alpha1.TaskRunToolCall, contactChannel *kubechainv1alpha1.ContactChannel, apiKey string) (*humanlayerapi.FunctionCallOutput, int, error) {
+ client := r.HLClientFactory.NewHumanLayerClient()
+
+ switch contactChannel.Spec.Type {
+ case kubechainv1alpha1.ContactChannelTypeSlack:
+ client.SetSlackConfig(contactChannel.Spec.Slack)
+ case kubechainv1alpha1.ContactChannelTypeEmail:
+ client.SetEmailConfig(contactChannel.Spec.Email)
default:
- return 0, fmt.Errorf("unsupported channel type: %s", contactChannel.Spec.ChannelType)
+ return nil, 0, fmt.Errorf("unsupported channel type: %s", contactChannel.Spec.Type)
}
toolName := trtc.Spec.ToolRef.Name
@@ -724,133 +741,250 @@ func (r *TaskRunToolCallReconciler) postToHumanLayer(ctx context.Context, trtc *
}
client.SetFunctionCallSpec(toolName, args)
- client.SetCallID("call-" + uuid.New().String())
+ client.SetCallID("ec-" + uuid.New().String()[:7])
client.SetRunID(trtc.Name)
client.SetAPIKey(apiKey)
- _, statusCode, err := client.RequestApproval(ctx)
+ functionCall, statusCode, err := client.RequestApproval(ctx)
- return statusCode, err
+ if err == nil {
+ r.recorder.Event(trtc, corev1.EventTypeNormal, "HumanLayerRequestSent", "HumanLayer request sent")
+ }
+
+ return functionCall, statusCode, err
}
-// Reconcile processes TaskRunToolCall objects.
-func (r *TaskRunToolCallReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
+// handlePendingApproval checks if an existing human approval is completed and updates status accordingly
+func (r *TaskRunToolCallReconciler) handlePendingApproval(ctx context.Context, trtc *kubechainv1alpha1.TaskRunToolCall, apiKey string) (ctrl.Result, error, bool) {
logger := log.FromContext(ctx)
- var trtc kubechainv1alpha1.TaskRunToolCall
- if err := r.Get(ctx, req.NamespacedName, &trtc); err != nil {
- return ctrl.Result{}, client.IgnoreNotFound(err)
+ // Only process if in the pending approval state
+ if trtc.Status.Status != kubechainv1alpha1.TaskRunToolCallStatusTypeAwaitingHumanApproval {
+ return ctrl.Result{}, nil, false
}
- logger.Info("Reconciling TaskRunToolCall", "name", trtc.Name)
- // Step 1: Initialize status if not set
- if initialized, err := r.initializeTRTC(ctx, &trtc); initialized || err != nil {
- if err != nil {
- return ctrl.Result{}, err
- }
- return ctrl.Result{}, nil
+ // Verify we have a call ID
+ if trtc.Status.ExternalCallID == "" {
+ logger.Info("Missing ExternalCallID in pending approval state")
+ return ctrl.Result{}, nil, false
}
- // Do not pass GO, do not collect $200, if we've errored
- if trtc.Status.Status == kubechainv1alpha1.TaskRunToolCallStatusTypeErrorRequestingHumanApproval {
- return ctrl.Result{}, nil
+ client := r.HLClientFactory.NewHumanLayerClient()
+ client.SetCallID(trtc.Status.ExternalCallID)
+ client.SetAPIKey(apiKey)
+ functionCall, _, err := client.GetFunctionCallStatus(ctx)
+ if err != nil {
+ return ctrl.Result{}, err, true
}
- // Step 2: Check if already completed or has child TaskRun
- if done, err := r.checkCompletedOrExisting(ctx, &trtc); done || err != nil {
- if err != nil {
- return ctrl.Result{}, err
- }
+ status := functionCall.GetStatus()
+
+ approved, ok := status.GetApprovedOk()
+
+ if !ok || approved == nil {
+ return ctrl.Result{RequeueAfter: 5 * time.Second}, nil, true
+ }
+
+ if *approved {
+ return r.updateTRTCStatus(ctx, trtc,
+ kubechainv1alpha1.TaskRunToolCallStatusTypeReadyToExecuteApprovedTool,
+ kubechainv1alpha1.TaskRunToolCallPhasePending,
+ "Ready to execute approved tool")
+ } else {
+ return r.updateTRTCStatus(ctx, trtc,
+ kubechainv1alpha1.TaskRunToolCallStatusTypeToolCallRejected,
+ kubechainv1alpha1.TaskRunToolCallPhaseFailed,
+ "Tool execution rejected")
+ }
+}
+
+// requestHumanApproval handles setting up a new human approval request
+func (r *TaskRunToolCallReconciler) requestHumanApproval(ctx context.Context, trtc *kubechainv1alpha1.TaskRunToolCall,
+ contactChannel *kubechainv1alpha1.ContactChannel, apiKey string, mcpServer *kubechainv1alpha1.MCPServer,
+) (ctrl.Result, error) {
+ logger := log.FromContext(ctx)
+
+ // Skip if the tool call has already been approved
+ if trtc.Status.Status == kubechainv1alpha1.TaskRunToolCallStatusTypeReadyToExecuteApprovedTool {
return ctrl.Result{}, nil
}
- serverName, mcpToolName, isMCP := isMCPTool(trtc.Spec.ToolRef.Name)
+ // Update status to awaiting approval
+ trtc.Status.Status = kubechainv1alpha1.TaskRunToolCallStatusTypeAwaitingHumanApproval
+ trtc.Status.StatusDetail = fmt.Sprintf("Waiting for human approval via contact channel %s", mcpServer.Spec.ApprovalContactChannel.Name)
+ r.recorder.Event(trtc, corev1.EventTypeNormal, "AwaitingHumanApproval",
+ fmt.Sprintf("Tool execution requires approval via contact channel %s", mcpServer.Spec.ApprovalContactChannel.Name))
- if isMCP {
- // Step 3: Check if this is an MCP tool and needs approval
- mcpServer, needsApproval, err := r.getMCPServer(ctx, &trtc)
- if err != nil {
- return ctrl.Result{}, err
- }
+ if err := r.Status().Update(ctx, trtc); err != nil {
+ logger.Error(err, "Failed to update TaskRunToolCall status")
+ return ctrl.Result{}, err
+ }
- // No approval yet? Get back in the loop.
- if needsApproval && trtc.Status.Status == kubechainv1alpha1.TaskRunToolCallStatusTypeAwaitingHumanApproval {
- return ctrl.Result{RequeueAfter: 5 * time.Second}, nil
- }
+ // Verify HLClientFactory is initialized
+ if r.HLClientFactory == nil {
+ err := fmt.Errorf("HLClient not initialized")
+ result, errStatus, _ := r.setStatusError(ctx, kubechainv1alpha1.TaskRunToolCallStatusTypeErrorRequestingHumanApproval,
+ "NoHumanLayerClient", trtc, err)
+ return result, errStatus
+ }
- if needsApproval {
+ // Post to HumanLayer to request approval
+ functionCall, statusCode, err := r.postToHumanLayer(ctx, trtc, contactChannel, apiKey)
+ if err != nil {
+ errorMsg := fmt.Errorf("HumanLayer request failed with status code %d: %v", statusCode, err)
+ result, errStatus, _ := r.setStatusError(ctx, kubechainv1alpha1.TaskRunToolCallStatusTypeErrorRequestingHumanApproval,
+ "HumanLayerRequestFailed", trtc, errorMsg)
+ return result, errStatus
+ }
- trtcNamespace := trtc.Namespace
- contactChannel, err := r.getContactChannel(ctx, mcpServer, trtcNamespace)
- if err != nil {
- return r.setStatusError(ctx, kubechainv1alpha1.TaskRunToolCallStatusTypeErrorRequestingHumanApproval, "NoContactChannel", &trtc, err)
- }
+ // Update with call ID and requeue
+ callId := functionCall.GetCallId()
+ trtc.Status.ExternalCallID = callId
+ if err := r.Status().Update(ctx, trtc); err != nil {
+ logger.Error(err, "Failed to update TaskRunToolCall status")
+ return ctrl.Result{}, err
+ }
- trtc.Status.Status = "AwaitingHumanApproval"
- trtc.Status.StatusDetail = fmt.Sprintf("Waiting for human approval via contact channel %s", mcpServer.Spec.ApprovalContactChannel.Name)
- r.recorder.Event(&trtc, corev1.EventTypeNormal, "AwaitingHumanApproval",
- fmt.Sprintf("Tool execution requires approval via contact channel %s", mcpServer.Spec.ApprovalContactChannel.Name))
+ return ctrl.Result{RequeueAfter: 5 * time.Second}, nil
+}
- if err := r.Status().Update(ctx, &trtc); err != nil {
- logger.Error(err, "Failed to update TaskRunToolCall status")
- return ctrl.Result{}, err
- }
+// handleMCPApprovalFlow encapsulates the MCP approval flow logic
+func (r *TaskRunToolCallReconciler) handleMCPApprovalFlow(ctx context.Context, trtc *kubechainv1alpha1.TaskRunToolCall) (result ctrl.Result, err error, handled bool) {
+ // We've already been through the approval flow and are ready to execute the tool
+ if trtc.Status.Status == kubechainv1alpha1.TaskRunToolCallStatusTypeReadyToExecuteApprovedTool {
+ return ctrl.Result{}, nil, false
+ }
- apiKey, err := r.getHumanLayerAPIKey(ctx, contactChannel.Spec.APIKeyFrom.SecretKeyRef.Name, contactChannel.Spec.APIKeyFrom.SecretKeyRef.Key, trtcNamespace)
+ // Check if this is an MCP tool and needs approval
+ mcpServer, needsApproval, err := r.getMCPServer(ctx, trtc)
+ if err != nil {
+ return ctrl.Result{}, err, true
+ }
- if err != nil || apiKey == "" {
- return r.setStatusError(ctx, kubechainv1alpha1.TaskRunToolCallStatusTypeErrorRequestingHumanApproval, "NoAPIKey", &trtc, err)
- }
+ // If not an MCP tool or no approval needed, continue with normal processing
+ if mcpServer == nil || !needsApproval {
+ return ctrl.Result{}, nil, false
+ }
- if r.HLClient == nil {
- err := fmt.Errorf("HLClient not initialized")
- return r.setStatusError(ctx, kubechainv1alpha1.TaskRunToolCallStatusTypeErrorRequestingHumanApproval, "NoHumanLayerClient", &trtc, err)
- }
+ // Get contact channel and API key information
+ trtcNamespace := trtc.Namespace
+ contactChannel, err := r.getContactChannel(ctx, mcpServer, trtcNamespace)
+ if err != nil {
+ result, errStatus, _ := r.setStatusError(ctx, kubechainv1alpha1.TaskRunToolCallStatusTypeErrorRequestingHumanApproval,
+ "NoContactChannel", trtc, err)
+ return result, errStatus, true
+ }
- statusCode, err := r.postToHumanLayer(ctx, &trtc, contactChannel, apiKey)
+ apiKey, err := r.getHumanLayerAPIKey(ctx,
+ contactChannel.Spec.APIKeyFrom.SecretKeyRef.Name,
+ contactChannel.Spec.APIKeyFrom.SecretKeyRef.Key,
+ trtcNamespace)
- if statusCode != 200 {
- errorMsg := fmt.Errorf("HumanLayer request failed with status code: %d", statusCode)
- if err != nil {
- errorMsg = fmt.Errorf("HumanLayer request failed with status code %d: %v", statusCode, err)
- }
- return r.setStatusError(ctx, kubechainv1alpha1.TaskRunToolCallStatusTypeErrorRequestingHumanApproval, "HumanLayerRequestFailed", &trtc, errorMsg)
- }
+ if err != nil || apiKey == "" {
+ result, errStatus, _ := r.setStatusError(ctx, kubechainv1alpha1.TaskRunToolCallStatusTypeErrorRequestingHumanApproval,
+ "NoAPIKey", trtc, err)
+ return result, errStatus, true
+ }
- return ctrl.Result{RequeueAfter: 5 * time.Second}, nil
+ // Handle pending approval check first
+ if trtc.Status.Status == kubechainv1alpha1.TaskRunToolCallStatusTypeAwaitingHumanApproval {
+ result, err, handled := r.handlePendingApproval(ctx, trtc, apiKey)
+ if handled {
+ return result, err, true
}
}
- // Step 4: Parse arguments
- args, err := r.parseArguments(ctx, &trtc)
- if err != nil {
- return ctrl.Result{}, err
- }
+ // Request human approval if not already done
+ result, err = r.requestHumanApproval(ctx, trtc, contactChannel, apiKey, mcpServer)
+ return result, err, true
+}
- // Step 5: Handle MCP tool execution if applicable
+// dispatchToolExecution routes tool execution to the appropriate handler based on tool type
+func (r *TaskRunToolCallReconciler) dispatchToolExecution(ctx context.Context, trtc *kubechainv1alpha1.TaskRunToolCall,
+ args map[string]interface{},
+) (ctrl.Result, error) {
+ // Check for MCP tool first
+ serverName, mcpToolName, isMCP := isMCPTool(trtc.Spec.ToolRef.Name)
if isMCP && r.MCPManager != nil {
- return r.processMCPTool(ctx, &trtc, serverName, mcpToolName, args)
+ return r.processMCPTool(ctx, trtc, serverName, mcpToolName, args)
}
- // Step 6: Get traditional Tool resource
- tool, toolType, err := r.getTraditionalTool(ctx, &trtc)
+ // Get traditional Tool resource
+ tool, toolType, err := r.getTraditionalTool(ctx, trtc)
if err != nil {
return ctrl.Result{}, err
}
- // Step 7: Process based on tool type
+ // Dispatch based on tool type
switch toolType {
case "delegateToAgent":
- return r.processDelegateToAgent(ctx, &trtc)
+ return r.processDelegateToAgent(ctx, trtc)
case "function":
- return r.processBuiltinFunction(ctx, &trtc, tool, args)
+ return r.processBuiltinFunction(ctx, trtc, tool, args)
case "externalAPI":
- return r.processExternalAPI(ctx, &trtc, tool)
+ return r.processExternalAPI(ctx, trtc, tool)
default:
- return r.handleUnsupportedToolType(ctx, &trtc)
+ return r.handleUnsupportedToolType(ctx, trtc)
}
}
+// Reconcile processes TaskRunToolCall objects.
+func (r *TaskRunToolCallReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
+ logger := log.FromContext(ctx)
+
+ // Get the TaskRunToolCall resource
+ var trtc kubechainv1alpha1.TaskRunToolCall
+ if err := r.Get(ctx, req.NamespacedName, &trtc); err != nil {
+ return ctrl.Result{}, client.IgnoreNotFound(err)
+ }
+ logger.Info("Reconciling TaskRunToolCall", "name", trtc.Name)
+
+ // 1. Initialize status if not set
+ initialized, err, handled := r.initializeTRTC(ctx, &trtc)
+ if handled {
+ if err != nil {
+ return ctrl.Result{}, err
+ }
+ if initialized {
+ return ctrl.Result{}, nil
+ }
+ }
+
+ // 2. Check for terminal error states - early return
+ if trtc.Status.Status == kubechainv1alpha1.TaskRunToolCallStatusTypeErrorRequestingHumanApproval {
+ return ctrl.Result{}, nil
+ }
+
+ // 3. Check if already completed or has child TaskRun
+ done, err, handled := r.checkCompletedOrExisting(ctx, &trtc)
+ if handled {
+ if err != nil {
+ return ctrl.Result{}, err
+ }
+ if done {
+ return ctrl.Result{}, nil
+ }
+ }
+
+ // 4. Handle MCP approval flow
+ result, err, handled := r.handleMCPApprovalFlow(ctx, &trtc)
+ if handled {
+ return result, err
+ }
+
+ // 5. Parse arguments for execution
+ args, err := r.parseArguments(ctx, &trtc)
+ if err != nil {
+ return ctrl.Result{}, err
+ }
+
+ // 6. Execute the appropriate tool type
+ return r.dispatchToolExecution(ctx, &trtc, args)
+}
+
func (r *TaskRunToolCallReconciler) SetupWithManager(mgr ctrl.Manager) error {
r.recorder = mgr.GetEventRecorderFor("taskruntoolcall-controller")
r.server = &http.Server{Addr: ":8080"} // Choose a port
@@ -861,13 +995,13 @@ func (r *TaskRunToolCallReconciler) SetupWithManager(mgr ctrl.Manager) error {
r.MCPManager = mcpmanager.NewMCPServerManager()
}
- if r.HLClient == nil {
- client, err := humanlayer.NewHumanLayerClient("")
+ if r.HLClientFactory == nil {
+ client, err := humanlayer.NewHumanLayerClientFactory("")
if err != nil {
return err
}
- r.HLClient = client
+ r.HLClientFactory = client
}
go func() {
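
The three-way decision in `handlePendingApproval` above (no decision yet, approved, rejected) hinges on treating the approval as a nil-able boolean. The standalone sketch below isolates that mapping. It is an illustration under assumptions, not the controller's code: `nextStatus` is a hypothetical helper, and the string values for the approved and rejected states are guessed from the constant names in the diff.

```go
package main

import (
	"fmt"
	"time"
)

// Status names mirror those visible in the diff. The string values of the
// approved and rejected states are assumptions made for illustration.
const (
	statusAwaiting = "AwaitingHumanApproval"
	statusReady    = "ReadyToExecuteApprovedTool"
	statusRejected = "ToolCallRejected"
)

// nextStatus maps a tri-state approval onto the next status and a requeue delay.
// nil means no decision yet, true means approved, false means rejected.
func nextStatus(approved *bool) (string, time.Duration) {
	switch {
	case approved == nil:
		// Still waiting on a human: keep the status and poll again later.
		return statusAwaiting, 5 * time.Second
	case *approved:
		return statusReady, 0
	default:
		return statusRejected, 0
	}
}

func main() {
	yes, no := true, false
	for _, a := range []*bool{nil, &yes, &no} {
		status, requeue := nextStatus(a)
		fmt.Println(status, requeue)
	}
}
```

Keeping the mapping in a pure function like this makes the pending, approved, and rejected transitions straightforward to unit test without a mock HumanLayer client.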
diff --git a/kubechain/internal/controller/taskruntoolcall/taskruntoolcall_controller_test.go b/kubechain/internal/controller/taskruntoolcall/taskruntoolcall_controller_test.go
index 9533c4b..f2e89f2 100644
--- a/kubechain/internal/controller/taskruntoolcall/taskruntoolcall_controller_test.go
+++ b/kubechain/internal/controller/taskruntoolcall/taskruntoolcall_controller_test.go
@@ -223,10 +223,10 @@ var _ = Describe("TaskRunToolCall Controller", func() {
})
// Tests for approval workflow
- Context("Pending -> AwaitingHumanApproval (MCP Tool)", func() {
+ Context("Pending -> AwaitingHumanApproval (MCP Tool, Slack Contact Channel)", func() {
It("transitions to AwaitingHumanApproval when MCPServer has approval channel", func() {
// Note setupTestApprovalResources sets up the MCP server, MCP tool, and TaskRunToolCall
- trtc, teardown := setupTestApprovalResources(ctx)
+ trtc, teardown := setupTestApprovalResources(ctx, nil)
defer teardown()
By("reconciling the taskruntoolcall that uses MCP tool with approval")
@@ -236,7 +236,7 @@ var _ = Describe("TaskRunToolCall Controller", func() {
NeedsApproval: true,
}
- reconciler.HLClient = &humanlayer.MockHumanLayerClient{
+ reconciler.HLClientFactory = &humanlayer.MockHumanLayerClientFactory{
ShouldFail: false,
StatusCode: 200,
ReturnError: nil,
@@ -275,20 +275,223 @@ var _ = Describe("TaskRunToolCall Controller", func() {
})
})
+ Context("Pending -> AwaitingHumanApproval (MCP Tool, Email Contact Channel)", func() {
+ It("transitions to AwaitingHumanApproval when MCPServer has email approval channel", func() {
+ // Set up resources with email contact channel
+ trtc, teardown := setupTestApprovalResources(ctx, &SetupTestApprovalConfig{
+ ContactChannelType: "email",
+ })
+ defer teardown()
+
+ By("reconciling the taskruntoolcall that uses MCP tool with email approval")
+ reconciler, recorder := reconciler()
+
+ reconciler.MCPManager = &MockMCPManager{
+ NeedsApproval: true,
+ }
+
+ reconciler.HLClientFactory = &humanlayer.MockHumanLayerClientFactory{
+ ShouldFail: false,
+ StatusCode: 200,
+ ReturnError: nil,
+ }
+
+ result, err := reconciler.Reconcile(ctx, reconcile.Request{
+ NamespacedName: types.NamespacedName{
+ Name: trtc.Name,
+ Namespace: trtc.Namespace,
+ },
+ })
+
+ Expect(err).NotTo(HaveOccurred())
+ Expect(result.RequeueAfter).To(Equal(5 * time.Second)) // Should requeue after 5 seconds
+
+ By("checking the taskruntoolcall status is set to AwaitingHumanApproval")
+ updatedTRTC := &kubechainv1alpha1.TaskRunToolCall{}
+ err = k8sClient.Get(ctx, types.NamespacedName{
+ Name: trtc.Name,
+ Namespace: trtc.Namespace,
+ }, updatedTRTC)
+
+ Expect(err).NotTo(HaveOccurred())
+ Expect(updatedTRTC.Status.Phase).To(Equal(kubechainv1alpha1.TaskRunToolCallPhasePending))
+ Expect(updatedTRTC.Status.Status).To(Equal(kubechainv1alpha1.TaskRunToolCallStatusTypeAwaitingHumanApproval))
+ Expect(updatedTRTC.Status.StatusDetail).To(ContainSubstring("Waiting for human approval via contact channel"))
+
+ By("checking that appropriate events were emitted")
+ utils.ExpectRecorder(recorder).ToEmitEventContaining("AwaitingHumanApproval")
+ Expect(updatedTRTC.Status.Status).To(Equal(kubechainv1alpha1.TaskRunToolCallStatusTypeAwaitingHumanApproval))
+
+ By("verifying the contact channel type is email")
+ var contactChannel kubechainv1alpha1.ContactChannel
+ err = k8sClient.Get(ctx, types.NamespacedName{
+ Name: testContactChannel.name,
+ Namespace: "default",
+ }, &contactChannel)
+ Expect(err).NotTo(HaveOccurred())
+ Expect(contactChannel.Spec.Type).To(Equal(kubechainv1alpha1.ContactChannelTypeEmail))
+ })
+ })
+
+ Context("AwaitingHumanApproval -> ReadyToExecuteApprovedTool", func() {
+ It("transitions from AwaitingHumanApproval to ReadyToExecuteApprovedTool when MCP tool is approved", func() {
+ trtc, teardown := setupTestApprovalResources(ctx, &SetupTestApprovalConfig{
+ TaskRunToolCallStatus: &kubechainv1alpha1.TaskRunToolCallStatus{
+ ExternalCallID: "call-ready-to-execute-test",
+ Phase: kubechainv1alpha1.TaskRunToolCallPhasePending,
+ Status: kubechainv1alpha1.TaskRunToolCallStatusTypeAwaitingHumanApproval,
+ StatusDetail: "Waiting for human approval via contact channel",
+ StartTime: &metav1.Time{Time: time.Now().Add(-1 * time.Minute)},
+ },
+ })
+ defer teardown()
+
+ By("reconciling the trtc against an approval-granting HumanLayer client")
+
+ reconciler, _ := reconciler()
+
+ reconciler.MCPManager = &MockMCPManager{
+ NeedsApproval: true,
+ }
+
+ reconciler.HLClientFactory = &humanlayer.MockHumanLayerClientFactory{
+ ShouldFail: false,
+ StatusCode: 200,
+ ReturnError: nil,
+ ShouldReturnApproval: true,
+ }
+
+ result, err := reconciler.Reconcile(ctx, reconcile.Request{
+ NamespacedName: types.NamespacedName{
+ Name: trtc.Name,
+ Namespace: trtc.Namespace,
+ },
+ })
+
+ Expect(err).NotTo(HaveOccurred())
+ Expect(result.Requeue).To(BeFalse())
+
+ By("checking the taskruntoolcall status is set to ReadyToExecuteApprovedTool")
+ updatedTRTC := &kubechainv1alpha1.TaskRunToolCall{}
+ err = k8sClient.Get(ctx, types.NamespacedName{
+ Name: trtc.Name,
+ Namespace: trtc.Namespace,
+ }, updatedTRTC)
+
+ Expect(err).NotTo(HaveOccurred())
+ Expect(updatedTRTC.Status.Phase).To(Equal(kubechainv1alpha1.TaskRunToolCallPhasePending))
+ Expect(updatedTRTC.Status.Status).To(Equal(kubechainv1alpha1.TaskRunToolCallStatusTypeReadyToExecuteApprovedTool))
+ Expect(updatedTRTC.Status.StatusDetail).To(ContainSubstring("Ready to execute approved tool"))
+ })
+ })
+
+ Context("AwaitingHumanApproval -> ToolCallRejected", func() {
+ It("transitions from AwaitingHumanApproval to ToolCallRejected when MCP tool is rejected", func() {
+ trtc, teardown := setupTestApprovalResources(ctx, &SetupTestApprovalConfig{
+ TaskRunToolCallStatus: &kubechainv1alpha1.TaskRunToolCallStatus{
+ ExternalCallID: "call-tool-call-rejected-test",
+ Phase: kubechainv1alpha1.TaskRunToolCallPhasePending,
+ Status: kubechainv1alpha1.TaskRunToolCallStatusTypeAwaitingHumanApproval,
+ StatusDetail: "Waiting for human approval via contact channel",
+ StartTime: &metav1.Time{Time: time.Now().Add(-1 * time.Minute)},
+ },
+ })
+ defer teardown()
+
+ By("reconciling the trtc against an approval-rejecting HumanLayer client")
+
+ reconciler, _ := reconciler()
+
+ reconciler.MCPManager = &MockMCPManager{
+ NeedsApproval: true,
+ }
+
+ reconciler.HLClientFactory = &humanlayer.MockHumanLayerClientFactory{
+ ShouldFail: false,
+ StatusCode: 200,
+ ReturnError: nil,
+ ShouldReturnRejection: true,
+ }
+
+ result, err := reconciler.Reconcile(ctx, reconcile.Request{
+ NamespacedName: types.NamespacedName{
+ Name: trtc.Name,
+ Namespace: trtc.Namespace,
+ },
+ })
+
+ Expect(err).NotTo(HaveOccurred())
+ Expect(result.Requeue).To(BeFalse())
+
+ By("checking the taskruntoolcall status is set to ToolCallRejected")
+ updatedTRTC := &kubechainv1alpha1.TaskRunToolCall{}
+ err = k8sClient.Get(ctx, types.NamespacedName{
+ Name: trtc.Name,
+ Namespace: trtc.Namespace,
+ }, updatedTRTC)
+
+ Expect(err).NotTo(HaveOccurred())
+ Expect(updatedTRTC.Status.Phase).To(Equal(kubechainv1alpha1.TaskRunToolCallPhaseFailed))
+ Expect(updatedTRTC.Status.Status).To(Equal(kubechainv1alpha1.TaskRunToolCallStatusTypeToolCallRejected))
+ Expect(updatedTRTC.Status.StatusDetail).To(ContainSubstring("Tool execution rejected"))
+ })
+ })
+
+ Context("ReadyToExecuteApprovedTool -> Succeeded", func() {
+ It("transitions from ReadyToExecuteApprovedTool to Succeeded when a tool is executed", func() {
+ trtc, teardown := setupTestApprovalResources(ctx, &SetupTestApprovalConfig{
+ TaskRunToolCallStatus: &kubechainv1alpha1.TaskRunToolCallStatus{
+ ExternalCallID: "call-ready-to-execute-test",
+ Phase: kubechainv1alpha1.TaskRunToolCallPhasePending,
+ Status: kubechainv1alpha1.TaskRunToolCallStatusTypeReadyToExecuteApprovedTool,
+ StatusDetail: "Ready to execute tool, with great vigor",
+ StartTime: &metav1.Time{Time: time.Now().Add(-1 * time.Minute)},
+ },
+ })
+ defer teardown()
+
+ By("reconciling the trtc that is already approved and ready to execute")
+
+ reconciler, _ := reconciler()
+
+ result, err := reconciler.Reconcile(ctx, reconcile.Request{
+ NamespacedName: types.NamespacedName{
+ Name: trtc.Name,
+ Namespace: trtc.Namespace,
+ },
+ })
+
+ Expect(err).NotTo(HaveOccurred())
+ Expect(result.Requeue).To(BeFalse())
+
+ By("checking the taskruntoolcall status is set to Ready:Succeeded")
+ updatedTRTC := &kubechainv1alpha1.TaskRunToolCall{}
+ err = k8sClient.Get(ctx, types.NamespacedName{
+ Name: trtc.Name,
+ Namespace: trtc.Namespace,
+ }, updatedTRTC)
+
+ Expect(err).NotTo(HaveOccurred())
+ Expect(updatedTRTC.Status.Phase).To(Equal(kubechainv1alpha1.TaskRunToolCallPhaseSucceeded))
+ Expect(updatedTRTC.Status.Status).To(Equal(kubechainv1alpha1.TaskRunToolCallStatusTypeReady))
+ Expect(updatedTRTC.Status.Result).To(Equal("5")) // From our mock implementation
+ })
+ })
+
Context("Pending -> ErrorRequestingHumanApproval (MCP Tool)", func() {
It("transitions to ErrorRequestingHumanApproval when request to HumanLayer fails", func() {
// Note setupTestApprovalResources sets up the MCP server, MCP tool, and TaskRunToolCall
- trtc, teardown := setupTestApprovalResources(ctx)
+ trtc, teardown := setupTestApprovalResources(ctx, nil)
defer teardown()
By("reconciling the taskruntoolcall that uses MCP tool with approval")
reconciler, _ := reconciler()
reconciler.MCPManager = &MockMCPManager{
- NeedsApproval: true,
+ NeedsApproval: false,
}
- reconciler.HLClient = &humanlayer.MockHumanLayerClient{
+ reconciler.HLClientFactory = &humanlayer.MockHumanLayerClientFactory{
ShouldFail: true,
StatusCode: 500,
ReturnError: fmt.Errorf("While taking pizzas from the kitchen to the lobby, Pete passed through the server room where he tripped over a network cable and now there's pizza all over the place. Also this request failed. No more pizza in the server room Pete."),
diff --git a/kubechain/internal/controller/taskruntoolcall/utils_test.go b/kubechain/internal/controller/taskruntoolcall/utils_test.go
index 875a182..be3ef2c 100644
--- a/kubechain/internal/controller/taskruntoolcall/utils_test.go
+++ b/kubechain/internal/controller/taskruntoolcall/utils_test.go
@@ -22,7 +22,7 @@ var addTool = &TestTool{
var testContactChannel = &TestContactChannel{
name: "test-contact-channel",
- channelType: "slack",
+ channelType: kubechainv1alpha1.ContactChannelTypeSlack,
secretName: testSecret.name,
}
@@ -64,7 +64,7 @@ type TestSecret struct {
// TestContactChannel represents a test ContactChannel resource
type TestContactChannel struct {
name string
- channelType string
+ channelType kubechainv1alpha1.ContactChannelType
secretName string
contactChannel *kubechainv1alpha1.ContactChannel
}
@@ -77,7 +77,7 @@ func (t *TestContactChannel) Setup(ctx context.Context) *kubechainv1alpha1.Conta
Namespace: "default",
},
Spec: kubechainv1alpha1.ContactChannelSpec{
- ChannelType: t.channelType,
+ Type: t.channelType,
APIKeyFrom: kubechainv1alpha1.APIKeySource{
SecretKeyRef: kubechainv1alpha1.SecretKeyRef{
Name: t.secretName,
@@ -89,11 +89,11 @@ func (t *TestContactChannel) Setup(ctx context.Context) *kubechainv1alpha1.Conta
// Add specific config based on channel type
if t.channelType == "slack" {
- contactChannel.Spec.SlackConfig = &kubechainv1alpha1.SlackChannelConfig{
+ contactChannel.Spec.Slack = &kubechainv1alpha1.SlackChannelConfig{
ChannelOrUserID: "C12345678",
}
} else if t.channelType == "email" {
- contactChannel.Spec.EmailConfig = &kubechainv1alpha1.EmailChannelConfig{
+ contactChannel.Spec.Email = &kubechainv1alpha1.EmailChannelConfig{
Address: "test@example.com",
}
}
@@ -375,11 +375,32 @@ func reconciler() (*TaskRunToolCallReconciler, *record.FakeRecorder) {
return reconciler, recorder
}
+// SetupTestApprovalConfig contains optional configuration for setupTestApprovalResources
+type SetupTestApprovalConfig struct {
+ TaskRunToolCallStatus *kubechainv1alpha1.TaskRunToolCallStatus
+ TaskRunToolCallName string
+ TaskRunToolCallArgs string
+ ContactChannelType kubechainv1alpha1.ContactChannelType
+}
+
// setupTestApprovalResources sets up all resources needed for testing approval
-func setupTestApprovalResources(ctx context.Context) (*kubechainv1alpha1.TaskRunToolCall, func()) {
+func setupTestApprovalResources(ctx context.Context, config *SetupTestApprovalConfig) (*kubechainv1alpha1.TaskRunToolCall, func()) {
By("creating the secret")
testSecret.Setup(ctx)
By("creating the contact channel")
+
+ // Set contact channel type based on config or default to ContactChannelTypeSlack
+ channelType := kubechainv1alpha1.ContactChannelTypeSlack
+ if config != nil && config.ContactChannelType != "" {
+ switch config.ContactChannelType {
+ case "email":
+ channelType = kubechainv1alpha1.ContactChannelTypeEmail
+ default:
+ channelType = kubechainv1alpha1.ContactChannelTypeSlack
+ }
+ }
+
+ testContactChannel.channelType = channelType
testContactChannel.SetupWithStatus(ctx, kubechainv1alpha1.ContactChannelStatus{
Ready: true,
Status: "Ready",
@@ -395,18 +416,35 @@ func setupTestApprovalResources(ctx context.Context) (*kubechainv1alpha1.TaskRun
Status: "Ready",
})
+ name := "test-mcp-with-approval-trtc"
+ args := `{"a": 2, "b": 3}`
+ if config != nil {
+ if config.TaskRunToolCallName != "" {
+ name = config.TaskRunToolCallName
+ }
+ if config.TaskRunToolCallArgs != "" {
+ args = config.TaskRunToolCallArgs
+ }
+ }
+
taskRunToolCall := &TestTaskRunToolCall{
- name: "test-mcp-with-approval-trtc",
+ name: name,
toolName: mcpTool.Spec.Name,
- arguments: `{"a": 2, "b": 3}`,
+ arguments: args,
}
- trtc := taskRunToolCall.SetupWithStatus(ctx, kubechainv1alpha1.TaskRunToolCallStatus{
+ status := kubechainv1alpha1.TaskRunToolCallStatus{
Phase: kubechainv1alpha1.TaskRunToolCallPhasePending,
Status: "Pending",
StatusDetail: "Ready for execution",
StartTime: &metav1.Time{Time: time.Now().Add(-1 * time.Minute)},
- })
+ }
+
+ if config != nil && config.TaskRunToolCallStatus != nil {
+ status = *config.TaskRunToolCallStatus
+ }
+
+ trtc := taskRunToolCall.SetupWithStatus(ctx, status)
return trtc, func() {
testMCPTool.Teardown(ctx)
diff --git a/kubechain/internal/humanlayer/hlclient.go b/kubechain/internal/humanlayer/hlclient.go
index ab750d7..10f8ed6 100644
--- a/kubechain/internal/humanlayer/hlclient.go
+++ b/kubechain/internal/humanlayer/hlclient.go
@@ -12,10 +12,10 @@ import (
humanlayerapi "github.com/humanlayer/smallchain/kubechain/internal/humanlayerapi"
)
-// NewHumanLayerClient creates a new API client using either the provided API key
+// NewHumanLayerClientFactory creates a new HumanLayer client factory using either the provided API key
// or falling back to the HUMANLAYER_API_KEY environment variable. Similarly,
// it uses the provided API base URL or falls back to HUMANLAYER_API_BASE.
-func NewHumanLayerClient(optionalApiBase string) (HumanLayerClientInterface, error) {
+func NewHumanLayerClientFactory(optionalApiBase string) (HumanLayerClientFactory, error) {
config := humanlayerapi.NewConfiguration()
// Get API base from parameter or environment variable
@@ -42,27 +42,32 @@ func NewHumanLayerClient(optionalApiBase string) (HumanLayerClientInterface, err
},
}
+ // Uncomment the next line to enable debug mode, which logs each request and response
+ // config.Debug = true
+
// Create the API client with the configuration
client := humanlayerapi.NewAPIClient(config)
- return &HumanLayerClient{client: client}, nil
+ return &RealHumanLayerClientFactory{client: client}, nil
}
-type HumanLayerClientWrapperInterface interface {
+type HumanLayerClientWrapper interface {
SetSlackConfig(slackConfig *kubechainv1alpha1.SlackChannelConfig)
SetEmailConfig(emailConfig *kubechainv1alpha1.EmailChannelConfig)
SetFunctionCallSpec(functionName string, args map[string]interface{})
SetCallID(callID string)
SetRunID(runID string)
SetAPIKey(apiKey string)
+
RequestApproval(ctx context.Context) (functionCall *humanlayerapi.FunctionCallOutput, statusCode int, err error)
+ GetFunctionCallStatus(ctx context.Context) (functionCall *humanlayerapi.FunctionCallOutput, statusCode int, err error)
}
-type HumanLayerClientInterface interface {
- NewHumanLayerClient() HumanLayerClientWrapperInterface
+type HumanLayerClientFactory interface {
+ NewHumanLayerClient() HumanLayerClientWrapper
}
-type HumanLayerClientWrapper struct {
+type RealHumanLayerClientWrapper struct {
client *humanlayerapi.APIClient
slackChannelInput *humanlayerapi.SlackContactChannelInput
emailContactChannel *humanlayerapi.EmailContactChannel
@@ -72,17 +77,17 @@ type HumanLayerClientWrapper struct {
apiKey string
}
-type HumanLayerClient struct {
+type RealHumanLayerClientFactory struct {
client *humanlayerapi.APIClient
}
-func (h *HumanLayerClient) NewHumanLayerClient() HumanLayerClientWrapperInterface {
- return &HumanLayerClientWrapper{
+func (h *RealHumanLayerClientFactory) NewHumanLayerClient() HumanLayerClientWrapper {
+ return &RealHumanLayerClientWrapper{
client: h.client,
}
}
-func (h *HumanLayerClientWrapper) SetSlackConfig(slackConfig *kubechainv1alpha1.SlackChannelConfig) {
+func (h *RealHumanLayerClientWrapper) SetSlackConfig(slackConfig *kubechainv1alpha1.SlackChannelConfig) {
slackChannelInput := humanlayerapi.NewSlackContactChannelInput(slackConfig.ChannelOrUserID)
if slackConfig.ContextAboutChannelOrUser != "" {
@@ -92,7 +97,7 @@ func (h *HumanLayerClientWrapper) SetSlackConfig(slackConfig *kubechainv1alpha1.
h.slackChannelInput = slackChannelInput
}
-func (h *HumanLayerClientWrapper) SetEmailConfig(emailConfig *kubechainv1alpha1.EmailChannelConfig) {
+func (h *RealHumanLayerClientWrapper) SetEmailConfig(emailConfig *kubechainv1alpha1.EmailChannelConfig) {
emailContactChannel := humanlayerapi.NewEmailContactChannel(emailConfig.Address)
if emailConfig.ContextAboutUser != "" {
@@ -102,26 +107,26 @@ func (h *HumanLayerClientWrapper) SetEmailConfig(emailConfig *kubechainv1alpha1.
h.emailContactChannel = emailContactChannel
}
-func (h *HumanLayerClientWrapper) SetFunctionCallSpec(functionName string, args map[string]interface{}) {
+func (h *RealHumanLayerClientWrapper) SetFunctionCallSpec(functionName string, args map[string]interface{}) {
// Create the function call input with required parameters
functionCallSpecInput := humanlayerapi.NewFunctionCallSpecInput(functionName, args)
h.functionCallSpecInput = functionCallSpecInput
}
-func (h *HumanLayerClientWrapper) SetCallID(callID string) {
+func (h *RealHumanLayerClientWrapper) SetCallID(callID string) {
h.callID = callID
}
-func (h *HumanLayerClientWrapper) SetRunID(runID string) {
+func (h *RealHumanLayerClientWrapper) SetRunID(runID string) {
h.runID = runID
}
-func (h *HumanLayerClientWrapper) SetAPIKey(apiKey string) {
+func (h *RealHumanLayerClientWrapper) SetAPIKey(apiKey string) {
h.apiKey = apiKey
}
-func (h *HumanLayerClientWrapper) RequestApproval(ctx context.Context) (functionCall *humanlayerapi.FunctionCallOutput, statusCode int, err error) {
+func (h *RealHumanLayerClientWrapper) RequestApproval(ctx context.Context) (functionCall *humanlayerapi.FunctionCallOutput, statusCode int, err error) {
channel := humanlayerapi.NewContactChannelInput()
if h.slackChannelInput != nil {
@@ -142,3 +147,11 @@ func (h *HumanLayerClientWrapper) RequestApproval(ctx context.Context) (function
return functionCall, resp.StatusCode, err
}
+
+func (h *RealHumanLayerClientWrapper) GetFunctionCallStatus(ctx context.Context) (functionCall *humanlayerapi.FunctionCallOutput, statusCode int, err error) {
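+ functionCall, resp, err := h.client.DefaultAPI.GetFunctionCallStatus(ctx, h.callID).Authorization("Bearer " + h.apiKey).Execute()
+ if resp != nil { // resp may be nil if the request fails before a response is received
+ statusCode = resp.StatusCode
+ }
+ return functionCall, statusCode, err
+}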
diff --git a/kubechain/internal/humanlayer/mock_hlclient.go b/kubechain/internal/humanlayer/mock_hlclient.go
index b8ac475..66841c2 100644
--- a/kubechain/internal/humanlayer/mock_hlclient.go
+++ b/kubechain/internal/humanlayer/mock_hlclient.go
@@ -2,26 +2,29 @@ package humanlayer
import (
"context"
+ "time"
kubechainv1alpha1 "github.com/humanlayer/smallchain/kubechain/api/v1alpha1"
humanlayerapi "github.com/humanlayer/smallchain/kubechain/internal/humanlayerapi"
)
-// MockHumanLayerClient implements HumanLayerClientInterface for testing
-type MockHumanLayerClient struct {
- ShouldFail bool
- StatusCode int
- ReturnError error
- LastAPIKey string
- LastCallID string
- LastRunID string
- LastFunction string
- LastArguments map[string]interface{}
+// MockHumanLayerClientFactory implements HumanLayerClientFactory for testing
+type MockHumanLayerClientFactory struct {
+ ShouldFail bool
+ StatusCode int
+ ReturnError error
+ ShouldReturnApproval bool
+ ShouldReturnRejection bool
+ LastAPIKey string
+ LastCallID string
+ LastRunID string
+ LastFunction string
+ LastArguments map[string]interface{}
}
-// MockHumanLayerClientWrapper implements HumanLayerClientWrapperInterface for testing
+// MockHumanLayerClientWrapper implements HumanLayerClientWrapper for testing
type MockHumanLayerClientWrapper struct {
- parent *MockHumanLayerClient
+ parent *MockHumanLayerClientFactory
slackConfig *kubechainv1alpha1.SlackChannelConfig
emailConfig *kubechainv1alpha1.EmailChannelConfig
functionName string
@@ -32,53 +35,85 @@ type MockHumanLayerClientWrapper struct {
}
// NewHumanLayerClient creates a new mock client
-func NewMockHumanLayerClient(shouldFail bool, statusCode int, returnError error) *MockHumanLayerClient {
- return &MockHumanLayerClient{
+func NewMockHumanLayerClient(shouldFail bool, statusCode int, returnError error) *MockHumanLayerClientFactory {
+ return &MockHumanLayerClientFactory{
ShouldFail: shouldFail,
StatusCode: statusCode,
ReturnError: returnError,
}
}
-// NewHumanLayerClient implements HumanLayerClientInterface
-func (m *MockHumanLayerClient) NewHumanLayerClient() HumanLayerClientWrapperInterface {
+// NewHumanLayerClient implements HumanLayerClientFactory
+func (m *MockHumanLayerClientFactory) NewHumanLayerClient() HumanLayerClientWrapper {
return &MockHumanLayerClientWrapper{
parent: m,
}
}
-// SetSlackConfig implements HumanLayerClientWrapperInterface
+// SetSlackConfig implements HumanLayerClientWrapper
func (m *MockHumanLayerClientWrapper) SetSlackConfig(slackConfig *kubechainv1alpha1.SlackChannelConfig) {
m.slackConfig = slackConfig
}
-// SetEmailConfig implements HumanLayerClientWrapperInterface
+// SetEmailConfig implements HumanLayerClientWrapper
func (m *MockHumanLayerClientWrapper) SetEmailConfig(emailConfig *kubechainv1alpha1.EmailChannelConfig) {
m.emailConfig = emailConfig
}
-// SetFunctionCallSpec implements HumanLayerClientWrapperInterface
+// SetFunctionCallSpec implements HumanLayerClientWrapper
func (m *MockHumanLayerClientWrapper) SetFunctionCallSpec(functionName string, args map[string]interface{}) {
m.functionName = functionName
m.functionArgs = args
}
-// SetCallID implements HumanLayerClientWrapperInterface
+// SetCallID implements HumanLayerClientWrapper
func (m *MockHumanLayerClientWrapper) SetCallID(callID string) {
m.callID = callID
}
-// SetRunID implements HumanLayerClientWrapperInterface
+// SetRunID implements HumanLayerClientWrapper
func (m *MockHumanLayerClientWrapper) SetRunID(runID string) {
m.runID = runID
}
-// SetAPIKey implements HumanLayerClientWrapperInterface
+// SetAPIKey implements HumanLayerClientWrapper
func (m *MockHumanLayerClientWrapper) SetAPIKey(apiKey string) {
m.apiKey = apiKey
}
-// RequestApproval implements HumanLayerClientWrapperInterface
+// GetFunctionCallStatus implements HumanLayerClientWrapper
+func (m *MockHumanLayerClientWrapper) GetFunctionCallStatus(ctx context.Context) (*humanlayerapi.FunctionCallOutput, int, error) {
+
+ if m.parent.ShouldReturnApproval {
+ now := time.Now()
+ approved := true
+ status := humanlayerapi.NewNullableFunctionCallStatus(&humanlayerapi.FunctionCallStatus{
+ RequestedAt: *humanlayerapi.NewNullableTime(&now),
+ RespondedAt: *humanlayerapi.NewNullableTime(&now),
+ Approved: *humanlayerapi.NewNullableBool(&approved),
+ })
+ return &humanlayerapi.FunctionCallOutput{
+ Status: *status,
+ }, 200, nil
+ }
+
+ if m.parent.ShouldReturnRejection {
+ now := time.Now()
+ approved := false
+ status := humanlayerapi.NewNullableFunctionCallStatus(&humanlayerapi.FunctionCallStatus{
+ RequestedAt: *humanlayerapi.NewNullableTime(&now),
+ RespondedAt: *humanlayerapi.NewNullableTime(&now),
+ Approved: *humanlayerapi.NewNullableBool(&approved),
+ })
+ return &humanlayerapi.FunctionCallOutput{
+ Status: *status,
+ }, 200, nil
+ }
+
+ return nil, m.parent.StatusCode, m.parent.ReturnError
+}
+
+// RequestApproval implements HumanLayerClientWrapper
func (m *MockHumanLayerClientWrapper) RequestApproval(ctx context.Context) (*humanlayerapi.FunctionCallOutput, int, error) {
// Store the values in the parent for test verification
m.parent.LastAPIKey = m.apiKey
diff --git a/kubechain/internal/humanlayerapi/utils.go b/kubechain/internal/humanlayerapi/utils.go
index 584f71e..c7df67a 100644
--- a/kubechain/internal/humanlayerapi/utils.go
+++ b/kubechain/internal/humanlayerapi/utils.go
@@ -327,7 +327,44 @@ func (v NullableTime) MarshalJSON() ([]byte, error) {
func (v *NullableTime) UnmarshalJSON(src []byte) error {
v.isSet = true
- return json.Unmarshal(src, &v.value)
+
+ // Handle null case
+ if string(src) == "null" {
+ v.value = nil
+ return nil
+ }
+
+ // Try standard time parsing first
+ var stdTime time.Time
+ err := json.Unmarshal(src, &stdTime)
+ if err == nil {
+ v.value = &stdTime
+ return nil
+ }
+
+ // If standard parsing fails, fall back to layouts that may omit the timezone
+ s := string(src)
+ if len(s) >= 2 && s[0] == '"' && s[len(s)-1] == '"' {
+ // Remove quotes from the string
+ s = s[1 : len(s)-1]
+
+ // Try various time formats
+ formats := []string{
+ "2006-01-02T15:04:05.999999", // Missing timezone
+ "2006-01-02T15:04:05", // No fractional seconds or timezone
+ time.RFC3339, // Standard format with timezone
+ time.RFC3339Nano, // Standard format with nanoseconds
+ }
+
+ for _, format := range formats {
+ if t, err := time.Parse(format, s); err == nil {
+ v.value = &t
+ return nil
+ }
+ }
+ }
+
+ return fmt.Errorf("cannot parse %s as time.Time", string(src))
}
// IsNil checks if an input is nil