dask
“Data is the new science. Big Data holds the answers.” - Pat Gelsinger
In this section, we will use daily data from the AVISO sea-surface height satellite altimetry dataset. Download the file (aviso_2015.zip, ~390 MB) from here and unzip it; you will have a folder (aviso_2015) containing 365 netcdf (nc) files. Move the folder to your working directory.
The Jupyter Notebook for this section is available here.
Important: In this section, we will use the cper environment you created in Section 11. Make sure you have it activated, and open Jupyter Notebook under it, not under the base environment.
Dask for parallel computing
In past lectures, we learned how to use numpy, pandas, and xarray to analyze various types of data. In this section, we address an increasingly common problem: what happens when the data we wish to analyze is "big data"?
A good threshold for when data becomes difficult to deal with is when the volume of data exceeds your computer's RAM (random-access memory). Most modern laptops have between 2 GB and 16 GB of RAM. High-end workstations and servers can have 1 TB of RAM or more. If the dataset you are trying to analyze can't fit in your computer's memory, some special care is required to carry out the analysis.
The next threshold of difficulty is when the data can't fit on your hard drive. Most modern laptops have between 100 GB and 4 TB of storage space on the hard drive. If you can't fit your dataset on your internal hard drive, you can buy an external one. However, at that point you are better off using a high-end server, an HPC system, or cloud-based storage for your dataset. Once you have many TB of data to analyze, you are definitely in the realm of "big data".
dask
Dask is a tool that helps us easily extend our familiar Python data analysis tools to medium and big data, i.e., datasets that can't fit in our computer's RAM. In many cases, dask also allows us to speed up our analysis by using multiple CPU cores. Dask can help us work more efficiently on our laptop, and it can also help us scale up our analysis on HPC and cloud platforms. Most importantly, dask is almost invisible to the user, meaning that you can focus on your science rather than the details of parallel computing.
Dask provides collections for big data and a scheduler for parallel computing. In this section, we will learn the basics of using dask through examples. For more about dask, check The Dask Documentation and The Dask Github Site. For other modules enabling parallel computing, check Parallel Processing and Multiprocessing in Python.
Dask arrays
A dask array looks and feels a lot like a numpy array. However, a dask array doesn't directly hold any data. Instead, it symbolically represents the computations needed to generate the data. Nothing is actually computed until the actual numerical values are needed. This mode of operation is called lazy; it allows one to build up complex, large calculations symbolically before turning them over to the scheduler for execution.
If we want to create a numpy array of all ones, we do it like this:
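A minimal sketch (the variable names are assumptions, but they match the cells below; a (1000, 4000) array of 64-bit floats is exactly the 32 MB mentioned next):
# Import numpy
import numpy as np

# Shape of the array: 1000 x 4000 float64 values (8 bytes each) = 32 MB
shape = (1000, 4000)
# Create a numpy array of all ones
ones_np = np.ones(shape)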
This array contains exactly 32 MB
of data:
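One way to check this (assuming the array above is named ones_np):
# Size of the array in MB
ones_np.nbytes / 1e6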
Now let’s create the same array using dask
’s array
interface:
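A sketch of this (the name ones_da is an assumption, reused in the later cells):
# Import dask.array
import dask.array as da

# Create the same array of ones, this time as a (lazy) dask array
ones_da = da.ones(shape)
# Show the symbolic representation
ones_da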
As you can see, dask creates the array in one chunk. In fact, a crucial advantage of dask is that we can specify the chunks argument. Here, chunks describes how the array is split up into many sub-arrays. There are several ways to specify chunks. In this section, we will use a block shape:
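For instance (the (1000, 1000) chunk shape is an assumption, chosen to be consistent with the 4 chunks shown below):
# Define the chunk shape: each chunk is a (1000, 1000) block
chunk_shape = (1000, 1000)
# Create the dask array with explicit chunks
ones_da = da.ones(shape, chunks=chunk_shape)
# Show the symbolic representation
ones_da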
Notice that we just see a symbolic representation of the array, including its shape, dtype, and chunksize. No data has been generated yet. When we call .compute() on a dask array, the computation is triggered and the dask array becomes a numpy array.
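For example:
# Trigger the computation; the result is a plain numpy array
ones_da.compute()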
In order to understand what happened when we called
.compute()
, you can visualize the dask
graph, the symbolic operations that make up the array:
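For example (rendering the graph requires the graphviz package):
# Visualize the task graph behind the dask array
ones_da.visualize()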
As you can see, the array has 4 chunks. To generate it, dask calls np.ones four times and then concatenates these together into one array.
Rather than immediately loading a dask
array (which puts
all the data into RAM), it is more common to want to reduce the data
somehow. For example:
# Reduce the array with sum()
sum_of_ones = ones_da.sum()
# Visualize the computation
sum_of_ones.visualize()
Here we see dask
’s strategy for finding the
sum
. This simple example illustrates the beauty of
dask
: it automatically designs an
algorithm appropriate for custom operations with big data.
The previous example is too simple: the data (32 MB) is nowhere near big enough to warrant the use of dask. Let's make it a lot bigger:
# A much bigger array
bigshape = (200000, 4000)
# Define the array (lazy method)
big_ones = da.ones(bigshape, chunks=chunk_shape)
# Show the symbolic representation (no computation yet)
big_ones
# Check the size of the array in MB
big_ones.nbytes / 1e6
This dataset is 6.4 GB! This is probably close to, or greater than, the amount of RAM available in your computer. Nevertheless, dask has no problem working on it. When doing a big calculation, dask also has some tools to help us understand what is happening.
# Import ProgressBar
from dask.diagnostics import ProgressBar

# Define the computation (lazy method)
calc = (np.cos(big_ones)**2).mean()
# Show ProgressBar
with ProgressBar():
    # Run computation
    result = calc.compute()
# Show result
result
In fact, all the usual numpy methods work on dask arrays. You can also apply numpy functions directly to a dask array, and it will stay "lazy" until the computation starts.
Plotting also triggers computation, since we need the actual values.
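A sketch of this (the particular calculation is only an illustration): when matplotlib converts the dask array to a numpy array for plotting, the computation is triggered.
# Import matplotlib
import matplotlib.pyplot as plt

# Define a lazy calculation: a reduction along the second axis
row_means = (np.cos(big_ones)**2).mean(axis=1)
# Plotting needs the actual values, so this triggers the computation
plt.plot(row_means)
plt.show()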
Dask has several ways of executing code in parallel. We'll use the distributed scheduler by creating a dask.distributed.Client. For now, this will provide us with some nice diagnostics. Let's make a local client (my_client) with 4 workers (1 thread per worker):
# Import Client from dask.distributed
from dask.distributed import Client

# Use the distributed scheduler to form a local client (cluster)
# 4 workers, 1 thread (CPU) per worker
my_client = Client(n_workers=4, threads_per_worker=1)
# Show information of the local client
my_client.cluster
Click the Dashboard link and familiarize yourself with it.
If you no longer need the client, make sure to close it or stop this kernel:
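The call itself is simple (shown commented out here, since the rest of this section keeps using my_client; we will close it at the very end):
# Close the local client when you no longer need it
# my_client.close()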
dask.delayed()
In this sub-section we parallelize some simple code with dask and dask.delayed. Often, this is the only function that you will need to convert functions for use with dask. It is a simple way to use dask to parallelize existing codebases or build complex systems.
First let’s make some toy functions, fun1
and
fun2
, that sleep for a while to simulate work. We’ll then
time running these functions normally.
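A possible definition (the 1-second sleep is an assumption, consistent with the three-second timing noted below); we also import delayed here for use in the next cells:
# Import sleep to simulate work, and delayed for the next sub-sections
from time import sleep
from dask import delayed

def fun1(x):
    """Sleep for 1 second, then increment x"""
    sleep(1)
    return x + 1

def fun2(x, y):
    """Sleep for 1 second, then add x and y"""
    sleep(1)
    return x + y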
We then time the execution of this normal code using the %%time magic, which is a special function of the Jupyter Notebook.
%%time
# This takes three seconds to run because we call each
# function sequentially, one after the other
# Call fun1
x = fun1(1)
# Call fun1
y = fun1(2)
# Call fun2
z = fun2(x, y)
Those two calls to fun1 could run in parallel, because they are totally independent of one another.
We’ll transform the fun1
and fun2
functions
using the dask.delayed()
function. When we call the
delayed version by passing the arguments, exactly as
before, the original function isn’t actually called yet - which is why
the cell execution finishes very quickly. Instead, a delayed object is
made, which keeps track of the function to call and the arguments to
pass to it.
%%time
# This runs immediately, all it does is build a graph
x = delayed(fun1)(1)
y = delayed(fun1)(2)
z = delayed(fun2)(x, y)
This ran immediately, since nothing has really happened yet. To get
the result, call compute
. Notice that this runs faster than
the original code.
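For example:
%%time
# Trigger the actual computation of z
z.compute()
The two fun1 calls run in parallel, so this takes roughly two seconds instead of three.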
Here, the z object is a lazy Delayed object, which holds everything we need to compute the final result, including references to all of the functions that are required, together with their inputs and their relationships to one another. We can evaluate the result with .compute() as above, or we can visualize the task graph for this value with .visualize().
As you can see, such a lazy execution allows building up complex, large
calculations symbolically before turning them over to the scheduler for
execution. For more, check dask
delayed execution.
Notice that this includes the names of the functions from before, and
the logical flow of the outputs of the fun1
functions to
the inputs of fun2
.
for loop with dask.delayed()
for loops are one of the most common things that we want to parallelize. Use dask.delayed on fun1 and sum to parallelize the computation below:
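First we need a list of inputs to loop over (the exact values are an assumption; any short list of numbers works):
# A small list of input data
data = [1, 2, 3, 4]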
%%time
# Sequential code
results = []
# Loop over the elements one by one
for i in data:
    temp = fun1(i)
    results.append(temp)
# Compute the total
total = sum(results)
# After it's computed
print("After computing :", total)
A parallel version with dask.delayed()
would be:
%%time
# Parallel code
results = []
for i in data:
    temp = delayed(fun1)(i)
    results.append(temp)
# Define the final reduction (still lazy)
total = delayed(sum)(results)
# Let's see what type of thing total is
print("Before computing:", total)
# Compute
result = total.compute()
# After it's computed
print("After computing :", result)
Visualize the task graph for this:
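For example:
# Visualize the task graph of the parallel version
total.visualize()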
Now enlarge data to a larger list, e.g., [1, 2, 3, 4, 5, 6, 7, 8]. What happens to the running time of total? Can you figure out why?
dask and xarray
Xarray can automatically wrap its data in dask arrays. This capability turns xarray into an extremely powerful tool for Big Data earth science.
To illustrate this in action, we will use a fairly large dataset to analyze. The aviso_2015 folder we downloaded contains 1 year of daily data from the AVISO sea-surface height satellite altimetry dataset.
Let’s load the first file as a regular xarray
dataset:
# Import xarray
import xarray as xr

# Load the first file with xarray
ds_first = xr.open_dataset('aviso_2015/dt_global_allsat_madt_h_20150101_20150914.nc')
# Check the data
ds_first
This one file is about 8 MB. So 365 of them will be nearly 3 GB. If we had downloaded all 25 years of data, it would be 73 GB! This is a good example of "medium data".
With open_mfdataset
,
we can easily open all the netcdf files into one Dataset
object.
# Use open_mfdataset to load all the nc files
ds = xr.open_mfdataset('aviso_2015/*.nc')
# Check data object
# Notice that the values are not displayed
ds
Note that the values are not displayed, since that would trigger computation. Try the following code and see what happens:
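A possible example (the SSH variable name adt is an assumption; check ds.data_vars for the actual name in your files). Defining a reduction is still lazy, so the cell finishes almost instantly:
# Define the time-mean sea-surface height (lazy: nothing is computed yet)
# NOTE: 'adt' is an assumed variable name; check ds.data_vars for yours
ssh_mean = ds.adt.mean(dim='time')
# The repr shows dask arrays instead of actual values
ssh_mean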
To actually compute the data:
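For example, with .compute() (or equivalently .load()):
# Trigger the computation; the result is an in-memory DataArray
ssh_mean.compute()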
Finally, close the client (local cluster):
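For example:
# Close the local client (cluster)
my_client.close()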
Python next?
So far we have covered the basics of Python and fundamental modules to make plots and analyze data. Looking forward, the learning curve is still steep, so be patient, as it takes time. Use Google and stackoverflow without hesitation: 9 times out of 10 you will find your questions/issues have (luckily) already been reported and solved.
Have fun!
The notes are modified from the excellent Dask for Parallel Computing and Big Data, available freely online.
Go through the notes and make sure you understand the scripts.
Make a local client with 6 CPUs (only if your laptop allows it!), then compute and plot the variance in the daily ssh anomalies with dask.