JavaのFutureの取り回しがダルすぎるので色々工夫してみる

まえがき

街の名はホーチミン1区。
一夜にして崩落・再構成されパフォーマンス向上の租界となったこの都市は、マルチスレッドを臨む境界点で、極度の緊張地帯となる。  
ここで世界の均衡を守るため暗躍するJava標準クラスFuture。この物語はこのFutureに挑む開発者の戦いと日常の記録である。  
(血界戦線風)  

ということで、久々の記事はjavaの非同期処理の返り値ラッパーである、Futureの取扱いについて、僕がよくやる実装Tipsについて書いていきたいと思います。

今回のテーマとしては、

  • マルチスレッド、非同期処理でも上手いことエラーハンドリングしたい
  • Futureの返り値展開をスッキリさせたい

の2つです。

1. 標準的な非同期処理とFutureの取り回し

Javaでマルチスレッド処理を書く際、多くの人は以下のような処理を書いていると思います。

  1. Executorsクラスでスレッドプールを作成する
  2. Callable/Runnableを実装したサブクラスを作成する (もしくはLambda式を使う)
  3. ExecutorServiceのsubmitで別スレッドに処理を任せる
  4. 返り値のFutureをListなどに格納する
  5. 全部を別スレッドに投げ終えたところでlistの中のFutureクラスのgetメソッドで同期を取る
  6. チェック例外のInterruptedExceptionExecutionExceptionを処理する

コードにするとこんな感じ。(Exceptionでウケるなとか、parallelStream使うなとかは見逃してください。)

@Slf4j
public class Main {
  public static void main(String[] args) {
    // If wanna make it parallel with 100 threads.

    val executor = Executors.newFixedThreadPool(100);

    val futureList = IntStream.range(0,100)
        .mapToObj(num -> executor.submit(() -> waitAndSaySomething(num)))
        .collect(Collectors.toList());
   
    futureList.parallelStream()
        .map(future -> {
          try {
            return future.get();
          } catch (Exception e) {
            log.error("Error happened while extracting the thread result.", e);
            // Do something for recover. Here, just return null.
            return null;
          }
        }).forEach(System.out::println);
    executor.shutdown();
  }

  static String waitAndSaySomething(int num) {
    try {
      Thread.sleep( num%10 * 1000);
    } catch (Exception e){
      // do nothing since just sample
    }
    if (num%2 ==0)
      throw new RuntimeException("Error if odd");
   return num + ": Something!";
  }
}

わりとよくあるコードだけど、Futureのget部分が冗長で書くのがダルい。
しかも、どのinputがエラー起こしたのかがわからないのでエラー処理がとても難しい。

2. まずエラー処理がちゃんとできるようにしてみる

Javaでマルチスレッドを実装する時、処理にinputを渡す方法は大体次の2つのうちのどちらかになる。

  1. Callable/Runnableを実装したクラスにプロパティとして持たせて、newする際にコンストラクタの引数で渡す。
  2. Lambdaの外部に宣言したfinalの変数をLambda式の中から参照する。

ただし、このいずれの場合も、エラーハンドリングは難しい。

1の場合、スレッドインスタンスを保存しておくのはそもそも微妙だし、FutureとThreadインスタンスのマッピングを何処かで管理しなきゃいけない。

2の場合、Futureからは投げた元のthreadの引数を取得できないためどのinputに対して起きたエラーなのか判別がつかない。

そこで、この問題を解決するために、以下の方法を取ることにしました。

Tupleを使ってプロパティとFutureをまとめて管理する

Tupleは簡単に言うと、複数の値の組のことで、プログラミングでよく使われる概念です。

Javaには標準でTupleが提供されて無いのですが、同様の方法は色々あって、Common LangPairTripleクラス、reactorTupleクラスやJavaTuplePairクラスなどを使って実現できます。

(複雑なクラスじゃないので、自分で実装してもいいです。)

Tupleを使って、inputをLeftに、FutureをRightに保存するようにすることで、Error処理で入力元の値を利用したエラー処理ができるようになります。

え?propertyがたくさんある?inputが重すぎてOOM起こしそう??設計を見直さんかい。

さて、今回はほぼすべてのプロジェクトで使われてると思われる、Commons LangのPairを使ってみます。
Tupleを使うと上のmainクラスはこんな感じに書き直せます。

@Slf4j
public class Main {
  public static void main(String[] args) {
    // If wanna make it parallel with 100 threads.
    val executor = Executors.newFixedThreadPool(100);
 
   val futureList = IntStream.range(0,100)
        .mapToObj(num -> Pair.of(num, executor.submit(() -> waitAndSaySomething(num))))
        .collect(Collectors.toList());

    futureList.parallelStream()
        .map(future -> {
          try {
            return future.getRight().get();
          } catch (Exception e) {
            log.error("Input {} was not processed correctly.", future.getLeft(), e);
            // Do something for recover. Here, just return null.
            return String.format("Input %s Failed in process, damn shit!! ", future.getLeft());
          }
        }).forEach(System.out::println);

    executor.shutdown();
  }

これで、入力値を利用したエラー処理とLoggingができるようになりました。

ただ、Futureを展開する箇所があいかわらず冗長で、なんだかイライラします。

そこで、次はこの部分を共通化してみます。

3. Futureの展開部分を共通化する

やりたいことはfutureの展開だけなので、そこだけ切り取れば、共通化は非常に簡単です。

一方で、エラー処理はinputに対して相対的に行いたいので、その部分は柔軟にできるような設計にしたいです。

そこで、エラーハンドリングの部分をExceptionとinputを利用して、任意のFunctionで処理できるように以下のようなFutureの展開クラスを用意します。

@RequiredArgsConstructor
public class FutureFlattener<L, R> implements Function<Pair<L, Future<R>>, R> {
  /**
   * Callback function to recover when exception such as {@link InterruptedException} or {@link
   * java.util.concurrent.ExecutionException}.
   */
  private final BiFunction<L, Exception, R> recoveryCallback;

  @Override
  public R apply(Pair<L, Future<R>> futurePair) {
    try {
      return futurePair.getRight().get();
    } catch (Exception e) {
      return recoveryCallback.apply(futurePair.getLeft(), e);
    }
  }
}

これを先程のMainクラスに組み込むと以下のようになります。

@Slf4j
public class Main {
  public static void main(String[] args) {
    // If wanna make it parallel with 100 threads.
    val executor = Executors.newFixedThreadPool(100);

    BiFunction<Integer,Exception,String> errorHandler =
        (in, e) -> {
          log.error("Input {} was not processed correctly.", in, e);
          return String.format("Input %s Failed in process, damn shit!! ", in);
        };
    val flattener = new FutureFlattener<Integer, String>(errorHandler);

    val futureList =
        IntStream.range(0, 100)
            .mapToObj(num -> Pair.of(num, executor.submit(() -> waitAndSaySomething(num))))
            .collect(Collectors.toList());

    futureList
        .parallelStream()
        .map(flattener)
        .forEach(System.out::println);
    executor.shutdown();
  }

しかし、せっかくFunctionインターフェイスを使ってるのに、内部で関数をプロパティに持つのは、正直めちゃくちゃダサいです。

ダサいのはいけません、クールに決めたい。

ということで、もう少しだけ拡張してみます。

4. JavaのFunctionインターフェイスを継承してエラーハンドリングを追加する

多くの他の言語がモナド型のクラスに、onCatchやthenCatchなど、Exceptionが投げられた時のためのメソッドを用意しています。

しかし、残念なことに、JavaのFunction Interfaceはcompose, apply, andThenの成功を前提としたメソッドチェーンしかできません。

そこで、JavaのFunctionインターフェイスを継承して、onCatchを実装してみます。

public interface CatchableFunction<T, R> extends Function<T, R> {

  /**
   * by calling this method in advance of calling {@link Function#apply}, any exception thrown in
   * the apply method will be handled as defined in the argument onCatch.
   *
   * @param onCatch callback method to handle the exception. First Type T is the input of the base
   *     function.
   * @return fail-safe function with a callback. This method will generate a new Function instance
   *     instead of modifying the existing function instance.
   */
  default Function<T, R> thenCatch(BiFunction<T, Exception, R> onCatch) {
    return t -> {
      try {
        return apply(t);
      } catch (Exception e) {
        return onCatch.apply(t, e);
      }
    };
  }
}

Javaの使用上、Type parameterをcatchすることはできないため、Exceptionで受けなくてはいけないのがもどかしいですが、これでかなりfunctionalに書けるようになりました。

このクラスを先程のFutureFlattenerクラスに実装すると以下のようになります。


@RequiredArgsConstructor
public class FutureFlattener<L, R> implements CatchableFunction<Pair<L, Future<R>>, R> {

  @Override
  public R apply(Pair<L, Future<R>> futurePair) {
    try {
      return futurePair.getRight().get();
    } catch (InterruptedException | ExecutionException e) {
      throw new FutureExpandException(e);
    }
  }

  // To be caught in the then catch method.
  private static class FutureExtractException extends RuntimeException {
    FutureExpandException(Throwable cause) {
      super(cause);
    }
  }

チェック例外はLamdba式の中で処理しなくては行けないため、FutureExtractExceptionでラップしてあります。

これでMainクラスもスッキリします。

@Slf4j
public class Main {
  public static void main(String[] args) {
    // If wanna make it parallel with 10 threads.
    val executor = Executors.newFixedThreadPool(100);

    val flattener = new FutureFlattener<Integer, String>()
            .thenCatch(
                (in, e) -> {
                  log.error("Input {} was not processed correctly.", in, e);
                  return String.format("Input %s Failed in process, damn shit!! ", in);
                });

    val futureList = IntStream.range(0, 100)
            .mapToObj(num -> Pair.of(num, executor.submit(() -> waitAndSaySomething(num))))
            .collect(Collectors.toList());
    futureList.parallelStream().map(flattener).forEach(System.out::println);
    executor.shutdown();
  }

  static String waitAndSaySomething(int num) {
    try {
      Thread.sleep(num % 10 * 1000);
    } catch (Exception e) {
      // do nothing since just sample
    }
    if (num % 2 == 0) {
      throw new RuntimeException("Error if odd");
    }
    return num + ": Something!";
  }
}

ネストが減って、関数の宣言もスッキリして、Futureの展開周りのソースもスッキリしました。

終わりに

さて、いかがだったでしょうか?

Functional Javaを使えばもっと楽に実装できたりする箇所はあるのですが、急いでいたため自前で実装してしまいました。

並列処理に関して言えば、最近はkafkaなどのメッセージキューを使って非同期かつ粗結合に作るのが基本ですが、だからといってマルチスレッドを使わないわけではありません。

一方で冗長なFuture展開はネストを増やし可読性を下げるだけでなく、最も気を使うべきエラーハンドリングに気が回らなくなります。

今回僕は上記のような解決方法を取りましたが、いかがだったでしょうか?

もっとええ方法あるで、という方がいらっしゃいましたら、コメント欄にお願いします。

それでは!