The Gatherer API
Introducing Gatherers
Starting with JDK 24, you can use a dedicated API to model your intermediate operations in the Stream API, called the Gatherer API. Design-wise it is similar to the Collector API for terminal Stream operations.
Why did the Stream API need such a feature? The Stream API is a very rich API that gives you many ways of processing in-memory data, following the map-filter-reduce pattern. The Stream API builds on the Spliterator API to model the parallelizability of operations. The versatility of these two patterns gives many possibilities, if not every possibility. The only drawback is that the Spliterator API is not easy to use and does not lead to simple or readable code. Plus, if you need to leverage parallel streams, it may become very tricky to use.
Organization of the Gatherer API
The Gatherer API brings simpler patterns than the Spliterator API, with excellent support for parallelism. In fact, you can use a gatherer that does not support parallelism in a parallel stream, and still benefit from the performance that parallel streams bring you. If your gatherer is sequential, it will of course limit the parallelizability of the resulting stream evaluation, but not suppress it. This is something that the Spliterator API does not give you.
The Gatherer API is built on two main elements: a `Gatherer` interface and a `Gatherers` factory class. There are also a number of interfaces used to interact with gatherers, as well as classes that implement them.
A `Gatherer` is an object that you can pass to the `gather()` method of the `Stream` interface. This `gather()` method is an intermediate operation of the Stream API, and the gatherer object models what this intermediate operation does.
When to Use Gatherers?
The Gatherer API is not a simple API, and you should not use it unless you have good reasons to do so. Even if it can model the simplest Stream operations, like map, filter, or flat-map, building a gatherer for these would be complex and would lead to hard-to-understand code. Your good old `map()` method does a mapping in a simple way, so this is what you should choose for your application.
The Gatherer API is there to create complex operations that are not already available in the Stream API. Here are some examples.
- Create a stream made of fixed-size lists of consecutive elements from that stream.
- Create a distinct-like operation, with a custom way of comparing elements.
- Create a complex map-filter operation, maybe involving some flat-mapping and some optional handling, that you want to fuse into a single operation, properly named, to make your code more readable.
There are a number of things that the Gatherer API can do, which you are going to discover one by one in this section. Let us start with how you can integrate elements into a downstream, given what you get from an upstream.
Integrating Elements in a Downstream
Learning how the Gatherer API works is probably a frustrating process, as you need to go through a series of simple examples that you should not use in your application. The first examples that this section covers are about mapping, filtering, or flat-mapping streams, operations that you can conduct with the classical `Stream.map()`, `Stream.filter()`, or `Stream.flatMap()` methods. These examples have no other goal than to show you the different elements that compose a gatherer in a simple way, and to teach you how to use them.
Fixing Some Vocabulary
Let us fix some vocabulary first. A gatherer models an intermediate operation on a stream. As such, it operates on a stream, consumes the elements this stream produces, does something with them, and pushes (or not) elements to a downstream.
Let us call the stream the elements are coming from the upstream, and the stream this gatherer pushes elements to the downstream.
Using an Integrator to Map a Stream
Let us start with a simple first example: the mapping of a stream of strings of characters.
Writing a gatherer is about implementing the `Gatherer` interface, which turns out to be a functional interface. You can implement it with a lambda, and you will see how to do that in a few minutes. But you can also create a gatherer with one of the few factory methods available on this interface.
For a simple gatherer you can use the `Gatherer.of()` method, which takes an `Integrator` as a parameter. This method has overloads that are covered later in this section. So to write the gatherer you need, you first need to write this integrator.
The role of an integrator is to consume elements from the upstream, and to push elements to the downstream. These elements could be the same, of the same type, or not.
At this point, you can see that an integrator needs two elements to work with: an element from the upstream, and an object that models the downstream, with a way to push elements to it. It turns out that an integrator works with a third element, which is an internal state that you can define within your gatherer implementation. This state is covered later in this section; let us leave it aside for now.
The following code shows you how you can define an integrator, a gatherer, and how you can use it in a stream. This gatherer does not do anything, but it shows you how you can put all these elements together.
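Here is a minimal sketch of such an identity gatherer; the sample data is illustrative.

```java
// An integrator that pushes every element unchanged to the downstream.
// The underscore is the unused state parameter.
Gatherer.Integrator<Void, String, String> integrator =
      (_, element, downstream) -> downstream.push(element);

Gatherer<String, ?, String> gatherer = Gatherer.of(integrator);

var strings = List.of("one", "two", "three");
var result = strings.stream()
      .gather(gatherer)
      .toList();
System.out.println("result = " + result);
```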
Running this code produces the following.
result = [one, two, three]
The first line is the following.
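```java
Gatherer.Integrator<Void, String, String> integrator =
      (_, element, downstream) -> downstream.push(element);
```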
It is an implementation of the abstract method you can find on the `Integrator` interface.
The example you wrote defines an integrator of type `<Void, String, String>`. The first type is the type of the internal state, which we are not using here. The second type is the type of the elements consumed from the upstream, and the third type is the type of the elements pushed to the downstream.
Note that this method returns a `boolean`. Fortunately the `downstream.push()` method also returns a `boolean`, which is the reason why you can compile and execute the example. This `boolean` is very important, and its exact role is covered later in this section.
You can now write a mapper that does some mapping (instead of the identity function). Let us put this gatherer in a method, so that you can pass your mapper as an argument.
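A sketch of this method could look like the following; the helper name `mapping()` and the sample data are illustrative.

```java
static <T, R> Gatherer<T, ?, R> mapping(Function<? super T, ? extends R> mapper) {
    Gatherer.Integrator<Void, T, R> integrator =
          (_, element, downstream) -> {
              R newElement = mapper.apply(element);   // maps the element
              return downstream.push(newElement);     // pushes the mapped element
          };
    return Gatherer.of(integrator);
}

var strings = List.of("one", "two", "three");
var gatherer = mapping((String s) -> s.toUpperCase());
var result = strings.stream()
      .gather(gatherer)
      .toList();
System.out.println("result = " + result);
```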
Running this code produces the following result.
result = [ONE, TWO, THREE]
You can change the function used to create the gatherer to the following. Note that, thanks to the use of `var`, you do not need to change the type of the gatherer you produce.
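```java
var gatherer = mapping((String s) -> s.length());
```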
The result then becomes the following.
result = [3, 3, 5]
Using a Gatherer to Filter a Stream
A gatherer can decide to push an element to a downstream or not. In that case, it can act as a filtering operation. Writing such a gatherer is a slight modification of the mapping gatherer you wrote in the previous section.
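Here is one possible way to write it; the name `filtering()` and the sample data are illustrative.

```java
static <T> Gatherer<T, ?, T> filtering(Predicate<? super T> filter) {
    Gatherer.Integrator<Void, T, T> integrator =
          (_, element, downstream) -> {
              if (filter.test(element)) {
                  return downstream.push(element);
              }
              return true;   // suspicious, discussed below
          };
    return Gatherer.of(integrator);
}

var strings = List.of("one", "two", "three");
var result = strings.stream()
      .gather(filtering((String s) -> s.length() == 3))
      .toList();
System.out.println("result = " + result);
```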
Running this code produces the following.
result = [one, two]
Note that there is a `return true` at the end of this integrator, executed when the element is not pushed. This may not be quite right. You will see the correct code you need to put here later in this section.
Using a Gatherer to Flat-Map a Stream
The third basic operation that you may have in mind is of course the flat-map operation. Mapping a stream does not change the number of elements this stream processes. Filtering can reduce this number, even down to 0, whereas flat-mapping can either reduce it or increase it.
Suppose that your flat-map operation is about creating a stream of the letters of a string of characters. You can write it in this way.
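A sketch of this gatherer; the name `flatMapping()` is illustrative.

```java
static <T, R> Gatherer<T, ?, R> flatMapping(
      Function<? super T, ? extends Stream<? extends R>> flatMapper) {
    Gatherer.Integrator<Void, T, R> integrator =
          (_, element, downstream) -> {
              flatMapper.apply(element).forEach(downstream::push);
              return true;   // suspicious, discussed below
          };
    return Gatherer.of(integrator);
}

var strings = List.of("one", "two", "three");
var result = strings.stream()
      .gather(flatMapping((String s) -> s.chars().mapToObj(Character::toString)))
      .toList();
System.out.println("result = " + result);
```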
Running this code produces the following.
result = [o, n, e, t, w, o, t, h, r, e, e]
Note the `return true` at the end of this integrator: it looks very suspicious. And indeed, this is not how you should manage the boolean returned by the `downstream::push` call. You will see why this code is wrong, and how to fix it, later in this section.
Note also that there are at least two subtle bugs in the writing of `flatMapper.apply(element).forEach(downstream::push)`, which we will discuss later. Using this code in the previous example is fine, though.
Fusing Mappings, Filterings and Flat-Mappings
So far all the gatherers you saw are examples that would be way too complex to use in production. The equivalent methods from the Stream API are much simpler and will make your code much more readable.
If you are really after performance, fusing these operations into a single gatherer can give you better results than chaining several method calls on the Stream API. The readability of your code may suffer, so this is really a matter of choice, taking into account the context of your application.
Let us fuse the three operations from this section into a single gatherer.
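A possible fused gatherer; the name `mapFilterFlatMapping()` is illustrative.

```java
static <T, I, R> Gatherer<T, ?, R> mapFilterFlatMapping(
      Function<? super T, ? extends I> mapper,
      Predicate<? super I> filter,
      Function<? super I, ? extends Stream<? extends R>> flatMapper) {

    Gatherer.Integrator<Void, T, R> integrator =
          (_, element, downstream) -> {
              I mapped = mapper.apply(element);                      // 1. map
              if (filter.test(mapped)) {                             // 2. filter
                  flatMapper.apply(mapped).forEach(downstream::push); // 3. flat-map
              }
              return true;   // suspicious, discussed below
          };
    return Gatherer.of(integrator);
}

var strings = List.of("one", "two", "three");
var result = strings.stream()
      .gather(mapFilterFlatMapping(
            (String s) -> s.toUpperCase(),
            (String s) -> s.length() == 3,
            (String s) -> s.chars().mapToObj(Character::toString)))
      .toList();
System.out.println("result = " + result);
```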
Running this code produces the following result.
result = [O, N, E, T, W, O]
Once again, take this example with a grain of salt, as the `return true` at the end of the integrator is not the right code. More on that later in this section.
Why could this code run faster? Because it creates fewer objects than the equivalent stream code. Each Stream method call creates a new Stream object, which represents an overhead. In this code, there are only two Stream objects created, which may be an optimization.
You will learn about chaining gatherers later in this section, which gives another, better (and recommended!) way to fuse several gatherer operations into one. In any case, you need to measure whether this kind of code brings you any gain, in a production environment, before choosing to use it.
Note that the subtle bug that we mentioned in the previous example is still there in the writing of `flatMapper.apply(mapped).forEach(downstream::push)`. So now is the time to fix it.
Writing a Correct Flat-Mapper
There are at least two bugs in the following code: `flatMapper.apply(mapped).forEach(downstream::push)`, plus a third problem you should handle.
The first bug is that a `Downstream` instance is not a thread-safe object. Calling `forEach(downstream::push)` violates one of the rules of the Stream API: do not do any side effect from within a stream. If you do not follow this rule, you should be extra careful with parallel streams: they can create race conditions, which is exactly the case here. The solution is to protect yourself against a `flatMapper` that could return a parallel stream and generate race conditions in the `Downstream` that is given to you. You can do so by just calling `sequential()` on the returned stream: `flatMapper.apply(mapped).sequential()`. If the stream is already sequential, this call does not do anything and returns `this`; if it is a parallel stream, then it makes it non-parallel.
The second bug has to do with the fact that a stream needs to be closed once used. In many cases calling `Stream.close()` does not do anything. But there are cases where a stream is opened on an I/O resource (like `Files.lines()`), where not calling it would lead to a resource leak. Since `Stream` implements `AutoCloseable`, you can open it in a try-with-resources statement. The pattern then becomes the following.
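```java
// inside the integrator: a sketch of the try-with-resources pattern
var stream = flatMapper.apply(mapped);
try (stream) {
    stream.sequential()
          .forEach(downstream::push);   // the returned boolean is still ignored here
}
return true;
```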
And there is yet another problem that you should probably protect yourself against. It is our good old friend the `NullPointerException` that nobody expects. Someone could pass a flat-mapper to your method that returns null. Dealing with such a problem depends on your application. Maybe it is a bug that you need to report, and in that case you can decide to fail fast and rethrow this `NullPointerException`. Or you could ignore it, and since there is nothing to push to the `Downstream`, do nothing.
This last point has to do with how you can manage the boolean returned by `downstream::push`, so you will see the complete, correct pattern later in this section.
Managing an Internal Mutable State
The second thing a gatherer can do is manage an internal mutable state. Several intermediate operations of the Stream API use a mutable state to work.
- `limit()` has to count the number of elements that it has processed before interrupting the stream.
- `distinct()` needs to store the elements it has already seen in a hash set.
- `dropWhile()` has to switch an internal state to know whether it is open or not.
There is a principle in the Stream API, which is that you should never mutate an external mutable state from within a stream. There are two reasons for that. First, doing so could prevent some optimizations from being applied to streams. And second, it would prevent you from using parallel streams, as it would create race conditions if you are not cautious.
Ignoring the First Elements of a Stream
Let us first create a `dropWhile()`-like behavior with a gatherer.
You may have seen that we ignored the first parameter of the integrators we wrote in the previous sections. Well, now is the time to take it into account. A gatherer that manages an internal mutable state needs to know this current state to decide what to do. So the integrator takes this state as its first parameter. Since this state is mutable, an integrator can mutate it, and if it does so, the next time it is called it will see this modification.
But that is not all: you also need a way to create the initial instance of this mutable state.
If what you need is a mutable collection, for instance, then you can just use an implementation provided by the JDK. It does not have to be thread-safe, so `ArrayList` can do. If what you need is a counter or a boolean, then you need to create your own mutable wrapper around it. Using an atomic variable would be overkill, as you do not need any thread safety. In this case, having just an integrator is not enough: you also need a supplier that the Gatherer implementation can call to create this initial mutable container. In the Gatherer API, this supplier is called an initializer.
Let us create a gatherer that can open the gate to let the elements flow on a predicate.
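A sketch of such a gatherer could be the following; here the gate opens when the predicate first becomes true, and the name `dropWhile()` and the sample data are illustrative.

```java
static <T> Gatherer<T, ?, T> dropWhile(Predicate<? super T> predicate) {

    class Box {
        boolean open = false;
    }

    Supplier<Box> initializer = Box::new;
    Gatherer.Integrator<Box, T, T> integrator =
          (box, element, downstream) -> {
              if (!box.open && predicate.test(element)) {
                  box.open = true;   // the gate opens, and stays open
              }
              if (box.open) {
                  downstream.push(element);
              }
              return true;   // not quite right, fixed later in this section
          };
    return Gatherer.ofSequential(initializer, integrator);
}

var ints = List.of(1, 2, 3, 4, 5, 4, 3, 2, 1);
var result = ints.stream()
      .gather(dropWhile((Integer i) -> i >= 4))
      .toList();
System.out.println("result = " + result);
```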
Running the previous code prints the following.
result = [4, 5, 4, 3, 2, 1]
The first element you can see in this `dropWhile()` method is the definition of your mutable box. It is a simple local class that you can declare within your method. In a next step you will see how you can make it an anonymous class. Then you can write a `Supplier` to create an instance of this `Box` class. This supplier is called by the implementation of your gatherer, just once.
The implementation of your integrator is different, and receives the instance of `Box` that has been created. Since it is mutable, you can do whatever you need to do with it. Here, we just switch the boolean it wraps when the predicate becomes true, and from then on push the elements to the downstream. Note that this integrator always returns `true`, which is actually not the right thing to do. We will fix this later.
The biggest difference is that you now need to call the `Gatherer.ofSequential()` factory method instead of the simple `Gatherer.of()` factory method. Using `ofSequential()` instead of `of()` tells the API that this gatherer cannot be used in parallel. You will learn more about parallel gatherers later in this section, and why you cannot use this gatherer in parallel.
You can write this gatherer in a different way, using anonymous classes, and leveraging non-denotable types.
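Here is a sketch of this variant.

```java
static <T> Gatherer<T, ?, T> dropWhile(Predicate<? super T> predicate) {
    return Gatherer.ofSequential(
          // the initializer returns an instance of an anonymous class
          () -> new Object() {
              boolean open = false;
          },
          // the type of box is inferred, so the field of the anonymous class is accessible
          (box, element, downstream) -> {
              if (!box.open && predicate.test(element)) {
                  box.open = true;
              }
              if (box.open) {
                  downstream.push(element);
              }
              return true;
          });
}

var ints = List.of(1, 2, 3, 4, 5, 4, 3, 2, 1);
var result = ints.stream()
      .gather(dropWhile((Integer i) -> i >= 4))
      .toList();
System.out.println("result = " + result);
```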
Running the previous code gives you the same as previously.
result = [4, 5, 4, 3, 2, 1]
This code works because an anonymous class is compiled as a non-denotable type, which is preserved as long as you do not put this variable in an explicitly typed variable. Here, the type of the box argument of the integrator is inferred by the compiler, this type is preserved, and you can access the field of your anonymous class.
That being said, non-denotable types break down as soon as you need to start providing types explicitly. Using method-local classes may be more readable, and will always work.
Removing the Duplicates in a Stream
Let me take you through a second example, which consists in implementing a `distinct()`-like behavior.
Every time you consume an element, you need to check whether it has already been seen. If it has, you should not push it to the downstream. You can use a regular `HashSet` as your mutable state, with no need to wrap it or make it thread-safe, as it will be used in a single thread.
One of the nice features of the `Set.add()` method is that it returns `true` if the object was added, meaning it was not already in the set, and `false` if it was. So you can use this feature in this case.
Let us create this gatherer.
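A sketch of this gatherer; the name `distinct()` and the sample data are illustrative.

```java
static <T> Gatherer<T, ?, T> distinct() {
    Supplier<Set<T>> initializer = HashSet::new;
    Gatherer.Integrator<Set<T>, T, T> integrator =
          (seen, element, downstream) -> {
              if (seen.add(element)) {   // true means: not seen before
                  downstream.push(element);
              }
              return true;   // not quite right, fixed later in this section
          };
    return Gatherer.ofSequential(initializer, integrator);
}

var ints = List.of(1, 2, 2, 3, 2, 4, 5, 5, 1);
var result = ints.stream()
      .gather(distinct())
      .toList();
System.out.println("result = " + result);
```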
Running the previous example prints the following.
result = [1, 2, 3, 4, 5]
Once again, the `return true` at the end of this integrator is not the right code; you will fix it later.
Interrupting a Stream
Interrupting a stream is something that can be done by several intermediate operations, including `limit()` and `takeWhile()`.
Suppose that you have a list with one million elements, and you call the following code.
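For instance, something like the following sketch; the exact data and mapping are illustrative.

```java
// ints is a list with one million elements
var ints = IntStream.range(0, 1_000_000).boxed().toList();

var result = ints.stream()
      .map(i -> i * 3)
      .limit(3L)
      .toList();
```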
It would be very inefficient to process all the elements from `ints`, which is the reason why this `limit()` call has the capacity to tell its upstream that it is not going to process any more elements. It turns out that this upstream is returned by a `map()` call. The implementation of this operation should handle this interruption and push it to its own upstream. This upstream pulls elements from the list, so it should stop doing that.
The Gatherer API supports this behavior. It is able to check if its downstream still accepts more elements, and to transmit this information to its upstream. If this gatherer decides that it should interrupt the stream, it can also pass this information to its upstream.
You can check if your downstream still accepts elements in two ways.
- You can call `downstream.isRejecting()`, which returns `true` if the downstream does not accept any more elements.
- You can also check the value returned by `downstream.push()`. If it is `false`, then your downstream does not accept any more elements.
Limiting the Number of Elements Pushed to the Downstream
Let us implement a `limit()` operation. Implementing such a feature requires a mutable counter, and an integrator that both increments this counter and checks if the limit is reached. You can write the following code.
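A first sketch could be the following; the name `limiting()` is illustrative.

```java
static <T> Gatherer<T, ?, T> limiting(long maxSize) {

    class Counter {
        long count = 0L;
    }

    Supplier<Counter> initializer = Counter::new;
    Gatherer.Integrator<Counter, T, T> integrator =
          (counter, element, downstream) -> {
              if (downstream.isRejecting()) {   // these lines are discussed below
                  return false;
              }
              if (counter.count < maxSize) {
                  counter.count++;
                  return downstream.push(element);
              }
              return false;   // the limit is reached
          };
    return Gatherer.ofSequential(initializer, integrator);
}
```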
We are done returning `true` blindly. Now you can see that this code processes what the downstream is telling it.
This code works properly, but it is not the best you can write. There are three principles governing the rejecting state of a downstream.
- A new downstream always starts in the non-rejecting state.
- A downstream can only switch this state from non-rejecting to rejecting. So a rejecting downstream stays in that state.
- A downstream can only switch this state when you push an element to it.
Following these three principles, the first lines of your integrator, shown below, become useless.
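```java
if (downstream.isRejecting()) {
    return false;
}
```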
When you first get your downstream, `downstream.isRejecting()` returns `false`, even if your call to `gather()` is followed by a `limit(0L)`. And since you return the `boolean` returned by the call to `downstream.push(element)`, this code cannot be called when this downstream is rejecting. So you can safely get rid of these lines.
The correct code you should use is simpler; it is the following.
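```java
static <T> Gatherer<T, ?, T> limiting(long maxSize) {

    class Counter {
        long count = 0L;
    }

    Gatherer.Integrator<Counter, T, T> integrator =
          (counter, element, downstream) -> {
              if (counter.count < maxSize) {
                  counter.count++;
                  // push one more element and transmit the downstream's answer
                  return downstream.push(element);
              }
              // enough elements have been pushed
              return false;
          };
    return Gatherer.ofSequential(Counter::new, integrator);
}

var ints = List.of(1, 2, 3, 4, 5);
var result = ints.stream()
      .gather(limiting(3L))
      .toList();
System.out.println("result = " + result);
```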
Running this code produces the following.
result = [1, 2, 3]
When the counter is below the limit, you push one more element, get the value returned by the `downstream.push()` call, and immediately transmit it to your upstream. If the boolean you return is `false`, then this integrator will not be called again.
Then, if you reach the final `return false`, it means that you have pushed enough elements to your downstream, and that you are done with it. So you return `false` no matter what.
Fixing the Distinct Gatherer
With this in mind, you can now fix the previous examples that were left with some `return true` as placeholders. Here is the correct version of the distinct example, assuming that your downstream cannot start in a rejecting state.
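```java
static <T> Gatherer<T, ?, T> distinct() {
    Supplier<Set<T>> initializer = HashSet::new;
    Gatherer.Integrator<Set<T>, T, T> integrator =
          (seen, element, downstream) -> {
              if (seen.add(element)) {
                  // push the element and transmit the downstream's answer
                  return downstream.push(element);
              }
              // nothing pushed for this element, but more may be accepted later
              return true;
          };
    return Gatherer.ofSequential(initializer, integrator);
}

var ints = List.of(1, 2, 2, 3, 2, 4, 5, 5, 1);
var result = ints.stream()
      .gather(distinct())
      .toList();
System.out.println("result = " + result);
```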
Running this example produces the following.
result = [1, 2, 3, 4, 5]
It follows the principles of the limit gatherer.
- Check if your downstream is accepting any more elements. If not, then return `false` and do not do anything.
- If you decide to push something to the downstream, return the value that `downstream.push()` gives you.
- If you decide not to push anything now, but may push something in the future, then return `true`. If not, then return `false`.
In any case, you need to keep in mind that once you have returned `false`, you should not decide to return `true` again. As the downstream of your own upstream, you need to follow the rule that a downstream can never switch from the non-accepting to the accepting state.
By the way, if everything is working properly, once it has returned `false`, your integrator should not be called anymore.
Pushing to a Rejecting Downstream
What can happen if you ignore what your downstream is telling you, and continue pushing elements to it? Well, nothing, actually. The elements you push are silently ignored. No exception is raised if you do so. It is just a waste of resources, and an overhead you could avoid. In some cases this overhead may be very costly.
Creating Greedy Integrators
You have two factory methods on the `Integrator` interface.
- `Integrator.of()`, which takes a plain `Gatherer.Integrator` as a parameter.
- `Integrator.ofGreedy()`, which takes a `Gatherer.Integrator.Greedy` as a parameter.
There are two things that you need to keep in mind:
- `Gatherer.Integrator.Greedy` is an extension of `Gatherer.Integrator`.
- Both interfaces can be implemented with the same lambda expressions.
So you can write the following. These two integrators are strictly equivalent.
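```java
Gatherer.Integrator<Void, String, String> integrator =
      (_, element, downstream) -> downstream.push(element);

Gatherer.Integrator.Greedy<Void, String, String> greedyIntegrator =
      (_, element, downstream) -> downstream.push(element);
```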
And you can also write the following. These two integrators are also strictly equivalent.
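```java
var integrator = Gatherer.Integrator.<Void, String, String>of(
      (_, element, downstream) -> downstream.push(element));

var greedyIntegrator = Gatherer.Integrator.<Void, String, String>ofGreedy(
      (_, element, downstream) -> downstream.push(element));
```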
Moreover, these four integrators work the same. From a behavior point of view, there is no difference between them.
Defining an integrator as greedy means that this integrator never chooses to interrupt a stream on its own. It always transmits the rejecting state of the downstream it pushes elements to. So internally, the Gatherer API can take that into account and activate some optimizations to execute your gatherer. If you do not follow the contract of a greedy integrator, your code may fail. If you have a plain integrator that has a greedy behavior, your code will not fail, but you may miss some optimization opportunities.
So in a first step, you could choose to ignore this greedy behavior, and stick to plain integrators. Once you have set up your data processing pipeline you can then revisit each step and make the corresponding integrators greedy.
Adding a Finisher
There are cases where your gatherer needs to wait for all the elements of its upstream to start pushing elements to its downstream. This is the case if you need to create a gatherer that sorts the elements of your upstream. You cannot start producing elements until you have seen them all.
Such a gatherer needs to add all the elements it sees to an internal buffer. Then, when no more elements are pushed to its integrator, it pushes the buffered elements to the downstream. So far you have not seen any solution to do that: nothing is telling your gatherer that no more elements are coming from the upstream. The upstream pushes elements to the integrator of your gatherer, and when there is no more element, nothing happens.
This is why the Gatherer API gives you another element, called a finisher, that is called by the implementation when no more elements are to be pushed to your integrator.
This finisher can be seen as a simplified integrator.
- It takes the current state of your gatherer, if you defined one by providing an initializer.
- It does not take any current element of the stream, precisely because when it is called, there are no more elements.
- It takes the downstream so that you can push elements to it.
- It does not return anything, since after this finisher is called, nothing can happen anyway.
So a finisher takes two parameters, your state and the downstream, and does not return anything. It is modeled by a `BiConsumer<State, Downstream>`.
Creating a Finisher with no State
Let us write a simple finisher, in a case where your gatherer does not define any internal mutable state.
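Here is a sketch; the sample data and the "DONE" marker are illustrative.

```java
var strings = List.of("one", "two", "three");

Gatherer.Integrator<Void, String, String> integrator =
      (_, element, downstream) -> downstream.push(element.toUpperCase());

BiConsumer<Void, Gatherer.Downstream<? super String>> finisher =
      (_, downstream) -> downstream.push("DONE");   // pushes one last element

var result = strings.stream()
      .gather(Gatherer.of(integrator, finisher))
      .toList();
System.out.println("result = " + result);
```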
Here we define a mapping gatherer and, for some reason, we need to add "DONE" at the end of the stream. The finisher you have is a biconsumer, but since this gatherer does not define any state, its first parameter is of type `Void`, and is ignored.
Running the previous code produces the following.
result = [ONE, TWO, THREE, DONE]
Creating a Sorting Distinct Gatherer with a Finisher
Sorting a stream requires adding all the elements to a buffer before starting to push them to the downstream. This buffer is the internal mutable state of your gatherer. The role of the integrator is to store all the elements in this buffer. Pushing the elements is then done by the finisher.
You can then write the following code. Note that we are using a `TreeSet` to make the code simpler, which has the added effect of removing the duplicates from your stream.
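A sketch of this gatherer; the name `sortingDistinct()`, the comparator, and the sample data are illustrative.

```java
static <T> Gatherer<T, ?, T> sortingDistinct(Comparator<? super T> comparator) {
    Supplier<NavigableSet<T>> initializer = () -> new TreeSet<>(comparator);
    Gatherer.Integrator<NavigableSet<T>, T, T> integrator =
          (state, element, _) -> {
              state.add(element);   // just buffers the element
              return true;
          };
    BiConsumer<NavigableSet<T>, Gatherer.Downstream<? super T>> finisher =
          (state, downstream) -> state.stream()
                .takeWhile(_ -> !downstream.isRejecting())
                .forEach(downstream::push);
    return Gatherer.ofSequential(initializer, integrator, finisher);
}

var strings = List.of("one", "two", "three", "four", "five", "six", "seven", "one");
var result = strings.stream()
      .gather(sortingDistinct(
            Comparator.comparing(String::length).thenComparing(Comparator.naturalOrder())))
      .toList();
System.out.println("result = " + result);
```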
Running this code prints the following.
result = [one, six, two, five, four, seven, three]
Several points are worth noting in this gatherer.
- The elements are added to a `TreeSet` so that they are kept sorted. It also removes the duplicates.
- The integrator does not push any element to its downstream.
- The finisher has to check if the downstream is still accepting elements after each call to `downstream.push()`. Using `takeWhile()` proves very useful here.
Note that we are using the following pattern to push the elements to the downstream. Indeed, what you should be doing is: while the downstream is in a non-rejecting state, push the elements to it. You can easily translate that to this code, thanks to the expressiveness of the Stream API.
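```java
state.stream()
     .takeWhile(_ -> !downstream.isRejecting())
     .forEach(downstream::push);
```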
You can also use the following pattern, which does the same thing, in a more efficient way.
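One possible variant uses the boolean returned by `push()` directly, instead of querying `isRejecting()` before every element; this is a sketch.

```java
var iterator = state.iterator();
var accepting = !downstream.isRejecting();
while (accepting && iterator.hasNext()) {
    accepting = downstream.push(iterator.next());   // stops as soon as push() returns false
}
```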
Fixing the Flat-Mapper
You can also use the previous example to finally fix the flat-mapper we talked about earlier. The complete, correct code, which deals with parallel streams, the closing of the stream, and the possible `NullPointerException`, is the following.
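A possible complete version, with the same illustrative names as before; `allMatch()` short-circuits on the first `false` returned by `push()`, and returns `true` on an empty stream.

```java
static <T, I, R> Gatherer<T, ?, R> mapFilterFlatMapping(
      Function<? super T, ? extends I> mapper,
      Predicate<? super I> filter,
      Function<? super I, ? extends Stream<? extends R>> flatMapper) {

    Gatherer.Integrator<Void, T, R> integrator =
          (_, element, downstream) -> {
              I mapped = mapper.apply(element);
              if (!filter.test(mapped)) {
                  // nothing to push for this element, but more may come
                  return true;
              }
              var stream = flatMapper.apply(mapped);
              if (stream == null) {
                  // choosing to ignore a null stream; failing fast is another option
                  return true;
              }
              try (stream) {                   // closes the stream once consumed
                  return stream.sequential()   // guards against a parallel stream
                               .allMatch(downstream::push);
              }
          };
    return Gatherer.of(integrator);
}

var strings = List.of("one", "two", "three");
var result = strings.stream()
      .gather(mapFilterFlatMapping(
            (String s) -> s.toUpperCase(),
            (String s) -> s.length() == 3,
            (String s) -> s.chars().mapToObj(Character::toString)))
      .toList();
System.out.println("result = " + result);
```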
Running this code produces the same result.
result = [O, N, E, T, W, O]
Parallel Gatherers
Executing Gatherers in Parallel Streams
One of the amazing features of the Stream API is that you can distribute your computations among all the cores of your CPU by just calling `parallel()` on an existing stream.
Gatherers support parallel streams in two ways.
When you create a gatherer with one of the `Gatherer.of()` factory methods, you create a gatherer that can be called in parallel. On the other hand, when you use `Gatherer.ofSequential()`, you create a gatherer that does not support parallelization.
What does it mean to be called in parallel, or to not support parallelization? In the Stream API, if you decide to make your stream a parallel stream, then all the operations of your stream will be executed in different threads, in parallel. In a nutshell: a parallel stream splits your source of data into several chunks, and each chunk is processed in its own thread, on the different cores of your CPU. Once everything has been processed, you end up with partial results that need to be merged into one. So each intermediate operation is executed in parallel, without having to synchronize on what is happening in other threads. There are exceptions to that, as some intermediate operations do need to synchronize. This is the case for `limit()`, `takeWhile()`, `distinct()`, `sorted()`, and some others.
This does not hold for the Gatherer API. If you use a gatherer in a parallel stream, two things can happen.
- Your gatherer does support parallelism. In that case, everything is executed in parallel, as you expect.
- Your gatherer does not support parallelism; it is called a sequential gatherer. You created it using one of the `Gatherer.ofSequential()` methods, and such gatherers can only be used sequentially. In that case, all the operations before your call to `gather()` are executed in parallel, just as in a regular parallel stream; then your gatherer is executed in a single thread and pushes elements to a single downstream; and the rest of the stream is again executed in parallel.
So even if you have a gatherer that does not support parallelism for some reason, you can still benefit from the performance brought to you by parallel streams. This feature is unique to the Gatherer API.
Creating Parallel Gatherers
You have three patterns to choose from to create a parallel gatherer. The first two are equivalent to the ones you have in sequential gatherers.
- `Gatherer.of(integrator)`. You already saw this pattern earlier in this section. Your gatherer does not rely on any internal mutable state, and it does not need any finisher. There is a sequential version of this method: `Gatherer.ofSequential(integrator)`.
- `Gatherer.of(integrator, finisher)`. This gatherer does not need any mutable state, and it has a finisher. This is also a pattern that you saw earlier in this section. There is also a sequential version of this method: `Gatherer.ofSequential(integrator, finisher)`.
The third pattern declares a mutable state. In the Gatherer API, this mutable state is not shared among the different threads of your parallel stream. So every time the Stream API runs a gatherer in a thread, it creates a new instance of this mutable state for this gatherer to work with. On the one hand, it makes your life easier, because you do not need to bother with a thread-safe mutable state, since it is never shared. But on the other hand, you need a way to merge these different instances together, into a single one, once you are done with your computation. It makes the corresponding factory method a little more complex, as you need to provide a fourth parameter, called a combiner, that can combine two instances of your mutable state.
So there are now four parameters for this factory method.
- The initializer, which is a `Supplier`.
- The integrator, which takes the current mutable state, the element you need to integrate, and the downstream. This method returns a boolean to tell whether it is done accepting elements or not.
- The combiner, which takes two instances of the mutable state and returns one. It is modeled by a `BinaryOperator`. It can return one of the two instances, or create a new one. This combiner can be called any number of times, depending on the number of times your source of data has been split.
- The finisher, which is the `BiConsumer` that we already talked about. This finisher is called only once, at the very end of your computation.
If for some reason, you need an internal mutable state, but you cannot create a combiner, then the Stream API cannot run your gatherer in parallel, since it would not be able to combine the instances of this mutable state together. You would then need to use a sequential gatherer.
Let us make our sorting gatherer a parallel gatherer.
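A sketch of the parallel version, reusing the same illustrative names and data.

```java
static <T> Gatherer<T, ?, T> sortingDistinct(Comparator<? super T> comparator) {
    Supplier<NavigableSet<T>> initializer = () -> new TreeSet<>(comparator);
    Gatherer.Integrator<NavigableSet<T>, T, T> integrator =
          (state, element, _) -> {
              state.add(element);
              return true;
          };
    BinaryOperator<NavigableSet<T>> combiner =
          (set1, set2) -> {
              set1.addAll(set2);   // merges two partial states into one
              return set1;
          };
    BiConsumer<NavigableSet<T>, Gatherer.Downstream<? super T>> finisher =
          (state, downstream) -> state.stream()
                .takeWhile(_ -> !downstream.isRejecting())
                .forEach(downstream::push);
    return Gatherer.of(initializer, integrator, combiner, finisher);
}

var strings = List.of("one", "two", "three", "four", "five", "six", "seven", "one");
var result = strings.parallelStream()
      .gather(sortingDistinct(
            Comparator.comparing(String::length).thenComparing(Comparator.naturalOrder())))
      .toList();
System.out.println("result = " + result);
```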
Running the previous code prints the following.
result = [one, six, two, five, four, seven, three]
Chaining Gatherers
The `Gatherer` interface exposes an `andThen()` method. You can chain existing gatherers to create new gatherers with this method.
You can see chaining in action on some of the gatherers you wrote previously in this section.
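For instance, chaining the illustrative filtering and mapping gatherers from earlier.

```java
var strings = List.of("one", "two", "three", "four", "five", "six", "seven");
var result = strings.stream()
      .gather(filtering((String s) -> s.length() > 3)
            .andThen(mapping((String s) -> s.toUpperCase())))
      .toList();
System.out.println("result = " + result);
```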
Running the previous example prints the following.
result = [THREE, FOUR, FIVE, SEVEN]
As you can imagine, there are constraints on the gatherers you want to chain. The type of the elements produced by the first needs to match the type of the elements accepted by the second.
Creating Gatherers with the Gatherers Factory Class
The last element of the Stream API you need to be aware of is the `Gatherers` factory class. There are five factory methods in it that create ready-to-use gatherers for your application.
Folding
The `fold()` method implements a reduce operation, applied from left to right, useful for cases where you cannot write a combiner.
Folding looks like the reduction of a stream. But the reduction operation requires your operator to have several properties, among them associativity and, in some cases, an identity element. When your reduction operation does not have an identity element, the corresponding `reduce()` method returns an optional, which is empty in case you were reducing an empty stream.
Folding does not have these restrictions. A folding just applies an operator (in this case a `BiFunction`) on your elements to produce a result. It also takes a `Supplier` to initialize the process.
Note that this gatherer implements your folding as an intermediate operation. So you end up with a stream that can only produce a single element. If you need to get this element, you need to call `findFirst().orElseThrow()` on it. If you need to process your data further, then maybe using a collector is a good solution.
You can execute the following code to see this gatherer in action.
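Here is a sketch; the braces in the printed result come from a formatting step applied after the fold, which is an assumption made to match the output shown below.

```java
var ints = List.of(1, 2, 3, 4);
var result = ints.stream()
      .gather(Gatherers.fold(
            () -> "",                                 // the initial value
            (string, element) -> string + element))   // applied from left to right
      .map(string -> "{" + string + "}")              // assumed formatting
      .findFirst()
      .orElseThrow();
System.out.println("result = " + result);
```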
Running the previous code prints the following.
result = {1234}
Scanning
The `scan()` factory method creates a gatherer that works with two elements.
- A supplier, that creates an initial value.
- And a `BiFunction`.
Each time the upstream pushes a new element to this gatherer, it applies the `BiFunction` to the previously computed value and this new element, and then pushes the result to the downstream.
You can build an example similar to the fold example. Even if the two gatherers fold and scan look similar, there is one major difference: the fold gatherer only pushes the last element to the downstream, so it acts as a reducer within an intermediate operation, whereas the scan gatherer pushes all the elements as it processes them.
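A sketch, with the same assumed formatting step as in the fold example.

```java
var ints = List.of(1, 2, 3, 4);
var result = ints.stream()
      .gather(Gatherers.scan(
            () -> "",
            (string, element) -> string + element))
      .map(string -> "{" + string + "}")   // assumed formatting
      .toList();
System.out.println("result = " + result);
```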
Running the previous code prints the following.
result = [{1}, {12}, {123}, {1234}]
Mapping Concurrently
The `mapConcurrent()` factory method creates a gatherer that can map your data concurrently. The gatherer it produces is not a parallel gatherer, as it is internally created with the `Gatherer.ofSequential()` factory method.
This gatherer consumes all the elements it can, then maps them using the `Function` you pass as an argument. Each mapping is executed in a virtual thread launched specifically for the mapping of that element. The number of active virtual threads at a given time is controlled by the `maxConcurrency` argument.
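Here is a sketch of how you could call it; the slow mapping function is simulated and the sample data is illustrative.

```java
var strings = List.of("one", "two", "three");
var result = strings.stream()
      .gather(Gatherers.mapConcurrent(
            4,                           // at most 4 virtual threads at a time
            (String s) -> {
                try {
                    Thread.sleep(100);   // simulates a slow operation
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
                return s.toUpperCase();
            }))
      .toList();
System.out.println("result = " + result);   // result = [ONE, TWO, THREE]
```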
Windowing Gatherers
The last two factory methods create gatherers that work with windows. A window is an interval of indexes on your stream. Calling a windowing gatherer on a non-ordered stream (in the `ORDERED` sense) does not make sense, as several runs on the same data could lead to different, inconsistent results.
There are many strategies to create such windows. The API gives you two.
- The first one, called fixed window, gives you a stream of disjoint windows of indexes: [0,1,2], [3,4,5], [6,7,8], ... So no element is repeated from one window to the next. The last window can be incomplete, as the number of elements the upstream can produce may not be exactly divisible by the size of the window you need.
- The second one, called sliding window, gives you a stream where the first index of each window is incremented by 1: [0,1,2], [1,2,3], [2,3,4], ... All the windows of this stream have the same length.
Fixed Window
The `windowFixed(windowSize)` factory method creates a gatherer that stores the elements pushed by the upstream in non-modifiable lists of size `windowSize`. The index of the first element of each list is incremented by `windowSize`, so a given element from the upstream is stored in only one non-modifiable list. Each list is pushed to the downstream of this gatherer when it is ready. The last list contains all the remaining elements pushed by the upstream. If you are lucky it will contain exactly `windowSize` elements, but your code should not rely on that.
Let us write the following example to see this gatherer in action.
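The sample data is illustrative.

```java
var strings = List.of("one", "two", "three", "four", "five");
var result = strings.stream()
      .gather(Gatherers.windowFixed(2))
      .toList();
System.out.println("result = " + result);
```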
Running the previous code prints the following. As you can see, the last window is shorter than the others, as there are only five elements produced by this stream.
result = [[one, two], [three, four], [five]]
Sliding Window
The `windowSliding(windowSize)` factory method creates a gatherer that stores the elements pushed by the upstream in non-modifiable lists of size `windowSize`. The index of the first element of each list is incremented by 1, so a given element from the upstream is stored in up to `windowSize` non-modifiable lists. This rule does not hold for the first and last elements of the upstream. Each list is pushed to the downstream of this gatherer when it is ready. The last list contains the same number of elements as all the other lists, so the last element pushed by the upstream is present only in the last non-modifiable list pushed to the downstream.
You can write a first, basic example to see this gatherer in action.
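The sample data is illustrative.

```java
var ints = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9);
var result = ints.stream()
      .gather(Gatherers.windowSliding(3))
      .toList();
System.out.println("result = " + result);
```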
Running the previous example prints the following.
result = [[1, 2, 3], [2, 3, 4], [3, 4, 5], [4, 5, 6], [5, 6, 7], [6, 7, 8], [7, 8, 9]]
Note that for this gatherer, all the windows produced have the same size, as long as the upstream produces more elements than the size of the window you chose.
As the elements of the gathered stream are themselves lists, you can further process them using the Stream API. Let us do that by creating a gatherer that computes the average of each window, by composing two gatherers.
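One way to do this reuses the illustrative `mapping()` gatherer defined earlier in this section.

```java
var ints = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9);

// maps each window (a List<Integer>) to the average of its elements
Gatherer<List<Integer>, ?, Double> averaging =
      mapping(window -> window.stream()
            .mapToInt(Integer::intValue)
            .average()
            .orElseThrow());

var result = ints.stream()
      .gather(Gatherers.<Integer>windowSliding(3).andThen(averaging))
      .toList();
System.out.println("result = " + result);
```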
Running the previous code prints the following.
result = [2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0]
Last update: March 3, 2025