From f3d4577e426972b226370ee45c2bde84d80ba869 Mon Sep 17 00:00:00 2001 From: Yota Hamada Date: Sun, 21 Dec 2025 18:42:34 +0900 Subject: [PATCH] fix(cmd): refactor start command not to prevent DAG from running unexpectedly (#1497) * **Behavior Changes** * Removed explicit no-queue and disable-max-active-runs options; start/retry flows simplified to default local execution and streamlined retry semantics. * **New Features** * Singleton mode now returns clear HTTP 409 conflicts when a singleton DAG is already running or queued. * Added top-level run Error field and an API to record early failures for quicker failure visibility. * **Bug Fixes** * Improved process acquisition and restart/retry error handling; tests updated to reflect local execution behavior. --- .vscode/launch.json | 2 +- internal/cmd/context.go | 49 +++++ internal/cmd/exec.go | 78 +------- internal/cmd/exec_spec.go | 28 +-- internal/cmd/flags.go | 17 -- internal/cmd/record_early_failure_test.go | 182 ++++++++++++++++++ internal/cmd/restart.go | 34 ++-- internal/cmd/retry.go | 106 +--------- internal/cmd/start.go | 72 ++----- internal/core/execution/runstatus.go | 4 + internal/integration/distributed_e2e_test.go | 23 ++- internal/integration/distributed_test.go | 96 ++++----- internal/integration/gha_test.go | 2 +- internal/runtime/executor/dag_runner.go | 2 - internal/runtime/executor/dag_runner_test.go | 1 - internal/runtime/subcmd.go | 22 +-- internal/runtime/subcmd_test.go | 32 +-- internal/runtime/transform/status.go | 7 + internal/service/frontend/api/v1/dags.go | 4 +- internal/service/frontend/api/v2/dagruns.go | 8 +- internal/service/frontend/api/v2/dags.go | 136 ++++--------- .../service/frontend/api/v2/singleton_test.go | 124 ++++++++++++ internal/service/scheduler/dag_executor.go | 2 +- internal/service/worker/handler_test.go | 2 - 24 files changed, 521 insertions(+), 512 deletions(-) create mode 100644 internal/cmd/record_early_failure_test.go create mode 100644 internal/service/frontend/api/v2/singleton_test.go diff --git a/.vscode/launch.json b/.vscode/launch.json index d797d02a..96591c2d 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -42,7 +42,7 @@ "request": "launch", "mode": "auto", "program": "${workspaceFolder}/cmd/", - "args": ["start", "--no-queue", "${input:pathToSpec}"] + "args": ["start", "${input:pathToSpec}"] }, { "name": "Retry", diff --git a/internal/cmd/context.go b/internal/cmd/context.go index 8ddf0153..879c329b 100644 --- a/internal/cmd/context.go +++ b/internal/cmd/context.go @@ -2,6 +2,7 @@ package cmd import ( "context" + "errors" "fmt" "log/slog" "os" @@ -27,6 +28,7 @@ import ( "github.com/dagu-org/dagu/internal/persistence/filequeue" "github.com/dagu-org/dagu/internal/persistence/fileserviceregistry" "github.com/dagu-org/dagu/internal/runtime" + "github.com/dagu-org/dagu/internal/runtime/transform" "github.com/dagu-org/dagu/internal/service/coordinator" "github.com/dagu-org/dagu/internal/service/frontend" "github.com/dagu-org/dagu/internal/service/resource" @@ -488,6 +490,53 @@ func (cfg LogConfig) LogDir() (string, error) { return logDir, nil } +// RecordEarlyFailure records a failure in the execution history before the DAG has fully started. +// This is used for infrastructure errors like singleton conflicts or process acquisition failures. +func (c *Context) RecordEarlyFailure(dag *core.DAG, dagRunID string, err error) error { + if dag == nil || dagRunID == "" { + return fmt.Errorf("DAG and dag-run ID are required to record failure") + } + + // 1. 
Check if a DAGRunAttempt already exists for the given run-id. + ref := execution.NewDAGRunRef(dag.Name, dagRunID) + attempt, findErr := c.DAGRunStore.FindAttempt(c, ref) + if findErr != nil && !errors.Is(findErr, execution.ErrDAGRunIDNotFound) { + return fmt.Errorf("failed to check for existing attempt: %w", findErr) + } + + if attempt == nil { + // 2. Create the attempt if not exists + att, createErr := c.DAGRunStore.CreateAttempt(c, dag, time.Now(), dagRunID, execution.NewDAGRunAttemptOptions{}) + if createErr != nil { + return fmt.Errorf("failed to create run to record failure: %w", createErr) + } + attempt = att + } + + // 3. Construct the "Failed" status + statusBuilder := transform.NewStatusBuilder(dag) + logPath, _ := c.GenLogFileName(dag, dagRunID) + status := statusBuilder.Create(dagRunID, core.Failed, 0, time.Now(), + transform.WithLogFilePath(logPath), + transform.WithFinishedAt(time.Now()), + transform.WithError(err.Error()), + ) + + // 4. Write the status + if err := attempt.Open(c); err != nil { + return fmt.Errorf("failed to open attempt for recording failure: %w", err) + } + defer func() { + _ = attempt.Close(c) + }() + + if err := attempt.Write(c, status); err != nil { + return fmt.Errorf("failed to write failed status: %w", err) + } + + return nil +} + // LogFile constructs the log filename using the prefix, safe DAG name, current timestamp, // and a truncated version of the dag-run ID. func (cfg LogConfig) LogFile() string { diff --git a/internal/cmd/exec.go b/internal/cmd/exec.go index 13d29159..d9d31386 100644 --- a/internal/cmd/exec.go +++ b/internal/cmd/exec.go @@ -1,9 +1,7 @@ package cmd import ( - "errors" "fmt" - "log/slog" "os" "strings" @@ -21,7 +19,6 @@ const ( flagWorkdir = "workdir" flagShell = "shell" flagBase = "base" - flagSingleton = "singleton" defaultStepName = "main" execCommandUsage = "exec [flags] -- [args...]" ) @@ -30,12 +27,9 @@ var ( execFlags = []commandLineFlag{ dagRunIDFlag, nameFlag, - queueFlag, - noQueueFlag, workdirFlag, shellFlag, baseFlag, - singletonFlag, } ) @@ -49,7 +43,7 @@ func Exec() *cobra.Command { Examples: dagu exec -- echo "hello world" dagu exec --env FOO=bar -- sh -c 'echo $FOO' - dagu exec --queue nightly --worker-label role=batch -- python nightly.py`, + dagu exec --worker-label role=batch -- python remote_script.py`, Args: cobra.ArbitraryArgs, } @@ -62,14 +56,12 @@ Examples: return command } -// runExec parses flags and arguments and executes the provided command as an inline DAG run, -// either enqueueing it for distributed execution or running it immediately in-process. -// It validates inputs (run-id, working directory, base and dotenv files, env vars, worker labels, -// queue/singleton flags), builds the DAG for the inline command, and chooses between enqueueing -// (when queues/worker labels require it or when max runs are reached) or direct execution. +// runExec parses flags and arguments and executes the provided command as an inline DAG run. +// It validates inputs (run-id, working directory, base and dotenv files, env vars, worker labels), +// builds the DAG for the inline command, and executes it locally. // ctx provides CLI and application context; args are the command and its arguments. // Returns an error for validation failures, when a dag-run with the same run-id already exists, -// or if enqueueing/execution fails. +// or if execution fails. 
func runExec(ctx *Context, args []string) error { if len(args) == 0 { return fmt.Errorf("command is required (try: dagu exec -- )") @@ -177,29 +169,10 @@ func runExec(ctx *Context, args []string) error { workerLabels[key] = value } - queueName, err := ctx.Command.Flags().GetString("queue") - if err != nil { - return fmt.Errorf("failed to read queue flag: %w", err) - } - noQueue, err := ctx.Command.Flags().GetBool("no-queue") - if err != nil { - return fmt.Errorf("failed to read no-queue flag: %w", err) - } - - singleton, err := ctx.Command.Flags().GetBool(flagSingleton) - if err != nil { - return fmt.Errorf("failed to read singleton flag: %w", err) - } - - queueDisabled := !ctx.Config.Queues.Enabled || noQueue if len(workerLabels) > 0 { if !ctx.Config.Queues.Enabled { return fmt.Errorf("worker selector requires queues; enable queues or remove --worker-label") } - if noQueue { - return fmt.Errorf("--worker-label cannot be combined with --no-queue") - } - queueDisabled = false } opts := ExecOptions{ @@ -210,9 +183,6 @@ func runExec(ctx *Context, args []string) error { Env: envVars, DotenvFiles: dotenvPaths, BaseConfig: baseConfig, - Queue: queueName, - NoQueue: noQueue, - Singleton: singleton, WorkerLabels: workerLabels, } @@ -223,28 +193,6 @@ func runExec(ctx *Context, args []string) error { dagRunRef := execution.NewDAGRunRef(dag.Name, runID) - if !queueDisabled && len(workerLabels) > 0 { - logger.Info(ctx, "Queueing inline dag-run for distributed execution", - tag.DAG(dag.Name), - tag.RunID(runID), - slog.Any("worker-selector", workerLabels), - tag.Command(strings.Join(args, " ")), - ) - dag.Location = "" - return enqueueDAGRun(ctx, dag, runID) - } - - if !queueDisabled && dag.Queue != "" { - logger.Info(ctx, "Queueing inline dag-run", - tag.DAG(dag.Name), - tag.Queue(dag.Queue), - tag.RunID(runID), - tag.Command(strings.Join(args, " ")), - ) - dag.Location = "" - return enqueueDAGRun(ctx, dag, runID) - } - attempt, _ := ctx.DAGRunStore.FindAttempt(ctx, dagRunRef) if attempt != nil { return fmt.Errorf("dag-run ID %s already exists for DAG %s", runID, dag.Name) @@ -256,16 +204,7 @@ func runExec(ctx *Context, args []string) error { tag.RunID(runID), ) - err = tryExecuteDAG(ctx, dag, runID, dagRunRef, false) - if errors.Is(err, errMaxRunReached) && !queueDisabled { - logger.Info(ctx, "Max active runs reached; enqueueing dag-run instead", - tag.DAG(dag.Name), - tag.RunID(runID), - ) - dag.Location = "" - return enqueueDAGRun(ctx, dag, runID) - } - return err + return tryExecuteDAG(ctx, dag, runID, dagRunRef) } var ( @@ -281,9 +220,4 @@ var ( name: flagBase, usage: "Path to a base DAG YAML whose defaults are applied before inline overrides", } - singletonFlag = commandLineFlag{ - name: flagSingleton, - usage: "Limit execution to a single active run (sets maxActiveRuns=1)", - isBool: true, - } ) diff --git a/internal/cmd/exec_spec.go b/internal/cmd/exec_spec.go index 3f34750c..ac82f2fa 100644 --- a/internal/cmd/exec_spec.go +++ b/internal/cmd/exec_spec.go @@ -22,9 +22,6 @@ type ExecOptions struct { Env []string DotenvFiles []string BaseConfig string - Queue string - NoQueue bool - Singleton bool WorkerLabels map[string]string } @@ -34,8 +31,6 @@ type execSpec struct { WorkingDir string `yaml:"workingDir,omitempty"` Env []string `yaml:"env,omitempty"` Dotenv []string `yaml:"dotenv,omitempty"` - MaxActiveRuns int `yaml:"maxActiveRuns,omitempty"` - Queue string `yaml:"queue,omitempty"` WorkerSelector map[string]string `yaml:"workerSelector,omitempty"` Steps []execStep `yaml:"steps"` } @@ -59,24 
+54,12 @@ func buildExecDAG(ctx *Context, opts ExecOptions) (*core.DAG, string, error) { return nil, "", fmt.Errorf("invalid DAG name: %w", err) } - maxActiveRuns := -1 - if opts.Singleton { - maxActiveRuns = 1 - } - - queueValue := "" - if opts.Queue != "" && !opts.NoQueue { - queueValue = opts.Queue - } - specDoc := execSpec{ Name: name, Type: core.TypeChain, WorkingDir: opts.WorkingDir, Env: opts.Env, Dotenv: opts.DotenvFiles, - MaxActiveRuns: maxActiveRuns, - Queue: queueValue, WorkerSelector: opts.WorkerLabels, Steps: []execStep{ { @@ -127,19 +110,10 @@ func buildExecDAG(ctx *Context, opts ExecOptions) (*core.DAG, string, error) { dag.Name = name dag.WorkingDir = opts.WorkingDir - if opts.Queue != "" && !opts.NoQueue { - dag.Queue = opts.Queue - } else if opts.NoQueue { - dag.Queue = "" - } if len(opts.WorkerLabels) > 0 { dag.WorkerSelector = opts.WorkerLabels } - if opts.Singleton { - dag.MaxActiveRuns = 1 - } else { - dag.MaxActiveRuns = -1 - } + dag.MaxActiveRuns = -1 dag.Location = "" return dag, string(specYAML), nil diff --git a/internal/cmd/flags.go b/internal/cmd/flags.go index 29e889d1..c277db6e 100644 --- a/internal/cmd/flags.go +++ b/internal/cmd/flags.go @@ -67,14 +67,6 @@ var ( usage: "Override the DAG name (default: name from DAG definition or filename)", } - // noQueueFlag is used to indicate that the dag-run should not be queued and should be executed immediately. - noQueueFlag = commandLineFlag{ - name: "no-queue", - usage: "Do not queue the dag-run, execute immediately", - isBool: true, - shorthand: "n", - } - // Unique dag-run ID required for retrying a dag-run. // This flag must be provided when using the retry command. dagRunIDFlagRetry = commandLineFlag{ @@ -92,15 +84,6 @@ var ( defaultValue: "", } - // noCheckMaxActiveRuns - disableMaxActiveRuns = commandLineFlag{ - name: "disable-max-active-runs", - shorthand: "", - usage: "Disable check for max active runs", - isBool: true, - defaultValue: "", - } - // Unique dag-run ID used for starting a new dag-run. // This is used to track and identify the execution instance and its status. 
dagRunIDFlag = commandLineFlag{ diff --git a/internal/cmd/record_early_failure_test.go b/internal/cmd/record_early_failure_test.go new file mode 100644 index 00000000..5cf8db18 --- /dev/null +++ b/internal/cmd/record_early_failure_test.go @@ -0,0 +1,182 @@ +package cmd_test + +import ( + "errors" + "testing" + "time" + + "github.com/dagu-org/dagu/internal/cmd" + "github.com/dagu-org/dagu/internal/core" + "github.com/dagu-org/dagu/internal/core/execution" + "github.com/dagu-org/dagu/internal/test" + "github.com/stretchr/testify/require" +) + +func TestRecordEarlyFailure(t *testing.T) { + t.Run("RecordsFailureForNewDAGRun", func(t *testing.T) { + th := test.SetupCommand(t) + + dag := th.DAG(t, ` +steps: + - name: step1 + command: echo hello +`) + + dagRunID := "test-run-id-001" + testErr := errors.New("process acquisition failed") + + // Create Context with required stores + ctx := &cmd.Context{ + Context: th.Context, + Config: th.Config, + DAGRunStore: th.DAGRunStore, + } + + // Record the early failure + err := ctx.RecordEarlyFailure(dag.DAG, dagRunID, testErr) + require.NoError(t, err) + + // Verify the failure was recorded + ref := execution.NewDAGRunRef(dag.Name, dagRunID) + attempt, err := th.DAGRunStore.FindAttempt(th.Context, ref) + require.NoError(t, err) + require.NotNil(t, attempt) + + status, err := attempt.ReadStatus(th.Context) + require.NoError(t, err) + require.Equal(t, core.Failed, status.Status) + require.Contains(t, status.Error, "process acquisition failed") + }) + + t.Run("RecordsFailureForExistingAttempt", func(t *testing.T) { + th := test.SetupCommand(t) + + dag := th.DAG(t, ` +steps: + - name: step1 + command: echo hello +`) + + // First, run the DAG to create an attempt + th.RunCommand(t, cmd.Start(), test.CmdTest{ + Args: []string{"start", dag.Location}, + }) + + // Get the existing run ID + latestStatus, err := th.DAGRunMgr.GetLatestStatus(th.Context, dag.DAG) + require.NoError(t, err) + dagRunID := latestStatus.DAGRunID + + // Now record an early failure for the same run ID + testErr := errors.New("retry failed due to lock contention") + + ctx := &cmd.Context{ + Context: th.Context, + Config: th.Config, + DAGRunStore: th.DAGRunStore, + } + + err = ctx.RecordEarlyFailure(dag.DAG, dagRunID, testErr) + require.NoError(t, err) + + // Verify the failure was recorded (status should be updated) + ref := execution.NewDAGRunRef(dag.Name, dagRunID) + attempt, err := th.DAGRunStore.FindAttempt(th.Context, ref) + require.NoError(t, err) + require.NotNil(t, attempt) + + status, err := attempt.ReadStatus(th.Context) + require.NoError(t, err) + require.Equal(t, core.Failed, status.Status) + require.Contains(t, status.Error, "retry failed due to lock contention") + }) + + t.Run("ReturnsErrorForNilDAG", func(t *testing.T) { + th := test.SetupCommand(t) + + ctx := &cmd.Context{ + Context: th.Context, + Config: th.Config, + DAGRunStore: th.DAGRunStore, + } + + err := ctx.RecordEarlyFailure(nil, "some-run-id", errors.New("test error")) + require.Error(t, err) + require.Contains(t, err.Error(), "DAG and dag-run ID are required") + }) + + t.Run("ReturnsErrorForEmptyDAGRunID", func(t *testing.T) { + th := test.SetupCommand(t) + + dag := th.DAG(t, ` +steps: + - name: step1 + command: echo hello +`) + + ctx := &cmd.Context{ + Context: th.Context, + Config: th.Config, + DAGRunStore: th.DAGRunStore, + } + + err := ctx.RecordEarlyFailure(dag.DAG, "", errors.New("test error")) + require.Error(t, err) + require.Contains(t, err.Error(), "DAG and dag-run ID are required") + }) + + 
t.Run("CanRetryEarlyFailureRecord", func(t *testing.T) { + th := test.SetupCommand(t) + + dag := th.DAG(t, ` +steps: + - name: step1 + command: echo hello +`) + + dagRunID := "early-failure-retry-test" + testErr := errors.New("initial process acquisition failed") + + // Create Context and record early failure + ctx := &cmd.Context{ + Context: th.Context, + Config: th.Config, + DAGRunStore: th.DAGRunStore, + } + + err := ctx.RecordEarlyFailure(dag.DAG, dagRunID, testErr) + require.NoError(t, err) + + // Verify initial failure status + ref := execution.NewDAGRunRef(dag.Name, dagRunID) + attempt, err := th.DAGRunStore.FindAttempt(th.Context, ref) + require.NoError(t, err) + require.NotNil(t, attempt) + + status, err := attempt.ReadStatus(th.Context) + require.NoError(t, err) + require.Equal(t, core.Failed, status.Status) + + // Verify DAG can be read back (required for retry) + storedDAG, err := attempt.ReadDAG(th.Context) + require.NoError(t, err) + require.NotNil(t, storedDAG) + require.Equal(t, dag.Name, storedDAG.Name) + + // Now retry the early failure record + th.RunCommand(t, cmd.Retry(), test.CmdTest{ + Args: []string{"retry", "--run-id", dagRunID, dag.Name}, + }) + + // Wait for retry to complete + require.Eventually(t, func() bool { + currentStatus, err := th.DAGRunMgr.GetCurrentStatus(th.Context, dag.DAG, dagRunID) + return err == nil && currentStatus != nil && currentStatus.Status == core.Succeeded + }, 5*time.Second, 100*time.Millisecond, "Retry should succeed") + + // Verify final status is succeeded + finalStatus, err := th.DAGRunMgr.GetCurrentStatus(th.Context, dag.DAG, dagRunID) + require.NoError(t, err) + require.Equal(t, core.Succeeded, finalStatus.Status) + }) +} diff --git a/internal/cmd/restart.go b/internal/cmd/restart.go index 70fe4329..8ae6a456 100644 --- a/internal/cmd/restart.go +++ b/internal/cmd/restart.go @@ -87,9 +87,9 @@ func runRestart(ctx *Context, args []string) error { return nil } -func handleRestartProcess(ctx *Context, d *core.DAG, dagRunID string) error { +func handleRestartProcess(ctx *Context, d *core.DAG, oldDagRunID string) error { // Stop if running - if err := stopDAGIfRunning(ctx, ctx.DAGRunMgr, d, dagRunID); err != nil { + if err := stopDAGIfRunning(ctx, ctx.DAGRunMgr, d, oldDagRunID); err != nil { return err } @@ -99,36 +99,40 @@ func handleRestartProcess(ctx *Context, d *core.DAG, dagRunID string) error { time.Sleep(d.RestartWait) } + // Generate new dag-run ID for the restart + newDagRunID, err := genRunID() + if err != nil { + return fmt.Errorf("failed to generate dag-run ID: %w", err) + } + // Execute the exact same DAG with the same parameters but a new dag-run ID if err := ctx.ProcStore.Lock(ctx, d.ProcGroup()); err != nil { logger.Debug(ctx, "Failed to lock process group", tag.Error(err)) - return errMaxRunReached + _ = ctx.RecordEarlyFailure(d, newDagRunID, err) + return errProcAcquisitionFailed } - defer ctx.ProcStore.Unlock(ctx, d.ProcGroup()) // Acquire process handle - proc, err := ctx.ProcStore.Acquire(ctx, d.ProcGroup(), execution.NewDAGRunRef(d.Name, dagRunID)) + proc, err := ctx.ProcStore.Acquire(ctx, d.ProcGroup(), execution.NewDAGRunRef(d.Name, newDagRunID)) if err != nil { + ctx.ProcStore.Unlock(ctx, d.ProcGroup()) logger.Debug(ctx, "Failed to acquire process handle", tag.Error(err)) - return fmt.Errorf("failed to acquire process handle: %w", errMaxRunReached) + _ = ctx.RecordEarlyFailure(d, newDagRunID, err) + return fmt.Errorf("failed to acquire process handle: %w", errProcAcquisitionFailed) } defer func() { _ = 
proc.Stop(ctx) }() - // Unlock the process group + // Unlock the process group immediately after acquiring the handle ctx.ProcStore.Unlock(ctx, d.ProcGroup()) - return executeDAG(ctx, ctx.DAGRunMgr, d) + return executeDAGWithRunID(ctx, ctx.DAGRunMgr, d, newDagRunID) } -// It returns an error if run ID generation, log or DAG store initialization, or agent execution fails. -func executeDAG(ctx *Context, cli runtime.Manager, dag *core.DAG) error { - dagRunID, err := genRunID() - if err != nil { - return fmt.Errorf("failed to generate dag-run ID: %w", err) - } - +// executeDAGWithRunID executes a DAG with a pre-generated run ID. +// It returns an error if log or DAG store initialization, or agent execution fails. +func executeDAGWithRunID(ctx *Context, cli runtime.Manager, dag *core.DAG, dagRunID string) error { logFile, err := ctx.OpenLogFile(dag, dagRunID) if err != nil { return fmt.Errorf("failed to initialize log file: %w", err) diff --git a/internal/cmd/retry.go b/internal/cmd/retry.go index 712090c2..b0420edc 100644 --- a/internal/cmd/retry.go +++ b/internal/cmd/retry.go @@ -2,18 +2,14 @@ package cmd import ( "fmt" - "log/slog" "path/filepath" - "time" "github.com/dagu-org/dagu/internal/common/fileutil" "github.com/dagu-org/dagu/internal/common/logger" "github.com/dagu-org/dagu/internal/common/logger/tag" - "github.com/dagu-org/dagu/internal/common/stringutil" "github.com/dagu-org/dagu/internal/core" "github.com/dagu-org/dagu/internal/core/execution" "github.com/dagu-org/dagu/internal/runtime/agent" - "github.com/dagu-org/dagu/internal/runtime/transform" "github.com/spf13/cobra" ) @@ -37,12 +33,12 @@ Examples: ) } -var retryFlags = []commandLineFlag{dagRunIDFlagRetry, stepNameForRetry, disableMaxActiveRuns, noQueueFlag} +var retryFlags = []commandLineFlag{dagRunIDFlagRetry, stepNameForRetry} func runRetry(ctx *Context, args []string) error { + // Extract retry details dagRunID, _ := ctx.StringParam("run-id") stepName, _ := ctx.StringParam("step") - disableMaxActiveRuns := ctx.Command.Flags().Changed("disable-max-active-runs") name, err := extractDAGName(ctx, args[0]) if err != nil { @@ -71,48 +67,18 @@ func runRetry(ctx *Context, args []string) error { // Set DAG context for all logs ctx.Context = logger.WithValues(ctx.Context, tag.DAG(dag.Name), tag.RunID(dagRunID)) - // Check if queue is disabled via config or flag - queueDisabled := !ctx.Config.Queues.Enabled || ctx.Command.Flags().Changed("no-queue") - - // Check if this DAG should be distributed to workers - // If the DAG has a workerSelector and the queue is not disabled, - // enqueue it so the scheduler can dispatch it to a worker. - // The --no-queue flag acts as a circuit breaker to prevent infinite loops - // when the worker executes the dispatched retry task. 
- if !queueDisabled && len(dag.WorkerSelector) > 0 { - logger.Info(ctx, "DAG has workerSelector, enqueueing retry for distributed execution", slog.Any("worker-selector", dag.WorkerSelector)) - - // Enqueue the retry - must create new attempt with status "Queued" - // so the scheduler will process it - if err := enqueueRetry(ctx, dag, dagRunID); err != nil { - return fmt.Errorf("failed to enqueue retry: %w", err) - } - - logger.Info(ctx.Context, "Retry enqueued") - return nil - } - // Try lock proc store to avoid race if err := ctx.ProcStore.Lock(ctx, dag.ProcGroup()); err != nil { return fmt.Errorf("failed to lock process group: %w", err) } - defer ctx.ProcStore.Unlock(ctx, dag.ProcGroup()) - - if !disableMaxActiveRuns && dag.MaxActiveRuns == 1 { - liveCount, err := ctx.ProcStore.CountAliveByDAGName(ctx, dag.ProcGroup(), dag.Name) - if err != nil { - return fmt.Errorf("failed to access proc store: %w", err) - } - if liveCount > 0 { - return fmt.Errorf("DAG %s is already running, cannot retry", dag.Name) - } - } // Acquire process handle proc, err := ctx.ProcStore.Acquire(ctx, dag.ProcGroup(), execution.NewDAGRunRef(dag.Name, dagRunID)) if err != nil { + ctx.ProcStore.Unlock(ctx, dag.ProcGroup()) logger.Debug(ctx, "Failed to acquire process handle", tag.Error(err)) - return fmt.Errorf("failed to acquire process handle: %w", errMaxRunReached) + _ = ctx.RecordEarlyFailure(dag, dagRunID, err) + return fmt.Errorf("failed to acquire process handle: %w", errProcAcquisitionFailed) } defer func() { _ = proc.Stop(ctx) @@ -181,65 +147,3 @@ func executeRetry(ctx *Context, dag *core.DAG, status *execution.DAGRunStatus, r // Use the shared agent execution function return ExecuteAgent(ctx, agentInstance, dag, status.DAGRunID, logFile) } - -// enqueueRetry creates a new attempt for retry and enqueues it for execution -func enqueueRetry(ctx *Context, dag *core.DAG, dagRunID string) error { - // Queued dag-runs must not have a location because it is used to generate - // unix pipe. If two DAGs has same location, they can not run at the same time. - // Queued DAGs can be run at the same time depending on the `maxActiveRuns` setting. - dag.Location = "" - - // Check if queues are enabled - if !ctx.Config.Queues.Enabled { - return fmt.Errorf("queues are disabled in configuration") - } - - // Create a new attempt for retry - att, err := ctx.DAGRunStore.CreateAttempt(ctx.Context, dag, time.Now(), dagRunID, execution.NewDAGRunAttemptOptions{ - Retry: true, - }) - if err != nil { - return fmt.Errorf("failed to create retry attempt: %w", err) - } - - // Generate log file name - logFile, err := ctx.GenLogFileName(dag, dagRunID) - if err != nil { - return fmt.Errorf("failed to generate log file name: %w", err) - } - - // Create status for the new attempt with "Queued" status - opts := []transform.StatusOption{ - transform.WithLogFilePath(logFile), - transform.WithAttemptID(att.ID()), - transform.WithPreconditions(dag.Preconditions), - transform.WithQueuedAt(stringutil.FormatTime(time.Now())), - transform.WithHierarchyRefs( - execution.NewDAGRunRef(dag.Name, dagRunID), - execution.DAGRunRef{}, - ), - } - - dagStatus := transform.NewStatusBuilder(dag).Create(dagRunID, core.Queued, 0, time.Time{}, opts...) 
- - // Write the status - if err := att.Open(ctx.Context); err != nil { - return fmt.Errorf("failed to open attempt: %w", err) - } - defer func() { - _ = att.Close(ctx.Context) - }() - if err := att.Write(ctx.Context, dagStatus); err != nil { - return fmt.Errorf("failed to save status: %w", err) - } - - // Enqueue the DAG run - dagRun := execution.NewDAGRunRef(dag.Name, dagRunID) - if err := ctx.QueueStore.Enqueue(ctx.Context, dag.ProcGroup(), execution.QueuePriorityLow, dagRun); err != nil { - return fmt.Errorf("failed to enqueue: %w", err) - } - - logger.Info(ctx, "Retry attempt created and enqueued", tag.AttemptID(att.ID())) - - return nil -} diff --git a/internal/cmd/start.go b/internal/cmd/start.go index 0647cd70..715d1381 100644 --- a/internal/cmd/start.go +++ b/internal/cmd/start.go @@ -57,7 +57,7 @@ This command parses the DAG definition, resolves parameters, and initiates the D } // Command line flags for the start command -var startFlags = []commandLineFlag{paramsFlag, nameFlag, dagRunIDFlag, fromRunIDFlag, parentDAGRunFlag, rootDAGRunFlag, noQueueFlag, disableMaxActiveRuns, defaultWorkingDirFlag} +var startFlags = []commandLineFlag{paramsFlag, nameFlag, dagRunIDFlag, fromRunIDFlag, parentDAGRunFlag, rootDAGRunFlag, defaultWorkingDirFlag} var fromRunIDFlag = commandLineFlag{ name: "from-run-id", @@ -81,8 +81,6 @@ func runStart(ctx *Context, args []string) error { return fmt.Errorf("--from-run-id cannot be combined with --parent or --root") } - disableMaxActiveRuns := ctx.Command.Flags().Changed("disable-max-active-runs") - var ( dag *core.DAG params string @@ -157,14 +155,6 @@ func runStart(ctx *Context, args []string) error { return handleSubDAGRun(ctx, dag, dagRunID, params, root, parent) } - // Check if queue is disabled via config or flag - queueDisabled := !ctx.Config.Queues.Enabled - - // check no-queue flag (overrides config) - if ctx.Command.Flags().Changed("no-queue") { - queueDisabled = true - } - // Check if the DAG run-id is unique attempt, _ := ctx.DAGRunStore.FindAttempt(ctx, root) if attempt != nil { @@ -172,15 +162,6 @@ func runStart(ctx *Context, args []string) error { return fmt.Errorf("dag-run ID %s already exists for DAG %s", dagRunID, dag.Name) } - // Count running DAG to check against maxActiveRuns setting (best effort). - liveCount, err := ctx.ProcStore.CountAliveByDAGName(ctx, dag.ProcGroup(), dag.Name) - if err != nil { - return fmt.Errorf("failed to access proc store: %w", err) - } - if !disableMaxActiveRuns && dag.MaxActiveRuns == 1 && liveCount > 0 { - return fmt.Errorf("DAG %s is already running, cannot start", dag.Name) - } - // Log root dag-run or reschedule action if fromRunID != "" { logger.Info(ctx, "Rescheduling dag-run", @@ -191,65 +172,36 @@ func runStart(ctx *Context, args []string) error { logger.Info(ctx, "Executing root dag-run", slog.String("params", params)) } - // Check if this DAG should be distributed to workers - // If the DAG has a workerSelector and the queue is not disabled, - // enqueue it so the scheduler can dispatch it to a worker. - // The --no-queue flag acts as a circuit breaker to prevent infinite loops - // when the worker executes the dispatched task. 
- if !queueDisabled && len(dag.WorkerSelector) > 0 { - logger.Info(ctx, "DAG has workerSelector, enqueueing for distributed execution", slog.Any("worker-selector", dag.WorkerSelector)) - dag.Location = "" // Queued dag-runs must not have a location - return enqueueDAGRun(ctx, dag, dagRunID) - } - - err = tryExecuteDAG(ctx, dag, dagRunID, root, disableMaxActiveRuns) - if errors.Is(err, errMaxRunReached) && !queueDisabled && !disableMaxActiveRuns { - dag.Location = "" // Queued dag-runs must not have a location - return enqueueDAGRun(ctx, dag, dagRunID) - } - - return err // return executed result + return tryExecuteDAG(ctx, dag, dagRunID, root) } var ( - errMaxRunReached = errors.New("max run reached") + errProcAcquisitionFailed = errors.New("failed to acquire process handle") ) -// tryExecuteDAG tries to run the DAG within the max concurrent run config -func tryExecuteDAG(ctx *Context, dag *core.DAG, dagRunID string, root execution.DAGRunRef, disableMaxActiveRuns bool) error { +// tryExecuteDAG tries to run the DAG. +func tryExecuteDAG(ctx *Context, dag *core.DAG, dagRunID string, root execution.DAGRunRef) error { if err := ctx.ProcStore.Lock(ctx, dag.ProcGroup()); err != nil { logger.Debug(ctx, "Failed to lock process group", tag.Error(err)) - return errMaxRunReached - } - defer ctx.ProcStore.Unlock(ctx, dag.ProcGroup()) - - if !disableMaxActiveRuns { - runningCount, err := ctx.ProcStore.CountAlive(ctx, dag.ProcGroup()) - if err != nil { - logger.Debug(ctx, "Failed to count live processes", tag.Error(err)) - return fmt.Errorf("failed to count live process for %s: %w", dag.ProcGroup(), errMaxRunReached) - } - - // If the DAG has a queue configured and maxActiveRuns > 0, ensure the number - // of active runs in the queue does not exceed this limit. - if dag.MaxActiveRuns > 0 && runningCount >= dag.MaxActiveRuns { - // It's not possible to run right now. - return fmt.Errorf("max active run is reached (%d >= %d): %w", runningCount, dag.MaxActiveRuns, errMaxRunReached) - } + _ = ctx.RecordEarlyFailure(dag, dagRunID, err) + return errProcAcquisitionFailed } // Acquire process handle proc, err := ctx.ProcStore.Acquire(ctx, dag.ProcGroup(), execution.NewDAGRunRef(dag.Name, dagRunID)) if err != nil { + ctx.ProcStore.Unlock(ctx, dag.ProcGroup()) logger.Debug(ctx, "Failed to acquire process handle", tag.Error(err)) - return fmt.Errorf("failed to acquire process handle: %w", errMaxRunReached) + _ = ctx.RecordEarlyFailure(dag, dagRunID, err) + return fmt.Errorf("failed to acquire process handle: %w", errProcAcquisitionFailed) } defer func() { _ = proc.Stop(ctx) }() ctx.Proc = proc - // Unlock the process group + // Unlock the process group immediately after acquiring the handle + // to allow other instances of the same DAG to start. 
ctx.ProcStore.Unlock(ctx, dag.ProcGroup()) return executeDAGRun(ctx, dag, execution.DAGRunRef{}, dagRunID, root) diff --git a/internal/core/execution/runstatus.go b/internal/core/execution/runstatus.go index 96ac667e..1cb30b0b 100644 --- a/internal/core/execution/runstatus.go +++ b/internal/core/execution/runstatus.go @@ -51,6 +51,7 @@ type DAGRunStatus struct { StartedAt string `json:"startedAt,omitempty"` FinishedAt string `json:"finishedAt,omitempty"` Log string `json:"log,omitempty"` + Error string `json:"error,omitempty"` Params string `json:"params,omitempty"` ParamsList []string `json:"paramsList,omitempty"` Preconditions []*core.Condition `json:"preconditions,omitempty"` @@ -64,6 +65,9 @@ func (st *DAGRunStatus) DAGRun() DAGRunRef { // Errors returns a slice of errors for the current status func (st *DAGRunStatus) Errors() []error { var errs []error + if st.Error != "" { + errs = append(errs, fmt.Errorf("%s", st.Error)) + } for _, node := range st.Nodes { if node.Error != "" { errs = append(errs, fmt.Errorf("node %s: %s", node.Step.Name, node.Error)) diff --git a/internal/integration/distributed_e2e_test.go b/internal/integration/distributed_e2e_test.go index 1a9149e9..52a87da9 100644 --- a/internal/integration/distributed_e2e_test.go +++ b/internal/integration/distributed_e2e_test.go @@ -50,15 +50,15 @@ steps: // Load the DAG dagWrapper := coord.DAG(t, yamlContent) - // Build the start command spec + // Build the enqueue command spec subCmdBuilder := runtime.NewSubCmdBuilder(coord.Config) - startSpec := subCmdBuilder.Start(dagWrapper.DAG, runtime.StartOptions{ + enqueueSpec := subCmdBuilder.Enqueue(dagWrapper.DAG, runtime.EnqueueOptions{ Quiet: true, }) - // Execute the start command (spawns subprocess) - err := runtime.Start(coord.Context, startSpec) - require.NoError(t, err, "Start command should succeed") + // Execute the enqueue command (spawns subprocess) + err := runtime.Start(coord.Context, enqueueSpec) + require.NoError(t, err, "Enqueue command should succeed") // Wait for the subprocess to complete enqueueing require.Eventually(t, func() bool { @@ -138,10 +138,10 @@ steps: t.Log("E2E test completed successfully!") }) - t.Run("E2E_StartCommand_WithNoQueueFlag_ShouldExecuteDirectly", func(t *testing.T) { - // Verify that --no-queue flag bypasses enqueueing even when workerSelector exists + t.Run("E2E_StartCommand_WorkerSelector_ShouldExecuteLocally", func(t *testing.T) { + // Verify that dagu start always executes locally even when workerSelector exists yamlContent := ` -name: no-queue-dag +name: local-start-dag workerSelector: test: value steps: @@ -154,11 +154,10 @@ steps: // Load the DAG dagWrapper := coord.DAG(t, yamlContent) - // Build start command WITH --no-queue flag + // Build start command subCmdBuilder := runtime.NewSubCmdBuilder(coord.Config) startSpec := subCmdBuilder.Start(dagWrapper.DAG, runtime.StartOptions{ - Quiet: true, - NoQueue: true, // This bypasses enqueueing + Quiet: true, }) err := runtime.Start(ctx, startSpec) @@ -170,7 +169,7 @@ steps: // Should NOT be enqueued (executed directly) queueItems, err := coord.QueueStore.ListByDAGName(ctx, dagWrapper.ProcGroup(), dagWrapper.Name) require.NoError(t, err) - require.Len(t, queueItems, 0, "DAG should NOT be enqueued when --no-queue is set") + require.Len(t, queueItems, 0, "DAG should NOT be enqueued (dagu start runs locally)") }) t.Run("E2E_DistributedExecution_Cancellation_SubDAG", func(t *testing.T) { diff --git a/internal/integration/distributed_test.go b/internal/integration/distributed_test.go index 
548fc9c8..24e349e3 100644 --- a/internal/integration/distributed_test.go +++ b/internal/integration/distributed_test.go @@ -5,20 +5,12 @@ import ( "time" "github.com/dagu-org/dagu/internal/core" - "github.com/dagu-org/dagu/internal/core/execution" "github.com/dagu-org/dagu/internal/runtime" "github.com/dagu-org/dagu/internal/test" "github.com/stretchr/testify/require" ) -// TestStartCommandWithWorkerSelector tests that the start command enqueues -// DAGs with workerSelector instead of executing them locally, and that the -// scheduler dispatches them to workers correctly. -// -// This is the integration test for the distributed execution fix where: -// 1. start command checks for workerSelector → enqueues (instead of executing) -// 2. Scheduler queue handler picks it up → dispatches to coordinator -// 3. Worker executes with --no-queue flag → executes directly (no re-enqueue) +// 3. Worker executes directly as ordered by the scheduler (no re-enqueue) func TestStartCommandWithWorkerSelector(t *testing.T) { t.Run("StartCommand_WithWorkerSelector_ShouldEnqueue", func(t *testing.T) { // This test verifies that when a DAG has workerSelector, @@ -53,16 +45,16 @@ steps: err := runtime.Start(coord.Context, startSpec) require.NoError(t, err, "Start command should succeed") - // Wait for the DAG to be enqueued + // Wait for completion (executed locally) require.Eventually(t, func() bool { - queueItems, err := coord.QueueStore.ListByDAGName(coord.Context, dagWrapper.ProcGroup(), dagWrapper.Name) - return err == nil && len(queueItems) == 1 - }, 2*time.Second, 50*time.Millisecond, "DAG should be enqueued") + status, err := coord.DAGRunMgr.GetLatestStatus(coord.Context, dagWrapper.DAG) + return err == nil && status.Status == core.Succeeded + }, 2*time.Second, 50*time.Millisecond, "DAG should complete successfully") - // Verify the DAG was enqueued (not executed locally) + // Verify the DAG was NOT enqueued (executed locally) queueItems, err := coord.QueueStore.ListByDAGName(coord.Context, dagWrapper.ProcGroup(), dagWrapper.Name) require.NoError(t, err) - require.Len(t, queueItems, 1, "DAG should be enqueued once") + require.Len(t, queueItems, 0, "DAG should NOT be enqueued (dagu start runs locally)") if len(queueItems) > 0 { data, err := queueItems[0].Data() @@ -70,18 +62,18 @@ steps: t.Logf("DAG enqueued: dag=%s runId=%s", data.Name, data.ID) } - // Verify the DAG status is "queued" (not started/running) + // Verify the DAG status is "succeeded" (executed locally) latest, err := coord.DAGRunMgr.GetLatestStatus(coord.Context, dagWrapper.DAG) require.NoError(t, err) - require.Equal(t, core.Queued, latest.Status, "DAG status should be queued") + require.Equal(t, core.Succeeded, latest.Status, "DAG status should be succeeded") }) - t.Run("StartCommand_WithNoQueueFlag_ShouldExecuteDirectly", func(t *testing.T) { - // Verify that --no-queue flag bypasses enqueueing + t.Run("StartCommand_WorkerSelector_ShouldExecuteLocally", func(t *testing.T) { + // Verify that dagu start always executes locally // even when workerSelector exists yamlContent := ` -name: no-queue-dag +name: local-start-dag workerSelector: test: value steps: @@ -94,11 +86,10 @@ steps: // Load the DAG dagWrapper := coord.DAG(t, yamlContent) - // Build start command WITH --no-queue flag + // Build start command subCmdBuilder := runtime.NewSubCmdBuilder(coord.Config) startSpec := subCmdBuilder.Start(dagWrapper.DAG, runtime.StartOptions{ - Quiet: true, - NoQueue: true, // This bypasses enqueueing + Quiet: true, }) err := runtime.Start(ctx, 
startSpec) @@ -107,7 +98,7 @@ steps: // Should NOT be enqueued (executed directly) queueItems, err := coord.QueueStore.ListByDAGName(ctx, dagWrapper.ProcGroup(), dagWrapper.Name) require.NoError(t, err) - require.Len(t, queueItems, 0, "DAG should NOT be enqueued when --no-queue is set") + require.Len(t, queueItems, 0, "DAG should NOT be enqueued (dagu start runs locally)") // Verify it succeeded (executed locally) dagWrapper.AssertLatestStatus(t, core.Succeeded) @@ -140,54 +131,39 @@ steps: Quiet: true, }) + // Execute the start command (runs locally now) err := runtime.Start(coord.Context, startSpec) - require.NoError(t, err, "Start command should succeed") + require.NoError(t, err, "Start command should succeed (process started)") - // Wait for the DAG to be enqueued + // Wait for completion (executed locally) require.Eventually(t, func() bool { - queueItems, err := coord.QueueStore.ListByDAGName(coord.Context, dagWrapper.ProcGroup(), dagWrapper.Name) - return err == nil && len(queueItems) == 1 - }, 2*time.Second, 50*time.Millisecond, "DAG should be enqueued") + status, err := coord.DAGRunMgr.GetLatestStatus(coord.Context, dagWrapper.DAG) + return err == nil && status.Status == core.Failed + }, 5*time.Second, 100*time.Millisecond, "DAG should fail") - // Verify the DAG was enqueued + // Should NOT be enqueued queueItems, err := coord.QueueStore.ListByDAGName(coord.Context, dagWrapper.ProcGroup(), dagWrapper.Name) require.NoError(t, err) - require.Len(t, queueItems, 1, "DAG should be enqueued once") + require.Len(t, queueItems, 0, "DAG should NOT be enqueued (dagu start runs locally)") - var dagRunID string - var dagRun execution.DAGRunRef - if len(queueItems) > 0 { - data, err := queueItems[0].Data() - require.NoError(t, err, "Should be able to get queue item data") - dagRunID = data.ID - dagRun = *data - t.Logf("DAG enqueued: dag=%s runId=%s", data.Name, data.ID) - } - - // Dequeue it to simulate processing - _, err = coord.QueueStore.DequeueByDAGRunID(coord.Context, dagWrapper.ProcGroup(), dagRun) + status, err := coord.DAGRunMgr.GetLatestStatus(coord.Context, dagWrapper.DAG) require.NoError(t, err) + dagRunID := status.DAGRunID + t.Logf("DAG failed: dag=%s runId=%s", status.Name, status.DAGRunID) - // Now retry the DAG - it should be enqueued again - retrySpec := subCmdBuilder.Retry(dagWrapper.DAG, dagRunID, "", false) - err = runtime.Run(coord.Context, retrySpec) - require.NoError(t, err, "Retry command should succeed") + // Now retry the DAG - it should run locally + retrySpec := subCmdBuilder.Retry(dagWrapper.DAG, dagRunID, "") + err = runtime.Start(coord.Context, retrySpec) + require.NoError(t, err, "Retry command should succeed (process started)") - // Wait for the retry to be enqueued + // Wait for completion require.Eventually(t, func() bool { - queueItems, err := coord.QueueStore.ListByDAGName(coord.Context, dagWrapper.ProcGroup(), dagWrapper.Name) - return err == nil && len(queueItems) == 1 - }, 2*time.Second, 50*time.Millisecond, "Retry should be enqueued") + status, err := coord.DAGRunMgr.GetLatestStatus(coord.Context, dagWrapper.DAG) + return err == nil && status.Status == core.Failed + }, 5*time.Second, 100*time.Millisecond, "Retry should fail") - // Verify the retry was enqueued + // Should NOT be enqueued queueItems, err = coord.QueueStore.ListByDAGName(coord.Context, dagWrapper.ProcGroup(), dagWrapper.Name) require.NoError(t, err) - require.Len(t, queueItems, 1, "Retry should be enqueued once") - - if len(queueItems) > 0 { - data, err := queueItems[0].Data() - 
require.NoError(t, err, "Should be able to get queue item data") - require.Equal(t, dagRunID, data.ID, "Should have same DAG run ID") - t.Logf("Retry enqueued: dag=%s runId=%s", data.Name, data.ID) - } + require.Len(t, queueItems, 0, "Retry should NOT be enqueued (dagu retry runs locally)") } diff --git a/internal/integration/gha_test.go b/internal/integration/gha_test.go index 344681d3..47bfdc7d 100644 --- a/internal/integration/gha_test.go +++ b/internal/integration/gha_test.go @@ -20,7 +20,7 @@ func TestGitHubActionsExecutor(t *testing.T) { executor: type: github_action config: - runner: node:24-bookworm + runner: node:22-bookworm params: who-to-greet: "Morning" output: ACTION_OUTPUT diff --git a/internal/runtime/executor/dag_runner.go b/internal/runtime/executor/dag_runner.go index ba3595eb..4c429a13 100644 --- a/internal/runtime/executor/dag_runner.go +++ b/internal/runtime/executor/dag_runner.go @@ -121,8 +121,6 @@ func (e *SubDAGExecutor) buildCommand(ctx context.Context, runParams RunParams, fmt.Sprintf("--root=%s", rCtx.RootDAGRun.String()), fmt.Sprintf("--parent=%s", rCtx.DAGRunRef().String()), fmt.Sprintf("--run-id=%s", runParams.RunID), - "--no-queue", - "--disable-max-active-runs", } if workDir != "" { args = append(args, fmt.Sprintf("--default-working-dir=%s", workDir)) diff --git a/internal/runtime/executor/dag_runner_test.go b/internal/runtime/executor/dag_runner_test.go index d93eb062..3d9085c2 100644 --- a/internal/runtime/executor/dag_runner_test.go +++ b/internal/runtime/executor/dag_runner_test.go @@ -193,7 +193,6 @@ func TestBuildCommand(t *testing.T) { assert.Contains(t, args, "--root=parent:root-123") assert.Contains(t, args, "--parent=parent:parent-456") assert.Contains(t, args, "--run-id=child-789") - assert.Contains(t, args, "--no-queue") assert.Contains(t, args, "/path/to/test.yaml") assert.Contains(t, args, "--") assert.Contains(t, args, "param1=value1 param2=value2") diff --git a/internal/runtime/subcmd.go b/internal/runtime/subcmd.go index f073155c..e4437b18 100644 --- a/internal/runtime/subcmd.go +++ b/internal/runtime/subcmd.go @@ -42,9 +42,7 @@ func (b *SubCmdBuilder) Start(dag *core.DAG, opts StartOptions) CmdSpec { if opts.Quiet { args = append(args, "-q") } - if opts.NoQueue { - args = append(args, "--no-queue") - } + if opts.DAGRunID != "" { args = append(args, fmt.Sprintf("--run-id=%s", opts.DAGRunID)) } @@ -141,15 +139,13 @@ func (b *SubCmdBuilder) Restart(dag *core.DAG, opts RestartOptions) CmdSpec { } // Retry creates a retry command spec. 
-func (b *SubCmdBuilder) Retry(dag *core.DAG, dagRunID string, stepName string, disableMaxActiveRuns bool) CmdSpec { +func (b *SubCmdBuilder) Retry(dag *core.DAG, dagRunID string, stepName string) CmdSpec { args := []string{"retry", fmt.Sprintf("--run-id=%s", dagRunID), "-q"} if stepName != "" { args = append(args, fmt.Sprintf("--step=%s", stepName)) } - if disableMaxActiveRuns { - args = append(args, "--disable-max-active-runs") - } + if b.configFile != "" { args = append(args, "--config", b.configFile) } @@ -174,7 +170,7 @@ func (b *SubCmdBuilder) TaskStart(task *coordinatorv1.Task) CmdSpec { args = append(args, fmt.Sprintf("--parent=%s:%s", task.ParentDagRunName, task.ParentDagRunId)) } - args = append(args, fmt.Sprintf("--run-id=%s", task.DagRunId), "--no-queue", "--disable-max-active-runs") + args = append(args, fmt.Sprintf("--run-id=%s", task.DagRunId)) if b.configFile != "" { args = append(args, "--config", b.configFile) @@ -194,7 +190,7 @@ func (b *SubCmdBuilder) TaskStart(task *coordinatorv1.Task) CmdSpec { // TaskRetry creates a retry command spec for coordinator tasks. func (b *SubCmdBuilder) TaskRetry(task *coordinatorv1.Task) CmdSpec { - args := []string{"retry", fmt.Sprintf("--run-id=%s", task.DagRunId), "--no-queue", "--disable-max-active-runs", "-q"} + args := []string{"retry", fmt.Sprintf("--run-id=%s", task.DagRunId), "-q"} if task.Step != "" { args = append(args, fmt.Sprintf("--step=%s", task.Step)) @@ -224,10 +220,10 @@ type CmdSpec struct { // StartOptions contains options for initiating a dag-run. type StartOptions struct { - Params string // Parameters to pass to the DAG - Quiet bool // Whether to run in quiet mode - DAGRunID string // ID for the dag-run - NoQueue bool // Do not allow queueing + Params string // Parameters to pass to the DAG + Quiet bool // Whether to run in quiet mode + DAGRunID string // ID for the dag-run + NameOverride string // Optional DAG name override FromRunID string // Historic dag-run ID to use as a template Target string // Optional CLI argument override (DAG name or file path) diff --git a/internal/runtime/subcmd_test.go b/internal/runtime/subcmd_test.go index 834dc634..80961e88 100644 --- a/internal/runtime/subcmd_test.go +++ b/internal/runtime/subcmd_test.go @@ -79,16 +79,6 @@ func TestStart(t *testing.T) { assert.Contains(t, spec.Args, "-q") }) - t.Run("StartWithNoQueue", func(t *testing.T) { - t.Parallel() - opts := runtime.StartOptions{ - NoQueue: true, - } - spec := builder.Start(dag, opts) - - assert.Contains(t, spec.Args, "--no-queue") - }) - t.Run("StartWithDAGRunID", func(t *testing.T) { t.Parallel() opts := runtime.StartOptions{ @@ -104,7 +94,6 @@ func TestStart(t *testing.T) { opts := runtime.StartOptions{ Params: "env=prod", Quiet: true, - NoQueue: true, DAGRunID: "full-test-id", } spec := builder.Start(dag, opts) @@ -113,7 +102,6 @@ func TestStart(t *testing.T) { assert.Contains(t, spec.Args, "-p") assert.Contains(t, spec.Args, `"env=prod"`) assert.Contains(t, spec.Args, "-q") - assert.Contains(t, spec.Args, "--no-queue") assert.Contains(t, spec.Args, "--run-id=full-test-id") assert.Contains(t, spec.Args, "--config") assert.Contains(t, spec.Args, "/path/to/dag.yaml") @@ -365,7 +353,7 @@ func TestRetry(t *testing.T) { t.Run("BasicRetry", func(t *testing.T) { t.Parallel() - spec := builder.Retry(dag, "retry-run-id", "", false) + spec := builder.Retry(dag, "retry-run-id", "") assert.Equal(t, "/usr/bin/dagu", spec.Executable) assert.Contains(t, spec.Args, "retry") @@ -377,26 +365,18 @@ func TestRetry(t *testing.T) { 
t.Run("RetryWithStepName", func(t *testing.T) { t.Parallel() - spec := builder.Retry(dag, "retry-run-id", "step-1", false) + spec := builder.Retry(dag, "retry-run-id", "step-1") assert.Contains(t, spec.Args, "--step=step-1") }) - t.Run("RetryWithDisableMaxActiveRuns", func(t *testing.T) { - t.Parallel() - spec := builder.Retry(dag, "retry-run-id", "", true) - - assert.Contains(t, spec.Args, "--disable-max-active-runs") - }) - t.Run("RetryWithAllOptions", func(t *testing.T) { t.Parallel() - spec := builder.Retry(dag, "full-retry-id", "step-2", true) + spec := builder.Retry(dag, "full-retry-id", "step-2") assert.Contains(t, spec.Args, "retry") assert.Contains(t, spec.Args, "--run-id=full-retry-id") assert.Contains(t, spec.Args, "--step=step-2") - assert.Contains(t, spec.Args, "--disable-max-active-runs") assert.Contains(t, spec.Args, "test-dag") }) @@ -408,7 +388,7 @@ func TestRetry(t *testing.T) { }, } builderNoFile := runtime.NewSubCmdBuilder(cfgNoFile) - spec := builderNoFile.Retry(dag, "retry-run-id", "", false) + spec := builderNoFile.Retry(dag, "retry-run-id", "") assert.NotContains(t, spec.Args, "--config") }) @@ -436,7 +416,7 @@ func TestTaskStart(t *testing.T) { assert.Equal(t, "/usr/bin/dagu", spec.Executable) assert.Contains(t, spec.Args, "start") assert.Contains(t, spec.Args, "--run-id=task-run-id") - assert.Contains(t, spec.Args, "--no-queue") + assert.Contains(t, spec.Args, "--config") assert.Contains(t, spec.Args, "/etc/dagu/config.yaml") assert.Contains(t, spec.Args, "/path/to/task.yaml") @@ -457,7 +437,7 @@ func TestTaskStart(t *testing.T) { assert.Contains(t, spec.Args, "--root=root-dag:root-id") assert.Contains(t, spec.Args, "--parent=parent-dag:parent-id") assert.Contains(t, spec.Args, "--run-id=child-run-id") - assert.Contains(t, spec.Args, "--no-queue") + }) t.Run("TaskStartWithParams", func(t *testing.T) { diff --git a/internal/runtime/transform/status.go b/internal/runtime/transform/status.go index 3309ecfc..fab1f975 100644 --- a/internal/runtime/transform/status.go +++ b/internal/runtime/transform/status.go @@ -122,6 +122,13 @@ func WithLogFilePath(logFilePath string) StatusOption { } } +// WithError returns a StatusOption that sets the top-level error message +func WithError(err string) StatusOption { + return func(s *execution.DAGRunStatus) { + s.Error = err + } +} + // WithPreconditions returns a StatusOption that sets the preconditions func WithPreconditions(conditions []*core.Condition) StatusOption { return func(s *execution.DAGRunStatus) { diff --git a/internal/service/frontend/api/v1/dags.go b/internal/service/frontend/api/v1/dags.go index 86e2d3b5..b9a60023 100644 --- a/internal/service/frontend/api/v1/dags.go +++ b/internal/service/frontend/api/v1/dags.go @@ -597,14 +597,14 @@ func (a *API) PostDAGAction(ctx context.Context, request api.PostDAGActionReques } } if request.Body.Step != nil && *request.Body.Step != "" { - spec := a.subCmdBuilder.Retry(dag, *request.Body.RequestId, *request.Body.Step, true) + spec := a.subCmdBuilder.Retry(dag, *request.Body.RequestId, *request.Body.Step) if err := runtime.Start(ctx, spec); err != nil { return nil, fmt.Errorf("error retrying DAG step: %w", err) } return api.PostDAGAction200JSONResponse{}, nil } - spec := a.subCmdBuilder.Retry(dag, *request.Body.RequestId, "", false) + spec := a.subCmdBuilder.Retry(dag, *request.Body.RequestId, "") if err := runtime.Start(ctx, spec); err != nil { return nil, fmt.Errorf("error retrying DAG: %w", err) } diff --git a/internal/service/frontend/api/v2/dagruns.go 
b/internal/service/frontend/api/v2/dagruns.go
index fb5aef7f..60ab72dd 100644
--- a/internal/service/frontend/api/v2/dagruns.go
+++ b/internal/service/frontend/api/v2/dagruns.go
@@ -64,7 +64,7 @@ func (a *API) ExecuteDAGRunFromSpec(ctx context.Context, request api.ExecuteDAGR
 	}
 	defer cleanup()
 
-	if err := a.ensureDAGRunStartable(ctx, dag, dagRunId, singleton); err != nil {
+	if err := a.ensureDAGRunIDUnique(ctx, dag, dagRunId); err != nil {
 		return nil, err
 	}
 
@@ -628,14 +628,14 @@ func (a *API) RetryDAGRun(ctx context.Context, request api.RetryDAGRunRequestObj
 	}
 
 	if request.Body.StepName != nil && *request.Body.StepName != "" {
-		spec := a.subCmdBuilder.Retry(dag, request.Body.DagRunId, *request.Body.StepName, true)
+		spec := a.subCmdBuilder.Retry(dag, request.Body.DagRunId, *request.Body.StepName)
 		if err := runtime.Start(ctx, spec); err != nil {
 			return nil, fmt.Errorf("error retrying DAG step: %w", err)
 		}
 		return api.RetryDAGRun200Response{}, nil
 	}
 
-	spec := a.subCmdBuilder.Retry(dag, request.Body.DagRunId, "", false)
+	spec := a.subCmdBuilder.Retry(dag, request.Body.DagRunId, "")
 	if err := runtime.Start(ctx, spec); err != nil {
 		return nil, fmt.Errorf("error retrying DAG: %w", err)
 	}
@@ -794,7 +794,7 @@ func (a *API) RescheduleDAGRun(ctx context.Context, request api.RescheduleDAGRun
 		newDagRunID = id
 	}
 
-	if err := a.ensureDAGRunStartable(ctx, dag, newDagRunID, singleton); err != nil {
+	if err := a.ensureDAGRunIDUnique(ctx, dag, newDagRunID); err != nil {
 		return nil, err
 	}
 
diff --git a/internal/service/frontend/api/v2/dags.go b/internal/service/frontend/api/v2/dags.go
index f3b81d93..06a5ffad 100644
--- a/internal/service/frontend/api/v2/dags.go
+++ b/internal/service/frontend/api/v2/dags.go
@@ -593,7 +593,21 @@ func (a *API) ExecuteDAG(ctx context.Context, request api.ExecuteDAGRequestObjec
 		}
 	}
 
-	if err := a.ensureDAGRunStartable(ctx, dag, dagRunId, singleton); err != nil {
+	if singleton {
+		alive, err := a.procStore.CountAliveByDAGName(ctx, dag.ProcGroup(), dag.Name)
+		if err != nil {
+			return nil, fmt.Errorf("failed to check singleton execution status: %w", err)
+		}
+		if alive > 0 {
+			return nil, &Error{
+				HTTPStatus: http.StatusConflict,
+				Code:       api.ErrorCodeAlreadyExists,
+				Message:    fmt.Sprintf("DAG %s is already running (singleton mode)", dag.Name),
+			}
+		}
+	}
+
+	if err := a.ensureDAGRunIDUnique(ctx, dag, dagRunId); err != nil {
 		return nil, err
 	}
 
@@ -615,16 +629,6 @@ func (a *API) startDAGRun(ctx context.Context, dag *core.DAG, params, dagRunID,
 	})
 }
 
-// ensureDAGRunStartable validates that a DAG-run can be started.
-// It checks both DAG-run ID uniqueness and concurrency limits.
-// dagRunID must be non-empty before calling this function.
-func (a *API) ensureDAGRunStartable(ctx context.Context, dag *core.DAG, dagRunID string, singleton bool) error {
-	if err := a.ensureDAGRunIDUnique(ctx, dag, dagRunID); err != nil {
-		return err
-	}
-	return a.ensureDAGCapacity(ctx, dag, singleton)
-}
-
 // ensureDAGRunIDUnique validates that the given dagRunID is not already in use for this DAG.
 func (a *API) ensureDAGRunIDUnique(ctx context.Context, dag *core.DAG, dagRunID string) error {
 	if dagRunID == "" {
@@ -642,74 +646,6 @@ func (a *API) ensureDAGRunIDUnique(ctx context.Context, dag *core.DAG, dagRunID
 	return nil
 }
 
-// ensureDAGCapacity validates that the DAG has capacity to start a new run.
-// It checks both singleton constraints and maxActiveRuns limits.
-func (a *API) ensureDAGCapacity(ctx context.Context, dag *core.DAG, singleton bool) error {
-	liveCount, err := a.procStore.CountAliveByDAGName(ctx, dag.ProcGroup(), dag.Name)
-	if err != nil {
-		return fmt.Errorf("failed to access proc store: %w", err)
-	}
-
-	if isSingletonExecution(dag, singleton) {
-		if liveCount > 0 {
-			return &Error{
-				HTTPStatus: http.StatusConflict,
-				Code:       api.ErrorCodeMaxRunReached,
-				Message:    fmt.Sprintf("DAG %s is already running, cannot start", dag.Name),
-			}
-		}
-	}
-
-	queuedRuns, err := a.queueStore.ListByDAGName(ctx, dag.ProcGroup(), dag.Name)
-	if err != nil {
-		return fmt.Errorf("failed to read queue: %w", err)
-	}
-
-	if dag.MaxActiveRuns > 0 && len(queuedRuns)+liveCount >= dag.MaxActiveRuns {
-		return &Error{
-			HTTPStatus: http.StatusConflict,
-			Code:       api.ErrorCodeMaxRunReached,
-			Message:    fmt.Sprintf("DAG %s is already in the queue (maxActiveRuns=%d), cannot start", dag.Name, dag.MaxActiveRuns),
-		}
-	}
-
-	return nil
-}
-
-// isSingletonExecution returns true if only one instance of the DAG should run at a time.
-func isSingletonExecution(dag *core.DAG, singleton bool) bool {
-	return singleton || dag.MaxActiveRuns == 1
-}
-
-// ensureSingletonEnqueue checks that no instance of the DAG is running or queued.
-func (a *API) ensureSingletonEnqueue(ctx context.Context, dag *core.DAG) error {
-	liveCount, err := a.procStore.CountAliveByDAGName(ctx, dag.ProcGroup(), dag.Name)
-	if err != nil {
-		return fmt.Errorf("failed to access proc store: %w", err)
-	}
-	if liveCount > 0 {
-		return &Error{
-			HTTPStatus: http.StatusConflict,
-			Code:       api.ErrorCodeMaxRunReached,
-			Message:    fmt.Sprintf("DAG %s is already running, cannot enqueue", dag.Name),
-		}
-	}
-
-	queuedRuns, err := a.queueStore.ListByDAGName(ctx, dag.ProcGroup(), dag.Name)
-	if err != nil {
-		return fmt.Errorf("failed to read queue: %w", err)
-	}
-	if len(queuedRuns) > 0 {
-		return &Error{
-			HTTPStatus: http.StatusConflict,
-			Code:       api.ErrorCodeMaxRunReached,
-			Message:    fmt.Sprintf("DAG %s is already queued, cannot enqueue", dag.Name),
-		}
-	}
-
-	return nil
-}
-
 type startDAGRunOptions struct {
 	params       string
 	dagRunID     string
@@ -720,23 +656,10 @@ type startDAGRunOptions struct {
 }
 
 func (a *API) startDAGRunWithOptions(ctx context.Context, dag *core.DAG, opts startDAGRunOptions) error {
-	var noQueue bool
-	if isSingletonExecution(dag, opts.singleton) {
-		noQueue = true
-	}
-
-	if len(dag.WorkerSelector) > 0 {
-		// If workerSelector is set, we should queue the DAG-run, so that
-		// it gets dispatched to qualified workers rather than being started
-		// directly on the local node.
-		noQueue = false
-	}
-
 	spec := a.subCmdBuilder.Start(dag, runtime1.StartOptions{
 		Params:       opts.params,
 		DAGRunID:     opts.dagRunID,
 		Quiet:        true,
-		NoQueue:      noQueue,
 		NameOverride: opts.nameOverride,
 		FromRunID:    opts.fromRunID,
 		Target:       opts.target,
@@ -835,9 +758,32 @@ func (a *API) EnqueueDAGDAGRun(ctx context.Context, request api.EnqueueDAGDAGRun
 		}
 	}
 
-	if valueOf(request.Body.Singleton) {
-		if err := a.ensureSingletonEnqueue(ctx, dag); err != nil {
-			return nil, err
+	singleton := valueOf(request.Body.Singleton)
+	if singleton {
+		// Check if running
+		alive, err := a.procStore.CountAliveByDAGName(ctx, dag.ProcGroup(), dag.Name)
+		if err != nil {
+			return nil, fmt.Errorf("failed to check singleton execution status (proc): %w", err)
+		}
+		if alive > 0 {
+			return nil, &Error{
+				HTTPStatus: http.StatusConflict,
+				Code:       api.ErrorCodeAlreadyExists,
+				Message:    fmt.Sprintf("DAG %s is already running (singleton mode)", dag.Name),
+			}
+		}
+
+		// Check if queued
+		queued, err := a.queueStore.ListByDAGName(ctx, dag.ProcGroup(), dag.Name)
+		if err != nil {
+			return nil, fmt.Errorf("failed to check singleton execution status (queue): %w", err)
+		}
+		if len(queued) > 0 {
+			return nil, &Error{
+				HTTPStatus: http.StatusConflict,
+				Code:       api.ErrorCodeAlreadyExists,
+				Message:    fmt.Sprintf("DAG %s is already in queue (singleton mode)", dag.Name),
+			}
 		}
 	}
 
diff --git a/internal/service/frontend/api/v2/singleton_test.go b/internal/service/frontend/api/v2/singleton_test.go
new file mode 100644
index 00000000..b6a60bb7
--- /dev/null
+++ b/internal/service/frontend/api/v2/singleton_test.go
@@ -0,0 +1,124 @@
+package api_test
+
+import (
+	"fmt"
+	"net/http"
+	"testing"
+	"time"
+
+	"github.com/dagu-org/dagu/api/v2"
+	"github.com/dagu-org/dagu/internal/core"
+	"github.com/dagu-org/dagu/internal/test"
+	"github.com/stretchr/testify/require"
+)
+
+func TestSingleton(t *testing.T) {
+	server := test.SetupServer(t)
+
+	t.Run("ExecuteDAGConflict", func(t *testing.T) {
+		spec := `
+steps:
+  - name: sleep
+    command: sleep 10
+`
+		// Create a new DAG
+		_ = server.Client().Post("/api/v2/dags", api.CreateNewDAGJSONRequestBody{
+			Name: "singleton_exec_dag",
+			Spec: &spec,
+		}).ExpectStatus(http.StatusCreated).Send(t)
+
+		// Execute the DAG
+		singleton := true
+		resp := server.Client().Post("/api/v2/dags/singleton_exec_dag/start", api.ExecuteDAGJSONRequestBody{
+			Singleton: &singleton,
+		}).ExpectStatus(http.StatusOK).Send(t)
+
+		var execResp api.ExecuteDAG200JSONResponse
+		resp.Unmarshal(t, &execResp)
+		require.NotEmpty(t, execResp.DagRunId)
+
+		// Wait for it to be running
+		require.Eventually(t, func() bool {
+			url := fmt.Sprintf("/api/v2/dags/singleton_exec_dag/dag-runs/%s", execResp.DagRunId)
+			statusResp := server.Client().Get(url).ExpectStatus(http.StatusOK).Send(t)
+			var dagRunStatus api.GetDAGDAGRunDetails200JSONResponse
+			statusResp.Unmarshal(t, &dagRunStatus)
+			return dagRunStatus.DagRun.Status == api.Status(core.Running)
+		}, 5*time.Second, 100*time.Millisecond)
+
+		// Try to execute it again with singleton: true - should conflict
+		server.Client().Post("/api/v2/dags/singleton_exec_dag/start", api.ExecuteDAGJSONRequestBody{
+			Singleton: &singleton,
+		}).ExpectStatus(http.StatusConflict).Send(t)
+
+		// Clean up (deleting the DAG will eventually stop the run)
+		_ = server.Client().Delete("/api/v2/dags/singleton_exec_dag").ExpectStatus(http.StatusNoContent).Send(t)
+	})
+
+	t.Run("EnqueueDAGConflict_Running", func(t *testing.T) {
+		spec := `
+steps:
+  - name: sleep
+    command: sleep 10
+`
+		// Create a new DAG
+		_ = server.Client().Post("/api/v2/dags", api.CreateNewDAGJSONRequestBody{
+			Name: "singleton_enq_run_dag",
+			Spec: &spec,
+		}).ExpectStatus(http.StatusCreated).Send(t)
+
+		// Execute the DAG
+		resp := server.Client().Post("/api/v2/dags/singleton_enq_run_dag/start", api.ExecuteDAGJSONRequestBody{}).
+			ExpectStatus(http.StatusOK).Send(t)
+
+		var execResp api.ExecuteDAG200JSONResponse
+		resp.Unmarshal(t, &execResp)
+
+		// Wait for it to be running
+		require.Eventually(t, func() bool {
+			url := fmt.Sprintf("/api/v2/dags/singleton_enq_run_dag/dag-runs/%s", execResp.DagRunId)
+			statusResp := server.Client().Get(url).ExpectStatus(http.StatusOK).Send(t)
+			var dagRunStatus api.GetDAGDAGRunDetails200JSONResponse
+			statusResp.Unmarshal(t, &dagRunStatus)
+			return dagRunStatus.DagRun.Status == api.Status(core.Running)
+		}, 5*time.Second, 100*time.Millisecond)
+
+		// Try to enqueue it with singleton: true - should conflict because it's running
+		singleton := true
+		server.Client().Post("/api/v2/dags/singleton_enq_run_dag/enqueue", api.EnqueueDAGDAGRunJSONRequestBody{
+			Singleton: &singleton,
+		}).ExpectStatus(http.StatusConflict).Send(t)
+
+		// Clean up
+		_ = server.Client().Delete("/api/v2/dags/singleton_enq_run_dag").ExpectStatus(http.StatusNoContent).Send(t)
+	})
+
+	t.Run("EnqueueDAGConflict_Queued", func(t *testing.T) {
+		spec := `
+steps:
+  - name: sleep
+    command: sleep 10
+`
+		// Create a new DAG
+		_ = server.Client().Post("/api/v2/dags", api.CreateNewDAGJSONRequestBody{
+			Name: "singleton_enq_q_dag",
+			Spec: &spec,
+		}).ExpectStatus(http.StatusCreated).Send(t)
+
+		// Enqueue the DAG
+		resp := server.Client().Post("/api/v2/dags/singleton_enq_q_dag/enqueue", api.EnqueueDAGDAGRunJSONRequestBody{}).
+			ExpectStatus(http.StatusOK).Send(t)
+
+		var enqueueResp api.EnqueueDAGDAGRun200JSONResponse
+		resp.Unmarshal(t, &enqueueResp)
+
+		// Try to enqueue it again with singleton: true - should conflict because it's already queued
+		singleton := true
+		server.Client().Post("/api/v2/dags/singleton_enq_q_dag/enqueue", api.EnqueueDAGDAGRunJSONRequestBody{
+			Singleton: &singleton,
+		}).ExpectStatus(http.StatusConflict).Send(t)
+
+		// Clean up
+		_ = server.Client().Delete("/api/v2/dags/singleton_enq_q_dag").ExpectStatus(http.StatusNoContent).Send(t)
+	})
+}
diff --git a/internal/service/scheduler/dag_executor.go b/internal/service/scheduler/dag_executor.go
index 5f1314d7..9220bf7f 100644
--- a/internal/service/scheduler/dag_executor.go
+++ b/internal/service/scheduler/dag_executor.go
@@ -143,7 +143,7 @@ func (e *DAGExecutor) ExecuteDAG(
 		return runtime.Start(ctx, spec)
 
 	case coordinatorv1.Operation_OPERATION_RETRY:
-		spec := e.subCmdBuilder.Retry(dag, runID, "", true)
+		spec := e.subCmdBuilder.Retry(dag, runID, "")
 		return runtime.Run(ctx, spec)
 
 	case coordinatorv1.Operation_OPERATION_UNSPECIFIED:
diff --git a/internal/service/worker/handler_test.go b/internal/service/worker/handler_test.go
index 817fa736..fece52ad 100644
--- a/internal/service/worker/handler_test.go
+++ b/internal/service/worker/handler_test.go
@@ -227,8 +227,6 @@ func TestTaskHandlerStartWithDefinition(t *testing.T) {
 	argsLines := strings.Split(strings.TrimSpace(string(argsData)), "\n")
 	require.Contains(t, argsLines, "start")
 	require.Contains(t, argsLines, "--run-id=run-123")
-	require.Contains(t, argsLines, "--no-queue")
-	require.Contains(t, argsLines, "--disable-max-active-runs")
 	require.Contains(t, argsLines, task.Target)
 	require.Contains(t, argsLines, "--")
 	require.Contains(t, argsLines, "foo=bar")
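
Note (reviewer sketch, not part of the patch): with this series, singleton enforcement lives in the v2 API rather than in the start command. A start request with singleton enabled is rejected with HTTP 409 while an instance of the DAG is alive, and an enqueue request is additionally rejected while a run is waiting in the queue. The minimal Go sketch below shows how a hypothetical external client might opt into singleton mode and treat the 409 as "already running"; the base URL and port, the "singleton" JSON field name, and the error wording are illustrative assumptions only.

// Hypothetical client sketch for the singleton conflict behavior above.
// Assumptions: server at http://localhost:8080 and a "singleton" field on the
// start request body; neither is defined by this patch.
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
)

func startSingleton(baseURL, dagName string) error {
	// Request body asking the server to enforce single-instance execution.
	body, err := json.Marshal(map[string]any{"singleton": true})
	if err != nil {
		return err
	}
	url := fmt.Sprintf("%s/api/v2/dags/%s/start", baseURL, dagName)
	resp, err := http.Post(url, "application/json", bytes.NewReader(body))
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	switch resp.StatusCode {
	case http.StatusOK:
		return nil // the run was accepted
	case http.StatusConflict:
		// Singleton conflict: an instance of this DAG is already running.
		return fmt.Errorf("DAG %s is already running (singleton mode)", dagName)
	default:
		return fmt.Errorf("unexpected status %d when starting DAG %s", resp.StatusCode, dagName)
	}
}

func main() {
	if err := startSingleton("http://localhost:8080", "my-dag"); err != nil {
		fmt.Println(err)
	}
}

The same pattern applies to the enqueue endpoint, where a 409 can also mean the DAG is already sitting in the queue.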