mirror of
https://github.com/dagu-org/dagu.git
synced 2025-12-28 06:34:22 +00:00
[fb-next] Command for migrating history data from v1.16.x to v1.17.x (#969)
This commit is contained in:
parent
5cb885a718
commit
2967d22849
2
.github/workflows/docker-next.yaml
vendored
2
.github/workflows/docker-next.yaml
vendored
@ -55,6 +55,8 @@ jobs:
|
||||
file: Dockerfile
|
||||
platforms: linux/amd64,linux/arm64,linux/arm/v7
|
||||
push: true
|
||||
build-args: |
|
||||
LDFLAGS=-X 'main.version=next-${{ env.SHORT_SHA }}'
|
||||
tags: |
|
||||
${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:next
|
||||
${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:next-${{ env.SHORT_SHA }}
|
||||
|
||||
6
.github/workflows/docker.yaml
vendored
6
.github/workflows/docker.yaml
vendored
@ -44,6 +44,8 @@ jobs:
|
||||
file: Dockerfile
|
||||
platforms: linux/amd64,linux/arm64,linux/arm/v7
|
||||
push: true
|
||||
build-args: |
|
||||
LDFLAGS=-X 'main.version=${{ env.VERSION }}-${{ github.sha }}'
|
||||
tags: |
|
||||
ghcr.io/${{ github.repository }}:latest
|
||||
ghcr.io/${{ github.repository }}:${{ env.VERSION }}
|
||||
@ -83,6 +85,8 @@ jobs:
|
||||
file: Dockerfile.dev
|
||||
platforms: linux/amd64,linux/arm64,linux/arm/v7
|
||||
push: true
|
||||
build-args: |
|
||||
LDFLAGS=-X 'main.version=${{ env.VERSION }}-${{ github.sha }}'
|
||||
tags: |
|
||||
ghcr.io/${{ github.repository }}:dev
|
||||
ghcr.io/${{ github.repository }}:${{ env.VERSION }}-dev
|
||||
@ -122,6 +126,8 @@ jobs:
|
||||
file: Dockerfile.alpine
|
||||
platforms: linux/amd64,linux/arm64,linux/arm/v7
|
||||
push: true
|
||||
build-args: |
|
||||
LDFLAGS=-X 'main.version=${{ env.VERSION }}-${{ github.sha }}'
|
||||
tags: |
|
||||
ghcr.io/${{ github.repository }}:alpine
|
||||
ghcr.io/${{ github.repository }}:${{ env.VERSION }}-alpine
|
||||
|
||||
8
.vscode/launch.json
vendored
8
.vscode/launch.json
vendored
@ -43,6 +43,14 @@
|
||||
"mode": "auto",
|
||||
"program": "${workspaceFolder}/cmd/",
|
||||
"args": ["enqueue", "${input:pathToSpec}"]
|
||||
},
|
||||
{
|
||||
"name": "Migrate",
|
||||
"type": "go",
|
||||
"request": "launch",
|
||||
"mode": "auto",
|
||||
"program": "${workspaceFolder}/cmd/",
|
||||
"args": ["migrate", "history"]
|
||||
}
|
||||
],
|
||||
"inputs": [
|
||||
|
||||
@ -45,7 +45,14 @@ We're excited to announce the beta release of Dagu 1.17.0! This release brings m
|
||||
- 🏗️ **API v2**: New `/api/v2` endpoints with refactored schema and better abstractions ([OpenAPI spec](./api/v2/api.yaml))
|
||||
- 🔧 **Various Enhancements**: Including [#925](https://github.com/dagu-org/dagu/issues/925), [#898](https://github.com/dagu-org/dagu/issues/898), [#895](https://github.com/dagu-org/dagu/issues/895), [#868](https://github.com/dagu-org/dagu/issues/868), [#903](https://github.com/dagu-org/dagu/issues/903), [#911](https://github.com/dagu-org/dagu/issues/911), [#913](https://github.com/dagu-org/dagu/issues/913), [#921](https://github.com/dagu-org/dagu/issues/921), [#923](https://github.com/dagu-org/dagu/issues/923), [#887](https://github.com/dagu-org/dagu/issues/887), [#922](https://github.com/dagu-org/dagu/issues/922), [#932](https://github.com/dagu-org/dagu/issues/932), [#962](https://github.com/dagu-org/dagu/issues/962)
|
||||
|
||||
**⚠️ Note on History Data**: Due to internal improvements, history data from 1.16.x requires migration to work with 1.17.0. Most of other functionality remains stable and compatible except for a few changes. We're committed to maintaining full backward compatibility as much as possible in future releases.
|
||||
**⚠️ Note on History Data**: Due to internal improvements, history data from 1.16.x requires migration to work with 1.17.0. You can migrate your historical data using the following command:
|
||||
|
||||
```bash
|
||||
# Migrate history data
|
||||
dagu migrate history
|
||||
```
|
||||
|
||||
After successful migration, legacy history directories are moved to `~/.config/dagu/history_migrated_<timestamp>` for safekeeping. Most other functionality remains stable and compatible except for a few changes. We're committed to maintaining backward compatibility as much as possible in future releases.
|
||||
|
||||
### ❤️ Huge Thanks to Our Contributors
|
||||
|
||||
|
||||
@ -38,6 +38,7 @@ func init() {
|
||||
rootCmd.AddCommand(cmd.CmdScheduler())
|
||||
rootCmd.AddCommand(cmd.CmdRetry())
|
||||
rootCmd.AddCommand(cmd.CmdStartAll())
|
||||
rootCmd.AddCommand(cmd.CmdMigrate())
|
||||
|
||||
build.Version = version
|
||||
}
|
||||
|
||||
@ -10,6 +10,8 @@ coverage:
|
||||
- "internal/digraph/executor"
|
||||
# The filenotify is a third party library
|
||||
- "internal/scheduler/filenotify"
|
||||
# Legacy persistence code
|
||||
- "internal/persistence/legacy"
|
||||
status:
|
||||
project:
|
||||
default:
|
||||
|
||||
@ -19,10 +19,10 @@ import (
|
||||
"github.com/dagu-org/dagu/internal/frontend"
|
||||
"github.com/dagu-org/dagu/internal/logger"
|
||||
"github.com/dagu-org/dagu/internal/models"
|
||||
"github.com/dagu-org/dagu/internal/persistence/localdag"
|
||||
"github.com/dagu-org/dagu/internal/persistence/localdagrun"
|
||||
"github.com/dagu-org/dagu/internal/persistence/localproc"
|
||||
"github.com/dagu-org/dagu/internal/persistence/localqueue"
|
||||
"github.com/dagu-org/dagu/internal/persistence/filedag"
|
||||
"github.com/dagu-org/dagu/internal/persistence/filedagrun"
|
||||
"github.com/dagu-org/dagu/internal/persistence/fileproc"
|
||||
"github.com/dagu-org/dagu/internal/persistence/filequeue"
|
||||
"github.com/dagu-org/dagu/internal/scheduler"
|
||||
"github.com/dagu-org/dagu/internal/stringutil"
|
||||
"github.com/google/uuid"
|
||||
@ -106,8 +106,8 @@ func NewContext(cmd *cobra.Command, flags []commandLineFlag) (*Context, error) {
|
||||
}
|
||||
|
||||
// Initialize history repository and history manager
|
||||
hrOpts := []localdagrun.DAGRunStoreOption{
|
||||
localdagrun.WithLatestStatusToday(cfg.Server.LatestStatusToday),
|
||||
hrOpts := []filedagrun.DAGRunStoreOption{
|
||||
filedagrun.WithLatestStatusToday(cfg.Server.LatestStatusToday),
|
||||
}
|
||||
|
||||
switch cmd.Name() {
|
||||
@ -115,13 +115,13 @@ func NewContext(cmd *cobra.Command, flags []commandLineFlag) (*Context, error) {
|
||||
// For long-running process, we setup file cache for better performance
|
||||
hc := fileutil.NewCache[*models.DAGRunStatus](0, time.Hour*12)
|
||||
hc.StartEviction(ctx)
|
||||
hrOpts = append(hrOpts, localdagrun.WithHistoryFileCache(hc))
|
||||
hrOpts = append(hrOpts, filedagrun.WithHistoryFileCache(hc))
|
||||
}
|
||||
|
||||
ps := localproc.New(cfg.Paths.ProcDir)
|
||||
drs := localdagrun.New(cfg.Paths.DAGRunsDir, hrOpts...)
|
||||
ps := fileproc.New(cfg.Paths.ProcDir)
|
||||
drs := filedagrun.New(cfg.Paths.DAGRunsDir, hrOpts...)
|
||||
drm := dagrun.New(drs, ps, cfg.Paths.Executable, cfg.Global.WorkDir)
|
||||
qs := localqueue.New(cfg.Paths.QueueDir)
|
||||
qs := filequeue.New(cfg.Paths.QueueDir)
|
||||
|
||||
return &Context{
|
||||
Context: ctx,
|
||||
@ -200,11 +200,11 @@ func (c *Context) dagStore(cache *fileutil.Cache[*digraph.DAG], searchPaths []st
|
||||
}
|
||||
|
||||
// Create a flag store based on the suspend flags directory.
|
||||
return localdag.New(
|
||||
return filedag.New(
|
||||
c.Config.Paths.DAGsDir,
|
||||
localdag.WithFlagsBaseDir(c.Config.Paths.SuspendFlagsDir),
|
||||
localdag.WithSearchPaths(searchPaths),
|
||||
localdag.WithFileCache(cache),
|
||||
filedag.WithFlagsBaseDir(c.Config.Paths.SuspendFlagsDir),
|
||||
filedag.WithSearchPaths(searchPaths),
|
||||
filedag.WithFileCache(cache),
|
||||
), nil
|
||||
}
|
||||
|
||||
|
||||
116
internal/cmd/migrate.go
Normal file
116
internal/cmd/migrate.go
Normal file
@ -0,0 +1,116 @@
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"path/filepath"
|
||||
|
||||
"github.com/dagu-org/dagu/internal/logger"
|
||||
"github.com/dagu-org/dagu/internal/migration"
|
||||
"github.com/spf13/cobra"
|
||||
)
|
||||
|
||||
// CmdMigrate creates the migrate command with subcommands
|
||||
func CmdMigrate() *cobra.Command {
|
||||
cmd := &cobra.Command{
|
||||
Use: "migrate",
|
||||
Short: "Migrate legacy data to new format",
|
||||
Long: `Migrate various types of legacy data to new formats.
|
||||
|
||||
Available subcommands:
|
||||
history - Migrate DAG run history from v1.16 format to v1.17+ format`,
|
||||
}
|
||||
|
||||
cmd.AddCommand(MigrateHistoryCommand())
|
||||
return cmd
|
||||
}
|
||||
|
||||
// MigrateHistoryCommand creates a command to migrate history data
|
||||
func MigrateHistoryCommand() *cobra.Command {
|
||||
cmd := &cobra.Command{
|
||||
Use: "history",
|
||||
Short: "Migrate legacy history data",
|
||||
Long: `Migrate DAG run history from the legacy format (v1.16 and earlier) to the new format (v1.17+).
|
||||
|
||||
This command will:
|
||||
- Detect if legacy history data exists
|
||||
- Convert and migrate all historical DAG runs to the new format
|
||||
- Archive the old data to history_migrated_<timestamp> directory
|
||||
- Report migration progress and any errors
|
||||
|
||||
Example:
|
||||
dagu migrate history`,
|
||||
}
|
||||
|
||||
return NewCommand(cmd, nil, func(ctx *Context, _ []string) error {
|
||||
return runMigration(ctx)
|
||||
})
|
||||
}
|
||||
|
||||
func runMigration(ctx *Context) error {
|
||||
logger.Info(ctx.Context, "Starting history migration")
|
||||
|
||||
// Create DAG store for loading DAG definitions
|
||||
dagStore, err := ctx.dagStore(nil, nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create DAG store: %w", err)
|
||||
}
|
||||
|
||||
// Create migrator
|
||||
migrator := migration.NewHistoryMigrator(
|
||||
ctx.DAGRunStore,
|
||||
dagStore,
|
||||
ctx.Config.Paths.DataDir,
|
||||
ctx.Config.Paths.DAGsDir,
|
||||
)
|
||||
|
||||
// Check if migration is needed
|
||||
needsMigration, err := migrator.NeedsMigration(ctx.Context)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to check migration status: %w", err)
|
||||
}
|
||||
|
||||
if !needsMigration {
|
||||
logger.Info(ctx.Context, "No legacy history data found, migration not needed")
|
||||
return nil
|
||||
}
|
||||
|
||||
// Run migration
|
||||
result, err := migrator.Migrate(ctx.Context)
|
||||
if err != nil {
|
||||
return fmt.Errorf("migration failed: %w", err)
|
||||
}
|
||||
|
||||
// Report results
|
||||
logger.Info(ctx.Context, "Migration completed",
|
||||
"total_dags", result.TotalDAGs,
|
||||
"total_runs", result.TotalRuns,
|
||||
"migrated", result.MigratedRuns,
|
||||
"skipped", result.SkippedRuns,
|
||||
"failed", result.FailedRuns,
|
||||
)
|
||||
|
||||
if len(result.Errors) > 0 {
|
||||
logger.Warn(ctx.Context, "Migration completed with errors", "error_count", len(result.Errors))
|
||||
for i, err := range result.Errors {
|
||||
if i < 10 { // Limit error output
|
||||
logger.Error(ctx.Context, "Migration error", "error", err)
|
||||
}
|
||||
}
|
||||
if len(result.Errors) > 10 {
|
||||
logger.Warn(ctx.Context, "Additional errors omitted", "count", len(result.Errors)-10)
|
||||
}
|
||||
}
|
||||
|
||||
// Move legacy data to archive if migration was successful
|
||||
if result.FailedRuns == 0 {
|
||||
if err := migrator.MoveLegacyData(ctx.Context); err != nil {
|
||||
logger.Error(ctx.Context, "Failed to move legacy data to archive", "error", err)
|
||||
logger.Info(ctx.Context, "Legacy data remains in original location", "path", filepath.Join(ctx.Config.Paths.DataDir, "history"))
|
||||
}
|
||||
} else {
|
||||
logger.Info(ctx.Context, "Legacy data not moved due to migration errors", "failed_runs", result.FailedRuns)
|
||||
logger.Info(ctx.Context, "Fix errors and run migration again to complete the process")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
167
internal/cmd/migrate_test.go
Normal file
167
internal/cmd/migrate_test.go
Normal file
@ -0,0 +1,167 @@
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/dagu-org/dagu/internal/config"
|
||||
"github.com/dagu-org/dagu/internal/digraph"
|
||||
"github.com/dagu-org/dagu/internal/digraph/scheduler"
|
||||
"github.com/dagu-org/dagu/internal/persistence/filedagrun"
|
||||
legacymodel "github.com/dagu-org/dagu/internal/persistence/legacy/model"
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestMigrateHistoryCommand(t *testing.T) {
|
||||
// Create temporary directories
|
||||
tempDir := t.TempDir()
|
||||
dataDir := filepath.Join(tempDir, "data")
|
||||
dagRunsDir := filepath.Join(tempDir, "dag-runs")
|
||||
dagsDir := filepath.Join(tempDir, "dags")
|
||||
|
||||
require.NoError(t, os.MkdirAll(dataDir, 0750))
|
||||
require.NoError(t, os.MkdirAll(dagRunsDir, 0750))
|
||||
require.NoError(t, os.MkdirAll(dagsDir, 0750))
|
||||
|
||||
// Create legacy data
|
||||
legacyDagDir := filepath.Join(dataDir, "test-dag-abc123")
|
||||
require.NoError(t, os.MkdirAll(legacyDagDir, 0750))
|
||||
|
||||
// Create legacy status
|
||||
legacyStatus := legacymodel.Status{
|
||||
RequestID: "req123",
|
||||
Name: "test-dag",
|
||||
Status: scheduler.StatusSuccess,
|
||||
StartedAt: time.Now().Add(-1 * time.Hour).Format(time.RFC3339),
|
||||
FinishedAt: time.Now().Add(-30 * time.Minute).Format(time.RFC3339),
|
||||
Nodes: []*legacymodel.Node{
|
||||
{
|
||||
Step: digraph.Step{Name: "step1"},
|
||||
Status: scheduler.NodeStatusSuccess,
|
||||
StartedAt: time.Now().Add(-50 * time.Minute).Format(time.RFC3339),
|
||||
FinishedAt: time.Now().Add(-40 * time.Minute).Format(time.RFC3339),
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
// Write legacy data file
|
||||
statusData, _ := json.Marshal(legacyStatus)
|
||||
datFile := filepath.Join(legacyDagDir, "test-dag.20240101.100000.req123.dat")
|
||||
require.NoError(t, os.WriteFile(datFile, statusData, 0600))
|
||||
|
||||
// Create DAG file
|
||||
dagPath := filepath.Join(dagsDir, "test-dag.yaml")
|
||||
require.NoError(t, os.WriteFile(dagPath, []byte("name: test-dag\nsteps:\n - name: step1\n command: echo test"), 0600))
|
||||
|
||||
// Create a test context
|
||||
cfg := &config.Config{
|
||||
Paths: config.PathsConfig{
|
||||
DataDir: dataDir,
|
||||
DAGRunsDir: dagRunsDir,
|
||||
DAGsDir: dagsDir,
|
||||
},
|
||||
Global: config.Global{
|
||||
WorkDir: tempDir,
|
||||
},
|
||||
}
|
||||
|
||||
// Create stores
|
||||
dagRunStore := filedagrun.New(dagRunsDir)
|
||||
|
||||
// Create command context
|
||||
cmd := &cobra.Command{}
|
||||
ctx := &Context{
|
||||
Context: context.Background(),
|
||||
Command: cmd,
|
||||
Config: cfg,
|
||||
DAGRunStore: dagRunStore,
|
||||
}
|
||||
|
||||
t.Run("successful migration", func(t *testing.T) {
|
||||
err := runMigration(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Verify migration
|
||||
attempt, err := dagRunStore.FindAttempt(context.Background(), digraph.NewDAGRunRef("test-dag", "req123"))
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, attempt)
|
||||
|
||||
status, err := attempt.ReadStatus(context.Background())
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, "req123", status.DAGRunID)
|
||||
assert.Equal(t, "test-dag", status.Name)
|
||||
assert.Equal(t, scheduler.StatusSuccess, status.Status)
|
||||
|
||||
// Verify legacy directory was moved
|
||||
_, err = os.Stat(legacyDagDir)
|
||||
assert.True(t, os.IsNotExist(err))
|
||||
|
||||
// Verify archive exists
|
||||
entries, err := os.ReadDir(dataDir)
|
||||
require.NoError(t, err)
|
||||
|
||||
archiveFound := false
|
||||
for _, entry := range entries {
|
||||
if entry.IsDir() && len(entry.Name()) > 17 && entry.Name()[:17] == "history_migrated_" {
|
||||
archiveFound = true
|
||||
}
|
||||
}
|
||||
assert.True(t, archiveFound)
|
||||
})
|
||||
}
|
||||
|
||||
func TestMigrateCommand_NoLegacyData(t *testing.T) {
|
||||
// Create temporary directories
|
||||
tempDir := t.TempDir()
|
||||
dataDir := filepath.Join(tempDir, "data")
|
||||
dagRunsDir := filepath.Join(tempDir, "dag-runs")
|
||||
dagsDir := filepath.Join(tempDir, "dags")
|
||||
|
||||
require.NoError(t, os.MkdirAll(dataDir, 0750))
|
||||
require.NoError(t, os.MkdirAll(dagRunsDir, 0750))
|
||||
require.NoError(t, os.MkdirAll(dagsDir, 0750))
|
||||
|
||||
// Create a test context
|
||||
cfg := &config.Config{
|
||||
Paths: config.PathsConfig{
|
||||
DataDir: dataDir,
|
||||
DAGRunsDir: dagRunsDir,
|
||||
DAGsDir: dagsDir,
|
||||
},
|
||||
}
|
||||
|
||||
// Create stores
|
||||
dagRunStore := filedagrun.New(dagRunsDir)
|
||||
|
||||
// Create command context
|
||||
cmd := &cobra.Command{}
|
||||
ctx := &Context{
|
||||
Context: context.Background(),
|
||||
Command: cmd,
|
||||
Config: cfg,
|
||||
DAGRunStore: dagRunStore,
|
||||
}
|
||||
|
||||
// Run migration with no legacy data
|
||||
err := runMigration(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Should complete without errors
|
||||
}
|
||||
|
||||
func TestCmdMigrate(t *testing.T) {
|
||||
cmd := CmdMigrate()
|
||||
assert.NotNil(t, cmd)
|
||||
assert.Equal(t, "migrate", cmd.Use)
|
||||
assert.True(t, cmd.HasSubCommands())
|
||||
|
||||
// Check for history subcommand
|
||||
historyCmd := cmd.Commands()[0]
|
||||
assert.Equal(t, "history", historyCmd.Use)
|
||||
}
|
||||
@ -250,7 +250,7 @@ func setupScript(_ context.Context, step digraph.Step) (string, error) {
|
||||
}
|
||||
|
||||
// Add execute permissions to the script file
|
||||
if err = os.Chmod(file.Name(), 0755); err != nil { // nolint: gosec
|
||||
if err = os.Chmod(file.Name(), 0750); err != nil { // nolint: gosec
|
||||
return "", fmt.Errorf("failed to set execute permissions on script file: %w", err)
|
||||
}
|
||||
|
||||
|
||||
@ -819,7 +819,7 @@ func TestScheduler(t *testing.T) {
|
||||
|
||||
go func() {
|
||||
time.Sleep(400 * time.Millisecond)
|
||||
err := os.WriteFile(file, []byte("ready"), 0644)
|
||||
err := os.WriteFile(file, []byte("ready"), 0600)
|
||||
require.NoError(t, err, "failed to write to file")
|
||||
}()
|
||||
|
||||
@ -951,7 +951,7 @@ func TestScheduler(t *testing.T) {
|
||||
}
|
||||
t.Cleanup(func() { err := os.Remove(file); require.NoError(t, err) })
|
||||
// Write initial value
|
||||
err = os.WriteFile(file, []byte("notyet"), 0644)
|
||||
err = os.WriteFile(file, []byte("notyet"), 0600)
|
||||
require.NoError(t, err)
|
||||
graph := sc.newGraph(t,
|
||||
newStep("1",
|
||||
@ -968,7 +968,7 @@ func TestScheduler(t *testing.T) {
|
||||
)
|
||||
go func() {
|
||||
time.Sleep(300 * time.Millisecond)
|
||||
err := os.WriteFile(file, []byte("done"), 0644)
|
||||
err := os.WriteFile(file, []byte("done"), 0600)
|
||||
require.NoError(t, err)
|
||||
}()
|
||||
result := graph.Schedule(t, scheduler.StatusSuccess)
|
||||
|
||||
@ -25,14 +25,14 @@ func TestReadLogLines(t *testing.T) {
|
||||
"Line 9",
|
||||
"Line 10",
|
||||
}
|
||||
err := os.WriteFile(testLogPath, []byte(strings.Join(testContent, "\n")), 0644)
|
||||
err := os.WriteFile(testLogPath, []byte(strings.Join(testContent, "\n")), 0600)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create test log file: %v", err)
|
||||
}
|
||||
|
||||
// Create an empty log file for testing
|
||||
emptyLogPath := filepath.Join(tempDir, "empty.log")
|
||||
err = os.WriteFile(emptyLogPath, []byte(""), 0644)
|
||||
err = os.WriteFile(emptyLogPath, []byte(""), 0600)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create empty log file: %v", err)
|
||||
}
|
||||
@ -193,7 +193,7 @@ func TestReadLogContent(t *testing.T) {
|
||||
"Line 4",
|
||||
"Line 5",
|
||||
}
|
||||
err := os.WriteFile(testLogPath, []byte(strings.Join(testContent, "\n")), 0644)
|
||||
err := os.WriteFile(testLogPath, []byte(strings.Join(testContent, "\n")), 0600)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create test log file: %v", err)
|
||||
}
|
||||
@ -329,7 +329,7 @@ func TestCountLinesExact(t *testing.T) {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
// Create test file
|
||||
testFilePath := filepath.Join(tempDir, tc.name+".log")
|
||||
err := os.WriteFile(testFilePath, []byte(tc.content), 0644)
|
||||
err := os.WriteFile(testFilePath, []byte(tc.content), 0600)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create test file: %v", err)
|
||||
}
|
||||
@ -369,7 +369,7 @@ func TestReadFirstLines(t *testing.T) {
|
||||
"Line 4",
|
||||
"Line 5",
|
||||
}
|
||||
err := os.WriteFile(testLogPath, []byte(strings.Join(testContent, "\n")), 0644)
|
||||
err := os.WriteFile(testLogPath, []byte(strings.Join(testContent, "\n")), 0600)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create test log file: %v", err)
|
||||
}
|
||||
@ -480,7 +480,7 @@ func TestReadLastLines(t *testing.T) {
|
||||
"Line 4",
|
||||
"Line 5",
|
||||
}
|
||||
err := os.WriteFile(testLogPath, []byte(strings.Join(testContent, "\n")), 0644)
|
||||
err := os.WriteFile(testLogPath, []byte(strings.Join(testContent, "\n")), 0600)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create test log file: %v", err)
|
||||
}
|
||||
@ -591,7 +591,7 @@ func TestReadLinesRange(t *testing.T) {
|
||||
"Line 4",
|
||||
"Line 5",
|
||||
}
|
||||
err := os.WriteFile(testLogPath, []byte(strings.Join(testContent, "\n")), 0644)
|
||||
err := os.WriteFile(testLogPath, []byte(strings.Join(testContent, "\n")), 0600)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create test log file: %v", err)
|
||||
}
|
||||
@ -758,7 +758,7 @@ func TestEstimateLineCount(t *testing.T) {
|
||||
|
||||
// For small files, write the exact content
|
||||
if tt.fileSize <= 10 {
|
||||
err := os.WriteFile(testFilePath, []byte(tt.content), 0644)
|
||||
err := os.WriteFile(testFilePath, []byte(tt.content), 0600)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create test file: %v", err)
|
||||
}
|
||||
|
||||
@ -17,7 +17,7 @@ import (
|
||||
"github.com/dagu-org/dagu/internal/digraph"
|
||||
"github.com/dagu-org/dagu/internal/digraph/scheduler"
|
||||
"github.com/dagu-org/dagu/internal/models"
|
||||
"github.com/dagu-org/dagu/internal/persistence/localdagrun"
|
||||
"github.com/dagu-org/dagu/internal/persistence/filedagrun"
|
||||
"github.com/samber/lo"
|
||||
"golang.org/x/text/encoding"
|
||||
"golang.org/x/text/encoding/japanese"
|
||||
@ -320,7 +320,7 @@ func (a *API) readLog(
|
||||
var logFile string
|
||||
|
||||
if statusFile != "" {
|
||||
status, err := localdagrun.ParseStatusFile(statusFile)
|
||||
status, err := filedagrun.ParseStatusFile(statusFile)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -355,7 +355,7 @@ func (a *API) readStepLog(
|
||||
var status *models.DAGRunStatus
|
||||
|
||||
if statusFile != "" {
|
||||
parsedStatus, err := localdagrun.ParseStatusFile(statusFile)
|
||||
parsedStatus, err := filedagrun.ParseStatusFile(statusFile)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
481
internal/migration/migration.go
Normal file
481
internal/migration/migration.go
Normal file
@ -0,0 +1,481 @@
|
||||
package migration
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/dagu-org/dagu/internal/digraph"
|
||||
"github.com/dagu-org/dagu/internal/logger"
|
||||
"github.com/dagu-org/dagu/internal/models"
|
||||
legacymodel "github.com/dagu-org/dagu/internal/persistence/legacy/model"
|
||||
)
|
||||
|
||||
// HistoryMigrator handles migration from legacy history format to new format
|
||||
type HistoryMigrator struct {
|
||||
dagRunStore models.DAGRunStore
|
||||
dagStore models.DAGStore
|
||||
dataDir string
|
||||
dagsDir string
|
||||
}
|
||||
|
||||
// NewHistoryMigrator creates a new history migrator
|
||||
func NewHistoryMigrator(
|
||||
dagRunStore models.DAGRunStore,
|
||||
dagStore models.DAGStore,
|
||||
dataDir string,
|
||||
dagsDir string,
|
||||
) *HistoryMigrator {
|
||||
return &HistoryMigrator{
|
||||
dagRunStore: dagRunStore,
|
||||
dagStore: dagStore,
|
||||
dataDir: dataDir,
|
||||
dagsDir: dagsDir,
|
||||
}
|
||||
}
|
||||
|
||||
// MigrationResult contains the result of a migration
|
||||
type MigrationResult struct {
|
||||
TotalDAGs int
|
||||
TotalRuns int
|
||||
MigratedRuns int
|
||||
SkippedRuns int
|
||||
FailedRuns int
|
||||
Errors []error
|
||||
}
|
||||
|
||||
// NeedsMigration checks if legacy data exists that needs migration
|
||||
func (m *HistoryMigrator) NeedsMigration(_ context.Context) (bool, error) {
|
||||
dataDir := m.dataDir
|
||||
|
||||
// Check if history directory exists
|
||||
if _, err := os.Stat(dataDir); os.IsNotExist(err) {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// Check if there are any DAG directories in the history folder
|
||||
entries, err := os.ReadDir(dataDir)
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("failed to read history directory: %w", err)
|
||||
}
|
||||
|
||||
// Look for directories with .dat files
|
||||
for _, entry := range entries {
|
||||
if !entry.IsDir() {
|
||||
continue
|
||||
}
|
||||
|
||||
dagHistoryDir := filepath.Join(dataDir, entry.Name())
|
||||
files, err := os.ReadDir(dagHistoryDir)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
// Check if there are any .dat files
|
||||
for _, file := range files {
|
||||
if strings.HasSuffix(file.Name(), ".dat") {
|
||||
return true, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// Migrate performs the migration from legacy to new format
|
||||
func (m *HistoryMigrator) Migrate(ctx context.Context) (*MigrationResult, error) {
|
||||
result := &MigrationResult{}
|
||||
|
||||
logger.Info(ctx, "Starting history migration from legacy format")
|
||||
|
||||
historyDir := m.dataDir
|
||||
entries, err := os.ReadDir(historyDir)
|
||||
if err != nil {
|
||||
return result, fmt.Errorf("failed to read history directory: %w", err)
|
||||
}
|
||||
|
||||
for _, entry := range entries {
|
||||
if !entry.IsDir() {
|
||||
continue
|
||||
}
|
||||
|
||||
dagName := m.extractDAGName(entry.Name())
|
||||
result.TotalDAGs++
|
||||
|
||||
logger.Info(ctx, "Migrating DAG history", "dag", dagName)
|
||||
|
||||
// Migrate all runs for this DAG
|
||||
if err := m.migrateDAGHistory(ctx, entry.Name(), dagName, result); err != nil {
|
||||
result.Errors = append(result.Errors, fmt.Errorf("failed to migrate DAG %s: %w", dagName, err))
|
||||
}
|
||||
}
|
||||
|
||||
logger.Info(ctx, "Migration completed",
|
||||
"total_dags", result.TotalDAGs,
|
||||
"total_runs", result.TotalRuns,
|
||||
"migrated", result.MigratedRuns,
|
||||
"skipped", result.SkippedRuns,
|
||||
"failed", result.FailedRuns,
|
||||
)
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// migrateDAGHistory migrates all runs for a specific DAG
|
||||
func (m *HistoryMigrator) migrateDAGHistory(ctx context.Context, dirName, dagName string, result *MigrationResult) error {
|
||||
dagHistoryDir := filepath.Join(m.dataDir, dirName)
|
||||
|
||||
files, err := os.ReadDir(dagHistoryDir)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to read DAG history directory: %w", err)
|
||||
}
|
||||
|
||||
for _, file := range files {
|
||||
if !strings.HasSuffix(file.Name(), ".dat") {
|
||||
continue
|
||||
}
|
||||
|
||||
result.TotalRuns++
|
||||
|
||||
// Read the legacy status file to get the actual RequestID
|
||||
filePath := filepath.Join(dagHistoryDir, file.Name())
|
||||
statusFile, err := m.readLegacyStatusFile(filePath)
|
||||
if err != nil {
|
||||
result.FailedRuns++
|
||||
result.Errors = append(result.Errors, fmt.Errorf("failed to read legacy status file %s: %w", file.Name(), err))
|
||||
continue
|
||||
}
|
||||
|
||||
if statusFile == nil || statusFile.Status.RequestID == "" {
|
||||
result.SkippedRuns++
|
||||
logger.Debug(ctx, "Skipping file with no valid status", "file", file.Name())
|
||||
continue
|
||||
}
|
||||
|
||||
requestID := statusFile.Status.RequestID
|
||||
|
||||
// Check if already migrated
|
||||
if m.isAlreadyMigrated(ctx, statusFile.Status.Name, requestID) {
|
||||
result.SkippedRuns++
|
||||
logger.Debug(ctx, "Run already migrated", "dag", statusFile.Status.Name, "request_id", requestID)
|
||||
continue
|
||||
}
|
||||
|
||||
// Convert and save - pass the directory name (without hash) as additional hint
|
||||
if err := m.migrateRun(ctx, statusFile, dagName); err != nil {
|
||||
result.FailedRuns++
|
||||
result.Errors = append(result.Errors, fmt.Errorf("failed to migrate run %s: %w", requestID, err))
|
||||
continue
|
||||
}
|
||||
|
||||
result.MigratedRuns++
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// migrateRun converts and saves a single run
|
||||
func (m *HistoryMigrator) migrateRun(ctx context.Context, legacyStatusFile *legacymodel.StatusFile, dirBasedDagName string) error {
|
||||
legacyStatus := &legacyStatusFile.Status
|
||||
|
||||
// Load the DAG definition - try both the status name and directory-based name
|
||||
dag, err := m.loadDAGForMigration(ctx, legacyStatus.Name, dirBasedDagName)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to load DAG %s: %w", legacyStatus.Name, err)
|
||||
}
|
||||
|
||||
// Convert legacy status to new format
|
||||
newStatus := m.convertStatus(legacyStatus, dag)
|
||||
|
||||
// Parse started time to get timestamp for CreateAttempt
|
||||
startedAt, _ := m.parseTime(legacyStatus.StartedAt)
|
||||
if startedAt.IsZero() {
|
||||
startedAt = time.Now()
|
||||
}
|
||||
|
||||
// Create attempt in new store
|
||||
attempt, err := m.dagRunStore.CreateAttempt(ctx, dag, startedAt, newStatus.DAGRunID, models.NewDAGRunAttemptOptions{
|
||||
RootDAGRun: nil, // No hierarchy info in legacy format
|
||||
Retry: false,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create attempt: %w", err)
|
||||
}
|
||||
|
||||
// Open the attempt for writing
|
||||
if err := attempt.Open(ctx); err != nil {
|
||||
return fmt.Errorf("failed to open attempt: %w", err)
|
||||
}
|
||||
|
||||
// Write the converted status
|
||||
if err := attempt.Write(ctx, *newStatus); err != nil {
|
||||
return fmt.Errorf("failed to write status: %w", err)
|
||||
}
|
||||
|
||||
// Close the attempt
|
||||
if err := attempt.Close(ctx); err != nil {
|
||||
return fmt.Errorf("failed to close attempt: %w", err)
|
||||
}
|
||||
|
||||
logger.Debug(ctx, "Migrated run", "dag", legacyStatus.Name, "request_id", legacyStatus.RequestID)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// convertStatus converts legacy status to new DAGRunStatus format
|
||||
func (m *HistoryMigrator) convertStatus(legacy *legacymodel.Status, dag *digraph.DAG) *models.DAGRunStatus {
|
||||
// Convert timestamps
|
||||
startedAt, _ := m.parseTime(legacy.StartedAt)
|
||||
finishedAt, _ := m.parseTime(legacy.FinishedAt)
|
||||
|
||||
// Create createdAt timestamp based on startedAt
|
||||
createdAt := time.Now().UnixMilli()
|
||||
if !startedAt.IsZero() {
|
||||
createdAt = startedAt.UnixMilli()
|
||||
}
|
||||
|
||||
status := &models.DAGRunStatus{
|
||||
Name: legacy.Name,
|
||||
DAGRunID: legacy.RequestID,
|
||||
Status: legacy.Status,
|
||||
PID: models.PID(legacy.PID),
|
||||
Log: legacy.Log,
|
||||
Nodes: make([]*models.Node, 0),
|
||||
Params: legacy.Params,
|
||||
ParamsList: legacy.ParamsList,
|
||||
CreatedAt: createdAt,
|
||||
StartedAt: formatTime(startedAt),
|
||||
FinishedAt: formatTime(finishedAt),
|
||||
QueuedAt: formatTime(startedAt), // Use StartedAt as QueuedAt for migration
|
||||
}
|
||||
|
||||
// Convert nodes
|
||||
for _, node := range legacy.Nodes {
|
||||
status.Nodes = append(status.Nodes, m.convertNode(node))
|
||||
}
|
||||
|
||||
// Convert handler nodes
|
||||
if legacy.OnExit != nil {
|
||||
status.OnExit = m.convertNode(legacy.OnExit)
|
||||
}
|
||||
if legacy.OnSuccess != nil {
|
||||
status.OnSuccess = m.convertNode(legacy.OnSuccess)
|
||||
}
|
||||
if legacy.OnFailure != nil {
|
||||
status.OnFailure = m.convertNode(legacy.OnFailure)
|
||||
}
|
||||
if legacy.OnCancel != nil {
|
||||
status.OnCancel = m.convertNode(legacy.OnCancel)
|
||||
}
|
||||
|
||||
// Set preconditions from DAG if available
|
||||
if dag != nil {
|
||||
status.Preconditions = dag.Preconditions
|
||||
}
|
||||
|
||||
return status
|
||||
}
|
||||
|
||||
// convertNode converts legacy node to new Node format
|
||||
func (m *HistoryMigrator) convertNode(legacy *legacymodel.Node) *models.Node {
|
||||
node := &models.Node{
|
||||
Step: legacy.Step,
|
||||
Status: legacy.Status,
|
||||
Error: legacy.Error,
|
||||
RetryCount: legacy.RetryCount,
|
||||
DoneCount: legacy.DoneCount,
|
||||
StartedAt: legacy.StartedAt,
|
||||
FinishedAt: legacy.FinishedAt,
|
||||
RetriedAt: legacy.RetriedAt,
|
||||
}
|
||||
|
||||
// Legacy format stored logs inline, new format uses file paths
|
||||
// We'll store the log content as a note that it was migrated
|
||||
if legacy.Log != "" {
|
||||
node.Stdout = "(migrated - log content was inline)"
|
||||
}
|
||||
|
||||
return node
|
||||
}
|
||||
|
||||
// parseTime attempts to parse various time formats
|
||||
func (m *HistoryMigrator) parseTime(timeStr string) (time.Time, error) {
|
||||
if timeStr == "" || timeStr == "-" {
|
||||
return time.Time{}, fmt.Errorf("empty time string")
|
||||
}
|
||||
|
||||
// Try various formats
|
||||
formats := []string{
|
||||
time.RFC3339,
|
||||
"2006-01-02T15:04:05Z07:00",
|
||||
"2006-01-02 15:04:05",
|
||||
"2006-01-02T15:04:05",
|
||||
}
|
||||
|
||||
for _, format := range formats {
|
||||
if t, err := time.Parse(format, timeStr); err == nil {
|
||||
return t, nil
|
||||
}
|
||||
}
|
||||
|
||||
return time.Time{}, fmt.Errorf("unable to parse time: %s", timeStr)
|
||||
}
|
||||
|
||||
// formatTime formats a time value for the new format
|
||||
func formatTime(t time.Time) string {
|
||||
if t.IsZero() {
|
||||
return ""
|
||||
}
|
||||
return t.Format(time.RFC3339)
|
||||
}
|
||||
|
||||
// extractDAGName extracts the DAG name from directory name
|
||||
func (m *HistoryMigrator) extractDAGName(dirName string) string {
|
||||
// Directory format: {dag-name}-{hash}
|
||||
// Just remove the last part after hyphen if it looks like a hash
|
||||
lastHyphen := strings.LastIndex(dirName, "-")
|
||||
if lastHyphen == -1 {
|
||||
return dirName
|
||||
}
|
||||
|
||||
// Check if the part after hyphen is all hex chars
|
||||
suffix := dirName[lastHyphen+1:]
|
||||
for _, ch := range suffix {
|
||||
if !((ch >= '0' && ch <= '9') || (ch >= 'a' && ch <= 'f')) { //nolint:staticcheck
|
||||
return dirName // Not a hash, return full name
|
||||
}
|
||||
}
|
||||
|
||||
return dirName[:lastHyphen]
|
||||
}
|
||||
|
||||
// readLegacyStatusFile reads a legacy status file directly
|
||||
func (m *HistoryMigrator) readLegacyStatusFile(filePath string) (*legacymodel.StatusFile, error) {
|
||||
data, err := os.ReadFile(filePath) //nolint:gosec
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to read file: %w", err)
|
||||
}
|
||||
|
||||
// The legacy status files contain multiple JSON lines, read the last one
|
||||
lines := strings.Split(string(data), "\n")
|
||||
var lastValidLine string
|
||||
for i := len(lines) - 1; i >= 0; i-- {
|
||||
line := strings.TrimSpace(lines[i])
|
||||
if line != "" {
|
||||
lastValidLine = line
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if lastValidLine == "" {
|
||||
return nil, fmt.Errorf("no valid status data found in file")
|
||||
}
|
||||
|
||||
var statusFile legacymodel.StatusFile
|
||||
if err := json.Unmarshal([]byte(lastValidLine), &statusFile.Status); err != nil {
|
||||
return nil, fmt.Errorf("failed to unmarshal status: %w", err)
|
||||
}
|
||||
|
||||
return &statusFile, nil
|
||||
}
|
||||
|
||||
// isAlreadyMigrated checks if a run has already been migrated
|
||||
func (m *HistoryMigrator) isAlreadyMigrated(ctx context.Context, dagName, requestID string) bool {
|
||||
attempt, err := m.dagRunStore.FindAttempt(ctx, digraph.NewDAGRunRef(dagName, requestID))
|
||||
if err != nil || attempt == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
status, err := attempt.ReadStatus(ctx)
|
||||
return err == nil && status != nil
|
||||
}
|
||||
|
||||
// loadDAGForMigration attempts to load the DAG definition
|
||||
func (m *HistoryMigrator) loadDAGForMigration(ctx context.Context, statusDagName, dirBasedDagName string) (*digraph.DAG, error) {
|
||||
// Try both DAG names as candidates
|
||||
candidates := []string{statusDagName}
|
||||
if dirBasedDagName != "" && dirBasedDagName != statusDagName {
|
||||
candidates = append(candidates, dirBasedDagName)
|
||||
}
|
||||
|
||||
// Try to find the DAG file with different extensions
|
||||
extensions := []string{".yaml", ".yml"}
|
||||
|
||||
for _, candidate := range candidates {
|
||||
for _, ext := range extensions {
|
||||
path := filepath.Join(m.dagsDir, candidate+ext)
|
||||
if _, err := os.Stat(path); err == nil {
|
||||
dag, err := m.dagStore.GetDetails(ctx, path)
|
||||
if err == nil && dag != nil {
|
||||
return dag, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// If we can't find the DAG, create a minimal one
|
||||
return &digraph.DAG{
|
||||
Name: statusDagName,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// MoveLegacyData moves individual legacy DAG directories to an archive location after successful migration
|
||||
func (m *HistoryMigrator) MoveLegacyData(ctx context.Context) error {
|
||||
archiveDir := filepath.Join(m.dataDir, fmt.Sprintf("history_migrated_%s", time.Now().Format("20060102_150405")))
|
||||
|
||||
// Create archive directory
|
||||
if err := os.MkdirAll(archiveDir, 0750); err != nil {
|
||||
return fmt.Errorf("failed to create archive directory: %w", err)
|
||||
}
|
||||
|
||||
logger.Info(ctx, "Moving legacy history directories to archive", "archive_dir", archiveDir)
|
||||
|
||||
// Read data directory entries
|
||||
entries, err := os.ReadDir(m.dataDir)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to read data directory: %w", err)
|
||||
}
|
||||
|
||||
movedCount := 0
|
||||
for _, entry := range entries {
|
||||
if !entry.IsDir() {
|
||||
continue
|
||||
}
|
||||
|
||||
dirPath := filepath.Join(m.dataDir, entry.Name())
|
||||
|
||||
// Check if this directory contains .dat files (legacy history)
|
||||
files, err := os.ReadDir(dirPath)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
hasDataFiles := false
|
||||
for _, file := range files {
|
||||
if strings.HasSuffix(file.Name(), ".dat") {
|
||||
hasDataFiles = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if !hasDataFiles {
|
||||
continue
|
||||
}
|
||||
|
||||
// Move this legacy directory to archive
|
||||
archivePath := filepath.Join(archiveDir, entry.Name())
|
||||
if err := os.Rename(dirPath, archivePath); err != nil {
|
||||
logger.Warn(ctx, "Failed to move legacy directory", "dir", entry.Name(), "error", err)
|
||||
} else {
|
||||
movedCount++
|
||||
logger.Debug(ctx, "Moved legacy directory", "dir", entry.Name())
|
||||
}
|
||||
}
|
||||
|
||||
logger.Info(ctx, "Legacy history data archived successfully", "location", archiveDir, "directories_moved", movedCount)
|
||||
return nil
|
||||
}
|
||||
524
internal/migration/migration_test.go
Normal file
524
internal/migration/migration_test.go
Normal file
@ -0,0 +1,524 @@
|
||||
package migration
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/dagu-org/dagu/internal/digraph"
|
||||
"github.com/dagu-org/dagu/internal/digraph/scheduler"
|
||||
"github.com/dagu-org/dagu/internal/models"
|
||||
"github.com/dagu-org/dagu/internal/persistence/filedagrun"
|
||||
legacymodel "github.com/dagu-org/dagu/internal/persistence/legacy/model"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// mockDAGStore implements models.DAGStore for testing
|
||||
type mockDAGStore struct {
|
||||
dags map[string]*digraph.DAG
|
||||
}
|
||||
|
||||
func (m *mockDAGStore) GetDetails(_ context.Context, path string, _ ...digraph.LoadOption) (*digraph.DAG, error) {
|
||||
if dag, ok := m.dags[path]; ok {
|
||||
return dag, nil
|
||||
}
|
||||
return nil, os.ErrNotExist
|
||||
}
|
||||
|
||||
func (m *mockDAGStore) List(_ context.Context, _ models.ListDAGsOptions) (models.PaginatedResult[*digraph.DAG], []string, error) {
|
||||
var dags []*digraph.DAG
|
||||
for _, dag := range m.dags {
|
||||
dags = append(dags, dag)
|
||||
}
|
||||
return models.PaginatedResult[*digraph.DAG]{
|
||||
Items: dags,
|
||||
TotalCount: len(dags),
|
||||
}, nil, nil
|
||||
}
|
||||
|
||||
func (m *mockDAGStore) FindByName(_ context.Context, name string) (*digraph.DAG, error) {
|
||||
for _, dag := range m.dags {
|
||||
if dag.Name == name {
|
||||
return dag, nil
|
||||
}
|
||||
}
|
||||
return nil, os.ErrNotExist
|
||||
}
|
||||
|
||||
func (m *mockDAGStore) Create(_ context.Context, _ string, _ []byte) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *mockDAGStore) Delete(_ context.Context, _ string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *mockDAGStore) Update(_ context.Context, _ string, _ []byte) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *mockDAGStore) Rename(_ context.Context, _, _ string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *mockDAGStore) GetSpec(_ context.Context, _ string) (string, error) {
|
||||
return "", nil
|
||||
}
|
||||
|
||||
func (m *mockDAGStore) IsSuspended(_ context.Context, _ string) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (m *mockDAGStore) ToggleSuspend(_ context.Context, _ string, _ bool) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *mockDAGStore) GetMetadata(_ context.Context, _ string) (*digraph.DAG, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (m *mockDAGStore) Grep(_ context.Context, _ string) ([]*models.GrepDAGsResult, []string, error) {
|
||||
return nil, nil, nil
|
||||
}
|
||||
|
||||
func (m *mockDAGStore) UpdateSpec(_ context.Context, _ string, _ []byte) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *mockDAGStore) LoadSpec(_ context.Context, _ []byte, _ ...digraph.LoadOption) (*digraph.DAG, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (m *mockDAGStore) TagList(_ context.Context) ([]string, []string, error) {
|
||||
return nil, nil, nil
|
||||
}
|
||||
|
||||
func TestExtractDAGName(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
dirName string
|
||||
expected string
|
||||
}{
|
||||
{
|
||||
name: "directory with hash",
|
||||
dirName: "my-dag-a1b2c3d4",
|
||||
expected: "my-dag",
|
||||
},
|
||||
{
|
||||
name: "directory with longer hash",
|
||||
dirName: "test-workflow-deadbeef1234",
|
||||
expected: "test-workflow",
|
||||
},
|
||||
{
|
||||
name: "directory without hash",
|
||||
dirName: "simple-dag",
|
||||
expected: "simple-dag",
|
||||
},
|
||||
{
|
||||
name: "directory with multiple hyphens",
|
||||
dirName: "my-complex-dag-name-abc123",
|
||||
expected: "my-complex-dag-name",
|
||||
},
|
||||
{
|
||||
name: "directory with non-hex suffix",
|
||||
dirName: "dag-with-suffix-xyz",
|
||||
expected: "dag-with-suffix-xyz",
|
||||
},
|
||||
}
|
||||
|
||||
m := &HistoryMigrator{}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
result := m.extractDAGName(tt.dirName)
|
||||
assert.Equal(t, tt.expected, result)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestReadLegacyStatusFile(t *testing.T) {
|
||||
// Create a temporary directory
|
||||
tempDir := t.TempDir()
|
||||
|
||||
// Create a test status file with multiple lines
|
||||
testFile := filepath.Join(tempDir, "test.dat")
|
||||
status1 := legacymodel.Status{
|
||||
RequestID: "req123",
|
||||
Name: "test-dag",
|
||||
Status: scheduler.StatusRunning,
|
||||
StartedAt: "2024-01-01T10:00:00Z",
|
||||
}
|
||||
status2 := legacymodel.Status{
|
||||
RequestID: "req123",
|
||||
Name: "test-dag",
|
||||
Status: scheduler.StatusSuccess,
|
||||
StartedAt: "2024-01-01T10:00:00Z",
|
||||
FinishedAt: "2024-01-01T10:05:00Z",
|
||||
}
|
||||
|
||||
// Write multiple status lines
|
||||
f, err := os.Create(testFile)
|
||||
require.NoError(t, err)
|
||||
|
||||
data1, _ := json.Marshal(status1)
|
||||
data2, _ := json.Marshal(status2)
|
||||
_, err = f.WriteString(string(data1) + "\n")
|
||||
require.NoError(t, err)
|
||||
_, err = f.WriteString(string(data2) + "\n")
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, f.Close())
|
||||
|
||||
// Test reading the file
|
||||
m := &HistoryMigrator{}
|
||||
statusFile, err := m.readLegacyStatusFile(testFile)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, statusFile)
|
||||
|
||||
// Should get the last status (finished one)
|
||||
assert.Equal(t, "req123", statusFile.Status.RequestID)
|
||||
assert.Equal(t, scheduler.StatusSuccess, statusFile.Status.Status)
|
||||
assert.Equal(t, "2024-01-01T10:05:00Z", statusFile.Status.FinishedAt)
|
||||
}
|
||||
|
||||
func TestNeedsMigration(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
t.Run("no history directory", func(t *testing.T) {
|
||||
tempDir := t.TempDir()
|
||||
m := &HistoryMigrator{dataDir: tempDir}
|
||||
|
||||
needsMigration, err := m.NeedsMigration(ctx)
|
||||
assert.NoError(t, err)
|
||||
assert.False(t, needsMigration)
|
||||
})
|
||||
|
||||
t.Run("history directory with dat files", func(t *testing.T) {
|
||||
tempDir := t.TempDir()
|
||||
|
||||
// Create legacy structure
|
||||
dagDir := filepath.Join(tempDir, "my-dag-abc123")
|
||||
require.NoError(t, os.MkdirAll(dagDir, 0750))
|
||||
|
||||
datFile := filepath.Join(dagDir, "my-dag.20240101.100000.req123.dat")
|
||||
require.NoError(t, os.WriteFile(datFile, []byte(`{"RequestId":"req123"}`), 0600))
|
||||
|
||||
m := &HistoryMigrator{dataDir: tempDir}
|
||||
needsMigration, err := m.NeedsMigration(ctx)
|
||||
assert.NoError(t, err)
|
||||
assert.True(t, needsMigration)
|
||||
})
|
||||
|
||||
t.Run("history directory without dat files", func(t *testing.T) {
|
||||
tempDir := t.TempDir()
|
||||
|
||||
// Create directory without .dat files
|
||||
dagDir := filepath.Join(tempDir, "my-dag-abc123")
|
||||
require.NoError(t, os.MkdirAll(dagDir, 0750))
|
||||
|
||||
otherFile := filepath.Join(dagDir, "other.txt")
|
||||
require.NoError(t, os.WriteFile(otherFile, []byte("test"), 0600))
|
||||
|
||||
m := &HistoryMigrator{dataDir: tempDir}
|
||||
needsMigration, err := m.NeedsMigration(ctx)
|
||||
assert.NoError(t, err)
|
||||
assert.False(t, needsMigration)
|
||||
})
|
||||
}
|
||||
|
||||
func TestConvertStatus(t *testing.T) {
|
||||
legacy := &legacymodel.Status{
|
||||
RequestID: "req123",
|
||||
Name: "test-dag",
|
||||
Status: scheduler.StatusSuccess,
|
||||
PID: legacymodel.PID(12345),
|
||||
Log: "test log",
|
||||
StartedAt: "2024-01-01T10:00:00Z",
|
||||
FinishedAt: "2024-01-01T10:05:00Z",
|
||||
Params: "param1=value1",
|
||||
ParamsList: []string{"param1", "value1"},
|
||||
Nodes: []*legacymodel.Node{
|
||||
{
|
||||
Step: digraph.Step{
|
||||
Name: "step1",
|
||||
},
|
||||
Status: scheduler.NodeStatusSuccess,
|
||||
StartedAt: "2024-01-01T10:01:00Z",
|
||||
FinishedAt: "2024-01-01T10:02:00Z",
|
||||
Log: "step log",
|
||||
},
|
||||
},
|
||||
OnSuccess: &legacymodel.Node{
|
||||
Step: digraph.Step{
|
||||
Name: "on_success",
|
||||
},
|
||||
Status: scheduler.NodeStatusSuccess,
|
||||
},
|
||||
}
|
||||
|
||||
dag := &digraph.DAG{
|
||||
Name: "test-dag",
|
||||
Preconditions: []*digraph.Condition{
|
||||
{Condition: "test condition"},
|
||||
},
|
||||
}
|
||||
|
||||
m := &HistoryMigrator{}
|
||||
result := m.convertStatus(legacy, dag)
|
||||
|
||||
assert.Equal(t, "test-dag", result.Name)
|
||||
assert.Equal(t, "req123", result.DAGRunID)
|
||||
assert.Equal(t, scheduler.StatusSuccess, result.Status)
|
||||
assert.Equal(t, models.PID(12345), result.PID)
|
||||
assert.Equal(t, "test log", result.Log)
|
||||
assert.Equal(t, "param1=value1", result.Params)
|
||||
assert.Equal(t, []string{"param1", "value1"}, result.ParamsList)
|
||||
assert.NotEmpty(t, result.StartedAt)
|
||||
assert.NotEmpty(t, result.FinishedAt)
|
||||
assert.Len(t, result.Nodes, 1)
|
||||
assert.NotNil(t, result.OnSuccess)
|
||||
assert.Equal(t, 1, len(result.Preconditions))
|
||||
}
|
||||
|
||||
func TestParseTime(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
timeStr string
|
||||
shouldErr bool
|
||||
}{
|
||||
{
|
||||
name: "RFC3339 format",
|
||||
timeStr: "2024-01-01T10:00:00Z",
|
||||
shouldErr: false,
|
||||
},
|
||||
{
|
||||
name: "RFC3339 with timezone",
|
||||
timeStr: "2024-01-01T10:00:00+09:00",
|
||||
shouldErr: false,
|
||||
},
|
||||
{
|
||||
name: "space separated format",
|
||||
timeStr: "2024-01-01 10:00:00",
|
||||
shouldErr: false,
|
||||
},
|
||||
{
|
||||
name: "empty string",
|
||||
timeStr: "",
|
||||
shouldErr: true,
|
||||
},
|
||||
{
|
||||
name: "dash only",
|
||||
timeStr: "-",
|
||||
shouldErr: true,
|
||||
},
|
||||
{
|
||||
name: "invalid format",
|
||||
timeStr: "not a date",
|
||||
shouldErr: true,
|
||||
},
|
||||
}
|
||||
|
||||
m := &HistoryMigrator{}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
result, err := m.parseTime(tt.timeStr)
|
||||
if tt.shouldErr {
|
||||
assert.Error(t, err)
|
||||
assert.True(t, result.IsZero())
|
||||
} else {
|
||||
assert.NoError(t, err)
|
||||
assert.False(t, result.IsZero())
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestMoveLegacyData(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
tempDir := t.TempDir()
|
||||
|
||||
// Create legacy directories with .dat files
|
||||
legacyDir1 := filepath.Join(tempDir, "dag1-abc123")
|
||||
require.NoError(t, os.MkdirAll(legacyDir1, 0750))
|
||||
require.NoError(t, os.WriteFile(filepath.Join(legacyDir1, "test.dat"), []byte("data"), 0600))
|
||||
|
||||
legacyDir2 := filepath.Join(tempDir, "dag2-def456")
|
||||
require.NoError(t, os.MkdirAll(legacyDir2, 0750))
|
||||
require.NoError(t, os.WriteFile(filepath.Join(legacyDir2, "test.dat"), []byte("data"), 0600))
|
||||
|
||||
// Create a non-legacy directory
|
||||
otherDir := filepath.Join(tempDir, "other-dir")
|
||||
require.NoError(t, os.MkdirAll(otherDir, 0750))
|
||||
require.NoError(t, os.WriteFile(filepath.Join(otherDir, "test.txt"), []byte("data"), 0600))
|
||||
|
||||
m := &HistoryMigrator{dataDir: tempDir}
|
||||
err := m.MoveLegacyData(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Check that legacy directories were moved
|
||||
_, err = os.Stat(legacyDir1)
|
||||
assert.True(t, os.IsNotExist(err), "legacy directory 1 should be moved")
|
||||
|
||||
_, err = os.Stat(legacyDir2)
|
||||
assert.True(t, os.IsNotExist(err), "legacy directory 2 should be moved")
|
||||
|
||||
// Check that non-legacy directory still exists
|
||||
_, err = os.Stat(otherDir)
|
||||
assert.NoError(t, err, "non-legacy directory should still exist")
|
||||
|
||||
// Check that archive directory was created
|
||||
entries, err := os.ReadDir(tempDir)
|
||||
require.NoError(t, err)
|
||||
|
||||
archiveFound := false
|
||||
for _, entry := range entries {
|
||||
if entry.IsDir() && len(entry.Name()) > 17 && entry.Name()[:17] == "history_migrated_" {
|
||||
archiveFound = true
|
||||
|
||||
// Check archive contents
|
||||
archiveDir := filepath.Join(tempDir, entry.Name())
|
||||
archiveEntries, err := os.ReadDir(archiveDir)
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, archiveEntries, 2, "should have 2 moved directories")
|
||||
}
|
||||
}
|
||||
assert.True(t, archiveFound, "archive directory should be created")
|
||||
}
|
||||
|
||||
func TestLoadDAGForMigration(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
tempDir := t.TempDir()
|
||||
|
||||
// Create test DAG files
|
||||
dag1Path := filepath.Join(tempDir, "test-dag.yaml")
|
||||
dag2Path := filepath.Join(tempDir, "test-dag-v2.yml")
|
||||
|
||||
dag1 := &digraph.DAG{Name: "test-dag", Location: dag1Path}
|
||||
dag2 := &digraph.DAG{Name: "test-dag-v2", Location: dag2Path}
|
||||
|
||||
mockStore := &mockDAGStore{
|
||||
dags: map[string]*digraph.DAG{
|
||||
dag1Path: dag1,
|
||||
dag2Path: dag2,
|
||||
},
|
||||
}
|
||||
|
||||
m := &HistoryMigrator{
|
||||
dagStore: mockStore,
|
||||
dagsDir: tempDir,
|
||||
}
|
||||
|
||||
// Create the actual files
|
||||
require.NoError(t, os.WriteFile(dag1Path, []byte("test"), 0600))
|
||||
require.NoError(t, os.WriteFile(dag2Path, []byte("test"), 0600))
|
||||
|
||||
t.Run("find by status name", func(t *testing.T) {
|
||||
result, err := m.loadDAGForMigration(ctx, "test-dag", "other-name")
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, "test-dag", result.Name)
|
||||
})
|
||||
|
||||
t.Run("find by directory name", func(t *testing.T) {
|
||||
result, err := m.loadDAGForMigration(ctx, "not-found", "test-dag-v2")
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, "test-dag-v2", result.Name)
|
||||
})
|
||||
|
||||
t.Run("not found - create minimal", func(t *testing.T) {
|
||||
result, err := m.loadDAGForMigration(ctx, "not-found", "also-not-found")
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, "not-found", result.Name)
|
||||
})
|
||||
}
|
||||
|
||||
func TestFullMigration(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
tempDir := t.TempDir()
|
||||
|
||||
// Set up directories - use subdirectories to avoid counting them as legacy dirs
|
||||
dataDir := filepath.Join(tempDir, "data")
|
||||
dagRunsDir := filepath.Join(tempDir, "runs")
|
||||
dagsDir := filepath.Join(tempDir, "dags")
|
||||
|
||||
require.NoError(t, os.MkdirAll(dataDir, 0750))
|
||||
require.NoError(t, os.MkdirAll(dagRunsDir, 0750))
|
||||
require.NoError(t, os.MkdirAll(dagsDir, 0750))
|
||||
|
||||
// Create legacy data
|
||||
legacyDagDir := filepath.Join(dataDir, "test-dag-abc123")
|
||||
require.NoError(t, os.MkdirAll(legacyDagDir, 0750))
|
||||
|
||||
// Create legacy status
|
||||
legacyStatus := legacymodel.Status{
|
||||
RequestID: "req123",
|
||||
Name: "test-dag",
|
||||
Status: scheduler.StatusSuccess,
|
||||
StartedAt: time.Now().Add(-1 * time.Hour).Format(time.RFC3339),
|
||||
FinishedAt: time.Now().Add(-30 * time.Minute).Format(time.RFC3339),
|
||||
Nodes: []*legacymodel.Node{
|
||||
{
|
||||
Step: digraph.Step{Name: "step1"},
|
||||
Status: scheduler.NodeStatusSuccess,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
// Write legacy data file
|
||||
statusData, _ := json.Marshal(legacyStatus)
|
||||
datFile := filepath.Join(legacyDagDir, "test-dag.20240101.100000.req123.dat")
|
||||
require.NoError(t, os.WriteFile(datFile, statusData, 0600))
|
||||
|
||||
// Create DAG file
|
||||
dagPath := filepath.Join(dagsDir, "test-dag.yaml")
|
||||
testDAG := &digraph.DAG{
|
||||
Name: "test-dag",
|
||||
Location: dagPath,
|
||||
}
|
||||
require.NoError(t, os.WriteFile(dagPath, []byte("name: test-dag"), 0600))
|
||||
|
||||
// Set up stores
|
||||
dagRunStore := filedagrun.New(dagRunsDir)
|
||||
dagStore := &mockDAGStore{
|
||||
dags: map[string]*digraph.DAG{
|
||||
dagPath: testDAG,
|
||||
},
|
||||
}
|
||||
|
||||
// Create migrator
|
||||
migrator := NewHistoryMigrator(dagRunStore, dagStore, dataDir, dagsDir)
|
||||
|
||||
// Check migration is needed
|
||||
needsMigration, err := migrator.NeedsMigration(ctx)
|
||||
require.NoError(t, err)
|
||||
assert.True(t, needsMigration)
|
||||
|
||||
// Run migration
|
||||
result, err := migrator.Migrate(ctx)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 1, result.TotalDAGs)
|
||||
assert.Equal(t, 1, result.TotalRuns)
|
||||
assert.Equal(t, 1, result.MigratedRuns)
|
||||
assert.Equal(t, 0, result.FailedRuns)
|
||||
|
||||
// Verify migration
|
||||
attempt, err := dagRunStore.FindAttempt(ctx, digraph.NewDAGRunRef("test-dag", "req123"))
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, attempt)
|
||||
|
||||
status, err := attempt.ReadStatus(ctx)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, "req123", status.DAGRunID)
|
||||
assert.Equal(t, "test-dag", status.Name)
|
||||
assert.Equal(t, scheduler.StatusSuccess, status.Status)
|
||||
assert.Len(t, status.Nodes, 1)
|
||||
|
||||
// Move legacy data
|
||||
err = migrator.MoveLegacyData(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Verify legacy directory was moved
|
||||
_, err = os.Stat(legacyDagDir)
|
||||
assert.True(t, os.IsNotExist(err))
|
||||
}
|
||||
@ -1,4 +1,4 @@
|
||||
package localdag
|
||||
package filedag
|
||||
|
||||
import (
|
||||
"context"
|
||||
@ -10,7 +10,6 @@ import (
|
||||
"regexp"
|
||||
"slices"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/dagu-org/dagu/internal/digraph"
|
||||
"github.com/dagu-org/dagu/internal/fileutil"
|
||||
@ -86,7 +85,6 @@ type Storage struct {
|
||||
flagsBaseDir string // Base directory for flag store
|
||||
fileCache *fileutil.Cache[*digraph.DAG] // Optional cache for DAG objects
|
||||
searchPaths []string // Additional search paths for DAG files
|
||||
lock sync.Mutex
|
||||
}
|
||||
|
||||
// GetMetadata retrieves the metadata of a DAG by its name.
|
||||
@ -1,4 +1,4 @@
|
||||
package localdag
|
||||
package filedag
|
||||
|
||||
import (
|
||||
"context"
|
||||
@ -46,31 +46,31 @@ func TestListDAGsInSubdirectories(t *testing.T) {
|
||||
steps:
|
||||
- name: step1
|
||||
command: echo "root"`
|
||||
err := os.WriteFile(filepath.Join(tmpDir, "root-dag.yaml"), []byte(rootDAG), 0644)
|
||||
err := os.WriteFile(filepath.Join(tmpDir, "root-dag.yaml"), []byte(rootDAG), 0600)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Create subdirectory and DAG
|
||||
subDir := filepath.Join(tmpDir, "subdir")
|
||||
err = os.MkdirAll(subDir, 0755)
|
||||
err = os.MkdirAll(subDir, 0750)
|
||||
require.NoError(t, err)
|
||||
|
||||
subDAG := `name: sub-dag
|
||||
steps:
|
||||
- name: step1
|
||||
command: echo "sub"`
|
||||
err = os.WriteFile(filepath.Join(subDir, "sub-dag.yaml"), []byte(subDAG), 0644)
|
||||
err = os.WriteFile(filepath.Join(subDir, "sub-dag.yaml"), []byte(subDAG), 0600)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Create nested subdirectory and DAG
|
||||
nestedDir := filepath.Join(tmpDir, "subdir", "nested")
|
||||
err = os.MkdirAll(nestedDir, 0755)
|
||||
err = os.MkdirAll(nestedDir, 0750)
|
||||
require.NoError(t, err)
|
||||
|
||||
nestedDAG := `name: nested-dag
|
||||
steps:
|
||||
- name: step1
|
||||
command: echo "nested"`
|
||||
err = os.WriteFile(filepath.Join(nestedDir, "nested-dag.yaml"), []byte(nestedDAG), 0644)
|
||||
err = os.WriteFile(filepath.Join(nestedDir, "nested-dag.yaml"), []byte(nestedDAG), 0600)
|
||||
require.NoError(t, err)
|
||||
|
||||
// List all DAGs
|
||||
@ -101,7 +101,7 @@ tags: ["tag1", "tag2"]
|
||||
steps:
|
||||
- name: step1
|
||||
command: echo "hello"`
|
||||
err := os.WriteFile(filepath.Join(tmpDir, "test-dag.yaml"), []byte(dagContent), 0644)
|
||||
err := os.WriteFile(filepath.Join(tmpDir, "test-dag.yaml"), []byte(dagContent), 0600)
|
||||
require.NoError(t, err)
|
||||
|
||||
dag, err := store.GetMetadata(ctx, "test-dag")
|
||||
@ -131,7 +131,7 @@ schedule: "0 1 * * *"
|
||||
steps:
|
||||
- name: step1
|
||||
command: echo "detailed"`
|
||||
err := os.WriteFile(filepath.Join(tmpDir, "detailed-dag.yaml"), []byte(dagContent), 0644)
|
||||
err := os.WriteFile(filepath.Join(tmpDir, "detailed-dag.yaml"), []byte(dagContent), 0600)
|
||||
require.NoError(t, err)
|
||||
|
||||
dag, err := store.GetDetails(ctx, "detailed-dag")
|
||||
@ -161,7 +161,7 @@ func TestGetSpec(t *testing.T) {
|
||||
steps:
|
||||
- name: step1
|
||||
command: echo "spec"`
|
||||
err := os.WriteFile(filepath.Join(tmpDir, "spec-dag.yaml"), []byte(dagContent), 0644)
|
||||
err := os.WriteFile(filepath.Join(tmpDir, "spec-dag.yaml"), []byte(dagContent), 0600)
|
||||
require.NoError(t, err)
|
||||
|
||||
spec, err := store.GetSpec(ctx, "spec-dag")
|
||||
@ -1,4 +1,4 @@
|
||||
package localdagrun
|
||||
package filedagrun
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
@ -1,4 +1,4 @@
|
||||
package localdagrun
|
||||
package filedagrun
|
||||
|
||||
import (
|
||||
"context"
|
||||
@ -1,4 +1,4 @@
|
||||
package localdagrun
|
||||
package filedagrun
|
||||
|
||||
import (
|
||||
"context"
|
||||
@ -1,4 +1,4 @@
|
||||
package localdagrun
|
||||
package filedagrun
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
@ -82,17 +82,17 @@ func TestListChildDAGRuns(t *testing.T) {
|
||||
|
||||
// Create child dag-run directory and some child dag-run directories
|
||||
childDir := filepath.Join(run.baseDir, ChildDAGRunsDir)
|
||||
require.NoError(t, os.MkdirAll(childDir, 0755))
|
||||
require.NoError(t, os.MkdirAll(childDir, 0750))
|
||||
|
||||
// Create two child dag-run directories
|
||||
child1Dir := filepath.Join(childDir, ChildDAGRunDirPrefix+"child1")
|
||||
child2Dir := filepath.Join(childDir, ChildDAGRunDirPrefix+"child2")
|
||||
require.NoError(t, os.MkdirAll(child1Dir, 0755))
|
||||
require.NoError(t, os.MkdirAll(child2Dir, 0755))
|
||||
require.NoError(t, os.MkdirAll(child1Dir, 0750))
|
||||
require.NoError(t, os.MkdirAll(child2Dir, 0750))
|
||||
|
||||
// Create a non-directory file (should be ignored)
|
||||
nonDirFile := filepath.Join(childDir, "not-a-directory.txt")
|
||||
require.NoError(t, os.WriteFile(nonDirFile, []byte("test"), 0644))
|
||||
require.NoError(t, os.WriteFile(nonDirFile, []byte("test"), 0600))
|
||||
|
||||
children, err := run.ListChildDAGRuns(run.Context)
|
||||
require.NoError(t, err)
|
||||
@ -115,7 +115,7 @@ func TestDAGRunListRuns(t *testing.T) {
|
||||
|
||||
// Remove the run that was created by CreateTestDAGRun
|
||||
require.NoError(t, os.RemoveAll(run.baseDir))
|
||||
require.NoError(t, os.MkdirAll(run.baseDir, 0755))
|
||||
require.NoError(t, os.MkdirAll(run.baseDir, 0750))
|
||||
|
||||
runs, err := run.ListAttempts(run.Context)
|
||||
require.NoError(t, err)
|
||||
@ -202,7 +202,7 @@ func TestRemoveLogFiles(t *testing.T) {
|
||||
}
|
||||
|
||||
for _, logFile := range logFiles {
|
||||
require.NoError(t, os.WriteFile(logFile, []byte("test log content"), 0644))
|
||||
require.NoError(t, os.WriteFile(logFile, []byte("test log content"), 0600))
|
||||
}
|
||||
|
||||
root := setupTestDataRoot(t)
|
||||
@ -261,7 +261,7 @@ func TestRemoveLogFiles(t *testing.T) {
|
||||
|
||||
allLogFiles := append(parentLogFiles, childLogFiles...)
|
||||
for _, logFile := range allLogFiles {
|
||||
require.NoError(t, os.WriteFile(logFile, []byte("test log content"), 0644))
|
||||
require.NoError(t, os.WriteFile(logFile, []byte("test log content"), 0600))
|
||||
}
|
||||
|
||||
root := setupTestDataRoot(t)
|
||||
@ -286,10 +286,10 @@ func TestRemoveLogFiles(t *testing.T) {
|
||||
|
||||
// Create child dag-run directory
|
||||
childDir := filepath.Join(run.baseDir, ChildDAGRunsDir)
|
||||
require.NoError(t, os.MkdirAll(childDir, 0755))
|
||||
require.NoError(t, os.MkdirAll(childDir, 0750))
|
||||
|
||||
childDAGRunDir := filepath.Join(childDir, ChildDAGRunDirPrefix+"child1")
|
||||
require.NoError(t, os.MkdirAll(childDAGRunDir, 0755))
|
||||
require.NoError(t, os.MkdirAll(childDAGRunDir, 0750))
|
||||
|
||||
childDAGRun, err := NewDAGRun(childDAGRunDir)
|
||||
require.NoError(t, err)
|
||||
@ -339,7 +339,7 @@ func TestDAGRunRemove(t *testing.T) {
|
||||
}
|
||||
|
||||
for _, logFile := range logFiles {
|
||||
require.NoError(t, os.WriteFile(logFile, []byte("test log content"), 0644))
|
||||
require.NoError(t, os.WriteFile(logFile, []byte("test log content"), 0600))
|
||||
}
|
||||
|
||||
root := setupTestDataRoot(t)
|
||||
@ -409,7 +409,7 @@ func TestDAGRunRemove(t *testing.T) {
|
||||
|
||||
allLogFiles := append(append(parentLogFiles, child1LogFiles...), child2LogFiles...)
|
||||
for _, logFile := range allLogFiles {
|
||||
require.NoError(t, os.WriteFile(logFile, []byte("test log content"), 0644))
|
||||
require.NoError(t, os.WriteFile(logFile, []byte("test log content"), 0600))
|
||||
}
|
||||
|
||||
root := setupTestDataRoot(t)
|
||||
@ -434,7 +434,7 @@ func TestDAGRunRemove(t *testing.T) {
|
||||
|
||||
// Create child dag-run directory
|
||||
childDir := filepath.Join(run.baseDir, ChildDAGRunsDir)
|
||||
require.NoError(t, os.MkdirAll(childDir, 0755))
|
||||
require.NoError(t, os.MkdirAll(childDir, 0750))
|
||||
|
||||
// Create two child dag-run with their own log files
|
||||
childDAGRuns := []struct {
|
||||
@ -447,7 +447,7 @@ func TestDAGRunRemove(t *testing.T) {
|
||||
|
||||
for _, child := range childDAGRuns {
|
||||
childDAGRunDir := filepath.Join(childDir, ChildDAGRunDirPrefix+child.dagRunID)
|
||||
require.NoError(t, os.MkdirAll(childDAGRunDir, 0755))
|
||||
require.NoError(t, os.MkdirAll(childDAGRunDir, 0750))
|
||||
|
||||
childDAGRun, err := NewDAGRun(childDAGRunDir)
|
||||
require.NoError(t, err)
|
||||
@ -1,4 +1,4 @@
|
||||
package localdagrun
|
||||
package filedagrun
|
||||
|
||||
import (
|
||||
// nolint: gosec
|
||||
@ -18,7 +18,6 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/dagu-org/dagu/internal/digraph"
|
||||
"github.com/dagu-org/dagu/internal/fileutil"
|
||||
"github.com/dagu-org/dagu/internal/logger"
|
||||
"github.com/dagu-org/dagu/internal/models"
|
||||
@ -28,11 +27,10 @@ import (
|
||||
// It handles the organization of run data in a hierarchical structure
|
||||
// based on year, month, and day.
|
||||
type DataRoot struct {
|
||||
baseDir string // Base directory for all DAGs
|
||||
prefix string // Sanitized prefix for directory names
|
||||
dagRunsDir string // Path to the dag-runs directory
|
||||
globPattern string // Pattern for finding run directories
|
||||
root *digraph.DAGRunRef // Optional reference to the root DAG
|
||||
baseDir string // Base directory for all DAGs
|
||||
prefix string // Sanitized prefix for directory names
|
||||
dagRunsDir string // Path to the dag-runs directory
|
||||
globPattern string // Pattern for finding run directories
|
||||
}
|
||||
|
||||
// NewDataRoot creates a new DataRoot instance for managing a DAG's run history.
|
||||
@ -1,4 +1,4 @@
|
||||
package localdagrun
|
||||
package filedagrun
|
||||
|
||||
import (
|
||||
"context"
|
||||
@ -178,16 +178,16 @@ func TestDataRootRemoveOld(t *testing.T) {
|
||||
createAttemptWithStatus(dagRun2, ts2)
|
||||
|
||||
// Verify dag-runs exist
|
||||
assert.True(t, fileutil.FileExists(dagRun1.DAGRun.baseDir), "dag-run 1 should exist before cleanup")
|
||||
assert.True(t, fileutil.FileExists(dagRun2.DAGRun.baseDir), "dag-run 2 should exist before cleanup")
|
||||
assert.True(t, fileutil.FileExists(dagRun1.baseDir), "dag-run 1 should exist before cleanup")
|
||||
assert.True(t, fileutil.FileExists(dagRun2.baseDir), "dag-run 2 should exist before cleanup")
|
||||
|
||||
// Remove all dag-runs (retention = 0)
|
||||
err := root.RemoveOld(root.Context, 0)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Verify all dag-runs are removed
|
||||
assert.False(t, fileutil.FileExists(dagRun1.DAGRun.baseDir), "dag-run 1 should be removed")
|
||||
assert.False(t, fileutil.FileExists(dagRun2.DAGRun.baseDir), "dag-run 2 should be removed")
|
||||
assert.False(t, fileutil.FileExists(dagRun1.baseDir), "dag-run 1 should be removed")
|
||||
assert.False(t, fileutil.FileExists(dagRun2.baseDir), "dag-run 2 should be removed")
|
||||
})
|
||||
|
||||
t.Run("KeepRecentWhenRetentionIsPositive", func(t *testing.T) {
|
||||
@ -224,16 +224,16 @@ func TestDataRootRemoveOld(t *testing.T) {
|
||||
createAttemptWithStatus(dagRun2, recentTime)
|
||||
|
||||
// Verify dag-runs exist
|
||||
assert.True(t, fileutil.FileExists(dagRun1.DAGRun.baseDir), "Old dag-run should exist before cleanup")
|
||||
assert.True(t, fileutil.FileExists(dagRun2.DAGRun.baseDir), "Recent dag-run should exist before cleanup")
|
||||
assert.True(t, fileutil.FileExists(dagRun1.baseDir), "Old dag-run should exist before cleanup")
|
||||
assert.True(t, fileutil.FileExists(dagRun2.baseDir), "Recent dag-run should exist before cleanup")
|
||||
|
||||
// Remove dag-runs older than 7 days (should remove old but keep recent)
|
||||
err := root.RemoveOld(root.Context, 7)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Verify old dag-run is removed but recent one is kept
|
||||
assert.False(t, fileutil.FileExists(dagRun1.DAGRun.baseDir), "Old dag-run should be removed")
|
||||
assert.True(t, fileutil.FileExists(dagRun2.DAGRun.baseDir), "Recent dag-run should be kept")
|
||||
assert.False(t, fileutil.FileExists(dagRun1.baseDir), "Old dag-run should be removed")
|
||||
assert.True(t, fileutil.FileExists(dagRun2.baseDir), "Recent dag-run should be kept")
|
||||
})
|
||||
|
||||
t.Run("RemoveEmptyDirectories", func(t *testing.T) {
|
||||
@ -270,16 +270,16 @@ func TestDataRootRemoveOld(t *testing.T) {
|
||||
createAttemptWithStatus(dagRun2, date2)
|
||||
|
||||
// Verify directory structure exists
|
||||
assert.True(t, fileutil.FileExists(dagRun1.DAGRun.baseDir), "dag-run 1 should exist")
|
||||
assert.True(t, fileutil.FileExists(dagRun2.DAGRun.baseDir), "dag-run 2 should exist")
|
||||
assert.True(t, fileutil.FileExists(dagRun1.baseDir), "dag-run 1 should exist")
|
||||
assert.True(t, fileutil.FileExists(dagRun2.baseDir), "dag-run 2 should exist")
|
||||
|
||||
// Remove all old dag-runs (retention = 0)
|
||||
err := root.RemoveOld(root.Context, 0)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Verify dag-runs are removed
|
||||
assert.False(t, fileutil.FileExists(dagRun1.DAGRun.baseDir), "dag-run 1 should be removed")
|
||||
assert.False(t, fileutil.FileExists(dagRun2.DAGRun.baseDir), "dag-run 2 should be removed")
|
||||
assert.False(t, fileutil.FileExists(dagRun1.baseDir), "dag-run 1 should be removed")
|
||||
assert.False(t, fileutil.FileExists(dagRun2.baseDir), "dag-run 2 should be removed")
|
||||
|
||||
// Verify that the cleanup also removes empty directories
|
||||
// The method should clean up empty year/month/day directories
|
||||
@ -1,4 +1,4 @@
|
||||
package localdagrun
|
||||
package filedagrun
|
||||
|
||||
import (
|
||||
"context"
|
||||
@ -1,4 +1,4 @@
|
||||
package localdagrun
|
||||
package filedagrun
|
||||
|
||||
import (
|
||||
"context"
|
||||
@ -379,7 +379,7 @@ func (store *Store) FindAttempt(ctx context.Context, ref digraph.DAGRunRef) (mod
|
||||
// Check for context cancellation
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, fmt.Errorf("Find canceled: %w", ctx.Err())
|
||||
return nil, fmt.Errorf("find canceled: %w", ctx.Err())
|
||||
default:
|
||||
// Continue with operation
|
||||
}
|
||||
@ -1,4 +1,4 @@
|
||||
package localdagrun
|
||||
package filedagrun
|
||||
|
||||
import (
|
||||
"context"
|
||||
@ -1,4 +1,4 @@
|
||||
package localdagrun
|
||||
package filedagrun
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
@ -1,4 +1,4 @@
|
||||
package localdagrun
|
||||
package filedagrun
|
||||
|
||||
import (
|
||||
"context"
|
||||
@ -1,4 +1,4 @@
|
||||
package localproc
|
||||
package fileproc
|
||||
|
||||
import (
|
||||
"context"
|
||||
@ -47,7 +47,7 @@ func NewProcHandler(file string, meta models.ProcMeta) *ProcHandle {
|
||||
}
|
||||
|
||||
// Stop implements models.Proc.
|
||||
func (p *ProcHandle) Stop(ctx context.Context) error {
|
||||
func (p *ProcHandle) Stop(_ context.Context) error {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
if !p.started.Load() {
|
||||
@ -69,11 +69,11 @@ func (p *ProcHandle) startHeartbeat(ctx context.Context) error {
|
||||
|
||||
// Ensure the directory exists
|
||||
dir := filepath.Dir(p.fileName)
|
||||
if err := os.MkdirAll(dir, 0755); err != nil {
|
||||
if err := os.MkdirAll(dir, 0750); err != nil {
|
||||
return fmt.Errorf("failed to create directory: %w", err)
|
||||
}
|
||||
|
||||
fd, err := os.OpenFile(p.fileName, os.O_CREATE|os.O_EXCL|os.O_RDWR, 0644)
|
||||
fd, err := os.OpenFile(p.fileName, os.O_CREATE|os.O_EXCL|os.O_RDWR, 0600)
|
||||
if err != nil {
|
||||
p.started.Store(false)
|
||||
return err
|
||||
@ -139,7 +139,7 @@ func (p *ProcHandle) startHeartbeat(ctx context.Context) error {
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
binary.BigEndian.PutUint64(buf, uint64(time.Now().Unix()))
|
||||
binary.BigEndian.PutUint64(buf, uint64(time.Now().Unix())) // nolint:gosec
|
||||
if _, err := fd.WriteAt(buf, 0); err != nil {
|
||||
logger.Error(ctx, "Failed to write heartbeat", "err", err)
|
||||
}
|
||||
@ -1,4 +1,4 @@
|
||||
package localproc
|
||||
package fileproc
|
||||
|
||||
import (
|
||||
"context"
|
||||
@ -1,4 +1,4 @@
|
||||
package localproc
|
||||
package fileproc
|
||||
|
||||
import (
|
||||
"context"
|
||||
@ -40,7 +40,7 @@ func NewProcGroup(baseDir, name string, staleTime time.Duration) *ProcGroup {
|
||||
}
|
||||
|
||||
// Count retrieves the count of alive proc files for the specified DAG name.
|
||||
func (pg *ProcGroup) Count(ctx context.Context, name string) (int, error) {
|
||||
func (pg *ProcGroup) Count(ctx context.Context) (int, error) {
|
||||
pg.mu.Lock()
|
||||
defer pg.mu.Unlock()
|
||||
|
||||
@ -88,7 +88,7 @@ func (pg *ProcGroup) isStale(ctx context.Context, file string) bool {
|
||||
}
|
||||
|
||||
// Check if the file is stale by checking its content (timestamp).
|
||||
data, err := os.ReadFile(file)
|
||||
data, err := os.ReadFile(file) // nolint:gosec
|
||||
if err != nil {
|
||||
logger.Warn(ctx, "failed to read file %s: %v", file, err)
|
||||
return false
|
||||
@ -103,7 +103,7 @@ func (pg *ProcGroup) isStale(ctx context.Context, file string) bool {
|
||||
}
|
||||
|
||||
// Parse the timestamp from the file
|
||||
unixTime = int64(binary.BigEndian.Uint64(data[:8]))
|
||||
unixTime = int64(binary.BigEndian.Uint64(data[:8])) // nolint:gosec
|
||||
parsedTime := time.Unix
|
||||
duration := time.Since(time.Unix(unixTime, 0))
|
||||
if duration < pg.staleTime {
|
||||
@ -116,7 +116,7 @@ func (pg *ProcGroup) isStale(ctx context.Context, file string) bool {
|
||||
|
||||
// GetProc retrieves a proc file for the specified dag-run reference.
|
||||
// It returns a new Proc instance with the generated file name.
|
||||
func (pg *ProcGroup) Acquire(ctx context.Context, dagRun digraph.DAGRunRef) (*ProcHandle, error) {
|
||||
func (pg *ProcGroup) Acquire(_ context.Context, dagRun digraph.DAGRunRef) (*ProcHandle, error) {
|
||||
// Sanity check the dag-run reference
|
||||
if pg.name != dagRun.Name {
|
||||
return nil, fmt.Errorf("DAG name %s does not match proc file name %s", dagRun.Name, pg.name)
|
||||
@ -1,4 +1,4 @@
|
||||
package localproc
|
||||
package fileproc
|
||||
|
||||
import (
|
||||
"context"
|
||||
@ -43,7 +43,7 @@ func TestProcGroup(t *testing.T) {
|
||||
}()
|
||||
|
||||
// Check if the count is 1
|
||||
count, err := procFiles.Count(ctx, name)
|
||||
count, err := procFiles.Count(ctx)
|
||||
require.NoError(t, err, "failed to count proc files")
|
||||
require.Equal(t, 1, count, "expected 1 proc file")
|
||||
|
||||
@ -51,7 +51,7 @@ func TestProcGroup(t *testing.T) {
|
||||
<-done
|
||||
|
||||
// Check if the count is 0
|
||||
count, err = procFiles.Count(ctx, name)
|
||||
count, err = procFiles.Count(ctx)
|
||||
require.NoError(t, err, "failed to count proc files")
|
||||
require.Equal(t, 0, count, "expected 0 proc files")
|
||||
}
|
||||
@ -66,7 +66,7 @@ func TestProcGroup_Empty(t *testing.T) {
|
||||
procFiles := NewProcGroup(baseDir, name, time.Hour)
|
||||
|
||||
// Check if the count is 0
|
||||
count, err := procFiles.Count(ctx, name)
|
||||
count, err := procFiles.Count(ctx)
|
||||
require.NoError(t, err, "failed to count proc files")
|
||||
require.Equal(t, 0, count, "expected 0 proc files")
|
||||
}
|
||||
@ -88,7 +88,7 @@ func TestProcGroup_IsStale(t *testing.T) {
|
||||
require.NoError(t, err, "failed to get proc")
|
||||
|
||||
// Make sure the directory exists
|
||||
err = os.MkdirAll(filepath.Dir(proc.fileName), 0755)
|
||||
err = os.MkdirAll(filepath.Dir(proc.fileName), 0750)
|
||||
require.NoError(t, err, "failed to create proc directory")
|
||||
|
||||
// Create the proc file
|
||||
@ -99,6 +99,7 @@ func TestProcGroup_IsStale(t *testing.T) {
|
||||
buf := make([]byte, 8)
|
||||
binary.BigEndian.PutUint64(buf, uint64(time.Now().Add(-pg.staleTime).Unix()))
|
||||
_, err = fd.WriteAt(buf, 0)
|
||||
require.NoError(t, err, "failed to write timestamp to proc file")
|
||||
|
||||
// Close the file
|
||||
_ = fd.Sync()
|
||||
@ -106,7 +107,7 @@ func TestProcGroup_IsStale(t *testing.T) {
|
||||
|
||||
// Check the count of alive proc files is still 1 because the file is not stale yet
|
||||
// due to the modification time
|
||||
count, err := pg.Count(ctx, name)
|
||||
count, err := pg.Count(ctx)
|
||||
require.NoError(t, err, "failed to count proc files")
|
||||
require.Equal(t, 1, count, "expected 1 proc file")
|
||||
|
||||
@ -115,7 +116,7 @@ func TestProcGroup_IsStale(t *testing.T) {
|
||||
require.NoError(t, err, "failed to update file times")
|
||||
|
||||
// Check the count of alive proc files is 0 because the file is stale
|
||||
count, err = pg.Count(ctx, name)
|
||||
count, err = pg.Count(ctx)
|
||||
require.NoError(t, err, "failed to count proc files")
|
||||
require.Equal(t, 0, count, "expected 0 proc files")
|
||||
}
|
||||
@ -1,4 +1,4 @@
|
||||
package localproc
|
||||
package fileproc
|
||||
|
||||
import (
|
||||
"context"
|
||||
@ -34,7 +34,7 @@ func (s *Store) CountAlive(ctx context.Context, name string) (int, error) {
|
||||
s.procGroups.Store(name, NewProcGroup(pgBaseDir, name, s.staleTime))
|
||||
}
|
||||
pg, _ := s.procGroups.Load(name)
|
||||
return pg.(*ProcGroup).Count(ctx, name)
|
||||
return pg.(*ProcGroup).Count(ctx)
|
||||
}
|
||||
|
||||
// Acquire implements models.ProcStore.
|
||||
@ -1,4 +1,4 @@
|
||||
package localproc
|
||||
package fileproc
|
||||
|
||||
import (
|
||||
"context"
|
||||
@ -1,4 +1,4 @@
|
||||
package localqueue
|
||||
package filequeue
|
||||
|
||||
import (
|
||||
"path/filepath"
|
||||
@ -31,5 +31,5 @@ func (j *Job) ID() string {
|
||||
|
||||
// Data implements models.QueuedItem.
|
||||
func (j *Job) Data() digraph.DAGRunRef {
|
||||
return j.ItemData.DAGRun
|
||||
return j.DAGRun
|
||||
}
|
||||
@ -1,4 +1,4 @@
|
||||
package localqueue
|
||||
package filequeue
|
||||
|
||||
import (
|
||||
"testing"
|
||||
@ -1,4 +1,4 @@
|
||||
package localqueue
|
||||
package filequeue
|
||||
|
||||
import (
|
||||
"context"
|
||||
@ -1,4 +1,4 @@
|
||||
package localqueue
|
||||
package filequeue
|
||||
|
||||
import (
|
||||
"path/filepath"
|
||||
@ -28,12 +28,14 @@ func TestQueue(t *testing.T) {
|
||||
Name: "test-name",
|
||||
ID: "low-priority-dag-run",
|
||||
})
|
||||
require.NoError(t, err, "expected no error when adding job to queue")
|
||||
|
||||
// Add a high priority job to the queue
|
||||
err = queue.Enqueue(th.Context, models.QueuePriorityHigh, digraph.DAGRunRef{
|
||||
Name: "test-name",
|
||||
ID: "high-priority-dag-run",
|
||||
})
|
||||
require.NoError(t, err, "expected no error when adding job to queue")
|
||||
|
||||
// Check if the queue length is 2
|
||||
queueLen, err = queue.Len(th.Context)
|
||||
@ -1,4 +1,4 @@
|
||||
package localqueue
|
||||
package filequeue
|
||||
|
||||
import (
|
||||
"context"
|
||||
@ -61,7 +61,7 @@ type ItemData struct {
|
||||
|
||||
// Push adds a job to the queue
|
||||
// Since it's a prototype, it just create a json file with the job ID and dag-run reference
|
||||
func (q *QueueFile) Push(ctx context.Context, dagRun digraph.DAGRunRef) error {
|
||||
func (q *QueueFile) Push(_ context.Context, dagRun digraph.DAGRunRef) error {
|
||||
q.mu.Lock()
|
||||
defer q.mu.Unlock()
|
||||
|
||||
@ -74,7 +74,7 @@ func (q *QueueFile) Push(ctx context.Context, dagRun digraph.DAGRunRef) error {
|
||||
fullPath := filepath.Join(q.baseDir, fileName)
|
||||
|
||||
// Create the directory if it doesn't exist
|
||||
if err := os.MkdirAll(q.baseDir, os.ModePerm); err != nil {
|
||||
if err := os.MkdirAll(q.baseDir, 0750); err != nil { // nolint: gosec
|
||||
return fmt.Errorf("failed to create directory %s: %w", q.baseDir, err)
|
||||
}
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
package localqueue
|
||||
package filequeue
|
||||
|
||||
import (
|
||||
"path/filepath"
|
||||
@ -53,7 +53,7 @@ func TestQueueFile(t *testing.T) {
|
||||
require.Regexp(t, "^item_high_", job.FileName, "expected job file name to start with 'item_priority_'")
|
||||
|
||||
// Check if the queue is empty again
|
||||
queueLen, err = qf.Len(th.Context)
|
||||
_, err = qf.Len(th.Context)
|
||||
require.NoError(t, err, "expected no error when getting queue length")
|
||||
|
||||
// Check if pop returns an error when the queue is empty
|
||||
@ -1,4 +1,4 @@
|
||||
package localqueue
|
||||
package filequeue
|
||||
|
||||
import (
|
||||
"context"
|
||||
@ -75,7 +75,7 @@ func (q *queueReaderImpl) Start(ctx context.Context, ch chan<- models.QueuedItem
|
||||
baseDir := q.store.BaseDir()
|
||||
if err := q.setupWatcher(ctx, watcher, baseDir); err != nil {
|
||||
logger.Warn(ctx, "Failed to setup file watcher, falling back to polling only", "err", err)
|
||||
watcher.Close()
|
||||
_ = watcher.Close()
|
||||
watcher = nil
|
||||
}
|
||||
}
|
||||
@ -89,7 +89,7 @@ func (q *queueReaderImpl) Start(ctx context.Context, ch chan<- models.QueuedItem
|
||||
if err != nil {
|
||||
q.running.Store(false)
|
||||
if watcher != nil {
|
||||
watcher.Close()
|
||||
_ = watcher.Close()
|
||||
}
|
||||
return fmt.Errorf("failed to read initial items: %w", err)
|
||||
}
|
||||
@ -357,7 +357,7 @@ func (q *queueReaderImpl) setupWatcher(ctx context.Context, watcher filenotify.F
|
||||
}
|
||||
|
||||
// Create base directory if it doesn't exist
|
||||
if err := os.MkdirAll(baseDir, 0755); err != nil {
|
||||
if err := os.MkdirAll(baseDir, 0750); err != nil {
|
||||
return fmt.Errorf("failed to create base directory %s: %w", baseDir, err)
|
||||
}
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
package localqueue
|
||||
package filequeue
|
||||
|
||||
import (
|
||||
"context"
|
||||
@ -1,4 +1,4 @@
|
||||
package localqueue
|
||||
package filequeue
|
||||
|
||||
import (
|
||||
"context"
|
||||
@ -7,7 +7,6 @@ import (
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/dagu-org/dagu/internal/digraph"
|
||||
"github.com/dagu-org/dagu/internal/logger"
|
||||
@ -25,10 +24,6 @@ type Store struct {
|
||||
// queues is a map of queues, where the key is the queue name (DAG name)
|
||||
queues map[string]*DualQueue
|
||||
mu sync.Mutex
|
||||
|
||||
// cache for the last fetched items
|
||||
lastFetched time.Time
|
||||
cache []models.QueuedItemData
|
||||
}
|
||||
|
||||
// All implements models.QueueStore.
|
||||
@ -161,7 +156,7 @@ func (s *Store) Enqueue(ctx context.Context, name string, p models.QueuePriority
|
||||
}
|
||||
|
||||
// Reader implements models.QueueStore.
|
||||
func (s *Store) Reader(ctx context.Context) models.QueueReader {
|
||||
func (s *Store) Reader(_ context.Context) models.QueueReader {
|
||||
return newQueueReader(s)
|
||||
}
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
package localqueue
|
||||
package filequeue
|
||||
|
||||
import (
|
||||
"testing"
|
||||
@ -73,6 +73,7 @@ func TestStore_DequeueByDAGRunID(t *testing.T) {
|
||||
Name: "test-name",
|
||||
ID: "test-dag-2",
|
||||
})
|
||||
require.NoError(t, err, "expected no error when adding job to store")
|
||||
|
||||
// Check if dequeue by dag-run ID returns the job
|
||||
jobs, err := store.DequeueByDAGRunID(th.Context, "test-name", "test-dag-2")
|
||||
28
internal/persistence/legacy/interface.go
Normal file
28
internal/persistence/legacy/interface.go
Normal file
@ -0,0 +1,28 @@
|
||||
package legacy
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/dagu-org/dagu/internal/persistence/legacy/model"
|
||||
)
|
||||
|
||||
var (
|
||||
ErrRequestIDNotFound = fmt.Errorf("request id not found")
|
||||
ErrNoStatusDataToday = fmt.Errorf("no status data today")
|
||||
ErrNoStatusData = fmt.Errorf("no status data")
|
||||
)
|
||||
|
||||
type HistoryStore interface {
|
||||
Open(ctx context.Context, key string, timestamp time.Time, requestID string) error
|
||||
Write(ctx context.Context, status model.Status) error
|
||||
Close(ctx context.Context) error
|
||||
Update(ctx context.Context, key, requestID string, status model.Status) error
|
||||
ReadStatusRecent(ctx context.Context, key string, itemLimit int) []model.StatusFile
|
||||
ReadStatusToday(ctx context.Context, key string) (*model.Status, error)
|
||||
FindByRequestID(ctx context.Context, key string, requestID string) (*model.StatusFile, error)
|
||||
RemoveAll(ctx context.Context, key string) error
|
||||
RemoveOld(ctx context.Context, key string, retentionDays int) error
|
||||
Rename(ctx context.Context, oldKey, newKey string) error
|
||||
}
|
||||
488
internal/persistence/legacy/jsondb/jsondb.go
Normal file
488
internal/persistence/legacy/jsondb/jsondb.go
Normal file
@ -0,0 +1,488 @@
|
||||
package jsondb
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
|
||||
// nolint: gosec
|
||||
"crypto/md5"
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"regexp"
|
||||
"sort"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/dagu-org/dagu/internal/fileutil"
|
||||
"github.com/dagu-org/dagu/internal/logger"
|
||||
"github.com/dagu-org/dagu/internal/persistence/legacy"
|
||||
"github.com/dagu-org/dagu/internal/persistence/legacy/model"
|
||||
"github.com/dagu-org/dagu/internal/stringutil"
|
||||
)
|
||||
|
||||
var (
|
||||
errRequestIDNotFound = errors.New("request ID not found")
|
||||
errCreateNewDirectory = errors.New("failed to create new directory")
|
||||
errKeyEmpty = errors.New("dagFile is empty")
|
||||
|
||||
// rTimestamp is a regular expression to match the timestamp in the file name.
|
||||
rTimestamp = regexp.MustCompile(`2\d{7}\.\d{2}:\d{2}:\d{2}\.\d{3}|2\d{7}\.\d{2}:\d{2}:\d{2}\.\d{3}Z`)
|
||||
)
|
||||
|
||||
type Config struct {
|
||||
Location string
|
||||
LatestStatusToday bool
|
||||
}
|
||||
|
||||
const (
|
||||
requestIDLenSafe = 8
|
||||
extDat = ".dat"
|
||||
dateTimeFormatUTC = "20060102.15:04:05.000Z"
|
||||
dateTimeFormat = "20060102.15:04:05.000"
|
||||
dateFormat = "20060102"
|
||||
)
|
||||
|
||||
var _ legacy.HistoryStore = (*JSONDB)(nil)
|
||||
|
||||
// JSONDB manages DAGs status files in local storage.
|
||||
type JSONDB struct {
|
||||
baseDir string
|
||||
latestStatusToday bool
|
||||
writer *writer
|
||||
}
|
||||
|
||||
type Option func(*Options)
|
||||
|
||||
type Options struct {
|
||||
LatestStatusToday bool
|
||||
}
|
||||
|
||||
func WithLatestStatusToday(latestStatusToday bool) Option {
|
||||
return func(o *Options) {
|
||||
o.LatestStatusToday = latestStatusToday
|
||||
}
|
||||
}
|
||||
|
||||
// New creates a new JSONDB instance.
|
||||
func New(baseDir string, opts ...Option) *JSONDB {
|
||||
options := &Options{
|
||||
LatestStatusToday: true,
|
||||
}
|
||||
for _, opt := range opts {
|
||||
opt(options)
|
||||
}
|
||||
return &JSONDB{
|
||||
baseDir: baseDir,
|
||||
latestStatusToday: options.LatestStatusToday,
|
||||
}
|
||||
}
|
||||
|
||||
func (db *JSONDB) Update(ctx context.Context, key, requestID string, status model.Status) error {
|
||||
statusFile, err := db.FindByRequestID(ctx, key, requestID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
writer := newWriter(statusFile.File)
|
||||
if err := writer.open(); err != nil {
|
||||
return err
|
||||
}
|
||||
defer func() {
|
||||
_ = writer.close()
|
||||
}()
|
||||
|
||||
return writer.write(status)
|
||||
}
|
||||
|
||||
func (db *JSONDB) Open(ctx context.Context, key string, timestamp time.Time, requestID string) error {
|
||||
filePath, err := db.generateFilePath(key, newUTC(timestamp), requestID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
logger.Infof(ctx, "Initializing status file: %s", filePath)
|
||||
|
||||
writer := newWriter(filePath)
|
||||
if err := writer.open(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
db.writer = writer
|
||||
return nil
|
||||
}
|
||||
|
||||
func (db *JSONDB) Write(_ context.Context, status model.Status) error {
|
||||
return db.writer.write(status)
|
||||
}
|
||||
|
||||
func (db *JSONDB) Close(ctx context.Context) error {
|
||||
if db.writer == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
defer func() {
|
||||
_ = db.writer.close()
|
||||
db.writer = nil
|
||||
}()
|
||||
|
||||
if err := db.Compact(ctx, db.writer.target); err != nil {
|
||||
return err
|
||||
}
|
||||
return db.writer.close()
|
||||
}
|
||||
|
||||
func (db *JSONDB) ReadStatusRecent(_ context.Context, key string, itemLimit int) []model.StatusFile {
|
||||
var ret []model.StatusFile
|
||||
|
||||
files := db.getLatestMatches(db.globPattern(key), itemLimit)
|
||||
for _, file := range files {
|
||||
status, err := db.parseStatusFile(file)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
ret = append(ret, model.StatusFile{
|
||||
File: file,
|
||||
Status: *status,
|
||||
})
|
||||
}
|
||||
|
||||
return ret
|
||||
}
|
||||
|
||||
func (db *JSONDB) ReadStatusToday(_ context.Context, key string) (*model.Status, error) {
|
||||
file, err := db.latestToday(key, time.Now(), db.latestStatusToday)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return db.parseStatusFile(file)
|
||||
}
|
||||
|
||||
func (db *JSONDB) FindByRequestID(_ context.Context, key string, requestID string) (*model.StatusFile, error) {
|
||||
if requestID == "" {
|
||||
return nil, errRequestIDNotFound
|
||||
}
|
||||
|
||||
matches, err := filepath.Glob(db.globPattern(key))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
sort.Sort(sort.Reverse(sort.StringSlice(matches)))
|
||||
for _, match := range matches {
|
||||
status, err := ParseStatusFile(match)
|
||||
if err != nil {
|
||||
log.Printf("parsing failed %s : %s", match, err)
|
||||
continue
|
||||
}
|
||||
if status != nil && status.RequestID == requestID {
|
||||
return &model.StatusFile{
|
||||
File: match,
|
||||
Status: *status,
|
||||
}, nil
|
||||
}
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("%w : %s", legacy.ErrRequestIDNotFound, requestID)
|
||||
}
|
||||
|
||||
func (db *JSONDB) RemoveAll(ctx context.Context, key string) error {
|
||||
return db.RemoveOld(ctx, key, 0)
|
||||
}
|
||||
|
||||
func (db *JSONDB) RemoveOld(_ context.Context, key string, retentionDays int) error {
|
||||
if retentionDays < 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
matches, err := filepath.Glob(db.globPattern(key))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
oldDate := time.Now().AddDate(0, 0, -retentionDays)
|
||||
var lastErr error
|
||||
for _, m := range matches {
|
||||
info, err := os.Stat(m)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
if info.ModTime().Before(oldDate) {
|
||||
if err := os.Remove(m); err != nil {
|
||||
lastErr = err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return lastErr
|
||||
}
|
||||
|
||||
func (db *JSONDB) Compact(_ context.Context, targetFilePath string) error {
|
||||
status, err := ParseStatusFile(targetFilePath)
|
||||
if err == io.EOF {
|
||||
return nil
|
||||
}
|
||||
if err != nil {
|
||||
return fmt.Errorf("%w: %s", err, targetFilePath)
|
||||
}
|
||||
|
||||
newFile := fmt.Sprintf("%s_c.dat", strings.TrimSuffix(filepath.Base(targetFilePath), filepath.Ext(targetFilePath)))
|
||||
tempFilePath := filepath.Join(filepath.Dir(targetFilePath), newFile)
|
||||
writer := newWriter(tempFilePath)
|
||||
if err := writer.open(); err != nil {
|
||||
return err
|
||||
}
|
||||
defer func() {
|
||||
_ = writer.close()
|
||||
}()
|
||||
|
||||
if err := writer.write(*status); err != nil {
|
||||
if removeErr := os.Remove(tempFilePath); removeErr != nil {
|
||||
return fmt.Errorf("%w: %s", err, removeErr)
|
||||
}
|
||||
return fmt.Errorf("%w: %s", err, tempFilePath)
|
||||
}
|
||||
|
||||
// remove the original file
|
||||
if err := os.Remove(targetFilePath); err != nil {
|
||||
return fmt.Errorf("%w: %s", err, targetFilePath)
|
||||
}
|
||||
|
||||
// rename the file to the original
|
||||
if err := os.Rename(tempFilePath, targetFilePath); err != nil {
|
||||
return fmt.Errorf("%w: %s", err, targetFilePath)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (db *JSONDB) Rename(_ context.Context, oldKey, newKey string) error {
|
||||
if !filepath.IsAbs(oldKey) || !filepath.IsAbs(newKey) {
|
||||
return fmt.Errorf("invalid path: %s -> %s", oldKey, newKey)
|
||||
}
|
||||
|
||||
oldDir := db.getDirectory(oldKey, getPrefix(oldKey))
|
||||
if !db.exists(oldDir) {
|
||||
return nil
|
||||
}
|
||||
|
||||
newDir := db.getDirectory(newKey, getPrefix(newKey))
|
||||
if !db.exists(newDir) {
|
||||
if err := os.MkdirAll(newDir, 0750); err != nil {
|
||||
return fmt.Errorf("%w: %s : %s", errCreateNewDirectory, newDir, err)
|
||||
}
|
||||
}
|
||||
|
||||
matches, err := filepath.Glob(db.globPattern(oldKey))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
oldPrefix := filepath.Base(db.createPrefix(oldKey))
|
||||
newPrefix := filepath.Base(db.createPrefix(newKey))
|
||||
for _, m := range matches {
|
||||
base := filepath.Base(m)
|
||||
f := strings.Replace(base, oldPrefix, newPrefix, 1)
|
||||
if err := os.Rename(m, filepath.Join(newDir, f)); err != nil {
|
||||
log.Printf("failed to rename %s to %s: %s", m, f, err)
|
||||
}
|
||||
}
|
||||
if files, _ := os.ReadDir(oldDir); len(files) == 0 {
|
||||
_ = os.Remove(oldDir)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (db *JSONDB) parseStatusFile(file string) (*model.Status, error) {
|
||||
return ParseStatusFile(file)
|
||||
}
|
||||
|
||||
func (db *JSONDB) getDirectory(key string, prefix string) string {
|
||||
if key != prefix {
|
||||
// Add a hash postfix to the directory name to avoid conflicts.
|
||||
// nolint: gosec
|
||||
h := md5.New()
|
||||
_, _ = h.Write([]byte(key))
|
||||
v := hex.EncodeToString(h.Sum(nil))
|
||||
return filepath.Join(db.baseDir, fmt.Sprintf("%s-%s", prefix, v))
|
||||
}
|
||||
|
||||
return filepath.Join(db.baseDir, key)
|
||||
}
|
||||
|
||||
func (db *JSONDB) generateFilePath(key string, timestamp timeInUTC, requestID string) (string, error) {
|
||||
if key == "" {
|
||||
return "", errKeyEmpty
|
||||
}
|
||||
prefix := db.createPrefix(key)
|
||||
timestampString := timestamp.Format(dateTimeFormatUTC)
|
||||
requestID = stringutil.TruncString(requestID, requestIDLenSafe)
|
||||
return fmt.Sprintf("%s.%s.%s.dat", prefix, timestampString, requestID), nil
|
||||
}
|
||||
|
||||
func (db *JSONDB) latestToday(key string, day time.Time, latestStatusToday bool) (string, error) {
|
||||
prefix := db.createPrefix(key)
|
||||
pattern := fmt.Sprintf("%s.*.*.dat", prefix)
|
||||
|
||||
matches, err := filepath.Glob(pattern)
|
||||
if err != nil || len(matches) == 0 {
|
||||
return "", legacy.ErrNoStatusDataToday
|
||||
}
|
||||
|
||||
ret := filterLatest(matches, 1)
|
||||
if len(ret) == 0 {
|
||||
return "", legacy.ErrNoStatusData
|
||||
}
|
||||
|
||||
startOfDay := day.Truncate(24 * time.Hour)
|
||||
startOfDayInUTC := newUTC(startOfDay)
|
||||
if latestStatusToday {
|
||||
timestamp, err := findTimestamp(ret[0])
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
if timestamp.Before(startOfDayInUTC.Time) {
|
||||
return "", legacy.ErrNoStatusDataToday
|
||||
}
|
||||
}
|
||||
|
||||
return ret[0], nil
|
||||
}
|
||||
|
||||
func (s *JSONDB) getLatestMatches(pattern string, itemLimit int) []string {
|
||||
matches, err := filepath.Glob(pattern)
|
||||
if err != nil || len(matches) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
return filterLatest(matches, itemLimit)
|
||||
}
|
||||
|
||||
func (s *JSONDB) globPattern(key string) string {
|
||||
return s.createPrefix(key) + "*" + extDat
|
||||
}
|
||||
|
||||
func (s *JSONDB) createPrefix(key string) string {
|
||||
prefix := getPrefix(key)
|
||||
return filepath.Join(s.getDirectory(key, prefix), prefix)
|
||||
}
|
||||
|
||||
func (s *JSONDB) exists(filePath string) bool {
|
||||
_, err := os.Stat(filePath)
|
||||
return !os.IsNotExist(err)
|
||||
}
|
||||
|
||||
func ParseStatusFile(filePath string) (*model.Status, error) {
|
||||
f, err := os.Open(filePath) // nolint:gosec
|
||||
if err != nil {
|
||||
log.Printf("failed to open file. err: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
defer func() {
|
||||
_ = f.Close()
|
||||
}()
|
||||
|
||||
var (
|
||||
offset int64
|
||||
result *model.Status
|
||||
)
|
||||
for {
|
||||
line, err := readLineFrom(f, offset)
|
||||
if err == io.EOF {
|
||||
if result == nil {
|
||||
return nil, err
|
||||
}
|
||||
return result, nil
|
||||
} else if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
offset += int64(len(line)) + 1 // +1 for newline
|
||||
if len(line) > 0 {
|
||||
status, err := model.StatusFromJSON(string(line))
|
||||
if err == nil {
|
||||
result = status
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func filterLatest(files []string, itemLimit int) []string {
|
||||
if len(files) == 0 {
|
||||
return nil
|
||||
}
|
||||
sort.Slice(files, func(i, j int) bool {
|
||||
a, err := findTimestamp(files[i])
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
b, err := findTimestamp(files[j])
|
||||
if err != nil {
|
||||
return true
|
||||
}
|
||||
return a.After(b)
|
||||
})
|
||||
return files[:min(len(files), itemLimit)]
|
||||
}
|
||||
|
||||
func findTimestamp(file string) (time.Time, error) {
|
||||
timestampString := rTimestamp.FindString(file)
|
||||
if !strings.Contains(timestampString, "Z") {
|
||||
// For backward compatibility
|
||||
t, err := time.Parse(dateTimeFormat, timestampString)
|
||||
if err != nil {
|
||||
return time.Time{}, nil
|
||||
}
|
||||
return t, nil
|
||||
}
|
||||
|
||||
// UTC
|
||||
t, err := time.Parse(dateTimeFormatUTC, timestampString)
|
||||
if err != nil {
|
||||
return time.Time{}, nil
|
||||
}
|
||||
return t, nil
|
||||
}
|
||||
|
||||
func readLineFrom(f *os.File, offset int64) ([]byte, error) {
|
||||
if _, err := f.Seek(offset, io.SeekStart); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
reader := bufio.NewReader(f)
|
||||
var ret []byte
|
||||
for {
|
||||
line, isPrefix, err := reader.ReadLine()
|
||||
if err != nil {
|
||||
return ret, err
|
||||
}
|
||||
ret = append(ret, line...)
|
||||
if !isPrefix {
|
||||
break
|
||||
}
|
||||
}
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
func getPrefix(key string) string {
|
||||
ext := filepath.Ext(key)
|
||||
if ext == "" {
|
||||
// No extension
|
||||
return filepath.Base(key)
|
||||
}
|
||||
if fileutil.IsYAMLFile(key) {
|
||||
// Remove .yaml or .yml extension
|
||||
return strings.TrimSuffix(filepath.Base(key), ext)
|
||||
}
|
||||
// Use the base name (if it's a path or just a name)
|
||||
return filepath.Base(key)
|
||||
}
|
||||
|
||||
// timeInUTC is a wrapper for time.Time that ensures the time is in UTC.
|
||||
type timeInUTC struct{ time.Time }
|
||||
|
||||
func newUTC(t time.Time) timeInUTC {
|
||||
return timeInUTC{t.UTC()}
|
||||
}
|
||||
113
internal/persistence/legacy/jsondb/writer.go
Normal file
113
internal/persistence/legacy/jsondb/writer.go
Normal file
@ -0,0 +1,113 @@
|
||||
package jsondb
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
|
||||
"github.com/dagu-org/dagu/internal/fileutil"
|
||||
"github.com/dagu-org/dagu/internal/persistence/legacy/model"
|
||||
)
|
||||
|
||||
var (
|
||||
ErrWriterClosed = errors.New("writer is closed")
|
||||
ErrWriterNotOpen = errors.New("writer is not open")
|
||||
)
|
||||
|
||||
// writer manages writing status to a local file.
|
||||
type writer struct {
|
||||
target string
|
||||
writer *bufio.Writer
|
||||
file *os.File
|
||||
mu sync.Mutex
|
||||
closed bool
|
||||
}
|
||||
|
||||
func newWriter(target string) *writer {
|
||||
return &writer{target: target}
|
||||
}
|
||||
|
||||
// open opens the writer.
|
||||
func (w *writer) open() error {
|
||||
w.mu.Lock()
|
||||
defer w.mu.Unlock()
|
||||
|
||||
if w.closed {
|
||||
return ErrWriterClosed
|
||||
}
|
||||
|
||||
if err := os.MkdirAll(filepath.Dir(w.target), 0750); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
file, err := fileutil.OpenOrCreateFile(w.target)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
w.file = file
|
||||
w.writer = bufio.NewWriter(file)
|
||||
return nil
|
||||
}
|
||||
|
||||
// write appends the status to the local file.
|
||||
func (w *writer) write(st model.Status) error {
|
||||
w.mu.Lock()
|
||||
defer w.mu.Unlock()
|
||||
|
||||
if w.closed {
|
||||
return ErrWriterClosed
|
||||
}
|
||||
|
||||
if w.writer == nil {
|
||||
return ErrWriterNotOpen
|
||||
}
|
||||
|
||||
jsonb, err := json.Marshal(st)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if _, err := w.writer.Write(jsonb); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := w.writer.WriteByte('\n'); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return w.writer.Flush()
|
||||
}
|
||||
|
||||
// close closes the writer.
|
||||
func (w *writer) close() error {
|
||||
w.mu.Lock()
|
||||
defer w.mu.Unlock()
|
||||
|
||||
if w.closed {
|
||||
return nil
|
||||
}
|
||||
|
||||
var err error
|
||||
if w.writer != nil {
|
||||
err = w.writer.Flush()
|
||||
}
|
||||
|
||||
if w.file != nil {
|
||||
if syncErr := w.file.Sync(); syncErr != nil && err == nil {
|
||||
err = syncErr
|
||||
}
|
||||
if closeErr := w.file.Close(); closeErr != nil && err == nil {
|
||||
err = closeErr
|
||||
}
|
||||
}
|
||||
|
||||
w.closed = true
|
||||
w.writer = nil
|
||||
w.file = nil
|
||||
|
||||
return err
|
||||
}
|
||||
96
internal/persistence/legacy/model/node.go
Normal file
96
internal/persistence/legacy/model/node.go
Normal file
@ -0,0 +1,96 @@
|
||||
package model
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/dagu-org/dagu/internal/digraph"
|
||||
"github.com/dagu-org/dagu/internal/digraph/scheduler"
|
||||
"github.com/dagu-org/dagu/internal/stringutil"
|
||||
)
|
||||
|
||||
func FromSteps(steps []digraph.Step) []*Node {
|
||||
var ret []*Node
|
||||
for _, s := range steps {
|
||||
ret = append(ret, NewNode(s))
|
||||
}
|
||||
return ret
|
||||
}
|
||||
|
||||
func FromNodes(nodes []scheduler.NodeData) []*Node {
|
||||
var ret []*Node
|
||||
for _, node := range nodes {
|
||||
ret = append(ret, FromNode(node))
|
||||
}
|
||||
return ret
|
||||
}
|
||||
|
||||
func FromNode(node scheduler.NodeData) *Node {
|
||||
return &Node{
|
||||
Step: node.Step,
|
||||
Log: node.State.Stdout,
|
||||
StartedAt: stringutil.FormatTime(node.State.StartedAt),
|
||||
FinishedAt: stringutil.FormatTime(node.State.FinishedAt),
|
||||
Status: node.State.Status,
|
||||
StatusText: node.State.Status.String(),
|
||||
RetriedAt: stringutil.FormatTime(node.State.RetriedAt),
|
||||
RetryCount: node.State.RetryCount,
|
||||
DoneCount: node.State.DoneCount,
|
||||
Error: errText(node.State.Error),
|
||||
}
|
||||
}
|
||||
|
||||
type Node struct {
|
||||
Step digraph.Step `json:"Step"`
|
||||
Log string `json:"Log"`
|
||||
StartedAt string `json:"StartedAt"`
|
||||
FinishedAt string `json:"FinishedAt"`
|
||||
Status scheduler.NodeStatus `json:"Status"`
|
||||
RetriedAt string `json:"RetriedAt,omitempty"`
|
||||
RetryCount int `json:"RetryCount,omitempty"`
|
||||
DoneCount int `json:"DoneCount,omitempty"`
|
||||
Error string `json:"Error,omitempty"`
|
||||
StatusText string `json:"StatusText"`
|
||||
}
|
||||
|
||||
func (n *Node) ToNode() *scheduler.Node {
|
||||
startedAt, _ := stringutil.ParseTime(n.StartedAt)
|
||||
finishedAt, _ := stringutil.ParseTime(n.FinishedAt)
|
||||
retriedAt, _ := stringutil.ParseTime(n.RetriedAt)
|
||||
return scheduler.NewNode(n.Step, scheduler.NodeState{
|
||||
Status: n.Status,
|
||||
Stdout: n.Log,
|
||||
StartedAt: startedAt,
|
||||
FinishedAt: finishedAt,
|
||||
RetriedAt: retriedAt,
|
||||
RetryCount: n.RetryCount,
|
||||
DoneCount: n.DoneCount,
|
||||
Error: errFromText(n.Error),
|
||||
})
|
||||
}
|
||||
|
||||
func NewNode(step digraph.Step) *Node {
|
||||
return &Node{
|
||||
Step: step,
|
||||
StartedAt: "-",
|
||||
FinishedAt: "-",
|
||||
Status: scheduler.NodeStatusNone,
|
||||
StatusText: scheduler.NodeStatusNone.String(),
|
||||
}
|
||||
}
|
||||
|
||||
var errNodeProcessing = errors.New("node processing error")
|
||||
|
||||
func errFromText(err string) error {
|
||||
if err == "" {
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("%w: %s", errNodeProcessing, err)
|
||||
}
|
||||
|
||||
func errText(err error) string {
|
||||
if err == nil {
|
||||
return ""
|
||||
}
|
||||
return err.Error()
|
||||
}
|
||||
79
internal/persistence/legacy/model/status.go
Normal file
79
internal/persistence/legacy/model/status.go
Normal file
@ -0,0 +1,79 @@
|
||||
package model
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/dagu-org/dagu/internal/digraph/scheduler"
|
||||
"github.com/dagu-org/dagu/internal/stringutil"
|
||||
)
|
||||
|
||||
func StatusFromJSON(s string) (*Status, error) {
|
||||
status := new(Status)
|
||||
err := json.Unmarshal([]byte(s), status)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return status, err
|
||||
}
|
||||
|
||||
type StatusFile struct {
|
||||
File string
|
||||
Status Status
|
||||
}
|
||||
|
||||
type StatusResponse struct {
|
||||
Status *Status `json:"status"`
|
||||
}
|
||||
|
||||
type Status struct {
|
||||
RequestID string `json:"RequestId"`
|
||||
Name string `json:"Name"`
|
||||
Status scheduler.Status `json:"Status"`
|
||||
StatusText string `json:"StatusText"`
|
||||
PID PID `json:"Pid"`
|
||||
Nodes []*Node `json:"Nodes"`
|
||||
OnExit *Node `json:"OnExit"`
|
||||
OnSuccess *Node `json:"OnSuccess"`
|
||||
OnFailure *Node `json:"OnFailure"`
|
||||
OnCancel *Node `json:"OnCancel"`
|
||||
StartedAt string `json:"StartedAt"`
|
||||
FinishedAt string `json:"FinishedAt"`
|
||||
Log string `json:"Log"`
|
||||
Params string `json:"Params,omitempty"`
|
||||
ParamsList []string `json:"ParamsList,omitempty"`
|
||||
}
|
||||
|
||||
func (st *Status) CorrectRunningStatus() {
|
||||
if st.Status == scheduler.StatusRunning {
|
||||
st.Status = scheduler.StatusError
|
||||
st.StatusText = st.Status.String()
|
||||
}
|
||||
}
|
||||
|
||||
func FormatTime(val time.Time) string {
|
||||
if val.IsZero() {
|
||||
return ""
|
||||
}
|
||||
return stringutil.FormatTime(val)
|
||||
}
|
||||
|
||||
func Time(t time.Time) *time.Time {
|
||||
return &t
|
||||
}
|
||||
|
||||
type PID int
|
||||
|
||||
const pidNotRunning PID = -1
|
||||
|
||||
func (p PID) String() string {
|
||||
if p == pidNotRunning {
|
||||
return ""
|
||||
}
|
||||
return fmt.Sprintf("%d", p)
|
||||
}
|
||||
|
||||
func (p PID) IsRunning() bool {
|
||||
return p != pidNotRunning
|
||||
}
|
||||
@ -10,10 +10,10 @@ import (
|
||||
"github.com/dagu-org/dagu/internal/dagrun"
|
||||
"github.com/dagu-org/dagu/internal/fileutil"
|
||||
"github.com/dagu-org/dagu/internal/models"
|
||||
"github.com/dagu-org/dagu/internal/persistence/localdag"
|
||||
"github.com/dagu-org/dagu/internal/persistence/localdagrun"
|
||||
"github.com/dagu-org/dagu/internal/persistence/localproc"
|
||||
"github.com/dagu-org/dagu/internal/persistence/localqueue"
|
||||
"github.com/dagu-org/dagu/internal/persistence/filedag"
|
||||
"github.com/dagu-org/dagu/internal/persistence/filedagrun"
|
||||
"github.com/dagu-org/dagu/internal/persistence/fileproc"
|
||||
"github.com/dagu-org/dagu/internal/persistence/filequeue"
|
||||
"github.com/dagu-org/dagu/internal/scheduler"
|
||||
"github.com/dagu-org/dagu/internal/test"
|
||||
"github.com/stretchr/testify/require"
|
||||
@ -70,10 +70,10 @@ func setupTest(t *testing.T) testHelper {
|
||||
Global: config.Global{WorkDir: tempDir},
|
||||
}
|
||||
|
||||
ds := localdag.New(cfg.Paths.DAGsDir, localdag.WithFlagsBaseDir(cfg.Paths.SuspendFlagsDir))
|
||||
drs := localdagrun.New(cfg.Paths.DAGRunsDir)
|
||||
ps := localproc.New(cfg.Paths.ProcDir)
|
||||
qs := localqueue.New(cfg.Paths.QueueDir)
|
||||
ds := filedag.New(cfg.Paths.DAGsDir, filedag.WithFlagsBaseDir(cfg.Paths.SuspendFlagsDir))
|
||||
drs := filedagrun.New(cfg.Paths.DAGRunsDir)
|
||||
ps := fileproc.New(cfg.Paths.ProcDir)
|
||||
qs := filequeue.New(cfg.Paths.QueueDir)
|
||||
|
||||
drm := dagrun.New(drs, ps, cfg.Paths.Executable, cfg.Global.WorkDir)
|
||||
em := scheduler.NewEntryReader(testdataDir, ds, drm, "", "")
|
||||
|
||||
@ -23,9 +23,9 @@ import (
|
||||
"github.com/dagu-org/dagu/internal/fileutil"
|
||||
"github.com/dagu-org/dagu/internal/logger"
|
||||
"github.com/dagu-org/dagu/internal/models"
|
||||
"github.com/dagu-org/dagu/internal/persistence/localdag"
|
||||
"github.com/dagu-org/dagu/internal/persistence/localdagrun"
|
||||
"github.com/dagu-org/dagu/internal/persistence/localproc"
|
||||
"github.com/dagu-org/dagu/internal/persistence/filedag"
|
||||
"github.com/dagu-org/dagu/internal/persistence/filedagrun"
|
||||
"github.com/dagu-org/dagu/internal/persistence/fileproc"
|
||||
"github.com/google/uuid"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
@ -98,9 +98,9 @@ func Setup(t *testing.T, opts ...HelperOption) Helper {
|
||||
cfg.Server = *options.ServerConfig
|
||||
}
|
||||
|
||||
dagStore := localdag.New(cfg.Paths.DAGsDir, localdag.WithFlagsBaseDir(cfg.Paths.SuspendFlagsDir))
|
||||
runStore := localdagrun.New(cfg.Paths.DAGRunsDir)
|
||||
procStore := localproc.New(cfg.Paths.ProcDir)
|
||||
dagStore := filedag.New(cfg.Paths.DAGsDir, filedag.WithFlagsBaseDir(cfg.Paths.SuspendFlagsDir))
|
||||
runStore := filedagrun.New(cfg.Paths.DAGRunsDir)
|
||||
procStore := fileproc.New(cfg.Paths.ProcDir)
|
||||
|
||||
drm := dagrun.New(runStore, procStore, cfg.Paths.Executable, cfg.Global.WorkDir)
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user