“Data is the new science. Big Data holds the answers.” - Pat Gelsinger


Prerequisites

In this section, we will use daily data from the AVISO sea-surface height satellite altimetry dataset. Download the file (aviso_2015.zip, ~390 MB) from here and unzip it; you will get a folder (aviso_2015) containing 365 netCDF (.nc) files. Move the folder to your working directory.

The Jupyter Notebook for this section is available here.

Important:

In this section, we will use the cper environment you created in Section 11. Make sure it is activated, and open Jupyter Notebook under it, not under the base environment.


Dask for parallel computing

In past lectures, we learned how to use numpy, pandas, and xarray to analyze various types of data. In this section, we address an increasingly common problem: what happens when the data we wish to analyze is “big data”?

A good threshold for when data becomes difficult to deal with is when the volume of data exceeds your computer’s RAM (random-access memory). Most modern laptops have between 2 GB and 16 GB of RAM. High-end workstations and servers can have 1 TB of RAM or more. If the dataset you are trying to analyze can’t fit in your computer’s memory, some special care is required to carry out the analysis.

The next threshold of difficulty is when the data can’t fit on your hard drive. Most modern laptops have between 100 GB and 4 TB of storage space on the hard drive. If you can’t fit your dataset on your internal hard drive, you can buy an external hard drive. However, at that point you are better off using a high-end server, HPC system, or cloud-based storage for your dataset. Once you have many TB of data to analyze, you are definitely in the realm of “big data”.

Introduction to dask

Dask is a tool that helps us easily extend our familiar Python data analysis tools to medium and big data, i.e. datasets that can’t fit in our computer’s RAM. In many cases, dask also allows us to speed up our analysis by using multiple CPU cores. Dask can help us work more efficiently on our laptop, and it can also help us scale up our analysis on HPC and cloud platforms. Most importantly, dask is almost invisible to the user, meaning that you can focus on your science rather than the details of parallel computing.

Dask provides collections for big data and a scheduler for parallel computing. In this section, we will learn the basics of using dask through examples. For more about dask, check The Dask Documentation and The Dask Github Site. For other modules enabling parallel computing, check Parallel Processing and Multiprocessing in Python.

Dask arrays

A dask array looks and feels a lot like a numpy array. However, a dask array doesn’t directly hold any data. Instead, it symbolically represents the computations needed to generate the data. Nothing is actually computed until the actual numerical values are needed. This mode of operation is called “lazy”; it allows one to build up complex, large calculations symbolically before turning them over to the scheduler for execution.

If we want to create a numpy array of all ones, we do it like this:

# Create a numpy array of ones
import numpy as np

shape = (1000, 4000)
ones_np = np.ones(shape)
ones_np

This array contains exactly 32 MB of data:

# Check the size of array in MB
ones_np.nbytes / 1e6

Now let’s create the same array using dask’s array interface:

# Create a dask array
import dask.array as da

ones_da = da.ones(shape)
ones_da

As you can see, dask creates the array as a single chunk. In fact, a crucial feature of dask is that we can specify the chunks argument, which describes how the array is split up into sub-arrays. There are several ways to specify chunks. In this section, we will use a block shape:

# Specify the chunk
chunk_shape = (1000, 1000)
ones_da = da.ones(shape, chunks=chunk_shape)
ones_da
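
For reference, chunks can be specified in several other ways as well. A minimal sketch (the exact chunk sizes picked by 'auto' depend on your dask configuration):

# Other ways to specify chunks
ones_da_number = da.ones(shape, chunks=1000)   # blocks of at most 1000 along each axis
ones_da_auto = da.ones(shape, chunks='auto')   # let dask pick a reasonable chunk size
ones_da_auto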

Notice that we just see a symbolic representation of the array, including its shape, dtype, and chunksize. No data has been generated yet. When we call .compute() on a dask array, the computation is triggered and the dask array becomes a numpy array.

# Run computation
ones_da.compute()

To understand what happened when we called .compute(), we can visualize the dask graph, i.e. the symbolic operations that make up the array:

# Visualize the computation
ones_da.visualize()

As you can see, the array has 4 chunks. To generate it, dask calls np.ones four times and then concatenates the results into one array.
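
You can confirm this chunk layout directly from the array’s attributes (these are standard dask array attributes):

# Inspect the chunk layout
print(ones_da.chunks)        # chunk sizes along each dimension
print(ones_da.numblocks)     # number of chunks along each dimension
print(ones_da.npartitions)   # total number of chunks (4 here)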

Rather than immediately loading a dask array (which puts all the data into RAM), it is more common to want to reduce the data somehow. For example:

# Reduce the array with sum()
sum_of_ones = ones_da.sum()

# Visualize the computation
sum_of_ones.visualize()

Here we see dask’s strategy for finding the sum. This simple example illustrates the beauty of dask: it automatically designs an algorithm appropriate for custom operations with big data.
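
As a quick sanity check, computing the lazy sum gives the same value as the eager numpy equivalent:

# Trigger the computation and compare with the numpy result
print(sum_of_ones.compute())   # 4000000.0
print(ones_np.sum())           # the same value, computed eagerly by numpy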

A bigger calculation

The previous example is too simple - the data (32 MB) is nowhere near big enough to warrant the use of dask. Let’s make it a lot bigger:

# A much bigger array
bigshape = (200000, 4000)

# Define the array (lazy method)
big_ones = da.ones(bigshape, chunks=chunk_shape)

# Check the dask array (still lazy, no data generated yet)
big_ones

# Check the size of array in MB
big_ones.nbytes / 1e6

This dataset is 6.4 GB! That is probably close to or greater than the amount of available RAM in your computer. Nevertheless, dask has no problem working on it. When doing a big calculation, dask also has some tools to help us understand what is happening.

# Import ProgressBar
from dask.diagnostics import ProgressBar

# Define the computation (lazy method)
calc = (np.cos(big_ones)**2).mean()

# Show ProgressBar
with ProgressBar():
    # Run computation
    result = calc.compute()

# Show result
result

In fact, all the usual numpy methods work on dask arrays. You can also apply numpy functions directly to a dask array, and the result will stay “lazy” until the computation is triggered.

# Define the computation (lazy method)
calc2 = (np.exp(big_ones)**10).mean(axis=0)
calc2
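
Other familiar reductions and element-wise functions behave the same way. A small sketch (all of these stay lazy until .compute() is called):

# More numpy-style operations on the dask array (all lazy)
std_per_column = big_ones.std(axis=0)   # standard deviation along axis 0
roots = np.sqrt(big_ones)               # element-wise ufunc, stays lazy
combined = (big_ones * 2 + 1).max()     # arithmetic followed by a reduction

# Nothing is computed until we explicitly ask for it
combined.compute()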

Plotting also triggers computation, since we need the actual values.

# Run computation and plot
import matplotlib.pyplot as plt

plt.plot(calc2)

Making a local client

Dask has several ways of executing code in parallel. We’ll use the distributed scheduler by creating a dask.distributed.Client.

For now, this will provide us with some nice diagnostics. Let’s make a local client (my_client) with 4 workers (1 thread per worker):

# Use the distributed scheduler to form a local client (cluster)
# 4 workers, 1 thread (CPU) per worker
from dask.distributed import Client

my_client = Client(n_workers=4, threads_per_worker=1)

# Show information of the local client
my_client.cluster

Click the Dashboard link and familiarize yourself with it.
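
If the cluster widget does not render in your notebook, you can also print the dashboard address directly (dashboard_link is a standard attribute of the distributed Client; the default port is usually 8787):

# Print the address of the diagnostic dashboard
print(my_client.dashboard_link)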

When you no longer need a client, make sure to close it or stop the kernel:

my_client.close()

Parallelizing code with dask.delayed()

In this sub-section, we parallelize some simple code with dask.delayed(). Often, this is the only dask function you will need in order to convert existing functions for use with dask. It offers a simple way to parallelize existing codebases or build complex systems.

First let’s make some toy functions, fun1 and fun2, that sleep for a while to simulate work. We’ll then time running these functions normally.

# Define two functions that sleep for a while to simulate work
from time import sleep

def fun1(x):
    sleep(1)
    return x + 1

def fun2(x, y):
    sleep(1)
    return x + y

We then time the execution of this normal code using the %%time magic, which is a special function of the Jupyter Notebook.

%%time

# This takes three seconds to run because we call each
# function sequentially, one after the other

# Call fun1
x = fun1(1)

# Call fun1
y = fun1(2)

# Call fun2
z = fun2(x, y)

Those two increment calls could run in parallel, because they are totally independent of one another.

We’ll transform the fun1 and fun2 functions using the dask.delayed() function. When we call the delayed version by passing the arguments, exactly as before, the original function isn’t actually called yet - which is why the cell execution finishes very quickly. Instead, a delayed object is made, which keeps track of the function to call and the arguments to pass to it.

%%time

# This runs immediately, all it does is build a task graph
from dask import delayed

x = delayed(fun1)(1)
y = delayed(fun1)(2)
z = delayed(fun2)(x, y)

This ran immediately, since nothing has really happened yet. To get the result, call .compute(). Notice that this runs faster than the original sequential code.

%%time

# This actually runs our computation using a local cluster
z.compute()

Here, the z object is a lazy Delayed object, which holds everything we need to compute the final result, including references to all of the functions that are required, their inputs, and their relationships to one another. We can evaluate the result with .compute() as above, or we can visualize the task graph for this value with .visualize(). As you can see, such lazy execution allows building up complex, large calculations symbolically before turning them over to the scheduler for execution. For more, check dask delayed execution.

# Look at the task graph for z
z.visualize()

Notice that this includes the names of the functions from before, and the logical flow of the outputs of the fun1 functions to the inputs of fun2.
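
As an aside, delayed() can also be used as a decorator when defining a function, so that every call to it returns a Delayed object automatically. A minimal sketch (fun3 is just an illustrative name, not used elsewhere in these notes):

# delayed() can also be applied as a decorator
@delayed
def fun3(x):
    sleep(1)
    return x * 2

# Calling the decorated function only builds a task; nothing runs yet
lazy_value = fun3(10)

# Trigger the computation
lazy_value.compute()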

Parallelizing a for loop with dask.delayed()

for loops are one of the most common things that we want to parallelize. Use dask.delayed() on fun1 and sum to parallelize the computation below:

# Make a simple list
data = [1, 2, 3, 4]

%%time

# Sequential code
results = []

# Loop element one by one
for i in data:
    temp = fun1(i)
    results.append(temp)

# Compute
total = sum(results)

# After it's computed
print("After computing :", total)  

A parallel version with dask.delayed() would be:

%%time

# Parallel code 
results = []

for i in data:
    temp = delayed(fun1)(i)
    results.append(temp)

# Define the method
total = delayed(sum)(results)

# Let's see what type of thing total is
print("Before computing:", total)

# Compute
result = total.compute()

# After it's computed
print("After computing :", result) 

Visualize the task graph for this:

# Look at the task graph for total
total.visualize()

Now enlarge data to a longer list, e.g., [1, 2, 3, 4, 5, 6, 7, 8]. What happens to the running time of total? Can you figure out why?

Combining dask and xarray

Xarray can automatically wrap its data in dask arrays. This capability turns xarray into an extremely powerful tool for Big Data earth science.

To illustrate this in action, we will analyze a fairly large dataset: the folder you downloaded contains 1 year of daily data from the AVISO sea-surface height satellite altimetry dataset.

Let’s load the first file as a regular xarray dataset:

# Load the first file with xarray
import xarray as xr

ds_first = xr.open_dataset('aviso_2015/dt_global_allsat_madt_h_20150101_20150914.nc')

# Check the data
ds_first

This one file is about 8 MB. So 365 of them will be nearly 3 GB. If we had downloaded all 25 years of data, it would be about 73 GB! This is a good example of “medium data”.

With open_mfdataset, we can easily open all the netCDF files as one Dataset object.

# Use open_mfdataset to load all the nc files
ds = xr.open_mfdataset('aviso_2015/*.nc')

# Check data object
# Notice that the values are not displayed
ds

Note that the values are not displayed, since that would trigger computation.

# Get sea surface height (adt)
ssh = ds.adt

# Check the data, this is a dask array
ssh
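
With open_mfdataset, each input file becomes one chunk along the time dimension, so ssh is split into 365 small chunks. If you want a different chunk layout, for example to reduce scheduling overhead, you can rechunk the array. A minimal sketch (the chunk size of 30 is just an example; the best choice depends on your analysis and available memory):

# Rechunk to 30 time steps per chunk (this is still lazy)
ssh_rechunked = ssh.chunk({'time': 30})
ssh_rechunked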

Try the following code and see what happens:

# Define the annual mean ssh (lazy, not computed yet)
ssh_2015_mean = ssh.mean(dim='time')

To actually compute the data:

# Compute annual mean ssh
ssh_2015_mean.load()

# Plot annual mean
ssh_2015_mean.plot()

Finally, close the client (local cluster):

# Close the client (local cluster)
my_client.close()

Python next?

So far we have covered the basics of Python and the fundamental modules for making plots and analyzing data. Looking forward, the learning curve is still steep, so be patient as it takes time. Use Google and Stack Overflow without hesitation; 9 times out of 10 you will find that your questions/issues have (luckily) already been reported and solved.

Have fun!


The notes are modified from the excellent Dask for Parallel Computing and Big Data, available freely online.


In-class exercises

Exercise #1

Go through the notes, make sure you understand the scripts.

Exercise #2

Make a local client with 6 CPUs (only if your laptop allows!), then compute and plot the variance in daily ssh anomalies with dask.