forkIOがmask状態を引き継ぐ意味が分からない

(注: この記事はghc 7.6.3, base-4.6.0.1で検証している)

HaskellのIOモナドには(フツーの命令型言語で言うところの)例外の仕組みが備わっているが、この例外には2種類がある。

  • 同期例外(synchronous exception): スレッド自身が行う処理によって発生する例外
  • 非同期例外(asynchronous exception): スレッドの外部から(throwTo関数などによって)送りつけられる例外

非同期例外はいつ発生するのか本当に予測不可能な例外であり、どちらかというとPOSIXのシグナルにイメージとしては近いと思う。

不用意なタイミングで非同期例外が発生するとプログラムの状態がおかしなことになってしまうため、Haskellには非同期例外の発生を一時的に抑制する仕組みがある。

mask_ :: IO a -> IO a

mask_関数は引数としてIOアクションをとり、非同期例外の発生を抑制した(マスクされた)アクションに変換して返す。

マスクされたアクションの実行中は、そのスレッドで非同期例外が発生することはない。その間、throwTo関数で非同期例外を送信したスレッドは、送信先スレッドのマスクが解除されて非同期例外がちゃんと発生するまでブロックする(これはghcに固有の動作かもしれない)。そのため、マスクされるアクションの範囲は必要最小限に抑えることが重要となる。

さて、Haskellではスレッドを作る時は一般的にforkIO関数を使う。

forkIO :: IO () -> IO ThreadId

forkIOは引数としてIOアクションをとり、それを別スレッドで実行し、呼び出し元にはスレッドIDを返す。

ここで重要なのは、forkIOで発生させたスレッドは親スレッドのマスク状態を引き継ぐという点である(これはghcに固有の動作かもしれない)。

そのため、親スレッドが非常に限定的な範囲でマスクしていたとしても、たまたまマスクした状態で子スレッドを作った場合、子スレッドは全体がマスクされることになる。

-: マスク解除状態の処理
*: マスク状態の処理

親スレッド:  -----------***------------------- ...
                         |
                      [forkIO]
                         |
子スレッド:              ********************* ...

さらに悪いことに、Haskellの例外処理関数の中には与えられたアクションをマスクして実行する類のものがある。

例えば、bracket関数の第1引数(リソース獲得アクション)と第2引数(リソース解放アクション)はマスクされる。これは以下のコードで検証できる。

import Control.Exception (bracket, getMaskingState)

showMask label = do
  state <- getMaskingState
  putStrLn (label ++ ": " ++ show state)

main = bracket (showMask "before") (const $ showMask "after") (const $ showMask "body")

-- before: MaskedInterruptible
-- body: Unmasked
-- after: MaskedInterruptible

したがって、例えばリソース獲得アクションとしてforkIOを書いてしまうと子スレッドは全体がマスクされる。

この事実はドキュメントには書かれていない。そもそもマスク状態は型に表れないので、ユーザは関数へ渡したアクションがマスクされるのかされないのか確実に見極めることはできない。

このように、マスク状態の引き継ぎは子スレッド全体が意図せずマスクされてしまう事態を引き起こすが、そもそもなぜforkIOの仕様がマスク状態を引き継ぐようになっているかが理解できない。非同期例外はスレッドごとに独立なのだからわざわざ引き継ぐ必要はないのではないか。

親スレッドのマスク状態に関わらず子スレッドのマスクを解除するには、とりあえず以下のようにすればよい・・・と思う。

{-# LANGUAGE CPP #-}
import qualified Control.Concurrent as CC
#if MIN_VERSION_base(4,3,0)
#else
import qualified Control.Exception as CE
#endif

forkIOUnmasked :: IO () -> IO CC.ThreadId
#if MIN_VERSION_base(4,4,0)
forkIOUnmasked action = CC.forkIOWithUnmask (\unmask -> unmask action)
#elif MIN_VERSION_base(4,3,0)
forkIOUnmasked = CC.forkIOUnmasked
#else
forkIOUnmasked action = if CE.blocked then CE.unblock $ CC.forkIO action else CC.forkIO action
#endif

マスク周りのAPI(以前は"ブロック(block)"と呼ばれていた)はいろいろと変更になっているため、古いbaseパッケージもサポートしようと思ったらバージョンごとに実装を変える必要がある。

正直、何か見落としている点があるかもしれないので上記のコードもそれほど自信はない。。