Java应用中如何优雅的等待所有异步任务执行完毕?

今天在工作群里,领导发了一个代码片段的截图。并且要大家对这种代码进行排查。

这个代码逻辑很明显。通过线程池异步执行子任务,然后主线程阻塞。直到所有子任务执行完毕。

他采取的办法是,主线程在死循环对taskQueue进行isEmpty判断。如果有多个这种代码片段存在,这可能导致CPU飙升。而且子任务执行异常的话,有可能会导致下面的while死循环。

image

通过JUC实现线程同步

juc包下就有很这种场景的并发工具类。不要在自己写while(true)了。

  • CountDownLatch
  • CompletableFuture

2种方式的实现代码都很简单,这里不做过多说明了。

package io.springcloud.test;

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class MainTest {

	public static void main(String[] args) throws Exception {
		
		// CountDownLatch
		test1();
		
		// CompletableFuture
		test2();
	}

	public static void test2() throws InterruptedException, ExecutionException {

		ExecutorService executor = Executors.newFixedThreadPool(2); // 2个线程

		LinkedList<String> queue = new LinkedList<String>(List.of("1", "2", "3", "4", "5"));

		CompletableFuture<?>[] completableFutureArray = new CompletableFuture<?>[queue.size()];

		for (int i = 0; i < queue.size(); i++) {

			String val = queue.get(i);

			// 使用线程池异步执行任务
			CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
				try {
					Thread.sleep(ThreadLocalRandom.current().nextLong(1000, 2000));
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				log.info("val = {}", val);
			}, executor);

			completableFutureArray[i] = completableFuture;
		}

		CompletableFuture<Void> future = CompletableFuture.allOf(completableFutureArray);

		// 阻塞,直到执行完毕
		future.join();

		log.info("执行完毕");

		executor.shutdown();
	}

	public static void test1() throws InterruptedException {

		LinkedList<String> queue = new LinkedList<String>(List.of("1", "2", "3", "4", "5"));

		CountDownLatch countDownLatch = new CountDownLatch(queue.size());

		for (String val : queue) {
			new Thread(() -> {
				try {
					Thread.sleep(ThreadLocalRandom.current().nextLong(1000, 2000));
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				log.info("val = {}", val);
				countDownLatch.countDown();
			}).start();
		}

		countDownLatch.await();
		log.info("执行完毕");
	}
}

日志输出

17:25:39.367 [Thread-4] INFO io.springcloud.test.MainTest - val = 5
17:25:39.459 [Thread-3] INFO io.springcloud.test.MainTest - val = 4
17:25:39.757 [Thread-0] INFO io.springcloud.test.MainTest - val = 1
17:25:39.785 [Thread-1] INFO io.springcloud.test.MainTest - val = 2
17:25:40.131 [Thread-2] INFO io.springcloud.test.MainTest - val = 3
17:25:40.131 [main] INFO io.springcloud.test.MainTest - 执行完毕
17:25:41.369 [pool-1-thread-1] INFO io.springcloud.test.MainTest - val = 1
17:25:41.389 [pool-1-thread-2] INFO io.springcloud.test.MainTest - val = 2
17:25:42.694 [pool-1-thread-2] INFO io.springcloud.test.MainTest - val = 4
17:25:42.909 [pool-1-thread-1] INFO io.springcloud.test.MainTest - val = 3
17:25:43.781 [pool-1-thread-2] INFO io.springcloud.test.MainTest - val = 5
17:25:43.781 [main] INFO io.springcloud.test.MainTest - 执行完毕
2 Likes

《已收藏》

1 Like

学到了,已保研! :rofl:

1 Like

:cow:

多的不说,少的不唠,还得是k弟

有帮助啊k哥,我最近就在想分布式任务拆分,然后等待任务完成返回,三克油了

牛牛牛 K哥