Merged
5 changes: 5 additions & 0 deletions deploy/crds/core.humio.com_humioclusters_crd.yaml
@@ -3,6 +3,11 @@ kind: CustomResourceDefinition
metadata:
  name: humioclusters.core.humio.com
spec:
  additionalPrinterColumns:
  - JSONPath: .status.clusterState
    description: The state of the cluster
    name: State
    type: string
  group: core.humio.com
  names:
    kind: HumioCluster
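
With this column in place, the cluster state shows up directly in list output. An illustrative session (the resource name and namespace are taken from the tests below; on a v1beta1 CRD, specifying additionalPrinterColumns replaces the default AGE column):

$ kubectl get humioclusters -n logging
NAME           STATE
humiocluster   Running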
8 changes: 8 additions & 0 deletions pkg/apis/core/v1alpha1/humiocluster_types.go
@@ -5,6 +5,13 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const (
// HumioClusterStateBoostrapping is the Bootstrapping state of the cluster
HumioClusterStateBoostrapping = "Bootstrapping"
// HumioClusterStateRunning is the Running state of the cluster
HumioClusterStateRunning = "Running"
)

// HumioClusterSpec defines the desired state of HumioCluster
type HumioClusterSpec struct {
// Desired container image including the image tag
@@ -36,6 +43,7 @@ type HumioClusterStatus struct {
// HumioCluster is the Schema for the humioclusters API
// +kubebuilder:subresource:status
// +kubebuilder:resource:path=humioclusters,scope=Namespaced
// +kubebuilder:printcolumn:name="State",type="string",JSONPath=".status.clusterState",description="The state of the cluster"
type HumioCluster struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
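
Two notes on this hunk. The +kubebuilder:printcolumn marker is the in-code source of the CRD column added above, so regenerating manifests keeps the YAML and the Go type in sync. The new constants give the two lifecycle states typed names (HumioClusterStateBoostrapping is a spelling slip in the identifier, used consistently; its string value is the correctly spelled "Bootstrapping"). A minimal sketch of the lifecycle they encode, assuming these two states are the only ones defined so far:

// describeState is a sketch for illustration, not operator code.
func describeState(hc *corev1alpha1.HumioCluster) string {
	switch hc.Status.ClusterState {
	case corev1alpha1.HumioClusterStateBoostrapping:
		return "pods are still coming up; reconciliation gates on readiness"
	case corev1alpha1.HumioClusterStateRunning:
		return "steady state; missing pods are replaced without the bootstrap gate"
	default:
		return "no state recorded yet; the next reconcile marks it Bootstrapping"
	}
}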
49 changes: 43 additions & 6 deletions pkg/controller/humiocluster/humiocluster_controller.go
@@ -131,7 +131,7 @@ func (r *ReconcileHumioCluster) Reconcile(request reconcile.Request) (reconcile.
// Assume we are bootstrapping if no cluster state is set.
// TODO: this is a workaround for the issue where humio pods cannot start up at the same time during the first boot
if humioCluster.Status.ClusterState == "" {
r.setClusterStatus(context.TODO(), "Boostrapping", humioCluster)
r.setClusterStatus(context.TODO(), corev1alpha1.HumioClusterStateBoostrapping, humioCluster)
}

// Ensure developer password is a k8s secret
@@ -149,13 +149,20 @@ func (r *ReconcileHumioCluster) Reconcile(request reconcile.Request) (reconcile.
}

// Ensure pods exist. Will requeue if not all pods are created and ready
if humioCluster.Status.ClusterState == corev1alpha1.HumioClusterStateBoostrapping {
result, err = r.ensurePodsBootstrapped(context.TODO(), humioCluster)
if result != emptyResult || err != nil {
return result, err
}
}

r.setClusterStatus(context.TODO(), corev1alpha1.HumioClusterStateRunning, humioCluster)

result, err = r.ensurePodsExist(context.TODO(), humioCluster)
if result != emptyResult || err != nil {
return result, err
}

r.setClusterStatus(context.TODO(), "Running", humioCluster)
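
Note the reordering here: the cluster is marked Running once bootstrapping completes, before the steady-state ensurePodsExist pass, and the old unconditional transition below is removed. Every sub-step uses the same short-circuit guard; a sketch of the idiom, assuming emptyResult is the zero reconcile.Result declared earlier in Reconcile:

result, err := r.ensurePodsBootstrapped(context.TODO(), humioCluster)
if result != emptyResult || err != nil {
	// A non-zero result (e.g. Requeue after 5s) or an error ends this
	// pass; the next reconcile resumes from the top with fresh state.
	return result, err
}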

// Ensure service exists
err = r.ensureServiceExists(context.TODO(), humioCluster)
if err != nil {
@@ -173,6 +180,7 @@ func (r *ReconcileHumioCluster) Reconcile(request reconcile.Request) (reconcile.
return reconcile.Result{}, err
}

// TODO: wait until all pods are ready before continuing
clusterController := humio.NewClusterController(r.humioClient)
err = r.ensurePartitionsAreBalanced(*clusterController, humioCluster)
if err != nil {
@@ -187,7 +195,7 @@ func (r *ReconcileHumioCluster) Reconcile(request reconcile.Request) (reconcile.
// TODO: we use this to determine if we should have a delay between startup of humio pods during bootstrap vs starting up pods during an image update
func (r *ReconcileHumioCluster) setClusterStatus(context context.Context, clusterState string, humioCluster *corev1alpha1.HumioCluster) error {
humioCluster.Status.ClusterState = clusterState
return r.client.Update(context, humioCluster)
return r.client.Status().Update(context, humioCluster)
}
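
The switch to r.client.Status().Update is the substantive fix in this hunk: the type enables the status subresource (+kubebuilder:subresource:status), and once a CRD has a status subresource, a plain Update ignores changes to .status while Status().Update persists only .status. A minimal sketch of the resulting write pattern (ctx and hc are stand-ins):

hc.Status.ClusterState = corev1alpha1.HumioClusterStateRunning
if err := r.client.Status().Update(ctx, hc); err != nil {
	// Status writes can conflict like any update; let the next
	// reconcile retry rather than failing hard.
	return reconcile.Result{Requeue: true}, err
}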

func (r *ReconcileHumioCluster) ensurePodLabels(context context.Context, hc *corev1alpha1.HumioCluster) error {
@@ -315,7 +323,7 @@ func (r *ReconcileHumioCluster) ensureMismatchedPodVersionsAreDeleted(conetext c

// TODO: change to create 1 pod at a time, return Requeue=true and RequeueAfter.
// check that other pods, if they exist, are in a ready state
func (r *ReconcileHumioCluster) ensurePodsExist(conetext context.Context, humioCluster *corev1alpha1.HumioCluster) (reconcile.Result, error) {
func (r *ReconcileHumioCluster) ensurePodsBootstrapped(conetext context.Context, humioCluster *corev1alpha1.HumioCluster) (reconcile.Result, error) {
// Ensure we have pods for the defined NodeCount.
// If scaling down, we will handle the extra/obsolete pods later.
foundPodList, err := ListPods(r.client, humioCluster)
@@ -341,7 +349,7 @@ func (r *ReconcileHumioCluster) ensurePodsExist(conetext context.Context, humioC
return reconcile.Result{}, nil
}

if podsNotReadyCount > 0 && humioCluster.Status.ClusterState == "Bootstrapping" {
if podsNotReadyCount > 0 {
r.logger.Info(fmt.Sprintf("there are %d humio pods that are not ready. all humio pods must report ready before reconciliation can continue", podsNotReadyCount))
return reconcile.Result{Requeue: true, RequeueAfter: time.Second * 5}, nil
}
@@ -367,6 +375,35 @@ func (r *ReconcileHumioCluster) ensurePodsExist(conetext context.Context, humioC
return reconcile.Result{}, nil
}

func (r *ReconcileHumioCluster) ensurePodsExist(conetext context.Context, humioCluster *corev1alpha1.HumioCluster) (reconcile.Result, error) {
// Ensure we have pods for the defined NodeCount.
// If scaling down, we will handle the extra/obsolete pods later.
foundPodList, err := ListPods(r.client, humioCluster)
if err != nil {
return reconcile.Result{}, fmt.Errorf("failed to list pods: %s", err)
}

if len(foundPodList) < humioCluster.Spec.NodeCount {
pod, err := r.constructPod(humioCluster)
if err != nil {
return reconcile.Result{}, fmt.Errorf("unable to construct pod for HumioCluster: %v", err)
}

err = r.client.Create(context.TODO(), pod)
if err != nil {
log.Info(fmt.Sprintf("unable to create pod: %v", err))
return reconcile.Result{Requeue: true, RequeueAfter: time.Second * 5}, fmt.Errorf("unable to create Pod for HumioCluster: %v", err)
}
log.Info(fmt.Sprintf("successfully created pod %s for HumioCluster %s", pod.Name, humioCluster.Name))
metricPodsCreated.Inc()
// We have created a pod. Requeue immediately even if the pod is not ready. We will check the readiness status on the next reconciliation.
return reconcile.Result{Requeue: true}, nil
}

// TODO: what should happen if we have more pods than are expected?
return reconcile.Result{}, nil
}
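
The split leaves two creation paths: ensurePodsBootstrapped refuses to continue (requeueing every five seconds) until every pod reports ready, while this steady-state ensurePodsExist creates one missing pod and requeues immediately, checking readiness only on the next pass. The readiness tally the bootstrap path depends on presumably reduces to the standard pod condition; a hypothetical helper (the real check may differ):

// podReady reports whether the standard PodReady condition is true.
// Hypothetical illustration; not necessarily the operator's own helper.
func podReady(pod corev1.Pod) bool {
	for _, cond := range pod.Status.Conditions {
		if cond.Type == corev1.PodReady {
			return cond.Status == corev1.ConditionTrue
		}
	}
	return false
}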

// TODO: extend this (or create separate method) to take this password and perform a login, get the jwt token and then call the api to get the persistent api token and also store that as a secret
// this functionality should perhaps go into humio.cluster_auth.go
func (r *ReconcileHumioCluster) ensureDeveloperUserPasswordExists(conetext context.Context, humioCluster *corev1alpha1.HumioCluster) error {
98 changes: 72 additions & 26 deletions pkg/controller/humiocluster/humiocluster_controller_test.go
@@ -9,7 +9,7 @@ import (
"time"

humioapi "github.com/humio/cli/api"
humioClusterv1alpha1 "github.com/humio/humio-operator/pkg/apis/core/v1alpha1"
corev1alpha1 "github.com/humio/humio-operator/pkg/apis/core/v1alpha1"
"github.com/humio/humio-operator/pkg/humio"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -28,17 +28,17 @@ func TestReconcileHumioCluster_Reconcile(t *testing.T) {

tests := []struct {
name string
humioCluster *humioClusterv1alpha1.HumioCluster
humioCluster *corev1alpha1.HumioCluster
humioClient *humio.MockClientConfig
}{
{
"test simple cluster reconciliation",
&humioClusterv1alpha1.HumioCluster{
&corev1alpha1.HumioCluster{
ObjectMeta: metav1.ObjectMeta{
Name: "humiocluster",
Namespace: "logging",
},
Spec: humioClusterv1alpha1.HumioClusterSpec{
Spec: corev1alpha1.HumioClusterSpec{
Image: "humio/humio-core:1.9.1",
TargetReplicationFactor: 2,
StoragePartitionsCount: 3,
@@ -55,12 +55,12 @@ func TestReconcileHumioCluster_Reconcile(t *testing.T) {
},
{
"test large cluster reconciliation",
&humioClusterv1alpha1.HumioCluster{
&corev1alpha1.HumioCluster{
ObjectMeta: metav1.ObjectMeta{
Name: "humiocluster",
Namespace: "logging",
},
Spec: humioClusterv1alpha1.HumioClusterSpec{
Spec: corev1alpha1.HumioClusterSpec{
Image: "humio/humio-core:1.9.1",
TargetReplicationFactor: 3,
StoragePartitionsCount: 72,
@@ -86,7 +86,7 @@ func TestReconcileHumioCluster_Reconcile(t *testing.T) {

// Register operator types with the runtime scheme.
s := scheme.Scheme
s.AddKnownTypes(humioClusterv1alpha1.SchemeGroupVersion, tt.humioCluster)
s.AddKnownTypes(corev1alpha1.SchemeGroupVersion, tt.humioCluster)

// Create a fake client to mock API calls.
cl := fake.NewFakeClient(objs...)
@@ -120,19 +120,28 @@ func TestReconcileHumioCluster_Reconcile(t *testing.T) {
t.Errorf("reconcile: (%v)", err)
}

updatedHumioCluster := &corev1alpha1.HumioCluster{}
err = r.client.Get(context.TODO(), req.NamespacedName, updatedHumioCluster)
if err != nil {
t.Errorf("get HumioCluster: (%v)", err)
}
if updatedHumioCluster.Status.ClusterState != corev1alpha1.HumioClusterStateBoostrapping {
t.Errorf("expected cluster state to be %s but got %s", corev1alpha1.HumioClusterStateBoostrapping, updatedHumioCluster.Status.ClusterState)
}

// Check that the developer password exists as a k8s secret
secret, err := r.GetSecret(context.TODO(), tt.humioCluster, serviceAccountSecretName)
secret, err := r.GetSecret(context.TODO(), updatedHumioCluster, serviceAccountSecretName)
if err != nil {
t.Errorf("get secret with password: (%v). %+v", err, secret)
}
if string(secret.Data["password"]) == "" {
t.Errorf("password secret %s expected content to not be empty, but it was", serviceAccountSecretName)
}

for nodeCount := 0; nodeCount < tt.humioCluster.Spec.NodeCount; nodeCount++ {
foundPodList, err := ListPods(cl, tt.humioCluster)
if len(foundPodList) != nodeCount+1 {
t.Errorf("expected list pods to return equal to %d, got %d", nodeCount+1, len(foundPodList))
for nodeCount := 1; nodeCount <= tt.humioCluster.Spec.NodeCount; nodeCount++ {
foundPodList, err := ListPods(cl, updatedHumioCluster)
if len(foundPodList) != nodeCount {
t.Errorf("expected list pods to return equal to %d, got %d", nodeCount, len(foundPodList))
}

// We must update the IP address because when we attempt to add labels to the pod we validate that they have IP addresses first
@@ -147,18 +156,36 @@ func TestReconcileHumioCluster_Reconcile(t *testing.T) {
if err != nil {
t.Errorf("reconcile: (%v)", err)
}
}

// Check that we do not create more than expected number of humio pods
res, err = r.Reconcile(req)
if err != nil {
t.Errorf("reconcile: (%v)", err)
}
foundPodList, err := ListPods(cl, updatedHumioCluster)
if len(foundPodList) != tt.humioCluster.Spec.NodeCount {
t.Errorf("expected list pods to return equal to %d, got %d", tt.humioCluster.Spec.NodeCount, len(foundPodList))
}

updatedHumioCluster = &corev1alpha1.HumioCluster{}
err = r.client.Get(context.TODO(), req.NamespacedName, updatedHumioCluster)
if err != nil {
t.Errorf("get HumioCluster: (%v)", err)
}
if updatedHumioCluster.Status.ClusterState != corev1alpha1.HumioClusterStateRunning {
t.Errorf("expected cluster state to be %s but got %s", corev1alpha1.HumioClusterStateRunning, updatedHumioCluster.Status.ClusterState)
}
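
A caveat on these state assertions: they pass against fake.NewFakeClient because fake clients of this controller-runtime era route Status().Update through a plain object update instead of modeling the subresource separately; against a real API server the CRD's status subresource must be enabled, as it is here. Compressed, the round trip the test relies on (sketch; hc stands in for the cluster object):

cl := fake.NewFakeClient(hc)
hc.Status.ClusterState = corev1alpha1.HumioClusterStateRunning
_ = cl.Status().Update(context.TODO(), hc)

fetched := &corev1alpha1.HumioCluster{}
_ = cl.Get(context.TODO(), req.NamespacedName, fetched)
// fetched.Status.ClusterState is now "Running"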

// Check that the service exists
service, err := r.GetService(context.TODO(), tt.humioCluster)
service, err := r.GetService(context.TODO(), updatedHumioCluster)
if err != nil {
t.Errorf("get service: (%v). %+v", err, service)
}

// Check that the persistent token exists as a k8s secret

token, err := r.GetSecret(context.TODO(), tt.humioCluster, serviceTokenSecretName)
token, err := r.GetSecret(context.TODO(), updatedHumioCluster, serviceTokenSecretName)
if err != nil {
t.Errorf("get secret with api token: (%v). %+v", err, token)
}
@@ -181,8 +208,7 @@ func TestReconcileHumioCluster_Reconcile(t *testing.T) {
t.Errorf("expected api token in use to be \"%+v\", but got \"%+v\"", "mocktoken", tokenInUse)
}

// Get the updated HumioCluster object.
updatedHumioCluster := &humioClusterv1alpha1.HumioCluster{}
// Get the updated HumioCluster to update it with the partitions
err = r.client.Get(context.TODO(), req.NamespacedName, updatedHumioCluster)
if err != nil {
t.Errorf("get HumioCluster: (%v)", err)
@@ -197,7 +223,7 @@ func TestReconcileHumioCluster_Reconcile(t *testing.T) {
t.Errorf("expected ingest partitions to be balanced. got %v, err %s", b, err)
}

foundPodList, err := ListPods(cl, tt.humioCluster)
foundPodList, err = ListPods(cl, updatedHumioCluster)
if err != nil {
t.Errorf("could not list pods to validate their content: %v", err)
}
Expand All @@ -222,18 +248,18 @@ func TestReconcileHumioCluster_Reconcile_update_humio_image(t *testing.T) {

tests := []struct {
name string
humioCluster *humioClusterv1alpha1.HumioCluster
humioCluster *corev1alpha1.HumioCluster
humioClient *humio.MockClientConfig
imageToUpdate string
}{
{
"test simple cluster humio image update",
&humioClusterv1alpha1.HumioCluster{
&corev1alpha1.HumioCluster{
ObjectMeta: metav1.ObjectMeta{
Name: "humiocluster",
Namespace: "logging",
},
Spec: humioClusterv1alpha1.HumioClusterSpec{
Spec: corev1alpha1.HumioClusterSpec{
Image: "humio/humio-core:1.9.1",
TargetReplicationFactor: 2,
StoragePartitionsCount: 3,
@@ -260,7 +286,7 @@ func TestReconcileHumioCluster_Reconcile_update_humio_image(t *testing.T) {

// Register operator types with the runtime scheme.
s := scheme.Scheme
s.AddKnownTypes(humioClusterv1alpha1.SchemeGroupVersion, tt.humioCluster)
s.AddKnownTypes(corev1alpha1.SchemeGroupVersion, tt.humioCluster)

// Create a fake client to mock API calls.
cl := fake.NewFakeClient(objs...)
@@ -294,8 +320,18 @@ func TestReconcileHumioCluster_Reconcile_update_humio_image(t *testing.T) {
t.Errorf("reconcile: (%v)", err)
}

updatedHumioCluster := &corev1alpha1.HumioCluster{}
err = r.client.Get(context.TODO(), req.NamespacedName, updatedHumioCluster)
if err != nil {
t.Errorf("get HumioCluster: (%v)", err)
}
if updatedHumioCluster.Status.ClusterState != corev1alpha1.HumioClusterStateBoostrapping {
t.Errorf("expected cluster state to be %s but got %s", corev1alpha1.HumioClusterStateBoostrapping, updatedHumioCluster.Status.ClusterState)
}
tt.humioCluster = updatedHumioCluster

for nodeCount := 0; nodeCount < tt.humioCluster.Spec.NodeCount; nodeCount++ {
foundPodList, err := ListPods(cl, tt.humioCluster)
foundPodList, err := ListPods(cl, updatedHumioCluster)
if len(foundPodList) != nodeCount+1 {
t.Errorf("expected list pods to return equal to %d, got %d", nodeCount+1, len(foundPodList))
}
@@ -314,9 +350,19 @@ func TestReconcileHumioCluster_Reconcile_update_humio_image(t *testing.T) {
}
}

// Test that we're in a Running state
updatedHumioCluster = &corev1alpha1.HumioCluster{}
err = r.client.Get(context.TODO(), req.NamespacedName, updatedHumioCluster)
if err != nil {
t.Errorf("get HumioCluster: (%v)", err)
}
if updatedHumioCluster.Status.ClusterState != corev1alpha1.HumioClusterStateRunning {
t.Errorf("expected cluster state to be %s but got %s", corev1alpha1.HumioClusterStateRunning, updatedHumioCluster.Status.ClusterState)
}

// Update humio image
tt.humioCluster.Spec.Image = tt.imageToUpdate
cl.Update(context.TODO(), tt.humioCluster)
updatedHumioCluster.Spec.Image = tt.imageToUpdate
cl.Update(context.TODO(), updatedHumioCluster)

for nodeCount := 0; nodeCount < tt.humioCluster.Spec.NodeCount; nodeCount++ {
res, err := r.Reconcile(req)
@@ -329,7 +375,7 @@ func TestReconcileHumioCluster_Reconcile_update_humio_image(t *testing.T) {
}

// Ensure all the pods are shut down to prep for the image update
foundPodList, err := ListPods(cl, tt.humioCluster)
foundPodList, err := ListPods(cl, updatedHumioCluster)
if err != nil {
t.Errorf("failed to list pods: %s", err)
}
@@ -348,7 +394,7 @@ func TestReconcileHumioCluster_Reconcile_update_humio_image(t *testing.T) {
}
}

foundPodList, err = ListPods(cl, tt.humioCluster)
foundPodList, err = ListPods(cl, updatedHumioCluster)
if err != nil {
t.Errorf("failed to list pods: %s", err)
}
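
This loop exercises the rolling replacement path: with the cluster already Running, ensureMismatchedPodVersionsAreDeleted removes pods whose version no longer matches the spec, and ensurePodsExist recreates them without the bootstrap readiness gate. The mismatch test presumably reduces to an image comparison along these lines (hypothetical; the real function may inspect more than the image):

for _, pod := range foundPodList {
	// Humio runs as the first container in the pod spec.
	if pod.Spec.Containers[0].Image != updatedHumioCluster.Spec.Image {
		// Delete the stale pod; a later reconcile recreates it
		// from the updated spec.
		_ = cl.Delete(context.TODO(), &pod)
	}
}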
4 changes: 1 addition & 3 deletions pkg/controller/humiocluster/pods.go
@@ -130,9 +130,7 @@ func labelsForPod(clusterName string, nodeID int) map[string]string {

func generatePodSuffix() string {
rand.Seed(time.Now().UnixNano())
chars := []rune("ABCDEFGHIJKLMNOPQRSTUVWXYZ" +
"abcdefghijklmnopqrstuvwxyz" +
"0123456789")
chars := []rune("abcdefghijklmnopqrstuvwxyz")
length := 6
var b strings.Builder
for i := 0; i < length; i++ {
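
Dropping the uppercase runes is a correctness fix, not a cosmetic one: the suffix becomes part of the pod name, and Kubernetes object names must be valid RFC 1123 DNS names, which forbid uppercase characters. A self-contained sketch of the same generator, assuming math/rand is seeded once at process start rather than on every call:

// generatePodSuffix returns a random six-letter, lowercase-only suffix
// safe to embed in a Kubernetes object name.
func generatePodSuffix() string {
	const chars = "abcdefghijklmnopqrstuvwxyz"
	b := make([]byte, 6)
	for i := range b {
		b[i] = chars[rand.Intn(len(chars))]
	}
	return string(b)
}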