这是indexloc提供的服务,不要输入任何密码
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/appendices/file-formats/app-json.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
- `command`: (string, required)
- `maintenance`: (boolean, optional)
- `schedule`: (string, required)
- `concurrency_policy`: (string, optional, default: `allow`, options: `allow`, `forbid`, `replace`)

## Formation

Expand Down
2 changes: 2 additions & 0 deletions docs/processes/scheduled-cron-tasks.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ A cron task takes the following properties:
- `command`: A command to be run within the built app image. Specified commands can also be `Procfile` entries.
- `maintenance`: A boolean value that decides whether the cron task is in maintenance. A task that is in maintenance is not executed.
- `schedule`: A [cron-compatible](https://en.wikipedia.org/wiki/Cron#Overview) scheduling definition upon which to run the command. Seconds are generally not supported.
- `concurrency_policy`: A string (default: `allow`) that controls whether the cron task can run concurrently with another invocation of itself. Valid options are `allow` (allow concurrent runs), `forbid` (exit the new cron task if an existing one is still running), and `replace` (delete any existing cron task and start the new one).


Zero or more cron tasks can be specified per app. Cron tasks are validated after the build artifact is created but before the app is deployed, and the cron schedule is updated during the post-deploy phase.

Expand Down
3 changes: 3 additions & 0 deletions plugins/app-json/appjson.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ type CronTask struct {

// Schedule is the cron schedule to execute the command on
Schedule string `json:"schedule"`

// ConcurrencyPolicy is the concurrency policy for the cron command
ConcurrencyPolicy string `json:"concurrency_policy"`
}

// Formation is a struct that represents the scale for a process from an app.json file
Expand Down
4 changes: 3 additions & 1 deletion plugins/common/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,9 @@ func LogFailWithError(err error) {
fmt.Fprintf(os.Stderr, " ! %s\n", e.Error())
}
} else {
fmt.Fprintf(os.Stderr, " ! %s\n", err.Error())
if err.Error() != "" {
fmt.Fprintf(os.Stderr, " ! %s\n", err.Error())
}
}
if errExit, ok := err.(ErrWithExitCode); ok {
os.Exit(errExit.ExitCode())
Expand Down
12 changes: 11 additions & 1 deletion plugins/cron/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ type CronTask struct {
// Schedule is the cron schedule
Schedule string `json:"schedule"`

// ConcurrencyPolicy is the concurrency policy for the cron command
ConcurrencyPolicy string `json:"concurrency_policy"`

// AltCommand is an alternate command to run
AltCommand string `json:"-"`

Expand All @@ -72,7 +75,7 @@ func (t CronTask) DokkuRunCommand() string {
return t.AltCommand
}

return fmt.Sprintf("dokku run --cron-id %s %s %s", t.ID, t.App, t.Command)
return fmt.Sprintf("dokku run --concurrency-policy %s --cron-id %s %s %s", t.ConcurrencyPolicy, t.ID, t.App, t.Command)
}

// FetchCronTasksInput is the input for the FetchCronTasks function
Expand Down Expand Up @@ -148,12 +151,19 @@ func FetchCronTasks(input FetchCronTasksInput) ([]CronTask, error) {
maintenance = boolValue
}
}
if c.ConcurrencyPolicy == "" {
c.ConcurrencyPolicy = "allow"
}
if c.ConcurrencyPolicy != "allow" && c.ConcurrencyPolicy != "forbid" && c.ConcurrencyPolicy != "replace" {
return tasks, fmt.Errorf("Invalid cron concurrency policy for app %s (schedule %s): %s", appName, c.Schedule, c.ConcurrencyPolicy)
}

tasks = append(tasks, CronTask{
App: appName,
Command: c.Command,
Schedule: c.Schedule,
ID: cronID,
ConcurrencyPolicy: c.ConcurrencyPolicy,
Maintenance: isAppCronInMaintenance || maintenance,
AppInMaintenance: isAppCronInMaintenance,
TaskInMaintenance: maintenance,
Expand Down
12 changes: 10 additions & 2 deletions plugins/cron/subcommands.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func CommandList(appName string, format string) error {
}

if format == "stdout" {
output := []string{"ID | Schedule | Maintenance | Command"}
output := []string{"ID | Schedule | Concurrency | Maintenance | Command"}
for _, task := range tasks {
maintenance := "false"
if task.Maintenance {
Expand All @@ -52,7 +52,7 @@ func CommandList(appName string, format string) error {
maintenance = "true (app)"
}
}
output = append(output, fmt.Sprintf("%s | %s | %s | %s", task.ID, task.Schedule, maintenance, task.Command))
output = append(output, fmt.Sprintf("%s | %s | %s | %t | %s", task.ID, task.Schedule, task.ConcurrencyPolicy, maintenance, task.Command))
}

result := columnize.SimpleFormat(output)
Expand Down Expand Up @@ -112,9 +112,11 @@ func CommandRun(appName string, cronID string, detached bool) error {
}

command := ""
concurrencyPolicy := "allow"
for _, task := range tasks {
if task.ID == cronID {
command = task.Command
concurrencyPolicy = task.ConcurrencyPolicy
}
}

Expand All @@ -134,6 +136,7 @@ func CommandRun(appName string, cronID string, detached bool) error {
os.Setenv("DOKKU_DISABLE_TTY", "true")
}

os.Setenv("DOKKU_CONCURRENCY_POLICY", concurrencyPolicy)
os.Setenv("DOKKU_CRON_ID", cronID)
os.Setenv("DOKKU_RM_CONTAINER", "1")
scheduler := common.GetAppScheduler(appName)
Expand All @@ -143,6 +146,11 @@ func CommandRun(appName string, cronID string, detached bool) error {
Args: args,
StreamStdio: true,
})
if err != nil {
// return an error with an empty message to avoid
// printing the error message twice
return errors.New("")
}
return err
}

Expand Down
17 changes: 15 additions & 2 deletions plugins/run/internal-functions
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ fn-run() {
shift 1

declare APP=""
local CRON_ID
local CRON_ID CONCURRENCY_POLICY="allow"
declare -a RUN_ENV
RUN_ENV=()
while [[ $# -gt 0 ]]; do
Expand All @@ -19,6 +19,11 @@ fn-run() {
CRON_ID="$arg"
shift
;;
--concurrency-policy=*)
local arg=$(printf "%s" "$1" | sed -E 's/(^--concurrency-policy=)//g')
CONCURRENCY_POLICY="$arg"
shift
;;
--no-tty)
export DOKKU_DISABLE_TTY=true
shift
Expand All @@ -35,6 +40,14 @@ fn-run() {
CRON_ID="$2"
shift 2
;;
--concurrency-policy)
if [[ ! $2 ]]; then
dokku_log_warn "expected $1 to have an argument"
break
fi
CONCURRENCY_POLICY="$2"
shift 2
;;
-e=* | --env=*)
local arg=$(printf "%s" "$1" | sed -E 's/(^-e=)|(^--env=)//g')
RUN_ENV+=("$arg")
Expand Down Expand Up @@ -67,7 +80,7 @@ fn-run() {
verify_app_name "$APP"

local DOKKU_SCHEDULER=$(get_app_scheduler "$APP")
DOKKU_CRON_ID="$CRON_ID" plugn trigger scheduler-run "$DOKKU_SCHEDULER" "$APP" "${#RUN_ENV[@]}" "${RUN_ENV[@]}" -- "$@"
DOKKU_CRON_ID="$CRON_ID" DOKKU_CONCURRENCY_POLICY="$CONCURRENCY_POLICY" plugn trigger scheduler-run "$DOKKU_SCHEDULER" "$APP" "${#RUN_ENV[@]}" "${RUN_ENV[@]}" -- "$@"
}

cmd-run() {
Expand Down
26 changes: 26 additions & 0 deletions plugins/scheduler-docker-local/scheduler-run
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,34 @@ trigger-scheduler-docker-local-scheduler-run() {
[[ "$DOKKU_TRACE" ]] && DOCKER_ARGS+=" -e TRACE=true "

local PROCESS_TYPE=run
local concurrency_policy="allow"
if [[ -n "$DOKKU_CRON_ID" ]]; then
PROCESS_TYPE=cron
DOCKER_ARGS+=" --label=com.dokku.cron-id=$DOKKU_CRON_ID"

if [[ -n "$DOKKU_CONCURRENCY_POLICY" ]]; then
concurrency_policy="$DOKKU_CONCURRENCY_POLICY"
fi
DOCKER_ARGS+=" --label=com.dokku.concurrency-policy=$concurrency_policy"

if [[ "$concurrency_policy" == "forbid" ]]; then
# check if there is a running docker container with the same com.dokku.cron-id label
local RUNNING_CONTAINERS="$("$DOCKER_BIN" container ls --filter "label=com.dokku.cron-id=$DOKKU_CRON_ID" --filter "status=running" --quiet || true)"
if [[ -n "$RUNNING_CONTAINERS" ]]; then
dokku_log_warn "$APP currently has a cron lock in place for $DOKKU_CRON_ID. Exiting..."
return 1
fi
fi

if [[ "$concurrency_policy" == "replace" ]]; then
local RUNNING_CONTAINERS="$("$DOCKER_BIN" container ls --filter "label=com.dokku.cron-id=$DOKKU_CRON_ID" --filter "status=running" --quiet || true)"
if [[ -n "$RUNNING_CONTAINERS" ]]; then
dokku_log_warn "Replacing running container for $DOKKU_CRON_ID"
"$DOCKER_BIN" container update --restart=no "$RUNNING_CONTAINERS" &>/dev/null || true
"$DOCKER_BIN" container stop "$RUNNING_CONTAINERS" >/dev/null && "$DOCKER_BIN" container kill "$RUNNING_CONTAINERS" &>/dev/null
plugn trigger scheduler-register-retired "$APP" "$RUNNING_CONTAINERS"
fi
fi
fi

local DYNO_NUMBER="$RANDOM"
Expand Down Expand Up @@ -138,6 +163,7 @@ trigger-scheduler-docker-local-scheduler-run() {
DOKKU_CONTAINER_EXIT_CODE="$("$DOCKER_BIN" container wait "$CONTAINER_ID" 2>/dev/null || echo "$EXIT_CODE")"
[[ -z "$DOKKU_CONTAINER_EXIT_CODE" ]] && DOKKU_CONTAINER_EXIT_CODE=0
fi

plugn trigger scheduler-post-run "$DOKKU_SCHEDULER" "$APP" "$CONTAINER_ID" "$DOKKU_CONTAINER_EXIT_CODE"
return "$DOKKU_CONTAINER_EXIT_CODE"
}
Expand Down
45 changes: 45 additions & 0 deletions plugins/scheduler-k3s/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,51 @@ func (k KubernetesClient) DeleteNode(ctx context.Context, input DeleteNodeInput)
return k.Client.CoreV1().Nodes().Delete(ctx, input.Name, metav1.DeleteOptions{})
}

// DeletePodInput is the argument bundle for DeletePod. DeletePod rejects
// inputs where Name and LabelSelector are both empty or both set.
type DeletePodInput struct {
	// Name selects a single pod to delete by its Kubernetes pod name
	Name string

	// Namespace is the Kubernetes namespace the pod(s) are deleted from
	Namespace string

	// LabelSelector selects the pods to delete by Kubernetes label selector
	LabelSelector string
}

// DeletePod removes pods from input.Namespace, addressed either by exact
// name or by label selector.
//
// Exactly one of input.Name and input.LabelSelector must be set; supplying
// neither or both yields an error before any API call is made. When a label
// selector is used, the matching pods are listed and deleted one at a time,
// and the first failed deletion aborts the loop and is returned wrapped with
// the pod name.
func (k KubernetesClient) DeletePod(ctx context.Context, input DeletePodInput) error {
	byName := input.Name != ""
	bySelector := input.LabelSelector != ""

	switch {
	case !byName && !bySelector:
		return fmt.Errorf("name or label selector is required")
	case byName && bySelector:
		return fmt.Errorf("name and label selector cannot be used together")
	case byName:
		return k.Client.CoreV1().Pods(input.Namespace).Delete(ctx, input.Name, metav1.DeleteOptions{})
	}

	// only the label selector is set: resolve it to concrete pods first
	matched, err := k.ListPods(ctx, ListPodsInput{
		Namespace:     input.Namespace,
		LabelSelector: input.LabelSelector,
	})
	if err != nil {
		return fmt.Errorf("Error listing pods: %w", err)
	}

	for _, pod := range matched {
		if err := k.Client.CoreV1().Pods(input.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{}); err != nil {
			return fmt.Errorf("Error deleting pod %s: %w", pod.Name, err)
		}
	}

	return nil
}

// DeleteSecretInput contains all the information needed to delete a Kubernetes secret
type DeleteSecretInput struct {
// Name is the Kubernetes secret name
Expand Down
17 changes: 13 additions & 4 deletions plugins/scheduler-k3s/template.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,12 +299,21 @@ const (
)

// ProcessCron holds the cron settings that are serialized to YAML under the
// keys named in the field tags (id, schedule, suffix, suspend and
// concurrency_policy).
// NOTE(review): ID, Schedule, Suffix and Suspend each appear twice below. This
// looks like the before and after sides of a field re-alignment shown
// together — confirm that only one copy of each exists in the real file.
type ProcessCron struct {
ID string `yaml:"id"`
Schedule string `yaml:"schedule"`
Suffix string `yaml:"suffix"`
Suspend bool `yaml:"suspend"`
ID string `yaml:"id"`
Schedule string `yaml:"schedule"`
Suffix string `yaml:"suffix"`
Suspend bool `yaml:"suspend"`
// ConcurrencyPolicy is written out as concurrency_policy; presumably this is
// the value the cron-job chart template reads for concurrencyPolicy — verify.
ConcurrencyPolicy ProcessCronConcurrencyPolicy `yaml:"concurrency_policy"`
}

// ProcessCronConcurrencyPolicy is the string form of a cron task's
// concurrency policy as stored in ProcessCron.ConcurrencyPolicy.
type ProcessCronConcurrencyPolicy string

// Known ProcessCronConcurrencyPolicy values. They are capitalized, unlike the
// lowercase allow/forbid/replace options accepted in app.json.
const (
	// ProcessCronConcurrencyPolicy_Allow is the "Allow" policy value
	ProcessCronConcurrencyPolicy_Allow = ProcessCronConcurrencyPolicy("Allow")

	// ProcessCronConcurrencyPolicy_Forbid is the "Forbid" policy value
	ProcessCronConcurrencyPolicy_Forbid = ProcessCronConcurrencyPolicy("Forbid")

	// ProcessCronConcurrencyPolicy_Replace is the "Replace" policy value
	ProcessCronConcurrencyPolicy_Replace = ProcessCronConcurrencyPolicy("Replace")
)

type ProcessPortMap struct {
ContainerPort int32 `yaml:"container_port"`
HostPort int32 `yaml:"host_port"`
Expand Down
2 changes: 1 addition & 1 deletion plugins/scheduler-k3s/templates/chart/cron-job.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ metadata:
name: {{ $.Values.global.app_name }}-cron-{{ $config.cron.suffix }}
namespace: {{ $.Values.global.namespace }}
spec:
concurrencyPolicy: Allow
concurrencyPolicy: {{ $config.cron.concurrency_policy }}
failedJobsHistoryLimit: 10
jobTemplate:
metadata:
Expand Down
Loading
Loading