The essential role of automated tests in data pipelines
Advanced pytest features for modern data practitioners
In the old days of data engineering, we were far from “normal” software engineers. We were using SSIS or Pentaho, and things like version control or testing weren’t part of our toolbox. Luckily, things have changed, and we have incorporated a lot of concepts from software engineering.
If there is something we struggle with as engineers, it is testing. We know it is important to write unit and integration tests, think about test cases, and improve coverage, but in the end we do nothing (or just the bare minimum).
In the following lines, I will discuss why having automated tests for our data pipelines is crucial, what is different about testing data, and present an example with pytest, highlighting valuable features that could simplify our testing process.
Early in my career, as an on-call data engineer at Lyft, I accidentally introduced a breaking code change while attempting to ship a hotfix at 4 AM to a SQL job that computed tables for core business analytics. A seemingly small change in filtering logic ended up corrupting data for all downstream pipelines and breaking dashboards for the entire company.
- Source
We can relate to Gleb’s story (I can, at least). At some point in our careers, we did something similar. The change seemed easy; we knew the pipeline, and the ad-hoc queries we ran to test our idea were okay, but somehow we didn’t consider an edge case that broke something.
Everything would have been easier if we had something to verify our changes, tell us whether they affect downstream processes, and warn us. It’s called testing.
What’s the problem with testing?
Testing data is way more complicated than testing “normal” code. We must deal with schema changes, missing columns, null values, duplicates, values out of range, and the code orchestrating extraction, transformation, and loading. And I didn’t even mention the state of the data assets we interact with, which could also have inconsistencies and missing data.
Does that mean we don’t need to build tests? No. We face considerable disadvantages when we don’t have automated tests:
It’s harder to determine the impact of our changes. As we read in Gleb’s story, small and seemingly innocent changes can corrupt valuable downstream data assets.
Bugs can remain undetected for some time. Sometimes the bug only applies to a subset of the data, or it’s hard to detect when looking at aggregated data, which makes the problem worse. The longer it takes to detect a bug, the harder it is to fix the consequences. In a talk at DATA+AI Summit 2022, Jacqueline Bilston from Yelp mentioned that it took five days to find a critical bug and three days to fix it.
Errors have a high cost. If we introduce a bug in a data pipeline, rolling back the change and fixing the problem is usually expensive. We spend valuable time analyzing the root cause, the wrong data propagates to multiple data assets, and backfills are painful (even when pipelines are well-designed).
If we are now convinced to start testing our data pipelines, we will need to face these challenges that are unique to data testing:
Data in testing environments is way different from the data in production. Not only in velocity (which may not matter if you’re not testing a streaming pipeline) or volume, but in variety. We can have a subset of production data, but it will never be identical; there will be nuances that aren’t captured. In addition, since we use that environment to test, there is a high chance the data is corrupted or incomplete.
Setting up (and maintaining) a proper testing environment is costly. We might need to create a separate cluster, copy a subset of production data (or create fake data), run scripts to generate the state we want, and think about the cases we want to test. Moving data in the cloud can be expensive, so don’t take this lightly.
We have tools that can help with those challenges. We can create a “fork” of our production data using lakeFS or Nessie (if using data lakes) or create fake data using Faker or Mockaroo (if you prefer a UI), but we still need to think about the test cases, build the infrastructure around the testing environment, and understand (and code) the nuances that could happen during pipeline execution.
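For instance, here is a minimal sketch of how fake product reviews could be generated with Faker. The field names mirror the review example shown later in this post; they are only one possible shape, not something Faker imposes.
from faker import Faker

fake = Faker()

def fake_review():
    # One fake product review; the shape mirrors the example used later in the post
    return {
        "user_id": fake.bothify(text="U#####"),
        "product_id": fake.bothify(text="P#####"),
        "product_name": fake.catch_phrase(),
        "title": fake.sentence(nb_words=5),
        "review": fake.paragraph(),
        "stars": fake.random_int(min=1, max=5),
        "timestamp": fake.iso8601(),
        "helpful": [],
        "reports": [],
    }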
Solving the problem with testing
No, the title isn’t an error. I presented the challenges, but the disadvantages are still there. We still need to test our pipelines, so the solution to the “testing problem” is testing (I’m sorry).
There are different types of testing in the context of data pipelines, which I combined from software engineering and ideas from this article:
Unit tests: Validate each logical unit or function that is part of the ETL process. If the pipeline consists of a group of transformations, those can be tested separately using a set of input values and expected outputs.
Contract tests: Applicable for assets consumed in downstream processes. I already presented some concepts in my previous post, but the idea is to test the items from the contract: schema, semantics, references, distribution, and freshness (SLAs).
Data quality tests: Audit the data stored in a data asset to check for accuracy, consistency, completeness, uniqueness, and timeliness (a short sketch of what this can look like follows this list).
Integration tests: Verify that the flow between different data assets is correct and that there are no communication, interaction, or compatibility problems.
Performance tests: Assess the resource utilization and scalability of the pipeline. This is crucial for high-volume data pipelines to meet the required SLAs.
End-to-end tests: Test the data pipeline as a whole, from the source to the target or output. This could be categorized as “black box” testing because there is no need to know the internal structure of the pipeline, but only the expected output given the input.
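To make the data quality idea concrete, here is a minimal sketch of such a test with pytest, assuming a hypothetical fetch_loaded_reviews() helper that reads the loaded reviews back from storage:
def test_loaded_reviews_quality():
    reviews = fetch_loaded_reviews()  # hypothetical helper, not defined in this post

    # Uniqueness: one review per (user, product) pair
    keys = [(r["user_id"], r["product_id"]) for r in reviews]
    assert len(keys) == len(set(keys))

    # Accuracy: star ratings stay within the allowed range
    assert all(1 <= r["stars"] <= 5 for r in reviews)

    # Completeness: every review has a timestamp
    assert all(r.get("timestamp") for r in reviews)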
There are many options, and each one has its challenges. For the rest of the post, I will focus on end-to-end testing with pytest, highlighting some features that will be useful in our journey of testing data pipelines.
Data pipeline example
We will use the following data pipeline: a simple ETL for product reviews.
API: REST service providing customer reviews.
Extract: HTTP client that extracts the reviews using a timestamp filter (reviews created from that timestamp).
Transformation: Flatten the structure to aggregate the engagements of the review (other users marked them as helpful or reported it), remove engagements from “trolls”, and create the entities to populate the data assets.
Load: Update the engagement database (relational DB) and insert the product reviews into object storage.
Here is an example of a product review:
{
    "user_id": "U34567",
    "product_id": "P22334",
    "product_name": "SmartCool Air Conditioner",
    "title": "Confusing controls but works well",
    "review": "It's hard to figure out the controls initially. But once you get the hang of it, it cools the room quickly.",
    "stars": 3,
    "timestamp": "2023-08-24T09:15:00Z",
    "helpful": [
        {"user_id": "U67890", "timestamp": "2023-08-24T11:10:00Z"},
        {"user_id": "U11111", "timestamp": "2023-08-24T12:05:00Z"}
    ],
    "reports": [
        {"user_id": "U23456", "timestamp": "2023-08-24T13:00:00Z"},
        {"user_id": "U22222", "timestamp": "2023-08-24T14:30:00Z"},
        {"user_id": "U88888", "timestamp": "2023-08-24T18:45:00Z"}
    ]
}
By the way, I didn’t write the example. I used the following prompt with GPT-4:
I'm preparing an ETL that consists of the analysis of product reviews. The reviews are provided in a REST API that includes "user_id", "product_id", "product_name", "title", "review", "stars", "timestamp", and two lists: "helpful" and "reports". The two lists contain the "user_id" and the user's "timestamp" engaging with the review as helpful or the report.
The reviews can have zero helpful marks and zero reports, but some will have up to 5 of them (usually, all are in the helpful or report side, not both).
Can you mock 4 product reviews with different scenarios? The "user_id" in the "helpful" and "report" lists must be other than the ones that made the review.
Here is a simple implementation in Python (yes, I asked ChatGPT to generate it):
def print_function_name(func):
    def wrapper(*args, **kwargs):
        print(f"Executing function: {func.__name__}")
        return func(*args, **kwargs)
    return wrapper


class ETLProcess:
    # Extraction related functions
    @print_function_name
    def extract_reviews(self, timestamp):
        """
        Extracts reviews using the timestamp filter.
        """
        pass

    # Transformation related functions
    @print_function_name
    def flatten_reviews(self, reviews):
        """
        Flattens the reviews data structure and
        aggregates the engagements (helpful and reports).
        """
        pass

    @print_function_name
    def identify_trolls(self, reviews):
        """
        Queries the engagement table using the user_id from
        the engagements and identifies the trolls
        in the extracted reviews.
        """
        pass

    @print_function_name
    def filter_trolls(self, reviews, trolls):
        """
        Removes engagements from trolls.
        A troll is a user who reports 5-star reviews
        or any review with more than 5 helpful marks.
        """
        pass

    @print_function_name
    def create_entities_for_data_assets(self, reviews):
        """
        Transforms reviews into entities to populate the data assets.
        """
        pass

    # Load related functions
    @print_function_name
    def update_trolls_database(self, trolls):
        """
        Updates the relational database with identified trolls.
        """
        pass

    @print_function_name
    def load_reviews_to_object_storage(self, reviews):
        """
        Inserts the reviews into an object storage.
        """
        pass

    # Main ETL process
    def run_etl(self, timestamp):
        """
        Main ETL function that runs the extraction, transformation,
        and load processes in sequence.
        """
        # Extract
        extracted_reviews = self.extract_reviews(timestamp)

        # Transform
        flat_reviews = self.flatten_reviews(extracted_reviews)
        trolls = self.identify_trolls(extracted_reviews)
        non_troll_reviews = self.filter_trolls(flat_reviews, trolls)
        entities = self.create_entities_for_data_assets(non_troll_reviews)

        # Load
        self.update_trolls_database(trolls)
        self.load_reviews_to_object_storage(entities)


etl = ETLProcess()
start_timestamp = "2023-08-01T00:00:00Z"  # Example timestamp
etl.run_etl(start_timestamp)
Testing with pytest
pytest is a popular testing framework for Python that allows us to write simple, scalable, and efficient tests. It provides a lot of features that are relevant for testing data pipelines, such as:
Test discovery. Automatically finds and runs all the tests in the project, which makes it easy to organize and maintain the test suite.
Fixtures. Functions that run before the tests and can be used to set up the test environment, create test data, and provide reusable objects.
Parameterized tests. Run the same test with different input data, which is helpful for covering the many data scenarios a pipeline has to handle.
Mocking. Technique in which certain parts of the code are replaced with fake objects during testing.
Assertions. Test the correctness of the code with plain assert statements, which pytest enriches with detailed failure output, plus helpers such as pytest.approx for approximate comparisons.
With pytest, we can write end-to-end tests that simulate the entire pipeline, from data ingestion to output.
To show the most relevant features, I will build an end-to-end test focusing on testing the transformations.
Test folder structure
This could be the folder structure:
etl_project
├── pyproject.toml
├── README.md
├── etl_project
│   ├── __init__.py
│   └── etl_process.py
└── tests
    ├── __init__.py
    ├── test_extraction.py
    ├── test_transformation.py
    └── test_load.py
We will separate each logical group into a different file. All the tests could be written in one file, but it is good practice to separate them. I recommend splitting the tests further if the transformation has several components, but a single generic transformation test file works if we only want an end-to-end test.
The test files will be discovered automatically when you run pytest. This article has a very good list of the most used command-line flags.
Fixtures
Fixtures can be used to set up the test environment, create test data or reusable objects.
Let’s set up the environment: create the connection to the test database, and add some “trolls” to have something to filter in the transformation step.
import os

import psycopg2
import pytest


@pytest.fixture(scope="session")
def environment_setup():
    # Database connection
    test_conn_url = os.environ.get("TEST_RELATIONAL_DB")
    connection = psycopg2.connect(test_conn_url)
    cursor = connection.cursor()

    # Setup code: Create table, insert rows
    cursor.execute("""
        CREATE TABLE test.engagement
        (LIKE public.engagement INCLUDING ALL);
    """)
    cursor.execute("""
        INSERT INTO test.engagement
        VALUES
            ('U22222', 10, 0, 10),
            ('U88888', 5, 1, 4);
    """)
    connection.commit()

    yield cursor  # This returns control to the test function

    # Teardown code: Cleanup after tests
    cursor.execute("DROP TABLE test.engagement;")
    connection.commit()
    cursor.close()
    connection.close()
After the setup, we return control to the test function using yield; once the tests finish, the code after it removes the table we created and closes the connection. In this case, we defined the scope as “session” so the fixture is invoked only once per test session (it requires network access and is “expensive” to create). You can read more about scope options here.
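pytest fixtures can be scoped as function (the default), class, module, package, or session. A quick sketch with hypothetical fixture names, showing the three scopes used most often:
@pytest.fixture  # default scope "function": a fresh fixture for every test
def sample_review():
    return {"stars": 5, "helpful": [], "reports": []}

@pytest.fixture(scope="module")  # one instance per test file
def api_client():
    return object()  # placeholder for an HTTP client

@pytest.fixture(scope="session")  # one instance for the whole test run
def database_connection():
    return object()  # placeholder for a database connection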
From the pytest documentation:
At a basic level, test functions request fixtures they require by declaring them as arguments.
When pytest runs a test, it looks at the parameters in that test function’s signature and then searches for fixtures that have the same names as those parameters. Once pytest finds them, it runs those fixtures, captures what they returned (if anything), and passes those objects into the test function as arguments.
If we want to create a test that will use the environment setup, we need to include the fixture name as a function parameter.
def test_transformations(environment_setup):
    # We can use the cursor here using the environment_setup variable
    cursor = environment_setup

    # Test code here
    cursor.execute("...")
Pro tip: We can automatically request a fixture by passing in autouse=True to the fixture’s decorator. This can avoid a lot of redundant requests from different test functions.
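For example, here is a sketch of the environment fixture from above marked as autouse, assuming every test in the session really needs the table to exist:
@pytest.fixture(scope="session", autouse=True)
def environment_setup():
    ...  # same setup as above (create the table, insert the trolls)
    yield  # yield the cursor here, as before
    ...  # same teardown as above (drop the table, close the connection)
Tests that still need the cursor can keep requesting the fixture by name; autouse only saves us from listing it in tests that don’t use its return value.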
We can also use fixtures to create test data (reusing the example from the previous section). This could be useful if we want to write unit tests, for example, for the flatten_reviews function.
import pytest


@pytest.fixture(scope="module")
def api_response():
    # Here we can use Faker if we want
    return [
        {
            "user_id": "U34567",
            "product_id": "P22334",
            "product_name": "SmartCool Air Conditioner",
            "title": "Confusing controls but works well",
            "review": "It's hard to figure out the controls initially. But once you get the hang of it, it cools the room quickly.",
            "stars": 3,
            "timestamp": "2023-08-24T09:15:00Z",
            "helpful": [],
            "reports": []
        }
    ]
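A sketch of a unit test consuming that fixture could then look like this. The assertions assume flatten_reviews returns one flat record per review with helpful_count and report_count fields, which is one possible design rather than something the stub above guarantees:
from etl_project.etl_process import ETLProcess

def test_flatten_reviews(api_response):
    etl = ETLProcess()
    flat = etl.flatten_reviews(api_response)

    # One flat record per input review (assumed output shape)
    assert len(flat) == 1

    # No engagements in the input, so the aggregated counters should be zero
    assert flat[0]["helpful_count"] == 0
    assert flat[0]["report_count"] == 0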
Parameterized tests
We can run the same test with different input data. It could be helpful if we want to create different states of the database in an end-to-end test or unit test one function using different input parameters.
Here is an example of how to use parametrized tests:
import pytest

from etl_project.etl_process import ETLProcess

# Sample data for testing
sample_reviews = [
    {
        "user_id": "U34567",
        "product_id": "P22334",
        "stars": 3,
        "helpful": [
            {"user_id": "U67890", "timestamp": "2023-08-24T11:10:00Z"},
            {"user_id": "U11111", "timestamp": "2023-08-24T12:05:00Z"},
        ],
        "reports": [
            {"user_id": "U23456", "timestamp": "2023-08-24T13:00:00Z"},
            {"user_id": "U22222", "timestamp": "2023-08-24T14:30:00Z"},
            {"user_id": "U88888", "timestamp": "2023-08-24T18:45:00Z"},
        ]
    },
    {
        "user_id": "U67890",
        "product_id": "P45678",
        "stars": 5,
        "timestamp": "2023-08-27T15:20:00Z",
        "helpful": [
            {"user_id": "U11111", "timestamp": "2023-08-27T19:10:00Z"},
            {"user_id": "U22222", "timestamp": "2023-08-27T20:15:00Z"},
        ],
        "reports": [
            {"user_id": "U23456", "timestamp": "2023-08-27T17:05:00Z"},
        ]
    }
]


@pytest.mark.parametrize(
    "reviews, trolls, expected_reports_length",
    [
        (sample_reviews, ["U23456"], [2, 0]),
        (sample_reviews[1:2], [], [1])  # No trolls, should return all reports
    ]
)
def test_filter_trolls(reviews, trolls, expected_reports_length):
    etl = ETLProcess()
    filtered_reviews = etl.filter_trolls(reviews, trolls)
    reports_length = [len(r["reports"]) for r in filtered_reviews]
    assert reports_length == expected_reports_length
The first argument to pytest.mark.parametrize() is a comma-delimited string of parameter names, which must also appear as parameters of the test function. The second argument is a list of tuples or single values representing the parameter value(s).
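If the parameter sets grow, we can also label each case with pytest.param and its id argument so failures are easier to read. Here is a small variant of the test above, reusing the same sample data:
@pytest.mark.parametrize(
    "reviews, trolls, expected_reports_length",
    [
        pytest.param(sample_reviews, ["U23456"], [2, 0], id="one-troll"),
        pytest.param(sample_reviews[1:2], [], [1], id="no-trolls"),
    ],
)
def test_filter_trolls_with_ids(reviews, trolls, expected_reports_length):
    etl = ETLProcess()
    filtered_reviews = etl.filter_trolls(reviews, trolls)
    reports_length = [len(r["reports"]) for r in filtered_reviews]
    assert reports_length == expected_reports_length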
Mocking
Here we replace certain parts of the code with fake objects during testing. For example, if we don’t want to test the object storage connection, we can mock the relevant function so it saves the file to a local folder instead of uploading it to the cloud.
In our case, if we want to create an end-to-end test but prevent the load_reviews_to_object_storage function from uploading the reviews to the cloud, we can do something like this:
import json

import pytest

from etl_project.etl_process import ETLProcess


# Mock function that will save reviews to disk
def mock_load_reviews_to_object_storage(self, reviews):
    with open("mock_reviews_output.json", "w") as file:
        json.dump(reviews, file, indent=4)
    print("Saved reviews to mock_reviews_output.json")


def test_run_etl(monkeypatch):
    # Use monkeypatch to temporarily use the mock function
    monkeypatch.setattr(
        ETLProcess,
        "load_reviews_to_object_storage",
        mock_load_reviews_to_object_storage,
    )

    etl = ETLProcess()
    start_timestamp = "2023-08-01T00:00:00Z"  # Example timestamp
    etl.run_etl(start_timestamp)

    # Add assertions or any other verification logic here
    with open("mock_reviews_output.json", "r") as file:
        reviews = json.load(file)
    assert isinstance(reviews, list)  # example assertion
When you run this test using pytest, the load_reviews_to_object_storage function of the ETLProcess class will be replaced with mock_load_reviews_to_object_storage only during the test’s execution. The actual function remains unaltered outside of this testing context.
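In the same spirit, we could also mock the extraction step so the end-to-end test never touches the real API. A sketch that reuses sample_reviews from the parametrized-tests example and the mock load function from above:
def test_run_etl_without_network(monkeypatch):
    # Replace the HTTP extraction with a canned response
    monkeypatch.setattr(
        ETLProcess, "extract_reviews", lambda self, timestamp: sample_reviews
    )

    # Keep the load step on local disk, as in the previous test
    monkeypatch.setattr(
        ETLProcess,
        "load_reviews_to_object_storage",
        mock_load_reviews_to_object_storage,
    )

    ETLProcess().run_etl("2023-08-01T00:00:00Z")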
Conclusion
We discussed why testing data pipelines is crucial for the modern data engineer. Testing data pipelines can be challenging, but tools like pytest can make it much easier.
End-to-end testing with pytest allows us to simulate the entire pipeline, from ingestion to loading. Using fixtures, parameterized tests, and mocking, we can create a comprehensive and efficient testing suite to ensure our data pipelines’ correctness.
It takes some time to build the tests and maintain them, but it’s worth the effort. Let’s confidently push those changes by having the right assets to ensure data quality.