Java 25 Stream Gatherers

1. Introduction

Since Java 8, the Stream API has been one of the most powerful tools in your toolkit. You can filter, map, flatMap, reduce, and collect your way through most data-processing tasks with clean, declarative code. But if you have spent enough time with streams, you have inevitably hit a wall: there is no way to define your own intermediate operations.

Think about it. Java gives you Collector as an extension point for terminal operations — you can write custom collectors that fold, group, partition, or summarize data in any way you want. But for intermediate operations? You are stuck with what the API provides. If the built-in map(), filter(), flatMap(), distinct(), sorted(), peek(), limit(), skip(), and takeWhile() do not cover your use case, you have to break out of the stream pipeline, materialize into a collection, manipulate it imperatively, and then stream it again. That defeats the entire point.

Consider an analogy: imagine you are building an assembly line in a factory. Java gave you the ability to customize the packaging station at the end (collectors), but it locked down every station in the middle of the line. Want a station that groups items into batches of five? Want one that computes a running average and passes it along? Want one that deduplicates consecutive items? You had to hack around the limitation or abandon the assembly line entirely.

Java 25 changes this with Stream Gatherers (JEP 485). Gatherers are the missing counterpart to collectors — they let you define custom intermediate operations that plug directly into a stream pipeline. A gatherer can transform elements one-to-one, one-to-many, many-to-one, or many-to-many. It can carry state across elements. It can short-circuit to stop processing early. It can even support parallel execution. And just like collectors, Java ships several built-in gatherers that handle common use cases out of the box.

Stream Gatherers were previewed in Java 22 (JEP 461) and Java 23 (JEP 473), and finalized without changes in Java 24 (JEP 485), making them a standard feature in Java 25 LTS. This post covers everything you need to know: the interface anatomy, all five built-in gatherers, how to write your own, and real-world patterns that will change how you think about stream pipelines.

2. The Problem Gatherers Solve

To appreciate why gatherers matter, let us look at things you cannot do cleanly with the existing Stream API — operations that require state, context, or structural transformation between elements.

Problem 1: Sliding Windows

Suppose you have a stream of stock prices and you want to compute a 3-day moving average. You need to look at three consecutive elements at a time, slide one position forward, and produce an average for each window. There is no built-in stream operation for this. Before gatherers, your options were:

// The ugly workaround: materialize, index, and re-stream
List<Double> prices = List.of(100.0, 102.5, 101.0, 105.0, 103.5, 107.0);

List<Double> movingAverages = IntStream.range(0, prices.size() - 2)
    .mapToObj(i -> (prices.get(i) + prices.get(i + 1) + prices.get(i + 2)) / 3.0)
    .toList();
// Requires random access -- cannot work with a true stream

This only works because you materialized the data into a List first. With a true stream (say, reading from a socket or a database cursor), you cannot index into it. You need an intermediate operation that remembers previous elements.

Problem 2: Stateful Deduplication

The built-in distinct() removes all duplicates across the entire stream, but what if you only want to remove consecutive duplicates? For example, turning [1, 1, 2, 2, 2, 3, 1, 1] into [1, 2, 3, 1]. There is no built-in operation for this. You need state — specifically, you need to remember the last element you emitted.
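
Before gatherers, the usual fix was to step outside the pipeline entirely. A minimal sketch of that imperative workaround (the helper name is illustrative):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

public class ConsecutiveDedupWorkaround {

    // Pre-gatherer workaround: leave the stream, loop imperatively, re-stream.
    static List<Integer> dedupConsecutive(List<Integer> input) {
        List<Integer> result = new ArrayList<>();
        for (Integer value : input) {
            // Only emit when different from the previously emitted element
            if (result.isEmpty() || !Objects.equals(result.getLast(), value)) {
                result.add(value);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(dedupConsecutive(List.of(1, 1, 2, 2, 2, 3, 1, 1)));
        // [1, 2, 3, 1]
    }
}
```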

Problem 3: Batching / Chunking

You have a stream of records and you want to group them into batches of 100 for bulk database inserts. The stream might have 10,000 elements, and you need to emit 100 lists of 100. This is a many-to-many transformation that requires an accumulator, and the built-in API has nothing for it.

Problem 4: Running Totals / Prefix Sums

You want a running total: given [1, 2, 3, 4], produce [1, 3, 6, 10]. The reduce() operation produces a single value, not a stream of intermediate results. You would have to use an external mutable variable (which violates the stream contract) or fall back to imperative code.
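
For the record, the external-mutable-variable workaround looks like this. It happens to produce the right answer sequentially, but it violates the Stream API's statelessness contract and silently breaks under parallel execution — a sketch, not a recommendation:

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class RunningTotalWorkaround {

    static List<Integer> runningTotals(List<Integer> input) {
        // External mutable state captured by the lambda -- technically runs,
        // but violates the "stateless behavioral parameters" rule of streams
        AtomicInteger total = new AtomicInteger(0);
        return input.stream()
                .map(total::addAndGet)  // ordering only guaranteed sequentially
                .toList();
    }

    public static void main(String[] args) {
        System.out.println(runningTotals(List.of(1, 2, 3, 4)));
        // [1, 3, 6, 10]
    }
}
```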

Why Collectors Cannot Help Here

Collectors are powerful, but they are terminal operations. They consume the entire stream and produce a single result. They cannot sit in the middle of a pipeline and emit elements downstream. Gatherers fill exactly this gap — they are intermediate operations that can carry state, transform structure, and emit zero or more elements per input, all while remaining composable with the rest of the pipeline.

Limitation | Before Gatherers | With Gatherers
Sliding window | Materialize to list, index manually | Gatherers.windowSliding(n)
Fixed-size batches | Collect all, then partition | Gatherers.windowFixed(n)
Running total | External mutable variable (unsafe) | Gatherers.scan(init, fn)
Fold to single value (intermediate) | Not possible in pipeline | Gatherers.fold(init, fn)
Concurrent mapping with limit | Custom thread pool + futures | Gatherers.mapConcurrent(n, fn)
Custom stateful logic | Break pipeline, write imperative code | stream.gather(myGatherer)

3. The Gatherer Interface

A gatherer is defined by the java.util.stream.Gatherer<T, A, R> interface. If you have worked with Collector<T, A, R>, the shape will feel familiar, but there are important differences. Let us break down the type parameters and the four functions that make up a gatherer.

Type Parameters

Parameter | Meaning | Collector Equivalent
T | Type of input elements consumed from upstream | Same -- input type
A | Type of the mutable state object (private, per-gatherer) | Same -- accumulator type
R | Type of output elements emitted downstream | Different -- in Collector, R is the final result type, not a stream element type

The critical insight: a Collector’s R is the single result you get back (like a List or a Map). A Gatherer’s R is the element type of the output stream. The gatherer sits in the middle of the pipeline and produces a new stream.
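
To make the difference concrete, here is a short contrast (using the built-in windowFixed gatherer covered later in this post):

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Gatherers;
import java.util.stream.Stream;

public class CollectorVsGathererR {
    public static void main(String[] args) {
        // Collector<T, A, R>: R is the single result that collect() hands back
        List<String> collected = Stream.of("a", "b").collect(Collectors.toList());

        // Gatherer<T, A, R>: R is the element type of the stream that continues
        // downstream -- here each output element is itself a List<String>
        Stream<List<String>> windows = Stream.of("a", "b").gather(Gatherers.windowFixed(2));

        System.out.println(collected);        // [a, b]
        System.out.println(windows.toList()); // [[a, b]]
    }
}
```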

The Four Functions

Every gatherer is composed of four functions. Only the integrator is required; the other three are optional:

Function | Type | Required? | Purpose
initializer() | Supplier<A> | Optional | Creates the private mutable state object. Called once per stream evaluation. If omitted, the gatherer is stateless.
integrator() | Integrator<A, T, R> | Required | The core logic. Called once per input element. Receives the state, the current element, and a Downstream handle to push output elements. Returns boolean: true to continue, false to short-circuit.
combiner() | BinaryOperator<A> | Optional | Merges two state objects when running in parallel. Without this, the gatherer runs sequentially even on a parallel stream.
finisher() | BiConsumer<A, Downstream<? super R>> | Optional | Called after all input elements have been processed. Can emit final elements downstream. Useful for flushing buffered state.

The Downstream Interface

The Downstream<R> object is how a gatherer emits elements to the next stage in the pipeline. It has two key methods:

// Nested in Gatherer as Gatherer.Downstream<R>
public interface Downstream<R> {
    // Push an element downstream. Returns true if more elements are accepted,
    // false if the downstream is done (e.g., a short-circuiting terminal op).
    boolean push(R element);

    // Check if the downstream is rejecting further elements (default method).
    boolean isRejecting();
}

This is one of the key differences from Collector. A collector’s accumulator is a BiConsumer — it just consumes, with no feedback. A gatherer’s integrator gets feedback from downstream via the return value of push(). This enables short-circuiting: if the downstream says “I am done,” the gatherer can stop processing immediately.
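
To see that feedback loop in action, here is a sketch of a minimal pass-through gatherer that returns whatever push() reports — so a downstream limit() stops it, even on an infinite source:

```java
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

public class PushFeedbackDemo {

    // A pass-through gatherer that honors downstream feedback: it returns
    // whatever push() returns, so it stops as soon as the downstream is done.
    static <T> Gatherer<T, ?, T> passThrough() {
        return Gatherer.of(
            Gatherer.Integrator.<Void, T, T>of(
                (state, element, downstream) -> downstream.push(element)));
    }

    public static void main(String[] args) {
        // limit(2) rejects after two elements; push() then returns false and
        // the gatherer short-circuits instead of draining the infinite source
        List<Integer> out = Stream.iterate(1, n -> n + 1)
                .gather(passThrough())
                .limit(2)
                .toList();
        System.out.println(out); // [1, 2]
    }
}
```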

Integrator Variants

The Integrator interface comes in two flavors:

// Standard integrator -- may short-circuit (return false)
Integrator.of((state, element, downstream) -> {
    // Process element, optionally push to downstream
    // Return false to stop processing early
    return true;
});

// Greedy integrator -- promises never to initiate a short-circuit itself
// The stream runtime can optimize based on this guarantee
Integrator.ofGreedy((state, element, downstream) -> {
    // Still returns a boolean: forward the downstream's feedback
    return downstream.push(transform(element));
});

Use Integrator.ofGreedy() when your gatherer always processes all elements (like a mapping or filtering gatherer). Use Integrator.of() when your gatherer might need to stop early (like a “take first N matching” gatherer).
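
As a sketch of the greedy case, here is map() rebuilt as a stateless greedy gatherer — illustrative only, not how the JDK implements map():

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

public class GreedyMapDemo {

    // map() as a gatherer: stateless and greedy (it never initiates a
    // short-circuit itself), forwarding downstream feedback as its return value.
    static <T, R> Gatherer<T, ?, R> mapping(Function<? super T, ? extends R> fn) {
        return Gatherer.of(
            Gatherer.Integrator.<Void, T, R>ofGreedy(
                (state, element, downstream) -> downstream.push(fn.apply(element))));
    }

    public static void main(String[] args) {
        List<String> out = Stream.of(1, 2, 3)
                .gather(mapping(n -> "#" + n))
                .toList();
        System.out.println(out); // [#1, #2, #3]
    }
}
```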

How a Gatherer Relates to a Collector

Aspect | Collector<T, A, R> | Gatherer<T, A, R>
Pipeline position | Terminal (end) | Intermediate (middle)
Output | Single result of type R | Stream of elements of type R
Accumulator / Integrator | BiConsumer<A, T> -- no feedback | Integrator<A, T, R> -- returns boolean, has Downstream
Finisher | Function<A, R> -- returns the result | BiConsumer<A, Downstream> -- pushes to stream
Short-circuiting | Not supported | Supported via integrator return value
Composability | Not directly composable | Composable via andThen()

4. Built-in Gatherers

Java ships five built-in gatherers in the java.util.stream.Gatherers utility class. These cover the most commonly requested operations that were impossible with the old API. Let us go through each one with concrete examples.

4.1 Gatherers.fold()

fold() is a many-to-one gatherer. It works like reduce(), but as an intermediate operation that emits a single result element into the downstream when all input is consumed. Think of it as “reduce, but keep going with the pipeline.”

Signature:

static <T, R> Gatherer<T, ?, R> fold(
    Supplier<R> initial,
    BiFunction<? super R, ? super T, ? extends R> folder
)

How it works: The initial supplier creates the starting value (the identity). The folder function takes the current accumulated value and the next input element, and returns the new accumulated value. After all input elements are processed, the final accumulated value is emitted downstream as a single element.

Example: Join strings with a semicolon delimiter

String result = Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9)
    .gather(
        Gatherers.fold(
            () -> "",
            (accumulated, element) -> {
                if (accumulated.isEmpty()) return element.toString();
                return accumulated + ";" + element;
            }
        )
    )
    .findFirst()
    .get();

System.out.println(result);
// Output: 1;2;3;4;5;6;7;8;9

“Wait, can’t I just use Collectors.joining(";")?” Yes, for this specific case. But fold() is an intermediate operation — the result flows into the rest of the pipeline. You could chain more gatherers or operations after it. With a collector, the pipeline ends.

Example: Sum as intermediate operation, then continue processing

// Fold to compute sum, then map the result, then collect
List<String> result = Stream.of(10, 20, 30)
    .gather(Gatherers.fold(() -> 0, Integer::sum))
    .map(sum -> "Total: " + sum)
    .toList();

System.out.println(result);
// Output: [Total: 60]

4.2 Gatherers.scan()

scan() is a one-to-one stateful gatherer. It produces a running accumulation — for each input element, it emits the current accumulated value. If you are familiar with functional programming, this is the classic “prefix scan” or “cumulative fold.”

Signature:

static <T, R> Gatherer<T, ?, R> scan(
    Supplier<R> initial,
    BiFunction<? super R, ? super T, ? extends R> scanner
)

How it works: For each input element, the scanner function is applied to the current state and the element. The result becomes both the new state and the output element pushed downstream.

Example: Running sum (prefix sum)

Stream.of(1, 2, 3, 4, 5)
    .gather(Gatherers.scan(() -> 0, Integer::sum))
    .forEach(System.out::println);

// Output:
// 1
// 3
// 6
// 10
// 15

Notice the output has the same number of elements as the input — that is the one-to-one nature of scan(). Each output is the cumulative result up to that point.

Example: Running sum starting from a seed value

Stream.of(1, 2, 3, 4, 5)
    .gather(Gatherers.scan(() -> 100, (current, next) -> current + next))
    .forEach(System.out::println);

// Output:
// 101
// 103
// 106
// 110
// 115

Example: Running maximum

Stream.of(3, 1, 4, 1, 5, 9, 2, 6)
    .gather(Gatherers.scan(() -> Integer.MIN_VALUE, Integer::max))
    .forEach(System.out::println);

// Output:
// 3
// 3
// 4
// 4
// 5
// 9
// 9
// 9

4.3 Gatherers.windowFixed()

windowFixed() is a many-to-many gatherer. It groups input elements into fixed-size lists (batches). When the window is full, it is emitted downstream as a List. The last window may contain fewer elements if the stream size is not evenly divisible.

Signature:

static <TR> Gatherer<TR, ?, List<TR>> windowFixed(int windowSize)

Example: Batch elements into groups of 3

Stream.of(1, 2, 3, 4, 5, 6, 7, 8)
    .gather(Gatherers.windowFixed(3))
    .forEach(System.out::println);

// Output:
// [1, 2, 3]
// [4, 5, 6]
// [7, 8]

Notice how the last window [7, 8] has only two elements — it emits whatever is left when the stream ends.

Example: Bulk database inserts in batches of 100

records.stream()
    .gather(Gatherers.windowFixed(100))
    .forEach(batch -> {
        jdbcTemplate.batchUpdate(
            "INSERT INTO orders (id, amount) VALUES (?, ?)",
            batch.stream()
                .map(order -> new Object[]{order.id(), order.amount()})
                .toList()
        );
        System.out.println("Inserted batch of " + batch.size());
    });

4.4 Gatherers.windowSliding()

windowSliding() is a many-to-many gatherer that creates overlapping windows. Each window contains windowSize elements, and the window slides forward by one position for each new element. This is exactly what you need for moving averages, n-gram generation, and similar sliding-window algorithms.

Signature:

static <TR> Gatherer<TR, ?, List<TR>> windowSliding(int windowSize)

Example: Sliding windows of size 3

Stream.of(1, 2, 3, 4, 5, 6, 7, 8)
    .gather(Gatherers.windowSliding(3))
    .forEach(System.out::println);

// Output:
// [1, 2, 3]
// [2, 3, 4]
// [3, 4, 5]
// [4, 5, 6]
// [5, 6, 7]
// [6, 7, 8]

Each window overlaps with the previous one by windowSize - 1 elements. The input stream of 8 elements produces 6 windows of size 3.

Example: 3-day moving average of stock prices

List<Double> prices = List.of(100.0, 102.5, 101.0, 105.0, 103.5, 107.0, 106.0);

prices.stream()
    .gather(Gatherers.windowSliding(3))
    .map(window -> window.stream()
        .mapToDouble(Double::doubleValue)
        .average()
        .orElse(0.0))
    .forEach(avg -> System.out.printf("%.2f%n", avg));

// Output:
// 101.17
// 102.83
// 103.17
// 105.17
// 105.50

4.5 Gatherers.mapConcurrent()

mapConcurrent() is a one-to-one gatherer that applies a mapping function concurrently, up to a specified concurrency limit. This is incredibly useful for I/O-bound operations where you want to parallelize the mapping without converting the entire stream to parallel (which would parallelize everything, including non-I/O stages).

Signature:

static <T, R> Gatherer<T, ?, R> mapConcurrent(
    int maxConcurrency,
    Function<? super T, ? extends R> mapper
)

Key properties:

  • Limits concurrency to maxConcurrency simultaneous invocations
  • Preserves encounter order — elements come out in the same order they went in
  • Uses virtual threads under the hood for efficient I/O handling
  • Perfect for rate-limiting API calls, database queries, or file operations

Example: Fetch URLs with concurrency limit of 5

List<String> urls = List.of(
    "https://api.example.com/users/1",
    "https://api.example.com/users/2",
    "https://api.example.com/users/3",
    "https://api.example.com/users/4",
    "https://api.example.com/users/5",
    "https://api.example.com/users/6",
    "https://api.example.com/users/7",
    "https://api.example.com/users/8"
);

List<String> responses = urls.stream()
    .gather(Gatherers.mapConcurrent(5, url -> {
        // This runs concurrently, up to 5 at a time
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder()
            .uri(URI.create(url))
            .build();
        try {
            return client.send(request, HttpResponse.BodyHandlers.ofString())
                         .body();
        } catch (Exception e) {
            return "Error: " + e.getMessage();
        }
    }))
    .toList();

// All 8 URLs are fetched, max 5 at a time, results in original order

Before mapConcurrent(), achieving this required manually managing a thread pool, submitting CompletableFuture tasks, collecting results, and handling ordering yourself. Now it is one line in a stream pipeline.
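
For contrast, a sketch of that pre-gatherer dance (fetchUrl is a stand-in for the real I/O call):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ManualConcurrentMap {

    // Stand-in for the real I/O call
    static String fetchUrl(String url) {
        return "body-of-" + url;
    }

    // The pre-mapConcurrent dance: bounded executor, submit tasks in encounter
    // order, then join the futures in that same order to preserve ordering.
    static List<String> fetchAll(List<String> urls, int maxConcurrency) {
        ExecutorService pool = Executors.newFixedThreadPool(maxConcurrency);
        try {
            List<Future<String>> futures = urls.stream()
                    .map(url -> pool.submit(() -> fetchUrl(url)))
                    .toList();
            List<String> results = new ArrayList<>();
            for (Future<String> future : futures) {
                try {
                    results.add(future.get());  // blocks until that task finishes
                } catch (InterruptedException | ExecutionException e) {
                    throw new RuntimeException(e);
                }
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(fetchAll(List.of("a", "b", "c"), 2));
        // [body-of-a, body-of-b, body-of-c]
    }
}
```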

5. Using stream.gather()

The gather() method is the new intermediate operation on Stream that accepts a Gatherer. It sits alongside map(), filter(), and flatMap() in the pipeline.

Signature:

// On the Stream interface:
default <R> Stream<R> gather(Gatherer<? super T, ?, R> gatherer)

You can use gather() anywhere you would use any other intermediate operation. You can chain multiple gather() calls, mix them with map() and filter(), and end with any terminal operation.

Example: Chaining gatherers with standard operations

List<Double> result = Stream.of(10.0, 20.5, 15.3, 30.0, 25.7, 18.2, 22.1, 35.0, 28.4, 19.6)
    .filter(value -> value > 15.0)          // Standard filter
    .gather(Gatherers.windowSliding(3))      // Sliding windows of 3
    .map(window -> window.stream()           // Standard map
        .mapToDouble(Double::doubleValue)
        .average()
        .orElse(0.0))
    .gather(Gatherers.scan(() -> 0.0,        // Running sum of averages
        (sum, avg) -> sum + avg))
    .toList();                               // Terminal operation

System.out.println(result);

Composing Gatherers with andThen()

Gatherers support composition via the andThen() method. This lets you combine two gatherers into a single gatherer, which can be useful for building reusable, composable transformation pipelines.

// Create composed gatherer: scan then fold
Gatherer<Integer, ?, Integer> scanGatherer =
    Gatherers.scan(() -> 0, Integer::sum);

Gatherer<Integer, ?, String> foldGatherer =
    Gatherers.fold(
        () -> "",
        (result, element) -> result.isEmpty()
            ? element.toString()
            : result + ";" + element
    );

// Compose them: first scan (running sum), then fold (join into string)
Gatherer<Integer, ?, String> composed = scanGatherer.andThen(foldGatherer);

// These are equivalent:
String result1 = Stream.of(1, 2, 3, 4, 5)
    .gather(composed)
    .findFirst().get();

String result2 = Stream.of(1, 2, 3, 4, 5)
    .gather(scanGatherer)
    .gather(foldGatherer)
    .findFirst().get();

System.out.println(result1);
// Output: 1;3;6;10;15

System.out.println(result1.equals(result2));
// Output: true

How stream.gather() Works Under the Hood

When the stream pipeline is executed and encounters a gather() step, the following happens:

  1. A Downstream object is created that forwards elements to the next pipeline stage
  2. The gatherer’s initializer is called to create the state object
  3. The integrator function is retrieved
  4. For each input element, integrator.integrate(state, element, downstream) is called
  5. If the integrator returns false, processing stops immediately (short-circuit)
  6. After all elements (or short-circuit), the finisher is called with the final state and downstream
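
Conceptually — ignoring parallel evaluation and internal optimizations — the sequential loop above can be sketched like this (a simplified model driven by hand, not the JDK's actual evaluation code):

```java
import java.util.Iterator;
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Gatherers;

public class GatherLoopSketch {

    // A simplified model of how a sequential gather() step drives a gatherer.
    static <T, A, R> void evaluate(Iterator<T> source,
                                   Gatherer<T, A, R> gatherer,
                                   Gatherer.Downstream<R> downstream) {
        A state = gatherer.initializer().get();                           // step 2
        Gatherer.Integrator<A, T, R> integrator = gatherer.integrator();  // step 3
        while (source.hasNext()) {
            // steps 4-5: integrate each element; stop on false (short-circuit)
            if (!integrator.integrate(state, source.next(), downstream)) {
                break;
            }
        }
        // step 6: flush -- for gatherers without a finisher this is a no-op
        gatherer.finisher().accept(state, downstream);
    }

    public static void main(String[] args) {
        StringBuilder seen = new StringBuilder();
        evaluate(List.of(1, 2, 3).iterator(),
                 Gatherers.scan(() -> 0, Integer::sum),
                 element -> { seen.append(element).append(' '); return true; });
        System.out.println(seen); // 1 3 6
    }
}
```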

6. Creating Custom Gatherers

The built-in gatherers cover common cases, but the real power of the API is creating your own. There are two approaches: implementing the Gatherer interface directly, or using the factory methods Gatherer.of() and Gatherer.ofSequential().

Approach 1: Implementing the Interface

Let us build a gatherer that removes consecutive duplicate elements — something you cannot do with distinct() (which removes all duplicates globally).

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Supplier;
import java.util.stream.Gatherer;
import java.util.stream.Gatherer.Integrator;

/**
 * A gatherer that removes consecutive duplicates.
 * [1, 1, 2, 2, 2, 3, 1, 1] -> [1, 2, 3, 1]
 */
public class DistinctConsecutive<T> implements Gatherer<T, List<T>, T> {

    @Override
    public Supplier<List<T>> initializer() {
        // State: a single-element list holding the last emitted value
        return () -> new ArrayList<>(1);
    }

    @Override
    public Integrator<List<T>, T, T> integrator() {
        return Integrator.ofGreedy((state, element, downstream) -> {
            if (state.isEmpty() || !Objects.equals(state.getFirst(), element)) {
                state.clear();
                state.add(element);
                return downstream.push(element);
            }
            // Same as the last emitted element: skip it, keep consuming
            return true;
        });
    }

    // No combiner -- sequential only
    // No finisher -- nothing to flush
}

Usage:

Stream.of(1, 1, 2, 2, 2, 3, 1, 1, 4, 4)
    .gather(new DistinctConsecutive<>())
    .forEach(System.out::println);

// Output:
// 1
// 2
// 3
// 1
// 4

Approach 2: Using Gatherer.ofSequential()

For simpler gatherers that do not need parallel support, Gatherer.ofSequential() is more concise. Let us build the same consecutive-distinct gatherer using the factory method:

static <T> Gatherer<T, ?, T> distinctConsecutive() {
    return Gatherer.ofSequential(
        // Initializer: mutable container for the last seen element
        () -> new ArrayList<T>(1),

        // Integrator -- greedy, but still returns a boolean
        Integrator.ofGreedy((state, element, downstream) -> {
            if (state.isEmpty() || !Objects.equals(state.getFirst(), element)) {
                state.clear();
                state.add(element);
                return downstream.push(element);
            }
            return true;  // consecutive duplicate: skip it
        })
    );
}

// Usage:
Stream.of("a", "a", "b", "b", "c", "a", "a")
    .gather(distinctConsecutive())
    .toList();
// Result: [a, b, c, a]

Approach 3: Using Gatherer.of() for Parallel Support

When you need parallel execution, use Gatherer.of() which accepts all four functions including a combiner:

static <T> Gatherer<T, ?, T> distinctConsecutiveParallel() {
    return Gatherer.of(
        // Initializer
        () -> new ArrayList<T>(1),

        // Integrator -- greedy, but still returns a boolean
        Integrator.ofGreedy((state, element, downstream) -> {
            if (state.isEmpty() || !Objects.equals(state.getFirst(), element)) {
                state.clear();
                state.add(element);
                return downstream.push(element);
            }
            return true;
        }),

        // Combiner for parallel execution
        (left, right) -> {
            // When merging parallel segments, keep the state from the right segment
            // because it represents the more recent "last seen" element.
            // Caveat: a duplicate that straddles a segment boundary can still
            // slip through -- fully correct parallel dedup needs richer state.
            return right.isEmpty() ? left : right;
        },

        // Finisher -- Gatherer.of() requires all four functions; nothing to flush
        (state, downstream) -> { }
    );
}

Factory Methods Comparison

Factory Method | Parameters | Parallel? | Use When
Gatherer.of(integrator) | Integrator only | Yes | Stateless transformation
Gatherer.ofSequential(integrator) | Integrator only | No | Stateless, sequential transformation
Gatherer.ofSequential(init, integrator) | Initializer + Integrator | No | Stateful, sequential, no flush needed
Gatherer.ofSequential(init, integrator, finisher) | Initializer + Integrator + Finisher | No | Stateful, sequential, needs final flush
Gatherer.of(init, integrator, combiner, finisher) | All four | Yes | Full-featured, parallelizable gatherer

7. Stateful Gatherers

Stateful gatherers are where the API truly shines. These are operations that need to remember information across elements — something that was impossible to do correctly in a stream pipeline before gatherers.

7.1 Running Average

A gatherer that computes a running average, emitting the current average after each input element:

static Gatherer<Double, ?, Double> runningAverage() {
    // State: [sum, count]
    return Gatherer.ofSequential(
        () -> new double[]{0.0, 0.0},

        Integrator.ofGreedy((state, element, downstream) -> {
            state[0] += element;  // sum
            state[1] += 1;        // count
            return downstream.push(state[0] / state[1]);
        })
    );
}

// Usage:
Stream.of(10.0, 20.0, 30.0, 40.0, 50.0)
    .gather(runningAverage())
    .forEach(avg -> System.out.printf("%.1f%n", avg));

// Output:
// 10.0
// 15.0
// 20.0
// 25.0
// 30.0

7.2 Deduplication with Memory

Unlike distinct(), which uses a HashSet internally and removes all duplicates, you might want to deduplicate within a time or count window — for example, suppress duplicate log messages within a batch of 100:

static <T> Gatherer<T, ?, T> deduplicateWithinWindow(int windowSize) {
    return Gatherer.ofSequential(
        () -> new Object[]{new LinkedHashSet<T>(), 0},

        Integrator.ofGreedy((state, element, downstream) -> {
            @SuppressWarnings("unchecked")
            LinkedHashSet<T> seen = (LinkedHashSet<T>) state[0];
            int count = (int) state[1];

            // Reset the window when we hit the limit
            if (count > 0 && count % windowSize == 0) {
                seen.clear();
            }

            state[1] = count + 1;

            if (seen.add(element)) {
                return downstream.push(element);
            }
            return true;  // duplicate within the window: skip it
        })
    );
}

// Usage: Suppress duplicate log levels within batches of 5
Stream.of("INFO", "WARN", "INFO", "ERROR", "WARN", "INFO", "DEBUG", "INFO", "WARN", "ERROR")
    .gather(deduplicateWithinWindow(5))
    .forEach(System.out::println);

// Output (first window of 5 input elements):
// INFO
// WARN
// ERROR
// (second window of 5 input elements -- seen set is cleared):
// INFO
// DEBUG
// WARN
// ERROR

7.3 Rate Limiting

A gatherer that enforces a maximum throughput by introducing delays when elements arrive too quickly:

static <T> Gatherer<T, ?, T> rateLimited(int maxPerSecond) {
    long intervalNanos = 1_000_000_000L / maxPerSecond;

    return Gatherer.ofSequential(
        // State: last emission time in nanos
        () -> new long[]{0L},

        Integrator.ofGreedy((state, element, downstream) -> {
            long now = System.nanoTime();
            long elapsed = now - state[0];

            if (elapsed < intervalNanos) {
                try {
                    Thread.sleep((intervalNanos - elapsed) / 1_000_000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }

            state[0] = System.nanoTime();
            return downstream.push(element);
        })
    );
}

// Usage: Process API calls at most 10 per second
urls.stream()
    .gather(rateLimited(10))
    .gather(Gatherers.mapConcurrent(5, url -> fetchUrl(url)))
    .forEach(response -> process(response));

7.4 Session Grouping

Group elements into sessions based on a gap threshold. If two consecutive elements are more than a specified distance apart, start a new session:

record TimestampedEvent(long timestamp, String data) {}

static Gatherer<TimestampedEvent, ?, List<TimestampedEvent>> sessionGrouping(long maxGapMillis) {
    return Gatherer.ofSequential(
        // State: current session (list of events)
        () -> new ArrayList<TimestampedEvent>(),

        // Integrator: add to current session or start a new one
        Integrator.ofGreedy((session, event, downstream) -> {
            if (!session.isEmpty()) {
                long lastTimestamp = session.getLast().timestamp();
                if (event.timestamp() - lastTimestamp > maxGapMillis) {
                    // Gap exceeds threshold -- emit current session, start new one
                    if (!downstream.push(List.copyOf(session))) {
                        return false;  // downstream is done
                    }
                    session.clear();
                }
            }
            session.add(event);
            return true;
        }),

        // Finisher: emit the last session
        (session, downstream) -> {
            if (!session.isEmpty()) {
                downstream.push(List.copyOf(session));
            }
        }
    );
}

// Usage: Group click events into sessions (30-second gap threshold)
List<TimestampedEvent> clicks = List.of(
    new TimestampedEvent(1000, "click_home"),
    new TimestampedEvent(3000, "click_products"),
    new TimestampedEvent(5000, "click_item"),
    new TimestampedEvent(60000, "click_home"),       // 55s gap -- new session
    new TimestampedEvent(62000, "click_cart"),
    new TimestampedEvent(63000, "click_checkout")
);

clicks.stream()
    .gather(sessionGrouping(30_000))
    .forEach(session -> System.out.println("Session: " + session));

// Output:
// Session: [TimestampedEvent[timestamp=1000, data=click_home], ...]
// Session: [TimestampedEvent[timestamp=60000, data=click_home], ...]

8. One-to-Many Gatherers

A one-to-many gatherer emits multiple output elements for each input element. You might think “that is what flatMap() does” — and you would be right for the stateless case. But gatherers can do stateful one-to-many transformations, which flatMap() cannot.

Example: Emit Element and Its Cumulative Sum

For each input number, emit both the number itself and the running total so far:

static Gatherer<Integer, ?, String> elementAndRunningTotal() {
    return Gatherer.ofSequential(
        () -> new int[]{0},  // running total

        Integrator.ofGreedy((state, element, downstream) -> {
            state[0] += element;
            downstream.push("Value: " + element);
            return downstream.push("Running total: " + state[0]);
        })
    );
}

Stream.of(10, 20, 30)
    .gather(elementAndRunningTotal())
    .forEach(System.out::println);

// Output:
// Value: 10
// Running total: 10
// Value: 20
// Running total: 30
// Value: 30
// Running total: 60

Example: Expand Ranges into Individual Elements

Given a stream of range objects, expand each into individual integers — but only emit values that are unique across all ranges (stateful dedup during expansion):

record IntRange(int start, int endInclusive) {}

static Gatherer<IntRange, ?, Integer> expandUniqueRanges() {
    return Gatherer.ofSequential(
        () -> new HashSet<Integer>(),  // track all emitted values globally

        Integrator.ofGreedy((seen, range, downstream) -> {
            for (int i = range.start(); i <= range.endInclusive(); i++) {
                if (seen.add(i) && !downstream.push(i)) {
                    return false;  // downstream is done
                }
            }
            return true;
        })
    );
}

Stream.of(
    new IntRange(1, 5),
    new IntRange(3, 8),   // 3, 4, 5 already seen -- only emit 6, 7, 8
    new IntRange(7, 10)   // 7, 8 already seen -- only emit 9, 10
)
.gather(expandUniqueRanges())
.forEach(System.out::println);

// Output: 1 2 3 4 5 6 7 8 9 10

Example: Tokenizer -- Split Lines into Words

A gatherer that splits each input line into words, maintaining a word count across all lines:

record NumberedWord(int globalIndex, String word) {}

static Gatherer<String, ?, NumberedWord> tokenize() {
    return Gatherer.ofSequential(
        () -> new int[]{0},  // global word counter

        Integrator.ofGreedy((counter, line, downstream) -> {
            String[] words = line.trim().split("\\s+");
            for (String word : words) {
                if (!word.isEmpty()) {
                    counter[0]++;
                    if (!downstream.push(new NumberedWord(counter[0], word))) {
                        return false;  // downstream is done
                    }
                }
            }
            return true;
        })
    );
}

Stream.of("hello world", "foo bar baz", "java streams")
    .gather(tokenize())
    .forEach(System.out::println);

// Output:
// NumberedWord[globalIndex=1, word=hello]
// NumberedWord[globalIndex=2, word=world]
// NumberedWord[globalIndex=3, word=foo]
// NumberedWord[globalIndex=4, word=bar]
// NumberedWord[globalIndex=5, word=baz]
// NumberedWord[globalIndex=6, word=java]
// NumberedWord[globalIndex=7, word=streams]

9. Short-Circuiting Gatherers

Short-circuiting is one of the most powerful capabilities of gatherers. By returning false from the integrator, you tell the stream to stop processing immediately. This enables operations like "take while a condition holds, but with state" -- something that built-in takeWhile() cannot do because it is stateless.

Example: Take Until Sum Exceeds Threshold

Take elements from the stream until the cumulative sum exceeds a threshold:

static Gatherer<Integer, ?, Integer> takeUntilSumExceeds(int threshold) {
    return Gatherer.ofSequential(
        () -> new int[]{0},  // running sum

        Integrator.of((state, element, downstream) -> {
            state[0] += element;
            if (state[0] > threshold) {
                return false;  // Stop -- sum exceeded threshold
            }
            return downstream.push(element);
        })
    );
}

Stream.of(10, 20, 30, 40, 50, 60)
    .gather(takeUntilSumExceeds(55))
    .forEach(System.out::println);

// Output:
// 10
// 20
// (30 would make sum = 60 > 55, so processing stops)

Example: Take N Distinct Elements

Take elements until you have seen N distinct values. This combines state (a set of seen values) with short-circuiting:

static <T> Gatherer<T, ?, T> takeNDistinct(int n) {
    return Gatherer.ofSequential(
        HashSet::new,

        Integrator.of((seen, element, downstream) -> {
            seen.add(element);
            downstream.push(element);
            return seen.size() < n;  // Stop when we have N distinct values
        })
    );
}

Stream.of(1, 2, 1, 3, 2, 4, 5, 3, 6)
    .gather(takeNDistinct(4))
    .forEach(System.out::println);

// Output: 1, 2, 1, 3, 2, 4
// Stops after seeing 4th distinct value (4), but includes all elements up to that point

Example: Find First Match After N Elements

Skip the first N elements, then take the first one that matches a predicate:

static <T> Gatherer<T, ?, T> firstMatchAfterSkipping(int skip, Predicate<? super T> predicate) {
    return Gatherer.ofSequential(
        () -> new int[]{0},  // element counter

        Integrator.of((counter, element, downstream) -> {
            counter[0]++;
            if (counter[0] > skip && predicate.test(element)) {
                downstream.push(element);
                return false;  // Found it -- stop
            }
            return true;  // Keep looking
        })
    );
}

Stream.of(2, 4, 6, 7, 8, 9, 10, 11)
    .gather(firstMatchAfterSkipping(3, n -> n % 2 != 0))
    .forEach(System.out::println);

// Output: 7
// Skipped first 3 (2, 4, 6), then found first odd number (7)

The key pattern for short-circuiting: use Integrator.of() (not ofGreedy()) and return false when you want to stop. This works even on infinite streams -- the gatherer will terminate the pipeline when the condition is met.
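To see the infinite-stream case in action, here is a small sketch (the class name, gatherer name, and threshold are illustrative) that squares numbers from an unbounded Stream.iterate() source and stops once the running sum of squares passes a limit:

```java
import java.util.stream.Gatherer;
import java.util.stream.Gatherer.Integrator;
import java.util.stream.Stream;

public class InfiniteShortCircuit {

    // Emits n^2 for each element until the cumulative sum of squares exceeds the limit.
    static Gatherer<Integer, ?, Integer> squaresUntilSumExceeds(long limit) {
        return Gatherer.ofSequential(
            () -> new long[]{0},  // running sum of squares
            Integrator.of((sum, n, downstream) -> {
                sum[0] += (long) n * n;
                if (sum[0] > limit) {
                    return false;  // short-circuit: terminates the infinite source
                }
                return downstream.push(n * n);
            })
        );
    }

    public static void main(String[] args) {
        // Infinite stream 1, 2, 3, ... terminates thanks to the gatherer.
        Stream.iterate(1, n -> n + 1)
            .gather(squaresUntilSumExceeds(50))
            .forEach(System.out::println);
        // 1 + 4 + 9 + 16 = 30, then +25 = 55 > 50, so it prints 1, 4, 9, 16
    }
}
```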

10. Gatherers vs Collectors

Since gatherers and collectors share a similar structure, it is important to understand exactly when to use each one. Here is a comprehensive comparison:

Aspect                   | Collector<T, A, R>                            | Gatherer<T, A, R>
-------------------------|-----------------------------------------------|------------------------------------------------
Pipeline position        | Terminal -- ends the pipeline                 | Intermediate -- pipeline continues after it
Output                   | Single result (List, Map, String, etc.)       | Stream of zero or more elements
Used with                | stream.collect(collector)                     | stream.gather(gatherer)
Accumulator / Integrator | BiConsumer<A, T> -- no return value           | Integrator<A, T, R> -- returns boolean
Downstream access        | No -- accumulates into state                  | Yes -- can push elements to next stage
Finisher                 | Function<A, R> -- transforms state to result  | BiConsumer<A, Downstream> -- pushes to stream
Short-circuiting         | Not supported                                 | Supported via integrator return value
Composability            | Nests via downstream collectors (groupingBy)  | Composes sequentially via andThen()
Cardinality              | Many-to-one (always produces single result)   | Any: 1-to-1, 1-to-many, many-to-1, many-to-many
Infinite streams         | Cannot handle (never terminates)              | Can handle via short-circuiting
Use case                 | Aggregate/summarize data                      | Transform/reshape data flow

Decision Guide

Use a Collector when:

  • You need a single result at the end of the pipeline (a List, Map, sum, average)
  • You are aggregating data into a container
  • You want to group, partition, or summarize
  • The pipeline has no more transformations after the collection

Use a Gatherer when:

  • You need a custom transformation in the middle of the pipeline
  • You need to carry state across elements (running totals, deduplication)
  • You need to change the cardinality (batch N elements into groups, expand one element into many)
  • You need to stop early based on accumulated state
  • You want to compose multiple transformations into a reusable unit

Can a Gatherer replace a Collector? In some cases, yes. Gatherers.fold() is essentially a collector that emits into the stream. But collectors have a richer ecosystem (groupingBy, partitioningBy, teeing, etc.) and produce direct results without needing a terminal operation after them. Use the right tool for the job.
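To make that overlap concrete, here is a short sketch (class and variable names are mine) computing the same sum once with a collector and once with Gatherers.fold(). Note that the gatherer version still needs a terminal operation to extract its single emitted element:

```java
import java.util.stream.Collectors;
import java.util.stream.Gatherers;
import java.util.stream.Stream;

public class FoldVsCollect {
    public static void main(String[] args) {
        // Collector: terminal, produces the result directly.
        int viaCollector = Stream.of(1, 2, 3, 4)
            .collect(Collectors.summingInt(Integer::intValue));

        // Gatherer: fold() emits one element into the stream,
        // so a terminal operation is still needed to pull it out.
        int viaGatherer = Stream.of(1, 2, 3, 4)
            .gather(Gatherers.fold(() -> 0, Integer::sum))
            .findFirst()
            .orElseThrow();

        System.out.println(viaCollector + " " + viaGatherer);  // 10 10
    }
}
```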

11. Practical Examples

Let us look at real-world scenarios where gatherers solve problems that were painful or impossible with the old Stream API.

11.1 Moving Average Calculator

A financial application needs to compute a simple moving average (SMA) over a configurable window of price data points:

static Gatherer<Double, ?, Double> movingAverage(int windowSize) {
    // Use windowSliding + map, composed into a single gatherer
    return Gatherers.<Double>windowSliding(windowSize)
        .andThen(Gatherer.ofSequential(
            Integrator.ofGreedy((Void state, List<Double> window,
                                 Gatherer.Downstream<? super Double> downstream) -> {
                double avg = window.stream()
                    .mapToDouble(Double::doubleValue)
                    .average()
                    .orElse(0.0);
                return downstream.push(avg);
            })
        ));
}

// Usage: 5-day SMA
List<Double> closingPrices = List.of(
    150.0, 152.5, 148.0, 155.0, 153.0,
    157.5, 160.0, 158.0, 162.0, 165.0
);

closingPrices.stream()
    .gather(movingAverage(5))
    .forEach(sma -> System.out.printf("SMA: %.2f%n", sma));

// Output:
// SMA: 151.70
// SMA: 153.20
// SMA: 154.70
// SMA: 156.70
// SMA: 158.10
// SMA: 160.50

11.2 Batch Processing with Progress Tracking

Process large datasets in batches, logging progress after each batch:

record BatchResult<T>(int batchNumber, int batchSize, List<T> items) {}

static <T> Gatherer<T, ?, BatchResult<T>> batchWithProgress(int batchSize) {
    return Gatherer.ofSequential(
        () -> new Object[]{new ArrayList<T>(), 0},  // [buffer, batchCount]

        Integrator.ofGreedy((state, element, downstream) -> {
            @SuppressWarnings("unchecked")
            List<T> buffer = (List<T>) state[0];
            buffer.add(element);

            if (buffer.size() >= batchSize) {
                int batchNum = (int) state[1] + 1;
                state[1] = batchNum;
                downstream.push(new BatchResult<>(batchNum, buffer.size(), List.copyOf(buffer)));
                buffer.clear();
            }
            return true;
        }),

        // Finisher: emit remaining elements as final batch
        (state, downstream) -> {
            @SuppressWarnings("unchecked")
            List<T> buffer = (List<T>) state[0];
            if (!buffer.isEmpty()) {
                int batchNum = (int) state[1] + 1;
                downstream.push(new BatchResult<>(batchNum, buffer.size(), List.copyOf(buffer)));
            }
        }
    );
}

// Usage:
IntStream.rangeClosed(1, 23)
    .boxed()
    .gather(batchWithProgress(5))
    .forEach(batch -> {
        System.out.printf("Processing batch %d (%d items): %s%n",
            batch.batchNumber(), batch.batchSize(), batch.items());
        // Insert into database, send to API, etc.
    });

// Output:
// Processing batch 1 (5 items): [1, 2, 3, 4, 5]
// Processing batch 2 (5 items): [6, 7, 8, 9, 10]
// Processing batch 3 (5 items): [11, 12, 13, 14, 15]
// Processing batch 4 (5 items): [16, 17, 18, 19, 20]
// Processing batch 5 (3 items): [21, 22, 23]

11.3 Chunked HTTP Requests

Send a large list of IDs to an API that accepts a maximum of 50 IDs per request, executing requests concurrently with a limit of 3:

record ApiResponse(int status, String body) {}

List<ApiResponse> responses = userIds.stream()
    // Step 1: Chunk IDs into groups of 50
    .gather(Gatherers.windowFixed(50))

    // Step 2: Convert each chunk to a comma-separated query parameter
    .map(chunk -> chunk.stream()
        .map(String::valueOf)
        .collect(Collectors.joining(",")))

    // Step 3: Make concurrent API calls (max 3 at a time)
    .gather(Gatherers.mapConcurrent(3, ids -> {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder()
            .uri(URI.create("https://api.example.com/users?ids=" + ids))
            .build();
        try {
            HttpResponse<String> resp = client.send(request,
                HttpResponse.BodyHandlers.ofString());
            return new ApiResponse(resp.statusCode(), resp.body());
        } catch (Exception e) {
            return new ApiResponse(500, e.getMessage());
        }
    }))

    // Step 4: Filter successful responses
    .filter(resp -> resp.status() == 200)
    .toList();

This entire pipeline -- chunking, URL construction, concurrent HTTP calls, response filtering -- is expressed as a single, readable stream pipeline. Before gatherers, you would have needed a loop, a thread pool, a list of futures, and manual result collection.

11.4 Log Session Grouping

Group log entries into sessions based on time gaps, useful for analyzing user behavior or debugging:

record LogEntry(Instant timestamp, String level, String message) {}

record LogSession(int sessionId, Duration duration, List<LogEntry> entries) {
    static LogSession from(int id, List<LogEntry> entries) {
        Duration dur = Duration.between(
            entries.getFirst().timestamp(),
            entries.getLast().timestamp()
        );
        return new LogSession(id, dur, entries);
    }
}

static Gatherer<LogEntry, ?, LogSession> groupIntoSessions(Duration maxGap) {
    return Gatherer.ofSequential(
        () -> new Object[]{new ArrayList<LogEntry>(), 0},

        Integrator.ofGreedy((state, entry, downstream) -> {
            @SuppressWarnings("unchecked")
            List<LogEntry> current = (List<LogEntry>) state[0];

            if (!current.isEmpty()) {
                Instant lastTime = current.getLast().timestamp();
                if (Duration.between(lastTime, entry.timestamp()).compareTo(maxGap) > 0) {
                    // Gap exceeds threshold -- emit session
                    int sessionId = (int) state[1] + 1;
                    state[1] = sessionId;
                    downstream.push(LogSession.from(sessionId, List.copyOf(current)));
                    current.clear();
                }
            }
            current.add(entry);
            return true;
        }),

        (state, downstream) -> {
            @SuppressWarnings("unchecked")
            List<LogEntry> current = (List<LogEntry>) state[0];
            if (!current.isEmpty()) {
                int sessionId = (int) state[1] + 1;
                downstream.push(LogSession.from(sessionId, List.copyOf(current)));
            }
        }
    );
}

// Usage:
logEntries.stream()
    .sorted(Comparator.comparing(LogEntry::timestamp))
    .gather(groupIntoSessions(Duration.ofMinutes(5)))
    .forEach(session -> System.out.printf(
        "Session %d: %d entries, duration %s%n",
        session.sessionId(), session.entries().size(), session.duration()
    ));

12. Best Practices

Stream Gatherers are a powerful new tool, and with great power comes the opportunity to misuse it. Here are the guidelines I follow when writing production-quality gatherers.

12.1 Thread Safety

The state object returned by the initializer is not shared across threads unless you provide a combiner. Each thread segment gets its own state via a fresh initializer() call. However, you must ensure that:

  • Your state object is mutable but does not reference shared external state
  • The integrator does not capture mutable variables from outside the gatherer
  • If you provide a combiner, the merge logic is correct and does not lose data
// BAD: Shared mutable state outside the gatherer
List<String> sharedList = new ArrayList<>();  // Danger!
Gatherer<String, ?, String> bad = Gatherer.ofSequential(
    Integrator.ofGreedy((Void state, String element, Gatherer.Downstream<? super String> downstream) -> {
        sharedList.add(element);  // Race condition if the gatherer is used concurrently!
        return downstream.push(element);
    })
);

// GOOD: All state is inside the gatherer
Gatherer<String, ?, String> good = Gatherer.ofSequential(
    () -> new ArrayList<String>(),
    Integrator.ofGreedy((state, element, downstream) -> {
        state.add(element);  // Safe -- state is per-evaluation
        return downstream.push(element);
    })
);
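When you do want parallel evaluation, supply a combiner via the four-argument Gatherer.of(). Here is a minimal sketch (countAll() is an illustrative helper, not part of the JDK) in which each thread segment counts into its own state, the combiner merges segment counts, and the finisher emits the total:

```java
import java.util.stream.Gatherer;
import java.util.stream.Gatherer.Integrator;
import java.util.stream.IntStream;

public class ParallelCount {

    // Parallel-capable: per-segment long[] state, merged by the combiner.
    static <T> Gatherer<T, ?, Long> countAll() {
        return Gatherer.of(
            () -> new long[]{0},  // fresh state per thread segment
            Integrator.<long[], T, Long>ofGreedy((count, element, downstream) -> {
                count[0]++;       // count locally, no shared state
                return true;
            }),
            (left, right) -> {    // combiner: merge two segment counts
                left[0] += right[0];
                return left;
            },
            (count, downstream) -> downstream.push(count[0])  // emit the total once
        );
    }

    public static void main(String[] args) {
        long total = IntStream.rangeClosed(1, 1_000)
            .boxed()
            .parallel()
            .gather(countAll())
            .findFirst()
            .orElseThrow();
        System.out.println(total);  // 1000
    }
}
```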

12.2 Performance Considerations

  • Use Integrator.ofGreedy() when your gatherer never short-circuits. This gives the runtime optimization hints.
  • Minimize state size. Large state objects are created per thread segment in parallel execution.
  • Prefer primitive arrays over boxed types for numerical state (e.g., new int[]{0} instead of new AtomicInteger(0)).
  • Avoid heavy computation in the integrator if possible. The integrator is called once per element -- keep it lightweight and push heavy work downstream.
  • Be cautious with mapConcurrent(). It uses virtual threads, which are great for I/O but provide no benefit for CPU-bound work.

12.3 Composability

  • Write focused, single-purpose gatherers. Compose them with andThen() rather than building monolithic ones.
  • Return Gatherer from static factory methods for clean API design, just like Collectors.toList().
  • Parameterize your gatherers. A windowFixed(int size) is more reusable than a windowOfFive().
// GOOD: Reusable, composable, parameterized gatherer library
public class CustomGatherers {

    public static <T> Gatherer<T, ?, T> distinctConsecutive() {
        return Gatherer.ofSequential(
            () -> new ArrayList<T>(1),
            Integrator.ofGreedy((state, element, downstream) -> {
                if (state.isEmpty() || !Objects.equals(state.getFirst(), element)) {
                    state.clear();
                    state.add(element);
                    return downstream.push(element);
                }
                return true;
            })
        );
    }

    public static <T> Gatherer<T, ?, T> takeUntilSumExceeds(
            int threshold, ToIntFunction<? super T> valueExtractor) {
        return Gatherer.ofSequential(
            () -> new int[]{0},
            Integrator.of((state, element, downstream) -> {
                state[0] += valueExtractor.applyAsInt(element);
                if (state[0] > threshold) return false;
                return downstream.push(element);
            })
        );
    }

    public static <T> Gatherer<T, ?, List<T>> groupByGap(
            BiPredicate<? super T, ? super T> gapDetector) {
        return Gatherer.ofSequential(
            () -> new ArrayList<T>(),
            Integrator.ofGreedy((group, element, downstream) -> {
                if (!group.isEmpty() && gapDetector.test(group.getLast(), element)) {
                    downstream.push(List.copyOf(group));
                    group.clear();
                }
                group.add(element);
                return true;
            }),
            (group, downstream) -> {
                if (!group.isEmpty()) {
                    downstream.push(List.copyOf(group));
                }
            }
        );
    }
}

// Usage: compose distinct + window + fold
Stream.of(1, 1, 2, 3, 3, 4, 5, 5)
    .gather(CustomGatherers.distinctConsecutive()
        .andThen(Gatherers.windowFixed(2))
        .andThen(Gatherers.fold(
            () -> new ArrayList<List<Integer>>(),
            (acc, window) -> { acc.add(window); return acc; }
        )))
    .forEach(System.out::println);
// Output: [[1, 2], [3, 4], [5]]

12.4 Do Not Forget the Finisher

If your gatherer buffers elements (like windowing or batching), you must provide a finisher to flush the remaining buffer. Without it, the last partial batch is silently lost:

// BAD: Missing finisher -- last partial window is lost
static <T> Gatherer<T, ?, List<T>> brokenWindow(int size) {
    return Gatherer.ofSequential(
        () -> new ArrayList<T>(),
        Integrator.ofGreedy((buffer, element, downstream) -> {
            buffer.add(element);
            if (buffer.size() >= size) {
                downstream.push(List.copyOf(buffer));
                buffer.clear();
            }
            return true;
        })
        // No finisher! Elements [4, 5] are lost if stream has 5 elements and window is 3
    );
}

// GOOD: Finisher flushes remaining elements
static <T> Gatherer<T, ?, List<T>> correctWindow(int size) {
    return Gatherer.ofSequential(
        () -> new ArrayList<T>(),
        Integrator.ofGreedy((buffer, element, downstream) -> {
            buffer.add(element);
            if (buffer.size() >= size) {
                downstream.push(List.copyOf(buffer));
                buffer.clear();
            }
            return true;
        }),
        (buffer, downstream) -> {
            if (!buffer.isEmpty()) {
                downstream.push(List.copyOf(buffer));  // Flush remainder
            }
        }
    );
}

12.5 When NOT to Use Gatherers

Gatherers are not always the right answer:

  • Simple transformations: If map(), filter(), or flatMap() can do the job, use them. They are simpler, more readable, and better optimized by the runtime.
  • Terminal aggregation: If you need a single result at the end, use a Collector. Gatherers.fold() is neat, but Collectors.reducing() or Collectors.groupingBy() are more idiomatic for terminal operations.
  • Pure CPU-bound parallelism: mapConcurrent() uses virtual threads, which shine for I/O. For CPU-bound parallel work, use parallelStream() or a ForkJoinPool.
  • Very simple batching: If you just need to split a known-size list into sublists, List.subList() or Guava's Lists.partition() might be simpler.
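To underline the first point, here is a sketch (class name is mine) of a stateless doubling written both ways. The gatherer version works, but it is boilerplate you should never write when map() expresses the same thing in one line:

```java
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Gatherer.Integrator;
import java.util.stream.Stream;

public class PreferBuiltins {
    public static void main(String[] args) {
        // Overkill: a gatherer for a stateless 1-to-1 transformation...
        List<Integer> viaGatherer = Stream.of(1, 2, 3)
            .gather(Gatherer.of(Integrator.<Void, Integer, Integer>ofGreedy(
                (state, n, downstream) -> downstream.push(n * 2))))
            .toList();

        // ...when plain map() is simpler, clearer, and better optimized.
        List<Integer> viaMap = Stream.of(1, 2, 3)
            .map(n -> n * 2)
            .toList();

        System.out.println(viaGatherer.equals(viaMap));  // true
    }
}
```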

Summary

Stream Gatherers are the biggest expansion to the Stream API since Java 8. They fill the critical gap of custom intermediate operations, enabling stateful transformations, windowing, concurrent mapping, and short-circuiting that were previously impossible within a stream pipeline. The five built-in gatherers (fold, scan, windowFixed, windowSliding, mapConcurrent) cover the most common needs, and the Gatherer interface gives you the power to build anything else. Welcome to the next era of Java streams.



