Think of ordering coffee at a busy cafe. You place your order, get a receipt with a number, and step aside. You are free to check your phone, chat with a friend, or grab a seat. When your coffee is ready, the barista calls your number. You did not stand at the counter blocking everyone behind you — you continued living your life while the work happened in the background.
CompletableFuture is that receipt. It is a promise that a value will be available at some point in the future. Introduced in Java 8 as part of java.util.concurrent, it is the most powerful tool Java provides for writing asynchronous, non-blocking code.
Java 5 introduced Future<T>, which represents an asynchronous computation. But Future has a fatal flaw: the only way to get the result is to call get(), which blocks the calling thread. You cannot attach a callback. You cannot chain operations. You cannot combine multiple futures. Calling get() defeats the purpose of async programming.
| Feature | Future | CompletableFuture |
|---|---|---|
| Get result | get() — blocks |
get(), join(), or non-blocking callbacks |
| Attach callback | Not supported | thenApply(), thenAccept(), thenRun() |
| Chain operations | Not supported | thenCompose(), thenApply() |
| Combine futures | Not supported | thenCombine(), allOf(), anyOf() |
| Exception handling | ExecutionException wrapper |
exceptionally(), handle(), whenComplete() |
| Manually complete | Not supported | complete(), completeExceptionally() |
import java.util.concurrent.*;
public class FutureVsCompletableFuture {
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newFixedThreadPool(2);
// OLD WAY: Future -- blocks on get()
System.out.println("=== Future (blocking) ===");
Future future = executor.submit(() -> {
Thread.sleep(500);
return "Data from database";
});
// This line BLOCKS until the result is ready
String result = future.get();
System.out.println("Got: " + result);
// NEW WAY: CompletableFuture -- non-blocking
System.out.println("\n=== CompletableFuture (non-blocking) ===");
CompletableFuture cf = CompletableFuture.supplyAsync(() -> {
try { Thread.sleep(500); } catch (InterruptedException e) { throw new RuntimeException(e); }
return "Data from database";
});
// Attach a callback -- runs when data is ready, does NOT block
cf.thenApply(data -> data.toUpperCase())
.thenAccept(data -> System.out.println("Got: " + data));
System.out.println("Main thread is free!");
Thread.sleep(1000);
executor.shutdown();
// Output:
// === Future (blocking) ===
// Got: Data from database
//
// === CompletableFuture (non-blocking) ===
// Main thread is free!
// Got: DATA FROM DATABASE
}
}
There are four main ways to create a CompletableFuture, each for a different situation.
Use supplyAsync() when your async operation produces a result. It takes a Supplier<T> and runs it on a background thread from ForkJoinPool.commonPool().
Use runAsync() for fire-and-forget operations that do not return a result: logging, sending notifications, writing to a file. It takes a Runnable and returns CompletableFuture<Void>.
Use completedFuture() to create a CompletableFuture that is already completed with a known value. This is useful for caching, default values, and testing.
By default, supplyAsync() and runAsync() use ForkJoinPool.commonPool(). For I/O-bound operations (database calls, HTTP requests, file reads), you should provide a custom executor to avoid starving the common pool.
import java.util.concurrent.*;
public class CreatingCompletableFuture {
public static void main(String[] args) throws Exception {
// 1. supplyAsync -- produces a result
CompletableFuture userFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("Fetching user on: " + Thread.currentThread().getName());
return "User{id=1, name='Alice'}";
});
System.out.println("User: " + userFuture.join());
// Output:
// Fetching user on: ForkJoinPool.commonPool-worker-1
// User: User{id=1, name='Alice'}
// 2. runAsync -- no result (fire-and-forget)
CompletableFuture logFuture = CompletableFuture.runAsync(() -> {
System.out.println("Logging on: " + Thread.currentThread().getName());
// Write to log file, send notification, etc.
});
logFuture.join(); // Wait for completion (returns null)
// 3. completedFuture -- already has a value
CompletableFuture cached = CompletableFuture.completedFuture("Cached Result");
System.out.println("Cached: " + cached.join());
// Output: Cached: Cached Result
// No thread was created -- the value was immediately available
// 4. Custom executor -- for I/O operations
ExecutorService ioPool = Executors.newFixedThreadPool(10, r -> {
Thread t = new Thread(r);
t.setDaemon(true);
t.setName("io-pool-" + t.getId());
return t;
});
CompletableFuture dbResult = CompletableFuture.supplyAsync(() -> {
System.out.println("DB query on: " + Thread.currentThread().getName());
try { Thread.sleep(100); } catch (InterruptedException e) { throw new RuntimeException(e); }
return "Query result";
}, ioPool); // Pass custom executor as second argument
System.out.println("DB: " + dbResult.join());
// Output:
// DB query on: io-pool-25
// DB: Query result
// 5. Manually completing
CompletableFuture manual = new CompletableFuture<>();
// Some other thread or callback will complete this later
new Thread(() -> {
try { Thread.sleep(200); } catch (InterruptedException e) { throw new RuntimeException(e); }
manual.complete("Manually completed!");
}).start();
System.out.println("Manual: " + manual.join());
// Output: Manual: Manually completed!
ioPool.shutdown();
}
}
| Method | Returns | Input | Thread Pool | Use Case |
|---|---|---|---|---|
supplyAsync(supplier) |
CompletableFuture<T> |
Supplier<T> |
Common pool | Async computation with result |
supplyAsync(supplier, executor) |
CompletableFuture<T> |
Supplier<T> |
Custom executor | I/O-bound operations |
runAsync(runnable) |
CompletableFuture<Void> |
Runnable |
Common pool | Fire-and-forget side effects |
completedFuture(value) |
CompletableFuture<T> |
T |
None (immediate) | Caching, defaults, testing |
new CompletableFuture() |
CompletableFuture<T> |
None | None (manual) | Bridging callback APIs |
Once you have a CompletableFuture, you can transform its result when it arrives using three callback methods. The key difference is what each callback accepts and returns.
| Method | Input | Returns | Analogy |
|---|---|---|---|
thenApply(Function) |
Previous result | Transformed result | Stream’s map() |
thenAccept(Consumer) |
Previous result | Void |
Stream’s forEach() |
thenRun(Runnable) |
Nothing | Void |
“Then do this, regardless of result” |
import java.util.concurrent.CompletableFuture;
public class TransformingResults {
public static void main(String[] args) {
// thenApply -- transform the result (like map)
System.out.println("=== thenApply ===");
CompletableFuture greeting = CompletableFuture
.supplyAsync(() -> "hello")
.thenApply(s -> s + " world") // "hello" -> "hello world"
.thenApply(String::toUpperCase); // "hello world" -> "HELLO WORLD"
System.out.println(greeting.join());
// Output: HELLO WORLD
// Chain of transformations
CompletableFuture wordCount = CompletableFuture
.supplyAsync(() -> "The quick brown fox jumps over the lazy dog")
.thenApply(String::trim)
.thenApply(s -> s.split("\\s+"))
.thenApply(words -> words.length);
System.out.println("Word count: " + wordCount.join());
// Output: Word count: 9
// thenAccept -- consume the result (no return value)
System.out.println("\n=== thenAccept ===");
CompletableFuture printFuture = CompletableFuture
.supplyAsync(() -> "User{name='Alice', age=30}")
.thenAccept(user -> System.out.println("Received: " + user));
printFuture.join();
// Output: Received: User{name='Alice', age=30}
// thenRun -- run something after completion (ignores result)
System.out.println("\n=== thenRun ===");
CompletableFuture pipeline = CompletableFuture
.supplyAsync(() -> {
System.out.println("Step 1: Fetching data...");
return "raw data";
})
.thenApply(data -> {
System.out.println("Step 2: Processing: " + data);
return "processed " + data;
})
.thenAccept(result -> {
System.out.println("Step 3: Saving: " + result);
})
.thenRun(() -> {
System.out.println("Step 4: All done! (cleanup)");
});
pipeline.join();
// Output:
// Step 1: Fetching data...
// Step 2: Processing: raw data
// Step 3: Saving: processed raw data
// Step 4: All done! (cleanup)
}
}
Composing means chaining async operations where each step depends on the result of the previous one. There are two key methods:
thenCompose() — Use when your transformation function itself returns a CompletableFuture. This is the async equivalent of flatMap() in streams. Without it, you end up with CompletableFuture<CompletableFuture<T>> — a future wrapped inside a future.thenCombine() — Use when you want to run two independent futures in parallel and combine their results when both complete.| Scenario | Use | Why |
|---|---|---|
Transform: T -> U |
thenApply() |
Synchronous transformation, returns a plain value |
Chain: T -> CompletableFuture<U> |
thenCompose() |
Async step that itself returns a future (avoids nesting) |
| Merge two independent results | thenCombine() |
Both futures run in parallel, combine when both finish |
import java.util.concurrent.CompletableFuture;
public class ComposingFutures {
// Simulated async service methods
static CompletableFuture fetchUser(int userId) {
return CompletableFuture.supplyAsync(() -> {
System.out.println(" Fetching user " + userId + "...");
try { Thread.sleep(200); } catch (InterruptedException e) { throw new RuntimeException(e); }
return "User{id=" + userId + ", name='Alice'}";
});
}
static CompletableFuture fetchOrders(String user) {
return CompletableFuture.supplyAsync(() -> {
System.out.println(" Fetching orders for " + user + "...");
try { Thread.sleep(200); } catch (InterruptedException e) { throw new RuntimeException(e); }
return "Orders[{id=101, item='Laptop'}, {id=102, item='Mouse'}]";
});
}
static CompletableFuture fetchCreditScore(int userId) {
return CompletableFuture.supplyAsync(() -> {
System.out.println(" Fetching credit score for user " + userId + "...");
try { Thread.sleep(300); } catch (InterruptedException e) { throw new RuntimeException(e); }
return 750.0;
});
}
public static void main(String[] args) {
// === thenCompose: Sequential dependency ===
// Step 1: Fetch user -> Step 2: Fetch orders for that user
System.out.println("=== thenCompose (sequential chain) ===");
CompletableFuture orderPipeline = fetchUser(1)
.thenCompose(user -> fetchOrders(user)); // user result feeds into fetchOrders
System.out.println("Result: " + orderPipeline.join());
// Output:
// Fetching user 1...
// Fetching orders for User{id=1, name='Alice'}...
// Result: Orders[{id=101, item='Laptop'}, {id=102, item='Mouse'}]
// WHY NOT thenApply? It would create nested futures:
// CompletableFuture> nested =
// fetchUser(1).thenApply(user -> fetchOrders(user)); // WRONG! Nested future
// === thenCombine: Parallel merge ===
// Fetch user data AND credit score simultaneously, then combine
System.out.println("\n=== thenCombine (parallel merge) ===");
CompletableFuture userFuture = fetchUser(1);
CompletableFuture creditFuture = fetchCreditScore(1);
CompletableFuture combined = userFuture.thenCombine(
creditFuture,
(user, creditScore) -> user + " | Credit Score: " + creditScore
);
System.out.println("Combined: " + combined.join());
// Both ran in parallel! Total time is ~300ms, not 200+300=500ms
// Output: Combined: User{id=1, name='Alice'} | Credit Score: 750.0
// === Complex chain: compose + combine ===
System.out.println("\n=== Complex Pipeline ===");
CompletableFuture fullReport = fetchUser(1)
.thenCompose(user -> {
// After getting user, fetch orders AND credit score in parallel
CompletableFuture orders = fetchOrders(user);
CompletableFuture credit = fetchCreditScore(1);
// Combine both results
return orders.thenCombine(credit, (o, c) ->
"Report: " + user + "\n " + o + "\n Credit: " + c);
});
System.out.println(fullReport.join());
// Output:
// Report: User{id=1, name='Alice'}
// Orders[{id=101, item='Laptop'}, {id=102, item='Mouse'}]
// Credit: 750.0
}
}
When you need to run many async operations in parallel and wait for all (or any) of them to complete, use allOf() and anyOf().
CompletableFuture.allOf() takes a varargs of futures and returns a CompletableFuture<Void> that completes when all input futures complete. It does not directly give you the results — you have to extract them from the individual futures.
CompletableFuture.anyOf() returns a CompletableFuture<Object> that completes as soon as any one of the input futures completes. Useful for racing multiple data sources.
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.*;
public class CombiningFutures {
public static void main(String[] args) {
// === allOf: Parallel fetch multiple resources ===
System.out.println("=== allOf: Wait for ALL ===");
List urls = List.of(
"https://api.example.com/users",
"https://api.example.com/orders",
"https://api.example.com/products",
"https://api.example.com/reviews"
);
// Create a future for each URL
List> futures = urls.stream()
.map(url -> CompletableFuture.supplyAsync(() -> {
// Simulate API call with varying delays
int delay = 100 + new Random().nextInt(400);
try { Thread.sleep(delay); } catch (InterruptedException e) { throw new RuntimeException(e); }
return "Response from " + url.substring(url.lastIndexOf('/') + 1)
+ " (" + delay + "ms)";
}))
.collect(Collectors.toList());
// Wait for ALL to complete
long start = System.currentTimeMillis();
CompletableFuture allDone = CompletableFuture.allOf(
futures.toArray(new CompletableFuture[0])
);
// Collect all results
CompletableFuture> allResults = allDone.thenApply(v ->
futures.stream()
.map(CompletableFuture::join) // Safe -- all are already complete
.collect(Collectors.toList())
);
List results = allResults.join();
long elapsed = System.currentTimeMillis() - start;
results.forEach(r -> System.out.println(" " + r));
System.out.println("Total time: " + elapsed + "ms (parallel, not sequential!)");
// All 4 API calls ran simultaneously
// Total time is ~max(individual times), not sum
// === anyOf: Race multiple sources ===
System.out.println("\n=== anyOf: First to finish ===");
CompletableFuture primary = CompletableFuture.supplyAsync(() -> {
try { Thread.sleep(300); } catch (InterruptedException e) { throw new RuntimeException(e); }
return "Primary DB: user data";
});
CompletableFuture replica = CompletableFuture.supplyAsync(() -> {
try { Thread.sleep(100); } catch (InterruptedException e) { throw new RuntimeException(e); }
return "Replica DB: user data";
});
CompletableFuture cache = CompletableFuture.supplyAsync(() -> {
try { Thread.sleep(50); } catch (InterruptedException e) { throw new RuntimeException(e); }
return "Cache: user data";
});
CompletableFuture
Exceptions in async pipelines are tricky because they happen on background threads. CompletableFuture provides three methods for handling errors, each with a different purpose:
| Method | Receives | Returns | Purpose |
|---|---|---|---|
exceptionally(Function) |
Exception only | Recovery value | Provide a fallback when error occurs |
handle(BiFunction) |
Result AND exception | New result | Handle both success and failure in one place |
whenComplete(BiConsumer) |
Result AND exception | Same result (no transform) | Logging, cleanup, side effects |
Key rule: If an exception occurs in a CompletableFuture and is not handled, it propagates down the chain. Every subsequent thenApply(), thenAccept(), etc. is skipped until an exception handler is encountered.
import java.util.concurrent.CompletableFuture;
public class ExceptionHandling {
public static void main(String[] args) {
// === exceptionally: Provide a fallback ===
System.out.println("=== exceptionally ===");
CompletableFuture withFallback = CompletableFuture
.supplyAsync(() -> {
if (true) throw new RuntimeException("Database connection failed!");
return "data";
})
.exceptionally(ex -> {
System.out.println("Error: " + ex.getMessage());
return "default data"; // Recovery value
});
System.out.println("Result: " + withFallback.join());
// Output:
// Error: Database connection failed!
// Result: default data
// === handle: Both success and failure ===
System.out.println("\n=== handle (success case) ===");
CompletableFuture handled = CompletableFuture
.supplyAsync(() -> "real data")
.handle((result, ex) -> {
if (ex != null) {
System.out.println("Failed: " + ex.getMessage());
return "fallback";
}
System.out.println("Success: " + result);
return result.toUpperCase();
});
System.out.println("Result: " + handled.join());
// Output:
// Success: real data
// Result: REAL DATA
System.out.println("\n=== handle (failure case) ===");
CompletableFuture handleError = CompletableFuture
.supplyAsync(() -> {
throw new RuntimeException("Network timeout");
})
.handle((result, ex) -> {
if (ex != null) {
System.out.println("Failed: " + ex.getMessage());
return "cached result";
}
return result;
});
System.out.println("Result: " + handleError.join());
// Output:
// Failed: Network timeout
// Result: cached result
// === whenComplete: Logging/cleanup (does NOT change result) ===
System.out.println("\n=== whenComplete ===");
CompletableFuture logged = CompletableFuture
.supplyAsync(() -> "important data")
.whenComplete((result, ex) -> {
if (ex != null) {
System.out.println("LOG: Operation failed - " + ex.getMessage());
} else {
System.out.println("LOG: Operation succeeded - " + result);
}
// NOTE: Cannot change the result here!
});
System.out.println("Result: " + logged.join());
// Output:
// LOG: Operation succeeded - important data
// Result: important data
// === Exception propagation ===
System.out.println("\n=== Exception Propagation ===");
CompletableFuture chain = CompletableFuture
.supplyAsync(() -> "data")
.thenApply(s -> {
throw new RuntimeException("Step 2 failed");
})
.thenApply(s -> {
System.out.println("This is SKIPPED");
return "never reached";
})
.thenApply(s -> {
System.out.println("This is ALSO SKIPPED");
return "never reached";
})
.exceptionally(ex -> {
System.out.println("Caught at the end: " + ex.getMessage());
return "recovered";
});
System.out.println("Result: " + chain.join());
// Output:
// Caught at the end: java.lang.RuntimeException: Step 2 failed
// Result: recovered
}
}
Every callback method in CompletableFuture has three variants:
| Variant | Thread Used | When to Use |
|---|---|---|
thenApply(fn) |
Same thread that completed the previous stage (or the caller thread if already complete) | Fast, CPU-light transformations |
thenApplyAsync(fn) |
ForkJoinPool.commonPool() |
CPU-intensive transformations |
thenApplyAsync(fn, executor) |
Your custom executor | I/O-bound operations (DB, HTTP) |
The same pattern applies to thenAccept/thenAcceptAsync, thenRun/thenRunAsync, thenCompose/thenComposeAsync, thenCombine/thenCombineAsync, etc.
Rule of thumb:
Async with a custom executor for anything that involves I/O or takes more than a few milliseconds.Async) for blocking I/O — it has limited threads and is shared across your entire application.import java.util.concurrent.*;
public class AsyncVariants {
public static void main(String[] args) {
// Custom executor for I/O work
ExecutorService ioPool = Executors.newFixedThreadPool(4, r -> {
Thread t = new Thread(r);
t.setDaemon(true);
t.setName("io-" + t.getId());
return t;
});
// thenApply -- runs on completing thread (fast)
CompletableFuture sync = CompletableFuture
.supplyAsync(() -> {
System.out.println("Produce on: " + Thread.currentThread().getName());
return "data";
})
.thenApply(s -> {
System.out.println("thenApply on: " + Thread.currentThread().getName());
return s.toUpperCase(); // Quick transformation, same thread
});
sync.join();
// Both likely run on: ForkJoinPool.commonPool-worker-1
System.out.println();
// thenApplyAsync -- runs on common pool (different thread)
CompletableFuture async = CompletableFuture
.supplyAsync(() -> {
System.out.println("Produce on: " + Thread.currentThread().getName());
return "data";
})
.thenApplyAsync(s -> {
System.out.println("thenApplyAsync on: " + Thread.currentThread().getName());
return s.toUpperCase(); // May run on a different thread
});
async.join();
System.out.println();
// thenApplyAsync with executor -- runs on YOUR thread pool
CompletableFuture customAsync = CompletableFuture
.supplyAsync(() -> {
System.out.println("Produce on: " + Thread.currentThread().getName());
return "data";
})
.thenApplyAsync(s -> {
System.out.println("Custom async on: " + Thread.currentThread().getName());
// Simulate I/O (database call)
try { Thread.sleep(100); } catch (InterruptedException e) { throw new RuntimeException(e); }
return "DB result for: " + s;
}, ioPool);
System.out.println("Result: " + customAsync.join());
// Output: Custom async on: io-25
// Result: DB result for: data
ioPool.shutdown();
}
}
These patterns solve common problems you will encounter in production code.
When you need data from multiple independent sources, fetch them all in parallel instead of sequentially. This can dramatically reduce response times.
import java.util.concurrent.*;
import java.util.*;
import java.util.stream.*;
public class ParallelApiCalls {
static ExecutorService httpPool = Executors.newFixedThreadPool(10);
// Simulated API calls
static CompletableFuture
In production, you cannot wait forever for an async operation to complete. You need timeouts.
import java.util.concurrent.*;
public class TimeoutHandling {
public static void main(String[] args) {
// === Java 9+: orTimeout() and completeOnTimeout() ===
// orTimeout: Fails with TimeoutException if not complete in time
CompletableFuture withTimeout = CompletableFuture
.supplyAsync(() -> {
try { Thread.sleep(5000); } catch (InterruptedException e) { throw new RuntimeException(e); }
return "slow result";
})
.orTimeout(1, TimeUnit.SECONDS)
.exceptionally(ex -> {
System.out.println("Timed out: " + ex.getMessage());
return "timeout fallback";
});
System.out.println("orTimeout result: " + withTimeout.join());
// Output:
// Timed out: java.util.concurrent.TimeoutException
// orTimeout result: timeout fallback
// completeOnTimeout: Provides a default value on timeout (no exception)
CompletableFuture withDefault = CompletableFuture
.supplyAsync(() -> {
try { Thread.sleep(5000); } catch (InterruptedException e) { throw new RuntimeException(e); }
return "slow result";
})
.completeOnTimeout("default value", 1, TimeUnit.SECONDS);
System.out.println("completeOnTimeout: " + withDefault.join());
// Output: completeOnTimeout: default value
// === Java 8 compatible: Manual timeout ===
System.out.println("\n=== Java 8 Timeout Pattern ===");
CompletableFuture java8Timeout = timeoutAfter(
CompletableFuture.supplyAsync(() -> {
try { Thread.sleep(5000); } catch (InterruptedException e) { throw new RuntimeException(e); }
return "slow";
}),
1, TimeUnit.SECONDS,
"java8 fallback"
);
System.out.println("Java 8 timeout: " + java8Timeout.join());
// Output: Java 8 timeout: java8 fallback
}
// Java 8 compatible timeout helper
static CompletableFuture timeoutAfter(
CompletableFuture future, long timeout, TimeUnit unit, T fallback) {
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
CompletableFuture timeoutFuture = new CompletableFuture<>();
scheduler.schedule(() -> timeoutFuture.complete(fallback), timeout, unit);
return CompletableFuture.anyOf(future, timeoutFuture)
.thenApply(result -> {
scheduler.shutdown();
@SuppressWarnings("unchecked")
T typed = (T) result;
return typed;
});
}
}
For transient failures (network blips, temporary unavailability), retrying the operation automatically is a common pattern.
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class RetryPattern {
static AtomicInteger callCount = new AtomicInteger(0);
// Simulated flaky service: fails first 2 times, succeeds on 3rd
static CompletableFuture flakyService() {
return CompletableFuture.supplyAsync(() -> {
int attempt = callCount.incrementAndGet();
System.out.println(" Attempt " + attempt + " on " + Thread.currentThread().getName());
if (attempt <= 2) {
throw new RuntimeException("Service unavailable (attempt " + attempt + ")");
}
return "Success on attempt " + attempt;
});
}
// Retry helper
static CompletableFuture retry(
java.util.function.Supplier> supplier,
int maxRetries,
long delayMs) {
CompletableFuture future = supplier.get();
for (int i = 0; i < maxRetries; i++) {
final int attempt = i + 1;
future = future.handle((result, ex) -> {
if (ex == null) {
return CompletableFuture.completedFuture(result);
}
System.out.println(" Retry " + attempt + ": " + ex.getMessage());
try { Thread.sleep(delayMs); } catch (InterruptedException e) { throw new RuntimeException(e); }
return supplier.get();
}).thenCompose(f -> f);
}
return future;
}
public static void main(String[] args) {
System.out.println("=== Retry Pattern ===");
CompletableFuture result = retry(
RetryPattern::flakyService,
3, // max retries
500 // delay between retries in ms
);
System.out.println("Final result: " + result.join());
// Output:
// Attempt 1 on ForkJoinPool.commonPool-worker-1
// Retry 1: java.lang.RuntimeException: Service unavailable (attempt 1)
// Attempt 2 on ForkJoinPool.commonPool-worker-2
// Retry 2: java.lang.RuntimeException: Service unavailable (attempt 2)
// Attempt 3 on ForkJoinPool.commonPool-worker-1
// Final result: Success on attempt 3
}
}
| # | Practice | Do | Do Not |
|---|---|---|---|
| 1 | Always handle exceptions | Use exceptionally() or handle() on every chain |
Let exceptions silently disappear |
| 2 | Use custom executors for I/O | supplyAsync(fn, ioPool) |
Use common pool for DB/HTTP calls |
| 3 | Prefer thenCompose over thenApply | thenCompose() when calling another async method |
thenApply() creating nested futures |
| 4 | Use join() over get() | future.join() — throws unchecked exception |
future.get() — forces try/catch for checked exception |
| 5 | Add timeouts | orTimeout() or completeOnTimeout() |
Wait forever for a result |
| 6 | Name your threads | Custom ThreadFactory with meaningful names | Debug with “pool-3-thread-7” names |
| 7 | Use allOf for parallel batch operations | allOf(futures) then collect results |
Sequential loop calling join() on each |
import java.util.concurrent.*;
public class BestPracticesDemo {
public static void main(String[] args) {
// GOOD: Custom executor for I/O work
ExecutorService ioPool = Executors.newFixedThreadPool(20, r -> {
Thread t = new Thread(r);
t.setDaemon(true);
t.setName("io-worker-" + t.getId());
return t;
});
// GOOD: Always handle exceptions
CompletableFuture safe = CompletableFuture
.supplyAsync(() -> fetchFromDatabase("user-123"), ioPool)
.thenApply(data -> process(data))
.exceptionally(ex -> {
System.err.println("Pipeline failed: " + ex.getMessage());
return "fallback";
});
System.out.println("Safe result: " + safe.join());
// GOOD: Use join() instead of get()
String result = safe.join(); // Throws CompletionException (unchecked)
// NOT: safe.get(); // Throws ExecutionException (checked) -- forces try/catch
// GOOD: Use thenCompose for async chains
CompletableFuture composed = CompletableFuture
.supplyAsync(() -> "user-123", ioPool)
.thenCompose(id -> fetchAsync(id)) // Returns CompletableFuture
.thenCompose(user -> enrichAsync(user)) // Chain of async calls
.exceptionally(ex -> "error: " + ex.getMessage());
System.out.println("Composed: " + composed.join());
// BAD: Sequential blocking -- defeats the purpose!
// for (CompletableFuture f : futures) {
// results.add(f.join()); // BLOCKS on each one sequentially!
// }
// GOOD: Parallel with allOf
// CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
// .thenApply(v -> futures.stream()
// .map(CompletableFuture::join)
// .collect(Collectors.toList()));
ioPool.shutdown();
}
static String fetchFromDatabase(String id) {
return "data for " + id;
}
static String process(String data) {
return "processed: " + data;
}
static CompletableFuture fetchAsync(String id) {
return CompletableFuture.completedFuture("User{" + id + "}");
}
static CompletableFuture enrichAsync(String user) {
return CompletableFuture.completedFuture("Enriched " + user);
}
}
This example demonstrates a realistic e-commerce order processing pipeline that uses CompletableFuture to parallelize inventory checks, payment processing, and notification delivery.
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.*;
public class OrderProcessingSystem {
// Thread pool for I/O operations
static final ExecutorService IO_POOL = Executors.newFixedThreadPool(10, r -> {
Thread t = new Thread(r);
t.setDaemon(true);
t.setName("io-worker-" + t.getId());
return t;
});
// --- Domain classes ---
record Order(String orderId, String customerId, List items) {}
record OrderItem(String productId, String name, double price, int quantity) {}
record InventoryResult(String productId, boolean available, int stock) {}
record PaymentResult(String transactionId, boolean success, String message) {}
record ShippingResult(String trackingNumber, String estimatedDelivery) {}
// --- Service simulations ---
static CompletableFuture checkInventory(OrderItem item) {
return CompletableFuture.supplyAsync(() -> {
sleep(100 + new Random().nextInt(200));
boolean available = new Random().nextInt(10) > 1; // 90% available
int stock = available ? 10 + new Random().nextInt(90) : 0;
System.out.printf(" [Inventory] %s: %s (stock: %d)%n",
item.name(), available ? "AVAILABLE" : "OUT OF STOCK", stock);
return new InventoryResult(item.productId(), available, stock);
}, IO_POOL);
}
static CompletableFuture processPayment(String customerId, double total) {
return CompletableFuture.supplyAsync(() -> {
sleep(300);
String txnId = "TXN-" + UUID.randomUUID().toString().substring(0, 8).toUpperCase();
boolean success = new Random().nextInt(10) > 0; // 90% success
String message = success ? "Payment approved" : "Insufficient funds";
System.out.printf(" [Payment] Customer %s, $%.2f: %s (%s)%n",
customerId, total, message, txnId);
if (!success) {
throw new RuntimeException("Payment failed: " + message);
}
return new PaymentResult(txnId, true, message);
}, IO_POOL);
}
static CompletableFuture createShipment(Order order) {
return CompletableFuture.supplyAsync(() -> {
sleep(200);
String tracking = "SHIP-" + UUID.randomUUID().toString().substring(0, 8).toUpperCase();
System.out.printf(" [Shipping] Order %s: tracking %s%n", order.orderId(), tracking);
return new ShippingResult(tracking, "3-5 business days");
}, IO_POOL);
}
static CompletableFuture sendEmail(String to, String subject) {
return CompletableFuture.runAsync(() -> {
sleep(100);
System.out.printf(" [Email] To: %s | Subject: %s%n", to, subject);
}, IO_POOL);
}
static CompletableFuture sendSms(String to, String message) {
return CompletableFuture.runAsync(() -> {
sleep(50);
System.out.printf(" [SMS] To: %s | Message: %s%n", to, message);
}, IO_POOL);
}
// --- Main processing pipeline ---
static CompletableFuture processOrder(Order order) {
System.out.println("\nProcessing order: " + order.orderId());
long startTime = System.currentTimeMillis();
// Step 1: Check inventory for ALL items in PARALLEL
System.out.println("Step 1: Checking inventory...");
List> inventoryChecks = order.items().stream()
.map(OrderProcessingSystem::checkInventory)
.collect(Collectors.toList());
CompletableFuture> allInventory = CompletableFuture
.allOf(inventoryChecks.toArray(new CompletableFuture[0]))
.thenApply(v -> inventoryChecks.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList()));
// Step 2: Verify all items available, then process payment
return allInventory
.thenCompose(inventoryResults -> {
// Check if any item is out of stock
List outOfStock = inventoryResults.stream()
.filter(r -> !r.available())
.map(InventoryResult::productId)
.collect(Collectors.toList());
if (!outOfStock.isEmpty()) {
CompletableFuture failed = new CompletableFuture<>();
failed.completeExceptionally(
new RuntimeException("Items out of stock: " + outOfStock));
return failed;
}
// Calculate total
double total = order.items().stream()
.mapToDouble(item -> item.price() * item.quantity())
.sum();
System.out.println("Step 2: Processing payment ($" + String.format("%.2f", total) + ")...");
return processPayment(order.customerId(), total);
})
// Step 3: Create shipment
.thenCompose(payment -> {
System.out.println("Step 3: Creating shipment...");
return createShipment(order)
.thenApply(shipping -> Map.of(
"payment", payment,
"shipping", shipping
));
})
// Step 4: Send notifications in PARALLEL
.thenCompose(results -> {
System.out.println("Step 4: Sending notifications...");
PaymentResult payment = (PaymentResult) results.get("payment");
ShippingResult shipping = (ShippingResult) results.get("shipping");
CompletableFuture email = sendEmail(
order.customerId() + "@example.com",
"Order " + order.orderId() + " confirmed! Tracking: " + shipping.trackingNumber()
);
CompletableFuture sms = sendSms(
order.customerId(),
"Order confirmed! Track: " + shipping.trackingNumber()
);
return CompletableFuture.allOf(email, sms)
.thenApply(v -> String.format(
"Order %s COMPLETE | Payment: %s | Tracking: %s | Delivery: %s",
order.orderId(), payment.transactionId(),
shipping.trackingNumber(), shipping.estimatedDelivery()
));
})
// Handle errors
.exceptionally(ex -> {
String error = "Order " + order.orderId() + " FAILED: " + ex.getMessage();
System.out.println(" [ERROR] " + error);
// Send failure notification
sendEmail(order.customerId() + "@example.com",
"Order " + order.orderId() + " could not be processed").join();
return error;
})
.whenComplete((result, ex) -> {
long elapsed = System.currentTimeMillis() - startTime;
System.out.println(" [Timer] Order " + order.orderId() + " took " + elapsed + "ms");
});
}
public static void main(String[] args) {
System.out.println("====================================");
System.out.println(" Order Processing System");
System.out.println("====================================");
// Create sample orders
Order order1 = new Order("ORD-001", "CUST-100", List.of(
new OrderItem("PROD-1", "Laptop", 999.99, 1),
new OrderItem("PROD-2", "Mouse", 29.99, 2),
new OrderItem("PROD-3", "Keyboard", 79.99, 1)
));
Order order2 = new Order("ORD-002", "CUST-200", List.of(
new OrderItem("PROD-4", "Monitor", 399.99, 1),
new OrderItem("PROD-5", "Headset", 149.99, 1)
));
Order order3 = new Order("ORD-003", "CUST-300", List.of(
new OrderItem("PROD-6", "Webcam", 89.99, 1)
));
// Process ALL orders in PARALLEL
List orders = List.of(order1, order2, order3);
long totalStart = System.currentTimeMillis();
List> orderFutures = orders.stream()
.map(OrderProcessingSystem::processOrder)
.collect(Collectors.toList());
// Wait for all orders to complete
CompletableFuture.allOf(orderFutures.toArray(new CompletableFuture[0])).join();
// Print final results
System.out.println("\n====================================");
System.out.println(" FINAL RESULTS");
System.out.println("====================================");
for (CompletableFuture future : orderFutures) {
System.out.println(" " + future.join());
}
long totalElapsed = System.currentTimeMillis() - totalStart;
System.out.println("\nTotal processing time: " + totalElapsed + "ms");
System.out.println("(All " + orders.size() + " orders processed in parallel)");
// Summary of concepts used
System.out.println("\n====================================");
System.out.println(" Concepts Demonstrated");
System.out.println("====================================");
System.out.println(" 1. supplyAsync() - Async computation with result");
System.out.println(" 2. runAsync() - Fire-and-forget async tasks");
System.out.println(" 3. Custom executor - Dedicated I/O thread pool");
System.out.println(" 4. thenApply() - Transform results");
System.out.println(" 5. thenCompose() - Chain dependent async operations");
System.out.println(" 6. thenCombine() - (shown in earlier examples)");
System.out.println(" 7. allOf() - Wait for all parallel tasks");
System.out.println(" 8. exceptionally() - Error recovery with fallback");
System.out.println(" 9. whenComplete() - Logging/timing side effects");
System.out.println(" 10. completeExceptionally() - Manual failure");
System.out.println(" 11. completedFuture() - (shown in earlier examples)");
System.out.println(" 12. Parallel streams - Collecting future results");
IO_POOL.shutdown();
}
static void sleep(long ms) {
try { Thread.sleep(ms); } catch (InterruptedException e) { throw new RuntimeException(e); }
}
}
// Sample Output:
// ====================================
// Order Processing System
// ====================================
//
// Processing order: ORD-001
// Step 1: Checking inventory...
// Processing order: ORD-002
// Step 1: Checking inventory...
// Processing order: ORD-003
// Step 1: Checking inventory...
// [Inventory] Webcam: AVAILABLE (stock: 45)
// [Inventory] Mouse: AVAILABLE (stock: 72)
// [Inventory] Keyboard: AVAILABLE (stock: 38)
// [Inventory] Headset: AVAILABLE (stock: 91)
// [Inventory] Monitor: AVAILABLE (stock: 23)
// [Inventory] Laptop: AVAILABLE (stock: 56)
// Step 2: Processing payment ($89.99)...
// Step 2: Processing payment ($549.98)...
// Step 2: Processing payment ($1139.96)...
// [Payment] Customer CUST-300, $89.99: Payment approved (TXN-A1B2C3D4)
// Step 3: Creating shipment...
// [Payment] Customer CUST-200, $549.98: Payment approved (TXN-E5F6G7H8)
// Step 3: Creating shipment...
// [Payment] Customer CUST-100, $1139.96: Payment approved (TXN-I9J0K1L2)
// Step 3: Creating shipment...
// [Shipping] Order ORD-003: tracking SHIP-M3N4O5P6
// Step 4: Sending notifications...
// [Shipping] Order ORD-002: tracking SHIP-Q7R8S9T0
// Step 4: Sending notifications...
// [SMS] To: CUST-300 | Message: Order confirmed! Track: SHIP-M3N4O5P6
// [Email] To: CUST-300@example.com | Subject: Order ORD-003 confirmed!
// [Shipping] Order ORD-001: tracking SHIP-U1V2W3X4
// Step 4: Sending notifications...
// [Timer] Order ORD-003 took 652ms
// [SMS] To: CUST-200 | Message: Order confirmed! Track: SHIP-Q7R8S9T0
// [Email] To: CUST-200@example.com | Subject: Order ORD-002 confirmed!
// [Timer] Order ORD-002 took 785ms
// [SMS] To: CUST-100 | Message: Order confirmed! Track: SHIP-U1V2W3X4
// [Email] To: CUST-100@example.com | Subject: Order ORD-001 confirmed!
// [Timer] Order ORD-001 took 892ms
//
// ====================================
// FINAL RESULTS
// ====================================
// Order ORD-001 COMPLETE | Payment: TXN-I9J0K1L2 | Tracking: SHIP-U1V2W3X4
// Order ORD-002 COMPLETE | Payment: TXN-E5F6G7H8 | Tracking: SHIP-Q7R8S9T0
// Order ORD-003 COMPLETE | Payment: TXN-A1B2C3D4 | Tracking: SHIP-M3N4O5P6
//
// Total processing time: 895ms
// (All 3 orders processed in parallel)
| Category | Method | Description |
|---|---|---|
| Creating | supplyAsync(supplier) |
Run async, return result |
runAsync(runnable) |
Run async, no result | |
completedFuture(value) |
Already-complete future | |
new CompletableFuture() |
Manually completable | |
| Transforming | thenApply(fn) |
Transform result (like map) |
thenAccept(consumer) |
Consume result (no return) | |
thenRun(runnable) |
Run after completion (ignores result) | |
| Composing | thenCompose(fn) |
Chain async operations (flatMap) |
thenCombine(other, fn) |
Merge two parallel results | |
| Combining | allOf(futures...) |
Wait for all to complete |
anyOf(futures...) |
First to complete wins | |
| Errors | exceptionally(fn) |
Provide fallback on error |
handle(bifn) |
Handle both success and error | |
whenComplete(biconsumer) |
Inspect result/error (no transform) | |
| Getting Result | join() |
Block and get result (unchecked exception) |
get(timeout, unit) |
Block with timeout (checked exception) | |
| Timeout (9+) | orTimeout(time, unit) |
Fail with TimeoutException |
completeOnTimeout(val, time, unit) |
Use default value on timeout |