Tatsumakiを使ったアプリの例は幾つか見たことはあったのだけど、
- TwitterStreamをTatsumakiを使ってブラウザ表示 - メメメモモ
- TatsumakiとTwiggy使ってみた - punitan (a.k.a. punytan) のメモ
- Big Sky :: TatsumakiとDUI Streamを使って画像ストリーミングサーバ作ってみた。
実際に自分で書いたことはなくて、結構分からないことが多かったので、自分でもサンプルアプリを作ってみた。やっぱり実際に書きながら試したりする方が理解できる…。
Tatsumakiにあるchatアプリの例を参考にしながら。
Tatsumaki/eg/chat at master · miyagawa/Tatsumaki · GitHub
作ったのはアクセスログ(timestamp, useragent)をリアルタイムにブラウザ表示するもの。
https://github.com/sugyan/TatsumakiSample
psgiではTatsumaki::Application
でパスとハンドラの対応を指定。
#!/usr/bin/env perl use strict; use warnings; use FindBin::libs; use IndexHandler; use StreamHandler; use Tatsumaki::Application; my $app = Tatsumaki::Application->new([ '/' => 'IndexHandler', '/stream' => 'StreamHandler', ]); return $app->psgi_app;
ハンドラでStreamingなことをしない場合は普通にrender
でテンプレートファイルを指定して返すだけ。
package IndexHandler; use strict; use warnings; use parent 'Tatsumaki::Handler'; use Tatsumaki::MessageQueue; sub get { my ($self) = @_; my $mq = Tatsumaki::MessageQueue->instance(1); $mq->publish({ type => 'message', timestamp => scalar localtime, useragent => $self->request->user_agent, }); $self->render('index.html'); } 1;
ここではこのときにTatsumaki::MessageQueue
に(instanceのchannelは1に固定)、タイムスタンプとUserAgent情報を詰めてpublishしている。ので"/"にアクセスされるたびにpublishされることになる。
Streamingな部分は、__PACKAGE__->asynchronous(1);
とセットし、poll
とかpoll_once
とかでMessageQueueからの通知を待ち受ける。
package StreamHandler; use strict; use warnings; use parent 'Tatsumaki::Handler'; __PACKAGE__->asynchronous(1); use Tatsumaki::MessageQueue; sub get { my ($self) = @_; my $mq = Tatsumaki::MessageQueue->instance(1); my $client_id = $self->request->param('client_id'); $mq->poll_once( $client_id, sub { my @events = @_; $self->write(\@events); $self->finish; }, ); } 1;
クライアント側ではjquery.ev.js
を使ってlong-pollingでサーバからのプッシュを受け取って表示を更新。
GitHub - beppu/jquery-ev: a COMET event loop for jQuery
<!DOCTYPE html> <html> <head> <meta charset="utf-8"> <title>test</title> <script type="text/javascript" src="/static/js/jquery-1.6.2.min.js"></script> <script type="text/javascript" src="/static/js/jquery.ev.js"></script> <script type="text/javascript"> $(function () { $.ev.handlers.message = function (obj) { $('#logs').append($('<pre>').append([obj.timestamp, obj.useragent].join(' - '))); }; $.ev.loop('/stream?client_id=' + Math.random()); }); </script> <style> pre { margin: 0; font-family: Courier; } </style> </head> <body> <div id="logs"></div> </body> </html>
これで出来上がり。"/"にアクセスされるたびに開いている画面すべてでログが追記されていく。
multipart_xhr_push
を使うときはちょっとやり方が変わる。chatデモや、以下を参考に。
Big Sky :: javascriptで動くtwitter streamクライアントを作るならばmultipart/mixedを使うべき
テストはどう書けば良いのだろう? 色々試行錯誤しつつ
use strict; use warnings; use Test::More tests => 9; use AnyEvent; use AnyEvent::HTTP; use FindBin; use Furl; use JSON::XS 'decode_json'; use Test::TCP qw/empty_port wait_port/; use Try::Tiny; use Plack::Runner; use Proc::Guard; my ($tatsumaki, $port) = prepare(); my $rand = rand 1; my $count = 0; my $polling; $polling = sub { my $cv = AE::cv; my $w = http_request GET => "http://localhost:${port}/stream?client_id=${rand}", want_body_handle => 1, sub { my ($hdl, $hdr) = @_; is($hdr->{Status}, 200, '/stream'); $hdl->push_read( json => sub { my (undef, $json) = @_; is($json->[0]{useragent}, "agent${count}", 'useragent'); $cv->send; } ); }; $cv->recv; return if ($count >= 3); $polling->(); }; my $cv = AE::cv; my $w = AE::timer 1, 1, sub { $count++; my $res = Furl->new(agent => "agent${count}")->get("http://localhost:${port}/"); ok($res->is_success, '/'); $cv->send if $count >= 3; }; $polling->(); $cv->recv; sub prepare { my $psgi = File::Spec->catfile($FindBin::Bin, '..', 'app.psgi'); my $port = empty_port(); my $async = proc_guard( sub { my $runner = Plack::Runner->new; $runner->parse_options('-p', $port, '-s', 'Twiggy', $psgi); $runner->run; } ); wait_port($port); return ($async, $port); }
というかんじで書いてみたけど… クライアント側のlong-pollingをエミュレートしながらテストするのはちょっと大変だなーという印象。
$ prove -v t/01_stream.t .. 1..9 Twiggy: Accepting connections at http://0.0.0.0:50849/ 127.0.0.1 - - [28/ 7/2011 16:19:27] "GET /stream?client_id=0.525066107036288 HTTP/1.1" 200 80 "http://localhost:50849/stream?client_id=0.525066107036288" "Mozilla/5.0 (compatible; U; AnyEvent-HTTP/2.1; +http://software.schmorp.de/pkg/AnyEvent)" 127.0.0.1 - - [28/ 7/2011 16:19:27] "GET / HTTP/1.1" 200 700 "-" "agent1" ok 1 - / ok 2 - /stream ok 3 - useragent 127.0.0.1 - - [28/ 7/2011 16:19:28] "GET /stream?client_id=0.525066107036288 HTTP/1.1" 200 80 "http://localhost:50849/stream?client_id=0.525066107036288" "Mozilla/5.0 (compatible; U; AnyEvent-HTTP/2.1; +http://software.schmorp.de/pkg/AnyEvent)" 127.0.0.1 - - [28/ 7/2011 16:19:28] "GET / HTTP/1.1" 200 700 "-" "agent2" ok 4 - / ok 5 - /stream ok 6 - useragent 127.0.0.1 - - [28/ 7/2011 16:19:29] "GET /stream?client_id=0.525066107036288 HTTP/1.1" 200 80 "http://localhost:50849/stream?client_id=0.525066107036288" "Mozilla/5.0 (compatible; U; AnyEvent-HTTP/2.1; +http://software.schmorp.de/pkg/AnyEvent)" 127.0.0.1 - - [28/ 7/2011 16:19:29] "GET / HTTP/1.1" 200 700 "-" "agent3" ok 7 - / ok 8 - /stream ok 9 - useragent ok All tests successful. Files=1, Tests=9, 3 wallclock secs ( 0.03 usr 0.00 sys + 0.28 cusr 0.06 csys = 0.37 CPU) Result: PASS
まだよく分かってないことが多いのでソースとか読みつつ勉強する…