diff --git a/kubechain/api/v1alpha1/taskrun_types.go b/kubechain/api/v1alpha1/taskrun_types.go index 9487c24..c4306e8 100644 --- a/kubechain/api/v1alpha1/taskrun_types.go +++ b/kubechain/api/v1alpha1/taskrun_types.go @@ -36,17 +36,11 @@ type Message struct { // +optional ToolCallId string `json:"toolCallId,omitempty"` - // Name is the name of the tool call - // +optional - Name string `json:"name,omitempty"` + // Todo(dex) what is this? This is used in the OpenAI converter but I think this is supposed to be in a ToolCall - // Type is the type of tool call - // +kubebuilder:validation:Enum=function - Type string `json:"type,omitempty"` - - // Arguments is the arguments to pass to the tool call + // Name is the name of the tool that was called // +optional - Arguments string `json:"arguments,omitempty"` + Name string `json:"name,omitempty"` } // ToolCall represents a request to call a tool @@ -90,8 +84,8 @@ type TaskRunStatus struct { Ready bool `json:"ready,omitempty"` // Status indicates the current status of the taskrun - // +kubebuilder:validation:Enum=Ready;Error;Pending;Initializing - Status string `json:"status,omitempty"` + // +kubebuilder:validation:Enum=Ready;Error;Pending + Status TaskRunStatusStatus `json:"status,omitempty"` // StatusDetail provides additional details about the current status StatusDetail string `json:"statusDetail,omitempty"` @@ -129,8 +123,16 @@ type TaskRunStatus struct { SpanContext *SpanContext `json:"spanContext,omitempty"` } +type TaskRunStatusStatus string + +const ( + TaskRunStatusStatusReady TaskRunStatusStatus = "Ready" + TaskRunStatusStatusError TaskRunStatusStatus = "Error" + TaskRunStatusStatusPending TaskRunStatusStatus = "Pending" +) + // TaskRunPhase represents the phase of a TaskRun -// +kubebuilder:validation:Enum=Initializing;Pending;ReadyForLLM;SendContextWindowToLLM;ToolCallsPending;FinalAnswer;ErrorBackoff;Failed +// +kubebuilder:validation:Enum=Initializing;Pending;ReadyForLLM;SendContextWindowToLLM;ToolCallsPending;CheckingToolCalls;FinalAnswer;ErrorBackoff;Failed type TaskRunPhase string const ( @@ -144,6 +146,8 @@ const ( TaskRunPhaseSendContextWindowToLLM TaskRunPhase = "SendContextWindowToLLM" // TaskRunPhaseToolCallsPending indicates the TaskRun has pending tool calls TaskRunPhaseToolCallsPending TaskRunPhase = "ToolCallsPending" + // TaskRunPhaseCheckingToolCalls indicates the TaskRun is checking if tool calls are complete + TaskRunPhaseCheckingToolCalls TaskRunPhase = "CheckingToolCalls" // TaskRunPhaseFinalAnswer indicates the TaskRun has received final answer TaskRunPhaseFinalAnswer TaskRunPhase = "FinalAnswer" // TaskRunPhaseErrorBackoff indicates the TaskRun has failed and is in error backoff diff --git a/kubechain/api/v1alpha1/taskruntoolcall_types.go b/kubechain/api/v1alpha1/taskruntoolcall_types.go index ec6b009..f05ae24 100644 --- a/kubechain/api/v1alpha1/taskruntoolcall_types.go +++ b/kubechain/api/v1alpha1/taskruntoolcall_types.go @@ -55,6 +55,10 @@ type TaskRunToolCallStatus struct { // CompletionTime is when the tool call completed // +optional CompletionTime *metav1.Time `json:"completionTime,omitempty"` + + // SpanContext contains OpenTelemetry span context information + // +optional + SpanContext *SpanContext `json:"spanContext,omitempty"` } // TaskRunToolCallPhase represents the phase of a TaskRunToolCall diff --git a/kubechain/api/v1alpha1/zz_generated.deepcopy.go b/kubechain/api/v1alpha1/zz_generated.deepcopy.go index b0def3d..ade213d 100644 --- a/kubechain/api/v1alpha1/zz_generated.deepcopy.go +++ b/kubechain/api/v1alpha1/zz_generated.deepcopy.go @@ -1048,6 +1048,11 @@ func (in *TaskRunToolCallStatus) DeepCopyInto(out *TaskRunToolCallStatus) { in, out := &in.CompletionTime, &out.CompletionTime *out = (*in).DeepCopy() } + if in.SpanContext != nil { + in, out := &in.SpanContext, &out.SpanContext + *out = new(SpanContext) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TaskRunToolCallStatus. diff --git a/kubechain/cmd/main.go b/kubechain/cmd/main.go index baf1e02..464e090 100644 --- a/kubechain/cmd/main.go +++ b/kubechain/cmd/main.go @@ -49,7 +49,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/webhook" kubechainv1alpha1 "github.com/humanlayer/smallchain/kubechain/api/v1alpha1" - "github.com/humanlayer/smallchain/kubechain/internal/otel" + kubechainotel "github.com/humanlayer/smallchain/kubechain/internal/otel" // +kubebuilder:scaffold:imports ) @@ -99,14 +99,14 @@ func main() { flag.Parse() ctx := context.Background() - tracerProvider, err := otel.InitTracer(ctx) + tracerProvider, err := kubechainotel.InitTracer(ctx) if err != nil { setupLog.Error(err, "failed to initialize opentelemetry tracer") os.Exit(1) } defer func() { _ = tracerProvider.Shutdown(ctx) }() - meterProvider, err := otel.InitMeter(ctx) + meterProvider, err := kubechainotel.InitMeter(ctx) if err != nil { setupLog.Error(err, "failed to initialize opentelemetry meter") os.Exit(1) @@ -268,6 +268,7 @@ func main() { Client: mgr.GetClient(), Scheme: mgr.GetScheme(), MCPManager: mcpManagerInstance, + Tracer: tracerProvider.Tracer("taskrun"), }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "TaskRun") os.Exit(1) diff --git a/kubechain/config/crd/bases/kubechain.humanlayer.dev_taskruns.yaml b/kubechain/config/crd/bases/kubechain.humanlayer.dev_taskruns.yaml index 38dd844..4e467a6 100644 --- a/kubechain/config/crd/bases/kubechain.humanlayer.dev_taskruns.yaml +++ b/kubechain/config/crd/bases/kubechain.humanlayer.dev_taskruns.yaml @@ -122,15 +122,11 @@ spec: items: description: Message represents a single message in the conversation properties: - arguments: - description: Arguments is the arguments to pass to the tool - call - type: string content: description: Content is the message content type: string name: - description: Name is the name of the tool call + description: Name is the name of the tool that was called type: string role: description: Role is the role of the message sender (system, @@ -180,11 +176,6 @@ spec: - type type: object type: array - type: - description: Type is the type of tool call - enum: - - function - type: string required: - content - role @@ -208,6 +199,7 @@ spec: - ReadyForLLM - SendContextWindowToLLM - ToolCallsPending + - CheckingToolCalls - FinalAnswer - ErrorBackoff - Failed @@ -235,7 +227,6 @@ spec: - Ready - Error - Pending - - Initializing type: string statusDetail: description: StatusDetail provides additional details about the current diff --git a/kubechain/config/crd/bases/kubechain.humanlayer.dev_taskruntoolcalls.yaml b/kubechain/config/crd/bases/kubechain.humanlayer.dev_taskruntoolcalls.yaml index d0e3f64..c979224 100644 --- a/kubechain/config/crd/bases/kubechain.humanlayer.dev_taskruntoolcalls.yaml +++ b/kubechain/config/crd/bases/kubechain.humanlayer.dev_taskruntoolcalls.yaml @@ -117,6 +117,16 @@ spec: result: description: Result contains the result of the tool call if completed type: string + spanContext: + description: SpanContext contains OpenTelemetry span context information + properties: + spanID: + description: SpanID is the span ID + type: string + traceID: + description: TraceID is the trace ID for the span + type: string + type: object startTime: description: StartTime is when the tool call started format: date-time diff --git a/kubechain/config/manager/kustomization.yaml b/kubechain/config/manager/kustomization.yaml index a75a859..4ec3b6b 100644 --- a/kubechain/config/manager/kustomization.yaml +++ b/kubechain/config/manager/kustomization.yaml @@ -5,4 +5,4 @@ kind: Kustomization images: - name: controller newName: controller - newTag: "202503251521" + newTag: "202503251842" diff --git a/kubechain/internal/controller/taskrun/taskrun_controller.go b/kubechain/internal/controller/taskrun/taskrun_controller.go index 4823014..3d6578d 100644 --- a/kubechain/internal/controller/taskrun/taskrun_controller.go +++ b/kubechain/internal/controller/taskrun/taskrun_controller.go @@ -21,12 +21,16 @@ import ( "github.com/humanlayer/smallchain/kubechain/internal/llmclient" "github.com/humanlayer/smallchain/kubechain/internal/mcpmanager" "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" ) const ( - StatusReady = "Ready" - StatusError = "Error" - StatusPending = "Pending" + // These constants are kept for backward compatibility during refactoring + StatusReady = kubechainv1alpha1.TaskRunStatusStatusReady + StatusError = kubechainv1alpha1.TaskRunStatusStatusError + StatusPending = kubechainv1alpha1.TaskRunStatusStatusPending ) // +kubebuilder:rbac:groups=kubechain.humanlayer.dev,resources=taskruns,verbs=get;list;watch;create;update;patch;delete @@ -43,6 +47,7 @@ type TaskRunReconciler struct { recorder record.EventRecorder newLLMClient func(apiKey string) (llmclient.OpenAIClient, error) MCPManager *mcpmanager.MCPServerManager + Tracer trace.Tracer } // getTask fetches the parent Task for this TaskRun @@ -77,12 +82,18 @@ func (r *TaskRunReconciler) validateTaskAndAgent(ctx context.Context, taskRun *k statusUpdate.Status.Error = fmt.Sprintf("Task %q not found", taskRun.Spec.TaskRef.Name) statusUpdate.Status.StatusDetail = fmt.Sprintf("Task %q not found", taskRun.Spec.TaskRef.Name) statusUpdate.Status.Status = StatusError + + // End span since we've failed with a terminal error + r.endTaskRunSpan(ctx, taskRun, codes.Error, fmt.Sprintf("Task %q not found", taskRun.Spec.TaskRef.Name)) } else { logger.Error(err, "Task validation failed") statusUpdate.Status.Ready = false statusUpdate.Status.Status = StatusError statusUpdate.Status.StatusDetail = fmt.Sprintf("Task validation failed: %v", err) statusUpdate.Status.Error = err.Error() + + // End span since we've failed with a terminal error + r.endTaskRunSpan(ctx, taskRun, codes.Error, fmt.Sprintf("Task validation failed: %v", err)) } if updateErr := r.Status().Update(ctx, statusUpdate); updateErr != nil { @@ -350,6 +361,61 @@ func (r *TaskRunReconciler) collectTools(ctx context.Context, agent *kubechainv1 return tools } +// endTaskRunSpan ends the parent span for a TaskRun if it exists +func (r *TaskRunReconciler) endTaskRunSpan(ctx context.Context, taskRun *kubechainv1alpha1.TaskRun, status codes.Code, description string) { + // Only try to end span if we have SpanContext info + if taskRun.Status.SpanContext == nil || taskRun.Status.SpanContext.TraceID == "" { + return + } + + // Get tracer + tracer := r.Tracer + if tracer == nil { + tracer = otel.GetTracerProvider().Tracer("taskrun") + } + + // Parse the trace and span IDs from the stored context + var traceID trace.TraceID + var spanID trace.SpanID + + // Convert hex strings to byte arrays + traceIDBytes, err := trace.TraceIDFromHex(taskRun.Status.SpanContext.TraceID) + if err != nil { + return + } + traceID = traceIDBytes + + spanIDBytes, err := trace.SpanIDFromHex(taskRun.Status.SpanContext.SpanID) + if err != nil { + return + } + spanID = spanIDBytes + + // Create a span context with the stored IDs + spanCtx := trace.NewSpanContext(trace.SpanContextConfig{ + TraceID: traceID, + SpanID: spanID, + TraceFlags: trace.FlagsSampled, + Remote: false, + }) + + // Create context with the span context + ctxWithSpan := trace.ContextWithSpanContext(ctx, spanCtx) + + // Create a final completion span that's a child of the original span + _, span := tracer.Start(ctxWithSpan, fmt.Sprintf("TaskRun/%s/Completion", taskRun.Name), + trace.WithSpanKind(trace.SpanKindServer), + trace.WithAttributes( + attribute.String("taskrun.name", taskRun.Name), + attribute.String("taskrun.namespace", taskRun.Namespace), + attribute.String("taskrun.phase", string(taskRun.Status.Phase)), + attribute.String("taskrun.status", string(taskRun.Status.Status)), + ), + ) + span.SetStatus(status, description) + span.End() +} + // processLLMResponse handles the LLM's output and updates status accordingly func (r *TaskRunReconciler) processLLMResponse(ctx context.Context, output *kubechainv1alpha1.Message, taskRun *kubechainv1alpha1.TaskRun, statusUpdate *kubechainv1alpha1.TaskRun) (ctrl.Result, error) { logger := log.FromContext(ctx) @@ -367,6 +433,9 @@ func (r *TaskRunReconciler) processLLMResponse(ctx context.Context, output *kube statusUpdate.Status.StatusDetail = "LLM final response received" statusUpdate.Status.Error = "" r.recorder.Event(taskRun, corev1.EventTypeNormal, "LLMFinalAnswer", "LLM response received successfully") + + // End the parent span since we've reached a terminal state + r.endTaskRunSpan(ctx, taskRun, codes.Ok, "TaskRun completed successfully with final answer") } else { // tool call branch: create TaskRunToolCall objects for each tool call returned by the LLM. statusUpdate.Status.Output = "" @@ -442,8 +511,19 @@ func (r *TaskRunReconciler) initializePhaseAndSpan(ctx context.Context, statusUp logger := log.FromContext(ctx) // Start tracing the TaskRun - tracer := otel.GetTracerProvider().Tracer("taskrun") - ctx, span := tracer.Start(ctx, "TaskRun") + tracer := r.Tracer + if tracer == nil { + tracer = otel.GetTracerProvider().Tracer("taskrun") + } + + // Make sure we provide a meaningful span name that includes the TaskRun name + spanName := fmt.Sprintf("TaskRun/%s", statusUpdate.Name) + ctx, span := tracer.Start(ctx, spanName, trace.WithSpanKind(trace.SpanKindServer)) + + // We need to explicitly end the span so the root span is properly recorded + // This is not what we want long-term, but it ensures spans show up correctly + // while we resolve the issue with maintaining spans across reconciliation calls + defer span.End() // Store span context in status spanCtx := span.SpanContext() @@ -452,19 +532,78 @@ func (r *TaskRunReconciler) initializePhaseAndSpan(ctx context.Context, statusUp SpanID: spanCtx.SpanID().String(), } + // Set useful attributes on the span + span.SetAttributes( + attribute.String("taskrun.name", statusUpdate.Name), + attribute.String("taskrun.namespace", statusUpdate.Namespace), + attribute.String("taskrun.uid", string(statusUpdate.UID)), + ) + statusUpdate.Status.Phase = kubechainv1alpha1.TaskRunPhaseInitializing statusUpdate.Status.Ready = false - statusUpdate.Status.Status = "Initializing" + statusUpdate.Status.Status = kubechainv1alpha1.TaskRunStatusStatusPending statusUpdate.Status.StatusDetail = "Initializing" if err := r.Status().Update(ctx, statusUpdate); err != nil { logger.Error(err, "Failed to update TaskRun status") + span.SetStatus(codes.Error, "Failed to update TaskRun status") + span.RecordError(err) return ctrl.Result{}, err } - // Don't end the span - it will be ended when we reach FinalAnswer + // By ending the span now, we ensure it's properly recorded + // This approach creates a separate span for each reconciliation rather than + // a single span that covers the entire TaskRun lifecycle + span.SetStatus(codes.Ok, "TaskRun initialized") return ctrl.Result{Requeue: true}, nil } +// createLLMRequestSpan creates a child span for an LLM request that is properly linked to the parent span +func (r *TaskRunReconciler) createLLMRequestSpan(ctx context.Context, taskRun *kubechainv1alpha1.TaskRun, contextWindowSize, toolsCount int) (context.Context, trace.Span) { + // Use controller's tracer if available, otherwise get the global tracer + tracer := r.Tracer + if tracer == nil { + tracer = otel.GetTracerProvider().Tracer("taskrun") + } + + // If we have a parent span context, create a child span linked to the parent + if taskRun.Status.SpanContext != nil && taskRun.Status.SpanContext.TraceID != "" { + // Parse the trace and span IDs from the stored context + traceIDBytes, err := trace.TraceIDFromHex(taskRun.Status.SpanContext.TraceID) + if err == nil { + spanIDBytes, err := trace.SpanIDFromHex(taskRun.Status.SpanContext.SpanID) + if err == nil { + // Create a span context with the stored IDs + spanCtx := trace.NewSpanContext(trace.SpanContextConfig{ + TraceID: traceIDBytes, + SpanID: spanIDBytes, + TraceFlags: trace.FlagsSampled, + Remote: false, + }) + + // Create context with the span context + ctx = trace.ContextWithSpanContext(ctx, spanCtx) + } + } + + // Create a child span that will properly link to the parent + childCtx, childSpan := tracer.Start(ctx, fmt.Sprintf("TaskRun/%s/LLMRequest", taskRun.Name), + trace.WithSpanKind(trace.SpanKindClient)) + + // Set attributes for the LLM request + childSpan.SetAttributes( + attribute.Int("context_window_size", contextWindowSize), + attribute.Int("tools_count", toolsCount), + attribute.String("taskrun.name", taskRun.Name), + attribute.String("taskrun.namespace", taskRun.Namespace), + ) + + return childCtx, childSpan + } + + // No parent span, just use the current context + return ctx, nil +} + // Reconcile validates the taskrun's task reference and sends the prompt to the LLM. func (r *TaskRunReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { logger := log.FromContext(ctx) @@ -484,6 +623,12 @@ func (r *TaskRunReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct return r.initializePhaseAndSpan(ctx, statusUpdate) } + // Skip reconciliation for terminal states + if statusUpdate.Status.Phase == kubechainv1alpha1.TaskRunPhaseFinalAnswer || statusUpdate.Status.Phase == kubechainv1alpha1.TaskRunPhaseFailed { + logger.V(1).Info("TaskRun in terminal state, skipping reconciliation", "phase", statusUpdate.Status.Phase) + return ctrl.Result{}, nil + } + // Step 1: Validate Task and Agent task, agent, result, err := r.validateTaskAndAgent(ctx, &taskRun, statusUpdate) if err != nil || !result.IsZero() { @@ -518,9 +663,14 @@ func (r *TaskRunReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct logger.Error(err, "Failed to create OpenAI client") statusUpdate.Status.Ready = false statusUpdate.Status.Status = StatusError + statusUpdate.Status.Phase = kubechainv1alpha1.TaskRunPhaseFailed statusUpdate.Status.StatusDetail = "Failed to create OpenAI client: " + err.Error() statusUpdate.Status.Error = err.Error() r.recorder.Event(&taskRun, corev1.EventTypeWarning, "OpenAIClientCreationFailed", err.Error()) + + // End span since we've failed with a terminal error + r.endTaskRunSpan(ctx, &taskRun, codes.Error, "Failed to create OpenAI client: "+err.Error()) + if updateErr := r.Status().Update(ctx, statusUpdate); updateErr != nil { logger.Error(updateErr, "Failed to update TaskRun status") return ctrl.Result{}, updateErr @@ -532,8 +682,15 @@ func (r *TaskRunReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct tools := r.collectTools(ctx, agent) r.recorder.Event(&taskRun, corev1.EventTypeNormal, "SendingContextWindowToLLM", "Sending context window to LLM") + + // Create child span for LLM call + childCtx, childSpan := r.createLLMRequestSpan(ctx, &taskRun, len(taskRun.Status.ContextWindow), len(tools)) + if childSpan != nil { + defer childSpan.End() + } + // Step 8: Send the prompt to the LLM - output, err := llmClient.SendRequest(ctx, taskRun.Status.ContextWindow, tools) + output, err := llmClient.SendRequest(childCtx, taskRun.Status.ContextWindow, tools) if err != nil { logger.Error(err, "LLM request failed") statusUpdate.Status.Ready = false @@ -541,6 +698,13 @@ func (r *TaskRunReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct statusUpdate.Status.StatusDetail = fmt.Sprintf("LLM request failed: %v", err) statusUpdate.Status.Error = err.Error() r.recorder.Event(&taskRun, corev1.EventTypeWarning, "LLMRequestFailed", err.Error()) + + // Record error in span + if childSpan != nil { + childSpan.RecordError(err) + childSpan.SetStatus(codes.Error, err.Error()) + } + if updateErr := r.Status().Update(ctx, statusUpdate); updateErr != nil { logger.Error(updateErr, "Failed to update TaskRun status after LLM error") return ctrl.Result{}, updateErr @@ -548,6 +712,11 @@ func (r *TaskRunReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct return ctrl.Result{}, err } + // Mark span as successful if we reach here + if childSpan != nil { + childSpan.SetStatus(codes.Ok, "LLM request succeeded") + } + // Step 9: Process LLM response if result, err := r.processLLMResponse(ctx, output, &taskRun, statusUpdate); err != nil || !result.IsZero() { return result, err