这是indexloc提供的服务,不要输入任何密码
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions kubechain/config/manager/kustomization.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,6 @@ apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
images:
- name: controller
newName: ghcr.io/humanlayer/smallchain
newTag: v0.1.10
newName: controller
newTag: "202503221610"

2 changes: 2 additions & 0 deletions kubechain/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/onsi/ginkgo/v2 v2.23.2
github.com/onsi/gomega v1.36.2
github.com/openai/openai-go v0.1.0-alpha.59
github.com/stretchr/testify v1.10.0
go.opentelemetry.io/otel v1.34.0
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.34.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.34.0
Expand All @@ -20,6 +21,7 @@ require (
)

require (
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/tidwall/gjson v1.14.4 // indirect
github.com/tidwall/match v1.1.1 // indirect
github.com/tidwall/pretty v1.2.1 // indirect
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -620,6 +620,29 @@ func (r *TaskRunToolCallReconciler) handleUnsupportedToolType(ctx context.Contex
return ctrl.Result{}, err
}

// getMCPServer gets the MCPServer for a tool and checks if it requires approval
func (r *TaskRunToolCallReconciler) getMCPServer(ctx context.Context, trtc *kubechainv1alpha1.TaskRunToolCall) (*kubechainv1alpha1.MCPServer, bool, error) {
logger := log.FromContext(ctx)

// Check if this is an MCP tool
serverName, _, isMCP := isMCPTool(trtc.Spec.ToolRef.Name)
if !isMCP {
return nil, false, nil
}

// Get the MCPServer
var mcpServer kubechainv1alpha1.MCPServer
if err := r.Get(ctx, client.ObjectKey{
Namespace: trtc.Namespace,
Name: serverName,
}, &mcpServer); err != nil {
logger.Error(err, "Failed to get MCPServer", "serverName", serverName)
return nil, false, err
}

return &mcpServer, mcpServer.Spec.ApprovalContactChannel != nil, nil
}

// Reconcile processes TaskRunToolCall objects.
func (r *TaskRunToolCallReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := log.FromContext(ctx)
Expand All @@ -646,9 +669,29 @@ func (r *TaskRunToolCallReconciler) Reconcile(ctx context.Context, req ctrl.Requ
return ctrl.Result{}, nil
}

// Step 3: Check if this is an MCP tool
serverName, mcpToolName, isMCP := isMCPTool(trtc.Spec.ToolRef.Name)

if isMCP {
// Step 3: Check if this is an MCP tool and needs approval
mcpServer, needsApproval, err := r.getMCPServer(ctx, &trtc)
if err != nil {
return ctrl.Result{}, err
}

if needsApproval {
trtc.Status.Status = "AwaitingHumanApproval"
trtc.Status.StatusDetail = fmt.Sprintf("Waiting for human approval via contact channel %s", mcpServer.Spec.ApprovalContactChannel.Name)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • fetch the contact channel based on the MCP server ref
  • create a post /humanlayer/v1/function_calls
  • store the call ID on the status
  • requeue in 5 seconds
  • in reconcile loop
if status == AwaitingHumanApproval {
   GET /humanalyer/v1/function_calls/{call_id}
   if not finished:
        requeue in 5s
   else -> either execute tool, or set output to denied

r.recorder.Event(&trtc, corev1.EventTypeNormal, "AwaitingHumanApproval",
fmt.Sprintf("Tool execution requires approval via contact channel %s", mcpServer.Spec.ApprovalContactChannel.Name))

if err := r.Status().Update(ctx, &trtc); err != nil {
logger.Error(err, "Failed to update TaskRunToolCall status")
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
}
}

// Step 4: Parse arguments
args, err := r.parseArguments(ctx, &trtc)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,5 +172,108 @@ var _ = Describe("TaskRunToolCall Controller", func() {
By("checking that a validation failed event was created")
utils.ExpectRecorder(eventRecorder).ToEmitEventContaining("ExecutionFailed")
})

It("should transition to AwaitingHumanApproval when MCP tool's server has approval contact channel", func() {
By("creating a contact channel")
contactChannel := &kubechainv1alpha1.ContactChannel{
ObjectMeta: metav1.ObjectMeta{
Name: "test-contact-channel",
Namespace: "default",
},
Status: kubechainv1alpha1.ContactChannelStatus{
Ready: true,
Status: "Ready",
},
}
Expect(k8sClient.Create(ctx, contactChannel)).To(Succeed())

By("creating an MCPServer with approval contact channel")
mcpServer := &kubechainv1alpha1.MCPServer{
ObjectMeta: metav1.ObjectMeta{
Name: "test-mcp-server",
Namespace: "default",
},
Spec: kubechainv1alpha1.MCPServerSpec{
Transport: "stdio",
ApprovalContactChannel: &kubechainv1alpha1.LocalObjectReference{
Name: "test-contact-channel",
},
},
}
Expect(k8sClient.Create(ctx, mcpServer)).To(Succeed())

By("creating an MCP tool")
tool := &kubechainv1alpha1.Tool{
ObjectMeta: metav1.ObjectMeta{
Name: "test-mcp-server-test-tool",
Namespace: "default",
},
Spec: kubechainv1alpha1.ToolSpec{
ToolType: "function",
Name: "test-mcp-server__test-tool",
Description: "A tool that requires human approval",
Execute: kubechainv1alpha1.ToolExecute{
Builtin: &kubechainv1alpha1.BuiltinToolSpec{
Name: "add",
},
},
},
}
Expect(k8sClient.Create(ctx, tool)).To(Succeed())

By("creating the taskruntoolcall")
trtc := &kubechainv1alpha1.TaskRunToolCall{
ObjectMeta: metav1.ObjectMeta{
Name: resourceName,
Namespace: "default",
},
Spec: kubechainv1alpha1.TaskRunToolCallSpec{
TaskRunRef: kubechainv1alpha1.LocalObjectReference{
Name: "parent-taskrun",
},
ToolRef: kubechainv1alpha1.LocalObjectReference{
Name: "test-mcp-server__test-tool",
},
Arguments: `{"a": 2, "b": 3}`,
},
}
Expect(k8sClient.Create(ctx, trtc)).To(Succeed())

By("reconciling the taskruntoolcall")
eventRecorder := record.NewFakeRecorder(10)
reconciler := &TaskRunToolCallReconciler{
Client: k8sClient,
Scheme: k8sClient.Scheme(),
recorder: eventRecorder,
}

// First reconciliation (but not the contrition variety) - should initialize status
_, err := reconciler.Reconcile(ctx, reconcile.Request{
NamespacedName: typeNamespacedName,
})
Expect(err).NotTo(HaveOccurred())

// Second reconciliation - should transition to AwaitingHumanApproval
_, err = reconciler.Reconcile(ctx, reconcile.Request{
NamespacedName: typeNamespacedName,
})
Expect(err).NotTo(HaveOccurred())

By("checking the taskruntoolcall status")
updatedTRTC := &kubechainv1alpha1.TaskRunToolCall{}
err = k8sClient.Get(ctx, typeNamespacedName, updatedTRTC)
Expect(err).NotTo(HaveOccurred())
Expect(updatedTRTC.Status.Phase).To(Equal(kubechainv1alpha1.TaskRunToolCallPhasePending))
Expect(updatedTRTC.Status.Status).To(Equal("AwaitingHumanApproval"))
Expect(updatedTRTC.Status.StatusDetail).To(Equal("Waiting for human approval via contact channel test-contact-channel"))

By("checking that appropriate events were emitted")
utils.ExpectRecorder(eventRecorder).ToEmitEventContaining("AwaitingHumanApproval")

By("Cleanup")
Expect(k8sClient.Delete(ctx, contactChannel)).To(Succeed())
Expect(k8sClient.Delete(ctx, mcpServer)).To(Succeed())
Expect(k8sClient.Delete(ctx, tool)).To(Succeed())
})
})
})