Promises/Future と Async::Defer

You're Missing the Point of Promises · GitHubを読んだ。


特に興味深かったのが"That Second Paragraph"の見出しで始まるセクション。曰く、Promisesとは、非同期ルーチンとその結果を受ける処理における以下の「4つのシナリオ」を表現できるようにするものらしい。

  1. 非同期ルーチンが正常に終了し、その結果も正常である。 (fulfilled and accepted)
  2. 非同期ルーチンが正常に終了したが、その結果が異常なので例外を投げる。 (fufilled but rejected)
  3. 非同期ルーチンが例外を投げたが、その例外をキャッチして適切に処理する。 (rejected but handled)
  4. 非同期ルーチンが例外を投げ、その例外をキャッチするも、処理できずにrethrowする。 (rejected and rethrown)

もっとも、個人的にはこれに加えて第5のシナリオがあるのではと思っている。

  • 非同期ルーチンが例外を投げたが、キャッチしない。 (rejected and not handled)

自分としては、「Promisesはコールバックを複数くっつけたり、非同期コールを逐次実行したりできるモノ」程度の認識で、おそらくそれが原因でPromisesの本質がよく分からなかったが、ようやく理解できたような気がする。

で、Perlにおいては、現在自分の知る限り以下の二つのPromises実装が存在する。

なお、Object::Deferred ( http://search.cpan.org/perldoc?Object%3A%3ADeferred ) もCommonJS promise API実装を謳っているが、このモジュールはthenableではないので少なくともPromises/A実装とは言えないと思う。

Promisesは非同期ルーチンを扱う上では非常にシンプルでエレガントな仕組みだけど、それを使うプログラマに対して若干のパラダイムシフトを要求するように思う。従来の非同期ルーチンの扱いは、「終わったらコールバック呼んでね」というものだと思うけど、then()チェーンを組むとそういったプログラマの意図がコードから全く読み取れなくなるのでちょっと不安になってしまう。

もっとも、Promisesを使うことで同期ルーチンも非同期ルーチンもほぼ同様の書き方ができるというのはいいことなのだろう。then()チェーンの中で値を返すルーチンが突如同期から非同期(あるいは非同期から同期)になってもちゃんと動くのだから。しかし、それだったらプリミティブ型も含めてあらゆる値がthenableじゃないと中途半端な気もする。Rubyだったら割と簡単にできそうだけど、Perlはどうだろう。

また一方で、Promisesとは異なるアプローチで非同期ルーチンを扱うモジュールもある。

基本的に、このモジュールはコールバックで結果を渡すタイプの非同期ルーチンを複数登録して逐次実行していくモジュール。その意味ではasync.jsに似ている。しかしAsync::Deferは逐次実行に加えて並列実行、繰り返し、条件分岐、さらに例外処理までサポートする。登録する個々の非同期ルーチンは基本的に「終わったらコールバック呼んでね」というコーディングスタイルになるので、Promisesのようなエレガントさはなく若干冗長なコードになるものの、学習障壁はかなり低いと思う。

というわけで、これら3つのモジュールを使って、冒頭の「4つのシナリオ」 + 1をどう書けるか、試してみた。丸ごとのコードはGist(Comparing Promises, Future and Async::Defer in four (+1) scenarios · GitHub)に。

Promisesの場合

Promisesモジュールは主にthen(), resolve(), reject(), when()を実装したシンプルなモジュール。

まず、AnyEvent(AE)を使ってサンプルとなる非同期ルーチンを作る。

use constant WAIT_TIME => 0.5;
sub wait_promise {
    my $result = shift;
    my $d = Promises::Deferred->new();
    my $t; $t = AE::timer WAIT_TIME, 0, sub {
        undef $t;
        if($result eq "ok") {
            $d->resolve($result);
        }else {
            $d->reject($result);
        }
    };
    return $d->promise;
}

このルーチンは新たにPromises::Deferredオブジェクトを作り、引数が"ok"なら成功(resolve)、それ以外なら失敗(reject)を通知する。

こいつを使って5つのシナリオを書き下すと以下のような感じになるだろう。

$cv = AE::cv;
wait_promise("ok")->then(sub {
    say "@_";
    say "Scenario 1: fulfilled and accepted.";
    return "Result accepted";
}, sub {
    say "This is not executed";
    return @_;
})->then(sub {
    say "@_";
    $cv->send;
}, sub {
    say "This is not executed";
});
$cv->recv;

$cv = AE::cv;
wait_promise("ok")->then(sub {
    say "@_";
    say "Scenario 2: fulfilled but rejected.";
    return Promises::Deferred->new->reject('Result rejected')->promise;
}, sub {
    say "This is not executed";
    return @_;
})->then(sub {
    say "This is not executed";
}, sub {
    say "@_";
    $cv->send;
});
$cv->recv;

$cv = AE::cv;
wait_promise("ng")->then(sub {
    say "This is not executed";
}, sub {
    say "@_";
    say "Scenario 3: rejected but handled.";
    return Promises::Deferred->new->resolve("Exception handled")->promise;
})->then(sub {
    say "@_";
    $cv->send;
}, sub {
    say "This is not executed";
});
$cv->recv;

$cv = AE::cv;
wait_promise("ng")->then(sub {
    say "This is not executed";
}, sub {
    say "@_";
    say "Scenario 4: rejected and rethrown.";
    return Promises::Deferred->new->reject("Exception rethrown")->promise;
})->then(sub {
    say "This is not executed";
}, sub {
    say "@_";
    $cv->send;
});
$cv->recv;

$cv = AE::cv;
wait_promise("ng")->then(sub {
    say "This is not executed";
}, sub {
    @_; ## Exception goes through here
})->then(sub {
    say "This is not executed";
}, sub {
    say "@_";
    say "Scenario 5: rejected and not handled.";
    $cv->send;
});
$cv->recv;

このように、Promisesではthen()メソッドチェーンで処理の流れを記述する。then()メソッドの第1引数は成功(resolve)時のコールバック、第2引数は失敗(reject)時のコールバックで、戻り値は新しいPromiseオブジェクトになる。

then()メソッドを利用する際には最大3つのPromiseオブジェクトが関係する。

  1. p1: then()メソッドのレシーバとなるPromiseオブジェクト (i.e. $p1->then( ... ))
  2. p2: then()メソッドに渡すコールバックで戻り値となるPromiseオブジェクト (i.e. $p1->then(sub { ...; return $p2 }, ...))
  3. p3: then()メソッドが返すPromiseオブジェクト (i.e. $p3 = $p1->then( ... ))

使ってみて、Promisesモジュールは以下のようになっているらしいことが分かった。

  • then()のコールバックでPromiseオブジェクト(p2)が返された場合、p3は実質的にp2と同等になる。つまりp2がresolveすればp3もresolveし、p2がrejectすればp3もreject。それまではp3は不定状態。
  • then()のコールバックでPromiseオブジェクト以外のものが返された場合、p3の状態(resolve/reject)はp1と同等になる。ただし、p3の持つ値はthen()のコールバックの戻り値となる。
  • 以上のことから、シナリオ2, シナリオ3ではthen()のコールバック内でPromiseオブジェクトを返し、処理の流れをresolve/reject間で切り替えている。
  • then()のコールバックは両方共に省略不可能
    • (2012/11/25追記) Promises 0.02でrejectコールバックは省略可能になった。
  • then()のコールバックで例外を出す(dieする)と死ぬ。例外はユーザが処理しないといけない。
  • なお、then()のコールバックで返したp2がrejectになると、どういうわけかp2に渡した値がp3に渡されない。バグかもしれないのでレポートしておいた https://rt.cpan.org/Public/Bug/Display.html?id=81358
    • (2012/11/25追記) Promises 0.02で修正されました。

Futureの場合

FutureはPromisesに比べると異常なまでにリッチなAPIを持っているが、できることはPromisesとあまり変わらないように思える。やりたいことに合わせて使うメソッドを切り替えることはコードのreadabilityにつながるかもしれないが、メソッドの名前と意味を覚えるのが面倒かもしれない。

PromisesになくてFutureにある機能としては、非同期ルーチンの実行をキャンセルするためのインタフェースが備わっているということが挙げられる。

Futureを使ったサンプル非同期ルーチン(wait_future)の実装は、Promisesの場合とほぼ同じなのでここでは省略。

5つのシナリオは以下のようになる。

$cv = AE::cv;
wait_future("ok")->and_then(sub {
    my $f = shift;
    say $f->get;
    say "Scenario 1: fulfilled and accepted.";
    return Future->new->done("Result accepted.");
})->on_ready(sub {
    my $f = shift;
    say $f->get;
    $cv->send;
});
$cv->recv;

$cv = AE::cv;
wait_future("ok")->and_then(sub {
    my $f = shift;
    say $f->get;
    say "Scenario 2: fulfilled but rejected.";
    return Future->new->fail("Result rejected.");
})->on_ready(sub {
    my $f = shift;
    say $f->failure;
    $cv->send;
});
$cv->recv;

$cv = AE::cv;
wait_future("ng")->and_then(sub {
    say "This is not executed.";
    return @_;
})->or_else(sub {
    my $f = shift;
    say $f->failure;
    say "Scenario 3: rejected but handled.";
    return Future->new->done("Exception handled.");
})->on_ready(sub {
    my $f = shift;
    say $f->get;
    $cv->send;
});
$cv->recv;

$cv = AE::cv;
wait_future("ng")->and_then(sub {
    say "This is not executed.";
    return @_;
})->or_else(sub {
    my $f = shift;
    say $f->failure;
    say "Scenario 4: rejected and rethrown.";
    return Future->new->fail("Exception rethrown.");
})->on_ready(sub {
    my $f = shift;
    say $f->failure;
    $cv->send;
});
$cv->recv;

$cv = AE::cv;
wait_future("ng")->and_then(sub {
    say "This is not executed.";
    return @_;
})->on_ready(sub {
    my $f = shift;
    say $f->failure;
    say "Scenario 5: rejected and not handled.";
    $cv->send;
});
$cv->recv;

気づいた点。

  • Futureでthen()メソッドに対応するメソッドは、and_then(), or_else(), followed_by()の3種類。
  • and_then()はresolveコールバックのみ、or_else()はrejectコールバックのみ登録する。followed_by()は両方をキャッチするコールバックを登録する。
  • and_then(), or_else(), followed_by()の引数はメソッドのレシーバとなっているFutureオブジェクト。
  • and_then(), or_else(), followed_by()の戻り値はFutureオブジェクトでなければならない。引数として受け取ったFutureオブジェクトをそのまま戻してもOK。
  • and_then(), or_else(), followed_by()のコールバックで例外を出す(dieする)と死ぬ。やはり例外はユーザが処理しなければいけない。
  • on_done(), on_fail(), on_ready()メソッドはFutureにコールバックを登録するが、新たなFutureオブジェクトを返さない。

個人的には、thenチェーンを作らないon_ready系メソッドは要らないように思える。一方で、thenチェーンを作るfollowed_by系メソッドは絶対にFutureオブジェクトを返さないといけないのが若干面倒。

Async::Deferの場合

Async::Deferが想定する非同期ルーチンは従来のコールバック渡しによるもの。なので、wait_promiseに相当するルーチンは以下のようになる。

sub wait_callback {
    my ($result, $callback, $errback) = @_;
    my $t; $t = AE::timer WAIT_TIME, 0, sub {
        undef $t;
        if($result eq "ok") {
            $callback->($result);
        }else {
            $errback->($result);
        }
    };
}

5つのシナリオは以下のようになる。

$cv = AE::cv;
Async::Defer->new->do(sub {
    my $d = shift;
    wait_callback("ok", sub { $d->done(@_) }, sub { $d->throw(@_) });
})->do(sub {
    my ($d, @results) = @_;
    say "@results";
    say "Scenario 1: fulfilled and accepted.";
    $d->done("Result accepted.");
})->do(sub {
    my ($d, @results) = @_;
    say "@results";
    $cv->send;
    $d->done;
})->run;
$cv->recv;

$cv = AE::cv;
Async::Defer->new->do(sub {
    my $d = shift;
    wait_callback("ok", sub { $d->done(@_) }, sub { $d->throw(@_) });
})->try->do(sub {
    my ($d, @result) = @_;
    say "@result";
    say "Scenario 2: fulfilled but rejected.";
    $d->throw("Result rejected.");
})->catch( qr/.?/ => sub {
    my ($d, @result) = @_;
    say "@result";
    $cv->send;
    $d->done;
})->run;
$cv->recv;

$cv = AE::cv;
Async::Defer->new->try->do(sub {
    my $d = shift;
    wait_callback("ng", sub { $d->done(@_) }, sub { $d->throw(@_) });
})->do(sub {
    my $d = shift;
    say "This is not executed.";
    $d->done;
})->catch( qr/.?/ => sub {
    my ($d, @result) = @_;
    say "@result";
    say "Scenario 3: rejected but handled.";
    $d->done("Exception handled.");
})->do(sub {
    my ($d, @result) = @_;
    say "@result";
    $cv->send;
    $d->done;
})->run;
$cv->recv;

$cv = AE::cv;
Async::Defer->new->try->try->do(sub {
    my $d = shift;
    wait_callback("ng", sub { $d->done(@_) }, sub { $d->throw(@_) });
})->do(sub {
    my $d = shift;
    say "This is not executed.";
    $d->done;
})->catch( qr/.?/ => sub {
    my ($d, @result) = @_;
    say "@result";
    say "Scenario 4: rejected and rethrown.";
    $d->throw("Exception rethrown.");
})->catch( qr/.?/ => sub {
    my ($d, @result) = @_;
    say "@result";
    $cv->send;
    $d->done;
})->run;
$cv->recv;

$cv = AE::cv;
Async::Defer->new->try->do(sub {
    my $d = shift;
    wait_callback("ng", sub { $d->done(@_) }, sub { $d->throw(@_) });
})->do(sub {
    my $d = shift;
    say "This is not executed.";
    $d->done;
})->catch( qr/.?/ => sub {
    my ($d, @result) = @_;
    say "@result";
    say "Scenario 5: rejected and not handled.";
    $cv->send;
    $d->done;
})->run;
$cv->recv;


Async::Deferの基本的な使い方は以下のようになっている。

  1. new()メソッドでAsync::Deferオブジェクトを作成する。
  2. do/while/if/try/catchメソッドなどで、非同期ルーチンを順に登録する。
  3. 各非同期ルーチンのステップは、done()メソッドの呼び出しで正常終了、throw()メソッドの呼び出しで例外終了。ループ内ではbreak()とcontinue()も使える。
  4. 最後に、run()メソッドで起動する。

Async::Deferでは、例外をキャッチするにはtry - catchブロックを作らないといけないのが少々面倒。

Promises系モジュールとコールバック系モジュールを混ぜてしまう場合

全ての非同期ルーチンがPromiseオブジェクトを返してくれるのならばPromisesを使った非同期ルーチン処理は非常にエレガントなんだが、とはいえ現実はコールバック系I/Fを持つモジュールの方が多いだろう。この時、コールバックI/FをPromiseに変換するのが割と面倒だと思う。

例えば、wait_callbackを無理やりPromisesと一緒に使う場合。

$cv = AE::cv;
my $d = Promises::Deferred->new();
wait_callback("ok", sub { $d->resolve(@_) }, sub { $d->reject(@_) });
$d->then(sub {
    say "@_";
    my $d = Promises::Deferred->new();
    wait_callback("ng", sub { $d->resolve(@_) }, sub { $d->reject(@_) });
    return $d->promise;
}, sub {
    @_;
})->then(sub {
    say "Success: @_";
}, sub {
    say "Failure: @_";
    $cv->send;
});
$cv->recv;

このように、名前付き変数($d)にDeferredオブジェクトを格納する必要がある。対象ルーチンが一つならラッパを書けばいいが、無数のコールバック系非同期ルーチンを使っている場合、結構めんどくさそう。

逆に、wait_promiseをAsync::Deferと一緒に使ってみる。

cv = AE::cv;
Async::Defer->new->try->do(sub {
    my $d = shift;
    wait_promise("ok")->then(sub { $d->done(@_) }, sub { $d->throw(@_) });
})->do(sub {
    my ($d, @result) = @_;
    say "@result";
    wait_promise("ng")->then(sub { $d->done(@_) }, sub { $d->throw(@_) });
})->do(sub {
    my ($d, @result) = @_;
    say "Success: @result";
    $d->done;
})->catch(qr/.?/ => sub {
    my ($d, @result) = @_;
    say "Failure: @result";
    $d->done;
    $cv->send;
})->run;
$cv->recv;

Async::Deferは元々コードがちょっと冗長になるというのもあるが、比較的自然な形で書けていると思う。