Nested-Dask Best Practices

When to Use Nested-Dask vs Nested-Pandas

Like Dask, Nested-Dask is focused on working with large amounts of data. This matters most when the amount of data exceeds the available memory of your machine or system, and/or when parallel computing is needed. In such cases, Nested-Dask provides built-in tooling for working with these datasets and is recommended over Nested-Pandas. These tools include (but are not limited to):

  • lazy computation: enabling construction of workflows with more control over when computation actually begins

  • partitioning: breaking data up into smaller partitions that can fit into memory, enabling work on each chunk while keeping the overall memory footprint smaller than the full dataset size

  • progress tracking: the Dask Dashboard can be used to track the progress of complex workflows, assess memory usage, find bottlenecks, etc.

  • parallel processing: Dask workers can process the partitions of a dataset in parallel, both on a local machine and on a distributed cluster.

[1]:
from nested_dask.datasets import generate_data

# A lazily-represented dataset split into 5 partitions
generate_data(10, 100, npartitions=5)
[1]:
Nested-Dask NestedFrame Structure:
a b nested
npartitions=5
0 float64 float64 nested<t: [double], flux: [double], band: [string]>
1 ... ... ...
... ... ... ...
7 ... ... ...
9 ... ... ...
Dask Name: repartition, 3 expressions
[2]:
# Set up a Dask client; computations triggered afterwards run in parallel on its workers
from dask.distributed import Client

client = Client()
client  # provides a link to access the Dask Dashboard
[2]:

Client

Client-fe430057-cee9-11f0-8270-eeacff0d90f4

Connection method: Cluster object
Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status

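
`Client()` with no arguments starts a `LocalCluster` with defaults chosen for your machine. For explicit control over the workers, the cluster can be configured directly. The standalone sketch below (meant to run on its own, not alongside the client above) starts two in-process workers, disables the dashboard so it runs in any environment, and maps a few independent tasks onto them; the worker count and the squaring task are arbitrary choices for illustration.

```python
from dask.distributed import Client, LocalCluster

# Two in-process workers with one thread each; dashboard_address=None turns the dashboard off
with LocalCluster(
    n_workers=2, threads_per_worker=1, processes=False, dashboard_address=None
) as cluster, Client(cluster) as client:
    client.wait_for_workers(2)
    n_workers = len(client.scheduler_info()["workers"])

    # Eight independent tasks; the scheduler assigns them to the available workers
    futures = client.map(lambda x: x * x, range(8))
    squares = client.gather(futures)

print(n_workers, squares)  # 2 [0, 1, 4, 9, 16, 25, 36, 49]
```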

Avoiding Dask Inefficiency

By contrast, when working with smaller datasets that fit into memory, it's often better to work directly with Nested-Pandas. This is particularly relevant for workflows that start with a large amount of data, filter it down to a small dataset, and then do not require computationally heavy processing of that small dataset. Because of lazy computation, filtering operations are not applied to the stored dataset; they are re-run every time a result is computed, so you are effectively still working at full scale. Let's walk through an example where we load a "large" dataset. In this case it will fit into memory, but let's imagine that it is larger than memory.

[3]:
# generate a "large" lazy dataset
ndf = generate_data(1000, 1000, npartitions=10)
ndf
[3]:
Nested-Dask NestedFrame Structure:
a b nested
npartitions=10
0 float64 float64 nested<t: [double], flux: [double], band: [string]>
99 ... ... ...
... ... ... ...
899 ... ... ...
999 ... ... ...
Dask Name: repartition, 3 expressions

Now let’s apply a query that will filter the dataset down to a very small subset.

[4]:
ndf = ndf.query("a > 0.99")
ndf.compute()  # returns a handful of rows from the original 1000
[4]:
     a         b         nested: t, flux, band (first of 1000 rows × 3 columns shown)
34   0.999593  1.530681  19.372983  66.634927  g
41   0.993347  0.016259   5.400583  70.117877  g
135  0.995518  0.885589  15.844226  57.982271  r
204  0.997300  0.462908   2.264813  13.728267  g
302  0.991429  1.402585   9.325181  99.983757  r
361  0.991831  0.418476   9.794402  13.857785  g
422  0.994259  0.512828   1.854611  27.843834  g
563  0.998634  0.129835  12.309507  76.665469  g
585  0.995518  0.535688   5.711755  25.210081  g
593  0.990295  1.703818   4.687916  13.568155  g
669  0.998624  0.827837  16.562802  90.276603  g
694  0.992744  0.757566   3.812167   9.251992  r

12 rows × 3 columns

When compute() is called above, the Dask task graph is executed and the query is run. However, ndf is still a lazy Dask object: the computed result is not stored in it, so any subsequent method that triggers computation (e.g. .head() or .to_parquet()) must run the query all over again.

[5]:
import numpy as np
import pandas as pd

# The result will be a dataframe with a single column with float values
meta = pd.DataFrame(columns=[0], dtype=float)

# Apply a mean operation on the "nested.flux" column
mean_flux = ndf.reduce(np.mean, "nested.flux", meta=meta)

# Dask has to reapply the query over `ndf` here, then apply the mean operation
mean_flux.compute()
[5]:
0
34 51.836167
41 50.642766
135 49.743782
204 49.396559
302 49.238018
361 50.684524
422 48.975888
563 50.046228
585 49.448325
593 49.645546
669 49.739631
694 49.791944

12 rows × 1 columns

In this case, it's better to compute the query once and work with the in-memory result in Nested-Pandas directly.

[6]:
import nested_pandas as npd

nf = ndf.compute()  # The query is computed and the result is brought into memory

# The computed result is a Nested-Pandas NestedFrame
isinstance(nf, npd.NestedFrame)
[6]:
True
[7]:
# Now we can apply the mean operation directly to the nested_pandas.NestedFrame
nf.reduce(np.mean, "nested.flux")
[7]:
0
34 51.836167
41 50.642766
135 49.743782
204 49.396559
302 49.238018
361 50.684524
422 48.975888
563 50.046228
585 49.448325
593 49.645546
669 49.739631
694 49.791944

12 rows × 1 columns

Use Dask Divisions

Dask "divisions" are optional, but they are highly recommended for Nested-Dask work. When a dataset is sorted by its index, the divisions record the index boundaries of the partitions, showing which range of index values resides in each partition. For example:

[8]:
# Divisions are in the left-most column
ndf = generate_data(15, 10, npartitions=5)
ndf
[8]:
Nested-Dask NestedFrame Structure:
a b nested
npartitions=5
0 float64 float64 nested<t: [double], flux: [double], band: [string]>
2 ... ... ...
... ... ... ...
11 ... ... ...
14 ... ... ...
Dask Name: repartition, 3 expressions
[9]:
# Divisions show which index ranges reside in each partition
ndf.divisions
[9]:
(0, 2, 5, 8, 11, 14)
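
To make these values concrete: Dask treats partition i as holding index values from divisions[i] up to, but not including, divisions[i + 1], with the last partition also including the final division. The hypothetical helper `partition_for` below applies that rule to the divisions printed above; it is a sketch of the idea, not a Dask function.

```python
from bisect import bisect_right


def partition_for(divisions, key):
    """Hypothetical helper: return the number of the partition that holds `key`."""
    if key < divisions[0] or key > divisions[-1]:
        raise KeyError(key)
    # Partition i covers [divisions[i], divisions[i + 1]); the last partition is closed
    return min(bisect_right(divisions, key) - 1, len(divisions) - 2)


divisions = (0, 2, 5, 8, 11, 14)  # the divisions printed above
print([partition_for(divisions, k) for k in (0, 2, 7, 13, 14)])  # [0, 1, 2, 4, 4]
```

A lookup like this is what lets Dask send an index-based operation to only the partitions that can contain the requested values instead of scanning all of them.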

Divisions are particularly important to the speed and stability of table joins, which Nested-Dask uses heavily in its nesting scheme. There are two main ways to set divisions. The first applies when loading from files on disk: readers accept keyword arguments that calculate divisions automatically (calculate_divisions=True in the case of read_parquet). Alternatively, you can calculate them as part of a set_index() call.

[10]:
# move the index into a regular "index" column; the result has no divisions set
ndf_no_index = ndf.reset_index()
ndf_no_index
[10]:
Nested-Dask NestedFrame Structure:
index a b nested
npartitions=5
int64 float64 float64 nested<t: [double], flux: [double], band: [string]>
... ... ... ...
... ... ... ... ...
... ... ... ...
... ... ... ...
Dask Name: reset_index, 4 expressions
[11]:
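# sorted=True tells Dask the "index" column is already sorted, so divisions are computed without a shuffle
# if the column is not sorted, leave sorted=False (the default) and Dask will sort the data first, which is more expensive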
ndf_no_index.set_index("index", sorted=True)
[11]:
Nested-Dask NestedFrame Structure:
a b nested
npartitions=5
0 float64 float64 nested<t: [double], flux: [double], band: [string]>
2 ... ... ...
... ... ... ...
11 ... ... ...
14 ... ... ...
Dask Name: resolveoverlappingdivisions, 6 expressions