Now that we have the basis of our flow and our retry flow, let’s build this out a bit and add some functionality. Let’s start with some additional routing. Up until this point, we have very generic routing being applied when a file fails:
- Route to retry flow
- Iterate counter
- Determine whether or not it exceeds the number of failures configure
- Yes: Fail ultimately
- No: Route back to the appropriate point in the flow
Simple Routing Schema in Database
What we have created isn’t really anything that NiFi doesn’t support out of the box as of NiFi 1.10. In order to make our routing “smarter”, we are going to leverage a database. We are going to create a database schema, populate it, and create a query that will be run whenever the flow receives a failure. Let’s take a look at the table created and populated with some information specific to our flow. I’ll go ahead and call out that this is a very simple and minimal data model. This should be further refined to track changes and can be fleshed out a bit to add more functionality if needed. However, this is what we will use in our example to easily illustrate exactly what is happening.
In this table, we have the columns id, process_group, processor, attempt, and route. We can ignore id, which is the primary key. It was created only to have a primary key to ease in maintenance. We could use process_group, processor, as a composite key. However, I’ve chosen to simply give it a unique constraint. It’s probably obvious what process_group and processor denote, but what about attempt? When would the same process_group and processor combination appear more than once?
There are times when it makes sense to retry a file once, and if it still fails, do not try again. We want to allow that. Or maybe it makes sense to try a certain fix on the first retry, and a second different fix on the subsequent retry. Let’s also allow that. We want to make this as flexible as possible and this will allow us to do that.
Flow to Utilize Route
Let’s take a look at the automated fixes process group.
This is where a file will route if there is a known “fix”. When a flowfile enters, it will be routed to the appropriate processor or process group. In our example, we only have one automated fix. As stated earlier, we are intentionally creating file collisions. When we have a file collision, we are going to append the UUID to the filename. It’s a really simple example, but could be applied to other use cases. Basically, if there is an issue that you know may occur and you know how to fix the issue, you would route that issue here and programmatically fix it.
Routing in Action
Quickly let’s view an error created by a file collision and the outcome of the automated fix.
As you can see in the error, the PutFile processor failed because a “file with the same name already exists.” At this point, the flowfile was directed to the automated fixes route. The filename was updated and then sent back to the PutFile processor which created the following output.
Recording Failure State
We now have the three routes for a failed flowfile:
- Retry with modification
- Ultimate Fail
We’ve covered both 1 and 2 and briefly touched on 3, but what should we really be doing here? In some cases, it would make sense to send an alert, but that design pattern is covered in many places. Whether we alert someone or not, we want to store the state of the flowfile content and all of its attributes when it fails. This allows us to later do analysis on failures, but it also allows someone to pick up the file and modify it before pushing it back into the system, if appropriate. Let’s first talk about how to store the state of the flowfile.
In our example, we are going to store the content of the flowfile to our local disk in a dead–letter-queue directory using a PutFile. However, in a production application, this would probably store the content to something like S3. The flowfile is useless, at least in NiFi, without the corresponding attributes. So, it is essential to store them as well. Again, there are multiple options to do this, two being S3 and a database. Let’s look at a quick example of this. We are going to use a really simple NiFi flow to create and store FlowFile content as well as the attributes.
Here, we are creating a flowfile with a static string of Latin text., storing random numbers as attributes then forking the flow. The right leg of the flow is updating the filename by adding .txt before writing it to disk. The left leg updates the filename by adding .json before dumping the FlowFile attributes to the file content. Then it writes the file to disk. Again, you’d probably use something like S3 for this instead of the local disk.
Let’s take a look at the output and see how the files are related.
When it comes time to debug or modify the file, we have both the attributes and the content stored in an easy to consume format. Really quickly, let’s view the file content to show the difference. To do so, I’m going to use “diff” to get the info on one page.
- The diff command is boxed in yellow.
- The attributes stored in the json file is boxed in blue.
- The FlowFile content stored in the txt file is boxed in green.
It’s important to note that the file content would most likely be stored in some sort of binary format, like Avro. In that case you would have to deserialize the file first to make it human-readable before inspecting the file.
FlowFile Reentering the System
The last piece to add in is accepting files back into the system. In order to do this, you must expose an endpoint to accept data, handle the data, and send a response. To do this in NiFi, we are going to use a HandleHTTPRequest, EvaluateJsonPath, and HandleHTTPResponse. Our implementation will be quite simple, but this can be as advanced as your system requires.
The HandleHTTPRequest expects to get JSON that contains the location of the file content and the location of the file attributes. If either of these aren’t present, we will consider it a failure.
Once the response is sent, we need to retrieve the attributes file and store them into the FlowFile attributes. Then, we will retrieve the actual file.
Let’s take a look:
We have split the functionality into two process groups. The first is responsible for accepting and validating the request. The second retrieves the FlowFile attributes and loads them into memory before getting the file content.
As discussed earlier, this flow is really simple. The request is accepted and the properties contentLocation and attributeLocation are stored in memory.
Then the file is brought into the system as a FlowFile. Here we have used FetchFile as everything is stored locally for demonstration purposes. However, this will likely use a processor such as FetchS3Object in a production environment.
That’s it for the basic framework. It is expected that there will be some differences as well as extensions put into place before rebuilding in a production environment, but I believe that this lays out a robust framework that enhances the handling of errors that NiFi ships with.