[Java] CompletableFuture สำหรับการทำงาน Async Programming

Panya.Bas
2 min readJan 17, 2021

--

เนื่องจากได้มีงานเขียน Backend Service (Spring Boot)โดยต้องไปเรียก REST Service Api ของส่วนกลางหลาย ๆ เส้นเพื่อเอา Data มารวมร่างกัน (จนแอปตัวเองแถบจะไม่ต้องใช้ Database เลย @_@) จึงได้รู้จักกับ CompletableFuture ที่จะช่วยให้การทำงาน Async ง่ายขึ้น

Fork-Join framework

Fork-Join framework ถูกเพิ่มเข้ามาใน Java 7 ช่วยเพิ่มความเร็วในการทำงาน โดยแตกงานออกเป็นงานย่อยๆ แล้วให้ทำงานพร้อมกันแบบ Parallel ซึ่งจะกระจายการทำงานไปยัง threads ต่างๆ ใน common thread pool โดยเรียกว่า ForkJoinPool

ForkJoinPool

แต่ละ thread ใน ForkJoinPool จะมี algorithm “workstealing” คือ เมื่อ thread ไหนว่างมันจะไปโขมยงานที่อยู่ในคิวออกมาทำเรื่อย ๆ จนกว่าทุกงานจะเสร็จ

https://www.java-success.com/10-%E2%99%A6-executorservice-vs-forkjoin-future-vs-completablefuture-interview-qa/
source: https://www.java-success.com/10-%E2%99%A6-executorservice-vs-forkjoin-future-vs-completablefuture-interview-qa/

CompletableFuture พระเอกของ Asynchronous Task

ถ้าเราไม่ได้ทำการ config ใดๆ เจ้า CompletableFuture จะใช้งาน thread จาก ForkJoinPool ด้านบนนั่นแหละฮะ

  1. CompletableFuture.supplyAsync() เป็นการแตก thread เพื่อแยกไปทำงานจาก main thread ส่วนใหญ่จะใช้ตัวนี้นะ เมื่อทำงานเสร็จแล้วจะ return ผลลัพธ์กลับมาให้เพื่อเอาไปงานต่อ
public CompletableFuture<Double> calculateAsync() {
return CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
return 5 * 10d;
} catch (InterruptedException e) {
e.printStackTrace();
}
return 0d;
});
}

2. CompletableFuture.runAsync() คล้ายกับตัวด้านบนเลย แต่ไม่ return ผลลัพธ์กลับมานะ เหมาะกับงานพวก print log นะหรือ ใช้กับ method void อะ

public void calculateAsync() {
CompletableFuture.runAsync(() -> {
try {
Thread.sleep(2000);
System.out.println("Success");
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Fail");
});
}

ต่อไปสั่งให้เริ่มทำงาน และเอาผลลัพธ์ของ CompletableFuture มาทำงานต่อ

  1. thenCompose คือ Callback ที่จะได้ Response จาก CompletableFuture โดยใน callback thenCompose นี้เหมาะกับการทำงาน Asynchronous function ต่อ
CompletableFuture<Integer> future = 
CompletableFuture.supplyAsync(() -> 1)
.thenCompose(x -> CompletableFuture.supplyAsync(() -> x+1));
future.get() // Output = 2

2. thenApply ทำงานคล้ายกับ thenCompose คือ Callback ที่จะได้ Response จาก CompletableFuture โดยใน callback thenApply นี้เหมาะกับการทำงาน Synchronous function ต่อ

CompletableFuture<Integer> future = 
CompletableFuture.supplyAsync(() -> 1)
.thenApply(x -> x+1));
future.get() // Output = 2

3. thenAccept คือ Callback ที่งานคล้ายกับ thenApply แต่หลังจากตัวมันทำงานจบจะไม่ return ผลลัพธ์ใดๆ ออกมา เหมาะกับไว้ใช้เป็น chain callback ตัวสุดท้าย แบบ print log

ต่อไปสั่งให้เริ่มทำงาน และรอผลลัพธ์จาก CompletableFuture ที่ทำงาน parallel กัน

  1. allOf มันจะทำการรวบรวม task หลายๆ อันไว้ เมื่อเริ่มทำงานแต่ละ task จะทำงาน parallel กัน ตาม max thread pool และจะรอผลลัพธ์จากทุก task จนครบ (blocking main thread) ซึ่งมักจะใช้คู่กับ method get หรือ join เพื่อสั่งให้มันเริ่มทำงาน
public CompletableFuture<Double> calculateAsync() {
return CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
return 5 * 10d;
} catch (InterruptedException e) {
e.printStackTrace();
}
return 0d;
});
}
List<CompletableFuture<Double>> futures = new ArrayList<>();
futures.add(calculateAsync());
futures.add(calculateAsync());
futures.add(calculateAsync());
CompletableFuture
.allOf(futures.toArray(new CompletableFuture<?>[0])).get();
// wait until tasks finish
futures.get(0).get() or futures.get(0).join()
// get result from first task
output: 50;

จากตัวอย่างด้านบน get กับ join ต่างกันอย่างไร ?

ทั้งสอง funtion ทำหน้าที่เหมือนกันคือทำการดึงเอา result จาก CompletableFuture

  1. join จะ return result เมื่อ task เสร็จสิ้น และ ถ้า task เกิดข้อผิดพลาด จะ throws unchecked exception (คือ exception ที่ compiler ไม่ได้เช็ค เช่น RuntimeException)

2. get จะ return result เมื่อ task เสร็จเช่นกัน และ ถ้า task เกิดข้อผิดพลาด จะ throws checked exception และ get สามารถกำหนด timeout ได้ด้วย

Double result= calculateAsync().join(); // unchecked exceptiontry {
Double result2 = calculateAsync().get();
} catch (InterruptedException | ExecutionException e) {
//checked exception
}

เจ้า CompletableFuture ยังสามารถ config thread โดยใช้ เจ้า ExecutorService ได้อีกนะ ไว้เด๋วมาต่อ ….แล้วกัน

จบเพียงแค่นี้ก่อนนน พักก่อนนนน

--

--