From e36d62a398c84eddee00a9d260bb6a481d6103b1 Mon Sep 17 00:00:00 2001 From: YotaHamada Date: Mon, 18 Aug 2025 22:24:38 +0900 Subject: [PATCH] fix: improve directory lock and active process management (#1202) --- .codespellrc | 2 +- internal/agent/agent.go | 21 -- internal/cmd/dry.go | 1 - internal/cmd/restart.go | 20 +- internal/cmd/restart_test.go | 4 +- internal/cmd/retry.go | 20 +- internal/cmd/start.go | 64 +++-- internal/cmd/start_test.go | 3 +- internal/dagrun/manager.go | 12 +- internal/dagrun/manager_test.go | 50 ---- internal/frontend/api/v2/dags.go | 16 +- internal/models/proc.go | 4 + internal/persistence/dirlock/dirlock.go | 285 ++++--------------- internal/persistence/dirlock/dirlock_test.go | 83 +++--- internal/persistence/filedagrun/store.go | 4 +- internal/persistence/fileproc/procgrp.go | 8 + internal/persistence/fileproc/store.go | 57 ++-- internal/scheduler/dag_executor.go | 2 +- internal/scheduler/scheduler.go | 26 +- internal/scheduler/zombie_detector_test.go | 11 + internal/test/setup.go | 4 +- 21 files changed, 284 insertions(+), 413 deletions(-) diff --git a/.codespellrc b/.codespellrc index 289d5af5..7a31af70 100644 --- a/.codespellrc +++ b/.codespellrc @@ -4,4 +4,4 @@ skip = .git,*.svg,go.mod,go.sum,*.gen.go,pnpm-lock.yaml,*.lock,*.css,.codespellr check-hidden = true # ignore-regex = # short variables close to typos are often used -ignore-words-list = ot,te,flate +ignore-words-list = ot,te,flate,pastTime diff --git a/internal/agent/agent.go b/internal/agent/agent.go index aa45f0da..dd1b5568 100644 --- a/internal/agent/agent.go +++ b/internal/agent/agent.go @@ -56,9 +56,6 @@ type Agent struct { // dagRunStore is the database to store the run history. dagRunStore models.DAGRunStore - // procStore is the database to store the process information. - procStore models.ProcStore - // registry is the service registry to find the coordinator service. 
registry models.ServiceRegistry @@ -149,7 +146,6 @@ func New( drm dagrun.Manager, ds models.DAGStore, drs models.DAGRunStore, - ps models.ProcStore, reg models.ServiceRegistry, root digraph.DAGRunRef, opts Options, @@ -166,7 +162,6 @@ func New( dagRunMgr: drm, dagStore: ds, dagRunStore: drs, - procStore: ps, registry: reg, stepRetry: opts.StepRetry, } @@ -246,17 +241,6 @@ func (a *Agent) Run(ctx context.Context) error { return err } - // Create a process for heartbeat. We need to acquire proc file by queue name or dag name. - // This is to ensure queue scheduler can limit the number of active processes for the same queue. - proc, err := a.procStore.Acquire(ctx, a.dag.ProcGroup(), digraph.NewDAGRunRef(a.dag.Name, a.dagRunID)) - if err != nil { - return fmt.Errorf("failed to get process: %w", err) - } - defer func() { - // Stop the process and remove it from the store. - _ = proc.Stop(ctx) - }() - if !a.dry { // Setup the attempt for the dag-run. // It's not required for dry-run mode. @@ -485,11 +469,6 @@ func (a *Agent) Run(ctx context.Context) error { lastErr := a.scheduler.Schedule(ctx, a.graph, progressCh) - // Stop the process and remove it from the store. - if err := proc.Stop(ctx); err != nil { - logger.Error(ctx, "Failed to stop the heartbeat", "err", err) - } - if coordinatorCli != nil { // Cleanup the coordinator client resources if it was created. 
if err := coordinatorCli.Cleanup(ctx); err != nil { diff --git a/internal/cmd/dry.go b/internal/cmd/dry.go index a5a742a4..9047cc25 100644 --- a/internal/cmd/dry.go +++ b/internal/cmd/dry.go @@ -87,7 +87,6 @@ func runDry(ctx *Context, args []string) error { ctx.DAGRunMgr, dr, ctx.DAGRunStore, - ctx.ProcStore, ctx.ServiceRegistry, root, agent.Options{Dry: true}, diff --git a/internal/cmd/restart.go b/internal/cmd/restart.go index 18af431b..b21a6a93 100644 --- a/internal/cmd/restart.go +++ b/internal/cmd/restart.go @@ -100,6 +100,25 @@ func handleRestartProcess(ctx *Context, d *digraph.DAG, dagRunID string) error { } // Execute the exact same DAG with the same parameters but a new dag-run ID + if err := ctx.ProcStore.TryLock(ctx, d.ProcGroup()); err != nil { + logger.Debug(ctx, "failed to lock process group", "err", err) + return errMaxRunReached + } + defer ctx.ProcStore.Unlock(ctx, d.ProcGroup()) + + // Acquire process handle + proc, err := ctx.ProcStore.Acquire(ctx, d.ProcGroup(), digraph.NewDAGRunRef(d.Name, dagRunID)) + if err != nil { + logger.Debug(ctx, "failed to acquire process handle", "err", err) + return fmt.Errorf("failed to acquire process handle: %w", errMaxRunReached) + } + defer func() { + _ = proc.Stop(ctx) + }() + + // Unlock the process group + ctx.ProcStore.Unlock(ctx, d.ProcGroup()) + return executeDAG(ctx, ctx.DAGRunMgr, d) } @@ -134,7 +153,6 @@ func executeDAG(ctx *Context, cli dagrun.Manager, dag *digraph.DAG) error { cli, dr, ctx.DAGRunStore, - ctx.ProcStore, ctx.ServiceRegistry, digraph.NewDAGRunRef(dag.Name, dagRunID), agent.Options{Dry: false}) diff --git a/internal/cmd/restart_test.go b/internal/cmd/restart_test.go index 8ace6ac6..b7539879 100644 --- a/internal/cmd/restart_test.go +++ b/internal/cmd/restart_test.go @@ -20,9 +20,7 @@ steps: - name: "1" script: "echo $1" - name: "2" - script: "sleep 100" - depends: - - "1" + script: "sleep 1" `) go func() { diff --git a/internal/cmd/retry.go b/internal/cmd/retry.go index 35d8559e..24d6b1a3 
100644 --- a/internal/cmd/retry.go +++ b/internal/cmd/retry.go @@ -67,6 +67,25 @@ func runRetry(ctx *Context, args []string) error { return fmt.Errorf("failed to read DAG from record: %w", err) } + // Try lock proc store to avoid race + if err := ctx.ProcStore.TryLock(ctx, dag.ProcGroup()); err != nil { + return fmt.Errorf("failed to lock process group: %w", err) + } + defer ctx.ProcStore.Unlock(ctx, dag.ProcGroup()) + + // Acquire process handle + proc, err := ctx.ProcStore.Acquire(ctx, dag.ProcGroup(), digraph.NewDAGRunRef(dag.Name, dagRunID)) + if err != nil { + logger.Debug(ctx, "failed to acquire process handle", "err", err) + return fmt.Errorf("failed to acquire process handle: %w", errMaxRunReached) + } + defer func() { + _ = proc.Stop(ctx) + }() + + // Unlock the process group before start DAG + ctx.ProcStore.Unlock(ctx, dag.ProcGroup()) + // The retry command is currently only supported for root DAGs. if err := executeRetry(ctx, dag, status, status.DAGRun(), stepName); err != nil { return fmt.Errorf("failed to execute retry: %w", err) @@ -102,7 +121,6 @@ func executeRetry(ctx *Context, dag *digraph.DAG, status *models.DAGRunStatus, r ctx.DAGRunMgr, dr, ctx.DAGRunStore, - ctx.ProcStore, ctx.ServiceRegistry, rootRun, agent.Options{ diff --git a/internal/cmd/start.go b/internal/cmd/start.go index 58f1ddba..22d5bcad 100644 --- a/internal/cmd/start.go +++ b/internal/cmd/start.go @@ -86,14 +86,14 @@ func runStart(ctx *Context, args []string) error { return handleChildDAGRun(ctx, dag, dagRunID, params, root, parent) } - var disabledQueue bool + var queueDisabled bool if os.Getenv("DISABLE_DAG_RUN_QUEUE") != "" { - disabledQueue = true + queueDisabled = true } // check no-queue flag if ctx.Command.Flags().Changed("no-queue") { - disabledQueue = true + queueDisabled = true } // Check if the DAG run-id is unique @@ -118,23 +118,56 @@ func runStart(ctx *Context, args []string) error { "dagRunId", dagRunID, ) - // Check if the DAG needs to be enqueued or executed 
directly - // We need to enqueue it unless if the queue is disabled - if dag.MaxActiveRuns < 0 || disabledQueue { - // MaxActiveRuns < 0 means queueing is disabled for this DAG - return executeDAGRun(ctx, dag, digraph.DAGRunRef{}, dagRunID, root) + err = tryExecuteDAG(ctx, dag, dagRunID, root, queueDisabled) + if errors.Is(err, errMaxRunReached) { + dag.Location = "" // Queued dag-runs must not have a location + + // Enqueue the DAG-run for execution + return enqueueDAGRun(ctx, dag, dagRunID) } - runningCount, err := ctx.ProcStore.CountAlive(ctx, dag.ProcGroup()) - if err == nil && runningCount == 0 { - // If there are no running processes, we can execute the DAG-run directly - return executeDAGRun(ctx, dag, digraph.DAGRunRef{}, dagRunID, root) + return err // return executed result +} + +var ( + errMaxRunReached = errors.New("max run reached") +) + +// tryExecuteDAG tries to run the DAG within the max concurrent run config +func tryExecuteDAG(ctx *Context, dag *digraph.DAG, dagRunID string, root digraph.DAGRunRef, queueDisabled bool) error { + if err := ctx.ProcStore.TryLock(ctx, dag.ProcGroup()); err != nil { + logger.Debug(ctx, "failed to lock process group", "err", err) + return errMaxRunReached + } + defer ctx.ProcStore.Unlock(ctx, dag.ProcGroup()) + + if !queueDisabled { + runningCount, err := ctx.ProcStore.CountAlive(ctx, dag.ProcGroup()) + if err != nil { + logger.Debug(ctx, "failed to count live processes", "err", err) + return fmt.Errorf("failed to count live process for %s: %w", dag.ProcGroup(), errMaxRunReached) + } + + if dag.MaxActiveRuns > 0 && runningCount >= dag.MaxActiveRuns { + // It's not possible to run right now. 
+ return fmt.Errorf("max active run is reached (%d >= %d): %w", runningCount, dag.MaxActiveRuns, errMaxRunReached) + } } - dag.Location = "" // Queued dag-runs must not have a location + // Acquire process handle + proc, err := ctx.ProcStore.Acquire(ctx, dag.ProcGroup(), digraph.NewDAGRunRef(dag.Name, dagRunID)) + if err != nil { + logger.Debug(ctx, "failed to acquire process handle", "err", err) + return fmt.Errorf("failed to acquire process handle: %w", errMaxRunReached) + } + defer func() { + _ = proc.Stop(ctx) + }() - // Enqueue the DAG-run for execution - return enqueueDAGRun(ctx, dag, dagRunID) + // Unlock the process group + ctx.ProcStore.Unlock(ctx, dag.ProcGroup()) + + return executeDAGRun(ctx, dag, digraph.DAGRunRef{}, dagRunID, root) } // getDAGRunInfo extracts and validates dag-run ID and references from command flags @@ -332,7 +365,6 @@ func executeDAGRun(ctx *Context, d *digraph.DAG, parent digraph.DAGRunRef, dagRu ctx.DAGRunMgr, dr, ctx.DAGRunStore, - ctx.ProcStore, ctx.ServiceRegistry, root, agent.Options{ diff --git a/internal/cmd/start_test.go b/internal/cmd/start_test.go index 9d4a9ed1..2256de63 100644 --- a/internal/cmd/start_test.go +++ b/internal/cmd/start_test.go @@ -13,7 +13,8 @@ import ( func TestStartCommand(t *testing.T) { th := test.SetupCommand(t) - dagStart := th.DAG(t, `steps: + dagStart := th.DAG(t, `maxActiveRuns: 1 +steps: - name: "1" command: "true" `) diff --git a/internal/dagrun/manager.go b/internal/dagrun/manager.go index 2b2631cf..2d4b75ac 100644 --- a/internal/dagrun/manager.go +++ b/internal/dagrun/manager.go @@ -459,8 +459,6 @@ func (*Manager) currentStatus(_ context.Context, dag *digraph.DAG, dagRunID stri // If the DAG is running, it attempts to get the current status from the socket. // If that fails or no status exists, it returns an initial status or an error. 
func (m *Manager) GetLatestStatus(ctx context.Context, dag *digraph.DAG) (models.DAGRunStatus, error) { - var dagStatus *models.DAGRunStatus - // Find the proc store to check if the DAG is running alive, _ := m.procStore.CountAlive(ctx, dag.ProcGroup()) if alive > 0 { @@ -501,15 +499,7 @@ func (m *Manager) GetLatestStatus(ctx context.Context, dag *digraph.DAG) (models } } - // If querying the current status fails, ensure if the status is running, - if st.Status == status.Running { - if err := m.checkAndUpdateStaleRunningStatus(ctx, attempt, st); err != nil { - logger.Error(ctx, "Failed to check and update stale running status", "err", err) - } - } - dagStatus = st - - return *dagStatus, nil + return *st, nil } // ListRecentStatus retrieves the n most recent statuses for a DAG by name. diff --git a/internal/dagrun/manager_test.go b/internal/dagrun/manager_test.go index e8dbe152..37289845 100644 --- a/internal/dagrun/manager_test.go +++ b/internal/dagrun/manager_test.go @@ -158,56 +158,6 @@ steps: err := cli.UpdateStatus(ctx, root, status) require.Error(t, err) }) - t.Run("OrphanedRunningStatus", func(t *testing.T) { - // Test that when a DAG is marked as running but has no alive process, - // GetLatestStatus correctly updates it to error status - dag := th.DAG(t, `steps: - - name: "1" - command: "sleep 10" -`) - ctx := th.Context - - // Start a DAG run - err := th.DAGRunMgr.StartDAGRunAsync(ctx, dag.DAG, dagrun.StartOptions{}) - require.NoError(t, err) - - // Wait for it to start running - time.Sleep(200 * time.Millisecond) - - // Get the latest status - should be running - dagRunStatus, err := th.DAGRunMgr.GetLatestStatus(ctx, dag.DAG) - require.NoError(t, err) - require.Equal(t, status.Running.String(), dagRunStatus.Status.String()) - - // Now simulate the process dying by removing the process file - // First, stop the DAG to kill the process - err = th.DAGRunMgr.Stop(ctx, dag.DAG, dagRunStatus.DAGRunID) - require.NoError(t, err) - - // Wait for process to be 
cleaned up - time.Sleep(100 * time.Millisecond) - - // Manually update the status back to running to simulate orphaned state - dagRunRef := digraph.NewDAGRunRef(dag.Name, dagRunStatus.DAGRunID) - attempt, err := th.DAGRunStore.FindAttempt(ctx, dagRunRef) - require.NoError(t, err) - - // Force status to running - runningStatus := dagRunStatus - runningStatus.Status = status.Running - err = attempt.Open(ctx) - require.NoError(t, err) - err = attempt.Write(ctx, runningStatus) - require.NoError(t, err) - err = attempt.Close(ctx) - require.NoError(t, err) - - // Now when we get the latest status, it should detect the process is dead - // and update the status to error - dagRunStatus, err = th.DAGRunMgr.GetLatestStatus(ctx, dag.DAG) - require.NoError(t, err) - require.Equal(t, status.Error.String(), dagRunStatus.Status.String()) - }) } func TestClient_RunDAG(t *testing.T) { diff --git a/internal/frontend/api/v2/dags.go b/internal/frontend/api/v2/dags.go index a6f25c5b..7fd733ed 100644 --- a/internal/frontend/api/v2/dags.go +++ b/internal/frontend/api/v2/dags.go @@ -465,7 +465,15 @@ func (a *API) ExecuteDAG(ctx context.Context, request api.ExecuteDAGRequestObjec } } - dagRunId := valueOf(request.Body.DagRunId) + var dagRunId, params string + var singleton bool + + if request.Body != nil { + dagRunId = valueOf(request.Body.DagRunId) + params = valueOf(request.Body.Params) + singleton = valueOf(request.Body.Singleton) + } + if dagRunId == "" { var err error dagRunId, err = a.dagRunMgr.GenDAGRunID(ctx) @@ -488,7 +496,7 @@ func (a *API) ExecuteDAG(ctx context.Context, request api.ExecuteDAGRequestObjec } // Check singleton flag - if enabled and DAG is already running, return 409 - if valueOf(request.Body.Singleton) { + if singleton { dagStatus, err := a.dagRunMgr.GetLatestStatus(ctx, dag) if err == nil && dagStatus.Status == status.Running { return nil, &Error{ @@ -499,7 +507,7 @@ func (a *API) ExecuteDAG(ctx context.Context, request api.ExecuteDAGRequestObjec } } - if err := 
a.startDAGRun(ctx, dag, valueOf(request.Body.Params), dagRunId, valueOf(request.Body.Singleton)); err != nil { + if err := a.startDAGRun(ctx, dag, params, dagRunId, singleton); err != nil { return nil, fmt.Errorf("error starting dag-run: %w", err) } @@ -513,7 +521,7 @@ func (a *API) startDAGRun(ctx context.Context, dag *digraph.DAG, params, dagRunI Params: params, DAGRunID: dagRunID, Quiet: true, - Immediate: true, + Immediate: false, Singleton: singleton, }); err != nil { return fmt.Errorf("error starting DAG: %w", err) diff --git a/internal/models/proc.go b/internal/models/proc.go index b203e1e3..cc99d431 100644 --- a/internal/models/proc.go +++ b/internal/models/proc.go @@ -8,6 +8,10 @@ import ( // ProcStore is an interface for managing process storage. type ProcStore interface { + // TryLock tries to lock the process group and returns an error if it is held by another process. + TryLock(ctx context.Context, groupName string) error + // Unlock unlocks the process group. + Unlock(ctx context.Context, groupName string) // Acquire creates a new process for a given group name and DAG-run reference. // It automatically starts the heartbeat for the process. 
Acquire(ctx context.Context, groupName string, dagRun digraph.DAGRunRef) (ProcHandle, error) diff --git a/internal/persistence/dirlock/dirlock.go b/internal/persistence/dirlock/dirlock.go index 6dff562a..9594e62b 100644 --- a/internal/persistence/dirlock/dirlock.go +++ b/internal/persistence/dirlock/dirlock.go @@ -8,13 +8,8 @@ import ( "fmt" "os" "path/filepath" - "strconv" - "strings" "sync" "time" - - "github.com/dagu-org/dagu/internal/fileutil" - "github.com/dagu-org/dagu/internal/logger" ) // Error types for lock operations @@ -78,7 +73,6 @@ type LockInfo struct { // dirLock implements the DirLock interface type dirLock struct { - id string targetDir string lockPath string opts *LockOptions @@ -87,7 +81,6 @@ type dirLock struct { } // New creates a new directory lock instance - func New(directory string, opts *LockOptions) DirLock { // Set default options if not provided if opts == nil { @@ -101,17 +94,16 @@ func New(directory string, opts *LockOptions) DirLock { } return &dirLock{ - id: generateID(), targetDir: directory, + lockPath: filepath.Join(directory, ".dagu_lock"), opts: opts, } } -// Heartbeat updates the lock's last heartbeat time to prevent it from being +// Heartbeat updates the lock's modification time to prevent it from being // considered stale. This is an atomic operation that should be called -// periodically while the lock is held to keep it alive. This removes the old -// lock path and creates a new one with the current timestamp. -func (l *dirLock) Heartbeat(ctx context.Context) error { +// periodically while the lock is held to keep it alive. 
+func (l *dirLock) Heartbeat(_ context.Context) error { l.mu.Lock() defer l.mu.Unlock() @@ -119,26 +111,10 @@ func (l *dirLock) Heartbeat(ctx context.Context) error { return ErrNotLocked } - if l.lockPath == "" { - return errors.New("lock path is empty") - } - - // Create new lock path with current timestamp - lockName := fmt.Sprintf(".dagu_lock.%s.%d", l.id, time.Now().UnixNano()) - newLockPath := filepath.Join(l.targetDir, lockName) - - // Create new lock directory first - err := os.Mkdir(newLockPath, 0700) - if err != nil { - return fmt.Errorf("failed to create new lock directory: %w", err) - } - - // Update the lock path to the new one - l.lockPath = newLockPath - - // Clean stale locks - if err := l.cleanStale(); err != nil { - logger.Errorf(ctx, "Failed to clean stale locks: %v", err) + // Touch the lock directory to update its modification time + now := time.Now() + if err := os.Chtimes(l.lockPath, now, now); err != nil { + return fmt.Errorf("failed to update lock timestamp: %w", err) } return nil @@ -153,23 +129,21 @@ func (l *dirLock) TryLock() error { return nil // Already held by us } - // Clean up any stale locks first - if err := l.cleanStale(); err != nil { - return fmt.Errorf("failed to clean stale locks: %w", err) - } - - // Check for any existing non-stale locks - entries, err := os.ReadDir(l.targetDir) - if err != nil && !os.IsNotExist(err) { - return fmt.Errorf("failed to read directory: %w", err) - } - - for _, entry := range entries { - if entry.IsDir() && strings.HasPrefix(entry.Name(), ".dagu_lock.") { - if !l.isStale(entry.Name()) { - return ErrLockConflict + // Check if lock exists + info, err := os.Stat(l.lockPath) + if err == nil { + // Lock exists, check if it's stale + if l.isStaleInfo(info) { + // Remove stale lock + if err := os.RemoveAll(l.lockPath); err != nil && !os.IsNotExist(err) { + return fmt.Errorf("failed to remove stale lock: %w", err) } + } else { + // Lock is held by another process + return ErrLockConflict } + } else if 
!os.IsNotExist(err) { + return fmt.Errorf("failed to check lock status: %w", err) } // Ensure the target directory exists @@ -177,10 +151,7 @@ func (l *dirLock) TryLock() error { return fmt.Errorf("failed to create target directory: %w", err) } - // Create lock directory with timestamp only - lockName := fmt.Sprintf(".dagu_lock.%s.%d", l.id, time.Now().UnixNano()) - l.lockPath = filepath.Join(l.targetDir, lockName) - + // Try to create the lock directory err = os.Mkdir(l.lockPath, 0700) if err != nil { if os.IsExist(err) { @@ -228,230 +199,84 @@ func (l *dirLock) Unlock() error { return nil } - // Remove all lock directories belonging to this instance - entries, err := os.ReadDir(l.targetDir) - if err != nil && !os.IsNotExist(err) { - return fmt.Errorf("failed to read directory: %w", err) - } - - var removeErrors []error - for _, entry := range entries { - if entry.IsDir() && strings.HasPrefix(entry.Name(), ".dagu_lock.") { - // Check if this lock belongs to us - id, err := getID(entry.Name()) - if err != nil { - continue // Invalid lock file, skip - } - if id == l.id { - // This is our lock, remove it - lockPath := filepath.Join(l.targetDir, entry.Name()) - if err := os.RemoveAll(lockPath); err != nil && !os.IsNotExist(err) { - removeErrors = append(removeErrors, fmt.Errorf("failed to remove lock %s: %w", lockPath, err)) - } - } - } - } - - // If we had any errors removing locks, return the first one - if len(removeErrors) > 0 { - return removeErrors[0] + // Remove the lock directory + if err := os.RemoveAll(l.lockPath); err != nil && !os.IsNotExist(err) { + return fmt.Errorf("failed to remove lock directory: %w", err) } l.isHeld = false - l.lockPath = "" return nil } // IsLocked checks if directory is currently locked func (l *dirLock) IsLocked() bool { - entries, err := os.ReadDir(l.targetDir) + info, err := os.Stat(l.lockPath) if err != nil { return false } - for _, entry := range entries { - if entry.IsDir() && strings.HasPrefix(entry.Name(), ".dagu_lock.") { 
- // Check if this lock is stale - lockPath := filepath.Join(l.targetDir, entry.Name()) - if !l.isStale(entry.Name()) { - return true - } - // Clean up stale lock - _ = os.RemoveAll(lockPath) - } + // Check if lock is stale + if l.isStaleInfo(info) { + // Clean up stale lock + _ = os.RemoveAll(l.lockPath) + return false } - return false + return true } // IsHeldByMe checks if this instance holds the lock func (l *dirLock) IsHeldByMe() bool { l.mu.Lock() defer l.mu.Unlock() + if !l.isHeld { return false } - // check if the file exists + // Check if the lock directory still exists _, err := os.Stat(l.lockPath) if os.IsNotExist(err) { - // If the lock path does not exist, it means it was removed + // Lock was removed externally l.isHeld = false return false } - // check if newer lock exists - entries, err := os.ReadDir(l.targetDir) - if err != nil { - return false - } - - ourTimestamp, err := getTimestamp(filepath.Base(l.lockPath)) - if err != nil { - return false // Invalid lock path, assume not held - } - - for _, entry := range entries { - if entry.IsDir() && strings.HasPrefix(entry.Name(), ".dagu_lock.") { - id, err := getID(entry.Name()) - if err != nil { - continue // Invalid lock file, skip - } - if id == l.id { - continue // This is our lock - } - timestamp, err := getTimestamp(entry.Name()) - if err != nil { - continue // Invalid timestamp, skip - } - if timestamp > ourTimestamp { - // Found a newer lock, we don't hold the lock - return false - } - } - } - return true } // Info returns information about current lock holder func (l *dirLock) Info() (*LockInfo, error) { - entries, err := os.ReadDir(l.targetDir) + info, err := os.Stat(l.lockPath) if err != nil { - return nil, fmt.Errorf("failed to read directory: %w", err) - } - - for _, entry := range entries { - if entry.IsDir() && strings.HasPrefix(entry.Name(), ".dagu_lock.") { - if !l.isStale(entry.Name()) { - // Extract timestamp from lock directory name - // Format: .dagu_lock.. 
- timestamp, err := getTimestamp(entry.Name()) - if err != nil { - continue - } - - return &LockInfo{ - AcquiredAt: time.Unix(0, timestamp), - LockDirName: entry.Name(), - }, nil - } + if os.IsNotExist(err) { + return nil, nil } + return nil, fmt.Errorf("failed to get lock info: %w", err) } - return nil, nil + if l.isStaleInfo(info) { + // Lock is stale + return nil, nil + } + + return &LockInfo{ + AcquiredAt: info.ModTime(), + LockDirName: ".dagu_lock", + }, nil } // ForceUnlock forcibly removes a lock (administrative operation) func ForceUnlock(directory string) error { - entries, err := os.ReadDir(directory) - if err != nil { - return fmt.Errorf("failed to read directory: %w", err) + lockPath := filepath.Join(directory, ".dagu_lock") + if err := os.RemoveAll(lockPath); err != nil && !os.IsNotExist(err) { + return fmt.Errorf("failed to force unlock: %w", err) } - - for _, entry := range entries { - if entry.IsDir() && strings.HasPrefix(entry.Name(), ".dagu_lock.") { - lockPath := filepath.Join(directory, entry.Name()) - if err := os.RemoveAll(lockPath); err != nil { - return fmt.Errorf("failed to remove lock directory %s: %w", lockPath, err) - } - } - } - return nil } -// cleanStale removes any stale locks -func (l *dirLock) cleanStale() error { - entries, err := os.ReadDir(l.targetDir) - if err != nil { - if os.IsNotExist(err) { - return nil // Directory doesn't exist yet - } - return fmt.Errorf("failed to read directory: %w", err) - } - - for _, entry := range entries { - if entry.IsDir() && strings.HasPrefix(entry.Name(), ".dagu_lock.") { - if l.isStale(entry.Name()) { - lockPath := filepath.Join(l.targetDir, entry.Name()) - if err := os.RemoveAll(lockPath); err != nil && !os.IsNotExist(err) { - return fmt.Errorf("failed to remove stale lock %s: %w", lockPath, err) - } - } - } - } - - return nil -} - -// isStale checks if a lock directory is stale based on age -func (l *dirLock) isStale(lockDirName string) bool { - timestamp, err := getTimestamp(lockDirName) - 
if err != nil { - return true // Invalid timestamp - } - - // Check age - age := time.Now().UnixNano() - timestamp - return age > int64(l.opts.StaleThreshold.Nanoseconds()) -} - -// getTimestamp extracts the timestamp from a lock directory name -func getTimestamp(lockDirName string) (int64, error) { - // Format: .dagu_lock.. - parts := strings.Split(lockDirName, ".") - if len(parts) != 4 { - return 0, ErrInvalidLockFile - } - - timestamp, err := strconv.ParseInt(parts[3], 10, 64) - if err != nil { - return 0, ErrInvalidLockFile - } - - return timestamp, nil -} - -// getID extracts the identifier from the lock directory name -func getID(lockDirName string) (string, error) { - // Format: .dagu_lock.. - parts := strings.Split(lockDirName, ".") - if len(parts) != 4 { - return "", ErrInvalidLockFile - } - - return parts[2], nil -} - -func generateID() string { - // Generate an identifier for this lock instance - host, err := os.Hostname() - if err != nil { - host = "unknown" - } - pid := os.Getpid() - id := fileutil.SafeName(fmt.Sprintf("%s@%d", host, pid)) - // replace '.' 
with '_' to avoid issues in directory names - return strings.ReplaceAll(id, ".", "_") +// isStaleInfo checks if a lock is stale based on file info +func (l *dirLock) isStaleInfo(info os.FileInfo) bool { + age := time.Since(info.ModTime()) + return age > l.opts.StaleThreshold } diff --git a/internal/persistence/dirlock/dirlock_test.go b/internal/persistence/dirlock/dirlock_test.go index c1f8c63c..860aed08 100644 --- a/internal/persistence/dirlock/dirlock_test.go +++ b/internal/persistence/dirlock/dirlock_test.go @@ -2,10 +2,8 @@ package dirlock import ( "context" - "fmt" "os" "path/filepath" - "strings" "sync" "testing" "time" @@ -58,7 +56,6 @@ func TestTryLock(t *testing.T) { t.Run("lock conflict", func(t *testing.T) { lock1 := New(tmpDir, nil) - lock2 := New(tmpDir, nil) // First lock succeeds @@ -153,7 +150,6 @@ func TestLock(t *testing.T) { t.Run("context cancellation", func(t *testing.T) { lock1 := New(tmpDir, nil) - lock2 := New(tmpDir, nil) // First lock acquired @@ -221,7 +217,6 @@ func TestIsLocked(t *testing.T) { t.Run("with lock", func(t *testing.T) { lock1 := New(tmpDir, nil) - lock2 := New(tmpDir, nil) err := lock1.TryLock() @@ -243,9 +238,13 @@ func TestStaleDetection(t *testing.T) { t.Run("clean stale lock", func(t *testing.T) { // Create a stale lock manually - staleLockName := fmt.Sprintf(".dagu_lock.%d", time.Now().Add(-60*time.Second).UnixNano()) - staleLockPath := filepath.Join(tmpDir, staleLockName) - err := os.Mkdir(staleLockPath, 0700) + lockPath := filepath.Join(tmpDir, ".dagu_lock") + err := os.Mkdir(lockPath, 0700) + require.NoError(t, err) + + // Set modification time to past + pastTime := time.Now().Add(-60 * time.Second) + err = os.Chtimes(lockPath, pastTime, pastTime) require.NoError(t, err) lock := New(tmpDir, &LockOptions{ @@ -256,9 +255,8 @@ func TestStaleDetection(t *testing.T) { err = lock.TryLock() require.NoError(t, err) - // Verify stale lock was removed - _, err = os.Stat(staleLockPath) - require.True(t, os.IsNotExist(err)) + 
// Verify lock is held + require.True(t, lock.IsHeldByMe()) // Cleanup err = lock.Unlock() @@ -339,9 +337,11 @@ func TestHeartbeat(t *testing.T) { err := lock.TryLock() require.NoError(t, err) - // Get the dirLock to access internal state - dl := lock.(*dirLock) - initialLockPath := dl.lockPath + // Get initial lock info + info1, err := lock.Info() + require.NoError(t, err) + require.NotNil(t, info1) + initialTime := info1.AcquiredAt // Wait a bit to ensure timestamp difference time.Sleep(10 * time.Millisecond) @@ -350,8 +350,13 @@ func TestHeartbeat(t *testing.T) { err = lock.Heartbeat(context.Background()) require.NoError(t, err) - // Verify lock path was updated (new lock file created) - require.NotEqual(t, initialLockPath, dl.lockPath) + // Get updated lock info + info2, err := lock.Info() + require.NoError(t, err) + require.NotNil(t, info2) + + // Verify timestamp was updated + require.True(t, info2.AcquiredAt.After(initialTime)) // Verify lock is still held require.True(t, lock.IsHeldByMe()) @@ -428,41 +433,33 @@ func TestEdgeCases(t *testing.T) { require.NoError(t, err) }) - t.Run("invalid lock directory format", func(t *testing.T) { + t.Run("info returns correct data", func(t *testing.T) { tmpDir := t.TempDir() - - // Create invalid lock directories - invalidNames := []string{ - ".dagu_lock", - ".dagu_lock.", - ".dagu_lock.abc", - ".dagu_lock.123.456", - } - - for _, name := range invalidNames { - err := os.Mkdir(filepath.Join(tmpDir, name), 0700) - require.NoError(t, err) - } - lock := New(tmpDir, nil) - // Should clean up invalid locks and succeed - err := lock.TryLock() + // No lock initially + info, err := lock.Info() + require.NoError(t, err) + require.Nil(t, info) + + // Acquire lock + err = lock.TryLock() require.NoError(t, err) - // Verify all invalid locks were removed - entries, err := os.ReadDir(tmpDir) + // Get info + info, err = lock.Info() require.NoError(t, err) + require.NotNil(t, info) + require.Equal(t, ".dagu_lock", info.LockDirName) + 
require.WithinDuration(t, time.Now(), info.AcquiredAt, 1*time.Second) - validLockCount := 0 - for _, entry := range entries { - if strings.HasPrefix(entry.Name(), ".dagu_lock.") { - validLockCount++ - } - } - require.Equal(t, 1, validLockCount) - + // Cleanup err = lock.Unlock() require.NoError(t, err) + + // No lock after unlock + info, err = lock.Info() + require.NoError(t, err) + require.Nil(t, info) }) } diff --git a/internal/persistence/filedagrun/store.go b/internal/persistence/filedagrun/store.go index e22612f1..716ed291 100644 --- a/internal/persistence/filedagrun/store.go +++ b/internal/persistence/filedagrun/store.go @@ -189,7 +189,9 @@ func (store *Store) collectStatusesFromRoots( run, err := dagRun.LatestAttempt(ctx, store.cache) if err != nil { - logger.Error(ctx, "Failed to get latest run", "err", err) + if !errors.Is(err, models.ErrNoStatusData) { + logger.Error(ctx, "Failed to get latest run", "err", err) + } continue } diff --git a/internal/persistence/fileproc/procgrp.go b/internal/persistence/fileproc/procgrp.go index d335b398..adff826d 100644 --- a/internal/persistence/fileproc/procgrp.go +++ b/internal/persistence/fileproc/procgrp.go @@ -13,10 +13,13 @@ import ( "github.com/dagu-org/dagu/internal/digraph" "github.com/dagu-org/dagu/internal/logger" "github.com/dagu-org/dagu/internal/models" + "github.com/dagu-org/dagu/internal/persistence/dirlock" ) // ProcGroup is a struct that manages process files for a given DAG name. type ProcGroup struct { + dirlock.DirLock + groupName string baseDir string staleTime time.Duration @@ -31,7 +34,12 @@ var procFileRegex = regexp.MustCompile(`^proc_\d{8}_\d{6}Z_.*\.proc$`) // NewProcGroup creates a new instance of a ProcGroup with the specified base directory and DAG name. 
func NewProcGroup(baseDir, groupName string, staleTime time.Duration) *ProcGroup { + dirLock := dirlock.New(baseDir, &dirlock.LockOptions{ + StaleThreshold: 5 * time.Second, + RetryInterval: 100 * time.Millisecond, + }) return &ProcGroup{ + DirLock: dirLock, baseDir: baseDir, groupName: groupName, staleTime: staleTime, diff --git a/internal/persistence/fileproc/store.go b/internal/persistence/fileproc/store.go index 642a295b..bd6147e3 100644 --- a/internal/persistence/fileproc/store.go +++ b/internal/persistence/fileproc/store.go @@ -2,12 +2,12 @@ package fileproc import ( "context" - "fmt" "path/filepath" "sync" "time" "github.com/dagu-org/dagu/internal/digraph" + "github.com/dagu-org/dagu/internal/logger" "github.com/dagu-org/dagu/internal/models" ) @@ -28,36 +28,35 @@ func New(baseDir string) *Store { } } +// TryLock tries to lock the process group. +func (s *Store) TryLock(_ context.Context, groupName string) error { + procGroup := s.newProcGroup(groupName) + return procGroup.TryLock() +} + +// Unlock unlocks the process group, logging any failure. +func (s *Store) Unlock(ctx context.Context, groupName string) { + procGroup := s.newProcGroup(groupName) + if err := procGroup.Unlock(); err != nil { + logger.Error(ctx, "Failed to unlock the proc group", "err", err) + } +} + // CountAlive implements models.ProcStore. func (s *Store) CountAlive(ctx context.Context, groupName string) (int, error) { - pgBaseDir := filepath.Join(s.baseDir, groupName) - pg, _ := s.procGroups.LoadOrStore(groupName, NewProcGroup(pgBaseDir, groupName, s.staleTime)) - procGroup, ok := pg.(*ProcGroup) - if !ok { - return 0, fmt.Errorf("invalid type in procGroups map: expected *ProcGroup, got %T", pg) - } + procGroup := s.newProcGroup(groupName) return procGroup.Count(ctx) } // ListAlive implements models.ProcStore. 
func (s *Store) ListAlive(ctx context.Context, groupName string) ([]digraph.DAGRunRef, error) { - pgBaseDir := filepath.Join(s.baseDir, groupName) - pg, _ := s.procGroups.LoadOrStore(groupName, NewProcGroup(pgBaseDir, groupName, s.staleTime)) - procGroup, ok := pg.(*ProcGroup) - if !ok { - return nil, fmt.Errorf("invalid type in procGroups map: expected *ProcGroup, got %T", pg) - } + procGroup := s.newProcGroup(groupName) return procGroup.ListAlive(ctx) } // Acquire implements models.ProcStore. func (s *Store) Acquire(ctx context.Context, groupName string, dagRun digraph.DAGRunRef) (models.ProcHandle, error) { - pgBaseDir := filepath.Join(s.baseDir, groupName) - pg, _ := s.procGroups.LoadOrStore(groupName, NewProcGroup(pgBaseDir, groupName, s.staleTime)) - procGroup, ok := pg.(*ProcGroup) - if !ok { - return nil, fmt.Errorf("invalid type in procGroups map: expected *ProcGroup, got %T", pg) - } + procGroup := s.newProcGroup(groupName) proc, err := procGroup.Acquire(ctx, dagRun) if err != nil { return nil, err @@ -70,11 +69,19 @@ func (s *Store) Acquire(ctx context.Context, groupName string, dagRun digraph.DA // IsRunAlive implements models.ProcStore. 
func (s *Store) IsRunAlive(ctx context.Context, groupName string, dagRun digraph.DAGRunRef) (bool, error) { - pgBaseDir := filepath.Join(s.baseDir, groupName) - pg, _ := s.procGroups.LoadOrStore(groupName, NewProcGroup(pgBaseDir, groupName, s.staleTime)) - procGroup, ok := pg.(*ProcGroup) - if !ok { - return false, fmt.Errorf("invalid type in procGroups map: expected *ProcGroup, got %T", pg) - } + procGroup := s.newProcGroup(groupName) return procGroup.IsRunAlive(ctx, dagRun) } + +func (s *Store) newProcGroup(groupName string) *ProcGroup { + // Check if the ProcGroup already exists + if pg, ok := s.procGroups.Load(groupName); ok { + return pg.(*ProcGroup) + } + + // Create a new ProcGroup only if it doesn't exist + pgBaseDir := filepath.Join(s.baseDir, groupName) + newPG := NewProcGroup(pgBaseDir, groupName, s.staleTime) + pg, _ := s.procGroups.LoadOrStore(groupName, newPG) + return pg.(*ProcGroup) +} diff --git a/internal/scheduler/dag_executor.go b/internal/scheduler/dag_executor.go index 9648e016..505ba258 100644 --- a/internal/scheduler/dag_executor.go +++ b/internal/scheduler/dag_executor.go @@ -81,7 +81,7 @@ func (e *DAGExecutor) HandleJob( if e.shouldUseDistributedExecution(dag) && operation == coordinatorv1.Operation_OPERATION_START { logger.Info(ctx, "Enqueueing DAG for distributed execution", "dag", dag.Name, - "runID", runID, + "runId", runID, "workerSelector", dag.WorkerSelector) if err := e.dagRunManager.EnqueueDAGRun(ctx, dag, dagrun.EnqueueOptions{ diff --git a/internal/scheduler/scheduler.go b/internal/scheduler/scheduler.go index ff6dc757..ee4eb392 100644 --- a/internal/scheduler/scheduler.go +++ b/internal/scheduler/scheduler.go @@ -403,6 +403,9 @@ func (s *Scheduler) handleQueue(ctx context.Context, ch chan models.QueuedItem, goto SEND_RESULT } + // Give the DAG run a moment to start before polling for its heartbeat + time.Sleep(500 * time.Millisecond) + // Wait for the DAG to be picked up by checking process heartbeat WAIT_FOR_RUN: for { @@ -414,16 +417,34 @@ func (s *Scheduler) 
handleQueue(ctx context.Context, ch chan models.QueuedItem, } else if isAlive { // Process has started and has heartbeat logger.Info(ctx, "DAG run has started (heartbeat detected)", "data", data) - result = models.QueuedItemProcessingResultDiscard + result = models.QueuedItemProcessingResultSuccess break WAIT_FOR_RUN } // Check timeout - if time.Since(startedAt) > 10*time.Second { + if time.Since(startedAt) > 30*time.Second { logger.Error(ctx, "Cancelling due to timeout waiting for the run to be alive (10sec)", "data", data) + + // The run never became alive within the timeout. Mark it failed and discard it from the queue. if err := s.markStatusFailed(ctx, attempt); err != nil { logger.Error(ctx, "Failed to mark the status cancelled") } + + logger.Info(ctx, "Discard the queue item due to timeout", "data", data) + result = models.QueuedItemProcessingResultDiscard + break WAIT_FOR_RUN + } + + // Check whether the run has already left the queued state + st, err = attempt.ReadStatus(ctx) + if err != nil { + logger.Error(ctx, "Failed to read status. 
Is it corrupted?", "err", err, "data", data) + result = models.QueuedItemProcessingResultDiscard + break WAIT_FOR_RUN + } + + if st.Status != status.Queued { + logger.Info(ctx, "Looks like the DAG is already executed", "data", data, "status", st.Status.String()) result = models.QueuedItemProcessingResultDiscard break WAIT_FOR_RUN } @@ -461,6 +482,7 @@ func (s *Scheduler) markStatusFailed(ctx context.Context, attempt models.DAGRunA }() if st.Status != status.Queued { logger.Info(ctx, "Tried to mark a queued item 'cancelled' but it's different status now", "status", st.Status.String()) + return nil } st.Status = status.Cancel // Mark it cancel if err := attempt.Write(ctx, *st); err != nil { diff --git a/internal/scheduler/zombie_detector_test.go b/internal/scheduler/zombie_detector_test.go index f0f1f151..416c99b1 100644 --- a/internal/scheduler/zombie_detector_test.go +++ b/internal/scheduler/zombie_detector_test.go @@ -412,11 +412,22 @@ func (m *mockDAGRunStore) RemoveDAGRun(ctx context.Context, dagRun digraph.DAGRu return args.Error(0) } +var _ models.ProcStore = (*mockProcStore)(nil) + // Mock ProcStore type mockProcStore struct { mock.Mock } +// TryLock implements models.ProcStore. +func (m *mockProcStore) TryLock(_ context.Context, _ string) error { + return nil +} + +// Unlock implements models.ProcStore. 
+func (m *mockProcStore) Unlock(_ context.Context, _ string) { +} + func (m *mockProcStore) Acquire(ctx context.Context, groupName string, dagRun digraph.DAGRunRef) (models.ProcHandle, error) { args := m.Called(ctx, groupName, dagRun) return args.Get(0).(models.ProcHandle), args.Error(1) diff --git a/internal/test/setup.go b/internal/test/setup.go index d6bad644..b42e5e45 100644 --- a/internal/test/setup.go +++ b/internal/test/setup.go @@ -78,6 +78,9 @@ func Setup(t *testing.T, opts ...HelperOption) Helper { // Set the log level to debug _ = os.Setenv("DEBUG", "true") + // Set the CI flag + _ = os.Setenv("CI", "true") + // Disable the DAG run queue for tests _ = os.Setenv("DISABLE_DAG_RUN_QUEUE", "true") @@ -381,7 +384,6 @@ func (d *DAG) Agent(opts ...AgentOption) *Agent { d.DAGRunMgr, d.DAGStore, d.DAGRunStore, - d.ProcStore, d.ServiceRegistry, root, helper.opts,