Collection
- class dataframely.Collection[source]
Base class for all collections of data frames with a predefined schema.
A collection is comprised of a set of members which are collectively “consistent”, meaning the collection ensures that invariants hold across members. This is different from `Schema`, which only ensures invariants within individual members.

In order to properly ensure that invariants hold across members, the members must have a “common primary key”, i.e. there must be an overlap of at least one primary key column across all members. Consequently, a collection is typically used to represent “semantic objects” which cannot be represented in a single data frame due to 1-N relationships that are managed in separate data frames.
A collection must only have type annotations for `LazyFrame` with a known schema:

```python
class MyCollection(dy.Collection):
    first_member: dy.LazyFrame[MyFirstSchema]
    second_member: dy.LazyFrame[MySecondSchema]
```
Besides, it may define filters (cf. `filter()`) and arbitrary methods.

Attention
Do NOT use this class in combination with `from __future__ import annotations`, as it requires the proper schema definitions to ensure that the collection is implemented correctly.

Methods:
Initialize a collection by casting all members into their correct schemas.
Collect all members of the collection.
The primary keys shared by non-ignored members of the collection.
Create an empty collection without any data.
Filter the member data frames by their schemas and the collection's filters.
The names of all members of the collection that are ignored in filters.
Utility method to check whether `validate()` raises an exception.
Filter the collection by joining onto a data frame containing entries for the common primary key columns whose respective rows should be kept or removed in the collection members.
Check whether this collection semantically matches another.
The schemas of all members of the collection.
Information about the members of the collection.
The names of all members of the collection that are not ignored in filters (default).
The names of all optional members of the collection.
Read all collection members from Delta Lake tables.
Read all collection members from parquet files in a directory.
The names of all required members of the collection.
Create a random sample from the members of this collection.
Lazily read all collection members from Delta Lake tables.
Lazily read all collection members from parquet files in a directory.
Serialize the metadata for this collection to a JSON string.
Stream the members of this collection into parquet files in a directory.
Return a dictionary representation of this collection.
Validate that a set of data frames satisfy the collection's invariants.
Write the members of this collection to Delta Lake tables.
Write the members of this collection to parquet files in a directory.
- classmethod cast( ) Self[source]
Initialize a collection by casting all members into their correct schemas.
This method calls `cast()` on every member, thus removing superfluous columns and casting all input data frames to the correct dtypes.

You should typically use `validate()` or `filter()` to obtain instances of the collection, as this method does not guarantee that the returned collection upholds any invariants. Nonetheless, it may be useful in instances where the provided data is known to adhere to the collection's invariants.
- Parameters:
data – The data for all members. The dictionary must contain exactly one entry per member with the name of the member as key.
- Returns:
The initialized collection.
- Raises:
ValueError – If an insufficient set of input data frames is provided, i.e. if any required member of this collection is missing in the input.
Attention
For lazy frames, casting is not performed eagerly. This prevents collecting the lazy frames' schemas, but it also means that a call to `polars.LazyFrame.collect()` further down the line might fail because of the cast and/or missing columns.
- collect_all() Self[source]
Collect all members of the collection.
This method collects all members in parallel for maximum efficiency. It is particularly useful when `filter()` is called with lazy frame inputs.
- Returns:
The same collection with all members collected once.
Note
As all collection members are required to be lazy frames, the returned collection's members are still “lazy”. However, they are “shallow-lazy”, meaning they are obtained by calling `.collect().lazy()`.
- classmethod common_primary_key() list[str][source]
The primary keys shared by non-ignored members of the collection.
- classmethod create_empty() Self[source]
Create an empty collection without any data.
This method simply calls `create_empty()` on all member schemas, including optional ones.
- Returns:
An instance of this collection.
- classmethod filter( ) CollectionFilterResult[Self][source]
Filter the member data frames by their schemas and the collection's filters.
- Parameters:
data – The members of the collection which ought to be filtered. The dictionary must contain exactly one entry per member with the name of the member as key, except for optional members which may be missing. All data frames passed here will be eagerly collected within the method, regardless of whether they are a `DataFrame` or `LazyFrame`.
cast – Whether columns with a wrong data type in the member data frames are cast to their schemas' defined data types if possible.
eager – Whether the filter operation should be performed eagerly. Note that until pola-rs/polars#24129 is released, eager filtering can provide significant speedups.
- Returns:
A named tuple with fields `result` and `failure`. The `result` field provides a collection with all members filtered down to the rows passing validation. Just like for validation, all members are guaranteed to maintain their input order. The `failure` field provides a dictionary mapping member names to their respective failure information.
- Raises:
ValueError – If an insufficient set of input data frames is provided, i.e. if any required member of this collection is missing in the input.
- classmethod ignored_members() set[str][source]
The names of all members of the collection that are ignored in filters.
- classmethod is_valid( ) bool[source]
Utility method to check whether `validate()` raises an exception.
- Parameters:
data – The members of the collection which ought to be validated. The dictionary must contain exactly one entry per member with the name of the member as key.
cast – Whether columns with a wrong data type in the member data frame are cast to their schemas’ defined data types if possible.
- Returns:
Whether the provided members satisfy the invariants of the collection.
- Raises:
ValueError – If an insufficient set of input data frames is provided, i.e. if any required member of this collection is missing in the input.
- join(
- primary_keys: LazyFrame,
- how: Literal['semi', 'anti'] = 'semi',
- maintain_order: Literal['none', 'left'] = 'none',
- ) Self[source]
Filter the collection by joining onto a data frame containing entries for the common primary key columns whose respective rows should be kept or removed in the collection members.
- Parameters:
primary_keys – The data frame to join on. Must contain the common primary key columns of the collection.
how – The join strategy to use. Like in polars, `semi` will keep all rows that can be found in `primary_keys`, while `anti` will remove them.
maintain_order – The `maintain_order` option to use for the polars join.
- Returns:
The collection, with members potentially reduced in length.
- Raises:
ValueError – If the collection contains any member that is annotated with `ignored_in_filters=True`.
Attention
This method does not validate the resulting collection. Only use it if the resulting collection still satisfies the filters of the collection. The joins are not evaluated eagerly; therefore, a downstream call to `polars.LazyFrame.collect()` may fail, especially if `primary_keys` does not contain all columns of all common primary keys.
- classmethod matches(other: type[Collection]) bool[source]
Check whether this collection semantically matches another.
- Parameters:
other – The collection to compare with.
- Returns:
Whether the two collections are semantically equal.
Attention
For custom filters, reliable comparison results are only guaranteed if the filter always returns a static polars expression. Otherwise, this function may falsely indicate a match.
- classmethod member_schemas() dict[str, type[Schema]][source]
The schemas of all members of the collection.
- classmethod members() dict[str, MemberInfo][source]
Information about the members of the collection.
- classmethod non_ignored_members() set[str][source]
The names of all members of the collection that are not ignored in filters (default).
- classmethod optional_members() set[str][source]
The names of all optional members of the collection.
- classmethod read_delta( ) Self[source]
Read all collection members from Delta Lake tables.
This method reads each member from a Delta Lake table at the provided source location. The source can be a path, URI, or an existing DeltaTable object. Optional members are only read if present.
- Parameters:
source – The location or DeltaTable to read from.
validation –
The strategy for running validation when reading the data:
- `"allow"`: The method tries to read the schema data from the parquet files. If the stored collection schema matches this collection schema, the collection is read without validation. If the stored schema mismatches this schema, no metadata can be found in the parquet files, or the files have conflicting metadata, this method automatically runs `validate()` with `cast=True`.
- `"warn"`: The method behaves similarly to `"allow"`. However, it prints a warning if validation is necessary.
- `"forbid"`: The method never runs validation automatically and only returns if the metadata stores a collection schema that matches this collection.
- `"skip"`: The method never runs validation and simply reads the data, entrusting the user that the schema is valid. Use this option carefully.
kwargs – Additional keyword arguments passed directly to `polars.read_delta()`.
- Returns:
The initialized collection.
- Raises:
ValidationRequiredError – If no collection schema can be read from the source and `validation` is set to `"forbid"`.
ValueError – If the provided source does not contain Delta tables for all required members.
ValidationError – If the collection cannot be validated.
Attention
Schema metadata is stored as custom commit metadata. Only the schema information from the last commit is used, so any table modifications that are not through dataframely will result in losing the metadata.
Be aware that appending to an existing table via `mode="append"` may result in violation of group constraints that dataframely cannot catch without re-validating. Only use appends if you are certain that they do not break your schema.
Be aware that this method suffers from the same limitations as `serialize()`.
- classmethod read_parquet(
- directory: str | Path,
- *,
- validation: Literal['allow', 'forbid', 'warn', 'skip'] = 'warn',
- **kwargs: Any,
- ) Self[source]
Read all collection members from parquet files in a directory.
This method searches for files named `<member>.parquet` in the provided directory for all required and optional members of the collection.
- Parameters:
directory – The directory where the Parquet files should be read from. Parquet files may have been written with Hive partitioning.
validation –
The strategy for running validation when reading the data:
- `"allow"`: The method tries to read the schema data from the parquet files. If the stored collection schema matches this collection schema, the collection is read without validation. If the stored schema mismatches this schema, no metadata can be found in the parquet files, or the files have conflicting metadata, this method automatically runs `validate()` with `cast=True`.
- `"warn"`: The method behaves similarly to `"allow"`. However, it prints a warning if validation is necessary.
- `"forbid"`: The method never runs validation automatically and only returns if the metadata stores a collection schema that matches this collection.
- `"skip"`: The method never runs validation and simply reads the data, entrusting the user that the schema is valid. Use this option carefully.
kwargs – Additional keyword arguments passed directly to `polars.read_parquet()`.
- Returns:
The initialized collection.
- Raises:
ValidationRequiredError – If no collection schema can be read from the directory and `validation` is set to `"forbid"`.
ValueError – If the provided directory does not contain parquet files for all required members.
ValidationError – If the collection cannot be validated.
Note
This method is backward compatible with older versions of dataframely in which the schema metadata was saved to `schema.json` files instead of being encoded into the parquet files.

Attention
Be aware that this method suffers from the same limitations as `serialize()`.
- classmethod required_members() set[str][source]
The names of all required members of the collection.
- classmethod sample(
- num_rows: int | None = None,
- *,
- overrides: Sequence[Mapping[str, Any]] | None = None,
- generator: Generator | None = None,
- ) Self[source]
Create a random sample from the members of this collection.
Just like sampling for schemas, this method should only be used for testing. Contrary to sampling for schemas, the core difficulty when sampling related data frames is that they must share primary keys while individual members may have a different number of rows. For this reason, overrides passed to this function must be “row-oriented” (or “sample-oriented”).
- Parameters:
num_rows – The number of rows to sample for each member. If this is set to `None`, the number of rows is inferred from the length of the overrides.
overrides – The overrides to set values in member schemas. The overrides must be provided as a list of samples. The structure of the samples must be as follows:

```python
{
    "<primary_key_1>": <value>,
    "<primary_key_2>": <value>,
    "<member_with_common_primary_key>": {"<column_1>": <value>, ...},
    "<member_with_superkey_of_primary_key>": [
        {"<column_1>": <value>, ...},
    ],
    ...
}
```

Any member/value can be left out and will be sampled automatically. Note that overrides for columns of members that are annotated with `inline_for_sampling=True` can be supplied on the top level instead of in a nested dictionary.
generator – The (seeded) generator to use for sampling data. If `None`, a generator with a random seed is automatically created.
- Returns:
A collection where all members (including optional ones) have been sampled according to the input parameters.
Attention
In case the collection has members with a common primary key, the `_preprocess_sample()` method must return distinct primary key values for each sample. The default implementation does this on a best-effort basis but may cause primary key violations. Hence, it is recommended to override this method and ensure that all primary key columns are set.
- Raises:
ValueError – If the `_preprocess_sample()` method does not return all common primary key columns for all samples.
ValidationError – If the sampled members violate any of the collection filters. If the collection does not have filters, this error is never raised. To prevent validation errors, overwrite the `_preprocess_sample()` method appropriately.
- classmethod scan_delta( ) Self[source]
Lazily read all collection members from Delta Lake tables.
This method reads each member from a Delta Lake table at the provided source location. The source can be a path, URI, or an existing DeltaTable object. Optional members are only read if present.
- Parameters:
source – The location or DeltaTable to read from.
validation –
The strategy for running validation when reading the data:
- `"allow"`: The method tries to read the schema data from the parquet files. If the stored collection schema matches this collection schema, the collection is read without validation. If the stored schema mismatches this schema, no metadata can be found in the parquet files, or the files have conflicting metadata, this method automatically runs `validate()` with `cast=True`.
- `"warn"`: The method behaves similarly to `"allow"`. However, it prints a warning if validation is necessary.
- `"forbid"`: The method never runs validation automatically and only returns if the metadata stores a collection schema that matches this collection.
- `"skip"`: The method never runs validation and simply reads the data, entrusting the user that the schema is valid. Use this option carefully.
kwargs – Additional keyword arguments passed to `polars.scan_delta()`.
- Returns:
The initialized collection.
- Raises:
ValidationRequiredError – If no collection schema can be read from the source and `validation` is set to `"forbid"`.
ValueError – If the provided source does not contain Delta tables for all required members.
Note
Due to current limitations in dataframely, this method may read the Delta table into memory if `validation` is `"warn"` or `"allow"` and validation is required.

Attention
Schema metadata is stored as custom commit metadata. Only the schema information from the last commit is used, so any table modifications that are not through dataframely will result in losing the metadata.
Be aware that appending to an existing table via `mode="append"` may result in violation of group constraints that dataframely cannot catch without re-validating. Only use appends if you are certain that they do not break your schema.
Be aware that this method suffers from the same limitations as `serialize()`.
- classmethod scan_parquet(
- directory: str | Path,
- *,
- validation: Literal['allow', 'forbid', 'warn', 'skip'] = 'warn',
- **kwargs: Any,
- ) Self[source]
Lazily read all collection members from parquet files in a directory.
This method searches for files named `<member>.parquet` in the provided directory for all required and optional members of the collection.
- Parameters:
directory – The directory where the Parquet files should be read from. Parquet files may have been written with Hive partitioning.
validation –
The strategy for running validation when reading the data:
- `"allow"`: The method tries to read the schema data from the parquet files. If the stored collection schema matches this collection schema, the collection is read without validation. If the stored schema mismatches this schema, no metadata can be found in the parquet files, or the files have conflicting metadata, this method automatically runs `validate()` with `cast=True`.
- `"warn"`: The method behaves similarly to `"allow"`. However, it prints a warning if validation is necessary.
- `"forbid"`: The method never runs validation automatically and only returns if the metadata stores a collection schema that matches this collection.
- `"skip"`: The method never runs validation and simply reads the data, entrusting the user that the schema is valid. Use this option carefully.
kwargs – Additional keyword arguments passed directly to `polars.scan_parquet()` for all members.
- Returns:
The initialized collection.
- Raises:
ValidationRequiredError – If no collection schema can be read from the directory and `validation` is set to `"forbid"`.
ValueError – If the provided directory does not contain parquet files for all required members.
Note
Due to current limitations in dataframely, this method actually reads the parquet files into memory if `validation` is `"warn"` or `"allow"` and validation is required.

Note
This method is backward compatible with older versions of dataframely in which the schema metadata was saved to `schema.json` files instead of being encoded into the parquet files.
Attention
Be aware that this method suffers from the same limitations as `serialize()`.
- classmethod serialize() str[source]
Serialize the metadata for this collection to a JSON string.
This method does NOT serialize any data frames, but only the _structure_ of the collection, similar to `dataframely.Schema.serialize()`.
- Returns:
The serialized collection.
Note
Serialization within dataframely itself will remain backwards-compatible at least within a major version. Until further notice, it will also be backwards-compatible across major versions.
Attention
Serialization of polars expressions and lazy frames is not guaranteed to be stable across versions of polars. This affects collections with filters or members that define custom rules or columns with custom checks: a collection serialized with one version of polars may not be deserializable with another version of polars.

Attention
This functionality is considered unstable. It may be changed at any time without it being considered a breaking change.
- Raises:
TypeError – If a column of any member contains metadata that is not JSON-serializable.
ValueError – If a column of any member is not a “native” dataframely column type but a custom subclass.
- sink_parquet(directory: str | Path, **kwargs: Any) None[source]
Stream the members of this collection into parquet files in a directory.
This method writes one parquet file per member into the provided directory. Each parquet file is named `<member>.parquet`. No file is written for optional members which are not provided in the current collection.
- Parameters:
directory – The directory where the Parquet files should be written to. If the directory does not exist, it is created automatically, including all of its parents.
kwargs – Additional keyword arguments passed to `polars.LazyFrame.sink_parquet()`. `metadata` may only be provided if it is a dictionary.
Attention
This method suffers from the same limitations as `serialize()`.
- classmethod validate( ) Self[source]
Validate that a set of data frames satisfy the collection’s invariants.
- Parameters:
data – The members of the collection which ought to be validated. The dictionary must contain exactly one entry per member with the name of the member as key.
cast – Whether columns with a wrong data type in the member data frame are cast to their schemas’ defined data types if possible.
eager – Whether the validation should be performed eagerly. If `True`, this method raises a validation error and the returned collection contains “shallow” lazy frames, i.e. lazy frames obtained by simply calling `lazy()` on the validated data frames. If `False`, this method only raises a `ValueError` if `data` does not contain data for all required members. The returned collection contains “true” lazy frames that will be validated upon calling `collect()` on the individual member or `collect_all()` on the collection. Note that, in the latter case, information from error messages is limited.
- Raises:
ValueError – If an insufficient set of input data frames is provided, i.e. if any required member of this collection is missing in the input.
ValidationError – If `eager=True` and any of the input data frames does not satisfy its schema definition, or the filters on this collection result in the removal of at least one row across any of the input data frames. If `eager=False`, a `ComputeError` is raised upon collecting.
- Returns:
An instance of the collection. All members of the collection are guaranteed to be valid with respect to their respective schemas and the filters on this collection did not remove rows from any member. The input order of each member is maintained.
- write_delta(target: str | Path | deltalake.DeltaTable, **kwargs: Any) None[source]
Write the members of this collection to Delta Lake tables.
This method writes each member to a Delta Lake table at the provided target location. The target can be a path, URI, or an existing DeltaTable object. No table is written for optional members which are not provided in the current collection.
- Parameters:
target – The location or DeltaTable where the data should be written. If the location does not exist, it is created automatically, including all of its parents.
kwargs – Additional keyword arguments passed to `polars.DataFrame.write_delta()`.
Attention
Schema metadata is stored as custom commit metadata. Only the schema information from the last commit is used, so any table modifications that are not through dataframely will result in losing the metadata.
Be aware that appending to an existing table via `mode="append"` may result in violation of group constraints that dataframely cannot catch without re-validating. Only use appends if you are certain that they do not break your schema.
This method suffers from the same limitations as `serialize()`.
- write_parquet(directory: str | Path, **kwargs: Any) None[source]
Write the members of this collection to parquet files in a directory.
This method writes one parquet file per member into the provided directory. Each parquet file is named `<member>.parquet`. No file is written for optional members which are not provided in the current collection.
- Parameters:
directory – The directory where the Parquet files should be written to. If the directory does not exist, it is created automatically, including all of its parents.
kwargs – Additional keyword arguments passed to `polars.DataFrame.write_parquet()`. `metadata` may only be provided if it is a dictionary.
Attention
This method suffers from the same limitations as `serialize()`.