GoでGracefulに停止する定期実行ワーカー

起動してから、指定された処理を「s秒ごと」のように固定間隔で実行し続ける、というワーカー的なプログラムを考える。

何も考慮しない場合

最も原始的には以下のように time.Ticker を使ってループし続ければ良い。

type Worker struct {
    interval time.Duration
    task     func(context.Context)
}

func NewWorker(interval time.Duration, task func(context.Context)) *Worker {
    return &Worker{
        interval: interval,
        task:     task,
    }
}

func (w *Worker) Start(ctx context.Context) {
    ticker := time.NewTicker(w.interval)
    defer ticker.Stop()

    for {
        <-ticker.C
        w.task(ctx)
    }
}

tickerが指定した周期でchannelに値を送信し受信可能状態になるので、それを待ってから次のtaskを実行すれば良い。

処理中のtaskは完了してから終了したい [失敗例]

ここから、例えばシグナルを受け取って終了するが 実行中のものが中断されないよう完了を待ってから停止したい、という場合。

うっかりこう書いてしまいそうだが、これは意図した挙動にならないことがある。

func (w *Worker) Start(ctx context.Context) {
    ticker := time.NewTicker(w.interval)
    defer ticker.Stop()

    for {
        select {
        // ここでcontextのキャンセルを監視
        case <-ctx.Done():
            return
        case <-ticker.C:
            w.task(ctx)
        }
    }
}

func doTask(ctx context.Context) {
    n := 500 + rand.Intn(500)
    log.Printf("Starting task (%d ms)...", n)
    time.Sleep(time.Duration(n) * time.Millisecond)
    log.Printf("Task completed.")
}


func main() {
    ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
    defer stop()

    NewWorker(time.Second, doTask).Start(ctx)
}

少し長い doTask が実行中にCtrl+Cでストップしようとしても、ちゃんと確実に Task completed. が出てから終了するのを確認できる。

しかし この doTask の実行時間がもっと長く、interval よりも長い場合、次のループに来たときに <- ctx.Done() を受信できるが、同じく <- ticker.C も受信できる状態になっている。この場合 select がどちらを通るかは不定のため、 ctx は既にキャンセルされているのに再度 task が実行される、ということが起こり得る。つまり「止まるかもしれないし 止まらないかもしれない」。

こういう考慮不足のものは意外と最近の生成AIでも書いてきたりするので注意が必要。

処理中のtaskは完了してから終了したい [修正版]

というわけで停止したい場合に確実に停止してもらうために、 ticker.C が選ばれた場合も ctx がキャンセルされているかどうかを必ずチェックするようにする。

func (w *Worker) Start(ctx context.Context) error {
    ticker := time.NewTicker(w.interval)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-ticker.C:
            if ctx.Err() != nil {
                return ctx.Err()
            }
            w.task(ctx)
        }
    }
}

もしくは select の分岐内をシンプルに中止か待機かだけの役割にして

   for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-ticker.C:
        }
        if ctx.Err() != nil {
            return ctx.Err()
        }
        w.task(ctx)
    }

とか。これなら select をループの後方に持っていくことで最初の1回目の実行を即座に開始できる。

   for {
        if ctx.Err() != nil {
            return ctx.Err()
        }
        w.task(ctx)

        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-ticker.C:
    }
}

完了待機時間に上限を設ける

実行中の task が終了するまで待機するようにはなったが、もし何らかの不具合や非常に重い処理をしていたりして長い時間がかかっていると、いつまでもプログラムが終了しない問題がある。

それでは困るので、実行中の task の完了を待つ時間に上限を設定する。

type Worker struct {
    interval        time.Duration
    shutdownTimeout time.Duration
    task            func(context.Context)
}

func NewWorker(interval time.Duration, shutdownTimeout time.Duration, task func(context.Context)) *Worker {
    return &Worker{
        interval:        interval,
        shutdownTimeout: shutdownTimeout,
        task:            task,
    }
}

func (w *Worker) Start(ctx context.Context) error {
    ticker := time.NewTicker(w.interval)
    defer ticker.Stop()

    for {
        if ctx.Err() != nil {
            return ctx.Err()
        }

        // task用のcontextを作成
        taskCtx, taskCancel := context.WithCancel(context.Background())
        done := make(chan struct{})

        go func() {
            defer taskCancel()
            w.task(taskCtx)
            close(done)
        }()

        // taskの完了または親contextのキャンセルを待つ
        select {
        case <-done:
            // task完了
        case <-ctx.Done():
            // shutdownTimeout の間だけtaskの完了を待つ
            // 完了しなければ強制キャンセルして終了
            select {
            case <-done:
            case <-time.After(w.shutdownTimeout):
                taskCancel()
            }
            return ctx.Err()
        }

        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-ticker.C:
        }
    }
}

task の実行をgoroutineに載せ、完了を待っている間に ctx.Done() を受信したらさらに完了を待ちつつ shutdownTimeout の時間だけ待機。そちらが先に来てしまったら taskCancel を呼んでから終了する。

task には親ctxをそのまま渡しても良いが、停止リクエスト受信時には即座に停止しようとはせずに実行中のものは出来るだけ完了まで継続して欲しい、という設計も有り得る。そういった場合にはこのように別のcontextを渡し、本当に待機上限時間を超えたときだけキャンセル、という形にできる。 ただし、この場合はcancelを呼んだ後にすぐにプログラムが終了すると本来のcancel処理が実行されずにプロセスごと終了してしまうことが有り得るので、最低限の「ドレイン待ち」のような処理は必要かもしれない。