diff --git a/docs/appendices/file-formats/app-json.md b/docs/appendices/file-formats/app-json.md index c7500e19fe1..b3cd72c7bb1 100644 --- a/docs/appendices/file-formats/app-json.md +++ b/docs/appendices/file-formats/app-json.md @@ -23,6 +23,7 @@ - `command`: (string, required) - `maintenance`: (boolean, optional) - `schedule`: (string, required) +- `concurrency_policy`: (string, optional, default: `allow`, options: `allow`, `forbid`, `replace`) ## Formation diff --git a/docs/processes/scheduled-cron-tasks.md b/docs/processes/scheduled-cron-tasks.md index 39f659586a6..6ce3b5f71fb 100644 --- a/docs/processes/scheduled-cron-tasks.md +++ b/docs/processes/scheduled-cron-tasks.md @@ -38,6 +38,8 @@ A cron task takes the following properties: - `command`: A command to be run within the built app image. Specified commands can also be `Procfile` entries. - `maintenance`: A boolean value that decides whether the cron task is in maintenance and therefore executable or not. - `schedule`: A [cron-compatible](https://en.wikipedia.org/wiki/Cron#Overview) scheduling definition upon which to run the command. Seconds are generally not supported. +- `concurrency_policy`: A string (default: `allow`), that controls whether the cron task can be run concurrently with another invocation of itself. Valid options are `allow` (allow concurrency), `forbid` (exit the new cron task if there is an existing one), `replace` (delete any existing cron task and start the new one). + Zero or more cron tasks can be specified per app. Cron tasks are validated after the build artifact is created but before the app is deployed, and the cron schedule is updated during the post-deploy phase. diff --git a/plugins/app-json/appjson.go b/plugins/app-json/appjson.go index 99c856bc560..fe59c9942a3 100644 --- a/plugins/app-json/appjson.go +++ b/plugins/app-json/appjson.go @@ -60,6 +60,9 @@ type CronTask struct { // Schedule is the cron schedule to execute the command on Schedule string `json:"schedule"` + + // ConcurrencyPolicy is the concurrency policy for the cron command + ConcurrencyPolicy string `json:"concurrency_policy"` } // Formation is a struct that represents the scale for a process from an app.json file diff --git a/plugins/common/log.go b/plugins/common/log.go index 9a32811384d..9e67287b4ba 100644 --- a/plugins/common/log.go +++ b/plugins/common/log.go @@ -90,7 +90,9 @@ func LogFailWithError(err error) { fmt.Fprintf(os.Stderr, " ! %s\n", e.Error()) } } else { - fmt.Fprintf(os.Stderr, " ! %s\n", err.Error()) + if err.Error() != "" { + fmt.Fprintf(os.Stderr, " ! %s\n", err.Error()) + } } if errExit, ok := err.(ErrWithExitCode); ok { os.Exit(errExit.ExitCode()) diff --git a/plugins/cron/cron.go b/plugins/cron/cron.go index 1d02ade857d..89d19f552b4 100644 --- a/plugins/cron/cron.go +++ b/plugins/cron/cron.go @@ -47,6 +47,9 @@ type CronTask struct { // Schedule is the cron schedule Schedule string `json:"schedule"` + // ConcurrencyPolicy is the concurrency policy for the cron command + ConcurrencyPolicy string `json:"concurrency_policy"` + // AltCommand is an alternate command to run AltCommand string `json:"-"` @@ -72,7 +75,7 @@ func (t CronTask) DokkuRunCommand() string { return t.AltCommand } - return fmt.Sprintf("dokku run --cron-id %s %s %s", t.ID, t.App, t.Command) + return fmt.Sprintf("dokku run --concurrency-policy %s --cron-id %s %s %s", t.ConcurrencyPolicy, t.ID, t.App, t.Command) } // FetchCronTasksInput is the input for the FetchCronTasks function @@ -148,12 +151,19 @@ func FetchCronTasks(input FetchCronTasksInput) ([]CronTask, error) { maintenance = boolValue } } + if c.ConcurrencyPolicy == "" { + c.ConcurrencyPolicy = "allow" + } + if c.ConcurrencyPolicy != "allow" && c.ConcurrencyPolicy != "forbid" && c.ConcurrencyPolicy != "replace" { + return tasks, fmt.Errorf("Invalid cron concurrency policy for app %s (schedule %s): %s", appName, c.Schedule, c.ConcurrencyPolicy) + } tasks = append(tasks, CronTask{ App: appName, Command: c.Command, Schedule: c.Schedule, ID: cronID, + ConcurrencyPolicy: c.ConcurrencyPolicy, Maintenance: isAppCronInMaintenance || maintenance, AppInMaintenance: isAppCronInMaintenance, TaskInMaintenance: maintenance, diff --git a/plugins/cron/subcommands.go b/plugins/cron/subcommands.go index 23336195351..8bc86677f32 100644 --- a/plugins/cron/subcommands.go +++ b/plugins/cron/subcommands.go @@ -42,7 +42,7 @@ func CommandList(appName string, format string) error { } if format == "stdout" { - output := []string{"ID | Schedule | Maintenance | Command"} + output := []string{"ID | Schedule | Concurrency | Maintenance | Command"} for _, task := range tasks { maintenance := "false" if task.Maintenance { @@ -52,7 +52,7 @@ func CommandList(appName string, format string) error { maintenance = "true (app)" } } - output = append(output, fmt.Sprintf("%s | %s | %s | %s", task.ID, task.Schedule, maintenance, task.Command)) + output = append(output, fmt.Sprintf("%s | %s | %s | %t | %s", task.ID, task.Schedule, task.ConcurrencyPolicy, maintenance, task.Command)) } result := columnize.SimpleFormat(output) @@ -112,9 +112,11 @@ func CommandRun(appName string, cronID string, detached bool) error { } command := "" + concurrencyPolicy := "allow" for _, task := range tasks { if task.ID == cronID { command = task.Command + concurrencyPolicy = task.ConcurrencyPolicy } } @@ -134,6 +136,7 @@ func CommandRun(appName string, cronID string, detached bool) error { os.Setenv("DOKKU_DISABLE_TTY", "true") } + os.Setenv("DOKKU_CONCURRENCY_POLICY", concurrencyPolicy) os.Setenv("DOKKU_CRON_ID", cronID) os.Setenv("DOKKU_RM_CONTAINER", "1") scheduler := common.GetAppScheduler(appName) @@ -143,6 +146,11 @@ func CommandRun(appName string, cronID string, detached bool) error { Args: args, StreamStdio: true, }) + if err != nil { + // return an error with an empty message to avoid + // printing the error message twice + return errors.New("") + } return err } diff --git a/plugins/run/internal-functions b/plugins/run/internal-functions index b4ccfc2ce50..fe45b179046 100755 --- a/plugins/run/internal-functions +++ b/plugins/run/internal-functions @@ -9,7 +9,7 @@ fn-run() { shift 1 declare APP="" - local CRON_ID + local CRON_ID CONCURRENCY_POLICY="allow" declare -a RUN_ENV RUN_ENV=() while [[ $# -gt 0 ]]; do @@ -19,6 +19,11 @@ fn-run() { CRON_ID="$arg" shift ;; + --concurrency-policy=*) + local arg=$(printf "%s" "$1" | sed -E 's/(^--concurrency-policy=)//g') + CONCURRENCY_POLICY="$arg" + shift + ;; --no-tty) export DOKKU_DISABLE_TTY=true shift @@ -35,6 +40,14 @@ fn-run() { CRON_ID="$2" shift 2 ;; + --concurrency-policy) + if [[ ! $2 ]]; then + dokku_log_warn "expected $1 to have an argument" + break + fi + CONCURRENCY_POLICY="$2" + shift 2 + ;; -e=* | --env=*) local arg=$(printf "%s" "$1" | sed -E 's/(^-e=)|(^--env=)//g') RUN_ENV+=("$arg") @@ -67,7 +80,7 @@ fn-run() { verify_app_name "$APP" local DOKKU_SCHEDULER=$(get_app_scheduler "$APP") - DOKKU_CRON_ID="$CRON_ID" plugn trigger scheduler-run "$DOKKU_SCHEDULER" "$APP" "${#RUN_ENV[@]}" "${RUN_ENV[@]}" -- "$@" + DOKKU_CRON_ID="$CRON_ID" DOKKU_CONCURRENCY_POLICY="$CONCURRENCY_POLICY" plugn trigger scheduler-run "$DOKKU_SCHEDULER" "$APP" "${#RUN_ENV[@]}" "${RUN_ENV[@]}" -- "$@" } cmd-run() { diff --git a/plugins/scheduler-docker-local/scheduler-run b/plugins/scheduler-docker-local/scheduler-run index cb6bbfb7427..b83510684df 100755 --- a/plugins/scheduler-docker-local/scheduler-run +++ b/plugins/scheduler-docker-local/scheduler-run @@ -39,9 +39,34 @@ trigger-scheduler-docker-local-scheduler-run() { [[ "$DOKKU_TRACE" ]] && DOCKER_ARGS+=" -e TRACE=true " local PROCESS_TYPE=run + local concurrency_policy="allow" if [[ -n "$DOKKU_CRON_ID" ]]; then PROCESS_TYPE=cron DOCKER_ARGS+=" --label=com.dokku.cron-id=$DOKKU_CRON_ID" + + if [[ -n "$DOKKU_CONCURRENCY_POLICY" ]]; then + concurrency_policy="$DOKKU_CONCURRENCY_POLICY" + fi + DOCKER_ARGS+=" --label=com.dokku.concurrency-policy=$concurrency_policy" + + if [[ "$concurrency_policy" == "forbid" ]]; then + # check if there is a running docker container with the same com.dokku.cron-id label + local RUNNING_CONTAINERS="$("$DOCKER_BIN" container ls --filter "label=com.dokku.cron-id=$DOKKU_CRON_ID" --filter "status=running" --quiet || true)" + if [[ -n "$RUNNING_CONTAINERS" ]]; then + dokku_log_warn "$APP currently has a cron lock in place for $DOKKU_CRON_ID. Exiting..." + return 1 + fi + fi + + if [[ "$concurrency_policy" == "replace" ]]; then + local RUNNING_CONTAINERS="$("$DOCKER_BIN" container ls --filter "label=com.dokku.cron-id=$DOKKU_CRON_ID" --filter "status=running" --quiet || true)" + if [[ -n "$RUNNING_CONTAINERS" ]]; then + dokku_log_warn "Replacing running container for $DOKKU_CRON_ID" + "$DOCKER_BIN" container update --restart=no "$RUNNING_CONTAINERS" &>/dev/null || true + "$DOCKER_BIN" container stop "$RUNNING_CONTAINERS" >/dev/null && "$DOCKER_BIN" container kill "$RUNNING_CONTAINERS" &>/dev/null + plugn trigger scheduler-register-retired "$APP" "$RUNNING_CONTAINERS" + fi + fi fi local DYNO_NUMBER="$RANDOM" @@ -138,6 +163,7 @@ trigger-scheduler-docker-local-scheduler-run() { DOKKU_CONTAINER_EXIT_CODE="$("$DOCKER_BIN" container wait "$CONTAINER_ID" 2>/dev/null || echo "$EXIT_CODE")" [[ -z "$DOKKU_CONTAINER_EXIT_CODE" ]] && DOKKU_CONTAINER_EXIT_CODE=0 fi + plugn trigger scheduler-post-run "$DOKKU_SCHEDULER" "$APP" "$CONTAINER_ID" "$DOKKU_CONTAINER_EXIT_CODE" return "$DOKKU_CONTAINER_EXIT_CODE" } diff --git a/plugins/scheduler-k3s/k8s.go b/plugins/scheduler-k3s/k8s.go index aa5a07543da..14ee6002ba1 100644 --- a/plugins/scheduler-k3s/k8s.go +++ b/plugins/scheduler-k3s/k8s.go @@ -337,6 +337,51 @@ func (k KubernetesClient) DeleteNode(ctx context.Context, input DeleteNodeInput) return k.Client.CoreV1().Nodes().Delete(ctx, input.Name, metav1.DeleteOptions{}) } +// DeletePodInput contains all the information needed to delete a Kubernetes pod +type DeletePodInput struct { + // Name is the Kubernetes pod name + Name string + + // Namespace is the Kubernetes namespace + Namespace string + + // LabelSelector is the Kubernetes label selector + LabelSelector string +} + +// DeletePod deletes a Kubernetes pod +func (k KubernetesClient) DeletePod(ctx context.Context, input DeletePodInput) error { + if input.Name == "" && input.LabelSelector == "" { + return fmt.Errorf("name or label selector is required") + } + + if input.Name != "" && input.LabelSelector != "" { + return fmt.Errorf("name and label selector cannot be used together") + } + + if input.Name != "" { + return k.Client.CoreV1().Pods(input.Namespace).Delete(ctx, input.Name, metav1.DeleteOptions{}) + } + + // get the pods with the label selector + pods, err := k.ListPods(ctx, ListPodsInput{ + Namespace: input.Namespace, + LabelSelector: input.LabelSelector, + }) + if err != nil { + return fmt.Errorf("Error listing pods: %w", err) + } + + for _, pod := range pods { + err := k.Client.CoreV1().Pods(input.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{}) + if err != nil { + return fmt.Errorf("Error deleting pod %s: %w", pod.Name, err) + } + } + + return nil +} + // DeleteSecretInput contains all the information needed to delete a Kubernetes secret type DeleteSecretInput struct { // Name is the Kubernetes secret name diff --git a/plugins/scheduler-k3s/template.go b/plugins/scheduler-k3s/template.go index 17eafdde5a1..dcc6060d8d7 100644 --- a/plugins/scheduler-k3s/template.go +++ b/plugins/scheduler-k3s/template.go @@ -299,12 +299,21 @@ const ( ) type ProcessCron struct { - ID string `yaml:"id"` - Schedule string `yaml:"schedule"` - Suffix string `yaml:"suffix"` - Suspend bool `yaml:"suspend"` + ID string `yaml:"id"` + Schedule string `yaml:"schedule"` + Suffix string `yaml:"suffix"` + Suspend bool `yaml:"suspend"` + ConcurrencyPolicy ProcessCronConcurrencyPolicy `yaml:"concurrency_policy"` } +type ProcessCronConcurrencyPolicy string + +const ( + ProcessCronConcurrencyPolicy_Allow ProcessCronConcurrencyPolicy = "Allow" + ProcessCronConcurrencyPolicy_Forbid ProcessCronConcurrencyPolicy = "Forbid" + ProcessCronConcurrencyPolicy_Replace ProcessCronConcurrencyPolicy = "Replace" +) + type ProcessPortMap struct { ContainerPort int32 `yaml:"container_port"` HostPort int32 `yaml:"host_port"` diff --git a/plugins/scheduler-k3s/templates/chart/cron-job.yaml b/plugins/scheduler-k3s/templates/chart/cron-job.yaml index b5b4e8b88e2..102b3ce1e5b 100644 --- a/plugins/scheduler-k3s/templates/chart/cron-job.yaml +++ b/plugins/scheduler-k3s/templates/chart/cron-job.yaml @@ -26,7 +26,7 @@ metadata: name: {{ $.Values.global.app_name }}-cron-{{ $config.cron.suffix }} namespace: {{ $.Values.global.namespace }} spec: - concurrencyPolicy: Allow + concurrencyPolicy: {{ $config.cron.concurrency_policy }} failedJobsHistoryLimit: 10 jobTemplate: metadata: diff --git a/plugins/scheduler-k3s/triggers.go b/plugins/scheduler-k3s/triggers.go index 626f11a5dce..de2e9c2c925 100644 --- a/plugins/scheduler-k3s/triggers.go +++ b/plugins/scheduler-k3s/triggers.go @@ -699,14 +699,26 @@ func TriggerSchedulerDeploy(scheduler string, appName string, imageTag string) e return fmt.Errorf("Error getting process labels: %w", err) } + concurrencyPolicy := strings.ToUpper(cronTask.ConcurrencyPolicy) + switch concurrencyPolicy { + case "ALLOW": + concurrencyPolicy = "Allow" + case "FORBID": + concurrencyPolicy = "Forbid" + case "REPLACE": + concurrencyPolicy = "Replace" + default: + return fmt.Errorf("Invalid concurrency_policy specified: %v", concurrencyPolicy) + } processValues := ProcessValues{ Args: words, Annotations: annotations, Cron: ProcessCron{ - ID: cronTask.ID, - Schedule: cronTask.Schedule, - Suffix: suffix, - Suspend: cronTask.Maintenance, + ID: cronTask.ID, + Schedule: cronTask.Schedule, + Suffix: suffix, + Suspend: cronTask.Maintenance, + ConcurrencyPolicy: ProcessCronConcurrencyPolicy(concurrencyPolicy), }, Labels: labels, ProcessType: ProcessType_Cron, @@ -1200,10 +1212,44 @@ func TriggerSchedulerRun(scheduler string, appName string, envCount int, args [] extraEnv["TRACE"] = "true" } + namespace := getComputedNamespace(appName) + clientset, err := NewKubernetesClient() + if err != nil { + return fmt.Errorf("Error creating kubernetes client: %w", err) + } + + if err := clientset.Ping(); err != nil { + return fmt.Errorf("kubernetes api not available: %w", err) + } + processType := "run" if os.Getenv("DOKKU_CRON_ID") != "" { processType = "cron" labels["dokku.com/cron-id"] = os.Getenv("DOKKU_CRON_ID") + concurrencyPolicy := strings.ToUpper(os.Getenv("DOKKU_CONCURRENCY_POLICY")) + switch concurrencyPolicy { + case "forbid": + // check if there is a running pod with the same dokku.com/cron-id label + pods, err := clientset.ListPods(context.Background(), ListPodsInput{ + Namespace: namespace, + LabelSelector: fmt.Sprintf("dokku.com/cron-id=%s", os.Getenv("DOKKU_CRON_ID")), + }) + if err != nil { + return fmt.Errorf("Error listing pods: %w", err) + } + if len(pods) > 0 { + return fmt.Errorf("There is a running pod with the same dokku.com/cron-id label") + } + case "replace": + // delete any existing pod with the same dokku.com/cron-id label + err := clientset.DeletePod(context.Background(), DeletePodInput{ + Namespace: namespace, + LabelSelector: fmt.Sprintf("dokku.com/cron-id=%s", os.Getenv("DOKKU_CRON_ID")), + }) + if err != nil { + return fmt.Errorf("Error deleting pod: %w", err) + } + } } imageSourceType, err := common.DockerInspect(image, "{{ index .Config.Labels \"com.dokku.builder-type\" }}") @@ -1233,7 +1279,6 @@ func TriggerSchedulerRun(scheduler string, appName string, envCount int, args [] entrypoint = "/exec" } - namespace := getComputedNamespace(appName) helmAgent, err := NewHelmAgent(namespace, DevNullPrinter) if err != nil { return fmt.Errorf("Error creating helm agent: %w", err) @@ -1263,15 +1308,6 @@ func TriggerSchedulerRun(scheduler string, appName string, envCount int, args [] attachToPod := os.Getenv("DOKKU_DETACH_CONTAINER") != "1" - clientset, err := NewKubernetesClient() - if err != nil { - return fmt.Errorf("Error creating kubernetes client: %w", err) - } - - if err := clientset.Ping(); err != nil { - return fmt.Errorf("kubernetes api not available: %w", err) - } - imagePullSecrets := getComputedImagePullSecrets(appName) if imagePullSecrets == "" { imagePullSecrets = fmt.Sprintf("ims-%s.%d", appName, deploymentID) diff --git a/tests/unit/cron.bats b/tests/unit/cron.bats index 5c0b1345dfc..5eb2bc34a4c 100644 --- a/tests/unit/cron.bats +++ b/tests/unit/cron.bats @@ -129,7 +129,7 @@ teardown() { echo "output: $output" echo "status: $status" assert_success - assert_output '[{"id":"5cruaotm4yzzpnjlsdunblj8qyjp","command":"/bin/true","global":true,"schedule":"@daily","app-in-maintenance":false,"task-in-maintenance":false,"maintenance":false}]' + assert_output '[{"id":"5cruaotm4yzzpnjlsdunblj8qyjp","command":"/bin/true","global":true,"schedule":"@daily","concurrency_policy":"","app-in-maintenance":false,"task-in-maintenance":false,"maintenance":false}]' run /bin/bash -c "cat /var/spool/cron/crontabs/dokku" echo "output: $output" @@ -235,6 +235,37 @@ teardown() { assert_output "['task.py', 'schedule', 'now']" } +@test "(cron) cron:run concurrency_policy forbid" { + run deploy_app dockerfile dokku@$DOKKU_DOMAIN:$TEST_APP template_cron_file_concurrency_forbid + echo "output: $output" + echo "status: $status" + assert_success + + cron_id="$(dokku cron:list $TEST_APP --format json | jq -r '.[0].id')" + run /bin/bash -c "echo cron $cron_id" + echo "output: $output" + echo "status: $status" + assert_success + assert_output_exists + + run /bin/bash -c "dokku cron:run $TEST_APP $cron_id --detach" + echo "output: $output" + echo "status: $status" + assert_success + + run /bin/bash -c "docker ps --filter "label=com.dokku.cron-id=$cron_id" -q | xargs docker inspect -f '{{ index .Config.Labels \"com.dokku.concurrency-policy\" }}'" + echo "output: $output" + echo "status: $status" + assert_success + assert_output "forbid" + + run /bin/bash -c "dokku cron:run $TEST_APP $cron_id" + echo "output: $output" + echo "status: $status" + assert_output_contains "currently has a cron lock in place for $cron_id" + assert_failure +} + @test "(cron) cron:suspend cron:resume" { run deploy_app python dokku@$DOKKU_DOMAIN:$TEST_APP template_cron_file_valid_multiple echo "output: $output" @@ -427,3 +458,21 @@ template_cron_file_valid_multiple() { } EOF } + +template_cron_file_concurrency_forbid() { + local APP="$1" + local APP_REPO_DIR="$2" + [[ -z "$APP" ]] && local APP="$TEST_APP" + echo "injecting valid cron app.json -> $APP_REPO_DIR/app.json" + cat <"$APP_REPO_DIR/app.json" +{ + "cron": [ + { + "command": "sleep 30", + "schedule": "0 0 * * *", + "concurrency_policy": "forbid" + } + ] +} +EOF +}