这是indexloc提供的服务,不要输入任何密码
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 65 additions & 18 deletions plugins/scheduler-k3s/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,22 @@ func getKubeContext() string {
return common.PropertyGetDefault("scheduler-k3s", "--global", "kube-context", DefaultKubeContext)
}

type NotFoundError struct {
Message string
}

func (e *NotFoundError) Error() string {
return e.Message
}

type EmptyResultsError struct {
Message string
}

func (e *EmptyResultsError) Error() string {
return e.Message
}

// KubernetesClient is a wrapper around the Kubernetes client
type KubernetesClient struct {
// Client is the Kubernetes client
Expand Down Expand Up @@ -141,7 +157,7 @@ func (k KubernetesClient) AnnotateNode(ctx context.Context, input AnnotateNodeIn
}

if node == nil {
return errors.New("node is nil")
return &NotFoundError{"node is nil"}
}

keyPath := fmt.Sprintf("/metadata/annotations/%s", jsonpointer.Escape(input.Key))
Expand Down Expand Up @@ -206,7 +222,7 @@ func (k KubernetesClient) CreateJob(ctx context.Context, input CreateJobInput) (
}

if job == nil {
return batchv1.Job{}, errors.New("job is nil")
return batchv1.Job{}, &NotFoundError{"job is nil"}
}

return *job, err
Expand Down Expand Up @@ -237,7 +253,7 @@ func (k KubernetesClient) CreateNamespace(ctx context.Context, input CreateNames
}

if namespace == nil {
return corev1.Namespace{}, errors.New("namespace is nil")
return corev1.Namespace{}, &NotFoundError{"namespace is nil"}
}

return *namespace, err
Expand Down Expand Up @@ -405,7 +421,7 @@ func (k KubernetesClient) GetNode(ctx context.Context, input GetNodeInput) (Node
}

if node == nil {
return Node{}, errors.New("node is nil")
return Node{}, &NotFoundError{"node is nil"}
}

return kubernetesNodeToNode(*node), err
Expand All @@ -428,12 +444,35 @@ func (k KubernetesClient) GetPod(ctx context.Context, input GetPodInput) (corev1
}

if pod == nil {
return corev1.Pod{}, errors.New("pod is nil")
return corev1.Pod{}, &NotFoundError{"pod is nil"}
}

return *pod, err
}

// GetSecretInput contains all the information needed to get a Kubernetes secret
type GetSecretInput struct {
// Name is the Kubernetes secret name
Name string

// Namespace is the Kubernetes namespace
Namespace string
}

// GetSecret gets a Kubernetes secret
func (k KubernetesClient) GetSecret(ctx context.Context, input GetSecretInput) (corev1.Secret, error) {
secret, err := k.Client.CoreV1().Secrets(input.Namespace).Get(ctx, input.Name, metav1.GetOptions{})
if err != nil {
return corev1.Secret{}, err
}

if secret == nil {
return corev1.Secret{}, &NotFoundError{"secret is nil"}
}

return *secret, err
}

// LabelNodeInput contains all the information needed to label a Kubernetes node
type LabelNodeInput struct {
// Name is the Kubernetes node name
Expand All @@ -452,7 +491,7 @@ func (k KubernetesClient) LabelNode(ctx context.Context, input LabelNodeInput) e
}

if node == nil {
return errors.New("node is nil")
return &NotFoundError{"node is nil"}
}

keyPath := fmt.Sprintf("/metadata/labels/%s", jsonpointer.Escape(input.Key))
Expand Down Expand Up @@ -489,6 +528,10 @@ func (k KubernetesClient) ListClusterTriggerAuthentications(ctx context.Context,
return []kedav1alpha1.ClusterTriggerAuthentication{}, err
}

if response == nil || len(response.Items) == 0 {
return []kedav1alpha1.ClusterTriggerAuthentication{}, &EmptyResultsError{"cluster trigger authentications is nil"}
}

triggerAuthentications := []kedav1alpha1.ClusterTriggerAuthentication{}
for _, triggerAuthentication := range response.Items {
var ta kedav1alpha1.ClusterTriggerAuthentication
Expand Down Expand Up @@ -524,8 +567,8 @@ func (k KubernetesClient) ListCronJobs(ctx context.Context, input ListCronJobsIn
return []batchv1.CronJob{}, err
}

if cronJobs == nil {
return []batchv1.CronJob{}, errors.New("cron jobs is nil")
if cronJobs == nil || len(cronJobs.Items) == 0 {
return []batchv1.CronJob{}, &EmptyResultsError{"cron jobs is nil"}
}

return cronJobs.Items, err
Expand All @@ -548,8 +591,8 @@ func (k KubernetesClient) ListDeployments(ctx context.Context, input ListDeploym
return []appsv1.Deployment{}, err
}

if deployments == nil {
return []appsv1.Deployment{}, errors.New("deployments is nil")
if deployments == nil || len(deployments.Items) == 0 {
return []appsv1.Deployment{}, &EmptyResultsError{"deployments list is nil"}
}

return deployments.Items, nil
Expand All @@ -572,8 +615,8 @@ func (k KubernetesClient) ListIngresses(ctx context.Context, input ListIngresses
return []networkingv1.Ingress{}, err
}

if ingresses == nil {
return []networkingv1.Ingress{}, errors.New("ingresses is nil")
if ingresses == nil || len(ingresses.Items) == 0 {
return []networkingv1.Ingress{}, &EmptyResultsError{"ingresses is nil"}
}

return ingresses.Items, nil
Expand All @@ -585,8 +628,8 @@ func (k KubernetesClient) ListNamespaces(ctx context.Context) ([]corev1.Namespac
if err != nil {
return []corev1.Namespace{}, err
}
if namespaces == nil {
return []corev1.Namespace{}, errors.New("namespaces is nil")
if namespaces == nil || len(namespaces.Items) == 0 {
return []corev1.Namespace{}, &EmptyResultsError{"namespaces list is nil"}
}

return namespaces.Items, nil
Expand All @@ -610,8 +653,8 @@ func (k KubernetesClient) ListNodes(ctx context.Context, input ListNodesInput) (
return []corev1.Node{}, err
}

if nodeList == nil {
return []corev1.Node{}, errors.New("pod list is nil")
if nodeList == nil || len(nodeList.Items) == 0 {
return []corev1.Node{}, &EmptyResultsError{"pod list is nil"}
}

return nodeList.Items, err
Expand All @@ -634,8 +677,8 @@ func (k KubernetesClient) ListPods(ctx context.Context, input ListPodsInput) ([]
return []corev1.Pod{}, err
}

if podList == nil {
return []corev1.Pod{}, errors.New("pod list is nil")
if podList == nil || len(podList.Items) == 0 {
return []corev1.Pod{}, &EmptyResultsError{"pod list is nil"}
}

return podList.Items, err
Expand Down Expand Up @@ -665,6 +708,10 @@ func (k KubernetesClient) ListTriggerAuthentications(ctx context.Context, input
return []kedav1alpha1.TriggerAuthentication{}, err
}

if response == nil || len(response.Items) == 0 {
return []kedav1alpha1.TriggerAuthentication{}, &EmptyResultsError{"trigger authentications is nil"}
}

triggerAuthentications := []kedav1alpha1.TriggerAuthentication{}
for _, triggerAuthentication := range response.Items {
var ta kedav1alpha1.TriggerAuthentication
Expand Down
34 changes: 25 additions & 9 deletions plugins/scheduler-k3s/triggers.go
Original file line number Diff line number Diff line change
Expand Up @@ -1106,7 +1106,32 @@ func TriggerSchedulerRun(scheduler string, appName string, envCount int, args []
}

attachToPod := os.Getenv("DOKKU_DETACH_CONTAINER") != "1"

clientset, err := NewKubernetesClient()
if err != nil {
return fmt.Errorf("Error creating kubernetes client: %w", err)
}

if err := clientset.Ping(); err != nil {
return fmt.Errorf("kubernetes api not available: %w", err)
}

imagePullSecrets := getComputedImagePullSecrets(appName)
if imagePullSecrets == "" {
imagePullSecrets = fmt.Sprintf("ims-%s.%d", appName, deploymentID)
_, err := clientset.GetSecret(context.Background(), GetSecretInput{
Name: imagePullSecrets,
Namespace: namespace,
})

if err != nil {
if _, ok := err.(*NotFoundError); !ok {
return fmt.Errorf("Error getting image pull secret: %w", err)
}
imagePullSecrets = ""
}
}

workingDir := common.GetWorkingDir(appName, image)
job, err := templateKubernetesJob(Job{
AppName: appName,
Expand All @@ -1132,15 +1157,6 @@ func TriggerSchedulerRun(scheduler string, appName string, envCount int, args []
color.NoColor = false
}

clientset, err := NewKubernetesClient()
if err != nil {
return fmt.Errorf("Error creating kubernetes client: %w", err)
}

if err := clientset.Ping(); err != nil {
return fmt.Errorf("kubernetes api not available: %w", err)
}

ctx, cancel := context.WithCancel(context.Background())
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt, syscall.SIGHUP,
Expand Down
Loading