
MySQL Binlog

 

September 28, 2019

Elasticsearch Modeling Data

Elasticsearch, like most NoSQL databases, treats the world as though it were flat. An index is a flat collection of independent documents. A single document should contain all of the information that is required to decide whether it matches a search request.

Denormalizing your Data

The way to get the best search performance out of Elasticsearch is to use it as it is intended, by denormalizing your data at index time. Having redundant copies of data in each document that requires access to it removes the need for joins.

If we want to be able to find a blog post by the name of the user who wrote it, include the user’s name in the blog-post document itself.

PUT /users/blogpost/2
{
   "title": "Today Spirit", 
   "body": "Let's go!", 
   "user": {
     "id": 1,
     "name": "Folau Kaveinga" 
    }
}

Of course, data denormalization has downsides too. The first disadvantage is that the index will be bigger because the _source document for every blog post is bigger, and there are more indexed fields. This usually isn’t a huge problem. The data written to disk is highly compressed, and disk space is cheap. Elasticsearch can happily cope with the extra data.

The more important issue is that, if the user were to change his name, all of his blog posts would need to be updated too. Fortunately, users don’t often change names. Even if they did, it is unlikely that a user would have written more than a few thousand blog posts, so updating blog posts with the scroll and bulk APIs would take less than a second.

Nested Objects

Given the fact that creating, deleting, and updating a single document in Elasticsearch is atomic, it makes sense to store closely related entities within the same document. For instance, we could store an order and all of its order lines in one document, or we could store a blog post and all of its comments together, by passing an array of comments.

Each nested object is indexed as a separate hidden document, so we can’t query it directly. Instead, we have to use the nested query or nested filter to access it.

By indexing each nested object separately, the fields within the object maintain their relationships. We can run queries that will match only if the match occurs within the same nested object.
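To make the point about preserved relationships concrete, here is a minimal plain-Java sketch. It does not use Elasticsearch at all; it only mimics the difference between an array of inner objects that has been flattened into per-field value lists and objects that are checked one at a time. The `Comment` class and both method names are hypothetical.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class NestedVsFlattened {

    // Hypothetical comment entity: one inner object of a blog post.
    static final class Comment {
        final String name;
        final int age;
        Comment(String name, int age) { this.name = name; this.age = age; }
    }

    // Flattened view: each field becomes a bag of values, so the link
    // between a name and an age inside one comment is lost.
    static boolean matchesFlattened(List<Comment> comments, String name, int age) {
        Set<String> names = new HashSet<>();
        Set<Integer> ages = new HashSet<>();
        for (Comment c : comments) {
            names.add(c.name);
            ages.add(c.age);
        }
        return names.contains(name) && ages.contains(age);
    }

    // Nested view: each comment is checked on its own, so both conditions
    // must hold within the same object.
    static boolean matchesNested(List<Comment> comments, String name, int age) {
        for (Comment c : comments) {
            if (c.name.equals(name) && c.age == age) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        List<Comment> comments = Arrays.asList(
            new Comment("Alice", 28),
            new Comment("John", 31));

        // No single comment was written by a 31-year-old Alice.
        System.out.println("flattened: " + matchesFlattened(comments, "Alice", 31)); // true (false positive)
        System.out.println("nested:    " + matchesNested(comments, "Alice", 31));    // false
    }
}
```

The flattened check reports a match that no individual comment supports, which is the kind of cross-object match that nested indexing is meant to rule out.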

Not only that, because of the way that nested objects are indexed, joining the nested documents to the root document at query time is fast—almost as fast as if they were a single document.

To update, add, or remove a nested object, we have to reindex the whole document. Note also that a search request returns the whole document, not the nested object alone.

When should you use nested objects?

Nested objects are useful when there is one main entity, like our user, with a limited number of closely related but less important entities, such as addresses. It is useful to be able to find addresses based on the content of the street or zipcode, and the nested query and filter provide for fast query-time joins.

Retiring Data

As time-based data ages, it becomes less relevant. It’s possible that we will want to see what happened last week, last month, or even last year, but for the most part, we’re interested in only the here and now. The nice thing about an index per time frame is that it enables us to easily delete old data: just delete the indices that are no longer relevant.
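As a rough illustration of the index-per-time-frame idea, the sketch below picks out the indices that have fallen outside a retention window. The daily naming scheme (`logs-yyyy.MM.dd`), the `logs-` prefix, and the 7-day window are assumptions made for the example, and no Elasticsearch client is involved.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class IndexRetention {

    // Assumed naming scheme: one index per day, e.g. "logs-2019.09.28".
    static final DateTimeFormatter DAY = DateTimeFormatter.ofPattern("yyyy.MM.dd");

    // Returns the index names whose day is strictly older than the retention window.
    static List<String> expiredIndices(List<String> indexNames, String prefix,
                                       LocalDate today, int retentionDays) {
        LocalDate cutoff = today.minusDays(retentionDays);
        List<String> expired = new ArrayList<>();
        for (String name : indexNames) {
            if (!name.startsWith(prefix)) {
                continue; // not part of this time series
            }
            LocalDate day = LocalDate.parse(name.substring(prefix.length()), DAY);
            if (day.isBefore(cutoff)) {
                expired.add(name);
            }
        }
        return expired;
    }

    public static void main(String[] args) {
        List<String> indices = Arrays.asList(
            "logs-2019.09.01", "logs-2019.09.25", "logs-2019.09.28");
        LocalDate today = LocalDate.of(2019, 9, 28);

        // With a 7-day window, only the index from September 1 is old enough to drop.
        System.out.println(expiredIndices(indices, "logs-", today, 7));
        // prints [logs-2019.09.01]
    }
}
```

Each returned name would then be removed with a single delete-index request, which is much cheaper than deleting individual documents from one large index.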

 

September 27, 2019

Elasticsearch Geo-Point

A geo-point is a single latitude/longitude point on the Earth’s surface. Geo-points can be used to calculate distance from a point, to determine whether a point falls within a bounding box, or in aggregations.

Geo-points cannot be automatically detected with dynamic mapping. Instead, geo_point fields should be mapped explicitly.

PUT /elasticsearch_learning
{
  "mappings": {
    "properties": {
      "addresses": {
        "type": "nested",
        "properties": {
          "location": {
            "type": "geo_point"
          }
        }
      }
    }
  }
}

With the location field defined as a geo_point, we can proceed to index documents containing latitude/longitude pairs, which can be formatted as strings, arrays, or objects.
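The three formats differ in the order of the coordinates, which is easy to get wrong: the string form is "lat,lon", the array form is [lon, lat], and the object form names each value. The small sketch below just builds the three JSON fragments for one point (the Lehi skate park coordinates used later in this post); the class and method names are hypothetical.

```java
public class GeoPointFormats {

    // String form: "lat,lon"
    static String asString(double lat, double lon) {
        return "\"" + lat + "," + lon + "\"";
    }

    // Array form uses the opposite order: [lon, lat]
    static String asArray(double lat, double lon) {
        return "[" + lon + "," + lat + "]";
    }

    // Object form names both values explicitly
    static String asObject(double lat, double lon) {
        return "{\"lat\":" + lat + ",\"lon\":" + lon + "}";
    }

    public static void main(String[] args) {
        double lat = 40.414897;
        double lon = -111.881186;
        System.out.println(asString(lat, lon)); // string form, latitude first
        System.out.println(asArray(lat, lon));  // array form, longitude first
        System.out.println(asObject(lat, lon)); // object form, order does not matter
    }
}
```

Any of the three fragments could be used as the value of the location field when indexing a document.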

Geo Distance Filter (geo_distance)

The geo_distance filter draws a circle around the specified location and finds all documents that have a geo-point within that circle.

Find all location fields within 1 mile of the specified point.

GET elasticsearch_learning/_search 
{
"query":{
  "nested" : {
    "query" : {
      "bool" : {
        "filter" : [
          {
            "geo_distance" : {
              "addresses.location" : [
                -111.881186,
                40.414897
              ],
              "distance" : 1609.344,
              "distance_type" : "arc",
              "validation_method" : "STRICT",
              "ignore_unmapped" : false,
              "boost" : 1.0
            }
          }
        ],
        "adjust_pure_negative" : true,
        "boost" : 1.0
      }
    },
    "path" : "addresses",
    "ignore_unmapped" : false,
    "score_mode" : "none",
    "boost" : 1.0
  }
}
}
/**
 * https://www.elastic.co/guide/en/elasticsearch/reference/7.x/query-dsl-nested-query.html<br>
 * https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-geo-distance-query.html
 */
@Test
void searchWithGeopoint() {

    int pageNumber = 0;
    int pageSize = 3;

    SearchRequest searchRequest = new SearchRequest(database);
    searchRequest.allowPartialSearchResults(true);
    searchRequest.indicesOptions(IndicesOptions.lenientExpandOpen());

    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    searchSourceBuilder.from(pageNumber * pageSize);
    searchSourceBuilder.size(pageSize);
    searchSourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS));
    /**
     * fetch every field except cards
     */
    searchSourceBuilder.fetchSource(new String[]{"*"}, new String[]{"cards"});
    // searchSourceBuilder.fetchSource(new FetchSourceContext(true, new String[]{"*"}, new String[]{"cards"}));
    /**
     * Query with bool
     */
    BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();

    /**
     * Lehi skate park: 40.414897, -111.881186<br>
     * get locations/addresses close to skate park(from a radius).<br>
     * The geo_distance filter can work with multiple locations / points per document. Once a single location /
     * point matches the filter, the document will be included in the filter.
     */
    boolQuery.filter(QueryBuilders.geoDistanceQuery("addresses.location").point(40.414897, -111.881186).distance(1, DistanceUnit.MILES));

    searchSourceBuilder.query(QueryBuilders.nestedQuery("addresses", boolQuery, ScoreMode.None));

    searchRequest.source(searchSourceBuilder);

    searchRequest.preference("nested-address");

    if (searchSourceBuilder.sorts() != null && searchSourceBuilder.sorts().size() > 0) {
        log.info("\n{\n\"query\":{}, \"sort\":{}\n}", searchSourceBuilder.query().toString(), searchSourceBuilder.sorts().toString());
    } else {
        log.info("\n{\n\"query\":{}\n}", searchSourceBuilder.query().toString());
    }

    try {
        SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);

        log.info("isTimedOut={}, totalShards={}, totalHits={}", searchResponse.isTimedOut(), searchResponse.getTotalShards(), searchResponse.getHits().getTotalHits().value);

        List<User> users = getResponseResult(searchResponse.getHits());

        log.info("results={}", ObjectUtils.toJson(users));

    } catch (IOException e) {
        log.warn("IOException, msg={}", e.getLocalizedMessage());
        e.printStackTrace();
    } catch (Exception e) {
        log.warn("Exception, msg={}", e.getLocalizedMessage());
        e.printStackTrace();
    }
}
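To give a feel for what the circle test involves, here is a minimal sketch of a great-circle distance check using the haversine formula. It is not how Elasticsearch implements geo_distance internally; the Earth radius constant, the class name, and the sample points are assumptions chosen for illustration.

```java
public class GeoDistanceSketch {

    // Mean Earth radius in meters (an assumed constant for this sketch).
    static final double EARTH_RADIUS_METERS = 6_371_008.8;

    // Great-circle distance between two lat/lon points, in meters (haversine formula).
    static double haversineMeters(double lat1, double lon1, double lat2, double lon2) {
        double dLat = Math.toRadians(lat2 - lat1);
        double dLon = Math.toRadians(lon2 - lon1);
        double a = Math.sin(dLat / 2) * Math.sin(dLat / 2)
                 + Math.cos(Math.toRadians(lat1)) * Math.cos(Math.toRadians(lat2))
                 * Math.sin(dLon / 2) * Math.sin(dLon / 2);
        return 2 * EARTH_RADIUS_METERS * Math.asin(Math.sqrt(a));
    }

    // True when the candidate point lies inside the circle around the center.
    static boolean withinRadius(double centerLat, double centerLon,
                                double lat, double lon, double radiusMeters) {
        return haversineMeters(centerLat, centerLon, lat, lon) <= radiusMeters;
    }

    public static void main(String[] args) {
        double centerLat = 40.414897;   // Lehi skate park, from the query above
        double centerLon = -111.881186;
        double oneMile = 1609.344;      // same radius as the query, in meters

        // About 0.005 degrees of latitude north: a few hundred meters away.
        System.out.println(withinRadius(centerLat, centerLon, 40.419897, centerLon, oneMile)); // true
        // About 0.05 degrees of latitude north: several kilometers away.
        System.out.println(withinRadius(centerLat, centerLon, 40.464897, centerLon, oneMile)); // false
    }
}
```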

Geo Bounding Box (geo_bounding_box) Filter

This is by far the most efficient geo-filter because its calculation is very simple. You provide it with the top, bottom, left, and right coordinates of a rectangle, and all it does is compare the longitude with the left and right coordinates, and the latitude with the top and bottom coordinates.

GET elasticsearch_learning/_search
{
"query":{
  "nested" : {
    "query" : {
      "bool" : {
        "filter" : [
          {
            "geo_bounding_box" : {
              "addresses.location" : {
                "top_left" : [
                  -112.025029,
                  40.526588
                ],
                "bottom_right" : [
                  0.0,
                  0.0
                ]
              },
              "validation_method" : "STRICT",
              "type" : "MEMORY",
              "ignore_unmapped" : false,
              "boost" : 1.0
            }
          }
        ],
        "adjust_pure_negative" : true,
        "boost" : 1.0
      }
    },
    "path" : "addresses",
    "ignore_unmapped" : false,
    "score_mode" : "none",
    "boost" : 1.0
  }
}
}
    /**
     * https://www.elastic.co/guide/en/elasticsearch/reference/7.x/query-dsl-nested-query.html<br>
     * https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-geo-bounding-box-query.html
     */
    @Test
    void searchWithGeoBoundingBox() {

        int pageNumber = 0;
        int pageSize = 3;

        SearchRequest searchRequest = new SearchRequest(database);
        searchRequest.allowPartialSearchResults(true);
        searchRequest.indicesOptions(IndicesOptions.lenientExpandOpen());

        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.from(pageNumber * pageSize);
        searchSourceBuilder.size(pageSize);
        searchSourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS));
        /**
         * fetch only a few fields
         */
        searchSourceBuilder.fetchSource(new String[]{"id","firstName","lastName","addresses.street","addresses.city","addresses.zipcode"}, new String[]{"cards"});
        /**
         * Query with bool
         */
        BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();

        /**
         * topLeft: Herriman (west of Greenwich, so the longitude is negative)<br>
         * bottomRight: American Fork
         */
        boolQuery.filter(QueryBuilders.geoBoundingBoxQuery("addresses.location")
                .setCorners(new GeoPoint(40.526588, -112.025029), new GeoPoint(0.0, 0.0)));

        searchSourceBuilder.query(QueryBuilders.nestedQuery("addresses", boolQuery, ScoreMode.None));

        searchRequest.source(searchSourceBuilder);

        searchRequest.preference("nested-address");

        if (searchSourceBuilder.sorts() != null && searchSourceBuilder.sorts().size() > 0) {
            log.info("\n{\n\"query\":{}, \"sort\":{}\n}", searchSourceBuilder.query().toString(), searchSourceBuilder.sorts().toString());
        } else {
            log.info("\n{\n\"query\":{}\n}", searchSourceBuilder.query().toString());
        }

        try {
            SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);

            log.info("isTimedOut={}, totalShards={}, totalHits={}", searchResponse.isTimedOut(), searchResponse.getTotalShards(), searchResponse.getHits().getTotalHits().value);

            List<User> users = getResponseResult(searchResponse.getHits());

            log.info("results={}", ObjectUtils.toJson(users));

        } catch (IOException e) {
            log.warn("IOException, msg={}", e.getLocalizedMessage());
            e.printStackTrace();
        } catch (Exception e) {
            log.warn("Exception, msg={}", e.getLocalizedMessage());
            e.printStackTrace();
        }
    }
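The comparison a bounding box performs can be sketched in a few lines of plain Java. This is only an illustration of the idea, assuming a box that does not cross the antimeridian; the class name and the bottom-right corner values are hypothetical and not taken from the query above.

```java
public class BoundingBoxSketch {

    // True when (lat, lon) lies inside the rectangle. Assumes the box does not
    // cross the antimeridian, so left <= right.
    static boolean contains(double top, double left, double bottom, double right,
                            double lat, double lon) {
        boolean latitudeInRange = lat <= top && lat >= bottom;   // top/bottom bound the latitude
        boolean longitudeInRange = lon >= left && lon <= right;  // left/right bound the longitude
        return latitudeInRange && longitudeInRange;
    }

    public static void main(String[] args) {
        // Hypothetical box around the Lehi area; the corner values are illustrative only.
        double top = 40.526588, left = -112.025029;
        double bottom = 40.30, right = -111.70;

        System.out.println(contains(top, left, bottom, right, 40.414897, -111.881186)); // true
        System.out.println(contains(top, left, bottom, right, 41.0, -111.881186));      // false
    }
}
```

Four numeric comparisons per point are all that is needed, which is why this filter is cheaper than computing a distance.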

Scoring by Distance

It may be that distance is the only important factor in deciding the order in which results are returned, but more frequently we need to combine distance with other factors, such as full-text relevance, popularity, and price.

In these situations, we should reach for the function_score query that allows us to blend all of these factors into an overall score. 

Sorting by distance also has a performance cost: the distance has to be calculated for every matching document. The function_score query, on the other hand, can be executed during the rescore phase, limiting the number of calculations to just the top n results.
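One common way to fold distance into a score is a decay function. The sketch below reproduces the Gaussian decay shape documented for function_score (parameters scale, decay, and offset) in plain Java and multiplies it with a text relevance score. The multiplicative blend, the 2000 m scale, and the sample scores are assumptions for illustration, not values from this post.

```java
public class DistanceDecaySketch {

    // Gaussian decay: 1.0 at the origin (and within offset), and exactly
    // `decay` at a distance of offset + scale.
    static double gaussDecay(double distance, double offset, double scale, double decay) {
        double sigmaSquared = -(scale * scale) / (2.0 * Math.log(decay));
        double d = Math.max(0.0, distance - offset);
        return Math.exp(-(d * d) / (2.0 * sigmaSquared));
    }

    // Hypothetical blend: scale the text relevance by how close the document is.
    static double blendedScore(double textScore, double distanceMeters) {
        return textScore * gaussDecay(distanceMeters, 0.0, 2000.0, 0.5);
    }

    public static void main(String[] args) {
        // A weaker text match nearby can outrank a stronger match far away.
        double nearby = blendedScore(2.0, 100.0);
        double farAway = blendedScore(3.0, 6000.0);
        System.out.println(nearby > farAway); // true
    }
}
```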

 

September 27, 2019

CompletableFuture

1. What is CompletableFuture?

Think of ordering coffee at a busy cafe. You place your order, get a receipt with a number, and step aside. You are free to check your phone, chat with a friend, or grab a seat. When your coffee is ready, the barista calls your number. You did not stand at the counter blocking everyone behind you — you continued living your life while the work happened in the background.

CompletableFuture is that receipt. It is a promise that a value will be available at some point in the future. Introduced in Java 8 as part of java.util.concurrent, it is the standard library's main tool for writing asynchronous, non-blocking code.

The Problem with Future

Java 5 introduced Future<T>, which represents an asynchronous computation. But Future has a fatal flaw: the only way to get the result is to call get(), which blocks the calling thread. You cannot attach a callback. You cannot chain operations. You cannot combine multiple futures. Calling get() defeats the purpose of async programming.

What CompletableFuture Adds

| Feature | Future | CompletableFuture |
|---|---|---|
| Get result | get() (blocks) | get(), join(), or non-blocking callbacks |
| Attach callback | Not supported | thenApply(), thenAccept(), thenRun() |
| Chain operations | Not supported | thenCompose(), thenApply() |
| Combine futures | Not supported | thenCombine(), allOf(), anyOf() |
| Exception handling | ExecutionException wrapper | exceptionally(), handle(), whenComplete() |
| Manually complete | Not supported | complete(), completeExceptionally() |

import java.util.concurrent.*;

public class FutureVsCompletableFuture {
    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(2);

        // OLD WAY: Future -- blocks on get()
        System.out.println("=== Future (blocking) ===");
        Future<String> future = executor.submit(() -> {
            Thread.sleep(500);
            return "Data from database";
        });
        // This line BLOCKS until the result is ready
        String result = future.get();
        System.out.println("Got: " + result);

        // NEW WAY: CompletableFuture -- non-blocking
        System.out.println("\n=== CompletableFuture (non-blocking) ===");
        CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
            try { Thread.sleep(500); } catch (InterruptedException e) { throw new RuntimeException(e); }
            return "Data from database";
        });

        // Attach a callback -- runs when data is ready, does NOT block
        cf.thenApply(data -> data.toUpperCase())
          .thenAccept(data -> System.out.println("Got: " + data));

        System.out.println("Main thread is free!");
        Thread.sleep(1000);
        executor.shutdown();

        // Output:
        // === Future (blocking) ===
        // Got: Data from database
        //
        // === CompletableFuture (non-blocking) ===
        // Main thread is free!
        // Got: DATA FROM DATABASE
    }
}

2. Creating CompletableFuture

There are several ways to create a CompletableFuture, each suited to a different situation.

2.1 supplyAsync() — Returns a Value

Use supplyAsync() when your async operation produces a result. It takes a Supplier<T> and runs it on a background thread from ForkJoinPool.commonPool().

2.2 runAsync() — No Return Value

Use runAsync() for fire-and-forget operations that do not return a result: logging, sending notifications, writing to a file. It takes a Runnable and returns CompletableFuture<Void>.

2.3 completedFuture() — Already Done

Use completedFuture() to create a CompletableFuture that is already completed with a known value. This is useful for caching, default values, and testing.

2.4 Custom Executor

By default, supplyAsync() and runAsync() use ForkJoinPool.commonPool(). For I/O-bound operations (database calls, HTTP requests, file reads), you should provide a custom executor to avoid starving the common pool.

import java.util.concurrent.*;

public class CreatingCompletableFuture {
    public static void main(String[] args) throws Exception {

        // 1. supplyAsync -- produces a result
        CompletableFuture<String> userFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("Fetching user on: " + Thread.currentThread().getName());
            return "User{id=1, name='Alice'}";
        });
        System.out.println("User: " + userFuture.join());
        // Output:
        // Fetching user on: ForkJoinPool.commonPool-worker-1
        // User: User{id=1, name='Alice'}

        // 2. runAsync -- no result (fire-and-forget)
        CompletableFuture<Void> logFuture = CompletableFuture.runAsync(() -> {
            System.out.println("Logging on: " + Thread.currentThread().getName());
            // Write to log file, send notification, etc.
        });
        logFuture.join(); // Wait for completion (returns null)

        // 3. completedFuture -- already has a value
        CompletableFuture<String> cached = CompletableFuture.completedFuture("Cached Result");
        System.out.println("Cached: " + cached.join());
        // Output: Cached: Cached Result
        // No thread was created -- the value was immediately available

        // 4. Custom executor -- for I/O operations
        ExecutorService ioPool = Executors.newFixedThreadPool(10, r -> {
            Thread t = new Thread(r);
            t.setDaemon(true);
            t.setName("io-pool-" + t.getId());
            return t;
        });

        CompletableFuture<String> dbResult = CompletableFuture.supplyAsync(() -> {
            System.out.println("DB query on: " + Thread.currentThread().getName());
            try { Thread.sleep(100); } catch (InterruptedException e) { throw new RuntimeException(e); }
            return "Query result";
        }, ioPool); // Pass custom executor as second argument

        System.out.println("DB: " + dbResult.join());
        // Output:
        // DB query on: io-pool-25
        // DB: Query result

        // 5. Manually completing
        CompletableFuture<String> manual = new CompletableFuture<>();
        // Some other thread or callback will complete this later
        new Thread(() -> {
            try { Thread.sleep(200); } catch (InterruptedException e) { throw new RuntimeException(e); }
            manual.complete("Manually completed!");
        }).start();
        System.out.println("Manual: " + manual.join());
        // Output: Manual: Manually completed!

        ioPool.shutdown();
    }
}

Creation Methods Summary

| Method | Returns | Input | Thread Pool | Use Case |
|---|---|---|---|---|
| supplyAsync(supplier) | CompletableFuture<T> | Supplier<T> | Common pool | Async computation with result |
| supplyAsync(supplier, executor) | CompletableFuture<T> | Supplier<T> | Custom executor | I/O-bound operations |
| runAsync(runnable) | CompletableFuture<Void> | Runnable | Common pool | Fire-and-forget side effects |
| completedFuture(value) | CompletableFuture<T> | T | None (immediate) | Caching, defaults, testing |
| new CompletableFuture() | CompletableFuture<T> | None | None (manual) | Bridging callback APIs |
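The last row, bridging callback APIs, is worth a small illustration. The sketch below wraps a callback-style method in a CompletableFuture that is completed manually. The `fetchAsync` method stands in for some legacy callback API and is entirely hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

public class CallbackBridge {

    // Hypothetical legacy API that reports its outcome through callbacks.
    static void fetchAsync(String key, Consumer<String> onSuccess, Consumer<Throwable> onError) {
        new Thread(() -> {
            if (key.isEmpty()) {
                onError.accept(new IllegalArgumentException("key must not be empty"));
            } else {
                onSuccess.accept("value-for-" + key);
            }
        }).start();
    }

    // Wrap the callback API: the future is completed by whichever callback fires.
    static CompletableFuture<String> fetch(String key) {
        CompletableFuture<String> future = new CompletableFuture<>();
        fetchAsync(key, future::complete, future::completeExceptionally);
        return future;
    }

    public static void main(String[] args) {
        System.out.println(fetch("user-1").join()); // prints value-for-user-1

        String recovered = fetch("")
            .exceptionally(ex -> "fallback: " + ex.getMessage())
            .join();
        System.out.println(recovered); // prints fallback: key must not be empty
    }
}
```

Once wrapped, the result can be chained and combined like any other CompletableFuture.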

3. Transforming Results

Once you have a CompletableFuture, you can transform its result when it arrives using three callback methods. The key difference is what each callback accepts and returns.

| Method | Input | Returns | Analogy |
|---|---|---|---|
| thenApply(Function) | Previous result | Transformed result | Stream’s map() |
| thenAccept(Consumer) | Previous result | Void | Stream’s forEach() |
| thenRun(Runnable) | Nothing | Void | “Then do this, ignoring the result value” |

import java.util.concurrent.CompletableFuture;

public class TransformingResults {
    public static void main(String[] args) {

        // thenApply -- transform the result (like map)
        System.out.println("=== thenApply ===");
        CompletableFuture<String> greeting = CompletableFuture
            .supplyAsync(() -> "hello")
            .thenApply(s -> s + " world")       // "hello" -> "hello world"
            .thenApply(String::toUpperCase);     // "hello world" -> "HELLO WORLD"

        System.out.println(greeting.join());
        // Output: HELLO WORLD

        // Chain of transformations
        CompletableFuture<Integer> wordCount = CompletableFuture
            .supplyAsync(() -> "The quick brown fox jumps over the lazy dog")
            .thenApply(String::trim)
            .thenApply(s -> s.split("\\s+"))
            .thenApply(words -> words.length);

        System.out.println("Word count: " + wordCount.join());
        // Output: Word count: 9

        // thenAccept -- consume the result (no return value)
        System.out.println("\n=== thenAccept ===");
        CompletableFuture<Void> printFuture = CompletableFuture
            .supplyAsync(() -> "User{name='Alice', age=30}")
            .thenAccept(user -> System.out.println("Received: " + user));
        printFuture.join();
        // Output: Received: User{name='Alice', age=30}

        // thenRun -- run something after completion (ignores result)
        System.out.println("\n=== thenRun ===");
        CompletableFuture<Void> pipeline = CompletableFuture
            .supplyAsync(() -> {
                System.out.println("Step 1: Fetching data...");
                return "raw data";
            })
            .thenApply(data -> {
                System.out.println("Step 2: Processing: " + data);
                return "processed " + data;
            })
            .thenAccept(result -> {
                System.out.println("Step 3: Saving: " + result);
            })
            .thenRun(() -> {
                System.out.println("Step 4: All done! (cleanup)");
            });

        pipeline.join();
        // Output:
        // Step 1: Fetching data...
        // Step 2: Processing: raw data
        // Step 3: Saving: processed raw data
        // Step 4: All done! (cleanup)
    }
}

4. Composing Futures

Composing means chaining async operations where each step depends on the result of the previous one. There are two key methods:

  • thenCompose() — Use when your transformation function itself returns a CompletableFuture. This is the async equivalent of flatMap() in streams. Without it, you end up with CompletableFuture<CompletableFuture<T>> — a future wrapped inside a future.
  • thenCombine() — Use when you want to run two independent futures in parallel and combine their results when both complete.

thenCompose vs thenApply

| Scenario | Use | Why |
|---|---|---|
| Transform: T -> U | thenApply() | Synchronous transformation, returns a plain value |
| Chain: T -> CompletableFuture<U> | thenCompose() | Async step that itself returns a future (avoids nesting) |
| Merge two independent results | thenCombine() | Both futures run in parallel, combine when both finish |

import java.util.concurrent.CompletableFuture;

public class ComposingFutures {

    // Simulated async service methods
    static CompletableFuture<String> fetchUser(int userId) {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("  Fetching user " + userId + "...");
            try { Thread.sleep(200); } catch (InterruptedException e) { throw new RuntimeException(e); }
            return "User{id=" + userId + ", name='Alice'}";
        });
    }

    static CompletableFuture<String> fetchOrders(String user) {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("  Fetching orders for " + user + "...");
            try { Thread.sleep(200); } catch (InterruptedException e) { throw new RuntimeException(e); }
            return "Orders[{id=101, item='Laptop'}, {id=102, item='Mouse'}]";
        });
    }

    static CompletableFuture<Double> fetchCreditScore(int userId) {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("  Fetching credit score for user " + userId + "...");
            try { Thread.sleep(300); } catch (InterruptedException e) { throw new RuntimeException(e); }
            return 750.0;
        });
    }

    public static void main(String[] args) {

        // === thenCompose: Sequential dependency ===
        // Step 1: Fetch user -> Step 2: Fetch orders for that user
        System.out.println("=== thenCompose (sequential chain) ===");

        CompletableFuture<String> orderPipeline = fetchUser(1)
            .thenCompose(user -> fetchOrders(user)); // user result feeds into fetchOrders

        System.out.println("Result: " + orderPipeline.join());
        // Output:
        //   Fetching user 1...
        //   Fetching orders for User{id=1, name='Alice'}...
        // Result: Orders[{id=101, item='Laptop'}, {id=102, item='Mouse'}]

        // WHY NOT thenApply? It would create nested futures:
        // CompletableFuture<CompletableFuture<String>> nested =
        //     fetchUser(1).thenApply(user -> fetchOrders(user)); // WRONG! Nested future

        // === thenCombine: Parallel merge ===
        // Fetch user data AND credit score simultaneously, then combine
        System.out.println("\n=== thenCombine (parallel merge) ===");

        CompletableFuture<String> userFuture = fetchUser(1);
        CompletableFuture<Double> creditFuture = fetchCreditScore(1);

        CompletableFuture<String> combined = userFuture.thenCombine(
            creditFuture,
            (user, creditScore) -> user + " | Credit Score: " + creditScore
        );

        System.out.println("Combined: " + combined.join());
        // Both ran in parallel! Total time is ~300ms, not 200+300=500ms
        // Output: Combined: User{id=1, name='Alice'} | Credit Score: 750.0

        // === Complex chain: compose + combine ===
        System.out.println("\n=== Complex Pipeline ===");

        CompletableFuture<String> fullReport = fetchUser(1)
            .thenCompose(user -> {
                // After getting user, fetch orders AND credit score in parallel
                CompletableFuture<String> orders = fetchOrders(user);
                CompletableFuture<Double> credit = fetchCreditScore(1);

                // Combine both results
                return orders.thenCombine(credit, (o, c) ->
                    "Report: " + user + "\n  " + o + "\n  Credit: " + c);
            });

        System.out.println(fullReport.join());
        // Output:
        // Report: User{id=1, name='Alice'}
        //   Orders[{id=101, item='Laptop'}, {id=102, item='Mouse'}]
        //   Credit: 750.0
    }
}

5. Combining Multiple Futures

When you need to run many async operations in parallel and wait for all (or any) of them to complete, use allOf() and anyOf().

allOf() — Wait for All

CompletableFuture.allOf() takes a varargs of futures and returns a CompletableFuture<Void> that completes when all input futures complete. It does not directly give you the results — you have to extract them from the individual futures.

anyOf() — First to Finish

CompletableFuture.anyOf() returns a CompletableFuture<Object> that completes as soon as any one of the input futures completes. Useful for racing multiple data sources.

import java.util.*;
import java.util.concurrent.*;
import java.util.stream.*;

public class CombiningFutures {
    public static void main(String[] args) {

        // === allOf: Parallel fetch multiple resources ===
        System.out.println("=== allOf: Wait for ALL ===");

        List<String> urls = List.of(
            "https://api.example.com/users",
            "https://api.example.com/orders",
            "https://api.example.com/products",
            "https://api.example.com/reviews"
        );

        // Create a future for each URL
        List<CompletableFuture<String>> futures = urls.stream()
            .map(url -> CompletableFuture.supplyAsync(() -> {
                // Simulate API call with varying delays
                int delay = 100 + new Random().nextInt(400);
                try { Thread.sleep(delay); } catch (InterruptedException e) { throw new RuntimeException(e); }
                return "Response from " + url.substring(url.lastIndexOf('/') + 1)
                    + " (" + delay + "ms)";
            }))
            .collect(Collectors.toList());

        // Wait for ALL to complete
        long start = System.currentTimeMillis();
        CompletableFuture<Void> allDone = CompletableFuture.allOf(
            futures.toArray(new CompletableFuture[0])
        );

        // Collect all results
        CompletableFuture<List<String>> allResults = allDone.thenApply(v ->
            futures.stream()
                .map(CompletableFuture::join) // Safe -- all are already complete
                .collect(Collectors.toList())
        );

        List<String> results = allResults.join();
        long elapsed = System.currentTimeMillis() - start;

        results.forEach(r -> System.out.println("  " + r));
        System.out.println("Total time: " + elapsed + "ms (parallel, not sequential!)");
        // All 4 API calls ran simultaneously
        // Total time is ~max(individual times), not sum

        // === anyOf: Race multiple sources ===
        System.out.println("\n=== anyOf: First to finish ===");

        CompletableFuture<String> primary = CompletableFuture.supplyAsync(() -> {
            try { Thread.sleep(300); } catch (InterruptedException e) { throw new RuntimeException(e); }
            return "Primary DB: user data";
        });

        CompletableFuture<String> replica = CompletableFuture.supplyAsync(() -> {
            try { Thread.sleep(100); } catch (InterruptedException e) { throw new RuntimeException(e); }
            return "Replica DB: user data";
        });

        CompletableFuture<String> cache = CompletableFuture.supplyAsync(() -> {
            try { Thread.sleep(50); } catch (InterruptedException e) { throw new RuntimeException(e); }
            return "Cache: user data";
        });

        CompletableFuture<Object> fastest = CompletableFuture.anyOf(primary, replica, cache);
        System.out.println("Fastest response: " + fastest.join());
        // Output: Fastest response: Cache: user data
        // Cache was fastest at 50ms

        // === Practical: Collecting typed results ===
        System.out.println("\n=== Collecting Results with Type Safety ===");

        CompletableFuture<Integer> usersCount = CompletableFuture.supplyAsync(() -> 1500);
        CompletableFuture<Integer> ordersCount = CompletableFuture.supplyAsync(() -> 3200);
        CompletableFuture<Integer> productsCount = CompletableFuture.supplyAsync(() -> 850);

        CompletableFuture<String> dashboard = usersCount
            .thenCombine(ordersCount, (users, orders) -> "Users: " + users + ", Orders: " + orders)
            .thenCombine(productsCount, (partial, products) -> partial + ", Products: " + products);

        System.out.println("Dashboard: " + dashboard.join());
        // Output: Dashboard: Users: 1500, Orders: 3200, Products: 850
    }
}

6. Exception Handling

Exceptions in async pipelines are tricky because they happen on background threads. CompletableFuture provides three methods for handling errors, each with a different purpose:

| Method | Receives | Returns | Purpose |
|---|---|---|---|
| exceptionally(Function) | Exception only | Recovery value | Provide a fallback when error occurs |
| handle(BiFunction) | Result AND exception | New result | Handle both success and failure in one place |
| whenComplete(BiConsumer) | Result AND exception | Same result (no transform) | Logging, cleanup, side effects |

Key rule: If an exception occurs in a CompletableFuture and is not handled, it propagates down the chain. Every subsequent thenApply(), thenAccept(), etc. is skipped until an exception handler is encountered.
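The key rule can be observed directly. In the minimal sketch below (class and method names are hypothetical), the first thenApply() is skipped because the stage before it failed, exceptionally() recovers, and the thenApply() after the handler runs normally. It also shows that a handler typically receives the original exception wrapped in a CompletionException, so the cause is unwrapped before reading its message.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicBoolean;

public class ExceptionPropagation {

    // Runs a small pipeline and records whether the stage after the failure executed.
    static String run(AtomicBoolean transformRan) {
        return CompletableFuture
            .supplyAsync(() -> {
                if (true) throw new IllegalStateException("boom");
                return "data";
            })
            .thenApply(s -> {            // skipped: the previous stage failed
                transformRan.set(true);
                return s.toUpperCase();
            })
            .exceptionally(ex -> {
                // Unwrap the CompletionException to reach the original exception.
                Throwable cause = (ex instanceof CompletionException && ex.getCause() != null)
                    ? ex.getCause() : ex;
                return "recovered from " + cause.getMessage();
            })
            .thenApply(s -> s + "!")     // runs: the chain has a normal value again
            .join();
    }

    public static void main(String[] args) {
        AtomicBoolean transformRan = new AtomicBoolean(false);
        System.out.println(run(transformRan));                      // prints recovered from boom!
        System.out.println("skipped stage ran: " + transformRan.get()); // prints skipped stage ran: false
    }
}
```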

import java.util.concurrent.CompletableFuture;

public class ExceptionHandling {
    public static void main(String[] args) {

        // === exceptionally: Provide a fallback ===
        System.out.println("=== exceptionally ===");

        CompletableFuture withFallback = CompletableFuture
            .supplyAsync(() -> {
                if (true) throw new RuntimeException("Database connection failed!");
                return "data";
            })
            .exceptionally(ex -> {
                System.out.println("Error: " + ex.getMessage());
                return "default data"; // Recovery value
            });

        System.out.println("Result: " + withFallback.join());
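        // Output (the cause arrives wrapped in a CompletionException, so
        // getMessage() is prefixed with the cause's class name):
        // Error: java.lang.RuntimeException: Database connection failed!
        // Result: default data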

        // === handle: Both success and failure ===
        System.out.println("\n=== handle (success case) ===");

        CompletableFuture<String> handled = CompletableFuture
            .supplyAsync(() -> "real data")
            .handle((result, ex) -> {
                if (ex != null) {
                    System.out.println("Failed: " + ex.getMessage());
                    return "fallback";
                }
                System.out.println("Success: " + result);
                return result.toUpperCase();
            });
        System.out.println("Result: " + handled.join());
        // Output:
        // Success: real data
        // Result: REAL DATA

        System.out.println("\n=== handle (failure case) ===");

        CompletableFuture<String> handleError = CompletableFuture
            .<String>supplyAsync(() -> {
                throw new RuntimeException("Network timeout");
            })
            .handle((result, ex) -> {
                if (ex != null) {
                    System.out.println("Failed: " + ex.getMessage());
                    return "cached result";
                }
                return result;
            });
        System.out.println("Result: " + handleError.join());
        // Output (the message carries the CompletionException wrapper's prefix):
        // Failed: java.lang.RuntimeException: Network timeout
        // Result: cached result

        // === whenComplete: Logging/cleanup (does NOT change result) ===
        System.out.println("\n=== whenComplete ===");

        CompletableFuture<String> logged = CompletableFuture
            .supplyAsync(() -> "important data")
            .whenComplete((result, ex) -> {
                if (ex != null) {
                    System.out.println("LOG: Operation failed - " + ex.getMessage());
                } else {
                    System.out.println("LOG: Operation succeeded - " + result);
                }
                // NOTE: Cannot change the result here!
            });
        System.out.println("Result: " + logged.join());
        // Output:
        // LOG: Operation succeeded - important data
        // Result: important data

        // === Exception propagation ===
        System.out.println("\n=== Exception Propagation ===");

        CompletableFuture<String> chain = CompletableFuture
            .supplyAsync(() -> "data")
            .thenApply(s -> {
                throw new RuntimeException("Step 2 failed");
            })
            .thenApply(s -> {
                System.out.println("This is SKIPPED");
                return "never reached";
            })
            .thenApply(s -> {
                System.out.println("This is ALSO SKIPPED");
                return "never reached";
            })
            .exceptionally(ex -> {
                System.out.println("Caught at the end: " + ex.getMessage());
                return "recovered";
            });

        System.out.println("Result: " + chain.join());
        // Output:
        // Caught at the end: java.lang.RuntimeException: Step 2 failed
        // Result: recovered
    }
}
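
The "Exception Propagation" output above carries a java.lang.RuntimeException: prefix because a stage that throws completes with a CompletionException wrapping the original exception. The following is a minimal sketch of unwrapping it before reporting. The UnwrapCause class and its unwrap helper are hypothetical names used for illustration only; they are not part of the CompletableFuture API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class UnwrapCause {

    // Hypothetical helper: peel off the CompletionException wrapper, if present.
    static Throwable unwrap(Throwable ex) {
        return (ex instanceof CompletionException && ex.getCause() != null)
            ? ex.getCause()
            : ex;
    }

    // Runs a pipeline whose second stage fails and reports the original cause.
    static String failingPipeline() {
        return CompletableFuture
            .supplyAsync(() -> "data")
            .thenApply(s -> {
                if (true) throw new IllegalStateException("Step 2 failed");
                return s;
            })
            .exceptionally(ex -> {
                Throwable cause = unwrap(ex); // the IllegalStateException thrown above
                return cause.getClass().getSimpleName() + ": " + cause.getMessage();
            })
            .join();
    }

    public static void main(String[] args) {
        System.out.println(failingPipeline());
        // Prints: IllegalStateException: Step 2 failed
    }
}
```

Unwrapping matters mostly when the handler branches on the exception type, since an instanceof check against the wrapper would never match the original class.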

7. Async Variants

Every callback method in CompletableFuture has three variants:

Variant | Thread Used | When to Use
thenApply(fn) | Same thread that completed the previous stage (or the caller thread if already complete) | Fast, CPU-light transformations
thenApplyAsync(fn) | ForkJoinPool.commonPool() | CPU-intensive transformations
thenApplyAsync(fn, executor) | Your custom executor | I/O-bound operations (DB, HTTP)

The same pattern applies to thenAccept/thenAcceptAsync, thenRun/thenRunAsync, thenCompose/thenComposeAsync, thenCombine/thenCombineAsync, etc.

Rule of thumb:

  • Use the non-async version for quick transformations (string manipulation, mapping, filtering).
  • Use Async with a custom executor for anything that involves I/O or takes more than a few milliseconds.
  • Never use the common pool (default Async) for blocking I/O: it has a limited number of threads and is shared across your entire application.

import java.util.concurrent.*;

public class AsyncVariants {
    public static void main(String[] args) {
        // Custom executor for I/O work
        ExecutorService ioPool = Executors.newFixedThreadPool(4, r -> {
            Thread t = new Thread(r);
            t.setDaemon(true);
            t.setName("io-" + t.getId());
            return t;
        });

        // thenApply -- runs on completing thread (fast)
        CompletableFuture<String> sync = CompletableFuture
            .supplyAsync(() -> {
                System.out.println("Produce on: " + Thread.currentThread().getName());
                return "data";
            })
            .thenApply(s -> {
                System.out.println("thenApply on: " + Thread.currentThread().getName());
                return s.toUpperCase(); // Quick transformation, same thread
            });
        sync.join();
        // Both likely run on: ForkJoinPool.commonPool-worker-1

        System.out.println();

        // thenApplyAsync -- runs on common pool (different thread)
        CompletableFuture<String> async = CompletableFuture
            .supplyAsync(() -> {
                System.out.println("Produce on: " + Thread.currentThread().getName());
                return "data";
            })
            .thenApplyAsync(s -> {
                System.out.println("thenApplyAsync on: " + Thread.currentThread().getName());
                return s.toUpperCase(); // May run on a different thread
            });
        async.join();

        System.out.println();

        // thenApplyAsync with executor -- runs on YOUR thread pool
        CompletableFuture<String> customAsync = CompletableFuture
            .supplyAsync(() -> {
                System.out.println("Produce on: " + Thread.currentThread().getName());
                return "data";
            })
            .thenApplyAsync(s -> {
                System.out.println("Custom async on: " + Thread.currentThread().getName());
                // Simulate I/O (database call)
                try { Thread.sleep(100); } catch (InterruptedException e) { throw new RuntimeException(e); }
                return "DB result for: " + s;
            }, ioPool);

        System.out.println("Result: " + customAsync.join());
        // Output (after the "Produce on" line; the thread ID varies per run):
        // Custom async on: io-25
        // Result: DB result for: data

        ioPool.shutdown();
    }
}

8. Real-World Patterns

These patterns solve common problems you will encounter in production code.

8.1 Parallel API Calls

When you need data from multiple independent sources, fetch them all in parallel instead of sequentially. This can dramatically reduce response times.

import java.util.concurrent.*;
import java.util.*;
import java.util.stream.*;

public class ParallelApiCalls {

    static ExecutorService httpPool = Executors.newFixedThreadPool(10);

    // Simulated API calls
    static CompletableFuture<Map<String, Object>> fetchUserProfile(int userId) {
        return CompletableFuture.supplyAsync(() -> {
            sleep(200);
            return Map.of("id", userId, "name", "Alice", "email", "alice@example.com");
        }, httpPool);
    }

    static CompletableFuture<List<String>> fetchUserOrders(int userId) {
        return CompletableFuture.supplyAsync(() -> {
            sleep(300);
            return List.of("Order-101", "Order-102", "Order-103");
        }, httpPool);
    }

    static CompletableFuture<Map<String, Object>> fetchUserPreferences(int userId) {
        return CompletableFuture.supplyAsync(() -> {
            sleep(150);
            return Map.of("theme", "dark", "language", "en", "notifications", true);
        }, httpPool);
    }

    public static void main(String[] args) {
        int userId = 1;
        long start = System.currentTimeMillis();

        // Kick off ALL requests simultaneously
        CompletableFuture<Map<String, Object>> profileFuture = fetchUserProfile(userId);
        CompletableFuture<List<String>> ordersFuture = fetchUserOrders(userId);
        CompletableFuture<Map<String, Object>> prefsFuture = fetchUserPreferences(userId);

        // Wait for all and combine
        CompletableFuture<Map<String, Object>> dashboard = CompletableFuture
            .allOf(profileFuture, ordersFuture, prefsFuture)
            .thenApply(v -> {
                Map<String, Object> result = new HashMap<>();
                result.put("profile", profileFuture.join());
                result.put("orders", ordersFuture.join());
                result.put("preferences", prefsFuture.join());
                return result;
            });

        Map<String, Object> result = dashboard.join();
        long elapsed = System.currentTimeMillis() - start;

        System.out.println("Dashboard data:");
        result.forEach((key, value) -> System.out.println("  " + key + ": " + value));
        System.out.println("Total time: " + elapsed + "ms");
        // Total time: ~300ms (max of 200, 300, 150) instead of 650ms (sequential)

        httpPool.shutdown();
    }

    static void sleep(long ms) {
        try { Thread.sleep(ms); } catch (InterruptedException e) { throw new RuntimeException(e); }
    }
}
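
The allOf-then-join step above can be factored into a reusable helper when every future has the same result type. The following is a minimal sketch under that assumption; the AllOfHelper class and the allAsList method are hypothetical names, not part of the JDK.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class AllOfHelper {

    // Hypothetical helper: turn a list of futures into one future of a list,
    // preserving the order of the input list.
    static <T> CompletableFuture<List<T>> allAsList(List<CompletableFuture<T>> futures) {
        return CompletableFuture
            .allOf(futures.toArray(new CompletableFuture[0]))
            // allOf has completed here, so each join() returns without blocking
            .thenApply(v -> futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        List<CompletableFuture<Integer>> counts = List.of(
            CompletableFuture.supplyAsync(() -> 1500),
            CompletableFuture.supplyAsync(() -> 3200),
            CompletableFuture.supplyAsync(() -> 850));

        System.out.println(allAsList(counts).join());
        // Prints: [1500, 3200, 850]
    }
}
```

If any input future fails, the combined future fails as well, so an exceptionally() or handle() stage is still needed on the result.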

8.2 Timeout Handling

In production, you cannot wait forever for an async operation to complete. You need timeouts.

import java.util.concurrent.*;

public class TimeoutHandling {
    public static void main(String[] args) {

        // === Java 9+: orTimeout() and completeOnTimeout() ===

        // orTimeout: Fails with TimeoutException if not complete in time
        CompletableFuture<String> withTimeout = CompletableFuture
            .supplyAsync(() -> {
                try { Thread.sleep(5000); } catch (InterruptedException e) { throw new RuntimeException(e); }
                return "slow result";
            })
            .orTimeout(1, TimeUnit.SECONDS)
            .exceptionally(ex -> {
                // This TimeoutException carries no message, so print the exception itself
                System.out.println("Timed out: " + ex);
                return "timeout fallback";
            });

        System.out.println("orTimeout result: " + withTimeout.join());
        // Output:
        // Timed out: java.util.concurrent.TimeoutException
        // orTimeout result: timeout fallback

        // completeOnTimeout: Provides a default value on timeout (no exception)
        CompletableFuture<String> withDefault = CompletableFuture
            .supplyAsync(() -> {
                try { Thread.sleep(5000); } catch (InterruptedException e) { throw new RuntimeException(e); }
                return "slow result";
            })
            .completeOnTimeout("default value", 1, TimeUnit.SECONDS);

        System.out.println("completeOnTimeout: " + withDefault.join());
        // Output: completeOnTimeout: default value

        // === Java 8 compatible: Manual timeout ===
        System.out.println("\n=== Java 8 Timeout Pattern ===");
        CompletableFuture<String> java8Timeout = timeoutAfter(
            CompletableFuture.supplyAsync(() -> {
                try { Thread.sleep(5000); } catch (InterruptedException e) { throw new RuntimeException(e); }
                return "slow";
            }),
            1, TimeUnit.SECONDS,
            "java8 fallback"
        );
        System.out.println("Java 8 timeout: " + java8Timeout.join());
        // Output: Java 8 timeout: java8 fallback
    }

    // Java 8 compatible timeout helper
    static <T> CompletableFuture<T> timeoutAfter(
            CompletableFuture<T> future, long timeout, TimeUnit unit, T fallback) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CompletableFuture<T> timeoutFuture = new CompletableFuture<>();

        scheduler.schedule(() -> timeoutFuture.complete(fallback), timeout, unit);

        return CompletableFuture.anyOf(future, timeoutFuture)
            // Release the timer thread whether the race ended in success or failure
            .whenComplete((result, ex) -> scheduler.shutdownNow())
            .thenApply(result -> {
                @SuppressWarnings("unchecked")
                T typed = (T) result;
                return typed;
            });
    }
}
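
A fallback handler often needs to tell a timeout apart from any other failure. The following is a minimal sketch of that check using orTimeout() (Java 9+) and handle(); the TimeoutVsFailure class and the describe method are hypothetical names chosen for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimeoutVsFailure {

    // Hypothetical helper: apply a deadline and describe how the call ended.
    static String describe(CompletableFuture<String> call, long timeoutMs) {
        return call
            .orTimeout(timeoutMs, TimeUnit.MILLISECONDS)
            .handle((value, ex) -> {
                if (ex == null) {
                    return "ok: " + value;
                }
                // Upstream stages may wrap the cause in a CompletionException
                Throwable cause = (ex instanceof CompletionException && ex.getCause() != null)
                    ? ex.getCause()
                    : ex;
                return (cause instanceof TimeoutException)
                    ? "timed out"
                    : "failed: " + cause.getMessage();
            })
            .join();
    }

    public static void main(String[] args) {
        System.out.println(describe(CompletableFuture.completedFuture("fast"), 100));
        System.out.println(describe(new CompletableFuture<>(), 100)); // never completes on its own
        System.out.println(describe(
            CompletableFuture.failedFuture(new IllegalStateException("boom")), 100));
        // Prints:
        // ok: fast
        // timed out
        // failed: boom
    }
}
```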

8.3 Retry Pattern

For transient failures (network blips, temporary unavailability), retrying the operation automatically is a common pattern.

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class RetryPattern {

    static AtomicInteger callCount = new AtomicInteger(0);

    // Simulated flaky service: fails first 2 times, succeeds on 3rd
    static CompletableFuture<String> flakyService() {
        return CompletableFuture.supplyAsync(() -> {
            int attempt = callCount.incrementAndGet();
            System.out.println("  Attempt " + attempt + " on " + Thread.currentThread().getName());
            if (attempt <= 2) {
                throw new RuntimeException("Service unavailable (attempt " + attempt + ")");
            }
            return "Success on attempt " + attempt;
        });
    }

    // Retry helper
    static <T> CompletableFuture<T> retry(
            java.util.function.Supplier<CompletableFuture<T>> supplier,
            int maxRetries,
            long delayMs) {

        CompletableFuture<T> future = supplier.get();

        for (int i = 0; i < maxRetries; i++) {
            final int attempt = i + 1;
            future = future.handle((result, ex) -> {
                if (ex == null) {
                    return CompletableFuture.completedFuture(result);
                }
                System.out.println("  Retry " + attempt + ": " + ex.getMessage());
                try { Thread.sleep(delayMs); } catch (InterruptedException e) { throw new RuntimeException(e); }
                return supplier.get();
            }).thenCompose(f -> f);
        }
        return future;
    }

    public static void main(String[] args) {
        System.out.println("=== Retry Pattern ===");

        CompletableFuture<String> result = retry(
            RetryPattern::flakyService,
            3,        // max retries
            500       // delay between retries in ms
        );

        System.out.println("Final result: " + result.join());

        // Output:
        //   Attempt 1 on ForkJoinPool.commonPool-worker-1
        //   Retry 1: java.lang.RuntimeException: Service unavailable (attempt 1)
        //   Attempt 2 on ForkJoinPool.commonPool-worker-2
        //   Retry 2: java.lang.RuntimeException: Service unavailable (attempt 2)
        //   Attempt 3 on ForkJoinPool.commonPool-worker-1
        // Final result: Success on attempt 3
    }
}
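
The retry helper above waits between attempts with Thread.sleep(), which occupies a pool thread for the whole delay. The following is a sketch of one alternative that schedules the next attempt with CompletableFuture.delayedExecutor() (Java 9+) instead. The NonBlockingRetry class and the retryAsync and attempt methods are hypothetical names, and the sketch assumes the supplier itself never throws synchronously.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class NonBlockingRetry {

    // Hypothetical helper: retry up to maxRetries times, waiting delayMs between attempts.
    static <T> CompletableFuture<T> retryAsync(
            Supplier<CompletableFuture<T>> supplier, int maxRetries, long delayMs) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(supplier, maxRetries, delayMs, result);
        return result;
    }

    private static <T> void attempt(
            Supplier<CompletableFuture<T>> supplier, int retriesLeft, long delayMs,
            CompletableFuture<T> result) {
        supplier.get().whenComplete((value, ex) -> {
            if (ex == null) {
                result.complete(value);
            } else if (retriesLeft <= 0) {
                result.completeExceptionally(ex); // out of retries: report the last failure
            } else {
                // The delayed executor starts the task after the delay; no thread sleeps meanwhile
                Executor delayed = CompletableFuture.delayedExecutor(delayMs, TimeUnit.MILLISECONDS);
                delayed.execute(() -> attempt(supplier, retriesLeft - 1, delayMs, result));
            }
        });
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        // Fails on the first two calls, succeeds on the third
        Supplier<CompletableFuture<String>> flaky = () -> CompletableFuture.supplyAsync(() -> {
            int n = calls.incrementAndGet();
            if (n <= 2) throw new IllegalStateException("unavailable (attempt " + n + ")");
            return "Success on attempt " + n;
        });

        System.out.println(retryAsync(flaky, 3, 50).join());
        // Prints: Success on attempt 3
    }
}
```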

9. Best Practices

# | Practice | Do | Do Not
1 | Always handle exceptions | Use exceptionally() or handle() on every chain | Let exceptions silently disappear
2 | Use custom executors for I/O | supplyAsync(fn, ioPool) | Use common pool for DB/HTTP calls
3 | Prefer thenCompose over thenApply | thenCompose() when calling another async method | thenApply() creating nested futures
4 | Use join() over get() | future.join(), which throws an unchecked exception | future.get(), which forces try/catch for a checked exception
5 | Add timeouts | orTimeout() or completeOnTimeout() | Wait forever for a result
6 | Name your threads | Custom ThreadFactory with meaningful names | Debug with "pool-3-thread-7" names
7 | Use allOf for parallel batch operations | allOf(futures) then collect results | Sequential loop calling join() on each

import java.util.concurrent.*;

public class BestPracticesDemo {
    public static void main(String[] args) {
        // GOOD: Custom executor for I/O work
        ExecutorService ioPool = Executors.newFixedThreadPool(20, r -> {
            Thread t = new Thread(r);
            t.setDaemon(true);
            t.setName("io-worker-" + t.getId());
            return t;
        });

        // GOOD: Always handle exceptions
        CompletableFuture<String> safe = CompletableFuture
            .supplyAsync(() -> fetchFromDatabase("user-123"), ioPool)
            .thenApply(data -> process(data))
            .exceptionally(ex -> {
                System.err.println("Pipeline failed: " + ex.getMessage());
                return "fallback";
            });
        System.out.println("Safe result: " + safe.join());

        // GOOD: Use join() instead of get()
        String result = safe.join(); // Throws CompletionException (unchecked)
        // NOT: safe.get(); // Throws ExecutionException (checked) -- forces try/catch

        // GOOD: Use thenCompose for async chains
        CompletableFuture<String> composed = CompletableFuture
            .supplyAsync(() -> "user-123", ioPool)
            .thenCompose(id -> fetchAsync(id))     // Returns CompletableFuture<String>
            .thenCompose(user -> enrichAsync(user)) // Chain of async calls
            .exceptionally(ex -> "error: " + ex.getMessage());
        System.out.println("Composed: " + composed.join());

        // BAD: Sequential blocking -- defeats the purpose!
        // for (CompletableFuture<String> f : futures) {
        //     results.add(f.join()); // BLOCKS on each one sequentially!
        // }

        // GOOD: Parallel with allOf
        // CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
        //     .thenApply(v -> futures.stream()
        //         .map(CompletableFuture::join)
        //         .collect(Collectors.toList()));

        ioPool.shutdown();
    }

    static String fetchFromDatabase(String id) {
        return "data for " + id;
    }

    static String process(String data) {
        return "processed: " + data;
    }

    static CompletableFuture<String> fetchAsync(String id) {
        return CompletableFuture.completedFuture("User{" + id + "}");
    }

    static CompletableFuture<String> enrichAsync(String user) {
        return CompletableFuture.completedFuture("Enriched " + user);
    }
}
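
Practice 3 is easiest to see in the return types. The following is a minimal sketch contrasting the two methods; the ComposeVsApply class and its fetchAsync, nested, and flat methods are hypothetical stand-ins for a real async lookup.

```java
import java.util.concurrent.CompletableFuture;

public class ComposeVsApply {

    // Hypothetical async lookup used only for illustration.
    static CompletableFuture<String> fetchAsync(String id) {
        return CompletableFuture.supplyAsync(() -> "User{" + id + "}");
    }

    // thenApply wraps the returned future, producing a nested type.
    static CompletableFuture<CompletableFuture<String>> nested(String id) {
        return CompletableFuture.completedFuture(id).thenApply(ComposeVsApply::fetchAsync);
    }

    // thenCompose flattens it, much like flatMap on Optional or Stream.
    static CompletableFuture<String> flat(String id) {
        return CompletableFuture.completedFuture(id).thenCompose(ComposeVsApply::fetchAsync);
    }

    public static void main(String[] args) {
        System.out.println(nested("user-123").join().join()); // two joins needed
        System.out.println(flat("user-123").join());          // one join
        // Both lines print: User{user-123}
    }
}
```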

10. Complete Practical Example — Parallel Order Processing System

This example demonstrates a realistic e-commerce order processing pipeline that uses CompletableFuture to parallelize inventory checks, payment processing, and notification delivery.

import java.util.*;
import java.util.concurrent.*;
import java.util.stream.*;

public class OrderProcessingSystem {

    // Thread pool for I/O operations
    static final ExecutorService IO_POOL = Executors.newFixedThreadPool(10, r -> {
        Thread t = new Thread(r);
        t.setDaemon(true);
        t.setName("io-worker-" + t.getId());
        return t;
    });

    // --- Domain classes ---
    record Order(String orderId, String customerId, List<OrderItem> items) {}
    record OrderItem(String productId, String name, double price, int quantity) {}
    record InventoryResult(String productId, boolean available, int stock) {}
    record PaymentResult(String transactionId, boolean success, String message) {}
    record ShippingResult(String trackingNumber, String estimatedDelivery) {}

    // --- Service simulations ---

    static CompletableFuture<InventoryResult> checkInventory(OrderItem item) {
        return CompletableFuture.supplyAsync(() -> {
            sleep(100 + new Random().nextInt(200));
            boolean available = new Random().nextInt(10) > 0; // 90% available
            int stock = available ? 10 + new Random().nextInt(90) : 0;
            System.out.printf("  [Inventory] %s: %s (stock: %d)%n",
                item.name(), available ? "AVAILABLE" : "OUT OF STOCK", stock);
            return new InventoryResult(item.productId(), available, stock);
        }, IO_POOL);
    }

    static CompletableFuture<PaymentResult> processPayment(String customerId, double total) {
        return CompletableFuture.supplyAsync(() -> {
            sleep(300);
            String txnId = "TXN-" + UUID.randomUUID().toString().substring(0, 8).toUpperCase();
            boolean success = new Random().nextInt(10) > 0; // 90% success
            String message = success ? "Payment approved" : "Insufficient funds";
            System.out.printf("  [Payment] Customer %s, $%.2f: %s (%s)%n",
                customerId, total, message, txnId);
            if (!success) {
                throw new RuntimeException("Payment failed: " + message);
            }
            return new PaymentResult(txnId, true, message);
        }, IO_POOL);
    }

    static CompletableFuture<ShippingResult> createShipment(Order order) {
        return CompletableFuture.supplyAsync(() -> {
            sleep(200);
            String tracking = "SHIP-" + UUID.randomUUID().toString().substring(0, 8).toUpperCase();
            System.out.printf("  [Shipping] Order %s: tracking %s%n", order.orderId(), tracking);
            return new ShippingResult(tracking, "3-5 business days");
        }, IO_POOL);
    }

    static CompletableFuture<Void> sendEmail(String to, String subject) {
        return CompletableFuture.runAsync(() -> {
            sleep(100);
            System.out.printf("  [Email] To: %s | Subject: %s%n", to, subject);
        }, IO_POOL);
    }

    static CompletableFuture<Void> sendSms(String to, String message) {
        return CompletableFuture.runAsync(() -> {
            sleep(50);
            System.out.printf("  [SMS] To: %s | Message: %s%n", to, message);
        }, IO_POOL);
    }

    // --- Main processing pipeline ---

    static CompletableFuture<String> processOrder(Order order) {
        System.out.println("\nProcessing order: " + order.orderId());
        long startTime = System.currentTimeMillis();

        // Step 1: Check inventory for ALL items in PARALLEL
        System.out.println("Step 1: Checking inventory...");
        List<CompletableFuture<InventoryResult>> inventoryChecks = order.items().stream()
            .map(OrderProcessingSystem::checkInventory)
            .collect(Collectors.toList());

        CompletableFuture<List<InventoryResult>> allInventory = CompletableFuture
            .allOf(inventoryChecks.toArray(new CompletableFuture[0]))
            .thenApply(v -> inventoryChecks.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList()));

        // Step 2: Verify all items available, then process payment
        return allInventory
            .thenCompose(inventoryResults -> {
                // Check if any item is out of stock
                List<String> outOfStock = inventoryResults.stream()
                    .filter(r -> !r.available())
                    .map(InventoryResult::productId)
                    .collect(Collectors.toList());

                if (!outOfStock.isEmpty()) {
                    CompletableFuture<PaymentResult> failed = new CompletableFuture<>();
                    failed.completeExceptionally(
                        new RuntimeException("Items out of stock: " + outOfStock));
                    return failed;
                }

                // Calculate total
                double total = order.items().stream()
                    .mapToDouble(item -> item.price() * item.quantity())
                    .sum();

                System.out.println("Step 2: Processing payment ($" + String.format("%.2f", total) + ")...");
                return processPayment(order.customerId(), total);
            })
            // Step 3: Create shipment
            .thenCompose(payment -> {
                System.out.println("Step 3: Creating shipment...");
                return createShipment(order)
                    .thenApply(shipping -> Map.of(
                        "payment", payment,
                        "shipping", shipping
                    ));
            })
            // Step 4: Send notifications in PARALLEL
            .thenCompose(results -> {
                System.out.println("Step 4: Sending notifications...");
                PaymentResult payment = (PaymentResult) results.get("payment");
                ShippingResult shipping = (ShippingResult) results.get("shipping");

                CompletableFuture<Void> email = sendEmail(
                    order.customerId() + "@example.com",
                    "Order " + order.orderId() + " confirmed! Tracking: " + shipping.trackingNumber()
                );
                CompletableFuture<Void> sms = sendSms(
                    order.customerId(),
                    "Order confirmed! Track: " + shipping.trackingNumber()
                );

                return CompletableFuture.allOf(email, sms)
                    .thenApply(v -> String.format(
                        "Order %s COMPLETE | Payment: %s | Tracking: %s | Delivery: %s",
                        order.orderId(), payment.transactionId(),
                        shipping.trackingNumber(), shipping.estimatedDelivery()
                    ));
            })
            // Handle errors
            .exceptionally(ex -> {
                String error = "Order " + order.orderId() + " FAILED: " + ex.getMessage();
                System.out.println("  [ERROR] " + error);
                // Send failure notification
                sendEmail(order.customerId() + "@example.com",
                    "Order " + order.orderId() + " could not be processed").join();
                return error;
            })
            .whenComplete((result, ex) -> {
                long elapsed = System.currentTimeMillis() - startTime;
                System.out.println("  [Timer] Order " + order.orderId() + " took " + elapsed + "ms");
            });
    }

    public static void main(String[] args) {
        System.out.println("====================================");
        System.out.println("  Order Processing System");
        System.out.println("====================================");

        // Create sample orders
        Order order1 = new Order("ORD-001", "CUST-100", List.of(
            new OrderItem("PROD-1", "Laptop", 999.99, 1),
            new OrderItem("PROD-2", "Mouse", 29.99, 2),
            new OrderItem("PROD-3", "Keyboard", 79.99, 1)
        ));

        Order order2 = new Order("ORD-002", "CUST-200", List.of(
            new OrderItem("PROD-4", "Monitor", 399.99, 1),
            new OrderItem("PROD-5", "Headset", 149.99, 1)
        ));

        Order order3 = new Order("ORD-003", "CUST-300", List.of(
            new OrderItem("PROD-6", "Webcam", 89.99, 1)
        ));

        // Process ALL orders in PARALLEL
        List<Order> orders = List.of(order1, order2, order3);
        long totalStart = System.currentTimeMillis();

        List<CompletableFuture<String>> orderFutures = orders.stream()
            .map(OrderProcessingSystem::processOrder)
            .collect(Collectors.toList());

        // Wait for all orders to complete
        CompletableFuture.allOf(orderFutures.toArray(new CompletableFuture[0])).join();

        // Print final results
        System.out.println("\n====================================");
        System.out.println("  FINAL RESULTS");
        System.out.println("====================================");

        for (CompletableFuture<String> future : orderFutures) {
            System.out.println("  " + future.join());
        }

        long totalElapsed = System.currentTimeMillis() - totalStart;
        System.out.println("\nTotal processing time: " + totalElapsed + "ms");
        System.out.println("(All " + orders.size() + " orders processed in parallel)");

        // Summary of concepts used
        System.out.println("\n====================================");
        System.out.println("  Concepts Demonstrated");
        System.out.println("====================================");
        System.out.println("  1.  supplyAsync()     - Async computation with result");
        System.out.println("  2.  runAsync()        - Async tasks with no result");
        System.out.println("  3.  Custom executor   - Dedicated I/O thread pool");
        System.out.println("  4.  thenApply()       - Transform results");
        System.out.println("  5.  thenCompose()     - Chain dependent async operations");
        System.out.println("  6.  thenCombine()     - (shown in earlier examples)");
        System.out.println("  7.  allOf()           - Wait for all parallel tasks");
        System.out.println("  8.  exceptionally()   - Error recovery with fallback");
        System.out.println("  9.  whenComplete()    - Logging/timing side effects");
        System.out.println("  10. completeExceptionally() - Manual failure");
        System.out.println("  11. completedFuture() - (shown in earlier examples)");
        System.out.println("  12. Streams           - Collecting future results");

        IO_POOL.shutdown();
    }

    static void sleep(long ms) {
        try { Thread.sleep(ms); } catch (InterruptedException e) { throw new RuntimeException(e); }
    }
}

// Sample Output:
// ====================================
//   Order Processing System
// ====================================
//
// Processing order: ORD-001
// Step 1: Checking inventory...
// Processing order: ORD-002
// Step 1: Checking inventory...
// Processing order: ORD-003
// Step 1: Checking inventory...
//   [Inventory] Webcam: AVAILABLE (stock: 45)
//   [Inventory] Mouse: AVAILABLE (stock: 72)
//   [Inventory] Keyboard: AVAILABLE (stock: 38)
//   [Inventory] Headset: AVAILABLE (stock: 91)
//   [Inventory] Monitor: AVAILABLE (stock: 23)
//   [Inventory] Laptop: AVAILABLE (stock: 56)
// Step 2: Processing payment ($89.99)...
// Step 2: Processing payment ($549.98)...
// Step 2: Processing payment ($1139.96)...
//   [Payment] Customer CUST-300, $89.99: Payment approved (TXN-A1B2C3D4)
// Step 3: Creating shipment...
//   [Payment] Customer CUST-200, $549.98: Payment approved (TXN-E5F6G7H8)
// Step 3: Creating shipment...
//   [Payment] Customer CUST-100, $1139.96: Payment approved (TXN-I9J0K1L2)
// Step 3: Creating shipment...
//   [Shipping] Order ORD-003: tracking SHIP-M3N4O5P6
// Step 4: Sending notifications...
//   [Shipping] Order ORD-002: tracking SHIP-Q7R8S9T0
// Step 4: Sending notifications...
//   [SMS] To: CUST-300 | Message: Order confirmed! Track: SHIP-M3N4O5P6
//   [Email] To: CUST-300@example.com | Subject: Order ORD-003 confirmed! Tracking: SHIP-M3N4O5P6
//   [Shipping] Order ORD-001: tracking SHIP-U1V2W3X4
// Step 4: Sending notifications...
//   [Timer] Order ORD-003 took 652ms
//   [SMS] To: CUST-200 | Message: Order confirmed! Track: SHIP-Q7R8S9T0
//   [Email] To: CUST-200@example.com | Subject: Order ORD-002 confirmed! Tracking: SHIP-Q7R8S9T0
//   [Timer] Order ORD-002 took 785ms
//   [SMS] To: CUST-100 | Message: Order confirmed! Track: SHIP-U1V2W3X4
//   [Email] To: CUST-100@example.com | Subject: Order ORD-001 confirmed! Tracking: SHIP-U1V2W3X4
//   [Timer] Order ORD-001 took 892ms
//
// ====================================
//   FINAL RESULTS
// ====================================
//   Order ORD-001 COMPLETE | Payment: TXN-I9J0K1L2 | Tracking: SHIP-U1V2W3X4 | Delivery: 3-5 business days
//   Order ORD-002 COMPLETE | Payment: TXN-E5F6G7H8 | Tracking: SHIP-Q7R8S9T0 | Delivery: 3-5 business days
//   Order ORD-003 COMPLETE | Payment: TXN-A1B2C3D4 | Tracking: SHIP-M3N4O5P6 | Delivery: 3-5 business days
//
// Total processing time: 895ms
// (All 3 orders processed in parallel)

Quick Reference

Category | Method | Description
Creating | supplyAsync(supplier) | Run async, return result
Creating | runAsync(runnable) | Run async, no result
Creating | completedFuture(value) | Already-complete future
Creating | new CompletableFuture() | Manually completable
Transforming | thenApply(fn) | Transform result (like map)
Transforming | thenAccept(consumer) | Consume result (no return)
Transforming | thenRun(runnable) | Run after completion (ignores result)
Composing | thenCompose(fn) | Chain async operations (flatMap)
Composing | thenCombine(other, fn) | Merge two parallel results
Combining | allOf(futures...) | Wait for all to complete
Combining | anyOf(futures...) | First to complete wins
Errors | exceptionally(fn) | Provide fallback on error
Errors | handle(bifn) | Handle both success and error
Errors | whenComplete(biconsumer) | Inspect result/error (no transform)
Getting Result | join() | Block and get result (unchecked exception)
Getting Result | get(timeout, unit) | Block with timeout (checked exception)
Timeout (9+) | orTimeout(time, unit) | Fail with TimeoutException
Timeout (9+) | completeOnTimeout(val, time, unit) | Use default value on timeout

September 24, 2019

Brain Teaser

September 23, 2019