断続的にデータを受けながら並行で時間差リアクションを行う

はじめて、の次のGo - すぎゃーんメモ にて作った go-genki-bot 、UserStreamからTweet取得して返信するところの処理が微妙なかんじで mattnさんやlestrratさんなどからアドバイスいただいたりした。

Goっぽい書き方で上手く並行処理をさせたい。

お題

「断続的にやってくるデータを受け取り表示しつつ、特定の条件のデータに対して遅延させた処理を行なう。」
例えばIDとText付きで送られてくるデータに対して逐次内容を表示させつつ、Textが"message0"というモノが来た場合は1秒待ってからそのIDを表示する。

1. for文の中で全部やる
package main

import (
	"fmt"
	"log"
	"math/rand"
	"time"
)

// Message type
type Message struct {
	ID   int
	Text string
}

var r = rand.New(rand.NewSource(time.Now().UnixNano()))

func main() {
	for i := 0; ; i++ {
		// 断続的にMessageを発行
		time.Sleep(time.Millisecond * time.Duration(r.Intn(200)))
		msg := &Message{
			ID:   i,
			Text: fmt.Sprintf("message%d", r.Intn(10)),
		}
		log.Println(*msg)
		// 特定のMessageに対し1秒後にアクション
		if msg.Text == "message0" {
			go func(m *Message) {
				time.Sleep(time.Second)
				log.Println(fmt.Sprintf("reply to Message %d", m.ID))
			}(msg)
		}
	}
}

もともとgo-genki-botで書いていたのがこんなかんじ。延々と続くループの中でデータを生成して その中で条件に当てはまるものだけgoroutineを呼び その中でtime.Sleep()使って遅延処理させる。
実行すると

$ go run sample.go
2014/07/04 23:26:03 {0 message9}
2014/07/04 23:26:03 {1 message1}
2014/07/04 23:26:03 {2 message8}
2014/07/04 23:26:03 {3 message9}
2014/07/04 23:26:03 {4 message2}
2014/07/04 23:26:03 {5 message4}
2014/07/04 23:26:04 {6 message9}
2014/07/04 23:26:04 {7 message8}
2014/07/04 23:26:04 {8 message7}
2014/07/04 23:26:04 {9 message3}
2014/07/04 23:26:04 {10 message1}
2014/07/04 23:26:04 {11 message2}
2014/07/04 23:26:04 {12 message0}
2014/07/04 23:26:04 {13 message8}
2014/07/04 23:26:04 {14 message4}
2014/07/04 23:26:04 {15 message1}
2014/07/04 23:26:05 {16 message4}
2014/07/04 23:26:05 {17 message0}
2014/07/04 23:26:05 {18 message8}
2014/07/04 23:26:05 {19 message7}
2014/07/04 23:26:05 reply to Message 12
2014/07/04 23:26:05 {20 message9}
2014/07/04 23:26:05 {21 message7}
2014/07/04 23:26:06 {22 message3}
2014/07/04 23:26:06 {23 message0}
2014/07/04 23:26:06 {24 message8}
2014/07/04 23:26:06 reply to Message 17
2014/07/04 23:26:06 {25 message9}
2014/07/04 23:26:06 {26 message5}
...

と、次々流れてくるデータはブロックせず処理しつつも 遅延してreply処理も並行で出来ている。

2. time.AfterFunc(time.After)を使う

「特定時間Sleepさせてから処理を行う」だけなら、time.AfterFuncが使える。

func main() {
	for i := 0; ; i++ {
		// 断続的にMessageを発行
		time.Sleep(time.Millisecond * time.Duration(r.Intn(200)))
		msg := &Message{
			ID:   i,
			Text: fmt.Sprintf("message%d", r.Intn(10)),
		}
		log.Println(*msg)
		// 特定のMessageに対し1秒後にアクション
		if msg.Text == "message0" {
			time.AfterFunc(time.Second, func() {
				log.Println(fmt.Sprintf("reply to Message %d", msg.ID))
			})
		}
	}
}

こうなる。goroutineがうまく隠蔽されるかんじ。
ただ、これだと引数を渡せなくて変数を束縛できない。そうしたい場合はやはり自分でgoroutineを呼んで明示的に引数を渡す形になるかな…?

3. Messgeの発行をchannel経由に

そもそもMessage発行とそれに対する処理を同じループの中でやる必要ないのでは。「Message発行」の部分をgoroutine上で行ない、channelを経由してそれを取り出して処理しよう。

func main() {
	ch := make(chan *Message)
	defer close(ch)
	go func() {
		for i := 0; ; i++ {
			time.Sleep(time.Millisecond * time.Duration(r.Intn(200)))
			ch <- &Message{
				ID:   i,
				Text: fmt.Sprintf("message%d", r.Intn(10)),
			}
		}
	}()

	for msg := range ch {
		log.Println(*msg)
		if msg.Text == "message0" {
			go func(m *Message) {
				time.Sleep(time.Second)
				log.Println(fmt.Sprintf("reply to Message %d", m.ID))
			}(msg)
		}
	}
}

チャンネルch chan *Messageを用意しておいて、goroutine内でMessageが発行されるたびにそこに流し込み、range chのループでそれを逐次取り出すことができる。
きれいに処理が分かれて、いいかんじ。

4. selectで受け取る

rangeを使わずにselectでchannelからのデータを受け取るようにも書ける。

func main() {
	ch := make(chan *Message)
	defer close(ch)
	go func() {
		for i := 0; ; i++ {
			time.Sleep(time.Millisecond * time.Duration(r.Intn(200)))
			ch <- &Message{
				ID:   i,
				Text: fmt.Sprintf("message%d", r.Intn(10)),
			}
		}
	}()

	for {
		select {
		case msg := <-ch:
			log.Println(*msg)
			if msg.Text == "message0" {
				go func(m *Message) {
					time.Sleep(time.Second)
					log.Println(fmt.Sprintf("reply to Message %d", m.ID))
				}(msg)
			}
		}
	}
}

やってることはほぼ変わらないけど、他にも別のgoroutineからchannelを通して何かが流れてくる場合にはこのカタチの方が便利そうだ。

例えば無限ループでメッセージ発行され続けてくるものに対しタイムアウトを設定してプログラムを終了させたい場合。

func main() {
	ch := make(chan *Message)
	defer close(ch)
	go func() {
		for i := 0; ; i++ {
			time.Sleep(time.Millisecond * time.Duration(r.Intn(200)))
			ch <- &Message{
				ID:   i,
				Text: fmt.Sprintf("message%d", r.Intn(10)),
			}
		}
	}()

	timeout := time.After(time.Second * 5)
	for {
		select {
		case msg := <-ch:
			log.Println(*msg)
			if msg.Text == "message0" {
				go func(m *Message) {
					time.Sleep(time.Second)
					log.Println(fmt.Sprintf("reply to Message %d", m.ID))
				}(msg)
			}
		case <-timeout:
			return
		}
	}
}

こうやって<-timeoutのcaseを追加するだけでループ終了を設定できる。

4. sync.WaitGroupで最後まで

上記のtimeoutで終了させると、突然プログラムが終了してしまいtimeout寸前で条件に当てはまるものが来ていても遅延したリアクションを完遂することなく終わってしまう。
ちゃんと「遅延したリアクションを行う」のを最後までやるなら、sync.WaitGroupを使ってwaitするようにした方が良さそうだ。

import "sync"

...

func main() {
	ch := make(chan *Message)
	defer close(ch)
	go func() {
		for i := 0; ; i++ {
			time.Sleep(time.Millisecond * time.Duration(r.Intn(200)))
			ch <- &Message{
				ID:   i,
				Text: fmt.Sprintf("message%d", r.Intn(10)),
			}
		}
	}()

	timeout := time.After(time.Second * 5)
	wg := &sync.WaitGroup{}
loop:
	for {
		select {
		case msg := <-ch:
			log.Println(*msg)
			if msg.Text == "message0" {
				wg.Add(1)
				go func(m *Message) {
					time.Sleep(time.Second)
					log.Println(fmt.Sprintf("reply to Message %d", m.ID))
					wg.Done()
				}(msg)
			}
		case <-timeout:
			break loop
		}
	}
	wg.Wait()

goroutineを走らせるたびにAdd(1)して、中の処理が終わったらDone()。で、Wait()を書いておけば、すべてがちゃんと終了するまで待ってくれる。

5. 全体を並行化するには…?

ここまでの「断続的にやってくるデータを受け取り表示しつつ、特定の条件のデータに対して遅延させた処理を行なう」という操作自体を幾つか並行して走らせようとすると、どうしたら良いのだろう。
Messageを生成する側はchanを返す関数を用意してしまえば幾つでも生成できると思うのだけど、受け取る側は…case文に書き足していくしかないのかな?

func main() {
	generator := func() <-chan *Message {
		ch := make(chan *Message)
		go func() {
			for i := 0; ; i++ {
				time.Sleep(time.Millisecond * time.Duration(r.Intn(200)))
				ch <- &Message{
					ID:   i,
					Text: fmt.Sprintf("message%d", r.Intn(10)),
				}
			}
		}()
		return ch
	}

	wg := &sync.WaitGroup{}
	ch1 := generator()
	ch2 := generator()
	ch3 := generator()
	processor := func(chID int, msg *Message) {
		log.Println(chID, *msg)
		if msg.Text == "message0" {
			wg.Add(1)
			go func(m *Message) {
				time.Sleep(time.Second)
				log.Println(chID, fmt.Sprintf("reply to Message %d", m.ID))
				wg.Done()
			}(msg)
		}
	}
	timeout := time.After(time.Second * 5)
loop:
	for {
		select {
		case msg := <-ch1:
			processor(1, msg)
		case msg := <-ch2:
			processor(2, msg)
		case msg := <-ch3:
			processor(3, msg)
		case <-timeout:
			break loop
		}
	}
	wg.Wait()
}

もっと上手いやり方があると思うのだけど、思いつかない…