TaskFlow API in Apache Airflow 2.0 — Should You Use It?

TaskFlow API in Apache Airflow 2.0 — Should You Use It?

Think twice before redesigning your Airflow data pipelines

TaskFlow API is a feature that promises data sharing functionality and a simple interface for building data pipelines in Apache Airflow 2.0. It should allow the end-users to write Python code rather than Airflow code. Apart from TaskFlow, there is a TaskGroup functionality that allows a visual grouping of your data pipeline’s components. After reviewing those features, I wasn’t sure whether I should include them in the strengths or weaknesses of the new Airflow release. I ended up writing a full separate article to explain why you should think critically about your data engineering problems and whether they can be solved with those abstractions.

Note: This article is not meant as a critique of TaskFlow and TaskGroup features, but rather as a guide to use them for the suitable use cases.

1. The problem with sharing data between tasks

When designing data pipelines, we need to make a decision between:

  1. whether we want to keep the tasks atomic, i.e., using small individual components,
  2. or whether we design it as a single script.

We would then use some workflow management solution to schedule this data pipeline. With the first approach, we might end up writing a workflow with several individual tasks that may be, in the simplest form, defined as follows:

In contrast, with the second approach, we end up treating the whole ETL as a single task that can be scheduled by executing the workflow as a Bash command (ideally in an isolated container environment): python my_etl.py.

You may notice that with the second approach, you have no visibility into the actual components of a workflow, and if it fails, you don’t immediately know what was the root cause of the problem until you dive deeper into the logs. In contrast, to implement the first approach, you must be able to pass data between tasks in some way, either directly or by dumping it to some temporary location such as S3.

So far, Airflow was encouraging the second approach since sharing data between tasks using XComs required extra work. With the help of TaskFlow, Airflow 2.0 offers a feature that abstracts away pushing and pulling XCom values, thereby promising sharing data between tasks in an easy and intuitive way. Let’s look at it more closely.

2. How Airflow 2.0 tries to solve the problem of sharing data between tasks

Airflow 2.0 provides a decorator @task that internally  transforms any Python function into a PythonOperator. Tasks created this way can share data with each other. If you hear about this feature for the first time, it seems almost too good to be true:

  • new Airflow users no longer have to learn Airflow’s specific operators to build their data pipelines,
  • you can finally pass data from one task to another without having to write any tedious XComs logic.

However, if we look at this feature more closely, the TaskFlow abstraction leaves more questions than it answers.

1. How does the data sharing functionality work under the hood?

It leverages XComs, i.e., the “cross-communication” abstraction created to exchange only small amounts of metadata between tasks. This data gets persisted to a relational database with no TTL (Time To Live) or cleanup logic. You may think that this shouldn’t be a problem since Airflow currently supports specifying a custom XCom backend (such as S3 or GCS) by means of a specific backend class. Still, with the default implementation of TaskFlow, you are storing all return values in the metadata database.

You probably can imagine how this may backfire when an inexperienced user could start passing large objects between the tasks and quickly reach the storage capacity of the database without even knowing that he or she is storing anything in the database, since TaskFlow abstracts it away.

Overall, there is nothing wrong per se with storing XComs values in the database, as long as we:

  • ensure that all returned objects can be stored in a relational database (i.e., those return values are serializable and don’t exceed the data type limits — ex. for Postgres, it’s a 1GB BLOB),
  • ensure that the process or user who is doing that can be trusted (i.e., authorized to store arbitrary BLOBs to the database),
  • implement some regular process to remove the old entries automatically,
  • monitor the database storage capacity.

2. Can any Python object be used as a return value?

Quick answer: no. Only JSON-serializable objects can be, by default, leveraged with the TaskFlow syntax. This means that if you were hoping to pass a Pandas dataframe between tasks, you need to use the proper abstraction for that. According to Polidea [2], you can solve it by implementing a serialize and deserialize methods within your XCom backend class. The downside (at the time of writing): you need to implement it yourself. The upside: once you’ve implemented that properly, your TaskFlow tasks can potentially return and share any Python object you want, as long as your (de)serialization methods allow that.

3. Can you mix tasks defined with the TaskFlow syntax with the traditional Airflow operators?

This seems to be possible. You would have to validate the return values and watch out how you define the order of tasks, i.e., tasks that pass data between each other via TaskFlow syntax can have their dependencies simply defined by sharing the return values from the function calls, such as:

Then, to combine those “decorator” tasks with tasks that leverage Airflow’s operators, we could use the set_upstream command.

The DAG below demonstrates an example where we can define the ETL logic in a natural “Pythonic” way using simple (decorated) functions while making sure that we load the data in the end to S3. Once that’s finished, we could use a standard Airflow operator to load the same data from S3 to Redshift.

This is how it would look like in the UI:

Mixing the tasks defined with the TaskFlow and operator syntax — image by author

4. What happens when you want to use specific Python packages with the TaskFlow or if you don’t know in advance the size of the data you share between tasks?

At this point, we reach the true limits of the newly introduced functionality. What happens if DAG “A” needs a different Python package version than DAG “B”? Or what happens if the returned XCom value couldn’t be pushed and written to the database? And what if you don’t know in advance whether the data you will be passing between tasks fits into a 1GB BLOB object in the database? The current implementation doesn’t seem to be working well in those scenarios. The implications of that are covered in the last section of this article.

3. TaskGroup

Apart from the TaskFlow API, Airflow 2.0 provides an abstraction that allows treating a group of tasks as a single task within a DAG. The implementation of the TaskGroup was motivated by the inefficiencies and bugs that many data engineers experienced when using SubDags.

Demo: Combining TaskFlow with a TaskGroup abstraction

You can combine the TaskFlow and TaskGroup abstractions to build your data pipelines in a quite convenient and easy way. To demonstrate how TaskFlow can be combined with TaskGroup, have a look at the example DAG below.

The graphical representation of this workflow:

TaskGroup allows us to expand and collapse any group of tasks with a single click:

Wow, that looks great! Can I use it for everything?

Not so fast. Even though the task grouping looks great and shiny in the UI, it may not necessarily be the best idea to combine all related data pipelines into one “Monster-DAG”, where we would combine everything and hope that the “collapse” button gives us enough isolation between the individual subcomponents.

A TaskGroup provides only a visual grouping of tasks.

Many data engineers wish to accomplish a separation of concerns in their data pipeline design. For instance, we may need various data sources from several different systems to eventually build a data mart “Sales”. This might involve data from various marketing APIs, ERP systems, shop systems, and many more. And we don’t necessarily want to include all (possibly hundreds) steps leading to this data mart into a single data pipeline, as this monolithic “monster” pipeline approach could potentially lead to maintenance hell and would make it impossible to run only single subcomponents out of schedule when needed.

It’s beneficial to treat data pipelines related to specific business processes as lego bricks — individual components that can be written and maintained separately by different engineers and then combined into a Master (parent) data pipeline so that the individual small components are triggered in the correct order, and thus, to ensure that data dependencies are met.

Why are such data dependencies important? You don’t want to start running ETL for business logic tables until all required staging area steps (raw data) have been finished successfully. With data silos that many companies face these days, already extracting and ingesting raw data for a single use-case from all relevant sources can be considered a complex workflow on its own.

My opinionated perspective on TaskGroup

I wouldn’t consider TaskGroup a final solution that will make data engineers stop thinking about better ways to decouple their workflows from one another. It may look nice in the UI, but only grouping tasks visually does not solve the problem regarding the separation of concerns between individual data pipelines. What if you want to run only a specific TaskGroup out of schedule? Afaik, it doesn’t work — you would have to run the entire DAG.

One solution that was recommended by Airflow PMC member, Jarek Potiuk, was to build the single components according to the DRY principle and import them into several DAGs when needed. For instance, if you have a task exporting Google Ads data, you could build it once and use it in both: 1) in a standalone DAG, and 2)  in a “Data Mart Sales” DAG for which Google Ads is one of its dependencies. This gives you the advantage of avoiding code duplication and being able to trigger this ETL both independently and as part of a larger data pipeline. The downside is that you are duplicating workflow logic. With this solution,  you no longer have the metadata about the execution of a specific task in one place. Additionally, if this duplicated workflow logic doesn’t follow the DRY principle in the code, you have an even bigger problem as maintaining it becomes challenging (potential conflicts if something was changed in the standalone version, but not in the larger pipeline such as the “Data Mart” DAG).

I discussed this problem and possible solutions in more detail in the following article:

Managing dependencies between data pipelines in Apache Airflow & Prefect
Many workflow schedulers let us manage dependencies within a single data pipeline. But what if you have dependencies…towardsdatascience.com

You can find my discussion about that with Jarek Potiuk in the comment section within the above-linked article.

4. The limitations of TaskFlow API

Missing package dependency management

The TaskFlow abstraction can only work if everybody agrees to use the same package versions across all data pipelines forever. Already this naive assumption makes TaskFlow, in my opinion, not yet ready for any serious production workloads, as it would potentially encourage bad engineering practices and difficulty in making changes later when you may need to upgrade some package without breaking previously written data pipelines.

There are, of course, some ways to make it work, but they all require significant compromises. If you really want to use TaskFlow, your team might need to agree on specific package versions that will be installed within the Airflow environment. Then, you would have to stick with them to prevent scenarios when future changes to the environment would break the previously written workflows.

Another workaround would be to agree to always use low-level Python modules rather than higher-level packages. For instance, instead of reading a CSV file with pandas, you could do it with the native csv module, thereby reducing the dependency on external packages.

All of those workarounds seem to be highly impractical, if not impossible, to achieve. If at some point you need an upgraded version of some package, you will have to use either DockerOperator, PythonVirtualenvOperator or KubernetesPodOperator rather than the TaskFlow syntax because the TaskFlow abstraction only anticipates a “happy path” where all your data pipelines have exactly the same requirements and where the writes of pushed XComs to the relational database always succeed. But at this point, you may ask yourself whether such abstraction doesn’t defeat the purpose of a workflow management solution. If we can guarantee a happy path, we don’t really need a platform to manage workflows.

Photo by Pixabay from Pexels

XComs is a limitation in itself

Another limit is that TaskFlow API is built upon XComs, and XComs don’t provide true data sharing functionality, but rather provide an abstraction to only share small amounts of metadata between tasks. According to Marc Lamberti, Head of Customer Training at Astronomer, who has been teaching thousands of people on how to use Airflow:

“Airflow is an orchestrator and not a data processing framework. If you want to process gigabytes of data with Airflow, use Spark with all optimizations that It brings.” [4]

Similarly, Airflow consultants from GoDataDriven wrote in a book about Airflow:

XComs add a hidden dependency between tasks as the pulling task has an implicit dependency on the task pushing the required value. […] These hidden dependencies become even more complicated when sharing XCom values between different DAGs or execution dates, which is therefore also not a practice that we would recommend following.” [5]

Given that so many people with in-depth Airflow expertise say that we shouldn’t use Airflow to build data flow (i.e., workflow with data being passed from one task to another) and that XComs is not a recommended practice to accomplish that, why Airflow 2.0 has released TaskFlow API that builds on that? Only to confuse users and provide them with an abstraction that only addresses part of a problem, and under the hood makes use of another underlying abstraction that is not equipped to solve it reliably?

5. Discussion of the new Airflow’s abstractions

It may not be true, but I have the impression that TaskFlow and TaskGroup features don’t seem to reflect what the end-users actually need. Don’t get me wrong, both of those abstractions are useful, and I’m grateful for getting them with the free, open-source platform. Still, they appear to provide only partial solutions to problems that many data engineers are facing, such as separation of concerns in the workflow design, easily configurable and reliable data sharing between tasks, ability to quickly develop and locally test data workflows, as well as dependency resolution and maintainability of the entire system.

The new abstractions could imply moving too early to the solution domain (i.e., implementing features such as TaskFlow or TaskGroup) before fully understanding the problem domain (understanding the end-users and their needs). Already in August 2018, developers from Bluecore Engineering wrote a famous article in which they highlighted why they struggled with Airflow’s adoption at their company and what might be helpful to address those issues.

We’re All Using Airflow Wrong and How to Fix It
Tl;dr: only use Kubernetes Operatorsmedium.com

Here is a short quote from this article:

This means that all Python package dependencies from each workflow will need to be installed on each Airflow Worker for Operators to be executed successfully. […] Python package conflicts could prevent workflows from being run on the same Airflow instance altogether.

My interpretation of this article: if you want to use Airflow without headaches, use only operators that provide an isolated task environment, such as PythonVirtualenvOperator, DockerOperator or KubernetesPodOperator to prevent dependency clashes. Alternatively, use only Airflow Operators that do the actual data processing in external systems such as Spark, Snowflake, BigQuery, Redshift, etc. Given that your workflows can be executed on many different servers, the easiest way to ensure proper dependency management is to either offload data processing to an external system or package your code into an isolated and self-contained environment such as a docker container, a pod, or a virtual environment (which effectively means: forget about TaskFlow and say goodbye to small atomic tasks).

Even though TaskFlow and TaskGroup abstractions are useful, they don’t seem to be thoroughly thought through. And just imagine the confusion of users, such as myself, being constantly reminded: “Don’t pass data between tasks in Airflow”, and then getting a feature that literally has “flow” in the name and makes it easier than ever to do the actual data processing directly in Airflow.

Me after seeing TaskFlow released in Airflow 2.0 — Source

As for the TaskGroup feature, Airflow might need an abstraction that allows to trigger a DAG from another DAG and wait for its completion before moving on to the next task without relying on the start date match to achieve it. I shared how I tried to build that in Airflow in this article. This could provide a decoupled workflow design rather than just visually collapsing and expanding individual components.

Conclusion

To summarize, it’s best practice to use the right tool for the job. Airflow, at its core, doesn’t provide a first-class data sharing capability, and the TaskFlow API seems to be an attempt to get around it with no architectural redesign. With this article, I want to encourage you to think critically about the data engineering problems that you try to solve and ask yourself whether those new abstractions in Airflow 2.0 really address your needs. You may be better off keeping your data flow transformations in a tool specialized for that purpose, such as Spark, Dask, ETL tools, or dbt, and then if you want to use Airflow with them, use it just to schedule and trigger those jobs and manage their execution state, retries, logging, error notifications, etc. This way, you are using “the right tool for the job” with Airflow being just a scheduler and orchestrator.

It seems that with the new features, Airflow tries to be a one-size-fits-all solution at the cost of building abstractions that only address part of a problem.

Thank you for reading.

References & additional resources

[1] Start guide with astro CLI — Astronomer.io

[2] Airflow 2.0: DAG Authoring Redesigned — Polidea

[3] Apache Airflow 2.0 is here

[4] Marc Lamberti explaining XComs

Cover photo: https://imgflip.com

Image contains an affiliate link. If you join Datacamp this way, you're not only learning a ton of valuable information, but you're also supporting this blog.