JavaのFutureの取り回しがダルすぎるので色々工夫してみる
まえがき
街の名はホーチミン1区。 一夜にして崩落・再構成されパフォーマンス向上の租界となったこの都市は、マルチスレッドを臨む境界点で、極度の緊張地帯となる。 ここで世界の均衡を守るため暗躍するJava標準クラスFuture。この物語はこのFutureに挑む開発者の戦いと日常の記録である。 (血界戦線風)
ということで、久々の記事はjavaの非同期処理の返り値ラッパーである、Futureの取扱いについて、僕がよくやる実装Tipsについて書いていきたいと思います。
今回のテーマとしては、
- マルチスレッド、非同期処理でも上手いことエラーハンドリングしたい
- Futureの返り値展開をスッキリさせたい
の2つです。
1. 標準的な非同期処理とFutureの取り回し
Javaでマルチスレッド処理を書く際、多くの人は以下のような処理を書いていると思います。
- Executorsクラスでスレッドプールを作成する
- Callable/Runnableを実装したサブクラスを作成する (もしくはLambda式を使う)
- ExecutorServiceのsubmitで別スレッドに処理を任せる
- 返り値のFutureをListなどに格納する
- 全部を別スレッドに投げ終えたところでlistの中のFutureクラスのgetメソッドで同期を取る
- チェック例外の
InterruptedException
やExecutionException
を処理する
コードにするとこんな感じ。(Exceptionでウケるなとか、parallelStream使うなとかは見逃してください。)
@Slf4j public class Main { public static void main(String[] args) { // If wanna make it parallel with 100 threads. val executor = Executors.newFixedThreadPool(100); val futureList = IntStream.range(0,100) .mapToObj(num -> executor.submit(() -> waitAndSaySomething(num))) .collect(Collectors.toList()); futureList.parallelStream() .map(future -> { try { return future.get(); } catch (Exception e) { log.error("Error happened while extracting the thread result.", e); // Do something for recover. Here, just return null. return null; } }).forEach(System.out::println); executor.shutdown(); } static String waitAndSaySomething(int num) { try { Thread.sleep( num%10 * 1000); } catch (Exception e){ // do nothing since just sample } if (num%2 ==0) throw new RuntimeException("Error if odd"); return num + ": Something!"; } }
わりとよくあるコードだけど、Futureのget部分が冗長で書くのがダルい。
しかも、どのinputがエラー起こしたのかがわからないのでエラー処理がとても難しい。
2. まずエラー処理がちゃんとできるようにしてみる
Javaでマルチスレッドを実装する時、処理にinputを渡す方法は大体次の2つのうちのどちらかになる。
Callable/Runnable
を実装したクラスにプロパティとして持たせて、new
する際にコンストラクタの引数で渡す。- Lambdaの外部に宣言したfinalの変数をLambda式の中から参照する。
ただし、このいずれの場合も、エラーハンドリングは難しい。
1の場合、スレッドインスタンスを保存しておくのはそもそも微妙だし、FutureとThreadインスタンスのマッピングを何処かで管理しなきゃいけない。
2の場合、Futureからは投げた元のthreadの引数を取得できないためどのinputに対して起きたエラーなのか判別がつかない。
そこで、この問題を解決するために、以下の方法を取ることにしました。
Tupleを使ってプロパティとFutureをまとめて管理する
Tuple
は簡単に言うと、複数の値の組のことで、プログラミングでよく使われる概念です。
Javaには標準でTupleが提供されて無いのですが、同様の方法は色々あって、Common Lang
のPairやTripleクラス、reactor
のTupleクラスやJavaTuple
のPairクラスなどを使って実現できます。
(複雑なクラスじゃないので、自分で実装してもいいです。)
Tupleを使って、inputをLeftに、FutureをRightに保存するようにすることで、Error処理で入力元の値を利用したエラー処理ができるようになります。
え?propertyがたくさんある?inputが重すぎてOOM起こしそう??設計を見直さんかい。
さて、今回はほぼすべてのプロジェクトで使われてると思われる、Commons LangのPairを使ってみます。
Tupleを使うと上のmainクラスはこんな感じに書き直せます。
@Slf4j public class Main { public static void main(String[] args) { // If wanna make it parallel with 100 threads. val executor = Executors.newFixedThreadPool(100); val futureList = IntStream.range(0,100) .mapToObj(num -> Pair.of(num, executor.submit(() -> waitAndSaySomething(num)))) .collect(Collectors.toList()); futureList.parallelStream() .map(future -> { try { return future.getRight().get(); } catch (Exception e) { log.error("Input {} was not processed correctly.", future.getLeft(), e); // Do something for recover. Here, just return null. return String.format("Input %s Failed in process, damn shit!! ", future.getLeft()); } }).forEach(System.out::println); executor.shutdown(); }
これで、入力値を利用したエラー処理とLoggingができるようになりました。
ただ、Futureを展開する箇所があいかわらず冗長で、なんだかイライラします。
そこで、次はこの部分を共通化してみます。
3. Futureの展開部分を共通化する
やりたいことはfutureの展開だけなので、そこだけ切り取れば、共通化は非常に簡単です。
一方で、エラー処理はinputに対して相対的に行いたいので、その部分は柔軟にできるような設計にしたいです。
そこで、エラーハンドリングの部分をExceptionとinputを利用して、任意のFunctionで処理できるように以下のようなFutureの展開クラスを用意します。
@RequiredArgsConstructor public class FutureFlattener<L, R> implements Function<Pair<L, Future<R>>, R> { /** * Callback function to recover when exception such as {@link InterruptedException} or {@link * java.util.concurrent.ExecutionException}. */ private final BiFunction<L, Exception, R> recoveryCallback; @Override public R apply(Pair<L, Future<R>> futurePair) { try { return futurePair.getRight().get(); } catch (Exception e) { return recoveryCallback.apply(futurePair.getLeft(), e); } } }
これを先程のMainクラスに組み込むと以下のようになります。
@Slf4j public class Main { public static void main(String[] args) { // If wanna make it parallel with 100 threads. val executor = Executors.newFixedThreadPool(100); BiFunction<Integer,Exception,String> errorHandler = (in, e) -> { log.error("Input {} was not processed correctly.", in, e); return String.format("Input %s Failed in process, damn shit!! ", in); }; val flattener = new FutureFlattener<Integer, String>(errorHandler); val futureList = IntStream.range(0, 100) .mapToObj(num -> Pair.of(num, executor.submit(() -> waitAndSaySomething(num)))) .collect(Collectors.toList()); futureList .parallelStream() .map(flattener) .forEach(System.out::println); executor.shutdown(); }
しかし、せっかくFunctionインターフェイスを使ってるのに、内部で関数をプロパティに持つのは、正直めちゃくちゃダサいです。
ダサいのはいけません、クールに決めたい。
ということで、もう少しだけ拡張してみます。
4. JavaのFunctionインターフェイスを継承してエラーハンドリングを追加する
多くの他の言語がモナド型のクラスに、onCatchやthenCatchなど、Exceptionが投げられた時のためのメソッドを用意しています。
しかし、残念なことに、JavaのFunction Interfaceはcompose
, apply
, andThen
の成功を前提としたメソッドチェーンしかできません。
そこで、JavaのFunctionインターフェイスを継承して、onCatchを実装してみます。
public interface CatchableFunction<T, R> extends Function<T, R> { /** * by calling this method in advance of calling {@link Function#apply}, any exception thrown in * the apply method will be handled as defined in the argument onCatch. * * @param onCatch callback method to handle the exception. First Type T is the input of the base * function. * @return fail-safe function with a callback. This method will generate a new Function instance * instead of modifying the existing function instance. */ default Function<T, R> thenCatch(BiFunction<T, Exception, R> onCatch) { return t -> { try { return apply(t); } catch (Exception e) { return onCatch.apply(t, e); } }; } }
Javaの使用上、Type parameterをcatchすることはできないため、Exceptionで受けなくてはいけないのがもどかしいですが、これでかなりfunctionalに書けるようになりました。
このクラスを先程のFutureFlattenerクラスに実装すると以下のようになります。
@RequiredArgsConstructor public class FutureFlattener<L, R> implements CatchableFunction<Pair<L, Future<R>>, R> { @Override public R apply(Pair<L, Future<R>> futurePair) { try { return futurePair.getRight().get(); } catch (InterruptedException | ExecutionException e) { throw new FutureExpandException(e); } } // To be caught in the then catch method. private static class FutureExtractException extends RuntimeException { FutureExpandException(Throwable cause) { super(cause); } }
チェック例外はLamdba式の中で処理しなくては行けないため、FutureExtractException
でラップしてあります。
これでMain
クラスもスッキリします。
@Slf4j public class Main { public static void main(String[] args) { // If wanna make it parallel with 10 threads. val executor = Executors.newFixedThreadPool(100); val flattener = new FutureFlattener<Integer, String>() .thenCatch( (in, e) -> { log.error("Input {} was not processed correctly.", in, e); return String.format("Input %s Failed in process, damn shit!! ", in); }); val futureList = IntStream.range(0, 100) .mapToObj(num -> Pair.of(num, executor.submit(() -> waitAndSaySomething(num)))) .collect(Collectors.toList()); futureList.parallelStream().map(flattener).forEach(System.out::println); executor.shutdown(); } static String waitAndSaySomething(int num) { try { Thread.sleep(num % 10 * 1000); } catch (Exception e) { // do nothing since just sample } if (num % 2 == 0) { throw new RuntimeException("Error if odd"); } return num + ": Something!"; } }
ネストが減って、関数の宣言もスッキリして、Futureの展開周りのソースもスッキリしました。
終わりに
さて、いかがだったでしょうか?
Functional Javaを使えばもっと楽に実装できたりする箇所はあるのですが、急いでいたため自前で実装してしまいました。
並列処理に関して言えば、最近はkafkaなどのメッセージキューを使って非同期かつ粗結合に作るのが基本ですが、だからといってマルチスレッドを使わないわけではありません。
一方で冗長なFuture展開はネストを増やし可読性を下げるだけでなく、最も気を使うべきエラーハンドリングに気が回らなくなります。
今回僕は上記のような解決方法を取りましたが、いかがだったでしょうか?
もっとええ方法あるで、という方がいらっしゃいましたら、コメント欄にお願いします。
それでは!