Implementing data contracts: schema validation
Become a validation hero using msgspec and ChatGPT
There is something worse than a data pipeline failing. A data pipeline failing because of unexpected upstream changes.
What changed? Who changed that? What are we missing? Did we always receive the same data? We start asking ourselves many questions, running SQL queries, checking the last merged pull requests, and scrolling through the logs.
Then we find that. The data source we have always consumed is missing a column (or attribute if it’s an API). I hope I was making all this up, but it’s a true story. And happened more times than I would expect.
Nowadays, we “de-structure” data ingestion, use schema-less data storages, and deal with required columns downstream. Patterns like Write-Audit-Publish can store the information first and deal with the structure and validations later. I like all those technical solutions, having fewer constraints and fewer failures. Data isn’t always perfect.
The problem comes when we need a column or attribute to calculate an important metric, or it’s used as a crucial part of some business rule. We need structure, we have constraints, and that’s fine.
Data contracts to the rescue 🦸
I recently started reading about data contracts and how that is related to data quality.
has excellent articles and talks about the topic, and I highly recommend his Data Council talk:Data contracts are agreements between data producers and consumers about the schema, semantics, distribution, and enforced data policies — Chad Sanderson (around minute 18 in the video)
It’s an agreement, a shared expectation about how the data will be, and a vehicle to promote collaboration and communication.
I added an emphasis in the last two because if that doesn’t happen, it’s almost certain that as data consumers we will have a broken pipeline, a report with missing data, or a dashboard with wrong values on important KPIs.
From the data producer perspective, data contracts will also give us the peace of mind that changing a data asset won’t impact someone else downstream.
Data contracts are essential for data quality assurance. They will save a lot of hours of debugging, pipeline fixing, and backfilling. And most importantly, they will prevent the garbage from reaching the decision-makers, clients, partners, and all those who make an impact and are fundamental to the business.
Even Titans stumble 🥺
Indeed, data contracts are great, but what happens with third-party data when we can barely “talk” with the data producer?
Where I work, we consume a lot of third-party data. Considering only one data provider category, we have 100+ APIs, with ten vendors and four resources per API. That gives us at least 40 different schemas for that category of providers.
Those APIs provide a metadata endpoint that we use to define the structure and some validations. But there’s a problem. They usually send data that violates the metadata specification, and I can’t do anything with that (apart from sending an email they will ignore), but prepare the system to identify and fix those problems.
The schema shield 🛡️
One part of the data contracts is the schema specification. In my experience, a schema change causes many problems, workarounds, and silent failures (the worst).
Sometimes we don’t have the chance to define or enforce a contract with third-party data providers. We accept what is given, capture changes and act accordingly. Part of “acting accordingly” could mean creating a validation step, a shield that prevents downstream harm.
Part of my work is pipeline development, and I found a couple of libraries to validate an API schema. I was looking for something fast, easy to integrate, hopefully with few dependencies.
The libraries I considered were msgspec and Pydantic. I knew about pydantic
because of fastapi
and the long list of packages that use it, but I never used it directly. Both libraries provide schema validation and serialization, allowing you to define schemas using Python classes and type annotations.
I used msgspec
because it is lightweight, has no external dependencies, and their benchmarks say they are the fastest Python library (~80x faster than pydantic
). I didn’t run them, but they have the source code if you want to try.
Validating schema with msgspec
msgspec
lets you describe your schema via type annotations and will efficiently validate messages against this schema while decoding.
I will use {JSON} Placeholder to test with a publicly available API. From all the available endpoints, I selected /posts/1/comments
which returns a list of dictionaries.
Schema definition:
import msgspec
class Comment(msgspec.Struct):
postId: int
id: int
name: str
email: str
body: str
The validation is implicit during the decoding, so it’s like using json.loads
with batteries included. Here is the complete example using the specified endpoint:
import msgspec
import requests
from typing import List
class Comment(msgspec.Struct):
postId: int
id: int
name: str
email: str
body: str
def fetch_comments() -> List[Comment]:
response = requests.get("https://jsonplaceholder.typicode.com/posts/1/comments")
response.raise_for_status() # Check if the request was successful
# Decode the response
comments = msgspec.json.decode(response.content, type=List[Comment])
return comments
# Fetch and print comments
comments = fetch_comments()
for comment in comments:
print(comment.id, "-", comment.body)
Pro features 🧞
Some advanced features can be used to enhance the schema validation:
Constraints: define type-specific validations. [Reference]
Schema evolution: use different schema versions with different response structures. [Reference]
“Lax” mode: unsafe implicit conversions between types. [Reference]
Constraints
Define constraints using typing.Annotated
(introduced in Python 3.9), and adding a msgspec.Meta
annotation. Let’s create an annotation for the ids that must be positive integers:
import msgspec
from typing import Annotated
Id = Annotated[int, msgspec.Meta(gt=0)]
class Comment(msgspec.Struct):
postId: Id
id: Id
name: str
email: str
body: str
Constraints can be combined to enforce complex requirements. Here I added a simple email validation:
import msgspec
from typing import Annotated
Id = Annotated[int, msgspec.Meta(gt=0)]
Email = Annotated[
str, Meta(min_length=5, max_length=100, pattern="[^@]+@[^@]+\\.[^@]+")
]
class Comment(msgspec.Struct):
postId: Id
id: Id
name: str
email: Email
body: str
Bonus track: use ChatGPT to define the classes 🤖
I’m not going to lie, ChatGPT generated most of the code that I used. Since ChatGPT is a great tool to parse structured data and is also good at coding, I allowed it to define the structure classes showing it a sample.
Prompt:
I have the following JSON response from an API endpoint, and I need to define a set of Python classes with type annotations that could serve to validate the API response schema.
The classes must be defined following msgspec specification (similar to pydantic), which derives from "msgspec.Struct". You can use "import msgspec" on top of the Python file.
Can you define them for me?
<json example here>
Result:
Final thoughts 💡
We discussed the importance of data contracts for data quality and how difficult is to have one with third-party data providers.
We focused on one specific aspect of data contracts, schema validation, and presented a simple use case using msgspec
Python library to validate an incoming API result.
Thanks for reading! Please comment if you have any questions or want to know more about the presented concepts, and share if you find this content useful.