Loading Data into Nested-Dask
[1]:
import os
import tempfile
import nested_dask as nd
import nested_pandas as npd
from nested_dask import read_parquet
from nested_dask.datasets import generate_parquet_file
From Nested-Pandas
Nested-Dask can load data from a Nested-Pandas NestedFrame object with the from_pandas class method.
[2]:
# Create a Nested-Pandas NestedFrame
nf = npd.NestedFrame(data={"a": [1, 2, 3], "b": [2, 4, 6]}, index=[0, 1, 2])
nested = npd.NestedFrame(
    data={"c": [0, 2, 4, 1, 4, 3, 1, 4, 1], "d": [5, 4, 7, 5, 3, 1, 9, 3, 4]},
    index=[0, 0, 0, 1, 1, 1, 2, 2, 2],
)
nf = nf.add_nested(nested, "nested")
# Convert to Nested-Dask NestedFrame
nf = nd.NestedFrame.from_pandas(nf)
nf
[2]:
| | a | b | nested |
|---|---|---|---|
| npartitions=1 | | | |
| 0 | int64 | int64 | nested<c: [int64], d: [int64]> |
| 2 | ... | ... | ... |
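To make the index alignment in the cell above concrete, here is a minimal plain-Python sketch of how rows sharing an index label end up grouped under one base row. It only illustrates the idea. It is not how Nested-Pandas or Nested-Dask store nested columns internally, and `group_by_index` is a hypothetical helper written for this example.

```python
from collections import defaultdict

# Same toy data as the nested layer in the cell above.
index = [0, 0, 0, 1, 1, 1, 2, 2, 2]
c = [0, 2, 4, 1, 4, 3, 1, 4, 1]
d = [5, 4, 7, 5, 3, 1, 9, 3, 4]


def group_by_index(index, **columns):
    """Collect each column's values into one list per index label (hypothetical helper)."""
    grouped = defaultdict(lambda: {name: [] for name in columns})
    for position, label in enumerate(index):
        for name, values in columns.items():
            grouped[label][name].append(values[position])
    return dict(grouped)


packed = group_by_index(index, c=c, d=d)
for label, cell in packed.items():
    # One line per base row: the label, then the values nested under it.
    print(label, cell)
```

Each of the three base rows (index 0, 1, and 2) receives the three nested rows that carry the same index label, which is why the nested frame's index repeats the base frame's index values.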
Loading from Parquet
For larger datasets, Nested-Dask supports loading data from parquet files.
In the following cell, we generate a set of temporary parquet files containing random data.
We then use read_parquet to read each layer’s parquet files into its own NestedFrame, and use add_nested to pack the layers into a single NestedFrame, nf.
Each layer of the NestedFrame is expected to live in its own directory of parquet files, where each file becomes one Dask partition.
[3]:
nf = None

# Note that we use the `tempfile` module to create and then clean up a temporary directory.
# You can of course remove this and use your own directory and real files on your system.
with tempfile.TemporaryDirectory() as temp_path:
    # Generate parquet files with random data within our temporary directory.
    generate_parquet_file(
        10,  # The number of rows to generate in the base layer.
        {
            "nested1": 100,  # Generate a nested layer named 'nested1' with 100 rows.
            "nested2": 10,  # Generate a nested layer named 'nested2' with 10 rows.
        },
        temp_path,  # The root temporary directory to store our generated parquet files.
        npartitions=5,  # The number of Dask partitions for each layer.
        file_per_layer=True,  # Generate a separate directory of parquet files for each layer.
    )

    # Each layer of our NestedFrame will be in its own directory,
    # with a parquet file for each Dask partition.
    parquet_dirs = [
        os.path.join(temp_path, "base"),
        os.path.join(temp_path, "nested1"),
        os.path.join(temp_path, "nested2"),
    ]
    for path in parquet_dirs:
        print(f"Directory {path} has the following parquet files {os.listdir(path)}.")

    # Create a single NestedFrame for our base layer from the directory containing the parquet files
    # for each of its partitions.
    nf = read_parquet(path=os.path.join(temp_path, "base"))

    # Read the nested layers from their respective directories.
    nested1 = read_parquet(os.path.join(temp_path, "nested1"))
    nested1 = nested1.persist()
    nested2 = read_parquet(os.path.join(temp_path, "nested2"))
    nested2 = nested2.persist()

    # Add the nested layers to the NestedFrame.
    nf = nf.add_nested(nested1, "nested1")
    nf = nf.add_nested(nested2, "nested2")

    # Have Dask 'persist' the data in memory now so that it does not have to read it from
    # the source parquet files again (as it may try to do due to lazy evaluation).
    # This is particularly useful here because it forces Dask to read the data
    # from the temporary parquet files before they are deleted.
    nf = nf.persist()

nf
Directory /tmp/tmpber1nkgn/base has the following parquet files ['part.1.parquet', 'part.3.parquet', 'part.0.parquet', 'part.2.parquet', 'part.4.parquet'].
Directory /tmp/tmpber1nkgn/nested1 has the following parquet files ['part.1.parquet', 'part.3.parquet', 'part.0.parquet', 'part.2.parquet', 'part.4.parquet'].
Directory /tmp/tmpber1nkgn/nested2 has the following parquet files ['part.1.parquet', 'part.3.parquet', 'part.0.parquet', 'part.2.parquet', 'part.4.parquet'].
[3]:
| | a | b | nested1 | nested2 |
|---|---|---|---|---|
| npartitions=5 | | | | |
| | float64 | float64 | nested<t: [double], flux: [double], band: [large_string]> | nested<t: [double], flux: [double], band: [large_string]> |
| | ... | ... | ... | ... |
| ... | ... | ... | ... | ... |
| | ... | ... | ... | ... |
| | ... | ... | ... | ... |
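The comments in the cell above explain that persist() is called before the temporary directory is cleaned up. The sketch below reproduces that pitfall with the standard library only, as an analogy: `lazy_reader` is a hypothetical stand-in for a lazily evaluated read and does not use Dask or Nested-Dask. It shows a deferred read failing once the directory is gone, and the same read succeeding when it is forced inside the `with` block.

```python
import os
import tempfile


def lazy_reader(path):
    """Return a function that reads `path` only when it is called (hypothetical helper)."""

    def read():
        with open(path) as handle:
            return handle.read()

    return read


# Case 1: the read is deferred until after the temporary directory is deleted.
with tempfile.TemporaryDirectory() as temp_path:
    file_path = os.path.join(temp_path, "data.txt")
    with open(file_path, "w") as handle:
        handle.write("payload")
    deferred = lazy_reader(file_path)  # nothing has been read yet

try:
    deferred()
    late_read_failed = False
except FileNotFoundError:
    # The file vanished together with the temporary directory.
    late_read_failed = True

# Case 2: the read is forced inside the block, which is the role persist() plays above.
with tempfile.TemporaryDirectory() as temp_path:
    file_path = os.path.join(temp_path, "data.txt")
    with open(file_path, "w") as handle:
        handle.write("payload")
    materialized = lazy_reader(file_path)()

print(late_read_failed, materialized)
```

If you read from your own long-lived directory instead of a temporary one, this concern goes away and persisting becomes a performance choice rather than a correctness one.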
Note that we can use Dask’s compute() to fully evaluate our dataframe.
[4]:
nf.compute()
[4]:
| | a | b | nested1 | nested2 |
|---|---|---|---|---|
| 0 | 0.581025 | 1.837462 | 1 rows × 3 columns | 1 rows × 3 columns |
| 0 | 0.581025 | 1.837462 | 1 rows × 3 columns | 1 rows × 3 columns |
| 0 | 0.581025 | 1.837462 | 1 rows × 3 columns | 1 rows × 3 columns |
| 0 | 0.581025 | 1.837462 | 1 rows × 3 columns | 1 rows × 3 columns |
| 0 | 0.581025 | 1.837462 | 1 rows × 3 columns | 1 rows × 3 columns |
| ... | ... | ... | ... | ... |
Saving NestedFrames to Parquet Files
We can also save an existing NestedFrame as a collection of parquet files using NestedFrame.to_parquet.
With by_layer=True, each layer is saved to its own directory, and each Dask partition of that layer is saved to its own parquet file within that directory.
The base layer is written to a directory named “base”, and each nested layer is written to a directory named after its column.
For example, the nested layer in column nested1 is written to the directory “nested1”.
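The layout described above can be sketched with the standard library alone. The two helpers below, `layer_directories` and `partition_files`, are hypothetical and not part of Nested-Dask. They assume the `part.N.parquet` file naming seen in the directory listings earlier in this notebook, and they run against empty placeholder files rather than real parquet data.

```python
import os
import re
import tempfile

# Assumed naming convention, taken from the listings printed earlier in this notebook.
PART_PATTERN = re.compile(r"^part\.(\d+)\.parquet$")


def layer_directories(root, nested_columns):
    """Map each layer name to the directory it is expected to live in."""
    layers = {"base": os.path.join(root, "base")}
    for column in nested_columns:
        layers[column] = os.path.join(root, column)
    return layers


def partition_files(layer_dir):
    """Return a layer's partition files ordered by partition number."""
    numbered = []
    for name in os.listdir(layer_dir):
        match = PART_PATTERN.match(name)
        if match:
            numbered.append((int(match.group(1)), name))
    return [name for _, name in sorted(numbered)]


# Build the expected layout with empty placeholder files (no real parquet content).
with tempfile.TemporaryDirectory() as root:
    layers = layer_directories(root, ["nested1", "nested2"])
    for directory in layers.values():
        os.makedirs(directory)
        for part in (1, 3, 0, 2, 4, 10):
            open(os.path.join(directory, f"part.{part}.parquet"), "w").close()
    listing = {name: partition_files(path) for name, path in layers.items()}

print(listing["nested1"])
```

Sorting on the parsed partition number rather than the file name keeps `part.10.parquet` after `part.4.parquet`, which a plain string sort would not do. `os.listdir` itself makes no ordering guarantee.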
[5]:
restored_nf = None

# Note that we use the `tempfile` module to create and then clean up a temporary directory.
# You can of course remove this and use your own directory and real files on your system.
with tempfile.TemporaryDirectory() as temp_path:
    nf.to_parquet(
        temp_path,  # The directory to save our output parquet files.
        by_layer=True,  # Each layer will be saved in its own subdirectory.
    )

    # List the files in temp_path to ensure they were saved correctly.
    print("The NestedFrame was saved to the following directories :", os.listdir(temp_path))

    # Read the NestedFrame back in from our saved parquet files.
    restored_nf = read_parquet(os.path.join(temp_path, "base"))

    # Read the nested layers from their respective directories.
    nested1 = read_parquet(os.path.join(temp_path, "nested1"))
    nested2 = read_parquet(os.path.join(temp_path, "nested2"))

    # Add the nested layers to the NestedFrame.
    restored_nf = restored_nf.add_nested(nested1, "nested1")
    restored_nf = restored_nf.add_nested(nested2, "nested2")

    # Have Dask 'persist' the data in memory now so that it does not have to read it from
    # the source parquet files again (as it may try to do due to lazy evaluation).
    # This is particularly useful here because it forces Dask to read the data
    # from the temporary parquet files before they are deleted.
    restored_nf = restored_nf.persist()

restored_nf  # our dataframe is restored from our saved parquet files
The NestedFrame was saved to the following directories : ['nested2', 'nested1', 'base']
[5]:
| | a | b | nested1 | nested2 |
|---|---|---|---|---|
| npartitions=5 | | | | |
| | float64 | float64 | nested<t: [double], flux: [double], band: [large_string]> | nested<t: [double], flux: [double], band: [large_string]> |
| | ... | ... | ... | ... |
| ... | ... | ... | ... | ... |
| | ... | ... | ... | ... |
| | ... | ... | ... | ... |
[6]:
restored_nf.compute()
[6]:
| | a | b | nested1 | nested2 |
|---|---|---|---|---|
| 0 | 0.581025 | 1.837462 | 125 rows × 3 columns | 125 rows × 3 columns |
| 0 | 0.581025 | 1.837462 | 125 rows × 3 columns | 125 rows × 3 columns |
| 0 | 0.581025 | 1.837462 | 125 rows × 3 columns | 125 rows × 3 columns |
| 0 | 0.581025 | 1.837462 | 125 rows × 3 columns | 125 rows × 3 columns |
| 0 | 0.581025 | 1.837462 | 125 rows × 3 columns | 125 rows × 3 columns |
| ... | ... | ... | ... | ... |