diff --git a/README.md b/README.md
index 3835cba..a51b620 100644
--- a/README.md
+++ b/README.md
@@ -31,7 +31,7 @@ KubeChain is a cloud-native orchestrator for AI Agents built on Kubernetes. It s
   - [Setting Up a Local Cluster](#setting-up-a-local-cluster)
   - [Deploying KubeChain](#deploying-kubechain)
   - [Creating Your First Agent](#creating-your-first-agent)
-  - [Running Your First Task](#running-your-first-task)
+  - [Running Your First TaskRun](#running-your-first-taskrun)
   - [Inspecting the TaskRun more closely](#inspecting-the-taskrun-more-closely)
   - [Adding Tools with MCP](#adding-tools-with-mcp)
   - [Cleaning Up](#cleaning-up)
@@ -47,8 +47,7 @@ KubeChain is a cloud-native orchestrator for AI Agents built on Kubernetes. It s
 - **LLM**: Provider + API Keys + Parameters
 - **Agent**: LLM + System Prompt + Tools
 - **Tool**: MCP server or another Agent
-- **Task**: Agent + User Message
-- **TaskRun**: Task + Current context window
+- **TaskRun**: Agent + User Message + Current context window
 
 ## Getting Started
 
@@ -308,22 +307,20 @@ Events:
   Normal  ValidationSucceeded  64m (x2 over 64m)  agent-controller  All dependencies validated successfully
 ```
 
-
-
-### Running Your First Task
+### Running Your First TaskRun
 
+Create a TaskRun to interact with your agent:
 
-1. **Create a Task resource**
 ```bash
 cat <
 
 Using `-o wide` and `describe`
 
 ```bash
-kubectl get task -o wide
+kubectl get taskrun -o wide
 ```
 
 Output:
 ```
-NAME               READY   STATUS   DETAIL             AGENT          MESSAGE                            OUTPUT
-hello-world-task   true    Ready    Task Run Created   my-assistant   What is the capital of the moon?
+NAME            READY   STATUS   DETAIL             AGENT          PREVIEW                            OUTPUT
+hello-world-1   true    Ready    Task Run Created   my-assistant   What is the capital of the moon?
 ```
 
 ```bash
-kubectl describe task
+kubectl describe taskrun
 ```
 
 Output:
diff --git a/kubechain/config/manager/kustomization.yaml b/kubechain/config/manager/kustomization.yaml
index 07c1c50..9ab4348 100644
--- a/kubechain/config/manager/kustomization.yaml
+++ b/kubechain/config/manager/kustomization.yaml
@@ -5,4 +5,4 @@ kind: Kustomization
 images:
 - name: controller
   newName: controller
-  newTag: "202504021329"
+  newTag: "202504041032"
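With this change, a TaskRun no longer needs a parent Task: it can reference an Agent directly and carry the user message itself. A minimal sketch of the standalone form, using the `agentRef` and `userMessage` spec fields added in the controller change below (the `apiVersion` group shown is an assumption; verify it against the CRDs in this repo):

```yaml
# Sketch of a standalone TaskRun (new in this change).
# apiVersion is assumed for illustration; check the repo's CRDs for the real group/version.
apiVersion: kubechain.humanlayer.dev/v1alpha1
kind: TaskRun
metadata:
  name: hello-world-1
spec:
  agentRef:
    name: my-assistant                            # the Agent created earlier in the README
  userMessage: "What is the capital of the moon?" # inlined instead of a parent Task's message
```

A TaskRun that sets `taskRef` keeps working unchanged; as the controller spells out, `taskRef` is only required when `agentRef` and `userMessage` are not provided.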
diff --git a/kubechain/internal/controller/taskrun/taskrun_controller.go b/kubechain/internal/controller/taskrun/taskrun_controller.go
index 9976d48..41b87ae 100644
--- a/kubechain/internal/controller/taskrun/taskrun_controller.go
+++ b/kubechain/internal/controller/taskrun/taskrun_controller.go
@@ -54,8 +54,14 @@ type TaskRunReconciler struct {
 
 // getTask fetches the parent Task for this TaskRun
 func (r *TaskRunReconciler) getTask(ctx context.Context, taskRun *kubechainv1alpha1.TaskRun) (*kubechainv1alpha1.Task, error) {
+	// If we have agentRef and userMessage, we don't need a task
+	if taskRun.Spec.AgentRef != nil && taskRun.Spec.UserMessage != "" {
+		return nil, nil
+	}
+
+	// If TaskRef is nil, we can't fetch the task
 	if taskRun.Spec.TaskRef == nil {
-		return nil, fmt.Errorf("TaskRef is required but was not provided")
+		return nil, fmt.Errorf("taskRef is required when agentRef and userMessage are not provided")
 	}
 
 	task := &kubechainv1alpha1.Task{}
@@ -78,7 +84,7 @@ func (r *TaskRunReconciler) getTask(ctx context.Context, taskRun *kubechainv1alp
 func (r *TaskRunReconciler) validateTaskAndAgent(ctx context.Context, taskRun *kubechainv1alpha1.TaskRun, statusUpdate *kubechainv1alpha1.TaskRun) (*kubechainv1alpha1.Task, *kubechainv1alpha1.Agent, ctrl.Result, error) {
 	logger := log.FromContext(ctx)
 
-	// Get parent Task
+	// Get parent Task if needed
 	task, err := r.getTask(ctx, taskRun)
 	if err != nil {
 		r.recorder.Event(taskRun, corev1.EventTypeWarning, "TaskValidationFailed", err.Error())
@@ -106,9 +112,6 @@ func (r *TaskRunReconciler) validateTaskAndAgent(ctx context.Context, taskRun *k
 			logger.Error(updateErr, "Failed to update TaskRun status")
 			return nil, nil, ctrl.Result{}, fmt.Errorf("failed to update taskrun status: %v", err)
 		}
-		// todo dont error if not found, don't requeue
-		// (can use client.IgnoreNotFound(err), but today
-		// the parent method needs err != nil to break control flow properly)
 		return nil, nil, ctrl.Result{}, err
 	}
 
@@ -128,9 +131,30 @@ func (r *TaskRunReconciler) validateTaskAndAgent(ctx context.Context, taskRun *k
 		return nil, nil, ctrl.Result{RequeueAfter: time.Second * 5}, nil
 	}
 
-	// Get the Agent referenced by the Task
+	// Get the Agent - use TaskRun's AgentRef if provided, otherwise use Task's AgentRef
+	var agentRef kubechainv1alpha1.LocalObjectReference
+	if taskRun.Spec.AgentRef != nil {
+		agentRef = *taskRun.Spec.AgentRef
+	} else if task != nil {
+		agentRef = task.Spec.AgentRef
+	} else {
+		err := fmt.Errorf("no agent reference found")
+		logger.Error(err, "Missing agent reference")
+		statusUpdate.Status.Ready = false
+		statusUpdate.Status.Status = StatusError
+		statusUpdate.Status.Phase = kubechainv1alpha1.TaskRunPhaseFailed
+		statusUpdate.Status.StatusDetail = err.Error()
+		statusUpdate.Status.Error = err.Error()
+		r.recorder.Event(taskRun, corev1.EventTypeWarning, "ValidationFailed", err.Error())
+		if updateErr := r.Status().Update(ctx, statusUpdate); updateErr != nil {
+			logger.Error(updateErr, "Failed to update TaskRun status")
+			return nil, nil, ctrl.Result{}, updateErr
+		}
+		return nil, nil, ctrl.Result{}, err
+	}
+
 	var agent kubechainv1alpha1.Agent
-	if err := r.Get(ctx, client.ObjectKey{Namespace: task.Namespace, Name: task.Spec.AgentRef.Name}, &agent); err != nil {
+	if err := r.Get(ctx, client.ObjectKey{Namespace: taskRun.Namespace, Name: agentRef.Name}, &agent); err != nil {
 		logger.Error(err, "Failed to get Agent")
 		statusUpdate.Status.Ready = false
 		statusUpdate.Status.Status = StatusPending
@@ -161,6 +185,7 @@ func (r *TaskRunReconciler) validateTaskAndAgent(ctx context.Context, taskRun *k
 		return nil, nil, ctrl.Result{RequeueAfter: time.Second * 5}, nil
 	}
 
+	// If we get here, either task is nil (userMessage + agentRef case) or task is ready
 	return task, &agent, ctrl.Result{}, nil
 }
 
@@ -168,9 +193,33 @@ func (r *TaskRunReconciler) validateTaskAndAgent(ctx context.Context, taskRun *k
 func (r *TaskRunReconciler) prepareForLLM(ctx context.Context, taskRun *kubechainv1alpha1.TaskRun, statusUpdate *kubechainv1alpha1.TaskRun, task *kubechainv1alpha1.Task, agent *kubechainv1alpha1.Agent) (ctrl.Result, error) {
 	logger := log.FromContext(ctx)
 
+	// If we're in Initializing or Pending phase, transition to ReadyForLLM
 	if statusUpdate.Status.Phase == kubechainv1alpha1.TaskRunPhaseInitializing || statusUpdate.Status.Phase == kubechainv1alpha1.TaskRunPhasePending {
 		statusUpdate.Status.Phase = kubechainv1alpha1.TaskRunPhaseReadyForLLM
 		statusUpdate.Status.Ready = true
+
+		// Use userMessage from TaskRun if provided, otherwise use task's message
+		message := taskRun.Spec.UserMessage
+		if message == "" && task != nil {
+			message = task.Spec.Message
+		}
+
+		if message == "" {
+			err := fmt.Errorf("no message found in TaskRun or Task")
+			logger.Error(err, "Missing message")
+			statusUpdate.Status.Ready = false
+			statusUpdate.Status.Status = StatusError
+			statusUpdate.Status.Phase = kubechainv1alpha1.TaskRunPhaseFailed
+			statusUpdate.Status.StatusDetail = err.Error()
+			statusUpdate.Status.Error = err.Error()
+			r.recorder.Event(taskRun, corev1.EventTypeWarning, "ValidationFailed", err.Error())
+			if updateErr := r.Status().Update(ctx, statusUpdate); updateErr != nil {
+				logger.Error(updateErr, "Failed to update TaskRun status")
+				return ctrl.Result{}, updateErr
+			}
+			return ctrl.Result{}, err
+		}
+
 		statusUpdate.Status.ContextWindow = []kubechainv1alpha1.Message{
 			{
 				Role:    "system",
@@ -178,7 +227,7 @@ func (r *TaskRunReconciler) prepareForLLM(ctx context.Context, taskRun *kubechai
 			},
 			{
 				Role:    "user",
-				Content: task.Spec.Message,
+				Content: message,
 			},
 		}
 		statusUpdate.Status.Status = StatusReady
@@ -644,22 +693,26 @@ func (r *TaskRunReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
 	// Skip reconciliation for terminal states
 	if statusUpdate.Status.Phase == kubechainv1alpha1.TaskRunPhaseFinalAnswer ||
 		statusUpdate.Status.Phase == kubechainv1alpha1.TaskRunPhaseFailed {
+		// todo do we need this V()??
 		logger.V(1).Info("TaskRun in terminal state, skipping reconciliation", "phase", statusUpdate.Status.Phase)
 		return ctrl.Result{}, nil
 	}
 
 	// Step 1: Validate Task and Agent
+	logger.V(3).Info("Validating Task and Agent")
 	task, agent, result, err := r.validateTaskAndAgent(ctx, &taskRun, statusUpdate)
 	if err != nil || !result.IsZero() {
 		return result, err
 	}
 
 	// Step 2: Initialize Phase if necessary
+	logger.V(3).Info("Preparing for LLM")
 	if result, err := r.prepareForLLM(ctx, &taskRun, statusUpdate, task, agent); err != nil || !result.IsZero() {
 		return result, err
 	}
 
 	// Step 3: Handle tool calls phase
+	logger.V(3).Info("Handling tool calls phase")
 	if taskRun.Status.Phase == kubechainv1alpha1.TaskRunPhaseToolCallsPending {
 		return r.processToolCalls(ctx, &taskRun)
 	}
@@ -671,12 +724,14 @@ func (r *TaskRunReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
 	}
 
 	// Step 5: Get API credentials (LLM is returned but not used)
+	logger.V(3).Info("Getting API credentials")
 	_, apiKey, err := r.getLLMAndCredentials(ctx, agent, &taskRun, statusUpdate)
 	if err != nil {
 		return ctrl.Result{}, err
 	}
 
 	// Step 6: Create LLM client
+	logger.V(3).Info("Creating LLM client")
 	llmClient, err := r.newLLMClient(apiKey)
 	if err != nil {
 		logger.Error(err, "Failed to create OpenAI client")
@@ -708,6 +763,7 @@ func (r *TaskRunReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
 		defer childSpan.End()
 	}
 
+	logger.V(3).Info("Sending LLM request")
 	// Step 8: Send the prompt to the LLM
 	output, err := llmClient.SendRequest(childCtx, taskRun.Status.ContextWindow, tools)
 	if err != nil {
@@ -748,6 +804,7 @@ func (r *TaskRunReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
 		childSpan.SetStatus(codes.Ok, "LLM request succeeded")
 	}
 
+	logger.V(3).Info("Processing LLM response")
 	// Step 9: Process LLM response
 	var llmResult ctrl.Result
 	llmResult, err = r.processLLMResponse(ctx, output, &taskRun, statusUpdate)
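The failure path added here (flag the TaskRun failed, record a `ValidationFailed` event, push the status update) now appears verbatim in both `validateTaskAndAgent` and `prepareForLLM`. A possible follow-up is to fold it into a helper; a minimal sketch reusing only identifiers that already appear in this file (`failValidation` itself is a hypothetical name, not part of this change):

```go
// failValidation is a hypothetical helper that consolidates the repeated
// failure path above: mark the TaskRun as failed, emit a warning event,
// and persist the status update. It returns the original error (or the
// update error) so callers can break control flow with it.
func (r *TaskRunReconciler) failValidation(ctx context.Context, taskRun, statusUpdate *kubechainv1alpha1.TaskRun, err error) error {
	statusUpdate.Status.Ready = false
	statusUpdate.Status.Status = StatusError
	statusUpdate.Status.Phase = kubechainv1alpha1.TaskRunPhaseFailed
	statusUpdate.Status.StatusDetail = err.Error()
	statusUpdate.Status.Error = err.Error()
	r.recorder.Event(taskRun, corev1.EventTypeWarning, "ValidationFailed", err.Error())
	if updateErr := r.Status().Update(ctx, statusUpdate); updateErr != nil {
		log.FromContext(ctx).Error(updateErr, "Failed to update TaskRun status")
		return updateErr
	}
	return err
}
```

Each call site would then reduce to something like `return nil, nil, ctrl.Result{}, r.failValidation(ctx, taskRun, statusUpdate, err)`.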
diff --git a/kubechain/internal/controller/taskrun/taskrun_controller_test.go b/kubechain/internal/controller/taskrun/taskrun_controller_test.go
index ef6058a..0af8960 100644
--- a/kubechain/internal/controller/taskrun/taskrun_controller_test.go
+++ b/kubechain/internal/controller/taskrun/taskrun_controller_test.go
@@ -127,8 +127,42 @@ var _ = Describe("TaskRun Controller", func() {
 			Expect(taskRun.Status.ContextWindow[1].Content).To(ContainSubstring(testTask.message))
 			ExpectRecorder(recorder).ToEmitEventContaining("ValidationSucceeded")
 		})
-		XIt("moves to ReadyForLLM if there is a userMessage + agentRef and no taskRef", func() {
-			// todo
+		It("moves to ReadyForLLM if there is a userMessage + agentRef and no taskRef", func() {
+			testAgent.SetupWithStatus(ctx, kubechain.AgentStatus{
+				Status: "Ready",
+				Ready:  true,
+			})
+			defer testAgent.Teardown(ctx)
+
+			testTaskRun2 := &TestTaskRun{
+				name:        "test-taskrun-2",
+				agentName:   testAgent.name,
+				userMessage: "test-user-message",
+			}
+			taskRun := testTaskRun2.SetupWithStatus(ctx, kubechain.TaskRunStatus{
+				Phase: kubechain.TaskRunPhaseInitializing,
+			})
+			defer testTaskRun2.Teardown(ctx)
+
+			By("reconciling the taskrun")
+			reconciler, recorder := reconciler()
+
+			result, err := reconciler.Reconcile(ctx, reconcile.Request{
+				NamespacedName: types.NamespacedName{Name: testTaskRun2.name, Namespace: "default"},
+			})
+
+			Expect(err).NotTo(HaveOccurred())
+			Expect(result.Requeue).To(BeTrue())
+
+			By("ensuring the context window is set correctly")
+			Expect(k8sClient.Get(ctx, types.NamespacedName{Name: testTaskRun2.name, Namespace: "default"}, taskRun)).To(Succeed())
+			Expect(taskRun.Status.Phase).To(Equal(kubechain.TaskRunPhaseReadyForLLM))
+			Expect(taskRun.Status.ContextWindow).To(HaveLen(2))
+			Expect(taskRun.Status.ContextWindow[0].Role).To(Equal("system"))
+			Expect(taskRun.Status.ContextWindow[0].Content).To(ContainSubstring(testAgent.system))
+			Expect(taskRun.Status.ContextWindow[1].Role).To(Equal("user"))
+			Expect(taskRun.Status.ContextWindow[1].Content).To(ContainSubstring("test-user-message"))
+			ExpectRecorder(recorder).ToEmitEventContaining("ValidationSucceeded")
 		})
 	})
 	Context("Pending -> ReadyForLLM", func() {
diff --git a/kubechain/internal/controller/taskrun/utils_test.go b/kubechain/internal/controller/taskrun/utils_test.go
index 7a74911..b2fbfb6 100644
--- a/kubechain/internal/controller/taskrun/utils_test.go
+++ b/kubechain/internal/controller/taskrun/utils_test.go
@@ -184,9 +184,11 @@ func (t *TestTask) Teardown(ctx context.Context) {
 }
 
 type TestTaskRun struct {
-	name     string
-	taskName string
-	taskRun  *kubechain.TaskRun
+	name        string
+	taskName    string
+	agentName   string
+	userMessage string
+	taskRun     *kubechain.TaskRun
 }
 
 func (t *TestTaskRun) Setup(ctx context.Context) *kubechain.TaskRun {
@@ -196,12 +198,22 @@ func (t *TestTaskRun) Setup(ctx context.Context) *kubechain.TaskRun {
 			Name:      t.name,
 			Namespace: "default",
 		},
-		Spec: kubechain.TaskRunSpec{
-			TaskRef: &kubechain.LocalObjectReference{
-				Name: t.taskName,
-			},
-		},
+		Spec: kubechain.TaskRunSpec{},
+	}
+	if t.taskName != "" {
+		taskRun.Spec.TaskRef = &kubechain.LocalObjectReference{
+			Name: t.taskName,
+		}
 	}
+	if t.agentName != "" {
+		taskRun.Spec.AgentRef = &kubechain.LocalObjectReference{
+			Name: t.agentName,
+		}
+	}
+	if t.userMessage != "" {
+		taskRun.Spec.UserMessage = t.userMessage
+	}
+
 	err := k8sClient.Create(ctx, taskRun)
 	Expect(err).NotTo(HaveOccurred())
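With the builder extended this way, `TestTaskRun` covers both spec shapes. A short usage sketch (the `"my-task"` name is illustrative; `testAgent.name` is the fixture field used in the test above):

```go
// taskRef form: TaskRun driven by a parent Task (pre-existing behavior).
byTask := &TestTaskRun{
	name:     "test-taskrun-1",
	taskName: "my-task", // illustrative Task name
}

// agentRef + userMessage form: standalone TaskRun (added in this change).
byAgent := &TestTaskRun{
	name:        "test-taskrun-2",
	agentName:   testAgent.name,
	userMessage: "test-user-message",
}

byTask.Setup(ctx)  // creates a TaskRun with only spec.taskRef set
byAgent.Setup(ctx) // creates a TaskRun with spec.agentRef and spec.userMessage set
```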