For nearly two years, Airflow has been an integral piece of Curology’s data stack. Along the way we have adopted patterns for Airflow development that have greatly accelerated our workflow. In this article, we would like to share some of the lessons from our journey developing with and operating Airflow at Curology.
How we use Airflow
Before we dive in, it may be helpful to contextualize Airflow’s part in the Curology data story. When Airflow was introduced to our stack, there were several third-party services which essentially provided some form of “ETL-as-a-Service.” These provided ingress for key data like transactions or ad campaign metadata. As our needs grew, these plug-and-play ETL services started to fall short of our requirements, and we moved towards bringing these pipelines in-house.
Another reason to move pipelines in-house is reliability, both of the data's integrity and of the pipeline itself. SLAs tend to be more attainable than with off-the-shelf solutions, because faulty pipelines and downtime are directly actionable when your own team owns them. And so, as our stack has evolved, Airflow has become increasingly important in this narrative, replacing ETL-as-a-Service providers.
In a nutshell, Airflow serves as a primary ingress plane for our most important data, pulling in data from vendors and persisting that data in our data warehousing systems. It’s important to note that Airflow is just responsible for moving the data into our warehouse or data lake and not much more: this is where we’ve found Airflow to really shine, especially in the context of an “ELT” paradigm.
As we have moved pipelines in-house we have begun to establish a standardized methodology for authoring these sorts of pipelines. Many of our pipelines are essentially connectors that talk to a vendor’s REST-like API. Such an integration is not particularly challenging, especially in the lingua franca of Airflow, Python.
However, when we set out to build these integrations, the first few passes were awkward and did not lend themselves well to modularity. To remedy this, we looked to Airflow’s plugin system.
Admittedly this was no silver bullet and the initial pass was equally messy. Our first attempt lumped everything into a monolithic plugin which enabled some code re-use but made future additions unnecessarily coupled and difficult.
Better plugins or “connectors”
As projects progressed, we eventually took a step back and thought more tactically about how we could leverage the Airflow framework effectively. We stumbled upon some prior art made available by Astronomer and, feeling inspired, refactored our monolith into platform-oriented “connectors”: Airflow plugins that encapsulate the logic required to interact with a vendor’s API.
Each connector is oriented around a particular vendor or platform; for example, we have a Facebook connector. Connectors are generally composed of at least two things:
- A hook which encapsulates the logic for interacting with the platform’s API,
- And one or more operators which map to a particular task in our DAGs.
Hooks tend to be as lightweight as possible: they are just a generic interface for talking to an API, after all. Operators, by contrast, tend to be more involved and less generic, encapsulating the specialized business logic of a particular task.
To illustrate this, let’s look at a simplified version of a Facebook connector. Our plugins directory might look like this:

    plugins/facebook_plugin/
    ├── __init__.py
    ├── hooks
    │   ├── __init__.py
    │   └── facebook_graph_hook.py
    └── operators
        ├── __init__.py
        └── facebook_campaigns_to_s3_operator.py

Here we have a hook which manages communication with Facebook’s Graph API and a single operator which is responsible for persisting Facebook ad campaign metadata in S3. We can imagine what these implementations could look like. In order to do something useful with them we’ll need to pull them into a DAG of some kind. For instance:

    from airflow.operators.facebook_plugin import (
        FacebookCampaignHourlyStatsToS3Operator,
        FacebookCampaignsToS3Operator,
    )

    ...

    with dag:
        extract_campaigns = FacebookCampaignsToS3Operator(
            task_id="extract-campaigns",
            ...
        )

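To make the division of labor between a hook and an operator more concrete, here is a minimal sketch. It is not the actual connector code: the classes are plain Python stand-ins for `BaseHook` and `BaseOperator` subclasses so that the example runs without Airflow installed. The class name `FacebookGraphHook` is inferred from the file name above, and the injected `transport` and `write` callables, the `"data"` response key, the `act_123` account id, and the S3 key layout are all assumptions made for illustration.

```python
import json
from typing import Any, Callable, Dict, List

# Hypothetical transport signature: (path, params) -> decoded JSON response.
Transport = Callable[[str, Dict[str, Any]], Dict[str, Any]]


class FacebookGraphHook:
    """Generic API access only. Stand-in for a BaseHook subclass."""

    def __init__(self, transport: Transport) -> None:
        self.transport = transport

    def get(self, path: str, **params: Any) -> List[Dict[str, Any]]:
        # Assumption: list responses carry their records under a "data" key.
        response = self.transport(path, params)
        return response.get("data", [])


class FacebookCampaignsToS3Operator:
    """Task-specific logic. Stand-in for a BaseOperator subclass."""

    def __init__(
        self,
        task_id: str,
        hook: FacebookGraphHook,
        account_id: str,
        write: Callable[[str, str], None],
    ) -> None:
        self.task_id = task_id
        self.hook = hook
        self.account_id = account_id
        self.write = write  # e.g. a function that uploads a string to S3

    def execute(self, context: Any = None) -> int:
        campaigns = self.hook.get(
            f"{self.account_id}/campaigns", fields="id,name,status"
        )
        # Serialize as newline-delimited JSON, one campaign per line.
        body = "\n".join(json.dumps(c, sort_keys=True) for c in campaigns)
        self.write(f"facebook/campaigns/{self.account_id}.jsonl", body)
        return len(campaigns)


# Usage with in-memory fakes in place of the network and S3.
def fake_transport(path: str, params: Dict[str, Any]) -> Dict[str, Any]:
    return {"data": [{"id": "1", "name": "spring-launch", "status": "ACTIVE"}]}


written: Dict[str, str] = {}
operator = FacebookCampaignsToS3Operator(
    task_id="extract-campaigns",
    hook=FacebookGraphHook(fake_transport),
    account_id="act_123",
    write=written.__setitem__,
)
count = operator.execute()
```

Passing the transport and the writer in as arguments is a choice made for this sketch only. It keeps the hook generic and lets the operator be exercised without touching a real API or bucket.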
Another benefit of authoring plugins in this way is testability.
One challenge of Airflow development is knowing that a particular DAG does what you expect it to do. While Airflow itself doesn’t necessarily offer many out-of-the-box solutions to this, Python provides exceptional facilities for automated testing. However, applying these techniques is not always as straightforward as it might be with other frameworks.
The plugin pattern described above lends itself to fairly standard unit-style testing, with some caveats. For instance, we provide tests for all hooks and operators. We use pytest and pytest-vcr in concert to allow us to create tests against real API responses with reproducible results. The caveat here is that these tests require a fair amount of deep mocking via unittest.mock, which can be opaque and difficult to reason about.
This is a simplified look at how our tests are laid out relative to our connectors:

    plugins/facebook_plugin/
    ├── __init__.py
    ├── hooks
    │   ├── __init__.py
    │   ├── cassettes
    │   │   ├── test_facebook_graph_hook_get.yaml
    │   │   └── test_facebook_graph_hook_post.yaml
    │   ├── facebook_graph_hook.py
    │   └── test_facebook_graph_hook.py
    └── operators
        ├── __init__.py
        ├── cassettes
        │   └── test_facebook_campaigns_to_s3_operator.yaml
        ├── facebook_campaigns_to_s3_operator.py
        └── test_facebook_campaigns_to_s3_operator.py

Cassettes are provided by the pytest-vcr package and contain re-playable requests and responses which tests consume.
The goal with this style of test is to ensure that the critical codepath is being exercised and that the data we retrieve from invoking API methods or task execute methods meets an expected shape. These tests aren’t a replacement for on-line data integrity checks (which are also a part of DAG authorship), but they are useful sanity checks.
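A shape-oriented test of this kind might look like the sketch below. To keep it runnable with only the standard library, it swaps the recorded pytest-vcr cassette for a canned response on a `unittest.mock.Mock` hook, so it illustrates the mocking half of the approach rather than cassette replay. `CampaignsOperator`, `EXPECTED_KEYS`, and the sample record are hypothetical names and data invented for the example.

```python
from unittest import mock

# Hypothetical expected shape of a single campaign record.
EXPECTED_KEYS = {"id", "name", "status"}


class CampaignsOperator:
    """Minimal stand-in for an operator that reads records from a hook."""

    def __init__(self, hook):
        self.hook = hook

    def execute(self, context=None):
        return list(self.hook.get("campaigns"))


def test_execute_returns_expected_shape():
    # Replace the hook with a mock so that no network call is made.
    hook = mock.Mock()
    hook.get.return_value = [
        {"id": "1", "name": "spring-launch", "status": "ACTIVE"}
    ]

    rows = CampaignsOperator(hook).execute()

    # The critical codepath was exercised exactly once...
    hook.get.assert_called_once_with("campaigns")
    # ...and every record has the shape we expect.
    assert rows, "expected at least one record"
    assert all(set(row) == EXPECTED_KEYS for row in rows)
```

pytest would collect `test_execute_returns_expected_shape` by its name; the function can also be called directly.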
One goal of moving away from ETL vendors has been to increase our confidence around the quality of the data we bring into our systems. To do so, we’ve established a small library of conventions that make on-line assertions about how data should look. These assertions can happen as the data is coming off the wire or after it’s landed in our warehouse or data lake.
A major focus for future work will be around building out a better test harness for integration-style tests, which are currently a relative blind spot. In order to enable this, we’ve begun work on a system for easily standing up self-contained environments. This will form the basis of end-to-end testing in the near future.
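As a rough idea of what an assertion made "as the data is coming off the wire" could look like, consider the sketch below. It is an assumption about the general technique, not a description of the library mentioned above: `DataIntegrityError`, `checked`, and `CAMPAIGN_SHAPE` are hypothetical names, and the check is limited to required fields and their types.

```python
from typing import Any, Dict, Iterable, Iterator, Mapping


class DataIntegrityError(ValueError):
    """Raised when a record does not look the way we expect."""


def checked(
    records: Iterable[Dict[str, Any]], required: Mapping[str, type]
) -> Iterator[Dict[str, Any]]:
    """Yield records unchanged, failing fast on the first malformed one."""
    for index, record in enumerate(records):
        for field, expected_type in required.items():
            if field not in record:
                raise DataIntegrityError(
                    f"record {index}: missing field {field!r}"
                )
            if not isinstance(record[field], expected_type):
                raise DataIntegrityError(
                    f"record {index}: {field!r} should be "
                    f"{expected_type.__name__}, "
                    f"got {type(record[field]).__name__}"
                )
        yield record


# Hypothetical shape for a campaign record: field name -> required type.
CAMPAIGN_SHAPE = {"id": str, "spend": float}

rows = list(checked([{"id": "1", "spend": 12.5}], CAMPAIGN_SHAPE))
```

Because `checked` is a generator, it can wrap a stream of API results inside a task and raise before any malformed record is persisted.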
The road ahead
Airflow continues to be an important layer of our data stack. That said, Airflow is a complex tool with many features and tunable parameters. What the Curology Platform Team has discovered is that by adopting some key patterns we are able to use Airflow far more effectively than in some of our earlier attempts with the framework.
We are continuing to evolve our understanding of effective Airflow patterns. In particular, as we’ve moved to an S3-based data lake, we’ve discovered opportunities to further simplify our connectors. We are also exploring patterns for authoring Machine Learning workflows via Docker and Kubernetes. No doubt there will be additional discoveries along the way.
Stay tuned, more to come soon.
The Platform Team is looking for passionate data engineers to help build the data stack that empowers our internal business partners—everything from marketing automation to statistical modeling, ML, and NLP rests on the infrastructure our team is building. Please get in touch if this interests you!