Refactor internal packages into layered architecture (#1319)

This commit is contained in:
YotaHamada 2025-10-13 03:52:00 +09:00 committed by GitHub
parent 17e719382d
commit 85439bc71e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
380 changed files with 5958 additions and 5937 deletions

View File

@ -45,7 +45,7 @@ jobs:
- name: Copy web assets
run: |
cp ui/dist/* ./internal/frontend/assets/
cp ui/dist/* ./internal/service/frontend/assets/
- name: Install Snapcraft
run: sudo snap install snapcraft --classic

View File

@ -40,7 +40,7 @@ jobs:
- name: Copy web assets
run: |
cp ui/dist/* ./internal/frontend/assets/
cp ui/dist/* ./internal/service/frontend/assets/
- name: Install Snapcraft
run: sudo snap install snapcraft --classic

6
.gitignore vendored
View File

@ -1,7 +1,7 @@
# frontend assets
internal/frontend/assets/fonts/*
internal/frontend/assets/*
!internal/frontend/assets/.gitkeep
internal/service/frontend/assets/fonts/*
internal/service/frontend/assets/*
!internal/service/frontend/assets/.gitkeep
# NVM
.nvmrc

View File

@ -19,7 +19,7 @@ ARG TARGETARCH
WORKDIR /app
COPY . .
RUN go mod download && rm -rf frontend/assets
COPY --from=ui-builder /app/dist/ ./internal/frontend/assets/
COPY --from=ui-builder /app/dist/ ./internal/service/frontend/assets/
RUN GOOS=$TARGETOS GOARCH=$TARGETARCH go build -ldflags="${LDFLAGS}" -o ./bin/dagu ./cmd
# Stage 3: Final Image

View File

@ -19,7 +19,7 @@ ARG TARGETARCH
WORKDIR /app
COPY . .
RUN go mod download && rm -rf frontend/assets
COPY --from=ui-builder /app/dist/ ./internal/frontend/assets/
COPY --from=ui-builder /app/dist/ ./internal/service/frontend/assets/
RUN GOOS=$TARGETOS GOARCH=$TARGETARCH go build -ldflags="${LDFLAGS}" -o ./bin/dagu ./cmd
# Stage 3: Final Image

View File

@ -19,7 +19,7 @@ ARG TARGETARCH
WORKDIR /app
COPY . .
RUN go mod download && rm -rf frontend/assets
COPY --from=ui-builder /app/dist/ ./internal/frontend/assets/
COPY --from=ui-builder /app/dist/ ./internal/service/frontend/assets/
RUN GOOS=$TARGETOS GOARCH=$TARGETARCH go build -ldflags="${LDFLAGS}" -o ./bin/dagu ./cmd
# Stage 3: Final Image

View File

@ -49,7 +49,7 @@ OAPI_CONFIG_FILE_V1=${OAPI_SPEC_DIR_V1}/config.yaml
# Frontend directories
FE_DIR=./internal/frontend
FE_DIR=./internal/service/frontend
FE_GEN_DIR=${FE_DIR}/gen
FE_ASSETS_DIR=${FE_DIR}/assets
FE_BUILD_DIR=./ui/dist

View File

@ -4,7 +4,7 @@
# Build UI assets
echo "Building UI assets..."
cd ui && pnpm install --frozen-lockfile && pnpm build && cd ..
cp -r ui/dist/* internal/frontend/assets/
cp -r ui/dist/* internal/service/frontend/assets/
# Build for Windows platforms
echo "Cross-compiling for Windows x86-64..."

View File

@ -1,13 +0,0 @@
package main
import (
"os"
"os/signal"
"syscall"
)
func main() {
ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT)
<-ch
}

View File

@ -4,10 +4,10 @@ import (
"os"
"github.com/dagu-org/dagu/internal/cmd"
"github.com/dagu-org/dagu/internal/config"
"github.com/dagu-org/dagu/internal/common/config"
"github.com/spf13/cobra"
_ "github.com/dagu-org/dagu/internal/digraph/executor" // Register built-in executors
_ "github.com/dagu-org/dagu/internal/runtime/builtin" // Register built-in executors
)
var rootCmd = &cobra.Command{
@ -28,22 +28,22 @@ func main() {
}
func init() {
rootCmd.AddCommand(cmd.CmdStart())
rootCmd.AddCommand(cmd.CmdEnqueue())
rootCmd.AddCommand(cmd.CmdDequeue())
rootCmd.AddCommand(cmd.CmdStop())
rootCmd.AddCommand(cmd.CmdRestart())
rootCmd.AddCommand(cmd.CmdDry())
rootCmd.AddCommand(cmd.CmdValidate())
rootCmd.AddCommand(cmd.CmdStatus())
rootCmd.AddCommand(cmd.CmdVersion())
rootCmd.AddCommand(cmd.CmdServer())
rootCmd.AddCommand(cmd.CmdScheduler())
rootCmd.AddCommand(cmd.Start())
rootCmd.AddCommand(cmd.Enqueue())
rootCmd.AddCommand(cmd.Dequeue())
rootCmd.AddCommand(cmd.Stop())
rootCmd.AddCommand(cmd.Restart())
rootCmd.AddCommand(cmd.Dry())
rootCmd.AddCommand(cmd.Validate())
rootCmd.AddCommand(cmd.Status())
rootCmd.AddCommand(cmd.Version())
rootCmd.AddCommand(cmd.Server())
rootCmd.AddCommand(cmd.Scheduler())
rootCmd.AddCommand(cmd.CmdCoordinator())
rootCmd.AddCommand(cmd.CmdWorker())
rootCmd.AddCommand(cmd.CmdRetry())
rootCmd.AddCommand(cmd.CmdStartAll())
rootCmd.AddCommand(cmd.CmdMigrate())
rootCmd.AddCommand(cmd.Retry())
rootCmd.AddCommand(cmd.StartAll())
rootCmd.AddCommand(cmd.Migrate())
config.Version = version
}

View File

@ -6,7 +6,7 @@ import (
"testing"
"github.com/dagu-org/dagu/internal/cmd"
"github.com/dagu-org/dagu/internal/config"
"github.com/dagu-org/dagu/internal/common/config"
"github.com/spf13/cobra"
"github.com/stretchr/testify/assert"
)
@ -70,20 +70,20 @@ func TestRootCommand(t *testing.T) {
rootCmd.ResetCommands()
// Re-add commands
rootCmd.AddCommand(cmd.CmdStart())
rootCmd.AddCommand(cmd.CmdEnqueue())
rootCmd.AddCommand(cmd.CmdDequeue())
rootCmd.AddCommand(cmd.CmdStop())
rootCmd.AddCommand(cmd.CmdRestart())
rootCmd.AddCommand(cmd.CmdDry())
rootCmd.AddCommand(cmd.CmdValidate())
rootCmd.AddCommand(cmd.CmdStatus())
rootCmd.AddCommand(cmd.CmdVersion())
rootCmd.AddCommand(cmd.CmdServer())
rootCmd.AddCommand(cmd.CmdScheduler())
rootCmd.AddCommand(cmd.CmdRetry())
rootCmd.AddCommand(cmd.CmdStartAll())
rootCmd.AddCommand(cmd.CmdMigrate())
rootCmd.AddCommand(cmd.Start())
rootCmd.AddCommand(cmd.Enqueue())
rootCmd.AddCommand(cmd.Dequeue())
rootCmd.AddCommand(cmd.Stop())
rootCmd.AddCommand(cmd.Restart())
rootCmd.AddCommand(cmd.Dry())
rootCmd.AddCommand(cmd.Validate())
rootCmd.AddCommand(cmd.Status())
rootCmd.AddCommand(cmd.Version())
rootCmd.AddCommand(cmd.Server())
rootCmd.AddCommand(cmd.Scheduler())
rootCmd.AddCommand(cmd.Retry())
rootCmd.AddCommand(cmd.StartAll())
rootCmd.AddCommand(cmd.Migrate())
// Set args
rootCmd.SetArgs(tt.args[1:]) // Skip program name
@ -232,18 +232,18 @@ operations, or remote commands.
}
// Re-add all commands
rootCmd.AddCommand(cmd.CmdStart())
rootCmd.AddCommand(cmd.CmdEnqueue())
rootCmd.AddCommand(cmd.CmdDequeue())
rootCmd.AddCommand(cmd.CmdStop())
rootCmd.AddCommand(cmd.CmdRestart())
rootCmd.AddCommand(cmd.CmdDry())
rootCmd.AddCommand(cmd.CmdValidate())
rootCmd.AddCommand(cmd.CmdStatus())
rootCmd.AddCommand(cmd.CmdVersion())
rootCmd.AddCommand(cmd.CmdServer())
rootCmd.AddCommand(cmd.CmdScheduler())
rootCmd.AddCommand(cmd.CmdRetry())
rootCmd.AddCommand(cmd.CmdStartAll())
rootCmd.AddCommand(cmd.CmdMigrate())
rootCmd.AddCommand(cmd.Start())
rootCmd.AddCommand(cmd.Enqueue())
rootCmd.AddCommand(cmd.Dequeue())
rootCmd.AddCommand(cmd.Stop())
rootCmd.AddCommand(cmd.Restart())
rootCmd.AddCommand(cmd.Dry())
rootCmd.AddCommand(cmd.Validate())
rootCmd.AddCommand(cmd.Status())
rootCmd.AddCommand(cmd.Version())
rootCmd.AddCommand(cmd.Server())
rootCmd.AddCommand(cmd.Scheduler())
rootCmd.AddCommand(cmd.Retry())
rootCmd.AddCommand(cmd.StartAll())
rootCmd.AddCommand(cmd.Migrate())
}

View File

@ -3,7 +3,7 @@ coverage:
# Generated code
- "api"
# Ignore frontend code for now
- "internal/frontend"
- "internal/service/frontend"
# Test code
- "internal/test"
# Executor code does not measure coverage for now

View File

@ -4,14 +4,14 @@ import (
"fmt"
"os"
"github.com/dagu-org/dagu/internal/agent"
"github.com/dagu-org/dagu/internal/digraph"
"github.com/dagu-org/dagu/internal/logger"
"github.com/dagu-org/dagu/internal/common/logger"
"github.com/dagu-org/dagu/internal/core"
"github.com/dagu-org/dagu/internal/runtime/agent"
"golang.org/x/term"
)
// ExecuteAgent runs an agent with optional progress display and handles common execution logic
func ExecuteAgent(ctx *Context, agentInstance *agent.Agent, dag *digraph.DAG, dagRunID string, logFile *os.File) error {
func ExecuteAgent(ctx *Context, agentInstance *agent.Agent, dag *core.DAG, dagRunID string, logFile *os.File) error {
// Check if progress display should be enabled
enableProgress := shouldEnableProgress(ctx)

View File

@ -5,7 +5,7 @@ import (
"os"
"testing"
"github.com/dagu-org/dagu/internal/config"
"github.com/dagu-org/dagu/internal/common/config"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

View File

@ -8,26 +8,27 @@ import (
"path/filepath"
"regexp"
"runtime/pprof"
"sync"
"syscall"
"time"
"github.com/dagu-org/dagu/internal/cmdutil"
"github.com/dagu-org/dagu/internal/config"
"github.com/dagu-org/dagu/internal/coordinator"
"github.com/dagu-org/dagu/internal/dagrun"
"github.com/dagu-org/dagu/internal/digraph"
"github.com/dagu-org/dagu/internal/fileutil"
"github.com/dagu-org/dagu/internal/frontend"
"github.com/dagu-org/dagu/internal/logger"
"github.com/dagu-org/dagu/internal/metrics"
"github.com/dagu-org/dagu/internal/models"
"github.com/dagu-org/dagu/internal/common/cmdutil"
"github.com/dagu-org/dagu/internal/common/config"
"github.com/dagu-org/dagu/internal/common/fileutil"
"github.com/dagu-org/dagu/internal/common/logger"
"github.com/dagu-org/dagu/internal/common/stringutil"
"github.com/dagu-org/dagu/internal/common/telemetry"
"github.com/dagu-org/dagu/internal/core"
"github.com/dagu-org/dagu/internal/core/execution"
"github.com/dagu-org/dagu/internal/persistence/filedag"
"github.com/dagu-org/dagu/internal/persistence/filedagrun"
"github.com/dagu-org/dagu/internal/persistence/fileproc"
"github.com/dagu-org/dagu/internal/persistence/filequeue"
"github.com/dagu-org/dagu/internal/persistence/fileserviceregistry"
"github.com/dagu-org/dagu/internal/scheduler"
"github.com/dagu-org/dagu/internal/stringutil"
"github.com/dagu-org/dagu/internal/runtime"
"github.com/dagu-org/dagu/internal/service/coordinator"
"github.com/dagu-org/dagu/internal/service/frontend"
"github.com/dagu-org/dagu/internal/service/scheduler"
"github.com/google/uuid"
"github.com/spf13/cobra"
"github.com/spf13/viper"
@ -42,13 +43,13 @@ type Context struct {
Config *config.Config
Quiet bool
DAGRunStore models.DAGRunStore
DAGRunMgr dagrun.Manager
ProcStore models.ProcStore
QueueStore models.QueueStore
ServiceRegistry models.ServiceRegistry
DAGRunStore execution.DAGRunStore
DAGRunMgr runtime.Manager
ProcStore execution.ProcStore
QueueStore execution.QueueStore
ServiceRegistry execution.ServiceRegistry
Proc models.ProcHandle
Proc execution.ProcHandle
}
// LogToFile creates a new logger context with a file writer.
@ -121,14 +122,14 @@ func NewContext(cmd *cobra.Command, flags []commandLineFlag) (*Context, error) {
switch cmd.Name() {
case "server", "scheduler", "start-all":
// For long-running process, we setup file cache for better performance
hc := fileutil.NewCache[*models.DAGRunStatus](0, time.Hour*12)
hc := fileutil.NewCache[*execution.DAGRunStatus](0, time.Hour*12)
hc.StartEviction(ctx)
hrOpts = append(hrOpts, filedagrun.WithHistoryFileCache(hc))
}
ps := fileproc.New(cfg.Paths.ProcDir)
drs := filedagrun.New(cfg.Paths.DAGRunsDir, hrOpts...)
drm := dagrun.New(drs, ps, cfg)
drm := runtime.NewManager(drs, ps, cfg)
qs := filequeue.New(cfg.Paths.QueueDir)
sm := fileserviceregistry.New(cfg.Paths.ServiceRegistryDir)
@ -149,7 +150,7 @@ func NewContext(cmd *cobra.Command, flags []commandLineFlag) (*Context, error) {
// NewServer creates and returns a new web UI NewServer.
// It initializes in-memory caches for DAGs and runstore, and uses them in the client.
func (c *Context) NewServer() (*frontend.Server, error) {
dc := fileutil.NewCache[*digraph.DAG](0, time.Hour*12)
dc := fileutil.NewCache[*core.DAG](0, time.Hour*12)
dc.StartEviction(c)
dr, err := c.dagStore(dc, nil)
@ -160,7 +161,7 @@ func (c *Context) NewServer() (*frontend.Server, error) {
// Create coordinator client (may be nil if not configured)
cc := c.NewCoordinatorClient()
collector := metrics.NewCollector(
collector := telemetry.NewCollector(
config.Version,
dr,
c.DAGRunStore,
@ -168,7 +169,7 @@ func (c *Context) NewServer() (*frontend.Server, error) {
c.ServiceRegistry,
)
mr := metrics.NewRegistry(collector)
mr := telemetry.NewRegistry(collector)
return frontend.NewServer(c.Config, dr, c.DAGRunStore, c.QueueStore, c.ProcStore, c.DAGRunMgr, cc, c.ServiceRegistry, mr), nil
}
@ -192,7 +193,7 @@ func (c *Context) NewCoordinatorClient() coordinator.Client {
// NewScheduler creates a new NewScheduler instance using the default client.
// It builds a DAG job manager to handle scheduled executions.
func (c *Context) NewScheduler() (*scheduler.Scheduler, error) {
cache := fileutil.NewCache[*digraph.DAG](0, time.Hour*12)
cache := fileutil.NewCache[*core.DAG](0, time.Hour*12)
cache.StartEviction(c)
dr, err := c.dagStore(cache, nil)
@ -201,7 +202,7 @@ func (c *Context) NewScheduler() (*scheduler.Scheduler, error) {
}
coordinatorCli := c.NewCoordinatorClient()
de := scheduler.NewDAGExecutor(coordinatorCli, dagrun.NewSubCmdBuilder(c.Config))
de := scheduler.NewDAGExecutor(coordinatorCli, runtime.NewSubCmdBuilder(c.Config))
m := scheduler.NewEntryReader(c.Config.Paths.DAGsDir, dr, c.DAGRunMgr, de, c.Config.Paths.Executable)
return scheduler.New(c.Config, m, c.DAGRunMgr, c.DAGRunStore, c.QueueStore, c.ProcStore, c.ServiceRegistry, coordinatorCli)
}
@ -221,7 +222,7 @@ func (c *Context) StringParam(name string) (string, error) {
// dagStore returns a new DAGRepository instance. It ensures that the directory exists
// (creating it if necessary) before returning the store.
func (c *Context) dagStore(cache *fileutil.Cache[*digraph.DAG], searchPaths []string) (models.DAGStore, error) {
func (c *Context) dagStore(cache *fileutil.Cache[*core.DAG], searchPaths []string) (execution.DAGStore, error) {
dir := c.Config.Paths.DAGsDir
_, err := os.Stat(dir)
if os.IsNotExist(err) {
@ -253,7 +254,7 @@ func (c *Context) dagStore(cache *fileutil.Cache[*digraph.DAG], searchPaths []st
// It evaluates the log directory, validates settings, creates the log directory,
// builds a filename using the current timestamp and dag-run ID, and then opens the file.
func (c *Context) OpenLogFile(
dag *digraph.DAG,
dag *core.DAG,
dagRunID string,
) (*os.File, error) {
logPath, err := c.GenLogFileName(dag, dagRunID)
@ -264,7 +265,7 @@ func (c *Context) OpenLogFile(
}
// GenLogFileName generates a log file name based on the DAG and dag-run ID.
func (c *Context) GenLogFileName(dag *digraph.DAG, dagRunID string) (string, error) {
func (c *Context) GenLogFileName(dag *core.DAG, dagRunID string) (string, error) {
// Read the global configuration for log directory.
baseLogDir, err := cmdutil.EvalString(c, c.Config.Paths.LogDir)
if err != nil {
@ -296,6 +297,8 @@ func (c *Context) GenLogFileName(dag *digraph.DAG, dagRunID string) (string, err
return filepath.Join(d, cfg.LogFile()), nil
}
var viperLock sync.Mutex // protects viper access across commands
// NewCommand creates a new command instance with the given cobra command and run function.
func NewCommand(cmd *cobra.Command, flags []commandLineFlag, runFunc func(cmd *Context, args []string) error) *cobra.Command {
initFlags(cmd, flags...)
@ -317,7 +320,10 @@ func NewCommand(cmd *cobra.Command, flags []commandLineFlag, runFunc func(cmd *C
}()
}
viperLock.Lock()
ctx, err := NewContext(cmd, flags)
viperLock.Unlock()
if err != nil {
fmt.Printf("Initialization error: %v\n", err)
os.Exit(1)

View File

@ -8,10 +8,10 @@ import (
"net"
"os"
"github.com/dagu-org/dagu/internal/config"
"github.com/dagu-org/dagu/internal/coordinator"
"github.com/dagu-org/dagu/internal/logger"
"github.com/dagu-org/dagu/internal/models"
"github.com/dagu-org/dagu/internal/common/config"
"github.com/dagu-org/dagu/internal/common/logger"
"github.com/dagu-org/dagu/internal/core/execution"
"github.com/dagu-org/dagu/internal/service/coordinator"
"github.com/spf13/cobra"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
@ -94,7 +94,7 @@ func runCoordinator(ctx *Context, _ []string) error {
// newCoordinator creates a new Coordinator service instance.
// It sets up a gRPC server and listener for distributed task coordination.
func newCoordinator(ctx context.Context, cfg *config.Config, registry models.ServiceRegistry) (*coordinator.Service, error) {
func newCoordinator(ctx context.Context, cfg *config.Config, registry execution.ServiceRegistry) (*coordinator.Service, error) {
// Generate instance ID
hostname, err := os.Hostname()
if err != nil {

View File

@ -43,7 +43,7 @@ func TestCoordinatorCommand(t *testing.T) {
th.Cancel()
}()
th.RunCommand(t, cmd.CmdCoordinator(), test.CmdTest{
Args: []string{"coordinator", "--config", test.TestdataPath(t, "cmd/config_test.yaml")},
Args: []string{"coordinator", "--config", test.TestdataPath(t, "cli/config_test.yaml")},
ExpectedOut: []string{"Coordinator initialization", "9876"},
})
})

View File

@ -12,8 +12,8 @@ import (
"github.com/charmbracelet/bubbles/textinput"
tea "github.com/charmbracelet/bubbletea"
"github.com/charmbracelet/lipgloss"
"github.com/dagu-org/dagu/internal/digraph"
"github.com/dagu-org/dagu/internal/models"
"github.com/dagu-org/dagu/internal/core"
"github.com/dagu-org/dagu/internal/core/execution"
)
var docStyle = lipgloss.NewStyle().Margin(1, 2)
@ -68,7 +68,7 @@ type Model struct {
// Parameter input
paramInput textinput.Model
dag *digraph.DAG
dag *core.DAG
params string
// Confirmation
@ -306,21 +306,21 @@ func (m Model) viewConfirmation() string {
// pickerModel holds context data for the picker
type pickerModel struct {
ctx context.Context
dagStore models.DAGStore
dagMap map[string]*digraph.DAG
dagStore execution.DAGStore
dagMap map[string]*core.DAG
}
// PickDAGInteractive shows a unified fullscreen UI for DAG selection, parameter input, and confirmation
func PickDAGInteractive(ctx context.Context, dagStore models.DAGStore, dag *digraph.DAG) (Result, error) {
func PickDAGInteractive(ctx context.Context, dagStore execution.DAGStore, dag *core.DAG) (Result, error) {
// Create an internal picker model
pickerModel := &pickerModel{
ctx: ctx,
dagStore: dagStore,
dagMap: make(map[string]*digraph.DAG),
dagMap: make(map[string]*core.DAG),
}
// Get list of DAGs
result, errs, err := dagStore.List(ctx, models.ListDAGsOptions{})
result, errs, err := dagStore.List(ctx, execution.ListDAGsOptions{})
if err != nil {
return Result{}, fmt.Errorf("failed to list DAGs: %w", err)
}
@ -420,7 +420,7 @@ func PickDAGInteractive(ctx context.Context, dagStore models.DAGStore, dag *digr
// PickDAG shows an interactive DAG picker and returns the selected DAG path
// Deprecated: Use PickDAGInteractive instead for a better user experience
func PickDAG(ctx context.Context, dagStore models.DAGStore) (string, error) {
func PickDAG(ctx context.Context, dagStore execution.DAGStore) (string, error) {
result, err := PickDAGInteractive(ctx, dagStore, nil)
if err != nil {
return "", err
@ -433,7 +433,7 @@ func PickDAG(ctx context.Context, dagStore models.DAGStore) (string, error) {
}
// PromptForParams prompts the user to enter parameters for a DAG
func PromptForParams(dag *digraph.DAG) (string, error) {
func PromptForParams(dag *core.DAG) (string, error) {
if dag.DefaultParams == "" && len(dag.Params) == 0 {
return "", nil
}

View File

@ -10,9 +10,9 @@ import (
"github.com/charmbracelet/bubbles/list"
"github.com/charmbracelet/bubbles/textinput"
tea "github.com/charmbracelet/bubbletea"
"github.com/dagu-org/dagu/internal/digraph"
"github.com/dagu-org/dagu/internal/digraph/builder"
"github.com/dagu-org/dagu/internal/models"
"github.com/dagu-org/dagu/internal/core"
"github.com/dagu-org/dagu/internal/core/execution"
"github.com/dagu-org/dagu/internal/core/spec"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)
@ -59,7 +59,7 @@ func TestDAGItem(t *testing.T) {
func TestPromptForParams(t *testing.T) {
t.Run("ReturnsEmptyStringWhenDAGHasNoParameters", func(t *testing.T) {
dag := &digraph.DAG{
dag := &core.DAG{
DefaultParams: "",
Params: []string{},
}
@ -70,7 +70,7 @@ func TestPromptForParams(t *testing.T) {
})
t.Run("DAGWithDefaultParameters", func(_ *testing.T) {
dag := &digraph.DAG{
dag := &core.DAG{
DefaultParams: "KEY1=value1 KEY2=value2",
Params: []string{},
}
@ -182,30 +182,30 @@ func (m *mockDAGStore) Delete(ctx context.Context, fileName string) error {
return args.Error(0)
}
func (m *mockDAGStore) List(ctx context.Context, params models.ListDAGsOptions) (models.PaginatedResult[*digraph.DAG], []string, error) {
func (m *mockDAGStore) List(ctx context.Context, params execution.ListDAGsOptions) (execution.PaginatedResult[*core.DAG], []string, error) {
args := m.Called(ctx, params)
return args.Get(0).(models.PaginatedResult[*digraph.DAG]), args.Get(1).([]string), args.Error(2)
return args.Get(0).(execution.PaginatedResult[*core.DAG]), args.Get(1).([]string), args.Error(2)
}
func (m *mockDAGStore) GetMetadata(ctx context.Context, fileName string) (*digraph.DAG, error) {
func (m *mockDAGStore) GetMetadata(ctx context.Context, fileName string) (*core.DAG, error) {
args := m.Called(ctx, fileName)
if args.Get(0) == nil {
return nil, args.Error(1)
}
return args.Get(0).(*digraph.DAG), args.Error(1)
return args.Get(0).(*core.DAG), args.Error(1)
}
func (m *mockDAGStore) GetDetails(ctx context.Context, fileName string, opts ...builder.LoadOption) (*digraph.DAG, error) {
func (m *mockDAGStore) GetDetails(ctx context.Context, fileName string, opts ...spec.LoadOption) (*core.DAG, error) {
args := m.Called(ctx, fileName, opts)
if args.Get(0) == nil {
return nil, args.Error(1)
}
return args.Get(0).(*digraph.DAG), args.Error(1)
return args.Get(0).(*core.DAG), args.Error(1)
}
func (m *mockDAGStore) Grep(ctx context.Context, pattern string) ([]*models.GrepDAGsResult, []string, error) {
func (m *mockDAGStore) Grep(ctx context.Context, pattern string) ([]*execution.GrepDAGsResult, []string, error) {
args := m.Called(ctx, pattern)
return args.Get(0).([]*models.GrepDAGsResult), args.Get(1).([]string), args.Error(2)
return args.Get(0).([]*execution.GrepDAGsResult), args.Get(1).([]string), args.Error(2)
}
func (m *mockDAGStore) Rename(ctx context.Context, oldID, newID string) error {
@ -223,12 +223,12 @@ func (m *mockDAGStore) UpdateSpec(ctx context.Context, fileName string, spec []b
return args.Error(0)
}
func (m *mockDAGStore) LoadSpec(ctx context.Context, spec []byte, opts ...builder.LoadOption) (*digraph.DAG, error) {
func (m *mockDAGStore) LoadSpec(ctx context.Context, spec []byte, opts ...spec.LoadOption) (*core.DAG, error) {
args := m.Called(ctx, spec, opts)
if args.Get(0) == nil {
return nil, args.Error(1)
}
return args.Get(0).(*digraph.DAG), args.Error(1)
return args.Get(0).(*core.DAG), args.Error(1)
}
func (m *mockDAGStore) TagList(ctx context.Context) ([]string, []string, error) {
@ -251,8 +251,8 @@ func TestPickDAG(t *testing.T) {
mockStore := new(mockDAGStore)
ctx := context.Background()
mockStore.On("List", ctx, models.ListDAGsOptions{}).Return(
models.PaginatedResult[*digraph.DAG]{},
mockStore.On("List", ctx, execution.ListDAGsOptions{}).Return(
execution.PaginatedResult[*core.DAG]{},
[]string{},
errors.New("database error"),
)
@ -267,9 +267,9 @@ func TestPickDAG(t *testing.T) {
mockStore := new(mockDAGStore)
ctx := context.Background()
mockStore.On("List", ctx, models.ListDAGsOptions{}).Return(
models.PaginatedResult[*digraph.DAG]{
Items: []*digraph.DAG{},
mockStore.On("List", ctx, execution.ListDAGsOptions{}).Return(
execution.PaginatedResult[*core.DAG]{
Items: []*core.DAG{},
},
[]string{},
nil,
@ -283,7 +283,7 @@ func TestPickDAG(t *testing.T) {
t.Run("CreatesProperDAGItemsFromDAGs", func(t *testing.T) {
// This tests the internal logic of converting DAGs to list items
dags := []*digraph.DAG{
dags := []*core.DAG{
{
Name: "dag1",
Location: "/path/to/dag1.yaml",

View File

@ -4,14 +4,13 @@ import (
"errors"
"fmt"
"github.com/dagu-org/dagu/internal/digraph"
"github.com/dagu-org/dagu/internal/digraph/status"
"github.com/dagu-org/dagu/internal/logger"
"github.com/dagu-org/dagu/internal/models"
"github.com/dagu-org/dagu/internal/common/logger"
"github.com/dagu-org/dagu/internal/core"
"github.com/dagu-org/dagu/internal/core/execution"
"github.com/spf13/cobra"
)
func CmdDequeue() *cobra.Command {
func Dequeue() *cobra.Command {
return NewCommand(
&cobra.Command{
Use: "dequeue [flags]",
@ -30,7 +29,7 @@ var dequeueFlags = []commandLineFlag{paramsFlag, dagRunFlagDequeue}
func runDequeue(ctx *Context, _ []string) error {
// Get dag-run reference from the context
dagRunRef, _ := ctx.StringParam("dag-run")
dagRun, err := digraph.ParseDAGRunRef(dagRunRef)
dagRun, err := execution.ParseDAGRunRef(dagRunRef)
if err != nil {
return fmt.Errorf("failed to parse dag-run reference %s: %w", dagRunRef, err)
}
@ -38,7 +37,7 @@ func runDequeue(ctx *Context, _ []string) error {
}
// dequeueDAGRun dequeues a dag-run from the queue.
func dequeueDAGRun(ctx *Context, dagRun digraph.DAGRunRef) error {
func dequeueDAGRun(ctx *Context, dagRun execution.DAGRunRef) error {
// Check if queues are enabled
if !ctx.Config.Queues.Enabled {
return fmt.Errorf("queues are disabled in configuration")
@ -53,7 +52,7 @@ func dequeueDAGRun(ctx *Context, dagRun digraph.DAGRunRef) error {
return fmt.Errorf("failed to read status: %w", err)
}
if dagStatus.Status != status.Queued {
if dagStatus.Status != core.Queued {
// If the status is not queued, return an error
return fmt.Errorf("dag-run %s is not in queued status but %s", dagRun.ID, dagStatus.Status)
}
@ -68,7 +67,7 @@ func dequeueDAGRun(ctx *Context, dagRun digraph.DAGRunRef) error {
if err != nil {
return fmt.Errorf("failed to get latest status: %w", err)
}
if latestStatus.Status != status.Queued {
if latestStatus.Status != core.Queued {
return fmt.Errorf("dag-run %s is not in queued status but %s", dagRun.ID, latestStatus.Status)
}
@ -78,7 +77,7 @@ func dequeueDAGRun(ctx *Context, dagRun digraph.DAGRunRef) error {
}
// Make the status as canceled
dagStatus.Status = status.Cancel
dagStatus.Status = core.Cancel
if err := attempt.Open(ctx.Context); err != nil {
return fmt.Errorf("failed to open run: %w", err)
@ -101,7 +100,7 @@ func dequeueDAGRun(ctx *Context, dagRun digraph.DAGRunRef) error {
// Read the latest attempt and if it's NotStarted, we can remove the DAGRun from the store
// as it only has the queued status and no other attempts.
_, err = ctx.DAGRunStore.FindAttempt(ctx, dagRun)
if errors.Is(err, models.ErrNoStatusData) {
if errors.Is(err, execution.ErrNoStatusData) {
if err := ctx.DAGRunStore.RemoveDAGRun(ctx, dagRun); err != nil {
return fmt.Errorf("failed to remove dag-run %s from store: %w", dagRun.ID, err)
}

View File

@ -5,8 +5,8 @@ import (
"testing"
"github.com/dagu-org/dagu/internal/cmd"
"github.com/dagu-org/dagu/internal/digraph"
"github.com/dagu-org/dagu/internal/digraph/status"
"github.com/dagu-org/dagu/internal/core"
"github.com/dagu-org/dagu/internal/core/execution"
"github.com/dagu-org/dagu/internal/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@ -21,13 +21,13 @@ func TestDequeueCommand(t *testing.T) {
`)
// Enqueue the DAG first
th.RunCommand(t, cmd.CmdEnqueue(), test.CmdTest{
th.RunCommand(t, cmd.Enqueue(), test.CmdTest{
Name: "Enqueue",
Args: []string{"enqueue", "--run-id", "test-DAG", dag.Location},
})
// Now test the dequeue command
th.RunCommand(t, cmd.CmdDequeue(), test.CmdTest{
th.RunCommand(t, cmd.Dequeue(), test.CmdTest{
Name: "Dequeue",
Args: []string{"dequeue", "--dag-run", dag.Name + ":test-DAG"},
ExpectedOut: []string{"Dequeued dag-run"},
@ -45,13 +45,13 @@ func TestDequeueCommand_PreservesState(t *testing.T) {
`)
// First run the DAG successfully
th.RunCommand(t, cmd.CmdStart(), test.CmdTest{
th.RunCommand(t, cmd.Start(), test.CmdTest{
Name: "RunDAG",
Args: []string{"start", "--run-id", "success-run", dag.Location},
})
// Wait for it to complete
attempt, err := th.DAGRunStore.FindAttempt(ctx, digraph.DAGRunRef{
attempt, err := th.DAGRunStore.FindAttempt(ctx, execution.DAGRunRef{
Name: dag.Name,
ID: "success-run",
})
@ -59,16 +59,16 @@ func TestDequeueCommand_PreservesState(t *testing.T) {
dagStatus, err := attempt.ReadStatus(ctx)
require.NoError(t, err)
assert.Equal(t, status.Success, dagStatus.Status)
assert.Equal(t, core.Success, dagStatus.Status)
// Now enqueue a new run
th.RunCommand(t, cmd.CmdEnqueue(), test.CmdTest{
th.RunCommand(t, cmd.Enqueue(), test.CmdTest{
Name: "Enqueue",
Args: []string{"enqueue", "--run-id", "queued-run", dag.Location},
})
// Dequeue it
th.RunCommand(t, cmd.CmdDequeue(), test.CmdTest{
th.RunCommand(t, cmd.Dequeue(), test.CmdTest{
Name: "Dequeue",
Args: []string{"dequeue", "--dag-run", dag.Name + ":queued-run"},
ExpectedOut: []string{"Dequeued dag-run"},
@ -80,5 +80,5 @@ func TestDequeueCommand_PreservesState(t *testing.T) {
latestStatus, err := latestAttempt.ReadStatus(ctx)
require.NoError(t, err)
assert.Equal(t, status.Success, latestStatus.Status, "Latest visible status should be Success")
assert.Equal(t, core.Success, latestStatus.Status, "Latest visible status should be Success")
}

View File

@ -4,14 +4,14 @@ import (
"fmt"
"path/filepath"
"github.com/dagu-org/dagu/internal/agent"
"github.com/dagu-org/dagu/internal/digraph"
"github.com/dagu-org/dagu/internal/digraph/builder"
"github.com/dagu-org/dagu/internal/stringutil"
"github.com/dagu-org/dagu/internal/common/stringutil"
"github.com/dagu-org/dagu/internal/core/execution"
"github.com/dagu-org/dagu/internal/core/spec"
"github.com/dagu-org/dagu/internal/runtime/agent"
"github.com/spf13/cobra"
)
func CmdDry() *cobra.Command {
func Dry() *cobra.Command {
return NewCommand(
&cobra.Command{
Use: "dry [flags] <DAG definition> [-- param1 param2 ...]",
@ -36,24 +36,24 @@ Example:
var dryFlags = []commandLineFlag{paramsFlag}
func runDry(ctx *Context, args []string) error {
loadOpts := []builder.LoadOption{
builder.WithBaseConfig(ctx.Config.Paths.BaseConfig),
builder.WithDAGsDir(ctx.Config.Paths.DAGsDir),
loadOpts := []spec.LoadOption{
spec.WithBaseConfig(ctx.Config.Paths.BaseConfig),
spec.WithDAGsDir(ctx.Config.Paths.DAGsDir),
}
if argsLenAtDash := ctx.Command.ArgsLenAtDash(); argsLenAtDash != -1 {
// Get parameters from command line arguments after "--"
loadOpts = append(loadOpts, builder.WithParams(args[argsLenAtDash:]))
loadOpts = append(loadOpts, spec.WithParams(args[argsLenAtDash:]))
} else {
// Get parameters from flags
params, err := ctx.Command.Flags().GetString("params")
if err != nil {
return fmt.Errorf("failed to get parameters: %w", err)
}
loadOpts = append(loadOpts, builder.WithParams(stringutil.RemoveQuotes(params)))
loadOpts = append(loadOpts, spec.WithParams(stringutil.RemoveQuotes(params)))
}
dag, err := builder.Load(ctx, args[0], loadOpts...)
dag, err := spec.Load(ctx, args[0], loadOpts...)
if err != nil {
return fmt.Errorf("failed to load DAG from %s: %w", args[0], err)
}
@ -78,7 +78,7 @@ func runDry(ctx *Context, args []string) error {
return err
}
root := digraph.NewDAGRunRef(dag.Name, dagRunID)
root := execution.NewDAGRunRef(dag.Name, dagRunID)
agentInstance := agent.New(
dagRunID,

View File

@ -42,7 +42,7 @@ steps:
for _, tc := range tests {
t.Run(tc.Name, func(t *testing.T) {
th.RunCommand(t, cmd.CmdDry(), tc)
th.RunCommand(t, cmd.Dry(), tc)
})
}
})

View File

@ -4,15 +4,15 @@ import (
"fmt"
"time"
"github.com/dagu-org/dagu/internal/digraph"
"github.com/dagu-org/dagu/internal/digraph/status"
"github.com/dagu-org/dagu/internal/logger"
"github.com/dagu-org/dagu/internal/models"
"github.com/dagu-org/dagu/internal/stringutil"
"github.com/dagu-org/dagu/internal/common/logger"
"github.com/dagu-org/dagu/internal/common/stringutil"
"github.com/dagu-org/dagu/internal/core"
"github.com/dagu-org/dagu/internal/core/execution"
"github.com/dagu-org/dagu/internal/runtime/transform"
"github.com/spf13/cobra"
)
func CmdEnqueue() *cobra.Command {
func Enqueue() *cobra.Command {
return NewCommand(
&cobra.Command{
Use: "enqueue [flags]",
@ -86,7 +86,7 @@ func runEnqueue(ctx *Context, args []string) error {
}
// enqueueDAGRun enqueues a dag-run to the queue.
func enqueueDAGRun(ctx *Context, dag *digraph.DAG, dagRunID string) error {
func enqueueDAGRun(ctx *Context, dag *core.DAG, dagRunID string) error {
// Check if queues are enabled
if !ctx.Config.Queues.Enabled {
return fmt.Errorf("queues are disabled in configuration")
@ -96,32 +96,32 @@ func enqueueDAGRun(ctx *Context, dag *digraph.DAG, dagRunID string) error {
return fmt.Errorf("failed to generate log file name: %w", err)
}
dagRun := digraph.NewDAGRunRef(dag.Name, dagRunID)
dagRun := execution.NewDAGRunRef(dag.Name, dagRunID)
// Check if the dag-run is already existing in the history store
if _, err = ctx.DAGRunStore.FindAttempt(ctx, dagRun); err == nil {
return fmt.Errorf("DAG %q with ID %q already exists", dag.Name, dagRunID)
}
att, err := ctx.DAGRunStore.CreateAttempt(ctx.Context, dag, time.Now(), dagRunID, models.NewDAGRunAttemptOptions{})
att, err := ctx.DAGRunStore.CreateAttempt(ctx.Context, dag, time.Now(), dagRunID, execution.NewDAGRunAttemptOptions{})
if err != nil {
return fmt.Errorf("failed to create run: %w", err)
}
opts := []models.StatusOption{
models.WithLogFilePath(logFile),
models.WithAttemptID(att.ID()),
models.WithPreconditions(dag.Preconditions),
models.WithQueuedAt(stringutil.FormatTime(time.Now())),
models.WithHierarchyRefs(
digraph.NewDAGRunRef(dag.Name, dagRunID),
digraph.DAGRunRef{},
opts := []transform.StatusOption{
transform.WithLogFilePath(logFile),
transform.WithAttemptID(att.ID()),
transform.WithPreconditions(dag.Preconditions),
transform.WithQueuedAt(stringutil.FormatTime(time.Now())),
transform.WithHierarchyRefs(
execution.NewDAGRunRef(dag.Name, dagRunID),
execution.DAGRunRef{},
),
}
// As a prototype, we save the status to the database to enqueue the dag-run.
// This could be changed to save to a queue file in the future
dagStatus := models.NewStatusBuilder(dag).Create(dagRunID, status.Queued, 0, time.Time{}, opts...)
dagStatus := transform.NewStatusBuilder(dag).Create(dagRunID, core.Queued, 0, time.Time{}, opts...)
if err := att.Open(ctx.Context); err != nil {
return fmt.Errorf("failed to open run: %w", err)
@ -134,7 +134,7 @@ func enqueueDAGRun(ctx *Context, dag *digraph.DAG, dagRunID string) error {
}
// Enqueue the dag-run to the queue
if err := ctx.QueueStore.Enqueue(ctx.Context, dag.Name, models.QueuePriorityLow, dagRun); err != nil {
if err := ctx.QueueStore.Enqueue(ctx.Context, dag.Name, execution.QueuePriorityLow, dagRun); err != nil {
return fmt.Errorf("failed to enqueue dag-run: %w", err)
}

View File

@ -51,7 +51,7 @@ steps:
for _, tc := range tests {
t.Run(tc.Name, func(t *testing.T) {
th.RunCommand(t, cmd.CmdEnqueue(), tc)
th.RunCommand(t, cmd.Enqueue(), tc)
})
}
}

View File

@ -1,7 +1,7 @@
package cmd
import (
"github.com/dagu-org/dagu/internal/stringutil"
"github.com/dagu-org/dagu/internal/common/stringutil"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)

View File

@ -4,13 +4,12 @@ import (
"fmt"
"path/filepath"
"github.com/dagu-org/dagu/internal/logger"
"github.com/dagu-org/dagu/internal/migration"
"github.com/dagu-org/dagu/internal/common/logger"
"github.com/spf13/cobra"
)
// CmdMigrate creates the migrate command with subcommands
func CmdMigrate() *cobra.Command {
// Migrate creates the migrate command with subcommands
func Migrate() *cobra.Command {
cmd := &cobra.Command{
Use: "migrate",
Short: "Migrate legacy data to new format",
@ -56,7 +55,7 @@ func runMigration(ctx *Context) error {
}
// Create migrator
migrator := migration.NewHistoryMigrator(
migrator := newHistoryMigrator(
ctx.DAGRunStore,
dagStore,
ctx.Config.Paths.DataDir,

View File

@ -8,9 +8,9 @@ import (
"testing"
"time"
"github.com/dagu-org/dagu/internal/config"
"github.com/dagu-org/dagu/internal/digraph"
"github.com/dagu-org/dagu/internal/digraph/status"
"github.com/dagu-org/dagu/internal/common/config"
"github.com/dagu-org/dagu/internal/core"
"github.com/dagu-org/dagu/internal/core/execution"
"github.com/dagu-org/dagu/internal/persistence/filedagrun"
legacymodel "github.com/dagu-org/dagu/internal/persistence/legacy/model"
"github.com/spf13/cobra"
@ -37,13 +37,13 @@ func TestMigrateHistoryCommand(t *testing.T) {
legacyStatus := legacymodel.Status{
RequestID: "req123",
Name: "test-dag",
Status: status.Success,
Status: core.Success,
StartedAt: time.Now().Add(-1 * time.Hour).Format(time.RFC3339),
FinishedAt: time.Now().Add(-30 * time.Minute).Format(time.RFC3339),
Nodes: []*legacymodel.Node{
{
Step: digraph.Step{Name: "step1"},
Status: status.NodeSuccess,
Step: core.Step{Name: "step1"},
Status: core.NodeSuccess,
StartedAt: time.Now().Add(-50 * time.Minute).Format(time.RFC3339),
FinishedAt: time.Now().Add(-40 * time.Minute).Format(time.RFC3339),
},
@ -85,7 +85,7 @@ func TestMigrateHistoryCommand(t *testing.T) {
require.NoError(t, err)
// Verify migration
attempt, err := dagRunStore.FindAttempt(context.Background(), digraph.NewDAGRunRef("test-dag", "req123"))
attempt, err := dagRunStore.FindAttempt(context.Background(), execution.NewDAGRunRef("test-dag", "req123"))
require.NoError(t, err)
require.NotNil(t, attempt)
@ -93,7 +93,7 @@ func TestMigrateHistoryCommand(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, "req123", dagRunStatus.DAGRunID)
assert.Equal(t, "test-dag", dagRunStatus.Name)
assert.Equal(t, status.Success, dagRunStatus.Status)
assert.Equal(t, core.Success, dagRunStatus.Status)
// Verify legacy directory was moved
_, err = os.Stat(legacyDagDir)
@ -153,7 +153,7 @@ func TestMigrateCommand_NoLegacyData(t *testing.T) {
}
func TestCmdMigrate(t *testing.T) {
cmd := CmdMigrate()
cmd := Migrate()
assert.NotNil(t, cmd)
assert.Equal(t, "migrate", cmd.Use)
assert.True(t, cmd.HasSubCommands())

View File

@ -1,4 +1,4 @@
package migration
package cmd
import (
"context"
@ -9,29 +9,28 @@ import (
"strings"
"time"
"github.com/dagu-org/dagu/internal/digraph"
"github.com/dagu-org/dagu/internal/digraph/status"
"github.com/dagu-org/dagu/internal/logger"
"github.com/dagu-org/dagu/internal/models"
"github.com/dagu-org/dagu/internal/common/logger"
"github.com/dagu-org/dagu/internal/core"
"github.com/dagu-org/dagu/internal/core/execution"
legacymodel "github.com/dagu-org/dagu/internal/persistence/legacy/model"
)
// HistoryMigrator handles migration from legacy history format to new format
type HistoryMigrator struct {
dagRunStore models.DAGRunStore
dagStore models.DAGStore
// historyMigrator handles migration from legacy history format to new format
type historyMigrator struct {
dagRunStore execution.DAGRunStore
dagStore execution.DAGStore
dataDir string
dagsDir string
}
// NewHistoryMigrator creates a new history migrator
func NewHistoryMigrator(
dagRunStore models.DAGRunStore,
dagStore models.DAGStore,
// newHistoryMigrator creates a new history migrator
func newHistoryMigrator(
dagRunStore execution.DAGRunStore,
dagStore execution.DAGStore,
dataDir string,
dagsDir string,
) *HistoryMigrator {
return &HistoryMigrator{
) *historyMigrator {
return &historyMigrator{
dagRunStore: dagRunStore,
dagStore: dagStore,
dataDir: dataDir,
@ -39,8 +38,8 @@ func NewHistoryMigrator(
}
}
// MigrationResult contains the result of a migration
type MigrationResult struct {
// migrationResult contains the result of a migration
type migrationResult struct {
TotalDAGs int
TotalRuns int
MigratedRuns int
@ -50,7 +49,7 @@ type MigrationResult struct {
}
// NeedsMigration checks if legacy data exists that needs migration
func (m *HistoryMigrator) NeedsMigration(_ context.Context) (bool, error) {
func (m *historyMigrator) NeedsMigration(_ context.Context) (bool, error) {
dataDir := m.dataDir
// Check if history directory exists
@ -88,8 +87,8 @@ func (m *HistoryMigrator) NeedsMigration(_ context.Context) (bool, error) {
}
// Migrate performs the migration from legacy to new format
func (m *HistoryMigrator) Migrate(ctx context.Context) (*MigrationResult, error) {
result := &MigrationResult{}
func (m *historyMigrator) Migrate(ctx context.Context) (*migrationResult, error) {
result := &migrationResult{}
logger.Info(ctx, "Starting history migration from legacy format")
@ -121,7 +120,7 @@ func (m *HistoryMigrator) Migrate(ctx context.Context) (*MigrationResult, error)
}
// migrateDAGHistory migrates all runs for a specific DAG
func (m *HistoryMigrator) migrateDAGHistory(ctx context.Context, dirName, dagName string, result *MigrationResult) error {
func (m *historyMigrator) migrateDAGHistory(ctx context.Context, dirName, dagName string, result *migrationResult) error {
dagHistoryDir := filepath.Join(m.dataDir, dirName)
files, err := os.ReadDir(dagHistoryDir)
@ -147,7 +146,7 @@ func (m *HistoryMigrator) migrateDAGHistory(ctx context.Context, dirName, dagNam
continue
}
if statusFile == nil || statusFile.Status.RequestID == "" || statusFile.Status.Status == status.None {
if statusFile == nil || statusFile.Status.RequestID == "" || statusFile.Status.Status == core.None {
result.SkippedRuns++
err := fmt.Sprintf("skipped invalid status file %s, RequestID=%s, Status=%s", file.Name(), statusFile.Status.RequestID, statusFile.Status.Status.String())
result.Errors = append(result.Errors, err)
@ -180,7 +179,7 @@ func (m *HistoryMigrator) migrateDAGHistory(ctx context.Context, dirName, dagNam
}
// migrateRun converts and saves a single run
func (m *HistoryMigrator) migrateRun(ctx context.Context, legacyStatusFile *legacymodel.StatusFile, dirBasedDagName string) error {
func (m *historyMigrator) migrateRun(ctx context.Context, legacyStatusFile *legacymodel.StatusFile, dirBasedDagName string) error {
legacyStatus := &legacyStatusFile.Status
// Load the DAG definition - try both the status name and directory-based name
@ -199,7 +198,7 @@ func (m *HistoryMigrator) migrateRun(ctx context.Context, legacyStatusFile *lega
}
// Create attempt in new store
attempt, err := m.dagRunStore.CreateAttempt(ctx, dag, startedAt, newStatus.DAGRunID, models.NewDAGRunAttemptOptions{
attempt, err := m.dagRunStore.CreateAttempt(ctx, dag, startedAt, newStatus.DAGRunID, execution.NewDAGRunAttemptOptions{
RootDAGRun: nil, // No hierarchy info in legacy format
Retry: false,
})
@ -228,7 +227,7 @@ func (m *HistoryMigrator) migrateRun(ctx context.Context, legacyStatusFile *lega
}
// convertStatus converts legacy status to new DAGRunStatus format
func (m *HistoryMigrator) convertStatus(legacy *legacymodel.Status, dag *digraph.DAG) *models.DAGRunStatus {
func (m *historyMigrator) convertStatus(legacy *legacymodel.Status, dag *core.DAG) *execution.DAGRunStatus {
// Convert timestamps
startedAt, _ := m.parseTime(legacy.StartedAt)
finishedAt, _ := m.parseTime(legacy.FinishedAt)
@ -239,13 +238,13 @@ func (m *HistoryMigrator) convertStatus(legacy *legacymodel.Status, dag *digraph
createdAt = startedAt.UnixMilli()
}
status := &models.DAGRunStatus{
status := &execution.DAGRunStatus{
Name: legacy.Name,
DAGRunID: legacy.RequestID,
Status: legacy.Status,
PID: models.PID(legacy.PID),
PID: execution.PID(legacy.PID),
Log: legacy.Log,
Nodes: make([]*models.Node, 0),
Nodes: make([]*execution.Node, 0),
Params: legacy.Params,
ParamsList: legacy.ParamsList,
CreatedAt: createdAt,
@ -282,8 +281,8 @@ func (m *HistoryMigrator) convertStatus(legacy *legacymodel.Status, dag *digraph
}
// convertNode converts legacy node to new Node format
func (m *HistoryMigrator) convertNode(legacy *legacymodel.Node) *models.Node {
node := &models.Node{
func (m *historyMigrator) convertNode(legacy *legacymodel.Node) *execution.Node {
node := &execution.Node{
Step: legacy.Step,
Status: legacy.Status,
Error: legacy.Error,
@ -299,7 +298,7 @@ func (m *HistoryMigrator) convertNode(legacy *legacymodel.Node) *models.Node {
}
// parseTime attempts to parse various time formats
func (m *HistoryMigrator) parseTime(timeStr string) (time.Time, error) {
func (m *historyMigrator) parseTime(timeStr string) (time.Time, error) {
if timeStr == "" || timeStr == "-" {
return time.Time{}, fmt.Errorf("empty time string")
}
@ -330,7 +329,7 @@ func formatTime(t time.Time) string {
}
// extractDAGName extracts the DAG name from directory name
func (m *HistoryMigrator) extractDAGName(dirName string) string {
func (m *historyMigrator) extractDAGName(dirName string) string {
// Directory format: {dag-name}-{hash}
// Just remove the last part after hyphen if it looks like a hash
lastHyphen := strings.LastIndex(dirName, "-")
@ -350,7 +349,7 @@ func (m *HistoryMigrator) extractDAGName(dirName string) string {
}
// readLegacyStatusFile reads a legacy status file directly
func (m *HistoryMigrator) readLegacyStatusFile(filePath string) (*legacymodel.StatusFile, error) {
func (m *historyMigrator) readLegacyStatusFile(filePath string) (*legacymodel.StatusFile, error) {
data, err := os.ReadFile(filePath) //nolint:gosec
if err != nil {
return nil, fmt.Errorf("failed to read file: %w", err)
@ -380,8 +379,8 @@ func (m *HistoryMigrator) readLegacyStatusFile(filePath string) (*legacymodel.St
}
// isAlreadyMigrated checks if a run has already been migrated
func (m *HistoryMigrator) isAlreadyMigrated(ctx context.Context, dagName, requestID string) bool {
attempt, err := m.dagRunStore.FindAttempt(ctx, digraph.NewDAGRunRef(dagName, requestID))
func (m *historyMigrator) isAlreadyMigrated(ctx context.Context, dagName, requestID string) bool {
attempt, err := m.dagRunStore.FindAttempt(ctx, execution.NewDAGRunRef(dagName, requestID))
if err != nil || attempt == nil {
return false
}
@ -391,7 +390,7 @@ func (m *HistoryMigrator) isAlreadyMigrated(ctx context.Context, dagName, reques
}
// loadDAGForMigration attempts to load the DAG definition
func (m *HistoryMigrator) loadDAGForMigration(ctx context.Context, statusDagName, dirBasedDagName string) (*digraph.DAG, error) {
func (m *historyMigrator) loadDAGForMigration(ctx context.Context, statusDagName, dirBasedDagName string) (*core.DAG, error) {
// Try both DAG names as candidates
candidates := []string{statusDagName}
if dirBasedDagName != "" && dirBasedDagName != statusDagName {
@ -414,13 +413,13 @@ func (m *HistoryMigrator) loadDAGForMigration(ctx context.Context, statusDagName
}
// If we can't find the DAG, create a minimal one
return &digraph.DAG{
return &core.DAG{
Name: statusDagName,
}, nil
}
// MoveLegacyData moves individual legacy DAG directories to an archive location after successful migration
func (m *HistoryMigrator) MoveLegacyData(ctx context.Context) error {
func (m *historyMigrator) MoveLegacyData(ctx context.Context) error {
archiveDir := filepath.Join(m.dataDir, fmt.Sprintf("history_migrated_%s", time.Now().Format("20060102_150405")))
// Create archive directory

View File

@ -1,4 +1,4 @@
package migration
package cmd
import (
"context"
@ -8,10 +8,9 @@ import (
"testing"
"time"
"github.com/dagu-org/dagu/internal/digraph"
"github.com/dagu-org/dagu/internal/digraph/builder"
"github.com/dagu-org/dagu/internal/digraph/status"
"github.com/dagu-org/dagu/internal/models"
"github.com/dagu-org/dagu/internal/core"
"github.com/dagu-org/dagu/internal/core/execution"
"github.com/dagu-org/dagu/internal/core/spec"
"github.com/dagu-org/dagu/internal/persistence/filedagrun"
legacymodel "github.com/dagu-org/dagu/internal/persistence/legacy/model"
"github.com/stretchr/testify/assert"
@ -20,28 +19,28 @@ import (
// mockDAGStore implements models.DAGStore for testing
type mockDAGStore struct {
dags map[string]*digraph.DAG
dags map[string]*core.DAG
}
func (m *mockDAGStore) GetDetails(_ context.Context, path string, _ ...builder.LoadOption) (*digraph.DAG, error) {
func (m *mockDAGStore) GetDetails(_ context.Context, path string, _ ...spec.LoadOption) (*core.DAG, error) {
if dag, ok := m.dags[path]; ok {
return dag, nil
}
return nil, os.ErrNotExist
}
func (m *mockDAGStore) List(_ context.Context, _ models.ListDAGsOptions) (models.PaginatedResult[*digraph.DAG], []string, error) {
var dags []*digraph.DAG
func (m *mockDAGStore) List(_ context.Context, _ execution.ListDAGsOptions) (execution.PaginatedResult[*core.DAG], []string, error) {
var dags []*core.DAG
for _, dag := range m.dags {
dags = append(dags, dag)
}
return models.PaginatedResult[*digraph.DAG]{
return execution.PaginatedResult[*core.DAG]{
Items: dags,
TotalCount: len(dags),
}, nil, nil
}
func (m *mockDAGStore) FindByName(_ context.Context, name string) (*digraph.DAG, error) {
func (m *mockDAGStore) FindByName(_ context.Context, name string) (*core.DAG, error) {
for _, dag := range m.dags {
if dag.Name == name {
return dag, nil
@ -78,11 +77,11 @@ func (m *mockDAGStore) ToggleSuspend(_ context.Context, _ string, _ bool) error
return nil
}
func (m *mockDAGStore) GetMetadata(_ context.Context, _ string) (*digraph.DAG, error) {
func (m *mockDAGStore) GetMetadata(_ context.Context, _ string) (*core.DAG, error) {
return nil, nil
}
func (m *mockDAGStore) Grep(_ context.Context, _ string) ([]*models.GrepDAGsResult, []string, error) {
func (m *mockDAGStore) Grep(_ context.Context, _ string) ([]*execution.GrepDAGsResult, []string, error) {
return nil, nil, nil
}
@ -90,7 +89,7 @@ func (m *mockDAGStore) UpdateSpec(_ context.Context, _ string, _ []byte) error {
return nil
}
func (m *mockDAGStore) LoadSpec(_ context.Context, _ []byte, _ ...builder.LoadOption) (*digraph.DAG, error) {
func (m *mockDAGStore) LoadSpec(_ context.Context, _ []byte, _ ...spec.LoadOption) (*core.DAG, error) {
return nil, nil
}
@ -131,7 +130,7 @@ func TestExtractDAGName(t *testing.T) {
},
}
m := &HistoryMigrator{}
m := &historyMigrator{}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := m.extractDAGName(tt.dirName)
@ -149,13 +148,13 @@ func TestReadLegacyStatusFile(t *testing.T) {
status1 := legacymodel.Status{
RequestID: "req123",
Name: "test-dag",
Status: status.Running,
Status: core.Running,
StartedAt: "2024-01-01T10:00:00Z",
}
status2 := legacymodel.Status{
RequestID: "req123",
Name: "test-dag",
Status: status.Success,
Status: core.Success,
StartedAt: "2024-01-01T10:00:00Z",
FinishedAt: "2024-01-01T10:05:00Z",
}
@ -173,14 +172,14 @@ func TestReadLegacyStatusFile(t *testing.T) {
require.NoError(t, f.Close())
// Test reading the file
m := &HistoryMigrator{}
m := &historyMigrator{}
statusFile, err := m.readLegacyStatusFile(testFile)
require.NoError(t, err)
require.NotNil(t, statusFile)
// Should get the last status (finished one)
assert.Equal(t, "req123", statusFile.Status.RequestID)
assert.Equal(t, status.Success, statusFile.Status.Status)
assert.Equal(t, core.Success, statusFile.Status.Status)
assert.Equal(t, "2024-01-01T10:05:00Z", statusFile.Status.FinishedAt)
}
@ -189,7 +188,7 @@ func TestNeedsMigration(t *testing.T) {
t.Run("NoHistoryDirectory", func(t *testing.T) {
tempDir := t.TempDir()
m := &HistoryMigrator{dataDir: tempDir}
m := &historyMigrator{dataDir: tempDir}
needsMigration, err := m.NeedsMigration(ctx)
assert.NoError(t, err)
@ -206,7 +205,7 @@ func TestNeedsMigration(t *testing.T) {
datFile := filepath.Join(dagDir, "my-dag.20240101.100000.req123.dat")
require.NoError(t, os.WriteFile(datFile, []byte(`{"RequestId":"req123"}`), 0600))
m := &HistoryMigrator{dataDir: tempDir}
m := &historyMigrator{dataDir: tempDir}
needsMigration, err := m.NeedsMigration(ctx)
assert.NoError(t, err)
assert.True(t, needsMigration)
@ -222,7 +221,7 @@ func TestNeedsMigration(t *testing.T) {
otherFile := filepath.Join(dagDir, "other.txt")
require.NoError(t, os.WriteFile(otherFile, []byte("test"), 0600))
m := &HistoryMigrator{dataDir: tempDir}
m := &historyMigrator{dataDir: tempDir}
needsMigration, err := m.NeedsMigration(ctx)
assert.NoError(t, err)
assert.False(t, needsMigration)
@ -233,7 +232,7 @@ func TestConvertStatus(t *testing.T) {
legacy := &legacymodel.Status{
RequestID: "req123",
Name: "test-dag",
Status: status.Success,
Status: core.Success,
PID: legacymodel.PID(12345),
Log: "test log",
StartedAt: "2024-01-01T10:00:00Z",
@ -242,37 +241,37 @@ func TestConvertStatus(t *testing.T) {
ParamsList: []string{"param1", "value1"},
Nodes: []*legacymodel.Node{
{
Step: digraph.Step{
Step: core.Step{
Name: "step1",
},
Status: status.NodeSuccess,
Status: core.NodeSuccess,
StartedAt: "2024-01-01T10:01:00Z",
FinishedAt: "2024-01-01T10:02:00Z",
Log: "step log",
},
},
OnSuccess: &legacymodel.Node{
Step: digraph.Step{
Step: core.Step{
Name: "on_success",
},
Status: status.NodeSuccess,
Status: core.NodeSuccess,
},
}
dag := &digraph.DAG{
dag := &core.DAG{
Name: "test-dag",
Preconditions: []*digraph.Condition{
Preconditions: []*core.Condition{
{Condition: "test condition"},
},
}
m := &HistoryMigrator{}
m := &historyMigrator{}
result := m.convertStatus(legacy, dag)
assert.Equal(t, "test-dag", result.Name)
assert.Equal(t, "req123", result.DAGRunID)
assert.Equal(t, status.Success, result.Status)
assert.Equal(t, models.PID(12345), result.PID)
assert.Equal(t, core.Success, result.Status)
assert.Equal(t, execution.PID(12345), result.PID)
assert.Equal(t, "test log", result.Log)
assert.Equal(t, "param1=value1", result.Params)
assert.Equal(t, []string{"param1", "value1"}, result.ParamsList)
@ -321,7 +320,7 @@ func TestParseTime(t *testing.T) {
},
}
m := &HistoryMigrator{}
m := &historyMigrator{}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result, err := m.parseTime(tt.timeStr)
@ -354,7 +353,7 @@ func TestMoveLegacyData(t *testing.T) {
require.NoError(t, os.MkdirAll(otherDir, 0750))
require.NoError(t, os.WriteFile(filepath.Join(otherDir, "test.txt"), []byte("data"), 0600))
m := &HistoryMigrator{dataDir: tempDir}
m := &historyMigrator{dataDir: tempDir}
err := m.MoveLegacyData(ctx)
require.NoError(t, err)
@ -396,17 +395,17 @@ func TestLoadDAGForMigration(t *testing.T) {
dag1Path := filepath.Join(tempDir, "test-dag.yaml")
dag2Path := filepath.Join(tempDir, "test-dag-v2.yml")
dag1 := &digraph.DAG{Name: "test-dag", Location: dag1Path}
dag2 := &digraph.DAG{Name: "test-dag-v2", Location: dag2Path}
dag1 := &core.DAG{Name: "test-dag", Location: dag1Path}
dag2 := &core.DAG{Name: "test-dag-v2", Location: dag2Path}
mockStore := &mockDAGStore{
dags: map[string]*digraph.DAG{
dags: map[string]*core.DAG{
dag1Path: dag1,
dag2Path: dag2,
},
}
m := &HistoryMigrator{
m := &historyMigrator{
dagStore: mockStore,
dagsDir: tempDir,
}
@ -455,13 +454,13 @@ func TestFullMigration(t *testing.T) {
legacyStatus := legacymodel.Status{
RequestID: "req123",
Name: "test-dag",
Status: status.Success,
Status: core.Success,
StartedAt: time.Now().Add(-1 * time.Hour).Format(time.RFC3339),
FinishedAt: time.Now().Add(-30 * time.Minute).Format(time.RFC3339),
Nodes: []*legacymodel.Node{
{
Step: digraph.Step{Name: "step1"},
Status: status.NodeSuccess,
Step: core.Step{Name: "step1"},
Status: core.NodeSuccess,
},
},
}
@ -473,7 +472,7 @@ func TestFullMigration(t *testing.T) {
// Create DAG file
dagPath := filepath.Join(dagsDir, "test-dag.yaml")
testDAG := &digraph.DAG{
testDAG := &core.DAG{
Name: "test-dag",
Location: dagPath,
}
@ -482,13 +481,13 @@ func TestFullMigration(t *testing.T) {
// Set up stores
dagRunStore := filedagrun.New(dagRunsDir)
dagStore := &mockDAGStore{
dags: map[string]*digraph.DAG{
dags: map[string]*core.DAG{
dagPath: testDAG,
},
}
// Create migrator
migrator := NewHistoryMigrator(dagRunStore, dagStore, dataDir, dagsDir)
migrator := newHistoryMigrator(dagRunStore, dagStore, dataDir, dagsDir)
// Check migration is needed
needsMigration, err := migrator.NeedsMigration(ctx)
@ -504,7 +503,7 @@ func TestFullMigration(t *testing.T) {
assert.Equal(t, 0, result.FailedRuns)
// Verify migration
attempt, err := dagRunStore.FindAttempt(ctx, digraph.NewDAGRunRef("test-dag", "req123"))
attempt, err := dagRunStore.FindAttempt(ctx, execution.NewDAGRunRef("test-dag", "req123"))
require.NoError(t, err)
require.NotNil(t, attempt)
@ -512,7 +511,7 @@ func TestFullMigration(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, "req123", dagRunStatus.DAGRunID)
assert.Equal(t, "test-dag", dagRunStatus.Name)
assert.Equal(t, status.Success, dagRunStatus.Status)
assert.Equal(t, core.Success, dagRunStatus.Status)
assert.Len(t, dagRunStatus.Nodes, 1)
// Move legacy data

View File

@ -7,16 +7,15 @@ import (
"path/filepath"
"time"
"github.com/dagu-org/dagu/internal/agent"
"github.com/dagu-org/dagu/internal/dagrun"
"github.com/dagu-org/dagu/internal/digraph"
"github.com/dagu-org/dagu/internal/digraph/status"
"github.com/dagu-org/dagu/internal/logger"
"github.com/dagu-org/dagu/internal/models"
"github.com/dagu-org/dagu/internal/common/logger"
"github.com/dagu-org/dagu/internal/core"
"github.com/dagu-org/dagu/internal/core/execution"
"github.com/dagu-org/dagu/internal/runtime"
"github.com/dagu-org/dagu/internal/runtime/agent"
"github.com/spf13/cobra"
)
func CmdRestart() *cobra.Command {
func Restart() *cobra.Command {
return NewCommand(
&cobra.Command{
Use: "restart [flags] <DAG name>",
@ -50,10 +49,10 @@ func runRestart(ctx *Context, args []string) error {
name := args[0]
var attempt models.DAGRunAttempt
var attempt execution.DAGRunAttempt
if dagRunID != "" {
// Retrieve the previous run for the specified dag-run ID.
dagRunRef := digraph.NewDAGRunRef(name, dagRunID)
dagRunRef := execution.NewDAGRunRef(name, dagRunID)
att, err := ctx.DAGRunStore.FindAttempt(ctx, dagRunRef)
if err != nil {
return fmt.Errorf("failed to find the run for dag-run ID %s: %w", dagRunID, err)
@ -71,8 +70,8 @@ func runRestart(ctx *Context, args []string) error {
if err != nil {
return fmt.Errorf("failed to read status: %w", err)
}
if dagStatus.Status != status.Running {
return fmt.Errorf("DAG %s is not running", name)
if dagStatus.Status != core.Running {
return fmt.Errorf("DAG %s is not running, current status: %s", name, dagStatus.Status)
}
dag, err := attempt.ReadDAG(ctx)
@ -87,7 +86,7 @@ func runRestart(ctx *Context, args []string) error {
return nil
}
func handleRestartProcess(ctx *Context, d *digraph.DAG, dagRunID string) error {
func handleRestartProcess(ctx *Context, d *core.DAG, dagRunID string) error {
// Stop if running
if err := stopDAGIfRunning(ctx, ctx.DAGRunMgr, d, dagRunID); err != nil {
return err
@ -107,7 +106,7 @@ func handleRestartProcess(ctx *Context, d *digraph.DAG, dagRunID string) error {
defer ctx.ProcStore.Unlock(ctx, d.ProcGroup())
// Acquire process handle
proc, err := ctx.ProcStore.Acquire(ctx, d.ProcGroup(), digraph.NewDAGRunRef(d.Name, dagRunID))
proc, err := ctx.ProcStore.Acquire(ctx, d.ProcGroup(), execution.NewDAGRunRef(d.Name, dagRunID))
if err != nil {
logger.Debug(ctx, "failed to acquire process handle", "err", err)
return fmt.Errorf("failed to acquire process handle: %w", errMaxRunReached)
@ -122,7 +121,7 @@ func handleRestartProcess(ctx *Context, d *digraph.DAG, dagRunID string) error {
return executeDAG(ctx, ctx.DAGRunMgr, d)
}
func executeDAG(ctx *Context, cli dagrun.Manager, dag *digraph.DAG) error {
func executeDAG(ctx *Context, cli runtime.Manager, dag *core.DAG) error {
dagRunID, err := genRunID()
if err != nil {
return fmt.Errorf("failed to generate dag-run ID: %w", err)
@ -154,7 +153,7 @@ func executeDAG(ctx *Context, cli dagrun.Manager, dag *digraph.DAG) error {
dr,
ctx.DAGRunStore,
ctx.ServiceRegistry,
digraph.NewDAGRunRef(dag.Name, dagRunID),
execution.NewDAGRunRef(dag.Name, dagRunID),
ctx.Config.Global.Peer,
agent.Options{Dry: false})
@ -171,13 +170,13 @@ func executeDAG(ctx *Context, cli dagrun.Manager, dag *digraph.DAG) error {
return nil
}
func stopDAGIfRunning(ctx context.Context, cli dagrun.Manager, dag *digraph.DAG, dagRunID string) error {
func stopDAGIfRunning(ctx context.Context, cli runtime.Manager, dag *core.DAG, dagRunID string) error {
dagStatus, err := cli.GetCurrentStatus(ctx, dag, dagRunID)
if err != nil {
return fmt.Errorf("failed to get current status: %w", err)
}
if dagStatus.Status == status.Running {
if dagStatus.Status == core.Running {
logger.Infof(ctx, "Stopping: %s", dag.Name)
if err := stopRunningDAG(ctx, cli, dag, dagRunID); err != nil {
return fmt.Errorf("failed to stop running DAG: %w", err)
@ -186,7 +185,7 @@ func stopDAGIfRunning(ctx context.Context, cli dagrun.Manager, dag *digraph.DAG,
return nil
}
func stopRunningDAG(ctx context.Context, cli dagrun.Manager, dag *digraph.DAG, dagRunID string) error {
func stopRunningDAG(ctx context.Context, cli runtime.Manager, dag *core.DAG, dagRunID string) error {
const stopPollInterval = 100 * time.Millisecond
for {
dagStatus, err := cli.GetCurrentStatus(ctx, dag, dagRunID)
@ -194,7 +193,7 @@ func stopRunningDAG(ctx context.Context, cli dagrun.Manager, dag *digraph.DAG, d
return fmt.Errorf("failed to get current status: %w", err)
}
if dagStatus.Status != status.Running {
if dagStatus.Status != core.Running {
return nil
}

View File

@ -2,11 +2,10 @@ package cmd_test
import (
"testing"
"time"
"github.com/dagu-org/dagu/internal/cmd"
"github.com/dagu-org/dagu/internal/digraph/builder"
"github.com/dagu-org/dagu/internal/digraph/status"
"github.com/dagu-org/dagu/internal/core"
"github.com/dagu-org/dagu/internal/core/spec"
"github.com/dagu-org/dagu/internal/test"
"github.com/stretchr/testify/require"
)
@ -22,44 +21,36 @@ steps:
script: "sleep 1"
`)
// Start the DAG to restart.
done1 := make(chan struct{})
go func() {
// Start the DAG to restart.
args := []string{"start", `--params="foo"`, dag.Location}
th.RunCommand(t, cmd.CmdStart(), test.CmdTest{Args: args})
th.RunCommand(t, cmd.Start(), test.CmdTest{Args: args})
close(done1)
}()
// Wait for the DAG to be running.
dag.AssertCurrentStatus(t, status.Running)
dag.AssertCurrentStatus(t, core.Running)
// Restart the DAG.
done := make(chan struct{})
done2 := make(chan struct{})
go func() {
defer close(done)
args := []string{"restart", dag.Location}
th.RunCommand(t, cmd.CmdRestart(), test.CmdTest{Args: args})
th.RunCommand(t, cmd.Restart(), test.CmdTest{Args: args})
close(done2)
}()
// Wait for the dag-run running again.
dag.AssertCurrentStatus(t, status.Running)
// Stop the restarted DAG.
th.RunCommand(t, cmd.CmdStop(), test.CmdTest{Args: []string{"stop", dag.Location}})
// Wait for the DAG is stopped.
dag.AssertCurrentStatus(t, status.None)
// Wait for both executions to complete.
<-done1
<-done2
// Check parameter was the same as the first execution
loaded, err := builder.Load(th.Context, dag.Location, builder.WithBaseConfig(th.Config.Paths.BaseConfig))
loaded, err := spec.Load(th.Context, dag.Location, spec.WithBaseConfig(th.Config.Paths.BaseConfig))
require.NoError(t, err)
time.Sleep(time.Millisecond * 1000) // Wait for the history to be updated.
// Check parameter was the same as the first execution
recentHistory := th.DAGRunMgr.ListRecentStatus(th.Context, loaded.Name, 2)
require.Len(t, recentHistory, 2)
require.Equal(t, recentHistory[0].Params, recentHistory[1].Params)
<-done
}

View File

@ -4,15 +4,15 @@ import (
"fmt"
"path/filepath"
"github.com/dagu-org/dagu/internal/agent"
"github.com/dagu-org/dagu/internal/digraph"
"github.com/dagu-org/dagu/internal/fileutil"
"github.com/dagu-org/dagu/internal/logger"
"github.com/dagu-org/dagu/internal/models"
"github.com/dagu-org/dagu/internal/common/fileutil"
"github.com/dagu-org/dagu/internal/common/logger"
"github.com/dagu-org/dagu/internal/core"
"github.com/dagu-org/dagu/internal/core/execution"
"github.com/dagu-org/dagu/internal/runtime/agent"
"github.com/spf13/cobra"
)
func CmdRetry() *cobra.Command {
func Retry() *cobra.Command {
return NewCommand(
&cobra.Command{
Use: "retry [flags] <DAG name or file>",
@ -45,7 +45,7 @@ func runRetry(ctx *Context, args []string) error {
}
// Retrieve the previous run data for specified dag-run ID.
ref := digraph.NewDAGRunRef(name, dagRunID)
ref := execution.NewDAGRunRef(name, dagRunID)
attempt, err := ctx.DAGRunStore.FindAttempt(ctx, ref)
if err != nil {
return fmt.Errorf("failed to find the record for dag-run ID %s: %w", dagRunID, err)
@ -88,7 +88,7 @@ func runRetry(ctx *Context, args []string) error {
}
// Acquire process handle
proc, err := ctx.ProcStore.Acquire(ctx, dag.ProcGroup(), digraph.NewDAGRunRef(dag.Name, dagRunID))
proc, err := ctx.ProcStore.Acquire(ctx, dag.ProcGroup(), execution.NewDAGRunRef(dag.Name, dagRunID))
if err != nil {
logger.Debug(ctx, "failed to acquire process handle", "err", err)
return fmt.Errorf("failed to acquire process handle: %w", errMaxRunReached)
@ -108,7 +108,7 @@ func runRetry(ctx *Context, args []string) error {
return nil
}
func executeRetry(ctx *Context, dag *digraph.DAG, status *models.DAGRunStatus, rootRun digraph.DAGRunRef, stepName string) error {
func executeRetry(ctx *Context, dag *core.DAG, status *execution.DAGRunStatus, rootRun execution.DAGRunRef, stepName string) error {
logger.Debug(ctx, "Executing dag-run retry", "dag", dag.Name, "runId", status.DAGRunID, "step", stepName)
// We use the same log file for the retry as the original run.

View File

@ -6,7 +6,7 @@ import (
"testing"
"github.com/dagu-org/dagu/internal/cmd"
"github.com/dagu-org/dagu/internal/digraph/status"
"github.com/dagu-org/dagu/internal/core"
"github.com/dagu-org/dagu/internal/test"
"github.com/stretchr/testify/require"
)
@ -23,23 +23,23 @@ steps:
// Run a DAG.
args := []string{"start", `--params="foo"`, dagFile.Location}
th.RunCommand(t, cmd.CmdStart(), test.CmdTest{Args: args})
th.RunCommand(t, cmd.Start(), test.CmdTest{Args: args})
// Find the dag-run ID.
cli := th.DAGStore
dagStore := th.DAGStore
ctx := context.Background()
dag, err := cli.GetMetadata(ctx, dagFile.Location)
dag, err := dagStore.GetMetadata(ctx, dagFile.Location)
require.NoError(t, err)
dagRunStatus, err := th.DAGRunMgr.GetLatestStatus(ctx, dag)
require.NoError(t, err)
require.Equal(t, dagRunStatus.Status, status.Success)
require.Equal(t, dagRunStatus.Status, core.Success)
require.NotNil(t, dagRunStatus.Status)
// Retry with the dag-run ID using file path.
args = []string{"retry", fmt.Sprintf("--run-id=%s", dagRunStatus.DAGRunID), dagFile.Location}
th.RunCommand(t, cmd.CmdRetry(), test.CmdTest{
th.RunCommand(t, cmd.Retry(), test.CmdTest{
Args: args,
ExpectedOut: []string{`[1=foo]`},
})
@ -56,23 +56,23 @@ steps:
// Run a DAG.
args := []string{"start", `--params="bar"`, dagFile.Location}
th.RunCommand(t, cmd.CmdStart(), test.CmdTest{Args: args})
th.RunCommand(t, cmd.Start(), test.CmdTest{Args: args})
// Find the dag-run ID.
cli := th.DAGStore
dagStore := th.DAGStore
ctx := context.Background()
dag, err := cli.GetMetadata(ctx, dagFile.Location)
dag, err := dagStore.GetMetadata(ctx, dagFile.Location)
require.NoError(t, err)
dagRunStatus, err := th.DAGRunMgr.GetLatestStatus(ctx, dag)
require.NoError(t, err)
require.Equal(t, dagRunStatus.Status, status.Success)
require.Equal(t, dagRunStatus.Status, core.Success)
require.NotNil(t, dagRunStatus.Status)
// Retry with the dag-run ID using DAG name.
args = []string{"retry", fmt.Sprintf("--run-id=%s", dagRunStatus.DAGRunID), dag.Name}
th.RunCommand(t, cmd.CmdRetry(), test.CmdTest{
th.RunCommand(t, cmd.Retry(), test.CmdTest{
Args: args,
ExpectedOut: []string{`[1=bar]`},
})

View File

@ -3,11 +3,11 @@ package cmd
import (
"fmt"
"github.com/dagu-org/dagu/internal/logger"
"github.com/dagu-org/dagu/internal/common/logger"
"github.com/spf13/cobra"
)
func CmdScheduler() *cobra.Command {
func Scheduler() *cobra.Command {
return NewCommand(
&cobra.Command{
Use: "scheduler [flags]",

View File

@ -16,7 +16,7 @@ func TestSchedulerCommand(t *testing.T) {
th.Cancel()
}()
th.RunCommand(t, cmd.CmdScheduler(), test.CmdTest{
th.RunCommand(t, cmd.Scheduler(), test.CmdTest{
Args: []string{"scheduler"},
ExpectedOut: []string{"Scheduler started"},
})
@ -28,8 +28,8 @@ func TestSchedulerCommand(t *testing.T) {
th.Cancel()
}()
th.RunCommand(t, cmd.CmdScheduler(), test.CmdTest{
Args: []string{"scheduler", "--config", test.TestdataPath(t, "cmd/config_test.yaml")},
th.RunCommand(t, cmd.Scheduler(), test.CmdTest{
Args: []string{"scheduler", "--config", test.TestdataPath(t, "cli/config_test.yaml")},
ExpectedOut: []string{"dagu_test"},
})
})

View File

@ -3,11 +3,11 @@ package cmd
import (
"fmt"
"github.com/dagu-org/dagu/internal/logger"
"github.com/dagu-org/dagu/internal/common/logger"
"github.com/spf13/cobra"
)
func CmdServer() *cobra.Command {
func Server() *cobra.Command {
return NewCommand(
&cobra.Command{
Use: "server [flags]",

View File

@ -19,7 +19,7 @@ func TestServerCommand(t *testing.T) {
th.Cancel()
}()
port := findPort(t)
th.RunCommand(t, cmd.CmdServer(), test.CmdTest{
th.RunCommand(t, cmd.Server(), test.CmdTest{
Args: []string{"server", fmt.Sprintf("--port=%s", port)},
ExpectedOut: []string{"Server is starting", port},
})
@ -31,8 +31,8 @@ func TestServerCommand(t *testing.T) {
time.Sleep(time.Millisecond * 500)
th.Cancel()
}()
th.RunCommand(t, cmd.CmdServer(), test.CmdTest{
Args: []string{"server", "--config", test.TestdataPath(t, "cmd/config_test.yaml")},
th.RunCommand(t, cmd.Server(), test.CmdTest{
Args: []string{"server", "--config", test.TestdataPath(t, "cli/config_test.yaml")},
ExpectedOut: []string{"54321"},
})
})

View File

@ -6,16 +6,16 @@ import (
"os"
"path/filepath"
"github.com/dagu-org/dagu/internal/agent"
"github.com/dagu-org/dagu/internal/cmd/dagpicker"
"github.com/dagu-org/dagu/internal/digraph"
"github.com/dagu-org/dagu/internal/digraph/builder"
"github.com/dagu-org/dagu/internal/logger"
"github.com/dagu-org/dagu/internal/models"
"github.com/dagu-org/dagu/internal/common/logger"
"github.com/dagu-org/dagu/internal/core"
"github.com/dagu-org/dagu/internal/core/execution"
"github.com/dagu-org/dagu/internal/core/spec"
"github.com/dagu-org/dagu/internal/runtime/agent"
"github.com/spf13/cobra"
"golang.org/x/term"
"github.com/dagu-org/dagu/internal/stringutil"
"github.com/dagu-org/dagu/internal/common/stringutil"
)
// Errors for start command
@ -30,8 +30,8 @@ var (
ErrDAGRunIDTooLong = errors.New("dag-run ID length must be less than 64 characters")
)
// CmdStart creates and returns a cobra command for starting a dag-run
func CmdStart() *cobra.Command {
// Start creates and returns a cobra command for starting a dag-run
func Start() *cobra.Command {
return NewCommand(
&cobra.Command{
Use: "start [flags] <DAG definition> [-- param1 param2 ...]",
@ -80,7 +80,7 @@ func runStart(ctx *Context, args []string) error {
// Handle child dag-run if applicable
if isChildDAGRun {
// Parse parent execution reference
parent, err := digraph.ParseDAGRunRef(parentRef)
parent, err := execution.ParseDAGRunRef(parentRef)
if err != nil {
return fmt.Errorf("failed to parse parent dag-run reference: %w", err)
}
@ -147,7 +147,7 @@ var (
)
// tryExecuteDAG tries to run the DAG within the max concurrent run config
func tryExecuteDAG(ctx *Context, dag *digraph.DAG, dagRunID string, root digraph.DAGRunRef) error {
func tryExecuteDAG(ctx *Context, dag *core.DAG, dagRunID string, root execution.DAGRunRef) error {
if err := ctx.ProcStore.TryLock(ctx, dag.ProcGroup()); err != nil {
logger.Debug(ctx, "failed to lock process group", "err", err)
return errMaxRunReached
@ -168,7 +168,7 @@ func tryExecuteDAG(ctx *Context, dag *digraph.DAG, dagRunID string, root digraph
}
// Acquire process handle
proc, err := ctx.ProcStore.Acquire(ctx, dag.ProcGroup(), digraph.NewDAGRunRef(dag.Name, dagRunID))
proc, err := ctx.ProcStore.Acquire(ctx, dag.ProcGroup(), execution.NewDAGRunRef(dag.Name, dagRunID))
if err != nil {
logger.Debug(ctx, "failed to acquire process handle", "err", err)
return fmt.Errorf("failed to acquire process handle: %w", errMaxRunReached)
@ -181,7 +181,7 @@ func tryExecuteDAG(ctx *Context, dag *digraph.DAG, dagRunID string, root digraph
// Unlock the process group
ctx.ProcStore.Unlock(ctx, dag.ProcGroup())
return executeDAGRun(ctx, dag, digraph.DAGRunRef{}, dagRunID, root)
return executeDAGRun(ctx, dag, execution.DAGRunRef{}, dagRunID, root)
}
// getDAGRunInfo extracts and validates dag-run ID and references from command flags
@ -219,7 +219,7 @@ func getDAGRunInfo(ctx *Context) (dagRunID, rootDAGRun, parentDAGRun string, isC
}
// loadDAGWithParams loads the DAG and its parameters from command arguments
func loadDAGWithParams(ctx *Context, args []string) (*digraph.DAG, string, error) {
func loadDAGWithParams(ctx *Context, args []string) (*core.DAG, string, error) {
var dagPath string
var interactiveParams string
@ -241,7 +241,7 @@ func loadDAGWithParams(ctx *Context, args []string) (*digraph.DAG, string, error
// Load DAG metadata first to pass to the picker
// This will be updated when user selects a DAG
var tempDAG *digraph.DAG
var tempDAG *core.DAG
// Show unified interactive UI
result, err := dagpicker.PickDAGInteractive(ctx, dagStore, tempDAG)
@ -261,9 +261,9 @@ func loadDAGWithParams(ctx *Context, args []string) (*digraph.DAG, string, error
}
// Prepare load options with base configuration
loadOpts := []builder.LoadOption{
builder.WithBaseConfig(ctx.Config.Paths.BaseConfig),
builder.WithDAGsDir(ctx.Config.Paths.DAGsDir),
loadOpts := []spec.LoadOption{
spec.WithBaseConfig(ctx.Config.Paths.BaseConfig),
spec.WithDAGsDir(ctx.Config.Paths.DAGsDir),
}
// Load parameters from command line arguments
@ -273,10 +273,10 @@ func loadDAGWithParams(ctx *Context, args []string) (*digraph.DAG, string, error
// Check if parameters are provided after "--"
if argsLenAtDash := ctx.Command.ArgsLenAtDash(); argsLenAtDash != -1 && len(args) > 0 {
// Get parameters from command line arguments after "--"
loadOpts = append(loadOpts, builder.WithParams(args[argsLenAtDash:]))
loadOpts = append(loadOpts, spec.WithParams(args[argsLenAtDash:]))
} else if interactiveParams != "" {
// Use interactive parameters
loadOpts = append(loadOpts, builder.WithParams(stringutil.RemoveQuotes(interactiveParams)))
loadOpts = append(loadOpts, spec.WithParams(stringutil.RemoveQuotes(interactiveParams)))
params = interactiveParams
} else {
// Get parameters from flags
@ -284,11 +284,11 @@ func loadDAGWithParams(ctx *Context, args []string) (*digraph.DAG, string, error
if err != nil {
return nil, "", fmt.Errorf("failed to get parameters: %w", err)
}
loadOpts = append(loadOpts, builder.WithParams(stringutil.RemoveQuotes(params)))
loadOpts = append(loadOpts, spec.WithParams(stringutil.RemoveQuotes(params)))
}
// Load the DAG from the specified file
dag, err := builder.Load(ctx, dagPath, loadOpts...)
dag, err := spec.Load(ctx, dagPath, loadOpts...)
if err != nil {
return nil, "", fmt.Errorf("failed to load DAG from %s: %w", dagPath, err)
}
@ -297,22 +297,22 @@ func loadDAGWithParams(ctx *Context, args []string) (*digraph.DAG, string, error
}
// determineRootDAGRun creates or parses the root execution reference
func determineRootDAGRun(isChildDAGRun bool, rootDAGRun string, dag *digraph.DAG, dagRunID string) (digraph.DAGRunRef, error) {
func determineRootDAGRun(isChildDAGRun bool, rootDAGRun string, dag *core.DAG, dagRunID string) (execution.DAGRunRef, error) {
if isChildDAGRun {
// Parse the rootDAGRun execution reference for child dag-runs
rootDAGRun, err := digraph.ParseDAGRunRef(rootDAGRun)
rootDAGRun, err := execution.ParseDAGRunRef(rootDAGRun)
if err != nil {
return digraph.DAGRunRef{}, fmt.Errorf("failed to parse root exec ref: %w", err)
return execution.DAGRunRef{}, fmt.Errorf("failed to parse root exec ref: %w", err)
}
return rootDAGRun, nil
}
// Create a new root execution reference for root execution
return digraph.NewDAGRunRef(dag.Name, dagRunID), nil
return execution.NewDAGRunRef(dag.Name, dagRunID), nil
}
// handleChildDAGRun processes a child dag-run, checking for previous runs
func handleChildDAGRun(ctx *Context, dag *digraph.DAG, dagRunID string, params string, root digraph.DAGRunRef, parent digraph.DAGRunRef) error {
func handleChildDAGRun(ctx *Context, dag *core.DAG, dagRunID string, params string, root execution.DAGRunRef, parent execution.DAGRunRef) error {
// Log child dag-run execution
logger.Info(ctx, "Executing child dag-run",
"dag", dag.Name,
@ -332,7 +332,7 @@ func handleChildDAGRun(ctx *Context, dag *digraph.DAG, dagRunID string, params s
// Look for existing execution childAttempt
childAttempt, err := ctx.DAGRunStore.FindChildAttempt(ctx, root, dagRunID)
if errors.Is(err, models.ErrDAGRunIDNotFound) {
if errors.Is(err, execution.ErrDAGRunIDNotFound) {
// If the dag-run ID is not found, proceed with new execution
return executeDAGRun(ctx, dag, parent, dagRunID, root)
}
@ -351,7 +351,7 @@ func handleChildDAGRun(ctx *Context, dag *digraph.DAG, dagRunID string, params s
}
// executeDAGRun handles the actual execution of a DAG
func executeDAGRun(ctx *Context, d *digraph.DAG, parent digraph.DAGRunRef, dagRunID string, root digraph.DAGRunRef) error {
func executeDAGRun(ctx *Context, d *core.DAG, parent execution.DAGRunRef, dagRunID string, root execution.DAGRunRef) error {
// Open the log file for the scheduler. The log file will be used for future
// execution for the same DAG/dag-run ID between attempts.
logFile, err := ctx.OpenLogFile(d, dagRunID)

View File

@ -60,7 +60,7 @@ steps:
for _, tc := range tests {
t.Run(tc.Name, func(t *testing.T) {
th.RunCommand(t, cmd.CmdStart(), tc)
th.RunCommand(t, cmd.Start(), tc)
})
}
}
@ -77,11 +77,11 @@ steps:
dagFile := th.CreateDAGFile(t, "test.yaml", dagContent)
// Providing a DAG path should work
cmd := cmd.CmdStart()
cmd.SetArgs([]string{dagFile})
cli := cmd.Start()
cli.SetArgs([]string{dagFile})
// The actual execution might fail for other reasons in test environment,
// but it should accept the DAG file argument
_ = cmd.Execute()
_ = cli.Execute()
})
t.Run("TerminalDetectionFunctionAvailable", func(t *testing.T) {
@ -108,12 +108,12 @@ steps:
`
dagFile := th.CreateDAGFile(t, "test-params.yaml", dagContent)
cmd := cmd.CmdStart()
cmd.SetArgs([]string{dagFile, "--", "KEY1=value1", "KEY2=value2"})
cli := cmd.Start()
cli.SetArgs([]string{dagFile, "--", "KEY1=value1", "KEY2=value2"})
// Execute will fail due to missing context setup, but we're testing
// that the command accepts the arguments
_ = cmd.Execute()
_ = cli.Execute()
})
t.Run("ShouldAcceptParamsFlag", func(t *testing.T) {
@ -126,11 +126,11 @@ steps:
`
dagFile := th.CreateDAGFile(t, "test-params-flag.yaml", dagContent)
cmd := cmd.CmdStart()
cmd.SetArgs([]string{dagFile, "--params", "KEY=value"})
cli := cmd.Start()
cli.SetArgs([]string{dagFile, "--params", "KEY=value"})
// Execute will fail due to missing context setup, but we're testing
// that the command accepts the arguments
_ = cmd.Execute()
_ = cli.Execute()
})
}

View File

@ -7,11 +7,11 @@ import (
"syscall"
"time"
"github.com/dagu-org/dagu/internal/logger"
"github.com/dagu-org/dagu/internal/common/logger"
"github.com/spf13/cobra"
)
func CmdStartAll() *cobra.Command {
func StartAll() *cobra.Command {
return NewCommand(
&cobra.Command{
Use: "start-all [flags]",

View File

@ -16,7 +16,7 @@ func TestStartAllCommand(t *testing.T) {
time.Sleep(time.Millisecond * 500)
th.Cancel()
}()
th.RunCommand(t, cmd.CmdStartAll(), test.CmdTest{
th.RunCommand(t, cmd.StartAll(), test.CmdTest{
Args: []string{
"start-all",
fmt.Sprintf("--port=%s", findPort(t)),
@ -32,10 +32,10 @@ func TestStartAllCommand(t *testing.T) {
time.Sleep(time.Millisecond * 500)
th.Cancel()
}()
th.RunCommand(t, cmd.CmdStartAll(), test.CmdTest{
th.RunCommand(t, cmd.StartAll(), test.CmdTest{
Args: []string{
"start-all",
"--config", test.TestdataPath(t, "cmd/config_startall.yaml"),
"--config", test.TestdataPath(t, "cli/config_startall.yaml"),
fmt.Sprintf("--coordinator.port=%s", findPort(t)),
},
ExpectedOut: []string{"54322", "dagu_test", "Coordinator initialization"},

View File

@ -6,17 +6,16 @@ import (
"strings"
"time"
"github.com/dagu-org/dagu/internal/digraph"
"github.com/dagu-org/dagu/internal/digraph/status"
"github.com/dagu-org/dagu/internal/models"
"github.com/dagu-org/dagu/internal/stringutil"
"github.com/dagu-org/dagu/internal/common/stringutil"
"github.com/dagu-org/dagu/internal/core"
"github.com/dagu-org/dagu/internal/core/execution"
"github.com/fatih/color"
"github.com/jedib0t/go-pretty/v6/table"
"github.com/jedib0t/go-pretty/v6/text"
"github.com/spf13/cobra"
)
func CmdStatus() *cobra.Command {
func Status() *cobra.Command {
return NewCommand(
&cobra.Command{
Use: "status [flags] <DAG name>",
@ -69,7 +68,7 @@ func runStatus(ctx *Context, args []string) error {
return fmt.Errorf("failed to read status from attempt: %w", err)
}
if dagStatus.Status == status.Running {
if dagStatus.Status == core.Running {
realtimeStatus, err := ctx.DAGRunMgr.GetCurrentStatus(ctx, dag, dagRunID)
if err != nil {
return fmt.Errorf("failed to retrieve current status: %w", err)
@ -86,7 +85,7 @@ func runStatus(ctx *Context, args []string) error {
}
// displayDetailedStatus renders a formatted table with DAG run information
func displayDetailedStatus(dag *digraph.DAG, dagStatus *models.DAGRunStatus) {
func displayDetailedStatus(dag *core.DAG, dagStatus *execution.DAGRunStatus) {
// Create header with 80 character width
fmt.Println()
headerColor := color.New(color.FgCyan, color.Bold)
@ -130,7 +129,7 @@ func displayDetailedStatus(dag *digraph.DAG, dagStatus *models.DAGRunStatus) {
t.AppendRow(table.Row{"Duration", stringutil.FormatDuration(duration)})
}
t.AppendRow(table.Row{"Finished At", dagStatus.FinishedAt})
} else if dagStatus.Status == status.Running && !startedAt.IsZero() {
} else if dagStatus.Status == core.Running && !startedAt.IsZero() {
elapsed := time.Since(startedAt)
t.AppendRow(table.Row{"Running For", stringutil.FormatDuration(elapsed)})
}
@ -166,27 +165,27 @@ func displayDetailedStatus(dag *digraph.DAG, dagStatus *models.DAGRunStatus) {
// Additional status-specific messages
fmt.Println()
switch dagStatus.Status {
case status.Running:
case core.Running:
fmt.Printf("%s The DAG is currently running. Use 'dagu stop %s' to stop it.\n",
color.YellowString("→"), dag.Name)
case status.Error:
case core.Error:
fmt.Printf("%s The DAG failed. Use 'dagu retry --run-id=%s %s' to retry.\n",
color.RedString("✗"), dagStatus.DAGRunID, dag.Name)
case status.Success:
case core.Success:
fmt.Printf("%s The DAG completed successfully.\n", color.GreenString("✓"))
case status.PartialSuccess:
case core.PartialSuccess:
fmt.Printf("%s The DAG completed with partial success.\n", color.YellowString("⚠"))
case status.Cancel:
case core.Cancel:
fmt.Printf("%s The DAG was cancelled.\n", color.YellowString("⚠"))
case status.Queued:
case core.Queued:
fmt.Printf("%s The DAG is queued for execution.\n", color.BlueString("●"))
case status.None:
case core.None:
fmt.Printf("%s The DAG has not been started yet.\n", color.New(color.Faint).Sprint("○"))
}
}
// displayStepSummary shows a summary of all steps in the DAG run
func displayStepSummary(nodes []*models.Node) {
func displayStepSummary(nodes []*execution.Node) {
headerColor := color.New(color.FgCyan, color.Bold)
// Create a boxed header with 80 character width
@ -200,7 +199,7 @@ func displayStepSummary(nodes []*models.Node) {
fmt.Println(strings.Repeat("─", 80))
// Count steps by status
statusCounts := make(map[status.NodeStatus]int)
statusCounts := make(map[core.NodeStatus]int)
for _, node := range nodes {
statusCounts[node.Status]++
}
@ -226,10 +225,10 @@ func displayStepSummary(nodes []*models.Node) {
// Show first few steps and any failed steps
shownSteps := 0
failedSteps := []*models.Node{}
failedSteps := []*execution.Node{}
for _, node := range nodes {
if node.Status == status.NodeError {
if node.Status == core.NodeError {
failedSteps = append(failedSteps, node)
}
@ -247,7 +246,7 @@ func displayStepSummary(nodes []*models.Node) {
if !startedAt.IsZero() && !finishedAt.IsZero() {
duration = stringutil.FormatDuration(finishedAt.Sub(startedAt))
}
} else if node.Status == status.NodeRunning && !startedAt.IsZero() {
} else if node.Status == core.NodeRunning && !startedAt.IsZero() {
duration = stringutil.FormatDuration(time.Since(startedAt))
}
}
@ -425,21 +424,21 @@ func isBinaryContent(data []byte) bool {
}
// formatStatus returns a colored status string
func formatStatus(st status.Status) string {
func formatStatus(st core.Status) string {
switch st {
case status.Success:
case core.Success:
return color.GreenString("Success")
case status.Error:
case core.Error:
return color.RedString("Failed")
case status.PartialSuccess:
case core.PartialSuccess:
return color.YellowString("Partial Success")
case status.Running:
case core.Running:
return color.New(color.FgHiGreen).Sprint("Running")
case status.Cancel:
case core.Cancel:
return color.YellowString("Cancelled")
case status.Queued:
case core.Queued:
return color.BlueString("Queued")
case status.None:
case core.None:
return color.New(color.Faint).Sprint("Not Started")
default:
return st.String()
@ -447,21 +446,21 @@ func formatStatus(st status.Status) string {
}
// formatNodeStatus returns a colored status string for node status
func formatNodeStatus(s status.NodeStatus) string {
func formatNodeStatus(s core.NodeStatus) string {
switch s {
case status.NodeSuccess:
case core.NodeSuccess:
return color.GreenString("Success")
case status.NodeError:
case core.NodeError:
return color.RedString("Failed")
case status.NodeRunning:
case core.NodeRunning:
return color.New(color.FgHiGreen).Sprint("Running")
case status.NodeCancel:
case core.NodeCancel:
return color.YellowString("Cancelled")
case status.NodeSkipped:
case core.NodeSkipped:
return color.New(color.Faint).Sprint("Skipped")
case status.NodePartialSuccess:
case core.NodePartialSuccess:
return color.YellowString("Partial Success")
case status.NodeNone:
case core.NodeNone:
return color.New(color.Faint).Sprint("Not Started")
default:
return fmt.Sprintf("%d", s)
@ -469,7 +468,7 @@ func formatNodeStatus(s status.NodeStatus) string {
}
// formatPID formats the process ID, showing "-" if not available
func formatPID(pid models.PID) string {
func formatPID(pid execution.PID) string {
if pid > 0 {
return fmt.Sprintf("%d", pid)
}
@ -495,10 +494,10 @@ func extractDAGName(ctx *Context, name string) (string, error) {
return name, nil
}
func extractAttemptID(ctx *Context, name, dagRunID string) (models.DAGRunAttempt, error) {
func extractAttemptID(ctx *Context, name, dagRunID string) (execution.DAGRunAttempt, error) {
if dagRunID != "" {
// Retrieve the previous run's record for the specified dag-run ID.
dagRunRef := digraph.NewDAGRunRef(name, dagRunID)
dagRunRef := execution.NewDAGRunRef(name, dagRunID)
att, err := ctx.DAGRunStore.FindAttempt(ctx, dagRunRef)
if err != nil {
return nil, fmt.Errorf("failed to find run data for dag-run ID %s: %w", dagRunID, err)

View File

@ -6,9 +6,8 @@ import (
"time"
"github.com/dagu-org/dagu/internal/cmd"
"github.com/dagu-org/dagu/internal/digraph"
"github.com/dagu-org/dagu/internal/digraph/status"
"github.com/dagu-org/dagu/internal/models"
"github.com/dagu-org/dagu/internal/core"
"github.com/dagu-org/dagu/internal/core/execution"
"github.com/dagu-org/dagu/internal/test"
"github.com/google/uuid"
"github.com/stretchr/testify/require"
@ -27,7 +26,7 @@ func TestStatusCommand(t *testing.T) {
go func() {
// Start a DAG to check the status.
args := []string{"start", dagFile.Location}
th.RunCommand(t, cmd.CmdStart(), test.CmdTest{Args: args})
th.RunCommand(t, cmd.Start(), test.CmdTest{Args: args})
close(done)
}()
@ -41,11 +40,11 @@ func TestStatusCommand(t *testing.T) {
return false
}
return status.Running == dagRunStatus.Status
return core.Running == dagRunStatus.Status
}, time.Second*3, time.Millisecond*50)
// Check the current status - just verify it runs without error
statusCmd := cmd.CmdStatus()
statusCmd := cmd.Status()
statusCmd.SetContext(th.Context)
statusCmd.SetArgs([]string{dagFile.Location})
statusCmd.SilenceErrors = true
@ -56,7 +55,7 @@ func TestStatusCommand(t *testing.T) {
// Stop the DAG.
args := []string{"stop", dagFile.Location}
th.RunCommand(t, cmd.CmdStop(), test.CmdTest{Args: args})
th.RunCommand(t, cmd.Stop(), test.CmdTest{Args: args})
<-done
})
@ -69,7 +68,7 @@ func TestStatusCommand(t *testing.T) {
`)
// Run the DAG to completion
startCmd := cmd.CmdStart()
startCmd := cmd.Start()
startCmd.SetContext(th.Context)
startCmd.SetArgs([]string{dagFile.Location})
startCmd.SilenceErrors = true
@ -82,7 +81,7 @@ func TestStatusCommand(t *testing.T) {
time.Sleep(200 * time.Millisecond)
// Check the status runs without error
statusCmd := cmd.CmdStatus()
statusCmd := cmd.Status()
statusCmd.SetContext(th.Context)
statusCmd.SetArgs([]string{dagFile.Location})
statusCmd.SilenceErrors = true
@ -109,7 +108,7 @@ func TestStatusCommand(t *testing.T) {
// Create a fake failed DAG run for testing status
dagRunID := uuid.Must(uuid.NewV7()).String()
attempt, err := th.DAGRunStore.CreateAttempt(th.Context, dag, time.Now(), dagRunID, models.NewDAGRunAttemptOptions{})
attempt, err := th.DAGRunStore.CreateAttempt(th.Context, dag, time.Now(), dagRunID, execution.NewDAGRunAttemptOptions{})
require.NoError(t, err)
// Open the attempt for writing
@ -117,17 +116,17 @@ func TestStatusCommand(t *testing.T) {
require.NoError(t, err)
// Write a failed status
status := models.DAGRunStatus{
status := execution.DAGRunStatus{
Name: dag.Name,
DAGRunID: dagRunID,
Status: status.Error,
Status: core.Error,
StartedAt: time.Now().Format(time.RFC3339),
FinishedAt: time.Now().Format(time.RFC3339),
AttemptID: attempt.ID(),
Nodes: []*models.Node{
Nodes: []*execution.Node{
{
Step: digraph.Step{Name: "error"},
Status: status.NodeError,
Step: core.Step{Name: "error"},
Status: core.NodeError,
Error: "exit status 1",
},
},
@ -140,7 +139,7 @@ func TestStatusCommand(t *testing.T) {
require.NoError(t, err)
// Check the status runs without error even for failed DAGs
statusCmd := cmd.CmdStatus()
statusCmd := cmd.Status()
statusCmd.SetContext(th.Context)
statusCmd.SetArgs([]string{dagFile.Location})
statusCmd.SilenceErrors = true
@ -162,7 +161,7 @@ steps:
`)
// Run the DAG with custom parameters
startCmd := cmd.CmdStart()
startCmd := cmd.Start()
startCmd.SetContext(th.Context)
startCmd.SetArgs([]string{dagFile.Location, "--params=custom1 custom2"})
startCmd.SilenceErrors = true
@ -175,7 +174,7 @@ steps:
time.Sleep(200 * time.Millisecond)
// Check the status runs without error
statusCmd := cmd.CmdStatus()
statusCmd := cmd.Status()
statusCmd.SetContext(th.Context)
statusCmd.SetArgs([]string{dagFile.Location})
statusCmd.SilenceErrors = true
@ -195,7 +194,7 @@ steps:
runID := uuid.Must(uuid.NewV7()).String()
// Run the DAG with a specific run ID
startCmd := cmd.CmdStart()
startCmd := cmd.Start()
startCmd.SetContext(th.Context)
startCmd.SetArgs([]string{dagFile.Location, "--run-id=" + runID})
startCmd.SilenceErrors = true
@ -208,7 +207,7 @@ steps:
time.Sleep(200 * time.Millisecond)
// Check the status with the specific run ID
statusCmd := cmd.CmdStatus()
statusCmd := cmd.Status()
statusCmd.SetContext(th.Context)
statusCmd.SetArgs([]string{dagFile.Location, "--run-id=" + runID})
statusCmd.SilenceErrors = true
@ -227,7 +226,7 @@ steps:
`)
// Run the DAG twice
startCmd := cmd.CmdStart()
startCmd := cmd.Start()
startCmd.SetContext(th.Context)
startCmd.SetArgs([]string{dagFile.Location})
startCmd.SilenceErrors = true
@ -239,7 +238,7 @@ steps:
// Wait a bit to ensure different timestamps
time.Sleep(200 * time.Millisecond)
startCmd2 := cmd.CmdStart()
startCmd2 := cmd.Start()
startCmd2.SetContext(th.Context)
startCmd2.SetArgs([]string{dagFile.Location})
startCmd2.SilenceErrors = true
@ -252,7 +251,7 @@ steps:
time.Sleep(200 * time.Millisecond)
// Status without run ID should show the latest run
statusCmd := cmd.CmdStatus()
statusCmd := cmd.Status()
statusCmd.SetContext(th.Context)
statusCmd.SetArgs([]string{dagFile.Location})
statusCmd.SilenceErrors = true
@ -287,7 +286,7 @@ steps:
// Create a fake DAG run with skipped steps
dagRunID := uuid.Must(uuid.NewV7()).String()
attempt, err := th.DAGRunStore.CreateAttempt(th.Context, dag, time.Now(), dagRunID, models.NewDAGRunAttemptOptions{})
attempt, err := th.DAGRunStore.CreateAttempt(th.Context, dag, time.Now(), dagRunID, execution.NewDAGRunAttemptOptions{})
require.NoError(t, err)
// Open the attempt for writing
@ -295,24 +294,24 @@ steps:
require.NoError(t, err)
// Write a status with skipped steps
status := models.DAGRunStatus{
status := execution.DAGRunStatus{
Name: dag.Name,
DAGRunID: dagRunID,
Status: status.Error,
Status: core.Error,
StartedAt: time.Now().Format(time.RFC3339),
FinishedAt: time.Now().Format(time.RFC3339),
AttemptID: attempt.ID(),
Nodes: []*models.Node{
Nodes: []*execution.Node{
{
Step: digraph.Step{Name: "check"},
Status: status.NodeError,
Step: core.Step{Name: "check"},
Status: core.NodeError,
Error: "exit status 1",
StartedAt: time.Now().Format(time.RFC3339),
FinishedAt: time.Now().Format(time.RFC3339),
},
{
Step: digraph.Step{Name: "skipped"},
Status: status.NodeSkipped,
Step: core.Step{Name: "skipped"},
Status: core.NodeSkipped,
StartedAt: "-",
FinishedAt: time.Now().Format(time.RFC3339),
},
@ -326,7 +325,7 @@ steps:
require.NoError(t, err)
// Check the status runs without error
statusCmd := cmd.CmdStatus()
statusCmd := cmd.Status()
statusCmd.SetContext(th.Context)
statusCmd.SetArgs([]string{dagFile.Location})
statusCmd.SilenceErrors = true
@ -348,7 +347,7 @@ steps:
go func() {
// Start a long-running DAG
args := []string{"start", dagFile.Location}
th.RunCommand(t, cmd.CmdStart(), test.CmdTest{Args: args})
th.RunCommand(t, cmd.Start(), test.CmdTest{Args: args})
close(done)
}()
@ -362,18 +361,18 @@ steps:
if err != nil {
return false
}
return status.Running == dagRunStatus.Status
return core.Running == dagRunStatus.Status
}, time.Second*3, time.Millisecond*50)
// Cancel the DAG
th.RunCommand(t, cmd.CmdStop(), test.CmdTest{
th.RunCommand(t, cmd.Stop(), test.CmdTest{
Args: []string{"stop", dagFile.Location},
})
<-done
// Check the status runs without error
statusCmd := cmd.CmdStatus()
statusCmd := cmd.Status()
statusCmd.SetContext(th.Context)
statusCmd.SetArgs([]string{dagFile.Location})
statusCmd.SilenceErrors = true
@ -411,7 +410,7 @@ steps:
`)
// Run the DAG
startCmd := cmd.CmdStart()
startCmd := cmd.Start()
startCmd.SetContext(th.Context)
startCmd.SetArgs([]string{dagFile.Location})
startCmd.SilenceErrors = true
@ -424,7 +423,7 @@ steps:
time.Sleep(200 * time.Millisecond)
// Check the status runs without error
statusCmd := cmd.CmdStatus()
statusCmd := cmd.Status()
statusCmd.SetContext(th.Context)
statusCmd.SetArgs([]string{dagFile.Location})
statusCmd.SilenceErrors = true
@ -443,7 +442,7 @@ steps:
`)
// Run the DAG
startCmd := cmd.CmdStart()
startCmd := cmd.Start()
startCmd.SetContext(th.Context)
startCmd.SetArgs([]string{dagFile.Location})
startCmd.SilenceErrors = true
@ -456,7 +455,7 @@ steps:
time.Sleep(200 * time.Millisecond)
// Check status using DAG name instead of file path
statusCmd := cmd.CmdStatus()
statusCmd := cmd.Status()
statusCmd.SetContext(th.Context)
statusCmd.SetArgs([]string{dagFile.Location})
statusCmd.SilenceErrors = true
@ -478,7 +477,7 @@ steps:
go func() {
// Start a DAG to check the PID
args := []string{"start", dagFile.Location}
th.RunCommand(t, cmd.CmdStart(), test.CmdTest{Args: args})
th.RunCommand(t, cmd.Start(), test.CmdTest{Args: args})
close(done)
}()
@ -492,11 +491,11 @@ steps:
if err != nil {
return false
}
return status.Running == dagRunStatus.Status
return core.Running == dagRunStatus.Status
}, time.Second*3, time.Millisecond*50)
// Check the status runs without error
statusCmd := cmd.CmdStatus()
statusCmd := cmd.Status()
statusCmd.SetContext(th.Context)
statusCmd.SetArgs([]string{dagFile.Location})
statusCmd.SilenceErrors = true
@ -506,7 +505,7 @@ steps:
require.NoError(t, err)
// Stop the DAG
th.RunCommand(t, cmd.CmdStop(), test.CmdTest{
th.RunCommand(t, cmd.Stop(), test.CmdTest{
Args: []string{"stop", dagFile.Location},
})
<-done
@ -521,7 +520,7 @@ steps:
`)
// Run the DAG
startCmd := cmd.CmdStart()
startCmd := cmd.Start()
startCmd.SetContext(th.Context)
startCmd.SetArgs([]string{dagFile.Location})
startCmd.SilenceErrors = true
@ -542,7 +541,7 @@ steps:
require.NoError(t, err)
// Check the status runs without error
statusCmd := cmd.CmdStatus()
statusCmd := cmd.Status()
statusCmd.SetContext(th.Context)
statusCmd.SetArgs([]string{dagFile.Location})
statusCmd.SilenceErrors = true
@ -564,7 +563,7 @@ steps:
`)
// Run the DAG
startCmd := cmd.CmdStart()
startCmd := cmd.Start()
startCmd.SetContext(th.Context)
startCmd.SetArgs([]string{dagFile.Location})
startCmd.SilenceErrors = true
@ -577,7 +576,7 @@ steps:
time.Sleep(200 * time.Millisecond)
// Check the status runs without error (it shows the log preview)
statusCmd := cmd.CmdStatus()
statusCmd := cmd.Status()
statusCmd.SetContext(th.Context)
statusCmd.SetArgs([]string{dagFile.Location})
statusCmd.SilenceErrors = true
@ -605,7 +604,7 @@ steps:
// Create a fake DAG run with binary log content
dagRunID := uuid.Must(uuid.NewV7()).String()
attempt, err := th.DAGRunStore.CreateAttempt(th.Context, dag, time.Now(), dagRunID, models.NewDAGRunAttemptOptions{})
attempt, err := th.DAGRunStore.CreateAttempt(th.Context, dag, time.Now(), dagRunID, execution.NewDAGRunAttemptOptions{})
require.NoError(t, err)
// Open the attempt for writing
@ -613,17 +612,17 @@ steps:
require.NoError(t, err)
// Write a status with fake binary log paths
status := models.DAGRunStatus{
status := execution.DAGRunStatus{
Name: dag.Name,
DAGRunID: dagRunID,
Status: status.Success,
Status: core.Success,
StartedAt: time.Now().Format(time.RFC3339),
FinishedAt: time.Now().Format(time.RFC3339),
AttemptID: attempt.ID(),
Nodes: []*models.Node{
Nodes: []*execution.Node{
{
Step: digraph.Step{Name: "binary_output"},
Status: status.NodeSuccess,
Step: core.Step{Name: "binary_output"},
Status: core.NodeSuccess,
Stdout: "/nonexistent/binary.log", // This will trigger "(unable to read)"
Stderr: "",
},
@ -637,7 +636,7 @@ steps:
require.NoError(t, err)
// Check the status runs without error even with binary content
statusCmd := cmd.CmdStatus()
statusCmd := cmd.Status()
statusCmd.SetContext(th.Context)
statusCmd.SetArgs([]string{dagFile.Location})
statusCmd.SilenceErrors = true

View File

@ -3,13 +3,14 @@ package cmd
import (
"fmt"
"github.com/dagu-org/dagu/internal/digraph"
"github.com/dagu-org/dagu/internal/digraph/builder"
"github.com/dagu-org/dagu/internal/logger"
"github.com/dagu-org/dagu/internal/common/logger"
"github.com/dagu-org/dagu/internal/core"
"github.com/dagu-org/dagu/internal/core/execution"
"github.com/dagu-org/dagu/internal/core/spec"
"github.com/spf13/cobra"
)
func CmdStop() *cobra.Command {
func Stop() *cobra.Command {
return NewCommand(
&cobra.Command{
Use: "stop [flags] <DAG name>",
@ -48,10 +49,10 @@ func runStop(ctx *Context, args []string) error {
return fmt.Errorf("failed to extract DAG name: %w", err)
}
var dag *digraph.DAG
var dag *core.DAG
if dagRunID != "" {
// Retrieve the previous run's history record for the specified dag-run ID.
ref := digraph.NewDAGRunRef(name, dagRunID)
ref := execution.NewDAGRunRef(name, dagRunID)
rec, err := ctx.DAGRunStore.FindAttempt(ctx, ref)
if err != nil {
return fmt.Errorf("failed to find the record for dag-run ID %s: %w", dagRunID, err)
@ -63,7 +64,7 @@ func runStop(ctx *Context, args []string) error {
}
dag = d
} else {
d, err := builder.Load(ctx, args[0], builder.WithBaseConfig(ctx.Config.Paths.BaseConfig))
d, err := spec.Load(ctx, args[0], spec.WithBaseConfig(ctx.Config.Paths.BaseConfig))
if err != nil {
return fmt.Errorf("failed to load DAG from %s: %w", args[0], err)
}

View File

@ -5,7 +5,7 @@ import (
"time"
"github.com/dagu-org/dagu/internal/cmd"
"github.com/dagu-org/dagu/internal/digraph/status"
"github.com/dagu-org/dagu/internal/core"
"github.com/dagu-org/dagu/internal/test"
"github.com/google/uuid"
)
@ -23,22 +23,22 @@ func TestStopCommand(t *testing.T) {
go func() {
// Start the DAG to stop.
args := []string{"start", dag.Location}
th.RunCommand(t, cmd.CmdStart(), test.CmdTest{Args: args})
th.RunCommand(t, cmd.Start(), test.CmdTest{Args: args})
close(done)
}()
time.Sleep(time.Millisecond * 100)
// Wait for the dag-run running.
dag.AssertLatestStatus(t, status.Running)
dag.AssertLatestStatus(t, core.Running)
// Stop the dag-run.
th.RunCommand(t, cmd.CmdStop(), test.CmdTest{
th.RunCommand(t, cmd.Stop(), test.CmdTest{
Args: []string{"stop", dag.Location},
ExpectedOut: []string{"stopped"}})
// Check the dag-run is stopped.
dag.AssertLatestStatus(t, status.Cancel)
dag.AssertLatestStatus(t, core.Cancel)
<-done
})
t.Run("StopDAGRunWithRunID", func(t *testing.T) {
@ -54,22 +54,22 @@ func TestStopCommand(t *testing.T) {
go func() {
// Start the dag-run to stop.
args := []string{"start", "--run-id=" + dagRunID, dag.Location}
th.RunCommand(t, cmd.CmdStart(), test.CmdTest{Args: args})
th.RunCommand(t, cmd.Start(), test.CmdTest{Args: args})
close(done)
}()
time.Sleep(time.Millisecond * 100)
// Wait for the dag-run running
dag.AssertLatestStatus(t, status.Running)
dag.AssertLatestStatus(t, core.Running)
// Stop the dag-run with a specific run ID.
th.RunCommand(t, cmd.CmdStop(), test.CmdTest{
th.RunCommand(t, cmd.Stop(), test.CmdTest{
Args: []string{"stop", dag.Location, "--run-id=" + dagRunID},
ExpectedOut: []string{"stopped"}})
// Check the dag-run is stopped.
dag.AssertLatestStatus(t, status.Cancel)
dag.AssertLatestStatus(t, core.Cancel)
<-done
})
}

View File

@ -5,19 +5,19 @@ import (
"fmt"
"strings"
"github.com/dagu-org/dagu/internal/digraph"
"github.com/dagu-org/dagu/internal/digraph/builder"
"github.com/dagu-org/dagu/internal/core"
"github.com/dagu-org/dagu/internal/core/spec"
"github.com/spf13/cobra"
)
// CmdValidate creates the 'validate' CLI command that checks a DAG spec for errors.
// Validate creates the 'validate' CLI command that checks a DAG spec for errors.
//
// It follows the same validation logic used by the API's UpdateDAGSpec handler:
// - Load the YAML without evaluation
// - Run DAG.Validate()
//
// The command prints validation results and any errors found.
func CmdValidate() *cobra.Command {
func Validate() *cobra.Command {
return NewCommand(
&cobra.Command{
Use: "validate [flags] <DAG definition>",
@ -36,11 +36,11 @@ similar to the server-side spec validation.`,
func runValidate(ctx *Context, args []string) error {
// Try loading the DAG without evaluation, resolving relative names against DAGsDir
dag, err := builder.Load(
dag, err := spec.Load(
ctx,
args[0],
builder.WithoutEval(),
builder.WithDAGsDir(ctx.Config.Paths.DAGsDir),
spec.WithoutEval(),
spec.WithDAGsDir(ctx.Config.Paths.DAGsDir),
)
if err != nil {
@ -62,7 +62,7 @@ func runValidate(ctx *Context, args []string) error {
func formatValidationErrors(file string, err error) string {
// Collect message strings
var msgs []string
var list digraph.ErrorList
var list core.ErrorList
if errors.As(err, &list) {
msgs = list.ToStringList()
} else {

View File

@ -5,10 +5,11 @@ import (
"os/exec"
"testing"
"strings"
"github.com/dagu-org/dagu/internal/cmd"
"github.com/dagu-org/dagu/internal/test"
"github.com/spf13/cobra"
"strings"
)
func TestValidateCommand(t *testing.T) {
@ -79,7 +80,7 @@ func TestValidateCommand_HelperExit(t *testing.T) {
dagFile := os.Getenv("DAG_FILE_PATH")
root := &cobra.Command{Use: "root"}
root.AddCommand(cmd.CmdValidate())
root.AddCommand(cmd.Validate())
root.SetArgs([]string{"validate", dagFile})
// This will os.Exit(1) on validation failure via NewCommand wrapper

View File

@ -1,11 +1,11 @@
package cmd
import (
"github.com/dagu-org/dagu/internal/config"
"github.com/dagu-org/dagu/internal/common/config"
"github.com/spf13/cobra"
)
func CmdVersion() *cobra.Command {
func Version() *cobra.Command {
return &cobra.Command{
Use: "version",
Short: "Display the Dagu version information",

View File

@ -3,8 +3,8 @@ package cmd
import (
"fmt"
"github.com/dagu-org/dagu/internal/logger"
"github.com/dagu-org/dagu/internal/worker"
"github.com/dagu-org/dagu/internal/common/logger"
"github.com/dagu-org/dagu/internal/service/worker"
"github.com/spf13/cobra"
)

View File

@ -9,9 +9,9 @@ import (
func TestWorkerCommand(t *testing.T) {
t.Run("WorkerCommandExists", func(t *testing.T) {
cmd := cmd.CmdWorker()
require.NotNil(t, cmd)
require.Equal(t, "worker [flags]", cmd.Use)
require.Equal(t, "Start a worker that polls the coordinator for tasks", cmd.Short)
cli := cmd.CmdWorker()
require.NotNil(t, cli)
require.Equal(t, "worker [flags]", cli.Use)
require.Equal(t, "Start a worker that polls the coordinator for tasks", cli.Short)
})
}

View File

@ -10,7 +10,7 @@ import (
"strconv"
"strings"
"github.com/dagu-org/dagu/internal/logger"
"github.com/dagu-org/dagu/internal/common/logger"
"github.com/itchyny/gojq"
)

View File

@ -1,4 +1,4 @@
package digraph
package collections
import (
"encoding/json"

View File

@ -1,4 +1,4 @@
package digraph
package collections_test
import (
"encoding/json"
@ -6,6 +6,8 @@ import (
"strings"
"testing"
"github.com/dagu-org/dagu/internal/common/collections"
"github.com/dagu-org/dagu/internal/core"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
@ -13,12 +15,12 @@ import (
func TestDeterministicMap_MarshalJSON(t *testing.T) {
tests := []struct {
name string
input DeterministicMap
input collections.DeterministicMap
expected string
}{
{
name: "EmptyMap",
input: DeterministicMap{},
input: collections.DeterministicMap{},
expected: `{}`,
},
{
@ -28,14 +30,14 @@ func TestDeterministicMap_MarshalJSON(t *testing.T) {
},
{
name: "SingleKey",
input: DeterministicMap{
input: collections.DeterministicMap{
"key": "value",
},
expected: `{"key":"value"}`,
},
{
name: "MultipleKeysSorted",
input: DeterministicMap{
input: collections.DeterministicMap{
"zebra": "animal",
"apple": "fruit",
"banana": "fruit",
@ -45,7 +47,7 @@ func TestDeterministicMap_MarshalJSON(t *testing.T) {
},
{
name: "SpecialCharacters",
input: DeterministicMap{
input: collections.DeterministicMap{
"key with spaces": "value with spaces",
"key\"quotes\"": "value\"quotes\"",
"key\nnewline": "value\nnewline",
@ -74,13 +76,13 @@ func TestDeterministicMap_UnmarshalJSON(t *testing.T) {
tests := []struct {
name string
input string
expected DeterministicMap
expected collections.DeterministicMap
wantErr bool
}{
{
name: "EmptyObject",
input: `{}`,
expected: DeterministicMap{},
expected: collections.DeterministicMap{},
},
{
name: "Null",
@ -90,7 +92,7 @@ func TestDeterministicMap_UnmarshalJSON(t *testing.T) {
{
name: "SimpleObject",
input: `{"key1": "value1", "key2": "value2"}`,
expected: DeterministicMap{
expected: collections.DeterministicMap{
"key1": "value1",
"key2": "value2",
},
@ -104,7 +106,7 @@ func TestDeterministicMap_UnmarshalJSON(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
var result DeterministicMap
var result collections.DeterministicMap
err := json.Unmarshal([]byte(tt.input), &result)
if tt.wantErr {
@ -119,19 +121,19 @@ func TestDeterministicMap_UnmarshalJSON(t *testing.T) {
}
func TestDeterministicMap_Merge(t *testing.T) {
base := DeterministicMap{
base := collections.DeterministicMap{
"key1": "value1",
"key2": "value2",
}
other := DeterministicMap{
other := collections.DeterministicMap{
"key2": "overridden",
"key3": "value3",
}
result := base.Merge(other)
expected := DeterministicMap{
expected := collections.DeterministicMap{
"key1": "value1",
"key2": "overridden",
"key3": "value3",
@ -143,7 +145,7 @@ func TestDeterministicMap_Merge(t *testing.T) {
}
func TestDeterministicMap_String(t *testing.T) {
m := DeterministicMap{
m := collections.DeterministicMap{
"b": "2",
"a": "1",
"c": "3",
@ -157,12 +159,12 @@ func TestDeterministicMap_String(t *testing.T) {
func TestDeterministicMap_EdgeCases(t *testing.T) {
tests := []struct {
name string
input DeterministicMap
input collections.DeterministicMap
expected string
}{
{
name: "UnicodeCharacters",
input: DeterministicMap{
input: collections.DeterministicMap{
"你好": "世界",
"مرحبا": "عالم",
"emoji": "🌍🚀",
@ -172,7 +174,7 @@ func TestDeterministicMap_EdgeCases(t *testing.T) {
},
{
name: "EmptyStringKeysAndValues",
input: DeterministicMap{
input: collections.DeterministicMap{
"": "empty key",
"empty": "",
"both": "",
@ -181,7 +183,7 @@ func TestDeterministicMap_EdgeCases(t *testing.T) {
},
{
name: "NumericStringKeysSortedLexicographically",
input: DeterministicMap{
input: collections.DeterministicMap{
"1": "one",
"10": "ten",
"2": "two",
@ -192,7 +194,7 @@ func TestDeterministicMap_EdgeCases(t *testing.T) {
},
{
name: "AllJSONSpecialCharacters",
input: DeterministicMap{
input: collections.DeterministicMap{
"tab": "value\twith\ttab",
"newline": "value\nwith\nnewline",
"return": "value\rwith\rreturn",
@ -204,14 +206,14 @@ func TestDeterministicMap_EdgeCases(t *testing.T) {
},
{
name: "VeryLongValues",
input: DeterministicMap{
input: collections.DeterministicMap{
"long": strings.Repeat("a", 10000),
},
expected: fmt.Sprintf(`{"long":"%s"}`, strings.Repeat("a", 10000)),
},
{
name: "CaseSensitiveKeys",
input: DeterministicMap{
input: collections.DeterministicMap{
"Key": "uppercase",
"key": "lowercase",
"KEY": "allcaps",
@ -221,7 +223,7 @@ func TestDeterministicMap_EdgeCases(t *testing.T) {
},
{
name: "BooleanAndNullLikeStrings",
input: DeterministicMap{
input: collections.DeterministicMap{
"bool_true": "true",
"bool_false": "false",
"null_str": "null",
@ -231,7 +233,7 @@ func TestDeterministicMap_EdgeCases(t *testing.T) {
},
{
name: "KeysWithSpecialSortingCharacters",
input: DeterministicMap{
input: collections.DeterministicMap{
"a-b": "dash",
"a_b": "underscore",
"a.b": "dot",
@ -243,7 +245,7 @@ func TestDeterministicMap_EdgeCases(t *testing.T) {
},
{
name: "JsonStringValues",
input: DeterministicMap{
input: collections.DeterministicMap{
"config": `{"timeout": 30, "retries": 3}`,
"array": `["item1", "item2", "item3"]`,
"nested": `{"level1": {"level2": {"value": "deep"}}}`,
@ -254,7 +256,7 @@ func TestDeterministicMap_EdgeCases(t *testing.T) {
},
{
name: "ComplexJsonInJsonScenario",
input: DeterministicMap{
input: collections.DeterministicMap{
"pipeline_config": `{"stages": ["build", "test", "deploy"], "parallel": true}`,
"env_vars": `{"NODE_ENV": "production", "API_KEY": "secret-key-123"}`,
"matrix": `[{"os": "linux", "arch": "amd64"}, {"os": "darwin", "arch": "arm64"}]`,
@ -264,7 +266,7 @@ func TestDeterministicMap_EdgeCases(t *testing.T) {
},
{
name: "MalformedJsonStrings",
input: DeterministicMap{
input: collections.DeterministicMap{
"invalid_json": `{"broken": "json`,
"not_json": `this is not json at all`,
"partial_escape": `{"key": "value with \" quote}`,
@ -292,7 +294,7 @@ func TestDeterministicMap_ConcurrentAccess(t *testing.T) {
func TestDeterministicMap_UnmarshalExistingMap(t *testing.T) {
// Test unmarshaling into an existing map
m := &DeterministicMap{
m := &collections.DeterministicMap{
"existing": "value",
}
@ -307,7 +309,7 @@ func TestDeterministicMap_UnmarshalExistingMap(t *testing.T) {
func TestDeterministicMap_MarshalUnmarshalRoundTrip(t *testing.T) {
// Test that marshal->unmarshal preserves data exactly
original := DeterministicMap{
original := collections.DeterministicMap{
"unicode": "Hello 世界 🌍",
"empty": "",
"spaces": " multiple spaces ",
@ -323,7 +325,7 @@ func TestDeterministicMap_MarshalUnmarshalRoundTrip(t *testing.T) {
require.NoError(t, err)
// Unmarshal
var restored DeterministicMap
var restored collections.DeterministicMap
err = json.Unmarshal(data, &restored)
require.NoError(t, err)
@ -338,9 +340,9 @@ func TestDeterministicMap_MarshalUnmarshalRoundTrip(t *testing.T) {
func TestDeterministicMap_Integration_ParallelItem(t *testing.T) {
// Test how DeterministicMap works within ParallelItem
item := ParallelItem{
item := core.ParallelItem{
Value: "",
Params: DeterministicMap{
Params: collections.DeterministicMap{
"REGION": "us-east-1",
"ENV": "production",
"DEBUG": "true",
@ -356,7 +358,7 @@ func TestDeterministicMap_Integration_ParallelItem(t *testing.T) {
assert.Equal(t, expected, string(data))
// Unmarshal and verify
var restored ParallelItem
var restored core.ParallelItem
err = json.Unmarshal(data, &restored)
require.NoError(t, err)
assert.Equal(t, item.Params, restored.Params)
@ -366,12 +368,12 @@ func TestDeterministicMap_RealWorldChildDAGParams(t *testing.T) {
// Test real-world scenario where child DAGs receive complex JSON parameters
tests := []struct {
name string
params DeterministicMap
params collections.DeterministicMap
expected string
}{
{
name: "DeploymentConfiguration",
params: DeterministicMap{
params: collections.DeterministicMap{
"DEPLOYMENT_CONFIG": `{"service": "api-gateway", "replicas": 3, "resources": {"cpu": "500m", "memory": "1Gi"}}`,
"ENVIRONMENT": "production",
"VERSION": "v1.2.3",
@ -381,7 +383,7 @@ func TestDeterministicMap_RealWorldChildDAGParams(t *testing.T) {
},
{
name: "DataProcessingPipeline",
params: DeterministicMap{
params: collections.DeterministicMap{
"INPUT_SCHEMA": `{"fields": [{"name": "id", "type": "string"}, {"name": "timestamp", "type": "datetime"}]}`,
"TRANSFORM_OPS": `[{"op": "filter", "field": "status", "value": "active"}, {"op": "aggregate", "by": "region"}]`,
"OUTPUT_FORMAT": "parquet",
@ -394,7 +396,7 @@ func TestDeterministicMap_RealWorldChildDAGParams(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Create a ParallelItem with complex params
item := ParallelItem{
item := core.ParallelItem{
Params: tt.params,
}

View File

@ -1,4 +1,4 @@
package digraph
package collections
import (
"encoding/json"

View File

@ -1,11 +1,11 @@
package digraph_test
package collections_test
import (
"encoding/json"
"reflect"
"testing"
"github.com/dagu-org/dagu/internal/digraph"
"github.com/dagu-org/dagu/internal/common/collections"
)
func TestSyncMap_MarshalJSON(t *testing.T) {
@ -45,7 +45,7 @@ func TestSyncMap_MarshalJSON(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
m := &digraph.SyncMap{}
m := &collections.SyncMap{}
for k, v := range tt.input {
m.Store(k, v)
}
@ -106,7 +106,7 @@ func TestSyncMap_UnmarshalJSON(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
m := &digraph.SyncMap{}
m := &collections.SyncMap{}
err := m.UnmarshalJSON([]byte(tt.input))
if (err != nil) != tt.wantErr {
t.Errorf("SyncMap.UnmarshalJSON() error = %v, wantErr %v", err, tt.wantErr)
@ -129,7 +129,7 @@ func TestSyncMap_UnmarshalJSON(t *testing.T) {
}
func TestSyncMap_RoundTrip(t *testing.T) {
original := &digraph.SyncMap{}
original := &collections.SyncMap{}
original.Store("string", "value")
original.Store("number", float64(42))
original.Store("bool", true)
@ -142,7 +142,7 @@ func TestSyncMap_RoundTrip(t *testing.T) {
}
// Unmarshal
roundTripped := &digraph.SyncMap{}
roundTripped := &collections.SyncMap{}
err = roundTripped.UnmarshalJSON(data)
if err != nil {
t.Fatalf("Failed to unmarshal: %v", err)

View File

@ -4,7 +4,7 @@ import (
"context"
"testing"
"github.com/dagu-org/dagu/internal/config"
"github.com/dagu-org/dagu/internal/common/config"
"github.com/stretchr/testify/assert"
)

View File

@ -5,7 +5,7 @@ import (
"strings"
"testing"
"github.com/dagu-org/dagu/internal/config"
"github.com/dagu-org/dagu/internal/common/config"
"github.com/stretchr/testify/require"
)

View File

@ -12,7 +12,7 @@ import (
"time"
"github.com/adrg/xdg"
"github.com/dagu-org/dagu/internal/fileutil"
"github.com/dagu-org/dagu/internal/common/fileutil"
"github.com/spf13/viper"
)

View File

@ -4,7 +4,7 @@ import (
"os"
"path/filepath"
"github.com/dagu-org/dagu/internal/fileutil"
"github.com/dagu-org/dagu/internal/common/fileutil"
)
// Paths holds various file system path settings used by the application.

View File

@ -6,8 +6,8 @@ import (
"path/filepath"
"testing"
"github.com/dagu-org/dagu/internal/config"
"github.com/dagu-org/dagu/internal/fileutil"
"github.com/dagu-org/dagu/internal/common/config"
"github.com/dagu-org/dagu/internal/common/fileutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

Some files were not shown because too many files have changed in this diff Show More