Source code for nested_dask.core

from __future__ import annotations

import os
from collections.abc import Callable, Mapping
from typing import Any, Literal

import dask.dataframe as dd
import dask.dataframe.dask_expr as dx
import nested_pandas as npd
import numpy as np
import pandas as pd
import pyarrow as pa
from dask.dataframe.dask_expr._collection import new_collection
from dask.dataframe.dask_expr._expr import no_default as dsk_no_default
from nested_pandas.series.dtype import NestedDtype
from nested_pandas.series.packer import pack, pack_flat, pack_lists
from pandas._libs import lib
from pandas._typing import Axis, IndexLabel
from pandas.api.extensions import no_default

# need this for the base _Frame class
# mypy: disable-error-code="misc"


class _Frame(dx.FrameBase):  # type: ignore
    """Base class for extensions of Dask Dataframes."""

    _partition_type = npd.NestedFrame

    def __init__(self, expr):
        super().__init__(expr)

    @property
    def _args(self):
        # Ensure our Dask extension can correctly be used by pickle.
        # See https://github.com/geopandas/dask-geopandas/issues/237
        return super()._args

    def optimize(self, fuse: bool = True):
        result = new_collection(self.expr.optimize(fuse=fuse))
        return result

    def __dask_postpersist__(self):
        func, args = super().__dask_postpersist__()

        return self._rebuild, (func, args)

    def _rebuild(self, graph, func, args):  # type: ignore
        collection = func(graph, *args)
        return collection


def _nested_meta_from_flat(flat, name):
    """construct meta for a packed series from a flat dataframe"""
    pd_fields = flat.dtypes.to_dict()  # grabbing pandas dtypes
    pyarrow_fields = {}  # grab underlying pyarrow dtypes
    for field, dtype in pd_fields.items():
        if hasattr(dtype, "pyarrow_dtype"):
            pyarrow_fields[field] = dtype.pyarrow_dtype
        else:  # or convert from numpy types
            pyarrow_fields[field] = pa.from_numpy_dtype(dtype)
    return pd.Series(name=name, dtype=NestedDtype.from_fields(pyarrow_fields))


[docs] class NestedFrame( _Frame, dd.DataFrame ): # can use dd.DataFrame instead of dx.DataFrame if the config is set true (default in >=2024.3.0) """An extension for a Dask Dataframe that has Nested-Pandas functionality. Examples -------- >>> import nested_dask as nd >>> base = nd.NestedFrame(base_data) >>> layer = nd.NestedFrame(layer_data) >>> base.add_nested(layer, "layer") """ _partition_type = npd.NestedFrame # Tracks the underlying data type
[docs] def __getitem__(self, item): """Adds custom __getitem__ functionality for nested columns""" if isinstance(item, str) and self._is_known_hierarchical_column(item): nested, col = item.split(".") meta = pd.Series(name=col, dtype=pd.ArrowDtype(self.dtypes[nested].fields[col])) return self.map_partitions(lambda x: x[nested].nest.get_flat_series(col), meta=meta) else: return super().__getitem__(item)
[docs] def __setitem__(self, key, value): """Adds custom __setitem__ behavior for nested columns""" # Replacing or adding columns to a nested structure # Allows statements like ndf["nested.t"] = ndf["nested.t"] - 5 # Or ndf["nested.base_t"] = ndf["nested.t"] - 5 # Performance note: This requires building a new nested structure if self._is_known_hierarchical_column(key) or ( "." in key and key.split(".")[0] in self.nested_columns ): nested, col = key.split(".") # View the nested column as a flat df new_flat = self[nested].nest.to_flat() new_flat[col] = value # Handle strings specially if isinstance(value, str): new_flat = new_flat.astype({col: pd.ArrowDtype(pa.string())}) # pack the modified df back into a nested column meta = _nested_meta_from_flat(new_flat, nested) packed = new_flat.map_partitions(lambda x: pack(x, dtype=meta.dtype), meta=meta) return super().__setitem__(nested, packed) # Adding a new nested structure from a column # Allows statements like ndf["new_nested.t"] = ndf["nested.t"] - 5 elif "." in key: new_nested, col = key.split(".") if isinstance(value, dd.Series): value.name = col value = value.to_frame() meta = _nested_meta_from_flat(value, new_nested) packed = value.map_partitions(lambda x: pack(x, dtype=meta.dtype), meta=meta) return super().__setitem__(new_nested, packed) return super().__setitem__(key, value)
def _repr_html_(self): # following dask-geopandas lead output = super()._repr_html_() return output.replace("Dask DataFrame Structure", "Nested-Dask NestedFrame Structure") @classmethod
[docs] def from_pandas( cls, data, npartitions=None, chunksize=None, sort=True, ) -> NestedFrame: """Returns an Nested-Dask NestedFrame constructed from a Nested-Pandas NestedFrame or Pandas DataFrame. Parameters ---------- data: `NestedFrame` or `DataFrame` Nested-Pandas NestedFrame containing the underlying data npartitions: `int`, optional The number of partitions of the index to create. Note that depending on the size and index of the dataframe, the output may have fewer partitions than requested. chunksize: `int`, optional The desired number of rows per index partition to use. Note that depending on the size and index of the dataframe, actual partition sizes may vary. sort: `bool`, optional Whether to sort the frame by a default index. Returns ---------- result: `NestedFrame` The constructed Dask-Nested NestedFrame object. """ result = dd.from_pandas(data, npartitions=npartitions, chunksize=chunksize, sort=sort) return NestedFrame.from_dask_dataframe(result)
@classmethod
[docs] def from_dask_dataframe(cls, df: dd.DataFrame) -> NestedFrame: """Converts a Dask Dataframe to a Dask-Nested NestedFrame Parameters ---------- df: A Dask Dataframe to convert Returns ------- `nested_dask.NestedFrame` """ return df.map_partitions(npd.NestedFrame, meta=npd.NestedFrame(df._meta.copy()))
@classmethod
[docs] def from_delayed(cls, dfs, meta=None, divisions=None, prefix="from-delayed", verify_meta=True): """ Create Nested-Dask NestedFrames from many Dask Delayed objects. Docstring is copied from `dask.dataframe.from_delayed`. Parameters ---------- dfs : A ``dask.delayed.Delayed``, a ``distributed.Future``, or an iterable of either of these objects, e.g. returned by ``client.submit``. These comprise the individual partitions of the resulting dataframe. If a single object is provided (not an iterable), then the resulting dataframe will have only one partition. meta: An empty NestedFrame, pd.DataFrame, or pd.Series that matches the dtypes and column names of the output. This metadata is necessary for many algorithms in dask dataframe to work. For ease of use, some alternative inputs are also available. Instead of a DataFrame, a dict of {name: dtype} or iterable of (name, dtype) can be provided (note that the order of the names should match the order of the columns). Instead of a series, a tuple of (name, dtype) can be used. If not provided, dask will try to infer the metadata. This may lead to unexpected results, so providing meta is recommended. For more information, see dask.dataframe.utils.make_meta. divisions : Partition boundaries along the index. For tuple, see https://docs.dask.org/en/latest/dataframe-design.html#partitions For string 'sorted' will compute the delayed values to find index values. Assumes that the indexes are mutually sorted. If None, then won't use index information prefix : Prefix to prepend to the keys. verify_meta : If True check that the partitions have consistent metadata, defaults to True. """ nf = dd.from_delayed(dfs=dfs, meta=meta, divisions=divisions, prefix=prefix, verify_meta=verify_meta) return NestedFrame.from_dask_dataframe(nf)
@classmethod
[docs] def from_map( cls, func, *iterables, args=None, meta=None, divisions=None, label=None, enforce_metadata=True, **kwargs, ): """ Create a DataFrame collection from a custom function map WARNING: The ``from_map`` API is experimental, and stability is not yet guaranteed. Use at your own risk! Parameters ---------- func : callable Function used to create each partition. If ``func`` satisfies the ``DataFrameIOFunction`` protocol, column projection will be enabled. *iterables : Iterable objects Iterable objects to map to each output partition. All iterables must be the same length. This length determines the number of partitions in the output collection (only one element of each iterable will be passed to ``func`` for each partition). args : list or tuple, optional Positional arguments to broadcast to each output partition. Note that these arguments will always be passed to ``func`` after the ``iterables`` positional arguments. meta: An empty NestedFrame, pd.DataFrame, or pd.Series that matches the dtypes and column names of the output. This metadata is necessary for many algorithms in dask dataframe to work. For ease of use, some alternative inputs are also available. Instead of a DataFrame, a dict of {name: dtype} or iterable of (name, dtype) can be provided (note that the order of the names should match the order of the columns). Instead of a series, a tuple of (name, dtype) can be used. If not provided, dask will try to infer the metadata. This may lead to unexpected results, so providing meta is recommended. For more information, see dask.dataframe.utils.make_meta. divisions : tuple, str, optional Partition boundaries along the index. For tuple, see https://docs.dask.org/en/latest/dataframe-design.html#partitions For string 'sorted' will compute the delayed values to find index values. Assumes that the indexes are mutually sorted. If None, then won't use index information label : str, optional String to use as the function-name label in the output collection-key names. enforce_metadata : bool, default True Whether to enforce at runtime that the structure of the DataFrame produced by ``func`` actually matches the structure of ``meta``. This will rename and reorder columns for each partition, and will raise an error if this doesn't work, but it won't raise if dtypes don't match. **kwargs: Key-word arguments to broadcast to each output partition. These same arguments will be passed to ``func`` for every output partition. """ nf = dd.from_map( func, *iterables, args=args, meta=meta, divisions=divisions, label=label, enforce_metadata=enforce_metadata, **kwargs, ) return NestedFrame.from_dask_dataframe(nf)
@classmethod
[docs] def from_flat(cls, df, base_columns, nested_columns=None, on=None, name="nested"): """Creates a NestedFrame with base and nested columns from a flat dataframe. Parameters ---------- df: dd.DataFrame or nd.NestedFrame A flat dataframe. base_columns: list-like The columns that should be used as base (flat) columns in the output dataframe. nested_columns: list-like, or None The columns that should be packed into a nested column. All columns in the list will attempt to be packed into a single nested column with the name provided in `nested_name`. If None, is defined as all columns not in `base_columns`. on: str or None The name of a column to use as the new index. Typically, the index should have a unique value per row for base columns, and should repeat for nested columns. For example, a dataframe with two columns; a=[1,1,1,2,2,2] and b=[5,10,15,20,25,30] would want an index like [0,0,0,1,1,1] if a is chosen as a base column. If not provided the current index will be used. name: The name of the output column the `nested_columns` are packed into. Returns ------- NestedFrame A NestedFrame with the specified nesting structure. """ # Handle meta meta = npd.NestedFrame(df[base_columns]._meta) if nested_columns is None: nested_columns = [col for col in df.columns if (col not in base_columns) and col != on] if len(nested_columns) > 0: nested_meta = pack(df[nested_columns]._meta, name) meta = meta.join(nested_meta) return df.map_partitions( lambda x: npd.NestedFrame.from_flat( df=x, base_columns=base_columns, nested_columns=nested_columns, on=on, name=name ), meta=meta, )
@classmethod
[docs] def from_lists(cls, df, base_columns=None, list_columns=None, name="nested"): """Creates a NestedFrame with base and nested columns from a flat dataframe. Parameters ---------- df: dd.DataFrame or nd.NestedFrame A dataframe with list columns. base_columns: list-like, or None Any columns that have non-list values in the input df. These will simply be kept as identical columns in the result list_columns: list-like, or None The list-value columns that should be packed into a nested column. All columns in the list will attempt to be packed into a single nested column with the name provided in `nested_name`. All columns in list_columns must have pyarrow list dtypes, otherwise the operation will fail. If None, is defined as all columns not in `base_columns`. name: The name of the output column the `nested_columns` are packed into. Returns ------- NestedFrame A NestedFrame with the specified nesting structure. Note ---- As noted above, all columns in `list_columns` must have a pyarrow ListType dtype. This is needed for proper meta propagation. To convert a list column to this dtype, you can use this command structure: `nf= nf.astype({"colname": pd.ArrowDtype(pa.list_(pa.int64()))})` Where pa.int64 above should be replaced with the correct dtype of the underlying data accordingly. Additionally, it's a known issue in Dask (https://github.com/dask/dask/issues/10139) that columns with list values will by default be converted to the string type. This will interfere with the ability to recast these to pyarrow lists. We recommend setting the following dask config setting to prevent this: `dask.config.set({"dataframe.convert-string":False})` """ # Resolve inputs for meta if base_columns is None: if list_columns is None: # with no inputs, assume all columns are list-valued list_columns = df.columns else: # if list_columns are defined, assume everything else is base base_columns = [col for col in df.columns if col not in list_columns] else: if list_columns is None: # with defined base_columns, assume everything else is list list_columns = [col for col in df.columns if col not in base_columns] # from_lists should have at least one list column defined if len(list_columns) == 0: raise ValueError("No columns were assigned as list columns.") else: # reject any list columns that are not pyarrow dtyped for col in list_columns: if not hasattr(df[col].dtype, "pyarrow_dtype"): raise TypeError( f"""List column '{col}' dtype ({df[col].dtype}) is not a pyarrow list dtype. Refer to the docstring for guidance on dtype requirements and assignment.""" ) elif not pa.types.is_list(df[col].dtype.pyarrow_dtype): raise TypeError( f"""List column '{col}' dtype ({df[col].dtype}) is not a pyarrow list dtype. Refer to the docstring for guidance on dtype requirements and assignment.""" ) meta = npd.NestedFrame(df[base_columns]._meta) nested_meta = pack_lists(df[list_columns]._meta, name) meta = meta.join(nested_meta) return df.map_partitions( lambda x: npd.NestedFrame.from_lists( df=x, base_columns=base_columns, list_columns=list_columns, name=name ), meta=meta, )
[docs] def compute(self, **kwargs): """Compute this Dask collection, returning the underlying dataframe or series.""" return npd.NestedFrame(super().compute(**kwargs))
@property
[docs] def all_columns(self) -> dict: """returns a dictionary of columns for each base/nested dataframe""" all_columns = {"base": self.columns} for column in self.columns: if isinstance(self[column].dtype, NestedDtype): nest_cols = list(self.dtypes[column].fields.keys()) all_columns[column] = nest_cols return all_columns
@property
[docs] def nested_columns(self) -> list: """retrieves the base column names for all nested dataframes""" nest_cols = [] for column in self.columns: if isinstance(self[column].dtype, NestedDtype): nest_cols.append(column) return nest_cols
def _is_known_hierarchical_column(self, colname) -> bool: """Determine whether a string is a known hierarchical column name""" if "." in colname: left, right = colname.split(".") if left in self.nested_columns: return right in self.all_columns[left] return False return False
[docs] def add_nested(self, nested, name, how="outer") -> NestedFrame: # type: ignore[name-defined] # noqa: F821 """Packs a dataframe into a nested column Parameters ---------- nested: A flat dataframe to pack into a nested column name: The name given to the nested column how: {‘left’, ‘right’, ‘outer’, ‘inner’, ‘cross’}, default ‘outer’ How to handle the operation of the two objects. * left: use calling frame’s index (or column if on is specified) * right: use other’s index. * outer: form union of calling frame’s index (or column if on is specified) with other’s index, and sort it lexicographically. * inner: form intersection of calling frame’s index (or column if on is specified) with other’s index, preserving the order of the calling’s one. * cross: creates the cartesian product from both frames, preserves the order of the left keys. Returns ------- `nested_dask.NestedFrame` """ nested = nested.map_partitions(lambda x: pack_flat(npd.NestedFrame(x))).rename(name) return self.join(nested, how=how)
[docs] def query(self, expr) -> Self: # type: ignore # noqa: F821: """ Query the columns of a NestedFrame with a boolean expression. Specified queries can target nested columns in addition to the typical column set Docstring copied from nested-pandas query Parameters ---------- expr : str The query string to evaluate. Access nested columns using `nested_df.nested_col` (where `nested_df` refers to a particular nested dataframe and `nested_col` is a column of that nested dataframe). You can refer to variables in the environment by prefixing them with an '@' character like ``@a + b``. You can refer to column names that are not valid Python variable names by surrounding them in backticks. Thus, column names containing spaces or punctuations (besides underscores) or starting with digits must be surrounded by backticks. (For example, a column named "Area (cm^2)" would be referenced as ```Area (cm^2)```). Column names which are Python keywords (like "list", "for", "import", etc) cannot be used. For example, if one of your columns is called ``a a`` and you want to sum it with ``b``, your query should be ```a a` + b``. Returns ------- DataFrame DataFrame resulting from the provided query expression. Notes ----- Queries that target a particular nested structure return a dataframe with rows of that particular nested structure filtered. For example, querying the NestedFrame "df" with nested structure "my_nested" as below will return all rows of df, but with mynested filtered by the condition: >>> df.query("mynested.a > 2") """ return self.map_partitions(lambda x: npd.NestedFrame(x).query(expr), meta=self._meta)
[docs] def dropna( self, *, axis: Axis = 0, how: str | lib.NoDefault = no_default, thresh: int | lib.NoDefault = no_default, on_nested: bool = False, subset: IndexLabel | None = None, inplace: bool = False, ignore_index: bool = False, ) -> Self: # type: ignore[name-defined] # noqa: F821: """ Remove missing values for one layer of the NestedFrame. Parameters ---------- axis : {0 or 'index', 1 or 'columns'}, default 0 Determine if rows or columns which contain missing values are removed. * 0, or 'index' : Drop rows which contain missing values. * 1, or 'columns' : Drop columns which contain missing value. Only a single axis is allowed. how : {'any', 'all'}, default 'any' Determine if row or column is removed from DataFrame, when we have at least one NA or all NA. * 'any' : If any NA values are present, drop that row or column. * 'all' : If all values are NA, drop that row or column. thresh : int, optional Require that many non-NA values. Cannot be combined with how. on_nested : str or bool, optional If not False, applies the call to the nested dataframe in the column with label equal to the provided string. If specified, the nested dataframe should align with any columns given in `subset`. subset : column label or sequence of labels, optional Labels along other axis to consider, e.g. if you are dropping rows these would be a list of columns to include. Access nested columns using `nested_df.nested_col` (where `nested_df` refers to a particular nested dataframe and `nested_col` is a column of that nested dataframe). inplace : bool, default False Whether to modify the DataFrame rather than creating a new one. ignore_index : bool, default ``False`` If ``True``, the resulting axis will be labeled 0, 1, …, n - 1. .. versionadded:: 2.0.0 Returns ------- DataFrame or None DataFrame with NA entries dropped from it or None if ``inplace=True``. Notes ----- Operations that target a particular nested structure return a dataframe with rows of that particular nested structure affected. Values for `on_nested` and `subset` should be consistent in pointing to a single layer, multi-layer operations are not supported at this time. """ # propagate meta, assumes row-based operation return self.map_partitions( lambda x: npd.NestedFrame(x).dropna( axis=axis, how=how, thresh=thresh, on_nested=on_nested, subset=subset, inplace=inplace, ignore_index=ignore_index, ), meta=self._meta, )
[docs] def sort_values( self, by: str | list[str], npartitions: int | None = None, ascending: bool | list[bool] = True, na_position: Literal["first"] | Literal["last"] = "last", partition_size: float = 128e6, sort_function: Callable[[pd.DataFrame], pd.DataFrame] | None = None, sort_function_kwargs: Mapping[str, Any] | None = None, upsample: float = 1.0, ignore_index: bool | None = False, shuffle_method: str | None = None, **options, ) -> Self: # type: ignore[name-defined] # noqa: F821: """ Sort the dataset by a single column. Sorting a parallel dataset requires expensive shuffles and is generally not recommended. See ‘set_index‘ for implementation details. Parameters: ----------- by: str or list[str] Column(s) to sort by. npartitions: int, None, or ‘auto’ The ideal number of output partitions. If None, use the same as the input. If ‘auto’ then decide by memory use. Not used when sorting nested layers. ascending: bool or list[bool], optional Sort ascending vs. descending. Defaults to True. Specify list for multiple sort orders. If this is a list of bools, must match the length of the by. na_position: {‘last’, ‘first’}, optional Puts NaNs at the beginning if ‘first’, puts NaN at the end if ‘last’. Defaults to ‘last’. partition_size: float, optional The desired size of each partition in bytes. Defaults to 128e6 (128 MB). Not used in nested sorting. sort_function: function, optional Sorting function to use when sorting underlying partitions. If None, defaults to M.sort_values (the partition library’s implementation of sort_values). Not used when sorting nested layers. sort_function_kwargs: dict, optional Additional keyword arguments to pass to the partition sorting function. By default, by, ascending, and na_position are provided. upsample: float, optional Used to increase the number of samples for quantiles. Not used in nested sorting ignore_index: bool, optional If True, the resulting axis will be labeled 0, 1, …, n - 1. Defaults to False. shuffle_method: str, optional The method to use for shuffling data. Defaults to None. Not used in nested sorting **options: keyword arguments, optional Additional options to pass to the sorting function. Returns: -------- DataFrame DataFrame with sorted values. """ # Resolve target layer targets = [] if isinstance(by, str): by = [by] # Check "by" columns for hierarchical references for col in by: if self._is_known_hierarchical_column(col): targets.append(col.split(".")[0]) else: targets.append("base") # Ensure one target layer, preventing multi-layer operations unq_targets = np.unique(targets).tolist() if len(unq_targets) > 1: raise ValueError("Queries cannot target multiple structs/layers, write a separate query for each") target_layer = unq_targets[0] # Just use dask's sort_values if the target is the base layer # Drops divisions, but this is expected behavior of a sorting operation if target_layer == "base": return super().sort_values( by=by, npartitions=npartitions, ascending=ascending, na_position=na_position, partition_size=partition_size, sort_function=sort_function, sort_function_kwargs=sort_function_kwargs, upsample=upsample, ignore_index=ignore_index, shuffle_method=shuffle_method, **options, ) # If nested target layer, go through nested-pandas API # apply via map_partitions, meta is propagated # does preserve divisions return self.map_partitions( lambda x: npd.NestedFrame(x).sort_values( by=by, ascending=ascending, na_position=na_position, ignore_index=ignore_index, **options, ), meta=self._meta, )
[docs] def reduce(self, func, *args, meta=dsk_no_default, infer_nesting=True, **kwargs) -> NestedFrame: """ Takes a function and applies it to each top-level row of the NestedFrame. docstring copied from nested-pandas The user may specify which columns the function is applied to, with columns from the 'base' layer being passsed to the function as scalars and columns from the nested layers being passed as numpy arrays. Parameters ---------- func : callable Function to apply to each nested dataframe. The first arguments to `func` should be which columns to apply the function to. See the Notes for recommendations on writing func outputs. args : positional arguments Positional arguments to pass to the function, the first *args should be the names of the columns to apply the function to. meta : dataframe or series-like, optional The dask meta of the output. If not provided, dask will try to infer the metadata. This may lead to unexpected results, so providing meta is recommended. infer_nesting : bool, default True If True, the function will pack output columns into nested structures based on column names adhering to a nested naming scheme. E.g. "nested.b" and "nested.c" will be packed into a column called "nested" with columns "b" and "c". If False, all outputs will be returned as base columns. kwargs : keyword arguments, optional Keyword arguments to pass to the function. Returns ------- `NestedFrame` `NestedFrame` with the results of the function applied to the columns of the frame. Notes ----- By default, `reduce` will produce a `NestedFrame` with enumerated column names for each returned value of the function. For more useful naming, it's recommended to have `func` return a dictionary where each key is an output column of the dataframe returned by `reduce`. Example User Function: >>> def my_sum(col1, col2): >>> '''reduce will return a NestedFrame with two columns''' >>> return {"sum_col1": sum(col1), "sum_col2": sum(col2)} When using nesting inference (infer_nesting=True), the output may contain nested columns. In such cases, the meta should be provided with the appropriate dtype for these columns. For example, the following function, which produces a nested column "lc": >>> def complex_output(flux): >>> return {"max_flux": np.max(flux), >>> "lc.flux_quantiles": np.quantile(flux, [0.1, 0.2, 0.3, 0.4, 0.5]), >>> "lc.labels": [0.1, 0.2, 0.3, 0.4, 0.5]} Would require the following meta: >>> # create a NestedDtype for the nested column "lc" >>> from nested_pandas.series.dtype import NestedDtype >>> lc_dtype = NestedDtype(pa.struct([pa.field("flux_quantiles", pa.list_(pa.float64())), >>> pa.field("labels", pa.list_(pa.float64()))])) >>> # use the lc_dtype in meta creation >>> result_meta = npd.NestedFrame({'max_flux':pd.Series([], dtype='float'), >>> 'lc':pd.Series([], dtype=lc_dtype)}) """ # Handle meta shorthands to produce nestedframe output # route standard dict meta to nestedframe if isinstance(meta, dict): series_dict = {item[0]: pd.Series(dtype=item[1]) for item in meta.items()} meta = npd.NestedFrame(series_dict) # reroute series meta to nestedframe, per consistency with nested-pandas elif isinstance(meta, tuple) and len(meta) == 2: # len 2 to only try on proper series meta meta = npd.NestedFrame(pd.Series(name=meta[0], dtype=meta[1]).to_frame()) # apply nested_pandas reduce via map_partitions # wrap the partition in a npd.NestedFrame call for: # https://github.com/lincc-frameworks/nested-dask/issues/21 return self.map_partitions( lambda x: npd.NestedFrame(x).reduce(func, *args, infer_nesting=infer_nesting, **kwargs), meta=meta )
[docs] def to_parquet(self, path, by_layer=True, **kwargs) -> None: """Creates parquet file(s) with the data of a NestedFrame, either as a single parquet file directory where each nested dataset is packed into its own column or as an individual parquet file directory for each layer. Docstring copied from nested-pandas. Note that here we always opt to use the pyarrow engine for writing parquet files. Parameters ---------- path : str The path to the parquet directory to be written. by_layer : bool, default True NOTE: by_layer=False will not reliably preserve divisions currently, be warned when using it that loading from such a dataset will likely require you to reset and set the index to generate divisions information. If False, writes the entire NestedFrame to a single parquet directory. If True, writes each layer to a separate parquet sub-directory within the directory specified by path. The filename for each outputted file will be named after its layer. For example for the base layer this is always "base". kwargs : keyword arguments, optional Keyword arguments to pass to the function. Returns ------- None """ # code copied from nested-pandas rather than wrapped # reason being that a map_partitions call is probably not well-behaved here? if not by_layer: # Todo: Investigate this more # Divisions cannot be generated from a parquet file that stores # nested information without a reset_index().set_index() loop. It # seems like this happens at the to_parquet level rather than # in read_parquet as dropping the nested columns from the dataframe # to save does enable divisions to be found, but removing the # nested columns from the set of columns to load does not. # Divisions are going to be crucial, and so I think it's best to # not support this until this is resolved. However the non-by_layer # mode is needed for by_layer so it may be best to just settle for # changing the default and filing a higher-priority bug. # raise NotImplementedError # We just defer to the pandas to_parquet method if we're not writing by layer # or there is only one layer in the NestedFrame. super().to_parquet(path, engine="pyarrow", **kwargs) else: # Write the base layer to a parquet file base_frame = self.drop(columns=self.nested_columns) base_frame.to_parquet(os.path.join(path, "base"), by_layer=False, **kwargs) # Write each nested layer to a parquet file for layer in self.all_columns: if layer != "base": path_layer = os.path.join(path, f"{layer}") self[layer].nest.to_flat().to_parquet(path_layer, engine="pyarrow", **kwargs) return None