๋น๋๊ธฐ ํ๋ก๊ทธ๋๋ฐ์ ๋ ์ถ
Java 8 ๋ถํฐ ๋์ ๋ CompletableFuture์ ForkJoinPool์ ๋น๋๊ธฐ ๋ฐ ๋ณ๋ ฌ ํ๋ก๊ทธ๋๋ฐ์์ ์ค์ํ ์ญํ ์ ํฉ๋๋ค. ์ด ๋ ๊ฐ์ง๋ ์๋ก ๋ณด์์ ์ธ ๊ด๊ณ์ ์์ผ๋ฉฐ, ํจ์จ์ ์ธ ๋ฉํฐ์ค๋ ๋ ํ๋ก๊ทธ๋๋ฐ์ ๊ฐ๋ฅํ๊ฒ ํฉ๋๋ค.
์ด๋ฒ ๊ธ์์๋ ์ด๋ค์ด ์ด๋ป๊ฒ ํจ๊ป ์ํธ ๋ณด์์ ์ผ๋ก ๋์ํ๋์ง์ ์ฃผ์ํด์ผ ๋ ์ ์ ๋ํด ์ดํด๋ณด๊ฒ ์ต๋๋ค.
CompletableFuture
CompletableFuture๋ ๊ธฐ์กด์ Future ์ธํฐํ์ด์ค๊ฐ ์ ๊ณตํ์ง ๋ชปํ๋ ๋น๋๊ธฐ ๊ฒฐ๊ณผ ๊ฐ์ ์กฐํฉ๊ณผ ์์ธ ์ฒ๋ฆฌ๋ฅผ ํจ์ฌ ๋ ํจ๊ณผ์ ์ผ๋ก ์ํํ ์ ์๋ ๊ธฐ๋ฅ์ ๊ฐ์ถ ์ธํฐํ์ด์ค์ ๋๋ค. ์ด ์ธํฐํ์ด์ค๋ฅผ ํ์ฉํ๋ฉด ๋๊ธฐ์ ์ธ ์ํ ๋ฐฉ์์ ๋น๋๊ธฐ์ ์ธ ์ํ ๋ฐฉ์์ผ๋ก ๋ณํํ์ฌ ์ฑ๋ฅ์ ๊ฐ์ ํ๊ณ , ๋ ์ ์ฐํ ๋น๋๊ธฐ ์ฒ๋ฆฌ ๋ก์ง์ ๊ตฌํํ ์ ์์ต๋๋ค.
์์
์๋ ์์ ๋ ItemRepository์์ storeName์ ํตํด item์ ์กฐํํ๋ ์์ ์ ๋๋ค.
@Getter
public class Item {
private String name;
private Integer price;
@Builder
public Item(String name, Integer price) {
this.name = name;
this.price = price;
}
}
ItemRepository.getPriceByName(...) ๋ฉ์๋๋ ์์ฒญ์๋ง๋ค 1์ด๊ฐ ๋๊ธฐ ํ ๊ฐ๊ฒ์ ๋ฑ๋ก๋์ด ์๋ Item์ ๊ฐ๊ฒฉ์ ๋ฆฌํดํฉ๋๋ค.
@Component
public class ItemRepository {
private Map<String, Item> storeMap = new ConcurrentHashMap<>();
public ItemRepository() {
storeMap.put("storeA", Item.builder().name("pencil").price(300).build());
storeMap.put("storeB", Item.builder().name("eraser").price(500).build());
storeMap.put("storeC", Item.builder().name("mechanical pencil").price(2000).build());
}
public int getPriceByStoreName(String name) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return storeMap.get(name).getPrice();
}
}
ํด๋น ๋ก์ง์์ ๋๊ธฐ์ ์ผ๋ก ๋ชจ๋ ์ํ์ ์กฐํํ๋ฉด ์๊ฐ์ด ์ผ๋ง๋ ๊ฑธ๋ฆด๊น์?
@Test
public void getPriceAllSync() {
int priceA = itemRepository.findPriceByStoreName("storeA");
int priceB = itemRepository.findPriceByStoreName("storeB");
int priceC = itemRepository.findPriceByStoreName("storeC");
log.info("price = {}", priceA);
log.info("price = {}", priceB);
log.info("price = {}", priceC);
}
findPriceByStoreName(...) ๋ฉ์๋๋ ํธ์ถ๋ ๋ ๋ง๋ค 1์ด ๋์ ๋๊ธฐํ ํ ์์ ์ ์ํํ๊ธฐ ๋๋ฌธ์ ์์ ์๊ฐ์ด 3์ด ์ด์ ๊ฑธ๋ฆฌ๊ฒ ๋ฉ๋๋ค. ์ด๋ฌํ ๋ฐฉ์์ ๋ก์ง์ ์ฌ๋ฌ ๊ฐ์ ์ค๋ ๋๋ก ํ ๋ฒ์ ์์ฒญ์ ํ๋ค๋ฉด ๊ฒฝ๊ณผ ์๊ฐ์ ํฌ๊ฒ ์ค์ผ ์ ์์ต๋๋ค.
๊ทธ๋ ๋ค๋ฉด ํด๋น ๋ก์ง์ ๋น๋๊ธฐ๋ก ์ฒ๋ฆฌํ๋๋ก ์์ฑํ๋ ค๋ฉด ์ด๋ป๊ฒ ํด์ผํ ๊น์?
@Test
public void getPriceAllAsync() {
CompletableFuture<Void> futureA = CompletableFuture.supplyAsync(() -> {
log.info("ThreadName = {}", Thread.currentThread().getName());
return itemRepository.getPriceStoreName("storeA");
})
.thenAccept(price -> log.info("price = {}", price));
CompletableFuture<Void> futureB = CompletableFuture.supplyAsync(() -> {
log.info("ThreadName = {}", Thread.currentThread().getName());
return itemRepository.getPriceStoreName("storeB");
})
.thenAccept(price -> log.info("price = {}", price));
CompletableFuture<Void> futureC = CompletableFuture.supplyAsync(() -> {
log.info("ThreadName = {}", Thread.currentThread().getName());
return itemRepository.getPriceStoreName("storeC");
})
.thenAccept(price -> log.info("price = {}", price));
CompletableFuture.allOf(futureA, futureB, futureC).join();
}
ํ์ฌ ๋ก์ง์์๋ ๊ฐ๋ณ ์์ ์ CompletableFuture.supplyAsync(...)๋ฅผ ์ฌ์ฉํ์ฌ ๋น๋๊ธฐ์ ์ผ๋ก ์ฒ๋ฆฌํ๊ณ ์์ต๋๋ค. ์ด ๋ฐฉ์์ ํ์ฌ ์ค๋ ๋์๋ ๋ณ๊ฐ์ ์ค๋ ๋์์ ํจ์๊ฐ ์คํ๋๋๋ก ํ์ฌ ๋ณ๋ ฌ๋ก ์์ ์ ์ํํ ์ ์์ต๋๋ค.
- theAccept(...): ๊ฒฐ๊ณผ๋ฅผ ์๋นํ๋ ์ฝ๋ฐฑํจ์๋ก, ๋ฆฌํดํ์ ์ด ์์ต๋๋ค. CompletableFuture.supplyAsync(...)์์์ ์์ ์ด ์๋ฃ๋ ํ ๊ฒฐ๊ณผ๋ฅผ ์ฒ๋ฆฌํ๋ ์์ ์ ์ง์ ํ ๋ ์ฌ์ฉ๋ฉ๋๋ค. ๋ง์ฝ ๋ฆฌํด ํ์ ์ด ์๋ ์ฝ๋ฐฑ ํจ์๋ฅผ ์ฌ์ฉํ๊ณ ์ถ๋ค๋ฉด, thenApply(...) ๋ฉ์๋๋ฅผ ์ฌ์ฉํด์ผ ํฉ๋๋ค.
- allOf(...): ์ฌ๋ฌ CompletableFuture๋ฅผ ๋ณ๋ ฌ๋ก ์ฒ๋ฆฌํ ์ ์๋๋ก ํด์ฃผ๋ ๋ฉ์๋์ ๋๋ค. ๋ชจ๋ CompletableFuture์ ์ฒ๋ฆฌ๊ฐ ์๋ฃ๋ ๋๊น์ง ๋๊ธฐํ๋ฉฐ, ์๋ฃ๋ ํ CompletableFuture<Void>๋ฅผ ๋ฐํํฉ๋๋ค. ๊ทธ๋ฌ๋ allOf ๋ฉ์๋๋ ๋ชจ๋ Future์ ๊ฒฐ๊ณผ๋ฅผ ๊ฒฐํฉํ ๊ฐ์ ๋ฐํํ์ง ์์ต๋๋ค.
๋ฐ๋ผ์ ๊ฐ๋ณ CompletableFuture์ ๊ฒฐ๊ณผ๋ฅผ ์ฌ์ฉํ๋ ค๋ฉด join() ๋ฉ์๋๋ฅผ ํ์ฉํด์ผ ํฉ๋๋ค. join()์ ํธ์ถํ์ฌ allOf()์ ์์ ์ด ์๋ฃ๋ ๋๊น์ง ๋ธ๋กํนํ์ฌ ๋๊ธฐํ ์ ์์ต๋๋ค.
์คํ ๊ฒฐ๊ณผ ๋ถ์
์ผ๋จ ๋น๋๊ธฐ๋ก ์คํํ ์ฝ๋๊ฐ ์ด์ ๋๊ธฐ์ ์ผ๋ก ์คํํ ์ฝ๋๋ณด๋ค ๋น ๋ฅธ ์ํ ์๊ฐ์ ๋ณด์ธ๋ค๋ ๊ฒ์ ํ์ธํ ์ ์์ต๋๋ค.
์์ ์คํ ๊ฒฐ๊ณผ๋ฅผ ๋ณด๋ฉด, ํด๋น CompletableFuture๊ฐ ์คํ๋ ์ค๋ ๋๊ฐ ForkJoinPool.commonPool๋ก ํ์๋์ด ์์ต๋๋ค. ์ฌ๊ธฐ์ ForkJoinPool.commonPool์ ๋ฌด์์ด๊ณ ์ ๋น๋๊ธฐ ๋ก์ง์ด ํด๋น ์ค๋ ๋์์ ๋์ํ๋ ๊ฒ์ผ๊น์?
ForkJoinPool
ForkJoinPool์ ์๋ฐ์์ ์ ๊ณตํ๋ ๋น๋๊ธฐ ํ๋ ์์ํฌ๋ก, ๋ถํ -์ ๋ณต ์๊ณ ๋ฆฌ์ฆ์ ํ์ฉํ์ฌ ์์ ์ ํจ์จ์ ์ผ๋ก ๋ถ์ฐํ๊ณ ๋ณ๋ ฌ๋ก ์คํํ๋ ์ค๋ ๋ ํ์ ๋๋ค. ์ด ์ค๋ ๋ ํ์ ํฐ ์์ ์ ์ฌ๋ฌ ์์ Sub Task๋ก ๋ถ๋ฆฌ(Fork)ํ๊ณ , ๊ฐ๊ฐ์ ๋ณ๋ ฌ๋ก ์ฒ๋ฆฌํ ํ ๊ทธ ๊ฒฐ๊ณผ๋ฅผ ๋ค์ ํฉ์ณ(Join) ์ต์ข ๊ฒฐ๊ณผ๋ฅผ ์์ฑํ๋ ๋ฐฉ์์ผ๋ก ๋์ํฉ๋๋ค.
๋์ ๋ฐฉ์
ForkJoinPool์ ๋ค๋ฅธ ์ข ๋ฅ์ ExecutorService์๋ ๋ค๋ฅธ Work-Stealing ๋งค์ปค๋์ฆ์ ์ฌ์ฉํฉ๋๋ค.
- submit()์ ํตํด task๋ฅผ ๋ณด๋ธ๋ค.
- ์ธ๋ฐ์ด๋ ํ์ task๊ฐ ๋ค์ด๊ฐ๊ณ , A์ B ์ค๋ ๋๊ฐ task๋ฅผ ์ฒ๋ฆฌํ๋ค.
- A์ B๋ ๊ฐ์ ํ๊ฐ ์์ผ๋ฉฐ, ์์ ๊ทธ๋ฆผ์ B์ฒ๋ผ ํ์ task๊ฐ ์์ผ๋ฉด A์ task๋ฅผ stealํ์ฌ ์ฒ๋ฆฌํ๋ค.
Work-Stealing ๋งค์ปค๋์ฆ์ ์ฌ์ฉํ๊ธฐ ๋๋ฌธ์ CPU ์์์ด ๋์ง ์๊ณ ์ต์ ์ ์ฑ๋ฅ์ ๋ผ ์ ์๊ฒ ํด ์ค๋๋ค.
ForkJoinPool๊ณผ CompletableFuture์ ์ฐ๊ณ
Java 8์์๋ถํฐ CompletableFuture๋ ForkJoinPool์ ์ฌ์ฉํ์ฌ ๋น๋๊ธฐ ์์ ์ ์ฒ๋ฆฌํฉ๋๋ค. ๊ธฐ๋ณธ์ ์ผ๋ก CompletableFuture๋ ForkJoinPool.commonPool()์ ์ฌ์ฉํ์ฌ ์ค๋ ๋ ํ์ ๊ฐ์ ธ์ค๊ณ ์ฌ์ฉํฉ๋๋ค.
ForkJoinPool.commonPool() ๋ํดํธ ์ค๋ ๋ ๊ฐ์ = ์ฝ์ด ๊ฐ์ * 2 - 1
ForkJoinPool.commonPool()์ ๊ธฐ๋ณธ์ ์ผ๋ก ์ฌ์ฉ ๊ฐ๋ฅํ ํ๋ก์ธ์์ ์์ ๋ฐ๋ผ ๋์ ์ผ๋ก ํฌ๊ธฐ๊ฐ ์กฐ์ ๋๋ ๊ณต์ฉ ForkJoinPool์ ๋ฐํํฉ๋๋ค. ๋ฐ๋ผ์ CompletableFuture๋ฅผ ์ฌ์ฉํ ๋๋ ๋ณ๋์ ์ค๋ ๋ ํ์ ์์ฑํ๊ฑฐ๋ ๊ตฌ์ฑํ ํ์๊ฐ ์์ผ๋ฉฐ, ๊ธฐ๋ณธ์ ์ผ๋ก ์์คํ ๋ฆฌ์์ค๋ฅผ ์ ํ์ฉํ ์ ์์ต๋๋ค.
ForkJoinPool ์ฌ์ฉ ์ ์ฃผ์์ฌํญ
CompletableFuture์ parallelStream์ ๋ด๋ถ์ ์ผ๋ก ForkJoinPool.commonPool()์ ์ฌ์ฉํ์ฌ ์์ ์ ๋ณ๋ ฌํ ์ํต๋๋ค. ์ด ๋ง์ commonPool์ ๋ณ๋์ ์ค๋ ๋ ํ๋ก ์์ฑํ๋ ๊ฒ์ด ์๋๋ผ๋ ๋ง๊ณผ ๊ฐ์ต๋๋ค. ์ฆ, ๋ณ๋์ ์ค์ ์ด ์๋ค๋ฉด ํ๋์ ์ค๋ ๋ ํ์ ๋ชจ๋ CompletableFuture๊ณผ parallelStream์ด ๊ณต์ ํ๊ฒ ๋๊ณ , ์ค๋ ๋ ํ์ ์ฌ์ฉํ๋ ๋ค๋ฅธ ์ค๋ ๋์ ์ํฅ์ ์ค ์ ์์ผ๋ฉฐ, ๋ฐ๋๋ก ์ํฅ์ ๋ฐ์ ์๋ ์์ต๋๋ค.
์๋์ ๊ฐ์ ์ฝ๋๋ฅผ ๋์์ ์์ฒญํ์ฌ ์คํํ๋ค๊ณ ๊ฐ์ ํด๋ด ์๋ค.
public void getPriceAllAsync() {
CompletableFuture<Integer> futureA = CompletableFuture.supplyAsync(() -> {
log.info("Thread Name = {}", Thread.currentThread().getName());
return itemRepository.getPriceByStoreName("storeA");
});
CompletableFuture<Integer> futureB = CompletableFuture.supplyAsync(() -> {
log.info("Thread Name = {}", Thread.currentThread().getName());
return itemRepository.getPriceByStoreName("storeB");
});
CompletableFuture<Integer> futureC = CompletableFuture.supplyAsync(() -> {
log.info("Thread Name = {}", Thread.currentThread().getName());
return itemRepository.getPriceByStoreName("storeC");
});
log.info("price = {}", futureA.join());
log.info("price = {}", futureB.join());
log.info("price = {}", futureC.join());
}
์ฒซ ๋ฒ์งธ ์์ฒญ์ด ํด๋น ์ฝ๋๋ฅผ ์คํํ์์ ๋ ์ค๋ ๋ ํ์ ์๋์ ๊ฐ์ ์ํ๊ฐ ๋ฉ๋๋ค.
์ฒซ ๋ฒ์งธ ์์ฒญ์ ์ฒ๋ฆฌํ๋ ๋์ ์ค๋ ๋๋ฅผ ์ ์ ํ๊ฒ ๋๋ฉด, ๋ค์ ์์ฒญ์ด ๋์ฐฉํ ๋ ์ค๋ ๋๋ฅผ ๋ฐํํ์ง ์๊ณ ๊ณ์ ์ ์ ํ๊ฒ ๋ ์ ์์ต๋๋ค. ์ด๋ก ์ธํด Thread 1, 2, 3์ ์ฌ์ฉํ ์ ์๊ฒ ๋๊ณ , ๊ฒฐ๊ตญ Thread 4 ํ๋๋ง ๋จ์ ๋ชจ๋ ์์ฒญ์ ์ฒ๋ฆฌํด์ผ ํ ์ ์์ต๋๋ค.
ํนํ ์ค๋ ๋๊ฐ sleep๊ณผ ๊ฐ์ด ์๋ฌด ์์ ๋ ํ์ง ์๊ณ ์ ์ ์ค์ด๋ฉด ์ฌ๊ฐํ ๋ฌธ์ ๊ฐ ๋ฐ์ํ ์ ์์ต๋๋ค.
์ ๊ณผ์ ์์ ๋ ์ฌ๊ฐํ ์ํฉ์ Thread 4๊น์ง ์ ์ ํ ๊ฒฝ์ฐ์ ๋๋ค. ์ด ๊ฒฝ์ฐ ์ถ๊ฐ ์์ฒญ์ ์ฒ๋ฆฌ๋์ง ์๊ณ Thread Pool Queue์ ์์ด๊ฒ ๋๋ฉฐ, ์ผ์ ์๊ฐ์ด ์ง๋๋ฉด ์์ฒญ์ด ์์ค๋ ์ ์์ต๋๋ค.
ForkJoinPool.commPool()์ ์ ์ญ์ ์ผ๋ก ๊ณต์ ๋๋ ์ค๋ ๋ ํ์ ์ฌ์ฉํฉ๋๋ค. ์ด ๋๋ฌธ์ 4๊ฐ์ ์ค๋ ๋๊ฐ ๋ชจ๋ ์ฌ์ฉ ์ค์ด๋ผ๋ฉด, ๋ค๋ฅธ ์์ ๋ค์ ๋๊ธฐ ์ํ์ ๋ค์ด๊ฐ๊ฒ ๋ฉ๋๋ค. ํนํ Blocking I/O๊ฐ ๋ฐ์ํ๋ ์์ ์ด ์์ ๊ฒฝ์ฐ, ์ค๋ ๋ ํ ๋ด๋ถ์ ์ค๋ ๋๋ค์ด ๋ธ๋ก ๋์ด ๋ค๋ฅธ ์์ฒญ๋ค์ด ์ค๋ ๋๋ฅผ ์ป๊ธฐ ์ ๊น์ง ๊ธฐ๋ค๋ ค์ผ ํฉ๋๋ค. ์ด๋ก ์ธํด ์ฑ๋ฅ ์ ํ์ ๊ฐ์ ๋ฌธ์ ๊ฐ ๋ฐ์ํ ์ ์์ต๋๋ค.
public void getPriceAllAsync() {
ExecutorService pool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
CompletableFuture<Integer> futureA = CompletableFuture.supplyAsync(() -> {
log.info("Thread Name = {}", Thread.currentThread().getName());
return itemRepository.getPriceByStoreName("storeA");
}, pool);
CompletableFuture<Integer> futureB = CompletableFuture.supplyAsync(() -> {
log.info("Thread Name = {}", Thread.currentThread().getName());
return itemRepository.getPriceByStoreName("storeB");
}, pool);
CompletableFuture<Integer> futurec = CompletableFuture.supplyAsync(() -> {
log.info("Thread Name = {}", Thread.currentThread().getName());
return itemRepository.getPriceByStoreName("storeC");
}, pool);
log.info("price = {}", futureA.join());
log.info("price = {}", futureB.join());
log.info("price = {}", futureC.join());
pool.shutdown();
}
์ด ๋ฌธ์ ๋ฅผ ํด๊ฒฐํ๊ธฐ ์ํด, ๊ฐ CompletableFuture๋ parallelStream์ ๋ํด ๋ ๋ฆฝ์ ์ธ ์ค๋ ๋ ํ์ ์ฌ์ฉํ๋ ๋ฐฉ๋ฒ์ด ์์ต๋๋ค. ์ด๋ ๊ฒ ํ๋ฉด ์ค๋ ๋ ํ์ด ๋ ๋ฆฝ์ ์ผ๋ก ๊ด๋ฆฌ๋๋ฏ๋ก, ํ ์์ ์ด ์ค๋ ๋๋ฅผ ์ฐจ์งํ๊ณ ์์ด๋ ๋ค๋ฅธ ์์ ์ ์ํฅ์ ๋ฐ์ง ์๊ณ ์ํํ๊ฒ ์คํํ ์ ์์ต๋๋ค.
CompletableFuture์ parallelStream
Java์์ ๋ณ๋ ฌ์ฒ๋ฆฌ๋ฅผ ํ๋ ๋ฐฉ๋ฒ์ CompletableFuture๋ง ์๋ ๊ฒ์ด ์๋ parallelStream๋ ์กด์ฌํฉ๋๋ค. ์ด ๋์ ForkJoinPool.commonPool()์ ๊ณต์ ํ๊ณ ๋ณ๋ ฌ ์ฒ๋ฆฌ๋ฅผ ์ง์ํ์ง๋ง, ๊ฐ๊ฐ์ ํน์ฑ๊ณผ ์ ํฉํ ์ฌ์ฉ ์ํฉ์ด ๋ค๋ฆ ๋๋ค.
parallelStream
parallelStream์ Java 8๋ถํฐ ์ถ๊ฐ๋ ์คํธ๋ฆผ API์ ํ์ฅ์ผ๋ก ์คํธ๋ฆผ์ ๋ณ๋ ฌ๋ก ์ฒ๋ฆฌํ ์ ์๋๋ก ํฉ๋๋ค. ์ด๋ฅผ ํตํด ๋ฐ์ดํฐ ์งํฉ์ ๋ณ๋ ฌ๋ก ์ฒ๋ฆฌํ์ฌ ์ฑ๋ฅ์ ํฅ์ ์ํฌ ์ ์์ต๋๋ค.
์๋๋ ์ด์ ์์ ๋ฅผ parallelStream์ ํตํด ์ฒ๋ฆฌํ๋ ์์ ์ ๋๋ค.
@Test
public void getPriceAllAsyncWithParallelStream() {
List<String> stores = List.of("storeA", "storeB", "storeC");
stores.parallelStream()
.forEach(store -> {
log.info("Thread Name = {}", Thread.currentThread().getName());
log.info("price = {}", itemRepository.getPriceByStore(store));
});
}
์์ ์ฝ๋๋ฅผ ์คํํด๋ณด๋ฉด ์ด์ CompletableFuture๋ฅผ ์ฌ์ฉํ ๋ฐฉ์๊ณผ ๋ค๋ฅธ ์ ์์ด ๋์ผํ ๊ฒฐ๊ณผ์ ์ํ ์๊ฐ์ด ๋ํ๋ฉ๋๋ค. ๊ทธ๋ ๋ค๋ฉด CompletableFuture์ parallelStream ์ฌ์ด์๋ ์ด๋ค ์ฐจ์ด๊ฐ ์๋ ๊ฑธ๊น์?
parallelStream๋ Stream์ด๋ค
์๋ ์์ ๋ฅผ ํตํด ์ด ๋ ๋ฐฉ์์ ์ฐจ์ด๋ฅผ ์ดํด๋ณด๊ฒ ์ต๋๋ค.
@Test
public void getPriceAllAsyncWithParallelStream() throws Exception {
List<String> stores = List.of("storeA", "storeB", "storeC");
List<Integer> prices = stores.parallelStream()
.map(store -> itemRepository.getPriceByStoreName(store))
.toList();
Thread.sleep(1000);
prices.forEach(price -> log.info("price = {}", price));
}
์ด ์ฝ๋๋ parallelStream์ ํตํ ๊ฐ๊ฒฉ ์กฐํ ๋ถ๋ถ๊ณผ ํด๋น ๊ฐ๊ฒฉ์ ์ถ๋ ฅํ๋ ์ฝ๋ ์ฌ์ด์ ์ด๋ ํ ์์ ์ด ์๋ค๊ณ ๊ฐ์ ํ๊ณ ์์ฑํ ์ฝ๋์ ๋๋ค.
ํด๋น ์ฝ๋๋ฅผ ์ํํ๋ค๋ฉด ์๋์ ๊ฐ์ ๊ฒฐ๊ณผ๋ฅผ ์ป์ ์ ์์ต๋๋ค.
CompletableFuture๋ฅผ ์ฌ์ฉํ ์ฝ๋๋ ๊ฐ์ ์ํฉ์ ๊ฐ์ ํ์ฌ ์์ฑํ์์ต๋๋ค.
@Test
public void getPriceAllAsyncWithCompletableFuture() throws Exception {
List<String> stores = List.of("storeA", "storeB", "storeC");
List<CompletableFuture<Integer> futures = stores.stream()
.map(store -> CompletableFuture.supplyAsync(() -> itemRepository.getPriceByStoreName(store)))
.toList();
Thread.sleep(1000);
futures.stream()
.map(CompletableFuture::join)
.forEach(price -> log.info("price = {}", price));
}
ํด๋น ์ฝ๋๋ฅผ ์คํ์์ผ๋ณด๋ฉด ์ด์ parallelStream์ ์ฌ์ฉํ ์ฝ๋๋ณด๋ค ๋น ๋ฅธ ์ํ์๊ฐ์ ๋ณด์ฌ์ค๋๋ค.
์ด๋ฐ ์ฐจ์ด๊ฐ ๋๋ ์ด์ ๋ parallelStream์ ๋ณ๋ ฌ๋ก ์คํ๋๊ธด ํ์ง๋ง, ์คํธ๋ฆผ์ ๋ชจ๋ ์์๊ฐ ์ฒ๋ฆฌ๋ ๋๊น์ง ๊ธฐ๋ค๋ ค์ผ ํฉ๋๋ค. ์ฆ, ๋ชจ๋ ์์๊ฐ ์๋ฃ๋ ๋๊น์ง ๋ธ๋ก๋ฉ๋๋ค. ๋ฐ๋ฉด, CompletableFuture๋ I/O์ ๊ฐ์ ์๊ฐ์ด ๊ฑธ๋ฆฌ๋ ์์ ์ ๋น๋๊ธฐ์ ์ผ๋ก ์์ฒญํ ํ, ๋ค๋ฅธ ์์ ์ ์ํํ๋ค๊ฐ ํ์ํ ์์ ์ ๊ฒฐ๊ณผ๋ฅผ ์กฐํํ ์ ์์ต๋๋ค.
์ด๋ฌํ ์ด์ ๋ก parallelStream์ ๋ฐ์ดํฐ์ ์ปฌ๋ ์ ์ฒ๋ฆฌ, ํํฐ๋ง, ๋ณํ๊ณผ ๊ฐ์ ๋ณ๋ ฌ ๋ฐ์ดํฐ ์ฒ๋ฆฌ ์์ ์ ์ ํฉํ๊ณ , CompletableFuture๋ ๋น๋๊ธฐ ์์ , ๋คํธ์ํฌ ํธ์ถ๊ณผ ๊ฐ์ ๋น๋๊ธฐ ์์ ์ ์กฐํฉ์ด ํ์ํ ๋ ์ ํฉํ๋ค๋ ๊ฒ์ ์ ์ ์์ต๋๋ค.
๊ฒฐ๋ก
- ForkJoinPool์ Work-Stealing ๋งค์ปค๋์ฆ์ ์ฌ์ฉํ๋ ์ค๋ ๋ ํ๋ก CPU ์ง์ฝ์ ์ธ ์์ ์ ํ ๋ ์ต์ ์ ์ฑ๋ฅ์ ๋ผ ์ ์์ต๋๋ค.
- ์ด๋ CompletableFuture์ parallelStream์ ForkJoinPool.commonPool()๋ก ์ ์ญ์ ์ผ๋ก ์ ๊ณต๋์ด ๋ฌธ์ ๋ฅผ ๋ฐ์์ํฌ ์ ์์ด ์์ฒญ๋ง๋ค ์ค๋ ๋ ํ์ ๋ง๋ค์ด ์ฌ์ฉํ๋ ๋ฐฉ์์ ์ฌ์ฉํ๋ ๊ฒ์ด ์ข์ต๋๋ค.
- CompletableFuture์ parallelStream์ ์์ ์ ๋ณ๋ ฌ๋ก ์คํ ์ํฌ์ ์์ง๋ง, CompletableFuture๋ I/O ๋ฐ์ด๋ ์์ ์ parallelStream์ CPU ๋ฐ์ด๋ ์์ ์ ์ฌ์ฉํ๋ ๊ฒ์ด ํจ์จ์ ์ด๊ธฐ์ ์์ ์ ํน์ฑ๊ณผ ์๊ตฌ์ฌํญ์ ๋ฐ๋ผ ์ ์ ํ ์ ํํ๋ ๊ฒ์ด ์ข์ต๋๋ค.
'Java' ์นดํ ๊ณ ๋ฆฌ์ ๋ค๋ฅธ ๊ธ
ConcurrentHashMap์ ์ด๋ป๊ฒ ๋์์ฑ์ ๋ณด์ฅํ ๊น? (0) | 2024.10.28 |
---|