DZone
Thanks for visiting DZone today,
Edit Profile
  • Manage Email Subscriptions
  • How to Post to DZone
  • Article Submission Guidelines
Sign Out View Profile
  • Post an Article
  • Manage My Drafts
Over 2 million developers have joined DZone.
Log In / Join
Refcards Trend Reports
Events Video Library
Refcards
Trend Reports

Events

View Events Video Library

Related

  • Real-Time Analytics: All Data, Any Data, Any Scale, at Any Time
  • The Evolution of Adaptive Frameworks
  • How to Design Event Streams, Part 1
  • Effortless Concurrency: Leveraging the Actor Model in Financial Transaction Systems

Trending

  • Apache Doris vs Elasticsearch: An In-Depth Comparative Analysis
  • A Guide to Developing Large Language Models Part 1: Pretraining
  • Breaking Bottlenecks: Applying the Theory of Constraints to Software Development
  • Intro to RAG: Foundations of Retrieval Augmented Generation, Part 1
  1. DZone
  2. Data Engineering
  3. Data
  4. NiFi In-Memory Processing

NiFi In-Memory Processing

This article provides a brief introduction to NiFi in-memory processing as well as how to configure the data flow and some use cases.

By 
Madhusudhan Dasari user avatar
Madhusudhan Dasari
·
Jan. 12, 24 · Tutorial
Likes (3)
Comment
Save
Tweet
Share
6.0K Views

Join the DZone community and get the full member experience.

Join For Free

Apache NiFi is an easy-to-use, powerful, highly available, and reliable system to process and distribute data. Made for data flow between source and target systems, it is a simple robust tool to process data from various sources and targets (find more on GitHub). NiFi has 3 repositories:

  1. FlowFile Repository: Stores the metadata of the FlowFiles during the active flow
  2. Content Repository: Holds the actual content of the FlowFiles
  3. Provenance Repository: Stores the snapshots of the FlowFiles in each processor; with that, it outlines a detailed data flow and the changes in each processor and allows an in-depth discovery of the chain of events

NiFi Registry is a stand-alone sub-project of NiFi that allows version control of NiFi. It allows saving FlowFile state and sharing FlowFiles between NiFi applications. Primarily used to version control the code written in Nifi.

General Setup and Usage

As data flows from the source to the target, the data and metadata of the FlowFile reside in the FlowFile and content repositories. NiFi stores all FlowFile content on disk to ensure resilience across restarts. It also provides backpressure to prevent data consumers/sources from overwhelming the system if the target is unable to keep up for some time.

For example, ConsumeKafka receives data as a FlowFile in NiFi (through the ConsumeKafka processor). Say the target is another Kafka topic (or Hive/SQL/Postgres table) after general filters, enrichments, etc. However, if the target is unavailable, or any code fails to work as expected (i.e., the filter code or enrichment code), the flow stops due to backpressure, and ConsumeKafka won't run. Fortunately, data loss does not occur because the data is present in the content repository, and once the issue is resolved, the data resumes flowing to the target.

Most application use cases work well in this setup. However, some use cases may require a slightly different architecture than what traditional NiFi provides.

Use Cases

If a user knows that the data source they are receiving data from is both persistent and replayable, it might be more beneficial to skip storing the data (in NiFi, as FlowFile in the content repository) instead of replaying the data from the source after restarting. This approach has multiple advantages. Firstly, data could be stored in memory instead of on disk, offering better performance and faster load times. Secondly, it enables seamless data transfer between machines without any loss.

This can be achieved with the NiFi EXECUTESTATELESS processor.

How to Setup and Run

  1. First, prepare the flow you want to set up. For example: Consume Kafka receives the data as FlowFile to the content repository. Application code runs (general filters/enrichments, etc.) publish to another Kafka/writes to Hive/SQL table/Postgres table, etc.
  2. Say the code, which consumes a lot of resources on disk/CPU due to some filter/enrichment, can be converted to the EXECUTESTATELESS process and can be run in memory.
    The flow looks like this:

Consumekafka --> executestateless processor --> publish kafka/puthiveql/putdatabaserecord.

3. When the stateless process fails and because of this back pressure occurs, and data can be replayed after the issue is resolved.  As this is executed in memory, it is faster compared to a conventional NiFi run.

4. Once the above code is ready (#2), keep it in processgroup. Right-click and check the code to NiFi Registry to start version control.Start version control

5. Now complete the full setup of the code: Drag the consumekafka and set up the configs like Kafka topic/SSL config/offset, etc. properties (considering the above example). Drag the execute stateless processor and follow step 7 below to configure. Connect this to the consumekafka processor and publishkafka processor as per the flow shown in #3. Drag  publishKafka and set up the configs like Kafka topic/SSL config/any other properties like compression, etc. 

  • An important point to note: If this code uses any secrets, such as keystore/truststore passwords or database credentials, they should be configured within the processgroup for which the executestateless process is going to run. This should also be passed from the executestateless process as variables with the same name as to how the configuration is made inside the process group. 

6. The screenshot below shows the configuration of the executestateless processor:
Configuration of the executestateless processor

  • Dataflow specification strategy: Use the NiFi registry 
  • Registry URL: Configured NiFi Registry URL 
  • Registry bucket: Specific bucket name where the code has been checked
  • Flow name: The name of the flow where the code has been checked
  • Input port: The name of the port where consumekafka is connecting (considering the above example); the process group should have an input port - if you have multiple inputs, give the names as comma-separated
  • Failure port: In case of any failures, the actual code should have failure ports present and these FlowFiles can be reprocessed again. If you have multiple failure ports, give the names as comma-separated.

7. Based on the point mentioned in #6 above,  add additional variables at the end of this as shown below for any of the secrets.Configure processor

  • Content storage strategy: change it to  "store content on heap".  
    • Please note: One of the most impactful configuration options for the Processor is the configuration of the "Content Storage Strategy" property. For performance reasons, the processor can be configured to hold all FlowFiles in memory. This includes incoming FlowFiles, as well as intermediate and output FlowFiles. This can be a significant performance improvement but comes with a significant risk. The content is stored on NiFi's heap. This is the same heap that is shared by all other ExecuteStateless flows by NiFi's processors and the NiFi process itself. If the data is very large, it can quickly exhaust the heap, resulting in out-of-memory errors in NiFi. These, in turn, can result in poor performance, as well as instability of the NiFi process itself. For this reason, it is not recommended to use the "Store Content on Heap" option unless it is known that all FlowFiles will be small (less than a few MB). Also, in order to help safeguard against the case that the processor receives an unexpectedly large FlowFile, the "Max Input FlowFile Size" property must be configured when storing data on the heap. Alternatively, and by default, the "Content Storage Strategy" can be configured to store FlowFile content on disk. When this option is used, the content of all FlowFiles is stored in the configured Working Directory. It is important to note, however, that this data is not meant to be persisted across restarts. Instead, this simply provides the stateless engine with a way to avoid loading everything into memory. Upon restart, the data will be deleted instead of allowing FlowFiles to resume from where they left off (reference). 

8. The final flow looks like this:

Final Flow

Conclusion

Stateless NiFi provides a different runtime engine than traditional NiFi. It is a single-threaded runtime engine, in which data is not persisted across restarts, but this can be run in multi-threaded. Make sure to set up multiple threads (according to the use case as described below). As explained above in step 7, performance implications should be considered.

When designing a flow to use with Stateless, it is important to consider how the flow might want to receive its data and what it might want to do with the data once it is processed.  Different options are as below:

  1. The flow to fully encapsulate the source of data and all destinations: For example, it might have a ConsumeKafkaRecord processor, perform some processing, and then publish to another topic via PublishKafkaRecord. 
  2. Build a flow that sources data from some external source, possibly performing some processing, but not defining the destination of the data. For example, the flow might consist of a ConsumeKafkaRecord processor and perform some filtering and transformation, but stop short of publishing the data anywhere. Instead, it can transfer the data to an output port, which could then be used by ExecuteStateless to bring that data into the NiFi dataflow.
  3. A dataflow may not define where it receives its input from, and instead just use an input port, so that any dataflow can be built to source data, and then deliver it to this dataflow, which is responsible for preparing and delivering the data.
  4. Finally, the dataflow may define neither the source nor the destination of the data. Instead, the dataflow will be built to use an input port, it will perform some filtering/routing/transformation, and finally provide its processing results to an Output Port.
    (reference).

Both the traditional NiFi Runtime Engine and the Stateless NiFi Runtime Engine have their strengths and weaknesses. The ideal situation would be one in which users could easily choose which parts of their data flow run Stateless and which parts run in the traditional NiFi Runtime Engine.

Additional Reference

  • NiFi: ExecuteStateless
Apache NiFi Data (computing) kafka In-memory processing

Opinions expressed by DZone contributors are their own.

Related

  • Real-Time Analytics: All Data, Any Data, Any Scale, at Any Time
  • The Evolution of Adaptive Frameworks
  • How to Design Event Streams, Part 1
  • Effortless Concurrency: Leveraging the Actor Model in Financial Transaction Systems

Partner Resources

×

Comments

The likes didn't load as expected. Please refresh the page and try again.

ABOUT US

  • About DZone
  • Support and feedback
  • Community research
  • Sitemap

ADVERTISE

  • Advertise with DZone

CONTRIBUTE ON DZONE

  • Article Submission Guidelines
  • Become a Contributor
  • Core Program
  • Visit the Writers' Zone

LEGAL

  • Terms of Service
  • Privacy Policy

CONTACT US

  • 3343 Perimeter Hill Drive
  • Suite 100
  • Nashville, TN 37211
  • [email protected]

Let's be friends:

OSZAR »