Since Java 8, the Stream API has been one of the most powerful tools in your toolkit. You can filter, map, flatMap, reduce, and collect your way through most data-processing tasks with clean, declarative code. But if you have spent enough time with streams, you have inevitably hit a wall: there is no way to define your own intermediate operations.
Think about it. Java gives you Collector as an extension point for terminal operations — you can write custom collectors that fold, group, partition, or summarize data in any way you want. But for intermediate operations? You are stuck with what the API provides. If the built-in map(), filter(), flatMap(), distinct(), sorted(), peek(), limit(), skip(), and takeWhile() do not cover your use case, you have to break out of the stream pipeline, materialize into a collection, manipulate it imperatively, and then stream it again. That defeats the entire point.
Consider an analogy: imagine you are building an assembly line in a factory. Java gave you the ability to customize the packaging station at the end (collectors), but it locked down every station in the middle of the line. Want a station that groups items into batches of five? Want one that computes a running average and passes it along? Want one that deduplicates consecutive items? You had to hack around the limitation or abandon the assembly line entirely.
Java 25 changes this with Stream Gatherers (JEP 485). Gatherers are the missing counterpart to collectors — they let you define custom intermediate operations that plug directly into a stream pipeline. A gatherer can transform elements one-to-one, one-to-many, many-to-one, or many-to-many. It can carry state across elements. It can short-circuit to stop processing early. It can even support parallel execution. And just like collectors, Java ships several built-in gatherers that handle common use cases out of the box.
Stream Gatherers were previewed in Java 22 (JEP 461) and Java 23 (JEP 473), and finalized without changes in Java 24 (JEP 485), making them a standard feature in Java 25 LTS. This post covers everything you need to know: the interface anatomy, all five built-in gatherers, how to write your own, and real-world patterns that will change how you think about stream pipelines.
To appreciate why gatherers matter, let us look at things you cannot do cleanly with the existing Stream API — operations that require state, context, or structural transformation between elements.
Suppose you have a stream of stock prices and you want to compute a 3-day moving average. You need to look at three consecutive elements at a time, slide one position forward, and produce an average for each window. There is no built-in stream operation for this. Before gatherers, your options were:
// The ugly workaround: materialize, index, and re-stream
List<Double> prices = List.of(100.0, 102.5, 101.0, 105.0, 103.5, 107.0);

List<Double> movingAverages = IntStream.range(0, prices.size() - 2)
    .mapToObj(i -> (prices.get(i) + prices.get(i + 1) + prices.get(i + 2)) / 3.0)
    .toList();
// Requires random access -- cannot work with a true stream
This only works because you materialized the data into a List first. With a true stream (say, reading from a socket or a database cursor), you cannot index into it. You need an intermediate operation that remembers previous elements.
The built-in distinct() removes all duplicates across the entire stream, but what if you only want to remove consecutive duplicates? For example, turning [1, 1, 2, 2, 2, 3, 1, 1] into [1, 2, 3, 1]. There is no built-in operation for this. You need state — specifically, you need to remember the last element you emitted.
You have a stream of records and you want to group them into batches of 100 for bulk database inserts. The stream might have 10,000 elements, and you need to emit 100 lists of 100. This is a many-to-many transformation that requires an accumulator, and the built-in API has nothing for it.
You want a running total: given [1, 2, 3, 4], produce [1, 3, 6, 10]. The reduce() operation produces a single value, not a stream of intermediate results. You would have to use an external mutable variable (which violates the stream contract) or fall back to imperative code.
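The external-mutable-variable workaround looks like the sketch below. The single-element array exists only to smuggle mutable state past the effectively-final rule; it happens to work sequentially, but it is exactly the side-effecting lambda the Stream documentation warns against, and it breaks silently on parallel streams.

```java
import java.util.List;
import java.util.stream.Stream;

// The pre-gatherer anti-pattern: an external mutable accumulator.
double[] total = {0.0};
List<Double> runningTotals = Stream.of(1.0, 2.0, 3.0, 4.0)
    .map(x -> {
        total[0] += x;       // side effect inside map() -- violates the stream contract
        return total[0];
    })
    .toList();
System.out.println(runningTotals);  // [1.0, 3.0, 6.0, 10.0] -- only correct sequentially
```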
Collectors are powerful, but they are terminal operations. They consume the entire stream and produce a single result. They cannot sit in the middle of a pipeline and emit elements downstream. Gatherers fill exactly this gap — they are intermediate operations that can carry state, transform structure, and emit zero or more elements per input, all while remaining composable with the rest of the pipeline.
| Limitation | Before Gatherers | With Gatherers |
|---|---|---|
| Sliding window | Materialize to list, index manually | Gatherers.windowSliding(n) |
| Fixed-size batches | Collect all, then partition | Gatherers.windowFixed(n) |
| Running total | External mutable variable (unsafe) | Gatherers.scan(init, fn) |
| Fold to single value (intermediate) | Not possible in pipeline | Gatherers.fold(init, fn) |
| Concurrent mapping with limit | Custom thread pool + futures | Gatherers.mapConcurrent(n, fn) |
| Custom stateful logic | Break pipeline, write imperative code | stream.gather(myGatherer) |
A gatherer is defined by the java.util.stream.Gatherer<T, A, R> interface. If you have worked with Collector<T, A, R>, the shape will feel familiar, but there are important differences. Let us break down the type parameters and the four functions that make up a gatherer.
| Parameter | Meaning | Collector Equivalent |
|---|---|---|
| T | Type of input elements consumed from upstream | Same — input type |
| A | Type of the mutable state object (private, per-gatherer) | Same — accumulator type |
| R | Type of output elements emitted downstream | Different — in Collector, R is the final result type, not a stream element type |
The critical insight: a Collector’s R is the single result you get back (like a List or a Map). A Gatherer’s R is the element type of the output stream. The gatherer sits in the middle of the pipeline and produces a new stream.
Every gatherer is composed of four functions. Only the integrator is required; the other three are optional:
| Function | Type | Required? | Purpose |
|---|---|---|---|
| initializer() | Supplier<A> | Optional | Creates the private mutable state object. Called once per stream evaluation. If omitted, the gatherer is stateless. |
| integrator() | Integrator<A, T, R> | Required | The core logic. Called once per input element. Receives the state, the current element, and a Downstream handle to push output elements. Returns boolean: true to continue, false to short-circuit. |
| combiner() | BinaryOperator<A> | Optional | Merges two state objects when running in parallel. Without this, the gatherer runs sequentially even on a parallel stream. |
| finisher() | BiConsumer<A, Downstream<? super R>> | Optional | Called after all input elements have been processed. Can emit final elements downstream. Useful for flushing buffered state. |
The Downstream<R> object is how a gatherer emits elements to the next stage in the pipeline. It has two key methods:
public interface Downstream<R> {

    // Push an element downstream. Returns true if more elements are accepted,
    // false if the downstream is done (e.g., a short-circuiting terminal op).
    boolean push(R element);

    // Check if the downstream is rejecting further elements.
    boolean isRejecting();
}
This is one of the key differences from Collector. A collector’s accumulator is a BiConsumer — it just consumes, with no feedback. A gatherer’s integrator gets feedback from downstream via the return value of push(). This enables short-circuiting: if the downstream says “I am done,” the gatherer can stop processing immediately.
The Integrator interface comes in two flavors:
// Standard integrator -- may short-circuit (return false)
Integrator.of((state, element, downstream) -> {
// Process element, optionally push to downstream
// Return false to stop processing early
return true;
});
// Greedy integrator -- promises never to short-circuit on its own
// The stream runtime can optimize based on this guarantee
Integrator.ofGreedy((state, element, downstream) ->
    // Still returns a boolean (the result of push), but promises
    // never to return false on its own to stop processing early
    downstream.push(transform(element))
);
Use Integrator.ofGreedy() when your gatherer always processes all elements (like a mapping or filtering gatherer). Use Integrator.of() when your gatherer might need to stop early (like a “take first N matching” gatherer).
| Aspect | Collector<T, A, R> | Gatherer<T, A, R> |
|---|---|---|
| Pipeline position | Terminal (end) | Intermediate (middle) |
| Output | Single result of type R | Stream of elements of type R |
| Accumulator / Integrator | BiConsumer<A, T> — no feedback | Integrator<A, T, R> — returns boolean, has Downstream |
| Finisher | Function<A, R> — returns the result | BiConsumer<A, Downstream> — pushes to stream |
| Short-circuiting | Not supported | Supported via integrator return value |
| Composability | Not directly composable | Composable via andThen() |
Java ships five built-in gatherers in the java.util.stream.Gatherers utility class. These cover the most commonly requested operations that were impossible with the old API. Let us go through each one with concrete examples.
fold() is a many-to-one gatherer. It works like reduce(), but as an intermediate operation that emits a single result element into the downstream when all input is consumed. Think of it as “reduce, but keep going with the pipeline.”
Signature:
static <T, R> Gatherer<T, ?, R> fold(
    Supplier<R> initial,
    BiFunction<? super R, ? super T, ? extends R> folder
)
How it works: The initial supplier creates the starting value (the identity). The folder function takes the current accumulated value and the next input element, and returns the new accumulated value. After all input elements are processed, the final accumulated value is emitted downstream as a single element.
Example: Join strings with a semicolon delimiter
String result = Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9)
.gather(
Gatherers.fold(
() -> "",
(accumulated, element) -> {
if (accumulated.isEmpty()) return element.toString();
return accumulated + ";" + element;
}
)
)
.findFirst()
.get();
System.out.println(result);
// Output: 1;2;3;4;5;6;7;8;9
“Wait, can’t I just use Collectors.joining(";")?” Yes, for this specific case. But fold() is an intermediate operation — the result flows into the rest of the pipeline. You could chain more gatherers or operations after it. With a collector, the pipeline ends.
Example: Sum as intermediate operation, then continue processing
// Fold to compute sum, then map the result, then collect
List<String> result = Stream.of(10, 20, 30)
    .gather(Gatherers.fold(() -> 0, Integer::sum))
    .map(sum -> "Total: " + sum)
    .toList();
System.out.println(result);
// Output: [Total: 60]
scan() is a one-to-one stateful gatherer. It produces a running accumulation — for each input element, it emits the current accumulated value. If you are familiar with functional programming, this is the classic “prefix scan” or “cumulative fold.”
Signature:
static <T, R> Gatherer<T, ?, R> scan(
    Supplier<R> initial,
    BiFunction<? super R, ? super T, ? extends R> scanner
)
How it works: For each input element, the scanner function is applied to the current state and the element. The result becomes both the new state and the output element pushed downstream.
Example: Running sum (prefix sum)
Stream.of(1, 2, 3, 4, 5)
.gather(Gatherers.scan(() -> 0, Integer::sum))
.forEach(System.out::println);
// Output:
// 1
// 3
// 6
// 10
// 15
Notice the output has the same number of elements as the input — that is the one-to-one nature of scan(). Each output is the cumulative result up to that point.
Example: Running sum starting from a seed value
Stream.of(1, 2, 3, 4, 5)
.gather(Gatherers.scan(() -> 100, (current, next) -> current + next))
.forEach(System.out::println);
// Output:
// 101
// 103
// 106
// 110
// 115
Example: Running maximum
Stream.of(3, 1, 4, 1, 5, 9, 2, 6)
.gather(Gatherers.scan(() -> Integer.MIN_VALUE, Integer::max))
.forEach(System.out::println);
// Output:
// 3
// 3
// 4
// 4
// 5
// 9
// 9
// 9
windowFixed() is a many-to-many gatherer. It groups input elements into fixed-size lists (batches). When the window is full, it is emitted downstream as a List. The last window may contain fewer elements if the stream size is not evenly divisible.
Signature:
static <T> Gatherer<T, ?, List<T>> windowFixed(int windowSize)
Example: Batch elements into groups of 3
Stream.of(1, 2, 3, 4, 5, 6, 7, 8)
.gather(Gatherers.windowFixed(3))
.forEach(System.out::println);
// Output:
// [1, 2, 3]
// [4, 5, 6]
// [7, 8]
Notice how the last window [7, 8] has only two elements — it emits whatever is left when the stream ends.
Example: Bulk database inserts in batches of 100
records.stream()
.gather(Gatherers.windowFixed(100))
.forEach(batch -> {
jdbcTemplate.batchUpdate(
"INSERT INTO orders (id, amount) VALUES (?, ?)",
batch.stream()
.map(order -> new Object[]{order.id(), order.amount()})
.toList()
);
System.out.println("Inserted batch of " + batch.size());
});
windowSliding() is a many-to-many gatherer that creates overlapping windows. Each window contains windowSize elements, and the window slides forward by one position for each new element. This is exactly what you need for moving averages, n-gram generation, and similar sliding-window algorithms.
Signature:
static <T> Gatherer<T, ?, List<T>> windowSliding(int windowSize)
Example: Sliding windows of size 3
Stream.of(1, 2, 3, 4, 5, 6, 7, 8)
.gather(Gatherers.windowSliding(3))
.forEach(System.out::println);
// Output:
// [1, 2, 3]
// [2, 3, 4]
// [3, 4, 5]
// [4, 5, 6]
// [5, 6, 7]
// [6, 7, 8]
Each window overlaps with the previous one by windowSize - 1 elements. The input stream of 8 elements produces 6 windows of size 3.
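One edge case is worth knowing: per the Gatherers.windowSliding javadoc, a stream with fewer elements than the window size produces a single window containing all of its elements, rather than producing nothing. A quick sketch:

```java
import java.util.stream.Gatherers;
import java.util.stream.Stream;

// Only 2 elements, window size 3: one undersized window is emitted
var windows = Stream.of(1, 2)
    .gather(Gatherers.windowSliding(3))
    .toList();
System.out.println(windows);  // [[1, 2]]
```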
Example: 3-day moving average of stock prices
List<Double> prices = List.of(100.0, 102.5, 101.0, 105.0, 103.5, 107.0, 106.0);

prices.stream()
    .gather(Gatherers.windowSliding(3))
    .map(window -> window.stream()
        .mapToDouble(Double::doubleValue)
        .average()
        .orElse(0.0))
    .forEach(avg -> System.out.printf("%.2f%n", avg));
// Output:
// 101.17
// 102.83
// 103.17
// 105.17
// 105.50
mapConcurrent() is a one-to-one gatherer that applies a mapping function concurrently, up to a specified concurrency limit. This is incredibly useful for I/O-bound operations where you want to parallelize the mapping without converting the entire stream to parallel (which would parallelize everything, including non-I/O stages).
Signature:
static <T, R> Gatherer<T, ?, R> mapConcurrent(
    int maxConcurrency,
    Function<? super T, ? extends R> mapper
)
Key properties:
- Each call to the mapper runs in its own virtual thread
- At most maxConcurrency simultaneous invocations are in flight at any time
- Results are emitted downstream in the original encounter order
- If the pipeline short-circuits or a mapper fails, remaining tasks are interrupted

Example: Fetch URLs with concurrency limit of 5
List<String> urls = List.of(
    "https://api.example.com/users/1",
    "https://api.example.com/users/2",
    "https://api.example.com/users/3",
    "https://api.example.com/users/4",
    "https://api.example.com/users/5",
    "https://api.example.com/users/6",
    "https://api.example.com/users/7",
    "https://api.example.com/users/8"
);

List<String> responses = urls.stream()
    .gather(Gatherers.mapConcurrent(5, url -> {
        // This runs concurrently, up to 5 at a time
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder()
            .uri(URI.create(url))
            .build();
        try {
            return client.send(request, HttpResponse.BodyHandlers.ofString())
                .body();
        } catch (Exception e) {
            return "Error: " + e.getMessage();
        }
    }))
    .toList();
// All 8 URLs are fetched, max 5 at a time, results in original order
Before mapConcurrent(), achieving this required manually managing a thread pool, submitting CompletableFuture tasks, collecting results, and handling ordering yourself. Now it is one line in a stream pipeline.
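The ordering and throughput behavior is easy to observe without real network calls. A self-contained sketch (the 100 ms sleep simulates I/O latency; the result-N strings are illustrative):

```java
import java.time.Duration;
import java.util.List;
import java.util.stream.Gatherers;
import java.util.stream.Stream;

long start = System.currentTimeMillis();
List<String> results = Stream.of(1, 2, 3, 4, 5, 6, 7, 8)
    .gather(Gatherers.mapConcurrent(4, id -> {
        try {
            Thread.sleep(Duration.ofMillis(100));  // simulated I/O latency
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "result-" + id;
    }))
    .toList();
long elapsed = System.currentTimeMillis() - start;

// Encounter order is preserved despite concurrent execution
System.out.println(results);
// 8 tasks at concurrency 4: roughly two "waves" of ~100 ms, not 800 ms
System.out.println("Elapsed ~" + elapsed + " ms");
```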
The gather() method is the new intermediate operation on Stream that accepts a Gatherer. It sits alongside map(), filter(), and flatMap() in the pipeline.
Signature:
// On the Stream interface:
<R> Stream<R> gather(Gatherer<? super T, ?, R> gatherer)
You can use gather() anywhere you would use any other intermediate operation. You can chain multiple gather() calls, mix them with map() and filter(), and end with any terminal operation.
Example: Chaining gatherers with standard operations
List<Double> result = Stream.of(10.0, 20.5, 15.3, 30.0, 25.7,
                                18.2, 22.1, 35.0, 28.4, 19.6)
    .filter(value -> value > 15.0)              // Standard filter
    .gather(Gatherers.windowSliding(3))         // Sliding windows of 3
    .map(window -> window.stream()              // Standard map
        .mapToDouble(Double::doubleValue)
        .average()
        .orElse(0.0))
    .gather(Gatherers.scan(() -> 0.0,           // Running sum of averages
        (sum, avg) -> sum + avg))
    .toList();                                  // Terminal operation
System.out.println(result);
Gatherers support composition via the andThen() method. This lets you combine two gatherers into a single gatherer, which can be useful for building reusable, composable transformation pipelines.
// Create composed gatherer: scan then fold
Gatherer<Integer, ?, Integer> scanGatherer = Gatherers.scan(() -> 0, Integer::sum);
Gatherer<Integer, ?, String> foldGatherer = Gatherers.fold(
    () -> "",
    (result, element) -> result.isEmpty() ? element.toString() : result + ";" + element
);

// Compose them: first scan (running sum), then fold (join into string)
Gatherer<Integer, ?, String> composed = scanGatherer.andThen(foldGatherer);

// These are equivalent:
String result1 = Stream.of(1, 2, 3, 4, 5)
    .gather(composed)
    .findFirst().get();

String result2 = Stream.of(1, 2, 3, 4, 5)
    .gather(scanGatherer)
    .gather(foldGatherer)
    .findFirst().get();

System.out.println(result1);                  // Output: 1;3;6;10;15
System.out.println(result1.equals(result2));  // Output: true
When the stream pipeline is executed and encounters a gather() step, the following happens:
1. A Downstream object is created that forwards elements to the next pipeline stage
2. The initializer is called to create the state object
3. The integrator function is retrieved
4. For each input element, integrator.integrate(state, element, downstream) is called
5. If the integrator returns false, processing stops immediately (short-circuit)
6. After the last element, the finisher is called with the final state and downstream

The built-in gatherers cover common cases, but the real power of the API is creating your own. There are two approaches: implementing the Gatherer interface directly, or using the factory methods Gatherer.of() and Gatherer.ofSequential().
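You can observe this lifecycle directly with a small instrumented pass-through gatherer. A sketch (the println calls exist only for tracing): the initializer fires once, the integrator once per element, and the finisher once at the end.

```java
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

// Pass-through gatherer that logs each lifecycle step
Gatherer<String, int[], String> traced = Gatherer.ofSequential(
    () -> { System.out.println("initializer"); return new int[]{0}; },
    Gatherer.Integrator.ofGreedy((state, element, downstream) -> {
        System.out.println("integrate: " + element);
        return downstream.push(element);
    }),
    (state, downstream) -> System.out.println("finisher")
);

List<String> out = Stream.of("a", "b").gather(traced).toList();
System.out.println(out);  // [a, b]
// Printed: initializer, integrate: a, integrate: b, finisher
```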
Let us build a gatherer that removes consecutive duplicate elements — something you cannot do with distinct() (which removes all duplicates globally).
import java.util.*;
import java.util.function.*;
import java.util.stream.Gatherer;

/**
 * A gatherer that removes consecutive duplicates.
 * [1, 1, 2, 2, 2, 3, 1, 1] -> [1, 2, 3, 1]
 */
public class DistinctConsecutive<T> implements Gatherer<T, List<T>, T> {

    @Override
    public Supplier<List<T>> initializer() {
        // State: a single-element list holding the last emitted value
        return () -> new ArrayList<>(1);
    }

    @Override
    public Integrator<List<T>, T, T> integrator() {
        return Integrator.ofGreedy((state, element, downstream) -> {
            if (state.isEmpty() || !Objects.equals(state.getFirst(), element)) {
                state.clear();
                state.add(element);
                return downstream.push(element);
            }
            return true; // Same as last element -- skip it
        });
    }

    // No combiner -- sequential only
    // No finisher -- nothing to flush
}
Usage:
Stream.of(1, 1, 2, 2, 2, 3, 1, 1, 4, 4)
.gather(new DistinctConsecutive<>())
.forEach(System.out::println);
// Output:
// 1
// 2
// 3
// 1
// 4
For simpler gatherers that do not need parallel support, Gatherer.ofSequential() is more concise. Let us build the same consecutive-distinct gatherer using the factory method:
static <T> Gatherer<T, ?, T> distinctConsecutive() {
    return Gatherer.ofSequential(
        // Initializer: mutable container for the last seen element
        () -> new ArrayList<T>(1),
        // Integrator
        Integrator.ofGreedy((state, element, downstream) -> {
            if (state.isEmpty() || !Objects.equals(state.getFirst(), element)) {
                state.clear();
                state.add(element);
                return downstream.push(element);
            }
            return true;
        })
    );
}

// Usage:
Stream.of("a", "a", "b", "b", "c", "a", "a")
    .gather(distinctConsecutive())
    .toList();
// Result: [a, b, c, a]
When you need parallel execution, use Gatherer.of() which accepts all four functions including a combiner:
static <T> Gatherer<T, ?, T> distinctConsecutiveParallel() {
    return Gatherer.of(
        // Initializer
        () -> new ArrayList<T>(1),
        // Integrator
        Integrator.ofGreedy((state, element, downstream) -> {
            if (state.isEmpty() || !Objects.equals(state.getFirst(), element)) {
                state.clear();
                state.add(element);
                return downstream.push(element);
            }
            return true;
        }),
        // Combiner for parallel execution
        (left, right) -> {
            // When merging parallel segments, keep the state from the right segment
            // because it represents the more recent "last seen" element
            return right.isEmpty() ? left : right;
        },
        // Finisher: nothing to flush
        (state, downstream) -> { }
    );
}
| Factory Method | Parameters | Parallel? | Use When |
|---|---|---|---|
| Gatherer.ofSequential(integrator) | Integrator only | No | Stateless, sequential transformation |
| Gatherer.ofSequential(init, integrator) | Initializer + Integrator | No | Stateful, sequential, no flush needed |
| Gatherer.ofSequential(init, integrator, finisher) | Initializer + Integrator + Finisher | No | Stateful, sequential, needs final flush |
| Gatherer.of(integrator) | Integrator only | Yes | Stateless, parallelizable transformation |
| Gatherer.of(init, integrator, combiner, finisher) | All four | Yes | Full-featured, parallelizable gatherer |
Stateful gatherers are where the API truly shines. These are operations that need to remember information across elements — something that was impossible to do correctly in a stream pipeline before gatherers.
A gatherer that computes a running average, emitting the current average after each input element:
static Gatherer<Double, ?, Double> runningAverage() {
    // State: [sum, count]
    return Gatherer.ofSequential(
        () -> new double[]{0.0, 0.0},
        Integrator.ofGreedy((state, element, downstream) -> {
            state[0] += element;  // sum
            state[1] += 1;        // count
            return downstream.push(state[0] / state[1]);
        })
    );
}

// Usage:
Stream.of(10.0, 20.0, 30.0, 40.0, 50.0)
    .gather(runningAverage())
    .forEach(avg -> System.out.printf("%.1f%n", avg));
// Output:
// 10.0
// 15.0
// 20.0
// 25.0
// 30.0
Unlike distinct(), which uses a HashSet internally and removes all duplicates, you might want to deduplicate within a time or count window — for example, suppress duplicate log messages within a batch of 100:
static <T> Gatherer<T, ?, T> deduplicateWithinWindow(int windowSize) {
    return Gatherer.ofSequential(
        () -> new Object[]{new LinkedHashSet<T>(), 0},
        Integrator.ofGreedy((state, element, downstream) -> {
            @SuppressWarnings("unchecked")
            LinkedHashSet<T> seen = (LinkedHashSet<T>) state[0];
            int count = (int) state[1];

            // Reset the window when we hit the limit
            if (count > 0 && count % windowSize == 0) {
                seen.clear();
            }
            if (seen.add(element)) {
                downstream.push(element);
            }
            state[1] = count + 1;
            return true;
        })
    );
}

// Usage: Suppress duplicate log levels within batches of 5
Stream.of("INFO", "WARN", "INFO", "ERROR", "WARN",
          "INFO", "DEBUG", "INFO", "WARN", "ERROR")
    .gather(deduplicateWithinWindow(5))
    .forEach(System.out::println);
// Output (first window of 5 input elements):
// INFO
// WARN
// ERROR
// (second window of 5 input elements -- seen set is cleared):
// INFO
// DEBUG
// WARN
// ERROR
A gatherer that enforces a maximum throughput by introducing delays when elements arrive too quickly:
static <T> Gatherer<T, ?, T> rateLimited(int maxPerSecond) {
    long intervalNanos = 1_000_000_000L / maxPerSecond;
    return Gatherer.ofSequential(
        // State: last emission time in nanos
        () -> new long[]{0L},
        Integrator.ofGreedy((state, element, downstream) -> {
            long now = System.nanoTime();
            long elapsed = now - state[0];
            if (elapsed < intervalNanos) {
                try {
                    Thread.sleep((intervalNanos - elapsed) / 1_000_000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            state[0] = System.nanoTime();
            return downstream.push(element);
        })
    );
}

// Usage: Process API calls at most 10 per second
urls.stream()
    .gather(rateLimited(10))
    .gather(Gatherers.mapConcurrent(5, url -> fetchUrl(url)))
    .forEach(response -> process(response));
Group elements into sessions based on a gap threshold. If two consecutive elements are more than a specified distance apart, start a new session:
record TimestampedEvent(long timestamp, String data) {}
static Gatherer<TimestampedEvent, ?, List<TimestampedEvent>> sessionGrouping(long maxGapMillis) {
return Gatherer.ofSequential(
// State: current session (list of events)
ArrayList::new,
// Integrator: add to current session or start new one
Integrator.ofGreedy((session, event, downstream) -> {
if (!session.isEmpty()) {
long lastTimestamp = session.getLast().timestamp();
if (event.timestamp() - lastTimestamp > maxGapMillis) {
// Gap exceeds threshold -- emit current session, start new one
downstream.push(List.copyOf(session));
session.clear();
}
}
            session.add(event);
            return true;
}),
// Finisher: emit the last session
(session, downstream) -> {
if (!session.isEmpty()) {
downstream.push(List.copyOf(session));
}
}
);
}
// Usage: Group click events into sessions (30-second gap threshold)
List<TimestampedEvent> clicks = List.of(
new TimestampedEvent(1000, "click_home"),
new TimestampedEvent(3000, "click_products"),
new TimestampedEvent(5000, "click_item"),
new TimestampedEvent(60000, "click_home"), // 55s gap -- new session
new TimestampedEvent(62000, "click_cart"),
new TimestampedEvent(63000, "click_checkout")
);
clicks.stream()
.gather(sessionGrouping(30_000))
.forEach(session -> System.out.println("Session: " + session));
// Output:
// Session: [TimestampedEvent[timestamp=1000, data=click_home], ...]
// Session: [TimestampedEvent[timestamp=60000, data=click_home], ...]
A one-to-many gatherer emits multiple output elements for each input element. You might think “that is what flatMap() does” — and you would be right for the stateless case. But gatherers can do stateful one-to-many transformations, which flatMap() cannot.
For each input number, emit both the number itself and the running total so far:
static Gatherer<Integer, ?, String> elementAndRunningTotal() {
    return Gatherer.ofSequential(
        () -> new int[]{0},  // running total
        Integrator.ofGreedy((state, element, downstream) -> {
            state[0] += element;
            downstream.push("Value: " + element);
            return downstream.push("Running total: " + state[0]);
        })
    );
}

Stream.of(10, 20, 30)
    .gather(elementAndRunningTotal())
    .forEach(System.out::println);
// Output:
// Value: 10
// Running total: 10
// Value: 20
// Running total: 30
// Value: 30
// Running total: 60
Given a stream of range objects, expand each into individual integers — but only emit values that are unique across all ranges (stateful dedup during expansion):
record IntRange(int start, int endInclusive) {}
static Gatherer<IntRange, ?, Integer> expandUniqueRanges() {
return Gatherer.ofSequential(
        HashSet<Integer>::new, // track all emitted values globally
Integrator.ofGreedy((seen, range, downstream) -> {
for (int i = range.start(); i <= range.endInclusive(); i++) {
if (seen.add(i)) {
downstream.push(i);
}
            }
            return true;
        })
);
}
Stream.of(
new IntRange(1, 5),
new IntRange(3, 8), // 3, 4, 5 already seen -- only emit 6, 7, 8
new IntRange(7, 10) // 7, 8 already seen -- only emit 9, 10
)
.gather(expandUniqueRanges())
.forEach(System.out::println);
// Output: 1 2 3 4 5 6 7 8 9 10
A gatherer that splits each input line into words, maintaining a word count across all lines:
record NumberedWord(int globalIndex, String word) {}
static Gatherer<String, ?, NumberedWord> tokenize() {
return Gatherer.ofSequential(
() -> new int[]{0}, // global word counter
Integrator.ofGreedy((counter, line, downstream) -> {
String[] words = line.trim().split("\\s+");
for (String word : words) {
if (!word.isEmpty()) {
counter[0]++;
downstream.push(new NumberedWord(counter[0], word));
}
            }
            return true;
        })
);
}
Stream.of("hello world", "foo bar baz", "java streams")
.gather(tokenize())
.forEach(System.out::println);
// Output:
// NumberedWord[globalIndex=1, word=hello]
// NumberedWord[globalIndex=2, word=world]
// NumberedWord[globalIndex=3, word=foo]
// NumberedWord[globalIndex=4, word=bar]
// NumberedWord[globalIndex=5, word=baz]
// NumberedWord[globalIndex=6, word=java]
// NumberedWord[globalIndex=7, word=streams]
Short-circuiting is one of the most powerful capabilities of gatherers. By returning false from the integrator, you tell the stream to stop processing immediately. This enables operations like "take while a condition holds, but with state" -- something that built-in takeWhile() cannot do because it is stateless.
Take elements from the stream until the cumulative sum exceeds a threshold:
static Gatherer<Integer, ?, Integer> takeUntilSumExceeds(int threshold) {
    return Gatherer.ofSequential(
        () -> new int[]{0},  // running sum
        Integrator.of((state, element, downstream) -> {
            state[0] += element;
            if (state[0] > threshold) {
                return false;  // Stop -- sum exceeded threshold
            }
            return downstream.push(element);
        })
    );
}

Stream.of(10, 20, 30, 40, 50, 60)
    .gather(takeUntilSumExceeds(55))
    .forEach(System.out::println);
// Output:
// 10
// 20
// (30 would make sum = 60 > 55, so processing stops)
Take elements until you have seen N distinct values. This combines state (a set of seen values) with short-circuiting:
static <T> Gatherer<T, ?, T> takeNDistinct(int n) {
    return Gatherer.ofSequential(
        HashSet<T>::new,
        Integrator.of((seen, element, downstream) -> {
            seen.add(element);
            downstream.push(element);
            return seen.size() < n;  // Stop when we have N distinct values
        })
    );
}

Stream.of(1, 2, 1, 3, 2, 4, 5, 3, 6)
    .gather(takeNDistinct(4))
    .forEach(System.out::println);
// Output: 1, 2, 1, 3, 2, 4
// Stops after seeing the 4th distinct value (4), but includes all elements up to that point
Skip the first N elements, then take the first one that matches a predicate:
static <T> Gatherer<T, ?, T> firstMatchAfterSkipping(int skip, Predicate<T> predicate) {
    return Gatherer.ofSequential(
        () -> new int[]{0},  // element counter
        Integrator.of((counter, element, downstream) -> {
            counter[0]++;
            if (counter[0] > skip && predicate.test(element)) {
                downstream.push(element);
                return false;  // Found it -- stop
            }
            return true;  // Keep looking
        })
    );
}

Stream.of(2, 4, 6, 7, 8, 9, 10, 11)
    .gather(firstMatchAfterSkipping(3, n -> n % 2 != 0))
    .forEach(System.out::println);
// Output: 7
// Skipped first 3 (2, 4, 6), then found first odd number (7)
The key pattern for short-circuiting: use Integrator.of() (not ofGreedy()) and return false when you want to stop. This works even on infinite streams -- the gatherer will terminate the pipeline when the condition is met.
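To see this on an infinite stream, here is a minimal sketch (the untilSum100 name and the threshold of 100 are illustrative): the integrator returns false once the running sum passes the threshold, which terminates the otherwise endless Stream.iterate pipeline.

```java
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

// Short-circuiting stateful gatherer applied to an infinite stream
Gatherer<Integer, int[], Integer> untilSum100 = Gatherer.ofSequential(
    () -> new int[]{0},  // running sum
    Gatherer.Integrator.of((sum, element, downstream) -> {
        sum[0] += element;
        if (sum[0] > 100) return false;  // stop the infinite stream
        return downstream.push(element);
    })
);

List<Integer> taken = Stream.iterate(1, n -> n + 1)  // 1, 2, 3, ... forever
    .gather(untilSum100)
    .toList();
System.out.println(taken);  // [1, 2, ..., 13] since 1+...+13 = 91 and adding 14 exceeds 100
```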
Since gatherers and collectors share a similar structure, it is important to understand exactly when to use each one. Here is a comprehensive comparison:
| Aspect | Collector<T, A, R> | Gatherer<T, A, R> |
|---|---|---|
| Pipeline position | Terminal -- ends the pipeline | Intermediate -- pipeline continues after it |
| Output | Single result (List, Map, String, etc.) | Stream of zero or more elements |
| Used with | stream.collect(collector) | stream.gather(gatherer) |
| Accumulator / Integrator | BiConsumer<A, T> -- no return value | Integrator<A, T, R> -- returns boolean |
| Downstream access | No -- accumulates into state | Yes -- can push elements to next stage |
| Finisher | Function<A, R> -- transforms state to result | BiConsumer<A, Downstream> -- pushes to stream |
| Short-circuiting | Not supported | Supported via integrator return value |
| Composability | Not composable with other collectors | Composable via andThen() |
| Cardinality | Many-to-one (always produces single result) | Any: 1-to-1, 1-to-many, many-to-1, many-to-many |
| Infinite streams | Cannot handle (never terminates) | Can handle via short-circuiting |
| Use case | Aggregate/summarize data | Transform/reshape data flow |
Use a Collector when:

- You need a single aggregate result at the end of the pipeline (a List, Map, String, or summary)
- You want the rich built-in library: groupingBy, partitioningBy, teeing, joining
- The operation naturally terminates the stream

Use a Gatherer when:

- The transformation sits in the middle of the pipeline and must emit elements downstream
- You need state that spans elements: windows, running totals, sessions, deduplication
- You need short-circuiting, a one-to-many or many-to-many shape, or support for infinite streams
Can a Gatherer replace a Collector? In some cases, yes. Gatherers.fold() is essentially a collector that emits into the stream. But collectors have a richer ecosystem (groupingBy, partitioningBy, teeing, etc.) and produce direct results without needing a terminal operation after them. Use the right tool for the job.
Let us look at real-world scenarios where gatherers solve problems that were painful or impossible with the old Stream API.
A financial application needs to compute a simple moving average (SMA) over a configurable window of price data points:
static Gatherer<Double, ?, Double> movingAverage(int windowSize) {
    // Use windowSliding + map, composed into a single gatherer
    return Gatherers.<Double>windowSliding(windowSize)
        .andThen(Gatherer.ofSequential(
            Integrator.ofGreedy((Void state, List<Double> window,
                                 Gatherer.Downstream<? super Double> downstream) -> {
                double avg = window.stream()
                    .mapToDouble(Double::doubleValue)
                    .average()
                    .orElse(0.0);
                return downstream.push(avg);
            })
        ));
}

// Usage: 5-day SMA
List<Double> closingPrices = List.of(
    150.0, 152.5, 148.0, 155.0, 153.0, 157.5, 160.0, 158.0, 162.0, 165.0
);
closingPrices.stream()
    .gather(movingAverage(5))
    .forEach(sma -> System.out.printf("SMA: %.2f%n", sma));
// Output:
// SMA: 151.70
// SMA: 153.20
// SMA: 154.70
// SMA: 156.70
// SMA: 158.10
// SMA: 160.50
Process large datasets in batches, logging progress after each batch:
record BatchResult<T>(int batchNumber, int batchSize, List<T> items) {}

static <T> Gatherer<T, ?, BatchResult<T>> batchWithProgress(int batchSize) {
    return Gatherer.ofSequential(
        () -> new Object[]{new ArrayList<T>(), 0},  // [buffer, batchCount]
        Integrator.ofGreedy((state, element, downstream) -> {
            @SuppressWarnings("unchecked")
            List<T> buffer = (List<T>) state[0];
            buffer.add(element);
            if (buffer.size() >= batchSize) {
                int batchNum = (int) state[1] + 1;
                state[1] = batchNum;
                downstream.push(new BatchResult<>(batchNum, buffer.size(), List.copyOf(buffer)));
                buffer.clear();
            }
            return true;
        }),
        // Finisher: emit remaining elements as final batch
        (state, downstream) -> {
            @SuppressWarnings("unchecked")
            List<T> buffer = (List<T>) state[0];
            if (!buffer.isEmpty()) {
                int batchNum = (int) state[1] + 1;
                downstream.push(new BatchResult<>(batchNum, buffer.size(), List.copyOf(buffer)));
            }
        }
    );
}

// Usage:
IntStream.rangeClosed(1, 23)
    .boxed()
    .gather(batchWithProgress(5))
    .forEach(batch -> {
        System.out.printf("Processing batch %d (%d items): %s%n",
            batch.batchNumber(), batch.batchSize(), batch.items());
        // Insert into database, send to API, etc.
    });

// Output:
// Processing batch 1 (5 items): [1, 2, 3, 4, 5]
// Processing batch 2 (5 items): [6, 7, 8, 9, 10]
// Processing batch 3 (5 items): [11, 12, 13, 14, 15]
// Processing batch 4 (5 items): [16, 17, 18, 19, 20]
// Processing batch 5 (3 items): [21, 22, 23]
Send a large list of IDs to an API that accepts a maximum of 50 IDs per request, executing requests concurrently with a limit of 3:
record ApiResponse(int status, String body) {}
List<ApiResponse> responses = userIds.stream()
    // Step 1: Chunk IDs into groups of 50
    .gather(Gatherers.windowFixed(50))
    // Step 2: Convert each chunk to a comma-separated query parameter
    .map(chunk -> chunk.stream()
        .map(String::valueOf)
        .collect(Collectors.joining(",")))
    // Step 3: Make concurrent API calls (max 3 at a time)
    .gather(Gatherers.mapConcurrent(3, ids -> {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder()
            .uri(URI.create("https://api.example.com/users?ids=" + ids))
            .build();
        try {
            HttpResponse<String> resp = client.send(request,
                HttpResponse.BodyHandlers.ofString());
            return new ApiResponse(resp.statusCode(), resp.body());
        } catch (Exception e) {
            return new ApiResponse(500, e.getMessage());
        }
    }))
    // Step 4: Filter successful responses
    .filter(resp -> resp.status() == 200)
    .toList();
This entire flow -- chunking, URL construction, concurrent HTTP calls, response filtering -- is expressed as a single, readable stream pipeline. Before gatherers, you would need a loop, a thread pool, a list of futures, and manual result collection.
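For contrast, here is a hedged sketch of the pre-gatherers version of that flow. The chunk size and concurrency limit mirror the pipeline above; fetchChunk is a hypothetical stand-in for the real HTTP call so the example stays self-contained:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class OldStyleBatching {
    // Placeholder for the real API call -- returns a summary of the chunk it got
    static String fetchChunk(List<Long> chunk) {
        return "ok:" + chunk.size();
    }

    public static void main(String[] args) throws Exception {
        List<Long> userIds = new ArrayList<>();
        for (long i = 1; i <= 120; i++) userIds.add(i);

        List<Future<String>> futures = new ArrayList<>();
        // Explicit thread pool caps concurrency at 3
        try (ExecutorService pool = Executors.newFixedThreadPool(3)) {
            // Manual chunking into groups of 50
            for (int i = 0; i < userIds.size(); i += 50) {
                List<Long> chunk = userIds.subList(i, Math.min(i + 50, userIds.size()));
                futures.add(pool.submit(() -> fetchChunk(chunk)));
            }
        } // close() waits for all tasks to finish (ExecutorService is AutoCloseable since Java 19)

        // Manual result collection, preserving submission order
        List<String> results = new ArrayList<>();
        for (Future<String> f : futures) results.add(f.get());
        System.out.println(results);  // [ok:50, ok:50, ok:20]
    }
}
```

Every concern the gatherer pipeline handles declaratively -- chunking, concurrency limiting, ordering, collection -- is spelled out by hand here, which is exactly the boilerplate mapConcurrent() eliminates.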
Group log entries into sessions based on time gaps, useful for analyzing user behavior or debugging:
record LogEntry(Instant timestamp, String level, String message) {}

record LogSession(int sessionId, Duration duration, List<LogEntry> entries) {
    static LogSession from(int id, List<LogEntry> entries) {
        Duration dur = Duration.between(
            entries.getFirst().timestamp(),
            entries.getLast().timestamp()
        );
        return new LogSession(id, dur, entries);
    }
}

static Gatherer<LogEntry, ?, LogSession> groupIntoSessions(Duration maxGap) {
    return Gatherer.ofSequential(
        () -> new Object[]{new ArrayList<LogEntry>(), 0},  // [currentSession, sessionCount]
        Integrator.ofGreedy((state, entry, downstream) -> {
            @SuppressWarnings("unchecked")
            List<LogEntry> current = (List<LogEntry>) state[0];
            if (!current.isEmpty()) {
                Instant lastTime = current.getLast().timestamp();
                if (Duration.between(lastTime, entry.timestamp()).compareTo(maxGap) > 0) {
                    // Gap exceeds threshold -- emit session
                    int sessionId = (int) state[1] + 1;
                    state[1] = sessionId;
                    downstream.push(LogSession.from(sessionId, List.copyOf(current)));
                    current.clear();
                }
            }
            current.add(entry);
            return true;
        }),
        (state, downstream) -> {
            @SuppressWarnings("unchecked")
            List<LogEntry> current = (List<LogEntry>) state[0];
            if (!current.isEmpty()) {
                int sessionId = (int) state[1] + 1;
                downstream.push(LogSession.from(sessionId, List.copyOf(current)));
            }
        }
    );
}

// Usage:
logEntries.stream()
    .sorted(Comparator.comparing(LogEntry::timestamp))
    .gather(groupIntoSessions(Duration.ofMinutes(5)))
    .forEach(session -> System.out.printf(
        "Session %d: %d entries, duration %s%n",
        session.sessionId(), session.entries().size(), session.duration()
    ));
Stream Gatherers are a powerful new tool, and with great power comes the opportunity to misuse it. Here are the guidelines I follow when writing production-quality gatherers.
The state object returned by the initializer is not shared across threads unless you provide a combiner. Each thread segment gets its own state via a fresh initializer() call. However, you must ensure that all mutable state lives inside that per-evaluation object -- never capture shared mutable objects from the enclosing scope:
// BAD: Shared mutable state outside the gatherer
List<String> sharedList = new ArrayList<>();  // Danger!

Gatherer<String, ?, String> bad = Gatherer.ofSequential(
    Integrator.ofGreedy((Void state, String element,
                         Gatherer.Downstream<? super String> downstream) -> {
        sharedList.add(element);  // Race condition in parallel streams!
        return downstream.push(element);
    })
);

// GOOD: All state is inside the gatherer
Gatherer<String, ?, String> good = Gatherer.ofSequential(
    ArrayList<String>::new,
    Integrator.ofGreedy((state, element, downstream) -> {
        state.add(element);  // Safe -- state is per-evaluation
        return downstream.push(element);
    })
);
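When you actually want parallel evaluation, the rule inverts: keep state per-segment and supply a combiner so the runtime can merge segments. A minimal sketch of that pattern, assuming JDK 24+ (the counting() gatherer and class name are mine, purely illustrative):

```java
import java.util.stream.Gatherer;
import java.util.stream.IntStream;

public class ParallelCount {
    // Counts elements without emitting anything per element;
    // the finisher pushes one total after all segments are merged.
    static Gatherer<Integer, long[], Long> counting() {
        return Gatherer.of(
            () -> new long[]{0},                                    // fresh state per segment
            Gatherer.Integrator.ofGreedy((state, el, down) -> {     // just count
                state[0]++;
                return true;
            }),
            (left, right) -> { left[0] += right[0]; return left; }, // combiner merges segments
            (state, down) -> down.push(state[0])                    // finisher emits the total
        );
    }

    public static void main(String[] args) {
        long count = IntStream.rangeClosed(1, 1000).boxed()
            .parallel()                 // safe: every segment has its own long[]
            .gather(counting())
            .findFirst()
            .orElseThrow();
        System.out.println(count);  // 1000
    }
}
```

Without the combiner argument (Gatherer.ofSequential), the same pipeline would still be correct but would be evaluated sequentially even on a parallel stream.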
- Use Integrator.ofGreedy() when your gatherer never short-circuits. This gives the runtime optimization hints.
- Prefer cheap state holders in sequential gatherers (e.g., new int[]{0} instead of new AtomicInteger(0)).
- Reserve mapConcurrent() for I/O-bound work. It uses virtual threads, which are great for I/O but provide no benefit for CPU-bound work.
- Compose small gatherers with andThen() rather than building monolithic ones.
- Return Gatherer from static factory methods for clean API design, just like Collectors.toList().
- Parameterize your gatherers: windowFixed(int size) is more reusable than a windowOfFive().

// GOOD: Reusable, composable, parameterized gatherer library
public class CustomGatherers {

    public static <T> Gatherer<T, ?, T> distinctConsecutive() {
        return Gatherer.ofSequential(
            () -> new ArrayList<T>(1),
            Integrator.ofGreedy((state, element, downstream) -> {
                if (state.isEmpty() || !Objects.equals(state.getFirst(), element)) {
                    state.clear();
                    state.add(element);
                    return downstream.push(element);
                }
                return true;
            })
        );
    }

    public static <T> Gatherer<T, ?, T> takeUntilSumExceeds(
            int threshold, ToIntFunction<? super T> valueExtractor) {
        return Gatherer.ofSequential(
            () -> new int[]{0},
            Integrator.of((state, element, downstream) -> {
                state[0] += valueExtractor.applyAsInt(element);
                if (state[0] > threshold) return false;
                return downstream.push(element);
            })
        );
    }

    public static <T> Gatherer<T, ?, List<T>> groupByGap(
            BiPredicate<? super T, ? super T> gapDetector) {
        return Gatherer.ofSequential(
            () -> new ArrayList<T>(),
            Integrator.ofGreedy((group, element, downstream) -> {
                if (!group.isEmpty() && gapDetector.test(group.getLast(), element)) {
                    downstream.push(List.copyOf(group));
                    group.clear();
                }
                group.add(element);
                return true;
            }),
            (group, downstream) -> {
                if (!group.isEmpty()) {
                    downstream.push(List.copyOf(group));
                }
            }
        );
    }
}
// Usage: compose distinct + window + fold
Stream.of(1, 1, 2, 3, 3, 4, 5, 5)
    .gather(CustomGatherers.<Integer>distinctConsecutive()
        .andThen(Gatherers.windowFixed(2))
        .andThen(Gatherers.fold(
            () -> new ArrayList<List<Integer>>(),
            (acc, window) -> { acc.add(window); return acc; }
        )))
    .forEach(System.out::println);

// Output: [[1, 2], [3, 4], [5]]
If your gatherer buffers elements (like windowing or batching), you must provide a finisher to flush the remaining buffer. Without it, the last partial batch is silently lost:
// BAD: Missing finisher -- last partial window is lost
static <T> Gatherer<T, ?, List<T>> brokenWindow(int size) {
    return Gatherer.ofSequential(
        ArrayList<T>::new,
        Integrator.ofGreedy((buffer, element, downstream) -> {
            buffer.add(element);
            if (buffer.size() >= size) {
                downstream.push(List.copyOf(buffer));
                buffer.clear();
            }
            return true;
        })
        // No finisher! Elements [4, 5] are lost if the stream has 5 elements and the window is 3
    );
}

// GOOD: Finisher flushes remaining elements
static <T> Gatherer<T, ?, List<T>> correctWindow(int size) {
    return Gatherer.ofSequential(
        ArrayList<T>::new,
        Integrator.ofGreedy((buffer, element, downstream) -> {
            buffer.add(element);
            if (buffer.size() >= size) {
                downstream.push(List.copyOf(buffer));
                buffer.clear();
            }
            return true;
        }),
        (buffer, downstream) -> {
            if (!buffer.isEmpty()) {
                downstream.push(List.copyOf(buffer));  // Flush remainder
            }
        }
    );
}
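To see the failure mode concretely, this self-contained sketch (names are mine, assuming JDK 24+) runs a finisher-less window over five elements and compares it with the built-in Gatherers.windowFixed(), which does flush the trailing partial window:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Gatherers;
import java.util.stream.Stream;

public class FinisherDemo {
    // Minimal broken window: no finisher, so the trailing partial window is dropped
    static <T> Gatherer<T, ?, List<T>> brokenWindow(int size) {
        return Gatherer.ofSequential(
            () -> new ArrayList<T>(),
            Gatherer.Integrator.<ArrayList<T>, T, List<T>>ofGreedy((buf, el, down) -> {
                buf.add(el);
                if (buf.size() >= size) {
                    down.push(List.copyOf(buf));
                    buf.clear();
                }
                return true;
            })
        );
    }

    public static void main(String[] args) {
        System.out.println(Stream.of(1, 2, 3, 4, 5).gather(brokenWindow(3)).toList());
        // [[1, 2, 3]] -- [4, 5] silently lost

        System.out.println(Stream.of(1, 2, 3, 4, 5).gather(Gatherers.windowFixed(3)).toList());
        // [[1, 2, 3], [4, 5]] -- built-in flushes the remainder
    }
}
```

Because the bug only shows up on the final elements, it slips past tests whose input length happens to be a multiple of the window size -- test buffering gatherers with a remainder.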
Gatherers are not always the right answer:
- If a stateless map(), filter(), or flatMap() can do the job, use them. They are simpler, more readable, and better optimized by the runtime.
- If you need a single aggregate result, Collectors.reducing() or Collectors.groupingBy() are more idiomatic for terminal operations.
- For CPU-bound parallelism, skip mapConcurrent(). It uses virtual threads, which shine for I/O; for CPU-bound parallel work, use parallelStream() or a ForkJoinPool.
- For one-off chunking of a list you already have in memory, List.subList() or Guava's Lists.partition() might be simpler.

Stream Gatherers are the biggest expansion to the Stream API since Java 8. They fill the critical gap of custom intermediate operations, enabling stateful transformations, windowing, concurrent mapping, and short-circuiting that were previously impossible within a stream pipeline. The five built-in gatherers (fold, scan, windowFixed, windowSliding, mapConcurrent) cover the most common needs, and the Gatherer interface gives you the power to build anything else. Welcome to the next era of Java streams.