DagsterDocs

Validating Pandas DataFrames with Dagster Types#

The dagster_pandas library provides the ability to perform data validation, emit summary statistics, and enable reliable dataframe serialization/deserialization. On top of this, the Dagster type system generates documentation of your dataframe constraints and makes it accessible in the Dagit UI.

Creating Dagster DataFrame Types#

To create a custom dagster_pandas type, use create_dagster_pandas_dataframe_type and provide a list of PandasColumn objects which specify column-level schema and constraints. For example, we can construct a custom dataframe type to represent a set of e-bike trips in the following way:

TripDataFrame = create_dagster_pandas_dataframe_type(
    name="TripDataFrame",
    columns=[
        PandasColumn.integer_column("bike_id", min_value=0),
        PandasColumn.categorical_column("color", categories={"red", "green", "blue"}),
        PandasColumn.datetime_column(
            "start_time", min_datetime=datetime(year=2020, month=2, day=10)
        ),
        PandasColumn.datetime_column("end_time", min_datetime=datetime(year=2020, month=2, day=10)),
        PandasColumn.string_column("station"),
        PandasColumn.exists("amount_paid"),
        PandasColumn.boolean_column("was_member"),
    ],
)

Once our custom data type is defined, we can use it as the type declaration for the inputs / outputs of our solid:

@solid(output_defs=[OutputDefinition(name="trip_dataframe", dagster_type=TripDataFrame)])
def load_trip_dataframe() -> DataFrame:
    return read_csv(
        script_relative_path("./ebike_trips.csv"),
        parse_dates=["start_time", "end_time"],
        date_parser=lambda x: datetime.strptime(x, "%Y-%m-%d %H:%M:%S.%f"),
    )

By passing in these PandasColumn objects, we are expressing the schema and constraints we expect our dataframes to follow when Dagster performs type checks for our solids. Moreover, if we go to the solid viewer, we can follow our schema documented in Dagit:

tutorial2.png

Dagster DataFrame Level Validation#

Now that we have a custom dataframe type that performs schema validation during a pipeline run, we can express dataframe level constraints (e.g number of rows, or columns).

To do this, we provide a list of dataframe constraints to create_dagster_pandas_dataframe_type; for example, using RowCountConstraint. More information on the available constraints can be found in the dagster_pandas API docs.

This looks like:

ShapeConstrainedTripDataFrame = create_dagster_pandas_dataframe_type(
    name="ShapeConstrainedTripDataFrame", dataframe_constraints=[RowCountConstraint(4)]
)

If we rerun the above example with this dataframe, nothing should change. However, if we pass in 100 to the row count constraint, we can watch our pipeline fail that type check.

Dagster DataFrame Summary Statistics#

Aside from constraint validation, create_dagster_pandas_dataframe_type also takes in a summary statistics function that emits EventMetadataEntry objects which are surfaced during pipeline runs. Since data systems seldom control the quality of the data they receive, it becomes important to monitor data as it flows through your systems. In complex pipelines, this can help debug and monitor data drift over time. Let's illustrate how this works in our example:

def compute_trip_dataframe_summary_statistics(dataframe):
    return {
        "min_start_time": min(dataframe["start_time"]).strftime("%Y-%m-%d"),
        "max_end_time": max(dataframe["end_time"]).strftime("%Y-%m-%d"),
        "num_unique_bikes": str(dataframe["bike_id"].nunique()),
        "n_rows": len(dataframe),
        "columns": str(dataframe.columns),
    }


SummaryStatsTripDataFrame = create_dagster_pandas_dataframe_type(
    name="SummaryStatsTripDataFrame", event_metadata_fn=compute_trip_dataframe_summary_statistics
)

Now if we run this pipeline in the Dagit playground:

tutorial1.png

Dagster DataFrame Custom Validation#

PandasColumn is user-pluggable with custom constraints. They can be constructed directly and passed a list of ColumnConstraint objects.

To tie this back to our example, let's say that we want to validate that the amount paid for a e-bike must be in 5 dollar increments because that is the price per mile rounded up. As a result, let's implement a DivisibleByFiveConstraint. To do this, all it needs is a markdown_description for Dagit which accepts and renders markdown syntax, an error_description for error logs, and a validation method which throws a ColumnConstraintViolationException if a row fails validation. This would look like the following:

class DivisibleByFiveConstraint(ColumnConstraint):
    def __init__(self):
        message = "Value must be divisible by 5"
        super(DivisibleByFiveConstraint, self).__init__(
            error_description=message, markdown_description=message
        )

    def validate(self, dataframe, column_name):
        rows_with_unexpected_buckets = dataframe[dataframe[column_name].apply(lambda x: x % 5 != 0)]
        if not rows_with_unexpected_buckets.empty:
            raise ColumnConstraintViolationException(
                constraint_name=self.name,
                constraint_description=self.error_description,
                column_name=column_name,
                offending_rows=rows_with_unexpected_buckets,
            )


CustomTripDataFrame = create_dagster_pandas_dataframe_type(
    name="CustomTripDataFrame",
    columns=[
        PandasColumn(
            "amount_paid",
            constraints=[ColumnDTypeInSetConstraint({"int64"}), DivisibleByFiveConstraint()],
        )
    ],
)