Tatsumakiアプリケーションを作ってみる

Tatsumakiを使ったアプリの例は幾つか見たことはあったのだけど、

実際に自分で書いたことはなくて、結構分からないことが多かったので、自分でもサンプルアプリを作ってみた。やっぱり実際に書きながら試したりする方が理解できる…。
Tatsumakiにあるchatアプリの例を参考にしながら。
Tatsumaki/eg/chat at master · miyagawa/Tatsumaki · GitHub


作ったのはアクセスログ(timestamp, useragent)をリアルタイムにブラウザ表示するもの。
https://github.com/sugyan/TatsumakiSample


psgiではTatsumaki::Applicationでパスとハンドラの対応を指定。

#!/usr/bin/env perl
use strict;
use warnings;

use FindBin::libs;
use IndexHandler;
use StreamHandler;
use Tatsumaki::Application;

my $app = Tatsumaki::Application->new([
    '/'       => 'IndexHandler',
    '/stream' => 'StreamHandler',
]);

return $app->psgi_app;


ハンドラでStreamingなことをしない場合は普通にrenderでテンプレートファイルを指定して返すだけ。

package IndexHandler;
use strict;
use warnings;
use parent 'Tatsumaki::Handler';

use Tatsumaki::MessageQueue;

sub get {
    my ($self) = @_;

    my $mq = Tatsumaki::MessageQueue->instance(1);
    $mq->publish({
        type => 'message',
        timestamp => scalar localtime,
        useragent => $self->request->user_agent,
    });
    $self->render('index.html');
}

1;

ここではこのときにTatsumaki::MessageQueueに(instanceのchannelは1に固定)、タイムスタンプとUserAgent情報を詰めてpublishしている。ので"/"にアクセスされるたびにpublishされることになる。


Streamingな部分は、__PACKAGE__->asynchronous(1);とセットし、pollとかpoll_onceとかでMessageQueueからの通知を待ち受ける。

package StreamHandler;
use strict;
use warnings;
use parent 'Tatsumaki::Handler';
__PACKAGE__->asynchronous(1);

use Tatsumaki::MessageQueue;

sub get {
    my ($self) = @_;

    my $mq = Tatsumaki::MessageQueue->instance(1);
    my $client_id = $self->request->param('client_id');
    $mq->poll_once(
        $client_id,
        sub {
            my @events = @_;
            $self->write(\@events);
            $self->finish;
        },
    );
}

1;


クライアント側ではjquery.ev.jsを使ってlong-pollingでサーバからのプッシュを受け取って表示を更新。
GitHub - beppu/jquery-ev: a COMET event loop for jQuery

<!DOCTYPE html>
<html>
  <head>
    <meta charset="utf-8">
    <title>test</title>
    <script type="text/javascript" src="/static/js/jquery-1.6.2.min.js"></script>
    <script type="text/javascript" src="/static/js/jquery.ev.js"></script>
    <script type="text/javascript">
      $(function () {
          $.ev.handlers.message = function (obj) {
              $('#logs').append($('<pre>').append([obj.timestamp, obj.useragent].join(' - ')));
          };
          $.ev.loop('/stream?client_id=' + Math.random());
      });
    </script>
    <style>
      pre { 
        margin: 0; 
        font-family: Courier;
      }
    </style>
  </head>
  <body>
    <div id="logs"></div>
  </body>
</html>

これで出来上がり。"/"にアクセスされるたびに開いている画面すべてでログが追記されていく。
multipart_xhr_pushを使うときはちょっとやり方が変わる。chatデモや、以下を参考に。
Big Sky :: javascriptで動くtwitter streamクライアントを作るならばmultipart/mixedを使うべき


テストはどう書けば良いのだろう? 色々試行錯誤しつつ

use strict;
use warnings;
use Test::More tests => 9;

use AnyEvent;
use AnyEvent::HTTP;
use FindBin;
use Furl;
use JSON::XS 'decode_json';
use Test::TCP qw/empty_port wait_port/;
use Try::Tiny;
use Plack::Runner;
use Proc::Guard;

my ($tatsumaki, $port) = prepare();

my $rand = rand 1;
my $count = 0;

my $polling; $polling = sub {
    my $cv = AE::cv;
    my $w = http_request
        GET => "http://localhost:${port}/stream?client_id=${rand}",
            want_body_handle => 1,
            sub {
                my ($hdl, $hdr) = @_;

                is($hdr->{Status}, 200, '/stream');
                $hdl->push_read(
                    json => sub {
                        my (undef, $json) = @_;
                        is($json->[0]{useragent}, "agent${count}", 'useragent');
                        $cv->send;
                    }
                );
            };
    $cv->recv;
    return if ($count >= 3);
    $polling->();
};

my $cv = AE::cv;
my $w = AE::timer 1, 1, sub {
    $count++;
    my $res = Furl->new(agent => "agent${count}")->get("http://localhost:${port}/");
    ok($res->is_success, '/');
    $cv->send if $count >= 3;
};
$polling->();
$cv->recv;


sub prepare {
    my $psgi = File::Spec->catfile($FindBin::Bin, '..', 'app.psgi');
    my $port = empty_port();
    my $async = proc_guard(
        sub {
            my $runner = Plack::Runner->new;
            $runner->parse_options('-p', $port, '-s', 'Twiggy', $psgi);
            $runner->run;
        }
    );
    wait_port($port);

    return ($async, $port);
}

というかんじで書いてみたけど… クライアント側のlong-pollingをエミュレートしながらテストするのはちょっと大変だなーという印象。

$ prove -v
t/01_stream.t ..
1..9
Twiggy: Accepting connections at http://0.0.0.0:50849/
127.0.0.1 - - [28/ 7/2011 16:19:27] "GET /stream?client_id=0.525066107036288 HTTP/1.1" 200 80 "http://localhost:50849/stream?client_id=0.525066107036288" "Mozilla/5.0 (compatible; U; AnyEvent-HTTP/2.1; +http://software.schmorp.de/pkg/AnyEvent)"
127.0.0.1 - - [28/ 7/2011 16:19:27] "GET / HTTP/1.1" 200 700 "-" "agent1"
ok 1 - /
ok 2 - /stream
ok 3 - useragent
127.0.0.1 - - [28/ 7/2011 16:19:28] "GET /stream?client_id=0.525066107036288 HTTP/1.1" 200 80 "http://localhost:50849/stream?client_id=0.525066107036288" "Mozilla/5.0 (compatible; U; AnyEvent-HTTP/2.1; +http://software.schmorp.de/pkg/AnyEvent)"
127.0.0.1 - - [28/ 7/2011 16:19:28] "GET / HTTP/1.1" 200 700 "-" "agent2"
ok 4 - /
ok 5 - /stream
ok 6 - useragent
127.0.0.1 - - [28/ 7/2011 16:19:29] "GET /stream?client_id=0.525066107036288 HTTP/1.1" 200 80 "http://localhost:50849/stream?client_id=0.525066107036288" "Mozilla/5.0 (compatible; U; AnyEvent-HTTP/2.1; +http://software.schmorp.de/pkg/AnyEvent)"
127.0.0.1 - - [28/ 7/2011 16:19:29] "GET / HTTP/1.1" 200 700 "-" "agent3"
ok 7 - /
ok 8 - /stream
ok 9 - useragent
ok
All tests successful.
Files=1, Tests=9,  3 wallclock secs ( 0.03 usr  0.00 sys +  0.28 cusr  0.06 csys =  0.37 CPU)
Result: PASS


まだよく分かってないことが多いのでソースとか読みつつ勉強する…