DZone
Thanks for visiting DZone today,
Edit Profile
  • Manage Email Subscriptions
  • How to Post to DZone
  • Article Submission Guidelines
Sign Out View Profile
  • Post an Article
  • Manage My Drafts
Over 2 million developers have joined DZone.
Log In / Join
Refcards Trend Reports
Events Video Library
Refcards
Trend Reports

Events

View Events Video Library

Related

  • Thread-Safety Pitfalls in XML Processing
  • Java Stream API: 3 Things Every Developer Should Know About
  • Understanding Lazy Evaluation in Java Streams
  • Exploring TakeWhile and DropWhile Functions in Java

Trending

  • ACID vs BASE: Transaction Models Explained
  • Tired of Spring Overhead? Try Dropwizard for Your Next Java Microservice
  • IoT and Cybersecurity: Addressing Data Privacy and Security Challenges
  • After 9 Years, Microsoft Fulfills This Windows Feature Request
  1. DZone
  2. Coding
  3. Java
  4. Using Java Stream Gatherers To Improve Stateful Operations

Using Java Stream Gatherers To Improve Stateful Operations

Explore how Java 24's Stream Gatherers improve stateful stream processing, using a real example: calculating percentage changes in stock quote data.

By 
Sven Loesekann user avatar
Sven Loesekann
·
May. 27, 25 · Tutorial
Likes (3)
Comment
Save
Tweet
Share
3.1K Views

Join the DZone community and get the full member experience.

Join For Free

In the AngularPortfolioMgr project, the logic for calculating the percentage difference between stock quotes is a stateful operation, since it requires access to the previous quote.

With Java 24, Stream Gatherers are now finalized and offer a clean way to handle such stateful logic within the stream itself.

This eliminates the need for older workarounds, like declaring value references outside the stream (e.g., AtomicReference) and updating them inside, which often led to side effects and harder-to-maintain code.

Java Stream Gatherers

Gatherers have been introduced to enable stateful operations across multiple stream items. To support this, a Gatherer can include the following steps:

  • Initializer to hold the state
  • Integrator to perform logic and push results to the stream
  • Combiner to handle results from multiple parallel streams
  • Finisher to manage any leftover stream items

These steps allow flexible handling of stateful operations within a stream. One of the provided Gatherers is windowFixed(...), which takes a window size and maintains a collection in the initializer. The integrator fills that collection until the window size is reached, then pushes it downstream and clears it. The combiner sends merged collections downstream as they arrive. The finisher ensures any leftover items that didn’t fill a full window are still sent.

A practical use case for windowFixed(...) is batching parameters for SQL IN clauses, particularly with Oracle databases that limit IN clause parameters to 1000. The NewsFeedService uses a Gatherer to solve this:

Java
 
...
final var companyReports = companyReportsStream
  .gather(Gatherers.windowFixed(999)).toList();
final var symbols = companyReports.stream()
  .flatMap(myCompanyReports -> this.symbolRepository
    .findBySymbolIn(myCompanyReports.stream()
      .map(SymbolToCikWrapperDto.CompanySymbolDto::getTicker).toList())
...


With this pattern, many stateful operations can now be handled within the stream, minimizing the need for external state. This leads to cleaner stream implementations and gives the JVM's HotSpot optimizer more room to improve performance by eliminating side effects.

A Use Case for Java Stream Gatherer

The use case for stream Gatherers is calculating the percentage change between closing prices of stock quotes. To calculate the change, the previous quote is needed. That was the implementation before Java 24: the previous value had to be stored outside the stream. This approach relied on side effects, which made the code harder to reason about and less efficient.

With Gatherers, this stateful logic can now be implemented inside the stream, making the code cleaner and more optimized.

Java
 
private LinkedHashMap<LocalDate, BigDecimal> calcClosePercentages(
  List<DailyQuote> portfolioQuotes, final LocalDate cutOffDate) {
  record DateToCloseAdjPercent(LocalDate localDate, 
    BigDecimal closeAdjPercent) { }
  final var lastValue = new AtomicReference<BigDecimal>(
    new BigDecimal(-1000L));
  final var closeAdjPercents = portfolioQuotes.stream()
    .filter(myQuote -> cutOffDate.isAfter(
      myQuote.getLocalDay()))
    .map(myQuote -> {
      var result = new BigDecimal(-1000L);
      if (lastValue.get().longValue() > -900L) {
        result = myQuote.getAdjClose()
          .divide(lastValue.get(), 25, RoundingMode.HALF_EVEN)
          .multiply(new BigDecimal(100L));
      }
      lastValue.set(myQuote.getAdjClose());
      return new DateToCloseAdjPercent(myQuote.getLocalDay(), result);
    })	
    .sorted((a, b) -> a.localDate().compareTo(b.localDate()))
    .filter(myValue -> 
      myValue.closeAdjPercent().longValue() < -900L)
    .collect(Collectors.toMap(DateToCloseAdjPercent::localDate, 
      DateToCloseAdjPercent::closeAdjPercent, 
      (x, y) -> y, LinkedHashMap::new));
    return closeAdjPercents;
}


The lastValue is stored outside of the stream in an AtomicReference. It is initialized with -1000, as negative quotes do not exist—making -100 the lowest possible real value. This ensures that the initial value is filtered out before any quotes are collected, using a filter that excludes percentage differences smaller than -900.

The Java 24 implementation with Gatherers in the PortfolioStatisticService looks like this:

Java
 
private LinkedHashMap<LocalDate, BigDecimal> calcClosePercentages(
  List<DailyQuote> portfolioQuotes,final LocalDate cutOffDate) {
  final var closeAdjPercents = portfolioQuotes.stream()
    .filter(myQuote -> cutOffDate.isAfter(myQuote.getLocalDay()))
    .gather(calcClosePercentage())
    .sorted((a, b) -> a.localDate().compareTo(b.localDate()))
    .collect(Collectors.toMap(DateToCloseAdjPercent::localDate,
      DateToCloseAdjPercent::closeAdjPercent, 
      (x, y) -> y, LinkedHashMap::new));
    return closeAdjPercents;
}

private static Gatherer<DailyQuote, AtomicReference<BigDecimal>, 
  DateToCloseAdjPercent> calcClosePercentage() {
  return Gatherer.ofSequential(
    // Initializer
    () -> new AtomicReference<>(new BigDecimal(-1000L)),
    // Integrator
    (state, element, downstream) -> {
      var result = true;
      if (state.get().longValue() > -900L) {
        var resultPercetage = element.getAdjClose()
          .divide(state.get(), 25, RoundingMode.HALF_EVEN)
	  .multiply(new BigDecimal(100L));
	result = downstream.push(new DateToCloseAdjPercent(
          element.getLocalDay(), resultPercetage));
      }
      state.set(element.getAdjClose());
      return result;
    });	
}


In the method calcClosePercentages(...), the record DateToCloseAdjPercent(...) has moved to class level because it is used in both methods. The map operator has been replaced with .gather(calcClosePercentage(...)). The filter for the percentage difference smaller than -900 could be removed because that is handled in the Gatherer.

In the method calcClosePercentage(...), the Gatherer is created with Gatherer.ofSequential(...) because the calculation only works with ordered sequential quotes.

First, the initializer supplier is created with the initial value of BigDecimal(1000L). Second, the integrator is created with (state, element, downstream). The state parameter has the initial state of AtomicReference<>(new BigDecimal(-1000)) that is used for the previous closing of the quote. The element is the current quote that is used in the calculation. The downstream is the stream that the result is pushed to. The result is a boolean that shows if the stream accepts more values. It should be set to true or the result of downstream.push(...), unless an exception occurs that cannot be handled. The downstream parameter is used to push the DateToCloseAdjPercent record to the stream. Values not pushed are effectively filtered out. The state parameter is set to the current quote’s close value for the next time the Gatherer is called. Then the result is returned to inform the stream whether more values are accepted.

Conclusion

This is only one of the use cases that can be improved with Gatherers. The use of value references outside of the stream to do stateful operations in streams is quite common and is no longer needed. That will enable the JVM to optimize more effectively, because with Gatherers, HotSpot does not have to handle side effects. With the Gatherers API, Java has filled a gap in the Stream API and now enables elegant solutions for stateful use cases.

Java offers prebuilt Gatherers like Gatherers.windowSliding(...) and Gatherers.windowFixed(...) that help solve common use cases.

The reasons for a Java 25 LTS update are:

  • Thread pinning issue of virtual threads is mitigated → better scalability
  • Ahead-of-Time Class Loading & Linking → faster application startup for large applications
  • Stream Gatherers → cleaner code, improved optimization (no side effects)
Java (programming language) Stream (computing)

Opinions expressed by DZone contributors are their own.

Related

  • Thread-Safety Pitfalls in XML Processing
  • Java Stream API: 3 Things Every Developer Should Know About
  • Understanding Lazy Evaluation in Java Streams
  • Exploring TakeWhile and DropWhile Functions in Java

Partner Resources

×

Comments

The likes didn't load as expected. Please refresh the page and try again.

ABOUT US

  • About DZone
  • Support and feedback
  • Community research
  • Sitemap

ADVERTISE

  • Advertise with DZone

CONTRIBUTE ON DZONE

  • Article Submission Guidelines
  • Become a Contributor
  • Core Program
  • Visit the Writers' Zone

LEGAL

  • Terms of Service
  • Privacy Policy

CONTACT US

  • 3343 Perimeter Hill Drive
  • Suite 100
  • Nashville, TN 37211
  • [email protected]

Let's be friends:

OSZAR »