这是indexloc提供的服务,不要输入任何密码
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 15 additions & 18 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ KubeChain is a cloud-native orchestrator for AI Agents built on Kubernetes. It s
- [Setting Up a Local Cluster](#setting-up-a-local-cluster)
- [Deploying KubeChain](#deploying-kubechain)
- [Creating Your First Agent](#creating-your-first-agent)
- [Running Your First Task](#running-your-first-task)
- [Running Your First TaskRun](#running-your-first-taskrun)
- [Inspecting the TaskRun more closely](#inspecting-the-taskrun-more-closely)
- [Adding Tools with MCP](#adding-tools-with-mcp)
- [Cleaning Up](#cleaning-up)
Expand All @@ -47,8 +47,7 @@ KubeChain is a cloud-native orchestrator for AI Agents built on Kubernetes. It s
- **LLM**: Provider + API Keys + Parameters
- **Agent**: LLM + System Prompt + Tools
- **Tool**: MCP server or another Agent
- **Task**: Agent + User Message
- **TaskRun**: Task + Current context window
- **TaskRun**: Agent + User Message + Current context window

## Getting Started

Expand Down Expand Up @@ -308,22 +307,20 @@ Events:
Normal ValidationSucceeded 64m (x2 over 64m) agent-controller All dependencies validated successfully
```

</details>

### Running Your First Task
</details>

### Running Your First TaskRun
Create a TaskRun to interact with your agent:

1. **Create a TaskRun resource**

```bash
cat <<EOF | kubectl apply -f -
apiVersion: kubechain.humanlayer.dev/v1alpha1
kind: Task
kind: TaskRun
metadata:
name: hello-world-task
name: hello-world-1
spec:
agentRef:
name: my-assistant
message: "What is the capital of the moon?"
userMessage: "What is the capital of the moon?"
EOF
```

Expand All @@ -349,15 +346,15 @@ graph RL
SystemPrompt
end

subgraph Task
subgraph TaskRun
AgentRef
Message
UserMessage
end
```
Check the created Task:
Check the created TaskRun:

```bash
kubectl get task
kubectl get taskrun
```

Output:
Expand All @@ -371,18 +368,18 @@ hello-world-task true Ready my-assistant What is the capital of the mo
<summary>Using `-o wide` and `describe`</summary>

```bash
kubectl get task -o wide
kubectl get taskrun -o wide
```

Output:

```
NAME READY STATUS DETAIL AGENT MESSAGE OUTPUT
hello-world-task true Ready Task Run Created my-assistant What is the capital of the moon?
NAME READY STATUS DETAIL AGENT PREVIEW OUTPUT
hello-world-1 true Ready Task Run Created my-assistant What is the capital of the moon?
```

```bash
kubectl describe task
kubectl describe taskrun
```

Output:
Expand Down
2 changes: 1 addition & 1 deletion kubechain/config/manager/kustomization.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@ kind: Kustomization
images:
- name: controller
newName: controller
newTag: "202504021329"
newTag: "202504041032"
73 changes: 65 additions & 8 deletions kubechain/internal/controller/taskrun/taskrun_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,14 @@ type TaskRunReconciler struct {

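// getTask fetches the parent Task for this TaskRun. It returns (nil, nil)
// when the TaskRun sets both agentRef and userMessage, because no parent
// Task is needed in that case.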
func (r *TaskRunReconciler) getTask(ctx context.Context, taskRun *kubechainv1alpha1.TaskRun) (*kubechainv1alpha1.Task, error) {
// If we have agentRef and userMessage, we don't need a task
if taskRun.Spec.AgentRef != nil && taskRun.Spec.UserMessage != "" {
return nil, nil
}

// If TaskRef is nil, we can't fetch the task
if taskRun.Spec.TaskRef == nil {
return nil, fmt.Errorf("TaskRef is required but was not provided")
return nil, fmt.Errorf("taskRef is required when agentRef and userMessage are not provided")
}

task := &kubechainv1alpha1.Task{}
Expand All @@ -78,7 +84,7 @@ func (r *TaskRunReconciler) getTask(ctx context.Context, taskRun *kubechainv1alp
func (r *TaskRunReconciler) validateTaskAndAgent(ctx context.Context, taskRun *kubechainv1alpha1.TaskRun, statusUpdate *kubechainv1alpha1.TaskRun) (*kubechainv1alpha1.Task, *kubechainv1alpha1.Agent, ctrl.Result, error) {
logger := log.FromContext(ctx)

// Get parent Task
// Get parent Task if needed
task, err := r.getTask(ctx, taskRun)
if err != nil {
r.recorder.Event(taskRun, corev1.EventTypeWarning, "TaskValidationFailed", err.Error())
Expand Down Expand Up @@ -106,9 +112,6 @@ func (r *TaskRunReconciler) validateTaskAndAgent(ctx context.Context, taskRun *k
logger.Error(updateErr, "Failed to update TaskRun status")
return nil, nil, ctrl.Result{}, fmt.Errorf("failed to update taskrun status: %v", err)
}
// todo dont error if not found, don't requeue
// (can use client.IgnoreNotFound(err), but today
// the parent method needs err != nil to break control flow properly)
return nil, nil, ctrl.Result{}, err
}

Expand All @@ -128,9 +131,30 @@ func (r *TaskRunReconciler) validateTaskAndAgent(ctx context.Context, taskRun *k
return nil, nil, ctrl.Result{RequeueAfter: time.Second * 5}, nil
}

// Get the Agent referenced by the Task
// Get the Agent - use TaskRun's AgentRef if provided, otherwise use Task's AgentRef
var agentRef kubechainv1alpha1.LocalObjectReference
if taskRun.Spec.AgentRef != nil {
agentRef = *taskRun.Spec.AgentRef
} else if task != nil {
agentRef = task.Spec.AgentRef
} else {
err := fmt.Errorf("no agent reference found")
logger.Error(err, "Missing agent reference")
statusUpdate.Status.Ready = false
statusUpdate.Status.Status = StatusError
statusUpdate.Status.Phase = kubechainv1alpha1.TaskRunPhaseFailed
statusUpdate.Status.StatusDetail = err.Error()
statusUpdate.Status.Error = err.Error()
r.recorder.Event(taskRun, corev1.EventTypeWarning, "ValidationFailed", err.Error())
if updateErr := r.Status().Update(ctx, statusUpdate); updateErr != nil {
logger.Error(updateErr, "Failed to update TaskRun status")
return nil, nil, ctrl.Result{}, updateErr
}
return nil, nil, ctrl.Result{}, err
}

var agent kubechainv1alpha1.Agent
if err := r.Get(ctx, client.ObjectKey{Namespace: task.Namespace, Name: task.Spec.AgentRef.Name}, &agent); err != nil {
if err := r.Get(ctx, client.ObjectKey{Namespace: taskRun.Namespace, Name: agentRef.Name}, &agent); err != nil {
logger.Error(err, "Failed to get Agent")
statusUpdate.Status.Ready = false
statusUpdate.Status.Status = StatusPending
Expand Down Expand Up @@ -161,24 +185,49 @@ func (r *TaskRunReconciler) validateTaskAndAgent(ctx context.Context, taskRun *k
return nil, nil, ctrl.Result{RequeueAfter: time.Second * 5}, nil
}

// If we get here, either task is nil (userMessage + agentRef case) or task is ready
return task, &agent, ctrl.Result{}, nil
}

// prepareForLLM sets up the initial state of a TaskRun with the correct context and phase
func (r *TaskRunReconciler) prepareForLLM(ctx context.Context, taskRun *kubechainv1alpha1.TaskRun, statusUpdate *kubechainv1alpha1.TaskRun, task *kubechainv1alpha1.Task, agent *kubechainv1alpha1.Agent) (ctrl.Result, error) {
logger := log.FromContext(ctx)

// If we're in Initializing or Pending phase, transition to ReadyForLLM
if statusUpdate.Status.Phase == kubechainv1alpha1.TaskRunPhaseInitializing || statusUpdate.Status.Phase == kubechainv1alpha1.TaskRunPhasePending {
statusUpdate.Status.Phase = kubechainv1alpha1.TaskRunPhaseReadyForLLM
statusUpdate.Status.Ready = true

// Use userMessage from TaskRun if provided, otherwise use task's message
message := taskRun.Spec.UserMessage
if message == "" && task != nil {
message = task.Spec.Message
}

if message == "" {
err := fmt.Errorf("no message found in TaskRun or Task")
logger.Error(err, "Missing message")
statusUpdate.Status.Ready = false
statusUpdate.Status.Status = StatusError
statusUpdate.Status.Phase = kubechainv1alpha1.TaskRunPhaseFailed
statusUpdate.Status.StatusDetail = err.Error()
statusUpdate.Status.Error = err.Error()
r.recorder.Event(taskRun, corev1.EventTypeWarning, "ValidationFailed", err.Error())
if updateErr := r.Status().Update(ctx, statusUpdate); updateErr != nil {
logger.Error(updateErr, "Failed to update TaskRun status")
return ctrl.Result{}, updateErr
}
return ctrl.Result{}, err
}

statusUpdate.Status.ContextWindow = []kubechainv1alpha1.Message{
{
Role: "system",
Content: agent.Spec.System,
},
{
Role: "user",
Content: task.Spec.Message,
Content: message,
},
}
statusUpdate.Status.Status = StatusReady
Expand Down Expand Up @@ -644,22 +693,26 @@ func (r *TaskRunReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct

// Skip reconciliation for terminal states
if statusUpdate.Status.Phase == kubechainv1alpha1.TaskRunPhaseFinalAnswer || statusUpdate.Status.Phase == kubechainv1alpha1.TaskRunPhaseFailed {
// todo do we need this V()??
logger.V(1).Info("TaskRun in terminal state, skipping reconciliation", "phase", statusUpdate.Status.Phase)
return ctrl.Result{}, nil
}

// Step 1: Validate Task and Agent
logger.V(3).Info("Validating Task and Agent")
task, agent, result, err := r.validateTaskAndAgent(ctx, &taskRun, statusUpdate)
if err != nil || !result.IsZero() {
return result, err
}

// Step 2: Initialize Phase if necessary
logger.V(3).Info("Preparing for LLM")
if result, err := r.prepareForLLM(ctx, &taskRun, statusUpdate, task, agent); err != nil || !result.IsZero() {
return result, err
}

// Step 3: Handle tool calls phase
logger.V(3).Info("Handling tool calls phase")
if taskRun.Status.Phase == kubechainv1alpha1.TaskRunPhaseToolCallsPending {
return r.processToolCalls(ctx, &taskRun)
}
Expand All @@ -671,12 +724,14 @@ func (r *TaskRunReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
}

// Step 5: Get API credentials (LLM is returned but not used)
logger.V(3).Info("Getting API credentials")
_, apiKey, err := r.getLLMAndCredentials(ctx, agent, &taskRun, statusUpdate)
if err != nil {
return ctrl.Result{}, err
}

// Step 6: Create LLM client
logger.V(3).Info("Creating LLM client")
llmClient, err := r.newLLMClient(apiKey)
if err != nil {
logger.Error(err, "Failed to create OpenAI client")
Expand Down Expand Up @@ -708,6 +763,7 @@ func (r *TaskRunReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
defer childSpan.End()
}

logger.V(3).Info("Sending LLM request")
// Step 8: Send the prompt to the LLM
output, err := llmClient.SendRequest(childCtx, taskRun.Status.ContextWindow, tools)
if err != nil {
Expand Down Expand Up @@ -748,6 +804,7 @@ func (r *TaskRunReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
childSpan.SetStatus(codes.Ok, "LLM request succeeded")
}

logger.V(3).Info("Processing LLM response")
// Step 9: Process LLM response
var llmResult ctrl.Result
llmResult, err = r.processLLMResponse(ctx, output, &taskRun, statusUpdate)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,42 @@ var _ = Describe("TaskRun Controller", func() {
Expect(taskRun.Status.ContextWindow[1].Content).To(ContainSubstring(testTask.message))
ExpectRecorder(recorder).ToEmitEventContaining("ValidationSucceeded")
})
XIt("moves to ReadyForLLM if there is a userMessage + agentRef and no taskRef", func() {
// todo
It("moves to ReadyForLLM if there is a userMessage + agentRef and no taskRef", func() {
testAgent.SetupWithStatus(ctx, kubechain.AgentStatus{
Status: "Ready",
Ready: true,
})
defer testAgent.Teardown(ctx)

testTaskRun2 := &TestTaskRun{
name: "test-taskrun-2",
agentName: testAgent.name,
userMessage: "test-user-message",
}
taskRun := testTaskRun2.SetupWithStatus(ctx, kubechain.TaskRunStatus{
Phase: kubechain.TaskRunPhaseInitializing,
})
defer testTaskRun2.Teardown(ctx)

By("reconciling the taskrun")
reconciler, recorder := reconciler()

result, err := reconciler.Reconcile(ctx, reconcile.Request{
NamespacedName: types.NamespacedName{Name: testTaskRun2.name, Namespace: "default"},
})

Expect(err).NotTo(HaveOccurred())
Expect(result.Requeue).To(BeTrue())

By("ensuring the context window is set correctly")
Expect(k8sClient.Get(ctx, types.NamespacedName{Name: testTaskRun2.name, Namespace: "default"}, taskRun)).To(Succeed())
Expect(taskRun.Status.Phase).To(Equal(kubechain.TaskRunPhaseReadyForLLM))
Expect(taskRun.Status.ContextWindow).To(HaveLen(2))
Expect(taskRun.Status.ContextWindow[0].Role).To(Equal("system"))
Expect(taskRun.Status.ContextWindow[0].Content).To(ContainSubstring(testAgent.system))
Expect(taskRun.Status.ContextWindow[1].Role).To(Equal("user"))
Expect(taskRun.Status.ContextWindow[1].Content).To(ContainSubstring("test-user-message"))
ExpectRecorder(recorder).ToEmitEventContaining("ValidationSucceeded")
})
})
Context("Pending -> ReadyForLLM", func() {
Expand Down
28 changes: 20 additions & 8 deletions kubechain/internal/controller/taskrun/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,9 +184,11 @@ func (t *TestTask) Teardown(ctx context.Context) {
}

type TestTaskRun struct {
name string
taskName string
taskRun *kubechain.TaskRun
name string
taskName string
agentName string
userMessage string
taskRun *kubechain.TaskRun
}

func (t *TestTaskRun) Setup(ctx context.Context) *kubechain.TaskRun {
Expand All @@ -196,12 +198,22 @@ func (t *TestTaskRun) Setup(ctx context.Context) *kubechain.TaskRun {
Name: t.name,
Namespace: "default",
},
Spec: kubechain.TaskRunSpec{
TaskRef: &kubechain.LocalObjectReference{
Name: t.taskName,
},
},
Spec: kubechain.TaskRunSpec{},
}
if t.taskName != "" {
taskRun.Spec.TaskRef = &kubechain.LocalObjectReference{
Name: t.taskName,
}
}
if t.agentName != "" {
taskRun.Spec.AgentRef = &kubechain.LocalObjectReference{
Name: t.agentName,
}
}
if t.userMessage != "" {
taskRun.Spec.UserMessage = t.userMessage
}

err := k8sClient.Create(ctx, taskRun)
Expect(err).NotTo(HaveOccurred())

Expand Down