Collection#

class dataframely.Collection[source]

Base class for all collections of data frames with a predefined schema.

A collection comprises a set of members which are collectively “consistent”, meaning the collection ensures that invariants hold across members. This is different from Schema, which only ensures invariants within individual members.

In order to properly ensure that invariants hold across members, members must have a “common primary key”, i.e., there must be an overlap of at least one primary key column across all members. Consequently, a collection is typically used to represent “semantic objects” which cannot be represented in a single data frame due to 1-N relationships that are managed in separate data frames.

A collection must only have type annotations for LazyFrame with a known schema:

class MyCollection(dy.Collection):
    first_member: dy.LazyFrame[MyFirstSchema]
    second_member: dy.LazyFrame[MySecondSchema]

In addition, it may define filters (cf. filter()) and arbitrary methods.
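For illustration, a collection with a custom filter might look as follows. This is a sketch, reusing the schemas from the example above and assuming a shared primary key column "id"; the exact filter semantics are described in the filter() documentation:

import dataframely as dy
import polars as pl


class MyCollection(dy.Collection):
    first_member: dy.LazyFrame[MyFirstSchema]
    second_member: dy.LazyFrame[MySecondSchema]

    @dy.filter()
    def first_member_has_match(self) -> pl.LazyFrame:
        # Rows of ``first_member`` without a counterpart in ``second_member``
        # (joined on the assumed common primary key "id") fail this filter.
        return self.first_member.join(self.second_member, on="id", how="semi")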

Attention

Do NOT use this class in combination with from __future__ import annotations: it requires the actual schema definitions at class-definition time to ensure that the collection is implemented correctly.

Methods:

cast

Initialize a collection by casting all members into their correct schemas.

collect_all

Collect all members of the collection.

common_primary_key

The primary keys shared by non-ignored members of the collection.

create_empty

Create an empty collection without any data.

filter

Filter the member data frames by their schemas and the collection's filters.

ignored_members

The names of all members of the collection that are ignored in filters.

is_valid

Utility method to check whether validate() raises an exception.

join

Filter the collection by joining onto a data frame whose common primary key entries determine the rows to keep in or remove from the collection members.

matches

Check whether this collection semantically matches another.

member_schemas

The schemas of all members of the collection.

members

Information about the members of the collection.

non_ignored_members

The names of all members of the collection that are not ignored in filters (default).

optional_members

The names of all optional members of the collection.

read_delta

Read all collection members from Delta Lake tables.

read_parquet

Read all collection members from parquet files in a directory.

required_members

The names of all required members of the collection.

sample

Create a random sample from the members of this collection.

scan_delta

Lazily read all collection members from Delta Lake tables.

scan_parquet

Lazily read all collection members from parquet files in a directory.

serialize

Serialize the metadata for this collection to a JSON string.

sink_parquet

Stream the members of this collection into parquet files in a directory.

to_dict

Return a dictionary representation of this collection.

validate

Validate that a set of data frames satisfies the collection's invariants.

write_delta

Write the members of this collection to Delta Lake tables.

write_parquet

Write the members of this collection to parquet files in a directory.

classmethod cast(
data: Mapping[str, FrameType],
/,
) Self[source]

Initialize a collection by casting all members into their correct schemas.

This method calls cast() on every member, thus removing superfluous columns and casting all input data frames to the correct dtypes.

You should typically use validate() or filter() to obtain instances of the collection, as this method does not guarantee that the returned collection upholds any invariants. Nonetheless, it may be useful in instances where it is known that the provided data adheres to the collection’s invariants.

Parameters:

data – The data for all members. The dictionary must contain exactly one entry per member with the name of the member as key.

Returns:

The initialized collection.

Raises:

ValueError – If an insufficient set of input data frames is provided, i.e., if any required member of this collection is missing in the input.

Attention

For lazy frames, casting is not performed eagerly. This avoids collecting the lazy frames’ schemas but also means that a call to polars.LazyFrame.collect() further down the line might fail because of the cast and/or missing columns.
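A minimal usage sketch; df_first and df_second are hypothetical input frames already known to satisfy the collection’s invariants:

collection = MyCollection.cast(
    {
        "first_member": df_first,    # superfluous columns are dropped
        "second_member": df_second,  # dtypes are cast to the schema dtypes
    }
)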

collect_all() Self[source]

Collect all members of the collection.

This method collects all members in parallel for maximum efficiency. It is particularly useful when filter() is called with lazy frame inputs.

Returns:

The same collection with all members collected once.

Note

As all collection members are required to be lazy frames, the returned collection’s members are still “lazy”. However, they are “shallow-lazy”, meaning they are obtained by calling .collect().lazy().
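For example, a sketch of deferred collection after a lazy filter (lf_first and lf_second are hypothetical lazy frames):

result = MyCollection.filter(
    {"first_member": lf_first, "second_member": lf_second},
    eager=False,
)
collection = result.result.collect_all()  # collects all members in parallel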

classmethod common_primary_key() list[str][source]

The primary keys shared by non-ignored members of the collection.

classmethod create_empty() Self[source]

Create an empty collection without any data.

This method simply calls create_empty() on all member schemas, including optional ones.

Returns:

An instance of this collection.

classmethod filter(
data: Mapping[str, FrameType],
/,
*,
cast: bool = False,
eager: bool = True,
) CollectionFilterResult[Self][source]

Filter the member data frames by their schemas and the collection’s filters.

Parameters:
  • data – The members of the collection which ought to be filtered. The dictionary must contain exactly one entry per member with the name of the member as key, except for optional members which may be missing. All data frames passed here will be eagerly collected within the method, regardless of whether they are a DataFrame or LazyFrame.

  • cast – Whether columns with a wrong data type in the member data frame are cast to their schemas’ defined data types if possible.

  • eager – Whether the filter operation should be performed eagerly. Note that until pola-rs/polars#24129 is released, eagerly filtering can provide significant speedups.

Returns:

A named tuple with fields result and failure. The result field provides a collection with all members filtered for the rows passing validation. Just like for validation, all members are guaranteed to maintain their input order. The failure field provides a dictionary mapping member names to their respective failure information.

Raises:

ValueError – If an insufficient set of input data frames is provided, i.e. if any required member of this collection is missing in the input.
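A usage sketch; df_first and df_second are hypothetical inputs, and the counts() call is an assumption about the FailureInfo API:

result = MyCollection.filter(
    {"first_member": df_first, "second_member": df_second},
    cast=True,
)
valid = result.result  # collection containing only rows passing validation
for member, failure in result.failure.items():
    # per-member information about the rows that were filtered out
    print(member, failure.counts())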

classmethod ignored_members() set[str][source]

The names of all members of the collection that are ignored in filters.

classmethod is_valid(
data: Mapping[str, FrameType],
/,
*,
cast: bool = False,
) bool[source]

Utility method to check whether validate() raises an exception.

Parameters:
  • data – The members of the collection which ought to be validated. The dictionary must contain exactly one entry per member with the name of the member as key.

  • cast – Whether columns with a wrong data type in the member data frame are cast to their schemas’ defined data types if possible.

Returns:

Whether the provided members satisfy the invariants of the collection.

Raises:

ValueError – If an insufficient set of input data frames is provided, i.e. if any required member of this collection is missing in the input.
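For example, a sketch that branches on validity instead of catching a validation error:

if not MyCollection.is_valid(
    {"first_member": df_first, "second_member": df_second}, cast=True
):
    raise RuntimeError("input data violates the collection's invariants")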

join(
primary_keys: LazyFrame,
how: Literal['semi', 'anti'] = 'semi',
maintain_order: Literal['none', 'left'] = 'none',
) Self[source]

Filter the collection by joining onto a data frame whose common primary key entries determine the rows to keep in or remove from the collection members.

Parameters:
  • primary_keys – The data frame to join on. Must contain the common primary key columns of the collection.

  • how – The join strategy to use. As in polars, semi will keep all rows that can be found in primary_keys, while anti will remove them.

  • maintain_order – The maintain_order option to use for the polars join.

Returns:

The collection, with members potentially reduced in length.

Raises:

ValueError – If the collection contains any member that is annotated with ignored_in_filters=True.

Attention

This method does not validate the resulting collection. Only use it if the resulting collection still satisfies the collection's filters. The joins are not evaluated eagerly; therefore, a downstream call to polars.LazyFrame.collect() may fail, especially if primary_keys does not contain all common primary key columns.
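A sketch, assuming the collection’s common primary key is a single column "id":

keys = pl.LazyFrame({"id": [1, 2, 3]})
kept = collection.join(keys, how="semi")     # keep only rows with these ids
removed = collection.join(keys, how="anti")  # drop rows with these ids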

classmethod matches(other: type[Collection]) bool[source]

Check whether this collection semantically matches another.

Parameters:

other – The collection to compare with.

Returns:

Whether the two collections are semantically equal.

Attention

For custom filters, reliable comparison results are only guaranteed if the filter always returns a static polars expression. Otherwise, this function may falsely indicate a match.

classmethod member_schemas() dict[str, type[Schema]][source]

The schemas of all members of the collection.

classmethod members() dict[str, MemberInfo][source]

Information about the members of the collection.

classmethod non_ignored_members() set[str][source]

The names of all members of the collection that are not ignored in filters (default).

classmethod optional_members() set[str][source]

The names of all optional members of the collection.

classmethod read_delta(
source: str | Path | deltalake.DeltaTable,
*,
validation: Validation = 'warn',
**kwargs: Any,
) Self[source]

Read all collection members from Delta Lake tables.

This method reads each member from a Delta Lake table at the provided source location. The source can be a path, URI, or an existing DeltaTable object. Optional members are only read if present.

Parameters:
  • source – The location or DeltaTable to read from.

  • validation

    The strategy for running validation when reading the data:

    • "allow": The method tries to read the schema data from the parquet files. If the stored collection schema matches this collection schema, the collection is read without validation. If the stored schema mismatches this schema no metadata can be found in the parquets, or the files have conflicting metadata, this method automatically runs validate() with cast=True.

    • "warn": The method behaves similarly to "allow". However, it prints a warning if validation is necessary.

    • "forbid": The method never runs validation automatically and only returns if the metadata stores a collection schema that matches this collection.

    • "skip": The method never runs validation and simply reads the data, entrusting the user that the schema is valid. Use this option carefully.

  • kwargs – Additional keyword arguments passed directly to polars.read_delta().

Returns:

The initialized collection.

Raises:
  • ValidationRequiredError – If no collection schema can be read from the source and validation is set to "forbid".

  • ValueError – If the provided source does not contain Delta tables for all required members.

  • ValidationError – If the collection cannot be validated.

Attention

Schema metadata is stored as custom commit metadata. Only the schema information from the last commit is used, so any table modifications not made through dataframely will result in losing the metadata.

Be aware that appending to an existing table via mode="append" may result in violation of group constraints that dataframely cannot catch without re-validating. Only use appends if you are certain that they do not break your schema.

Be aware that this method suffers from the same limitations as serialize().

classmethod read_parquet(
directory: str | Path,
*,
validation: Literal['allow', 'forbid', 'warn', 'skip'] = 'warn',
**kwargs: Any,
) Self[source]

Read all collection members from parquet files in a directory.

This method searches for files named <member>.parquet in the provided directory for all required and optional members of the collection.

Parameters:
  • directory – The directory where the Parquet files should be read from. Parquet files may have been written with Hive partitioning.

  • validation

    The strategy for running validation when reading the data:

    • "allow": The method tries to read the schema data from the parquet files. If the stored collection schema matches this collection schema, the collection is read without validation. If the stored schema mismatches this schema no metadata can be found in the parquets, or the files have conflicting metadata, this method automatically runs validate() with cast=True.

    • "warn": The method behaves similarly to "allow". However, it prints a warning if validation is necessary.

    • "forbid": The method never runs validation automatically and only returns if the metadata stores a collection schema that matches this collection.

    • "skip": The method never runs validation and simply reads the data, entrusting the user that the schema is valid. Use this option carefully.

  • kwargs – Additional keyword arguments passed directly to polars.read_parquet().

Returns:

The initialized collection.

Raises:
  • ValidationRequiredError – If no collection schema can be read from the directory and validation is set to "forbid".

  • ValueError – If the provided directory does not contain parquet files for all required members.

  • ValidationError – If the collection cannot be validated.

Note

This method is backward compatible with older versions of dataframely in which the schema metadata was saved to schema.json files instead of being encoded into the parquet files.

Attention

Be aware that this method suffers from the same limitations as serialize().
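For example, to fail fast instead of re-validating implicitly (sketch with a hypothetical directory):

collection = MyCollection.read_parquet(
    "data/my_collection",  # directory previously written via write_parquet()
    validation="forbid",   # raise ValidationRequiredError on schema mismatch
)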

classmethod required_members() set[str][source]

The names of all required members of the collection.

classmethod sample(
num_rows: int | None = None,
*,
overrides: Sequence[Mapping[str, Any]] | None = None,
generator: Generator | None = None,
) Self[source]

Create a random sample from the members of this collection.

Just like sampling for schemas, this method should only be used for testing. Contrary to sampling for schemas, the core difficulty when sampling related data frames is that they must share primary keys while individual members may have a different number of rows. For this reason, overrides passed to this function must be “row-oriented” (or “sample-oriented”).

Parameters:
  • num_rows – The number of rows to sample for each member. If this is set to None, the number of rows is inferred from the length of the overrides.

  • overrides

    The overrides to set values in member schemas. The overrides must be provided as a list of samples. The structure of the samples must be as follows:

    {
        "<primary_key_1>": <value>,
        "<primary_key_2>": <value>,
        "<member_with_common_primary_key>": {
            "<column_1>": <value>,
            ...
        },
        "<member_with_superkey_of_primary_key>": [
            {
                "<column_1>": <value>,
                ...
            }
        ],
        ...
    }
    

    Any member/value can be left out and will be sampled automatically. Note that overrides for columns of members that are annotated with inline_for_sampling=True can be supplied on the top-level instead of in a nested dictionary.

  • generator – The (seeded) generator to use for sampling data. If None, a generator with random seed is automatically created.

Returns:

A collection where all members (including optional ones) have been sampled according to the input parameters.

Attention

In case the collection has members with a common primary key, the _preprocess_sample() method must return distinct primary key values for each sample. The default implementation does this on a best-effort basis but may cause primary key violations. Hence, it is recommended to override this method and ensure that all primary key columns are set.

Raises:
  • ValueError – If the _preprocess_sample() method does not return all common primary key columns for all samples.

  • ValidationError – If the sampled members violate any of the collection filters. If the collection does not have filters, this error is never raised. To prevent validation errors, override the _preprocess_sample() method appropriately.
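A sketch following the override structure above; the primary key column "id", the member name second_member, and the column "value" are assumptions for this example:

collection = MyCollection.sample(
    overrides=[
        # one dictionary per sample; num_rows is inferred as 2
        {"id": 1, "second_member": [{"value": 10.0}, {"value": 20.0}]},
        {"id": 2},  # anything left out is sampled automatically
    ],
)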

classmethod scan_delta(
source: str | Path | deltalake.DeltaTable,
*,
validation: Validation = 'warn',
**kwargs: Any,
) Self[source]

Lazily read all collection members from Delta Lake tables.

This method reads each member from a Delta Lake table at the provided source location. The source can be a path, URI, or an existing DeltaTable object. Optional members are only read if present.

Parameters:
  • source – The location or DeltaTable to read from.

  • validation

    The strategy for running validation when reading the data:

    • "allow": The method tries to read the schema data from the parquet files. If the stored collection schema matches this collection schema, the collection is read without validation. If the stored schema mismatches this schema no metadata can be found in the parquets, or the files have conflicting metadata, this method automatically runs validate() with cast=True.

    • "warn": The method behaves similarly to "allow". However, it prints a warning if validation is necessary.

    • "forbid": The method never runs validation automatically and only returns if the metadata stores a collection schema that matches this collection.

    • "skip": The method never runs validation and simply reads the data, entrusting the user that the schema is valid. Use this option carefully.

  • kwargs – Additional keyword arguments passed to polars.scan_delta().

Returns:

The initialized collection.

Raises:
  • ValidationRequiredError – If no collection schema can be read from the source and validation is set to "forbid".

  • ValueError – If the provided source does not contain Delta tables for all required members.

Note

Due to current limitations in dataframely, this method may read the Delta table into memory if validation is "warn" or "allow" and validation is required.

Attention

Schema metadata is stored as custom commit metadata. Only the schema information from the last commit is used, so any table modifications not made through dataframely will result in losing the metadata.

Be aware that appending to an existing table via mode="append" may result in violation of group constraints that dataframely cannot catch without re-validating. Only use appends if you are certain that they do not break your schema.

Be aware that this method suffers from the same limitations as serialize().

classmethod scan_parquet(
directory: str | Path,
*,
validation: Literal['allow', 'forbid', 'warn', 'skip'] = 'warn',
**kwargs: Any,
) Self[source]

Lazily read all collection members from parquet files in a directory.

This method searches for files named <member>.parquet in the provided directory for all required and optional members of the collection.

Parameters:
  • directory – The directory where the Parquet files should be read from. Parquet files may have been written with Hive partitioning.

  • validation

    The strategy for running validation when reading the data:

    • "allow": The method tries to read the schema data from the parquet files. If the stored collection schema matches this collection schema, the collection is read without validation. If the stored schema mismatches this schema no metadata can be found in the parquets, or the files have conflicting metadata, this method automatically runs validate() with cast=True.

    • "warn": The method behaves similarly to "allow". However, it prints a warning if validation is necessary.

    • "forbid": The method never runs validation automatically and only returns if the metadata stores a collection schema that matches this collection.

    • "skip": The method never runs validation and simply reads the data, entrusting the user that the schema is valid. Use this option carefully.

  • kwargs – Additional keyword arguments passed directly to polars.scan_parquet() for all members.

Returns:

The initialized collection.

Raises:
  • ValidationRequiredError – If no collection schema can be read from the directory and validation is set to "forbid".

  • ValueError – If the provided directory does not contain parquet files for all required members.

Note

Due to current limitations in dataframely, this method may read the parquet files into memory if validation is "warn" or "allow" and validation is required.

Note

This method is backward compatible with older versions of dataframely in which the schema metadata was saved to schema.json files instead of being encoded into the parquet files.

Attention

Be aware that this method suffers from the same limitations as serialize().
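A sketch of lazy reading with deferred materialization (hypothetical directory):

collection = MyCollection.scan_parquet("data/my_collection", validation="skip")
# members remain lazy frames; materialize only what is needed
first = collection.first_member.collect()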

classmethod serialize() str[source]

Serialize the metadata for this collection to a JSON string.

This method does NOT serialize any data frames, but only the _structure_ of the collection, similar to dataframely.Schema.serialize().

Returns:

The serialized collection.

Note

Serialization within dataframely itself will remain backwards-compatible at least within a major version. Until further notice, it will also be backwards-compatible across major versions.

Attention

Serialization of polars expressions and lazy frames is not guaranteed to be stable across versions of polars. This affects collections with filters or members that define custom rules or columns with custom checks: a collection serialized with one version of polars may not be deserializable with another version of polars.

Attention

This functionality is considered unstable. It may be changed at any time without it being considered a breaking change.

Raises:
  • TypeError – If a column of any member contains metadata that is not JSON-serializable.

  • ValueError – If a column of any member is not a “native” dataframely column type but a custom subclass.
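For example:

spec = MyCollection.serialize()
# ``spec`` is a JSON string describing the collection's structure
# (members, schemas, filters), not any of its data.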

sink_parquet(directory: str | Path, **kwargs: Any) None[source]

Stream the members of this collection into parquet files in a directory.

This method writes one parquet file per member into the provided directory. Each parquet file is named <member>.parquet. No file is written for optional members which are not provided in the current collection.

Parameters:
  • directory – The directory where the Parquet files should be written to. If the directory does not exist, it is created automatically, including all of its parents.

  • kwargs – Additional keyword arguments passed to polars.LazyFrame.sink_parquet(). metadata may only be provided if it is a dictionary.

Attention

This method suffers from the same limitations as serialize().

to_dict() dict[str, LazyFrame][source]

Return a dictionary representation of this collection.

classmethod validate(
data: Mapping[str, FrameType],
/,
*,
cast: bool = False,
eager: bool = True,
) Self[source]

Validate that a set of data frames satisfies the collection’s invariants.

Parameters:
  • data – The members of the collection which ought to be validated. The dictionary must contain exactly one entry per member with the name of the member as key.

  • cast – Whether columns with a wrong data type in the member data frame are cast to their schemas’ defined data types if possible.

  • eager – Whether the validation should be performed eagerly. If True, this method raises a validation error on invalid data and the returned collection contains “shallow” lazy frames, i.e., lazy frames obtained by simply calling lazy() on the validated data frames. If False, this method only raises a ValueError if data does not contain data for all required members. The returned collection then contains “true” lazy frames that are validated upon calling collect() on the individual members or collect_all() on the collection. Note that, in the latter case, the information in error messages is limited.

Raises:
  • ValueError – If an insufficient set of input data frames is provided, i.e. if any required member of this collection is missing in the input.

  • ValidationError – If eager=True and any of the input data frames does not satisfy its schema definition or the filters on this collection result in the removal of at least one row across any of the input data frames. If eager=False, a ComputeError is raised upon collecting.

Returns:

An instance of the collection. All members are guaranteed to be valid with respect to their respective schemas, and the filters on this collection are guaranteed not to have removed rows from any member. The input order of each member is maintained.
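A usage sketch with explicit error handling; the exception path dy.exc.ValidationError is an assumption:

try:
    collection = MyCollection.validate(
        {"first_member": df_first, "second_member": df_second},
        cast=True,
    )
except dy.exc.ValidationError as exc:
    # a schema rule failed or a collection filter removed at least one row
    print(exc)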

write_delta(target: str | Path | deltalake.DeltaTable, **kwargs: Any) None[source]

Write the members of this collection to Delta Lake tables.

This method writes each member to a Delta Lake table at the provided target location. The target can be a path, URI, or an existing DeltaTable object. No table is written for optional members which are not provided in the current collection.

Parameters:
  • target – The location or DeltaTable where the data should be written. If the location does not exist, it is created automatically, including all of its parents.

  • kwargs – Additional keyword arguments passed to polars.DataFrame.write_delta().

Attention

Schema metadata is stored as custom commit metadata. Only the schema information from the last commit is used, so any table modifications not made through dataframely will result in losing the metadata.

Be aware that appending to an existing table via mode="append" may result in violation of group constraints that dataframely cannot catch without re-validating. Only use appends if you are certain that they do not break your schema.

This method suffers from the same limitations as serialize().
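A round-trip sketch (hypothetical location):

collection.write_delta("data/my_collection_delta")
restored = MyCollection.read_delta("data/my_collection_delta")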

write_parquet(directory: str | Path, **kwargs: Any) None[source]

Write the members of this collection to parquet files in a directory.

This method writes one parquet file per member into the provided directory. Each parquet file is named <member>.parquet. No file is written for optional members which are not provided in the current collection.

Parameters:
  • directory – The directory the Parquet files should be written to. If the directory does not exist, it is created automatically, including all of its parents.

  • kwargs – Additional keyword arguments passed to polars.DataFrame.write_parquet(). metadata may only be provided if it is a dictionary.

Attention

This method suffers from the same limitations as serialize().
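A round-trip sketch (hypothetical directory):

collection.write_parquet("data/my_collection")
# later: read back; "forbid" fails fast instead of re-validating implicitly
restored = MyCollection.read_parquet("data/my_collection", validation="forbid")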