Real-world example for hospital invoices#

This notebook demonstrates how to use dataframely in a real-world setting: hospital invoices. The data model is a 1-N relationship between invoices and diagnoses.

[ ]:
import dataframely as dy
import polars as pl
from decimal import Decimal
from datetime import datetime, date

Validating individual data frames#

The base schema for the data frame containing the hospital invoices defines all columns with their respective column types.

[2]:
class InvoiceSchema(dy.Schema):
    invoice_id = dy.String()
    admission_date = dy.Date()
    discharge_date = dy.Date()
    received_at = dy.Datetime()
    amount = dy.Decimal()

Adding column constraints to the schema#

dataframely enables users to define uniqueness constraints (e.g., primary key columns), nullability, regular expressions, limits, or other per-column validation rules.

[3]:
class InvoiceSchema(dy.Schema):
    invoice_id = dy.String(primary_key=True)
    admission_date = dy.Date(nullable=False)
    discharge_date = dy.Date(nullable=False)
    received_at = dy.Datetime(nullable=False)
    amount = dy.Decimal(nullable=False, min_exclusive=Decimal(0))

Adding cross-column validation rules#

Validation rules may span multiple columns. Using the @dy.rule decorator, we can define cross-column validation rules as polars expressions.

[4]:
class InvoiceSchema(dy.Schema):
    invoice_id = dy.String(primary_key=True)
    admission_date = dy.Date(nullable=False)
    discharge_date = dy.Date(nullable=False)
    received_at = dy.Datetime(nullable=False)
    amount = dy.Decimal(nullable=False, min_exclusive=Decimal(0))

    @dy.rule()
    def discharge_after_admission() -> pl.Expr:
        return pl.col("discharge_date") >= pl.col("admission_date")

    @dy.rule()
    def received_at_after_discharge() -> pl.Expr:
        return pl.col("received_at").dt.date() >= pl.col("discharge_date")

Validating a data frame#

To validate a data frame, we pass the pl.DataFrame or pl.LazyFrame to the validate method. To coerce the column types to the types specified in the schema, we can pass cast=True.

[5]:
invoices = pl.DataFrame(
    {
        "invoice_id": ["001", "002", "003"],
        "admission_date": [date(2025, 1, 1), date(2025, 1, 5), date(2025, 1, 1)],
        "discharge_date": [date(2025, 1, 4), date(2025, 1, 7), date(2025, 1, 1)],
        "received_at": [
            datetime(2025, 1, 5),
            datetime(2025, 1, 8),
            datetime(2025, 1, 2),
        ],
        "amount": [1000.0, 200.0, 400.0],
    }
)

InvoiceSchema.validate(invoices, cast=True)
[5]:
shape: (3, 5)
┌────────────┬────────────────┬────────────────┬─────────────────────┬──────────────┐
│ invoice_id ┆ admission_date ┆ discharge_date ┆ received_at         ┆ amount       │
│ ---        ┆ ---            ┆ ---            ┆ ---                 ┆ ---          │
│ str        ┆ date           ┆ date           ┆ datetime[μs]        ┆ decimal[*,0] │
╞════════════╪════════════════╪════════════════╪═════════════════════╪══════════════╡
│ "001"      ┆ 2025-01-01     ┆ 2025-01-04     ┆ 2025-01-05 00:00:00 ┆ 1000         │
│ "002"      ┆ 2025-01-05     ┆ 2025-01-07     ┆ 2025-01-08 00:00:00 ┆ 200          │
│ "003"      ┆ 2025-01-01     ┆ 2025-01-01     ┆ 2025-01-02 00:00:00 ┆ 400          │
└────────────┴────────────────┴────────────────┴─────────────────────┴──────────────┘

If the data to validate contains invalid rows, dataframely raises a RuleValidationError with a summary of the violated validation rules.

[6]:
# Raise during validation if there are invalid rows
invoices = pl.DataFrame(
    {
        "invoice_id": ["001", "002", "003"],
        "admission_date": [date(2025, 1, 1), date(2025, 1, 5), date(2025, 1, 1)],
        "discharge_date": [date(2025, 1, 4), date(2025, 1, 7), date(2025, 1, 1)],
        "received_at": [
            datetime(2025, 1, 5),
            datetime(2025, 1, 8),
            datetime(2025, 1, 2),
        ],
        "amount": [0.0, 200.0, 400.0],  # Invalid amount `0.0` here
    }
)

InvoiceSchema.validate(invoices, cast=True)
---------------------------------------------------------------------------
RuleValidationError                       Traceback (most recent call last)
Cell In[6], line 10
      1 # Raise during validation if there are invalid rows
      2 invoices = pl.DataFrame({
      3     "invoice_id": ["001", "002", "003"],
      4     "admission_date": [date(2025, 1, 1), date(2025, 1, 5), date(2025, 1, 1)],
   (...)      7     "amount": [0.0, 200.0, 400.0], # Invalid amount `0.0` here
      8 })
---> 10 InvoiceSchema.validate(invoices, cast=True)

File ~/workspace/dataframely/dataframely/schema.py:280, in Schema.validate(cls, df, cast)
    278 df_valid, failures = cls.filter(df, cast=cast)
    279 if len(failures) > 0:
--> 280     raise RuleValidationError(failures.counts())
    281 return df_valid

RuleValidationError: 1 rules failed validation:
 * Column 'amount' failed validation for 1 rules:
   - 'min_exclusive' failed for 1 rows

Soft-validation and validation failure introspection#

In a production pipeline, we typically do not want to raise an exception at runtime. dataframely provides the filter method for “soft-validation”: it returns the rows that passed validation along with a FailureInfo object for inspecting the invalid rows.

[7]:
good, failure = InvoiceSchema.filter(invoices, cast=True)
[8]:
# Inspect the reasons for the failed rows
failure.counts()
[8]:
{'amount|min_exclusive': 1}
[9]:
# Inspect the co-occurrences of validation failures
failure.cooccurrence_counts()
[9]:
{frozenset({'amount|min_exclusive'}): 1}
[10]:
# Get a data frame containing all failed rows
failure.invalid()
[10]:
shape: (1, 5)
┌────────────┬────────────────┬────────────────┬─────────────────────┬──────────────┐
│ invoice_id ┆ admission_date ┆ discharge_date ┆ received_at         ┆ amount       │
│ ---        ┆ ---            ┆ ---            ┆ ---                 ┆ ---          │
│ str        ┆ date           ┆ date           ┆ datetime[μs]        ┆ decimal[*,0] │
╞════════════╪════════════════╪════════════════╪═════════════════════╪══════════════╡
│ "001"      ┆ 2025-01-01     ┆ 2025-01-04     ┆ 2025-01-05 00:00:00 ┆ 0            │
└────────────┴────────────────┴────────────────┴─────────────────────┴──────────────┘

Validating groups of data frames#

Oftentimes, data frames (or rather, tables) are interdependent, and proper data validation must take multiple tables into account. dataframely enables users to define “collections”: groups of data frames with validation rules on the collection level. To create a collection, we first introduce a second schema for diagnosis data frames.

[11]:
class DiagnosisSchema(dy.Schema):
    invoice_id = dy.String(primary_key=True)
    diagnosis_code = dy.String(primary_key=True, regex=r"[A-Z][0-9]{2,4}")
    is_main = dy.Bool(nullable=False)

    @dy.rule(group_by=["invoice_id"])
    def exactly_one_main_diagnosis() -> pl.Expr:
        return pl.col("is_main").sum() == 1

Note how we can also define validation rules on groups of rows using @dy.rule(group_by=[...]).

Schema inheritance#

What is still a bit inconvenient about this schema definition is that the shared primary key is duplicated between InvoiceSchema and DiagnosisSchema. To avoid this duplication, we can leverage schema inheritance in dataframely and introduce a common base schema.

[12]:
# Reduce redundancies in schemas by using schema inheritance.
# Here, we introduce a base schema for the shared primary key.
class InvoiceIdSchema(dy.Schema):
    invoice_id = dy.String(primary_key=True)


class InvoiceSchema(InvoiceIdSchema):
    admission_date = dy.Date(nullable=False)
    discharge_date = dy.Date(nullable=False)
    received_at = dy.Datetime(nullable=False)
    amount = dy.Decimal(nullable=False, min_exclusive=Decimal(0))

    @dy.rule()
    def discharge_after_admission() -> pl.Expr:
        return pl.col("discharge_date") >= pl.col("admission_date")

    @dy.rule()
    def received_at_after_discharge() -> pl.Expr:
        return pl.col("received_at").dt.date() >= pl.col("discharge_date")


class DiagnosisSchema(InvoiceIdSchema):
    diagnosis_code = dy.String(primary_key=True, regex=r"[A-Z][0-9]{2,4}")
    is_main = dy.Bool(nullable=False)

    @dy.rule(group_by=["invoice_id"])
    def exactly_one_main_diagnosis() -> pl.Expr:
        return pl.col("is_main").sum() == 1

Creating a dy.Collection#

To add the two schemas to a collection, we can create a new collection by subclassing dy.Collection.

[13]:
# Introduce a collection for groups of schema-validated data frames
class HospitalClaims(dy.Collection):
    invoices: dy.LazyFrame[InvoiceSchema]
    diagnoses: dy.LazyFrame[DiagnosisSchema]

Adding cross-dataframe validation rules to a collection#

To further enhance the collection, we can add validation rules on the collection level using the @dy.filter decorator. A filter receives the collection as input and must return a data frame satisfying the following:

  • Its columns must be a superset of the common primary keys across all members.

  • Its rows provide the primary key combinations that ought to be kept across the members. Applying the filter removes all rows from a member that are lost when inner-joining that member onto the data frame returned by the filter.

[14]:
class HospitalClaims(dy.Collection):
    invoices: dy.LazyFrame[InvoiceSchema]
    diagnoses: dy.LazyFrame[DiagnosisSchema]

    @dy.filter()
    def at_least_one_diagnosis_per_invoice(self) -> pl.LazyFrame:
        return self.invoices.join(
            self.diagnoses.select(pl.col("invoice_id").unique()),
            on="invoice_id",
            how="inner",
        )

Validating a collection#

Calling validate on the collection raises a validation exception if any of the input data frames does not satisfy its schema definition, or if the filters on the collection result in the removal of at least one row from any of the input data frames.

[15]:
invoices = pl.DataFrame(
    {
        "invoice_id": ["001", "002", "003"],
        "admission_date": [date(2025, 1, 1), date(2025, 1, 5), date(2025, 1, 1)],
        "discharge_date": [date(2025, 1, 4), date(2025, 1, 7), date(2025, 1, 1)],
        "received_at": [
            datetime(2025, 1, 5),
            datetime(2025, 1, 8),
            datetime(2025, 1, 2),
        ],
        "amount": [1000.0, 200.0, 400.0],
    }
)

diagnoses = pl.DataFrame(
    {
        "invoice_id": ["001", "001", "002"],
        "diagnosis_code": ["A123", "B456", "C789"],
        "is_main": [True, False, True],
    }
)

claims = HospitalClaims.validate(
    {"invoices": invoices, "diagnoses": diagnoses}, cast=True
)

# Aggregate diagnoses per invoice
print(
    claims.invoices.join(claims.diagnoses, on="invoice_id", how="inner")
    .group_by("invoice_id")
    .agg(
        pl.col("admission_date").first(),
        pl.col("discharge_date").first(),
        pl.col("received_at").first(),
        pl.col("amount").first(),
        pl.col("diagnosis_code"),
    )
    .collect()
)
---------------------------------------------------------------------------
MemberValidationError                     Traceback (most recent call last)
Cell In[15], line 23
      1 invoices = pl.DataFrame(
      2     {
      3         "invoice_id": ["001", "002", "003"],
   (...)     12     }
     13 )
     15 diagnoses = pl.DataFrame(
     16     {
     17         "invoice_id": ["001", "001", "002"],
   (...)     20     }
     21 )
---> 23 claims = HospitalClaims.validate(
     24     {"invoices": invoices, "diagnoses": diagnoses}, cast=True
     25 )
     27 # Aggregate diagnoses per invoice
     28 print(
     29     claims.invoices.join(claims.diagnoses, on="invoice_id", how="inner")
     30     .group_by("invoice_id")
   (...)     38     .collect()
     39 )

File ~/workspace/dataframely/dataframely/collection.py:82, in Collection.validate(cls, data, cast)
     80 out, failure = cls.filter(data, cast=cast)
     81 if any(len(fail) > 0 for fail in failure.values()):
---> 82     raise MemberValidationError(
     83         {
     84             name: RuleValidationError(fail.counts())
     85             for name, fail in failure.items()
     86         }
     87     )
     88 return out

MemberValidationError: 2 members failed validation:
 > Member 'invoices' failed validation:
   1 rules failed validation:
    - 'at_least_one_diagnosis_per_invoice' failed validation for 1 rows
 > Member 'diagnoses' failed validation:
   0 rules failed validation:

Note that collections can also be soft-validated using filter, which returns the valid members together with one FailureInfo object per member; failure introspection then works just like for schemas.