Managing dependencies between data pipelines in Apache Airflow & Prefect

If you have ever built data pipelines for co-dependent business processes, you may have noticed that cramming all of your company’s business logic into one single workflow does not work well and quickly turns into a maintenance nightmare. Many workflow scheduling systems let us manage dependencies within a single data pipeline, but they don’t support us in managing dependencies between workflows.

A natural way of resolving this problem is to split a large pipeline into many smaller ones and coordinate the dependencies between them in some parent-child relationship. There are many possible ways of doing this, and I want to share one simple approach that has worked well for me. I hope it helps you manage the dependencies between your own data pipelines.

How the Airflow community tried to tackle this problem

In the book about Apache Airflow [1], written by two data engineers from GoDataDriven, there is a chapter on managing dependencies. This is how the authors summarize the issue:

“Airflow manages dependencies between tasks within one single DAG, however it does not provide a mechanism for inter-DAG dependencies.”

SubDags

The general problem of organizing co-dependent data pipelines is well known, and one way the community tried to address it was with the SubDag abstraction. Organizing workflows into SubDags initially seemed like a great idea, but it quickly led to several problems:

  • a SubDag appears as a single node in the graph of the parent data pipeline, even though the child DAG may consist of many tasks
  • this means that the tasks of the child DAG run sequentially, one at a time, potentially leading to deadlocks in the scheduling process.

Here is another quote [2] that nicely summarizes the discussion on the usefulness of SubDags:

“Astronomer highly recommends staying away from SubDags.”

ExternalTaskSensor

Another idea was to use a sensor within the parent DAG that regularly pokes for the execution state of the child DAG. This seems like a better idea than SubDags. However, it only works if the schedules of the parent DAG and the child DAG are aligned, which is a very strong assumption that in my case was almost never true. Imagine what happens when you manually trigger the parent DAG: the sensor will poke forever (until it times out), because no child DAG run exists for that execution date.
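For illustration, here is a minimal sketch of this pattern. The DAG ids are hypothetical, and the import path assumes Airflow 1.10-style packages:

```python
from datetime import datetime

from airflow import DAG
from airflow.sensors.external_task_sensor import ExternalTaskSensor

# Hypothetical parent DAG: the sensor only succeeds if the child DAG has a
# run for the *same* execution date (optionally shifted via execution_delta),
# i.e. the two schedules must be aligned.
with DAG(
    dag_id="parent_dag",
    start_date=datetime(2021, 1, 1),
    schedule_interval="@daily",
) as dag:
    wait_for_child = ExternalTaskSensor(
        task_id="wait_for_child_dag",
        external_dag_id="child_dag",
        external_task_id=None,  # None = wait for the whole child DAG run
        timeout=60 * 60,        # give up after one hour
    )
```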

TriggerDagRunOperator

Another way of looking at it would be to trigger the child DAG from the parent DAG. I like this idea much more, because:

  • there is no risk of generating deadlocks
  • it will also run when the pipeline is triggered out of schedule, e.g. through a manual trigger.

When I tried it in Airflow, though, I found that the current implementation of TriggerDagRunOperator works in a fire-and-forget way. This means that the parent DAG doesn’t wait until the triggered child DAG has finished before starting the next task! This is not what we want if we wish to manage dependencies between data pipelines.

To illustrate this, let’s look at the following pipeline:

Image by author

If we use TriggerDagRunOperator for the business logic layer in the parent DAG, it will trigger a DagRun for this child DAG, but then it will immediately start the data mart task without waiting until all bus_logic_ETL_x tasks in the child DAG have finished.
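A minimal sketch of this fire-and-forget behaviour (hypothetical DAG and task names, Airflow 1.10-style imports; data_mart is just a placeholder task here):

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.operators.dummy_operator import DummyOperator

with DAG(
    dag_id="parent_dag",
    start_date=datetime(2021, 1, 1),
    schedule_interval="@daily",
) as dag:
    # Fires a DagRun for the child DAG and is immediately marked successful,
    # so data_mart starts right away instead of waiting for the child DAG.
    trigger_bus_logic = TriggerDagRunOperator(
        task_id="trigger_business_logic_layer",
        trigger_dag_id="business_logic_layer",
    )
    data_mart = DummyOperator(task_id="data_mart")

    trigger_bus_logic >> data_mart
```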

The way I solved it in Airflow

My solution was to:

  • add a dummy task called finish at the end of each child DAG
  • implement a WaitForCompletion sensor, which checks the state of the last DagRun of the child DAG in the Airflow metadata DB. We identify this last DagRun of the child DAG as follows: its run_id must start with "trig__", because every DagRun triggered by an external DAG is named this way. Then we sort those DagRuns by execution date in descending order and use LIMIT 1 to get the most recent one → this is exactly the DagRun we want to poke for.
  • within this sensor, we poke for the state of the finish task within this DagRun. If it is success, the sensor succeeds and the parent pipeline can move on to the data mart task. As long as it is not a success, the child DAG is either still running or has failed; in both cases, we cannot move on to the data mart task.

This is the implementation of the sensor: link to Github gist
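The gist is not reproduced here, but a minimal sketch of a sensor along these lines (assuming Airflow 1.10-style imports and a child task named finish; the actual implementation may differ in detail) could look like this:

```python
from airflow.models import DagRun, TaskInstance
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.db import provide_session
from airflow.utils.decorators import apply_defaults
from airflow.utils.state import State


class WaitForCompletion(BaseSensorOperator):
    """Pokes the Airflow metadata DB until the finish task of the most
    recently triggered DagRun of external_dag_id has succeeded."""

    @apply_defaults
    def __init__(self, external_dag_id, external_task_id="finish", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.external_dag_id = external_dag_id
        self.external_task_id = external_task_id

    @provide_session
    def poke(self, context, session=None):
        # Last DagRun of the child DAG that was triggered externally
        # (the run_id of such runs starts with "trig__").
        last_run = (
            session.query(DagRun)
            .filter(
                DagRun.dag_id == self.external_dag_id,
                DagRun.run_id.like("trig__%"),
            )
            .order_by(DagRun.execution_date.desc())
            .first()
        )
        if last_run is None:
            return False

        # State of the finish task within that DagRun.
        ti = (
            session.query(TaskInstance)
            .filter(
                TaskInstance.dag_id == self.external_dag_id,
                TaskInstance.task_id == self.external_task_id,
                TaskInstance.execution_date == last_run.execution_date,
            )
            .first()
        )
        return ti is not None and ti.state == State.SUCCESS
```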

And this is how I used this sensor in the parent DAG:
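A hypothetical sketch of that wiring (the DAG and task names are made up, and data_mart is just a placeholder task):

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.operators.dummy_operator import DummyOperator

# The sensor sketched above; the module path is hypothetical.
from wait_for_completion import WaitForCompletion

with DAG(
    dag_id="parent_dag",
    start_date=datetime(2021, 1, 1),
    schedule_interval="@daily",
) as dag:
    trigger_bus_logic = TriggerDagRunOperator(
        task_id="trigger_business_logic_layer",
        trigger_dag_id="business_logic_layer",
    )
    # Blocks the parent DAG until the child DAG's finish task has succeeded.
    wait_for_bus_logic = WaitForCompletion(
        task_id="wait_for_business_logic_layer",
        external_dag_id="business_logic_layer",
        poke_interval=60,
    )
    data_mart = DummyOperator(task_id="data_mart")

    trigger_bus_logic >> wait_for_bus_logic >> data_mart
```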

How Prefect approached this problem

Prefect includes many useful abstractions that work out of the box. One of them is FlowRunTask, which has a parameter wait=True that achieves the same effect as my WaitForCompletion sensor in Airflow, without me having to implement any sensor performing periodic database lookups. This will:

  • trigger the child-flow
  • wait for the completion of the child flow before moving on to the next task.

Master flow (i.e. Parent flow)

The entire implementation of the parent flow is as simple as this:
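A minimal sketch of such a parent flow, assuming Prefect Core (the 0.x/1.x API) and child flows registered in a project called "ETL" (the project name is just an assumption here):

```python
from prefect import Flow
from prefect.tasks.prefect import FlowRunTask

# Each FlowRunTask triggers a registered child flow; with wait=True it also
# blocks until that flow run reaches a finished state.
staging_area = FlowRunTask(flow_name="staging_area", project_name="ETL", wait=True)
business_logic = FlowRunTask(flow_name="business_logic_layer", project_name="ETL", wait=True)
data_mart = FlowRunTask(flow_name="data_mart", project_name="ETL", wait=True)

with Flow("MasterFlow") as flow:
    staging = staging_area()
    bus_logic = business_logic(upstream_tasks=[staging])
    data_mart(upstream_tasks=[bus_logic])

if __name__ == "__main__":
    flow.register(project_name="ETL")
```

The upstream_tasks arguments are what enforce the staging_area → business_logic_layer → data_mart ordering.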

If we visualize this flow by running flow.visualize(), we can see that the child flows are even rendered differently, which makes it easy to distinguish a normal task from a flow triggered by FlowRunTask (or from other task types, such as mapped tasks).

Child flows

I also created some very basic child flows so that we can run a full example and see the output in the UI. In the scripts below, you can see three child flows, each with three tasks (a simplified sketch of one of them follows the list):

  • staging_area flow
  • business_logic_layer flow
  • data_mart flow
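For example, a simplified, hypothetical version of the business_logic_layer flow could look like the following; the real tasks would of course contain the actual ETL logic, and the other two child flows look analogous:

```python
from prefect import task, Flow

@task
def bus_logic_etl_1():
    print("running bus_logic_ETL_1")

@task
def bus_logic_etl_2():
    print("running bus_logic_ETL_2")

@task
def bus_logic_etl_3():
    print("running bus_logic_ETL_3")

with Flow("business_logic_layer") as flow:
    bus_logic_etl_1()
    bus_logic_etl_2()
    bus_logic_etl_3()

if __name__ == "__main__":
    # Register the child flow so that MasterFlow can trigger it by name.
    flow.register(project_name="ETL")
```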

Let’s now run the full example by registering the parent flow (MasterFlow) and the three child flows and confirm that the dependencies between flows work as expected.

Prefect Cloud UI: Child flows triggered by Parent Flow (MasterFlow) in the order that we specified — Image by author
Prefect Cloud UI: Parent Flow (MasterFlow) finished last as expected because it waited for the completion of child flows— Image by author

We can confirm that the dependencies have been respected and data mart started running only after both staging area and business logic layer flows were completed.

If something were to fail, we could easily navigate to the respective child flow to inspect the logs:

Prefect Cloud UI: Inspecting the logs from the child flow — Image by author

How to schedule co-dependent data pipelines?

In order to properly structure and schedule those parent-child dependencies, we can use “master” data pipelines (one could also call them “parent” or “coordinating” data pipelines), as shown in the example above, and schedule only those parent pipelines. Since the child flows are triggered directly from the parent flow, they don’t need to be scheduled themselves.

Let’s show this with a concrete example. Assume that the child flows currently have the following execution times:

  • staging_area: 2.5 hours
  • business_logic: 1.5 hours
  • data_mart: 2 hours.

This means that if we schedule the MasterFlow to run at 2 AM, it will start by running staging_area at 2 AM, business_logic will start at around 4:30 AM, data_mart will start at around 6 AM, and the entire ETL will be finished by around 8 AM.

To implement this, we simply add a schedule to the MasterFlow as follows:
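A sketch of the change, assuming Prefect’s CronSchedule (the rest of the flow stays the same as in the earlier MasterFlow sketch):

```python
from prefect import Flow
from prefect.schedules import CronSchedule

# Kick off the MasterFlow every day at 2 AM (interpreted in UTC by default).
schedule = CronSchedule("0 2 * * *")

with Flow("MasterFlow", schedule=schedule) as flow:
    ...  # same FlowRunTask calls as in the earlier MasterFlow sketch
```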

Note: this schedule works in UTC. To use a time-zone-specific schedule, use:
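One way of doing this, assuming pendulum is available (the timezone below is just an example), is to pass a timezone-aware start_date to the CronSchedule:

```python
import pendulum
from prefect.schedules import CronSchedule

# With a timezone-aware start_date, the cron expression is interpreted in
# that timezone (including DST transitions) instead of UTC.
schedule = CronSchedule(
    "0 2 * * *",
    start_date=pendulum.datetime(2021, 1, 1, tz="Europe/Berlin"),
)
```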

What are the benefits of this MasterFlow approach?

Let’s say you want to modify the business logic layer and add one more task to it: with this approach, you can make your modifications without any impact on the staging area and data mart workflows. You don’t need to reschedule anything, and you won’t get stuck in any deadlocks.

Which implementation is easier from the user's perspective?

Overall, I found the Prefect implementation easier, as I didn’t have to write any custom sensor logic to reflect the dependencies between data pipelines: it works out of the box. It shows a lot of foresight in the design of these abstractions.

The Prefect community has also been discussing sub-flows in this GitHub issue [3]; follow along if you are interested.

Conclusion

In this article, we looked at how to manage dependencies between data pipelines. We saw how this problem has historically been approached in Airflow.

Then, I presented a simple solution to this problem, based on one principle: create a master data pipeline that schedules and triggers the child workflows, and make sure that, after triggering a child workflow, the master pipeline waits for its completion before triggering the next one.

The “trigger + wait” paradigm seems much safer than aligning co-dependent pipelines on their schedules, and it lets us manage the dependencies between workflows in a simple way.

Thank you for reading!

References:

[1] “Data Pipelines with Apache Airflow” — Bas P. Harenslak and Julian Rutger de Ruiter

[2] Astronomer blog: https://www.astronomer.io/guides/subdags/

[3] Prefect community about sub-flows: https://github.com/PrefectHQ/prefect/issues/1745

