HRRR Zarr Cloud Example (dask)

This notebook demonstrates using dask and AWS Fargate to calculate a 95th-percentile September wind gust climatology (2016-2019) for every grid point in the CONUS in under 10 minutes.

Motivation

In this example, the "win" of cloud computing has nothing to do with performing calculations across large data. Instead, we run a calculation across small data (2,880 hours) many times (once for each of the nearly 2 million grid points). I/O takes much more time than computation, so beefier machines don't help; what does help is having many separate machines, each with its own network bandwidth. Because we're I/O bound and don't need much CPU or memory, we can request many small, cheap machines to work in parallel.

Introduction

Fargate is an AWS product that lets us abstract away all management of physical or virtual machines. Dask Cloud Provider adds another layer of ease of use in deploying our code: it creates a cluster for us of 48 worker nodes managed by one central scheduler node, which divvies up tasks and collects the computation results.

This is called an "ephemeral cluster": it's created to perform just one job and then disappears. This approach lets us run the analyses we need when we need them, without having to purchase physical machines, coordinate machine time with colleagues, or pay for cloud resources that sit idle.

Pre-reqs: an AWS account with credentials configured, a docker image pushed to ECR (see the note on docker below), and a local environment with the dependency versions listed in the requirements.txt at the bottom of this notebook.

Worker code

Every worker will run the same code in parallel. For this example, we'll be writing low-level code to grab each chunk of the zarr array and hand it to a separate worker.

We should be able to use libraries for this: dask, zarr, and xarray all have built-in support for parallelizing computations over chunks. However, I ran into trouble using them because the largest dimension in the dataset (time) is represented as separate datasets rather than as a dimension within the data array. Zarr arrays aren't easy to update, so it makes sense to represent the hourly-updating HRRR data as multiple arrays split by time. I'll update this example if/when I find a good way to get one of the libraries to partition the data by chunk and aggregate over time efficiently.
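As a rough illustration, here is a minimal sketch of a per-chunk worker function. It assumes the University of Utah hrrrzarr bucket layout (s3://hrrrzarr/sfc/<date>/<date>_<hour>z_anl.zarr/surface/GUST/surface); the exact paths, variable name, anonymous access, and chunk geometry are assumptions to check against the real store.

import numpy as np
import s3fs
import zarr

def process_chunk(chunk_id, hours, q=0.95):
    """Read one spatial chunk from every hourly zarr array and reduce over time."""
    fs = s3fs.S3FileSystem(anon=True)    # assumes public, anonymous access
    i, j = chunk_id                      # chunk indices along (y, x)
    slabs = []
    for hour in hours:                   # hours: list of datetimes to aggregate
        path = (f"hrrrzarr/sfc/{hour:%Y%m%d}/{hour:%Y%m%d}_{hour:%H}z_anl.zarr"
                "/surface/GUST/surface")             # assumed store layout
        arr = zarr.open(s3fs.S3Map(path, s3=fs), mode="r")
        cy, cx = arr.chunks              # chunk sizes along (y, x)
        slabs.append(arr[i * cy:(i + 1) * cy, j * cx:(j + 1) * cx])
    stacked = np.stack(slabs)            # shape (time, y, x)
    return np.quantile(stacked, q, axis=0).astype(np.float32)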

Setting up the cluster

Using dask_cloudprovider, starting the cluster is a single call (sketched after the docker note below). Note that I pass a couple of extra arguments to make the logs easy to find in CloudWatch.

Startup takes 2-3 minutes for me. You start owing Amazon a (small) bill once these resources spin up, but they'll automatically be killed if you leave the cluster idle for too long, reducing the risk of accidentally incurring large charges. (You should still check manually and kill any resources left running.) The downside is that the cluster can easily time out while you're doing interactive data analysis (or debugging your code); you can pass the scheduler_timeout argument to mitigate this.

Note on docker

The "image" argument below is a reference to a docker image on Amazon's Elastic Container Registry (ECR). You'll need to run your notebook from an environment with compatible dependency versions; the contents of a requirements.txt listing these are at the bottom of this notebook.

To make the image, I copied the Dockerfile from the daskdev/dask image and added my own dependencies. The documentation talks about providing $EXTRA_CONDA_PACKAGES to the default image, but that option doesn't appear to work with the FargateCluster. It should be possible with the EC2Cluster.
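Here is a minimal sketch of the cluster setup; the ECR image URI, worker sizing, log group name, and timeout are placeholders, not values from the original notebook.

from dask_cloudprovider.aws import FargateCluster

cluster = FargateCluster(
    image="123456789012.dkr.ecr.us-east-1.amazonaws.com/hrrr-dask:latest",  # hypothetical ECR image
    n_workers=48,
    worker_cpu=1024,                      # 1 vCPU per worker (Fargate CPU units)
    worker_mem=4096,                      # 4 GB per worker, in MB
    scheduler_timeout="20 minutes",       # keep the scheduler alive during interactive work
    cloudwatch_logs_group="dask-hrrr-example",  # makes the logs easy to find in CloudWatch
)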

Cluster dashboard

The displayed link opens a dashboard showing how far along the workers are in completing the required computations. It also has a tab showing each node's resource usage, so you can confirm you're requesting the smallest (cheapest) machines that will do the job.
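For reference, the link comes from connecting a distributed Client to the cluster; its notebook repr includes the dashboard address, or you can print it directly.

from dask.distributed import Client

client = Client(cluster)
print(client.dashboard_link)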

Running the job

We use the dask library to set up a "delayed" object with the instructions for calculating the desired percentile for every chunk. Calling compute() then launches the actual calculation.
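A sketch of that step, reusing the hypothetical process_chunk worker above and assuming chunk_ids (every (i, j) chunk index) and hours (every September hour in 2016-2019) were built earlier:

import dask

tasks = [dask.delayed(process_chunk)(chunk_id, hours) for chunk_id in chunk_ids]
chunk_results = dask.compute(*tasks)   # runs the tasks on the Fargate workers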

Note on runtime and resources

The job takes about 5-6 minutes. It could theoretically be halved by using 96 machines instead of 48 without appreciably increasing the cost (which should be under 50 cents), but when I tried that, the cluster struggled to start. YMMV.

Closing the cluster

I've noticed that calling cluster.close() often produces errors and sometimes leaves the scheduler hanging, at least if I'm being impatient and starting/killing cluster after cluster in the same notebook. I often finish cleaning up resources manually using the ECS dashboard. (ECS = Elastic Container Service, the parent service for Fargate.)
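For completeness, the shutdown itself is just two calls; if they error out or hang, finish the cleanup from the ECS console as described above.

client.close()
cluster.close()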

Post-processing the data

Now that we have a small-enough dataset returned to a single machine, we can finish our analysis without using cloud resources.

We knit the chunk data into a single array, then get the latitude/longitude values so we can plot it.
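Here is a rough sketch of that stitching step. It assumes chunk_results is ordered row-major over an (n_chunk_y, n_chunk_x) chunk grid; those names are hypothetical, and the latitude/longitude lookup and cartopy map from the original notebook are replaced with a bare imshow for brevity.

import numpy as np
import matplotlib.pyplot as plt

rows = [np.concatenate(chunk_results[r * n_chunk_x:(r + 1) * n_chunk_x], axis=1)
        for r in range(n_chunk_y)]
gust_p95 = np.concatenate(rows, axis=0)       # full CONUS grid, shape (ny, nx)

plt.imshow(gust_p95[::-1], cmap="viridis")    # flip so north is up
plt.colorbar(label="95th percentile wind gust (m/s)")
plt.title("September 95th-percentile wind gust, 2016-2019")
plt.show()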

requirements.txt

python==3.8.10
python-blosc==1.9.2
cytoolz==0.11.0
dask==2021.5.0
dask_cloudprovider==2021.6.0
lz4==3.1.3
nomkl
numpy==1.20.3
pandas==1.0.1
xarray==0.18.2
s3fs==2021.5.0
zarr==2.8.3
boto3==1.16.52
msgpack-python==1.0.2
matplotlib
cartopy
metpy