In [None]:
#--------------------------------------------------
# IMPORTANT NOTE:
# This section uses the cper environment; make sure
# it is activated. Launch Jupyter Notebook from the
# cper environment, NOT from the base environment.
#--------------------------------------------------

# Load modules
import numpy as np
import xarray as xr
import dask.array as da
from matplotlib import pyplot as plt
%matplotlib inline
import graphviz
from dask.distributed import Client
from dask import delayed
from time import sleep

In [None]:
# Eager NumPy array of ones: the memory is allocated immediately
shape = (1000, 4000)
ones_np = np.ones(shape, dtype=np.float64)
ones_np

In [None]:
# Memory footprint in megabytes (1 MB = 10**6 bytes)
ones_np.nbytes / 1_000_000

In [None]:
# Create a dask array of the same shape. This is lazy: no data is allocated
# until it is computed. No chunks argument is given, so dask picks the
# chunk layout itself (compare with the explicit chunks in the next cell).
ones_da = da.ones(shape)
ones_da

In [None]:
# Choose the chunking explicitly: blocks of 1000 x 1000 elements,
# so the (1000, 4000) array is cut into 4 chunks along the columns
chunk_shape = (1_000, 1_000)
ones_da = da.ones(shape, chunks=chunk_shape)
ones_da

In [None]:
# Run computation: execute the task graph for every chunk and return the
# assembled result as an in-memory NumPy array
ones_da.compute()

In [None]:
# Visualize the task graph inline. With filename=None dask renders the image
# through pipes only, instead of also writing (and on every later call
# overwriting) a "mydask.png" file in the working directory.
ones_da.visualize(filename=None)

In [None]:
# Reduce the array with sum(); lazy, this only extends the task graph
sum_of_ones = ones_da.sum()

# Visualize the reduction graph inline (filename=None: no mydask.png on disk)
sum_of_ones.visualize(filename=None)

In [None]:
# A much bigger array: 200000 x 4000 elements (its size is checked below)
bigshape = (200000, 4000)

# Define the array lazily, reusing chunk_shape from above
big_ones = da.ones(bigshape, chunks=chunk_shape)

# Display the dask array summary. Nothing is computed here; the cell only
# shows the lazy array's shape, chunks and size.
big_ones

In [None]:
# Size the array would occupy if fully materialized, in MB (10**6 bytes)
big_ones.nbytes / 1_000_000

In [None]:
# ProgressBar is used as a context manager around compute() below
from dask.diagnostics import ProgressBar

# Lazily describe mean(cos(x)**2) over the whole array; nothing runs yet
calc = (np.cos(big_ones) ** 2).mean()

# compute() executes the graph while the progress bar is displayed
with ProgressBar():
    result = calc.compute()

# Every element of big_ones is 1, so this is cos(1)**2 (about 0.2919)
result

In [None]:
# Lazily define the mean of exp(x)**10 down each column (axis 0 is reduced)
calc2 = (np.exp(big_ones) ** 10).mean(axis=0)

# Every input is 1, so each column mean should come out as e**10 (~22026.47)
calc2

In [None]:
# Run the computation explicitly, then plot the in-memory result.
# Passing the dask array straight to plt.plot would make matplotlib trigger
# the expensive computation implicitly during NumPy conversion, hiding it.
calc2_values = calc2.compute()

# One point per column of big_ones (axis 0 was reduced away)
plt.plot(calc2_values)
plt.xlabel("column index")
plt.ylabel("mean of exp(x)**10")
plt.title("Column means of exp(big_ones)**10");

In [None]:
# Start a local cluster through the distributed scheduler and connect a
# client to it: 4 workers, each limited to a single thread (CPU).
# Later .compute()/.load() calls in this notebook run on this cluster.
my_client = Client(threads_per_worker=1, n_workers=4)

# Display the cluster object for inspection
my_client.cluster

In [None]:
# Define two functions
def fun1(x, delay=1):
    """Return ``x + 1`` after sleeping, to simulate a slow task.

    Parameters
    ----------
    x : any value supporting ``+ 1``
        Value to increment.
    delay : float, optional
        Seconds to sleep before returning. The default of 1 keeps the
        original behaviour; pass 0 to skip the artificial wait.

    Returns
    -------
    ``x + 1``
    """
    sleep(delay)
    return x + 1

def fun2(x, y, delay=1):
    """Return ``x + y`` after sleeping, to simulate a slow task.

    Parameters
    ----------
    x, y : values supporting ``+``
        Operands to add.
    delay : float, optional
        Seconds to sleep before returning. The default of 1 keeps the
        original behaviour; pass 0 to skip the artificial wait.

    Returns
    -------
    ``x + y``
    """
    sleep(delay)
    return x + y

In [None]:
%%time

# Plain eager calls. Each call sleeps for 1 second and they run strictly one
# after another, so the whole cell takes about three seconds.

# Two independent increments, evaluated left to right
x, y = fun1(1), fun1(2)

# Combine them (the third 1-second call)
z = fun2(x, y)

In [None]:
%%time

# Wrapping with delayed() only records the calls in a task graph, so this
# cell returns almost instantly; none of the functions has run yet.
x, y = delayed(fun1)(1), delayed(fun1)(2)
z = delayed(fun2)(x, y)

In [None]:
%%time

# This actually runs our computation using a local cluster.
# x and y do not depend on each other, so they can run at the same time on
# different workers; expect roughly 2 seconds here instead of 3.
z.compute()

In [None]:
# Look at the task graph for z: the two fun1 calls feed into fun2.
# filename=None renders inline without writing mydask.png to disk.
z.visualize(filename=None)

In [None]:
# A simple list holding the integers 1 through 7
data = list(range(1, 8))

In [None]:
%%time

# Sequential code: one blocking 1-second fun1 call per element (~7 s total).
# The comprehension avoids leaking the loop variables into the namespace.
results = [fun1(i) for i in data]

# Plain Python sum; the values are already concrete, nothing is deferred
total = sum(results)

print("After computing :", total)

In [None]:
%%time

# Parallel code: delayed() records one fun1 task per element; nothing runs yet
results = [delayed(fun1)(i) for i in data]

# The sum is itself a delayed task that depends on every fun1 task
total = delayed(sum)(results)

# Before compute(), total is a Delayed placeholder rather than a number
print("Before computing:", total)

# Run the graph on the cluster. With 4 single-threaded workers, the seven
# independent 1-second tasks run in parallel batches (roughly 2 s overall).
result = total.compute()

print("After computing :", result)

In [None]:
# Look at the task graph for total: seven fun1 tasks feeding one sum.
# filename=None renders inline without writing mydask.png to disk.
total.visualize(filename=None)

In [None]:
# Load the first file with xarray, to inspect the structure of a single
# file before opening every file in aviso_2015/ together
ds_first = xr.open_dataset('aviso_2015/dt_global_allsat_madt_h_20150101_20150914.nc')

# Check the data
ds_first

In [None]:
# Use open_mfdataset to open all the .nc files in aviso_2015/ as one dataset.
# The variables are backed by dask arrays, so the values are not read yet.
ds = xr.open_mfdataset('aviso_2015/*.nc')

# Check data object
# Notice that the values are not displayed
ds

In [None]:
# Select the sea surface height variable 'adt' from the combined dataset
ssh = ds['adt']

# Still lazy: the DataArray wraps a dask array, so no data has been read
ssh

In [None]:
# Define the annual (2015) mean SSH over the time dimension.
# This is lazy: it only builds the task graph; the next cell computes it.
ssh_2015_mean = ssh.mean(dim='time')

In [None]:
# Trigger the computation (on the local cluster) and keep the result in memory
ssh_2015_mean.load()

# Plot annual mean; the trailing ";" suppresses the text repr of the title
ssh_2015_mean.plot()
plt.title("Annual mean sea surface height (adt), 2015");

In [None]:
# Close the client (local cluster) created earlier with
# Client(n_workers=4, threads_per_worker=1); run this as the final step.
my_client.close()