fix: initialize node before setting node status running (#1302)

This commit is contained in:
YotaHamada 2025-10-05 14:55:51 +09:00 committed by Yota Hamada
parent 968198724e
commit 6d0ff95ead

View File

@ -139,6 +139,15 @@ func (sc *Scheduler) Schedule(ctx context.Context, graph *ExecutionGraph, progre
wg.Add(1)
logger.Info(ctx, "Step started", "step", node.Name())
if err := sc.setupNode(ctx, node); err != nil {
sc.setLastError(err)
node.MarkError(err)
node.SetStatus(status.NodeError)
sc.finishNode(node, &wg)
continue NodesIteration
}
node.SetStatus(status.NodeRunning)
if progressCh != nil {
progressCh <- node
@ -189,17 +198,10 @@ func (sc *Scheduler) Schedule(ctx context.Context, graph *ExecutionGraph, progre
return
}
setupSucceed := true
if err := sc.setupNode(ctx, node); err != nil {
setupSucceed = false
sc.setLastError(err)
node.MarkError(err)
}
ctx = node.SetupContextBeforeExec(ctx)
ExecRepeat: // repeat execution
for setupSucceed && !sc.isCanceled() {
for !sc.isCanceled() {
execErr := sc.execNode(ctx, node)
isRetriable := sc.handleNodeExecutionError(ctx, graph, node, execErr)
if isRetriable {
@ -555,8 +557,6 @@ func isReady(ctx context.Context, g *ExecutionGraph, node *Node) bool {
func (sc *Scheduler) runEventHandler(ctx context.Context, graph *ExecutionGraph, node *Node) error {
defer node.Finish()
node.SetStatus(status.NodeRunning)
if !sc.dry {
if err := node.Setup(ctx, sc.logDir, sc.dagRunID); err != nil {
node.SetStatus(status.NodeError)
@ -567,6 +567,8 @@ func (sc *Scheduler) runEventHandler(ctx context.Context, graph *ExecutionGraph,
_ = node.Teardown(ctx)
}()
node.SetStatus(status.NodeRunning)
ctx = sc.setupEnvironEventHandler(ctx, graph, node)
if err := node.Execute(ctx); err != nil {
node.SetStatus(status.NodeError)