Separate the Humio client from cluster methods, move the Kubernetes-related methods to a separate package, and add initial unit tests #4
Changes from all commits: 3ef782c, 956607e, bf1887c, c481b88, 4e4e46f, 45af425, fe92315, c29fca1
One of the HumioCluster YAML manifests adds a storage partition count:

```diff
@@ -6,3 +6,4 @@ spec:
   image: humio/humio-core
   version: "1.9.0"
   targetReplicationFactor: 2
+  storagePartitionsCount: 24
```
(One large diff in this PR is collapsed and not shown here.)
A new file in the humiocluster package (hunk `@@ -0,0 +1,40 @@`) defines defaults for the custom resource:

```go
package humiocluster

import (
	humioClusterv1alpha1 "github.com/humio/humio-operator/pkg/apis/core/v1alpha1"
)

const (
	name                    = "humiocluster"
	namespace               = "logging"
	image                   = "humio/humio-core"
	version                 = "1.9.0"
	targetReplicationFactor = 2
	storagePartitionsCount  = 24
	digestPartitionsCount   = 24
	nodeCount               = 3
)

func setDefaults(humioCluster *humioClusterv1alpha1.HumioCluster) {
	if humioCluster.ObjectMeta.Name == "" {
		humioCluster.ObjectMeta.Name = name
	}
	if humioCluster.ObjectMeta.Namespace == "" {
		humioCluster.ObjectMeta.Namespace = namespace
	}
	if humioCluster.Spec.Image == "" {
		humioCluster.Spec.Image = image
	}
	if humioCluster.Spec.TargetReplicationFactor == 0 {
		humioCluster.Spec.TargetReplicationFactor = targetReplicationFactor
	}
	if humioCluster.Spec.StoragePartitionsCount == 0 {
		humioCluster.Spec.StoragePartitionsCount = storagePartitionsCount
	}
	if humioCluster.Spec.DigestPartitionsCount == 0 {
		humioCluster.Spec.DigestPartitionsCount = digestPartitionsCount
	}
	if humioCluster.Spec.NodeCount == 0 {
		humioCluster.Spec.NodeCount = nodeCount
	}
}
```
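The PR title mentions initial unit tests, which are not visible in this excerpt. Purely as a hedged sketch (the test below is illustrative, not necessarily what the PR adds), setDefaults lends itself to a check of both the empty and the explicitly-set cases:

```go
package humiocluster

import (
	"testing"

	humioClusterv1alpha1 "github.com/humio/humio-operator/pkg/apis/core/v1alpha1"
)

// Hypothetical test, not necessarily the one added by this PR: verify that
// setDefaults fills empty fields and leaves explicitly set values untouched.
func TestSetDefaults(t *testing.T) {
	// Empty spec: everything should fall back to the package-level defaults.
	empty := &humioClusterv1alpha1.HumioCluster{}
	setDefaults(empty)
	if empty.ObjectMeta.Name != name {
		t.Errorf("expected default name %q, got %q", name, empty.ObjectMeta.Name)
	}
	if empty.Spec.NodeCount != nodeCount {
		t.Errorf("expected default node count %d, got %d", nodeCount, empty.Spec.NodeCount)
	}

	// Populated spec: setDefaults must not overwrite user-provided values.
	custom := &humioClusterv1alpha1.HumioCluster{}
	custom.ObjectMeta.Name = "my-cluster"
	custom.Spec.NodeCount = 5
	setDefaults(custom)
	if custom.ObjectMeta.Name != "my-cluster" || custom.Spec.NodeCount != 5 {
		t.Errorf("setDefaults overwrote explicitly set values: %+v", custom)
	}
}
```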
The remaining hunks are in the humiocluster controller:

```diff
@@ -2,24 +2,40 @@ package humiocluster
 
 import (
 	"context"
+	"fmt"
+	"strconv"
 
 	corev1alpha1 "github.com/humio/humio-operator/pkg/apis/core/v1alpha1"
+	"github.com/prometheus/client_golang/prometheus"
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
+	k8serrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/util/intstr"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/controller-runtime/pkg/controller"
 	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
 	"sigs.k8s.io/controller-runtime/pkg/handler"
 	logf "sigs.k8s.io/controller-runtime/pkg/log"
 	"sigs.k8s.io/controller-runtime/pkg/manager"
+	"sigs.k8s.io/controller-runtime/pkg/metrics"
 	"sigs.k8s.io/controller-runtime/pkg/reconcile"
 	"sigs.k8s.io/controller-runtime/pkg/source"
 )
 
-var log = logf.Log.WithName("controller_humiocluster")
+var (
+	log               = logf.Log.WithName("controller_humiocluster")
+	metricPodsCreated = prometheus.NewCounter(prometheus.CounterOpts{
+		Name: "humio_controller_pods_created_total",
+		Help: "Total number of pod objects created by controller",
+	})
+	metricPodsDeleted = prometheus.NewCounter(prometheus.CounterOpts{
+		Name: "humio_controller_pods_deleted_total",
+		Help: "Total number of pod objects deleted by controller",
+	})
+)
 
 /**
 * USER ACTION REQUIRED: This is a scaffold file intended for the user to modify with their own Controller
@@ -86,9 +102,9 @@ func (r *ReconcileHumioCluster) Reconcile(request reconcile.Request) (reconcile.
 	reqLogger := log.WithValues("Request.Namespace", request.Namespace, "Request.Name", request.Name)
 	reqLogger.Info("Reconciling HumioCluster")
 
-	// Fetch the HumioCluster instance
-	instance := &corev1alpha1.HumioCluster{}
-	err := r.client.Get(context.TODO(), request.NamespacedName, instance)
+	// Fetch the HumioCluster
+	humioCluster := &corev1alpha1.HumioCluster{}
+	err := r.client.Get(context.TODO(), request.NamespacedName, humioCluster)
 	if err != nil {
 		if errors.IsNotFound(err) {
 			// Request object not found, could have been deleted after reconcile request.
@@ -100,33 +116,189 @@ func (r *ReconcileHumioCluster) Reconcile(request reconcile.Request) (reconcile.
 		return reconcile.Result{}, err
 	}
 
-	// Define a new Pod object
-	pod := newPodForCR(instance)
+	// Set defaults
+	setDefaults(humioCluster)
 
-	// Set HumioCluster instance as the owner and controller
-	if err := controllerutil.SetControllerReference(instance, pod, r.scheme); err != nil {
+	// Ensure pods exist
+	err = r.ensurePodsExist(context.TODO(), humioCluster)
+	if err != nil {
 		return reconcile.Result{}, err
 	}
 
-	// Check if this Pod already exists
-	found := &corev1.Pod{}
-	err = r.client.Get(context.TODO(), types.NamespacedName{Name: pod.Name, Namespace: pod.Namespace}, found)
-	if err != nil && errors.IsNotFound(err) {
-		reqLogger.Info("Creating a new Pod", "Pod.Namespace", pod.Namespace, "Pod.Name", pod.Name)
-		err = r.client.Create(context.TODO(), pod)
-		if err != nil {
-			return reconcile.Result{}, err
+	// All done, don't requeue
+	return reconcile.Result{}, nil
+}
+
+func (r *ReconcileHumioCluster) ensurePodsExist(conetext context.Context, humioCluster *corev1alpha1.HumioCluster) error {
+	// Ensure we have pods for the defined NodeCount.
+	// If scaling down, we will handle the extra/obsolete pods later.
+	for nodeID := 0; nodeID < humioCluster.Spec.NodeCount; nodeID++ {
+		var existingPod corev1.Pod
+		pod := constructPod(humioCluster, nodeID)
+
+		if err := controllerutil.SetControllerReference(humioCluster, pod, r.scheme); err != nil {
+			return err
+		}
 
-		// Pod created successfully - don't requeue
-		return reconcile.Result{}, nil
-	} else if err != nil {
-		return reconcile.Result{}, err
+		if err := r.client.Get(context.TODO(), types.NamespacedName{
+			Namespace: humioCluster.Namespace,
+			Name:      fmt.Sprintf("%s-core-%d", humioCluster.Name, nodeID),
+		}, &existingPod); err != nil {
+			if k8serrors.IsNotFound(err) {
+
+				err := r.client.Create(context.TODO(), pod)
+				if err != nil {
+					log.Info(fmt.Sprintf("unable to create pod: %v", err))
+					return fmt.Errorf("unable to create Pod for HumioCluster: %v", err)
+				}
+				log.Info(fmt.Sprintf("successfully created pod %s for HumioCluster %s with node id: %d", pod.Name, humioCluster.Name, nodeID))
+				metricPodsCreated.Inc()
+			}
+		}
+	}
+	return nil
+}
 
-	// Pod already exists - don't requeue
-	reqLogger.Info("Skip reconcile: Pod already exists", "Pod.Namespace", found.Namespace, "Pod.Name", found.Name)
-	return reconcile.Result{}, nil
+func constructPod(hc *corev1alpha1.HumioCluster, nodeID int) *corev1.Pod {
+	return &corev1.Pod{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      fmt.Sprintf("%s-core-%d", hc.Name, nodeID),
+			Namespace: hc.Namespace,
+			Labels:    labelsForHumio(hc.Name, nodeID),
+			OwnerReferences: []metav1.OwnerReference{
+				{
+					APIVersion: hc.APIVersion,
+					Kind:       hc.Kind,
+					Name:       hc.Name,
+					UID:        hc.UID,
+				},
+			},
+		},
+		Spec: corev1.PodSpec{
+			Hostname:  fmt.Sprintf("%s-core-%d", hc.Name, nodeID),
+			Subdomain: hc.Name,
+			Containers: []corev1.Container{
+				{
+					Name:  "humio",
+					Image: fmt.Sprintf("%s:%s", hc.Spec.Image, hc.Spec.Version),
+					Ports: []corev1.ContainerPort{
+						{
+							Name:          "http",
+							ContainerPort: 8080,
+							Protocol:      "TCP",
+						},
+						{
+							Name:          "es",
+							ContainerPort: 9200,
+							Protocol:      "TCP",
+						},
+					},
+					Env:             *constructEnvVarList(nodeID),
+					ImagePullPolicy: "IfNotPresent",
+					VolumeMounts: []corev1.VolumeMount{
+						{
+							Name:      "humio-data",
+							MountPath: "/data",
+						},
+					},
+					ReadinessProbe: &corev1.Probe{
+						Handler: corev1.Handler{
+							HTTPGet: &corev1.HTTPGetAction{
+								Path: "/api/v1/status",
+								Port: intstr.IntOrString{IntVal: 8080},
+							},
+						},
+						InitialDelaySeconds: 90,
+						PeriodSeconds:       5,
+						TimeoutSeconds:      2,
+						SuccessThreshold:    1,
+						FailureThreshold:    12,
+					},
+					LivenessProbe: &corev1.Probe{
+						Handler: corev1.Handler{
+							HTTPGet: &corev1.HTTPGetAction{
+								Path: "/api/v1/status",
+								Port: intstr.IntOrString{IntVal: 8080},
+							},
+						},
+						InitialDelaySeconds: 90,
+						PeriodSeconds:       5,
+						TimeoutSeconds:      2,
+						SuccessThreshold:    1,
+						FailureThreshold:    12,
+					},
+				},
+			},
+			Volumes: []corev1.Volume{
+				{
+					Name: "humio-data",
+					VolumeSource: corev1.VolumeSource{
+						PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
+							ClaimName: fmt.Sprintf("%s-core-%d", hc.Name, nodeID),
+						},
+					},
+				},
+			},
+		},
+	}
+}
+
+func constructEnvVarList(nodeID int) *[]corev1.EnvVar {
```
Review comment on `constructEnvVarList`:

Author (Contributor): I think we'll want to refactor this in an upcoming PR so we can merge env vars that are set in the spec (e.g. for things like saml, etc)

Contributor: Sounds good to me!
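As a rough illustration of that future refactor (the helper below is hypothetical and not part of this PR), user-supplied variables could be overlaid on the generated defaults, with the user-provided value winning on name collisions:

```go
package humiocluster

import (
	corev1 "k8s.io/api/core/v1"
)

// mergeEnvVars is a hypothetical helper (not in this PR): it overlays
// user-provided env vars on top of the generated defaults, letting the
// user-provided value win when the same Name appears in both lists.
func mergeEnvVars(defaults, overrides []corev1.EnvVar) []corev1.EnvVar {
	merged := make([]corev1.EnvVar, 0, len(defaults)+len(overrides))
	seen := make(map[string]bool, len(overrides))

	// User-provided variables (e.g. SAML settings from the spec) take precedence.
	for _, env := range overrides {
		merged = append(merged, env)
		seen[env.Name] = true
	}
	// Keep any generated default that was not overridden.
	for _, env := range defaults {
		if !seen[env.Name] {
			merged = append(merged, env)
		}
	}
	return merged
}
```

Once a spec field for user env vars exists, constructPod could pass the merged list to Env instead of using *constructEnvVarList(nodeID) directly.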
The diff then continues with the body of `constructEnvVarList` and the remaining additions:

```diff
+	return &[]corev1.EnvVar{
+		{
+			Name: "THIS_POD_IP",
+			ValueFrom: &corev1.EnvVarSource{
+				FieldRef: &corev1.ObjectFieldSelector{
+					FieldPath: "status.podIP",
+				},
+			},
+		},
+		{
+			Name: "POD_NAME",
+			ValueFrom: &corev1.EnvVarSource{
+				FieldRef: &corev1.ObjectFieldSelector{
+					FieldPath: "metadata.name",
+				},
+			},
+		},
+		{
+			Name: "POD_NAMESPACE",
+			ValueFrom: &corev1.EnvVarSource{
+				FieldRef: &corev1.ObjectFieldSelector{
+					FieldPath: "metadata.namespace",
+				},
+			},
+		},
+		{Name: "BOOTSTRAP_HOST_ID", Value: strconv.Itoa(nodeID)},
+		{Name: "HUMIO_JVM_ARGS", Value: "-Xss2m -Xms256m -Xmx1536m -server -XX:+UseParallelOldGC -XX:+ScavengeBeforeFullGC -XX:+DisableExplicitGC"},
+		{Name: "HUMIO_PORT", Value: "8080"},
+		{Name: "ELASTIC_PORT", Value: "9200"},
+		{Name: "KAFKA_MANAGED_BY_HUMIO", Value: "true"},
+		{Name: "AUTHENTICATION_METHOD", Value: "single-user"},
+		{Name: "SINGLE_USER_PASSWORD", Value: "temp"},
+		{Name: "KAFKA_SERVERS", Value: "humio-cp-kafka-0.humio-cp-kafka-headless:9092"},
+		{Name: "ZOOKEEPER_URL", Value: "humio-cp-zookeeper-0.humio-cp-zookeeper-headless:2181"},
+		{
+			Name:  "EXTERNAL_URL", // URL used by other Humio hosts.
+			Value: fmt.Sprintf("http://$(POD_NAME).core.$(POD_NAMESPACE).svc.cluster.local:$(HUMIO_PORT)"),
+			//Value: "http://$(POD_NAME).humio-humio-core-headless.$(POD_NAMESPACE).svc.cluster.local:8080",
+			//Value: "http://$(THIS_POD_IP):$(HUMIO_PORT)",
+		},
+		{
+			Name: "PUBLIC_URL", // URL used by users/browsers.
+			//Value: "http://$(POD_NAME).humio-humio-core-headless.$(POD_NAMESPACE).svc.cluster.local:8080",
+			Value: "http://$(THIS_POD_IP):$(HUMIO_PORT)",
+		},
+	}
+}
+
+func labelsForHumio(clusterName string, nodeID int) map[string]string {
+	labels := map[string]string{
+		"app":           "humio",
+		"humio_cr":      clusterName,
+		"humio_node_id": strconv.Itoa(nodeID),
+	}
+	return labels
+}
 
 // newPodForCR returns a busybox pod with the same name/namespace as the cr
@@ -151,3 +323,8 @@ func newPodForCR(cr *corev1alpha1.HumioCluster) *corev1.Pod {
 		},
 	}
 }
+
+func init() {
+	metrics.Registry.MustRegister(metricPodsCreated)
+	metrics.Registry.MustRegister(metricPodsDeleted)
+}
```
Review comment on the pod's `humio-data` volume:

Reviewer: What do we want to do regarding Humio's data volume? We talked about focusing on our own use case with ephemeral storage using hostPath, but maybe we'd also want a flag to run with emptyDir when e.g. running it locally in kind/Minikube?

Reply: Good idea. I filed #5.
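As a hedged sketch of that idea (the helper and its useEmptyDir flag are hypothetical, not part of this PR), the volume source for humio-data could be selected like this:

```go
package humiocluster

import (
	corev1 "k8s.io/api/core/v1"
)

// dataVolumeSource is a hypothetical helper (not in this PR) showing how a
// flag could select the backing storage for the "humio-data" volume.
func dataVolumeSource(useEmptyDir bool, hostPath string) corev1.VolumeSource {
	if useEmptyDir {
		// Ephemeral storage for local development in kind/Minikube.
		return corev1.VolumeSource{
			EmptyDir: &corev1.EmptyDirVolumeSource{},
		}
	}
	// Node-local storage for the hostPath-based use case.
	hostPathType := corev1.HostPathDirectoryOrCreate
	return corev1.VolumeSource{
		HostPath: &corev1.HostPathVolumeSource{
			Path: hostPath,
			Type: &hostPathType,
		},
	}
}
```

constructPod's Volumes entry could then call such a helper instead of hard-coding the PersistentVolumeClaim source; #5 tracks the actual decision.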