Nested-Dask Best Practices
When to Use Nested-Dask vs Nested-Pandas
Like Dask, Nested-Dask is designed for working with large amounts of data. The point at which this really matters is when the data exceeds the available memory of your machine or system, and/or when parallel computing is needed. In such cases, Nested-Dask provides built-in tooling for working with these datasets and is recommended over Nested-Pandas. These tools include (but are not limited to):
- lazy computation: workflows can be constructed in advance, giving you control over when computation actually begins.
- partitioning: data is broken into smaller partitions that fit into memory, so each chunk can be processed while the overall memory footprint stays smaller than the full dataset.
- progress tracking: the Dask Dashboard can be used to track the progress of complex workflows, assess memory usage, find bottlenecks, etc.
- parallel processing: Dask workers can process the partitions of a dataset in parallel, both on a local machine and on a distributed cluster.
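The ideas behind lazy computation, partitioning, and parallel processing can be illustrated with a toy model in plain Python. This is a hypothetical sketch (`make_partitions` and `LazyPipeline` are illustrative names, not part of Dask or Nested-Dask): steps are only recorded until `compute()` is called, and each partition is then processed independently by a small thread pool.

```python
from concurrent.futures import ThreadPoolExecutor


def make_partitions(rows, npartitions):
    """Split `rows` into up to `npartitions` contiguous chunks (toy stand-in for partitions)."""
    if not rows:
        return []
    size = -(-len(rows) // npartitions)  # ceiling division
    return [rows[i:i + size] for i in range(0, len(rows), size)]


class LazyPipeline:
    """Hypothetical toy: records per-partition steps and only runs them in compute()."""

    def __init__(self, partitions, steps=()):
        self.partitions = partitions
        self.steps = tuple(steps)

    def map(self, func):
        # Returns a new pipeline with one more recorded step; nothing is executed here
        return LazyPipeline(self.partitions, self.steps + (func,))

    def _run_partition(self, part):
        # Apply every recorded step to a single partition
        for step in self.steps:
            part = step(part)
        return part

    def compute(self, max_workers=2):
        # Partitions are independent, so a pool of workers can process them concurrently
        with ThreadPoolExecutor(max_workers=max_workers) as pool:
            results = list(pool.map(self._run_partition, self.partitions))
        # Concatenate per-partition results back into one list, preserving order
        return [row for part in results for row in part]


partitions = make_partitions(list(range(10)), npartitions=5)
pipeline = (
    LazyPipeline(partitions)
    .map(lambda part: [x * 2 for x in part])        # recorded, not run
    .map(lambda part: [x for x in part if x > 10])  # recorded, not run
)
print(pipeline.compute())  # [12, 14, 16, 18]
```

Real Dask builds a task graph and schedules it on threads, processes, or a distributed cluster; the toy only mirrors the shape of that idea.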
[1]:
from nested_dask.datasets import generate_data
# A lazily-represented dataset split into 5 partitions
generate_data(10, 100, npartitions=5)
[1]:
| | a | b | nested |
|---|---|---|---|
| npartitions=5 | | | |
| 0 | float64 | float64 | nested<t: [double], flux: [double], band: [string]> |
| 1 | ... | ... | ... |
| ... | ... | ... | ... |
| 7 | ... | ... | ... |
| 9 | ... | ... | ... |
[2]:
# Set up a Dask client, which enables parallel processing
from dask.distributed import Client
client = Client()
client # provides a link to access the Dask Dashboard
[2]:
| Client | Client-fe430057-cee9-11f0-8270-eeacff0d90f4 |
|---|---|
| Connection method | Cluster object |
| Cluster type | distributed.LocalCluster |
| Dashboard | http://127.0.0.1:8787/status |
| Workers | 2 (1 thread and 3.27 GiB memory each) |
| Total threads | 2 |
| Total memory | 6.54 GiB |
| Status | running |
| Using processes | True |
Avoiding Dask Inefficiency
By contrast, when working with smaller datasets that fit into memory, it's often better to work directly with Nested-Pandas. This is particularly relevant for workflows that start with a large amount of data, filter it down to a small subset, and then do not need computationally heavy processing of that subset. Because computation is lazy, a filter is not applied to the dataset once and for all: the lazy object still carries the full dataset plus the filter step, so you are still effectively working at scale. Let's walk through an example that loads a "large" dataset; here it actually fits into memory, but imagine that it is larger than memory.
[3]:
# generate a "large" lazy dataset
ndf = generate_data(1000, 1000, npartitions=10)
ndf
[3]:
| | a | b | nested |
|---|---|---|---|
| npartitions=10 | | | |
| 0 | float64 | float64 | nested<t: [double], flux: [double], band: [string]> |
| 99 | ... | ... | ... |
| ... | ... | ... | ... |
| 899 | ... | ... | ... |
| 999 | ... | ... | ... |
Now let’s apply a query that will filter the dataset down to a very small subset.
[4]:
ndf = ndf.query("a > 0.99")
ndf.compute() # returns a handful of rows from the original 1000
/home/docs/checkouts/readthedocs.org/user_builds/nested-dask/envs/latest/lib/python3.10/site-packages/distributed/client.py:3374: UserWarning: Sending large graph of size 20.06 MiB.
This may cause some slowdown.
Consider loading the data with Dask directly
or using futures or delayed objects to embed the data into the graph without repetition.
See also https://docs.dask.org/en/stable/best-practices.html#load-data-with-dask for more information.
warnings.warn(
[4]:
| | a | b | nested |
|---|---|---|---|
| 34 | 0.999593 | 1.530681 | 1000 rows × 3 columns |
| 41 | 0.993347 | 0.016259 | 1000 rows × 3 columns |
| 135 | 0.995518 | 0.885589 | 1000 rows × 3 columns |
| 204 | 0.997300 | 0.462908 | 1000 rows × 3 columns |
| 302 | 0.991429 | 1.402585 | 1000 rows × 3 columns |
| 361 | 0.991831 | 0.418476 | 1000 rows × 3 columns |
| 422 | 0.994259 | 0.512828 | 1000 rows × 3 columns |
| 563 | 0.998634 | 0.129835 | 1000 rows × 3 columns |
| 585 | 0.995518 | 0.535688 | 1000 rows × 3 columns |
| 593 | 0.990295 | 1.703818 | 1000 rows × 3 columns |
| 669 | 0.998624 | 0.827837 | 1000 rows × 3 columns |
| 694 | 0.992744 | 0.757566 | 1000 rows × 3 columns |
When compute() is called above, the Dask task graph is executed and the query is run. However, the computed result is not assigned to a variable, and ndf itself is still a lazy Dask object. This means that any subsequent .compute()-like method (e.g. .head() or .to_parquet()) has to redo the query work all over again.
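The re-execution cost can be seen in a toy model. The `LazyQuery` class below is hypothetical, not part of Dask: like a lazy collection, it stores the source and the filter and re-applies the filter on every `compute()`, counting how many times the full source is scanned.

```python
class LazyQuery:
    """Hypothetical stand-in for a lazy, filtered collection (not the Dask API)."""

    def __init__(self, source, predicate):
        self.source = source        # the full "large" dataset
        self.predicate = predicate  # the recorded filter, not yet applied
        self.scans = 0              # how many times the full source was processed

    def compute(self):
        # Nothing is cached: every call re-reads the whole source and re-filters it
        self.scans += 1
        return [row for row in self.source if self.predicate(row)]


lazy = LazyQuery(list(range(1000)), lambda a: a > 990)

# Two separate downstream results from the lazy object: the filter runs twice
n_rows = len(lazy.compute())
total = sum(lazy.compute())
print(lazy.scans)  # 2

# Materialize once, then reuse the small in-memory result for both results
small = lazy.compute()
n_rows_again, total_again = len(small), sum(small)
print(lazy.scans)  # 3: one extra scan served both results
```

Materializing the small result once and reusing it is the same idea as switching to Nested-Pandas after the query.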
[5]:
import numpy as np
import pandas as pd
# The result will be a dataframe with a single column with float values
meta = pd.DataFrame(columns=[0], dtype=float)
# Apply a mean operation on the "nested.flux" column
mean_flux = ndf.reduce(np.mean, "nested.flux", meta=meta)
# Dask has to reapply the query over `ndf` here, then apply the mean operation
mean_flux.compute()
/home/docs/checkouts/readthedocs.org/user_builds/nested-dask/envs/latest/lib/python3.10/site-packages/distributed/client.py:3374: UserWarning: Sending large graph of size 40.12 MiB.
This may cause some slowdown.
Consider loading the data with Dask directly
or using futures or delayed objects to embed the data into the graph without repetition.
See also https://docs.dask.org/en/stable/best-practices.html#load-data-with-dask for more information.
warnings.warn(
[5]:
| | 0 |
|---|---|
| 34 | 51.836167 |
| 41 | 50.642766 |
| 135 | 49.743782 |
| 204 | 49.396559 |
| 302 | 49.238018 |
| 361 | 50.684524 |
| 422 | 48.975888 |
| 563 | 50.046228 |
| 585 | 49.448325 |
| 593 | 49.645546 |
| 669 | 49.739631 |
| 694 | 49.791944 |
12 rows × 1 columns
In this case, it's better to compute the query once and continue working with the small in-memory result in Nested-Pandas directly.
[6]:
import nested_pandas as npd
nf = ndf.compute() # The query is computed and the result is brought into memory
# The computed result is a Nested-Pandas NestedFrame
isinstance(nf, npd.NestedFrame)
/home/docs/checkouts/readthedocs.org/user_builds/nested-dask/envs/latest/lib/python3.10/site-packages/distributed/client.py:3374: UserWarning: Sending large graph of size 40.12 MiB.
This may cause some slowdown.
Consider loading the data with Dask directly
or using futures or delayed objects to embed the data into the graph without repetition.
See also https://docs.dask.org/en/stable/best-practices.html#load-data-with-dask for more information.
warnings.warn(
[6]:
True
[7]:
# Now we can apply the mean operation directly to the nested_pandas.NestedFrame
nf.reduce(np.mean, "nested.flux")
[7]:
| | 0 |
|---|---|
| 34 | 51.836167 |
| 41 | 50.642766 |
| 135 | 49.743782 |
| 204 | 49.396559 |
| 302 | 49.238018 |
| 361 | 50.684524 |
| 422 | 48.975888 |
| 563 | 50.046228 |
| 585 | 49.448325 |
| 593 | 49.645546 |
| 669 | 49.739631 |
| 694 | 49.791944 |
12 rows × 1 columns
Use Dask Divisions
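Dask "divisions" are an optional component of Dask, but they are highly recommended for Nested-Dask work. When a dataset is sorted by its index, divisions record the index boundaries of each partition: a tuple of npartitions + 1 values, where partition i holds index values from divisions[i] up to (but not including) divisions[i + 1], and the last partition also includes the final value. For example: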
[8]:
# Divisions are in the left-most column
ndf = generate_data(15, 10, npartitions=5)
ndf
[8]:
| | a | b | nested |
|---|---|---|---|
| npartitions=5 | | | |
| 0 | float64 | float64 | nested<t: [double], flux: [double], band: [string]> |
| 2 | ... | ... | ... |
| ... | ... | ... | ... |
| 11 | ... | ... | ... |
| 14 | ... | ... | ... |
[9]:
# Divisions show which index ranges reside in each partition
ndf.divisions
[9]:
(0, 2, 5, 8, 11, 14)
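Because divisions are sorted boundaries, finding the partition that would hold a given index value is a binary search, which is what lets Dask restrict operations such as `.loc` lookups to the relevant partitions. The plain-Python `partition_for` helper below is a hypothetical sketch of that lookup, assuming the convention described above (half-open ranges, last partition closed on the right); it is not Dask's implementation.

```python
from bisect import bisect_right


def partition_for(divisions, key):
    """Return the partition number that would hold `key`, or None if it is out of range.

    Assumes partition i covers [divisions[i], divisions[i + 1]), except the last
    partition, which also includes divisions[-1].
    """
    if key < divisions[0] or key > divisions[-1]:
        return None
    if key == divisions[-1]:
        return len(divisions) - 2  # last partition is closed on the right
    return bisect_right(divisions, key) - 1


divisions = (0, 2, 5, 8, 11, 14)  # the divisions shown above
print([partition_for(divisions, k) for k in (0, 2, 7, 13, 14, 15)])
# [0, 1, 2, 4, 4, None]
```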
Divisions are particularly important to the speed and stability of table joins, which Nested-Dask uses heavily in its nesting scheme. There are two main ways to set divisions. The first is at load time: when reading files from disk, some readers accept keyword arguments that calculate divisions automatically (calculate_divisions=True in the case of read_parquet). Alternatively, you can calculate them as part of a set_index() call.
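One way to see why divisions help joins: without them, Dask cannot tell which partitions of two tables share index values, so an index join typically requires shuffling the data first. With known divisions, each partition only needs to be matched against the partitions whose index ranges overlap. The sketch below is a simplified, hypothetical model of that pairing (Dask's actual join planning is more involved); the right-hand divisions `(0, 7, 14)` are made up for illustration, and strictly increasing divisions are assumed.

```python
def _interval(divisions, i):
    """Partition i covers [lo, hi), or [lo, hi] for the last partition."""
    is_last = i == len(divisions) - 2
    return divisions[i], divisions[i + 1], is_last


def _overlaps(a, b):
    # Two ranges share at least one value if each starts before the other ends
    (lo1, hi1, closed1), (lo2, hi2, closed2) = a, b
    a_starts_before_b_ends = lo1 < hi2 or (closed2 and lo1 == hi2)
    b_starts_before_a_ends = lo2 < hi1 or (closed1 and lo2 == hi1)
    return a_starts_before_b_ends and b_starts_before_a_ends


def partition_pairs(left_divisions, right_divisions):
    """List the (left, right) partition pairs whose index ranges can share rows."""
    n_left = len(left_divisions) - 1
    n_right = len(right_divisions) - 1
    return [
        (i, j)
        for i in range(n_left)
        for j in range(n_right)
        if _overlaps(_interval(left_divisions, i), _interval(right_divisions, j))
    ]


left = (0, 2, 5, 8, 11, 14)  # divisions from the example above
right = (0, 7, 14)           # hypothetical divisions of a second table
print(partition_pairs(left, right))
# [(0, 0), (1, 0), (2, 0), (2, 1), (3, 1), (4, 1)]  -> 6 of the 10 possible pairs
print(partition_pairs(left, left))
# [(0, 0), (1, 1), (2, 2), (3, 3), (4, 4)]  -> identical divisions pair one-to-one
```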
[10]:
# Reset the index into a regular column; the result has no known divisions
ndf_no_index = ndf.reset_index()
ndf_no_index
[10]:
| | index | a | b | nested |
|---|---|---|---|---|
| npartitions=5 | | | | |
| | int64 | float64 | float64 | nested<t: [double], flux: [double], band: [string]> |
| | ... | ... | ... | ... |
| ... | ... | ... | ... | ... |
| | ... | ... | ... | ... |
| | ... | ... | ... | ... |
[11]:
# sorted=True tells Dask the column is already sorted, so divisions can be set without shuffling the data
# alternatively use sort=True if the chosen index is not sorted
ndf_no_index.set_index("index", sorted=True)
[11]:
| | a | b | nested |
|---|---|---|---|
| npartitions=5 | | | |
| 0 | float64 | float64 | nested<t: [double], flux: [double], band: [string]> |
| 2 | ... | ... | ... |
| ... | ... | ... | ... |
| 11 | ... | ... | ... |
| 14 | ... | ... | ... |
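Conceptually, sorted=True lets Dask derive divisions without moving any data: roughly, it only needs the minimum and maximum index value of each partition, plus a check that the partitions are in ascending order. The function below is a hypothetical plain-Python sketch of that idea, not Dask's implementation; it assumes non-empty partitions that are each already sorted internally, and the partition contents are made up to reproduce the divisions shown above.

```python
def divisions_from_sorted_partitions(partitions):
    """Derive divisions from per-partition minima and maxima of an already-sorted index.

    No data is moved: only each partition's min and max are inspected, and the
    boundaries are checked so that every partition ends before the next begins.
    """
    mins = [min(part) for part in partitions]
    maxes = [max(part) for part in partitions]
    for this_max, next_min in zip(maxes[:-1], mins[1:]):
        if this_max >= next_min:
            raise ValueError("partitions are not in ascending order; sort the data instead")
    return tuple(mins) + (maxes[-1],)


index_partitions = [[0, 1], [2, 3, 4], [5, 6, 7], [8, 9, 10], [11, 12, 13, 14]]
print(divisions_from_sorted_partitions(index_partitions))
# (0, 2, 5, 8, 11, 14)
```

When the boundary check would fail, the data genuinely needs sorting, which is the more expensive path the sort=True comment above refers to.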