Although I was pretty late to the party (Airflow became an Apache Top-Level Project in 2019), I still had trouble finding an easy-to-understand, up-to-date, and lightweight guide to installing Airflow. In this post, we will create a lightweight, standalone, and easily deployed Apache Airflow development environment in just a few minutes.
Docker Compose will be our close companion, allowing us to create a smooth development workflow with quick iteration cycles. Simply spin up a few Docker containers and we can start creating our own workflows.
Note: The following setup will not be suitable for any production purposes and is intended to be used in a development environment only.
You can find the full code for this post on my GitHub.
Apache Airflow is a batch-oriented framework that allows us to easily build scheduled data pipelines in Python. Think of “workflow as code” capable of executing any operation we can implement in Python.
Airflow is not a data processing tool itself; it is orchestration software. We can imagine Airflow as some kind of spider in a web: sitting in the middle, pulling all the strings, and coordinating the workload of our data pipelines.
A data pipeline typically consists of several tasks or actions that need to be executed in a specific order. Apache Airflow models such a pipeline as a DAG (directed acyclic graph): a graph whose nodes are tasks connected by directed edges, without any loops or cycles.
This approach allows us to run independent tasks in parallel, saving time and money. Moreover, we can split a data pipeline into several smaller tasks. If a job fails, we only need to rerun the failed task and its downstream tasks, instead of executing the complete workflow all over again.
Airflow is composed of three main components:

- the scheduler, which monitors DAGs, triggers scheduled workflows, and submits tasks for execution
- the webserver, which serves the user interface for inspecting, triggering, and debugging DAGs
- the metadata database (here PostgreSQL), which stores the state of all DAGs and task runs
Now that we've briefly introduced Apache Airflow, it's time to get started.
Since we will use docker-compose to get Airflow up and running, we have to install Docker first. Simply head over to the official Docker site and download the appropriate installation file for your OS.
We start nice and slow by simply creating a new folder for Airflow.
Just navigate via your preferred terminal to a directory, create a new folder, and change into it by running:
mkdir airflow
cd airflow
Next, we need to get our hands on a docker-compose file that specifies the required services or docker containers.
You can download the docker-compose.yml file from this repo, or simply create a new file named docker-compose.yml and copy in the content below.
---
version: '3.4'

x-common:
  &common
  image: apache/airflow:2.3.0
  user: "${AIRFLOW_UID}:0"
  env_file:
    - .env
  volumes:
    - ./dags:/opt/airflow/dags
    - ./logs:/opt/airflow/logs
    - ./plugins:/opt/airflow/plugins
    - /var/run/docker.sock:/var/run/docker.sock

x-depends-on:
  &depends-on
  depends_on:
    postgres:
      condition: service_healthy
    airflow-init:
      condition: service_completed_successfully

services:
  postgres:
    image: postgres:13
    container_name: postgres
    ports:
      - "5434:5432"
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "airflow"]
      interval: 5s
      retries: 5
    env_file:
      - .env

  scheduler:
    <<: *common
    <<: *depends-on
    container_name: airflow-scheduler
    command: scheduler
    restart: on-failure
    ports:
      - "8793:8793"

  webserver:
    <<: *common
    <<: *depends-on
    container_name: airflow-webserver
    restart: always
    command: webserver
    ports:
      - "8080:8080"
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
      interval: 30s
      timeout: 30s
      retries: 5

  airflow-init:
    <<: *common
    container_name: airflow-init
    entrypoint: /bin/bash
    command:
      - -c
      - |
        mkdir -p /sources/logs /sources/dags /sources/plugins
        chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins}
        exec /entrypoint airflow version
The above docker-compose file specifies the services we need to get Airflow up and running: most importantly the scheduler, the webserver, the metadata database (PostgreSQL), and the airflow-init job that initializes the database.

At the top of the file, we use YAML anchors (x-common and x-depends-on) to define settings that are shared by every Airflow container or service.
We successfully created a docker-compose file with the mandatory services inside. However, to complete the installation process and configure Airflow properly, we need to provide some environment variables.
Still inside your Airflow folder, create a .env file with the following content:
# Meta-Database
POSTGRES_USER=airflow
POSTGRES_PASSWORD=airflow
POSTGRES_DB=airflow
# Airflow Core
AIRFLOW__CORE__FERNET_KEY=UKMzEm3yIuFYEq1y3-2FxPNWSVwRASpahmQ9kQfEr8E=
AIRFLOW__CORE__EXECUTOR=LocalExecutor
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION=True
AIRFLOW__CORE__LOAD_EXAMPLES=False
AIRFLOW_UID=0
# Backend DB
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__DATABASE__LOAD_DEFAULT_CONNECTIONS=False
# Airflow Init
_AIRFLOW_DB_UPGRADE=True
_AIRFLOW_WWW_USER_CREATE=True
_AIRFLOW_WWW_USER_USERNAME=airflow
_AIRFLOW_WWW_USER_PASSWORD=airflow
The above variables set the database credentials, the airflow user, and some further configurations. Most importantly, they set the kind of executor Airflow will utilize. In our case, we make use of the LocalExecutor.
Note: More information on the different kinds of executors can be found here.
And this is already it!
Just head over to the terminal and spin up all the necessary containers by running
docker compose up -d
After a short period of time, we can check the results and the Airflow Web UI by visiting http://localhost:8080. Once we sign in with our credentials (username airflow, password airflow), we gain access to the user interface.
With a working Airflow environment, we can now create a simple DAG for testing purposes. First of all, make sure to run pip install apache-airflow to install the required Python modules.
Now, inside your Airflow folder, navigate to the dags folder and create a new file called sample_dag.py.
from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.bash import BashOperator
from datetime import datetime

with DAG(
    dag_id='first_sample_dag',
    start_date=datetime(2022, 5, 28),
    schedule_interval=None
) as dag:

    start_task = EmptyOperator(
        task_id='start'
    )

    print_hello_world = BashOperator(
        task_id='print_hello_world',
        bash_command='echo "HelloWorld!"'
    )

    end_task = EmptyOperator(
        task_id='end'
    )

    start_task >> print_hello_world
    print_hello_world >> end_task
We define a new DAG and some pretty simple tasks.

The EmptyOperator serves no real purpose other than to create a mockup task inside the Web UI. By utilizing the BashOperator, we create a somewhat creative output of "HelloWorld!". This allows us to visually confirm that our Airflow setup is running properly.
Save the file and head over to the Web UI. We can now start the DAG by manually triggering it.
Note: It may take a while before your DAG appears in the UI. We can speed things up by running the following command in our terminal:
docker exec -it --user airflow airflow-scheduler bash -c "airflow dags list"
Running the DAG shouldn’t take any longer than a couple of seconds.
Once finished, we can navigate to XComs and inspect the output.
And this is it!
We successfully installed Airflow with docker-compose and gave it a quick test ride.
Note: We can stop the running containers by simply executing docker compose down.
In the last section, we learned how to quickly spin up a development environment for Apache Airflow. However, we have yet to learn how to design an efficient workflow. Simply having a great tool at our fingertips won't cut it alone, unfortunately.
Although Apache Airflow does a pretty good job of doing most of the heavy lifting for us, we still need to ensure certain key properties for each Airflow task in order to obtain proper and consistent results. Luckily, a lot of best practices exist. Today, we begin with two of the most important concepts, which apply universally to all workflows: atomicity and idempotency.
Often used in the context of database systems, atomicity is one of the ACID properties and describes an indivisible, irreducible series of operations that either all occur or do not occur at all. In terms of Apache Airflow, this means a task should be defined so that it either succeeds with a proper result or fails completely without affecting the state of the system.
Let’s imagine, that we have to extract data from a CSV file, apply some transformations to it, and write the result to a database. Simple enough, right?
A bad, non-atomic approach would be the following. We extract the data line-by-line, apply the transformation right away, and upload the result immediately to the database. All within the same task.
Now, if some lines are corrupt and the task fails halfway through, we're left with only a fragment of the desired results: some lines are processed and already inserted, while others simply never make it. Debugging and rerunning this task while avoiding duplicates would be a nightmare. An improved, atomic workflow would split the pipeline into three separate tasks: extract the whole file, transform all the records, and only then load the results in one go.
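As a sketch of that atomic split, here is a minimal example in plain Python with SQLite (the table, column names, and file layout are made up for illustration, not from the original pipeline). Each function corresponds to one task, and the load happens inside a single transaction, so a failure leaves the database untouched:

```python
import csv
import sqlite3

def extract(csv_path):
    # Task 1: read the entire CSV up front; an unreadable file fails here,
    # before anything touches the database.
    with open(csv_path, newline="") as f:
        return list(csv.DictReader(f))

def transform(rows):
    # Task 2: transform every record; a single corrupt value fails the
    # whole task, and no partial results escape.
    return [
        {"name": row["name"].strip().title(), "amount": float(row["amount"])}
        for row in rows
    ]

def load(rows, conn):
    # Task 3: write all rows inside one transaction; "with conn" commits
    # on success and rolls back on any exception, so a failed run leaves
    # the table exactly as it was.
    with conn:
        conn.executemany(
            "INSERT INTO sales (name, amount) VALUES (:name, :amount)", rows
        )
```

In an actual DAG, each function would become its own task (for example via the PythonOperator) chained as extract >> transform >> load, so any failed step can be rerun in isolation.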
So a general rule of thumb to keep in mind is to split up the operations into different tasks. One operation equals a single task — think Single-responsibility principle.
Unfortunately, this simple rule cannot be applied every time. Some operations are so tightly coupled that it's best to keep them in a single coherent unit of work, for example authenticating to an API before executing the request.
Luckily for us, most Airflow operators are designed in an atomic fashion and can be used straight off the shelf. With the more flexible operators like the Bash or Python operator, however, we have to be more cautious and mindful when designing our workflows. Creating atomic Airflow tasks allows us to recover from failure and rerun only the failed and downstream tasks. Atomicity yields maintainable, transparent workflows without hidden dependencies and side effects.
The concept of idempotency goes hand in hand with the idea of atomicity and describes a property of certain operations in mathematics and computer science: the operation can be applied multiple times without changing the result beyond the initial application. Think of pressing the "on" button on a control panel: pressing it multiple times has the same effect as pressing it once. So what does this mean in the context of Apache Airflow?
Calling the same task multiple times with the same input has no additional effect. In other words, if rerunning a task without changing the input yields the same output, it can be considered idempotent. Idempotency decreases recovery time from failure and reduces data loss.
Now, let’s imagine our job is to fetch data from a database for a specific day and write the results to a CSV file. Rerunning this task for the same day should overwrite the existing file and produce the same output every time it is executed.
Suppose we design our task differently: with each rerun, we simply append the records to the existing file. Now we violate the concept of idempotency, since every rerun of the task produces a different result with duplicate records.
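To make the difference concrete, here is a minimal sketch in plain Python (the function names, file naming scheme, and record format are hypothetical). The first variant is idempotent because it derives the file name from the logical date and overwrites the file wholesale; the appending variant duplicates records on every rerun:

```python
from pathlib import Path

def export_day(rows, out_dir, day):
    # Idempotent: the target file is derived from the logical date and
    # overwritten completely, so rerunning for the same day always
    # produces the exact same file.
    out = Path(out_dir) / f"sales_{day}.csv"
    out.write_text("".join(f"{name},{amount}\n" for name, amount in rows))
    return out

def export_day_append(rows, out_dir, day):
    # NOT idempotent: every rerun appends the same records again,
    # duplicating data each time.
    out = Path(out_dir) / f"sales_{day}.csv"
    with out.open("a") as f:
        f.writelines(f"{name},{amount}\n" for name, amount in rows)
    return out
```

Note how idempotency here is a property of the design, not of any framework feature: the overwrite version can be rerun for a given day as often as we like.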
In general, tasks that write data should check for existing records and overwrite them, or use UPSERT operations, to conform to the rules of idempotency. For more general applications, however, we have to think carefully about all possible side effects.
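For database writes, an UPSERT keyed on the logical date is one way to satisfy this rule. Here is a sketch using SQLite (the table and column names are invented for illustration; the ON CONFLICT clause requires SQLite 3.24 or newer):

```python
import sqlite3

def upsert_sales(conn, day, rows):
    # Idempotent load: the primary key (day, name) makes a rerun update
    # the existing rows in place instead of inserting duplicates.
    with conn:
        conn.executemany(
            """
            INSERT INTO sales (day, name, amount) VALUES (?, ?, ?)
            ON CONFLICT(day, name) DO UPDATE SET amount = excluded.amount
            """,
            [(day, name, amount) for name, amount in rows],
        )
```

Rerunning this load for the same day converges to the same table state instead of growing it.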
Airflow is a batch-oriented framework that allows us to create complex data pipelines in Python. In this article, we created a simple and easy-to-use environment to quickly iterate and develop new workflows in Apache Airflow. By leveraging docker-compose we can get straight to work and code new workflows. However, such an environment should only be used for development purposes and is not suitable for any production environment that requires a more sophisticated and distributed setup of Apache Airflow.
Moreover, we covered two of the most important principles when designing DAGs in Apache Airflow: atomicity and idempotency. Committing those concepts to memory enables us to create better workflows that are recoverable, rerunnable, fault-tolerant, consistent, maintainable, transparent, and easier to understand. However, there are a lot more best practices to adhere to and consider when coding and creating the next workflow.