CompletableFuture

1. What is CompletableFuture?

Think of ordering coffee at a busy cafe. You place your order, get a receipt with a number, and step aside. You are free to check your phone, chat with a friend, or grab a seat. When your coffee is ready, the barista calls your number. You did not stand at the counter blocking everyone behind you — you continued living your life while the work happened in the background.

CompletableFuture is that receipt. It is a promise that a value will be available at some point in the future. Introduced in Java 8 as part of java.util.concurrent, it is the most powerful tool Java provides for writing asynchronous, non-blocking code.

The Problem with Future

Java 5 introduced Future<T>, which represents an asynchronous computation. But Future has a fatal flaw: the only way to get the result is to call get(), which blocks the calling thread. You cannot attach a callback. You cannot chain operations. You cannot combine multiple futures. Calling get() defeats the purpose of async programming.

What CompletableFuture Adds

Feature Future CompletableFuture
Get result get() — blocks get(), join(), or non-blocking callbacks
Attach callback Not supported thenApply(), thenAccept(), thenRun()
Chain operations Not supported thenCompose(), thenApply()
Combine futures Not supported thenCombine(), allOf(), anyOf()
Exception handling ExecutionException wrapper exceptionally(), handle(), whenComplete()
Manually complete Not supported complete(), completeExceptionally()
import java.util.concurrent.*;

public class FutureVsCompletableFuture {
    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(2);

        // OLD WAY: Future -- blocks on get()
        System.out.println("=== Future (blocking) ===");
        Future future = executor.submit(() -> {
            Thread.sleep(500);
            return "Data from database";
        });
        // This line BLOCKS until the result is ready
        String result = future.get();
        System.out.println("Got: " + result);

        // NEW WAY: CompletableFuture -- non-blocking
        System.out.println("\n=== CompletableFuture (non-blocking) ===");
        CompletableFuture cf = CompletableFuture.supplyAsync(() -> {
            try { Thread.sleep(500); } catch (InterruptedException e) { throw new RuntimeException(e); }
            return "Data from database";
        });

        // Attach a callback -- runs when data is ready, does NOT block
        cf.thenApply(data -> data.toUpperCase())
          .thenAccept(data -> System.out.println("Got: " + data));

        System.out.println("Main thread is free!");
        Thread.sleep(1000);
        executor.shutdown();

        // Output:
        // === Future (blocking) ===
        // Got: Data from database
        //
        // === CompletableFuture (non-blocking) ===
        // Main thread is free!
        // Got: DATA FROM DATABASE
    }
}

2. Creating CompletableFuture

There are four main ways to create a CompletableFuture, each for a different situation.

2.1 supplyAsync() — Returns a Value

Use supplyAsync() when your async operation produces a result. It takes a Supplier<T> and runs it on a background thread from ForkJoinPool.commonPool().

2.2 runAsync() — No Return Value

Use runAsync() for fire-and-forget operations that do not return a result: logging, sending notifications, writing to a file. It takes a Runnable and returns CompletableFuture<Void>.

2.3 completedFuture() — Already Done

Use completedFuture() to create a CompletableFuture that is already completed with a known value. This is useful for caching, default values, and testing.

2.4 Custom Executor

By default, supplyAsync() and runAsync() use ForkJoinPool.commonPool(). For I/O-bound operations (database calls, HTTP requests, file reads), you should provide a custom executor to avoid starving the common pool.

import java.util.concurrent.*;

public class CreatingCompletableFuture {
    public static void main(String[] args) throws Exception {

        // 1. supplyAsync -- produces a result
        CompletableFuture userFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("Fetching user on: " + Thread.currentThread().getName());
            return "User{id=1, name='Alice'}";
        });
        System.out.println("User: " + userFuture.join());
        // Output:
        // Fetching user on: ForkJoinPool.commonPool-worker-1
        // User: User{id=1, name='Alice'}

        // 2. runAsync -- no result (fire-and-forget)
        CompletableFuture logFuture = CompletableFuture.runAsync(() -> {
            System.out.println("Logging on: " + Thread.currentThread().getName());
            // Write to log file, send notification, etc.
        });
        logFuture.join(); // Wait for completion (returns null)

        // 3. completedFuture -- already has a value
        CompletableFuture cached = CompletableFuture.completedFuture("Cached Result");
        System.out.println("Cached: " + cached.join());
        // Output: Cached: Cached Result
        // No thread was created -- the value was immediately available

        // 4. Custom executor -- for I/O operations
        ExecutorService ioPool = Executors.newFixedThreadPool(10, r -> {
            Thread t = new Thread(r);
            t.setDaemon(true);
            t.setName("io-pool-" + t.getId());
            return t;
        });

        CompletableFuture dbResult = CompletableFuture.supplyAsync(() -> {
            System.out.println("DB query on: " + Thread.currentThread().getName());
            try { Thread.sleep(100); } catch (InterruptedException e) { throw new RuntimeException(e); }
            return "Query result";
        }, ioPool); // Pass custom executor as second argument

        System.out.println("DB: " + dbResult.join());
        // Output:
        // DB query on: io-pool-25
        // DB: Query result

        // 5. Manually completing
        CompletableFuture manual = new CompletableFuture<>();
        // Some other thread or callback will complete this later
        new Thread(() -> {
            try { Thread.sleep(200); } catch (InterruptedException e) { throw new RuntimeException(e); }
            manual.complete("Manually completed!");
        }).start();
        System.out.println("Manual: " + manual.join());
        // Output: Manual: Manually completed!

        ioPool.shutdown();
    }
}

Creation Methods Summary

Method Returns Input Thread Pool Use Case
supplyAsync(supplier) CompletableFuture<T> Supplier<T> Common pool Async computation with result
supplyAsync(supplier, executor) CompletableFuture<T> Supplier<T> Custom executor I/O-bound operations
runAsync(runnable) CompletableFuture<Void> Runnable Common pool Fire-and-forget side effects
completedFuture(value) CompletableFuture<T> T None (immediate) Caching, defaults, testing
new CompletableFuture() CompletableFuture<T> None None (manual) Bridging callback APIs

3. Transforming Results

Once you have a CompletableFuture, you can transform its result when it arrives using three callback methods. The key difference is what each callback accepts and returns.

Method Input Returns Analogy
thenApply(Function) Previous result Transformed result Stream’s map()
thenAccept(Consumer) Previous result Void Stream’s forEach()
thenRun(Runnable) Nothing Void “Then do this, regardless of result”
import java.util.concurrent.CompletableFuture;

public class TransformingResults {
    public static void main(String[] args) {

        // thenApply -- transform the result (like map)
        System.out.println("=== thenApply ===");
        CompletableFuture greeting = CompletableFuture
            .supplyAsync(() -> "hello")
            .thenApply(s -> s + " world")       // "hello" -> "hello world"
            .thenApply(String::toUpperCase);     // "hello world" -> "HELLO WORLD"

        System.out.println(greeting.join());
        // Output: HELLO WORLD

        // Chain of transformations
        CompletableFuture wordCount = CompletableFuture
            .supplyAsync(() -> "The quick brown fox jumps over the lazy dog")
            .thenApply(String::trim)
            .thenApply(s -> s.split("\\s+"))
            .thenApply(words -> words.length);

        System.out.println("Word count: " + wordCount.join());
        // Output: Word count: 9

        // thenAccept -- consume the result (no return value)
        System.out.println("\n=== thenAccept ===");
        CompletableFuture printFuture = CompletableFuture
            .supplyAsync(() -> "User{name='Alice', age=30}")
            .thenAccept(user -> System.out.println("Received: " + user));
        printFuture.join();
        // Output: Received: User{name='Alice', age=30}

        // thenRun -- run something after completion (ignores result)
        System.out.println("\n=== thenRun ===");
        CompletableFuture pipeline = CompletableFuture
            .supplyAsync(() -> {
                System.out.println("Step 1: Fetching data...");
                return "raw data";
            })
            .thenApply(data -> {
                System.out.println("Step 2: Processing: " + data);
                return "processed " + data;
            })
            .thenAccept(result -> {
                System.out.println("Step 3: Saving: " + result);
            })
            .thenRun(() -> {
                System.out.println("Step 4: All done! (cleanup)");
            });

        pipeline.join();
        // Output:
        // Step 1: Fetching data...
        // Step 2: Processing: raw data
        // Step 3: Saving: processed raw data
        // Step 4: All done! (cleanup)
    }
}

4. Composing Futures

Composing means chaining async operations where each step depends on the result of the previous one. There are two key methods:

  • thenCompose() — Use when your transformation function itself returns a CompletableFuture. This is the async equivalent of flatMap() in streams. Without it, you end up with CompletableFuture<CompletableFuture<T>> — a future wrapped inside a future.
  • thenCombine() — Use when you want to run two independent futures in parallel and combine their results when both complete.

thenCompose vs thenApply

Scenario Use Why
Transform: T -> U thenApply() Synchronous transformation, returns a plain value
Chain: T -> CompletableFuture<U> thenCompose() Async step that itself returns a future (avoids nesting)
Merge two independent results thenCombine() Both futures run in parallel, combine when both finish
import java.util.concurrent.CompletableFuture;

public class ComposingFutures {

    // Simulated async service methods
    static CompletableFuture fetchUser(int userId) {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("  Fetching user " + userId + "...");
            try { Thread.sleep(200); } catch (InterruptedException e) { throw new RuntimeException(e); }
            return "User{id=" + userId + ", name='Alice'}";
        });
    }

    static CompletableFuture fetchOrders(String user) {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("  Fetching orders for " + user + "...");
            try { Thread.sleep(200); } catch (InterruptedException e) { throw new RuntimeException(e); }
            return "Orders[{id=101, item='Laptop'}, {id=102, item='Mouse'}]";
        });
    }

    static CompletableFuture fetchCreditScore(int userId) {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("  Fetching credit score for user " + userId + "...");
            try { Thread.sleep(300); } catch (InterruptedException e) { throw new RuntimeException(e); }
            return 750.0;
        });
    }

    public static void main(String[] args) {

        // === thenCompose: Sequential dependency ===
        // Step 1: Fetch user -> Step 2: Fetch orders for that user
        System.out.println("=== thenCompose (sequential chain) ===");

        CompletableFuture orderPipeline = fetchUser(1)
            .thenCompose(user -> fetchOrders(user)); // user result feeds into fetchOrders

        System.out.println("Result: " + orderPipeline.join());
        // Output:
        //   Fetching user 1...
        //   Fetching orders for User{id=1, name='Alice'}...
        // Result: Orders[{id=101, item='Laptop'}, {id=102, item='Mouse'}]

        // WHY NOT thenApply? It would create nested futures:
        // CompletableFuture> nested =
        //     fetchUser(1).thenApply(user -> fetchOrders(user)); // WRONG! Nested future

        // === thenCombine: Parallel merge ===
        // Fetch user data AND credit score simultaneously, then combine
        System.out.println("\n=== thenCombine (parallel merge) ===");

        CompletableFuture userFuture = fetchUser(1);
        CompletableFuture creditFuture = fetchCreditScore(1);

        CompletableFuture combined = userFuture.thenCombine(
            creditFuture,
            (user, creditScore) -> user + " | Credit Score: " + creditScore
        );

        System.out.println("Combined: " + combined.join());
        // Both ran in parallel! Total time is ~300ms, not 200+300=500ms
        // Output: Combined: User{id=1, name='Alice'} | Credit Score: 750.0

        // === Complex chain: compose + combine ===
        System.out.println("\n=== Complex Pipeline ===");

        CompletableFuture fullReport = fetchUser(1)
            .thenCompose(user -> {
                // After getting user, fetch orders AND credit score in parallel
                CompletableFuture orders = fetchOrders(user);
                CompletableFuture credit = fetchCreditScore(1);

                // Combine both results
                return orders.thenCombine(credit, (o, c) ->
                    "Report: " + user + "\n  " + o + "\n  Credit: " + c);
            });

        System.out.println(fullReport.join());
        // Output:
        // Report: User{id=1, name='Alice'}
        //   Orders[{id=101, item='Laptop'}, {id=102, item='Mouse'}]
        //   Credit: 750.0
    }
}

5. Combining Multiple Futures

When you need to run many async operations in parallel and wait for all (or any) of them to complete, use allOf() and anyOf().

allOf() — Wait for All

CompletableFuture.allOf() takes a varargs of futures and returns a CompletableFuture<Void> that completes when all input futures complete. It does not directly give you the results — you have to extract them from the individual futures.

anyOf() — First to Finish

CompletableFuture.anyOf() returns a CompletableFuture<Object> that completes as soon as any one of the input futures completes. Useful for racing multiple data sources.

import java.util.*;
import java.util.concurrent.*;
import java.util.stream.*;

public class CombiningFutures {
    public static void main(String[] args) {

        // === allOf: Parallel fetch multiple resources ===
        System.out.println("=== allOf: Wait for ALL ===");

        List urls = List.of(
            "https://api.example.com/users",
            "https://api.example.com/orders",
            "https://api.example.com/products",
            "https://api.example.com/reviews"
        );

        // Create a future for each URL
        List> futures = urls.stream()
            .map(url -> CompletableFuture.supplyAsync(() -> {
                // Simulate API call with varying delays
                int delay = 100 + new Random().nextInt(400);
                try { Thread.sleep(delay); } catch (InterruptedException e) { throw new RuntimeException(e); }
                return "Response from " + url.substring(url.lastIndexOf('/') + 1)
                    + " (" + delay + "ms)";
            }))
            .collect(Collectors.toList());

        // Wait for ALL to complete
        long start = System.currentTimeMillis();
        CompletableFuture allDone = CompletableFuture.allOf(
            futures.toArray(new CompletableFuture[0])
        );

        // Collect all results
        CompletableFuture> allResults = allDone.thenApply(v ->
            futures.stream()
                .map(CompletableFuture::join) // Safe -- all are already complete
                .collect(Collectors.toList())
        );

        List results = allResults.join();
        long elapsed = System.currentTimeMillis() - start;

        results.forEach(r -> System.out.println("  " + r));
        System.out.println("Total time: " + elapsed + "ms (parallel, not sequential!)");
        // All 4 API calls ran simultaneously
        // Total time is ~max(individual times), not sum

        // === anyOf: Race multiple sources ===
        System.out.println("\n=== anyOf: First to finish ===");

        CompletableFuture primary = CompletableFuture.supplyAsync(() -> {
            try { Thread.sleep(300); } catch (InterruptedException e) { throw new RuntimeException(e); }
            return "Primary DB: user data";
        });

        CompletableFuture replica = CompletableFuture.supplyAsync(() -> {
            try { Thread.sleep(100); } catch (InterruptedException e) { throw new RuntimeException(e); }
            return "Replica DB: user data";
        });

        CompletableFuture cache = CompletableFuture.supplyAsync(() -> {
            try { Thread.sleep(50); } catch (InterruptedException e) { throw new RuntimeException(e); }
            return "Cache: user data";
        });

        CompletableFuture fastest = CompletableFuture.anyOf(primary, replica, cache);
        System.out.println("Fastest response: " + fastest.join());
        // Output: Fastest response: Cache: user data
        // Cache was fastest at 50ms

        // === Practical: Collecting typed results ===
        System.out.println("\n=== Collecting Results with Type Safety ===");

        CompletableFuture usersCount = CompletableFuture.supplyAsync(() -> 1500);
        CompletableFuture ordersCount = CompletableFuture.supplyAsync(() -> 3200);
        CompletableFuture productsCount = CompletableFuture.supplyAsync(() -> 850);

        CompletableFuture dashboard = usersCount
            .thenCombine(ordersCount, (users, orders) -> "Users: " + users + ", Orders: " + orders)
            .thenCombine(productsCount, (partial, products) -> partial + ", Products: " + products);

        System.out.println("Dashboard: " + dashboard.join());
        // Output: Dashboard: Users: 1500, Orders: 3200, Products: 850
    }
}

6. Exception Handling

Exceptions in async pipelines are tricky because they happen on background threads. CompletableFuture provides three methods for handling errors, each with a different purpose:

Method Receives Returns Purpose
exceptionally(Function) Exception only Recovery value Provide a fallback when error occurs
handle(BiFunction) Result AND exception New result Handle both success and failure in one place
whenComplete(BiConsumer) Result AND exception Same result (no transform) Logging, cleanup, side effects

Key rule: If an exception occurs in a CompletableFuture and is not handled, it propagates down the chain. Every subsequent thenApply(), thenAccept(), etc. is skipped until an exception handler is encountered.

import java.util.concurrent.CompletableFuture;

public class ExceptionHandling {
    public static void main(String[] args) {

        // === exceptionally: Provide a fallback ===
        System.out.println("=== exceptionally ===");

        CompletableFuture withFallback = CompletableFuture
            .supplyAsync(() -> {
                if (true) throw new RuntimeException("Database connection failed!");
                return "data";
            })
            .exceptionally(ex -> {
                System.out.println("Error: " + ex.getMessage());
                return "default data"; // Recovery value
            });

        System.out.println("Result: " + withFallback.join());
        // Output:
        // Error: Database connection failed!
        // Result: default data

        // === handle: Both success and failure ===
        System.out.println("\n=== handle (success case) ===");

        CompletableFuture handled = CompletableFuture
            .supplyAsync(() -> "real data")
            .handle((result, ex) -> {
                if (ex != null) {
                    System.out.println("Failed: " + ex.getMessage());
                    return "fallback";
                }
                System.out.println("Success: " + result);
                return result.toUpperCase();
            });
        System.out.println("Result: " + handled.join());
        // Output:
        // Success: real data
        // Result: REAL DATA

        System.out.println("\n=== handle (failure case) ===");

        CompletableFuture handleError = CompletableFuture
            .supplyAsync(() -> {
                throw new RuntimeException("Network timeout");
            })
            .handle((result, ex) -> {
                if (ex != null) {
                    System.out.println("Failed: " + ex.getMessage());
                    return "cached result";
                }
                return result;
            });
        System.out.println("Result: " + handleError.join());
        // Output:
        // Failed: Network timeout
        // Result: cached result

        // === whenComplete: Logging/cleanup (does NOT change result) ===
        System.out.println("\n=== whenComplete ===");

        CompletableFuture logged = CompletableFuture
            .supplyAsync(() -> "important data")
            .whenComplete((result, ex) -> {
                if (ex != null) {
                    System.out.println("LOG: Operation failed - " + ex.getMessage());
                } else {
                    System.out.println("LOG: Operation succeeded - " + result);
                }
                // NOTE: Cannot change the result here!
            });
        System.out.println("Result: " + logged.join());
        // Output:
        // LOG: Operation succeeded - important data
        // Result: important data

        // === Exception propagation ===
        System.out.println("\n=== Exception Propagation ===");

        CompletableFuture chain = CompletableFuture
            .supplyAsync(() -> "data")
            .thenApply(s -> {
                throw new RuntimeException("Step 2 failed");
            })
            .thenApply(s -> {
                System.out.println("This is SKIPPED");
                return "never reached";
            })
            .thenApply(s -> {
                System.out.println("This is ALSO SKIPPED");
                return "never reached";
            })
            .exceptionally(ex -> {
                System.out.println("Caught at the end: " + ex.getMessage());
                return "recovered";
            });

        System.out.println("Result: " + chain.join());
        // Output:
        // Caught at the end: java.lang.RuntimeException: Step 2 failed
        // Result: recovered
    }
}

7. Async Variants

Every callback method in CompletableFuture has three variants:

Variant Thread Used When to Use
thenApply(fn) Same thread that completed the previous stage (or the caller thread if already complete) Fast, CPU-light transformations
thenApplyAsync(fn) ForkJoinPool.commonPool() CPU-intensive transformations
thenApplyAsync(fn, executor) Your custom executor I/O-bound operations (DB, HTTP)

The same pattern applies to thenAccept/thenAcceptAsync, thenRun/thenRunAsync, thenCompose/thenComposeAsync, thenCombine/thenCombineAsync, etc.

Rule of thumb:

  • Use the non-async version for quick transformations (string manipulation, mapping, filtering).
  • Use Async with a custom executor for anything that involves I/O or takes more than a few milliseconds.
  • Never use the common pool (default Async) for blocking I/O — it has limited threads and is shared across your entire application.
import java.util.concurrent.*;

public class AsyncVariants {
    public static void main(String[] args) {
        // Custom executor for I/O work
        ExecutorService ioPool = Executors.newFixedThreadPool(4, r -> {
            Thread t = new Thread(r);
            t.setDaemon(true);
            t.setName("io-" + t.getId());
            return t;
        });

        // thenApply -- runs on completing thread (fast)
        CompletableFuture sync = CompletableFuture
            .supplyAsync(() -> {
                System.out.println("Produce on: " + Thread.currentThread().getName());
                return "data";
            })
            .thenApply(s -> {
                System.out.println("thenApply on: " + Thread.currentThread().getName());
                return s.toUpperCase(); // Quick transformation, same thread
            });
        sync.join();
        // Both likely run on: ForkJoinPool.commonPool-worker-1

        System.out.println();

        // thenApplyAsync -- runs on common pool (different thread)
        CompletableFuture async = CompletableFuture
            .supplyAsync(() -> {
                System.out.println("Produce on: " + Thread.currentThread().getName());
                return "data";
            })
            .thenApplyAsync(s -> {
                System.out.println("thenApplyAsync on: " + Thread.currentThread().getName());
                return s.toUpperCase(); // May run on a different thread
            });
        async.join();

        System.out.println();

        // thenApplyAsync with executor -- runs on YOUR thread pool
        CompletableFuture customAsync = CompletableFuture
            .supplyAsync(() -> {
                System.out.println("Produce on: " + Thread.currentThread().getName());
                return "data";
            })
            .thenApplyAsync(s -> {
                System.out.println("Custom async on: " + Thread.currentThread().getName());
                // Simulate I/O (database call)
                try { Thread.sleep(100); } catch (InterruptedException e) { throw new RuntimeException(e); }
                return "DB result for: " + s;
            }, ioPool);

        System.out.println("Result: " + customAsync.join());
        // Output: Custom async on: io-25
        // Result: DB result for: data

        ioPool.shutdown();
    }
}

8. Real-World Patterns

These patterns solve common problems you will encounter in production code.

8.1 Parallel API Calls

When you need data from multiple independent sources, fetch them all in parallel instead of sequentially. This can dramatically reduce response times.

import java.util.concurrent.*;
import java.util.*;
import java.util.stream.*;

public class ParallelApiCalls {

    static ExecutorService httpPool = Executors.newFixedThreadPool(10);

    // Simulated API calls
    static CompletableFuture> fetchUserProfile(int userId) {
        return CompletableFuture.supplyAsync(() -> {
            sleep(200);
            return Map.of("id", userId, "name", "Alice", "email", "alice@example.com");
        }, httpPool);
    }

    static CompletableFuture> fetchUserOrders(int userId) {
        return CompletableFuture.supplyAsync(() -> {
            sleep(300);
            return List.of("Order-101", "Order-102", "Order-103");
        }, httpPool);
    }

    static CompletableFuture> fetchUserPreferences(int userId) {
        return CompletableFuture.supplyAsync(() -> {
            sleep(150);
            return Map.of("theme", "dark", "language", "en", "notifications", true);
        }, httpPool);
    }

    public static void main(String[] args) {
        int userId = 1;
        long start = System.currentTimeMillis();

        // Kick off ALL requests simultaneously
        CompletableFuture> profileFuture = fetchUserProfile(userId);
        CompletableFuture> ordersFuture = fetchUserOrders(userId);
        CompletableFuture> prefsFuture = fetchUserPreferences(userId);

        // Wait for all and combine
        CompletableFuture> dashboard = CompletableFuture
            .allOf(profileFuture, ordersFuture, prefsFuture)
            .thenApply(v -> {
                Map result = new HashMap<>();
                result.put("profile", profileFuture.join());
                result.put("orders", ordersFuture.join());
                result.put("preferences", prefsFuture.join());
                return result;
            });

        Map result = dashboard.join();
        long elapsed = System.currentTimeMillis() - start;

        System.out.println("Dashboard data:");
        result.forEach((key, value) -> System.out.println("  " + key + ": " + value));
        System.out.println("Total time: " + elapsed + "ms");
        // Total time: ~300ms (max of 200, 300, 150) instead of 650ms (sequential)

        httpPool.shutdown();
    }

    static void sleep(long ms) {
        try { Thread.sleep(ms); } catch (InterruptedException e) { throw new RuntimeException(e); }
    }
}

8.2 Timeout Handling

In production, you cannot wait forever for an async operation to complete. You need timeouts.

import java.util.concurrent.*;

public class TimeoutHandling {
    public static void main(String[] args) {

        // === Java 9+: orTimeout() and completeOnTimeout() ===

        // orTimeout: Fails with TimeoutException if not complete in time
        CompletableFuture withTimeout = CompletableFuture
            .supplyAsync(() -> {
                try { Thread.sleep(5000); } catch (InterruptedException e) { throw new RuntimeException(e); }
                return "slow result";
            })
            .orTimeout(1, TimeUnit.SECONDS)
            .exceptionally(ex -> {
                System.out.println("Timed out: " + ex.getMessage());
                return "timeout fallback";
            });

        System.out.println("orTimeout result: " + withTimeout.join());
        // Output:
        // Timed out: java.util.concurrent.TimeoutException
        // orTimeout result: timeout fallback

        // completeOnTimeout: Provides a default value on timeout (no exception)
        CompletableFuture withDefault = CompletableFuture
            .supplyAsync(() -> {
                try { Thread.sleep(5000); } catch (InterruptedException e) { throw new RuntimeException(e); }
                return "slow result";
            })
            .completeOnTimeout("default value", 1, TimeUnit.SECONDS);

        System.out.println("completeOnTimeout: " + withDefault.join());
        // Output: completeOnTimeout: default value

        // === Java 8 compatible: Manual timeout ===
        System.out.println("\n=== Java 8 Timeout Pattern ===");
        CompletableFuture java8Timeout = timeoutAfter(
            CompletableFuture.supplyAsync(() -> {
                try { Thread.sleep(5000); } catch (InterruptedException e) { throw new RuntimeException(e); }
                return "slow";
            }),
            1, TimeUnit.SECONDS,
            "java8 fallback"
        );
        System.out.println("Java 8 timeout: " + java8Timeout.join());
        // Output: Java 8 timeout: java8 fallback
    }

    // Java 8 compatible timeout helper
    static  CompletableFuture timeoutAfter(
            CompletableFuture future, long timeout, TimeUnit unit, T fallback) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CompletableFuture timeoutFuture = new CompletableFuture<>();

        scheduler.schedule(() -> timeoutFuture.complete(fallback), timeout, unit);

        return CompletableFuture.anyOf(future, timeoutFuture)
            .thenApply(result -> {
                scheduler.shutdown();
                @SuppressWarnings("unchecked")
                T typed = (T) result;
                return typed;
            });
    }
}

8.3 Retry Pattern

For transient failures (network blips, temporary unavailability), retrying the operation automatically is a common pattern.

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class RetryPattern {

    static AtomicInteger callCount = new AtomicInteger(0);

    // Simulated flaky service: fails first 2 times, succeeds on 3rd
    static CompletableFuture flakyService() {
        return CompletableFuture.supplyAsync(() -> {
            int attempt = callCount.incrementAndGet();
            System.out.println("  Attempt " + attempt + " on " + Thread.currentThread().getName());
            if (attempt <= 2) {
                throw new RuntimeException("Service unavailable (attempt " + attempt + ")");
            }
            return "Success on attempt " + attempt;
        });
    }

    // Retry helper
    static  CompletableFuture retry(
            java.util.function.Supplier> supplier,
            int maxRetries,
            long delayMs) {

        CompletableFuture future = supplier.get();

        for (int i = 0; i < maxRetries; i++) {
            final int attempt = i + 1;
            future = future.handle((result, ex) -> {
                if (ex == null) {
                    return CompletableFuture.completedFuture(result);
                }
                System.out.println("  Retry " + attempt + ": " + ex.getMessage());
                try { Thread.sleep(delayMs); } catch (InterruptedException e) { throw new RuntimeException(e); }
                return supplier.get();
            }).thenCompose(f -> f);
        }
        return future;
    }

    public static void main(String[] args) {
        System.out.println("=== Retry Pattern ===");

        CompletableFuture result = retry(
            RetryPattern::flakyService,
            3,        // max retries
            500       // delay between retries in ms
        );

        System.out.println("Final result: " + result.join());

        // Output:
        //   Attempt 1 on ForkJoinPool.commonPool-worker-1
        //   Retry 1: java.lang.RuntimeException: Service unavailable (attempt 1)
        //   Attempt 2 on ForkJoinPool.commonPool-worker-2
        //   Retry 2: java.lang.RuntimeException: Service unavailable (attempt 2)
        //   Attempt 3 on ForkJoinPool.commonPool-worker-1
        // Final result: Success on attempt 3
    }
}

9. Best Practices

# Practice Do Do Not
1 Always handle exceptions Use exceptionally() or handle() on every chain Let exceptions silently disappear
2 Use custom executors for I/O supplyAsync(fn, ioPool) Use common pool for DB/HTTP calls
3 Prefer thenCompose over thenApply thenCompose() when calling another async method thenApply() creating nested futures
4 Use join() over get() future.join() — throws unchecked exception future.get() — forces try/catch for checked exception
5 Add timeouts orTimeout() or completeOnTimeout() Wait forever for a result
6 Name your threads Custom ThreadFactory with meaningful names Debug with “pool-3-thread-7” names
7 Use allOf for parallel batch operations allOf(futures) then collect results Sequential loop calling join() on each
import java.util.concurrent.*;

public class BestPracticesDemo {
    public static void main(String[] args) {
        // GOOD: Custom executor for I/O work
        ExecutorService ioPool = Executors.newFixedThreadPool(20, r -> {
            Thread t = new Thread(r);
            t.setDaemon(true);
            t.setName("io-worker-" + t.getId());
            return t;
        });

        // GOOD: Always handle exceptions
        CompletableFuture safe = CompletableFuture
            .supplyAsync(() -> fetchFromDatabase("user-123"), ioPool)
            .thenApply(data -> process(data))
            .exceptionally(ex -> {
                System.err.println("Pipeline failed: " + ex.getMessage());
                return "fallback";
            });
        System.out.println("Safe result: " + safe.join());

        // GOOD: Use join() instead of get()
        String result = safe.join(); // Throws CompletionException (unchecked)
        // NOT: safe.get(); // Throws ExecutionException (checked) -- forces try/catch

        // GOOD: Use thenCompose for async chains
        CompletableFuture composed = CompletableFuture
            .supplyAsync(() -> "user-123", ioPool)
            .thenCompose(id -> fetchAsync(id))     // Returns CompletableFuture
            .thenCompose(user -> enrichAsync(user)) // Chain of async calls
            .exceptionally(ex -> "error: " + ex.getMessage());
        System.out.println("Composed: " + composed.join());

        // BAD: Sequential blocking -- defeats the purpose!
        // for (CompletableFuture f : futures) {
        //     results.add(f.join()); // BLOCKS on each one sequentially!
        // }

        // GOOD: Parallel with allOf
        // CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
        //     .thenApply(v -> futures.stream()
        //         .map(CompletableFuture::join)
        //         .collect(Collectors.toList()));

        ioPool.shutdown();
    }

    static String fetchFromDatabase(String id) {
        return "data for " + id;
    }

    static String process(String data) {
        return "processed: " + data;
    }

    static CompletableFuture fetchAsync(String id) {
        return CompletableFuture.completedFuture("User{" + id + "}");
    }

    static CompletableFuture enrichAsync(String user) {
        return CompletableFuture.completedFuture("Enriched " + user);
    }
}

10. Complete Practical Example — Parallel Order Processing System

This example demonstrates a realistic e-commerce order processing pipeline that uses CompletableFuture to parallelize inventory checks, payment processing, and notification delivery.

import java.util.*;
import java.util.concurrent.*;
import java.util.stream.*;

public class OrderProcessingSystem {

    // Thread pool for I/O operations
    static final ExecutorService IO_POOL = Executors.newFixedThreadPool(10, r -> {
        Thread t = new Thread(r);
        t.setDaemon(true);
        t.setName("io-worker-" + t.getId());
        return t;
    });

    // --- Domain classes ---
    record Order(String orderId, String customerId, List items) {}
    record OrderItem(String productId, String name, double price, int quantity) {}
    record InventoryResult(String productId, boolean available, int stock) {}
    record PaymentResult(String transactionId, boolean success, String message) {}
    record ShippingResult(String trackingNumber, String estimatedDelivery) {}

    // --- Service simulations ---

    static CompletableFuture checkInventory(OrderItem item) {
        return CompletableFuture.supplyAsync(() -> {
            sleep(100 + new Random().nextInt(200));
            boolean available = new Random().nextInt(10) > 1; // 90% available
            int stock = available ? 10 + new Random().nextInt(90) : 0;
            System.out.printf("  [Inventory] %s: %s (stock: %d)%n",
                item.name(), available ? "AVAILABLE" : "OUT OF STOCK", stock);
            return new InventoryResult(item.productId(), available, stock);
        }, IO_POOL);
    }

    static CompletableFuture processPayment(String customerId, double total) {
        return CompletableFuture.supplyAsync(() -> {
            sleep(300);
            String txnId = "TXN-" + UUID.randomUUID().toString().substring(0, 8).toUpperCase();
            boolean success = new Random().nextInt(10) > 0; // 90% success
            String message = success ? "Payment approved" : "Insufficient funds";
            System.out.printf("  [Payment] Customer %s, $%.2f: %s (%s)%n",
                customerId, total, message, txnId);
            if (!success) {
                throw new RuntimeException("Payment failed: " + message);
            }
            return new PaymentResult(txnId, true, message);
        }, IO_POOL);
    }

    static CompletableFuture createShipment(Order order) {
        return CompletableFuture.supplyAsync(() -> {
            sleep(200);
            String tracking = "SHIP-" + UUID.randomUUID().toString().substring(0, 8).toUpperCase();
            System.out.printf("  [Shipping] Order %s: tracking %s%n", order.orderId(), tracking);
            return new ShippingResult(tracking, "3-5 business days");
        }, IO_POOL);
    }

    static CompletableFuture sendEmail(String to, String subject) {
        return CompletableFuture.runAsync(() -> {
            sleep(100);
            System.out.printf("  [Email] To: %s | Subject: %s%n", to, subject);
        }, IO_POOL);
    }

    static CompletableFuture sendSms(String to, String message) {
        return CompletableFuture.runAsync(() -> {
            sleep(50);
            System.out.printf("  [SMS] To: %s | Message: %s%n", to, message);
        }, IO_POOL);
    }

    // --- Main processing pipeline ---

    static CompletableFuture processOrder(Order order) {
        System.out.println("\nProcessing order: " + order.orderId());
        long startTime = System.currentTimeMillis();

        // Step 1: Check inventory for ALL items in PARALLEL
        System.out.println("Step 1: Checking inventory...");
        List> inventoryChecks = order.items().stream()
            .map(OrderProcessingSystem::checkInventory)
            .collect(Collectors.toList());

        CompletableFuture> allInventory = CompletableFuture
            .allOf(inventoryChecks.toArray(new CompletableFuture[0]))
            .thenApply(v -> inventoryChecks.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList()));

        // Step 2: Verify all items available, then process payment
        return allInventory
            .thenCompose(inventoryResults -> {
                // Check if any item is out of stock
                List outOfStock = inventoryResults.stream()
                    .filter(r -> !r.available())
                    .map(InventoryResult::productId)
                    .collect(Collectors.toList());

                if (!outOfStock.isEmpty()) {
                    CompletableFuture failed = new CompletableFuture<>();
                    failed.completeExceptionally(
                        new RuntimeException("Items out of stock: " + outOfStock));
                    return failed;
                }

                // Calculate total
                double total = order.items().stream()
                    .mapToDouble(item -> item.price() * item.quantity())
                    .sum();

                System.out.println("Step 2: Processing payment ($" + String.format("%.2f", total) + ")...");
                return processPayment(order.customerId(), total);
            })
            // Step 3: Create shipment
            .thenCompose(payment -> {
                System.out.println("Step 3: Creating shipment...");
                return createShipment(order)
                    .thenApply(shipping -> Map.of(
                        "payment", payment,
                        "shipping", shipping
                    ));
            })
            // Step 4: Send notifications in PARALLEL
            .thenCompose(results -> {
                System.out.println("Step 4: Sending notifications...");
                PaymentResult payment = (PaymentResult) results.get("payment");
                ShippingResult shipping = (ShippingResult) results.get("shipping");

                CompletableFuture email = sendEmail(
                    order.customerId() + "@example.com",
                    "Order " + order.orderId() + " confirmed! Tracking: " + shipping.trackingNumber()
                );
                CompletableFuture sms = sendSms(
                    order.customerId(),
                    "Order confirmed! Track: " + shipping.trackingNumber()
                );

                return CompletableFuture.allOf(email, sms)
                    .thenApply(v -> String.format(
                        "Order %s COMPLETE | Payment: %s | Tracking: %s | Delivery: %s",
                        order.orderId(), payment.transactionId(),
                        shipping.trackingNumber(), shipping.estimatedDelivery()
                    ));
            })
            // Handle errors
            .exceptionally(ex -> {
                String error = "Order " + order.orderId() + " FAILED: " + ex.getMessage();
                System.out.println("  [ERROR] " + error);
                // Send failure notification
                sendEmail(order.customerId() + "@example.com",
                    "Order " + order.orderId() + " could not be processed").join();
                return error;
            })
            .whenComplete((result, ex) -> {
                long elapsed = System.currentTimeMillis() - startTime;
                System.out.println("  [Timer] Order " + order.orderId() + " took " + elapsed + "ms");
            });
    }

    public static void main(String[] args) {
        System.out.println("====================================");
        System.out.println("  Order Processing System");
        System.out.println("====================================");

        // Create sample orders
        Order order1 = new Order("ORD-001", "CUST-100", List.of(
            new OrderItem("PROD-1", "Laptop", 999.99, 1),
            new OrderItem("PROD-2", "Mouse", 29.99, 2),
            new OrderItem("PROD-3", "Keyboard", 79.99, 1)
        ));

        Order order2 = new Order("ORD-002", "CUST-200", List.of(
            new OrderItem("PROD-4", "Monitor", 399.99, 1),
            new OrderItem("PROD-5", "Headset", 149.99, 1)
        ));

        Order order3 = new Order("ORD-003", "CUST-300", List.of(
            new OrderItem("PROD-6", "Webcam", 89.99, 1)
        ));

        // Process ALL orders in PARALLEL
        List orders = List.of(order1, order2, order3);
        long totalStart = System.currentTimeMillis();

        List> orderFutures = orders.stream()
            .map(OrderProcessingSystem::processOrder)
            .collect(Collectors.toList());

        // Wait for all orders to complete
        CompletableFuture.allOf(orderFutures.toArray(new CompletableFuture[0])).join();

        // Print final results
        System.out.println("\n====================================");
        System.out.println("  FINAL RESULTS");
        System.out.println("====================================");

        for (CompletableFuture future : orderFutures) {
            System.out.println("  " + future.join());
        }

        long totalElapsed = System.currentTimeMillis() - totalStart;
        System.out.println("\nTotal processing time: " + totalElapsed + "ms");
        System.out.println("(All " + orders.size() + " orders processed in parallel)");

        // Summary of concepts used
        System.out.println("\n====================================");
        System.out.println("  Concepts Demonstrated");
        System.out.println("====================================");
        System.out.println("  1.  supplyAsync()     - Async computation with result");
        System.out.println("  2.  runAsync()        - Fire-and-forget async tasks");
        System.out.println("  3.  Custom executor   - Dedicated I/O thread pool");
        System.out.println("  4.  thenApply()       - Transform results");
        System.out.println("  5.  thenCompose()     - Chain dependent async operations");
        System.out.println("  6.  thenCombine()     - (shown in earlier examples)");
        System.out.println("  7.  allOf()           - Wait for all parallel tasks");
        System.out.println("  8.  exceptionally()   - Error recovery with fallback");
        System.out.println("  9.  whenComplete()    - Logging/timing side effects");
        System.out.println("  10. completeExceptionally() - Manual failure");
        System.out.println("  11. completedFuture() - (shown in earlier examples)");
        System.out.println("  12. Parallel streams  - Collecting future results");

        IO_POOL.shutdown();
    }

    static void sleep(long ms) {
        try { Thread.sleep(ms); } catch (InterruptedException e) { throw new RuntimeException(e); }
    }
}

// Sample Output:
// ====================================
//   Order Processing System
// ====================================
//
// Processing order: ORD-001
// Step 1: Checking inventory...
// Processing order: ORD-002
// Step 1: Checking inventory...
// Processing order: ORD-003
// Step 1: Checking inventory...
//   [Inventory] Webcam: AVAILABLE (stock: 45)
//   [Inventory] Mouse: AVAILABLE (stock: 72)
//   [Inventory] Keyboard: AVAILABLE (stock: 38)
//   [Inventory] Headset: AVAILABLE (stock: 91)
//   [Inventory] Monitor: AVAILABLE (stock: 23)
//   [Inventory] Laptop: AVAILABLE (stock: 56)
// Step 2: Processing payment ($89.99)...
// Step 2: Processing payment ($549.98)...
// Step 2: Processing payment ($1139.96)...
//   [Payment] Customer CUST-300, $89.99: Payment approved (TXN-A1B2C3D4)
// Step 3: Creating shipment...
//   [Payment] Customer CUST-200, $549.98: Payment approved (TXN-E5F6G7H8)
// Step 3: Creating shipment...
//   [Payment] Customer CUST-100, $1139.96: Payment approved (TXN-I9J0K1L2)
// Step 3: Creating shipment...
//   [Shipping] Order ORD-003: tracking SHIP-M3N4O5P6
// Step 4: Sending notifications...
//   [Shipping] Order ORD-002: tracking SHIP-Q7R8S9T0
// Step 4: Sending notifications...
//   [SMS] To: CUST-300 | Message: Order confirmed! Track: SHIP-M3N4O5P6
//   [Email] To: CUST-300@example.com | Subject: Order ORD-003 confirmed!
//   [Shipping] Order ORD-001: tracking SHIP-U1V2W3X4
// Step 4: Sending notifications...
//   [Timer] Order ORD-003 took 652ms
//   [SMS] To: CUST-200 | Message: Order confirmed! Track: SHIP-Q7R8S9T0
//   [Email] To: CUST-200@example.com | Subject: Order ORD-002 confirmed!
//   [Timer] Order ORD-002 took 785ms
//   [SMS] To: CUST-100 | Message: Order confirmed! Track: SHIP-U1V2W3X4
//   [Email] To: CUST-100@example.com | Subject: Order ORD-001 confirmed!
//   [Timer] Order ORD-001 took 892ms
//
// ====================================
//   FINAL RESULTS
// ====================================
//   Order ORD-001 COMPLETE | Payment: TXN-I9J0K1L2 | Tracking: SHIP-U1V2W3X4
//   Order ORD-002 COMPLETE | Payment: TXN-E5F6G7H8 | Tracking: SHIP-Q7R8S9T0
//   Order ORD-003 COMPLETE | Payment: TXN-A1B2C3D4 | Tracking: SHIP-M3N4O5P6
//
// Total processing time: 895ms
// (All 3 orders processed in parallel)

Quick Reference

Category Method Description
Creating supplyAsync(supplier) Run async, return result
runAsync(runnable) Run async, no result
completedFuture(value) Already-complete future
new CompletableFuture() Manually completable
Transforming thenApply(fn) Transform result (like map)
thenAccept(consumer) Consume result (no return)
thenRun(runnable) Run after completion (ignores result)
Composing thenCompose(fn) Chain async operations (flatMap)
thenCombine(other, fn) Merge two parallel results
Combining allOf(futures...) Wait for all to complete
anyOf(futures...) First to complete wins
Errors exceptionally(fn) Provide fallback on error
handle(bifn) Handle both success and error
whenComplete(biconsumer) Inspect result/error (no transform)
Getting Result join() Block and get result (unchecked exception)
get(timeout, unit) Block with timeout (checked exception)
Timeout (9+) orTimeout(time, unit) Fail with TimeoutException
completeOnTimeout(val, time, unit) Use default value on timeout



Subscribe To Our Newsletter
You will receive our latest post and tutorial.
Thank you for subscribing!

required
required


Leave a Reply

Your email address will not be published. Required fields are marked *