【Java】ループ処理を並列化し、ループの要素ごとにエラーハンドリングする

実行環境

ループ処理を並列化し、ループの要素ごとにエラーハンドリングする

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadSample {

    public static void main(String[] args) {
        System.out.println("START");

        // スレッド数
        ExecutorService es = Executors.newFixedThreadPool(10);

        // 並列処理対象
        // 1秒待機しインデックスを返す
        // インデックスが10の倍数なら例外をスローする
        List<CompletableFuture<Integer>> futureList = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            int index = i;
            futureList.add(CompletableFuture.supplyAsync(() -> {
                System.out.println("index=" + index + " thread=" + Thread.currentThread().getName());
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                if (index != 0 && index % 10 == 0) {
                    throw new RuntimeException(String.valueOf(index));
                }
                return index;
            }, es));
        }

        // 並列実行
        CompletableFuture<Void> cf =
                CompletableFuture.allOf(futureList.toArray(new CompletableFuture[futureList.size()]));

        // 全てのスレッドが完了するまで待機
        try {
            cf.get();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // 1件づつハンドリング
        futureList.forEach(future -> {
            try {
                System.out.println("normal-index=" + future.get());
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                System.out.println("error-index=" + e.getCause().getMessage());
            }
        });

        System.out.println("END");
    }
}
START
index=8 thread=pool-1-thread-9
index=4 thread=pool-1-thread-5
index=3 thread=pool-1-thread-4
index=7 thread=pool-1-thread-8
index=6 thread=pool-1-thread-7
index=1 thread=pool-1-thread-2
index=9 thread=pool-1-thread-10
index=2 thread=pool-1-thread-3
index=0 thread=pool-1-thread-1
index=5 thread=pool-1-thread-6
index=10 thread=pool-1-thread-9
index=11 thread=pool-1-thread-4
index=12 thread=pool-1-thread-8
index=13 thread=pool-1-thread-3
index=14 thread=pool-1-thread-7
index=15 thread=pool-1-thread-5
index=16 thread=pool-1-thread-10
index=17 thread=pool-1-thread-1
index=18 thread=pool-1-thread-6
index=19 thread=pool-1-thread-2
index=21 thread=pool-1-thread-8
index=20 thread=pool-1-thread-6
index=22 thread=pool-1-thread-1
index=23 thread=pool-1-thread-10
index=25 thread=pool-1-thread-3
index=24 thread=pool-1-thread-5
index=26 thread=pool-1-thread-7
index=27 thread=pool-1-thread-2
index=28 thread=pool-1-thread-4
index=29 thread=pool-1-thread-9
index=31 thread=pool-1-thread-8
index=36 thread=pool-1-thread-6
index=38 thread=pool-1-thread-7
index=39 thread=pool-1-thread-10
index=35 thread=pool-1-thread-4
index=30 thread=pool-1-thread-5
index=34 thread=pool-1-thread-1
index=33 thread=pool-1-thread-9
index=32 thread=pool-1-thread-2
index=37 thread=pool-1-thread-3
index=42 thread=pool-1-thread-3
index=45 thread=pool-1-thread-6
index=47 thread=pool-1-thread-10
index=48 thread=pool-1-thread-7
index=41 thread=pool-1-thread-1
index=44 thread=pool-1-thread-9
index=43 thread=pool-1-thread-2
index=40 thread=pool-1-thread-8
index=49 thread=pool-1-thread-4
index=46 thread=pool-1-thread-5
index=50 thread=pool-1-thread-2
index=57 thread=pool-1-thread-8
index=52 thread=pool-1-thread-3
index=56 thread=pool-1-thread-7
index=55 thread=pool-1-thread-10
index=54 thread=pool-1-thread-9
index=53 thread=pool-1-thread-5
index=51 thread=pool-1-thread-1
index=59 thread=pool-1-thread-4
index=58 thread=pool-1-thread-6
index=60 thread=pool-1-thread-4
index=68 thread=pool-1-thread-6
index=62 thread=pool-1-thread-8
index=65 thread=pool-1-thread-5
index=66 thread=pool-1-thread-9
index=64 thread=pool-1-thread-3
index=67 thread=pool-1-thread-7
index=63 thread=pool-1-thread-1
index=61 thread=pool-1-thread-10
index=69 thread=pool-1-thread-2
index=70 thread=pool-1-thread-7
index=78 thread=pool-1-thread-2
index=79 thread=pool-1-thread-4
index=72 thread=pool-1-thread-6
index=77 thread=pool-1-thread-9
index=76 thread=pool-1-thread-10
index=75 thread=pool-1-thread-3
index=71 thread=pool-1-thread-8
index=73 thread=pool-1-thread-1
index=74 thread=pool-1-thread-5
index=80 thread=pool-1-thread-6
index=87 thread=pool-1-thread-10
index=89 thread=pool-1-thread-9
index=88 thread=pool-1-thread-4
index=85 thread=pool-1-thread-8
index=84 thread=pool-1-thread-2
index=86 thread=pool-1-thread-5
index=83 thread=pool-1-thread-7
index=82 thread=pool-1-thread-1
index=81 thread=pool-1-thread-3
index=91 thread=pool-1-thread-3
index=93 thread=pool-1-thread-6
index=94 thread=pool-1-thread-7
index=96 thread=pool-1-thread-4
index=90 thread=pool-1-thread-1
index=98 thread=pool-1-thread-5
index=92 thread=pool-1-thread-9
index=99 thread=pool-1-thread-8
index=97 thread=pool-1-thread-10
index=95 thread=pool-1-thread-2
java.util.concurrent.ExecutionException: java.lang.RuntimeException: 10
    at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
    at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
    at ThreadSample.main(ThreadSample.java:42)
Caused by: java.lang.RuntimeException: 10
    at ThreadSample.lambda$main$0(ThreadSample.java:30)
    at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)
normal-index=0
normal-index=1
normal-index=2
normal-index=3
normal-index=4
normal-index=5
normal-index=6
normal-index=7
normal-index=8
normal-index=9
error-index=10
normal-index=11
normal-index=12
normal-index=13
normal-index=14
normal-index=15
normal-index=16
normal-index=17
normal-index=18
normal-index=19
error-index=20
normal-index=21
normal-index=22
normal-index=23
normal-index=24
normal-index=25
normal-index=26
normal-index=27
normal-index=28
normal-index=29
error-index=30
normal-index=31
normal-index=32
normal-index=33
normal-index=34
normal-index=35
normal-index=36
normal-index=37
normal-index=38
normal-index=39
error-index=40
normal-index=41
normal-index=42
normal-index=43
normal-index=44
normal-index=45
normal-index=46
normal-index=47
normal-index=48
normal-index=49
error-index=50
normal-index=51
normal-index=52
normal-index=53
normal-index=54
normal-index=55
normal-index=56
normal-index=57
normal-index=58
normal-index=59
error-index=60
normal-index=61
normal-index=62
normal-index=63
normal-index=64
normal-index=65
normal-index=66
normal-index=67
normal-index=68
normal-index=69
error-index=70
normal-index=71
normal-index=72
normal-index=73
normal-index=74
normal-index=75
normal-index=76
normal-index=77
normal-index=78
normal-index=79
error-index=80
normal-index=81
normal-index=82
normal-index=83
normal-index=84
normal-index=85
normal-index=86
normal-index=87
normal-index=88
normal-index=89
error-index=90
normal-index=91
normal-index=92
normal-index=93
normal-index=94
normal-index=95
normal-index=96
normal-index=97
normal-index=98
normal-index=99
END

並列化処理をSpring Bootに乗せ換える

Main

package com.example.asyncmethod;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;

@SpringBootApplication
@EnableAsync
public class AsyncMethodApplication {

    public static void main(String[] args) {
        SpringApplication.run(AsyncMethodApplication.class, args).close();
    }

    // 10スレッドで実行する
    @Bean
    public Executor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(10);
        executor.setQueueCapacity(500);
        executor.setThreadNamePrefix("Sample-");
        executor.initialize();
        return executor;
    }
}

Service

package com.example.asyncmethod;

import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

import java.util.concurrent.CompletableFuture;

@Service
public class SampleService {

    // 1秒待機しインデックスを返す
    // インデックスが10の倍数なら例外をスローする
    @Async
    public CompletableFuture<Integer> execute(int index) {
        System.out.println("index=" + index + " thread=" + Thread.currentThread().getName());
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        if (index != 0 && index % 10 == 0) {
            throw new RuntimeException(String.valueOf(index));
        }
        return CompletableFuture.completedFuture(index);
    }
}

Runner

package com.example.asyncmethod;

import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

@Component
public class SampleRunner implements CommandLineRunner {

    private final SampleService sampleService;

    public SampleRunner(SampleService sampleService) {
        this.sampleService = sampleService;
    }

    @Override
    public void run(String... args) throws Exception {
        System.out.println("START");

        // 並列処理対象
        List<CompletableFuture<Integer>> futureList = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            futureList.add(sampleService.execute(i));
        }

        // 並列実行
        CompletableFuture<Void> cf =
                CompletableFuture.allOf(futureList.toArray(new CompletableFuture[futureList.size()]));

        // 全てのスレッドが完了するまで待機
        try {
            cf.get();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // 1件づつハンドリング
        futureList.forEach(future -> {
            try {
                System.out.println("normal-index=" + future.get());
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                System.out.println("error-index=" + e.getCause().getMessage());
            }
        });

        System.out.println("END");
    }
}

実行結果

START
index=7 thread=Sample-8
index=2 thread=Sample-3
index=4 thread=Sample-5
index=3 thread=Sample-4
index=8 thread=Sample-9
index=6 thread=Sample-7
index=9 thread=Sample-10
index=0 thread=Sample-1
index=5 thread=Sample-6
index=1 thread=Sample-2
index=13 thread=Sample-1
index=17 thread=Sample-3
index=15 thread=Sample-4
index=16 thread=Sample-8
index=14 thread=Sample-6
index=11 thread=Sample-5
index=10 thread=Sample-10
index=12 thread=Sample-7
index=19 thread=Sample-2
index=18 thread=Sample-9
index=21 thread=Sample-6
index=28 thread=Sample-2
index=27 thread=Sample-9
index=26 thread=Sample-5
index=25 thread=Sample-7
index=24 thread=Sample-3
index=20 thread=Sample-8
index=23 thread=Sample-1
index=22 thread=Sample-4
index=29 thread=Sample-10
index=30 thread=Sample-6
index=38 thread=Sample-8
index=39 thread=Sample-2
index=34 thread=Sample-5
index=37 thread=Sample-3
index=36 thread=Sample-10
index=33 thread=Sample-7
index=35 thread=Sample-4
index=32 thread=Sample-1
index=31 thread=Sample-9
index=40 thread=Sample-4
index=46 thread=Sample-2
index=45 thread=Sample-6
index=48 thread=Sample-3
index=43 thread=Sample-1
index=44 thread=Sample-10
index=41 thread=Sample-7
index=42 thread=Sample-9
index=49 thread=Sample-8
index=47 thread=Sample-5
index=52 thread=Sample-7
index=59 thread=Sample-1
index=58 thread=Sample-8
index=57 thread=Sample-5
index=55 thread=Sample-4
index=50 thread=Sample-2
index=54 thread=Sample-10
index=56 thread=Sample-9
index=51 thread=Sample-6
index=53 thread=Sample-3
index=62 thread=Sample-6
index=68 thread=Sample-2
index=69 thread=Sample-3
index=67 thread=Sample-9
index=66 thread=Sample-10
index=65 thread=Sample-4
index=64 thread=Sample-7
index=63 thread=Sample-8
index=60 thread=Sample-1
index=61 thread=Sample-5
index=70 thread=Sample-9
index=79 thread=Sample-1
index=78 thread=Sample-8
index=76 thread=Sample-4
index=75 thread=Sample-5
index=77 thread=Sample-3
index=74 thread=Sample-10
index=72 thread=Sample-2
index=73 thread=Sample-7
index=71 thread=Sample-6
index=80 thread=Sample-4
index=89 thread=Sample-7
index=88 thread=Sample-1
index=86 thread=Sample-8
index=87 thread=Sample-2
index=85 thread=Sample-9
index=84 thread=Sample-5
index=81 thread=Sample-10
index=82 thread=Sample-3
index=83 thread=Sample-6
index=91 thread=Sample-1
index=99 thread=Sample-6
index=98 thread=Sample-4
index=94 thread=Sample-5
index=93 thread=Sample-8
index=96 thread=Sample-2
index=97 thread=Sample-3
index=90 thread=Sample-9
index=95 thread=Sample-7
index=92 thread=Sample-10
java.util.concurrent.ExecutionException: java.lang.RuntimeException: 10
    at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
    at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
    at com.example.asyncmethod.SampleRunner.run(SampleRunner.java:36)
    at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:802)
    at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:786)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:345)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1354)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1343)
    at com.example.asyncmethod.AsyncMethodApplication.main(AsyncMethodApplication.java:16)
Caused by: java.lang.RuntimeException: 10
    at com.example.asyncmethod.SampleService.execute(SampleService.java:22)
    at com.example.asyncmethod.SampleService$$FastClassBySpringCGLIB$$1f886f09.invoke(<generated>)
    at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:783)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:753)
    at org.springframework.aop.interceptor.AsyncExecutionInterceptor.lambda$invoke$0(AsyncExecutionInterceptor.java:115)
    at org.springframework.aop.interceptor.AsyncExecutionAspectSupport.lambda$doSubmit$3(AsyncExecutionAspectSupport.java:276)
    at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)
normal-index=0
normal-index=1
normal-index=2
normal-index=3
normal-index=4
normal-index=5
normal-index=6
normal-index=7
normal-index=8
normal-index=9
error-index=10
normal-index=11
normal-index=12
normal-index=13
normal-index=14
normal-index=15
normal-index=16
normal-index=17
normal-index=18
normal-index=19
error-index=20
normal-index=21
normal-index=22
normal-index=23
normal-index=24
normal-index=25
normal-index=26
normal-index=27
normal-index=28
normal-index=29
error-index=30
normal-index=31
normal-index=32
normal-index=33
normal-index=34
normal-index=35
normal-index=36
normal-index=37
normal-index=38
normal-index=39
error-index=40
normal-index=41
normal-index=42
normal-index=43
normal-index=44
normal-index=45
normal-index=46
normal-index=47
normal-index=48
normal-index=49
error-index=50
normal-index=51
normal-index=52
normal-index=53
normal-index=54
normal-index=55
normal-index=56
normal-index=57
normal-index=58
normal-index=59
error-index=60
normal-index=61
normal-index=62
normal-index=63
normal-index=64
normal-index=65
normal-index=66
normal-index=67
normal-index=68
normal-index=69
error-index=70
normal-index=71
normal-index=72
normal-index=73
normal-index=74
normal-index=75
normal-index=76
normal-index=77
normal-index=78
normal-index=79
error-index=80
normal-index=81
normal-index=82
normal-index=83
normal-index=84
normal-index=85
normal-index=86
normal-index=87
normal-index=88
normal-index=89
error-index=90
normal-index=91
normal-index=92
normal-index=93
normal-index=94
normal-index=95
normal-index=96
normal-index=97
normal-index=98
normal-index=99
END