Run scalable, cost-effective workloads on AWS in parallel using AWS Batch and Step Functions
In this article, I'll discuss one of the ways you can run scalable and cost-effective computing workloads on AWS in parallel using AWS Batch and AWS Step Functions. My very straightforward workload example kick-starts six Python processes in parallel. Each process runs the same simple dockerized Python program, which prints a run number and the start date/time of the run, sleeps for 30 seconds (to simulate a long-ish running process), and then prints the run number and the end date/time of the run. In Step Functions, we'll pass a different run number to each of the six jobs using a Map state. In my example, I'm using Fargate as my Batch compute resource, but you can also use regular EC2 and/or Spot Instances.
This is very much a toy example, but it shows the basics of what you need to develop solutions to your own workload requirements and to take advantage of high-performance computing on AWS. The same approach applies to many use cases and can lead to real-world cost and time savings. For example, I was working on a data pipeline consisting of several jobs running on AWS Glue that processed some very large historical data sets on a month-by-month basis for a ten-year period. The associated Glue cost of doing this amounted to several hundred US dollars. By using the techniques outlined in this article, switching to Step Functions and Batch, and running my jobs concurrently on 16xlarge Spot Instances, I was able to significantly reduce the cost and run-time of processing that same data.
Before starting, you should ensure that you have the following setup in your development environment.
Additionally, you'll need some knowledge of the following technologies.
How Docker works & deploying docker images to AWS ECR
I've covered this before in a previous LinkedIn article, please see the following for more details:
AWS Step
Again, I have written about Step before. Check out my articles below if you need a refresher.
AWS Batch
AWS Batch, which to my mind is a much under-used AWS service, is a fully managed batch computing service that plans, schedules, and runs your batch workloads across the full range of AWS compute offerings, such as AWS Fargate, Amazon Elastic Compute Cloud (EC2), and Amazon EC2 Spot Instances. There is no additional charge for AWS Batch. You pay only for any AWS resources (e.g. EC2 instances, Fargate) that you use to run your workloads. To get the most out of AWS Batch you need to spend a bit of time studying and using it so please take the time to do that. This article will get you started but I'm not going into any great depth as to how Batch works or the best way to set it up and use it. Each Batch environment can and should be tailored to your specific workload in terms of the type and size of the compute instances used and the amount of CPU and memory required.
A usable AWS Batch environment consists of four main components:
Compute Environment
The compute environment is a set of managed or unmanaged compute resources that are used to run jobs. With managed compute environments, you can specify the desired compute type (Fargate or EC2) at several levels of detail. You can set up compute environments that use a particular type of EC2 instance or a particular model such as c5.2xlarge or m5.10xlarge. Or, you can choose only to specify that you want to use the newest instance types. You can also specify the minimum, desired and maximum number of vCPUs for the environment, along with the amount that you're willing to pay for a Spot Instance as a percentage of the On-Demand Instance price and a target set of VPC subnets. AWS Batch efficiently launches, manages, and terminates compute types as needed. You can also manage your own compute environments. If you do, you're responsible for setting up and scaling the instances in an Amazon ECS cluster that AWS Batch creates for you.
Job Queues
When you submit an AWS Batch job, you submit it to a particular job queue, where the job resides until it's scheduled onto a Compute environment. You associate one or more compute environments with a job queue. You can also assign priority values for these compute environments and even across job queues themselves. For example, you can have a high-priority queue where you submit time-sensitive jobs, and a low-priority queue for jobs that can run anytime when compute resources are cheaper.
Job Definitions
A job definition specifies how jobs are to be run. You can think of a job definition as a blueprint for the resources in your job. You can supply your job with an IAM role to provide access to other AWS resources. You also specify both memory and CPU requirements. The job definition can also control container properties, environment variables, and mount points for persistent storage. Many of the specifications in a job definition can be overridden by specifying new values when submitting individual Jobs.
Jobs
A unit of work (such as a shell script or a Docker container image) that you submit to AWS Batch. It has a name and runs as a containerized application on AWS Fargate or Amazon EC2 resources in your Compute environment, using parameters that you specify in a job definition.
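To make the relationship between a job, a queue and a job definition a little more concrete, here is a minimal sketch of what a job submission could look like from Python. It only builds the request; the commented function shows where boto3's submit_job call would take it. The job, queue and definition names are placeholders I've made up, and RunNumber is the environment variable the example program later in this article reads.

```python
def submit_job_request(job_name, job_queue, job_definition, run_number):
    """Build keyword arguments for the Batch SubmitJob API.

    containerOverrides replaces values from the job definition for this one
    job only. Batch expects environment values to be strings.
    """
    return {
        "jobName": job_name,
        "jobQueue": job_queue,
        "jobDefinition": job_definition,
        "containerOverrides": {
            "environment": [{"name": "RunNumber", "value": str(run_number)}],
        },
    }


def submit(request):
    """Send the request to AWS. Assumes boto3 is installed and credentials are configured."""
    import boto3  # imported lazily so the builder above works without boto3

    return boto3.client("batch").submit_job(**request)


# Placeholder names, not real resources.
request = submit_job_request("my-test-job", "my-job-queue", "my-job-definition", 3)
```

Note that the boto3 API uses lower camel case keys (containerOverrides), whereas the Step Functions integration shown later uses Pascal case (ContainerOverrides).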
AWS Batch has two different in-built ways of parallelizing your batch runs.
1) Multinode parallel jobs
This is a method of splitting up one job to run across many servers or nodes. It only makes sense to use this method if your job is coded in such a way that it can run in a distributed computing environment.
2) Array jobs
This is where you run the same (or similar) jobs in parallel on different servers.
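Although we won't use array jobs here, it may help to see how a container typically tells the child jobs apart. As far as I know, Batch sets the AWS_BATCH_JOB_ARRAY_INDEX environment variable (starting at 0) in each child of an array job. The sketch below uses that to pick one item from a work list; the MONTHS list is made up purely for illustration.

```python
import os

# Hypothetical work list; a real job might read this from a manifest file.
MONTHS = ["2023-01", "2023-02", "2023-03"]


def pick_item(items, env=os.environ):
    """Return the item this array child should process.

    AWS_BATCH_JOB_ARRAY_INDEX is set by Batch for array job children.
    When it is absent (for example, when running locally) fall back to index 0.
    """
    index = int(env.get("AWS_BATCH_JOB_ARRAY_INDEX", "0"))
    return items[index]


print(f"Processing {pick_item(MONTHS)}")
```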
We're not going to be using either of these techniques to parallelize our workload. Instead, we'll use a built-in capability of Step Functions itself. Like Batch, Step has two main ways of running jobs in parallel.
The Parallel state
We could use this method, but it's mainly for running disparate jobs in parallel, and it becomes a bit unwieldy in terms of console setup when running lots of jobs. Our workload consists of the same job running multiple times simultaneously, with a different input parameter for each run, which brings us to the method we will be using, namely:
The Map state
The Map state takes an array of items as its input. It then iterates over those items and runs your job concurrently for each item in the array, passing the array item to the job at run-time. Currently, the Map state has a limit of 40 concurrent runs. If your Map state has more iterations than this, some will not begin until previous iterations have completed. This isn't an issue, as it's possible to achieve a higher level of concurrency by using nested state machines that cascade Map states. For example, to achieve a concurrency of 500 simultaneous jobs you could build a state machine that contains a Map state that iterates 20 times, then nest that state machine inside the Map state of a higher-level state machine that, itself, iterates 25 times.
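The nesting arithmetic above (25 outer iterations, each handing 20 items to an inner state machine) comes down to splitting one long list into groups. Here is a rough sketch of how you might prepare that input in Python. The myChunks key is a name I've invented for the outer array; myRuns matches the key used later in this article.

```python
def chunk(items, size):
    """Split items into consecutive lists of at most `size` elements."""
    return [items[i:i + size] for i in range(0, len(items), size)]


# 500 hypothetical run numbers, split into 25 groups of 20.
run_numbers = list(range(1, 501))

# The outer Map state would iterate over "myChunks"; each element is the
# input for one execution of the nested state machine.
outer_input = {"myChunks": [{"myRuns": group} for group in chunk(run_numbers, 20)]}
```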
Ok, let's get started. First off we need a workload to run on our Batch environment. As I said before, this will just be a simple Python program that we'll dockerize and push to Amazon Elastic Container Registry (ECR). ECR is just a repository where you can store container images. When Batch starts its workload it pulls the Docker image from ECR and starts to run it. Here is our Python program.
mypytest.py
===========
import datetime
import os
import time


def main():
    start_date = datetime.datetime.now()
    # The MANAGED_BY_AWS env variable is automatically set when running via STEP
    # so we can tell if we're running locally or on AWS
    #
    runtime_env = os.getenv("MANAGED_BY_AWS")
    if runtime_env == "STARTED_BY_STEP_FUNCTIONS":
        # the RunNumber variable will be set in the Step Map task by
        # iterating over an array of input values we supply
        run_num = os.getenv("RunNumber")
    else:
        run_num = 1
    print(f"Python program {run_num} started at {start_date}")
    # simulate a long'ish running task by sleeping for a bit
    time.sleep(30)
    end_date = datetime.datetime.now()
    print(f"Python program {run_num} ended at {end_date}")


if __name__ == "__main__":
    main()
And here is the corresponding Dockerfile we'll use to build our image. This is just a text file (without extension) called Dockerfile with the following contents.
Dockerfile
==========
FROM python:3.9 as base
WORKDIR /app
COPY ./* .
CMD ["python", "mypytest.py"]
At this point, you should build your Docker image and run it locally to ensure it works OK before pushing it to the AWS ECR service. On ECR, take note of the image URI as you'll need it later when setting up the Batch environment.
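If you haven't pushed an image to ECR before, the usual sequence is build, run locally, log in to the registry, tag, and push. The sketch below just assembles those commands as strings so you can see the shape of the image URI. The account ID, region and repository name are placeholders, and I'm assuming the ECR repository already exists.

```python
def ecr_image_uri(account_id, region, repository, tag="latest"):
    """Build an image URI in the form used by private ECR repositories."""
    return f"{account_id}.dkr.ecr.{region}.amazonaws.com/{repository}:{tag}"


def build_and_push_commands(account_id, region, repository, tag="latest"):
    """Return the shell commands, in order, to build, test and push the image."""
    registry = f"{account_id}.dkr.ecr.{region}.amazonaws.com"
    uri = ecr_image_uri(account_id, region, repository, tag)
    return [
        f"docker build -t {repository}:{tag} .",
        f"docker run --rm {repository}:{tag}",  # local smoke test
        f"aws ecr get-login-password --region {region} | "
        f"docker login --username AWS --password-stdin {registry}",
        f"docker tag {repository}:{tag} {uri}",
        f"docker push {uri}",
    ]


# Placeholder values, not a real account or repository.
for command in build_and_push_commands("123456789012", "eu-west-1", "mypytest"):
    print(command)
```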
Creating the Batch Infrastructure
Now it's time to create our AWS Batch infrastructure and run our Python workload on it. We need four things for this and the first of these will be our Batch compute environment. To start, search for Batch in the AWS console and click on the Batch link. Now click on the Compute Environments link near the top of the left-hand menu of the Batch console, then the Create button near the top right of the screen. Select Fargate as your provisioning model, enter a compute environment name, and, optionally, a service role. Hit Next and enter the maximum vCPUs you require before hitting Next again. On this screen enter your networking requirements: VPC, subnets, and security groups. Click Next again to review your choices and then the Create compute environment button near the bottom of the screen.
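If you'd rather script this step than click through the console, something along these lines should be roughly equivalent. This is a sketch, not what I ran for this article: the environment name, subnet and security group IDs are placeholders, and I'm leaving out the optional service role so that Batch falls back to its service-linked role.

```python
def fargate_compute_environment(name, max_vcpus, subnet_ids, security_group_ids):
    """Build keyword arguments for the Batch CreateComputeEnvironment API."""
    return {
        "computeEnvironmentName": name,
        "type": "MANAGED",
        "state": "ENABLED",
        "computeResources": {
            "type": "FARGATE",
            "maxvCpus": max_vcpus,
            "subnets": subnet_ids,
            "securityGroupIds": security_group_ids,
        },
    }


def create_compute_environment(request):
    """Send the request to AWS. Assumes boto3 is installed and credentials are configured."""
    import boto3  # imported lazily so the builder above works without boto3

    return boto3.client("batch").create_compute_environment(**request)


# Placeholder identifiers, not real resources.
ce_request = fargate_compute_environment(
    "my-fargate-env", 16, ["subnet-aaaa1111"], ["sg-bbbb2222"]
)
```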
Next, click on the Job Queue link on the left-hand menu and click the Create button. Choose Fargate as the Orchestration Type then enter a Job Queue name and pick the compute environment you just created in the Connected compute environment drop-down list. Click the Create Job Queue button.
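The scripted version of the job queue step might look like the sketch below. Again, the queue and compute environment names are placeholders. The computeEnvironmentOrder list is where you would add further environments if you wanted the queue to try them in order.

```python
def job_queue_request(name, compute_environment, priority=1):
    """Build keyword arguments for the Batch CreateJobQueue API."""
    return {
        "jobQueueName": name,
        "state": "ENABLED",
        "priority": priority,
        "computeEnvironmentOrder": [
            {"order": 1, "computeEnvironment": compute_environment},
        ],
    }


def create_job_queue(request):
    """Send the request to AWS. Assumes boto3 is installed and credentials are configured."""
    import boto3  # imported lazily so the builder above works without boto3

    return boto3.client("batch").create_job_queue(**request)


# Placeholder names, not real resources.
queue_request = job_queue_request("my-job-queue", "my-fargate-env")
```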
Now click on the Job definition link on the left-hand menu and click the Create button. Again, choose Fargate as the Orchestration Type, and enter a name for the job definition. Enter 120 as the timeout value and choose an appropriate Execution Role in its drop-down list. Hit the Next page button, enter the URI of your Docker image, and set the vCPU count to 2 and the memory to 4GB. Delete any text within the Command field. After that, hit the Next page button again. If you have any Linux/logging settings you want to enter, do that on this page; otherwise just click on the Next page button to get to the review screen. If you're happy, you can click on the Create job definition button.
Now, to test things out we can create a job to run on our Batch infrastructure. Select the Jobs link on the left-hand menu and click on the Submit new job button. Enter a name for your job and choose the job definition you previously created from the drop-down list. Click the Next page button. On this page set the timeout, vCPUs, and memory to your required values and hit the Next page button again to go to the Job review screen. If all looks OK you can click the Create job button. Monitor your submitted jobs by using the Dashboard link on the left-hand menu. From here you can select the job you're interested in and drill down to its CloudWatch logs to check it worked OK. My log output looked like this:-
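For reference, here is how the same job definition might be expressed through the API. Treat it as a sketch under my own assumptions: the definition name, image URI and role ARN are placeholders. Leaving out the command key is the scripted counterpart of clearing the Command field, so the container falls back to the CMD in the Dockerfile.

```python
def fargate_job_definition(name, image_uri, execution_role_arn):
    """Build keyword arguments for the Batch RegisterJobDefinition API."""
    return {
        "jobDefinitionName": name,
        "type": "container",
        "platformCapabilities": ["FARGATE"],
        "timeout": {"attemptDurationSeconds": 120},
        "containerProperties": {
            "image": image_uri,
            "executionRoleArn": execution_role_arn,
            # Batch expects these values as strings; memory is in MiB.
            "resourceRequirements": [
                {"type": "VCPU", "value": "2"},
                {"type": "MEMORY", "value": "4096"},
            ],
            # No "command" key, so the image's own CMD is used.
        },
    }


def register_job_definition(request):
    """Send the request to AWS. Assumes boto3 is installed and credentials are configured."""
    import boto3  # imported lazily so the builder above works without boto3

    return boto3.client("batch").register_job_definition(**request)


# Placeholder values, not real resources.
jd_request = fargate_job_definition(
    "my-job-definition",
    "123456789012.dkr.ecr.eu-west-1.amazonaws.com/mypytest:latest",
    "arn:aws:iam::123456789012:role/my-execution-role",
)
```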
Now that we know everything works OK for a single Batch job run, it's time to create our Step function and kick off a bunch of Batch jobs in parallel. In the AWS console search for Step. Click on the Step Functions link and you should see a screen listing all your existing state machines. If you don't have any existing state machines you'll likely see a screen with a Get Started button on it. If that's the case, don't click it. Instead, go to the left-hand menu block and click on the State machines link. From there you should see a Create State machine button near the top right-hand side; click on this. When you do you should see a screen like this.
Enter the text "Map" into the search box at the top left-hand side of the screen, then drag the displayed Map task into the box in the middle of the screen that says Drag first state here. In the configuration section on the right of the screen type in $.myRuns into the Path to items array text box and set the Maximum concurrency value to 6. Leave everything else as is. Now go back to the search box on the top left and type in "SubmitJob". Drag that task into the box below the Map state. In the configuration for this task enter what you like for the State Name and Batch Job Name. The entries for the Batch Job definition and Batch Job Queue should be set to the ARNs of the job definition and job queue you created when setting up the Batch infrastructure previously.
At this point, your step function screen should look something like this.
The associated Amazon States Language (ASL) for the above should be similar to this.
{
  "Comment": "Run parallel batch job",
  "StartAt": "Map",
  "States": {
    "Map": {
      "Type": "Map",
      "Iterator": {
        "StartAt": "MyPythonJobState",
        "States": {
          "MyPythonJobState": {
            "Type": "Task",
            "Resource": "arn:aws:states:::batch:submitJob.sync",
            "Parameters": {
              "JobDefinition": "your_batch_job_definition_arn",
              "JobName": "your_batch_job_name",
              "JobQueue": "your_batch_job_queue_arn"
            },
            "End": true
          }
        }
      },
      "ItemsPath": "$.myRuns",
      "MaxConcurrency": 6,
      "End": true
    }
  }
}
We just need to make a couple of tweaks to the above to get our parallel runs working. At the end of the line "JobQueue": "your_batch_job_queue_arn" add a comma and then the following text on a new line.
"ContainerOverrides.$": "$.ContainerOverrides"
And, below the "Type": "Map", line we need to add this section of code.
"Parameters": {
  "ContainerOverrides": {
    "Environment": [
      {
        "Name": "RunNumber",
        "Value.$": "$$.Map.Item.Value"
      }
    ]
  }
},
So your final ASL should look like this:-
{
  "Comment": "Run parallel batch job",
  "StartAt": "Map",
  "States": {
    "Map": {
      "Type": "Map",
      "Parameters": {
        "ContainerOverrides": {
          "Environment": [
            {
              "Name": "RunNumber",
              "Value.$": "$$.Map.Item.Value"
            }
          ]
        }
      },
      "Iterator": {
        "StartAt": "Run Python Code",
        "States": {
          "Run Python Code": {
            "Type": "Task",
            "Resource": "arn:aws:states:::batch:submitJob.sync",
            "Parameters": {
              "JobDefinition": "your_batch_job_definition_arn",
              "JobName": "your_batch_job_name",
              "JobQueue": "your_batch_job_queue_arn",
              "ContainerOverrides.$": "$.ContainerOverrides"
            },
            "End": true
          }
        }
      },
      "ItemsPath": "$.myRuns",
      "MaxConcurrency": 6,
      "End": true
    }
  }
}
This takes each element of our input array (the run number) and passes it to the RunNumber environment variable in our dockerized Python container. I made this change by editing the ASL directly rather than through the Workflow Studio tool.
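If the $$.Map.Item.Value syntax looks a bit opaque, it may help to see the effect written out in plain Python. The sketch below only mimics the result of the Map state's Parameters block, i.e. the input each iteration receives; it says nothing about how Step Functions implements it, and the function name is my own.

```python
def iteration_inputs(execution_input, items_key="myRuns"):
    """Mimic the Map state's Parameters block for each item in the array.

    ItemsPath "$.myRuns" selects the array, and "$$.Map.Item.Value" is
    replaced by the current array element in each iteration.
    """
    return [
        {
            "ContainerOverrides": {
                "Environment": [{"Name": "RunNumber", "Value": item}],
            }
        }
        for item in execution_input[items_key]
    ]


inputs = iteration_inputs({"myRuns": [1, 2, 3, 4, 5, 6]})
```

Each element of inputs is what the Batch task sees as its state input, which is why the task can pick it up with "ContainerOverrides.$": "$.ContainerOverrides".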
Now save your state machine. Next, run it by going back to the screen that lists all your state machine(s). Click on the one you just created and hit the Start Execution button near the top right-hand side of the screen. When you do this you will be prompted to enter optional input parameters. In our case, we want to run 6 jobs in parallel so enter an array of numbers from 1 to 6 like this:-
{
"myRuns": [1,2,3,4,5,6]
}
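You can also start the execution from code instead of the console. The sketch below builds the same input for any number of jobs and shows where boto3's start_execution call would fit. The state machine ARN is something you would supply yourself, and the function names are mine.

```python
import json


def execution_input(job_count):
    """Return the JSON input string for `job_count` parallel runs, numbered from 1."""
    return json.dumps({"myRuns": list(range(1, job_count + 1))})


def start(state_machine_arn, job_count):
    """Start an execution. Assumes boto3 is installed and credentials are configured."""
    import boto3  # imported lazily so execution_input works without boto3

    return boto3.client("stepfunctions").start_execution(
        stateMachineArn=state_machine_arn,
        input=execution_input(job_count),
    )


print(execution_input(6))
```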
If all works as expected, after a few minutes you should see both states in your step function turn green, indicating that everything has worked OK. This is what my output looked like in the Table view of the Step Functions execution page.
You can check the runs by clicking on the Batch job links, which take you to the Batch Job details screen where you can inspect the CloudWatch logs via a provided link. Importantly, when you inspect the logs, you should see that all 6 jobs start and end at more or less the same time, indicating that they are indeed running in parallel.
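If you want something a little firmer than eyeballing the logs, you could copy the start and end times into a small check like this one. It is only a sketch: the timestamps below are invented for illustration and are not my actual log output.

```python
from datetime import datetime


def ran_concurrently(windows):
    """Return True if there was a moment when every job was running.

    `windows` is a list of (start, end) datetime pairs, one per job. All jobs
    overlap when the latest start is earlier than the earliest end.
    """
    latest_start = max(start for start, _ in windows)
    earliest_end = min(end for _, end in windows)
    return latest_start < earliest_end


# Made-up example timestamps: three jobs starting within a few seconds of each other.
example = [
    (datetime(2024, 1, 1, 12, 0, 0), datetime(2024, 1, 1, 12, 0, 30)),
    (datetime(2024, 1, 1, 12, 0, 2), datetime(2024, 1, 1, 12, 0, 32)),
    (datetime(2024, 1, 1, 12, 0, 5), datetime(2024, 1, 1, 12, 0, 35)),
]
print(ran_concurrently(example))
```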
Ok, that's all I have for now. If you found this article useful please like and share to help spread the knowledge around and check out the following links for more details.