Structured Concurrency with Kotlin coroutines
はじめに
こんにちは!KTCでAndroidエンジニアをしている長谷川(@gotlinan)です!
普段はmyrouteというアプリの開発をしています。myrouteのメンバーが書いた他の記事も是非読んで見てください!
本記事ではKotlin coroutinesを使用したStructured Concurrencyを解説します。
Structured Concurrencyは知っているけど、coroutineを使う方法はどんな感じ?って方は、並行処理のための便利関数をご確認ください。
Structured Concurrency?
Structured Concurrencyって何でしょう?日本語にすると「構造化された並行処理」みたいな感じだと思います。イメージとしては、二つ以上の処理を並行しながら、それぞれでキャンセルやエラーが発生した場合も正しく管理されていること、だと思います。本記事を通じて、Structured Concurrencyについて詳しくなりましょう!
今回は二つのよくある例をもとに紹介してみます。
1. エラーを協調したい
まずよくある例として処理1と処理2を実行後、その結果に応じて処理3を実行したい場合です。
図にすると、以下のようになります。
処理1と処理2を実行後、その結果に応じて処理3を実行する
この場合、処理1でエラーが発生した場合、処理2を継続しても無駄ですね。
したがって処理1でエラーが発生した場合、処理2をキャンセルする必要があります。
同様に処理2でエラーが発生した場合も、処理1をキャンセルして、処理3に進む必要はありません。
2. エラーを協調したくない
次によくある例として、画面内に複数のエリアがあり、それぞれ独立して表示する場合です。
図にすると、以下のようになります。
画面内に複数のエリアがあり、それぞれ独立して表示する
この場合、仮に処理1でエラーが発生しても、処理2や処理3の結果は表示したい場合があります。
したがって処理1でエラーが発生した場合でも処理2や処理3はキャンセルせずに継続する必要があります。
二つの例は理解できましたか?coroutineでは上記のような例を、Structured Concurrencyの考えをもとに簡単に実装することができます!
ただし理解するためにはcoroutineの基礎を理解する必要があります。次のセクションからは実際にcoroutineを学びましょう!
基礎は知っているよっていう方は、並行処理のための便利関数までスキップしてください。
coroutineの基礎
詳しい解説の前にcoroutineの基礎的な話をしましょう。
coroutineではCoroutineScope
からlaunch
関数を呼ぶことで非同期の処理を開始できます。具体的には以下のような形です。
CoroutineScope.launch {
// 実行したいコード
}
ところでなぜCoroutineScope
を使用する必要があるのでしょうか?それは非同期処理では、「どのスレッドで実行するか」、「キャンセルやエラーが発生した時にどう振る舞うか」がとても重要だからです。CoroutineScope
はCoroutineContext
を持ちます。あるCoroutineScope
で実行されるcoroutineはCoroutineContext
をもとに制御されます。
具体的にはCoroutineContext
は以下の要素などから構成されます。
Dispatcher
: どのスレッドで動くかJob
: キャンセルの実行、キャンセルやエラーの伝搬CoroutineExceptionHandler
: エラーハンドリング
CoroutineScope
を作成する際は、それぞれの要素を+演算子で渡すことが可能です。
そしてCoroutineContext
はcoroutineの親子間で継承されます。例えば以下のようなコードがあったとします。
val handler = CoroutineExceptionHandler { _, _ -> }
val scope = CoroutineScope(Dispatchers.Default + Job() + handler)
scope.launch { // 親処理
launch { // 子処理1
launch {} // 子処理1-1
launch {}// 子処理1-2
}
launch {} // 子処理2
}
この場合はCoroutineContext
が以下のように継承されます。
CoroutineContextの継承
おや、画像を見るとJob
は継承されずに新しく作成されているようですね?
これは間違いではないです。「CoroutineContext
はcoroutineの親子間で継承されます」と述べましたが、厳密には「Job
以外のCoroutineContext
はcoroutineの親子間で継承される」の方が正しいです。それならJob
はどうなるんだ?と思いますよね。
次のセクションではJob
について理解を深めてみましょう!
Jobとは
coroutineにおけるJob
とは何でしょうか?それは短くまとめるのであれば、「coroutineの実行を制御する」ものだと思います。
Job
にはcancel
メソッドがあり、開発者は開始されたcoroutineをいつでもキャンセルすることが可能です。
val job = scope.launch {
println("start")
delay(10000) // Long Process
println("end")
}
job.cancel()
// start (printed out)
// end (not printed out)
Androidエンジニアがよく利用するであろうviewModelScope
やlifecycleScope
に紐づくJob
はそれぞれのライフサイクルの終わりの時にキャンセルされています。これによりユーザーが画面外にでた場合に継続中の処理があっても、開発者が意識せずに正しくキャンセルされます。
そんな超重要なJob
ですが、coroutineの親子間でのキャンセルやエラーの伝搬の役割も持ちます。前のセクションではJob
は継承されない話をしましたが、その例を使うと、以下の画像のようにJob
は階層関係を持ちます。
Jobの階層関係
実際にJob
の定義を一部抜粋すると、以下のようになっています。
public interface Job : CoroutineContext.Element {
public val parent: Job?
public val children: Sequence<Job>
}
親子関係を保持できるになっており、キャンセルやエラーが発生したときに親や子のJob
を操作できそうですね。
次の章からはJob
の階層関係を通じて、どのようにcoroutineがキャンセルやエラーを伝搬しているか確認してみましょう!
cancelの伝搬
coroutineがキャンセルされた場合、以下のような挙動になります。
- 自身の子coroutineを全てキャンセルする
- 自身の親coroutineには影響しない
※ CoroutineContext
をNonCancellable
に変更することで親coroutineのキャンセルの影響を受けないcoroutineを実行することも可能です。Structured Concurrencyのテーマとは離れるため、今回は割愛します。
つまりキャンセルはJob
の階層関係において下方向に影響します。
下記の例だと、Job2
がキャンセルされた場合、Job2
、Job3
、Job4
で動いているcoroutineがキャンセルされます。
Cancelの伝搬
エラーの伝搬
実はJob
には大きく分けて、Job
とSupervisorJob
があります。
この種類によって、エラーが発生した場合の挙動が変わります。
自身のJobでエラーが発生したときと、子Job
でエラーが発生したときの挙動を二つの表にまとめました。
Job
内でエラーが発生したとき
子Jobを | 自身のJobを | 親Jobに | |
---|---|---|---|
Job | 全てキャンセルする | エラー終了する | エラーを伝搬する |
SupervisorJob | 全てキャンセルする | エラー終了する | エラーを伝搬しない |
エラーが子Job
から伝搬してきたとき
他の子Jobを | 自身のJobを | 親Jobに | |
---|---|---|---|
Job | 全てキャンセルする | エラー終了する | エラーを伝搬する |
SupervisorJob | 何もしない | 何もしない | エラーを伝搬しない |
二つの表を参考にしてエラー発生時の挙動を表したイメージは、Job
とSupervisorJob
の場合でそれぞれ以下のようになります。
Jobの場合
通常のJobのJob2でエラーが発生した場合
- 子Jobである
Job3
、Job4
はキャンセルされる - 自身のJobである
Job2
はエラー終了する - エラーを親Jobである
Job1
に伝搬する Job1
の他の子JobであるJob5
をキャンセルするJob1
がエラー終了する
SupervisorJobの場合
通常のJobのJob2でエラーが発生した場合
- 子Jobである
Job3
、Job4
はキャンセルされる - 自身のJobである
Job2
はエラー終了する - エラーを親SupervisorJobである
Job1
に伝搬する
意識してもらいたい点として、エラーが伝搬されたSupervisorJob1
は、他の子Job(Job5
)をキャンセルせず、自身も通常終了します。
ちなみにJob
が、通常終了したのか、エラーにより終了したか、キャンセルにより終了したのかを確認する方法として、invokeOnCompletion
を使用することができます。
val job = scope.launch {} // Some work
job.invokeOnCompletion { cause ->
when (cause) {
is CancellationException -> {} // cancellation
is Throwable -> {} // other exceptions
null -> {} // normal completions
}
}
catchされなかった例外
ところでcoroutineで捕捉されなかった例外はどうなるのでしょうか?
例えば
- TopLevelの
Job
でエラーが発生したり、伝搬してきた場合はどうなるの? SupervisorJob
でエラーが発生したり、伝搬してきた場合どうなるの?
などの疑問があると思います。
答えは
CoroutineExceptionHandler
が指定されていれば、呼ばれるCoroutineExceptionHandler
が指定されていなければ、スレッドのデフォルトのUncaughtExceptionHandler
が呼ばれる
となります。
coroutineの基礎で前述のように、CoroutineExceptionHandler
もCoroutineContext
の仲間です。以下のように渡すことができます。
val handler = CoroutineExceptionHandler { coroutineContext, throwable ->
// Handle Exception
}
val scope = CoroutineScope(Dispatchers.Default + handler)
もしCoroutineExceptionHandler
が指定されていない場合、スレッドのデフォルトのUncaughtExceptionHandler
が呼ばれます。
開発者が指定したい場合は以下のように記述します。
Thread.setDefaultUncaughtExceptionHandler { thread, exception ->
// Handle Uncaught Exception
}
自分が本記事執筆まで誤解していたこととして、SupervisorJob
を使用すれば、エラーが伝搬しないのでアプリケーションは終了しないという認識がありました。
しかしSupervisorJob
はあくまでcoroutineのJob
の階層関係上でエラーを伝搬しないだけです。従って上記の二種類のHandlerのどちらかを適宜定義しておかないと、意図した通りに動かない可能性があります。
例えばAndroidアプリではスレッドのデフォルトのUncaughtExceptionHandler
は、開発者が指定しない限りアプリケーションが終了(クラッシュ)するようになっています。一方で通常のKotlinコードを実行すると、ただエラーログを表示するだけとなります。
また、少し話は逸れますが、try-catch
とCoroutineExceptionHandler
のどちらを使用すればいいのか、という疑問があるかもしれません。CoroutineExceptionHandler
でエラーを捕捉したとき、coroutineのJob
は既に終了しており、復帰することはできません。基本的に復帰可能なエラーはtry-catch
を使用して、Structured Concurrencyの考えをもとに実装する際や、ログを出しておきたいときなどは、CoroutineExceptionHandler
を設定する方針が良さそうです。
並行処理のための便利関数
ここまでの説明が少し長くなってしまいましたが、coroutineではStructured Concurrencyを達成するために、coroutineScope()
やsupervisorScope()
のような関数があります。
coroutineScope()
1. エラーを協調したいを覚えていますか?このような例ではcoroutineScope()
を使用することができます。coroutineScope()
は起動した子coroutineが全て終了するまで待ちます。また子coroutineでエラーが発生した場合、他の子coroutineはキャンセルします。
以下のようにコードを記述すると、
- 子処理1と子処理2は並行で実行
- 子処理3は子処理1と子処理2が終わった後に実行
- どの子処理でエラーが発生しても、他の子処理はキャンセルされる
などを達成することができます。
scope.launch {
coroutineScope {
launch {
// 子処理1
}
launch {
// 子処理2
}
}
// 子処理3
}
supervisorScope()
2. エラーを協調したくないを覚えていますか?このような例ではsupervisorScope()
を使用することができます。supervisorScope()
も起動した子coroutineが全て終了するまで待ちます。また子coroutineでエラーが発生した場合でも、他の子coroutineはキャンセルしません。
以下のように記述すると、
- 子処理1と子処理2と子処理3は並行で実行
- どの子処理でエラーが発生しても、他の子処理に影響しない
などを達成することができます。
scope.launch {
supervisorScope {
launch {
// 子処理1
}
launch {
// 子処理2
}
launch {
// 子処理3
}
}
}
まとめ
Structured Concurrencyは理解できましたか?
説明のための基礎的な内容が多かったかもしれませんが、基礎的な内容を理解しておくと、いざ複雑な実装に取り組む際の助けとなります。
そしてStructured Concurrencyをうまく記述できるようになると、比較的簡単にサービスの局所的なパフォーマンスの改善に繋げることができます。もし無駄に直列で実行しているようなボトルネックがあれば、Structured Concurrencyを考慮してみてはどうでしょうか?
以上です〜
関連記事 | Related Posts
We are hiring!
【iOS/Androidエンジニア】モバイルアプリ開発G/東京・大阪
モバイルアプリ開発GについてKINTOテクノロジーズにおける、モバイルアプリ開発のスペシャリストが集まっているグループです。KINTOやmy routeなどのサービスを開発・運用しているグループと協調しながら品質の高いモバイルアプリを開発し、サービスの発展に貢献する事を目標としています。
【バックエンドエンジニア】my route開発G/東京
my route開発グループについてmy route開発グループは、my routeに関わる開発・運用に取り組んでいます。my routeの概要 my routeは、移動需要を創出するために「魅力ある地域情報の発信」、「最適な移動手段の提案」、「交通機関や施設利用のスムーズな予約・決済」をワンストップで提供する、スマートフォン向けマルチモーダルモビリティサービスです。