interface要素を持つstructへのJSON Unmarshal

ちょっとやり方が分からなくて調べたのでメモ。

例題

type hoge struct {
    Foo int `json:"foo"`
    Bar baz `json:"bar"`
}

type baz interface {
    hoge()
}

type fuga struct {
    Fuga string `json:"fuga"`
}

func (*fuga) hoge() {}

という感じで "foo"int に固定されているけど "bar"baz interface というのだけ定義されている hoge struct がある。

baz interface を正しく実装している fuga structを使ってその要素を持つJSONhoge structにunmarshalしたい。とする。

func main() {
    str := `{"foo":3,"bar":{"fuga":"fuga"}}`

    var h hoge
    if err := json.Unmarshal([]byte(str), &h); err != nil {
        log.Fatal(err)
    }
}

これは失敗する。

2018/06/23 22:42:58 json: cannot unmarshal object into Go struct field hoge.bar of type main.baz
exit status 1

"bar" 要素のunmarshalの方法が分からん。ということなので明示的に教えてやる必要がある。

hoge struct に UnmarshalJSON([]byte) error 関数を実装してやる。

mapを使う方法

直感的に分かりやすい? かも知れない方法

func (h *hoge) UnmarshalJSON(b []byte) error {
    m := map[string]json.RawMessage{}
    if err := json.Unmarshal(b, &m); err != nil {
        return err
    }
    for k, v := range m {
        switch k {
        case "foo":
            var i int
            if err := json.Unmarshal(v, &i); err != nil {
                return err
            }
            h.Foo = i
        case "bar":
            var f fuga
            if err := json.Unmarshal(v, &f); err != nil {
                return err
            }
            h.Bar = &f
        }
    }
    return nil
}

一旦 map[string]json.RawMessage{}マッピングしてunmarshalしてから、各要素をfor loopで回して それぞれの値を改めてunmarshalする。 他のfieldにunmarshalすべきtypeの情報が入っていればここで取り出してから分岐させたりして処理すれば良い。

func main() {
    str := `{"foo":3,"bar":{"fuga":"fuga"}}`

    var h hoge
    if err := json.Unmarshal([]byte(str), &h); err != nil {
        log.Fatal(err)
    }
    log.Printf("%#v, (bar: %+v)", h, h.Bar)
}
$ go run main.go
2018/06/23 22:52:12 main.hoge{Foo:3, Bar:(*main.fuga)(0xc42000e2c0)}, (bar: &{Fuga:fuga})

alias typeを使う

上記の方法だと Foo 要素とか interface 以外のものもいちいち分岐の中でunmarshal処理を書かなければならず、めんどい。

そこで、aliasなtypeを使って処理する方法があるようだ。

func (h *hoge) UnmarshalJSON(b []byte) error {
    type alias hoge
    a := struct {
        Bar json.RawMessage `json:"bar"`
        *alias
    }{
        alias: (*alias)(h),
    }
    if err := json.Unmarshal(b, &a); err != nil {
        return err
    }

    var f fuga
    if err := json.Unmarshal(a.Bar, &f); err != nil {
        return err
    }
    h.Bar = &f

    return nil
}

このようにalias typeを使って そこに突っ込むようにunmarshalすると、Bar だけ json.RawMessageで受け取り それ以外はそのままunmarshalしてくれる、ようだ。 なので Bar要素の部分だけ明示的に処理を書けば良いらしい。

スッキリ書けて良さそう。

複数のtype候補がある場合

例えば上記の fuga struct 以外にも 似たような piyo struct が同様に baz interface を実装していたとする。

type hoge struct {
    Foo int `json:"foo"`
    Bar baz `json:"bar"`
}

type baz interface {
    hoge()
}

// こっちのFugaはstring
type fuga struct {
    Fuga string `json:"fuga"`
}

func (*fuga) hoge() {}

// こっちのFugaはbool
type piyo struct {
    Fuga bool `json:"fuga"`
}

func (*piyo) hoge() {}

同名のフィールドを持っているが型が違うので、

{"foo":3,"bar":{"fuga":"true"}}

というJSON"fuga" は文字列なので fuga structで、

{"foo":3,"bar":{"fuga":true}}

というJSONの場合は "fuga" がbooleanを受け取るので piyo struct にしたい。

どっちか分からない、という場合は 「上手くunmarshalできるかどうかとにかく試してみる」という感じになりそう

func (h *hoge) UnmarshalJSON(b []byte) error {
    type alias hoge
    a := struct {
        Bar json.RawMessage `json:"bar"`
        *alias
    }{
        alias: (*alias)(h),
    }
    if err := json.Unmarshal(b, &a); err != nil {
        return err
    }

    var (
        f fuga
        p piyo
    )
    for _, c := range []baz{&f, &p} {
        err := json.Unmarshal(a.Bar, c)
        if err != nil {
            if _, ok := err.(*json.UnmarshalTypeError); ok {
                continue
            }
            return err
        }
        h.Bar = c
        break
    }
    return nil
}

とにかくfugapiyoもどっちも使ってjson.Unmarshalを試みる。文字列なのにboolとしてunmarshalしようとしたりその逆だったりした場合 json.UnmarshalTypeError が返ってくるので、その場合は別のtypeで再挑戦、みたいな感じ。

func main() {
    str1 := `{"foo":3,"bar":{"fuga":"true"}}`
    str2 := `{"foo":3,"bar":{"fuga":true}}`

    var h hoge
    if err := json.Unmarshal([]byte(str1), &h); err != nil {
        log.Fatal(err)
    }
    log.Printf("%#v, (bar: %+v)", h, h.Bar)

    if err := json.Unmarshal([]byte(str2), &h); err != nil {
        log.Fatal(err)
    }
    log.Printf("%#v, (bar: %+v)", h, h.Bar)
}
$ go run main.go
2018/06/23 23:16:47 main.hoge{Foo:3, Bar:(*main.fuga)(0xc42000e2a0)}, (bar: &{Fuga:true})
2018/06/23 23:16:47 main.hoge{Foo:3, Bar:(*main.piyo)(0xc420014288)}, (bar: &{Fuga:true})

ちゃんと1つ目はfugaとしてunmarshalされ 2つ目はpiyoとしてunmarshalされた。

まぁこんなのが必要になる場合はそもそものデータ設計が間違ってると思うけど。

参照

Google Code Jam 2018: Round 1C

Google Code Jam の第1ラウンド。

・予選のときの記事はこちら memo.sugyan.com

Round 1は時間制限が2時間半、3回行われてどこか1つで上位1,500人に入れば次に進める、というもの。 多分毎回ここで脱落していた

1Aは土曜日の早朝で起きられなくて断念、1Bは挑戦してみたものの全然正解できずに撃沈、最後の望みをかけて1Cに挑戦。

A Whole New Word

文字の並び換えで新しい単語を作る、もしくはもうすべて網羅されているか、を出力する。

どうやって解けばいいんだろう…とすごく悩んでしまった。 例えば最後の文字が3種類あったら 最後から2番目の文字は各種類 × 3回あらわれるはずで、それが2種類×3あったらさらに前の文字は各種類6回… となるはず、それより少なければそれは入力に含まれていない語がある、ということでそれを出力すればいい! という感じで、それを愚直に書いた。

def solve(w, l):
    n = 1
    p = []
    for i in range(l):
        d = {}
        for c in [s[l - i - 1] for s in w]:
            d[c] = d.get(c, 0) + 1
        if i > 0:
            for k in d.keys():
                if d[k] < n:
                    for e in p:
                        a = set(p) - set([s[l - i:] for s in w if s[l - i - 1] == k])
                        a = k + list(a)[0]
                        if len(a) < l:
                            a = w[0][0:l - len(a)] + a
                        return a
        if i == 0:
            p = [x for x in d.keys()]
        else:
            p = [x + e for x in d.keys() for e in p]
        n *= len(set(d.keys()))
    if len(w) == n:
        return '-'


t = int(input())
for i in range(1, t + 1):
    n, l = [int(e) for e in input().split(' ')]
    words = []
    for _ in range(n):
        words.append(input().strip())
    ans = solve(words, l)
    print('Case #{:d}: {}'.format(i, ans))

これで1時間くらい使ってしまった…。でも一応これで両方とも通っていたようだ

Lollipop Shop

飴を買いに来る人に最適なものをオススメしてたくさん販売できるようインタラクティブに。

この種のものにまず挑戦したことがなかったので とりあえずtesting_tool.py を使って動かしてみて、適当に出力を返すようにして通るのを確認、提出してみるとWrong answerに。 適当に販売してたらダメかそりゃそうか、で改めて問題を読み直して 買われる傾向が決まっているっぽい、ということが分かる。 じゃあ売れやすいものはまた注文が来るかもしれないし、売れにくいものを優先で売るようにすればいいのか? 候補を全部記録して、出現回数が少ないものを優先的に返すように変更。

終了2分前にギリギリで通った。

t = int(input())
for i in range(1, t + 1):
    n = int(input())
    f = set(range(n))
    s = {}
    for _ in range(n):
        c = []
        d = [int(x) for x in input().strip().split(' ')]
        for j in d[1:]:
            s[j] = s.get(j, 0) + 1
            if j in f:
                c.append(j)
        if len(c) > 0:
            m = min([s[j] for j in c])
            j = 0
            for k, v in s.items():
                if v == m and k in c:
                    j = k
                    break
            print(j)
            f.remove(j)
        else:
            print('-1')

Ant Stack

耐久重量が決まっている各蟻が前の蟻を乗せていって、最大幾つ積めるかを求める。

どういう形で積むのが最大になるかがなかなかイメージできなくて 幾つかテストケースを追加して考えてみる。 結局 途中n匹目までのところで「m段詰むと最軽量のものはw_mになる」というのを順番に計算していって 求められるかな…?と思って書いてみた。

def solve(w):
    d = {1: w[0]}
    for i in range(1, len(w)):
        dd = {1: set([w[i]])}
        for k, v in d.items():
            dd[k] = dd.get(k, set())
            dd[k].add(v)
            if v <= w[i] * 6:
                dd[k + 1] = dd.get(k + 1, set())
                dd[k + 1].add(v + w[i])
        d = {}
        for k, v in dd.items():
            d[k] = min(v)
    return max(d.keys())


t = int(input())
for i in range(1, t + 1):
    n = int(input())
    w = [int(x) for x in input().strip().split(' ', n)]
    ans = solve(w)
    print('Case #{}: {}'.format(i, ans))

Test set 1で通って安心してそのまま提出したけど、Test set 2ではTIME LIMIT EXCEEDEDになってしまっていた。 考慮が足りていなかったようだ…

結果

奇跡的に73ptで871位で通過できた、っぽい。嬉しい

将棋駒画像の分類器をラクして作る

ちょっと将棋関連の画像認識をやってみたいな、と思って。

理想的には、将棋の盤面の写真や盤面図の画像から認識を行い 盤面の状態をデータとして取得できるようにしたい。

となると、

  1. 入力画像から盤面の部分を検出し、切り出す
  2. 切り出した盤面をさらに、9x9の各マスに分割する
  3. 分割した各マスに対し どの駒が置かれているか(もしくは何も置かれていないか)を分類する
  4. さらに持駒の部分も画像内から検出し、同様に分類

といった処理が必要になりそう。 1, 2, や 4, の処理はともかくとして、3, の分類はアイドル顔認識と同じようなimage classifierで出来るはずなので、まずはこれを作ってみることに。

目標

将棋盤における駒の状態として有り得るのは「歩兵」「桂馬」「香車」「銀将」「金将」「角行」「飛車」「王将」の8種類、それに成駒である「と金」「成香」「成桂」「成銀」「竜馬」「竜王」を加えた14種類、それぞれ先手側と後手側があるので倍の28種類。そして何も駒が置かれていないマス、も分類対象に加える必要があるので合計で29 classの分類taskとなる。

また さらに、「将棋盤のマスでも何でもない、まったく関係ない画像である」というclassも含めても良さそう。このへんは用途によるかもしれない。

とにかく、この 29 or 30 classの分類を高精度・高速度でできるImage classifierを簡単に作りたい。

データセットを「生成」する

普通の画像分類には大量の「画像」と「正解ラベル」のセットが学習データとして必要になるが、それを用意するのがとにかく大変。 将棋駒のラベル付きデータセットがどこかに公開されていれば良いけど、少し探してみた限りではやはり存在してないようだ。

アイドル顔画像のときのようにやろうとすると、様々な将棋盤の画像を収集して 盤面を切り抜いて 各マスに分割して ラベル付けて… を繰り返すことになる。 極力それは避けたい。

ということで、画像データを自動的に「生成」して用意することを考えてみる。

有り難いことに、将棋盤画像を生成するのに使える良い素材を無料で公開してくださっている方々がいる。

kifulog.ko31.com

こちらの記事で紹介されている素材データは、それぞれ盤の画像と 透過pngの駒画像が個別に用意してあり(「しんえれ外部駒」は背景色固定の駒画像のみ)、これらを貼り合わせることで任意の盤面画像を作ることができる。

例えば以前の記事で載せている

f:id:sugyan:20171119215709p:plain

のような画像は、すべて「CC Resources for Shogi Applications」に含まれている盤・枠・駒それぞれの素材画像を使って貼り合わせることで生成している。

任意の盤面を自由に生成できるということは、当然どの位置にどの駒が置かれているかを分かっているということなので、生成した盤面画像から対象の駒の置いてあるマスの位置を切り抜いて、それと正解ラベルを関連づけることができる。 例えば上記の画像を生成した上で 2三 の位置のマスの部分を切り抜けば、それを「歩兵」としてラベル付けしたデータとして扱うことができる。

f:id:sugyan:20180427120917p:plain

必要な画像処理は

  • 画像の任意の位置に別の透過画像を貼り付ける
  • 任意の位置を切り抜く
  • 任意の大きさにリサイズする

くらい。これらは Pillow でもあれば簡単に実現できる。

また、素材画像の組み合わせではなくすべて自分で描画して生成することも出来る。 よく書籍などで使われる局面図は、漢字一文字(もしくは二文字)で表現される。

f:id:sugyan:20180427123119p:plain

(上図はShogipicより)

こういうものも、枠線を引いて任意のマス位置に文字を描画(後手側の場合は180°回転をする)することで生成することができる。 OSに入っているヒラギノ*.ttcなどのフォントを使って複数の種類のものを生成できる。

これらの方法で画像を生成し、自動でラベル付けする というより「ラベルに合う画像を生成する」という手法でデータセットを作ってみた。 素材の組み合わせを変えたり 画像の質を変えたりして少しずつ違う画像が生成されるよう調整し、各駒300〜400点が全自動で生成できた。

f:id:sugyan:20180427124355p:plain f:id:sugyan:20180427124404p:plain

TensorFlow Hubのモデルを使って学習

データセットがある程度用意できたら、いよいよこれらを識別する分類器を作る…ところだけど、これも既存のものを再利用できるならそれがラクで良い。

3月末に行われたTensorFlow Dev Summit 2018で、「TensorFlow Hub」が発表された。 ここで様々な学習済みモデルが公開され、独自データセットでの利用のための再学習の仕組みも整備されているようだ。

この中にある examples/image_retraining/retrain.py を使うことで、TensorFlow Hub のモジュールを使用して独自データに対する学習ができる。

今回は mobilenet_v2_100_96/classification/1 というMobileNet V2のものを使ってみた。

python retrain.py \
    --image_dir ${IMAGEDIR} \
    --tfhub_module https://tfhub.dev/google/imagenet/mobilenet_v2_100_96/classification/1 \
    --how_many_training_steps 10000

これで、指定したモジュールをダウンロードし そのモデルを使って指定ディレクトリ以下の画像データに対する分類のための再学習を行ってくれる。 distortionを行わない場合は最初に全データに対するbottleneckを計算してそれを使っていくだけのようなので、手元のMacBookProのCPUでも10,000 stepsくらいなら十数分程度で学習が完了する。

f:id:sugyan:20180428013442p:plain

順調にcross entropyが減少し、train dataset に対してはほぼ100%、validation datasetに対しても97〜98%くらいの正答率になっている。

結果と考察

学習が終わると output_graph.pb ファイルが書き出されるので、これを使って実際に幾つかの画像を入力して試してみる。

入力データは (?, 96, 96, 3) のshapeを持つTensorで、値は[0, 1]の範囲にscalingされたものとして学習されているので、それに合わせてやる必要がある。

盤面の部分だけを切り抜いた画像を用意し、縦横それぞれ9分割して81マスすべてに対して学習済みの分類器にかけて 最も高スコアになったラベルを表示してみる。

1. 学習データに使った素材で作ったもの

f:id:sugyan:20180501012958p:plain

-KI * +TO * -OU * +TO+TO+NY
 * -FU *  *  * +FU *  *  * 
+RY-TO *  * -FU-FU * -FU-KY
 * +KE+KE-GI-KI-TO *  * +RY
-UM-NK+KY+TO * -TO *  *  * 
 * +FU *  *  * -GI-GI+FU * 
 *  * +FU * +TO-GI *  * +KE
 *  *  *  * +TO * +FU * +KY
+UM *  *  *  *  * +KI * -KI

学習データそのままなのだから、当然と言えば当然なんだけど、すべて正解。

そのままとはいえ少し縦長のものを96 x 96にリサイズしているので縦横比など少しずれているとは思うのだけど、そのへんは上手く吸収してくれているようだ。

2. Shogipicで生成された局面図

f:id:sugyan:20180501013010p:plain

漢字のみで表現するタイプのもの。 これは…何というフォントを使って生成されているんだろう? ヒラギノ丸ゴ?であれば学習データにも含まれている。

-KI * +TO * -OU * +TO+TO+NY
 * -UM *  *  * +FU *  *  * 
+RY-TO+UM+UM-UM-FU ? -UM-KY
 * +KE+KE-KA-KI-TO ?  * +RY
-UM-KA+KY+TO * -TO *  *  * 
 * +FU ?  ?  * -KE-KE+FU * 
 *  * +FU ? +TO-KE ?  * +KE
 *  *  *  * +TO * +FU * +KY
+UM *  *  *  *  * +KI * -KI

不思議なことに 2三5三8二 などの後手側の「歩兵」が「竜馬」として認識されてしまっている。 4三 の位置のものは正しく「歩兵」になっているのに…。

それぞれ切り抜いて認識結果の上位3つとscoreを出してみる。

4三

f:id:sugyan:20180502174326p:plain

W_FU: 0.6095971
W_UM: 0.25771713
W_TO: 0.095992066

5三

f:id:sugyan:20180502174343p:plain

W_UM: 0.69194067
W_FU: 0.10603738
W_TO: 0.07219603

8二

f:id:sugyan:20180502174357p:plain

W_UM: 0.62901
W_TO: 0.24588391
W_NY: 0.05405906

うーん、真ん中部分は大きく違わないはずなのに結果は随分バラバラになる…。枠線部分の影響を強く受けてしまっているのだろうか…

その他にも、「銀将」が「桂馬」になってしまっていたり、「成桂」が「角行」になってしまっていたり、誤判定が目立つ。

3. 激指 14の局面スクリーンショット

f:id:sugyan:20180501013026p:plain

こちらは学習データの素材とはまったく関連がない画像、のはず。同じ駒でも微妙に模様が違っていたりする。とはいえ最もメジャーな菱湖書体、のものなのかな?

-KI * +TO * -OU * +TO+TO+NY
 * -FU *  *  * +FU *  *  * 
+RY-TO *  * -FU-FU * -FU-KY
 * +HI+HI-GI-KI-TO *  * +RY
-UM-NG+KY+TO * -TO *  *  * 
 * +FU *  *  * -GI-GI+FU * 
 *  * +OU * +TO-GI *  * +HI
 *  *  *  * +TO * +FU * +KY
+RY *  *  *  *  * +KI * -KI

2. のものよりは正答率が高いようだ。

8五 の「成桂」が「成銀」になってしまっているのは似ているから仕方無いかな、という感じはするけど、 7七 の「歩兵」が「王将」になってしまっているのはヒドい。その他にも 1七 の「桂馬」が「飛車」になってしまっていたり、やっぱりちょっと理解できない誤答がある…。

4. Shogi.ioの局面スクリーンショット

f:id:sugyan:20180501013041p:plain

-NG * +TO * -FU * +TO+TO+UM
 * -FU *  *  * +NK *  *  * 
-UM-TO ?  * -FU-FU * -FU-NG
 * +KE+OU-GI-NG-TO *  * +KE
-NG-UM+NG+TO * -TO *  *  * 
 * +NK ?  *  * -GI-GI-UM * 
 *  * +NK * +TO-GI *  * +KE
 *  *  *  * +TO * -UM * +NG
-UM *  *  *  *  * +NG * -NG

なんと、ビックリするくらい当たらない。めちゃくちゃだ。正解しているのは「と金」と後手の「歩兵」くらいか…? 駒の向きさえ正しくない場合がある。

そういえば今回作成したデータセットにはこのような形の「一字駒」を使用していなかった。 まったく学習していないタイプの駒画像なので正解しないのは当然と言えるかもしれない。「と金」だけは普通の書体の駒でも一字のものになるから似ていて正答できているのかもしれない。

この一字駒系はあまりフリー素材としては公開されていないのかな? しかしこうした局面図も正しく認識できるようにするためにはこうした一字駒も学習データに加える必要がありそうだ。

まとめ

一応それっぽく簡単に学習データを作って学習させて分類モデルを作ることは出来た。 が、使えるかどうかというと全然まだまだダメそう。

枠線の影響を受けずにもっとロバストに駒部分を正しく分類できるように、また一字駒などに対しても正しく分類できるよう より良い学習データを作る必要がありそうだ。

分類モデルの再利用については MobileNet V2のもので問題ないか、他のモデルを使った方が良いかどうか? もう少し良質な学習データが揃ってから検証したいところ。

参照

Repository

Google Code Jam 2018: Qualification Round

Google Code Jamに今年もチャレンジ。

(昨年の: Google Code Jam 2017: Qualification Round - すぎゃーんメモ)

今年もRubyで、と思って最初の問題を解いてコード提出してみたのだけど、Runtime Errorになって上手くいかなくて、なんで??と思ってPython3に変更して(結局Runtime Errorは単純なバグだったんだけど) そのまま他の問題もPython3で提出した。

Saving The Universe Again

C (charge) と S (shoot) からなる文字列が与えられ、倍々になって蓄積されるダメージを一定量まで減らすための最低操作回数を求める。

文字列を1つずつ読んでいけばダメージ総量はすぐに求められる。あとはこれをどのように減らしていけるか。 効率的に減らすためには最大ダメージのもの つまり最後方にあるSを直前のCと交換することで つまり1回の操作は「ダメージ最大のものを半分に減らすこと」と考えられる。 初期値のダメージ総量は1回だけ計算すればよくて、あとはshootによるダメージと回数の辞書を作って 最大ダメージのものから選択してその回数を減らし 半分の値の辞書エントリの回数の方に加える。減少させた総量 > ダメージ初期値 - 防御力になるまでそれを繰り返せばよさそう。 足し算引き算を繰り返しても良かったけど、掛け算的に一気に処理もできるな、と思ってそうしてみた

t = int(input())
for i in range(1, t + 1):
    d, p = input().split(' ')
    d = int(d)
    dam = {}
    s = 1
    for c in list(p):
        if c == 'C':
            s *= 2
        if c == 'S':
            dam[s] = dam.get(s, 0) + 1
    s = sum([k * v for k, v in dam.items()])
    a = 0
    while s > d:
        m = max(dam.keys())
        if m == 1:
            a = 'IMPOSSIBLE'
            break
        r = m / 2 * dam[m]
        if s - r > d:
            s -= r
            a += dam[m]
            dam[int(m / 2)] = dam.get(int(m / 2), 0) + dam[m]
            del(dam[m])
        else:
            a += int((s - d) / (m / 2))
            if (s - d) % (m / 2) != 0:
                a += 1
            break
    print('Case #{}: {}'.format(i, a))

Trouble Sort

数値の列が与えられたときに「順番に連続の3つの数字並びを見て 左端より右端の数値の方が大きくなるようswapする」という操作を繰り返すsortingを行ったときに、正しくsortされない箇所を求める。

実際にこれを実行するとそれなりに実行時間がかかるはずなのでシミュレーションして求める、のはダメそう。 とはいえどうすれば良いか分からなかったので最初は実際にそれを実装してランダムな初期値を与えて実行結果を観察してみた。 先頭に来るべき最小値が0番目じゃなくて1番目に来ることがある… 2番目に小さな値が先頭に来たり… → はっ これは初期状態の位置が偶数番目か奇数番目かによるな → そういえばn番目の要素はn+2番目の要素としか交換しないのか →つまり奇数番目の要素による数値列と偶数番目の要素による数値列が別々にsortされる、と考えれば良いのか、とようやく気付いた。

あとは普通に分けてsortしたものを先頭から比較して見ていって 昇順でなくなるところを検出する。

t = int(input())
for i in range(1, t + 1):
    n = int(input())
    v = [int(e) for e in input().split(' ', n)]
    a = 'OK'
    e, o = [], []
    for j, val in enumerate(v):
        if j % 2 == 0:
            e.append(val)
        else:
            o.append(val)
    e.sort()
    o.sort()
    for j in range(len(o)):
        if e[j] > o[j]:
            a = j * 2
            break
        if j < len(e) - 1 and o[j] > e[j + 1]:
            a = j * 2 + 1
            break
    print('Case #{}: {}'.format(i, a))

Test set 2 では 3 <= N <= 10 ** 5 とのことで10万要素あると普通のsortも結構時間かかってダメかも…?と思ったけど それ以上の方法が思い付かなかったのでこのまま提出。

Go, Gopher!

なんか問題が複雑そうで読むのもつらかったので捨て…

Cubic UFO

立方体による影の面積が指定された値になるよう回転させたときの各面の座標を求めよ、というもの

すごい難しそう、とは思ったものの、Test set 1は 1.000000 ≤ A ≤ 1.414213という制約があったので、0°から45°までのz軸固定の回転のみで考えても良さそう。 Aの面積は cos(回転角 - 45°) * sqrt(2)で算出できるのは単純な計算で分かるので、それを解いて必要な回転角度を出して、それをもとに座標を出す。

import math

t = int(input())
for i in range(1, t + 1):
    a = float(input())
    r = 45.0 * math.pi / 180.0 - math.acos(a / math.sqrt(2))
    print('Case #{}:'.format(i))
    print('{:.8f} {:.8f} 0.0'.format(0.5 * math.cos(r), 0.5 * math.sin(r)))
    print('{:.8f} {:.8f} 0.0'.format(-0.5 * math.sin(r), 0.5 * math.cos(r)))
    print('0.0 0.0 0.5')

明らかにTest set 1のみのための解法でTest set 2には絶対通らないけど、これで提出

結果

"Saving The Universe Again", "Trouble Sort" は無事にどちらも正解だったようで、"Cubic UFO"のTest set 1の正解もあわせて合計49ptで通過した模様。

次のRoundはもう全然通過できる気がしないけど頑張ろう…

YAPC::Okinawa 2018 に参加した

yapcjapan.org

一昨年のHokkaido、昨年のKansai、Fukuokaと参加できていなかったので YAPC::Japanのイベントは今回が初参加。 沖縄こそは行きたい! と思っていたので、ついに参加することができて嬉しいです。

もうPerlは全然書かなくなってしまったけど、自分のエンジニアコミュニティの入り口というか多くの出会いはこの"YAPC"からだし、僕の大事なふるさと。

今回も色々な人たちのお話を聞いたり、懐かしい人たちと再会できたり、新しい人とも出会えたり、とても良い時間を過ごせて楽しかったです。

前夜祭はうっかりチケット買いそびれて参加できなかったので、本編のみ。バス移動の早起きはつらかったけど どうにか間に合った。

印象的だったトークのメモとか

  • HTTP/2で速くなるときとならない時 (kazuhoさん)

    • HTTP/2は通信多重化してデータ送受信できるしHTTP/1より速いだろう、と断定しがちだが 場合によってはそうでないこともある
    • パケットロスが起きる場合など様々な環境できちんと計測して調べることの重要性
  • 曖昧な正規表現と文脈自由文法の話 (sinya8282さん)

  • Perlを中心としたワンライナーあれこれ (tecklさん)

    • よくワンライナー書いていて好きだったので これは聴きたい、と思い
    • でも僕がやってたネタのようなものじゃなくてとても実用的で使えるワンライナー集だった
    • 忘れかけていたけど何やってるかだいたい理解できたのでよかった
  • 2018年春のPerl (karupaneruraさん)

    • charsbarさんがこれまでやってきていた取り組みを支えていきたい、というkarupaneruraさんの想いが素晴らしい
    • 大変だと思いますが続けていっていただけると本当にステキだと思いました
  • Perl's work inside the company (nipotanさん、kazeburoさん、n0442さん、hirataraさん)

    • nipotanさんのLINE NEWSの話とか 身近にいながら全然知らなかったので こういう機会に聴けて良かったw
  • Keynote (Yappoさん)

    • 僕とかおじさん世代のヒトたちには懐かしい思い出がいっぱいでエモいトークでしたね
    • 若い人たちは半分くらい何言ってんのか分かんなかったのでは…という心配もあるけどw

あとはpapixさんのスポンサーセッション?でブログを書くことについてのアツい想いを語ったのとかすごく良かったです。LTも面白かった!

2トラックのセッションあって気になってはいたものの聴けなかったトークもあり。 mackee_wさんの「High (Availability|Performance) WebSocket for Perl Real-Time Application」の話とか 途中からちょろっと覗いただけだったのでよく分からず「ゴーのミドルウェアの話だった」とか書いてしまったけど、後から公開された資料とか読んでみたらリアルタイムアプリケーションを動かす上での課題から それを解決するための方法、設計 そして実践投入するためにやってきたことなど 色々なものが詰まっていてすごかった。

振り返り

当日は朝からあいにくの天気で残念だったけど、

まぁ天気よかったら話きかずに外の景色を楽しんでしまっていたかもしれないし、日中は土砂降りでかえって良かったのかも…。

夕方には晴れて、懇親会では都会では見られないようなステキな星空を見上げながらオリオンビール飲んで若者たちと交流できたり、すごく良かった。 Perlは書いてないけどGoで詰将棋のプログラム書いてるのでその話しにまた沖縄行きます!とか勢いで言っちゃったので実現できるよう頑張ろうと思います。

本当にありがとうございました!!次も行けるといいな

というわけで

df-pnアルゴリズムを用いた詰将棋Solverによる最善解・余詰の導出

以前書いた、詰将棋問題生成の続き。

memo.sugyan.com

逆算による詰将棋の問題生成の方法自体は悪くないとして (バグによって有り得ない局面が出来上がったりしてしまったりもしたけど)、正しく詰将棋問題として成立するものが出来上がっているかどうかを検証するためのSolverが必要不可欠であり、これのパフォーマンスが生成のパフォーマンスにも影響してくる、というようなことを書いた。

実際、前回の記事のときに実装したSolverでは

  • 総当たり的に探索するのは3〜5手が限界
    • 詰将棋のルールに則る動きに限定しても、有り得る局面は指数関数的に増加する
  • 合駒が絡む問題に対して正しく解が導けないことがある
    • 先の展開まで読まないと無駄な合駒かどうかの判定ができない

といった問題があった。

df-pnアルゴリズムによる探索

2002年の論文「df-pn アルゴリズムの詰将棋を解くプログラムへの応用」がとても有名なもののようだ。

ざっくり、概要というか考え方について書いてみる

AND/OR Tree

まず、指し手とそれによる局面の変化を "AND/OR Tree" で考える。

例えば前回の記事で作成した問題に対しては、2手目までで以下のようなパターンが考えられる。

f:id:sugyan:20171119212134p:plain

├─┬ ▲4三金
│ ├── △3五玉
│ ├── △4五玉
│ └── △5四玉
├─┬ ▲3四金
│ ├── △4五玉
│ └── △5四玉
├── ▲3四竜
├─┬ ▲2四竜
│ ├── △4五玉
│ ├── △3四歩
│ ├── △3四香
│ ├── △3四桂
│ ├── △3四銀
│ ├── △3四金
│ ├── △3四角
│ └── △3四飛
└─┬ ▲5五銀
  ├── △同玉
  ├── △3五玉
  └── △4五玉

「詰み」をTrue、「不詰」をFalseと考えると、攻方手番の局面は「OR節点」、玉方手番による局面は「AND節点」となる。そして深さが変わるごとに「OR節点」⇔「AND節点」を交互に繰り返すことになる。

すなわち、上図での根(攻方手番)では「どれか1つの子節点でもTrueになればTrue、すべての子節点がFalseならばFalse」であり、その子となる深さ1の節点(玉方手番)では「すべての子節点がTrueになればTrueになり、どれか1つでも子節点がFalseならFalse」となる。

これはまさに詰将棋のルール通りで「どうやっても詰むことが出来なくなったら不詰」「どう応じられても詰ますことが出来れば詰み」といったものを 論理演算のAND/ORで表現している。

なので、最終的に上述のAND/OR Treeの根がTrueであることを証明できれば、それが詰将棋問題の解答に繋がる、ということになる。

単純に考えると すべての節点をどんどん掘っていって その節点が葉となったら(つまりもう子節点が作れない)、動かせる有効な手が存在しないということなので それが攻方手番ならTrue、玉方手番ならFalseである、と確定できる。 しかし全節点を探索していくとなるとその探索空間は非常に膨大なものになり、特に不詰の場合は玉がどこまでも逃げていく局面があったりして、大変な計算時間になってしまう。

AND/OR Treeは上記の性質上、どこか1つの節点の真偽値が確定するとその時点で親節点の真偽値も確定し 他の兄弟節点について調べる必要がなくなる場合もあるので、探索の方法次第では効率的に根の真偽値を確定させることができる。根の真偽値を確定させるだけなら、すべての節点についてTrueFalseであるかをハッキリさせる必要がなく、不確定の節点があっても構わない。 上の例で示した問題の場合、1手詰で▲3四竜の場合にもう玉方の応手が無く「詰み (True)」になっているので 他の手から伸びていく深さ2以上の節点は調べるまでもなく「詰み」であることを言える。

df-pnアルゴリズム

df-pnアルゴリズムでは、このAND/OR Treeを効率良く探索するために、「証明数 (proof number)」「反証数 (disproof number)」という概念を用いる。簡単に言うと「その節点がTrueであることを証明するためにTrueであることを証明しないといけない節点の数」、またはその逆、という感じで、詰将棋でいうと「この手を指して詰むことを証明するためには、玉方の応手この手とこの手が考えられて…これらの局面でも詰むことを証明できなければ…」というパターン数のようなものになる。

これらの値を各節点で計算し、より値の小さいもの(その局面の真偽値を確定させるために証明すべき節点の数が少ないもの)を優先的に深さ優先で探索していく。これが depth first proof number (df-pn) アルゴリズムの基本的な考え方、のようだ。 実際に詰将棋を解くときも、まずは応手が限られる手から考えたり それに対する応手も最悪のパターンを優先的に考えたりすると思うし、直感的に近いものはあるんじゃないだろうか。

ポイントとしては、必ずしもすべて節点の真偽値を確定させる必要はなく、「詰み (True)」「不詰 (False)」「未知 (Unknown)」の3つの状態が許されると考えて木を探索していき、その上で簡単に確定させることが出来そうな未知節点から優先的にさらに深く探索していく、というところ。

このアルゴリズムに用いられている手法により、探索の結果として各節点の証明数・反証数が更新されていく。証明数が∞になると「この節点がTrueであると証明するために証明すべき節点が無限にある → つまりこの節点はFalse」、反証数の場合はその逆で、これらの値によって節点の真偽値を確定させることが出来る。

論文内では、単純にそれだけではなく ハッシュ(というかハッシュテーブル、辞書?)を用いて、同一局面が現れる場合の無駄な探索を減らし、それらに加えて局面がループするときに起きるGHI (Graph History Interaction)問題に対する対処などの工夫を入れている。

実装

論文の付録に疑似コードが載っているので、これをそのままコードに落とし込めば ほぼ問題ない、はず (全然簡単には出来なかったけど、、)。

先行例としては以下のものなどを参考にさせていただきました。

自分はGoで将棋プログラムを書いていたのでGoで。

df-pn Solverの特徴・制約

ということで、df-pn Solverを実装することで詰将棋の問題を正確に速く解くことが出来るように… と言いたいところだけど、これですべて解決というわけではない。

df-pnアルゴリズムによって解くのはあくまでも『その局面が「詰み」か「不詰」かを判定し、その証明ができる』だけであって、『詰将棋の問題に対する正解を導ける』わけではない。

例えば、以下のような問題。

f:id:sugyan:20180217212826p:plain

これを自作のdf-pn Solverを使って解くと、以下のようにAND/OR Treeと各節点の真偽値が得られる。

├─┬ ▲3二と (?)
│ ├── ...
│ └── ...
├─┬ ▲2二と (?)
│ └── ...
├─┬ ▲2二銀 (?)
│ └── ...
├─┬ ▲1二銀 (?)
│ ├── ...
│ ├── ...
│ └── ...
└─┬ ▲3二銀 (T)
  └─┬ △1二玉 (T)
    ├── ▲2一銀 (?)
    ├─┬ ▲2三銀成 (T)
    │ └─┬ △2一玉 (T)
    │   ├── ▲3二と (T)
    │   ├── ▲2二と (?)
    │   ├── ▲2二成銀 (?)
    │   ├── ▲1二成銀 (?)
    │   └── ▲3二成銀 (?)
    ├── ▲2三銀 (?)
    ├── ▲2三と (?)
    └── ▲2二と (?)

()内に書いてあるのが?は真偽未確定、TTrueFFalseを表す、とする。 ...で省略しているけど実際にはもう少し他の1手目に対しても深く探索されている。

結果としてこの得られた木のTrue節点を辿っていくことで「▲3二銀 △1二玉 ▲2三銀成 △2一玉 ▲3二と」という5手の詰み手順を得ることができる。 これは確かに詰んでいるのだけど、「正解ではない」。

実際は「▲3二銀 △1二玉 ▲2三と」までの3手詰で、これが最短の正解となる。

df-pnアルゴリズムは性質上、各節点の証明数・反証数を計算し対象となる節点の値が閾値を超えた時点で探索を終了する。つまり多くの場合は根の真偽値が確定した時点で終了となり、当然ながら現れている全部の節点について探索を行うわけではない。

ので、このように深さ優先で探索した結果、3手目の▲2三との節点を探索する前に▲2三銀成の節点から詰みを見つけてしまい、それで終了してしまう、ということがある。

おそらく指し将棋プログラムでの思考ルーチンとして組み込むぶんにはそれほど問題なく使えると思うのだけど(指し将棋の目的は通常「勝つこと」なので 最短の手順である必要はない)、詰将棋においては「最短で最善の手順で詰む(玉方も最長になるよう最善の手で応じる)」ということを踏まえて解を導かなければならない。 単純に探索終了したら正解が導ける、というものでもないようだ。


また、「不詰」の証明にも少し弱いようで(これは自分の実装のミスか何かなのかもしれないけど)、例えば以下のような局面を解かせようとすると、いつまでも計算が終わらない。

f:id:sugyan:20180217220442p:plain

df-pnアルゴリズムは各節点での証明数・反証数が閾値を超えると一度探索を止めて その閾値以下の別の節点の探索に移り また別のところで閾値が上がるとそこに戻ってきたりする、という仕組みになっているようなのだけど、どうも上記の局面などの場合 別々の節点から辿り着く複数の節点がそれぞれ閾値をお互いに上げていってしまい どちらも譲らずどこまでも証明数やその閾値が増加しつづけていって止まらなくなってしまうようだ。

最善解の導出

…と、ここまで書いてきたところで ここからが本題。

df-pn Solverは完璧に答えを出すことが出来るものではないけど、効率的に探索を行い特定の問題を解くのには利用できるので、これを応用することで「詰将棋の解答プログラム」を作ることを考えた。

別解を探す

先述のように、実際は3手で詰めるものに対して5手で詰む解を導いてしまう、といったことが起きる。 これを回避するためには「実際に他の手でもっと短く詰むことが出来ないかを調べる」ということになる。

単純には、探索が終了してもまだ真偽値が確定していない節点に対して再度Solverにかけて探索していく、という手法をとることになるけど、結局それだと総当たりで調べるのとほぼ変わらなくて効率が良くない。

一つ大事なのは、これを探し始める時点で1つの正解候補となる解答が導出できているので、「それより短い手順で解ける手はないか」だけを探索すれば良いことになる。 既に導いている解より長くなる詰み手順を見つけても無意味なので どこまでも深く探索する必要はなく、ある閾値以上の深さに辿り着いた時点で「これは不詰」と扱って探索を打ち切ってしまっても良い。

既に見つけている正解手順以外の手を探索した結果 より短い詰み手順を見つけることが出来たら、正解候補を更新して またもっと短い手順が無いか探索する。こうしていくことで探索の深さ上限を小さくしながらすべての未確定節点の真偽値を確かめていける。

また大事なのは探索対象の未知節点を選ぶ順番で、例えば1手目から正解候補と違う節点を調べようとしても大抵は不詰なので(いわゆる完全作の場合、正解手順は1つのみで最終手以外に余詰は存在しない)、探索空間が大きく無駄に時間がかかってしまう。 先にもっと短い解候補が発見できていれば探索深さの上限を下げられるので効率が良くなる。

なので基本的には深さ優先で、その時点で発見できている正解候補の手順の周辺の未知節点から探索していくのが効率が良い。7手の詰みが見つかっているのならその手順を辿りつつ別の5手目、別の3手目、という順番で選択して探索していくと より速く短い解候補が発見できる。はず。 その結果として例えば3手で詰む手順が発見できれば、1手目で違う手順を探索する(不詰であることを証明することになる)場合にはもう4手目以降の深さは考慮せず打ち切ることができる。

また、先述のように不詰の証明は計算が終わらない事態が起こることがある。 これについてはどうしようもなかったので、計算時に用いる∞の値をギリギリまで小さくすることで回避した。 正しく「詰み」を導くことができる場合は証明数はせいぜい数百〜千程度の値までしか上がらないので、それより少し大きな値を∞としておけば 先述のような状態になってもいずれは「証明数が∞に到達した」となり、不詰として扱うことができるようになる。 現時点で自分で試している限りでは 2^12 (= 4096) くらいの値にしていても大抵の7〜9手くらいまでは問題なく解けている。

最善経路の探索

Solverによる探索によって新規に詰みの手順が見つかるごとに、「最短・最善の解候補」を算出することになる。

解となる詰み手順は、基本的には探索後のAND/OR Treeの根からTrueの子節点を選択して葉に到達するまで下っていくことで求められる。 とはいえその経路は一意ではなく 複数の兄弟節点で詰みに繋がる場合も多い。特に玉方手番は「すべての子節点がTrueになればTrueになり、どれか1つでも子節点がFalseならFalse」というAND節点であり、どう応じられても詰むことができる ということが確定していない限りTrueにはならないので、玉方に複数の応手がある場合はすべてが詰みに繋がるようになっているはず。

ではどの経路を正解候補として選択するか。 詰将棋のルールとして「玉方は最善を尽くし、最も長く手数がかかるように逃げる」そして攻方は「最善・最短で詰ます」というのがあるので、これを満たすよう考える。

さらに駒余りも考慮して、すべての節点において

  • 攻方手番で複数の選択肢がある場合は最も手数が短くなる経路
    • 同手数のものが複数ある場合は駒を余らせる方を優先
  • 玉方手番で複数の選択肢がある場合は最も手数が長くなる経路
    • 同手数のものが複数ある場合は駒を余らせない方を優先

を選択するようにして、再帰的に求めていく。

合駒問題

このあたりで登場してくるのが「無駄合駒判定」の問題。

なかなか厄介なルールで、玉方による合駒が有効な場合と無効な場合がある。 すぐに取られて王手が続くなら無効、ということで「玉方の利きが無い位置に打つのはすべて無駄」と考えて そういう場所に合駒できない前提で解いてしまうのがラク… だと思いきや、そうとも限らなくて。 有名なのは看寿賞作品の3手詰「新たなる殺意」のような場合。

f:id:sugyan:20180218183219p:plain

ここに▲7三香成としたときの△8六歩は玉方の利きもなく無駄…に思えるのに、▲同角により9筋が開くことによって最終的に詰みが無くなってしまう。有効な合駒となる。

こういう場合もあるので、応手の候補を考える段階ではまだ「ここには合駒できない」という明確な判断はできず、結局すべての可能性を考慮した上で「合駒で打った駒がすぐ取られて、最終的にその駒を余らせて詰ますことができる」場合は無駄合駒、と判断するしかない。

なので、AND/OR Treeの探索の時点ではとりあえず合駒も含めてすべて探索し、その上で解答候補の詰み手順を探索する段階で 合駒が無効なものであったかどうかを判断する必要がある。

再帰的に解答を探索している中で、「玉方合駒 → 攻方同取る」があった場合には最終的な駒余りを確認し、その合駒に打ったものが余る場合は解答候補から外す。このようにすることで無駄合駒の絡む問題に対しても正しく解を導けるようになる。

ただしそれはつまり、必ずしも「節点の深さ=詰み手数」にはならない、ということで、ちょっと例が見つからなかったけど「先に5手の詰みが見つかったが、実は3手の合駒利かずの詰みがある」といった場合に、先に見つかった5手を探索深さの上限値として打ち切る設定で他の手を探索すると 「連続無駄合駒により深さ7で詰むが結果的には3手の詰み手順となる」といった手順を見つけられなくなってしまう。 ので、合駒を含む経路に現れる節点は例外として上限値よりもさらに深いところまで探索する、といった処理を入れるようにした。

余詰の検出

こうしてAND/OR Treeを使った「最善解の導出」が出来ると、同じように「余詰の検出」も簡単に行うことが出来る。

詰将棋の問題生成におけるSolverの重要性は前記事に書いた通りで、生成された問題が

  • 正しく意図した手数で詰ますことができる
  • 駒余りが無い
  • 余詰が無い

ことをチェックする必要がある。 最善解を導く過程で最短手数以内の全節点についての真偽値は確定しているはずなので、同様に根から探索しつつ「攻方に同手数で解ける複数の解が存在しないか」をチェックしていくだけ。 1手詰め以外の場合の最終手は複数あっても良いのでそこだけは許容する。

問題生成への応用

こうして、最善解・余詰をそれなりに速く正確に導くことができると 5手詰や7手詰の問題の場合でも前記事で書いたような問題生成ロジックで「正しく詰将棋の問題として成立しているか」をチェックしていくことができる。

ただしどうしても先述のように不詰の証明でループし続けたり、合駒の探索が余計な深さを必要とするなどで解答に異常に時間がかかってしまう場合がある。 なのでtimeoutを設定し、解答に一定以上の時間がかかってしまった場合はそこで「この問題は解けない」と判定して次のパターンで生成を試みるように、とした。

時間はかかるが、とりあえず前記事と同様のロジックで5手詰の問題も生成できるようになったようだ。

今後の課題や展望など

評価・解説

AND/OR Treeを丁寧に探索することで、「どれくらいのパターン数の手を考慮することでこの解答を導くことが出来たか」を測ることができる。 攻方も玉方もほぼ1手に絞られるような簡単な問題なら総節点数の少ない小さな木になるし、多くの選択肢が考えられる場合は幅の広い木になる。 この木を作る過程で考慮した節点の数を「問題の難しさの指標」として使うことができそうだ。

未知のままでも兄弟節点の真偽により親節点の真偽が確定している場合などは考慮しなくて良いので、基本的には真偽値が確定している節点の数だけを数え上げれば良いことになる。 ただし単純にそれだけを考えると、持駒に飛車角があって離れた場所にも打つことが出来る場合や 玉方が合駒する場合など(最大で7種類を合駒の候補として考えられる)に どうしても選択肢が増えるので、そういう場合には間引きするように少し調整してみている。

とりあえずはこうして得られる「スコア」を各問題の生成時に一緒に計算しておき、@などではそれなりにスコアの高めのものを優先的に選択して出題するようにしてみている(…つもりなんだけど、あんまりうまくいってないかも)。

問題が「良問かどうか」というのは探索量などとはまた別の評価指標もありそうなので、このへんも研究して より「良い問題」も量産できるようにしていきたいところ。


もう一つAND/OR Treeを有効利用できる点として、その木を見ることで「この手はどうして詰むか、この手だとどうして詰まないか」が説明できるようになる、というのが挙げられる。 つまり解いた問題に対して 例えばFalseの節点とその子節点を見れば「▲3二と▲2二とには△同玉▲2二銀には△1二玉で詰みません」、Trueの節点を辿れば「▲3二銀が正解で△1二玉の一手、これに△2三とで詰みです」という文章を、ある程度のテンプレートを用意しておけば作れる、はず。

まだ出来ていないけど、生成した問題にそれぞれこういった解説文をつけられると良さそうだな、と思っている。

計算速度

問題生成時にはランダムな詰み局面を生成してからそれを巻き戻して局面を作って それが詰将棋として成り立っているかどうかをチェックする…、というのを繰り返していて、運次第ではあるけど それなりに時間がかかる。

現状、3手詰くらいなら数秒〜数十秒程度で生成できるが、5手詰めは相当運が良くない限りは数分〜十数分計算し続けてようやく出来上がる、といった感じになってしまっている。 生成するたびにかけるSolverが速く解いてチェックしてくれればそれだけ生成速度も上がるはずではあるのだけど、まだまだパフォーマンス不足なようだ。 1.0秒でtimeoutする設定にしているけど、それも頻繁に起きている。

先述のように、合駒が絡む場合にどうしても探索の幅・深さが深くなって計算時間がかかってしまう問題がある。 いちいちすべての合駒パターンを考慮していると膨大な計算時間になってしまうので、これは減らす必要がありそうだ。 これには引用論文にもある「優越関係の利用」が使えそうで、ある局面が詰むならそれより持駒の少ないこれらの局面も詰みである、といったことが言えるようになるので 計算量を減らすことができそうだ。

あとはまぁ 自分が書きやすく管理しやすいGoでゼロから書いてみていたけど、やはりパフォーマンスを考慮するならC++とかで良いライブラリを利用して作るのが良いのかもしれない…とか。 それで実際どれくらい速くできるのか分からないけど。

生成ロジック自体もまだ無駄な計算を減らせる部分があると思うので、じっくり見直して改善していきたい。

ISUCON7 勝てなかった

ISUCON7id:kazeburo さんと id:gfx さんと、チーム「スギャブロエックス」で出場して、入賞もならず、最終結果5位で終わりました。

f:id:sugyan:20171127124436p:plain

うーん 悔しい。

お題

WebSocketで通信しつつ各ルームで複数のユーザが操作するので同期をとりながら値を更新していく、的なもの。

チームの動き

予選のときと同様に、言語はNode.jsを選択。 今回はTypeScriptを使うようにしていっても良いのでは、と gfxの発案で序盤にすべてのコードをTypeScriptに変換して それを中心に進めていくことに。

実際、TSLintが良く効いて構文エラーや簡単な型の不一致などはすぐに気付くことができて編集しやすくて良かったです。

序盤の各サーバへのログインやNodeのバージョン変更、deployのためのscriptなどの環境整備はお2人に任せて、自分はとにかくアプリケーションをまずgitリポジトリに入れて ローカル環境で動かせることの確認、コードの確認など。

MySQLトランザクションを使っている部分などは正直あまり詳しく分からなかったので そこらへんの解消はkazeburoさんにお任せして、とりあえずは 参照だけで更新のない数レコードだけのm_itemテーブルを使わないよう定数化して排除する、くらいのところから始める。

トランザクションによる負荷を少しでも軽減させるために、と各サーバでMySQLも立ち上げて 同じroomへのリクエストは単一のサーバにリクエストが集まるように4台それぞれ分けて動かすように kazeburoさんによる変更も。

1000回ループ問題

昼御飯の時間あたりから CPUの高負荷をとにかく取り除きたいな…と思いながらコード見ていて calcStatus関数の中で1000回のforループを繰り返しているのに気付いて、そこを消そう…!と思い至って そこを完全に任せてもらい改善の作業をすることに。

元のコードはこんな感じ。

  calcStatus (currentTime, mItems, addings, buyings) {

    ...

    // currentTime から 1000 ミリ秒先までシミュレーションする
    for (let t = currentTime + 1; t <= currentTime + 1000; t++) {
      totalMilliIsu = totalMilliIsu.add(totalPower)
      let updated = false

      // 時刻 t で発生する adding を計算する
      if (addingAt[t]) {
        let a = addingAt[t]
        updated = true
        totalMilliIsu = totalMilliIsu.add(bigint(a.isu).mul(bigint('1000')))
      }

      // 時刻 t で発生する buying を計算する
      if (buyingAt[t]) {
        updated = true
        const updatedID = {}
        for (let b of buyingAt[t]) {
          const m = mItems[b.item_id]
          updatedID[b.item_id] = true
          itemBuilt[b.item_id] = itemBuilt[b.item_id] ? itemBuilt[b.item_id] + 1 : 1
          const power = m.getPower(b.ordinal)
          itemPower[b.item_id] = itemPower[b.item_id].add(power)
          totalPower = totalPower.add(power)
        }
        for (let id in updatedID) {
          itemBuilding[id].push({
            time:        t,
            count_built: itemBuilt[id],
            power:       this.big2exp(itemPower[id]),
          })
        }
      }

      if (updated) {
        schedule.push({
          time:        t,
          milli_isu:   this.big2exp(totalMilliIsu),
          total_power: this.big2exp(totalPower),
        })
      }

      // 時刻 t で購入可能になったアイテムを記録する
      for (let itemId in mItems) {
        if (typeof itemOnSale[itemId] !== 'undefined') {
          continue;
        }
        if (0 <= totalMilliIsu.cmp(itemPrice[itemId].mul(bigint('1000')))) {
          itemOnSale[itemId] = t
        }
      }
    }

    ...
  }

currentTimeより以後のaddingAt, buyingAtの情報を元に、1000ms後の状態をシミュレーションする、というもの。

addingAtbuyingAtも無ければ、少なくとも totalMilliIsuの値の計算はtotalPowerの値を足すのを繰り返すだけなのだから 掛け算に変換すればいいだけ。 ただ それによって途中でアイテムが購入可能になったらその時刻は記録しておかないといけない、というのがあるので それだけは逆算して割り算で求める。

まずは 「addingAtbuyingAtも無ければ」という条件に限定して上記の変更をしてみたが、(実際どうだったか確かめていないけど)殆ど効果なかったようで、やはりaddingAt, buyingAtがあるときも処理していかなくては、と大幅に変更をしていく。

基本的な考え方は 「addingAtがあった時刻」or「buyingAtがあった時刻」だけ必要な更新をして、その間の区間totalPowerが加算されるだけだから掛け算で加えていく、という形。

最終的なコードは以下のようになった。

  calcStatus(currentTime, mItems, addings, buyings) {

    ...

    const eventTimes = new Set<number>(); // adding or buying times

    for (let a of addings) {
      if (a.time <= currentTime) {
        ...
      } else {
        ...
        eventTimes.add(a.time);
      }
    }

    for (let b of buyings) {
      ...
      if (b.time <= currentTime) {
        ...
      } else {
        ...
        eventTimes.add(b.time);
      }
    }

    // イベントが起きる時間だけ時系列に列挙する
    eventTimes.add(currentTime + 1000)

    let prevTime: number = currentTime
    Array.from(eventTimes).sort((a, b) => a - b).forEach((time) => {
      // 1000以上は計算する必要なし
      if (time > currentTime + 1000) {
        return
      }
      // 直前の状態は保持しておく
      const prevToatalMilliIsu = totalMilliIsu
      totalMilliIsu = totalMilliIsu.add(totalPower.mul(Number(time) - prevTime))
      if (addingAt[time]) {
        let a = addingAt[time]
        totalMilliIsu = totalMilliIsu.add(a.isu + "000")
      }
      // 購入可能になっていたらその時刻を逆算
      for (let itemId in mItems) {
        if (typeof itemOnSale[itemId] !== 'undefined') {
          continue;
        }
        if (0 <= totalMilliIsu.cmp(itemPriceX1000[itemId])) {
          if (totalPower.cmp(BI0) == 0) {
            itemOnSale[itemId] = time
          } else {
            const diff = itemPriceX1000[itemId].sub(prevToatalMilliIsu)
            const div = diff.div(totalPower)
            const mod = diff.mod(totalPower)
            const t = mod.eq(BI0) ? div.toNumber() : div.add(1).toNumber()
            if (prevTime + t < time) {
              itemOnSale[itemId] = prevTime + t
            } else {
              itemOnSale[itemId] = time
            }
          }
        }
      }
      if (buyingAt[time]) {
        const updatedID = {}
        for (let b of buyingAt[time]) {
          const m = mItems[b.item_id]
          updatedID[b.item_id] = true
          itemBuilt[b.item_id] = itemBuilt[b.item_id] ? itemBuilt[b.item_id] + 1 : 1
          const power = m.getPower(b.ordinal)
          itemPower[b.item_id] = itemPower[b.item_id].add(power)
          totalPower = totalPower.add(power)
        }
        for (let id in updatedID) {
          itemBuilding[id].push({
            time: time,
            count_built: itemBuilt[id],
            power: this.big2exp(itemPower[id]),
          })
        }
      }
      if (time != currentTime + 1000) {
        schedule.push({
          time: time,
          milli_isu: this.big2exp(totalMilliIsu),
          total_power: this.big2exp(totalPower),
        })
      }
      prevTime = Number(time)
    });

    ...
  }

currentTimeからその1000ms後までの区間で、addingAtbuyingAtがあった時刻を時系列に並べ、その区間ごとに前のイベント時刻との差分を使って掛け算でtotalMilliIsuに加える値を計算。

なんとなく実装して 用意されていたユニットテスト(めっちゃ有り難かった)も通った、のが14:40。 これでどうだ!と走らせてみると

負荷走行前のバリデーションに失敗しました。time = 1511588407211 の status において on_sale[item_id = 5].time が正しくありません : actual 1511588407999, expected 1511588408000

!!! 1msずれてる !!!

どっかで境界値バグがあったか… と試しにitemOnSaletimeの値を+1してみたりしたけどやっぱり通らず(当たり前か。この時間は無駄だったな…)、思った以上にバリデーションが細かくチェックしていた。

うーん うーん と唸りながらコードを見直して totalMilliIsuの更新タイミングと購入可能チェックのタイミングが順番がおかしかった…と思って 直したり

負荷走行中のバリデーションに失敗しました。room ROOMmVe0gsm8OU7zKMKUGNOQ にて schedule に同一の時刻が含まれています

と出て そうかaddingAtbuyingAtは同時刻に起こることもあるから単純に混ぜるんじゃなくてuniqueにしなきゃダメじゃん、ってなったり

あとはon_saleの時刻がどうしても大きくズレることがあって まとめて加算後に購入可能になったときitemPrice - prevToatalMilliIsutotalPowerで割ってprevTimeに足すだけだと思ってたけど 実はtotalPowerの値は小さくてaddingAtで大きく加算されたことによって購入可能になった場合もあるな、ということに気付いて直し。

さらに割り算も単純に割ってたらダメで小数は切り上げないといけない。これが境界値バグだった、のに気付いたのは一番最後だった。しかしbigintではMath.ceilは出来ないのか… divmodで分けて計算して、mod0より大きかったらdiv+1に…と修正。

とにかくことごとく勝手に罠にハマりまくって、ようやくちゃんと通ったのが 16:15頃。結局3時間くらいここにかかってしまった…。 とりあえずスコアは20,000点を越えるようにはなったけど、そこまですごいブレイクスルーにはならなかった。。

終盤

あとは無駄な部分を少しずつ潰しておこう、と itemのgetPrice(count)getPrice(count)countの値が同じなら同じ値を返すだけだしキャッシュしていこう、と一度計算したものは記憶させてそれを返すように、など。 あとで気付いたけどこういうのはinitialize時にある程度の数まで計算してしまえば走行中に計算するのは1回も無くなるんだった。

結局あまり時間も残っておらず そこから大幅な改善も出来ず、あとは2人の作業を見守るだけ、みたいな感じになってしまった。

再起動テストや微調整などを見守り、最高スコアは18:03に29,944を記録していたけど、それ以上の数字は出ないまま フィニッシュ。

反省

インフラよりアプリケーション寄りの問題だったので、主にアプリケーション担当として参加させてもらっている以上はもっとバリューを出していかなければダメだった。

もっと素早く正確にコードの変更をガンガン進めていければ インメモリ化してMySQLから離れる施策も検討できたかもしれないし、他にもチューニングしていける箇所は改善していけたはず。

1000回ループ問題も そこまで効果をちゃんと考えずに取り組んで すごい時間かかってしまったし…。 (このへん 競プロ勢のヒトとかだったらもっと簡単にサクっと解決できていたんだろうなー とか)

そもそもbigintの演算がどれくらい負荷になっていて どこを最優先で減らしていくべきかをちゃんと分析できていなかった、というのもある。 もっと各所でどんな値が流れていてどんな処理が時間とCPUを食っているか、を意識して調査しながら解決していく姿勢・技術が必要だった。

などなど、アプリケーションエンジニアとしての力不足を痛感する敗戦でした。 この先もまだこういう道で生き残っていきたいし、ちゃんと勉強して精進していかねばなー… と思ったのでした。

まとめ

  • やれるだけのことはやった
    • けど力不足でした

懇親会では色んな方と反省会しながらお話できて良かったです。ISUCONケーキめっちゃ美味しかった。

f:id:sugyan:20171125192629j:plain

運営・出題の皆様、主催・協賛の皆様、そして一緒にチームを組んで戦ってくださった id:kazeburoさん、id:gfx氏、本当にありがとうございました!

f:id:sugyan:20171125234916j:plain