CompletableFuture์™€ ForkJoinPool

2024. 8. 8. 23:25ยทJava

๋น„๋™๊ธฐ ํ”„๋กœ๊ทธ๋ž˜๋ฐ์˜ ๋‘ ์ถ•

Java 8 ๋ถ€ํ„ฐ ๋„์ž…๋œ CompletableFuture์™€ ForkJoinPool์€ ๋น„๋™๊ธฐ ๋ฐ ๋ณ‘๋ ฌ ํ”„๋กœ๊ทธ๋ž˜๋ฐ์—์„œ ์ค‘์š”ํ•œ ์—ญํ• ์„ ํ•ฉ๋‹ˆ๋‹ค. ์ด ๋‘ ๊ฐ€์ง€๋Š” ์„œ๋กœ ๋ณด์™„์ ์ธ ๊ด€๊ณ„์— ์žˆ์œผ๋ฉฐ, ํšจ์œจ์ ์ธ ๋ฉ€ํ‹ฐ์Šค๋ ˆ๋“œ ํ”„๋กœ๊ทธ๋ž˜๋ฐ์„ ๊ฐ€๋Šฅํ•˜๊ฒŒ ํ•ฉ๋‹ˆ๋‹ค. 

 

์ด๋ฒˆ ๊ธ€์—์„œ๋Š” ์ด๋“ค์ด ์–ด๋–ป๊ฒŒ ํ•จ๊ป˜ ์ƒํ˜ธ ๋ณด์™„์ ์œผ๋กœ ๋™์ž‘ํ•˜๋Š”์ง€์™€ ์ฃผ์˜ํ•ด์•ผ ๋  ์ ์— ๋Œ€ํ•ด ์‚ดํŽด๋ณด๊ฒ ์Šต๋‹ˆ๋‹ค. 

 

CompletableFuture

CompletableFuture๋Š” ๊ธฐ์กด์˜ Future ์ธํ„ฐํŽ˜์ด์Šค๊ฐ€ ์ œ๊ณตํ•˜์ง€ ๋ชปํ–ˆ๋˜ ๋น„๋™๊ธฐ ๊ฒฐ๊ณผ ๊ฐ’์˜ ์กฐํ•ฉ๊ณผ ์˜ˆ์™ธ ์ฒ˜๋ฆฌ๋ฅผ ํ›จ์”ฌ ๋” ํšจ๊ณผ์ ์œผ๋กœ ์ˆ˜ํ–‰ํ•  ์ˆ˜ ์žˆ๋Š” ๊ธฐ๋Šฅ์„ ๊ฐ–์ถ˜ ์ธํ„ฐํŽ˜์ด์Šค์ž…๋‹ˆ๋‹ค. ์ด ์ธํ„ฐํŽ˜์ด์Šค๋ฅผ ํ™œ์šฉํ•˜๋ฉด ๋™๊ธฐ์ ์ธ ์ˆ˜ํ–‰ ๋ฐฉ์‹์„ ๋น„๋™๊ธฐ์ ์ธ ์ˆ˜ํ–‰ ๋ฐฉ์‹์œผ๋กœ ๋ณ€ํ™˜ํ•˜์—ฌ ์„ฑ๋Šฅ์„ ๊ฐœ์„ ํ•˜๊ณ , ๋” ์œ ์—ฐํ•œ ๋น„๋™๊ธฐ ์ฒ˜๋ฆฌ ๋กœ์ง์„ ๊ตฌํ˜„ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

 

์˜ˆ์ œ

์•„๋ž˜ ์˜ˆ์ œ๋Š” ItemRepository์—์„œ storeName์„ ํ†ตํ•ด item์„ ์กฐํšŒํ•˜๋Š” ์˜ˆ์ œ์ž…๋‹ˆ๋‹ค. 

@Getter
public class Item {
	
	private String name;
	private Integer price;
	
	@Builder 
	public Item(String name, Integer price) {
		this.name = name;
		this.price = price;
	}
	
}

 

ItemRepository.getPriceByName(...) ๋ฉ”์„œ๋“œ๋Š” ์š”์ฒญ์‹œ๋งˆ๋‹ค 1์ดˆ๊ฐ„ ๋Œ€๊ธฐ ํ›„ ๊ฐ€๊ฒŒ์— ๋“ฑ๋ก๋˜์–ด ์žˆ๋Š” Item์˜ ๊ฐ€๊ฒฉ์„ ๋ฆฌํ„ดํ•ฉ๋‹ˆ๋‹ค. 

@Component
public class ItemRepository {
	
	private Map<String, Item> storeMap = new ConcurrentHashMap<>();
	
	public ItemRepository() {
		storeMap.put("storeA", Item.builder().name("pencil").price(300).build());
		storeMap.put("storeB", Item.builder().name("eraser").price(500).build());
		storeMap.put("storeC", Item.builder().name("mechanical pencil").price(2000).build());
	}
	
	public int getPriceByStoreName(String name) {
		try {
			Thread.sleep(1000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		
		return storeMap.get(name).getPrice();
	}
	
}

 

ํ•ด๋‹น ๋กœ์ง์—์„œ ๋™๊ธฐ์ ์œผ๋กœ ๋ชจ๋“  ์ƒํ’ˆ์„ ์กฐํšŒํ•˜๋ฉด ์‹œ๊ฐ„์ด ์–ผ๋งˆ๋‚˜ ๊ฑธ๋ฆด๊นŒ์š”? 

@Test
public void getPriceAllSync() {
	int priceA = itemRepository.findPriceByStoreName("storeA");
	int priceB = itemRepository.findPriceByStoreName("storeB");
	int priceC = itemRepository.findPriceByStoreName("storeC");
	
	log.info("price = {}", priceA);
	log.info("price = {}", priceB);
	log.info("price = {}", priceC);
}

3sec 482ms ๊ฒฝ๊ณผ

findPriceByStoreName(...) ๋ฉ”์„œ๋“œ๋Š” ํ˜ธ์ถœ๋  ๋•Œ ๋งˆ๋‹ค 1์ดˆ ๋™์•ˆ ๋Œ€๊ธฐํ•œ ํ›„ ์ž‘์—…์„ ์ˆ˜ํ–‰ํ•˜๊ธฐ ๋•Œ๋ฌธ์— ์ž‘์—… ์‹œ๊ฐ„์ด 3์ดˆ ์ด์ƒ ๊ฑธ๋ฆฌ๊ฒŒ ๋ฉ๋‹ˆ๋‹ค. ์ด๋Ÿฌํ•œ ๋ฐฉ์‹์˜ ๋กœ์ง์€ ์—ฌ๋Ÿฌ ๊ฐœ์˜ ์Šค๋ ˆ๋“œ๋กœ ํ•œ ๋ฒˆ์— ์š”์ฒญ์„ ํ•œ๋‹ค๋ฉด ๊ฒฝ๊ณผ ์‹œ๊ฐ„์„ ํฌ๊ฒŒ ์ค„์ผ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. 

 

๊ทธ๋ ‡๋‹ค๋ฉด ํ•ด๋‹น ๋กœ์ง์„ ๋น„๋™๊ธฐ๋กœ ์ฒ˜๋ฆฌํ•˜๋„๋ก ์ž‘์„ฑํ•˜๋ ค๋ฉด ์–ด๋–ป๊ฒŒ ํ•ด์•ผํ• ๊นŒ์š”? 

@Test
public void getPriceAllAsync() {
	CompletableFuture<Void> futureA = CompletableFuture.supplyAsync(() -> {
		log.info("ThreadName = {}", Thread.currentThread().getName());
		return itemRepository.getPriceStoreName("storeA");
	  })
	  .thenAccept(price -> log.info("price = {}", price));
	  
	CompletableFuture<Void> futureB = CompletableFuture.supplyAsync(() -> {
		log.info("ThreadName = {}", Thread.currentThread().getName());
		return itemRepository.getPriceStoreName("storeB");
	  })
	  .thenAccept(price -> log.info("price = {}", price));
	  
	CompletableFuture<Void> futureC = CompletableFuture.supplyAsync(() -> {
		log.info("ThreadName = {}", Thread.currentThread().getName());
		return itemRepository.getPriceStoreName("storeC");
	  })
	  .thenAccept(price -> log.info("price = {}", price));
	
	CompletableFuture.allOf(futureA, futureB, futureC).join();
}

ํ˜„์žฌ ๋กœ์ง์—์„œ๋Š” ๊ฐœ๋ณ„ ์ž‘์—…์„ CompletableFuture.supplyAsync(...)๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ๋น„๋™๊ธฐ์ ์œผ๋กœ ์ฒ˜๋ฆฌํ•˜๊ณ  ์žˆ์Šต๋‹ˆ๋‹ค. ์ด ๋ฐฉ์‹์€ ํ˜„์žฌ ์Šค๋ ˆ๋“œ์™€๋Š” ๋ณ„๊ฐœ์˜ ์Šค๋ ˆ๋“œ์—์„œ ํ•จ์ˆ˜๊ฐ€ ์‹คํ–‰๋˜๋„๋ก ํ•˜์—ฌ ๋ณ‘๋ ฌ๋กœ ์ž‘์—…์„ ์ˆ˜ํ–‰ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. 

  • theAccept(...): ๊ฒฐ๊ณผ๋ฅผ ์†Œ๋น„ํ•˜๋Š” ์ฝœ๋ฐฑํ•จ์ˆ˜๋กœ, ๋ฆฌํ„ดํƒ€์ž…์ด ์—†์Šต๋‹ˆ๋‹ค. CompletableFuture.supplyAsync(...)์—์„œ์˜ ์ž‘์—…์ด ์™„๋ฃŒ๋œ ํ›„ ๊ฒฐ๊ณผ๋ฅผ ์ฒ˜๋ฆฌํ•˜๋Š” ์ž‘์—…์„ ์ง€์ •ํ•  ๋•Œ ์‚ฌ์šฉ๋ฉ๋‹ˆ๋‹ค. ๋งŒ์•ฝ ๋ฆฌํ„ด ํƒ€์ž…์ด ์žˆ๋Š” ์ฝœ๋ฐฑ ํ•จ์ˆ˜๋ฅผ ์‚ฌ์šฉํ•˜๊ณ  ์‹ถ๋‹ค๋ฉด, thenApply(...) ๋ฉ”์„œ๋“œ๋ฅผ ์‚ฌ์šฉํ•ด์•ผ ํ•ฉ๋‹ˆ๋‹ค. 
  • allOf(...): ์—ฌ๋Ÿฌ CompletableFuture๋ฅผ ๋ณ‘๋ ฌ๋กœ ์ฒ˜๋ฆฌํ•  ์ˆ˜ ์žˆ๋„๋ก ํ•ด์ฃผ๋Š” ๋ฉ”์„œ๋“œ์ž…๋‹ˆ๋‹ค. ๋ชจ๋“  CompletableFuture์˜ ์ฒ˜๋ฆฌ๊ฐ€ ์™„๋ฃŒ๋  ๋•Œ๊นŒ์ง€ ๋Œ€๊ธฐํ•˜๋ฉฐ, ์™„๋ฃŒ๋œ ํ›„ CompletableFuture<Void>๋ฅผ ๋ฐ˜ํ™˜ํ•ฉ๋‹ˆ๋‹ค. ๊ทธ๋Ÿฌ๋‚˜ allOf ๋ฉ”์„œ๋“œ๋Š” ๋ชจ๋“  Future์˜ ๊ฒฐ๊ณผ๋ฅผ ๊ฒฐํ•ฉํ•œ ๊ฐ’์„ ๋ฐ˜ํ™˜ํ•˜์ง€ ์•Š์Šต๋‹ˆ๋‹ค. 

๋”ฐ๋ผ์„œ ๊ฐœ๋ณ„ CompletableFuture์˜ ๊ฒฐ๊ณผ๋ฅผ ์‚ฌ์šฉํ•˜๋ ค๋ฉด join() ๋ฉ”์„œ๋“œ๋ฅผ ํ™œ์šฉํ•ด์•ผ ํ•ฉ๋‹ˆ๋‹ค. join()์„ ํ˜ธ์ถœํ•˜์—ฌ allOf()์˜ ์ž‘์—…์ด ์™„๋ฃŒ๋  ๋•Œ๊นŒ์ง€ ๋ธ”๋กœํ‚นํ•˜์—ฌ ๋Œ€๊ธฐํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. 

 

์‹คํ–‰ ๊ฒฐ๊ณผ ๋ถ„์„

3sec 482ms -> 1sec 450ms

์ผ๋‹จ ๋น„๋™๊ธฐ๋กœ ์‹คํ–‰ํ•œ ์ฝ”๋“œ๊ฐ€ ์ด์ „ ๋™๊ธฐ์ ์œผ๋กœ ์‹คํ–‰ํ•œ ์ฝ”๋“œ๋ณด๋‹ค ๋น ๋ฅธ ์ˆ˜ํ–‰ ์‹œ๊ฐ„์„ ๋ณด์ธ๋‹ค๋Š” ๊ฒƒ์„ ํ™•์ธํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. 

 

์œ„์˜ ์‹คํ–‰ ๊ฒฐ๊ณผ๋ฅผ ๋ณด๋ฉด, ํ•ด๋‹น CompletableFuture๊ฐ€ ์‹คํ–‰๋œ ์Šค๋ ˆ๋“œ๊ฐ€ ForkJoinPool.commonPool๋กœ ํ‘œ์‹œ๋˜์–ด ์žˆ์Šต๋‹ˆ๋‹ค. ์—ฌ๊ธฐ์„œ ForkJoinPool.commonPool์€ ๋ฌด์—‡์ด๊ณ  ์™œ ๋น„๋™๊ธฐ ๋กœ์ง์ด ํ•ด๋‹น ์Šค๋ ˆ๋“œ์—์„œ ๋™์ž‘ํ•˜๋Š” ๊ฒƒ์ผ๊นŒ์š”? 

 

ForkJoinPool

ForkJoinPool์€ ์ž๋ฐ”์—์„œ ์ œ๊ณตํ•˜๋Š” ๋น„๋™๊ธฐ ํ”„๋ ˆ์ž„์›Œํฌ๋กœ, ๋ถ„ํ• -์ •๋ณต ์•Œ๊ณ ๋ฆฌ์ฆ˜์„ ํ™œ์šฉํ•˜์—ฌ ์ž‘์—…์„ ํšจ์œจ์ ์œผ๋กœ ๋ถ„์‚ฐํ•˜๊ณ  ๋ณ‘๋ ฌ๋กœ ์‹คํ–‰ํ•˜๋Š” ์Šค๋ ˆ๋“œ ํ’€์ž…๋‹ˆ๋‹ค. ์ด ์Šค๋ ˆ๋“œ ํ’€์€ ํฐ ์ž‘์—…์„ ์—ฌ๋Ÿฌ ์ž‘์€ Sub Task๋กœ ๋ถ„๋ฆฌ(Fork)ํ•˜๊ณ , ๊ฐ๊ฐ์„ ๋ณ‘๋ ฌ๋กœ ์ฒ˜๋ฆฌํ•œ ํ›„ ๊ทธ ๊ฒฐ๊ณผ๋ฅผ ๋‹ค์‹œ ํ•ฉ์ณ(Join) ์ตœ์ข…๊ฒฐ๊ณผ๋ฅผ ์ƒ์„ฑํ•˜๋Š” ๋ฐฉ์‹์œผ๋กœ ๋™์ž‘ํ•ฉ๋‹ˆ๋‹ค. 

 

๋™์ž‘ ๋ฐฉ์‹

ForkJoinPool์€ ๋‹ค๋ฅธ ์ข…๋ฅ˜์˜ ExecutorService์™€๋Š” ๋‹ค๋ฅธ Work-Stealing ๋งค์ปค๋‹ˆ์ฆ˜์„ ์‚ฌ์šฉํ•ฉ๋‹ˆ๋‹ค.

ForkJoinPool์˜ ๋™์ž‘๋ฐฉ์‹

  1. submit()์„ ํ†ตํ•ด task๋ฅผ ๋ณด๋‚ธ๋‹ค.
  2. ์ธ๋ฐ”์šด๋“œ ํ์— task๊ฐ€ ๋“ค์–ด๊ฐ€๊ณ , A์™€ B ์Šค๋ ˆ๋“œ๊ฐ€ task๋ฅผ ์ฒ˜๋ฆฌํ•œ๋‹ค.
  3. A์™€ B๋Š” ๊ฐ์ž ํ๊ฐ€ ์žˆ์œผ๋ฉฐ, ์œ„์˜ ๊ทธ๋ฆผ์˜ B์ฒ˜๋Ÿผ ํ์— task๊ฐ€ ์—†์œผ๋ฉด A์˜ task๋ฅผ stealํ•˜์—ฌ ์ฒ˜๋ฆฌํ•œ๋‹ค. 

Work-Stealing ๋งค์ปค๋‹ˆ์ฆ˜์„ ์‚ฌ์šฉํ•˜๊ธฐ ๋•Œ๋ฌธ์— CPU ์ž์›์ด ๋†€์ง€ ์•Š๊ณ  ์ตœ์ ์˜ ์„ฑ๋Šฅ์„ ๋‚ผ ์ˆ˜ ์žˆ๊ฒŒ ํ•ด ์ค๋‹ˆ๋‹ค. 

 

ForkJoinPool๊ณผ CompletableFuture์˜ ์—ฐ๊ณ„

Java 8์—์„œ๋ถ€ํ„ฐ CompletableFuture๋Š” ForkJoinPool์„ ์‚ฌ์šฉํ•˜์—ฌ ๋น„๋™๊ธฐ ์ž‘์—…์„ ์ฒ˜๋ฆฌํ•ฉ๋‹ˆ๋‹ค. ๊ธฐ๋ณธ์ ์œผ๋กœ CompletableFuture๋Š” ForkJoinPool.commonPool()์„ ์‚ฌ์šฉํ•˜์—ฌ ์Šค๋ ˆ๋“œ ํ’€์„ ๊ฐ€์ ธ์˜ค๊ณ  ์‚ฌ์šฉํ•ฉ๋‹ˆ๋‹ค. 

ForkJoinPool.commonPool() ๋””ํดํŠธ ์Šค๋ ˆ๋“œ ๊ฐœ์ˆ˜ = ์ฝ”์–ด ๊ฐœ์ˆ˜ * 2 - 1

ForkJoinPool.commonPool()์€ ๊ธฐ๋ณธ์ ์œผ๋กœ ์‚ฌ์šฉ ๊ฐ€๋Šฅํ•œ ํ”„๋กœ์„ธ์„œ์˜ ์ˆ˜์— ๋”ฐ๋ผ ๋™์ ์œผ๋กœ ํฌ๊ธฐ๊ฐ€ ์กฐ์ •๋˜๋Š” ๊ณต์šฉ ForkJoinPool์„ ๋ฐ˜ํ™˜ํ•ฉ๋‹ˆ๋‹ค. ๋”ฐ๋ผ์„œ CompletableFuture๋ฅผ ์‚ฌ์šฉํ•  ๋•Œ๋Š” ๋ณ„๋„์˜ ์Šค๋ ˆ๋“œ ํ’€์„ ์ƒ์„ฑํ•˜๊ฑฐ๋‚˜ ๊ตฌ์„ฑํ•  ํ•„์š”๊ฐ€ ์—†์œผ๋ฉฐ, ๊ธฐ๋ณธ์ ์œผ๋กœ ์‹œ์Šคํ…œ ๋ฆฌ์†Œ์Šค๋ฅผ ์ž˜ ํ™œ์šฉํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. 

 

ForkJoinPool ์‚ฌ์šฉ ์‹œ ์ฃผ์˜์‚ฌํ•ญ

CompletableFuture์™€ parallelStream์€ ๋‚ด๋ถ€์ ์œผ๋กœ ForkJoinPool.commonPool()์„ ์‚ฌ์šฉํ•˜์—ฌ ์ž‘์—…์„ ๋ณ‘๋ ฌํ™” ์‹œํ‚ต๋‹ˆ๋‹ค. ์ด ๋ง์€ commonPool์„ ๋ณ„๋„์˜ ์Šค๋ ˆ๋“œ ํ’€๋กœ ์ƒ์„ฑํ•˜๋Š” ๊ฒƒ์ด ์•„๋‹ˆ๋ผ๋Š” ๋ง๊ณผ ๊ฐ™์Šต๋‹ˆ๋‹ค. ์ฆ‰, ๋ณ„๋„์˜ ์„ค์ •์ด ์—†๋‹ค๋ฉด ํ•˜๋‚˜์˜ ์Šค๋ ˆ๋“œ ํ’€์„ ๋ชจ๋“  CompletableFuture๊ณผ parallelStream์ด ๊ณต์œ ํ•˜๊ฒŒ ๋˜๊ณ , ์Šค๋ ˆ๋“œ ํ’€์„ ์‚ฌ์šฉํ•˜๋Š” ๋‹ค๋ฅธ ์Šค๋ ˆ๋“œ์— ์˜ํ–ฅ์„ ์ค„ ์ˆ˜ ์žˆ์œผ๋ฉฐ, ๋ฐ˜๋Œ€๋กœ ์˜ํ–ฅ์„ ๋ฐ›์„ ์ˆ˜๋„ ์žˆ์Šต๋‹ˆ๋‹ค. 

 

์•„๋ž˜์™€ ๊ฐ™์€ ์ฝ”๋“œ๋ฅผ ๋™์‹œ์— ์š”์ฒญํ•˜์—ฌ ์‹คํ–‰ํ•œ๋‹ค๊ณ  ๊ฐ€์ •ํ•ด๋ด…์‹œ๋‹ค. 

public void getPriceAllAsync() {
	CompletableFuture<Integer> futureA = CompletableFuture.supplyAsync(() -> {
		log.info("Thread Name = {}", Thread.currentThread().getName());
		return itemRepository.getPriceByStoreName("storeA");
	});
	
	CompletableFuture<Integer> futureB = CompletableFuture.supplyAsync(() -> {
		log.info("Thread Name = {}", Thread.currentThread().getName());
		return itemRepository.getPriceByStoreName("storeB");
	});
	
	CompletableFuture<Integer> futureC = CompletableFuture.supplyAsync(() -> {
		log.info("Thread Name = {}", Thread.currentThread().getName());
		return itemRepository.getPriceByStoreName("storeC");
	});
	
	log.info("price = {}", futureA.join());
	log.info("price = {}", futureB.join());
	log.info("price = {}", futureC.join());
}

 ์ฒซ ๋ฒˆ์งธ ์š”์ฒญ์ด ํ•ด๋‹น ์ฝ”๋“œ๋ฅผ ์‹คํ–‰ํ•˜์˜€์„ ๋•Œ ์Šค๋ ˆ๋“œ ํ’€์€ ์•„๋ž˜์™€ ๊ฐ™์€ ์ƒํƒœ๊ฐ€ ๋ฉ๋‹ˆ๋‹ค. 

์ฒซ ๋ฒˆ์งธ ์š”์ฒญ ์‹คํ–‰

์ฒซ ๋ฒˆ์งธ ์š”์ฒญ์„ ์ฒ˜๋ฆฌํ•˜๋Š” ๋™์•ˆ ์Šค๋ ˆ๋“œ๋ฅผ ์ ์œ ํ•˜๊ฒŒ ๋˜๋ฉด, ๋‹ค์Œ ์š”์ฒญ์ด ๋„์ฐฉํ•  ๋•Œ ์Šค๋ ˆ๋“œ๋ฅผ ๋ฐ˜ํ™˜ํ•˜์ง€ ์•Š๊ณ  ๊ณ„์† ์ ์œ ํ•˜๊ฒŒ ๋  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ์ด๋กœ ์ธํ•ด Thread 1, 2, 3์€ ์‚ฌ์šฉํ•  ์ˆ˜ ์—†๊ฒŒ ๋˜๊ณ , ๊ฒฐ๊ตญ Thread 4 ํ•˜๋‚˜๋งŒ ๋‚จ์•„ ๋ชจ๋“  ์š”์ฒญ์„ ์ฒ˜๋ฆฌํ•ด์•ผ ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. 

 

ํŠนํžˆ ์Šค๋ ˆ๋“œ๊ฐ€ sleep๊ณผ ๊ฐ™์ด ์•„๋ฌด ์ž‘์—…๋„ ํ•˜์ง€ ์•Š๊ณ  ์ ์œ  ์ค‘์ด๋ฉด ์‹ฌ๊ฐํ•œ ๋ฌธ์ œ๊ฐ€ ๋ฐœ์ƒํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. 

๋‘ ๋ฒˆ์งธ ์š”์ฒญ ์‹คํ–‰

์œ„ ๊ณผ์ •์—์„œ ๋” ์‹ฌ๊ฐํ•œ ์ƒํ™ฉ์€ Thread 4๊นŒ์ง€ ์ ์œ ํ•œ ๊ฒฝ์šฐ์ž…๋‹ˆ๋‹ค. ์ด ๊ฒฝ์šฐ ์ถ”๊ฐ€ ์š”์ฒญ์€ ์ฒ˜๋ฆฌ๋˜์ง€ ์•Š๊ณ  Thread Pool Queue์— ์Œ“์ด๊ฒŒ ๋˜๋ฉฐ, ์ผ์ • ์‹œ๊ฐ„์ด ์ง€๋‚˜๋ฉด ์š”์ฒญ์ด ์†์‹ค๋  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. 

 

ForkJoinPool.commPool()์€ ์ „์—ญ์ ์œผ๋กœ ๊ณต์œ ๋˜๋Š” ์Šค๋ ˆ๋“œ ํ’€์„ ์‚ฌ์šฉํ•ฉ๋‹ˆ๋‹ค. ์ด ๋•Œ๋ฌธ์— 4๊ฐœ์˜ ์Šค๋ ˆ๋“œ๊ฐ€ ๋ชจ๋‘ ์‚ฌ์šฉ ์ค‘์ด๋ผ๋ฉด, ๋‹ค๋ฅธ ์ž‘์—…๋“ค์€ ๋Œ€๊ธฐ ์ƒํƒœ์— ๋“ค์–ด๊ฐ€๊ฒŒ ๋ฉ๋‹ˆ๋‹ค. ํŠนํžˆ Blocking I/O๊ฐ€ ๋ฐœ์ƒํ•˜๋Š” ์ž‘์—…์ด ์žˆ์„ ๊ฒฝ์šฐ, ์Šค๋ ˆ๋“œ ํ’€ ๋‚ด๋ถ€์˜ ์Šค๋ ˆ๋“œ๋“ค์ด ๋ธ”๋ก ๋˜์–ด ๋‹ค๋ฅธ ์š”์ฒญ๋“ค์ด ์Šค๋ ˆ๋“œ๋ฅผ ์–ป๊ธฐ ์ „๊นŒ์ง€ ๊ธฐ๋‹ค๋ ค์•ผ ํ•ฉ๋‹ˆ๋‹ค. ์ด๋กœ ์ธํ•ด ์„ฑ๋Šฅ ์ €ํ•˜์™€ ๊ฐ™์€ ๋ฌธ์ œ๊ฐ€ ๋ฐœ์ƒํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. 

 

public void getPriceAllAsync() {
	ExecutorService pool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
	
	CompletableFuture<Integer> futureA = CompletableFuture.supplyAsync(() -> {
		log.info("Thread Name = {}", Thread.currentThread().getName());
		return itemRepository.getPriceByStoreName("storeA");
	}, pool);
	
	CompletableFuture<Integer> futureB = CompletableFuture.supplyAsync(() -> {
		log.info("Thread Name = {}", Thread.currentThread().getName());
		return itemRepository.getPriceByStoreName("storeB");
	}, pool);
	
	CompletableFuture<Integer> futurec = CompletableFuture.supplyAsync(() -> {
		log.info("Thread Name = {}", Thread.currentThread().getName());
		return itemRepository.getPriceByStoreName("storeC");
	}, pool);
	
	log.info("price = {}", futureA.join());
	log.info("price = {}", futureB.join());
	log.info("price = {}", futureC.join());
	
	pool.shutdown();
}

์ด ๋ฌธ์ œ๋ฅผ ํ•ด๊ฒฐํ•˜๊ธฐ ์œ„ํ•ด, ๊ฐ CompletableFuture๋‚˜ parallelStream์— ๋Œ€ํ•ด ๋…๋ฆฝ์ ์ธ ์Šค๋ ˆ๋“œ ํ’€์„ ์‚ฌ์šฉํ•˜๋Š” ๋ฐฉ๋ฒ•์ด ์žˆ์Šต๋‹ˆ๋‹ค. ์ด๋ ‡๊ฒŒ ํ•˜๋ฉด ์Šค๋ ˆ๋“œ ํ’€์ด ๋…๋ฆฝ์ ์œผ๋กœ ๊ด€๋ฆฌ๋˜๋ฏ€๋กœ, ํ•œ ์ž‘์—…์ด ์Šค๋ ˆ๋“œ๋ฅผ ์ฐจ์ง€ํ•˜๊ณ  ์žˆ์–ด๋„ ๋‹ค๋ฅธ ์ž‘์—…์€ ์˜ํ–ฅ์„ ๋ฐ›์ง€ ์•Š๊ณ  ์›ํ™œํ•˜๊ฒŒ ์‹คํ–‰ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. 

 

CompletableFuture์™€ parallelStream

Java์—์„œ ๋ณ‘๋ ฌ์ฒ˜๋ฆฌ๋ฅผ ํ•˜๋Š” ๋ฐฉ๋ฒ•์€ CompletableFuture๋งŒ ์žˆ๋Š” ๊ฒƒ์ด ์•„๋‹Œ parallelStream๋„ ์กด์žฌํ•ฉ๋‹ˆ๋‹ค. ์ด ๋‘˜์€ ForkJoinPool.commonPool()์„ ๊ณต์œ ํ•˜๊ณ  ๋ณ‘๋ ฌ ์ฒ˜๋ฆฌ๋ฅผ ์ง€์›ํ•˜์ง€๋งŒ, ๊ฐ๊ฐ์˜ ํŠน์„ฑ๊ณผ ์ ํ•ฉํ•œ ์‚ฌ์šฉ ์ƒํ™ฉ์ด ๋‹ค๋ฆ…๋‹ˆ๋‹ค. 

 

parallelStream

parallelStream์€ Java 8๋ถ€ํ„ฐ ์ถ”๊ฐ€๋œ ์ŠคํŠธ๋ฆผ API์˜ ํ™•์žฅ์œผ๋กœ ์ŠคํŠธ๋ฆผ์„ ๋ณ‘๋ ฌ๋กœ ์ฒ˜๋ฆฌํ•  ์ˆ˜ ์žˆ๋„๋ก ํ•ฉ๋‹ˆ๋‹ค. ์ด๋ฅผ ํ†ตํ•ด ๋ฐ์ดํ„ฐ ์ง‘ํ•ฉ์„ ๋ณ‘๋ ฌ๋กœ ์ฒ˜๋ฆฌํ•˜์—ฌ ์„ฑ๋Šฅ์„ ํ–ฅ์ƒ ์‹œํ‚ฌ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. 

 

์•„๋ž˜๋Š” ์ด์ „ ์˜ˆ์ œ๋ฅผ parallelStream์„ ํ†ตํ•ด ์ฒ˜๋ฆฌํ•˜๋Š” ์˜ˆ์ œ์ž…๋‹ˆ๋‹ค. 

@Test
public void getPriceAllAsyncWithParallelStream() {
	List<String> stores = List.of("storeA", "storeB", "storeC");
	
	stores.parallelStream() 
		.forEach(store -> {
			log.info("Thread Name = {}", Thread.currentThread().getName());
			log.info("price = {}", itemRepository.getPriceByStore(store));
		});
}

์œ„์˜ ์ฝ”๋“œ๋ฅผ ์‹คํ–‰ํ•ด๋ณด๋ฉด ์ด์ „ CompletableFuture๋ฅผ ์‚ฌ์šฉํ•œ ๋ฐฉ์‹๊ณผ ๋‹ค๋ฅธ ์  ์—†์ด ๋™์ผํ•œ ๊ฒฐ๊ณผ์™€ ์ˆ˜ํ–‰ ์‹œ๊ฐ„์ด ๋‚˜ํƒ€๋‚ฉ๋‹ˆ๋‹ค. ๊ทธ๋ ‡๋‹ค๋ฉด CompletableFuture์™€ parallelStream ์‚ฌ์ด์—๋Š” ์–ด๋–ค ์ฐจ์ด๊ฐ€ ์žˆ๋Š” ๊ฑธ๊นŒ์š”? 

 

parallelStream๋„ Stream์ด๋‹ค

์•„๋ž˜ ์˜ˆ์ œ๋ฅผ ํ†ตํ•ด ์ด ๋‘ ๋ฐฉ์‹์˜ ์ฐจ์ด๋ฅผ ์‚ดํŽด๋ณด๊ฒ ์Šต๋‹ˆ๋‹ค. 

@Test 
public void getPriceAllAsyncWithParallelStream() throws Exception {
	List<String> stores = List.of("storeA", "storeB", "storeC");
	
	List<Integer> prices = stores.parallelStream()
		.map(store -> itemRepository.getPriceByStoreName(store))
		.toList();
	
	Thread.sleep(1000);
	
	prices.forEach(price -> log.info("price = {}", price));
}

์ด ์ฝ”๋“œ๋Š” parallelStream์„ ํ†ตํ•œ ๊ฐ€๊ฒฉ ์กฐํšŒ ๋ถ€๋ถ„๊ณผ ํ•ด๋‹น ๊ฐ€๊ฒฉ์„ ์ถœ๋ ฅํ•˜๋Š” ์ฝ”๋“œ ์‚ฌ์ด์— ์–ด๋– ํ•œ ์ž‘์—…์ด ์žˆ๋‹ค๊ณ  ๊ฐ€์ •ํ•˜๊ณ  ์ž‘์„ฑํ•œ ์ฝ”๋“œ์ž…๋‹ˆ๋‹ค. 

 

ํ•ด๋‹น ์ฝ”๋“œ๋ฅผ ์ˆ˜ํ–‰ํ•œ๋‹ค๋ฉด ์•„๋ž˜์™€ ๊ฐ™์€ ๊ฒฐ๊ณผ๋ฅผ ์–ป์„ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. 

์ˆ˜ํ–‰์‹œ๊ฐ„ 2sec 465ms

 

CompletableFuture๋ฅผ ์‚ฌ์šฉํ•œ ์ฝ”๋“œ๋„ ๊ฐ™์€ ์ƒํ™ฉ์„ ๊ฐ€์ •ํ•˜์—ฌ ์ž‘์„ฑํ•˜์˜€์Šต๋‹ˆ๋‹ค. 

@Test
public void getPriceAllAsyncWithCompletableFuture() throws Exception {
	List<String> stores = List.of("storeA", "storeB", "storeC");
	
	List<CompletableFuture<Integer> futures = stores.stream()
		.map(store -> CompletableFuture.supplyAsync(() -> itemRepository.getPriceByStoreName(store)))
		.toList();
		
	Thread.sleep(1000);
	
	futures.stream()
		.map(CompletableFuture::join)
		.forEach(price -> log.info("price = {}", price));
}

ํ•ด๋‹น ์ฝ”๋“œ๋ฅผ ์‹คํ–‰์‹œ์ผœ๋ณด๋ฉด ์ด์ „ parallelStream์„ ์‚ฌ์šฉํ•œ ์ฝ”๋“œ๋ณด๋‹ค ๋น ๋ฅธ ์ˆ˜ํ–‰์‹œ๊ฐ„์„ ๋ณด์—ฌ์ค๋‹ˆ๋‹ค.

2sec 465ms -> 1sec 457ms

์ด๋Ÿฐ ์ฐจ์ด๊ฐ€ ๋‚˜๋Š” ์ด์œ ๋Š” parallelStream์€ ๋ณ‘๋ ฌ๋กœ ์‹คํ–‰๋˜๊ธด ํ•˜์ง€๋งŒ, ์ŠคํŠธ๋ฆผ์˜ ๋ชจ๋“  ์š”์†Œ๊ฐ€ ์ฒ˜๋ฆฌ๋  ๋•Œ๊นŒ์ง€ ๊ธฐ๋‹ค๋ ค์•ผ ํ•ฉ๋‹ˆ๋‹ค. ์ฆ‰, ๋ชจ๋“  ์š”์†Œ๊ฐ€ ์™„๋ฃŒ๋  ๋•Œ๊นŒ์ง€ ๋ธ”๋ก๋ฉ๋‹ˆ๋‹ค. ๋ฐ˜๋ฉด, CompletableFuture๋Š” I/O์™€ ๊ฐ™์€ ์‹œ๊ฐ„์ด ๊ฑธ๋ฆฌ๋Š” ์ž‘์—…์„ ๋น„๋™๊ธฐ์ ์œผ๋กœ ์š”์ฒญํ•œ ํ›„, ๋‹ค๋ฅธ ์ž‘์—…์„ ์ˆ˜ํ–‰ํ•˜๋‹ค๊ฐ€ ํ•„์š”ํ•œ ์‹œ์ ์— ๊ฒฐ๊ณผ๋ฅผ ์กฐํšŒํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. 

 

์ด๋Ÿฌํ•œ ์ด์œ ๋กœ parallelStream์€ ๋ฐ์ดํ„ฐ์˜ ์ปฌ๋ ‰์…˜ ์ฒ˜๋ฆฌ, ํ•„ํ„ฐ๋ง, ๋ณ€ํ™˜๊ณผ ๊ฐ™์€ ๋ณ‘๋ ฌ ๋ฐ์ดํ„ฐ ์ฒ˜๋ฆฌ ์ž‘์—…์— ์ ํ•ฉํ•˜๊ณ , CompletableFuture๋Š” ๋น„๋™๊ธฐ ์ž‘์—…, ๋„คํŠธ์›Œํฌ ํ˜ธ์ถœ๊ณผ ๊ฐ™์€ ๋น„๋™๊ธฐ ์ž‘์—…์˜ ์กฐํ•ฉ์ด ํ•„์š”ํ•  ๋•Œ ์ ํ•ฉํ•˜๋‹ค๋Š” ๊ฒƒ์„ ์•Œ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. 

 

๊ฒฐ๋ก 

  • ForkJoinPool์€ Work-Stealing ๋งค์ปค๋‹ˆ์ฆ˜์„ ์‚ฌ์šฉํ•˜๋Š” ์Šค๋ ˆ๋“œ ํ’€๋กœ CPU ์ง‘์•ฝ์ ์ธ ์ž‘์—…์„ ํ•  ๋•Œ ์ตœ์ ์˜ ์„ฑ๋Šฅ์„ ๋‚ผ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. 
  • ์ด๋Š” CompletableFuture์™€ parallelStream์— ForkJoinPool.commonPool()๋กœ ์ „์—ญ์ ์œผ๋กœ ์ œ๊ณต๋˜์–ด ๋ฌธ์ œ๋ฅผ ๋ฐœ์ƒ์‹œํ‚ฌ ์ˆ˜ ์žˆ์–ด ์š”์ฒญ๋งˆ๋‹ค ์Šค๋ ˆ๋“œ ํ’€์„ ๋งŒ๋“ค์–ด ์‚ฌ์šฉํ•˜๋Š” ๋ฐฉ์‹์„ ์‚ฌ์šฉํ•˜๋Š” ๊ฒƒ์ด ์ข‹์Šต๋‹ˆ๋‹ค. 
  • CompletableFuture์™€ parallelStream์€ ์ž‘์—…์„ ๋ณ‘๋ ฌ๋กœ ์‹คํ–‰ ์‹œํ‚ฌ์ˆ˜ ์žˆ์ง€๋งŒ, CompletableFuture๋Š” I/O ๋ฐ”์šด๋“œ ์ž‘์—…์„ parallelStream์€ CPU ๋ฐ”์šด๋“œ ์ž‘์—…์— ์‚ฌ์šฉํ•˜๋Š” ๊ฒƒ์ด ํšจ์œจ์ ์ด๊ธฐ์— ์ž‘์—…์˜ ํŠน์„ฑ๊ณผ ์š”๊ตฌ์‚ฌํ•ญ์— ๋”ฐ๋ผ ์ ์ ˆํžˆ ์„ ํƒํ•˜๋Š” ๊ฒƒ์ด ์ข‹์Šต๋‹ˆ๋‹ค. 

'Java' ์นดํ…Œ๊ณ ๋ฆฌ์˜ ๋‹ค๋ฅธ ๊ธ€

Call by Value์™€ Call by Reference  (0) 2025.03.07
Java์˜ GC  (0) 2025.03.02
ConcurrentHashMap์€ ์–ด๋–ป๊ฒŒ ๋™์‹œ์„ฑ์„ ๋ณด์žฅํ• ๊นŒ?  (0) 2024.10.28
'Java' ์นดํ…Œ๊ณ ๋ฆฌ์˜ ๋‹ค๋ฅธ ๊ธ€
  • Call by Value์™€ Call by Reference
  • Java์˜ GC
  • ConcurrentHashMap์€ ์–ด๋–ป๊ฒŒ ๋™์‹œ์„ฑ์„ ๋ณด์žฅํ• ๊นŒ?
jwooo๐ŸŒฑ
jwooo๐ŸŒฑ
jwooo's log ์ž…๋‹ˆ๋‹ค.
  • jwooo๐ŸŒฑ
    jwooo's log
    jwooo๐ŸŒฑ
  • ์ „์ฒด
    ์˜ค๋Š˜
    ์–ด์ œ
    • ๋ถ„๋ฅ˜ ์ „์ฒด๋ณด๊ธฐ (12)
      • Java (4)
      • Project (6)
      • Computer Science (2)
        • Network (1)
        • Security (1)
  • ๋ธ”๋กœ๊ทธ ๋ฉ”๋‰ด

    • ํ™ˆ
    • ๋ฐฉ๋ช…๋ก
  • ๋งํฌ

  • ๊ณต์ง€์‚ฌํ•ญ

  • ์ธ๊ธฐ ๊ธ€

  • ์ตœ๊ทผ ๋Œ“๊ธ€

  • ์ตœ๊ทผ ๊ธ€

  • hELLOยท Designed By์ •์ƒ์šฐ.v4.10.3
jwooo๐ŸŒฑ
CompletableFuture์™€ ForkJoinPool
์ƒ๋‹จ์œผ๋กœ

ํ‹ฐ์Šคํ† ๋ฆฌํˆด๋ฐ”