Hi, welcome back. In this video, you will understand how to do batch processing. What do I mean by batch processing? Let me give you a simple example. Let's assume that I have a file with millions of records. What I want is a flow that polls for any new file in a directory.
Whenever there's such a file, I want to take the file and, for each record in it, do some processing. Let's say I want to update a database, or maybe upload to Salesforce. Since we have to process millions of records, we want to do it asynchronously, in an efficient way. The batch job helps us achieve this. But even before going to the batch job, let me tell you my use case. Here's what I want to achieve: I want to create a flow which will poll for new files.
So maybe a flow with an On New or Updated File listener, you know the one. This will poll for files and pick up the file. Then I want to add a transformer which reads that file and creates a list, so the output payload after this transformer will be a list. Then I want to process each element of the list, and for each element I want to do the processing in various steps.
So, for each step, let's assume that there is a flow. Let's say there is one flow that I want the record to go through first; call it flow one. After flow one is completed, I want the record to go through flow two, and finally flow three. That means for each record in this list, I want the record to go through flow one, flow two, and flow three, and I want flow two to be executed only after flow one, and flow three only after flow two. In other words, for each record, I want these flows to be executed one after the other. One way I could do this is with a for-each scope: inside the for-each scope I can use a flow reference to call flow one; if that is successful, call flow two; if that is successful, call flow three. A rough sketch of this is below.
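Just to illustrate, here is roughly what that for-each approach could look like in Mule XML. The flow and config names here are placeholders of my own, and namespace declarations are omitted:

```xml
<!-- Sketch only: flow1/flow2/flow3 and the config names are hypothetical -->
<flow name="fileProcessingFlow">
    <!-- Polls the directory for new or updated files -->
    <file:listener config-ref="File_Config" directory="/input">
        <scheduling-strategy>
            <fixed-frequency frequency="10000"/>
        </scheduling-strategy>
    </file:listener>
    <!-- Transformer that turns the file content into a list of records -->
    <ee:transform>
        <ee:message>
            <ee:set-payload><![CDATA[%dw 2.0
output application/java
---
payload // parse CSV/JSON into an array of records here
]]></ee:set-payload>
        </ee:message>
    </ee:transform>
    <!-- Synchronous: record 2 starts only after record 1 has finished all three flows -->
    <foreach>
        <flow-ref name="flow1"/>
        <flow-ref name="flow2"/>
        <flow-ref name="flow3"/>
    </foreach>
</flow>
```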
I could do that, but the for-each scope is synchronous processing. That means if there are a million records in this list, record two will be processed only after record one goes through the f1, f2, and f3 flows. What I want instead is for the records in this list to be processed concurrently, say a hundred records at a time, while each record still goes through the f1, f2, and f3 flows one by one. So using for-each is not a solution, because it is synchronous. And there is one more problem with for-each: let's assume that while processing one of the records, say the 130th, an exception is thrown in one of these flows. The exception will get propagated, and the rest of the records will not be processed. So we would have to handle exceptions in each individual flow ourselves.
But what I want is this: if an exception is thrown while a record is going through one of the flows, I will consider that record as having failed. Up to a hundred record failures, I don't want to stop the processing of the other records (failing means an exception is thrown in any one of these flows). If more than a hundred records fail, then I may want to stop further processing of the records in this list. What I could do in this case is add error handling where I increment a counter, and if the counter value is greater than a hundred, write a condition in this for-each loop not to process the records further. But again, that is too much manual work; a sketch of it is below.
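Just to show how manual this gets, here is a sketch of that counter idea. The variable name and threshold are mine, and namespaces are omitted; for brevity the try scope sits inside the for-each here rather than in each individual flow. In Mule 4, variables set inside a for-each do persist across iterations, which is what makes this counting work:

```xml
<set-variable variableName="failedCount" value="#[0]"/>
<foreach>
    <!-- Skip the remaining records once too many have failed -->
    <choice>
        <when expression="#[vars.failedCount &lt;= 100]">
            <try>
                <flow-ref name="flow1"/>
                <flow-ref name="flow2"/>
                <flow-ref name="flow3"/>
                <error-handler>
                    <!-- Swallow the error so the other records keep processing -->
                    <on-error-continue>
                        <set-variable variableName="failedCount"
                                      value="#[vars.failedCount + 1]"/>
                    </on-error-continue>
                </error-handler>
            </try>
        </when>
    </choice>
</foreach>
```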
And again, whenever we use for-each, the records are processed step by step, sequentially, right? That is where the batch job comes in: a batch job can help us process the records in batches. So let us see it with an example. In this example, I'll just show you step by step; follow along. Drag and drop a flow with an HTTP listener as usual, configure it with port 8081, and set the path to /test. Then, just for testing, I want to create a list of numbers as the payload, 1 to 90.
So drag and drop a Transform Message component, and here I want the payload to be a list of the numbers 1 to 90. I'll just copy-paste: I double-click on it and paste this array. So here I have a payload hard-coded from 1 to 90. Roughly, the flow so far looks like the sketch below.
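Here is approximately what the flow looks like in the XML at this point (config names are whatever Studio generates; namespaces omitted):

```xml
<flow name="batchDemoFlow">
    <!-- HTTP_Listener_config points at port 8081 -->
    <http:listener config-ref="HTTP_Listener_config" path="/test"/>
    <!-- Hard-coded test payload: the numbers 1 to 90 -->
    <ee:transform>
        <ee:message>
            <ee:set-payload><![CDATA[%dw 2.0
output application/java
---
1 to 90
]]></ee:set-payload>
        </ee:message>
    </ee:transform>
</flow>
```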
Okay, so now I have a payload of type list and size 90, just to simulate batch processing; assume these are 90 records that we have to process. Normally we would process a much larger number of records, but for testing I want to process these 90 records concurrently. So now the payload is a collection. In the Core module there is a Batch Job component, and I want a batch job to process all these records concurrently. In a batch job there are two parts you can see: Process Records and On Complete.
In this Process Records part there is one batch step; let's just rename it Step1. In Process Records we can have multiple batch steps back to back, so just drag and drop another one and rename it Step2, okay? So the record processing can happen in multiple steps. Drag and drop one more and name it Step3. Okay. Now, there are actually two phases in what you can see here: the Process Records phase and the On Complete phase.
In the On Complete phase I have just dragged in a logger for now; we'll see what we can do with it later. Now, in each of the steps, drag and drop a logger just to print what is happening. In logger one, we just write "In step 1" followed by #[payload]. Similarly, in the step two logger I write "In step 2", and in step three, "In step 3". Take this simple example to understand the concepts; I will show you a real good use case next, okay? So right now you can see that a batch job has two phases, the Process Records phase and the On Complete phase, but in a batch job there is one more, implicit phase. What is that implicit phase? It is called the Load and Dispatch phase.
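Before we get to the diagram, here is roughly how the batch job we just built reads in the XML; it sits inside the flow, right after the Transform:

```xml
<batch:job jobName="demoBatchJob">
    <batch:process-records>
        <batch:step name="Step1">
            <logger message="In step 1 #[payload]"/>
        </batch:step>
        <batch:step name="Step2">
            <logger message="In step 2 #[payload]"/>
        </batch:step>
        <batch:step name="Step3">
            <logger message="In step 3 #[payload]"/>
        </batch:step>
    </batch:process-records>
    <batch:on-complete>
        <logger message="In on complete phase"/>
    </batch:on-complete>
</batch:job>
```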
Now let me draw a diagram and show you the three phases of a batch job graphically. On the graphical UI you can see only two things, the Process Records phase and the On Complete phase, but there is an implicit phase called the Load and Dispatch phase. The input payload to the batch job must be a collection, right? Otherwise it doesn't make sense. So the input to the batch job will be a collection, and what the implicit Load and Dispatch phase does is iterate through that collection, create one record object for each element, and populate a queue called the records queue. So there is something called a records queue: whenever the payload comes in as a list, the batch job iterates over each element of the list, creates one data structure of type record, and populates the records queue. So all the elements of the list get loaded into the queue.
Once the records queue is populated, the batch job instance actually starts. In one batch job there can be any number of steps; that is, in the Process Records phase there can be any number of steps. And for the batch job there is a thread pool, by default of size 16, which we can tune as well. What happens is that once a batch job instance starts, all the threads in the pool start polling for records in the records queue.
So there are 16 threads by default, right? Each thread takes one record and executes step one. That means step one is executed concurrently by the threads in the pool. Once step one is executed, the thread puts the record back in the queue. Remember, a record is a data structure, and inside this record data structure there are some fields which say what the status of the record is.
The status can be something like "step one complete"; there is a field per step, like that. So the status of the record is updated by the thread, and the record is kept back in the records queue. After some time, when another thread picks up this record, which has already been processed, it checks: okay, this record has undergone step one already, so it automatically calls step two. Once step two is completed, again the record is pushed back into the queue. And at some point, another thread in the same pool might pick up that record which has undergone step two, see that step two is complete, and directly call step three. That means each record goes through step one, step two, step three in sequence, but a lot of records are executed concurrently. That is what a batch job does.
And one more thing: when a thread tries to execute step two, we can configure a condition that says whether step two should accept a record or not. Let me show you in the diagram. There are three steps right now, and if I click on each step, you can see there is an accept policy, which is NO_FAILURES by default. That means whether step two can accept a record or not is decided by the accept policy.
So when a record is going through step one and an exception occurs in step one, that is treated as a failure for the record, and the record data structure is updated to say the status is "failed in step one", and so on. When the same record comes to step two, that is, when another thread picks up that record and tries to execute step two, it checks: the accept policy is NO_FAILURES, but the record has failed, so step two will not be executed. Then the record comes to step three. For step three, I can configure ALL, which means step three will execute for all records, whatever the case, failure or success. So step three will accept that record, right?
Like that. And one more thing: for each step, I can also have an accept expression. Along with the accept policy, if I have configured an accept expression, it has to evaluate to true; only then is the record accepted by the step. In this case, what I'm getting as the payload is numbers, right? Say I want step two to process only the even numbers. So I can write a simple expression: payload mod 2 == 0. That means only even numbers are accepted, so step two will accept only even numbers.
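On the step, that configuration looks something like this in the XML (a sketch; step names as before):

```xml
<!-- Step2 runs only for even-numbered records that have no earlier failures -->
<batch:step name="Step2"
            acceptPolicy="NO_FAILURES"
            acceptExpression="#[payload mod 2 == 0]">
    <logger message="In step 2 #[payload]"/>
</batch:step>
<!-- Step3 takes every record, failed or successful -->
<batch:step name="Step3" acceptPolicy="ALL">
    <logger message="In step 3 #[payload]"/>
</batch:step>
```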
Right, and step one will accept all; step three will also accept all, since on step three I haven't configured any accept expression and I set the accept policy to ALL. Okay, so now I have three steps. What we can also do: in every step, you can see there is a section called the aggregator. Suppose in step two I want to write to a database. I don't want to insert each record one by one; I want to do a bulk insert. That means I want to aggregate up to 50 records.
And once 50 records are aggregated, I want to write them to the database, right? So let's assume I have the standard Database module, and that I want to do an insert. If I just drag and drop an Insert here, this insert query will be executed for each record. I don't want to execute an insert for each and every record; I want a bulk insert. So in the aggregator section of the step, I can drag and drop something called a Batch Aggregator, and I can specify the aggregator size, let's say 50.
What this batch aggregator does is aggregate records until 50 of them have arrived. Once 50 records have come in, it invokes whatever is inside it. So if I were using a database component, I would do a bulk insert there, where the payload for the bulk insert is the list of aggregated records. At any step where we want to do aggregation, we can use a batch aggregator. For now, instead of a database component, I will just drag and drop a logger so we can see it on the console, and for this logger I'll write a simple expression as the message.
So I just paste it here so that we can see what happens. What am I writing? An expression containing a map whose key is "logger in batch aggregator" and whose value is the payload. And what is the payload inside this aggregator? It is a list, so this prints the aggregated list, okay. The step ends up looking like the sketch below.
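So step two ends up looking something like this, with the logger standing in for the bulk insert (the db element in the comment is only indicative):

```xml
<batch:step name="Step2"
            acceptPolicy="NO_FAILURES"
            acceptExpression="#[payload mod 2 == 0]">
    <batch:aggregator size="50">
        <!-- Inside the aggregator, payload is the list of aggregated records -->
        <logger message="#[{'logger in batch aggregator': payload}]"/>
        <!-- A real use case would do a bulk insert here instead, e.g.
             <db:bulk-insert config-ref="Database_Config">...</db:bulk-insert> -->
    </batch:aggregator>
</batch:step>
```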
So now step two will accept only even numbers, and this aggregator will aggregate records; maybe I'll just change the 50 here to 20 so that I can show you clearly what happens. And once all the steps have been executed for all records, the last phase of the batch job, the On Complete phase, is triggered. In the On Complete phase, the payload, if you look at it, is an object of type BatchJobResult, the result of the execution of this batch job. As you can see, this BatchJobResult holds a lot of information: failed records, loaded records, on-complete-phase exceptions if any, processed records, successful records, and so on. So we can take this BatchJobResult object, extract the data inside it, and maybe generate a report out of it. Right.
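For example, an On Complete logger that pulls a few of those fields out could look like this; the field names here are the BatchJobResult properties as I recall them from the Mule documentation, so double-check them against your version:

```xml
<batch:on-complete>
    <!-- payload here is a BatchJobResult object -->
    <logger message="Total: #[payload.totalRecords], processed: #[payload.processedRecords], successful: #[payload.successfulRecords], failed: #[payload.failedRecords]"/>
</batch:on-complete>
```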
So in this logger, for now, I just write "In on complete phase" plus payload.totalRecords, or processedRecords; it will show me the number of records processed. Okay, so that's it. Now I have a batch job configured, and you have understood what happens internally once a batch job is triggered. So tell me, just think: when will the batch job instance be created?
When the control comes after the Transform: the payload is a list, and the input payload to the batch job must be a list. Once the payload is a list, the batch job instance is kicked off. Once a batch job instance is kicked off, there are three phases in batch job processing. The first phase is the Load and Dispatch phase. What happens during this phase? It iterates over each element of the list, creates a data structure called a record for each element, and populates the records queue.
Then, once the Load and Dispatch phase is completed, the Process Records phase starts. For the batch job there is a thread pool, and all the threads in the pool poll for records in the records queue. Once a thread gets a record, it executes step one. When step one is completed, the record is again kept in the records queue. But while keeping the record in the records queue, some variables in the record are updated which tell what the status of the record is: whether step one is complete, or step two is complete.
And what is the status of step one? Whether step one failed or succeeded, and so on. So again, when the same record is picked up by another thread, the thread identifies the current status and tries to execute the next step. And each step can have an accept policy and an accept expression. If you want your step to process only the records that succeeded in the earlier steps, you can configure the accept policy as NO_FAILURES.
And if you want your step to process all the records, you can keep it as ALL, right? We also understood that every step can have an aggregator. If you have a scenario where in a step you want to insert into a database, then instead of inserting each record individually, you can aggregate some records and do a bulk insert, something like that. So when you use a batch job, within each step the records are processed by multiple threads concurrently, and each record goes through the steps sequentially.
So let me just run this application and see what happens. To kick off this application, I need to send a request to /test, just to trigger it for this example. Okay, at the start... yes, it started successfully. Now let me trigger it: let me just send a request to /test. I hope it has been triggered.
Now let's see the console. Yep, I can see lots of logs; let me explain them. If you look at these logs, they say "starting loading phase" for this batch job instance; a batch job instance has been created with this instance ID.
First the loading phase started and finished. Once the loading phase finished, the execution phase started. Here we can see the logger in step one got executed for all the records; I can see all 90 records processed, and then step one finished. Then step two started, and you can see step two is picking only the even numbers, because I configured a condition to accept only even numbers. And you can see this is the logger in the batch aggregator: whatever I configured inside the batch aggregator is executed only after the records are aggregated.
Right. So since I gave the batch aggregator a size of 20, only after 20 records does whatever I kept in the aggregator get executed; then more records are processed, and again after the next 20 records it executes, and so on, as you can see. And then here, step three started even before the step two aggregator had executed, right? So like that, you can see all the records going through the steps sequentially while a lot of records are processed concurrently. You can also see the thread number in the logs: thread number one, thread number four, and so on. So the records have been processed concurrently by multiple threads, right? Now, one more thing I want: at the batch job level I can also configure max failed records, maybe up to 50 records failing, where failing means an exception is thrown in any step.
So if any of the steps throws an exception while processing a record, that record is considered failed, and just one record failure should not stop my batch. Only if 50 records fail do I want my batch job to stop and not process any further records; I can configure maxFailedRecords for that, right? And one more thing: here you can see the batch block size, which is 100 by default. If every thread went and picked up just one record at a time, one record, again one record, it would be too time-consuming. So we can configure the batch job to process in blocks of, let's say, 10; that means whenever a thread goes to the queue, it will pick up 10 records at once.
Then it executes step one, or step two, or step three for those records. That is nothing but the batch block size setting; I'll leave a rough sketch of both settings below. Okay, if there is something more to explain, I'll continue in the next video.
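Putting those two settings together, the batch job element would read roughly like this:

```xml
<!-- Stop the whole job once 50 records have failed, and let each thread
     pick up blocks of 10 records at a time (the default blockSize is 100) -->
<batch:job jobName="demoBatchJob"
           maxFailedRecords="50"
           blockSize="10">
    <batch:process-records>
        <!-- steps as before -->
    </batch:process-records>
    <batch:on-complete>
        <!-- report as before -->
    </batch:on-complete>
</batch:job>
```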