Using Java Stream Gatherers To Improve Stateful Operations
Explore how Java 24's Stream Gatherers improve stateful stream processing, using a real example: calculating percentage changes in stock quote data.
In the AngularPortfolioMgr project, the logic for calculating the percentage difference between stock quotes is a stateful operation, since it requires access to the previous quote.
With Java 24, Stream Gatherers are now finalized and offer a clean way to handle such stateful logic within the stream itself. This eliminates the need for older workarounds, such as declaring value references outside the stream (e.g., AtomicReference) and updating them inside it, which often led to side effects and harder-to-maintain code.
Java Stream Gatherers
Gatherers have been introduced to enable stateful operations across multiple stream items. To support this, a Gatherer can include the following steps:
- Initializer to create the object that holds the state
- Integrator to process each element, update the state, and push results downstream
- Combiner to merge the states of parallel substreams
- Finisher to run after the last element, for example to push leftover items
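To make the four steps concrete, here is a small, hypothetical Gatherer (not part of AngularPortfolioMgr) that sums integers and emits a single total once the input is exhausted. It is a sketch assuming Java 24+, built with Gatherer.of(...), which takes all four functions; the class and method names are made up for illustration.

```java
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.IntStream;

// Hypothetical example: a Gatherer that uses all four steps to sum integers
// and emit one total at the end (requires Java 24+).
final class GathererStepsDemo {

    // State created by the initializer: one instance per sequential evaluation,
    // or one per substream when the pipeline runs in parallel.
    static final class Sum {
        long total;
    }

    static Gatherer<Integer, Sum, Long> summing() {
        return Gatherer.<Integer, Sum, Long>of(
                // Initializer: creates a fresh state
                Sum::new,
                // Integrator: folds each element into the state and asks for more
                Gatherer.Integrator.<Sum, Integer, Long>ofGreedy((state, element, downstream) -> {
                    state.total += element;
                    return true;
                }),
                // Combiner: merges the states of two parallel substreams
                (left, right) -> {
                    left.total += right.total;
                    return left;
                },
                // Finisher: runs after the last element and pushes the result
                (state, downstream) -> downstream.push(state.total));
    }

    static List<Long> sumSequential(List<Integer> values) {
        return values.stream().gather(summing()).toList();
    }

    static List<Long> sumParallel(int upTo) {
        return IntStream.rangeClosed(1, upTo).boxed().parallel().gather(summing()).toList();
    }

    public static void main(String[] args) {
        System.out.println(sumSequential(List.of(1, 2, 3, 4))); // [10]
        System.out.println(sumParallel(1_000));                 // [500500]
    }
}
```

Because the combiner is associative, the parallel run produces the same single total as a sequential one would.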
These steps allow flexible handling of stateful operations within a stream. One of the provided Gatherers is windowFixed(...), which takes a window size. Its initializer creates the collection for the current window. The integrator fills that collection until the window size is reached, then pushes it downstream and starts a new window. Because windowFixed(...) is evaluated sequentially, it needs no combiner. The finisher ensures that any leftover items that did not fill a complete window are still sent downstream.
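A quick way to see the integrator and the finisher of windowFixed(...) at work is a five-element stream with a window size of 2: two full windows are pushed by the integrator, and the leftover element arrives as a shorter window from the finisher. The class and method names below are illustrative.

```java
import java.util.List;
import java.util.stream.Gatherers;

final class WindowFixedDemo {

    // Groups the input into windows of the given size; the last window may be shorter.
    static List<List<Integer>> windows(List<Integer> input, int size) {
        return input.stream().gather(Gatherers.windowFixed(size)).toList();
    }

    public static void main(String[] args) {
        // [1, 2] and [3, 4] come from the integrator, [5] from the finisher
        System.out.println(windows(List.of(1, 2, 3, 4, 5), 2)); // [[1, 2], [3, 4], [5]]
    }
}
```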
A practical use case for windowFixed(...) is batching parameters for SQL IN clauses, particularly with Oracle databases, which limit an IN clause to 1000 parameters.
The NewsFeedService uses a Gatherer to solve this:
```java
...
// Batches of 999 stay below Oracle's limit of 1000 IN clause parameters
final var companyReports = companyReportsStream
    .gather(Gatherers.windowFixed(999)).toList();
final var symbols = companyReports.stream()
    .flatMap(myCompanyReports -> this.symbolRepository
        .findBySymbolIn(myCompanyReports.stream()
            .map(SymbolToCikWrapperDto.CompanySymbolDto::getTicker).toList())
...
```
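The batching step can be tried in isolation. The following sketch assumes 2,500 generated ticker strings and a hypothetical symbol table and column; it shows that windowFixed(999) yields batches of 999, 999, and 502 parameters, each small enough for a single IN clause. In the NewsFeedService, the repository method findBySymbolIn(...) builds the actual query, so selectSql(...) here is only an illustration.

```java
import java.util.Collections;
import java.util.List;
import java.util.stream.Gatherers;
import java.util.stream.IntStream;

// Sketch: split a large parameter list into batches that stay below
// Oracle's limit of 1000 expressions in an IN list.
final class InClauseBatching {

    // Same window size as in the NewsFeedService snippet
    static final int BATCH_SIZE = 999;

    static List<List<String>> batches(List<String> tickers) {
        return tickers.stream().gather(Gatherers.windowFixed(BATCH_SIZE)).toList();
    }

    // Hypothetical table and column names; one placeholder per batch element
    static String selectSql(int parameterCount) {
        String placeholders = String.join(", ", Collections.nCopies(parameterCount, "?"));
        return "SELECT * FROM symbol WHERE symbol IN (" + placeholders + ")";
    }

    public static void main(String[] args) {
        List<String> tickers = IntStream.range(0, 2_500).mapToObj(i -> "T" + i).toList();
        List<List<String>> windows = batches(tickers);
        System.out.println(windows.stream().map(List::size).toList()); // [999, 999, 502]
        System.out.println(selectSql(3)); // SELECT * FROM symbol WHERE symbol IN (?, ?, ?)
    }
}
```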
With this pattern, many stateful operations can now be handled within the stream, minimizing the need for external state. This leads to cleaner stream implementations and, because there are no side effects, gives the JVM's HotSpot optimizer more room to improve performance.
A Use Case for Java Stream Gatherers
The use case for stream Gatherers in this project is calculating the percentage change between the closing prices of consecutive stock quotes. To calculate the change, the previous quote is needed. Before Java 24, the previous value had to be stored outside the stream. This approach relied on side effects, which made the code harder to reason about and less efficient.
With Gatherers, this stateful logic can now be implemented inside the stream, making the code cleaner and easier to optimize. The implementation before Java 24 looked like this:
```java
private LinkedHashMap<LocalDate, BigDecimal> calcClosePercentages(
    List<DailyQuote> portfolioQuotes, final LocalDate cutOffDate) {
  record DateToCloseAdjPercent(LocalDate localDate,
      BigDecimal closeAdjPercent) { }
  // Previous close, kept outside the stream; -1000 means "no previous quote yet"
  final var lastValue = new AtomicReference<BigDecimal>(
      new BigDecimal(-1000L));
  final var closeAdjPercents = portfolioQuotes.stream()
      .filter(myQuote -> cutOffDate.isAfter(
          myQuote.getLocalDay()))
      .map(myQuote -> {
        var result = new BigDecimal(-1000L);
        if (lastValue.get().longValue() > -900L) {
          result = myQuote.getAdjClose()
              .divide(lastValue.get(), 25, RoundingMode.HALF_EVEN)
              .multiply(new BigDecimal(100L));
        }
        // Side effect: update the state that lives outside the stream
        lastValue.set(myQuote.getAdjClose());
        return new DateToCloseAdjPercent(myQuote.getLocalDay(), result);
      })
      .sorted((a, b) -> a.localDate().compareTo(b.localDate()))
      // Drop the -1000 sentinel produced for the first quote
      .filter(myValue ->
          myValue.closeAdjPercent().longValue() > -900L)
      .collect(Collectors.toMap(DateToCloseAdjPercent::localDate,
          DateToCloseAdjPercent::closeAdjPercent,
          (x, y) -> y, LinkedHashMap::new));
  return closeAdjPercents;
}
```
The lastValue is stored outside of the stream in an AtomicReference. It is initialized with -1000 as a sentinel for "no previous quote yet". Because quotes cannot be negative, no real result can fall below -100, so the sentinel can never be mistaken for a real value. The first quote therefore produces the sentinel, and a filter that excludes values smaller than -900 removes it before the results are collected.
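To see the sentinel at work, the following sketch reduces the pre-Java 24 approach to a list of closing prices without dates (a simplification; the real method works on DailyQuote objects). For closes of 100, 110, and 99, the first element yields the -1000 sentinel, which the filter removes, leaving 110 and 90. The class and method names are hypothetical.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Sketch of the pre-Java 24 pattern: the previous value lives in an
// AtomicReference outside the stream, and -1000 marks "no previous value".
final class SentinelDemo {

    static final BigDecimal SENTINEL = new BigDecimal(-1000L);

    static List<BigDecimal> percentages(List<BigDecimal> closes) {
        final var lastValue = new AtomicReference<>(SENTINEL);
        return closes.stream()
                .map(close -> {
                    var result = SENTINEL;
                    if (lastValue.get().longValue() > -900L) {
                        result = close.divide(lastValue.get(), 25, RoundingMode.HALF_EVEN)
                                .multiply(new BigDecimal(100L));
                    }
                    lastValue.set(close); // side effect on state outside the stream
                    return result;
                })
                // Removes the sentinel emitted for the first element
                .filter(value -> value.longValue() > -900L)
                .toList();
    }

    public static void main(String[] args) {
        var closes = List.of(new BigDecimal("100"), new BigDecimal("110"), new BigDecimal("99"));
        // Prints 110 and 90 (the raw values carry 25 decimal places)
        percentages(closes).forEach(p -> System.out.println(p.stripTrailingZeros().toPlainString()));
    }
}
```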
The Java 24 implementation with Gatherers in the PortfolioStatisticService looks like this:
```java
private LinkedHashMap<LocalDate, BigDecimal> calcClosePercentages(
    List<DailyQuote> portfolioQuotes, final LocalDate cutOffDate) {
  final var closeAdjPercents = portfolioQuotes.stream()
      .filter(myQuote -> cutOffDate.isAfter(myQuote.getLocalDay()))
      // Stateful step: replaces map(...) and the sentinel filter
      .gather(calcClosePercentage())
      .sorted((a, b) -> a.localDate().compareTo(b.localDate()))
      .collect(Collectors.toMap(DateToCloseAdjPercent::localDate,
          DateToCloseAdjPercent::closeAdjPercent,
          (x, y) -> y, LinkedHashMap::new));
  return closeAdjPercents;
}

// Class-level record, used by both methods
record DateToCloseAdjPercent(LocalDate localDate,
    BigDecimal closeAdjPercent) { }

private static Gatherer<DailyQuote, AtomicReference<BigDecimal>,
    DateToCloseAdjPercent> calcClosePercentage() {
  return Gatherer.ofSequential(
      // Initializer: -1000 marks "no previous quote yet"
      () -> new AtomicReference<>(new BigDecimal(-1000L)),
      // Integrator: push a percentage once a previous close is known
      (state, element, downstream) -> {
        var result = true;
        if (state.get().longValue() > -900L) {
          var resultPercentage = element.getAdjClose()
              .divide(state.get(), 25, RoundingMode.HALF_EVEN)
              .multiply(new BigDecimal(100L));
          result = downstream.push(new DateToCloseAdjPercent(
              element.getLocalDay(), resultPercentage));
        }
        // Remember the current close for the next element
        state.set(element.getAdjClose());
        return result;
      });
}
```
In the method calcClosePercentages(...), the record DateToCloseAdjPercent(...) has moved to class level because it is used in both methods. The map operator has been replaced with .gather(calcClosePercentage()). The filter for percentage differences smaller than -900 is no longer needed, because the Gatherer never pushes the sentinel value downstream.
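For experimenting outside the project, the following self-contained sketch runs essentially the same Gatherer against a hypothetical DailyQuote record that only exposes getLocalDay() and getAdjClose(); the real entity has more fields, and the sample dates and prices are made up. The only deviation from the listing above is an explicit type argument on the AtomicReference.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.time.LocalDate;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Gatherer;

final class ClosePercentageDemo {

    // Hypothetical stand-in for the project's DailyQuote entity
    record DailyQuote(LocalDate localDay, BigDecimal adjClose) {
        LocalDate getLocalDay() { return localDay; }
        BigDecimal getAdjClose() { return adjClose; }
    }

    record DateToCloseAdjPercent(LocalDate localDate, BigDecimal closeAdjPercent) { }

    static Gatherer<DailyQuote, AtomicReference<BigDecimal>, DateToCloseAdjPercent> calcClosePercentage() {
        return Gatherer.ofSequential(
                () -> new AtomicReference<BigDecimal>(new BigDecimal(-1000L)), // no previous close yet
                (state, element, downstream) -> {
                    var result = true;
                    if (state.get().longValue() > -900L) {
                        var percentage = element.getAdjClose()
                                .divide(state.get(), 25, RoundingMode.HALF_EVEN)
                                .multiply(new BigDecimal(100L));
                        result = downstream.push(new DateToCloseAdjPercent(element.getLocalDay(), percentage));
                    }
                    state.set(element.getAdjClose()); // previous close for the next element
                    return result;
                });
    }

    static LinkedHashMap<LocalDate, BigDecimal> calcClosePercentages(List<DailyQuote> quotes, LocalDate cutOffDate) {
        return quotes.stream()
                .filter(quote -> cutOffDate.isAfter(quote.getLocalDay()))
                .gather(calcClosePercentage())
                .sorted((a, b) -> a.localDate().compareTo(b.localDate()))
                .collect(Collectors.toMap(DateToCloseAdjPercent::localDate,
                        DateToCloseAdjPercent::closeAdjPercent, (x, y) -> y, LinkedHashMap::new));
    }

    // Made-up sample data: four consecutive trading days
    static List<DailyQuote> sampleQuotes() {
        return List.of(
                new DailyQuote(LocalDate.of(2024, 1, 2), new BigDecimal("100")),
                new DailyQuote(LocalDate.of(2024, 1, 3), new BigDecimal("110")),
                new DailyQuote(LocalDate.of(2024, 1, 4), new BigDecimal("99")),
                new DailyQuote(LocalDate.of(2024, 1, 5), new BigDecimal("120")));
    }

    public static void main(String[] args) {
        // The cut-off date excludes the quote of 2024-01-05
        calcClosePercentages(sampleQuotes(), LocalDate.of(2024, 1, 5)).forEach((date, percent) ->
                System.out.println(date + " " + percent.stripTrailingZeros().toPlainString()));
        // 2024-01-03 110
        // 2024-01-04 90
    }
}
```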
In the method calcClosePercentage(...), the Gatherer is created with Gatherer.ofSequential(...) because the calculation only works when the quotes are processed in order, one after another.
First, the initializer supplier is created; it returns an AtomicReference with the initial value BigDecimal(-1000L). Second, the integrator is created as a lambda with the parameters (state, element, downstream). The state parameter starts as the AtomicReference<>(new BigDecimal(-1000L)) from the initializer and holds the previous quote's closing value. The element is the current quote used in the calculation. The downstream is the next stage of the stream that results are pushed to. The integrator's result is a boolean that tells the stream whether more values are accepted. It should be true or the result of downstream.push(...), unless an exception occurs that cannot be handled. The downstream parameter is used to push the DateToCloseAdjPercent record to the stream; values that are not pushed are effectively filtered out. Finally, the state is set to the current quote's closing value for the next call of the integrator, and the result is returned.
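Because a sequential Gatherer's state is never shared between threads, the AtomicReference serves only as a mutable box, and the initializer creates a fresh one for every stream evaluation. The following variation is an assumption, not the article's code: it swaps in a small holder class whose null field marks "no previous element yet" instead of the -1000 sentinel, and for brevity it computes differences between consecutive integers rather than BigDecimal percentages. Reusing the same Gatherer instance on a second stream shows that state does not leak between evaluations.

```java
import java.util.stream.Gatherer;
import java.util.stream.Stream;

final class PreviousValueDemo {

    // Plain mutable state: no atomic operations are needed, because a
    // sequential Gatherer's state is used by only one evaluation at a time.
    static final class Previous {
        Integer value; // null until the first element has been seen
    }

    // Emits current - previous for every element after the first one.
    static Gatherer<Integer, Previous, Integer> deltas() {
        return Gatherer.<Integer, Previous, Integer>ofSequential(
                Previous::new,
                (state, element, downstream) -> {
                    boolean more = true;
                    if (state.value != null) {
                        more = downstream.push(element - state.value);
                    }
                    state.value = element;
                    // false tells the stream that no more elements should be sent
                    return more;
                });
    }

    public static void main(String[] args) {
        Gatherer<Integer, Previous, Integer> gatherer = deltas();
        System.out.println(Stream.of(100, 110, 99).gather(gatherer).toList()); // [10, -11]
        // Same Gatherer instance, fresh state: the first element emits nothing
        System.out.println(Stream.of(5, 7).gather(gatherer).toList());         // [2]
    }
}
```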
Conclusion
This is only one of the use cases that can be improved with Gatherers. Using value references outside of the stream for stateful operations is quite common, and with Gatherers it is no longer needed. That enables the JVM to optimize more effectively, because HotSpot does not have to handle side effects. With the Gatherers API, Java has filled a gap in the Stream API and now enables elegant solutions for stateful use cases.
Java offers prebuilt Gatherers, such as Gatherers.windowSliding(...) and Gatherers.windowFixed(...), that help solve common use cases.
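As an illustration of the prebuilt windowSliding(...), the following sketch (an alternative, not the PortfolioStatisticService implementation) computes the same current-to-previous ratios without custom state or a sentinel value, because windowSliding(2) emits overlapping [previous, current] pairs. Closing prices are plain BigDecimal values here for brevity, and the class name is hypothetical.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.List;
import java.util.stream.Gatherers;
import java.util.stream.Stream;

final class SlidingWindowDemo {

    // current / previous * 100 for each overlapping pair of closing prices
    static List<BigDecimal> ratiosToPrevious(List<BigDecimal> closes) {
        return closes.stream()
                .gather(Gatherers.windowSliding(2))
                // A one-element input produces a single window of size 1, which has no pair
                .filter(pair -> pair.size() == 2)
                .map(pair -> pair.get(1)
                        .divide(pair.get(0), 25, RoundingMode.HALF_EVEN)
                        .multiply(new BigDecimal(100L)))
                .toList();
    }

    public static void main(String[] args) {
        System.out.println(Stream.of(1, 2, 3, 4).gather(Gatherers.windowSliding(2)).toList());
        // [[1, 2], [2, 3], [3, 4]]
        ratiosToPrevious(List.of(new BigDecimal("100"), new BigDecimal("110"), new BigDecimal("99")))
                .forEach(r -> System.out.println(r.stripTrailingZeros().toPlainString()));
        // 110
        // 90
    }
}
```

The trade-off is that each window allocates a small list, whereas the custom Gatherer keeps only one previous value.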
The reasons for a Java 25 LTS update are:
- Thread pinning issue of virtual threads is mitigated → better scalability
- Ahead-of-Time Class Loading & Linking → faster application startup for large applications
- Stream Gatherers → cleaner code, improved optimization (no side effects)