diff --git a/README.md b/README.md
index a51b620..9737ebb 100644
--- a/README.md
+++ b/README.md
@@ -767,13 +767,13 @@ Let's make a new task that uses the fetch tool. In this case, we'll use https://
 ```bash
 cat < 0 {
-        logger.Info("Adding traditional tools to LLM request", "toolCount", len(agent.Status.ValidTools))
+// endTaskRunSpan ends the TaskRun span with the given status
+func (r *TaskRunReconciler) endTaskRunSpan(ctx context.Context, taskRun *kubechain.TaskRun, code codes.Code, message string) {
+    if taskRun.Status.SpanContext == nil {
+        return
+    }
-        for _, validTool := range agent.Status.ValidTools {
-            if validTool.Kind != "Tool" {
-                continue
-            }
+    traceID, err := trace.TraceIDFromHex(taskRun.Status.SpanContext.TraceID)
+    if err != nil {
+        return
+    }
+    spanID, err := trace.SpanIDFromHex(taskRun.Status.SpanContext.SpanID)
+    if err != nil {
+        return
+    }
-            // Get the Tool resource
-            tool := &kubechainv1alpha1.Tool{}
-            if err := r.Get(ctx, client.ObjectKey{Namespace: agent.Namespace, Name: validTool.Name}, tool); err != nil {
-                logger.Error(err, "Failed to get Tool", "name", validTool.Name)
-                continue
-            }
+    spanCtx := trace.NewSpanContext(trace.SpanContextConfig{
+        TraceID: traceID,
+        SpanID:  spanID,
+    })
-            // Convert to LLM client format
-            if clientTool := llmclient.FromKubechainTool(*tool); clientTool != nil {
-                tools = append(tools, *clientTool)
-                logger.Info("Added traditional tool", "name", tool.Name)
-            }
-        }
-    }
+    ctx = trace.ContextWithSpanContext(ctx, spanCtx)
+    _, span := r.Tracer.Start(ctx, "TaskRun")
+    defer span.End()
-    // Then, add tools from MCP servers if available
-    if r.MCPManager != nil && len(agent.Status.ValidMCPServers) > 0 {
-        logger.Info("Adding MCP tools to LLM request", "mcpServerCount", len(agent.Status.ValidMCPServers))
+    span.SetStatus(code, message)
+}
-        for _, mcpServer := range agent.Status.ValidMCPServers {
-            // Get tools for this server
-            mcpTools, exists := r.MCPManager.GetTools(mcpServer.Name)
-            if !exists {
-                logger.Error(fmt.Errorf("MCP server tools not found"), "Failed to get tools for MCP server", "server", mcpServer.Name)
-                continue
-            }
+// collectTools collects all tools from the agent's MCP servers
+func (r *TaskRunReconciler) collectTools(agent *kubechain.Agent) []llmclient.Tool {
+    var tools []llmclient.Tool
-            // Convert MCP tools to LLM client format
-            mcpClientTools := adapters.ConvertMCPToolsToLLMClientTools(mcpTools, mcpServer.Name)
-            tools = append(tools, mcpClientTools...)
+    // Get tools from MCP manager
+    mcpTools := r.MCPManager.GetToolsForAgent(agent)
-            logger.Info("Added MCP tools", "server", mcpServer.Name, "toolCount", len(mcpTools))
-        }
+    // Convert MCP tools to LLM tools
+    for _, mcpTool := range mcpTools {
+        tools = append(tools, adapters.ConvertMCPToolsToLLMClientTools([]kubechain.MCPTool{mcpTool}, mcpTool.Name)...)
} return tools } -// endTaskRunSpan ends the parent span for a TaskRun if it exists -func (r *TaskRunReconciler) endTaskRunSpan(ctx context.Context, taskRun *kubechainv1alpha1.TaskRun, status codes.Code, description string) { - // Only try to end span if we have SpanContext info - if taskRun.Status.SpanContext == nil || taskRun.Status.SpanContext.TraceID == "" { - return +// createLLMRequestSpan creates a child span for the LLM request +func (r *TaskRunReconciler) createLLMRequestSpan(ctx context.Context, taskRun *kubechain.TaskRun, numMessages int, numTools int) (context.Context, trace.Span) { + if taskRun.Status.SpanContext == nil { + return ctx, nil } - // Get tracer - tracer := r.Tracer - if tracer == nil { - tracer = otel.GetTracerProvider().Tracer("taskrun") - } - - // Parse the trace and span IDs from the stored context - var traceID trace.TraceID - var spanID trace.SpanID - - // Convert hex strings to byte arrays - traceIDBytes, err := trace.TraceIDFromHex(taskRun.Status.SpanContext.TraceID) + traceID, err := trace.TraceIDFromHex(taskRun.Status.SpanContext.TraceID) if err != nil { - return + return ctx, nil } - traceID = traceIDBytes - - spanIDBytes, err := trace.SpanIDFromHex(taskRun.Status.SpanContext.SpanID) + spanID, err := trace.SpanIDFromHex(taskRun.Status.SpanContext.SpanID) if err != nil { - return + return ctx, nil } - spanID = spanIDBytes - // Create a span context with the stored IDs spanCtx := trace.NewSpanContext(trace.SpanContextConfig{ - TraceID: traceID, - SpanID: spanID, - TraceFlags: trace.FlagsSampled, - Remote: false, + TraceID: traceID, + SpanID: spanID, }) - // Create context with the span context - ctxWithSpan := trace.ContextWithSpanContext(ctx, spanCtx) - - // Create a final completion span that's a child of the original span - _, span := tracer.Start(ctxWithSpan, fmt.Sprintf("TaskRun/%s/Completion", taskRun.Name), - trace.WithSpanKind(trace.SpanKindServer), - trace.WithAttributes( - attribute.String("taskrun.name", taskRun.Name), - attribute.String("taskrun.namespace", taskRun.Namespace), - attribute.String("taskrun.phase", string(taskRun.Status.Phase)), - attribute.String("taskrun.status", string(taskRun.Status.Status)), - ), + ctx = trace.ContextWithSpanContext(ctx, spanCtx) + childCtx, span := r.Tracer.Start(ctx, "LLMRequest") + + span.SetAttributes( + attribute.Int("messages", numMessages), + attribute.Int("tools", numTools), ) - span.SetStatus(status, description) - span.End() + + return childCtx, span } +// processLLMResponse processes the LLM response and updates the TaskRun status // processLLMResponse handles the LLM's output and updates status accordingly -func (r *TaskRunReconciler) processLLMResponse(ctx context.Context, output *kubechainv1alpha1.Message, taskRun *kubechainv1alpha1.TaskRun, statusUpdate *kubechainv1alpha1.TaskRun) (ctrl.Result, error) { +func (r *TaskRunReconciler) processLLMResponse(ctx context.Context, output *kubechain.Message, taskRun *kubechain.TaskRun, statusUpdate *kubechain.TaskRun) (ctrl.Result, error) { logger := log.FromContext(ctx) if output.Content != "" { // final answer branch statusUpdate.Status.Output = output.Content - statusUpdate.Status.Phase = kubechainv1alpha1.TaskRunPhaseFinalAnswer + statusUpdate.Status.Phase = kubechain.TaskRunPhaseFinalAnswer statusUpdate.Status.Ready = true - statusUpdate.Status.ContextWindow = append(statusUpdate.Status.ContextWindow, kubechainv1alpha1.Message{ + statusUpdate.Status.ContextWindow = append(statusUpdate.Status.ContextWindow, kubechain.Message{ Role: "assistant", 
Content: output.Content, }) - statusUpdate.Status.Status = StatusReady + statusUpdate.Status.Status = kubechain.TaskRunStatusStatusReady statusUpdate.Status.StatusDetail = "LLM final response received" statusUpdate.Status.Error = "" r.recorder.Event(taskRun, corev1.EventTypeNormal, "LLMFinalAnswer", "LLM response received successfully") @@ -499,14 +380,14 @@ func (r *TaskRunReconciler) processLLMResponse(ctx context.Context, output *kube // tool call branch: create TaskRunToolCall objects for each tool call returned by the LLM. statusUpdate.Status.Output = "" - statusUpdate.Status.Phase = kubechainv1alpha1.TaskRunPhaseToolCallsPending + statusUpdate.Status.Phase = kubechain.TaskRunPhaseToolCallsPending statusUpdate.Status.ToolCallRequestID = toolCallRequestId - statusUpdate.Status.ContextWindow = append(statusUpdate.Status.ContextWindow, kubechainv1alpha1.Message{ + statusUpdate.Status.ContextWindow = append(statusUpdate.Status.ContextWindow, kubechain.Message{ Role: "assistant", ToolCalls: adapters.CastOpenAIToolCallsToKubechain(output.ToolCalls), }) statusUpdate.Status.Ready = true - statusUpdate.Status.Status = StatusReady + statusUpdate.Status.Status = kubechain.TaskRunStatusStatusReady statusUpdate.Status.StatusDetail = "LLM response received, tool calls pending" statusUpdate.Status.Error = "" r.recorder.Event(taskRun, corev1.EventTypeNormal, "ToolCallsPending", "LLM response received, tool calls pending") @@ -523,7 +404,7 @@ func (r *TaskRunReconciler) processLLMResponse(ctx context.Context, output *kube } // createToolCalls creates TaskRunToolCall objects for each tool call -func (r *TaskRunReconciler) createToolCalls(ctx context.Context, taskRun *kubechainv1alpha1.TaskRun, statusUpdate *kubechainv1alpha1.TaskRun, toolCalls []kubechainv1alpha1.ToolCall) (ctrl.Result, error) { +func (r *TaskRunReconciler) createToolCalls(ctx context.Context, taskRun *kubechain.TaskRun, statusUpdate *kubechain.TaskRun, toolCalls []kubechain.ToolCall) (ctrl.Result, error) { logger := log.FromContext(ctx) if statusUpdate.Status.ToolCallRequestID == "" { @@ -535,7 +416,7 @@ func (r *TaskRunReconciler) createToolCalls(ctx context.Context, taskRun *kubech // For each tool call, create a new TaskRunToolCall with a unique name using the ToolCallRequestID for i, tc := range toolCalls { newName := fmt.Sprintf("%s-%s-tc-%02d", statusUpdate.Name, statusUpdate.Status.ToolCallRequestID, i+1) - newTRTC := &kubechainv1alpha1.TaskRunToolCall{ + newTRTC := &kubechain.TaskRunToolCall{ ObjectMeta: metav1.ObjectMeta{ Name: newName, Namespace: statusUpdate.Namespace, @@ -553,12 +434,12 @@ func (r *TaskRunReconciler) createToolCalls(ctx context.Context, taskRun *kubech }, }, }, - Spec: kubechainv1alpha1.TaskRunToolCallSpec{ + Spec: kubechain.TaskRunToolCallSpec{ ToolCallId: tc.ID, - TaskRunRef: kubechainv1alpha1.LocalObjectReference{ + TaskRunRef: kubechain.LocalObjectReference{ Name: statusUpdate.Name, }, - ToolRef: kubechainv1alpha1.LocalObjectReference{ + ToolRef: kubechain.LocalObjectReference{ Name: tc.Function.Name, }, Arguments: tc.Function.Arguments, @@ -574,109 +455,11 @@ func (r *TaskRunReconciler) createToolCalls(ctx context.Context, taskRun *kubech return ctrl.Result{RequeueAfter: time.Second * 5}, nil } -// initializePhaseAndSpan initializes the TaskRun phase and starts tracing -func (r *TaskRunReconciler) initializePhaseAndSpan(ctx context.Context, statusUpdate *kubechainv1alpha1.TaskRun) (ctrl.Result, error) { - logger := log.FromContext(ctx) - - // Start tracing the TaskRun - tracer := r.Tracer - if tracer 
== nil { - tracer = otel.GetTracerProvider().Tracer("taskrun") - } - - // Make sure we provide a meaningful span name that includes the TaskRun name - spanName := fmt.Sprintf("TaskRun/%s", statusUpdate.Name) - ctx, span := tracer.Start(ctx, spanName, trace.WithSpanKind(trace.SpanKindServer)) - - // We need to explicitly end the span so the root span is properly recorded - // This is not what we want long-term, but it ensures spans show up correctly - // while we resolve the issue with maintaining spans across reconciliation calls - defer span.End() - - // Store span context in status - spanCtx := span.SpanContext() - statusUpdate.Status.SpanContext = &kubechainv1alpha1.SpanContext{ - TraceID: spanCtx.TraceID().String(), - SpanID: spanCtx.SpanID().String(), - } - - // Set useful attributes on the span - span.SetAttributes( - attribute.String("taskrun.name", statusUpdate.Name), - attribute.String("taskrun.namespace", statusUpdate.Namespace), - attribute.String("taskrun.uid", string(statusUpdate.UID)), - ) - - statusUpdate.Status.Phase = kubechainv1alpha1.TaskRunPhaseInitializing - statusUpdate.Status.Ready = false - statusUpdate.Status.Status = kubechainv1alpha1.TaskRunStatusStatusPending - statusUpdate.Status.StatusDetail = "Initializing" - if err := r.Status().Update(ctx, statusUpdate); err != nil { - logger.Error(err, "Failed to update TaskRun status") - span.SetStatus(codes.Error, "Failed to update TaskRun status") - span.RecordError(err) - return ctrl.Result{}, err - } - - // By ending the span now, we ensure it's properly recorded - // This approach creates a separate span for each reconciliation rather than - // a single span that covers the entire TaskRun lifecycle - span.SetStatus(codes.Ok, "TaskRun initialized") - return ctrl.Result{Requeue: true}, nil -} - -// createLLMRequestSpan creates a child span for an LLM request that is properly linked to the parent span -func (r *TaskRunReconciler) createLLMRequestSpan(ctx context.Context, taskRun *kubechainv1alpha1.TaskRun, contextWindowSize, toolsCount int) (context.Context, trace.Span) { - // Use controller's tracer if available, otherwise get the global tracer - tracer := r.Tracer - if tracer == nil { - tracer = otel.GetTracerProvider().Tracer("taskrun") - } - - // If we have a parent span context, create a child span linked to the parent - if taskRun.Status.SpanContext != nil && taskRun.Status.SpanContext.TraceID != "" { - // Parse the trace and span IDs from the stored context - traceIDBytes, err := trace.TraceIDFromHex(taskRun.Status.SpanContext.TraceID) - if err == nil { - spanIDBytes, err := trace.SpanIDFromHex(taskRun.Status.SpanContext.SpanID) - if err == nil { - // Create a span context with the stored IDs - spanCtx := trace.NewSpanContext(trace.SpanContextConfig{ - TraceID: traceIDBytes, - SpanID: spanIDBytes, - TraceFlags: trace.FlagsSampled, - Remote: false, - }) - - // Create context with the span context - ctx = trace.ContextWithSpanContext(ctx, spanCtx) - } - } - - // Create a child span that will properly link to the parent - childCtx, childSpan := tracer.Start(ctx, fmt.Sprintf("TaskRun/%s/LLMRequest", taskRun.Name), - trace.WithSpanKind(trace.SpanKindClient)) - - // Set attributes for the LLM request - childSpan.SetAttributes( - attribute.Int("context_window_size", contextWindowSize), - attribute.Int("tools_count", toolsCount), - attribute.String("taskrun.name", taskRun.Name), - attribute.String("taskrun.namespace", taskRun.Namespace), - ) - - return childCtx, childSpan - } - - // No parent span, just use the current 
context - return ctx, nil -} - -// Reconcile validates the taskrun's task reference and sends the prompt to the LLM. +// Reconcile validates the taskrun's agent reference and sends the prompt to the LLM. func (r *TaskRunReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { logger := log.FromContext(ctx) - var taskRun kubechainv1alpha1.TaskRun + var taskRun kubechain.TaskRun if err := r.Get(ctx, req.NamespacedName, &taskRun); err != nil { return ctrl.Result{}, client.IgnoreNotFound(err) } @@ -692,33 +475,32 @@ func (r *TaskRunReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct } // Skip reconciliation for terminal states - if statusUpdate.Status.Phase == kubechainv1alpha1.TaskRunPhaseFinalAnswer || statusUpdate.Status.Phase == kubechainv1alpha1.TaskRunPhaseFailed { - // todo do we need this V()?? + if statusUpdate.Status.Phase == kubechain.TaskRunPhaseFinalAnswer || statusUpdate.Status.Phase == kubechain.TaskRunPhaseFailed { logger.V(1).Info("TaskRun in terminal state, skipping reconciliation", "phase", statusUpdate.Status.Phase) return ctrl.Result{}, nil } - // Step 1: Validate Task and Agent - logger.V(3).Info("Validating Task and Agent") - task, agent, result, err := r.validateTaskAndAgent(ctx, &taskRun, statusUpdate) + // Step 1: Validate Agent + logger.V(3).Info("Validating Agent") + agent, result, err := r.validateTaskAndAgent(ctx, &taskRun, statusUpdate) if err != nil || !result.IsZero() { return result, err } // Step 2: Initialize Phase if necessary logger.V(3).Info("Preparing for LLM") - if result, err := r.prepareForLLM(ctx, &taskRun, statusUpdate, task, agent); err != nil || !result.IsZero() { + if result, err := r.prepareForLLM(ctx, &taskRun, statusUpdate, agent); err != nil || !result.IsZero() { return result, err } // Step 3: Handle tool calls phase logger.V(3).Info("Handling tool calls phase") - if taskRun.Status.Phase == kubechainv1alpha1.TaskRunPhaseToolCallsPending { + if taskRun.Status.Phase == kubechain.TaskRunPhaseToolCallsPending { return r.processToolCalls(ctx, &taskRun) } // Step 4: Check for unexpected phase - if taskRun.Status.Phase != kubechainv1alpha1.TaskRunPhaseReadyForLLM { + if taskRun.Status.Phase != kubechain.TaskRunPhaseReadyForLLM { logger.Info("TaskRun in unknown phase", "phase", taskRun.Status.Phase) return ctrl.Result{}, nil } @@ -736,8 +518,8 @@ func (r *TaskRunReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct if err != nil { logger.Error(err, "Failed to create OpenAI client") statusUpdate.Status.Ready = false - statusUpdate.Status.Status = StatusError - statusUpdate.Status.Phase = kubechainv1alpha1.TaskRunPhaseFailed + statusUpdate.Status.Status = kubechain.TaskRunStatusStatusError + statusUpdate.Status.Phase = kubechain.TaskRunPhaseFailed statusUpdate.Status.StatusDetail = "Failed to create OpenAI client: " + err.Error() statusUpdate.Status.Error = err.Error() r.recorder.Event(&taskRun, corev1.EventTypeWarning, "OpenAIClientCreationFailed", err.Error()) @@ -753,7 +535,7 @@ func (r *TaskRunReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct } // Step 7: Collect tools from all sources - tools := r.collectTools(ctx, agent) + tools := r.collectTools(agent) r.recorder.Event(&taskRun, corev1.EventTypeNormal, "SendingContextWindowToLLM", "Sending context window to LLM") @@ -769,7 +551,7 @@ func (r *TaskRunReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct if err != nil { logger.Error(err, "LLM request failed") statusUpdate.Status.Ready = false - statusUpdate.Status.Status 
= StatusError + statusUpdate.Status.Status = kubechain.TaskRunStatusStatusError statusUpdate.Status.StatusDetail = fmt.Sprintf("LLM request failed: %v", err) statusUpdate.Status.Error = err.Error() @@ -779,7 +561,7 @@ func (r *TaskRunReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct logger.Info("LLM request failed with 4xx status code, marking as failed", "statusCode", llmErr.StatusCode, "message", llmErr.Message) - statusUpdate.Status.Phase = kubechainv1alpha1.TaskRunPhaseFailed + statusUpdate.Status.Phase = kubechain.TaskRunPhaseFailed r.recorder.Event(&taskRun, corev1.EventTypeWarning, "LLMRequestFailed4xx", fmt.Sprintf("LLM request failed with status %d: %s", llmErr.StatusCode, llmErr.Message)) } else { @@ -810,8 +592,8 @@ func (r *TaskRunReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct llmResult, err = r.processLLMResponse(ctx, output, &taskRun, statusUpdate) if err != nil { logger.Error(err, "Failed to process LLM response") - statusUpdate.Status.Status = StatusError - statusUpdate.Status.Phase = kubechainv1alpha1.TaskRunPhaseFailed + statusUpdate.Status.Status = kubechain.TaskRunStatusStatusError + statusUpdate.Status.Phase = kubechain.TaskRunPhaseFailed statusUpdate.Status.StatusDetail = fmt.Sprintf("Failed to process LLM response: %v", err) statusUpdate.Status.Error = err.Error() r.recorder.Event(&taskRun, corev1.EventTypeWarning, "LLMResponseProcessingFailed", err.Error()) @@ -853,6 +635,6 @@ func (r *TaskRunReconciler) SetupWithManager(mgr ctrl.Manager) error { } return ctrl.NewControllerManagedBy(mgr). - For(&kubechainv1alpha1.TaskRun{}). + For(&kubechain.TaskRun{}). Complete(r) } diff --git a/kubechain/internal/controller/taskrun/taskrun_controller_test.go b/kubechain/internal/controller/taskrun/taskrun_controller_test.go index 0af8960..4b5be68 100644 --- a/kubechain/internal/controller/taskrun/taskrun_controller_test.go +++ b/kubechain/internal/controller/taskrun/taskrun_controller_test.go @@ -7,11 +7,11 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + "go.opentelemetry.io/otel/trace/noop" "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/reconcile" - "github.com/humanlayer/smallchain/kubechain/api/v1alpha1" kubechain "github.com/humanlayer/smallchain/kubechain/api/v1alpha1" "github.com/humanlayer/smallchain/kubechain/internal/llmclient" . 
"github.com/humanlayer/smallchain/kubechain/test/utils" @@ -21,14 +21,15 @@ var _ = Describe("TaskRun Controller", func() { Context("'' -> Initializing", func() { ctx := context.Background() It("moves to Initializing and sets a span context", func() { - testTask.Setup(ctx) - defer testTask.Teardown(ctx) + _, _, _, teardown := setupSuiteObjects(ctx) + defer teardown() taskRun := testTaskRun.Setup(ctx) defer testTaskRun.Teardown(ctx) By("reconciling the taskrun") reconciler, _ := reconciler() + reconciler.Tracer = noop.NewTracerProvider().Tracer("test") result, err := reconciler.Reconcile(ctx, reconcile.Request{ NamespacedName: types.NamespacedName{Name: testTaskRun.name, Namespace: "default"}, @@ -46,35 +47,30 @@ var _ = Describe("TaskRun Controller", func() { }) }) Context("Initializing -> Error", func() { - It("moves to error if the task is not found", func() { + It("moves to error if the agent is not found", func() { taskRun := testTaskRun.SetupWithStatus(ctx, kubechain.TaskRunStatus{ Phase: kubechain.TaskRunPhaseInitializing, }) defer testTaskRun.Teardown(ctx) By("reconciling the taskrun") - reconciler, _ := reconciler() + reconciler, recorder := reconciler() result, err := reconciler.Reconcile(ctx, reconcile.Request{ NamespacedName: types.NamespacedName{Name: testTaskRun.name, Namespace: "default"}, }) - // todo dont error if not found, don't requeue - Expect(err).To(HaveOccurred()) - Expect(result.Requeue).To(BeFalse()) + Expect(err).NotTo(HaveOccurred()) + Expect(result.RequeueAfter).To(Equal(time.Second * 5)) By("checking the taskrun status") Expect(k8sClient.Get(ctx, types.NamespacedName{Name: testTaskRun.name, Namespace: "default"}, taskRun)).To(Succeed()) - Expect(taskRun.Status.Phase).To(Equal(kubechain.TaskRunPhaseFailed)) - Expect(taskRun.Status.Error).To(Equal("Task \"test-task\" not found")) + Expect(taskRun.Status.Phase).To(Equal(kubechain.TaskRunPhasePending)) + Expect(taskRun.Status.StatusDetail).To(ContainSubstring("Waiting for Agent to exist")) + ExpectRecorder(recorder).ToEmitEventContaining("Waiting") }) }) Context("Initializing -> Pending", func() { - It("moves to pending if upstream task is not ready", func() { - _ = testTask.SetupWithStatus(ctx, kubechain.TaskStatus{ - Status: kubechain.TaskStatusPending, - }) - defer testTask.Teardown(ctx) - + It("moves to pending if upstream agent does not exist", func() { taskRun := testTaskRun.SetupWithStatus(ctx, kubechain.TaskRunStatus{ Phase: kubechain.TaskRunPhaseInitializing, }) @@ -92,14 +88,14 @@ var _ = Describe("TaskRun Controller", func() { By("checking the taskrun status") Expect(k8sClient.Get(ctx, types.NamespacedName{Name: testTaskRun.name, Namespace: "default"}, taskRun)).To(Succeed()) Expect(taskRun.Status.Phase).To(Equal(kubechain.TaskRunPhasePending)) - Expect(taskRun.Status.StatusDetail).To(ContainSubstring("Waiting for task \"test-task\" to become ready")) - ExpectRecorder(recorder).ToEmitEventContaining("TaskNotReady") + Expect(taskRun.Status.StatusDetail).To(ContainSubstring("Waiting for Agent to exist")) + ExpectRecorder(recorder).ToEmitEventContaining("Waiting") }) - }) - Context("Initializing -> ReadyForLLM", func() { - It("moves to ReadyForLLM if the task is ready", func() { - _, _, _, _, teardown := setupSuiteObjects(ctx) - defer teardown() + It("moves to pending if upstream agent is not ready", func() { + _ = testAgent.SetupWithStatus(ctx, kubechain.AgentStatus{ + Ready: false, + }) + defer testAgent.Teardown(ctx) taskRun := testTaskRun.SetupWithStatus(ctx, kubechain.TaskRunStatus{ Phase: 
kubechain.TaskRunPhaseInitializing, @@ -112,22 +108,18 @@ var _ = Describe("TaskRun Controller", func() { result, err := reconciler.Reconcile(ctx, reconcile.Request{ NamespacedName: types.NamespacedName{Name: testTaskRun.name, Namespace: "default"}, }) - Expect(err).NotTo(HaveOccurred()) - Expect(result.Requeue).To(BeTrue()) + Expect(result.RequeueAfter).To(Equal(time.Second * 5)) - By("ensuring the context window is set correctly") + By("checking the taskrun status") Expect(k8sClient.Get(ctx, types.NamespacedName{Name: testTaskRun.name, Namespace: "default"}, taskRun)).To(Succeed()) - Expect(taskRun.Status.Phase).To(Equal(kubechain.TaskRunPhaseReadyForLLM)) - Expect(taskRun.Status.StatusDetail).To(ContainSubstring("Ready to send to LLM")) - Expect(taskRun.Status.ContextWindow).To(HaveLen(2)) - Expect(taskRun.Status.ContextWindow[0].Role).To(Equal("system")) - Expect(taskRun.Status.ContextWindow[0].Content).To(ContainSubstring(testAgent.system)) - Expect(taskRun.Status.ContextWindow[1].Role).To(Equal("user")) - Expect(taskRun.Status.ContextWindow[1].Content).To(ContainSubstring(testTask.message)) - ExpectRecorder(recorder).ToEmitEventContaining("ValidationSucceeded") + Expect(taskRun.Status.Phase).To(Equal(kubechain.TaskRunPhasePending)) + Expect(taskRun.Status.StatusDetail).To(ContainSubstring("Waiting for agent \"test-agent\" to become ready")) + ExpectRecorder(recorder).ToEmitEventContaining("Waiting for agent") }) - It("moves to ReadyForLLM if there is a userMessage + agentRef and no taskRef", func() { + }) + Context("Initializing -> ReadyForLLM", func() { + It("moves to ReadyForLLM if there is a userMessage + agentRef", func() { testAgent.SetupWithStatus(ctx, kubechain.AgentStatus{ Status: "Ready", Ready: true, @@ -167,8 +159,11 @@ var _ = Describe("TaskRun Controller", func() { }) Context("Pending -> ReadyForLLM", func() { It("moves to ReadyForLLM if upstream dependencies are ready", func() { - _, _, _, _, teardown := setupSuiteObjects(ctx) - defer teardown() + testAgent.SetupWithStatus(ctx, kubechain.AgentStatus{ + Status: "Ready", + Ready: true, + }) + defer testAgent.Teardown(ctx) taskRun := testTaskRun.SetupWithStatus(ctx, kubechain.TaskRunStatus{ Phase: kubechain.TaskRunPhasePending, @@ -192,13 +187,13 @@ var _ = Describe("TaskRun Controller", func() { Expect(taskRun.Status.ContextWindow[0].Role).To(Equal("system")) Expect(taskRun.Status.ContextWindow[0].Content).To(ContainSubstring(testAgent.system)) Expect(taskRun.Status.ContextWindow[1].Role).To(Equal("user")) - Expect(taskRun.Status.ContextWindow[1].Content).To(ContainSubstring(testTask.message)) + Expect(taskRun.Status.ContextWindow[1].Content).To(ContainSubstring(testTaskRun.userMessage)) ExpectRecorder(recorder).ToEmitEventContaining("ValidationSucceeded") }) }) Context("ReadyForLLM -> LLMFinalAnswer", func() { It("moves to LLMFinalAnswer after getting a response from the LLM", func() { - _, _, _, _, teardown := setupSuiteObjects(ctx) + _, _, _, teardown := setupSuiteObjects(ctx) defer teardown() taskRun := testTaskRun.SetupWithStatus(ctx, kubechain.TaskRunStatus{ @@ -210,7 +205,7 @@ var _ = Describe("TaskRun Controller", func() { }, { Role: "user", - Content: testTask.message, + Content: testTaskRun.userMessage, }, }, }) @@ -219,7 +214,7 @@ var _ = Describe("TaskRun Controller", func() { By("reconciling the taskrun") reconciler, recorder := reconciler() mockLLMClient := &llmclient.MockRawOpenAIClient{ - Response: &v1alpha1.Message{ + Response: &kubechain.Message{ Role: "assistant", Content: "The moon is a natural 
satellite of the Earth and lacks any formal government or capital.", }, @@ -250,12 +245,12 @@ var _ = Describe("TaskRun Controller", func() { Expect(mockLLMClient.Calls[0].Messages[0].Role).To(Equal("system")) Expect(mockLLMClient.Calls[0].Messages[0].Content).To(ContainSubstring(testAgent.system)) Expect(mockLLMClient.Calls[0].Messages[1].Role).To(Equal("user")) - Expect(mockLLMClient.Calls[0].Messages[1].Content).To(ContainSubstring(testTask.message)) + Expect(mockLLMClient.Calls[0].Messages[1].Content).To(ContainSubstring(testTaskRun.userMessage)) }) }) Context("ReadyForLLM -> Error", func() { It("moves to Error state but not Failed phase on general error", func() { - _, _, _, _, teardown := setupSuiteObjects(ctx) + _, _, _, teardown := setupSuiteObjects(ctx) defer teardown() taskRun := testTaskRun.SetupWithStatus(ctx, kubechain.TaskRunStatus{ @@ -267,7 +262,7 @@ var _ = Describe("TaskRun Controller", func() { }, { Role: "user", - Content: testTask.message, + Content: testTaskRun.userMessage, }, }, }) @@ -297,7 +292,7 @@ var _ = Describe("TaskRun Controller", func() { }) It("moves to Error state AND Failed phase on 4xx error", func() { - _, _, _, _, teardown := setupSuiteObjects(ctx) + _, _, _, teardown := setupSuiteObjects(ctx) defer teardown() taskRun := testTaskRun.SetupWithStatus(ctx, kubechain.TaskRunStatus{ @@ -309,7 +304,7 @@ var _ = Describe("TaskRun Controller", func() { }, { Role: "user", - Content: testTask.message, + Content: testTaskRun.userMessage, }, }, }) @@ -353,7 +348,7 @@ var _ = Describe("TaskRun Controller", func() { }) Context("ReadyForLLM -> ToolCallsPending", func() { It("moves to ToolCallsPending if the LLM returns tool calls", func() { - _, _, _, _, teardown := setupSuiteObjects(ctx) + _, _, _, teardown := setupSuiteObjects(ctx) defer teardown() taskRun := testTaskRun.SetupWithStatus(ctx, kubechain.TaskRunStatus{ @@ -364,12 +359,12 @@ var _ = Describe("TaskRun Controller", func() { By("reconciling the taskrun") reconciler, recorder := reconciler() mockLLMClient := &llmclient.MockRawOpenAIClient{ - Response: &v1alpha1.Message{ + Response: &kubechain.Message{ Role: "assistant", - ToolCalls: []v1alpha1.ToolCall{ + ToolCalls: []kubechain.ToolCall{ { ID: "1", - Function: v1alpha1.ToolCallFunction{Name: "fetch__fetch", Arguments: `{"url": "https://api.example.com/data"}`}, + Function: kubechain.ToolCallFunction{Name: "fetch__fetch", Arguments: `{"url": "https://api.example.com/data"}`}, }, }, }, @@ -408,7 +403,7 @@ var _ = Describe("TaskRun Controller", func() { }) Context("ToolCallsPending -> ToolCallsPending", func() { It("Stays in ToolCallsPending if the tool calls are not completed", func() { - _, _, _, _, teardown := setupSuiteObjects(ctx) + _, _, _, teardown := setupSuiteObjects(ctx) defer teardown() taskRun := testTaskRun.SetupWithStatus(ctx, kubechain.TaskRunStatus{ @@ -438,7 +433,7 @@ var _ = Describe("TaskRun Controller", func() { }) Context("ToolCallsPending -> ReadyForLLM", func() { It("moves to ReadyForLLM if all tool calls are completed", func() { - _, _, _, _, teardown := setupSuiteObjects(ctx) + _, _, _, teardown := setupSuiteObjects(ctx) defer teardown() By("setting up the taskrun with a tool call pending") @@ -452,7 +447,7 @@ var _ = Describe("TaskRun Controller", func() { }, { Role: "user", - Content: testTask.message, + Content: testTaskRun.userMessage, }, { Role: "assistant", @@ -594,8 +589,5 @@ var _ = Describe("TaskRun Controller", func() { Expect(lastMessage.Role).To(Equal("assistant")) Expect(lastMessage.Content).To(Equal("4 + 4 = 8")) }) - - 
// todo(dex) i think this is not needed anymore - check version history to restore it - XIt("should transition to ReadyForLLM when all tool calls are complete", func() {}) }) }) diff --git a/kubechain/internal/controller/taskrun/utils_test.go b/kubechain/internal/controller/taskrun/utils_test.go index b2fbfb6..4246493 100644 --- a/kubechain/internal/controller/taskrun/utils_test.go +++ b/kubechain/internal/controller/taskrun/utils_test.go @@ -11,6 +11,7 @@ import ( "k8s.io/client-go/tools/record" kubechain "github.com/humanlayer/smallchain/kubechain/api/v1alpha1" + "github.com/humanlayer/smallchain/kubechain/internal/mcpmanager" ) // todo this file should probably live in a shared package, but for now... @@ -142,50 +143,8 @@ var testAgent = &TestAgent{ mcpServers: []kubechain.LocalObjectReference{}, } -type TestTask struct { - name string - agentName string - message string - task *kubechain.Task -} - -func (t *TestTask) Setup(ctx context.Context) *kubechain.Task { - By("creating the task") - task := &kubechain.Task{ - ObjectMeta: metav1.ObjectMeta{ - Name: t.name, - Namespace: "default", - }, - Spec: kubechain.TaskSpec{ - AgentRef: kubechain.LocalObjectReference{ - Name: t.agentName, - }, - Message: t.message, - }, - } - err := k8sClient.Create(ctx, task) - Expect(err).NotTo(HaveOccurred()) - Expect(k8sClient.Get(ctx, types.NamespacedName{Name: t.name, Namespace: "default"}, task)).To(Succeed()) - t.task = task - return task -} - -func (t *TestTask) SetupWithStatus(ctx context.Context, status kubechain.TaskStatus) *kubechain.Task { - task := t.Setup(ctx) - task.Status = status - Expect(k8sClient.Status().Update(ctx, task)).To(Succeed()) - t.task = task - return task -} - -func (t *TestTask) Teardown(ctx context.Context) { - By("deleting the task") - Expect(k8sClient.Delete(ctx, t.task)).To(Succeed()) -} - type TestTaskRun struct { name string - taskName string agentName string userMessage string taskRun *kubechain.TaskRun @@ -200,13 +159,8 @@ func (t *TestTaskRun) Setup(ctx context.Context) *kubechain.TaskRun { }, Spec: kubechain.TaskRunSpec{}, } - if t.taskName != "" { - taskRun.Spec.TaskRef = &kubechain.LocalObjectReference{ - Name: t.taskName, - } - } if t.agentName != "" { - taskRun.Spec.AgentRef = &kubechain.LocalObjectReference{ + taskRun.Spec.AgentRef = kubechain.LocalObjectReference{ Name: t.agentName, } } @@ -236,15 +190,10 @@ func (t *TestTaskRun) Teardown(ctx context.Context) { Expect(k8sClient.Delete(ctx, t.taskRun)).To(Succeed()) } -var testTask = &TestTask{ - name: "test-task", - agentName: "test-agent", - message: "what is the capital of the moon?", -} - var testTaskRun = &TestTaskRun{ - name: "test-taskrun", - taskName: testTask.name, + name: "test-taskrun", + agentName: "test-agent", + userMessage: "what is the capital of the moon?", } type TestTaskRunToolCall struct { @@ -298,7 +247,7 @@ var testTaskRunToolCall = &TestTaskRunToolCall{ } // nolint:golint,unparam -func setupSuiteObjects(ctx context.Context) (secret *corev1.Secret, llm *kubechain.LLM, agent *kubechain.Agent, task *kubechain.Task, teardown func()) { +func setupSuiteObjects(ctx context.Context) (secret *corev1.Secret, llm *kubechain.LLM, agent *kubechain.Agent, teardown func()) { secret = testSecret.Setup(ctx) llm = testLLM.SetupWithStatus(ctx, kubechain.LLMStatus{ Status: "Ready", @@ -308,26 +257,22 @@ func setupSuiteObjects(ctx context.Context) (secret *corev1.Secret, llm *kubecha Status: "Ready", Ready: true, }) - task = testTask.SetupWithStatus(ctx, kubechain.TaskStatus{ - Status: "Ready", - Ready: true, 
-    })
     teardown = func() {
         testSecret.Teardown(ctx)
         testLLM.Teardown(ctx)
         testAgent.Teardown(ctx)
-        testTask.Teardown(ctx)
     }
-    return secret, llm, agent, task, teardown
+    return secret, llm, agent, teardown
 }
 func reconciler() (*TaskRunReconciler, *record.FakeRecorder) {
     By("creating the reconciler")
     recorder := record.NewFakeRecorder(10)
     reconciler := &TaskRunReconciler{
-        Client:   k8sClient,
-        Scheme:   k8sClient.Scheme(),
-        recorder: recorder,
+        Client:     k8sClient,
+        Scheme:     k8sClient.Scheme(),
+        recorder:   recorder,
+        MCPManager: &mcpmanager.MCPServerManager{},
     }
     return reconciler, recorder
 }
diff --git a/kubechain/internal/mcpmanager/mcpmanager.go b/kubechain/internal/mcpmanager/mcpmanager.go
index 649baf5..b33a205 100644
--- a/kubechain/internal/mcpmanager/mcpmanager.go
+++ b/kubechain/internal/mcpmanager/mcpmanager.go
@@ -19,6 +19,8 @@ import (
 )
 
 // MCPServerManager manages MCP server connections and tools
+var _ MCPManagerInterface = &MCPServerManager{}
+
 type MCPServerManager struct {
     connections map[string]*MCPConnection
     mu          sync.RWMutex
@@ -290,7 +292,6 @@ func (m *MCPServerManager) CallTool(ctx context.Context, serverName, toolName st
             Arguments: arguments,
         },
     })
-
     if err != nil {
         return "", fmt.Errorf("error calling tool %s on server %s: %w", toolName, serverName, err)
     }
diff --git a/kubechain/kubechain.knowledge.md b/kubechain/kubechain.knowledge.md
deleted file mode 100644
index 62eaf7d..0000000
--- a/kubechain/kubechain.knowledge.md
+++ /dev/null
@@ -1,412 +0,0 @@
-_this is a knowledge file for codebuff and other coding agents, with instructions and guidelines for working on this project if you are a human reading this, some of this may not apply to you_
-
-## Status Pattern
-
-Resources follow a consistent status pattern:
-- Ready: Boolean indicating if resource is ready
-- Status: Enum with values "Ready" or "Error" or "Pending"
-- StatusDetail: Detailed message about the current status
-- Events: Emit events for validation success/failure and significant state changes
-
-Example:
-```yaml
-status:
-  ready: true
-  status: Ready
-  statusDetail: "OpenAI API key validated successfully"
-```
-
-Events:
-- ValidationSucceeded: When resource validation passes
-- ValidationFailed: When resource validation fails
-- ResourceCreated: When child resources are created (e.g. TaskRunCreated)
-
-New resources start in Pending state while validating dependencies.
-Use Pending (not Error) when upstream dependencies exist but aren't ready.
-
-## using the controller
-
-The controller is running in the local kind cluster in the default namespace. The cluster is called `kubechain-example-cluster`.
-
-You can use `make deploy-local-kind` to rebuild the controller and push it to the local kind cluster.
-
-## progress tracking
-
-BEFORE every change, update resume-kubechain-operator.md with your recent progress and whats next
-
-## tests
-
-after every change, validate with
-
-```
-make test
-```
-
-## end to end tests
-
-to test things end to end, you can delete all existing examples
-
-```
-k delete task,taskrun,agent,llm --all
-```
-
-then apply the new resources
-
-```
-kustomize build samples | kubectl apply -f -
-```
-
-then
-
-```
-kubectl get llm,tool,agent,task,taskrun
-```
-
-## things not to do
-
-- IF YOU ARE RUNNING THE `kind` CLI you are doing something wrong
-- DO NOT TRY to port-foward to grafana or anything else in the OTEL stack - i have that handled via node ports
-- DO NOT TRY TO CHECK THINGS IN GRAFANA OR PROMETHEUS AT ALL - I will go look at them when you are ready for me to, just ask
-- DO NOT USE `cat <
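
For reference, the span re-attachment pattern this diff introduces in `endTaskRunSpan` and `createLLMRequestSpan` (rebuild a parent `SpanContext` from the hex-encoded trace and span IDs stored in `TaskRun.Status.SpanContext`, then start a child span against it) can be exercised on its own with the sketch below. It is illustrative only: the `resumeSpan` helper name, the hard-coded example IDs, and the no-op tracer are stand-ins; the controller uses its stored status fields and its configured `r.Tracer` instead.

```go
package main

import (
	"context"
	"fmt"

	"go.opentelemetry.io/otel/codes"
	"go.opentelemetry.io/otel/trace"
	"go.opentelemetry.io/otel/trace/noop"
)

// resumeSpan rebuilds a parent span context from hex-encoded IDs (the shape
// stored in TaskRun.Status.SpanContext) and records a child span under it.
func resumeSpan(ctx context.Context, tracer trace.Tracer, traceIDHex, spanIDHex string) error {
	traceID, err := trace.TraceIDFromHex(traceIDHex)
	if err != nil {
		return fmt.Errorf("invalid trace ID: %w", err)
	}
	spanID, err := trace.SpanIDFromHex(spanIDHex)
	if err != nil {
		return fmt.Errorf("invalid span ID: %w", err)
	}

	// Re-attach the stored parent so the new span joins the original trace.
	parent := trace.NewSpanContext(trace.SpanContextConfig{
		TraceID: traceID,
		SpanID:  spanID,
		// FlagsSampled makes a real exporter record the child; the new
		// helpers in this diff omit it, the removed code set it.
		TraceFlags: trace.FlagsSampled,
	})
	ctx = trace.ContextWithSpanContext(ctx, parent)

	_, span := tracer.Start(ctx, "TaskRun")
	defer span.End()
	span.SetStatus(codes.Ok, "resumed parent trace")
	return nil
}

func main() {
	// A no-op tracer keeps the sketch runnable without an OTLP exporter.
	tracer := noop.NewTracerProvider().Tracer("example")
	if err := resumeSpan(context.Background(), tracer,
		"4bf92f3577b34da6a3ce929d0e0e4736", "00f067aa0ba902b7"); err != nil {
		fmt.Println(err)
	}
}
```

As in the diff, malformed hex IDs simply short-circuit: `TraceIDFromHex` or `SpanIDFromHex` returns an error and no span is emitted.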