Loading Data into Nested-Dask#

[1]:
import os
import tempfile

import nested_dask as nd
import nested_pandas as npd
from nested_dask import read_parquet
from nested_dask.datasets import generate_parquet_file

From Nested-Pandas#

Nested-Dask can load data from a Nested-Pandas NestedFrame object by using the from_pandas class method.

[2]:
# Create a Nested-Pandas NestedFrame
nf = npd.NestedFrame(data={"a": [1, 2, 3], "b": [2, 4, 6]}, index=[0, 1, 2])

nested = npd.NestedFrame(
    data={"c": [0, 2, 4, 1, 4, 3, 1, 4, 1], "d": [5, 4, 7, 5, 3, 1, 9, 3, 4]},
    index=[0, 0, 0, 1, 1, 1, 2, 2, 2],
)

nf = nf.add_nested(nested, "nested")

# Convert to Nested-Dask NestedFrame
nf = nd.NestedFrame.from_pandas(nf)
nf
[2]:
Nested-Dask NestedFrame Structure:
a b nested
npartitions=1
0 int64 int64 nested<c: [int64], d: [int64]>
2 ... ... ...
Dask Name: nestedframe, 2 expressions
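
To make the effect of add_nested more concrete, the plain-Python sketch below groups the rows of the flat `nested` table by their index value, which is conceptually what packing into a nested column produces for each base row. It is an illustration of the resulting layout only, not how Nested-Pandas or Nested-Dask implement packing, and the helper name `pack_by_index` is hypothetical.

```python
from collections import defaultdict


def pack_by_index(index, columns):
    """Group flat rows by index value (hypothetical helper, for illustration only)."""
    packed = defaultdict(lambda: {name: [] for name in columns})
    for row, key in enumerate(index):
        for name, values in columns.items():
            packed[key][name].append(values[row])
    return dict(packed)


# Same values as the `nested` frame created in the cell above.
index = [0, 0, 0, 1, 1, 1, 2, 2, 2]
columns = {"c": [0, 2, 4, 1, 4, 3, 1, 4, 1], "d": [5, 4, 7, 5, 3, 1, 9, 3, 4]}

packed = pack_by_index(index, columns)
for key, cell in packed.items():
    # Each base-row index now owns one list per nested column.
    print(key, cell)
```

Each key corresponds to one row of the base frame, and each value plays the role of that row's entry in the nested column.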

Loading from Parquet#

For larger datasets, we support loading data from parquet files.

In the following cell, we generate a set of temporary parquet files with random data and use read_parquet to read each layer’s parquet files into its own NestedFrame. We then use add_nested again to pack these layers into a single NestedFrame, nf.

Note that for each layer of our NestedFrame we expect a directory of parquet files, where each file will be its own Dask partition.
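
Before reading, it can help to check that a root directory follows this one-directory-per-layer, one-file-per-partition layout. The sketch below uses only the standard library and empty placeholder files instead of real parquet data; the helper name `list_partition_files` and the file names are assumptions made for illustration.

```python
import tempfile
from pathlib import Path


def list_partition_files(root, layers):
    """Map each layer name to the sorted parquet file names found in root/<layer>."""
    layout = {}
    for layer in layers:
        layer_dir = Path(root) / layer
        layout[layer] = sorted(p.name for p in layer_dir.glob("*.parquet"))
    return layout


# Build a stand-in layout with empty placeholder files (no real parquet data).
with tempfile.TemporaryDirectory() as root:
    for layer in ("base", "nested1"):
        layer_dir = Path(root) / layer
        layer_dir.mkdir()
        for i in range(3):
            (layer_dir / f"part.{i}.parquet").touch()

    # Collect the listing while the temporary directory still exists.
    layout = list_partition_files(root, ["base", "nested1"])

for layer, files in layout.items():
    print(f"{layer}: {len(files)} partition file(s): {files}")
```

Under the layout described above, the number of files listed for a layer is the number of Dask partitions that layer will have once it is read.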

[3]:
nf = None

# Note that we use the `tempfile` module to create and then clean up a temporary directory.
# You can of course remove this and use your own directory and real files on your system.
with tempfile.TemporaryDirectory() as temp_path:
    # Generates parquet files with random data within our temporary directory.
    generate_parquet_file(
        10,  # The number of rows to generate in the base layer.
        {
            "nested1": 100,  # Generate a nested layer named 'nested1' with 100 rows.
            "nested2": 10,  # Generate a nested layer named 'nested2' with 10 rows.
        },
        temp_path,  # The root temporary directory to store our generated parquet files.
        npartitions=5,  # The number of Dask partitions for each layer.
        file_per_layer=True,  # Generates a unique directory of parquet files for each layer
    )

    # Note that each layer of our NestedFrame will be in its own directory,
    # with a parquet file for each Dask partition.
    parquet_dirs = [
        os.path.join(temp_path, "base"),
        os.path.join(temp_path, "nested1"),
        os.path.join(temp_path, "nested2"),
    ]
    for path in parquet_dirs:
        print(f"Directory {path} has the following parquet files {os.listdir(path)}.")

    # Create a single NestedFrame for our base layer from the directory containing the parquet files
    # for each of its partitions.
    nf = read_parquet(path=os.path.join(temp_path, "base"))

    # Read the nested layers from their respective directories.
    nested1 = read_parquet(os.path.join(temp_path, "nested1"))
    nested1 = nested1.persist()
    nested2 = read_parquet(os.path.join(temp_path, "nested2"))
    nested2 = nested2.persist()

    # Add the nested layers to the NestedFrame.
    nf = nf.add_nested(nested1, "nested1")
    nf = nf.add_nested(nested2, "nested2")

    # Here we have Dask 'persist' the data in memory now so that we don't have to read it from
    # the source parquet files again (as it may try to do due to lazy evaluation).
    # This is particularly useful since it forces Dask to read the data
    # from the temporary parquet files before they are deleted.
    nf = nf.persist()

nf
Directory /tmp/tmpber1nkgn/base has the following parquet files ['part.1.parquet', 'part.3.parquet', 'part.0.parquet', 'part.2.parquet', 'part.4.parquet'].
Directory /tmp/tmpber1nkgn/nested1 has the following parquet files ['part.1.parquet', 'part.3.parquet', 'part.0.parquet', 'part.2.parquet', 'part.4.parquet'].
Directory /tmp/tmpber1nkgn/nested2 has the following parquet files ['part.1.parquet', 'part.3.parquet', 'part.0.parquet', 'part.2.parquet', 'part.4.parquet'].
[3]:
Nested-Dask NestedFrame Structure:
a b nested1 nested2
npartitions=5
float64 float64 nested<t: [double], flux: [double], band: [large_string]> nested<t: [double], flux: [double], band: [large_string]>
... ... ... ...
... ... ... ... ...
... ... ... ...
... ... ... ...
Dask Name: getitem-fused-blockwisemerge, 1 expression
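
The reason for calling persist inside the `with` block can be shown without Dask. In the sketch below, a closure stands in for a lazy task: it only touches the file when it is called. Evaluating it while the temporary directory exists succeeds, while evaluating it after cleanup fails. The helper `make_lazy_reader` and the file name `data.txt` are hypothetical, and this is an analogy for lazy evaluation rather than a description of Dask internals.

```python
import os
import tempfile


def make_lazy_reader(path):
    """Return a function that reads the file only when called (stand-in for a lazy task)."""

    def read():
        with open(path) as f:
            return f.read()

    return read


with tempfile.TemporaryDirectory() as temp_path:
    path = os.path.join(temp_path, "data.txt")
    with open(path, "w") as f:
        f.write("payload")

    lazy = make_lazy_reader(path)  # Nothing has been read yet.
    eager = lazy()  # Read now, while the file still exists.

# The temporary directory and its files have been deleted at this point.
try:
    lazy()
    lazy_failed = False
except FileNotFoundError:
    lazy_failed = True

print("eager result:", eager)
print("lazy read after cleanup failed:", lazy_failed)
```

Materializing the data before leaving the `with` block plays the same role here that persist plays in the cell above.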

Note that we can use Dask’s compute() to fully evaluate our dataframe.

[4]:
nf.compute()
[4]:
   a         b         nested1 (t, flux, band)                        nested2 (t, flux, band)
0  0.581025  1.837462  9.356234  59.746053  r  [1 rows × 3 columns]   15.052713  19.265926  g  [1 rows × 3 columns]
0  0.581025  1.837462  9.356234  59.746053  r  [1 rows × 3 columns]   18.818456  87.363232  r  [1 rows × 3 columns]
0  0.581025  1.837462  9.356234  59.746053  r  [1 rows × 3 columns]   12.601287  94.48806   r  [1 rows × 3 columns]
0  0.581025  1.837462  9.356234  59.746053  r  [1 rows × 3 columns]   9.785385   42.057208  r  [1 rows × 3 columns]
0  0.581025  1.837462  9.356234  59.746053  r  [1 rows × 3 columns]   14.675401  26.918504  r  [1 rows × 3 columns]
...
1525 rows x 4 columns

Saving NestedFrames to Parquet Files#

Additionally, we can save an existing NestedFrame as a collection of parquet files using NestedFrame.to_parquet.

We save each layer to its own directory, and each Dask partition for that layer to its own parquet file within that directory.

The base layer is written to a directory named “base”, and each nested layer is written to a directory named after its column. For example, the nested layer in column nested1 is written to the directory “nested1”.

[5]:
restored_nf = None

# Note that we use the `tempfile` module to create and then clean up a temporary directory.
# You can of course remove this and use your own directory and real files on your system.
with tempfile.TemporaryDirectory() as temp_path:
    nf.to_parquet(
        temp_path,  # The directory to save our output parquet files.
        by_layer=True,  # Each layer will be saved in its own subdirectory.
    )
    # List the files in temp_path to ensure they were saved correctly.
    print("The NestedFrame was saved to the following directories :", os.listdir(temp_path))

    # Read the NestedFrame back in from our saved parquet files.
    restored_nf = read_parquet(os.path.join(temp_path, "base"))

    # Read the nested layers from their respective directories.
    nested1 = read_parquet(os.path.join(temp_path, "nested1"))
    nested2 = read_parquet(os.path.join(temp_path, "nested2"))

    # Add the nested layers to the NestedFrame.
    restored_nf = restored_nf.add_nested(nested1, "nested1")
    restored_nf = restored_nf.add_nested(nested2, "nested2")

    # Here we have Dask 'persist' the data in memory now so that we don't have to read it from
    # the source parquet files again (as it may try to do due to lazy evaluation).
    # This is particularly useful since it forces Dask to read the data
    # from the temporary parquet files before they are deleted.
    restored_nf = restored_nf.persist()

restored_nf  # our dataframe is restored from our saved parquet files
The NestedFrame was saved to the following directories : ['nested2', 'nested1', 'base']
[5]:
Nested-Dask NestedFrame Structure:
a b nested1 nested2
npartitions=5
float64 float64 nested<t: [double], flux: [double], band: [large_string]> nested<t: [double], flux: [double], band: [large_string]>
... ... ... ...
... ... ... ... ...
... ... ... ...
... ... ... ...
Dask Name: getitem-fused-blockwisemerge, 1 expression
[6]:
restored_nf.compute()
[6]:
   a         b         nested1 (t, flux, band)                         nested2 (t, flux, band)
0  0.581025  1.837462  9.356234  59.746053  r  [125 rows × 3 columns]  15.052713  19.265926  g  [125 rows × 3 columns]
0  0.581025  1.837462  9.356234  59.746053  r  [125 rows × 3 columns]  15.052713  19.265926  g  [125 rows × 3 columns]
0  0.581025  1.837462  9.356234  59.746053  r  [125 rows × 3 columns]  15.052713  19.265926  g  [125 rows × 3 columns]
0  0.581025  1.837462  9.356234  59.746053  r  [125 rows × 3 columns]  15.052713  19.265926  g  [125 rows × 3 columns]
0  0.581025  1.837462  9.356234  59.746053  r  [125 rows × 3 columns]  15.052713  19.265926  g  [125 rows × 3 columns]
...
1525 rows x 4 columns