What is Flask?
Flask is a web development framework. It provides libraries for building lightweight web applications in Python. Many consider Flask a micro-framework because it provides only the essential components: routing, request handling, sessions, and so on. It gives you the libraries, tools, and modules to develop web applications such as a blog, a wiki, or even a commercial website. It’s considered “beginner friendly” because it has little boilerplate code and few dependencies that can distract from the primary function of an application. For data handling, for example, the developer can write a custom module or use an extension.
What is Flask used for?
Flask is mainly used for backend development, but it also uses a templating language called Jinja2 to create HTML, XML, or other markup formats that are returned to the user in an HTTP response.
Note that in this series of tutorials, we’ll be using Flask as a backend framework and React as a frontend framework.
The goal of filters is to reduce the number of documents that have to be examined by the query. Queries not only have to find matching documents, but also calculate how relevant each document is, which typically makes queries heavier than filters. Also, query results are not cacheable. Filters are quick to calculate and easy to cache in memory, using only 1 bit per document. These cached filters can be reused efficiently for subsequent requests.
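The "1 bit per document" idea can be pictured with a plain Java BitSet. This is only a conceptual sketch, not how Elasticsearch is implemented internally; the RATINGS array, the CACHE map, and the filter() helper are all hypothetical names made up for illustration.

```java
import java.util.BitSet;
import java.util.HashMap;
import java.util.Map;
import java.util.function.IntPredicate;

public class FilterBitSetSketch {
    // Hypothetical in-memory "index": the rating of the document at each position.
    static final int[] RATINGS = {5, 3, 5, 1, 4, 5};

    // Cache of filter name -> bitset holding one bit per document.
    static final Map<String, BitSet> CACHE = new HashMap<>();

    // Evaluates the filter once; later calls with the same name reuse the cached bitset.
    static BitSet filter(String name, IntPredicate matches) {
        return CACHE.computeIfAbsent(name, key -> {
            BitSet bits = new BitSet(RATINGS.length);
            for (int doc = 0; doc < RATINGS.length; doc++) {
                if (matches.test(RATINGS[doc])) {
                    bits.set(doc); // this document passes the filter
                }
            }
            return bits;
        });
    }

    public static void main(String[] args) {
        BitSet fiveStars = filter("rating=5", rating -> rating == 5);
        BitSet atLeastFour = filter("rating>=4", rating -> rating >= 4);

        // Combining two cached filters is a bitwise AND on a copy.
        BitSet both = (BitSet) fiveStars.clone();
        both.and(atLeastFour);

        System.out.println("rating=5 docs: " + fiveStars);
        System.out.println("both filters: " + both);
        System.out.println("cached filters: " + CACHE.size());
    }
}
```

A filter only answers yes or no per document, which is why a bitset is enough; a scoring query would need a relevance value per document instead.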
When to Use filter vs query?
As a general rule, use query clauses for full-text search or for any condition that should affect the relevance score, and use filter clauses for everything else.
There are two ways to filter search results.
filter clause: Search requests apply boolean filters to both search hits and aggregations.
post_filter parameter: Search requests apply post filters only to search hits, not aggregations. You can use a post filter to calculate aggregations based on a broader result set, and then further narrow the results. A post filter has no impact on the aggregation results.
Term filter
The term filter is used to filter by exact values, be they numbers, dates, Booleans, or exact-value string fields (keyword fields, called not_analyzed strings in older Elasticsearch versions). Here we keep only users whose rating is 5; every other user is filtered out.
GET elasticsearch_learning/_search
{
"query": {
"bool": {
"filter": [
{ "term": { "rating": 5 }}
]
}
}
}
/**
* https://www.elastic.co/guide/en/elasticsearch/reference/current/filter-search-results.html
*/
@Test
void filterQuery() {
int pageNumber = 0;
int pageSize = 5;
SearchRequest searchRequest = new SearchRequest(database);
searchRequest.allowPartialSearchResults(true);
searchRequest.indicesOptions(IndicesOptions.lenientExpandOpen());
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.from(pageNumber * pageSize);
searchSourceBuilder.size(pageSize);
searchSourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS));
/**
* fetch only a few fields
*/
searchSourceBuilder.fetchSource(new String[]{"id", "firstName", "lastName", "rating", "dateOfBirth"}, new String[]{""});
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
boolQuery.filter(QueryBuilders.termQuery("rating", 5));
searchSourceBuilder.query(boolQuery);
searchRequest.source(searchSourceBuilder);
searchRequest.preference("rating");
if (searchSourceBuilder.query() != null && searchSourceBuilder.sorts() != null && searchSourceBuilder.sorts().size() > 0) {
log.info("\n{\n\"query\":{}, \"sort\":{}\n}", searchSourceBuilder.query().toString(), searchSourceBuilder.sorts().toString());
} else {
log.info("\n{\n\"query\":{}\n}", searchSourceBuilder.query().toString());
}
try {
SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
log.info("isTimedOut={}, totalShards={}, totalHits={}", searchResponse.isTimedOut(), searchResponse.getTotalShards(), searchResponse.getHits().getTotalHits().value);
List<User> users = getResponseResult(searchResponse.getHits());
log.info("results={}", ObjectUtils.toJson(users));
} catch (IOException e) {
log.warn("IOException, msg={}", e.getLocalizedMessage());
e.printStackTrace();
} catch (Exception e) {
log.warn("Exception, msg={}", e.getLocalizedMessage());
e.printStackTrace();
}
}
Range filter
The range filter allows you to find numbers or dates that fall into a specified range. Here we keep only users whose rating is 2, 3, or 4.
GET elasticsearch_learning/_search
{
"query":{
"bool" : {
"filter" : [
{
"range" : {
"rating" : {
"gte" : 2,
"lte" : 4
}
}
}
]
}
}
}
/**
* https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-range-query.html
*/
@Test
void filterQueryWithRange() {
int pageNumber = 0;
int pageSize = 5;
SearchRequest searchRequest = new SearchRequest(database);
searchRequest.allowPartialSearchResults(true);
searchRequest.indicesOptions(IndicesOptions.lenientExpandOpen());
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.from(pageNumber * pageSize);
searchSourceBuilder.size(pageSize);
searchSourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS));
/**
* fetch only a few fields
*/
searchSourceBuilder.fetchSource(new String[]{"id", "firstName", "lastName", "rating", "dateOfBirth"}, new String[]{""});
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
boolQuery.filter(QueryBuilders.rangeQuery("rating").gte(2).lte(4));
searchSourceBuilder.query(boolQuery);
searchRequest.source(searchSourceBuilder);
searchRequest.preference("rating");
if (searchSourceBuilder.query() != null && searchSourceBuilder.sorts() != null && searchSourceBuilder.sorts().size() > 0) {
log.info("\n{\n\"query\":{}, \"sort\":{}\n}", searchSourceBuilder.query().toString(), searchSourceBuilder.sorts().toString());
} else {
log.info("\n{\n\"query\":{}\n}", searchSourceBuilder.query().toString());
}
try {
SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
log.info("isTimedOut={}, totalShards={}, totalHits={}", searchResponse.isTimedOut(), searchResponse.getTotalShards(), searchResponse.getHits().getTotalHits().value);
List<User> users = getResponseResult(searchResponse.getHits());
log.info("results={}", ObjectUtils.toJson(users));
} catch (IOException e) {
log.warn("IOException, msg={}", e.getLocalizedMessage());
e.printStackTrace();
} catch (Exception e) {
log.warn("Exception, msg={}", e.getLocalizedMessage());
e.printStackTrace();
}
}
Exists Filter
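The exists filter finds documents in which the specified field has one or more values. Older Elasticsearch versions also had a missing filter for documents in which the field has no value; in current versions you express that by placing an exists query inside a bool must_not clause. The pair is similar in nature to IS NOT NULL (exists) and IS NULL (missing) in SQL.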
{
"exists": {
"field": "name"
}
}
Here we keep only users that have logged into the system, that is, users whose lastLoggedInAt field has a value.
GET elasticsearch_learning/_search
{
"query":{
"bool" : {
"filter" : [
{
"exists" : {
"field" : "lastLoggedInAt"
}
}
]
}
}
}
/**
* https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-exists-query.html
*/
@Test
void filterQueryWithExists() {
int pageNumber = 0;
int pageSize = 5;
SearchRequest searchRequest = new SearchRequest(database);
searchRequest.allowPartialSearchResults(true);
searchRequest.indicesOptions(IndicesOptions.lenientExpandOpen());
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.from(pageNumber * pageSize);
searchSourceBuilder.size(pageSize);
searchSourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS));
/**
* fetch only a few fields
*/
searchSourceBuilder.fetchSource(new String[]{"id", "firstName", "lastName", "rating", "dateOfBirth"}, new String[]{""});
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
boolQuery.filter(QueryBuilders.existsQuery("lastLoggedInAt"));
searchSourceBuilder.query(boolQuery);
searchRequest.source(searchSourceBuilder);
searchRequest.preference("rating");
if (searchSourceBuilder.query() != null && searchSourceBuilder.sorts() != null && searchSourceBuilder.sorts().size() > 0) {
log.info("\n{\n\"query\":{}, \"sort\":{}\n}", searchSourceBuilder.query().toString(), searchSourceBuilder.sorts().toString());
} else {
log.info("\n{\n\"query\":{}\n}", searchSourceBuilder.query().toString());
}
try {
SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
log.info("isTimedOut={}, totalShards={}, totalHits={}", searchResponse.isTimedOut(), searchResponse.getTotalShards(), searchResponse.getHits().getTotalHits().value);
List<User> users = getResponseResult(searchResponse.getHits());
log.info("results={}", ObjectUtils.toJson(users));
} catch (IOException e) {
log.warn("IOException, msg={}", e.getLocalizedMessage());
e.printStackTrace();
} catch (Exception e) {
log.warn("Exception, msg={}", e.getLocalizedMessage());
e.printStackTrace();
}
}
Imagine having to execute a long-running process and wait for it to finish. Most likely you are connected via SSH and can’t do anything else while you wait. If the process takes too long, your connection will time out and you will be kicked out. When that happens, your process stops and you have to rerun it. This is painful, and this is where screen comes into the picture.
For example, you are downloading a big file or dumping a database with the mysqldump command. This usually takes 10 to 20 minutes.
You can use the Linux screen command to push running terminal applications or processes to the background and pull them forward when you want to see them. It also supports split-screen displays and works over SSH connections, even after you disconnect and reconnect.
The usual operation with screen is to create a new window with a shell in it, run a command, and then push the window to the background (called “detaching”). When you want to see how your process is doing, you can pull the window to the foreground again (“reattach”) and use it again. This is great for long processes you don’t want to accidentally terminate by closing the terminal window.
Once you’ve got a screen session running, you can create new windows and run other processes in them. You can easily hop between windows to monitor their progress. You can also split your terminal window into vertical or horizontal regions, and display your various screen windows in one window.
You can connect to a remote machine, start a screen session, and launch a process. You can disconnect from the remote host, reconnect, and your process will still be running.
You can share a screen session between two different SSH connections so two people can see the same thing, in real-time.
Install screen
# ubuntu
sudo apt update
sudo apt install screen
# centos or fedora
sudo yum install screen
Start a screen session
screen
# start screen session with a name
screen -S loading-database
This will open a screen session, create a new window, and start a shell in that window.
Most used commands for a screen window
Ctrl+a c: Create a new window (with shell).
Ctrl+a ": List all windows.
Ctrl+a 0: Switch to window 0 (by number).
Ctrl+a A: Rename the current window.
Ctrl+a S: Split current region horizontally into two regions.
Ctrl+a |: Split current region vertically into two regions.
Ctrl+a tab: Switch the input focus to the next region.
Ctrl+a Ctrl+a: Toggle between the current and previous windows.
Ctrl+a Q: Close all regions but the current one.
Ctrl+a X: Close the current region.
Detach from a screen session
Ctrl+a Ctrl+d
The program running in the screen session will continue to run after you detach from the session.
Reattach to a screen session
# resume the current screen session, assuming there is only one
screen -r
In case you have multiple screen sessions running on your machine, you will need to append the screen session ID after the -r switch.
# list the current running screen sessions
screen -ls
# output
There is a screen on:
30093.pts-0.ip-172-31-8-213 (09/24/21 16:07:23) (Attached)
1 Socket in /run/screen/S-ubuntu.
# command
screen -r 30093
Terminate a screen session
Make sure you are in the screen session and then run the exit command.
exit
Or run one of these commands from outside the screen session:
screen -X -S <screen_id> kill
# or
screen -XS <screen_id> quit
# Example
screen -X -S 30093 kill
# or
screen -XS 30093 quit
When running queries in your production environment, you have to be careful so you don’t bring it down or make a mistake that will cause your environment to act abnormally.
Here are some things you need to be careful about:
MySQL’s READ UNCOMMITTED isolation level is the lowest level of isolation. It is also known as “dirty read” because a query can see changes that other transactions have not committed yet. Setting the isolation level for the session overrides the default isolation level for that session. MySQL’s default isolation level is REPEATABLE READ, which gives every read inside a transaction a consistent snapshot of the data while multiple connections can still read data concurrently. The statements below switch the session to READ UNCOMMITTED for one query and then restore the default:
SET SESSION TRANSACTION ISOLATION LEVEL READ UNCOMMITTED;
SELECT * FROM TABLE_NAME;
SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ;
Do everything you can to run your query against a dev, QA, or backup database before running it against a production database. And when you do run your query against your production database, run it first against backup tables and then against the actual tables. The backup tables act as a safety net: if you make a mistake on the first try, it affects the backup and not the actual live data. Once everything looks good in the backup tables, you can run your query against the actual live data.
Make sure your dev database server runs the same version as your production database server. If the versions have to differ, the production server should be on the higher version. In most cases where the versions differ, it is the dev server that is ahead, so be careful about that. The reason for this check is that a query may work in dev and fail in production because it uses functions that the production server does not support. For example, JSON_ARRAYAGG is not available in older MySQL 5.7 releases (it was added in 5.7.22). My dev server was on MySQL 8 and my production server was on 5.7. Not until release day did I realize my database migration was not going to work in production, and I had to pull it back.
If possible, run insert/update migration scripts during slow or off hours. There is a good chance you will run into issues, especially when updating data based on existing data in the database: while users are actively using and changing that data, the values you read may already be stale by the time your update runs.
You can never go wrong with having help and verifying that your query does what it is intended to do. In most cases you work in a team, so have a team member or your team lead look at your query to make sure it does what is intended. You can’t take the risk of running a query that may change thousands of records without having someone else review it.
Imagine ordering food at a restaurant. You walk up to the counter, place your order, and the cashier hands you a buzzer. That buzzer is a promise: “Your food will be ready at some point. Go sit down, check your phone, chat with friends — when it’s done, the buzzer will vibrate and you can pick it up.” You are not standing at the counter blocking everyone behind you. You are free to do other things while your order is being prepared.
CompletableFuture is that buzzer. It represents a future result of an asynchronous computation — a value that will be available at some point. It was introduced in Java 8 (in the java.util.concurrent package) and is the most powerful tool Java offers for writing non-blocking, asynchronous code.
Before CompletableFuture, Java had two main approaches to concurrent programming — both with significant limitations:
Raw Threads: You create a Thread, override run(), and call start(). But run() returns void — there is no built-in way to get a result back. You end up sharing mutable state, using wait()/notify(), and debugging race conditions at 2 AM.
ExecutorService + Future: Better. You submit a Callable to an ExecutorService and get a Future<T> back. But Future has a fatal flaw: the only way to get the result is to call get(), which blocks the calling thread. You cannot attach a callback. You cannot chain operations. You cannot combine multiple futures. You are back to blocking.
CompletableFuture solves all of these problems:
Non-blocking callbacks: attach a callback that runs when the result is ready, so there is no need to call get() and block.
Chaining and combining: build pipelines of dependent steps and merge the results of several futures.
Exception handling: deal with failures inside the pipeline, unlike Future which wraps everything in ExecutionException.
Manual completion: create a CompletableFuture and complete it yourself from any thread, which is why it’s called “completable.”
| Feature | Future | CompletableFuture |
|---|---|---|
| Get result | get() (blocks) | get(), join(), or non-blocking callbacks |
| Attach callback | Not supported | thenApply(), thenAccept(), thenRun() |
| Chain operations | Not supported | thenCompose(), thenApply() |
| Combine futures | Not supported | thenCombine(), allOf(), anyOf() |
| Exception handling | ExecutionException wrapper | exceptionally(), handle(), whenComplete() |
| Manually complete | Not supported | complete(), completeExceptionally() |
| Cancel | cancel() (limited) | cancel() (does not interrupt running tasks) |
Think of Future as a read-only receipt that says “your result will be here eventually, keep checking.” CompletableFuture is a full-featured event system: “When the result arrives, here’s what I want you to do with it.”
import java.util.concurrent.*;
public class FutureVsCompletableFuture {
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newFixedThreadPool(2);
// OLD WAY: Future -- blocks on get()
Future<String> future = executor.submit(() -> {
Thread.sleep(1000);
return "Result from Future";
});
// This blocks the current thread for ~1 second
String result = future.get();
System.out.println(result);
// Output: Result from Future
// NEW WAY: CompletableFuture -- non-blocking callbacks
CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
try { Thread.sleep(1000); } catch (InterruptedException e) { throw new RuntimeException(e); }
return "Result from CompletableFuture";
});
// Non-blocking! This runs on a different thread when the result is ready
cf.thenAccept(r -> System.out.println(r));
// Output (after ~1 second): Result from CompletableFuture
// Keep the program alive long enough for async operations to complete
Thread.sleep(2000);
executor.shutdown();
}
}
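Manual completion, mentioned in the feature list above, has no counterpart in the Future example. A minimal sketch of it follows, assuming a hypothetical lookupGreeting() method that adapts hand-written thread code to a CompletableFuture; the method name and messages are illustrative only.

```java
import java.util.concurrent.CompletableFuture;

public class ManualCompletionExample {
    // Hypothetical helper: creates an incomplete future and completes it from another thread.
    static CompletableFuture<String> lookupGreeting(String name) {
        CompletableFuture<String> promise = new CompletableFuture<>();
        Thread worker = new Thread(() -> {
            if (name == null || name.isEmpty()) {
                // Signal failure to whoever is waiting on the future
                promise.completeExceptionally(new IllegalArgumentException("name is required"));
            } else {
                // Signal success with a value
                promise.complete("Hello, " + name);
            }
        });
        worker.setDaemon(true);
        worker.start();
        return promise;
    }

    public static void main(String[] args) {
        System.out.println(lookupGreeting("Alice").join());
        // Output: Hello, Alice

        String fallback = lookupGreeting("")
                .exceptionally(ex -> "Fallback greeting") // recover from the failure
                .join();
        System.out.println(fallback);
        // Output: Fallback greeting
    }
}
```

The caller cannot tell whether the future was produced by supplyAsync() or completed by hand, which makes this pattern handy for wrapping callback-based APIs.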
There are four main ways to create a CompletableFuture, each suited for different situations.
Use supplyAsync() when your asynchronous operation produces a result. It takes a Supplier<T> (a function that takes no arguments and returns a value) and runs it on a background thread.
import java.util.concurrent.CompletableFuture;
public class SupplyAsyncExample {
public static void main(String[] args) {
// supplyAsync runs on ForkJoinPool.commonPool() by default
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println("Running on: " + Thread.currentThread().getName());
// Simulate database query
try { Thread.sleep(500); } catch (InterruptedException e) { throw new RuntimeException(e); }
return "User{id=1, name='Alice'}";
});
// Do other work while the async operation runs
System.out.println("Main thread is free to do other work...");
System.out.println("Main thread: " + Thread.currentThread().getName());
// Get the result (blocks only if not yet complete)
String user = future.join();
System.out.println("Result: " + user);
// Output:
// Main thread is free to do other work...
// Main thread: main
// Running on: ForkJoinPool.commonPool-worker-1
// Result: User{id=1, name='Alice'}
}
}
Use runAsync() when your asynchronous operation does not produce a result — it performs a side effect like logging, sending a notification, or writing to a file. It takes a Runnable and returns CompletableFuture<Void>.
import java.util.concurrent.CompletableFuture;
public class RunAsyncExample {
public static void main(String[] args) {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
System.out.println("Sending email on: " + Thread.currentThread().getName());
try { Thread.sleep(300); } catch (InterruptedException e) { throw new RuntimeException(e); }
System.out.println("Email sent successfully!");
});
System.out.println("Main thread continues...");
// join() returns null for Void futures, but waits for completion
future.join();
System.out.println("Done.");
// Output:
// Main thread continues...
// Sending email on: ForkJoinPool.commonPool-worker-1
// Email sent successfully!
// Done.
}
}
Use completedFuture() when you already have the result and want to wrap it in a CompletableFuture. This is useful for testing, caching, or when a method signature requires a CompletableFuture but you have the value immediately.
import java.util.concurrent.CompletableFuture;
public class CompletedFutureExample {
public static void main(String[] args) {
// Already completed -- no async work happens
CompletableFuture<String> cached = CompletableFuture.completedFuture("Cached Value");
// join() returns immediately -- no waiting
System.out.println(cached.join());
System.out.println("Is done? " + cached.isDone());
// Output:
// Cached Value
// Is done? true
}
// Common use case: method that returns CompletableFuture but sometimes has cached data
static CompletableFuture<String> fetchUser(String userId) {
String cached = getFromCache(userId);
if (cached != null) {
return CompletableFuture.completedFuture(cached); // No async work needed
}
return CompletableFuture.supplyAsync(() -> fetchFromDatabase(userId)); // Async DB call
}
static String getFromCache(String userId) { return "1".equals(userId) ? "Alice" : null; }
static String fetchFromDatabase(String userId) { return "User-" + userId; }
}
By default, supplyAsync() and runAsync() use the ForkJoinPool.commonPool(). This shared thread pool is designed for CPU-bound work. If your async operations involve I/O (database calls, HTTP requests, file operations), you should provide a custom executor to avoid starving the common pool.
This is one of the most important production considerations. The common pool has a limited number of threads (typically Runtime.getRuntime().availableProcessors() - 1). If you fill it with slow I/O operations, all CompletableFuture operations in your entire application slow down — including parallel streams.
import java.util.concurrent.*;
public class CustomExecutorExample {
// Dedicated thread pool for I/O operations
private static final ExecutorService IO_EXECUTOR = Executors.newFixedThreadPool(10, r -> {
Thread t = new Thread(r);
t.setDaemon(true); // Won't prevent JVM shutdown
t.setName("io-worker-" + t.getId());
return t;
});
public static void main(String[] args) {
// Pass custom executor as second argument
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println("Running on: " + Thread.currentThread().getName());
try { Thread.sleep(200); } catch (InterruptedException e) { throw new RuntimeException(e); }
return "Data from database";
}, IO_EXECUTOR); // <-- Custom executor
CompletableFuture<Void> logFuture = CompletableFuture.runAsync(() -> {
System.out.println("Logging on: " + Thread.currentThread().getName());
}, IO_EXECUTOR); // <-- Same custom executor
future.thenAccept(data -> System.out.println("Received: " + data));
logFuture.join();
future.join();
// Output:
// Running on: io-worker-21
// Logging on: io-worker-22
// Received: Data from database
IO_EXECUTOR.shutdown();
}
}
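To see how small the common pool is on a given machine, a quick check like the following can help. It is only a diagnostic sketch; the class and method names are made up, and the reported numbers depend on the hardware and on any JVM settings that override the default.

```java
import java.util.concurrent.ForkJoinPool;

public class CommonPoolSizeCheck {
    // Target parallelism of the shared common pool
    static int commonPoolParallelism() {
        return ForkJoinPool.getCommonPoolParallelism();
    }

    public static void main(String[] args) {
        int processors = Runtime.getRuntime().availableProcessors();
        System.out.println("Available processors: " + processors);
        System.out.println("Common pool parallelism: " + commonPoolParallelism());
    }
}
```

If the second number is small, a handful of blocking I/O tasks is enough to occupy every common pool thread, which is the starvation scenario described above.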
| Method | Returns | Input | Use When |
|---|---|---|---|
| supplyAsync(supplier) | CompletableFuture<T> | Supplier<T> | Async operation produces a result |
| supplyAsync(supplier, executor) | CompletableFuture<T> | Supplier<T> | Same, with custom thread pool |
| runAsync(runnable) | CompletableFuture<Void> | Runnable | Async operation with no result (side effects) |
| runAsync(runnable, executor) | CompletableFuture<Void> | Runnable | Same, with custom thread pool |
| completedFuture(value) | CompletableFuture<T> | T | Already have the value (caching, testing) |
At some point, you need to extract the actual result from a CompletableFuture. Java provides several ways to do this, each with different trade-offs.
Both join() and get() block the calling thread until the result is available. The key difference is how they handle exceptions:
| Method | Exception Type | Requires try-catch? | Preferred? |
|---|---|---|---|
| join() | CompletionException (unchecked) | No | Yes: cleaner code, works in streams |
| get() | ExecutionException + InterruptedException (checked) | Yes | Only when you need timeout |
import java.util.concurrent.*;
public class JoinVsGetExample {
public static void main(String[] args) {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello");
// join() -- clean, no checked exceptions
String result1 = future.join();
System.out.println("join: " + result1);
// Output: join: Hello
// get() -- requires try-catch for checked exceptions
try {
String result2 = future.get();
System.out.println("get: " + result2);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
// Output: get: Hello
// get() with timeout -- useful to prevent indefinite blocking
CompletableFuture<String> slowFuture = CompletableFuture.supplyAsync(() -> {
try { Thread.sleep(5000); } catch (InterruptedException e) { throw new RuntimeException(e); }
return "Slow result";
});
try {
String result3 = slowFuture.get(1, TimeUnit.SECONDS); // Wait max 1 second
} catch (TimeoutException e) {
System.out.println("Timed out! The operation took too long.");
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
// Output: Timed out! The operation took too long.
}
}
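The example above only covers the success path. The sketch below shows what the exception difference in the table looks like when the async task fails; the failing() helper and its error message are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

public class JoinVsGetFailureExample {
    // Hypothetical async task that always fails
    static CompletableFuture<String> failing() {
        return CompletableFuture.<String>supplyAsync(() -> {
            throw new IllegalStateException("database unavailable");
        });
    }

    // join() reports the failure as an unchecked CompletionException
    static String describeJoin() {
        try {
            return failing().join();
        } catch (CompletionException e) {
            return e.getClass().getSimpleName() + " caused by " + e.getCause().getMessage();
        }
    }

    // get() reports the failure as a checked ExecutionException
    static String describeGet() throws InterruptedException {
        try {
            return failing().get();
        } catch (ExecutionException e) {
            return e.getClass().getSimpleName() + " caused by " + e.getCause().getMessage();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(describeJoin());
        // Output: CompletionException caused by database unavailable
        System.out.println(describeGet());
        // Output: ExecutionException caused by database unavailable
    }
}
```

In both cases the original exception is available through getCause(), so error handling code can inspect the real failure regardless of which method was used.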
getNow(defaultValue) returns the result immediately if it is already complete, or returns the default value if it is not yet done. This never blocks.
import java.util.concurrent.CompletableFuture;
public class GetNowExample {
public static void main(String[] args) throws InterruptedException {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try { Thread.sleep(1000); } catch (InterruptedException e) { throw new RuntimeException(e); }
return "Computed Value";
});
// Not done yet -- returns default
String immediate = future.getNow("Default Value");
System.out.println("Immediate: " + immediate);
// Output: Immediate: Default Value
// Wait for completion
Thread.sleep(1500);
// Now done -- returns actual result
String completed = future.getNow("Default Value");
System.out.println("After wait: " + completed);
// Output: After wait: Computed Value
}
}
The real power of CompletableFuture comes from its ability to chain transformations. When an async operation completes, you can automatically transform, consume, or follow up on the result without blocking. These three methods are the workhorses of CompletableFuture pipelines.
thenApply() is like map() in the Stream API. It takes the result, transforms it, and returns a new CompletableFuture with the transformed value. Use it when you want to convert the result from one type to another.
import java.util.concurrent.CompletableFuture;
public class ThenApplyExample {
public static void main(String[] args) {
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> " Hello, CompletableFuture! ")
.thenApply(s -> s.trim()) // Remove whitespace
.thenApply(s -> s.toUpperCase()) // Convert to uppercase
.thenApply(s -> s + " [processed]"); // Append suffix
System.out.println(future.join());
// Output: HELLO, COMPLETABLEFUTURE! [processed]
// Practical: Fetch user ID -> Fetch user -> Extract name
CompletableFuture<String> userName = CompletableFuture
.supplyAsync(() -> getUserId()) // Returns Integer
.thenApply(id -> fetchUser(id)) // Integer -> User (String)
.thenApply(user -> extractName(user)); // User -> Name (String)
System.out.println("User: " + userName.join());
// Output: User: Alice
}
static int getUserId() { return 42; }
static String fetchUser(int id) { return "User{id=" + id + ", name=Alice}"; }
static String extractName(String user) { return "Alice"; }
}
thenAccept() takes the result and does something with it but returns nothing (CompletableFuture<Void>). Use it as the final step in a pipeline when you want to perform a side effect like printing, logging, or saving to a database.
import java.util.concurrent.CompletableFuture;
public class ThenAcceptExample {
public static void main(String[] args) {
CompletableFuture<Void> future = CompletableFuture
.supplyAsync(() -> fetchOrderTotal())
.thenApply(total -> total * 1.08) // Add 8% tax
.thenAccept(total -> // Consume: print invoice
System.out.printf("Invoice Total: $%.2f%n", total)
);
future.join();
// Output: Invoice Total: $108.00
}
static double fetchOrderTotal() { return 100.00; }
}
thenRun() takes a Runnable — it does not receive the result at all. Use it when you want to run an action after the previous stage completes, but you do not need the result. Common for cleanup tasks, notifications, or logging that a process finished.
import java.util.concurrent.CompletableFuture;
public class ThenRunExample {
public static void main(String[] args) {
CompletableFuture<Void> future = CompletableFuture
.supplyAsync(() -> {
System.out.println("Processing payment...");
try { Thread.sleep(500); } catch (InterruptedException e) { throw new RuntimeException(e); }
return "Payment Confirmed";
})
.thenAccept(result -> System.out.println("Result: " + result))
.thenRun(() -> System.out.println("Audit log: payment processing completed."))
.thenRun(() -> System.out.println("Cleanup: releasing resources."));
future.join();
// Output:
// Processing payment...
// Result: Payment Confirmed
// Audit log: payment processing completed.
// Cleanup: releasing resources.
}
}
| Method | Input | Return | Functional Interface | Use When |
|---|---|---|---|---|
| thenApply(fn) | Previous result | New value | Function<T, U> | Transform the result (map) |
| thenAccept(consumer) | Previous result | Void | Consumer<T> | Consume the result (side effect) |
| thenRun(action) | Nothing | Void | Runnable | Run action after completion (ignore result) |
A simple way to remember: thenApply = I need the result and return something new. thenAccept = I need the result but return nothing. thenRun = I don’t need the result at all.
Sometimes one async operation depends on the result of another. For example: fetch a user ID, then use that ID to fetch the user’s orders. This is where thenCompose() and thenCombine() come in.
thenCompose() is like flatMap() in the Stream API. When the function you pass to thenApply() itself returns a CompletableFuture, you end up with a nested CompletableFuture<CompletableFuture<T>>. thenCompose() flattens this into a single CompletableFuture<T>.
Use it when: step B is itself an async operation that depends on the result of step A.
import java.util.concurrent.CompletableFuture;
import java.util.List;
public class ThenComposeExample {
public static void main(String[] args) {
// BAD: thenApply with async function creates nested CompletableFuture
CompletableFuture<CompletableFuture<List<String>>> nested =
getUserIdAsync()
.thenApply(userId -> getOrdersAsync(userId)); // Returns CF<CF<List<String>>>!
// GOOD: thenCompose flattens the nesting
CompletableFuture<List<String>> flat =
getUserIdAsync()
.thenCompose(userId -> getOrdersAsync(userId)); // Returns CF<List<String>>
System.out.println("Orders: " + flat.join());
// Output: Orders: [Order-1001, Order-1002, Order-1003]
// Chain multiple dependent async operations
CompletableFuture<String> pipeline = getUserIdAsync()
.thenCompose(userId -> getOrdersAsync(userId))
.thenCompose(orders -> calculateTotalAsync(orders))
.thenApply(total -> String.format("Total: $%.2f", total));
System.out.println(pipeline.join());
// Output: Total: $299.97
}
static CompletableFuture<Integer> getUserIdAsync() {
return CompletableFuture.supplyAsync(() -> 42);
}
static CompletableFuture<List<String>> getOrdersAsync(int userId) {
return CompletableFuture.supplyAsync(() ->
List.of("Order-1001", "Order-1002", "Order-1003")
);
}
static CompletableFuture<Double> calculateTotalAsync(List<String> orders) {
return CompletableFuture.supplyAsync(() -> orders.size() * 99.99);
}
}
thenCombine() takes two independent CompletableFutures that can run in parallel and combines their results when both are done. Think of it as: “Run A and B simultaneously. When both finish, merge their results.”
import java.util.concurrent.CompletableFuture;
public class ThenCombineExample {
public static void main(String[] args) {
// Two independent async operations -- run in parallel
CompletableFuture<String> userFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("Fetching user on: " + Thread.currentThread().getName());
sleep(1000);
return "Alice";
});
CompletableFuture<Double> balanceFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("Fetching balance on: " + Thread.currentThread().getName());
sleep(800);
return 1500.75;
});
// Combine results when BOTH are complete
CompletableFuture<String> combined = userFuture.thenCombine(
balanceFuture,
(user, balance) -> String.format("%s has a balance of $%.2f", user, balance)
);
System.out.println(combined.join());
// Output: Alice has a balance of $1500.75
// Total time: ~1000ms (parallel), not 1800ms (sequential)
}
static void sleep(long ms) {
try { Thread.sleep(ms); } catch (InterruptedException e) { throw new RuntimeException(e); }
}
}
| Scenario | Method | Why |
|---|---|---|
| Transform result synchronously (e.g., String -> Integer) | thenApply() | The function returns a plain value |
| Chain to another async operation (e.g., userId -> fetchOrders(userId)) | thenCompose() | The function returns a CompletableFuture |
| Combine two independent futures | thenCombine() | Both futures run in parallel, merge results |
Rule of thumb: If your lambda returns a CompletableFuture, use thenCompose(). If it returns a plain value, use thenApply(). This is exactly the same distinction as map() vs flatMap() in streams.
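To make the map() vs flatMap() analogy concrete, here is a small sketch that puts the stream pair next to the future pair. The class and method names are invented for this illustration, and completed futures stand in for real async calls.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class MapVsFlatMapSketch {

    // Stream.map(): the function returns a container, so the result is nested
    static List<List<Integer>> nestedStream() {
        return Stream.of(1, 2)
            .map(n -> List.of(n, n * 10))
            .collect(Collectors.toList());
    }

    // Stream.flatMap(): the nesting is removed
    static List<Integer> flatStream() {
        return Stream.of(1, 2)
            .flatMap(n -> Stream.of(n, n * 10))
            .collect(Collectors.toList());
    }

    // thenApply(): the function returns a future, so the result is a future of a future
    static CompletableFuture<CompletableFuture<Integer>> nestedFuture() {
        return CompletableFuture.completedFuture(2)
            .thenApply(n -> CompletableFuture.completedFuture(n * 10));
    }

    // thenCompose(): the nesting is removed
    static CompletableFuture<Integer> flatFuture() {
        return CompletableFuture.completedFuture(2)
            .thenCompose(n -> CompletableFuture.completedFuture(n * 10));
    }

    public static void main(String[] args) {
        System.out.println(nestedStream());               // [[1, 10], [2, 20]]
        System.out.println(flatStream());                 // [1, 10, 2, 20]
        System.out.println(nestedFuture().join().join()); // 20 (two joins needed)
        System.out.println(flatFuture().join());          // 20 (one join is enough)
    }
}
```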
Real applications often need to fire off many async operations at once — fetching data from multiple microservices, querying multiple databases, or calling multiple APIs. CompletableFuture provides allOf() and anyOf() for this.
CompletableFuture.allOf() takes an array of CompletableFutures and returns a new CompletableFuture<Void> that completes when all of them are done. Note: it returns Void, so you need to extract the individual results yourself.
import java.util.concurrent.CompletableFuture;
import java.util.List;
import java.util.stream.Collectors;
public class AllOfExample {
public static void main(String[] args) {
long start = System.currentTimeMillis();
// Fire off three independent async calls
CompletableFuture<String> userFuture = CompletableFuture.supplyAsync(() -> {
sleep(1000);
return "Alice";
});
CompletableFuture<List<String>> ordersFuture = CompletableFuture.supplyAsync(() -> {
sleep(1200);
return List.of("Laptop", "Mouse", "Keyboard");
});
CompletableFuture<Double> balanceFuture = CompletableFuture.supplyAsync(() -> {
sleep(800);
return 2500.00;
});
// Wait for ALL to complete
CompletableFuture<Void> allDone = CompletableFuture.allOf(
userFuture, ordersFuture, balanceFuture
);
// When all are done, extract individual results
allDone.join();
String user = userFuture.join(); // Already complete -- returns immediately
List<String> orders = ordersFuture.join();
Double balance = balanceFuture.join();
long elapsed = System.currentTimeMillis() - start;
System.out.println("User: " + user);
System.out.println("Orders: " + orders);
System.out.printf("Balance: $%.2f%n", balance);
System.out.println("Completed in " + elapsed + "ms");
// Output:
// User: Alice
// Orders: [Laptop, Mouse, Keyboard]
// Balance: $2500.00
// Completed in ~1200ms (not 3000ms -- parallel!)
}
static void sleep(long ms) {
try { Thread.sleep(ms); } catch (InterruptedException e) { throw new RuntimeException(e); }
}
}
A common pattern is to fire off a list of async operations and collect all the results into a list.
import java.util.concurrent.CompletableFuture;
import java.util.List;
import java.util.stream.Collectors;
public class AllOfCollectExample {
public static void main(String[] args) {
List<Integer> userIds = List.of(1, 2, 3, 4, 5);
// Fire async call for each user ID
List<CompletableFuture<String>> futures = userIds.stream()
.map(id -> CompletableFuture.supplyAsync(() -> fetchUser(id)))
.collect(Collectors.toList());
// Wait for all and collect results into a list
List<String> users = futures.stream()
.map(CompletableFuture::join) // join() each future
.collect(Collectors.toList());
System.out.println("Users: " + users);
// Output: Users: [User-1, User-2, User-3, User-4, User-5]
}
static String fetchUser(int id) {
sleep(200); // Simulate API call
return "User-" + id;
}
static void sleep(long ms) {
try { Thread.sleep(ms); } catch (InterruptedException e) { throw new RuntimeException(e); }
}
}
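The example above joins each future directly, which blocks the calling thread while it waits. A variant that stays asynchronous can be sketched with allOf(): the hypothetical helper `allAsList` below (not part of the JDK) wraps a list of futures into a single future of a list.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class AllOfToListSketch {

    // Hypothetical helper: turns a list of futures into one future of a list.
    static <T> CompletableFuture<List<T>> allAsList(List<CompletableFuture<T>> futures) {
        return CompletableFuture
            .allOf(futures.toArray(new CompletableFuture[0]))
            // This callback runs only after every future is complete,
            // so the join() calls below return immediately.
            .thenApply(ignored -> futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList()));
    }

    static List<String> demo() {
        List<CompletableFuture<String>> futures = List.of(1, 2, 3).stream()
            .map(id -> CompletableFuture.supplyAsync(() -> "User-" + id))
            .collect(Collectors.toList());
        // The only blocking call is this final join() at the edge of the program
        return allAsList(futures).join();
    }

    public static void main(String[] args) {
        System.out.println(demo());
        // Output: [User-1, User-2, User-3]
    }
}
```

The result list follows the order of the input list, not the order in which the futures happen to finish. If any input future fails, the combined future fails as well.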
CompletableFuture.anyOf() returns a CompletableFuture<Object> that completes as soon as any one of the given futures completes. This is useful for racing multiple data sources (e.g., primary DB vs cache vs backup) or implementing timeout patterns.
import java.util.concurrent.CompletableFuture;
public class AnyOfExample {
public static void main(String[] args) {
// Race multiple data sources -- take whichever responds first
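CompletableFuture<String> primaryDb = CompletableFuture.supplyAsync(() -> {
sleep(2000);
return "Data from Primary DB";
});
CompletableFuture<String> cache = CompletableFuture.supplyAsync(() -> {
sleep(100);
return "Data from Cache";
});
CompletableFuture<String> backupDb = CompletableFuture.supplyAsync(() -> {
sleep(3000);
return "Data from Backup DB";
});
// Returns as soon as the fastest one completes
// Assumed completion of the example: print whichever result arrives first
CompletableFuture<Object> fastest = CompletableFuture.anyOf(primaryDb, cache, backupDb);
System.out.println(fastest.join());
// Output: Data from Cache
}
static void sleep(long ms) {
try { Thread.sleep(ms); } catch (InterruptedException e) { throw new RuntimeException(e); }
}
}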
Exception handling in async code is tricky. If an exception is thrown inside a supplyAsync() lambda, who catches it? There is no surrounding try-catch. The exception is captured by the CompletableFuture and propagated down the chain. CompletableFuture provides three methods for dealing with exceptions.
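Before looking at those methods, it may help to see how a captured exception surfaces when you simply wait for the result. The sketch below (class and method names are illustrative) shows that join() rethrows the failure wrapped in an unchecked CompletionException, while get() wraps it in a checked ExecutionException. In both cases the original exception is available through getCause().

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

class CapturedExceptionSketch {

    // A future whose supplier always throws
    static CompletableFuture<String> failing() {
        return CompletableFuture.supplyAsync(() -> {
            throw new IllegalStateException("boom");
        });
    }

    // join(): unchecked CompletionException wrapping the original exception
    static String viaJoin() {
        try {
            return failing().join();
        } catch (CompletionException e) {
            return "join: CompletionException caused by " + e.getCause().getClass().getSimpleName();
        }
    }

    // get(): checked ExecutionException wrapping the original exception
    static String viaGet() {
        try {
            return failing().get();
        } catch (ExecutionException e) {
            return "get: ExecutionException caused by " + e.getCause().getClass().getSimpleName();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(viaJoin());
        System.out.println(viaGet());
        // Output:
        // join: CompletionException caused by IllegalStateException
        // get: ExecutionException caused by IllegalStateException
    }
}
```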
exceptionally() is like a catch block for async pipelines. If the previous stage fails with an exception, exceptionally() catches it and provides a fallback value. If the previous stage succeeds, exceptionally() is skipped.
import java.util.concurrent.CompletableFuture;
public class ExceptionallyExample {
public static void main(String[] args) {
// Success case -- exceptionally() is skipped
CompletableFuture<String> success = CompletableFuture
.supplyAsync(() -> "Data loaded")
.exceptionally(ex -> "Fallback data");
System.out.println(success.join());
// Output: Data loaded
// Failure case -- exceptionally() catches and recovers
CompletableFuture<String> failure = CompletableFuture
.supplyAsync(() -> {
if (true) throw new RuntimeException("Database is down!");
return "Data loaded";
})
.exceptionally(ex -> {
System.out.println("Caught: " + ex.getMessage());
return "Fallback: cached data";
});
System.out.println(failure.join());
// Output:
// Caught: java.lang.RuntimeException: Database is down!
// Fallback: cached data
// Exception propagation through chains
CompletableFuture<String> chain = CompletableFuture
.supplyAsync(() -> {
throw new RuntimeException("Step 1 failed");
})
.thenApply(result -> {
System.out.println("This never executes");
return result + " -> Step 2";
})
.thenApply(result -> {
System.out.println("This never executes either");
return result + " -> Step 3";
})
.exceptionally(ex -> "Recovered from: " + ex.getCause().getMessage());
System.out.println(chain.join());
// Output: Recovered from: Step 1 failed
// Note: thenApply steps were SKIPPED because an earlier stage failed
}
}
handle() is more general than exceptionally(). It receives both the result and the exception (one of them will be null). It always executes, regardless of success or failure. Use it when you need to transform the result on success AND provide a fallback on failure.
import java.util.concurrent.CompletableFuture;
public class HandleExample {
public static void main(String[] args) {
// handle() always runs -- both parameters are provided
// On success: result = value, exception = null
// On failure: result = null, exception = the exception
CompletableFuture<String> successHandled = CompletableFuture
.supplyAsync(() -> "100")
.handle((result, ex) -> {
if (ex != null) {
return "Error: " + ex.getMessage();
}
return "Parsed: " + Integer.parseInt(result);
});
System.out.println(successHandled.join());
// Output: Parsed: 100
CompletableFuture<String> failureHandled = CompletableFuture
.supplyAsync(() -> {
throw new RuntimeException("Network timeout");
})
.handle((result, ex) -> {
if (ex != null) {
return "Error: " + ex.getCause().getMessage();
}
return "Success: " + result;
});
System.out.println(failureHandled.join());
// Output: Error: Network timeout
// Practical: Parse with fallback
CompletableFuture<Integer> parsed = CompletableFuture
.supplyAsync(() -> "not_a_number")
.handle((result, ex) -> {
try {
return Integer.parseInt(result);
} catch (NumberFormatException e) {
System.out.println("Parse failed, using default");
return 0;
}
});
System.out.println("Value: " + parsed.join());
// Output:
// Parse failed, using default
// Value: 0
}
}
whenComplete() lets you observe the result or exception without modifying it. The original result (or exception) is passed through unchanged. This is ideal for logging or monitoring.
import java.util.concurrent.CompletableFuture;
public class WhenCompleteExample {
public static void main(String[] args) {
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> "Operation result")
.whenComplete((result, ex) -> {
// This is for side effects only (logging, metrics, etc.)
if (ex != null) {
System.out.println("ALERT: Operation failed: " + ex.getMessage());
} else {
System.out.println("LOG: Operation succeeded: " + result);
}
})
.thenApply(result -> result + " [verified]"); // Original result flows through
System.out.println(future.join());
// Output:
// LOG: Operation succeeded: Operation result
// Operation result [verified]
}
}
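The example above covers only the success path. A minimal sketch of the failure path (names are illustrative) shows that whenComplete() observes the exception but does not recover from it: the stage it returns still fails, so join() still throws.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class WhenCompleteFailureSketch {

    static List<String> run() {
        // Synchronized because the callback may run on a pool thread
        List<String> log = Collections.synchronizedList(new ArrayList<>());

        CompletableFuture<String> observed = CompletableFuture
            .<String>supplyAsync(() -> {
                throw new IllegalStateException("disk full");
            })
            // Side effect only: the failure is seen here but not handled
            .whenComplete((result, ex) ->
                log.add("whenComplete saw failure: " + (ex != null)));

        try {
            observed.join();
        } catch (CompletionException e) {
            // The original exception passed through unchanged
            log.add("join still failed: " + e.getCause().getMessage());
        }
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
        // Output:
        // whenComplete saw failure: true
        // join still failed: disk full
    }
}
```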
| Method | Receives | Returns New Value? | Use When |
|---|---|---|---|
| exceptionally(ex) | Exception only | Yes (fallback value) | Recover from failure with a default |
| handle(result, ex) | Result AND exception | Yes (transformed value) | Transform result or recover from failure |
| whenComplete(result, ex) | Result AND exception | No (passes through original) | Side effects: logging, monitoring, cleanup |
Every callback method in CompletableFuture has an async version: thenApplyAsync(), thenAcceptAsync(), thenRunAsync(), thenComposeAsync(), handleAsync(), etc.
| Method | Callback Runs On | Thread Behavior |
|---|---|---|
| thenApply(fn) | Same thread that completed the previous stage, OR the calling thread | No guarantee: may be the async thread or the thread calling thenApply |
| thenApplyAsync(fn) | A thread from the default ForkJoinPool | Always runs on a pool thread |
| thenApplyAsync(fn, executor) | A thread from the specified executor | You control exactly which pool |
import java.util.concurrent.*;
public class AsyncVariantsExample {
private static final ExecutorService IO_POOL = Executors.newFixedThreadPool(4);
public static void main(String[] args) {
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> {
System.out.println("Stage 1 on: " + Thread.currentThread().getName());
return "data";
})
// Non-async: may run on same thread as previous stage
.thenApply(data -> {
System.out.println("Stage 2 (thenApply) on: " + Thread.currentThread().getName());
return data.toUpperCase();
})
// Async: guaranteed to run on ForkJoinPool thread
.thenApplyAsync(data -> {
System.out.println("Stage 3 (thenApplyAsync) on: " + Thread.currentThread().getName());
return data + "!";
})
// Async with custom executor: runs on our IO pool
.thenApplyAsync(data -> {
System.out.println("Stage 4 (thenApplyAsync+executor) on: " + Thread.currentThread().getName());
return data + " [done]";
}, IO_POOL);
System.out.println(future.join());
// Possible output:
// Stage 1 on: ForkJoinPool.commonPool-worker-1
// Stage 2 (thenApply) on: ForkJoinPool.commonPool-worker-1
// Stage 3 (thenApplyAsync) on: ForkJoinPool.commonPool-worker-2
// Stage 4 (thenApplyAsync+executor) on: pool-1-thread-1
// DATA! [done]
IO_POOL.shutdown();
}
}
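The "calling thread" case from the table is easiest to see with a future that is already complete when the callback is attached. In the sketch below (illustrative names), the non-async callback runs on the thread that calls thenApply(), while the async variant is handed off to another thread.

```java
import java.util.concurrent.CompletableFuture;

class CallerThreadSketch {

    // The future is already complete, so thenApply() runs the callback
    // right away on the thread that attaches it.
    static String plainCallbackThread() {
        CompletableFuture<String> done = CompletableFuture.completedFuture("data");
        return done.thenApply(d -> Thread.currentThread().getName()).join();
    }

    // thenApplyAsync() always submits the callback to the default async executor.
    static String asyncCallbackThread() {
        CompletableFuture<String> done = CompletableFuture.completedFuture("data");
        return done.thenApplyAsync(d -> Thread.currentThread().getName()).join();
    }

    public static void main(String[] args) {
        System.out.println("Caller:         " + Thread.currentThread().getName());
        System.out.println("thenApply:      " + plainCallbackThread());
        System.out.println("thenApplyAsync: " + asyncCallbackThread());
        // When run from main, the first two lines show "main";
        // the third typically shows a ForkJoinPool.commonPool worker.
    }
}
```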
- Use thenApply() (non-async) for quick, CPU-light transformations such as parsing a string, extracting a field, or simple formatting. There is no need to pay the overhead of switching threads.
- Use thenApplyAsync() when the callback itself is slow or when you want to ensure it does not run on the calling thread (e.g., in a GUI application where the calling thread is the UI thread).
- Use thenApplyAsync(fn, executor) when the callback involves I/O and you want to use a dedicated I/O thread pool instead of the shared ForkJoinPool.
Now that you understand the individual methods, let us look at patterns you will actually use in production code.
The most common use case: fetch data from multiple services simultaneously and combine the results. This is the bread and butter of microservices backends.
import java.util.concurrent.*;
import java.util.List;
public class ParallelApiCalls {
private static final ExecutorService HTTP_POOL = Executors.newFixedThreadPool(10);
public static void main(String[] args) {
long start = System.currentTimeMillis();
// Fire all three API calls simultaneously
CompletableFuture<String> userFuture = CompletableFuture.supplyAsync(
() -> fetchFromApi("/users/42"), HTTP_POOL
);
CompletableFuture<List<String>> ordersFuture = CompletableFuture.supplyAsync(
() -> fetchOrders(42), HTTP_POOL
);
CompletableFuture<List<String>> recommendationsFuture = CompletableFuture.supplyAsync(
() -> fetchRecommendations(42), HTTP_POOL
);
// Wait for all and combine
CompletableFuture<String> dashboard = CompletableFuture
.allOf(userFuture, ordersFuture, recommendationsFuture)
.thenApply(v -> {
String user = userFuture.join();
List<String> orders = ordersFuture.join();
List<String> recs = recommendationsFuture.join();
return buildDashboard(user, orders, recs);
});
System.out.println(dashboard.join());
long elapsed = System.currentTimeMillis() - start;
System.out.println("Total time: " + elapsed + "ms (parallel, not 3000ms sequential)");
// Output:
// === Dashboard for Alice ===
// Recent Orders: [Laptop, Headphones]
// Recommendations: [Keyboard, Monitor, Mouse Pad]
// Total time: ~1200ms (parallel, not 3000ms sequential)
HTTP_POOL.shutdown();
}
static String fetchFromApi(String endpoint) {
sleep(1000); // Simulate HTTP call
return "Alice";
}
static List<String> fetchOrders(int userId) {
sleep(1200); // Simulate HTTP call
return List.of("Laptop", "Headphones");
}
static List<String> fetchRecommendations(int userId) {
sleep(800); // Simulate HTTP call
return List.of("Keyboard", "Monitor", "Mouse Pad");
}
static String buildDashboard(String user, List<String> orders, List<String> recs) {
return String.format("=== Dashboard for %s ===%nRecent Orders: %s%nRecommendations: %s",
user, orders, recs);
}
static void sleep(long ms) {
try { Thread.sleep(ms); } catch (InterruptedException e) { throw new RuntimeException(e); }
}
}
Java 9 added two very useful methods to CompletableFuture: orTimeout() and completeOnTimeout(). Before Java 9, implementing timeouts required manual scheduling with ScheduledExecutorService.
import java.util.concurrent.*;
public class TimeoutPatterns {
public static void main(String[] args) {
// ===== Java 9+: orTimeout() =====
// Completes exceptionally with TimeoutException if not done in time
CompletableFuture<String> withTimeout = CompletableFuture
.supplyAsync(() -> {
sleep(5000); // Simulates slow service
return "Slow result";
})
.orTimeout(2, TimeUnit.SECONDS)
.exceptionally(ex -> {
// orTimeout() completes this future directly, so ex is the TimeoutException itself, not a wrapper
System.out.println("Timed out: " + ex.getClass().getSimpleName());
return "Default value (timed out)";
});
System.out.println(withTimeout.join());
// Output:
// Timed out: TimeoutException
// Default value (timed out)
// ===== Java 9+: completeOnTimeout() =====
// Completes with a default value if not done in time (no exception)
CompletableFuture<String> withDefault = CompletableFuture
.supplyAsync(() -> {
sleep(5000); // Simulates slow service
return "Slow result";
})
.completeOnTimeout("Fallback value", 1, TimeUnit.SECONDS);
System.out.println(withDefault.join());
// Output: Fallback value
// ===== Pre-Java 9: Manual timeout pattern =====
CompletableFuture<String> manualTimeout = addTimeout(
CompletableFuture.supplyAsync(() -> {
sleep(5000);
return "Slow result";
}),
2, TimeUnit.SECONDS,
"Timeout fallback"
);
System.out.println(manualTimeout.join());
// Output: Timeout fallback
}
// Pre-Java 9 timeout helper
static <T> CompletableFuture<T> addTimeout(
CompletableFuture<T> future, long timeout, TimeUnit unit, T fallback) {
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
scheduler.schedule(() -> future.complete(fallback), timeout, unit);
return future.whenComplete((r, ex) -> scheduler.shutdown());
}
static void sleep(long ms) {
try { Thread.sleep(ms); } catch (InterruptedException e) { throw new RuntimeException(e); }
}
}
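The manual helper above creates a new scheduler for every call. One possible refinement, sketched below, shares a single daemon scheduler thread and fails the future with a TimeoutException instead of supplying a fallback, which is roughly what orTimeout() does. The names `TIMEOUT_SCHEDULER` and `failAfter` are hypothetical, and the sketch uses only Java 8 APIs.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class SharedSchedulerTimeoutSketch {

    // One daemon thread shared by all timeouts (hypothetical name)
    private static final ScheduledExecutorService TIMEOUT_SCHEDULER =
        Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "timeout-scheduler");
            t.setDaemon(true);
            return t;
        });

    // Fails the given future with TimeoutException if it is still pending after the delay
    static <T> CompletableFuture<T> failAfter(CompletableFuture<T> future, long timeout, TimeUnit unit) {
        ScheduledFuture<?> timer = TIMEOUT_SCHEDULER.schedule(
            () -> future.completeExceptionally(new TimeoutException("no result after " + timeout + " " + unit)),
            timeout, unit);
        // Cancel the timer as soon as the future finishes on its own
        future.whenComplete((r, ex) -> timer.cancel(false));
        return future;
    }

    static String demoTimeout() {
        CompletableFuture<String> neverDone = new CompletableFuture<>();
        try {
            return failAfter(neverDone, 200, TimeUnit.MILLISECONDS).join();
        } catch (CompletionException e) {
            return "timed out: " + (e.getCause() instanceof TimeoutException);
        }
    }

    static String demoFast() {
        return failAfter(CompletableFuture.completedFuture("fast"), 200, TimeUnit.MILLISECONDS).join();
    }

    public static void main(String[] args) {
        System.out.println(demoTimeout()); // timed out: true
        System.out.println(demoFast());    // fast
    }
}
```

As with the original helper, timing out only completes the future; it does not stop the work that was supposed to produce the result.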
Network calls fail. APIs return 503. Databases have hiccups. A retry pattern lets you automatically reattempt a failed async operation a certain number of times before giving up.
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
public class RetryPattern {
public static void main(String[] args) {
// Retry up to 3 times
CompletableFuture<String> result = retryAsync(() -> callUnreliableApi(), 3);
System.out.println(result.join());
// Output (varies):
// Attempt 1: calling API...
// Failed: API error, retrying...
// Attempt 2: calling API...
// Attempt 2 succeeded!
// API Response: {status: ok}
}
static int attempt = 0;
static String callUnreliableApi() {
attempt++;
System.out.println("Attempt " + attempt + ": calling API...");
if (attempt < 2) { // Fail first attempt, succeed on second
throw new RuntimeException("API error");
}
System.out.println("Attempt " + attempt + " succeeded!");
return "API Response: {status: ok}";
}
/**
* Retries an async operation up to maxRetries times.
* On each failure, it retries with a new CompletableFuture.
*/
static <T> CompletableFuture<T> retryAsync(Supplier<T> supplier, int maxRetries) {
CompletableFuture<T> future = CompletableFuture.supplyAsync(supplier);
for (int i = 0; i < maxRetries; i++) {
future = future.handle((result, ex) -> {
if (ex == null) {
return CompletableFuture.completedFuture(result);
}
System.out.println("Failed: " + ex.getCause().getMessage() + ", retrying...");
return CompletableFuture.supplyAsync(supplier);
}).thenCompose(f -> f);
}
return future;
}
}
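The version above retries immediately. In practice a short pause between attempts is often preferable, and one way to sketch that is with CompletableFuture.delayedExecutor(), which requires Java 9 or later. The method name `retryWithDelay` and the fixed delay are assumptions made for this illustration; a production version would likely add backoff and a dedicated executor.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class RetryWithDelaySketch {

    // Runs the supplier; on failure waits delayMillis and tries again,
    // up to retriesLeft more times. Nothing blocks while waiting.
    static <T> CompletableFuture<T> retryWithDelay(Supplier<T> supplier, int retriesLeft, long delayMillis) {
        return CompletableFuture.supplyAsync(supplier)
            .handle((result, ex) -> {
                CompletableFuture<T> next;
                if (ex == null) {
                    next = CompletableFuture.completedFuture(result);   // success: pass the value on
                } else if (retriesLeft == 0) {
                    next = CompletableFuture.failedFuture(ex);          // out of retries: pass the failure on
                } else {
                    // delayedExecutor starts the submitted task only after the delay has elapsed
                    Executor later = CompletableFuture.delayedExecutor(delayMillis, TimeUnit.MILLISECONDS);
                    next = CompletableFuture
                        .runAsync(() -> { }, later)
                        .thenCompose(ignored -> retryWithDelay(supplier, retriesLeft - 1, delayMillis));
                }
                return next;
            })
            .thenCompose(next -> next); // flatten CF<CF<T>> into CF<T>
    }

    static String demo() {
        AtomicInteger calls = new AtomicInteger();
        // Simulated flaky call: fails twice, then succeeds
        Supplier<String> flaky = () -> {
            if (calls.incrementAndGet() < 3) {
                throw new IllegalStateException("try again");
            }
            return "ok after " + calls.get() + " calls";
        };
        return retryWithDelay(flaky, 5, 50).join();
    }

    public static void main(String[] args) {
        System.out.println(demo());
        // Output: ok after 3 calls
    }
}
```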
A circuit breaker prevents your application from repeatedly calling a service that is known to be down. After a certain number of failures, the circuit “opens” and subsequent calls fail immediately without attempting the call. After a cool-down period, the circuit “half-opens” to test if the service is back.
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
public class SimpleCircuitBreaker {
enum State { CLOSED, OPEN, HALF_OPEN }
private volatile State state = State.CLOSED;
private final AtomicInteger failureCount = new AtomicInteger(0);
private final AtomicLong lastFailureTime = new AtomicLong(0);
private final int failureThreshold; // failures before opening
private final long cooldownMillis; // wait before half-open
public SimpleCircuitBreaker(int failureThreshold, long cooldownMillis) {
this.failureThreshold = failureThreshold;
this.cooldownMillis = cooldownMillis;
}
public <T> CompletableFuture<T> execute(java.util.function.Supplier<T> supplier) {
if (state == State.OPEN) {
long elapsed = System.currentTimeMillis() - lastFailureTime.get();
if (elapsed > cooldownMillis) {
state = State.HALF_OPEN; // Allow one test call
} else {
return CompletableFuture.failedFuture(
new RuntimeException("Circuit is OPEN -- failing fast")
);
}
}
return CompletableFuture.supplyAsync(supplier)
.handle((result, ex) -> {
if (ex != null) {
int failures = failureCount.incrementAndGet();
lastFailureTime.set(System.currentTimeMillis());
if (failures >= failureThreshold) {
state = State.OPEN;
System.out.println("Circuit OPENED after " + failures + " failures");
}
throw new RuntimeException("Service call failed", ex.getCause());
}
// Success -- reset
failureCount.set(0);
state = State.CLOSED;
return result;
});
}
public static void main(String[] args) throws InterruptedException {
SimpleCircuitBreaker cb = new SimpleCircuitBreaker(3, 2000);
// Simulate 5 calls to a failing service
for (int i = 1; i <= 5; i++) {
final int callNum = i;
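// Explicit <String> is needed: a lambda that only throws gives the compiler no type to infer
CompletableFuture<String> result = cb.<String>execute(() -> {
throw new RuntimeException("Service unavailable");
}).exceptionally(ex -> {
// Failures from handle() arrive wrapped in CompletionException; fail-fast ones do not
Throwable cause = ex.getCause() != null ? ex.getCause() : ex;
return "Call " + callNum + " result: " + cause.getMessage();
});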
System.out.println(result.join());
}
// Output:
// Call 1 result: Service call failed
// Call 2 result: Service call failed
// Circuit OPENED after 3 failures
// Call 3 result: Service call failed
// Call 4 result: Circuit is OPEN -- failing fast (no call attempted!)
// Call 5 result: Circuit is OPEN -- failing fast
}
}
Java has evolved its concurrency tools over the years. Here is how CompletableFuture compares to other approaches.
| Feature | Thread / Runnable | ExecutorService + Future | CompletableFuture | Virtual Threads (Java 21+) |
|---|---|---|---|---|
| Java Version | 1.0 | 1.5 | 1.8 | 21 |
| Return value | None (Runnable) | Yes (Callable + Future) | Yes (supplyAsync) | Yes (Callable + Future) |
| Non-blocking result | No | No (get() blocks) | Yes (callbacks) | Blocking is cheap (virtual) |
| Chaining | Manual thread coordination | Manual (submit next task) | Built-in (thenApply, thenCompose) | Sequential code style |
| Combining | CountDownLatch, join() | invokeAll() | allOf(), anyOf(), thenCombine() | StructuredTaskScope (preview) |
| Exception handling | Thread.UncaughtExceptionHandler | ExecutionException wrapper | exceptionally(), handle() | Standard try-catch |
| Thread cost | ~1MB stack per thread | Pool managed, still OS threads | Pool managed, OS threads | ~KB per virtual thread |
| Best for | Learning, simple background tasks | Task submission, thread pool control | Async pipelines, reactive patterns | High-concurrency I/O (millions of tasks) |
| Code style | Imperative, callback-based | Submit-and-wait | Functional, pipeline-based | Synchronous-looking code |
When to use what:
CompletableFuture for complex pipelines.
These are the bugs and anti-patterns I see most often in production code using CompletableFuture. Learn them so you can avoid them.
If a CompletableFuture completes exceptionally and you never check or handle the exception, it fails silently. No stack trace, no error message, nothing. Your program continues with missing data and you spend hours debugging.
import java.util.concurrent.CompletableFuture;
public class SilentFailureMistake {
public static void main(String[] args) throws InterruptedException {
// BAD: Exception is swallowed silently
CompletableFuture.supplyAsync(() -> {
throw new RuntimeException("Database connection failed!");
}).thenAccept(result -> {
System.out.println("This never prints, and you will never know why");
});
Thread.sleep(1000);
System.out.println("Program continues -- no error was visible!");
// Output: Program continues -- no error was visible!
// The RuntimeException vanished into thin air!
// GOOD: Always handle exceptions
CompletableFuture.supplyAsync(() -> {
throw new RuntimeException("Database connection failed!");
}).thenAccept(result -> {
System.out.println("Processing: " + result);
}).exceptionally(ex -> {
System.err.println("ERROR: " + ex.getCause().getMessage());
return null; // Required for Void futures
});
Thread.sleep(1000);
// Output: ERROR: Database connection failed!
}
}
Calling get() or join() inside a thenApply() or other callback defeats the purpose of async programming. You are blocking a thread pool thread, potentially causing a deadlock or starving the pool.
import java.util.concurrent.CompletableFuture;
public class BlockingInAsyncMistake {
public static void main(String[] args) {
// BAD: Blocking inside an async callback
CompletableFuture<String> bad = CompletableFuture
.supplyAsync(() -> 42)
.thenApply(userId -> {
// This BLOCKS a ForkJoinPool thread!
CompletableFuture<String> orders = CompletableFuture.supplyAsync(
() -> fetchOrders(userId)
);
return orders.join(); // BLOCKING inside async pipeline -- BAD!
});
// GOOD: Use thenCompose() for dependent async operations
CompletableFuture<String> good = CompletableFuture
.supplyAsync(() -> 42)
.thenCompose(userId -> // Non-blocking chaining
CompletableFuture.supplyAsync(() -> fetchOrders(userId))
);
System.out.println(good.join());
// Output: Orders for user 42
}
static String fetchOrders(int userId) {
return "Orders for user " + userId;
}
}
The common ForkJoinPool is shared across your entire JVM. If you fill it with slow I/O operations (database queries, HTTP calls), all async operations in your application slow down — including parallel streams and other CompletableFuture calls.
import java.util.concurrent.*;
public class CommonPoolMistake {
public static void main(String[] args) {
// BAD: Using common pool for slow I/O
CompletableFuture<String> bad = CompletableFuture.supplyAsync(() -> {
// This slow DB call hogs a common pool thread
sleep(5000);
return "DB result";
}); // Uses ForkJoinPool.commonPool() (default)
// GOOD: Use a dedicated I/O pool
ExecutorService ioPool = Executors.newFixedThreadPool(20);
CompletableFuture<String> good = CompletableFuture.supplyAsync(() -> {
sleep(5000); // Slow DB call on dedicated pool
return "DB result";
}, ioPool); // Uses dedicated I/O pool
System.out.println("Common pool size: " + ForkJoinPool.commonPool().getPoolSize());
// Rule of thumb:
// CPU-bound tasks -> ForkJoinPool (default, cores-1 threads)
// I/O-bound tasks -> dedicated fixed/cached thread pool
good.join();
ioPool.shutdown();
}
static void sleep(long ms) {
try { Thread.sleep(ms); } catch (InterruptedException e) { throw new RuntimeException(e); }
}
}
If you create a CompletableFuture manually with new CompletableFuture<>() and forget to call complete() or completeExceptionally(), any code waiting on it with join() or get() will block forever.
import java.util.concurrent.*;
public class ForgotToCompleteMistake {
public static void main(String[] args) {
// BAD: This future is never completed -- join() blocks forever
CompletableFuture<String> neverCompleted = new CompletableFuture<>();
// neverCompleted.join(); // This would hang indefinitely!
// GOOD: Always complete manually-created futures
CompletableFuture<String> manual = new CompletableFuture<>();
// Complete it from another thread
CompletableFuture.runAsync(() -> {
try {
String result = doSomeWork();
manual.complete(result); // Success path
} catch (Exception e) {
manual.completeExceptionally(e); // Failure path
}
});
System.out.println(manual.join());
// Output: Work done!
// BEST: Use a timeout to protect against forgotten completions (Java 9+)
CompletableFuture<String> safe = new CompletableFuture<>();
safe.orTimeout(5, TimeUnit.SECONDS); // Completes exceptionally with TimeoutException after 5 seconds
}
static String doSomeWork() {
return "Work done!";
}
}
Methods such as thenApply() and exceptionally() never modify the future they are called on; each one returns a new CompletableFuture. If you do not capture the return value, the callback is still registered, but you lose the reference to the new stage, and with it the transformed or recovered result.
import java.util.concurrent.CompletableFuture;
public class IgnoreReturnValueMistake {
public static void main(String[] args) {
CompletableFuture<String> original = CompletableFuture.supplyAsync(() -> {
throw new RuntimeException("Oops");
});
// BAD: exceptionally() returns a NEW future -- you're ignoring it!
original.exceptionally(ex -> "Recovered"); // Return value discarded
// If you join the ORIGINAL future, it still has the exception!
try {
original.join(); // Still throws!
} catch (Exception e) {
System.out.println("Original still failed: " + e.getCause().getMessage());
}
// GOOD: Capture the new future returned by exceptionally()
CompletableFuture<String> recovered = original.exceptionally(ex -> "Recovered");
System.out.println(recovered.join());
// Output: Recovered
}
}
Follow these guidelines to write reliable, maintainable, and performant CompletableFuture code.
Every CompletableFuture pipeline should end with an exception handler. Use exceptionally() for recovery, handle() for transformation, or whenComplete() for logging. Never let exceptions disappear silently.
Create dedicated thread pools for different types of work. A common pattern is to have separate pools for HTTP calls, database queries, and CPU-bound computation.
import java.util.concurrent.*;
public class ExecutorBestPractice {
// Separate pools for different workload types
private static final ExecutorService HTTP_POOL =
Executors.newFixedThreadPool(20, namedThread("http-worker"));
private static final ExecutorService DB_POOL =
Executors.newFixedThreadPool(10, namedThread("db-worker"));
// CPU-bound work uses the default ForkJoinPool (no custom executor needed)
static ThreadFactory namedThread(String prefix) {
return r -> {
Thread t = new Thread(r);
t.setDaemon(true);
t.setName(prefix + "-" + t.getId());
return t;
};
}
public static void main(String[] args) {
CompletableFuture<String> httpResult = CompletableFuture
.supplyAsync(() -> callExternalApi(), HTTP_POOL)
.thenApplyAsync(json -> parseResponse(json)) // CPU-bound: default pool
.thenApplyAsync(data -> saveToDb(data), DB_POOL) // I/O: DB pool
.exceptionally(ex -> {
System.err.println("Pipeline failed: " + ex.getCause().getMessage());
return "Error";
});
System.out.println(httpResult.join());
// Output: Saved: {parsed: api-response}
HTTP_POOL.shutdown();
DB_POOL.shutdown();
}
static String callExternalApi() { return "api-response"; }
static String parseResponse(String json) { return "{parsed: " + json + "}"; }
static String saveToDb(String data) { return "Saved: " + data; }
}
When one async operation depends on another, use thenCompose() instead of nesting CompletableFutures inside thenApply(). This keeps the pipeline flat and avoids blocking.
When you have multiple independent operations, fire them all at once with allOf() instead of running them sequentially. This reduces total latency from the sum of all operations to the duration of the slowest one.
Never call join(), get(), or Thread.sleep() inside a callback (thenApply, thenAccept, etc.). These block pool threads and can lead to thread starvation or deadlocks. Use thenCompose() for chaining and thenCombine() for combining.
Always set timeouts on operations that depend on external services. On Java 9+, use orTimeout() or completeOnTimeout(). On Java 8, use a ScheduledExecutorService to complete the future after a delay.
Use custom ThreadFactory implementations that give descriptive names to threads. When you look at a thread dump or log output, http-worker-23 is far more useful than pool-1-thread-23.
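A naming factory along these lines might look like the sketch below. It numbers threads per pool with a counter instead of using the global thread ID, so names start at 1 for every prefix. The `named` helper and the class name are illustrative, not a standard API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

class NamedThreadFactorySketch {

    // Produces daemon threads named prefix-1, prefix-2, ...
    static ThreadFactory named(String prefix) {
        AtomicInteger counter = new AtomicInteger(1);
        return runnable -> {
            Thread t = new Thread(runnable, prefix + "-" + counter.getAndIncrement());
            t.setDaemon(true);
            return t;
        };
    }

    // Runs one task on a named pool and reports which thread executed it
    static String demo() {
        ExecutorService pool = Executors.newFixedThreadPool(2, named("http-worker"));
        try {
            return CompletableFuture
                .supplyAsync(() -> Thread.currentThread().getName(), pool)
                .join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
        // Output: http-worker-1
    }
}
```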
| # | Practice | Do | Don’t |
|---|---|---|---|
| 1 | Exception handling | End every pipeline with exceptionally() or handle() | Let exceptions vanish silently |
| 2 | Thread pools | Use dedicated pools for I/O work | Use common ForkJoinPool for database/HTTP calls |
| 3 | Chaining | Use thenCompose() for dependent async ops | Call join() inside thenApply() |
| 4 | Parallelism | Use allOf() for independent operations | Chain independent operations sequentially |
| 5 | Blocking | Use callbacks and composition | Call get() / join() inside callbacks |
| 6 | Timeouts | Always set timeouts on external calls | Trust that services will respond quickly |
| 7 | Thread naming | Use custom ThreadFactory with descriptive names | Use default pool-1-thread-N names |
Let us put everything together with a realistic example. An e-commerce system needs to process an order. This involves calling multiple services — inventory, payment, and notification — combining results, and handling failures gracefully. This example uses every major CompletableFuture feature covered in this tutorial.
import java.util.concurrent.*;
import java.util.List;
import java.util.Map;

/**
 * E-Commerce Order Processing System
 *
 * Demonstrates: supplyAsync, thenApply, thenCompose, thenCombine,
 * allOf, exceptionally, handle, whenComplete, custom executors,
 * timeout pattern, and combining parallel operations.
 */
public class OrderProcessingSystem {

    // Dedicated thread pools for different I/O operations
    private static final ExecutorService INVENTORY_POOL =
        Executors.newFixedThreadPool(5, namedThread("inventory"));
    private static final ExecutorService PAYMENT_POOL =
        Executors.newFixedThreadPool(5, namedThread("payment"));
    private static final ExecutorService NOTIFICATION_POOL =
        Executors.newFixedThreadPool(3, namedThread("notification"));

    // ===================== Service Simulations =====================

    /** Check if all items are in stock */
    static CompletableFuture<Boolean> checkInventory(String orderId, List<String> items) {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("[" + Thread.currentThread().getName() + "] Checking inventory for " + orderId);
            sleep(800); // Simulate DB call
            System.out.println(" Inventory check passed: all items in stock");
            return true; // All items available
        }, INVENTORY_POOL);
    }

    /** Reserve items in inventory */
    static CompletableFuture<String> reserveItems(String orderId, List<String> items) {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("[" + Thread.currentThread().getName() + "] Reserving items for " + orderId);
            sleep(500);
            String reservationId = "RES-" + orderId.hashCode();
            System.out.println(" Items reserved: " + reservationId);
            return reservationId;
        }, INVENTORY_POOL);
    }

    /** Process payment */
    static CompletableFuture<String> processPayment(String orderId, double amount) {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("[" + Thread.currentThread().getName() + "] Processing payment: $" + amount);
            sleep(1200); // Simulate payment gateway call
            String transactionId = "TXN-" + System.currentTimeMillis();
            System.out.println(" Payment successful: " + transactionId);
            return transactionId;
        }, PAYMENT_POOL);
    }

    /** Send confirmation email (fire-and-forget) */
    static CompletableFuture<Void> sendConfirmationEmail(String orderId, String email) {
        return CompletableFuture.runAsync(() -> {
            System.out.println("[" + Thread.currentThread().getName() + "] Sending email to " + email);
            sleep(600);
            System.out.println(" Email sent for order " + orderId);
        }, NOTIFICATION_POOL);
    }

    /** Send SMS notification */
    static CompletableFuture<Void> sendSmsNotification(String orderId, String phone) {
        return CompletableFuture.runAsync(() -> {
            System.out.println("[" + Thread.currentThread().getName() + "] Sending SMS to " + phone);
            sleep(400);
            System.out.println(" SMS sent for order " + orderId);
        }, NOTIFICATION_POOL);
    }

    /** Calculate shipping estimate (async, depends on items) */
    static CompletableFuture<String> calculateShipping(List<String> items) {
        // No executor is passed here, so this task runs on the common ForkJoinPool
        return CompletableFuture.supplyAsync(() -> {
            sleep(300);
            return "3-5 business days";
        });
    }

    // ===================== Order Processing Pipeline =====================
static CompletableFuture
| # | Concept | Where Used |
|---|---|---|
| 1 | supplyAsync(supplier, executor) | Inventory and payment methods run on dedicated thread pools |
| 2 | runAsync(runnable, executor) | Email and SMS notifications (no return value) |
| 3 | thenCompose() | Inventory check -> reserve + payment (dependent chain) |
| 4 | thenCombine() | Merging reservation + payment + shipping results |
| 5 | allOf() | Waiting for both notifications to finish |
| 6 | thenAccept() | Triggering notifications after order confirmed |
| 7 | exceptionally() | Inventory timeout fallback, notification error handling |
| 8 | whenComplete() | Logging order outcome (success or failure) with timing |
| 9 | orTimeout() | 3-second timeout on inventory check (Java 9+) |
| 10 | failedFuture() | Short-circuit when items are out of stock (Java 9+) |
| 11 | Custom executors | Separate pools for inventory, payment, and notifications |
| 12 | Named threads | namedThread() factory for debugging-friendly names |
| 13 | Parallel execution | Reserve + payment + shipping run simultaneously |
| 14 | Fire-and-forget | Notifications run after order is confirmed |
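To show how several of the concepts in this table fit together, here is a condensed, self-contained sketch. It is not the tutorial's own pipeline: the service methods are simplified stand-ins, a single shared pool replaces the dedicated pools, the failure fallback value `REJECTED` is invented, and shipping is left out. It requires Java 9+ for `orTimeout()`, `failedFuture()`, and `List.of()`.

```java
import java.util.List;
import java.util.concurrent.*;

public class OrderPipelineSketch {
    // Assumption: one small daemon pool stands in for the dedicated pools.
    static final ExecutorService POOL = Executors.newFixedThreadPool(4, r -> {
        Thread t = new Thread(r, "order-worker");
        t.setDaemon(true);
        return t;
    });

    // Simplified stand-ins for the service methods (no simulated latency).
    static CompletableFuture<Boolean> checkInventory(List<String> items) {
        return CompletableFuture.supplyAsync(() -> !items.isEmpty(), POOL);
    }
    static CompletableFuture<String> reserveItems(String orderId) {
        return CompletableFuture.supplyAsync(() -> "RES-" + orderId, POOL);
    }
    static CompletableFuture<String> processPayment(String orderId) {
        return CompletableFuture.supplyAsync(() -> "TXN-" + orderId, POOL);
    }
    static CompletableFuture<Void> notifyCustomer(String channel, String orderId) {
        return CompletableFuture.runAsync(
            () -> System.out.println(channel + " sent for " + orderId), POOL);
    }

    static CompletableFuture<String> processOrder(String orderId, List<String> items) {
        long start = System.currentTimeMillis();
        return checkInventory(items)
            .orTimeout(3, TimeUnit.SECONDS)
            .thenCompose(inStock -> {
                if (!inStock) {
                    // Short-circuit: skip reservation and payment entirely.
                    return CompletableFuture.<String>failedFuture(
                        new IllegalStateException("Out of stock: " + orderId));
                }
                // Start both before combining so they run in parallel.
                CompletableFuture<String> reservation = reserveItems(orderId);
                CompletableFuture<String> payment = processPayment(orderId);
                return reservation.thenCombine(payment, (res, txn) -> res + "/" + txn);
            })
            .thenCompose(confirmation ->
                // A failed notification must not fail the order itself.
                CompletableFuture.allOf(
                        notifyCustomer("Email", orderId).exceptionally(ex -> null),
                        notifyCustomer("SMS", orderId).exceptionally(ex -> null))
                    .thenApply(ignored -> confirmation))
            .whenComplete((result, error) -> System.out.println(
                (error == null ? "OK " : "FAILED ") + orderId + " in "
                    + (System.currentTimeMillis() - start) + " ms"))
            .exceptionally(ex -> "REJECTED");
    }

    public static void main(String[] args) {
        System.out.println(processOrder("A1", List.of("book")).join());
        System.out.println(processOrder("A2", List.of()).join());
    }
}
```

Placing `whenComplete()` before the final `exceptionally()` lets the log line see the real failure before the fallback value replaces it.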
| Category | Method | Description |
|---|---|---|
| Create | supplyAsync(supplier) | Run async task that returns a value |
| Create | runAsync(runnable) | Run async task with no return value |
| Create | completedFuture(value) | Create already-completed future |
| Get Result | join() | Get result (unchecked exception) |
| Get Result | get() / get(timeout, unit) | Get result (checked exception, with optional timeout) |
| Get Result | getNow(default) | Get result if done, else return default |
| Transform | thenApply(fn) | Transform result: T -> U |
| Transform | thenAccept(consumer) | Consume result: T -> void |
| Transform | thenRun(action) | Run action after completion (ignores result) |
| Compose | thenCompose(fn) | Chain dependent async op: T -> CF<U> (flatMap) |
| Compose | thenCombine(other, fn) | Combine two independent futures |
| Multiple | allOf(cf1, cf2, ...) | Wait for all futures to complete |
| Multiple | anyOf(cf1, cf2, ...) | Wait for first future to complete |
| Exceptions | exceptionally(fn) | Catch exception, provide fallback value |
| Exceptions | handle(fn) | Handle result or exception, return new value |
| Exceptions | whenComplete(action) | Inspect result/exception (no modification) |
| Timeout (Java 9+) | orTimeout(timeout, unit) | Complete exceptionally if not done in time |
| Timeout (Java 9+) | completeOnTimeout(value, timeout, unit) | Complete with default if not done in time |
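
A few of the less obvious entries in this reference can be tried out with the short sketch below. The class and method names (`QuickReferenceDemo`, `parseOrDefault`, `peek`, `firstOf`) are hypothetical and exist only for this illustration.

```java
import java.util.concurrent.CompletableFuture;

public class QuickReferenceDemo {
    // handle(): receives either the result or the exception and returns a new value.
    static String parseOrDefault(String raw) {
        return CompletableFuture.supplyAsync(() -> Integer.parseInt(raw))
            .handle((value, ex) -> ex == null ? "parsed " + value : "invalid input")
            .join();
    }

    // getNow(): never blocks; returns the fallback if the future is not done yet.
    static String peek(CompletableFuture<String> future) {
        return future.getNow("not ready");
    }

    // anyOf(): completes with the first available result, typed as Object.
    static Object firstOf(CompletableFuture<?>... futures) {
        return CompletableFuture.anyOf(futures).join();
    }

    public static void main(String[] args) {
        System.out.println(parseOrDefault("42"));                          // prints parsed 42
        System.out.println(parseOrDefault("x"));                           // prints invalid input
        System.out.println(peek(new CompletableFuture<>()));               // prints not ready
        System.out.println(peek(CompletableFuture.completedFuture("ok"))); // prints ok
        System.out.println(firstOf(new CompletableFuture<String>(),
            CompletableFuture.completedFuture("fast")));                   // prints fast
    }
}
```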